Merge pull request #20938 from taosdata/fix/TD-22671

opti:the logic of mndDoRebalance for clear
This commit is contained in:
Haojun Liao 2023-04-18 09:30:17 +08:00 committed by GitHub
commit 739dc5a7a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 77 additions and 128 deletions

View File

@ -2071,7 +2071,6 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq
typedef struct { typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN];
SArray* lostConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t> SArray* removedConsumers; // SArray<int64_t>
SArray* newConsumers; // SArray<int64_t> SArray* newConsumers; // SArray<int64_t>
} SMqRebInfo; } SMqRebInfo;
@ -2082,10 +2081,6 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
return NULL; return NULL;
} }
tstrncpy(pRebInfo->key, key, TSDB_SUBSCRIBE_KEY_LEN); tstrncpy(pRebInfo->key, key, TSDB_SUBSCRIBE_KEY_LEN);
pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebInfo->lostConsumers == NULL) {
goto _err;
}
pRebInfo->removedConsumers = taosArrayInit(0, sizeof(int64_t)); pRebInfo->removedConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebInfo->removedConsumers == NULL) { if (pRebInfo->removedConsumers == NULL) {
goto _err; goto _err;
@ -2096,7 +2091,6 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
} }
return pRebInfo; return pRebInfo;
_err: _err:
taosArrayDestroy(pRebInfo->lostConsumers);
taosArrayDestroy(pRebInfo->removedConsumers); taosArrayDestroy(pRebInfo->removedConsumers);
taosArrayDestroy(pRebInfo->newConsumers); taosArrayDestroy(pRebInfo->newConsumers);
taosMemoryFreeClear(pRebInfo); taosMemoryFreeClear(pRebInfo);

View File

