enh: remove assert from mnode consumer
This commit is contained in:
parent
fc21140e03
commit
9a10242f7d
|
@ -345,6 +345,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB)
|
#define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB)
|
||||||
#define TSDB_CODE_MND_CGROUP_USED TAOS_DEF_ERROR_CODE(0, 0x03EC)
|
#define TSDB_CODE_MND_CGROUP_USED TAOS_DEF_ERROR_CODE(0, 0x03EC)
|
||||||
#define TSDB_CODE_MND_TOPIC_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03ED)
|
#define TSDB_CODE_MND_TOPIC_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03ED)
|
||||||
|
#define TSDB_CODE_MND_INVALID_SUB_OPTION TAOS_DEF_ERROR_CODE(0, 0x03EE)
|
||||||
#define TSDB_CODE_MND_IN_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x03EF)
|
#define TSDB_CODE_MND_IN_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x03EF)
|
||||||
|
|
||||||
// mnode-stream
|
// mnode-stream
|
||||||
|
|
|
@ -96,8 +96,12 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
|
||||||
pSub->subType = pTopic->subType;
|
pSub->subType = pTopic->subType;
|
||||||
pSub->withMeta = pTopic->withMeta;
|
pSub->withMeta = pTopic->withMeta;
|
||||||
|
|
||||||
ASSERT(pSub->unassignedVgs->size == 0);
|
if (pSub->unassignedVgs->size != 0 || taosHashGetSize(pSub->consumerHash) != 0) {
|
||||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
tDeleteSubscribeObj(pSub);
|
||||||
|
taosMemoryFree(pSub);
|
||||||
|
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
|
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
|
||||||
tDeleteSubscribeObj(pSub);
|
tDeleteSubscribeObj(pSub);
|
||||||
|
@ -105,8 +109,12 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pSub->unassignedVgs->size > 0);
|
if (pSub->unassignedVgs->size <= 0 || taosHashGetSize(pSub->consumerHash) != 0) {
|
||||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
tDeleteSubscribeObj(pSub);
|
||||||
|
taosMemoryFree(pSub);
|
||||||
|
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
return pSub;
|
return pSub;
|
||||||
}
|
}
|
||||||
|
@ -144,7 +152,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
|
||||||
|
|
||||||
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
|
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
|
||||||
const SMqRebOutputVg *pRebVg) {
|
const SMqRebOutputVg *pRebVg) {
|
||||||
ASSERT(pRebVg->oldConsumerId != pRebVg->newConsumerId);
|
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
|
||||||
|
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
void *buf;
|
void *buf;
|
||||||
int32_t tlen;
|
int32_t tlen;
|
||||||
|
@ -155,8 +166,8 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM
|
||||||
int32_t vgId = pRebVg->pVgEp->vgId;
|
int32_t vgId = pRebVg->pVgEp->vgId;
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||||
if (pVgObj == NULL) {
|
if (pVgObj == NULL) {
|
||||||
ASSERT(0);
|
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -206,9 +217,9 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
|
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
|
||||||
int32_t totalVgNum = pOutput->pSub->vgNum;
|
int32_t totalVgNum = pOutput->pSub->vgNum;
|
||||||
|
const char *sub = pOutput->pSub->key;
|
||||||
mInfo("mq rebalance: subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum);
|
mInfo("sub:%s, mq rebalance vgNum:%d", sub, pOutput->pSub->vgNum);
|
||||||
|
|
||||||
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
|
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
|
||||||
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||||
|
@ -218,11 +229,24 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
int32_t actualRemoved = 0;
|
int32_t actualRemoved = 0;
|
||||||
for (int32_t i = 0; i < removedNum; i++) {
|
for (int32_t i = 0; i < removedNum; i++) {
|
||||||
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
|
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
|
||||||
ASSERT(consumerId > 0);
|
if (consumerId <= 0) {
|
||||||
|
mError("sub:%s, mq rebalance cunsumerId:%" PRId64 " <= 0", sub, consumerId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||||
ASSERT(pConsumerEp);
|
if (pConsumerEp == NULL) {
|
||||||
|
mError("sub:%s, mq rebalance ep is null, cunsumberId:%" PRId64, sub, consumerId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (pConsumerEp) {
|
if (pConsumerEp) {
|
||||||
ASSERT(consumerId == pConsumerEp->consumerId);
|
if (consumerId != pConsumerEp->consumerId) {
|
||||||
|
mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " not matched saved:%" PRId64, sub, consumerId,
|
||||||
|
pConsumerEp->consumerId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
actualRemoved++;
|
actualRemoved++;
|
||||||
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
|
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||||
for (int32_t j = 0; j < consumerVgNum; j++) {
|
for (int32_t j = 0; j < consumerVgNum; j++) {
|
||||||
|
@ -233,7 +257,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
.pVgEp = pVgEp,
|
.pVgEp = pVgEp,
|
||||||
};
|
};
|
||||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||||
mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64, pVgEp->vgId, consumerId);
|
mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64, sub, pVgEp->vgId, consumerId);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pConsumerEp->vgs);
|
taosArrayDestroy(pConsumerEp->vgs);
|
||||||
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||||
|
@ -241,7 +265,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
taosArrayPush(pOutput->removedConsumers, &consumerId);
|
taosArrayPush(pOutput->removedConsumers, &consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ASSERT(removedNum == actualRemoved);
|
|
||||||
|
if (removedNum != actualRemoved) {
|
||||||
|
mError("sub:%s, mq rebalance removedNum:%d not matched with actual:%d", sub, removedNum, actualRemoved);
|
||||||
|
}
|
||||||
|
|
||||||
// if previously no consumer, there are vgs not assigned
|
// if previously no consumer, there are vgs not assigned
|
||||||
{
|
{
|
||||||
|
@ -254,7 +281,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
.pVgEp = pVgEp,
|
.pVgEp = pVgEp,
|
||||||
};
|
};
|
||||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
|
||||||
mInfo("mq rebalance: remove vgId:%d from unassigned", pVgEp->vgId);
|
mInfo("sub:%s, mq rebalance remove vgId:%d from unassigned", sub, pVgEp->vgId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -268,8 +295,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
minVgCnt = totalVgNum / afterRebConsumerNum;
|
minVgCnt = totalVgNum / afterRebConsumerNum;
|
||||||
imbConsumerNum = totalVgNum % afterRebConsumerNum;
|
imbConsumerNum = totalVgNum % afterRebConsumerNum;
|
||||||
}
|
}
|
||||||
mInfo("mq rebalance: %d consumer after rebalance, at least %d vg each, %d consumer has more vg", afterRebConsumerNum,
|
mInfo("sub:%s, mq rebalance %d consumer after rebalance, at least %d vg each, %d consumer has more vg", sub,
|
||||||
minVgCnt, imbConsumerNum);
|
afterRebConsumerNum, minVgCnt, imbConsumerNum);
|
||||||
|
|
||||||
// 4. first scan: remove consumer more than wanted, put to remove hash
|
// 4. first scan: remove consumer more than wanted, put to remove hash
|
||||||
int32_t imbCnt = 0;
|
int32_t imbCnt = 0;
|
||||||
|
@ -278,7 +305,11 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
ASSERT(pConsumerEp->consumerId > 0);
|
if (pConsumerEp->consumerId <= 0) {
|
||||||
|
mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " <= 0", sub, pConsumerEp->consumerId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
|
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||||
// all old consumers still existing are touched
|
// all old consumers still existing are touched
|
||||||
// TODO optimize: touch only consumer whose vgs changed
|
// TODO optimize: touch only consumer whose vgs changed
|
||||||
|
@ -298,7 +329,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
.pVgEp = pVgEp,
|
.pVgEp = pVgEp,
|
||||||
};
|
};
|
||||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||||
mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId,
|
mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
|
||||||
pConsumerEp->consumerId);
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
imbCnt++;
|
imbCnt++;
|
||||||
|
@ -313,7 +344,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
.pVgEp = pVgEp,
|
.pVgEp = pVgEp,
|
||||||
};
|
};
|
||||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||||
mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId,
|
mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
|
||||||
pConsumerEp->consumerId);
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -325,13 +356,17 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
int32_t consumerNum = taosArrayGetSize(pInput->pRebInfo->newConsumers);
|
int32_t consumerNum = taosArrayGetSize(pInput->pRebInfo->newConsumers);
|
||||||
for (int32_t i = 0; i < consumerNum; i++) {
|
for (int32_t i = 0; i < consumerNum; i++) {
|
||||||
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
|
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
|
||||||
ASSERT(consumerId > 0);
|
if (consumerId <= 0) {
|
||||||
|
mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " <= 0", sub, consumerId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
SMqConsumerEp newConsumerEp;
|
SMqConsumerEp newConsumerEp;
|
||||||
newConsumerEp.consumerId = consumerId;
|
newConsumerEp.consumerId = consumerId;
|
||||||
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
||||||
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
||||||
taosArrayPush(pOutput->newConsumers, &consumerId);
|
taosArrayPush(pOutput->newConsumers, &consumerId);
|
||||||
mInfo("mq rebalance: add new consumer:%" PRId64, consumerId);
|
mInfo("sub:%s, mq rebalance add new consumer:%" PRId64, sub, consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -344,13 +379,18 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
ASSERT(pConsumerEp->consumerId > 0);
|
if (pConsumerEp->consumerId <= 0) {
|
||||||
|
mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " <= 0", sub, pConsumerEp->consumerId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// push until equal minVg
|
// push until equal minVg
|
||||||
while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
|
while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
|
||||||
// iter hash and find one vg
|
// iter hash and find one vg
|
||||||
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
||||||
ASSERT(pRemovedIter);
|
mError("sub:%s, removed iter is null", sub);
|
||||||
|
continue;
|
||||||
|
|
||||||
pRebVg = (SMqRebOutputVg *)pRemovedIter;
|
pRebVg = (SMqRebOutputVg *)pRemovedIter;
|
||||||
// push
|
// push
|
||||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||||
|
@ -361,7 +401,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pIter == NULL);
|
|
||||||
// 7. handle unassigned vg
|
// 7. handle unassigned vg
|
||||||
if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) {
|
if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) {
|
||||||
// if has consumer, assign all left vg
|
// if has consumer, assign all left vg
|
||||||
|
@ -377,9 +416,12 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
}
|
}
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||||
ASSERT(pIter);
|
|
||||||
pConsumerEp = (SMqConsumerEp *)pIter;
|
pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
ASSERT(pConsumerEp->consumerId > 0);
|
if (pConsumerEp == NULL || pConsumerEp->consumerId <= 0) {
|
||||||
|
mError("sub:%s, mq rebalance cunsumberId invalid", sub);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
|
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -404,19 +446,23 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
pIter = taosHashIterate(pHash, pIter);
|
pIter = taosHashIterate(pHash, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
pRebOutput = (SMqRebOutputVg *)pIter;
|
pRebOutput = (SMqRebOutputVg *)pIter;
|
||||||
ASSERT(pRebOutput->newConsumerId == -1);
|
if (pRebOutput->newConsumerId != 1) {
|
||||||
|
mError("sub:%s, mq rebalance output consumerId:%" PRId64 " not -1", sub, pRebOutput->newConsumerId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
|
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
|
||||||
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
||||||
mInfo("mq rebalance: unassign vgId:%d (second scan)", pRebOutput->pVgEp->vgId);
|
mInfo("sub:%s, mq rebalance unassign vgId:%d (second scan)", sub, pRebOutput->pVgEp->vgId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 8. generate logs
|
// 8. generate logs
|
||||||
mInfo("mq rebalance: calculation completed, rebalanced vg:");
|
mInfo("sub:%s, mq rebalance calculation completed, rebalanced vg", sub);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
|
||||||
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
|
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
|
||||||
mInfo("mq rebalance: vgId:%d, moved from consumer:%" PRId64 ", to consumer:%" PRId64, pOutputRebVg->pVgEp->vgId,
|
mInfo("sub:%s, mq rebalance vgId:%d, moved from consumer:%" PRId64 ", to consumer:%" PRId64, sub,
|
||||||
pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
|
pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -425,10 +471,11 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
||||||
mInfo("mq rebalance: final cfg: consumer %" PRId64 " has %d vg", pConsumerEp->consumerId, sz);
|
mInfo("sub:%s, mq rebalance final cfg: consumer %" PRId64 " has %d vg", sub, pConsumerEp->consumerId, sz);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
||||||
mInfo("mq rebalance: final cfg: vg %d to consumer %" PRId64 "", pVgEp->vgId, pConsumerEp->consumerId);
|
mInfo("sub:%s, mq rebalance final cfg: vg %d to consumer %" PRId64 "", sub, pVgEp->vgId,
|
||||||
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -487,7 +534,8 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
||||||
consumerNum = taosArrayGetSize(pOutput->newConsumers);
|
consumerNum = taosArrayGetSize(pOutput->newConsumers);
|
||||||
for (int32_t i = 0; i < consumerNum; i++) {
|
for (int32_t i = 0; i < consumerNum; i++) {
|
||||||
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i);
|
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i);
|
||||||
ASSERT(consumerId > 0);
|
if (consumerId <= 0) continue;
|
||||||
|
|
||||||
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
|
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
|
||||||
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
|
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__ADD;
|
pConsumerNew->updateType = CONSUMER_UPDATE__ADD;
|
||||||
|
@ -497,7 +545,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
||||||
taosArrayPush(pConsumerNew->rebNewTopics, &topic);
|
taosArrayPush(pConsumerNew->rebNewTopics, &topic);
|
||||||
mndReleaseConsumer(pMnode, pConsumerOld);
|
mndReleaseConsumer(pMnode, pConsumerOld);
|
||||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
||||||
ASSERT(0);
|
|
||||||
tDeleteSMqConsumerObj(pConsumerNew);
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
taosMemoryFree(pConsumerNew);
|
taosMemoryFree(pConsumerNew);
|
||||||
goto REB_FAIL;
|
goto REB_FAIL;
|
||||||
|
@ -510,7 +557,8 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
||||||
consumerNum = taosArrayGetSize(pOutput->removedConsumers);
|
consumerNum = taosArrayGetSize(pOutput->removedConsumers);
|
||||||
for (int32_t i = 0; i < consumerNum; i++) {
|
for (int32_t i = 0; i < consumerNum; i++) {
|
||||||
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
|
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
|
||||||
ASSERT(consumerId > 0);
|
if (consumerId <= 0) continue;
|
||||||
|
|
||||||
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
|
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
|
||||||
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
|
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE;
|
pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE;
|
||||||
|
@ -520,7 +568,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
||||||
taosArrayPush(pConsumerNew->rebRemovedTopics, &topic);
|
taosArrayPush(pConsumerNew->rebRemovedTopics, &topic);
|
||||||
mndReleaseConsumer(pMnode, pConsumerOld);
|
mndReleaseConsumer(pMnode, pConsumerOld);
|
||||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
||||||
ASSERT(0);
|
|
||||||
tDeleteSMqConsumerObj(pConsumerNew);
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
taosMemoryFree(pConsumerNew);
|
taosMemoryFree(pConsumerNew);
|
||||||
goto REB_FAIL;
|
goto REB_FAIL;
|
||||||
|
@ -577,7 +624,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
|
mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
|
||||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||||
/*ASSERT(pTopic);*/
|
|
||||||
if (pTopic == NULL) {
|
if (pTopic == NULL) {
|
||||||
mError("mq rebalance %s failed since topic %s not exist, abort", pRebInfo->key, topic);
|
mError("mq rebalance %s failed since topic %s not exist, abort", pRebInfo->key, topic);
|
||||||
continue;
|
continue;
|
||||||
|
@ -586,7 +632,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key);
|
rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key);
|
||||||
memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
|
memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
|
||||||
ASSERT(taosHashGetSize(rebOutput.pSub->consumerHash) == 0);
|
|
||||||
|
|
||||||
taosRUnLockLatch(&pTopic->lock);
|
taosRUnLockLatch(&pTopic->lock);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
@ -606,7 +651,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
// if add more consumer to balanced subscribe,
|
// if add more consumer to balanced subscribe,
|
||||||
// possibly no vg is changed
|
// possibly no vg is changed
|
||||||
/*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/
|
|
||||||
|
|
||||||
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
|
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
|
||||||
mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped");
|
mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped");
|
||||||
|
|
|
@ -276,8 +276,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST, "Subcribe not exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_OFFSET_NOT_EXIST, "Offset not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_OFFSET_NOT_EXIST, "Offset not exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer not ready")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer not ready")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_MUST_BE_DELETED, "Topic must be dropped first")
|
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_MUST_BE_DELETED, "Topic must be dropped first")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SUB_OPTION, "Invalid subscribe option")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_IN_REBALANCE, "Topic being rebalanced")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_IN_REBALANCE, "Topic being rebalanced")
|
||||||
|
|
||||||
// mnode-stream
|
// mnode-stream
|
||||||
|
|
Loading…
Reference in New Issue