diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 644185a244..2f6bb603c1 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -96,6 +96,7 @@ typedef struct SScanLogicNode { bool groupSort; int8_t cacheLastMode; bool hasNormalCols; // neither tag column nor primary key tag column + bool sortPrimaryKey; } SScanLogicNode; typedef struct SJoinLogicNode { @@ -204,6 +205,7 @@ typedef struct SWindowLogicNode { int8_t igExpired; EWindowAlgorithm windowAlgo; EOrder inputTsOrder; + EOrder outputTsOrder; } SWindowLogicNode; typedef struct SFillLogicNode { @@ -212,6 +214,7 @@ typedef struct SFillLogicNode { SNode* pWStartTs; SNode* pValues; // SNodeListNode STimeWindow timeRange; + EOrder inputTsOrder; } SFillLogicNode; typedef struct SSortLogicNode { @@ -410,6 +413,8 @@ typedef struct SWinodwPhysiNode { int8_t triggerType; int64_t watermark; int8_t igExpired; + EOrder inputTsOrder; + EOrder outputTsOrder; } SWinodwPhysiNode; typedef struct SIntervalPhysiNode { @@ -434,6 +439,7 @@ typedef struct SFillPhysiNode { SNode* pValues; // SNodeListNode SNodeList* pTargets; STimeWindow timeRange; + EOrder inputTsOrder; } SFillPhysiNode; typedef struct SMultiTableIntervalPhysiNode { diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h index f60c518d73..104c007756 100644 --- a/source/libs/command/inc/commandInt.h +++ b/source/libs/command/inc/commandInt.h @@ -19,6 +19,8 @@ #ifdef __cplusplus extern "C" { #endif + +// clang-format off #include "nodes.h" #include "plannodes.h" #include "ttime.h" @@ -77,6 +79,8 @@ extern "C" { #define EXPLAIN_EXECINFO_FORMAT "cost=%.3f..%.3f rows=%" PRIu64 #define EXPLAIN_MODE_FORMAT "mode=%s" #define EXPLAIN_STRING_TYPE_FORMAT "%s" +#define EXPLAIN_INPUT_ORDER_FORMAT "input_order=%s" +#define EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT "output_order=%s" #define COMMAND_RESET_LOG "resetLog" #define COMMAND_SCHEDULE_POLICY "schedulePolicy" @@ -122,7 +126,7 @@ typedef struct SExplainCtx { SHashObj *groupHash; // Hash } SExplainCtx; -#define EXPLAIN_ORDER_STRING(_order) ((TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending") +#define EXPLAIN_ORDER_STRING(_order) ((ORDER_ASC == _order) ? "asc" : "desc") #define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join") #define INVERAL_TIME_FROM_PRECISION_TO_UNIT(_t, _u, _p) (((_u) == 'n' || (_u) == 'y') ? (_t) : (convertTimeFromPrecisionToUnit(_t, _p, _u))) diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 266f96b41e..66a94f7e28 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +// clang-format off #include "commandInt.h" #include "plannodes.h" #include "query.h" @@ -849,6 +850,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pIntNode->window.pFuncs->length); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->totalRowSize); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pIntNode->window.inputTsOrder)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT, EXPLAIN_ORDER_STRING(pIntNode->window.outputTsOrder)); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); @@ -1154,7 +1159,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_NEW(level + 1, EXPLAIN_MERGE_KEYS_FORMAT); for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) { SOrderByExprNode *ptn = (SOrderByExprNode *)nodesListGetNode(pMergeNode->pMergeKeys, i); - EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr)); + EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, nodesGetNameFromColumnNode(ptn->pExpr)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, EXPLAIN_ORDER_STRING(ptn->order)); } EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a80c2c2fea..37818dd91d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -523,7 +523,8 @@ typedef struct SIntervalAggOperatorInfo { STimeWindow win; // query time range bool timeWindowInterpo; // interpolation needed or not SArray* pInterpCols; // interpolation columns - int32_t order; // current SSDataBlock scan order + int32_t resultTsOrder; // result timestamp order + int32_t inputOrder; // input data ts order EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] STimeWindowAggSupp twAggSup; bool invertible; @@ -533,8 +534,7 @@ typedef struct SIntervalAggOperatorInfo { SArray* pDelWins; // SWinRes int32_t delIndex; SSDataBlock* pDelRes; - - SNode *pCondition; + SNode* pCondition; } SIntervalAggOperatorInfo; typedef struct SMergeAlignedIntervalAggOperatorInfo { @@ -804,7 +804,7 @@ typedef struct STagFilterOperatorInfo { typedef struct SJoinOperatorInfo { SSDataBlock *pRes; int32_t joinType; - int32_t inputTsOrder; + int32_t inputOrder; SSDataBlock *pLeft; int32_t leftPos; diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index 0349632b9a..b604794dad 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -74,9 +74,9 @@ void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataB struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* val); bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); -SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, +SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId, - const char* id); + int32_t order, const char* id); void* taosDestroyFillInfo(struct SFillInfo *pFillInfo); int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 525d7bf336..7359a4ea90 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3586,15 +3586,14 @@ void doDestroyExchangeOperatorInfo(void* param) { } static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode, - STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) { + STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType, int32_t order) { SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode); STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey); w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC); - int32_t order = TSDB_ORDER_ASC; pInfo->pFillInfo = - taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, pInfo->primaryTsCol, id); + taosCreateFillInfo(w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id); pInfo->win = win; pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); @@ -3624,6 +3623,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval : &((SIntervalAggOperatorInfo*)downstream->info)->interval; + int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; int32_t type = convertFillType(pPhyFillNode->mode); SResultInfo* pResultInfo = &pOperator->resultInfo; @@ -3635,7 +3635,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, - pResultInfo->capacity, pTaskInfo->id.str, pInterval, type); + pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 8902804fab..17b81cdb82 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -77,11 +77,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->pCondAfterMerge = NULL; } - pInfo->inputTsOrder = TSDB_ORDER_ASC; + pInfo->inputOrder = TSDB_ORDER_ASC; if (pJoinNode->inputTsOrder == ORDER_ASC) { - pInfo->inputTsOrder = TSDB_ORDER_ASC; + pInfo->inputOrder = TSDB_ORDER_ASC; } else if (pJoinNode->inputTsOrder == ORDER_DESC) { - pInfo->inputTsOrder = TSDB_ORDER_DESC; + pInfo->inputOrder = TSDB_ORDER_DESC; } pOperator->fpSet = @@ -312,7 +312,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) int32_t nrows = pRes->info.rows; - bool asc = (pJoinInfo->inputTsOrder == TSDB_ORDER_ASC) ? true : false; + bool asc = (pJoinInfo->inputOrder == TSDB_ORDER_ASC) ? true : false; while (1) { int64_t leftTs = 0; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index ff856a48b6..c5d68676d2 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -66,7 +66,7 @@ static void setNullRow(SSDataBlock* pBlock, int64_t ts, int32_t rowIndex) { static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey); -static void doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32_t rowIndex, int64_t currentKey) { +static void doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32_t rowIndex, int64_t currentKey) { if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { float v = 0; GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); @@ -184,7 +184,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* continue; } - SVariant* pVar = &pFillInfo->pFillCol[i].fillVal; + SVariant* pVar = &pFillInfo->pFillCol[i].fillVal; SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey); } @@ -298,7 +298,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); char* src = colDataGetData(pSrc, pFillInfo->index); - if (/*i == 0 || (*/!colDataIsNull_s(pSrc, pFillInfo->index)) { + if (/*i == 0 || (*/ !colDataIsNull_s(pSrc, pFillInfo->index)) { bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); colDataAppend(pDst, index, src, isNull); saveColData(pFillInfo->prev, i, src, isNull); @@ -313,7 +313,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t } else if (pFillInfo->type == TSDB_FILL_LINEAR) { bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); colDataAppend(pDst, index, src, isNull); - saveColData(pFillInfo->prev, i, src, isNull); // todo: + saveColData(pFillInfo->prev, i, src, isNull); // todo: } else if (pFillInfo->type == TSDB_FILL_NULL) { colDataAppendNULL(pDst, index); } else if (pFillInfo->type == TSDB_FILL_NEXT) { @@ -433,9 +433,9 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { return pFillInfo->numOfRows - pFillInfo->index; } -struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, - SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t primaryTsSlotId, - const char* id) { +struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, + SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, + int32_t primaryTsSlotId, int32_t order, const char* id) { if (fillType == TSDB_FILL_NONE) { return NULL; } @@ -446,10 +446,9 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag return NULL; } - pFillInfo->tsSlotId = primaryTsSlotId; - - taosResetFillInfo(pFillInfo, skey); pFillInfo->order = order; + pFillInfo->tsSlotId = primaryTsSlotId; + taosResetFillInfo(pFillInfo, skey); switch (fillType) { case FILL_MODE_NONE: @@ -535,6 +534,14 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { return NULL; } +void taosFillSetDataOrderInfo(SFillInfo* pFillInfo, int32_t order) { + if (pFillInfo == NULL || (order != TSDB_ORDER_ASC && order != TSDB_ORDER_DESC)) { + return; + } + + pFillInfo->order = order; +} + void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) { if (pFillInfo->type == TSDB_FILL_NONE) { return; @@ -581,7 +588,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma int64_t numOfRes = -1; if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set. - TSKEY lastKey = tsList[pFillInfo->numOfRows - 1]; + TSKEY lastKey = (TSDB_ORDER_ASC == pFillInfo->order ? tsList[pFillInfo->numOfRows - 1] : tsList[0]); numOfRes = taosTimeCountInterval(lastKey, pFillInfo->currentKey, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit, pFillInfo->interval.precision); numOfRes += 1; @@ -626,9 +633,9 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t ca } qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%" PRId64 "-%" PRId64 ", currentKey:%" PRId64 - ", current : % d, total : % d, %s", pFillInfo, - pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->end, pFillInfo->currentKey, - pFillInfo->numOfCurrent, pFillInfo->numOfTotal, pFillInfo->id); + ", current : % d, total : % d, %s", + pFillInfo, pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->end, pFillInfo->currentKey, + pFillInfo->numOfCurrent, pFillInfo->numOfTotal, pFillInfo->id); return numOfRes; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 400940245d..f567d426b5 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -362,7 +362,7 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock, const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) { - bool ascQuery = (pInfo->order == TSDB_ORDER_ASC); + bool ascQuery = (pInfo->inputOrder == TSDB_ORDER_ASC); TSKEY curTs = tsCols[pos]; @@ -392,7 +392,7 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex, SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) { - int32_t order = pInfo->order; + int32_t order = pInfo->inputOrder; TSKEY actualEndKey = tsCols[endRowIndex]; TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey; @@ -550,7 +550,7 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB if (!done) { int32_t endRowIndex = startPos + forwardRows - 1; - TSKEY endKey = (pInfo->order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey; + TSKEY endKey = (pInfo->inputOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey; bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); @@ -639,7 +639,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); doApplyFunctions(pTaskInfo, pSup->pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, 0, tsCols, pBlock->info.rows, - numOfExprs, pInfo->order); + numOfExprs, pInfo->inputOrder); if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { closeResultRow(pr); @@ -924,11 +924,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul int32_t numOfOutput = pSup->numOfExprs; int64_t* tsCols = extractTsCol(pBlock, pInfo); uint64_t tableGroupId = pBlock->info.groupId; - bool ascScan = (pInfo->order == TSDB_ORDER_ASC); + bool ascScan = (pInfo->inputOrder == TSDB_ORDER_ASC); TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); SResultRow* pResult = NULL; - STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order); + STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder); int32_t ret = TSDB_CODE_SUCCESS; if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { @@ -946,7 +946,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul TSKEY ekey = ascScan ? win.ekey : win.skey; int32_t forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); ASSERT(forwardRows > 0); // prev time window not interpolation yet. @@ -969,7 +969,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pBlock->info.rows, numOfOutput, pInfo->order); + pBlock->info.rows, numOfOutput, pInfo->inputOrder); } doCloseWindow(pResultRowInfo, pInfo, pResult); @@ -977,14 +977,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul STimeWindow nextWin = win; while (1) { int32_t prevEndPos = forwardRows - 1 + startPos; - startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->order); + startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->inputOrder); if (startPos < 0) { break; } if (pInfo->ignoreExpiredData && isCloseWindow(&nextWin, &pInfo->twAggSup)) { ekey = ascScan ? nextWin.ekey : nextWin.skey; forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); continue; } @@ -1002,14 +1002,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ekey = ascScan ? nextWin.ekey : nextWin.skey; forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); // window start(end) key interpolation doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pBlock->info.rows, numOfOutput, pInfo->order); + pBlock->info.rows, numOfOutput, pInfo->inputOrder); doCloseWindow(pResultRowInfo, pInfo, pResult); } @@ -1082,7 +1082,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { break; } - getTableScanInfo(pOperator, &pInfo->order, &scanFlag); + getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag); if (pInfo->scalarSupp.pExprInfo != NULL) { SExprSupp* pExprSup = &pInfo->scalarSupp; @@ -1090,13 +1090,13 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, scanFlag, true); + setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->inputOrder, scanFlag, true); blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); } - initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->order); + initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->resultTsOrder); OPTR_SET_OPENED(pOperator); pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; @@ -1249,7 +1249,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { } pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; - pOperator->status = OP_RES_TO_RETURN; initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); @@ -1550,7 +1549,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SIntervalAggOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - pInfo->order = TSDB_ORDER_ASC; + pInfo->inputOrder = TSDB_ORDER_ASC; SExprSupp* pSup = &pOperator->exprSupp; if (pOperator->status == OP_EXEC_DONE) { @@ -1610,7 +1609,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { // The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the // caller. Note that all the time window are not close till now. // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true); + setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->inputOrder, MAIN_SCAN, true); if (pInfo->invertible) { setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type); } @@ -1790,7 +1789,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* } pInfo->win = pTaskInfo->window; - pInfo->order = TSDB_ORDER_ASC; + pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; + pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; pInfo->interval = *pInterval; pInfo->execModel = pTaskInfo->execModel; pInfo->twAggSup = *pTwAggSupp; @@ -1807,7 +1807,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* } pInfo->primaryTsIndex = primaryTsSlotId; - SExprSupp* pSup = &pOperator->exprSupp; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -1879,7 +1878,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr goto _error; } - pInfo->order = TSDB_ORDER_ASC; + pInfo->inputOrder = TSDB_ORDER_ASC; pInfo->interval = *pInterval; pInfo->execModel = OPTR_EXEC_MODEL_STREAM; pInfo->win = pTaskInfo->window; @@ -4593,7 +4592,7 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExprSupp* pSup = &pOperatorInfo->exprSupp; - bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); + bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC); SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId); SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, @@ -4647,7 +4646,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } else { updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); + tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); @@ -4666,7 +4665,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); + tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); } @@ -4711,8 +4710,8 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { break; } - getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); - setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->order, scanFlag, true); + getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag); + setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->inputOrder, scanFlag, true); doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); doFilter(miaInfo->pCondition, pRes, NULL); if (pRes->info.rows >= pOperator->resultInfo.capacity) { @@ -4753,7 +4752,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, miaInfo->pCondition = pCondition; iaInfo->win = pTaskInfo->window; - iaInfo->order = TSDB_ORDER_ASC; + iaInfo->inputOrder = TSDB_ORDER_ASC; iaInfo->interval = *pInterval; iaInfo->execModel = pTaskInfo->execModel; iaInfo->primaryTsIndex = primaryTsSlotId; @@ -4835,7 +4834,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); + bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC); SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId); @@ -4853,7 +4852,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); + bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC); SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin}; @@ -4889,12 +4888,12 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* int32_t numOfOutput = pExprSup->numOfExprs; int64_t* tsCols = extractTsCol(pBlock, iaInfo); uint64_t tableGroupId = pBlock->info.groupId; - bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); + bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC); TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols); SResultRow* pResult = NULL; STimeWindow win = - getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, iaInfo->order); + getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, iaInfo->inputOrder); int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx, @@ -4905,7 +4904,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* TSKEY ekey = ascScan ? win.ekey : win.skey; int32_t forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order); + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->inputOrder); ASSERT(forwardRows > 0); // prev time window not interpolation yet. @@ -4926,7 +4925,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true); doApplyFunctions(pTaskInfo, pExprSup->pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pBlock->info.rows, numOfOutput, iaInfo->order); + pBlock->info.rows, numOfOutput, iaInfo->inputOrder); doCloseWindow(pResultRowInfo, iaInfo, pResult); // output previous interval results after this interval (&win) is closed @@ -4935,7 +4934,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* STimeWindow nextWin = win; while (1) { int32_t prevEndPos = forwardRows - 1 + startPos; - startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->order); + startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder); if (startPos < 0) { break; } @@ -4950,14 +4949,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ekey = ascScan ? nextWin.ekey : nextWin.skey; forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order); + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->inputOrder); // window start(end) key interpolation doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pExprSup->pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, - tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); + tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); doCloseWindow(pResultRowInfo, iaInfo, pResult); // output previous interval results after this interval (&nextWin) is closed @@ -5011,8 +5010,8 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { break; } - getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); - setInputDataBlock(pOperator, pExpSupp->pCtx, pBlock, iaInfo->order, scanFlag, true); + getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag); + setInputDataBlock(pOperator, pExpSupp->pCtx, pBlock, iaInfo->inputOrder, scanFlag, true); doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); if (pRes->info.rows >= pOperator->resultInfo.threshold) { @@ -5054,9 +5053,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI miaInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow)); SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; - iaInfo->win = pTaskInfo->window; - iaInfo->order = TSDB_ORDER_ASC; + iaInfo->inputOrder = TSDB_ORDER_ASC; iaInfo->interval = *pInterval; iaInfo->execModel = pTaskInfo->execModel; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 5fc94c2642..79ef18eeb6 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -443,6 +443,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p COPY_SCALAR_FIELD(igExpired); COPY_SCALAR_FIELD(windowAlgo); COPY_SCALAR_FIELD(inputTsOrder); + COPY_SCALAR_FIELD(outputTsOrder); return TSDB_CODE_SUCCESS; } @@ -452,6 +453,7 @@ static int32_t logicFillCopy(const SFillLogicNode* pSrc, SFillLogicNode* pDst) { CLONE_NODE_FIELD(pWStartTs); CLONE_NODE_FIELD(pValues); COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow)); + COPY_SCALAR_FIELD(inputTsOrder); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index af3f0c242b..b0c16f26ed 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1936,6 +1936,8 @@ static const char* jkWindowPhysiPlanTsEnd = "TsEnd"; static const char* jkWindowPhysiPlanTriggerType = "TriggerType"; static const char* jkWindowPhysiPlanWatermark = "Watermark"; static const char* jkWindowPhysiPlanIgnoreExpired = "IgnoreExpired"; +static const char* jkWindowPhysiPlanInputTsOrder = "inputTsOrder"; +static const char* jkWindowPhysiPlanOutputTsOrder = "outputTsOrder"; static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj; @@ -1962,6 +1964,12 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanIgnoreExpired, pNode->igExpired); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanInputTsOrder, pNode->inputTsOrder); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanOutputTsOrder, pNode->outputTsOrder); + } return code; } @@ -1991,6 +1999,12 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetTinyIntValue(pJson, jkWindowPhysiPlanIgnoreExpired, &pNode->igExpired); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkWindowPhysiPlanInputTsOrder, pNode->inputTsOrder, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkWindowPhysiPlanOutputTsOrder, pNode->outputTsOrder, code); + } return code; } @@ -2053,6 +2067,7 @@ static const char* jkFillPhysiPlanValues = "Values"; static const char* jkFillPhysiPlanTargets = "Targets"; static const char* jkFillPhysiPlanStartTime = "StartTime"; static const char* jkFillPhysiPlanEndTime = "EndTime"; +static const char* jkFillPhysiPlanInputTsOrder = "inputTsOrder"; static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) { const SFillPhysiNode* pNode = (const SFillPhysiNode*)pObj; @@ -2076,6 +2091,9 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkFillPhysiPlanEndTime, pNode->timeRange.ekey); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkFillPhysiPlanInputTsOrder, pNode->inputTsOrder); + } return code; } @@ -2103,6 +2121,9 @@ static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBigIntValue(pJson, jkFillPhysiPlanEndTime, &pNode->timeRange.ekey); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkFillPhysiPlanInputTsOrder, pNode->inputTsOrder, code); + } return code; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index a80509c165..b51624336b 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -632,6 +632,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm pWindow->igExpired = pCxt->pPlanCxt->igExpired; } pWindow->inputTsOrder = ORDER_ASC; + pWindow->outputTsOrder = ORDER_ASC; int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_WINDOW, fmIsWindowClauseFunc, &pWindow->pFuncs); if (TSDB_CODE_SUCCESS == code) { @@ -764,6 +765,7 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pFill->node.groupAction = GROUP_ACTION_KEEP; pFill->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP; pFill->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; + pFill->inputTsOrder = ORDER_ASC; int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_WINDOW, NULL, COLLECT_COL_TYPE_ALL, &pFill->node.pTargets); if (TSDB_CODE_SUCCESS == code && NULL == pFill->node.pTargets) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 98b0ce2007..577266a697 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -38,10 +38,13 @@ typedef struct SOptimizeRule { FOptimize optimizeFunc; } SOptimizeRule; +typedef enum EScanOrder { SCAN_ORDER_ASC = 1, SCAN_ORDER_DESC, SCAN_ORDER_BOTH } EScanOrder; + typedef struct SOsdInfo { SScanLogicNode* pScan; SNodeList* pSdrFuncs; SNodeList* pDsoFuncs; + EScanOrder scanOrder; } SOsdInfo; typedef struct SCpdIsMultiTableCondCxt { @@ -97,6 +100,27 @@ static EDealRes optRebuildTbanme(SNode** pNode, void* pContext) { return DEAL_RES_CONTINUE; } +static void optSetParentOrder(SLogicNode* pNode, EOrder order) { + if (NULL == pNode) { + return; + } + switch (nodeType(pNode)) { + case QUERY_NODE_LOGIC_PLAN_WINDOW: + ((SWindowLogicNode*)pNode)->inputTsOrder = order; + // window has a sorting function, and the operator behind it uses its output order + return; + case QUERY_NODE_LOGIC_PLAN_JOIN: + ((SJoinLogicNode*)pNode)->inputTsOrder = order; + break; + case QUERY_NODE_LOGIC_PLAN_FILL: + ((SFillLogicNode*)pNode)->inputTsOrder = order; + break; + default: + break; + } + optSetParentOrder(pNode->pParent, order); +} + EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { // *((bool*)pContext) = (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType); @@ -179,16 +203,18 @@ static int32_t scanPathOptGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSd SNodeList* pAllFuncs = scanPathOptGetAllFuncs(pScan->node.pParent); SNodeList* pTmpSdrFuncs = NULL; SNodeList* pTmpDsoFuncs = NULL; - SNode* pFunc = NULL; + SNode* pNode = NULL; bool otherFunc = false; - FOREACH(pFunc, pAllFuncs) { - int32_t code = TSDB_CODE_SUCCESS; - if (scanPathOptNeedOptimizeDataRequire((SFunctionNode*)pFunc)) { - code = nodesListMakeStrictAppend(&pTmpSdrFuncs, nodesCloneNode(pFunc)); - } else if (scanPathOptNeedDynOptimize((SFunctionNode*)pFunc)) { - code = nodesListMakeStrictAppend(&pTmpDsoFuncs, nodesCloneNode(pFunc)); + FOREACH(pNode, pAllFuncs) { + SFunctionNode* pFunc = (SFunctionNode*)pNode; + int32_t code = TSDB_CODE_SUCCESS; + if (scanPathOptNeedOptimizeDataRequire(pFunc)) { + code = nodesListMakeStrictAppend(&pTmpSdrFuncs, nodesCloneNode(pNode)); + } else if (scanPathOptNeedDynOptimize(pFunc)) { + code = nodesListMakeStrictAppend(&pTmpDsoFuncs, nodesCloneNode(pNode)); } else { otherFunc = true; + break; } if (TSDB_CODE_SUCCESS != code) { nodesDestroyList(pTmpSdrFuncs); @@ -206,12 +232,46 @@ static int32_t scanPathOptGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSd return TSDB_CODE_SUCCESS; } +static int32_t scanPathOptGetScanOrder(SScanLogicNode* pScan, EScanOrder* pScanOrder) { + SNodeList* pAllFuncs = scanPathOptGetAllFuncs(pScan->node.pParent); + SNode* pNode = NULL; + bool hasFirst = false; + bool hasLast = false; + bool otherFunc = false; + FOREACH(pNode, pAllFuncs) { + SFunctionNode* pFunc = (SFunctionNode*)pNode; + if (FUNCTION_TYPE_FIRST == pFunc->funcType) { + hasFirst = true; + } else if (FUNCTION_TYPE_LAST == pFunc->funcType) { + hasLast = true; + } else if (FUNCTION_TYPE_SELECT_VALUE != pFunc->funcType) { + otherFunc = true; + } + } + if (hasFirst && hasLast && !otherFunc) { + *pScanOrder = SCAN_ORDER_BOTH; + } else if (hasLast) { + *pScanOrder = SCAN_ORDER_DESC; + } else { + *pScanOrder = SCAN_ORDER_ASC; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t scanPathOptSetOsdInfo(SOsdInfo* pInfo) { + int32_t code = scanPathOptGetRelatedFuncs(pInfo->pScan, &pInfo->pSdrFuncs, &pInfo->pDsoFuncs); + if (TSDB_CODE_SUCCESS == code) { + code = scanPathOptGetScanOrder(pInfo->pScan, &pInfo->scanOrder); + } + return code; +} + static int32_t scanPathOptMatch(SOptimizeContext* pCxt, SLogicNode* pLogicNode, SOsdInfo* pInfo) { pInfo->pScan = (SScanLogicNode*)optFindPossibleNode(pLogicNode, scanPathOptMayBeOptimized); if (NULL == pInfo->pScan) { return TSDB_CODE_SUCCESS; } - return scanPathOptGetRelatedFuncs(pInfo->pScan, &pInfo->pSdrFuncs, &pInfo->pDsoFuncs); + return scanPathOptSetOsdInfo(pInfo); } static EFuncDataRequired scanPathOptPromoteDataRequired(EFuncDataRequired l, EFuncDataRequired r) { @@ -258,15 +318,42 @@ static void scanPathOptSetScanWin(SScanLogicNode* pScan) { } } +static void scanPathOptSetScanOrder(EScanOrder scanOrder, SScanLogicNode* pScan) { + if (pScan->sortPrimaryKey || pScan->scanSeq[0] > 1 || pScan->scanSeq[1] > 1) { + return; + } + switch (scanOrder) { + case SCAN_ORDER_ASC: + pScan->scanSeq[0] = 1; + pScan->scanSeq[1] = 0; + optSetParentOrder(pScan->node.pParent, ORDER_ASC); + break; + case SCAN_ORDER_DESC: + pScan->scanSeq[0] = 0; + pScan->scanSeq[1] = 1; + optSetParentOrder(pScan->node.pParent, ORDER_DESC); + break; + case SCAN_ORDER_BOTH: + pScan->scanSeq[0] = 1; + pScan->scanSeq[1] = 1; + break; + default: + break; + } +} + static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { - SOsdInfo info = {0}; + SOsdInfo info = {.scanOrder = SCAN_ORDER_ASC}; int32_t code = scanPathOptMatch(pCxt, pLogicSubplan->pNode, &info); if (TSDB_CODE_SUCCESS == code && info.pScan) { - scanPathOptSetScanWin((SScanLogicNode*)info.pScan); + scanPathOptSetScanWin(info.pScan); + scanPathOptSetScanOrder(info.scanOrder, info.pScan); } if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) { info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs); info.pScan->pDynamicScanFuncs = info.pDsoFuncs; + } + if (TSDB_CODE_SUCCESS == code && info.pScan) { OPTIMIZE_FLAG_SET_MASK(info.pScan->node.optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH); pCxt->optimized = true; } @@ -987,12 +1074,13 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) { } SSortLogicNode* pSort = (SSortLogicNode*)pNode; if (pSort->groupSort || !sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys) || 1 != LIST_LENGTH(pSort->node.pChildren)) { - return TSDB_CODE_SUCCESS; + return false; } return true; } -static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeList** pScanNodes) { +static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool* pNotOptimize, + SNodeList** pSequencingNodes) { switch (nodeType(pNode)) { case QUERY_NODE_LOGIC_PLAN_SCAN: { SScanLogicNode* pScan = (SScanLogicNode*)pNode; @@ -1000,17 +1088,19 @@ static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimi *pNotOptimize = true; return TSDB_CODE_SUCCESS; } - return nodesListMakeAppend(pScanNodes, (SNode*)pNode); + return nodesListMakeAppend(pSequencingNodes, (SNode*)pNode); } case QUERY_NODE_LOGIC_PLAN_JOIN: { - int32_t code = - sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes); + int32_t code = sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), + pNotOptimize, pSequencingNodes); if (TSDB_CODE_SUCCESS == code) { - code = - sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), pNotOptimize, pScanNodes); + code = sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), pNotOptimize, + pSequencingNodes); } return code; } + case QUERY_NODE_LOGIC_PLAN_WINDOW: + return nodesListMakeAppend(pSequencingNodes, (SNode*)pNode); case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_PARTITION: *pNotOptimize = true; @@ -1024,14 +1114,15 @@ static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimi return TSDB_CODE_SUCCESS; } - return sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes); + return sortPriKeyOptGetSequencingNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, + pSequencingNodes); } -static int32_t sortPriKeyOptGetScanNodes(SLogicNode* pNode, SNodeList** pScanNodes) { +static int32_t sortPriKeyOptGetSequencingNodes(SLogicNode* pNode, SNodeList** pSequencingNodes) { bool notOptimize = false; - int32_t code = sortPriKeyOptGetScanNodesImpl(pNode, ¬Optimize, pScanNodes); + int32_t code = sortPriKeyOptGetSequencingNodesImpl(pNode, ¬Optimize, pSequencingNodes); if (TSDB_CODE_SUCCESS != code || notOptimize) { - nodesClearList(*pScanNodes); + nodesClearList(*pSequencingNodes); } return code; } @@ -1040,33 +1131,26 @@ static EOrder sortPriKeyOptGetPriKeyOrder(SSortLogicNode* pSort) { return ((SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0))->order; } -static void sortPriKeyOptSetParentOrder(SLogicNode* pNode, EOrder order) { - if (NULL == pNode) { - return; - } - if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode)) { - ((SWindowLogicNode*)pNode)->inputTsOrder = order; - } else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode)) { - ((SJoinLogicNode*)pNode)->inputTsOrder = order; - } - sortPriKeyOptSetParentOrder(pNode->pParent, order); -} - static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort, - SNodeList* pScanNodes) { + SNodeList* pSequencingNodes) { EOrder order = sortPriKeyOptGetPriKeyOrder(pSort); - SNode* pScanNode = NULL; - FOREACH(pScanNode, pScanNodes) { - SScanLogicNode* pScan = (SScanLogicNode*)pScanNode; - if (ORDER_DESC == order && pScan->scanSeq[0] > 0) { - TSWAP(pScan->scanSeq[0], pScan->scanSeq[1]); + SNode* pSequencingNode = NULL; + FOREACH(pSequencingNode, pSequencingNodes) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pSequencingNode)) { + SScanLogicNode* pScan = (SScanLogicNode*)pSequencingNode; + if ((ORDER_DESC == order && pScan->scanSeq[0] > 0) || (ORDER_ASC == order && pScan->scanSeq[1] > 0)) { + TSWAP(pScan->scanSeq[0], pScan->scanSeq[1]); + } + if (TSDB_SUPER_TABLE == pScan->tableType) { + pScan->scanType = SCAN_TYPE_TABLE_MERGE; + pScan->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL; + pScan->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; + } + pScan->sortPrimaryKey = true; + } else if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pSequencingNode)) { + ((SWindowLogicNode*)pSequencingNode)->outputTsOrder = order; } - if (TSDB_SUPER_TABLE == pScan->tableType) { - pScan->scanType = SCAN_TYPE_TABLE_MERGE; - pScan->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL; - pScan->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; - } - sortPriKeyOptSetParentOrder(pScan->node.pParent, order); + optSetParentOrder(((SLogicNode*)pSequencingNode)->pParent, order); } SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0); @@ -1083,12 +1167,13 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS } static int32_t sortPrimaryKeyOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort) { - SNodeList* pScanNodes = NULL; - int32_t code = sortPriKeyOptGetScanNodes((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0), &pScanNodes); - if (TSDB_CODE_SUCCESS == code && NULL != pScanNodes) { - code = sortPriKeyOptApply(pCxt, pLogicSubplan, pSort, pScanNodes); + SNodeList* pSequencingNodes = NULL; + int32_t code = + sortPriKeyOptGetSequencingNodes((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0), &pSequencingNodes); + if (TSDB_CODE_SUCCESS == code && NULL != pSequencingNodes) { + code = sortPriKeyOptApply(pCxt, pLogicSubplan, pSort, pSequencingNodes); } - nodesClearList(pScanNodes); + nodesClearList(pSequencingNodes); return code; } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 2e5c4255e6..3771586b34 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1089,6 +1089,8 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pWindow->triggerType = pWindowLogicNode->triggerType; pWindow->watermark = pWindowLogicNode->watermark; pWindow->igExpired = pWindowLogicNode->igExpired; + pWindow->inputTsOrder = pWindowLogicNode->inputTsOrder; + pWindow->outputTsOrder = pWindowLogicNode->outputTsOrder; SNodeList* pPrecalcExprs = NULL; SNodeList* pFuncs = NULL; @@ -1363,6 +1365,7 @@ static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren pFill->mode = pFillNode->mode; pFill->timeRange = pFillNode->timeRange; + pFill->inputTsOrder = pFillNode->inputTsOrder; SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); int32_t code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->node.pTargets, &pFill->pTargets); diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 81e2bff179..bc5e50218b 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -492,7 +492,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_MERGE; SNodeList* pMergeKeys = NULL; code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, - ((SWindowLogicNode*)pInfo->pSplitNode)->inputTsOrder, &pMergeKeys); + ((SWindowLogicNode*)pInfo->pSplitNode)->outputTsOrder, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true); } diff --git a/source/libs/planner/test/planOptimizeTest.cpp b/source/libs/planner/test/planOptimizeTest.cpp index 058705403b..6c5b760564 100644 --- a/source/libs/planner/test/planOptimizeTest.cpp +++ b/source/libs/planner/test/planOptimizeTest.cpp @@ -30,6 +30,11 @@ TEST_F(PlanOptimizeTest, scanPath) { run("SELECT COUNT(CAST(c1 AS BIGINT)) FROM t1"); run("SELECT PERCENTILE(c1, 40), COUNT(*) FROM t1"); + + run("SELECT LAST(c1) FROM t1"); + + run("SELECT LAST(c1) FROM t1 WHERE ts BETWEEN '2022-7-29 11:10:10' AND '2022-7-30 11:10:10' INTERVAL(10S) " + "FILL(LINEAR)"); } TEST_F(PlanOptimizeTest, pushDownCondition) { @@ -57,7 +62,15 @@ TEST_F(PlanOptimizeTest, sortPrimaryKey) { run("SELECT c1 FROM t1 ORDER BY ts DESC"); + run("SELECT c1 FROM st1 ORDER BY ts DESC"); + run("SELECT COUNT(*) FROM t1 INTERVAL(10S) ORDER BY _WSTART DESC"); + + run("SELECT FIRST(c1) FROM t1 WHERE ts BETWEEN '2022-7-29 11:10:10' AND '2022-7-30 11:10:10' INTERVAL(10S) " + "FILL(LINEAR) ORDER BY _WSTART DESC"); + + run("SELECT LAST(c1) FROM t1 WHERE ts BETWEEN '2022-7-29 11:10:10' AND '2022-7-30 11:10:10' INTERVAL(10S) " + "FILL(LINEAR) ORDER BY _WSTART"); } TEST_F(PlanOptimizeTest, PartitionTags) {