From 55eddbfb5eadf004edb519510f7271144baaa4df Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 6 May 2023 10:54:05 +0800 Subject: [PATCH] fix:[TD-23972] push subscribe msg to vnode even though consumer not change --- source/dnode/mnode/impl/src/mndSubscribe.c | 42 ++++++++------ source/dnode/vnode/src/tq/tq.c | 64 ++++++++++------------ 2 files changed, 53 insertions(+), 53 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 573c60549e..75bc595a2e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -269,6 +269,18 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { } } +static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){ + for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){ + SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i); + SMqRebOutputVg outputVg = { + .oldConsumerId = pConsumerEp->consumerId, + .newConsumerId = pConsumerEp->consumerId, + .pVgEp = pVgEp, + }; + taosArrayPush(pOutput->rebVgs, &outputVg); + } +} + static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt, int32_t imbConsumerNum) { const char *pSubKey = pOutput->pSub->key; @@ -290,24 +302,19 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId); if (consumerVgNum > minVgCnt) { if (imbCnt < imbConsumerNum) { - if (consumerVgNum == minVgCnt + 1) { - imbCnt++; - continue; - } else { - // pop until equal minVg + 1 - while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { - SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); - SMqRebOutputVg outputVg = { - .oldConsumerId = pConsumerEp->consumerId, - .newConsumerId = -1, - .pVgEp = pVgEp, - }; - taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId, - pConsumerEp->consumerId); - } - imbCnt++; + // pop until equal minVg + 1 + while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { + SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); + SMqRebOutputVg outputVg = { + .oldConsumerId = pConsumerEp->consumerId, + .newConsumerId = -1, + .pVgEp = pVgEp, + }; + taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); + mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId, + pConsumerEp->consumerId); } + imbCnt++; } else { // all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) { @@ -323,6 +330,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas } } } + putNoTransferToOutput(pOutput, pConsumerEp); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 792ff8677e..7004fe0be3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -445,6 +445,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t } int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { + int ret = 0; SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); @@ -463,8 +464,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (req.newConsumerId == -1) { tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId); - taosMemoryFree(req.qmsg); - return 0; + goto end; } STqHandle tqHandle = {0}; @@ -481,8 +481,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // TODO version should be assigned and refed during preprocess SWalRef* pRef = walRefCommittedVer(pVnode->pWal); if (pRef == NULL) { - taosMemoryFree(req.qmsg); - return -1; + ret = -1; + goto end; } int64_t ver = pRef->refVer; @@ -534,49 +534,41 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey, pHandle->consumerId, oldConsumerId); - if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { - taosMemoryFree(req.qmsg); - return -1; - } + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); + goto end; } else { if (pHandle->consumerId == req.newConsumerId) { // do nothing tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId); - atomic_store_32(&pHandle->epoch, -1); - atomic_add_fetch_32(&pHandle->epoch, 1); - taosMemoryFree(req.qmsg); - return tqMetaSaveHandle(pTq, req.subKey, pHandle); } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); - - // kill executing task - qTaskInfo_t pTaskInfo = pHandle->execHandle.task; - if (pTaskInfo != NULL) { - qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); - } - - taosWLockLatch(&pTq->lock); - atomic_store_32(&pHandle->epoch, 0); - - // remove if it has been register in the push manager, and return one empty block to consumer - tqUnregisterPushHandle(pTq, pHandle); - atomic_store_64(&pHandle->consumerId, req.newConsumerId); - - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - qStreamCloseTsdbReader(pTaskInfo); - } - - taosWUnLockLatch(&pTq->lock); - if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { - taosMemoryFree(req.qmsg); - return -1; - } } + // kill executing task + qTaskInfo_t pTaskInfo = pHandle->execHandle.task; + if (pTaskInfo != NULL) { + qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); + } + + taosWLockLatch(&pTq->lock); + atomic_add_fetch_32(&pHandle->epoch, 1); + + // remove if it has been register in the push manager, and return one empty block to consumer + tqUnregisterPushHandle(pTq, pHandle); + + + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + qStreamCloseTsdbReader(pTaskInfo); + } + + taosWUnLockLatch(&pTq->lock); + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); + goto end; } +end: taosMemoryFree(req.qmsg); - return 0; + return ret; } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {