From 0b92a6b1eaf0013469a0d99b1cd465ec70c49394 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 9 Oct 2023 11:16:16 +0800 Subject: [PATCH] fix mem leak --- source/libs/stream/src/streamBackendRocksdb.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index ba05d9968b..f936b569ae 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -717,7 +717,9 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { */ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { STaskBackendWrapper* pBackend = arg; + taosThreadRwlockWrlock(&pBackend->chkpDirLock); + taosArrayPush(pBackend->chkpSaved, &chkpId); SArray* chkpDel = taosArrayInit(8, sizeof(int64_t)); @@ -1028,6 +1030,7 @@ int32_t taskBackendDoCheckpoint(void* arg, uint64_t chkpId) { char* pChkpDir = NULL; char* pChkpIdDir = NULL; if (chkpPreBuildDir(pBackend->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) { + code = -1; goto _EXIT; } // Get all cf and acquire cfWrappter @@ -1036,27 +1039,24 @@ int32_t taskBackendDoCheckpoint(void* arg, uint64_t chkpId) { int32_t nCf = chkpGetAllDbCfHandle2(pBackend, &ppCf); qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pBackend, pChkpIdDir, nCf); - if ((code = chkpPreFlushDb(pBackend->db, ppCf, nCf)) == 0 && nCf != 0) { + if ((code = chkpPreFlushDb(pBackend->db, ppCf, nCf)) == 0) { if ((code = chkpDoDbCheckpoint(pBackend->db, pChkpIdDir)) != 0) { qError("stream backend:%p failed to do checkpoint at:%s", pBackend, pChkpIdDir); } else { qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pBackend, pChkpIdDir, taosGetTimestampMs() - st); } - } else if (nCf != 0) { + } else { qError("stream backend:%p failed to flush db at:%s", pBackend, pChkpIdDir); } code = chkpMayDelObsolete(pBackend, chkpId, pChkpDir); - taosReleaseRef(taskBackendWrapperId, refId); - return code; - _EXIT: taosMemoryFree(pChkpDir); taosMemoryFree(pChkpIdDir); taosReleaseRef(taskBackendWrapperId, refId); - return -1; + return code; } int32_t streamBackendDoCheckpoint(void* arg, uint64_t chkpId) { return taskBackendDoCheckpoint(arg, chkpId); }