diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1657a17ff0..7917222542 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2744,6 +2744,7 @@ typedef struct { int32_t autoCommitInterval; int8_t resetOffsetCfg; int8_t enableReplay; + int8_t enableBatchMeta; } SCMSubscribeReq; static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { @@ -2764,6 +2765,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval); tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); tlen += taosEncodeFixedI8(buf, pReq->enableReplay); + tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta); return tlen; } @@ -2788,6 +2790,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval); buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); buf = taosDecodeFixedI8(buf, &pReq->enableReplay); + buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta); return buf; } @@ -2976,6 +2979,7 @@ typedef struct { int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq); int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq); +void tDeleteSVCreateTbBatchReq(SVCreateTbBatchReq* pReq); typedef struct { int32_t code; @@ -3893,6 +3897,7 @@ typedef struct { STqOffsetVal reqOffset; int8_t enableReplay; int8_t sourceExcluded; + int8_t enableBatchMeta; } SMqPollReq; int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 69a4503680..8038273b43 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -71,6 +71,7 @@ struct tmq_conf_t { char* pass; tmq_commit_cb* commitCb; void* commitCbUserParam; + int8_t enableBatchMeta; }; struct tmq_t { @@ -87,6 +88,7 @@ struct tmq_t { uint64_t consumerId; tmq_commit_cb* commitCb; void* commitCbUserParam; + int8_t enableBatchMeta; // status SRWLatch lock; @@ -269,6 +271,7 @@ tmq_conf_t* tmq_conf_new() { conf->autoCommit = true; conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL; conf->resetOffset = TMQ_OFFSET__RESET_LATEST; + conf->enableBatchMeta = false; return conf; } @@ -398,6 +401,11 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_OK; } + if (strcasecmp(key, "msg.enable.batchmeta") == 0) { + conf->enableBatchMeta = (taosStr2int64(value) != 0) ? true : false; + return TMQ_CONF_OK; + } + return TMQ_CONF_UNKNOWN; } @@ -1122,6 +1130,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->resetOffsetCfg = conf->resetOffset; pTmq->replayEnable = conf->replayEnable; pTmq->sourceExcluded = conf->sourceExcluded; + pTmq->enableBatchMeta = conf->enableBatchMeta; if (conf->replayEnable) { pTmq->autoCommit = false; } @@ -1199,6 +1208,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.autoCommitInterval = tmq->autoCommitInterval; req.resetOffsetCfg = tmq->resetOffsetCfg; req.enableReplay = tmq->replayEnable; + req.enableBatchMeta = tmq->enableBatchMeta; for (int32_t i = 0; i < sz; i++) { char* topic = taosArrayGetP(container, i); @@ -1623,6 +1633,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl pReq->reqId = generateRequestId(); pReq->enableReplay = tmq->replayEnable; pReq->sourceExcluded = tmq->sourceExcluded; + pReq->enableBatchMeta = tmq->enableBatchMeta; } SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 3f76d03289..c17c522d4f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7199,6 +7199,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { pHead->vgId = htonl(pReq->head.vgId); pHead->contLen = htonl(tlen + headLen); } + if (tEncodeI8(&encoder, pReq->enableBatchMeta) < 0) return -1; return tlen + headLen; } @@ -7228,6 +7229,12 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (tDecodeI8(&decoder, &pReq->sourceExcluded) < 0) return -1; } + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI8(&decoder, &pReq->enableBatchMeta) < 0) return -1; + } else { + pReq->enableBatchMeta = false; + } + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -8362,6 +8369,15 @@ int tDecodeSVCreateTbBatchReq(SDecoder *pCoder, SVCreateTbBatchReq *pReq) { return 0; } +void tDeleteSVCreateTbBatchReq(SVCreateTbBatchReq* pReq) { + for (int32_t iReq = 0; iReq < pReq->nReqs; iReq++) { + SVCreateTbReq* pCreateReq = pReq->pReqs + iReq; + if (pCreateReq->type == TSDB_CHILD_TABLE) { + taosArrayDestroy(pCreateReq->ctb.tagName); + } + } +} + int tEncodeSVCreateTbRsp(SEncoder *pCoder, const SVCreateTbRsp *pRsp) { if (tStartEncode(pCoder) < 0) return -1; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 4b1ff04331..496201e79e 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -174,7 +174,7 @@ end : { } } -#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC) \ +#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC) \ SDecoder decoder = {0};\ TYPE req = {0}; \ void* data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)); \ @@ -184,11 +184,16 @@ end : { tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 " msgType %d", \ pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); \ fetchVer++; \ + DELETE_FUNC(&req); \ tDecoderClear(&decoder); \ continue; \ } \ + DELETE_FUNC(&req); \ tDecoderClear(&decoder); +static void tDeleteCommon(void* parm) { +} + static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* offset) { int code = 0; @@ -266,17 +271,28 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) { if (pHead->msgType == TDMT_VND_CREATE_TABLE) { - PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq) + PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq) } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) { - PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq) + PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteCommon) } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) { - PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq) + PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon) } else if (pHead->msgType == TDMT_VND_DELETE) { - PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes) + PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon) } } - tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s", pHead->version, vgId, TMSG_INFO(pHead->msgType)); + tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId, + TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta); + if (!pRequest->enableBatchMeta) { + SMqMetaRsp metaRsp = {0}; + tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1); + metaRsp.resMsgType = pHead->msgType; + metaRsp.metaRspLen = pHead->bodyLen; + metaRsp.metaRsp = pHead->body; + code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId); + goto end; + } + if (!btMetaRsp.batchMetaReq) { btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES); btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t)); @@ -348,7 +364,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } end: - + tDeleteMqBatchMetaRsp(&btMetaRsp); tDeleteSTaosxRsp(&taosxRsp); return code; }