From 221733edb26fa75d25309bd7d99e488281b032b7 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 9 Sep 2024 16:49:56 +0800 Subject: [PATCH] enh:[TD-30270]opti data struct in tmq --- source/client/src/clientRawBlockWrite.c | 14 +++++++++++--- source/client/src/clientTmq.c | 9 +++++---- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index d481ede616..809d1442ba 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -2096,13 +2096,21 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { raw->raw_len = rspObj->metaRsp.metaRspLen; raw->raw_type = rspObj->metaRsp.resMsgType; uDebug("tmq get raw type meta:%p", raw); - } else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { - int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw); + } else if (TD_RES_TMQ(res)) { + int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw); if (code != 0) { uError("tmq get raw type error:%d", terrno); return code; } - raw->raw_type = rspObj->resType; + raw->raw_type = RES_TYPE__TMQ; + uDebug("tmq get raw type data:%p", raw); + } else if (TD_RES_TMQ_METADATA(res)) { + int32_t code = encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw); + if (code != 0) { + uError("tmq get raw type error:%d", terrno); + return code; + } + raw->raw_type = RES_TYPE__TMQ_METADATA; uDebug("tmq get raw type metadata:%p", raw); } else if (TD_RES_TMQ_BATCH_META(res)) { raw->raw = rspObj->batchMetaRsp.pMetaBuff; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 2ee9396f7f..20798fbdeb 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -55,7 +55,7 @@ (void)memcpy(DATA, pMsg->pData, sizeof(SMqRspHead)); #define DELETE_POLL_RSP(FUNC,DATA) \ - SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;\ + SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;\ taosMemoryFreeClear(pRsp->pEpset);\ FUNC(DATA); @@ -1029,9 +1029,10 @@ static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) { static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { tDeleteSMqAskEpRsp(&rspWrapper->epRsp); - } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP || - rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { - DELETE_POLL_RSP(tDeleteMqDataRsp,&pRsp->dataRsp) + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { + DELETE_POLL_RSP(tDeleteMqDataRsp, &pRsp->dataRsp) + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP){ + DELETE_POLL_RSP(tDeleteSTaosxRsp, &pRsp->dataRsp) } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp) } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {