From 0da606c7fbcdc6a589863ce33851ae1ea39f6f6a Mon Sep 17 00:00:00 2001 From: dmchen Date: Tue, 24 Dec 2024 12:14:47 +0000 Subject: [PATCH] show code --- include/libs/sync/sync.h | 2 +- source/common/src/systable.c | 3 +- source/dnode/mnode/impl/inc/mndArbGroup.h | 2 +- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndArbGroup.c | 23 +++++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 16 ++--- source/libs/sync/src/syncMain.c | 2 +- source/os/src/osLocale.c | 2 +- tests/army/cluster/arbgroup_sync.py | 81 +++++++++++++++++++++++ 9 files changed, 114 insertions(+), 18 deletions(-) create mode 100644 tests/army/cluster/arbgroup_sync.py diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 50c096258e..3ded45bc64 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -292,7 +292,7 @@ int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm); SSyncState syncGetState(int64_t rid); int32_t syncGetArbToken(int64_t rid, char* outToken); -int32_t syncGetAssignedLogSynced(int64_t rid); +int32_t syncCheckSynced(int64_t rid); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); const char* syncStr(ESyncState state); diff --git a/source/common/src/systable.c b/source/common/src/systable.c index cb08046399..d1ddb3204f 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -73,7 +73,8 @@ static const SSysDbTableSchema arbGroupsSchema[] = { {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "v1_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "v2_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, - {.name = "is_sync", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, + {.name = "is_sync", .bytes = 100, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, + //{.name = "sync_code", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "assigned_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "assigned_token", .bytes = TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "assigned_acked", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, diff --git a/source/dnode/mnode/impl/inc/mndArbGroup.h b/source/dnode/mnode/impl/inc/mndArbGroup.h index 66ef3f766b..8abc435205 100644 --- a/source/dnode/mnode/impl/inc/mndArbGroup.h +++ b/source/dnode/mnode/impl/inc/mndArbGroup.h @@ -41,7 +41,7 @@ int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup); bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId, SArbGroup *pNewGroup); bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token, - bool newIsSync, SArbGroup *pNewGroup); + bool newIsSync, SArbGroup *pNewGroup, int32_t code); bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode, SArbGroup *pNewGroup); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index e3d2ad6d34..2cf55b88ea 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -308,6 +308,7 @@ typedef struct { int64_t dbUid; SArbGroupMember members[TSDB_ARB_GROUP_MEMBER_NUM]; int8_t isSync; + int32_t code; SArbAssignedLeader assignedLeader; int64_t version; diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index 57de63b31d..d57f265add 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -1066,7 +1066,7 @@ static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *me } bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token, - bool newIsSync, SArbGroup *pNewGroup) { + bool newIsSync, SArbGroup *pNewGroup, int32_t code) { bool updateIsSync = false; (void)taosThreadMutexLock(&pGroup->mutex); @@ -1089,6 +1089,7 @@ bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0 if (pGroup->isSync != newIsSync) { mndArbGroupDupObj(pGroup, pNewGroup); pNewGroup->isSync = newIsSync; + pNewGroup->code = code; mInfo("vgId:%d, arb isSync updating, new isSync:%d", vgId, newIsSync); updateIsSync = true; @@ -1099,7 +1100,8 @@ _OVER: return updateIsSync; } -static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token, bool newIsSync) { +static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token, bool newIsSync, + int32_t rsp_code) { int32_t code = 0; SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId); if (pGroup == NULL) { @@ -1110,7 +1112,8 @@ static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token } SArbGroup newGroup = {0}; - bool updateIsSync = mndUpdateArbGroupByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &newGroup); + bool updateIsSync = + mndUpdateArbGroupByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &newGroup, rsp_code); if (updateIsSync) { if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) { mInfo("failed to pullup update arb sync, vgId:%d, since %s", vgId, terrstr()); @@ -1186,6 +1189,7 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) { TAOS_RETURN(code); } + mInfo("vgId:%d, arb check-sync-rsp received, errCode:%d", syncRsp.vgId, syncRsp.errCode); if (mndArbCheckToken(arbToken, syncRsp.arbToken) != 0) { mInfo("skip update arb sync for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", syncRsp.vgId, arbToken, syncRsp.arbToken); @@ -1194,7 +1198,8 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) { } bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS); - if ((code = mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync)) != 0) { + if ((code = mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync, + syncRsp.errCode)) != 0) { mInfo("failed to update arb sync for vgId:%d, since:%s", syncRsp.vgId, terrstr()); goto _OVER; } @@ -1333,8 +1338,16 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock &lino, _OVER); } + char strSync[100] = {0}; + sprintf(strSync, "%d,%d", pGroup->isSync, pGroup->code); + char sync[100 + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(sync, strSync, 100 + VARSTR_HEADER_SIZE); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->isSync, false), pGroup, &lino, _OVER); + RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)sync, false), pGroup, &lino, _OVER); + + // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + // RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->code, false), pGroup, &lino, + // _OVER); if (pGroup->assignedLeader.dnodeId != 0) { pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index e82209e03f..837af4b0a8 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -53,8 +53,8 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l static int32_t vnodeProcessDropTSmaCtbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, SRpcMsg *pOriginRpc); -static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token); -static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token); +static int32_t vnodeCheckToken(SVnode *pVnode, char *member0Token, char *member1Token); +static int32_t vnodeCheckSyncd(SVnode *pVnode, char *member0Token, char *member1Token); static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); extern int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); @@ -480,7 +480,7 @@ static int32_t vnodePreProcessArbCheckSyncMsg(SVnode *pVnode, SRpcMsg *pMsg) { return TSDB_CODE_INVALID_MSG; } - int32_t ret = vnodePreCheckAssignedLogSyncd(pVnode, syncReq.member0Token, syncReq.member1Token); + int32_t ret = vnodeCheckToken(pVnode, syncReq.member0Token, syncReq.member1Token); if (ret != 0) { vError("vgId:%d, failed to preprocess arb check sync request since %s", TD_VID(pVnode), tstrerror(ret)); } @@ -2501,7 +2501,7 @@ static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pR return 0; } -static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token) { +static int32_t vnodeCheckToken(SVnode *pVnode, char *member0Token, char *member1Token) { SSyncState syncState = syncGetState(pVnode->sync); if (syncState.state != TAOS_SYNC_STATE_LEADER) { return terrno = TSDB_CODE_SYN_NOT_LEADER; @@ -2521,13 +2521,13 @@ static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, return 0; } -static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token) { - int32_t code = vnodePreCheckAssignedLogSyncd(pVnode, member0Token, member1Token); +static int32_t vnodeCheckSyncd(SVnode *pVnode, char *member0Token, char *member1Token) { + int32_t code = vnodeCheckToken(pVnode, member0Token, member1Token); if (code != 0) { return code; } - return syncGetAssignedLogSynced(pVnode->sync); + return syncCheckSynced(pVnode->sync); } static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) { @@ -2551,7 +2551,7 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l syncRsp.member1Token = syncReq.member1Token; syncRsp.vgId = TD_VID(pVnode); - if (vnodeCheckAssignedLogSyncd(pVnode, syncReq.member0Token, syncReq.member1Token) != 0) { + if (vnodeCheckSyncd(pVnode, syncReq.member0Token, syncReq.member1Token) != 0) { vError("vgId:%d, failed to check assigned log syncd", TD_VID(pVnode)); } syncRsp.errCode = terrno; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index fbde104f4e..ddb7400eee 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -699,7 +699,7 @@ int32_t syncGetArbToken(int64_t rid, char* outToken) { TAOS_RETURN(code); } -int32_t syncGetAssignedLogSynced(int64_t rid) { +int32_t syncCheckSynced(int64_t rid) { int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { diff --git a/source/os/src/osLocale.c b/source/os/src/osLocale.c index 484436fafe..ba5b19adde 100644 --- a/source/os/src/osLocale.c +++ b/source/os/src/osLocale.c @@ -84,7 +84,7 @@ int32_t taosSetSystemLocale(const char *inLocale) { if (NULL == locale) { terrno = TSDB_CODE_INVALID_PARA; uError("failed to set locale:%s", inLocale); - return terrno; + return 0; } tstrncpy(tsLocale, locale, TD_LOCALE_LEN); diff --git a/tests/army/cluster/arbgroup_sync.py b/tests/army/cluster/arbgroup_sync.py new file mode 100644 index 0000000000..9fd8e7b1f3 --- /dev/null +++ b/tests/army/cluster/arbgroup_sync.py @@ -0,0 +1,81 @@ +import taos +import sys +import os +import subprocess +import glob +import shutil +import time + +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.srvCtl import * +from frame.caseBase import * +from frame import * +from frame.autogen import * +from frame import epath +# from frame.server.dnodes import * +# from frame.server.cluster import * + + +class TDTestCase(TBase): + + def init(self, conn, logSql, replicaVar=1): + updatecfgDict = {'dDebugFlag':131} + super(TDTestCase, self).init(conn, logSql, replicaVar=1, checkColName="c1") + + self.valgrind = 0 + self.db = "test" + self.stb = "meters" + self.childtable_count = 10 + tdSql.init(conn.cursor(), logSql) + + def run(self): + tdSql.execute('CREATE DATABASE db vgroups 1 replica 2;') + + time.sleep(1) + + count = 0 + + while count < 100: + tdSql.query("show arbgroups;") + + if tdSql.getData(0, 4) == 1: + break + + tdLog.info("wait 1 seconds for is sync") + time.sleep(1) + + count += 1 + + + tdSql.query("show db.vgroups;") + + if(tdSql.getData(0, 4) == "follower") and (tdSql.getData(0, 6) == "leader"): + tdLog.info("stop dnode2") + sc.dnodeStop(2) + + if(tdSql.getData(0, 6) == "follower") and (tdSql.getData(0, 4) == "leader"): + tdLog.info("stop dnode 3") + sc.dnodeStop(3) + + + count = 0 + while count < 100: + tdSql.query("show db.vgroups;") + + if(tdSql.getData(0, 4) == "assigned ") or (tdSql.getData(0, 6) == "assigned "): + break + + tdLog.info("wait 1 seconds for set assigned") + time.sleep(1) + + count += 1 + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())