feat: sql command 'create topic as {database | stable| query}'
This commit is contained in:
parent
9342ef3ef7
commit
fdd00a6582
|
@ -1482,14 +1482,11 @@ enum {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
|
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
|
||||||
int8_t igExists;
|
int8_t igExists;
|
||||||
// int8_t withTbName;
|
|
||||||
// int8_t withSchema;
|
|
||||||
// int8_t withTag;
|
|
||||||
int8_t subType;
|
int8_t subType;
|
||||||
char* sql;
|
char* sql;
|
||||||
union {
|
union {
|
||||||
char* ast;
|
char* ast;
|
||||||
char subDbName[TSDB_DB_NAME_LEN];
|
char subDbName[TSDB_DB_FNAME_LEN];
|
||||||
char subStbName[TSDB_TABLE_FNAME_LEN];
|
char subStbName[TSDB_TABLE_FNAME_LEN];
|
||||||
};
|
};
|
||||||
} SCMCreateTopicReq;
|
} SCMCreateTopicReq;
|
||||||
|
|
|
@ -163,98 +163,96 @@
|
||||||
#define TK_TOPIC 145
|
#define TK_TOPIC 145
|
||||||
#define TK_AS 146
|
#define TK_AS 146
|
||||||
#define TK_CGROUP 147
|
#define TK_CGROUP 147
|
||||||
#define TK_WITH 148
|
#define TK_DESC 148
|
||||||
#define TK_SCHEMA 149
|
#define TK_DESCRIBE 149
|
||||||
#define TK_DESC 150
|
#define TK_RESET 150
|
||||||
#define TK_DESCRIBE 151
|
#define TK_QUERY 151
|
||||||
#define TK_RESET 152
|
#define TK_CACHE 152
|
||||||
#define TK_QUERY 153
|
#define TK_EXPLAIN 153
|
||||||
#define TK_CACHE 154
|
#define TK_ANALYZE 154
|
||||||
#define TK_EXPLAIN 155
|
#define TK_VERBOSE 155
|
||||||
#define TK_ANALYZE 156
|
#define TK_NK_BOOL 156
|
||||||
#define TK_VERBOSE 157
|
#define TK_RATIO 157
|
||||||
#define TK_NK_BOOL 158
|
#define TK_COMPACT 158
|
||||||
#define TK_RATIO 159
|
#define TK_VNODES 159
|
||||||
#define TK_COMPACT 160
|
#define TK_IN 160
|
||||||
#define TK_VNODES 161
|
#define TK_OUTPUTTYPE 161
|
||||||
#define TK_IN 162
|
#define TK_AGGREGATE 162
|
||||||
#define TK_OUTPUTTYPE 163
|
#define TK_BUFSIZE 163
|
||||||
#define TK_AGGREGATE 164
|
#define TK_STREAM 164
|
||||||
#define TK_BUFSIZE 165
|
#define TK_INTO 165
|
||||||
#define TK_STREAM 166
|
#define TK_TRIGGER 166
|
||||||
#define TK_INTO 167
|
#define TK_AT_ONCE 167
|
||||||
#define TK_TRIGGER 168
|
#define TK_WINDOW_CLOSE 168
|
||||||
#define TK_AT_ONCE 169
|
#define TK_WATERMARK 169
|
||||||
#define TK_WINDOW_CLOSE 170
|
#define TK_KILL 170
|
||||||
#define TK_WATERMARK 171
|
#define TK_CONNECTION 171
|
||||||
#define TK_KILL 172
|
#define TK_TRANSACTION 172
|
||||||
#define TK_CONNECTION 173
|
#define TK_MERGE 173
|
||||||
#define TK_TRANSACTION 174
|
#define TK_VGROUP 174
|
||||||
#define TK_MERGE 175
|
#define TK_REDISTRIBUTE 175
|
||||||
#define TK_VGROUP 176
|
#define TK_SPLIT 176
|
||||||
#define TK_REDISTRIBUTE 177
|
#define TK_SYNCDB 177
|
||||||
#define TK_SPLIT 178
|
#define TK_NULL 178
|
||||||
#define TK_SYNCDB 179
|
#define TK_NK_QUESTION 179
|
||||||
#define TK_NULL 180
|
#define TK_NK_ARROW 180
|
||||||
#define TK_NK_QUESTION 181
|
#define TK_ROWTS 181
|
||||||
#define TK_NK_ARROW 182
|
#define TK_TBNAME 182
|
||||||
#define TK_ROWTS 183
|
#define TK_QSTARTTS 183
|
||||||
#define TK_TBNAME 184
|
#define TK_QENDTS 184
|
||||||
#define TK_QSTARTTS 185
|
#define TK_WSTARTTS 185
|
||||||
#define TK_QENDTS 186
|
#define TK_WENDTS 186
|
||||||
#define TK_WSTARTTS 187
|
#define TK_WDURATION 187
|
||||||
#define TK_WENDTS 188
|
#define TK_CAST 188
|
||||||
#define TK_WDURATION 189
|
#define TK_NOW 189
|
||||||
#define TK_CAST 190
|
#define TK_TODAY 190
|
||||||
#define TK_NOW 191
|
#define TK_TIMEZONE 191
|
||||||
#define TK_TODAY 192
|
#define TK_COUNT 192
|
||||||
#define TK_TIMEZONE 193
|
#define TK_FIRST 193
|
||||||
#define TK_COUNT 194
|
#define TK_LAST 194
|
||||||
#define TK_FIRST 195
|
#define TK_LAST_ROW 195
|
||||||
#define TK_LAST 196
|
#define TK_BETWEEN 196
|
||||||
#define TK_LAST_ROW 197
|
#define TK_IS 197
|
||||||
#define TK_BETWEEN 198
|
#define TK_NK_LT 198
|
||||||
#define TK_IS 199
|
#define TK_NK_GT 199
|
||||||
#define TK_NK_LT 200
|
#define TK_NK_LE 200
|
||||||
#define TK_NK_GT 201
|
#define TK_NK_GE 201
|
||||||
#define TK_NK_LE 202
|
#define TK_NK_NE 202
|
||||||
#define TK_NK_GE 203
|
#define TK_MATCH 203
|
||||||
#define TK_NK_NE 204
|
#define TK_NMATCH 204
|
||||||
#define TK_MATCH 205
|
#define TK_CONTAINS 205
|
||||||
#define TK_NMATCH 206
|
#define TK_JOIN 206
|
||||||
#define TK_CONTAINS 207
|
#define TK_INNER 207
|
||||||
#define TK_JOIN 208
|
#define TK_SELECT 208
|
||||||
#define TK_INNER 209
|
#define TK_DISTINCT 209
|
||||||
#define TK_SELECT 210
|
#define TK_WHERE 210
|
||||||
#define TK_DISTINCT 211
|
#define TK_PARTITION 211
|
||||||
#define TK_WHERE 212
|
#define TK_BY 212
|
||||||
#define TK_PARTITION 213
|
#define TK_SESSION 213
|
||||||
#define TK_BY 214
|
#define TK_STATE_WINDOW 214
|
||||||
#define TK_SESSION 215
|
#define TK_SLIDING 215
|
||||||
#define TK_STATE_WINDOW 216
|
#define TK_FILL 216
|
||||||
#define TK_SLIDING 217
|
#define TK_VALUE 217
|
||||||
#define TK_FILL 218
|
#define TK_NONE 218
|
||||||
#define TK_VALUE 219
|
#define TK_PREV 219
|
||||||
#define TK_NONE 220
|
#define TK_LINEAR 220
|
||||||
#define TK_PREV 221
|
#define TK_NEXT 221
|
||||||
#define TK_LINEAR 222
|
#define TK_GROUP 222
|
||||||
#define TK_NEXT 223
|
#define TK_HAVING 223
|
||||||
#define TK_GROUP 224
|
#define TK_ORDER 224
|
||||||
#define TK_HAVING 225
|
#define TK_SLIMIT 225
|
||||||
#define TK_ORDER 226
|
#define TK_SOFFSET 226
|
||||||
#define TK_SLIMIT 227
|
#define TK_LIMIT 227
|
||||||
#define TK_SOFFSET 228
|
#define TK_OFFSET 228
|
||||||
#define TK_LIMIT 229
|
#define TK_ASC 229
|
||||||
#define TK_OFFSET 230
|
#define TK_NULLS 230
|
||||||
#define TK_ASC 231
|
#define TK_ID 231
|
||||||
#define TK_NULLS 232
|
#define TK_NK_BITNOT 232
|
||||||
#define TK_ID 233
|
#define TK_INSERT 233
|
||||||
#define TK_NK_BITNOT 234
|
#define TK_VALUES 234
|
||||||
#define TK_INSERT 235
|
#define TK_IMPORT 235
|
||||||
#define TK_VALUES 236
|
#define TK_NK_SEMI 236
|
||||||
#define TK_IMPORT 237
|
#define TK_FILE 237
|
||||||
#define TK_NK_SEMI 238
|
|
||||||
#define TK_FILE 239
|
|
||||||
|
|
||||||
#define TK_NK_SPACE 300
|
#define TK_NK_SPACE 300
|
||||||
#define TK_NK_COMMENT 301
|
#define TK_NK_COMMENT 301
|
||||||
|
|
|
@ -239,20 +239,13 @@ typedef struct SDropComponentNodeStmt {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
} SDropComponentNodeStmt;
|
} SDropComponentNodeStmt;
|
||||||
|
|
||||||
typedef struct STopicOptions {
|
|
||||||
ENodeType type;
|
|
||||||
bool withTable;
|
|
||||||
bool withSchema;
|
|
||||||
bool withTag;
|
|
||||||
} STopicOptions;
|
|
||||||
|
|
||||||
typedef struct SCreateTopicStmt {
|
typedef struct SCreateTopicStmt {
|
||||||
ENodeType type;
|
ENodeType type;
|
||||||
char topicName[TSDB_TABLE_NAME_LEN];
|
char topicName[TSDB_TABLE_NAME_LEN];
|
||||||
char subscribeDbName[TSDB_DB_NAME_LEN];
|
char subDbName[TSDB_DB_NAME_LEN];
|
||||||
bool ignoreExists;
|
char subSTbName[TSDB_TABLE_NAME_LEN];
|
||||||
SNode* pQuery;
|
bool ignoreExists;
|
||||||
STopicOptions* pOptions;
|
SNode* pQuery;
|
||||||
} SCreateTopicStmt;
|
} SCreateTopicStmt;
|
||||||
|
|
||||||
typedef struct SDropTopicStmt {
|
typedef struct SDropTopicStmt {
|
||||||
|
|
|
@ -95,7 +95,6 @@ typedef enum ENodeType {
|
||||||
QUERY_NODE_INDEX_OPTIONS,
|
QUERY_NODE_INDEX_OPTIONS,
|
||||||
QUERY_NODE_EXPLAIN_OPTIONS,
|
QUERY_NODE_EXPLAIN_OPTIONS,
|
||||||
QUERY_NODE_STREAM_OPTIONS,
|
QUERY_NODE_STREAM_OPTIONS,
|
||||||
QUERY_NODE_TOPIC_OPTIONS,
|
|
||||||
QUERY_NODE_LEFT_VALUE,
|
QUERY_NODE_LEFT_VALUE,
|
||||||
|
|
||||||
// Statement nodes are used in parser and planner module.
|
// Statement nodes are used in parser and planner module.
|
||||||
|
|
|
@ -2666,27 +2666,23 @@ int32_t tDeserializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) {
|
int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) {
|
||||||
int32_t sqlLen = 0;
|
|
||||||
int32_t astLen = 0;
|
|
||||||
if (pReq->sql != NULL) sqlLen = (int32_t)strlen(pReq->sql);
|
|
||||||
if (pReq->ast != NULL) astLen = (int32_t)strlen(pReq->ast);
|
|
||||||
|
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
||||||
/*if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1;*/
|
|
||||||
/*if (tEncodeI8(&encoder, pReq->withSchema) < 0) return -1;*/
|
|
||||||
/*if (tEncodeI8(&encoder, pReq->withTag) < 0) return -1;*/
|
|
||||||
if (tEncodeI8(&encoder, pReq->subType) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->subType) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->subDbName) < 0) return -1;
|
if (TOPIC_SUB_TYPE__DB == pReq->subType) {
|
||||||
/*if (tEncodeCStr(&encoder, pReq->subStbName) < 0) return -1;*/
|
if (tEncodeCStr(&encoder, pReq->subDbName) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
|
} else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
|
||||||
if (tEncodeI32(&encoder, astLen) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->subStbName) < 0) return -1;
|
||||||
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
|
} else {
|
||||||
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
|
if (tEncodeI32(&encoder, strlen(pReq->ast)) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
|
||||||
|
}
|
||||||
|
if (tEncodeI32(&encoder, strlen(pReq->sql)) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -2705,30 +2701,26 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
||||||
/*if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1;*/
|
|
||||||
/*if (tDecodeI8(&decoder, &pReq->withSchema) < 0) return -1;*/
|
|
||||||
/*if (tDecodeI8(&decoder, &pReq->withTag) < 0) return -1;*/
|
|
||||||
if (tDecodeI8(&decoder, &pReq->subType) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->subType) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->subDbName) < 0) return -1;
|
if (TOPIC_SUB_TYPE__DB == pReq->subType) {
|
||||||
/*if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {*/
|
if (tDecodeCStrTo(&decoder, pReq->subDbName) < 0) return -1;
|
||||||
/*if (tDecodeCStrTo(&decoder, pReq->subStbName) < 0) return -1;*/
|
} else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
|
||||||
/*}*/
|
if (tDecodeCStrTo(&decoder, pReq->subStbName) < 0) return -1;
|
||||||
|
} else {
|
||||||
|
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
|
||||||
|
if (astLen > 0) {
|
||||||
|
pReq->ast = taosMemoryCalloc(1, astLen + 1);
|
||||||
|
if (pReq->ast == NULL) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
|
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
|
|
||||||
|
|
||||||
if (sqlLen > 0) {
|
if (sqlLen > 0) {
|
||||||
pReq->sql = taosMemoryCalloc(1, sqlLen + 1);
|
pReq->sql = taosMemoryCalloc(1, sqlLen + 1);
|
||||||
if (pReq->sql == NULL) return -1;
|
if (pReq->sql == NULL) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (astLen > 0) {
|
|
||||||
pReq->ast = taosMemoryCalloc(1, astLen + 1);
|
|
||||||
if (pReq->ast == NULL) return -1;
|
|
||||||
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
|
|
||||||
} else {
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
@ -2737,7 +2729,9 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
|
||||||
|
|
||||||
void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) {
|
void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) {
|
||||||
taosMemoryFreeClear(pReq->sql);
|
taosMemoryFreeClear(pReq->sql);
|
||||||
taosMemoryFreeClear(pReq->ast);
|
if (TOPIC_SUB_TYPE__COLUMN == pReq->subType) {
|
||||||
|
taosMemoryFreeClear(pReq->ast);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeSCMCreateTopicRsp(void *buf, int32_t bufLen, const SCMCreateTopicRsp *pRsp) {
|
int32_t tSerializeSCMCreateTopicRsp(void *buf, int32_t bufLen, const SCMCreateTopicRsp *pRsp) {
|
||||||
|
|
|
@ -2534,7 +2534,7 @@ static const char* jkSessionWindowTsPrimaryKey = "TsPrimaryKey";
|
||||||
static const char* jkSessionWindowGap = "Gap";
|
static const char* jkSessionWindowGap = "Gap";
|
||||||
|
|
||||||
static int32_t sessionWindowNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t sessionWindowNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SSessionWindowNode * pNode = (const SSessionWindowNode*)pObj;
|
const SSessionWindowNode* pNode = (const SSessionWindowNode*)pObj;
|
||||||
|
|
||||||
int32_t code = tjsonAddObject(pJson, jkSessionWindowTsPrimaryKey, nodeToJson, pNode->pCol);
|
int32_t code = tjsonAddObject(pJson, jkSessionWindowTsPrimaryKey, nodeToJson, pNode->pCol);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -2546,9 +2546,9 @@ static int32_t sessionWindowNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
static int32_t jsonToSessionWindowNode(const SJson* pJson, void* pObj) {
|
static int32_t jsonToSessionWindowNode(const SJson* pJson, void* pObj) {
|
||||||
SSessionWindowNode* pNode = (SSessionWindowNode*)pObj;
|
SSessionWindowNode* pNode = (SSessionWindowNode*)pObj;
|
||||||
|
|
||||||
int32_t code = jsonToNodeObject(pJson, jkSessionWindowTsPrimaryKey, (SNode **)&pNode->pCol);
|
int32_t code = jsonToNodeObject(pJson, jkSessionWindowTsPrimaryKey, (SNode**)&pNode->pCol);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = jsonToNodeObject(pJson, jkSessionWindowGap, (SNode **)&pNode->pGap);
|
code = jsonToNodeObject(pJson, jkSessionWindowGap, (SNode**)&pNode->pGap);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2987,7 +2987,7 @@ static int32_t createTopicStmtToJson(const void* pObj, SJson* pJson) {
|
||||||
|
|
||||||
int32_t code = tjsonAddStringToObject(pJson, jkCreateTopicStmtTopicName, pNode->topicName);
|
int32_t code = tjsonAddStringToObject(pJson, jkCreateTopicStmtTopicName, pNode->topicName);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddStringToObject(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subscribeDbName);
|
code = tjsonAddStringToObject(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subDbName);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddBoolToObject(pJson, jkCreateTopicStmtIgnoreExists, pNode->ignoreExists);
|
code = tjsonAddBoolToObject(pJson, jkCreateTopicStmtIgnoreExists, pNode->ignoreExists);
|
||||||
|
@ -3004,7 +3004,7 @@ static int32_t jsonToCreateTopicStmt(const SJson* pJson, void* pObj) {
|
||||||
|
|
||||||
int32_t code = tjsonGetStringValue(pJson, jkCreateTopicStmtTopicName, pNode->topicName);
|
int32_t code = tjsonGetStringValue(pJson, jkCreateTopicStmtTopicName, pNode->topicName);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetStringValue(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subscribeDbName);
|
code = tjsonGetStringValue(pJson, jkCreateTopicStmtSubscribeDbName, pNode->subDbName);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBoolValue(pJson, jkCreateTopicStmtIgnoreExists, &pNode->ignoreExists);
|
code = tjsonGetBoolValue(pJson, jkCreateTopicStmtIgnoreExists, &pNode->ignoreExists);
|
||||||
|
|
|
@ -86,8 +86,6 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
||||||
return makeNode(type, sizeof(SExplainOptions));
|
return makeNode(type, sizeof(SExplainOptions));
|
||||||
case QUERY_NODE_STREAM_OPTIONS:
|
case QUERY_NODE_STREAM_OPTIONS:
|
||||||
return makeNode(type, sizeof(SStreamOptions));
|
return makeNode(type, sizeof(SStreamOptions));
|
||||||
case QUERY_NODE_TOPIC_OPTIONS:
|
|
||||||
return makeNode(type, sizeof(STopicOptions));
|
|
||||||
case QUERY_NODE_LEFT_VALUE:
|
case QUERY_NODE_LEFT_VALUE:
|
||||||
return makeNode(type, sizeof(SLeftValueNode));
|
return makeNode(type, sizeof(SLeftValueNode));
|
||||||
case QUERY_NODE_SET_OPERATOR:
|
case QUERY_NODE_SET_OPERATOR:
|
||||||
|
|
|
@ -168,7 +168,7 @@ SNode* createCreateComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, co
|
||||||
SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId);
|
SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId);
|
||||||
SNode* createTopicOptions(SAstCreateContext* pCxt);
|
SNode* createTopicOptions(SAstCreateContext* pCxt);
|
||||||
SNode* createCreateTopicStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName, SNode* pQuery,
|
SNode* createCreateTopicStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName, SNode* pQuery,
|
||||||
const SToken* pSubscribeDbName, SNode* pOptions);
|
const SToken* pSubDbName, SNode* pRealTable);
|
||||||
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pTopicName);
|
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pTopicName);
|
||||||
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pCGroupId,
|
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pCGroupId,
|
||||||
const SToken* pTopicName);
|
const SToken* pTopicName);
|
||||||
|
|
|
@ -403,18 +403,13 @@ func_list(A) ::= func_list(B) NK_COMMA func(C).
|
||||||
func(A) ::= function_name(B) NK_LP expression_list(C) NK_RP. { A = createFunctionNode(pCxt, &B, C); }
|
func(A) ::= function_name(B) NK_LP expression_list(C) NK_RP. { A = createFunctionNode(pCxt, &B, C); }
|
||||||
|
|
||||||
/************************************************ create/drop topic ***************************************************/
|
/************************************************ create/drop topic ***************************************************/
|
||||||
cmd ::= CREATE TOPIC not_exists_opt(A)
|
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_expression(C). { pCxt->pRootNode = createCreateTopicStmt(pCxt, A, &B, C, NULL, NULL); }
|
||||||
topic_name(B) topic_options(D) AS query_expression(C). { pCxt->pRootNode = createCreateTopicStmt(pCxt, A, &B, C, NULL, D); }
|
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmt(pCxt, A, &B, NULL, &C, NULL); }
|
||||||
cmd ::= CREATE TOPIC not_exists_opt(A)
|
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B)
|
||||||
topic_name(B) topic_options(D) AS db_name(C). { pCxt->pRootNode = createCreateTopicStmt(pCxt, A, &B, NULL, &C, D); }
|
AS STABLE full_table_name(C). { pCxt->pRootNode = createCreateTopicStmt(pCxt, A, &B, NULL, NULL, C); }
|
||||||
cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); }
|
cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); }
|
||||||
cmd ::= DROP CGROUP exists_opt(A) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C); }
|
cmd ::= DROP CGROUP exists_opt(A) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C); }
|
||||||
|
|
||||||
topic_options(A) ::= . { A = createTopicOptions(pCxt); }
|
|
||||||
topic_options(A) ::= topic_options(B) WITH TABLE. { ((STopicOptions*)B)->withTable = true; A = B; }
|
|
||||||
topic_options(A) ::= topic_options(B) WITH SCHEMA. { ((STopicOptions*)B)->withSchema = true; A = B; }
|
|
||||||
topic_options(A) ::= topic_options(B) WITH TAG. { ((STopicOptions*)B)->withTag = true; A = B; }
|
|
||||||
|
|
||||||
/************************************************ desc/describe *******************************************************/
|
/************************************************ desc/describe *******************************************************/
|
||||||
cmd ::= DESC full_table_name(A). { pCxt->pRootNode = createDescribeStmt(pCxt, A); }
|
cmd ::= DESC full_table_name(A). { pCxt->pRootNode = createDescribeStmt(pCxt, A); }
|
||||||
cmd ::= DESCRIBE full_table_name(A). { pCxt->pRootNode = createDescribeStmt(pCxt, A); }
|
cmd ::= DESCRIBE full_table_name(A). { pCxt->pRootNode = createDescribeStmt(pCxt, A); }
|
||||||
|
|
|
@ -1265,28 +1265,22 @@ SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, cons
|
||||||
return (SNode*)pStmt;
|
return (SNode*)pStmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode* createTopicOptions(SAstCreateContext* pCxt) {
|
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
|
||||||
STopicOptions* pOptions = nodesMakeNode(QUERY_NODE_TOPIC_OPTIONS);
|
|
||||||
CHECK_OUT_OF_MEM(pOptions);
|
|
||||||
pOptions->withTable = false;
|
|
||||||
pOptions->withSchema = false;
|
|
||||||
pOptions->withTag = false;
|
|
||||||
return (SNode*)pOptions;
|
|
||||||
}
|
|
||||||
|
|
||||||
SNode* createCreateTopicStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName, SNode* pQuery,
|
SNode* createCreateTopicStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName, SNode* pQuery,
|
||||||
const SToken* pSubscribeDbName, SNode* pOptions) {
|
const SToken* pSubDbName, SNode* pRealTable) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
SCreateTopicStmt* pStmt = nodesMakeNode(QUERY_NODE_CREATE_TOPIC_STMT);
|
SCreateTopicStmt* pStmt = nodesMakeNode(QUERY_NODE_CREATE_TOPIC_STMT);
|
||||||
CHECK_OUT_OF_MEM(pStmt);
|
CHECK_OUT_OF_MEM(pStmt);
|
||||||
strncpy(pStmt->topicName, pTopicName->z, pTopicName->n);
|
strncpy(pStmt->topicName, pTopicName->z, pTopicName->n);
|
||||||
pStmt->ignoreExists = ignoreExists;
|
pStmt->ignoreExists = ignoreExists;
|
||||||
pStmt->pQuery = pQuery;
|
if (NULL != pRealTable) {
|
||||||
if (NULL != pSubscribeDbName) {
|
strcpy(pStmt->subDbName, ((SRealTableNode*)pRealTable)->table.dbName);
|
||||||
strncpy(pStmt->subscribeDbName, pSubscribeDbName->z, pSubscribeDbName->n);
|
strcpy(pStmt->subSTbName, ((SRealTableNode*)pRealTable)->table.tableName);
|
||||||
|
nodesDestroyNode(pRealTable);
|
||||||
|
} else if (NULL != pSubDbName) {
|
||||||
|
strncpy(pStmt->subDbName, pSubDbName->z, pSubDbName->n);
|
||||||
|
} else {
|
||||||
|
pStmt->pQuery = pQuery;
|
||||||
}
|
}
|
||||||
pStmt->pOptions = (STopicOptions*)pOptions;
|
|
||||||
return (SNode*)pStmt;
|
return (SNode*)pStmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -156,7 +156,6 @@ static SKeyword keywordTable[] = {
|
||||||
{"RETENTIONS", TK_RETENTIONS},
|
{"RETENTIONS", TK_RETENTIONS},
|
||||||
{"REVOKE", TK_REVOKE},
|
{"REVOKE", TK_REVOKE},
|
||||||
{"ROLLUP", TK_ROLLUP},
|
{"ROLLUP", TK_ROLLUP},
|
||||||
{"SCHEMA", TK_SCHEMA},
|
|
||||||
{"SCHEMALESS", TK_SCHEMALESS},
|
{"SCHEMALESS", TK_SCHEMALESS},
|
||||||
{"SCORES", TK_SCORES},
|
{"SCORES", TK_SCORES},
|
||||||
{"SELECT", TK_SELECT},
|
{"SELECT", TK_SELECT},
|
||||||
|
@ -214,7 +213,6 @@ static SKeyword keywordTable[] = {
|
||||||
{"WATERMARK", TK_WATERMARK},
|
{"WATERMARK", TK_WATERMARK},
|
||||||
{"WHERE", TK_WHERE},
|
{"WHERE", TK_WHERE},
|
||||||
{"WINDOW_CLOSE", TK_WINDOW_CLOSE},
|
{"WINDOW_CLOSE", TK_WINDOW_CLOSE},
|
||||||
{"WITH", TK_WITH},
|
|
||||||
{"WRITE", TK_WRITE},
|
{"WRITE", TK_WRITE},
|
||||||
{"_C0", TK_ROWTS},
|
{"_C0", TK_ROWTS},
|
||||||
{"_QENDTS", TK_QENDTS},
|
{"_QENDTS", TK_QENDTS},
|
||||||
|
|
|
@ -3239,9 +3239,6 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
|
||||||
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
|
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
|
||||||
tNameGetFullDbName(&name, pReq->name);
|
tNameGetFullDbName(&name, pReq->name);
|
||||||
pReq->igExists = pStmt->ignoreExists;
|
pReq->igExists = pStmt->ignoreExists;
|
||||||
/*pReq->withTbName = pStmt->pOptions->withTable;*/
|
|
||||||
/*pReq->withSchema = pStmt->pOptions->withSchema;*/
|
|
||||||
/*pReq->withTag = pStmt->pOptions->withTag;*/
|
|
||||||
|
|
||||||
pReq->sql = strdup(pCxt->pParseCxt->pSql);
|
pReq->sql = strdup(pCxt->pParseCxt->pSql);
|
||||||
if (NULL == pReq->sql) {
|
if (NULL == pReq->sql) {
|
||||||
|
@ -3250,19 +3247,22 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
const char* dbName;
|
if ('\0' != pStmt->subSTbName[0]) {
|
||||||
if (NULL != pStmt->pQuery) {
|
pReq->subType = TOPIC_SUB_TYPE__TABLE;
|
||||||
dbName = ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName;
|
toName(pCxt->pParseCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name);
|
||||||
|
tNameExtractFullName(&name, pReq->subStbName);
|
||||||
|
} else if ('\0' != pStmt->subDbName[0]) {
|
||||||
|
pReq->subType = TOPIC_SUB_TYPE__DB;
|
||||||
|
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->subDbName, strlen(pStmt->subDbName));
|
||||||
|
tNameGetFullDbName(&name, pReq->subDbName);
|
||||||
|
} else {
|
||||||
|
pReq->subType = TOPIC_SUB_TYPE__COLUMN;
|
||||||
pCxt->pParseCxt->topicQuery = true;
|
pCxt->pParseCxt->topicQuery = true;
|
||||||
code = translateQuery(pCxt, pStmt->pQuery);
|
code = translateQuery(pCxt, pStmt->pQuery);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
|
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
dbName = pStmt->subscribeDbName;
|
|
||||||
}
|
}
|
||||||
tNameSetDbName(&name, pCxt->pParseCxt->acctId, dbName, strlen(dbName));
|
|
||||||
tNameGetFullDbName(&name, pReq->subDbName);
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -440,13 +440,62 @@ TEST_F(ParserInitialCTest, createTable) {
|
||||||
TEST_F(ParserInitialCTest, createTopic) {
|
TEST_F(ParserInitialCTest, createTopic) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
|
SCMCreateTopicReq expect = {0};
|
||||||
|
|
||||||
|
auto setCreateTopicReqFunc = [&](const char* pTopicName, int8_t igExists, const char* pSql, const char* pAst,
|
||||||
|
const char* pDbName = nullptr, const char* pTbname = nullptr) {
|
||||||
|
memset(&expect, 0, sizeof(SMCreateStbReq));
|
||||||
|
snprintf(expect.name, sizeof(expect.name), "0.%s", pTopicName);
|
||||||
|
expect.igExists = igExists;
|
||||||
|
expect.sql = (char*)pSql;
|
||||||
|
if (nullptr != pTbname) {
|
||||||
|
expect.subType = TOPIC_SUB_TYPE__TABLE;
|
||||||
|
snprintf(expect.subStbName, sizeof(expect.subStbName), "0.%s.%s", pDbName, pTbname);
|
||||||
|
} else if (nullptr != pAst) {
|
||||||
|
expect.subType = TOPIC_SUB_TYPE__COLUMN;
|
||||||
|
expect.ast = (char*)pAst;
|
||||||
|
} else {
|
||||||
|
expect.subType = TOPIC_SUB_TYPE__DB;
|
||||||
|
snprintf(expect.subStbName, sizeof(expect.subStbName), "0.%s", pDbName);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
|
||||||
|
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_TOPIC_STMT);
|
||||||
|
SCMCreateTopicReq req = {0};
|
||||||
|
ASSERT_TRUE(TSDB_CODE_SUCCESS ==
|
||||||
|
tDeserializeSCMCreateTopicReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
|
||||||
|
|
||||||
|
ASSERT_EQ(std::string(req.name), std::string(expect.name));
|
||||||
|
ASSERT_EQ(req.igExists, expect.igExists);
|
||||||
|
ASSERT_EQ(req.subType, expect.subType);
|
||||||
|
ASSERT_EQ(std::string(req.sql), std::string(expect.sql));
|
||||||
|
switch (expect.subType) {
|
||||||
|
case TOPIC_SUB_TYPE__DB:
|
||||||
|
ASSERT_EQ(std::string(req.subDbName), std::string(expect.subDbName));
|
||||||
|
break;
|
||||||
|
case TOPIC_SUB_TYPE__TABLE:
|
||||||
|
ASSERT_EQ(std::string(req.subStbName), std::string(expect.subStbName));
|
||||||
|
break;
|
||||||
|
case TOPIC_SUB_TYPE__COLUMN:
|
||||||
|
ASSERT_NE(req.ast, nullptr);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
ASSERT_TRUE(false);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
setCreateTopicReqFunc("tp1", 0, "create topic tp1 as select * from t1", "ast");
|
||||||
run("CREATE TOPIC tp1 AS SELECT * FROM t1");
|
run("CREATE TOPIC tp1 AS SELECT * FROM t1");
|
||||||
|
|
||||||
run("CREATE TOPIC IF NOT EXISTS tp1 AS SELECT * FROM t1");
|
setCreateTopicReqFunc("tp1", 1, "create topic if not exists tp1 as select ts, ceil(c1) from t1", "ast");
|
||||||
|
run("CREATE TOPIC IF NOT EXISTS tp1 AS SELECT ts, CEIL(c1) FROM t1");
|
||||||
|
|
||||||
run("CREATE TOPIC tp1 AS test");
|
setCreateTopicReqFunc("tp1", 0, "create topic tp1 as database test", nullptr, "test");
|
||||||
|
run("CREATE TOPIC tp1 AS DATABASE test");
|
||||||
|
|
||||||
run("CREATE TOPIC IF NOT EXISTS tp1 AS test");
|
setCreateTopicReqFunc("tp1", 1, "create topic if not exists tp1 as stable st1", nullptr, "test", "st1");
|
||||||
|
run("CREATE TOPIC IF NOT EXISTS tp1 AS STABLE st1");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(ParserInitialCTest, createUser) {
|
TEST_F(ParserInitialCTest, createUser) {
|
||||||
|
|
Loading…
Reference in New Issue