diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h
index 18417cb608..cbc1f6c1f8 100644
--- a/include/libs/nodes/plannodes.h
+++ b/include/libs/nodes/plannodes.h
@@ -36,6 +36,7 @@ typedef enum EScanType {
SCAN_TYPE_TAG,
SCAN_TYPE_TABLE,
SCAN_TYPE_STABLE,
+ SCAN_TYPE_TOPIC,
SCAN_TYPE_STREAM
} EScanType;
@@ -154,7 +155,7 @@ typedef struct SPhysiNode {
} SPhysiNode;
typedef struct SScanPhysiNode {
- SPhysiNode node;
+ SPhysiNode node;
SNodeList* pScanCols;
uint64_t uid; // unique id of the table
int8_t tableType;
diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h
index a55a0e218d..2c733454dc 100644
--- a/include/libs/nodes/querynodes.h
+++ b/include/libs/nodes/querynodes.h
@@ -23,7 +23,8 @@ extern "C" {
#include "nodes.h"
#include "tmsg.h"
-#define TABLE_META_SIZE(pMeta) (NULL == (pMeta) ? 0 : (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfColumns + (pMeta)->tableInfo.numOfTags) * sizeof(SSchema)))
+#define TABLE_TOTAL_COL_NUM(pMeta) ((pMeta)->tableInfo.numOfColumns + (pMeta)->tableInfo.numOfTags)
+#define TABLE_META_SIZE(pMeta) (NULL == (pMeta) ? 0 : (sizeof(STableMeta) + TABLE_TOTAL_COL_NUM((pMeta)) * sizeof(SSchema)))
#define VGROUPS_INFO_SIZE(pInfo) (NULL == (pInfo) ? 0 : (sizeof(SVgroupsInfo) + (pInfo)->numOfVgroups * sizeof(SVgroupInfo)))
typedef struct SRawExprNode {
diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h
index 23bcdabb1b..74a15e2d18 100644
--- a/include/libs/parser/parser.h
+++ b/include/libs/parser/parser.h
@@ -26,7 +26,7 @@ typedef struct SParseContext {
uint64_t requestId;
int32_t acctId;
const char *db;
- bool streamQuery;
+ bool topicQuery;
void *pTransporter;
SEpSet mgmtEpSet;
const char *pSql; // sql string
diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h
index 7eb9d038a5..4ba04d1713 100644
--- a/include/libs/planner/planner.h
+++ b/include/libs/planner/planner.h
@@ -26,6 +26,7 @@ typedef struct SPlanContext {
uint64_t queryId;
int32_t acctId;
SNode* pAstRoot;
+ bool topicQuery;
bool streamQuery;
} SPlanContext;
diff --git a/include/util/tjson.h b/include/util/tjson.h
index b27f3b93ac..6b2221f704 100644
--- a/include/util/tjson.h
+++ b/include/util/tjson.h
@@ -22,6 +22,14 @@
extern "C" {
#endif
+#define tjsonGetNumberValue(pJson, pName, val) \
+ ({ \
+ uint64_t _tmp = 0; \
+ int32_t _code = tjsonGetUBigIntValue(pJson, pName, &_tmp); \
+ val = _tmp; \
+ _code; \
+ })
+
typedef void SJson;
SJson* tjsonCreateObject();
diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h
index e341729a8f..4067807eaf 100644
--- a/source/client/inc/clientInt.h
+++ b/source/client/inc/clientInt.h
@@ -230,7 +230,7 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
-int32_t parseSql(SRequestObj* pRequest, bool streamQuery, SQuery** pQuery);
+int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
// --- heartbeat
diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c
index aa94ed42fd..a5752a6f2e 100644
--- a/source/client/src/clientImpl.c
+++ b/source/client/src/clientImpl.c
@@ -137,14 +137,14 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
return TSDB_CODE_SUCCESS;
}
-int32_t parseSql(SRequestObj* pRequest, bool streamQuery, SQuery** pQuery) {
+int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
STscObj* pTscObj = pRequest->pTscObj;
SParseContext cxt = {
.requestId = pRequest->requestId,
.acctId = pTscObj->acctId,
.db = pRequest->pDb,
- .streamQuery = streamQuery,
+ .topicQuery = topicQuery,
.pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf,
diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c
index e913cae1ac..6cdf56f93e 100644
--- a/source/dnode/mnode/impl/src/mndTopic.c
+++ b/source/dnode/mnode/impl/src/mndTopic.c
@@ -246,7 +246,7 @@ static int32_t mndGetPlanString(SCMCreateTopicReq *pCreate, char **pStr) {
SQueryPlan* pPlan = NULL;
if (TSDB_CODE_SUCCESS == code) {
- SPlanContext cxt = { .pAstRoot = pAst, .streamQuery = true };
+ SPlanContext cxt = { .pAstRoot = pAst, .topicQuery = true };
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
}
diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c
index c61a2d71fb..9b9eaaa6a9 100644
--- a/source/libs/nodes/src/nodesCodeFuncs.c
+++ b/source/libs/nodes/src/nodesCodeFuncs.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#include "cmdnodes.h"
#include "nodesUtil.h"
#include "plannodes.h"
#include "querynodes.h"
@@ -85,6 +86,8 @@ const char* nodesNodeName(ENodeType type) {
return "ShowDatabaseStmt";
case QUERY_NODE_SHOW_TABLES_STMT:
return "ShowTablesStmt";
+ case QUERY_NODE_CREATE_TOPIC_STMT:
+ return "CreateTopicStmt";
case QUERY_NODE_LOGIC_PLAN_SCAN:
return "LogicScan";
case QUERY_NODE_LOGIC_PLAN_JOIN:
@@ -179,16 +182,118 @@ static int32_t jsonToNodeList(const SJson* pJson, const char* pName, SNodeList**
return jsonToNodeListImpl(tjsonGetObjectItem(pJson, pName), pList);
}
-static const char* jkTableMetaUid = "TableMetaUid";
-static const char* jkTableMetaSuid = "TableMetaSuid";
+static const char* jkTableComInfoNumOfTags = "NumOfTags";
+static const char* jkTableComInfoPrecision = "Precision";
+static const char* jkTableComInfoNumOfColumns = "NumOfColumns";
+static const char* jkTableComInfoRowSize = "RowSize";
+
+static int32_t tableComInfoToJson(const void* pObj, SJson* pJson) {
+ const STableComInfo* pNode = (const STableComInfo*)pObj;
+
+ int32_t code = tjsonAddIntegerToObject(pJson, jkTableComInfoNumOfTags, pNode->numOfTags);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkTableComInfoPrecision, pNode->precision);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkTableComInfoRowSize, pNode->rowSize);
+ }
+
+ return code;
+}
+
+static int32_t jsonToTableComInfo(const SJson* pJson, void* pObj) {
+ STableComInfo* pNode = (STableComInfo*)pObj;
+
+ int32_t code = tjsonGetNumberValue(pJson, jkTableComInfoNumOfTags, pNode->numOfTags);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetNumberValue(pJson, jkTableComInfoPrecision, pNode->precision);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetNumberValue(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetNumberValue(pJson, jkTableComInfoRowSize, pNode->rowSize);
+ }
+
+ return code;
+}
+
+static const char* jkSchemaType = "Type";
+static const char* jkSchemaColId = "ColId";
+static const char* jkSchemaBytes = "bytes";
+static const char* jkSchemaName = "Name";
+
+static int32_t schemaToJson(const void* pObj, SJson* pJson) {
+ const SSchema* pNode = (const SSchema*)pObj;
+
+ int32_t code = tjsonAddIntegerToObject(pJson, jkSchemaType, pNode->type);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkSchemaColId, pNode->colId);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkSchemaBytes, pNode->bytes);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddStringToObject(pJson, jkSchemaName, pNode->name);
+ }
+
+ return code;
+}
+
+static int32_t jsonToSchema(const SJson* pJson, void* pObj) {
+ SSchema* pNode = (SSchema*)pObj;
+
+ int32_t code = tjsonGetNumberValue(pJson, jkSchemaType, pNode->type);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetNumberValue(pJson, jkSchemaColId, pNode->colId);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetNumberValue(pJson, jkSchemaBytes, pNode->bytes);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetStringValue(pJson, jkSchemaName, pNode->name);
+ }
+
+ return code;
+}
+
+static const char* jkTableMetaVgId = "VgId";
+static const char* jkTableMetaTableType = "TableType";
+static const char* jkTableMetaUid = "Uid";
+static const char* jkTableMetaSuid = "Suid";
+static const char* jkTableMetaSversion = "Sversion";
+static const char* jkTableMetaTversion = "Tversion";
+static const char* jkTableMetaComInfo = "ComInfo";
+static const char* jkTableMetaColSchemas = "ColSchemas";
static int32_t tableMetaToJson(const void* pObj, SJson* pJson) {
const STableMeta* pNode = (const STableMeta*)pObj;
- int32_t code = tjsonAddIntegerToObject(pJson, jkTableMetaUid, pNode->uid);
+ int32_t code = tjsonAddIntegerToObject(pJson, jkTableMetaVgId, pNode->vgId);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkTableMetaTableType, pNode->tableType);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkTableMetaUid, pNode->uid);
+ }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableMetaSuid, pNode->suid);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkTableMetaSversion, pNode->sversion);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkTableMetaTversion, pNode->tversion);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddObject(pJson, jkTableMetaComInfo, tableComInfoToJson, &pNode->tableInfo);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddArray(pJson, jkTableMetaColSchemas, schemaToJson, pNode->schema, sizeof(SSchema), TABLE_TOTAL_COL_NUM(pNode));
+ }
return code;
}
@@ -196,9 +301,27 @@ static int32_t tableMetaToJson(const void* pObj, SJson* pJson) {
static int32_t jsonToTableMeta(const SJson* pJson, void* pObj) {
STableMeta* pNode = (STableMeta*)pObj;
- int32_t code = tjsonGetUBigIntValue(pJson, jkTableMetaUid, &pNode->uid);
+ int32_t code = tjsonGetNumberValue(pJson, jkTableMetaVgId, pNode->vgId);
if (TSDB_CODE_SUCCESS == code) {
- code = tjsonGetUBigIntValue(pJson, jkTableMetaSuid, &pNode->suid);
+ code = tjsonGetNumberValue(pJson, jkTableMetaTableType, pNode->tableType);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetNumberValue(pJson, jkTableMetaUid, pNode->uid);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetNumberValue(pJson, jkTableMetaSuid, pNode->suid);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetNumberValue(pJson, jkTableMetaSversion, pNode->sversion);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetNumberValue(pJson, jkTableMetaTversion, pNode->tversion);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonToObject(pJson, jkTableMetaComInfo, jsonToTableComInfo, &pNode->tableInfo);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonToArray(pJson, jkTableMetaColSchemas, jsonToSchema, pNode->schema, sizeof(SSchema));
}
return code;
@@ -222,7 +345,22 @@ static int32_t logicPlanNodeToJson(const void* pObj, SJson* pJson) {
return code;
}
+static int32_t jsonToLogicPlanNode(const SJson* pJson, void* pObj) {
+ SLogicNode* pNode = (SLogicNode*)pObj;
+
+ int32_t code = jsonToNodeList(pJson, jkLogicPlanTargets, &pNode->pTargets);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = jsonToNodeObject(pJson, jkLogicPlanConditions, &pNode->pConditions);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = jsonToNodeList(pJson, jkLogicPlanChildren, &pNode->pChildren);
+ }
+
+ return code;
+}
+
static const char* jkScanLogicPlanScanCols = "ScanCols";
+static const char* jkScanLogicPlanTableMetaSize = "TableMetaSize";
static const char* jkScanLogicPlanTableMeta = "TableMeta";
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
@@ -232,6 +370,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkScanLogicPlanScanCols, pNode->pScanCols);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanTableMetaSize, TABLE_META_SIZE(pNode->pMeta));
+ }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkScanLogicPlanTableMeta, tableMetaToJson, pNode->pMeta);
}
@@ -239,6 +380,24 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
return code;
}
+static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
+ SScanLogicNode* pNode = (SScanLogicNode*)pObj;
+
+ int32_t objSize = 0;
+ int32_t code = jsonToLogicPlanNode(pJson, pObj);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = jsonToNodeList(pJson, jkScanLogicPlanScanCols, &pNode->pScanCols);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetIntValue(pJson, jkScanLogicPlanTableMetaSize, &objSize);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonMakeObject(pJson, jkScanLogicPlanTableMeta, jsonToTableMeta, (void**)&pNode->pMeta, objSize);
+ }
+
+ return code;
+}
+
static const char* jkProjectLogicPlanProjections = "Projections";
static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) {
@@ -252,6 +411,17 @@ static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) {
return code;
}
+static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
+ SProjectLogicNode* pNode = (SProjectLogicNode*)pObj;
+
+ int32_t code = jsonToLogicPlanNode(pJson, pObj);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = jsonToNodeList(pJson, jkProjectLogicPlanProjections, &pNode->pProjections);
+ }
+
+ return code;
+}
+
static const char* jkJoinLogicPlanJoinType = "JoinType";
static const char* jkJoinLogicPlanOnConditions = "OnConditions";
@@ -1739,6 +1909,45 @@ static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) {
return code;
}
+static const char* jkCreateTopicStmtTopicName = "TopicName";
+static const char* jkCreateTopicStmtSubscribeDbName = "SubscribeDbName";
+static const char* jkCreateTopicStmtIgnoreExists = "IgnoreExists";
+static const char* jkCreateTopicStmtQuery = "Query";
+
+static int32_t createTopicStmtToJson(const void* pObj, SJson* pJson) {
+ const SCreateTopicStmt* pNode = (const SCreateTopicStmt*)pObj;
+
+ int32_t code = tjsonAddStringToObject(pJson, jkCreateTopicStmtTopicName, pNode->topicName);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddStringToObject(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subscribeDbName);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddBoolToObject(pJson, jkCreateTopicStmtIgnoreExists, pNode->ignoreExists);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddObject(pJson, jkCreateTopicStmtQuery, nodeToJson, pNode->pQuery);
+ }
+
+ return code;
+}
+
+static int32_t jsonToCreateTopicStmt(const SJson* pJson, void* pObj) {
+ SCreateTopicStmt* pNode = (SCreateTopicStmt*)pObj;
+
+ int32_t code = tjsonGetStringValue(pJson, jkCreateTopicStmtTopicName, pNode->topicName);
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetStringValue(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subscribeDbName);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetBoolValue(pJson, jkCreateTopicStmtIgnoreExists, &pNode->ignoreExists);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = jsonToNodeObject(pJson, jkCreateTopicStmtQuery, &pNode->pQuery);
+ }
+
+ return code;
+}
+
static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
switch (nodeType(pObj)) {
case QUERY_NODE_COLUMN:
@@ -1790,6 +1999,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_SHOW_DATABASES_STMT:
case QUERY_NODE_SHOW_TABLES_STMT:
break;
+ case QUERY_NODE_CREATE_TOPIC_STMT:
+ return createTopicStmtToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_SCAN:
return logicScanNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_JOIN:
@@ -1877,14 +2088,16 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
// break;
case QUERY_NODE_SELECT_STMT:
return jsonToSelectStmt(pJson, pObj);
- // case QUERY_NODE_LOGIC_PLAN_SCAN:
- // return jsonToLogicScanNode(pJson, pObj);
+ case QUERY_NODE_CREATE_TOPIC_STMT:
+ return jsonToCreateTopicStmt(pJson, pObj);
+ case QUERY_NODE_LOGIC_PLAN_SCAN:
+ return jsonToLogicScanNode(pJson, pObj);
// case QUERY_NODE_LOGIC_PLAN_JOIN:
// return jsonToLogicJoinNode(pJson, pObj);
// case QUERY_NODE_LOGIC_PLAN_AGG:
// return jsonToLogicAggNode(pJson, pObj);
- // case QUERY_NODE_LOGIC_PLAN_PROJECT:
- // return jsonToLogicProjectNode(pJson, pObj);
+ case QUERY_NODE_LOGIC_PLAN_PROJECT:
+ return jsonToLogicProjectNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return jsonToPhysiTagScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index 171ec81ca2..70652901ef 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -499,7 +499,7 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
}
static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNode* pRealTable) {
- if (pCxt->streamQuery) {
+ if (pCxt->topicQuery) {
return TSDB_CODE_SUCCESS;
}
@@ -1393,7 +1393,7 @@ static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* p
SCMCreateTopicReq createReq = {0};
if (NULL != pStmt->pQuery) {
- pCxt->pParseCxt->streamQuery = true;
+ pCxt->pParseCxt->topicQuery = true;
int32_t code = translateQuery(pCxt, pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL);
diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c
index cbc3921711..017de7e70d 100644
--- a/source/libs/planner/src/planLogicCreater.c
+++ b/source/libs/planner/src/planLogicCreater.c
@@ -131,7 +131,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
TSWAP(pScan->pMeta, pRealTable->pMeta, STableMeta*);
TSWAP(pScan->pVgroupList, pRealTable->pVgroupList, SVgroupsInfo*);
- pScan->scanType = pCxt->pPlanCxt->streamQuery ? SCAN_TYPE_STREAM : SCAN_TYPE_TABLE;
+ pScan->scanType = pCxt->pPlanCxt->topicQuery ? SCAN_TYPE_TOPIC : SCAN_TYPE_TABLE;
pScan->scanFlag = MAIN_SCAN;
pScan->scanRange = TSWINDOW_INITIALIZER;
pScan->tableName.type = TSDB_TABLE_NAME_T;
diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c
index 9bd926b676..ebff05e2b7 100644
--- a/source/libs/planner/src/planPhysiCreater.c
+++ b/source/libs/planner/src/planPhysiCreater.c
@@ -272,6 +272,7 @@ static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpl
return createTagScanPhysiNode(pCxt, pScanLogicNode);
case SCAN_TYPE_TABLE:
return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode);
+ case SCAN_TYPE_TOPIC:
case SCAN_TYPE_STREAM:
return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode);
default:
@@ -472,11 +473,20 @@ static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
}
static SPhysiNode* createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode) {
- SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
- CHECK_ALLOC(pExchange, NULL);
- CHECK_CODE(addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pExchange->node.pOutputDataBlockDesc), (SPhysiNode*)pExchange);
- pExchange->srcGroupId = pExchangeLogicNode->srcGroupId;
- return (SPhysiNode*)pExchange;
+ if (pCxt->pPlanCxt->streamQuery) {
+ SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
+ CHECK_ALLOC(pScan, NULL);
+ pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets);
+ CHECK_ALLOC(pScan->pScanCols, (SPhysiNode*)pScan);
+ CHECK_CODE(addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pScan->node.pOutputDataBlockDesc), (SPhysiNode*)pScan);
+ return (SPhysiNode*)pScan;
+ } else {
+ SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
+ CHECK_ALLOC(pExchange, NULL);
+ CHECK_CODE(addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pExchange->node.pOutputDataBlockDesc), (SPhysiNode*)pExchange);
+ pExchange->srcGroupId = pExchangeLogicNode->srcGroupId;
+ return (SPhysiNode*)pExchange;
+ }
}
static SPhysiNode* createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode) {
@@ -614,7 +624,9 @@ static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLog
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
} else {
pSubplan->pNode = createPhysiNode(pCxt, pSubplan, pLogicSubplan->pNode);
- pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode);
+ if (!pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) {
+ pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode);
+ }
pSubplan->msgType = TDMT_VND_QUERY;
}
return pSubplan;
diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c
index 1d0cbf22df..c8e0852b46 100644
--- a/source/libs/planner/src/planSpliter.c
+++ b/source/libs/planner/src/planSpliter.c
@@ -44,7 +44,8 @@ typedef struct SStsInfo {
} SStsInfo;
static SLogicNode* stsMatchByNode(SLogicNode* pNode) {
- if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType) {
+ if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType &&
+ SCAN_TYPE_TOPIC != ((SScanLogicNode*)pNode)->scanType) {
return pNode;
}
SNode* pChild;
diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp
index 8baa64d8fb..ccfdb19885 100644
--- a/source/libs/planner/test/plannerTest.cpp
+++ b/source/libs/planner/test/plannerTest.cpp
@@ -17,6 +17,7 @@
#include
+#include "cmdnodes.h"
#include "parser.h"
#include "planInt.h"
@@ -56,7 +57,8 @@ protected:
const string syntaxTreeStr = toString(query_->pRoot, false);
SLogicNode* pLogicPlan = nullptr;
- SPlanContext cxt = { .queryId = 1, .acctId = 0, .pAstRoot = query_->pRoot };
+ SPlanContext cxt = { .queryId = 1, .acctId = 0 };
+ setPlanContext(query_, &cxt);
code = createLogicPlan(&cxt, &pLogicPlan);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl;
@@ -94,6 +96,15 @@ protected:
private:
static const int max_err_len = 1024;
+ void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) {
+ if (QUERY_NODE_CREATE_TOPIC_STMT == nodeType(pQuery->pRoot)) {
+ pCxt->pAstRoot = ((SCreateTopicStmt*)pQuery->pRoot)->pQuery;
+ pCxt->topicQuery = true;
+ } else {
+ pCxt->pAstRoot = pQuery->pRoot;
+ }
+ }
+
void reset() {
memset(&cxt_, 0, sizeof(cxt_));
memset(errMagBuf_, 0, max_err_len);
@@ -173,3 +184,10 @@ TEST_F(PlannerTest, interval) {
bind("SELECT count(*) FROM t1 interval(10s)");
ASSERT_TRUE(run());
}
+
+TEST_F(PlannerTest, createTopic) {
+ setDatabase("root", "test");
+
+ bind("create topic tp as SELECT * FROM st1");
+ ASSERT_TRUE(run());
+}