fix:add plan for multi agg operator in stream

This commit is contained in:
wangmm0220 2023-12-05 20:07:20 +08:00
parent 1433481989
commit fc9dfc77d0
19 changed files with 149 additions and 42 deletions

View File

@ -253,6 +253,7 @@ typedef enum EWindowAlgorithm {
SESSION_ALGO_STREAM_FINAL,
SESSION_ALGO_STREAM_SINGLE,
SESSION_ALGO_MERGE,
INTERVAL_ALGO_STREAM_MID,
} EWindowAlgorithm;
typedef struct SWindowLogicNode {
@ -579,6 +580,7 @@ typedef SIntervalPhysiNode SMergeAlignedIntervalPhysiNode;
typedef SIntervalPhysiNode SStreamIntervalPhysiNode;
typedef SIntervalPhysiNode SStreamFinalIntervalPhysiNode;
typedef SIntervalPhysiNode SStreamSemiIntervalPhysiNode;
typedef SIntervalPhysiNode SStreamMidIntervalPhysiNode;
typedef struct SFillPhysiNode {
SPhysiNode node;

View File

@ -147,7 +147,7 @@ bool tsEnableQueryHb = true;
bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true
int32_t tsQuerySmaOptimize = 0;
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
bool tsQueryPlannerTrace = false;
bool tsQueryPlannerTrace = true;
int32_t tsQueryNodeChunkSize = 32 * 1024;
bool tsQueryUseNodeAllocator = true;
bool tsKeepColumnName = false;

View File

@ -328,35 +328,6 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
return 0;
}
static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64_t watermark, char **pStr) {
if (NULL == ast) {
return TSDB_CODE_SUCCESS;
}
SNode * pAst = NULL;
int32_t code = nodesStringToNode(ast, &pAst);
SQueryPlan *pPlan = NULL;
if (TSDB_CODE_SUCCESS == code) {
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = (triggerType == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType,
.watermark = watermark,
};
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString((SNode *)pPlan, false, pStr, NULL);
}
nodesDestroyNode(pAst);
nodesDestroyNode((SNode *)pPlan);
terrno = code;
return code;
}
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
SNode * pAst = NULL;
SQueryPlan *pPlan = NULL;
@ -768,6 +739,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SDbObj * pDb = NULL;
SCMCreateStreamReq createStreamReq = {0};
SStreamObj streamObj = {0};
char* sql = NULL;
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
@ -799,7 +771,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
char* sql = NULL;
int32_t sqlLen = 0;
if(createStreamReq.sql != NULL){
sqlLen = strlen(createStreamReq.sql);

View File

@ -161,6 +161,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
if(pRebVg->oldConsumerId == -1) return 0;
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
return -1;
}

View File

@ -14,4 +14,5 @@ add_subdirectory(snode)
add_subdirectory(stb)
add_subdirectory(topic)
add_subdirectory(trans)
add_subdirectory(stream)
#add_subdirectory(user)

View File

@ -0,0 +1,12 @@
aux_source_directory(. MNODE_STREAM_PLAN_TEST_SRC)
add_executable(streamPlanTest ${MNODE_STREAM_PLAN_TEST_SRC})
target_link_libraries(
streamPlanTest
PUBLIC nodes planner gtest qcom
)
add_test(
NAME streamPlanTest
COMMAND streamPlanTest
)

View File

