feat:support only meta in tmq subscriptions

This commit is contained in:
wangmm0220 2023-06-30 11:42:40 +08:00
parent d91f44c274
commit 58c7011e0e
16 changed files with 272 additions and 269 deletions

View File

@ -131,10 +131,10 @@ static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2,
enum { enum {
TMQ_MSG_TYPE__DUMMY = 0, TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_RSP, TMQ_MSG_TYPE__POLL_DATA_RSP,
TMQ_MSG_TYPE__POLL_META_RSP, TMQ_MSG_TYPE__POLL_META_RSP,
TMQ_MSG_TYPE__EP_RSP, TMQ_MSG_TYPE__EP_RSP,
TMQ_MSG_TYPE__TAOSX_RSP, TMQ_MSG_TYPE__POLL_DATA_META_RSP,
TMQ_MSG_TYPE__WALINFO_RSP, TMQ_MSG_TYPE__WALINFO_RSP,
TMQ_MSG_TYPE__END_RSP, TMQ_MSG_TYPE__END_RSP,
}; };

View File

@ -2909,6 +2909,12 @@ enum {
TMQ_OFFSET__SNAPSHOT_META = 3, TMQ_OFFSET__SNAPSHOT_META = 3,
}; };
enum {
WITH_DATA = 0,
WITH_META = 1,
ONLY_META = 2,
};
typedef struct { typedef struct {
int8_t type; int8_t type;
union { union {

View File

@ -193,165 +193,166 @@
#define TK_INTERVAL 175 #define TK_INTERVAL 175
#define TK_COUNT 176 #define TK_COUNT 176
#define TK_LAST_ROW 177 #define TK_LAST_ROW 177
#define TK_TOPIC 178 #define TK_META 178
#define TK_META 179 #define TK_ONLY 179
#define TK_CONSUMER 180 #define TK_TOPIC 180
#define TK_GROUP 181 #define TK_CONSUMER 181
#define TK_DESC 182 #define TK_GROUP 182
#define TK_DESCRIBE 183 #define TK_DESC 183
#define TK_RESET 184 #define TK_DESCRIBE 184
#define TK_QUERY 185 #define TK_RESET 185
#define TK_CACHE 186 #define TK_QUERY 186
#define TK_EXPLAIN 187 #define TK_CACHE 187
#define TK_ANALYZE 188 #define TK_EXPLAIN 188
#define TK_VERBOSE 189 #define TK_ANALYZE 189
#define TK_NK_BOOL 190 #define TK_VERBOSE 190
#define TK_RATIO 191 #define TK_NK_BOOL 191
#define TK_NK_FLOAT 192 #define TK_RATIO 192
#define TK_OUTPUTTYPE 193 #define TK_NK_FLOAT 193
#define TK_AGGREGATE 194 #define TK_OUTPUTTYPE 194
#define TK_BUFSIZE 195 #define TK_AGGREGATE 195
#define TK_LANGUAGE 196 #define TK_BUFSIZE 196
#define TK_REPLACE 197 #define TK_LANGUAGE 197
#define TK_STREAM 198 #define TK_REPLACE 198
#define TK_INTO 199 #define TK_STREAM 199
#define TK_PAUSE 200 #define TK_INTO 200
#define TK_RESUME 201 #define TK_PAUSE 201
#define TK_TRIGGER 202 #define TK_RESUME 202
#define TK_AT_ONCE 203 #define TK_TRIGGER 203
#define TK_WINDOW_CLOSE 204 #define TK_AT_ONCE 204
#define TK_IGNORE 205 #define TK_WINDOW_CLOSE 205
#define TK_EXPIRED 206 #define TK_IGNORE 206
#define TK_FILL_HISTORY 207 #define TK_EXPIRED 207
#define TK_UPDATE 208 #define TK_FILL_HISTORY 208
#define TK_SUBTABLE 209 #define TK_UPDATE 209
#define TK_UNTREATED 210 #define TK_SUBTABLE 210
#define TK_KILL 211 #define TK_UNTREATED 211
#define TK_CONNECTION 212 #define TK_KILL 212
#define TK_TRANSACTION 213 #define TK_CONNECTION 213
#define TK_BALANCE 214 #define TK_TRANSACTION 214
#define TK_VGROUP 215 #define TK_BALANCE 215
#define TK_LEADER 216 #define TK_VGROUP 216
#define TK_MERGE 217 #define TK_LEADER 217
#define TK_REDISTRIBUTE 218 #define TK_MERGE 218
#define TK_SPLIT 219 #define TK_REDISTRIBUTE 219
#define TK_DELETE 220 #define TK_SPLIT 220
#define TK_INSERT 221 #define TK_DELETE 221
#define TK_NULL 222 #define TK_INSERT 222
#define TK_NK_QUESTION 223 #define TK_NULL 223
#define TK_NK_ARROW 224 #define TK_NK_QUESTION 224
#define TK_ROWTS 225 #define TK_NK_ARROW 225
#define TK_QSTART 226 #define TK_ROWTS 226
#define TK_QEND 227 #define TK_QSTART 227
#define TK_QDURATION 228 #define TK_QEND 228
#define TK_WSTART 229 #define TK_QDURATION 229
#define TK_WEND 230 #define TK_WSTART 230
#define TK_WDURATION 231 #define TK_WEND 231
#define TK_IROWTS 232 #define TK_WDURATION 232
#define TK_ISFILLED 233 #define TK_IROWTS 233
#define TK_CAST 234 #define TK_ISFILLED 234
#define TK_NOW 235 #define TK_CAST 235
#define TK_TODAY 236 #define TK_NOW 236
#define TK_TIMEZONE 237 #define TK_TODAY 237
#define TK_CLIENT_VERSION 238 #define TK_TIMEZONE 238
#define TK_SERVER_VERSION 239 #define TK_CLIENT_VERSION 239
#define TK_SERVER_STATUS 240 #define TK_SERVER_VERSION 240
#define TK_CURRENT_USER 241 #define TK_SERVER_STATUS 241
#define TK_CASE 242 #define TK_CURRENT_USER 242
#define TK_WHEN 243 #define TK_CASE 243
#define TK_THEN 244 #define TK_WHEN 244
#define TK_ELSE 245 #define TK_THEN 245
#define TK_BETWEEN 246 #define TK_ELSE 246
#define TK_IS 247 #define TK_BETWEEN 247
#define TK_NK_LT 248 #define TK_IS 248
#define TK_NK_GT 249 #define TK_NK_LT 249
#define TK_NK_LE 250 #define TK_NK_GT 250
#define TK_NK_GE 251 #define TK_NK_LE 251
#define TK_NK_NE 252 #define TK_NK_GE 252
#define TK_MATCH 253 #define TK_NK_NE 253
#define TK_NMATCH 254 #define TK_MATCH 254
#define TK_CONTAINS 255 #define TK_NMATCH 255
#define TK_IN 256 #define TK_CONTAINS 256
#define TK_JOIN 257 #define TK_IN 257
#define TK_INNER 258 #define TK_JOIN 258
#define TK_SELECT 259 #define TK_INNER 259
#define TK_DISTINCT 260 #define TK_SELECT 260
#define TK_WHERE 261 #define TK_DISTINCT 261
#define TK_PARTITION 262 #define TK_WHERE 262
#define TK_BY 263 #define TK_PARTITION 263
#define TK_SESSION 264 #define TK_BY 264
#define TK_STATE_WINDOW 265 #define TK_SESSION 265
#define TK_EVENT_WINDOW 266 #define TK_STATE_WINDOW 266
#define TK_SLIDING 267 #define TK_EVENT_WINDOW 267
#define TK_FILL 268 #define TK_SLIDING 268
#define TK_VALUE 269 #define TK_FILL 269
#define TK_VALUE_F 270 #define TK_VALUE 270
#define TK_NONE 271 #define TK_VALUE_F 271
#define TK_PREV 272 #define TK_NONE 272
#define TK_NULL_F 273 #define TK_PREV 273
#define TK_LINEAR 274 #define TK_NULL_F 274
#define TK_NEXT 275 #define TK_LINEAR 275
#define TK_HAVING 276 #define TK_NEXT 276
#define TK_RANGE 277 #define TK_HAVING 277
#define TK_EVERY 278 #define TK_RANGE 278
#define TK_ORDER 279 #define TK_EVERY 279
#define TK_SLIMIT 280 #define TK_ORDER 280
#define TK_SOFFSET 281 #define TK_SLIMIT 281
#define TK_LIMIT 282 #define TK_SOFFSET 282
#define TK_OFFSET 283 #define TK_LIMIT 283
#define TK_ASC 284 #define TK_OFFSET 284
#define TK_NULLS 285 #define TK_ASC 285
#define TK_ABORT 286 #define TK_NULLS 286
#define TK_AFTER 287 #define TK_ABORT 287
#define TK_ATTACH 288 #define TK_AFTER 288
#define TK_BEFORE 289 #define TK_ATTACH 289
#define TK_BEGIN 290 #define TK_BEFORE 290
#define TK_BITAND 291 #define TK_BEGIN 291
#define TK_BITNOT 292 #define TK_BITAND 292
#define TK_BITOR 293 #define TK_BITNOT 293
#define TK_BLOCKS 294 #define TK_BITOR 294
#define TK_CHANGE 295 #define TK_BLOCKS 295
#define TK_COMMA 296 #define TK_CHANGE 296
#define TK_CONCAT 297 #define TK_COMMA 297
#define TK_CONFLICT 298 #define TK_CONCAT 298
#define TK_COPY 299 #define TK_CONFLICT 299
#define TK_DEFERRED 300 #define TK_COPY 300
#define TK_DELIMITERS 301 #define TK_DEFERRED 301
#define TK_DETACH 302 #define TK_DELIMITERS 302
#define TK_DIVIDE 303 #define TK_DETACH 303
#define TK_DOT 304 #define TK_DIVIDE 304
#define TK_EACH 305 #define TK_DOT 305
#define TK_FAIL 306 #define TK_EACH 306
#define TK_FILE 307 #define TK_FAIL 307
#define TK_FOR 308 #define TK_FILE 308
#define TK_GLOB 309 #define TK_FOR 309
#define TK_ID 310 #define TK_GLOB 310
#define TK_IMMEDIATE 311 #define TK_ID 311
#define TK_IMPORT 312 #define TK_IMMEDIATE 312
#define TK_INITIALLY 313 #define TK_IMPORT 313
#define TK_INSTEAD 314 #define TK_INITIALLY 314
#define TK_ISNULL 315 #define TK_INSTEAD 315
#define TK_KEY 316 #define TK_ISNULL 316
#define TK_MODULES 317 #define TK_KEY 317
#define TK_NK_BITNOT 318 #define TK_MODULES 318
#define TK_NK_SEMI 319 #define TK_NK_BITNOT 319
#define TK_NOTNULL 320 #define TK_NK_SEMI 320
#define TK_OF 321 #define TK_NOTNULL 321
#define TK_PLUS 322 #define TK_OF 322
#define TK_PRIVILEGE 323 #define TK_PLUS 323
#define TK_RAISE 324 #define TK_PRIVILEGE 324
#define TK_RESTRICT 325 #define TK_RAISE 325
#define TK_ROW 326 #define TK_RESTRICT 326
#define TK_SEMI 327 #define TK_ROW 327
#define TK_STAR 328 #define TK_SEMI 328
#define TK_STATEMENT 329 #define TK_STAR 329
#define TK_STRICT 330 #define TK_STATEMENT 330
#define TK_STRING 331 #define TK_STRICT 331
#define TK_TIMES 332 #define TK_STRING 332
#define TK_VALUES 333 #define TK_TIMES 333
#define TK_VARIABLE 334 #define TK_VALUES 334
#define TK_VIEW 335 #define TK_VARIABLE 335
#define TK_WAL 336 #define TK_VIEW 336
#define TK_WAL 337

View File

@ -123,8 +123,8 @@ typedef struct SSnapContext {
SHashObj* suidInfo; SHashObj* suidInfo;
SArray* idList; SArray* idList;
int32_t index; int32_t index;
bool withMeta; int8_t withMeta;
bool queryMeta; // true-get meta, false-get data int8_t queryMeta; // true-get meta, false-get data
} SSnapContext; } SSnapContext;
typedef struct { typedef struct {

View File

@ -362,7 +362,7 @@ typedef struct SCreateTopicStmt {
char subDbName[TSDB_DB_NAME_LEN]; char subDbName[TSDB_DB_NAME_LEN];
char subSTbName[TSDB_TABLE_NAME_LEN]; char subSTbName[TSDB_TABLE_NAME_LEN];
bool ignoreExists; bool ignoreExists;
bool withMeta; int8_t withMeta;
SNode* pQuery; SNode* pQuery;
SNode* pWhere; SNode* pWhere;
} SCreateTopicStmt; } SCreateTopicStmt;

View File

@ -917,7 +917,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
tDeleteSMqAskEpRsp(&pEpRspWrapper->msg); tDeleteSMqAskEpRsp(&pEpRspWrapper->msg);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset); taosMemoryFreeClear(pRsp->pEpset);
@ -930,7 +930,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
taosMemoryFreeClear(pRsp->pEpset); taosMemoryFreeClear(pRsp->pEpset);
taosMemoryFree(pRsp->metaRsp.metaRsp); taosMemoryFree(pRsp->metaRsp.metaRsp);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset); taosMemoryFreeClear(pRsp->pEpset);
@ -1405,7 +1405,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
strcpy(pRspWrapper->topicName, pParam->topicName); strcpy(pRspWrapper->topicName, pParam->topicName);
pMsg->pEpSet = NULL; pMsg->pEpSet = NULL;
if (rspType == TMQ_MSG_TYPE__POLL_RSP) { if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp); tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp);
@ -1422,7 +1422,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp); tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
tDecoderClear(&decoder); tDecoderClear(&decoder);
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
} else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) { } else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp); tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
@ -1881,7 +1881,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno)); tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
return NULL; return NULL;
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
@ -1981,7 +1981,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pRspWrapper = tmqFreeRspWrapper(pRspWrapper); pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
@ -2023,7 +2023,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
void* pRsp = NULL; void* pRsp = NULL;
int64_t numOfRows = 0; int64_t numOfRows = 0;
if (pollRspWrapper->taosxRsp.createTableNum == 0) { if (pollRspWrapper->taosxRsp.createTableNum == 0) {
pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); tscError("consumer:0x%" PRIx64" createTableNum should > 0 if rsp type is data_meta", tmq->consumerId);
} else { } else {
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
} }

View File

@ -235,7 +235,7 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot); int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot);
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
SSnapContext **ctxRet); SSnapContext **ctxRet);
int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx);

View File

@ -260,7 +260,7 @@ static void saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo)
taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable)); taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
} }
int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
SSnapContext** ctxRet) { SSnapContext** ctxRet) {
SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext)); SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
if (ctx == NULL) return -1; if (ctx == NULL) return -1;
@ -476,7 +476,7 @@ int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLe
if (ctx->index >= taosArrayGetSize(ctx->idList)) { if (ctx->index >= taosArrayGetSize(ctx->idList)) {
metaDebug("tmqsnap get meta end"); metaDebug("tmqsnap get meta end");
ctx->index = 0; ctx->index = 0;
ctx->queryMeta = false; // change to get data ctx->queryMeta = 0; // change to get data
return 0; return 0;
} }

View File

@ -183,64 +183,64 @@ void tqNotifyClose(STQ* pTq) {
} }
} }
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, //static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
int64_t consumerId, int32_t type) { // int64_t consumerId, int32_t type) {
int32_t len = 0; // int32_t len = 0;
int32_t code = 0; // int32_t code = 0;
//
if (type == TMQ_MSG_TYPE__POLL_RSP) { // if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); // tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { // } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); // tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
} // }
//
if (code < 0) { // if (code < 0) {
return -1; // return -1;
} // }
//
int32_t tlen = sizeof(SMqRspHead) + len; // int32_t tlen = sizeof(SMqRspHead) + len;
void* buf = rpcMallocCont(tlen); // void* buf = rpcMallocCont(tlen);
if (buf == NULL) { // if (buf == NULL) {
return -1; // return -1;
} // }
//
((SMqRspHead*)buf)->mqMsgType = type; // ((SMqRspHead*)buf)->mqMsgType = type;
((SMqRspHead*)buf)->epoch = epoch; // ((SMqRspHead*)buf)->epoch = epoch;
((SMqRspHead*)buf)->consumerId = consumerId; // ((SMqRspHead*)buf)->consumerId = consumerId;
//
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); // void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
//
SEncoder encoder = {0}; // SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len); // tEncoderInit(&encoder, abuf, len);
//
if (type == TMQ_MSG_TYPE__POLL_RSP) { // if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
tEncodeMqDataRsp(&encoder, pRsp); // tEncodeMqDataRsp(&encoder, pRsp);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { // } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); // tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
} // }
//
tEncoderClear(&encoder); // tEncoderClear(&encoder);
//
SRpcMsg rsp = { // SRpcMsg rsp = {
.info = *pRpcHandleInfo, // .info = *pRpcHandleInfo,
.pCont = buf, // .pCont = buf,
.contLen = tlen, // .contLen = tlen,
.code = 0, // .code = 0,
}; // };
//
tmsgSendRsp(&rsp); // tmsgSendRsp(&rsp);
return 0; // return 0;
} //}
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
dataRsp.head.consumerId = pHandle->consumerId; dataRsp.head.consumerId = pHandle->consumerId;
dataRsp.head.epoch = pHandle->epoch; dataRsp.head.epoch = pHandle->epoch;
dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_DATA_RSP;
int64_t sver = 0, ever = 0; int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_DATA_RSP, sver,
ever); ever);
char buf1[TSDB_OFFSET_LEN] = {0}; char buf1[TSDB_OFFSET_LEN] = {0};

View File

@ -216,7 +216,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
code = 0; code = 0;
goto END; goto END;
} else { } else {
if (pHandle->fetchMeta) { if (pHandle->fetchMeta != WITH_DATA) {
SWalCont* pHead = &((*ppCkHead)->head); SWalCont* pHead = &((*ppCkHead)->head);
if (IS_META_MSG(pHead->msgType)) { if (IS_META_MSG(pHead->msgType)) {
code = walFetchBody(pHandle->pWalReader, ppCkHead); code = walFetchBody(pHandle->pWalReader, ppCkHead);

View File

@ -227,7 +227,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
continue; continue;
} }
} }
if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) { if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pRsp->createTableNum == 0) { if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
@ -255,6 +255,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
tEncoderClear(&encoder); tEncoderClear(&encoder);
} }
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL){
continue;
}
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i); SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
@ -286,7 +289,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
continue; continue;
} }
} }
if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) { if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pRsp->createTableNum == 0) { if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
@ -314,6 +317,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
tEncoderClear(&encoder); tEncoderClear(&encoder);
} }
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL){
continue;
}
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i); SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),

View File

@ -123,28 +123,17 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
} }
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
walRefLastVer(pTq->pVnode->pWal, pHandle->pRef); walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { SMqDataRsp dataRsp = {0};
SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest);
tqInitDataRsp(&dataRsp, pRequest);
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer); tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
pHandle->subKey, vgId, dataRsp.rspOffset.version); pHandle->subKey, vgId, dataRsp.rspOffset.version);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
tDeleteMqDataRsp(&dataRsp); tDeleteMqDataRsp(&dataRsp);
*pBlockReturned = true; *pBlockReturned = true;
return code; return code;
} else {
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
tDeleteSTaosxRsp(&taosxRsp);
*pBlockReturned = true;
return code;
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
" in vg %d, subkey %s, reset none failed", " in vg %d, subkey %s, reset none failed",
@ -187,7 +176,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
} }
} }
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
end : { end : {
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
@ -230,7 +219,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type,
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts); taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
if (taosxRsp.blockNum > 0) { if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} else { } else {
*offset = taosxRsp.rspOffset; *offset = taosxRsp.rspOffset;
@ -260,7 +249,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} }
@ -272,7 +261,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (pHead->msgType != TDMT_VND_SUBMIT) { if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) { if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} }
@ -301,7 +290,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} else { } else {
fetchVer++; fetchVer++;
@ -396,9 +385,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
int32_t len = 0; int32_t len = 0;
int32_t code = 0; int32_t code = 0;
if (type == TMQ_MSG_TYPE__POLL_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) { if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
} }
@ -420,9 +409,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len); tEncoderInit(&encoder, abuf, len);
if (type == TMQ_MSG_TYPE__POLL_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) { if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
tEncodeMqDataRsp(&encoder, pRsp); tEncodeMqDataRsp(&encoder, pRsp);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
} }

