From 8cf3339b25298cda5997adbfc91ad7cec93e656b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 26 Oct 2022 11:31:31 +0800 Subject: [PATCH] enh(query): enable the limit clause to be push down to the table scan operator. --- source/libs/executor/inc/executorimpl.h | 32 ++++++++--------- source/libs/executor/src/projectoperator.c | 2 -- source/libs/executor/src/scanoperator.c | 41 +++++++++++++++++----- source/libs/planner/src/planOptimizer.c | 2 +- 4 files changed, 48 insertions(+), 29 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 7755bd88db..dc6765cced 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -316,27 +316,25 @@ typedef struct { } SAggOptrPushDownInfo; typedef struct STableScanInfo { - STsdbReader* dataReader; - SReadHandle readHandle; - + STsdbReader* dataReader; + SReadHandle readHandle; + SLimitInfo limitInfo; SFileBlockLoadRecorder readRecorder; SScanInfo scanInfo; int32_t scanTimes; SNode* pFilterNode; // filter info, which is push down by optimizer - - SSDataBlock* pResBlock; - SColMatchInfo matchInfo; - SExprSupp pseudoSup; - SQueryTableDataCond cond; - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan - int32_t dataBlockLoadFlag; - SSampleExecInfo sample; // sample execution info - int32_t currentGroupId; - int32_t currentTable; - int8_t scanMode; - int8_t noTable; - SAggOptrPushDownInfo pdInfo; - int8_t assignBlockUid; + SSDataBlock* pResBlock; + SColMatchInfo matchInfo; + SExprSupp pseudoSup; + SQueryTableDataCond cond; + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; + SSampleExecInfo sample; // sample execution info + int32_t currentGroupId; + int32_t currentTable; + int8_t scanMode; + SAggOptrPushDownInfo pdInfo; + int8_t assignBlockUid; } STableScanInfo; typedef struct STableMergeScanInfo { diff --git 
a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index b3ea7a5573..4e4c33d4c3 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -210,8 +210,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pOperator->status = OP_OPENED; } - qDebug("enter project"); - if (pOperator->status == OP_EXEC_DONE) { if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { pOperator->status = OP_OPENED; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7bbfccd461..aaf3b96d0f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -364,6 +364,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca pCost->totalBlocks += 1; pCost->totalRows += pBlock->info.rows; + bool loadSMA = false; *status = pInfo->dataBlockLoadFlag; @@ -379,6 +380,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pCost->filterOutBlocks += 1; + pCost->totalRows += pBlock->info.rows; return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), @@ -447,6 +449,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); if (pTableScanInfo->pFilterNode != NULL) { + + // restore the previous value + pCost->totalRows -= pBlock->info.rows; + int64_t st = taosGetTimestampUs(); doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, pOperator->exprSupp.pFilterInfo); @@ -460,6 +466,22 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca } else { qDebug("%s data 
block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el); } + + pCost->totalRows += pBlock->info.rows; + } + + pInfo->limitInfo.numOfOutputRows = pCost->totalRows; + SLimit* pLimit = &pInfo->limitInfo.limit; + + if (pLimit->limit != -1 && pLimit->limit <= pInfo->limitInfo.numOfOutputRows && pBlock->info.rows > 0) { + // limit the output rows + int32_t overflowRows = pInfo->limitInfo.numOfOutputRows - pLimit->limit; + int32_t keep = pBlock->info.rows - overflowRows; + + blockDataKeepFirstNRows(pBlock, keep); + qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); + setTaskStatus(pTaskInfo, TASK_COMPLETED); + pOperator->status = OP_EXEC_DONE; } return TSDB_CODE_SUCCESS; @@ -691,10 +713,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // if scan table by table if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { - if (pInfo->noTable) { - return NULL; - } - int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); while (1) { @@ -727,7 +745,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); - tsdbReaderClose(pInfo->dataReader); int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader, @@ -749,9 +766,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return NULL; } - SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); - // tsdbSetTableList(pInfo->dataReader, tableList); - tsdbReaderReset(pInfo->dataReader, &pInfo->cond); pInfo->scanTimes = 0; @@ -798,9 +812,15 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, } SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; - int32_t numOfCols = 0; + + int32_t numOfCols = 0; int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, 
COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { @@ -825,6 +845,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, if (pInfo->pFilterNode != NULL) { code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } } pInfo->scanFlag = MAIN_SCAN; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 9c9a7cfebb..58be9b0914 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2498,7 +2498,7 @@ static const SOptimizeRule optimizeRuleSet[] = { {.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize}, {.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize}, {.pName = "TagScan", .optimizeFunc = tagScanOptimize}, - // {.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize} + {.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize} }; // clang-format on