@ -0,0 +1,66 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#include "nodes.h"
#include "planner.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
TEST(testCase, plan_Test) {
char* ast = "{\"NodeType\":\"101\",\"Name\":\"SelectStmt\",\"SelectStmt\":{\"Distinct\":false,\"Projections\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"9\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_1\",\"UserAlias\":\"_wstart\",\"Name\":\"_wstart\",\"Id\":\"89\",\"Type\":\"3505\",\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_2\",\"UserAlias\":\"sum(voltage)\",\"Name\":\"sum\",\"Id\":\"1\",\"Type\":\"14\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"voltage\",\"UserAlias\":\"voltage\",\"TableId\":\"6555383776122680534\",\"TableType\":\"1\",\"ColId\":\"3\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"meters\",\"TableAlias\":\"meters\",\"ColName\":\"voltage\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"#expr_3\",\"UserAlias\":\"groupid\",\"Name\":\"_group_key\",\"Id\":\"96\",\"Type\":\"3754\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"#expr_3\",\"UserAlias\":\"groupid\",\"TableId\":\"6555383776122680534\",\"TableType\":\"1\",\"ColId\":\"5\",\"ProjId\":\"0\",\"ColType\":\"2\",\"DbName\":\"test\",\"TableName\":\"meters\",\"TableAlias\":\"meters\",\"ColName\":\"groupid\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}}],\"From\":{\"NodeType\":\"6\",\"Name\":\"RealTable\",\"RealTable\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"0\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"DbName\":\"test\",\"tableName\":\"meters\",\"tableAlias\":\"meters\",\"MetaSize\":\"475\",\"Meta\":{\"VgId\":\"0\",\"TableType\":\"1\",\"Uid\":\"6555383776122680534\",\"Suid\":\"6555383776122680534\",\"Sversion\":\"1\",\"Tversion\":\"1\",\"ComInfo\":{\"NumOfTags\":\"2\",\"Precision\":\"0\",\"NumOfColumns\":\"4\",\"RowSize\":\"20\"},\"ColSchemas\":[{\"Type\":\"9\",\"ColId\":\"1\",\"bytes\":\"8\",\"Name\":\"ts\"},{\"Type\":\"6\",\"ColId\":\"2\",\"bytes\":\"4\",\"Name\":\"current\"},{\"Type\":\"4\",\"ColId\":\"3\",\"bytes\":\"4\",\"Name\":\"voltage\"},{\"Type\":\"6\",\"ColId\":\"4\",\"bytes\":\"4\",\"Name\":\"phase\"},{\"Type\":\"4\",\"ColId\":\"5\",\"bytes\":\"4\",\"Name\":\"groupid\"},{\"Type\":\"8\",\"ColId\":\"6\",\"bytes\":\"26\",\"Name\":\"location\"}]},\"VgroupsInfoSize\":\"1340\",\"VgroupsInfo\":{\"Num\":\"2\",\"Vgroups\":[{\"VgId\":\"2\",\"HashBegin\":\"0\",\"HashEnd\":\"2147483646\",\"EpSet\":{\"InUse\":\"0\",\"NumOfEps\":\"1\",\"Eps\":[{\"Fqdn\":\"localhost\",\"Port\":\"6030\"}]},\"NumOfTable\":\"0\"},{\"VgId\":\"3\",\"HashBegin\":\"2147483647\",\"HashEnd\":\"4294967295\",\"EpSet\":{\"InUse\":\"0\",\"NumOfEps\":\"1\",\"Eps\":[{\"Fqdn\":\"localhost\",\"Port\":\"6030\"}]},\"NumOfTable\":\"0\"}]}}},\"PartitionBy\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"groupid\",\"UserAlias\":\"groupid\",\"TableId\":\"6555383776122680534\",\"TableType\":\"1\",\"ColId\":\"5\",\"ProjId\":\"0\",\"ColType\":\"2\",\"DbName\":\"test\",\"TableName\":\"meters\",\"TableAlias\":\"meters\",\"ColName\":\"groupid\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"Window\":{\"NodeType\":\"14\",\"Name\":\"IntervalWindow\",\"IntervalWindow\":{\"Interval\":{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"115\",\"Bytes\":\"8\"},\"AliasName\":\"c804c3a15ebe05b5baf40ad5ee12be1f\",\"UserAlias\":\"2s\",\"LiteralSize\":\"2\",\"Literal\":\"2s\",\"Duration\":true,\"Translate\":true,\"NotReserved\":false,\"IsNull\":false,\"Unit\":\"115\",\"Datum\":\"2000\"}},\"TsPk\":{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"9\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"ts\",\"UserAlias\":\"ts\",\"TableId\":\"6555383776122680534\",\"TableType\":\"1\",\"ColId\":\"1\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"meters\",\"TableAlias\":\"meters\",\"ColName\":\"ts\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}}},\"StmtName\":\"0x1580095ba\",\"HasAggFuncs\":true}}";
SNode * pAst = NULL;
SQueryPlan *pPlan = NULL;
if (taosCreateLog("taoslog", 10, "/etc/taos", NULL, NULL, NULL, NULL, 1) != 0) {
// ignore create log failed, only print
printf(" WARING: Create failed:%s. configDir\n", strerror(errno));
}
if (nodesStringToNode(ast, &pAst) < 0) {
ASSERT(0);
}
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = STREAM_TRIGGER_WINDOW_CLOSE,
.watermark = 1,
.igExpired = 1,
.deleteMark = 1,
.igCheckUpdate = 1,
};
// using ast and param to build physical plan
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
ASSERT(0);
}
}

View File

@ -180,7 +180,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
}
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT64) {
pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int64_t *)pRow->pObj));
}
pSdb->tableVer[pRow->type]++;

View File

@ -943,7 +943,7 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
while (1) {
int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
@ -1016,7 +1016,7 @@ int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
while (1) {
uint16_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;

View File

@ -489,6 +489,9 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
int32_t children = 0;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL == type) {
int32_t children = 0;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
int32_t children = pHandle->numOfVgroups;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle);

View File

@ -1134,6 +1134,7 @@ static bool isStateWindow(SStreamScanInfo* pInfo) {
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL ||
pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
}

View File

@ -1380,7 +1380,8 @@ static int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) {
}
static void streamIntervalReleaseState(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL &&
pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
int32_t resSize = sizeof(TSKEY);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
@ -1396,7 +1397,8 @@ static void streamIntervalReleaseState(SOperatorInfo* pOperator) {
}
void streamIntervalReloadState(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL &&
pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
int32_t size = 0;
void* pBuf = NULL;
@ -1512,7 +1514,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type, pInfo);
}
code = appendDownstream(pOperator, &downstream, 1);

View File

@ -921,6 +921,7 @@ SNode* nodesCloneNode(const SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
code = physiIntervalCopy((const SIntervalPhysiNode*)pNode, (SIntervalPhysiNode*)pDst);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:

View File

@ -357,6 +357,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiStreamFinalInterval";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
return "PhysiStreamSemiInterval";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
return "PhysiStreamMidInterval";
case QUERY_NODE_PHYSICAL_PLAN_FILL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
return "PhysiFill";
@ -7082,6 +7084,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
return physiIntervalNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_FILL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
@ -7413,6 +7416,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
return jsonToPhysiIntervalNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_FILL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:

View File

@ -4139,6 +4139,7 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
code = physiIntervalNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_FILL:
@ -4293,6 +4294,7 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
code = msgToPhysiIntervalNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_FILL:

View File

@ -554,6 +554,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SStreamFinalIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
return makeNode(type, sizeof(SStreamSemiIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
return makeNode(type, sizeof(SStreamMidIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_FILL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
return makeNode(type, sizeof(SFillPhysiNode));
@ -1363,6 +1365,7 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
destroyWinodwPhysiNode((SWindowPhysiNode*)pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_FILL:

View File

@ -1587,6 +1587,8 @@ static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) {
return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
case INTERVAL_ALGO_STREAM_SEMI:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
case INTERVAL_ALGO_STREAM_MID:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL;
case INTERVAL_ALGO_STREAM_SINGLE:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
case SESSION_ALGO_STREAM_FINAL:

View File

@ -635,18 +635,52 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
return code;
}
//static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
// SLogicNode* pPartWindow = NULL;
// int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
// if (TSDB_CODE_SUCCESS == code) {
// ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
// ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL;
// code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
// }
// if (TSDB_CODE_SUCCESS == code) {
// code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
// (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
// }
// pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
// ++(pCxt->groupId);
// return code;
//}
static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
SLogicNode* pMidWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pMidWindow);
if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
((SWindowLogicNode*)pMidWindow)->windowAlgo = INTERVAL_ALGO_STREAM_MID;
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL;
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pMidWindow);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
code = stbSplCreatePartWindowNode((SWindowLogicNode*)pMidWindow, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
code = stbSplCreateExchangeNode(pCxt, pMidWindow, pPartWindow);
}
}
if (TSDB_CODE_SUCCESS == code) {
SNode* subPlan = (SNode*)splCreateSubplan(pCxt, pMidWindow);
((SLogicSubplan*)subPlan)->subplanType = SUBPLAN_TYPE_MERGE;
code = nodesListMakeStrictAppend(&((SLogicSubplan*)subPlan)->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, subPlan);
}
}
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
++(pCxt->groupId);
return code;

View File

@ -95,6 +95,7 @@ int32_t doValidatePhysiNode(SValidatePlanContext* pCxt, SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_FILL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: