diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a13d203889..559dc1009d 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -153,10 +153,10 @@ typedef struct SSyncFSM { void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm); int32_t (*FpGetSnapshot)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader); - int32_t (*FpGetSnapshotInfo)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot); + void (*FpGetSnapshotInfo)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpSnapshotStartRead)(const struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader); - int32_t (*FpSnapshotStopRead)(const struct SSyncFSM* pFsm, void* pReader); + void (*FpSnapshotStopRead)(const struct SSyncFSM* pFsm, void* pReader); int32_t (*FpSnapshotDoRead)(const struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len); int32_t (*FpSnapshotStartWrite)(const struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index c96faddc4c..54d8aa7f60 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -142,10 +142,9 @@ int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pRe return 0; } -int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { +static void mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { SMnode *pMnode = pFsm->data; sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); - return 0; } void mndRestoreFinish(const SSyncFSM *pFsm) { @@ -170,10 +169,10 @@ int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL); } -int32_t mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) { +static void mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) { mInfo("stop to read snapshot from sdb"); SMnode *pMnode = pFsm->data; - return sdbStopRead(pMnode->pSdb, pReader); + sdbStopRead(pMnode->pSdb, pReader); } int32_t mndSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index a6d81ecc0d..e799f08a17 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -392,7 +392,7 @@ void *sdbGetRowObj(SSdbRow *pRow); void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc); int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config); -int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter); +void sdbStopRead(SSdb *pSdb, SSdbIter *pIter); int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len); int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter); diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 318a746bb0..f43b6bdb25 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -585,10 +585,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *ter return 0; } -int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter) { - sdbCloseIter(pIter); - return 0; -} +void sdbStopRead(SSdb *pSdb, SSdbIter *pIter) { sdbCloseIter(pIter); } int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len) { int32_t maxlen = 4096; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 540f0c3127..00e85aa593 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -264,7 +264,7 @@ int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t * // SVSnapReader int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader); -int32_t vnodeSnapReaderClose(SVSnapReader *pReader); +void vnodeSnapReaderClose(SVSnapReader *pReader); int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData); // SVSnapWriter int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 266fdde2df..99acc24992 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1370,7 +1370,7 @@ _exit: taosMemoryFree(pWriter); } } else { - tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + tsdbInfo("vgId:%d, %s done", TD_VID(pTsdb->pVnode), __func__); *ppWriter = pWriter; } return code; @@ -1391,7 +1391,7 @@ int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) { _exit: if (code) { - tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); + tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); } return code; } @@ -1442,7 +1442,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) { tFree(pWriter->aBuf[iBuf]); } - tsdbInfo("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__); + tsdbInfo("vgId:%d, %s done", TD_VID(pWriter->pTsdb->pVnode), __func__); taosMemoryFree(pWriter); *ppWriter = NULL; return code; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 4daab074b5..be977e7cbd 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -234,10 +234,10 @@ int vnodeAsyncCommit(SVnode *pVnode) { _exit: if (code) { - vError("vgId:%d %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), + vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), pVnode->state.commitID); } else { - vDebug("vgId:%d %s done", TD_VID(pVnode), __func__); + vDebug("vgId:%d, %s done", TD_VID(pVnode), __func__); } return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 40705e553b..fcfacd1ca9 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -67,9 +67,8 @@ _err: return code; } -int32_t vnodeSnapReaderClose(SVSnapReader *pReader) { - int32_t code = 0; - +void vnodeSnapReaderClose(SVSnapReader *pReader) { + vInfo("vgId:%d, close vnode snapshot reader", TD_VID(pReader->pVnode)); if (pReader->pRsmaReader) { rsmaSnapReaderClose(&pReader->pRsmaReader); } @@ -82,9 +81,7 @@ int32_t vnodeSnapReaderClose(SVSnapReader *pReader) { metaSnapReaderClose(&pReader->pMetaReader); } - vInfo("vgId:%d, vnode snapshot reader closed", TD_VID(pReader->pVnode)); taosMemoryFree(pReader); - return code; } int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6092888136..0668a01e32 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1189,7 +1189,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i); char *name = pOneReq->tbname; if (metaGetTableEntryByName(&mr, name) < 0) { - vDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name); + vDebug("vgId:%d, stream delete msg, skip since no table: %s", pVnode->config.vgId, name); continue; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 2c23646db1..5caaae502f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -380,9 +380,8 @@ static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return code; } -static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { +static void vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { vnodeGetSnapshot(pFsm->data, pSnapshot); - return 0; } static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { @@ -424,10 +423,9 @@ static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void * return code; } -static int32_t vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) { +static void vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) { SVnode *pVnode = pFsm->data; - int32_t code = vnodeSnapReaderClose(pReader); - return code; + vnodeSnapReaderClose(pReader); } static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { @@ -539,7 +537,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpCommitCb = vnodeSyncCommitMsg; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg; - pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; + pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo; pFsm->FpRestoreFinishCb = vnodeRestoreFinish; pFsm->FpLeaderTransferCb = NULL; pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 524abf3c2a..b83be2bebb 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -64,7 +64,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ASSERT(pMsg->term == ths->pRaftStore->currentTerm); - sTrace("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", + sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); if (pMsg->success) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 6a545424fc..b3efcb4823 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -815,11 +815,9 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { ASSERTS(pNode->pLogStore != NULL, "log store not created"); ASSERTS(pNode->pFsm != NULL, "pFsm not registered"); ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered"); - SSnapshot snapshot; - if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) { - sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr()); - return -1; - } + SSnapshot snapshot = {0}; + pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); + SyncIndex commitIndex = snapshot.lastApplyIndex; SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); @@ -1029,11 +1027,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { SyncIndex commitIndex = SYNC_INDEX_INVALID; if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { SSnapshot snapshot = {0}; - int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); - if (code != 0) { - sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code); - goto _error; - } + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); if (snapshot.lastApplyIndex > commitIndex) { commitIndex = snapshot.lastApplyIndex; sNTrace(pSyncNode, "reset commit index by snapshot"); @@ -1155,9 +1149,8 @@ _error: void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) { if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { - SSnapshot snapshot; - int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); - ASSERT(code == 0); + SSnapshot snapshot = {0}; + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); if (snapshot.lastApplyIndex > pSyncNode->commitIndex) { pSyncNode->commitIndex = snapshot.lastApplyIndex; } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index d875d3ca09..f438856ace 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -99,8 +99,9 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S return prevLogTerm; } - SSnapshot snapshot; - if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) == 0 && prevIndex == snapshot.lastApplyIndex) { + SSnapshot snapshot = {0}; + pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); + if (prevIndex == snapshot.lastApplyIndex) { return snapshot.lastApplyTerm; } @@ -145,11 +146,9 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ASSERTS(pNode->pFsm != NULL, "pFsm not registered"); ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered"); - SSnapshot snapshot; - if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) { - sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr()); - goto _err; - } + SSnapshot snapshot = {0}; + pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); + SyncIndex commitIndex = snapshot.lastApplyIndex; SyncTerm commitTerm = TMAX(snapshot.lastApplyTerm, 0); if (syncLogValidateAlignmentOfCommit(pNode, commitIndex)) { diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 540f40a4c0..6ec4692307 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -47,7 +47,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->term = pSyncNode->pRaftStore->currentTerm; pSender->startTime = 0; pSender->endTime = 0; - pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); + pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->finish = false; } else { sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId); @@ -66,10 +66,7 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { // close reader if (pSender->pReader != NULL) { - int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); - if (ret != 0) { - sNError(pSender->pSyncNode, "stop reader error"); - } + pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); pSender->pReader = NULL; } @@ -139,8 +136,7 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { // close reader if (pSender->pReader != NULL) { - int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); - ASSERT(ret == 0); + pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); pSender->pReader = NULL; } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index ed02d29e3b..07228bddd8 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -350,7 +350,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { int64_t contLen; bool seeked = false; - wDebug("vgId:%d try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 + wDebug("vgId:%d, try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 ", applied ver:%" PRId64, pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer); @@ -405,7 +405,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { int64_t code; - wDebug("vgId:%d skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 + wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 ", applied ver:%" PRId64, pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer); @@ -429,7 +429,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { SWalCont *pReadHead = &((*ppHead)->head); int64_t ver = pReadHead->version; - wDebug("vgId:%d fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 + wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 ", applied ver:%" PRId64, pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);