diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 12b89dbd9c..e95114fcfb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -990,7 +990,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms if (code < 0) { tqError("vgId:%d failed to add s-task:0x%x, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); - tFreeStreamTask(pTask); return -1; } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index eed65d3f3a..6384d85eb6 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1565,7 +1565,7 @@ int32_t taskBackendBuildFullPath(char* path, char* key, char** fullPath) { char* taskPath = taosMemoryCalloc(1, strlen(path) + 128); sprintf(taskPath, "%s%s%s%s%s", path, TD_DIRSEP, "state", TD_DIRSEP, key); if (!taosDirExist(taskPath)) { - code = taosMkDir(taskPath); + code = taosMulMkDir(taskPath); if (code != 0) { qError("failed to create dir: %s, reason:%s", taskPath, tstrerror(code)); taosMemoryFree(taskPath); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2da4b2f370..ad3ff82ec3 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -229,9 +229,12 @@ void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key) { return *ppBackend; } void* pBackend = taskBackendOpen(pMeta->path, key); + if (pBackend == NULL) { + taosThreadMutexUnlock(&pMeta->backendMutex); + return NULL; + } taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*)); - taosThreadMutexLock(&pMeta->backendMutex); return pBackend; } SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {