fix(mq): re-balance consumer if it is in an in-balance status.
This commit is contained in:
parent
d54ae0c840
commit
41f26148a2
|
@ -24,10 +24,10 @@ extern "C" {
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
MQ_CONSUMER_STATUS__MODIFY = 1,
|
MQ_CONSUMER_STATUS__MODIFY = 1,
|
||||||
MQ_CONSUMER_STATUS__MODIFY_IN_REB,
|
MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore
|
||||||
MQ_CONSUMER_STATUS__READY,
|
MQ_CONSUMER_STATUS__READY,
|
||||||
MQ_CONSUMER_STATUS__LOST,
|
MQ_CONSUMER_STATUS__LOST,
|
||||||
MQ_CONSUMER_STATUS__LOST_IN_REB,
|
MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore
|
||||||
MQ_CONSUMER_STATUS__LOST_REBD,
|
MQ_CONSUMER_STATUS__LOST_REBD,
|
||||||
MQ_CONSUMER_STATUS__REMOVED,
|
MQ_CONSUMER_STATUS__REMOVED,
|
||||||
};
|
};
|
||||||
|
|
|
@ -317,7 +317,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
||||||
}
|
}
|
||||||
taosRUnLockLatch(&pConsumer->lock);
|
taosRUnLockLatch(&pConsumer->lock);
|
||||||
} else if (status == MQ_CONSUMER_STATUS__MODIFY) {
|
} else if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
||||||
taosRLockLatch(&pConsumer->lock);
|
taosRLockLatch(&pConsumer->lock);
|
||||||
|
|
||||||
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
|
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
|
||||||
|
|
Loading…
Reference in New Issue