fix stream case error

This commit is contained in:
yihaoDeng 2023-10-26 14:19:34 +08:00
parent 87db93e72e
commit 4c03d372ef
1 changed files with 26 additions and 8 deletions

View File

@ -750,10 +750,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SStreamTask* pStateTask = pTask;
// if (pTask->info.fillHistory) {
// pTask->id.streamId = pTask->streamTaskId.streamId;
// pTask->id.taskId = pTask->streamTaskId.taskId;
// }
STaskId taskId = {.streamId = 0, .taskId = 0};
if (pTask->info.fillHistory) {
taskId.streamId = pTask->id.streamId;
taskId.taskId = pTask->id.taskId;
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
@ -762,6 +767,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
} else {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
if (pTask->info.fillHistory) {
pTask->id.streamId = taskId.streamId;
pTask->id.taskId = taskId.taskId;
}
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
@ -783,10 +792,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
SStreamTask* pSateTask = pTask;
// SStreamTask task = {0};
// if (pTask->info.fillHistory) {
// pTask->id.streamId = pTask->streamTaskId.streamId;
// pTask->id.taskId = pTask->streamTaskId.taskId;
// }
STaskId taskId = {.streamId = 0, .taskId = 0};
if (pTask->info.fillHistory) {
taskId.streamId = pTask->id.streamId;
taskId.taskId = pTask->id.taskId;
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
@ -796,6 +809,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
if (pTask->info.fillHistory) {
pTask->id.streamId = taskId.streamId;
pTask->id.taskId = taskId.taskId;
}
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,