more code

This commit is contained in:
Hongze Cheng 2024-11-08 11:13:37 +08:00
parent 48f1fbe20b
commit f3de4bb2d8
8 changed files with 50 additions and 131 deletions

View File

@ -1613,6 +1613,7 @@ typedef struct {
STimeWindow timeRange;
int32_t sqlLen;
char* sql;
SArray* vgroupIds;
} SCompactDbReq;
int32_t tSerializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq);
@ -1627,17 +1628,6 @@ typedef struct {
int32_t tSerializeSCompactDbRsp(void* buf, int32_t bufLen, SCompactDbRsp* pRsp);
int32_t tDeserializeSCompactDbRsp(void* buf, int32_t bufLen, SCompactDbRsp* pRsp);
typedef struct {
SArray* vgroupIds;
STimeWindow timeRange;
int32_t sqlLen;
char* sql;
} SCompactVgroupsReq;
int32_t tSerializeSCompactVgroupsReq(void* buf, int32_t bufLen, SCompactVgroupsReq* pReq);
int32_t tDeserializeSCompactVgroupsReq(void* buf, int32_t bufLen, SCompactVgroupsReq* pReq);
void tFreeSCompactVgroupsReq(SCompactVgroupsReq* pReq);
typedef struct {
int32_t compactId;
int32_t sqlLen;

View File

@ -260,7 +260,6 @@
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_TASK_RESET, "stream-reset-tasks", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUDIT, "audit", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_VGROUPS, "compact-vgroups", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8

View File

@ -845,8 +845,7 @@ __async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
case TDMT_MND_SHOW_VARIABLES:
return processShowVariablesRsp;
case TDMT_MND_COMPACT_DB:
case TDMT_MND_COMPACT_VGROUPS:
return processCompactDbRsp;
return processCompactDbRsp;
default:
return genericRspCallback;
}

View File

@ -4521,6 +4521,17 @@ int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq)
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.skey));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timeRange.ekey));
ENCODESQL();
// encode vgroup list
int32_t numOfVgroups = taosArrayGetSize(pReq->vgroupIds);
TAOS_CHECK_EXIT(tEncodeI32(&encoder, numOfVgroups));
if (numOfVgroups > 0) {
for (int32_t i = 0; i < numOfVgroups; ++i) {
int64_t vgid = *(int64_t *)taosArrayGet(pReq->vgroupIds, i);
TAOS_CHECK_EXIT(tEncodeI64v(&encoder, vgid));
}
}
tEndEncode(&encoder);
_exit:
@ -4544,6 +4555,26 @@ int32_t tDeserializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->timeRange.skey));
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->timeRange.ekey));
DECODESQL();
// decode vgroup list
if (!tDecodeIsEnd(&decoder)) {
int32_t numOfVgroups = 0;
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfVgroups));
if (numOfVgroups > 0) {
pReq->vgroupIds = taosArrayInit(numOfVgroups, sizeof(int64_t));
if (NULL == pReq->vgroupIds) {
TAOS_CHECK_EXIT(terrno);
}
for (int32_t i = 0; i < numOfVgroups; ++i) {
int64_t vgid;
TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &vgid));
if (taosArrayPush(pReq->vgroupIds, &vgid) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
}
}
}
tEndDecode(&decoder);
_exit:
@ -4551,7 +4582,11 @@ _exit:
return code;
}
void tFreeSCompactDbReq(SCompactDbReq *pReq) { FREESQL(); }
void tFreeSCompactDbReq(SCompactDbReq *pReq) {
FREESQL();
taosArrayDestroy(pReq->vgroupIds);
pReq->vgroupIds = NULL;
}
int32_t tSerializeSCompactDbRsp(void *buf, int32_t bufLen, SCompactDbRsp *pRsp) {
SEncoder encoder = {0};
@ -4591,109 +4626,6 @@ _exit:
return code;
}
int32_t tSerializeSCompactVgroupsReq(void *buf, int32_t bufLen, SCompactVgroupsReq *pReq) {
int32_t code = TSDB_CODE_SUCCESS;
SEncoder encoder = {0};
int32_t lino;
int32_t tlen;
tEncoderInit(&encoder, buf, bufLen);
code = tStartEncode(&encoder);
TSDB_CHECK_CODE(code, lino, _exit);
// encode vgid list
code = tEncodeI32(&encoder, taosArrayGetSize(pReq->vgroupIds));
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t i = 0; i < taosArrayGetSize(pReq->vgroupIds); ++i) {
int64_t vgid = *(int64_t *)taosArrayGet(pReq->vgroupIds, i);
code = tEncodeI64v(&encoder, vgid);
TSDB_CHECK_CODE(code, lino, _exit);
}
// encode time range
code = tEncodeI64(&encoder, pReq->timeRange.skey);
TSDB_CHECK_CODE(code, lino, _exit);
code = tEncodeI64(&encoder, pReq->timeRange.ekey);
TSDB_CHECK_CODE(code, lino, _exit);
// encode sql
ENCODESQL();
tEndEncode(&encoder);
_exit:
if (code) {
tlen = code;
} else {
tlen = encoder.pos;
}
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSCompactVgroupsReq(void *buf, int32_t bufLen, SCompactVgroupsReq *pReq) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino;
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
code = tStartDecode(&decoder);
TSDB_CHECK_CODE(code, lino, _exit);
// decode vgid list
int32_t vgidNum = 0;
code = tDecodeI32(&decoder, &vgidNum);
TSDB_CHECK_CODE(code, lino, _exit);
pReq->vgroupIds = taosArrayInit(vgidNum, sizeof(int64_t));
if (NULL == pReq->vgroupIds) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
for (int32_t i = 0; i < vgidNum; ++i) {
int64_t vgid;
code = tDecodeI64v(&decoder, &vgid);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayPush(pReq->vgroupIds, &vgid) == NULL) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
}
// decode time range
code = tDecodeI64(&decoder, &pReq->timeRange.skey);
TSDB_CHECK_CODE(code, lino, _exit);
code = tDecodeI64(&decoder, &pReq->timeRange.ekey);
TSDB_CHECK_CODE(code, lino, _exit);
// decode sql
DECODESQL();
tEndDecode(&decoder);
_exit:
tDecoderClear(&decoder);
if (code) {
tFreeSCompactVgroupsReq(pReq);
}
return code;
}
void tFreeSCompactVgroupsReq(SCompactVgroupsReq *pReq) {
if (pReq->vgroupIds) {
taosArrayDestroy(pReq->vgroupIds);
pReq->vgroupIds = NULL;
}
FREESQL();
}
int32_t tSerializeSKillCompactReq(void *buf, int32_t bufLen, SKillCompactReq *pReq) {
SEncoder encoder = {0};
int32_t code = 0;

View File

@ -149,7 +149,6 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_VGROUPS, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_TRIM_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_S3MIGRATE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -37,7 +37,6 @@ const char *mndGetDbStr(const char *src);
const char *mndGetStableStr(const char *src);
int32_t mndProcessCompactDbReq(SRpcMsg *pReq);
int32_t mndProcessCompactVgroupsReq(SRpcMsg *pReq);
int32_t mndCheckDbDnodeList(SMnode *pMnode, char *db, char *dnodeListStr, SArray *dnodeList);
#ifdef __cplusplus

View File

@ -57,7 +57,6 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq);
#ifndef TD_ENTERPRISE
int32_t mndProcessCompactDbReq(SRpcMsg *pReq) { return TSDB_CODE_OPS_NOT_SUPPORT; }
int32_t mndProcessCompactVgroupsReq(SRpcMsg *pReq) { return TSDB_CODE_OPS_NOT_SUPPORT; }
#endif
int32_t mndInitDb(SMnode *pMnode) {
@ -77,7 +76,6 @@ int32_t mndInitDb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_DROP_DB, mndProcessDropDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_USE_DB, mndProcessUseDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_VGROUPS, mndProcessCompactVgroupsReq);
mndSetMsgHandle(pMnode, TDMT_MND_TRIM_DB, mndProcessTrimDbReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq);
mndSetMsgHandle(pMnode, TDMT_MND_S3MIGRATE_DB, mndProcessS3MigrateDbReq);

