diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 32d2684a0a..e854d479fc 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4434,144 +4434,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc typedef struct STableMergeScanSortSourceParam { SOperatorInfo* pOperator; int32_t readerIdx; - int64_t uid; SSDataBlock* inputBlock; } STableMergeScanSortSourceParam; -static SSDataBlock* getTableDataBlockTemp(void* param) { - STableMergeScanSortSourceParam* source = param; - SOperatorInfo* pOperator = source->pOperator; - STableMergeScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - int32_t readIdx = source->readerIdx; - SSDataBlock* pBlock = source->inputBlock; - STableMergeScanInfo* pTableScanInfo = pOperator->info; - - SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx); - - blockDataCleanup(pBlock); - - int64_t st = taosGetTimestampUs(); - - void* pStart = taosArrayGet(pInfo->tableListInfo->pTableList, readIdx + pInfo->tableStartIndex); - - SReadHandle* pHandle = &pInfo->readHandle; - tsdbReaderOpen(pHandle->vnode, pQueryCond, pStart, 1, &pInfo->pReader, GET_TASKID(pTaskInfo)); - - STsdbReader* reader = pInfo->pReader; - while (tsdbNextDataBlock(reader)) { - if (isTaskKilled(pOperator->pTaskInfo)) { - T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); - } - - // process this data block based on the probabilities - bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); - if (!processThisBlock) { - continue; - } - - blockDataCleanup(pBlock); - SDataBlockInfo binfo = pBlock->info; - tsdbRetrieveDataBlockInfo(reader, &binfo); - - blockDataEnsureCapacity(pBlock, binfo.rows); - pBlock->info.type = binfo.type; - pBlock->info.uid = binfo.uid; - pBlock->info.window = binfo.window; - pBlock->info.rows = binfo.rows; - - if (tsdbIsAscendingOrder(pInfo->pReader)) { - pQueryCond->twindows.skey = pBlock->info.window.ekey + 1; - } else { - pQueryCond->twindows.ekey = pBlock->info.window.skey - 1; - } - - uint32_t status = 0; - int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readIdx, pBlock, &status); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pOperator->pTaskInfo->env, code); - } - - // current block is filter out according to filter condition, continue load the next block - if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { - continue; - } - - uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupId) { - pBlock->info.groupId = *groupId; - } - - pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows; - pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - - tsdbReaderClose(pInfo->pReader); - pInfo->pReader = NULL; - return pBlock; - } - tsdbReaderClose(pInfo->pReader); - pInfo->pReader = NULL; - - return NULL; -} - -static SSDataBlock* getTableDataBlock2(void* param) { - STableMergeScanSortSourceParam* source = param; - SOperatorInfo* pOperator = source->pOperator; - int64_t uid = source->uid; - SSDataBlock* pBlock = source->inputBlock; - STableMergeScanInfo* pTableScanInfo = pOperator->info; - - int64_t st = taosGetTimestampUs(); - - blockDataCleanup(pBlock); - - STsdbReader* reader = pTableScanInfo->pReader; - while (tsdbTableNextDataBlock(reader, uid)) { - if (isTaskKilled(pOperator->pTaskInfo)) { - T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); - } - - // process this data block based on the probabilities - bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); - if (!processThisBlock) { - continue; - } - - blockDataCleanup(pBlock); - SDataBlockInfo binfo = pBlock->info; - tsdbRetrieveDataBlockInfo(reader, &binfo); - - blockDataEnsureCapacity(pBlock, binfo.rows); - pBlock->info.type = binfo.type; - pBlock->info.uid = binfo.uid; - pBlock->info.window = binfo.window; - pBlock->info.rows = binfo.rows; - - uint32_t status = 0; - int32_t code = loadDataBlockFromOneTable2(pOperator, pTableScanInfo, pBlock, &status); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pOperator->pTaskInfo->env, code); - } - - // current block is filter out according to filter condition, continue load the next block - if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { - continue; - } - - uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupId) { - pBlock->info.groupId = *groupId; - } - - pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; - pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - - return pBlock; - } - return NULL; -} - static SSDataBlock* getTableDataBlock(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator;