diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index b43eafc918..2e04afdbdc 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -80,6 +80,7 @@ typedef struct SFsmCbMeta { uint64_t seqNum; SyncTerm term; SyncTerm currentTerm; + uint64_t flag; } SFsmCbMeta; typedef struct SReConfigCbMeta { @@ -87,6 +88,9 @@ typedef struct SReConfigCbMeta { SyncIndex index; SyncTerm term; SyncTerm currentTerm; + SSyncCfg oldCfg; + bool isDrop; + uint64_t flag; } SReConfigCbMeta; typedef struct SSyncFSM { @@ -162,6 +166,7 @@ void syncCleanUp(); int64_t syncOpen(const SSyncInfo* pSyncInfo); void syncStart(int64_t rid); void syncStop(int64_t rid); +int32_t syncSetStandby(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); ESyncState syncGetMyRole(int64_t rid); const char* syncGetMyRoleStr(int64_t rid); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index bb2c069eaa..2533f268e5 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -92,6 +92,13 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SEpSet epSet = {0}; dmGetMnodeEpSet(pMgmt->pData, &epSet); rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp); + if (rpcRsp.code != 0) { + dError("failed to send status msg since %s, numOfEps:%d inUse:%d", tstrerror(rpcRsp.code), epSet.numOfEps, + epSet.inUse); + for (int32_t i = 0; i < epSet.numOfEps; ++i) { + dDebug("index:%d, mnode ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); + } + } dmProcessStatusRsp(pMgmt, &rpcRsp); } diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 6266f22f39..a421be5c06 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -144,6 +144,7 @@ _OVER: static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) { mTrace("cluster:%" PRId64 ", perform insert action, row:%p", pCluster->id, pCluster); + pSdb->pMnode->clusterId = pCluster->id; return 0; } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 047562ec02..22f858c60b 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -441,7 +441,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes; SStatusRsp statusRsp = {0}; - statusRsp.dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE); + statusRsp.dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); statusRsp.dnodeCfg.dnodeId = pDnode->id; statusRsp.dnodeCfg.clusterId = pMnode->clusterId; statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp)); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index afdc27a96a..82e6256295 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -233,7 +233,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { if (pObj->pDnode == NULL) { mError("mnode:%d, no corresponding dnode exists", pObj->id); } else { - if (pObj->state == TAOS_SYNC_STATE_LEADER) { + if (pObj->id == pMnode->selfDnodeId || pObj->state == TAOS_SYNC_STATE_LEADER) { pEpSet->inUse = pEpSet->numOfEps; } addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index aa391ad0d3..f34ab28cce 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -51,6 +51,7 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { void mndRestoreFinish(struct SSyncFSM *pFsm) { SMnode *pMnode = pFsm->data; if (!pMnode->deploy) { + mInfo("mnode sync restore finished"); mndTransPullup(pMnode); pMnode->syncMgmt.restored = true; } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 69549d2a7e..2e71745f61 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -182,7 +182,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S cJSON* syncNode2Json(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode); char* syncNode2SimpleStr(const SSyncNode* pSyncNode); -void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig); +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop); SSyncNode* syncNodeAcquire(int64_t rid); void syncNodeRelease(SSyncNode* pNode); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index c9e16c53c8..008bc00dbc 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -333,7 +333,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { cbMeta.seqNum = pEntry->seqNum; cbMeta.term = pEntry->term; cbMeta.currentTerm = ths->pRaftStore->currentTerm; - ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); + cbMeta.flag = 0x11; bool needExecute = true; if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) { @@ -347,24 +347,55 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // config change if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg; + SSyncCfg newSyncCfg; int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); ASSERT(ret == 0); - syncNodeUpdateConfig(ths, &newSyncCfg); - if (ths->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(ths); - } else { - syncNodeBecomeFollower(ths); + // update new config myIndex + bool hit = false; + for (int i = 0; i < newSyncCfg.replicaNum; ++i) { + if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && + ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { + newSyncCfg.myIndex = i; + hit = true; + break; + } } - // maybe newSyncCfg.myIndex is updated in syncNodeUpdateConfig - if (ths->pFsm->FpReConfigCb != NULL) { - SReConfigCbMeta cbMeta = {0}; + SReConfigCbMeta cbMeta = {0}; + bool isDrop; + + // I am in newConfig + if (hit) { + syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop); + + // change isStandBy to normal + if (!isDrop) { + if (ths->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(ths); + } else { + syncNodeBecomeFollower(ths); + } + } + + char* sOld = syncCfg2Str(&oldSyncCfg); + char* sNew = syncCfg2Str(&newSyncCfg); + sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop); + taosMemoryFree(sOld); + taosMemoryFree(sNew); + } + + // always call FpReConfigCb + if (ths->pFsm->FpReConfigCb != NULL) { cbMeta.code = 0; cbMeta.currentTerm = ths->pRaftStore->currentTerm; cbMeta.index = pEntry->index; cbMeta.term = pEntry->term; + cbMeta.oldCfg = oldSyncCfg; + cbMeta.flag = 0x11; + cbMeta.isDrop = isDrop; ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta); } } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index a3d480956e..4a1a40a2d7 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -111,6 +111,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { cbMeta.seqNum = pEntry->seqNum; cbMeta.term = pEntry->term; cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; + cbMeta.flag = 0x1; bool needExecute = true; if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) { @@ -124,24 +125,54 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // config change if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg; + SSyncCfg newSyncCfg; int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); ASSERT(ret == 0); - syncNodeUpdateConfig(pSyncNode, &newSyncCfg); - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(pSyncNode); - } else { - syncNodeBecomeFollower(pSyncNode); + // update new config myIndex + bool hit = false; + for (int i = 0; i < newSyncCfg.replicaNum; ++i) { + if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && + pSyncNode->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { + newSyncCfg.myIndex = i; + hit = true; + break; + } } - // maybe newSyncCfg.myIndex is updated in syncNodeUpdateConfig + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + ASSERT(hit == true); + } + + bool isDrop; + syncNodeUpdateConfig(pSyncNode, &newSyncCfg, &isDrop); + + // change isStandBy to normal + if (!isDrop) { + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(pSyncNode); + } else { + syncNodeBecomeFollower(pSyncNode); + } + } + + char* sOld = syncCfg2Str(&oldSyncCfg); + char* sNew = syncCfg2Str(&newSyncCfg); + sInfo("==config change== 0x1 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop); + taosMemoryFree(sOld); + taosMemoryFree(sNew); + if (pSyncNode->pFsm->FpReConfigCb != NULL) { SReConfigCbMeta cbMeta = {0}; cbMeta.code = 0; cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; cbMeta.index = pEntry->index; cbMeta.term = pEntry->term; + cbMeta.oldCfg = oldSyncCfg; + cbMeta.flag = 0x1; + cbMeta.isDrop = isDrop; pSyncNode->pFsm->FpReConfigCb(pSyncNode->pFsm, newSyncCfg, cbMeta); } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 914ce68245..99aac7991a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -141,9 +141,38 @@ void syncStop(int64_t rid) { taosRemoveRef(tsNodeRefId, rid); } +int32_t syncSetStandby(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return -1; + } + + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return -1; + } + + // state change + pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; + syncNodeStopHeartbeatTimer(pSyncNode); + + // reset elect timer, long enough + int32_t electMS = TIMER_MAX_MS; + int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS); + ASSERT(ret == 0); + + pSyncNode->pRaftCfg->isStandBy = 1; + raftCfgPersist(pSyncNode->pRaftCfg); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; +} + int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { int32_t ret = 0; char* configChange = syncCfg2Str((SSyncCfg*)pSyncCfg); + sInfo("==syncReconfig== newconfig:%s", configChange); + SRpcMsg rpcMsg = {0}; rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE; rpcMsg.info.noResp = 1; @@ -941,30 +970,19 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { int len = 256; char* s = (char*)taosMemoryMalloc(len); snprintf(s, len, - "syncNode2SimpleStr vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, electTimerLogicClock:%lu, " + "syncNode2SimpleStr vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, " + "electTimerLogicClock:%lu, " "electTimerLogicClockUser:%lu, " "electTimerMS:%d", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, pSyncNode->state, - syncUtilState2String(pSyncNode->state), pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, - pSyncNode->electTimerMS); + syncUtilState2String(pSyncNode->state), pSyncNode->pRaftCfg->isStandBy, pSyncNode->electTimerLogicClock, + pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS); return s; } -void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { - bool hit = false; - for (int i = 0; i < newConfig->replicaNum; ++i) { - if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newConfig->nodeInfo)[i].nodeFqdn) == 0 && - pSyncNode->myNodeInfo.nodePort == (newConfig->nodeInfo)[i].nodePort) { - newConfig->myIndex = i; - hit = true; - break; - } - } - ASSERT(hit == true); - +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop) { pSyncNode->pRaftCfg->cfg = *newConfig; - int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg); - ASSERT(ret == 0); + int32_t ret = 0; // init internal pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; @@ -994,9 +1012,22 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode); votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode); - pSyncNode->pRaftCfg->isStandBy = 0; - raftCfgPersist(pSyncNode->pRaftCfg); + // isDrop + *isDrop = true; + for (int i = 0; i < newConfig->replicaNum; ++i) { + if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && + (newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { + *isDrop = false; + break; + } + } + if (!(*isDrop)) { + // change isStandBy to normal + pSyncNode->pRaftCfg->isStandBy = 0; + } + + raftCfgPersist(pSyncNode->pRaftCfg); syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index f52fef0019..7efc3f50c0 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -43,8 +43,9 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { if (cbMeta.index > beginIndex) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + snprintf(logBuf, sizeof(logBuf), + "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } else { sTrace("==callback== ==CommitCb== do not apply again %ld", cbMeta.index); @@ -54,15 +55,16 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, - cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + snprintf(logBuf, sizeof(logBuf), + "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm, + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } @@ -75,13 +77,23 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } +void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { + sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); +} + SSyncFSM* createFsm() { SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); pFsm->FpCommitCb = CommitCb; pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; + pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpRestoreFinishCb = RestoreFinishCb; + pFsm->FpSnapshotApply = NULL; + pFsm->FpSnapshotRead = NULL; + + pFsm->FpReConfigCb = ReConfigCb; + return pFsm; } @@ -109,6 +121,7 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* syncInfo.pFsm = createFsm(); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); syncInfo.pWal = pWal; + syncInfo.isStandBy = isStandBy; SSyncCfg* pCfg = &syncInfo.syncCfg; @@ -180,7 +193,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { int main(int argc, char** argv) { tsAsyncLog = 0; - sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_INFO; if (argc != 7) { usage(argv[0]); exit(-1); @@ -212,17 +225,21 @@ int main(int argc, char** argv) { int64_t rid = createSyncNode(replicaNum, myIndex, gVgId, pWal, (char*)gDir, isStandBy); assert(rid > 0); - if (isStandBy) { - syncStartStandBy(rid); - } else { - syncStart(rid); - } + syncStart(rid); + + /* + if (isStandBy) { + syncStartStandBy(rid); + } else { + syncStart(rid); + } + */ SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); assert(pSyncNode != NULL); if (isConfigChange) { - configChange(rid, 3, myIndex); + configChange(rid, 2, myIndex); } //--------------------------- diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 683f6c88c6..18a85865df 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -318,6 +318,7 @@ void transDQDestroy(SDelayQueue* queue); int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); +void transPrintEpSet(SEpSet* pEpSet); /* * init global func */ diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 159b0cdd07..ba0da63dc1 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -921,16 +921,18 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { STransConnCtx* pCtx = pMsg->ctx; SEpSet* pEpSet = &pCtx->epSet; + transPrintEpSet(pEpSet); + /* * upper layer handle retry if code equal TSDB_CODE_RPC_NETWORK_UNAVAIL */ tmsg_t msgType = pCtx->msgType; if ((pTransInst->retry != NULL && (pTransInst->retry(pResp->code))) || - ((pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && msgType == TDMT_MND_CONNECT)) { + (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL)) { pMsg->sent = 0; pMsg->st = taosGetTimestampUs(); pCtx->retryCount += 1; - if (msgType == TDMT_MND_CONNECT && pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (pCtx->retryCount < pEpSet->numOfEps) { pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps; @@ -972,7 +974,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { pCtx->pRsp = NULL; } else { tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); - pTransInst->cfp(pTransInst->parent, pResp, pEpSet); + if (pResp->code != 0) { + pTransInst->cfp(pTransInst->parent, pResp, NULL); + } else { + pTransInst->cfp(pTransInst->parent, pResp, pEpSet); + } } return 0; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 1ea03083b2..526f896ad2 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -446,4 +446,16 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_ uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0); return 0; } + +void transPrintEpSet(SEpSet* pEpSet) { + if (pEpSet == NULL) { + tTrace("NULL epset"); + return; + } + tTrace("epset begin: inUse: %d", pEpSet->inUse); + for (int i = 0; i < pEpSet->numOfEps; i++) { + tTrace("ip: %s, port: %d", pEpSet->eps[i].fqdn, pEpSet->eps[i].port); + } + tTrace("epset end"); +} #endif diff --git a/tests/script/tsim/mnode/basic1.sim b/tests/script/tsim/mnode/basic1.sim index 235889ece6..198f36cdd2 100644 --- a/tests/script/tsim/mnode/basic1.sim +++ b/tests/script/tsim/mnode/basic1.sim @@ -6,15 +6,6 @@ system sh/exec.sh -n dnode2 -s start sql connect print =============== show dnodes -sql show dnodes; -if $rows != 1 then - return -1 -endi - -if $data00 != 1 then - return -1 -endi - sql show mnodes; if $rows != 1 then return -1 @@ -30,63 +21,55 @@ endi print =============== create dnodes sql create dnode $hostname port 7200 -sleep 2000 - -sql show dnodes; -if $rows != 2 then - return -1 +$x = 0 +step1: + $x = $x + 1 + sleep 500 + if $x == 20 then + return -1 + endi +sql show dnodes -x step1 +if $data(1)[4] != ready then + goto step1 endi - -if $data00 != 1 then - return -1 -endi - -if $data10 != 2 then - return -1 -endi - -print $data02 -if $data02 != 0 then - return -1 -endi - -if $data12 != 0 then - return -1 -endi - -if $data04 != ready then - return -1 -endi - -if $data14 != ready then - return -1 -endi - -sql show mnodes; -if $rows != 1 then - return -1 -endi - -if $data00 != 1 then - return -1 -endi - -if $data02 != LEADER then - return -1 +if $data(2)[4] != ready then + goto step1 endi print =============== create drop mnode 1 sql_error create mnode on dnode 1 sql_error drop mnode on dnode 1 - -print =============== create drop mnode 2 sql create mnode on dnode 2 + +$x = 0 +step1: + $x = $x + 1 + sleep 1000 + if $x == 20 then + return -1 + endi sql show mnodes +print $data(1)[0] $data(1)[1] $data(1)[2] +print $data(2)[0] $data(2)[1] $data(2)[2] + if $rows != 2 then return -1 endi -sql_error create mnode on dnode 2 +if $data(1)[0] != 1 then + return -1 +endi +if $data(1)[2] != LEADER then + return -1 +endi +if $data(2)[0] != 2 then + return -1 +endi +if $data(2)[2] != FOLLOWER then + goto step1 +endi +sleep 2000 +print ============ drop mnodes sql drop mnode on dnode 2 sql show mnodes if $rows != 1 then @@ -94,6 +77,35 @@ if $rows != 1 then endi sql_error drop mnode on dnode 2 +$x = 0 +step2: + $x = $x + 1 + sleep 1000 + if $x == 20 then + return -1 + endi +sql show mnodes +print $data(1)[0] $data(1)[1] $data(1)[2] +print $data(2)[0] $data(2)[1] $data(2)[2] + +if $rows != 2 then + return -1 +endi +if $data(1)[0] != 1 then + return -1 +endi +if $data(1)[2] != LEADER then + return -1 +endi +if $data(2)[0] != NULL then + goto step2 +endi +if $data(2)[2] != NULL then + goto step2 +endi + +sleep 2000 + print =============== create drop mnodes sql create mnode on dnode 2 sql show mnodes @@ -101,17 +113,32 @@ if $rows != 2 then return -1 endi -print =============== restart -system sh/exec.sh -n dnode1 -s stop -x SIGINT -system sh/exec.sh -n dnode2 -s stop -x SIGINT -system sh/exec.sh -n dnode1 -s start -system sh/exec.sh -n dnode2 -s start - -sleep 2000 +$x = 0 +step3: + $x = $x + 1 + sleep 1000 + if $x == 20 then + return -1 + endi sql show mnodes +print $data(1)[0] $data(1)[1] $data(1)[2] +print $data(2)[0] $data(2)[1] $data(2)[2] + if $rows != 2 then return -1 endi +if $data(1)[0] != 1 then + return -1 +endi +if $data(1)[2] != LEADER then + return -1 +endi +if $data(2)[0] != 2 then + return -1 +endi +if $data(2)[2] != FOLLOWER then + goto step3 +endi system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT diff --git a/tests/script/tsim/mnode/basic2.sim b/tests/script/tsim/mnode/basic2.sim index f1a3a8c251..66dee512eb 100644 --- a/tests/script/tsim/mnode/basic2.sim +++ b/tests/script/tsim/mnode/basic2.sim @@ -21,29 +21,31 @@ endi print =============== create dnodes sql create dnode $hostname port 7200 -sql create dnode $hostname port 7300 -sleep 2000 - -sql show dnodes; -if $rows != 3 then - return -1 +$x = 0 +step1: + $x = $x + 1 + sleep 500 + if $x == 20 then + return -1 + endi +sql show dnodes -x step1 +if $data(1)[4] != ready then + goto step1 endi - -sql show mnodes; -if $rows != 1 then - return -1 -endi - -if $data00 != 1 then - return -1 -endi - -if $data02 != LEADER then - return -1 +if $data(2)[4] != ready then + goto step1 endi print =============== create mnode 2 sql create mnode on dnode 2 + +$x = 0 +step1: + $x = $x + 1 + sleep 1000 + if $x == 20 then + return -1 + endi sql show mnodes print $data(1)[0] $data(1)[1] $data(1)[2] print $data(2)[0] $data(2)[1] $data(2)[2] @@ -60,8 +62,8 @@ endi if $data(2)[0] != 2 then return -1 endi -if $data(2)[2] == LEADER then - return -1 +if $data(2)[2] != FOLLOWER then + goto step1 endi print =============== create user @@ -71,42 +73,47 @@ if $rows != 2 then return -1 endi -#sql create database db -#sql show databases -#if $rows != 3 then -# return -1 -#endi +sql create database db +sql show databases +if $rows != 3 then + return -1 +endi +sleep 5000 + +print =============== restart system sh/exec.sh -n dnode1 -s stop system sh/exec.sh -n dnode2 -s stop -sleep 100 system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode2 -s start sql connect - sql show mnodes if $rows != 2 then return -1 endi -if $data(1)[0] != 1 then - return -1 -endi -if $data(1)[2] != LEADER then - return -1 -endi sql show users if $rows != 2 then return -1 endi -#sql show databases -#if $rows != 3 then -# return -1 -#endi +sql show databases +if $rows != 3 then + return -1 +endi -return +sql show dnodes +if $data(1)[4] != ready then + return -1 +endi +if $data(2)[4] != ready then + return -1 +endi + +print =============== insert data +sql create table db.stb (ts timestamp, i int) tags (j int) +sql create table db.ctb using db.stb tags(1); system sh/exec.sh -n dnode1 -s stop system sh/exec.sh -n dnode2 -s stop \ No newline at end of file diff --git a/tests/script/tsim/mnode/basic3.sim b/tests/script/tsim/mnode/basic3.sim new file mode 100644 index 0000000000..40c0f01229 --- /dev/null +++ b/tests/script/tsim/mnode/basic3.sim @@ -0,0 +1,137 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +sql connect + +print =============== step1: create dnodes +sql create dnode $hostname port 7200 +sql create dnode $hostname port 7300 + +$x = 0 +step1: + $x = $x + 1 + sleep 1000 + if $x == 20 then + return -1 + endi +sql show dnodes -x step1 +if $data(1)[4] != ready then + goto step1 +endi +if $data(2)[4] != ready then + goto step1 +endi +if $data(3)[4] != ready then + goto step1 +endi + +print =============== step2: create mnode 2 +sql create mnode on dnode 2 +sql create mnode on dnode 3 + +$x = 0 +step2: + $x = $x + 1 + sleep 1000 + if $x == 20 then + return -1 + endi +sql show mnodes -x step2 +if $data(1)[2] != LEADER then + goto step2 +endi +if $data(2)[2] != FOLLOWER then + goto step2 +endi +if $data(3)[2] != FOLLOWER then + goto step2 +endi + +print =============== step3: create user +sql create user user1 PASS 'user1' +sql show users +if $rows != 2 then + return -1 +endi + +# wait mnode2 mnode3 recv data finish +sleep 10000 + +print =============== step4: stop dnode1 +system sh/exec.sh -n dnode1 -s stop + +$x = 0 +step4: + $x = $x + 1 + sleep 1000 + if $x == 20 then + return -1 + endi +sql show mnodes -x step4 +print $data(1)[0] $data(1)[1] $data(1)[2] +print $data(2)[0] $data(2)[1] $data(2)[2] +print $data(3)[0] $data(3)[1] $data(3)[2] + +sql show users +if $rows != 2 then + return -1 +endi + +sleep 1000 +sql show dnodes +if $data(2)[4] != ready then + return -1 +endi +if $data(3)[4] != ready then + return -1 +endi + +print =============== step5: stop dnode1 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s stop + +$x = 0 +step5: + $x = $x + 1 + sleep 1000 + if $x == 20 then + return -1 + endi +sql show mnodes -x step5 +print $data(1)[0] $data(1)[1] $data(1)[2] +print $data(2)[0] $data(2)[1] $data(2)[2] +print $data(3)[0] $data(3)[1] $data(3)[2] + +sql show users +if $rows != 2 then + return -1 +endi + +print =============== step6: stop dnode1 +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s stop + +$x = 0 +step6: + $x = $x + 1 + sleep 1000 + if $x == 20 then + return -1 + endi +sql show mnodes -x step6 +print $data(1)[0] $data(1)[1] $data(1)[2] +print $data(2)[0] $data(2)[1] $data(2)[2] +print $data(3)[0] $data(3)[1] $data(3)[2] + +sql show users +if $rows != 2 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop +system sh/exec.sh -n dnode2 -s stop +system sh/exec.sh -n dnode3 -s stop \ No newline at end of file