From 5217131f500ccb65d2ca36150ce1904d91e536ad Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 3 Jun 2024 16:48:44 +0800 Subject: [PATCH] fix:[TD-30306]error in converity scan --- source/client/src/clientTmq.c | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 7bc6a2f2e6..13fc2ce16a 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1344,12 +1344,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { goto FAIL; } - if(pMsg->pData == NULL){ - tscError("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId); - code = TSDB_CODE_TSC_INTERNAL_ERROR; - goto FAIL; - } - SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); if (pRspWrapper == NULL) { tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); @@ -1362,6 +1356,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { goto END; } + if(pMsg->pData == NULL){ + tscError("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId); + code = TSDB_CODE_TSC_INTERNAL_ERROR; + goto END; + } + int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; int32_t clientEpoch = atomic_load_32(&tmq->epoch); if (msgEpoch < clientEpoch) { @@ -1388,7 +1388,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecoderClear(&decoder); taosReleaseRef(tmqMgmt.rsetId, refId); code = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; + goto END; } tDecoderClear(&decoder); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); @@ -1404,7 +1404,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecoderClear(&decoder); taosReleaseRef(tmqMgmt.rsetId, refId); code = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; + goto END; } tDecoderClear(&decoder); memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); @@ -1415,7 +1415,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecoderClear(&decoder); taosReleaseRef(tmqMgmt.rsetId, refId); code = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; + goto END; } tDecoderClear(&decoder); memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead)); @@ -1889,8 +1889,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { terrno = pRspWrapper->code; tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(pRspWrapper->code)); - taosFreeQitem(pRspWrapper); - return NULL; } else { if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform askEp(tmq, NULL, false, true);