View File

@ -2154,7 +2154,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
qDebug("tmqsnap doRawScan called"); qDebug("tmqsnap doRawScan called");
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
bool hasNext = false; bool hasNext = false;
if (pInfo->dataReader) { if (pInfo->dataReader && pInfo->sContext->withMeta != ONLY_META) {
code = pAPI->tsdReader.tsdNextDataBlock(pInfo->dataReader, &hasNext); code = pAPI->tsdReader.tsdNextDataBlock(pInfo->dataReader, &hasNext);
if (code) { if (code) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader); pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
@ -2180,7 +2180,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
SMetaTableInfo mtInfo = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext); SMetaTableInfo mtInfo = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext);
STqOffsetVal offset = {0}; STqOffsetVal offset = {0};
if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal if (mtInfo.uid == 0 || pInfo->sContext->withMeta == ONLY_META) { // read snapshot done, change to get data from wal
qDebug("tmqsnap read snapshot done, change to get data from wal"); qDebug("tmqsnap read snapshot done, change to get data from wal");
tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion); tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion);
} else { } else {

View File

@ -206,9 +206,9 @@ SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, cons
SNode* createRestoreComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId); SNode* createRestoreComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId);
SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pQuery); SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pQuery);
SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SToken* pSubDbName, SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SToken* pSubDbName,
bool withMeta); int8_t withMeta);
SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable, SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable,
bool withMeta, SNode* pWhere); int8_t withMeta, SNode* pWhere);
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName); SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName);
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId, SToken* pTopicName); SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId, SToken* pTopicName);
SNode* createAlterLocalStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue); SNode* createAlterLocalStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue);

