From 0bbe049e58110767b60b3ed940ea402bfea175e7 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 3 Nov 2022 09:08:36 +0800 Subject: [PATCH 01/10] fix: set fs and last dataf reader to null when resetting --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index dbc02363ea..1396b822bf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -333,11 +333,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 code = TSDB_CODE_INVALID_PARA; } - _end: +_end: tsdbDataFReaderClose(&pr->pDataFReaderLast); tsdbDataFReaderClose(&pr->pDataFReader); tsdbUntakeReadSnap(pr->pVnode->pTsdb, pr->pReadSnap, "cache-l"); + pr->pDataFReaderLast = NULL; + pr->pDataFReader = NULL; for (int32_t j = 0; j < pr->numOfCols; ++j) { taosMemoryFree(pRes[j]); From 1a4b7622e418c8a5c8aa8263f317db7e8e3dff50 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 3 Nov 2022 09:39:20 +0800 Subject: [PATCH 02/10] enh(sync): add sync pre stop --- include/libs/sync/sync.h | 1 + source/dnode/mnode/impl/src/mndMain.c | 1 + source/dnode/vnode/src/vnd/vnodeOpen.c | 1 + source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/src/syncMain.c | 17 +++++++++++++++++ 5 files changed, 21 insertions(+) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 694a6ef62b..b6ff93ec85 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -211,6 +211,7 @@ void syncCleanUp(); int64_t syncOpen(SSyncInfo* pSyncInfo); void syncStart(int64_t rid); void syncStop(int64_t rid); +void syncPreStop(int64_t rid); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index f604a9289a..1b2d85bd29 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -429,6 +429,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { void mndPreClose(SMnode *pMnode) { if (pMnode != NULL) { syncLeaderTransfer(pMnode->syncMgmt.sync); + syncPreStop(pMnode->syncMgmt.sync); } } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 0696ec0901..f7164c4ac3 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -245,6 +245,7 @@ _err: void vnodePreClose(SVnode *pVnode) { if (pVnode) { syncLeaderTransfer(pVnode->sync); + syncPreStop(pVnode->sync); } } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 6ec29d69f5..a5ff653b69 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -218,6 +218,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo); void syncNodeStart(SSyncNode* pSyncNode); void syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); +void syncNodePreClose(SSyncNode* pSyncNode); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); // option diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index eb78ea2894..81077e5361 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -81,6 +81,15 @@ void syncStop(int64_t rid) { } } +void syncPreStop(int64_t rid) { + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode == NULL) return; + + syncNodePreClose(pSyncNode); + + syncNodeRelease(pSyncNode); +} + static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) { if (!syncNodeInConfig(pSyncNode, pCfg)) return false; return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1; @@ -1222,6 +1231,14 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { ASSERT(ret == 0); } +void syncNodePreClose(SSyncNode* pSyncNode) { + // stop elect timer + syncNodeStopElectTimer(pSyncNode); + + // stop heartbeat timer + syncNodeStopHeartbeatTimer(pSyncNode); +} + void syncNodeClose(SSyncNode* pSyncNode) { if (pSyncNode == NULL) { return; From 1eef25620d87d21c78e11a596881453e2643d29c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 3 Nov 2022 10:09:36 +0800 Subject: [PATCH 03/10] fix: fix tbname in crash issue --- source/libs/executor/src/executil.c | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 32386a72fd..5806aca6f2 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -925,6 +925,15 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* SArray* pTbList = getTableNameList(pList); int32_t numOfTables = taosArrayGetSize(pTbList); + SHashObj *uHash = NULL; + size_t listlen = taosArrayGetSize(list); // len > 0 means there already have uids + if (listlen > 0) { + uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + for (int i = 0; i < listlen; i++) { + int64_t *uid = taosArrayGet(list, i); + taosHashPut(uHash, uid, sizeof(int64_t), &i, sizeof(i)); + } + } for (int i = 0; i < numOfTables; i++) { char* name = taosArrayGetP(pTbList, i); @@ -933,9 +942,12 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* if (metaGetTableUidByName(metaHandle, name, &uid) == 0) { ETableType tbType = TSDB_TABLE_MAX; if (metaGetTableTypeByName(metaHandle, name, &tbType) == 0 && tbType == TSDB_CHILD_TABLE) { - taosArrayPush(list, &uid); + if (NULL == uHash || taosHashGet(uHash, &uid, sizeof(uid)) == NULL) { + taosArrayPush(list, &uid); + } } else { taosArrayDestroy(pTbList); + taosHashCleanup(uHash); return -1; } } else { @@ -944,6 +956,7 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* } } + taosHashCleanup(uHash); taosArrayDestroy(pTbList); return 0; } From d2a29ed7a7f6489bc593cc291abb579528432f34 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 3 Nov 2022 10:18:02 +0800 Subject: [PATCH 04/10] fix:add tbname in test cases --- tests/script/jenkins/basic.txt | 1 + tests/script/tsim/tag/tbNameIn.sim | 102 +++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 tests/script/tsim/tag/tbNameIn.sim diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 1a80aa0681..70b422cf53 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -449,6 +449,7 @@ ./test.sh -f tsim/tag/smallint.sim ./test.sh -f tsim/tag/tinyint.sim ./test.sh -f tsim/tag/drop_tag.sim +./test.sh -f tsim/tag/tbNameIn.sim ./test.sh -f tmp/monitor.sim #======================b1-end=============== diff --git a/tests/script/tsim/tag/tbNameIn.sim b/tests/script/tsim/tag/tbNameIn.sim new file mode 100644 index 0000000000..1af4bd6a9e --- /dev/null +++ b/tests/script/tsim/tag/tbNameIn.sim @@ -0,0 +1,102 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +print ======== step1 +sql drop database if exists db1; +sql create database db1 vgroups 3; +sql use db1; +sql create stable st1 (ts timestamp, f1 int) tags(tg1 int); +sql create table tb1 using st1 tags(1); +sql create table tb2 using st1 tags(2); +sql create table tb3 using st1 tags(3); +sql create table tb4 using st1 tags(4); +sql create table tb5 using st1 tags(5); +sql create table tb6 using st1 tags(6); +sql create table tb7 using st1 tags(7); +sql create table tb8 using st1 tags(8); + +sql insert into tb1 values ('2022-07-10 16:31:01', 1); +sql insert into tb2 values ('2022-07-10 16:31:02', 2); +sql insert into tb3 values ('2022-07-10 16:31:03', 3); +sql insert into tb4 values ('2022-07-10 16:31:04', 4); +sql insert into tb5 values ('2022-07-10 16:31:05', 5); +sql insert into tb6 values ('2022-07-10 16:31:06', 6); +sql insert into tb7 values ('2022-07-10 16:31:07', 7); +sql insert into tb8 values ('2022-07-10 16:31:08', 8); + +sql select * from tb1 where tbname in ('tb1'); +if $rows != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +sql select * from tb1 where tbname in ('tb1','tb1'); +if $rows != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +sql select * from tb1 where tbname in ('tb1','tb2','tb1'); +if $rows != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +sql select * from tb1 where tbname in ('tb1','tb2','st1'); +if $rows != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +sql select * from tb1 where tbname = 'tb1'; +if $rows != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +sql select * from tb1 where tbname > 'tb1'; +if $rows != 0 then + return -1 +endi +sql select * from st1 where tbname in ('tb1'); +if $rows != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +sql select * from st1 where tbname in ('tb1','tb1'); +if $rows != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +sql select * from st1 where tbname in ('tb1','tb2','tb1'); +if $rows != 2 then + return -1 +endi +sql select * from st1 where tbname in ('tb1','tb2','st1'); +if $rows != 2 then + return -1 +endi +sql select * from st1 where tbname = 'tb1'; +if $rows != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +sql select * from st1 where tbname > 'tb1'; +if $rows != 7 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT From 102969086da4cb6a58cc27c7dc00e47f8a067db3 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 3 Nov 2022 10:25:38 +0800 Subject: [PATCH 05/10] refactor(sync): add local-cmd:follower-commit --- source/libs/sync/inc/syncTools.h | 2 ++ source/libs/sync/src/syncAppendEntries.c | 5 +++++ source/libs/sync/src/syncMain.c | 23 +++++++++++++++++++--- source/libs/sync/src/syncMessage.c | 5 +++++ source/libs/sync/test/syncLocalCmdTest.cpp | 1 + 5 files changed, 33 insertions(+), 3 deletions(-) diff --git a/source/libs/sync/inc/syncTools.h b/source/libs/sync/inc/syncTools.h index b48519a5b0..2d87fcf7fa 100644 --- a/source/libs/sync/inc/syncTools.h +++ b/source/libs/sync/inc/syncTools.h @@ -729,6 +729,7 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); typedef enum { SYNC_LOCAL_CMD_STEP_DOWN = 100, + SYNC_LOCAL_CMD_FOLLOWER_CMT, } ESyncLocalCmd; const char* syncLocalCmdGetStr(int32_t cmd); @@ -742,6 +743,7 @@ typedef struct SyncLocalCmd { int32_t cmd; SyncTerm sdNewTerm; // step down new term + SyncIndex fcIndex;// follower commit index } SyncLocalCmd; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4638475e71..f0e296d872 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -90,6 +90,11 @@ // int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { + if (ths->state != TAOS_SYNC_STATE_FOLLOWER) { + syncNodeEventLog(ths, "can not do follower commit"); + return -1; + } + // maybe update commit index, leader notice me if (newCommitIndex > ths->commitIndex) { // has commit entry in local diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 81077e5361..014ad0425d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2842,11 +2842,25 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { syncNodeResetElectTimer(ths); ths->minMatchIndex = pMsg->minMatchIndex; -#if 0 if (ths->state == TAOS_SYNC_STATE_FOLLOWER) { - syncNodeFollowerCommit(ths, pMsg->commitIndex); + // syncNodeFollowerCommit(ths, pMsg->commitIndex); + SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId); + pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT; + pSyncMsg->fcIndex = pMsg->commitIndex; + + SRpcMsg rpcMsgLocalCmd; + syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd); + + if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { + int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); + if (code != 0) { + sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code); + rpcFreeCont(rpcMsgLocalCmd.pCont); + } else { + sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index: %" PRIu64, ths->vgId, pSyncMsg->fcIndex); + } + } } -#endif } if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { @@ -2900,6 +2914,9 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) { if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { syncNodeStepDown(ths, pMsg->sdNewTerm); + } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { + syncNodeFollowerCommit(ths, pMsg->fcIndex); + } else { syncNodeErrorLog(ths, "error local cmd"); } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 91e8ec91b7..d0df931a88 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -3400,6 +3400,8 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) { const char* syncLocalCmdGetStr(int32_t cmd) { if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) { return "step-down"; + } else if (cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { + return "follower-commit"; } return "unknown-local-cmd"; @@ -3511,6 +3513,9 @@ cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) { snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm); cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex); + cJSON_AddStringToObject(pRoot, "fc-index", u64buf); } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/test/syncLocalCmdTest.cpp b/source/libs/sync/test/syncLocalCmdTest.cpp index de908bf9c1..b42626df29 100644 --- a/source/libs/sync/test/syncLocalCmdTest.cpp +++ b/source/libs/sync/test/syncLocalCmdTest.cpp @@ -21,6 +21,7 @@ SyncLocalCmd *createMsg() { pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.vgId = 100; pMsg->sdNewTerm = 123; + pMsg->fcIndex = 456; pMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; return pMsg; From 108b9ffed67aa568ae45ebd499b1494f1a288845 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 3 Nov 2022 10:42:33 +0800 Subject: [PATCH 06/10] refact: add sequence for status msg --- include/common/tmsg.h | 2 ++ source/common/src/tmsg.c | 4 ++++ source/dnode/mgmt/mgmt_dnode/inc/dmInt.h | 1 + source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 15 ++++++++---- source/dnode/mnode/impl/src/mndDnode.c | 26 +++++++++++---------- 5 files changed, 32 insertions(+), 16 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 76b13579c1..9ddab5b553 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1128,6 +1128,7 @@ typedef struct { SQnodeLoad qload; SClusterCfg clusterCfg; SArray* pVloads; // array of SVnodeLoad + int32_t statusSeq; } SStatusReq; int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); @@ -1149,6 +1150,7 @@ typedef struct { int64_t dnodeVer; SDnodeCfg dnodeCfg; SArray* pDnodeEps; // Array of SDnodeEp + int32_t statusSeq; } SStatusRsp; int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 073e6bc8c6..562c53f899 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1020,6 +1020,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tEncodeI64(&encoder, pReq->qload.timeInQueryQueue) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.timeInFetchQueue) < 0) return -1; + if (tEncodeI32(&encoder, pReq->statusSeq) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1095,6 +1096,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tDecodeI64(&decoder, &pReq->qload.timeInQueryQueue) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.timeInFetchQueue) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->statusSeq) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); return 0; @@ -1126,6 +1128,7 @@ int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) { if (tEncodeU16(&encoder, pDnodeEp->ep.port) < 0) return -1; } + if (tEncodeI32(&encoder, pRsp->statusSeq) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1167,6 +1170,7 @@ int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) { } } + if (tDecodeI32(&decoder, &pRsp->statusSeq) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); return 0; diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index dc4412b77b..c776beb3f0 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -36,6 +36,7 @@ typedef struct SDnodeMgmt { GetVnodeLoadsFp getVnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp; GetQnodeLoadsFp getQnodeLoadsFp; + int32_t statusSeq; } SDnodeMgmt; // dmHandle.c diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index f12dce5149..645d3c3fbc 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -32,9 +32,13 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { } static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { + const STraceId *trace = &pRsp->info.traceId; + dGTrace("status msg received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code); + if (pRsp->code != 0) { if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) { - dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId); + dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId, + pMgmt->statusSeq); pMgmt->pData->dropped = 1; dmWriteEps(pMgmt->pData); } @@ -42,9 +46,9 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { SStatusRsp statusRsp = {0}; if (pRsp->pCont != NULL && pRsp->contLen > 0 && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { - dTrace("status msg received from mnode, dnodeVer:%" PRId64 " saved:%" PRId64, statusRsp.dnodeVer, - pMgmt->pData->dnodeVer); if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) { + dGInfo("status msg received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq, + statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer); pMgmt->pData->dnodeVer = statusRsp.dnodeVer; dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps); @@ -91,6 +95,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { (*pMgmt->getQnodeLoadsFp)(&req.qload); + pMgmt->statusSeq++; + req.statusSeq = pMgmt->statusSeq; + int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); void *pHead = rpcMallocCont(contLen); tSerializeSStatusReq(pHead, contLen, &req); @@ -99,7 +106,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .info.ahandle = (void *)0x9527}; SRpcMsg rpcRsp = {0}; - dTrace("send status msg to mnode, dnodeVer:%" PRId64, req.dnodeVer); + dTrace("send status msg to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq); SEpSet epSet = {0}; dmGetMnodeEpSet(pMgmt->pData, &epSet); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 9d91420656..04f340b0ff 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -345,6 +345,19 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { } } + int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); + int64_t curMs = taosGetTimestampMs(); + bool online = mndIsDnodeOnline(pDnode, curMs); + bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer); + bool reboot = (pDnode->rebootTime != statusReq.rebootTime); + bool needCheck = !online || dnodeChanged || reboot; + + pDnode->accessTimes++; + pDnode->lastAccessTime = curMs; + const STraceId *trace = &pReq->info.traceId; + mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id, + pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq); + for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) { SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v); @@ -396,18 +409,6 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { mndReleaseQnode(pMnode, pQnode); } - int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); - int64_t curMs = taosGetTimestampMs(); - bool online = mndIsDnodeOnline(pDnode, curMs); - bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer); - bool reboot = (pDnode->rebootTime != statusReq.rebootTime); - bool needCheck = !online || dnodeChanged || reboot; - - pDnode->accessTimes++; - pDnode->lastAccessTime = curMs; - mTrace("dnode:%d, status received, access times:%d check:%d online:%d reboot:%d changed:%d", pDnode->id, - pDnode->accessTimes, needCheck, online, reboot, dnodeChanged); - if (needCheck) { if (statusReq.sver != tsVersion) { if (pDnode != NULL) { @@ -455,6 +456,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { pDnode->memTotal = statusReq.memTotal; SStatusRsp statusRsp = {0}; + statusRsp.statusSeq++; statusRsp.dnodeVer = dnodeVer; statusRsp.dnodeCfg.dnodeId = pDnode->id; statusRsp.dnodeCfg.clusterId = pMnode->clusterId; From 843e17744ea839e7f8ae1bbd2fd56eaf7ac53a01 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 3 Nov 2022 10:57:20 +0800 Subject: [PATCH 07/10] refact: add sequence for status msg --- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 645d3c3fbc..85a09b79fd 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -33,7 +33,7 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { const STraceId *trace = &pRsp->info.traceId; - dGTrace("status msg received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code); + dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code); if (pRsp->code != 0) { if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) { @@ -47,7 +47,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { if (pRsp->pCont != NULL && pRsp->contLen > 0 && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) { - dGInfo("status msg received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq, + dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq, statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer); pMgmt->pData->dnodeVer = statusRsp.dnodeVer; dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); @@ -106,13 +106,13 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .info.ahandle = (void *)0x9527}; SRpcMsg rpcRsp = {0}; - dTrace("send status msg to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq); + dTrace("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq); SEpSet epSet = {0}; dmGetMnodeEpSet(pMgmt->pData, &epSet); rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { - dError("failed to send status msg since %s, numOfEps:%d inUse:%d", tstrerror(rpcRsp.code), epSet.numOfEps, + dError("failed to send status req since %s, numOfEps:%d inUse:%d", tstrerror(rpcRsp.code), epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { dDebug("index:%d, mnode ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); From f724a3d172ae34fb4611186d69d878ccb4626e1a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 3 Nov 2022 11:14:33 +0800 Subject: [PATCH 08/10] test: adjust case --- .../6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py index 62f4b248f9..5253066ec5 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py @@ -93,7 +93,9 @@ class TDTestCase: tdSql.query("select * from information_schema.ins_dnodes;") tdSql.checkData(0,1,'%s:6030'%self.host) tdSql.checkData(4,1,'%s:6430'%self.host) + tdLog.info("===>1 first check dnode and mnode %d" % dnodeNumbers) clusterComCheck.checkDnodes(dnodeNumbers) + tdLog.info("===>2 first check dnode and mnode %d" % dnodeNumbers) clusterComCheck.checkMnodeStatus(1) # fisr add three mnodes; @@ -172,7 +174,7 @@ class TDTestCase: def run(self): # print(self.master_dnode.cfgDict) - self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=2,stopRole='dnode') + self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=2,stopRole='dnode') def stop(self): tdSql.close() From 75e4d6f22ce7074eb9eec9f66861b63cb32c9545 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 3 Nov 2022 11:16:38 +0800 Subject: [PATCH 09/10] test: adjust case --- .../6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py index 5253066ec5..00f0472db3 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py @@ -93,9 +93,7 @@ class TDTestCase: tdSql.query("select * from information_schema.ins_dnodes;") tdSql.checkData(0,1,'%s:6030'%self.host) tdSql.checkData(4,1,'%s:6430'%self.host) - tdLog.info("===>1 first check dnode and mnode %d" % dnodeNumbers) clusterComCheck.checkDnodes(dnodeNumbers) - tdLog.info("===>2 first check dnode and mnode %d" % dnodeNumbers) clusterComCheck.checkMnodeStatus(1) # fisr add three mnodes; From 1fc79e289d5b56a8964c56a2f16e8caec31a514d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 3 Nov 2022 11:50:24 +0800 Subject: [PATCH 10/10] refactor(sync): modify leader transfer --- source/libs/sync/src/syncMain.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 014ad0425d..5cd1ba3025 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -444,8 +444,12 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) { return -1; } - SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0]; - int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader); + int32_t ret = 0; + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0]; + ret = syncNodeLeaderTransferTo(pSyncNode, newLeader); + } + return ret; }