use stream task state

This commit is contained in:
liuyao 2023-06-15 18:24:40 +08:00
parent a3e1882fb2
commit 0aa141ec77
1 changed files with 10 additions and 2 deletions

View File

@ -778,7 +778,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->dataRange.range.minVer = ver;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
SStreamTask* pSateTask = pTask;
// if (pTask->info.fillHistory) {
// pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t));
// }
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
if (pTask->pState == NULL) {
return -1;
}
@ -793,7 +797,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
SStreamTask* pSateTask = pTask;
// if (pTask->info.fillHistory) {
// pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t));
// }
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
if (pTask->pState == NULL) {
return -1;
}