From 94c3eb29d26d7296b8702e60cc16187c5f2ed574 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 29 Jun 2022 23:25:04 +0800 Subject: [PATCH] fix(tmq): rsp error offset --- source/client/src/tmq.c | 4 ++-- source/dnode/vnode/src/tq/tq.c | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index ec2ebdfae9..5493628e74 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -514,6 +514,8 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm pParamSet->userParam = userParam; tsem_init(&pParamSet->rspSem, 0, 0); + int32_t code = -1; + for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); if (strcmp(pTopic->topicName, topic) != 0) continue; @@ -530,8 +532,6 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm } } - int32_t code = -1; - HANDLE_RSP: if (pParamSet->totalRspNum == 0) { tsem_destroy(&pParamSet->rspSem); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aadacf1e72..a08515ed5c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -332,7 +332,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (tqFetchLog(pTq, pHandle, &fetchVer, &pHeadWithCkSum) < 0) { // TODO add push mgr - tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer - 1); + tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer); + ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version); if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } @@ -354,6 +355,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // TODO continue scan until meeting batch requirement if (dataRsp.blockNum > 0 /* threshold */) { tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer); + ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version); if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1;