From 4c03d372ef1886af9f0b68d0900fa2e8a1cdbdb6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 26 Oct 2023 14:19:34 +0800 Subject: [PATCH] fix stream case error --- source/dnode/vnode/src/tq/tq.c | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d028989865..b1e5058ea9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -750,10 +750,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { SStreamTask* pStateTask = pTask; - // if (pTask->info.fillHistory) { - // pTask->id.streamId = pTask->streamTaskId.streamId; - // pTask->id.taskId = pTask->streamTaskId.taskId; - // } + STaskId taskId = {.streamId = 0, .taskId = 0}; + if (pTask->info.fillHistory) { + + taskId.streamId = pTask->id.streamId; + taskId.taskId = pTask->id.taskId; + + pTask->id.streamId = pTask->streamTaskId.streamId; + pTask->id.taskId = pTask->streamTaskId.taskId; + } pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); if (pTask->pState == NULL) { @@ -762,6 +767,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { } else { tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } + if (pTask->info.fillHistory) { + pTask->id.streamId = taskId.streamId; + pTask->id.taskId = taskId.taskId; + } SReadHandle handle = { .checkpointId = pTask->chkInfo.checkpointId, @@ -783,10 +792,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { SStreamTask* pSateTask = pTask; // SStreamTask task = {0}; - // if (pTask->info.fillHistory) { - // pTask->id.streamId = pTask->streamTaskId.streamId; - // pTask->id.taskId = pTask->streamTaskId.taskId; - // } + + STaskId taskId = {.streamId = 0, .taskId = 0}; + if (pTask->info.fillHistory) { + taskId.streamId = pTask->id.streamId; + taskId.taskId = pTask->id.taskId; + pTask->id.streamId = pTask->streamTaskId.streamId; + pTask->id.taskId = pTask->streamTaskId.taskId; + } pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); if (pTask->pState == NULL) { @@ -796,6 +809,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } + if (pTask->info.fillHistory) { + pTask->id.streamId = taskId.streamId; + pTask->id.taskId = taskId.taskId; + } + int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); SReadHandle handle = { .checkpointId = pTask->chkInfo.checkpointId,