Merge pull request #12069 from taosdata/feature/tq
refactor: rebalance input
This commit is contained in:
commit
fe4538e3aa
|
@ -842,7 +842,7 @@ typedef struct {
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
int32_t vgVersion;
|
int32_t vgVersion;
|
||||||
int32_t numOfStables;
|
int32_t numOfStables;
|
||||||
int32_t buffer;
|
int32_t buffer;
|
||||||
int32_t pageSize;
|
int32_t pageSize;
|
||||||
int32_t pages;
|
int32_t pages;
|
||||||
|
@ -1442,32 +1442,32 @@ typedef struct {
|
||||||
SArray* lostConsumers; // SArray<int64_t>
|
SArray* lostConsumers; // SArray<int64_t>
|
||||||
SArray* removedConsumers; // SArray<int64_t>
|
SArray* removedConsumers; // SArray<int64_t>
|
||||||
SArray* newConsumers; // SArray<int64_t>
|
SArray* newConsumers; // SArray<int64_t>
|
||||||
} SMqRebSubscribe;
|
} SMqRebInfo;
|
||||||
|
|
||||||
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
|
static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
|
||||||
SMqRebSubscribe* pRebSub = (SMqRebSubscribe*)taosMemoryCalloc(1, sizeof(SMqRebSubscribe));
|
SMqRebInfo* pRebInfo = (SMqRebInfo*)taosMemoryCalloc(1, sizeof(SMqRebInfo));
|
||||||
if (pRebSub == NULL) {
|
if (pRebInfo == NULL) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
strcpy(pRebSub->key, key);
|
strcpy(pRebInfo->key, key);
|
||||||
pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t));
|
pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||||
if (pRebSub->lostConsumers == NULL) {
|
if (pRebInfo->lostConsumers == NULL) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
pRebSub->removedConsumers = taosArrayInit(0, sizeof(int64_t));
|
pRebInfo->removedConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||||
if (pRebSub->removedConsumers == NULL) {
|
if (pRebInfo->removedConsumers == NULL) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
pRebSub->newConsumers = taosArrayInit(0, sizeof(int64_t));
|
pRebInfo->newConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||||
if (pRebSub->newConsumers == NULL) {
|
if (pRebInfo->newConsumers == NULL) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
return pRebSub;
|
return pRebInfo;
|
||||||
_err:
|
_err:
|
||||||
taosArrayDestroy(pRebSub->lostConsumers);
|
taosArrayDestroy(pRebInfo->lostConsumers);
|
||||||
taosArrayDestroy(pRebSub->removedConsumers);
|
taosArrayDestroy(pRebInfo->removedConsumers);
|
||||||
taosArrayDestroy(pRebSub->newConsumers);
|
taosArrayDestroy(pRebInfo->newConsumers);
|
||||||
taosMemoryFreeClear(pRebSub);
|
taosMemoryFreeClear(pRebInfo);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -554,9 +554,8 @@ int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogO
|
||||||
void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog);
|
void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
const SMqSubscribeObj* pOldSub;
|
int32_t oldConsumerNum;
|
||||||
const SMqTopicObj* pTopic;
|
const SMqRebInfo* pRebInfo;
|
||||||
const SMqRebSubscribe* pRebInfo;
|
|
||||||
} SMqRebInputObj;
|
} SMqRebInputObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -134,15 +134,15 @@ FAIL:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
||||||
SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
|
SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
|
||||||
if (pRebSub == NULL) {
|
if (pRebSub == NULL) {
|
||||||
pRebSub = tNewSMqRebSubscribe(key);
|
pRebSub = tNewSMqRebSubscribe(key);
|
||||||
if (pRebSub == NULL) {
|
if (pRebSub == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebSubscribe));
|
taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo));
|
||||||
}
|
}
|
||||||
return pRebSub;
|
return pRebSub;
|
||||||
}
|
}
|
||||||
|
@ -189,7 +189,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
|
||||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
|
char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||||
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
|
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
|
||||||
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
||||||
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
||||||
}
|
}
|
||||||
taosRUnLockLatch(&pConsumer->lock);
|
taosRUnLockLatch(&pConsumer->lock);
|
||||||
|
@ -200,7 +200,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
|
||||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
|
char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
|
||||||
mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
|
mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
|
||||||
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
||||||
taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
|
taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -209,7 +209,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
|
||||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
|
char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
|
||||||
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
|
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
|
||||||
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
||||||
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
||||||
}
|
}
|
||||||
taosRUnLockLatch(&pConsumer->lock);
|
taosRUnLockLatch(&pConsumer->lock);
|
||||||
|
|
|
@ -277,6 +277,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
|
||||||
memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
taosInitRWLatch(&pSubNew->lock);
|
taosInitRWLatch(&pSubNew->lock);
|
||||||
|
|
||||||
|
pSubNew->dbUid = pSub->dbUid;
|
||||||
pSubNew->subType = pSub->subType;
|
pSubNew->subType = pSub->subType;
|
||||||
pSubNew->withTbName = pSub->withTbName;
|
pSubNew->withTbName = pSub->withTbName;
|
||||||
pSubNew->withSchema = pSub->withSchema;
|
pSubNew->withSchema = pSub->withSchema;
|
||||||
|
@ -310,6 +311,7 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
|
||||||
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeString(buf, pSub->key);
|
tlen += taosEncodeString(buf, pSub->key);
|
||||||
|
tlen += taosEncodeFixedI64(buf, pSub->dbUid);
|
||||||
tlen += taosEncodeFixedI32(buf, pSub->vgNum);
|
tlen += taosEncodeFixedI32(buf, pSub->vgNum);
|
||||||
tlen += taosEncodeFixedI8(buf, pSub->subType);
|
tlen += taosEncodeFixedI8(buf, pSub->subType);
|
||||||
tlen += taosEncodeFixedI8(buf, pSub->withTbName);
|
tlen += taosEncodeFixedI8(buf, pSub->withTbName);
|
||||||
|
@ -336,6 +338,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
||||||
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
|
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
|
||||||
//
|
//
|
||||||
buf = taosDecodeStringTo(buf, pSub->key);
|
buf = taosDecodeStringTo(buf, pSub->key);
|
||||||
|
buf = taosDecodeFixedI64(buf, &pSub->dbUid);
|
||||||
buf = taosDecodeFixedI32(buf, &pSub->vgNum);
|
buf = taosDecodeFixedI32(buf, &pSub->vgNum);
|
||||||
buf = taosDecodeFixedI8(buf, &pSub->subType);
|
buf = taosDecodeFixedI8(buf, &pSub->subType);
|
||||||
buf = taosDecodeFixedI8(buf, &pSub->withTbName);
|
buf = taosDecodeFixedI8(buf, &pSub->withTbName);
|
||||||
|
|
|
@ -175,27 +175,20 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
||||||
SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
|
SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
|
||||||
if (pRebSub == NULL) {
|
if (pRebSub == NULL) {
|
||||||
pRebSub = tNewSMqRebSubscribe(key);
|
pRebSub = tNewSMqRebSubscribe(key);
|
||||||
if (pRebSub == NULL) {
|
if (pRebSub == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebSubscribe));
|
taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo));
|
||||||
}
|
}
|
||||||
return pRebSub;
|
return pRebSub;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
|
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
|
||||||
if (pInput->pTopic != NULL) {
|
|
||||||
// create subscribe
|
|
||||||
pOutput->pSub = mndCreateSub(pMnode, pInput->pTopic, pInput->pRebInfo->key);
|
|
||||||
ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) == 0);
|
|
||||||
} else {
|
|
||||||
pOutput->pSub = tCloneSubscribeObj(pInput->pOldSub);
|
|
||||||
}
|
|
||||||
int32_t totalVgNum = pOutput->pSub->vgNum;
|
int32_t totalVgNum = pOutput->pSub->vgNum;
|
||||||
|
|
||||||
mInfo("mq rebalance subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum);
|
mInfo("mq rebalance subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum);
|
||||||
|
@ -246,12 +239,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. calc vg number of each consumer
|
// 3. calc vg number of each consumer
|
||||||
int32_t oldSz = 0;
|
int32_t afterRebConsumerNum = pInput->oldConsumerNum + taosArrayGetSize(pInput->pRebInfo->newConsumers) -
|
||||||
if (pInput->pOldSub) {
|
taosArrayGetSize(pInput->pRebInfo->removedConsumers);
|
||||||
oldSz = taosHashGetSize(pInput->pOldSub->consumerHash);
|
|
||||||
}
|
|
||||||
int32_t afterRebConsumerNum =
|
|
||||||
oldSz + taosArrayGetSize(pInput->pRebInfo->newConsumers) - taosArrayGetSize(pInput->pRebInfo->removedConsumers);
|
|
||||||
int32_t minVgCnt = 0;
|
int32_t minVgCnt = 0;
|
||||||
int32_t imbConsumerNum = 0;
|
int32_t imbConsumerNum = 0;
|
||||||
// calc num
|
// calc num
|
||||||
|
@ -489,22 +478,34 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
|
||||||
rebOutput.touchedConsumers = taosArrayInit(0, sizeof(void *));
|
rebOutput.touchedConsumers = taosArrayInit(0, sizeof(void *));
|
||||||
rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
|
rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
|
||||||
|
|
||||||
SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter;
|
SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter;
|
||||||
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
|
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key);
|
||||||
|
|
||||||
|
rebInput.pRebInfo = pRebInfo;
|
||||||
|
|
||||||
if (pSub == NULL) {
|
if (pSub == NULL) {
|
||||||
// split sub key and extract topic
|
// split sub key and extract topic
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
mndSplitSubscribeKey(pRebSub->key, topic, cgroup);
|
mndSplitSubscribeKey(pRebInfo->key, topic, cgroup);
|
||||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||||
ASSERT(pTopic);
|
ASSERT(pTopic);
|
||||||
taosRLockLatch(&pTopic->lock);
|
taosRLockLatch(&pTopic->lock);
|
||||||
rebInput.pTopic = pTopic;
|
|
||||||
}
|
|
||||||
|
|
||||||
rebInput.pRebInfo = pRebSub;
|
rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key);
|
||||||
rebInput.pOldSub = pSub;
|
ASSERT(taosHashGetSize(rebOutput.pSub->consumerHash) == 0);
|
||||||
|
|
||||||
|
taosRUnLockLatch(&pTopic->lock);
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
|
||||||
|
rebInput.oldConsumerNum = 0;
|
||||||
|
} else {
|
||||||
|
taosRLockLatch(&pSub->lock);
|
||||||
|
rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
|
||||||
|
rebOutput.pSub = tCloneSubscribeObj(pSub);
|
||||||
|
taosRUnLockLatch(&pSub->lock);
|
||||||
|
mndReleaseSubscribe(pMnode, pSub);
|
||||||
|
}
|
||||||
|
|
||||||
// TODO replace assert with error check
|
// TODO replace assert with error check
|
||||||
ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0);
|
ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0);
|
||||||
|
@ -517,14 +518,6 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
|
||||||
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
|
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
|
||||||
mError("persist rebalance output error, possibly vnode splitted or dropped");
|
mError("persist rebalance output error, possibly vnode splitted or dropped");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rebInput.pTopic) {
|
|
||||||
SMqTopicObj *pTopic = (SMqTopicObj *)rebInput.pTopic;
|
|
||||||
taosRUnLockLatch(&pTopic->lock);
|
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
|
||||||
} else {
|
|
||||||
mndReleaseSubscribe(pMnode, pSub);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset flag
|
// reset flag
|
||||||
|
|
Loading…
Reference in New Issue