From e350f71396b91ff1905ea666c03fe9e450499a3b Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 14 Mar 2023 19:45:31 +0800 Subject: [PATCH 1/4] enh: reset elect timer at the end of callbacks --- source/libs/sync/src/syncAppendEntries.c | 6 +++--- source/libs/sync/src/syncElection.c | 1 - source/libs/sync/src/syncMain.c | 22 ++++++++-------------- source/libs/sync/src/syncRequestVote.c | 5 ++++- source/libs/sync/src/syncSnapshot.c | 2 +- 5 files changed, 16 insertions(+), 20 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index bd1dae54d9..9ab545075c 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -105,6 +105,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SRpcMsg rpcRsp = {0}; bool accepted = false; SSyncRaftEntry* pEntry = NULL; + bool resetElect = false; // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { @@ -137,7 +138,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } syncNodeStepDown(ths, pMsg->term); - syncNodeResetElectTimer(ths); + resetElect = true; if (pMsg->dataLen < sizeof(SSyncRaftEntry)) { sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d", @@ -184,10 +185,9 @@ _SEND_RESPONSE: // commit index, i.e. leader notice me if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { sError("vgId:%d, failed to commit raft fsm log since %s.", ths->vgId, terrstr()); - goto _out; } -_out: + if (resetElect) syncNodeResetElectTimer(ths); return 0; _IGNORE: diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index e53b8ade1c..3699efbc59 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -115,6 +115,5 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ASSERT(ret == 0); syncNodeResetElectTimer(pSyncNode); - return ret; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e5fe4a2369..ed97c8abde 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1593,8 +1593,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; syncNodeStopHeartbeatTimer(pSyncNode); - // reset elect timer - syncNodeResetElectTimer(pSyncNode); + // trace log + sNTrace(pSyncNode, "become follower %s", debugStr); // send rsp to client syncNodeLeaderChangeRsp(pSyncNode); @@ -1610,8 +1610,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // reset log buffer syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); - // trace log - sNTrace(pSyncNode, "become follower %s", debugStr); + // reset elect timer + syncNodeResetElectTimer(pSyncNode); } // TLA+ Spec @@ -2277,6 +2277,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) { int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncHeartbeat* pMsg = pRpcMsg->pCont; + bool resetElect = false; const STraceId* trace = &pRpcMsg->info.traceId; char tbuf[40] = {0}; @@ -2300,12 +2301,11 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (pMsg->term == currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs); + resetElect = true; - syncNodeResetElectTimer(ths); ths->minMatchIndex = pMsg->minMatchIndex; if (ths->state == TAOS_SYNC_STATE_FOLLOWER) { - // syncNodeFollowerCommit(ths, pMsg->commitIndex); SRpcMsg rpcMsgLocalCmd = {0}; (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); @@ -2328,7 +2328,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } if (pMsg->term >= currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { - // syncNodeStepDown(ths, pMsg->term); SRpcMsg rpcMsgLocalCmd = {0}; (void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); @@ -2348,15 +2347,10 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } } - /* - // htonl - SMsgHead* pHead = rpcMsg.pCont; - pHead->contLen = htonl(pHead->contLen); - pHead->vgId = htonl(pHead->vgId); - */ - // reply syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + + if (resetElect) syncNodeResetElectTimer(ths); return 0; } diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 2fda2a19b8..40b747b711 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -89,6 +89,7 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* ths, SyncRequestVote* pMsg) { int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncRequestVote* pMsg = pRpcMsg->pCont; + bool resetElect = false; // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) { @@ -115,7 +116,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncNodeStepDown(ths, currentTerm); // forbid elect for this round - syncNodeResetElectTimer(ths); + resetElect = true; } // send msg @@ -134,5 +135,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, ""); syncLogSendRequestVoteReply(ths, pReply, ""); syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + + if (resetElect) syncNodeResetElectTimer(ths); return 0; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 413d6adf05..a9d0ddad17 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -798,7 +798,6 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { if (pMsg->term > raftStoreGetTerm(pSyncNode)) { syncNodeStepDown(pSyncNode, pMsg->term); } - syncNodeResetElectTimer(pSyncNode); // state, term, seq/ack int32_t code = 0; @@ -840,6 +839,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { code = -1; } + syncNodeResetElectTimer(pSyncNode); return code; } From cc76d78673e50c04db8613001763bbb8bba2c369 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 14 Mar 2023 20:35:39 +0800 Subject: [PATCH 2/4] enh: separate sync read and write events --- include/common/tmsgcb.h | 2 +- source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 4 ++-- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 17 +++++++++-------- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 16 ++++++++-------- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 4 ++-- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 16 ++++++++-------- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 6 +++--- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 20 +++++++++----------- source/dnode/mnode/impl/src/mndSync.c | 2 +- source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- 10 files changed, 44 insertions(+), 45 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index eca8740d28..9b709272b2 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -34,7 +34,7 @@ typedef enum { WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, - SYNC_CTRL_QUEUE, + SYNC_RD_QUEUE, STREAM_QUEUE, QUEUE_MAX, } EQueueType; diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index b47742b4ed..24f75e3c8b 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -34,7 +34,7 @@ typedef struct SMnodeMgmt { SSingleWorker readWorker; SSingleWorker writeWorker; SSingleWorker syncWorker; - SSingleWorker syncCtrlWorker; + SSingleWorker syncRdWorker; bool stopped; int32_t refCount; TdThreadRwlock lock; @@ -54,7 +54,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt); int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutMsgToSyncCtrlQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index fe65c3dde9..c129ea21ac 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -188,21 +188,22 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; + + if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index b0810d528f..0152e5d0b1 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -111,8 +111,8 @@ int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg); } -int32_t mmPutMsgToSyncCtrlQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutMsgToWorker(pMgmt, &pMgmt->syncCtrlWorker, pMsg); +int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutMsgToWorker(pMgmt, &pMgmt->syncRdWorker, pMsg); } int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { @@ -151,8 +151,8 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { case SYNC_QUEUE: pWorker = &pMgmt->syncWorker; break; - case SYNC_CTRL_QUEUE: - pWorker = &pMgmt->syncCtrlWorker; + case SYNC_RD_QUEUE: + pWorker = &pMgmt->syncRdWorker; break; default: terrno = TSDB_CODE_INVALID_PARA; @@ -238,12 +238,12 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { SSingleWorkerCfg scCfg = { .min = 1, .max = 1, - .name = "mnode-sync-ctrl", + .name = "mnode-sync-rd", .fp = (FItem)mmProcessSyncMsg, .param = pMgmt, }; - if (tSingleWorkerInit(&pMgmt->syncCtrlWorker, &scCfg) != 0) { - dError("failed to start mnode mnode-sync-ctrl worker since %s", terrstr()); + if (tSingleWorkerInit(&pMgmt->syncRdWorker, &scCfg) != 0) { + dError("failed to start mnode mnode-sync-rd worker since %s", terrstr()); return -1; } @@ -259,6 +259,6 @@ void mmStopWorker(SMnodeMgmt *pMgmt) { tSingleWorkerCleanup(&pMgmt->readWorker); tSingleWorkerCleanup(&pMgmt->writeWorker); tSingleWorkerCleanup(&pMgmt->syncWorker); - tSingleWorkerCleanup(&pMgmt->syncCtrlWorker); + tSingleWorkerCleanup(&pMgmt->syncRdWorker); dDebug("mnode workers are closed"); } diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index e3fa2964b7..4374ae363c 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -59,7 +59,7 @@ typedef struct { SVnode *pImpl; SMultiWorker pWriteW; SMultiWorker pSyncW; - SMultiWorker pSyncCtrlW; + SMultiWorker pSyncRdW; SMultiWorker pApplyW; STaosQueue *pQueryQ; STaosQueue *pStreamQ; @@ -107,7 +107,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc); int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToSyncRdQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d7f91b74a8..b60d2c3caf 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -549,22 +549,22 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 0524e2713a..4c5b1246e7 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -98,9 +98,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) pVnode->pSyncW.queue->threadId); tMultiWorkerCleanup(&pVnode->pSyncW); - dInfo("vgId:%d, wait for vnode sync ctrl queue:%p is empty, thread:%08" PRId64, pVnode->vgId, - pVnode->pSyncCtrlW.queue, pVnode->pSyncCtrlW.queue->threadId); - tMultiWorkerCleanup(&pVnode->pSyncCtrlW); + dInfo("vgId:%d, wait for vnode sync rd queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue, + pVnode->pSyncRdW.queue->threadId); + tMultiWorkerCleanup(&pVnode->pSyncRdW); dInfo("vgId:%d, wait for vnode apply queue:%p is empty, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue, pVnode->pApplyW.queue->threadId); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 7aa1c9f56a..e4e0d608de 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -216,9 +216,9 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg); taosWriteQitem(pVnode->pSyncW.queue, pMsg); break; - case SYNC_CTRL_QUEUE: - dGTrace("vgId:%d, msg:%p put into vnode-sync-ctrl queue", pVnode->vgId, pMsg); - taosWriteQitem(pVnode->pSyncCtrlW.queue, pMsg); + case SYNC_RD_QUEUE: + dGTrace("vgId:%d, msg:%p put into vnode-sync-rd queue", pVnode->vgId, pMsg); + taosWriteQitem(pVnode->pSyncRdW.queue, pMsg); break; case APPLY_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg); @@ -234,9 +234,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp return code; } -int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE); -} +int32_t vmPutMsgToSyncRdQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_RD_QUEUE); } int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); } @@ -327,18 +325,18 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { SMultiWorkerCfg wcfg = {.max = 1, .name = "vnode-write", .fp = (FItems)vnodeProposeWriteMsg, .param = pVnode->pImpl}; SMultiWorkerCfg scfg = {.max = 1, .name = "vnode-sync", .fp = (FItems)vmProcessSyncQueue, .param = pVnode}; - SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-ctrl", .fp = (FItems)vmProcessSyncQueue, .param = pVnode}; + SMultiWorkerCfg sccfg = {.max = 1, .name = "vnode-sync-rd", .fp = (FItems)vmProcessSyncQueue, .param = pVnode}; SMultiWorkerCfg acfg = {.max = 1, .name = "vnode-apply", .fp = (FItems)vnodeApplyWriteMsg, .param = pVnode->pImpl}; (void)tMultiWorkerInit(&pVnode->pWriteW, &wcfg); (void)tMultiWorkerInit(&pVnode->pSyncW, &scfg); - (void)tMultiWorkerInit(&pVnode->pSyncCtrlW, &sccfg); + (void)tMultiWorkerInit(&pVnode->pSyncRdW, &sccfg); (void)tMultiWorkerInit(&pVnode->pApplyW, &acfg); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue); pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue); - if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncCtrlW.queue == NULL || + if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL || pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -348,8 +346,8 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteW.queue->threadId); dInfo("vgId:%d, sync-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue, pVnode->pSyncW.queue->threadId); - dInfo("vgId:%d, sync-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncCtrlW.queue, - pVnode->pSyncCtrlW.queue->threadId); + dInfo("vgId:%d, sync-rd-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue, + pVnode->pSyncRdW.queue->threadId); dInfo("vgId:%d, apply-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue, pVnode->pApplyW.queue->threadId); dInfo("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 18548db56f..4965d5c34a 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -33,7 +33,7 @@ static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return -1; } - int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg); + int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg); if (code != 0) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index b49ca70bfa..d681f5b65e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -378,7 +378,7 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return -1; } - int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg); + int32_t code = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg); if (code != 0) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; From 9f97162ef77956cfa3feb304d815e2867b60d589 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 14 Mar 2023 21:02:06 +0800 Subject: [PATCH 3/4] enh: separate election timer events --- include/common/tmsgdef.h | 2 +- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 3 ++- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 3 ++- source/libs/sync/src/syncMain.c | 3 +++ source/libs/sync/src/syncMessage.c | 4 ++-- source/libs/sync/src/syncTimeout.c | 5 ----- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 46ca814e50..96d18d1abc 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -259,7 +259,7 @@ enum { TD_NEW_MSG_SEG(TDMT_SYNC_MSG) TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL) // no longer used + TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT_ELECTION, "sync-elect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_BATCH, "sync-client-request-batch", NULL, NULL) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index c129ea21ac..ce485cb6ca 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -188,17 +188,18 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index b60d2c3caf..84dfab7e6c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -549,10 +549,12 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; @@ -562,7 +564,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ed97c8abde..a617a5d293 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -182,6 +182,9 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { case TDMT_SYNC_TIMEOUT: code = syncNodeOnTimeout(pSyncNode, pMsg); break; + case TDMT_SYNC_TIMEOUT_ELECTION: + code = syncNodeOnTimeout(pSyncNode, pMsg); + break; case TDMT_SYNC_CLIENT_REQUEST: code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL); break; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 2a44588eef..72c8887803 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -22,7 +22,7 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l SSyncNode* pNode) { int32_t bytes = sizeof(SyncTimeout); pMsg->pCont = rpcMallocCont(bytes); - pMsg->msgType = TDMT_SYNC_TIMEOUT; + pMsg->msgType = (timeoutType == SYNC_TIMEOUT_ELECTION) ? TDMT_SYNC_TIMEOUT_ELECTION : TDMT_SYNC_TIMEOUT; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -31,7 +31,7 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l SyncTimeout* pTimeout = pMsg->pCont; pTimeout->bytes = bytes; - pTimeout->msgType = TDMT_SYNC_TIMEOUT; + pTimeout->msgType = pMsg->msgType; pTimeout->vgId = pNode->vgId; pTimeout->timeoutType = timeoutType; pTimeout->logicClock = logicClock; diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 859183db95..a27be2853e 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -120,9 +120,6 @@ int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pRpc) { if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) { ++(ths->pingTimerCounter); - // syncNodePingAll(ths); - // syncNodePingPeers(ths); - syncNodeTimerRoutine(ths); } @@ -138,8 +135,6 @@ int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pRpc) { ++(ths->heartbeatTimerCounter); sTrace("vgId:%d, sync timer, type:replicate count:%" PRIu64 ", lc-user:%" PRIu64, ths->vgId, ths->heartbeatTimerCounter, ths->heartbeatTimerLogicClockUser); - - // syncNodeReplicate(ths, true); } } else { From 78ab9d3bf9a8f38aecec1eb3eb20c61139e2fe58 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 28 Mar 2023 17:11:40 +0800 Subject: [PATCH 4/4] fix: process appendLog replies in vnode-sync thread --- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 4 ++-- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index ce485cb6ca..c2c9e37c8c 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -192,16 +192,16 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 84dfab7e6c..28cb5d2058 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -553,17 +553,17 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;