Merge pull request #24342 from taosdata/fix/TD-28155
fix:[TD-28155] set consumer lost if all current topic is 0
This commit is contained in:
commit
f7ab55fb71
|
@ -755,7 +755,7 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
|
||||||
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
|
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
|
||||||
if (status == MQ_CONSUMER_STATUS_REBALANCE) {
|
if (status == MQ_CONSUMER_STATUS_REBALANCE) {
|
||||||
pConsumer->status = MQ_CONSUMER_STATUS_READY;
|
pConsumer->status = MQ_CONSUMER_STATUS_READY;
|
||||||
} else if (status == MQ_CONSUMER_STATUS_READY) {
|
} else if (status == MQ_CONSUMER_STATUS_READY && taosArrayGetSize(pConsumer->currentTopics) == 0) {
|
||||||
pConsumer->status = MQ_CONSUMER_STATUS_LOST;
|
pConsumer->status = MQ_CONSUMER_STATUS_LOST;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -737,8 +737,6 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) {
|
||||||
SMqConsumerObj *pConsumer;
|
SMqConsumerObj *pConsumer;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
|
||||||
mInfo("start to process mq timer");
|
|
||||||
|
|
||||||
// iterate all consumers, find all modification
|
// iterate all consumers, find all modification
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||||
|
@ -852,6 +850,8 @@ void mndRebCntDec() {
|
||||||
|
|
||||||
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
mInfo("start to process mq timer");
|
||||||
|
|
||||||
if (!mndRebTryStart()) {
|
if (!mndRebTryStart()) {
|
||||||
mInfo("mq rebalance already in progress, do nothing");
|
mInfo("mq rebalance already in progress, do nothing");
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -1119,7 +1119,10 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
|
||||||
|
|
||||||
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
|
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
|
||||||
if (pAction->msgSent) return 0;
|
if (pAction->msgSent) return 0;
|
||||||
if (mndCannotExecuteTransAction(pMnode, topHalf)) return TSDB_CODE_MND_TRANS_CTX_SWITCH;
|
if (mndCannotExecuteTransAction(pMnode, topHalf)) {
|
||||||
|
terrno = TSDB_CODE_MND_TRANS_CTX_SWITCH;
|
||||||
|
return TSDB_CODE_MND_TRANS_CTX_SWITCH;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t signature = pTrans->id;
|
int64_t signature = pTrans->id;
|
||||||
signature = (signature << 32);
|
signature = (signature << 32);
|
||||||
|
|
Loading…
Reference in New Issue