diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4c5ccd1a3e..16a72fbd2c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3921,11 +3921,11 @@ int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq); int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq); int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); -int32_t tDeatroySMqHbReq(SMqHbReq* pReq); +void tDestroySMqHbReq(SMqHbReq* pReq); int32_t tSerializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp); int32_t tDeserializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp); -int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp); +void tDestroySMqHbRsp(SMqHbRsp* pRsp); int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index fb1882e472..c8fc556150 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -779,7 +779,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { taosWUnLockLatch(&tmq->lock); taosReleaseRef(tmqMgmt.rsetId, refId); } - tDeatroySMqHbRsp(&rsp); + tDestroySMqHbRsp(&rsp); taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); } @@ -861,7 +861,7 @@ void tmqSendHbReq(void* param, void* tmrId) { asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); OVER: - tDeatroySMqHbReq(&req); + tDestroySMqHbReq(&req); taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); taosReleaseRef(tmqMgmt.rsetId, refId); } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9138d7c983..5b154df8a3 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6201,9 +6201,8 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) { return 0; } -int32_t tDeatroySMqHbRsp(SMqHbRsp *pRsp) { +void tDestroySMqHbRsp(SMqHbRsp *pRsp) { taosArrayDestroy(pRsp->topicPrivileges); - return 0; } int32_t tSerializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) { @@ -6250,13 +6249,12 @@ int32_t tDeserializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) { return 0; } -int32_t tDeatroySMqHbReq(SMqHbReq *pReq) { +void tDestroySMqHbReq(SMqHbReq *pReq) { for (int i = 0; i < taosArrayGetSize(pReq->topics); i++) { TopicOffsetRows *vgs = taosArrayGet(pReq->topics, i); if (vgs) taosArrayDestroy(vgs->offsetRows); } taosArrayDestroy(pReq->topics); - return 0; } int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 8c89ddc825..5184ad0eca 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -24,27 +24,22 @@ extern "C" { enum { MQ_CONSUMER_STATUS_REBALANCE = 1, -// MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore MQ_CONSUMER_STATUS_READY, MQ_CONSUMER_STATUS_LOST, -// MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore -// MQ_CONSUMER_STATUS__LOST_REBD, -};\ +}; int32_t mndInitConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode); -void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info); +void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo* info); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); -SMqConsumerObj *mndCreateConsumer(int64_t consumerId, const char *cgroup); - SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer); SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); -int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer); -int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer); +int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer); +int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer); const char *mndConsumerStatusName(int status); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 4a092057ce..b98ef31d75 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -149,6 +149,7 @@ typedef enum { CONSUMER_REMOVE_REB, // remove after rebalance CONSUMER_UPDATE_REC, // update after recover CONSUMER_UPDATE_SUB, // update after subscribe req + CONSUMER_INSERT_SUB, } ECsmUpdateType; typedef struct { @@ -556,8 +557,9 @@ typedef struct { int32_t resetOffsetCfg; } SMqConsumerObj; -SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]); -void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool isDeleted); +SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe); +void tClearSMqConsumerObj(SMqConsumerObj* pConsumer); +void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer); int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer); void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver); diff --git a/source/dnode/mnode/impl/inc/mndSubscribe.h b/source/dnode/mnode/impl/inc/mndSubscribe.h index 23b3a7d1fe..eb9902a75c 100644 --- a/source/dnode/mnode/impl/inc/mndSubscribe.h +++ b/source/dnode/mnode/impl/inc/mndSubscribe.h @@ -30,7 +30,7 @@ SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const c SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key); void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); -int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName); +void mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName); int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic); int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index f60d8035e3..52671f6b66 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -68,25 +68,27 @@ int32_t mndInitConsumer(SMnode *pMnode) { void mndCleanupConsumer(SMnode *pMnode) {} -void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo *info) { - SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); - if (pClearMsg == NULL) { +void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) { + void *msg = rpcMallocCont(sizeof(int64_t)); + if (msg == NULL) { mError("consumer:0x%" PRIx64 " failed to clear consumer due to out of memory. alloc size:%d", consumerId, - (int32_t)sizeof(SMqConsumerClearMsg)); + (int32_t)sizeof(int64_t)); return; } - pClearMsg->consumerId = consumerId; + *(int64_t*)msg = consumerId; SRpcMsg rpcMsg = { - .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, - .pCont = pClearMsg, - .contLen = sizeof(SMqConsumerClearMsg), + .msgType = msgType, + .pCont = msg, + .contLen = sizeof(int64_t), .info = *info, }; - mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId); - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); - return; + mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId); + int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + if (code != 0){ + mError("consumer:%"PRId64" send consumer msg:%d error:%d", consumerId, msgType, code); + } } static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, @@ -148,54 +150,62 @@ FAILED: } static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { + int32_t code = 0; SMnode *pMnode = pMsg->info.node; SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; + SMqConsumerObj *pConsumerNew = NULL; + STrans *pTrans = NULL; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId); if (pConsumer == NULL) { mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId); - return -1; + code = -1; + goto END; } mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId, pConsumer->status, mndConsumerStatusName(pConsumer->status)); if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { - mndReleaseConsumer(pMnode, pConsumer); terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; - return -1; + code = -1; + goto END; } - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE_REC; + pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_UPDATE_REC, NULL, NULL); + if (pConsumerNew == NULL){ + code = -1; + goto END; + } - mndReleaseConsumer(pMnode, pConsumer); - - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm"); if (pTrans == NULL) { - goto FAIL; + code = -1; + goto END; } - if (validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false) != 0) { - goto FAIL; + code = validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false); + if (code != 0) { + goto END; } - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; - if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; - - tDeleteSMqConsumerObj(pConsumerNew, true); + code = mndSetConsumerCommitLogs(pTrans, pConsumerNew); + if (code != 0) { + goto END; + } + code = mndTransPrepare(pMnode, pTrans); +END: + mndReleaseConsumer(pMnode, pConsumer); + tDeleteSMqConsumerObj(pConsumerNew); mndTransDrop(pTrans); - return 0; -FAIL: - tDeleteSMqConsumerObj(pConsumerNew, true); - - mndTransDrop(pTrans); - return -1; + return code; } -// todo check the clear process static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { + int32_t code = 0; SMnode *pMnode = pMsg->info.node; SMqConsumerClearMsg *pClearMsg = pMsg->pCont; + SMqConsumerObj *pConsumerNew = NULL; + STrans *pTrans = NULL; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId); if (pConsumer == NULL) { @@ -206,33 +216,37 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, mndConsumerStatusName(pConsumer->status)); - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); + pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL); + if (pConsumerNew == NULL){ + code = -1; + goto END; + } - mndReleaseConsumer(pMnode, pConsumer); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm"); + if (pTrans == NULL) { + code = -1; + goto END; + } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm"); - if (pTrans == NULL) goto FAIL; // this is the drop action, not the update action - if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; - if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; + code = mndSetConsumerDropLogs(pTrans, pConsumerNew); + if (code != 0) { + goto END; + } - tDeleteSMqConsumerObj(pConsumerNew, true); + code = mndTransPrepare(pMnode, pTrans); +END: + mndReleaseConsumer(pMnode, pConsumer); + tDeleteSMqConsumerObj(pConsumerNew); mndTransDrop(pTrans); - return 0; - -FAIL: - tDeleteSMqConsumerObj(pConsumerNew, true); - - mndTransDrop(pTrans); - return -1; + return code; } static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) { rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege)); if (rsp->topicPrivileges == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; + return TSDB_CODE_OUT_OF_MEMORY; } for (int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++) { char *topic = taosArrayGetP(pConsumer->currentTopics, i); @@ -253,6 +267,47 @@ static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRs return 0; } +static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer){ + for (int i = 0; i < taosArrayGetSize(req->topics); i++) { + TopicOffsetRows *data = taosArrayGet(req->topics, i); + mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName); + + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); + if (pSub == NULL) { + continue; + } + taosWLockLatch(&pSub->lock); + SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t)); + if (pConsumerEp) { + taosArrayDestroy(pConsumerEp->offsetRows); + pConsumerEp->offsetRows = data->offsetRows; + data->offsetRows = NULL; + } + taosWUnLockLatch(&pSub->lock); + + mndReleaseSubscribe(pMnode, pSub); + } +} + +static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){ + int32_t tlen = tSerializeSMqHbRsp(NULL, 0, rsp); + if (tlen <= 0){ + return TSDB_CODE_OUT_OF_MEMORY; + } + void *buf = rpcMallocCont(tlen); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if(tSerializeSMqHbRsp(buf, tlen, rsp) != 0){ + rpcFreeCont(buf); + return TSDB_CODE_OUT_OF_MEMORY; + } + pMsg->info.rsp = buf; + pMsg->info.rspLen = tlen; + return 0; +} + static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { int32_t code = 0; SMnode *pMnode = pMsg->info.node; @@ -261,7 +316,6 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumer = NULL; if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY; goto end; } @@ -270,7 +324,6 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { mError("consumer:0x%" PRIx64 " not exist", consumerId); - terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; code = TSDB_CODE_MND_CONSUMER_NOT_EXIST; goto end; } @@ -284,88 +337,152 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { int32_t status = atomic_load_32(&pConsumer->status); if (status == MQ_CONSUMER_STATUS_LOST) { - mInfo("try to recover consumer:0x%" PRIx64 "", consumerId); - SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); - - pRecoverMsg->consumerId = consumerId; - SRpcMsg pRpcMsg = { - .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER, - .pCont = pRecoverMsg, - .contLen = sizeof(SMqConsumerRecoverMsg), - .info = pMsg->info, - }; - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); + mInfo("try to recover consumer:0x%" PRIx64, consumerId); + mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info); } - for (int i = 0; i < taosArrayGetSize(req.topics); i++) { - TopicOffsetRows *data = taosArrayGet(req.topics, i); - mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName); - - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); - if (pSub == NULL) { -#ifdef TMQ_DEBUG - ASSERT(0); -#endif - continue; - } - taosWLockLatch(&pSub->lock); - SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t)); - if (pConsumerEp) { - taosArrayDestroy(pConsumerEp->offsetRows); - pConsumerEp->offsetRows = data->offsetRows; - data->offsetRows = NULL; - } - taosWUnLockLatch(&pSub->lock); - - mndReleaseSubscribe(pMnode, pSub); - } - - // encode rsp - int32_t tlen = tSerializeSMqHbRsp(NULL, 0, &rsp); - void *buf = rpcMallocCont(tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } - - tSerializeSMqHbRsp(buf, tlen, &rsp); - pMsg->info.rsp = buf; - pMsg->info.rspLen = tlen; + storeOffsetRows(pMnode, &req, pConsumer); + code = buildMqHbRsp(pMsg, &rsp); end: - tDeatroySMqHbRsp(&rsp); + tDestroySMqHbRsp(&rsp); mndReleaseConsumer(pMnode, pConsumer); - tDeatroySMqHbReq(&req); + tDestroySMqHbReq(&req); return code; } +static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){ + taosRLockLatch(&pConsumer->lock); + + int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics); + + rsp->topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp)); + if (rsp->topics == NULL) { + taosRUnLockLatch(&pConsumer->lock); + return TSDB_CODE_OUT_OF_MEMORY; + } + + // handle all topics subscribed by this consumer + for (int32_t i = 0; i < numOfTopics; i++) { + char *topic = taosArrayGetP(pConsumer->currentTopics, i); + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); + // txn guarantees pSub is created + if (pSub == NULL) { + continue; + } + taosRLockLatch(&pSub->lock); + + SMqSubTopicEp topicEp = {0}; + strcpy(topicEp.topic, topic); + + // 2.1 fetch topic schema + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); + if (pTopic == NULL) { + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); + continue; + } + taosRLockLatch(&pTopic->lock); + tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN); + topicEp.schema.nCols = pTopic->schema.nCols; + if (topicEp.schema.nCols) { + topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema)); + memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema)); + } + taosRUnLockLatch(&pTopic->lock); + mndReleaseTopic(pMnode, pTopic); + + // 2.2 iterate all vg assigned to the consumer of that topic + SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t)); + int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs); + + // this customer assigned vgroups + topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp)); + if (topicEp.vgs == NULL) { + taosRUnLockLatch(&pConsumer->lock); + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t j = 0; j < vgNum; j++) { + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); + // char offsetKey[TSDB_PARTITION_KEY_LEN]; + // mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId); + + if (epoch == -1) { + SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId); + if (pVgroup) { + pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup); + mndReleaseVgroup(pMnode, pVgroup); + } + } + // 2.2.1 build vg ep + SMqSubVgEp vgEp = { + .epSet = pVgEp->epSet, + .vgId = pVgEp->vgId, + .offset = -1, + }; + + taosArrayPush(topicEp.vgs, &vgEp); + } + taosArrayPush(rsp->topics, &topicEp); + + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); + } + taosRUnLockLatch(&pConsumer->lock); + return 0; +} + +static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId){ + // encode rsp + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, rsp); + void *buf = rpcMallocCont(tlen); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SMqRspHead *pHead = buf; + + pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP; + pHead->epoch = serverEpoch; + pHead->consumerId = consumerId; + pHead->walsver = 0; + pHead->walever = 0; + + void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); + tEncodeSMqAskEpRsp(&abuf, rsp); + + // send rsp + pMsg->info.rsp = buf; + pMsg->info.rspLen = tlen; + return 0; +} + static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMqAskEpReq req = {0}; SMqAskEpRsp rsp = {0}; + int32_t code = 0; if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } int64_t consumerId = req.consumerId; - int32_t epoch = req.epoch; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup); - terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; - return -1; + return TSDB_CODE_MND_CONSUMER_NOT_EXIST; } - int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)); - if (ret != 0) { + if (strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) != 0) { mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup, pConsumer->cgroup); - terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; - goto FAIL; + code = TSDB_CODE_MND_CONSUMER_NOT_EXIST; + goto END; } atomic_store_32(&pConsumer->hbStatus, 0); @@ -374,156 +491,37 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { int32_t status = atomic_load_32(&pConsumer->status); if (status == MQ_CONSUMER_STATUS_LOST) { - mInfo("try to recover consumer:0x%" PRIx64, consumerId); - SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); - - pRecoverMsg->consumerId = consumerId; - SRpcMsg pRpcMsg = { - .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER, - .pCont = pRecoverMsg, - .contLen = sizeof(SMqConsumerRecoverMsg), - .info = pMsg->info, - }; - - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); + mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info); } if (status != MQ_CONSUMER_STATUS_READY) { mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); - terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; - goto FAIL; + code = TSDB_CODE_MND_CONSUMER_NOT_READY; + goto END; } + int32_t epoch = req.epoch; int32_t serverEpoch = atomic_load_32(&pConsumer->epoch); // 2. check epoch, only send ep info when epochs do not match if (epoch != serverEpoch) { - taosRLockLatch(&pConsumer->lock); - mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch, - serverEpoch); - int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics); - - rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp)); - if (rsp.topics == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosRUnLockLatch(&pConsumer->lock); - goto FAIL; + mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", + consumerId, epoch, serverEpoch); + code = addEpSetInfo(pMnode, pConsumer, epoch, &rsp); + if(code != 0){ + goto END; } - - // handle all topics subscribed by this consumer - for (int32_t i = 0; i < numOfTopics; i++) { - char *topic = taosArrayGetP(pConsumer->currentTopics, i); - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); - // txn guarantees pSub is created - if (pSub == NULL) { -#ifdef TMQ_DEBUG - ASSERT(0); -#endif - continue; - } - taosRLockLatch(&pSub->lock); - - SMqSubTopicEp topicEp = {0}; - strcpy(topicEp.topic, topic); - - // 2.1 fetch topic schema - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - if (pTopic == NULL) { -#ifdef TMQ_DEBUG - ASSERT(0); -#endif - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); - continue; - } - taosRLockLatch(&pTopic->lock); - tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN); - topicEp.schema.nCols = pTopic->schema.nCols; - if (topicEp.schema.nCols) { - topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema)); - memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema)); - } - taosRUnLockLatch(&pTopic->lock); - mndReleaseTopic(pMnode, pTopic); - - // 2.2 iterate all vg assigned to the consumer of that topic - SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t)); - int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs); - - // this customer assigned vgroups - topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp)); - if (topicEp.vgs == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosRUnLockLatch(&pConsumer->lock); - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); - goto FAIL; - } - - for (int32_t j = 0; j < vgNum; j++) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); - // char offsetKey[TSDB_PARTITION_KEY_LEN]; - // mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId); - - if (epoch == -1) { - SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId); - if (pVgroup) { - pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup); - mndReleaseVgroup(pMnode, pVgroup); - } - } - // 2.2.1 build vg ep - SMqSubVgEp vgEp = { - .epSet = pVgEp->epSet, - .vgId = pVgEp->vgId, - .offset = -1, - }; - - taosArrayPush(topicEp.vgs, &vgEp); - } - taosArrayPush(rsp.topics, &topicEp); - - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); - } - taosRUnLockLatch(&pConsumer->lock); } - // encode rsp - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp); - void *buf = rpcMallocCont(tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; - } + code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId); - SMqRspHead *pHead = buf; - - pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP; - pHead->epoch = serverEpoch; - pHead->consumerId = pConsumer->consumerId; - pHead->walsver = 0; - pHead->walever = 0; - - void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqAskEpRsp(&abuf, &rsp); - - // release consumer and free memory +END: tDeleteSMqAskEpRsp(&rsp); mndReleaseConsumer(pMnode, pConsumer); - - // send rsp - pMsg->info.rsp = buf; - pMsg->info.rspLen = tlen; - return 0; - -FAIL: - tDeleteSMqAskEpRsp(&rsp); - mndReleaseConsumer(pMnode, pConsumer); - return -1; + return code; } -int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) { +int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) { SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; @@ -531,7 +529,7 @@ int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *p return 0; } -int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) { +int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) { SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; @@ -539,8 +537,6 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj return 0; } -static void *topicNameDup(void *p) { return taosStrdup((char *)p); } - static void freeItem(void *param) { void *pItem = *(void **)param; if (pItem != NULL) { @@ -548,21 +544,52 @@ static void freeItem(void *param) { } } -int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; - char *msgStr = pMsg->pCont; - int32_t code = -1; +static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew){ + pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *)); + pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *)); + if(pConsumerNew->rebNewTopics == NULL || pConsumerNew->rebRemovedTopics == NULL){ + return TSDB_CODE_OUT_OF_MEMORY; + } + int32_t newTopicNum = taosArrayGetSize(pConsumerNew->assignedTopics); + int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics); - SCMSubscribeReq subscribe = {0}; - tDeserializeSCMSubscribeReq(msgStr, &subscribe); + int32_t i = 0, j = 0; + while (i < oldTopicNum || j < newTopicNum) { + if (i >= oldTopicNum) { + char *newTopicCopy = taosStrdup(taosArrayGetP(pConsumerNew->assignedTopics, j)); + taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy); + j++; + continue; + } else if (j >= newTopicNum) { + char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i)); + taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy); + i++; + continue; + } else { + char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i); + char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, j); + int comp = strcmp(oldTopic, newTopic); + if (comp == 0) { + i++; + j++; + continue; + } else if (comp < 0) { + char *oldTopicCopy = taosStrdup(oldTopic); + taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy); + i++; + continue; + } else { + char *newTopicCopy = taosStrdup(newTopic); + taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy); + j++; + continue; + } + } + } + return 0; +} - int64_t consumerId = subscribe.consumerId; - char *cgroup = subscribe.cgroup; - SMqConsumerObj *pExistedConsumer = NULL; - SMqConsumerObj *pConsumerNew = NULL; - STrans *pTrans = NULL; - - SArray *pTopicList = subscribe.topicNames; +static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){ taosArraySort(pTopicList, taosArrayCompareString); taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem); @@ -571,125 +598,102 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char *)taosArrayGetP(pTopicList, i)); if (gNum >= MND_MAX_GROUP_PER_TOPIC) { terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE; - code = terrno; - goto _over; + return -1; } } + return TSDB_CODE_SUCCESS; +} - // check topic existence - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe"); - if (pTrans == NULL) { - goto _over; - } - - code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user, subscribe.enableReplay); - if (code != TSDB_CODE_SUCCESS) { - goto _over; - } - +static SMqConsumerObj* buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe){ + int64_t consumerId = subscribe->consumerId; + char *cgroup = subscribe->cgroup; + SMqConsumerObj *pConsumerNew = NULL; + SMqConsumerObj *pExistedConsumer = NULL; pExistedConsumer = mndAcquireConsumer(pMnode, consumerId); if (pExistedConsumer == NULL) { - mInfo("receive subscribe request from new consumer:0x%" PRIx64 " cgroup:%s, numOfTopics:%d", consumerId, - subscribe.cgroup, (int32_t)taosArrayGetSize(pTopicList)); + mInfo("receive subscribe request from new consumer:0x%" PRIx64 + ",cgroup:%s, numOfTopics:%d", consumerId, + subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames)); - pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); - tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId)); - - pConsumerNew->withTbName = subscribe.withTbName; - pConsumerNew->autoCommit = subscribe.autoCommit; - pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval; - pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg; - - // pConsumerNew->updateType = CONSUMER_UPDATE_SUB; // use insert logic - taosArrayDestroy(pConsumerNew->assignedTopics); - pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); - - // all subscribed topics should re-balance. - taosArrayDestroy(pConsumerNew->rebNewTopics); - pConsumerNew->rebNewTopics = pTopicList; - subscribe.topicNames = NULL; - - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over; - if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; + pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe); + if (pConsumerNew == NULL) { + goto _over; + } } else { int32_t status = atomic_load_32(&pExistedConsumer->status); mInfo("receive subscribe request from existed consumer:0x%" PRIx64 - " cgroup:%s, current status:%d(%s), subscribe topic num: %d", - consumerId, subscribe.cgroup, status, mndConsumerStatusName(status), newTopicNum); + ",cgroup:%s, current status:%d(%s), subscribe topic num: %d", + consumerId, subscribe->cgroup, status, mndConsumerStatusName(status), + (int32_t)taosArrayGetSize(subscribe->topicNames)); if (status != MQ_CONSUMER_STATUS_READY) { terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; goto _over; } - - pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); + pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe); if (pConsumerNew == NULL) { goto _over; } - // set the update type - pConsumerNew->updateType = CONSUMER_UPDATE_SUB; - taosArrayDestroy(pConsumerNew->assignedTopics); - pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); - - int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics); - - int32_t i = 0, j = 0; - while (i < oldTopicNum || j < newTopicNum) { - if (i >= oldTopicNum) { - char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j)); - taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy); - j++; - continue; - } else if (j >= newTopicNum) { - char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i)); - taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy); - i++; - continue; - } else { - char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i); - char *newTopic = taosArrayGetP(pTopicList, j); - int comp = strcmp(oldTopic, newTopic); - if (comp == 0) { - i++; - j++; - continue; - } else if (comp < 0) { - char *oldTopicCopy = taosStrdup(oldTopic); - taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy); - i++; - continue; - } else { - char *newTopicCopy = taosStrdup(newTopic); - taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy); - j++; - continue; - } - } - } - - // no topics need to be rebalanced - if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { + int32_t code = getTopicAddDelete(pExistedConsumer, pConsumerNew); + if (code != 0){ + terrno = code; goto _over; } + } + mndReleaseConsumer(pMnode, pExistedConsumer); + return pConsumerNew; - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over; - if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; +_over: + mndReleaseConsumer(pMnode, pExistedConsumer); + tDeleteSMqConsumerObj(pConsumerNew); + taosArrayDestroyP(subscribe->topicNames, (FDelete)taosMemoryFree); + return NULL; +} + +int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { + SMnode *pMnode = pMsg->info.node; + char *msgStr = pMsg->pCont; + int32_t code = 0; + + SCMSubscribeReq subscribe = {0}; + tDeserializeSCMSubscribeReq(msgStr, &subscribe); + + SMqConsumerObj *pConsumerNew = NULL; + STrans *pTrans = NULL; + + code = checkAndSortTopic(pMnode, subscribe.topicNames); + if(code != TSDB_CODE_SUCCESS){ + goto _over; } + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe"); + if (pTrans == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _over; + } + + code = validateTopics(pTrans, subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay); + if (code != TSDB_CODE_SUCCESS) { + goto _over; + } + + pConsumerNew = buildSubConsumer(pMnode, &subscribe); + if(pConsumerNew == NULL){ + code = -1; + goto _over; + } + code = mndSetConsumerCommitLogs(pTrans, pConsumerNew); + if (code != 0) goto _over; + + code = mndTransPrepare(pMnode, pTrans); + if (code != 0) goto _over; code = TSDB_CODE_ACTION_IN_PROGRESS; _over: mndTransDrop(pTrans); - - if (pExistedConsumer) { - /*taosRUnLockLatch(&pExistedConsumer->lock);*/ - mndReleaseConsumer(pMnode, pExistedConsumer); - } - - tDeleteSMqConsumerObj(pConsumerNew, true); - + tDeleteSMqConsumerObj(pConsumerNew); taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); return code; } @@ -796,7 +800,7 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) { static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status, mndConsumerStatusName(pConsumer->status)); - tDeleteSMqConsumerObj(pConsumer, false); + tClearSMqConsumerObj(pConsumer); return 0; } @@ -812,48 +816,17 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) { } } -// remove from new topic -static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { - int32_t size = taosArrayGetSize(pConsumer->rebNewTopics); +// remove from topic list +static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) { + int32_t size = taosArrayGetSize(topicList); for (int32_t i = 0; i < size; i++) { - char *p = taosArrayGetP(pConsumer->rebNewTopics, i); + char *p = taosArrayGetP(topicList, i); if (strcmp(pTopic, p) == 0) { - taosArrayRemove(pConsumer->rebNewTopics, i); + taosArrayRemove(topicList, i); taosMemoryFree(p); - mInfo("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId, - pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics)); - break; - } - } -} - -// remove from removed topic -static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { - int32_t size = taosArrayGetSize(pConsumer->rebRemovedTopics); - for (int32_t i = 0; i < size; i++) { - char *p = taosArrayGetP(pConsumer->rebRemovedTopics, i); - if (strcmp(pTopic, p) == 0) { - taosArrayRemove(pConsumer->rebRemovedTopics, i); - taosMemoryFree(p); - - mInfo("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d", - pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics)); - break; - } - } -} - -static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { - int32_t sz = taosArrayGetSize(pConsumer->currentTopics); - for (int32_t i = 0; i < sz; i++) { - char *topic = taosArrayGetP(pConsumer->currentTopics, i); - if (strcmp(pTopic, topic) == 0) { - taosArrayRemove(pConsumer->currentTopics, i); - taosMemoryFree(topic); - - mInfo("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d", - pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics)); + mInfo("[rebalance] consumer:0x%" PRIx64 " remove topic:%s in the %s topic list, remain newTopics:%d", + consumerId, pTopic, type, (int)taosArrayGetSize(topicList)); break; } } @@ -887,60 +860,38 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->subscribeTime = taosGetTimestampMs(); pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; - mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer", pOldConsumer->consumerId); - // } else if (pNewConsumer->updateType == CONSUMER_UPDATE_TIMER_LOST) { - // int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); - // for (int32_t i = 0; i < sz; i++) { - // char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i)); - // taosArrayPush(pOldConsumer->rebRemovedTopics, &topic); - // } - // - // int32_t prevStatus = pOldConsumer->status; - // pOldConsumer->status = MQ_CONSUMER_STATUS_LOST; - // mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ", - // reb-removed-topics:%d", - // pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), - // mndConsumerStatusName(pOldConsumer->status), pOldConsumer->rebalanceTime, - // (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); + mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId); } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) { int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i)); taosArrayPush(pOldConsumer->rebNewTopics, &topic); } - pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; - mInfo("consumer:0x%" PRIx64 " timer update, timer recover", pOldConsumer->consumerId); + mInfo("consumer:0x%" PRIx64 " recover update", pOldConsumer->consumerId); } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) { atomic_add_fetch_32(&pOldConsumer->epoch, 1); pOldConsumer->rebalanceTime = taosGetTimestampMs(); - mInfo("consumer:0x%" PRIx64 " reb update, only rebalance time", pOldConsumer->consumerId); + mInfo("[rebalance] consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId); } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) { char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); - - // check if exist in current topic - removeFromNewTopicList(pOldConsumer, pNewTopic); - - // add to current topic + removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new"); bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic); if (existing) { - mError("consumer:0x%" PRIx64 "new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic); + mError("[rebalance] consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic); taosMemoryFree(pNewTopic); - } else { // added into current topic list + } else { taosArrayPush(pOldConsumer->currentTopics, &pNewTopic); taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString); } - // set status int32_t status = pOldConsumer->status; updateConsumerStatus(pOldConsumer); - - // the re-balance is triggered when the new consumer is launched. pOldConsumer->rebalanceTime = taosGetTimestampMs(); - atomic_add_fetch_32(&pOldConsumer->epoch, 1); - mInfo("consumer:0x%" PRIx64 " reb update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 + + mInfo("[rebalance] consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d, newTopics:%d, removeTopics:%d", pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status, mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime, @@ -948,22 +899,16 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) { - char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0); + char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0); + removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove"); + removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current"); - // remove from removed topic - removeFromRemoveTopicList(pOldConsumer, removedTopic); - - // remove from current topic - removeFromCurrentTopicList(pOldConsumer, removedTopic); - - // set status int32_t status = pOldConsumer->status; updateConsumerStatus(pOldConsumer); - pOldConsumer->rebalanceTime = taosGetTimestampMs(); atomic_add_fetch_32(&pOldConsumer->epoch, 1); - mInfo("consumer:0x%" PRIx64 " reb update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 + mInfo("[rebalance]consumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d, newTopics:%d, removeTopics:%d", pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status, mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime, diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 5be641d1c2..3f69c7def3 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -249,7 +249,9 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) { return (void *)buf; } -SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup) { +static void *topicNameDup(void *p) { return taosStrdup((char *)p); } + +SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe) { SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj)); if (pConsumer == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -264,36 +266,64 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup) { pConsumer->hbStatus = 0; taosInitRWLatch(&pConsumer->lock); + pConsumer->createTime = taosGetTimestampMs(); + pConsumer->updateType = updateType; - pConsumer->currentTopics = taosArrayInit(0, sizeof(void *)); - pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *)); - pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *)); - pConsumer->assignedTopics = taosArrayInit(0, sizeof(void *)); + if (updateType == CONSUMER_ADD_REB){ + pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *)); + if(pConsumer->rebNewTopics == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } - if (pConsumer->currentTopics == NULL || pConsumer->rebNewTopics == NULL || pConsumer->rebRemovedTopics == NULL || - pConsumer->assignedTopics == NULL) { - taosArrayDestroy(pConsumer->currentTopics); - taosArrayDestroy(pConsumer->rebNewTopics); - taosArrayDestroy(pConsumer->rebRemovedTopics); - taosArrayDestroy(pConsumer->assignedTopics); - taosMemoryFree(pConsumer); - return NULL; + char* topicTmp = taosStrdup(topic); + taosArrayPush(pConsumer->rebNewTopics, &topicTmp); + }else if (updateType == CONSUMER_REMOVE_REB) { + pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *)); + if(pConsumer->rebRemovedTopics == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + char* topicTmp = taosStrdup(topic); + taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp); + }else if (updateType == CONSUMER_INSERT_SUB){ + tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId)); + pConsumer->withTbName = subscribe->withTbName; + pConsumer->autoCommit = subscribe->autoCommit; + pConsumer->autoCommitInterval = subscribe->autoCommitInterval; + pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg; + + + pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup); + if (pConsumer->rebNewTopics == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + pConsumer->assignedTopics = subscribe->topicNames; + subscribe->topicNames = NULL; + }else if (updateType == CONSUMER_UPDATE_SUB){ + pConsumer->assignedTopics = subscribe->topicNames; + subscribe->topicNames = NULL; } - pConsumer->createTime = taosGetTimestampMs(); - return pConsumer; + +END: + tDeleteSMqConsumerObj(pConsumer); + return NULL; } -void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer, bool delete) { +void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) { if (pConsumer == NULL) return; taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree); - if (delete) { - taosMemoryFree(pConsumer); - } +} + +void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) { + tClearSMqConsumerObj(pConsumer); + taosMemoryFree(pConsumer); } int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { @@ -548,6 +578,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { } void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { + if (pSub == NULL) return; void *pIter = NULL; while (1) { pIter = taosHashIterate(pSub->consumerHash, pIter); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index fbdfd81cdf..a6d7a24323 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -41,9 +41,9 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg); static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg); static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter); -static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* hash); +static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj* hash); -static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { +static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pCommitRaw = mndSubActionEncode(pSub); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; @@ -223,47 +223,44 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { return pRebInfo; } -static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) { - int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); - const char *pSubKey = pOutput->pSub->key; +static void pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char* key){ + SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(vgs); + SMqRebOutputVg outputVg = {consumerId, -1, pVgEp}; + taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); + mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64, + key, pVgEp->vgId, consumerId); +} +static void processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) { + int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); int32_t actualRemoved = 0; for (int32_t i = 0; i < numOfRemoved; i++) { uint64_t consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i); - SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); - - // consumer exists till now - if (pConsumerEp) { - actualRemoved++; - - int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); - for (int32_t j = 0; j < consumerVgNum; j++) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); - - SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp}; - taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("sub:%s mq re-balance remove vgId:%d from consumer:0x%" PRIx64, pSubKey, pVgEp->vgId, consumerId); - } - - taosArrayDestroy(pConsumerEp->vgs); - taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); - - // put into removed - taosArrayPush(pOutput->removedConsumers, &consumerId); + if (pConsumerEp == NULL) { + continue; } + + int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); + for (int32_t j = 0; j < consumerVgNum; j++) { + pushVgDataToHash(pConsumerEp->vgs, pHash, consumerId, pOutput->pSub->key); + } + + taosArrayDestroy(pConsumerEp->vgs); + taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); + taosArrayPush(pOutput->removedConsumers, &consumerId); + actualRemoved++; } if (numOfRemoved != actualRemoved) { - mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", pSubKey, numOfRemoved, actualRemoved); + mError("[rebalance] sub:%s mq rebalance removedNum:%d not matched with actual:%d", pOutput->pSub->key, numOfRemoved, actualRemoved); } else { - mInfo("sub:%s removed %d consumers", pSubKey, numOfRemoved); + mInfo("[rebalance] sub:%s removed %d consumers", pOutput->pSub->key, numOfRemoved); } } -static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) { +static void processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) { int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers); - const char *pSubKey = pOutput->pSub->key; for (int32_t i = 0; i < numOfNewConsumers; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i); @@ -274,24 +271,14 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosArrayPush(pOutput->newConsumers, &consumerId); - mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, pSubKey, consumerId); + mInfo("[rebalance] sub:%s mq rebalance add new consumer:0x%" PRIx64, pOutput->pSub->key, consumerId); } } -static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { - const char *pSubKey = pOutput->pSub->key; +static void processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs); - for (int32_t i = 0; i < numOfVgroups; i++) { - SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs); - SMqRebOutputVg rebOutput = { - .oldConsumerId = -1, - .newConsumerId = -1, - .pVgEp = pVgEp, - }; - - taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg)); - mInfo("sub:%s mq re-balance addUnassignedVgroups vgId:%d from unassigned", pSubKey, pVgEp->vgId); + pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key); } } @@ -307,11 +294,9 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { // } //} -static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt, - int32_t imbConsumerNum) { - const char *pSubKey = pOutput->pSub->key; - - int32_t imbCnt = 0; +static void processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt, + int32_t remainderVgCnt) { + int32_t cnt = 0; void *pIter = NULL; while (1) { @@ -323,36 +308,16 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); - // all old consumers still existing need to be modified - // TODO optimize: modify only consumer whose vgs changed taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId); if (consumerVgNum > minVgCnt) { - if (imbCnt < imbConsumerNum) { - // pop until equal minVg + 1 - while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { - SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); - SMqRebOutputVg outputVg = { - .oldConsumerId = pConsumerEp->consumerId, - .newConsumerId = -1, - .pVgEp = pVgEp, - }; - taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId, - pConsumerEp->consumerId); + if (cnt < remainderVgCnt) { + while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { // pop until equal minVg + 1 + pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key); } - imbCnt++; + cnt++; } else { - // all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) { - SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); - SMqRebOutputVg outputVg = { - .oldConsumerId = pConsumerEp->consumerId, - .newConsumerId = -1, - .pVgEp = pVgEp, - }; - taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId, - pConsumerEp->consumerId); + pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key); } } } @@ -404,7 +369,7 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput){ } } if(!find){ - mInfo("processRemoveAddVgs old vgId:%d", pVgEp->vgId); + mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEp->vgId); tDeleteSMqVgEp(pVgEp); taosArrayRemove(pConsumerEp->vgs, j); continue; @@ -415,7 +380,7 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput){ if(taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0){ taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs); - mInfo("processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs)); + mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs)); taosArrayDestroy(newVgs); }else{ taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp); @@ -423,194 +388,229 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput){ return totalVgNum; } -static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { - int32_t totalVgNum = processRemoveAddVgs(pMnode, pOutput); - const char *pSubKey = pOutput->pSub->key; - - int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); - int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers); - mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum, - pInput->oldConsumerNum, numOfAdded, numOfRemoved); - - // 1. build temporary hash(vgId -> SMqRebOutputVg) to store vg that need to be assigned - SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - - // 2. check and get actual removed consumers, put their vg into pHash - doRemoveLostConsumers(pOutput, pHash, pInput); - - // 3. if previously no consumer, there are vgs not assigned, put these vg into pHash - addUnassignedVgroups(pOutput, pHash); - - // 4. calc vg number of each consumer - int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved; - - int32_t minVgCnt = 0; - int32_t imbConsumerNum = 0; - - // calc num - if (numOfFinal) { - minVgCnt = totalVgNum / numOfFinal; - imbConsumerNum = totalVgNum % numOfFinal; - mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has 1 more vgroups than avg value", - pSubKey, numOfFinal, minVgCnt, imbConsumerNum); - } else { - mInfo("sub:%s no consumer subscribe this topic", pSubKey); - } - - // 5. remove vgroups from consumers who have more vgroups than the threshold value(minVgCnt or minVgCnt + 1), and then another vg into pHash - transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum); - - // 6. add new consumer into sub - doAddNewConsumers(pOutput, pInput); - - SMqRebOutputVg *pRebVg = NULL; - void *pRemovedIter = NULL; - void *pIter = NULL; - - // 7. extract bgroups from pHash and assign to consumers that do not have enough vgroups - while (1) { - pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); - if (pIter == NULL) { - break; - } - - SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - - // push until equal minVg - while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) { - // iter hash and find one vg - pRemovedIter = taosHashIterate(pHash, pRemovedIter); - if (pRemovedIter == NULL) { - mError("sub:%s removed iter is null, never can reach hear", pSubKey); - break; - } - - pRebVg = (SMqRebOutputVg *)pRemovedIter; - pRebVg->newConsumerId = pConsumerEp->consumerId; - taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); - mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); - } - } - - while (1) { - pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); - if (pIter == NULL) { - break; - } - SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - - if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { - pRemovedIter = taosHashIterate(pHash, pRemovedIter); - if (pRemovedIter == NULL) { - mInfo("sub:%s removed iter is null", pSubKey); - break; - } - - pRebVg = (SMqRebOutputVg *)pRemovedIter; - pRebVg->newConsumerId = pConsumerEp->consumerId; - taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); - mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); - } - } - - // All assigned vg should be put into pOutput->rebVgs - if(pRemovedIter != NULL){ - mError("sub:%s error pRemovedIter should be NULL", pSubKey); - } - while (1) { - pRemovedIter = taosHashIterate(pHash, pRemovedIter); - if (pRemovedIter == NULL) { - break; - } - - SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter; - taosArrayPush(pOutput->rebVgs, pRebOutput); - if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed - taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); // put all vg into unassigned - } - } - +static void processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput){ SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows - if (pSub) { - taosRLockLatch(&pSub->lock); - if (pOutput->pSub->offsetRows == NULL) { - pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows)); - } - pIter = NULL; - while (1) { - pIter = taosHashIterate(pSub->consumerHash, pIter); - if (pIter == NULL) break; - SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)); + if (pSub == NULL) { + return; + } + taosRLockLatch(&pSub->lock); + if (pOutput->pSub->offsetRows == NULL) { + pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows)); + } + void *pIter = NULL; + while (1) { + pIter = taosHashIterate(pSub->consumerHash, pIter); + if (pIter == NULL) break; + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t)); - for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) { - OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j); - bool jump = false; - for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){ - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i); - if(pVgEp->vgId == d1->vgId){ - jump = true; - mInfo("pSub->offsetRows jump, because consumer id:0x%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId); - break; - } - } - if(jump) continue; - bool find = false; - for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) { - OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i); - if (d1->vgId == d2->vgId) { - d2->rows += d1->rows; - d2->offset = d1->offset; - d2->ever = d1->ever; - find = true; - mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows); - break; - } - } - if(!find){ - taosArrayPush(pOutput->pSub->offsetRows, d1); + for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) { + OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j); + bool jump = false; + for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){ + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i); + if(pVgEp->vgId == d1->vgId){ + jump = true; + mInfo("pSub->offsetRows jump, because consumer id:0x%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId); + break; } } + if(jump) continue; + bool find = false; + for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) { + OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i); + if (d1->vgId == d2->vgId) { + d2->rows += d1->rows; + d2->offset = d1->offset; + d2->ever = d1->ever; + find = true; + mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows); + break; + } + } + if(!find){ + taosArrayPush(pOutput->pSub->offsetRows, d1); + } } - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); } + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); +} - // 8. generate logs - mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pSubKey); +static void printRebalanceLog(SMqRebOutputObj *pOutput){ + mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pOutput->pSub->key); for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) { SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i); - mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey, + mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key, pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId); } - pIter = NULL; + void *pIter = NULL; while (1) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) break; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; int32_t sz = taosArrayGetSize(pConsumerEp->vgs); - mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz); + mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key, pConsumerEp->consumerId, sz); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); - mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId, + mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId, pConsumerEp->consumerId); } } +} - // 9. clear +static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, const char *pSubKey, + int32_t *minVgCnt, int32_t *remainderVgCnt){ + int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); + int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers); + int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved; + + // calc num + if (numOfFinal != 0) { + *minVgCnt = totalVgNum / numOfFinal; + *remainderVgCnt = totalVgNum % numOfFinal; + } else { + mInfo("[rebalance] sub:%s no consumer subscribe this topic", pSubKey); + } + mInfo("[rebalance] sub:%s mq rebalance %d vgroups, existed consumers:%d, added:%d, removed:%d, minVg:%d remainderVg:%d", + pSubKey, totalVgNum, pInput->oldConsumerNum, numOfAdded, numOfRemoved, *minVgCnt, *remainderVgCnt); +} + +static void assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt){ + SMqRebOutputVg *pRebVg = NULL; + void *pAssignIter = NULL; + void *pIter = NULL; + + while (1) { + pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); + if (pIter == NULL) { + break; + } + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) { + pAssignIter = taosHashIterate(pHash, pAssignIter); + if (pAssignIter == NULL) { + mError("[rebalance] sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key); + break; + } + + pRebVg = (SMqRebOutputVg *)pAssignIter; + pRebVg->newConsumerId = pConsumerEp->consumerId; + taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); + mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", + pRebVg->pVgEp->vgId, pConsumerEp->consumerId); + } + } + + while (1) { + pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); + if (pIter == NULL) { + break; + } + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { + pAssignIter = taosHashIterate(pHash, pAssignIter); + if (pAssignIter == NULL) { + mInfo("[rebalance] sub:%s assign iter is used up", pOutput->pSub->key); + break; + } + + pRebVg = (SMqRebOutputVg *)pAssignIter; + pRebVg->newConsumerId = pConsumerEp->consumerId; + taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); + mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", + pRebVg->pVgEp->vgId, pConsumerEp->consumerId); + } + } + + taosHashCancelIterate(pOutput->pSub->consumerHash, pIter); + if(pAssignIter != NULL){ + mError("[rebalance]sub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key); + } + while (1) { + pAssignIter = taosHashIterate(pHash, pAssignIter); + if (pAssignIter == NULL) { + break; + } + + SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pAssignIter; + taosArrayPush(pOutput->rebVgs, pRebOutput); + if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed + taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); // put all vg into unassigned + } + } +} + +static void mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { + int32_t totalVgNum = processRemoveAddVgs(pMnode, pOutput); + const char *pSubKey = pOutput->pSub->key; + int32_t minVgCnt = 0; + int32_t remainderVgCnt = 0; + + SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + + processRemovedConsumers(pOutput, pHash, pInput); + processUnassignedVgroups(pOutput, pHash); + calcVgroupsCnt(pInput, totalVgNum, pSubKey, &minVgCnt, &remainderVgCnt); + processModifiedConsumers(pOutput, pHash, minVgCnt, remainderVgCnt); + processNewConsumers(pOutput, pInput); + assignVgroups(pOutput, pHash, minVgCnt); + processSubOffsetRows(pMnode, pInput, pOutput); + printRebalanceLog(pOutput); taosHashCleanup(pHash); +} - return 0; +static int32_t presistConsumerByType(STrans *pTrans, SArray* consumers, int8_t type, char *cgroup, char *topic){ + int32_t code = 0; + SMqConsumerObj *pConsumerNew = NULL; + int32_t consumerNum = taosArrayGetSize(consumers); + for (int32_t i = 0; i < consumerNum; i++) { + int64_t consumerId = *(int64_t *)taosArrayGet(consumers, i); + pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup, type, topic, NULL); + if (pConsumerNew == NULL){ + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + + code = mndSetConsumerCommitLogs(pTrans, pConsumerNew); + if (code != 0) { + goto END; + } + + tDeleteSMqConsumerObj(pConsumerNew); + } + pConsumerNew = NULL; + +END: + tDeleteSMqConsumerObj(pConsumerNew); + return code; +} + +static int32_t mndPresistConsumer(STrans *pTrans, const SMqRebOutputObj *pOutput, char *cgroup, char *topic){ + int32_t code = presistConsumerByType(pTrans, pOutput->modifyConsumers, CONSUMER_UPDATE_REB, cgroup, NULL); + if (code != 0) { + return code; + } + + code = presistConsumerByType(pTrans, pOutput->newConsumers, CONSUMER_ADD_REB, cgroup, topic); + if (code != 0) { + return code; + } + + return presistConsumerByType(pTrans, pOutput->removedConsumers, CONSUMER_REMOVE_REB, cgroup, topic); } static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) { - struct SSubplan* pPlan = NULL; + struct SSubplan *pPlan = NULL; + int32_t code = 0; + STrans *pTrans = NULL; + if(strcmp(pOutput->pSub->qmsg, "") != 0){ - int32_t code = qStringToSubplan(pOutput->pSub->qmsg, &pPlan); - if (code != TSDB_CODE_SUCCESS) { + code = qStringToSubplan(pOutput->pSub->qmsg, &pPlan); + if (code != 0) { terrno = code; - return -1; + goto END; } } @@ -618,110 +618,51 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "tmq-reb"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "tmq-reb"); if (pTrans == NULL) { - nodesDestroyNode((SNode*)pPlan); - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; } mndTransSetDbName(pTrans, topic, cgroup); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - mndTransDrop(pTrans); - nodesDestroyNode((SNode*)pPlan); - return -1; + code = mndTransCheckConflict(pMnode, pTrans); + if (code != 0) { + goto END; } - // make txn: // 1. redo action: action to all vg const SArray *rebVgs = pOutput->rebVgs; int32_t vgNum = taosArrayGetSize(rebVgs); for (int32_t i = 0; i < vgNum; i++) { SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i); - if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan) < 0) { - mndTransDrop(pTrans); - nodesDestroyNode((SNode*)pPlan); - return -1; + code = mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan); + if (code != 0) { + goto END; } } - nodesDestroyNode((SNode*)pPlan); - // 2. redo log: subscribe and vg assignment - // subscribe - if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) { - mndTransDrop(pTrans); - return -1; + // 2. commit log: subscribe and vg assignment + code = mndSetSubCommitLogs(pTrans, pOutput->pSub); + if (code != 0) { + goto END; } // 3. commit log: consumer to update status and epoch - // 3.1 set touched consumer - int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers); - for (int32_t i = 0; i < consumerNum; i++) { - int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i); - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE_REB; - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - tDeleteSMqConsumerObj(pConsumerNew, true); - - mndTransDrop(pTrans); - return -1; - } - - tDeleteSMqConsumerObj(pConsumerNew, true); + code = mndPresistConsumer(pTrans, pOutput, cgroup, topic); + if (code != 0) { + goto END; } - // 3.2 set new consumer - consumerNum = taosArrayGetSize(pOutput->newConsumers); - for (int32_t i = 0; i < consumerNum; i++) { - int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i); - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); - pConsumerNew->updateType = CONSUMER_ADD_REB; - - char* topicTmp = taosStrdup(topic); - taosArrayPush(pConsumerNew->rebNewTopics, &topicTmp); - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - tDeleteSMqConsumerObj(pConsumerNew, true); - - mndTransDrop(pTrans); - return -1; - } - - tDeleteSMqConsumerObj(pConsumerNew, true); - } - - // 3.3 set removed consumer - consumerNum = taosArrayGetSize(pOutput->removedConsumers); - for (int32_t i = 0; i < consumerNum; i++) { - int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i); - - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); - pConsumerNew->updateType = CONSUMER_REMOVE_REB; - - char* topicTmp = taosStrdup(topic); - taosArrayPush(pConsumerNew->rebRemovedTopics, &topicTmp); - if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - tDeleteSMqConsumerObj(pConsumerNew, true); - - mndTransDrop(pTrans); - return -1; - } - - tDeleteSMqConsumerObj(pConsumerNew, true); - } - - // 4. TODO commit log: modification log - - // 5. set cb + // 4. set cb mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0); - // 6. execution - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("failed to prepare trans rebalance since %s", terrstr()); - mndTransDrop(pTrans); - return -1; - } + // 5. execution + code = mndTransPrepare(pMnode, pTrans); +END: + nodesDestroyNode((SNode*)pPlan); mndTransDrop(pTrans); - return 0; + return code; } static void freeRebalanceItem(void *param) { @@ -730,10 +671,55 @@ static void freeRebalanceItem(void *param) { taosArrayDestroy(pInfo->removedConsumers); } -static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) { +// type = 0 remove type = 1 add +static void buildRebInfo(SHashObj* rebSubHash, SArray* topicList, int8_t type, char *group, int64_t consumerId){ + int32_t topicNum = taosArrayGetSize(topicList); + for (int32_t i = 0; i < topicNum; i++) { + char key[TSDB_SUBSCRIBE_KEY_LEN]; + char *removedTopic = taosArrayGetP(topicList, i); + mndMakeSubscribeKey(key, group, removedTopic); + SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key); + if(type == 0) + taosArrayPush(pRebSub->removedConsumers, &consumerId); + else if(type == 1) + taosArrayPush(pRebSub->newConsumers, &consumerId); + } +} + +static void checkForVgroupSplit(SMnode *pMnode, SMqConsumerObj *pConsumer, SHashObj* rebSubHash){ + int32_t newTopicNum = taosArrayGetSize(pConsumer->currentTopics); + for (int32_t i = 0; i < newTopicNum; i++) { + char * topic = taosArrayGetP(pConsumer->currentTopics, i); + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); + if (pSub == NULL) { + continue; + } + taosRLockLatch(&pSub->lock); + + // iterate all vg assigned to the consumer of that topic + SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t)); + int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs); + + for (int32_t j = 0; j < vgNum; j++) { + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); + SVgObj * pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId); + if (!pVgroup) { + char key[TSDB_SUBSCRIBE_KEY_LEN]; + mndMakeSubscribeKey(key, pConsumer->cgroup, topic); + mndGetOrCreateRebSub(rebSubHash, key); + mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId); + } + mndReleaseVgroup(pMnode, pVgroup); + } + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); + } +} + +static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) { SMnode *pMnode = pMsg->info.node; SSdb *pSdb = pMnode->pSdb; - SMqConsumerObj *pConsumer; + SMqConsumerObj *pConsumer = NULL; void *pIter = NULL; // iterate all consumers, find all modification @@ -746,89 +732,32 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) { int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); - mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", - pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime, - hbStatus); + mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", + pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime, hbStatus); if (status == MQ_CONSUMER_STATUS_READY) { - if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); + if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {// unsubscribe or close + mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { taosRLockLatch(&pConsumer->lock); - int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics); - for (int32_t i = 0; i < topicNum; i++) { - char key[TSDB_SUBSCRIBE_KEY_LEN]; - char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i); - mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic); - SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key); - taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); - } + buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId); taosRUnLockLatch(&pConsumer->lock); }else{ - int32_t newTopicNum = taosArrayGetSize(pConsumer->currentTopics); - for (int32_t i = 0; i < newTopicNum; i++) { - char * topic = taosArrayGetP(pConsumer->currentTopics, i); - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); - if (pSub == NULL) { - continue; - } - taosRLockLatch(&pSub->lock); - - // 2.2 iterate all vg assigned to the consumer of that topic - SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t)); - int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs); - - for (int32_t j = 0; j < vgNum; j++) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); - SVgObj * pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId); - if (!pVgroup) { - char key[TSDB_SUBSCRIBE_KEY_LEN]; - mndMakeSubscribeKey(key, pConsumer->cgroup, topic); - mndGetOrCreateRebSub(rebSubHash, key); - mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId); - } - mndReleaseVgroup(pMnode, pVgroup); - } - taosRUnLockLatch(&pSub->lock); - mndReleaseSubscribe(pMnode, pSub); - } + checkForVgroupSplit(pMnode, pConsumer, rebSubHash); } } else if (status == MQ_CONSUMER_STATUS_LOST) { - if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); + if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day + mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); } } else { taosRLockLatch(&pConsumer->lock); - - int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics); - for (int32_t i = 0; i < newTopicNum; i++) { - char key[TSDB_SUBSCRIBE_KEY_LEN]; - char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i); - mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic); - SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key); - taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId); - } - - int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics); - for (int32_t i = 0; i < removedTopicNum; i++) { - char key[TSDB_SUBSCRIBE_KEY_LEN]; - char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i); - mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic); - SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key); - taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); - } - - if (newTopicNum == 0 && removedTopicNum == 0 && taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); - } - + buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId); + buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId); taosRUnLockLatch(&pConsumer->lock); } mndReleaseConsumer(pMnode, pConsumer); } - - return 0; } bool mndRebTryStart() { @@ -847,31 +776,95 @@ void mndRebCntDec() { mInfo("rebalance cnt sub, value:%d", val); } +static void clearRebOutput(SMqRebOutputObj *rebOutput){ + taosArrayDestroy(rebOutput->newConsumers); + taosArrayDestroy(rebOutput->modifyConsumers); + taosArrayDestroy(rebOutput->removedConsumers); + taosArrayDestroy(rebOutput->rebVgs); + tDeleteSubscribeObj(rebOutput->pSub); + taosMemoryFree(rebOutput->pSub); +} + +static int32_t initRebOutput(SMqRebOutputObj *rebOutput) { + rebOutput->newConsumers = taosArrayInit(0, sizeof(int64_t)); + rebOutput->removedConsumers = taosArrayInit(0, sizeof(int64_t)); + rebOutput->modifyConsumers = taosArrayInit(0, sizeof(int64_t)); + rebOutput->rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); + + if (rebOutput->newConsumers == NULL || rebOutput->removedConsumers == NULL || rebOutput->modifyConsumers == NULL || + rebOutput->rebVgs == NULL) { + clearRebOutput(rebOutput); + return TSDB_CODE_OUT_OF_MEMORY; + } + return 0; +} + +static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput){ + const char *key = rebInput->pRebInfo->key; + SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, key); + + if (pSub == NULL) { + // split sub key and extract topic + char topic[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CGROUP_LEN]; + mndSplitSubscribeKey(key, topic, cgroup, true); + + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); + if (pTopic == NULL) { + mError("[rebalance] mq rebalance %s ignored since topic %s doesn't exist", key, topic); + return -1; + } + + taosRLockLatch(&pTopic->lock); + + rebInput->oldConsumerNum = 0; + rebOutput->pSub = mndCreateSubscription(pMnode, pTopic, key); + + if (rebOutput->pSub == NULL) { + mError("[rebalance] mq rebalance %s failed create sub since %s, ignore", key, terrstr()); + taosRUnLockLatch(&pTopic->lock); + mndReleaseTopic(pMnode, pTopic); + return -1; + } + + memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN); + taosRUnLockLatch(&pTopic->lock); + mndReleaseTopic(pMnode, pTopic); + + mInfo("[rebalance] sub topic:%s has no consumers sub yet", key); + } else { + taosRLockLatch(&pSub->lock); + rebInput->oldConsumerNum = taosHashGetSize(pSub->consumerHash); + rebOutput->pSub = tCloneSubscribeObj(pSub); + taosRUnLockLatch(&pSub->lock); + + mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum); + mndReleaseSubscribe(pMnode, pSub); + } + return 0; +} + static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { - int code = 0; - mInfo("start to process mq timer"); + int code = 0; + void *pIter = NULL; + SMnode *pMnode = pMsg->info.node; + mInfo("[rebalance] start to process mq timer"); if (!mndRebTryStart()) { - mInfo("mq rebalance already in progress, do nothing"); + mInfo("[rebalance] mq rebalance already in progress, do nothing"); return code; } SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); if (rebSubHash == NULL) { - mError("failed to create rebalance hashmap"); - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = -1; + code = TSDB_CODE_OUT_OF_MEMORY; goto END; } - taosHashSetFreeFp(rebSubHash, freeRebalanceItem); mndCheckConsumer(pMsg, rebSubHash); - mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash)); + mInfo("[rebalance] mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash)); - // here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction. - void *pIter = NULL; - SMnode *pMnode = pMsg->info.node; while (1) { pIter = taosHashIterate(rebSubHash, pIter); if (pIter == NULL) { @@ -880,92 +873,30 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { SMqRebInputObj rebInput = {0}; SMqRebOutputObj rebOutput = {0}; - rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t)); - rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t)); - rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t)); - rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); - - if (rebOutput.newConsumers == NULL || rebOutput.removedConsumers == NULL || rebOutput.modifyConsumers == NULL || - rebOutput.rebVgs == NULL) { - taosArrayDestroy(rebOutput.newConsumers); - taosArrayDestroy(rebOutput.removedConsumers); - taosArrayDestroy(rebOutput.modifyConsumers); - taosArrayDestroy(rebOutput.rebVgs); - - taosHashCancelIterate(rebSubHash, pIter); - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("mq re-balance failed, due to out of memory"); - code = -1; + code = initRebOutput(&rebOutput); + if (code != 0){ goto END; } - SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; - SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key); + rebInput.pRebInfo = (SMqRebInfo*)pIter; - rebInput.pRebInfo = pRebInfo; - - if (pSub == NULL) { - // split sub key and extract topic - char topic[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true); - - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - if (pTopic == NULL) { - mError("mq re-balance %s ignored since topic %s doesn't exist", pRebInfo->key, topic); - continue; - } - - taosRLockLatch(&pTopic->lock); - - rebOutput.pSub = mndCreateSubscription(pMnode, pTopic, pRebInfo->key); - - if (rebOutput.pSub == NULL) { - mError("mq rebalance %s failed create sub since %s, ignore", pRebInfo->key, terrstr()); - taosRUnLockLatch(&pTopic->lock); - mndReleaseTopic(pMnode, pTopic); - continue; - } - - memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN); - taosRUnLockLatch(&pTopic->lock); - mndReleaseTopic(pMnode, pTopic); - - rebInput.oldConsumerNum = 0; - mInfo("sub topic:%s has no consumers sub yet", pRebInfo->key); - } else { - taosRLockLatch(&pSub->lock); - rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash); - rebOutput.pSub = tCloneSubscribeObj(pSub); - taosRUnLockLatch(&pSub->lock); - - mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum); - mndReleaseSubscribe(pMnode, pSub); + if (buildRebOutput(pMnode, &rebInput, &rebOutput) != 0){ + continue; } - if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) { - mError("mq re-balance internal error"); - } + mndDoRebalance(pMnode, &rebInput, &rebOutput); - // if add more consumer to balanced subscribe, - // possibly no vg is changed - // when each topic is re-balanced, issue an trans to save the results in sdb. - if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) { + if (mndPersistRebResult(pMnode, pMsg, &rebOutput) != 0) { mError("mq re-balance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr()); } - taosArrayDestroy(rebOutput.newConsumers); - taosArrayDestroy(rebOutput.modifyConsumers); - taosArrayDestroy(rebOutput.removedConsumers); - taosArrayDestroy(rebOutput.rebVgs); - tDeleteSubscribeObj(rebOutput.pSub); - taosMemoryFree(rebOutput.pSub); + clearRebOutput(&rebOutput); } - // reset flag - mInfo("mq re-balance completed successfully"); + mInfo("[rebalance] mq re-balance completed successfully, wait trans finish"); END: + taosHashCancelIterate(rebSubHash, pIter); taosHashCleanup(rebSubHash); mndRebCntDec(); @@ -995,6 +926,28 @@ static int32_t sendDeleteSubToVnode(SMqSubscribeObj *pSub, STrans *pTrans){ return 0; } +static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic){ + void *pIter = NULL; + SMqConsumerObj *pConsumer = NULL; + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); + if (pIter == NULL) { + break; + } + + if (strcmp(cgroup, pConsumer->cgroup) == 0 && taosArrayGetSize(pConsumer->currentTopics) == 0) { + int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer); + if (code != 0) { + sdbRelease(pMnode->pSdb, pConsumer); + sdbCancelFetch(pMnode->pSdb, pIter); + return code; + } + } + sdbRelease(pMnode->pSdb, pConsumer); + } + return 0; +} + static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMDropCgroupReq dropReq = {0}; @@ -1033,41 +986,31 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { goto end; } + mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); mndTransSetDbName(pTrans, dropReq.topic, dropReq.cgroup); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto end; } - void *pIter = NULL; - SMqConsumerObj *pConsumer; - while (1) { - pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) { - break; - } - - if (strcmp(dropReq.cgroup, pConsumer->cgroup) == 0) { - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); - } - sdbRelease(pMnode->pSdb, pConsumer); + code = mndDropConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic); + if (code != 0) { + goto end; } - mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); - code = sendDeleteSubToVnode(pSub, pTrans); if (code != 0) { goto end; } - if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { + code = mndSetDropSubCommitLogs(pMnode, pTrans, pSub); + if (code != 0) { mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); - code = -1; goto end; } - if (mndTransPrepare(pMnode, pTrans) < 0) { - code = -1; + code = mndTransPrepare(pMnode, pTrans); + if (code != 0) { goto end; } @@ -1219,12 +1162,11 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc return 0; } -int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) { +void mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) { int32_t tlen = strlen(cgroup); memcpy(key, cgroup, tlen); key[tlen] = TMQ_SEPARATOR; strcpy(key + tlen + 1, topicName); - return 0; } SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) { @@ -1294,40 +1236,13 @@ int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj return 0; } -//int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { -// int32_t code = 0; -// SSdb *pSdb = pMnode->pSdb; -// -// void *pIter = NULL; -// SMqSubscribeObj *pSub = NULL; -// while (1) { -// pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); -// if (pIter == NULL) break; -// -// if (pSub->dbUid != pDb->uid) { -// sdbRelease(pSdb, pSub); -// continue; -// } -// -// if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { -// sdbRelease(pSdb, pSub); -// sdbCancelFetch(pSdb, pIter); -// code = -1; -// break; -// } -// -// sdbRelease(pSdb, pSub); -// } -// -// return code; -//} - int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) { - SSdb *pSdb = pMnode->pSdb; - + SSdb *pSdb = pMnode->pSdb; + int32_t code = 0; void *pIter = NULL; SMqSubscribeObj *pSub = NULL; while (1) { + sdbRelease(pSdb, pSub); pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); if (pIter == NULL) break; @@ -1335,33 +1250,31 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(pSub->key, topic, cgroup, true); if (strcmp(topic, topicName) != 0) { - sdbRelease(pSdb, pSub); continue; } // iter all vnode to delete handle if (taosHashGetSize(pSub->consumerHash) != 0) { - sdbRelease(pSdb, pSub); - terrno = TSDB_CODE_MND_IN_REBALANCE; - sdbCancelFetch(pSdb, pIter); - return -1; - } - if (sendDeleteSubToVnode(pSub, pTrans) != 0) { - sdbRelease(pSdb, pSub); - sdbCancelFetch(pSdb, pIter); - return -1; + code = TSDB_CODE_MND_IN_REBALANCE; + goto END; } - if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { - sdbRelease(pSdb, pSub); - sdbCancelFetch(pSdb, pIter); - return -1; + code = sendDeleteSubToVnode(pSub, pTrans); + if (code != 0) { + goto END; } - sdbRelease(pSdb, pSub); + code = mndSetDropSubCommitLogs(pMnode, pTrans, pSub); + if (code != 0) { + goto END; + } } - return 0; +END: + sdbRelease(pSdb, pSub); + sdbCancelFetch(pSdb, pIter); + + return code; } static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){ diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index de543f4256..4a0d58a32e 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -353,6 +353,66 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) { return 0; } +static int32_t sendCheckInfoToVnode(STrans *pTrans, SMnode *pMnode, SMqTopicObj *topicObj){ + STqCheckInfo info; + memcpy(info.topic, topicObj->name, TSDB_TOPIC_FNAME_LEN); + info.ntbUid = topicObj->ntbUid; + info.colIdList = topicObj->ntbColIds; + // broadcast forbid alter info + void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; + int32_t code = 0; + void *buf = NULL; + + while (1) { + // iterate vg + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (!mndVgroupInDb(pVgroup, topicObj->dbUid)) { + sdbRelease(pSdb, pVgroup); + continue; + } + + // encoder check alter info + int32_t len; + tEncodeSize(tEncodeSTqCheckInfo, &info, len, code); + if (code != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + SEncoder encoder; + tEncoderInit(&encoder, abuf, len); + code = tEncodeSTqCheckInfo(&encoder, &info); + if (code < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + tEncoderClear(&encoder); + ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); + // add redo action + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = buf; + action.contLen = sizeof(SMsgHead) + len; + action.msgType = TDMT_VND_TMQ_ADD_CHECKINFO; + code = mndTransAppendRedoAction(pTrans, &action); + if (code != 0) { + goto END; + } + sdbRelease(pSdb, pVgroup); + buf = NULL; + } + +END: + taosMemoryFree(buf); + sdbRelease(pSdb, pVgroup); + sdbCancelFetch(pSdb, pIter); + return code; +} + static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb, const char *userName) { mInfo("start to create topic:%s", pCreate->name); @@ -396,13 +456,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.withMeta = pCreate->withMeta; if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) { - if (pCreate->withMeta) { - terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - code = terrno; - goto _OUT; - } - topicObj.ast = taosStrdup(pCreate->ast); topicObj.astLen = strlen(pCreate->ast) + 1; @@ -474,59 +527,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); if (topicObj.ntbUid != 0) { - STqCheckInfo info; - memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN); - info.ntbUid = topicObj.ntbUid; - info.colIdList = topicObj.ntbColIds; - // broadcast forbid alter info - void *pIter = NULL; - SSdb *pSdb = pMnode->pSdb; - SVgObj *pVgroup = NULL; - while (1) { - // iterate vg - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - if (!mndVgroupInDb(pVgroup, topicObj.dbUid)) { - sdbRelease(pSdb, pVgroup); - continue; - } - - // encoder check alter info - int32_t len; - tEncodeSize(tEncodeSTqCheckInfo, &info, len, code); - if (code < 0) { - sdbRelease(pSdb, pVgroup); - sdbCancelFetch(pSdb, pIter); - goto _OUT; - } - void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SEncoder encoder; - tEncoderInit(&encoder, abuf, len); - if (tEncodeSTqCheckInfo(&encoder, &info) < 0) { - taosMemoryFree(buf); - sdbRelease(pSdb, pVgroup); - sdbCancelFetch(pSdb, pIter); - code = -1; - goto _OUT; - } - tEncoderClear(&encoder); - ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); - // add redo action - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgroup); - action.pCont = buf; - action.contLen = sizeof(SMsgHead) + len; - action.msgType = TDMT_VND_TMQ_ADD_CHECKINFO; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(buf); - sdbRelease(pSdb, pVgroup); - sdbCancelFetch(pSdb, pIter); - code = -1; - goto _OUT; - } - buf = NULL; - sdbRelease(pSdb, pVgroup); + code = sendCheckInfoToVnode(pTrans, pMnode, &topicObj); + if (code != 0){ + goto _OUT; } } @@ -618,7 +621,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { tNameFromString(&topicName, createTopicReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); //reuse this function for topic - auditRecord(pReq, pMnode->clusterId, "createTopic", dbname.dbname, topicName.dbname, + auditRecord(pReq, pMnode->clusterId, "createTopic", dbname.dbname, topicName.dbname, createTopicReq.sql, strlen(createTopicReq.sql)); _OVER: @@ -653,9 +656,104 @@ _OVER: return code; } +static bool checkTopic(SArray *topics, char *topicName){ + int32_t sz = taosArrayGetSize(topics); + for (int32_t i = 0; i < sz; i++) { + char *name = taosArrayGetP(topics, i); + if (strcmp(name, topicName) == 0) { + return true; + } + } + return false; +} + +static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){ + int32_t code = 0; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SMqConsumerObj *pConsumer = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); + if (pIter == NULL) { + break; + } + + bool found = checkTopic(pConsumer->assignedTopics, topicName); + if (found){ + if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { + code = mndSetConsumerDropLogs(pTrans, pConsumer); + if (code != 0) { + goto end; + } + sdbRelease(pSdb, pConsumer); + continue; + } + mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", + topicName, pConsumer->consumerId, pConsumer->cgroup); + code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + goto end; + } + + if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) { + code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", + topicName, pConsumer->consumerId, pConsumer->cgroup); + goto end; + } + sdbRelease(pSdb, pConsumer); + } + +end: + sdbRelease(pSdb, pConsumer); + sdbCancelFetch(pSdb, pIter); + return code; +} + +static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic){ + // broadcast to all vnode + void *pIter = NULL; + SVgObj *pVgroup = NULL; + int32_t code = 0; + SSdb *pSdb = pMnode->pSdb; + void *buf = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) { + sdbRelease(pSdb, pVgroup); + continue; + } + + buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN); + if (buf == NULL){ + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); + memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN); + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = buf; + action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN; + action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO; + code = mndTransAppendRedoAction(pTrans, &action); + if (code != 0) { + taosMemoryFree(buf); + goto end; + } + sdbRelease(pSdb, pVgroup); + } + +end: + sdbRelease(pSdb, pVgroup); + sdbCancelFetch(pSdb, pIter); + return code; +} + static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; SMDropTopicReq dropReq = {0}; int32_t code = 0; SMqTopicObj *pTopic = NULL; @@ -705,68 +803,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { goto end; } - void *pIter = NULL; - SMqConsumerObj *pConsumer; - while (1) { - pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) { - break; - } - - bool found = false; - int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); - for (int32_t i = 0; i < sz; i++) { - char *name = taosArrayGetP(pConsumer->assignedTopics, i); - if (strcmp(name, pTopic->name) == 0) { - found = true; - break; - } - } - if (found){ - if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pReq->info); - mndReleaseConsumer(pMnode, pConsumer); - continue; - } - - mndReleaseConsumer(pMnode, pConsumer); - sdbCancelFetch(pSdb, pIter); - terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", - dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - code = -1; - goto end; - } - - sz = taosArrayGetSize(pConsumer->rebNewTopics); - for (int32_t i = 0; i < sz; i++) { - char *name = taosArrayGetP(pConsumer->rebNewTopics, i); - if (strcmp(name, pTopic->name) == 0) { - mndReleaseConsumer(pMnode, pConsumer); - sdbCancelFetch(pSdb, pIter); - terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", - dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - code = -1; - goto end; - } - } - - sz = taosArrayGetSize(pConsumer->rebRemovedTopics); - for (int32_t i = 0; i < sz; i++) { - char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i); - if (strcmp(name, pTopic->name) == 0) { - mndReleaseConsumer(pMnode, pConsumer); - sdbCancelFetch(pSdb, pIter); - terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)", - dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - code = -1; - goto end; - } - } - - sdbRelease(pSdb, pConsumer); + code = mndDropConsumerByTopic(pMnode, pTrans, dropReq.name); + if (code != 0) { + goto end; } code = mndDropSubByTopic(pMnode, pTrans, dropReq.name); @@ -776,36 +815,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } if (pTopic->ntbUid != 0) { - // broadcast to all vnode - pIter = NULL; - SVgObj *pVgroup = NULL; - - while (1) { - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) { - sdbRelease(pSdb, pVgroup); - continue; - } - - void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN); - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); - memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN); - - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgroup); - action.pCont = buf; - action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN; - action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO; - code = mndTransAppendRedoAction(pTrans, &action); - if (code != 0) { - taosMemoryFree(buf); - sdbRelease(pSdb, pVgroup); - sdbCancelFetch(pSdb, pIter); - goto end; - } - sdbRelease(pSdb, pVgroup); + code = mndDropCheckInfoByTopic(pMnode, pTrans, pTopic); + if (code != 0) { + goto end; } } @@ -822,7 +834,6 @@ end: SName name = {0}; tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - //reuse this function for topic auditRecord(pReq, pMnode->clusterId, "dropTopic", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 4c403dc18f..e64de9a423 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -88,10 +88,6 @@ int32_t tqMetaOpen(STQ* pTq) { return -1; } -// if (tqMetaRestoreHandle(pTq) < 0) { -// return -1; -// } - if (tqMetaRestoreCheckInfo(pTq) < 0) { return -1; } @@ -167,32 +163,30 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) { void* pVal = NULL; int vLen = 0; SDecoder decoder; + int32_t code = 0; tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { STqCheckInfo info; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { + code = tDecodeSTqCheckInfo(&decoder, &info); + if (code != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); - return -1; + goto END; } tDecoderClear(&decoder); - if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) { + code = taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)); + if (code != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); - return -1; + goto END; } } +END: tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); - return 0; + return code; } int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {