diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 50c62b9017..c7a6ae5774 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -340,23 +340,23 @@ typedef struct STableScanInfo { } STableScanInfo; typedef struct STableMergeScanInfo { - STableListInfo* tableListInfo; - int32_t tableStartIndex; - int32_t tableEndIndex; - bool hasGroupId; - uint64_t groupId; - SArray* dataReaders; // array of tsdbReaderT* - SArray* queryConds; // array of queryTableDataCond - STsdbReader* pReader; - SReadHandle readHandle; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* pSortInfo; - SSortHandle* pSortHandle; - SSDataBlock* pSortInputBlock; - int64_t startTs; // sort start time - SArray* sortSourceParams; - SLimitInfo limitInfo; + STableListInfo* tableListInfo; + int32_t tableStartIndex; + int32_t tableEndIndex; + bool hasGroupId; + uint64_t groupId; + SArray* dataReaders; // array of tsdbReaderT* + SReadHandle readHandle; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* pSortInfo; + SSortHandle* pSortHandle; + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + SArray* sortSourceParams; + SLimitInfo limitInfo; + + SFileBlockLoadRecorder readRecorder; int64_t numOfRows; SScanInfo scanInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e854d479fc..7795eff410 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4204,114 +4204,6 @@ int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle* return TSDB_CODE_SUCCESS; } -static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, - SSDataBlock* pBlock, uint32_t* status) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableMergeScanInfo* pInfo = pOperator->info; - - uint64_t uid = pBlock->info.uid; - - SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; - - pCost->totalBlocks += 1; - pCost->totalRows += pBlock->info.rows; - - *status = pInfo->dataBlockLoadFlag; - if (pTableScanInfo->pFilterNode != NULL || - overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { - (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; - } - - SDataBlockInfo* pBlockInfo = &pBlock->info; - taosMemoryFreeClear(pBlock->pBlockAgg); - - if (*status == FUNC_DATA_REQUIRED_FILTEROUT) { - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - pCost->filterOutBlocks += 1; - return TSDB_CODE_SUCCESS; - } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { - qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - pCost->skipBlocks += 1; - - // clear all data in pBlock that are set when handing the previous block - for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { - SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i); - pcol->pData = NULL; - } - - return TSDB_CODE_SUCCESS; - } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { - pCost->loadBlockStatis += 1; - - bool allColumnsHaveAgg = true; - SColumnDataAgg** pColAgg = NULL; - STsdbReader* reader = pTableScanInfo->pReader; - tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg); - - if (allColumnsHaveAgg == true) { - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - - // todo create this buffer during creating operator - if (pBlock->pBlockAgg == NULL) { - pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES); - } - - for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i); - if (!pColMatchInfo->needOutput) { - continue; - } - pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i]; - } - - return TSDB_CODE_SUCCESS; - } else { // failed to load the block sma data, data block statistics does not exist, load data block instead - *status = FUNC_DATA_REQUIRED_DATA_LOAD; - } - } - - ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); - - pCost->totalCheckedRows += pBlock->info.rows; - pCost->loadBlocks += 1; - - STsdbReader* reader = pTableScanInfo->pReader; - SArray* pCols = tsdbRetrieveDataBlock(reader, NULL); - if (pCols == NULL) { - return terrno; - } - - relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); - - // currently only the tbname pseudo column - if (pTableScanInfo->pseudoSup.numOfExprs > 0) { - int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, - pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo)); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } - } - - if (pTableScanInfo->pFilterNode != NULL) { - int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL); - - double el = (taosGetTimestampUs() - st) / 1000.0; - pTableScanInfo->readRecorder.filterTime += el; - - if (pBlock->info.rows == 0) { - pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms", - GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el); - } else { - qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el); - } - } - return TSDB_CODE_SUCCESS; -} - // todo refactor static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {