diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index a365a44d56..4db000287c 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -749,6 +749,18 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { return 0; } +static void updateConsumerStatus(SMqConsumerObj *pConsumer) { + int32_t status = pConsumer->status; + + if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { + if (status == MQ_CONSUMER_STATUS_REBALANCE) { + pConsumer->status = MQ_CONSUMER_STATUS_READY; + } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) { + pConsumer->status = MQ_CONSUMER_STATUS_LOST; + } + } +} + // remove from new topic static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { int32_t size = taosArrayGetSize(pConsumer->rebNewTopics); @@ -869,11 +881,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, // set status int32_t status = pOldConsumer->status; - if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && - taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0 && - taosArrayGetSize(pOldConsumer->currentTopics) != 0){ - pOldConsumer->status = MQ_CONSUMER_STATUS_READY; - } + updateConsumerStatus(pOldConsumer); // the re-balance is triggered when the new consumer is launched. pOldConsumer->rebalanceTime = taosGetTimestampMs(); @@ -897,11 +905,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, // set status int32_t status = pOldConsumer->status; - if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && - taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0 && - taosArrayGetSize(pOldConsumer->currentTopics) == 0){ - pOldConsumer->status = MQ_CONSUMER_STATUS_LOST; - } + updateConsumerStatus(pOldConsumer); pOldConsumer->rebalanceTime = taosGetTimestampMs(); atomic_add_fetch_32(&pOldConsumer->epoch, 1); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 16ae7f2548..62b671a12f 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -737,8 +737,6 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) { SMqConsumerObj *pConsumer; void *pIter = NULL; - mInfo("start to process mq timer"); - // iterate all consumers, find all modification while (1) { pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); @@ -852,6 +850,8 @@ void mndRebCntDec() { static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { int code = 0; + mInfo("start to process mq timer"); + if (!mndRebTryStart()) { mInfo("mq rebalance already in progress, do nothing"); return code;