From 1decaaee1e8bd3ff0ff584f68844b3c1b1ef3d81 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 16:49:04 +0800 Subject: [PATCH] fix(stream): set correct size for results generated by scan history stream tasks. --- source/libs/stream/src/streamData.c | 4 +++ source/libs/stream/src/streamExec.c | 38 +++++++++++++---------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 00bf631d74..a108667f5d 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -65,6 +65,10 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK; pStreamBlocks->blocks = pRes; + if (pItem == NULL) { + return pStreamBlocks; + } + if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem; pStreamBlocks->sourceVer = pSubmit->ver; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 74b24fb4c3..d89817d236 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -16,9 +16,10 @@ #include "streamInt.h" // maximum allowed processed block batches. One block may include several submit blocks -#define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MIN_STREAM_EXEC_BATCH_NUM 4 -#define STREAM_RESULT_DUMP_THRESHOLD 100 +#define MAX_STREAM_EXEC_BATCH_NUM 32 +#define MIN_STREAM_EXEC_BATCH_NUM 4 +#define STREAM_RESULT_DUMP_THRESHOLD 100 +#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); @@ -75,7 +76,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks); if (code != TSDB_CODE_SUCCESS) { // back pressure and record position - //code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY destroyStreamDataBlock(pStreamBlocks); return code; } @@ -166,7 +166,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i pTask->info.selfChildId, numOfBlocks, SIZE_IN_MB(size)); // current output should be dispatched to down stream nodes - if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD) { + if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { ASSERT(numOfBlocks == taosArrayGetSize(pRes)); code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); if (code != TSDB_CODE_SUCCESS) { @@ -192,6 +192,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i int32_t streamScanHistoryData(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); + int32_t size = 0; int32_t code = TSDB_CODE_SUCCESS; void* exec = pTask->exec.pExecutor; bool finished = false; @@ -244,29 +245,24 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { block.info.childId = pTask->info.selfChildId; taosArrayPush(pRes, &block); - if ((++numOfBlocks) >= outputBatchSize) { - qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, outputBatchSize); + size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); + + if ((++numOfBlocks) >= outputBatchSize || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { + qDebug("s-task:%s scan exec numOfBlocks:%d, output num-limit:%d, size-limit:%d reached", pTask->id.idStr, numOfBlocks, + outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD); break; } } if (taosArrayGetSize(pRes) > 0) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - if (qRes == NULL) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - - code = doOutputResultBlockImpl(pTask, qRes); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(qRes); + SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes); + code = doOutputResultBlockImpl(pTask, pStreamBlocks); + if (code != TSDB_CODE_SUCCESS) { + destroyStreamDataBlock(pStreamBlocks); return code; } + + size = 0; } else { taosArrayDestroy(pRes); }