From 49a193b328789d108db30e0d6d85bac7883d0feb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 18 Sep 2023 14:46:39 +0800 Subject: [PATCH] fix:send delete subscribe info to vnode if drop consumer --- source/dnode/mnode/impl/src/mndConsumer.c | 7 +++ source/dnode/mnode/impl/src/mndSubscribe.c | 51 ++++++++++++++-------- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index f9943ed00c..c1494fd0d0 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -402,6 +402,9 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); if(pSub == NULL){ +#ifdef TMQ_DEBUG + ASSERT(0); +#endif continue; } taosWLockLatch(&pSub->lock); @@ -499,7 +502,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); // txn guarantees pSub is created if(pSub == NULL) { +#ifdef TMQ_DEBUG ASSERT(0); +#endif continue; } taosRLockLatch(&pSub->lock); @@ -510,7 +515,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 2.1 fetch topic schema SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); if(pTopic == NULL) { +#ifdef TMQ_DEBUG ASSERT(0); +#endif taosRUnLockLatch(&pSub->lock); mndReleaseSubscribe(pMnode, pSub); continue; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index c756341164..a8cd62db09 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -771,6 +771,29 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { return 0; } +static int32_t sendDeleteSubToVnode(SMqSubscribeObj *pSub, STrans *pTrans){ + // iter all vnode to delete handle + int32_t sz = taosArrayGetSize(pSub->unassignedVgs); + for (int32_t i = 0; i < sz; i++) { + SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); + SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); + pReq->head.vgId = htonl(pVgEp->vgId); + pReq->vgId = pVgEp->vgId; + pReq->consumerId = -1; + memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); + STransAction action = {0}; + action.epSet = pVgEp->epSet; + action.pCont = pReq; + action.contLen = sizeof(SMqVDeleteReq); + action.msgType = TDMT_VND_TMQ_DELETE_SUB; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + } + return 0; +} + static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMDropCgroupReq dropReq = {0}; @@ -831,6 +854,11 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); + code = sendDeleteSubToVnode(pSub, pTrans); + if (code != 0) { + goto end; + } + if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); code = -1; @@ -1113,25 +1141,10 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) sdbCancelFetch(pSdb, pIter); return -1; } - int32_t sz = taosArrayGetSize(pSub->unassignedVgs); - for (int32_t i = 0; i < sz; i++) { - SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); - SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); - pReq->head.vgId = htonl(pVgEp->vgId); - pReq->vgId = pVgEp->vgId; - pReq->consumerId = -1; - memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); - STransAction action = {0}; - action.epSet = pVgEp->epSet; - action.pCont = pReq; - action.contLen = sizeof(SMqVDeleteReq); - action.msgType = TDMT_VND_TMQ_DELETE_SUB; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - sdbRelease(pSdb, pSub); - sdbCancelFetch(pSdb, pIter); - return -1; - } + if (sendDeleteSubToVnode(pSub, pTrans) != 0) { + sdbRelease(pSdb, pSub); + sdbCancelFetch(pSdb, pIter); + return -1; } if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {