From babefefa649c6ec0706bf51102faa4d70f1bc5ef Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 26 May 2022 22:50:46 +0800 Subject: [PATCH 1/7] test: case for mnode sync --- tests/script/tsim/mnode/basic2.sim | 6 +++--- tests/script/tsim/mnode/basic3.sim | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/script/tsim/mnode/basic2.sim b/tests/script/tsim/mnode/basic2.sim index 024e2b2406..6df3704469 100644 --- a/tests/script/tsim/mnode/basic2.sim +++ b/tests/script/tsim/mnode/basic2.sim @@ -119,9 +119,9 @@ if $data(2)[4] != ready then endi print =============== insert data -#sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 float, t3 binary(16)) comment "abd" -#sql create table db.ctb using db.stb tags(101, 102, "103") -#sql insert into db.ctb values(now, 1, "2") +sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 float, t3 binary(16)) comment "abd" +sql create table db.ctb using db.stb tags(101, 102, "103") +sql insert into db.ctb values(now, 1, "2") system sh/exec.sh -n dnode1 -s stop system sh/exec.sh -n dnode2 -s stop \ No newline at end of file diff --git a/tests/script/tsim/mnode/basic3.sim b/tests/script/tsim/mnode/basic3.sim index 40c0f01229..b0ee23cd8c 100644 --- a/tests/script/tsim/mnode/basic3.sim +++ b/tests/script/tsim/mnode/basic3.sim @@ -15,7 +15,7 @@ $x = 0 step1: $x = $x + 1 sleep 1000 - if $x == 20 then + if $x == 50 then return -1 endi sql show dnodes -x step1 @@ -37,7 +37,7 @@ $x = 0 step2: $x = $x + 1 sleep 1000 - if $x == 20 then + if $x == 50 then return -1 endi sql show mnodes -x step2 @@ -68,7 +68,7 @@ $x = 0 step4: $x = $x + 1 sleep 1000 - if $x == 20 then + if $x == 50 then return -1 endi sql show mnodes -x step4 @@ -98,7 +98,7 @@ $x = 0 step5: $x = $x + 1 sleep 1000 - if $x == 20 then + if $x == 50 then return -1 endi sql show mnodes -x step5 @@ -119,7 +119,7 @@ $x = 0 step6: $x = $x + 1 sleep 1000 - if $x == 20 then + if $x == 50 then return -1 endi sql show mnodes -x step6 From e0e1fb31901909430ae6c3e4bba6d25e9862df7f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 26 May 2022 23:11:40 +0800 Subject: [PATCH 2/7] refactor: make trans can exec one by one --- source/dnode/mnode/impl/inc/mndDef.h | 6 ++++ source/dnode/mnode/impl/inc/mndTrans.h | 1 + source/dnode/mnode/impl/src/mndMnode.c | 40 ++++++++++++++------------ source/dnode/mnode/impl/src/mndSma.c | 1 + source/dnode/mnode/impl/src/mndTrans.c | 22 +++++++++++++- 5 files changed, 50 insertions(+), 20 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 6318f2e3f2..755ca09199 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -123,6 +123,11 @@ typedef enum { TRN_POLICY_RETRY = 1, } ETrnPolicy; +typedef enum { + TRN_EXEC_PARALLEL = 0, + TRN_EXEC_ONE_BY_ONE = 1, +} ETrnExecType; + typedef enum { DND_REASON_ONLINE = 0, DND_REASON_STATUS_MSG_TIMEOUT, @@ -151,6 +156,7 @@ typedef struct { ETrnStage stage; ETrnPolicy policy; ETrnType type; + ETrnExecType parallel; int32_t code; int32_t failedTimes; SRpcHandleInfo rpcInfo; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 84e7a17192..84bed13e2e 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -57,6 +57,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetCb(STrans *pTrans, ETrnFuncType startFunc, ETrnFuncType stopFunc, void *param, int32_t paramLen); void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); +void mndTransSetExecOneByOne(STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); void mndTransProcessRsp(SRpcMsg *pRsp); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 82e6256295..cb6b28eba1 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -312,25 +312,6 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno createEpset.eps[0].port = pDnode->port; memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - { - int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq); - void *pReq = taosMemoryMalloc(contLen); - tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq); - - STransAction action = { - .epSet = alterEpset, - .pCont = pReq, - .contLen = contLen, - .msgType = TDMT_DND_ALTER_MNODE, - .acceptableCode = 0, - }; - - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - } - { int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq); void *pReq = taosMemoryMalloc(contLen); @@ -350,6 +331,25 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno } } + { + int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq); + void *pReq = taosMemoryMalloc(contLen); + tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq); + + STransAction action = { + .epSet = alterEpset, + .pCont = pReq, + .contLen = contLen, + .msgType = TDMT_DND_ALTER_MNODE, + .acceptableCode = 0, + }; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + } + return 0; } @@ -368,6 +368,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER; + mndTransSetExecOneByOne(pTrans); if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; @@ -541,6 +542,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) { if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + mndTransSetExecOneByOne(pTrans); code = 0; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index cb7d3e81f6..7b5d1b6c32 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -507,6 +507,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name); mndTransSetDbInfo(pTrans, pDb); + mndTransSetExecOneByOne(pTrans); if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index a7480f459a..16c56a63df 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -140,6 +140,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_INT16(pRaw, dataPos, stage, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER) + SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) @@ -245,12 +246,15 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { int16_t stage = 0; int16_t policy = 0; int16_t type = 0; + int16_t parallel = 0; SDB_GET_INT16(pRaw, dataPos, &stage, _OVER) SDB_GET_INT16(pRaw, dataPos, &policy, _OVER) SDB_GET_INT16(pRaw, dataPos, &type, _OVER) + SDB_GET_INT16(pRaw, dataPos, ¶llel, _OVER) pTrans->stage = stage; pTrans->policy = policy; pTrans->type = type; + pTrans->parallel = parallel; SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) @@ -665,6 +669,8 @@ void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) { memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN); } +void mndTransSetExecOneByOne(STrans *pTrans) { pTrans->parallel = TRN_EXEC_ONE_BY_ONE; } + static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { SSdbRaw *pRaw = mndTransActionEncode(pTrans); if (pRaw == NULL) { @@ -970,7 +976,18 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; - if (pAction->msgSent) continue; + + if (pAction->msgSent) { + if (pAction->msgReceived) { + continue; + } else { + if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) { + break; + } else { + continue; + } + } + } int64_t signature = pTrans->id; signature = (signature << 32); @@ -990,6 +1007,9 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr pAction->msgSent = 1; pAction->msgReceived = 0; pAction->errCode = 0; + if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) { + break; + } } else { pAction->msgSent = 0; pAction->msgReceived = 0; From 89d172f999900db825c8d500c09221effd49d88c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 26 May 2022 23:23:41 +0800 Subject: [PATCH 3/7] refactor: make trans can exec one by one --- source/dnode/mnode/impl/src/mndMnode.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index cb6b28eba1..344b4f6263 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -365,10 +365,10 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); + mndTransSetExecOneByOne(pTrans); if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER; - mndTransSetExecOneByOne(pTrans); if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; @@ -537,12 +537,11 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) { if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); - + mndTransSetExecOneByOne(pTrans); if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; - mndTransSetExecOneByOne(pTrans); code = 0; From 17ed7e0747a50a28b053341f8838b24606b760e9 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 27 May 2022 00:00:24 +0800 Subject: [PATCH 4/7] fix(sync) delete assert, due to config change --- source/libs/sync/src/syncIndexMgr.c | 4 +++- source/libs/sync/src/syncMain.c | 10 ++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 5809cedb90..b44b0da750 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -60,7 +60,9 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, return; } } - assert(0); + + // maybe config change + // assert(0); } SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 99aac7991a..5ad8df11a9 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -981,6 +981,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { } void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop) { + SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg; pSyncNode->pRaftCfg->cfg = *newConfig; int32_t ret = 0; @@ -1014,6 +1015,15 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDro // isDrop *isDrop = true; + bool IamInOld, IamInNew; + for (int i = 0; i < oldConfig.replicaNum; ++i) { + if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && + (oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { + *isDrop = false; + break; + } + } + for (int i = 0; i < newConfig->replicaNum; ++i) { if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && (newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { From c943497ad5c8b74b45f29faa73228aa95c7c89af Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 27 May 2022 09:48:25 +0800 Subject: [PATCH 5/7] test: case for drop mnode --- tests/script/jenkins/basic.txt | 2 +- tests/script/tsim/mnode/basic1.sim | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 623182fddf..e85e9b8896 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -55,7 +55,7 @@ ./test.sh -f tsim/bnode/basic1.sim # ---- mnode -#./test.sh -f tsim/mnode/basic1.sim +./test.sh -f tsim/mnode/basic1.sim ./test.sh -f tsim/mnode/basic2.sim # ---- show diff --git a/tests/script/tsim/mnode/basic1.sim b/tests/script/tsim/mnode/basic1.sim index 198f36cdd2..9131237ca6 100644 --- a/tests/script/tsim/mnode/basic1.sim +++ b/tests/script/tsim/mnode/basic1.sim @@ -88,7 +88,7 @@ sql show mnodes print $data(1)[0] $data(1)[1] $data(1)[2] print $data(2)[0] $data(2)[1] $data(2)[2] -if $rows != 2 then +if $rows != 1 then return -1 endi if $data(1)[0] != 1 then @@ -97,16 +97,16 @@ endi if $data(1)[2] != LEADER then return -1 endi -if $data(2)[0] != NULL then +if $data(2)[0] != null then goto step2 endi -if $data(2)[2] != NULL then +if $data(2)[2] != null then goto step2 endi sleep 2000 -print =============== create drop mnodes +print =============== create mnodes sql create mnode on dnode 2 sql show mnodes if $rows != 2 then From 9d93b75020392174dd65192025d51dd750a52e9f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 27 May 2022 11:52:15 +0800 Subject: [PATCH 6/7] fix: reset epset on mnode changed --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 ++ source/dnode/mgmt/node_util/src/dmEps.c | 1 + 2 files changed, 3 insertions(+) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 6fbfae8b41..c86ace695b 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -64,6 +64,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) { qWorkerProcessFetchRsp(NULL, NULL, pRpc); return; + } else if (pRpc->msgType == TDMT_MND_STATUS_RSP && pEpSet != NULL) { + dmSetMnodeEpSet(&pDnode->data, pEpSet); } else { } diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index e0af20e41b..332dd9a58a 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -326,6 +326,7 @@ void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet } void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { + if (memcmp(pEpSet, &pData->mnodeEps, sizeof(SEpSet)) == 0) return; taosThreadRwlockWrlock(&pData->lock); pData->mnodeEps = *pEpSet; taosThreadRwlockUnlock(&pData->lock); From c44ec05229ad6b86c907034f7ca2286f14a4e44f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 27 May 2022 15:21:23 +0800 Subject: [PATCH 7/7] fix: error in sync sem --- source/dnode/mnode/impl/inc/mndInt.h | 1 + source/dnode/mnode/impl/inc/mndSync.h | 2 +- source/dnode/mnode/impl/src/mndMnode.c | 5 +- source/dnode/mnode/impl/src/mndSync.c | 62 ++++++++++++++++------- source/dnode/mnode/impl/src/mndTrans.c | 2 +- source/dnode/mnode/sdb/inc/sdb.h | 2 + source/dnode/mnode/sdb/src/sdbRaw.c | 5 ++ tests/script/tsim/mnode/basic2.sim | 7 +++ tests/script/tsim/valgrind/checkError.sim | 2 +- 9 files changed, 65 insertions(+), 23 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 189ea82bfc..cec49a4cbe 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -81,6 +81,7 @@ typedef struct { bool standby; bool restored; int32_t errCode; + int32_t transId; } SSyncMgmt; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndSync.h b/source/dnode/mnode/impl/inc/mndSync.h index 356f215267..cb9d70d5ee 100644 --- a/source/dnode/mnode/impl/inc/mndSync.h +++ b/source/dnode/mnode/impl/inc/mndSync.h @@ -25,7 +25,7 @@ extern "C" { int32_t mndInitSync(SMnode *pMnode); void mndCleanupSync(SMnode *pMnode); bool mndIsMaster(SMnode *pMnode); -int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw); +int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId); void mndSyncStart(SMnode *pMnode); void mndSyncStop(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 344b4f6263..03013a96de 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -702,14 +702,17 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { } } + mTrace("trans:-1, sync reconfig will be proposed"); + SSyncMgmt *pMgmt = &pMnode->syncMgmt; pMgmt->standby = 0; int32_t code = syncReconfig(pMgmt->sync, &cfg); if (code != 0) { - mError("failed to alter mnode sync since %s", terrstr()); + mError("trans:-1, failed to propose sync reconfig since %s", terrstr()); return code; } else { pMgmt->errCode = 0; + pMgmt->transId = -1; tsem_wait(&pMgmt->syncSem); mInfo("alter mnode sync result:%s", tstrerror(pMgmt->errCode)); terrno = pMgmt->errCode; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index f34ab28cce..16d836c817 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -28,16 +28,26 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SMnode *pMnode = pFsm->data; - SSdbRaw *pRaw = pMsg->pCont; + SMnode *pMnode = pFsm->data; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + SSdbRaw *pRaw = pMsg->pCont; - mTrace("raw:%p, apply to sdb, ver:%" PRId64 " term:%" PRId64 " role:%s", pRaw, cbMeta.index, cbMeta.term, - syncStr(cbMeta.state)); - sdbWriteWithoutFree(pMnode->pSdb, pRaw); - sdbSetApplyIndex(pMnode->pSdb, cbMeta.index); - sdbSetApplyTerm(pMnode->pSdb, cbMeta.term); - if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { - tsem_post(&pMnode->syncMgmt.syncSem); + int32_t transId = sdbGetIdFromRaw(pRaw); + pMgmt->errCode = cbMeta.code; + mTrace("trans:%d, is proposed, savedTransId:%d code:0x%x, ver:%" PRId64 " term:%" PRId64 " role:%s raw:%p", transId, + pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, syncStr(cbMeta.state), pRaw); + + if (pMgmt->errCode == 0) { + sdbWriteWithoutFree(pMnode->pSdb, pRaw); + sdbSetApplyIndex(pMnode->pSdb, cbMeta.index); + sdbSetApplyTerm(pMnode->pSdb, cbMeta.term); + } + + if (pMgmt->transId == transId) { + if (pMgmt->errCode != 0) { + mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode)); + } + tsem_post(&pMgmt->syncSem); } } @@ -78,11 +88,19 @@ int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char } void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { - mInfo("mndReConfig cbMeta.code:%d, cbMeta.currentTerm:%" PRId64 ", cbMeta.term:%" PRId64 ", cbMeta.index:%" PRId64, - cbMeta.code, cbMeta.currentTerm, cbMeta.term, cbMeta.index); - SMnode *pMnode = pFsm->data; - pMnode->syncMgmt.errCode = cbMeta.code; - tsem_post(&pMnode->syncMgmt.syncSem); + SMnode *pMnode = pFsm->data; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + + pMgmt->errCode = cbMeta.code; + mInfo("trans:-1, sync reconfig is proposed, savedTransId:%d code:0x%x, curTerm:%" PRId64 " term:%" PRId64, + pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term); + + if (pMgmt->transId == -1) { + if (pMgmt->errCode != 0) { + mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode)); + } + tsem_post(&pMgmt->syncSem); + } } SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { @@ -165,15 +183,17 @@ void mndCleanupSync(SMnode *pMnode) { memset(pMgmt, 0, sizeof(SSyncMgmt)); } -int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { +int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - pMgmt->errCode = 0; - - SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; + SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; rsp.pCont = rpcMallocCont(rsp.contLen); if (rsp.pCont == NULL) return -1; memcpy(rsp.pCont, pRaw, rsp.contLen); + pMgmt->errCode = 0; + pMgmt->transId = transId; + mTrace("trans:%d, will be proposed", pMgmt->transId); + const bool isWeak = false; int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); if (code == 0) { @@ -187,7 +207,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { } rpcFreeCont(rsp.pCont); - if (code != 0) return code; + if (code != 0) { + mError("trans:%d, failed to propose, code:0x%x", pMgmt->transId, code); + return code; + } + return pMgmt->errCode; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 16c56a63df..a8e78ddafe 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -680,7 +680,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { sdbSetRawStatus(pRaw, SDB_STATUS_READY); mDebug("trans:%d, sync to other nodes", pTrans->id); - int32_t code = mndSyncPropose(pMnode, pRaw); + int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id); if (code != 0) { mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); sdbFreeRaw(pRaw); diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 3d9148360a..3932defd8d 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -386,6 +386,8 @@ SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *iter, char **ppBuf, int32_t *len); const char *sdbTableName(ESdbType type); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); +int32_t sdbGetIdFromRaw(SSdbRaw *pRaw); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index ba3b00c12d..4b61ebb627 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -16,6 +16,11 @@ #define _DEFAULT_SOURCE #include "sdb.h" +int32_t sdbGetIdFromRaw(SSdbRaw *pRaw) { + int32_t id = *((int32_t *)(pRaw->pData)); + return id; +} + SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) { SSdbRaw *pRaw = taosMemoryCalloc(1, dataLen + sizeof(SSdbRaw)); if (pRaw == NULL) { diff --git a/tests/script/tsim/mnode/basic2.sim b/tests/script/tsim/mnode/basic2.sim index 6df3704469..18aa85cf5b 100644 --- a/tests/script/tsim/mnode/basic2.sim +++ b/tests/script/tsim/mnode/basic2.sim @@ -123,5 +123,12 @@ sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 flo sql create table db.ctb using db.stb tags(101, 102, "103") sql insert into db.ctb values(now, 1, "2") +sql select * from db.ctb +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] + +if $rows != 1 then + return -1 +endi + system sh/exec.sh -n dnode1 -s stop system sh/exec.sh -n dnode2 -s stop \ No newline at end of file diff --git a/tests/script/tsim/valgrind/checkError.sim b/tests/script/tsim/valgrind/checkError.sim index 97d16dba96..b0bc0ac832 100644 --- a/tests/script/tsim/valgrind/checkError.sim +++ b/tests/script/tsim/valgrind/checkError.sim @@ -71,7 +71,7 @@ print ====> start to check if there are ERRORS in vagrind log file for each dnod # -n : dnode[x] be check system_content sh/checkValgrind.sh -n dnode1 print cmd return result----> [ $system_content ] -if $system_content <= 1 then +if $system_content <= 2 then return 0 endi