From 8eddf6c2685b389ad833bf1148a2d3b99e089d14 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 9 Oct 2024 17:52:21 +0800 Subject: [PATCH 1/3] feat: analysis function --- source/libs/function/src/builtins.c | 86 ++++++++++++++++++++++++- source/libs/function/src/builtinsimpl.c | 6 ++ source/libs/function/src/functionMgt.c | 9 +++ source/libs/parser/src/parAstCreater.c | 60 +++++++++++++++++ source/libs/parser/src/parUtil.c | 2 + source/libs/planner/src/planOptimizer.c | 2 + source/libs/planner/src/planSpliter.c | 17 ++++- 7 files changed, 180 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 21fb57f5bb..be65f4cf06 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -16,9 +16,10 @@ #include "builtins.h" #include "builtinsimpl.h" #include "cJSON.h" +#include "geomFunc.h" #include "querynodes.h" #include "scalar.h" -#include "geomFunc.h" +#include "tanal.h" #include "taoserror.h" #include "ttime.h" @@ -2078,6 +2079,47 @@ static int32_t translateMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return translateUniqueMode(pFunc, pErrBuf, len, false); } +static int32_t translateForecast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); + if (2 != numOfParams && 1 != numOfParams) { + return invaildFuncParaNumErrMsg(pErrBuf, len, "FORECAST require 1 or 2 parameters"); + } + + uint8_t valType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type; + if (!IS_MATHABLE_TYPE(valType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, "FORECAST only support mathable column"); + } + + if (numOfParams == 2) { + uint8_t optionType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 1))->type; + if (TSDB_DATA_TYPE_BINARY != optionType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, "FORECAST option should be varchar"); + } + + SNode* pOption = nodesListGetNode(pFunc->pParameterList, 1); + if (QUERY_NODE_VALUE != nodeType(pOption)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, "FORECAST option should be value"); + } + + SValueNode* pValue = (SValueNode*)pOption; + if (!taosAnalGetOptStr(pValue->literal, "algo", NULL, 0) != 0) { + return invaildFuncParaValueErrMsg(pErrBuf, len, "FORECAST option should include algo field"); + } + + pValue->notReserved = true; + } + + pFunc->node.resType = (SDataType){.bytes = tDataTypes[valType].bytes, .type = valType}; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateForecastConf(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_FLOAT].bytes, .type = TSDB_DATA_TYPE_FLOAT}; + return TSDB_CODE_SUCCESS; +} + +static EFuncReturnRows forecastEstReturnRows(SFunctionNode* pFunc) { return FUNC_RETURN_ROWS_N; } + static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); if (numOfParams > 2) { @@ -3623,6 +3665,48 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .estimateReturnRowsFunc = diffEstReturnRows, .processFuncByRow = diffFunctionByRow, }, + { + .name = "forecast", + .type = FUNCTION_TYPE_FORECAST, + .classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | + FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, + .translateFunc = translateForecast, + .getEnvFunc = getSelectivityFuncEnv, + .initFunc = functionSetup, + .processFunc = NULL, + .finalizeFunc = NULL, + .estimateReturnRowsFunc = forecastEstReturnRows, + }, + { + .name = "_frowts", + .type = FUNCTION_TYPE_FORECAST_ROWTS, + .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_FORECAST_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, + .translateFunc = translateTimePseudoColumn, + .getEnvFunc = getTimePseudoFuncEnv, + .initFunc = NULL, + .sprocessFunc = NULL, + .finalizeFunc = NULL + }, + { + .name = "_flow", + .type = FUNCTION_TYPE_FORECAST_LOW, + .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_FORECAST_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, + .translateFunc = translateForecastConf, + .getEnvFunc = getForecastConfEnv, + .initFunc = NULL, + .sprocessFunc = NULL, + .finalizeFunc = NULL + }, + { + .name = "_fhigh", + .type = FUNCTION_TYPE_FORECAST_HIGH, + .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_FORECAST_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, + .translateFunc = translateForecastConf, + .getEnvFunc = getForecastConfEnv, + .initFunc = NULL, + .sprocessFunc = NULL, + .finalizeFunc = NULL + }, { .name = "statecount", .type = FUNCTION_TYPE_STATE_COUNT, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index a7e2b28de2..4a8992e1c0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -18,6 +18,7 @@ #include "function.h" #include "query.h" #include "querynodes.h" +#include "tanal.h" #include "tcompare.h" #include "tdatablock.h" #include "tdigest.h" @@ -3578,6 +3579,11 @@ bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool f } } +bool getForecastConfEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(float); + return true; +} + int32_t diffResultIsNull(SqlFunctionCtx* pCtx, SFuncInputRow* pRow){ SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 886772b36c..12dbd88788 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -232,6 +232,15 @@ bool fmIsInterpFunc(int32_t funcId) { bool fmIsInterpPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INTERP_PC_FUNC); } +bool fmIsForecastFunc(int32_t funcId) { + if (funcId < 0 || funcId >= funcMgtBuiltinsNum) { + return false; + } + return FUNCTION_TYPE_FORECAST == funcMgtBuiltins[funcId].type; +} + +bool fmIsForecastPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORECAST_PC_FUNC); } + bool fmIsLastRowFunc(int32_t funcId) { if (funcId < 0 || funcId >= funcMgtBuiltinsNum) { return false; diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index cd3095ede8..684bd24a9c 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -1367,6 +1367,25 @@ _err: return NULL; } +SNode* createAnomalyWindowNode(SAstCreateContext* pCxt, SNode* pExpr, const SToken* pFuncOpt) { + SAnomalyWindowNode* pAnomaly = NULL; + CHECK_PARSER_STATUS(pCxt); + pCxt->errCode = nodesMakeNode(QUERY_NODE_ANOMALY_WINDOW, (SNode**)&pAnomaly); + CHECK_MAKE_NODE(pAnomaly); + pAnomaly->pCol = createPrimaryKeyCol(pCxt, NULL); + CHECK_MAKE_NODE(pAnomaly->pCol); + pAnomaly->pExpr = pExpr; + if (pFuncOpt == NULL) { + tstrncpy(pAnomaly->anomalyOpt, "algo=iqr", TSDB_ANAL_ALGO_OPTION_LEN); + } else { + (void)trimString(pFuncOpt->z, pFuncOpt->n, pAnomaly->anomalyOpt, sizeof(pAnomaly->anomalyOpt)); + } + return (SNode*)pAnomaly; +_err: + nodesDestroyNode((SNode*)pAnomaly); + return NULL; +} + SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding, SNode* pFill) { SIntervalWindowNode* interval = NULL; @@ -2997,6 +3016,47 @@ _err: return NULL; } +SNode* createCreateAnodeStmt(SAstCreateContext* pCxt, const SToken* pUrl) { + CHECK_PARSER_STATUS(pCxt); + SCreateAnodeStmt* pStmt = NULL; + pCxt->errCode = nodesMakeNode(QUERY_NODE_CREATE_ANODE_STMT, (SNode**)&pStmt); + CHECK_MAKE_NODE(pStmt); + (void)trimString(pUrl->z, pUrl->n, pStmt->url, sizeof(pStmt->url)); + return (SNode*)pStmt; +_err: + return NULL; +} + +SNode* createDropAnodeStmt(SAstCreateContext* pCxt, const SToken* pAnode) { + CHECK_PARSER_STATUS(pCxt); + SUpdateAnodeStmt* pStmt = NULL; + pCxt->errCode = nodesMakeNode(QUERY_NODE_DROP_ANODE_STMT, (SNode**)&pStmt); + CHECK_MAKE_NODE(pStmt); + if (NULL != pAnode) { + pStmt->anodeId = taosStr2Int32(pAnode->z, NULL, 10); + } else { + pStmt->anodeId = -1; + } + return (SNode*)pStmt; +_err: + return NULL; +} + +SNode* createUpdateAnodeStmt(SAstCreateContext* pCxt, const SToken* pAnode, bool updateAll) { + CHECK_PARSER_STATUS(pCxt); + SUpdateAnodeStmt* pStmt = NULL; + pCxt->errCode = nodesMakeNode(QUERY_NODE_UPDATE_ANODE_STMT, (SNode**)&pStmt); + CHECK_MAKE_NODE(pStmt); + if (NULL != pAnode) { + pStmt->anodeId = taosStr2Int32(pAnode->z, NULL, 10); + } else { + pStmt->anodeId = -1; + } + return (SNode*)pStmt; +_err: + return NULL; +} + SNode* createEncryptKeyStmt(SAstCreateContext* pCxt, const SToken* pValue) { SToken config; config.type = TK_NK_STRING; diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 1ce8b04324..189afdfcd3 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -185,6 +185,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "%s is not supported in system table query"; case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE: return "Invalid usage of RANGE clause, EVERY clause or FILL clause"; + case TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE: + return "Invalid usage of forecast clause"; case TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN: return "No valid function in window query"; case TSDB_CODE_PAR_INVALID_OPTR_USAGE: diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 1bcec86385..3b4e835465 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2380,6 +2380,8 @@ static bool sortPriKeyOptHasUnsupportedPkFunc(SLogicNode* pLogicNode, EOrder sor case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: pFuncList = ((SInterpFuncLogicNode*)pLogicNode)->pFuncs; break; + case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC: + pFuncList = ((SForecastFuncLogicNode*)pLogicNode)->pFuncs; default: break; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 706394507a..755dd8739b 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -939,6 +939,18 @@ static int32_t stbSplSplitCount(SSplitContext* pCxt, SStableSplitInfo* pInfo) { } } +static int32_t stbSplSplitAnomalyForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + return TSDB_CODE_PLAN_INTERNAL_ERROR; +} + +static int32_t stbSplSplitAnomaly(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + if (pCxt->pPlanCxt->streamQuery) { + return stbSplSplitAnomalyForStream(pCxt, pInfo); + } else { + return stbSplSplitSessionOrStateForBatch(pCxt, pInfo); + } +} + static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) { case WINDOW_TYPE_INTERVAL: @@ -951,6 +963,8 @@ static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitI return stbSplSplitEvent(pCxt, pInfo); case WINDOW_TYPE_COUNT: return stbSplSplitCount(pCxt, pInfo); + case WINDOW_TYPE_ANOMALY: + return stbSplSplitAnomaly(pCxt, pInfo); default: break; } @@ -2000,7 +2014,8 @@ typedef struct SQnodeSplitInfo { static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, SQnodeSplitInfo* pInfo) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent && - QUERY_NODE_LOGIC_PLAN_INTERP_FUNC != nodeType(pNode->pParent) && ((SScanLogicNode*)pNode)->scanSeq[0] <= 1 && + QUERY_NODE_LOGIC_PLAN_INTERP_FUNC != nodeType(pNode->pParent) && + QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC != nodeType(pNode->pParent) && ((SScanLogicNode*)pNode)->scanSeq[0] <= 1 && ((SScanLogicNode*)pNode)->scanSeq[1] <= 1) { pInfo->pSplitNode = pNode; pInfo->pSubplan = pSubplan; From 234bf4b332a6148c69a0715910bedcb4da00ae86 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 9 Oct 2024 17:58:46 +0800 Subject: [PATCH 2/3] feat: analysis plan --- source/libs/nodes/src/nodesCloneFuncs.c | 21 +++++ source/libs/nodes/src/nodesMsgFuncs.c | 96 ++++++++++++++++++++++ source/libs/parser/src/parAstParser.c | 20 +++++ source/libs/parser/src/parAuthenticator.c | 2 + source/libs/planner/src/planLogicCreater.c | 86 +++++++++++++++++++ source/libs/planner/src/planPhysiCreater.c | 95 +++++++++++++++++++++ source/libs/planner/src/planUtil.c | 23 ++++++ 7 files changed, 343 insertions(+) diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 5db8863311..b77ffb8d2c 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -368,6 +368,13 @@ static int32_t countWindowNodeCopy(const SCountWindowNode* pSrc, SCountWindowNod return TSDB_CODE_SUCCESS; } +static int32_t anomalyWindowNodeCopy(const SAnomalyWindowNode* pSrc, SAnomalyWindowNode* pDst) { + CLONE_NODE_FIELD(pCol); + CLONE_NODE_FIELD(pExpr); + COPY_CHAR_ARRAY_FIELD(anomalyOpt); + return TSDB_CODE_SUCCESS; +} + static int32_t sessionWindowNodeCopy(const SSessionWindowNode* pSrc, SSessionWindowNode* pDst) { CLONE_NODE_FIELD_EX(pCol, SColumnNode*); CLONE_NODE_FIELD_EX(pGap, SValueNode*); @@ -622,6 +629,8 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p COPY_SCALAR_FIELD(windowAlgo); COPY_SCALAR_FIELD(windowCount); COPY_SCALAR_FIELD(windowSliding); + CLONE_NODE_FIELD(pAnomalyExpr); + COPY_CHAR_ARRAY_FIELD(anomalyOpt); return TSDB_CODE_SUCCESS; } @@ -674,6 +683,12 @@ static int32_t logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFunc return TSDB_CODE_SUCCESS; } +static int32_t logicForecastFuncCopy(const SForecastFuncLogicNode* pSrc, SForecastFuncLogicNode* pDst) { + COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); + CLONE_NODE_LIST_FIELD(pFuncs); + return TSDB_CODE_SUCCESS; +} + static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCacheLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_SCALAR_FIELD(grpColsMayBeNull); @@ -937,6 +952,9 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) { case QUERY_NODE_COUNT_WINDOW: code = countWindowNodeCopy((const SCountWindowNode*)pNode, (SCountWindowNode*)pDst); break; + case QUERY_NODE_ANOMALY_WINDOW: + code = anomalyWindowNodeCopy((const SAnomalyWindowNode*)pNode, (SAnomalyWindowNode*)pDst); + break; case QUERY_NODE_SESSION_WINDOW: code = sessionWindowNodeCopy((const SSessionWindowNode*)pNode, (SSessionWindowNode*)pDst); break; @@ -1021,6 +1039,9 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) { case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: code = logicInterpFuncCopy((const SInterpFuncLogicNode*)pNode, (SInterpFuncLogicNode*)pDst); break; + case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC: + code = logicForecastFuncCopy((const SForecastFuncLogicNode*)pNode, (SForecastFuncLogicNode*)pDst); + break; case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: code = logicGroupCacheCopy((const SGroupCacheLogicNode*)pNode, (SGroupCacheLogicNode*)pDst); break; diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 581c6222d2..3d8a57363b 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -3539,6 +3539,46 @@ static int32_t msgToPhysiCountWindowNode(STlvDecoder* pDecoder, void* pObj) { return code; } +enum { PHY_ANOMALY_CODE_WINDOW = 1, PHY_ANOMALY_CODE_KEY, PHY_ANOMALY_CODE_WINDOW_OPTION }; + +static int32_t physiAnomalyWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SAnomalyWindowPhysiNode* pNode = (const SAnomalyWindowPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_ANOMALY_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_ANOMALY_CODE_KEY, nodeToMsg, pNode->pAnomalyKey); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, PHY_ANOMALY_CODE_WINDOW_OPTION, pNode->anomalyOpt); + } + + return code; +} + +static int32_t msgToPhysiAnomalyWindowNode(STlvDecoder* pDecoder, void* pObj) { + SAnomalyWindowPhysiNode* pNode = (SAnomalyWindowPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_ANOMALY_CODE_WINDOW: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window); + break; + case PHY_ANOMALY_CODE_KEY: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pAnomalyKey); + break; + case PHY_ANOMALY_CODE_WINDOW_OPTION: + code = tlvDecodeCStr(pTlv, pNode->anomalyOpt, sizeof(pNode->anomalyOpt)); + break; + default: + break; + } + } + + return code; +} + enum { PHY_PARTITION_CODE_BASE_NODE = 1, PHY_PARTITION_CODE_EXPR, @@ -3770,6 +3810,50 @@ static int32_t msgToPhysiInterpFuncNode(STlvDecoder* pDecoder, void* pObj) { return code; } +enum { + PHY_FORECAST_FUNC_CODE_BASE_NODE = 1, + PHY_FORECAST_FUNC_CODE_EXPR, + PHY_FORECAST_FUNC_CODE_FUNCS, +}; + +static int32_t physiForecastFuncNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SForecastFuncPhysiNode* pNode = (const SForecastFuncPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_FORECAST_FUNC_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_FORECAST_FUNC_CODE_EXPR, nodeListToMsg, pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_FORECAST_FUNC_CODE_FUNCS, nodeListToMsg, pNode->pFuncs); + } + + return code; +} + +static int32_t msgToPhysiForecastFuncNode(STlvDecoder* pDecoder, void* pObj) { + SForecastFuncPhysiNode* pNode = (SForecastFuncPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_FORECAST_FUNC_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_FORECAST_FUNC_CODE_EXPR: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pExprs); + break; + case PHY_FORECAST_FUNC_CODE_FUNCS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pFuncs); + break; + default: + break; + } + } + + return code; +} + enum { PHY_DATA_SINK_CODE_INPUT_DESC = 1 }; static int32_t physicDataSinkNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -4536,6 +4620,9 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT: code = physiCountWindowNodeToMsg(pObj, pEncoder); break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY: + code = physiAnomalyWindowNodeToMsg(pObj, pEncoder); + break; case QUERY_NODE_PHYSICAL_PLAN_PARTITION: code = physiPartitionNodeToMsg(pObj, pEncoder); break; @@ -4548,6 +4635,9 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: code = physiInterpFuncNodeToMsg(pObj, pEncoder); break; + case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC: + code = physiForecastFuncNodeToMsg(pObj, pEncoder); + break; case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: code = physiDispatchNodeToMsg(pObj, pEncoder); break; @@ -4698,6 +4788,9 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT: code = msgToPhysiCountWindowNode(pDecoder, pObj); break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY: + code = msgToPhysiAnomalyWindowNode(pDecoder, pObj); + break; case QUERY_NODE_PHYSICAL_PLAN_PARTITION: code = msgToPhysiPartitionNode(pDecoder, pObj); break; @@ -4710,6 +4803,9 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: code = msgToPhysiInterpFuncNode(pDecoder, pObj); break; + case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC: + code = msgToPhysiForecastFuncNode(pDecoder, pObj); + break; case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: code = msgToPhysiDispatchNode(pDecoder, pObj); break; diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 10d9b19e7f..eecc04658b 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -555,6 +555,22 @@ static int32_t collectMetaKeyFromShowSnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* return TSDB_CODE_SUCCESS; } +static int32_t collectMetaKeyFromShowAnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { + if (pCxt->pParseCxt->enableSysInfo) { + return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_ANODES, + pCxt->pMetaCache); + } + return TSDB_CODE_SUCCESS; +} + +static int32_t collectMetaKeyFromShowAnodesFull(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { + if (pCxt->pParseCxt->enableSysInfo) { + return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_ANODES_FULL, + pCxt->pMetaCache); + } + return TSDB_CODE_SUCCESS; +} + static int32_t collectMetaKeyFromShowBnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { if (pCxt->pParseCxt->enableSysInfo) { return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_BNODES, @@ -983,6 +999,10 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromShowQnodes(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_SNODES_STMT: return collectMetaKeyFromShowSnodes(pCxt, (SShowStmt*)pStmt); + case QUERY_NODE_SHOW_ANODES_STMT: + return collectMetaKeyFromShowAnodes(pCxt, (SShowStmt*)pStmt); + case QUERY_NODE_SHOW_ANODES_FULL_STMT: + return collectMetaKeyFromShowAnodesFull(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_BNODES_STMT: return collectMetaKeyFromShowBnodes(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_ARBGROUPS_STMT: diff --git a/source/libs/parser/src/parAuthenticator.c b/source/libs/parser/src/parAuthenticator.c index 0eb07d8143..5ae17a7647 100644 --- a/source/libs/parser/src/parAuthenticator.c +++ b/source/libs/parser/src/parAuthenticator.c @@ -358,6 +358,8 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) { case QUERY_NODE_SHOW_MNODES_STMT: case QUERY_NODE_SHOW_MODULES_STMT: case QUERY_NODE_SHOW_QNODES_STMT: + case QUERY_NODE_SHOW_ANODES_STMT: + case QUERY_NODE_SHOW_ANODES_FULL_STMT: case QUERY_NODE_SHOW_SNODES_STMT: case QUERY_NODE_SHOW_BNODES_STMT: case QUERY_NODE_SHOW_CLUSTER_STMT: diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 6ad30eccb8..40cd415cb9 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -973,6 +973,45 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p return code; } +static bool isForecastFunc(int32_t funcId) { + return fmIsForecastFunc(funcId) || fmIsForecastPseudoColumnFunc(funcId) || fmIsGroupKeyFunc(funcId) || fmisSelectGroupConstValueFunc(funcId); +} + +static int32_t createForecastFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { + if (!pSelect->hasForecastFunc) { + return TSDB_CODE_SUCCESS; + } + + SForecastFuncLogicNode* pForecastFunc = NULL; + int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC, (SNode**)&pForecastFunc); + if (NULL == pForecastFunc) { + return code; + } + + pForecastFunc->node.groupAction = getGroupAction(pCxt, pSelect); + pForecastFunc->node.requireDataOrder = getRequireDataOrder(true, pSelect); + pForecastFunc->node.resultDataOrder = pForecastFunc->node.requireDataOrder; + + // interp functions and _group_key functions + code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, NULL, isForecastFunc, &pForecastFunc->pFuncs); + if (TSDB_CODE_SUCCESS == code) { + code = rewriteExprsForSelect(pForecastFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT, NULL); + } + + // set the output + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExprs(pForecastFunc->pFuncs, &pForecastFunc->node.pTargets); + } + + if (TSDB_CODE_SUCCESS == code) { + *pLogicNode = (SLogicNode*)pForecastFunc; + } else { + nodesDestroyNode((SNode*)pForecastFunc); + } + + return code; +} + static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow, SLogicNode** pLogicNode) { if (pCxt->pPlanCxt->streamQuery) { @@ -1174,6 +1213,48 @@ static int32_t createWindowLogicNodeByCount(SLogicPlanContext* pCxt, SCountWindo return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode); } +static int32_t createWindowLogicNodeByAnomaly(SLogicPlanContext* pCxt, SAnomalyWindowNode* pAnomaly, + SSelectStmt* pSelect, SLogicNode** pLogicNode) { + SWindowLogicNode* pWindow = NULL; + int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW, (SNode**)&pWindow); + if (NULL == pWindow) { + return code; + } + + pWindow->winType = WINDOW_TYPE_ANOMALY; + pWindow->node.groupAction = getGroupAction(pCxt, pSelect); + pWindow->node.requireDataOrder = + pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : getRequireDataOrder(true, pSelect); + pWindow->node.resultDataOrder = + pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_GLOBAL : pWindow->node.requireDataOrder; + + pWindow->pAnomalyExpr = NULL; + code = nodesCloneNode(pAnomaly->pExpr, &pWindow->pAnomalyExpr); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode((SNode*)pWindow); + return code; + } + + tstrncpy(pWindow->anomalyOpt, pAnomaly->anomalyOpt, sizeof(pWindow->anomalyOpt)); + + pWindow->pTspk = NULL; + code = nodesCloneNode(pAnomaly->pCol, &pWindow->pTspk); + if (NULL == pWindow->pTspk) { + nodesDestroyNode((SNode*)pWindow); + return code; + } + + // rewrite the expression in subsequent clauses + code = rewriteExprForSelect(pWindow->pAnomalyExpr, pSelect, SQL_CLAUSE_WINDOW); + if (TSDB_CODE_SUCCESS == code) { + code = createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode); + } else { + nodesDestroyNode((SNode*)pWindow); + } + + return code; +} + static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { if (NULL == pSelect->pWindow) { return TSDB_CODE_SUCCESS; @@ -1189,6 +1270,8 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele return createWindowLogicNodeByEvent(pCxt, (SEventWindowNode*)pSelect->pWindow, pSelect, pLogicNode); case QUERY_NODE_COUNT_WINDOW: return createWindowLogicNodeByCount(pCxt, (SCountWindowNode*)pSelect->pWindow, pSelect, pLogicNode); + case QUERY_NODE_ANOMALY_WINDOW: + return createWindowLogicNodeByAnomaly(pCxt, (SAnomalyWindowNode*)pSelect->pWindow, pSelect, pLogicNode); default: break; } @@ -1600,6 +1683,9 @@ static int32_t createSelectFromLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p if (TSDB_CODE_SUCCESS == code) { code = createSelectRootLogicNode(pCxt, pSelect, createInterpFuncLogicNode, &pRoot); } + if (TSDB_CODE_SUCCESS == code) { + code = createSelectRootLogicNode(pCxt, pSelect, createForecastFuncLogicNode, &pRoot); + } if (TSDB_CODE_SUCCESS == code) { code = createSelectRootLogicNode(pCxt, pSelect, createDistinctLogicNode, &pRoot); } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index a00eb05482..a88909947b 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1990,6 +1990,50 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh return code; } +static int32_t createForecastFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, + SForecastFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) { + SForecastFuncPhysiNode* pForecastFunc = + (SForecastFuncPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC); + if (NULL == pForecastFunc) { + return terrno; + } + + SNodeList* pPrecalcExprs = NULL; + SNodeList* pFuncs = NULL; + int32_t code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs); + + SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); + // push down expression to pOutputDataBlockDesc of child node + if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) { + code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pForecastFunc->pExprs); + if (TSDB_CODE_SUCCESS == code) { + code = pushdownDataBlockSlots(pCxt, pForecastFunc->pExprs, pChildTupe); + } + } + + if (TSDB_CODE_SUCCESS == code) { + code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pForecastFunc->pFuncs); + if (TSDB_CODE_SUCCESS == code) { + code = addDataBlockSlots(pCxt, pForecastFunc->pFuncs, pForecastFunc->node.pOutputDataBlockDesc); + } + } + + if (TSDB_CODE_SUCCESS == code) { + code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pForecastFunc); + } + + if (TSDB_CODE_SUCCESS == code) { + *pPhyNode = (SPhysiNode*)pForecastFunc; + } else { + nodesDestroyNode((SNode*)pForecastFunc); + } + + nodesDestroyList(pPrecalcExprs); + nodesDestroyList(pFuncs); + + return code; +} + static bool projectCanMergeDataBlock(SProjectLogicNode* pProject) { if (GROUP_ACTION_KEEP == pProject->node.groupAction) { return false; @@ -2325,6 +2369,53 @@ static int32_t createCountWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC return code; } +static int32_t createAnomalyWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, + SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { + SAnomalyWindowPhysiNode* pAnomaly = (SAnomalyWindowPhysiNode*)makePhysiNode( + pCxt, (SLogicNode*)pWindowLogicNode, + (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY : QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY)); + if (NULL == pAnomaly) { + return terrno; + } + + SNodeList* pPrecalcExprs = NULL; + SNode* pAnomalyKey = NULL; + int32_t code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pAnomalyExpr, &pPrecalcExprs, &pAnomalyKey); + + SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); + // push down expression to pOutputDataBlockDesc of child node + if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) { + code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAnomaly->window.pExprs); + if (TSDB_CODE_SUCCESS == code) { + code = addDataBlockSlots(pCxt, pAnomaly->window.pExprs, pChildTupe); + } + } + + if (TSDB_CODE_SUCCESS == code) { + code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pAnomalyKey, &pAnomaly->pAnomalyKey); + // if (TSDB_CODE_SUCCESS == code) { + // code = addDataBlockSlot(pCxt, &pAnomaly->pAnomalyKey, pAnomaly->window.node.pOutputDataBlockDesc); + // } + } + + tstrncpy(pAnomaly->anomalyOpt, pWindowLogicNode->anomalyOpt, sizeof(pAnomaly->anomalyOpt)); + + if (TSDB_CODE_SUCCESS == code) { + code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pAnomaly->window, pWindowLogicNode); + } + + if (TSDB_CODE_SUCCESS == code) { + *pPhyNode = (SPhysiNode*)pAnomaly; + } else { + nodesDestroyNode((SNode*)pAnomaly); + } + + nodesDestroyList(pPrecalcExprs); + nodesDestroyNode(pAnomalyKey); + + return code; +} + static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { switch (pWindowLogicNode->winType) { @@ -2338,6 +2429,8 @@ static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildr return createEventWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode); case WINDOW_TYPE_COUNT: return createCountWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode); + case WINDOW_TYPE_ANOMALY: + return createAnomalyWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode); default: break; } @@ -2652,6 +2745,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode return createIndefRowsFuncPhysiNode(pCxt, pChildren, (SIndefRowsFuncLogicNode*)pLogicNode, pPhyNode); case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: return createInterpFuncPhysiNode(pCxt, pChildren, (SInterpFuncLogicNode*)pLogicNode, pPhyNode); + case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC: + return createForecastFuncPhysiNode(pCxt, pChildren, (SForecastFuncLogicNode*)pLogicNode, pPhyNode); case QUERY_NODE_LOGIC_PLAN_MERGE: return createMergePhysiNode(pCxt, pChildren, (SMergeLogicNode*)pLogicNode, pPhyNode); case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index b5f0bc50e8..ed597936c8 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -256,6 +256,15 @@ static int32_t adjustCountDataRequirement(SWindowLogicNode* pWindow, EDataOrderL return TSDB_CODE_SUCCESS; } +static int32_t adjustAnomalyDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + if (requirement <= pWindow->node.resultDataOrder) { + return TSDB_CODE_SUCCESS; + } + pWindow->node.resultDataOrder = requirement; + pWindow->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { switch (pWindow->winType) { case WINDOW_TYPE_INTERVAL: @@ -268,6 +277,8 @@ static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrder return adjustEventDataRequirement(pWindow, requirement); case WINDOW_TYPE_COUNT: return adjustCountDataRequirement(pWindow, requirement); + case WINDOW_TYPE_ANOMALY: + return adjustAnomalyDataRequirement(pWindow, requirement); default: break; } @@ -318,6 +329,15 @@ static int32_t adjustInterpDataRequirement(SInterpFuncLogicNode* pInterp, EDataO return TSDB_CODE_SUCCESS; } +static int32_t adjustForecastDataRequirement(SForecastFuncLogicNode* pForecast, EDataOrderLevel requirement) { + if (requirement <= pForecast->node.requireDataOrder) { + return TSDB_CODE_SUCCESS; + } + pForecast->node.resultDataOrder = requirement; + pForecast->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requirement) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pNode)) { @@ -355,6 +375,9 @@ int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requir case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: code = adjustInterpDataRequirement((SInterpFuncLogicNode*)pNode, requirement); break; + case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC: + code = adjustForecastDataRequirement((SForecastFuncLogicNode*)pNode, requirement); + break; default: break; } From 44477014e23a39dbd37754168a2f042c567a91af Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 10 Oct 2024 21:01:43 +0800 Subject: [PATCH 3/3] feat: unitest error --- source/libs/function/src/builtins.c | 42 ----------------------------- 1 file changed, 42 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 30f51ab0df..9364c313ca 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -3665,48 +3665,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .estimateReturnRowsFunc = diffEstReturnRows, .processFuncByRow = diffFunctionByRow, }, - { - .name = "forecast", - .type = FUNCTION_TYPE_FORECAST, - .classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, - .translateFunc = translateForecast, - .getEnvFunc = getSelectivityFuncEnv, - .initFunc = functionSetup, - .processFunc = NULL, - .finalizeFunc = NULL, - .estimateReturnRowsFunc = forecastEstReturnRows, - }, - { - .name = "_frowts", - .type = FUNCTION_TYPE_FORECAST_ROWTS, - .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_FORECAST_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, - .translateFunc = translateTimePseudoColumn, - .getEnvFunc = getTimePseudoFuncEnv, - .initFunc = NULL, - .sprocessFunc = NULL, - .finalizeFunc = NULL - }, - { - .name = "_flow", - .type = FUNCTION_TYPE_FORECAST_LOW, - .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_FORECAST_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, - .translateFunc = translateForecastConf, - .getEnvFunc = getForecastConfEnv, - .initFunc = NULL, - .sprocessFunc = NULL, - .finalizeFunc = NULL - }, - { - .name = "_fhigh", - .type = FUNCTION_TYPE_FORECAST_HIGH, - .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_FORECAST_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, - .translateFunc = translateForecastConf, - .getEnvFunc = getForecastConfEnv, - .initFunc = NULL, - .sprocessFunc = NULL, - .finalizeFunc = NULL - }, { .name = "statecount", .type = FUNCTION_TYPE_STATE_COUNT,