diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b4606ffe86..97955c0c49 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -4267,6 +4267,7 @@ typedef struct { void* rawData; }; }; + void* data; //for free, only effected if type is data or metadata. raw data not effected } SMqDataRsp; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 15a7a9b538..d2b6aff2bf 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1745,7 +1745,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->resetOffsetCfg = conf->resetOffset; pTmq->replayEnable = conf->replayEnable; pTmq->sourceExcluded = conf->sourceExcluded; - pTmq->rawData = conf->rawData;; + pTmq->rawData = conf->rawData; pTmq->enableBatchMeta = conf->enableBatchMeta; tstrncpy(pTmq->user, user, TSDB_USER_LEN); if (taosGetFqdn(pTmq->fqdn) != 0) { @@ -2512,10 +2512,14 @@ static int32_t processWrapperData(SMqRspWrapper* pRspWrapper){ int32_t code = 0; if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp) + pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data; + pRspWrapper->pollRsp.data = NULL; } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp) } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp) + pRspWrapper->pollRsp.dataRsp.data = pRspWrapper->pollRsp.data; + pRspWrapper->pollRsp.data = NULL; } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp) } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index d7c793df33..e5926dcb24 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -11518,6 +11518,7 @@ static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) { pRsp->blockTbName = NULL; tOffsetDestroy(&pRsp->reqOffset); tOffsetDestroy(&pRsp->rspOffset); + taosMemoryFreeClear(pRsp->data); } void tDeleteMqDataRsp(SMqDataRsp *rsp) { tDeleteMqDataRspCommon(rsp); }