diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5db1e00410..23a2eb5492 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -31,6 +31,7 @@ #include "thash.h" #include "ttypes.h" +#define MULTI_READER_MAX_TABLE_NUM 5000 #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) @@ -44,6 +45,7 @@ typedef struct STableMergeScanSortSourceParam { int32_t readerIdx; uint64_t uid; SSDataBlock* inputBlock; + bool multiReader; STsdbReader* dataReader; } STableMergeScanSortSourceParam; @@ -2595,7 +2597,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->base.readHandle; - if (NULL == source->dataReader) { + if (NULL == source->dataReader || !source->multiReader) { code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo)); if (code != 0) { T_LONG_JMP(pTaskInfo->env, code); @@ -2608,6 +2610,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { while (tsdbNextDataBlock(reader)) { if (isTaskKilled(pTaskInfo)) { tsdbReleaseDataBlock(reader); + pInfo->base.dataReader = NULL; T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } @@ -2641,10 +2644,18 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; qTrace("tsdb/read-table-data: %p, close reader", reader); + if (!source->multiReader) { + tsdbReaderClose(pInfo->base.dataReader); + source->dataReader = NULL; + } pInfo->base.dataReader = NULL; return pBlock; } + if (!source->multiReader) { + tsdbReaderClose(pInfo->base.dataReader); + source->dataReader = NULL; + } pInfo->base.dataReader = NULL; return NULL; } @@ -2716,6 +2727,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanSortSourceParam param = {0}; param.readerIdx = i; param.pOperator = pOperator; + param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false; param.inputBlock = createOneDataBlock(pInfo->pResBlock, false); blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity); @@ -2877,6 +2889,9 @@ void destroyTableMergeScanOperatorInfo(void* param) { p->dataReader = NULL; } + tsdbReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.dataReader = NULL; + taosArrayDestroy(pTableScanInfo->sortSourceParams); tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL;