From 1b1d1d1c4a3b2a44a16f5307a507efa770f81b6e Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 14 Jun 2022 18:17:18 +0800 Subject: [PATCH 1/6] feature: add table merge scan operator --- source/libs/executor/inc/executorimpl.h | 3 +- source/libs/executor/src/executorimpl.c | 2 - source/libs/executor/src/scanoperator.c | 525 +++++++++++++++++++++++- source/libs/executor/src/sortoperator.c | 2 +- 4 files changed, 513 insertions(+), 19 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ab60acab53..e8dab42129 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -907,7 +907,8 @@ int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); - +int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, + SNode* pTagCond); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 99a9dcb6af..f5d813f2bf 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4556,8 +4556,6 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond); -static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, - SNode* pTagCond); static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* createSortInfo(SNodeList* pNodeList); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d30e4ef6db..3ad5465d67 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -630,7 +630,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo); - char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); + char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); varDataSetLen(p, len); @@ -642,7 +642,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { } static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) { - SBlockDistInfo* pDistInfo = (SBlockDistInfo*) param; + SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param; blockDataDestroy(pDistInfo->pResBlock); } @@ -654,24 +654,25 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* goto _error; } - pInfo->pHandle = dataReader; + pInfo->pHandle = dataReader; pInfo->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SColumnInfoData infoData = {0}; - infoData.info.type = TSDB_DATA_TYPE_VARCHAR; - infoData.info.bytes = 1024; + infoData.info.type = TSDB_DATA_TYPE_VARCHAR; + infoData.info.bytes = 1024; taosArrayPush(pInfo->pResBlock->pDataBlock, &infoData); - pOperator->name = "DataBlockInfoScanOperator"; + pOperator->name = "DataBlockInfoScanOperator"; // pOperator->operatorType = OP_TableBlockInfoScan; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, destroyBlockDistScanOperatorInfo, NULL, NULL, NULL); + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, + destroyBlockDistScanOperatorInfo, NULL, NULL, NULL); return pOperator; _error: @@ -823,7 +824,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { return NULL; } - int32_t current = pInfo->validBlockIndex++; + int32_t current = pInfo->validBlockIndex++; SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); blockDataUpdateTsWindow(pBlock, 0); return pBlock; @@ -984,7 +985,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan; SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info; @@ -1436,7 +1437,6 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { pRsp->numOfRows, pInfo->loadInfo.totalRows); if (pRsp->numOfRows == 0) { - taosMemoryFree(pRsp); return NULL; } @@ -1773,3 +1773,498 @@ _error: terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + +typedef struct STableMergeScanInfo { + SArray* dataReaders; // array of tsdbReaderT* + SReadHandle readHandle; + + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + + SArray* pSortInfo; + SSortHandle* pSortHandle; + + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + + bool hasGroupId; + uint64_t groupId; + STupleHandle* prefetchedTuple; + + SArray* sortSourceParams; + + SFileBlockLoadRecorder readRecorder; + int64_t numOfRows; + // int32_t prevGroupId; // previous table group id + SScanInfo scanInfo; + int32_t scanTimes; + SNode* pFilterNode; // filter info, which is push down by optimizer + SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context + SResultRowInfo* pResultRowInfo; + int32_t* rowCellInfoOffset; + SExprInfo* pExpr; + SSDataBlock* pResBlock; + SArray* pColMatchInfo; + int32_t numOfOutput; + + SExprInfo* pPseudoExpr; + int32_t numOfPseudoExpr; + SqlFunctionCtx* pPseudoCtx; + // int32_t* rowCellInfoOffset; + + SQueryTableDataCond cond; + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; + SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time + // window to check if current data block needs to be loaded. + + SSampleExecInfo sample; // sample execution info + int32_t curTWinIdx; + +} STableMergeScanInfo; + +int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, + STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, + uint64_t taskId, SNode* pTagCond) { + int32_t code = + getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { + qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); + goto _error; + } + + SQueryTableDataCond cond = {0}; + code = initQueryTableDataCond(&cond, pTableScanNode); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) { + STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo)); + subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); + taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i)); + tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); + taosArrayPush(arrayReader, &pReader); + } + clearupQueryTableDataCond(&cond); + + return 0; + +_error: + return code; +} + +static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, + int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableScanInfo* pInfo = pOperator->info; + + SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; + + pCost->totalBlocks += 1; + pCost->totalRows += pBlock->info.rows; + + *status = pInfo->dataBlockLoadFlag; + if (pTableScanInfo->pFilterNode != NULL || + overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { + (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; + } + + SDataBlockInfo* pBlockInfo = &pBlock->info; + taosMemoryFreeClear(pBlock->pBlockAgg); + + if (*status == FUNC_DATA_REQUIRED_FILTEROUT) { + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->filterOutBlocks += 1; + return TSDB_CODE_SUCCESS; + } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { + qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->skipBlocks += 1; + + // clear all data in pBlock that are set when handing the previous block + for (int32_t i = 0; i < pBlockInfo->numOfCols; ++i) { + SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i); + pcol->pData = NULL; + } + + return TSDB_CODE_SUCCESS; + } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { + pCost->loadBlockStatis += 1; + + bool allColumnsHaveAgg = true; + SColumnDataAgg** pColAgg = NULL; + tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); + tsdbRetrieveDataBlockStatisInfo(reader, &pColAgg, &allColumnsHaveAgg); + + if (allColumnsHaveAgg == true) { + int32_t numOfCols = pBlock->info.numOfCols; + + // todo create this buffer during creating operator + if (pBlock->pBlockAgg == NULL) { + pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES); + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); + if (!pColMatchInfo->output) { + continue; + } + pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i]; + } + + return TSDB_CODE_SUCCESS; + } else { // failed to load the block sma data, data block statistics does not exist, load data block instead + *status = FUNC_DATA_REQUIRED_DATA_LOAD; + } + } + + ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); + + // todo filter data block according to the block sma data firstly +#if 0 + if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, + pBlockInfo->window.ekey, pBlockInfo->rows); + (*status) = FUNC_DATA_REQUIRED_FILTEROUT; + return TSDB_CODE_SUCCESS; + } +#endif + + pCost->totalCheckedRows += pBlock->info.rows; + pCost->loadBlocks += 1; + + tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); + SArray* pCols = tsdbRetrieveDataBlock(reader, NULL); + if (pCols == NULL) { + return terrno; + } + + relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols); + + // currently only the tbname pseudo column + if (pTableScanInfo->numOfPseudoExpr > 0) { + addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr, + pBlock); + } + + int64_t st = taosGetTimestampMs(); + doFilter(pTableScanInfo->pFilterNode, pBlock); + + int64_t et = taosGetTimestampMs(); + pTableScanInfo->readRecorder.filterTime += (et - st); + + if (pBlock->info.rows == 0) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + } + + return TSDB_CODE_SUCCESS; +} + +typedef struct STableMergeScanSortSourceParam { + SOperatorInfo* pOperator; + int32_t readerIdx; +} STableMergeScanSortSourceParam; + +static SSDataBlock* getTableDataBlock(void* param) { + STableMergeScanSortSourceParam* source = param; + SOperatorInfo* pOperator = source->pOperator; + int32_t readerIdx = source->readerIdx; + STableMergeScanInfo* pTableScanInfo = pOperator->info; + SSDataBlock* pBlock = pTableScanInfo->pResBlock; + + int64_t st = taosGetTimestampUs(); + + tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); + while (tsdbNextDataBlock(reader)) { + if (isTaskKilled(pOperator->pTaskInfo)) { + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + + // process this data block based on the probabilities + bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); + if (!processThisBlock) { + continue; + } + + tsdbRetrieveDataBlockInfo(reader, &pBlock->info); + + uint32_t status = 0; + int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status); + // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pOperator->pTaskInfo->env, code); + } + + // current block is filter out according to filter condition, continue load the next block + if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { + continue; + } + + uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + if (groupId) { + pBlock->info.groupId = *groupId; + } + + pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; + pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + + pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime; + return pBlock; + } + return NULL; +} + +SArray* generateSortByTsInfo(int32_t order) { + SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo)); + SBlockOrderInfo bi = {0}; + bi.order = order; + bi.slotId = 0; + bi.nullFirst = NULL_ORDER_FIRST; + + taosArrayPush(pList, &bi); + + return pList; +} + +int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) { + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + if (OPTR_IS_OPENED(pOperator)) { + return TSDB_CODE_SUCCESS; + } + + pInfo->startTs = taosGetTimestampUs(); + + int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; + + pInfo->pSortHandle = + tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, + numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str); + + tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL); + + size_t numReaders = taosArrayGetSize(pInfo->dataReaders); + for (int32_t i = 0; i < numReaders; ++i) { + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); + param->readerIdx = i; + param->pOperator = pOperator; + ps->param = param; + tsortAddSource(pInfo->pSortHandle, ps); + } + + int32_t code = tsortOpen(pInfo->pSortHandle); + + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, terrno); + } + + pOperator->status = OP_RES_TO_RETURN; + + OPTR_SET_OPENED(pOperator); + return TSDB_CODE_SUCCESS; +} + +SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SOperatorInfo* pOperator) { + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + blockDataCleanup(pDataBlock); + + SSDataBlock* p = tsortGetSortedDataBlock(pHandle); + if (p == NULL) { + return NULL; + } + + blockDataEnsureCapacity(p, capacity); + + while (1) { + STupleHandle* pTupleHandle = NULL; + if (pInfo->prefetchedTuple == NULL) { + pTupleHandle = tsortNextTuple(pHandle); + } else { + pTupleHandle = pInfo->prefetchedTuple; + pInfo->groupId = tsortGetGroupId(pTupleHandle); + pInfo->prefetchedTuple = NULL; + } + + if (pTupleHandle == NULL) { + break; + } + + uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); + if (!pInfo->hasGroupId) { + pInfo->groupId = tupleGroupId; + pInfo->hasGroupId = true; + appendOneRowToDataBlock(p, pTupleHandle); + } else if (pInfo->groupId == tupleGroupId) { + appendOneRowToDataBlock(p, pTupleHandle); + } else { + pInfo->prefetchedTuple = pTupleHandle; + break; + } + + if (p->info.rows >= capacity) { + break; + } + } + + blockDataDestroy(p); + + qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.rows); + return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; +} + +SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableMergeScanInfo* pInfo = pOperator->info; + + int32_t code = pOperator->fpSet._openFn(pOperator); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + SSDataBlock* pBlock = getSortedTableMergeScanBlockData( + pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator); + + if (pBlock != NULL) { + pOperator->resultInfo.totalRows += pBlock->info.rows; + } else { + doSetOperatorCompleted(pOperator); + } + return pBlock; +} + +void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { + STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; + blockDataDestroy(pTableScanInfo->pResBlock); + clearupQueryTableDataCond(&pTableScanInfo->cond); + + for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { + tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, i); + tsdbCleanupReadHandle(reader); + } + taosArrayDestroy(pTableScanInfo->dataReaders); + + if (pTableScanInfo->pColMatchInfo != NULL) { + taosArrayDestroy(pTableScanInfo->pColMatchInfo); + } + + taosArrayDestroy(pTableScanInfo->sortSourceParams); + pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); + pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); + + taosArrayDestroy(pTableScanInfo->pSortInfo); +} + +int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { + ASSERT(pOptr != NULL); + //TODO: merge these two info into one struct + SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder)); + STableScanInfo* pTableScanInfo = pOptr->info; + *pRecorder = pTableScanInfo->readRecorder; + *pOptrExplain = pRecorder; + *len = sizeof(SFileBlockLoadRecorder); + + SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); + + STableMergeScanInfo* pOperatorInfo = (STableMergeScanInfo*)pOptr->info; + + *pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle); + *pOptrExplain = pInfo; + *len = sizeof(SSortExecInfo); + return TSDB_CODE_SUCCESS; +} + +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SArray* dataReaders, + SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { + STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; + + int32_t numOfCols = 0; + SArray* pColList = + extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID); + + int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + if (pTableScanNode->scan.pScanPseudoCols != NULL) { + pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); + pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset); + } + + pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; + + pInfo->readHandle = *readHandle; + pInfo->interval = extractIntervalInfo(pTableScanNode); + pInfo->sample.sampleRatio = pTableScanNode->ratio; + pInfo->sample.seed = taosGetTimestampSec(); + + pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; + pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + pInfo->dataReaders = dataReaders; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColList; + pInfo->curTWinIdx = 0; + + pInfo->pResBlock = createResDataBlock(pDescNode); + + pInfo->sortSourceParams = taosArrayInit(taosArrayGetSize(dataReaders), sizeof(STableMergeScanSortSourceParam)); + for (int32_t i = 0; i < taosArrayGetSize(dataReaders); ++i) { + STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); + param->readerIdx = i; + param->pOperator = pOperator; + taosArrayPush(pInfo->sortSourceParams, param); + taosMemoryFree(param); + } + pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order); + pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); + int32_t rowSize = pInfo->pResBlock->info.rowSize; + pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2; + pInfo->sortBufSize = pInfo->bufPageSize * 16; + pInfo->hasGroupId = false; + pInfo->prefetchedTuple = NULL; + + pOperator->name = "TableMergeScanOperator"; + // TODO : change it + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; + initResultSizeInfo(pOperator, 1024); + + pOperator->fpSet = + createOperatorFpSet(doOpenTableMergeScanOperator, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, + NULL, NULL, getTableMergeScanExplainExecInfo); + pOperator->cost.openCost = 0; + return pOperator; + +_error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; +} diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index ff093741fa..7b66b04ed3 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -427,7 +427,7 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, pInfo->pInputBlock = pInputBlock; pOperator->name = "MultiwaySortMerge"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; - pOperator->blocking = true; + pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; From 4fa20a1e76167aa2c748916672924239cd80b01f Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 14 Jun 2022 19:44:48 +0800 Subject: [PATCH 2/6] feat: add table merge scan operator --- source/libs/executor/src/scanoperator.c | 46 ++++++++++++++----------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3ad5465d67..47289221c0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1791,7 +1791,7 @@ typedef struct STableMergeScanInfo { uint64_t groupId; STupleHandle* prefetchedTuple; - SArray* sortSourceParams; + SArray* sortSourceParams; SFileBlockLoadRecorder readRecorder; int64_t numOfRows; @@ -1842,12 +1842,17 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand if (code != TSDB_CODE_SUCCESS) { goto _error; } + // TODO: free the sublist info and the table list in it for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) { STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo)); subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i)); - tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); + + tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, subListInfo, queryId, taskId); taosArrayPush(arrayReader, &pReader); + + taosArrayDestroy(subListInfo->pTableList); + taosMemoryFree(subListInfo); } clearupQueryTableDataCond(&cond); @@ -2042,8 +2047,6 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } - pInfo->startTs = taosGetTimestampUs(); - int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = @@ -2074,7 +2077,8 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SOperatorInfo* pOperator) { +SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, + SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2137,8 +2141,8 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, code); } - SSDataBlock* pBlock = getSortedTableMergeScanBlockData( - pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator); + SSDataBlock* pBlock = + getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -2154,7 +2158,7 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { clearupQueryTableDataCond(&pTableScanInfo->cond); for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { - tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, i); + tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, i); tsdbCleanupReadHandle(reader); } taosArrayDestroy(pTableScanInfo->dataReaders); @@ -2170,22 +2174,22 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pTableScanInfo->pSortInfo); } +typedef struct STableMergeScanExecInfo { + SFileBlockLoadRecorder blockRecorder; + SSortExecInfo sortExecInfo; +} STableMergeScanExecInfo; + int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { ASSERT(pOptr != NULL); - //TODO: merge these two info into one struct - SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder)); - STableScanInfo* pTableScanInfo = pOptr->info; - *pRecorder = pTableScanInfo->readRecorder; - *pOptrExplain = pRecorder; - *len = sizeof(SFileBlockLoadRecorder); + // TODO: merge these two info into one struct + STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo)); + STableMergeScanInfo* pInfo = pOptr->info; + execInfo->blockRecorder = pInfo->readRecorder; + execInfo->sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle); - SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); - - STableMergeScanInfo* pOperatorInfo = (STableMergeScanInfo*)pOptr->info; - - *pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle); - *pOptrExplain = pInfo; - *len = sizeof(SSortExecInfo); + *pOptrExplain = execInfo; + *len = sizeof(STableMergeScanExecInfo); + return TSDB_CODE_SUCCESS; } From a38f70ba7ce21bfc923bfb34126d3253a104afd1 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 14 Jun 2022 23:34:50 +0800 Subject: [PATCH 3/6] fix: select * from t core dump --- source/libs/executor/inc/executorimpl.h | 5 +++ source/libs/executor/src/executorimpl.c | 52 +++++++++++++++---------- source/libs/executor/src/scanoperator.c | 3 +- 3 files changed, 37 insertions(+), 23 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e8dab42129..701c7ee0f3 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -909,6 +909,11 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, SNode* pTagCond); +int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, + STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, + uint64_t taskId, SNode* pTagCond); +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SArray* dataReaders, + SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f5d813f2bf..f6a5899ad8 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4675,33 +4675,43 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; +// +// tsdbReaderT pDataReader = +// doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); +// if (pDataReader == NULL && terrno != 0) { +// return NULL; +// } +// +// int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); +// if (code) { +// tsdbCleanupReadHandle(pDataReader); +// return NULL; +// } +// +// SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys); +// code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json +// taosArrayDestroy(groupKeys); +// if (code){ +// tsdbCleanupReadHandle(pDataReader); +// return NULL; +// } +// +// SOperatorInfo* pOperator = +// createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); +// +// STableScanInfo* pScanInfo = pOperator->info; +// pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; - tsdbReaderT pDataReader = - doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); - if (pDataReader == NULL && terrno != 0) { - return NULL; - } - - int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); - if (code) { - tsdbCleanupReadHandle(pDataReader); - return NULL; - } - + SArray* dataReaders = taosArrayInit(8, POINTER_BYTES); + createMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId, pTagCond); + extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys); - code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json + generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json taosArrayDestroy(groupKeys); - if (code){ - tsdbCleanupReadHandle(pDataReader); - return NULL; - } - SOperatorInfo* pOperator = - createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); - + createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; - return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 47289221c0..e342912727 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1865,7 +1865,7 @@ _error: static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableScanInfo* pInfo = pOperator->info; + STableMergeScanInfo* pInfo = pOperator->info; SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; @@ -2154,7 +2154,6 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; - blockDataDestroy(pTableScanInfo->pResBlock); clearupQueryTableDataCond(&pTableScanInfo->cond); for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { From 9f9a55f4f029319127f2936f03aa80dea2980e68 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 15 Jun 2022 08:56:45 +0800 Subject: [PATCH 4/6] fix: change table scan to table merge scan --- source/libs/executor/src/scanoperator.c | 12 ++++-------- source/libs/executor/src/sortoperator.c | 4 +++- source/libs/executor/src/tsort.c | 6 +++++- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e342912727..ac4b4d45b6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2077,13 +2077,10 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, - SOperatorInfo* pOperator) { +SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capacity, SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - blockDataCleanup(pDataBlock); - SSDataBlock* p = tsortGetSortedDataBlock(pHandle); if (p == NULL) { return NULL; @@ -2122,10 +2119,9 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* } } - blockDataDestroy(p); - qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.rows); - return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; + qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows); + return (p->info.rows > 0) ? p : NULL; } SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { @@ -2142,7 +2138,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { } SSDataBlock* pBlock = - getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator); + getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 7b66b04ed3..1008a5263c 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -76,7 +76,9 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { colDataAppendNULL(pColInfo, pBlock->info.rows); } else { char* pData = tsortGetValue(pTupleHandle, i); - colDataAppend(pColInfo, pBlock->info.rows, pData, false); + if (pData != NULL) { + colDataAppend(pColInfo, pBlock->info.rows, pData, false); + } } } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index f63a9351c6..1473bd81bb 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -738,7 +738,11 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) { void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) { SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex); - return colDataGetData(pColInfo, pVHandle->rowIndex); + if (pColInfo->pData == NULL) { + return NULL; + } else { + return colDataGetData(pColInfo, pVHandle->rowIndex); + } } uint64_t tsortGetGroupId(STupleHandle* pVHandle) { From 562ec27c7b7a5eb3cfdf3f0be1277840ce2bf079 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 15 Jun 2022 16:40:43 +0800 Subject: [PATCH 5/6] feat: add merge table scan operator --- source/libs/executor/src/executorimpl.c | 53 +++++++++++----------- source/libs/planner/src/planPhysiCreater.c | 4 +- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f6a5899ad8..b96db082bb 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4675,32 +4675,33 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; -// -// tsdbReaderT pDataReader = -// doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); -// if (pDataReader == NULL && terrno != 0) { -// return NULL; -// } -// -// int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); -// if (code) { -// tsdbCleanupReadHandle(pDataReader); -// return NULL; -// } -// -// SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys); -// code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json -// taosArrayDestroy(groupKeys); -// if (code){ -// tsdbCleanupReadHandle(pDataReader); -// return NULL; -// } -// -// SOperatorInfo* pOperator = -// createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); -// -// STableScanInfo* pScanInfo = pOperator->info; -// pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; + + tsdbReaderT pDataReader = + doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); + if (pDataReader == NULL && terrno != 0) { + return NULL; + } + + int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); + if (code) { + tsdbCleanupReadHandle(pDataReader); + return NULL; + } + + SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys); + code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json + taosArrayDestroy(groupKeys); + if (code) { + tsdbCleanupReadHandle(pDataReader); + return NULL; + } + + SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + STableScanInfo* pScanInfo = pOperator->info; + pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; + return pOperator; + } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { + STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; SArray* dataReaders = taosArrayInit(8, POINTER_BYTES); createMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId, pTagCond); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 4a0348151b..08e693da71 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -470,8 +470,8 @@ static ENodeType getScanOperatorType(EScanType scanType) { case SCAN_TYPE_STREAM: return QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; case SCAN_TYPE_TABLE_MERGE: - return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - // return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; + // return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; default: break; } From 8efaa65adea2d694216c0f6593a57d6b76d6c3f0 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 15 Jun 2022 20:12:04 +0800 Subject: [PATCH 6/6] fix: use different datablock for each sort source --- source/libs/executor/src/scanoperator.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a5ba48f871..23709be9b6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2052,17 +2052,20 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc typedef struct STableMergeScanSortSourceParam { SOperatorInfo* pOperator; int32_t readerIdx; + SSDataBlock* inputBlock; } STableMergeScanSortSourceParam; static SSDataBlock* getTableDataBlock(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; int32_t readerIdx = source->readerIdx; + SSDataBlock* pBlock = source->inputBlock; STableMergeScanInfo* pTableScanInfo = pOperator->info; - SSDataBlock* pBlock = pTableScanInfo->pResBlock; int64_t st = taosGetTimestampUs(); + blockDataCleanup(pBlock); + tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); while (tsdbNextDataBlock(reader)) { if (isTaskKilled(pOperator->pTaskInfo)) { @@ -2097,7 +2100,6 @@ static SSDataBlock* getTableDataBlock(void* param) { pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime; return pBlock; } return NULL; @@ -2134,9 +2136,7 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) { size_t numReaders = taosArrayGetSize(pInfo->dataReaders); for (int32_t i = 0; i < numReaders; ++i) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); - STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); - param->readerIdx = i; - param->pOperator = pOperator; + STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); ps->param = param; tsortAddSource(pInfo->pSortHandle, ps); } @@ -2310,6 +2310,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); param->readerIdx = i; param->pOperator = pOperator; + param->inputBlock = createOneDataBlock(pInfo->pResBlock, false); taosArrayPush(pInfo->sortSourceParams, param); taosMemoryFree(param); }