From 0c7a4bfad950ff93ccb9773de220a08ba515b9d1 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 20 Feb 2023 18:48:07 +0800 Subject: [PATCH 1/2] enh: refactor syncBeginSnapshot and walBeginSnapshot for logRetention --- include/libs/wal/wal.h | 3 +- source/dnode/mnode/sdb/src/sdbFile.c | 2 +- source/libs/sync/src/syncMain.c | 96 +++++++--------------------- source/libs/wal/src/walWrite.c | 28 ++++---- source/libs/wal/test/walMetaTest.cpp | 6 +- 5 files changed, 46 insertions(+), 89 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 1eed342f8c..169013d6c1 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -66,6 +66,7 @@ typedef struct { int64_t commitVer; int64_t appliedVer; int64_t lastVer; + int64_t logRetention; } SWalVer; #pragma pack(push, 1) @@ -180,7 +181,7 @@ void walFsync(SWal *, bool force); int32_t walCommit(SWal *, int64_t ver); int32_t walRollback(SWal *, int64_t ver); // notify that previous logs can be pruned safely -int32_t walBeginSnapshot(SWal *, int64_t ver); +int32_t walBeginSnapshot(SWal *, int64_t ver, int64_t logRetention); int32_t walEndSnapshot(SWal *); int32_t walRestoreFromSnapshot(SWal *, int64_t ver); // for tq diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index c2d7a9757a..2d4b7a1e56 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -472,7 +472,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) { taosThreadMutexLock(&pSdb->filelock); if (pSdb->pWal != NULL) { - // code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex); + // code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex, 0); if (pSdb->sync == 0) { code = 0; } else { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3f0432d998..1deb9fa066 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -270,88 +270,40 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { 
return -1; } + SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); + SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore); + bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore); + + if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) { + sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty); + syncNodeRelease(pSyncNode); + return 0; + } + int32_t code = 0; + int64_t logRetention = 0; if (syncNodeIsMnode(pSyncNode)) { // mnode - int64_t logRetention = SYNC_MNODE_LOG_RETENTION; - - SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); - SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore); - int64_t logNum = endIndex - beginIndex; - bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore); - - if (isEmpty || (!isEmpty && logNum < logRetention)) { - sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal", - lastApplyIndex, logNum, isEmpty); - syncNodeRelease(pSyncNode); - return 0; - } - - goto _DEL_WAL; - + logRetention = SYNC_MNODE_LOG_RETENTION; } else { - SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); - SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore); - bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore); - - if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) { - sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty); - syncNodeRelease(pSyncNode); - return 0; - } - // vnode if (pSyncNode->replicaNum > 1) { // multi replicas - - lastApplyIndex = TMAX(lastApplyIndex - SYNC_VNODE_LOG_RETENTION, beginIndex - 1); - - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - pSyncNode->minMatchIndex = 
syncMinMatchIndex(pSyncNode); - - for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { - int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i])); - if (lastApplyIndex > matchIndex) { - sNTrace(pSyncNode, - "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64 - " of dnode:%d, do not delete wal", - lastApplyIndex, matchIndex, DID(&pSyncNode->peersId[i])); - - syncNodeRelease(pSyncNode); - return 0; - } - } - - } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { - if (lastApplyIndex > pSyncNode->minMatchIndex) { - sNTrace(pSyncNode, - "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal", - lastApplyIndex, pSyncNode->minMatchIndex); - syncNodeRelease(pSyncNode); - return 0; - } - - } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) { - sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex); - syncNodeRelease(pSyncNode); - return 0; - - } else { - sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex); - syncNodeRelease(pSyncNode); - return 0; - } - - goto _DEL_WAL; - - } else { - // one replica - - goto _DEL_WAL; + logRetention = SYNC_VNODE_LOG_RETENTION; } } + if (pSyncNode->replicaNum > 1) { + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { + sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal", + lastApplyIndex); + syncNodeRelease(pSyncNode); + return 0; + } + logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex); + } + _DEL_WAL: do { @@ -366,7 +318,7 @@ _DEL_WAL: atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex); pSyncNode->snapshottingTime = taosGetTimestampMs(); - code = walBeginSnapshot(pData->pWal, lastApplyIndex); + code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention); if (code == 0) { sNTrace(pSyncNode, "wal snapshot 
begin, index:%" PRId64 ", last apply index:%" PRId64, pSyncNode->snapshottingIndex, lastApplyIndex); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 96c77d0971..b38961709e 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -247,21 +247,23 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) { return 0; } -int32_t walBeginSnapshot(SWal *pWal, int64_t ver) { +int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) { taosThreadMutexLock(&pWal->mutex); - + ASSERT(logRetention >= 0); pWal->vers.verInSnapshotting = ver; - wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, - pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer); + pWal->vers.logRetention = logRetention; + + wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64 + ", last ver %" PRId64, + pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer); // check file rolling - if (pWal->cfg.retentionPeriod == 0) { - if (walGetLastFileSize(pWal) != 0) { - if (walRollImpl(pWal) < 0) { - wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr()); - goto _err; - } + if (walGetLastFileSize(pWal) != 0) { + if (walRollImpl(pWal) < 0) { + wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr()); + goto _err; } } + taosThreadMutexUnlock(&pWal->mutex); return 0; @@ -275,8 +277,9 @@ int32_t walEndSnapshot(SWal *pWal) { taosThreadMutexLock(&pWal->mutex); int64_t ver = pWal->vers.verInSnapshotting; - wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, pWal->cfg.vgId, - ver, pWal->vers.firstVer, pWal->vers.lastVer); + wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64 + ", last ver %" PRId64, + pWal->cfg.vgId, ver, pWal->vers.logRetention, 
pWal->vers.firstVer, pWal->vers.lastVer); if (ver == -1) { code = -1; @@ -286,6 +289,7 @@ int32_t walEndSnapshot(SWal *pWal) { pWal->vers.snapshotVer = ver; int ts = taosGetTimestampSec(); + ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1); void *pIter = NULL; while (1) { pIter = taosHashIterate(pWal->pRefHash, pIter); diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index 891e7dcdae..0784db917a 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -264,7 +264,7 @@ TEST_F(WalCleanEnv, rollbackMultiFile) { ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, i); if (i == 5) { - walBeginSnapshot(pWal, i); + walBeginSnapshot(pWal, i, 0); walEndSnapshot(pWal); } } @@ -301,7 +301,7 @@ TEST_F(WalCleanDeleteEnv, roll) { ASSERT_EQ(pWal->vers.commitVer, i); } - walBeginSnapshot(pWal, i - 1); + walBeginSnapshot(pWal, i - 1, 0); ASSERT_EQ(pWal->vers.verInSnapshotting, i - 1); walEndSnapshot(pWal); ASSERT_EQ(pWal->vers.snapshotVer, i - 1); @@ -317,7 +317,7 @@ TEST_F(WalCleanDeleteEnv, roll) { ASSERT_EQ(pWal->vers.commitVer, i); } - code = walBeginSnapshot(pWal, i - 1); + code = walBeginSnapshot(pWal, i - 1, 0); ASSERT_EQ(code, 0); code = walEndSnapshot(pWal); ASSERT_EQ(code, 0); From 3b47dd753e4fb3c620582e278e13de9c3568c317 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 21 Feb 2023 20:01:54 +0800 Subject: [PATCH 2/2] enh: update pSyncNode->minMatchIndex in syncNodeEqPeerHeartbeatTimer --- source/libs/sync/src/syncMain.c | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1deb9fa066..84341803d1 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2094,24 +2094,19 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { if (timerLogicClock == msgLogicClock) { if (tsNow > pData->execTime) { -#if 0 - sTrace( - "vgId:%d, 
hbDataRid:%ld, EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, " - "---------", - pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime); -#endif - pData->execTime += pSyncTimer->timerMS; SRpcMsg rpcMsg = {0}; (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId); + pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode); + SyncHeartbeat* pSyncMsg = rpcMsg.pCont; pSyncMsg->srcId = pSyncNode->myRaftId; pSyncMsg->destId = pData->destId; pSyncMsg->term = raftStoreGetTerm(pSyncNode); pSyncMsg->commitIndex = pSyncNode->commitIndex; - pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); + pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex; pSyncMsg->privateTerm = 0; pSyncMsg->timeStamp = tsNow; @@ -2123,11 +2118,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime); syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); } else { -#if 0 - sTrace( - "vgId:%d, hbDataRid:%ld, pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------", - pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime); -#endif } if (syncIsInit()) {