remove assert

This commit is contained in:
Liu Jicong 2023-01-03 17:06:43 +08:00
parent 9a10242f7d
commit e4ab8f986b
1 changed files with 7 additions and 50 deletions

View File

@ -96,26 +96,12 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
pSub->subType = pTopic->subType;
pSub->withMeta = pTopic->withMeta;
if (pSub->unassignedVgs->size != 0 || taosHashGetSize(pSub->consumerHash) != 0) {
tDeleteSubscribeObj(pSub);
taosMemoryFree(pSub);
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
return NULL;
}
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
tDeleteSubscribeObj(pSub);
taosMemoryFree(pSub);
return NULL;
}
if (pSub->unassignedVgs->size <= 0 || taosHashGetSize(pSub->consumerHash) != 0) {
tDeleteSubscribeObj(pSub);
taosMemoryFree(pSub);
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
return NULL;
}
return pSub;
}
@ -229,24 +215,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
int32_t actualRemoved = 0;
for (int32_t i = 0; i < removedNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
if (consumerId <= 0) {
mError("sub:%s, mq rebalance cunsumerId:%" PRId64 " <= 0", sub, consumerId);
continue;
}
SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
if (pConsumerEp == NULL) {
mError("sub:%s, mq rebalance ep is null, cunsumberId:%" PRId64, sub, consumerId);
continue;
}
if (pConsumerEp) {
if (consumerId != pConsumerEp->consumerId) {
mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " not matched saved:%" PRId64, sub, consumerId,
pConsumerEp->consumerId);
continue;
}
actualRemoved++;
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
for (int32_t j = 0; j < consumerVgNum; j++) {
@ -305,10 +277,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
if (pConsumerEp->consumerId <= 0) {
mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " <= 0", sub, pConsumerEp->consumerId);
continue;
}
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
// all old consumers still existing are touched
@ -356,10 +324,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
int32_t consumerNum = taosArrayGetSize(pInput->pRebInfo->newConsumers);
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
if (consumerId <= 0) {
mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " <= 0", sub, consumerId);
continue;
}
SMqConsumerEp newConsumerEp;
newConsumerEp.consumerId = consumerId;
@ -379,10 +343,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
if (pConsumerEp->consumerId <= 0) {
mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " <= 0", sub, pConsumerEp->consumerId);
continue;
}
// push until equal minVg
while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
@ -417,10 +377,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
pConsumerEp = (SMqConsumerEp *)pIter;
if (pConsumerEp == NULL || pConsumerEp->consumerId <= 0) {
mError("sub:%s, mq rebalance cunsumberId invalid", sub);
continue;
}
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
break;
@ -446,10 +402,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
pIter = taosHashIterate(pHash, pIter);
if (pIter == NULL) break;
pRebOutput = (SMqRebOutputVg *)pIter;
if (pRebOutput->newConsumerId != 1) {
mError("sub:%s, mq rebalance output consumerId:%" PRId64 " not -1", sub, pRebOutput->newConsumerId);
continue;
}
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
taosArrayPush(pOutput->rebVgs, pRebOutput);
@ -534,7 +486,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
consumerNum = taosArrayGetSize(pOutput->newConsumers);
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i);
if (consumerId <= 0) continue;
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
@ -557,7 +508,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
consumerNum = taosArrayGetSize(pOutput->removedConsumers);
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
if (consumerId <= 0) continue;
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
@ -631,6 +581,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
taosRLockLatch(&pTopic->lock);
rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key);
if (rebOutput.pSub == NULL) {
mError("mq rebalance %s failed create sub since %s, abort", pRebInfo->key, terrstr());
taosRUnLockLatch(&pTopic->lock);
mndReleaseTopic(pMnode, pTopic);
continue;
}
memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
taosRUnLockLatch(&pTopic->lock);