diff --git a/docs-cn/14-reference/03-connector/cpp.mdx b/docs-cn/14-reference/03-connector/cpp.mdx
index aba1d6c717..aecf9fde12 100644
--- a/docs-cn/14-reference/03-connector/cpp.mdx
+++ b/docs-cn/14-reference/03-connector/cpp.mdx
@@ -114,7 +114,6 @@ TDengine 客户端驱动的安装请参考 [安装指南](/reference/connector#
订阅和消费
```c
-{{#include examples/c/subscribe.c}}
```
diff --git a/docs-en/14-reference/03-connector/cpp.mdx b/docs-en/14-reference/03-connector/cpp.mdx
index d13a74384c..d549413012 100644
--- a/docs-en/14-reference/03-connector/cpp.mdx
+++ b/docs-en/14-reference/03-connector/cpp.mdx
@@ -114,7 +114,6 @@ This section shows sample code for standard access methods to TDengine clusters
Subscribe and consume
```c
-{{#include examples/c/subscribe.c}}
```
diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h
index 922136b590..f3e28936af 100644
--- a/include/libs/function/functionMgt.h
+++ b/include/libs/function/functionMgt.h
@@ -156,6 +156,9 @@ bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
bool fmIsMultiResFunc(int32_t funcId);
bool fmIsRepeatScanFunc(int32_t funcId);
bool fmIsUserDefinedFunc(int32_t funcId);
+bool fmIsDistExecFunc(int32_t funcId);
+
+int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
typedef enum EFuncDataRequired {
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h
index d960ccbd65..41c196c916 100644
--- a/include/libs/nodes/nodes.h
+++ b/include/libs/nodes/nodes.h
@@ -189,6 +189,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_PROJECT,
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF,
QUERY_NODE_LOGIC_PLAN_EXCHANGE,
+ QUERY_NODE_LOGIC_PLAN_MERGE,
QUERY_NODE_LOGIC_PLAN_WINDOW,
QUERY_NODE_LOGIC_PLAN_FILL,
QUERY_NODE_LOGIC_PLAN_SORT,
@@ -206,6 +207,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_JOIN,
QUERY_NODE_PHYSICAL_PLAN_AGG,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
+ QUERY_NODE_PHYSICAL_PLAN_MERGE,
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h
index 44e7295b69..5892bffac2 100644
--- a/include/libs/nodes/plannodes.h
+++ b/include/libs/nodes/plannodes.h
@@ -95,9 +95,15 @@ typedef struct SVnodeModifLogicNode {
typedef struct SExchangeLogicNode {
SLogicNode node;
int32_t srcGroupId;
- uint8_t precision;
} SExchangeLogicNode;
+typedef struct SMergeLogicNode {
+ SLogicNode node;
+ SNodeList* pMergeKeys;
+ int32_t numOfChannels;
+ int32_t srcGroupId;
+} SMergeLogicNode;
+
typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
typedef struct SWindowLogicNode {
@@ -268,6 +274,13 @@ typedef struct SExchangePhysiNode {
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhysiNode;
+typedef struct SMergePhysiNode {
+ SPhysiNode node;
+ SNodeList* pMergeKeys;
+ int32_t numOfChannels;
+ int32_t srcGroupId;
+} SMergePhysiNode;
+
typedef struct SWinodwPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of parameter expression of function
diff --git a/source/libs/function/inc/builtins.h b/source/libs/function/inc/builtins.h
index 3bd0f35bf5..bc91875006 100644
--- a/source/libs/function/inc/builtins.h
+++ b/source/libs/function/inc/builtins.h
@@ -26,22 +26,24 @@ typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t l
typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
typedef struct SBuiltinFuncDefinition {
- char name[FUNCTION_NAME_MAX_LENGTH];
- EFunctionType type;
- uint64_t classification;
- FTranslateFunc translateFunc;
- FFuncDataRequired dataRequiredFunc;
- FExecGetEnv getEnvFunc;
- FExecInit initFunc;
- FExecProcess processFunc;
+ const char* name;
+ EFunctionType type;
+ uint64_t classification;
+ FTranslateFunc translateFunc;
+ FFuncDataRequired dataRequiredFunc;
+ FExecGetEnv getEnvFunc;
+ FExecInit initFunc;
+ FExecProcess processFunc;
FScalarExecProcess sprocessFunc;
- FExecFinalize finalizeFunc;
- FExecProcess invertFunc;
- FExecCombine combineFunc;
+ FExecFinalize finalizeFunc;
+ FExecProcess invertFunc;
+ FExecCombine combineFunc;
+ const char* pPartialFunc;
+ const char* pMergeFunc;
} SBuiltinFuncDefinition;
extern const SBuiltinFuncDefinition funcMgtBuiltins[];
-extern const int funcMgtBuiltinsNum;
+extern const int funcMgtBuiltinsNum;
#ifdef __cplusplus
}
diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c
index 759f2439bd..5560c9c1d5 100644
--- a/source/libs/function/src/builtins.c
+++ b/source/libs/function/src/builtins.c
@@ -178,14 +178,14 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
- //param0
+ // param0
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The first parameter of PERCENTILE function can only be column");
}
- //param1
+ // param1
SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
if (pValue->datum.i < 0 || pValue->datum.i > 100) {
@@ -200,7 +200,7 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
- //set result type
+ // set result type
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
return TSDB_CODE_SUCCESS;
}
@@ -219,14 +219,14 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
- //param0
+ // param0
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The first parameter of APERCENTILE function can only be column");
}
- //param1
+ // param1
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
@@ -245,7 +245,7 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
- //param2
+ // param2
if (3 == numOfParams) {
uint8_t para3Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type;
if (!IS_VAR_DATA_TYPE(para3Type)) {
@@ -285,14 +285,14 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
- //param0
+ // param0
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The first parameter of TOP/BOTTOM function can only be column");
}
- //param1
+ // param1
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
@@ -309,7 +309,7 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
pValue->notReserved = true;
- //set result type
+ // set result type
SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type};
return TSDB_CODE_SUCCESS;
@@ -709,8 +709,8 @@ static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (pValue->datum.i < ((i > 1) ? 0 : 1) || pValue->datum.i > 100) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
- "TAIL function second parameter should be in range [1, 100], "
- "third parameter should be in range [0, 100]");
+ "TAIL function second parameter should be in range [1, 100], "
+ "third parameter should be in range [0, 100]");
}
pValue->notReserved = true;
@@ -771,7 +771,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
- //param0
+ // param0
SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0);
if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
@@ -779,12 +779,11 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
}
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
- if (!IS_SIGNED_NUMERIC_TYPE(colType) && !IS_FLOAT_TYPE(colType) &&
- TSDB_DATA_TYPE_BOOL != colType) {
+ if (!IS_SIGNED_NUMERIC_TYPE(colType) && !IS_FLOAT_TYPE(colType) && TSDB_DATA_TYPE_BOOL != colType) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
- //param1
+ // param1
if (numOfParams == 2) {
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_INTEGER_TYPE(paraType)) {
@@ -911,7 +910,7 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
if (3 == numOfParams) {
SExprNode* p2 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 2);
- uint8_t para2Type = p2->resType.type;
+ uint8_t para2Type = p2->resType.type;
if (!IS_INTEGER_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
@@ -1144,6 +1143,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.finalizeFunc = functionFinalize,
.invertFunc = countInvertFunction,
.combineFunc = combineFunction,
+ // .pPartialFunc = "count",
+ // .pMergeFunc = "sum"
},
{
.name = "sum",
diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c
index c2b325bc92..611ae8d81f 100644
--- a/source/libs/function/src/functionMgt.c
+++ b/source/libs/function/src/functionMgt.c
@@ -199,3 +199,81 @@ bool fmIsInvertible(int32_t funcId) {
}
return res;
}
+
+static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) {
+ SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
+ if (NULL == pFunc) {
+ return NULL;
+ }
+ strcpy(pFunc->functionName, pName);
+ pFunc->pParameterList = pParameterList;
+ char msg[64] = {0};
+ if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc, msg, sizeof(msg))) {
+ nodesDestroyNode(pFunc);
+ return NULL;
+ }
+ return pFunc;
+}
+
+static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) {
+ SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
+ if (NULL == pCol) {
+ return NULL;
+ }
+ strcpy(pCol->colName, pFunc->node.aliasName);
+ pCol->node.resType = pFunc->node.resType;
+ return pCol;
+}
+
+bool fmIsDistExecFunc(int32_t funcId) {
+ if (!fmIsVectorFunc(funcId)) {
+ return true;
+ }
+ return (NULL != funcMgtBuiltins[funcId].pPartialFunc && NULL != funcMgtBuiltins[funcId].pMergeFunc);
+}
+
+static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNode** pPartialFunc) {
+ SNodeList* pParameterList = nodesCloneList(pSrcFunc->pParameterList);
+ if (NULL == pParameterList) {
+ return TSDB_CODE_OUT_OF_MEMORY;
+ }
+ *pPartialFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pPartialFunc, pParameterList);
+ if (NULL == *pPartialFunc) {
+ nodesDestroyList(pParameterList);
+ return TSDB_CODE_OUT_OF_MEMORY;
+ }
+ snprintf((*pPartialFunc)->node.aliasName, sizeof((*pPartialFunc)->node.aliasName), "%s.%p",
+ (*pPartialFunc)->functionName, pSrcFunc);
+ return TSDB_CODE_SUCCESS;
+}
+
+static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctionNode* pPartialFunc,
+ SFunctionNode** pMergeFunc) {
+ SNodeList* pParameterList = NULL;
+ nodesListMakeStrictAppend(&pParameterList, createColumnByFunc(pPartialFunc));
+ *pMergeFunc = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList);
+ if (NULL == *pMergeFunc) {
+ nodesDestroyList(pParameterList);
+ return TSDB_CODE_OUT_OF_MEMORY;
+ }
+ strcpy((*pMergeFunc)->node.aliasName, pSrcFunc->node.aliasName);
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc) {
+ if (!fmIsDistExecFunc(pFunc->funcId)) {
+ return TSDB_CODE_FAILED;
+ }
+
+ int32_t code = createPartialFunction(pFunc, pPartialFunc);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = createMergeFunction(pFunc, *pPartialFunc, pMergeFunc);
+ }
+
+ if (TSDB_CODE_SUCCESS != code) {
+ nodesDestroyNode(*pPartialFunc);
+ nodesDestroyNode(*pMergeFunc);
+ }
+
+ return code;
+}
diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c
index e2c9f91af4..94661388cb 100644
--- a/source/libs/nodes/src/nodesCloneFuncs.c
+++ b/source/libs/nodes/src/nodesCloneFuncs.c
@@ -373,7 +373,14 @@ static SNode* logicVnodeModifCopy(const SVnodeModifLogicNode* pSrc, SVnodeModifL
static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(srcGroupId);
- COPY_SCALAR_FIELD(precision);
+ return (SNode*)pDst;
+}
+
+static SNode* logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst) {
+ COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
+ CLONE_NODE_LIST_FIELD(pMergeKeys);
+ COPY_SCALAR_FIELD(numOfChannels);
+ COPY_SCALAR_FIELD(srcGroupId);
return (SNode*)pDst;
}
@@ -537,6 +544,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return logicVnodeModifCopy((const SVnodeModifLogicNode*)pNode, (SVnodeModifLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return logicExchangeCopy((const SExchangeLogicNode*)pNode, (SExchangeLogicNode*)pDst);
+ case QUERY_NODE_LOGIC_PLAN_MERGE:
+ return logicMergeCopy((const SMergeLogicNode*)pNode, (SMergeLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return logicWindowCopy((const SWindowLogicNode*)pNode, (SWindowLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_FILL:
diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c
index c0c8168eb1..4bce6381cd 100644
--- a/source/libs/nodes/src/nodesCodeFuncs.c
+++ b/source/libs/nodes/src/nodesCodeFuncs.c
@@ -190,6 +190,8 @@ const char* nodesNodeName(ENodeType type) {
return "LogicVnodeModif";
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return "LogicExchange";
+ case QUERY_NODE_LOGIC_PLAN_MERGE:
+ return "LogicMerge";
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return "LogicWindow";
case QUERY_NODE_LOGIC_PLAN_FILL:
@@ -220,6 +222,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiAgg";
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return "PhysiExchange";
+ case QUERY_NODE_PHYSICAL_PLAN_MERGE:
+ return "PhysiMerge";
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return "PhysiSort";
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
@@ -596,7 +600,6 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
}
static const char* jkExchangeLogicPlanSrcGroupId = "SrcGroupId";
-static const char* jkExchangeLogicPlanSrcPrecision = "Precision";
static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) {
const SExchangeLogicNode* pNode = (const SExchangeLogicNode*)pObj;
@@ -605,9 +608,6 @@ static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcGroupId, pNode->srcGroupId);
}
- if (TSDB_CODE_SUCCESS == code) {
- code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcPrecision, pNode->precision);
- }
return code;
}
@@ -619,8 +619,144 @@ static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcGroupId, &pNode->srcGroupId);
}
+
+ return code;
+}
+
+static const char* jkMergeLogicPlanMergeKeys = "MergeKeys";
+static const char* jkMergeLogicPlanNumOfChannels = "NumOfChannels";
+static const char* jkMergeLogicPlanSrcGroupId = "SrcGroupId";
+
+static int32_t logicMergeNodeToJson(const void* pObj, SJson* pJson) {
+ const SMergeLogicNode* pNode = (const SMergeLogicNode*)pObj;
+
+ int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
- code = tjsonGetUTinyIntValue(pJson, jkExchangeLogicPlanSrcPrecision, &pNode->precision);
+ code = nodeListToJson(pJson, jkMergeLogicPlanMergeKeys, pNode->pMergeKeys);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkMergeLogicPlanNumOfChannels, pNode->numOfChannels);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkMergeLogicPlanSrcGroupId, pNode->srcGroupId);
+ }
+
+ return code;
+}
+
+static int32_t jsonToLogicMergeNode(const SJson* pJson, void* pObj) {
+ SMergeLogicNode* pNode = (SMergeLogicNode*)pObj;
+
+ int32_t code = jsonToLogicPlanNode(pJson, pObj);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = jsonToNodeList(pJson, jkMergeLogicPlanMergeKeys, &pNode->pMergeKeys);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetIntValue(pJson, jkMergeLogicPlanNumOfChannels, &pNode->numOfChannels);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetIntValue(pJson, jkMergeLogicPlanSrcGroupId, &pNode->srcGroupId);
+ }
+
+ return code;
+}
+
+static const char* jkWindowLogicPlanWinType = "WinType";
+static const char* jkWindowLogicPlanFuncs = "Funcs";
+static const char* jkWindowLogicPlanInterval = "Interval";
+static const char* jkWindowLogicPlanOffset = "Offset";
+static const char* jkWindowLogicPlanSliding = "Sliding";
+static const char* jkWindowLogicPlanIntervalUnit = "IntervalUnit";
+static const char* jkWindowLogicPlanSlidingUnit = "SlidingUnit";
+static const char* jkWindowLogicPlanSessionGap = "SessionGap";
+static const char* jkWindowLogicPlanTspk = "Tspk";
+static const char* jkWindowLogicPlanStateExpr = "StateExpr";
+static const char* jkWindowLogicPlanTriggerType = "TriggerType";
+static const char* jkWindowLogicPlanWatermark = "Watermark";
+
+static int32_t logicWindowNodeToJson(const void* pObj, SJson* pJson) {
+ const SWindowLogicNode* pNode = (const SWindowLogicNode*)pObj;
+
+ int32_t code = logicPlanNodeToJson(pObj, pJson);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanWinType, pNode->winType);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = nodeListToJson(pJson, jkWindowLogicPlanFuncs, pNode->pFuncs);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanInterval, pNode->interval);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanOffset, pNode->offset);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanSliding, pNode->sliding);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanIntervalUnit, pNode->intervalUnit);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanSlidingUnit, pNode->slidingUnit);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanSessionGap, pNode->sessionGap);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddObject(pJson, jkWindowLogicPlanTspk, nodeToJson, pNode->pTspk);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddObject(pJson, jkWindowLogicPlanStateExpr, nodeToJson, pNode->pStateExpr);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanTriggerType, pNode->triggerType);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanWatermark, pNode->watermark);
+ }
+
+ return code;
+}
+
+static int32_t jsonToLogicWindowNode(const SJson* pJson, void* pObj) {
+ SWindowLogicNode* pNode = (SWindowLogicNode*)pObj;
+
+ int32_t code = jsonToLogicPlanNode(pJson, pObj);
+ if (TSDB_CODE_SUCCESS == code) {
+ tjsonGetNumberValue(pJson, jkWindowLogicPlanWinType, pNode->winType, code);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = jsonToNodeList(pJson, jkWindowLogicPlanFuncs, &pNode->pFuncs);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanInterval, &pNode->interval);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanOffset, &pNode->offset);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanSliding, &pNode->sliding);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetTinyIntValue(pJson, jkWindowLogicPlanIntervalUnit, &pNode->intervalUnit);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetTinyIntValue(pJson, jkWindowLogicPlanSlidingUnit, &pNode->slidingUnit);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanSessionGap, &pNode->sessionGap);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = jsonToNodeObject(pJson, jkWindowLogicPlanTspk, &pNode->pTspk);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = jsonToNodeObject(pJson, jkWindowLogicPlanStateExpr, &pNode->pStateExpr);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetTinyIntValue(pJson, jkWindowLogicPlanTriggerType, &pNode->triggerType);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanWatermark, &pNode->watermark);
}
return code;
@@ -1459,6 +1595,44 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
return code;
}
+static const char* jkMergePhysiPlanMergeKeys = "MergeKeys";
+static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels";
+static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId";
+
+static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
+ const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj;
+
+ int32_t code = physicPlanNodeToJson(pObj, pJson);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = nodeListToJson(pJson, jkMergePhysiPlanMergeKeys, pNode->pMergeKeys);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanNumOfChannels, pNode->numOfChannels);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanSrcGroupId, pNode->srcGroupId);
+ }
+
+ return code;
+}
+
+static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
+ SMergePhysiNode* pNode = (SMergePhysiNode*)pObj;
+
+ int32_t code = jsonToPhysicPlanNode(pJson, pObj);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = jsonToNodeList(pJson, jkMergePhysiPlanMergeKeys, &pNode->pMergeKeys);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetIntValue(pJson, jkMergePhysiPlanNumOfChannels, &pNode->numOfChannels);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetIntValue(pJson, jkMergePhysiPlanSrcGroupId, &pNode->srcGroupId);
+ }
+
+ return code;
+}
+
static const char* jkSortPhysiPlanExprs = "Exprs";
static const char* jkSortPhysiPlanSortKeys = "SortKeys";
static const char* jkSortPhysiPlanTargets = "Targets";
@@ -3401,6 +3575,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
break;
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return logicExchangeNodeToJson(pObj, pJson);
+ case QUERY_NODE_LOGIC_PLAN_MERGE:
+ return logicMergeNodeToJson(pObj, pJson);
+ case QUERY_NODE_LOGIC_PLAN_WINDOW:
+ return logicWindowNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_FILL:
return logicFillNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_SORT:
@@ -3427,6 +3605,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiAggNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return physiExchangeNodeToJson(pObj, pJson);
+ case QUERY_NODE_PHYSICAL_PLAN_MERGE:
+ return physiMergeNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return physiSortNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
@@ -3512,6 +3692,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToLogicProjectNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return jsonToLogicExchangeNode(pJson, pObj);
+ case QUERY_NODE_LOGIC_PLAN_MERGE:
+ return jsonToLogicMergeNode(pJson, pObj);
+ case QUERY_NODE_LOGIC_PLAN_WINDOW:
+ return jsonToLogicWindowNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_FILL:
return jsonToLogicFillNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_SORT:
@@ -3538,6 +3722,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiAggNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return jsonToPhysiExchangeNode(pJson, pObj);
+ case QUERY_NODE_PHYSICAL_PLAN_MERGE:
+ return jsonToPhysiMergeNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return jsonToPhysiSortNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c
index d29e89d266..51111ad864 100644
--- a/source/libs/nodes/src/nodesUtilFuncs.c
+++ b/source/libs/nodes/src/nodesUtilFuncs.c
@@ -220,6 +220,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SVnodeModifLogicNode));
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangeLogicNode));
+ case QUERY_NODE_LOGIC_PLAN_MERGE:
+ return makeNode(type, sizeof(SMergeLogicNode));
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return makeNode(type, sizeof(SWindowLogicNode));
case QUERY_NODE_LOGIC_PLAN_FILL:
@@ -250,6 +252,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SAggPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangePhysiNode));
+ case QUERY_NODE_PHYSICAL_PLAN_MERGE:
+ return makeNode(type, sizeof(SMergePhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return makeNode(type, sizeof(SSortPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp
index 154f13ea68..8fb28ce395 100644
--- a/source/libs/parser/test/mockCatalog.cpp
+++ b/source/libs/parser/test/mockCatalog.cpp
@@ -188,8 +188,8 @@ int32_t __catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* ve
}
int32_t __catalogGetDBVgInfo(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* dbFName,
- SArray** vgroupList) {
- return 0;
+ SArray** pVgList) {
+ return g_mockCatalogService->catalogGetDBVgInfo(dbFName, pVgList);
}
int32_t __catalogGetDBCfg(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) {
diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp
index 566c4d8b04..4834d2d377 100644
--- a/source/libs/parser/test/mockCatalogService.cpp
+++ b/source/libs/parser/test/mockCatalogService.cpp
@@ -18,6 +18,7 @@
#include
#include
#include