From 3ebc7eef5f32193398b5afc51137a9d2a52b488a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 7 Oct 2023 17:03:12 +0800 Subject: [PATCH] refact task backend --- source/libs/stream/src/streamBackendRocksdb.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4edb166982..07249862fb 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -715,9 +715,10 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { * chkpInUse is doing translation, cannot del until * replication is finished */ -int32_t chkpDelObsolete(void* arg, char* path) { +int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { STaskBackendWrapper* pBackend = arg; taosThreadRwlockWrlock(&pBackend->chkpDirLock); + taosArrayPush(pBackend->chkpSaved, &chkpId); SArray* chkpDel = taosArrayInit(8, sizeof(int64_t)); SArray* chkpDup = taosArrayInit(8, sizeof(int64_t)); @@ -1041,9 +1042,9 @@ int32_t taskBackendDoCheckpoint(void* arg, uint64_t chkpId) { qError("stream backend:%p failed to flush db at:%s", pBackend, pChkpIdDir); } - code = chkpDelObsolete(pBackend, pChkpDir); - taosReleaseRef(taskBackendWrapperId, refId); + code = chkpMayDelObsolete(pBackend, chkpId, pChkpDir); + taosReleaseRef(taskBackendWrapperId, refId); return code; _EXIT: