diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index a76dfa1b51..9a1804ce6a 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1774,7 +1774,7 @@ static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj * int32_t nCols) { // if (pColCmpr == NULL || colName == NULL) return -1; - ASSERT(taosArrayGetSize(pField) == nCols); + if (taosArrayGetSize(pField) != nCols) return TSDB_CODE_FAILED; TAOS_FIELD *p = taosArrayGet(pField, 0); int32_t code = 0; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 6ea5402c28..005cf4337d 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -57,14 +57,12 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } if (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { - if (pMsg->term > raftStoreGetTerm(ths)) { + if (pMsg->term != raftStoreGetTerm(ths)) { syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); syncNodeStepDown(ths, pMsg->term); return TSDB_CODE_SYN_WRONG_TERM; } - ASSERT(pMsg->term == raftStoreGetTerm(ths)); - sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 3e2b98c35b..1c129a0ed1 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -57,7 +57,10 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { return pSyncNode->qu bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) { int count = 0; SSyncIndexMgr* pMatches = pNode->pMatchIndex; - ASSERT(pNode->replicaNum == pMatches->replicaNum); + if (pNode->replicaNum != pMatches->replicaNum) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return false; + }; for (int i = 0; i < pNode->totalReplicaNum; i++) { if(pNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_VOTER){ diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 3b595f464d..fc056c9eba 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -119,7 +119,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { } ret = syncNodeRequestVotePeers(pSyncNode); - ASSERT(ret == 0); + if (ret != 0) return ret; syncNodeResetElectTimer(pSyncNode); return ret; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ffd180ee01..2f5a3488a1 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -988,9 +988,18 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { int32_t code = 0; - ASSERTS(pNode->pLogStore != NULL, "log store not created"); - ASSERTS(pNode->pFsm != NULL, "pFsm not registered"); - ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered"); + if (pNode->pLogStore == NULL) { + sError("vgId:%d, log store not created", pNode->vgId); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (pNode->pFsm == NULL) { + sError("vgId:%d, pFsm not registered", pNode->vgId); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (pNode->pFsm->FpGetSnapshotInfo == NULL) { + sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } SSnapshot snapshot = {0}; // TODO check return value (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); @@ -1384,8 +1393,14 @@ void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) { int32_t syncNodeRestore(SSyncNode* pSyncNode) { int32_t code = 0; - ASSERTS(pSyncNode->pLogStore != NULL, "log store not created"); - ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created"); + if (pSyncNode->pLogStore == NULL) { + sError("vgId:%d, log store not created", pSyncNode->vgId); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (pSyncNode->pLogBuf == NULL) { + sError("vgId:%d, ring log buffer not created", pSyncNode->vgId); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } (void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex); SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); @@ -1400,7 +1415,7 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { TAOS_RETURN(code); } - ASSERT(endIndex == lastVer + 1); + if (endIndex != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR; pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex); sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex); @@ -1743,7 +1758,10 @@ inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) { } } - ASSERT(b1 == b2); + if (b1 != b2) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return false; + } return b1; } @@ -2194,7 +2212,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { } SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - // ASSERT(lastIndex >= 0); + sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); } @@ -2222,7 +2240,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { } int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER); + if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR; syncNodeBecomeLeader(pSyncNode, "assigned leader to leader"); sNTrace(pSyncNode, "assigned leader to leader"); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 782d97f789..22947bdd8e 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -71,16 +71,32 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt goto _err; } - ASSERT(index == pBuf->endIndex); + if (index != pBuf->endIndex) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _err; + }; SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem; - ASSERT(pExist == NULL); + if (pExist != NULL) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _err; + } // initial log buffer with at least one item, e.g. commitIndex SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem; - ASSERTS(pMatch != NULL, "no matched log entry"); - ASSERT(pMatch->index + 1 == index); - ASSERT(pMatch->term <= pEntry->term); + if (pMatch == NULL) { + sError("vgId:%d, no matched log entry", pNode->vgId); + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _err; + } + if (pMatch->index + 1 != index) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _err; + } + if (!(pMatch->term <= pEntry->term)) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _err; + } SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term}; pBuf->entries[index % pBuf->size] = tmp; @@ -114,7 +130,7 @@ int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST); } - ASSERT(index - 1 == prevIndex); + if (index - 1 != prevIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; if (prevIndex >= pBuf->startIndex) { pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem; @@ -178,9 +194,18 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex } int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { - ASSERTS(pNode->pLogStore != NULL, "log store not created"); - ASSERTS(pNode->pFsm != NULL, "pFsm not registered"); - ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered"); + if (pNode->pLogStore == NULL) { + sError("log store not created"); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (pNode->pFsm == NULL) { + sError("pFsm not registered"); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (pNode->pFsm->FpGetSnapshotInfo == NULL) { + sError("FpGetSnapshotInfo not registered"); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } int32_t code = 0, lino = 0; SSnapshot snapshot = {0}; @@ -191,7 +216,8 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { TAOS_CHECK_EXIT(syncLogValidateAlignmentOfCommit(pNode, commitIndex)); SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); - ASSERT(lastVer >= commitIndex); + if (lastVer < commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; + ; SyncIndex toIndex = lastVer; // update match index pBuf->commitIndex = commitIndex; @@ -239,7 +265,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { // put a dummy record at commitIndex if present in log buffer if (takeDummy) { - ASSERT(index == pBuf->commitIndex); + if (index != pBuf->commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId); if (pDummy == NULL) { @@ -392,9 +418,18 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt } // update - ASSERT(pBuf->startIndex < index); - ASSERT(index - pBuf->startIndex < pBuf->size); - ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL); + if (!(pBuf->startIndex < index)) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + }; + if (!(index - pBuf->startIndex < pBuf->size)) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } + if (pBuf->entries[index % pBuf->size].pItem != NULL) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm}; pEntry = NULL; pBuf->entries[index % pBuf->size] = tmp; @@ -422,14 +457,14 @@ static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replica int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) { int32_t code = 0; - ASSERT(pEntry->index >= 0); + if (pEntry->index < 0) return TSDB_CODE_SYN_INTERNAL_ERROR; SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore); if (lastVer >= pEntry->index && (code = pLogStore->syncLogTruncate(pLogStore, pEntry->index)) < 0) { sError("failed to truncate log store since %s. from index:%" PRId64 "", tstrerror(code), pEntry->index); TAOS_RETURN(code); } lastVer = pLogStore->syncLogLastIndex(pLogStore); - ASSERT(pEntry->index == lastVer + 1); + if (pEntry->index != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR; bool doFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum); if ((code = pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync)) < 0) { @@ -439,7 +474,7 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaf } lastVer = pLogStore->syncLogLastIndex(pLogStore); - ASSERT(pEntry->index == lastVer); + if (pEntry->index != lastVer) return TSDB_CODE_SYN_INTERNAL_ERROR; return 0; } @@ -718,7 +753,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION; for (SyncIndex index = pBuf->startIndex; index < until; index++) { SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; - ASSERT(pEntry != NULL); + if (pEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR; syncEntryDestroy(pEntry); (void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); pBuf->startIndex = index + 1; @@ -786,7 +821,10 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { int64_t pos = index % pMgr->size; - ASSERT(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex)); + if (!(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex))) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) { break; @@ -809,7 +847,10 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { tstrerror(code), index, pDestId->addr); goto _out; } - ASSERT(barrier == pMgr->states[pos].barrier); + if (barrier != pMgr->states[pos].barrier) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } pMgr->states[pos].timeMs = nowMs; pMgr->states[pos].term = term; pMgr->states[pos].acked = false; @@ -839,11 +880,11 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn SSyncLogBuffer* pBuf = pNode->pLogBuf; SRaftId destId = pMsg->srcId; int32_t code = 0; - ASSERT(pMgr->restored == false); + if (pMgr->restored != false) return TSDB_CODE_SYN_INTERNAL_ERROR; if (pMgr->endIndex == 0) { - ASSERT(pMgr->startIndex == 0); - ASSERT(pMgr->matchIndex == 0); + if (pMgr->startIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR; + if (pMgr->matchIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR; if (pMsg->matchIndex < 0) { pMgr->restored = true; sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 @@ -909,7 +950,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn if ((index + 1 < firstVer) || (term < 0) || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) { - ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); + if (!(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST)) return TSDB_CODE_SYN_INTERNAL_ERROR; if ((code = syncNodeStartSnapshot(pNode, &destId)) < 0) { sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId)); TAOS_RETURN(code); @@ -918,13 +959,13 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn return 0; } - ASSERT(index + 1 >= firstVer); + if (!(index + 1 >= firstVer)) return TSDB_CODE_SYN_INTERNAL_ERROR; if (term == pMsg->lastMatchTerm) { index = index + 1; - ASSERT(index <= pNode->pLogBuf->matchIndex); + if (!(index <= pNode->pLogBuf->matchIndex)) return TSDB_CODE_SYN_INTERNAL_ERROR; } else { - ASSERT(index > firstVer); + if (!(index > firstVer)) return TSDB_CODE_SYN_INTERNAL_ERROR; } } @@ -975,8 +1016,8 @@ int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { } int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { - ASSERT(!pMgr->restored); - ASSERT(pMgr->startIndex >= 0); + if (pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR; + if (!(pMgr->startIndex >= 0)) return TSDB_CODE_SYN_INTERNAL_ERROR; int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs(); int64_t nowMs = taosGetMonoTimestampMs(); int32_t code = 0; @@ -996,7 +1037,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde TAOS_RETURN(code); } - ASSERT(index >= 0); + if (!(index >= 0)) return TSDB_CODE_SYN_INTERNAL_ERROR; pMgr->states[index % pMgr->size].barrier = barrier; pMgr->states[index % pMgr->size].timeMs = nowMs; pMgr->states[index % pMgr->size].term = term; @@ -1014,7 +1055,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde } int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { - ASSERT(pMgr->restored); + if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; int32_t batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff)); @@ -1070,7 +1111,7 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { } int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { - ASSERT(pMgr->restored == true); + if (pMgr->restored != true) return TSDB_CODE_SYN_INTERNAL_ERROR; if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) { if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) { int64_t firstMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs; @@ -1100,7 +1141,10 @@ SSyncLogReplMgr* syncLogReplCreate() { pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]); - ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE); + if (pMgr->size != TSDB_SYNC_LOG_BUFFER_SIZE) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return NULL; + } return pMgr; } @@ -1115,7 +1159,7 @@ void syncLogReplDestroy(SSyncLogReplMgr* pMgr) { int32_t syncNodeLogReplInit(SSyncNode* pNode) { for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) { - ASSERT(pNode->logReplMgrs[i] == NULL); + if (pNode->logReplMgrs[i] != NULL) return TSDB_CODE_SYN_INTERNAL_ERROR; pNode->logReplMgrs[i] = syncLogReplCreate(); if (pNode->logReplMgrs[i] == NULL) { TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); @@ -1141,7 +1185,10 @@ int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) { pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]); - ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE); + if (pBuf->size != TSDB_SYNC_LOG_BUFFER_SIZE) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _exit; + } if (taosThreadMutexAttrInit(&pBuf->attr) < 0) { code = TAOS_SYSTEM_ERROR(errno); @@ -1194,7 +1241,7 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) { int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) { int32_t code = 0; - ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex); + if (!(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex)) return TSDB_CODE_SYN_INTERNAL_ERROR; if (toIndex == pBuf->endIndex) { return 0; @@ -1217,7 +1264,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex } pBuf->endIndex = toIndex; pBuf->matchIndex = TMIN(pBuf->matchIndex, index); - ASSERT(index + 1 == toIndex); + if (index + 1 != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; // trunc wal SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); @@ -1227,7 +1274,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex TAOS_RETURN(code); } lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); - ASSERT(toIndex == lastVer + 1); + if (toIndex != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR; // refill buffer on need if (toIndex <= pBuf->startIndex) { @@ -1237,7 +1284,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex } } - ASSERT(pBuf->endIndex == toIndex); + if (pBuf->endIndex != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; (void)syncLogBufferValidate(pBuf); return 0; } @@ -1246,7 +1293,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { (void)taosThreadMutexLock(&pBuf->mutex); (void)syncLogBufferValidate(pBuf); SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); - ASSERT(lastVer == pBuf->matchIndex); + if (lastVer != pBuf->matchIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; SyncIndex index = pBuf->endIndex - 1; (void)syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1); diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index fd6781f354..56a702a9d5 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -70,7 +70,10 @@ SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) { return NULL; } memcpy(pEntry, pMsg->data, pMsg->dataLen); - ASSERT(pEntry->bytes == pMsg->dataLen); + if (pEntry->bytes != pMsg->dataLen) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return NULL; + } return pEntry; } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index ecca777806..e7ecb67e69 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -61,7 +61,10 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { SSyncLogStoreData* pData = pLogStore->data; pData->pSyncNode = pSyncNode; pData->pWal = pSyncNode->pWal; - ASSERT(pData->pWal != NULL); + if (pData->pWal == NULL) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return NULL; + } (void)taosThreadMutexInit(&(pData->mutex), NULL); pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0); @@ -115,7 +118,7 @@ void logStoreDestory(SSyncLogStore* pLogStore) { // log[m .. n] static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) { - ASSERT(snapshotIndex >= 0); + if (!(snapshotIndex >= 0)) return TSDB_CODE_SYN_INTERNAL_ERROR; SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -303,14 +306,14 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR } *ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); - ASSERT(*ppEntry != NULL); + if (*ppEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR; (*ppEntry)->msgType = TDMT_SYNC_CLIENT_REQUEST; (*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType; (*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum; (*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek; (*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term; (*ppEntry)->index = index; - ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen); + if ((*ppEntry)->dataLen != pWalHandle->pHead->head.bodyLen) return TSDB_CODE_SYN_INTERNAL_ERROR; (void)memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen); /* @@ -362,14 +365,14 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - ASSERT(ppLastEntry != NULL); + if (ppLastEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR; *ppLastEntry = NULL; if (walIsEmpty(pWal)) { TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST); } else { SyncIndex lastIndex = raftLogLastIndex(pLogStore); - ASSERT(lastIndex >= SYNC_INDEX_BEGIN); + if (!(lastIndex >= SYNC_INDEX_BEGIN)) return TSDB_CODE_SYN_INTERNAL_ERROR; int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry); TAOS_RETURN(code); diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index a30b9a4930..8b8b2580b1 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -104,7 +104,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncNodeStepDown(ths, pMsg->term); } SyncTerm currentTerm = raftStoreGetTerm(ths); - ASSERT(pMsg->term <= currentTerm); + if (!(pMsg->term <= currentTerm)) return TSDB_CODE_SYN_INTERNAL_ERROR; bool grant = (pMsg->term == currentTerm) && logOK && ((!raftStoreHasVoted(ths)) || (syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId))); @@ -130,7 +130,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pReply->destId = pMsg->srcId; pReply->term = currentTerm; pReply->voteGranted = grant; - ASSERT(!grant || pMsg->term == pReply->term); + if (!(!grant || pMsg->term == pReply->term)) return TSDB_CODE_SYN_INTERNAL_ERROR; // trace log syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, ""); diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 10d9a6c96b..eeddb708ab 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -64,7 +64,7 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } syncLogRecvRequestVoteReply(ths, pMsg, ""); - ASSERT(pMsg->term == currentTerm); + if (pMsg->term != currentTerm) return TSDB_CODE_SYN_INTERNAL_ERROR; // This tallies votes even when the current state is not Candidate, // but they won't be looked at, so it doesn't matter. diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 8a2c53997b..2a3b164f03 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -57,7 +57,7 @@ static int32_t syncSnapBufferCreate(SSyncSnapBuffer **ppBuf) { TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pBuf->size = sizeof(pBuf->entries) / sizeof(void *); - ASSERT(pBuf->size == TSDB_SYNC_SNAP_BUFFER_SIZE); + if (pBuf->size != TSDB_SYNC_SNAP_BUFFER_SIZE) return TSDB_CODE_SYN_INTERNAL_ERROR; (void)taosThreadMutexInit(&pBuf->mutex, NULL); *ppBuf = pBuf; TAOS_RETURN(0); @@ -311,7 +311,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { } } - ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END); + if (!(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END)) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _OUT; + } // send msg int32_t blockLen = (pBlk) ? pBlk->blockLen : 0; @@ -323,7 +326,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { // put in buffer int64_t nowMs = taosGetTimestampMs(); if (pBlk) { - ASSERT(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END); + if (!(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END)) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _OUT; + } pBlk->sendTimeMs = nowMs; pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk; pBlk = NULL; @@ -351,7 +357,10 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) { SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size]; - ASSERT(pBlk); + if (!pBlk) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } int64_t nowMs = taosGetTimestampMs(); if (pBlk->acked || nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) { continue; @@ -682,7 +691,7 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) { static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, SSnapshot *pInfo) { - ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT); + if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT) return TSDB_CODE_SYN_INTERNAL_ERROR; int32_t code = 0, lino = 0; // copy snap info from leader @@ -878,7 +887,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot goto _out; } - ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end); + if (!(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end)) return TSDB_CODE_SYN_INTERNAL_ERROR; if (pMsg->seq > pRcvBuf->cursor) { if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) { @@ -922,7 +931,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend // condition 4 // transfering SyncSnapshotSend *pMsg = ppMsg[0]; - ASSERT(pMsg); + if (!pMsg) return TSDB_CODE_SYN_INTERNAL_ERROR; SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; int64_t timeNow = taosGetTimestampMs(); int32_t code = 0; @@ -1071,7 +1080,7 @@ _out:; } static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { - ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY); + if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT_REPLY) return TSDB_CODE_SYN_INTERNAL_ERROR; SSyncTLV *datHead = (void *)pMsg->data; if (datHead->typ != pMsg->payloadType) { @@ -1168,11 +1177,17 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp goto _out; } - ASSERT(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end); + if (!(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end)) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } if (pMsg->ack > pSndBuf->cursor && pMsg->ack < pSndBuf->end) { SyncSnapBlock *pBlk = pSndBuf->entries[pMsg->ack % pSndBuf->size]; - ASSERT(pBlk); + if (!pBlk) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } pBlk->acked = 1; }