feat: stream interval distributed split
This commit is contained in:
parent
cd7d3c59a2
commit
f45ddaf1c8
|
@ -212,6 +212,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_PHYSICAL_PLAN_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_FILL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW,
|
||||
|
|
|
@ -106,6 +106,12 @@ typedef struct SMergeLogicNode {
|
|||
|
||||
typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
|
||||
|
||||
typedef enum EStreamIntervalAlgorithm {
|
||||
STREAM_INTERVAL_ALGO_FINAL = 1,
|
||||
STREAM_INTERVAL_ALGO_SEMI,
|
||||
STREAM_INTERVAL_ALGO_SINGLE
|
||||
} EStreamIntervalAlgorithm;
|
||||
|
||||
typedef struct SWindowLogicNode {
|
||||
SLogicNode node;
|
||||
EWindowType winType;
|
||||
|
@ -121,6 +127,7 @@ typedef struct SWindowLogicNode {
|
|||
int8_t triggerType;
|
||||
int64_t watermark;
|
||||
double filesFactor;
|
||||
EStreamIntervalAlgorithm stmInterAlgo;
|
||||
} SWindowLogicNode;
|
||||
|
||||
typedef struct SFillLogicNode {
|
||||
|
@ -301,6 +308,8 @@ typedef struct SIntervalPhysiNode {
|
|||
} SIntervalPhysiNode;
|
||||
|
||||
typedef SIntervalPhysiNode SStreamIntervalPhysiNode;
|
||||
typedef SIntervalPhysiNode SStreamFinalIntervalPhysiNode;
|
||||
typedef SIntervalPhysiNode SStreamSemiIntervalPhysiNode;
|
||||
|
||||
typedef struct SFillPhysiNode {
|
||||
SPhysiNode node;
|
||||
|
|
|
@ -464,12 +464,9 @@ static bool validateStateOper(const SValueNode* pVal) {
|
|||
if (TSDB_DATA_TYPE_BINARY != pVal->node.resType.type) {
|
||||
return false;
|
||||
}
|
||||
return (0 == strcasecmp(varDataVal(pVal->datum.p), "GT") ||
|
||||
0 == strcasecmp(varDataVal(pVal->datum.p), "GE") ||
|
||||
0 == strcasecmp(varDataVal(pVal->datum.p), "LT") ||
|
||||
0 == strcasecmp(varDataVal(pVal->datum.p), "LE") ||
|
||||
0 == strcasecmp(varDataVal(pVal->datum.p), "EQ") ||
|
||||
0 == strcasecmp(varDataVal(pVal->datum.p), "NE"));
|
||||
return (0 == strcasecmp(varDataVal(pVal->datum.p), "GT") || 0 == strcasecmp(varDataVal(pVal->datum.p), "GE") ||
|
||||
0 == strcasecmp(varDataVal(pVal->datum.p), "LT") || 0 == strcasecmp(varDataVal(pVal->datum.p), "LE") ||
|
||||
0 == strcasecmp(varDataVal(pVal->datum.p), "EQ") || 0 == strcasecmp(varDataVal(pVal->datum.p), "NE"));
|
||||
}
|
||||
|
||||
static int32_t translateStateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
|
@ -553,7 +550,6 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32
|
|||
"STATEDURATION function time unit parameter should be greater than db precision");
|
||||
}
|
||||
|
||||
|
||||
pValue->notReserved = true;
|
||||
}
|
||||
|
||||
|
@ -838,7 +834,7 @@ static int32_t translateConcatImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
|||
int32_t resultBytes = 0;
|
||||
int32_t sepBytes = 0;
|
||||
|
||||
//concat_ws separator should be constant string
|
||||
// concat_ws separator should be constant string
|
||||
if (hasSep) {
|
||||
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (nodeType(pPara) != QUERY_NODE_VALUE) {
|
||||
|
@ -964,7 +960,7 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
|
|||
return false;
|
||||
}
|
||||
|
||||
char *tz = varDataVal(pVal->datum.p);
|
||||
char* tz = varDataVal(pVal->datum.p);
|
||||
int32_t len = varDataLen(pVal->datum.p);
|
||||
|
||||
if (len == 0) {
|
||||
|
@ -1010,18 +1006,18 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
|
|||
void static addTimezoneParam(SNodeList* pList) {
|
||||
char buf[6] = {0};
|
||||
time_t t = taosTime(NULL);
|
||||
struct tm *tmInfo = taosLocalTime(&t, NULL);
|
||||
struct tm* tmInfo = taosLocalTime(&t, NULL);
|
||||
strftime(buf, sizeof(buf), "%z", tmInfo);
|
||||
int32_t len = (int32_t)strlen(buf);
|
||||
|
||||
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
pVal->literal = strndup(buf, len);
|
||||
pVal->isDuration =false;
|
||||
pVal->isDuration = false;
|
||||
pVal->translate = true;
|
||||
pVal->node.resType.type = TSDB_DATA_TYPE_BINARY;
|
||||
pVal->node.resType.bytes = len + VARSTR_HEADER_SIZE;
|
||||
pVal->node.resType.precision = TSDB_TIME_PRECISION_MILLI;
|
||||
pVal->datum.p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE +1);
|
||||
pVal->datum.p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE + 1);
|
||||
varDataSetLen(pVal->datum.p, len);
|
||||
strncpy(varDataVal(pVal->datum.p), pVal->literal, len);
|
||||
|
||||
|
@ -1034,25 +1030,24 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
|||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
//param0
|
||||
// param0
|
||||
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (!IS_INTEGER_TYPE(paraType) && TSDB_DATA_TYPE_TIMESTAMP != paraType) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
//param1
|
||||
// param1
|
||||
if (numOfParams == 2) {
|
||||
SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
|
||||
|
||||
if (!validateTimezoneFormat(pValue)) {
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||
"Invalid timzone format");
|
||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "Invalid timzone format");
|
||||
}
|
||||
} else { //add default client timezone
|
||||
} else { // add default client timezone
|
||||
addTimezoneParam(pFunc->pParameterList);
|
||||
}
|
||||
|
||||
//set result type
|
||||
// set result type
|
||||
pFunc->node.resType = (SDataType){.bytes = 64, .type = TSDB_DATA_TYPE_BINARY};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1143,8 +1138,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.finalizeFunc = functionFinalize,
|
||||
.invertFunc = countInvertFunction,
|
||||
.combineFunc = combineFunction,
|
||||
// .pPartialFunc = "count",
|
||||
// .pMergeFunc = "sum"
|
||||
.pPartialFunc = "count",
|
||||
.pMergeFunc = "sum"
|
||||
},
|
||||
{
|
||||
.name = "sum",
|
||||
|
|
|
@ -142,7 +142,7 @@ static SNode* valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
|
|||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_VARCHAR:
|
||||
case TSDB_DATA_TYPE_VARBINARY:{
|
||||
case TSDB_DATA_TYPE_VARBINARY: {
|
||||
int32_t len = varDataTLen(pSrc->datum.p) + 1;
|
||||
pDst->datum.p = taosMemoryCalloc(1, len);
|
||||
if (NULL == pDst->datum.p) {
|
||||
|
@ -399,6 +399,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD
|
|||
COPY_SCALAR_FIELD(triggerType);
|
||||
COPY_SCALAR_FIELD(watermark);
|
||||
COPY_SCALAR_FIELD(filesFactor);
|
||||
COPY_SCALAR_FIELD(stmInterAlgo);
|
||||
return (SNode*)pDst;
|
||||
}
|
||||
|
||||
|
|
|
@ -230,6 +230,10 @@ const char* nodesNodeName(ENodeType type) {
|
|||
return "PhysiInterval";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
|
||||
return "PhysiStreamInterval";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
|
||||
return "PhysiStreamFinalInterval";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
|
||||
return "PhysiStreamSemiInterval";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_FILL:
|
||||
return "PhysiFill";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
|
||||
|
@ -3611,6 +3615,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
return physiSortNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
|
||||
return physiIntervalNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_FILL:
|
||||
return physiFillNodeToJson(pObj, pJson);
|
||||
|
@ -3728,6 +3734,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
return jsonToPhysiSortNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
|
||||
return jsonToPhysiIntervalNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_FILL:
|
||||
return jsonToPhysiFillNode(pJson, pObj);
|
||||
|
|
|
@ -260,6 +260,10 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(SIntervalPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
|
||||
return makeNode(type, sizeof(SStreamIntervalPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
|
||||
return makeNode(type, sizeof(SStreamFinalIntervalPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
|
||||
return makeNode(type, sizeof(SStreamSemiIntervalPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_FILL:
|
||||
return makeNode(type, sizeof(SFillPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
|
||||
|
|
|
@ -562,6 +562,7 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
|
|||
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval);
|
||||
pWindow->slidingUnit =
|
||||
(NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit);
|
||||
pWindow->stmInterAlgo = STREAM_INTERVAL_ALGO_SINGLE;
|
||||
|
||||
pWindow->pTspk = nodesCloneNode(pInterval->pCol);
|
||||
if (NULL == pWindow->pTspk) {
|
||||
|
|
|
@ -526,10 +526,10 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
|||
pScan->accountId = pCxt->pPlanCxt->acctId;
|
||||
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES)) {
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
SQueryNodeLoad node = { .addr = pSubplan->execNode, .load = 0};
|
||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
|
||||
} else {
|
||||
SQueryNodeLoad node = { .addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
||||
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
}
|
||||
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
|
||||
|
@ -933,11 +933,22 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
|
|||
return code;
|
||||
}
|
||||
|
||||
static ENodeType getIntervalOperatorType(bool streamQuery, EStreamIntervalAlgorithm stmAlgo) {
|
||||
if (streamQuery) {
|
||||
return STREAM_INTERVAL_ALGO_FINAL == stmAlgo
|
||||
? QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
|
||||
: (STREAM_INTERVAL_ALGO_SEMI == stmAlgo ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
|
||||
: QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL);
|
||||
} else {
|
||||
return QUERY_NODE_PHYSICAL_PLAN_INTERVAL;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
|
||||
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
|
||||
SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(
|
||||
pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode,
|
||||
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL : QUERY_NODE_PHYSICAL_PLAN_INTERVAL));
|
||||
getIntervalOperatorType(pCxt->pPlanCxt->streamQuery, pWindowLogicNode->stmInterAlgo));
|
||||
if (NULL == pInterval) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||
|
||||
typedef struct SSplitContext {
|
||||
SPlanContext* pPlanCxt;
|
||||
uint64_t queryId;
|
||||
int32_t groupId;
|
||||
bool split;
|
||||
|
@ -36,7 +37,7 @@ typedef struct SSplitRule {
|
|||
FSplit splitFunc;
|
||||
} SSplitRule;
|
||||
|
||||
typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, void* pInfo);
|
||||
typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, void* pInfo);
|
||||
|
||||
static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||
|
@ -63,19 +64,29 @@ static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNod
|
|||
return pSubplan;
|
||||
}
|
||||
|
||||
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
|
||||
ESubplanType subplanType) {
|
||||
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SExchangeLogicNode** pOutput) {
|
||||
SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
|
||||
if (NULL == pExchange) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pExchange->srcGroupId = pCxt->groupId;
|
||||
pExchange->node.precision = pSplitNode->precision;
|
||||
pExchange->node.pTargets = nodesCloneList(pSplitNode->pTargets);
|
||||
pExchange->node.precision = pChild->precision;
|
||||
pExchange->node.pTargets = nodesCloneList(pChild->pTargets);
|
||||
if (NULL == pExchange->node.pTargets) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
*pOutput = pExchange;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
|
||||
ESubplanType subplanType) {
|
||||
SExchangeLogicNode* pExchange = NULL;
|
||||
if (TSDB_CODE_SUCCESS != splCreateExchangeNode(pCxt, pSplitNode, &pExchange)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pSubplan->subplanType = subplanType;
|
||||
|
||||
if (NULL == pSplitNode->pParent) {
|
||||
|
@ -97,7 +108,7 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
|
|||
|
||||
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
|
||||
if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag)) {
|
||||
if (func(pSubplan, pInfo)) {
|
||||
if (func(pCxt, pSubplan, pInfo)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -125,41 +136,47 @@ static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
|
|||
return false;
|
||||
}
|
||||
|
||||
static bool stbSplIsMultiTbScan(SScanLogicNode* pScan) {
|
||||
return (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1);
|
||||
static bool stbSplIsMultiTbScan(bool streamQuery, SScanLogicNode* pScan) {
|
||||
return (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1) ||
|
||||
(streamQuery && TSDB_SUPER_TABLE == pScan->pMeta->tableType);
|
||||
}
|
||||
|
||||
static bool stbSplHasMultiTbScan(SLogicNode* pNode) {
|
||||
static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
|
||||
if (1 != LIST_LENGTH(pNode->pChildren)) {
|
||||
return false;
|
||||
}
|
||||
SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
|
||||
return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan((SScanLogicNode*)pChild));
|
||||
return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild));
|
||||
}
|
||||
|
||||
static bool stbSplNeedSplit(SLogicNode* pNode) {
|
||||
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
||||
switch (nodeType(pNode)) {
|
||||
// case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||
// return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode);
|
||||
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
||||
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode);
|
||||
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
|
||||
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
|
||||
if (WINDOW_TYPE_INTERVAL != pWindow->winType) {
|
||||
return false;
|
||||
}
|
||||
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
||||
}
|
||||
// case QUERY_NODE_LOGIC_PLAN_SORT:
|
||||
// return stbSplHasMultiTbScan(pNode);
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
return stbSplIsMultiTbScan((SScanLogicNode*)pNode);
|
||||
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static SLogicNode* stbSplMatchByNode(SLogicNode* pNode) {
|
||||
if (stbSplNeedSplit(pNode)) {
|
||||
static SLogicNode* stbSplMatchByNode(bool streamQuery, SLogicNode* pNode) {
|
||||
if (stbSplNeedSplit(streamQuery, pNode)) {
|
||||
return pNode;
|
||||
}
|
||||
SNode* pChild;
|
||||
FOREACH(pChild, pNode->pChildren) {
|
||||
SLogicNode* pSplitNode = stbSplMatchByNode((SLogicNode*)pChild);
|
||||
SLogicNode* pSplitNode = stbSplMatchByNode(streamQuery, (SLogicNode*)pChild);
|
||||
if (NULL != pSplitNode) {
|
||||
return pSplitNode;
|
||||
}
|
||||
|
@ -167,8 +184,8 @@ static SLogicNode* stbSplMatchByNode(SLogicNode* pNode) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static bool stbSplFindSplitNode(SLogicSubplan* pSubplan, SStableSplitInfo* pInfo) {
|
||||
SLogicNode* pSplitNode = stbSplMatchByNode(pSubplan->pNode);
|
||||
static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SStableSplitInfo* pInfo) {
|
||||
SLogicNode* pSplitNode = stbSplMatchByNode(pCxt->pPlanCxt->streamQuery, pSubplan->pNode);
|
||||
if (NULL != pSplitNode) {
|
||||
pInfo->pSplitNode = pSplitNode;
|
||||
pInfo->pSubplan = pSubplan;
|
||||
|
@ -301,7 +318,7 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, S
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
SLogicNode* pPartWindow = NULL;
|
||||
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -315,8 +332,41 @@ static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInf
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) {
|
||||
SExchangeLogicNode* pExchange = NULL;
|
||||
int32_t code = splCreateExchangeNode(pCxt, pPartChild, &pExchange);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeAppend(&pParent->pChildren, pExchange);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitWindowNodeForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
SLogicNode* pPartWindow = NULL;
|
||||
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
((SWindowLogicNode*)pPartWindow)->stmInterAlgo = STREAM_INTERVAL_ALGO_SEMI;
|
||||
((SWindowLogicNode*)pInfo->pSplitNode)->stmInterAlgo = STREAM_INTERVAL_ALGO_FINAL;
|
||||
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
||||
splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
|
||||
}
|
||||
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
if (pCxt->pPlanCxt->streamQuery) {
|
||||
return stbSplSplitWindowNodeForStream(pCxt, pInfo);
|
||||
} else {
|
||||
return stbSplSplitWindowNodeForBatch(pCxt, pInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
int32_t code = splCreateExchangeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
|
||||
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
||||
splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
|
||||
|
@ -375,7 +425,7 @@ static SJoinLogicNode* sigTbJoinSplMatchByNode(SLogicNode* pNode) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static bool sigTbJoinSplFindSplitNode(SLogicSubplan* pSubplan, SSigTbJoinSplitInfo* pInfo) {
|
||||
static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SSigTbJoinSplitInfo* pInfo) {
|
||||
SJoinLogicNode* pJoin = sigTbJoinSplMatchByNode(pSubplan->pNode);
|
||||
if (NULL != pJoin) {
|
||||
pInfo->pJoin = pJoin;
|
||||
|
@ -390,7 +440,7 @@ static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan
|
|||
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)sigTbJoinSplFindSplitNode, &info)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
int32_t code = splCreateExchangeNode(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
|
||||
int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, splCreateScanSubplan(pCxt, info.pSplitNode, 0));
|
||||
}
|
||||
|
@ -489,7 +539,7 @@ static SLogicNode* unAllSplMatchByNode(SLogicNode* pNode) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static bool unAllSplFindSplitNode(SLogicSubplan* pSubplan, SUnionAllSplitInfo* pInfo) {
|
||||
static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SUnionAllSplitInfo* pInfo) {
|
||||
SLogicNode* pSplitNode = unAllSplMatchByNode(pSubplan->pNode);
|
||||
if (NULL != pSplitNode) {
|
||||
pInfo->pProject = (SProjectLogicNode*)pSplitNode;
|
||||
|
@ -581,7 +631,7 @@ static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* p
|
|||
return nodesListMakeAppend(&pAgg->node.pChildren, pExchange);
|
||||
}
|
||||
|
||||
static bool unDistSplFindSplitNode(SLogicSubplan* pSubplan, SUnionDistinctSplitInfo* pInfo) {
|
||||
static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SUnionDistinctSplitInfo* pInfo) {
|
||||
SLogicNode* pSplitNode = unDistSplMatchByNode(pSubplan->pNode);
|
||||
if (NULL != pSplitNode) {
|
||||
pInfo->pAgg = (SAggLogicNode*)pSplitNode;
|
||||
|
@ -623,8 +673,9 @@ static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
|
|||
taosMemoryFree(pStr);
|
||||
}
|
||||
|
||||
static int32_t applySplitRule(SLogicSubplan* pSubplan) {
|
||||
SSplitContext cxt = {.queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};
|
||||
static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
|
||||
SSplitContext cxt = {
|
||||
.pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};
|
||||
bool split = false;
|
||||
do {
|
||||
split = false;
|
||||
|
@ -672,7 +723,7 @@ int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan
|
|||
pSubplan->id.groupId = 1;
|
||||
setLogicNodeParent(pSubplan->pNode);
|
||||
|
||||
int32_t code = applySplitRule(pSubplan);
|
||||
int32_t code = applySplitRule(pCxt, pSubplan);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicSubplan = pSubplan;
|
||||
} else {
|
||||
|
|
|
@ -33,6 +33,12 @@ TEST_F(PlanOtherTest, createStream) {
|
|||
"interval(10s)");
|
||||
}
|
||||
|
||||
TEST_F(PlanOtherTest, createStreamUseSTable) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("create stream if not exists s1 as select count(*) from st1 interval(10s)");
|
||||
}
|
||||
|
||||
TEST_F(PlanOtherTest, createSmaIndex) {
|
||||
useDb("root", "test");
|
||||
|
||||
|
|
Loading…
Reference in New Issue