add tmq param

This commit is contained in:
54liuyao 2024-05-28 18:34:37 +08:00
parent d815bb117b
commit a2255ca355
4 changed files with 55 additions and 7 deletions

View File

@ -2744,6 +2744,7 @@ typedef struct {
int32_t autoCommitInterval; int32_t autoCommitInterval;
int8_t resetOffsetCfg; int8_t resetOffsetCfg;
int8_t enableReplay; int8_t enableReplay;
int8_t enableBatchMeta;
} SCMSubscribeReq; } SCMSubscribeReq;
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
@ -2764,6 +2765,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval); tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval);
tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg);
tlen += taosEncodeFixedI8(buf, pReq->enableReplay); tlen += taosEncodeFixedI8(buf, pReq->enableReplay);
tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta);
return tlen; return tlen;
} }
@ -2788,6 +2790,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval); buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval);
buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg);
buf = taosDecodeFixedI8(buf, &pReq->enableReplay); buf = taosDecodeFixedI8(buf, &pReq->enableReplay);
buf = taosDecodeFixedI8(buf, &pReq->enableBatchMeta);
return buf; return buf;
} }
@ -2976,6 +2979,7 @@ typedef struct {
int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq); int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq); int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq);
void tDeleteSVCreateTbBatchReq(SVCreateTbBatchReq* pReq);
typedef struct { typedef struct {
int32_t code; int32_t code;
@ -3893,6 +3897,7 @@ typedef struct {
STqOffsetVal reqOffset; STqOffsetVal reqOffset;
int8_t enableReplay; int8_t enableReplay;
int8_t sourceExcluded; int8_t sourceExcluded;
int8_t enableBatchMeta;
} SMqPollReq; } SMqPollReq;
int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);

View File

