enh: clean WAL logs based on size if one follower is offline.

This commit is contained in:
Benguang Zhao 2023-12-13 09:57:12 +08:00
parent 3be9af9765
commit c74c4fe208
6 changed files with 39 additions and 3 deletions

View File

@ -33,11 +33,12 @@ extern "C" {
#define SYNC_MAX_PROGRESS_WAIT_MS 4000
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
#define SYNC_MAX_RECV_TIME_RANGE_MS 1200
#define SYNC_DEL_WAL_MS (1000 * 60)
#define SYNC_ADD_QUORUM_COUNT 3
#define SYNC_VNODE_LOG_RETENTION (TSDB_SYNC_LOG_BUFFER_RETENTION + 1)
#define SNAPSHOT_WAIT_MS 1000 * 5
#define SYNC_WAL_LOG_RETENTION_SIZE (8LL * 1024 * 1024 * 1024)
#define SYNC_MAX_RETRY_BACKOFF 5
#define SYNC_LOG_REPL_RETRY_WAIT_MS 100
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
@ -219,6 +220,7 @@ typedef struct SSyncLogStore {
SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore);
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
SyncIndex (*syncLogIndexRetention)(struct SSyncLogStore* pLogStore, int64_t bytes);
SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore);
int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forcSync);

View File

@ -225,6 +225,7 @@ bool walIsEmpty(SWal *);
int64_t walGetFirstVer(SWal *);
int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *);
int64_t walGetVerRetention(SWal *pWal, int64_t bytes);
int64_t walGetCommittedVer(SWal *);
int64_t walGetAppliedVer(SWal *);

View File

@ -46,6 +46,7 @@ SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore);
SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore);
int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore);
SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore);
SyncIndex raftLogIndexRetention(struct SSyncLogStore* pLogStore, int64_t bytes);
SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore);
int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);

View File

@ -305,6 +305,10 @@ SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
return minMatchIndex;
}
static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) {
return pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes);
}
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
@ -331,7 +335,6 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
} else {
// vnode
if (pSyncNode->replicaNum > 1) {
// multi replicas
logRetention = SYNC_VNODE_LOG_RETENTION;
}
}
@ -344,7 +347,9 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
syncNodeRelease(pSyncNode);
return 0;
}
logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex + logRetention);
SyncIndex retentionIndex =
TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE));
logRetention += TMAX(0, lastApplyIndex - retentionIndex);
}
_DEL_WAL:

View File

@ -70,6 +70,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pLogStore->syncLogIsEmpty = raftLogIsEmpty;
pLogStore->syncLogEntryCount = raftLogEntryCount;
pLogStore->syncLogLastIndex = raftLogLastIndex;
pLogStore->syncLogIndexRetention = raftLogIndexRetention;
pLogStore->syncLogLastTerm = raftLogLastTerm;
pLogStore->syncLogAppendEntry = raftLogAppendEntry;
pLogStore->syncLogGetEntry = raftLogGetEntry;
@ -154,6 +155,15 @@ SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
return lastVer;
}
SyncIndex raftLogIndexRetention(struct SSyncLogStore* pLogStore, int64_t bytes) {
SyncIndex lastIndex;
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex lastVer = walGetVerRetention(pWal, bytes);
return lastVer;
}
SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;

View File

@ -654,6 +654,23 @@ _err:
return -1;
}
int64_t walGetVerRetention(SWal* pWal, int64_t bytes) {
int64_t ver = -1;
int64_t totSize = 0;
taosThreadMutexLock(&pWal->mutex);
int32_t fileIdx = taosArrayGetSize(pWal->fileInfoSet);
while (--fileIdx) {
SWalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
if (totSize >= bytes) {
ver = pInfo->lastVer;
break;
}
totSize += pInfo->fileSize;
}
taosThreadMutexUnlock(&pWal->mutex);
return ver + 1;
}
int walCheckAndRepairIdx(SWal* pWal) {
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
int32_t fileIdx = sz;