From d9401fd75db59bb21cc7d4ca03a0b1e70585af03 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 23 Jan 2025 09:37:07 +0000 Subject: [PATCH 1/9] feat/TS-5805-add-sql-command --- include/common/tmsg.h | 10 ++++++ include/common/tmsgdef.h | 1 + include/libs/nodes/cmdnodes.h | 4 +++ source/common/src/msg/tmsg.c | 40 +++++++++++++++++++++ source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mnode/impl/src/mndVgroup.c | 26 ++++++++++++++ source/libs/nodes/src/nodesCodeFuncs.c | 6 ++++ source/libs/nodes/src/nodesUtilFuncs.c | 4 +++ source/libs/parser/inc/parAst.h | 1 + source/libs/parser/inc/sql.y | 3 ++ source/libs/parser/src/parAstCreater.c | 12 ++++++- source/libs/parser/src/parTranslater.c | 14 ++++++++ 12 files changed, 121 insertions(+), 1 deletion(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 82eaa2359e..670dc9453f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -359,6 +359,7 @@ typedef enum ENodeType { QUERY_NODE_CREATE_ANODE_STMT, QUERY_NODE_DROP_ANODE_STMT, QUERY_NODE_UPDATE_ANODE_STMT, + QUERY_NODE_ASSIGN_LEADER_STMT, // show statement nodes // see 'sysTableShowAdapter', 'SYSTABLE_SHOW_TYPE_OFFSET' @@ -2653,6 +2654,15 @@ int32_t tSerializeSBalanceVgroupReq(void* buf, int32_t bufLen, SBalanceVgroupReq int32_t tDeserializeSBalanceVgroupReq(void* buf, int32_t bufLen, SBalanceVgroupReq* pReq); void tFreeSBalanceVgroupReq(SBalanceVgroupReq* pReq); +typedef struct { + int32_t useless; // useless + int32_t sqlLen; + char* sql; +} SAssignLeaderReq; + +int32_t tSerializeSAssignLeaderReq(void* buf, int32_t bufLen, SAssignLeaderReq* pReq); +int32_t tDeserializeSAssignLeaderReq(void* buf, int32_t bufLen, SAssignLeaderReq* pReq); +void tFreeSAssignLeaderReq(SAssignLeaderReq* pReq); typedef struct { int32_t vgId1; int32_t vgId2; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 9ea27485c2..4e65a4450f 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -263,6 +263,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_CONFIG, "init-config", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_SDB, "config-sdb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESET_STREAM, "reset-stream", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_ASSIGN_LEADER, "assign-leader", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 26482a87d4..ea869f515e 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -681,6 +681,10 @@ typedef struct SBalanceVgroupStmt { ENodeType type; } SBalanceVgroupStmt; +typedef struct SAssignLeaderStmt { + ENodeType type; +} SAssignLeaderStmt; + typedef struct SBalanceVgroupLeaderStmt { ENodeType type; int32_t vgId; diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 7a51669d46..c19a885e8a 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -7644,6 +7644,46 @@ _exit: void tFreeSBalanceVgroupReq(SBalanceVgroupReq *pReq) { FREESQL(); } +int32_t tSerializeSAssignLeaderReq(void *buf, int32_t bufLen, SAssignLeaderReq *pReq) { + SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; + tEncoderInit(&encoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->useless)); + ENCODESQL(); + tEndEncode(&encoder); + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSAssignLeaderReq(void *buf, int32_t bufLen, SAssignLeaderReq *pReq) { + SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->useless)); + DECODESQL(); + tEndDecode(&decoder); + +_exit: + tDecoderClear(&decoder); + return code; +} + +void tFreeSAssignLeaderReq(SAssignLeaderReq *pReq) { FREESQL(); } + int32_t tSerializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceVgroupLeaderReq *pReq) { SEncoder encoder = {0}; int32_t code = 0; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 8f110dbcf3..cb2b5d6897 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -160,6 +160,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_MERGE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SPLIT_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ASSIGN_LEADER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP_LEADER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index e20afb7201..6a3dbb488f 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -46,6 +46,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq); +static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq); int32_t mndInitVgroup(SMnode *pMnode) { SSdbTable table = { @@ -77,6 +78,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { // mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessVgroupBalanceLeaderMsg); mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg); + mndSetMsgHandle(pMnode, TDMT_MND_ASSIGN_LEADER, mndProcessAssignLeaderMsg); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup); @@ -3570,6 +3572,30 @@ _OVER: TAOS_RETURN(code); } +static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq){ + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + SArray *pArray = NULL; + void *pIter = NULL; + int64_t curMs = taosGetTimestampMs(); + + SAssignLeaderReq req = {0}; + if (tDeserializeSAssignLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) { + code = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + auditRecord(pReq, pMnode->clusterId, "assignLeader", "", "", req.sql, req.sqlLen); + +_OVER: + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("failed to assign leader since %s", tstrerror(code)); + } + + tFreeSAssignLeaderReq(&req); + TAOS_RETURN(code); +} + bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) { diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index bfe86aa2ac..3e21aad20d 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -199,6 +199,8 @@ const char* nodesNodeName(ENodeType type) { return "ResetStreamStmt"; case QUERY_NODE_BALANCE_VGROUP_STMT: return "BalanceVgroupStmt"; + case QUERY_NODE_ASSIGN_LEADER_STMT: + return "AssignLeaderStmt"; case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: return "BalanceVgroupLeaderStmt"; case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: @@ -8167,6 +8169,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return dropStreamStmtToJson(pObj, pJson); case QUERY_NODE_BALANCE_VGROUP_STMT: return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to serialize. + case QUERY_NODE_ASSIGN_LEADER_STMT: + return TSDB_CODE_SUCCESS; case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to serialize. case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: @@ -8536,6 +8540,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToDropStreamStmt(pJson, pObj); case QUERY_NODE_BALANCE_VGROUP_STMT: return TSDB_CODE_SUCCESS; // SBalanceVgroupStmt has no fields to deserialize. + case QUERY_NODE_ASSIGN_LEADER_STMT: + return TSDB_CODE_SUCCESS; case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: return TSDB_CODE_SUCCESS; case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index ae5b302d2d..c15753efc9 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -624,6 +624,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { case QUERY_NODE_BALANCE_VGROUP_STMT: code = makeNode(type, sizeof(SBalanceVgroupStmt), &pNode); break; + case QUERY_NODE_ASSIGN_LEADER_STMT: + code = makeNode(type, sizeof(SAssignLeaderStmt), &pNode); + break; case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: code = makeNode(type, sizeof(SBalanceVgroupLeaderStmt), &pNode); break; @@ -1497,6 +1500,7 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_RESUME_STREAM_STMT: // no pointer field case QUERY_NODE_RESET_STREAM_STMT: // no pointer field case QUERY_NODE_BALANCE_VGROUP_STMT: // no pointer field + case QUERY_NODE_ASSIGN_LEADER_STMT: case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: // no pointer field case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: // no pointer field case QUERY_NODE_MERGE_VGROUP_STMT: // no pointer field diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index 387bccf358..7807d519a9 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -309,6 +309,7 @@ SNode* createResetStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, STok SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId); SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId); SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt); +SNode* createAssingLeaderStmt(SAstCreateContext* pCxt); SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgId); SNode* createBalanceVgroupLeaderDBNameStmt(SAstCreateContext* pCxt, const SToken* pDbName); SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 439af13d71..e95fcb64bb 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -860,6 +860,9 @@ cmd ::= KILL COMPACT NK_INTEGER(A). /************************************************ merge/redistribute/ vgroup ******************************************/ cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); } + +cmd ::= ASSIGN LEADER FORCE. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); } + cmd ::= BALANCE VGROUP LEADER on_vgroup_id(A). { pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt, &A); } cmd ::= BALANCE VGROUP LEADER DATABASE db_name(A). { pCxt->pRootNode = createBalanceVgroupLeaderDBNameStmt(pCxt, &A); } cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index c875cbad05..06a7c9d3a5 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -3873,7 +3873,7 @@ _err: SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt) { CHECK_PARSER_STATUS(pCxt); - SBalanceVgroupStmt* pStmt = NULL; + SAssignLeaderStmt* pStmt = NULL; pCxt->errCode = nodesMakeNode(QUERY_NODE_BALANCE_VGROUP_STMT, (SNode**)&pStmt); CHECK_MAKE_NODE(pStmt); return (SNode*)pStmt; @@ -3881,6 +3881,16 @@ _err: return NULL; } +SNode* createAssignLeaderStmt(SAstCreateContext* pCxt) { + CHECK_PARSER_STATUS(pCxt); + SBalanceVgroupStmt* pStmt = NULL; + pCxt->errCode = nodesMakeNode(QUERY_NODE_ASSIGN_LEADER_STMT, (SNode**)&pStmt); + CHECK_MAKE_NODE(pStmt); + return (SNode*)pStmt; +_err: + return NULL; +} + SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgId) { CHECK_PARSER_STATUS(pCxt); SBalanceVgroupLeaderStmt* pStmt = NULL; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 74dd1be614..48ebb7afb1 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -8490,6 +8490,10 @@ static int32_t fillCmdSql(STranslateContext* pCxt, int16_t msgType, void* pReq) FILL_CMD_SQL(sql, sqlLen, pCmdReq, SBalanceVgroupReq, pReq); break; } + case TDMT_MND_ASSIGN_LEADER: { + FILL_CMD_SQL(sql, sqlLen, pCmdReq, SAssignLeaderReq, pReq); + break; + } case TDMT_MND_REDISTRIBUTE_VGROUP: { FILL_CMD_SQL(sql, sqlLen, pCmdReq, SRedistributeVgroupReq, pReq); break; @@ -12738,6 +12742,13 @@ static int32_t translateBalanceVgroup(STranslateContext* pCxt, SBalanceVgroupStm return code; } +static int32_t translateAssignLeader(STranslateContext* pCxt, SAssignLeaderStmt* pStmt) { + SAssignLeaderReq req = {0}; + int32_t code = buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP, (FSerializeFunc)tSerializeSAssignLeaderReq, &req); + tFreeSAssignLeaderReq(&req); + return code; +} + static int32_t translateBalanceVgroupLeader(STranslateContext* pCxt, SBalanceVgroupLeaderStmt* pStmt) { SBalanceVgroupLeaderReq req = {0}; req.vgId = pStmt->vgId; @@ -13495,6 +13506,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { case QUERY_NODE_BALANCE_VGROUP_STMT: code = translateBalanceVgroup(pCxt, (SBalanceVgroupStmt*)pNode); break; + case QUERY_NODE_ASSIGN_LEADER_STMT: + code = translateAssignLeader(pCxt, (SAssignLeaderStmt*)pNode); + break; case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: code = translateBalanceVgroupLeader(pCxt, (SBalanceVgroupLeaderStmt*)pNode); break; From 9608344833d6fdf9b3a59165d33750cd9b7c1cac Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 24 Jan 2025 03:47:45 +0000 Subject: [PATCH 2/9] feat/TS-5805-force-assign-leader-add-sql --- include/common/tmsgdef.h | 3 +-- source/libs/parser/inc/parAst.h | 2 +- source/libs/parser/inc/sql.y | 4 ++-- source/libs/parser/src/parAstCreater.c | 4 ++-- source/libs/parser/src/parTranslater.c | 2 +- 5 files changed, 7 insertions(+), 8 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 4e65a4450f..5ea23e2b0f 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -187,7 +187,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) - TD_DEF_MSG_TYPE(TDMT_MND_UNUSED1, "unused", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_ASSIGN_LEADER, "assign-leader", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp) @@ -263,7 +263,6 @@ TD_DEF_MSG_TYPE(TDMT_MND_CONFIG, "init-config", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_SDB, "config-sdb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESET_STREAM, "reset-stream", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_ASSIGN_LEADER, "assign-leader", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index 7807d519a9..d05438c04e 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -309,7 +309,7 @@ SNode* createResetStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, STok SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId); SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId); SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt); -SNode* createAssingLeaderStmt(SAstCreateContext* pCxt); +SNode* createAssignLeaderStmt(SAstCreateContext* pCxt); SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgId); SNode* createBalanceVgroupLeaderDBNameStmt(SAstCreateContext* pCxt, const SToken* pDbName); SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index e95fcb64bb..be425107c8 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -859,9 +859,9 @@ cmd ::= KILL TRANSACTION NK_INTEGER(A). cmd ::= KILL COMPACT NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_COMPACT_STMT, &A); } /************************************************ merge/redistribute/ vgroup ******************************************/ -cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); } +cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createAssignLeaderStmt(pCxt); } -cmd ::= ASSIGN LEADER FORCE. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); } +cmd ::= ASSIGN LEADER FORCE. { pCxt->pRootNode = createAssignLeaderStmt(pCxt); } cmd ::= BALANCE VGROUP LEADER on_vgroup_id(A). { pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt, &A); } cmd ::= BALANCE VGROUP LEADER DATABASE db_name(A). { pCxt->pRootNode = createBalanceVgroupLeaderDBNameStmt(pCxt, &A); } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 06a7c9d3a5..88cf1f3a2a 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -3873,7 +3873,7 @@ _err: SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt) { CHECK_PARSER_STATUS(pCxt); - SAssignLeaderStmt* pStmt = NULL; + SBalanceVgroupStmt* pStmt = NULL; pCxt->errCode = nodesMakeNode(QUERY_NODE_BALANCE_VGROUP_STMT, (SNode**)&pStmt); CHECK_MAKE_NODE(pStmt); return (SNode*)pStmt; @@ -3883,7 +3883,7 @@ _err: SNode* createAssignLeaderStmt(SAstCreateContext* pCxt) { CHECK_PARSER_STATUS(pCxt); - SBalanceVgroupStmt* pStmt = NULL; + SAssignLeaderStmt* pStmt = NULL; pCxt->errCode = nodesMakeNode(QUERY_NODE_ASSIGN_LEADER_STMT, (SNode**)&pStmt); CHECK_MAKE_NODE(pStmt); return (SNode*)pStmt; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 48ebb7afb1..08d2bf90bb 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -12744,7 +12744,7 @@ static int32_t translateBalanceVgroup(STranslateContext* pCxt, SBalanceVgroupStm static int32_t translateAssignLeader(STranslateContext* pCxt, SAssignLeaderStmt* pStmt) { SAssignLeaderReq req = {0}; - int32_t code = buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP, (FSerializeFunc)tSerializeSAssignLeaderReq, &req); + int32_t code = buildCmdMsg(pCxt, TDMT_MND_ASSIGN_LEADER, (FSerializeFunc)tSerializeSAssignLeaderReq, &req); tFreeSAssignLeaderReq(&req); return code; } From 2b4d4162a76962e5a508b15cecb2bb4b1fc40896 Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 24 Jan 2025 08:50:04 +0000 Subject: [PATCH 3/9] feat/TS-5805-force-set-assign --- include/common/tmsg.h | 1 + source/common/src/msg/tmsg.c | 4 + source/dnode/mnode/impl/src/mndArbGroup.c | 78 ++++++++++++++- source/dnode/mnode/impl/src/mndVgroup.c | 26 ----- source/libs/sync/src/syncMain.c | 10 +- tests/army/cluster/arbitrator_restart.py | 111 ++++++++++++++++++++++ 6 files changed, 196 insertions(+), 34 deletions(-) create mode 100644 tests/army/cluster/arbitrator_restart.py diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 670dc9453f..c1481cf716 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2578,6 +2578,7 @@ typedef struct { char* arbToken; int64_t arbTerm; char* memberToken; + int8_t force; } SVArbSetAssignedLeaderReq; int32_t tSerializeSVArbSetAssignedLeaderReq(void* buf, int32_t bufLen, SVArbSetAssignedLeaderReq* pReq); diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index c19a885e8a..74425a296b 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -8210,6 +8210,7 @@ int32_t tSerializeSVArbSetAssignedLeaderReq(void *buf, int32_t bufLen, SVArbSetA TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->arbToken)); TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->arbTerm)); TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->memberToken)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->force)); tEndEncode(&encoder); @@ -8239,6 +8240,9 @@ int32_t tDeserializeSVArbSetAssignedLeaderReq(void *buf, int32_t bufLen, SVArbSe TAOS_CHECK_EXIT(terrno); } TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->memberToken)); + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->force)); + } tEndDecode(&decoder); diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index 3879e8e6e2..88d0291e0d 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -46,6 +46,7 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp); static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp); static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter); +static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq); static int32_t mndArbCheckToken(const char *token1, const char *token2) { if (token1 == NULL || token2 == NULL) return -1; @@ -71,6 +72,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp); mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp); mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp); + mndSetMsgHandle(pMnode, TDMT_MND_ASSIGN_LEADER, mndProcessAssignLeaderMsg); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup); @@ -530,11 +532,12 @@ static bool mndCheckArbMemberHbTimeout(SArbGroup *pArbGroup, int32_t index, int6 } static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm, - char *memberToken) { + char *memberToken, bool force) { SVArbSetAssignedLeaderReq req = {0}; req.arbToken = arbToken; req.arbTerm = arbTerm; req.memberToken = memberToken; + if (force) req.force = 1; int32_t reqLen = tSerializeSVArbSetAssignedLeaderReq(NULL, 0, &req); int32_t contLen = reqLen + sizeof(SMsgHead); @@ -554,10 +557,10 @@ static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, ch } static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, int32_t vgId, char *arbToken, - int64_t term, char *memberToken) { + int64_t term, char *memberToken, bool force) { int32_t code = 0; int32_t contLen = 0; - void *pHead = mndBuildArbSetAssignedLeaderReq(&contLen, vgId, arbToken, term, memberToken); + void *pHead = mndBuildArbSetAssignedLeaderReq(&contLen, vgId, arbToken, term, memberToken, force); if (!pHead) { mError("vgId:%d, failed to build set-assigned request", vgId); code = -1; @@ -643,6 +646,73 @@ void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SAr *pOp = CHECK_SYNC_UPDATE; } +static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + int32_t code = -1, lino = 0; + SArray *pArray = NULL; + void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + SArbGroup *pArbGroup = NULL; + + SAssignLeaderReq req = {0}; + if (tDeserializeSAssignLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + mInfo("begin to process assign leader"); + + char arbToken[TSDB_ARB_TOKEN_SIZE]; + TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken)); + + int64_t term = mndGetTerm(pMnode); + if (term < 0) { + mError("arb failed to get term since %s", terrstr()); + code = -1; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + + while (1) { + pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup); + if (pIter == NULL) break; + + SArbGroup arbGroupDup = {0}; + + (void)taosThreadMutexLock(&pArbGroup->mutex); + mndArbGroupDupObj(pArbGroup, &arbGroupDup); + (void)taosThreadMutexUnlock(&pArbGroup->mutex); + + sdbRelease(pSdb, pArbGroup); + + int32_t dnodeId = 0; + for (int32_t i = 0; i < 2; i++) { + SDnodeObj *pDnode = mndAcquireDnode(pMnode, arbGroupDup.members[i].info.dnodeId); + bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs()); + mndReleaseDnode(pMnode, pDnode); + if (isonline) { + dnodeId = arbGroupDup.members[i].info.dnodeId; + break; + } + } + + (void)mndSendArbSetAssignedLeaderReq(pMnode, dnodeId, arbGroupDup.vgId, arbToken, term, "", true); + mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", arbGroupDup.vgId, dnodeId); + } + + code = 0; + + // auditRecord(pReq, pMnode->clusterId, "assignLeader", "", "", req.sql, req.sqlLen); + +_exit: + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("failed to assign leader since %s", tstrerror(code)); + } + + tFreeSAssignLeaderReq(&req); + TAOS_RETURN(code); +} + static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { int32_t code = 0, lino = 0; SMnode *pMnode = pReq->info.node; @@ -695,7 +765,7 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { mTrace("vgId:%d, arb skip to send msg by check sync", vgId); break; case CHECK_SYNC_SET_ASSIGNED_LEADER: - (void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token); + (void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token, false); mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, assgndDnodeId); break; case CHECK_SYNC_CHECK_SYNC: diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 6a3dbb488f..e20afb7201 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -46,7 +46,6 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq); -static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq); int32_t mndInitVgroup(SMnode *pMnode) { SSdbTable table = { @@ -78,7 +77,6 @@ int32_t mndInitVgroup(SMnode *pMnode) { // mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessVgroupBalanceLeaderMsg); mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg); - mndSetMsgHandle(pMnode, TDMT_MND_ASSIGN_LEADER, mndProcessAssignLeaderMsg); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup); @@ -3572,30 +3570,6 @@ _OVER: TAOS_RETURN(code); } -static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq){ - SMnode *pMnode = pReq->info.node; - int32_t code = -1; - SArray *pArray = NULL; - void *pIter = NULL; - int64_t curMs = taosGetTimestampMs(); - - SAssignLeaderReq req = {0}; - if (tDeserializeSAssignLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) { - code = TSDB_CODE_INVALID_MSG; - goto _OVER; - } - - auditRecord(pReq, pMnode->clusterId, "assignLeader", "", "", req.sql, req.sqlLen); - -_OVER: - if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("failed to assign leader since %s", tstrerror(code)); - } - - tFreeSAssignLeaderReq(&req); - TAOS_RETURN(code); -} - bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 0933fd48c7..ea91412536 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -334,10 +334,12 @@ int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) { ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm); - if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) { - sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken, - req.memberToken); - goto _OVER; + if (!req.force) { + if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) { + sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken, + req.memberToken); + goto _OVER; + } } if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) { diff --git a/tests/army/cluster/arbitrator_restart.py b/tests/army/cluster/arbitrator_restart.py new file mode 100644 index 0000000000..7a02e46630 --- /dev/null +++ b/tests/army/cluster/arbitrator_restart.py @@ -0,0 +1,111 @@ +import taos +import sys +import os +import subprocess +import glob +import shutil +import time + +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.srvCtl import * +from frame.caseBase import * +from frame import * +from frame.autogen import * +from frame import epath +# from frame.server.dnodes import * +# from frame.server.cluster import * + + +class TDTestCase(TBase): + + def init(self, conn, logSql, replicaVar=1): + updatecfgDict = {'dDebugFlag':131} + super(TDTestCase, self).init(conn, logSql, replicaVar=1, checkColName="c1") + + self.valgrind = 0 + self.db = "test" + self.stb = "meters" + self.childtable_count = 10 + tdSql.init(conn.cursor(), logSql) + + def run(self): + tdSql.execute('CREATE DATABASE db vgroups 1 replica 2;') + + time.sleep(1) + + tdSql.execute("use db;") + + tdSql.execute("CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);") + + tdSql.execute("CREATE TABLE d0 USING meters TAGS (\"California.SanFrancisco\", 2);"); + + count = 0 + + while count < 100: + tdSql.query("show arbgroups;") + + if tdSql.getData(0, 4) == 1: + break + + tdLog.info("wait 1 seconds for is sync") + time.sleep(1) + + count += 1 + + sc.dnodeStop(2) + sc.dnodeStop(3) + + sc.dnodeStart(2) + + count = 0 + while count < 100: + tdSql.query("show db.vgroups;") + + if(tdSql.getData(0, 4) == "candidate") or (tdSql.getData(0, 6) == "candidate"): + break + + tdLog.info("wait 1 seconds for candidate") + time.sleep(1) + + count += 1 + + tdSql.execute("balance vgroup;") + + count = 0 + while count < 100: + tdSql.query("show db.vgroups;") + + if(tdSql.getData(0, 4) == "assigned ") or (tdSql.getData(0, 6) == "assigned "): + break + + tdLog.info("wait 1 seconds for set assigned") + time.sleep(1) + + count += 1 + + tdSql.execute("INSERT INTO d0 VALUES (NOW, 10.3, 219, 0.31);") + + sc.dnodeStart(3) + + count = 0 + + while count < 100: + tdSql.query("show arbgroups;") + + if tdSql.getData(0, 4) == 1: + break + + tdLog.info("wait 1 seconds for is sync") + time.sleep(1) + + count += 1 + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) From b07f4a9bacbb087b65c4d20163efaf70ada02904 Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 24 Jan 2025 09:08:47 +0000 Subject: [PATCH 4/9] feat/TS-5805-force-assign-leader-change-sql --- source/libs/parser/inc/sql.y | 4 ++-- tests/army/cluster/arbitrator_restart.py | 2 +- tests/parallel_test/cases.task | 1 + 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index be425107c8..4ed05d6b74 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -859,9 +859,9 @@ cmd ::= KILL TRANSACTION NK_INTEGER(A). cmd ::= KILL COMPACT NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_COMPACT_STMT, &A); } /************************************************ merge/redistribute/ vgroup ******************************************/ -cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createAssignLeaderStmt(pCxt); } +cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); } -cmd ::= ASSIGN LEADER FORCE. { pCxt->pRootNode = createAssignLeaderStmt(pCxt); } +cmd ::= BALANCE VGROUP FORCE. { pCxt->pRootNode = createAssignLeaderStmt(pCxt); } cmd ::= BALANCE VGROUP LEADER on_vgroup_id(A). { pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt, &A); } cmd ::= BALANCE VGROUP LEADER DATABASE db_name(A). { pCxt->pRootNode = createBalanceVgroupLeaderDBNameStmt(pCxt, &A); } diff --git a/tests/army/cluster/arbitrator_restart.py b/tests/army/cluster/arbitrator_restart.py index 7a02e46630..21b7a5b82b 100644 --- a/tests/army/cluster/arbitrator_restart.py +++ b/tests/army/cluster/arbitrator_restart.py @@ -71,7 +71,7 @@ class TDTestCase(TBase): count += 1 - tdSql.execute("balance vgroup;") + tdSql.execute("BALANCE VGROUP FORCE;") count = 0 while count < 100: diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 0201c88d2b..a38b0cd45e 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -22,6 +22,7 @@ ,,y,army,./pytest.sh python3 ./test.py -f multi-level/mlevel_basic.py -N 3 -L 3 -D 2 ,,y,army,./pytest.sh python3 ./test.py -f db-encrypt/basic.py -N 3 -M 3 ,,y,army,./pytest.sh python3 ./test.py -f cluster/arbitrator.py -N 3 +,,y,army,./pytest.sh python3 ./test.py -f cluster/arbitrator_restart.py -N 3 ,,n,army,python3 ./test.py -f storage/s3/s3Basic.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f cluster/snapshot.py -N 3 -L 3 -D 2 ,,y,army,./pytest.sh python3 ./test.py -f query/function/test_func_elapsed.py From b422e2b2f25942f79da1ed96a7d7681d80f31bd9 Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 12 Feb 2025 06:29:24 +0000 Subject: [PATCH 5/9] feat/TS-5805-force-assign-leader-msg-type --- include/common/tmsgdef.h | 3 ++- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 2 +- source/dnode/mnode/impl/src/mndArbGroup.c | 2 +- source/libs/parser/src/parTranslater.c | 4 ++-- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 5ea23e2b0f..29a7e52482 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -187,7 +187,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) - TD_DEF_MSG_TYPE(TDMT_MND_ASSIGN_LEADER, "assign-leader", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_UNUSED1, "unused", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp) @@ -421,6 +421,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_ARB_CHECK_SYNC_TIMER, "mnd-arb-check-sync-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP_BATCH, "mnd-arb-update-group-batch", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_ARB_ASSIGN_LEADER, "mnd-arb-assign-leader", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_MND_ARB_MSG) TD_NEW_MSG_SEG(TDMT_MAX_MSG) // msg end mark diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index cb2b5d6897..46b0877476 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -160,7 +160,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_MERGE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SPLIT_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ASSIGN_LEADER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ARB_ASSIGN_LEADER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP_LEADER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index 2e5d20fb68..48c7dace16 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -71,7 +71,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp); mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp); mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp); - mndSetMsgHandle(pMnode, TDMT_MND_ASSIGN_LEADER, mndProcessAssignLeaderMsg); + mndSetMsgHandle(pMnode, TDMT_MND_ARB_ASSIGN_LEADER, mndProcessAssignLeaderMsg); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 756aa97a5f..474260a81f 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -8532,7 +8532,7 @@ static int32_t fillCmdSql(STranslateContext* pCxt, int16_t msgType, void* pReq) FILL_CMD_SQL(sql, sqlLen, pCmdReq, SBalanceVgroupReq, pReq); break; } - case TDMT_MND_ASSIGN_LEADER: { + case TDMT_MND_ARB_ASSIGN_LEADER: { FILL_CMD_SQL(sql, sqlLen, pCmdReq, SAssignLeaderReq, pReq); break; } @@ -12786,7 +12786,7 @@ static int32_t translateBalanceVgroup(STranslateContext* pCxt, SBalanceVgroupStm static int32_t translateAssignLeader(STranslateContext* pCxt, SAssignLeaderStmt* pStmt) { SAssignLeaderReq req = {0}; - int32_t code = buildCmdMsg(pCxt, TDMT_MND_ASSIGN_LEADER, (FSerializeFunc)tSerializeSAssignLeaderReq, &req); + int32_t code = buildCmdMsg(pCxt, TDMT_MND_ARB_ASSIGN_LEADER, (FSerializeFunc)tSerializeSAssignLeaderReq, &req); tFreeSAssignLeaderReq(&req); return code; } From 977372d376411867e21dc6800cf342f678f2fd3c Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 12 Feb 2025 06:50:37 +0000 Subject: [PATCH 6/9] feat/TS-5805-force-assign-leader-fix-cases --- source/libs/parser/inc/sql.y | 2 +- tests/army/cluster/arbitrator_restart.py | 20 ++++++++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 796ee76606..341a699288 100755 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -863,7 +863,7 @@ cmd ::= KILL COMPACT NK_INTEGER(A). /************************************************ merge/redistribute/ vgroup ******************************************/ cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); } -cmd ::= BALANCE VGROUP FORCE. { pCxt->pRootNode = createAssignLeaderStmt(pCxt); } +cmd ::= BALANCE LEADER FORCE. { pCxt->pRootNode = createAssignLeaderStmt(pCxt); } cmd ::= BALANCE VGROUP LEADER on_vgroup_id(A). { pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt, &A); } cmd ::= BALANCE VGROUP LEADER DATABASE db_name(A). { pCxt->pRootNode = createBalanceVgroupLeaderDBNameStmt(pCxt, &A); } diff --git a/tests/army/cluster/arbitrator_restart.py b/tests/army/cluster/arbitrator_restart.py index 21b7a5b82b..4a80aa1c5a 100644 --- a/tests/army/cluster/arbitrator_restart.py +++ b/tests/army/cluster/arbitrator_restart.py @@ -46,14 +46,17 @@ class TDTestCase(TBase): while count < 100: tdSql.query("show arbgroups;") - if tdSql.getData(0, 4) == 1: + if tdSql.getData(0, 4) == True: break tdLog.info("wait 1 seconds for is sync") time.sleep(1) count += 1 - + if count == 100: + tdLog.exit("arbgroup sync failed") + return + sc.dnodeStop(2) sc.dnodeStop(3) @@ -70,8 +73,11 @@ class TDTestCase(TBase): time.sleep(1) count += 1 - - tdSql.execute("BALANCE VGROUP FORCE;") + if count == 100: + tdLog.exit("wait candidate failed") + return + + tdSql.execute("BALANCE LEADER FORCE;") count = 0 while count < 100: @@ -84,6 +90,9 @@ class TDTestCase(TBase): time.sleep(1) count += 1 + if count == 100: + tdLog.exit("assign leader failed") + return tdSql.execute("INSERT INTO d0 VALUES (NOW, 10.3, 219, 0.31);") @@ -101,6 +110,9 @@ class TDTestCase(TBase): time.sleep(1) count += 1 + if count == 100: + tdLog.exit("arbgroup sync failed") + return def stop(self): tdSql.close() From ee0c280fe6ed3fe4875349f7c8ddc961e0422c5b Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 12 Feb 2025 08:20:56 +0000 Subject: [PATCH 7/9] feat/TS-5805-force-assign-leader-token --- source/libs/parser/inc/sql.y | 2 +- source/libs/parser/src/parTokenizer.c | 1 + tests/army/cluster/arbitrator_restart.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 341a699288..2bc9e8a668 100755 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -863,7 +863,7 @@ cmd ::= KILL COMPACT NK_INTEGER(A). /************************************************ merge/redistribute/ vgroup ******************************************/ cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); } -cmd ::= BALANCE LEADER FORCE. { pCxt->pRootNode = createAssignLeaderStmt(pCxt); } +cmd ::= ASSIGN LEADER FORCE. { pCxt->pRootNode = createAssignLeaderStmt(pCxt); } cmd ::= BALANCE VGROUP LEADER on_vgroup_id(A). { pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt, &A); } cmd ::= BALANCE VGROUP LEADER DATABASE db_name(A). { pCxt->pRootNode = createBalanceVgroupLeaderDBNameStmt(pCxt, &A); } diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index 3b08d403dc..300af653af 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -359,6 +359,7 @@ static SKeyword keywordTable[] = { {"ON_FAILURE", TK_ON_FAILURE}, {"NOTIFY_HISTORY", TK_NOTIFY_HISTORY}, {"REGEXP", TK_REGEXP}, + {"ASSIGN", TK_ASSIGN}, }; // clang-format on diff --git a/tests/army/cluster/arbitrator_restart.py b/tests/army/cluster/arbitrator_restart.py index 4a80aa1c5a..aad51e7d26 100644 --- a/tests/army/cluster/arbitrator_restart.py +++ b/tests/army/cluster/arbitrator_restart.py @@ -77,7 +77,7 @@ class TDTestCase(TBase): tdLog.exit("wait candidate failed") return - tdSql.execute("BALANCE LEADER FORCE;") + tdSql.execute("ASSIGN LEADER FORCE;") count = 0 while count < 100: From cdf0018b47ae22f1b90209a7bd056541d1d28f2f Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 26 Feb 2025 10:38:03 +0000 Subject: [PATCH 8/9] feat/TS-5805-force-assign-leader-update-doc --- docs/en/14-reference/03-taos-sql/20-keywords.md | 1 + docs/zh/08-operation/18-ha/02-replica2.md | 4 ++++ docs/zh/14-reference/03-taos-sql/20-keywords.md | 1 + 3 files changed, 6 insertions(+) diff --git a/docs/en/14-reference/03-taos-sql/20-keywords.md b/docs/en/14-reference/03-taos-sql/20-keywords.md index cd7d9e7a4b..682dcd3f84 100644 --- a/docs/en/14-reference/03-taos-sql/20-keywords.md +++ b/docs/en/14-reference/03-taos-sql/20-keywords.md @@ -37,6 +37,7 @@ The list of keywords is as follows: | ASOF | | | AT_ONCE | | | ATTACH | | +| ASSIGN | Version 3.3.6.0 and later | ### B diff --git a/docs/zh/08-operation/18-ha/02-replica2.md b/docs/zh/08-operation/18-ha/02-replica2.md index 7f3eb2fe5c..e238e3f4f4 100644 --- a/docs/zh/08-operation/18-ha/02-replica2.md +++ b/docs/zh/08-operation/18-ha/02-replica2.md @@ -69,9 +69,13 @@ alter database replica 2|1 | ------- | ------ | | 没有 Vnode 发生故障: Arbitrator 故障(Mnode 宕机节点超过一个,导致 Mnode 无法选主)| **持续提供服务** | | 仅一个 Vnode 故障:VGroup 已经达成同步后,某一个 Vnode 才发生故障的 | **持续提供服务** | +| 仅一个 Vnode 故障:2个 Vnode 同时故障,故障前 VGroup 达成同步,但是只有一个 Vnode 从故障中恢复服务,另一个 Vnode 服务故障 | **通过下面的命令,强制指定leader, 继续提供服务** | | 仅一个 Vnode 故障:离线 Vnode 启动后,VGroup 未达成同步前,另一个 Vnode 服务故障的 | **无法提供服务** | | 两个 Vnode 都发生故障 | **无法提供服务** | +```sql +ASSIGN LEADER FORCE; +``` ## 常见问题 diff --git a/docs/zh/14-reference/03-taos-sql/20-keywords.md b/docs/zh/14-reference/03-taos-sql/20-keywords.md index 1e8fabf571..8d5a538dd5 100644 --- a/docs/zh/14-reference/03-taos-sql/20-keywords.md +++ b/docs/zh/14-reference/03-taos-sql/20-keywords.md @@ -38,6 +38,7 @@ description: TDengine 保留关键字的详细列表 | AT_ONCE | | | ATTACH | | | AUTO | 3.3.5.0 及后续版本 | +| ASSIGN | 3.3.6.0 及后续版本 | ### B |关键字|说明| From 5635b285cd6be1829c51b524b8a1743bef2b6e82 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 27 Feb 2025 03:17:21 +0000 Subject: [PATCH 9/9] feat/TS-5805-force-assign-leader-fix-check --- source/dnode/mnode/impl/src/mndArbGroup.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index 48c7dace16..af56283c96 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -1431,7 +1431,7 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock char strCheckSyncCode[100] = {0}; char bufUpdateTime[40] = {0}; (void)formatTimestamp(bufUpdateTime, pGroup->updateTimeMs, TSDB_TIME_PRECISION_MILLI); - tsnprintf(strCheckSyncCode, 100, "%s(%s)", tstrerror(pGroup->code), bufUpdateTime); + (void)tsnprintf(strCheckSyncCode, 100, "%s(%s)", tstrerror(pGroup->code), bufUpdateTime); char checkSyncCode[100 + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(checkSyncCode, strCheckSyncCode, 100 + VARSTR_HEADER_SIZE);