fix(query): set dataload flag.

This commit is contained in:
Haojun Liao 2022-12-12 18:33:44 +08:00
parent 8f02297890
commit d7edcfd207
2 changed files with 6 additions and 5 deletions

View File

@ -1080,7 +1080,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
pBlock->info.id.groupId); pBlock->info.id.groupId);
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
return 0; return 0;
} }

View File

@ -87,11 +87,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
} }
int32_t numOfCols = 0; int32_t numOfCols = 0;
SSDataBlock* pResBlock = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pInfo->pRes = pResBlock;
setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
@ -401,6 +401,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
pRes->info.rows = nrows; pRes->info.rows = nrows;
pRes->info.dataLoad = 1;
if (pRes->info.rows >= pOperator->resultInfo.threshold) { if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break; break;
} }
@ -412,7 +413,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
SSDataBlock* pRes = pJoinInfo->pRes; SSDataBlock* pRes = pJoinInfo->pRes;
blockDataCleanup(pRes); blockDataCleanup(pRes);
blockDataEnsureCapacity(pRes, 4096);
while (true) { while (true) {
int32_t numOfRowsBefore = pRes->info.rows; int32_t numOfRowsBefore = pRes->info.rows;
doMergeJoinImpl(pOperator, pRes); doMergeJoinImpl(pOperator, pRes);