diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 63e8b3097c..a887db301c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -147,8 +147,9 @@ typedef struct { int32_t vgId; int32_t vgStatus; int32_t vgSkipCnt; // here used to mark the slow vgroups - bool receiveInfo; + bool receivedInfoFromVnode;// has already received info from vnode int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data + bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp. SEpSet epSet; } SMqClientVg; @@ -1704,6 +1705,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; + pVg->seekUpdated = false; // reset this flag. pTmq->pollCnt++; return TSDB_CODE_SUCCESS; @@ -1806,8 +1808,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->epSet = *pollRspWrapper->pEpset; } - // update the local offset value only for the returned values. - pVg->offsetInfo.currentOffset = pDataRsp->rspOffset; + // update the local offset value only for the returned values, only when the local offset is NOT updated + // by tmq_offset_seek function + if (!pVg->seekUpdated) { + pVg->offsetInfo.currentOffset = pDataRsp->rspOffset; + } // update the status atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); @@ -1815,7 +1820,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { // update the valid wal version range pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver; pVg->offsetInfo.walVerEnd = pDataRsp->head.walever; - pVg->receiveInfo = true; + pVg->receivedInfoFromVnode = true; char buf[80]; tFormatOffset(buf, 80, &pDataRsp->rspOffset); @@ -2433,7 +2438,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - if (!pClientVg->receiveInfo) { + if (!pClientVg->receivedInfoFromVnode) { needFetch = true; break; } @@ -2623,6 +2628,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) { pOffsetInfo->currentOffset.version = offset; pOffsetInfo->committedOffset.version = INT64_MIN; + pVg->seekUpdated = true; } SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a0da2c1cbe..fbcae1aa42 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -389,6 +389,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) return -1; } + walReaderSeekVer(pHandle->execHandle.pTqReader->pWalReader, vgOffset.offset.val.version); tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 885eb65160..98f08ec7e4 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -255,18 +255,22 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (metaRsp.metaRspLen > 0) { code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId); - tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64, - pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts); + tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 + ",ts:%" PRId64, + pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, + metaRsp.rspOffset.ts); taosMemoryFree(metaRsp.metaRsp); goto end; } tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 - ",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts); + ",ts:%" PRId64, + pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, + taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts); if (taosxRsp.blockNum > 0) { code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; - }else { + } else { *offset = taosxRsp.rspOffset; } }