diff --git a/include/common/tcommon.h b/include/common/tcommon.h index ea17262abd..d88228b436 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -131,10 +131,10 @@ static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, enum { TMQ_MSG_TYPE__DUMMY = 0, - TMQ_MSG_TYPE__POLL_RSP, + TMQ_MSG_TYPE__POLL_DATA_RSP, TMQ_MSG_TYPE__POLL_META_RSP, TMQ_MSG_TYPE__EP_RSP, - TMQ_MSG_TYPE__TAOSX_RSP, + TMQ_MSG_TYPE__POLL_DATA_META_RSP, TMQ_MSG_TYPE__WALINFO_RSP, TMQ_MSG_TYPE__END_RSP, }; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fa092a453c..6e182c1c35 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2909,6 +2909,12 @@ enum { TMQ_OFFSET__SNAPSHOT_META = 3, }; +enum { + WITH_DATA = 0, + WITH_META = 1, + ONLY_META = 2, +}; + typedef struct { int8_t type; union { diff --git a/include/common/ttokendef.h b/include/common/ttokendef.h index 5410e9af88..9c9b2f5820 100644 --- a/include/common/ttokendef.h +++ b/include/common/ttokendef.h @@ -193,165 +193,166 @@ #define TK_INTERVAL 175 #define TK_COUNT 176 #define TK_LAST_ROW 177 -#define TK_TOPIC 178 -#define TK_META 179 -#define TK_CONSUMER 180 -#define TK_GROUP 181 -#define TK_DESC 182 -#define TK_DESCRIBE 183 -#define TK_RESET 184 -#define TK_QUERY 185 -#define TK_CACHE 186 -#define TK_EXPLAIN 187 -#define TK_ANALYZE 188 -#define TK_VERBOSE 189 -#define TK_NK_BOOL 190 -#define TK_RATIO 191 -#define TK_NK_FLOAT 192 -#define TK_OUTPUTTYPE 193 -#define TK_AGGREGATE 194 -#define TK_BUFSIZE 195 -#define TK_LANGUAGE 196 -#define TK_REPLACE 197 -#define TK_STREAM 198 -#define TK_INTO 199 -#define TK_PAUSE 200 -#define TK_RESUME 201 -#define TK_TRIGGER 202 -#define TK_AT_ONCE 203 -#define TK_WINDOW_CLOSE 204 -#define TK_IGNORE 205 -#define TK_EXPIRED 206 -#define TK_FILL_HISTORY 207 -#define TK_UPDATE 208 -#define TK_SUBTABLE 209 -#define TK_UNTREATED 210 -#define TK_KILL 211 -#define TK_CONNECTION 212 -#define TK_TRANSACTION 213 -#define TK_BALANCE 214 -#define TK_VGROUP 215 -#define TK_LEADER 216 -#define TK_MERGE 217 -#define TK_REDISTRIBUTE 218 -#define TK_SPLIT 219 -#define TK_DELETE 220 -#define TK_INSERT 221 -#define TK_NULL 222 -#define TK_NK_QUESTION 223 -#define TK_NK_ARROW 224 -#define TK_ROWTS 225 -#define TK_QSTART 226 -#define TK_QEND 227 -#define TK_QDURATION 228 -#define TK_WSTART 229 -#define TK_WEND 230 -#define TK_WDURATION 231 -#define TK_IROWTS 232 -#define TK_ISFILLED 233 -#define TK_CAST 234 -#define TK_NOW 235 -#define TK_TODAY 236 -#define TK_TIMEZONE 237 -#define TK_CLIENT_VERSION 238 -#define TK_SERVER_VERSION 239 -#define TK_SERVER_STATUS 240 -#define TK_CURRENT_USER 241 -#define TK_CASE 242 -#define TK_WHEN 243 -#define TK_THEN 244 -#define TK_ELSE 245 -#define TK_BETWEEN 246 -#define TK_IS 247 -#define TK_NK_LT 248 -#define TK_NK_GT 249 -#define TK_NK_LE 250 -#define TK_NK_GE 251 -#define TK_NK_NE 252 -#define TK_MATCH 253 -#define TK_NMATCH 254 -#define TK_CONTAINS 255 -#define TK_IN 256 -#define TK_JOIN 257 -#define TK_INNER 258 -#define TK_SELECT 259 -#define TK_DISTINCT 260 -#define TK_WHERE 261 -#define TK_PARTITION 262 -#define TK_BY 263 -#define TK_SESSION 264 -#define TK_STATE_WINDOW 265 -#define TK_EVENT_WINDOW 266 -#define TK_SLIDING 267 -#define TK_FILL 268 -#define TK_VALUE 269 -#define TK_VALUE_F 270 -#define TK_NONE 271 -#define TK_PREV 272 -#define TK_NULL_F 273 -#define TK_LINEAR 274 -#define TK_NEXT 275 -#define TK_HAVING 276 -#define TK_RANGE 277 -#define TK_EVERY 278 -#define TK_ORDER 279 -#define TK_SLIMIT 280 -#define TK_SOFFSET 281 -#define TK_LIMIT 282 -#define TK_OFFSET 283 -#define TK_ASC 284 -#define TK_NULLS 285 -#define TK_ABORT 286 -#define TK_AFTER 287 -#define TK_ATTACH 288 -#define TK_BEFORE 289 -#define TK_BEGIN 290 -#define TK_BITAND 291 -#define TK_BITNOT 292 -#define TK_BITOR 293 -#define TK_BLOCKS 294 -#define TK_CHANGE 295 -#define TK_COMMA 296 -#define TK_CONCAT 297 -#define TK_CONFLICT 298 -#define TK_COPY 299 -#define TK_DEFERRED 300 -#define TK_DELIMITERS 301 -#define TK_DETACH 302 -#define TK_DIVIDE 303 -#define TK_DOT 304 -#define TK_EACH 305 -#define TK_FAIL 306 -#define TK_FILE 307 -#define TK_FOR 308 -#define TK_GLOB 309 -#define TK_ID 310 -#define TK_IMMEDIATE 311 -#define TK_IMPORT 312 -#define TK_INITIALLY 313 -#define TK_INSTEAD 314 -#define TK_ISNULL 315 -#define TK_KEY 316 -#define TK_MODULES 317 -#define TK_NK_BITNOT 318 -#define TK_NK_SEMI 319 -#define TK_NOTNULL 320 -#define TK_OF 321 -#define TK_PLUS 322 -#define TK_PRIVILEGE 323 -#define TK_RAISE 324 -#define TK_RESTRICT 325 -#define TK_ROW 326 -#define TK_SEMI 327 -#define TK_STAR 328 -#define TK_STATEMENT 329 -#define TK_STRICT 330 -#define TK_STRING 331 -#define TK_TIMES 332 -#define TK_VALUES 333 -#define TK_VARIABLE 334 -#define TK_VIEW 335 -#define TK_WAL 336 +#define TK_META 178 +#define TK_ONLY 179 +#define TK_TOPIC 180 +#define TK_CONSUMER 181 +#define TK_GROUP 182 +#define TK_DESC 183 +#define TK_DESCRIBE 184 +#define TK_RESET 185 +#define TK_QUERY 186 +#define TK_CACHE 187 +#define TK_EXPLAIN 188 +#define TK_ANALYZE 189 +#define TK_VERBOSE 190 +#define TK_NK_BOOL 191 +#define TK_RATIO 192 +#define TK_NK_FLOAT 193 +#define TK_OUTPUTTYPE 194 +#define TK_AGGREGATE 195 +#define TK_BUFSIZE 196 +#define TK_LANGUAGE 197 +#define TK_REPLACE 198 +#define TK_STREAM 199 +#define TK_INTO 200 +#define TK_PAUSE 201 +#define TK_RESUME 202 +#define TK_TRIGGER 203 +#define TK_AT_ONCE 204 +#define TK_WINDOW_CLOSE 205 +#define TK_IGNORE 206 +#define TK_EXPIRED 207 +#define TK_FILL_HISTORY 208 +#define TK_UPDATE 209 +#define TK_SUBTABLE 210 +#define TK_UNTREATED 211 +#define TK_KILL 212 +#define TK_CONNECTION 213 +#define TK_TRANSACTION 214 +#define TK_BALANCE 215 +#define TK_VGROUP 216 +#define TK_LEADER 217 +#define TK_MERGE 218 +#define TK_REDISTRIBUTE 219 +#define TK_SPLIT 220 +#define TK_DELETE 221 +#define TK_INSERT 222 +#define TK_NULL 223 +#define TK_NK_QUESTION 224 +#define TK_NK_ARROW 225 +#define TK_ROWTS 226 +#define TK_QSTART 227 +#define TK_QEND 228 +#define TK_QDURATION 229 +#define TK_WSTART 230 +#define TK_WEND 231 +#define TK_WDURATION 232 +#define TK_IROWTS 233 +#define TK_ISFILLED 234 +#define TK_CAST 235 +#define TK_NOW 236 +#define TK_TODAY 237 +#define TK_TIMEZONE 238 +#define TK_CLIENT_VERSION 239 +#define TK_SERVER_VERSION 240 +#define TK_SERVER_STATUS 241 +#define TK_CURRENT_USER 242 +#define TK_CASE 243 +#define TK_WHEN 244 +#define TK_THEN 245 +#define TK_ELSE 246 +#define TK_BETWEEN 247 +#define TK_IS 248 +#define TK_NK_LT 249 +#define TK_NK_GT 250 +#define TK_NK_LE 251 +#define TK_NK_GE 252 +#define TK_NK_NE 253 +#define TK_MATCH 254 +#define TK_NMATCH 255 +#define TK_CONTAINS 256 +#define TK_IN 257 +#define TK_JOIN 258 +#define TK_INNER 259 +#define TK_SELECT 260 +#define TK_DISTINCT 261 +#define TK_WHERE 262 +#define TK_PARTITION 263 +#define TK_BY 264 +#define TK_SESSION 265 +#define TK_STATE_WINDOW 266 +#define TK_EVENT_WINDOW 267 +#define TK_SLIDING 268 +#define TK_FILL 269 +#define TK_VALUE 270 +#define TK_VALUE_F 271 +#define TK_NONE 272 +#define TK_PREV 273 +#define TK_NULL_F 274 +#define TK_LINEAR 275 +#define TK_NEXT 276 +#define TK_HAVING 277 +#define TK_RANGE 278 +#define TK_EVERY 279 +#define TK_ORDER 280 +#define TK_SLIMIT 281 +#define TK_SOFFSET 282 +#define TK_LIMIT 283 +#define TK_OFFSET 284 +#define TK_ASC 285 +#define TK_NULLS 286 +#define TK_ABORT 287 +#define TK_AFTER 288 +#define TK_ATTACH 289 +#define TK_BEFORE 290 +#define TK_BEGIN 291 +#define TK_BITAND 292 +#define TK_BITNOT 293 +#define TK_BITOR 294 +#define TK_BLOCKS 295 +#define TK_CHANGE 296 +#define TK_COMMA 297 +#define TK_CONCAT 298 +#define TK_CONFLICT 299 +#define TK_COPY 300 +#define TK_DEFERRED 301 +#define TK_DELIMITERS 302 +#define TK_DETACH 303 +#define TK_DIVIDE 304 +#define TK_DOT 305 +#define TK_EACH 306 +#define TK_FAIL 307 +#define TK_FILE 308 +#define TK_FOR 309 +#define TK_GLOB 310 +#define TK_ID 311 +#define TK_IMMEDIATE 312 +#define TK_IMPORT 313 +#define TK_INITIALLY 314 +#define TK_INSTEAD 315 +#define TK_ISNULL 316 +#define TK_KEY 317 +#define TK_MODULES 318 +#define TK_NK_BITNOT 319 +#define TK_NK_SEMI 320 +#define TK_NOTNULL 321 +#define TK_OF 322 +#define TK_PLUS 323 +#define TK_PRIVILEGE 324 +#define TK_RAISE 325 +#define TK_RESTRICT 326 +#define TK_ROW 327 +#define TK_SEMI 328 +#define TK_STAR 329 +#define TK_STATEMENT 330 +#define TK_STRICT 331 +#define TK_STRING 332 +#define TK_TIMES 333 +#define TK_VALUES 334 +#define TK_VARIABLE 335 +#define TK_VIEW 336 +#define TK_WAL 337 diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 6031b99cfc..0826df67c0 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -123,8 +123,8 @@ typedef struct SSnapContext { SHashObj* suidInfo; SArray* idList; int32_t index; - bool withMeta; - bool queryMeta; // true-get meta, false-get data + int8_t withMeta; + int8_t queryMeta; // true-get meta, false-get data } SSnapContext; typedef struct { diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 3a36601b11..3ac971344b 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -362,7 +362,7 @@ typedef struct SCreateTopicStmt { char subDbName[TSDB_DB_NAME_LEN]; char subSTbName[TSDB_TABLE_NAME_LEN]; bool ignoreExists; - bool withMeta; + int8_t withMeta; SNode* pQuery; SNode* pWhere; } SCreateTopicStmt; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 8758cec2ec..0e22819cdb 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -917,7 +917,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; tDeleteSMqAskEpRsp(&pEpRspWrapper->msg); - } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; taosMemoryFreeClear(pRsp->pEpset); @@ -930,7 +930,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { taosMemoryFreeClear(pRsp->pEpset); taosMemoryFree(pRsp->metaRsp.metaRsp); - } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; taosMemoryFreeClear(pRsp->pEpset); @@ -1405,7 +1405,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { strcpy(pRspWrapper->topicName, pParam->topicName); pMsg->pEpSet = NULL; - if (rspType == TMQ_MSG_TYPE__POLL_RSP) { + if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp); @@ -1422,7 +1422,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp); tDecoderClear(&decoder); memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); - } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) { + } else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp); @@ -1881,7 +1881,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno)); return NULL; - } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); @@ -1981,7 +1981,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } - } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); @@ -2023,7 +2023,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { void* pRsp = NULL; int64_t numOfRows = 0; if (pollRspWrapper->taosxRsp.createTableNum == 0) { - pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); + tscError("consumer:0x%" PRIx64" createTableNum should > 0 if rsp type is data_meta", tmq->consumerId); } else { pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e84211c765..17336d4295 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -235,7 +235,7 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); -int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, +int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta, SSnapContext **ctxRet); int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index f4e930e509..18190ac533 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -260,7 +260,7 @@ static void saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable)); } -int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, +int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta, SSnapContext** ctxRet) { SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext)); if (ctx == NULL) return -1; @@ -476,7 +476,7 @@ int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLe if (ctx->index >= taosArrayGetSize(ctx->idList)) { metaDebug("tmqsnap get meta end"); ctx->index = 0; - ctx->queryMeta = false; // change to get data + ctx->queryMeta = 0; // change to get data return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f19fa54cbd..3cb8652a4b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -183,64 +183,64 @@ void tqNotifyClose(STQ* pTq) { } } -static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, - int64_t consumerId, int32_t type) { - int32_t len = 0; - int32_t code = 0; - - if (type == TMQ_MSG_TYPE__POLL_RSP) { - tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); - } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { - tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); - } - - if (code < 0) { - return -1; - } - - int32_t tlen = sizeof(SMqRspHead) + len; - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - return -1; - } - - ((SMqRspHead*)buf)->mqMsgType = type; - ((SMqRspHead*)buf)->epoch = epoch; - ((SMqRspHead*)buf)->consumerId = consumerId; - - void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - - SEncoder encoder = {0}; - tEncoderInit(&encoder, abuf, len); - - if (type == TMQ_MSG_TYPE__POLL_RSP) { - tEncodeMqDataRsp(&encoder, pRsp); - } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { - tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); - } - - tEncoderClear(&encoder); - - SRpcMsg rsp = { - .info = *pRpcHandleInfo, - .pCont = buf, - .contLen = tlen, - .code = 0, - }; - - tmsgSendRsp(&rsp); - return 0; -} +//static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, +// int64_t consumerId, int32_t type) { +// int32_t len = 0; +// int32_t code = 0; +// +// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) { +// tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); +// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { +// tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); +// } +// +// if (code < 0) { +// return -1; +// } +// +// int32_t tlen = sizeof(SMqRspHead) + len; +// void* buf = rpcMallocCont(tlen); +// if (buf == NULL) { +// return -1; +// } +// +// ((SMqRspHead*)buf)->mqMsgType = type; +// ((SMqRspHead*)buf)->epoch = epoch; +// ((SMqRspHead*)buf)->consumerId = consumerId; +// +// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); +// +// SEncoder encoder = {0}; +// tEncoderInit(&encoder, abuf, len); +// +// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) { +// tEncodeMqDataRsp(&encoder, pRsp); +// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { +// tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); +// } +// +// tEncoderClear(&encoder); +// +// SRpcMsg rsp = { +// .info = *pRpcHandleInfo, +// .pCont = buf, +// .contLen = tlen, +// .code = 0, +// }; +// +// tmsgSendRsp(&rsp); +// return 0; +//} int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { SMqDataRsp dataRsp = {0}; dataRsp.head.consumerId = pHandle->consumerId; dataRsp.head.epoch = pHandle->epoch; - dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_DATA_RSP; int64_t sver = 0, ever = 0; walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); - tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, + tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_DATA_RSP, sver, ever); char buf1[TSDB_OFFSET_LEN] = {0}; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 77a966715e..c8fd5ae02b 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -216,7 +216,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea code = 0; goto END; } else { - if (pHandle->fetchMeta) { + if (pHandle->fetchMeta != WITH_DATA) { SWalCont* pHead = &((*ppCkHead)->head); if (IS_META_MSG(pHead->msgType)) { code = walFetchBody(pHandle->pWalReader, ppCkHead); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 1ff78c586f..62c28351aa 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -227,7 +227,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR continue; } } - if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) { + if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) { if (pRsp->createTableNum == 0) { pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); @@ -255,6 +255,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR tEncoderClear(&encoder); } + if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL){ + continue; + } for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { SSDataBlock* pBlock = taosArrayGet(pBlocks, i); tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), @@ -286,7 +289,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR continue; } } - if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) { + if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) { if (pRsp->createTableNum == 0) { pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); @@ -314,6 +317,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR tEncoderClear(&encoder); } + if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL){ + continue; + } for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { SSDataBlock* pBlock = taosArrayGet(pBlocks, i); tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index a301d82c30..ae69c4d456 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -123,28 +123,17 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { walRefLastVer(pTq->pVnode->pWal, pHandle->pRef); - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, pRequest); + SMqDataRsp dataRsp = {0}; + tqInitDataRsp(&dataRsp, pRequest); - tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, - pHandle->subKey, vgId, dataRsp.rspOffset.version); - int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); - tDeleteMqDataRsp(&dataRsp); + tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, + pHandle->subKey, vgId, dataRsp.rspOffset.version); + int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); + tDeleteMqDataRsp(&dataRsp); - *pBlockReturned = true; - return code; - } else { - STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, pRequest); - tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer); - int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); - tDeleteSTaosxRsp(&taosxRsp); - - *pBlockReturned = true; - return code; - } + *pBlockReturned = true; + return code; } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed", @@ -187,7 +176,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, } } - code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); end : { char buf[TSDB_OFFSET_LEN] = {0}; @@ -230,7 +219,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts); if (taosxRsp.blockNum > 0) { - code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } else { *offset = taosxRsp.rspOffset; @@ -260,7 +249,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } @@ -272,7 +261,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (pHead->msgType != TDMT_VND_SUBMIT) { if (totalRows > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); - code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } @@ -301,7 +290,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } else { fetchVer++; @@ -396,9 +385,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* int32_t len = 0; int32_t code = 0; - if (type == TMQ_MSG_TYPE__POLL_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) { + if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) { tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); - } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { + } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); } @@ -420,9 +409,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* SEncoder encoder = {0}; tEncoderInit(&encoder, abuf, len); - if (type == TMQ_MSG_TYPE__POLL_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) { + if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) { tEncodeMqDataRsp(&encoder, pRsp); - } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { + } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a65f5a27ab..078655ae70 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2154,7 +2154,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { qDebug("tmqsnap doRawScan called"); if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { bool hasNext = false; - if (pInfo->dataReader) { + if (pInfo->dataReader && pInfo->sContext->withMeta != ONLY_META) { code = pAPI->tsdReader.tsdNextDataBlock(pInfo->dataReader, &hasNext); if (code) { pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader); @@ -2180,7 +2180,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { SMetaTableInfo mtInfo = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext); STqOffsetVal offset = {0}; - if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal + if (mtInfo.uid == 0 || pInfo->sContext->withMeta == ONLY_META) { // read snapshot done, change to get data from wal qDebug("tmqsnap read snapshot done, change to get data from wal"); tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion); } else { diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index ff394467f6..78422bf746 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -206,9 +206,9 @@ SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, cons SNode* createRestoreComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId); SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pQuery); SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SToken* pSubDbName, - bool withMeta); + int8_t withMeta); SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable, - bool withMeta, SNode* pWhere); + int8_t withMeta, SNode* pWhere); SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName); SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId, SToken* pTopicName); SNode* createAlterLocalStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 518dd95f23..91ecda88e4 100755 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -538,14 +538,15 @@ sma_stream_opt(A) ::= sma_stream_opt(B) MAX_DELAY duration_literal(C). sma_stream_opt(A) ::= sma_stream_opt(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; } /************************************************ create/drop topic ***************************************************/ +with_meta(A) ::= . { A = 0; } +with_meta(A) ::= WITH META. { A = 1; } +with_meta(A) ::= ONLY META. { A = 2; } + cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_or_subquery(C). { pCxt->pRootNode = createCreateTopicStmtUseQuery(pCxt, A, &B, C); } -cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, false); } -cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) - WITH META AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, true); } -cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) - AS STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, false, D); } -cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) - WITH META AS STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, true, D); } +cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) with_meta(D) + AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, D); } +cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) with_meta(E) + AS STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, E, D); } cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); } cmd ::= DROP CONSUMER GROUP exists_opt(A) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C); } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index e08153c341..f85218c50a 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -1715,7 +1715,7 @@ SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, } SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SToken* pSubDbName, - bool withMeta) { + int8_t withMeta) { CHECK_PARSER_STATUS(pCxt); if (!checkTopicName(pCxt, pTopicName) || !checkDbName(pCxt, pSubDbName, true)) { return NULL; @@ -1730,7 +1730,7 @@ SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, ST } SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable, - bool withMeta, SNode* pWhere) { + int8_t withMeta, SNode* pWhere) { CHECK_PARSER_STATUS(pCxt); if (!checkTopicName(pCxt, pTopicName)) { return NULL;