diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 9d9473d883..4db000287c 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -755,7 +755,7 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) { if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { if (status == MQ_CONSUMER_STATUS_REBALANCE) { pConsumer->status = MQ_CONSUMER_STATUS_READY; - } else if (status == MQ_CONSUMER_STATUS_READY) { + } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) { pConsumer->status = MQ_CONSUMER_STATUS_LOST; } } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 16ae7f2548..62b671a12f 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -737,8 +737,6 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) { SMqConsumerObj *pConsumer; void *pIter = NULL; - mInfo("start to process mq timer"); - // iterate all consumers, find all modification while (1) { pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); @@ -852,6 +850,8 @@ void mndRebCntDec() { static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { int code = 0; + mInfo("start to process mq timer"); + if (!mndRebTryStart()) { mInfo("mq rebalance already in progress, do nothing"); return code; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 99553fc57a..9075f0145c 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1119,7 +1119,10 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { if (pAction->msgSent) return 0; - if (mndCannotExecuteTransAction(pMnode, topHalf)) return TSDB_CODE_MND_TRANS_CTX_SWITCH; + if (mndCannotExecuteTransAction(pMnode, topHalf)) { + terrno = TSDB_CODE_MND_TRANS_CTX_SWITCH; + return TSDB_CODE_MND_TRANS_CTX_SWITCH; + } int64_t signature = pTrans->id; signature = (signature << 32);