diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index be52a10dcb..b4e786746b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -325,16 +325,16 @@ typedef struct STableScanBase { SExprSupp pseudoSup; STableMetaCacheInfo metaCache; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; + SLimitInfo limitInfo; } STableScanBase; typedef struct STableScanInfo { STableScanBase base; - SLimitInfo limitInfo; SScanInfo scanInfo; int32_t scanTimes; SSDataBlock* pResBlock; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan - int32_t dataBlockLoadFlag; SSampleExecInfo sample; // sample execution info int32_t currentGroupId; int32_t currentTable; @@ -361,13 +361,8 @@ typedef struct STableMergeScanInfo { SScanInfo scanInfo; int32_t scanTimes; SSDataBlock* pResBlock; - int32_t numOfOutput; - - // if the upstream is an interval operator, the interval info is also kept here to get the time - // window to check if current data block needs to be loaded. - SInterval interval; - SSampleExecInfo sample; // sample execution info - SSortExecInfo sortExecInfo; + SSampleExecInfo sample; // sample execution info + SSortExecInfo sortExecInfo; } STableMergeScanInfo; typedef struct STagScanInfo { @@ -380,17 +375,17 @@ typedef struct STagScanInfo { } STagScanInfo; typedef struct SLastrowScanInfo { - SSDataBlock* pRes; - SReadHandle readHandle; - void* pLastrowReader; - SColMatchInfo matchInfo; - int32_t* pSlotIds; - SExprSupp pseudoExprSup; - int32_t retrieveType; - int32_t currentGroupIndex; - SSDataBlock* pBufferredRes; - SArray* pUidList; - int32_t indexOfBufferedRes; + SSDataBlock* pRes; + SReadHandle readHandle; + void* pLastrowReader; + SColMatchInfo matchInfo; + int32_t* pSlotIds; + SExprSupp pseudoExprSup; + int32_t retrieveType; + int32_t currentGroupIndex; + SSDataBlock* pBufferredRes; + SArray* pUidList; + int32_t indexOfBufferedRes; } SLastrowScanInfo; typedef enum EStreamScanMode { @@ -407,13 +402,6 @@ enum { PROJECT_RETRIEVE_DONE = 0x2, }; -typedef struct SCatchSupporter { - SHashObj* pWindowHashTable; // quick locate the window object for each window - SDiskbasedBuf* pDataBuf; // buffer based on blocked-wised disk file - int32_t keySize; - int64_t* pKeyBuf; -} SCatchSupporter; - typedef struct SStreamAggSupporter { int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row SSDataBlock* pScanBlock; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8e8a86670b..26117b442e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -374,23 +374,16 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo } } - - - - static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableScanInfo* pInfo = pOperator->info; - SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; pCost->totalBlocks += 1; pCost->totalRows += pBlock->info.rows; bool loadSMA = false; - - *status = pInfo->dataBlockLoadFlag; + *status = pTableScanInfo->dataBlockLoadFlag; if (pOperator->exprSupp.pFilterInfo != NULL || overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) { (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; @@ -489,10 +482,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca } } - applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo, pOperator); + applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator); pCost->totalRows += pBlock->info.rows; - pInfo->limitInfo.numOfOutputRows = pCost->totalRows; + pTableScanInfo->limitInfo.numOfOutputRows = pCost->totalRows; return TSDB_CODE_SUCCESS; } @@ -871,8 +864,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // reset value for the next group data output pOperator->status = OP_OPENED; - pInfo->limitInfo.numOfOutputRows = 0; - pInfo->limitInfo.remainOffset = pInfo->limitInfo.limit.offset; + pInfo->base.limitInfo.numOfOutputRows = 0; + pInfo->base.limitInfo.remainOffset = pInfo->base.limitInfo.limit.offset; int32_t num = 0; STableKeyInfo* pList = NULL; @@ -936,7 +929,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, goto _error; } - initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->limitInfo); + initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo); code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -954,7 +947,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); - pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; + pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired; initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pResBlock = createResDataBlock(pDescNode); @@ -4729,8 +4722,11 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; + pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD; + pInfo->base.scanFlag = MAIN_SCAN; pInfo->base.readHandle = *readHandle; - pInfo->interval = extractIntervalInfo(pTableScanNode); + initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->base.limitInfo); + pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); @@ -4739,7 +4735,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN goto _error; } - pInfo->base.scanFlag = MAIN_SCAN; initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->pResBlock = createResDataBlock(pDescNode);