fix:[TD-28155] set consumer lost if all current topic is 0

This commit is contained in:
wangmm0220 2024-01-05 19:28:40 +08:00
parent bd64a78094
commit f2a1d24ab1
2 changed files with 16 additions and 12 deletions

View File

@ -749,6 +749,18 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
return 0; return 0;
} }
static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
int32_t status = pConsumer->status;
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
if (status == MQ_CONSUMER_STATUS_REBALANCE) {
pConsumer->status = MQ_CONSUMER_STATUS_READY;
} else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) {
pConsumer->status = MQ_CONSUMER_STATUS_LOST;
}
}
}
// remove from new topic // remove from new topic
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
int32_t size = taosArrayGetSize(pConsumer->rebNewTopics); int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
@ -869,11 +881,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
// set status // set status
int32_t status = pOldConsumer->status; int32_t status = pOldConsumer->status;
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && updateConsumerStatus(pOldConsumer);
taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0 &&
taosArrayGetSize(pOldConsumer->currentTopics) != 0){
pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
}
// the re-balance is triggered when the new consumer is launched. // the re-balance is triggered when the new consumer is launched.
pOldConsumer->rebalanceTime = taosGetTimestampMs(); pOldConsumer->rebalanceTime = taosGetTimestampMs();
@ -897,11 +905,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
// set status // set status
int32_t status = pOldConsumer->status; int32_t status = pOldConsumer->status;
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && updateConsumerStatus(pOldConsumer);
taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0 &&
taosArrayGetSize(pOldConsumer->currentTopics) == 0){
pOldConsumer->status = MQ_CONSUMER_STATUS_LOST;
}
pOldConsumer->rebalanceTime = taosGetTimestampMs(); pOldConsumer->rebalanceTime = taosGetTimestampMs();
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);

View File

@ -737,8 +737,6 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) {
SMqConsumerObj *pConsumer; SMqConsumerObj *pConsumer;
void *pIter = NULL; void *pIter = NULL;
mInfo("start to process mq timer");
// iterate all consumers, find all modification // iterate all consumers, find all modification
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
@ -852,6 +850,8 @@ void mndRebCntDec() {
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
int code = 0; int code = 0;
mInfo("start to process mq timer");
if (!mndRebTryStart()) { if (!mndRebTryStart()) {
mInfo("mq rebalance already in progress, do nothing"); mInfo("mq rebalance already in progress, do nothing");
return code; return code;