From a72c49495bacb85492f0b4c492d13b673462cd10 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 5 Nov 2024 16:14:55 +0800 Subject: [PATCH] more code --- include/common/tmsg.h | 11 +++ include/common/tmsgdef.h | 1 + source/common/src/tmsg.c | 103 +++++++++++++++++++++++++ source/libs/parser/src/parTranslater.c | 64 +++++++++++---- 4 files changed, 164 insertions(+), 15 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7f6bd3ba87..b07678d24e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1623,6 +1623,17 @@ typedef struct { int32_t tSerializeSCompactDbRsp(void* buf, int32_t bufLen, SCompactDbRsp* pRsp); int32_t tDeserializeSCompactDbRsp(void* buf, int32_t bufLen, SCompactDbRsp* pRsp); +typedef struct { + SArray* vgroupIds; + STimeWindow timeRange; + int32_t sqlLen; + char* sql; +} SCompactVgroupsReq; + +int32_t tSerializeSCompactVgroupsReq(void* buf, int32_t bufLen, SCompactVgroupsReq* pReq); +int32_t tDeserializeSCompactVgroupsReq(void* buf, int32_t bufLen, SCompactVgroupsReq* pReq); +void tFreeSCompactVgroupsReq(SCompactVgroupsReq* pReq); + typedef struct { int32_t compactId; int32_t sqlLen; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 2c797e39bf..ce72757e18 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -259,6 +259,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_STREAM_DROP_ORPHANTASKS, "stream-drop-orphan-tasks", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_TASK_RESET, "stream-reset-tasks", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_VGROUPS, "compact-vgroups", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9b54da2c30..8196d6fa30 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4530,6 +4530,109 @@ _exit: return code; } +int32_t tSerializeSCompactVgroupsReq(void *buf, int32_t bufLen, SCompactVgroupsReq *pReq) { + int32_t code = TSDB_CODE_SUCCESS; + SEncoder encoder = {0}; + int32_t lino; + int32_t tlen; + + tEncoderInit(&encoder, buf, bufLen); + + code = tStartEncode(&encoder); + TSDB_CHECK_CODE(code, lino, _exit); + + // encode vgid list + code = tEncodeI32(&encoder, taosArrayGetSize(pReq->vgroupIds)); + TSDB_CHECK_CODE(code, lino, _exit); + + for (int32_t i = 0; i < taosArrayGetSize(pReq->vgroupIds); ++i) { + int32_t vgid = *(int32_t *)taosArrayGet(pReq->vgroupIds, i); + code = tEncodeI32(&encoder, vgid); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // encode time range + code = tEncodeI64(&encoder, pReq->timeRange.skey); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tEncodeI64(&encoder, pReq->timeRange.ekey); + TSDB_CHECK_CODE(code, lino, _exit); + + // encode sql + ENCODESQL(); + + tEndEncode(&encoder); + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSCompactVgroupsReq(void *buf, int32_t bufLen, SCompactVgroupsReq *pReq) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino; + SDecoder decoder = {0}; + + tDecoderInit(&decoder, buf, bufLen); + + code = tStartDecode(&decoder); + TSDB_CHECK_CODE(code, lino, _exit); + + // decode vgid list + int32_t vgidNum = 0; + code = tDecodeI32(&decoder, &vgidNum); + TSDB_CHECK_CODE(code, lino, _exit); + + pReq->vgroupIds = taosArrayInit(vgidNum, sizeof(int32_t)); + if (NULL == pReq->vgroupIds) { + TSDB_CHECK_CODE(code = terrno, lino, _exit); + } + + for (int32_t i = 0; i < vgidNum; ++i) { + int32_t vgid; + + code = tDecodeI32(&decoder, &vgid); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosArrayPush(pReq->vgroupIds, &vgid) == NULL) { + TSDB_CHECK_CODE(code = terrno, lino, _exit); + } + } + + // decode time range + code = tDecodeI64(&decoder, &pReq->timeRange.skey); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tDecodeI64(&decoder, &pReq->timeRange.ekey); + TSDB_CHECK_CODE(code, lino, _exit); + + // decode sql + DECODESQL(); + + tEndDecode(&decoder); + +_exit: + tDecoderClear(&decoder); + if (code) { + tFreeSCompactVgroupsReq(pReq); + } + return code; +} + +void tFreeSCompactVgroupsReq(SCompactVgroupsReq *pReq) { + if (pReq->vgroupIds) { + taosArrayDestroy(pReq->vgroupIds); + pReq->vgroupIds = NULL; + } + + FREESQL(); +} + int32_t tSerializeSKillCompactReq(void *buf, int32_t bufLen, SKillCompactReq *pReq) { SEncoder encoder = {0}; int32_t code = 0; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 5c9202298d..627c4a45e2 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10396,27 +10396,28 @@ static int32_t translateDescribe(STranslateContext* pCxt, SDescribeStmt* pStmt) return code; } -static int32_t translateCompactRange(STranslateContext* pCxt, SCompactDatabaseStmt* pStmt, SCompactDbReq* pReq) { +static int32_t translateCompactRange(STranslateContext* pCxt, const char* dbName, SNode* pStart, SNode* pEnd, + STimeWindow* timeRange) { SDbCfgInfo dbCfg = {0}; - int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg); - if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pStart) { - ((SValueNode*)pStmt->pStart)->node.resType.precision = dbCfg.precision; - ((SValueNode*)pStmt->pStart)->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; - code = doTranslateValue(pCxt, (SValueNode*)pStmt->pStart); + int32_t code = getDBCfg(pCxt, dbName, &dbCfg); + if (TSDB_CODE_SUCCESS == code && NULL != pStart) { + ((SValueNode*)pStart)->node.resType.precision = dbCfg.precision; + ((SValueNode*)pStart)->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; + code = doTranslateValue(pCxt, (SValueNode*)pStart); } - if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pEnd) { - ((SValueNode*)pStmt->pEnd)->node.resType.precision = dbCfg.precision; - ((SValueNode*)pStmt->pEnd)->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; - code = doTranslateValue(pCxt, (SValueNode*)pStmt->pEnd); + if (TSDB_CODE_SUCCESS == code && NULL != pEnd) { + ((SValueNode*)pEnd)->node.resType.precision = dbCfg.precision; + ((SValueNode*)pEnd)->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; + code = doTranslateValue(pCxt, (SValueNode*)pEnd); } if (TSDB_CODE_SUCCESS == code) { - pReq->timeRange.skey = NULL != pStmt->pStart ? ((SValueNode*)pStmt->pStart)->datum.i : INT64_MIN; - pReq->timeRange.ekey = NULL != pStmt->pEnd ? ((SValueNode*)pStmt->pEnd)->datum.i : INT64_MAX; + timeRange->skey = NULL != pStart ? ((SValueNode*)pStart)->datum.i : INT64_MIN; + timeRange->ekey = NULL != pEnd ? ((SValueNode*)pEnd)->datum.i : INT64_MAX; } return code; } -static int32_t translateCompact(STranslateContext* pCxt, SCompactDatabaseStmt* pStmt) { +static int32_t translateCompactDb(STranslateContext* pCxt, SCompactDatabaseStmt* pStmt) { SCompactDbReq compactReq = {0}; SName name; int32_t code = TSDB_CODE_SUCCESS; @@ -10424,7 +10425,7 @@ static int32_t translateCompact(STranslateContext* pCxt, SCompactDatabaseStmt* p if (TSDB_CODE_SUCCESS != code) return code; (void)tNameGetFullDbName(&name, compactReq.db); - code = translateCompactRange(pCxt, pStmt, &compactReq); + code = translateCompactRange(pCxt, pStmt->dbName, pStmt->pStart, pStmt->pEnd, &compactReq.timeRange); if (TSDB_CODE_SUCCESS == code) { code = buildCmdMsg(pCxt, TDMT_MND_COMPACT_DB, (FSerializeFunc)tSerializeSCompactDbReq, &compactReq); } @@ -10432,6 +10433,35 @@ static int32_t translateCompact(STranslateContext* pCxt, SCompactDatabaseStmt* p return code; } +static int32_t translateVgroupList(STranslateContext* pCxt, SNodeList* vgroupList, SArray** ppVgroups) { + int32_t code = TSDB_CODE_SUCCESS; + + // TODO + ASSERT(0); + + return code; +} + +static int32_t translateCompactVgroups(STranslateContext* pCxt, SCompactVgroupsStmt* pStmt) { + int32_t code = TSDB_CODE_SUCCESS; + SCompactVgroupsReq req = {0}; + + if (TSDB_CODE_SUCCESS == code) { + code = translateVgroupList(pCxt, pStmt->vgidList, &req.vgroupIds); + } + + if (TSDB_CODE_SUCCESS == code) { + code = translateCompactRange(pCxt, NULL /* TODO */, pStmt->pStart, pStmt->pEnd, &req.timeRange); + } + + if (TSDB_CODE_SUCCESS == code) { + code = buildCmdMsg(pCxt, TDMT_MND_COMPACT_VGROUPS, (FSerializeFunc)tSerializeSCompactVgroupsReq, &req); + } + + tFreeSCompactVgroupsReq(&req); + return code; +} + static int32_t translateKillConnection(STranslateContext* pCxt, SKillStmt* pStmt) { SKillConnReq killReq = {0}; killReq.connId = pStmt->targetId; @@ -12720,8 +12750,10 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { code = translateDescribe(pCxt, (SDescribeStmt*)pNode); break; case QUERY_NODE_COMPACT_DATABASE_STMT: - code = translateCompact(pCxt, (SCompactDatabaseStmt*)pNode); + code = translateCompactDb(pCxt, (SCompactDatabaseStmt*)pNode); break; + case QUERY_NODE_COMPACT_VGROUPS_STMT: + code = translateCompactVgroups(pCxt, (SCompactVgroupsStmt*)pNode); case QUERY_NODE_ALTER_CLUSTER_STMT: code = translateAlterCluster(pCxt, (SAlterClusterStmt*)pNode); break; @@ -13039,6 +13071,7 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS case QUERY_NODE_SHOW_VARIABLES_STMT: return extractShowVariablesResultSchema(numOfCols, pSchema); case QUERY_NODE_COMPACT_DATABASE_STMT: + case QUERY_NODE_COMPACT_VGROUPS_STMT: return extractCompactDbResultSchema(numOfCols, pSchema); default: break; @@ -16222,6 +16255,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { break; case QUERY_NODE_SHOW_VARIABLES_STMT: case QUERY_NODE_COMPACT_DATABASE_STMT: + case QUERY_NODE_COMPACT_VGROUPS_STMT: pQuery->haveResultSet = true; pQuery->execMode = QUERY_EXEC_MODE_RPC; if (NULL != pCxt->pCmdMsg) {