View File

@ -538,14 +538,15 @@ sma_stream_opt(A) ::= sma_stream_opt(B) MAX_DELAY duration_literal(C).
sma_stream_opt(A) ::= sma_stream_opt(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; } sma_stream_opt(A) ::= sma_stream_opt(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; }
/************************************************ create/drop topic ***************************************************/ /************************************************ create/drop topic ***************************************************/
with_meta(A) ::= . { A = 0; }
with_meta(A) ::= WITH META. { A = 1; }
with_meta(A) ::= ONLY META. { A = 2; }
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_or_subquery(C). { pCxt->pRootNode = createCreateTopicStmtUseQuery(pCxt, A, &B, C); } cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_or_subquery(C). { pCxt->pRootNode = createCreateTopicStmtUseQuery(pCxt, A, &B, C); }
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, false); } cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) with_meta(D)
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, D); }
WITH META AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, true); } cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) with_meta(E)
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, E, D); }
AS STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, false, D); }
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B)
WITH META AS STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, true, D); }
cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); } cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); }
cmd ::= DROP CONSUMER GROUP exists_opt(A) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C); } cmd ::= DROP CONSUMER GROUP exists_opt(A) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C); }

View File

@ -1715,7 +1715,7 @@ SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists,
} }
SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SToken* pSubDbName, SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SToken* pSubDbName,
bool withMeta) { int8_t withMeta) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
if (!checkTopicName(pCxt, pTopicName) || !checkDbName(pCxt, pSubDbName, true)) { if (!checkTopicName(pCxt, pTopicName) || !checkDbName(pCxt, pSubDbName, true)) {
return NULL; return NULL;
@ -1730,7 +1730,7 @@ SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, ST
} }
SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable, SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable,
bool withMeta, SNode* pWhere) { int8_t withMeta, SNode* pWhere) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
if (!checkTopicName(pCxt, pTopicName)) { if (!checkTopicName(pCxt, pTopicName)) {
return NULL; return NULL;