enh: validate alignment of WAL and tsdb commit during syncLogBufferInit
This commit is contained in:
parent
95f8e96eb5
commit
006e13e663
|
@ -119,6 +119,26 @@ SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId
|
||||||
return syncEntryBuildNoop(term, index, vgId);
|
return syncEntryBuildNoop(term, index, vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex) {
|
||||||
|
SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
|
||||||
|
if (firstVer > commitIndex + 1) {
|
||||||
|
sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1. firstVer: %" PRId64
|
||||||
|
", tsdb commit version: %" PRId64 "",
|
||||||
|
pNode->vgId, firstVer, commitIndex);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
|
||||||
|
if (lastVer < commitIndex) {
|
||||||
|
sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer: %" PRId64
|
||||||
|
", tsdb commit version: %" PRId64 "",
|
||||||
|
pNode->vgId, lastVer, commitIndex);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
taosThreadMutexLock(&pBuf->mutex);
|
taosThreadMutexLock(&pBuf->mutex);
|
||||||
ASSERT(pNode->pLogStore != NULL && "log store not created");
|
ASSERT(pNode->pLogStore != NULL && "log store not created");
|
||||||
|
@ -132,16 +152,12 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
}
|
}
|
||||||
SyncIndex commitIndex = snapshot.lastApplyIndex;
|
SyncIndex commitIndex = snapshot.lastApplyIndex;
|
||||||
SyncTerm commitTerm = snapshot.lastApplyTerm;
|
SyncTerm commitTerm = snapshot.lastApplyTerm;
|
||||||
|
if (syncLogValidateAlignmentOfCommit(pNode, commitIndex)) {
|
||||||
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
|
|
||||||
if (lastVer < commitIndex) {
|
|
||||||
sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer: %" PRId64
|
|
||||||
", tsdb commit version: %" PRId64 "",
|
|
||||||
pNode->vgId, lastVer, commitIndex);
|
|
||||||
terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
|
terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
|
||||||
ASSERT(lastVer >= commitIndex);
|
ASSERT(lastVer >= commitIndex);
|
||||||
SyncIndex toIndex = lastVer;
|
SyncIndex toIndex = lastVer;
|
||||||
// update match index
|
// update match index
|
||||||
|
|
|
@ -238,9 +238,10 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
|
||||||
taosPrintLog(flags, level, dflag,
|
taosPrintLog(flags, level, dflag,
|
||||||
"vgId:%d, sync %s "
|
"vgId:%d, sync %s "
|
||||||
"%s"
|
"%s"
|
||||||
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
|
", term:%" PRIu64 ", commitIdx:%" PRId64 ", firstVer:%" PRId64 ", lastVer:%" PRId64 ", min:%" PRId64
|
||||||
", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64
|
", snap.lastApply:%" PRId64 ", snap.term:%" PRIu64
|
||||||
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
|
", standby:%d, aqItems:%d, batchSz:%d, replicaNum:%d, lastCfgIdx:%" PRId64
|
||||||
|
", changing:%d, restore:%d, quorum:%d, electTimer:%" PRId64 ", hb:%" PRId64 ", %s, %s",
|
||||||
pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex,
|
pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex,
|
||||||
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
|
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
|
||||||
pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum,
|
pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum,
|
||||||
|
|
Loading…
Reference in New Issue