From e96ed81ec0df8c6eacdaa0c38c44e8099c9c3346 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 May 2023 17:29:01 +0800 Subject: [PATCH 1/4] fix(stream): add input queue capacity check. --- source/libs/stream/src/stream.c | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d3e4b23ad1..0d772247b4 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -132,8 +132,6 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); // input queue is full, upstream is blocked now status = (code == TSDB_CODE_SUCCESS)? TASK_INPUT_STATUS__NORMAL:TASK_INPUT_STATUS__BLOCKED; - - } // rsp by input status @@ -304,10 +302,15 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - taosWriteQitem(pTask->inputQueue->queue, pItem); + int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); + if (code != TSDB_CODE_SUCCESS) { + streamDataSubmitDestroy(px); + taosFreeQitem(pItem); + return code; + } } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { + if (/*(pTask->taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, size); @@ -317,10 +320,16 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { } qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); - taosWriteQitem(pTask->inputQueue->queue, pItem); + int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); + if (code != TSDB_CODE_SUCCESS) { + destroyStreamDataBlock((SStreamDataBlock*) pItem); + taosFreeQitem(pItem); + return code; + } } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__GET_RES) { + // use the default memory limit, refactor later. taosWriteQitem(pTask->inputQueue->queue, pItem); qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } From 937f67f07b0a89dc1df5a5e01c4dbfa2e31b76e5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 May 2023 18:02:42 +0800 Subject: [PATCH 2/4] fix(stream): fix invalid free. --- source/libs/stream/src/stream.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 0d772247b4..03013dbf3e 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -315,7 +315,6 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, size); destroyStreamDataBlock((SStreamDataBlock*) pItem); - taosFreeQitem(pItem); return -1; } From d9cc9f21574f4799e827c5792e85e4efa4ce3db1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 May 2023 18:03:21 +0800 Subject: [PATCH 3/4] fix(stream): remove invalid free. --- source/libs/stream/src/stream.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 03013dbf3e..6b69f19427 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -322,7 +322,6 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); if (code != TSDB_CODE_SUCCESS) { destroyStreamDataBlock((SStreamDataBlock*) pItem); - taosFreeQitem(pItem); return code; } } else if (type == STREAM_INPUT__CHECKPOINT) { From fc20ca8423dbe4b637d3465adea0f8fce692072e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 May 2023 18:42:23 +0800 Subject: [PATCH 4/4] fix(stream): set the correct rps flag as the dispatch rsp. --- source/libs/stream/src/stream.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 6b69f19427..8bca0e6ec2 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -233,8 +233,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { - ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED); - qDebug("s-task:%s receive dispatch rsp, code: %x", pTask->id.idStr, code); + qDebug("s-task:%s receive dispatch rsp, status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code); if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); @@ -246,13 +245,16 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); ASSERT(old == TASK_OUTPUT_STATUS__WAIT); + + // the input queue of the (down stream) task that receive the output data is full, so the TASK_INPUT_STATUS_BLOCKED is rsp + // todo we need to send EMPTY PACKAGE to detect if the input queue is available for output of upstream task, every 50 ms. if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { // TODO: init recover timer - ASSERT(0); + qError("s-task:%s inputQ of downstream task:0x%x is full, need to block output", pTask->id.idStr, pRsp->downstreamTaskId); return 0; } - // continue dispatch one block to down stream in pipeline + // otherwise, continue dispatch the first block to down stream task in pipeline streamDispatchStreamBlock(pTask); return 0; }