Merge pull request #20998 from taosdata/fix/liaohj_main
enh(stream): add more check to stop stream asap.
This commit is contained in:
commit
cb686f8709
|
@ -96,15 +96,14 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
|
int32_t status = pTask->status.taskStatus;
|
||||||
|
if ((pTask->taskLevel != TASK_LEVEL__SOURCE) || (status == TASK_STATUS__DROPPING)) {
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE ||
|
if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) {
|
||||||
pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
|
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status);
|
||||||
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
|
|
||||||
pTask->status.taskStatus);
|
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#include "streamInc.h"
|
#include "streamInc.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
|
||||||
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 100000
|
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 3000
|
||||||
|
|
||||||
int32_t streamInit() {
|
int32_t streamInit() {
|
||||||
int8_t old;
|
int8_t old;
|
||||||
|
|
|
@ -368,7 +368,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
qDebug("s-task:%s exec completed", pTask->id.idStr);
|
qDebug("s-task:%s exec completed", pTask->id.idStr);
|
||||||
|
|
||||||
if (!taosQueueEmpty(pTask->inputQueue->queue)) {
|
if (!taosQueueEmpty(pTask->inputQueue->queue) && (pTask->status.taskStatus != TASK_STATUS__DROPPING)) {
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue