TD-13495 physical plan refactoring
This commit is contained in:
parent
c9c75bdf79
commit
7377c27367
|
@ -63,7 +63,7 @@ typedef enum ENodeType {
|
||||||
QUERY_NODE_FILL,
|
QUERY_NODE_FILL,
|
||||||
QUERY_NODE_RAW_EXPR, // Only be used in parser module.
|
QUERY_NODE_RAW_EXPR, // Only be used in parser module.
|
||||||
QUERY_NODE_TARGET,
|
QUERY_NODE_TARGET,
|
||||||
QUERY_NODE_TUPLE_DESC,
|
QUERY_NODE_DATABLOCK_DESC,
|
||||||
QUERY_NODE_SLOT_DESC,
|
QUERY_NODE_SLOT_DESC,
|
||||||
|
|
||||||
// Statement nodes are used in parser and planner module.
|
// Statement nodes are used in parser and planner module.
|
||||||
|
|
|
@ -71,17 +71,18 @@ typedef struct SSlotDescNode {
|
||||||
SDataType dataType;
|
SDataType dataType;
|
||||||
bool reserve;
|
bool reserve;
|
||||||
bool output;
|
bool output;
|
||||||
|
bool tag;
|
||||||
} SSlotDescNode;
|
} SSlotDescNode;
|
||||||
|
|
||||||
typedef struct STupleDescNode {
|
typedef struct SDataBlockDescNode {
|
||||||
ENodeType type;
|
ENodeType type;
|
||||||
int16_t tupleId;
|
int16_t dataBlockId;
|
||||||
SNodeList* pSlots;
|
SNodeList* pSlots;
|
||||||
} STupleDescNode;
|
} SDataBlockDescNode;
|
||||||
|
|
||||||
typedef struct SPhysiNode {
|
typedef struct SPhysiNode {
|
||||||
ENodeType type;
|
ENodeType type;
|
||||||
STupleDescNode outputTuple;
|
SDataBlockDescNode outputDataBlockDesc;
|
||||||
SNode* pConditions;
|
SNode* pConditions;
|
||||||
SNodeList* pChildren;
|
SNodeList* pChildren;
|
||||||
struct SPhysiNode* pParent;
|
struct SPhysiNode* pParent;
|
||||||
|
@ -104,6 +105,7 @@ typedef struct STableScanPhysiNode {
|
||||||
SScanPhysiNode scan;
|
SScanPhysiNode scan;
|
||||||
uint8_t scanFlag; // denotes reversed scan of data or not
|
uint8_t scanFlag; // denotes reversed scan of data or not
|
||||||
STimeWindow scanRange;
|
STimeWindow scanRange;
|
||||||
|
SNode* pScanConditions;
|
||||||
} STableScanPhysiNode;
|
} STableScanPhysiNode;
|
||||||
|
|
||||||
typedef STableScanPhysiNode STableSeqScanPhysiNode;
|
typedef STableScanPhysiNode STableSeqScanPhysiNode;
|
||||||
|
|
|
@ -58,21 +58,13 @@ typedef struct SColumnNode {
|
||||||
char tableAlias[TSDB_TABLE_NAME_LEN];
|
char tableAlias[TSDB_TABLE_NAME_LEN];
|
||||||
char colName[TSDB_COL_NAME_LEN];
|
char colName[TSDB_COL_NAME_LEN];
|
||||||
SNode* pProjectRef;
|
SNode* pProjectRef;
|
||||||
int16_t tupleId;
|
int16_t dataBlockId;
|
||||||
int16_t slotId;
|
int16_t slotId;
|
||||||
} SColumnNode;
|
} SColumnNode;
|
||||||
|
|
||||||
// typedef struct SColumnRefNode {
|
|
||||||
// ENodeType type;
|
|
||||||
// SDataType dataType;
|
|
||||||
// int16_t tupleId;
|
|
||||||
// int16_t slotId;
|
|
||||||
// int16_t columnId;
|
|
||||||
// } SColumnRefNode;
|
|
||||||
|
|
||||||
typedef struct STargetNode {
|
typedef struct STargetNode {
|
||||||
ENodeType type;
|
ENodeType type;
|
||||||
int16_t tupleId;
|
int16_t dataBlockId;
|
||||||
int16_t slotId;
|
int16_t slotId;
|
||||||
SNode* pExpr;
|
SNode* pExpr;
|
||||||
} STargetNode;
|
} STargetNode;
|
||||||
|
|
|
@ -29,6 +29,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = NULL,
|
.processFunc = NULL,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
.name = "sum",
|
||||||
|
.type = FUNCTION_TYPE_SUM,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.checkFunc = stubCheckAndGetResultType,
|
||||||
|
.getEnvFunc = NULL,
|
||||||
|
.initFunc = NULL,
|
||||||
|
.processFunc = NULL,
|
||||||
|
.finalizeFunc = NULL
|
||||||
|
},
|
||||||
{
|
{
|
||||||
.name = "concat",
|
.name = "concat",
|
||||||
.type = FUNCTION_TYPE_CONCAT,
|
.type = FUNCTION_TYPE_CONCAT,
|
||||||
|
|
|
@ -72,6 +72,8 @@ static SNode* columnNodeCopy(const SColumnNode* pSrc, SColumnNode* pDst) {
|
||||||
COPY_CHAR_ARRAY_FIELD(tableAlias);
|
COPY_CHAR_ARRAY_FIELD(tableAlias);
|
||||||
COPY_CHAR_ARRAY_FIELD(colName);
|
COPY_CHAR_ARRAY_FIELD(colName);
|
||||||
// COPY_NODE_FIELD(pProjectRef);
|
// COPY_NODE_FIELD(pProjectRef);
|
||||||
|
COPY_SCALAR_FIELD(dataBlockId);
|
||||||
|
COPY_SCALAR_FIELD(slotId);
|
||||||
return (SNode*)pDst;
|
return (SNode*)pDst;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,7 +145,7 @@ static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static SNode* targetNodeCopy(const STargetNode* pSrc, STargetNode* pDst) {
|
static SNode* targetNodeCopy(const STargetNode* pSrc, STargetNode* pDst) {
|
||||||
COPY_SCALAR_FIELD(tupleId);
|
COPY_SCALAR_FIELD(dataBlockId);
|
||||||
COPY_SCALAR_FIELD(slotId);
|
COPY_SCALAR_FIELD(slotId);
|
||||||
COPY_NODE_FIELD(pExpr);
|
COPY_NODE_FIELD(pExpr);
|
||||||
return (SNode*)pDst;
|
return (SNode*)pDst;
|
||||||
|
|
|
@ -59,7 +59,7 @@ static char* nodeName(ENodeType type) {
|
||||||
return "Target";
|
return "Target";
|
||||||
case QUERY_NODE_RAW_EXPR:
|
case QUERY_NODE_RAW_EXPR:
|
||||||
return "RawExpr";
|
return "RawExpr";
|
||||||
case QUERY_NODE_TUPLE_DESC:
|
case QUERY_NODE_DATABLOCK_DESC:
|
||||||
return "TupleDesc";
|
return "TupleDesc";
|
||||||
case QUERY_NODE_SLOT_DESC:
|
case QUERY_NODE_SLOT_DESC:
|
||||||
return "SlotDesc";
|
return "SlotDesc";
|
||||||
|
@ -83,6 +83,10 @@ static char* nodeName(ENodeType type) {
|
||||||
return "PhysiTableScan";
|
return "PhysiTableScan";
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||||
return "PhysiProject";
|
return "PhysiProject";
|
||||||
|
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
|
||||||
|
return "PhysiJoin";
|
||||||
|
case QUERY_NODE_PHYSICAL_PLAN_AGG:
|
||||||
|
return "PhysiAgg";
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -191,14 +195,14 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char* jkPhysiPlanOutputTuple = "OutputTuple";
|
static const char* jkPhysiPlanOutputDataBlockDesc = "OutputDataBlockDesc";
|
||||||
static const char* jkPhysiPlanConditions = "Conditions";
|
static const char* jkPhysiPlanConditions = "Conditions";
|
||||||
static const char* jkPhysiPlanChildren = "Children";
|
static const char* jkPhysiPlanChildren = "Children";
|
||||||
|
|
||||||
static int32_t physicPlanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physicPlanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SPhysiNode* pNode = (const SPhysiNode*)pObj;
|
const SPhysiNode* pNode = (const SPhysiNode*)pObj;
|
||||||
|
|
||||||
int32_t code = tjsonAddObject(pJson, jkPhysiPlanOutputTuple, nodeToJson, &pNode->outputTuple);
|
int32_t code = tjsonAddObject(pJson, jkPhysiPlanOutputDataBlockDesc, nodeToJson, &pNode->outputDataBlockDesc);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddObject(pJson, jkPhysiPlanConditions, nodeToJson, pNode->pConditions);
|
code = tjsonAddObject(pJson, jkPhysiPlanConditions, nodeToJson, pNode->pConditions);
|
||||||
}
|
}
|
||||||
|
@ -280,6 +284,48 @@ static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static const char* jkJoinPhysiPlanJoinType = "JoinType";
|
||||||
|
static const char* jkJoinPhysiPlanOnConditions = "OnConditions";
|
||||||
|
static const char* jkJoinPhysiPlanTargets = "Targets";
|
||||||
|
|
||||||
|
static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
|
const SJoinPhysiNode* pNode = (const SJoinPhysiNode*)pObj;
|
||||||
|
|
||||||
|
int32_t code = physicPlanNodeToJson(pObj, pJson);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pOnConditions);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = addNodeList(pJson, jkJoinPhysiPlanTargets, nodeToJson, pNode->pTargets);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static const char* jkAggPhysiPlanExprs = "Exprs";
|
||||||
|
static const char* jkAggPhysiPlanGroupKeys = "GroupKeys";
|
||||||
|
static const char* jkAggPhysiPlanAggFuncs = "AggFuncs";
|
||||||
|
|
||||||
|
static int32_t physiAggNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
|
const SAggPhysiNode* pNode = (const SAggPhysiNode*)pObj;
|
||||||
|
|
||||||
|
int32_t code = physicPlanNodeToJson(pObj, pJson);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = addNodeList(pJson, jkAggPhysiPlanExprs, nodeToJson, pNode->pExprs);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = addNodeList(pJson, jkAggPhysiPlanGroupKeys, nodeToJson, pNode->pGroupKeys);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = addNodeList(pJson, jkAggPhysiPlanAggFuncs, nodeToJson, pNode->pAggFuncs);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static const char* jkAggLogicPlanGroupKeys = "GroupKeys";
|
static const char* jkAggLogicPlanGroupKeys = "GroupKeys";
|
||||||
static const char* jkAggLogicPlanAggFuncs = "AggFuncs";
|
static const char* jkAggLogicPlanAggFuncs = "AggFuncs";
|
||||||
|
|
||||||
|
@ -340,6 +386,8 @@ static const char* jkColumnDbName = "DbName";
|
||||||
static const char* jkColumnTableName = "TableName";
|
static const char* jkColumnTableName = "TableName";
|
||||||
static const char* jkColumnTableAlias = "TableAlias";
|
static const char* jkColumnTableAlias = "TableAlias";
|
||||||
static const char* jkColumnColName = "ColName";
|
static const char* jkColumnColName = "ColName";
|
||||||
|
static const char* jkColumnDataBlockId = "DataBlockId";
|
||||||
|
static const char* jkColumnSlotId = "SlotId";
|
||||||
|
|
||||||
static int32_t columnNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t columnNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SColumnNode* pNode = (const SColumnNode*)pObj;
|
const SColumnNode* pNode = (const SColumnNode*)pObj;
|
||||||
|
@ -366,6 +414,12 @@ static int32_t columnNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddStringToObject(pJson, jkColumnColName, pNode->colName);
|
code = tjsonAddStringToObject(pJson, jkColumnColName, pNode->colName);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkColumnDataBlockId, pNode->dataBlockId);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkColumnSlotId, pNode->slotId);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -501,14 +555,14 @@ static int32_t groupingSetNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char* jkTargetTupleId = "TupleId";
|
static const char* jkTargetDataBlockId = "DataBlockId";
|
||||||
static const char* jkTargetSlotId = "SlotId";
|
static const char* jkTargetSlotId = "SlotId";
|
||||||
static const char* jkTargetExpr = "Expr";
|
static const char* jkTargetExpr = "Expr";
|
||||||
|
|
||||||
static int32_t targetNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t targetNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const STargetNode* pNode = (const STargetNode*)pObj;
|
const STargetNode* pNode = (const STargetNode*)pObj;
|
||||||
|
|
||||||
int32_t code = tjsonAddIntegerToObject(pJson, jkTargetTupleId, pNode->tupleId);
|
int32_t code = tjsonAddIntegerToObject(pJson, jkTargetDataBlockId, pNode->dataBlockId);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkTargetSlotId, pNode->slotId);
|
code = tjsonAddIntegerToObject(pJson, jkTargetSlotId, pNode->slotId);
|
||||||
}
|
}
|
||||||
|
@ -521,6 +575,8 @@ static int32_t targetNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
|
|
||||||
static const char* jkSlotDescSlotId = "SlotId";
|
static const char* jkSlotDescSlotId = "SlotId";
|
||||||
static const char* jkSlotDescDataType = "DataType";
|
static const char* jkSlotDescDataType = "DataType";
|
||||||
|
static const char* jkSlotDescReserve = "Reserve";
|
||||||
|
static const char* jkSlotDescOutput = "Output";
|
||||||
|
|
||||||
static int32_t slotDescNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t slotDescNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SSlotDescNode* pNode = (const SSlotDescNode*)pObj;
|
const SSlotDescNode* pNode = (const SSlotDescNode*)pObj;
|
||||||
|
@ -529,19 +585,25 @@ static int32_t slotDescNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddObject(pJson, jkSlotDescDataType, dataTypeToJson, &pNode->dataType);
|
code = tjsonAddObject(pJson, jkSlotDescDataType, dataTypeToJson, &pNode->dataType);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkSlotDescReserve, pNode->reserve);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkSlotDescOutput, pNode->output);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char* jkTupleDescTupleId = "TupleId";
|
static const char* jkDataBlockDescDataBlockId = "DataBlockId";
|
||||||
static const char* jkTupleDescSlots = "Slots";
|
static const char* jkDataBlockDescSlots = "Slots";
|
||||||
|
|
||||||
static int32_t tupleDescNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t dataBlockDescNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const STupleDescNode* pNode = (const STupleDescNode*)pObj;
|
const SDataBlockDescNode* pNode = (const SDataBlockDescNode*)pObj;
|
||||||
|
|
||||||
int32_t code = tjsonAddIntegerToObject(pJson, jkTupleDescTupleId, pNode->tupleId);
|
int32_t code = tjsonAddIntegerToObject(pJson, jkDataBlockDescDataBlockId, pNode->dataBlockId);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = addNodeList(pJson, jkTupleDescSlots, nodeToJson, pNode->pSlots);
|
code = addNodeList(pJson, jkDataBlockDescSlots, nodeToJson, pNode->pSlots);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -626,8 +688,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
return targetNodeToJson(pObj, pJson);
|
return targetNodeToJson(pObj, pJson);
|
||||||
case QUERY_NODE_RAW_EXPR:
|
case QUERY_NODE_RAW_EXPR:
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_TUPLE_DESC:
|
case QUERY_NODE_DATABLOCK_DESC:
|
||||||
return tupleDescNodeToJson(pObj, pJson);
|
return dataBlockDescNodeToJson(pObj, pJson);
|
||||||
case QUERY_NODE_SLOT_DESC:
|
case QUERY_NODE_SLOT_DESC:
|
||||||
return slotDescNodeToJson(pObj, pJson);
|
return slotDescNodeToJson(pObj, pJson);
|
||||||
case QUERY_NODE_SET_OPERATOR:
|
case QUERY_NODE_SET_OPERATOR:
|
||||||
|
@ -650,6 +712,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
return physiTableScanNodeToJson(pObj, pJson);
|
return physiTableScanNodeToJson(pObj, pJson);
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||||
return physiProjectNodeToJson(pObj, pJson);
|
return physiProjectNodeToJson(pObj, pJson);
|
||||||
|
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
|
||||||
|
return physiJoinNodeToJson(pObj, pJson);
|
||||||
|
case QUERY_NODE_PHYSICAL_PLAN_AGG:
|
||||||
|
return physiAggNodeToJson(pObj, pJson);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,8 +81,8 @@ SNode* nodesMakeNode(ENodeType type) {
|
||||||
return makeNode(type, sizeof(SProjectLogicNode));
|
return makeNode(type, sizeof(SProjectLogicNode));
|
||||||
case QUERY_NODE_TARGET:
|
case QUERY_NODE_TARGET:
|
||||||
return makeNode(type, sizeof(STargetNode));
|
return makeNode(type, sizeof(STargetNode));
|
||||||
case QUERY_NODE_TUPLE_DESC:
|
case QUERY_NODE_DATABLOCK_DESC:
|
||||||
return makeNode(type, sizeof(STupleDescNode));
|
return makeNode(type, sizeof(SDataBlockDescNode));
|
||||||
case QUERY_NODE_SLOT_DESC:
|
case QUERY_NODE_SLOT_DESC:
|
||||||
return makeNode(type, sizeof(SSlotDescNode));
|
return makeNode(type, sizeof(SSlotDescNode));
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||||
|
|
|
@ -29,9 +29,10 @@
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
void generateTestT1(MockCatalogService* mcs) {
|
void generateTestT1(MockCatalogService* mcs) {
|
||||||
ITableBuilder& builder = mcs->createTableBuilder("test", "t1", TSDB_NORMAL_TABLE, 4)
|
ITableBuilder& builder = mcs->createTableBuilder("test", "t1", TSDB_NORMAL_TABLE, 6)
|
||||||
.setPrecision(TSDB_TIME_PRECISION_MILLI).setVgid(1).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP)
|
.setPrecision(TSDB_TIME_PRECISION_MILLI).setVgid(1).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP)
|
||||||
.addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 20).addColumn("c3", TSDB_DATA_TYPE_BIGINT);
|
.addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 20).addColumn("c3", TSDB_DATA_TYPE_BIGINT)
|
||||||
|
.addColumn("c4", TSDB_DATA_TYPE_DOUBLE).addColumn("c5", TSDB_DATA_TYPE_DOUBLE);
|
||||||
builder.done();
|
builder.done();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -159,6 +159,10 @@ static SLogicNode* createScanLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect,
|
||||||
CHECK_ALLOC(pScan->node.pTargets, (SLogicNode*)pScan);
|
CHECK_ALLOC(pScan->node.pTargets, (SLogicNode*)pScan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pScan->scanType = SCAN_TYPE_TABLE;
|
||||||
|
pScan->scanFlag = MAIN_SCAN;
|
||||||
|
pScan->scanRange = TSWINDOW_INITIALIZER;
|
||||||
|
|
||||||
return (SLogicNode*)pScan;
|
return (SLogicNode*)pScan;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -397,14 +401,14 @@ int32_t splitLogicPlan(SSubLogicPlan* pLogicPlan) {
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SSlotIndex {
|
typedef struct SSlotIndex {
|
||||||
int16_t tupleId;
|
int16_t dataBlockId;
|
||||||
int16_t slotId;
|
int16_t slotId;
|
||||||
} SSlotIndex;
|
} SSlotIndex;
|
||||||
|
|
||||||
typedef struct SPhysiPlanContext {
|
typedef struct SPhysiPlanContext {
|
||||||
int32_t errCode;
|
int32_t errCode;
|
||||||
int16_t nextTupleId;
|
int16_t nextDataBlockId;
|
||||||
SArray* pTupleHelper;
|
SArray* pLocationHelper;
|
||||||
} SPhysiPlanContext;
|
} SPhysiPlanContext;
|
||||||
|
|
||||||
static int32_t getSlotKey(SNode* pNode, char* pKey) {
|
static int32_t getSlotKey(SNode* pNode, char* pKey) {
|
||||||
|
@ -428,31 +432,31 @@ static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_
|
||||||
return (SNode*)pSlot;
|
return (SNode*)pSlot;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SNode* createTarget(SNode* pNode, int16_t tupleId, int16_t slotId) {
|
static SNode* createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId) {
|
||||||
STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET);
|
STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET);
|
||||||
if (NULL == pTarget) {
|
if (NULL == pTarget) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pTarget->tupleId = tupleId;
|
pTarget->dataBlockId = dataBlockId;
|
||||||
pTarget->slotId = slotId;
|
pTarget->slotId = slotId;
|
||||||
pTarget->pExpr = pNode;
|
pTarget->pExpr = pNode;
|
||||||
return (SNode*)pTarget;
|
return (SNode*)pTarget;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addTupleDesc(SPhysiPlanContext* pCxt, SNodeList* pList, STupleDescNode* pTuple) {
|
static int32_t addDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
|
||||||
SHashObj* pHash = NULL;
|
SHashObj* pHash = NULL;
|
||||||
if (NULL == pTuple->pSlots) {
|
if (NULL == pDataBlockDesc->pSlots) {
|
||||||
pTuple->pSlots = nodesMakeList();
|
pDataBlockDesc->pSlots = nodesMakeList();
|
||||||
CHECK_ALLOC(pTuple->pSlots, TSDB_CODE_OUT_OF_MEMORY);
|
CHECK_ALLOC(pDataBlockDesc->pSlots, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
|
||||||
pHash = taosHashInit(LIST_LENGTH(pList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
pHash = taosHashInit(LIST_LENGTH(pList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
CHECK_ALLOC(pHash, TSDB_CODE_OUT_OF_MEMORY);
|
CHECK_ALLOC(pHash, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
if (NULL == taosArrayInsert(pCxt->pTupleHelper, pTuple->tupleId, &pHash)) {
|
if (NULL == taosArrayInsert(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId, &pHash)) {
|
||||||
taosHashCleanup(pHash);
|
taosHashCleanup(pHash);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pHash = taosArrayGetP(pCxt->pTupleHelper, pTuple->tupleId);
|
pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode* pNode = NULL;
|
SNode* pNode = NULL;
|
||||||
|
@ -460,17 +464,17 @@ static int32_t addTupleDesc(SPhysiPlanContext* pCxt, SNodeList* pList, STupleDes
|
||||||
FOREACH(pNode, pList) {
|
FOREACH(pNode, pList) {
|
||||||
SNode* pSlot = createSlotDesc(pCxt, pNode, slotId);
|
SNode* pSlot = createSlotDesc(pCxt, pNode, slotId);
|
||||||
CHECK_ALLOC(pSlot, TSDB_CODE_OUT_OF_MEMORY);
|
CHECK_ALLOC(pSlot, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
if (TSDB_CODE_SUCCESS != nodesListAppend(pTuple->pSlots, (SNode*)pSlot)) {
|
if (TSDB_CODE_SUCCESS != nodesListAppend(pDataBlockDesc->pSlots, (SNode*)pSlot)) {
|
||||||
nodesDestroyNode(pSlot);
|
nodesDestroyNode(pSlot);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSlotIndex index = { .tupleId = pTuple->tupleId, .slotId = slotId };
|
SSlotIndex index = { .dataBlockId = pDataBlockDesc->dataBlockId, .slotId = slotId };
|
||||||
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
|
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
|
||||||
int32_t len = getSlotKey(pNode, name);
|
int32_t len = getSlotKey(pNode, name);
|
||||||
CHECK_CODE(taosHashPut(pHash, name, len, &index, sizeof(SSlotIndex)), TSDB_CODE_OUT_OF_MEMORY);
|
CHECK_CODE(taosHashPut(pHash, name, len, &index, sizeof(SSlotIndex)), TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
|
||||||
SNode* pTarget = createTarget(pNode, pTuple->tupleId, slotId);
|
SNode* pTarget = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId);
|
||||||
CHECK_ALLOC(pTarget, TSDB_CODE_OUT_OF_MEMORY);
|
CHECK_ALLOC(pTarget, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
REPLACE_NODE(pTarget);
|
REPLACE_NODE(pTarget);
|
||||||
|
|
||||||
|
@ -495,7 +499,7 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
|
||||||
pIndex = taosHashGet(pCxt->pRightHash, name, len);
|
pIndex = taosHashGet(pCxt->pRightHash, name, len);
|
||||||
}
|
}
|
||||||
// pIndex is definitely not NULL, otherwise it is a bug
|
// pIndex is definitely not NULL, otherwise it is a bug
|
||||||
((SColumnNode*)pNode)->tupleId = pIndex->tupleId;
|
((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId;
|
||||||
((SColumnNode*)pNode)->slotId = pIndex->slotId;
|
((SColumnNode*)pNode)->slotId = pIndex->slotId;
|
||||||
CHECK_ALLOC(pNode, DEAL_RES_ERROR);
|
CHECK_ALLOC(pNode, DEAL_RES_ERROR);
|
||||||
return DEAL_RES_IGNORE_CHILD;
|
return DEAL_RES_IGNORE_CHILD;
|
||||||
|
@ -503,11 +507,11 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SNode* setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftTupleId, int16_t rightTupleId, SNode* pNode) {
|
static SNode* setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode) {
|
||||||
SNode* pRes = nodesCloneNode(pNode);
|
SNode* pRes = nodesCloneNode(pNode);
|
||||||
CHECK_ALLOC(pRes, NULL);
|
CHECK_ALLOC(pRes, NULL);
|
||||||
SSetSlotIdCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pLeftHash = taosArrayGetP(pCxt->pTupleHelper, leftTupleId),
|
SSetSlotIdCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
|
||||||
.pRightHash = (rightTupleId < 0 ? NULL : taosArrayGetP(pCxt->pTupleHelper, rightTupleId)) };
|
.pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId)) };
|
||||||
nodesWalkNode(pRes, doSetSlotId, &cxt);
|
nodesWalkNode(pRes, doSetSlotId, &cxt);
|
||||||
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
||||||
nodesDestroyNode(pRes);
|
nodesDestroyNode(pRes);
|
||||||
|
@ -516,11 +520,11 @@ static SNode* setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftTupleId, int16_
|
||||||
return pRes;
|
return pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SNodeList* setListSlotId(SPhysiPlanContext* pCxt, int16_t leftTupleId, int16_t rightTupleId, SNodeList* pList) {
|
static SNodeList* setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNodeList* pList) {
|
||||||
SNodeList* pRes = nodesCloneList(pList);
|
SNodeList* pRes = nodesCloneList(pList);
|
||||||
CHECK_ALLOC(pRes, NULL);
|
CHECK_ALLOC(pRes, NULL);
|
||||||
SSetSlotIdCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pLeftHash = taosArrayGetP(pCxt->pTupleHelper, leftTupleId),
|
SSetSlotIdCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
|
||||||
.pRightHash = (rightTupleId < 0 ? NULL : taosArrayGetP(pCxt->pTupleHelper, rightTupleId)) };
|
.pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId)) };
|
||||||
nodesWalkList(pRes, doSetSlotId, &cxt);
|
nodesWalkList(pRes, doSetSlotId, &cxt);
|
||||||
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
||||||
nodesDestroyList(pRes);
|
nodesDestroyList(pRes);
|
||||||
|
@ -534,27 +538,27 @@ static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, ENodeType type) {
|
||||||
if (NULL == pPhysiNode) {
|
if (NULL == pPhysiNode) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pPhysiNode->outputTuple.tupleId = pCxt->nextTupleId++;
|
pPhysiNode->outputDataBlockDesc.dataBlockId = pCxt->nextDataBlockId++;
|
||||||
pPhysiNode->outputTuple.type = QUERY_NODE_TUPLE_DESC;
|
pPhysiNode->outputDataBlockDesc.type = QUERY_NODE_DATABLOCK_DESC;
|
||||||
return pPhysiNode;
|
return pPhysiNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
|
static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
|
||||||
if (NULL != pLogicNode->pConditions) {
|
if (NULL != pLogicNode->pConditions) {
|
||||||
pPhysiNode->pConditions = setNodeSlotId(pCxt, pPhysiNode->outputTuple.tupleId, -1, pLogicNode->pConditions);
|
pPhysiNode->pConditions = setNodeSlotId(pCxt, pPhysiNode->outputDataBlockDesc.dataBlockId, -1, pLogicNode->pConditions);
|
||||||
CHECK_ALLOC(pPhysiNode->pConditions, TSDB_CODE_OUT_OF_MEMORY);
|
CHECK_ALLOC(pPhysiNode->pConditions, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setSlotOutput(SPhysiPlanContext* pCxt, SNodeList* pTargets, STupleDescNode* pTuple) {
|
static int32_t setSlotOutput(SPhysiPlanContext* pCxt, SNodeList* pTargets, SDataBlockDescNode* pDataBlockDesc) {
|
||||||
SHashObj* pHash = taosArrayGetP(pCxt->pTupleHelper, pTuple->tupleId);
|
SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
|
||||||
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
|
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
|
||||||
SNode* pNode;
|
SNode* pNode;
|
||||||
FOREACH(pNode, pTargets) {
|
FOREACH(pNode, pTargets) {
|
||||||
int32_t len = getSlotKey(pNode, name);
|
int32_t len = getSlotKey(pNode, name);
|
||||||
SSlotIndex* pIndex = taosHashGet(pHash, name, len);
|
SSlotIndex* pIndex = taosHashGet(pHash, name, len);
|
||||||
((SSlotDescNode*)nodesListGetNode(pTuple->pSlots, pIndex->slotId))->output = true;
|
((SSlotDescNode*)nodesListGetNode(pDataBlockDesc->pSlots, pIndex->slotId))->output = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -565,12 +569,12 @@ static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanL
|
||||||
pScanPhysiNode->pScanCols = nodesCloneList(pScanLogicNode->pScanCols);
|
pScanPhysiNode->pScanCols = nodesCloneList(pScanLogicNode->pScanCols);
|
||||||
CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY);
|
CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
// Tuple describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
|
// Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
|
||||||
CHECK_CODE(addTupleDesc(pCxt, pScanPhysiNode->pScanCols, &pScanPhysiNode->node.outputTuple), TSDB_CODE_OUT_OF_MEMORY);
|
CHECK_CODE(addDataBlockDesc(pCxt, pScanPhysiNode->pScanCols, &pScanPhysiNode->node.outputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
|
||||||
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode), TSDB_CODE_OUT_OF_MEMORY);
|
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode), TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
|
||||||
CHECK_CODE(setSlotOutput(pCxt, pScanLogicNode->node.pTargets, &pScanPhysiNode->node.outputTuple), TSDB_CODE_OUT_OF_MEMORY);
|
CHECK_CODE(setSlotOutput(pCxt, pScanLogicNode->node.pTargets, &pScanPhysiNode->node.outputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
|
||||||
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
|
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
|
||||||
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
|
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
|
||||||
|
@ -612,32 +616,32 @@ static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode*
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SNodeList* createJoinOutputCols(SPhysiPlanContext* pCxt, STupleDescNode* pLeftTuple, STupleDescNode* pRightTuple) {
|
static SNodeList* createJoinOutputCols(SPhysiPlanContext* pCxt, SDataBlockDescNode* pLeftDesc, SDataBlockDescNode* pRightDesc) {
|
||||||
SNodeList* pCols = nodesMakeList();
|
SNodeList* pCols = nodesMakeList();
|
||||||
CHECK_ALLOC(pCols, NULL);
|
CHECK_ALLOC(pCols, NULL);
|
||||||
SNode* pNode;
|
SNode* pNode;
|
||||||
FOREACH(pNode, pLeftTuple->pSlots) {
|
FOREACH(pNode, pLeftDesc->pSlots) {
|
||||||
SSlotDescNode* pSlot = (SSlotDescNode*)pNode;
|
SSlotDescNode* pSlot = (SSlotDescNode*)pNode;
|
||||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||||
if (NULL == pCol) {
|
if (NULL == pCol) {
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
pCol->node.resType = pSlot->dataType;
|
pCol->node.resType = pSlot->dataType;
|
||||||
pCol->tupleId = pLeftTuple->tupleId;
|
pCol->dataBlockId = pLeftDesc->dataBlockId;
|
||||||
pCol->slotId = pSlot->slotId;
|
pCol->slotId = pSlot->slotId;
|
||||||
pCol->colId = -1;
|
pCol->colId = -1;
|
||||||
if (TSDB_CODE_SUCCESS != nodesListAppend(pCols, (SNode*)pCol)) {
|
if (TSDB_CODE_SUCCESS != nodesListAppend(pCols, (SNode*)pCol)) {
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
FOREACH(pNode, pRightTuple->pSlots) {
|
FOREACH(pNode, pRightDesc->pSlots) {
|
||||||
SSlotDescNode* pSlot = (SSlotDescNode*)pNode;
|
SSlotDescNode* pSlot = (SSlotDescNode*)pNode;
|
||||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||||
if (NULL == pCol) {
|
if (NULL == pCol) {
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
pCol->node.resType = pSlot->dataType;
|
pCol->node.resType = pSlot->dataType;
|
||||||
pCol->tupleId = pRightTuple->tupleId;
|
pCol->dataBlockId = pRightDesc->dataBlockId;
|
||||||
pCol->slotId = pSlot->slotId;
|
pCol->slotId = pSlot->slotId;
|
||||||
pCol->colId = -1;
|
pCol->colId = -1;
|
||||||
if (TSDB_CODE_SUCCESS != nodesListAppend(pCols, (SNode*)pCol)) {
|
if (TSDB_CODE_SUCCESS != nodesListAppend(pCols, (SNode*)pCol)) {
|
||||||
|
@ -654,18 +658,18 @@ static SPhysiNode* createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
|
||||||
SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_JOIN);
|
SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_JOIN);
|
||||||
CHECK_ALLOC(pJoin, NULL);
|
CHECK_ALLOC(pJoin, NULL);
|
||||||
|
|
||||||
STupleDescNode* pLeftTuple = &((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputTuple;
|
SDataBlockDescNode* pLeftDesc = &((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc;
|
||||||
STupleDescNode* pRightTuple = &((SPhysiNode*)nodesListGetNode(pChildren, 1))->outputTuple;
|
SDataBlockDescNode* pRightDesc = &((SPhysiNode*)nodesListGetNode(pChildren, 1))->outputDataBlockDesc;
|
||||||
pJoin->pOnConditions = setNodeSlotId(pCxt, pLeftTuple->tupleId, pRightTuple->tupleId, pJoinLogicNode->pOnConditions);
|
pJoin->pOnConditions = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pOnConditions);
|
||||||
CHECK_ALLOC(pJoin->pOnConditions, (SPhysiNode*)pJoin);
|
CHECK_ALLOC(pJoin->pOnConditions, (SPhysiNode*)pJoin);
|
||||||
|
|
||||||
pJoin->pTargets = createJoinOutputCols(pCxt, pLeftTuple, pRightTuple);
|
pJoin->pTargets = createJoinOutputCols(pCxt, pLeftDesc, pRightDesc);
|
||||||
CHECK_ALLOC(pJoin->pTargets, (SPhysiNode*)pJoin);
|
CHECK_ALLOC(pJoin->pTargets, (SPhysiNode*)pJoin);
|
||||||
CHECK_CODE(addTupleDesc(pCxt, pJoin->pTargets, &pJoin->node.outputTuple), (SPhysiNode*)pJoin);
|
CHECK_CODE(addDataBlockDesc(pCxt, pJoin->pTargets, &pJoin->node.outputDataBlockDesc), (SPhysiNode*)pJoin);
|
||||||
|
|
||||||
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin), (SPhysiNode*)pJoin);
|
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin), (SPhysiNode*)pJoin);
|
||||||
|
|
||||||
CHECK_CODE(setSlotOutput(pCxt, pJoinLogicNode->node.pTargets, &pJoin->node.outputTuple), (SPhysiNode*)pJoin);
|
CHECK_CODE(setSlotOutput(pCxt, pJoinLogicNode->node.pTargets, &pJoin->node.outputDataBlockDesc), (SPhysiNode*)pJoin);
|
||||||
|
|
||||||
return (SPhysiNode*)pJoin;
|
return (SPhysiNode*)pJoin;
|
||||||
}
|
}
|
||||||
|
@ -689,9 +693,14 @@ static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode)
|
||||||
nodesDestroyNode(pExpr);
|
nodesDestroyNode(pExpr);
|
||||||
return DEAL_RES_ERROR;
|
return DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
SExprNode* pToBeRewrittenExpr = (SExprNode*)(*pNode);
|
SExprNode* pRewrittenExpr = (SExprNode*)pExpr;
|
||||||
pCol->node.resType = pToBeRewrittenExpr->resType;
|
pCol->node.resType = pRewrittenExpr->resType;
|
||||||
strcpy(pCol->colName, pToBeRewrittenExpr->aliasName);
|
if ('\0' != pRewrittenExpr->aliasName[0]) {
|
||||||
|
strcpy(pCol->colName, pRewrittenExpr->aliasName);
|
||||||
|
} else {
|
||||||
|
snprintf(pRewrittenExpr->aliasName, sizeof(pRewrittenExpr->aliasName), "#expr_%d_%d", pCxt->planNodeId, pCxt->rewriteId);
|
||||||
|
strcpy(pCol->colName, pRewrittenExpr->aliasName);
|
||||||
|
}
|
||||||
nodesDestroyNode(*pNode);
|
nodesDestroyNode(*pNode);
|
||||||
*pNode = (SNode*)pCol;
|
*pNode = (SNode*)pCol;
|
||||||
return DEAL_RES_IGNORE_CHILD;
|
return DEAL_RES_IGNORE_CHILD;
|
||||||
|
@ -758,29 +767,29 @@ static SPhysiNode* createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild
|
||||||
CHECK_CODE(rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalcExprs, &pGroupKeys), (SPhysiNode*)pAgg);
|
CHECK_CODE(rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalcExprs, &pGroupKeys), (SPhysiNode*)pAgg);
|
||||||
CHECK_CODE(rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs), (SPhysiNode*)pAgg);
|
CHECK_CODE(rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs), (SPhysiNode*)pAgg);
|
||||||
|
|
||||||
STupleDescNode* pChildTupe = &(((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputTuple);
|
SDataBlockDescNode* pChildTupe = &(((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc);
|
||||||
// push down expression to outputTuple of child node
|
// push down expression to outputDataBlockDesc of child node
|
||||||
if (NULL != pPrecalcExprs) {
|
if (NULL != pPrecalcExprs) {
|
||||||
pAgg->pExprs = setListSlotId(pCxt, pChildTupe->tupleId, -1, pPrecalcExprs);
|
pAgg->pExprs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs);
|
||||||
CHECK_ALLOC(pAgg->pExprs, (SPhysiNode*)pAgg);
|
CHECK_ALLOC(pAgg->pExprs, (SPhysiNode*)pAgg);
|
||||||
CHECK_CODE(addTupleDesc(pCxt, pAgg->pExprs, pChildTupe), (SPhysiNode*)pAgg);
|
CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pExprs, pChildTupe), (SPhysiNode*)pAgg);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL != pGroupKeys) {
|
if (NULL != pGroupKeys) {
|
||||||
pAgg->pGroupKeys = setListSlotId(pCxt, pChildTupe->tupleId, -1, pGroupKeys);
|
pAgg->pGroupKeys = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys);
|
||||||
CHECK_ALLOC(pAgg->pGroupKeys, (SPhysiNode*)pAgg);
|
CHECK_ALLOC(pAgg->pGroupKeys, (SPhysiNode*)pAgg);
|
||||||
CHECK_CODE(addTupleDesc(pCxt, pAgg->pGroupKeys, &pAgg->node.outputTuple), (SPhysiNode*)pAgg);
|
CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pGroupKeys, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL != pAggFuncs) {
|
if (NULL != pAggFuncs) {
|
||||||
pAgg->pAggFuncs = setListSlotId(pCxt, pChildTupe->tupleId, -1, pAggFuncs);
|
pAgg->pAggFuncs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs);
|
||||||
CHECK_ALLOC(pAgg->pAggFuncs, (SPhysiNode*)pAgg);
|
CHECK_ALLOC(pAgg->pAggFuncs, (SPhysiNode*)pAgg);
|
||||||
CHECK_CODE(addTupleDesc(pCxt, pAgg->pAggFuncs, &pAgg->node.outputTuple), (SPhysiNode*)pAgg);
|
CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pAggFuncs, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg);
|
||||||
}
|
}
|
||||||
|
|
||||||
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg), (SPhysiNode*)pAgg);
|
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg), (SPhysiNode*)pAgg);
|
||||||
|
|
||||||
CHECK_CODE(setSlotOutput(pCxt, pAggLogicNode->node.pTargets, &pAgg->node.outputTuple), (SPhysiNode*)pAgg);
|
CHECK_CODE(setSlotOutput(pCxt, pAggLogicNode->node.pTargets, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg);
|
||||||
|
|
||||||
return (SPhysiNode*)pAgg;
|
return (SPhysiNode*)pAgg;
|
||||||
}
|
}
|
||||||
|
@ -789,9 +798,9 @@ static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
|
||||||
SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
|
SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
|
||||||
CHECK_ALLOC(pProject, NULL);
|
CHECK_ALLOC(pProject, NULL);
|
||||||
|
|
||||||
pProject->pProjections = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputTuple.tupleId, -1, pProjectLogicNode->pProjections);
|
pProject->pProjections = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc.dataBlockId, -1, pProjectLogicNode->pProjections);
|
||||||
CHECK_ALLOC(pProject->pProjections, (SPhysiNode*)pProject);
|
CHECK_ALLOC(pProject->pProjections, (SPhysiNode*)pProject);
|
||||||
CHECK_CODE(addTupleDesc(pCxt, pProject->pProjections, &pProject->node.outputTuple), (SPhysiNode*)pProject);
|
CHECK_CODE(addDataBlockDesc(pCxt, pProject->pProjections, &pProject->node.outputDataBlockDesc), (SPhysiNode*)pProject);
|
||||||
|
|
||||||
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject), (SPhysiNode*)pProject);
|
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject), (SPhysiNode*)pProject);
|
||||||
|
|
||||||
|
@ -840,8 +849,8 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPl
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode) {
|
int32_t createPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode) {
|
||||||
SPhysiPlanContext cxt = { .errCode = TSDB_CODE_SUCCESS, .nextTupleId = 0, .pTupleHelper = taosArrayInit(32, POINTER_BYTES) };
|
SPhysiPlanContext cxt = { .errCode = TSDB_CODE_SUCCESS, .nextDataBlockId = 0, .pLocationHelper = taosArrayInit(32, POINTER_BYTES) };
|
||||||
if (NULL == cxt.pTupleHelper) {
|
if (NULL == cxt.pLocationHelper) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
*pPhyNode = createPhysiNode(&cxt, pLogicNode);
|
*pPhyNode = createPhysiNode(&cxt, pLogicNode);
|
||||||
|
|
|
@ -62,7 +62,7 @@ protected:
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
cout << "sql : [" << cxt_.pSql << "]" << endl;
|
cout << "====================sql : [" << cxt_.pSql << "]" << endl;
|
||||||
cout << "syntax test : " << endl;
|
cout << "syntax test : " << endl;
|
||||||
cout << syntaxTreeStr << endl;
|
cout << syntaxTreeStr << endl;
|
||||||
cout << "unformatted logic plan : " << endl;
|
cout << "unformatted logic plan : " << endl;
|
||||||
|
@ -123,8 +123,8 @@ TEST_F(NewPlannerTest, simple) {
|
||||||
TEST_F(NewPlannerTest, groupBy) {
|
TEST_F(NewPlannerTest, groupBy) {
|
||||||
setDatabase("root", "test");
|
setDatabase("root", "test");
|
||||||
|
|
||||||
// bind("SELECT count(*) FROM t1");
|
bind("SELECT count(*) FROM t1");
|
||||||
// ASSERT_TRUE(run());
|
ASSERT_TRUE(run());
|
||||||
|
|
||||||
bind("SELECT c1, count(*) FROM t1 GROUP BY c1");
|
bind("SELECT c1, count(*) FROM t1 GROUP BY c1");
|
||||||
ASSERT_TRUE(run());
|
ASSERT_TRUE(run());
|
||||||
|
@ -132,7 +132,7 @@ TEST_F(NewPlannerTest, groupBy) {
|
||||||
bind("SELECT c1 + c3, c1 + count(*) FROM t1 where c2 = 'abc' GROUP BY c1, c3");
|
bind("SELECT c1 + c3, c1 + count(*) FROM t1 where c2 = 'abc' GROUP BY c1, c3");
|
||||||
ASSERT_TRUE(run());
|
ASSERT_TRUE(run());
|
||||||
|
|
||||||
bind("SELECT c1 + c3, count(*) FROM t1 where concat(c2, 'wwww') = 'abcwww' GROUP BY c1 + c3");
|
bind("SELECT c1 + c3, sum(c4 * c5) FROM t1 where concat(c2, 'wwww') = 'abcwww' GROUP BY c1 + c3");
|
||||||
ASSERT_TRUE(run());
|
ASSERT_TRUE(run());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1455,7 +1455,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
|
||||||
for (uint32_t i = 0; i < info->fields[FLD_TYPE_COLUMN].num; ++i) {
|
for (uint32_t i = 0; i < info->fields[FLD_TYPE_COLUMN].num; ++i) {
|
||||||
SFilterField *field = &info->fields[FLD_TYPE_COLUMN].fields[i];
|
SFilterField *field = &info->fields[FLD_TYPE_COLUMN].fields[i];
|
||||||
SColumnNode *refNode = (SColumnNode *)field->desc;
|
SColumnNode *refNode = (SColumnNode *)field->desc;
|
||||||
qDebug("COL%d => [%d][%d]", i, refNode->tupleId, refNode->slotId);
|
qDebug("COL%d => [%d][%d]", i, refNode->dataBlockId, refNode->slotId);
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("VALUE Field Num:%u", info->fields[FLD_TYPE_VALUE].num);
|
qDebug("VALUE Field Num:%u", info->fields[FLD_TYPE_VALUE].num);
|
||||||
|
@ -1485,7 +1485,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
|
||||||
SFilterField *left = FILTER_UNIT_LEFT_FIELD(info, unit);
|
SFilterField *left = FILTER_UNIT_LEFT_FIELD(info, unit);
|
||||||
SColumnNode *refNode = (SColumnNode *)left->desc;
|
SColumnNode *refNode = (SColumnNode *)left->desc;
|
||||||
if (unit->compare.optr >= 0 && unit->compare.optr <= OP_TYPE_JSON_CONTAINS){
|
if (unit->compare.optr >= 0 && unit->compare.optr <= OP_TYPE_JSON_CONTAINS){
|
||||||
len = sprintf(str, "UNIT[%d] => [%d][%d] %s [", i, refNode->tupleId, refNode->slotId, gOptrStr[unit->compare.optr].str);
|
len = sprintf(str, "UNIT[%d] => [%d][%d] %s [", i, refNode->dataBlockId, refNode->slotId, gOptrStr[unit->compare.optr].str);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (unit->right.type == FLD_TYPE_VALUE && FILTER_UNIT_OPTR(unit) != OP_TYPE_IN) {
|
if (unit->right.type == FLD_TYPE_VALUE && FILTER_UNIT_OPTR(unit) != OP_TYPE_IN) {
|
||||||
|
@ -1504,7 +1504,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
|
||||||
if (unit->compare.optr2) {
|
if (unit->compare.optr2) {
|
||||||
strcat(str, " && ");
|
strcat(str, " && ");
|
||||||
if (unit->compare.optr2 >= 0 && unit->compare.optr2 <= OP_TYPE_JSON_CONTAINS){
|
if (unit->compare.optr2 >= 0 && unit->compare.optr2 <= OP_TYPE_JSON_CONTAINS){
|
||||||
sprintf(str + strlen(str), "[%d][%d] %s [", refNode->tupleId, refNode->slotId, gOptrStr[unit->compare.optr2].str);
|
sprintf(str + strlen(str), "[%d][%d] %s [", refNode->dataBlockId, refNode->slotId, gOptrStr[unit->compare.optr2].str);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (unit->right2.type == FLD_TYPE_VALUE && FILTER_UNIT_OPTR(unit) != OP_TYPE_IN) {
|
if (unit->right2.type == FLD_TYPE_VALUE && FILTER_UNIT_OPTR(unit) != OP_TYPE_IN) {
|
||||||
|
|
|
@ -80,7 +80,7 @@ void flttMakeColRefNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in
|
||||||
SColumnNode *rnode = (SColumnNode *)node;
|
SColumnNode *rnode = (SColumnNode *)node;
|
||||||
rnode->node.resType.type = dataType;
|
rnode->node.resType.type = dataType;
|
||||||
rnode->node.resType.bytes = dataBytes;
|
rnode->node.resType.bytes = dataBytes;
|
||||||
rnode->tupleId = 0;
|
rnode->dataBlockId = 0;
|
||||||
|
|
||||||
if (NULL == block) {
|
if (NULL == block) {
|
||||||
rnode->slotId = 2;
|
rnode->slotId = 2;
|
||||||
|
|
|
@ -79,7 +79,7 @@ void scltMakeColRefNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in
|
||||||
SColumnNode *rnode = (SColumnNode *)node;
|
SColumnNode *rnode = (SColumnNode *)node;
|
||||||
rnode->node.resType.type = dataType;
|
rnode->node.resType.type = dataType;
|
||||||
rnode->node.resType.bytes = dataBytes;
|
rnode->node.resType.bytes = dataBytes;
|
||||||
rnode->tupleId = 0;
|
rnode->dataBlockId = 0;
|
||||||
|
|
||||||
if (NULL == *block) {
|
if (NULL == *block) {
|
||||||
SSDataBlock *res = (SSDataBlock *)calloc(1, sizeof(SSDataBlock));
|
SSDataBlock *res = (SSDataBlock *)calloc(1, sizeof(SSDataBlock));
|
||||||
|
|
Loading…
Reference in New Issue