Merge pull request #25067 from taosdata/fix/TD-29073

open operator after resume stream
This commit is contained in:
Haojun Liao 2024-03-13 17:43:13 +08:00 committed by GitHub
commit 2466e169a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 1 additions and 17 deletions

View File

@ -929,6 +929,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
}
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) {
pTask->hTaskInfo.operatorOpen = false;
streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
tqScanWalAsync((STQ*)handle, false);

View File

@ -1273,14 +1273,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
while (1) {
if (isTaskKilled(pTaskInfo)) {
if (pInfo->pUpdated != NULL) {
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
}
if (pInfo->pUpdatedMap != NULL) {
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
}
qInfo("===stream=== %s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
return NULL;
}
@ -4323,15 +4315,6 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
while (1) {
if (isTaskKilled(pTaskInfo)) {
if (pInfo->pUpdated != NULL) {
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
}
if (pInfo->pUpdatedMap != NULL) {
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
}
qInfo("===stream=== %s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
return NULL;
}