more code

This commit is contained in:
Hongze Cheng 2024-11-05 16:14:55 +08:00
parent cefc1832e2
commit a72c49495b
4 changed files with 164 additions and 15 deletions

View File

@ -1623,6 +1623,17 @@ typedef struct {
int32_t tSerializeSCompactDbRsp(void* buf, int32_t bufLen, SCompactDbRsp* pRsp);
int32_t tDeserializeSCompactDbRsp(void* buf, int32_t bufLen, SCompactDbRsp* pRsp);
typedef struct {
SArray* vgroupIds;
STimeWindow timeRange;
int32_t sqlLen;
char* sql;
} SCompactVgroupsReq;
int32_t tSerializeSCompactVgroupsReq(void* buf, int32_t bufLen, SCompactVgroupsReq* pReq);
int32_t tDeserializeSCompactVgroupsReq(void* buf, int32_t bufLen, SCompactVgroupsReq* pReq);
void tFreeSCompactVgroupsReq(SCompactVgroupsReq* pReq);
typedef struct {
int32_t compactId;
int32_t sqlLen;

View File

@ -259,6 +259,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_DROP_ORPHANTASKS, "stream-drop-orphan-tasks", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_TASK_RESET, "stream-reset-tasks", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_VGROUPS, "compact-vgroups", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8

View File

@ -4530,6 +4530,109 @@ _exit:
return code;
}
int32_t tSerializeSCompactVgroupsReq(void *buf, int32_t bufLen, SCompactVgroupsReq *pReq) {
int32_t code = TSDB_CODE_SUCCESS;
SEncoder encoder = {0};
int32_t lino;
int32_t tlen;
tEncoderInit(&encoder, buf, bufLen);
code = tStartEncode(&encoder);
TSDB_CHECK_CODE(code, lino, _exit);
// encode vgid list
code = tEncodeI32(&encoder, taosArrayGetSize(pReq->vgroupIds));
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t i = 0; i < taosArrayGetSize(pReq->vgroupIds); ++i) {
int32_t vgid = *(int32_t *)taosArrayGet(pReq->vgroupIds, i);
code = tEncodeI32(&encoder, vgid);
TSDB_CHECK_CODE(code, lino, _exit);
}
// encode time range
code = tEncodeI64(&encoder, pReq->timeRange.skey);
TSDB_CHECK_CODE(code, lino, _exit);
code = tEncodeI64(&encoder, pReq->timeRange.ekey);
TSDB_CHECK_CODE(code, lino, _exit);
// encode sql
ENCODESQL();
tEndEncode(&encoder);
_exit:
if (code) {
tlen = code;
} else {
tlen = encoder.pos;
}
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSCompactVgroupsReq(void *buf, int32_t bufLen, SCompactVgroupsReq *pReq) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino;
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
code = tStartDecode(&decoder);
TSDB_CHECK_CODE(code, lino, _exit);
// decode vgid list
int32_t vgidNum = 0;
code = tDecodeI32(&decoder, &vgidNum);
TSDB_CHECK_CODE(code, lino, _exit);
pReq->vgroupIds = taosArrayInit(vgidNum, sizeof(int32_t));
if (NULL == pReq->vgroupIds) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
for (int32_t i = 0; i < vgidNum; ++i) {
int32_t vgid;
code = tDecodeI32(&decoder, &vgid);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayPush(pReq->vgroupIds, &vgid) == NULL) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
}
// decode time range
code = tDecodeI64(&decoder, &pReq->timeRange.skey);
TSDB_CHECK_CODE(code, lino, _exit);
code = tDecodeI64(&decoder, &pReq->timeRange.ekey);
TSDB_CHECK_CODE(code, lino, _exit);
// decode sql
DECODESQL();
tEndDecode(&decoder);
_exit:
tDecoderClear(&decoder);
if (code) {
tFreeSCompactVgroupsReq(pReq);
}
return code;
}
void tFreeSCompactVgroupsReq(SCompactVgroupsReq *pReq) {
if (pReq->vgroupIds) {
taosArrayDestroy(pReq->vgroupIds);
pReq->vgroupIds = NULL;
}
FREESQL();
}
int32_t tSerializeSKillCompactReq(void *buf, int32_t bufLen, SKillCompactReq *pReq) {
SEncoder encoder = {0};
int32_t code = 0;

View File

@ -10396,27 +10396,28 @@ static int32_t translateDescribe(STranslateContext* pCxt, SDescribeStmt* pStmt)
return code;
}
static int32_t translateCompactRange(STranslateContext* pCxt, SCompactDatabaseStmt* pStmt, SCompactDbReq* pReq) {
static int32_t translateCompactRange(STranslateContext* pCxt, const char* dbName, SNode* pStart, SNode* pEnd,
STimeWindow* timeRange) {
SDbCfgInfo dbCfg = {0};
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pStart) {
((SValueNode*)pStmt->pStart)->node.resType.precision = dbCfg.precision;
((SValueNode*)pStmt->pStart)->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
code = doTranslateValue(pCxt, (SValueNode*)pStmt->pStart);
int32_t code = getDBCfg(pCxt, dbName, &dbCfg);
if (TSDB_CODE_SUCCESS == code && NULL != pStart) {
((SValueNode*)pStart)->node.resType.precision = dbCfg.precision;
((SValueNode*)pStart)->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
code = doTranslateValue(pCxt, (SValueNode*)pStart);
}
if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pEnd) {
((SValueNode*)pStmt->pEnd)->node.resType.precision = dbCfg.precision;
((SValueNode*)pStmt->pEnd)->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
code = doTranslateValue(pCxt, (SValueNode*)pStmt->pEnd);
if (TSDB_CODE_SUCCESS == code && NULL != pEnd) {
((SValueNode*)pEnd)->node.resType.precision = dbCfg.precision;
((SValueNode*)pEnd)->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
code = doTranslateValue(pCxt, (SValueNode*)pEnd);
}
if (TSDB_CODE_SUCCESS == code) {
pReq->timeRange.skey = NULL != pStmt->pStart ? ((SValueNode*)pStmt->pStart)->datum.i : INT64_MIN;
pReq->timeRange.ekey = NULL != pStmt->pEnd ? ((SValueNode*)pStmt->pEnd)->datum.i : INT64_MAX;
timeRange->skey = NULL != pStart ? ((SValueNode*)pStart)->datum.i : INT64_MIN;
timeRange->ekey = NULL != pEnd ? ((SValueNode*)pEnd)->datum.i : INT64_MAX;
}
return code;
}
static int32_t translateCompact(STranslateContext* pCxt, SCompactDatabaseStmt* pStmt) {
static int32_t translateCompactDb(STranslateContext* pCxt, SCompactDatabaseStmt* pStmt) {
SCompactDbReq compactReq = {0};
SName name;
int32_t code = TSDB_CODE_SUCCESS;
@ -10424,7 +10425,7 @@ static int32_t translateCompact(STranslateContext* pCxt, SCompactDatabaseStmt* p
if (TSDB_CODE_SUCCESS != code) return code;
(void)tNameGetFullDbName(&name, compactReq.db);
code = translateCompactRange(pCxt, pStmt, &compactReq);
code = translateCompactRange(pCxt, pStmt->dbName, pStmt->pStart, pStmt->pEnd, &compactReq.timeRange);
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_COMPACT_DB, (FSerializeFunc)tSerializeSCompactDbReq, &compactReq);
}
@ -10432,6 +10433,35 @@ static int32_t translateCompact(STranslateContext* pCxt, SCompactDatabaseStmt* p
return code;
}
static int32_t translateVgroupList(STranslateContext* pCxt, SNodeList* vgroupList, SArray** ppVgroups) {
int32_t code = TSDB_CODE_SUCCESS;
// TODO
ASSERT(0);
return code;
}
static int32_t translateCompactVgroups(STranslateContext* pCxt, SCompactVgroupsStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
SCompactVgroupsReq req = {0};
if (TSDB_CODE_SUCCESS == code) {
code = translateVgroupList(pCxt, pStmt->vgidList, &req.vgroupIds);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateCompactRange(pCxt, NULL /* TODO */, pStmt->pStart, pStmt->pEnd, &req.timeRange);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_COMPACT_VGROUPS, (FSerializeFunc)tSerializeSCompactVgroupsReq, &req);
}
tFreeSCompactVgroupsReq(&req);
return code;
}
static int32_t translateKillConnection(STranslateContext* pCxt, SKillStmt* pStmt) {
SKillConnReq killReq = {0};
killReq.connId = pStmt->targetId;
@ -12720,8 +12750,10 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
code = translateDescribe(pCxt, (SDescribeStmt*)pNode);
break;
case QUERY_NODE_COMPACT_DATABASE_STMT:
code = translateCompact(pCxt, (SCompactDatabaseStmt*)pNode);
code = translateCompactDb(pCxt, (SCompactDatabaseStmt*)pNode);
break;
case QUERY_NODE_COMPACT_VGROUPS_STMT:
code = translateCompactVgroups(pCxt, (SCompactVgroupsStmt*)pNode);
case QUERY_NODE_ALTER_CLUSTER_STMT:
code = translateAlterCluster(pCxt, (SAlterClusterStmt*)pNode);
break;
@ -13039,6 +13071,7 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS
case QUERY_NODE_SHOW_VARIABLES_STMT:
return extractShowVariablesResultSchema(numOfCols, pSchema);
case QUERY_NODE_COMPACT_DATABASE_STMT:
case QUERY_NODE_COMPACT_VGROUPS_STMT:
return extractCompactDbResultSchema(numOfCols, pSchema);
default:
break;
@ -16222,6 +16255,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
break;
case QUERY_NODE_SHOW_VARIABLES_STMT:
case QUERY_NODE_COMPACT_DATABASE_STMT:
case QUERY_NODE_COMPACT_VGROUPS_STMT:
pQuery->haveResultSet = true;
pQuery->execMode = QUERY_EXEC_MODE_RPC;
if (NULL != pCxt->pCmdMsg) {