refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2022-11-18 13:40:03 +08:00
parent 875989aa30
commit 8b1e13af5e
2 changed files with 26 additions and 43 deletions

View File

@ -325,16 +325,16 @@ typedef struct STableScanBase {
SExprSupp pseudoSup; SExprSupp pseudoSup;
STableMetaCacheInfo metaCache; STableMetaCacheInfo metaCache;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
SLimitInfo limitInfo;
} STableScanBase; } STableScanBase;
typedef struct STableScanInfo { typedef struct STableScanInfo {
STableScanBase base; STableScanBase base;
SLimitInfo limitInfo;
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
int32_t currentGroupId; int32_t currentGroupId;
int32_t currentTable; int32_t currentTable;
@ -361,13 +361,8 @@ typedef struct STableMergeScanInfo {
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
int32_t numOfOutput; SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SInterval interval;
SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo;
} STableMergeScanInfo; } STableMergeScanInfo;
typedef struct STagScanInfo { typedef struct STagScanInfo {
@ -380,17 +375,17 @@ typedef struct STagScanInfo {
} STagScanInfo; } STagScanInfo;
typedef struct SLastrowScanInfo { typedef struct SLastrowScanInfo {
SSDataBlock* pRes; SSDataBlock* pRes;
SReadHandle readHandle; SReadHandle readHandle;
void* pLastrowReader; void* pLastrowReader;
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
int32_t* pSlotIds; int32_t* pSlotIds;
SExprSupp pseudoExprSup; SExprSupp pseudoExprSup;
int32_t retrieveType; int32_t retrieveType;
int32_t currentGroupIndex; int32_t currentGroupIndex;
SSDataBlock* pBufferredRes; SSDataBlock* pBufferredRes;
SArray* pUidList; SArray* pUidList;
int32_t indexOfBufferedRes; int32_t indexOfBufferedRes;
} SLastrowScanInfo; } SLastrowScanInfo;
typedef enum EStreamScanMode { typedef enum EStreamScanMode {
@ -407,13 +402,6 @@ enum {
PROJECT_RETRIEVE_DONE = 0x2, PROJECT_RETRIEVE_DONE = 0x2,
}; };
typedef struct SCatchSupporter {
SHashObj* pWindowHashTable; // quick locate the window object for each window
SDiskbasedBuf* pDataBuf; // buffer based on blocked-wised disk file
int32_t keySize;
int64_t* pKeyBuf;
} SCatchSupporter;
typedef struct SStreamAggSupporter { typedef struct SStreamAggSupporter {
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
SSDataBlock* pScanBlock; SSDataBlock* pScanBlock;

View File

@ -374,23 +374,16 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
} }
} }
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock, static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) { uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableScanInfo* pInfo = pOperator->info;
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
pCost->totalBlocks += 1; pCost->totalBlocks += 1;
pCost->totalRows += pBlock->info.rows; pCost->totalRows += pBlock->info.rows;
bool loadSMA = false; bool loadSMA = false;
*status = pTableScanInfo->dataBlockLoadFlag;
*status = pInfo->dataBlockLoadFlag;
if (pOperator->exprSupp.pFilterInfo != NULL || if (pOperator->exprSupp.pFilterInfo != NULL ||
overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) { overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD; (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
@ -489,10 +482,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
} }
} }
applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo, pOperator); applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator);
pCost->totalRows += pBlock->info.rows; pCost->totalRows += pBlock->info.rows;
pInfo->limitInfo.numOfOutputRows = pCost->totalRows; pTableScanInfo->limitInfo.numOfOutputRows = pCost->totalRows;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -871,8 +864,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// reset value for the next group data output // reset value for the next group data output
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
pInfo->limitInfo.numOfOutputRows = 0; pInfo->base.limitInfo.numOfOutputRows = 0;
pInfo->limitInfo.remainOffset = pInfo->limitInfo.limit.offset; pInfo->base.limitInfo.remainOffset = pInfo->base.limitInfo.limit.offset;
int32_t num = 0; int32_t num = 0;
STableKeyInfo* pList = NULL; STableKeyInfo* pList = NULL;
@ -936,7 +929,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
goto _error; goto _error;
} }
initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->limitInfo); initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode); code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
@ -954,7 +947,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec(); pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pResBlock = createResDataBlock(pDescNode);
@ -4729,8 +4722,11 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
pInfo->base.scanFlag = MAIN_SCAN;
pInfo->base.readHandle = *readHandle; pInfo->base.readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode); initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->base.limitInfo);
pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec(); pInfo->sample.seed = taosGetTimestampSec();
@ -4739,7 +4735,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
goto _error; goto _error;
} }
pInfo->base.scanFlag = MAIN_SCAN;
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pResBlock = createResDataBlock(pDescNode);