Merge pull request #10730 from taosdata/feature/3.0_wxy

TD-13990 interval plan implement
This commit is contained in:
Xiaoyu Wang 2022-03-14 14:16:15 +08:00 committed by GitHub
commit b814f6c738
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 237 additions and 9 deletions

View File

@ -101,6 +101,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_PROJECT,
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF,
QUERY_NODE_LOGIC_PLAN_EXCHANGE,
QUERY_NODE_LOGIC_PLAN_WINDOW,
QUERY_NODE_LOGIC_SUBPLAN,
QUERY_NODE_LOGIC_PLAN,
@ -115,6 +116,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_AGG,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
QUERY_NODE_PHYSICAL_PLAN_INSERT,
QUERY_NODE_PHYSICAL_SUBPLAN,

View File

@ -80,6 +80,22 @@ typedef struct SExchangeLogicNode {
int32_t srcGroupId;
} SExchangeLogicNode;
typedef enum EWindowType {
WINDOW_TYPE_INTERVAL = 1,
WINDOW_TYPE_SESSION,
WINDOW_TYPE_STATE
} EWindowType;
typedef struct SWindowLogicNode {
SLogicNode node;
EWindowType winType;
SNodeList* pFuncs;
int64_t interval;
int64_t offset;
int64_t sliding;
SFillNode* pFill;
} SWindowLogicNode;
typedef enum ESubplanType {
SUBPLAN_TYPE_MERGE = 1,
SUBPLAN_TYPE_PARTIAL,
@ -191,6 +207,16 @@ typedef struct SExchangePhysiNode {
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhysiNode;
typedef struct SIntervalPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of parameter expression of function
SNodeList* pFuncs;
int64_t interval;
int64_t offset;
int64_t sliding;
SFillNode* pFill;
} SIntervalPhysiNode;
typedef struct SDataSinkNode {
ENodeType type;
SDataBlockDescNode* pInputDataBlockDesc;

View File

@ -195,11 +195,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
pRequest->type = pQuery->msgType;
SPlanContext cxt = { .queryId = pRequest->requestId, .pAstRoot = pQuery->pRoot, .acctId = pRequest->pTscObj->acctId };
int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
if (code != 0) {
return code;
}
return code;
return qCreateQueryPlan(&cxt, pPlan, pNodeList);
}
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {

View File

@ -183,6 +183,12 @@ static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode
return (SNode*)pDst;
}
static SNode* fillNodeCopy(const SFillNode* pSrc, SFillNode* pDst) {
COPY_SCALAR_FIELD(mode);
CLONE_NODE_FIELD(pValues);
return (SNode*)pDst;
}
static SNode* logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
COPY_SCALAR_FIELD(id);
CLONE_NODE_LIST_FIELD(pTargets);
@ -248,6 +254,17 @@ static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNo
return (SNode*)pDst;
}
static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(winType);
CLONE_NODE_LIST_FIELD(pFuncs);
COPY_SCALAR_FIELD(interval);
COPY_SCALAR_FIELD(offset);
COPY_SCALAR_FIELD(sliding);
CLONE_NODE_FIELD(pFill);
return (SNode*)pDst;
}
static SNode* logicSubplanCopy(const SSubLogicPlan* pSrc, SSubLogicPlan* pDst) {
CLONE_NODE_FIELD(pNode);
COPY_SCALAR_FIELD(subplanType);
@ -309,6 +326,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
case QUERY_NODE_ORDER_BY_EXPR:
case QUERY_NODE_LIMIT:
break;
case QUERY_NODE_FILL:
return fillNodeCopy((const SFillNode*)pNode, (SFillNode*)pDst);
case QUERY_NODE_DATABLOCK_DESC:
return dataBlockDescCopy((const SDataBlockDescNode*)pNode, (SDataBlockDescNode*)pDst);
case QUERY_NODE_SLOT_DESC:
@ -325,6 +344,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return logicVnodeModifCopy((const SVnodeModifLogicNode*)pNode, (SVnodeModifLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return logicExchangeCopy((const SExchangeLogicNode*)pNode, (SExchangeLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return logicWindowCopy((const SWindowLogicNode*)pNode, (SWindowLogicNode*)pDst);
case QUERY_NODE_LOGIC_SUBPLAN:
return logicSubplanCopy((const SSubLogicPlan*)pNode, (SSubLogicPlan*)pDst);
default:

View File

@ -117,6 +117,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiExchange";
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return "PhysiSort";
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return "PhysiInterval";
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return "PhysiDispatch";
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
@ -573,6 +575,65 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkIntervalPhysiPlanExprs = "Exprs";
static const char* jkIntervalPhysiPlanFuncs = "Funcs";
static const char* jkIntervalPhysiPlanInterval = "Interval";
static const char* jkIntervalPhysiPlanOffset = "Offset";
static const char* jkIntervalPhysiPlanSliding = "Sliding";
static const char* jkIntervalPhysiPlanFill = "Fill";
static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkIntervalPhysiPlanExprs, pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkIntervalPhysiPlanFuncs, pNode->pFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanInterval, pNode->interval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanOffset, pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanSliding, pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkIntervalPhysiPlanFill, nodeToJson, pNode->pFill);
}
return code;
}
static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) {
SIntervalPhysiNode* pNode = (SIntervalPhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkIntervalPhysiPlanExprs, &pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkIntervalPhysiPlanFuncs, &pNode->pFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanInterval, &pNode->interval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanOffset, &pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanSliding, &pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkIntervalPhysiPlanFill, (SNode**)&pNode->pFill);
}
return code;
}
static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc";
static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) {
@ -1500,6 +1561,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiExchangeNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
break;
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return physiIntervalNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return physiDispatchNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INSERT:

View File

@ -134,6 +134,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SVnodeModifLogicNode));
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangeLogicNode));
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return makeNode(type, sizeof(SWindowLogicNode));
case QUERY_NODE_LOGIC_SUBPLAN:
return makeNode(type, sizeof(SSubLogicPlan));
case QUERY_NODE_LOGIC_PLAN:
@ -156,6 +158,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SExchangePhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return makeNode(type, sizeof(SNode));
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return makeNode(type, sizeof(SIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return makeNode(type, sizeof(SDataDispatcherNode));
case QUERY_NODE_PHYSICAL_PLAN_INSERT:

View File

@ -198,7 +198,7 @@ col_name(A) ::= column_name(B).
cmd ::= SHOW VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, NULL); }
cmd ::= SHOW db_name(B) NK_DOT VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, &B); }
/************************************************ show vgroups ********************************************************/
/************************************************ show mnodes *********************************************************/
cmd ::= SHOW MNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MNODES_STMT, NULL); }
/************************************************ select **************************************************************/

View File

@ -706,9 +706,17 @@ static int32_t translateGroupBy(STranslateContext* pCxt, SNodeList* pGroupByList
return translateExprList(pCxt, pGroupByList);
}
static int32_t doTranslateWindow(STranslateContext* pCxt, SNode* pWindow) {
return TSDB_CODE_SUCCESS;
}
static int32_t translateWindow(STranslateContext* pCxt, SNode* pWindow) {
pCxt->currClause = SQL_CLAUSE_WINDOW;
return translateExpr(pCxt, pWindow);
int32_t code = translateExpr(pCxt, pWindow);
if (TSDB_CODE_SUCCESS == code) {
code = doTranslateWindow(pCxt, pWindow);
}
return code;
}
static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) {

View File

@ -183,6 +183,13 @@ TEST_F(ParserTest, selectClause) {
ASSERT_TRUE(run());
}
TEST_F(ParserTest, selectWindow) {
setDatabase("root", "test");
bind("SELECT count(*) FROM t1 interval(10s)");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, selectSyntaxError) {
setDatabase("root", "test");

View File

@ -304,6 +304,50 @@ static SLogicNode* createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
return (SLogicNode*)pAgg;
}
static SLogicNode* createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SIntervalWindowNode* pInterval, SSelectStmt* pSelect) {
SWindowLogicNode* pWindow = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW);
CHECK_ALLOC(pWindow, NULL);
pWindow->node.id = pCxt->planNodeId++;
pWindow->winType = WINDOW_TYPE_INTERVAL;
pWindow->interval = ((SValueNode*)pInterval->pInterval)->datum.i;
pWindow->offset = (NULL != pInterval->pOffset ? ((SValueNode*)pInterval->pOffset)->datum.i : 0);
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : 0);
if (NULL != pInterval->pFill) {
pWindow->pFill = nodesCloneNode(pInterval->pFill);
CHECK_ALLOC(pWindow->pFill, (SLogicNode*)pWindow);
}
SNodeList* pFuncs = NULL;
CHECK_CODE(nodesCollectFuncs(pSelect, fmIsAggFunc, &pFuncs), NULL);
if (NULL != pFuncs) {
pWindow->pFuncs = nodesCloneList(pFuncs);
CHECK_ALLOC(pWindow->pFuncs, (SLogicNode*)pWindow);
}
CHECK_CODE(rewriteExpr(pWindow->node.id, 1, pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW), (SLogicNode*)pWindow);
pWindow->node.pTargets = createColumnByRewriteExps(pCxt, pWindow->pFuncs);
CHECK_ALLOC(pWindow->node.pTargets, (SLogicNode*)pWindow);
return (SLogicNode*)pWindow;
}
static SLogicNode* createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pWindow) {
return NULL;
}
switch (nodeType(pSelect->pWindow)) {
case QUERY_NODE_INTERVAL_WINDOW:
return createWindowLogicNodeByInterval(pCxt, (SIntervalWindowNode*)pSelect->pWindow, pSelect);
default:
break;
}
return NULL;
}
static SNodeList* createColumnByProjections(SLogicPlanContext* pCxt, SNodeList* pExprs) {
SNodeList* pList = nodesMakeList();
CHECK_ALLOC(pList, NULL);
@ -345,6 +389,9 @@ static SLogicNode* createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
pRoot->pConditions = nodesCloneNode(pSelect->pWhere);
CHECK_ALLOC(pRoot->pConditions, pRoot);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pRoot = pushLogicNode(pCxt, pRoot, createWindowLogicNode(pCxt, pSelect));
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect));
}

View File

@ -473,14 +473,58 @@ static SPhysiNode* createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLog
return (SPhysiNode*)pExchange;
}
static SPhysiNode* createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode) {
SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_INTERVAL);
CHECK_ALLOC(pInterval, NULL);
pInterval->interval = pWindowLogicNode->interval;
pInterval->offset = pWindowLogicNode->offset;
pInterval->sliding = pWindowLogicNode->sliding;
pInterval->pFill = nodesCloneNode(pWindowLogicNode->pFill);
SNodeList* pPrecalcExprs = NULL;
SNodeList* pFuncs = NULL;
CHECK_CODE(rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs), (SPhysiNode*)pInterval);
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
// push down expression to pOutputDataBlockDesc of child node
if (NULL != pPrecalcExprs) {
pInterval->pExprs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs);
CHECK_ALLOC(pInterval->pExprs, (SPhysiNode*)pInterval);
CHECK_CODE(addDataBlockDesc(pCxt, pInterval->pExprs, pChildTupe), (SPhysiNode*)pInterval);
}
if (NULL != pFuncs) {
pInterval->pFuncs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs);
CHECK_ALLOC(pInterval->pFuncs, (SPhysiNode*)pInterval);
CHECK_CODE(addDataBlockDesc(pCxt, pInterval->pFuncs, pInterval->node.pOutputDataBlockDesc), (SPhysiNode*)pInterval);
}
CHECK_CODE(setSlotOutput(pCxt, pWindowLogicNode->node.pTargets, pInterval->node.pOutputDataBlockDesc), (SPhysiNode*)pInterval);
return (SPhysiNode*)pInterval;
}
static SPhysiNode* createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode) {
switch (pWindowLogicNode->winType) {
case WINDOW_TYPE_INTERVAL:
return createIntervalPhysiNode(pCxt, pChildren, pWindowLogicNode);
case WINDOW_TYPE_SESSION:
case WINDOW_TYPE_STATE:
break;
default:
break;
}
return NULL;
}
static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SLogicNode* pLogicPlan) {
SNodeList* pChildren = nodesMakeList();
CHECK_ALLOC(pChildren, NULL);
SNode* pLogicChild;
FOREACH(pLogicChild, pLogicPlan->pChildren) {
SNode* pChildPhyNode = (SNode*)createPhysiNode(pCxt, pSubplan, (SLogicNode*)pLogicChild);
if (TSDB_CODE_SUCCESS != nodesListAppend(pChildren, pChildPhyNode)) {
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pChildren, createPhysiNode(pCxt, pSubplan, (SLogicNode*)pLogicChild))) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
nodesDestroyList(pChildren);
return NULL;
@ -504,6 +548,9 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
pPhyNode = createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicPlan);
break;
case QUERY_NODE_LOGIC_PLAN_WINDOW:
pPhyNode = createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicPlan);
break;
default:
break;
}

View File

@ -166,3 +166,10 @@ TEST_F(PlannerTest, subquery) {
bind("SELECT count(*) FROM (SELECT c1 + c3 a, c1 + count(*) b FROM t1 where c2 = 'abc' GROUP BY c1, c3) where a > 100 group by b");
ASSERT_TRUE(run());
}
TEST_F(PlannerTest, interval) {
setDatabase("root", "test");
bind("SELECT count(*) FROM t1 interval(10s)");
ASSERT_TRUE(run());
}