From b51e4b18ceab4aaac68989d24369490d10127b37 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 16 Nov 2023 11:03:56 +0800 Subject: [PATCH] feat: compact return id and kill compact id --- include/common/tmsg.h | 19 ++++++++- include/common/tmsgdef.h | 1 + source/common/src/tmsg.c | 54 ++++++++++++++++++++++++++ source/libs/nodes/src/nodesUtilFuncs.c | 2 + source/libs/parser/inc/sql.y | 1 + source/libs/parser/src/parTranslater.c | 9 +++++ 6 files changed, 85 insertions(+), 1 deletion(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 86d34502c6..d43b85c261 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -335,6 +335,7 @@ typedef enum ENodeType { QUERY_NODE_KILL_CONNECTION_STMT, QUERY_NODE_KILL_QUERY_STMT, QUERY_NODE_KILL_TRANSACTION_STMT, + QUERY_NODE_KILL_COMPACT_STMT, QUERY_NODE_DELETE_STMT, QUERY_NODE_INSERT_STMT, QUERY_NODE_QUERY, @@ -1380,6 +1381,21 @@ int32_t tSerializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq); int32_t tDeserializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq); void tFreeSCompactDbReq(SCompactDbReq *pReq); +typedef struct { + int64_t compactId; + int8_t bAccepted; +} SCompactDbRsp; + +int32_t tSerializeSCompactDbRsp(void* buf, int32_t bufLen, SCompactDbRsp* pRsp); +int32_t tDeserializeSCompactDbRsp(void* buf, int32_t bufLen, SCompactDbRsp* pRsp); + +typedef struct { + int64_t compactId; +} SKillCompactReq; + +int32_t tSerializeSKillCompactReq(void* buf, int32_t bufLen, SKillCompactReq* pReq); +int32_t tDeserializeSKillCompactReq(void* buf, int32_t bufLen, SKillCompactReq* pReqp); + typedef struct { char name[TSDB_FUNC_NAME_LEN]; int8_t igExists; @@ -1886,8 +1902,9 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; char tb[TSDB_TABLE_NAME_LEN]; char user[TSDB_USER_LEN]; - char filterTb[TSDB_TABLE_NAME_LEN]; + char filterTb[TSDB_TABLE_NAME_LEN]; // for ins_columns int64_t showId; + int64_t compactId; // for compact } SRetrieveTableReq; typedef struct SSysTableSchema { diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 4eb8328caa..a1cc6a8739 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -192,6 +192,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_MND_CREATE_VIEW, "create-view", SCMCreateViewReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_VIEW, "drop-view", SCMDropViewReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_VIEW_META, "view-meta", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_KILL_COMPACT, "kill-compact", SKillCompactReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_MSG) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index dc3ba7934f..8bc9889d35 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3439,6 +3439,58 @@ void tFreeSCompactDbReq(SCompactDbReq *pReq) { FREESQL(); } +int32_t tSerializeSCompactDbRsp(void *buf, int32_t bufLen, SCompactDbRsp *pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI64(&encoder, pRsp->compactId) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->bAccepted) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSCompactDbRsp(void *buf, int32_t bufLen, SCompactDbRsp *pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI64(&decoder, &pRsp->compactId) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->bAccepted) < 0) return -1; + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSKillCompactReq(void *buf, int32_t bufLen, SKillCompactReq *pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI64(&encoder, pRsp->compactId) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSKillCompactReq(void *buf, int32_t bufLen, SKillCompactReq *pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI64(&decoder, &pRsp->compactId) < 0) return -1; + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + int32_t tSerializeSUseDbRspImp(SEncoder *pEncoder, const SUseDbRsp *pRsp) { if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->uid) < 0) return -1; @@ -4274,6 +4326,7 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq if (tEncodeCStr(&encoder, pReq->tb) < 0) return -1; if (tEncodeCStr(&encoder, pReq->filterTb) < 0) return -1; if (tEncodeCStr(&encoder, pReq->user) < 0) return -1; + if (tEncodeI64(&encoder, pReq->compactId) <0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -4291,6 +4344,7 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR if (tDecodeCStrTo(&decoder, pReq->tb) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->filterTb) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->compactId) <0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index d167d81c82..311199cb18 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -460,6 +460,7 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SKillQueryStmt)); case QUERY_NODE_KILL_TRANSACTION_STMT: case QUERY_NODE_KILL_CONNECTION_STMT: + case QUERY_NODE_KILL_COMPACT_STMT: return makeNode(type, sizeof(SKillStmt)); case QUERY_NODE_DELETE_STMT: return makeNode(type, sizeof(SDeleteStmt)); @@ -1097,6 +1098,7 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_KILL_CONNECTION_STMT: // no pointer field case QUERY_NODE_KILL_QUERY_STMT: // no pointer field case QUERY_NODE_KILL_TRANSACTION_STMT: // no pointer field + case QUERY_NODE_KILL_COMPACT_STMT: // no pointer field break; case QUERY_NODE_DELETE_STMT: { SDeleteStmt* pStmt = (SDeleteStmt*)pNode; diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 9bcf65dbbe..620fd31a97 100755 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -700,6 +700,7 @@ ignore_opt(A) ::= IGNORE UNTREATED. cmd ::= KILL CONNECTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_CONNECTION_STMT, &A); } cmd ::= KILL QUERY NK_STRING(A). { pCxt->pRootNode = createKillQueryStmt(pCxt, &A); } cmd ::= KILL TRANSACTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_TRANSACTION_STMT, &A); } +cmd ::= KILL COMPACT NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_COMPACT_STMT, &A); } /************************************************ merge/redistribute/ vgroup ******************************************/ cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index a55bd5663f..5260e4a2bb 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -7087,6 +7087,12 @@ static int32_t translateKillConnection(STranslateContext* pCxt, SKillStmt* pStmt return buildCmdMsg(pCxt, TDMT_MND_KILL_CONN, (FSerializeFunc)tSerializeSKillConnReq, &killReq); } +static int32_t translateKillCompact(STranslateContext* pCxt, SKillStmt* pStmt) { + SKillCompactReq killReq = {0}; + killReq.compactId = pStmt->targetId; + return buildCmdMsg(pCxt, TDMT_MND_KILL_COMPACT, (FSerializeFunc)tSerializeSKillCompactReq, &killReq); +} + static int32_t translateKillQuery(STranslateContext* pCxt, SKillQueryStmt* pStmt) { SKillQueryReq killReq = {0}; strcpy(killReq.queryStrId, pStmt->queryId); @@ -8473,6 +8479,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { case QUERY_NODE_KILL_CONNECTION_STMT: code = translateKillConnection(pCxt, (SKillStmt*)pNode); break; + case QUERY_NODE_KILL_COMPACT_STMT: + code = translateKillCompact(pCxt, (SKillStmt*)pNode); + break; case QUERY_NODE_KILL_QUERY_STMT: code = translateKillQuery(pCxt, (SKillQueryStmt*)pNode); break;