From 9d107399081ee79fca069fba94fcc7008bcfd516 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 9 Mar 2023 16:18:44 +0800 Subject: [PATCH 1/8] enh: send rsp of negotiation failure ahead of apply queue --- source/dnode/vnode/src/vnd/vnodeSvr.c | 8 +------- source/dnode/vnode/src/vnd/vnodeSync.c | 13 ++++++++++++- source/libs/sync/src/syncPipeline.c | 2 +- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fb8d230eba..222e795424 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -306,13 +306,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp void *pReq; int32_t len; int32_t ret; - /* - if (!pVnode->inUse) { - terrno = TSDB_CODE_VND_NO_AVAIL_BUFPOOL; - vError("vgId:%d, not ready to write since %s", TD_VID(pVnode), terrstr()); - return -1; - } - */ + if (version <= pVnode->state.applied) { vError("vgId:%d, duplicate write request. 
version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version, pVnode->state.applied); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index e71b03d2af..989fdaf515 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -429,7 +429,18 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code); - return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg); + if (pMsg->code == 0) { + return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg); + } + + vnodePostBlockMsg(pVnode, pMsg); + SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; + if (rsp.info.handle != NULL) { + tmsgSendRsp(&rsp); + } + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + return 0; } static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index cc1a40a430..0dc723e316 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -475,7 +475,7 @@ _out: int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry, int32_t applyCode) { - if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) { + if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1) { return 0; } From 1392711e66ccbd144c1ae16cf0f1ffc818a50872 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 9 Mar 2023 18:12:40 +0800 Subject: [PATCH 2/8] enh: add fsm FpAppliedIndexCb to get the current applied index --- include/libs/sync/sync.h | 1 + source/dnode/mnode/impl/inc/mndInt.h | 1 + source/dnode/mnode/impl/src/mndSync.c | 14 ++++++++++++++ source/dnode/vnode/src/vnd/vnodeSvr.c | 4 ++-- 
source/dnode/vnode/src/vnd/vnodeSync.c | 6 ++++++ 5 files changed, 24 insertions(+), 2 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 189484d1a6..5ea90906a8 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -143,6 +143,7 @@ typedef struct SSyncFSM { void* data; int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); + SyncIndex (*FpAppliedIndexCb)(const struct SSyncFSM* pFsm); int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 006c74ef3d..ffb2443808 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -113,6 +113,7 @@ typedef struct SMnode { bool deploy; char *path; int64_t checkTime; + SyncIndex applied; SSdb *pSdb; SArray *pSteps; SQHandle *pQuery; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index f702d8f148..ce3caaad6c 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -129,9 +129,13 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { int32_t code = 0; + SMnode *pMnode = pFsm->data; + atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex); + if (!syncUtilUserCommit(pMsg->msgType)) { goto _out; } + code = mndProcessWriteMsg(pFsm, pMsg, pMeta); _out: @@ -140,6 +144,11 @@ _out: return code; } +SyncIndex mndSyncAppliedIndex(const SSyncFSM *pFSM) { + SMnode *pMnode = pFSM->data; + return atomic_load_64(&pMnode->applied); +} + int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { mInfo("start to read snapshot from sdb in atomic way"); 
SMnode *pMnode = pFsm->data; @@ -253,6 +262,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pMnode; pFsm->FpCommitCb = mndSyncCommitMsg; + pFsm->FpAppliedIndexCb = mndSyncAppliedIndex; pFsm->FpPreCommitCb = NULL; pFsm->FpRollBackCb = NULL; pFsm->FpRestoreFinishCb = mndRestoreFinish; @@ -321,6 +331,10 @@ int32_t mndInitSync(SMnode *pMnode) { } pMnode->pSdb->sync = pMgmt->sync; + SSnapshot snap = {0}; + sdbGetCommitInfo(pMnode->pSdb, &snap.lastApplyIndex, &snap.lastApplyTerm, &snap.lastConfigIndex); + atomic_store_64(&pMnode->applied, snap.lastApplyIndex); + mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 222e795424..bfcf401ca8 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -320,8 +320,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm); ASSERT(pVnode->state.applied + 1 == version); - pVnode->state.applied = version; - pVnode->state.applyTerm = pMsg->info.conn.applyTerm; + atomic_store_64(&pVnode->state.applied, version); + atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm); if (!syncUtilUserCommit(pMsg->msgType)) goto _exit; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 989fdaf515..9613afd837 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -454,6 +454,11 @@ static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const return 0; } +static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) { + SVnode *pVnode = pFSM->data; + return atomic_load_64(&pVnode->state.applied); +} + static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { SVnode *pVnode = pFsm->data; 
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", @@ -580,6 +585,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pVnode; pFsm->FpCommitCb = vnodeSyncCommitMsg; + pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo; From 552d0bc8a024fc257edb8a8d0ed452988efefd8c Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 9 Mar 2023 18:56:12 +0800 Subject: [PATCH 3/8] enh: not allow to insert if Tsdb applied lagging behind too far --- include/util/taoserror.h | 3 ++- source/libs/sync/src/syncMain.c | 4 ++-- source/libs/sync/src/syncPipeline.c | 11 ++++++++++- source/util/src/terror.c | 1 + 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 5106196ccd..1e04cbf6e3 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -541,7 +541,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913) #define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914) #define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal -#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) // +#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) +#define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) // tq diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 04e4859c5e..592fe7ce7f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2165,8 +2165,8 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { // append to log buffer if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { sError("vgId:%d, failed to 
enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index); - terrno = TSDB_CODE_SYN_BUFFER_FULL; - (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, TSDB_CODE_SYN_BUFFER_FULL); + ASSERT(terrno != 0); + (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno); syncEntryDestroy(pEntry); return -1; } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 0dc723e316..e28a2a6872 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -48,7 +48,16 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt SyncIndex index = pEntry->index; if (index - pBuf->startIndex >= pBuf->size) { - sError("vgId:%d, failed to append due to sync log buffer full. index:%" PRId64 "", pNode->vgId, index); + terrno = TSDB_CODE_SYN_BUFFER_FULL; + sError("vgId:%d, failed to append since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); + goto _err; + } + + SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm); + if (index - appliedIndex >= pBuf->size) { + terrno = TSDB_CODE_SYN_WRITE_STALL; + sError("vgId:%d, failed to append since %s. 
index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64, + pNode->vgId, terrstr(), index, pBuf->commitIndex, appliedIndex); goto _err; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 33b562c8dd..8340a82761 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -422,6 +422,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync is restoring") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") //tq From 5e53b1225b74ed4d2a7ef288ddcf731e8a9ed97b Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 9 Mar 2023 19:29:09 +0800 Subject: [PATCH 4/8] enh: finish restore with commit and applied indexes instead of num of items in apply queue --- include/libs/sync/sync.h | 2 +- source/dnode/mnode/impl/src/mndSync.c | 2 +- source/dnode/vnode/src/vnd/vnodeSync.c | 18 ++++++++++++------ source/libs/sync/src/syncPipeline.c | 6 +++--- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 5ea90906a8..08a6be8015 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -147,7 +147,7 @@ typedef struct SSyncFSM { int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); - void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm); + void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx); void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SReConfigCbMeta* pMeta); void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); bool 
(*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index ce3caaad6c..998e8b71ab 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -162,7 +162,7 @@ static void mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); } -void mndRestoreFinish(const SSyncFSM *pFsm) { +void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { SMnode *pMnode = pFsm->data; if (!pMnode->deploy) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 9613afd837..594a64ea37 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -521,21 +521,27 @@ static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *p return code; } -static void vnodeRestoreFinish(const SSyncFSM *pFsm) { +static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { SVnode *pVnode = pFsm->data; + SyncIndex appliedIdx = -1; do { - int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE); - if (itemSize == 0) { - vInfo("vgId:%d, apply queue is empty, restore finish", pVnode->config.vgId); + appliedIdx = vnodeSyncAppliedIndex(pFsm); + ASSERT(appliedIdx <= commitIdx); + if (appliedIdx == commitIdx) { + vInfo("vgId:%d, no more items to be applied, restore finish", pVnode->config.vgId); break; } else { - vInfo("vgId:%d, restore not finish since %d items in apply queue", pVnode->config.vgId, itemSize); + int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE); + vInfo("vgId:%d, restore not finish since %" PRId64 + " items to be applied, and %d in apply queue. 
commit-index:%" PRId64 ", applied-index:%" PRId64, + pVnode->config.vgId, commitIdx - appliedIdx, itemSize, commitIdx, appliedIdx); taosMsleep(10); } } while (true); - walApplyVer(pVnode->pWal, pVnode->state.applied); + ASSERT(appliedIdx == commitIdx); + walApplyVer(pVnode->pWal, commitIdx); pVnode->restored = true; vInfo("vgId:%d, sync restore finished", pVnode->config.vgId); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index e28a2a6872..ee68824dc8 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -596,10 +596,10 @@ _out: // mark as restored if needed if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL && currentTerm <= pEntry->term) { - pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); + pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex); pNode->restoreFinish = true; - sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, - pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + sInfo("vgId:%d, restore finished. 
term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); } if (!inBuf) { From 96e70c2795b1ffe823955dd8191443f968129098 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 9 Mar 2023 20:00:25 +0800 Subject: [PATCH 5/8] enh: update logging msg in vnodeRestoreFinish --- source/dnode/vnode/src/vnd/vnodeSync.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 594a64ea37..f942b3fa49 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -529,13 +529,12 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) appliedIdx = vnodeSyncAppliedIndex(pFsm); ASSERT(appliedIdx <= commitIdx); if (appliedIdx == commitIdx) { - vInfo("vgId:%d, no more items to be applied, restore finish", pVnode->config.vgId); + vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId); break; } else { - int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE); - vInfo("vgId:%d, restore not finish since %" PRId64 - " items to be applied, and %d in apply queue. commit-index:%" PRId64 ", applied-index:%" PRId64, - pVnode->config.vgId, commitIdx - appliedIdx, itemSize, commitIdx, appliedIdx); + vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. 
commit-index:%" PRId64 + ", applied-index:%" PRId64, + pVnode->config.vgId, commitIdx - appliedIdx, commitIdx, appliedIdx); taosMsleep(10); } } while (true); From 66631229ebf0f4650ba3ae6d2b47d1cd3427ea12 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 9 Mar 2023 20:31:11 +0800 Subject: [PATCH 6/8] fix: initialize and update pMnode->applied properly --- source/dnode/mnode/impl/src/mndMain.c | 8 +++++--- source/dnode/mnode/impl/src/mndSync.c | 9 +++++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index d83b969e2d..c32212dfc1 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -380,11 +380,13 @@ static int32_t mndInitSdb(SMnode *pMnode) { } static int32_t mndOpenSdb(SMnode *pMnode) { + int32_t code = 0; if (!pMnode->deploy) { - return sdbReadFile(pMnode->pSdb); - } else { - return 0; + code = sdbReadFile(pMnode->pSdb); } + + atomic_store_64(&pMnode->applied, pMnode->pSdb->commitIndex); + return code; } static void mndCleanupSdb(SMnode *pMnode) { diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 998e8b71ab..dce7a9aadf 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -130,6 +130,9 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { int32_t code = 0; SMnode *pMnode = pFsm->data; + pMsg->info.conn.applyIndex = pMeta->index; + pMsg->info.conn.applyTerm = pMeta->term; + atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex); if (!syncUtilUserCommit(pMsg->msgType)) { @@ -176,6 +179,8 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { } else { mInfo("vgId:1, sync restore finished"); } + + ASSERT(commitIdx == mndSyncAppliedIndex(pFsm)); } int32_t mndSnapshotStartRead(const SSyncFSM 
*pFsm, void *pParam, void **ppReader) { @@ -331,10 +336,6 @@ int32_t mndInitSync(SMnode *pMnode) { } pMnode->pSdb->sync = pMgmt->sync; - SSnapshot snap = {0}; - sdbGetCommitInfo(pMnode->pSdb, &snap.lastApplyIndex, &snap.lastApplyTerm, &snap.lastConfigIndex); - atomic_store_64(&pMnode->applied, snap.lastApplyIndex); - mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync); return 0; } From 8139b725d2320a2a8d6380996806af188b54295f Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 10 Mar 2023 10:57:39 +0800 Subject: [PATCH 7/8] fix: refuse to write when apply progress lags behind only if restore has finished --- source/libs/sync/src/syncPipeline.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index ee68824dc8..6600b505c1 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -54,7 +54,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt } SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm); - if (index - appliedIndex >= pBuf->size) { + if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= pBuf->size) { terrno = TSDB_CODE_SYN_WRITE_STALL; sError("vgId:%d, failed to append since %s. 
index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64, pNode->vgId, terrstr(), index, pBuf->commitIndex, appliedIndex); From d679eaf6481d89fc1c1d3fdbb217e4094abcc67f Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 10 Mar 2023 18:10:25 +0800 Subject: [PATCH 8/8] enh: update pMnode->applied only when pMsg->code is zero --- source/dnode/mnode/impl/src/mndSync.c | 7 ++++--- source/dnode/vnode/src/vnd/vnodeSync.c | 17 +++++++++++------ 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index dce7a9aadf..18548db56f 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -129,16 +129,17 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { int32_t code = 0; - SMnode *pMnode = pFsm->data; pMsg->info.conn.applyIndex = pMeta->index; pMsg->info.conn.applyTerm = pMeta->term; - atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex); + if (pMsg->code == 0) { + SMnode *pMnode = pFsm->data; + atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex); + } if (!syncUtilUserCommit(pMsg->msgType)) { goto _out; } - code = mndProcessWriteMsg(pFsm, pMsg, pMeta); _out: diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index f942b3fa49..b49ca70bfa 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -429,24 +429,29 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code); + return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg); +} + +static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const 
SFsmCbMeta *pMeta) { if (pMsg->code == 0) { - return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg); + return vnodeSyncApplyMsg(pFsm, pMsg, pMeta); } + const STraceId *trace = &pMsg->info.traceId; + SVnode *pVnode = pFsm->data; vnodePostBlockMsg(pVnode, pMsg); + SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; if (rsp.info.handle != NULL) { tmsgSendRsp(&rsp); } + + vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code, pMeta->index); rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; return 0; } -static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { - return vnodeSyncApplyMsg(pFsm, pMsg, pMeta); -} - static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { if (pMeta->isWeak == 1) { return vnodeSyncApplyMsg(pFsm, pMsg, pMeta); @@ -539,7 +544,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) } } while (true); - ASSERT(appliedIdx == commitIdx); + ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm)); walApplyVer(pVnode->pWal, commitIdx); pVnode->restored = true;