feat: sql statement likes 'select * from (select 1)'
This commit is contained in:
parent
b99c6958fa
commit
b7e5780c44
|
@ -2074,6 +2074,7 @@ typedef struct {
|
|||
char* tbName;
|
||||
int8_t action;
|
||||
char* colName;
|
||||
int32_t colId;
|
||||
// TSDB_ALTER_TABLE_ADD_COLUMN
|
||||
int8_t type;
|
||||
int8_t flags;
|
||||
|
|
|
@ -26,6 +26,12 @@ extern "C" {
|
|||
|
||||
#define SLOT_NAME_LEN TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN
|
||||
|
||||
typedef enum EDataOrderLevel {
|
||||
DATA_ORDER_LEVEL_NONE = 1,
|
||||
DATA_ORDER_LEVEL_IN_BLOCK,
|
||||
DATA_ORDER_LEVEL_IN_GROUP
|
||||
} EDataOrderLevel;
|
||||
|
||||
typedef struct SLogicNode {
|
||||
ENodeType type;
|
||||
SNodeList* pTargets; // SColumnNode
|
||||
|
@ -36,6 +42,8 @@ typedef struct SLogicNode {
|
|||
uint8_t precision;
|
||||
SNode* pLimit;
|
||||
SNode* pSlimit;
|
||||
EDataOrderLevel requireDataOrder; // requirements for input data
|
||||
EDataOrderLevel resultDataOrder; // properties of the output data
|
||||
} SLogicNode;
|
||||
|
||||
typedef enum EScanType {
|
||||
|
@ -317,6 +325,7 @@ typedef STableScanPhysiNode SStreamScanPhysiNode;
|
|||
typedef struct SProjectPhysiNode {
|
||||
SPhysiNode node;
|
||||
SNodeList* pProjections;
|
||||
bool mergeDataBlock;
|
||||
} SProjectPhysiNode;
|
||||
|
||||
typedef struct SIndefRowsFuncPhysiNode {
|
||||
|
|
|
@ -548,37 +548,12 @@ static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols";
|
|||
static const char* jkScanLogicPlanTableType = "TableType";
|
||||
static const char* jkScanLogicPlanTableId = "TableId";
|
||||
static const char* jkScanLogicPlanStableId = "StableId";
|
||||
static const char* jkScanLogicPlanScanType = "ScanType";
|
||||
static const char* jkScanLogicPlanScanCount = "ScanCount";
|
||||
static const char* jkScanLogicPlanReverseScanCount = "ReverseScanCount";
|
||||
static const char* jkScanLogicPlanTagCond = "TagCond";
|
||||
static const char* jkScanLogicPlanGroupTags = "GroupTags";
|
||||
|
||||
// typedef struct SScanLogicNode {
|
||||
// uint64_t stableId;
|
||||
// SVgroupsInfo* pVgroupList;
|
||||
// EScanType scanType;
|
||||
// uint8_t scanSeq[2]; // first is scan count, and second is reverse scan count
|
||||
// STimeWindow scanRange;
|
||||
// SName tableName;
|
||||
// bool showRewrite;
|
||||
// double ratio;
|
||||
// SNodeList* pDynamicScanFuncs;
|
||||
// int32_t dataRequired;
|
||||
// int64_t interval;
|
||||
// int64_t offset;
|
||||
// int64_t sliding;
|
||||
// int8_t intervalUnit;
|
||||
// int8_t slidingUnit;
|
||||
// SNode* pTagCond;
|
||||
// SNode* pTagIndexCond;
|
||||
// int8_t triggerType;
|
||||
// int64_t watermark;
|
||||
// int8_t igExpired;
|
||||
// SArray* pSmaIndexes;
|
||||
// SNodeList* pGroupTags;
|
||||
// bool groupSort;
|
||||
// } SScanLogicNode;
|
||||
|
||||
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
||||
|
||||
|
@ -598,6 +573,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanStableId, pNode->stableId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanScanType, pNode->scanType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanScanCount, pNode->scanSeq[0]);
|
||||
}
|
||||
|
@ -634,6 +612,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUBigIntValue(pJson, jkScanLogicPlanStableId, &pNode->stableId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tjsonGetNumberValue(pJson, jkScanLogicPlanScanType, pNode->scanType, code);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUTinyIntValue(pJson, jkScanLogicPlanScanCount, &pNode->scanSeq[0]);
|
||||
}
|
||||
|
@ -1677,6 +1658,7 @@ static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) {
|
|||
}
|
||||
|
||||
static const char* jkProjectPhysiPlanProjections = "Projections";
|
||||
static const char* jkProjectPhysiPlanMergeDataBlock = "MergeDataBlock";
|
||||
|
||||
static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SProjectPhysiNode* pNode = (const SProjectPhysiNode*)pObj;
|
||||
|
@ -1685,6 +1667,9 @@ static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkProjectPhysiPlanProjections, pNode->pProjections);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkProjectPhysiPlanMergeDataBlock, pNode->mergeDataBlock);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1696,6 +1681,9 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkProjectPhysiPlanProjections, &pNode->pProjections);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkProjectPhysiPlanMergeDataBlock, &pNode->mergeDataBlock);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -201,7 +201,8 @@ static int32_t calcConstProject(SNode* pProject, bool dual, SNode** pNew) {
|
|||
}
|
||||
|
||||
static bool isUselessCol(SExprNode* pProj) {
|
||||
if (QUERY_NODE_FUNCTION == nodeType(pProj) && !fmIsScalarFunc(((SFunctionNode*)pProj)->funcId)) {
|
||||
if (QUERY_NODE_FUNCTION == nodeType(pProj) && !fmIsScalarFunc(((SFunctionNode*)pProj)->funcId) &&
|
||||
!fmIsPseudoColumnFunc(((SFunctionNode*)pProj)->funcId)) {
|
||||
return false;
|
||||
}
|
||||
return NULL == ((SExprNode*)pProj)->pAssociation;
|
||||
|
|
|
@ -5968,6 +5968,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
|
|||
if (NULL == pReq->tagName) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pReq->colId = pSchema->colId;
|
||||
|
||||
SDataType targetDt = schemaToDataType(pTableMeta->tableInfo.precision, pSchema);
|
||||
if (DEAL_RES_ERROR == translateValueImpl(pCxt, pStmt->pVal, targetDt, true)) {
|
||||
|
@ -6051,6 +6052,7 @@ static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt,
|
|||
if (NULL == pReq->colName) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pReq->colId = pSchema->colId;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -6071,6 +6073,7 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt
|
|||
if (NULL == pReq->colName) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pReq->colId = pSchema->colId;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
typedef struct SLogicPlanContext {
|
||||
SPlanContext* pPlanCxt;
|
||||
SLogicNode* pCurrRoot;
|
||||
bool hasScan;
|
||||
} SLogicPlanContext;
|
||||
|
||||
typedef int32_t (*FCreateLogicNode)(SLogicPlanContext*, void*, SLogicNode**);
|
||||
|
@ -161,6 +162,10 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
|
|||
return SCAN_TYPE_STREAM;
|
||||
}
|
||||
|
||||
if (TSDB_SYSTEM_TABLE == tableType) {
|
||||
return SCAN_TYPE_SYSTEM_TABLE;
|
||||
}
|
||||
|
||||
if (NULL == pScanCols) {
|
||||
return NULL == pScanPseudoCols
|
||||
? SCAN_TYPE_TABLE
|
||||
|
@ -169,17 +174,6 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
|
|||
: SCAN_TYPE_TABLE);
|
||||
}
|
||||
|
||||
if (TSDB_SYSTEM_TABLE == tableType) {
|
||||
return SCAN_TYPE_SYSTEM_TABLE;
|
||||
}
|
||||
|
||||
SNode* pCol = NULL;
|
||||
FOREACH(pCol, pScanCols) {
|
||||
if (COLUMN_TYPE_COLUMN == ((SColumnNode*)pCol)->colType) {
|
||||
return SCAN_TYPE_TABLE;
|
||||
}
|
||||
}
|
||||
|
||||
return SCAN_TYPE_TABLE;
|
||||
}
|
||||
|
||||
|
@ -300,6 +294,8 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
nodesDestroyNode((SNode*)pScan);
|
||||
}
|
||||
|
||||
pCxt->hasScan = true;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1339,9 +1335,9 @@ static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) {
|
|||
|
||||
static void setLogicNodeParent(SLogicNode* pNode) { doSetLogicNodeParent(pNode, NULL); }
|
||||
|
||||
static void setLogicSubplanType(SLogicSubplan* pSubplan) {
|
||||
static void setLogicSubplanType(bool hasScan, SLogicSubplan* pSubplan) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pSubplan->pNode)) {
|
||||
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
||||
pSubplan->subplanType = hasScan ? SUBPLAN_TYPE_SCAN : SUBPLAN_TYPE_MERGE;
|
||||
} else {
|
||||
SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pSubplan->pNode;
|
||||
pSubplan->subplanType = (MODIFY_TABLE_TYPE_INSERT == pModify->modifyType && NULL != pModify->node.pChildren)
|
||||
|
@ -1351,7 +1347,7 @@ static void setLogicSubplanType(SLogicSubplan* pSubplan) {
|
|||
}
|
||||
|
||||
int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) {
|
||||
SLogicPlanContext cxt = {.pPlanCxt = pCxt};
|
||||
SLogicPlanContext cxt = {.pPlanCxt = pCxt, .pCurrRoot = NULL, .hasScan = false};
|
||||
|
||||
SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
|
||||
if (NULL == pSubplan) {
|
||||
|
@ -1364,7 +1360,7 @@ int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) {
|
|||
int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, &pSubplan->pNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
setLogicNodeParent(pSubplan->pNode);
|
||||
setLogicSubplanType(pSubplan);
|
||||
setLogicSubplanType(cxt.hasScan, pSubplan);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -35,7 +35,8 @@ typedef struct SPhysiPlanContext {
|
|||
int32_t errCode;
|
||||
int16_t nextDataBlockId;
|
||||
SArray* pLocationHelper;
|
||||
SArray* pExecNodeList; // SArray<SQueryNodeLoad>
|
||||
bool hasScan;
|
||||
bool hasSysScan;
|
||||
} SPhysiPlanContext;
|
||||
|
||||
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
|
||||
|
@ -255,7 +256,7 @@ static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlo
|
|||
|
||||
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList,
|
||||
SDataBlockDescNode* pDataBlockDesc) {
|
||||
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, true, false);
|
||||
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, false, false);
|
||||
}
|
||||
|
||||
static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
|
||||
|
@ -495,8 +496,6 @@ static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSub
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode);
|
||||
}
|
||||
|
||||
|
@ -577,8 +576,6 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
|||
pSubplan->execNode.nodeId = MNODE_HANDLE;
|
||||
pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet;
|
||||
}
|
||||
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_DNODE_VARIABLES)) {
|
||||
pScan->mgmtEpSet = pScanLogicNode->pVgroupList->vgroups->epSet;
|
||||
} else {
|
||||
|
@ -586,6 +583,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
|||
}
|
||||
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
||||
|
||||
pCxt->hasSysScan = true;
|
||||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
|
||||
}
|
||||
|
||||
|
@ -601,6 +599,7 @@ static int32_t createTableMergeScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
|||
|
||||
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
|
||||
SPhysiNode** pPhyNode) {
|
||||
pCxt->hasScan = true;
|
||||
switch (pScanLogicNode->scanType) {
|
||||
case SCAN_TYPE_TAG:
|
||||
case SCAN_TYPE_BLOCK_INFO:
|
||||
|
@ -1806,23 +1805,31 @@ static void setExplainInfo(SPlanContext* pCxt, SQueryPlan* pPlan) {
|
|||
}
|
||||
}
|
||||
|
||||
static void setExecNodeList(SPhysiPlanContext* pCxt, SArray* pExecNodeList) {
|
||||
if (NULL == pExecNodeList) {
|
||||
return;
|
||||
}
|
||||
if (pCxt->hasSysScan || !pCxt->hasScan) {
|
||||
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
||||
taosArrayPush(pExecNodeList, &node);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
|
||||
SPhysiPlanContext cxt = {.pPlanCxt = pCxt,
|
||||
.errCode = TSDB_CODE_SUCCESS,
|
||||
.nextDataBlockId = 0,
|
||||
.pLocationHelper = taosArrayInit(32, POINTER_BYTES),
|
||||
.pExecNodeList = pExecNodeList};
|
||||
.hasScan = false,
|
||||
.hasSysScan = false};
|
||||
if (NULL == cxt.pLocationHelper) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (QUERY_POLICY_VNODE == tsQueryPolicy) {
|
||||
taosArrayClear(pExecNodeList);
|
||||
}
|
||||
|
||||
int32_t code = doCreatePhysiPlan(&cxt, pLogicPlan, pPlan);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
setExplainInfo(pCxt, *pPlan);
|
||||
setExecNodeList(&cxt, pExecNodeList);
|
||||
}
|
||||
|
||||
destoryPhysiPlanContext(&cxt);
|
||||
|
|
Loading…
Reference in New Issue