feat: analysis plan

This commit is contained in:
Shengliang Guan 2024-10-09 17:58:46 +08:00
parent 8eddf6c268
commit 234bf4b332
7 changed files with 343 additions and 0 deletions

View File

@ -368,6 +368,13 @@ static int32_t countWindowNodeCopy(const SCountWindowNode* pSrc, SCountWindowNod
return TSDB_CODE_SUCCESS;
}
static int32_t anomalyWindowNodeCopy(const SAnomalyWindowNode* pSrc, SAnomalyWindowNode* pDst) {
CLONE_NODE_FIELD(pCol);
CLONE_NODE_FIELD(pExpr);
COPY_CHAR_ARRAY_FIELD(anomalyOpt);
return TSDB_CODE_SUCCESS;
}
static int32_t sessionWindowNodeCopy(const SSessionWindowNode* pSrc, SSessionWindowNode* pDst) {
CLONE_NODE_FIELD_EX(pCol, SColumnNode*);
CLONE_NODE_FIELD_EX(pGap, SValueNode*);
@ -622,6 +629,8 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
COPY_SCALAR_FIELD(windowAlgo);
COPY_SCALAR_FIELD(windowCount);
COPY_SCALAR_FIELD(windowSliding);
CLONE_NODE_FIELD(pAnomalyExpr);
COPY_CHAR_ARRAY_FIELD(anomalyOpt);
return TSDB_CODE_SUCCESS;
}
@ -674,6 +683,12 @@ static int32_t logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFunc
return TSDB_CODE_SUCCESS;
}
static int32_t logicForecastFuncCopy(const SForecastFuncLogicNode* pSrc, SForecastFuncLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pFuncs);
return TSDB_CODE_SUCCESS;
}
static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCacheLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(grpColsMayBeNull);
@ -937,6 +952,9 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) {
case QUERY_NODE_COUNT_WINDOW:
code = countWindowNodeCopy((const SCountWindowNode*)pNode, (SCountWindowNode*)pDst);
break;
case QUERY_NODE_ANOMALY_WINDOW:
code = anomalyWindowNodeCopy((const SAnomalyWindowNode*)pNode, (SAnomalyWindowNode*)pDst);
break;
case QUERY_NODE_SESSION_WINDOW:
code = sessionWindowNodeCopy((const SSessionWindowNode*)pNode, (SSessionWindowNode*)pDst);
break;
@ -1021,6 +1039,9 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) {
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
code = logicInterpFuncCopy((const SInterpFuncLogicNode*)pNode, (SInterpFuncLogicNode*)pDst);
break;
case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
code = logicForecastFuncCopy((const SForecastFuncLogicNode*)pNode, (SForecastFuncLogicNode*)pDst);
break;
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
code = logicGroupCacheCopy((const SGroupCacheLogicNode*)pNode, (SGroupCacheLogicNode*)pDst);
break;

View File

@ -3539,6 +3539,46 @@ static int32_t msgToPhysiCountWindowNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { PHY_ANOMALY_CODE_WINDOW = 1, PHY_ANOMALY_CODE_KEY, PHY_ANOMALY_CODE_WINDOW_OPTION };
static int32_t physiAnomalyWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SAnomalyWindowPhysiNode* pNode = (const SAnomalyWindowPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_ANOMALY_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_ANOMALY_CODE_KEY, nodeToMsg, pNode->pAnomalyKey);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeCStr(pEncoder, PHY_ANOMALY_CODE_WINDOW_OPTION, pNode->anomalyOpt);
}
return code;
}
static int32_t msgToPhysiAnomalyWindowNode(STlvDecoder* pDecoder, void* pObj) {
SAnomalyWindowPhysiNode* pNode = (SAnomalyWindowPhysiNode*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case PHY_ANOMALY_CODE_WINDOW:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window);
break;
case PHY_ANOMALY_CODE_KEY:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pAnomalyKey);
break;
case PHY_ANOMALY_CODE_WINDOW_OPTION:
code = tlvDecodeCStr(pTlv, pNode->anomalyOpt, sizeof(pNode->anomalyOpt));
break;
default:
break;
}
}
return code;
}
enum {
PHY_PARTITION_CODE_BASE_NODE = 1,
PHY_PARTITION_CODE_EXPR,
@ -3770,6 +3810,50 @@ static int32_t msgToPhysiInterpFuncNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum {
PHY_FORECAST_FUNC_CODE_BASE_NODE = 1,
PHY_FORECAST_FUNC_CODE_EXPR,
PHY_FORECAST_FUNC_CODE_FUNCS,
};
static int32_t physiForecastFuncNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SForecastFuncPhysiNode* pNode = (const SForecastFuncPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_FORECAST_FUNC_CODE_BASE_NODE, physiNodeToMsg, &pNode->node);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_FORECAST_FUNC_CODE_EXPR, nodeListToMsg, pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_FORECAST_FUNC_CODE_FUNCS, nodeListToMsg, pNode->pFuncs);
}
return code;
}
static int32_t msgToPhysiForecastFuncNode(STlvDecoder* pDecoder, void* pObj) {
SForecastFuncPhysiNode* pNode = (SForecastFuncPhysiNode*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case PHY_FORECAST_FUNC_CODE_BASE_NODE:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node);
break;
case PHY_FORECAST_FUNC_CODE_EXPR:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pExprs);
break;
case PHY_FORECAST_FUNC_CODE_FUNCS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pFuncs);
break;
default:
break;
}
}
return code;
}
enum { PHY_DATA_SINK_CODE_INPUT_DESC = 1 };
static int32_t physicDataSinkNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -4536,6 +4620,9 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT:
code = physiCountWindowNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY:
code = physiAnomalyWindowNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
code = physiPartitionNodeToMsg(pObj, pEncoder);
break;
@ -4548,6 +4635,9 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
code = physiInterpFuncNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC:
code = physiForecastFuncNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
code = physiDispatchNodeToMsg(pObj, pEncoder);
break;
@ -4698,6 +4788,9 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT:
code = msgToPhysiCountWindowNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY:
code = msgToPhysiAnomalyWindowNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
code = msgToPhysiPartitionNode(pDecoder, pObj);
break;
@ -4710,6 +4803,9 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
code = msgToPhysiInterpFuncNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC:
code = msgToPhysiForecastFuncNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
code = msgToPhysiDispatchNode(pDecoder, pObj);
break;

View File

@ -555,6 +555,22 @@ static int32_t collectMetaKeyFromShowSnodes(SCollectMetaKeyCxt* pCxt, SShowStmt*
return TSDB_CODE_SUCCESS;
}
static int32_t collectMetaKeyFromShowAnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
if (pCxt->pParseCxt->enableSysInfo) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_ANODES,
pCxt->pMetaCache);
}
return TSDB_CODE_SUCCESS;
}
static int32_t collectMetaKeyFromShowAnodesFull(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
if (pCxt->pParseCxt->enableSysInfo) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_ANODES_FULL,
pCxt->pMetaCache);
}
return TSDB_CODE_SUCCESS;
}
static int32_t collectMetaKeyFromShowBnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
if (pCxt->pParseCxt->enableSysInfo) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_BNODES,
@ -983,6 +999,10 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowQnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_SNODES_STMT:
return collectMetaKeyFromShowSnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_ANODES_STMT:
return collectMetaKeyFromShowAnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_ANODES_FULL_STMT:
return collectMetaKeyFromShowAnodesFull(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_BNODES_STMT:
return collectMetaKeyFromShowBnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_ARBGROUPS_STMT:

View File

@ -358,6 +358,8 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
case QUERY_NODE_SHOW_MNODES_STMT:
case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_QNODES_STMT:
case QUERY_NODE_SHOW_ANODES_STMT:
case QUERY_NODE_SHOW_ANODES_FULL_STMT:
case QUERY_NODE_SHOW_SNODES_STMT:
case QUERY_NODE_SHOW_BNODES_STMT:
case QUERY_NODE_SHOW_CLUSTER_STMT:

View File

@ -973,6 +973,45 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
return code;
}
static bool isForecastFunc(int32_t funcId) {
return fmIsForecastFunc(funcId) || fmIsForecastPseudoColumnFunc(funcId) || fmIsGroupKeyFunc(funcId) || fmisSelectGroupConstValueFunc(funcId);
}
static int32_t createForecastFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
if (!pSelect->hasForecastFunc) {
return TSDB_CODE_SUCCESS;
}
SForecastFuncLogicNode* pForecastFunc = NULL;
int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC, (SNode**)&pForecastFunc);
if (NULL == pForecastFunc) {
return code;
}
pForecastFunc->node.groupAction = getGroupAction(pCxt, pSelect);
pForecastFunc->node.requireDataOrder = getRequireDataOrder(true, pSelect);
pForecastFunc->node.resultDataOrder = pForecastFunc->node.requireDataOrder;
// interp functions and _group_key functions
code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, NULL, isForecastFunc, &pForecastFunc->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pForecastFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT, NULL);
}
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pForecastFunc->pFuncs, &pForecastFunc->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pForecastFunc;
} else {
nodesDestroyNode((SNode*)pForecastFunc);
}
return code;
}
static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow,
SLogicNode** pLogicNode) {
if (pCxt->pPlanCxt->streamQuery) {
@ -1174,6 +1213,48 @@ static int32_t createWindowLogicNodeByCount(SLogicPlanContext* pCxt, SCountWindo
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}
static int32_t createWindowLogicNodeByAnomaly(SLogicPlanContext* pCxt, SAnomalyWindowNode* pAnomaly,
SSelectStmt* pSelect, SLogicNode** pLogicNode) {
SWindowLogicNode* pWindow = NULL;
int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW, (SNode**)&pWindow);
if (NULL == pWindow) {
return code;
}
pWindow->winType = WINDOW_TYPE_ANOMALY;
pWindow->node.groupAction = getGroupAction(pCxt, pSelect);
pWindow->node.requireDataOrder =
pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : getRequireDataOrder(true, pSelect);
pWindow->node.resultDataOrder =
pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_GLOBAL : pWindow->node.requireDataOrder;
pWindow->pAnomalyExpr = NULL;
code = nodesCloneNode(pAnomaly->pExpr, &pWindow->pAnomalyExpr);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode((SNode*)pWindow);
return code;
}
tstrncpy(pWindow->anomalyOpt, pAnomaly->anomalyOpt, sizeof(pWindow->anomalyOpt));
pWindow->pTspk = NULL;
code = nodesCloneNode(pAnomaly->pCol, &pWindow->pTspk);
if (NULL == pWindow->pTspk) {
nodesDestroyNode((SNode*)pWindow);
return code;
}
// rewrite the expression in subsequent clauses
code = rewriteExprForSelect(pWindow->pAnomalyExpr, pSelect, SQL_CLAUSE_WINDOW);
if (TSDB_CODE_SUCCESS == code) {
code = createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
} else {
nodesDestroyNode((SNode*)pWindow);
}
return code;
}
static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
if (NULL == pSelect->pWindow) {
return TSDB_CODE_SUCCESS;
@ -1189,6 +1270,8 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
return createWindowLogicNodeByEvent(pCxt, (SEventWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
case QUERY_NODE_COUNT_WINDOW:
return createWindowLogicNodeByCount(pCxt, (SCountWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
case QUERY_NODE_ANOMALY_WINDOW:
return createWindowLogicNodeByAnomaly(pCxt, (SAnomalyWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
default:
break;
}
@ -1600,6 +1683,9 @@ static int32_t createSelectFromLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
if (TSDB_CODE_SUCCESS == code) {
code = createSelectRootLogicNode(pCxt, pSelect, createInterpFuncLogicNode, &pRoot);
}
if (TSDB_CODE_SUCCESS == code) {
code = createSelectRootLogicNode(pCxt, pSelect, createForecastFuncLogicNode, &pRoot);
}
if (TSDB_CODE_SUCCESS == code) {
code = createSelectRootLogicNode(pCxt, pSelect, createDistinctLogicNode, &pRoot);
}

View File

@ -1990,6 +1990,50 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
return code;
}
static int32_t createForecastFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SForecastFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
SForecastFuncPhysiNode* pForecastFunc =
(SForecastFuncPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC);
if (NULL == pForecastFunc) {
return terrno;
}
SNodeList* pPrecalcExprs = NULL;
SNodeList* pFuncs = NULL;
int32_t code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
// push down expression to pOutputDataBlockDesc of child node
if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pForecastFunc->pExprs);
if (TSDB_CODE_SUCCESS == code) {
code = pushdownDataBlockSlots(pCxt, pForecastFunc->pExprs, pChildTupe);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pForecastFunc->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pForecastFunc->pFuncs, pForecastFunc->node.pOutputDataBlockDesc);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pForecastFunc);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pForecastFunc;
} else {
nodesDestroyNode((SNode*)pForecastFunc);
}
nodesDestroyList(pPrecalcExprs);
nodesDestroyList(pFuncs);
return code;
}
static bool projectCanMergeDataBlock(SProjectLogicNode* pProject) {
if (GROUP_ACTION_KEEP == pProject->node.groupAction) {
return false;
@ -2325,6 +2369,53 @@ static int32_t createCountWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
return code;
}
static int32_t createAnomalyWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SAnomalyWindowPhysiNode* pAnomaly = (SAnomalyWindowPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY : QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY));
if (NULL == pAnomaly) {
return terrno;
}
SNodeList* pPrecalcExprs = NULL;
SNode* pAnomalyKey = NULL;
int32_t code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pAnomalyExpr, &pPrecalcExprs, &pAnomalyKey);
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
// push down expression to pOutputDataBlockDesc of child node
if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAnomaly->window.pExprs);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pAnomaly->window.pExprs, pChildTupe);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pAnomalyKey, &pAnomaly->pAnomalyKey);
// if (TSDB_CODE_SUCCESS == code) {
// code = addDataBlockSlot(pCxt, &pAnomaly->pAnomalyKey, pAnomaly->window.node.pOutputDataBlockDesc);
// }
}
tstrncpy(pAnomaly->anomalyOpt, pWindowLogicNode->anomalyOpt, sizeof(pAnomaly->anomalyOpt));
if (TSDB_CODE_SUCCESS == code) {
code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pAnomaly->window, pWindowLogicNode);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pAnomaly;
} else {
nodesDestroyNode((SNode*)pAnomaly);
}
nodesDestroyList(pPrecalcExprs);
nodesDestroyNode(pAnomalyKey);
return code;
}
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode,
SPhysiNode** pPhyNode) {
switch (pWindowLogicNode->winType) {
@ -2338,6 +2429,8 @@ static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildr
return createEventWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
case WINDOW_TYPE_COUNT:
return createCountWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
case WINDOW_TYPE_ANOMALY:
return createAnomalyWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
default:
break;
}
@ -2652,6 +2745,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
return createIndefRowsFuncPhysiNode(pCxt, pChildren, (SIndefRowsFuncLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
return createInterpFuncPhysiNode(pCxt, pChildren, (SInterpFuncLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
return createForecastFuncPhysiNode(pCxt, pChildren, (SForecastFuncLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_MERGE:
return createMergePhysiNode(pCxt, pChildren, (SMergeLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:

View File

@ -256,6 +256,15 @@ static int32_t adjustCountDataRequirement(SWindowLogicNode* pWindow, EDataOrderL
return TSDB_CODE_SUCCESS;
}
static int32_t adjustAnomalyDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
if (requirement <= pWindow->node.resultDataOrder) {
return TSDB_CODE_SUCCESS;
}
pWindow->node.resultDataOrder = requirement;
pWindow->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
switch (pWindow->winType) {
case WINDOW_TYPE_INTERVAL:
@ -268,6 +277,8 @@ static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrder
return adjustEventDataRequirement(pWindow, requirement);
case WINDOW_TYPE_COUNT:
return adjustCountDataRequirement(pWindow, requirement);
case WINDOW_TYPE_ANOMALY:
return adjustAnomalyDataRequirement(pWindow, requirement);
default:
break;
}
@ -318,6 +329,15 @@ static int32_t adjustInterpDataRequirement(SInterpFuncLogicNode* pInterp, EDataO
return TSDB_CODE_SUCCESS;
}
static int32_t adjustForecastDataRequirement(SForecastFuncLogicNode* pForecast, EDataOrderLevel requirement) {
if (requirement <= pForecast->node.requireDataOrder) {
return TSDB_CODE_SUCCESS;
}
pForecast->node.resultDataOrder = requirement;
pForecast->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requirement) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) {
@ -355,6 +375,9 @@ int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requir
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
code = adjustInterpDataRequirement((SInterpFuncLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
code = adjustForecastDataRequirement((SForecastFuncLogicNode*)pNode, requirement);
break;
default:
break;
}