fix(stream): fix init error.

This commit is contained in:
Haojun Liao 2024-04-13 18:27:38 +08:00
parent d799212fb2
commit 15c18af221
2 changed files with 7 additions and 8 deletions

View File

@ -74,13 +74,15 @@ int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode)
initStorageAPI(&handle.api); initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG) {
if (pTask->exec.pExecutor == NULL) { pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr); if (pTask->exec.pExecutor == NULL) {
return -1; tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr);
return -1;
}
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
} }
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -591,19 +591,16 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
} }
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
tFreeStreamTask(pTask);
return -1; return -1;
} }
taosArrayPush(pMeta->pTaskList, &pTask->id); taosArrayPush(pMeta->pTaskList, &pTask->id);
if (streamMetaSaveTask(pMeta, pTask) < 0) { if (streamMetaSaveTask(pMeta, pTask) < 0) {
tFreeStreamTask(pTask);
return -1; return -1;
} }
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
tFreeStreamTask(pTask);
return -1; return -1;
} }