fix:[TD-30704] null pointer error if exception occurs

This commit is contained in:
wangmm0220 2024-06-25 13:59:56 +08:00
parent 42beba30a8
commit 173446e84e
1 changed files with 55 additions and 29 deletions

View File

@ -769,34 +769,35 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
} }
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg) { if (pMsg == NULL) {
SMqHbRsp rsp = {0}; return code;
tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp); }
SMqHbRsp rsp = {0};
tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
int64_t refId = *(int64_t*)param; int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) { if (tmq != NULL) {
taosWLockLatch(&tmq->lock); taosWLockLatch(&tmq->lock);
for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) { for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) {
STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i); STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
if (privilege->noPrivilege == 1) { if (privilege->noPrivilege == 1) {
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for (int32_t j = 0; j < topicNumCur; j++) { for (int32_t j = 0; j < topicNumCur; j++) {
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j); SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
if (strcmp(pTopicCur->topicName, privilege->topic) == 0) { if (strcmp(pTopicCur->topicName, privilege->topic) == 0) {
tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic); tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic);
pTopicCur->noPrivilege = 1; pTopicCur->noPrivilege = 1;
}
} }
} }
} }
taosWUnLockLatch(&tmq->lock);
taosReleaseRef(tmqMgmt.rsetId, refId);
} }
tDestroySMqHbRsp(&rsp); taosWUnLockLatch(&tmq->lock);
taosMemoryFree(pMsg->pData); taosReleaseRef(tmqMgmt.rsetId, refId);
taosMemoryFree(pMsg->pEpSet);
} }
tDestroySMqHbRsp(&rsp);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
return 0; return 0;
} }
@ -984,10 +985,16 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
} }
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
if(param == NULL) {
return code;
}
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
pParam->rspErr = code; pParam->rspErr = code;
taosMemoryFree(pMsg->pEpSet); if(pMsg){
taosMemoryFree(pMsg->pEpSet);
}
tsem_post(&pParam->rspSem); tsem_post(&pParam->rspSem);
return 0; return 0;
} }
@ -2563,6 +2570,7 @@ end:
} }
int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if(param == NULL) return code;
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
if (tmq == NULL) { if (tmq == NULL) {
@ -2575,6 +2583,9 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
goto END; goto END;
} }
if (pMsg == NULL) {
goto END;
}
SMqRspHead* head = pMsg->pData; SMqRspHead* head = pMsg->pData;
int32_t epoch = atomic_load_32(&tmq->epoch); int32_t epoch = atomic_load_32(&tmq->epoch);
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch); tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
@ -2604,18 +2615,24 @@ END:
FAIL: FAIL:
if (pParam->sync) { if (pParam->sync) {
SAskEpInfo* pInfo = pParam->pParam; SAskEpInfo* pInfo = pParam->pParam;
pInfo->code = code; if(pInfo) {
tsem_post(&pInfo->sem); pInfo->code = code;
tsem_post(&pInfo->sem);
}
}
if(pMsg){
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData);
} }
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pParam); taosMemoryFree(pParam);
return code; return code;
} }
int32_t syncAskEp(tmq_t* pTmq) { int32_t syncAskEp(tmq_t* pTmq) {
SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo)); SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
if(pInfo == NULL) return TSDB_CODE_OUT_OF_MEMORY;
tsem_init(&pInfo->sem, 0, 0); tsem_init(&pInfo->sem, 0, 0);
askEp(pTmq, pInfo, true, false); askEp(pTmq, pInfo, true, false);
@ -2767,6 +2784,9 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
} }
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
if(param == NULL) {
return code;
}
SMqVgWalInfoParam* pParam = param; SMqVgWalInfoParam* pParam = param;
SMqVgCommon* pCommon = pParam->pCommon; SMqVgCommon* pCommon = pParam->pCommon;
@ -2798,8 +2818,11 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
tsem_post(&pCommon->rsp); tsem_post(&pCommon->rsp);
} }
taosMemoryFree(pMsg->pData); if(pMsg){
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
}
return 0; return 0;
} }
@ -3231,6 +3254,9 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
} }
if(param == NULL) {
return code;
}
SMqSeekParam* pParam = param; SMqSeekParam* pParam = param;
pParam->code = code; pParam->code = code;
tsem_post(&pParam->sem); tsem_post(&pParam->sem);