Merge pull request #20061 from taosdata/FIX/TD-22611-main
enh: refactor syncBeginSnapshot and walBeginSnapshot for logRetention
This commit is contained in:
commit
d04d610a85
|
@ -66,6 +66,7 @@ typedef struct {
|
||||||
int64_t commitVer;
|
int64_t commitVer;
|
||||||
int64_t appliedVer;
|
int64_t appliedVer;
|
||||||
int64_t lastVer;
|
int64_t lastVer;
|
||||||
|
int64_t logRetention;
|
||||||
} SWalVer;
|
} SWalVer;
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
|
@ -180,7 +181,7 @@ void walFsync(SWal *, bool force);
|
||||||
int32_t walCommit(SWal *, int64_t ver);
|
int32_t walCommit(SWal *, int64_t ver);
|
||||||
int32_t walRollback(SWal *, int64_t ver);
|
int32_t walRollback(SWal *, int64_t ver);
|
||||||
// notify that previous logs can be pruned safely
|
// notify that previous logs can be pruned safely
|
||||||
int32_t walBeginSnapshot(SWal *, int64_t ver);
|
int32_t walBeginSnapshot(SWal *, int64_t ver, int64_t logRetention);
|
||||||
int32_t walEndSnapshot(SWal *);
|
int32_t walEndSnapshot(SWal *);
|
||||||
int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
|
int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
|
||||||
// for tq
|
// for tq
|
||||||
|
|
|
@ -472,7 +472,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
|
||||||
|
|
||||||
taosThreadMutexLock(&pSdb->filelock);
|
taosThreadMutexLock(&pSdb->filelock);
|
||||||
if (pSdb->pWal != NULL) {
|
if (pSdb->pWal != NULL) {
|
||||||
// code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex);
|
// code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex, 0);
|
||||||
if (pSdb->sync == 0) {
|
if (pSdb->sync == 0) {
|
||||||
code = 0;
|
code = 0;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -270,88 +270,40 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
|
||||||
|
SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
|
||||||
|
bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
|
||||||
|
|
||||||
|
if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
|
||||||
|
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
|
||||||
|
syncNodeRelease(pSyncNode);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int64_t logRetention = 0;
|
||||||
|
|
||||||
if (syncNodeIsMnode(pSyncNode)) {
|
if (syncNodeIsMnode(pSyncNode)) {
|
||||||
// mnode
|
// mnode
|
||||||
int64_t logRetention = SYNC_MNODE_LOG_RETENTION;
|
logRetention = SYNC_MNODE_LOG_RETENTION;
|
||||||
|
|
||||||
SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
|
|
||||||
SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
|
|
||||||
int64_t logNum = endIndex - beginIndex;
|
|
||||||
bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
|
|
||||||
|
|
||||||
if (isEmpty || (!isEmpty && logNum < logRetention)) {
|
|
||||||
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
|
|
||||||
lastApplyIndex, logNum, isEmpty);
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
goto _DEL_WAL;
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
|
|
||||||
SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
|
|
||||||
bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
|
|
||||||
|
|
||||||
if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
|
|
||||||
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// vnode
|
// vnode
|
||||||
if (pSyncNode->replicaNum > 1) {
|
if (pSyncNode->replicaNum > 1) {
|
||||||
// multi replicas
|
// multi replicas
|
||||||
|
logRetention = SYNC_VNODE_LOG_RETENTION;
|
||||||
lastApplyIndex = TMAX(lastApplyIndex - SYNC_VNODE_LOG_RETENTION, beginIndex - 1);
|
|
||||||
|
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
|
||||||
pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
|
||||||
int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
|
|
||||||
if (lastApplyIndex > matchIndex) {
|
|
||||||
sNTrace(pSyncNode,
|
|
||||||
"new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
|
|
||||||
" of dnode:%d, do not delete wal",
|
|
||||||
lastApplyIndex, matchIndex, DID(&pSyncNode->peersId[i]));
|
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
|
|
||||||
if (lastApplyIndex > pSyncNode->minMatchIndex) {
|
|
||||||
sNTrace(pSyncNode,
|
|
||||||
"new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
|
|
||||||
lastApplyIndex, pSyncNode->minMatchIndex);
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
|
|
||||||
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
} else {
|
|
||||||
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
goto _DEL_WAL;
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// one replica
|
|
||||||
|
|
||||||
goto _DEL_WAL;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pSyncNode->replicaNum > 1) {
|
||||||
|
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
|
sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
|
||||||
|
lastApplyIndex);
|
||||||
|
syncNodeRelease(pSyncNode);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex);
|
||||||
|
}
|
||||||
|
|
||||||
_DEL_WAL:
|
_DEL_WAL:
|
||||||
|
|
||||||
do {
|
do {
|
||||||
|
@ -366,7 +318,7 @@ _DEL_WAL:
|
||||||
atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
|
atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
|
||||||
pSyncNode->snapshottingTime = taosGetTimestampMs();
|
pSyncNode->snapshottingTime = taosGetTimestampMs();
|
||||||
|
|
||||||
code = walBeginSnapshot(pData->pWal, lastApplyIndex);
|
code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
|
sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
|
||||||
pSyncNode->snapshottingIndex, lastApplyIndex);
|
pSyncNode->snapshottingIndex, lastApplyIndex);
|
||||||
|
@ -2142,24 +2094,19 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||||
|
|
||||||
if (timerLogicClock == msgLogicClock) {
|
if (timerLogicClock == msgLogicClock) {
|
||||||
if (tsNow > pData->execTime) {
|
if (tsNow > pData->execTime) {
|
||||||
#if 0
|
|
||||||
sTrace(
|
|
||||||
"vgId:%d, hbDataRid:%ld, EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
|
|
||||||
"---------",
|
|
||||||
pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
pData->execTime += pSyncTimer->timerMS;
|
pData->execTime += pSyncTimer->timerMS;
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
|
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
|
||||||
|
|
||||||
|
pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
||||||
|
|
||||||
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
||||||
pSyncMsg->srcId = pSyncNode->myRaftId;
|
pSyncMsg->srcId = pSyncNode->myRaftId;
|
||||||
pSyncMsg->destId = pData->destId;
|
pSyncMsg->destId = pData->destId;
|
||||||
pSyncMsg->term = raftStoreGetTerm(pSyncNode);
|
pSyncMsg->term = raftStoreGetTerm(pSyncNode);
|
||||||
pSyncMsg->commitIndex = pSyncNode->commitIndex;
|
pSyncMsg->commitIndex = pSyncNode->commitIndex;
|
||||||
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
|
||||||
pSyncMsg->privateTerm = 0;
|
pSyncMsg->privateTerm = 0;
|
||||||
pSyncMsg->timeStamp = tsNow;
|
pSyncMsg->timeStamp = tsNow;
|
||||||
|
|
||||||
|
@ -2171,11 +2118,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||||
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
|
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
|
||||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||||
} else {
|
} else {
|
||||||
#if 0
|
|
||||||
sTrace(
|
|
||||||
"vgId:%d, hbDataRid:%ld, pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
|
|
||||||
pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (syncIsInit()) {
|
if (syncIsInit()) {
|
||||||
|
|
|
@ -247,21 +247,23 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
|
int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) {
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
ASSERT(logRetention >= 0);
|
||||||
pWal->vers.verInSnapshotting = ver;
|
pWal->vers.verInSnapshotting = ver;
|
||||||
wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64,
|
pWal->vers.logRetention = logRetention;
|
||||||
pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
|
||||||
|
wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64
|
||||||
|
", last ver %" PRId64,
|
||||||
|
pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer);
|
||||||
// check file rolling
|
// check file rolling
|
||||||
if (pWal->cfg.retentionPeriod == 0) {
|
if (walGetLastFileSize(pWal) != 0) {
|
||||||
if (walGetLastFileSize(pWal) != 0) {
|
if (walRollImpl(pWal) < 0) {
|
||||||
if (walRollImpl(pWal) < 0) {
|
wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr());
|
||||||
wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr());
|
goto _err;
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
@ -275,8 +277,9 @@ int32_t walEndSnapshot(SWal *pWal) {
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
int64_t ver = pWal->vers.verInSnapshotting;
|
int64_t ver = pWal->vers.verInSnapshotting;
|
||||||
|
|
||||||
wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, pWal->cfg.vgId,
|
wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64
|
||||||
ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
", last ver %" PRId64,
|
||||||
|
pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer);
|
||||||
|
|
||||||
if (ver == -1) {
|
if (ver == -1) {
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -286,6 +289,7 @@ int32_t walEndSnapshot(SWal *pWal) {
|
||||||
pWal->vers.snapshotVer = ver;
|
pWal->vers.snapshotVer = ver;
|
||||||
int ts = taosGetTimestampSec();
|
int ts = taosGetTimestampSec();
|
||||||
|
|
||||||
|
ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1);
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pWal->pRefHash, pIter);
|
pIter = taosHashIterate(pWal->pRefHash, pIter);
|
||||||
|
|
|
@ -264,7 +264,7 @@ TEST_F(WalCleanEnv, rollbackMultiFile) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(pWal->vers.lastVer, i);
|
ASSERT_EQ(pWal->vers.lastVer, i);
|
||||||
if (i == 5) {
|
if (i == 5) {
|
||||||
walBeginSnapshot(pWal, i);
|
walBeginSnapshot(pWal, i, 0);
|
||||||
walEndSnapshot(pWal);
|
walEndSnapshot(pWal);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -301,7 +301,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
|
||||||
ASSERT_EQ(pWal->vers.commitVer, i);
|
ASSERT_EQ(pWal->vers.commitVer, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
walBeginSnapshot(pWal, i - 1);
|
walBeginSnapshot(pWal, i - 1, 0);
|
||||||
ASSERT_EQ(pWal->vers.verInSnapshotting, i - 1);
|
ASSERT_EQ(pWal->vers.verInSnapshotting, i - 1);
|
||||||
walEndSnapshot(pWal);
|
walEndSnapshot(pWal);
|
||||||
ASSERT_EQ(pWal->vers.snapshotVer, i - 1);
|
ASSERT_EQ(pWal->vers.snapshotVer, i - 1);
|
||||||
|
@ -317,7 +317,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
|
||||||
ASSERT_EQ(pWal->vers.commitVer, i);
|
ASSERT_EQ(pWal->vers.commitVer, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = walBeginSnapshot(pWal, i - 1);
|
code = walBeginSnapshot(pWal, i - 1, 0);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
code = walEndSnapshot(pWal);
|
code = walEndSnapshot(pWal);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
Loading…
Reference in New Issue