From aad288ab1751e73e52079427139165d941209477 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sun, 12 Jun 2022 15:21:56 +0800 Subject: [PATCH 1/7] refactor(sync): add debug log --- include/libs/sync/sync.h | 13 ++- source/dnode/mnode/impl/src/mndMain.c | 12 +-- source/dnode/mnode/impl/src/mndSync.c | 7 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 6 +- source/dnode/vnode/src/vnd/vnodeSync.c | 3 + source/libs/sync/inc/syncInt.h | 4 + source/libs/sync/src/syncAppendEntries.c | 2 +- source/libs/sync/src/syncCommit.c | 2 +- source/libs/sync/src/syncMain.c | 130 +++++++++++++++++------ 9 files changed, 124 insertions(+), 55 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 3a77cc1e19..2673bc382b 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -47,8 +47,9 @@ typedef enum { typedef enum { TAOS_SYNC_PROPOSE_SUCCESS = 0, TAOS_SYNC_PROPOSE_NOT_LEADER = 1, - TAOS_SYNC_PROPOSE_OTHER_ERROR = 2, - TAOS_SYNC_ONLY_ONE_REPLICA = 3, + TAOS_SYNC_ONLY_ONE_REPLICA = 2, + TAOS_SYNC_NOT_IN_NEW_CONFIG = 3, + TAOS_SYNC_OTHER_ERROR = 100, } ESyncProposeCode; typedef enum { @@ -199,15 +200,13 @@ bool syncIsRestoreFinish(int64_t rid); int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); -int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg); + +// build SRpcMsg, need to call syncPropose with SRpcMsg +int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg); int32_t syncLeaderTransfer(int64_t rid); int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader); -// to be moved to static -void syncStartNormal(int64_t rid); -void syncStartStandBy(int64_t rid); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index d8a61461dd..8b86845627 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -382,22 +382,22 @@ void mndStop(SMnode *pMnode) { int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SSyncMgmt *pMgmt = &pMnode->syncMgmt; - int32_t code = TAOS_SYNC_PROPOSE_OTHER_ERROR; + int32_t code = TAOS_SYNC_OTHER_ERROR; if (!syncEnvIsStart()) { mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); - return TAOS_SYNC_PROPOSE_OTHER_ERROR; + return TAOS_SYNC_OTHER_ERROR; } SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync); if (pSyncNode == NULL) { mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType)); - return TAOS_SYNC_PROPOSE_OTHER_ERROR; + return TAOS_SYNC_OTHER_ERROR; } if (mndAcquireSyncRef(pMnode) != 0) { mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr()); - return TAOS_SYNC_PROPOSE_OTHER_ERROR; + return TAOS_SYNC_OTHER_ERROR; } char logBuf[512] = {0}; @@ -457,7 +457,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { } else { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - code = TAOS_SYNC_PROPOSE_OTHER_ERROR; + code = TAOS_SYNC_OTHER_ERROR; } } else { @@ -495,7 +495,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { syncAppendEntriesReplyDestroy(pSyncMsg); } else { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - code = TAOS_SYNC_PROPOSE_OTHER_ERROR; + code = TAOS_SYNC_OTHER_ERROR; } } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index e0b4cc6a57..24b4f1926d 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -236,7 +236,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { tsem_wait(&pMgmt->syncSem); } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { terrno = TSDB_CODE_APP_NOT_READY; - } else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) { + } else if (code == TAOS_SYNC_OTHER_ERROR) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; } else { terrno = TSDB_CODE_APP_ERROR; @@ -254,13 +254,16 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { void mndSyncStart(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; syncSetMsgCb(pMgmt->sync, &pMnode->msgCb); + syncStart(pMgmt->sync); + mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby); +/* if (pMgmt->standby) { syncStartStandBy(pMgmt->sync); } else { syncStart(pMgmt->sync); } - mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby); +*/ } void mndSyncStop(SMnode *pMnode) {} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 51ed739693..4d1a5060dd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -291,7 +291,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { } int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; + int32_t ret = TAOS_SYNC_OTHER_ERROR; if (syncEnvIsStart()) { SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); @@ -372,13 +372,13 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } else { vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); - ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; + ret = TAOS_SYNC_OTHER_ERROR; } syncNodeRelease(pSyncNode); } else { vError("==vnodeProcessSyncReq== error syncEnv stop"); - ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; + ret = TAOS_SYNC_OTHER_ERROR; } return ret; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index b4021ec73d..ec95451b31 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -290,11 +290,14 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { void vnodeSyncStart(SVnode *pVnode) { syncSetMsgCb(pVnode->sync, &pVnode->msgCb); + syncStart(pVnode->sync); + /* if (pVnode->config.standby) { syncStartStandBy(pVnode->sync); } else { syncStart(pVnode->sync); } + */ } void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index e7777af749..8a44dcd9bc 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -168,6 +168,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); void syncNodeStart(SSyncNode* pSyncNode); void syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); +int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak); // option bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); @@ -232,6 +233,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId); +void syncStartNormal(int64_t rid); +void syncStartStandBy(int64_t rid); + // for debug -------------- void syncNodePrint(SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 29ee263756..08a4081ad3 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -995,7 +995,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ths->commitIndex = snapshot.lastApplyIndex; sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, commitBegin, - commitEnd, syncUtilState2String(ths->state)); + commitEnd, syncUtilState2String(ths->state)); } SyncIndex beginIndex = ths->commitIndex + 1; diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index eac9c1fbd8..3424fac5e7 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -57,7 +57,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { pSyncNode->commitIndex = snapshot.lastApplyIndex; sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId, - pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state)); + pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state)); } // update commit index diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 4d410644bb..fbac8c3d9d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -149,12 +149,12 @@ void syncStop(int64_t rid) { int32_t syncSetStandby(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return -1; + return TAOS_SYNC_OTHER_ERROR; } if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return -1; + return TAOS_SYNC_OTHER_ERROR; } // state change @@ -173,14 +173,68 @@ int32_t syncSetStandby(int64_t rid) { return 0; } -int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { - int32_t ret = 0; - char* newconfig = syncCfg2Str((SSyncCfg*)pSyncCfg); +int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return TAOS_SYNC_OTHER_ERROR; + } + ASSERT(rid == pSyncNode->rid); + int32_t ret = 0; + bool IamInNew = false; + for (int i = 0; i < pNewCfg->replicaNum; ++i) { + SRaftId newId; + newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort); + newId.vgId = pSyncNode->vgId; + if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) { + IamInNew = true; + } + } + if (!IamInNew) { + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return TAOS_SYNC_NOT_IN_NEW_CONFIG; + } + + char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); + pRpcMsg->msgType = TDMT_SYNC_CONFIG_CHANGE; + pRpcMsg->info.noResp = 1; + pRpcMsg->contLen = strlen(newconfig) + 1; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig); + taosMemoryFree(newconfig); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return ret; +} + +int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return TAOS_SYNC_OTHER_ERROR; + } + ASSERT(rid == pSyncNode->rid); + + bool IamInNew = false; + for (int i = 0; i < pNewCfg->replicaNum; ++i) { + SRaftId newId; + newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort); + newId.vgId = pSyncNode->vgId; + if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) { + IamInNew = true; + } + } + if (!IamInNew) { + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return TAOS_SYNC_NOT_IN_NEW_CONFIG; + } + + char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); if (gRaftDetailLog) { sInfo("==syncReconfig== newconfig:%s", newconfig); } + int32_t ret = 0; + SRpcMsg rpcMsg = {0}; rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE; rpcMsg.info.noResp = 1; @@ -188,27 +242,42 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", newconfig); taosMemoryFree(newconfig); - ret = syncPropose(rid, &rpcMsg, false); + ret = syncNodePropose(pSyncNode, &rpcMsg, false); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); return ret; } int32_t syncLeaderTransfer(int64_t rid) { - int32_t ret = 0; + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return TAOS_SYNC_OTHER_ERROR; + } + ASSERT(rid == pSyncNode->rid); + if (pSyncNode->peersNum > 0) { + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return TAOS_SYNC_OTHER_ERROR; + } + + SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0]; + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + + int32_t ret = syncLeaderTransferTo(rid, newLeader); return ret; } int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return false; + return TAOS_SYNC_OTHER_ERROR; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); int32_t ret = 0; if (pSyncNode->replicaNum == 1) { - taosReleaseRef(tsNodeRefId, pSyncNode->rid); sError("only one replica, cannot drop leader"); + taosReleaseRef(tsNodeRefId, pSyncNode->rid); return TAOS_SYNC_ONLY_ONE_REPLICA; } @@ -220,26 +289,11 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg); syncLeaderTransferDestroy(pMsg); - ret = syncPropose(rid, &rpcMsg, false); - + ret = syncNodePropose(pSyncNode, &rpcMsg, false); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return ret; } -int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) { - int32_t ret = 0; - char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); - - pRpcMsg->msgType = TDMT_SYNC_CONFIG_CHANGE; - pRpcMsg->info.noResp = 1; - pRpcMsg->contLen = strlen(newconfig) + 1; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig); - taosMemoryFree(newconfig); - - return ret; -} - bool syncCanLeaderTransfer(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { @@ -272,8 +326,6 @@ bool syncCanLeaderTransfer(int64_t rid) { return matchOK; } -int32_t syncGiveUpLeader(int64_t rid) { return 0; } - int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = syncPropose(rid, pMsg, isWeak); return ret; @@ -467,10 +519,19 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_PROPOSE_OTHER_ERROR; + return TAOS_SYNC_OTHER_ERROR; } assert(rid == pSyncNode->rid); sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); + ret = syncNodePropose(pSyncNode, pMsg, isWeak); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return ret; +} + +int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { + int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS; + sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { SRespStub stub; @@ -485,15 +546,14 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { ret = TAOS_SYNC_PROPOSE_SUCCESS; } else { - sTrace("syncPropose pSyncNode->FpEqMsg is NULL"); + sError("syncPropose pSyncNode->FpEqMsg is NULL"); } syncClientRequestDestroy(pSyncMsg); } else { - sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); + sError("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); ret = TAOS_SYNC_PROPOSE_NOT_LEADER; } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); return ret; } @@ -1241,7 +1301,7 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { sDebug("vgId:%d sync event become follower, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, - debugStr); + debugStr); // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -1276,7 +1336,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { sDebug("vgId:%d sync event become leader, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, - debugStr); + debugStr); // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; @@ -1885,7 +1945,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, int32_t code = 0; ESyncState state = flag; sDebug("vgId:%d sync event commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex, - endIndex, syncUtilState2String(state)); + endIndex, syncUtilState2String(state)); // execute fsm if (ths->pFsm != NULL) { From 34c906a1e11ef96187dcaf609732ccbbf10908e8 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 13 Jun 2022 10:39:32 +0800 Subject: [PATCH 2/7] refactor(sync): add debug log --- source/libs/sync/src/syncMain.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index fbac8c3d9d..fa61bf1b08 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -219,15 +219,22 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { SRaftId newId; newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort); newId.vgId = pSyncNode->vgId; + + sTrace("new[%d]: %lu, %d, my: %lu, %d", i, newId.addr, newId.vgId, pSyncNode->myRaftId.addr, pSyncNode->myRaftId.vgId); + if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) { IamInNew = true; } } + + if (!IamInNew) { + sError("sync reconfig error, not in new config"); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return TAOS_SYNC_NOT_IN_NEW_CONFIG; } + char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); if (gRaftDetailLog) { sInfo("==syncReconfig== newconfig:%s", newconfig); From b1093b4a791709b28a1c64423f59f4c19f3ed023 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 13 Jun 2022 12:13:35 +0800 Subject: [PATCH 3/7] fix(sync): use fqdn instead of raftId --- source/libs/sync/src/syncMain.c | 25 +++++++++++------- source/libs/sync/test/CMakeLists.txt | 14 ++++++++++ source/libs/sync/test/syncRaftIdCheck.cpp | 32 +++++++++++++++++++++++ 3 files changed, 61 insertions(+), 10 deletions(-) create mode 100644 source/libs/sync/test/syncRaftIdCheck.cpp diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index fa61bf1b08..82fde80be2 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -35,7 +35,7 @@ #include "syncVoteMgr.h" #include "tref.h" -bool gRaftDetailLog = false; +bool gRaftDetailLog = true; static int32_t tsNodeRefId = -1; @@ -216,25 +216,30 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { bool IamInNew = false; for (int i = 0; i < pNewCfg->replicaNum; ++i) { - SRaftId newId; - newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort); - newId.vgId = pSyncNode->vgId; - - sTrace("new[%d]: %lu, %d, my: %lu, %d", i, newId.addr, newId.vgId, pSyncNode->myRaftId.addr, pSyncNode->myRaftId.vgId); - - if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) { + if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && + (pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { IamInNew = true; } + + /* + // some problem in inet_addr + + SRaftId newId = EMPTY_RAFT_ID; + newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort); + newId.vgId = pSyncNode->vgId; + + if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) { + IamInNew = true; + } + */ } - if (!IamInNew) { sError("sync reconfig error, not in new config"); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return TAOS_SYNC_NOT_IN_NEW_CONFIG; } - char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); if (gRaftDetailLog) { sInfo("==syncReconfig== newconfig:%s", newconfig); diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index d39035ba53..725343e373 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -1,4 +1,5 @@ add_executable(syncTest "") +add_executable(syncRaftIdCheck "") add_executable(syncEnvTest "") add_executable(syncPingTimerTest "") add_executable(syncIOTickQTest "") @@ -54,6 +55,10 @@ target_sources(syncTest PRIVATE "syncTest.cpp" ) +target_sources(syncRaftIdCheck + PRIVATE + "syncRaftIdCheck.cpp" +) target_sources(syncEnvTest PRIVATE "syncEnvTest.cpp" @@ -257,6 +262,11 @@ target_include_directories(syncTest "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncRaftIdCheck + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_include_directories(syncEnvTest PUBLIC "${TD_SOURCE_DIR}/include/libs/sync" @@ -508,6 +518,10 @@ target_link_libraries(syncTest sync gtest_main ) +target_link_libraries(syncRaftIdCheck + sync + gtest_main +) target_link_libraries(syncEnvTest sync gtest_main diff --git a/source/libs/sync/test/syncRaftIdCheck.cpp b/source/libs/sync/test/syncRaftIdCheck.cpp new file mode 100644 index 0000000000..f23c0db82b --- /dev/null +++ b/source/libs/sync/test/syncRaftIdCheck.cpp @@ -0,0 +1,32 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncUtil.h" + +void usage(char* exe) { + printf("Usage: %s host port \n", exe); + printf("Usage: %s u64 \n", exe); +} + +int main(int argc, char** argv) { + if (argc == 2) { + uint64_t u64 = atoll(argv[1]); + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + printf("%s:%d \n", host, port); + + } else if (argc == 3) { + uint64_t u64; + char* host = argv[1]; + uint16_t port = atoi(argv[2]); + syncUtilAddr2U64(host, port); + printf("%lu \n", u64); + } else { + usage(argv[0]); + exit(-1); + } + + return 0; +} From f47a28fe8a487329a17686f762a7e54c38f0c373 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 13 Jun 2022 15:47:43 +0800 Subject: [PATCH 4/7] enh(sync): add leader transfer --- include/libs/sync/sync.h | 1 + include/libs/sync/syncTools.h | 1 + source/libs/sync/src/syncMain.c | 68 ++++++++++++++++++++++---- source/libs/sync/test/syncTestTool.cpp | 40 +++++++++++---- 4 files changed, 92 insertions(+), 18 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 2673bc382b..a369b81d26 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -111,6 +111,7 @@ typedef struct SSyncFSM { void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta); + void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 68a33d48cb..fbf124bf47 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -467,6 +467,7 @@ typedef struct SyncLeaderTransfer { SRaftId srcId; SRaftId destId; */ + SNodeInfo newNodeInfo; SRaftId newLeaderId; } SyncLeaderTransfer; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 82fde80be2..904cdae94e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -35,7 +35,7 @@ #include "syncVoteMgr.h" #include "tref.h" -bool gRaftDetailLog = true; +bool gRaftDetailLog = false; static int32_t tsNodeRefId = -1; @@ -183,13 +183,21 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg int32_t ret = 0; bool IamInNew = false; for (int i = 0; i < pNewCfg->replicaNum; ++i) { - SRaftId newId; - newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort); - newId.vgId = pSyncNode->vgId; - if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) { + if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && + (pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { IamInNew = true; } + + /* + SRaftId newId; + newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort); + newId.vgId = pSyncNode->vgId; + if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) { + IamInNew = true; + } + */ } + if (!IamInNew) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); return TAOS_SYNC_NOT_IN_NEW_CONFIG; @@ -267,7 +275,7 @@ int32_t syncLeaderTransfer(int64_t rid) { } ASSERT(rid == pSyncNode->rid); - if (pSyncNode->peersNum > 0) { + if (pSyncNode->peersNum == 0) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); return TAOS_SYNC_OTHER_ERROR; } @@ -296,6 +304,7 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId); pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort); pMsg->newLeaderId.vgId = pSyncNode->vgId; + pMsg->newNodeInfo = newLeader; ASSERT(pMsg != NULL); SRpcMsg rpcMsg = {0}; syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg); @@ -1887,10 +1896,51 @@ const char* syncStr(ESyncState state) { } static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { - SyncLeaderTransfer* pSyncLeaderTransfer; - if (syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId))) { + SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); + +/* + char host[128]; + uint16_t port; + syncUtilU642Addr(pSyncLeaderTransfer->newLeaderId.addr, host, sizeof(host), &port); + sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %lu", ths->vgId, host, port, + pSyncLeaderTransfer->newLeaderId.addr); +*/ + + sDebug("vgId:%d sync event, begin leader transfer", ths->vgId); + + if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { + sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %lu", ths->vgId, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort, + pSyncLeaderTransfer->newLeaderId.addr); + + // reset elect timer now! + int32_t electMS = 1; + int32_t ret = syncNodeRestartElectTimer(ths, electMS); + ASSERT(ret == 0); } + +/* + if (syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId))) { + // reset elect timer now! + int32_t electMS = 1; + int32_t ret = syncNodeRestartElectTimer(ths, electMS); + ASSERT(ret == 0); + } +*/ + if (ths->pFsm->FpLeaderTransferCb != NULL) { + SFsmCbMeta cbMeta; + cbMeta.code = 0; + cbMeta.currentTerm = ths->pRaftStore->currentTerm; + cbMeta.flag = 0; + cbMeta.index = pEntry->index; + cbMeta.isWeak = pEntry->isWeak; + cbMeta.seqNum = pEntry->seqNum; + cbMeta.state = ths->state; + cbMeta.term = pEntry->term; + ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, cbMeta); + } + + syncLeaderTransferDestroy(pSyncLeaderTransfer); return 0; } @@ -1992,7 +2042,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ASSERT(code == 0); } - // config change + // leader transfer if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) { code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry); ASSERT(code == 0); diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 0c8b26e9d9..ebcd7368cc 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -153,6 +153,16 @@ void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMe taosMemoryFree(s); } +void LeaderTransferCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { + char logBuf[256] = {0}; + snprintf(logBuf, sizeof(logBuf), + "==callback== ==LeaderTransferCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, flag:%lu, term:%lu " + "currentTerm:%lu \n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), + cbMeta.flag, cbMeta.term, cbMeta.currentTerm); + syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); +} + SSyncFSM* createFsm() { SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); memset(pFsm, 0, sizeof(*pFsm)); @@ -172,6 +182,8 @@ SSyncFSM* createFsm() { pFsm->FpSnapshotStopWrite = SnapshotStopWrite; pFsm->FpSnapshotDoWrite = SnapshotDoWrite; + pFsm->FpLeaderTransferCb = LeaderTransferCb; + return pFsm; } @@ -277,7 +289,8 @@ void usage(char* exe) { printf( "usage: %s replicaNum(1-5) myIndex(0-..) enableSnapshot(0/1) lastApplyIndex(>=-1) lastApplyTerm(>=0) " "writeRecordNum(>=0) " - "isStandBy(0/1) isConfigChange(0-5) iterTimes(>=0) finishLastApplyIndex(>=-1) finishLastApplyTerm(>=0) \n", + "isStandBy(0/1) isConfigChange(0-5) iterTimes(>=0) finishLastApplyIndex(>=-1) finishLastApplyTerm(>=0) " + "leaderTransfer(0/1) \n", exe); } @@ -294,9 +307,9 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { int main(int argc, char** argv) { sprintf(tsTempDir, "%s", "."); tsAsyncLog = 0; - sDebugFlag = DEBUG_SCREEN + DEBUG_FILE + DEBUG_TRACE + DEBUG_INFO + DEBUG_ERROR; + sDebugFlag = DEBUG_SCREEN + DEBUG_FILE + DEBUG_TRACE + DEBUG_INFO + DEBUG_ERROR + DEBUG_DEBUG; - if (argc != 12) { + if (argc != 13) { usage(argv[0]); exit(-1); } @@ -312,12 +325,14 @@ int main(int argc, char** argv) { int32_t iterTimes = atoi(argv[9]); int32_t finishLastApplyIndex = atoi(argv[10]); int32_t finishLastApplyTerm = atoi(argv[11]); + int32_t leaderTransfer = atoi(argv[12]); - sTrace( + sInfo( "args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, " - "isStandBy:%d, isConfigChange:%d, iterTimes:%d, finishLastApplyIndex:%d, finishLastApplyTerm:%d", + "isStandBy:%d, isConfigChange:%d, iterTimes:%d, finishLastApplyIndex:%d, finishLastApplyTerm:%d, " + "leaderTransfer:%d", replicaNum, myIndex, enableSnapshot, lastApplyIndex, lastApplyTerm, writeRecordNum, isStandBy, isConfigChange, - iterTimes, finishLastApplyIndex, finishLastApplyTerm); + iterTimes, finishLastApplyIndex, finishLastApplyTerm, leaderTransfer); // check parameter assert(replicaNum >= 1 && replicaNum <= 5); @@ -363,24 +378,31 @@ int main(int argc, char** argv) { //--------------------------- int32_t alreadySend = 0; + int32_t leaderTransferWait = 0; while (1) { char* simpleStr = syncNode2SimpleStr(pSyncNode); + leaderTransferWait++; + if (leaderTransferWait == 7) { + sTrace("begin leader transfer ..."); + int32_t ret = syncLeaderTransfer(rid); + } + if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); int32_t ret = syncPropose(rid, pRpcMsg, false); if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { - sTrace("%s value%d write not leader", simpleStr, alreadySend); + sTrace("%s value%d write not leader, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait); } else { assert(ret == 0); - sTrace("%s value%d write ok", simpleStr, alreadySend); + sTrace("%s value%d write ok, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait); } alreadySend++; rpcFreeCont(pRpcMsg->pCont); taosMemoryFree(pRpcMsg); } else { - sTrace("%s", simpleStr); + sTrace("%s, leaderTransferWait:%d", simpleStr, leaderTransferWait); } taosMsleep(1000); From 3887e940f14c12e77f556f2fd04654417e8652be Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 14 Jun 2022 10:24:05 +0800 Subject: [PATCH 5/7] fix(sync): when receive snapshot rsp from a dropped replica --- source/libs/sync/src/syncMain.c | 21 ++++++++++++--------- source/libs/sync/src/syncSnapshot.c | 6 ++++++ 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 94bc755f4e..12d09ce26c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1968,8 +1968,9 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE bool isDrop; if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) { - syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop); + syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop); + // change isStandBy to normal if (!isDrop) { if (ths->state == TAOS_SYNC_STATE_LEADER) { @@ -1978,14 +1979,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE syncNodeBecomeFollower(ths, "config change"); } } - - if (gRaftDetailLog) { - char* sOld = syncCfg2Str(&oldSyncCfg); - char* sNew = syncCfg2Str(&newSyncCfg); - sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop); - taosMemoryFree(sOld); - taosMemoryFree(sNew); - } + } else { + syncNodeBecomeFollower(ths, "config change"); + } + + if (gRaftDetailLog) { + char* sOld = syncCfg2Str(&oldSyncCfg); + char* sNew = syncCfg2Str(&newSyncCfg); + sInfo("==config change== 0x11 old:%s new:%s isDrop:%d index:%ld IamInNew:%d \n", sOld, sNew, isDrop, pEntry->index, IamInNew); + taosMemoryFree(sOld); + taosMemoryFree(sNew); } // always call FpReConfigCb diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 38742220fe..9c43c36af3 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -755,6 +755,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // sender receives ack, set seq = ack + 1, send msg from seq // if ack == SYNC_SNAPSHOT_SEQ_END, stop sender int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { + // if already drop replica, do not process + if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + sInfo("recv SyncSnapshotRsp maybe replica already dropped"); + return 0; + } + // get sender SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId)); ASSERT(pSender != NULL); From 5268b5233aa772c4304c11b970d0752b283ab9ba Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 14 Jun 2022 14:40:06 +0800 Subject: [PATCH 6/7] refactor(sync): update replica index in snapshot sender --- source/libs/sync/src/syncMain.c | 50 ++++++++++++++++++----------- source/libs/sync/src/syncSnapshot.c | 8 +++-- 2 files changed, 37 insertions(+), 21 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 8be8c14b8f..feced0afdf 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -561,7 +561,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) stub.createTime = taosGetTimestampMs(); stub.rpcMsg = *pMsg; uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); - sDebug("vgId:%d, sync event propose, type:%s seq:%" PRIu64 " handle:%p", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), + sDebug("vgId:%d sync event propose, type:%s seq:%" PRIu64 " handle:%p", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), seqNum, pMsg->info.handle); SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId); @@ -1244,7 +1244,13 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l SRaftId oldReplicasId[TSDB_MAX_REPLICA]; memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId)); SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; - memcpy(oldSenders, pSyncNode->senders, sizeof(oldSenders)); + for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + oldSenders[i] = (pSyncNode->senders)[i]; + sDebug("vgId:%d sync event save senders %d, %p", pSyncNode->vgId, i, oldSenders[i]); + if (gRaftDetailLog) { + ; + } + } // init internal pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; @@ -1286,22 +1292,24 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l // reset new for (int i = 0; i < pSyncNode->replicaNum; ++i) { // reset sender + bool reset = false; for (int j = 0; j < TSDB_MAX_REPLICA; ++j) { if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) { char host[128]; uint16_t port; syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); - sDebug("vgId:%d sync event reset sender for %lu, %s:%d", pSyncNode->vgId, (pSyncNode->replicasId)[i].addr, host, - port); + sDebug("vgId:%d sync event reset sender for %lu, newIndex:%d, %s:%d, %p", pSyncNode->vgId, + (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]); (pSyncNode->senders)[i] = oldSenders[j]; oldSenders[j] = NULL; + reset = true; + + // reset replicaIndex + int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; + (pSyncNode->senders)[i]->replicaIndex = i; + sDebug("vgId:%d sync event udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", pSyncNode->vgId, + oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); } - - // reset replicaIndex - int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; - (pSyncNode->senders)[i]->replicaIndex = i; - - sDebug("vgId:%d sync event udpate replicaIndex from %d to %d", pSyncNode->vgId, oldreplicaIndex, i); } } @@ -1309,7 +1317,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if ((pSyncNode->senders)[i] == NULL) { (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); - sDebug("vgId:%d sync event create new sender replicaIndex:%d", pSyncNode->vgId, i); + sDebug("vgId:%d sync event create new sender %p replicaIndex:%d", pSyncNode->vgId, (pSyncNode->senders)[i], i); } } @@ -1317,8 +1325,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if (oldSenders[i] != NULL) { snapshotSenderDestroy(oldSenders[i]); + sDebug("vgId:%d sync event delete old sender %p replicaIndex:%d", pSyncNode->vgId, oldSenders[i], i); oldSenders[i] = NULL; - sDebug("vgId:%d sync event delete old sender replicaIndex:%d", pSyncNode->vgId, i); } } @@ -1378,8 +1386,8 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { } void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { - sDebug("vgId:%d sync event become follower, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, - debugStr); + sDebug("vgId:%d sync event become follower, isStandBy:%d, replicaNum:%d, %s", pSyncNode->vgId, + pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr); // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -1413,8 +1421,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // /\ UNCHANGED <> // void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { - sDebug("vgId:%d sync event become leader, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, - debugStr); + sDebug("vgId:%d sync event become leader, isStandBy:%d, replicaNum:%d %s", pSyncNode->vgId, + pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr); // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; @@ -2028,14 +2036,18 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE // change isStandBy to normal if (!isDrop) { + char tmpbuf[128]; + snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum); if (ths->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(ths, "config change"); + syncNodeBecomeLeader(ths, tmpbuf); } else { - syncNodeBecomeFollower(ths, "config change"); + syncNodeBecomeFollower(ths, tmpbuf); } } } else { - syncNodeBecomeFollower(ths, "config change2"); + char tmpbuf[128]; + snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum); + syncNodeBecomeFollower(ths, tmpbuf); } if (gRaftDetailLog) { diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index d21a1e96e9..afebfa798e 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -588,6 +588,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // maybe update lastconfig if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { + int32_t oldReplicaNum = pSyncNode->replicaNum; + // update new config myIndex bool IamInNew = false; SSyncCfg newSyncCfg = pMsg->lastConfig; @@ -614,10 +616,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // change isStandBy to normal if (!isDrop) { + char tmpbuf[128]; + snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d", oldReplicaNum, newSyncCfg.replicaNum); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(pSyncNode, "config change3"); + syncNodeBecomeLeader(pSyncNode, tmpbuf); } else { - syncNodeBecomeFollower(pSyncNode, "config change3"); + syncNodeBecomeFollower(pSyncNode, tmpbuf); } } } From fa25fa32e0c56c2f3788493452e9ae77cf8cc755 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 14 Jun 2022 14:54:11 +0800 Subject: [PATCH 7/7] refactor(sync): add test syncRaftIdChec --- source/libs/sync/test/syncRaftIdCheck.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/sync/test/syncRaftIdCheck.cpp b/source/libs/sync/test/syncRaftIdCheck.cpp index f23c0db82b..90560e91e7 100644 --- a/source/libs/sync/test/syncRaftIdCheck.cpp +++ b/source/libs/sync/test/syncRaftIdCheck.cpp @@ -15,14 +15,14 @@ int main(int argc, char** argv) { char host[128]; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); - printf("%s:%d \n", host, port); + printf("%lu -> %s:%d \n", u64, host, port); } else if (argc == 3) { uint64_t u64; char* host = argv[1]; uint16_t port = atoi(argv[2]); - syncUtilAddr2U64(host, port); - printf("%lu \n", u64); + u64 = syncUtilAddr2U64(host, port); + printf("%s:%d -> %lu \n", host, port, u64); } else { usage(argv[0]); exit(-1);