enh: optimize table merge scan performance
This commit is contained in:
parent
189ff567e0
commit
ddab83a2ba
|
@ -44,7 +44,8 @@ typedef struct SSortSource {
|
|||
void* param;
|
||||
bool onlyRef;
|
||||
};
|
||||
|
||||
int64_t fetchUs;
|
||||
int64_t fetchNum;
|
||||
} SSortSource;
|
||||
|
||||
typedef struct SMsortComparParam {
|
||||
|
|
|
@ -44,6 +44,7 @@ typedef struct STableMergeScanSortSourceParam {
|
|||
int32_t readerIdx;
|
||||
uint64_t uid;
|
||||
SSDataBlock* inputBlock;
|
||||
STsdbReader* dataReader;
|
||||
} STableMergeScanSortSourceParam;
|
||||
|
||||
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
||||
|
@ -2586,6 +2587,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
int32_t readIdx = source->readerIdx;
|
||||
SSDataBlock* pBlock = source->inputBlock;
|
||||
int32_t code = 0;
|
||||
|
||||
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
|
||||
|
||||
|
@ -2593,12 +2595,14 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
|
||||
SReadHandle* pHandle = &pInfo->base.readHandle;
|
||||
|
||||
int32_t code =
|
||||
tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
|
||||
if (NULL == source->dataReader) {
|
||||
code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo));
|
||||
if (code != 0) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
}
|
||||
|
||||
pInfo->base.dataReader = source->dataReader;
|
||||
STsdbReader* reader = pInfo->base.dataReader;
|
||||
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
|
||||
while (tsdbNextDataBlock(reader)) {
|
||||
|
@ -2637,14 +2641,10 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
qTrace("tsdb/read-table-data: %p, close reader", reader);
|
||||
tsdbReaderClose(pInfo->base.dataReader);
|
||||
pInfo->base.dataReader = NULL;
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
qDebug("8");
|
||||
|
||||
tsdbReaderClose(pInfo->base.dataReader);
|
||||
pInfo->base.dataReader = NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2759,6 +2759,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
|
||||
blockDataDestroy(param->inputBlock);
|
||||
tsdbReaderClose(param->dataReader);
|
||||
param->dataReader = NULL;
|
||||
}
|
||||
taosArrayClear(pInfo->sortSourceParams);
|
||||
|
||||
|
@ -2871,15 +2873,14 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
|||
for (int32_t i = 0; i < numOfTable; i++) {
|
||||
STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
|
||||
blockDataDestroy(p->inputBlock);
|
||||
tsdbReaderClose(p->dataReader);
|
||||
p->dataReader = NULL;
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
||||
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
||||
pTableScanInfo->pSortHandle = NULL;
|
||||
|
||||
tsdbReaderClose(pTableScanInfo->base.dataReader);
|
||||
pTableScanInfo->base.dataReader = NULL;
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
|
||||
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
|
||||
taosMemoryFree(pCond->colList);
|
||||
|
@ -2896,8 +2897,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
|||
taosArrayDestroy(pTableScanInfo->pSortInfo);
|
||||
cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
|
||||
|
||||
tsdbReaderClose(pTableScanInfo->base.dataReader);
|
||||
pTableScanInfo->base.dataReader = NULL;
|
||||
taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
|
||||
|
||||
taosMemoryFreeClear(param);
|
||||
|
|
|
@ -108,12 +108,18 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void tsortClearOrderdSource(SArray* pOrderedSource) {
|
||||
void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) {
|
||||
for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
|
||||
SSortSource** pSource = taosArrayGet(pOrderedSource, i);
|
||||
if (NULL == *pSource) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (fetchUs) {
|
||||
*fetchUs += (*pSource)->fetchUs;
|
||||
*fetchNum += (*pSource)->fetchNum;
|
||||
}
|
||||
|
||||
// release pageIdList
|
||||
if ((*pSource)->pageIdList) {
|
||||
taosArrayDestroy((*pSource)->pageIdList);
|
||||
|
@ -147,7 +153,10 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
|
|||
taosMemoryFreeClear(pSortHandle->idStr);
|
||||
blockDataDestroy(pSortHandle->pDataBlock);
|
||||
|
||||
tsortClearOrderdSource(pSortHandle->pOrderedSource);
|
||||
int64_t fetchUs = 0, fetchNum = 0;
|
||||
tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
|
||||
qError("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);
|
||||
|
||||
taosArrayDestroy(pSortHandle->pOrderedSource);
|
||||
taosMemoryFreeClear(pSortHandle);
|
||||
}
|
||||
|
@ -307,7 +316,7 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32
|
|||
}
|
||||
|
||||
int64_t et = taosGetTimestampUs();
|
||||
qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
|
||||
qError("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -365,7 +374,10 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
|
|||
releaseBufPage(pHandle->pBuf, pPage);
|
||||
}
|
||||
} else {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param);
|
||||
pSource->fetchUs += taosGetTimestampUs() - st;
|
||||
pSource->fetchNum++;
|
||||
if (pSource->src.pBlock == NULL) {
|
||||
(*numOfCompleted) += 1;
|
||||
pSource->src.rowIndex = -1;
|
||||
|
@ -602,7 +614,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
|||
}
|
||||
}
|
||||
|
||||
tsortClearOrderdSource(pHandle->pOrderedSource);
|
||||
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
|
||||
taosArrayAddAll(pHandle->pOrderedSource, pResList);
|
||||
taosArrayDestroy(pResList);
|
||||
|
||||
|
@ -644,7 +656,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
SSortSource* source = *pSource;
|
||||
*pSource = NULL;
|
||||
|
||||
tsortClearOrderdSource(pHandle->pOrderedSource);
|
||||
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
|
||||
|
|
Loading…
Reference in New Issue