From 15c18af221948986b94f559d689abb238339c552 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 13 Apr 2024 18:27:38 +0800 Subject: [PATCH] fix(stream): fix init error. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 12 +++++++----- source/libs/stream/src/streamMeta.c | 3 --- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index f85bb8cee5..1c3a760bab 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -74,13 +74,15 @@ int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) initStorageAPI(&handle.api); - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); - if (pTask->exec.pExecutor == NULL) { - tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr); - return -1; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG) { + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); + if (pTask->exec.pExecutor == NULL) { + tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr); + return -1; + } + qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8d5e4f3c87..ea18e791a6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -591,19 +591,16 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { - tFreeStreamTask(pTask); return -1; } taosArrayPush(pMeta->pTaskList, &pTask->id); if (streamMetaSaveTask(pMeta, pTask) < 0) { - tFreeStreamTask(pTask); return -1; } if (streamMetaCommit(pMeta) < 0) { - tFreeStreamTask(pTask); return -1; }