From 080c44681c70f84e483776f8f6ce4584679337e2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 9 Oct 2024 18:13:45 +0800 Subject: [PATCH] feat: analysis translater --- source/libs/nodes/src/nodesTraverseFuncs.c | 16 ++ source/libs/nodes/src/nodesUtilFuncs.c | 44 ++++ source/libs/parser/src/parTranslater.c | 239 ++++++++++++++++++++- 3 files changed, 292 insertions(+), 7 deletions(-) diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index 4f30cb12b7..f3f7395a37 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -181,6 +181,14 @@ static EDealRes dispatchExpr(SNode* pNode, ETraversalOrder order, FNodeWalker wa res = walkExpr(pEvent->pCol, order, walker, pContext); break; } + case QUERY_NODE_ANOMALY_WINDOW: { + SAnomalyWindowNode* pAnomaly = (SAnomalyWindowNode*)pNode; + res = walkExpr(pAnomaly->pExpr, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkExpr(pAnomaly->pCol, order, walker, pContext); + } + break; + } default: break; } @@ -392,6 +400,14 @@ static EDealRes rewriteExpr(SNode** pRawNode, ETraversalOrder order, FNodeRewrit res = rewriteExpr(&pEvent->pCol, order, rewriter, pContext); break; } + case QUERY_NODE_ANOMALY_WINDOW: { + SAnomalyWindowNode* pAnomaly = (SAnomalyWindowNode*)pNode; + res = rewriteExpr(&pAnomaly->pExpr, order, rewriter, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = rewriteExpr(&pAnomaly->pCol, order, rewriter, pContext); + } + break; + } default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 2c102f5e74..e4fda5ae42 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -419,6 +419,8 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { code = makeNode(type, sizeof(SEventWindowNode), &pNode); break; case QUERY_NODE_COUNT_WINDOW: code = makeNode(type, sizeof(SCountWindowNode), &pNode); break; + case QUERY_NODE_ANOMALY_WINDOW: + code = makeNode(type, sizeof(SAnomalyWindowNode), &pNode); break; case QUERY_NODE_HINT: code = makeNode(type, sizeof(SHintNode), &pNode); break; case QUERY_NODE_VIEW: @@ -474,6 +476,12 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { code = makeNode(type, sizeof(SDropDnodeStmt), &pNode); break; case QUERY_NODE_ALTER_DNODE_STMT: code = makeNode(type, sizeof(SAlterDnodeStmt), &pNode); break; + case QUERY_NODE_CREATE_ANODE_STMT: + code = makeNode(type, sizeof(SCreateAnodeStmt), &pNode); break; + case QUERY_NODE_DROP_ANODE_STMT: + code = makeNode(type, sizeof(SDropAnodeStmt), &pNode); break; + case QUERY_NODE_UPDATE_ANODE_STMT: + code = makeNode(type, sizeof(SUpdateAnodeStmt), &pNode); break; case QUERY_NODE_CREATE_INDEX_STMT: code = makeNode(type, sizeof(SCreateIndexStmt), &pNode); break; case QUERY_NODE_DROP_INDEX_STMT: @@ -540,6 +548,8 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { case QUERY_NODE_SHOW_MNODES_STMT: case QUERY_NODE_SHOW_MODULES_STMT: case QUERY_NODE_SHOW_QNODES_STMT: + case QUERY_NODE_SHOW_ANODES_STMT: + case QUERY_NODE_SHOW_ANODES_FULL_STMT: case QUERY_NODE_SHOW_SNODES_STMT: case QUERY_NODE_SHOW_BNODES_STMT: case QUERY_NODE_SHOW_ARBGROUPS_STMT: @@ -647,6 +657,8 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { code = makeNode(type, sizeof(SIndefRowsFuncLogicNode), &pNode); break; case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: code = makeNode(type, sizeof(SInterpFuncLogicNode), &pNode); break; + case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC: + code = makeNode(type, sizeof(SForecastFuncLogicNode), &pNode); break; case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: code = makeNode(type, sizeof(SGroupCacheLogicNode), &pNode); break; case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL: @@ -722,6 +734,8 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { code = makeNode(type, sizeof(SStreamEventWinodwPhysiNode), &pNode); break; case QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT: code = makeNode(type, sizeof(SCountWinodwPhysiNode), &pNode); break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY: + code = makeNode(type, sizeof(SAnomalyWindowPhysiNode), &pNode); break; case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT: code = makeNode(type, sizeof(SStreamCountWinodwPhysiNode), &pNode); break; case QUERY_NODE_PHYSICAL_PLAN_PARTITION: @@ -732,6 +746,8 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { code = makeNode(type, sizeof(SIndefRowsFuncPhysiNode), &pNode); break; case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: code = makeNode(type, sizeof(SInterpFuncLogicNode), &pNode); break; + case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC: + code = makeNode(type, sizeof(SForecastFuncLogicNode), &pNode); break; case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: code = makeNode(type, sizeof(SDataDispatcherNode), &pNode); break; case QUERY_NODE_PHYSICAL_PLAN_INSERT: @@ -1019,6 +1035,11 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pEvent->pCol); break; } + case QUERY_NODE_ANOMALY_WINDOW: { + SAnomalyWindowNode* pAnomaly = (SAnomalyWindowNode*)pNode; + nodesDestroyNode(pAnomaly->pCol); + break; + } case QUERY_NODE_HINT: { SHintNode* pHint = (SHintNode*)pNode; destroyHintValue(pHint->option, pHint->value); @@ -1167,6 +1188,9 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_CREATE_DNODE_STMT: // no pointer field case QUERY_NODE_DROP_DNODE_STMT: // no pointer field case QUERY_NODE_ALTER_DNODE_STMT: // no pointer field + case QUERY_NODE_CREATE_ANODE_STMT: // no pointer field + case QUERY_NODE_UPDATE_ANODE_STMT: // no pointer field + case QUERY_NODE_DROP_ANODE_STMT: // no pointer field break; case QUERY_NODE_CREATE_INDEX_STMT: { SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pNode; @@ -1252,6 +1276,8 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_SHOW_MNODES_STMT: case QUERY_NODE_SHOW_MODULES_STMT: case QUERY_NODE_SHOW_QNODES_STMT: + case QUERY_NODE_SHOW_ANODES_STMT: + case QUERY_NODE_SHOW_ANODES_FULL_STMT: case QUERY_NODE_SHOW_SNODES_STMT: case QUERY_NODE_SHOW_BNODES_STMT: case QUERY_NODE_SHOW_ARBGROUPS_STMT: @@ -1500,6 +1526,12 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pLogicNode->pTimeSeries); break; } + case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC: { + SForecastFuncLogicNode* pLogicNode = (SForecastFuncLogicNode*)pNode; + destroyLogicNode((SLogicNode*)pLogicNode); + nodesDestroyList(pLogicNode->pFuncs); + break; + } case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: { SGroupCacheLogicNode* pLogicNode = (SGroupCacheLogicNode*)pNode; destroyLogicNode((SLogicNode*)pLogicNode); @@ -1663,6 +1695,11 @@ void nodesDestroyNode(SNode* pNode) { destroyWinodwPhysiNode((SWindowPhysiNode*)pPhyNode); break; } + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY: { + SAnomalyWindowPhysiNode* pPhyNode = (SAnomalyWindowPhysiNode*)pNode; + destroyWinodwPhysiNode((SWindowPhysiNode*)pPhyNode); + break; + } case QUERY_NODE_PHYSICAL_PLAN_PARTITION: { destroyPartitionPhysiNode((SPartitionPhysiNode*)pNode); break; @@ -1690,6 +1727,13 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pPhyNode->pTimeSeries); break; } + case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC: { + SForecastFuncPhysiNode* pPhyNode = (SForecastFuncPhysiNode*)pNode; + destroyPhysiNode((SPhysiNode*)pPhyNode); + nodesDestroyList(pPhyNode->pExprs); + nodesDestroyList(pPhyNode->pFuncs); + break; + } case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: destroyDataSinkNode((SDataSinkNode*)pNode); break; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3ae4583013..ab06008e6c 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -24,6 +24,7 @@ #include "parUtil.h" #include "scalar.h" #include "systable.h" +#include "tanal.h" #include "tcol.h" #include "tglobal.h" #include "ttime.h" @@ -348,6 +349,20 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = { .numOfShowCols = 1, .pShowCols = {"*"} }, + { + .showType = QUERY_NODE_SHOW_ANODES_STMT, + .pDbName = TSDB_INFORMATION_SCHEMA_DB, + .pTableName = TSDB_INS_TABLE_ANODES, + .numOfShowCols = 1, + .pShowCols = {"*"} + }, + { + .showType = QUERY_NODE_SHOW_ANODES_FULL_STMT, + .pDbName = TSDB_INFORMATION_SCHEMA_DB, + .pTableName = TSDB_INS_TABLE_ANODES_FULL, + .numOfShowCols = 1, + .pShowCols = {"*"} + }, }; // clang-format on @@ -1035,6 +1050,14 @@ static bool isInterpPseudoColumnFunc(const SNode* pNode) { return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsInterpPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)); } +static bool isForecastFunc(const SNode* pNode) { + return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsForecastFunc(((SFunctionNode*)pNode)->funcId)); +} + +static bool isForecastPseudoColumnFunc(const SNode* pNode) { + return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsForecastPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)); +} + #ifdef BUILD_NO_CALL static bool isTimelineFunc(const SNode* pNode) { return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsTimelineFunc(((SFunctionNode*)pNode)->funcId)); @@ -1237,7 +1260,7 @@ bool isPrimaryKeyImpl(SNode* pExpr) { FUNCTION_TYPE_LAST_ROW == pFunc->funcType || FUNCTION_TYPE_TIMETRUNCATE == pFunc->funcType) { return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0)); } else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType || - FUNCTION_TYPE_IROWTS == pFunc->funcType) { + FUNCTION_TYPE_IROWTS == pFunc->funcType || FUNCTION_TYPE_FORECAST_ROWTS == pFunc->funcType) { return true; } } else if (QUERY_NODE_OPERATOR == nodeType(pExpr)) { @@ -2250,7 +2273,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) { static EDealRes haveVectorFunction(SNode* pNode, void* pContext) { if (isAggFunc(pNode) || isIndefiniteRowsFunc(pNode) || isWindowPseudoColumnFunc(pNode) || - isInterpPseudoColumnFunc(pNode)) { + isInterpPseudoColumnFunc(pNode) || isForecastPseudoColumnFunc(pNode)) { *((bool*)pContext) = true; return DEAL_RES_END; } @@ -2553,6 +2576,72 @@ static int32_t translateInterpPseudoColumnFunc(STranslateContext* pCxt, SNode** return TSDB_CODE_SUCCESS; } +static int32_t translateForecastFunc(STranslateContext* pCxt, SFunctionNode* pFunc) { + if (!fmIsForecastFunc(pFunc->funcId)) { + return TSDB_CODE_SUCCESS; + } + if (!isSelectStmt(pCxt->pCurrStmt) || SQL_CLAUSE_SELECT != pCxt->currClause) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC); + } + SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt; + SNode* pTable = pSelect->pFromTable; + + if (pSelect->hasAggFuncs || pSelect->hasMultiRowsFunc || pSelect->hasIndefiniteRowsFunc) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC); + } + + if (pSelect->hasForecastFunc && + (FUNC_RETURN_ROWS_INDEFINITE == pSelect->returnRows || pSelect->returnRows != fmGetFuncReturnRows(pFunc))) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC, + "%s ignoring null value options cannot be used when applying to multiple columns", + pFunc->functionName); + } + + if (NULL != pSelect->pWindow || NULL != pSelect->pGroupByList) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC, + "%s function is not supported in window query or group query", pFunc->functionName); + } + if (hasInvalidFuncNesting(pFunc->pParameterList)) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_AGG_FUNC_NESTING); + } + return TSDB_CODE_SUCCESS; +} + +static int32_t translateForecastPseudoColumnFunc(STranslateContext* pCxt, SNode** ppNode, bool* pRewriteToColumn) { + SFunctionNode* pFunc = (SFunctionNode*)(*ppNode); + if (!fmIsForecastPseudoColumnFunc(pFunc->funcId)) { + return TSDB_CODE_SUCCESS; + } + if (!isSelectStmt(pCxt->pCurrStmt)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC, + "%s must be used in select statements", pFunc->functionName); + } + if (pCxt->currClause == SQL_CLAUSE_WHERE) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE, + "%s is not allowed in where clause", pFunc->functionName); + } + + SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt; + SNode* pNode = NULL; + bool bFound = false; + FOREACH(pNode, pSelect->pProjectionList) { + if (nodeType(pNode) == QUERY_NODE_FUNCTION && strcasecmp(((SFunctionNode*)pNode)->functionName, "forecast") == 0) { + bFound = true; + break; + } + } + if (!bFound) { + *pRewriteToColumn = true; + int32_t code = replacePsedudoColumnFuncWithColumn(pCxt, ppNode); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + (void)translateColumn(pCxt, (SColumnNode**)ppNode); + return pCxt->errCode; + } + return TSDB_CODE_SUCCESS; +} + static int32_t translateTimelineFunc(STranslateContext* pCxt, SFunctionNode* pFunc) { if (!fmIsTimelineFunc(pFunc->funcId)) { return TSDB_CODE_SUCCESS; @@ -2738,6 +2827,8 @@ static void setFuncClassification(STranslateContext* pCxt, SFunctionNode* pFunc) pSelect->returnRows = fmGetFuncReturnRows(pFunc); } else if (fmIsInterpFunc(pFunc->funcId)) { pSelect->returnRows = fmGetFuncReturnRows(pFunc); + } else if (fmIsForecastFunc(pFunc->funcId)) { + pSelect->returnRows = fmGetFuncReturnRows(pFunc); } if (fmIsProcessByRowFunc(pFunc->funcId)) { pSelect->lastProcessByRowFuncId = pFunc->funcId; @@ -2755,6 +2846,9 @@ static void setFuncClassification(STranslateContext* pCxt, SFunctionNode* pFunc) pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType); pSelect->hasInterpPseudoColFunc = pSelect->hasInterpPseudoColFunc ? true : fmIsInterpPseudoColumnFunc(pFunc->funcId); + pSelect->hasForecastFunc = pSelect->hasForecastFunc ? true : (FUNCTION_TYPE_FORECAST == pFunc->funcType); + pSelect->hasForecastPseudoColFunc = + pSelect->hasForecastPseudoColFunc ? true : fmIsForecastPseudoColumnFunc(pFunc->funcId); pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType); pSelect->hasLastFunc = pSelect->hasLastFunc ? true : (FUNCTION_TYPE_LAST == pFunc->funcType); pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId); @@ -2946,6 +3040,9 @@ static int32_t translateScanPseudoColumnFunc(STranslateContext* pCxt, SNode** pp return TSDB_CODE_SUCCESS; } if (0 == LIST_LENGTH(pFunc->pParameterList)) { + if (pFunc->funcType == FUNCTION_TYPE_FORECAST_LOW || pFunc->funcType == FUNCTION_TYPE_FORECAST_HIGH) { + return TSDB_CODE_SUCCESS; + } if (!isSelectStmt(pCxt->pCurrStmt) || NULL == ((SSelectStmt*)pCxt->pCurrStmt)->pFromTable) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TBNAME); } @@ -3016,6 +3113,16 @@ static int32_t translateNormalFunction(STranslateContext* pCxt, SNode** ppNode) return code; } } + if (TSDB_CODE_SUCCESS == code) { + code = translateForecastFunc(pCxt, pFunc); + } + if (TSDB_CODE_SUCCESS == code) { + bool bRewriteToColumn = false; + code = translateForecastPseudoColumnFunc(pCxt, ppNode, &bRewriteToColumn); + if (bRewriteToColumn) { + return code; + } + } if (TSDB_CODE_SUCCESS == code) { code = translateTimelineFunc(pCxt, pFunc); } @@ -3759,7 +3866,8 @@ static int32_t resetSelectFuncNumWithoutDup(SSelectStmt* pSelect) { static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) { if (NULL != pSelect->pGroupByList || NULL != pSelect->pWindow || isWindowJoinStmt(pSelect) || - (!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && !pSelect->hasInterpFunc)) { + (!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && !pSelect->hasInterpFunc && + !pSelect->hasForecastFunc)) { return TSDB_CODE_SUCCESS; } if (!pSelect->onlyHasKeepOrderFunc) { @@ -3781,8 +3889,8 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) } static int32_t checkWinJoinAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) { - if (!isWindowJoinStmt(pSelect) || - (!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && !pSelect->hasInterpFunc)) { + if (!isWindowJoinStmt(pSelect) || (!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && + !pSelect->hasInterpFunc && !pSelect->hasForecastFunc)) { return TSDB_CODE_SUCCESS; } if (!pSelect->onlyHasKeepOrderFunc) { @@ -5795,6 +5903,40 @@ static int32_t translateCountWindow(STranslateContext* pCxt, SSelectStmt* pSelec return TSDB_CODE_SUCCESS; } +static int32_t checkAnomalyExpr(STranslateContext* pCxt, SNode* pNode) { + int32_t type = ((SExprNode*)pNode)->resType.type; + if (!IS_MATHABLE_TYPE(type)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ANOMALY_WIN_TYPE, + "ANOMALY_WINDOW only support mathable column"); + } + + if (QUERY_NODE_COLUMN == nodeType(pNode) && COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ANOMALY_WIN_COL, + "ANOMALY_WINDOW not support on tag column"); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t translateAnomalyWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { + if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) && + !isGlobalTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TIMELINE_QUERY, + "ANOMALY_WINDOW requires valid time series input"); + } + + SAnomalyWindowNode* pAnomaly = (SAnomalyWindowNode*)pSelect->pWindow; + int32_t code = checkAnomalyExpr(pCxt, pAnomaly->pExpr); + if (TSDB_CODE_SUCCESS == code) { + if (!taosAnalGetOptStr(pAnomaly->anomalyOpt, "algo", NULL, 0) != 0) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT, + "ANOMALY_WINDOW option should include algo field"); + } + } + + return code; +} + static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { switch (nodeType(pSelect->pWindow)) { case QUERY_NODE_STATE_WINDOW: @@ -5807,6 +5949,8 @@ static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSe return translateEventWindow(pCxt, pSelect); case QUERY_NODE_COUNT_WINDOW: return translateCountWindow(pCxt, pSelect); + case QUERY_NODE_ANOMALY_WINDOW: + return translateAnomalyWindow(pCxt, pSelect); default: break; } @@ -6043,6 +6187,26 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { return code; } +static int32_t translateForecast(STranslateContext* pCxt, SSelectStmt* pSelect) { + if (!pSelect->hasForecastFunc) { + if (pSelect->hasForecastPseudoColFunc) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC, + "Has Forecast pseudo column(s) but missing forcast function"); + } + return TSDB_CODE_SUCCESS; + } + + if ((NULL != pSelect->pFromTable) && (QUERY_NODE_JOIN_TABLE == nodeType(pSelect->pFromTable))) { + SJoinTableNode* pJoinTable = (SJoinTableNode*)pSelect->pFromTable; + if (IS_WINDOW_JOIN(pJoinTable->subType)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE, + "Forecast not supported to be used in WINDOW join"); + } + } + + return 0; +} + static int32_t removeConstantValueFromList(SNodeList** pList) { SNode* pNode = NULL; WHERE_EACH(pNode, *pList) { @@ -6884,6 +7048,9 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect if (TSDB_CODE_SUCCESS == code) { code = translateInterp(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + code = translateForecast(pCxt, pSelect); + } if (TSDB_CODE_SUCCESS == code) { code = appendTsForImplicitTsFunc(pCxt, pSelect); } @@ -7895,6 +8062,19 @@ static int32_t fillCmdSql(STranslateContext* pCxt, int16_t msgType, void* pReq) break; } + case TDMT_MND_CREATE_ANODE: { + FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMCreateAnodeReq, pReq); + break; + } + case TDMT_MND_DROP_ANODE: { + FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMDropAnodeReq, pReq); + break; + } + case TDMT_MND_UPDATE_ANODE: { + FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMUpdateAnodeReq, pReq); + break; + } + case TDMT_MND_CREATE_MNODE: { FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMCreateMnodeReq, pReq); break; @@ -9398,6 +9578,39 @@ static int32_t translateDropUser(STranslateContext* pCxt, SDropUserStmt* pStmt) return code; } +static int32_t translateCreateAnode(STranslateContext* pCxt, SCreateAnodeStmt* pStmt) { + SMCreateAnodeReq createReq = {0}; + createReq.urlLen = strlen(pStmt->url) + 1; + createReq.url = taosMemoryCalloc(createReq.urlLen, 1); + if (createReq.url == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + tstrncpy(createReq.url, pStmt->url, createReq.urlLen); + + int32_t code = buildCmdMsg(pCxt, TDMT_MND_CREATE_ANODE, (FSerializeFunc)tSerializeSMCreateAnodeReq, &createReq); + tFreeSMCreateAnodeReq(&createReq); + return code; +} + +static int32_t translateDropAnode(STranslateContext* pCxt, SDropAnodeStmt* pStmt) { + SMDropAnodeReq dropReq = {0}; + dropReq.anodeId = pStmt->anodeId; + + int32_t code = buildCmdMsg(pCxt, TDMT_MND_DROP_ANODE, (FSerializeFunc)tSerializeSMDropAnodeReq, &dropReq); + tFreeSMDropAnodeReq(&dropReq); + return code; +} + +static int32_t translateUpdateAnode(STranslateContext* pCxt, SUpdateAnodeStmt* pStmt) { + SMUpdateAnodeReq updateReq = {0}; + updateReq.anodeId = pStmt->anodeId; + + int32_t code = buildCmdMsg(pCxt, TDMT_MND_UPDATE_ANODE, (FSerializeFunc)tSerializeSMUpdateAnodeReq, &updateReq); + tFreeSMUpdateAnodeReq(&updateReq); + return code; +} + static int32_t translateCreateDnode(STranslateContext* pCxt, SCreateDnodeStmt* pStmt) { SCreateDnodeReq createReq = {0}; strcpy(createReq.fqdn, pStmt->fqdn); @@ -9820,7 +10033,7 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen } static int32_t checkTopicQuery(STranslateContext* pCxt, SSelectStmt* pSelect) { - if (pSelect->hasAggFuncs || pSelect->hasInterpFunc || pSelect->hasIndefiniteRowsFunc) { + if (pSelect->hasAggFuncs || pSelect->hasForecastFunc || pSelect->hasInterpFunc || pSelect->hasIndefiniteRowsFunc) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TOPIC_QUERY); } return TSDB_CODE_SUCCESS; @@ -10186,7 +10399,8 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm static bool crossTableWithoutAggOper(SSelectStmt* pSelect) { return NULL == pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && - !pSelect->hasInterpFunc && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && + !pSelect->hasInterpFunc && !pSelect->hasForecastFunc && + TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && !hasTbnameFunction(pSelect->pPartitionByList); } @@ -12389,6 +12603,15 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { case QUERY_NODE_ALTER_DNODE_STMT: code = translateAlterDnode(pCxt, (SAlterDnodeStmt*)pNode); break; + case QUERY_NODE_CREATE_ANODE_STMT: + code = translateCreateAnode(pCxt, (SCreateAnodeStmt*)pNode); + break; + case QUERY_NODE_DROP_ANODE_STMT: + code = translateDropAnode(pCxt, (SDropAnodeStmt*)pNode); + break; + case QUERY_NODE_UPDATE_ANODE_STMT: + code = translateUpdateAnode(pCxt, (SUpdateAnodeStmt*)pNode); + break; case QUERY_NODE_CREATE_INDEX_STMT: code = translateCreateIndex(pCxt, (SCreateIndexStmt*)pNode); break; @@ -15749,6 +15972,8 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { case QUERY_NODE_SHOW_MNODES_STMT: case QUERY_NODE_SHOW_MODULES_STMT: case QUERY_NODE_SHOW_QNODES_STMT: + case QUERY_NODE_SHOW_ANODES_STMT: + case QUERY_NODE_SHOW_ANODES_FULL_STMT: case QUERY_NODE_SHOW_FUNCTIONS_STMT: case QUERY_NODE_SHOW_INDEXES_STMT: case QUERY_NODE_SHOW_STREAMS_STMT: