diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index e62102fa77..2ae4ede25a 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -133,10 +133,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg) { -// if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { -// terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; -// return -1; -// } + if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { + terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; + return -1; + } void *buf; int32_t tlen; @@ -269,18 +269,6 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { } } -static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){ - for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){ - SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i); - SMqRebOutputVg outputVg = { - .oldConsumerId = pConsumerEp->consumerId, - .newConsumerId = pConsumerEp->consumerId, - .pVgEp = pVgEp, - }; - taosArrayPush(pOutput->rebVgs, &outputVg); - } -} - static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt, int32_t imbConsumerNum) { const char *pSubKey = pOutput->pSub->key; @@ -330,7 +318,6 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas } } } - putNoTransferToOutput(pOutput, pConsumerEp); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d8ebf72549..e074a2d8a6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -564,29 +564,27 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg goto end; } else { if (pHandle->consumerId == req.newConsumerId) { // do nothing - tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId); + tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId, req.newConsumerId); // atomic_add_fetch_32(&pHandle->epoch, 1); } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); // atomic_store_32(&pHandle->epoch, 0); + // kill executing task + qTaskInfo_t pTaskInfo = pHandle->execHandle.task; + if (pTaskInfo != NULL) { + qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); + } + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + qStreamCloseTsdbReader(pTaskInfo); + } + // remove if it has been register in the push manager, and return one empty block to consumer + taosWLockLatch(&pTq->lock); + tqUnregisterPushHandle(pTq, pHandle); + taosWUnLockLatch(&pTq->lock); + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); } - // kill executing task - qTaskInfo_t pTaskInfo = pHandle->execHandle.task; - if (pTaskInfo != NULL) { - qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); - } - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - qStreamCloseTsdbReader(pTaskInfo); - } - // remove if it has been register in the push manager, and return one empty block to consumer - taosWLockLatch(&pTq->lock); - tqUnregisterPushHandle(pTq, pHandle); - taosWUnLockLatch(&pTq->lock); - - ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); - goto end; } end: