From d0bdb0ce2ba8584cd39e2179638063c6763431bc Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 13 Mar 2024 19:36:00 +0800 Subject: [PATCH] opti:tmq logic --- source/dnode/mnode/impl/src/mndConsumer.c | 89 ++++------------- source/dnode/mnode/impl/src/mndDef.c | 2 +- source/dnode/mnode/impl/src/mndSubscribe.c | 111 ++++++++------------- 3 files changed, 62 insertions(+), 140 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index cdb5e89dd7..3fa2ca379a 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -641,12 +641,6 @@ static SMqConsumerObj* buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscri terrno = code; goto _over; } - // no topics need to be rebalanced - if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && - taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { - goto _over; - } - } return pConsumerNew; @@ -821,48 +815,17 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) { } } -// remove from new topic -static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { - int32_t size = taosArrayGetSize(pConsumer->rebNewTopics); +// remove from topic list +static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) { + int32_t size = taosArrayGetSize(topicList); for (int32_t i = 0; i < size; i++) { - char *p = taosArrayGetP(pConsumer->rebNewTopics, i); + char *p = taosArrayGetP(topicList, i); if (strcmp(pTopic, p) == 0) { - taosArrayRemove(pConsumer->rebNewTopics, i); + taosArrayRemove(topicList, i); taosMemoryFree(p); - mInfo("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId, - pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics)); - break; - } - } -} - -// remove from removed topic -static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { - int32_t size = taosArrayGetSize(pConsumer->rebRemovedTopics); - for (int32_t i = 0; i < size; i++) { - char *p = taosArrayGetP(pConsumer->rebRemovedTopics, i); - if (strcmp(pTopic, p) == 0) { - taosArrayRemove(pConsumer->rebRemovedTopics, i); - taosMemoryFree(p); - - mInfo("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d", - pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics)); - break; - } - } -} - -static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { - int32_t sz = taosArrayGetSize(pConsumer->currentTopics); - for (int32_t i = 0; i < sz; i++) { - char *topic = taosArrayGetP(pConsumer->currentTopics, i); - if (strcmp(pTopic, topic) == 0) { - taosArrayRemove(pConsumer->currentTopics, i); - taosMemoryFree(topic); - - mInfo("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d", - pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics)); + mInfo("[rebalance] consumer:0x%" PRIx64 " remove topic:%s in the %s topic list, remain newTopics:%d", + consumerId, pTopic, type, (int)taosArrayGetSize(topicList)); break; } } @@ -896,46 +859,38 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->subscribeTime = taosGetTimestampMs(); pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; - mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer", pOldConsumer->consumerId); + mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId); } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) { int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i)); taosArrayPush(pOldConsumer->rebNewTopics, &topic); } - pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; - mInfo("consumer:0x%" PRIx64 " timer update, timer recover", pOldConsumer->consumerId); + mInfo("consumer:0x%" PRIx64 " recover update", pOldConsumer->consumerId); } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) { atomic_add_fetch_32(&pOldConsumer->epoch, 1); pOldConsumer->rebalanceTime = taosGetTimestampMs(); - mInfo("consumer:0x%" PRIx64 " reb update, only rebalance time", pOldConsumer->consumerId); + mInfo("[rebalance] consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId); } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) { char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); - - // check if exist in current topic - removeFromNewTopicList(pOldConsumer, pNewTopic); - - // add to current topic + removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new"); bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic); if (existing) { - mError("consumer:0x%" PRIx64 "new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic); + mError("[rebalance] consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic); taosMemoryFree(pNewTopic); - } else { // added into current topic list + } else { taosArrayPush(pOldConsumer->currentTopics, &pNewTopic); taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString); } - // set status int32_t status = pOldConsumer->status; updateConsumerStatus(pOldConsumer); - - // the re-balance is triggered when the new consumer is launched. pOldConsumer->rebalanceTime = taosGetTimestampMs(); - atomic_add_fetch_32(&pOldConsumer->epoch, 1); - mInfo("consumer:0x%" PRIx64 " reb update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 + + mInfo("[rebalance] consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d, newTopics:%d, removeTopics:%d", pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status, mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime, @@ -943,22 +898,16 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) { - char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0); + char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0); + removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove"); + removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current"); - // remove from removed topic - removeFromRemoveTopicList(pOldConsumer, removedTopic); - - // remove from current topic - removeFromCurrentTopicList(pOldConsumer, removedTopic); - - // set status int32_t status = pOldConsumer->status; updateConsumerStatus(pOldConsumer); - pOldConsumer->rebalanceTime = taosGetTimestampMs(); atomic_add_fetch_32(&pOldConsumer->epoch, 1); - mInfo("consumer:0x%" PRIx64 " reb update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 + mInfo("[rebalance]consumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d, newTopics:%d, removeTopics:%d", pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status, mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime, diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 654e9de617..3f69c7def3 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -302,7 +302,7 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t upda pConsumer->assignedTopics = subscribe->topicNames; subscribe->topicNames = NULL; }else if (updateType == CONSUMER_UPDATE_SUB){ - pConsumer->assignedTopics = subscribe->topicNames;; + pConsumer->assignedTopics = subscribe->topicNames; subscribe->topicNames = NULL; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index a48573bbd0..19a39a9c33 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -223,6 +223,14 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { return pRebInfo; } +static void pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char* key){ + SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(vgs); + SMqRebOutputVg outputVg = {consumerId, -1, pVgEp}; + taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); + mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, + key, pVgEp->vgId, consumerId); +} + static void processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) { int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); int32_t actualRemoved = 0; @@ -235,10 +243,7 @@ static void processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, c int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); for (int32_t j = 0; j < consumerVgNum; j++) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); - SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp}; - taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("[rebalance] sub:%s mq re-balance remove vgId:%d from consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId, consumerId); + pushVgDataToHash(pConsumerEp->vgs, pHash, consumerId, pOutput->pSub->key); } taosArrayDestroy(pConsumerEp->vgs); @@ -273,15 +278,7 @@ static void processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj * static void processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs); for (int32_t i = 0; i < numOfVgroups; i++) { - SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs); - SMqRebOutputVg rebOutput = { - .oldConsumerId = -1, - .newConsumerId = -1, - .pVgEp = pVgEp, - }; - - taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg)); - mInfo("[rebalance] sub:%s mq re-balance processUnassignedVgroups vgId:%d from unassigned", pOutput->pSub->key, pVgEp->vgId); + pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key); } } @@ -314,31 +311,13 @@ static void processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId); if (consumerVgNum > minVgCnt) { if (cnt < remainderVgCnt) { - // pop until equal minVg + 1 - while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { - SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); - SMqRebOutputVg outputVg = { - .oldConsumerId = pConsumerEp->consumerId, - .newConsumerId = -1, - .pVgEp = pVgEp, - }; - taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, - pOutput->pSub->key, pVgEp->vgId, pConsumerEp->consumerId); + while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { // pop until equal minVg + 1 + pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key); } cnt++; } else { - // all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) { - SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); - SMqRebOutputVg outputVg = { - .oldConsumerId = pConsumerEp->consumerId, - .newConsumerId = -1, - .pVgEp = pVgEp, - }; - taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, - pOutput->pSub->key, pVgEp->vgId, pConsumerEp->consumerId); + pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key); } } } @@ -500,7 +479,7 @@ static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, con static void assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt){ SMqRebOutputVg *pRebVg = NULL; - void *pRemovedIter = NULL; + void *pAssignIter = NULL; void *pIter = NULL; while (1) { @@ -510,13 +489,13 @@ static void assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t min } SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) { - pRemovedIter = taosHashIterate(pHash, pRemovedIter); - if (pRemovedIter == NULL) { - mError("[rebalance] sub:%s removed iter is null, never can reach hear", pOutput->pSub->key); + pAssignIter = taosHashIterate(pHash, pAssignIter); + if (pAssignIter == NULL) { + mError("[rebalance] sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key); break; } - pRebVg = (SMqRebOutputVg *)pRemovedIter; + pRebVg = (SMqRebOutputVg *)pAssignIter; pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", @@ -531,13 +510,13 @@ static void assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t min } SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { - pRemovedIter = taosHashIterate(pHash, pRemovedIter); - if (pRemovedIter == NULL) { - mInfo("[rebalance] sub:%s removed iter is null", pOutput->pSub->key); + pAssignIter = taosHashIterate(pHash, pAssignIter); + if (pAssignIter == NULL) { + mInfo("[rebalance] sub:%s assign iter is used up", pOutput->pSub->key); break; } - pRebVg = (SMqRebOutputVg *)pRemovedIter; + pRebVg = (SMqRebOutputVg *)pAssignIter; pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", @@ -545,16 +524,17 @@ static void assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t min } } - if(pRemovedIter != NULL){ - mError("[rebalance]sub:%s error pRemovedIter should be NULL", pOutput->pSub->key); + taosHashCancelIterate(pOutput->pSub->consumerHash, pIter); + if(pAssignIter != NULL){ + mError("[rebalance]sub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key); } while (1) { - pRemovedIter = taosHashIterate(pHash, pRemovedIter); - if (pRemovedIter == NULL) { + pAssignIter = taosHashIterate(pHash, pAssignIter); + if (pAssignIter == NULL) { break; } - SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter; + SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pAssignIter; taosArrayPush(pOutput->rebVgs, pRebOutput); if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); // put all vg into unassigned @@ -678,10 +658,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu // 5. execution code = mndTransPrepare(pMnode, pTrans); - if (code != 0) { - mError("failed to prepare trans rebalance since %s", terrstr()); - goto END; - } END: nodesDestroyNode((SNode*)pPlan); @@ -757,24 +733,23 @@ static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) { int32_t status = atomic_load_32(&pConsumer->status); mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", - pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime, - hbStatus); - - if ((taosArrayGetSize(pConsumer->assignedTopics) == 0) || - (status == MQ_CONSUMER_STATUS_LOST && - hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD)){ // unsubscribe or close - mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); - } + pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime, hbStatus); if (status == MQ_CONSUMER_STATUS_READY) { - if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { + if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {// unsubscribe or close + mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); + } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { taosRLockLatch(&pConsumer->lock); buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId); taosRUnLockLatch(&pConsumer->lock); }else{ checkForVgroupSplit(pMnode, pConsumer, rebSubHash); } - } else if (status != MQ_CONSUMER_STATUS_LOST) { + } else if (status == MQ_CONSUMER_STATUS_LOST) { + if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day + mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); + } + } else { taosRLockLatch(&pConsumer->lock); buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId); buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId); @@ -842,6 +817,7 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu taosRLockLatch(&pTopic->lock); + rebInput->oldConsumerNum = 0; rebOutput->pSub = mndCreateSubscription(pMnode, pTopic, key); if (rebOutput->pSub == NULL) { @@ -855,7 +831,6 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu taosRUnLockLatch(&pTopic->lock); mndReleaseTopic(pMnode, pTopic); - rebInput->oldConsumerNum = 0; mInfo("[rebalance] sub topic:%s has no consumers sub yet", key); } else { taosRLockLatch(&pSub->lock); @@ -870,7 +845,9 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu } static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { - int code = 0; + int code = 0; + void *pIter = NULL; + SMnode *pMnode = pMsg->info.node; mInfo("[rebalance] start to process mq timer"); if (!mndRebTryStart()) { @@ -883,14 +860,11 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { code = TSDB_CODE_OUT_OF_MEMORY; goto END; } - taosHashSetFreeFp(rebSubHash, freeRebalanceItem); mndCheckConsumer(pMsg, rebSubHash); mInfo("[rebalance] mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash)); - void *pIter = NULL; - SMnode *pMnode = pMsg->info.node; while (1) { pIter = taosHashIterate(rebSubHash, pIter); if (pIter == NULL) { @@ -904,8 +878,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { goto END; } - SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; - rebInput.pRebInfo = pRebInfo; + rebInput.pRebInfo = (SMqRebInfo*)pIter; if (buildRebOutput(pMnode, &rebInput, &rebOutput) != 0){ continue;