diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 87511865e9..0a64a02f20 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1308,6 +1308,15 @@ static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName){ return NULL; } +static void setVgIdle(tmq_t* tmq, char* topicName, int32_t vgId){ + taosWLockLatch(&tmq->lock); + SMqClientVg* pVg = getVgInfo(tmq, topicName, vgId); + if(pVg){ + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + } + taosWUnLockLatch(&tmq->lock); +} + int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { SMqPollCbParam* pParam = (SMqPollCbParam*)param; int64_t refId = pParam->refId; @@ -1410,12 +1419,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tmq->consumerId, rspType, vgId, total, requestId); END: - taosWLockLatch(&tmq->lock); - SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId); - if(pVg){ - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + if(code != 0){ + setVgIdle(tmq, pParam->topicName, vgId); } - taosWUnLockLatch(&tmq->lock); tsem_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); @@ -1839,6 +1845,9 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); } + // update the status + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // update the valid wal version range pVg->offsetInfo.walVerBegin = sver; pVg->offsetInfo.walVerEnd = ever + 1; @@ -1922,6 +1931,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { // todo handle the wal range and epset for each vgroup @@ -1953,6 +1963,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; @@ -2010,6 +2021,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); } } else { tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);