feat: add session and state window on stable

This commit is contained in:
slzhou 2022-06-23 09:01:03 +08:00
parent 849cb8d1b3
commit 6fe368a375
1 changed files with 27 additions and 19 deletions

View File

@ -166,6 +166,31 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild)); return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild));
} }
static bool stbSplNeedSplitWindow(bool streamQuery, SLogicNode* pNode) {
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
if (WINDOW_TYPE_INTERVAL == pWindow->winType) {
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
}
if (WINDOW_TYPE_SESSION == pWindow->winType) {
if (!streamQuery) {
return stbSplHasMultiTbScan(streamQuery, pNode);
} else {
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
}
}
if (WINDOW_TYPE_STATE == pWindow->winType) {
if (!streamQuery) {
return stbSplHasMultiTbScan(streamQuery, pNode);
} else {
return false;
}
}
return false;
}
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
switch (nodeType(pNode)) { switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN:
@ -174,25 +199,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
return !(((SJoinLogicNode*)pNode)->isSingleTableJoin); return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: { case QUERY_NODE_LOGIC_PLAN_WINDOW:
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode; return stbSplNeedSplitWindow(streamQuery, pNode);
if (WINDOW_TYPE_INTERVAL == pWindow->winType) {
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
if (WINDOW_TYPE_STATE == pWindow->winType) {
if (!streamQuery) {
return stbSplHasMultiTbScan(streamQuery, pNode);
} else {
return false;
}
}
if (WINDOW_TYPE_SESSION == pWindow->winType) {
if (!streamQuery) {
return stbSplHasMultiTbScan(streamQuery, pNode);
} else {
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
}
}
}
case QUERY_NODE_LOGIC_PLAN_SORT: case QUERY_NODE_LOGIC_PLAN_SORT:
return stbSplHasMultiTbScan(streamQuery, pNode); return stbSplHasMultiTbScan(streamQuery, pNode);
default: default: