From f7ab8dabf086ef3740040635059aa36627bc1525 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 19 May 2023 15:14:39 +0800 Subject: [PATCH] fix:[TD-23972] push subscribe msg to vnode even though consumer not change --- source/dnode/mnode/impl/src/mndSubscribe.c | 21 ++++++++++++---- source/dnode/vnode/src/tq/tq.c | 28 ++++++++++++---------- 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 2ae4ede25a..e62102fa77 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -133,10 +133,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg) { - if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { - terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; - return -1; - } +// if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { +// terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; +// return -1; +// } void *buf; int32_t tlen; @@ -269,6 +269,18 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { } } +static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){ + for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){ + SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i); + SMqRebOutputVg outputVg = { + .oldConsumerId = pConsumerEp->consumerId, + .newConsumerId = pConsumerEp->consumerId, + .pVgEp = pVgEp, + }; + taosArrayPush(pOutput->rebVgs, &outputVg); + } +} + static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt, int32_t imbConsumerNum) { const char *pSubKey = pOutput->pSub->key; @@ -318,6 +330,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas } } } + putNoTransferToOutput(pOutput, pConsumerEp); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 41b43026ea..6f619e9bbe 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -569,20 +569,22 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); // atomic_store_32(&pHandle->epoch, 0); - // kill executing task - qTaskInfo_t pTaskInfo = pHandle->execHandle.task; - if (pTaskInfo != NULL) { - qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); - } - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - qStreamCloseTsdbReader(pTaskInfo); - } - // remove if it has been register in the push manager, and return one empty block to consumer - taosWLockLatch(&pTq->lock); - tqUnregisterPushHandle(pTq, pHandle); - taosWUnLockLatch(&pTq->lock); - ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); } + + // kill executing task + qTaskInfo_t pTaskInfo = pHandle->execHandle.task; + if (pTaskInfo != NULL) { + qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); + } + + taosWLockLatch(&pTq->lock); + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + qStreamCloseTsdbReader(pTaskInfo); + } + // remove if it has been register in the push manager, and return one empty block to consumer + tqUnregisterPushHandle(pTq, pHandle); + taosWUnLockLatch(&pTq->lock); + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); } end: