From a205eba2abea204c9e16b9a5a63a6e5d6b620e1c Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 25 Jul 2023 09:48:08 +0800 Subject: [PATCH 1/3] fix: choose heap sort for table merge scan when row size is big and has limit --- source/libs/executor/src/scanoperator.c | 24 ++++++++++++++++-------- source/libs/executor/src/sortoperator.c | 14 +++++++------- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 810f9709f5..677f2db298 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2837,15 +2837,23 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = pInfo->tableEndIndex; - pInfo->sortBufSize = 2048 * pInfo->bufPageSize; - int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, - pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); + bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1; int64_t mergeLimit = -1; - if (pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1) { - mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset; - } - tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit); + if (hasLimit) { + mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset; + } + size_t szRow = blockDataGetRowSize(pInfo->pResBlock); + if (szRow > 1024 && hasLimit) { + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, + NULL, pTaskInfo->id.str, mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024); + } else { + pInfo->sortBufSize = 2048 * pInfo->bufPageSize; + int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, + pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); + + tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit); + } tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL); // one table has one data block diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 9c70a95389..459474d06e 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -54,19 +54,19 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* int32_t numOfCols = 0; pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); pOperator->exprSupp.numOfExprs = numOfCols; - calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys); - pInfo->maxRows = -1; - if (pSortNode->node.pLimit) { - SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit; - if (pLimit->limit > 0) pInfo->maxRows = pLimit->limit; - } - int32_t numOfOutputCols = 0; int32_t code = extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } + + calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys); + pInfo->maxRows = -1; + if (pSortNode->node.pLimit) { + SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit; + if (pLimit->limit > 0) pInfo->maxRows = pLimit->limit + pLimit->offset; + } pOperator->exprSupp.pCtx = createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore); From ca2ad71bb892fd5544373f96ade9ab0b607c958f Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 25 Jul 2023 13:08:37 +0800 Subject: [PATCH 2/3] fix: return null when there are no input stream blocks for priority queue sort --- source/libs/executor/src/tsort.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 738f3f7131..7784bc0c94 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1405,6 +1405,9 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { } static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) { + if (pHandle->pDataBlock == NULL) { // when no input stream datablock + return NULL; + } blockDataCleanup(pHandle->pDataBlock); blockDataEnsureCapacity(pHandle->pDataBlock, 1); // abandon the top tuple if queue size bigger than max size From f4c28d848b911285f4d087ad9f6ac45375a38f28 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 25 Jul 2023 15:23:00 +0800 Subject: [PATCH 3/3] fix: table merge scan use heap sort when there is limit --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 677f2db298..9e5d3a3ab6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2843,7 +2843,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset; } size_t szRow = blockDataGetRowSize(pInfo->pResBlock); - if (szRow > 1024 && hasLimit) { + if (hasLimit) { pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, mergeLimit, szRow+8, tsPQSortMemThreshold * 1024* 1024); } else {