From 02ac3eac5a634492360f5c15c6f4bae08f73471e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 1 Sep 2023 16:51:44 +0800 Subject: [PATCH] fix:logic error --- source/dnode/vnode/src/tq/tq.c | 3 ++- source/dnode/vnode/src/tq/tqUtil.c | 7 +------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5b848b51bd..a4b10ac858 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1742,6 +1742,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; + bool allStopped = false; SStreamTaskNodeUpdateMsg req = {0}; @@ -1787,7 +1788,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { pMeta->closedTask += 1; int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - bool allStopped = (pMeta->closedTask == numOfTasks); + allStopped = (pMeta->closedTask == numOfTasks); if (allStopped) { pMeta->closedTask = 0; } else { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 60d23663d0..79a87f86e4 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -225,12 +225,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, int totalRows = 0; while (1) { int32_t savedEpoch = atomic_load_32(&pHandle->epoch); - if (savedEpoch > pRequest->epoch) { - tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 - ", found new consumer epoch %d, discard req epoch %d", - pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); - break; - } + ASSERT(savedEpoch <= pRequest->epoch); if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);