From d9401fd75db59bb21cc7d4ca03a0b1e70585af03 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 23 Jan 2025 09:37:07 +0000 Subject: [PATCH] feat/TS-5805-add-sql-command --- include/common/tmsg.h | 10 ++++++ include/common/tmsgdef.h | 1 + include/libs/nodes/cmdnodes.h | 4 +++ source/common/src/msg/tmsg.c | 40 +++++++++++++++++++++ source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mnode/impl/src/mndVgroup.c | 26 ++++++++++++++ source/libs/nodes/src/nodesCodeFuncs.c | 6 ++++ source/libs/nodes/src/nodesUtilFuncs.c | 4 +++ source/libs/parser/inc/parAst.h | 1 + source/libs/parser/inc/sql.y | 3 ++ source/libs/parser/src/parAstCreater.c | 12 ++++++- source/libs/parser/src/parTranslater.c | 14 ++++++++ 12 files changed, 121 insertions(+), 1 deletion(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 82eaa2359e..670dc9453f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -359,6 +359,7 @@ typedef enum ENodeType { QUERY_NODE_CREATE_ANODE_STMT, QUERY_NODE_DROP_ANODE_STMT, QUERY_NODE_UPDATE_ANODE_STMT, + QUERY_NODE_ASSIGN_LEADER_STMT, // show statement nodes // see 'sysTableShowAdapter', 'SYSTABLE_SHOW_TYPE_OFFSET' @@ -2653,6 +2654,15 @@ int32_t tSerializeSBalanceVgroupReq(void* buf, int32_t bufLen, SBalanceVgroupReq int32_t tDeserializeSBalanceVgroupReq(void* buf, int32_t bufLen, SBalanceVgroupReq* pReq); void tFreeSBalanceVgroupReq(SBalanceVgroupReq* pReq); +typedef struct { + int32_t useless; // useless + int32_t sqlLen; + char* sql; +} SAssignLeaderReq; + +int32_t tSerializeSAssignLeaderReq(void* buf, int32_t bufLen, SAssignLeaderReq* pReq); +int32_t tDeserializeSAssignLeaderReq(void* buf, int32_t bufLen, SAssignLeaderReq* pReq); +void tFreeSAssignLeaderReq(SAssignLeaderReq* pReq); typedef struct { int32_t vgId1; int32_t vgId2; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 9ea27485c2..4e65a4450f 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -263,6 +263,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_CONFIG, "init-config", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_SDB, "config-sdb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESET_STREAM, "reset-stream", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_ASSIGN_LEADER, "assign-leader", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 26482a87d4..ea869f515e 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -681,6 +681,10 @@ typedef struct SBalanceVgroupStmt { ENodeType type; } SBalanceVgroupStmt; +typedef struct SAssignLeaderStmt { + ENodeType type; +} SAssignLeaderStmt; + typedef struct SBalanceVgroupLeaderStmt { ENodeType type; int32_t vgId; diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 7a51669d46..c19a885e8a 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -7644,6 +7644,46 @@ _exit: void tFreeSBalanceVgroupReq(SBalanceVgroupReq *pReq) { FREESQL(); } +int32_t tSerializeSAssignLeaderReq(void *buf, int32_t bufLen, SAssignLeaderReq *pReq) { + SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; + tEncoderInit(&encoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->useless)); + ENCODESQL(); + tEndEncode(&encoder); + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSAssignLeaderReq(void *buf, int32_t bufLen, SAssignLeaderReq *pReq) { + SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->useless)); + DECODESQL(); + tEndDecode(&decoder); + +_exit: + tDecoderClear(&decoder); + return code; +} + +void tFreeSAssignLeaderReq(SAssignLeaderReq *pReq) { FREESQL(); } + int32_t tSerializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceVgroupLeaderReq *pReq) { SEncoder encoder = {0}; int32_t code = 0; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 8f110dbcf3..cb2b5d6897 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -160,6 +160,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_MERGE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SPLIT_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ASSIGN_LEADER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP_LEADER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index e20afb7201..6a3dbb488f 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -46,6 +46,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq); +static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq); int32_t mndInitVgroup(SMnode *pMnode) { SSdbTable table = { @@ -77,6 +78,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { // mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessVgroupBalanceLeaderMsg); mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg); + mndSetMsgHandle(pMnode, TDMT_MND_ASSIGN_LEADER, mndProcessAssignLeaderMsg); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup); @@ -3570,6 +3572,30 @@ _OVER: TAOS_RETURN(code); } +static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq){ + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + SArray *pArray = NULL; + void *pIter = NULL; + int64_t curMs = taosGetTimestampMs(); + + SAssignLeaderReq req = {0}; + if (tDeserializeSAssignLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) { + code = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + auditRecord(pReq, pMnode->clusterId, "assignLeader", "", "", req.sql, req.sqlLen); + +_OVER: + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("failed to assign leader since %s", tstrerror(code)); + } + + tFreeSAssignLeaderReq(&req); + TAOS_RETURN(code); +} + bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) { diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index bfe86aa2ac..3e21aad20d 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -199,6 +199,8 @@ const char* nodesNodeName(ENodeType type) { return "ResetStreamStmt"; case QUERY_NODE_BALANCE_VGROUP_STMT: return "BalanceVgroupStmt"; + case QUERY_NODE_ASSIGN_LEADER_STMT: + return "AssignLeaderStmt"; case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: return "BalanceVgroupLeaderStmt"; case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: @@ -8167,6 +8169,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return dropStreamStmtToJson(pObj, pJson); case QUERY_NODE_BALANCE_VGROUP_STMT: return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to serialize. + case QUERY_NODE_ASSIGN_LEADER_STMT: + return TSDB_CODE_SUCCESS; case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to serialize. case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: @@ -8536,6 +8540,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToDropStreamStmt(pJson, pObj); case QUERY_NODE_BALANCE_VGROUP_STMT: return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to deserialize. + case QUERY_NODE_ASSIGN_LEADER_STMT: + return TSDB_CODE_SUCCESS; case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: return TSDB_CODE_SUCCESS; case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index ae5b302d2d..c15753efc9 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -624,6 +624,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { case QUERY_NODE_BALANCE_VGROUP_STMT: code = makeNode(type, sizeof(SBalanceVgroupStmt), &pNode); break; + case QUERY_NODE_ASSIGN_LEADER_STMT: + code = makeNode(type, sizeof(SAssignLeaderStmt), &pNode); + break; case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: code = makeNode(type, sizeof(SBalanceVgroupLeaderStmt), &pNode); break; @@ -1497,6 +1500,7 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_RESUME_STREAM_STMT: // no pointer field case QUERY_NODE_RESET_STREAM_STMT: // no pointer field case QUERY_NODE_BALANCE_VGROUP_STMT: // no pointer field + case QUERY_NODE_ASSIGN_LEADER_STMT: case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: // no pointer field case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: // no pointer field case QUERY_NODE_MERGE_VGROUP_STMT: // no pointer field diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index 387bccf358..7807d519a9 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -309,6 +309,7 @@ SNode* createResetStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, STok SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId); SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId); SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt); +SNode* createAssingLeaderStmt(SAstCreateContext* pCxt); SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgId); SNode* createBalanceVgroupLeaderDBNameStmt(SAstCreateContext* pCxt, const SToken* pDbName); SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 439af13d71..e95fcb64bb 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -860,6 +860,9 @@ cmd ::= KILL COMPACT NK_INTEGER(A). /************************************************ merge/redistribute/ vgroup ******************************************/ cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); } + +cmd ::= ASSIGN LEADER FORCE. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); } + cmd ::= BALANCE VGROUP LEADER on_vgroup_id(A). { pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt, &A); } cmd ::= BALANCE VGROUP LEADER DATABASE db_name(A). { pCxt->pRootNode = createBalanceVgroupLeaderDBNameStmt(pCxt, &A); } cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index c875cbad05..06a7c9d3a5 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -3873,7 +3873,7 @@ _err: SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt) { CHECK_PARSER_STATUS(pCxt); - SBalanceVgroupStmt* pStmt = NULL; + SAssignLeaderStmt* pStmt = NULL; pCxt->errCode = nodesMakeNode(QUERY_NODE_BALANCE_VGROUP_STMT, (SNode**)&pStmt); CHECK_MAKE_NODE(pStmt); return (SNode*)pStmt; @@ -3881,6 +3881,16 @@ _err: return NULL; } +SNode* createAssignLeaderStmt(SAstCreateContext* pCxt) { + CHECK_PARSER_STATUS(pCxt); + SBalanceVgroupStmt* pStmt = NULL; + pCxt->errCode = nodesMakeNode(QUERY_NODE_ASSIGN_LEADER_STMT, (SNode**)&pStmt); + CHECK_MAKE_NODE(pStmt); + return (SNode*)pStmt; +_err: + return NULL; +} + SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgId) { CHECK_PARSER_STATUS(pCxt); SBalanceVgroupLeaderStmt* pStmt = NULL; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 74dd1be614..48ebb7afb1 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -8490,6 +8490,10 @@ static int32_t fillCmdSql(STranslateContext* pCxt, int16_t msgType, void* pReq) FILL_CMD_SQL(sql, sqlLen, pCmdReq, SBalanceVgroupReq, pReq); break; } + case TDMT_MND_ASSIGN_LEADER: { + FILL_CMD_SQL(sql, sqlLen, pCmdReq, SAssignLeaderReq, pReq); + break; + } case TDMT_MND_REDISTRIBUTE_VGROUP: { FILL_CMD_SQL(sql, sqlLen, pCmdReq, SRedistributeVgroupReq, pReq); break; @@ -12738,6 +12742,13 @@ static int32_t translateBalanceVgroup(STranslateContext* pCxt, SBalanceVgroupStm return code; } +static int32_t translateAssignLeader(STranslateContext* pCxt, SAssignLeaderStmt* pStmt) { + SAssignLeaderReq req = {0}; + int32_t code = buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP, (FSerializeFunc)tSerializeSAssignLeaderReq, &req); + tFreeSAssignLeaderReq(&req); + return code; +} + static int32_t translateBalanceVgroupLeader(STranslateContext* pCxt, SBalanceVgroupLeaderStmt* pStmt) { SBalanceVgroupLeaderReq req = {0}; req.vgId = pStmt->vgId; @@ -13495,6 +13506,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { case QUERY_NODE_BALANCE_VGROUP_STMT: code = translateBalanceVgroup(pCxt, (SBalanceVgroupStmt*)pNode); break; + case QUERY_NODE_ASSIGN_LEADER_STMT: + code = translateAssignLeader(pCxt, (SAssignLeaderStmt*)pNode); + break; case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: code = translateBalanceVgroupLeader(pCxt, (SBalanceVgroupLeaderStmt*)pNode); break;