refactor: do some internal refactor.
This commit is contained in:
parent
ad3075db1e
commit
05bd6a52c2
|
@ -119,6 +119,17 @@ typedef struct SLimit {
|
|||
int64_t offset;
|
||||
} SLimit;
|
||||
|
||||
typedef struct SFileBlockLoadRecorder {
|
||||
uint64_t totalRows;
|
||||
uint64_t totalCheckedRows;
|
||||
uint32_t totalBlocks;
|
||||
uint32_t loadBlocks;
|
||||
uint32_t loadBlockStatis;
|
||||
uint32_t skipBlocks;
|
||||
uint32_t filterOutBlocks;
|
||||
uint64_t elapsedTime;
|
||||
} SFileBlockLoadRecorder;
|
||||
|
||||
typedef struct STaskCostInfo {
|
||||
int64_t created;
|
||||
int64_t start;
|
||||
|
@ -132,14 +143,10 @@ typedef struct STaskCostInfo {
|
|||
uint64_t loadDataInCacheSize;
|
||||
|
||||
uint64_t loadDataTime;
|
||||
uint64_t totalRows;
|
||||
uint64_t totalCheckedRows;
|
||||
uint32_t totalBlocks;
|
||||
uint32_t loadBlocks;
|
||||
uint32_t loadBlockStatis;
|
||||
uint32_t skipBlocks;
|
||||
uint32_t filterOutBlocks;
|
||||
|
||||
SFileBlockLoadRecorder* pRecoder;
|
||||
uint64_t elapsedTime;
|
||||
|
||||
uint64_t firstStageMergeTime;
|
||||
uint64_t winInfoSize;
|
||||
uint64_t tableInfoSize;
|
||||
|
@ -333,13 +340,10 @@ typedef struct SScanInfo {
|
|||
|
||||
typedef struct STableScanInfo {
|
||||
void* dataReader;
|
||||
|
||||
int32_t numOfBlocks; // extract basic running information.
|
||||
int32_t numOfSkipped;
|
||||
int32_t numOfBlockStatis;
|
||||
SFileBlockLoadRecorder readRecorder;
|
||||
int64_t numOfRows;
|
||||
int64_t elapsedTime;
|
||||
int32_t prevGroupId; // previous table group id
|
||||
// int32_t prevGroupId; // previous table group id
|
||||
SScanInfo scanInfo;
|
||||
int32_t scanTimes;
|
||||
SNode* pFilterNode; // filter info, which is push down by optimizer
|
||||
|
|
|
@ -1593,8 +1593,8 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
|
|||
|
||||
STaskCostInfo* pCost = &pTaskInfo->cost;
|
||||
|
||||
pCost->totalBlocks += 1;
|
||||
pCost->totalRows += pBlock->info.rows;
|
||||
// pCost->totalBlocks += 1;
|
||||
// pCost->totalRows += pBlock->info.rows;
|
||||
#if 0
|
||||
// Calculate all time windows that are overlapping or contain current data block.
|
||||
// If current data block is contained by all possible time window, do not load current data block.
|
||||
|
@ -2415,8 +2415,8 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
|
|||
qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64
|
||||
" us, total blocks:%d, "
|
||||
"load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
|
||||
GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks,
|
||||
pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
|
||||
GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->pRecoder->totalBlocks,
|
||||
pSummary->pRecoder->loadBlockStatis, pSummary->pRecoder->loadBlocks, pSummary->pRecoder->totalRows, pSummary->pRecoder->totalCheckedRows);
|
||||
//
|
||||
// qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
|
||||
// hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
|
||||
|
@ -4869,9 +4869,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
}
|
||||
|
||||
SInterval interval = extractIntervalInfo(pTableScanNode);
|
||||
return createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired,
|
||||
SOperatorInfo* pOperator = createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired,
|
||||
pTableScanNode->scanSeq, pColList, pResBlock, pScanPhyNode->node.pConditions,
|
||||
&interval, pTableScanNode->ratio, pTaskInfo);
|
||||
STableScanInfo* pScanInfo = pOperator->info;
|
||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||
return pOperator;
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
||||
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
|
||||
|
|
|
@ -162,7 +162,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STableScanInfo* pInfo = pOperator->info;
|
||||
|
||||
STaskCostInfo* pCost = &pTaskInfo->cost;
|
||||
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
|
||||
|
||||
pCost->totalBlocks += 1;
|
||||
pCost->totalRows += pBlock->info.rows;
|
||||
|
@ -188,11 +188,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
|||
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
||||
pCost->loadBlockStatis += 1;
|
||||
|
||||
bool allHave = true;
|
||||
bool allColumnsHaveAgg = true;
|
||||
SColumnDataAgg** pColAgg = NULL;
|
||||
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allHave);
|
||||
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
|
||||
|
||||
if (allHave == true) {
|
||||
if (allColumnsHaveAgg == true) {
|
||||
int32_t numOfCols = pBlock->info.numOfCols;
|
||||
|
||||
// todo create this buffer during creating operator
|
||||
|
@ -266,7 +266,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
}
|
||||
|
||||
pTableScanInfo->numOfBlocks += 1;
|
||||
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);
|
||||
|
||||
uint32_t status = 0;
|
||||
|
@ -406,7 +405,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
|
|||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
pInfo->dataReader = pReadHandle;
|
||||
pInfo->prevGroupId = -1;
|
||||
// pInfo->prevGroupId = -1;
|
||||
|
||||
pOperator->name = "TableSeqScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
|
||||
|
|
Loading…
Reference in New Issue