open operator after resume stream

This commit is contained in:
54liuyao 2024-03-12 16:35:55 +08:00
parent 2f6304d71e
commit 3835e0c156
2 changed files with 1 additions and 17 deletions

View File

@ -929,6 +929,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
}
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) {
pTask->hTaskInfo.operatorOpen = false;
streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
tqScanWalAsync((STQ*)handle, false);

View File

@ -1273,14 +1273,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
while (1) {
if (isTaskKilled(pTaskInfo)) {
if (pInfo->pUpdated != NULL) {
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
}
if (pInfo->pUpdatedMap != NULL) {
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
}
qInfo("===stream=== %s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
return NULL;
}
@ -4323,15 +4315,6 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
while (1) {
if (isTaskKilled(pTaskInfo)) {
if (pInfo->pUpdated != NULL) {
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
}
if (pInfo->pUpdatedMap != NULL) {
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
}
qInfo("===stream=== %s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
return NULL;
}