From 03784406c472f15de0cec9ba2a7c1c69eab64d75 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 11 Aug 2022 22:08:19 +0800 Subject: [PATCH] fix: improve create stream check --- include/libs/nodes/querynodes.h | 1 + source/libs/parser/src/parTranslater.c | 14 +++++++++++++- source/libs/planner/src/planOptimizer.c | 3 +++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index db87bde521..38c5055059 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -269,6 +269,7 @@ typedef struct SSelectStmt { bool hasInterpFunc; bool hasLastRowFunc; bool hasTimeLineFunc; + bool hasUdaf; bool onlyHasKeepOrderFunc; bool groupSort; } SSelectStmt; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 4743a9aa9a..be4ac404fd 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1350,6 +1350,7 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) { pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType); pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType); pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId); + pSelect->hasUdaf = pSelect->hasUdaf ? true : fmIsUserDefinedFunc(pFunc->funcId) && fmIsAggFunc(pFunc->funcId); pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false; } } @@ -2644,6 +2645,11 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { return TSDB_CODE_SUCCESS; } + if (NULL == pSelect->pRange || NULL == pSelect->pEvery || NULL == pSelect->pFill) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE, + "Missing RANGE clause, EVERY clause or FILL clause"); + } + int32_t code = translateExpr(pCxt, &pSelect->pRange); if (TSDB_CODE_SUCCESS == code) { code = translateExpr(pCxt, &pSelect->pEvery); @@ -4734,6 +4740,11 @@ static bool crossTableWithoutAggOper(SSelectStmt* pSelect) { !isPartitionByTbname(pSelect->pPartitionByList); } +static bool crossTableWithUdaf(SSelectStmt* pSelect) { + return pSelect->hasUdaf && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && + !isPartitionByTbname(pSelect->pPartitionByList); +} + static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { if (NULL != pStmt->pOptions->pWatermark && (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) { @@ -4785,7 +4796,8 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) { static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) { if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || - !pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect)) { + !pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList || + crossTableWithUdaf(pSelect)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index f4f7c9aefd..45ab3903a9 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1615,6 +1615,9 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pGroupTags); int32_t code = replaceLogicNode(pLogicSubplan, pNode, (SLogicNode*)pScan); + if (TSDB_CODE_SUCCESS == code) { + code = adjustLogicNodeDataRequirement((SLogicNode*)pScan, pNode->resultDataOrder); + } if (TSDB_CODE_SUCCESS == code) { NODES_CLEAR_LIST(pNode->pChildren); nodesDestroyNode((SNode*)pNode);