From 8323ad86701cc80f0e00251eb9a2a7e36a264bdf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Apr 2023 17:29:08 +0800 Subject: [PATCH 1/3] enh(stream): add more check to stop stream asap. --- source/dnode/vnode/src/tq/tqRestore.c | 2 +- source/libs/stream/src/streamExec.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 657dd376a1..9a9c750194 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -96,7 +96,7 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS continue; } - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { + if ((pTask->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.taskStatus == TASK_STATUS__DROPPING)) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 9a6ff302ef..f52af66387 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -368,7 +368,7 @@ int32_t streamTryExec(SStreamTask* pTask) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed", pTask->id.idStr); - if (!taosQueueEmpty(pTask->inputQueue->queue)) { + if (!taosQueueEmpty(pTask->inputQueue->queue) && (pTask->status.taskStatus != TASK_STATUS__DROPPING)) { streamSchedExec(pTask); } } From c7e42d5422e1fbc1d732ec3edb180a4713854eb7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Apr 2023 17:30:02 +0800 Subject: [PATCH 2/3] other: do some internal refactor. --- source/dnode/vnode/src/tq/tqRestore.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 9a9c750194..56f0a80b9e 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -96,15 +96,14 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS continue; } - if ((pTask->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.taskStatus == TASK_STATUS__DROPPING)) { + int32_t status = pTask->status.taskStatus; + if ((pTask->taskLevel != TASK_LEVEL__SOURCE) || (status == TASK_STATUS__DROPPING)) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } - if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || - pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, - pTask->status.taskStatus); + if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) { + tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status); streamMetaReleaseTask(pStreamMeta, pTask); continue; } From dc733352db5164d544084f14b769980adbce8f6b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Apr 2023 17:38:10 +0800 Subject: [PATCH 3/3] enh(stream): set the max input queue size to be 3000. --- source/libs/stream/src/stream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 0f000f1f50..86ba91f76d 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,7 +16,7 @@ #include "streamInc.h" #include "ttimer.h" -#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 100000 +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 3000 int32_t streamInit() { int8_t old;