From 5ba9a088d2e149ca4e180953cfa7aba032ce6b8d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 6 Nov 2024 16:15:40 +0800 Subject: [PATCH] fix(stream): fix the underlying scan operations for stream, when trying to drop stream tasks. --- source/libs/stream/src/streamMeta.c | 8 ++++++++ source/libs/stream/src/streamTaskSm.c | 4 +++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f5de719848..9213c729d7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -887,6 +887,14 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { tstrerror(code)); } } + + // let's kill the query procedure within stream, to end it ASAP. + if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) { + code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code)); + } + } return code; } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index c3a2742aa2..f995c48688 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -500,7 +500,9 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even STaskStateTrans* pTrans = pSM->pActiveTrans; if (pTrans == NULL) { ETaskStatus s = pSM->current.state; - + // when trying to finish current event successfully, another event with high priorities, such as dropping/stop, has + // interrupted this procedure, and changed the status after freeing the activeTrans, resulting in the failure of + // processing of current event. if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT && s != TASK_STATUS__READY) { stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name,