From 173446e84e38550c57b79e4695391538262b751e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 25 Jun 2024 13:59:56 +0800 Subject: [PATCH] fix:[TD-30704] null pointer error if exception occurs --- source/client/src/clientTmq.c | 84 +++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 29 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index f5f083e5d8..c4fc9db44f 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -769,34 +769,35 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { } int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { - if (pMsg) { - SMqHbRsp rsp = {0}; - tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp); + if (pMsg == NULL) { + return code; + } + SMqHbRsp rsp = {0}; + tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp); - int64_t refId = *(int64_t*)param; - tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); - if (tmq != NULL) { - taosWLockLatch(&tmq->lock); - for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) { - STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i); - if (privilege->noPrivilege == 1) { - int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); - for (int32_t j = 0; j < topicNumCur; j++) { - SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j); - if (strcmp(pTopicCur->topicName, privilege->topic) == 0) { - tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic); - pTopicCur->noPrivilege = 1; - } + int64_t refId = *(int64_t*)param; + tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); + if (tmq != NULL) { + taosWLockLatch(&tmq->lock); + for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) { + STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i); + if (privilege->noPrivilege == 1) { + int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); + for (int32_t j = 0; j < topicNumCur; j++) { + SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j); + if (strcmp(pTopicCur->topicName, privilege->topic) == 0) { + tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic); + pTopicCur->noPrivilege = 1; } } } - taosWUnLockLatch(&tmq->lock); - taosReleaseRef(tmqMgmt.rsetId, refId); } - tDestroySMqHbRsp(&rsp); - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); + taosWUnLockLatch(&tmq->lock); + taosReleaseRef(tmqMgmt.rsetId, refId); } + tDestroySMqHbRsp(&rsp); + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); return 0; } @@ -984,10 +985,16 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { } int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { + if(param == NULL) { + return code; + } + SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; - taosMemoryFree(pMsg->pEpSet); + if(pMsg){ + taosMemoryFree(pMsg->pEpSet); + } tsem_post(&pParam->rspSem); return 0; } @@ -2563,6 +2570,7 @@ end: } int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { + if(param == NULL) return code; SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); if (tmq == NULL) { @@ -2575,6 +2583,9 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { goto END; } + if (pMsg == NULL) { + goto END; + } SMqRspHead* head = pMsg->pData; int32_t epoch = atomic_load_32(&tmq->epoch); tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch); @@ -2604,18 +2615,24 @@ END: FAIL: if (pParam->sync) { SAskEpInfo* pInfo = pParam->pParam; - pInfo->code = code; - tsem_post(&pInfo->sem); + if(pInfo) { + pInfo->code = code; + tsem_post(&pInfo->sem); + } + } + + if(pMsg){ + taosMemoryFree(pMsg->pEpSet); + taosMemoryFree(pMsg->pData); } - taosMemoryFree(pMsg->pEpSet); - taosMemoryFree(pMsg->pData); taosMemoryFree(pParam); return code; } int32_t syncAskEp(tmq_t* pTmq) { SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo)); + if(pInfo == NULL) return TSDB_CODE_OUT_OF_MEMORY; tsem_init(&pInfo->sem, 0, 0); askEp(pTmq, pInfo, true, false); @@ -2767,6 +2784,9 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { } static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { + if(param == NULL) { + return code; + } SMqVgWalInfoParam* pParam = param; SMqVgCommon* pCommon = pParam->pCommon; @@ -2798,8 +2818,11 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { tsem_post(&pCommon->rsp); } - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); + if(pMsg){ + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); + } + return 0; } @@ -3231,6 +3254,9 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); } + if(param == NULL) { + return code; + } SMqSeekParam* pParam = param; pParam->code = code; tsem_post(&pParam->sem);