From aa7ea60bca6217d589ca1453959e6cb6f6a63b86 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 15:43:48 +0800 Subject: [PATCH] fix(stream): update the free strategy. --- source/libs/stream/inc/streamInc.h | 2 +- source/libs/stream/src/stream.c | 8 ++++---- source/libs/stream/src/streamDispatch.c | 12 +++--------- source/libs/stream/src/streamExec.c | 7 ++----- 4 files changed, 10 insertions(+), 19 deletions(-) diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index bd13d9ec37..049aac6214 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -34,7 +34,7 @@ typedef struct { static SStreamGlobalEnv streamEnv; void destroyStreamDataBlock(SStreamDataBlock* pBlock); -int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock); +int32_t streamDispatch(SStreamTask* pTask); int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 80511cd491..7987490bf5 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -200,8 +200,10 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock int32_t code = 0; if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); + destroyStreamDataBlock(pBlock); } else if (pTask->outputType == TASK_OUTPUT__SMA) { pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); + destroyStreamDataBlock(pBlock); } else { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); code = taosWriteQitem(pTask->outputQueue->queue, pBlock); @@ -209,7 +211,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock return code; } - streamDispatch(pTask, NULL); + streamDispatch(pTask); } return 0; @@ -254,10 +256,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i return 0; } - SStreamDataBlock* pBlock = NULL; // continue dispatch one block to down stream in pipeline - streamDispatch(pTask, &pBlock); - destroyStreamDataBlock(pBlock); + streamDispatch(pTask); return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 33e7b949f9..5723802d34 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -501,12 +501,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat return code; } -int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock) { +int32_t streamDispatch(SStreamTask* pTask) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); - if (pBlock != NULL) { - *pBlock = NULL; - } - int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); if (numOfElems > 0) { qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr, @@ -535,9 +531,7 @@ int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock) { atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); } - if (pBlock != NULL) { - *pBlock = pDispatchedBlock; - } - + // this block can be freed only when it has been pushed to down stream. + destroyStreamDataBlock(pDispatchedBlock); return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 2f019529e4..077d011900 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -60,7 +60,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* *totalSize += size; *totalBlocks += numOfBlocks; - destroyStreamDataBlock(pStreamBlocks); +// destroyStreamDataBlock(pStreamBlocks); } else { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } @@ -245,10 +245,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); - - SStreamDataBlock* pBlock = NULL; - streamDispatch(pTask, &pBlock); - destroyStreamDataBlock(pBlock); + streamDispatch(pTask); } if (finished) {