From bd64a78094f08aa5ad9bd3c0f1d03b7ab5e6acef Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 4 Jan 2024 18:53:37 +0800 Subject: [PATCH 1/2] fix:[TD-28155] set consumer lost if all current topic is 0 --- source/dnode/mnode/impl/src/mndConsumer.c | 24 ++++++++++------------- source/dnode/mnode/impl/src/mndTrans.c | 5 ++++- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 9d9473d883..a365a44d56 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -749,18 +749,6 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { return 0; } -static void updateConsumerStatus(SMqConsumerObj *pConsumer) { - int32_t status = pConsumer->status; - - if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { - if (status == MQ_CONSUMER_STATUS_REBALANCE) { - pConsumer->status = MQ_CONSUMER_STATUS_READY; - } else if (status == MQ_CONSUMER_STATUS_READY) { - pConsumer->status = MQ_CONSUMER_STATUS_LOST; - } - } -} - // remove from new topic static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { int32_t size = taosArrayGetSize(pConsumer->rebNewTopics); @@ -881,7 +869,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, // set status int32_t status = pOldConsumer->status; - updateConsumerStatus(pOldConsumer); + if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && + taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0 && + taosArrayGetSize(pOldConsumer->currentTopics) != 0){ + pOldConsumer->status = MQ_CONSUMER_STATUS_READY; + } // the re-balance is triggered when the new consumer is launched. pOldConsumer->rebalanceTime = taosGetTimestampMs(); @@ -905,7 +897,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, // set status int32_t status = pOldConsumer->status; - updateConsumerStatus(pOldConsumer); + if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && + taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0 && + taosArrayGetSize(pOldConsumer->currentTopics) == 0){ + pOldConsumer->status = MQ_CONSUMER_STATUS_LOST; + } pOldConsumer->rebalanceTime = taosGetTimestampMs(); atomic_add_fetch_32(&pOldConsumer->epoch, 1); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 99553fc57a..9075f0145c 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1119,7 +1119,10 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { if (pAction->msgSent) return 0; - if (mndCannotExecuteTransAction(pMnode, topHalf)) return TSDB_CODE_MND_TRANS_CTX_SWITCH; + if (mndCannotExecuteTransAction(pMnode, topHalf)) { + terrno = TSDB_CODE_MND_TRANS_CTX_SWITCH; + return TSDB_CODE_MND_TRANS_CTX_SWITCH; + } int64_t signature = pTrans->id; signature = (signature << 32); From f2a1d24ab1a5a6c7b19a1ffdbafef4ebc44846b0 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 5 Jan 2024 19:28:40 +0800 Subject: [PATCH 2/2] fix:[TD-28155] set consumer lost if all current topic is 0 --- source/dnode/mnode/impl/src/mndConsumer.c | 24 +++++++++++++--------- source/dnode/mnode/impl/src/mndSubscribe.c | 4 ++-- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index a365a44d56..4db000287c 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -749,6 +749,18 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { return 0; } +static void updateConsumerStatus(SMqConsumerObj *pConsumer) { + int32_t status = pConsumer->status; + + if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { + if (status == MQ_CONSUMER_STATUS_REBALANCE) { + pConsumer->status = MQ_CONSUMER_STATUS_READY; + } else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) { + pConsumer->status = MQ_CONSUMER_STATUS_LOST; + } + } +} + // remove from new topic static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { int32_t size = taosArrayGetSize(pConsumer->rebNewTopics); @@ -869,11 +881,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, // set status int32_t status = pOldConsumer->status; - if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && - taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0 && - taosArrayGetSize(pOldConsumer->currentTopics) != 0){ - pOldConsumer->status = MQ_CONSUMER_STATUS_READY; - } + updateConsumerStatus(pOldConsumer); // the re-balance is triggered when the new consumer is launched. pOldConsumer->rebalanceTime = taosGetTimestampMs(); @@ -897,11 +905,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, // set status int32_t status = pOldConsumer->status; - if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && - taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0 && - taosArrayGetSize(pOldConsumer->currentTopics) == 0){ - pOldConsumer->status = MQ_CONSUMER_STATUS_LOST; - } + updateConsumerStatus(pOldConsumer); pOldConsumer->rebalanceTime = taosGetTimestampMs(); atomic_add_fetch_32(&pOldConsumer->epoch, 1); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 16ae7f2548..62b671a12f 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -737,8 +737,6 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) { SMqConsumerObj *pConsumer; void *pIter = NULL; - mInfo("start to process mq timer"); - // iterate all consumers, find all modification while (1) { pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); @@ -852,6 +850,8 @@ void mndRebCntDec() { static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { int code = 0; + mInfo("start to process mq timer"); + if (!mndRebTryStart()) { mInfo("mq rebalance already in progress, do nothing"); return code;