View File

@ -8080,10 +8080,6 @@ static int32_t fillCmdSql(STranslateContext* pCxt, int16_t msgType, void* pReq)
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SCompactDbReq, pReq);
break;
}
case TDMT_MND_COMPACT_VGROUPS: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SCompactVgroupsReq, pReq);
break;
}
case TDMT_MND_TMQ_DROP_TOPIC: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMDropTopicReq, pReq);
@ -10485,11 +10481,14 @@ static int32_t translateVgroupList(STranslateContext* pCxt, SNodeList* vgroupLis
}
static int32_t translateCompactVgroups(STranslateContext* pCxt, SCompactVgroupsStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
SCompactVgroupsReq req = {0};
int32_t code = TSDB_CODE_SUCCESS;
SName name;
SCompactDbReq req = {0};
code = tNameSetDbName(&name, pCxt->pParseCxt->acctId, ((SValueNode*)pStmt->pDbName)->literal,
strlen(((SValueNode*)pStmt->pDbName)->literal));
if (TSDB_CODE_SUCCESS == code) {
code = translateVgroupList(pCxt, pStmt->vgidList, &req.vgroupIds);
(void)tNameGetFullDbName(&name, req.db);
}
if (TSDB_CODE_SUCCESS == code) {
@ -10498,10 +10497,14 @@ static int32_t translateCompactVgroups(STranslateContext* pCxt, SCompactVgroupsS
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_COMPACT_VGROUPS, (FSerializeFunc)tSerializeSCompactVgroupsReq, &req);
code = translateVgroupList(pCxt, pStmt->vgidList, &req.vgroupIds);
}
tFreeSCompactVgroupsReq(&req);
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_COMPACT_DB, (FSerializeFunc)tSerializeSCompactDbReq, &req);
}
tFreeSCompactDbReq(&req);
return code;
}