fix/TD-31542-remove-assert-sync3
This commit is contained in:
parent
99cca75128
commit
8ac690638f
|
@ -45,9 +45,6 @@
|
||||||
//
|
//
|
||||||
|
|
||||||
static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
|
static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
|
||||||
ASSERT(a >= 0);
|
|
||||||
ASSERT(b >= 0);
|
|
||||||
|
|
||||||
int64_t c = a > b ? a - b : b - a;
|
int64_t c = a > b ? a - b : b - a;
|
||||||
return c;
|
return c;
|
||||||
}
|
}
|
||||||
|
|
|
@ -3523,6 +3523,9 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
|
SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
|
||||||
|
if (matchTerm < 0) {
|
||||||
|
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
if (pMsg->currentTerm == matchTerm) {
|
if (pMsg->currentTerm == matchTerm) {
|
||||||
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
|
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,8 +46,8 @@ int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
|
||||||
|
|
||||||
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
|
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
(void)taosThreadMutexLock(&pBuf->mutex);
|
(void)taosThreadMutexLock(&pBuf->mutex);
|
||||||
(void)syncLogBufferValidate(pBuf);
|
|
||||||
SyncIndex index = pEntry->index;
|
SyncIndex index = pEntry->index;
|
||||||
|
|
||||||
if (index - pBuf->startIndex >= pBuf->size) {
|
if (index - pBuf->startIndex >= pBuf->size) {
|
||||||
|
@ -102,13 +102,13 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
||||||
pBuf->entries[index % pBuf->size] = tmp;
|
pBuf->entries[index % pBuf->size] = tmp;
|
||||||
pBuf->endIndex = index + 1;
|
pBuf->endIndex = index + 1;
|
||||||
|
|
||||||
(void)syncLogBufferValidate(pBuf);
|
|
||||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
(void)syncLogBufferValidate(pBuf);
|
|
||||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
taosMsleep(1);
|
taosMsleep(1);
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
@ -134,7 +134,11 @@ int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI
|
||||||
|
|
||||||
if (prevIndex >= pBuf->startIndex) {
|
if (prevIndex >= pBuf->startIndex) {
|
||||||
pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
|
pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
|
||||||
ASSERTS(pEntry != NULL, "no log entry found");
|
if (pEntry == NULL) {
|
||||||
|
sError("vgId:%d, failed to get pre log term since no log entry found", pNode->vgId);
|
||||||
|
*pSyncTerm = -1;
|
||||||
|
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
prevLogTerm = pEntry->term;
|
prevLogTerm = pEntry->term;
|
||||||
*pSyncTerm = prevLogTerm;
|
*pSyncTerm = prevLogTerm;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -142,9 +146,18 @@ int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI
|
||||||
|
|
||||||
if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
|
if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
|
||||||
int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
|
int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
|
||||||
ASSERTS(timeMs != 0, "no log entry found");
|
if (timeMs == 0) {
|
||||||
|
sError("vgId:%d, failed to get pre log term since timeMs is 0", pNode->vgId);
|
||||||
|
*pSyncTerm = -1;
|
||||||
|
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
|
prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
|
||||||
ASSERT(prevIndex == 0 || prevLogTerm != 0);
|
if (!(prevIndex == 0 || prevLogTerm != 0)) {
|
||||||
|
sError("vgId:%d, failed to get pre log term prevIndex:%" PRId64 ", prevLogTerm:%" PRId64, pNode->vgId, prevIndex,
|
||||||
|
prevLogTerm);
|
||||||
|
*pSyncTerm = -1;
|
||||||
|
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
*pSyncTerm = prevLogTerm;
|
*pSyncTerm = prevLogTerm;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -289,7 +302,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
||||||
|
|
||||||
// validate
|
// validate
|
||||||
(void)syncLogBufferValidate(pBuf);
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -307,8 +320,8 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
(void)taosThreadMutexLock(&pBuf->mutex);
|
(void)taosThreadMutexLock(&pBuf->mutex);
|
||||||
(void)syncLogBufferValidate(pBuf);
|
|
||||||
for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
|
for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
|
||||||
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
|
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
|
||||||
if (pEntry == NULL) continue;
|
if (pEntry == NULL) continue;
|
||||||
|
@ -321,15 +334,19 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
|
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
|
||||||
}
|
}
|
||||||
(void)syncLogBufferValidate(pBuf);
|
|
||||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) {
|
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) {
|
||||||
SyncIndex index = pBuf->matchIndex;
|
SyncIndex index = pBuf->matchIndex;
|
||||||
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
|
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
|
||||||
ASSERT(pEntry != NULL);
|
if (pEntry == NULL) {
|
||||||
|
sError("failed to get last match term since entry is null");
|
||||||
|
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
return pEntry->term;
|
return pEntry->term;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -348,8 +365,8 @@ bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
|
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
|
||||||
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
(void)taosThreadMutexLock(&pBuf->mutex);
|
(void)taosThreadMutexLock(&pBuf->mutex);
|
||||||
(void)syncLogBufferValidate(pBuf);
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SyncIndex index = pEntry->index;
|
SyncIndex index = pEntry->index;
|
||||||
SyncIndex prevIndex = pEntry->index - 1;
|
SyncIndex prevIndex = pEntry->index - 1;
|
||||||
|
@ -357,6 +374,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
||||||
SSyncRaftEntry* pExist = NULL;
|
SSyncRaftEntry* pExist = NULL;
|
||||||
bool inBuf = true;
|
bool inBuf = true;
|
||||||
|
|
||||||
|
if (lastMatchTerm < 0) {
|
||||||
|
sError("vgId:%d, failed to accept, lastMatchTerm:%" PRId64, pNode->vgId, lastMatchTerm);
|
||||||
|
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
|
||||||
if (index <= pBuf->commitIndex) {
|
if (index <= pBuf->commitIndex) {
|
||||||
sTrace("vgId:%d, already committed. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64
|
sTrace("vgId:%d, already committed. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64
|
||||||
" %" PRId64 ", %" PRId64 ")",
|
" %" PRId64 ", %" PRId64 ")",
|
||||||
|
@ -364,7 +387,11 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
||||||
pBuf->endIndex);
|
pBuf->endIndex);
|
||||||
SyncTerm term = -1;
|
SyncTerm term = -1;
|
||||||
code = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1, &term);
|
code = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1, &term);
|
||||||
ASSERT(pEntry->term >= 0);
|
if (pEntry->term < 0) {
|
||||||
|
sError("vgId:%d, failed to accept, pEntry->term:%" PRId64, pNode->vgId, pEntry->term);
|
||||||
|
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
if (term == pEntry->term) {
|
if (term == pEntry->term) {
|
||||||
code = 0;
|
code = 0;
|
||||||
}
|
}
|
||||||
|
@ -401,7 +428,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
||||||
// check current in buffer
|
// check current in buffer
|
||||||
code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pExist);
|
code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pExist);
|
||||||
if (pExist != NULL) {
|
if (pExist != NULL) {
|
||||||
ASSERT(pEntry->index == pExist->index);
|
if (pEntry->index != pExist->index) {
|
||||||
|
sError("vgId:%d, failed to accept, pEntry->index:%" PRId64 ", pExist->index:%" PRId64, pNode->vgId, pEntry->index,
|
||||||
|
pExist->index);
|
||||||
|
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
if (pEntry->term != pExist->term) {
|
if (pEntry->term != pExist->term) {
|
||||||
(void)syncLogBufferRollback(pBuf, pNode, index);
|
(void)syncLogBufferRollback(pBuf, pNode, index);
|
||||||
} else {
|
} else {
|
||||||
|
@ -411,7 +443,14 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
||||||
pBuf->endIndex);
|
pBuf->endIndex);
|
||||||
SyncTerm existPrevTerm = -1;
|
SyncTerm existPrevTerm = -1;
|
||||||
(void)syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm);
|
(void)syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm);
|
||||||
ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm));
|
if (!(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm))) {
|
||||||
|
sError("vgId:%d, failed to accept, pEntry->term:%" PRId64 ", pExist->indexpExist->term:%" PRId64
|
||||||
|
", pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64 ", prevTerm:%" PRId64
|
||||||
|
", existPrevTerm:%" PRId64,
|
||||||
|
pNode->vgId, pEntry->term, pExist->term, pEntry->index, pBuf->matchIndex, prevTerm, existPrevTerm);
|
||||||
|
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
code = 0;
|
code = 0;
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
@ -446,8 +485,8 @@ _out:
|
||||||
syncEntryDestroy(pExist);
|
syncEntryDestroy(pExist);
|
||||||
pExist = NULL;
|
pExist = NULL;
|
||||||
}
|
}
|
||||||
(void)syncLogBufferValidate(pBuf);
|
|
||||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -479,8 +518,8 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaf
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char* str) {
|
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char* str) {
|
||||||
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
(void)taosThreadMutexLock(&pBuf->mutex);
|
(void)taosThreadMutexLock(&pBuf->mutex);
|
||||||
(void)syncLogBufferValidate(pBuf);
|
|
||||||
|
|
||||||
SSyncLogStore* pLogStore = pNode->pLogStore;
|
SSyncLogStore* pLogStore = pNode->pLogStore;
|
||||||
int64_t matchIndex = pBuf->matchIndex;
|
int64_t matchIndex = pBuf->matchIndex;
|
||||||
|
@ -488,7 +527,11 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
|
||||||
|
|
||||||
while (pBuf->matchIndex + 1 < pBuf->endIndex) {
|
while (pBuf->matchIndex + 1 < pBuf->endIndex) {
|
||||||
int64_t index = pBuf->matchIndex + 1;
|
int64_t index = pBuf->matchIndex + 1;
|
||||||
ASSERT(index >= 0);
|
if (index < 0) {
|
||||||
|
sError("vgId:%d, failed to proceed index:%" PRId64, pNode->vgId, index);
|
||||||
|
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
|
||||||
// try to proceed
|
// try to proceed
|
||||||
SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size];
|
SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size];
|
||||||
|
@ -501,14 +544,37 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(index == pEntry->index);
|
if (index != pEntry->index) {
|
||||||
|
sError("vgId:%d, failed to proceed index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId, index, pEntry->index);
|
||||||
|
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
|
||||||
// match
|
// match
|
||||||
SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
|
SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
|
||||||
ASSERT(pMatch != NULL);
|
if (pMatch == NULL) {
|
||||||
ASSERT(pMatch->index == pBuf->matchIndex);
|
sError("vgId:%d, failed to proceed since pMatch is null", pNode->vgId);
|
||||||
ASSERT(pMatch->index + 1 == pEntry->index);
|
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
ASSERT(prevLogIndex == pMatch->index);
|
goto _out;
|
||||||
|
}
|
||||||
|
if (pMatch->index != pBuf->matchIndex) {
|
||||||
|
sError("vgId:%d, failed to proceed, pMatch->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
|
||||||
|
pMatch->index, pBuf->matchIndex);
|
||||||
|
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
if (pMatch->index + 1 != pEntry->index) {
|
||||||
|
sError("vgId:%d, failed to proceed, pMatch->index:%" PRId64 ", pEntry->index:%" PRId64, pNode->vgId,
|
||||||
|
pMatch->index, pEntry->index);
|
||||||
|
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
if (prevLogIndex != pMatch->index) {
|
||||||
|
sError("vgId:%d, failed to proceed, prevLogIndex:%" PRId64 ", pMatch->index:%" PRId64, pNode->vgId, prevLogIndex,
|
||||||
|
pMatch->index);
|
||||||
|
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
|
||||||
if (pMatch->term != prevLogTerm) {
|
if (pMatch->term != prevLogTerm) {
|
||||||
sInfo(
|
sInfo(
|
||||||
|
@ -567,7 +633,12 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
|
||||||
// replicate on demand
|
// replicate on demand
|
||||||
(void)syncNodeReplicateWithoutLock(pNode);
|
(void)syncNodeReplicateWithoutLock(pNode);
|
||||||
|
|
||||||
ASSERT(pEntry->index == pBuf->matchIndex);
|
if (pEntry->index != pBuf->matchIndex) {
|
||||||
|
sError("vgId:%d, failed to proceed, pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
|
||||||
|
pEntry->index, pBuf->matchIndex);
|
||||||
|
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
|
||||||
// update my match index
|
// update my match index
|
||||||
matchIndex = pBuf->matchIndex;
|
matchIndex = pBuf->matchIndex;
|
||||||
|
@ -579,8 +650,8 @@ _out:
|
||||||
if (pMatchTerm) {
|
if (pMatchTerm) {
|
||||||
*pMatchTerm = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem->term;
|
*pMatchTerm = pBuf->entries[(matchIndex + pBuf->size) % pBuf->size].pItem->term;
|
||||||
}
|
}
|
||||||
(void)syncLogBufferValidate(pBuf);
|
|
||||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
return matchIndex;
|
return matchIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -643,17 +714,36 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
|
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
|
||||||
ASSERT(pBuf->startIndex <= pBuf->matchIndex);
|
if (pBuf->startIndex > pBuf->matchIndex) {
|
||||||
ASSERT(pBuf->commitIndex <= pBuf->matchIndex);
|
sError("failed to validate, pBuf->startIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->startIndex,
|
||||||
ASSERT(pBuf->matchIndex < pBuf->endIndex);
|
pBuf->matchIndex);
|
||||||
ASSERT(pBuf->endIndex - pBuf->startIndex <= pBuf->size);
|
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
ASSERT(pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem);
|
}
|
||||||
|
if (pBuf->commitIndex > pBuf->matchIndex) {
|
||||||
|
sError("failed to validate, pBuf->commitIndex:%" PRId64 ", pBuf->matchIndex:%" PRId64, pBuf->commitIndex,
|
||||||
|
pBuf->matchIndex);
|
||||||
|
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
|
if (pBuf->matchIndex >= pBuf->endIndex) {
|
||||||
|
sError("failed to validate, pBuf->matchIndex:%" PRId64 ", pBuf->endIndex:%" PRId64, pBuf->matchIndex,
|
||||||
|
pBuf->endIndex);
|
||||||
|
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
|
if (pBuf->endIndex - pBuf->startIndex > pBuf->size) {
|
||||||
|
sError("failed to validate, pBuf->endIndex:%" PRId64 ", pBuf->startIndex:%" PRId64 ", pBuf->size:%" PRId64,
|
||||||
|
pBuf->endIndex, pBuf->startIndex, pBuf->size);
|
||||||
|
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
|
if (pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem == NULL) {
|
||||||
|
sError("failed to validate since pItem is null");
|
||||||
|
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex) {
|
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex) {
|
||||||
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
(void)taosThreadMutexLock(&pBuf->mutex);
|
(void)taosThreadMutexLock(&pBuf->mutex);
|
||||||
(void)syncLogBufferValidate(pBuf);
|
|
||||||
|
|
||||||
SSyncLogStore* pLogStore = pNode->pLogStore;
|
SSyncLogStore* pLogStore = pNode->pLogStore;
|
||||||
SSyncFSM* pFsm = pNode->pFsm;
|
SSyncFSM* pFsm = pNode->pFsm;
|
||||||
|
@ -778,15 +868,18 @@ _out:
|
||||||
syncEntryDestroy(pNextEntry);
|
syncEntryDestroy(pNextEntry);
|
||||||
pNextEntry = NULL;
|
pNextEntry = NULL;
|
||||||
}
|
}
|
||||||
(void)syncLogBufferValidate(pBuf);
|
|
||||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogReplReset(SSyncLogReplMgr* pMgr) {
|
void syncLogReplReset(SSyncLogReplMgr* pMgr) {
|
||||||
if (pMgr == NULL) return;
|
if (pMgr == NULL) return;
|
||||||
|
|
||||||
ASSERT(pMgr->startIndex >= 0);
|
if (pMgr->startIndex < 0) {
|
||||||
|
sError("failed to reset, pMgr->startIndex:%" PRId64, pMgr->startIndex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
|
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
|
||||||
(void)memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0]));
|
(void)memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0]));
|
||||||
}
|
}
|
||||||
|
@ -1285,13 +1378,13 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBuf->endIndex != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
|
if (pBuf->endIndex != toIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
(void)syncLogBufferValidate(pBuf);
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
(void)taosThreadMutexLock(&pBuf->mutex);
|
(void)taosThreadMutexLock(&pBuf->mutex);
|
||||||
(void)syncLogBufferValidate(pBuf);
|
|
||||||
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
|
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
|
||||||
if (lastVer != pBuf->matchIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
|
if (lastVer != pBuf->matchIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
SyncIndex index = pBuf->endIndex - 1;
|
SyncIndex index = pBuf->endIndex - 1;
|
||||||
|
@ -1308,8 +1401,8 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
|
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
|
||||||
syncLogReplReset(pMgr);
|
syncLogReplReset(pMgr);
|
||||||
}
|
}
|
||||||
(void)syncLogBufferValidate(pBuf);
|
|
||||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue