enh:[TD-30270]opti data struct in tmq

This commit is contained in:
wangmm0220 2024-09-09 16:49:56 +08:00
parent d870d0a967
commit 221733edb2
2 changed files with 16 additions and 7 deletions

View File

@ -2096,13 +2096,21 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
raw->raw_len = rspObj->metaRsp.metaRspLen; raw->raw_len = rspObj->metaRsp.metaRspLen;
raw->raw_type = rspObj->metaRsp.resMsgType; raw->raw_type = rspObj->metaRsp.resMsgType;
uDebug("tmq get raw type meta:%p", raw); uDebug("tmq get raw type meta:%p", raw);
} else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { } else if (TD_RES_TMQ(res)) {
int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw); int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw);
if (code != 0) { if (code != 0) {
uError("tmq get raw type error:%d", terrno); uError("tmq get raw type error:%d", terrno);
return code; return code;
} }
raw->raw_type = rspObj->resType; raw->raw_type = RES_TYPE__TMQ;
uDebug("tmq get raw type data:%p", raw);
} else if (TD_RES_TMQ_METADATA(res)) {
int32_t code = encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw);
if (code != 0) {
uError("tmq get raw type error:%d", terrno);
return code;
}
raw->raw_type = RES_TYPE__TMQ_METADATA;
uDebug("tmq get raw type metadata:%p", raw); uDebug("tmq get raw type metadata:%p", raw);
} else if (TD_RES_TMQ_BATCH_META(res)) { } else if (TD_RES_TMQ_BATCH_META(res)) {
raw->raw = rspObj->batchMetaRsp.pMetaBuff; raw->raw = rspObj->batchMetaRsp.pMetaBuff;

View File

@ -55,7 +55,7 @@
(void)memcpy(DATA, pMsg->pData, sizeof(SMqRspHead)); (void)memcpy(DATA, pMsg->pData, sizeof(SMqRspHead));
#define DELETE_POLL_RSP(FUNC,DATA) \ #define DELETE_POLL_RSP(FUNC,DATA) \
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;\ SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;\
taosMemoryFreeClear(pRsp->pEpset);\ taosMemoryFreeClear(pRsp->pEpset);\
FUNC(DATA); FUNC(DATA);
@ -1029,9 +1029,10 @@ static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
tDeleteSMqAskEpRsp(&rspWrapper->epRsp); tDeleteSMqAskEpRsp(&rspWrapper->epRsp);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP || } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { DELETE_POLL_RSP(tDeleteMqDataRsp, &pRsp->dataRsp)
DELETE_POLL_RSP(tDeleteMqDataRsp,&pRsp->dataRsp) } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP){
DELETE_POLL_RSP(tDeleteSTaosxRsp, &pRsp->dataRsp)
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp) DELETE_POLL_RSP(tDeleteMqMetaRsp,&pRsp->metaRsp)
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {