diff --git a/example/src/tmq.c b/example/src/tmq.c index e19e6b5aee..c36675fa7e 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -116,7 +116,7 @@ void basic_consume_loop(tmq_t *tmq, int32_t cnt = 0; /*clock_t startTime = clock();*/ while (running) { - tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 0); + tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500); if (tmqmessage) { cnt++; msg_process(tmqmessage); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d299860df7..5d5bc833fb 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1681,7 +1681,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { return buf; } -typedef struct SMqSetCVgReq { +typedef struct { int64_t leftForVer; int32_t vgId; int64_t oldConsumerId; @@ -1746,7 +1746,51 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { return buf; } -typedef struct SMqSetCVgRsp { +typedef struct { + int64_t leftForVer; + int32_t vgId; + int64_t oldConsumerId; + int64_t newConsumerId; + //char topicName[TSDB_TOPIC_FNAME_LEN]; + //char cgroup[TSDB_CONSUMER_GROUP_LEN]; + //char* sql; + //char* logicalPlan; + //char* physicalPlan; + //char* qmsg; +} SMqMVRebReq; + +static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pReq->leftForVer); + tlen += taosEncodeFixedI32(buf, pReq->vgId); + tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); + tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); + //tlen += taosEncodeString(buf, pReq->topicName); + //tlen += taosEncodeString(buf, pReq->cgroup); + //tlen += taosEncodeString(buf, pReq->sql); + //tlen += taosEncodeString(buf, pReq->logicalPlan); + //tlen += taosEncodeString(buf, pReq->physicalPlan); + //tlen += taosEncodeString(buf, pReq->qmsg); + //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) { + buf = taosDecodeFixedI64(buf, &pReq->leftForVer); + buf = taosDecodeFixedI32(buf, &pReq->vgId); + buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); + buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); + //buf = taosDecodeStringTo(buf, pReq->topicName); + //buf = taosDecodeStringTo(buf, pReq->cgroup); + //buf = taosDecodeString(buf, &pReq->sql); + //buf = taosDecodeString(buf, &pReq->logicalPlan); + //buf = taosDecodeString(buf, &pReq->physicalPlan); + //buf = taosDecodeString(buf, &pReq->qmsg); + //buf = tDecodeSSubQueryMsg(buf, &pReq->msg); + return buf; +} + +typedef struct { SMsgHead header; int32_t vgId; int64_t consumerId; @@ -1754,6 +1798,14 @@ typedef struct SMqSetCVgRsp { char cGroup[TSDB_CONSUMER_GROUP_LEN]; } SMqSetCVgRsp; +typedef struct { + SMsgHead header; + int32_t vgId; + int64_t consumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cGroup[TSDB_CONSUMER_GROUP_LEN]; +} SMqMVRebRsp; + typedef struct { uint32_t nCols; SSchema *pSchema; @@ -1854,7 +1906,7 @@ typedef struct { typedef struct { int64_t consumerId; - int64_t epoch; + int32_t epoch; char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray* topics; // SArray } SMqCMGetSubEpRsp; @@ -1911,7 +1963,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pRsp->consumerId); - tlen += taosEncodeFixedI64(buf, pRsp->epoch); + tlen += taosEncodeFixedI32(buf, pRsp->epoch); tlen += taosEncodeString(buf, pRsp->cgroup); int32_t sz = taosArrayGetSize(pRsp->topics); tlen += taosEncodeFixedI32(buf, sz); @@ -1924,7 +1976,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) { buf = taosDecodeFixedI64(buf, &pRsp->consumerId); - buf = taosDecodeFixedI64(buf, &pRsp->epoch); + buf = taosDecodeFixedI32(buf, &pRsp->epoch); buf = taosDecodeStringTo(buf, pRsp->cgroup); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 98d2b2c519..cf890f5cbb 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -164,6 +164,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", SMqSetCVgReq, SMqSetCVgRsp) + TD_DEF_MSG_TYPE(TDMT_VND_MQ_REB, "vnode-mq-mv-rebalance", SMqMVRebReq, SMqMVRebRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index a820d68e50..c0f0cf4dfe 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -58,7 +58,7 @@ struct tmq_t { char clientId[256]; SRWLatch lock; int64_t consumerId; - int64_t epoch; + int32_t epoch; int64_t status; tsem_t rspSem; STscObj* pTscObj; @@ -592,6 +592,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { goto END; } buf->consumerId = htobe64(tmq->consumerId); + buf->epoch = htonl(tmq->epoch); strcpy(buf->cgroup, tmq->groupId); SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 9b3ab40226..239fe9ca4d 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -116,6 +116,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; /*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/ pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = dndProcessMnodeReadMsg; // Requests handled by VNODE @@ -149,6 +150,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg; } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 9090d5d97c..73c0446e3f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -424,6 +424,7 @@ typedef struct { int32_t status; int32_t vgNum; SArray* consumers; // SArray + SArray* lostConsumers; // SArray SArray* unassignedVg; // SArray } SMqSubscribeObj; @@ -432,10 +433,17 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { if (pSub == NULL) { return NULL; } + pSub->consumers = taosArrayInit(0, sizeof(SMqSubConsumer)); if (pSub->consumers == NULL) { goto _err; } + + pSub->lostConsumers = taosArrayInit(0, sizeof(SMqSubConsumer)); + if (pSub->lostConsumers == NULL) { + goto _err; + } + pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp)); if (pSub->unassignedVg == NULL) { goto _err; @@ -448,8 +456,9 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { return pSub; _err: - tfree(pSub->unassignedVg); tfree(pSub->consumers); + tfree(pSub->lostConsumers); + tfree(pSub->unassignedVg); tfree(pSub); return NULL; } @@ -468,6 +477,13 @@ static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeOb tlen += tEncodeSMqSubConsumer(buf, pSubConsumer); } + sz = taosArrayGetSize(pSub->lostConsumers); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqSubConsumer* pSubConsumer = taosArrayGet(pSub->lostConsumers, i); + tlen += tEncodeSMqSubConsumer(buf, pSubConsumer); + } + sz = taosArrayGetSize(pSub->unassignedVg); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { @@ -496,6 +512,17 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) taosArrayPush(pSub->consumers, &subConsumer); } + buf = taosDecodeFixedI32(buf, &sz); + pSub->lostConsumers = taosArrayInit(sz, sizeof(SMqSubConsumer)); + if (pSub->lostConsumers == NULL) { + return NULL; + } + for (int32_t i = 0; i < sz; i++) { + SMqSubConsumer subConsumer = {0}; + buf = tDecodeSMqSubConsumer(buf, &subConsumer); + taosArrayPush(pSub->lostConsumers, &subConsumer); + } + buf = taosDecodeFixedI32(buf, &sz); pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp)); if (pSub->unassignedVg == NULL) { @@ -545,7 +572,7 @@ typedef struct { char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray* currentTopics; // SArray SArray* recentRemovedTopics; // SArray - int64_t epoch; + int32_t epoch; // stat int64_t pollCnt; // status @@ -561,8 +588,9 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerO int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pConsumer->consumerId); tlen += taosEncodeFixedI64(buf, pConsumer->connId); - tlen += taosEncodeFixedI64(buf, pConsumer->epoch); + tlen += taosEncodeFixedI32(buf, pConsumer->epoch); tlen += taosEncodeFixedI64(buf, pConsumer->pollCnt); + tlen += taosEncodeFixedI32(buf, pConsumer->status); tlen += taosEncodeString(buf, pConsumer->cgroup); sz = taosArrayGetSize(pConsumer->currentTopics); @@ -585,8 +613,9 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons int32_t sz; buf = taosDecodeFixedI64(buf, &pConsumer->consumerId); buf = taosDecodeFixedI64(buf, &pConsumer->connId); - buf = taosDecodeFixedI64(buf, &pConsumer->epoch); + buf = taosDecodeFixedI32(buf, &pConsumer->epoch); buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt); + buf = taosDecodeFixedI32(buf, &pConsumer->status); buf = taosDecodeStringTo(buf, pConsumer->cgroup); buf = taosDecodeFixedI32(buf, &sz); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 4b78f6eb64..e87021b44d 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -61,7 +61,7 @@ SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) { } pConsumer->epoch = 1; pConsumer->consumerId = consumerId; - pConsumer->status = MQ_CONSUMER_STATUS__INIT; + atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__INIT); strcpy(pConsumer->cgroup, cgroup); taosInitRWLatch(&pConsumer->lock); return pConsumer; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 4fab193a83..c5bebb3249 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -72,6 +72,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp); + mndSetMsgHandle(pMnode, TDMT_VND_MQ_REB_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg); @@ -105,13 +106,13 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj } static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) { - SMqSetCVgReq req = { + SMqMVRebReq req = { .vgId = pConsumerEp->vgId, .oldConsumerId = pConsumerEp->oldConsumerId, .newConsumerId = pConsumerEp->consumerId, }; - int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); + int32_t tlen = tEncodeSMqMVRebReq(NULL, &req); void *buf = malloc(sizeof(SMsgHead) + tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -123,7 +124,7 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume pMsgHead->vgId = htonl(pConsumerEp->vgId); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncodeSMqSetCVgReq(&abuf, &req); + tEncodeSMqMVRebReq(&abuf, &req); *pBuf = buf; *pLen = tlen; @@ -208,6 +209,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont; SMqCMGetSubEpRsp rsp = {0}; int64_t consumerId = be64toh(pReq->consumerId); + int32_t epoch = ntohl(pReq->epoch); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId); if (pConsumer == NULL) { @@ -216,10 +218,19 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { } ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); + //TODO + int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus); + mInfo("try to get sub ep, old val: %d", hbStatus); + atomic_store_32(&pConsumer->hbStatus, 0); + /*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/ + /*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/ + /*sdbWrite(pMnode->pSdb, pConsumerRaw);*/ + strcpy(rsp.cgroup, pReq->cgroup); rsp.consumerId = consumerId; rsp.epoch = pConsumer->epoch; - if (pReq->epoch != rsp.epoch) { + if (epoch != rsp.epoch) { + mInfo("old epoch %d, new epoch %d", epoch, rsp.epoch); SArray *pTopics = pConsumer->currentTopics; int sz = taosArrayGetSize(pTopics); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); @@ -258,6 +269,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { void *abuf = buf; tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); tDeleteSMqCMGetSubEpRsp(&rsp); + mndReleaseConsumer(pMnode, pConsumer); pMsg->pCont = buf; pMsg->contLen = tlen; return 0; @@ -373,9 +385,10 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { mInfo("mq remove lost consumer %ld", lostConsumerId); for (int j = 0; j < taosArrayGetSize(pSub->consumers); j++) { - SMqConsumerEp *pConsumerEp = taosArrayGet(pSub->consumers, j); - if (pConsumerEp->consumerId == lostConsumerId) { - taosArrayPush(pSub->unassignedVg, pConsumerEp); + SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); + if (pSubConsumer->consumerId == lostConsumerId) { + taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo); + taosArrayPush(pSub->lostConsumers, pSubConsumer); taosArrayRemove(pSub->consumers, j); break; } @@ -389,8 +402,8 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { int32_t vgEachConsumer = vgNum / consumerNum; int32_t imbalanceVg = vgNum % consumerNum; int32_t imbalanceSolved = 0; - SArray *unassignedVgStash = taosArrayInit(0, sizeof(SMqConsumerEp)); - SArray *unassignedConsumerIdx = taosArrayInit(0, sizeof(int32_t)); + /*SArray *unassignedVgStash = taosArrayInit(0, sizeof(SMqConsumerEp));*/ + /*SArray *unassignedConsumerIdx = taosArrayInit(0, sizeof(int32_t));*/ // iterate all consumers, set unassignedVgStash for (int i = 0; i < consumerNum; i++) { @@ -400,13 +413,13 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1; else vgThisConsumerAfterRb = vgEachConsumer; - mInfo("mq consumer:%ld ,connectted vgroup change from %d %d", pSubConsumer->consumerId, vgThisConsumerBeforeRb, vgThisConsumerAfterRb); + mInfo("mq consumer:%ld, connectted vgroup change from %d %d", pSubConsumer->consumerId, vgThisConsumerBeforeRb, vgThisConsumerAfterRb); while(taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) { SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo); ASSERT(pConsumerEp != NULL); ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId); - taosArrayPush(unassignedVgStash, pConsumerEp); + taosArrayPush(pSub->unassignedVg, pConsumerEp); } SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId); @@ -422,7 +435,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); } - mInfo("mq consumer:%ld , status change from %d %d", pRebConsumer->consumerId, status, pRebConsumer->status); + mInfo("mq consumer:%ld, status change from %d %d", pRebConsumer->consumerId, status, pRebConsumer->status); SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); @@ -432,7 +445,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { } //assign to vgroup - if (taosArrayGetSize(unassignedVgStash) != 0) { + if (taosArrayGetSize(pSub->unassignedVg) != 0) { for (int i = 0; i < consumerNum; i++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); @@ -440,14 +453,13 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1; else vgThisConsumerAfterRb = vgEachConsumer; - while(taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerBeforeRb) { - SMqConsumerEp* pConsumerEp = taosArrayPop(unassignedVgStash); + while(taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) { + SMqConsumerEp* pConsumerEp = taosArrayPop(pSub->unassignedVg); ASSERT(pConsumerEp != NULL); - ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId); - pConsumerEp->oldConsumerId = pConsumerEp->consumerId; pConsumerEp->consumerId = pSubConsumer->consumerId; + taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); mInfo("mq consumer:%ld , assign vgroup %d, previously assigned to consumer %ld", pSubConsumer->consumerId, pConsumerEp->vgId, pConsumerEp->oldConsumerId); @@ -455,7 +467,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { } } } - ASSERT(taosArrayGetSize(unassignedVgStash) == 0); + ASSERT(taosArrayGetSize(pSub->unassignedVg) == 0); // TODO: log rebalance statistics SSdbRaw *pSubRaw = mndSubActionEncode(pSub); @@ -1019,7 +1031,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { pConsumerEp->consumerId = consumerId; taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp); mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); - atomic_store_32(&pConsumer->hbStatus, MQ_CONSUMER_STATUS__ACTIVE); + atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); } SSdbRaw *pRaw = mndSubActionEncode(pSub); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index ca851748d6..211cfbd1f0 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -389,7 +389,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { return NULL; } - if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER) { + if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER && pRpcMsg->msgType != TDMT_MND_MQ_DO_REBALANCE) { SRpcConnInfo connInfo = {0}; if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { taosFreeQitem(pMsg); diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 2aceaeb016..bd626154e6 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -198,6 +198,7 @@ int tqCommit(STQ*); int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg); +int32_t tqProcessRebReq(STQ* pTq, char* msg); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f65c0d9893..af4c9da05f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -281,6 +281,19 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } +int32_t tqProcessRebReq(STQ* pTq, char* msg) { + SMqMVRebReq req = {0}; + tDecodeSMqMVRebReq(msg, &req); + + STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId); + ASSERT(pConsumer); + tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); + tqHandleCommit(pTq->tqMeta, req.newConsumerId); + tqHandlePurge(pTq->tqMeta, req.oldConsumerId); + terrno = TSDB_CODE_SUCCESS; + return 0; +} + int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { SMqSetCVgReq req = {0}; tDecodeSMqSetCVgReq(msg, &req); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 28487821e6..4d048bc6e2 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -130,6 +130,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO: handle error } } break; + case TDMT_VND_MQ_REB: { + if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead))) < 0) { + + } + } break; default: ASSERT(0); break;