fix:[TD-31146] invalid read if tmq is freed
This commit is contained in:
parent
7298feac14
commit
c75281b9ff
|
@ -1452,22 +1452,22 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
tmq_t* tmq = NULL;
|
||||
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
|
||||
if (pParam == NULL || pMsg == NULL) {
|
||||
goto FAIL2;
|
||||
return TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
}
|
||||
int64_t refId = pParam->refId;
|
||||
int32_t vgId = pParam->vgId;
|
||||
uint64_t requestId = pParam->requestId;
|
||||
tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
||||
if (tmq == NULL) {
|
||||
code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
||||
goto FAIL2;
|
||||
return TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
||||
}
|
||||
|
||||
SMqPollRspWrapper* pRspWrapper = NULL;
|
||||
code = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
|
||||
if (code) {
|
||||
int32_t ret = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
|
||||
if (ret) {
|
||||
code = ret;
|
||||
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
|
||||
goto FAIL1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
if (code != 0) {
|
||||
|
@ -1550,25 +1550,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
}
|
||||
|
||||
END:
|
||||
pRspWrapper->code = code;
|
||||
pRspWrapper->vgId = vgId;
|
||||
(void)strcpy(pRspWrapper->topicName, pParam->topicName);
|
||||
code = taosWriteQitem(tmq->mqueue, pRspWrapper);
|
||||
if(code != 0){
|
||||
tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
|
||||
if (pRspWrapper){
|
||||
pRspWrapper->code = code;
|
||||
pRspWrapper->vgId = vgId;
|
||||
(void)strcpy(pRspWrapper->topicName, pParam->topicName);
|
||||
code = taosWriteQitem(tmq->mqueue, pRspWrapper);
|
||||
if(code != 0){
|
||||
tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t total = taosQueueItemSize(tmq->mqueue);
|
||||
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
|
||||
tmq->consumerId, rspType, vgId, total, requestId);
|
||||
|
||||
FAIL1:
|
||||
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
|
||||
FAIL2:
|
||||
if (tmq) (void)tsem2_post(&tmq->rspSem);
|
||||
if (pMsg) taosMemoryFreeClear(pMsg->pData);
|
||||
if (pMsg) taosMemoryFreeClear(pMsg->pEpSet);
|
||||
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue