enh: logging sync log buffer rollback events

This commit is contained in:
Benguang Zhao 2022-12-03 08:25:57 +08:00
parent 03d62e478f
commit e97c8f637b
2 changed files with 9 additions and 5 deletions

View File

@ -107,7 +107,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode);
// private // private
SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf); SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf);
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf); int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex); int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -310,7 +310,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
ASSERT(pEntry->index == pExist->index); ASSERT(pEntry->index == pExist->index);
if (pEntry->term != pExist->term) { if (pEntry->term != pExist->term) {
(void)syncLogBufferRollback(pBuf, index); (void)syncLogBufferRollback(pBuf, pNode, index);
} else { } else {
sTrace("vgId:%d, duplicate log entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 sTrace("vgId:%d, duplicate log entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
" %" PRId64 " %" PRId64 ", %" PRId64 ")", " %" PRId64 " %" PRId64 ", %" PRId64 ")",
@ -971,14 +971,18 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
return; return;
} }
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex) { int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex); ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex);
sInfo("vgId:%d, rollback sync log buffer. toindex: %" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64
", %" PRId64 ")",
pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
SyncIndex index = pBuf->endIndex - 1; SyncIndex index = pBuf->endIndex - 1;
while (index >= toIndex) { while (index >= toIndex) {
SSyncRaftEntry* pEntry = pBuf->entries[index % pBuf->size].pItem; SSyncRaftEntry* pEntry = pBuf->entries[index % pBuf->size].pItem;
if (pEntry != NULL) { if (pEntry != NULL) {
syncEntryDestroy(pEntry); (void)syncEntryDestroy(pEntry);
pEntry = NULL; pEntry = NULL;
memset(&pBuf->entries[index % pBuf->size], 0, sizeof(pBuf->entries[0])); memset(&pBuf->entries[index % pBuf->size], 0, sizeof(pBuf->entries[0]));
} }
@ -996,7 +1000,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
ASSERT(lastVer == pBuf->matchIndex); ASSERT(lastVer == pBuf->matchIndex);
SyncIndex index = pBuf->endIndex - 1; SyncIndex index = pBuf->endIndex - 1;
(void)syncLogBufferRollback(pBuf, pBuf->matchIndex + 1); (void)syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1);
sInfo("vgId:%d, reset sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, sInfo("vgId:%d, reset sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);