diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 1c129a0ed1..5054339e8e 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -45,9 +45,6 @@ // static inline int64_t syncNodeAbs64(int64_t a, int64_t b) { - ASSERT(a >= 0); - ASSERT(b >= 0); - int64_t c = a > b ? a - b : b - a; return c; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 179bb4d503..d947a488bf 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3523,6 +3523,9 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { return 0; } SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf); + if (matchTerm < 0) { + return TSDB_CODE_SYN_INTERNAL_ERROR; + } if (pMsg->currentTerm == matchTerm) { (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 22947bdd8e..5e6204dde8 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -46,8 +46,8 @@ int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) { int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) { int32_t code = 0; + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); (void)taosThreadMutexLock(&pBuf->mutex); - (void)syncLogBufferValidate(pBuf); SyncIndex index = pEntry->index; if (index - pBuf->startIndex >= pBuf->size) { @@ -102,13 +102,13 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt pBuf->entries[index % pBuf->size] = tmp; pBuf->endIndex = index + 1; - (void)syncLogBufferValidate(pBuf); (void)taosThreadMutexUnlock(&pBuf->mutex); + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); return 0; _err: - (void)syncLogBufferValidate(pBuf); (void)taosThreadMutexUnlock(&pBuf->mutex); + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); taosMsleep(1); TAOS_RETURN(code); } @@ -134,7 +134,11 @@ int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI if (prevIndex >= pBuf->startIndex) { pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem; - ASSERTS(pEntry != NULL, "no log entry found"); + if (pEntry == NULL) { + sError("vgId:%d, failed to get pre log term since no log entry found", pNode->vgId); + *pSyncTerm = -1; + TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR); + } prevLogTerm = pEntry->term; *pSyncTerm = prevLogTerm; return 0; @@ -142,9 +146,18 @@ int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) { int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs; - ASSERTS(timeMs != 0, "no log entry found"); + if (timeMs == 0) { + sError("vgId:%d, failed to get pre log term since timeMs is 0", pNode->vgId); + *pSyncTerm = -1; + TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR); + } prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term; - ASSERT(prevIndex == 0 || prevLogTerm != 0); + if (!(prevIndex == 0 || prevLogTerm != 0)) { + sError("vgId:%d, failed to get pre log term prevIndex:%" PRId64 ", prevLogTerm:%" PRId64, pNode->vgId, prevIndex, + prevLogTerm); + *pSyncTerm = -1; + TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR); + } *pSyncTerm = prevLogTerm; return 0; } @@ -289,7 +302,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); // validate - (void)syncLogBufferValidate(pBuf); + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); return 0; _exit: @@ -307,8 +320,8 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { } int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); (void)taosThreadMutexLock(&pBuf->mutex); - (void)syncLogBufferValidate(pBuf); for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) { SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; if (pEntry == NULL) continue; @@ -321,15 +334,19 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { if (code < 0) { sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code)); } - (void)syncLogBufferValidate(pBuf); (void)taosThreadMutexUnlock(&pBuf->mutex); + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); return code; } FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) { SyncIndex index = pBuf->matchIndex; SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; - ASSERT(pEntry != NULL); + if (pEntry == NULL) { + sError("failed to get last match term since entry is null"); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } return pEntry->term; } @@ -348,8 +365,8 @@ bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) { } int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) { + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); (void)taosThreadMutexLock(&pBuf->mutex); - (void)syncLogBufferValidate(pBuf); int32_t code = 0; SyncIndex index = pEntry->index; SyncIndex prevIndex = pEntry->index - 1; @@ -357,6 +374,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt SSyncRaftEntry* pExist = NULL; bool inBuf = true; + if (lastMatchTerm < 0) { + sError("vgId:%d, failed to accept, lastMatchTerm:%" PRId64, pNode->vgId, lastMatchTerm); + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } + if (index <= pBuf->commitIndex) { sTrace("vgId:%d, already committed. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", @@ -364,7 +387,11 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt pBuf->endIndex); SyncTerm term = -1; code = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1, &term); - ASSERT(pEntry->term >= 0); + if (pEntry->term < 0) { + sError("vgId:%d, failed to accept, pEntry->term:%" PRId64, pNode->vgId, pEntry->term); + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } if (term == pEntry->term) { code = 0; } @@ -401,7 +428,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt // check current in buffer code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pExist); if (pExist != NULL) { - ASSERT(pEntry->index == pExist->index); + if (pEntry->index != pExist->index) { + sError("vgId:%d, failed to accept, pEntry->index:%" PRId64 ", pExist->index:%" PRId64, pNode->vgId, pEntry->index, + pExist->index); + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } if (pEntry->term != pExist->term) { (void)syncLogBufferRollback(pBuf, pNode, index); } else { @@ -411,7 +443,14 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt pBuf->endIndex); SyncTerm existPrevTerm = -1; (void)syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm); - ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm)); + if (!(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm))) { + sError("vgId:%d, failed to accept, pEntry->term:%" PRId64 ", pExist->indexpExist->term:%" PRId64 + ", pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64 ", prevTerm:%" PRId64 + ", existPrevTerm:%" PRId64, + pNode->vgId, pEntry->term, pExist->term, pEntry->index, pBuf->matchIndex, prevTerm, existPrevTerm); + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } code = 0; goto _out; } @@ -446,8 +485,8 @@ _out: syncEntryDestroy(pExist); pExist = NULL; } - (void)syncLogBufferValidate(pBuf); (void)taosThreadMutexUnlock(&pBuf->mutex); + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); TAOS_RETURN(code); } @@ -479,8 +518,8 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaf } int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char* str) { + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); (void)taosThreadMutexLock(&pBuf->mutex); - (void)syncLogBufferValidate(pBuf); SSyncLogStore* pLogStore = pNode->pLogStore; int64_t matchIndex = pBuf->matchIndex; @@ -488,7 +527,11 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p while (pBuf->matchIndex + 1 < pBuf->endIndex) { int64_t index = pBuf->matchIndex + 1; - ASSERT(index >= 0); + if (index < 0) { + sError("vgId:%d, failed to proceed index:%" PRId64, pNode->vgId, index); + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } // try to proceed SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size]; @@ -501,14 +544,37 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p goto _out; } - ASSERT(index == pEntry->index); + if (index != pEntry->index) { + sError("vgId:%d, failed to proceed index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, index, pEntry->index); + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } // match SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem; - ASSERT(pMatch != NULL); - ASSERT(pMatch->index == pBuf->matchIndex); - ASSERT(pMatch->index + 1 == pEntry->index); - ASSERT(prevLogIndex == pMatch->index); + if (pMatch == NULL) { + sError("vgId:%d, failed to proceed since pMatch is null", pNode->vgId); + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } + if (pMatch->index != pBuf->matchIndex) { + sError("vgId:%d, failed to proceed, pMatch->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId, + pMatch->index, pBuf->matchIndex); + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } + if (pMatch->index + 1 != pEntry->index) { + sError("vgId:%d, failed to proceed, pMatch->index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, + pMatch->index, pEntry->index); + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } + if (prevLogIndex != pMatch->index) { + sError("vgId:%d, failed to proceed, prevLogIndex:%" PRId64 ", pMatch->index:%" PRId64, pNode->vgId, prevLogIndex, + pMatch->index); + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } if (pMatch->term != prevLogTerm) { sInfo( @@ -567,7 +633,12 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p // replicate on demand (void)syncNodeReplicateWithoutLock(pNode); - ASSERT(pEntry->index == pBuf->matchIndex); + if (pEntry->index != pBuf->matchIndex) { + sError("vgId:%d, failed to proceed, pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId, + pEntry->index, pBuf->matchIndex); + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } // update my match index matchIndex = pBuf->matchIndex; @@ -579,8 +650,8 @@ _out: if (pMatchTerm) { *pMatchTerm = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem->term; } - (void)syncLogBufferValidate(pBuf); (void)taosThreadMutexUnlock(&pBuf->mutex); + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); return matchIndex; } @@ -643,17 +714,36 @@ _exit: } int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) { - ASSERT(pBuf->startIndex <= pBuf->matchIndex); - ASSERT(pBuf->commitIndex <= pBuf->matchIndex); - ASSERT(pBuf->matchIndex < pBuf->endIndex); - ASSERT(pBuf->endIndex - pBuf->startIndex <= pBuf->size); - ASSERT(pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem); + if (pBuf->startIndex > pBuf->matchIndex) { + sError("failed to validate, pBuf->startIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->startIndex, + pBuf->matchIndex); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (pBuf->commitIndex > pBuf->matchIndex) { + sError("failed to validate, pBuf->commitIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->commitIndex, + pBuf->matchIndex); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (pBuf->matchIndex >= pBuf->endIndex) { + sError("failed to validate, pBuf->matchIndex:%" PRId64 ", pBuf->endIndex:%" PRId64, pBuf->matchIndex, + pBuf->endIndex); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (pBuf->endIndex - pBuf->startIndex > pBuf->size) { + sError("failed to validate, pBuf->endIndex:%" PRId64 ", pBuf->startIndex:%" PRId64 ", pBuf->size:%" PRId64, + pBuf->endIndex, pBuf->startIndex, pBuf->size); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem == NULL) { + sError("failed to validate since pItem is null"); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } return 0; } int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex) { + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); (void)taosThreadMutexLock(&pBuf->mutex); - (void)syncLogBufferValidate(pBuf); SSyncLogStore* pLogStore = pNode->pLogStore; SSyncFSM* pFsm = pNode->pFsm; @@ -778,15 +868,18 @@ _out: syncEntryDestroy(pNextEntry); pNextEntry = NULL; } - (void)syncLogBufferValidate(pBuf); (void)taosThreadMutexUnlock(&pBuf->mutex); + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); TAOS_RETURN(code); } void syncLogReplReset(SSyncLogReplMgr* pMgr) { if (pMgr == NULL) return; - ASSERT(pMgr->startIndex >= 0); + if (pMgr->startIndex < 0) { + sError("failed to reset, pMgr->startIndex:%" PRId64, pMgr->startIndex); + return; + } for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { (void)memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0])); } @@ -1285,13 +1378,13 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex } if (pBuf->endIndex != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; - (void)syncLogBufferValidate(pBuf); + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); return 0; } int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); (void)taosThreadMutexLock(&pBuf->mutex); - (void)syncLogBufferValidate(pBuf); SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); if (lastVer != pBuf->matchIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; SyncIndex index = pBuf->endIndex - 1; @@ -1308,8 +1401,8 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; syncLogReplReset(pMgr); } - (void)syncLogBufferValidate(pBuf); (void)taosThreadMutexUnlock(&pBuf->mutex); + TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); return 0; }