From d7edcfd207b50c9ce5b6481384e8c306ea37387d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Dec 2022 18:33:44 +0800 Subject: [PATCH] fix(query): set dataload flag. --- source/libs/executor/src/executorimpl.c | 2 +- source/libs/executor/src/joinoperator.c | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 043cc396b5..41e3d890cd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1080,7 +1080,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.id.groupId); - + pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, 0); return 0; } diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index d460af971c..8a097a23ce 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -87,11 +87,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t } int32_t numOfCols = 0; - SSDataBlock* pResBlock = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); initResultSizeInfo(&pOperator->resultInfo, 4096); - - pInfo->pRes = pResBlock; + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.pExprInfo = pExprInfo; @@ -401,6 +401,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) // the pDataBlock are always the same one, no need to call this again pRes->info.rows = nrows; + pRes->info.dataLoad = 1; if (pRes->info.rows >= pOperator->resultInfo.threshold) { break; } @@ -412,7 +413,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { SSDataBlock* pRes = pJoinInfo->pRes; blockDataCleanup(pRes); - blockDataEnsureCapacity(pRes, 4096); + while (true) { int32_t numOfRowsBefore = pRes->info.rows; doMergeJoinImpl(pOperator, pRes);