fix: optimize table merge scan perf
This commit is contained in:
parent
ddab83a2ba
commit
ddfcba76f3
|
@ -31,6 +31,7 @@
|
|||
#include "thash.h"
|
||||
#include "ttypes.h"
|
||||
|
||||
#define MULTI_READER_MAX_TABLE_NUM 5000
|
||||
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||
|
||||
|
@ -44,6 +45,7 @@ typedef struct STableMergeScanSortSourceParam {
|
|||
int32_t readerIdx;
|
||||
uint64_t uid;
|
||||
SSDataBlock* inputBlock;
|
||||
bool multiReader;
|
||||
STsdbReader* dataReader;
|
||||
} STableMergeScanSortSourceParam;
|
||||
|
||||
|
@ -2595,7 +2597,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
|
||||
SReadHandle* pHandle = &pInfo->base.readHandle;
|
||||
|
||||
if (NULL == source->dataReader) {
|
||||
if (NULL == source->dataReader || !source->multiReader) {
|
||||
code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo));
|
||||
if (code != 0) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
|
@ -2608,6 +2610,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
while (tsdbNextDataBlock(reader)) {
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
tsdbReleaseDataBlock(reader);
|
||||
pInfo->base.dataReader = NULL;
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
}
|
||||
|
||||
|
@ -2641,10 +2644,18 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
qTrace("tsdb/read-table-data: %p, close reader", reader);
|
||||
if (!source->multiReader) {
|
||||
tsdbReaderClose(pInfo->base.dataReader);
|
||||
source->dataReader = NULL;
|
||||
}
|
||||
pInfo->base.dataReader = NULL;
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
if (!source->multiReader) {
|
||||
tsdbReaderClose(pInfo->base.dataReader);
|
||||
source->dataReader = NULL;
|
||||
}
|
||||
pInfo->base.dataReader = NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2716,6 +2727,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
STableMergeScanSortSourceParam param = {0};
|
||||
param.readerIdx = i;
|
||||
param.pOperator = pOperator;
|
||||
param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false;
|
||||
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||
blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);
|
||||
|
||||
|
@ -2877,6 +2889,9 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
|||
p->dataReader = NULL;
|
||||
}
|
||||
|
||||
tsdbReaderClose(pTableScanInfo->base.dataReader);
|
||||
pTableScanInfo->base.dataReader = NULL;
|
||||
|
||||
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
||||
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
||||
pTableScanInfo->pSortHandle = NULL;
|
||||
|
|
Loading…
Reference in New Issue