Merge pull request #20090 from taosdata/FIX/TD-22470-main

fix: check if log buffer is empty in syncNodeOnLocalCmd
This commit is contained in:
Xiaoyu Wang 2023-02-23 10:38:21 +08:00 committed by GitHub
commit f6ba2d728c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 0 deletions

View File

@ -99,6 +99,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
// access // access
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf); SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf);
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf);
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm); int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm);

View File

@ -2468,6 +2468,10 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncNodeStepDown(ths, pMsg->currentTerm); syncNodeStepDown(ths, pMsg->currentTerm);
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
if (syncLogBufferIsEmpty(ths->pLogBuf)) {
sError("vgId:%d, sync log buffer is empty.", ths->vgId);
return 0;
}
SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf); SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
if (pMsg->currentTerm == matchTerm) { if (pMsg->currentTerm == matchTerm) {
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);

View File

@ -253,6 +253,7 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf);
for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) { for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
if (pEntry == NULL) continue; if (pEntry == NULL) continue;
@ -265,6 +266,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
if (ret < 0) { if (ret < 0) {
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr()); sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr());
} }
syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex); taosThreadMutexUnlock(&pBuf->mutex);
return ret; return ret;
} }
@ -283,6 +285,13 @@ SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
return term; return term;
} }
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
taosThreadMutexLock(&pBuf->mutex);
bool empty = (pBuf->endIndex <= pBuf->startIndex);
taosThreadMutexUnlock(&pBuf->mutex);
return empty;
}
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) { int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf); syncLogBufferValidate(pBuf);
@ -1073,6 +1082,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf);
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
ASSERT(lastVer == pBuf->matchIndex); ASSERT(lastVer == pBuf->matchIndex);
SyncIndex index = pBuf->endIndex - 1; SyncIndex index = pBuf->endIndex - 1;
@ -1089,6 +1099,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
syncLogReplMgrReset(pMgr); syncLogReplMgrReset(pMgr);
} }
syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex); taosThreadMutexUnlock(&pBuf->mutex);
return 0; return 0;
} }