From bd819f5c8839c078c91fe58200776966d11ad7c4 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 6 Jul 2022 14:42:56 +0800 Subject: [PATCH] feat: add filter to sort operator --- source/libs/executor/inc/executorimpl.h | 2 ++ source/libs/executor/src/sortoperator.c | 23 ++++++++++++++++++----- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 88f2fedd28..d3d7db3341 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -682,6 +682,8 @@ typedef struct SSortOperatorInfo { int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. + + SNode* pCondition; } SSortOperatorInfo; typedef struct STagFilterOperatorInfo { diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 44ff4c1c90..2dc8ced737 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -46,7 +46,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* initResultSizeInfo(pOperator, 1024); pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); - ; + pInfo->pCondition = pSortPhyNode->node.pConditions; pInfo->pColMatchInfo = pColMatchColInfo; pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; @@ -205,14 +205,27 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, code); } - SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, - pInfo->pColMatchInfo, pInfo); + SSDataBlock* pBlock = NULL; + while (1) { + pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, + pInfo->pColMatchInfo, pInfo); + if (pBlock != NULL) { + doFilter(pInfo->pCondition, pBlock); + } + + if (pBlock == NULL) { + doSetOperatorCompleted(pOperator); + break; + } + if (blockDataGetNumOfRows(pBlock) > 0) { + break; + } + } if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; - } else { - doSetOperatorCompleted(pOperator); } + return pBlock; }