fix(query): allow only one trans to be execute for each balance, and do some other refactor.
This commit is contained in:
parent
35de37926e
commit
e92bfa558d
|
@ -845,6 +845,53 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void updateConsumerStatus(SMqConsumerObj* pConsumer) {
|
||||
int32_t status = pConsumer->status;
|
||||
|
||||
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
|
||||
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
||||
pConsumer->status = MQ_CONSUMER_STATUS__READY;
|
||||
} else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
|
||||
pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
|
||||
}
|
||||
} else {
|
||||
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
||||
pConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
|
||||
} else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
|
||||
pConsumer->status = MQ_CONSUMER_STATUS__LOST;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// remove from new topic
|
||||
static void removeFromNewTopicList(SMqConsumerObj* pConsumer, const char* pTopic) {
|
||||
int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pConsumer->rebNewTopics); i++) {
|
||||
char *p = taosArrayGetP(pConsumer->rebNewTopics, i);
|
||||
if (strcmp(pTopic, p) == 0) {
|
||||
taosArrayRemove(pConsumer->rebNewTopics, i);
|
||||
taosMemoryFree(p);
|
||||
|
||||
mDebug("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId,
|
||||
pTopic, (int) taosArrayGetSize(pConsumer->rebNewTopics));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// remove from removed topic
|
||||
static void removeFromRemoveTopicList(SMqConsumerObj* pConsumer, const char* pTopic) {
|
||||
int32_t size = taosArrayGetSize(pConsumer->rebRemovedTopics);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
char *p = taosArrayGetP(pConsumer->rebRemovedTopics, i);
|
||||
if (strcmp(pTopic, p) == 0) {
|
||||
taosArrayRemove(pConsumer->rebRemovedTopics, i);
|
||||
taosMemoryFree(p);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
|
||||
mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64,
|
||||
pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime);
|
||||
|
@ -855,6 +902,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
|
||||
/*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
|
||||
|
||||
// this new consumer has identical topics with one existed consumers.
|
||||
if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
|
||||
} else {
|
||||
|
@ -871,7 +919,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
pNewConsumer->assignedTopics = tmp;
|
||||
|
||||
pOldConsumer->subscribeTime = pNewConsumer->upTime;
|
||||
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
|
||||
}
|
||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
|
||||
|
@ -911,71 +958,48 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
||||
|
||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
|
||||
/*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);*/
|
||||
/*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);*/
|
||||
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);
|
||||
char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
|
||||
|
||||
char *addedTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
|
||||
// not exist in current topic
|
||||
|
||||
bool existing = false;
|
||||
#if 1
|
||||
bool existing = false;
|
||||
int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics);
|
||||
for (int32_t i = 0; i < numOfExistedTopics; i++) {
|
||||
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
|
||||
if (strcmp(topic, addedTopic) == 0) {
|
||||
if (strcmp(topic, pNewTopic) == 0) {
|
||||
existing = true;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
// remove from new topic
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
|
||||
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
|
||||
if (strcmp(addedTopic, topic) == 0) {
|
||||
taosArrayRemove(pOldConsumer->rebNewTopics, i);
|
||||
taosMemoryFree(topic);
|
||||
break;
|
||||
}
|
||||
}
|
||||
removeFromNewTopicList(pOldConsumer, pNewTopic);
|
||||
|
||||
// add to current topic
|
||||
if (!existing) {
|
||||
taosArrayPush(pOldConsumer->currentTopics, &addedTopic);
|
||||
taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
|
||||
taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
|
||||
} else {
|
||||
taosMemoryFree(addedTopic);
|
||||
taosMemoryFree(pNewTopic);
|
||||
}
|
||||
|
||||
// set status
|
||||
int32_t status = pOldConsumer->status;
|
||||
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
|
||||
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
|
||||
} else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
|
||||
}
|
||||
} else {
|
||||
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
|
||||
} else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
|
||||
}
|
||||
}
|
||||
updateConsumerStatus(pOldConsumer);
|
||||
|
||||
// the re-balance is triggered when the new consumer is launched.
|
||||
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
||||
|
||||
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||
mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
|
||||
mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
|
||||
", current topics:%d, newTopics:%d, removeTopics:%d",
|
||||
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
|
||||
mndConsumerStatusName(pOldConsumer->status),
|
||||
pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
|
||||
mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
|
||||
(int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
|
||||
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
|
||||
|
||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
|
||||
/*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/
|
||||
/*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/
|
||||
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
|
||||
|
||||
// not exist in new topic
|
||||
#if 0
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
|
||||
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
|
||||
|
@ -984,14 +1008,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
#endif
|
||||
|
||||
// remove from removed topic
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebRemovedTopics); i++) {
|
||||
char *topic = taosArrayGetP(pOldConsumer->rebRemovedTopics, i);
|
||||
if (strcmp(removedTopic, topic) == 0) {
|
||||
taosArrayRemove(pOldConsumer->rebRemovedTopics, i);
|
||||
taosMemoryFree(topic);
|
||||
break;
|
||||
}
|
||||
}
|
||||
removeFromRemoveTopicList(pOldConsumer, removedTopic);
|
||||
|
||||
// remove from current topic
|
||||
int32_t i = 0;
|
||||
|
@ -1004,32 +1021,20 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
break;
|
||||
}
|
||||
}
|
||||
// must find the topic
|
||||
/*A(i < sz);*/
|
||||
|
||||
// set status
|
||||
int32_t status = pOldConsumer->status;
|
||||
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
|
||||
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
|
||||
} else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
|
||||
}
|
||||
} else {
|
||||
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
|
||||
} else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
|
||||
}
|
||||
}
|
||||
updateConsumerStatus(pOldConsumer);
|
||||
|
||||
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
||||
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||
|
||||
mDebug("consumer:0x%" PRIx64 " state %d(%s) -> %d(%s), new epoch:%d, reb-time:%" PRId64 ", current topics:%d",
|
||||
mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
|
||||
", current topics:%d, newTopics:%d, removeTopics:%d",
|
||||
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
|
||||
mndConsumerStatusName(pOldConsumer->status),
|
||||
pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics));
|
||||
mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
|
||||
(int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
|
||||
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pOldConsumer->lock);
|
||||
|
|
|
@ -197,24 +197,20 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
|||
return pRebSub;
|
||||
}
|
||||
|
||||
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
|
||||
int32_t totalVgNum = pOutput->pSub->vgNum;
|
||||
const char *sub = pOutput->pSub->key;
|
||||
mInfo("sub:%s mq re-balance %d vgroups", sub, pOutput->pSub->vgNum);
|
||||
static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
|
||||
int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
|
||||
const char *pSubKey = pOutput->pSub->key;
|
||||
|
||||
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
|
||||
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
|
||||
// 2. check and get actual removed consumers, put their vg into hash
|
||||
int32_t removedNum = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
|
||||
int32_t actualRemoved = 0;
|
||||
for (int32_t i = 0; i < removedNum; i++) {
|
||||
for (int32_t i = 0; i < numOfRemoved; i++) {
|
||||
uint64_t consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
|
||||
|
||||
SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||
|
||||
// consumer exists till now
|
||||
if (pConsumerEp) {
|
||||
actualRemoved++;
|
||||
|
||||
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||
for (int32_t j = 0; j < consumerVgNum; j++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
||||
|
@ -223,52 +219,66 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
.newConsumerId = -1,
|
||||
.pVgEp = pVgEp,
|
||||
};
|
||||
|
||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, sub, pVgEp->vgId, consumerId);
|
||||
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pConsumerEp->vgs);
|
||||
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||
|
||||
// put into removed
|
||||
taosArrayPush(pOutput->removedConsumers, &consumerId);
|
||||
}
|
||||
}
|
||||
|
||||
if (removedNum != actualRemoved) {
|
||||
mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", sub, removedNum, actualRemoved);
|
||||
if (numOfRemoved != actualRemoved) {
|
||||
mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", pSubKey, numOfRemoved, actualRemoved);
|
||||
} else {
|
||||
mInfo("sub:%s removed %d consumers", pSubKey, numOfRemoved);
|
||||
}
|
||||
}
|
||||
|
||||
// if previously no consumer, there are vgs not assigned
|
||||
{
|
||||
int32_t consumerVgNum = taosArrayGetSize(pOutput->pSub->unassignedVgs);
|
||||
for (int32_t i = 0; i < consumerVgNum; i++) {
|
||||
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs);
|
||||
SMqRebOutputVg rebOutput = {
|
||||
.oldConsumerId = -1,
|
||||
.newConsumerId = -1,
|
||||
.pVgEp = pVgEp,
|
||||
};
|
||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
|
||||
mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", sub, pVgEp->vgId);
|
||||
}
|
||||
static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
|
||||
int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);
|
||||
const char *pSubKey = pOutput->pSub->key;
|
||||
|
||||
for (int32_t i = 0; i < numOfNewConsumers; i++) {
|
||||
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
|
||||
|
||||
SMqConsumerEp newConsumerEp;
|
||||
newConsumerEp.consumerId = consumerId;
|
||||
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
||||
|
||||
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
||||
taosArrayPush(pOutput->newConsumers, &consumerId);
|
||||
mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId);
|
||||
}
|
||||
}
|
||||
|
||||
// 3. calc vg number of each consumer
|
||||
int32_t afterRebConsumerNum = pInput->oldConsumerNum + taosArrayGetSize(pInput->pRebInfo->newConsumers) -
|
||||
taosArrayGetSize(pInput->pRebInfo->removedConsumers);
|
||||
int32_t minVgCnt = 0;
|
||||
int32_t imbConsumerNum = 0;
|
||||
// calc num
|
||||
if (afterRebConsumerNum) {
|
||||
minVgCnt = totalVgNum / afterRebConsumerNum;
|
||||
imbConsumerNum = totalVgNum % afterRebConsumerNum;
|
||||
static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj* pHash) {
|
||||
const char *pSubKey = pOutput->pSub->key;
|
||||
int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs);
|
||||
|
||||
for (int32_t i = 0; i < numOfVgroups; i++) {
|
||||
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs);
|
||||
SMqRebOutputVg rebOutput = {
|
||||
.oldConsumerId = -1,
|
||||
.newConsumerId = -1,
|
||||
.pVgEp = pVgEp,
|
||||
};
|
||||
|
||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
|
||||
mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", pSubKey, pVgEp->vgId);
|
||||
}
|
||||
}
|
||||
|
||||
mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has more vgs", sub,
|
||||
afterRebConsumerNum, minVgCnt, imbConsumerNum);
|
||||
static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj* pHash, int32_t minVgCnt, int32_t imbConsumerNum) {
|
||||
const char *pSubKey = pOutput->pSub->key;
|
||||
|
||||
// 4. first scan: remove consumer more than wanted, put to remove hash
|
||||
int32_t imbCnt = 0;
|
||||
void *pIter = NULL;
|
||||
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) {
|
||||
|
@ -276,8 +286,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
}
|
||||
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||
|
||||
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||
// all old consumers still existing are touched
|
||||
// TODO optimize: touch only consumer whose vgs changed
|
||||
taosArrayPush(pOutput->touchedConsumers, &pConsumerEp->consumerId);
|
||||
|
@ -296,13 +306,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
.pVgEp = pVgEp,
|
||||
};
|
||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", sub, pVgEp->vgId,
|
||||
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
imbCnt++;
|
||||
}
|
||||
} else {
|
||||
// pop until equal minVg
|
||||
// all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg
|
||||
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
|
||||
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
|
||||
SMqRebOutputVg outputVg = {
|
||||
|
@ -311,36 +321,66 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
.pVgEp = pVgEp,
|
||||
};
|
||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", sub, pVgEp->vgId,
|
||||
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 5. add new consumer into sub
|
||||
{
|
||||
int32_t consumerNum = taosArrayGetSize(pInput->pRebInfo->newConsumers);
|
||||
for (int32_t i = 0; i < consumerNum; i++) {
|
||||
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
|
||||
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
|
||||
int32_t totalVgNum = pOutput->pSub->vgNum;
|
||||
const char *pSubKey = pOutput->pSub->key;
|
||||
|
||||
SMqConsumerEp newConsumerEp;
|
||||
newConsumerEp.consumerId = consumerId;
|
||||
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
||||
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
||||
taosArrayPush(pOutput->newConsumers, &consumerId);
|
||||
mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, sub, consumerId);
|
||||
}
|
||||
int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
|
||||
int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
|
||||
mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum,
|
||||
pInput->oldConsumerNum, numOfAdded, numOfRemoved);
|
||||
|
||||
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
|
||||
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
|
||||
// 2. check and get actual removed consumers, put their vg into hash
|
||||
doRemoveExistedConsumers(pOutput, pHash, pInput);
|
||||
|
||||
// 3. if previously no consumer, there are vgs not assigned
|
||||
addUnassignedVgroups(pOutput, pHash);
|
||||
|
||||
// 4. calc vg number of each consumer
|
||||
int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
|
||||
|
||||
int32_t minVgCnt = 0;
|
||||
int32_t imbConsumerNum = 0;
|
||||
|
||||
// calc num
|
||||
if (numOfFinal) {
|
||||
minVgCnt = totalVgNum / numOfFinal;
|
||||
imbConsumerNum = totalVgNum % numOfFinal;
|
||||
}
|
||||
|
||||
// 6. second scan: find consumer do not have enough vg, extract from temporary hash and assign to new consumer.
|
||||
mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has 1 more vgroups than avg value",
|
||||
pSubKey, numOfFinal, minVgCnt, imbConsumerNum);
|
||||
|
||||
// 5. first scan: remove vgroups from te consumers, who have more vgroups than the threashold value that is
|
||||
// minVgCnt, and then put them into the recycled hash list
|
||||
transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum);
|
||||
|
||||
// 6. add new consumer into sub
|
||||
doAddNewConsumers(pOutput, pInput);
|
||||
|
||||
// 7. second scan: find consumer do not have enough vgroups, extract from temporary hash and assign to them
|
||||
// All related vg should be put into rebVgs
|
||||
SMqRebOutputVg *pRebVg = NULL;
|
||||
void *pRemovedIter = NULL;
|
||||
pIter = NULL;
|
||||
void *pIter = NULL;
|
||||
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
|
||||
// push until equal minVg
|
||||
|
@ -348,8 +388,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
// iter hash and find one vg
|
||||
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
||||
if (pRemovedIter == NULL) {
|
||||
mError("sub:%s removed iter is null", sub);
|
||||
continue;
|
||||
mError("sub:%s removed iter is null", pSubKey);
|
||||
break;
|
||||
}
|
||||
|
||||
pRebVg = (SMqRebOutputVg *)pRemovedIter;
|
||||
|
@ -409,15 +449,15 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
|
||||
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
|
||||
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
||||
mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", sub, pRebOutput->pVgEp->vgId);
|
||||
mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", pSubKey, pRebOutput->pVgEp->vgId);
|
||||
}
|
||||
}
|
||||
|
||||
// 8. generate logs
|
||||
mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", sub);
|
||||
mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pSubKey);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
|
||||
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
|
||||
mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, sub,
|
||||
mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey,
|
||||
pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
|
||||
}
|
||||
{
|
||||
|
@ -427,10 +467,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
if (pIter == NULL) break;
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
||||
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", sub, pConsumerEp->consumerId, sz);
|
||||
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
||||
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, sub, pVgEp->vgId,
|
||||
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
}
|
||||
|
@ -555,17 +595,24 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
SMnode *pMnode = pMsg->info.node;
|
||||
SMqDoRebalanceMsg *pReq = pMsg->pCont;
|
||||
void *pIter = NULL;
|
||||
bool rebalanceExec = false; // to ensure only once.
|
||||
|
||||
mInfo("mq re-balance start");
|
||||
mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));
|
||||
|
||||
// here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
|
||||
while (1) {
|
||||
if (rebalanceExec) {
|
||||
break;
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pReq->rebSubHash, pIter);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SMqRebInputObj rebInput = {0};
|
||||
taosSsleep(20);
|
||||
|
||||
SMqRebInputObj rebInput = {0};
|
||||
SMqRebOutputObj rebOutput = {0};
|
||||
rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||
rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||
|
@ -582,9 +629,10 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
|
||||
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||
if (pTopic == NULL) {
|
||||
mError("mq re-balance %s ignored since topic %s not exist", pRebInfo->key, topic);
|
||||
mError("mq re-balance %s ignored since topic %s doesn't exist", pRebInfo->key, topic);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -604,11 +652,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
||||
rebInput.oldConsumerNum = 0;
|
||||
mInfo("topic:%s has no consumers sub yet", topic);
|
||||
} else {
|
||||
taosRLockLatch(&pSub->lock);
|
||||
rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
|
||||
rebOutput.pSub = tCloneSubscribeObj(pSub);
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mInfo("topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
}
|
||||
|
||||
|
@ -633,6 +683,8 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
taosArrayDestroy(rebOutput.rebVgs);
|
||||
tDeleteSubscribeObj(rebOutput.pSub);
|
||||
taosMemoryFree(rebOutput.pSub);
|
||||
|
||||
rebalanceExec = true;
|
||||
}
|
||||
|
||||
// reset flag
|
||||
|
|
Loading…
Reference in New Issue