fix(tmq): status check
This commit is contained in:
parent
ec5da952f9
commit
238b2c1639
|
|
@ -92,6 +92,14 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
|
||||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
|
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
|
||||||
ASSERT(pConsumer);
|
ASSERT(pConsumer);
|
||||||
|
|
||||||
|
mInfo("receive consumer lost msg, consumer id %ld, status %s", pLostMsg->consumerId,
|
||||||
|
mndConsumerStatusName(pConsumer->status));
|
||||||
|
|
||||||
|
if (pConsumer->status != MQ_CONSUMER_STATUS__READY) {
|
||||||
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
|
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__LOST;
|
pConsumerNew->updateType = CONSUMER_UPDATE__LOST;
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue