enh(stream): add more check to stop stream asap.

This commit is contained in:
Haojun Liao 2023-04-20 17:29:08 +08:00
parent b60b1796f7
commit 8323ad8670
2 changed files with 2 additions and 2 deletions

View File

@ -96,7 +96,7 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS
continue;
}
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
if ((pTask->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.taskStatus == TASK_STATUS__DROPPING)) {
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}

View File

@ -368,7 +368,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed", pTask->id.idStr);
if (!taosQueueEmpty(pTask->inputQueue->queue)) {
if (!taosQueueEmpty(pTask->inputQueue->queue) && (pTask->status.taskStatus != TASK_STATUS__DROPPING)) {
streamSchedExec(pTask);
}
}