diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9024f7a341..d3dd90795c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2269,6 +2269,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { if (isTaskKilled(pTaskInfo)) { + qInfo("===stream===stream scan is killed. task id:%s, code %s", id, tstrerror(pTaskInfo->code)); return NULL; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 7a1bb2729c..a05e6fbfb8 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1281,8 +1281,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { tSimpleHashCleanup(pInfo->pUpdatedMap); pInfo->pUpdatedMap = NULL; } - qInfo("%s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + qInfo("===stream=== %s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); + return NULL; } SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -4332,7 +4332,8 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { pInfo->pUpdatedMap = NULL; } - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + qInfo("===stream=== %s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); + return NULL; } SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);