From bdc4afec8323e251e3606f189c2efdd19d81f476 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 7 Oct 2023 17:17:30 +0800 Subject: [PATCH] refact task backend --- source/libs/stream/src/streamBackendRocksdb.c | 28 +++++++++---------- source/libs/stream/src/streamMeta.c | 2 ++ 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 97d6f0634a..539844a4e8 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1639,50 +1639,50 @@ void taskBackendDestroyChkpOpt(STaskBackendWrapper* pBackend) { taosThreadRwlockDestroy(&pBackend->chkpDirLock); } -int32_t taskBackendBuildFullPath(char* path, char* key, char** dbFullPath, char** taskFullPath) { +int32_t taskBackendBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) { int32_t code = 0; - char* taskPath = taosMemoryCalloc(1, strlen(path) + 128); - sprintf(taskPath, "%s%s%s", path, TD_DIRSEP, key); - if (!taosDirExist(taskPath)) { - code = taosMulMkDir(taskPath); + char* statePath = taosMemoryCalloc(1, strlen(path) + 128); + sprintf(statePath, "%s%s%s", path, TD_DIRSEP, key); + if (!taosDirExist(statePath)) { + code = taosMulMkDir(statePath); if (code != 0) { - qError("failed to create dir: %s, reason:%s", taskPath, tstrerror(code)); - taosMemoryFree(taskPath); + qError("failed to create dir: %s, reason:%s", statePath, tstrerror(code)); + taosMemoryFree(statePath); return code; } } - char* dbPath = taosMemoryCalloc(1, strlen(taskPath) + 128); - sprintf(dbPath, "%s%s%s", taskPath, TD_DIRSEP, "state"); + char* dbPath = taosMemoryCalloc(1, strlen(statePath) + 128); + sprintf(dbPath, "%s%s%s", statePath, TD_DIRSEP, "state"); if (!taosDirExist(dbPath)) { code = taosMulMkDir(dbPath); if (code != 0) { qError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code)); - taosMemoryFree(taskPath); + taosMemoryFree(statePath); taosMemoryFree(dbPath); return code; } } *dbFullPath = dbPath; - *taskFullPath = taskPath; + *stateFullPath = statePath; return 0; } STaskBackendWrapper* taskBackendOpen(char* path, char* key) { - char* taskPath = NULL; + char* statePath = NULL; char* err = NULL; char* dbPath = NULL; char** cfNames = NULL; size_t nCf = 0; - if (taskBackendBuildFullPath(path, key, &dbPath, &taskPath) != 0) { + if (taskBackendBuildFullPath(path, key, &dbPath, &statePath) != 0) { return NULL; } STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(STaskBackendWrapper)); - pTaskBackend->path = taskPath; + pTaskBackend->path = statePath; taskBackendInitChkpOpt(pTaskBackend); taskBackendInitDBOpt(pTaskBackend); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 1892f58b9e..8cce4dc2f9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -237,9 +237,11 @@ void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref) taosThreadMutexUnlock(&pMeta->backendMutex); return NULL; } + *ref = taosAddRef(taskBackendWrapperId, pBackend); taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*)); + taosThreadMutexUnlock(&pMeta->backendMutex); return pBackend; } SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {