TD-12034 Physical plan code.

This commit is contained in:
Xiaoyu Wang 2021-12-14 07:37:54 -05:00
parent 33a13ff54c
commit 3936128742
4 changed files with 101 additions and 49 deletions

View File

@ -54,7 +54,6 @@ enum OPERATOR_TYPE_E {
}; };
struct SEpSet; struct SEpSet;
struct SQueryPlanNode;
struct SPhyNode; struct SPhyNode;
struct SQueryStmtInfo; struct SQueryStmtInfo;

View File

@ -25,18 +25,20 @@ extern "C" {
#include "planner.h" #include "planner.h"
#include "taosmsg.h" #include "taosmsg.h"
enum LOGIC_PLAN_E { #define QNODE_TAGSCAN 1
LP_SCAN = 1, #define QNODE_TABLESCAN 2
LP_SESSION = 2, #define QNODE_PROJECT 3
LP_STATE = 3, #define QNODE_AGGREGATE 4
LP_INTERVAL = 4, #define QNODE_GROUPBY 5
LP_FILL = 5, #define QNODE_LIMIT 6
LP_AGG = 6, #define QNODE_JOIN 7
LP_JOIN = 7, #define QNODE_DISTINCT 8
LP_PROJECT = 8, #define QNODE_SORT 9
LP_DISTINCT = 9, #define QNODE_UNION 10
LP_ORDER = 10 #define QNODE_TIMEWINDOW 11
}; #define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13
#define QNODE_FILL 14
typedef struct SQueryNodeBasicInfo { typedef struct SQueryNodeBasicInfo {
int32_t type; // operator type int32_t type; // operator type
@ -64,10 +66,10 @@ typedef struct SQueryPlanNode {
SArray *pExpr; // the query functions or sql aggregations SArray *pExpr; // the query functions or sql aggregations
int32_t numOfExpr; // number of result columns, which is also the number of pExprs int32_t numOfExpr; // number of result columns, which is also the number of pExprs
void *pExtInfo; // additional information void *pExtInfo; // additional information
// previous operator to generated result for current node to process // children operator to generated result for current node to process
// in case of join, multiple prev nodes exist. // in case of join, multiple prev nodes exist.
SArray *pPrevNodes; // upstream nodes SArray *pChildren; // upstream nodes
struct SQueryPlanNode *nextNode; struct SQueryPlanNode *pParent;
} SQueryPlanNode; } SQueryPlanNode;
typedef SSchema SSlotSchema; typedef SSchema SSlotSchema;
@ -86,10 +88,12 @@ typedef struct SPhyNode {
// children plan to generated result for current node to process // children plan to generated result for current node to process
// in case of join, multiple plan nodes exist. // in case of join, multiple plan nodes exist.
SArray *pChildren; SArray *pChildren;
struct SPhyNode *pParent;
} SPhyNode; } SPhyNode;
typedef struct SScanPhyNode { typedef struct SScanPhyNode {
SPhyNode node; SPhyNode node;
STimeWindow window;
uint64_t uid; // unique id of the table uint64_t uid; // unique id of the table
} SScanPhyNode; } SScanPhyNode;

View File

@ -15,20 +15,84 @@
#include "plannerInt.h" #include "plannerInt.h"
// typedef struct SQueryPlanNode {
// void *pExtInfo; // additional information
// SArray *pPrevNodes; // children
// struct SQueryPlanNode *nextNode; // parent
// } SQueryPlanNode;
// typedef struct SSubplan {
// int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
// SArray *pDatasource; // the datasource subplan,from which to fetch the result
// struct SPhyNode *pNode; // physical plan of current subplan
// } SSubplan;
// typedef struct SQueryDag {
// SArray **pSubplans;
// } SQueryDag;
// typedef struct SScanPhyNode {
// SPhyNode node;
// STimeWindow window;
// uint64_t uid; // unique id of the table
// } SScanPhyNode;
// typedef SScanPhyNode STagScanPhyNode;
void fillDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
dataBlockSchema->index = 0; // todo
SWAP(dataBlockSchema->pSchema, pPlanNode->pSchema, SSchema*);
dataBlockSchema->numOfCols = pPlanNode->numOfCols;
}
void fillPhyNode(SQueryPlanNode* pPlanNode, int32_t type, const char* name, SPhyNode* node) {
node->info.type = type;
node->info.name = name;
SWAP(node->pTargets, pPlanNode->pExpr, SArray*);
fillDataBlockSchema(pPlanNode, &(node->targetSchema));
}
SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
STagScanPhyNode* node = calloc(1, sizeof(STagScanPhyNode));
fillPhyNode(pPlanNode, OP_TagScan, "TagScan", (SPhyNode*)node);
return (SPhyNode*)node;
}
SPhyNode* createScanNode(SQueryPlanNode* pPlanNode) { SPhyNode* createScanNode(SQueryPlanNode* pPlanNode) {
return NULL; STagScanPhyNode* node = calloc(1, sizeof(STagScanPhyNode));
fillPhyNode(pPlanNode, OP_TableScan, "SingleTableScan", (SPhyNode*)node);
return (SPhyNode*)node;
} }
SPhyNode* createPhyNode(SQueryPlanNode* node) { SPhyNode* createPhyNode(SQueryPlanNode* pPlanNode) {
switch (node->info.type) { SPhyNode* node = NULL;
case LP_SCAN: switch (pPlanNode->info.type) {
return createScanNode(node); case QNODE_TAGSCAN:
node = createTagScanNode(pPlanNode);
break;
case QNODE_TABLESCAN:
node = createScanNode(pPlanNode);
break;
default:
assert(false);
} }
return NULL; if (pPlanNode->pChildren != NULL && taosArrayGetSize(pPlanNode->pChildren) > 0) {
node->pChildren = taosArrayInit(4, POINTER_BYTES);
size_t size = taosArrayGetSize(pPlanNode->pChildren);
for(int32_t i = 0; i < size; ++i) {
SPhyNode* child = createPhyNode(taosArrayGet(pPlanNode->pChildren, i));
child->pParent = node;
taosArrayPush(node->pChildren, &child);
}
}
return node;
} }
SPhyNode* createSubplan(SQueryPlanNode* pSubquery) { SSubplan* createSubplan(SQueryPlanNode* pSubquery) {
return NULL; SSubplan* subplan = calloc(1, sizeof(SSubplan));
subplan->pNode = createPhyNode(pSubquery);
// todo
return subplan;
} }
int32_t createDag(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) { int32_t createDag(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) {

View File

@ -18,21 +18,6 @@
#include "parser.h" #include "parser.h"
#include "plannerInt.h" #include "plannerInt.h"
#define QNODE_TAGSCAN 1
#define QNODE_TABLESCAN 2
#define QNODE_PROJECT 3
#define QNODE_AGGREGATE 4
#define QNODE_GROUPBY 5
#define QNODE_LIMIT 6
#define QNODE_JOIN 7
#define QNODE_DISTINCT 8
#define QNODE_SORT 9
#define QNODE_UNION 10
#define QNODE_TIMEWINDOW 11
#define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13
#define QNODE_FILL 14
typedef struct SFillEssInfo { typedef struct SFillEssInfo {
int32_t fillType; // fill type int32_t fillType; // fill type
int64_t *val; // fill value int64_t *val; // fill value
@ -104,9 +89,9 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
taosArrayPush(pNode->pExpr, &pExpr[i]); taosArrayPush(pNode->pExpr, &pExpr[i]);
} }
pNode->pPrevNodes = taosArrayInit(4, POINTER_BYTES); pNode->pChildren = taosArrayInit(4, POINTER_BYTES);
for(int32_t i = 0; i < numOfPrev; ++i) { for(int32_t i = 0; i < numOfPrev; ++i) {
taosArrayPush(pNode->pPrevNodes, &prev[i]); taosArrayPush(pNode->pChildren, &prev[i]);
} }
switch(type) { switch(type) {
@ -386,14 +371,14 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
tfree(pQueryNode->info.name); tfree(pQueryNode->info.name);
// dropAllExprInfo(pQueryNode->pExpr); // dropAllExprInfo(pQueryNode->pExpr);
if (pQueryNode->pPrevNodes != NULL) { if (pQueryNode->pChildren != NULL) {
int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pPrevNodes); int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pChildren);
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
SQueryPlanNode* p = taosArrayGetP(pQueryNode->pPrevNodes, i); SQueryPlanNode* p = taosArrayGetP(pQueryNode->pChildren, i);
doDestroyQueryNode(p); doDestroyQueryNode(p);
} }
taosArrayDestroy(pQueryNode->pPrevNodes); taosArrayDestroy(pQueryNode->pChildren);
} }
tfree(pQueryNode); tfree(pQueryNode);
@ -607,8 +592,8 @@ int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t
int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) { int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) {
int32_t len = doPrintPlan(buf, pQueryNode, level, totalLen); int32_t len = doPrintPlan(buf, pQueryNode, level, totalLen);
for(int32_t i = 0; i < taosArrayGetSize(pQueryNode->pPrevNodes); ++i) { for(int32_t i = 0; i < taosArrayGetSize(pQueryNode->pChildren); ++i) {
SQueryPlanNode* p1 = taosArrayGetP(pQueryNode->pPrevNodes, i); SQueryPlanNode* p1 = taosArrayGetP(pQueryNode->pChildren, i);
int32_t len1 = queryPlanToStringImpl(buf, p1, level + 1, len); int32_t len1 = queryPlanToStringImpl(buf, p1, level + 1, len);
len = len1; len = len1;
} }