fix(client): fix potential memory leak.

This commit is contained in:
Haojun Liao 2023-03-17 14:29:12 +08:00
parent 8d1c6956d6
commit 4f15a32839
1 changed files with 24 additions and 25 deletions

View File

@ -874,7 +874,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
return 0; return 0;
} }
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
// do nothing // do nothing
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
@ -905,6 +905,8 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
taosArrayDestroy(pRsp->taosxRsp.createTableLen); taosArrayDestroy(pRsp->taosxRsp.createTableLen);
taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree); taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);
} }
return NULL;
} }
void tmqClearUnhandleMsg(tmq_t* tmq) { void tmqClearUnhandleMsg(tmq_t* tmq) {
@ -1788,29 +1790,28 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems); tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
while (1) { while (1) {
SMqRspWrapper* rspWrapper = NULL; SMqRspWrapper* pRspWrapper = NULL;
taosGetQitem(tmq->qall, (void**)&rspWrapper); taosGetQitem(tmq->qall, (void**)&pRspWrapper);
if (rspWrapper == NULL) { if (pRspWrapper == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall); taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)&rspWrapper); taosGetQitem(tmq->qall, (void**)&pRspWrapper);
if (rspWrapper == NULL) { if (pRspWrapper == NULL) {
return NULL; return NULL;
} }
} }
tscDebug("consumer:0x%"PRIx64" handle rsp, type:%d", tmq->consumerId, rspWrapper->tmqRspType); tscDebug("consumer:0x%"PRIx64" handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
taosFreeQitem(rspWrapper); taosFreeQitem(pRspWrapper);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno)); tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
return NULL; return NULL;
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp; SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
@ -1835,9 +1836,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (pDataRsp->blockNum == 0) { if (pDataRsp->blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, reqId:0x%" PRIx64, tmq->consumerId, tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, reqId:0x%" PRIx64, tmq->consumerId,
pVg->vgId, buf, pollRspWrapper->reqId); pVg->vgId, buf, pollRspWrapper->reqId);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
rspWrapper = NULL;
continue;
} else { // build rsp } else { // build rsp
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg); SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg);
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"PRId64" reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"PRId64" reqId:0x%" PRIx64,
@ -1850,11 +1850,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else { } else {
tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pDataRsp->head.epoch, consumerEpoch); tmq->consumerId, pDataRsp->head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper); pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);
@ -1870,11 +1870,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else { } else {
tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper); pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
@ -1883,10 +1883,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->taosxRsp.blockNum == 0) { if (pollRspWrapper->taosxRsp.blockNum == 0) {
rspWrapper = NULL;
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId,
pollRspWrapper->reqId); pollRspWrapper->reqId);
pVg->emptyBlockReceiveTs = taosGetTimestampMs(); pVg->emptyBlockReceiveTs = taosGetTimestampMs();
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
continue; continue;
} else { } else {
@ -1912,15 +1912,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else { } else {
tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper); pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
} else { } else {
tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);
bool reset = false; bool reset = false;
tmqHandleNoPollRsp(tmq, rspWrapper, &reset); tmqHandleNoPollRsp(tmq, pRspWrapper, &reset);
taosFreeQitem(rspWrapper); taosFreeQitem(pRspWrapper);
if (pollIfReset && reset) { if (pollIfReset && reset) {
tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
tmqPollImpl(tmq, timeout); tmqPollImpl(tmq, timeout);
@ -1968,7 +1968,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
if (tmqPollImpl(tmq, timeout) < 0) { if (tmqPollImpl(tmq, timeout) < 0) {
tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
/*return NULL;*/
} }
rspObj = tmqHandleAllRsp(tmq, timeout, false); rspObj = tmqHandleAllRsp(tmq, timeout, false);