Merge pull request #27391 from taosdata/fix/TD-31542-sync1

fix/TD-31542-sync1
This commit is contained in:
Hongze Cheng 2024-08-22 16:52:43 +08:00 committed by GitHub
commit e6ebfb286f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 162 additions and 75 deletions

View File

@ -1774,7 +1774,7 @@ static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *
int32_t nCols) {
// if (pColCmpr == NULL || colName == NULL) return -1;
ASSERT(taosArrayGetSize(pField) == nCols);
if (taosArrayGetSize(pField) != nCols) return TSDB_CODE_FAILED;
TAOS_FIELD *p = taosArrayGet(pField, 0);
int32_t code = 0;

View File

@ -57,14 +57,12 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
if (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
if (pMsg->term > raftStoreGetTerm(ths)) {
if (pMsg->term != raftStoreGetTerm(ths)) {
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term);
return TSDB_CODE_SYN_WRONG_TERM;
}
ASSERT(pMsg->term == raftStoreGetTerm(ths));
sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex);

View File

@ -57,7 +57,10 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { return pSyncNode->qu
bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) {
int count = 0;
SSyncIndexMgr* pMatches = pNode->pMatchIndex;
ASSERT(pNode->replicaNum == pMatches->replicaNum);
if (pNode->replicaNum != pMatches->replicaNum) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return false;
};
for (int i = 0; i < pNode->totalReplicaNum; i++) {
if(pNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_VOTER){

View File

@ -119,7 +119,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
}
ret = syncNodeRequestVotePeers(pSyncNode);
ASSERT(ret == 0);
if (ret != 0) return ret;
syncNodeResetElectTimer(pSyncNode);
return ret;

View File

@ -988,9 +988,18 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
int32_t code = 0;
ASSERTS(pNode->pLogStore != NULL, "log store not created");
ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
if (pNode->pLogStore == NULL) {
sError("vgId:%d, log store not created", pNode->vgId);
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
if (pNode->pFsm == NULL) {
sError("vgId:%d, pFsm not registered", pNode->vgId);
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
sError("vgId:%d, FpGetSnapshotInfo not registered", pNode->vgId);
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
SSnapshot snapshot = {0};
// TODO check return value
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
@ -1384,8 +1393,14 @@ void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
int32_t code = 0;
ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created");
if (pSyncNode->pLogStore == NULL) {
sError("vgId:%d, log store not created", pSyncNode->vgId);
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
if (pSyncNode->pLogBuf == NULL) {
sError("vgId:%d, ring log buffer not created", pSyncNode->vgId);
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
(void)taosThreadMutexLock(&pSyncNode->pLogBuf->mutex);
SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
@ -1400,7 +1415,7 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
TAOS_RETURN(code);
}
ASSERT(endIndex == lastVer + 1);
if (endIndex != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR;
pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
@ -1743,7 +1758,10 @@ inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
}
}
ASSERT(b1 == b2);
if (b1 != b2) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return false;
}
return b1;
}
@ -2194,7 +2212,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
}
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
// ASSERT(lastIndex >= 0);
sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId,
raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
}
@ -2222,7 +2240,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
}
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) return TSDB_CODE_SYN_INTERNAL_ERROR;
syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");
sNTrace(pSyncNode, "assigned leader to leader");

View File

@ -71,16 +71,32 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
goto _err;
}
ASSERT(index == pBuf->endIndex);
if (index != pBuf->endIndex) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _err;
};
SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem;
ASSERT(pExist == NULL);
if (pExist != NULL) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _err;
}
// initial log buffer with at least one item, e.g. commitIndex
SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
ASSERTS(pMatch != NULL, "no matched log entry");
ASSERT(pMatch->index + 1 == index);
ASSERT(pMatch->term <= pEntry->term);
if (pMatch == NULL) {
sError("vgId:%d, no matched log entry", pNode->vgId);
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _err;
}
if (pMatch->index + 1 != index) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _err;
}
if (!(pMatch->term <= pEntry->term)) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _err;
}
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
pBuf->entries[index % pBuf->size] = tmp;
@ -114,7 +130,7 @@ int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI
TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
}
ASSERT(index - 1 == prevIndex);
if (index - 1 != prevIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (prevIndex >= pBuf->startIndex) {
pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
@ -178,9 +194,18 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex
}
int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
ASSERTS(pNode->pLogStore != NULL, "log store not created");
ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
if (pNode->pLogStore == NULL) {
sError("log store not created");
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
if (pNode->pFsm == NULL) {
sError("pFsm not registered");
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
if (pNode->pFsm->FpGetSnapshotInfo == NULL) {
sError("FpGetSnapshotInfo not registered");
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
int32_t code = 0, lino = 0;
SSnapshot snapshot = {0};
@ -191,7 +216,8 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
TAOS_CHECK_EXIT(syncLogValidateAlignmentOfCommit(pNode, commitIndex));
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
ASSERT(lastVer >= commitIndex);
if (lastVer < commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
;
SyncIndex toIndex = lastVer;
// update match index
pBuf->commitIndex = commitIndex;
@ -239,7 +265,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
// put a dummy record at commitIndex if present in log buffer
if (takeDummy) {
ASSERT(index == pBuf->commitIndex);
if (index != pBuf->commitIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
if (pDummy == NULL) {
@ -392,9 +418,18 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
}
// update
ASSERT(pBuf->startIndex < index);
ASSERT(index - pBuf->startIndex < pBuf->size);
ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL);
if (!(pBuf->startIndex < index)) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
};
if (!(index - pBuf->startIndex < pBuf->size)) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
if (pBuf->entries[index % pBuf->size].pItem != NULL) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
pEntry = NULL;
pBuf->entries[index % pBuf->size] = tmp;
@ -422,14 +457,14 @@ static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replica
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
int32_t code = 0;
ASSERT(pEntry->index >= 0);
if (pEntry->index < 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore);
if (lastVer >= pEntry->index && (code = pLogStore->syncLogTruncate(pLogStore, pEntry->index)) < 0) {
sError("failed to truncate log store since %s. from index:%" PRId64 "", tstrerror(code), pEntry->index);
TAOS_RETURN(code);
}
lastVer = pLogStore->syncLogLastIndex(pLogStore);
ASSERT(pEntry->index == lastVer + 1);
if (pEntry->index != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR;
bool doFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum);
if ((code = pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync)) < 0) {
@ -439,7 +474,7 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaf
}
lastVer = pLogStore->syncLogLastIndex(pLogStore);
ASSERT(pEntry->index == lastVer);
if (pEntry->index != lastVer) return TSDB_CODE_SYN_INTERNAL_ERROR;
return 0;
}
@ -718,7 +753,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
for (SyncIndex index = pBuf->startIndex; index < until; index++) {
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
ASSERT(pEntry != NULL);
if (pEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;
syncEntryDestroy(pEntry);
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
pBuf->startIndex = index + 1;
@ -786,7 +821,10 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
int64_t pos = index % pMgr->size;
ASSERT(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex));
if (!(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex))) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) {
break;
@ -809,7 +847,10 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
tstrerror(code), index, pDestId->addr);
goto _out;
}
ASSERT(barrier == pMgr->states[pos].barrier);
if (barrier != pMgr->states[pos].barrier) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
pMgr->states[pos].timeMs = nowMs;
pMgr->states[pos].term = term;
pMgr->states[pos].acked = false;
@ -839,11 +880,11 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
SSyncLogBuffer* pBuf = pNode->pLogBuf;
SRaftId destId = pMsg->srcId;
int32_t code = 0;
ASSERT(pMgr->restored == false);
if (pMgr->restored != false) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (pMgr->endIndex == 0) {
ASSERT(pMgr->startIndex == 0);
ASSERT(pMgr->matchIndex == 0);
if (pMgr->startIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (pMgr->matchIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (pMsg->matchIndex < 0) {
pMgr->restored = true;
sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
@ -909,7 +950,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
if ((index + 1 < firstVer) || (term < 0) ||
(term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
if (!(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST)) return TSDB_CODE_SYN_INTERNAL_ERROR;
if ((code = syncNodeStartSnapshot(pNode, &destId)) < 0) {
sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
TAOS_RETURN(code);
@ -918,13 +959,13 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
return 0;
}
ASSERT(index + 1 >= firstVer);
if (!(index + 1 >= firstVer)) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (term == pMsg->lastMatchTerm) {
index = index + 1;
ASSERT(index <= pNode->pLogBuf->matchIndex);
if (!(index <= pNode->pLogBuf->matchIndex)) return TSDB_CODE_SYN_INTERNAL_ERROR;
} else {
ASSERT(index > firstVer);
if (!(index > firstVer)) return TSDB_CODE_SYN_INTERNAL_ERROR;
}
}
@ -975,8 +1016,8 @@ int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
}
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
ASSERT(!pMgr->restored);
ASSERT(pMgr->startIndex >= 0);
if (pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (!(pMgr->startIndex >= 0)) return TSDB_CODE_SYN_INTERNAL_ERROR;
int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
int64_t nowMs = taosGetMonoTimestampMs();
int32_t code = 0;
@ -996,7 +1037,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde
TAOS_RETURN(code);
}
ASSERT(index >= 0);
if (!(index >= 0)) return TSDB_CODE_SYN_INTERNAL_ERROR;
pMgr->states[index % pMgr->size].barrier = barrier;
pMgr->states[index % pMgr->size].timeMs = nowMs;
pMgr->states[index % pMgr->size].term = term;
@ -1014,7 +1055,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde
}
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
ASSERT(pMgr->restored);
if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
int32_t batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
@ -1070,7 +1111,7 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
}
int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
ASSERT(pMgr->restored == true);
if (pMgr->restored != true) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) {
if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
int64_t firstMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
@ -1100,7 +1141,10 @@ SSyncLogReplMgr* syncLogReplCreate() {
pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]);
ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE);
if (pMgr->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return NULL;
}
return pMgr;
}
@ -1115,7 +1159,7 @@ void syncLogReplDestroy(SSyncLogReplMgr* pMgr) {
int32_t syncNodeLogReplInit(SSyncNode* pNode) {
for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
ASSERT(pNode->logReplMgrs[i] == NULL);
if (pNode->logReplMgrs[i] != NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;
pNode->logReplMgrs[i] = syncLogReplCreate();
if (pNode->logReplMgrs[i] == NULL) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
@ -1141,7 +1185,10 @@ int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) {
pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);
ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE);
if (pBuf->size != TSDB_SYNC_LOG_BUFFER_SIZE) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _exit;
}
if (taosThreadMutexAttrInit(&pBuf->attr) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
@ -1194,7 +1241,7 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
int32_t code = 0;
ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex);
if (!(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex)) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (toIndex == pBuf->endIndex) {
return 0;
@ -1217,7 +1264,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
}
pBuf->endIndex = toIndex;
pBuf->matchIndex = TMIN(pBuf->matchIndex, index);
ASSERT(index + 1 == toIndex);
if (index + 1 != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
// trunc wal
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
@ -1227,7 +1274,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
TAOS_RETURN(code);
}
lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
ASSERT(toIndex == lastVer + 1);
if (toIndex != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR;
// refill buffer on need
if (toIndex <= pBuf->startIndex) {
@ -1237,7 +1284,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
}
}
ASSERT(pBuf->endIndex == toIndex);
if (pBuf->endIndex != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
(void)syncLogBufferValidate(pBuf);
return 0;
}
@ -1246,7 +1293,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
(void)taosThreadMutexLock(&pBuf->mutex);
(void)syncLogBufferValidate(pBuf);
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
ASSERT(lastVer == pBuf->matchIndex);
if (lastVer != pBuf->matchIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
SyncIndex index = pBuf->endIndex - 1;
(void)syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1);

View File

@ -70,7 +70,10 @@ SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) {
return NULL;
}
memcpy(pEntry, pMsg->data, pMsg->dataLen);
ASSERT(pEntry->bytes == pMsg->dataLen);
if (pEntry->bytes != pMsg->dataLen) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return NULL;
}
return pEntry;
}

View File

@ -61,7 +61,10 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
SSyncLogStoreData* pData = pLogStore->data;
pData->pSyncNode = pSyncNode;
pData->pWal = pSyncNode->pWal;
ASSERT(pData->pWal != NULL);
if (pData->pWal == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return NULL;
}
(void)taosThreadMutexInit(&(pData->mutex), NULL);
pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0);
@ -115,7 +118,7 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
// log[m .. n]
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) {
ASSERT(snapshotIndex >= 0);
if (!(snapshotIndex >= 0)) return TSDB_CODE_SYN_INTERNAL_ERROR;
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
@ -303,14 +306,14 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
}
*ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
ASSERT(*ppEntry != NULL);
if (*ppEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;
(*ppEntry)->msgType = TDMT_SYNC_CLIENT_REQUEST;
(*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType;
(*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
(*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
(*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term;
(*ppEntry)->index = index;
ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
if ((*ppEntry)->dataLen != pWalHandle->pHead->head.bodyLen) return TSDB_CODE_SYN_INTERNAL_ERROR;
(void)memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
/*
@ -362,14 +365,14 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
ASSERT(ppLastEntry != NULL);
if (ppLastEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;
*ppLastEntry = NULL;
if (walIsEmpty(pWal)) {
TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
} else {
SyncIndex lastIndex = raftLogLastIndex(pLogStore);
ASSERT(lastIndex >= SYNC_INDEX_BEGIN);
if (!(lastIndex >= SYNC_INDEX_BEGIN)) return TSDB_CODE_SYN_INTERNAL_ERROR;
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
TAOS_RETURN(code);

View File

@ -104,7 +104,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncNodeStepDown(ths, pMsg->term);
}
SyncTerm currentTerm = raftStoreGetTerm(ths);
ASSERT(pMsg->term <= currentTerm);
if (!(pMsg->term <= currentTerm)) return TSDB_CODE_SYN_INTERNAL_ERROR;
bool grant = (pMsg->term == currentTerm) && logOK &&
((!raftStoreHasVoted(ths)) || (syncUtilSameId(&ths->raftStore.voteFor, &pMsg->srcId)));
@ -130,7 +130,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pReply->destId = pMsg->srcId;
pReply->term = currentTerm;
pReply->voteGranted = grant;
ASSERT(!grant || pMsg->term == pReply->term);
if (!(!grant || pMsg->term == pReply->term)) return TSDB_CODE_SYN_INTERNAL_ERROR;
// trace log
syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "");

View File

@ -64,7 +64,7 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
syncLogRecvRequestVoteReply(ths, pMsg, "");
ASSERT(pMsg->term == currentTerm);
if (pMsg->term != currentTerm) return TSDB_CODE_SYN_INTERNAL_ERROR;
// This tallies votes even when the current state is not Candidate,
// but they won't be looked at, so it doesn't matter.

View File

@ -57,7 +57,7 @@ static int32_t syncSnapBufferCreate(SSyncSnapBuffer **ppBuf) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
pBuf->size = sizeof(pBuf->entries) / sizeof(void *);
ASSERT(pBuf->size == TSDB_SYNC_SNAP_BUFFER_SIZE);
if (pBuf->size != TSDB_SYNC_SNAP_BUFFER_SIZE) return TSDB_CODE_SYN_INTERNAL_ERROR;
(void)taosThreadMutexInit(&pBuf->mutex, NULL);
*ppBuf = pBuf;
TAOS_RETURN(0);
@ -311,7 +311,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
}
}
ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END);
if (!(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END)) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _OUT;
}
// send msg
int32_t blockLen = (pBlk) ? pBlk->blockLen : 0;
@ -323,7 +326,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// put in buffer
int64_t nowMs = taosGetTimestampMs();
if (pBlk) {
ASSERT(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END);
if (!(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END)) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _OUT;
}
pBlk->sendTimeMs = nowMs;
pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk;
pBlk = NULL;
@ -351,7 +357,10 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
ASSERT(pBlk);
if (!pBlk) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
int64_t nowMs = taosGetTimestampMs();
if (pBlk->acked || nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
continue;
@ -682,7 +691,7 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver,
SyncSnapshotSend *pMsg, SSnapshot *pInfo) {
ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT);
if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT) return TSDB_CODE_SYN_INTERNAL_ERROR;
int32_t code = 0, lino = 0;
// copy snap info from leader
@ -878,7 +887,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
goto _out;
}
ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end);
if (!(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end)) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (pMsg->seq > pRcvBuf->cursor) {
if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) {
@ -922,7 +931,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend
// condition 4
// transfering
SyncSnapshotSend *pMsg = ppMsg[0];
ASSERT(pMsg);
if (!pMsg) return TSDB_CODE_SYN_INTERNAL_ERROR;
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
int64_t timeNow = taosGetTimestampMs();
int32_t code = 0;
@ -1071,7 +1080,7 @@ _out:;
}
static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY);
if (pMsg->payloadType != TDMT_SYNC_PREP_SNAPSHOT_REPLY) return TSDB_CODE_SYN_INTERNAL_ERROR;
SSyncTLV *datHead = (void *)pMsg->data;
if (datHead->typ != pMsg->payloadType) {
@ -1168,11 +1177,17 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp
goto _out;
}
ASSERT(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end);
if (!(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end)) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
if (pMsg->ack > pSndBuf->cursor && pMsg->ack < pSndBuf->end) {
SyncSnapBlock *pBlk = pSndBuf->entries[pMsg->ack % pSndBuf->size];
ASSERT(pBlk);
if (!pBlk) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
pBlk->acked = 1;
}