diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index e000ba8bf8..4a35a15d3e 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -579,6 +579,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); if (code != 0) { + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); return -1; } @@ -710,6 +711,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); if (code != 0) { + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); return -1; } @@ -859,6 +861,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); if (code != 0) { + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); return -1; } @@ -974,6 +977,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); if (code != 0) { + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); return -1; } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index b604d25816..e8ad22a10d 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -76,7 +76,10 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); } else { pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); - ASSERT(pEntry != NULL); + if (pEntry == NULL) { + sError("failed to get entry since %s. index:%lld", tstrerror(terrno), index); + return; + } } // cannot commit, even if quorum agree. need check term! if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d6da42fd42..ad0390ac7c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2650,7 +2650,10 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { if (ths->state == TAOS_SYNC_STATE_LEADER) { int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); - ASSERT(code == 0); + if (code != 0) { + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); + return -1; + } syncNodeReplicate(ths, false); } @@ -2790,8 +2793,8 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); if (code != 0) { + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); // del resp mgr, call FpCommitCb - ASSERT(0); return -1; } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 1cfe37dc7e..c3dad104d1 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -412,13 +412,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { return 0; } -SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { +SSyncRaftEntry* logStoreGetEntryWithoutLock(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { - taosThreadMutexLock(&(pData->mutex)); - // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); SWalReader* pWalHandle = pData->pWalHandle; ASSERT(pWalHandle != NULL); @@ -442,7 +440,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { } } while (0); - ASSERT(0); + sError("failed to read ver since %s. index:%lld", tstrerror(terrno), index); + return NULL; } SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); @@ -463,7 +462,6 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { terrno = saveErr; */ - taosThreadMutexUnlock(&(pData->mutex)); return pEntry; } else { @@ -471,6 +469,16 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { } } +SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { + SSyncLogStoreData* pData = pLogStore->data; + SSyncRaftEntry *pEntry = NULL; + + taosThreadMutexLock(&pData->mutex); + pEntry = logStoreGetEntryWithoutLock(pLogStore, index); + taosThreadMutexUnlock(&pData->mutex); + return pEntry; +} + int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal;