fix: eliminate asserts on errors of appending or getting log entry
This commit is contained in:
parent
9055f63742
commit
715f53a856
|
@ -579,6 +579,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
|
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
|
||||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -710,6 +711,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
|
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
|
||||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -859,6 +861,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
|
|
||||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -974,6 +977,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
|
|
||||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -76,7 +76,10 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
|
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
|
||||||
} else {
|
} else {
|
||||||
pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
|
pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
|
||||||
ASSERT(pEntry != NULL);
|
if (pEntry == NULL) {
|
||||||
|
sError("failed to get entry since %s. index:%lld", tstrerror(terrno), index);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// cannot commit, even if quorum agree. need check term!
|
// cannot commit, even if quorum agree. need check term!
|
||||||
if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) {
|
if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) {
|
||||||
|
|
|
@ -2650,7 +2650,10 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
||||||
|
|
||||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
||||||
ASSERT(code == 0);
|
if (code != 0) {
|
||||||
|
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
syncNodeReplicate(ths, false);
|
syncNodeReplicate(ths, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2790,8 +2793,8 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
|
||||||
|
|
||||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
|
||||||
// del resp mgr, call FpCommitCb
|
// del resp mgr, call FpCommitCb
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -412,13 +412,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
SSyncRaftEntry* logStoreGetEntryWithoutLock(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
SWal* pWal = pData->pWal;
|
SWal* pWal = pData->pWal;
|
||||||
|
|
||||||
if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
|
if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
|
||||||
taosThreadMutexLock(&(pData->mutex));
|
|
||||||
|
|
||||||
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
|
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
|
||||||
SWalReader* pWalHandle = pData->pWalHandle;
|
SWalReader* pWalHandle = pData->pWalHandle;
|
||||||
ASSERT(pWalHandle != NULL);
|
ASSERT(pWalHandle != NULL);
|
||||||
|
@ -442,7 +440,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
}
|
}
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
ASSERT(0);
|
sError("failed to read ver since %s. index:%lld", tstrerror(terrno), index);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
|
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
|
||||||
|
@ -463,7 +462,6 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
terrno = saveErr;
|
terrno = saveErr;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
taosThreadMutexUnlock(&(pData->mutex));
|
|
||||||
return pEntry;
|
return pEntry;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
@ -471,6 +469,16 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
|
SSyncRaftEntry *pEntry = NULL;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pData->mutex);
|
||||||
|
pEntry = logStoreGetEntryWithoutLock(pLogStore, index);
|
||||||
|
taosThreadMutexUnlock(&pData->mutex);
|
||||||
|
return pEntry;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
|
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
SWal* pWal = pData->pWal;
|
SWal* pWal = pData->pWal;
|
||||||
|
|
Loading…
Reference in New Issue