diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index cff568aebc..d51a24bb43 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -44,7 +44,8 @@ typedef struct SSortSource { void* param; bool onlyRef; }; - + int64_t fetchUs; + int64_t fetchNum; } SSortSource; typedef struct SMsortComparParam { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d820324128..ce647014ae 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -31,6 +31,7 @@ #include "thash.h" #include "ttypes.h" +#define MULTI_READER_MAX_TABLE_NUM 5000 #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) @@ -43,7 +44,9 @@ typedef struct STableMergeScanSortSourceParam { SOperatorInfo* pOperator; int32_t readerIdx; uint64_t uid; - SSDataBlock* inputBlock; + SSDataBlock* inputBlock; + bool multiReader; + STsdbReader* dataReader; } STableMergeScanSortSourceParam; static bool processBlockWithProbability(const SSampleExecInfo* pInfo); @@ -2588,6 +2591,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t readIdx = source->readerIdx; SSDataBlock* pBlock = source->inputBlock; + int32_t code = 0; SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx); @@ -2595,17 +2599,20 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->base.readHandle; - int32_t code = - tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo)); - if (code != 0) { - T_LONG_JMP(pTaskInfo->env, code); + if (NULL == source->dataReader || !source->multiReader) { + code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo)); + if (code != 0) { + T_LONG_JMP(pTaskInfo->env, code); + } } - + + pInfo->base.dataReader = source->dataReader; STsdbReader* reader = pInfo->base.dataReader; qTrace("tsdb/read-table-data: %p, enter next reader", reader); while (tsdbNextDataBlock(reader)) { if (isTaskKilled(pTaskInfo)) { tsdbReleaseDataBlock(reader); + pInfo->base.dataReader = NULL; T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } @@ -2639,14 +2646,18 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; qTrace("tsdb/read-table-data: %p, close reader", reader); - tsdbReaderClose(pInfo->base.dataReader); + if (!source->multiReader) { + tsdbReaderClose(pInfo->base.dataReader); + source->dataReader = NULL; + } pInfo->base.dataReader = NULL; return pBlock; } - qDebug("8"); - - tsdbReaderClose(pInfo->base.dataReader); + if (!source->multiReader) { + tsdbReaderClose(pInfo->base.dataReader); + source->dataReader = NULL; + } pInfo->base.dataReader = NULL; return NULL; } @@ -2718,6 +2729,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanSortSourceParam param = {0}; param.readerIdx = i; param.pOperator = pOperator; + param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false; param.inputBlock = createOneDataBlock(pInfo->pResBlock, false); blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity); @@ -2761,6 +2773,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { for (int32_t i = 0; i < numOfTable; ++i) { STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); blockDataDestroy(param->inputBlock); + tsdbReaderClose(param->dataReader); + param->dataReader = NULL; } taosArrayClear(pInfo->sortSourceParams); @@ -2871,15 +2885,17 @@ void destroyTableMergeScanOperatorInfo(void* param) { for (int32_t i = 0; i < numOfTable; i++) { STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i); blockDataDestroy(p->inputBlock); + tsdbReaderClose(p->dataReader); + p->dataReader = NULL; } + tsdbReaderClose(pTableScanInfo->base.dataReader); + pTableScanInfo->base.dataReader = NULL; + taosArrayDestroy(pTableScanInfo->sortSourceParams); tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL; - tsdbReaderClose(pTableScanInfo->base.dataReader); - pTableScanInfo->base.dataReader = NULL; - for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) { SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i); taosMemoryFree(pCond->colList); @@ -2896,8 +2912,6 @@ void destroyTableMergeScanOperatorInfo(void* param) { taosArrayDestroy(pTableScanInfo->pSortInfo); cleanupExprSupp(&pTableScanInfo->base.pseudoSup); - tsdbReaderClose(pTableScanInfo->base.dataReader); - pTableScanInfo->base.dataReader = NULL; taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache); taosMemoryFreeClear(param); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 291b0cf515..6c8e581b3f 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -108,12 +108,18 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) { return TSDB_CODE_SUCCESS; } -void tsortClearOrderdSource(SArray* pOrderedSource) { +void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) { for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) { SSortSource** pSource = taosArrayGet(pOrderedSource, i); if (NULL == *pSource) { continue; } + + if (fetchUs) { + *fetchUs += (*pSource)->fetchUs; + *fetchNum += (*pSource)->fetchNum; + } + // release pageIdList if ((*pSource)->pageIdList) { taosArrayDestroy((*pSource)->pageIdList); @@ -147,7 +153,10 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { taosMemoryFreeClear(pSortHandle->idStr); blockDataDestroy(pSortHandle->pDataBlock); - tsortClearOrderdSource(pSortHandle->pOrderedSource); + int64_t fetchUs = 0, fetchNum = 0; + tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum); + qError("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr); + taosArrayDestroy(pSortHandle->pOrderedSource); taosMemoryFreeClear(pSortHandle); } @@ -307,7 +316,7 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32 } int64_t et = taosGetTimestampUs(); - qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr); + qError("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr); } return code; @@ -365,7 +374,10 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT releaseBufPage(pHandle->pBuf, pPage); } } else { + int64_t st = taosGetTimestampUs(); pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param); + pSource->fetchUs += taosGetTimestampUs() - st; + pSource->fetchNum++; if (pSource->src.pBlock == NULL) { (*numOfCompleted) += 1; pSource->src.rowIndex = -1; @@ -602,7 +614,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { } } - tsortClearOrderdSource(pHandle->pOrderedSource); + tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); taosArrayAddAll(pHandle->pOrderedSource, pResList); taosArrayDestroy(pResList); @@ -644,7 +656,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { SSortSource* source = *pSource; *pSource = NULL; - tsortClearOrderdSource(pHandle->pOrderedSource); + tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); while (1) { SSDataBlock* pBlock = pHandle->fetchfp(source->param);