From ec84d7fd81deb3a9c9bedf34a2a1d8747f12bca5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 16 Aug 2023 19:00:35 +0800 Subject: [PATCH 1/3] fix(stream): add the back pressure for sink node. --- source/libs/stream/src/stream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index af67490888..03a0f3586d 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -362,7 +362,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { msgLen, ver, total, size + msgLen/1048576.0); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { + if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); From 9a76ae72d625c3e97ab9968f47321b7512e9cac9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 16 Aug 2023 19:11:43 +0800 Subject: [PATCH 2/3] fix(stream): add the back pressure for sink node. --- source/dnode/vnode/src/tq/tqRestore.c | 7 +++++++ source/libs/stream/src/streamExec.c | 6 ++++++ 2 files changed, 13 insertions(+) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index d363031db1..ed612587f5 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -287,6 +287,13 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } + // downstream task has blocked the output, stopped for a while + if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr); + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + *pScanIdle = false; // seek the stored version and extract data from WAL diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 02bbce6485..d761c5ad24 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -191,6 +191,12 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) { return 0; } + if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + qDebug("s-task:%s inputQ is blocked, wait for 5sec and retry", pTask->id.idStr); + taosMsleep(10000); + continue; + } + SSDataBlock* output = NULL; uint64_t ts = 0; code = qExecTask(exec, &output, &ts); From 59d5858b95895b62e60c132c9dc9c8791ef9093b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 16 Aug 2023 19:22:45 +0800 Subject: [PATCH 3/3] fix(stream): fix a typo --- source/libs/stream/src/streamExec.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d761c5ad24..3ab6643802 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -192,7 +192,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) { } if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { - qDebug("s-task:%s inputQ is blocked, wait for 5sec and retry", pTask->id.idStr); + qDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr); taosMsleep(10000); continue; }