add eager rebalance back
This commit is contained in:
parent
ca97b16071
commit
9cb9cb686a
|
@ -695,11 +695,13 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||||
|
|
||||||
if (taosArrayGetSize(tmq->clientTopics) == 0) {
|
if (taosArrayGetSize(tmq->clientTopics) == 0) {
|
||||||
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
|
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
|
||||||
|
printf("over1\n");
|
||||||
usleep(blocking_time * 1000);
|
usleep(blocking_time * 1000);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
|
||||||
if (taosArrayGetSize(pTopic->vgs) == 0) {
|
if (taosArrayGetSize(pTopic->vgs) == 0) {
|
||||||
|
printf("over2\n");
|
||||||
usleep(blocking_time * 1000);
|
usleep(blocking_time * 1000);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -424,7 +424,9 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
||||||
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
|
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
|
||||||
(vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
|
(vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
|
||||||
(vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) {
|
(vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) {
|
||||||
pRebConsumer->epoch++;
|
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {
|
||||||
|
pRebConsumer->epoch++;
|
||||||
|
}
|
||||||
if (vgThisConsumerAfterRb != 0) {
|
if (vgThisConsumerAfterRb != 0) {
|
||||||
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
|
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1036,7 +1038,6 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
taosArrayPush(pSub->consumers, &mqSubConsumer);
|
taosArrayPush(pSub->consumers, &mqSubConsumer);
|
||||||
|
|
||||||
// if have un assigned vg, assign one to the consumer
|
// if have un assigned vg, assign one to the consumer
|
||||||
#if 0
|
|
||||||
if (taosArrayGetSize(pSub->unassignedVg) > 0) {
|
if (taosArrayGetSize(pSub->unassignedVg) > 0) {
|
||||||
SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
|
SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
|
||||||
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
|
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
|
||||||
|
@ -1047,10 +1048,9 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
} else {
|
} else {
|
||||||
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
|
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
|
||||||
}
|
}
|
||||||
// do not set status active to trigger rebalance
|
// to trigger rebalance at once, do not set status active
|
||||||
/*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/
|
/*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
SSdbRaw *pRaw = mndSubActionEncode(pSub);
|
SSdbRaw *pRaw = mndSubActionEncode(pSub);
|
||||||
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||||
|
|
Loading…
Reference in New Issue