From e4dd627b20737f30767c15cda789ebf5518cbb98 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 19 Mar 2022 03:02:31 -0400 Subject: [PATCH] stream plan implement --- include/libs/nodes/plannodes.h | 3 +- include/libs/nodes/querynodes.h | 3 +- include/libs/parser/parser.h | 2 +- include/libs/planner/planner.h | 1 + include/util/tjson.h | 8 + source/client/inc/clientInt.h | 2 +- source/client/src/clientImpl.c | 4 +- source/dnode/mnode/impl/src/mndTopic.c | 2 +- source/libs/nodes/src/nodesCodeFuncs.c | 231 ++++++++++++++++++++- source/libs/parser/src/parTranslater.c | 4 +- source/libs/planner/src/planLogicCreater.c | 2 +- source/libs/planner/src/planPhysiCreater.c | 24 ++- source/libs/planner/src/planSpliter.c | 3 +- source/libs/planner/test/plannerTest.cpp | 20 +- 14 files changed, 282 insertions(+), 27 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 18417cb608..cbc1f6c1f8 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -36,6 +36,7 @@ typedef enum EScanType { SCAN_TYPE_TAG, SCAN_TYPE_TABLE, SCAN_TYPE_STABLE, + SCAN_TYPE_TOPIC, SCAN_TYPE_STREAM } EScanType; @@ -154,7 +155,7 @@ typedef struct SPhysiNode { } SPhysiNode; typedef struct SScanPhysiNode { - SPhysiNode node; + SPhysiNode node; SNodeList* pScanCols; uint64_t uid; // unique id of the table int8_t tableType; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index a55a0e218d..2c733454dc 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -23,7 +23,8 @@ extern "C" { #include "nodes.h" #include "tmsg.h" -#define TABLE_META_SIZE(pMeta) (NULL == (pMeta) ? 0 : (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfColumns + (pMeta)->tableInfo.numOfTags) * sizeof(SSchema))) +#define TABLE_TOTAL_COL_NUM(pMeta) ((pMeta)->tableInfo.numOfColumns + (pMeta)->tableInfo.numOfTags) +#define TABLE_META_SIZE(pMeta) (NULL == (pMeta) ? 0 : (sizeof(STableMeta) + TABLE_TOTAL_COL_NUM((pMeta)) * sizeof(SSchema))) #define VGROUPS_INFO_SIZE(pInfo) (NULL == (pInfo) ? 0 : (sizeof(SVgroupsInfo) + (pInfo)->numOfVgroups * sizeof(SVgroupInfo))) typedef struct SRawExprNode { diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 23bcdabb1b..74a15e2d18 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -26,7 +26,7 @@ typedef struct SParseContext { uint64_t requestId; int32_t acctId; const char *db; - bool streamQuery; + bool topicQuery; void *pTransporter; SEpSet mgmtEpSet; const char *pSql; // sql string diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 7eb9d038a5..4ba04d1713 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -26,6 +26,7 @@ typedef struct SPlanContext { uint64_t queryId; int32_t acctId; SNode* pAstRoot; + bool topicQuery; bool streamQuery; } SPlanContext; diff --git a/include/util/tjson.h b/include/util/tjson.h index b27f3b93ac..6b2221f704 100644 --- a/include/util/tjson.h +++ b/include/util/tjson.h @@ -22,6 +22,14 @@ extern "C" { #endif +#define tjsonGetNumberValue(pJson, pName, val) \ + ({ \ + uint64_t _tmp = 0; \ + int32_t _code = tjsonGetUBigIntValue(pJson, pName, &_tmp); \ + val = _tmp; \ + _code; \ + }) + typedef void SJson; SJson* tjsonCreateObject(); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index e341729a8f..4067807eaf 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -230,7 +230,7 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); -int32_t parseSql(SRequestObj* pRequest, bool streamQuery, SQuery** pQuery); +int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery); int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); // --- heartbeat diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index aa94ed42fd..a5752a6f2e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -137,14 +137,14 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj* return TSDB_CODE_SUCCESS; } -int32_t parseSql(SRequestObj* pRequest, bool streamQuery, SQuery** pQuery) { +int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) { STscObj* pTscObj = pRequest->pTscObj; SParseContext cxt = { .requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = pRequest->pDb, - .streamQuery = streamQuery, + .topicQuery = topicQuery, .pSql = pRequest->sqlstr, .sqlLen = pRequest->sqlLen, .pMsg = pRequest->msgBuf, diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index e913cae1ac..6cdf56f93e 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -246,7 +246,7 @@ static int32_t mndGetPlanString(SCMCreateTopicReq *pCreate, char **pStr) { SQueryPlan* pPlan = NULL; if (TSDB_CODE_SUCCESS == code) { - SPlanContext cxt = { .pAstRoot = pAst, .streamQuery = true }; + SPlanContext cxt = { .pAstRoot = pAst, .topicQuery = true }; code = qCreateQueryPlan(&cxt, &pPlan, NULL); } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index c61a2d71fb..9b9eaaa6a9 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "cmdnodes.h" #include "nodesUtil.h" #include "plannodes.h" #include "querynodes.h" @@ -85,6 +86,8 @@ const char* nodesNodeName(ENodeType type) { return "ShowDatabaseStmt"; case QUERY_NODE_SHOW_TABLES_STMT: return "ShowTablesStmt"; + case QUERY_NODE_CREATE_TOPIC_STMT: + return "CreateTopicStmt"; case QUERY_NODE_LOGIC_PLAN_SCAN: return "LogicScan"; case QUERY_NODE_LOGIC_PLAN_JOIN: @@ -179,16 +182,118 @@ static int32_t jsonToNodeList(const SJson* pJson, const char* pName, SNodeList** return jsonToNodeListImpl(tjsonGetObjectItem(pJson, pName), pList); } -static const char* jkTableMetaUid = "TableMetaUid"; -static const char* jkTableMetaSuid = "TableMetaSuid"; +static const char* jkTableComInfoNumOfTags = "NumOfTags"; +static const char* jkTableComInfoPrecision = "Precision"; +static const char* jkTableComInfoNumOfColumns = "NumOfColumns"; +static const char* jkTableComInfoRowSize = "RowSize"; + +static int32_t tableComInfoToJson(const void* pObj, SJson* pJson) { + const STableComInfo* pNode = (const STableComInfo*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkTableComInfoNumOfTags, pNode->numOfTags); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableComInfoPrecision, pNode->precision); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableComInfoRowSize, pNode->rowSize); + } + + return code; +} + +static int32_t jsonToTableComInfo(const SJson* pJson, void* pObj) { + STableComInfo* pNode = (STableComInfo*)pObj; + + int32_t code = tjsonGetNumberValue(pJson, jkTableComInfoNumOfTags, pNode->numOfTags); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkTableComInfoPrecision, pNode->precision); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkTableComInfoRowSize, pNode->rowSize); + } + + return code; +} + +static const char* jkSchemaType = "Type"; +static const char* jkSchemaColId = "ColId"; +static const char* jkSchemaBytes = "bytes"; +static const char* jkSchemaName = "Name"; + +static int32_t schemaToJson(const void* pObj, SJson* pJson) { + const SSchema* pNode = (const SSchema*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkSchemaType, pNode->type); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkSchemaColId, pNode->colId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkSchemaBytes, pNode->bytes); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddStringToObject(pJson, jkSchemaName, pNode->name); + } + + return code; +} + +static int32_t jsonToSchema(const SJson* pJson, void* pObj) { + SSchema* pNode = (SSchema*)pObj; + + int32_t code = tjsonGetNumberValue(pJson, jkSchemaType, pNode->type); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkSchemaColId, pNode->colId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkSchemaBytes, pNode->bytes); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkSchemaName, pNode->name); + } + + return code; +} + +static const char* jkTableMetaVgId = "VgId"; +static const char* jkTableMetaTableType = "TableType"; +static const char* jkTableMetaUid = "Uid"; +static const char* jkTableMetaSuid = "Suid"; +static const char* jkTableMetaSversion = "Sversion"; +static const char* jkTableMetaTversion = "Tversion"; +static const char* jkTableMetaComInfo = "ComInfo"; +static const char* jkTableMetaColSchemas = "ColSchemas"; static int32_t tableMetaToJson(const void* pObj, SJson* pJson) { const STableMeta* pNode = (const STableMeta*)pObj; - int32_t code = tjsonAddIntegerToObject(pJson, jkTableMetaUid, pNode->uid); + int32_t code = tjsonAddIntegerToObject(pJson, jkTableMetaVgId, pNode->vgId); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableMetaTableType, pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableMetaUid, pNode->uid); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkTableMetaSuid, pNode->suid); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableMetaSversion, pNode->sversion); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableMetaTversion, pNode->tversion); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkTableMetaComInfo, tableComInfoToJson, &pNode->tableInfo); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddArray(pJson, jkTableMetaColSchemas, schemaToJson, pNode->schema, sizeof(SSchema), TABLE_TOTAL_COL_NUM(pNode)); + } return code; } @@ -196,9 +301,27 @@ static int32_t tableMetaToJson(const void* pObj, SJson* pJson) { static int32_t jsonToTableMeta(const SJson* pJson, void* pObj) { STableMeta* pNode = (STableMeta*)pObj; - int32_t code = tjsonGetUBigIntValue(pJson, jkTableMetaUid, &pNode->uid); + int32_t code = tjsonGetNumberValue(pJson, jkTableMetaVgId, pNode->vgId); if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetUBigIntValue(pJson, jkTableMetaSuid, &pNode->suid); + code = tjsonGetNumberValue(pJson, jkTableMetaTableType, pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkTableMetaUid, pNode->uid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkTableMetaSuid, pNode->suid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkTableMetaSversion, pNode->sversion); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkTableMetaTversion, pNode->tversion); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToObject(pJson, jkTableMetaComInfo, jsonToTableComInfo, &pNode->tableInfo); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToArray(pJson, jkTableMetaColSchemas, jsonToSchema, pNode->schema, sizeof(SSchema)); } return code; @@ -222,7 +345,22 @@ static int32_t logicPlanNodeToJson(const void* pObj, SJson* pJson) { return code; } +static int32_t jsonToLogicPlanNode(const SJson* pJson, void* pObj) { + SLogicNode* pNode = (SLogicNode*)pObj; + + int32_t code = jsonToNodeList(pJson, jkLogicPlanTargets, &pNode->pTargets); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkLogicPlanConditions, &pNode->pConditions); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkLogicPlanChildren, &pNode->pChildren); + } + + return code; +} + static const char* jkScanLogicPlanScanCols = "ScanCols"; +static const char* jkScanLogicPlanTableMetaSize = "TableMetaSize"; static const char* jkScanLogicPlanTableMeta = "TableMeta"; static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { @@ -232,6 +370,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkScanLogicPlanScanCols, pNode->pScanCols); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanTableMetaSize, TABLE_META_SIZE(pNode->pMeta)); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkScanLogicPlanTableMeta, tableMetaToJson, pNode->pMeta); } @@ -239,6 +380,24 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { return code; } +static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) { + SScanLogicNode* pNode = (SScanLogicNode*)pObj; + + int32_t objSize = 0; + int32_t code = jsonToLogicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkScanLogicPlanScanCols, &pNode->pScanCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkScanLogicPlanTableMetaSize, &objSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonMakeObject(pJson, jkScanLogicPlanTableMeta, jsonToTableMeta, (void**)&pNode->pMeta, objSize); + } + + return code; +} + static const char* jkProjectLogicPlanProjections = "Projections"; static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) { @@ -252,6 +411,17 @@ static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) { return code; } +static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) { + SProjectLogicNode* pNode = (SProjectLogicNode*)pObj; + + int32_t code = jsonToLogicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkProjectLogicPlanProjections, &pNode->pProjections); + } + + return code; +} + static const char* jkJoinLogicPlanJoinType = "JoinType"; static const char* jkJoinLogicPlanOnConditions = "OnConditions"; @@ -1739,6 +1909,45 @@ static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) { return code; } +static const char* jkCreateTopicStmtTopicName = "TopicName"; +static const char* jkCreateTopicStmtSubscribeDbName = "SubscribeDbName"; +static const char* jkCreateTopicStmtIgnoreExists = "IgnoreExists"; +static const char* jkCreateTopicStmtQuery = "Query"; + +static int32_t createTopicStmtToJson(const void* pObj, SJson* pJson) { + const SCreateTopicStmt* pNode = (const SCreateTopicStmt*)pObj; + + int32_t code = tjsonAddStringToObject(pJson, jkCreateTopicStmtTopicName, pNode->topicName); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddStringToObject(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subscribeDbName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkCreateTopicStmtIgnoreExists, pNode->ignoreExists); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkCreateTopicStmtQuery, nodeToJson, pNode->pQuery); + } + + return code; +} + +static int32_t jsonToCreateTopicStmt(const SJson* pJson, void* pObj) { + SCreateTopicStmt* pNode = (SCreateTopicStmt*)pObj; + + int32_t code = tjsonGetStringValue(pJson, jkCreateTopicStmtTopicName, pNode->topicName); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subscribeDbName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkCreateTopicStmtIgnoreExists, &pNode->ignoreExists); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkCreateTopicStmtQuery, &pNode->pQuery); + } + + return code; +} + static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { switch (nodeType(pObj)) { case QUERY_NODE_COLUMN: @@ -1790,6 +1999,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_SHOW_DATABASES_STMT: case QUERY_NODE_SHOW_TABLES_STMT: break; + case QUERY_NODE_CREATE_TOPIC_STMT: + return createTopicStmtToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_SCAN: return logicScanNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_JOIN: @@ -1877,14 +2088,16 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { // break; case QUERY_NODE_SELECT_STMT: return jsonToSelectStmt(pJson, pObj); - // case QUERY_NODE_LOGIC_PLAN_SCAN: - // return jsonToLogicScanNode(pJson, pObj); + case QUERY_NODE_CREATE_TOPIC_STMT: + return jsonToCreateTopicStmt(pJson, pObj); + case QUERY_NODE_LOGIC_PLAN_SCAN: + return jsonToLogicScanNode(pJson, pObj); // case QUERY_NODE_LOGIC_PLAN_JOIN: // return jsonToLogicJoinNode(pJson, pObj); // case QUERY_NODE_LOGIC_PLAN_AGG: // return jsonToLogicAggNode(pJson, pObj); - // case QUERY_NODE_LOGIC_PLAN_PROJECT: - // return jsonToLogicProjectNode(pJson, pObj); + case QUERY_NODE_LOGIC_PLAN_PROJECT: + return jsonToLogicProjectNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: return jsonToPhysiTagScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 171ec81ca2..70652901ef 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -499,7 +499,7 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) } static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNode* pRealTable) { - if (pCxt->streamQuery) { + if (pCxt->topicQuery) { return TSDB_CODE_SUCCESS; } @@ -1393,7 +1393,7 @@ static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* p SCMCreateTopicReq createReq = {0}; if (NULL != pStmt->pQuery) { - pCxt->pParseCxt->streamQuery = true; + pCxt->pParseCxt->topicQuery = true; int32_t code = translateQuery(pCxt, pStmt->pQuery); if (TSDB_CODE_SUCCESS == code) { code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index cbc3921711..017de7e70d 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -131,7 +131,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect TSWAP(pScan->pMeta, pRealTable->pMeta, STableMeta*); TSWAP(pScan->pVgroupList, pRealTable->pVgroupList, SVgroupsInfo*); - pScan->scanType = pCxt->pPlanCxt->streamQuery ? SCAN_TYPE_STREAM : SCAN_TYPE_TABLE; + pScan->scanType = pCxt->pPlanCxt->topicQuery ? SCAN_TYPE_TOPIC : SCAN_TYPE_TABLE; pScan->scanFlag = MAIN_SCAN; pScan->scanRange = TSWINDOW_INITIALIZER; pScan->tableName.type = TSDB_TABLE_NAME_T; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 9bd926b676..ebff05e2b7 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -272,6 +272,7 @@ static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpl return createTagScanPhysiNode(pCxt, pScanLogicNode); case SCAN_TYPE_TABLE: return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode); + case SCAN_TYPE_TOPIC: case SCAN_TYPE_STREAM: return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode); default: @@ -472,11 +473,20 @@ static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC } static SPhysiNode* createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode) { - SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE); - CHECK_ALLOC(pExchange, NULL); - CHECK_CODE(addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pExchange->node.pOutputDataBlockDesc), (SPhysiNode*)pExchange); - pExchange->srcGroupId = pExchangeLogicNode->srcGroupId; - return (SPhysiNode*)pExchange; + if (pCxt->pPlanCxt->streamQuery) { + SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); + CHECK_ALLOC(pScan, NULL); + pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets); + CHECK_ALLOC(pScan->pScanCols, (SPhysiNode*)pScan); + CHECK_CODE(addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pScan->node.pOutputDataBlockDesc), (SPhysiNode*)pScan); + return (SPhysiNode*)pScan; + } else { + SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE); + CHECK_ALLOC(pExchange, NULL); + CHECK_CODE(addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pExchange->node.pOutputDataBlockDesc), (SPhysiNode*)pExchange); + pExchange->srcGroupId = pExchangeLogicNode->srcGroupId; + return (SPhysiNode*)pExchange; + } } static SPhysiNode* createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode) { @@ -614,7 +624,9 @@ static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLog taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); } else { pSubplan->pNode = createPhysiNode(pCxt, pSubplan, pLogicSubplan->pNode); - pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode); + if (!pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) { + pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode); + } pSubplan->msgType = TDMT_VND_QUERY; } return pSubplan; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 1d0cbf22df..c8e0852b46 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -44,7 +44,8 @@ typedef struct SStsInfo { } SStsInfo; static SLogicNode* stsMatchByNode(SLogicNode* pNode) { - if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType && + SCAN_TYPE_TOPIC != ((SScanLogicNode*)pNode)->scanType) { return pNode; } SNode* pChild; diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 8baa64d8fb..ccfdb19885 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -17,6 +17,7 @@ #include +#include "cmdnodes.h" #include "parser.h" #include "planInt.h" @@ -56,7 +57,8 @@ protected: const string syntaxTreeStr = toString(query_->pRoot, false); SLogicNode* pLogicPlan = nullptr; - SPlanContext cxt = { .queryId = 1, .acctId = 0, .pAstRoot = query_->pRoot }; + SPlanContext cxt = { .queryId = 1, .acctId = 0 }; + setPlanContext(query_, &cxt); code = createLogicPlan(&cxt, &pLogicPlan); if (code != TSDB_CODE_SUCCESS) { cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl; @@ -94,6 +96,15 @@ protected: private: static const int max_err_len = 1024; + void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) { + if (QUERY_NODE_CREATE_TOPIC_STMT == nodeType(pQuery->pRoot)) { + pCxt->pAstRoot = ((SCreateTopicStmt*)pQuery->pRoot)->pQuery; + pCxt->topicQuery = true; + } else { + pCxt->pAstRoot = pQuery->pRoot; + } + } + void reset() { memset(&cxt_, 0, sizeof(cxt_)); memset(errMagBuf_, 0, max_err_len); @@ -173,3 +184,10 @@ TEST_F(PlannerTest, interval) { bind("SELECT count(*) FROM t1 interval(10s)"); ASSERT_TRUE(run()); } + +TEST_F(PlannerTest, createTopic) { + setDatabase("root", "test"); + + bind("create topic tp as SELECT * FROM st1"); + ASSERT_TRUE(run()); +}