From 33d6df8717875f1f2001d8ea8dde8339beee279d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 19 May 2023 19:11:49 +0800 Subject: [PATCH] fix:set task status killed when vnode receive subscribe msg from mnode --- source/dnode/vnode/src/tq/tq.c | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b1e412d5ec..d9961202b1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -367,6 +367,20 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { return -1; } + // 3. update the epoch value + int32_t savedEpoch = pHandle->epoch; + if (savedEpoch <= reqEpoch) { + tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, + reqEpoch); + pHandle->epoch = reqEpoch; + }else { + tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, savedEpoch:%d > reqEpoch:%d ", + consumerId, TD_VID(pTq->pVnode), req.subKey, savedEpoch, reqEpoch); + terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; + taosWUnLockLatch(&pTq->lock); + return -1; + } + bool exec = tqIsHandleExec(pHandle); if(!exec) { tqSetHandleExec(pHandle); @@ -381,14 +395,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { taosMsleep(10); } - // 3. update the epoch value - int32_t savedEpoch = pHandle->epoch; - if (savedEpoch < reqEpoch) { - tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, - reqEpoch); - pHandle->epoch = reqEpoch; - } - char buf[80]; tFormatOffset(buf, 80, &reqOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, @@ -396,6 +402,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); tqSetHandleIdle(pHandle); + tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle); return code; } @@ -561,17 +568,17 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); goto end; } else { + taosWLockLatch(&pTq->lock); + if (pHandle->consumerId == req.newConsumerId) { // do nothing tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId, req.newConsumerId); -// atomic_add_fetch_32(&pHandle->epoch, 1); } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); -// atomic_store_32(&pHandle->epoch, 0); } + atomic_add_fetch_32(&pHandle->epoch, 1); - taosWLockLatch(&pTq->lock); // kill executing task if(tqIsHandleExec(pHandle)) { qTaskInfo_t pTaskInfo = pHandle->execHandle.task;