Merge pull request #22301 from taosdata/feat/3.0/TD-25394
feat: optimize table merge scan when 1 child table
This commit is contained in:
commit
0ea34286cf
|
@ -191,6 +191,8 @@ int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols);
|
||||||
bool tsortIsClosed(SSortHandle* pHandle);
|
bool tsortIsClosed(SSortHandle* pHandle);
|
||||||
void tsortSetClosed(SSortHandle* pHandle);
|
void tsortSetClosed(SSortHandle* pHandle);
|
||||||
|
|
||||||
|
void setSingleTableMerge(SSortHandle* pHandle);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -2938,17 +2938,22 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
// one table has one data block
|
// one table has one data block
|
||||||
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
||||||
|
|
||||||
STableMergeScanSortSourceParam param = {0};
|
STableMergeScanSortSourceParam *param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam));
|
||||||
param.pOperator = pOperator;
|
param->pOperator = pOperator;
|
||||||
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
|
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
|
||||||
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL);
|
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL);
|
||||||
|
|
||||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||||
ps->param = ¶m;
|
ps->param = param;
|
||||||
ps->onlyRef = true;
|
ps->onlyRef = false;
|
||||||
tsortAddSource(pInfo->pSortHandle, ps);
|
tsortAddSource(pInfo->pSortHandle, ps);
|
||||||
|
|
||||||
int32_t code = tsortOpen(pInfo->pSortHandle);
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
if (numOfTable == 1) {
|
||||||
|
setSingleTableMerge(pInfo->pSortHandle);
|
||||||
|
} else {
|
||||||
|
code = tsortOpen(pInfo->pSortHandle);
|
||||||
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||||
|
|
|
@ -69,8 +69,14 @@ struct SSortHandle {
|
||||||
_sort_fetch_block_fn_t fetchfp;
|
_sort_fetch_block_fn_t fetchfp;
|
||||||
_sort_merge_compar_fn_t comparFn;
|
_sort_merge_compar_fn_t comparFn;
|
||||||
SMultiwayMergeTreeInfo* pMergeTree;
|
SMultiwayMergeTreeInfo* pMergeTree;
|
||||||
|
|
||||||
|
bool singleTableMerge;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
void setSingleTableMerge(SSortHandle* pHandle) {
|
||||||
|
pHandle->singleTableMerge = true;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
|
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
|
||||||
|
|
||||||
// | offset[0] | offset[1] |....| nullbitmap | data |...|
|
// | offset[0] | offset[1] |....| nullbitmap | data |...|
|
||||||
|
@ -1453,6 +1459,26 @@ static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
|
||||||
return &pHandle->tupleHandle;
|
return &pHandle->tupleHandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static STupleHandle* tsortSingleTableMergeNextTuple(SSortHandle* pHandle) {
|
||||||
|
if (1 == pHandle->numOfCompletedSources) return NULL;
|
||||||
|
if (pHandle->tupleHandle.pBlock && pHandle->tupleHandle.rowIndex + 1 < pHandle->tupleHandle.pBlock->info.rows) {
|
||||||
|
pHandle->tupleHandle.rowIndex++;
|
||||||
|
} else {
|
||||||
|
if (pHandle->tupleHandle.rowIndex == -1) return NULL;
|
||||||
|
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
|
||||||
|
SSortSource* source = *pSource;
|
||||||
|
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
|
||||||
|
if (!pBlock || pBlock->info.rows == 0) {
|
||||||
|
setCurrentSourceDone(source, pHandle);
|
||||||
|
pHandle->tupleHandle.pBlock = NULL;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pHandle->tupleHandle.pBlock = pBlock;
|
||||||
|
pHandle->tupleHandle.rowIndex = 0;
|
||||||
|
}
|
||||||
|
return &pHandle->tupleHandle;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsortOpen(SSortHandle* pHandle) {
|
int32_t tsortOpen(SSortHandle* pHandle) {
|
||||||
if (pHandle->opened) {
|
if (pHandle->opened) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1470,7 +1496,9 @@ int32_t tsortOpen(SSortHandle* pHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
|
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
|
||||||
if (pHandle->pBoundedQueue)
|
if (pHandle->singleTableMerge)
|
||||||
|
return tsortSingleTableMergeNextTuple(pHandle);
|
||||||
|
else if (pHandle->pBoundedQueue)
|
||||||
return tsortPQSortNextTuple(pHandle);
|
return tsortPQSortNextTuple(pHandle);
|
||||||
else
|
else
|
||||||
return tsortBufMergeSortNextTuple(pHandle);
|
return tsortBufMergeSortNextTuple(pHandle);
|
||||||
|
|
Loading…
Reference in New Issue