rewrite rebalance

This commit is contained in:
Liu Jicong 2022-02-11 16:24:38 +08:00
parent f2bc000482
commit 42da4f6fdd
9 changed files with 398 additions and 125 deletions

View File

@ -44,11 +44,11 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
return -1;
}
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
/*if (taos_errno(pRes) != 0) {*/
/*printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));*/
/*return -1;*/
/*}*/
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tu using st1 tags(1)");
@ -114,19 +114,19 @@ void basic_consume_loop(tmq_t *tmq,
return;
}
int32_t cnt = 0;
clock_t startTime = clock();
/*clock_t startTime = clock();*/
while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 0);
if (tmqmessage) {
cnt++;
/*msg_process(tmqmessage);*/
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
} else {
break;
/*} else {*/
/*break;*/
}
}
clock_t endTime = clock();
printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);
/*clock_t endTime = clock();*/
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
err = tmq_consumer_close(tmq);
if (err)

View File

@ -1055,7 +1055,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
buf = taosDecodeString(buf, &pReq->consumerGroup);
pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*));
for (int i = 0; i < pReq->topicNum; i++) {
char* name = NULL;
char* name;
buf = taosDecodeString(buf, &name);
taosArrayPush(pReq->topicNames, &name);
}
@ -1132,9 +1132,60 @@ typedef struct {
} SMqTmrMsg;
// Per-subscription rebalance work item: the consumer ids whose state change
// requires the subscription identified by `key` to be rebalanced.
typedef struct {
// NOTE(review): never assigned by tNewSMqRebSubscribe (stays 0 from calloc) — confirm whether this field is still needed.
int64_t consumerId;
// Subscription key. tNewSMqRebSubscribe stores the caller's pointer without copying it,
// so the string must outlive this struct.
const char* key;
// SArray<int64_t>: ids of consumers the mq timer moved from ACTIVE to LOST.
SArray* lostConsumers; //SArray<int64_t>
// SArray<int64_t>: ids of consumers in MODIFY status, pushed for each of their recently removed topics.
SArray* removedConsumers; //SArray<int64_t>
// SArray<int64_t>: ids of consumers still in INIT status, pushed for each of their current topics.
SArray* newConsumers; //SArray<int64_t>
} SMqRebSubscribe;
/**
 * @brief Allocate a SMqRebSubscribe for one subscription key.
 *
 * The three consumer-id arrays (lost / removed / new) are created empty with
 * int64_t elements. The key pointer is stored as-is (not copied), so the
 * caller must keep the key string alive for as long as the struct is used.
 *
 * @param key Subscription key to associate with the new object.
 * @return SMqRebSubscribe* The new object, or NULL if any allocation failed
 *         (everything allocated so far is released before returning).
 */
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
  SMqRebSubscribe* pRebSub = (SMqRebSubscribe*)calloc(1, sizeof(SMqRebSubscribe));
  if (pRebSub == NULL) {
    // Nothing has been allocated yet. Do NOT jump to _err here: that path
    // dereferences pRebSub to destroy its arrays.
    return NULL;
  }
  pRebSub->key = key;

  pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t));
  if (pRebSub->lostConsumers == NULL) {
    goto _err;
  }

  pRebSub->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  if (pRebSub->removedConsumers == NULL) {
    goto _err;
  }

  pRebSub->newConsumers = taosArrayInit(0, sizeof(int64_t));
  if (pRebSub->newConsumers == NULL) {
    goto _err;
  }

  return pRebSub;

_err:
  // Arrays that were never created are still NULL thanks to calloc; as in the
  // original cleanup path, they are passed to taosArrayDestroy unconditionally.
  taosArrayDestroy(pRebSub->lostConsumers);
  taosArrayDestroy(pRebSub->removedConsumers);
  taosArrayDestroy(pRebSub->newConsumers);
  tfree(pRebSub);
  return NULL;
}
// Rebalance request built by the mq timer handler and posted to the mnode write queue.
// This message is sent from mnode to mnode (read thread to write thread), so there is
// no need for serialization / deserialization: it carries a raw in-process pointer.
typedef struct {
//SArray* rebSubscribes; //SArray<SMqRebSubscribe>
// Maps subscription key string -> SMqRebSubscribe (stored by value in the hash).
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
} SMqDoRebalanceMsg;
#if 0
static FORCE_INLINE SMqDoRebalanceMsg* tNewSMqDoRebalanceMsg() {
SMqDoRebalanceMsg *pMsg = malloc(sizeof(SMqDoRebalanceMsg));
if (pMsg == NULL) {
return NULL;
}
pMsg->rebSubscribes = taosArrayInit(0, sizeof(SMqRebSubscribe));
if (pMsg->rebSubscribes == NULL) {
free(pMsg);
return NULL;
}
return pMsg;
}
#endif
typedef struct {
int64_t status;
} SMVSubscribeRsp;

View File

@ -222,7 +222,7 @@ int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw);
* @param pKey The key value of the row.
* @return void* The object of the row.
*/
void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey);
void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey);
/**
* @brief Release a row from sdb.

View File

@ -22,6 +22,16 @@
extern "C" {
#endif
// Values stored in SMqConsumerObj.status (read and written with atomic 32-bit operations).
enum {
// Assigned by mndCreateConsumer; the mq timer reports such consumers as "new" for each current topic.
MQ_CONSUMER_STATUS__INIT = 1,
// Stored by the rebalance handler when a consumer ends up with zero vgroups.
MQ_CONSUMER_STATUS__IDLE,
// Stored by the rebalance handler when a consumer ends up with at least one vgroup.
MQ_CONSUMER_STATUS__ACTIVE,
// The mq timer switches ACTIVE consumers to this value via compare-exchange
// (NOTE(review): the triggering condition is outside the visible hunk — presumably a missed heartbeat; confirm).
MQ_CONSUMER_STATUS__LOST,
// Stored by the subscribe request handler when an existing consumer drops a topic;
// the mq timer then reports the consumer as "removed" for each recently removed topic.
MQ_CONSUMER_STATUS__MODIFY
};
int32_t mndInitConsumer(SMnode *pMnode);
void mndCleanupConsumer(SMnode *pMnode);

View File

@ -388,6 +388,7 @@ static FORCE_INLINE int32_t tEncodeSMqSubConsumer(void** buf, const SMqSubConsum
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
int32_t sz = taosArrayGetSize(pConsumer->vgInfo);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp* pCEp = taosArrayGet(pConsumer->vgInfo, i);
tlen += tEncodeSMqConsumerEp(buf, pCEp);
@ -498,9 +499,9 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp = {0};
buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->unassignedVg, &cEp);
SMqConsumerEp consumerEp = {0};
buf = tDecodeSMqConsumerEp(buf, &consumerEp);
taosArrayPush(pSub->unassignedVg, &consumerEp);
}
return buf;
}
@ -539,7 +540,8 @@ typedef struct {
int64_t connId;
SRWLatch lock;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray* topics; // SArray<char*>
SArray* currentTopics; // SArray<char*>
SArray* recentRemovedTopics; // SArray<char*>
int64_t epoch;
// stat
int64_t pollCnt;
@ -552,16 +554,25 @@ typedef struct {
} SMqConsumerObj;
static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) {
int32_t sz;
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
tlen += taosEncodeFixedI64(buf, pConsumer->connId);
tlen += taosEncodeFixedI64(buf, pConsumer->epoch);
tlen += taosEncodeFixedI64(buf, pConsumer->pollCnt);
tlen += taosEncodeString(buf, pConsumer->cgroup);
int32_t sz = taosArrayGetSize(pConsumer->topics);
sz = taosArrayGetSize(pConsumer->currentTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char* topic = taosArrayGetP(pConsumer->topics, i);
char* topic = taosArrayGetP(pConsumer->currentTopics, i);
tlen += taosEncodeString(buf, topic);
}
sz = taosArrayGetSize(pConsumer->recentRemovedTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char* topic = taosArrayGetP(pConsumer->recentRemovedTopics, i);
tlen += taosEncodeString(buf, topic);
}
return tlen;
@ -574,12 +585,21 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt);
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->topics = taosArrayInit(sz, sizeof(SMqConsumerObj));
pConsumer->currentTopics = taosArrayInit(sz, sizeof(SMqConsumerObj));
for (int32_t i = 0; i < sz; i++) {
char* topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->topics, &topic);
taosArrayPush(pConsumer->currentTopics, &topic);
}
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->recentRemovedTopics = taosArrayInit(sz, sizeof(SMqConsumerObj));
for (int32_t i = 0; i < sz; i++) {
char* topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->recentRemovedTopics, &topic);
}
return buf;
}

View File

@ -25,7 +25,8 @@ extern "C" {
int32_t mndInitSubscribe(SMnode *pMnode);
void mndCleanupSubscribe(SMnode *pMnode);
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *CGroup, char *topicName);
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const char *topicName);
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char* key);
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
#ifdef __cplusplus

View File

@ -61,6 +61,7 @@ SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) {
}
pConsumer->epoch = 1;
pConsumer->consumerId = consumerId;
pConsumer->status = MQ_CONSUMER_STATUS__INIT;
strcpy(pConsumer->cgroup, cgroup);
taosInitRWLatch(&pConsumer->lock);
return pConsumer;

View File

@ -33,12 +33,6 @@
#define MND_SUBSCRIBE_REBALANCE_CNT 3
enum {
MQ_CONSUMER_STATUS__INIT = 1,
MQ_CONSUMER_STATUS__ACTIVE,
MQ_CONSUMER_STATUS__LOST,
};
enum {
MQ_SUBSCRIBE_STATUS__ACTIVE = 1,
MQ_SUBSCRIBE_STATUS__DELETED,
@ -58,10 +52,13 @@ static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg);
static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
const SMqConsumerEp *pSub);
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp);
static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub);
int32_t mndInitSubscribe(SMnode *pMnode) {
@ -77,6 +74,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg);
return sdbSetTable(pMnode->pSdb, table);
}
@ -106,18 +104,13 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
return pSub;
}
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqTopicObj *pTopic,
const SMqConsumerEp *pConsumerEp, const char *cgroup) {
SMqSetCVgReq req = {0};
strcpy(req.cgroup, cgroup);
strcpy(req.topicName, pTopic->name);
req.sql = pTopic->sql;
req.logicalPlan = pTopic->logicalPlan;
req.physicalPlan = pTopic->physicalPlan;
req.qmsg = pConsumerEp->qmsg;
req.oldConsumerId = pConsumerEp->oldConsumerId;
req.newConsumerId = pConsumerEp->consumerId;
req.vgId = pConsumerEp->vgId;
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
SMqSetCVgReq req = {
.vgId = pConsumerEp->vgId,
.oldConsumerId = pConsumerEp->oldConsumerId,
.newConsumerId = pConsumerEp->consumerId,
};
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *buf = malloc(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
@ -128,22 +121,23 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqTopicOb
pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
pMsgHead->vgId = htonl(pConsumerEp->vgId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSMqSetCVgReq(&abuf, &req);
*pBuf = buf;
*pLen = tlen;
return 0;
}
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic,
const SMqConsumerEp *pConsumerEp, const char *cgroup) {
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
int32_t vgId = pConsumerEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
void *buf;
int32_t tlen;
if (mndBuildRebalanceMsg(&buf, &tlen, pTopic, pConsumerEp, cgroup) < 0) {
if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp) < 0) {
return -1;
}
@ -226,7 +220,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
rsp.consumerId = consumerId;
rsp.epoch = pConsumer->epoch;
if (pReq->epoch != rsp.epoch) {
SArray *pTopics = pConsumer->topics;
SArray *pTopics = pConsumer->currentTopics;
int sz = taosArrayGetSize(pTopics);
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
for (int i = 0; i < sz; i++) {
@ -234,12 +228,13 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName);
ASSERT(pSub);
int csz = taosArrayGetSize(pSub->consumers);
//TODO: change to bsearch
// TODO: change to bsearch
for (int j = 0; j < csz; j++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
if (consumerId == pSubConsumer->consumerId) {
int vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
SMqSubTopicEp topicEp;
strcpy(topicEp.topic, topicName);
topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
for (int k = 0; k < vgsz; k++) {
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
@ -280,11 +275,27 @@ static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
return 0;
}
/**
 * @brief Look up the rebalance entry for a subscription key, creating it on first use.
 *
 * The hash stores SMqRebSubscribe by value, so the returned pointer always
 * refers to the entry owned by the hash. The entry's `key` field aliases the
 * caller's string (it is not copied), so `key` must stay valid while the hash
 * is in use.
 *
 * @param pHash Hash of subscription key -> SMqRebSubscribe.
 * @param key   NUL-terminated subscription key.
 * @return SMqRebSubscribe* The hash-owned entry, or NULL on allocation/insert failure.
 */
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
  size_t keyLen = strlen(key);

  SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, keyLen);
  if (pRebSub != NULL) {
    return pRebSub;
  }

  SMqRebSubscribe *pNew = tNewSMqRebSubscribe(key);
  if (pNew == NULL) {
    // TODO: propagate the out-of-memory condition to the caller
    return NULL;
  }

  // taosHashPut copies sizeof(SMqRebSubscribe) bytes into the hash, i.e. the
  // key pointer and the three SArray pointers. After a successful put the
  // arrays are owned by the hash entry and only the temporary shell is freed.
  if (taosHashPut(pHash, key, keyLen, pNew, sizeof(SMqRebSubscribe)) != 0) {
    taosArrayDestroy(pNew->lostConsumers);
    taosArrayDestroy(pNew->removedConsumers);
    taosArrayDestroy(pNew->newConsumers);
    free(pNew);
    return NULL;
  }
  free(pNew);  // previously leaked on every newly created entry

  return taosHashGet(pHash, key, keyLen);
}
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
SMqConsumerObj *pConsumer;
void *pIter = NULL;
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
while (1) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) break;
@ -293,12 +304,48 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
int32_t old =
atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
if (old == MQ_CONSUMER_STATUS__ACTIVE) {
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
pRebMsg->consumerId = pConsumer->consumerId;
// get all topics of that consumer
int sz = taosArrayGetSize(pConsumer->currentTopics);
for (int i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId);
}
/*pRebMsg->consumerId = pConsumer->consumerId;*/
/*SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen =
* sizeof(SMqDoRebalanceMsg)};*/
/*pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);*/
}
}
int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS__INIT || status == MQ_CONSUMER_STATUS__MODIFY) {
SArray *rebSubs;
if (status == MQ_CONSUMER_STATUS__INIT) {
rebSubs = pConsumer->currentTopics;
} else {
rebSubs = pConsumer->recentRemovedTopics;
}
int sz = taosArrayGetSize(rebSubs);
for (int i = 0; i < sz; i++) {
char *topic = taosArrayGetP(rebSubs, i);
char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
if (status == MQ_CONSUMER_STATUS__INIT) {
taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
} else if (status == MQ_CONSUMER_STATUS__MODIFY) {
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
}
}
}
if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
mInfo("mq rebalance will be triggered");
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg)};
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
}
}
} else {
taosHashCleanup(pRebMsg->rebSubHash);
rpcFreeCont(pRebMsg);
}
return 0;
}
@ -306,77 +353,111 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SMqDoRebalanceMsg *pReq = (SMqDoRebalanceMsg *)pMsg->rpcMsg.pCont;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pReq->consumerId);
int topicSz = taosArrayGetSize(pConsumer->topics);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
for (int i = 0; i < topicSz; i++) {
char *topic = taosArrayGetP(pConsumer->topics, i);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
void *pIter = NULL;
mInfo("mq rebalance start");
while (1) {
pIter = taosHashIterate(pReq->rebSubHash, pIter);
if (pIter == NULL) break;
SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter;
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
mInfo("mq rebalance subscription: %s", pSub->key);
// remove lost consumer
for (int i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i);
mInfo("mq remove lost consumer %ld", lostConsumerId);
for (int j = 0; j < taosArrayGetSize(pSub->consumers); j++) {
SMqConsumerEp *pConsumerEp = taosArrayGet(pSub->consumers, j);
if (pConsumerEp->consumerId == lostConsumerId) {
taosArrayPush(pSub->unassignedVg, pConsumerEp);
taosArrayRemove(pSub->consumers, j);
break;
}
}
}
// calculate rebalance
int32_t consumerNum = taosArrayGetSize(pSub->consumers);
if (consumerNum != 0) {
int32_t vgNum = pSub->vgNum;
int32_t vgEachConsumer = vgNum / consumerNum;
int32_t left = vgNum % consumerNum;
int32_t leftUsed = 0;
int32_t imbalanceVg = vgNum % consumerNum;
int32_t imbalanceSolved = 0;
SArray *unassignedVgStash = taosArrayInit(0, sizeof(SMqConsumerEp));
SArray *unassignedConsumer = taosArrayInit(0, sizeof(int32_t));
for (int32_t j = 0; j < consumerNum; j++) {
bool changed = false;
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
int32_t vgOneConsumer = taosArrayGetSize(pSubConsumer->vgInfo);
bool canUseLeft = leftUsed < left;
if (vgOneConsumer > vgEachConsumer + canUseLeft) {
changed = true;
if (canUseLeft) leftUsed++;
// put into unassigned
while (taosArrayGetSize(pSubConsumer->vgInfo) > vgEachConsumer + canUseLeft) {
SArray *unassignedConsumerIdx = taosArrayInit(0, sizeof(int32_t));
// iterate all consumers, set unassignedVgStash
for (int i = 0; i < consumerNum; i++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
int vgThisConsumerAfterRb;
if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1;
else vgThisConsumerAfterRb = vgEachConsumer;
mInfo("mq consumer:%ld ,connectted vgroup change from %d %d", pSubConsumer->consumerId, vgThisConsumerBeforeRb, vgThisConsumerAfterRb);
while(taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
ASSERT(pConsumerEp != NULL);
ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
taosArrayPush(unassignedVgStash, pConsumerEp);
// build msg and persist into trans
}
} else if (vgOneConsumer < vgEachConsumer) {
changed = true;
// assign from unassigned
while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer) {
// if no unassgined, save j
if (taosArrayGetSize(unassignedVgStash) == 0) {
taosArrayPush(unassignedConsumer, &j);
break;
}
SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
ASSERT(pConsumerEp != NULL);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
pConsumerEp->consumerId = pSubConsumer->consumerId;
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
// build msg and persist into trans
}
}
if (changed) {
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
int32_t status = atomic_load_32(&pRebConsumer->status);
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
(vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
(vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)
) {
pRebConsumer->epoch++;
SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
if (vgThisConsumerAfterRb != 0) {
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
} else {
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
}
mInfo("mq consumer:%ld , status change from %d %d", pRebConsumer->consumerId, status, pRebConsumer->status);
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pConsumerRaw);
}
mndReleaseConsumer(pMnode, pRebConsumer);
}
for (int32_t j = 0; j < taosArrayGetSize(unassignedConsumer); j++) {
int32_t consumerIdx = *(int32_t *)taosArrayGet(unassignedConsumer, j);
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, consumerIdx);
while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer) {
SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
//assign to vgroup
if (taosArrayGetSize(unassignedVgStash) != 0) {
for (int i = 0; i < consumerNum; i++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
int vgThisConsumerAfterRb;
if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1;
else vgThisConsumerAfterRb = vgEachConsumer;
while(taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerBeforeRb) {
SMqConsumerEp* pConsumerEp = taosArrayPop(unassignedVgStash);
ASSERT(pConsumerEp != NULL);
ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
pConsumerEp->consumerId = pSubConsumer->consumerId;
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
// build msg and persist into trans
mInfo("mq consumer:%ld , assign vgroup %d, previously assigned to consumer %ld", pSubConsumer->consumerId, pConsumerEp->vgId, pConsumerEp->oldConsumerId);
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
}
}
}
ASSERT(taosArrayGetSize(unassignedVgStash) == 0);
// send msg to vnode
// log rebalance statistics
// TODO: log rebalance statistics
SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pSubRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pSubRaw);
@ -386,15 +467,111 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
mndReleaseConsumer(pMnode, pConsumer);
return -1;
}
mndTransDrop(pTrans);
mndReleaseConsumer(pMnode, pConsumer);
return 0;
}
#if 0
for (int32_t j = 0; j < consumerNum; j++) {
bool changed = false;
bool unfished = false;
bool canUseLeft = imbalanceSolved < imbalanceVg;
bool mustUseLeft = canUseLeft && (imbalanceVg - imbalanceSolved >= consumerNum - j);
ASSERT(imbalanceVg - imbalanceSolved <= consumerNum - j);
int32_t maxVg = vgEachConsumer + canUseLeft;
int32_t minVg = vgEachConsumer + mustUseLeft;
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
int32_t vgThisConsumerAfterRb;
if (vgThisConsumerBeforeRb > maxVg) {
vgThisConsumerAfterRb = maxVg;
imbalanceSolved++;
changed = true;
} else if (vgThisConsumerBeforeRb < minVg) {
vgThisConsumerAfterRb = minVg;
if (mustUseLeft) imbalanceSolved++;
changed = true;
} else {
vgThisConsumerAfterRb = vgThisConsumerBeforeRb;
}
if (vgThisConsumerBeforeRb > vgThisConsumerAfterRb) {
while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
// put into unassigned
SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
ASSERT(pConsumerEp != NULL);
ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
taosArrayPush(unassignedVgStash, pConsumerEp);
}
} else if (vgThisConsumerBeforeRb < vgThisConsumerAfterRb) {
// assign from unassigned
while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
// if no unassgined, save j
if (taosArrayGetSize(unassignedVgStash) == 0) {
taosArrayPush(unassignedConsumerIdx, &j);
unfished = true;
break;
}
// assign vg to consumer
SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
ASSERT(pConsumerEp != NULL);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
pConsumerEp->consumerId = pSubConsumer->consumerId;
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
// build msg and persist into trans
}
}
if (changed && !unfished) {
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
pRebConsumer->epoch++;
if (vgThisConsumerAfterRb != 0) {
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
} else {
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
}
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pConsumerRaw);
mndReleaseConsumer(pMnode, pRebConsumer);
// TODO: save history
}
}
for (int32_t j = 0; j < taosArrayGetSize(unassignedConsumerIdx); j++) {
bool canUseLeft = imbalanceSolved < imbalanceVg;
int32_t consumerIdx = *(int32_t *)taosArrayGet(unassignedConsumerIdx, j);
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, consumerIdx);
if (canUseLeft) imbalanceSolved++;
// must use
int32_t vgThisConsumerAfterRb = taosArrayGetSize(pSubConsumer->vgInfo) + canUseLeft;
while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer + canUseLeft) {
// assign vg to consumer
SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
ASSERT(pConsumerEp != NULL);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
pConsumerEp->consumerId = pSubConsumer->consumerId;
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
// build msg and persist into trans
}
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
pRebConsumer->epoch++;
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pConsumerRaw);
mndReleaseConsumer(pMnode, pRebConsumer);
// TODO: save history
}
#endif
#if 0
//update consumer status for the subscribption
for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
@ -518,11 +695,11 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub
mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
return -1;
}
if (pArray && taosArrayGetSize(pArray) != 1) {
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
mError("unsupport topic: %s, sql: %s, plan level: %ld", pTopic->name, pTopic->sql, taosArrayGetSize(pArray));
return -1;
}
/*if (pArray && taosArrayGetSize(pArray) != 1) {*/
/*terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;*/
/*mError("unsupport topic: %s, sql: %s, plan level: %ld", pTopic->name, pTopic->sql, taosArrayGetSize(pArray));*/
/*return -1;*/
/*}*/
SMqConsumerEp consumerEp = {0};
consumerEp.status = 0;
@ -697,7 +874,7 @@ static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) {
return key;
}
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *cgroup, char *topicName) {
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
SSdb *pSdb = pMnode->pSdb;
char *key = mndMakeSubscribeKey(cgroup, topicName);
SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
@ -708,6 +885,15 @@ SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *cgroup, char *topicNa
return pSub;
}
// Acquire a subscription object from sdb using an already-built subscribe key.
// Returns whatever sdbAcquire returns (NULL on a miss). This function does not
// set terrno itself.
// TODO: decide on a dedicated error code for the not-found case
// (TSDB_CODE_MND_CONSUMER_NOT_EXIST was considered and left disabled).
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
  return sdbAcquire(pMnode->pSdb, SDB_SUBSCRIBE, key);
}
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pSub);
@ -737,9 +923,9 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
createConsumer = true;
} else {
pConsumer->epoch++;
oldSub = pConsumer->topics;
oldSub = pConsumer->currentTopics;
}
pConsumer->topics = newSub;
pConsumer->currentTopics = newSub;
if (oldSub != NULL) {
oldTopicNum = taosArrayGetSize(oldSub);
@ -796,11 +982,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
for (int vgi = 0; vgi < vgsz; vgi++) {
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp);
taosArrayPush(pSub->unassignedVg, pConsumerEp);
}
taosArrayRemove(pSub->consumers, ci);
break;
}
}
pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
/*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/
} else if (newTopicName != NULL) {
ASSERT(oldTopicName == NULL);
@ -830,6 +1019,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
pConsumerEp->consumerId = consumerId;
taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
atomic_store_32(&pConsumer->hbStatus, MQ_CONSUMER_STATUS__ACTIVE);
}
SSdbRaw *pRaw = mndSubActionEncode(pSub);

View File

@ -107,7 +107,7 @@ static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
return hash;
}
static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, void *pKey) {
static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) {
int32_t keySize;
EKeyType keyType = pSdb->keyTypes[type];
@ -263,7 +263,7 @@ int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) {
return code;
}
void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) {
void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
terrno = 0;
SHashObj *hash = sdbGetHash(pSdb, type);