From 1d7515213bfb9a22f832f862c18a1079d3598c7a Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Tue, 11 Jul 2023 11:35:21 +0800 Subject: [PATCH] feat: 1. add limit for diskBasedBuf 2. use referenced tuple before actually pushing into pq 3. use limitInfo instead of maxRows in sort pyhsical node --- include/libs/nodes/plannodes.h | 2 - include/util/theap.h | 10 +- source/libs/executor/inc/tsort.h | 6 +- source/libs/executor/src/sortoperator.c | 8 +- source/libs/executor/src/tsort.c | 124 +++++++++++++++------ source/libs/nodes/src/nodesCloneFuncs.c | 1 - source/libs/nodes/src/nodesCodeFuncs.c | 6 - source/libs/nodes/src/nodesMsgFuncs.c | 8 +- source/libs/planner/src/planLogicCreater.c | 2 - source/libs/planner/src/planOptimizer.c | 19 ++-- source/libs/planner/src/planPhysiCreater.c | 1 - source/libs/planner/src/planSpliter.c | 1 - source/util/src/theap.c | 28 +++-- 13 files changed, 136 insertions(+), 80 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index c1481da80c..453c5d4914 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -246,7 +246,6 @@ typedef struct SSortLogicNode { SLogicNode node; SNodeList* pSortKeys; bool groupSort; - int64_t maxRows; } SSortLogicNode; typedef struct SPartitionLogicNode { @@ -524,7 +523,6 @@ typedef struct SSortPhysiNode { SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode SNodeList* pTargets; - int64_t maxRows; } SSortPhysiNode; typedef SSortPhysiNode SGroupSortPhysiNode; diff --git a/include/util/theap.h b/include/util/theap.h index 8ddeeb28a4..b795db6aea 100644 --- a/include/util/theap.h +++ b/include/util/theap.h @@ -77,7 +77,7 @@ PriorityQueueNode* taosPQTop(PriorityQueue* pq); size_t taosPQSize(PriorityQueue* pq); -void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node); +PriorityQueueNode* taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node); void taosPQPop(PriorityQueue* pq); @@ -89,7 +89,13 @@ void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn); void destroyBoundedQueue(BoundedQueue* q); -void taosBQPush(BoundedQueue* q, PriorityQueueNode* n); +/* + * Push one node into BQ + * @retval NULL if n is upper than top node in q, and n is not freed + * @retval the pushed Node if pushing succeeded + * @note if maxSize exceeded, the original highest node is popped and freed with deleteFn + * */ +PriorityQueueNode* taosBQPush(BoundedQueue* q, PriorityQueueNode* n); PriorityQueueNode* taosBQTop(BoundedQueue* q); diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 7a0d236a37..627aa825c6 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -64,8 +64,8 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* /** * * @param type - * @param maxRows keep maxRows at most - * @param maxTupleLength max len of one tuple, for check if heap sort is applicable + * @param maxRows keep maxRows at most, if 0, pq sort will not be used + * @param maxTupleLength max len of one tuple, for check if pq sort is applicable * @param sortBufSize sort memory buf size, for check if heap sort is applicable * @return */ @@ -73,6 +73,8 @@ SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pag SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength, uint32_t sortBufSize); +void tsortSetForceUsePQSort(SSortHandle* pHandle); + /** * * @param pSortHandle diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 20fb588a02..9c70a95389 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -55,7 +55,11 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); pOperator->exprSupp.numOfExprs = numOfCols; calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys); - pInfo->maxRows = pSortNode->maxRows; + pInfo->maxRows = -1; + if (pSortNode->node.pLimit) { + SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit; + if (pLimit->limit > 0) pInfo->maxRows = pLimit->limit; + } int32_t numOfOutputCols = 0; int32_t code = @@ -718,7 +722,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData resetLimitInfoForNextGroup(&pInfo->limitInfo); } - if (p->info.rows > 0) { + if (p->info.rows > 0 || limitReached) { break; } } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index c0be5f99c1..d26db6536f 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -45,6 +45,7 @@ struct SSortHandle { uint64_t maxRows; uint32_t maxTupleLength; uint32_t sortBufSize; + bool forceUsePQSort; BoundedQueue* pBoundedQueue; uint32_t tmpRowIdx; @@ -73,7 +74,7 @@ static void* createTuple(uint32_t columnNum, uint32_t tupleLen) { uint32_t totalLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum) + tupleLen; return taosMemoryCalloc(1, totalLen); } -static void destoryTuple(void* t) { taosMemoryFree(t); } +static void destoryAllocatedTuple(void* t) { taosMemoryFree(t); } #define tupleOffset(tuple, colIdx) ((uint32_t*)(tuple + sizeof(uint32_t) * colIdx)) #define tupleSetOffset(tuple, colIdx, offset) (*tupleOffset(tuple, colIdx) = offset) @@ -107,12 +108,65 @@ static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) { return t + *tupleOffset(t, colIdx); } -static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param); - SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) { return createOneDataBlock(pSortHandle->pDataBlock, false); } +#define AllocatedTupleType 0 +#define ReferencedTupleType 1 // tuple references to one row in pDataBlock +typedef struct TupleDesc { + uint8_t type; + char* data; // if type is AllocatedTuple, then points to the created tuple, otherwise points to the DataBlock +} TupleDesc; + +typedef struct ReferencedTuple { + TupleDesc desc; + size_t rowIndex; +} ReferencedTuple; + +static TupleDesc* createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx) { + TupleDesc* t = taosMemoryCalloc(1, sizeof(TupleDesc)); + void* pTuple = createTuple(colNum, tupleLen); + if (!pTuple) { + taosMemoryFree(t); + return NULL; + } + size_t colLen = 0; + uint32_t offset = tupleGetDataStartOffset(colNum); + for (size_t colIdx = 0; colIdx < colNum; ++colIdx) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx); + if (colDataIsNull_s(pCol, rowIdx)) { + offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen); + } else { + colLen = colDataGetRowLength(pCol, rowIdx); + offset = + tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, tupleLen); + } + } + t->type = AllocatedTupleType; + t->data = pTuple; + return t; +} + +void* tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum) { + if (pDesc->type == ReferencedTupleType) { + ReferencedTuple* pRefTuple = (ReferencedTuple*)pDesc; + SColumnInfoData* pCol = taosArrayGet(((SSDataBlock*)pDesc->data)->pDataBlock, colIdx); + if (colDataIsNull_s(pCol, pRefTuple->rowIndex)) return NULL; + return colDataGetData(pCol, pRefTuple->rowIndex); + } else { + return tupleGetField(pDesc->data, colIdx, colNum); + } +} + +void destroyTuple(void* t) { + TupleDesc* pDesc = t; + if (pDesc->type == AllocatedTupleType) { + destoryAllocatedTuple(pDesc->data); + taosMemoryFree(pDesc); + } +} + /** * * @param type @@ -130,11 +184,11 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page pSortHandle->loops = 0; pSortHandle->maxTupleLength = maxTupleLength; - if (maxRows < 0) - pSortHandle->sortBufSize = 0; - else + if (maxRows != 0) { pSortHandle->sortBufSize = sortBufSize; - pSortHandle->maxRows = maxRows; + pSortHandle->maxRows = maxRows; + } + pSortHandle->forceUsePQSort = false; if (pBlock != NULL) { pSortHandle->pDataBlock = createOneDataBlock(pBlock, false); @@ -779,7 +833,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { int64_t el = taosGetTimestampUs() - p; pHandle->sortElapsed += el; - + if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); code = doAddToBuf(pHandle->pDataBlock, pHandle); if (code != TSDB_CODE_SUCCESS) { return code; @@ -804,6 +858,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { return code; } + if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); int64_t el = taosGetTimestampUs() - p; pHandle->sortElapsed += el; @@ -936,8 +991,17 @@ static STupleHandle* tsortBufMergeSortNextTuple(SSortHandle* pHandle) { return &pHandle->tupleHandle; } +static bool tsortIsForceUsePQSort(SSortHandle* pHandle) { + return pHandle->forceUsePQSort == true; +} + +void tsortSetForceUsePQSort(SSortHandle* pHandle) { + pHandle->forceUsePQSort = true; +} + static bool tsortIsPQSortApplicable(SSortHandle* pHandle) { if (pHandle->type != SORT_SINGLESOURCE_SORT) return false; + if (tsortIsForceUsePQSort(pHandle)) return true; uint64_t maxRowsFitInMemory = pHandle->sortBufSize / (pHandle->maxTupleLength + sizeof(char*)); return maxRowsFitInMemory > pHandle->maxRows; } @@ -956,16 +1020,17 @@ static bool tsortPQComFnReverse(void*a, void* b, void* param) { return 0; } -static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param) { - char* pLTuple = (char*)pLeft; - char* pRTuple = (char*)pRight; +static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) { + TupleDesc* pLeftDesc = (TupleDesc*)pLeft; + TupleDesc* pRightDesc = (TupleDesc*)pRight; + SSortHandle* pHandle = (SSortHandle*)param; SArray* orderInfo = (SArray*)pHandle->pSortInfo; uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock); for (int32_t i = 0; i < orderInfo->size; ++i) { SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i); - void *lData = tupleGetField(pLTuple, pOrder->slotId, colNum); - void *rData = tupleGetField(pRTuple, pOrder->slotId, colNum); + void *lData = tupleDescGetField(pLeftDesc, pOrder->slotId, colNum); + void *rData = tupleDescGetField(pRightDesc, pOrder->slotId, colNum); if (!lData && !rData) continue; if (!lData) return pOrder->nullFirst ? -1 : 1; if (!rData) return pOrder->nullFirst ? 1 : -1; @@ -984,9 +1049,9 @@ static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* para } static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { - pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destoryTuple, pHandle); + pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destroyTuple, pHandle); if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY; - tsortSetComparFp(pHandle, colDataComparFn); + tsortSetComparFp(pHandle, tupleComparFn); SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); SSortSource* source = *pSource; @@ -1018,24 +1083,17 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { } } } - size_t colLen = 0; + ReferencedTuple refTuple = {.desc.data = (char*)pBlock, .desc.type = ReferencedTupleType, .rowIndex = 0}; for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) { - void* pTuple = createTuple(colNum, tupleLen); - if (pTuple == NULL) return TSDB_CODE_OUT_OF_MEMORY; - - uint32_t offset = tupleGetDataStartOffset(colNum); - for (size_t colIdx = 0; colIdx < colNum; ++colIdx) { - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx); - if (colDataIsNull_s(pCol, rowIdx)) { - offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen); - } else { - colLen = colDataGetRowLength(pCol, rowIdx); - offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, - tupleLen); - } + refTuple.rowIndex = rowIdx; + pqNode.data = &refTuple; + PriorityQueueNode* pPushedNode = taosBQPush(pHandle->pBoundedQueue, &pqNode); + if (!pPushedNode) { + // do nothing if push failed + } else { + pPushedNode->data = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx); + if (pPushedNode->data == NULL) return TSDB_CODE_OUT_OF_MEMORY; } - pqNode.data = pTuple; - taosBQPush(pHandle->pBoundedQueue, &pqNode); } } return TSDB_CODE_SUCCESS; @@ -1044,7 +1102,7 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) { blockDataCleanup(pHandle->pDataBlock); blockDataEnsureCapacity(pHandle->pDataBlock, 1); - // abondan the top tuple if queue size bigger than max size + // abandon the top tuple if queue size bigger than max size if (taosBQSize(pHandle->pBoundedQueue) == taosBQMaxSize(pHandle->pBoundedQueue) + 1) { taosBQPop(pHandle->pBoundedQueue); } @@ -1056,7 +1114,7 @@ static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) { if (taosBQSize(pHandle->pBoundedQueue) > 0) { uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock); PriorityQueueNode* node = taosBQTop(pHandle->pBoundedQueue); - char* pTuple = (char*)node->data; + char* pTuple = ((TupleDesc*)node->data)->data; for (uint32_t i = 0; i < colNum; ++i) { void* pData = tupleGetField(pTuple, i, colNum); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 8305daa45e..6e4dde4ec1 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -502,7 +502,6 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); CLONE_NODE_LIST_FIELD(pSortKeys); COPY_SCALAR_FIELD(groupSort); - COPY_SCALAR_FIELD(maxRows); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 99790e0a93..81116a60b0 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2115,9 +2115,6 @@ static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkSortPhysiPlanTargets, pNode->pTargets); } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkSortPhysiPlanMaxRows, pNode->maxRows); - } return code; } @@ -2135,9 +2132,6 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkSortPhysiPlanTargets, &pNode->pTargets); } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetBigIntValue(pJson, jkSortPhysiPlanMaxRows, &pNode->maxRows); - } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index e79a520615..1ca37defa4 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2594,7 +2594,7 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) { return code; } -enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS, PHY_SORT_CODE_MAX_ROWS }; +enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS }; static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj; @@ -2609,9 +2609,6 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets); } - if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeI64(pEncoder, PHY_SORT_CODE_MAX_ROWS, pNode->maxRows); - } return code; } @@ -2635,9 +2632,6 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SORT_CODE_TARGETS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); break; - case PHY_SORT_CODE_MAX_ROWS: - code = tlvDecodeI64(pTlv, &pNode->maxRows); - break; default: break; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 4a8d100db3..713f12e229 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1027,7 +1027,6 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect return TSDB_CODE_OUT_OF_MEMORY; } - pSort->maxRows = -1; pSort->groupSort = pSelect->groupSort; pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR; pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; @@ -1299,7 +1298,6 @@ static int32_t createSetOpSortLogicNode(SLogicPlanContext* pCxt, SSetOperator* p return TSDB_CODE_OUT_OF_MEMORY; } - pSort->maxRows = -1; TSWAP(pSort->node.pLimit, pSetOperator->pLimit); int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 82d883714d..05f478b116 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2635,11 +2635,13 @@ static bool pushDownLimitOptShouldBeOptimized(SLogicNode* pNode) { } SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0); + // push down to sort node if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) { - SLimitNode* pChildLimit = (SLimitNode*)(pChild->pLimit); // if we have pushed down, we skip it - if ((*(SSortLogicNode*)pChild).maxRows != -1) return false; - } else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) { + if (pChild->pLimit) return false; + } else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) { + // push down to table scan node + // if pNode is sortNode, we skip push down limit info to table scan node return false; } return true; @@ -2654,13 +2656,10 @@ static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0); nodesDestroyNode(pChild->pLimit); if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) { - SLimitNode* pLimitNode = (SLimitNode*)pNode->pLimit; - int64_t maxRows = -1; - if (pLimitNode->limit != -1) { - maxRows = pLimitNode->limit; - if (pLimitNode->offset != -1) maxRows += pLimitNode->offset; - } - ((SSortLogicNode*)pChild)->maxRows = maxRows; + pChild->pLimit = nodesCloneNode(pNode->pLimit); + SLimitNode* pLimit = (SLimitNode*)pChild->pLimit; + pLimit->limit += pLimit->offset; + pLimit->offset = 0; } else { pChild->pLimit = pNode->pLimit; pNode->pLimit = NULL; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index a349e2c0e9..b3d94a5e47 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1374,7 +1374,6 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren if (NULL == pSort) { return TSDB_CODE_OUT_OF_MEMORY; } - pSort->maxRows = pSortLogicNode->maxRows; SNodeList* pPrecalcExprs = NULL; SNodeList* pSortKeys = NULL; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index f352a2bba3..246ee13fb0 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -1018,7 +1018,6 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut splSetParent((SLogicNode*)pPartSort); pPartSort->pSortKeys = pSortKeys; pPartSort->groupSort = pSort->groupSort; - pPartSort->maxRows = pSort->maxRows; code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys); } diff --git a/source/util/src/theap.c b/source/util/src/theap.c index d60606008f..315ddf9367 100644 --- a/source/util/src/theap.c +++ b/source/util/src/theap.c @@ -230,7 +230,7 @@ static void pqSwapPQNode(PriorityQueueNode* a, PriorityQueueNode* b) { size_t taosPQSize(PriorityQueue* pq) { return pqContainerSize(pq); } -static void pqHeapify(PriorityQueue* pq, size_t from, size_t last) { +static PriorityQueueNode* pqHeapify(PriorityQueue* pq, size_t from, size_t last) { size_t largest = from; do { from = largest; @@ -246,6 +246,7 @@ static void pqHeapify(PriorityQueue* pq, size_t from, size_t last) { pqSwapPQNode(pqContainerGetEle(pq, from), pqContainerGetEle(pq, largest)); } } while (largest != from); + return pqContainerGetEle(pq, largest); } static void pqBuildHeap(PriorityQueue* pq) { @@ -257,12 +258,13 @@ static void pqBuildHeap(PriorityQueue* pq) { } } -static void pqReverseHeapify(PriorityQueue* pq, size_t i) { +static PriorityQueueNode* pqReverseHeapify(PriorityQueue* pq, size_t i) { while (i > 0 && !pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) { size_t parentIdx = pqParent(i); pqSwapPQNode(pqContainerGetEle(pq, i), pqContainerGetEle(pq, parentIdx)); i = parentIdx; } + return pqContainerGetEle(pq, i); } static void pqUpdate(PriorityQueue* pq, size_t i) { @@ -290,9 +292,9 @@ PriorityQueueNode* taosPQTop(PriorityQueue* pq) { return pqContainerGetEle(pq, 0); } -void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node) { +PriorityQueueNode* taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node) { taosArrayPush(pq->container, node); - pqReverseHeapify(pq, pqContainerSize(pq) - 1); + return pqReverseHeapify(pq, pqContainerSize(pq) - 1); } void taosPQPop(PriorityQueue* pq) { @@ -324,16 +326,20 @@ void destroyBoundedQueue(BoundedQueue* q) { taosMemoryFree(q); } -void taosBQPush(BoundedQueue* q, PriorityQueueNode* n) { +PriorityQueueNode* taosBQPush(BoundedQueue* q, PriorityQueueNode* n) { if (pqContainerSize(q->queue) == q->maxSize + 1) { PriorityQueueNode* top = pqContainerGetEle(q->queue, 0); - void *p = top->data; - top->data = n->data; - n->data = p; - if (q->queue->deleteFn) q->queue->deleteFn(n->data); - pqHeapify(q->queue, 0, taosBQSize(q)); + if (q->queue->fn(top->data, n->data, q->queue->param)) { + return NULL; + } else { + void* p = top->data; + top->data = n->data; + n->data = p; + if (q->queue->deleteFn) q->queue->deleteFn(n->data); + } + return pqHeapify(q->queue, 0, taosBQSize(q)); } else { - taosPQPush(q->queue, n); + return taosPQPush(q->queue, n); } }