diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 9b49c1908d..548a8c4cdf 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -668,8 +668,14 @@ int32_t taosGetErrSize(); #define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) #define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917) #define TSDB_CODE_SYN_NEGOTIATION_WIN_FULL TAOS_DEF_ERROR_CODE(0, 0x0918) +#define TSDB_CODE_SYN_WRONG_TERM TAOS_DEF_ERROR_CODE(0, 0x0919) +#define TSDB_CODE_SYN_WRONG_FSM_STATE TAOS_DEF_ERROR_CODE(0, 0x091A) +#define TSDB_CODE_SYN_WRONG_SYNC_STATE TAOS_DEF_ERROR_CODE(0, 0x091B) +#define TSDB_CODE_SYN_WRONG_REF TAOS_DEF_ERROR_CODE(0, 0x091C) +#define TSDB_CODE_SYN_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x091D) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) + // tq #define TSDB_CODE_TQ_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0A00) #define TSDB_CODE_TQ_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0A01) diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 4b7ed59039..6ea5402c28 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -40,6 +40,7 @@ // int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { + int32_t code = 0; SyncAppendEntriesReply* pMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont; int32_t ret = 0; @@ -59,7 +60,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (pMsg->term > raftStoreGetTerm(ths)) { syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); syncNodeStepDown(ths, pMsg->term); - return -1; + return TSDB_CODE_SYN_WRONG_TERM; } ASSERT(pMsg->term == raftStoreGetTerm(ths)); @@ -81,17 +82,19 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncNodeStepDown(ths, pMsg->term); } } else { - (void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex); + TAOS_CHECK_RETURN(syncLogBufferCommit(ths->pLogBuf, ths, commitIndex)); } } // replicate log SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId); if (pMgr == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; sError("vgId:%d, failed to get log repl mgr for src addr: 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr); - return -1; + TAOS_RETURN(code); } - (void)syncLogReplProcessReply(pMgr, ths, pMsg); + TAOS_CHECK_RETURN(syncLogReplProcessReply(pMgr, ths, pMsg)); } - return 0; + TAOS_RETURN(code); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 3c401a978b..b55267a0e7 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -72,17 +72,25 @@ bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) { } int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) { + int32_t code = 0; SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore); commitIndex = TMAX(commitIndex, ths->commitIndex); ths->commitIndex = TMIN(commitIndex, lastVer); - ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex); + if ((code = ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex)) != 0) { + // TODO add return when error + sError("failed to update commit index since %s", tstrerror(code)); + } return ths->commitIndex; } int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { + int32_t code = 0; if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) { SyncIndex commitIndex = indexLikely; - syncNodeUpdateCommitIndex(ths, commitIndex); + if ((code = syncNodeUpdateCommitIndex(ths, commitIndex)) != 0) { + // TODO add return when error + sError("failed to update commit index since %s", tstrerror(code)); + } sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state, raftStoreGetTerm(ths), commitIndex); } @@ -90,9 +98,13 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { } int64_t syncNodeUpdateAssignedCommitIndex(SSyncNode* ths, SyncIndex assignedCommitIndex) { + int32_t code = 0; SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore); assignedCommitIndex = TMAX(assignedCommitIndex, ths->assignedCommitIndex); ths->assignedCommitIndex = TMIN(assignedCommitIndex, lastVer); - ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->assignedCommitIndex); + if ((code = ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->assignedCommitIndex)) != 0) { + // TODO add return when error + sError("failed to update commit index since %s", tstrerror(code)); + } return ths->commitIndex; } diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index c57e7e273f..3b595f464d 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -73,7 +73,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) { int32_t syncNodeElect(SSyncNode* pSyncNode) { if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) { sNError(pSyncNode, "skip leader election due to incomplete fsm state"); - return -1; + return TSDB_CODE_SYN_WRONG_FSM_STATE; } sNInfo(pSyncNode, "begin election"); @@ -86,7 +86,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) { sNError(pSyncNode, "not candidate, can not elect"); - return -1; + return TSDB_CODE_SYN_WRONG_SYNC_STATE; } // start election diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index 1fa67cfa4d..cce8149037 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -48,14 +48,14 @@ int32_t syncInit() { if (gNodeRefId < 0) { sError("failed to init node ref"); syncCleanUp(); - return -1; + return TSDB_CODE_SYN_WRONG_REF; } gHbDataRefId = taosOpenRef(200, (RefFp)syncHbTimerDataFree); if (gHbDataRefId < 0) { sError("failed to init hb-data ref"); syncCleanUp(); - return -1; + return TSDB_CODE_SYN_WRONG_REF; } sDebug("sync rsetId:%d is open", gNodeRefId); @@ -106,7 +106,7 @@ void syncNodeRelease(SSyncNode *pNode) { int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) { pData->rid = taosAddRef(gHbDataRefId, pData); - if (pData->rid < 0) return -1; + if (pData->rid < 0) return TSDB_CODE_SYN_WRONG_REF; return pData->rid; } diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index add2f1a5dd..4946912941 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -98,6 +98,7 @@ SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pRaftId) { } } + terrno = TSDB_CODE_SYN_INVALID_ID; sError("vgId:%d, indexmgr get replmgr from dnode:%d cluster:%d failed", pNode->vgId, DID(pRaftId), CID(pRaftId)); return NULL; } @@ -110,6 +111,7 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) } } + terrno = TSDB_CODE_SYN_INVALID_ID; sError("vgId:%d, indexmgr get index from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId), CID(pRaftId)); return SYNC_INDEX_INVALID; @@ -137,7 +139,8 @@ int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftI sError("vgId:%d, indexmgr get start-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId), CID(pRaftId)); - return -1; + return TSDB_CODE_SYN_INVALID_ID; + ; } void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t recvTime) { @@ -162,7 +165,7 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId sError("vgId:%d, indexmgr get recv-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId), CID(pRaftId)); - return -1; + return TSDB_CODE_SYN_INVALID_ID; } void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncTerm term) { @@ -187,5 +190,5 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) { sError("vgId:%d, indexmgr get term from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId), CID(pRaftId)); - return -1; + return TSDB_CODE_SYN_INVALID_ID; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 09c7224c1e..10ad170218 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -527,6 +527,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEGOTIATION_WIN_FULL, "Sync negotiation win is full") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_TERM, "Sync got a wrong term") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_FSM_STATE, "Sync got a wrong fsm state") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_SYNC_STATE, "Sync got a wrong state") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_REF, "Sync got a wrong ref") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_ID, "Sync invalid id") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") //tq