diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 5db8863311..b77ffb8d2c 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -368,6 +368,13 @@ static int32_t countWindowNodeCopy(const SCountWindowNode* pSrc, SCountWindowNod return TSDB_CODE_SUCCESS; } +static int32_t anomalyWindowNodeCopy(const SAnomalyWindowNode* pSrc, SAnomalyWindowNode* pDst) { + CLONE_NODE_FIELD(pCol); + CLONE_NODE_FIELD(pExpr); + COPY_CHAR_ARRAY_FIELD(anomalyOpt); + return TSDB_CODE_SUCCESS; +} + static int32_t sessionWindowNodeCopy(const SSessionWindowNode* pSrc, SSessionWindowNode* pDst) { CLONE_NODE_FIELD_EX(pCol, SColumnNode*); CLONE_NODE_FIELD_EX(pGap, SValueNode*); @@ -622,6 +629,8 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p COPY_SCALAR_FIELD(windowAlgo); COPY_SCALAR_FIELD(windowCount); COPY_SCALAR_FIELD(windowSliding); + CLONE_NODE_FIELD(pAnomalyExpr); + COPY_CHAR_ARRAY_FIELD(anomalyOpt); return TSDB_CODE_SUCCESS; } @@ -674,6 +683,12 @@ static int32_t logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFunc return TSDB_CODE_SUCCESS; } +static int32_t logicForecastFuncCopy(const SForecastFuncLogicNode* pSrc, SForecastFuncLogicNode* pDst) { + COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); + CLONE_NODE_LIST_FIELD(pFuncs); + return TSDB_CODE_SUCCESS; +} + static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCacheLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_SCALAR_FIELD(grpColsMayBeNull); @@ -937,6 +952,9 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) { case QUERY_NODE_COUNT_WINDOW: code = countWindowNodeCopy((const SCountWindowNode*)pNode, (SCountWindowNode*)pDst); break; + case QUERY_NODE_ANOMALY_WINDOW: + code = anomalyWindowNodeCopy((const SAnomalyWindowNode*)pNode, (SAnomalyWindowNode*)pDst); + break; case QUERY_NODE_SESSION_WINDOW: code = sessionWindowNodeCopy((const SSessionWindowNode*)pNode, (SSessionWindowNode*)pDst); break; @@ -1021,6 +1039,9 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) { case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: code = logicInterpFuncCopy((const SInterpFuncLogicNode*)pNode, (SInterpFuncLogicNode*)pDst); break; + case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC: + code = logicForecastFuncCopy((const SForecastFuncLogicNode*)pNode, (SForecastFuncLogicNode*)pDst); + break; case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: code = logicGroupCacheCopy((const SGroupCacheLogicNode*)pNode, (SGroupCacheLogicNode*)pDst); break; diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 581c6222d2..3d8a57363b 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -3539,6 +3539,46 @@ static int32_t msgToPhysiCountWindowNode(STlvDecoder* pDecoder, void* pObj) { return code; } +enum { PHY_ANOMALY_CODE_WINDOW = 1, PHY_ANOMALY_CODE_KEY, PHY_ANOMALY_CODE_WINDOW_OPTION }; + +static int32_t physiAnomalyWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SAnomalyWindowPhysiNode* pNode = (const SAnomalyWindowPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_ANOMALY_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_ANOMALY_CODE_KEY, nodeToMsg, pNode->pAnomalyKey); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, PHY_ANOMALY_CODE_WINDOW_OPTION, pNode->anomalyOpt); + } + + return code; +} + +static int32_t msgToPhysiAnomalyWindowNode(STlvDecoder* pDecoder, void* pObj) { + SAnomalyWindowPhysiNode* pNode = (SAnomalyWindowPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_ANOMALY_CODE_WINDOW: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window); + break; + case PHY_ANOMALY_CODE_KEY: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pAnomalyKey); + break; + case PHY_ANOMALY_CODE_WINDOW_OPTION: + code = tlvDecodeCStr(pTlv, pNode->anomalyOpt, sizeof(pNode->anomalyOpt)); + break; + default: + break; + } + } + + return code; +} + enum { PHY_PARTITION_CODE_BASE_NODE = 1, PHY_PARTITION_CODE_EXPR, @@ -3770,6 +3810,50 @@ static int32_t msgToPhysiInterpFuncNode(STlvDecoder* pDecoder, void* pObj) { return code; } +enum { + PHY_FORECAST_FUNC_CODE_BASE_NODE = 1, + PHY_FORECAST_FUNC_CODE_EXPR, + PHY_FORECAST_FUNC_CODE_FUNCS, +}; + +static int32_t physiForecastFuncNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SForecastFuncPhysiNode* pNode = (const SForecastFuncPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_FORECAST_FUNC_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_FORECAST_FUNC_CODE_EXPR, nodeListToMsg, pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_FORECAST_FUNC_CODE_FUNCS, nodeListToMsg, pNode->pFuncs); + } + + return code; +} + +static int32_t msgToPhysiForecastFuncNode(STlvDecoder* pDecoder, void* pObj) { + SForecastFuncPhysiNode* pNode = (SForecastFuncPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_FORECAST_FUNC_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_FORECAST_FUNC_CODE_EXPR: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pExprs); + break; + case PHY_FORECAST_FUNC_CODE_FUNCS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pFuncs); + break; + default: + break; + } + } + + return code; +} + enum { PHY_DATA_SINK_CODE_INPUT_DESC = 1 }; static int32_t physicDataSinkNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -4536,6 +4620,9 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT: code = physiCountWindowNodeToMsg(pObj, pEncoder); break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY: + code = physiAnomalyWindowNodeToMsg(pObj, pEncoder); + break; case QUERY_NODE_PHYSICAL_PLAN_PARTITION: code = physiPartitionNodeToMsg(pObj, pEncoder); break; @@ -4548,6 +4635,9 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: code = physiInterpFuncNodeToMsg(pObj, pEncoder); break; + case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC: + code = physiForecastFuncNodeToMsg(pObj, pEncoder); + break; case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: code = physiDispatchNodeToMsg(pObj, pEncoder); break; @@ -4698,6 +4788,9 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT: code = msgToPhysiCountWindowNode(pDecoder, pObj); break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY: + code = msgToPhysiAnomalyWindowNode(pDecoder, pObj); + break; case QUERY_NODE_PHYSICAL_PLAN_PARTITION: code = msgToPhysiPartitionNode(pDecoder, pObj); break; @@ -4710,6 +4803,9 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: code = msgToPhysiInterpFuncNode(pDecoder, pObj); break; + case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC: + code = msgToPhysiForecastFuncNode(pDecoder, pObj); + break; case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: code = msgToPhysiDispatchNode(pDecoder, pObj); break; diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 10d9b19e7f..eecc04658b 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -555,6 +555,22 @@ static int32_t collectMetaKeyFromShowSnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* return TSDB_CODE_SUCCESS; } +static int32_t collectMetaKeyFromShowAnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { + if (pCxt->pParseCxt->enableSysInfo) { + return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_ANODES, + pCxt->pMetaCache); + } + return TSDB_CODE_SUCCESS; +} + +static int32_t collectMetaKeyFromShowAnodesFull(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { + if (pCxt->pParseCxt->enableSysInfo) { + return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_ANODES_FULL, + pCxt->pMetaCache); + } + return TSDB_CODE_SUCCESS; +} + static int32_t collectMetaKeyFromShowBnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { if (pCxt->pParseCxt->enableSysInfo) { return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_BNODES, @@ -983,6 +999,10 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromShowQnodes(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_SNODES_STMT: return collectMetaKeyFromShowSnodes(pCxt, (SShowStmt*)pStmt); + case QUERY_NODE_SHOW_ANODES_STMT: + return collectMetaKeyFromShowAnodes(pCxt, (SShowStmt*)pStmt); + case QUERY_NODE_SHOW_ANODES_FULL_STMT: + return collectMetaKeyFromShowAnodesFull(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_BNODES_STMT: return collectMetaKeyFromShowBnodes(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_ARBGROUPS_STMT: diff --git a/source/libs/parser/src/parAuthenticator.c b/source/libs/parser/src/parAuthenticator.c index 0eb07d8143..5ae17a7647 100644 --- a/source/libs/parser/src/parAuthenticator.c +++ b/source/libs/parser/src/parAuthenticator.c @@ -358,6 +358,8 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) { case QUERY_NODE_SHOW_MNODES_STMT: case QUERY_NODE_SHOW_MODULES_STMT: case QUERY_NODE_SHOW_QNODES_STMT: + case QUERY_NODE_SHOW_ANODES_STMT: + case QUERY_NODE_SHOW_ANODES_FULL_STMT: case QUERY_NODE_SHOW_SNODES_STMT: case QUERY_NODE_SHOW_BNODES_STMT: case QUERY_NODE_SHOW_CLUSTER_STMT: diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 6ad30eccb8..40cd415cb9 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -973,6 +973,45 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p return code; } +static bool isForecastFunc(int32_t funcId) { + return fmIsForecastFunc(funcId) || fmIsForecastPseudoColumnFunc(funcId) || fmIsGroupKeyFunc(funcId) || fmisSelectGroupConstValueFunc(funcId); +} + +static int32_t createForecastFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { + if (!pSelect->hasForecastFunc) { + return TSDB_CODE_SUCCESS; + } + + SForecastFuncLogicNode* pForecastFunc = NULL; + int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC, (SNode**)&pForecastFunc); + if (NULL == pForecastFunc) { + return code; + } + + pForecastFunc->node.groupAction = getGroupAction(pCxt, pSelect); + pForecastFunc->node.requireDataOrder = getRequireDataOrder(true, pSelect); + pForecastFunc->node.resultDataOrder = pForecastFunc->node.requireDataOrder; + + // interp functions and _group_key functions + code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, NULL, isForecastFunc, &pForecastFunc->pFuncs); + if (TSDB_CODE_SUCCESS == code) { + code = rewriteExprsForSelect(pForecastFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT, NULL); + } + + // set the output + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExprs(pForecastFunc->pFuncs, &pForecastFunc->node.pTargets); + } + + if (TSDB_CODE_SUCCESS == code) { + *pLogicNode = (SLogicNode*)pForecastFunc; + } else { + nodesDestroyNode((SNode*)pForecastFunc); + } + + return code; +} + static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow, SLogicNode** pLogicNode) { if (pCxt->pPlanCxt->streamQuery) { @@ -1174,6 +1213,48 @@ static int32_t createWindowLogicNodeByCount(SLogicPlanContext* pCxt, SCountWindo return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode); } +static int32_t createWindowLogicNodeByAnomaly(SLogicPlanContext* pCxt, SAnomalyWindowNode* pAnomaly, + SSelectStmt* pSelect, SLogicNode** pLogicNode) { + SWindowLogicNode* pWindow = NULL; + int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW, (SNode**)&pWindow); + if (NULL == pWindow) { + return code; + } + + pWindow->winType = WINDOW_TYPE_ANOMALY; + pWindow->node.groupAction = getGroupAction(pCxt, pSelect); + pWindow->node.requireDataOrder = + pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : getRequireDataOrder(true, pSelect); + pWindow->node.resultDataOrder = + pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_GLOBAL : pWindow->node.requireDataOrder; + + pWindow->pAnomalyExpr = NULL; + code = nodesCloneNode(pAnomaly->pExpr, &pWindow->pAnomalyExpr); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode((SNode*)pWindow); + return code; + } + + tstrncpy(pWindow->anomalyOpt, pAnomaly->anomalyOpt, sizeof(pWindow->anomalyOpt)); + + pWindow->pTspk = NULL; + code = nodesCloneNode(pAnomaly->pCol, &pWindow->pTspk); + if (NULL == pWindow->pTspk) { + nodesDestroyNode((SNode*)pWindow); + return code; + } + + // rewrite the expression in subsequent clauses + code = rewriteExprForSelect(pWindow->pAnomalyExpr, pSelect, SQL_CLAUSE_WINDOW); + if (TSDB_CODE_SUCCESS == code) { + code = createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode); + } else { + nodesDestroyNode((SNode*)pWindow); + } + + return code; +} + static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { if (NULL == pSelect->pWindow) { return TSDB_CODE_SUCCESS; @@ -1189,6 +1270,8 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele return createWindowLogicNodeByEvent(pCxt, (SEventWindowNode*)pSelect->pWindow, pSelect, pLogicNode); case QUERY_NODE_COUNT_WINDOW: return createWindowLogicNodeByCount(pCxt, (SCountWindowNode*)pSelect->pWindow, pSelect, pLogicNode); + case QUERY_NODE_ANOMALY_WINDOW: + return createWindowLogicNodeByAnomaly(pCxt, (SAnomalyWindowNode*)pSelect->pWindow, pSelect, pLogicNode); default: break; } @@ -1600,6 +1683,9 @@ static int32_t createSelectFromLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p if (TSDB_CODE_SUCCESS == code) { code = createSelectRootLogicNode(pCxt, pSelect, createInterpFuncLogicNode, &pRoot); } + if (TSDB_CODE_SUCCESS == code) { + code = createSelectRootLogicNode(pCxt, pSelect, createForecastFuncLogicNode, &pRoot); + } if (TSDB_CODE_SUCCESS == code) { code = createSelectRootLogicNode(pCxt, pSelect, createDistinctLogicNode, &pRoot); } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index a00eb05482..a88909947b 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1990,6 +1990,50 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh return code; } +static int32_t createForecastFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, + SForecastFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) { + SForecastFuncPhysiNode* pForecastFunc = + (SForecastFuncPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC); + if (NULL == pForecastFunc) { + return terrno; + } + + SNodeList* pPrecalcExprs = NULL; + SNodeList* pFuncs = NULL; + int32_t code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs); + + SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); + // push down expression to pOutputDataBlockDesc of child node + if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) { + code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pForecastFunc->pExprs); + if (TSDB_CODE_SUCCESS == code) { + code = pushdownDataBlockSlots(pCxt, pForecastFunc->pExprs, pChildTupe); + } + } + + if (TSDB_CODE_SUCCESS == code) { + code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pForecastFunc->pFuncs); + if (TSDB_CODE_SUCCESS == code) { + code = addDataBlockSlots(pCxt, pForecastFunc->pFuncs, pForecastFunc->node.pOutputDataBlockDesc); + } + } + + if (TSDB_CODE_SUCCESS == code) { + code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pForecastFunc); + } + + if (TSDB_CODE_SUCCESS == code) { + *pPhyNode = (SPhysiNode*)pForecastFunc; + } else { + nodesDestroyNode((SNode*)pForecastFunc); + } + + nodesDestroyList(pPrecalcExprs); + nodesDestroyList(pFuncs); + + return code; +} + static bool projectCanMergeDataBlock(SProjectLogicNode* pProject) { if (GROUP_ACTION_KEEP == pProject->node.groupAction) { return false; @@ -2325,6 +2369,53 @@ static int32_t createCountWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC return code; } +static int32_t createAnomalyWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, + SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { + SAnomalyWindowPhysiNode* pAnomaly = (SAnomalyWindowPhysiNode*)makePhysiNode( + pCxt, (SLogicNode*)pWindowLogicNode, + (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY : QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY)); + if (NULL == pAnomaly) { + return terrno; + } + + SNodeList* pPrecalcExprs = NULL; + SNode* pAnomalyKey = NULL; + int32_t code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pAnomalyExpr, &pPrecalcExprs, &pAnomalyKey); + + SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); + // push down expression to pOutputDataBlockDesc of child node + if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) { + code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAnomaly->window.pExprs); + if (TSDB_CODE_SUCCESS == code) { + code = addDataBlockSlots(pCxt, pAnomaly->window.pExprs, pChildTupe); + } + } + + if (TSDB_CODE_SUCCESS == code) { + code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pAnomalyKey, &pAnomaly->pAnomalyKey); + // if (TSDB_CODE_SUCCESS == code) { + // code = addDataBlockSlot(pCxt, &pAnomaly->pAnomalyKey, pAnomaly->window.node.pOutputDataBlockDesc); + // } + } + + tstrncpy(pAnomaly->anomalyOpt, pWindowLogicNode->anomalyOpt, sizeof(pAnomaly->anomalyOpt)); + + if (TSDB_CODE_SUCCESS == code) { + code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pAnomaly->window, pWindowLogicNode); + } + + if (TSDB_CODE_SUCCESS == code) { + *pPhyNode = (SPhysiNode*)pAnomaly; + } else { + nodesDestroyNode((SNode*)pAnomaly); + } + + nodesDestroyList(pPrecalcExprs); + nodesDestroyNode(pAnomalyKey); + + return code; +} + static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { switch (pWindowLogicNode->winType) { @@ -2338,6 +2429,8 @@ static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildr return createEventWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode); case WINDOW_TYPE_COUNT: return createCountWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode); + case WINDOW_TYPE_ANOMALY: + return createAnomalyWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode); default: break; } @@ -2652,6 +2745,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode return createIndefRowsFuncPhysiNode(pCxt, pChildren, (SIndefRowsFuncLogicNode*)pLogicNode, pPhyNode); case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: return createInterpFuncPhysiNode(pCxt, pChildren, (SInterpFuncLogicNode*)pLogicNode, pPhyNode); + case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC: + return createForecastFuncPhysiNode(pCxt, pChildren, (SForecastFuncLogicNode*)pLogicNode, pPhyNode); case QUERY_NODE_LOGIC_PLAN_MERGE: return createMergePhysiNode(pCxt, pChildren, (SMergeLogicNode*)pLogicNode, pPhyNode); case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index b5f0bc50e8..ed597936c8 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -256,6 +256,15 @@ static int32_t adjustCountDataRequirement(SWindowLogicNode* pWindow, EDataOrderL return TSDB_CODE_SUCCESS; } +static int32_t adjustAnomalyDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + if (requirement <= pWindow->node.resultDataOrder) { + return TSDB_CODE_SUCCESS; + } + pWindow->node.resultDataOrder = requirement; + pWindow->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { switch (pWindow->winType) { case WINDOW_TYPE_INTERVAL: @@ -268,6 +277,8 @@ static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrder return adjustEventDataRequirement(pWindow, requirement); case WINDOW_TYPE_COUNT: return adjustCountDataRequirement(pWindow, requirement); + case WINDOW_TYPE_ANOMALY: + return adjustAnomalyDataRequirement(pWindow, requirement); default: break; } @@ -318,6 +329,15 @@ static int32_t adjustInterpDataRequirement(SInterpFuncLogicNode* pInterp, EDataO return TSDB_CODE_SUCCESS; } +static int32_t adjustForecastDataRequirement(SForecastFuncLogicNode* pForecast, EDataOrderLevel requirement) { + if (requirement <= pForecast->node.requireDataOrder) { + return TSDB_CODE_SUCCESS; + } + pForecast->node.resultDataOrder = requirement; + pForecast->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requirement) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pNode)) { @@ -355,6 +375,9 @@ int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requir case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: code = adjustInterpDataRequirement((SInterpFuncLogicNode*)pNode, requirement); break; + case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC: + code = adjustForecastDataRequirement((SForecastFuncLogicNode*)pNode, requirement); + break; default: break; }