diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index ddc036b606..ea85b796d5 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -77,7 +77,7 @@ static FORCE_INLINE int32_t syncLogReplGetNextRetryBackoff(SSyncLogReplMgr* pMgr return TMIN(pMgr->retryBackoff + 1, SYNC_MAX_RETRY_BACKOFF); } -SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); +int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pSyncTerm); int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode); @@ -93,10 +93,10 @@ int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendE int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); // SSyncLogBuffer -SSyncLogBuffer* syncLogBufferCreate(); -void syncLogBufferDestroy(SSyncLogBuffer* pBuf); -int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); -int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); +int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf); +void syncLogBufferDestroy(SSyncLogBuffer* pBuf); +int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); +int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); // access int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); @@ -110,9 +110,10 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode); // private -SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf); -int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf); -int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex); +int32_t syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, 
SyncIndex index, bool* pInBuf, + SSyncRaftEntry** ppEntry); +int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf); +int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex); int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry, int32_t applyCode, bool force); diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index b13c69e471..7f8cb78ea1 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -43,7 +43,7 @@ SSyncRaftEntry* syncEntryBuildFromRpcMsg(const SRpcMsg* pMsg, SyncTerm term, Syn SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg); SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId); void syncEntryDestroy(SSyncRaftEntry* pEntry); -void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg); // step 7 +int32_t syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg); // step 7 static FORCE_INLINE bool syncLogReplBarrier(SSyncRaftEntry* pEntry) { return pEntry->originalRpcType == TDMT_SYNC_NOOP || pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b455e3355f..2c43ba67d6 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -949,6 +949,7 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { // open/close -------------- SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { + int32_t code = 0; SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode)); if (pSyncNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1041,9 +1042,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg; // create raft log ring buffer - pSyncNode->pLogBuf = syncLogBufferCreate(); + code = 
syncLogBufferCreate(&pSyncNode->pLogBuf); if (pSyncNode->pLogBuf == NULL) { - sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId); + sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId); goto _error; } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 796a45d997..33174ff85e 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -19,13 +19,13 @@ #include "syncCommit.h" #include "syncIndexMgr.h" #include "syncInt.h" +#include "syncRaftCfg.h" #include "syncRaftEntry.h" #include "syncRaftStore.h" #include "syncReplication.h" #include "syncRespMgr.h" #include "syncSnapshot.h" #include "syncUtil.h" -#include "syncRaftCfg.h" #include "syncVoteMgr.h" static bool syncIsMsgBlock(tmsg_t type) { @@ -45,28 +45,29 @@ int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) { } int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) { + int32_t code = 0; taosThreadMutexLock(&pBuf->mutex); syncLogBufferValidate(pBuf); SyncIndex index = pEntry->index; if (index - pBuf->startIndex >= pBuf->size) { - terrno = TSDB_CODE_SYN_BUFFER_FULL; - sError("vgId:%d, failed to append since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); + code = TSDB_CODE_SYN_BUFFER_FULL; + sError("vgId:%d, failed to append since %s. 
index:%" PRId64 "", pNode->vgId, tstrerror(code), index); goto _err; } if (pNode->restoreFinish && index - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) { - terrno = TSDB_CODE_SYN_NEGOTIATION_WIN_FULL; - sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, terrstr(), + code = TSDB_CODE_SYN_NEGOTIATION_WIN_FULL; + sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, tstrerror(code), index, pBuf->commitIndex); goto _err; } SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm); if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= TSDB_SYNC_APPLYQ_SIZE_LIMIT) { - terrno = TSDB_CODE_SYN_WRITE_STALL; + code = TSDB_CODE_SYN_WRITE_STALL; sError("vgId:%d, failed to append since %s. index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64, - pNode->vgId, terrstr(), index, pBuf->commitIndex, appliedIndex); + pNode->vgId, tstrerror(code), index, pBuf->commitIndex, appliedIndex); goto _err; } @@ -93,21 +94,24 @@ _err: syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); taosMsleep(1); - return -1; + TAOS_RETURN(code); } -SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { +int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pSyncTerm) { SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncRaftEntry* pEntry = NULL; SyncIndex prevIndex = index - 1; SyncTerm prevLogTerm = -1; - terrno = TSDB_CODE_SUCCESS; + int32_t code = 0; - if (prevIndex == -1 && pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore) == 0) return 0; + if (prevIndex == -1 && pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore) == 0) { + *pSyncTerm = 0; + return 0; + } if (prevIndex > pBuf->matchIndex) { - terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - return -1; + *pSyncTerm = -1; + TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST); } ASSERT(index - 1 == prevIndex); @@ -116,7 +120,8 @@ SyncTerm 
syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem; ASSERTS(pEntry != NULL, "no log entry found"); prevLogTerm = pEntry->term; - return prevLogTerm; + *pSyncTerm = prevLogTerm; + return 0; } if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) { @@ -124,25 +129,28 @@ SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync ASSERTS(timeMs != 0, "no log entry found"); prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term; ASSERT(prevIndex == 0 || prevLogTerm != 0); - return prevLogTerm; + *pSyncTerm = prevLogTerm; + return 0; } SSnapshot snapshot = {0}; - pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); + (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); // TODO: check the return code if (prevIndex == snapshot.lastApplyIndex) { - return snapshot.lastApplyTerm; + *pSyncTerm = snapshot.lastApplyTerm; + return 0; } - if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry) == 0) { + if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry)) == 0) { prevLogTerm = pEntry->term; syncEntryDestroy(pEntry); pEntry = NULL; - return prevLogTerm; + *pSyncTerm = prevLogTerm; + return 0; } - sInfo("vgId:%d, failed to get log term since %s. index:%" PRId64, pNode->vgId, terrstr(), prevIndex); - terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - return -1; + *pSyncTerm = -1; + sInfo("vgId:%d, failed to get log term since %s. index:%" PRId64, pNode->vgId, tstrerror(code), prevIndex); + TAOS_RETURN(code); } SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) { @@ -155,7 +163,7 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1. 
firstVer:%" PRId64 ", tsdb commit version:%" PRId64 "", pNode->vgId, firstVer, commitIndex); - return -1; + return TSDB_CODE_WAL_LOG_INCOMPLETE; } SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); @@ -163,7 +171,7 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer:%" PRId64 ", tsdb commit version:%" PRId64 "", pNode->vgId, lastVer, commitIndex); - return -1; + return TSDB_CODE_WAL_LOG_INCOMPLETE; } return 0; @@ -174,15 +182,13 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ASSERTS(pNode->pFsm != NULL, "pFsm not registered"); ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered"); + int32_t code = 0, lino = 0; SSnapshot snapshot = {0}; pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); SyncIndex commitIndex = snapshot.lastApplyIndex; SyncTerm commitTerm = TMAX(snapshot.lastApplyTerm, 0); - if (syncLogValidateAlignmentOfCommit(pNode, commitIndex)) { - terrno = TSDB_CODE_WAL_LOG_INCOMPLETE; - goto _err; - } + TAOS_CHECK_EXIT(syncLogValidateAlignmentOfCommit(pNode, commitIndex)); SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); ASSERT(lastVer >= commitIndex); @@ -206,7 +212,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { } - if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) { - sWarn("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); + if ((code = pLogStore->syncLogGetEntry(pLogStore, index, &pEntry)) < 0) { /* keep the status: the warning below prints tstrerror(code) */ + sWarn("vgId:%d, failed to get log entry since %s. 
index:%" PRId64 "", pNode->vgId, tstrerror(code), index); break; } @@ -237,8 +243,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId); if (pDummy == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm}; pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp; @@ -261,8 +266,11 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { syncLogBufferValidate(pBuf); return 0; -_err: - return -1; +_exit: + if (code != 0) { + sError("vgId:%d, failed to initialize sync log buffer at line %d since %s.", pNode->vgId, lino, tstrerror(code)); + } + TAOS_RETURN(code); } int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { @@ -283,13 +291,13 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); } pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0; - int32_t ret = syncLogBufferInitWithoutLock(pBuf, pNode); - if (ret < 0) { - sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr()); + int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode); + if (code < 0) { + sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code)); } syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); - return ret; + return code; } FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) { @@ -316,7 +324,7 @@ bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) { int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) { taosThreadMutexLock(&pBuf->mutex); syncLogBufferValidate(pBuf); - int32_t ret = -1; + 
int32_t code = 0; SyncIndex index = pEntry->index; SyncIndex prevIndex = pEntry->index - 1; SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf); @@ -328,21 +336,22 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); - SyncTerm term = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1); + SyncTerm term = -1; + code = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1, &term); ASSERT(pEntry->term >= 0); if (term == pEntry->term) { - ret = 0; + code = 0; } goto _out; } - if(pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER && - index > 0 && index > pBuf->totalIndex){ + if (pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER && index > 0 && + index > pBuf->totalIndex) { pBuf->totalIndex = index; sTrace("vgId:%d, update learner progress. index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64 - " != lastmatch:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex, - pBuf->matchIndex, pBuf->endIndex); + " != lastmatch:%" PRId64 ". 
log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex, + pBuf->matchIndex, pBuf->endIndex); } if (index - pBuf->startIndex >= pBuf->size) { @@ -350,6 +359,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + code = TSDB_CODE_OUT_OF_RANGE; goto _out; } @@ -358,11 +368,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt " != lastmatch:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + code = TSDB_CODE_ACTION_IN_PROGRESS; goto _out; } // check current in buffer - pExist = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); + code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pExist); if (pExist != NULL) { ASSERT(pEntry->index == pExist->index); if (pEntry->term != pExist->term) { @@ -372,9 +383,10 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); - SyncTerm existPrevTerm = syncLogReplGetPrevLogTerm(NULL, pNode, index); + SyncTerm existPrevTerm = -1; + (void)syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm); ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm)); - ret = 0; + code = 0; goto _out; } } @@ -391,7 +403,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt pBuf->endIndex = TMAX(index + 1, pBuf->endIndex); // success - ret = 0; + code = 0; _out: syncEntryDestroy(pEntry); @@ -401,7 
+413,7 @@ _out: } syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); - return ret; + TAOS_RETURN(code); } static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) { @@ -409,20 +421,21 @@ static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replica } int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) { + int32_t code = 0; ASSERT(pEntry->index >= 0); SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore); - if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) { - sError("failed to truncate log store since %s. from index:%" PRId64 "", terrstr(), pEntry->index); - return -1; + if (lastVer >= pEntry->index && (code = pLogStore->syncLogTruncate(pLogStore, pEntry->index)) < 0) { + sError("failed to truncate log store since %s. from index:%" PRId64 "", tstrerror(code), pEntry->index); + TAOS_RETURN(code); } lastVer = pLogStore->syncLogLastIndex(pLogStore); ASSERT(pEntry->index == lastVer + 1); bool doFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum); - if (pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync) < 0) { - sError("failed to append sync log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index, - pEntry->term); - return -1; + if ((code = pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync)) < 0) { + sError("failed to append sync log entry since %s. 
index:%" PRId64 ", term:%" PRId64 "", tstrerror(code), + pEntry->index, pEntry->term); + TAOS_RETURN(code); } lastVer = pLogStore->syncLogLastIndex(pLogStore); @@ -430,12 +443,13 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaf return 0; } -int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char *str) { +int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char* str) { taosThreadMutexLock(&pBuf->mutex); syncLogBufferValidate(pBuf); SSyncLogStore* pLogStore = pNode->pLogStore; int64_t matchIndex = pBuf->matchIndex; + int32_t code = 0; while (pBuf->matchIndex + 1 < pBuf->endIndex) { int64_t index = pBuf->matchIndex + 1; @@ -478,39 +492,40 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex); // persist - if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) { - sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(), - pEntry->index); + if ((code = syncLogStorePersist(pLogStore, pNode, pEntry)) < 0) { + sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, + tstrerror(code), pEntry->index); taosMsleep(1); goto _out; } - if(pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){ - if(pNode->pLogBuf->commitIndex == pEntry->index -1){ - sInfo("vgId:%d, to change config at %s. " - "current entry, index:%" PRId64 ", term:%" PRId64", " - "node, restore:%d, commitIndex:%" PRId64 ", " - "cond: (pre entry index:%" PRId64 "== buf commit index:%" PRId64 ")", - pNode->vgId, str, - pEntry->index, pEntry->term, - pNode->restoreFinish, pNode->commitIndex, - pEntry->index - 1, pNode->pLogBuf->commitIndex); - if(syncNodeChangeConfig(pNode, pEntry, str) != 0){ - sError("vgId:%d, failed to change config from Append since %s. 
index:%" PRId64, pNode->vgId, terrstr(), - pEntry->index); + if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) { + if (pNode->pLogBuf->commitIndex == pEntry->index - 1) { + sInfo( + "vgId:%d, to change config at %s. " + "current entry, index:%" PRId64 ", term:%" PRId64 + ", " + "node, restore:%d, commitIndex:%" PRId64 + ", " + "cond: (pre entry index:%" PRId64 "== buf commit index:%" PRId64 ")", + pNode->vgId, str, pEntry->index, pEntry->term, pNode->restoreFinish, pNode->commitIndex, pEntry->index - 1, + pNode->pLogBuf->commitIndex); + if ((code = syncNodeChangeConfig(pNode, pEntry, str)) != 0) { + sError("vgId:%d, failed to change config from Append since %s. index:%" PRId64, pNode->vgId, tstrerror(code), + pEntry->index); goto _out; } - } - else{ - sInfo("vgId:%d, delay change config from Node %s. " - "curent entry, index:%" PRId64 ", term:%" PRId64 ", " - "node, commitIndex:%" PRId64 ", pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), " - "cond:( pre entry index:%" PRId64" != buf commit index:%" PRId64 ")", - pNode->vgId, str, - pEntry->index, pEntry->term, - pNode->commitIndex, pNode->pLogBuf->startIndex, pNode->pLogBuf->commitIndex, - pNode->pLogBuf->matchIndex, pNode->pLogBuf->endIndex, - pEntry->index - 1, pNode->pLogBuf->commitIndex); + } else { + sInfo( + "vgId:%d, delay change config from Node %s. 
" + "curent entry, index:%" PRId64 ", term:%" PRId64 + ", " + "node, commitIndex:%" PRId64 ", pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 + "), " + "cond:( pre entry index:%" PRId64 " != buf commit index:%" PRId64 ")", + pNode->vgId, str, pEntry->index, pEntry->term, pNode->commitIndex, pNode->pLogBuf->startIndex, + pNode->pLogBuf->commitIndex, pNode->pLogBuf->matchIndex, pNode->pLogBuf->endIndex, pEntry->index - 1, + pNode->pLogBuf->commitIndex); } } @@ -535,16 +550,16 @@ _out: } int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry, - int32_t applyCode, bool force) { - //learner need to execute fsm when it catch up entry log - //if force is true, keep all contition check to execute fsm - if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1 - && pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER - && force == false) { - sDebug("vgId:%d, not to execute fsm, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x, replicaNum:%d," - "role:%d, restoreFinish:%d", - pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode, - pNode->replicaNum, pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish); + int32_t applyCode, bool force) { + // learner need to execute fsm when it catch up entry log + // if force is true, keep all contition check to execute fsm + if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1 && + pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER && force == false) { + sDebug("vgId:%d, not to execute fsm, index:%" PRId64 ", term:%" PRId64 + ", type:%s code:0x%x, replicaNum:%d," + "role:%d, restoreFinish:%d", + pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode, pNode->replicaNum, + pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish); return 
0; } @@ -558,11 +573,11 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe pEntry->term); } - int32_t code = 0; + int32_t code = 0, lino = 0; bool retry = false; do { SRpcMsg rpcMsg = {.code = applyCode}; - syncEntry2OriginalRpc(pEntry, &rpcMsg); + TAOS_CHECK_EXIT(syncEntry2OriginalRpc(pEntry, &rpcMsg)); SFsmCbMeta cbMeta = {0}; cbMeta.index = pEntry->index; @@ -580,9 +595,15 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE); if (retry) { taosMsleep(10); - sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index); + sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, tstrerror(code), pEntry->index); } } while (retry); + +_exit: + if (code < 0) { + sError("vgId:%d, failed to execute fsm at line %d since %s. index:%" PRId64 ", term:%" PRId64 ", type:%s", + pNode->vgId, lino, tstrerror(code), pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType)); + } return code; } @@ -604,7 +625,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm ESyncState role = pNode->state; SyncTerm currentTerm = raftStoreGetTerm(pNode); SyncGroupId vgId = pNode->vgId; - int32_t ret = -1; + int32_t code = 0; int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex); SSyncRaftEntry* pEntry = NULL; bool inBuf = false; @@ -614,7 +635,6 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm if (commitIndex <= pBuf->commitIndex) { sDebug("vgId:%d, stale commit index. 
current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex, commitIndex); - ret = 0; goto _out; } @@ -624,7 +644,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm // execute in fsm for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) { // get a log entry - pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); + code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry); if (pEntry == NULL) { goto _out; } @@ -635,7 +655,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm pEntry->term, TMSG_INFO(pEntry->originalRpcType)); } - if (syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false) != 0) { + if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false)) != 0) { sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64, vgId, pEntry->index, pEntry->term, role, currentTerm); @@ -646,39 +666,40 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId, pEntry->index, pEntry->term, role, currentTerm); - pNextEntry = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf); + code = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf, &pNextEntry); if (pNextEntry != NULL) { - if(pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){ - sInfo("vgId:%d, to change config at Commit. " - "current entry, index:%" PRId64 ", term:%" PRId64", " - "node, role:%d, current term:%" PRId64 ", restore:%d, " - "cond, next entry index:%" PRId64 ", msgType:%s", - vgId, - pEntry->index, pEntry->term, - role, currentTerm, pNode->restoreFinish, - pNextEntry->index, TMSG_INFO(pNextEntry->originalRpcType)); + if (pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) { + sInfo( + "vgId:%d, to change config at Commit. 
" + "current entry, index:%" PRId64 ", term:%" PRId64 + ", " + "node, role:%d, current term:%" PRId64 + ", restore:%d, " + "cond, next entry index:%" PRId64 ", msgType:%s", + vgId, pEntry->index, pEntry->term, role, currentTerm, pNode->restoreFinish, pNextEntry->index, + TMSG_INFO(pNextEntry->originalRpcType)); - if(syncNodeChangeConfig(pNode, pNextEntry, "Commit") != 0){ + if ((code = syncNodeChangeConfig(pNode, pNextEntry, "Commit")) != 0) { sError("vgId:%d, failed to change config from Commit. index:%" PRId64 ", term:%" PRId64 - ", role:%d, current term:%" PRId64, - vgId, pNextEntry->index, pNextEntry->term, role, currentTerm); - goto _out; + ", role:%d, current term:%" PRId64, + vgId, pNextEntry->index, pNextEntry->term, role, currentTerm); + goto _out; } - //for 2->1, need to apply config change entry in sync thread, - if(pNode->replicaNum == 1){ - if (syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true) != 0) { + // for 2->1, need to apply config change entry in sync thread, + if (pNode->replicaNum == 1) { + if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true)) != 0) { sError("vgId:%d, failed to execute sync log entry. 
index:%" PRId64 ", term:%" PRId64 - ", role:%d, current term:%" PRId64, - vgId, pNextEntry->index, pNextEntry->term, role, currentTerm); + ", role:%d, current term:%" PRId64, + vgId, pNextEntry->index, pNextEntry->term, role, currentTerm); goto _out; } index++; pBuf->commitIndex = index; - sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId, - pNextEntry->index, pNextEntry->term, role, currentTerm); + sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", + pNode->vgId, pNextEntry->index, pNextEntry->term, role, currentTerm); } } if (!nextInBuf) { @@ -703,7 +724,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm pBuf->startIndex = index + 1; } - ret = 0; + code = 0; _out: // mark as restored if needed if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL && @@ -724,7 +745,7 @@ _out: } syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); - return ret; + TAOS_RETURN(code); } void syncLogReplReset(SSyncLogReplMgr* pMgr) { @@ -751,10 +772,10 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { syncLogReplReset(pMgr); sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit. peer:%" PRIx64, pNode->vgId, pDestId->addr); - return -1; + return TSDB_CODE_OUT_OF_RANGE; } - int32_t ret = -1; + int32_t code = 0; bool retried = false; int64_t retryWaitMs = syncLogReplGetRetryBackoffTimeMs(pMgr); int64_t nowMs = taosGetMonoTimestampMs(); @@ -776,15 +797,16 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { syncLogReplReset(pMgr); sWarn("vgId:%d, reset sync log repl since stagnation. 
index:%" PRId64 ", peer:%" PRIx64, pNode->vgId, index, pDestId->addr); + code = TSDB_CODE_ACTION_IN_PROGRESS; goto _out; } continue; } bool barrier = false; - if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if ((code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier)) < 0) { sError("vgId:%d, failed to replicate sync log entry since %s. index:%" PRId64 ", dest:%" PRIx64 "", pNode->vgId, - terrstr(), index, pDestId->addr); + tstrerror(code), index, pDestId->addr); goto _out; } ASSERT(barrier == pMgr->states[pos].barrier); @@ -800,7 +822,6 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { } } - ret = 0; _out: if (retried) { pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr); @@ -811,12 +832,13 @@ _out: pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); } - return ret; + TAOS_RETURN(code); } int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { SSyncLogBuffer* pBuf = pNode->pLogBuf; SRaftId destId = pMsg->srcId; + int32_t code = 0; ASSERT(pMgr->restored == false); if (pMgr->endIndex == 0) { @@ -832,7 +854,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn } } else { if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) { - syncLogReplRetryOnNeed(pMgr, pNode); + (void)syncLogReplRetryOnNeed(pMgr, pNode); return 0; } @@ -854,9 +876,9 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn sInfo("vgId:%d, snapshot replication to dnode:%d. reason:%s, match index:%" PRId64 ", last sent:%" PRId64, pNode->vgId, DID(&destId), (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE ? 
msg2 : msg1), pMsg->matchIndex, pMsg->lastSendIndex); - if (syncNodeStartSnapshot(pNode, &destId) < 0) { + if ((code = syncNodeStartSnapshot(pNode, &destId)) < 0) { sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId)); - return -1; + TAOS_RETURN(code); } return 0; } @@ -869,10 +891,10 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn errno = 0; if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) { - term = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1); + code = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1, &term); if (term < 0 && (errno == ENFILE || errno == EMFILE)) { - sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64, pNode->vgId, terrstr(), index + 1); - return -1; + sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64, pNode->vgId, tstrerror(code), index + 1); + TAOS_RETURN(code); } if (pMsg->matchIndex == -1) { @@ -888,9 +910,9 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn if ((index + 1 < firstVer) || (term < 0) || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) { ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); - if (syncNodeStartSnapshot(pNode, &destId) < 0) { + if ((code = syncNodeStartSnapshot(pNode, &destId)) < 0) { sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId)); - return -1; + TAOS_RETURN(code); } sInfo("vgId:%d, snapshot replication to peer dnode:%d", pNode->vgId, DID(&destId)); return 0; @@ -957,6 +979,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde ASSERT(pMgr->startIndex >= 0); int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs(); int64_t nowMs = taosGetMonoTimestampMs(); + int32_t code = 0; if (pMgr->endIndex > pMgr->startIndex && nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) { @@ -967,10 +990,10 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, 
SSyncNode* pNode, SyncIndex inde SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; SyncTerm term = -1; - if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if ((code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier)) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, - terrstr(), index, pDestId->addr); - return -1; + tstrerror(code), index, pDestId->addr); + TAOS_RETURN(code); } ASSERT(index >= 0); @@ -995,6 +1018,7 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; int32_t batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff)); + int32_t code = 0; int32_t count = 0; int64_t nowMs = taosGetMonoTimestampMs(); int64_t limit = pMgr->size >> 1; @@ -1012,10 +1036,10 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; SyncTerm term = -1; - if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if ((code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier)) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, - terrstr(), index, pDestId->addr); - return -1; + tstrerror(code), index, pDestId->addr); + TAOS_RETURN(code); } pMgr->states[pos].barrier = barrier; pMgr->states[pos].timeMs = nowMs; @@ -1034,7 +1058,7 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { } } - syncLogReplRetryOnNeed(pMgr, pNode); + (void)syncLogReplRetryOnNeed(pMgr, pNode); SSyncLogBuffer* pBuf = pNode->pLogBuf; sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ". 
indexes:%" PRId64 "..., terms: ...%" PRId64 @@ -1094,8 +1118,7 @@ int32_t syncNodeLogReplInit(SSyncNode* pNode) { ASSERT(pNode->logReplMgrs[i] == NULL); pNode->logReplMgrs[i] = syncLogReplCreate(); if (pNode->logReplMgrs[i] == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pNode->logReplMgrs[i]->peerId = i; } @@ -1109,11 +1132,14 @@ void syncNodeLogReplDestroy(SSyncNode* pNode) { } } -SSyncLogBuffer* syncLogBufferCreate() { +int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) { + int32_t code = 0; + + *ppBuf = NULL; + SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer)); if (pBuf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]); @@ -1121,28 +1147,30 @@ SSyncLogBuffer* syncLogBufferCreate() { ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE); if (taosThreadMutexAttrInit(&pBuf->attr) < 0) { - sError("failed to init log buffer mutexattr due to %s", strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); + sError("failed to init log buffer mutexattr due to %s", tstrerror(code)); goto _err; } if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) { - sError("failed to set log buffer mutexattr type due to %s", strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); + sError("failed to set log buffer mutexattr type due to %s", tstrerror(code)); goto _err; } if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) { - sError("failed to init log buffer mutex due to %s", strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); + sError("failed to init log buffer mutex due to %s", tstrerror(code)); goto _err; } - return pBuf; + *ppBuf = pBuf; + /* Success: the buffer is handed to the caller through *ppBuf, so return here instead of falling through into _err, which frees pBuf. */ + TAOS_RETURN(code); _err: taosMemoryFree(pBuf); - return NULL; + TAOS_RETURN(code); } void syncLogBufferClear(SSyncLogBuffer* pBuf) { @@ -1170,14 +1196,15 @@ 
void syncLogBufferDestroy(SSyncLogBuffer* pBuf) { } int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) { + int32_t code = 0; ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex); if (toIndex == pBuf->endIndex) { return 0; } - sInfo("vgId:%d, rollback sync log buffer. toindex:%" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64 - ", %" PRId64 ")", + sInfo("vgId:%d, rollback sync log buffer. toindex:%" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 + ")", pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); // trunc buffer @@ -1197,19 +1224,19 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex // trunc wal SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); - if (lastVer >= toIndex && pNode->pLogStore->syncLogTruncate(pNode->pLogStore, toIndex) < 0) { - sError("vgId:%d, failed to truncate log store since %s. from index:%" PRId64 "", pNode->vgId, terrstr(), toIndex); - return -1; + if (lastVer >= toIndex && (code = pNode->pLogStore->syncLogTruncate(pNode->pLogStore, toIndex)) < 0) { + sError("vgId:%d, failed to truncate log store since %s. 
from index:%" PRId64 "", pNode->vgId, tstrerror(code), + toIndex); + TAOS_RETURN(code); } lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); ASSERT(toIndex == lastVer + 1); // refill buffer on need if (toIndex <= pBuf->startIndex) { - int32_t ret = syncLogBufferInitWithoutLock(pBuf, pNode); - if (ret < 0) { - sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, terrstr()); - return -1; + if ((code = syncLogBufferInitWithoutLock(pBuf, pNode)) < 0) { + sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, tstrerror(code)); + TAOS_RETURN(code); } } @@ -1242,21 +1269,27 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { return 0; } -SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf) { - SSyncRaftEntry* pEntry = NULL; +int32_t syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf, + SSyncRaftEntry** ppEntry) { + int32_t code = 0; + + *ppEntry = NULL; + if (index >= pBuf->endIndex) { - return NULL; + return TSDB_CODE_OUT_OF_RANGE; } + if (index > pBuf->startIndex) { // startIndex might be dummy *pInBuf = true; - pEntry = pBuf->entries[index % pBuf->size].pItem; + *ppEntry = pBuf->entries[index % pBuf->size].pItem; } else { *pInBuf = false; - if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pEntry) < 0) { - sWarn("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); + + if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, ppEntry)) < 0) { + sWarn("vgId:%d, failed to get log entry since %s. 
index:%" PRId64 "", pNode->vgId, tstrerror(code), index); } } - return pEntry; + TAOS_RETURN(code); } int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId, @@ -1266,15 +1299,16 @@ int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex ind bool inBuf = false; SyncTerm prevLogTerm = -1; SSyncLogBuffer* pBuf = pNode->pLogBuf; + int32_t code = 0; - pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); + code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry); if (pEntry == NULL) { sWarn("vgId:%d, failed to get raft entry for index:%" PRId64 "", pNode->vgId, index); - if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { + if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); if (pMgr) { sInfo("vgId:%d, reset sync log repl of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId, pDestId->addr, - terrstr(), index); + tstrerror(code), index); (void)syncLogReplReset(pMgr); } } @@ -1282,14 +1316,14 @@ int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex ind } *pBarrier = syncLogReplBarrier(pEntry); - prevLogTerm = syncLogReplGetPrevLogTerm(pMgr, pNode, index); + code = syncLogReplGetPrevLogTerm(pMgr, pNode, index, &prevLogTerm); if (prevLogTerm < 0) { - sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); + sError("vgId:%d, failed to get prev log term since %s. 
index:%" PRId64 "", pNode->vgId, tstrerror(code), index); goto _err; } if (pTerm) *pTerm = pEntry->term; - int32_t code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut); + code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut); if (code < 0) { sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index); goto _err; @@ -1313,5 +1347,5 @@ _err: syncEntryDestroy(pEntry); pEntry = NULL; } - return -1; + TAOS_RETURN(code); } diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 0e98fe94eb..19080c2d1e 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -18,7 +18,7 @@ #include "syncUtil.h" #include "tjson.h" -const char* syncRoleToStr(ESyncRole role) { +const char *syncRoleToStr(ESyncRole role) { switch (role) { case TAOS_SYNC_ROLE_VOTER: return "true"; @@ -29,11 +29,11 @@ const char* syncRoleToStr(ESyncRole role) { } } -const ESyncRole syncStrToRole(char* str) { - if(strcmp(str, "true") == 0){ +const ESyncRole syncStrToRole(char *str) { + if (strcmp(str, "true") == 0) { return TAOS_SYNC_ROLE_VOTER; } - if(strcmp(str, "false") == 0){ + if (strcmp(str, "false") == 0) { return TAOS_SYNC_ROLE_LEARNER; } @@ -42,51 +42,82 @@ const ESyncRole syncStrToRole(char* str) { static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) { SSyncCfg *pCfg = (SSyncCfg *)pObj; - if (tjsonAddDoubleToObject(pJson, "replicaNum", pCfg->replicaNum) < 0) return -1; - if (tjsonAddDoubleToObject(pJson, "myIndex", pCfg->myIndex) < 0) return -1; - if (tjsonAddDoubleToObject(pJson, "changeVersion", pCfg->changeVersion) < 0) return -1; + int32_t code = 0, lino = 0; + + TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "replicaNum", pCfg->replicaNum)); + TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "myIndex", pCfg->myIndex)); + TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "changeVersion", pCfg->changeVersion)); SJson *nodeInfo = 
tjsonCreateArray(); - if (nodeInfo == NULL) return -1; - if (tjsonAddItemToObject(pJson, "nodeInfo", nodeInfo) < 0) return -1; + if (nodeInfo == NULL) { + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); + } + if ((code = tjsonAddItemToObject(pJson, "nodeInfo", nodeInfo)) < 0) { + tjsonDelete(nodeInfo); + TAOS_CHECK_EXIT(code); + } for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) { SJson *info = tjsonCreateObject(); - if (info == NULL) return -1; - if (tjsonAddDoubleToObject(info, "nodePort", pCfg->nodeInfo[i].nodePort) < 0) return -1; - if (tjsonAddStringToObject(info, "nodeFqdn", pCfg->nodeInfo[i].nodeFqdn) < 0) return -1; - if (tjsonAddIntegerToObject(info, "nodeId", pCfg->nodeInfo[i].nodeId) < 0) return -1; - if (tjsonAddIntegerToObject(info, "clusterId", pCfg->nodeInfo[i].clusterId) < 0) return -1; - if (tjsonAddStringToObject(info, "isReplica", syncRoleToStr(pCfg->nodeInfo[i].nodeRole)) < 0) return -1; - if (tjsonAddItemToArray(nodeInfo, info) < 0) return -1; + if (info == NULL) { + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); + } + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(info, "nodePort", pCfg->nodeInfo[i].nodePort), NULL, _err); + TAOS_CHECK_GOTO(tjsonAddStringToObject(info, "nodeFqdn", pCfg->nodeInfo[i].nodeFqdn), NULL, _err); + TAOS_CHECK_GOTO(tjsonAddIntegerToObject(info, "nodeId", pCfg->nodeInfo[i].nodeId), NULL, _err); + TAOS_CHECK_GOTO(tjsonAddIntegerToObject(info, "clusterId", pCfg->nodeInfo[i].clusterId), NULL, _err); + TAOS_CHECK_GOTO(tjsonAddStringToObject(info, "isReplica", syncRoleToStr(pCfg->nodeInfo[i].nodeRole)), NULL, _err); + TAOS_CHECK_GOTO(tjsonAddItemToArray(nodeInfo, info), NULL, _err); + continue; + _err: + tjsonDelete(info); + break; } - - return 0; +_exit: + if (code < 0) { + sError("failed to encode sync cfg at line %d since %s", lino, tstrerror(code)); + } + TAOS_RETURN(code); } static int32_t syncEncodeRaftCfg(const void *pObj, SJson *pJson) { SRaftCfg *pCfg = (SRaftCfg *)pObj; - if (tjsonAddObject(pJson, "SSyncCfg", syncEncodeSyncCfg, 
(void *)&pCfg->cfg) < 0) return -1; - if (tjsonAddDoubleToObject(pJson, "isStandBy", pCfg->isStandBy) < 0) return -1; - if (tjsonAddDoubleToObject(pJson, "snapshotStrategy", pCfg->snapshotStrategy) < 0) return -1; - if (tjsonAddDoubleToObject(pJson, "batchSize", pCfg->batchSize) < 0) return -1; - if (tjsonAddIntegerToObject(pJson, "lastConfigIndex", pCfg->lastConfigIndex) < 0) return -1; - if (tjsonAddDoubleToObject(pJson, "configIndexCount", pCfg->configIndexCount) < 0) return -1; + int32_t code = 0, lino = 0; + TAOS_CHECK_EXIT(tjsonAddObject(pJson, "SSyncCfg", syncEncodeSyncCfg, (void *)&pCfg->cfg)); + TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "isStandBy", pCfg->isStandBy)); + TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "snapshotStrategy", pCfg->snapshotStrategy)); + TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "batchSize", pCfg->batchSize)); + TAOS_CHECK_EXIT(tjsonAddIntegerToObject(pJson, "lastConfigIndex", pCfg->lastConfigIndex)); + TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "configIndexCount", pCfg->configIndexCount)); SJson *configIndexArr = tjsonCreateArray(); - if (configIndexArr == NULL) return -1; - if (tjsonAddItemToObject(pJson, "configIndexArr", configIndexArr) < 0) return -1; + if (configIndexArr == NULL) { + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); + } + if ((code = tjsonAddItemToObject(pJson, "configIndexArr", configIndexArr)) < 0) { + tjsonDelete(configIndexArr); + TAOS_CHECK_EXIT(code); + } for (int32_t i = 0; i < pCfg->configIndexCount; ++i) { SJson *configIndex = tjsonCreateObject(); - if (configIndex == NULL) return -1; - if (tjsonAddIntegerToObject(configIndex, "index", pCfg->configIndexArr[i]) < 0) return -1; - if (tjsonAddItemToArray(configIndexArr, configIndex) < 0) return -1; + if (configIndex == NULL) { + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); + } + TAOS_CHECK_GOTO(tjsonAddIntegerToObject(configIndex, "index", pCfg->configIndexArr[i]), NULL, _err); /* configIndex is not attached to the array yet: go through _err so it is deleted on failure */ + TAOS_CHECK_GOTO(tjsonAddItemToArray(configIndexArr, configIndex), NULL, _err); /* a failed append leaves configIndex unattached, so _err must still delete it */ + continue; + _err: 
+ tjsonDelete(configIndex); + break; } - - return 0; +_exit: + if (code < 0) { + sError("failed to encode raft cfg at line %d since %s", lino, tstrerror(code)); + } + TAOS_RETURN(code); } int32_t syncWriteCfgFile(SSyncNode *pNode) { - int32_t code = -1; + int32_t code = 0, lino = 0; char *buffer = NULL; SJson *pJson = NULL; TdFilePtr pFile = NULL; @@ -95,39 +126,46 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) { char file[PATH_MAX] = {0}; snprintf(file, sizeof(file), "%s.bak", realfile); - terrno = TSDB_CODE_OUT_OF_MEMORY; - pJson = tjsonCreateObject(); - if (pJson == NULL) goto _OVER; - if (tjsonAddObject(pJson, "RaftCfg", syncEncodeRaftCfg, pCfg) < 0) goto _OVER; + if ((pJson = tjsonCreateObject()) == NULL) { + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); + } + TAOS_CHECK_EXIT(tjsonAddObject(pJson, "RaftCfg", syncEncodeRaftCfg, pCfg)); buffer = tjsonToString(pJson); - if (buffer == NULL) goto _OVER; - terrno = 0; + if (buffer == NULL) { + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); + } pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); - if (pFile == NULL) goto _OVER; + if (pFile == NULL) { + code = terrno ? 
terrno : TAOS_SYSTEM_ERROR(errno); + TAOS_CHECK_EXIT(code); + } int32_t len = strlen(buffer); - if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER; - if (taosFsyncFile(pFile) < 0) goto _OVER; + if (taosWriteFile(pFile, buffer, len) <= 0) { + TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); + } + if (taosFsyncFile(pFile) < 0) { + TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); + } - taosCloseFile(&pFile); - if (taosRenameFile(file, realfile) != 0) goto _OVER; + (void)taosCloseFile(&pFile); + if (taosRenameFile(file, realfile) != 0) { + TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); + } - code = 0; - sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64 ", " - "changeVersion:%d", pNode->vgId, - realfile, len, pNode->raftCfg.lastConfigIndex, pNode->raftCfg.cfg.changeVersion); + sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d", + pNode->vgId, realfile, len, pNode->raftCfg.lastConfigIndex, pNode->raftCfg.cfg.changeVersion); -_OVER: +_exit: if (pJson != NULL) tjsonDelete(pJson); if (buffer != NULL) taosMemoryFree(buffer); if (pFile != NULL) taosCloseFile(&pFile); if (code != 0) { - if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno); - sError("vgId:%d, failed to write sync cfg file:%s since %s", pNode->vgId, realfile, terrstr()); + sError("vgId:%d, failed to write sync cfg file:%s since %s", pNode->vgId, realfile, tstrerror(code)); } - return code; + TAOS_RETURN(code); } static int32_t syncDecodeSyncCfg(const SJson *pJson, void *pObj) { @@ -135,32 +173,31 @@ static int32_t syncDecodeSyncCfg(const SJson *pJson, void *pObj) { int32_t code = 0; tjsonGetInt32ValueFromDouble(pJson, "replicaNum", pCfg->replicaNum, code); - if (code < 0) return -1; + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; tjsonGetInt32ValueFromDouble(pJson, "myIndex", pCfg->myIndex, code); - if (code < 0) return -1; + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; tjsonGetInt32ValueFromDouble(pJson, "changeVersion", 
pCfg->changeVersion, code); - if (code < 0) return -1; + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; SJson *nodeInfo = tjsonGetObjectItem(pJson, "nodeInfo"); - if (nodeInfo == NULL) return -1; + if (nodeInfo == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; pCfg->totalReplicaNum = tjsonGetArraySize(nodeInfo); for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) { SJson *info = tjsonGetArrayItem(nodeInfo, i); - if (info == NULL) return -1; + if (info == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; tjsonGetUInt16ValueFromDouble(info, "nodePort", pCfg->nodeInfo[i].nodePort, code); - if (code < 0) return -1; + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; code = tjsonGetStringValue(info, "nodeFqdn", pCfg->nodeInfo[i].nodeFqdn); - if (code < 0) return -1; + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; tjsonGetNumberValue(info, "nodeId", pCfg->nodeInfo[i].nodeId, code); tjsonGetNumberValue(info, "clusterId", pCfg->nodeInfo[i].clusterId, code); char role[10] = {0}; code = tjsonGetStringValue(info, "isReplica", role); - if(code < 0) return -1; - if(strlen(role) != 0){ + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + if (strlen(role) != 0) { pCfg->nodeInfo[i].nodeRole = syncStrToRole(role); - } - else{ + } else { pCfg->nodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER; } } @@ -172,34 +209,34 @@ static int32_t syncDecodeRaftCfg(const SJson *pJson, void *pObj) { SRaftCfg *pCfg = (SRaftCfg *)pObj; int32_t code = 0; - if (tjsonToObject(pJson, "SSyncCfg", syncDecodeSyncCfg, (void *)&pCfg->cfg) < 0) return -1; + TAOS_CHECK_RETURN(tjsonToObject(pJson, "SSyncCfg", syncDecodeSyncCfg, (void *)&pCfg->cfg)); tjsonGetInt8ValueFromDouble(pJson, "isStandBy", pCfg->isStandBy, code); - if (code < 0) return -1; + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; tjsonGetInt8ValueFromDouble(pJson, "snapshotStrategy", pCfg->snapshotStrategy, code); - if (code < 0) return -1; + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; tjsonGetInt32ValueFromDouble(pJson, 
"batchSize", pCfg->batchSize, code); - if (code < 0) return -1; + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; tjsonGetNumberValue(pJson, "lastConfigIndex", pCfg->lastConfigIndex, code); - if (code < 0) return -1; + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; tjsonGetInt32ValueFromDouble(pJson, "configIndexCount", pCfg->configIndexCount, code); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; SJson *configIndexArr = tjsonGetObjectItem(pJson, "configIndexArr"); - if (configIndexArr == NULL) return -1; + if (configIndexArr == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; pCfg->configIndexCount = tjsonGetArraySize(configIndexArr); for (int32_t i = 0; i < pCfg->configIndexCount; ++i) { SJson *configIndex = tjsonGetArrayItem(configIndexArr, i); - if (configIndex == NULL) return -1; + if (configIndex == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; tjsonGetNumberValue(configIndex, "index", pCfg->configIndexArr[i], code); - if (code < 0) return -1; + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; } - return 0; } int32_t syncReadCfgFile(SSyncNode *pNode) { - int32_t code = -1; + int32_t code = 0; TdFilePtr pFile = NULL; char *pData = NULL; SJson *pJson = NULL; @@ -208,27 +245,27 @@ int32_t syncReadCfgFile(SSyncNode *pNode) { pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - sError("vgId:%d, failed to open sync cfg file:%s since %s", pNode->vgId, file, terrstr()); + code = TAOS_SYSTEM_ERROR(errno); + sError("vgId:%d, failed to open sync cfg file:%s since %s", pNode->vgId, file, tstrerror(code)); goto _OVER; } int64_t size = 0; if (taosFStatFile(pFile, &size, NULL) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - sError("vgId:%d, failed to fstat sync cfg file:%s since %s", pNode->vgId, file, terrstr()); + code = TAOS_SYSTEM_ERROR(errno); + sError("vgId:%d, failed to fstat sync cfg file:%s since %s", pNode->vgId, file, tstrerror(code)); goto _OVER; } pData = taosMemoryMalloc(size + 1); if (pData == NULL) { 
- terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } if (taosReadFile(pFile, pData, size) != size) { - terrno = TAOS_SYSTEM_ERROR(errno); - sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, terrstr()); + code = TAOS_SYSTEM_ERROR(errno); + sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, tstrerror(code)); goto _OVER; } @@ -236,18 +273,16 @@ int32_t syncReadCfgFile(SSyncNode *pNode) { pJson = tjsonParse(pData); if (pJson == NULL) { - terrno = TSDB_CODE_INVALID_JSON_FORMAT; + code = TSDB_CODE_INVALID_JSON_FORMAT; goto _OVER; } if (tjsonToObject(pJson, "RaftCfg", syncDecodeRaftCfg, (void *)pCfg) < 0) { - terrno = TSDB_CODE_INVALID_JSON_FORMAT; + code = TSDB_CODE_INVALID_JSON_FORMAT; goto _OVER; } - code = 0; - sInfo("vgId:%d, succceed to read sync cfg file %s, changeVersion:%d", - pNode->vgId, file, pCfg->cfg.changeVersion); + sInfo("vgId:%d, succceed to read sync cfg file %s, changeVersion:%d", pNode->vgId, file, pCfg->cfg.changeVersion); _OVER: if (pData != NULL) taosMemoryFree(pData); @@ -255,15 +290,15 @@ _OVER: if (pFile != NULL) taosCloseFile(&pFile); if (code != 0) { - sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, terrstr()); + sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, tstrerror(code)); } - return code; + TAOS_RETURN(code); } int32_t syncAddCfgIndex(SSyncNode *pNode, SyncIndex cfgIndex) { SRaftCfg *pCfg = &pNode->raftCfg; - if (pCfg->configIndexCount < MAX_CONFIG_INDEX_COUNT) { - return -1; + if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) { + return TSDB_CODE_OUT_OF_RANGE; } pCfg->configIndexArr[pCfg->configIndexCount] = cfgIndex; diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 8f42780eb9..fd6781f354 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -99,9 +99,14 @@ void syncEntryDestroy(SSyncRaftEntry* 
pEntry) { } } -void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) { +int32_t syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) { pRpcMsg->msgType = pEntry->originalRpcType; pRpcMsg->contLen = (int32_t)(pEntry->dataLen); pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + if (pRpcMsg->pCont == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen); + + return 0; } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index dbbd914041..b2233ae664 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -65,7 +65,7 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { } // timer replicate - syncNodeReplicate(ths); + (void)syncNodeReplicate(ths); // clean mnode index if (syncNodeIsMnode(ths)) { @@ -89,7 +89,7 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { snapshotSenderStop(pSender, false); } else { sSWarn(pSender, "snap replication resend."); - snapshotReSend(pSender); + (void)snapshotReSend(pSender); } } } @@ -112,14 +112,14 @@ int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pRpc) { if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) { ++(ths->pingTimerCounter); - syncNodeTimerRoutine(ths); + (void)syncNodeTimerRoutine(ths); } } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { if (atomic_load_64(&ths->electTimerLogicClock) <= pMsg->logicClock) { ++(ths->electTimerCounter); - syncNodeElect(ths); + (void)syncNodeElect(ths); } } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index ef1c2dcdc8..4517119eaa 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -1100,6 +1100,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { if (fd >= 0) (void)close(fd); #endif if (fp != NULL) (void)fclose(fp); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; }