From 55eddbfb5eadf004edb519510f7271144baaa4df Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 6 May 2023 10:54:05 +0800 Subject: [PATCH 1/3] fix:[TD-23972] push subscribe msg to vnode even though consumer not change --- source/dnode/mnode/impl/src/mndSubscribe.c | 42 ++++++++------ source/dnode/vnode/src/tq/tq.c | 64 ++++++++++------------ 2 files changed, 53 insertions(+), 53 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 573c60549e..75bc595a2e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -269,6 +269,18 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { } } +static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){ + for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){ + SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i); + SMqRebOutputVg outputVg = { + .oldConsumerId = pConsumerEp->consumerId, + .newConsumerId = pConsumerEp->consumerId, + .pVgEp = pVgEp, + }; + taosArrayPush(pOutput->rebVgs, &outputVg); + } +} + static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt, int32_t imbConsumerNum) { const char *pSubKey = pOutput->pSub->key; @@ -290,24 +302,19 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId); if (consumerVgNum > minVgCnt) { if (imbCnt < imbConsumerNum) { - if (consumerVgNum == minVgCnt + 1) { - imbCnt++; - continue; - } else { - // pop until equal minVg + 1 - while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { - SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); - SMqRebOutputVg outputVg = { - .oldConsumerId = pConsumerEp->consumerId, - .newConsumerId = -1, - .pVgEp = pVgEp, - }; - taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId, - pConsumerEp->consumerId); - } - imbCnt++; + // pop until equal minVg + 1 + while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { + SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); + SMqRebOutputVg outputVg = { + .oldConsumerId = pConsumerEp->consumerId, + .newConsumerId = -1, + .pVgEp = pVgEp, + }; + taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); + mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId, + pConsumerEp->consumerId); } + imbCnt++; } else { // all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) { @@ -323,6 +330,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas } } } + putNoTransferToOutput(pOutput, pConsumerEp); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 792ff8677e..7004fe0be3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -445,6 +445,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t } int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { + int ret = 0; SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); @@ -463,8 +464,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (req.newConsumerId == -1) { tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId); - taosMemoryFree(req.qmsg); - return 0; + goto end; } STqHandle tqHandle = {0}; @@ -481,8 +481,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // TODO version should be assigned and refed during preprocess SWalRef* pRef = walRefCommittedVer(pVnode->pWal); if (pRef == NULL) { - taosMemoryFree(req.qmsg); - return -1; + ret = -1; + goto end; } int64_t ver = pRef->refVer; @@ -534,49 +534,41 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey, pHandle->consumerId, oldConsumerId); - if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { - taosMemoryFree(req.qmsg); - return -1; - } + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); + goto end; } else { if (pHandle->consumerId == req.newConsumerId) { // do nothing tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId); - atomic_store_32(&pHandle->epoch, -1); - atomic_add_fetch_32(&pHandle->epoch, 1); - taosMemoryFree(req.qmsg); - return tqMetaSaveHandle(pTq, req.subKey, pHandle); } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); - - // kill executing task - qTaskInfo_t pTaskInfo = pHandle->execHandle.task; - if (pTaskInfo != NULL) { - qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); - } - - taosWLockLatch(&pTq->lock); - atomic_store_32(&pHandle->epoch, 0); - - // remove if it has been register in the push manager, and return one empty block to consumer - tqUnregisterPushHandle(pTq, pHandle); - atomic_store_64(&pHandle->consumerId, req.newConsumerId); - - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - qStreamCloseTsdbReader(pTaskInfo); - } - - taosWUnLockLatch(&pTq->lock); - if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { - taosMemoryFree(req.qmsg); - return -1; - } } + // kill executing task + qTaskInfo_t pTaskInfo = pHandle->execHandle.task; + if (pTaskInfo != NULL) { + qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); + } + + taosWLockLatch(&pTq->lock); + atomic_add_fetch_32(&pHandle->epoch, 1); + + // remove if it has been register in the push manager, and return one empty block to consumer + tqUnregisterPushHandle(pTq, pHandle); + + + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + qStreamCloseTsdbReader(pTaskInfo); + } + + taosWUnLockLatch(&pTq->lock); + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); + goto end; } +end: taosMemoryFree(req.qmsg); - return 0; + return ret; } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { From 446097434e5a288929fe8264ac9a56c6294fc06c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 6 May 2023 11:06:41 +0800 Subject: [PATCH 2/3] fix:[TD-23972] push subscribe msg to vnode even though consumer not change --- source/dnode/vnode/src/tq/tqUtil.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index d186c63871..57fd271416 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -165,12 +165,19 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, SRpcMsg* pMsg, STqOffsetVal* pOffset) { uint64_t consumerId = pRequest->consumerId; int32_t vgId = TD_VID(pTq->pVnode); + int code = 0; SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); + qTaskInfo_t task = pHandle->execHandle.task; + if(qTaskIsExecuting(task)){ + code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); + tDeleteSMqDataRsp(&dataRsp); + return code; + } qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); - int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); + code = tqScanData(pTq, pHandle, &dataRsp, pOffset); if(code != 0) { goto end; } From 279fe0803f720e2d3ff09ae9a697ff01faebf4e9 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 6 May 2023 18:35:24 +0800 Subject: [PATCH 3/3] fix:[TD-23972] push subscribe msg to vnode even though consumer not change --- source/dnode/mnode/impl/src/mndSubscribe.c | 8 ++++---- source/dnode/vnode/src/tq/tq.c | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 75bc595a2e..b6ab7df68c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -133,10 +133,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg) { - if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { - terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; - return -1; - } +// if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { +// terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; +// return -1; +// } void *buf; int32_t tlen; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7004fe0be3..1661bb4c21 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -539,10 +539,13 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } else { if (pHandle->consumerId == req.newConsumerId) { // do nothing tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId); + atomic_add_fetch_32(&pHandle->epoch, 1); + } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); + atomic_store_32(&pHandle->epoch, 0); } // kill executing task qTaskInfo_t pTaskInfo = pHandle->execHandle.task; @@ -551,8 +554,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } taosWLockLatch(&pTq->lock); - atomic_add_fetch_32(&pHandle->epoch, 1); - // remove if it has been register in the push manager, and return one empty block to consumer tqUnregisterPushHandle(pTq, pHandle);