Merge pull request #26310 from taosdata/fix/TD-30704-3.0
fix:[TD-30704] null pointer error if exception occurs
This commit is contained in:
commit
18019bcd44
|
@ -769,7 +769,9 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
if (pMsg) {
|
||||
if (pMsg == NULL) {
|
||||
return code;
|
||||
}
|
||||
SMqHbRsp rsp = {0};
|
||||
tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
|
||||
|
||||
|
@ -796,7 +798,6 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
tDestroySMqHbRsp(&rsp);
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -984,10 +985,16 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
|
|||
}
|
||||
|
||||
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
if(param == NULL) {
|
||||
return code;
|
||||
}
|
||||
|
||||
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
|
||||
pParam->rspErr = code;
|
||||
|
||||
if(pMsg){
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
}
|
||||
tsem_post(&pParam->rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
@ -2565,6 +2572,7 @@ end:
|
|||
}
|
||||
|
||||
int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
if(param == NULL) return code;
|
||||
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
|
||||
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
|
||||
if (tmq == NULL) {
|
||||
|
@ -2577,6 +2585,9 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
goto END;
|
||||
}
|
||||
|
||||
if (pMsg == NULL) {
|
||||
goto END;
|
||||
}
|
||||
SMqRspHead* head = pMsg->pData;
|
||||
int32_t epoch = atomic_load_32(&tmq->epoch);
|
||||
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
|
||||
|
@ -2606,18 +2617,24 @@ END:
|
|||
FAIL:
|
||||
if (pParam->sync) {
|
||||
SAskEpInfo* pInfo = pParam->pParam;
|
||||
if(pInfo) {
|
||||
pInfo->code = code;
|
||||
tsem_post(&pInfo->sem);
|
||||
}
|
||||
}
|
||||
|
||||
if(pMsg){
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
taosMemoryFree(pMsg->pData);
|
||||
}
|
||||
|
||||
taosMemoryFree(pParam);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t syncAskEp(tmq_t* pTmq) {
|
||||
SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
|
||||
if(pInfo == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
tsem_init(&pInfo->sem, 0, 0);
|
||||
|
||||
askEp(pTmq, pInfo, true, false);
|
||||
|
@ -2769,6 +2786,9 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
|
|||
}
|
||||
|
||||
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
if(param == NULL) {
|
||||
return code;
|
||||
}
|
||||
SMqVgWalInfoParam* pParam = param;
|
||||
SMqVgCommon* pCommon = pParam->pCommon;
|
||||
|
||||
|
@ -2800,8 +2820,11 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
tsem_post(&pCommon->rsp);
|
||||
}
|
||||
|
||||
if(pMsg){
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -3233,6 +3256,9 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
}
|
||||
if(param == NULL) {
|
||||
return code;
|
||||
}
|
||||
SMqSeekParam* pParam = param;
|
||||
pParam->code = code;
|
||||
tsem_post(&pParam->sem);
|
||||
|
|
Loading…
Reference in New Issue