From 249aecacda766ee5b2560ae5416985b201c80d69 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 21 May 2022 21:26:27 +0800 Subject: [PATCH 01/22] enh(sync) sync/mnode integration --- include/common/tmsgdef.h | 2 + include/dnode/mnode/mnode.h | 3 + source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 2 + source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 5 + source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 42 ++++++++ source/dnode/mnode/impl/inc/mndInt.h | 4 +- source/dnode/mnode/impl/src/mndSync.c | 107 +++++++++++++++++++- source/dnode/mnode/impl/src/mndTrans.c | 5 +- source/dnode/mnode/impl/src/mnode.c | 20 +++- source/libs/sync/src/syncMain.c | 19 ++-- 10 files changed, 193 insertions(+), 16 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 455898585a..bf683fc9ac 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -158,6 +158,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "mnode-drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL) + + TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL) // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 28c470a443..16cb4cdd70 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -90,6 +90,9 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); */ int32_t mndProcessMsg(SRpcMsg *pMsg); + +int32_t mndProcessApplyMsg(SRpcMsg *pMsg); + /** * @brief Generate machine code */ diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 75e83d6547..380ae63b8d 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -33,6 +33,7 @@ typedef struct SMnodeMgmt { SSingleWorker readWorker; SSingleWorker writeWorker; SSingleWorker syncWorker; + SSingleWorker applyWorker; SSingleWorker monitorWorker; SReplica replicas[TSDB_MAX_REPLICA]; int8_t replica; @@ -59,6 +60,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt); int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutNodeMsgToApplyQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 4f7fd4a1c0..68b2889278 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -122,6 +122,11 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { return -1; } + if (syncInit() != 0) { + dError("failed to init sync since %s", terrstr()); + return -1; + } + SMnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SMnodeMgmt)); if (pMgmt == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index c4314a57b1..04fe891ce9 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -56,6 +56,32 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } + +static void mmProcessApplyQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { + SMnodeMgmt *pMgmt = pInfo->ahandle; + int32_t code = -1; + tmsg_t msgType = pMsg->msgType; + bool isRequest = msgType & 1U; + dTrace("msg:%p, get from mnode-query queue", pMsg); + + pMsg->info.node = pMgmt->pMnode; + + mndProcessApplyMsg(pMsg); + + /* + if (isRequest) { + if (pMsg->info.handle != NULL && code != 0) { + if (code != 0 && terrno != 0) code = terrno; + mmSendRsp(pMsg, code); + } + } + */ + + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + static void mmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; @@ -92,6 +118,10 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); } +int32_t mmPutNodeMsgToApplyQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutNodeMsgToWorker(&pMgmt->applyWorker, pMsg); +} + int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); } @@ -179,6 +209,18 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { return -1; } + SSingleWorkerCfg aCfg = { + .min = 1, + .max = 1, + .name = "mnode-apply", + .fp = (FItem)mmProcessApplyQueue, + .param = pMgmt, + }; + if (tSingleWorkerInit(&pMgmt->applyWorker, &aCfg) != 0) { + dError("failed to start mnode mnode-apply worker since %s", terrstr()); + return -1; + } + SSingleWorkerCfg mCfg = { .min = 1, .max = 1, diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 5258fa9e02..0982096d25 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -75,7 +75,9 @@ typedef struct { int32_t errCode; sem_t syncSem; SWal *pWal; - SSyncNode *pSyncNode; + //SSyncNode *pSyncNode; + int64_t sync; + ESyncState state; } SSyncMgmt; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 3dbe3241a7..e7144f5673 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -46,6 +46,12 @@ static void mndCloseWal(SMnode *pMnode) { } static int32_t mndRestoreWal(SMnode *pMnode) { + +// do nothing +return 0; + +#if 0 + SWal *pWal = pMnode->syncMgmt.pWal; SSdb *pSdb = pMnode->pSdb; int64_t lastSdbVer = sdbUpdateVer(pSdb, 0); @@ -114,6 +120,70 @@ static int32_t mndRestoreWal(SMnode *pMnode) { _OVER: walCloseReadHandle(pHandle); return code; + +#endif + +} + +int32_t mndSyncEqMsg(const SMsgCb* msgcb, SRpcMsg *pMsg) { + int32_t ret = 0; + if (msgcb->queueFps[SYNC_QUEUE] != NULL) { + tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); + } else { + mError("mndSyncEqMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); + } + return ret; +} + +int32_t mndSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { + int32_t ret = 0; + pMsg->info.noResp = 1; + tmsgSendReq(pEpSet, pMsg); + return ret; +} + +void mndSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + SyncIndex beginIndex = SYNC_INDEX_INVALID; + if (pFsm->FpGetSnapshot != NULL) { + SSnapshot snapshot; + pFsm->FpGetSnapshot(pFsm, &snapshot); + beginIndex = snapshot.lastApplyIndex; + } + + if (cbMeta.index > beginIndex) { + SMnode *pMnode = pFsm->data; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + + mndProcessApplyMsg((SRpcMsg*)pMsg); + //mmPutNodeMsgToApplyQueue(pMnode->pWrapper->pMgmt, pMsg); + + if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { + tsem_post(&pMgmt->syncSem); + } + } +} + +void mndSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + // strict consistent, do nothing +} + +void mndSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + // strict consistent, do nothing +} + +int32_t mndSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { + // snapshot + return 0; +} + +SSyncFSM *syncMnodeMakeFsm(SMnode *pMnode) { + SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM)); + pFsm->data = pMnode; + pFsm->FpCommitCb = mndSyncCommitCb; + pFsm->FpPreCommitCb = mndSyncPreCommitCb; + pFsm->FpRollBackCb = mndSyncRollBackCb; + pFsm->FpGetSnapshot = mndSyncGetSnapshotCb; + return pFsm; } int32_t mndInitSync(SMnode *pMnode) { @@ -133,7 +203,27 @@ int32_t mndInitSync(SMnode *pMnode) { if (pMnode->selfId == 1) { pMgmt->state = TAOS_SYNC_STATE_LEADER; } - pMgmt->pSyncNode = NULL; + + // pMgmt->pSyncNode = NULL; + SSyncInfo syncInfo; + syncInfo.vgId = 1; + SSyncCfg *pCfg = &(syncInfo.syncCfg); + pCfg->replicaNum = pMnode->replica; + pCfg->myIndex = pMnode->selfIndex; + for (int i = 0; i < pMnode->replica; ++i) { + snprintf((pCfg->nodeInfo)->nodeFqdn, sizeof((pCfg->nodeInfo)->nodeFqdn), "%s", (pMnode->replicas)[i].fqdn); + (pCfg->nodeInfo)->nodePort = (pMnode->replicas)[i].port; + } + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", pMnode->path); + syncInfo.pWal = pMnode->syncMgmt.pWal; + + syncInfo.pFsm = syncMnodeMakeFsm(pMnode); + syncInfo.FpSendMsg = mndSendMsg; + syncInfo.FpEqMsg = mndSyncEqMsg; + + pMnode->syncMgmt.sync = syncOpen(&syncInfo); + ASSERT(pMnode->syncMgmt.sync > 0); + return 0; } @@ -157,6 +247,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { SWal *pWal = pMnode->syncMgmt.pWal; SSdb *pSdb = pMnode->pSdb; +#if 0 int64_t ver = sdbUpdateVer(pSdb, 1); if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) { sdbUpdateVer(pSdb, -1); @@ -168,24 +259,32 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { walCommit(pWal, ver); walFsync(pWal, true); -#if 1 return 0; + #else + if (pMnode->replica == 1) return 0; SSyncMgmt *pMgmt = &pMnode->syncMgmt; pMgmt->errCode = 0; - SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)}; + //SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)}; + + SRpcMsg rpcMsg; + rpcMsg.code = TDMT_MND_APPLY_MSG; + rpcMsg.contLen = sdbGetRawTotalSize(pRaw); + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + memcpy(rpcMsg.pCont, pRaw, rpcMsg.contLen); bool isWeak = false; - int32_t code = syncPropose(pMgmt->pSyncNode, &buf, pMnode, isWeak); + int32_t code = syncPropose(pMgmt->sync, &rpcMsg, isWeak); if (code != 0) return code; tsem_wait(&pMgmt->syncSem); return pMgmt->errCode; #endif + } bool mndIsMaster(SMnode *pMnode) { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 35ecaa748e..f4fa4beaf1 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -683,11 +683,14 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, sync finished", pTrans->id); - code = sdbWrite(pMnode->pSdb, pRaw); +// do it in state machine commit cb +#if 0 + code = sdbWriteWithout(pMnode->pSdb, pRaw); if (code != 0) { mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); return -1; } +#endif return 0; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 8c805dd8c7..f8e5a65f0f 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -336,9 +336,25 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { return 0; } -int32_t mndStart(SMnode *pMnode) { return mndInitTimer(pMnode); } +int32_t mndStart(SMnode *pMnode) { + syncStart(pMnode->syncMgmt.sync); + return mndInitTimer(pMnode); +} -void mndStop(SMnode *pMnode) { return mndCleanupTimer(pMnode); } +void mndStop(SMnode *pMnode) { + syncStop(pMnode->syncMgmt.sync); + return mndCleanupTimer(pMnode); +} + +int32_t mndProcessApplyMsg(SRpcMsg *pMsg) { + + SSdbRaw *pRaw = pMsg->pCont; + SMnode *pMnode = pMsg->info.node; + int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw); + rpcFreeCont(pMsg->pCont); + + return code; +} int32_t mndProcessMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d9ff60bbe2..9c02b91ef0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -55,14 +55,17 @@ static void syncFreeNode(void* param); // --------------------------------- int32_t syncInit() { - int32_t ret; - tsNodeRefId = taosOpenRef(200, syncFreeNode); - if (tsNodeRefId < 0) { - sError("failed to init node ref"); - syncCleanUp(); - ret = -1; - } else { - ret = syncEnvStart(); + int32_t ret = 0; + + if (!syncEnvIsStart()) { + tsNodeRefId = taosOpenRef(200, syncFreeNode); + if (tsNodeRefId < 0) { + sError("failed to init node ref"); + syncCleanUp(); + ret = -1; + } else { + ret = syncEnvStart(); + } } return ret; From a1cd6839a7d0af955a5142bccccbd7a6e7f30848 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 21 May 2022 22:15:23 +0800 Subject: [PATCH 02/22] enh(sync) sync/mnode integration --- include/dnode/mnode/mnode.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 16cb4cdd70..ec4b4dd06d 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -90,6 +90,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); */ int32_t mndProcessMsg(SRpcMsg *pMsg); +int32_t mndProcessSyncMsg(SRpcMsg *pMsg); int32_t mndProcessApplyMsg(SRpcMsg *pMsg); From b20794f365058bbc6d6a7266530f046e6d81003b Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 21 May 2022 22:35:02 +0800 Subject: [PATCH 03/22] enh(sync) sync/mnode integration --- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 10 +++ source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 9 +- source/dnode/mnode/impl/src/mnode.c | 98 +++++++++++++++++++++ 3 files changed, 116 insertions(+), 1 deletion(-) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 2ce42d7a5f..a894a4962d 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -237,6 +237,16 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 04fe891ce9..8574e01226 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -56,6 +56,13 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } +static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { + SMnodeMgmt *pMgmt = pInfo->ahandle; + pMsg->info.node = pMgmt->pMnode; + + mndProcessSyncMsg(pMsg); + return; +} static void mmProcessApplyQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; @@ -201,7 +208,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = 1, .max = 1, .name = "mnode-sync", - .fp = (FItem)mmProcessQueue, + .fp = (FItem)mmProcessSyncQueue, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index f8e5a65f0f..23baa43e97 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -356,6 +356,104 @@ int32_t mndProcessApplyMsg(SRpcMsg *pMsg) { return code; } +#include "syncTools.h" + +int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { + SMnode *pMnode = pMsg->info.node; + void *ahandle = pMsg->info.ahandle; + + int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; + + if (syncEnvIsStart()) { + SSyncNode *pSyncNode = syncNodeAcquire(pMnode->syncMgmt.sync); + assert(pSyncNode != NULL); + + ESyncState state = syncGetMyRole(pMnode->syncMgmt.sync); + SyncTerm currentTerm = syncGetMyTerm(pMnode->syncMgmt.sync); + + SMsgHead *pHead = pMsg->pCont; + + char logBuf[512]; + char *syncNodeStr = sync2SimpleStr(pMnode->syncMgmt.sync); + snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); + syncRpcMsgLog2(logBuf, pMsg); + taosMemoryFree(syncNodeStr); + + SRpcMsg *pRpcMsg = pMsg; + + if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { + SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) { + SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { + SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { + SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); + syncClientRequestDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { + SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { + SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); + syncAppendEntriesDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); + + } else { + mError("==mndProcessSyncMsg== error msg type:%d", pRpcMsg->msgType); + ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; + } + + syncNodeRelease(pSyncNode); + } else { + mError("==mndProcessSyncMsg== error syncEnv stop"); + ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; + } + + return ret; + + + return 0; +} + int32_t mndProcessMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; void *ahandle = pMsg->info.ahandle; From e06f9fd05ce82b24398b00387dbf9426dd3bd1ee Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 22 May 2022 12:34:22 +0800 Subject: [PATCH 04/22] refactor: sync integrate into mnode --- include/common/tmsgdef.h | 1 - include/dnode/mnode/mnode.h | 2 - source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 19 +------- source/dnode/mnode/impl/inc/mndInt.h | 7 +-- source/dnode/mnode/impl/src/mndSync.c | 53 +++++++-------------- source/dnode/mnode/impl/src/mnode.c | 13 ++--- source/libs/sync/src/syncMain.c | 1 - 7 files changed, 25 insertions(+), 71 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index bf683fc9ac..e8e931daa5 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -158,7 +158,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "mnode-drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL) // Requests handled by VNODE diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index ec4b4dd06d..5b028d5055 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -89,9 +89,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); * @return int32_t 0 for success, -1 for failure. */ int32_t mndProcessMsg(SRpcMsg *pMsg); - int32_t mndProcessSyncMsg(SRpcMsg *pMsg); - int32_t mndProcessApplyMsg(SRpcMsg *pMsg); /** diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 89d657d2d5..0f1d325237 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -59,32 +59,17 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; pMsg->info.node = pMgmt->pMnode; - mndProcessSyncMsg(pMsg); - return; } static void mmProcessApplyQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; - int32_t code = -1; - tmsg_t msgType = pMsg->msgType; - bool isRequest = msgType & 1U; - dTrace("msg:%p, get from mnode-query queue", pMsg); + dTrace("msg:%p, get from mnode-apply queue", pMsg); pMsg->info.node = pMgmt->pMnode; - mndProcessApplyMsg(pMsg); - /* - if (isRequest) { - if (pMsg->info.handle != NULL && code != 0) { - if (code != 0 && terrno != 0) code = terrno; - mmSendRsp(pMsg, code); - } - } - */ - - dTrace("msg:%p, is freed, code:0x%x", pMsg, code); + dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 0982096d25..16945b1403 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -19,6 +19,7 @@ #include "mndDef.h" #include "sdb.h" +#include "syncTools.h" #include "tcache.h" #include "tdatablock.h" #include "tglobal.h" @@ -31,12 +32,14 @@ extern "C" { #endif +// clang-format off #define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} #define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} #define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} #define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }} #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} +// clang-format on #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) @@ -75,9 +78,7 @@ typedef struct { int32_t errCode; sem_t syncSem; SWal *pWal; - //SSyncNode *pSyncNode; - int64_t sync; - + int64_t sync; ESyncState state; } SSyncMgmt; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index e7144f5673..d8eb021462 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -125,24 +125,11 @@ _OVER: } -int32_t mndSyncEqMsg(const SMsgCb* msgcb, SRpcMsg *pMsg) { - int32_t ret = 0; - if (msgcb->queueFps[SYNC_QUEUE] != NULL) { - tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); - } else { - mError("mndSyncEqMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); - } - return ret; -} +int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } -int32_t mndSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { - int32_t ret = 0; - pMsg->info.noResp = 1; - tmsgSendReq(pEpSet, pMsg); - return ret; -} +int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } -void mndSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; if (pFsm->FpGetSnapshot != NULL) { SSnapshot snapshot; @@ -163,26 +150,26 @@ void mndSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMe } } -void mndSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { // strict consistent, do nothing } -void mndSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +void mndSyncRollBackMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { // strict consistent, do nothing } -int32_t mndSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { +int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { // snapshot return 0; } -SSyncFSM *syncMnodeMakeFsm(SMnode *pMnode) { - SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM)); +SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { + SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pMnode; - pFsm->FpCommitCb = mndSyncCommitCb; - pFsm->FpPreCommitCb = mndSyncPreCommitCb; - pFsm->FpRollBackCb = mndSyncRollBackCb; - pFsm->FpGetSnapshot = mndSyncGetSnapshotCb; + pFsm->FpCommitCb = mndSyncCommitMsg; + pFsm->FpPreCommitCb = mndSyncPreCommitMsg; + pFsm->FpRollBackCb = mndSyncRollBackMsg; + pFsm->FpGetSnapshot = mndSyncGetSnapshot; return pFsm; } @@ -195,18 +182,11 @@ int32_t mndInitSync(SMnode *pMnode) { return -1; } - if (mndRestoreWal(pMnode) < 0) { - mError("failed to restore wal since %s", terrstr()); - return -1; - } - if (pMnode->selfId == 1) { pMgmt->state = TAOS_SYNC_STATE_LEADER; } - - // pMgmt->pSyncNode = NULL; - SSyncInfo syncInfo; - syncInfo.vgId = 1; + + SSyncInfo syncInfo = {.vgId = 1}; SSyncCfg *pCfg = &(syncInfo.syncCfg); pCfg->replicaNum = pMnode->replica; pCfg->myIndex = pMnode->selfIndex; @@ -216,9 +196,8 @@ int32_t mndInitSync(SMnode *pMnode) { } snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", pMnode->path); syncInfo.pWal = pMnode->syncMgmt.pWal; - - syncInfo.pFsm = syncMnodeMakeFsm(pMnode); - syncInfo.FpSendMsg = mndSendMsg; + syncInfo.pFsm = mndSyncMakeFsm(pMnode); + syncInfo.FpSendMsg = mndSyncSendMsg; syncInfo.FpEqMsg = mndSyncEqMsg; pMnode->syncMgmt.sync = syncOpen(&syncInfo); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 02f4e0872c..297174beb7 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -343,25 +343,18 @@ int32_t mndStart(SMnode *pMnode) { void mndStop(SMnode *pMnode) { syncStop(pMnode->syncMgmt.sync); - return mndCleanupTimer(pMnode); + return mndCleanupTimer(pMnode); } int32_t mndProcessApplyMsg(SRpcMsg *pMsg) { - SSdbRaw *pRaw = pMsg->pCont; - SMnode *pMnode = pMsg->info.node; - int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw); - rpcFreeCont(pMsg->pCont); - - return code; + SMnode *pMnode = pMsg->info.node; + return sdbWriteWithoutFree(pMnode->pSdb, pRaw); } -#include "syncTools.h" - int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; void *ahandle = pMsg->info.ahandle; - int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; if (syncEnvIsStart()) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9c02b91ef0..f4f09d0f7b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include #include "sync.h" #include "syncAppendEntries.h" #include "syncAppendEntriesReply.h" From 32d48f3b8fab0b2aae3d3ebb5be7c3b848ff9e92 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 22 May 2022 16:42:44 +0800 Subject: [PATCH 05/22] refactor: sync integrate into mnode --- source/dnode/mnode/impl/src/mndSync.c | 57 ++++++++++++++------------- source/dnode/mnode/impl/src/mnode.c | 5 ++- source/dnode/mnode/sdb/src/sdb.c | 10 +---- 3 files changed, 34 insertions(+), 38 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index d8eb021462..df289aa285 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -46,9 +46,8 @@ static void mndCloseWal(SMnode *pMnode) { } static int32_t mndRestoreWal(SMnode *pMnode) { - -// do nothing -return 0; + // do nothing + return 0; #if 0 @@ -122,7 +121,6 @@ _OVER: return code; #endif - } int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } @@ -132,7 +130,7 @@ int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; if (pFsm->FpGetSnapshot != NULL) { - SSnapshot snapshot; + SSnapshot snapshot = {0}; pFsm->FpGetSnapshot(pFsm, &snapshot); beginIndex = snapshot.lastApplyIndex; } @@ -141,8 +139,9 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM SMnode *pMnode = pFsm->data; SSyncMgmt *pMgmt = &pMnode->syncMgmt; - mndProcessApplyMsg((SRpcMsg*)pMsg); - //mmPutNodeMsgToApplyQueue(pMnode->pWrapper->pMgmt, pMsg); + SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; + pApplyMsg->info.node = pFsm->data; + mndProcessApplyMsg(pApplyMsg); if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { tsem_post(&pMgmt->syncSem); @@ -182,19 +181,16 @@ int32_t mndInitSync(SMnode *pMnode) { return -1; } - if (pMnode->selfId == 1) { - pMgmt->state = TAOS_SYNC_STATE_LEADER; - } - SSyncInfo syncInfo = {.vgId = 1}; SSyncCfg *pCfg = &(syncInfo.syncCfg); pCfg->replicaNum = pMnode->replica; pCfg->myIndex = pMnode->selfIndex; - for (int i = 0; i < pMnode->replica; ++i) { - snprintf((pCfg->nodeInfo)->nodeFqdn, sizeof((pCfg->nodeInfo)->nodeFqdn), "%s", (pMnode->replicas)[i].fqdn); - (pCfg->nodeInfo)->nodePort = (pMnode->replicas)[i].port; + for (int32_t i = 0; i < pMnode->replica; ++i) { + SNodeInfo *pNodeInfo = &pCfg->nodeInfo[i]; + tstrncpy(pNodeInfo->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNodeInfo->nodeFqdn)); + pNodeInfo->nodePort = pMnode->replicas[i].port; } - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", pMnode->path); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); syncInfo.pWal = pMnode->syncMgmt.pWal; syncInfo.pFsm = mndSyncMakeFsm(pMnode); syncInfo.FpSendMsg = mndSyncSendMsg; @@ -242,31 +238,38 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { #else - if (pMnode->replica == 1) return 0; - SSyncMgmt *pMgmt = &pMnode->syncMgmt; pMgmt->errCode = 0; - //SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)}; - - SRpcMsg rpcMsg; - rpcMsg.code = TDMT_MND_APPLY_MSG; - rpcMsg.contLen = sdbGetRawTotalSize(pRaw); - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - memcpy(rpcMsg.pCont, pRaw, rpcMsg.contLen); + SRpcMsg rsp = {0}; + rsp.code = TDMT_MND_APPLY_MSG; + rsp.contLen = sdbGetRawTotalSize(pRaw); + rsp.pCont = rpcMallocCont(rsp.contLen); + memcpy(rsp.pCont, pRaw, rsp.contLen); bool isWeak = false; - int32_t code = syncPropose(pMgmt->sync, &rpcMsg, isWeak); + int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); + if (code == 0) { + tsem_wait(&pMgmt->syncSem); + } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { + terrno = TSDB_CODE_APP_NOT_READY; + mError("failed to propose raw:%p since not leader", pRaw); + return -1; + } else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + mError("failed to propose raw:%p since sync internal error", pRaw); + } else { + assert(0); + } if (code != 0) return code; - tsem_wait(&pMgmt->syncSem); return pMgmt->errCode; #endif - } bool mndIsMaster(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; + pMgmt->state = syncGetMyRole(pMgmt->sync); return pMgmt->state == TAOS_SYNC_STATE_LEADER; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 297174beb7..9a68e0e1f0 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -336,8 +336,9 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { return 0; } -int32_t mndStart(SMnode *pMnode) { - syncStart(pMnode->syncMgmt.sync); +int32_t mndStart(SMnode *pMnode) { + syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb); + syncStart(pMnode->syncMgmt.sync); return mndInitTimer(pMnode); } diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 1f11a77e6c..6e5dde57a6 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -31,11 +31,9 @@ SSdb *sdbInit(SSdbOpt *pOption) { char path[PATH_MAX + 100] = {0}; snprintf(path, sizeof(path), "%s%sdata", pOption->path, TD_DIRSEP); pSdb->currDir = strdup(path); - snprintf(path, sizeof(path), "%s%ssync", pOption->path, TD_DIRSEP); - pSdb->syncDir = strdup(path); snprintf(path, sizeof(path), "%s%stmp", pOption->path, TD_DIRSEP); pSdb->tmpDir = strdup(path); - if (pSdb->currDir == NULL || pSdb->currDir == NULL || pSdb->currDir == NULL) { + if (pSdb->currDir == NULL || pSdb->tmpDir == NULL) { sdbCleanup(pSdb); terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to init sdb since %s", terrstr()); @@ -149,12 +147,6 @@ static int32_t sdbCreateDir(SSdb *pSdb) { return -1; } - if (taosMkDir(pSdb->syncDir) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr()); - return -1; - } - if (taosMkDir(pSdb->tmpDir) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr()); From 6c1d1927c935bee7bee168c42000d0218d0dee10 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 23 May 2022 09:58:21 +0800 Subject: [PATCH 06/22] refactor: sync integrate into mnode --- include/dnode/mnode/sdb/sdb.h | 9 +-- source/dnode/mnode/impl/src/mndSync.c | 71 +++++++++----------- source/dnode/mnode/impl/test/sdb/sdbTest.cpp | 15 ++--- source/dnode/mnode/sdb/src/sdb.c | 4 +- 4 files changed, 44 insertions(+), 55 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 2abe0e5c73..3a885dd3da 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -304,13 +304,14 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type); int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type); /** - * @brief Update the version of sdb + * @brief Update the index of sdb * * @param pSdb The sdb object. - * @param val The update value of the version. - * @return int32_t The current version of sdb + * @param index The update value of the apply index. + * @return int32_t The current index of sdb */ -int64_t sdbUpdateVer(SSdb *pSdb, int32_t val); +void sdbSetApplyIndex(SSdb *pSdb, int64_t index); +int64_t sdbGetApplyIndex(SSdb *pSdb); SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen); void sdbFreeRaw(SSdbRaw *pRaw); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 0b7ed04819..df7949f6a5 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -128,38 +128,35 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SyncIndex beginIndex = SYNC_INDEX_INVALID; - if (pFsm->FpGetSnapshot != NULL) { - SSnapshot snapshot = {0}; - pFsm->FpGetSnapshot(pFsm, &snapshot); - beginIndex = snapshot.lastApplyIndex; - } - - if (cbMeta.index > beginIndex) { - SMnode *pMnode = pFsm->data; - SSyncMgmt *pMgmt = &pMnode->syncMgmt; - - SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; - pApplyMsg->info.node = pFsm->data; - mndProcessApplyMsg(pApplyMsg); - sdbUpdateVer(pMnode->pSdb, 1); + SMnode *pMnode = pFsm->data; + SSdb *pSdb = pMnode->pSdb; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + SyncIndex lastApply = sdbGetApplyIndex(pSdb); + SSdbRaw *pRaw = pMsg->pCont; + if (cbMeta.index > lastApply) { + mTrace("ver:%" PRId64 ", apply raw:%p to sdb, role:%s", cbMeta.index, pRaw, syncStr(cbMeta.state)); + sdbWriteWithoutFree(pMnode->pSdb, pRaw); + sdbSetApplyIndex(pMnode->pSdb, cbMeta.index); if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { tsem_post(&pMgmt->syncSem); } + } else { + mTrace("ver:%" PRId64 ", already apply raw:%p to sdb, last:%" PRId64, cbMeta.index, pRaw, lastApply); } } -void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +static void mndSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { // strict consistent, do nothing } -void mndSyncRollBackMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +static void mndSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { // strict consistent, do nothing } -int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { - // snapshot +static int32_t mndSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { + SMnode *pMnode = pFsm->data; + pSnapshot->lastApplyIndex = sdbGetApplyIndex(pMnode->pSdb); return 0; } @@ -182,23 +179,25 @@ int32_t mndInitSync(SMnode *pMnode) { return -1; } - SSyncInfo syncInfo = {.vgId = 1}; - SSyncCfg *pCfg = &(syncInfo.syncCfg); + SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg}; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); + syncInfo.pWal = pMgmt->pWal; + syncInfo.pFsm = mndSyncMakeFsm(pMnode); + + SSyncCfg *pCfg = &syncInfo.syncCfg; pCfg->replicaNum = pMnode->replica; pCfg->myIndex = pMnode->selfIndex; for (int32_t i = 0; i < pMnode->replica; ++i) { - SNodeInfo *pNodeInfo = &pCfg->nodeInfo[i]; - tstrncpy(pNodeInfo->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNodeInfo->nodeFqdn)); - pNodeInfo->nodePort = pMnode->replicas[i].port; + SNodeInfo *pNode = &pCfg->nodeInfo[i]; + tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); + pNode->nodePort = pMnode->replicas[i].port; } - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); - syncInfo.pWal = pMnode->syncMgmt.pWal; - syncInfo.pFsm = mndSyncMakeFsm(pMnode); - syncInfo.FpSendMsg = mndSyncSendMsg; - syncInfo.FpEqMsg = mndSyncEqMsg; - pMnode->syncMgmt.sync = syncOpen(&syncInfo); - ASSERT(pMnode->syncMgmt.sync > 0); + pMgmt->sync = syncOpen(&syncInfo); + if (pMgmt->sync <= 0) { + mError("failed to open sync since %s", terrstr()); + return -1; + } return 0; } @@ -209,16 +208,6 @@ void mndCleanupSync(SMnode *pMnode) { mndCloseWal(pMnode); } -static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) { - SMnode *pMnode = pData; - SSyncMgmt *pMgmt = &pMnode->syncMgmt; - - pMgmt->errCode = 0; - tsem_post(&pMgmt->syncSem); - - return 0; -} - int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { SWal *pWal = pMnode->syncMgmt.pWal; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp index b93adf9930..6808c412d8 100644 --- a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp +++ b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp @@ -493,9 +493,8 @@ TEST_F(MndTestSdb, 01_Write_Str) { ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2); ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1); ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2 ); - ASSERT_EQ(sdbUpdateVer(pSdb, 0), -1); - ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0); - ASSERT_EQ(sdbUpdateVer(pSdb, -1), -1); + sdbSetApplyIndex(pSdb, -1); + ASSERT_EQ(sdbGetApplyIndex(pSdb), -1); ASSERT_EQ(mnode.insertTimes, 2); ASSERT_EQ(mnode.deleteTimes, 0); @@ -537,9 +536,6 @@ TEST_F(MndTestSdb, 01_Write_Str) { ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 3); ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 4); - ASSERT_EQ(sdbUpdateVer(pSdb, 0), -1); - ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0); - ASSERT_EQ(sdbUpdateVer(pSdb, -1), -1); ASSERT_EQ(mnode.insertTimes, 3); ASSERT_EQ(mnode.deleteTimes, 0); @@ -704,8 +700,9 @@ TEST_F(MndTestSdb, 01_Write_Str) { } // write version - ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0); - ASSERT_EQ(sdbUpdateVer(pSdb, 1), 1); + sdbSetApplyIndex(pSdb, 0); + sdbSetApplyIndex(pSdb, 1); + ASSERT_EQ(sdbGetApplyIndex(pSdb), 1); ASSERT_EQ(sdbWriteFile(pSdb), 0); ASSERT_EQ(sdbWriteFile(pSdb), 0); @@ -775,7 +772,7 @@ TEST_F(MndTestSdb, 01_Read_Str) { ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2); ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1); ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 5); - ASSERT_EQ(sdbUpdateVer(pSdb, 0), 1); + ASSERT_EQ(sdbGetApplyIndex(pSdb), 1); ASSERT_EQ(mnode.insertTimes, 4); ASSERT_EQ(mnode.deleteTimes, 0); diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 6e5dde57a6..10bf4126c5 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -156,4 +156,6 @@ static int32_t sdbCreateDir(SSdb *pSdb) { return 0; } -int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) { return atomic_add_fetch_64(&pSdb->curVer, val); } \ No newline at end of file +void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; } + +int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; } From c38a8055bac5384e1d4f97f93c355297fd4a843c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 23 May 2022 11:08:31 +0800 Subject: [PATCH 07/22] fix: add error code if malloc failed --- include/libs/transport/trpc.h | 20 +-- source/dnode/mnode/impl/src/mndSync.c | 173 +++++--------------------- source/libs/transport/src/trans.c | 39 +++--- 3 files changed, 61 insertions(+), 171 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index fcb00ddf01..754a203471 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -28,7 +28,7 @@ extern "C" { #define TAOS_CONN_CLIENT 1 #define IsReq(pMsg) (pMsg->msgType & 1U) -extern int tsRpcHeadSize; +extern int32_t tsRpcHeadSize; typedef struct { uint32_t clientIp; @@ -69,10 +69,10 @@ typedef struct SRpcInit { char localFqdn[TSDB_FQDN_LEN]; uint16_t localPort; // local port char * label; // for debug purpose - int numOfThreads; // number of threads to handle connections - int sessions; // number of sessions allowed + int32_t numOfThreads; // number of threads to handle connections + int32_t sessions; // number of sessions allowed int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS - int idleTime; // milliseconds, 0 means idle timer is disabled + int32_t idleTime; // milliseconds, 0 means idle timer is disabled // the following is for client app ecurity only char *user; // user name @@ -108,9 +108,9 @@ int32_t rpcInit(); void rpcCleanup(); void * rpcOpen(const SRpcInit *pRpc); void rpcClose(void *); -void * rpcMallocCont(int contLen); +void * rpcMallocCont(int32_t contLen); void rpcFreeCont(void *pCont); -void * rpcReallocCont(void *ptr, int contLen); +void * rpcReallocCont(void *ptr, int32_t contLen); // Because taosd supports multi-process mode // These functions should not be used on the server side @@ -121,10 +121,10 @@ void rpcRegisterBrokenLinkArg(SRpcMsg *msg); void rpcReleaseHandle(void *handle, int8_t type); // just release client conn to rpc instance, no close sock // These functions will not be called in the child process -void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); -void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); -int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); +void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); +int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); +void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index df7949f6a5..825b52dd9b 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,112 +17,6 @@ #include "mndSync.h" #include "mndTrans.h" -static int32_t mndInitWal(SMnode *pMnode) { - SSyncMgmt *pMgmt = &pMnode->syncMgmt; - - char path[PATH_MAX] = {0}; - snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP); - SWalCfg cfg = { - .vgId = 1, - .fsyncPeriod = 0, - .rollPeriod = -1, - .segSize = -1, - .retentionPeriod = -1, - .retentionSize = -1, - .level = TAOS_WAL_FSYNC, - }; - pMgmt->pWal = walOpen(path, &cfg); - if (pMgmt->pWal == NULL) return -1; - - return 0; -} - -static void mndCloseWal(SMnode *pMnode) { - SSyncMgmt *pMgmt = &pMnode->syncMgmt; - if (pMgmt->pWal != NULL) { - walClose(pMgmt->pWal); - pMgmt->pWal = NULL; - } -} - -static int32_t mndRestoreWal(SMnode *pMnode) { - // do nothing - return 0; - -#if 0 - - SWal *pWal = pMnode->syncMgmt.pWal; - SSdb *pSdb = pMnode->pSdb; - int64_t lastSdbVer = sdbUpdateVer(pSdb, 0); - int32_t code = -1; - - SWalReadHandle *pHandle = walOpenReadHandle(pWal); - if (pHandle == NULL) return -1; - - int64_t first = walGetFirstVer(pWal); - int64_t last = walGetLastVer(pWal); - mDebug("start to restore wal, sdbver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last); - - first = TMAX(lastSdbVer + 1, first); - for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) { - if (walReadWithHandle(pHandle, ver) < 0) { - mError("ver:%" PRId64 ", failed to read from wal since %s", ver, terrstr()); - goto _OVER; - } - - SWalHead *pHead = pHandle->pHead; - int64_t sdbVer = sdbUpdateVer(pSdb, 0); - if (sdbVer + 1 != ver) { - terrno = TSDB_CODE_SDB_INVALID_WAl_VER; - mError("ver:%" PRId64 ", failed to write to sdb, since inconsistent with sdbver:%" PRId64, ver, sdbVer); - goto _OVER; - } - - mTrace("ver:%" PRId64 ", will be restored, content:%p", ver, pHead->head.body); - if (sdbWriteWithoutFree(pSdb, (void *)pHead->head.body) < 0) { - mError("ver:%" PRId64 ", failed to write to sdb since %s", ver, terrstr()); - goto _OVER; - } - - sdbUpdateVer(pSdb, 1); - mDebug("ver:%" PRId64 ", is restored", ver); - } - - int64_t sdbVer = sdbUpdateVer(pSdb, 0); - mDebug("restore wal finished, sdbver:%" PRId64, sdbVer); - - mndTransPullup(pMnode); - sdbVer = sdbUpdateVer(pSdb, 0); - mDebug("pullup trans finished, sdbver:%" PRId64, sdbVer); - - if (sdbVer != lastSdbVer) { - mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer); - if (sdbWriteFile(pSdb) != 0) { - goto _OVER; - } - - if (walCommit(pWal, sdbVer) != 0) { - goto _OVER; - } - - if (walBeginSnapshot(pWal, sdbVer) < 0) { - goto _OVER; - } - - if (walEndSnapshot(pWal) < 0) { - goto _OVER; - } - } - - code = 0; - -_OVER: - walCloseReadHandle(pHandle); - return code; - -#endif -} - int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } @@ -131,18 +25,20 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM SMnode *pMnode = pFsm->data; SSdb *pSdb = pMnode->pSdb; SSyncMgmt *pMgmt = &pMnode->syncMgmt; - SyncIndex lastApply = sdbGetApplyIndex(pSdb); SSdbRaw *pRaw = pMsg->pCont; - if (cbMeta.index > lastApply) { + SSnapshot snapshot = {0}; + (*pFsm->FpGetSnapshot)(pFsm, &snapshot); + + if (cbMeta.index > snapshot.lastApplyIndex) { mTrace("ver:%" PRId64 ", apply raw:%p to sdb, role:%s", cbMeta.index, pRaw, syncStr(cbMeta.state)); - sdbWriteWithoutFree(pMnode->pSdb, pRaw); - sdbSetApplyIndex(pMnode->pSdb, cbMeta.index); + sdbWriteWithoutFree(pSdb, pRaw); + sdbSetApplyIndex(pSdb, cbMeta.index); if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { tsem_post(&pMgmt->syncSem); } } else { - mTrace("ver:%" PRId64 ", already apply raw:%p to sdb, last:%" PRId64, cbMeta.index, pRaw, lastApply); + mTrace("ver:%" PRId64 ", already apply raw:%p to sdb, last:%" PRId64, cbMeta.index, pRaw, snapshot.lastApplyIndex); } } @@ -174,7 +70,20 @@ int32_t mndInitSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; tsem_init(&pMgmt->syncSem, 0, 0); - if (mndInitWal(pMnode) < 0) { + char path[PATH_MAX + 20] = {0}; + snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP); + SWalCfg cfg = { + .vgId = 1, + .fsyncPeriod = 0, + .rollPeriod = -1, + .segSize = -1, + .retentionPeriod = -1, + .retentionSize = -1, + .level = TAOS_WAL_FSYNC, + }; + + pMgmt->pWal = walOpen(path, &cfg); + if (pMgmt->pWal == NULL) { mError("failed to open wal since %s", terrstr()); return -1; } @@ -205,56 +114,36 @@ int32_t mndInitSync(SMnode *pMnode) { void mndCleanupSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; tsem_destroy(&pMgmt->syncSem); - mndCloseWal(pMnode); + if (pMgmt->pWal != NULL) { + walClose(pMgmt->pWal); + } + + memset(pMgmt, 0, sizeof(SSyncMgmt)); } int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { - SWal *pWal = pMnode->syncMgmt.pWal; - SSdb *pSdb = pMnode->pSdb; - -#if 0 - int64_t ver = sdbUpdateVer(pSdb, 1); - if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) { - sdbUpdateVer(pSdb, -1); - mError("ver:%" PRId64 ", failed to write raw:%p to wal since %s", ver, pRaw, terrstr()); - return -1; - } - - mTrace("ver:%" PRId64 ", write to wal, raw:%p", ver, pRaw); - walCommit(pWal, ver); - walFsync(pWal, true); - - return 0; - -#else - SSyncMgmt *pMgmt = &pMnode->syncMgmt; pMgmt->errCode = 0; - SRpcMsg rsp = {0}; - rsp.code = TDMT_MND_APPLY_MSG; - rsp.contLen = sdbGetRawTotalSize(pRaw); + SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; rsp.pCont = rpcMallocCont(rsp.contLen); + if (rsp.pCont == NULL) return -1; memcpy(rsp.pCont, pRaw, rsp.contLen); - bool isWeak = false; - int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); + const bool isWeak = false; + int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); if (code == 0) { tsem_wait(&pMgmt->syncSem); } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { terrno = TSDB_CODE_APP_NOT_READY; - mError("failed to propose raw:%p since not leader", pRaw); - return -1; } else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - mError("failed to propose raw:%p since sync internal error", pRaw); } else { - assert(0); + terrno = TSDB_CODE_APP_ERROR; } if (code != 0) return code; return pMgmt->errCode; -#endif } bool mndIsMaster(SMnode *pMnode) { diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 5627dbfbf5..9e71c87fa5 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -17,7 +17,7 @@ #include "transComm.h" -void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { +void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOfThreads, void* fp, void* shandle) = { transInitServer, transInitClient}; void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient}; @@ -77,37 +77,38 @@ void rpcClose(void* arg) { taosMemoryFree(pRpc); return; } -void* rpcMallocCont(int contLen) { - int size = contLen + TRANS_MSG_OVERHEAD; - char* start = (char*)taosMemoryCalloc(1, (size_t)size); +void* rpcMallocCont(int32_t contLen) { + int32_t size = contLen + TRANS_MSG_OVERHEAD; + char* start = taosMemoryCalloc(1, size); if (start == NULL) { tError("failed to malloc msg, size:%d", size); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } else { tTrace("malloc mem:%p size:%d", start, size); } + return start + sizeof(STransMsgHead); } -void rpcFreeCont(void* cont) { - // impl - if (cont == NULL) { - return; - } +void rpcFreeCont(void* cont) { + if (cont == NULL) return; taosMemoryFree((char*)cont - TRANS_MSG_OVERHEAD); tTrace("free mem: %p", (char*)cont - TRANS_MSG_OVERHEAD); } -void* rpcReallocCont(void* ptr, int contLen) { - if (ptr == NULL) { - return rpcMallocCont(contLen); - } - char* st = (char*)ptr - TRANS_MSG_OVERHEAD; - int sz = contLen + TRANS_MSG_OVERHEAD; + +void* rpcReallocCont(void* ptr, int32_t contLen) { + if (ptr == NULL) return rpcMallocCont(contLen); + + char* st = (char*)ptr - TRANS_MSG_OVERHEAD; + int32_t sz = contLen + TRANS_MSG_OVERHEAD; st = taosMemoryRealloc(st, sz); if (st == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + return st + TRANS_MSG_OVERHEAD; } @@ -116,8 +117,8 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) { assert(0); } -int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } -void rpcCancelRequest(int64_t rid) { return; } +int32_t rpcReportProgress(void* pConn, char* pCont, int32_t contLen) { return -1; } +void rpcCancelRequest(int64_t rid) { return; } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { transSendRequest(shandle, pEpSet, pMsg, NULL); @@ -129,8 +130,8 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { transSendRecv(shandle, pEpSet, pMsg, pRsp); } -void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } -int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); } +void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } +int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); } void rpcRefHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); From 97b1e95ad8ea575d047d87a61888c3f9f1880332 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 12:14:53 +0800 Subject: [PATCH 08/22] enh(sync) sync/mnode integration, add term, currentTerm in cbMeta --- include/libs/sync/sync.h | 2 ++ source/dnode/mnode/impl/src/mndSync.c | 32 ++++++++++++++++++------ source/libs/sync/src/syncAppendEntries.c | 2 ++ source/libs/sync/src/syncCommit.c | 4 ++- 4 files changed, 32 insertions(+), 8 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 9b6593e4b5..872785abe5 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -78,6 +78,8 @@ typedef struct SFsmCbMeta { int32_t code; ESyncState state; uint64_t seqNum; + SyncTerm term; + SyncTerm currentTerm; } SFsmCbMeta; typedef struct SSyncFSM { diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 0b7ed04819..5eec46056b 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -139,14 +139,32 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM SMnode *pMnode = pFsm->data; SSyncMgmt *pMgmt = &pMnode->syncMgmt; - SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; - pApplyMsg->info.node = pFsm->data; - mndProcessApplyMsg(pApplyMsg); - sdbUpdateVer(pMnode->pSdb, 1); + if (cbMeta.term < cbMeta.currentTerm) { + // restoring + + SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; + pApplyMsg->info.node = pFsm->data; + mndProcessApplyMsg(pApplyMsg); + //sdbUpdateVer(pMnode->pSdb, 1); ==> sdbSetVer(cbMeta.index, cbMeta.term); + + // mndTransPullup(pMnode); + + } else if (cbMeta.term == cbMeta.currentTerm) { + // restore finish + + SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; + pApplyMsg->info.node = pFsm->data; + mndProcessApplyMsg(pApplyMsg); + + //sdbUpdateVer(pMnode->pSdb, 1); ==> sdbSetVer(cbMeta.index, cbMeta.term); + + if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { + tsem_post(&pMgmt->syncSem); + } + } else { + ASSERT(0); + } - if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { - tsem_post(&pMgmt->syncSem); - } } } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 1a5d418e75..bf1eeb4186 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -332,6 +332,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { cbMeta.code = 0; cbMeta.state = ths->state; cbMeta.seqNum = pEntry->seqNum; + cbMeta.term = pEntry->term; + cbMeta.currentTerm = ths->pRaftStore->currentTerm; ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 0f17cf267e..01e408e543 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -110,6 +110,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { cbMeta.code = 0; cbMeta.state = pSyncNode->state; cbMeta.seqNum = pEntry->seqNum; + cbMeta.term = pEntry->term; + cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); } @@ -162,4 +164,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { } } return false; -} \ No newline at end of file +} From 89c1e823374ec12dd421ed972d427e754e36ae3f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 23 May 2022 13:05:35 +0800 Subject: [PATCH 09/22] refactor: sync integrate into mnode --- source/dnode/mnode/impl/inc/mndInt.h | 5 +++-- source/dnode/mnode/impl/inc/mndSync.h | 3 +++ source/dnode/mnode/impl/src/mndSync.c | 24 +++++++++++++++++++++++- source/dnode/mnode/impl/src/mndTrans.c | 10 ---------- source/dnode/mnode/impl/src/mnode.c | 16 +++++++--------- 5 files changed, 36 insertions(+), 22 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 16945b1403..5a1653b937 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -75,9 +75,10 @@ typedef struct { } STelemMgmt; typedef struct { - int32_t errCode; - sem_t syncSem; SWal *pWal; + int32_t errCode; + bool restored; + sem_t syncSem; int64_t sync; ESyncState state; } SSyncMgmt; diff --git a/source/dnode/mnode/impl/inc/mndSync.h b/source/dnode/mnode/impl/inc/mndSync.h index fe557cdeac..7f35ff5927 100644 --- a/source/dnode/mnode/impl/inc/mndSync.h +++ b/source/dnode/mnode/impl/inc/mndSync.h @@ -25,7 +25,10 @@ extern "C" { int32_t mndInitSync(SMnode *pMnode); void mndCleanupSync(SMnode *pMnode); bool mndIsMaster(SMnode *pMnode); +bool mndIsRestored(SMnode *pMnode); int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw); +void mndSyncStart(SMnode *pMnode); +void mndSyncStop(SMnode *pMnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 825b52dd9b..b8ee63d05e 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -68,7 +68,6 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { int32_t mndInitSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - tsem_init(&pMgmt->syncSem, 0, 0); char path[PATH_MAX + 20] = {0}; snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP); @@ -102,6 +101,7 @@ int32_t mndInitSync(SMnode *pMnode) { pNode->nodePort = pMnode->replicas[i].port; } + tsem_init(&pMgmt->syncSem, 0, 0); pMgmt->sync = syncOpen(&syncInfo); if (pMgmt->sync <= 0) { mError("failed to open sync since %s", terrstr()); @@ -146,8 +146,30 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { return pMgmt->errCode; } +void mndSyncStart(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + int64_t lastApplyIndex = sdbGetApplyIndex(pSdb); + + syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb); + syncStart(pMnode->syncMgmt.sync); + + int64_t applyIndex = sdbGetApplyIndex(pSdb); + mndTransPullup(pMnode); + mDebug("pullup trans finished, applyIndex:%" PRId64, applyIndex); + if (applyIndex != lastApplyIndex) { + mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastApplyIndex, applyIndex); + sdbWriteFile(pSdb); + } + + pMnode->syncMgmt.restored = true; +} + +void mndSyncStop(SMnode *pMnode) { syncStop(pMnode->syncMgmt.sync); } + bool mndIsMaster(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; pMgmt->state = syncGetMyRole(pMgmt->sync); return pMgmt->state == TAOS_SYNC_STATE_LEADER; } + +bool mndIsRestored(SMnode *pMnode) { return pMnode->syncMgmt.restored; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 8e19b023cc..e4a29365e7 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -682,16 +682,6 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { } mDebug("trans:%d, sync finished", pTrans->id); - -// do it in state machine commit cb -#if 0 - code = sdbWriteWithout(pMnode->pSdb, pRaw); - if (code != 0) { - mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); - return -1; - } -#endif - return 0; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 9a68e0e1f0..554556f5b0 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -86,7 +86,7 @@ static void *mndThreadFp(void *param) { lastTime++; taosMsleep(100); if (pMnode->stopped) break; - if (!mndIsMaster(pMnode)) continue; + if (!mndIsMaster(pMnode) || !mndIsRestored(pMnode)) continue; if (lastTime % (tsTransPullupInterval * 10) == 0) { mndPullupTrans(pMnode); @@ -337,13 +337,12 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { } int32_t mndStart(SMnode *pMnode) { - syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb); - syncStart(pMnode->syncMgmt.sync); + mndSyncStart(pMnode); return mndInitTimer(pMnode); } -void mndStop(SMnode *pMnode) { - syncStop(pMnode->syncMgmt.sync); +void mndStop(SMnode *pMnode) { + mndSyncStop(pMnode); return mndCleanupTimer(pMnode); } @@ -357,7 +356,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; void *ahandle = pMsg->info.ahandle; int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; - + if (syncEnvIsStart()) { SSyncNode *pSyncNode = syncNodeAcquire(pMnode->syncMgmt.sync); assert(pSyncNode != NULL); @@ -444,7 +443,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { return ret; - return 0; } @@ -454,7 +452,7 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) { mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle); if (IsReq(pMsg)) { - if (!mndIsMaster(pMnode)) { + if (!mndIsMaster(pMnode) || !mndIsRestored(pMnode)) { terrno = TSDB_CODE_APP_NOT_READY; mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); return -1; @@ -514,7 +512,7 @@ int64_t mndGenerateUid(char *name, int32_t len) { int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, SMonGrantInfo *pGrantInfo) { - if (!mndIsMaster(pMnode)) return -1; + if (!mndIsMaster(pMnode) || !mndIsRestored(pMnode)) return -1; SSdb *pSdb = pMnode->pSdb; int64_t ms = taosGetTimestampMs(); From 68b7f6946d50644ccfb614ccd2f335b4f3d6f84e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 23 May 2022 13:14:54 +0800 Subject: [PATCH 10/22] refactor: sync integrate into mnode --- include/dnode/mnode/sdb/sdb.h | 3 +++ source/dnode/mnode/impl/src/mndSync.c | 1 + source/dnode/mnode/sdb/src/sdb.c | 5 +++++ source/dnode/mnode/sdb/src/sdbFile.c | 22 +++++++++++++++++++--- tests/test/c/sdbDump.c | 3 ++- 5 files changed, 30 insertions(+), 4 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 3a885dd3da..94d41a7416 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -312,6 +312,8 @@ int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type); */ void sdbSetApplyIndex(SSdb *pSdb, int64_t index); int64_t sdbGetApplyIndex(SSdb *pSdb); +void sdbSetApplyTerm(SSdb *pSdb, int64_t term); +int64_t sdbGetApplyTerm(SSdb *pSdb); SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen); void sdbFreeRaw(SSdbRaw *pRaw); @@ -340,6 +342,7 @@ typedef struct SSdb { char *tmpDir; int64_t lastCommitVer; int64_t curVer; + int64_t curTerm; int64_t tableVer[SDB_MAX]; int64_t maxId[SDB_MAX]; EKeyType keyTypes[SDB_MAX]; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index b8ee63d05e..28b8382e7c 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -53,6 +53,7 @@ static void mndSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c static int32_t mndSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { SMnode *pMnode = pFsm->data; pSnapshot->lastApplyIndex = sdbGetApplyIndex(pMnode->pSdb); + pSnapshot->lastApplyTerm = sdbGetApplyTerm(pMnode->pSdb); return 0; } diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 10bf4126c5..7b90d8acb5 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -53,6 +53,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { } pSdb->curVer = -1; + pSdb->curTerm = -1; pSdb->lastCommitVer = -1; pSdb->pMnode = pOption->pMnode; mDebug("sdb init successfully"); @@ -159,3 +160,7 @@ static int32_t sdbCreateDir(SSdb *pSdb) { void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; } int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; } + +void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; } + +int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->curTerm; } diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index a391ea8d03..b000c208c8 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -65,6 +65,16 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { return -1; } + ret = taosReadFile(pFile, &pSdb->curTerm, sizeof(int64_t)); + if (ret < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (ret != sizeof(int64_t)) { + terrno = TSDB_CODE_FILE_CORRUPTED; + return -1; + } + for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { int64_t maxId = 0; ret = taosReadFile(pFile, &maxId, sizeof(int64_t)); @@ -123,6 +133,11 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) { return -1; } + if (taosWriteFile(pFile, &pSdb->curTerm, sizeof(int64_t)) != sizeof(int64_t)) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { int64_t maxId = 0; if (i < SDB_MAX) { @@ -182,6 +197,7 @@ int32_t sdbReadFile(SSdb *pSdb) { if (sdbReadFileHead(pSdb, pFile) != 0) { mError("failed to read file:%s head since %s", file, terrstr()); pSdb->curVer = -1; + pSdb->curTerm = -1; taosMemoryFree(pRaw); taosCloseFile(&pFile); return -1; @@ -256,8 +272,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { char curfile[PATH_MAX] = {0}; snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); - mDebug("start to write file:%s, current ver:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer, - pSdb->lastCommitVer); + mDebug("start to write file:%s, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer, + pSdb->curTerm, pSdb->lastCommitVer); TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { @@ -350,7 +366,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { mError("failed to write file:%s since %s", curfile, tstrerror(code)); } else { pSdb->lastCommitVer = pSdb->curVer; - mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer); + mDebug("write file:%s successfully, ver:%" PRId64 " term:%" PRId64, curfile, pSdb->lastCommitVer, pSdb->curTerm); } terrno = code; diff --git a/tests/test/c/sdbDump.c b/tests/test/c/sdbDump.c index 2bc60f777c..1d3eba7cde 100644 --- a/tests/test/c/sdbDump.c +++ b/tests/test/c/sdbDump.c @@ -262,7 +262,7 @@ void dumpCluster(SSdb *pSdb, SJson *json) { } void dumpTrans(SSdb *pSdb, SJson *json) { - void *pIter = NULL; + void *pIter = NULL; SJson *items = tjsonCreateObject(); tjsonAddItemToObject(json, "transactions", items); @@ -294,6 +294,7 @@ void dumpTrans(SSdb *pSdb, SJson *json) { void dumpHeader(SSdb *pSdb, SJson *json) { tjsonAddIntegerToObject(json, "sver", 1); tjsonAddStringToObject(json, "curVer", i642str(pSdb->curVer)); + tjsonAddStringToObject(json, "curTerm", i642str(pSdb->curTerm)); SJson *maxIdsJson = tjsonCreateObject(); tjsonAddItemToObject(json, "maxIds", maxIdsJson); From cb2527f71f7a7c6894e8eecf193428a727a00f77 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 23 May 2022 13:44:49 +0800 Subject: [PATCH 11/22] refactor: sync integrate into mnode --- include/dnode/mnode/mnode.h | 1 - source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 2 -- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 28 --------------------- source/dnode/mnode/impl/src/mnode.c | 6 ----- 4 files changed, 37 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 5b028d5055..f2c8c916c8 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -90,7 +90,6 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); */ int32_t mndProcessMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); -int32_t mndProcessApplyMsg(SRpcMsg *pMsg); /** * @brief Generate machine code diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 380ae63b8d..75e83d6547 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -33,7 +33,6 @@ typedef struct SMnodeMgmt { SSingleWorker readWorker; SSingleWorker writeWorker; SSingleWorker syncWorker; - SSingleWorker applyWorker; SSingleWorker monitorWorker; SReplica replicas[TSDB_MAX_REPLICA]; int8_t replica; @@ -60,7 +59,6 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt); int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutNodeMsgToApplyQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 0f1d325237..f408992aa6 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -62,18 +62,6 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { mndProcessSyncMsg(pMsg); } -static void mmProcessApplyQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { - SMnodeMgmt *pMgmt = pInfo->ahandle; - dTrace("msg:%p, get from mnode-apply queue", pMsg); - - pMsg->info.node = pMgmt->pMnode; - mndProcessApplyMsg(pMsg); - - dTrace("msg:%p, is freed", pMsg); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} - static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pWorker->queue, pMsg); @@ -88,10 +76,6 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); } -int32_t mmPutNodeMsgToApplyQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutNodeMsgToWorker(&pMgmt->applyWorker, pMsg); -} - int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); } @@ -179,18 +163,6 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { return -1; } - SSingleWorkerCfg aCfg = { - .min = 1, - .max = 1, - .name = "mnode-apply", - .fp = (FItem)mmProcessApplyQueue, - .param = pMgmt, - }; - if (tSingleWorkerInit(&pMgmt->applyWorker, &aCfg) != 0) { - dError("failed to start mnode mnode-apply worker since %s", terrstr()); - return -1; - } - SSingleWorkerCfg mCfg = { .min = 1, .max = 1, diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 554556f5b0..4e3c8b8773 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -346,12 +346,6 @@ void mndStop(SMnode *pMnode) { return mndCleanupTimer(pMnode); } -int32_t mndProcessApplyMsg(SRpcMsg *pMsg) { - SSdbRaw *pRaw = pMsg->pCont; - SMnode *pMnode = pMsg->info.node; - return sdbWriteWithoutFree(pMnode->pSdb, pRaw); -} - int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; void *ahandle = pMsg->info.ahandle; From 8bbae8dd125c38d1f8dc042b08c1ab02f1bbaabe Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 23 May 2022 14:32:44 +0800 Subject: [PATCH 12/22] refactor: pullup after sorting by transId --- source/dnode/mnode/impl/src/mndTrans.c | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index e4a29365e7..821b04a017 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1343,19 +1343,35 @@ _OVER: return code; } -void mndTransPullup(SMnode *pMnode) { - STrans *pTrans = NULL; - void *pIter = NULL; +static int32_t mndCompareTransId(int32_t *pTransId1, int32_t *pTransId2) { return *pTransId1 >= *pTransId2 ? 1 : 0; } +void mndTransPullup(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_TRANS), sizeof(int32_t)); + if (pArray == NULL) return; + + void *pIter = NULL; while (1) { + STrans *pTrans = NULL; pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); if (pIter == NULL) break; + taosArrayPush(pArray, &pTrans->id); + sdbRelease(pSdb, pTrans); + } - mndTransExecute(pMnode, pTrans); - sdbRelease(pMnode->pSdb, pTrans); + taosArraySort(pArray, (__compar_fn_t)mndCompareTransId); + + for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { + int32_t *pTransId = taosArrayGet(pArray, i); + STrans *pTrans = mndAcquireTrans(pMnode, *pTransId); + if (pTrans != NULL) { + mndTransExecute(pMnode, pTrans); + } + mndReleaseTrans(pMnode, pTrans); } sdbWriteFile(pMnode->pSdb); + taosArrayDestroy(pArray); } static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { From 4535722957103e94d3bbd595047af4bc92079238 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 15:41:04 +0800 Subject: [PATCH 13/22] enh(sync) sync/mnode integration, syncStart async -> sync --- include/libs/sync/sync.h | 2 +- source/libs/sync/inc/syncIO.h | 4 ++-- source/libs/sync/inc/syncInt.h | 5 +++++ source/libs/sync/src/syncAppendEntries.c | 20 ++++++++++++++++- source/libs/sync/src/syncCommit.c | 25 ++++++++++++++++++---- source/libs/sync/src/syncMain.c | 21 +++++++++++++++++- source/libs/sync/test/syncSnapshotTest.cpp | 2 ++ 7 files changed, 70 insertions(+), 9 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 872785abe5..1eed353f33 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -87,6 +87,7 @@ typedef struct SSyncFSM { void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); + void (*FpRestoreFinish)(struct SSyncFSM* pFsm); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); } SSyncFSM; @@ -119,7 +120,6 @@ typedef struct SSyncLogStore { } SSyncLogStore; - typedef struct SSyncInfo { SyncGroupId vgId; SSyncCfg syncCfg; diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index f65a317694..b69c087b5f 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -36,8 +36,8 @@ typedef struct SSyncIO { STaosQueue *pMsgQ; STaosQset * pQset; TdThread consumerTid; - void *serverRpc; - void *clientRpc; + void * serverRpc; + void * clientRpc; SEpSet myAddr; SMsgCb msgcb; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 768e1c1cf1..d3345620e9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -147,6 +147,11 @@ typedef struct SSyncNode { // tools SSyncRespMgr* pSyncRespMgr; + // restore state + bool restoreFinish; + sem_t restoreSem; + SSnapshot* pSnapshot; + } SSyncNode; // open/close -------------- diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index bf1eeb4186..18735f5b71 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -324,7 +324,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - // if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -335,6 +334,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { cbMeta.term = pEntry->term; cbMeta.currentTerm = ths->pRaftStore->currentTerm; ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); + + bool needExecute = true; + if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) { + needExecute = false; + } + + if (needExecute) { + ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); + } } // config change @@ -351,6 +359,16 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } } + // restore finish + if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) { + if (ths->restoreFinish == false) { + ths->pFsm->FpRestoreFinish(ths->pFsm); + ths->restoreFinish = true; + + tsem_post(&ths->restoreSem); + } + } + rpcFreeCont(rpcMsg.pCont); syncEntryDestory(pEntry); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 01e408e543..557f69a2ce 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -102,7 +102,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - // if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -110,9 +109,17 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { cbMeta.code = 0; cbMeta.state = pSyncNode->state; cbMeta.seqNum = pEntry->seqNum; - cbMeta.term = pEntry->term; - cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; - pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); + cbMeta.term = pEntry->term; + cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; + + bool needExecute = true; + if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) { + needExecute = false; + } + + if (needExecute) { + pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); + } } // config change @@ -129,6 +136,16 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { } } + // restore finish + if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) { + if (pSyncNode->restoreFinish == false) { + pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); + pSyncNode->restoreFinish = true; + + tsem_post(&pSyncNode->restoreSem); + } + } + rpcFreeCont(rpcMsg.pCont); syncEntryDestory(pEntry); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f4f09d0f7b..bffebc1693 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -242,7 +242,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { return ret; } -void syncSetMsgCb(int64_t rid, const SMsgCb *msgcb) { +void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid); @@ -492,6 +492,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0); assert(pSyncNode->pSyncRespMgr != NULL); + // restore state + pSyncNode->restoreFinish = false; + pSyncNode->pSnapshot = NULL; + if (pSyncNode->pFsm->FpGetSnapshot != NULL) { + pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); + } + tsem_init(&(pSyncNode->restoreSem), 0, 0); + // start in syncNodeStart // start raft // syncNodeBecomeFollower(pSyncNode); @@ -511,6 +520,8 @@ void syncNodeStart(SSyncNode* pSyncNode) { // use this now syncNodeAppendNoop(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica + + tsem_wait(&pSyncNode->restoreSem); return; } @@ -520,6 +531,8 @@ void syncNodeStart(SSyncNode* pSyncNode) { int32_t ret = 0; // ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); + + tsem_wait(&pSyncNode->restoreSem); } void syncNodeStartStandBy(SSyncNode* pSyncNode) { @@ -556,6 +569,12 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode->pFsm); } + if (pSyncNode->pSnapshot != NULL) { + taosMemoryFree(pSyncNode->pSnapshot); + } + + tsem_destroy(&pSyncNode->restoreSem); + // free memory in syncFreeNode // taosMemoryFree(pSyncNode); } diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 62bda5b22e..8ccd698907 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -160,6 +160,8 @@ SyncClientRequest *step1(const SRpcMsg *pMsg) { } int main(int argc, char **argv) { + sprintf(tsTempDir, "%s", "."); + // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; From dc82c1ba226f74805edbd519b959b4254b805572 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 16:00:03 +0800 Subject: [PATCH 14/22] enh(sync) sync/mnode integration, syncStart async -> sync --- source/dnode/mnode/impl/src/mndSync.c | 39 ++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 5eec46056b..16c6b7f7fa 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -128,6 +128,18 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; + pApplyMsg->info.node = pFsm->data; + mndProcessApplyMsg(pApplyMsg); + //sdbUpdateVer(pMnode->pSdb, 1); ==> sdbSetVer(cbMeta.index, cbMeta.term); + + if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { + SMnode *pMnode = pFsm->data; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + tsem_post(&pMgmt->syncSem); + } + +#if 0 SyncIndex beginIndex = SYNC_INDEX_INVALID; if (pFsm->FpGetSnapshot != NULL) { SSnapshot snapshot = {0}; @@ -166,8 +178,23 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM } } +#endif + } +int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { + SMnode *pMnode = pFsm->data; + pSnapshot->lastApplyIndex = pMnode->pSdb->lastCommitVer; + //pSnapshot->lastApplyTerm = ...; + return 0; +} + +void mndRestoreFinish(struct SSyncFSM* pFsm) { + SMnode *pMnode = pFsm->data; + mndTransPullup(pMnode); +} + +#if 0 void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { // strict consistent, do nothing } @@ -175,19 +202,17 @@ void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta void mndSyncRollBackMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { // strict consistent, do nothing } - -int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { - // snapshot - return 0; -} +#endif SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pMnode; pFsm->FpCommitCb = mndSyncCommitMsg; - pFsm->FpPreCommitCb = mndSyncPreCommitMsg; - pFsm->FpRollBackCb = mndSyncRollBackMsg; + pFsm->FpPreCommitCb = NULL; + pFsm->FpRollBackCb = NULL; pFsm->FpGetSnapshot = mndSyncGetSnapshot; + pFsm->FpRestoreFinish = mndRestoreFinish; + pFsm->FpRestoreSnapshot = NULL; return pFsm; } From 2be77666d49ab15b6dea9ff3b3101c286c0f0360 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 23 May 2022 16:39:36 +0800 Subject: [PATCH 15/22] test: delete expired utest of mnode --- source/dnode/mgmt/test/mnode/CMakeLists.txt | 8 ++++---- source/dnode/mnode/impl/test/trans/CMakeLists.txt | 8 ++++---- source/dnode/mnode/impl/test/trans/trans2.cpp | 9 +++++++++ 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/source/dnode/mgmt/test/mnode/CMakeLists.txt b/source/dnode/mgmt/test/mnode/CMakeLists.txt index e83f5dbbec..788cf53976 100644 --- a/source/dnode/mgmt/test/mnode/CMakeLists.txt +++ b/source/dnode/mgmt/test/mnode/CMakeLists.txt @@ -4,7 +4,7 @@ target_link_libraries( dmnodeTest sut ) -add_test( - NAME dmnodeTest - COMMAND dmnodeTest -) +#add_test( +# NAME dmnodeTest +# COMMAND dmnodeTest +#) diff --git a/source/dnode/mnode/impl/test/trans/CMakeLists.txt b/source/dnode/mnode/impl/test/trans/CMakeLists.txt index 55fc3abbc2..22ff85563f 100644 --- a/source/dnode/mnode/impl/test/trans/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/trans/CMakeLists.txt @@ -31,7 +31,7 @@ target_include_directories( PUBLIC "${TD_SOURCE_DIR}/include/dnode/mnode" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../../inc" ) -add_test( - NAME transTest2 - COMMAND transTest2 -) +#add_test( +# NAME transTest2 +# COMMAND transTest2 +#) diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp index c4ed48fe60..08ab70a989 100644 --- a/source/dnode/mnode/impl/test/trans/trans2.cpp +++ b/source/dnode/mnode/impl/test/trans/trans2.cpp @@ -23,6 +23,11 @@ int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { return -1; } +int32_t putToQueue(void *pMgmt, SRpcMsg *pMsg) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; +} + class MndTestTrans2 : public ::testing::Test { protected: static void InitLog() { @@ -55,6 +60,9 @@ class MndTestTrans2 : public ::testing::Test { msgCb.reportStartupFp = reportStartup; msgCb.sendReqFp = sendReq; msgCb.sendRspFp = sendRsp; + msgCb.queueFps[SYNC_QUEUE] = putToQueue; + msgCb.queueFps[WRITE_QUEUE] = putToQueue; + msgCb.queueFps[READ_QUEUE] = putToQueue; msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack tmsgSetDefault(&msgCb); @@ -77,6 +85,7 @@ class MndTestTrans2 : public ::testing::Test { static void SetUpTestSuite() { InitLog(); walInit(); + syncInit(); InitMnode(); } From ee4b694a3540c9f185c5e349625e6d8a4ca73cd6 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 18:10:04 +0800 Subject: [PATCH 16/22] enh(sync) sync/mnode integration, syncStart async -> sync --- source/libs/sync/src/syncAppendEntries.c | 4 +++- source/libs/sync/src/syncCommit.c | 4 +++- source/libs/sync/test/syncConfigChangeTest.cpp | 5 +++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 18735f5b71..ef66d580fb 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -362,7 +362,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // restore finish if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) { if (ths->restoreFinish == false) { - ths->pFsm->FpRestoreFinish(ths->pFsm); + if (ths->pFsm->FpRestoreFinish != NULL) { + ths->pFsm->FpRestoreFinish(ths->pFsm); + } ths->restoreFinish = true; tsem_post(&ths->restoreSem); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 557f69a2ce..aa4bcb4fae 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -139,7 +139,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // restore finish if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) { if (pSyncNode->restoreFinish == false) { - pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); + if (pSyncNode->pFsm->FpRestoreFinish != NULL) { + pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); + } pSyncNode->restoreFinish = true; tsem_post(&pSyncNode->restoreSem); diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index cff692239a..0850ef6343 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -73,12 +73,17 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } +void FpRestoreFinishCb(struct SSyncFSM* pFsm) { + sTrace("==callback== ==FpRestoreFinishCb=="); +} + SSyncFSM* createFsm() { SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); pFsm->FpCommitCb = CommitCb; pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; pFsm->FpGetSnapshot = GetSnapshotCb; + pFsm->FpRestoreFinish = FpRestoreFinishCb; return pFsm; } From 5ec450d01cac6888af809566c356ce7ab6a547ef Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 23 May 2022 18:15:31 +0800 Subject: [PATCH 17/22] fix: stop sync after queue is empty --- source/dnode/mnode/impl/src/mndSync.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 28b8382e7c..68f1d0cd8e 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -109,11 +109,15 @@ int32_t mndInitSync(SMnode *pMnode) { return -1; } + mDebug("mnode sync is opened, id:%" PRId64, pMgmt->sync); return 0; } void mndCleanupSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; + syncStop(pMgmt->sync); + mDebug("sync:%" PRId64 " is stopped", pMgmt->sync); + tsem_destroy(&pMgmt->syncSem); if (pMgmt->pWal != NULL) { walClose(pMgmt->pWal); @@ -163,9 +167,10 @@ void mndSyncStart(SMnode *pMnode) { } pMnode->syncMgmt.restored = true; + mDebug("sync:%" PRId64 " is started", pMnode->syncMgmt.sync); } -void mndSyncStop(SMnode *pMnode) { syncStop(pMnode->syncMgmt.sync); } +void mndSyncStop(SMnode *pMnode) {} bool mndIsMaster(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; From aeb86ee8ac6d27554e4b010bc690b4aaa95e8e80 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 18:22:04 +0800 Subject: [PATCH 18/22] enh(sync) vnode FpRestoreFinish --- source/dnode/vnode/src/vnd/vnodeSync.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 8659c41807..538d3f7f87 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -141,5 +141,6 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; + pFsm->FpRestoreFinish = NULL; return pFsm; } \ No newline at end of file From 102a08f44be0445e2ea34c3485dd532250765aae Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 20:26:12 +0800 Subject: [PATCH 19/22] enh(sync) sync/mnode integration, add log --- source/libs/sync/src/syncAppendEntries.c | 1 + source/libs/sync/src/syncCommit.c | 1 + 2 files changed, 2 insertions(+) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index ef66d580fb..5f90325f65 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -368,6 +368,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ths->restoreFinish = true; tsem_post(&ths->restoreSem); + sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post"); } } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index aa4bcb4fae..f5486cd1f0 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -145,6 +145,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { pSyncNode->restoreFinish = true; tsem_post(&pSyncNode->restoreSem); + sInfo("==syncMaybeAdvanceCommitIndex== RestoreFinish tsem_post"); } } From 1d7bdbc351cf788bf445eb28f0ebc3616061763e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 21:37:13 +0800 Subject: [PATCH 20/22] fix(sync) sync/mnode integration dead lock --- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncAppendEntries.c | 5 ++++- source/libs/sync/src/syncCommit.c | 5 ++++- source/libs/sync/src/syncMain.c | 27 ++++++++++++++++++++++-- source/libs/transport/inc/transComm.h | 2 +- 5 files changed, 35 insertions(+), 6 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index d3345620e9..9246041b81 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -149,7 +149,7 @@ typedef struct SSyncNode { // restore state bool restoreFinish; - sem_t restoreSem; + //sem_t restoreSem; SSnapshot* pSnapshot; } SSyncNode; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 5f90325f65..fa735e71c0 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -366,9 +366,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ths->pFsm->FpRestoreFinish(ths->pFsm); } ths->restoreFinish = true; + sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId); + /* tsem_post(&ths->restoreSem); - sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post"); + sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post %p", ths); + */ } } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index f5486cd1f0..18c6f8930a 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -143,9 +143,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); } pSyncNode->restoreFinish = true; + sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId); + /* tsem_post(&pSyncNode->restoreSem); - sInfo("==syncMaybeAdvanceCommitIndex== RestoreFinish tsem_post"); + sInfo("==syncMaybeAdvanceCommitIndex== RestoreFinish tsem_post %p", pSyncNode); + */ } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index bffebc1693..821ed364e8 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -499,7 +499,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); } - tsem_init(&(pSyncNode->restoreSem), 0, 0); + //tsem_init(&(pSyncNode->restoreSem), 0, 0); // start in syncNodeStart // start raft @@ -521,7 +521,19 @@ void syncNodeStart(SSyncNode* pSyncNode) { syncNodeAppendNoop(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica + /* + sInfo("==syncNodeStart== RestoreFinish begin 1 replica tsem_wait %p", pSyncNode); tsem_wait(&pSyncNode->restoreSem); + sInfo("==syncNodeStart== RestoreFinish end 1 replica tsem_wait %p", pSyncNode); + */ + + /* + while (pSyncNode->restoreFinish != true) { + taosMsleep(10); + } + */ + + sInfo("==syncNodeStart== restoreFinish ok 1 replica %p vgId:%d", pSyncNode, pSyncNode->vgId); return; } @@ -532,7 +544,18 @@ void syncNodeStart(SSyncNode* pSyncNode) { // ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); + /* + sInfo("==syncNodeStart== RestoreFinish begin multi replica tsem_wait %p", pSyncNode); tsem_wait(&pSyncNode->restoreSem); + sInfo("==syncNodeStart== RestoreFinish end multi replica tsem_wait %p", pSyncNode); + */ + + /* + while (pSyncNode->restoreFinish != true) { + taosMsleep(10); + } + */ + sInfo("==syncNodeStart== restoreFinish ok multi replica %p vgId:%d", pSyncNode, pSyncNode->vgId); } void syncNodeStartStandBy(SSyncNode* pSyncNode) { @@ -573,7 +596,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode->pSnapshot); } - tsem_destroy(&pSyncNode->restoreSem); + //tsem_destroy(&pSyncNode->restoreSem); // free memory in syncFreeNode // taosMemoryFree(pSyncNode); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index e71e19edce..30f799f39e 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -94,7 +94,7 @@ typedef void* queue[2]; /* Return the structure holding the given element. */ #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) -#define TRANS_RETRY_COUNT_LIMIT 20 // retry count limit +#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit #define TRANS_RETRY_INTERVAL 15 // ms retry interval #define TRANS_CONN_TIMEOUT 3 // connect timeout From 43f2a51e4b01033182b5dc3b9ff0940fdb390d16 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 22:08:14 +0800 Subject: [PATCH 21/22] fix(sync) sync/mnode eq msg --- source/dnode/mnode/impl/src/mndSync.c | 8 +++++++- source/dnode/vnode/src/vnd/vnodeSync.c | 8 +++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index b7cae58b12..63809a3d76 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,7 +17,13 @@ #include "mndSync.h" #include "mndTrans.h" -int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } +int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); + if (code != 0) { + rpcFreeCont(pMsg->pCont); + } + return code; +} int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 538d3f7f87..882ee912cd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -56,7 +56,13 @@ void vnodeSyncStart(SVnode *pVnode) { void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } -int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } +int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); + if (code != 0) { + rpcFreeCont(pMsg->pCont); + } + return code; +} int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } From 924b334b00169fa4faaf339cb5191f1df1829370 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 24 May 2022 00:07:09 +0800 Subject: [PATCH 22/22] fix: avoid memory leak whilel stop sync --- source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 31 ++++++++++++--------- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 21 ++++++++++++++ source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 10 ++++++- source/dnode/mnode/impl/src/mndSync.c | 7 +++-- source/dnode/mnode/impl/src/mnode.c | 2 -- 5 files changed, 52 insertions(+), 19 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 75e83d6547..030d4b309e 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -24,19 +24,22 @@ extern "C" { #endif typedef struct SMnodeMgmt { - SDnodeData *pData; - SMnode *pMnode; - SMsgCb msgCb; - const char *path; - const char *name; - SSingleWorker queryWorker; - SSingleWorker readWorker; - SSingleWorker writeWorker; - SSingleWorker syncWorker; - SSingleWorker monitorWorker; - SReplica replicas[TSDB_MAX_REPLICA]; - int8_t replica; - int8_t selfIndex; + SDnodeData *pData; + SMnode *pMnode; + SMsgCb msgCb; + const char *path; + const char *name; + SSingleWorker queryWorker; + SSingleWorker readWorker; + SSingleWorker writeWorker; + SSingleWorker syncWorker; + SSingleWorker monitorWorker; + SReplica replicas[TSDB_MAX_REPLICA]; + int8_t replica; + int8_t selfIndex; + bool stopped; + int32_t refCount; + TdThreadRwlock lock; } SMnodeMgmt; // mmFile.c @@ -45,6 +48,8 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed); // mmInt.c int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg); +int32_t mmAcquire(SMnodeMgmt *pMgmt); +void mmRelease(SMnodeMgmt *pMgmt); // mmHandle.c SArray *mmGetMsgHandles(); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 68b2889278..43113d05af 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -110,6 +110,7 @@ static void mmClose(SMnodeMgmt *pMgmt) { if (pMgmt->pMnode != NULL) { mmStopWorker(pMgmt); mndClose(pMgmt->pMnode); + taosThreadRwlockDestroy(&pMgmt->lock); pMgmt->pMnode = NULL; } @@ -142,6 +143,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue; pMgmt->msgCb.mgmt = pMgmt; + taosThreadRwlockInit(&pMgmt->lock, NULL); bool deployed = false; if (mmReadFile(pMgmt, &deployed) != 0) { @@ -211,3 +213,22 @@ SMgmtFunc mmGetMgmtFunc() { return mgmtFunc; } + +int32_t mmAcquire(SMnodeMgmt *pMgmt) { + int32_t code = 0; + + taosThreadRwlockRdlock(&pMgmt->lock); + if (pMgmt->stopped) { + code = -1; + } else { + atomic_add_fetch_32(&pMgmt->refCount, 1); + } + taosThreadRwlockUnlock(&pMgmt->lock); + return code; +} + +void mmRelease(SMnodeMgmt *pMgmt) { + taosThreadRwlockRdlock(&pMgmt->lock); + atomic_sub_fetch_32(&pMgmt->refCount, 1); + taosThreadRwlockUnlock(&pMgmt->lock); +} \ No newline at end of file diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index f408992aa6..45dec1153f 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -111,7 +111,10 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); + if (mmAcquire(pMgmt) != 0) return -1; + int32_t code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); + mmRelease(pMgmt); + return code; } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { @@ -180,6 +183,11 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { } void mmStopWorker(SMnodeMgmt *pMgmt) { + taosThreadRwlockWrlock(&pMgmt->lock); + pMgmt->stopped = 1; + taosThreadRwlockUnlock(&pMgmt->lock); + while (pMgmt->refCount > 0) taosMsleep(10); + tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->readWorker); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 84da778300..e3f7b40526 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -152,9 +152,10 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { } void mndSyncStart(SMnode *pMnode) { - syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb); - syncStart(pMnode->syncMgmt.sync); - mDebug("sync:%" PRId64 " is started", pMnode->syncMgmt.sync); + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + syncSetMsgCb(pMgmt->sync, &pMnode->msgCb); + syncStart(pMgmt->sync); + mDebug("sync:%" PRId64 " is started", pMgmt->sync); } void mndSyncStop(SMnode *pMnode) {} diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 535d04e565..5b8c1a3101 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -435,8 +435,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { } return ret; - - return 0; } int32_t mndProcessMsg(SRpcMsg *pMsg) {