diff --git a/include/common/tmsg.h b/include/common/tmsg.h index db2c51fb01..48ad25add3 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3919,11 +3919,11 @@ int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq); int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq); int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); -int32_t tDeatroySMqHbReq(SMqHbReq* pReq); +void tDestroySMqHbReq(SMqHbReq* pReq); int32_t tSerializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp); int32_t tDeserializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp); -int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp); +void tDestroySMqHbRsp(SMqHbRsp* pRsp); int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index fb1882e472..c8fc556150 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -779,7 +779,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { taosWUnLockLatch(&tmq->lock); taosReleaseRef(tmqMgmt.rsetId, refId); } - tDeatroySMqHbRsp(&rsp); + tDestroySMqHbRsp(&rsp); taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); } @@ -861,7 +861,7 @@ void tmqSendHbReq(void* param, void* tmrId) { asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); OVER: - tDeatroySMqHbReq(&req); + tDestroySMqHbReq(&req); taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); taosReleaseRef(tmqMgmt.rsetId, refId); } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9138d7c983..5b154df8a3 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6201,9 +6201,8 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) { return 0; } -int32_t tDeatroySMqHbRsp(SMqHbRsp *pRsp) { +void tDestroySMqHbRsp(SMqHbRsp *pRsp) { taosArrayDestroy(pRsp->topicPrivileges); - return 0; } int32_t tSerializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) { @@ -6250,13 +6249,12 @@ int32_t tDeserializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) { return 0; } -int32_t tDeatroySMqHbReq(SMqHbReq *pReq) { +void tDestroySMqHbReq(SMqHbReq *pReq) { for (int i = 0; i < taosArrayGetSize(pReq->topics); i++) { TopicOffsetRows *vgs = taosArrayGet(pReq->topics, i); if (vgs) taosArrayDestroy(vgs->offsetRows); } taosArrayDestroy(pReq->topics); - return 0; } int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 8c89ddc825..5184ad0eca 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -24,27 +24,22 @@ extern "C" { enum { MQ_CONSUMER_STATUS_REBALANCE = 1, -// MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore MQ_CONSUMER_STATUS_READY, MQ_CONSUMER_STATUS_LOST, -// MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore -// MQ_CONSUMER_STATUS__LOST_REBD, -};\ +}; int32_t mndInitConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode); -void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info); +void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo* info); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); -SMqConsumerObj *mndCreateConsumer(int64_t consumerId, const char *cgroup); - SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer); SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); -int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer); -int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer); +int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer); +int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer); const char *mndConsumerStatusName(int status); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 4a092057ce..b98ef31d75 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -149,6 +149,7 @@ typedef enum { CONSUMER_REMOVE_REB, // remove after rebalance CONSUMER_UPDATE_REC, // update after recover CONSUMER_UPDATE_SUB, // update after subscribe req + CONSUMER_INSERT_SUB, } ECsmUpdateType; typedef struct { @@ -556,8 +557,9 @@ typedef struct { int32_t resetOffsetCfg; } SMqConsumerObj; -SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]); -void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool isDeleted); +SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe); +void tClearSMqConsumerObj(SMqConsumerObj* pConsumer); +void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer); int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer); void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver); diff --git a/source/dnode/mnode/impl/inc/mndSubscribe.h b/source/dnode/mnode/impl/inc/mndSubscribe.h index 23b3a7d1fe..eb9902a75c 100644 --- a/source/dnode/mnode/impl/inc/mndSubscribe.h +++ b/source/dnode/mnode/impl/inc/mndSubscribe.h @@ -30,7 +30,7 @@ SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const c SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key); void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); -int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName); +void mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName); int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic); int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index f60d8035e3..cdb5e89dd7 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -68,25 +68,27 @@ int32_t mndInitConsumer(SMnode *pMnode) { void mndCleanupConsumer(SMnode *pMnode) {} -void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo *info) { - SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); - if (pClearMsg == NULL) { +void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) { + void *msg = rpcMallocCont(sizeof(int64_t)); + if (msg == NULL) { mError("consumer:0x%" PRIx64 " failed to clear consumer due to out of memory. alloc size:%d", consumerId, - (int32_t)sizeof(SMqConsumerClearMsg)); + (int32_t)sizeof(int64_t)); return; } - pClearMsg->consumerId = consumerId; + *(int64_t*)msg = consumerId; SRpcMsg rpcMsg = { - .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, - .pCont = pClearMsg, - .contLen = sizeof(SMqConsumerClearMsg), + .msgType = msgType, + .pCont = msg, + .contLen = sizeof(int64_t), .info = *info, }; - mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId); - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); - return; + mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId); + int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + if (code != 0){ + mError("consumer:%"PRId64" send consumer msg:%d error:%d", consumerId, msgType, code); + } } static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, @@ -148,54 +150,62 @@ FAILED: } static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { + int32_t code = 0; SMnode *pMnode = pMsg->info.node; SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; + SMqConsumerObj *pConsumerNew = NULL; + STrans *pTrans = NULL; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId); if (pConsumer == NULL) { mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId); - return -1; + code = -1; + goto END; } mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId, pConsumer->status, mndConsumerStatusName(pConsumer->status)); if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { - mndReleaseConsumer(pMnode, pConsumer); terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; - return -1; + code = -1; + goto END; } - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE_REC; + pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_UPDATE_REC, NULL, NULL); + if (pConsumerNew == NULL){ + code = -1; + goto END; + } - mndReleaseConsumer(pMnode, pConsumer); - - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm"); if (pTrans == NULL) { - goto FAIL; + code = -1; + goto END; } - if (validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false) != 0) { - goto FAIL; + code = validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false); + if (code != 0) { + goto END; } - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; - if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; - - tDeleteSMqConsumerObj(pConsumerNew, true); + code = mndSetConsumerCommitLogs(pTrans, pConsumerNew); + if (code != 0) { + goto END; + } + code = mndTransPrepare(pMnode, pTrans); +END: + mndReleaseConsumer(pMnode, pConsumer); + tDeleteSMqConsumerObj(pConsumerNew); mndTransDrop(pTrans); - return 0; -FAIL: - tDeleteSMqConsumerObj(pConsumerNew, true); - - mndTransDrop(pTrans); - return -1; + return code; } -// todo check the clear process static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { + int32_t code = 0; SMnode *pMnode = pMsg->info.node; SMqConsumerClearMsg *pClearMsg = pMsg->pCont; + SMqConsumerObj *pConsumerNew = NULL; + STrans *pTrans = NULL; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId); if (pConsumer == NULL) { @@ -206,33 +216,37 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, mndConsumerStatusName(pConsumer->status)); - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); + pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL); + if (pConsumerNew == NULL){ + code = -1; + goto END; + } - mndReleaseConsumer(pMnode, pConsumer); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm"); + if (pTrans == NULL) { + code = -1; + goto END; + } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm"); - if (pTrans == NULL) goto FAIL; // this is the drop action, not the update action - if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; - if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; + code = mndSetConsumerDropLogs(pTrans, pConsumerNew); + if (code != 0) { + goto END; + } - tDeleteSMqConsumerObj(pConsumerNew, true); + code = mndTransPrepare(pMnode, pTrans); +END: + mndReleaseConsumer(pMnode, pConsumer); + tDeleteSMqConsumerObj(pConsumerNew); mndTransDrop(pTrans); - return 0; - -FAIL: - tDeleteSMqConsumerObj(pConsumerNew, true); - - mndTransDrop(pTrans); - return -1; + return code; } static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) { rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege)); if (rsp->topicPrivileges == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; + return TSDB_CODE_OUT_OF_MEMORY; } for (int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++) { char *topic = taosArrayGetP(pConsumer->currentTopics, i); @@ -253,6 +267,47 @@ static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRs return 0; } +static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer){ + for (int i = 0; i < taosArrayGetSize(req->topics); i++) { + TopicOffsetRows *data = taosArrayGet(req->topics, i); + mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName); + + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); + if (pSub == NULL) { + continue; + } + taosWLockLatch(&pSub->lock); + SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t)); + if (pConsumerEp) { + taosArrayDestroy(pConsumerEp->offsetRows); + pConsumerEp->offsetRows = data->offsetRows; + data->offsetRows = NULL; + } + taosWUnLockLatch(&pSub->lock); + + mndReleaseSubscribe(pMnode, pSub); + } +} + +static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){ + int32_t tlen = tSerializeSMqHbRsp(NULL, 0, rsp); + if (tlen <= 0){ + return TSDB_CODE_OUT_OF_MEMORY; + } + void *buf = rpcMallocCont(tlen); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if(tSerializeSMqHbRsp(buf, tlen, rsp) != 0){ + rpcFreeCont(buf); + return TSDB_CODE_OUT_OF_MEMORY; + } + pMsg->info.rsp = buf; + pMsg->info.rspLen = tlen; + return 0; +} + static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { int32_t code = 0; SMnode *pMnode = pMsg->info.node; @@ -261,7 +316,6 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumer = NULL; if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY; goto end; } @@ -270,7 +324,6 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { mError("consumer:0x%" PRIx64 " not exist", consumerId); - terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; code = TSDB_CODE_MND_CONSUMER_NOT_EXIST; goto end; } @@ -284,88 +337,152 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { int32_t status = atomic_load_32(&pConsumer->status); if (status == MQ_CONSUMER_STATUS_LOST) { - mInfo("try to recover consumer:0x%" PRIx64 "", consumerId); - SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); - - pRecoverMsg->consumerId = consumerId; - SRpcMsg pRpcMsg = { - .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER, - .pCont = pRecoverMsg, - .contLen = sizeof(SMqConsumerRecoverMsg), - .info = pMsg->info, - }; - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); + mInfo("try to recover consumer:0x%" PRIx64, consumerId); + mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info); } - for (int i = 0; i < taosArrayGetSize(req.topics); i++) { - TopicOffsetRows *data = taosArrayGet(req.topics, i); - mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName); - - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); - if (pSub == NULL) { -#ifdef TMQ_DEBUG - ASSERT(0); -#endif - continue; - } - taosWLockLatch(&pSub->lock); - SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t)); - if (pConsumerEp) { - taosArrayDestroy(pConsumerEp->offsetRows); - pConsumerEp->offsetRows = data->offsetRows; - data->offsetRows = NULL; - } - taosWUnLockLatch(&pSub->lock); - - mndReleaseSubscribe(pMnode, pSub); - } - - // encode rsp - int32_t tlen = tSerializeSMqHbRsp(NULL, 0, &rsp); - void *buf = rpcMallocCont(tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } - - tSerializeSMqHbRsp(buf, tlen, &rsp); - pMsg->info.rsp = buf; - pMsg->info.rspLen = tlen; + storeOffsetRows(pMnode, &req, pConsumer); + code = buildMqHbRsp(pMsg, &rsp); end: - tDeatroySMqHbRsp(&rsp); + tDestroySMqHbRsp(&rsp); mndReleaseConsumer(pMnode, pConsumer); - tDeatroySMqHbReq(&req); + tDestroySMqHbReq(&req); return code; } +static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){ + taosRLockLatch(&pConsumer->lock); + + int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics); + + rsp->topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp)); + if (rsp->topics == NULL) { + taosRUnLockLatch(&pConsumer->lock); + return TSDB_CODE_OUT_OF_MEMORY; + } + + // handle all topics subscribed by this consumer + for (int32_t i = 0; i < numOfTopics; i++) { + char *topic = taosArrayGetP(pConsumer->currentTopics, i); + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); + // txn guarantees pSub is created + if (pSub == NULL) { + continue; + } + taosRLockLatch(&pSub->lock); + + SMqSubTopicEp topicEp = {0}; + strcpy(topicEp.topic, topic); + + // 2.1 fetch topic schema + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); + if (pTopic == NULL) { + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); + continue; + } + taosRLockLatch(&pTopic->lock); + tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN); + topicEp.schema.nCols = pTopic->schema.nCols; + if (topicEp.schema.nCols) { + topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema)); + memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema)); + } + taosRUnLockLatch(&pTopic->lock); + mndReleaseTopic(pMnode, pTopic); + + // 2.2 iterate all vg assigned to the consumer of that topic + SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t)); + int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs); + + // this customer assigned vgroups + topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp)); + if (topicEp.vgs == NULL) { + taosRUnLockLatch(&pConsumer->lock); + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t j = 0; j < vgNum; j++) { + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); + // char offsetKey[TSDB_PARTITION_KEY_LEN]; + // mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId); + + if (epoch == -1) { + SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId); + if (pVgroup) { + pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup); + mndReleaseVgroup(pMnode, pVgroup); + } + } + // 2.2.1 build vg ep + SMqSubVgEp vgEp = { + .epSet = pVgEp->epSet, + .vgId = pVgEp->vgId, + .offset = -1, + }; + + taosArrayPush(topicEp.vgs, &vgEp); + } + taosArrayPush(rsp->topics, &topicEp); + + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); + } + taosRUnLockLatch(&pConsumer->lock); + return 0; +} + +static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId){ + // encode rsp + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, rsp); + void *buf = rpcMallocCont(tlen); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SMqRspHead *pHead = buf; + + pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP; + pHead->epoch = serverEpoch; + pHead->consumerId = consumerId; + pHead->walsver = 0; + pHead->walever = 0; + + void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); + tEncodeSMqAskEpRsp(&abuf, rsp); + + // send rsp + pMsg->info.rsp = buf; + pMsg->info.rspLen = tlen; + return 0; +} + static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMqAskEpReq req = {0}; SMqAskEpRsp rsp = {0}; + int32_t code = 0; if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } int64_t consumerId = req.consumerId; - int32_t epoch = req.epoch; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup); - terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; - return -1; + return TSDB_CODE_MND_CONSUMER_NOT_EXIST; } - int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)); - if (ret != 0) { + if (strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) != 0) { mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup, pConsumer->cgroup); - terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; - goto FAIL; + code = TSDB_CODE_MND_CONSUMER_NOT_EXIST; + goto END; } atomic_store_32(&pConsumer->hbStatus, 0); @@ -374,156 +491,37 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { int32_t status = atomic_load_32(&pConsumer->status); if (status == MQ_CONSUMER_STATUS_LOST) { - mInfo("try to recover consumer:0x%" PRIx64, consumerId); - SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); - - pRecoverMsg->consumerId = consumerId; - SRpcMsg pRpcMsg = { - .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER, - .pCont = pRecoverMsg, - .contLen = sizeof(SMqConsumerRecoverMsg), - .info = pMsg->info, - }; - - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); + mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info); } if (status != MQ_CONSUMER_STATUS_READY) { mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); - terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; - goto FAIL; + code = TSDB_CODE_MND_CONSUMER_NOT_READY; + goto END; } + int32_t epoch = req.epoch; int32_t serverEpoch = atomic_load_32(&pConsumer->epoch); // 2. check epoch, only send ep info when epochs do not match if (epoch != serverEpoch) { - taosRLockLatch(&pConsumer->lock); - mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch, - serverEpoch); - int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics); - - rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp)); - if (rsp.topics == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosRUnLockLatch(&pConsumer->lock); - goto FAIL; + mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", + consumerId, epoch, serverEpoch); + code = addEpSetInfo(pMnode, pConsumer, epoch, &rsp); + if(code != 0){ + goto END; } - - // handle all topics subscribed by this consumer - for (int32_t i = 0; i < numOfTopics; i++) { - char *topic = taosArrayGetP(pConsumer->currentTopics, i); - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); - // txn guarantees pSub is created - if (pSub == NULL) { -#ifdef TMQ_DEBUG - ASSERT(0); -#endif - continue; - } - taosRLockLatch(&pSub->lock); - - SMqSubTopicEp topicEp = {0}; - strcpy(topicEp.topic, topic); - - // 2.1 fetch topic schema - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - if (pTopic == NULL) { -#ifdef TMQ_DEBUG - ASSERT(0); -#endif - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); - continue; - } - taosRLockLatch(&pTopic->lock); - tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN); - topicEp.schema.nCols = pTopic->schema.nCols; - if (topicEp.schema.nCols) { - topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema)); - memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema)); - } - taosRUnLockLatch(&pTopic->lock); - mndReleaseTopic(pMnode, pTopic); - - // 2.2 iterate all vg assigned to the consumer of that topic - SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t)); - int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs); - - // this customer assigned vgroups - topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp)); - if (topicEp.vgs == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosRUnLockLatch(&pConsumer->lock); - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); - goto FAIL; - } - - for (int32_t j = 0; j < vgNum; j++) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); - // char offsetKey[TSDB_PARTITION_KEY_LEN]; - // mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId); - - if (epoch == -1) { - SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId); - if (pVgroup) { - pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup); - mndReleaseVgroup(pMnode, pVgroup); - } - } - // 2.2.1 build vg ep - SMqSubVgEp vgEp = { - .epSet = pVgEp->epSet, - .vgId = pVgEp->vgId, - .offset = -1, - }; - - taosArrayPush(topicEp.vgs, &vgEp); - } - taosArrayPush(rsp.topics, &topicEp); - - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); - } - taosRUnLockLatch(&pConsumer->lock); } - // encode rsp - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp); - void *buf = rpcMallocCont(tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; - } + code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId); - SMqRspHead *pHead = buf; - - pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP; - pHead->epoch = serverEpoch; - pHead->consumerId = pConsumer->consumerId; - pHead->walsver = 0; - pHead->walever = 0; - - void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqAskEpRsp(&abuf, &rsp); - - // release consumer and free memory +END: tDeleteSMqAskEpRsp(&rsp); mndReleaseConsumer(pMnode, pConsumer); - - // send rsp - pMsg->info.rsp = buf; - pMsg->info.rspLen = tlen; - return 0; - -FAIL: - tDeleteSMqAskEpRsp(&rsp); - mndReleaseConsumer(pMnode, pConsumer); - return -1; + return code; } -int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) { +int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) { SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; @@ -531,7 +529,7 @@ int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *p return 0; } -int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) { +int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) { SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; @@ -539,8 +537,6 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj return 0; } -static void *topicNameDup(void *p) { return taosStrdup((char *)p); } - static void freeItem(void *param) { void *pItem = *(void **)param; if (pItem != NULL) { @@ -548,21 +544,52 @@ static void freeItem(void *param) { } } -int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; - char *msgStr = pMsg->pCont; - int32_t code = -1; +static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew){ + pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *)); + pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *)); + if(pConsumerNew->rebNewTopics == NULL || pConsumerNew->rebRemovedTopics == NULL){ + return TSDB_CODE_OUT_OF_MEMORY; + } + int32_t newTopicNum = taosArrayGetSize(pConsumerNew->assignedTopics); + int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics); - SCMSubscribeReq subscribe = {0}; - tDeserializeSCMSubscribeReq(msgStr, &subscribe); + int32_t i = 0, j = 0; + while (i < oldTopicNum || j < newTopicNum) { + if (i >= oldTopicNum) { + char *newTopicCopy = taosStrdup(taosArrayGetP(pConsumerNew->assignedTopics, j)); + taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy); + j++; + continue; + } else if (j >= newTopicNum) { + char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i)); + taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy); + i++; + continue; + } else { + char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i); + char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, j); + int comp = strcmp(oldTopic, newTopic); + if (comp == 0) { + i++; + j++; + continue; + } else if (comp < 0) { + char *oldTopicCopy = taosStrdup(oldTopic); + taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy); + i++; + continue; + } else { + char *newTopicCopy = taosStrdup(newTopic); + taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy); + j++; + continue; + } + } + } + return 0; +} - int64_t consumerId = subscribe.consumerId; - char *cgroup = subscribe.cgroup; - SMqConsumerObj *pExistedConsumer = NULL; - SMqConsumerObj *pConsumerNew = NULL; - STrans *pTrans = NULL; - - SArray *pTopicList = subscribe.topicNames; +static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){ taosArraySort(pTopicList, taosArrayCompareString); taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem); @@ -571,125 +598,107 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char *)taosArrayGetP(pTopicList, i)); if (gNum >= MND_MAX_GROUP_PER_TOPIC) { terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE; - code = terrno; - goto _over; + return -1; } } + return TSDB_CODE_SUCCESS; +} - // check topic existence - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe"); - if (pTrans == NULL) { - goto _over; - } - - code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user, subscribe.enableReplay); - if (code != TSDB_CODE_SUCCESS) { - goto _over; - } - +static SMqConsumerObj* buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe){ + int64_t consumerId = subscribe->consumerId; + char *cgroup = subscribe->cgroup; + SMqConsumerObj *pConsumerNew = NULL; + SMqConsumerObj *pExistedConsumer = NULL; pExistedConsumer = mndAcquireConsumer(pMnode, consumerId); if (pExistedConsumer == NULL) { - mInfo("receive subscribe request from new consumer:0x%" PRIx64 " cgroup:%s, numOfTopics:%d", consumerId, - subscribe.cgroup, (int32_t)taosArrayGetSize(pTopicList)); + mInfo("receive subscribe request from new consumer:0x%" PRIx64 + ",cgroup:%s, numOfTopics:%d", consumerId, + subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames)); - pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); - tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId)); - - pConsumerNew->withTbName = subscribe.withTbName; - pConsumerNew->autoCommit = subscribe.autoCommit; - pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval; - pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg; - - // pConsumerNew->updateType = CONSUMER_UPDATE_SUB; // use insert logic - taosArrayDestroy(pConsumerNew->assignedTopics); - pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); - - // all subscribed topics should re-balance. - taosArrayDestroy(pConsumerNew->rebNewTopics); - pConsumerNew->rebNewTopics = pTopicList; - subscribe.topicNames = NULL; - - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over; - if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; + pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe); + if (pConsumerNew == NULL) { + goto _over; + } } else { int32_t status = atomic_load_32(&pExistedConsumer->status); mInfo("receive subscribe request from existed consumer:0x%" PRIx64 - " cgroup:%s, current status:%d(%s), subscribe topic num: %d", - consumerId, subscribe.cgroup, status, mndConsumerStatusName(status), newTopicNum); + ",cgroup:%s, current status:%d(%s), subscribe topic num: %d", + consumerId, subscribe->cgroup, status, mndConsumerStatusName(status), + (int32_t)taosArrayGetSize(subscribe->topicNames)); if (status != MQ_CONSUMER_STATUS_READY) { terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; goto _over; } - - pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); + pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe); if (pConsumerNew == NULL) { goto _over; } - // set the update type - pConsumerNew->updateType = CONSUMER_UPDATE_SUB; - taosArrayDestroy(pConsumerNew->assignedTopics); - pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); - - int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics); - - int32_t i = 0, j = 0; - while (i < oldTopicNum || j < newTopicNum) { - if (i >= oldTopicNum) { - char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j)); - taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy); - j++; - continue; - } else if (j >= newTopicNum) { - char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i)); - taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy); - i++; - continue; - } else { - char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i); - char *newTopic = taosArrayGetP(pTopicList, j); - int comp = strcmp(oldTopic, newTopic); - if (comp == 0) { - i++; - j++; - continue; - } else if (comp < 0) { - char *oldTopicCopy = taosStrdup(oldTopic); - taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy); - i++; - continue; - } else { - char *newTopicCopy = taosStrdup(newTopic); - taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy); - j++; - continue; - } - } + int32_t code = getTopicAddDelete(pExistedConsumer, pConsumerNew); + if (code != 0){ + terrno = code; + goto _over; } - // no topics need to be rebalanced - if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { + if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && + taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { goto _over; } - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over; - if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; + } + return pConsumerNew; + +_over: + mndReleaseConsumer(pMnode, pExistedConsumer); + tDeleteSMqConsumerObj(pConsumerNew); + taosArrayDestroyP(subscribe->topicNames, (FDelete)taosMemoryFree); + return NULL; +} + +int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { + SMnode *pMnode = pMsg->info.node; + char *msgStr = pMsg->pCont; + int32_t code = 0; + + SCMSubscribeReq subscribe = {0}; + tDeserializeSCMSubscribeReq(msgStr, &subscribe); + + SMqConsumerObj *pConsumerNew = NULL; + STrans *pTrans = NULL; + + code = checkAndSortTopic(pMnode, subscribe.topicNames); + if(code != TSDB_CODE_SUCCESS){ + goto _over; } + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe"); + if (pTrans == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _over; + } + + code = validateTopics(pTrans, subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay); + if (code != TSDB_CODE_SUCCESS) { + goto _over; + } + + pConsumerNew = buildSubConsumer(pMnode, &subscribe); + if(pConsumerNew == NULL){ + code = -1; + goto _over; + } + code = mndSetConsumerCommitLogs(pTrans, pConsumerNew); + if (code != 0) goto _over; + + code = mndTransPrepare(pMnode, pTrans); + if (code != 0) goto _over; code = TSDB_CODE_ACTION_IN_PROGRESS; _over: mndTransDrop(pTrans); - - if (pExistedConsumer) { - /*taosRUnLockLatch(&pExistedConsumer->lock);*/ - mndReleaseConsumer(pMnode, pExistedConsumer); - } - - tDeleteSMqConsumerObj(pConsumerNew, true); - + tDeleteSMqConsumerObj(pConsumerNew); taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); return code; } @@ -796,7 +805,7 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) { static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status, mndConsumerStatusName(pConsumer->status)); - tDeleteSMqConsumerObj(pConsumer, false); + tClearSMqConsumerObj(pConsumer); return 0; } @@ -888,20 +897,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->subscribeTime = taosGetTimestampMs(); pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer", pOldConsumer->consumerId); - // } else if (pNewConsumer->updateType == CONSUMER_UPDATE_TIMER_LOST) { - // int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); - // for (int32_t i = 0; i < sz; i++) { - // char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i)); - // taosArrayPush(pOldConsumer->rebRemovedTopics, &topic); - // } - // - // int32_t prevStatus = pOldConsumer->status; - // pOldConsumer->status = MQ_CONSUMER_STATUS_LOST; - // mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ", - // reb-removed-topics:%d", - // pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), - // mndConsumerStatusName(pOldConsumer->status), pOldConsumer->rebalanceTime, - // (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) { int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 5be641d1c2..654e9de617 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -249,7 +249,9 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) { return (void *)buf; } -SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup) { +static void *topicNameDup(void *p) { return taosStrdup((char *)p); } + +SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe) { SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj)); if (pConsumer == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -264,36 +266,64 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup) { pConsumer->hbStatus = 0; taosInitRWLatch(&pConsumer->lock); + pConsumer->createTime = taosGetTimestampMs(); + pConsumer->updateType = updateType; - pConsumer->currentTopics = taosArrayInit(0, sizeof(void *)); - pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *)); - pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *)); - pConsumer->assignedTopics = taosArrayInit(0, sizeof(void *)); + if (updateType == CONSUMER_ADD_REB){ + pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *)); + if(pConsumer->rebNewTopics == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } - if (pConsumer->currentTopics == NULL || pConsumer->rebNewTopics == NULL || pConsumer->rebRemovedTopics == NULL || - pConsumer->assignedTopics == NULL) { - taosArrayDestroy(pConsumer->currentTopics); - taosArrayDestroy(pConsumer->rebNewTopics); - taosArrayDestroy(pConsumer->rebRemovedTopics); - taosArrayDestroy(pConsumer->assignedTopics); - taosMemoryFree(pConsumer); - return NULL; + char* topicTmp = taosStrdup(topic); + taosArrayPush(pConsumer->rebNewTopics, &topicTmp); + }else if (updateType == CONSUMER_REMOVE_REB) { + pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *)); + if(pConsumer->rebRemovedTopics == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + char* topicTmp = taosStrdup(topic); + taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp); + }else if (updateType == CONSUMER_INSERT_SUB){ + tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId)); + pConsumer->withTbName = subscribe->withTbName; + pConsumer->autoCommit = subscribe->autoCommit; + pConsumer->autoCommitInterval = subscribe->autoCommitInterval; + pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg; + + + pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup); + if (pConsumer->rebNewTopics == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + pConsumer->assignedTopics = subscribe->topicNames; + subscribe->topicNames = NULL; + }else if (updateType == CONSUMER_UPDATE_SUB){ + pConsumer->assignedTopics = subscribe->topicNames;; + subscribe->topicNames = NULL; } - pConsumer->createTime = taosGetTimestampMs(); - return pConsumer; + +END: + tDeleteSMqConsumerObj(pConsumer); + return NULL; } -void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer, bool delete) { +void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) { if (pConsumer == NULL) return; taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree); - if (delete) { - taosMemoryFree(pConsumer); - } +} + +void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) { + tClearSMqConsumerObj(pConsumer); + taosMemoryFree(pConsumer); } int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { @@ -548,6 +578,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { } void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { + if (pSub == NULL) return; void *pIter = NULL; while (1) { pIter = taosHashIterate(pSub->consumerHash, pIter); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index fbdfd81cdf..a48573bbd0 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -41,9 +41,9 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg); static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg); static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter); -static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* hash); +static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj* hash); -static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { +static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pCommitRaw = mndSubActionEncode(pSub); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; @@ -223,47 +223,39 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { return pRebInfo; } -static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) { - int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); - const char *pSubKey = pOutput->pSub->key; - +static void processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) { + int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); int32_t actualRemoved = 0; for (int32_t i = 0; i < numOfRemoved; i++) { uint64_t consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i); - SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); - - // consumer exists till now - if (pConsumerEp) { - actualRemoved++; - - int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); - for (int32_t j = 0; j < consumerVgNum; j++) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); - - SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp}; - taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("sub:%s mq re-balance remove vgId:%d from consumer:0x%" PRIx64, pSubKey, pVgEp->vgId, consumerId); - } - - taosArrayDestroy(pConsumerEp->vgs); - taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); - - // put into removed - taosArrayPush(pOutput->removedConsumers, &consumerId); + if (pConsumerEp == NULL) { + continue; } + + int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); + for (int32_t j = 0; j < consumerVgNum; j++) { + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); + SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp}; + taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); + mInfo("[rebalance] sub:%s mq re-balance remove vgId:%d from consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId, consumerId); + } + + taosArrayDestroy(pConsumerEp->vgs); + taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); + taosArrayPush(pOutput->removedConsumers, &consumerId); + actualRemoved++; } if (numOfRemoved != actualRemoved) { - mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", pSubKey, numOfRemoved, actualRemoved); + mError("[rebalance] sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key, numOfRemoved, actualRemoved); } else { - mInfo("sub:%s removed %d consumers", pSubKey, numOfRemoved); + mInfo("[rebalance] sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved); } } -static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) { +static void processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) { int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers); - const char *pSubKey = pOutput->pSub->key; for (int32_t i = 0; i < numOfNewConsumers; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i); @@ -274,14 +266,12 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosArrayPush(pOutput->newConsumers, &consumerId); - mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, pSubKey, consumerId); + mInfo("[rebalance] sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, consumerId); } } -static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { - const char *pSubKey = pOutput->pSub->key; +static void processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs); - for (int32_t i = 0; i < numOfVgroups; i++) { SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs); SMqRebOutputVg rebOutput = { @@ -291,7 +281,7 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg)); - mInfo("sub:%s mq re-balance addUnassignedVgroups vgId:%d from unassigned", pSubKey, pVgEp->vgId); + mInfo("[rebalance] sub:%s mq re-balance processUnassignedVgroups vgId:%d from unassigned", pOutput->pSub->key, pVgEp->vgId); } } @@ -307,11 +297,9 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { // } //} -static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt, - int32_t imbConsumerNum) { - const char *pSubKey = pOutput->pSub->key; - - int32_t imbCnt = 0; +static void processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt, + int32_t remainderVgCnt) { + int32_t cnt = 0; void *pIter = NULL; while (1) { @@ -323,11 +311,9 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); - // all old consumers still existing need to be modified - // TODO optimize: modify only consumer whose vgs changed taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId); if (consumerVgNum > minVgCnt) { - if (imbCnt < imbConsumerNum) { + if (cnt < remainderVgCnt) { // pop until equal minVg + 1 while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); @@ -337,10 +323,10 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId, - pConsumerEp->consumerId); + mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, + pOutput->pSub->key, pVgEp->vgId, pConsumerEp->consumerId); } - imbCnt++; + cnt++; } else { // all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) { @@ -351,8 +337,8 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId, - pConsumerEp->consumerId); + mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, + pOutput->pSub->key, pVgEp->vgId, pConsumerEp->consumerId); } } } @@ -404,7 +390,7 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput){ } } if(!find){ - mInfo("processRemoveAddVgs old vgId:%d", pVgEp->vgId); + mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEp->vgId); tDeleteSMqVgEp(pVgEp); taosArrayRemove(pConsumerEp->vgs, j); continue; @@ -415,7 +401,7 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput){ if(taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0){ taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs); - mInfo("processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs)); + mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs)); taosArrayDestroy(newVgs); }else{ taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp); @@ -423,72 +409,118 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput){ return totalVgNum; } -static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { - int32_t totalVgNum = processRemoveAddVgs(pMnode, pOutput); - const char *pSubKey = pOutput->pSub->key; +static void processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput){ + SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows + if (pSub == NULL) { + return; + } + taosRLockLatch(&pSub->lock); + if (pOutput->pSub->offsetRows == NULL) { + pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows)); + } + void *pIter = NULL; + while (1) { + pIter = taosHashIterate(pSub->consumerHash, pIter); + if (pIter == NULL) break; + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)); - int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); - int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers); - mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum, - pInput->oldConsumerNum, numOfAdded, numOfRemoved); + for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) { + OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j); + bool jump = false; + for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){ + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i); + if(pVgEp->vgId == d1->vgId){ + jump = true; + mInfo("pSub->offsetRows jump, because consumer id:0x%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId); + break; + } + } + if(jump) continue; + bool find = false; + for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) { + OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i); + if (d1->vgId == d2->vgId) { + d2->rows += d1->rows; + d2->offset = d1->offset; + d2->ever = d1->ever; + find = true; + mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows); + break; + } + } + if(!find){ + taosArrayPush(pOutput->pSub->offsetRows, d1); + } + } + } + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); +} - // 1. build temporary hash(vgId -> SMqRebOutputVg) to store vg that need to be assigned - SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - - // 2. check and get actual removed consumers, put their vg into pHash - doRemoveLostConsumers(pOutput, pHash, pInput); - - // 3. if previously no consumer, there are vgs not assigned, put these vg into pHash - addUnassignedVgroups(pOutput, pHash); - - // 4. calc vg number of each consumer - int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved; - - int32_t minVgCnt = 0; - int32_t imbConsumerNum = 0; - - // calc num - if (numOfFinal) { - minVgCnt = totalVgNum / numOfFinal; - imbConsumerNum = totalVgNum % numOfFinal; - mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has 1 more vgroups than avg value", - pSubKey, numOfFinal, minVgCnt, imbConsumerNum); - } else { - mInfo("sub:%s no consumer subscribe this topic", pSubKey); +static void printRebalanceLog(SMqRebOutputObj *pOutput){ + mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pOutput->pSub->key); + for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) { + SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i); + mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key, + pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId); } - // 5. remove vgroups from consumers who have more vgroups than the threshold value(minVgCnt or minVgCnt + 1), and then another vg into pHash - transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum); + void *pIter = NULL; + while (1) { + pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); + if (pIter == NULL) break; + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + int32_t sz = taosArrayGetSize(pConsumerEp->vgs); + mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key, pConsumerEp->consumerId, sz); + for (int32_t i = 0; i < sz; i++) { + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); + mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId, + pConsumerEp->consumerId); + } + } +} - // 6. add new consumer into sub - doAddNewConsumers(pOutput, pInput); +static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, + int32_t *minVgCnt, int32_t *remainderVgCnt){ + int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); + int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers); + int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved; + // calc num + if (numOfFinal != 0) { + *minVgCnt = totalVgNum / numOfFinal; + *remainderVgCnt = totalVgNum % numOfFinal; + } else { + mInfo("[rebalance] sub:%s no consumer subscribe this topic", pSubKey); + } + mInfo("[rebalance] sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d remainderVg:%d", + pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt); +} + +static void assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt){ SMqRebOutputVg *pRebVg = NULL; void *pRemovedIter = NULL; void *pIter = NULL; - // 7. extract bgroups from pHash and assign to consumers that do not have enough vgroups while (1) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) { break; } - SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - - // push until equal minVg while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) { - // iter hash and find one vg pRemovedIter = taosHashIterate(pHash, pRemovedIter); if (pRemovedIter == NULL) { - mError("sub:%s removed iter is null, never can reach hear", pSubKey); + mError("[rebalance] sub:%s removed iter is null, never can reach hear", pOutput->pSub->key); break; } pRebVg = (SMqRebOutputVg *)pRemovedIter; pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); - mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); + mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", + pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } @@ -498,24 +530,23 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR break; } SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { pRemovedIter = taosHashIterate(pHash, pRemovedIter); if (pRemovedIter == NULL) { - mInfo("sub:%s removed iter is null", pSubKey); + mInfo("[rebalance] sub:%s removed iter is null", pOutput->pSub->key); break; } pRebVg = (SMqRebOutputVg *)pRemovedIter; pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); - mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); + mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", + pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } - // All assigned vg should be put into pOutput->rebVgs if(pRemovedIter != NULL){ - mError("sub:%s error pRemovedIter should be NULL", pSubKey); + mError("[rebalance]sub:%s error pRemovedIter should be NULL", pOutput->pSub->key); } while (1) { pRemovedIter = taosHashIterate(pHash, pRemovedIter); @@ -529,88 +560,77 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); // put all vg into unassigned } } +} - SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows - if (pSub) { - taosRLockLatch(&pSub->lock); - if (pOutput->pSub->offsetRows == NULL) { - pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows)); - } - pIter = NULL; - while (1) { - pIter = taosHashIterate(pSub->consumerHash, pIter); - if (pIter == NULL) break; - SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)); +static void mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { + int32_t totalVgNum = processRemoveAddVgs(pMnode, pOutput); + const char *pSubKey = pOutput->pSub->key; + int32_t minVgCnt = 0; + int32_t remainderVgCnt = 0; - for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) { - OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j); - bool jump = false; - for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){ - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i); - if(pVgEp->vgId == d1->vgId){ - jump = true; - mInfo("pSub->offsetRows jump, because consumer id:0x%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId); - break; - } - } - if(jump) continue; - bool find = false; - for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) { - OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i); - if (d1->vgId == d2->vgId) { - d2->rows += d1->rows; - d2->offset = d1->offset; - d2->ever = d1->ever; - find = true; - mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows); - break; - } - } - if(!find){ - taosArrayPush(pOutput->pSub->offsetRows, d1); - } - } - } - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); - } + SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - // 8. generate logs - mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pSubKey); - for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) { - SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i); - mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey, - pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId); - } - - pIter = NULL; - while (1) { - pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); - if (pIter == NULL) break; - SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - int32_t sz = taosArrayGetSize(pConsumerEp->vgs); - mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz); - for (int32_t i = 0; i < sz; i++) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); - mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId, - pConsumerEp->consumerId); - } - } - - // 9. clear + processRemovedConsumers(pOutput, pHash, pInput); + processUnassignedVgroups(pOutput, pHash); + calcVgroupsCnt(pInput, totalVgNum, pSubKey, &minVgCnt, &remainderVgCnt); + processModifiedConsumers(pOutput, pHash, minVgCnt, remainderVgCnt); + processNewConsumers(pOutput, pInput); + assignVgroups(pOutput, pHash, minVgCnt); + processSubOffsetRows(pMnode, pInput, pOutput); + printRebalanceLog(pOutput); taosHashCleanup(pHash); +} - return 0; +static int32_t presistConsumerByType(STrans *pTrans, SArray* consumers, int8_t type, char *cgroup, char *topic){ + int32_t code = 0; + SMqConsumerObj *pConsumerNew = NULL; + int32_t consumerNum = taosArrayGetSize(consumers); + for (int32_t i = 0; i < consumerNum; i++) { + int64_t consumerId = *(int64_t *)taosArrayGet(consumers, i); + pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup, type, topic, NULL); + if (pConsumerNew == NULL){ + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + + code = mndSetConsumerCommitLogs(pTrans, pConsumerNew); + if (code != 0) { + goto END; + } + + tDeleteSMqConsumerObj(pConsumerNew); + } + pConsumerNew = NULL; + +END: + tDeleteSMqConsumerObj(pConsumerNew); + return code; +} + +static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic){ + int32_t code = presistConsumerByType(pTrans, pOutput->modifyConsumers, CONSUMER_UPDATE_REB, cgroup, NULL); + if (code != 0) { + return code; + } + + code = presistConsumerByType(pTrans, pOutput->newConsumers, CONSUMER_ADD_REB, cgroup, topic); + if (code != 0) { + return code; + } + + return presistConsumerByType(pTrans, pOutput->removedConsumers, CONSUMER_REMOVE_REB, cgroup, topic); } static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) { - struct SSubplan* pPlan = NULL; + struct SSubplan *pPlan = NULL; + int32_t code = 0; + STrans *pTrans = NULL; + if(strcmp(pOutput->pSub->qmsg, "") != 0){ - int32_t code = qStringToSubplan(pOutput->pSub->qmsg, &pPlan); - if (code != TSDB_CODE_SUCCESS) { + code = qStringToSubplan(pOutput->pSub->qmsg, &pPlan); + if (code != 0) { terrno = code; - return -1; + goto END; } } @@ -618,110 +638,55 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "tmq-reb"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "tmq-reb"); if (pTrans == NULL) { - nodesDestroyNode((SNode*)pPlan); - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; } mndTransSetDbName(pTrans, topic, cgroup); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - mndTransDrop(pTrans); - nodesDestroyNode((SNode*)pPlan); - return -1; + code = mndTransCheckConflict(pMnode, pTrans); + if (code != 0) { + goto END; } - // make txn: // 1. redo action: action to all vg const SArray *rebVgs = pOutput->rebVgs; int32_t vgNum = taosArrayGetSize(rebVgs); for (int32_t i = 0; i < vgNum; i++) { SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i); - if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan) < 0) { - mndTransDrop(pTrans); - nodesDestroyNode((SNode*)pPlan); - return -1; + code = mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan); + if (code != 0) { + goto END; } } - nodesDestroyNode((SNode*)pPlan); - // 2. redo log: subscribe and vg assignment - // subscribe - if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) { - mndTransDrop(pTrans); - return -1; + // 2. commit log: subscribe and vg assignment + code = mndSetSubCommitLogs(pTrans, pOutput->pSub); + if (code != 0) { + goto END; } // 3. commit log: consumer to update status and epoch - // 3.1 set touched consumer - int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers); - for (int32_t i = 0; i < consumerNum; i++) { - int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i); - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE_REB; - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - tDeleteSMqConsumerObj(pConsumerNew, true); - - mndTransDrop(pTrans); - return -1; - } - - tDeleteSMqConsumerObj(pConsumerNew, true); + code = mndPresistConsumer(pTrans, pOutput, cgroup, topic); + if (code != 0) { + goto END; } - // 3.2 set new consumer - consumerNum = taosArrayGetSize(pOutput->newConsumers); - for (int32_t i = 0; i < consumerNum; i++) { - int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i); - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); - pConsumerNew->updateType = CONSUMER_ADD_REB; - - char* topicTmp = taosStrdup(topic); - taosArrayPush(pConsumerNew->rebNewTopics, &topicTmp); - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - tDeleteSMqConsumerObj(pConsumerNew, true); - - mndTransDrop(pTrans); - return -1; - } - - tDeleteSMqConsumerObj(pConsumerNew, true); - } - - // 3.3 set removed consumer - consumerNum = taosArrayGetSize(pOutput->removedConsumers); - for (int32_t i = 0; i < consumerNum; i++) { - int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i); - - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); - pConsumerNew->updateType = CONSUMER_REMOVE_REB; - - char* topicTmp = taosStrdup(topic); - taosArrayPush(pConsumerNew->rebRemovedTopics, &topicTmp); - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - tDeleteSMqConsumerObj(pConsumerNew, true); - - mndTransDrop(pTrans); - return -1; - } - - tDeleteSMqConsumerObj(pConsumerNew, true); - } - - // 4. TODO commit log: modification log - - // 5. set cb + // 4. set cb mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0); - // 6. execution - if (mndTransPrepare(pMnode, pTrans) != 0) { + // 5. execution + code = mndTransPrepare(pMnode, pTrans); + if (code != 0) { mError("failed to prepare trans rebalance since %s", terrstr()); - mndTransDrop(pTrans); - return -1; + goto END; } +END: + nodesDestroyNode((SNode*)pPlan); mndTransDrop(pTrans); - return 0; + return code; } static void freeRebalanceItem(void *param) { @@ -730,10 +695,55 @@ static void freeRebalanceItem(void *param) { taosArrayDestroy(pInfo->removedConsumers); } -static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) { +// type = 0 remove type = 1 add +static void buildRebInfo(SHashObj* rebSubHash, SArray* topicList, int8_t type, char *group, int64_t consumerId){ + int32_t topicNum = taosArrayGetSize(topicList); + for (int32_t i = 0; i < topicNum; i++) { + char key[TSDB_SUBSCRIBE_KEY_LEN]; + char *removedTopic = taosArrayGetP(topicList, i); + mndMakeSubscribeKey(key, group, removedTopic); + SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key); + if(type == 0) + taosArrayPush(pRebSub->removedConsumers, &consumerId); + else if(type == 1) + taosArrayPush(pRebSub->newConsumers, &consumerId); + } +} + +static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj* rebSubHash){ + int32_t newTopicNum = taosArrayGetSize(pConsumer->currentTopics); + for (int32_t i = 0; i < newTopicNum; i++) { + char * topic = taosArrayGetP(pConsumer->currentTopics, i); + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); + if (pSub == NULL) { + continue; + } + taosRLockLatch(&pSub->lock); + + // iterate all vg assigned to the consumer of that topic + SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t)); + int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs); + + for (int32_t j = 0; j < vgNum; j++) { + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); + SVgObj * pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId); + if (!pVgroup) { + char key[TSDB_SUBSCRIBE_KEY_LEN]; + mndMakeSubscribeKey(key, pConsumer->cgroup, topic); + mndGetOrCreateRebSub(rebSubHash, key); + mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId); + } + mndReleaseVgroup(pMnode, pVgroup); + } + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); + } +} + +static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) { SMnode *pMnode = pMsg->info.node; SSdb *pSdb = pMnode->pSdb; - SMqConsumerObj *pConsumer; + SMqConsumerObj *pConsumer = NULL; void *pIter = NULL; // iterate all consumers, find all modification @@ -746,89 +756,33 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) { int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); - mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", + mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime, hbStatus); + if ((taosArrayGetSize(pConsumer->assignedTopics) == 0) || + (status == MQ_CONSUMER_STATUS_LOST && + hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD)){ // unsubscribe or close + mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); + } + if (status == MQ_CONSUMER_STATUS_READY) { - if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); - } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { + if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { taosRLockLatch(&pConsumer->lock); - int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics); - for (int32_t i = 0; i < topicNum; i++) { - char key[TSDB_SUBSCRIBE_KEY_LEN]; - char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i); - mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic); - SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key); - taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); - } + buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId); taosRUnLockLatch(&pConsumer->lock); }else{ - int32_t newTopicNum = taosArrayGetSize(pConsumer->currentTopics); - for (int32_t i = 0; i < newTopicNum; i++) { - char * topic = taosArrayGetP(pConsumer->currentTopics, i); - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); - if (pSub == NULL) { - continue; - } - taosRLockLatch(&pSub->lock); - - // 2.2 iterate all vg assigned to the consumer of that topic - SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t)); - int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs); - - for (int32_t j = 0; j < vgNum; j++) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); - SVgObj * pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId); - if (!pVgroup) { - char key[TSDB_SUBSCRIBE_KEY_LEN]; - mndMakeSubscribeKey(key, pConsumer->cgroup, topic); - mndGetOrCreateRebSub(rebSubHash, key); - mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId); - } - mndReleaseVgroup(pMnode, pVgroup); - } - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); - } + checkForVgroupSplit(pMnode, pConsumer, rebSubHash); } - } else if (status == MQ_CONSUMER_STATUS_LOST) { - if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); - } - } else { + } else if (status != MQ_CONSUMER_STATUS_LOST) { taosRLockLatch(&pConsumer->lock); - - int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics); - for (int32_t i = 0; i < newTopicNum; i++) { - char key[TSDB_SUBSCRIBE_KEY_LEN]; - char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i); - mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic); - SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key); - taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId); - } - - int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics); - for (int32_t i = 0; i < removedTopicNum; i++) { - char key[TSDB_SUBSCRIBE_KEY_LEN]; - char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i); - mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic); - SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key); - taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); - } - - if (newTopicNum == 0 && removedTopicNum == 0 && taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); - } - + buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId); + buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId); taosRUnLockLatch(&pConsumer->lock); } mndReleaseConsumer(pMnode, pConsumer); } - - return 0; } bool mndRebTryStart() { @@ -847,29 +801,94 @@ void mndRebCntDec() { mInfo("rebalance cnt sub, value:%d", val); } +static void clearRebOutput(SMqRebOutputObj *rebOutput){ + taosArrayDestroy(rebOutput->newConsumers); + taosArrayDestroy(rebOutput->modifyConsumers); + taosArrayDestroy(rebOutput->removedConsumers); + taosArrayDestroy(rebOutput->rebVgs); + tDeleteSubscribeObj(rebOutput->pSub); + taosMemoryFree(rebOutput->pSub); +} + +static int32_t initRebOutput(SMqRebOutputObj *rebOutput) { + rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t)); + rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t)); + rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t)); + rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); + + if (rebOutput->newConsumers == NULL || rebOutput->removedConsumers == NULL || rebOutput->modifyConsumers == NULL || + rebOutput->rebVgs == NULL) { + clearRebOutput(rebOutput); + return TSDB_CODE_OUT_OF_MEMORY; + } + return 0; +} + +static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput){ + const char *key = rebInput->pRebInfo->key; + SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, key); + + if (pSub == NULL) { + // split sub key and extract topic + char topic[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CGROUP_LEN]; + mndSplitSubscribeKey(key, topic, cgroup, true); + + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); + if (pTopic == NULL) { + mError("[rebalance] mq rebalance %s ignored since topic %s doesn't exist", key, topic); + return -1; + } + + taosRLockLatch(&pTopic->lock); + + rebOutput->pSub = mndCreateSubscription(pMnode, pTopic, key); + + if (rebOutput->pSub == NULL) { + mError("[rebalance] mq rebalance %s failed create sub since %s, ignore", key, terrstr()); + taosRUnLockLatch(&pTopic->lock); + mndReleaseTopic(pMnode, pTopic); + return -1; + } + + memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN); + taosRUnLockLatch(&pTopic->lock); + mndReleaseTopic(pMnode, pTopic); + + rebInput->oldConsumerNum = 0; + mInfo("[rebalance] sub topic:%s has no consumers sub yet", key); + } else { + taosRLockLatch(&pSub->lock); + rebInput->oldConsumerNum = taosHashGetSize(pSub->consumerHash); + rebOutput->pSub = tCloneSubscribeObj(pSub); + taosRUnLockLatch(&pSub->lock); + + mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum); + mndReleaseSubscribe(pMnode, pSub); + } + return 0; +} + static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { int code = 0; - mInfo("start to process mq timer"); + mInfo("[rebalance] start to process mq timer"); if (!mndRebTryStart()) { - mInfo("mq rebalance already in progress, do nothing"); + mInfo("[rebalance] mq rebalance already in progress, do nothing"); return code; } SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); if (rebSubHash == NULL) { - mError("failed to create rebalance hashmap"); - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = -1; + code = TSDB_CODE_OUT_OF_MEMORY; goto END; } taosHashSetFreeFp(rebSubHash, freeRebalanceItem); mndCheckConsumer(pMsg, rebSubHash); - mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash)); + mInfo("[rebalance] mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash)); - // here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction. void *pIter = NULL; SMnode *pMnode = pMsg->info.node; while (1) { @@ -880,92 +899,31 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { SMqRebInputObj rebInput = {0}; SMqRebOutputObj rebOutput = {0}; - rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t)); - rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t)); - rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t)); - rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); - - if (rebOutput.newConsumers == NULL || rebOutput.removedConsumers == NULL || rebOutput.modifyConsumers == NULL || - rebOutput.rebVgs == NULL) { - taosArrayDestroy(rebOutput.newConsumers); - taosArrayDestroy(rebOutput.removedConsumers); - taosArrayDestroy(rebOutput.modifyConsumers); - taosArrayDestroy(rebOutput.rebVgs); - - taosHashCancelIterate(rebSubHash, pIter); - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("mq re-balance failed, due to out of memory"); - code = -1; + code = initRebOutput(&rebOutput); + if (code != 0){ goto END; } - SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; - SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key); + SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; + rebInput.pRebInfo = pRebInfo; - rebInput.pRebInfo = pRebInfo; - - if (pSub == NULL) { - // split sub key and extract topic - char topic[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true); - - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - if (pTopic == NULL) { - mError("mq re-balance %s ignored since topic %s doesn't exist", pRebInfo->key, topic); - continue; - } - - taosRLockLatch(&pTopic->lock); - - rebOutput.pSub = mndCreateSubscription(pMnode, pTopic, pRebInfo->key); - - if (rebOutput.pSub == NULL) { - mError("mq rebalance %s failed create sub since %s, ignore", pRebInfo->key, terrstr()); - taosRUnLockLatch(&pTopic->lock); - mndReleaseTopic(pMnode, pTopic); - continue; - } - - memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN); - taosRUnLockLatch(&pTopic->lock); - mndReleaseTopic(pMnode, pTopic); - - rebInput.oldConsumerNum = 0; - mInfo("sub topic:%s has no consumers sub yet", pRebInfo->key); - } else { - taosRLockLatch(&pSub->lock); - rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash); - rebOutput.pSub = tCloneSubscribeObj(pSub); - taosRUnLockLatch(&pSub->lock); - - mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum); - mndReleaseSubscribe(pMnode, pSub); + if (buildRebOutput(pMnode, &rebInput, &rebOutput) != 0){ + continue; } - if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) { - mError("mq re-balance internal error"); - } + mndDoRebalance(pMnode, &rebInput, &rebOutput); - // if add more consumer to balanced subscribe, - // possibly no vg is changed - // when each topic is re-balanced, issue an trans to save the results in sdb. - if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) { + if (mndPersistRebResult(pMnode, pMsg, &rebOutput) != 0) { mError("mq re-balance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr()); } - taosArrayDestroy(rebOutput.newConsumers); - taosArrayDestroy(rebOutput.modifyConsumers); - taosArrayDestroy(rebOutput.removedConsumers); - taosArrayDestroy(rebOutput.rebVgs); - tDeleteSubscribeObj(rebOutput.pSub); - taosMemoryFree(rebOutput.pSub); + clearRebOutput(&rebOutput); } - // reset flag - mInfo("mq re-balance completed successfully"); + mInfo("[rebalance] mq re-balance completed successfully, wait trans finish"); END: + taosHashCancelIterate(rebSubHash, pIter); taosHashCleanup(rebSubHash); mndRebCntDec(); @@ -995,6 +953,28 @@ static int32_t sendDeleteSubToVnode(SMqSubscribeObj *pSub, STrans *pTrans){ return 0; } +static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic){ + void *pIter = NULL; + SMqConsumerObj *pConsumer = NULL; + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); + if (pIter == NULL) { + break; + } + + if (strcmp(cgroup, pConsumer->cgroup) == 0 && taosArrayGetSize(pConsumer->assignedTopics) == 1 && + strcmp(topic, taosArrayGetP(pConsumer->assignedTopics, 0)) == 0) { + int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer); + if (code != 0) { + sdbRelease(pMnode->pSdb, pConsumer); + return code; + } + } + sdbRelease(pMnode->pSdb, pConsumer); + } + return 0; +} + static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMDropCgroupReq dropReq = {0}; @@ -1033,41 +1013,31 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { goto end; } + mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); mndTransSetDbName(pTrans, dropReq.topic, dropReq.cgroup); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto end; } - void *pIter = NULL; - SMqConsumerObj *pConsumer; - while (1) { - pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) { - break; - } - - if (strcmp(dropReq.cgroup, pConsumer->cgroup) == 0) { - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); - } - sdbRelease(pMnode->pSdb, pConsumer); + code = mndDropConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic); + if (code != 0) { + goto end; } - mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); - code = sendDeleteSubToVnode(pSub, pTrans); if (code != 0) { goto end; } - if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { + code = mndSetDropSubCommitLogs(pMnode, pTrans, pSub); + if (code != 0) { mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); - code = -1; goto end; } - if (mndTransPrepare(pMnode, pTrans) < 0) { - code = -1; + code = mndTransPrepare(pMnode, pTrans); + if (code != 0) { goto end; } @@ -1219,12 +1189,11 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc return 0; } -int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) { +void mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) { int32_t tlen = strlen(cgroup); memcpy(key, cgroup, tlen); key[tlen] = TMQ_SEPARATOR; strcpy(key + tlen + 1, topicName); - return 0; } SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) { @@ -1294,40 +1263,13 @@ int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj return 0; } -//int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { -// int32_t code = 0; -// SSdb *pSdb = pMnode->pSdb; -// -// void *pIter = NULL; -// SMqSubscribeObj *pSub = NULL; -// while (1) { -// pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); -// if (pIter == NULL) break; -// -// if (pSub->dbUid != pDb->uid) { -// sdbRelease(pSdb, pSub); -// continue; -// } -// -// if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { -// sdbRelease(pSdb, pSub); -// sdbCancelFetch(pSdb, pIter); -// code = -1; -// break; -// } -// -// sdbRelease(pSdb, pSub); -// } -// -// return code; -//} - int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) { - SSdb *pSdb = pMnode->pSdb; - + SSdb *pSdb = pMnode->pSdb; + int32_t code = 0; void *pIter = NULL; SMqSubscribeObj *pSub = NULL; while (1) { + sdbRelease(pSdb, pSub); pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); if (pIter == NULL) break; @@ -1335,33 +1277,31 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(pSub->key, topic, cgroup, true); if (strcmp(topic, topicName) != 0) { - sdbRelease(pSdb, pSub); continue; } // iter all vnode to delete handle if (taosHashGetSize(pSub->consumerHash) != 0) { - sdbRelease(pSdb, pSub); - terrno = TSDB_CODE_MND_IN_REBALANCE; - sdbCancelFetch(pSdb, pIter); - return -1; - } - if (sendDeleteSubToVnode(pSub, pTrans) != 0) { - sdbRelease(pSdb, pSub); - sdbCancelFetch(pSdb, pIter); - return -1; + code = TSDB_CODE_MND_IN_REBALANCE; + goto END; } - if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { - sdbRelease(pSdb, pSub); - sdbCancelFetch(pSdb, pIter); - return -1; + code = sendDeleteSubToVnode(pSub, pTrans); + if (code != 0) { + goto END; } - sdbRelease(pSdb, pSub); + code = mndSetDropSubCommitLogs(pMnode, pTrans, pSub); + if (code != 0) { + goto END; + } } - return 0; +END: + sdbRelease(pSdb, pSub); + sdbCancelFetch(pSdb, pIter); + + return code; } static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){ diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index de543f4256..9d43856e00 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -653,9 +653,103 @@ _OVER: return code; } +static bool checkTopic(SArray *topics, char *topicName){ + int32_t sz = taosArrayGetSize(topics); + for (int32_t i = 0; i < sz; i++) { + char *name = taosArrayGetP(topics, i); + if (strcmp(name, topicName) == 0) { + return true; + } + } + return false; +} + +static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){ + int32_t code = 0; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SMqConsumerObj *pConsumer = NULL; + while (1) { + sdbRelease(pSdb, pConsumer); + pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); + if (pIter == NULL) { + break; + } + + bool found = checkTopic(pConsumer->assignedTopics, topicName); + if (found){ + if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { + code = mndSetConsumerDropLogs(pTrans, pConsumer); + if (code != 0) { + goto end; + } + continue; + } + mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", + topicName, pConsumer->consumerId, pConsumer->cgroup); + code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + goto end; + } + + if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) { + code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", + topicName, pConsumer->consumerId, pConsumer->cgroup); + goto end; + } + + } + +end: + sdbRelease(pSdb, pConsumer); + sdbCancelFetch(pSdb, pIter); + return code; +} + +static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic){ + // broadcast to all vnode + void *pIter = NULL; + SVgObj *pVgroup = NULL; + int32_t code = 0; + SSdb *pSdb = pMnode->pSdb; + void *buf = NULL; + while (1) { + sdbRelease(pSdb, pVgroup); + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) { + continue; + } + + buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN); + if (buf == NULL){ + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); + memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN); + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = buf; + action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN; + action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO; + code = mndTransAppendRedoAction(pTrans, &action); + if (code != 0) { + goto end; + } + } + +end: + taosMemoryFree(buf); + sdbRelease(pSdb, pVgroup); + sdbCancelFetch(pSdb, pIter); + return code; +} + static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; SMDropTopicReq dropReq = {0}; int32_t code = 0; SMqTopicObj *pTopic = NULL; @@ -705,68 +799,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { goto end; } - void *pIter = NULL; - SMqConsumerObj *pConsumer; - while (1) { - pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) { - break; - } - - bool found = false; - int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); - for (int32_t i = 0; i < sz; i++) { - char *name = taosArrayGetP(pConsumer->assignedTopics, i); - if (strcmp(name, pTopic->name) == 0) { - found = true; - break; - } - } - if (found){ - if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pReq->info); - mndReleaseConsumer(pMnode, pConsumer); - continue; - } - - mndReleaseConsumer(pMnode, pConsumer); - sdbCancelFetch(pSdb, pIter); - terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", - dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - code = -1; - goto end; - } - - sz = taosArrayGetSize(pConsumer->rebNewTopics); - for (int32_t i = 0; i < sz; i++) { - char *name = taosArrayGetP(pConsumer->rebNewTopics, i); - if (strcmp(name, pTopic->name) == 0) { - mndReleaseConsumer(pMnode, pConsumer); - sdbCancelFetch(pSdb, pIter); - terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", - dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - code = -1; - goto end; - } - } - - sz = taosArrayGetSize(pConsumer->rebRemovedTopics); - for (int32_t i = 0; i < sz; i++) { - char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i); - if (strcmp(name, pTopic->name) == 0) { - mndReleaseConsumer(pMnode, pConsumer); - sdbCancelFetch(pSdb, pIter); - terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)", - dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - code = -1; - goto end; - } - } - - sdbRelease(pSdb, pConsumer); + code = mndDropConsumerByTopic(pMnode, pTrans, dropReq.name); + if (code != 0) { + goto end; } code = mndDropSubByTopic(pMnode, pTrans, dropReq.name); @@ -776,36 +811,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } if (pTopic->ntbUid != 0) { - // broadcast to all vnode - pIter = NULL; - SVgObj *pVgroup = NULL; - - while (1) { - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) { - sdbRelease(pSdb, pVgroup); - continue; - } - - void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN); - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); - memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN); - - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgroup); - action.pCont = buf; - action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN; - action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO; - code = mndTransAppendRedoAction(pTrans, &action); - if (code != 0) { - taosMemoryFree(buf); - sdbRelease(pSdb, pVgroup); - sdbCancelFetch(pSdb, pIter); - goto end; - } - sdbRelease(pSdb, pVgroup); + code = mndDropCheckInfoByTopic(pMnode, pTrans, pTopic); + if (code != 0) { + goto end; } } @@ -822,7 +830,6 @@ end: SName name = {0}; tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - //reuse this function for topic auditRecord(pReq, pMnode->clusterId, "dropTopic", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);