From 2359e0e61bb1d50f99d4cf4c09c3f4f4324e3865 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 16 May 2024 18:07:57 +0800 Subject: [PATCH] opt checkpoint --- source/libs/stream/src/streamBackendRocksdb.c | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index bcc5da81ea..5f3eb9c630 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1010,6 +1010,50 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t* } #endif +int chkpIdComp(const void* a, const void* b) { + int64_t x = *(int64_t*)a; + int64_t y = *(int64_t*)b; + if (x == y) return 0; + + return x < y ? -1 : 1; +} +int32_t chkpLoadInfo(STaskDbWrapper* pBackend) { + int32_t code = 0; + char* pChkpDir = taosMemoryCalloc(1, 256); + + sprintf(pChkpDir, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints"); + if (!taosIsDir(pChkpDir)) { + taosMemoryFree(pChkpDir); + return 0; + } + TdDirPtr pDir = taosOpenDir(pChkpDir); + if (pDir == NULL) { + taosMemoryFree(pChkpDir); + return 0; + } + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue; + + if (taosDirEntryIsDir(de)) { + char checkpointPrefix[32] = {0}; + int64_t checkpointId = 0; + + int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId); + if (ret == 1) { + taosArrayPush(pBackend->chkpSaved, &checkpointId); + } + } else { + continue; + } + } + taosArraySort(pBackend->chkpSaved, chkpIdComp); + + taosMemoryFree(pChkpDir); + taosCloseDir(&pDir); + + return 0; +} int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) { SArray* pHandle = taosArrayInit(8, POINTER_BYTES); for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { @@ -1902,6 +1946,8 @@ void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) { pTaskDb->chkpId = -1; pTaskDb->chkpCap = 4; pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t)); + chkpLoadInfo(pTaskDb); + pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t)); taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL);