fix(query): add limit/offset for order by operator.
This commit is contained in:
parent
3fba8182a5
commit
facf3c8648
|
@ -744,8 +744,8 @@ typedef struct SSortOperatorInfo {
|
|||
|
||||
int64_t startTs; // sort start time
|
||||
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
|
||||
|
||||
SNode* pCondition;
|
||||
SLimitInfo limitInfo;
|
||||
SNode* pCondition;
|
||||
} SSortOperatorInfo;
|
||||
|
||||
typedef struct STagFilterOperatorInfo {
|
||||
|
@ -785,7 +785,7 @@ int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
|
|||
void cleanupExprSupp(SExprSupp* pSup);
|
||||
int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
||||
const char* pkey);
|
||||
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
|
||||
void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows);
|
||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf);
|
||||
int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
|
||||
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
|
||||
|
|
|
@ -50,7 +50,7 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
|
|||
|
||||
STableListInfo* pTableList = &pTaskInfo->tableqinfoList;
|
||||
|
||||
initResultSizeInfo(pOperator, 1024);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
|
||||
|
||||
|
|
|
@ -3601,13 +3601,13 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
|
||||
void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows) {
|
||||
ASSERT(numOfRows != 0);
|
||||
pOperator->resultInfo.capacity = numOfRows;
|
||||
pOperator->resultInfo.threshold = numOfRows * 0.75;
|
||||
pResultInfo->capacity = numOfRows;
|
||||
pResultInfo->threshold = numOfRows * 0.75;
|
||||
|
||||
if (pOperator->resultInfo.threshold == 0) {
|
||||
pOperator->resultInfo.threshold = numOfRows;
|
||||
if (pResultInfo->threshold == 0) {
|
||||
pResultInfo->threshold = numOfRows;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3670,7 +3670,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
int32_t numOfRows = 1024;
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
|
||||
initResultSizeInfo(pOperator, numOfRows);
|
||||
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
|
||||
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -3825,7 +3825,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
|||
if (numOfRows * pResBlock->info.rowSize > TWOMB) {
|
||||
numOfRows = TWOMB / pResBlock->info.rowSize;
|
||||
}
|
||||
initResultSizeInfo(pOperator, numOfRows);
|
||||
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
|
||||
|
||||
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
@ -4003,7 +4003,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
|||
numOfRows = TWOMB / pResBlock->info.rowSize;
|
||||
}
|
||||
|
||||
initResultSizeInfo(pOperator, numOfRows);
|
||||
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
|
||||
|
||||
initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
@ -4080,7 +4080,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
int32_t type = convertFillType(pPhyFillNode->mode);
|
||||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;
|
||||
|
||||
int32_t numOfOutputCols = 0;
|
||||
|
|
|
@ -406,7 +406,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
goto _error;
|
||||
}
|
||||
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str);
|
||||
initBasicInfo(&pInfo->binfo, pResultBlock);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||
|
|
|
@ -41,7 +41,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
|
|||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
|
||||
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
pInfo->pRes = pResBlock;
|
||||
pOperator->name = "MergeJoinOperator";
|
||||
|
|
|
@ -2299,7 +2299,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
|||
pInfo->pCondition = pScanNode->node.pConditions;
|
||||
pInfo->scanCols = colList;
|
||||
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
tNameAssign(&pInfo->name, &pScanNode->tableName);
|
||||
const char* name = tNameGetTableName(&pInfo->name);
|
||||
|
@ -2529,7 +2529,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
|||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
pOperator->fpSet =
|
||||
|
@ -3073,7 +3073,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
pOperator->info = pInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
initResultSizeInfo(pOperator, 1024);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL,
|
||||
|
|
|
@ -26,7 +26,7 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
|
|||
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) {
|
||||
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) {
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -41,13 +41,15 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
|||
extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
|
||||
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
initResultSizeInfo(pOperator, 1024);
|
||||
|
||||
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
|
||||
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
|
||||
pInfo->pCondition = pSortNode->node.pConditions;
|
||||
pInfo->pColMatchInfo = pColMatchColInfo;
|
||||
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
|
||||
|
||||
pOperator->name = "SortOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
||||
pOperator->blocking = true;
|
||||
|
@ -208,26 +210,44 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* pBlock = NULL;
|
||||
while (1) {
|
||||
pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
|
||||
pInfo->pColMatchInfo, pInfo);
|
||||
if (pBlock != NULL) {
|
||||
doFilter(pInfo->pCondition, pBlock);
|
||||
}
|
||||
|
||||
pInfo->pColMatchInfo, pInfo);
|
||||
if (pBlock == NULL) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
break;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (blockDataGetNumOfRows(pBlock) > 0) {
|
||||
doFilter(pInfo->pCondition, pBlock);
|
||||
if (blockDataGetNumOfRows(pBlock) == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// todo add the limit/offset info
|
||||
if (pInfo->limitInfo.remainOffset > 0) {
|
||||
if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) {
|
||||
pInfo->limitInfo.remainOffset -= pBlock->info.rows;
|
||||
continue;
|
||||
}
|
||||
|
||||
blockDataTrimFirstNRows(pBlock, pInfo->limitInfo.remainOffset);
|
||||
pInfo->limitInfo.remainOffset = 0;
|
||||
}
|
||||
|
||||
if (pInfo->limitInfo.limit.limit > 0 &&
|
||||
pInfo->limitInfo.limit.limit <= pInfo->limitInfo.numOfOutputRows + blockDataGetNumOfRows(pBlock)) {
|
||||
int32_t remain = pInfo->limitInfo.limit.limit - pInfo->limitInfo.numOfOutputRows;
|
||||
blockDataKeepFirstNRows(pBlock, remain);
|
||||
}
|
||||
|
||||
size_t numOfRows = blockDataGetNumOfRows(pBlock);
|
||||
pInfo->limitInfo.numOfOutputRows += numOfRows;
|
||||
pOperator->resultInfo.totalRows += numOfRows;
|
||||
|
||||
if (numOfRows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pBlock != NULL) {
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
return blockDataGetNumOfRows(pBlock) > 0? pBlock:NULL;
|
||||
}
|
||||
|
||||
void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
|
@ -479,7 +499,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
|
|||
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
initResultSizeInfo(pOperator, 1024);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
|
||||
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
|
||||
;
|
||||
|
@ -711,7 +731,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
|||
extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
||||
SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
|
||||
initResultSizeInfo(pOperator, 1024);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
|
||||
pInfo->groupSort = pMergePhyNode->groupSort;
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
|
|
@ -1683,7 +1683,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
@ -1764,7 +1764,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
|
|||
int32_t numOfRows = 4096;
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
|
||||
initResultSizeInfo(pOperator, numOfRows);
|
||||
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
|
||||
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
|
||||
|
@ -2224,7 +2224,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
|
||||
pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries);
|
||||
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues);
|
||||
pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
|
@ -2272,7 +2272,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
|||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
|
@ -2320,7 +2320,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
|||
}
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2896,7 +2896,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY);
|
||||
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
if (pIntervalPhyNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
|
||||
|
@ -3072,7 +3072,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
goto _error;
|
||||
}
|
||||
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
if (pSessionNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
|
||||
|
@ -4336,7 +4336,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols);
|
||||
|
||||
pInfo->stateCol = extractColumnFromColumnNode(pColNode);
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
if (pStateNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar);
|
||||
|
@ -4586,7 +4586,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
iaInfo->primaryTsIndex = primaryTsSlotId;
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
int32_t code =
|
||||
initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
|
@ -4892,7 +4892,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
|
|||
SExprSupp* pExprSupp = &pOperator->exprSupp;
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
int32_t code = initAggInfo(pExprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
initBasicInfo(&iaInfo->binfo, pResBlock);
|
||||
|
|
Loading…
Reference in New Issue