diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a428a9ae6a..e54237fe8b 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -33,11 +33,12 @@ extern "C" { #define SYNC_MAX_PROGRESS_WAIT_MS 4000 #define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20) #define SYNC_MAX_RECV_TIME_RANGE_MS 1200 -#define SYNC_DEL_WAL_MS (1000 * 60) #define SYNC_ADD_QUORUM_COUNT 3 #define SYNC_VNODE_LOG_RETENTION (TSDB_SYNC_LOG_BUFFER_RETENTION + 1) #define SNAPSHOT_WAIT_MS 1000 * 5 +#define SYNC_WAL_LOG_RETENTION_SIZE (8LL * 1024 * 1024 * 1024) + #define SYNC_MAX_RETRY_BACKOFF 5 #define SYNC_LOG_REPL_RETRY_WAIT_MS 100 #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 @@ -219,6 +220,7 @@ typedef struct SSyncLogStore { SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore); + SyncIndex (*syncLogIndexRetention)(struct SSyncLogStore* pLogStore, int64_t bytes); SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore); int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forcSync); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index a56a5567eb..7c00ff5178 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -225,6 +225,7 @@ bool walIsEmpty(SWal *); int64_t walGetFirstVer(SWal *); int64_t walGetSnapshotVer(SWal *); int64_t walGetLastVer(SWal *); +int64_t walGetVerRetention(SWal *pWal, int64_t bytes); int64_t walGetCommittedVer(SWal *); int64_t walGetAppliedVer(SWal *); diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index de8bd81b30..137baab558 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -46,6 +46,7 @@ SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore); SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore); int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore); SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore); +SyncIndex raftLogIndexRetention(struct SSyncLogStore* pLogStore, int64_t bytes); SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore); int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index f8ee99e8a0..66d8edfdfc 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -63,7 +63,6 @@ typedef struct SSyncSnapshotSender { int64_t sendingMS; SyncTerm term; int64_t startTime; - int64_t waitTime; int64_t lastSendTime; bool finish; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 199c7a1445..b8740a2858 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -305,6 +305,10 @@ SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) { return minMatchIndex; } +static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) { + return pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes); +} + int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { @@ -331,7 +335,6 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { } else { // vnode if (pSyncNode->replicaNum > 1) { - // multi replicas logRetention = SYNC_VNODE_LOG_RETENTION; } } @@ -344,7 +347,9 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { syncNodeRelease(pSyncNode); return 0; } - logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex + logRetention); + SyncIndex retentionIndex = + TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE)); + logRetention += TMAX(0, lastApplyIndex - retentionIndex); } _DEL_WAL: diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index b167f2ecb6..b9c6838fda 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -70,6 +70,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { pLogStore->syncLogIsEmpty = raftLogIsEmpty; pLogStore->syncLogEntryCount = raftLogEntryCount; pLogStore->syncLogLastIndex = raftLogLastIndex; + pLogStore->syncLogIndexRetention = raftLogIndexRetention; pLogStore->syncLogLastTerm = raftLogLastTerm; pLogStore->syncLogAppendEntry = raftLogAppendEntry; pLogStore->syncLogGetEntry = raftLogGetEntry; @@ -154,6 +155,15 @@ SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) { return lastVer; } +SyncIndex raftLogIndexRetention(struct SSyncLogStore* pLogStore, int64_t bytes) { + SyncIndex lastIndex; + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + SyncIndex lastVer = walGetVerRetention(pWal, bytes); + + return lastVer; +} + SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index d7b19a75cd..cbdc60b2b3 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -23,6 +23,8 @@ #include "syncReplication.h" #include "syncUtil.h" +static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths); + static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) { for (int64_t i = pBuf->start; i < pBuf->end; ++i) { if (pBuf->entryDeleteCb) { @@ -79,7 +81,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->replicaIndex = replicaIndex; pSender->term = raftStoreGetTerm(pSyncNode); pSender->startTime = -1; - pSender->waitTime = -1; pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->finish = false; @@ -199,7 +200,6 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { taosThreadMutexLock(&pSender->pSndBuf->mutex); { pSender->finish = finish; - pSender->waitTime = -1; // close reader if (pSender->pReader != NULL) { @@ -371,14 +371,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { return 0; } - int64_t timeNow = taosGetTimestampMs(); - if (pSender->waitTime <= 0) { - pSender->waitTime = timeNow + SNAPSHOT_WAIT_MS; - } - if (timeNow < pSender->waitTime) { - sSDebug(pSender, "snapshot sender waitTime not expired yet, ignore"); - return 0; - } + taosMsleep(1); int32_t code = snapshotSenderStart(pSender); if (code != 0) { @@ -514,7 +507,9 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p pReceiver->term = pPreMsg->term; pReceiver->fromId = pPreMsg->srcId; pReceiver->startTime = pPreMsg->startTime; - ASSERT(pReceiver->startTime); + + pReceiver->snapshotParam.start = syncNodeGetSnapBeginIndex(pReceiver->pSyncNode); + pReceiver->snapshotParam.end = -1; sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId)); } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 933014466a..b897eb4922 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -654,6 +654,23 @@ _err: return -1; } +int64_t walGetVerRetention(SWal* pWal, int64_t bytes) { + int64_t ver = -1; + int64_t totSize = 0; + taosThreadMutexLock(&pWal->mutex); + int32_t fileIdx = taosArrayGetSize(pWal->fileInfoSet); + while (--fileIdx) { + SWalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); + if (totSize >= bytes) { + ver = pInfo->lastVer; + break; + } + totSize += pInfo->fileSize; + } + taosThreadMutexUnlock(&pWal->mutex); + return ver + 1; +} + int walCheckAndRepairIdx(SWal* pWal) { int32_t sz = taosArrayGetSize(pWal->fileInfoSet); int32_t fileIdx = sz;