fix(stream): fix the underlying scan operations for stream, when trying to drop stream tasks.
This commit is contained in:
parent
08603e5d11
commit
5ba9a088d2
|
@ -887,6 +887,14 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// let's kill the query procedure within stream, to end it ASAP.
|
||||||
|
if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
|
||||||
|
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code));
|
||||||
|
}
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -500,7 +500,9 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
||||||
STaskStateTrans* pTrans = pSM->pActiveTrans;
|
STaskStateTrans* pTrans = pSM->pActiveTrans;
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
ETaskStatus s = pSM->current.state;
|
ETaskStatus s = pSM->current.state;
|
||||||
|
// when trying to finish current event successfully, another event with high priorities, such as dropping/stop, has
|
||||||
|
// interrupted this procedure, and changed the status after freeing the activeTrans, resulting in the failure of
|
||||||
|
// processing of current event.
|
||||||
if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT &&
|
if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT &&
|
||||||
s != TASK_STATUS__READY) {
|
s != TASK_STATUS__READY) {
|
||||||
stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name,
|
stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name,
|
||||||
|
|
Loading…
Reference in New Issue