From 2b4d4162a76962e5a508b15cecb2bb4b1fc40896 Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 24 Jan 2025 08:50:04 +0000 Subject: [PATCH] feat/TS-5805-force-set-assign --- include/common/tmsg.h | 1 + source/common/src/msg/tmsg.c | 4 + source/dnode/mnode/impl/src/mndArbGroup.c | 78 ++++++++++++++- source/dnode/mnode/impl/src/mndVgroup.c | 26 ----- source/libs/sync/src/syncMain.c | 10 +- tests/army/cluster/arbitrator_restart.py | 111 ++++++++++++++++++++++ 6 files changed, 196 insertions(+), 34 deletions(-) create mode 100644 tests/army/cluster/arbitrator_restart.py diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 670dc9453f..c1481cf716 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2578,6 +2578,7 @@ typedef struct { char* arbToken; int64_t arbTerm; char* memberToken; + int8_t force; } SVArbSetAssignedLeaderReq; int32_t tSerializeSVArbSetAssignedLeaderReq(void* buf, int32_t bufLen, SVArbSetAssignedLeaderReq* pReq); diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index c19a885e8a..74425a296b 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -8210,6 +8210,7 @@ int32_t tSerializeSVArbSetAssignedLeaderReq(void *buf, int32_t bufLen, SVArbSetA TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->arbToken)); TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->arbTerm)); TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->memberToken)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->force)); tEndEncode(&encoder); @@ -8239,6 +8240,9 @@ int32_t tDeserializeSVArbSetAssignedLeaderReq(void *buf, int32_t bufLen, SVArbSe TAOS_CHECK_EXIT(terrno); } TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->memberToken)); + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->force)); + } tEndDecode(&decoder); diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index 3879e8e6e2..88d0291e0d 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -46,6 +46,7 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp); static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp); static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextArbGroup(SMnode *pMnode, void *pIter); +static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq); static int32_t mndArbCheckToken(const char *token1, const char *token2) { if (token1 == NULL || token2 == NULL) return -1; @@ -71,6 +72,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp); mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp); mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp); + mndSetMsgHandle(pMnode, TDMT_MND_ASSIGN_LEADER, mndProcessAssignLeaderMsg); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup); @@ -530,11 +532,12 @@ static bool mndCheckArbMemberHbTimeout(SArbGroup *pArbGroup, int32_t index, int6 } static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm, - char *memberToken) { + char *memberToken, bool force) { SVArbSetAssignedLeaderReq req = {0}; req.arbToken = arbToken; req.arbTerm = arbTerm; req.memberToken = memberToken; + if (force) req.force = 1; int32_t reqLen = tSerializeSVArbSetAssignedLeaderReq(NULL, 0, &req); int32_t contLen = reqLen + sizeof(SMsgHead); @@ -554,10 +557,10 @@ static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, ch } static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, int32_t vgId, char *arbToken, - int64_t term, char *memberToken) { + int64_t term, char *memberToken, bool force) { int32_t code = 0; int32_t contLen = 0; - void *pHead = mndBuildArbSetAssignedLeaderReq(&contLen, vgId, arbToken, term, memberToken); + void *pHead = mndBuildArbSetAssignedLeaderReq(&contLen, vgId, arbToken, term, memberToken, force); if (!pHead) { mError("vgId:%d, failed to build set-assigned request", vgId); code = -1; @@ -643,6 +646,73 @@ void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SAr *pOp = CHECK_SYNC_UPDATE; } +static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + int32_t code = -1, lino = 0; + SArray *pArray = NULL; + void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + SArbGroup *pArbGroup = NULL; + + SAssignLeaderReq req = {0}; + if (tDeserializeSAssignLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + mInfo("begin to process assign leader"); + + char arbToken[TSDB_ARB_TOKEN_SIZE]; + TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken)); + + int64_t term = mndGetTerm(pMnode); + if (term < 0) { + mError("arb failed to get term since %s", terrstr()); + code = -1; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + + while (1) { + pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup); + if (pIter == NULL) break; + + SArbGroup arbGroupDup = {0}; + + (void)taosThreadMutexLock(&pArbGroup->mutex); + mndArbGroupDupObj(pArbGroup, &arbGroupDup); + (void)taosThreadMutexUnlock(&pArbGroup->mutex); + + sdbRelease(pSdb, pArbGroup); + + int32_t dnodeId = 0; + for (int32_t i = 0; i < 2; i++) { + SDnodeObj *pDnode = mndAcquireDnode(pMnode, arbGroupDup.members[i].info.dnodeId); + bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs()); + mndReleaseDnode(pMnode, pDnode); + if (isonline) { + dnodeId = arbGroupDup.members[i].info.dnodeId; + break; + } + } + + (void)mndSendArbSetAssignedLeaderReq(pMnode, dnodeId, arbGroupDup.vgId, arbToken, term, "", true); + mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", arbGroupDup.vgId, dnodeId); + } + + code = 0; + + // auditRecord(pReq, pMnode->clusterId, "assignLeader", "", "", req.sql, req.sqlLen); + +_exit: + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("failed to assign leader since %s", tstrerror(code)); + } + + tFreeSAssignLeaderReq(&req); + TAOS_RETURN(code); +} + static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { int32_t code = 0, lino = 0; SMnode *pMnode = pReq->info.node; @@ -695,7 +765,7 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { mTrace("vgId:%d, arb skip to send msg by check sync", vgId); break; case CHECK_SYNC_SET_ASSIGNED_LEADER: - (void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token); + (void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token, false); mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, assgndDnodeId); break; case CHECK_SYNC_CHECK_SYNC: diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 6a3dbb488f..e20afb7201 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -46,7 +46,6 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq); -static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq); int32_t mndInitVgroup(SMnode *pMnode) { SSdbTable table = { @@ -78,7 +77,6 @@ int32_t mndInitVgroup(SMnode *pMnode) { // mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessVgroupBalanceLeaderMsg); mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg); - mndSetMsgHandle(pMnode, TDMT_MND_ASSIGN_LEADER, mndProcessAssignLeaderMsg); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup); @@ -3572,30 +3570,6 @@ _OVER: TAOS_RETURN(code); } -static int32_t mndProcessAssignLeaderMsg(SRpcMsg *pReq){ - SMnode *pMnode = pReq->info.node; - int32_t code = -1; - SArray *pArray = NULL; - void *pIter = NULL; - int64_t curMs = taosGetTimestampMs(); - - SAssignLeaderReq req = {0}; - if (tDeserializeSAssignLeaderReq(pReq->pCont, pReq->contLen, &req) != 0) { - code = TSDB_CODE_INVALID_MSG; - goto _OVER; - } - - auditRecord(pReq, pMnode->clusterId, "assignLeader", "", "", req.sql, req.sqlLen); - -_OVER: - if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("failed to assign leader since %s", tstrerror(code)); - } - - tFreeSAssignLeaderReq(&req); - TAOS_RETURN(code); -} - bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 0933fd48c7..ea91412536 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -334,10 +334,12 @@ int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) { ths->arbTerm = TMAX(req.arbTerm, ths->arbTerm); - if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) { - sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken, - req.memberToken); - goto _OVER; + if (!req.force) { + if (strncmp(req.memberToken, ths->arbToken, TSDB_ARB_TOKEN_SIZE) != 0) { + sInfo("vgId:%d, skip to set assigned leader, token mismatch, local:%s, msg:%s", ths->vgId, ths->arbToken, + req.memberToken); + goto _OVER; + } } if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) { diff --git a/tests/army/cluster/arbitrator_restart.py b/tests/army/cluster/arbitrator_restart.py new file mode 100644 index 0000000000..7a02e46630 --- /dev/null +++ b/tests/army/cluster/arbitrator_restart.py @@ -0,0 +1,111 @@ +import taos +import sys +import os +import subprocess +import glob +import shutil +import time + +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.srvCtl import * +from frame.caseBase import * +from frame import * +from frame.autogen import * +from frame import epath +# from frame.server.dnodes import * +# from frame.server.cluster import * + + +class TDTestCase(TBase): + + def init(self, conn, logSql, replicaVar=1): + updatecfgDict = {'dDebugFlag':131} + super(TDTestCase, self).init(conn, logSql, replicaVar=1, checkColName="c1") + + self.valgrind = 0 + self.db = "test" + self.stb = "meters" + self.childtable_count = 10 + tdSql.init(conn.cursor(), logSql) + + def run(self): + tdSql.execute('CREATE DATABASE db vgroups 1 replica 2;') + + time.sleep(1) + + tdSql.execute("use db;") + + tdSql.execute("CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);") + + tdSql.execute("CREATE TABLE d0 USING meters TAGS (\"California.SanFrancisco\", 2);"); + + count = 0 + + while count < 100: + tdSql.query("show arbgroups;") + + if tdSql.getData(0, 4) == 1: + break + + tdLog.info("wait 1 seconds for is sync") + time.sleep(1) + + count += 1 + + sc.dnodeStop(2) + sc.dnodeStop(3) + + sc.dnodeStart(2) + + count = 0 + while count < 100: + tdSql.query("show db.vgroups;") + + if(tdSql.getData(0, 4) == "candidate") or (tdSql.getData(0, 6) == "candidate"): + break + + tdLog.info("wait 1 seconds for candidate") + time.sleep(1) + + count += 1 + + tdSql.execute("balance vgroup;") + + count = 0 + while count < 100: + tdSql.query("show db.vgroups;") + + if(tdSql.getData(0, 4) == "assigned ") or (tdSql.getData(0, 6) == "assigned "): + break + + tdLog.info("wait 1 seconds for set assigned") + time.sleep(1) + + count += 1 + + tdSql.execute("INSERT INTO d0 VALUES (NOW, 10.3, 219, 0.31);") + + sc.dnodeStart(3) + + count = 0 + + while count < 100: + tdSql.query("show arbgroups;") + + if tdSql.getData(0, 4) == 1: + break + + tdLog.info("wait 1 seconds for is sync") + time.sleep(1) + + count += 1 + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())