diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 759b56c92a..b3dc444174 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -297,10 +297,30 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { ASSERT(rid == pSyncNode->rid); int32_t code = 0; + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { + int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i])); + if (lastApplyIndex > matchIndex) { + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "new-snapshot-index:%ld is greater than match-index:%ld of %s:%d, do not delete wal", lastApplyIndex, + matchIndex, host, port); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); + + return 0; + } + } + if (pSyncNode->replicaNum == 1) { SSyncLogStoreData* pData = pSyncNode->pLogStore->data; code = walBeginSnapshot(pData->pWal, lastApplyIndex); } else { + // calculate snapshot index + SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex); if (snapshottingIndex == SYNC_INDEX_INVALID) { @@ -310,7 +330,10 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { code = walBeginSnapshot(pData->pWal, lastApplyIndex); } else { - sError("vgId:%d snapshotting index:%ld, lastApplyIndex:%ld", snapshottingIndex, lastApplyIndex); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "snapshotting for %ld, do not delete wal for new-snapshot-index:%ld", + snapshottingIndex, lastApplyIndex); + syncNodeEventLog(pSyncNode, logBuf); } } @@ -326,8 +349,13 @@ int32_t syncEndSnapshot(int64_t rid) { } ASSERT(rid == pSyncNode->rid); - SSyncLogStoreData* pData = pSyncNode->pLogStore->data; - int32_t code = walEndSnapshot(pData->pWal); + int32_t code = 0; + if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) { + SSyncLogStoreData* pData = pSyncNode->pLogStore->data; + code = walEndSnapshot(pData->pWal); + + atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID); + } taosReleaseRef(tsNodeRefId, pSyncNode->rid); return code;