@ -71,6 +71,7 @@ struct tmq_conf_t {
char* pass; char* pass;
tmq_commit_cb* commitCb; tmq_commit_cb* commitCb;
void* commitCbUserParam; void* commitCbUserParam;
int8_t enableBatchMeta;
}; };
struct tmq_t { struct tmq_t {
@ -87,6 +88,7 @@ struct tmq_t {
uint64_t consumerId; uint64_t consumerId;
tmq_commit_cb* commitCb; tmq_commit_cb* commitCb;
void* commitCbUserParam; void* commitCbUserParam;
int8_t enableBatchMeta;
// status // status
SRWLatch lock; SRWLatch lock;
@ -269,6 +271,7 @@ tmq_conf_t* tmq_conf_new() {
conf->autoCommit = true; conf->autoCommit = true;
conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL; conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
conf->resetOffset = TMQ_OFFSET__RESET_LATEST; conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
conf->enableBatchMeta = false;
return conf; return conf;
} }
@ -398,6 +401,11 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcasecmp(key, "msg.enable.batchmeta") == 0) {
conf->enableBatchMeta = (taosStr2int64(value) != 0) ? true : false;
return TMQ_CONF_OK;
}
return TMQ_CONF_UNKNOWN; return TMQ_CONF_UNKNOWN;
} }
@ -1122,6 +1130,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->resetOffsetCfg = conf->resetOffset; pTmq->resetOffsetCfg = conf->resetOffset;
pTmq->replayEnable = conf->replayEnable; pTmq->replayEnable = conf->replayEnable;
pTmq->sourceExcluded = conf->sourceExcluded; pTmq->sourceExcluded = conf->sourceExcluded;
pTmq->enableBatchMeta = conf->enableBatchMeta;
if (conf->replayEnable) { if (conf->replayEnable) {
pTmq->autoCommit = false; pTmq->autoCommit = false;
} }
@ -1199,6 +1208,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
req.autoCommitInterval = tmq->autoCommitInterval; req.autoCommitInterval = tmq->autoCommitInterval;
req.resetOffsetCfg = tmq->resetOffsetCfg; req.resetOffsetCfg = tmq->resetOffsetCfg;
req.enableReplay = tmq->replayEnable; req.enableReplay = tmq->replayEnable;
req.enableBatchMeta = tmq->enableBatchMeta;
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char* topic = taosArrayGetP(container, i); char* topic = taosArrayGetP(container, i);
@ -1623,6 +1633,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
pReq->reqId = generateRequestId(); pReq->reqId = generateRequestId();
pReq->enableReplay = tmq->replayEnable; pReq->enableReplay = tmq->replayEnable;
pReq->sourceExcluded = tmq->sourceExcluded; pReq->sourceExcluded = tmq->sourceExcluded;
pReq->enableBatchMeta = tmq->enableBatchMeta;
} }
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {

View File

@ -7199,6 +7199,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
pHead->vgId = htonl(pReq->head.vgId); pHead->vgId = htonl(pReq->head.vgId);
pHead->contLen = htonl(tlen + headLen); pHead->contLen = htonl(tlen + headLen);
} }
if (tEncodeI8(&encoder, pReq->enableBatchMeta) < 0) return -1;
return tlen + headLen; return tlen + headLen;
} }
@ -7228,6 +7229,12 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
if (tDecodeI8(&decoder, &pReq->sourceExcluded) < 0) return -1; if (tDecodeI8(&decoder, &pReq->sourceExcluded) < 0) return -1;
} }
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI8(&decoder, &pReq->enableBatchMeta) < 0) return -1;
} else {
pReq->enableBatchMeta = false;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -8362,6 +8369,15 @@ int tDecodeSVCreateTbBatchReq(SDecoder *pCoder, SVCreateTbBatchReq *pReq) {
return 0; return 0;
} }
void tDeleteSVCreateTbBatchReq(SVCreateTbBatchReq* pReq) {
for (int32_t iReq = 0; iReq < pReq->nReqs; iReq++) {
SVCreateTbReq* pCreateReq = pReq->pReqs + iReq;
if (pCreateReq->type == TSDB_CHILD_TABLE) {
taosArrayDestroy(pCreateReq->ctb.tagName);
}
}
}
int tEncodeSVCreateTbRsp(SEncoder *pCoder, const SVCreateTbRsp *pRsp) { int tEncodeSVCreateTbRsp(SEncoder *pCoder, const SVCreateTbRsp *pRsp) {
if (tStartEncode(pCoder) < 0) return -1; if (tStartEncode(pCoder) < 0) return -1;

View File

@ -174,7 +174,7 @@ end : {
} }
} }
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC) \ #define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC) \
SDecoder decoder = {0};\ SDecoder decoder = {0};\
TYPE req = {0}; \ TYPE req = {0}; \
void* data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)); \ void* data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)); \
@ -184,11 +184,16 @@ end : {
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 " msgType %d", \ tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 " msgType %d", \
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); \ pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); \
fetchVer++; \ fetchVer++; \
DELETE_FUNC(&req); \
tDecoderClear(&decoder); \ tDecoderClear(&decoder); \
continue; \ continue; \
} \ } \
DELETE_FUNC(&req); \
tDecoderClear(&decoder); tDecoderClear(&decoder);
static void tDeleteCommon(void* parm) {
}
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* offset) { SRpcMsg* pMsg, STqOffsetVal* offset) {
int code = 0; int code = 0;
@ -266,17 +271,28 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) { if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
if (pHead->msgType == TDMT_VND_CREATE_TABLE) { if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq) PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
} else if (pHead->msgType == TDMT_VND_ALTER_TABLE) { } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq) PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteCommon)
} else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) { } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq) PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
} else if (pHead->msgType == TDMT_VND_DELETE) { } else if (pHead->msgType == TDMT_VND_DELETE) {
PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes) PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
} }
} }
tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s", pHead->version, vgId, TMSG_INFO(pHead->msgType)); tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
if (!pRequest->enableBatchMeta) {
SMqMetaRsp metaRsp = {0};
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
goto end;
}
if (!btMetaRsp.batchMetaReq) { if (!btMetaRsp.batchMetaReq) {
btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES); btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t)); btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
@ -348,7 +364,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
} }
end: end:
tDeleteMqBatchMetaRsp(&btMetaRsp);
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);
return code; return code;
} }