diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 6318f2e3f2..755ca09199 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -123,6 +123,11 @@ typedef enum { TRN_POLICY_RETRY = 1, } ETrnPolicy; +typedef enum { + TRN_EXEC_PARALLEL = 0, + TRN_EXEC_ONE_BY_ONE = 1, +} ETrnExecType; + typedef enum { DND_REASON_ONLINE = 0, DND_REASON_STATUS_MSG_TIMEOUT, @@ -151,6 +156,7 @@ typedef struct { ETrnStage stage; ETrnPolicy policy; ETrnType type; + ETrnExecType parallel; int32_t code; int32_t failedTimes; SRpcHandleInfo rpcInfo; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 84e7a17192..84bed13e2e 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -57,6 +57,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetCb(STrans *pTrans, ETrnFuncType startFunc, ETrnFuncType stopFunc, void *param, int32_t paramLen); void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); +void mndTransSetExecOneByOne(STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); void mndTransProcessRsp(SRpcMsg *pRsp); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 82e6256295..344b4f6263 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -312,25 +312,6 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno createEpset.eps[0].port = pDnode->port; memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - { - int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq); - void *pReq = taosMemoryMalloc(contLen); - tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq); - - STransAction action = { - .epSet = alterEpset, - .pCont = pReq, - .contLen = contLen, - .msgType = TDMT_DND_ALTER_MNODE, - .acceptableCode = 0, - }; - - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - } - { int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq); void *pReq = taosMemoryMalloc(contLen); @@ -350,6 +331,25 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno } } + { + int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq); + void *pReq = taosMemoryMalloc(contLen); + tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq); + + STransAction action = { + .epSet = alterEpset, + .pCont = pReq, + .contLen = contLen, + .msgType = TDMT_DND_ALTER_MNODE, + .acceptableCode = 0, + }; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + } + return 0; } @@ -365,6 +365,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); + mndTransSetExecOneByOne(pTrans); if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER; @@ -536,7 +537,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) { if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); - + mndTransSetExecOneByOne(pTrans); if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index cb7d3e81f6..7b5d1b6c32 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -507,6 +507,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name); mndTransSetDbInfo(pTrans, pDb); + mndTransSetExecOneByOne(pTrans); if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index a7480f459a..16c56a63df 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -140,6 +140,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_INT16(pRaw, dataPos, stage, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER) + SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) @@ -245,12 +246,15 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { int16_t stage = 0; int16_t policy = 0; int16_t type = 0; + int16_t parallel = 0; SDB_GET_INT16(pRaw, dataPos, &stage, _OVER) SDB_GET_INT16(pRaw, dataPos, &policy, _OVER) SDB_GET_INT16(pRaw, dataPos, &type, _OVER) + SDB_GET_INT16(pRaw, dataPos, ¶llel, _OVER) pTrans->stage = stage; pTrans->policy = policy; pTrans->type = type; + pTrans->parallel = parallel; SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) @@ -665,6 +669,8 @@ void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) { memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN); } +void mndTransSetExecOneByOne(STrans *pTrans) { pTrans->parallel = TRN_EXEC_ONE_BY_ONE; } + static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { SSdbRaw *pRaw = mndTransActionEncode(pTrans); if (pRaw == NULL) { @@ -970,7 +976,18 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; - if (pAction->msgSent) continue; + + if (pAction->msgSent) { + if (pAction->msgReceived) { + continue; + } else { + if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) { + break; + } else { + continue; + } + } + } int64_t signature = pTrans->id; signature = (signature << 32); @@ -990,6 +1007,9 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr pAction->msgSent = 1; pAction->msgReceived = 0; pAction->errCode = 0; + if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) { + break; + } } else { pAction->msgSent = 0; pAction->msgReceived = 0; diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 5809cedb90..b44b0da750 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -60,7 +60,9 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, return; } } - assert(0); + + // maybe config change + // assert(0); } SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 99aac7991a..5ad8df11a9 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -981,6 +981,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { } void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop) { + SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg; pSyncNode->pRaftCfg->cfg = *newConfig; int32_t ret = 0; @@ -1014,6 +1015,15 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDro // isDrop *isDrop = true; + bool IamInOld, IamInNew; + for (int i = 0; i < oldConfig.replicaNum; ++i) { + if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && + (oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { + *isDrop = false; + break; + } + } + for (int i = 0; i < newConfig->replicaNum; ++i) { if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && (newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 623182fddf..e85e9b8896 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -55,7 +55,7 @@ ./test.sh -f tsim/bnode/basic1.sim # ---- mnode -#./test.sh -f tsim/mnode/basic1.sim +./test.sh -f tsim/mnode/basic1.sim ./test.sh -f tsim/mnode/basic2.sim # ---- show diff --git a/tests/script/tsim/mnode/basic1.sim b/tests/script/tsim/mnode/basic1.sim index 198f36cdd2..9131237ca6 100644 --- a/tests/script/tsim/mnode/basic1.sim +++ b/tests/script/tsim/mnode/basic1.sim @@ -88,7 +88,7 @@ sql show mnodes print $data(1)[0] $data(1)[1] $data(1)[2] print $data(2)[0] $data(2)[1] $data(2)[2] -if $rows != 2 then +if $rows != 1 then return -1 endi if $data(1)[0] != 1 then @@ -97,16 +97,16 @@ endi if $data(1)[2] != LEADER then return -1 endi -if $data(2)[0] != NULL then +if $data(2)[0] != null then goto step2 endi -if $data(2)[2] != NULL then +if $data(2)[2] != null then goto step2 endi sleep 2000 -print =============== create drop mnodes +print =============== create mnodes sql create mnode on dnode 2 sql show mnodes if $rows != 2 then