From bd64a78094f08aa5ad9bd3c0f1d03b7ab5e6acef Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 4 Jan 2024 18:53:37 +0800 Subject: [PATCH] fix:[TD-28155] set consumer lost if all current topic is 0 --- source/dnode/mnode/impl/src/mndConsumer.c | 24 ++++++++++------------- source/dnode/mnode/impl/src/mndTrans.c | 5 ++++- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 9d9473d883..a365a44d56 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -749,18 +749,6 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { return 0; } -static void updateConsumerStatus(SMqConsumerObj *pConsumer) { - int32_t status = pConsumer->status; - - if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { - if (status == MQ_CONSUMER_STATUS_REBALANCE) { - pConsumer->status = MQ_CONSUMER_STATUS_READY; - } else if (status == MQ_CONSUMER_STATUS_READY) { - pConsumer->status = MQ_CONSUMER_STATUS_LOST; - } - } -} - // remove from new topic static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { int32_t size = taosArrayGetSize(pConsumer->rebNewTopics); @@ -881,7 +869,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, // set status int32_t status = pOldConsumer->status; - updateConsumerStatus(pOldConsumer); + if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && + taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0 && + taosArrayGetSize(pOldConsumer->currentTopics) != 0){ + pOldConsumer->status = MQ_CONSUMER_STATUS_READY; + } // the re-balance is triggered when the new consumer is launched. pOldConsumer->rebalanceTime = taosGetTimestampMs(); @@ -905,7 +897,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, // set status int32_t status = pOldConsumer->status; - updateConsumerStatus(pOldConsumer); + if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && + taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0 && + taosArrayGetSize(pOldConsumer->currentTopics) == 0){ + pOldConsumer->status = MQ_CONSUMER_STATUS_LOST; + } pOldConsumer->rebalanceTime = taosGetTimestampMs(); atomic_add_fetch_32(&pOldConsumer->epoch, 1); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 99553fc57a..9075f0145c 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1119,7 +1119,10 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { if (pAction->msgSent) return 0; - if (mndCannotExecuteTransAction(pMnode, topHalf)) return TSDB_CODE_MND_TRANS_CTX_SWITCH; + if (mndCannotExecuteTransAction(pMnode, topHalf)) { + terrno = TSDB_CODE_MND_TRANS_CTX_SWITCH; + return TSDB_CODE_MND_TRANS_CTX_SWITCH; + } int64_t signature = pTrans->id; signature = (signature << 32);