fix:set vg status idle if reveive poll callback
This commit is contained in:
parent
90e938e48f
commit
9a2da3adee
|
@ -1308,6 +1308,15 @@ static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName){
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void setVgIdle(tmq_t* tmq, char* topicName, int32_t vgId){
|
||||||
|
taosWLockLatch(&tmq->lock);
|
||||||
|
SMqClientVg* pVg = getVgInfo(tmq, topicName, vgId);
|
||||||
|
if(pVg){
|
||||||
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&tmq->lock);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
|
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
|
||||||
int64_t refId = pParam->refId;
|
int64_t refId = pParam->refId;
|
||||||
|
@ -1410,12 +1419,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
tmq->consumerId, rspType, vgId, total, requestId);
|
tmq->consumerId, rspType, vgId, total, requestId);
|
||||||
|
|
||||||
END:
|
END:
|
||||||
taosWLockLatch(&tmq->lock);
|
if(code != 0){
|
||||||
SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId);
|
setVgIdle(tmq, pParam->topicName, vgId);
|
||||||
if(pVg){
|
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&tmq->lock);
|
|
||||||
|
|
||||||
tsem_post(&tmq->rspSem);
|
tsem_post(&tmq->rspSem);
|
||||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||||
|
@ -1839,6 +1845,9 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal
|
||||||
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
|
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update the status
|
||||||
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
|
|
||||||
// update the valid wal version range
|
// update the valid wal version range
|
||||||
pVg->offsetInfo.walVerBegin = sver;
|
pVg->offsetInfo.walVerBegin = sver;
|
||||||
pVg->offsetInfo.walVerEnd = ever + 1;
|
pVg->offsetInfo.walVerEnd = ever + 1;
|
||||||
|
@ -1922,6 +1931,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
|
tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
|
||||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
|
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
||||||
}
|
}
|
||||||
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||||
// todo handle the wal range and epset for each vgroup
|
// todo handle the wal range and epset for each vgroup
|
||||||
|
@ -1953,6 +1963,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
||||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
|
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
||||||
}
|
}
|
||||||
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
|
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
|
||||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
||||||
|
@ -2010,6 +2021,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
||||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
|
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);
|
tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);
|
||||||
|
|
Loading…
Reference in New Issue