From fba43e17480b3423bae78c5e9ca638449bf58efb Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Thu, 3 Aug 2023 18:05:52 +0800 Subject: [PATCH] feature: optimize interval with limit --- source/libs/executor/inc/executorInt.h | 9 + source/libs/executor/src/executil.c | 3 +- source/libs/executor/src/operator.c | 1 - source/libs/executor/src/scanoperator.c | 47 ++-- source/libs/executor/src/timewindowoperator.c | 92 +++++- source/libs/planner/src/planLogicCreater.c | 1 - source/libs/planner/src/planOptimizer.c | 126 +++++++-- source/libs/planner/src/planSpliter.c | 12 + tests/parallel_test/cases.task | 1 + .../tsim/query/r/explain_tsorder.result | 212 +++++++------- .../system-test/2-query/interval_limit_opt.py | 266 ++++++++++++++++++ 11 files changed, 607 insertions(+), 163 deletions(-) create mode 100644 tests/system-test/2-query/interval_limit_opt.py diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 5baf0978cd..1bff9fce9e 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -25,6 +25,7 @@ extern "C" { #include "tsort.h" #include "ttszip.h" #include "tvariant.h" +#include "theap.h" #include "dataSinkMgt.h" #include "executil.h" @@ -418,6 +419,14 @@ typedef struct SIntervalAggOperatorInfo { EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] STimeWindowAggSupp twAggSup; SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation. + // for limit optimization + bool limited; + int64_t limit; + bool slimited; + int64_t slimit; + uint64_t curGroupId; // initialize to UINT64_MAX + uint64_t handledGroupNum; + BoundedQueue* pBQ; } SIntervalAggOperatorInfo; typedef struct SMergeAlignedIntervalAggOperatorInfo { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index e1bf4e7cb0..aa0c7945b0 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2118,8 +2118,9 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* if (code != TSDB_CODE_SUCCESS) { return code; } + if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList); - if (groupSort) { + if (groupSort || pScanNode->groupOrderScan) { code = sortTableGroup(pTableListInfo); } } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 2db5ea2f1e..8ddcc8fd15 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -275,7 +275,6 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR SNode* pTagIndexCond, const char* pUser, const char* dbname) { int32_t type = nodeType(pPhyNode); const char* idstr = GET_TASKID(pTaskInfo); - if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { SOperatorInfo* pOperator = NULL; if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6c8d9ed59f..b15cf56c3d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -848,30 +848,29 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return result; } - if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { - setOperatorCompleted(pOperator); - return NULL; + while (1) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { + setOperatorCompleted(pOperator); + return NULL; + } + + // reset value for the next group data output + pOperator->status = OP_OPENED; + resetLimitInfoForNextGroup(&pInfo->base.limitInfo); + + int32_t num = 0; + STableKeyInfo* pList = NULL; + tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); + + pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num); + pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); + pInfo->scanTimes = 0; + + result = doGroupedTableScan(pOperator); + if (result != NULL) { + return result; + } } - - // reset value for the next group data output - pOperator->status = OP_OPENED; - resetLimitInfoForNextGroup(&pInfo->base.limitInfo); - - int32_t num = 0; - STableKeyInfo* pList = NULL; - tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); - - pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num); - pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); - pInfo->scanTimes = 0; - - result = doGroupedTableScan(pOperator); - if (result != NULL) { - return result; - } - - setOperatorCompleted(pOperator); - return NULL; } } @@ -3551,4 +3550,4 @@ static void destoryTableCountScanOperator(void* param) { taosArrayDestroy(pTableCountScanInfo->stbUidList); taosMemoryFreeClear(param); -} \ No newline at end of file +} diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0a46def23d..4f8a3acd15 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -876,7 +876,67 @@ bool needDeleteWindowBuf(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) { return pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark; } -static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, +static bool tsKeyCompFn(void* l, void* r, void* param) { + TSKEY* lTS = (TSKEY*)l; + TSKEY* rTS = (TSKEY*)r; + SIntervalAggOperatorInfo* pInfo = param; + return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS; +} + +static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) { + char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0}; + SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId); + return tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))) != NULL; +} + +/** + * @brief check if cur window should be filtered out by limit info + * @retval true if should be filtered out + * @retval false if not filtering out + * @note If no limit info, we skip filtering. + * If input/output ts order mismatch, we skip filtering too. + * eg. input ts order: desc, and output ts order: asc, limit: 10 + * IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan + * every tuple in every block. + * And the boundedQueue keeps refreshing all records with smaller ts key. + */ +static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId) { + if (!pOperatorInfo->limited // if no limit info, no filter will be applied + || pOperatorInfo->binfo.inputTsOrder != + pOperatorInfo->binfo.outputTsOrder // if input/output ts order mismatch, no filter + ) { + return false; + } + if (pOperatorInfo->limit == 0) return true; + + if (pOperatorInfo->pBQ == NULL) { + pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosMemoryFree, pOperatorInfo); + } + + bool shouldFilter = false; + // if BQ has been full, compare it with top of BQ + if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) { + PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ); + shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo); + } + if (shouldFilter) { + return true; + } else if (isCalculatedWin(pOperatorInfo, win, groupId)) { + return false; + } + + // cur win not been filtered out and not been pushed into BQ yet, push it into BQ + PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))}; + *((TSKEY*)node.data) = win->skey; + + if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) { + taosMemoryFree(node.data); + return true; + } + return false; +} + +static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, int32_t scanFlag) { SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; @@ -891,8 +951,21 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); SResultRow* pResult = NULL; + if (tableGroupId != pInfo->curGroupId) { + pInfo->handledGroupNum += 1; + if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) { + return true; + } else { + pInfo->curGroupId = tableGroupId; + destroyBoundedQueue(pInfo->pBQ); + pInfo->pBQ = NULL; + } + } + STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder); + if (filterWindowWithLimit(pInfo, &win, tableGroupId)) return false; + int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { @@ -929,7 +1002,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul while (1) { int32_t prevEndPos = forwardRows - 1 + startPos; startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->binfo.inputTsOrder); - if (startPos < 0) { + if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId)) { break; } // null data, failed to allocate more memory buffer @@ -963,6 +1036,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if (pInfo->timeWindowInterpo) { saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols); } + return false; } void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) { @@ -1043,7 +1117,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true); - hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag); + if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break; } initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder); @@ -1495,6 +1569,7 @@ void destroyIntervalOperatorInfo(void* param) { cleanupGroupResInfo(&pInfo->groupResInfo); colDataDestroy(&pInfo->twAggSup.timeWindowData); + destroyBoundedQueue(pInfo->pBQ); taosMemoryFreeClear(param); } @@ -1658,6 +1733,17 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh pInfo->interval = interval; pInfo->twAggSup = as; pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock; + if (pPhyNode->window.node.pLimit) { + SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit; + pInfo->limited = true; + pInfo->limit = pLimit->limit + pLimit->offset; + } + if (pPhyNode->window.node.pSlimit) { + SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit; + pInfo->slimited = true; + pInfo->slimit = pLimit->limit + pLimit->offset; + pInfo->curGroupId = UINT64_MAX; + } if (pPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 713f12e229..854f3fc4c6 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -847,7 +847,6 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva : (pSelect->hasTimeLineFunc ? getRequireDataOrder(true, pSelect) : DATA_ORDER_LEVEL_IN_BLOCK); pWindow->node.resultDataOrder = pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_GLOBAL : getRequireDataOrder(true, pSelect); - pWindow->pTspk = nodesCloneNode(pInterval->pCol); if (NULL == pWindow->pTspk) { nodesDestroyNode((SNode*)pWindow); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 32721d8060..b2f71adbd7 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -368,7 +368,7 @@ static void scanPathOptSetGroupOrderScan(SScanLogicNode* pScan) { if (pScan->node.pParent && nodeType(pScan->node.pParent) == QUERY_NODE_LOGIC_PLAN_AGG) { SAggLogicNode* pAgg = (SAggLogicNode*)pScan->node.pParent; - bool withSlimit = pAgg->node.pSlimit != NULL || (pAgg->node.pParent && pAgg->node.pParent->pSlimit); + bool withSlimit = pAgg->node.pSlimit != NULL || (pAgg->node.pParent && pAgg->node.pParent->pSlimit); if (withSlimit && isPartTableAgg(pAgg)) { pScan->groupOrderScan = pAgg->node.forceCreateNonBlockingOptr = true; } @@ -1546,11 +1546,33 @@ static bool planOptNodeListHasTbname(SNodeList* pKeys) { } static bool partTagsIsOptimizableNode(SLogicNode* pNode) { - return ((QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) || - (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && NULL != ((SAggLogicNode*)pNode)->pGroupKeys && - NULL != ((SAggLogicNode*)pNode)->pAggFuncs)) && - 1 == LIST_LENGTH(pNode->pChildren) && - QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0))); + bool ret = 1 == LIST_LENGTH(pNode->pChildren) && + QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0)); + if (!ret) return ret; + switch (nodeType(pNode)) { + case QUERY_NODE_LOGIC_PLAN_PARTITION: { + if (pNode->pParent && nodeType(pNode->pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) { + SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode->pParent; + if (pWindow->winType == WINDOW_TYPE_INTERVAL) { + // if interval has slimit, we push down partition node to scan, and scan will set groupOrderScan to true + // we want to skip groups of blocks after slimit satisfied + // if interval only has limit, we do not push down partition node to scan + // we want to get grouped output from partition node and make use of limit + // if no slimit and no limit, we push down partition node and groupOrderScan is false, cause we do not need + // group ordered output + if (!pWindow->node.pSlimit && pWindow->node.pLimit) ret = false; + } + } + } break; + case QUERY_NODE_LOGIC_PLAN_AGG: { + SAggLogicNode* pAgg = (SAggLogicNode*)pNode; + ret = pAgg->pGroupKeys && pAgg->pAggFuncs; + } break; + default: + ret = false; + break; + } + return ret; } static SNodeList* partTagsGetPartKeys(SLogicNode* pNode) { @@ -1691,6 +1713,8 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub scanPathOptSetGroupOrderScan(pScan); pParent->hasGroupKeyOptimized = true; } + if (pNode->pParent->pSlimit) + pScan->groupOrderScan = true; NODES_CLEAR_LIST(pNode->pChildren); nodesDestroyNode((SNode*)pNode); @@ -2644,23 +2668,79 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp } static bool pushDownLimitOptShouldBeOptimized(SLogicNode* pNode) { - if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren)) { + if ((NULL == pNode->pLimit && pNode->pSlimit == NULL) || 1 != LIST_LENGTH(pNode->pChildren)) { return false; } SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0); - // push down to sort node - if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) { - // if we have pushed down, we skip it - if (pChild->pLimit) return false; - } else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) { - // push down to table scan node - // if pNode is sortNode, we skip push down limit info to table scan node - return false; - } + if (pChild->pLimit || pChild->pSlimit) return false; return true; } +static void swapLimit(SLogicNode* pParent, SLogicNode* pChild) { + pChild->pLimit = pParent->pLimit; + pParent->pLimit = NULL; +} + +static void cloneLimit(SLogicNode* pParent, SLogicNode* pChild) { + SLimitNode* pLimit = NULL; + if (pParent->pLimit) { + pChild->pLimit = nodesCloneNode(pParent->pLimit); + pLimit = (SLimitNode*)pChild->pLimit; + pLimit->limit += pLimit->offset; + pLimit->offset = 0; + } + + if (pParent->pSlimit) { + pChild->pSlimit = nodesCloneNode(pParent->pSlimit); + pLimit = (SLimitNode*)pChild->pSlimit; + pLimit->limit += pLimit->offset; + pLimit->offset = 0; + } +} + +static bool pushDownLimitHow(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPushTo); +static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPushTo) { + switch (nodeType(pNodeLimitPushTo)) { + case QUERY_NODE_LOGIC_PLAN_WINDOW: { + SWindowLogicNode* pWindow = (SWindowLogicNode*)pNodeLimitPushTo; + if (pWindow->winType != WINDOW_TYPE_INTERVAL) break; + cloneLimit(pNodeWithLimit, pNodeLimitPushTo); + return true; + } + case QUERY_NODE_LOGIC_PLAN_FILL: + case QUERY_NODE_LOGIC_PLAN_SORT: { + cloneLimit(pNodeWithLimit, pNodeLimitPushTo); + SNode* pChild = NULL; + FOREACH(pChild, pNodeLimitPushTo->pChildren) { pushDownLimitHow(pNodeLimitPushTo, (SLogicNode*)pChild); } + return true; + } + case QUERY_NODE_LOGIC_PLAN_SCAN: + if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT && pNodeWithLimit->pLimit) { + swapLimit(pNodeWithLimit, pNodeLimitPushTo); + return true; + } + default: + break; + } + return false; +} + +static bool pushDownLimitHow(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPushTo) { + switch (nodeType(pNodeWithLimit)) { + case QUERY_NODE_LOGIC_PLAN_PROJECT: + case QUERY_NODE_LOGIC_PLAN_FILL: + return pushDownLimitTo(pNodeWithLimit, pNodeLimitPushTo); + case QUERY_NODE_LOGIC_PLAN_SORT: { + SSortLogicNode* pSort = (SSortLogicNode*)pNodeWithLimit; + if (sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys)) return pushDownLimitTo(pNodeWithLimit, pNodeLimitPushTo); + } + default: + break; + } + return false; +} + static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, pushDownLimitOptShouldBeOptimized); if (NULL == pNode) { @@ -2669,17 +2749,9 @@ static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0); nodesDestroyNode(pChild->pLimit); - if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) { - pChild->pLimit = nodesCloneNode(pNode->pLimit); - SLimitNode* pLimit = (SLimitNode*)pChild->pLimit; - pLimit->limit += pLimit->offset; - pLimit->offset = 0; - } else { - pChild->pLimit = pNode->pLimit; - pNode->pLimit = NULL; + if (pushDownLimitHow(pNode, pChild)) { + pCxt->optimized = true; } - pCxt->optimized = true; - return TSDB_CODE_SUCCESS; } @@ -2980,6 +3052,7 @@ static const SOptimizeRule optimizeRuleSet[] = { {.pName = "sortNonPriKeyOptimize", .optimizeFunc = sortNonPriKeyOptimize}, {.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize}, {.pName = "SmaIndex", .optimizeFunc = smaIndexOptimize}, + {.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize}, {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize}, {.pName = "MergeProjects", .optimizeFunc = mergeProjectsOptimize}, {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}, @@ -2988,7 +3061,6 @@ static const SOptimizeRule optimizeRuleSet[] = { {.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize}, {.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize}, {.pName = "TagScan", .optimizeFunc = tagScanOptimize}, - {.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize}, {.pName = "TableCountScan", .optimizeFunc = tableCountScanOptimize}, }; // clang-format on diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 84a486649e..3f6c73b4e5 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -498,6 +498,18 @@ static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* p } break; } + case QUERY_NODE_LOGIC_PLAN_WINDOW: { + SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode; + if (pMerge->node.pLimit) { + nodesDestroyNode(pMerge->node.pLimit); + pMerge->node.pLimit = NULL; + } + if (pMerge->node.pSlimit) { + nodesDestroyNode(pMerge->node.pSlimit); + pMerge->node.pSlimit = NULL; + } + break; + } default: break; } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 9ec12197ec..dc72c67402 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -25,6 +25,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py diff --git a/tests/script/tsim/query/r/explain_tsorder.result b/tests/script/tsim/query/r/explain_tsorder.result index b69a77ada5..95f8c38fbf 100644 --- a/tests/script/tsim/query/r/explain_tsorder.result +++ b/tests/script/tsim/query/r/explain_tsorder.result @@ -1603,58 +1603,58 @@ QUERY_PLAN: Time Range: [-9223372036854775808, taos> select _wstart, last(ts), avg(c2) from meters interval(10s) order by _wstart desc; _wstart | last(ts) | avg(c2) | ================================================================================ - 2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000 | - 2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000 | - 2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000 | - 2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000 | - 2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000 | - 2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000 | - 2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000 | - 2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000 | - 2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000 | - 2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000 | + 2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000000000 | + 2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000000000 | + 2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000000000 | + 2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000000000 | + 2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000000000 | + 2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000000000 | + 2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000000000 | + 2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000000000 | + 2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000000000 | + 2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000000000 | taos> select _wstart, last(ts), avg(c2) from meters interval(10s) order by _wstart asc; _wstart | last(ts) | avg(c2) | ================================================================================ - 2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000 | - 2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000 | - 2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000 | - 2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000 | - 2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000 | - 2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000 | - 2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000 | - 2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000 | - 2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000 | - 2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000 | + 2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000000000 | + 2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000000000 | + 2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000000000 | + 2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000000000 | + 2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000000000 | + 2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000000000 | + 2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000000000 | + 2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000000000 | + 2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000000000 | + 2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000000000 | taos> select _wstart, first(ts), avg(c2) from meters interval(10s) order by _wstart asc; _wstart | first(ts) | avg(c2) | ================================================================================ - 2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000 | - 2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000 | - 2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000 | - 2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000 | - 2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000 | - 2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000 | - 2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000 | - 2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000 | - 2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000 | - 2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000 | + 2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000000000 | + 2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000000000 | + 2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000000000 | + 2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000000000 | + 2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000000000 | + 2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000000000 | + 2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000000000 | + 2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000000000 | + 2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000000000 | + 2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000000000 | taos> select _wstart, first(ts), avg(c2) from meters interval(10s) order by _wstart desc; _wstart | first(ts) | avg(c2) | ================================================================================ - 2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000 | - 2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000 | - 2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000 | - 2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000 | - 2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000 | - 2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000 | - 2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000 | - 2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000 | - 2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000 | - 2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000 | + 2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000000000 | + 2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000000000 | + 2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000000000 | + 2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000000000 | + 2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000000000 | + 2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000000000 | + 2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000000000 | + 2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000000000 | + 2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000000000 | + 2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000000000 | taos> select last(a) as d from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s)) order by d; d | @@ -1792,35 +1792,35 @@ taos> select last(b) as d from (select last(ts) as b, avg(c2) as c from meters i taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by d desc; _wstart | d | avg(c) | ================================================================================ - 2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000 | - 2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000 | - 2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000 | - 2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000 | - 2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000 | - 2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.600000000 | - 2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 | - 2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 | - 2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 | - 2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 | - 2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 | - 2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000 | - 2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000 | - 2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000 | - 2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000 | - 2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000 | - 2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.400000000 | - 2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000 | - 2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.800000000 | - 2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000 | - 2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000 | - 2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.800000000 | - 2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000 | - 2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.600000000 | - 2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000 | - 2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000 | - 2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 | - 2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 | - 2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 | + 2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000000000 | + 2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000000000 | + 2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000000000 | + 2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000000000 | + 2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000000000 | + 2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.599999999999994 | + 2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.199999999999989 | + 2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000000011 | + 2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000000006 | + 2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000000000 | + 2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000000000 | + 2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000000000 | + 2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000000000 | + 2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000000000 | + 2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000000000 | + 2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000000003 | + 2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.399999999999999 | + 2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000000001 | + 2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.799999999999997 | + 2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000000000 | + 2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000000006 | + 2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.799999999999997 | + 2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000000003 | + 2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.599999999999994 | + 2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000000000 | + 2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000000000 | + 2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000000000 | + 2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000000000 | + 2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000000000 | taos> explain verbose true select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by d desc\G; *************************** 1.row *************************** @@ -2673,51 +2673,51 @@ taos> select ts, c2 from d1 order by ts asc, c2 desc limit 5,5; taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc; _wstart | d | avg(c) | ================================================================================ - 2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 | - 2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 | - 2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 | - 2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 | - 2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 | - 2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 | - 2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 | - 2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 | - 2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000 | - 2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000 | - 2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.600000000 | - 2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000 | - 2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000 | - 2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.600000000 | - 2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000 | - 2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000 | - 2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000 | - 2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000 | - 2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.800000000 | - 2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000 | - 2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000 | - 2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000 | - 2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.800000000 | - 2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000 | - 2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.400000000 | - 2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000 | - 2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000 | - 2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000 | - 2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000 | + 2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000000000 | + 2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000000000 | + 2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000000006 | + 2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000000000 | + 2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000000000 | + 2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000000011 | + 2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000000000 | + 2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.199999999999989 | + 2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000000000 | + 2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000000000 | + 2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.599999999999994 | + 2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000000000 | + 2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000000000 | + 2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.599999999999994 | + 2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000000000 | + 2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000000003 | + 2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000000000 | + 2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000000000 | + 2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.799999999999997 | + 2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000000006 | + 2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000000000 | + 2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000000000 | + 2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.799999999999997 | + 2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000000001 | + 2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.399999999999999 | + 2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000000003 | + 2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000000000 | + 2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000000000 | + 2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000000000 | taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2; _wstart | d | avg(c) | ================================================================================ - 2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 | - 2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 | + 2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000000000 | + 2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000000000 | taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2,6; _wstart | d | avg(c) | ================================================================================ - 2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 | - 2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 | - 2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 | - 2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 | - 2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 | - 2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 | + 2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000000006 | + 2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000000000 | + 2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000000000 | + 2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000000011 | + 2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000000000 | + 2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.199999999999989 | taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 10; last(ts) | d | diff --git a/tests/system-test/2-query/interval_limit_opt.py b/tests/system-test/2-query/interval_limit_opt.py new file mode 100644 index 0000000000..fef6e9facd --- /dev/null +++ b/tests/system-test/2-query/interval_limit_opt.py @@ -0,0 +1,266 @@ +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +# from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 4 + self.ctbNum = 10 + self.rowsPerTbl = 10000 + self.duraion = '1h' + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'): + if dropFlag == 1: + tsql.execute("drop database if exists %s"%(dbName)) + + tsql.execute("create database if not exists %s vgroups %d replica %d duration %s"%(dbName, vgroups, replica, duration)) + tdLog.debug("complete to create database %s"%(dbName)) + return + + def create_stable(self,tsql, paraDict): + colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"]) + tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"]) + sqlString = f"create table if not exists %s.%s (%s) tags (%s)"%(paraDict["dbName"], paraDict["stbName"], colString, tagString) + tdLog.debug("%s"%(sqlString)) + tsql.execute(sqlString) + return + + def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0): + for i in range(ctbNum): + sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \ + (dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,(i+ctbStartIdx) % 5,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx) + tsql.execute(sqlString) + + tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName)) + return + + def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + for i in range(ctbNum): + rowsBatched = 0 + sql += " %s%d values "%(ctbPrefix,i) + for j in range(rowsPerTbl): + if (i < ctbNum/2): + sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10, j%10, j%10) + else: + sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10) + rowsBatched += 1 + if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + rowsBatched = 0 + if j < rowsPerTbl - 1: + sql = "insert into %s%d values " %(ctbPrefix,i) + else: + sql = "insert into " + if sql != pre_insert: + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'test', + 'dropFlag': 1, + 'vgroups': 2, + 'stbName': 'meters', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'FLOAT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'smallint', 'count':1},{'type': 'tinyint', 'count':1},{'type': 'bool', 'count':1},{'type': 'binary', 'len':10, 'count':1},{'type': 'nchar', 'len':10, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}], + 'ctbPrefix': 't', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 10000, + 'batchNum': 3000, + 'startTs': 1537146000000, + 'tsStep': 600000} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdLog.info("create database") + self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion) + + tdLog.info("create stb") + self.create_stable(tsql=tdSql, paraDict=paraDict) + + tdLog.info("create child tables") + self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], \ + stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],\ + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict["ctbStartIdx"]) + self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],\ + ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],\ + rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\ + startTs=paraDict["startTs"],tsStep=paraDict["tsStep"]) + return + + def check_first_rows(self, all_rows, limited_rows, offset: int = 0): + for i in range(0, len(limited_rows) - 1): + if limited_rows[i] != all_rows[i + offset]: + tdLog.info("row: %d, row in all: %s" % (i+offset+1, str(all_rows[i+offset]))) + tdLog.info("row: %d, row in limted: %s" % (i+1, str(limited_rows[i]))) + tdLog.exit("row data check failed") + tdLog.info("all rows are the same as query without limit..") + + def query_and_check_with_slimit(self, sql: str, max_limit: int, step: int, offset: int = 0): + self.query_and_check_with_limit(sql, max_limit, step, offset, ' slimit ') + + def query_and_check_with_limit(self, sql: str, max_limit: int, step: int, offset: int = 0, limit_str: str = ' limit '): + for limit in range(0, max_limit, step): + limited_sql = sql + limit_str + str(offset) + "," + str(limit) + tdLog.info("query with sql: %s " % (sql) + limit_str + " %d,%d" % (offset, limit)) + all_rows = tdSql.getResult(sql) + limited_rows = tdSql.getResult(limited_sql) + tdLog.info("all rows: %d, limited rows: %d" % (len(all_rows), len(limited_rows))) + if limit_str == ' limit ': + if limit + offset <= len(all_rows) and len(limited_rows) != limit: + tdLog.exit("limited sql has less rows than limit value which is not right, \ + limit: %d, limited_rows: %d, all_rows: %d, offset: %d" % (limit, len(limited_rows), len(all_rows), offset)) + elif limit + offset > len(all_rows) and offset < len(all_rows) and offset + len(limited_rows) != len(all_rows): + tdLog.exit("limited sql has less rows than all_rows which is not right, \ + limit: %d, limited_rows: %d, all_rows: %d, offset: %d" % (limit, len(limited_rows), len(all_rows), offset)) + elif offset >= len(all_rows) and len(limited_rows) != 0: + tdLog.exit("limited rows should be zero, \ + limit: %d, limited_rows: %d, all_rows: %d, offset: %d" % (limit, len(limited_rows), len(all_rows), offset)) + + self.check_first_rows(all_rows, limited_rows, offset) + + def test_interval_limit_asc(self, offset: int = 0): + sqls = ["select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1s) ", + "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1m) ", + "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1h) ", + "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1d) ", + "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1s) ", + "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1m) ", + "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1h) ", + "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1d) "] + for sql in sqls: + self.query_and_check_with_limit(sql, 5000, 500, offset) + + def test_interval_limit_desc(self, offset: int = 0): + sqls = ["select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1s) ", + "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1m) ", + "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1h) ", + "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1d) ", + "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1s) ", + "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1m) ", + "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1h) ", + "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1d) "] + for sql in sqls: + self.query_and_check_with_limit(sql, 5000, 500, offset) + + def test_interval_limit_offset(self): + for offset in range(0, 1000, 500): + self.test_interval_limit_asc(offset) + self.test_interval_limit_desc(offset) + self.test_interval_fill_limit(offset) + self.test_interval_order_by_limit(offset) + self.test_interval_partition_by_slimit(offset) + + def test_interval_fill_limit(self, offset: int = 0): + sqls = [ + "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \ + where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1s) fill(linear)", + "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \ + where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1m) fill(linear)", + "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \ + where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1h) fill(linear)", + "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \ + where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1d) fill(linear)" + ] + for sql in sqls: + self.query_and_check_with_limit(sql, 5000, 1000, offset) + + def test_interval_order_by_limit(self, offset: int = 0): + sqls = [ + "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \ + where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by b", + "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \ + where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by a desc", + "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), last(ts) from meters \ + where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by a desc", + "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \ + where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by count(*), sum(c1), a", + "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \ + where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by a, count(*), sum(c1)", + "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \ + where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by b", + "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \ + where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by a desc", + "select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters \ + where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by a desc", + "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \ + where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by count(*), sum(c1), a", + "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \ + where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by a, count(*), sum(c1)", + ] + for sql in sqls: + self.query_and_check_with_limit(sql, 6000, 2000, offset) + + def test_interval_partition_by_slimit(self, offset: int = 0): + sqls = [ + "select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters " + "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by t1 interval(1m)", + "select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters " + "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by t1 interval(1h)", + "select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters " + "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by c3 interval(1m)", + ] + for sql in sqls: + self.query_and_check_with_slimit(sql, 10, 2, offset) + + def test_interval_partition_by_slimit_limit(self): + sql = "select * from (select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts),c3 from meters " \ + "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by c3 interval(1m) slimit 10 limit 2) order by c3 asc" + tdSql.query(sql) + tdSql.checkRows(20) + tdSql.checkData(0, 4, 0) + tdSql.checkData(1, 4, 0) + tdSql.checkData(2, 4, 1) + tdSql.checkData(3, 4, 1) + tdSql.checkData(18, 4, 9) + tdSql.checkData(19, 4, 9) + + sql = "select * from (select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts),c3 from meters " \ + "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by c3 interval(1m) slimit 2,2 limit 2) order by c3 asc" + tdSql.query(sql) + tdSql.checkRows(4) + tdSql.checkData(0, 4, 2) + tdSql.checkData(1, 4, 2) + tdSql.checkData(2, 4, 9) + tdSql.checkData(3, 4, 9) + + def run(self): + self.prepareTestEnv() + self.test_interval_limit_offset() + self.test_interval_partition_by_slimit_limit() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())