Merge pull request #22028 from wangjiaming0909/opt/3.0/TD-25006
feat: 1. add limit for diskBasedBuf
This commit is contained in:
commit
693e34ccf3
|
@ -246,7 +246,6 @@ typedef struct SSortLogicNode {
|
|||
SLogicNode node;
|
||||
SNodeList* pSortKeys;
|
||||
bool groupSort;
|
||||
int64_t maxRows;
|
||||
} SSortLogicNode;
|
||||
|
||||
typedef struct SPartitionLogicNode {
|
||||
|
@ -524,7 +523,6 @@ typedef struct SSortPhysiNode {
|
|||
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
|
||||
SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
|
||||
SNodeList* pTargets;
|
||||
int64_t maxRows;
|
||||
} SSortPhysiNode;
|
||||
|
||||
typedef SSortPhysiNode SGroupSortPhysiNode;
|
||||
|
|
|
@ -77,7 +77,7 @@ PriorityQueueNode* taosPQTop(PriorityQueue* pq);
|
|||
|
||||
size_t taosPQSize(PriorityQueue* pq);
|
||||
|
||||
void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node);
|
||||
PriorityQueueNode* taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node);
|
||||
|
||||
void taosPQPop(PriorityQueue* pq);
|
||||
|
||||
|
@ -89,7 +89,13 @@ void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn);
|
|||
|
||||
void destroyBoundedQueue(BoundedQueue* q);
|
||||
|
||||
void taosBQPush(BoundedQueue* q, PriorityQueueNode* n);
|
||||
/*
|
||||
* Push one node into BQ
|
||||
* @retval NULL if n is upper than top node in q, and n is not freed
|
||||
* @retval the pushed Node if pushing succeeded
|
||||
* @note if maxSize exceeded, the original highest node is popped and freed with deleteFn
|
||||
* */
|
||||
PriorityQueueNode* taosBQPush(BoundedQueue* q, PriorityQueueNode* n);
|
||||
|
||||
PriorityQueueNode* taosBQTop(BoundedQueue* q);
|
||||
|
||||
|
|
|
@ -64,8 +64,8 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
|
|||
/**
|
||||
*
|
||||
* @param type
|
||||
* @param maxRows keep maxRows at most
|
||||
* @param maxTupleLength max len of one tuple, for check if heap sort is applicable
|
||||
* @param maxRows keep maxRows at most, if 0, pq sort will not be used
|
||||
* @param maxTupleLength max len of one tuple, for check if pq sort is applicable
|
||||
* @param sortBufSize sort memory buf size, for check if heap sort is applicable
|
||||
* @return
|
||||
*/
|
||||
|
@ -73,6 +73,8 @@ SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pag
|
|||
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
|
||||
uint32_t sortBufSize);
|
||||
|
||||
void tsortSetForceUsePQSort(SSortHandle* pHandle);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pSortHandle
|
||||
|
|
|
@ -55,7 +55,11 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
|||
pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);
|
||||
pInfo->maxRows = pSortNode->maxRows;
|
||||
pInfo->maxRows = -1;
|
||||
if (pSortNode->node.pLimit) {
|
||||
SLimitNode* pLimit = (SLimitNode*)pSortNode->node.pLimit;
|
||||
if (pLimit->limit > 0) pInfo->maxRows = pLimit->limit;
|
||||
}
|
||||
|
||||
int32_t numOfOutputCols = 0;
|
||||
int32_t code =
|
||||
|
@ -718,7 +722,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||
}
|
||||
|
||||
if (p->info.rows > 0) {
|
||||
if (p->info.rows > 0 || limitReached) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ struct SSortHandle {
|
|||
uint64_t maxRows;
|
||||
uint32_t maxTupleLength;
|
||||
uint32_t sortBufSize;
|
||||
bool forceUsePQSort;
|
||||
BoundedQueue* pBoundedQueue;
|
||||
uint32_t tmpRowIdx;
|
||||
|
||||
|
@ -73,7 +74,7 @@ static void* createTuple(uint32_t columnNum, uint32_t tupleLen) {
|
|||
uint32_t totalLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum) + tupleLen;
|
||||
return taosMemoryCalloc(1, totalLen);
|
||||
}
|
||||
static void destoryTuple(void* t) { taosMemoryFree(t); }
|
||||
static void destoryAllocatedTuple(void* t) { taosMemoryFree(t); }
|
||||
|
||||
#define tupleOffset(tuple, colIdx) ((uint32_t*)(tuple + sizeof(uint32_t) * colIdx))
|
||||
#define tupleSetOffset(tuple, colIdx, offset) (*tupleOffset(tuple, colIdx) = offset)
|
||||
|
@ -107,12 +108,65 @@ static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
|
|||
return t + *tupleOffset(t, colIdx);
|
||||
}
|
||||
|
||||
static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param);
|
||||
|
||||
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
|
||||
return createOneDataBlock(pSortHandle->pDataBlock, false);
|
||||
}
|
||||
|
||||
#define AllocatedTupleType 0
|
||||
#define ReferencedTupleType 1 // tuple references to one row in pDataBlock
|
||||
typedef struct TupleDesc {
|
||||
uint8_t type;
|
||||
char* data; // if type is AllocatedTuple, then points to the created tuple, otherwise points to the DataBlock
|
||||
} TupleDesc;
|
||||
|
||||
typedef struct ReferencedTuple {
|
||||
TupleDesc desc;
|
||||
size_t rowIndex;
|
||||
} ReferencedTuple;
|
||||
|
||||
static TupleDesc* createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx) {
|
||||
TupleDesc* t = taosMemoryCalloc(1, sizeof(TupleDesc));
|
||||
void* pTuple = createTuple(colNum, tupleLen);
|
||||
if (!pTuple) {
|
||||
taosMemoryFree(t);
|
||||
return NULL;
|
||||
}
|
||||
size_t colLen = 0;
|
||||
uint32_t offset = tupleGetDataStartOffset(colNum);
|
||||
for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
|
||||
if (colDataIsNull_s(pCol, rowIdx)) {
|
||||
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen);
|
||||
} else {
|
||||
colLen = colDataGetRowLength(pCol, rowIdx);
|
||||
offset =
|
||||
tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, tupleLen);
|
||||
}
|
||||
}
|
||||
t->type = AllocatedTupleType;
|
||||
t->data = pTuple;
|
||||
return t;
|
||||
}
|
||||
|
||||
void* tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum) {
|
||||
if (pDesc->type == ReferencedTupleType) {
|
||||
ReferencedTuple* pRefTuple = (ReferencedTuple*)pDesc;
|
||||
SColumnInfoData* pCol = taosArrayGet(((SSDataBlock*)pDesc->data)->pDataBlock, colIdx);
|
||||
if (colDataIsNull_s(pCol, pRefTuple->rowIndex)) return NULL;
|
||||
return colDataGetData(pCol, pRefTuple->rowIndex);
|
||||
} else {
|
||||
return tupleGetField(pDesc->data, colIdx, colNum);
|
||||
}
|
||||
}
|
||||
|
||||
void destroyTuple(void* t) {
|
||||
TupleDesc* pDesc = t;
|
||||
if (pDesc->type == AllocatedTupleType) {
|
||||
destoryAllocatedTuple(pDesc->data);
|
||||
taosMemoryFree(pDesc);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param type
|
||||
|
@ -130,11 +184,11 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
|
|||
pSortHandle->loops = 0;
|
||||
|
||||
pSortHandle->maxTupleLength = maxTupleLength;
|
||||
if (maxRows < 0)
|
||||
pSortHandle->sortBufSize = 0;
|
||||
else
|
||||
if (maxRows != 0) {
|
||||
pSortHandle->sortBufSize = sortBufSize;
|
||||
pSortHandle->maxRows = maxRows;
|
||||
pSortHandle->maxRows = maxRows;
|
||||
}
|
||||
pSortHandle->forceUsePQSort = false;
|
||||
|
||||
if (pBlock != NULL) {
|
||||
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
||||
|
@ -779,7 +833,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
|
||||
int64_t el = taosGetTimestampUs() - p;
|
||||
pHandle->sortElapsed += el;
|
||||
|
||||
if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
|
||||
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -804,6 +858,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
return code;
|
||||
}
|
||||
|
||||
if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
|
||||
int64_t el = taosGetTimestampUs() - p;
|
||||
pHandle->sortElapsed += el;
|
||||
|
||||
|
@ -936,8 +991,17 @@ static STupleHandle* tsortBufMergeSortNextTuple(SSortHandle* pHandle) {
|
|||
return &pHandle->tupleHandle;
|
||||
}
|
||||
|
||||
static bool tsortIsForceUsePQSort(SSortHandle* pHandle) {
|
||||
return pHandle->forceUsePQSort == true;
|
||||
}
|
||||
|
||||
void tsortSetForceUsePQSort(SSortHandle* pHandle) {
|
||||
pHandle->forceUsePQSort = true;
|
||||
}
|
||||
|
||||
static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
|
||||
if (pHandle->type != SORT_SINGLESOURCE_SORT) return false;
|
||||
if (tsortIsForceUsePQSort(pHandle)) return true;
|
||||
uint64_t maxRowsFitInMemory = pHandle->sortBufSize / (pHandle->maxTupleLength + sizeof(char*));
|
||||
return maxRowsFitInMemory > pHandle->maxRows;
|
||||
}
|
||||
|
@ -956,16 +1020,17 @@ static bool tsortPQComFnReverse(void*a, void* b, void* param) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param) {
|
||||
char* pLTuple = (char*)pLeft;
|
||||
char* pRTuple = (char*)pRight;
|
||||
static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) {
|
||||
TupleDesc* pLeftDesc = (TupleDesc*)pLeft;
|
||||
TupleDesc* pRightDesc = (TupleDesc*)pRight;
|
||||
|
||||
SSortHandle* pHandle = (SSortHandle*)param;
|
||||
SArray* orderInfo = (SArray*)pHandle->pSortInfo;
|
||||
uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
|
||||
for (int32_t i = 0; i < orderInfo->size; ++i) {
|
||||
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i);
|
||||
void *lData = tupleGetField(pLTuple, pOrder->slotId, colNum);
|
||||
void *rData = tupleGetField(pRTuple, pOrder->slotId, colNum);
|
||||
void *lData = tupleDescGetField(pLeftDesc, pOrder->slotId, colNum);
|
||||
void *rData = tupleDescGetField(pRightDesc, pOrder->slotId, colNum);
|
||||
if (!lData && !rData) continue;
|
||||
if (!lData) return pOrder->nullFirst ? -1 : 1;
|
||||
if (!rData) return pOrder->nullFirst ? 1 : -1;
|
||||
|
@ -984,9 +1049,9 @@ static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* para
|
|||
}
|
||||
|
||||
static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
|
||||
pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destoryTuple, pHandle);
|
||||
pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destroyTuple, pHandle);
|
||||
if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
tsortSetComparFp(pHandle, colDataComparFn);
|
||||
tsortSetComparFp(pHandle, tupleComparFn);
|
||||
|
||||
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
|
||||
SSortSource* source = *pSource;
|
||||
|
@ -1018,24 +1083,17 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
|
|||
}
|
||||
}
|
||||
}
|
||||
size_t colLen = 0;
|
||||
ReferencedTuple refTuple = {.desc.data = (char*)pBlock, .desc.type = ReferencedTupleType, .rowIndex = 0};
|
||||
for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) {
|
||||
void* pTuple = createTuple(colNum, tupleLen);
|
||||
if (pTuple == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
uint32_t offset = tupleGetDataStartOffset(colNum);
|
||||
for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
|
||||
if (colDataIsNull_s(pCol, rowIdx)) {
|
||||
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen);
|
||||
} else {
|
||||
colLen = colDataGetRowLength(pCol, rowIdx);
|
||||
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false,
|
||||
tupleLen);
|
||||
}
|
||||
refTuple.rowIndex = rowIdx;
|
||||
pqNode.data = &refTuple;
|
||||
PriorityQueueNode* pPushedNode = taosBQPush(pHandle->pBoundedQueue, &pqNode);
|
||||
if (!pPushedNode) {
|
||||
// do nothing if push failed
|
||||
} else {
|
||||
pPushedNode->data = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx);
|
||||
if (pPushedNode->data == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pqNode.data = pTuple;
|
||||
taosBQPush(pHandle->pBoundedQueue, &pqNode);
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1044,7 +1102,7 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
|
|||
static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
|
||||
blockDataCleanup(pHandle->pDataBlock);
|
||||
blockDataEnsureCapacity(pHandle->pDataBlock, 1);
|
||||
// abondan the top tuple if queue size bigger than max size
|
||||
// abandon the top tuple if queue size bigger than max size
|
||||
if (taosBQSize(pHandle->pBoundedQueue) == taosBQMaxSize(pHandle->pBoundedQueue) + 1) {
|
||||
taosBQPop(pHandle->pBoundedQueue);
|
||||
}
|
||||
|
@ -1056,7 +1114,7 @@ static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
|
|||
if (taosBQSize(pHandle->pBoundedQueue) > 0) {
|
||||
uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
|
||||
PriorityQueueNode* node = taosBQTop(pHandle->pBoundedQueue);
|
||||
char* pTuple = (char*)node->data;
|
||||
char* pTuple = ((TupleDesc*)node->data)->data;
|
||||
|
||||
for (uint32_t i = 0; i < colNum; ++i) {
|
||||
void* pData = tupleGetField(pTuple, i, colNum);
|
||||
|
|
|
@ -502,7 +502,6 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) {
|
|||
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
||||
CLONE_NODE_LIST_FIELD(pSortKeys);
|
||||
COPY_SCALAR_FIELD(groupSort);
|
||||
COPY_SCALAR_FIELD(maxRows);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -2115,9 +2115,6 @@ static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkSortPhysiPlanTargets, pNode->pTargets);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkSortPhysiPlanMaxRows, pNode->maxRows);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2135,9 +2132,6 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkSortPhysiPlanTargets, &pNode->pTargets);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBigIntValue(pJson, jkSortPhysiPlanMaxRows, &pNode->maxRows);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -2594,7 +2594,7 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS, PHY_SORT_CODE_MAX_ROWS };
|
||||
enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS };
|
||||
|
||||
static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
|
||||
|
@ -2609,9 +2609,6 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeI64(pEncoder, PHY_SORT_CODE_MAX_ROWS, pNode->maxRows);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2635,9 +2632,6 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_SORT_CODE_TARGETS:
|
||||
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
|
||||
break;
|
||||
case PHY_SORT_CODE_MAX_ROWS:
|
||||
code = tlvDecodeI64(pTlv, &pNode->maxRows);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -1027,7 +1027,6 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pSort->maxRows = -1;
|
||||
pSort->groupSort = pSelect->groupSort;
|
||||
pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
|
||||
pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
|
||||
|
@ -1299,7 +1298,6 @@ static int32_t createSetOpSortLogicNode(SLogicPlanContext* pCxt, SSetOperator* p
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pSort->maxRows = -1;
|
||||
TSWAP(pSort->node.pLimit, pSetOperator->pLimit);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -2635,11 +2635,13 @@ static bool pushDownLimitOptShouldBeOptimized(SLogicNode* pNode) {
|
|||
}
|
||||
|
||||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
|
||||
// push down to sort node
|
||||
if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
|
||||
SLimitNode* pChildLimit = (SLimitNode*)(pChild->pLimit);
|
||||
// if we have pushed down, we skip it
|
||||
if ((*(SSortLogicNode*)pChild).maxRows != -1) return false;
|
||||
} else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) {
|
||||
if (pChild->pLimit) return false;
|
||||
} else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) {
|
||||
// push down to table scan node
|
||||
// if pNode is sortNode, we skip push down limit info to table scan node
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
@ -2654,13 +2656,10 @@ static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
|
|||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
|
||||
nodesDestroyNode(pChild->pLimit);
|
||||
if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
|
||||
SLimitNode* pLimitNode = (SLimitNode*)pNode->pLimit;
|
||||
int64_t maxRows = -1;
|
||||
if (pLimitNode->limit != -1) {
|
||||
maxRows = pLimitNode->limit;
|
||||
if (pLimitNode->offset != -1) maxRows += pLimitNode->offset;
|
||||
}
|
||||
((SSortLogicNode*)pChild)->maxRows = maxRows;
|
||||
pChild->pLimit = nodesCloneNode(pNode->pLimit);
|
||||
SLimitNode* pLimit = (SLimitNode*)pChild->pLimit;
|
||||
pLimit->limit += pLimit->offset;
|
||||
pLimit->offset = 0;
|
||||
} else {
|
||||
pChild->pLimit = pNode->pLimit;
|
||||
pNode->pLimit = NULL;
|
||||
|
|
|
@ -1374,7 +1374,6 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
|
|||
if (NULL == pSort) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pSort->maxRows = pSortLogicNode->maxRows;
|
||||
|
||||
SNodeList* pPrecalcExprs = NULL;
|
||||
SNodeList* pSortKeys = NULL;
|
||||
|
|
|
@ -1018,7 +1018,6 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut
|
|||
splSetParent((SLogicNode*)pPartSort);
|
||||
pPartSort->pSortKeys = pSortKeys;
|
||||
pPartSort->groupSort = pSort->groupSort;
|
||||
pPartSort->maxRows = pSort->maxRows;
|
||||
code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
|
||||
}
|
||||
|
||||
|
|
|
@ -230,7 +230,7 @@ static void pqSwapPQNode(PriorityQueueNode* a, PriorityQueueNode* b) {
|
|||
|
||||
size_t taosPQSize(PriorityQueue* pq) { return pqContainerSize(pq); }
|
||||
|
||||
static void pqHeapify(PriorityQueue* pq, size_t from, size_t last) {
|
||||
static PriorityQueueNode* pqHeapify(PriorityQueue* pq, size_t from, size_t last) {
|
||||
size_t largest = from;
|
||||
do {
|
||||
from = largest;
|
||||
|
@ -246,6 +246,7 @@ static void pqHeapify(PriorityQueue* pq, size_t from, size_t last) {
|
|||
pqSwapPQNode(pqContainerGetEle(pq, from), pqContainerGetEle(pq, largest));
|
||||
}
|
||||
} while (largest != from);
|
||||
return pqContainerGetEle(pq, largest);
|
||||
}
|
||||
|
||||
static void pqBuildHeap(PriorityQueue* pq) {
|
||||
|
@ -257,12 +258,13 @@ static void pqBuildHeap(PriorityQueue* pq) {
|
|||
}
|
||||
}
|
||||
|
||||
static void pqReverseHeapify(PriorityQueue* pq, size_t i) {
|
||||
static PriorityQueueNode* pqReverseHeapify(PriorityQueue* pq, size_t i) {
|
||||
while (i > 0 && !pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) {
|
||||
size_t parentIdx = pqParent(i);
|
||||
pqSwapPQNode(pqContainerGetEle(pq, i), pqContainerGetEle(pq, parentIdx));
|
||||
i = parentIdx;
|
||||
}
|
||||
return pqContainerGetEle(pq, i);
|
||||
}
|
||||
|
||||
static void pqUpdate(PriorityQueue* pq, size_t i) {
|
||||
|
@ -290,9 +292,9 @@ PriorityQueueNode* taosPQTop(PriorityQueue* pq) {
|
|||
return pqContainerGetEle(pq, 0);
|
||||
}
|
||||
|
||||
void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node) {
|
||||
PriorityQueueNode* taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node) {
|
||||
taosArrayPush(pq->container, node);
|
||||
pqReverseHeapify(pq, pqContainerSize(pq) - 1);
|
||||
return pqReverseHeapify(pq, pqContainerSize(pq) - 1);
|
||||
}
|
||||
|
||||
void taosPQPop(PriorityQueue* pq) {
|
||||
|
@ -324,16 +326,20 @@ void destroyBoundedQueue(BoundedQueue* q) {
|
|||
taosMemoryFree(q);
|
||||
}
|
||||
|
||||
void taosBQPush(BoundedQueue* q, PriorityQueueNode* n) {
|
||||
PriorityQueueNode* taosBQPush(BoundedQueue* q, PriorityQueueNode* n) {
|
||||
if (pqContainerSize(q->queue) == q->maxSize + 1) {
|
||||
PriorityQueueNode* top = pqContainerGetEle(q->queue, 0);
|
||||
void *p = top->data;
|
||||
top->data = n->data;
|
||||
n->data = p;
|
||||
if (q->queue->deleteFn) q->queue->deleteFn(n->data);
|
||||
pqHeapify(q->queue, 0, taosBQSize(q));
|
||||
if (q->queue->fn(top->data, n->data, q->queue->param)) {
|
||||
return NULL;
|
||||
} else {
|
||||
void* p = top->data;
|
||||
top->data = n->data;
|
||||
n->data = p;
|
||||
if (q->queue->deleteFn) q->queue->deleteFn(n->data);
|
||||
}
|
||||
return pqHeapify(q->queue, 0, taosBQSize(q));
|
||||
} else {
|
||||
taosPQPush(q->queue, n);
|
||||
return taosPQPush(q->queue, n);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue