From 62a781ac6e7ffcb4dc9f49cc30dc018a9e21065e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 23 Jul 2024 14:32:28 +0800 Subject: [PATCH] fix:[TD-31017]process return value in client for tmq --- include/common/tmsg.h | 4 +++- source/client/src/clientTmq.c | 39 +++++++++++++++++++---------------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a5dea8a44e..2befefc732 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -4036,7 +4036,9 @@ static FORCE_INLINE void* tDecodeSMqAskEpRsp(void* buf, SMqAskEpRsp* pRsp) { for (int32_t i = 0; i < sz; i++) { SMqSubTopicEp topicEp; buf = tDecodeMqSubTopicEp(buf, &topicEp); - taosArrayPush(pRsp->topics, &topicEp); + if (taosArrayPush(pRsp->topics, &topicEp) == NULL) { + return NULL; + } } return buf; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index e1b7eab40b..3f578648fa 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -794,7 +794,7 @@ void tmqReplayTask(void* param, void* tmrId) { if (tmq == NULL) return; (void)tsem2_post(&tmq->rspSem); - taosReleaseRef(tmqMgmt.rsetId, refId); + (void)taosReleaseRef(tmqMgmt.rsetId, refId); } void tmqAssignDelayedCommitTask(void* param, void* tmrId) { @@ -833,7 +833,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { } } taosWUnLockLatch(&tmq->lock); - taosReleaseRef(tmqMgmt.rsetId, refId); + (void)taosReleaseRef(tmqMgmt.rsetId, refId); } tDestroySMqHbRsp(&rsp); taosMemoryFree(pMsg->pData); @@ -936,9 +936,9 @@ void tmqSendHbReq(void* param, void* tmrId) { OVER: tDestroySMqHbReq(&req); if (tmrId != NULL) { - taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); + (void)taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); } - taosReleaseRef(tmqMgmt.rsetId, refId); + (void)taosReleaseRef(tmqMgmt.rsetId, refId); } static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) { @@ -1589,7 +1589,7 @@ END: tmq->consumerId, rspType, vgId, total, requestId); FAIL1: - taosReleaseRef(tmqMgmt.rsetId, refId); + (void)taosReleaseRef(tmqMgmt.rsetId, refId); FAIL2: if (tmq) (void)tsem2_post(&tmq->rspSem); @@ -2748,7 +2748,7 @@ void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, i int32_t accId = tmq->pTscObj->acctId; char tname[TSDB_TOPIC_FNAME_LEN] = {0}; - sprintf(tname, "%d.%s", accId, pTopicName); + (void)snprintf(tname, TSDB_TOPIC_FNAME_LEN, "%d.%s", accId, pTopicName); taosWLockLatch(&tmq->lock); SMqClientVg* pVg = NULL; @@ -2802,11 +2802,12 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch); if (pParam->sync) { SMqAskEpRsp rsp = {0}; - tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); - (void)doUpdateLocalEp(tmq, head->epoch, &rsp); + if(tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL){ + (void)doUpdateLocalEp(tmq, head->epoch, &rsp); + } tDeleteSMqAskEpRsp(&rsp); } else { - SMqAskEpRspWrapper* pWrapper; + SMqAskEpRspWrapper* pWrapper = NULL; code = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0, (void**)&pWrapper); if (code) { goto END; @@ -2815,13 +2816,15 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP; pWrapper->epoch = head->epoch; (void)memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); - tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg); - - taosWriteQitem(tmq->mqueue, pWrapper); + if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg) != NULL){ + taosFreeQitem(pWrapper); + }else{ + (void)taosWriteQitem(tmq->mqueue, pWrapper); + } } END: - taosReleaseRef(tmqMgmt.rsetId, pParam->refId); + (void)taosReleaseRef(tmqMgmt.rsetId, pParam->refId); FAIL: if (pParam->sync) { @@ -3013,13 +3016,13 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { .currentOffset = rsp.common.rspOffset.version, .vgId = pParam->vgId}; - taosThreadMutexLock(&pCommon->mutex); + (void)taosThreadMutexLock(&pCommon->mutex); if(taosArrayPush(pCommon->pList, &assignment) == NULL){ tscError("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId, pParam->vgId, pCommon->pTopicName); code = TSDB_CODE_TSC_INTERNAL_ERROR; } - taosThreadMutexUnlock(&pCommon->mutex); + (void)taosThreadMutexUnlock(&pCommon->mutex); } END: @@ -3042,7 +3045,7 @@ static void destroyCommonInfo(SMqVgCommon* pCommon) { } (void)taosArrayDestroy(pCommon->pList); (void)tsem2_destroy(&pCommon->rsp); - taosThreadMutexDestroy(&pCommon->mutex); + (void)taosThreadMutexDestroy(&pCommon->mutex); taosMemoryFree(pCommon->pTopicName); taosMemoryFree(pCommon); } @@ -3359,7 +3362,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a code = TSDB_CODE_OUT_OF_MEMORY; goto end; } - taosThreadMutexInit(&pCommon->mutex, 0); + (void)taosThreadMutexInit(&pCommon->mutex, 0); pCommon->pTopicName = taosStrdup(pTopic->topicName); pCommon->consumerId = tmq->consumerId; @@ -3545,7 +3548,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ taosWUnLockLatch(&tmq->lock); SMqSeekReq req = {0}; - snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname); + (void)snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname); req.head.vgId = vgId; req.consumerId = tmq->consumerId;