@ -137,12 +137,12 @@ typedef enum {
} EDndReason; } EDndReason;
typedef enum { typedef enum {
CONSUMER_UPDATE__TOUCH = 1, CONSUMER_UPDATE__TOUCH = 1, // rebalance req do not need change consume topic
CONSUMER_UPDATE__ADD, CONSUMER_UPDATE__ADD,
CONSUMER_UPDATE__REMOVE, CONSUMER_UPDATE__REMOVE,
CONSUMER_UPDATE__LOST, CONSUMER_UPDATE__LOST,
CONSUMER_UPDATE__RECOVER, CONSUMER_UPDATE__RECOVER,
CONSUMER_UPDATE__MODIFY, CONSUMER_UPDATE__MODIFY, // subscribe req need change consume topic
} ECsmUpdateType; } ECsmUpdateType;
typedef struct { typedef struct {
@ -624,7 +624,7 @@ typedef struct {
SArray* rebVgs; // SArray<SMqRebOutputVg> SArray* rebVgs; // SArray<SMqRebOutputVg>
SArray* newConsumers; // SArray<int64_t> SArray* newConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t> SArray* removedConsumers; // SArray<int64_t>
SArray* touchedConsumers; // SArray<int64_t> SArray* modifyConsumers; // SArray<int64_t>
SMqSubscribeObj* pSub; SMqSubscribeObj* pSub;
SMqSubActionLogEntry* pLogEntry; SMqSubActionLogEntry* pLogEntry;
} SMqRebOutputObj; } SMqRebOutputObj;

View File

@ -247,7 +247,6 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
static void freeRebalanceItem(void *param) { static void freeRebalanceItem(void *param) {
SMqRebInfo *pInfo = param; SMqRebInfo *pInfo = param;
taosArrayDestroy(pInfo->lostConsumers);
taosArrayDestroy(pInfo->newConsumers); taosArrayDestroy(pInfo->newConsumers);
taosArrayDestroy(pInfo->removedConsumers); taosArrayDestroy(pInfo->removedConsumers);
} }
@ -335,7 +334,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
} }
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
} else if (status == MQ_CONSUMER_STATUS__MODIFY) { } else {
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics); int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
@ -356,8 +355,6 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
} }
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
} else {
// do nothing
} }
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
@ -917,34 +914,22 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
taosWLockLatch(&pOldConsumer->lock); taosWLockLatch(&pOldConsumer->lock);
if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) { if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/ SArray *tmp = pOldConsumer->rebNewTopics;
/*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/ pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
pNewConsumer->rebNewTopics = tmp;
// this new consumer has identical topics with one existed consumers. tmp = pOldConsumer->rebRemovedTopics;
if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) { pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
pOldConsumer->status = MQ_CONSUMER_STATUS__READY; pNewConsumer->rebRemovedTopics = tmp;
} else {
SArray *tmp = pOldConsumer->rebNewTopics;
pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
pNewConsumer->rebNewTopics = tmp;
tmp = pOldConsumer->rebRemovedTopics; tmp = pOldConsumer->assignedTopics;
pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics; pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
pNewConsumer->rebRemovedTopics = tmp; pNewConsumer->assignedTopics = tmp;
tmp = pOldConsumer->assignedTopics; pOldConsumer->subscribeTime = pNewConsumer->upTime;
pOldConsumer->assignedTopics = pNewConsumer->assignedTopics; pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
pNewConsumer->assignedTopics = tmp;
pOldConsumer->subscribeTime = pNewConsumer->upTime;
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
}
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
/*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
/*pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));*/
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i)); char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
taosArrayPush(pOldConsumer->rebRemovedTopics, &topic); taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
@ -958,9 +943,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status), pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status),
pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) {
/*A(taosArrayGetSize(pOldConsumer->currentTopics) == 0);*/
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i)); char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
@ -976,7 +958,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->rebalanceTime = pNewConsumer->upTime; pOldConsumer->rebalanceTime = pNewConsumer->upTime;
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);
char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
// not exist in current topic // not exist in current topic
@ -1015,15 +996,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
/*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/
/*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0); char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
#if 0
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
A(strcmp(topic, removedTopic) != 0);
}
#endif
// remove from removed topic // remove from removed topic
removeFromRemoveTopicList(pOldConsumer, removedTopic); removeFromRemoveTopicList(pOldConsumer, removedTopic);

View File

@ -269,7 +269,7 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
}; };
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg)); taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", pSubKey, pVgEp->vgId); mInfo("sub:%s mq re-balance addUnassignedVgroups vgId:%d from unassigned", pSubKey, pVgEp->vgId);
} }
} }
@ -289,9 +289,9 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
// all old consumers still existing are touched // all old consumers still existing need to be modified
// TODO optimize: touch only consumer whose vgs changed // TODO optimize: modify only consumer whose vgs changed
taosArrayPush(pOutput->touchedConsumers, &pConsumerEp->consumerId); taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId);
if (consumerVgNum > minVgCnt) { if (consumerVgNum > minVgCnt) {
if (imbCnt < imbConsumerNum) { if (imbCnt < imbConsumerNum) {
if (consumerVgNum == minVgCnt + 1) { if (consumerVgNum == minVgCnt + 1) {
@ -339,13 +339,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum, mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum,
pInput->oldConsumerNum, numOfAdded, numOfRemoved); pInput->oldConsumerNum, numOfAdded, numOfRemoved);
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg // 1. build temporary hash(vgId -> SMqRebOutputVg) to store vg that need to be assigned
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
// 2. check and get actual removed consumers, put their vg into hash // 2. check and get actual removed consumers, put their vg into pHash
doRemoveExistedConsumers(pOutput, pHash, pInput); doRemoveExistedConsumers(pOutput, pHash, pInput);
// 3. if previously no consumer, there are vgs not assigned // 3. if previously no consumer, there are vgs not assigned, put these vg into pHash
addUnassignedVgroups(pOutput, pHash); addUnassignedVgroups(pOutput, pHash);
// 4. calc vg number of each consumer // 4. calc vg number of each consumer
@ -364,19 +364,17 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
mInfo("sub:%s no consumer subscribe this topic", pSubKey); mInfo("sub:%s no consumer subscribe this topic", pSubKey);
} }
// 5. first scan: remove vgroups from te consumers, who have more vgroups than the threashold value that is // 5. remove vgroups from consumers who have more vgroups than the threshold value(minVgCnt or minVgCnt + 1), and then another vg into pHash
// minVgCnt, and then put them into the recycled hash list
transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum); transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum);
// 6. add new consumer into sub // 6. add new consumer into sub
doAddNewConsumers(pOutput, pInput); doAddNewConsumers(pOutput, pInput);
// 7. second scan: find consumer do not have enough vgroups, extract from temporary hash and assign to them
// All related vg should be put into rebVgs
SMqRebOutputVg *pRebVg = NULL; SMqRebOutputVg *pRebVg = NULL;
void *pRemovedIter = NULL; void *pRemovedIter = NULL;
void *pIter = NULL; void *pIter = NULL;
// 7. extract bgroups from pHash and assign to consumers that do not have enough vgroups
while (1) { while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) { if (pIter == NULL) {
@ -390,68 +388,52 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
// iter hash and find one vg // iter hash and find one vg
pRemovedIter = taosHashIterate(pHash, pRemovedIter); pRemovedIter = taosHashIterate(pHash, pRemovedIter);
if (pRemovedIter == NULL) { if (pRemovedIter == NULL) {
mError("sub:%s removed iter is null", pSubKey); mError("sub:%s removed iter is null, never can reach hear", pSubKey);
break; break;
} }
pRebVg = (SMqRebOutputVg *)pRemovedIter; pRebVg = (SMqRebOutputVg *)pRemovedIter;
// push
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId; pRebVg->newConsumerId = pConsumerEp->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg); taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId, mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
pConsumerEp->consumerId);
} }
} }
// 7. handle unassigned vg while (1) {
if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
// if has consumer, assign all left vg if (pIter == NULL) {
while (1) { break;
SMqConsumerEp *pConsumerEp = NULL; }
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
pRemovedIter = taosHashIterate(pHash, pRemovedIter); pRemovedIter = taosHashIterate(pHash, pRemovedIter);
if (pRemovedIter == NULL) { if (pRemovedIter == NULL) {
if (pIter != NULL) { mInfo("sub:%s removed iter is null", pSubKey);
taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
pIter = NULL;
}
break; break;
} }
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
pConsumerEp = (SMqConsumerEp *)pIter;
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
break;
}
}
pRebVg = (SMqRebOutputVg *)pRemovedIter; pRebVg = (SMqRebOutputVg *)pRemovedIter;
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId; pRebVg->newConsumerId = pConsumerEp->consumerId;
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) { taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
mInfo("mq rebalance: skip vg %d for same consumer:0x%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId, mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
pConsumerEp->consumerId);
continue;
}
taosArrayPush(pOutput->rebVgs, pRebVg);
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId);
} }
} else { }
// if all consumer is removed, put all vg into unassigned
pIter = NULL;
SMqRebOutputVg *pRebOutput = NULL;
while (1) {
pIter = taosHashIterate(pHash, pIter);
if (pIter == NULL) {
break;
}
pRebOutput = (SMqRebOutputVg *)pIter; // All assigned vg should be put into pOutput->rebVgs
if(pRemovedIter != NULL){
mError("sub:%s error pRemovedIter should be NULL", pSubKey);
}
while (1) {
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
if (pRemovedIter == NULL) {
break;
}
SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter;
taosArrayPush(pOutput->rebVgs, pRebOutput);
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed, put all vg into unassigned
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
taosArrayPush(pOutput->rebVgs, pRebOutput);
mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", pSubKey, pRebOutput->pVgEp->vgId);
} }
} }
@ -462,19 +444,18 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey, mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey,
pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId); pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
} }
{
pIter = NULL; pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
int32_t sz = taosArrayGetSize(pConsumerEp->vgs); int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz); mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId, mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
pConsumerEp->consumerId); pConsumerEp->consumerId);
}
} }
} }
@ -515,9 +496,9 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
// 3. commit log: consumer to update status and epoch // 3. commit log: consumer to update status and epoch
// 3.1 set touched consumer // 3.1 set touched consumer
int32_t consumerNum = taosArrayGetSize(pOutput->touchedConsumers); int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers);
for (int32_t i = 0; i < consumerNum; i++) { for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->touchedConsumers, i); int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i);
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH; pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH;
@ -597,15 +578,15 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqDoRebalanceMsg *pReq = pMsg->pCont; SMqDoRebalanceMsg *pReq = pMsg->pCont;
void *pIter = NULL; void *pIter = NULL;
bool rebalanceOnce = false; // to ensure only once. // bool rebalanceOnce = false; // to ensure only once.
mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash)); mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));
// here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction. // here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
while (1) { while (1) {
if (rebalanceOnce) { // if (rebalanceOnce) {
break; // break;
} // }
pIter = taosHashIterate(pReq->rebSubHash, pIter); pIter = taosHashIterate(pReq->rebSubHash, pIter);
if (pIter == NULL) { if (pIter == NULL) {
@ -617,7 +598,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SMqRebOutputObj rebOutput = {0}; SMqRebOutputObj rebOutput = {0};
rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t));
rebOutput.touchedConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t));
rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter;
@ -653,13 +634,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
rebInput.oldConsumerNum = 0; rebInput.oldConsumerNum = 0;
mInfo("topic:%s has no consumers sub yet", topic); mInfo("sub topic:%s has no consumers sub yet", pRebInfo->key);
} else { } else {
taosRLockLatch(&pSub->lock); taosRLockLatch(&pSub->lock);
rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash); rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
rebOutput.pSub = tCloneSubscribeObj(pSub); rebOutput.pSub = tCloneSubscribeObj(pSub);
taosRUnLockLatch(&pSub->lock); taosRUnLockLatch(&pSub->lock);
mInfo("topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum); mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
} }
@ -675,13 +656,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
} }
taosArrayDestroy(rebOutput.newConsumers); taosArrayDestroy(rebOutput.newConsumers);
taosArrayDestroy(rebOutput.touchedConsumers); taosArrayDestroy(rebOutput.modifyConsumers);
taosArrayDestroy(rebOutput.removedConsumers); taosArrayDestroy(rebOutput.removedConsumers);
taosArrayDestroy(rebOutput.rebVgs); taosArrayDestroy(rebOutput.rebVgs);
tDeleteSubscribeObj(rebOutput.pSub); tDeleteSubscribeObj(rebOutput.pSub);
taosMemoryFree(rebOutput.pSub); taosMemoryFree(rebOutput.pSub);
rebalanceOnce = true; // taosSsleep(100);
// rebalanceOnce = true;
} }
// reset flag // reset flag

View File

@ -196,13 +196,13 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SSdbRow *pOldRow = *ppOldRow; SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status; pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "update"); sdbPrintOper(pSdb, pOldRow, "update");
sdbUnLock(pSdb, type);
int32_t code = 0; int32_t code = 0;
SdbUpdateFp updateFp = pSdb->updateFps[type]; SdbUpdateFp updateFp = pSdb->updateFps[type];
if (updateFp != NULL) { if (updateFp != NULL) {
code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj); code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
} }
sdbUnLock(pSdb, type);
// sdbUnLock(pSdb, type); // sdbUnLock(pSdb, type);
sdbFreeRow(pSdb, pNewRow, false); sdbFreeRow(pSdb, pNewRow, false);

View File

@ -226,7 +226,7 @@ class TDTestCase:
event.wait() event.wait()
tdLog.info("start consume processor") tdLog.info("start consume processor")
pollDelay = 100 pollDelay = 20
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)