From 7298feac146582bc5fd403d36c350707d13e6211 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 31 Jul 2024 14:11:45 +0800 Subject: [PATCH 1/2] fix:[TD-31146] invalid read if tmq is freed --- source/client/src/clientTmq.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 46c3cf6622..84cc251b46 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -240,7 +240,7 @@ typedef struct { SMqCommitCbParamSet* params; char topicName[TSDB_TOPIC_FNAME_LEN]; int32_t vgId; - tmq_t* pTmq; + int64_t consumerId; } SMqCommitCbParam; typedef struct SSyncCommitInfo { @@ -439,7 +439,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); - return commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); + return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId); } static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, @@ -483,7 +483,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse pParam->params = pParamSet; pParam->vgId = vgId; - pParam->pTmq = tmq; + pParam->consumerId = tmq->consumerId; tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName)); From c75281b9ff93df4436820d775c06b3f62f89849a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 1 Aug 2024 11:29:21 +0800 Subject: [PATCH 2/2] fix:[TD-31146] invalid read if tmq is freed --- source/client/src/clientTmq.c | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 84cc251b46..c3867f2821 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1452,22 +1452,22 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tmq_t* tmq = NULL; SMqPollCbParam* pParam = (SMqPollCbParam*)param; if (pParam == NULL || pMsg == NULL) { - goto FAIL2; + return TSDB_CODE_TSC_INTERNAL_ERROR; } int64_t refId = pParam->refId; int32_t vgId = pParam->vgId; uint64_t requestId = pParam->requestId; tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { - code = TSDB_CODE_TMQ_CONSUMER_CLOSED; - goto FAIL2; + return TSDB_CODE_TMQ_CONSUMER_CLOSED; } SMqPollRspWrapper* pRspWrapper = NULL; - code = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper); - if (code) { + int32_t ret = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper); + if (ret) { + code = ret; tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); - goto FAIL1; + goto END; } if (code != 0) { @@ -1550,25 +1550,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } END: - pRspWrapper->code = code; - pRspWrapper->vgId = vgId; - (void)strcpy(pRspWrapper->topicName, pParam->topicName); - code = taosWriteQitem(tmq->mqueue, pRspWrapper); - if(code != 0){ - tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); + if (pRspWrapper){ + pRspWrapper->code = code; + pRspWrapper->vgId = vgId; + (void)strcpy(pRspWrapper->topicName, pParam->topicName); + code = taosWriteQitem(tmq->mqueue, pRspWrapper); + if(code != 0){ + tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); + } } - int32_t total = taosQueueItemSize(tmq->mqueue); tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId, rspType, vgId, total, requestId); -FAIL1: - (void)taosReleaseRef(tmqMgmt.rsetId, refId); - -FAIL2: if (tmq) (void)tsem2_post(&tmq->rspSem); if (pMsg) taosMemoryFreeClear(pMsg->pData); if (pMsg) taosMemoryFreeClear(pMsg->pEpSet); + (void)taosReleaseRef(tmqMgmt.rsetId, refId); return code; }