From 0aa141ec77c88bd20ed8f91a009f3e256767f1b0 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 15 Jun 2023 18:24:40 +0800 Subject: [PATCH] use stream task state --- source/dnode/vnode/src/tq/tq.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 417a582d78..b938b2b3b0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -778,7 +778,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->dataRange.range.minVer = ver; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); + SStreamTask* pSateTask = pTask; + // if (pTask->info.fillHistory) { + // pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t)); + // } + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1); if (pTask->pState == NULL) { return -1; } @@ -793,7 +797,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); + SStreamTask* pSateTask = pTask; + // if (pTask->info.fillHistory) { + // pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t)); + // } + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1); if (pTask->pState == NULL) { return -1; }