fix:[TD-28155] set consumer lost if all current topic is 0
This commit is contained in:
parent
13834802ef
commit
bd64a78094
|
@ -749,18 +749,6 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
|
|
||||||
int32_t status = pConsumer->status;
|
|
||||||
|
|
||||||
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
|
|
||||||
if (status == MQ_CONSUMER_STATUS_REBALANCE) {
|
|
||||||
pConsumer->status = MQ_CONSUMER_STATUS_READY;
|
|
||||||
} else if (status == MQ_CONSUMER_STATUS_READY) {
|
|
||||||
pConsumer->status = MQ_CONSUMER_STATUS_LOST;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove from new topic
|
// remove from new topic
|
||||||
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
|
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
|
||||||
int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
|
int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
|
||||||
|
@ -881,7 +869,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
|
|
||||||
// set status
|
// set status
|
||||||
int32_t status = pOldConsumer->status;
|
int32_t status = pOldConsumer->status;
|
||||||
updateConsumerStatus(pOldConsumer);
|
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 &&
|
||||||
|
taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0 &&
|
||||||
|
taosArrayGetSize(pOldConsumer->currentTopics) != 0){
|
||||||
|
pOldConsumer->status = MQ_CONSUMER_STATUS_READY;
|
||||||
|
}
|
||||||
|
|
||||||
// the re-balance is triggered when the new consumer is launched.
|
// the re-balance is triggered when the new consumer is launched.
|
||||||
pOldConsumer->rebalanceTime = taosGetTimestampMs();
|
pOldConsumer->rebalanceTime = taosGetTimestampMs();
|
||||||
|
@ -905,7 +897,11 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
|
|
||||||
// set status
|
// set status
|
||||||
int32_t status = pOldConsumer->status;
|
int32_t status = pOldConsumer->status;
|
||||||
updateConsumerStatus(pOldConsumer);
|
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 &&
|
||||||
|
taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0 &&
|
||||||
|
taosArrayGetSize(pOldConsumer->currentTopics) == 0){
|
||||||
|
pOldConsumer->status = MQ_CONSUMER_STATUS_LOST;
|
||||||
|
}
|
||||||
|
|
||||||
pOldConsumer->rebalanceTime = taosGetTimestampMs();
|
pOldConsumer->rebalanceTime = taosGetTimestampMs();
|
||||||
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||||
|
|
|
@ -1119,7 +1119,10 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
|
||||||
|
|
||||||
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
|
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
|
||||||
if (pAction->msgSent) return 0;
|
if (pAction->msgSent) return 0;
|
||||||
if (mndCannotExecuteTransAction(pMnode, topHalf)) return TSDB_CODE_MND_TRANS_CTX_SWITCH;
|
if (mndCannotExecuteTransAction(pMnode, topHalf)) {
|
||||||
|
terrno = TSDB_CODE_MND_TRANS_CTX_SWITCH;
|
||||||
|
return TSDB_CODE_MND_TRANS_CTX_SWITCH;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t signature = pTrans->id;
|
int64_t signature = pTrans->id;
|
||||||
signature = (signature << 32);
|
signature = (signature << 32);
|
||||||
|
|
Loading…
Reference in New Issue