From 6cf1adf1b1d417f55ccfe2afba22d25c8dc1dc50 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 May 2023 13:01:45 +0000 Subject: [PATCH] factor code --- source/libs/stream/src/streamMeta.c | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 60d505b305..a8af310bad 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -39,18 +39,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF sprintf(streamPath, "%s/%s", path, "stream"); pMeta->path = taosStrdup(streamPath); if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { - taosMemoryFree(streamPath); goto _err; } + memset(streamPath, 0, len); sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); code = taosMulModeMkDir(streamPath, 0755); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); - taosMemoryFree(streamPath); goto _err; } - taosMemoryFree(streamPath); if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { goto _err; @@ -83,24 +81,24 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->expandFunc = expandFunc; pMeta->streamBackendId = streamBackendId; - char* statePath = taosMemoryCalloc(1, len); - sprintf(statePath, "%s/%s", pMeta->path, "state"); - code = taosMulModeMkDir(statePath, 0755); + memset(streamPath, 0, len); + sprintf(streamPath, "%s/%s", pMeta->path, "state"); + code = taosMulModeMkDir(streamPath, 0755); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); - taosMemoryFree(streamPath); goto _err; } - pMeta->streamBackend = streamBackendInit(statePath); + pMeta->streamBackend = streamBackendInit(streamPath); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); - taosMemoryFree(statePath); + taosMemoryFree(streamPath); taosInitRWLatch(&pMeta->lock); return pMeta; _err: + taosMemoryFree(streamPath); taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); @@ -266,13 +264,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) { SStreamTask* pTask = *ppTask; - - // taosWLockLatch(&pMeta->lock); - taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); - // atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); int32_t num = taosArrayGetSize(pMeta->pTaskList);