Merge pull request #10365 from taosdata/feature/3.0_wxy
TD-13495 physical plan refactoring
This commit is contained in:
commit
5fa4b1ae86
|
@ -61,22 +61,27 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_INTERVAL_WINDOW,
|
||||
QUERY_NODE_NODE_LIST,
|
||||
QUERY_NODE_FILL,
|
||||
QUERY_NODE_RAW_EXPR, // Only be used in parser module.
|
||||
QUERY_NODE_COLUMN_REF,
|
||||
QUERY_NODE_TARGET,
|
||||
|
||||
// Only be used in parser module.
|
||||
QUERY_NODE_RAW_EXPR,
|
||||
QUERY_NODE_TUPLE_DESC,
|
||||
QUERY_NODE_SLOT_DESC,
|
||||
|
||||
// Statement nodes are used in parser and planner module.
|
||||
QUERY_NODE_SET_OPERATOR,
|
||||
QUERY_NODE_SELECT_STMT,
|
||||
QUERY_NODE_SHOW_STMT,
|
||||
|
||||
// logic plan node
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN,
|
||||
QUERY_NODE_LOGIC_PLAN_JOIN,
|
||||
QUERY_NODE_LOGIC_PLAN_FILTER,
|
||||
QUERY_NODE_LOGIC_PLAN_AGG,
|
||||
QUERY_NODE_LOGIC_PLAN_PROJECT
|
||||
QUERY_NODE_LOGIC_PLAN_PROJECT,
|
||||
|
||||
// physical plan node
|
||||
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_PROJECT
|
||||
} ENodeType;
|
||||
|
||||
/**
|
||||
|
|
|
@ -21,6 +21,7 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
#include "querynodes.h"
|
||||
#include "tmsg.h"
|
||||
|
||||
typedef struct SLogicNode {
|
||||
ENodeType type;
|
||||
|
@ -31,10 +32,20 @@ typedef struct SLogicNode {
|
|||
struct SLogicNode* pParent;
|
||||
} SLogicNode;
|
||||
|
||||
typedef enum EScanType {
|
||||
SCAN_TYPE_TAG,
|
||||
SCAN_TYPE_TABLE,
|
||||
SCAN_TYPE_STABLE,
|
||||
SCAN_TYPE_STREAM
|
||||
} EScanType;
|
||||
|
||||
typedef struct SScanLogicNode {
|
||||
SLogicNode node;
|
||||
SNodeList* pScanCols;
|
||||
struct STableMeta* pMeta;
|
||||
EScanType scanType;
|
||||
uint8_t scanFlag; // denotes reversed scan of data or not
|
||||
STimeWindow scanRange;
|
||||
} SScanLogicNode;
|
||||
|
||||
typedef struct SJoinLogicNode {
|
||||
|
@ -43,10 +54,6 @@ typedef struct SJoinLogicNode {
|
|||
SNode* pOnConditions;
|
||||
} SJoinLogicNode;
|
||||
|
||||
typedef struct SFilterLogicNode {
|
||||
SLogicNode node;
|
||||
} SFilterLogicNode;
|
||||
|
||||
typedef struct SAggLogicNode {
|
||||
SLogicNode node;
|
||||
SNodeList* pGroupKeys;
|
||||
|
@ -58,6 +65,56 @@ typedef struct SProjectLogicNode {
|
|||
SNodeList* pProjections;
|
||||
} SProjectLogicNode;
|
||||
|
||||
typedef struct SSlotDescNode {
|
||||
ENodeType type;
|
||||
int16_t slotId;
|
||||
SDataType dataType;
|
||||
int16_t srcTupleId;
|
||||
int16_t srcSlotId;
|
||||
bool reserve;
|
||||
bool output;
|
||||
} SSlotDescNode;
|
||||
|
||||
typedef struct STupleDescNode {
|
||||
ENodeType type;
|
||||
int16_t tupleId;
|
||||
SNodeList* pSlots;
|
||||
} STupleDescNode;
|
||||
|
||||
typedef struct SPhysiNode {
|
||||
ENodeType type;
|
||||
STupleDescNode outputTuple;
|
||||
SNode* pConditions;
|
||||
SNodeList* pChildren;
|
||||
struct SPhysiNode* pParent;
|
||||
} SPhysiNode;
|
||||
|
||||
typedef struct SScanPhysiNode {
|
||||
SPhysiNode node;
|
||||
SNodeList* pScanCols;
|
||||
uint64_t uid; // unique id of the table
|
||||
int8_t tableType;
|
||||
int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
|
||||
int32_t count; // repeat count
|
||||
int32_t reverse; // reverse scan count
|
||||
} SScanPhysiNode;
|
||||
|
||||
typedef SScanPhysiNode SSystemTableScanPhysiNode;
|
||||
typedef SScanPhysiNode STagScanPhysiNode;
|
||||
|
||||
typedef struct STableScanPhysiNode {
|
||||
SScanPhysiNode scan;
|
||||
uint8_t scanFlag; // denotes reversed scan of data or not
|
||||
STimeWindow scanRange;
|
||||
} STableScanPhysiNode;
|
||||
|
||||
typedef STableScanPhysiNode STableSeqScanPhysiNode;
|
||||
|
||||
typedef struct SProjectPhysiNode {
|
||||
SPhysiNode node;
|
||||
SNodeList* pProjections;
|
||||
} SProjectPhysiNode;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -68,6 +68,13 @@ typedef struct SColumnRefNode {
|
|||
int16_t columnId;
|
||||
} SColumnRefNode;
|
||||
|
||||
typedef struct STargetNode {
|
||||
ENodeType type;
|
||||
int16_t tupleId;
|
||||
int16_t slotId;
|
||||
SNode* pExpr;
|
||||
} STargetNode;
|
||||
|
||||
typedef struct SValueNode {
|
||||
SExprNode node; // QUERY_NODE_VALUE
|
||||
char* literal;
|
||||
|
@ -141,6 +148,7 @@ typedef struct SLogicConditionNode {
|
|||
|
||||
typedef struct SNodeListNode {
|
||||
ENodeType type; // QUERY_NODE_NODE_LIST
|
||||
SDataType dataType;
|
||||
SNodeList* pNodeList;
|
||||
} SNodeListNode;
|
||||
|
||||
|
@ -306,7 +314,8 @@ bool nodesIsJsonOp(const SOperatorNode* pOp);
|
|||
|
||||
bool nodesIsTimeorderQuery(const SNode* pQuery);
|
||||
bool nodesIsTimelineQuery(const SNode* pQuery);
|
||||
void *nodesGetValueFromNode(SValueNode *pNode);
|
||||
|
||||
void* nodesGetValueFromNode(SValueNode *pNode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -142,6 +142,21 @@ static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) {
|
|||
return (SNode*)pDst;
|
||||
}
|
||||
|
||||
static SNode* columnRefNodeCopy(const SColumnRefNode* pSrc, SColumnRefNode* pDst) {
|
||||
dataTypeCopy(&pSrc->dataType, &pDst->dataType);
|
||||
COPY_SCALAR_FIELD(tupleId);
|
||||
COPY_SCALAR_FIELD(slotId);
|
||||
COPY_SCALAR_FIELD(columnId);
|
||||
return (SNode*)pDst;
|
||||
}
|
||||
|
||||
static SNode* targetNodeCopy(const STargetNode* pSrc, STargetNode* pDst) {
|
||||
COPY_SCALAR_FIELD(tupleId);
|
||||
COPY_SCALAR_FIELD(slotId);
|
||||
COPY_NODE_FIELD(pExpr);
|
||||
return (SNode*)pDst;
|
||||
}
|
||||
|
||||
static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode* pDst) {
|
||||
COPY_SCALAR_FIELD(groupingSetType);
|
||||
COPY_NODE_LIST_FIELD(pParameterList);
|
||||
|
@ -168,6 +183,10 @@ SNode* nodesCloneNode(const SNode* pNode) {
|
|||
return logicConditionNodeCopy((const SLogicConditionNode*)pNode, (SLogicConditionNode*)pDst);
|
||||
case QUERY_NODE_FUNCTION:
|
||||
return functionNodeCopy((const SFunctionNode*)pNode, (SFunctionNode*)pDst);
|
||||
case QUERY_NODE_COLUMN_REF:
|
||||
return columnRefNodeCopy((const SColumnRefNode*)pNode, (SColumnRefNode*)pDst);
|
||||
case QUERY_NODE_TARGET:
|
||||
return targetNodeCopy((const STargetNode*)pNode, (STargetNode*)pDst);
|
||||
case QUERY_NODE_REAL_TABLE:
|
||||
case QUERY_NODE_TEMP_TABLE:
|
||||
case QUERY_NODE_JOIN_TABLE:
|
||||
|
|
|
@ -61,6 +61,10 @@ static char* nodeName(ENodeType type) {
|
|||
return "Target";
|
||||
case QUERY_NODE_RAW_EXPR:
|
||||
return "RawExpr";
|
||||
case QUERY_NODE_TUPLE_DESC:
|
||||
return "TupleDesc";
|
||||
case QUERY_NODE_SLOT_DESC:
|
||||
return "SlotDesc";
|
||||
case QUERY_NODE_SET_OPERATOR:
|
||||
return "SetOperator";
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
|
@ -71,16 +75,22 @@ static char* nodeName(ENodeType type) {
|
|||
return "LogicScan";
|
||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||
return "LogicJoin";
|
||||
case QUERY_NODE_LOGIC_PLAN_FILTER:
|
||||
return "LogicFilter";
|
||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||
return "LogicAgg";
|
||||
case QUERY_NODE_LOGIC_PLAN_PROJECT:
|
||||
return "LogicProject";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||
return "PhysiTagScan";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
return "PhysiTableScan";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return "PhysiProject";
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return "Unknown";
|
||||
static char tmp[20];
|
||||
snprintf(tmp, sizeof(tmp), "Unknown %d", type);
|
||||
return tmp;
|
||||
}
|
||||
|
||||
static int32_t addNodeList(SJson* pJson, const char* pName, FToJson func, const SNodeList* pList) {
|
||||
|
@ -183,8 +193,93 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t logicFilterNodeToJson(const void* pObj, SJson* pJson) {
|
||||
return logicPlanNodeToJson(pObj, pJson);
|
||||
static const char* jkPhysiPlanOutputTuple = "OutputTuple";
|
||||
static const char* jkPhysiPlanConditions = "Conditions";
|
||||
static const char* jkPhysiPlanChildren = "Children";
|
||||
|
||||
static int32_t physicPlanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SPhysiNode* pNode = (const SPhysiNode*)pObj;
|
||||
|
||||
int32_t code = tjsonAddObject(pJson, jkPhysiPlanOutputTuple, nodeToJson, &pNode->outputTuple);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkPhysiPlanConditions, nodeToJson, pNode->pConditions);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addNodeList(pJson, jkPhysiPlanChildren, nodeToJson, pNode->pChildren);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkScanPhysiPlanScanCols = "ScanCols";
|
||||
static const char* jkScanPhysiPlanTableId = "TableId";
|
||||
static const char* jkScanPhysiPlanTableType = "TableType";
|
||||
static const char* jkScanPhysiPlanScanOrder = "ScanOrder";
|
||||
static const char* jkScanPhysiPlanScanCount = "ScanCount";
|
||||
static const char* jkScanPhysiPlanReverseScanCount = "ReverseScanCount";
|
||||
|
||||
static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
|
||||
|
||||
int32_t code = physicPlanNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addNodeList(pJson, jkScanPhysiPlanScanCols, nodeToJson, pNode->pScanCols);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableId, pNode->uid);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanScanOrder, pNode->order);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanScanCount, pNode->count);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanReverseScanCount, pNode->reverse);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
return physiScanNodeToJson(pObj, pJson);
|
||||
}
|
||||
|
||||
static const char* jkTableScanPhysiPlanScanFlag = "ScanFlag";
|
||||
static const char* jkTableScanPhysiPlanStartKey = "StartKey";
|
||||
static const char* jkTableScanPhysiPlanEndKey = "EndKey";
|
||||
|
||||
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
||||
|
||||
int32_t code = physiScanNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanScanFlag, pNode->scanFlag);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanStartKey, pNode->scanRange.skey);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanEndKey, pNode->scanRange.ekey);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkProjectPhysiPlanProjections = "Projections";
|
||||
|
||||
static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SProjectPhysiNode* pNode = (const SProjectPhysiNode*)pObj;
|
||||
|
||||
int32_t code = physicPlanNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addNodeList(pJson, jkProjectPhysiPlanProjections, nodeToJson, pNode->pProjections);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkAggLogicPlanGroupKeys = "GroupKeys";
|
||||
|
@ -277,19 +372,6 @@ static int32_t columnNodeToJson(const void* pObj, SJson* pJson) {
|
|||
return code;
|
||||
}
|
||||
|
||||
// typedef struct SValueNode {
|
||||
// SExprNode node; // QUERY_NODE_VALUE
|
||||
// char* ;
|
||||
// bool ;
|
||||
// union {
|
||||
// bool b;
|
||||
// int64_t i;
|
||||
// uint64_t u;
|
||||
// double d;
|
||||
// char* p;
|
||||
// } datum;
|
||||
// } SValueNode;
|
||||
|
||||
static const char* jkValueLiteral = "Literal";
|
||||
static const char* jkValueDuration = "Duration";
|
||||
static const char* jkValueDatum = "Datum";
|
||||
|
@ -421,6 +503,74 @@ static int32_t groupingSetNodeToJson(const void* pObj, SJson* pJson) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static const char* jkColumnRefDataType = "DataType";
|
||||
static const char* jkColumnRefTupleId = "TupleId";
|
||||
static const char* jkColumnRefSlotId = "SlotId";
|
||||
static const char* jkColumnRefColumnId = "ColumnId";
|
||||
|
||||
static int32_t columnRefNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SColumnRefNode* pNode = (const SColumnRefNode*)pObj;
|
||||
|
||||
int32_t code = tjsonAddObject(pJson, jkColumnRefDataType, dataTypeToJson, &pNode->dataType);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkColumnRefTupleId, pNode->tupleId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkColumnRefSlotId, pNode->slotId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkColumnRefColumnId, pNode->columnId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkTargetTupleId = "TupleId";
|
||||
static const char* jkTargetSlotId = "SlotId";
|
||||
static const char* jkTargetExpr = "Expr";
|
||||
|
||||
static int32_t targetNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const STargetNode* pNode = (const STargetNode*)pObj;
|
||||
|
||||
int32_t code = tjsonAddIntegerToObject(pJson, jkTargetTupleId, pNode->tupleId);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkTargetSlotId, pNode->slotId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkTargetExpr, nodeToJson, pNode->pExpr);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkSlotDescSlotId = "SlotId";
|
||||
static const char* jkSlotDescDataType = "DataType";
|
||||
|
||||
static int32_t slotDescNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SSlotDescNode* pNode = (const SSlotDescNode*)pObj;
|
||||
|
||||
int32_t code = tjsonAddIntegerToObject(pJson, jkSlotDescSlotId, pNode->slotId);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkSlotDescDataType, dataTypeToJson, &pNode->dataType);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkTupleDescTupleId = "TupleId";
|
||||
static const char* jkTupleDescSlots = "Slots";
|
||||
|
||||
static int32_t tupleDescNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const STupleDescNode* pNode = (const STupleDescNode*)pObj;
|
||||
|
||||
int32_t code = tjsonAddIntegerToObject(pJson, jkTupleDescTupleId, pNode->tupleId);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addNodeList(pJson, jkTupleDescSlots, nodeToJson, pNode->pSlots);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkSelectStmtDistinct = "Distinct";
|
||||
static const char* jkSelectStmtProjections = "Projections";
|
||||
static const char* jkSelectStmtFrom = "From";
|
||||
|
@ -497,8 +647,15 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
case QUERY_NODE_NODE_LIST:
|
||||
case QUERY_NODE_FILL:
|
||||
case QUERY_NODE_COLUMN_REF:
|
||||
return columnRefNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_TARGET:
|
||||
return targetNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_RAW_EXPR:
|
||||
break;
|
||||
case QUERY_NODE_TUPLE_DESC:
|
||||
return tupleDescNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_SLOT_DESC:
|
||||
return slotDescNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_SET_OPERATOR:
|
||||
break;
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
|
@ -509,12 +666,16 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
return logicScanNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||
return logicJoinNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_LOGIC_PLAN_FILTER:
|
||||
return logicFilterNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||
return logicAggNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_LOGIC_PLAN_PROJECT:
|
||||
return logicProjectNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||
return physiTagScanNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
return physiTableScanNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return physiProjectNodeToJson(pObj, pJson);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -75,12 +75,24 @@ SNode* nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(SScanLogicNode));
|
||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||
return makeNode(type, sizeof(SJoinLogicNode));
|
||||
case QUERY_NODE_LOGIC_PLAN_FILTER:
|
||||
return makeNode(type, sizeof(SFilterLogicNode));
|
||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||
return makeNode(type, sizeof(SAggLogicNode));
|
||||
case QUERY_NODE_LOGIC_PLAN_PROJECT:
|
||||
return makeNode(type, sizeof(SProjectLogicNode));
|
||||
case QUERY_NODE_COLUMN_REF:
|
||||
return makeNode(type, sizeof(SColumnRefNode));
|
||||
case QUERY_NODE_TARGET:
|
||||
return makeNode(type, sizeof(STargetNode));
|
||||
case QUERY_NODE_TUPLE_DESC:
|
||||
return makeNode(type, sizeof(STupleDescNode));
|
||||
case QUERY_NODE_SLOT_DESC:
|
||||
return makeNode(type, sizeof(SSlotDescNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||
return makeNode(type, sizeof(STagScanPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
return makeNode(type, sizeof(STableScanPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return makeNode(type, sizeof(SProjectPhysiNode));
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -184,29 +196,29 @@ void nodesDestroyList(SNodeList* pList) {
|
|||
tfree(pList);
|
||||
}
|
||||
|
||||
void *nodesGetValueFromNode(SValueNode *pNode) {
|
||||
void* nodesGetValueFromNode(SValueNode *pNode) {
|
||||
switch (pNode->node.resType.type) {
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
return (void *)&pNode->datum.b;
|
||||
return (void*)&pNode->datum.b;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
return (void *)&pNode->datum.i;
|
||||
return (void*)&pNode->datum.i;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
return (void *)&pNode->datum.u;
|
||||
return (void*)&pNode->datum.u;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
return (void *)&pNode->datum.d;
|
||||
return (void*)&pNode->datum.d;
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_VARCHAR:
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
return (void *)pNode->datum.p;
|
||||
return (void*)pNode->datum.p;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -500,22 +500,24 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode* pCol) {
|
|||
}
|
||||
|
||||
static int32_t trimStringCopy(const char* src, int32_t len, char* dst) {
|
||||
varDataSetLen(dst, len);
|
||||
char* dstVal = varDataVal(dst);
|
||||
// delete escape character: \\, \', \"
|
||||
char delim = src[0];
|
||||
int32_t cnt = 0;
|
||||
int32_t j = 0;
|
||||
for (uint32_t k = 1; k < len - 1; ++k) {
|
||||
if (src[k] == '\\' || (src[k] == delim && src[k + 1] == delim)) {
|
||||
dst[j] = src[k + 1];
|
||||
dstVal[j] = src[k + 1];
|
||||
cnt++;
|
||||
j++;
|
||||
k++;
|
||||
continue;
|
||||
}
|
||||
dst[j] = src[k];
|
||||
dstVal[j] = src[k];
|
||||
j++;
|
||||
}
|
||||
dst[j] = '\0';
|
||||
dstVal[j] = '\0';
|
||||
return j;
|
||||
}
|
||||
|
||||
|
@ -560,7 +562,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
|
|||
case TSDB_DATA_TYPE_VARCHAR:
|
||||
case TSDB_DATA_TYPE_VARBINARY: {
|
||||
int32_t n = strlen(pVal->literal);
|
||||
pVal->datum.p = calloc(1, n);
|
||||
pVal->datum.p = calloc(1, n + VARSTR_HEADER_SIZE);
|
||||
if (NULL == pVal->datum.p) {
|
||||
generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
|
||||
return DEAL_RES_ERROR;
|
||||
|
|
|
@ -24,6 +24,7 @@ extern "C" {
|
|||
#include "planner.h"
|
||||
|
||||
int32_t createLogicPlan(SNode* pNode, SLogicNode** pLogicNode);
|
||||
int32_t createPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
#define CHECK_ALLOC(p, res) \
|
||||
do { \
|
||||
if (NULL == (p)) { \
|
||||
printf("%s : %d\n", __FUNCTION__, __LINE__); \
|
||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \
|
||||
return (res); \
|
||||
} \
|
||||
|
@ -29,7 +28,6 @@
|
|||
do { \
|
||||
int32_t code = (exec); \
|
||||
if (TSDB_CODE_SUCCESS != code) { \
|
||||
printf("%s : %d\n", __FUNCTION__, __LINE__); \
|
||||
pCxt->errCode = code; \
|
||||
return (res); \
|
||||
} \
|
||||
|
@ -38,7 +36,6 @@
|
|||
typedef struct SPlanContext {
|
||||
int32_t errCode;
|
||||
int32_t planNodeId;
|
||||
SNodeList* pResource;
|
||||
} SPlanContext;
|
||||
|
||||
static SLogicNode* createQueryLogicNode(SPlanContext* pCxt, SNode* pStmt);
|
||||
|
@ -60,10 +57,7 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
|
|||
FOREACH(pExpr, pCxt->pExprs) {
|
||||
if (nodesEqualNode(pExpr, *pNode)) {
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
CHECK_ALLOC(pCol, DEAL_RES_ERROR);
|
||||
SExprNode* pToBeRewrittenExpr = (SExprNode*)(*pNode);
|
||||
pCol->node.resType = pToBeRewrittenExpr->resType;
|
||||
strcpy(pCol->node.aliasName, pToBeRewrittenExpr->aliasName);
|
||||
|
@ -222,26 +216,6 @@ static SLogicNode* createLogicNodeByTable(SPlanContext* pCxt, SSelectStmt* pSele
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SLogicNode* createWhereFilterLogicNode(SPlanContext* pCxt, SLogicNode* pChild, SSelectStmt* pSelect) {
|
||||
if (NULL == pSelect->pWhere) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SFilterLogicNode* pFilter = (SFilterLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_FILTER);
|
||||
CHECK_ALLOC(pFilter, NULL);
|
||||
pFilter->node.id = pCxt->planNodeId++;
|
||||
|
||||
// set filter conditions
|
||||
pFilter->node.pConditions = nodesCloneNode(pSelect->pWhere);
|
||||
CHECK_ALLOC(pFilter->node.pConditions, (SLogicNode*)pFilter);
|
||||
|
||||
// set the output
|
||||
pFilter->node.pTargets = nodesCloneList(pChild->pTargets);
|
||||
CHECK_ALLOC(pFilter->node.pTargets, (SLogicNode*)pFilter);
|
||||
|
||||
return (SLogicNode*)pFilter;
|
||||
}
|
||||
|
||||
typedef struct SCreateColumnCxt {
|
||||
int32_t errCode;
|
||||
SNodeList* pList;
|
||||
|
@ -252,10 +226,8 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
|
|||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_COLUMN: {
|
||||
SNode* pCol = nodesCloneNode(pNode);
|
||||
if (NULL == pCol || TSDB_CODE_SUCCESS != nodesListAppend(pCxt->pList, pCol)) {
|
||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
CHECK_ALLOC(pCol, DEAL_RES_ERROR);
|
||||
CHECK_CODE(nodesListAppend(pCxt->pList, pCol), DEAL_RES_ERROR);
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
case QUERY_NODE_OPERATOR:
|
||||
|
@ -263,16 +235,10 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
|
|||
case QUERY_NODE_FUNCTION: {
|
||||
SExprNode* pExpr = (SExprNode*)pNode;
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
CHECK_ALLOC(pCol, DEAL_RES_ERROR);
|
||||
pCol->node.resType = pExpr->resType;
|
||||
strcpy(pCol->colName, pExpr->aliasName);
|
||||
if (TSDB_CODE_SUCCESS != nodesListAppend(pCxt->pList, (SNode*)pCol)) {
|
||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
CHECK_CODE(nodesListAppend(pCxt->pList, (SNode*)pCol), DEAL_RES_ERROR);
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
default:
|
||||
|
@ -284,9 +250,8 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
|
|||
|
||||
static SNodeList* createColumnByRewriteExps(SPlanContext* pCxt, SNodeList* pExprs) {
|
||||
SCreateColumnCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pList = nodesMakeList() };
|
||||
if (NULL == cxt.pList) {
|
||||
return NULL;
|
||||
}
|
||||
CHECK_ALLOC(cxt.pList, NULL);
|
||||
|
||||
nodesWalkList(pExprs, doCreateColumn, &cxt);
|
||||
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
||||
nodesDestroyList(cxt.pList);
|
||||
|
@ -379,8 +344,9 @@ static SLogicNode* createProjectLogicNode(SPlanContext* pCxt, SSelectStmt* pSele
|
|||
|
||||
static SLogicNode* createSelectLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
|
||||
SLogicNode* pRoot = createLogicNodeByTable(pCxt, pSelect, pSelect->pFromTable);
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||
pRoot = pushLogicNode(pCxt, pRoot, createWhereFilterLogicNode(pCxt, pRoot, pSelect));
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode && NULL != pSelect->pWhere) {
|
||||
pRoot->pConditions = nodesCloneNode(pSelect->pWhere);
|
||||
CHECK_ALLOC(pRoot->pConditions, pRoot);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||
pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect));
|
||||
|
@ -410,3 +376,300 @@ int32_t createLogicPlan(SNode* pNode, SLogicNode** pLogicNode) {
|
|||
*pLogicNode = pRoot;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t optimize(SLogicNode* pLogicNode) {
|
||||
// todo
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct SSubLogicPlan {
|
||||
SNode* pRoot; // SLogicNode
|
||||
bool haveSuperTable;
|
||||
bool haveSystemTable;
|
||||
} SSubLogicPlan;
|
||||
|
||||
int32_t splitLogicPlan(SSubLogicPlan* pLogicPlan) {
|
||||
// todo
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct SSlotIndex {
|
||||
int16_t tupleId;
|
||||
int16_t slotId;
|
||||
} SSlotIndex;
|
||||
|
||||
typedef struct SPhysiPlanContext {
|
||||
int32_t errCode;
|
||||
int16_t nextTupleId;
|
||||
SArray* pTupleHelper;
|
||||
} SPhysiPlanContext;
|
||||
|
||||
static int32_t getSlotKey(SNode* pNode, char* pKey) {
|
||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||
return sprintf(pKey, "%s.%s", ((SColumnNode*)pNode)->tableAlias, ((SColumnNode*)pNode)->colName);
|
||||
} else {
|
||||
return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName);
|
||||
}
|
||||
}
|
||||
|
||||
static SNode* createColumnRef(SNode* pNode, int16_t tupleId, int16_t slotId) {
|
||||
SColumnRefNode* pCol = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
|
||||
if (NULL == pCol) {
|
||||
return NULL;
|
||||
}
|
||||
pCol->dataType = ((SExprNode*)pNode)->resType;
|
||||
pCol->tupleId = tupleId;
|
||||
pCol->slotId = slotId;
|
||||
pCol->columnId = (QUERY_NODE_COLUMN == nodeType(pNode) ? ((SColumnNode*)pNode)->colId : -1);
|
||||
return (SNode*)pCol;
|
||||
}
|
||||
|
||||
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId) {
|
||||
SSlotDescNode* pSlot = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC);
|
||||
CHECK_ALLOC(pSlot, NULL);
|
||||
pSlot->slotId = slotId;
|
||||
pSlot->dataType = ((SExprNode*)pNode)->resType;
|
||||
pSlot->srcTupleId = -1;
|
||||
pSlot->srcSlotId = -1;
|
||||
pSlot->reserve = false;
|
||||
pSlot->output = true;
|
||||
return (SNode*)pSlot;
|
||||
}
|
||||
|
||||
static SNode* createTarget(SNode* pNode, int16_t tupleId, int16_t slotId) {
|
||||
STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET);
|
||||
if (NULL == pTarget) {
|
||||
return NULL;
|
||||
}
|
||||
pTarget->tupleId = tupleId;
|
||||
pTarget->slotId = slotId;
|
||||
pTarget->pExpr = nodesCloneNode(pNode);
|
||||
if (NULL == pTarget->pExpr) {
|
||||
nodesDestroyNode((SNode*)pTarget);
|
||||
return NULL;
|
||||
}
|
||||
return (SNode*)pTarget;
|
||||
}
|
||||
|
||||
static int32_t addTupleDesc(SPhysiPlanContext* pCxt, SNodeList* pList, STupleDescNode* pTuple, SNodeList** pOutput) {
|
||||
pTuple->tupleId = pCxt->nextTupleId++;
|
||||
|
||||
SHashObj* pHash = NULL;
|
||||
if (NULL == pTuple->pSlots) {
|
||||
pTuple->pSlots = nodesMakeList();
|
||||
CHECK_ALLOC(pTuple->pSlots, TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
||||
pHash = taosHashInit(LIST_LENGTH(pList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
CHECK_ALLOC(pHash, TSDB_CODE_OUT_OF_MEMORY);
|
||||
if (NULL == taosArrayInsert(pCxt->pTupleHelper, pTuple->tupleId, &pHash)) {
|
||||
taosHashCleanup(pHash);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
} else {
|
||||
pHash = taosArrayGetP(pCxt->pTupleHelper, pTuple->tupleId);
|
||||
}
|
||||
|
||||
*pOutput = nodesMakeList();
|
||||
CHECK_ALLOC(*pOutput, TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
||||
SNode* pNode = NULL;
|
||||
int16_t slotId = 0;
|
||||
FOREACH(pNode, pList) {
|
||||
SNode* pSlot = createSlotDesc(pCxt, pNode, slotId);
|
||||
CHECK_ALLOC(pSlot, TSDB_CODE_OUT_OF_MEMORY);
|
||||
if (TSDB_CODE_SUCCESS != nodesListAppend(pTuple->pSlots, (SNode*)pSlot)) {
|
||||
nodesDestroyNode(pSlot);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SNode* pTarget = createTarget(pNode, pTuple->tupleId, slotId);
|
||||
CHECK_ALLOC(pTarget, TSDB_CODE_OUT_OF_MEMORY);
|
||||
if (TSDB_CODE_SUCCESS != nodesListAppend(*pOutput, pTarget)) {
|
||||
nodesDestroyNode(pTarget);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SSlotIndex index = { .tupleId = pTuple->tupleId, .slotId = slotId };
|
||||
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
|
||||
int32_t len = getSlotKey(pNode, name);
|
||||
CHECK_CODE(taosHashPut(pHash, name, len, &index, sizeof(SSlotIndex)), TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
||||
++slotId;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct STransformCxt {
|
||||
int32_t errCode;
|
||||
SHashObj* pHash;
|
||||
} STransformCxt;
|
||||
|
||||
static EDealRes doTransform(SNode** pNode, void* pContext) {
|
||||
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
|
||||
STransformCxt* pCxt = (STransformCxt*)pContext;
|
||||
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
|
||||
int32_t len = getSlotKey(*pNode, name);
|
||||
SSlotIndex* pIndex = taosHashGet(pCxt->pHash, name, len);
|
||||
if (NULL != pIndex) {
|
||||
*pNode = createColumnRef(*pNode, pIndex->tupleId, pIndex->slotId);
|
||||
CHECK_ALLOC(*pNode, DEAL_RES_ERROR);
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static SNode* transformForPhysiPlan(SPhysiPlanContext* pCxt, int16_t tupleId, SNode* pNode) {
|
||||
SNode* pRes = nodesCloneNode(pNode);
|
||||
CHECK_ALLOC(pRes, NULL);
|
||||
STransformCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pHash = taosArrayGetP(pCxt->pTupleHelper, tupleId) };
|
||||
nodesRewriteNode(&pRes, doTransform, &cxt);
|
||||
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
||||
nodesDestroyNode(pRes);
|
||||
return NULL;
|
||||
}
|
||||
return pRes;
|
||||
}
|
||||
|
||||
static SNodeList* transformListForPhysiPlan(SPhysiPlanContext* pCxt, int16_t tupleId, SNodeList* pList) {
|
||||
SNodeList* pRes = nodesCloneList(pList);
|
||||
CHECK_ALLOC(pRes, NULL);
|
||||
STransformCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pHash = taosArrayGetP(pCxt->pTupleHelper, tupleId) };
|
||||
nodesRewriteList(pRes, doTransform, &cxt);
|
||||
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
||||
nodesDestroyList(pRes);
|
||||
return NULL;
|
||||
}
|
||||
return pRes;
|
||||
}
|
||||
|
||||
static SPhysiNode* makePhysiNode(ENodeType type) {
|
||||
SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type);
|
||||
if (NULL == pPhysiNode) {
|
||||
return NULL;
|
||||
}
|
||||
pPhysiNode->outputTuple.type = QUERY_NODE_TUPLE_DESC;
|
||||
return pPhysiNode;
|
||||
}
|
||||
|
||||
static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SScanPhysiNode* pScanPhysiNode) {
|
||||
CHECK_CODE(addTupleDesc(pCxt, pScanLogicNode->pScanCols, &pScanPhysiNode->node.outputTuple, &pScanPhysiNode->pScanCols), TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
||||
if (NULL != pScanLogicNode->node.pConditions) {
|
||||
pScanPhysiNode->node.pConditions = transformForPhysiPlan(pCxt, pScanPhysiNode->node.outputTuple.tupleId, pScanLogicNode->node.pConditions);
|
||||
CHECK_ALLOC(pScanPhysiNode->node.pConditions, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
|
||||
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
|
||||
pScanPhysiNode->order = TSDB_ORDER_ASC;
|
||||
pScanPhysiNode->count = 1;
|
||||
pScanPhysiNode->reverse = 0;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SPhysiNode* createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
|
||||
STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
|
||||
CHECK_ALLOC(pTagScan, NULL);
|
||||
CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTagScan), (SPhysiNode*)pTagScan);
|
||||
return (SPhysiNode*)pTagScan;
|
||||
}
|
||||
|
||||
static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
|
||||
STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
|
||||
CHECK_ALLOC(pTableScan, NULL);
|
||||
CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTableScan), (SPhysiNode*)pTableScan);
|
||||
pTableScan->scanFlag = pScanLogicNode->scanFlag;
|
||||
pTableScan->scanRange = pScanLogicNode->scanRange;
|
||||
return (SPhysiNode*)pTableScan;
|
||||
}
|
||||
|
||||
static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
|
||||
switch (pScanLogicNode->scanType) {
|
||||
case SCAN_TYPE_TAG:
|
||||
return createTagScanPhysiNode(pCxt, pScanLogicNode);
|
||||
case SCAN_TYPE_TABLE:
|
||||
return createTableScanPhysiNode(pCxt, pScanLogicNode);
|
||||
case SCAN_TYPE_STABLE:
|
||||
case SCAN_TYPE_STREAM:
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SProjectLogicNode* pProjectLogicNode) {
|
||||
SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(QUERY_NODE_PHYSICAL_PLAN_PROJECT);
|
||||
CHECK_ALLOC(pProject, NULL);
|
||||
|
||||
SNodeList* pProjections = transformListForPhysiPlan(pCxt, pProject->node.outputTuple.tupleId, pProjectLogicNode->pProjections);
|
||||
CHECK_ALLOC(pProjections, (SPhysiNode*)pProject);
|
||||
CHECK_CODE(addTupleDesc(pCxt, pProjections, &pProject->node.outputTuple, &pProject->pProjections), (SPhysiNode*)pProject);
|
||||
nodesDestroyList(pProjections);
|
||||
|
||||
if (NULL != pProjectLogicNode->node.pConditions) {
|
||||
pProject->node.pConditions = transformForPhysiPlan(pCxt, pProject->node.outputTuple.tupleId, pProjectLogicNode->node.pConditions);
|
||||
CHECK_ALLOC(pProject->node.pConditions, (SPhysiNode*)pProject);
|
||||
}
|
||||
|
||||
return (SPhysiNode*)pProject;
|
||||
}
|
||||
|
||||
static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPlan) {
|
||||
SNodeList* pChildern = nodesMakeList();
|
||||
CHECK_ALLOC(pChildern, NULL);
|
||||
|
||||
SNode* pLogicChild;
|
||||
FOREACH(pLogicChild, pLogicPlan->pChildren) {
|
||||
SNode* pChildPhyNode = (SNode*)createPhysiNode(pCxt, (SLogicNode*)pLogicChild);
|
||||
if (TSDB_CODE_SUCCESS != nodesListAppend(pChildern, pChildPhyNode)) {
|
||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
|
||||
nodesDestroyList(pChildern);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
SPhysiNode* pPhyNode = NULL;
|
||||
switch (nodeType(pLogicPlan)) {
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
pPhyNode = createScanPhysiNode(pCxt, (SScanLogicNode*)pLogicPlan);
|
||||
break;
|
||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||
break;
|
||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||
break;
|
||||
case QUERY_NODE_LOGIC_PLAN_PROJECT:
|
||||
pPhyNode = createProjectPhysiNode(pCxt, (SProjectLogicNode*)pLogicPlan);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (NULL != pPhyNode) {
|
||||
pPhyNode->pChildren = pChildern;
|
||||
SNode* pChild;
|
||||
FOREACH(pChild, pPhyNode->pChildren) {
|
||||
((SPhysiNode*)pChild)->pParent = pPhyNode;
|
||||
}
|
||||
}
|
||||
|
||||
return pPhyNode;
|
||||
}
|
||||
|
||||
int32_t createPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode) {
|
||||
SPhysiPlanContext cxt = { .errCode = TSDB_CODE_SUCCESS, .nextTupleId = 0, .pTupleHelper = taosArrayInit(32, POINTER_BYTES) };
|
||||
if (NULL == cxt.pTupleHelper) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
*pPhyNode = createPhysiNode(&cxt, pLogicNode);
|
||||
return cxt.errCode;
|
||||
}
|
||||
|
||||
int32_t buildPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode) {
|
||||
// split
|
||||
// scale out
|
||||
// maping
|
||||
// create
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -25,6 +25,11 @@ using namespace testing;
|
|||
|
||||
class NewPlannerTest : public Test {
|
||||
protected:
|
||||
enum TestTarget {
|
||||
TEST_LOGIC_PLAN,
|
||||
TEST_PHYSICAL_PLAN
|
||||
};
|
||||
|
||||
void setDatabase(const string& acctId, const string& db) {
|
||||
acctId_ = acctId;
|
||||
db_ = db;
|
||||
|
@ -40,7 +45,7 @@ protected:
|
|||
cxt_.pSql = sqlBuf_.c_str();
|
||||
}
|
||||
|
||||
bool run() {
|
||||
bool run(TestTarget target = TEST_PHYSICAL_PLAN) {
|
||||
int32_t code = parser(&cxt_, &query_);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -53,17 +58,27 @@ protected:
|
|||
SLogicNode* pLogicPlan = nullptr;
|
||||
code = createLogicPlan(query_.pRoot, &pLogicPlan);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
cout << "sql:[" << cxt_.pSql << "] plan code:" << code << ", strerror:" << tstrerror(code) << endl;
|
||||
cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
cout << "sql : [" << cxt_.pSql << "]" << endl;
|
||||
cout << "syntax test : " << endl;
|
||||
cout << syntaxTreeStr << endl;
|
||||
// cout << "logic plan : " << endl;
|
||||
// cout << toString((const SNode*)pLogicPlan) << endl;
|
||||
cout << "unformatted logic plan : " << endl;
|
||||
cout << toString((const SNode*)pLogicPlan, false) << endl;
|
||||
|
||||
if (TEST_PHYSICAL_PLAN == target) {
|
||||
SPhysiNode* pPhyPlan = nullptr;
|
||||
code = createPhysiPlan(pLogicPlan, &pPhyPlan);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
cout << "sql:[" << cxt_.pSql << "] physical plan code:" << code << ", strerror:" << tstrerror(code) << endl;
|
||||
return false;
|
||||
}
|
||||
cout << "unformatted physical plan : " << endl;
|
||||
cout << toString((const SNode*)pPhyPlan, false) << endl;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -120,3 +135,10 @@ TEST_F(NewPlannerTest, groupBy) {
|
|||
bind("SELECT c1 + c3, count(*) FROM t1 where concat(c2, 'wwww') = 'abcwww' GROUP BY c1 + c3");
|
||||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
||||
TEST_F(NewPlannerTest, subquery) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("SELECT count(*) FROM (SELECT c1 + c3 a, c1 + count(*) b FROM t1 where c2 = 'abc' GROUP BY c1, c3) where a > 100 group by b");
|
||||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue