diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index fc9f7540f7..9339fc0ceb 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -874,7 +874,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { return 0; } -static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { +static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { // do nothing } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { @@ -905,6 +905,8 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { taosArrayDestroy(pRsp->taosxRsp.createTableLen); taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree); } + + return NULL; } void tmqClearUnhandleMsg(tmq_t* tmq) { @@ -1788,29 +1790,28 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems); while (1) { - SMqRspWrapper* rspWrapper = NULL; - taosGetQitem(tmq->qall, (void**)&rspWrapper); + SMqRspWrapper* pRspWrapper = NULL; + taosGetQitem(tmq->qall, (void**)&pRspWrapper); - if (rspWrapper == NULL) { + if (pRspWrapper == NULL) { taosReadAllQitems(tmq->mqueue, tmq->qall); - taosGetQitem(tmq->qall, (void**)&rspWrapper); + taosGetQitem(tmq->qall, (void**)&pRspWrapper); - if (rspWrapper == NULL) { + if (pRspWrapper == NULL) { return NULL; } } - tscDebug("consumer:0x%"PRIx64" handle rsp, type:%d", tmq->consumerId, rspWrapper->tmqRspType); + tscDebug("consumer:0x%"PRIx64" handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType); - if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { - taosFreeQitem(rspWrapper); + if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { + taosFreeQitem(pRspWrapper); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno)); return NULL; - } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { - SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { + SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; - /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ int32_t consumerEpoch = atomic_load_32(&tmq->epoch); SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp; @@ -1835,9 +1836,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pDataRsp->blockNum == 0) { tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pollRspWrapper->reqId); + pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); - rspWrapper = NULL; - continue; } else { // build rsp SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg); tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"PRId64" reqId:0x%" PRIx64, @@ -1850,11 +1850,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pDataRsp->head.epoch, consumerEpoch); - tmqFreeRspWrapper(rspWrapper); + pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } - } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { - SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { + SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId); @@ -1870,11 +1870,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); - tmqFreeRspWrapper(rspWrapper); + pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } - } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { - SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { + SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { @@ -1883,10 +1883,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); if (pollRspWrapper->taosxRsp.blockNum == 0) { - rspWrapper = NULL; tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, pollRspWrapper->reqId); pVg->emptyBlockReceiveTs = taosGetTimestampMs(); + pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); continue; } else { @@ -1912,15 +1912,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); - tmqFreeRspWrapper(rspWrapper); + pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } } else { tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId); bool reset = false; - tmqHandleNoPollRsp(tmq, rspWrapper, &reset); - taosFreeQitem(rspWrapper); + tmqHandleNoPollRsp(tmq, pRspWrapper, &reset); + taosFreeQitem(pRspWrapper); if (pollIfReset && reset) { tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId); tmqPollImpl(tmq, timeout); @@ -1968,7 +1968,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { if (tmqPollImpl(tmq, timeout) < 0) { tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId); - /*return NULL;*/ } rspObj = tmqHandleAllRsp(tmq, timeout, false);