From 9ef69dd455e2df379bd7043cd37e97f5cf6241a0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 21 Aug 2023 10:27:47 +0800 Subject: [PATCH] refactor checkpoint --- source/libs/stream/src/streamBackendRocksdb.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index cdcd482442..37b4f45df6 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -871,14 +871,14 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { int32_t code = -1; SArray* refs = taosArrayInit(16, sizeof(int64_t)); - SArray* pCf = taosArrayInit(16, POINTER_BYTES); rocksdb_column_family_handle_t** ppCf = NULL; char* pChkpDir = NULL; char* pChkpIdDir = NULL; if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) { - goto _ERROR; + taosArrayDestroy(refs); + return code; } SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); @@ -886,6 +886,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { goto _ERROR; } + // Get all cf and acquire cfWrappter int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf); @@ -901,7 +902,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } else { qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir); } - + // release all ref to cfWrapper; for (int i = 0; i < taosArrayGetSize(refs); i++) { int64_t id = *(int64_t*)taosArrayGet(refs, i); taosReleaseRef(streamBackendCfWrapperId, id);