Merge pull request #19004 from taosdata/FIX/TD-21169-main
fix: allow to rollback sync log buffer beyond startIndex with refill
This commit is contained in:
commit
b74e0b0715
|
@ -275,6 +275,8 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
SyncIndex index = pEntry->index;
|
||||
SyncIndex prevIndex = pEntry->index - 1;
|
||||
SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf);
|
||||
SSyncRaftEntry* pExist = NULL;
|
||||
bool inBuf = true;
|
||||
|
||||
if (index <= pBuf->commitIndex) {
|
||||
sTrace("vgId:%d, already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64
|
||||
|
@ -306,10 +308,9 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
}
|
||||
|
||||
// check current in buffer
|
||||
SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem;
|
||||
pExist = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
|
||||
if (pExist != NULL) {
|
||||
ASSERT(pEntry->index == pExist->index);
|
||||
|
||||
if (pEntry->term != pExist->term) {
|
||||
(void)syncLogBufferRollback(pBuf, pNode, index);
|
||||
} else {
|
||||
|
@ -317,14 +318,15 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
" %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
||||
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
|
||||
pBuf->endIndex);
|
||||
SyncTerm existPrevTerm = pBuf->entries[index % pBuf->size].prevLogTerm;
|
||||
ASSERT(pEntry->term == pExist->term && prevTerm == existPrevTerm);
|
||||
SyncTerm existPrevTerm = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index);
|
||||
ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm));
|
||||
ret = 0;
|
||||
goto _out;
|
||||
}
|
||||
}
|
||||
|
||||
// update
|
||||
ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL);
|
||||
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
|
||||
pEntry = NULL;
|
||||
pBuf->entries[index % pBuf->size] = tmp;
|
||||
|
@ -337,6 +339,10 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
|
||||
_out:
|
||||
syncEntryDestroy(pEntry);
|
||||
if (!inBuf) {
|
||||
syncEntryDestroy(pExist);
|
||||
pExist = NULL;
|
||||
}
|
||||
syncLogBufferValidate(pBuf);
|
||||
taosThreadMutexUnlock(&pBuf->mutex);
|
||||
return ret;
|
||||
|
@ -1008,6 +1014,16 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
|
|||
lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
|
||||
ASSERT(toIndex == lastVer + 1);
|
||||
|
||||
// refill buffer on need
|
||||
if (toIndex <= pBuf->startIndex) {
|
||||
int32_t ret = syncLogBufferInitWithoutLock(pBuf, pNode);
|
||||
if (ret < 0) {
|
||||
sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(pBuf->endIndex == toIndex);
|
||||
syncLogBufferValidate(pBuf);
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue