From a3a3dc824355c4ff54a2f39d658181ffd4ebb3bb Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 30 Mar 2023 16:17:34 +0800 Subject: [PATCH] fix: add filter to exchange node for union-all subquery --- source/libs/executor/src/exchangeoperator.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 3bdecbe748..8caebd2a34 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -212,6 +212,11 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) { return NULL; } + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); + if (blockDataGetNumOfRows(pBlock) == 0) { + continue; + } + SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo; if (hasLimitOffsetInfo(pLimitInfo)) { int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false); @@ -303,6 +308,11 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock); + code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL); return pOperator;