diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index 4208d40a69..8c7edf85ff 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -107,7 +107,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode); // private SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf); int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf); -int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex); +int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index f0be976402..fcb0d63209 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -310,7 +310,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt ASSERT(pEntry->index == pExist->index); if (pEntry->term != pExist->term) { - (void)syncLogBufferRollback(pBuf, index); + (void)syncLogBufferRollback(pBuf, pNode, index); } else { sTrace("vgId:%d, duplicate log entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", @@ -971,14 +971,18 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) { return; } -int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex) { +int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) { ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex); + sInfo("vgId:%d, rollback sync log buffer. toindex: %" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64 + ", %" PRId64 ")", + pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + SyncIndex index = pBuf->endIndex - 1; while (index >= toIndex) { SSyncRaftEntry* pEntry = pBuf->entries[index % pBuf->size].pItem; if (pEntry != NULL) { - syncEntryDestroy(pEntry); + (void)syncEntryDestroy(pEntry); pEntry = NULL; memset(&pBuf->entries[index % pBuf->size], 0, sizeof(pBuf->entries[0])); } @@ -996,7 +1000,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ASSERT(lastVer == pBuf->matchIndex); SyncIndex index = pBuf->endIndex - 1; - (void)syncLogBufferRollback(pBuf, pBuf->matchIndex + 1); + (void)syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1); sInfo("vgId:%d, reset sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);