Merge pull request #26901 from taosdata/fix/TD-31146

fix:[TD-31146] invalid read if tmq is freed
This commit is contained in:
dapan1121 2024-08-01 19:16:50 +08:00 committed by GitHub
commit 06bbec005a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 18 additions and 20 deletions

View File

@ -247,7 +247,7 @@ typedef struct {
SMqCommitCbParamSet* params; SMqCommitCbParamSet* params;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
int32_t vgId; int32_t vgId;
tmq_t* pTmq; int64_t consumerId;
} SMqCommitCbParam; } SMqCommitCbParam;
typedef struct SSyncCommitInfo { typedef struct SSyncCommitInfo {
@ -485,7 +485,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet); taosMemoryFree(pBuf->pEpSet);
return commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); return commitRspCountDown(pParamSet, pParam->consumerId, pParam->topicName, pParam->vgId);
} }
static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName,
@ -529,7 +529,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
pParam->params = pParamSet; pParam->params = pParamSet;
pParam->vgId = vgId; pParam->vgId = vgId;
pParam->pTmq = tmq; pParam->consumerId = tmq->consumerId;
tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName)); tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
@ -1505,22 +1505,22 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = NULL; tmq_t* tmq = NULL;
SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqPollCbParam* pParam = (SMqPollCbParam*)param;
if (pParam == NULL || pMsg == NULL) { if (pParam == NULL || pMsg == NULL) {
goto FAIL2; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
int64_t refId = pParam->refId; int64_t refId = pParam->refId;
int32_t vgId = pParam->vgId; int32_t vgId = pParam->vgId;
uint64_t requestId = pParam->requestId; uint64_t requestId = pParam->requestId;
tmq = taosAcquireRef(tmqMgmt.rsetId, refId); tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) { if (tmq == NULL) {
code = TSDB_CODE_TMQ_CONSUMER_CLOSED; return TSDB_CODE_TMQ_CONSUMER_CLOSED;
goto FAIL2;
} }
SMqPollRspWrapper* pRspWrapper = NULL; SMqPollRspWrapper* pRspWrapper = NULL;
code = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper); int32_t ret = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper);
if (code) { if (ret) {
code = ret;
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
goto FAIL1; goto END;
} }
if (code != 0) { if (code != 0) {
@ -1603,6 +1603,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
END: END:
if (pRspWrapper){
pRspWrapper->code = code; pRspWrapper->code = code;
pRspWrapper->vgId = vgId; pRspWrapper->vgId = vgId;
(void)strcpy(pRspWrapper->topicName, pParam->topicName); (void)strcpy(pRspWrapper->topicName, pParam->topicName);
@ -1610,18 +1611,15 @@ END:
if(code != 0){ if(code != 0){
tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
} }
}
int32_t total = taosQueueItemSize(tmq->mqueue); int32_t total = taosQueueItemSize(tmq->mqueue);
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
tmq->consumerId, rspType, vgId, total, requestId); tmq->consumerId, rspType, vgId, total, requestId);
FAIL1:
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
FAIL2:
if (tmq) (void)tsem2_post(&tmq->rspSem); if (tmq) (void)tsem2_post(&tmq->rspSem);
if (pMsg) taosMemoryFreeClear(pMsg->pData); if (pMsg) taosMemoryFreeClear(pMsg->pData);
if (pMsg) taosMemoryFreeClear(pMsg->pEpSet); if (pMsg) taosMemoryFreeClear(pMsg->pEpSet);
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
return code; return code;
} }