TD-13495 physical plan refactoring

This commit is contained in:
Xiaoyu Wang 2022-02-24 05:18:41 -05:00
parent 5fa4b1ae86
commit 49143d079e
11 changed files with 322 additions and 157 deletions

View File

@ -62,7 +62,6 @@ typedef enum ENodeType {
QUERY_NODE_NODE_LIST,
QUERY_NODE_FILL,
QUERY_NODE_RAW_EXPR, // Only be used in parser module.
QUERY_NODE_COLUMN_REF,
QUERY_NODE_TARGET,
QUERY_NODE_TUPLE_DESC,
QUERY_NODE_SLOT_DESC,
@ -81,7 +80,9 @@ typedef enum ENodeType {
// physical plan node
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_PROJECT
QUERY_NODE_PHYSICAL_PLAN_PROJECT,
QUERY_NODE_PHYSICAL_PLAN_JOIN,
QUERY_NODE_PHYSICAL_PLAN_AGG
} ENodeType;
/**

View File

@ -69,8 +69,6 @@ typedef struct SSlotDescNode {
ENodeType type;
int16_t slotId;
SDataType dataType;
int16_t srcTupleId;
int16_t srcSlotId;
bool reserve;
bool output;
} SSlotDescNode;
@ -115,6 +113,20 @@ typedef struct SProjectPhysiNode {
SNodeList* pProjections;
} SProjectPhysiNode;
typedef struct SJoinPhysiNode {
SPhysiNode node;
EJoinType joinType;
SNode* pOnConditions; // in or out tuple ?
SNodeList* pTargets;
} SJoinPhysiNode;
typedef struct SAggPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of group_by_clause and parameter expression of aggregate function
SNodeList* pGroupKeys; // SColumnRefNode list
SNodeList* pAggFuncs;
} SAggPhysiNode;
#ifdef __cplusplus
}
#endif

View File

@ -58,15 +58,17 @@ typedef struct SColumnNode {
char tableAlias[TSDB_TABLE_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
SNode* pProjectRef;
} SColumnNode;
typedef struct SColumnRefNode {
ENodeType type;
SDataType dataType;
int16_t tupleId;
int16_t slotId;
int16_t columnId;
} SColumnRefNode;
} SColumnNode;
// typedef struct SColumnRefNode {
// ENodeType type;
// SDataType dataType;
// int16_t tupleId;
// int16_t slotId;
// int16_t columnId;
// } SColumnRefNode;
typedef struct STargetNode {
ENodeType type;

View File

@ -142,14 +142,6 @@ static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) {
return (SNode*)pDst;
}
static SNode* columnRefNodeCopy(const SColumnRefNode* pSrc, SColumnRefNode* pDst) {
dataTypeCopy(&pSrc->dataType, &pDst->dataType);
COPY_SCALAR_FIELD(tupleId);
COPY_SCALAR_FIELD(slotId);
COPY_SCALAR_FIELD(columnId);
return (SNode*)pDst;
}
static SNode* targetNodeCopy(const STargetNode* pSrc, STargetNode* pDst) {
COPY_SCALAR_FIELD(tupleId);
COPY_SCALAR_FIELD(slotId);
@ -183,8 +175,6 @@ SNode* nodesCloneNode(const SNode* pNode) {
return logicConditionNodeCopy((const SLogicConditionNode*)pNode, (SLogicConditionNode*)pDst);
case QUERY_NODE_FUNCTION:
return functionNodeCopy((const SFunctionNode*)pNode, (SFunctionNode*)pDst);
case QUERY_NODE_COLUMN_REF:
return columnRefNodeCopy((const SColumnRefNode*)pNode, (SColumnRefNode*)pDst);
case QUERY_NODE_TARGET:
return targetNodeCopy((const STargetNode*)pNode, (STargetNode*)pDst);
case QUERY_NODE_REAL_TABLE:

View File

@ -55,8 +55,6 @@ static char* nodeName(ENodeType type) {
return "NodeList";
case QUERY_NODE_FILL:
return "Fill";
case QUERY_NODE_COLUMN_REF:
return "ColumnRef";
case QUERY_NODE_TARGET:
return "Target";
case QUERY_NODE_RAW_EXPR:
@ -503,28 +501,6 @@ static int32_t groupingSetNodeToJson(const void* pObj, SJson* pJson) {
return code;
}
static const char* jkColumnRefDataType = "DataType";
static const char* jkColumnRefTupleId = "TupleId";
static const char* jkColumnRefSlotId = "SlotId";
static const char* jkColumnRefColumnId = "ColumnId";
static int32_t columnRefNodeToJson(const void* pObj, SJson* pJson) {
const SColumnRefNode* pNode = (const SColumnRefNode*)pObj;
int32_t code = tjsonAddObject(pJson, jkColumnRefDataType, dataTypeToJson, &pNode->dataType);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkColumnRefTupleId, pNode->tupleId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkColumnRefSlotId, pNode->slotId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkColumnRefColumnId, pNode->columnId);
}
return code;
}
static const char* jkTargetTupleId = "TupleId";
static const char* jkTargetSlotId = "SlotId";
static const char* jkTargetExpr = "Expr";
@ -646,8 +622,6 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_INTERVAL_WINDOW:
case QUERY_NODE_NODE_LIST:
case QUERY_NODE_FILL:
case QUERY_NODE_COLUMN_REF:
return columnRefNodeToJson(pObj, pJson);
case QUERY_NODE_TARGET:
return targetNodeToJson(pObj, pJson);
case QUERY_NODE_RAW_EXPR:

View File

@ -79,8 +79,6 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SAggLogicNode));
case QUERY_NODE_LOGIC_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectLogicNode));
case QUERY_NODE_COLUMN_REF:
return makeNode(type, sizeof(SColumnRefNode));
case QUERY_NODE_TARGET:
return makeNode(type, sizeof(STargetNode));
case QUERY_NODE_TUPLE_DESC:
@ -93,6 +91,10 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(STableScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
return makeNode(type, sizeof(SJoinPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_AGG:
return makeNode(type, sizeof(SAggPhysiNode));
default:
break;
}

View File

@ -55,6 +55,9 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
SNode* pExpr;
int32_t index = 0;
FOREACH(pExpr, pCxt->pExprs) {
if (QUERY_NODE_GROUPING_SET == nodeType(pExpr)) {
pExpr = nodesListGetNode(((SGroupingSetNode*)pExpr)->pParameterList, 0);
}
if (nodesEqualNode(pExpr, *pNode)) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
CHECK_ALLOC(pCol, DEAL_RES_ERROR);
@ -406,22 +409,13 @@ typedef struct SPhysiPlanContext {
static int32_t getSlotKey(SNode* pNode, char* pKey) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
return sprintf(pKey, "%s.%s", ((SColumnNode*)pNode)->tableAlias, ((SColumnNode*)pNode)->colName);
} else {
return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName);
SColumnNode* pCol = (SColumnNode*)pNode;
if ('\0' == pCol->tableAlias[0]) {
return sprintf(pKey, "%s", pCol->colName);
}
return sprintf(pKey, "%s.%s", pCol->tableAlias, pCol->colName);
}
}
static SNode* createColumnRef(SNode* pNode, int16_t tupleId, int16_t slotId) {
SColumnRefNode* pCol = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
if (NULL == pCol) {
return NULL;
}
pCol->dataType = ((SExprNode*)pNode)->resType;
pCol->tupleId = tupleId;
pCol->slotId = slotId;
pCol->columnId = (QUERY_NODE_COLUMN == nodeType(pNode) ? ((SColumnNode*)pNode)->colId : -1);
return (SNode*)pCol;
return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName);
}
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId) {
@ -429,10 +423,8 @@ static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_
CHECK_ALLOC(pSlot, NULL);
pSlot->slotId = slotId;
pSlot->dataType = ((SExprNode*)pNode)->resType;
pSlot->srcTupleId = -1;
pSlot->srcSlotId = -1;
pSlot->reserve = false;
pSlot->output = true;
pSlot->output = false;
return (SNode*)pSlot;
}
@ -443,17 +435,11 @@ static SNode* createTarget(SNode* pNode, int16_t tupleId, int16_t slotId) {
}
pTarget->tupleId = tupleId;
pTarget->slotId = slotId;
pTarget->pExpr = nodesCloneNode(pNode);
if (NULL == pTarget->pExpr) {
nodesDestroyNode((SNode*)pTarget);
return NULL;
}
pTarget->pExpr = pNode;
return (SNode*)pTarget;
}
static int32_t addTupleDesc(SPhysiPlanContext* pCxt, SNodeList* pList, STupleDescNode* pTuple, SNodeList** pOutput) {
pTuple->tupleId = pCxt->nextTupleId++;
static int32_t addTupleDesc(SPhysiPlanContext* pCxt, SNodeList* pList, STupleDescNode* pTuple) {
SHashObj* pHash = NULL;
if (NULL == pTuple->pSlots) {
pTuple->pSlots = nodesMakeList();
@ -469,11 +455,8 @@ static int32_t addTupleDesc(SPhysiPlanContext* pCxt, SNodeList* pList, STupleDes
pHash = taosArrayGetP(pCxt->pTupleHelper, pTuple->tupleId);
}
*pOutput = nodesMakeList();
CHECK_ALLOC(*pOutput, TSDB_CODE_OUT_OF_MEMORY);
SNode* pNode = NULL;
int16_t slotId = 0;
int16_t slotId = taosHashGetSize(pHash);
FOREACH(pNode, pList) {
SNode* pSlot = createSlotDesc(pCxt, pNode, slotId);
CHECK_ALLOC(pSlot, TSDB_CODE_OUT_OF_MEMORY);
@ -482,48 +465,50 @@ static int32_t addTupleDesc(SPhysiPlanContext* pCxt, SNodeList* pList, STupleDes
return TSDB_CODE_OUT_OF_MEMORY;
}
SNode* pTarget = createTarget(pNode, pTuple->tupleId, slotId);
CHECK_ALLOC(pTarget, TSDB_CODE_OUT_OF_MEMORY);
if (TSDB_CODE_SUCCESS != nodesListAppend(*pOutput, pTarget)) {
nodesDestroyNode(pTarget);
return TSDB_CODE_OUT_OF_MEMORY;
}
SSlotIndex index = { .tupleId = pTuple->tupleId, .slotId = slotId };
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
int32_t len = getSlotKey(pNode, name);
CHECK_CODE(taosHashPut(pHash, name, len, &index, sizeof(SSlotIndex)), TSDB_CODE_OUT_OF_MEMORY);
SNode* pTarget = createTarget(pNode, pTuple->tupleId, slotId);
CHECK_ALLOC(pTarget, TSDB_CODE_OUT_OF_MEMORY);
REPLACE_NODE(pTarget);
++slotId;
}
return TSDB_CODE_SUCCESS;
}
typedef struct STransformCxt {
typedef struct SSetSlotIdCxt {
int32_t errCode;
SHashObj* pHash;
} STransformCxt;
SHashObj* pLeftHash;
SHashObj* pRightHash;
} SSetSlotIdCxt;
static EDealRes doTransform(SNode** pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
STransformCxt* pCxt = (STransformCxt*)pContext;
static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) {
SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext;
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
int32_t len = getSlotKey(*pNode, name);
SSlotIndex* pIndex = taosHashGet(pCxt->pHash, name, len);
if (NULL != pIndex) {
*pNode = createColumnRef(*pNode, pIndex->tupleId, pIndex->slotId);
CHECK_ALLOC(*pNode, DEAL_RES_ERROR);
return DEAL_RES_IGNORE_CHILD;
int32_t len = getSlotKey(pNode, name);
SSlotIndex* pIndex = taosHashGet(pCxt->pLeftHash, name, len);
if (NULL == pIndex) {
pIndex = taosHashGet(pCxt->pRightHash, name, len);
}
// pIndex is definitely not NULL, otherwise it is a bug
((SColumnNode*)pNode)->tupleId = pIndex->tupleId;
((SColumnNode*)pNode)->slotId = pIndex->slotId;
CHECK_ALLOC(pNode, DEAL_RES_ERROR);
return DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
}
static SNode* transformForPhysiPlan(SPhysiPlanContext* pCxt, int16_t tupleId, SNode* pNode) {
static SNode* setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftTupleId, int16_t rightTupleId, SNode* pNode) {
SNode* pRes = nodesCloneNode(pNode);
CHECK_ALLOC(pRes, NULL);
STransformCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pHash = taosArrayGetP(pCxt->pTupleHelper, tupleId) };
nodesRewriteNode(&pRes, doTransform, &cxt);
SSetSlotIdCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pLeftHash = taosArrayGetP(pCxt->pTupleHelper, leftTupleId),
.pRightHash = (rightTupleId < 0 ? NULL : taosArrayGetP(pCxt->pTupleHelper, rightTupleId)) };
nodesWalkNode(pRes, doSetSlotId, &cxt);
if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyNode(pRes);
return NULL;
@ -531,11 +516,12 @@ static SNode* transformForPhysiPlan(SPhysiPlanContext* pCxt, int16_t tupleId, SN
return pRes;
}
static SNodeList* transformListForPhysiPlan(SPhysiPlanContext* pCxt, int16_t tupleId, SNodeList* pList) {
static SNodeList* setListSlotId(SPhysiPlanContext* pCxt, int16_t leftTupleId, int16_t rightTupleId, SNodeList* pList) {
SNodeList* pRes = nodesCloneList(pList);
CHECK_ALLOC(pRes, NULL);
STransformCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pHash = taosArrayGetP(pCxt->pTupleHelper, tupleId) };
nodesRewriteList(pRes, doTransform, &cxt);
SSetSlotIdCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pLeftHash = taosArrayGetP(pCxt->pTupleHelper, leftTupleId),
.pRightHash = (rightTupleId < 0 ? NULL : taosArrayGetP(pCxt->pTupleHelper, rightTupleId)) };
nodesWalkList(pRes, doSetSlotId, &cxt);
if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyList(pRes);
return NULL;
@ -543,22 +529,48 @@ static SNodeList* transformListForPhysiPlan(SPhysiPlanContext* pCxt, int16_t tup
return pRes;
}
static SPhysiNode* makePhysiNode(ENodeType type) {
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, ENodeType type) {
SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type);
if (NULL == pPhysiNode) {
return NULL;
}
pPhysiNode->outputTuple.tupleId = pCxt->nextTupleId++;
pPhysiNode->outputTuple.type = QUERY_NODE_TUPLE_DESC;
return pPhysiNode;
}
static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SScanPhysiNode* pScanPhysiNode) {
CHECK_CODE(addTupleDesc(pCxt, pScanLogicNode->pScanCols, &pScanPhysiNode->node.outputTuple, &pScanPhysiNode->pScanCols), TSDB_CODE_OUT_OF_MEMORY);
if (NULL != pScanLogicNode->node.pConditions) {
pScanPhysiNode->node.pConditions = transformForPhysiPlan(pCxt, pScanPhysiNode->node.outputTuple.tupleId, pScanLogicNode->node.pConditions);
CHECK_ALLOC(pScanPhysiNode->node.pConditions, TSDB_CODE_OUT_OF_MEMORY);
static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
if (NULL != pLogicNode->pConditions) {
pPhysiNode->pConditions = setNodeSlotId(pCxt, pPhysiNode->outputTuple.tupleId, -1, pLogicNode->pConditions);
CHECK_ALLOC(pPhysiNode->pConditions, TSDB_CODE_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
}
static int32_t setSlotOutput(SPhysiPlanContext* pCxt, SNodeList* pTargets, STupleDescNode* pTuple) {
SHashObj* pHash = taosArrayGetP(pCxt->pTupleHelper, pTuple->tupleId);
char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
SNode* pNode;
FOREACH(pNode, pTargets) {
int32_t len = getSlotKey(pNode, name);
SSlotIndex* pIndex = taosHashGet(pHash, name, len);
((SSlotDescNode*)nodesListGetNode(pTuple->pSlots, pIndex->slotId))->output = true;
}
return TSDB_CODE_SUCCESS;
}
static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SScanPhysiNode* pScanPhysiNode) {
if (NULL != pScanLogicNode->pScanCols) {
pScanPhysiNode->pScanCols = nodesCloneList(pScanLogicNode->pScanCols);
CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY);
}
// Tuple describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
CHECK_CODE(addTupleDesc(pCxt, pScanPhysiNode->pScanCols, &pScanPhysiNode->node.outputTuple), TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode), TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE(setSlotOutput(pCxt, pScanLogicNode->node.pTargets, &pScanPhysiNode->node.outputTuple), TSDB_CODE_OUT_OF_MEMORY);
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
@ -570,14 +582,14 @@ static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanL
}
static SPhysiNode* createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
CHECK_ALLOC(pTagScan, NULL);
CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTagScan), (SPhysiNode*)pTagScan);
return (SPhysiNode*)pTagScan;
}
static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
CHECK_ALLOC(pTableScan, NULL);
CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTableScan), (SPhysiNode*)pTableScan);
pTableScan->scanFlag = pScanLogicNode->scanFlag;
@ -597,35 +609,205 @@ static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode*
default:
break;
}
return NULL;
}
static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SProjectLogicNode* pProjectLogicNode) {
SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(QUERY_NODE_PHYSICAL_PLAN_PROJECT);
static SNodeList* createJoinOutputCols(SPhysiPlanContext* pCxt, STupleDescNode* pLeftTuple, STupleDescNode* pRightTuple) {
SNodeList* pCols = nodesMakeList();
CHECK_ALLOC(pCols, NULL);
SNode* pNode;
FOREACH(pNode, pLeftTuple->pSlots) {
SSlotDescNode* pSlot = (SSlotDescNode*)pNode;
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
goto error;
}
pCol->node.resType = pSlot->dataType;
pCol->tupleId = pLeftTuple->tupleId;
pCol->slotId = pSlot->slotId;
pCol->colId = -1;
if (TSDB_CODE_SUCCESS != nodesListAppend(pCols, (SNode*)pCol)) {
goto error;
}
}
FOREACH(pNode, pRightTuple->pSlots) {
SSlotDescNode* pSlot = (SSlotDescNode*)pNode;
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
goto error;
}
pCol->node.resType = pSlot->dataType;
pCol->tupleId = pRightTuple->tupleId;
pCol->slotId = pSlot->slotId;
pCol->colId = -1;
if (TSDB_CODE_SUCCESS != nodesListAppend(pCols, (SNode*)pCol)) {
goto error;
}
}
return pCols;
error:
nodesDestroyList(pCols);
return NULL;
}
static SPhysiNode* createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode) {
SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_JOIN);
CHECK_ALLOC(pJoin, NULL);
STupleDescNode* pLeftTuple = &((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputTuple;
STupleDescNode* pRightTuple = &((SPhysiNode*)nodesListGetNode(pChildren, 1))->outputTuple;
pJoin->pOnConditions = setNodeSlotId(pCxt, pLeftTuple->tupleId, pRightTuple->tupleId, pJoinLogicNode->pOnConditions);
CHECK_ALLOC(pJoin->pOnConditions, (SPhysiNode*)pJoin);
pJoin->pTargets = createJoinOutputCols(pCxt, pLeftTuple, pRightTuple);
CHECK_ALLOC(pJoin->pTargets, (SPhysiNode*)pJoin);
CHECK_CODE(addTupleDesc(pCxt, pJoin->pTargets, &pJoin->node.outputTuple), (SPhysiNode*)pJoin);
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin), (SPhysiNode*)pJoin);
CHECK_CODE(setSlotOutput(pCxt, pJoinLogicNode->node.pTargets, &pJoin->node.outputTuple), (SPhysiNode*)pJoin);
return (SPhysiNode*)pJoin;
}
typedef struct SRewritePrecalcExprsCxt {
int32_t errCode;
int32_t planNodeId;
int32_t rewriteId;
SNodeList* pPrecalcExprs;
} SRewritePrecalcExprsCxt;
static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
SNode* pExpr = nodesCloneNode(*pNode);
CHECK_ALLOC(pExpr, DEAL_RES_ERROR);
if (nodesListAppend(pCxt->pPrecalcExprs, pExpr)) {
nodesDestroyNode(pExpr);
return DEAL_RES_ERROR;
}
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
nodesDestroyNode(pExpr);
return DEAL_RES_ERROR;
}
SExprNode* pToBeRewrittenExpr = (SExprNode*)(*pNode);
pCol->node.resType = pToBeRewrittenExpr->resType;
strcpy(pCol->colName, pToBeRewrittenExpr->aliasName);
nodesDestroyNode(*pNode);
*pNode = (SNode*)pCol;
return DEAL_RES_IGNORE_CHILD;
}
static EDealRes doRewritePrecalcExprs(SNode** pNode, void* pContext) {
SRewritePrecalcExprsCxt* pCxt = (SRewritePrecalcExprsCxt*)pContext;
switch (nodeType(*pNode)) {
case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION: {
return collectAndRewrite(pContext, pNode);
}
case QUERY_NODE_FUNCTION: {
if (!fmIsAggFunc(((SFunctionNode*)(*pNode))->funcId)) {
return collectAndRewrite(pContext, pNode);
}
}
default:
break;
}
return DEAL_RES_CONTINUE;
}
static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SNodeList** pPrecalcExprs, SNodeList** pRewrittenList) {
if (NULL == pList) {
return TSDB_CODE_SUCCESS;
}
if (NULL == *pPrecalcExprs) {
*pPrecalcExprs = nodesMakeList();
CHECK_ALLOC(*pPrecalcExprs, TSDB_CODE_OUT_OF_MEMORY);
}
if (NULL == *pRewrittenList) {
*pRewrittenList = nodesMakeList();
CHECK_ALLOC(*pRewrittenList, TSDB_CODE_OUT_OF_MEMORY);
}
SNode* pNode = NULL;
FOREACH(pNode, pList) {
SNode* pNew = NULL;
if (QUERY_NODE_GROUPING_SET == nodeType(pNode)) {
pNew = nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pNode)->pParameterList, 0));
} else {
pNew = nodesCloneNode(pNode);
}
CHECK_ALLOC(pNew, TSDB_CODE_OUT_OF_MEMORY);
CHECK_CODE(nodesListAppend(*pRewrittenList, pNew), TSDB_CODE_OUT_OF_MEMORY);
}
SRewritePrecalcExprsCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs };
nodesRewriteList(*pRewrittenList, doRewritePrecalcExprs, &cxt);
if (0 == LIST_LENGTH(cxt.pPrecalcExprs)) {
nodesDestroyList(cxt.pPrecalcExprs);
*pPrecalcExprs = NULL;
}
return cxt.errCode;
}
static SPhysiNode* createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode) {
SAggPhysiNode* pAgg = (SAggPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_AGG);
CHECK_ALLOC(pAgg, NULL);
SNodeList* pPrecalcExprs = NULL;
SNodeList* pGroupKeys = NULL;
SNodeList* pAggFuncs = NULL;
CHECK_CODE(rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalcExprs, &pGroupKeys), (SPhysiNode*)pAgg);
CHECK_CODE(rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs), (SPhysiNode*)pAgg);
STupleDescNode* pChildTupe = &(((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputTuple);
// push down expression to outputTuple of child node
if (NULL != pPrecalcExprs) {
pAgg->pExprs = setListSlotId(pCxt, pChildTupe->tupleId, -1, pPrecalcExprs);
CHECK_ALLOC(pAgg->pExprs, (SPhysiNode*)pAgg);
CHECK_CODE(addTupleDesc(pCxt, pAgg->pExprs, pChildTupe), (SPhysiNode*)pAgg);
}
if (NULL != pGroupKeys) {
pAgg->pGroupKeys = setListSlotId(pCxt, pChildTupe->tupleId, -1, pGroupKeys);
CHECK_ALLOC(pAgg->pGroupKeys, (SPhysiNode*)pAgg);
CHECK_CODE(addTupleDesc(pCxt, pAgg->pGroupKeys, &pAgg->node.outputTuple), (SPhysiNode*)pAgg);
}
if (NULL != pAggFuncs) {
pAgg->pAggFuncs = setListSlotId(pCxt, pChildTupe->tupleId, -1, pAggFuncs);
CHECK_ALLOC(pAgg->pAggFuncs, (SPhysiNode*)pAgg);
CHECK_CODE(addTupleDesc(pCxt, pAgg->pAggFuncs, &pAgg->node.outputTuple), (SPhysiNode*)pAgg);
}
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg), (SPhysiNode*)pAgg);
CHECK_CODE(setSlotOutput(pCxt, pAggLogicNode->node.pTargets, &pAgg->node.outputTuple), (SPhysiNode*)pAgg);
return (SPhysiNode*)pAgg;
}
static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SProjectLogicNode* pProjectLogicNode) {
SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
CHECK_ALLOC(pProject, NULL);
SNodeList* pProjections = transformListForPhysiPlan(pCxt, pProject->node.outputTuple.tupleId, pProjectLogicNode->pProjections);
CHECK_ALLOC(pProjections, (SPhysiNode*)pProject);
CHECK_CODE(addTupleDesc(pCxt, pProjections, &pProject->node.outputTuple, &pProject->pProjections), (SPhysiNode*)pProject);
nodesDestroyList(pProjections);
pProject->pProjections = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputTuple.tupleId, -1, pProjectLogicNode->pProjections);
CHECK_ALLOC(pProject->pProjections, (SPhysiNode*)pProject);
CHECK_CODE(addTupleDesc(pCxt, pProject->pProjections, &pProject->node.outputTuple), (SPhysiNode*)pProject);
if (NULL != pProjectLogicNode->node.pConditions) {
pProject->node.pConditions = transformForPhysiPlan(pCxt, pProject->node.outputTuple.tupleId, pProjectLogicNode->node.pConditions);
CHECK_ALLOC(pProject->node.pConditions, (SPhysiNode*)pProject);
}
CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject), (SPhysiNode*)pProject);
return (SPhysiNode*)pProject;
}
static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPlan) {
SNodeList* pChildern = nodesMakeList();
CHECK_ALLOC(pChildern, NULL);
SNodeList* pChildren = nodesMakeList();
CHECK_ALLOC(pChildren, NULL);
SNode* pLogicChild;
FOREACH(pLogicChild, pLogicPlan->pChildren) {
SNode* pChildPhyNode = (SNode*)createPhysiNode(pCxt, (SLogicNode*)pLogicChild);
if (TSDB_CODE_SUCCESS != nodesListAppend(pChildern, pChildPhyNode)) {
if (TSDB_CODE_SUCCESS != nodesListAppend(pChildren, pChildPhyNode)) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
nodesDestroyList(pChildern);
nodesDestroyList(pChildren);
return NULL;
}
}
@ -636,22 +818,22 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPl
pPhyNode = createScanPhysiNode(pCxt, (SScanLogicNode*)pLogicPlan);
break;
case QUERY_NODE_LOGIC_PLAN_JOIN:
pPhyNode = createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicPlan);
break;
case QUERY_NODE_LOGIC_PLAN_AGG:
pPhyNode = createAggPhysiNode(pCxt, pChildren, (SAggLogicNode*)pLogicPlan);
break;
case QUERY_NODE_LOGIC_PLAN_PROJECT:
pPhyNode = createProjectPhysiNode(pCxt, (SProjectLogicNode*)pLogicPlan);
pPhyNode = createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicPlan);
break;
default:
break;
}
if (NULL != pPhyNode) {
pPhyNode->pChildren = pChildern;
SNode* pChild;
FOREACH(pChild, pPhyNode->pChildren) {
((SPhysiNode*)pChild)->pParent = pPhyNode;
}
pPhyNode->pChildren = pChildren;
SNode* pChild;
FOREACH(pChild, pPhyNode->pChildren) {
((SPhysiNode*)pChild)->pParent = pPhyNode;
}
return pPhyNode;

View File

@ -123,8 +123,8 @@ TEST_F(NewPlannerTest, simple) {
TEST_F(NewPlannerTest, groupBy) {
setDatabase("root", "test");
bind("SELECT count(*) FROM t1");
ASSERT_TRUE(run());
// bind("SELECT count(*) FROM t1");
// ASSERT_TRUE(run());
bind("SELECT c1, count(*) FROM t1 GROUP BY c1");
ASSERT_TRUE(run());

View File

@ -307,12 +307,12 @@ typedef struct SFilterInfo {
#define FILTER_GET_FIELD(i, id) (&((i)->fields[(id).type].fields[(id).idx]))
#define FILTER_GET_COL_FIELD(i, idx) (&((i)->fields[FLD_TYPE_COLUMN].fields[idx]))
#define FILTER_GET_COL_FIELD_TYPE(fi) (((SColumnRefNode *)((fi)->desc))->dataType.type)
#define FILTER_GET_COL_FIELD_SIZE(fi) (((SColumnRefNode *)((fi)->desc))->dataType.bytes)
#define FILTER_GET_COL_FIELD_ID(fi) (((SColumnRefNode *)((fi)->desc))->columnId)
#define FILTER_GET_COL_FIELD_SLOT_ID(fi) (((SColumnRefNode *)((fi)->desc))->slotId)
#define FILTER_GET_COL_FIELD_DESC(fi) ((SColumnRefNode *)((fi)->desc))
#define FILTER_GET_COL_FIELD_DATA(fi, ri) ((char *)(fi)->data + ((SColumnRefNode *)((fi)->desc))->dataType.bytes * (ri))
#define FILTER_GET_COL_FIELD_TYPE(fi) (((SColumnNode *)((fi)->desc))->node.resType.type)
#define FILTER_GET_COL_FIELD_SIZE(fi) (((SColumnNode *)((fi)->desc))->node.resType.bytes)
#define FILTER_GET_COL_FIELD_ID(fi) (((SColumnNode *)((fi)->desc))->colId)
#define FILTER_GET_COL_FIELD_SLOT_ID(fi) (((SColumnNode *)((fi)->desc))->slotId)
#define FILTER_GET_COL_FIELD_DESC(fi) ((SColumnNode *)((fi)->desc))
#define FILTER_GET_COL_FIELD_DATA(fi, ri) ((char *)(fi)->data + ((SColumnNode *)((fi)->desc))->node.resType.bytes * (ri))
#define FILTER_GET_VAL_FIELD_TYPE(fi) (((SValueNode *)((fi)->desc))->node.resType.type)
#define FILTER_GET_VAL_FIELD_DATA(fi) ((char *)(fi)->data)
#define FILTER_GET_JSON_VAL_FIELD_DATA(fi) ((char *)(fi)->desc)

View File

@ -886,14 +886,14 @@ int32_t filterAddFieldFromNode(SFilterInfo *info, SNode *node, SFilterFieldId *f
FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (nodeType(node) != QUERY_NODE_COLUMN_REF && nodeType(node) != QUERY_NODE_VALUE) {
if (nodeType(node) != QUERY_NODE_COLUMN && nodeType(node) != QUERY_NODE_VALUE) {
FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
int32_t type;
void *v;
if (nodeType(node) == QUERY_NODE_COLUMN_REF) {
if (nodeType(node) == QUERY_NODE_COLUMN) {
type = FLD_TYPE_COLUMN;
v = node;
} else {
@ -1418,7 +1418,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
qDebug("COLUMN Field Num:%u", info->fields[FLD_TYPE_COLUMN].num);
for (uint32_t i = 0; i < info->fields[FLD_TYPE_COLUMN].num; ++i) {
SFilterField *field = &info->fields[FLD_TYPE_COLUMN].fields[i];
SColumnRefNode *refNode = (SColumnRefNode *)field->desc;
SColumnNode *refNode = (SColumnNode *)field->desc;
qDebug("COL%d => [%d][%d]", i, refNode->tupleId, refNode->slotId);
}
@ -1447,7 +1447,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
char str[512] = {0};
SFilterField *left = FILTER_UNIT_LEFT_FIELD(info, unit);
SColumnRefNode *refNode = (SColumnRefNode *)left->desc;
SColumnNode *refNode = (SColumnNode *)left->desc;
if (unit->compare.optr >= TSDB_RELATION_INVALID && unit->compare.optr <= TSDB_RELATION_NMATCH){
len = sprintf(str, "UNIT[%d] => [%d][%d] %s [", i, refNode->tupleId, refNode->slotId, gOptrStr[unit->compare.optr].str);
}
@ -3549,17 +3549,17 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
return DEAL_RES_ERROR;
}
if (QUERY_NODE_COLUMN_REF != nodeType(node->pLeft)) {
if (QUERY_NODE_COLUMN != nodeType(node->pLeft)) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
} else {
if ((QUERY_NODE_COLUMN_REF != nodeType(node->pLeft)) && (QUERY_NODE_VALUE != nodeType(node->pLeft))) {
if ((QUERY_NODE_COLUMN != nodeType(node->pLeft)) && (QUERY_NODE_VALUE != nodeType(node->pLeft))) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
if ((QUERY_NODE_COLUMN_REF != nodeType(node->pRight)) && (QUERY_NODE_VALUE != nodeType(node->pRight))) {
if ((QUERY_NODE_COLUMN != nodeType(node->pRight)) && (QUERY_NODE_VALUE != nodeType(node->pRight))) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
@ -3569,7 +3569,7 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
if (QUERY_NODE_COLUMN_REF != nodeType(node->pLeft)) {
if (QUERY_NODE_COLUMN != nodeType(node->pLeft)) {
SNode *t = node->pLeft;
node->pLeft = node->pRight;
node->pRight = t;
@ -3582,10 +3582,10 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
}
if (OP_TYPE_IN != node->opType) {
SColumnRefNode *refNode = (SColumnRefNode *)node->pLeft;
SColumnNode *refNode = (SColumnNode *)node->pLeft;
SValueNode *valueNode = (SValueNode *)node->pRight;
int32_t type = vectorGetConvertType(refNode->dataType.type, valueNode->node.resType.type);
if (0 != type && type != refNode->dataType.type) {
int32_t type = vectorGetConvertType(refNode->node.resType.type, valueNode->node.resType.type);
if (0 != type && type != refNode->node.resType.type) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}

View File

@ -50,13 +50,13 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
//TODO BUILD HASH
break;
}
case QUERY_NODE_COLUMN_REF: {
case QUERY_NODE_COLUMN: {
if (NULL == ctx) {
sclError("invalid node type for constant calculating, type:%d, ctx:%p", nodeType(node), ctx);
SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
SColumnRefNode *ref = (SColumnRefNode *)node;
SColumnNode *ref = (SColumnNode *)node;
if (ref->slotId >= taosArrayGetSize(ctx->pSrc->pDataBlock)) {
sclError("column ref slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, (int32_t)taosArrayGetSize(ctx->pSrc->pDataBlock));
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -190,7 +190,8 @@ int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outpu
SScalarFuncExecFuncs ffpSet = {0};
int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
if (code) {
sclError( "fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
sclError(
"fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
SCL_ERR_RET(code);
}
@ -208,7 +209,8 @@ int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outpu
for (int32_t i = 0; i < rowNum; ++i) {
code = (*ffpSet.process)(params, node->pParameterList->length, output);
if (code) {
sclError( "scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
sclError(
"scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
SCL_ERR_JRET(code);
}