diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 53a534b83a..2a3ef8b129 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2709,16 +2709,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; - pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; - char offsetBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); +// pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; - tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf); +// char offsetBuf[TSDB_OFFSET_LEN] = {0}; +// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); + + tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset); pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerEnd = p->end; - pOffsetInfo->currentOffset.version = p->currentOffset; - pOffsetInfo->committedOffset.version = p->currentOffset; +// pOffsetInfo->currentOffset.version = p->currentOffset; +// pOffsetInfo->committedOffset.version = p->currentOffset; } } } @@ -2779,25 +2780,26 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; int32_t type = pOffsetInfo->currentOffset.type; - if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) { - tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); - taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_INVALID_PARA; - } - - if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) { - tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", - tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); - taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_INVALID_PARA; - } +// if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) { +// tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); +// taosWUnLockLatch(&tmq->lock); +// return TSDB_CODE_INVALID_PARA; +// } +// +// if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) { +// tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", +// tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); +// taosWUnLockLatch(&tmq->lock); +// return TSDB_CODE_INVALID_PARA; +// } // update the offset, and then commit to vnode - if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) { - pOffsetInfo->currentOffset.version = offset - 1; - pOffsetInfo->committedOffset.version = INT64_MIN; - pVg->seekUpdated = true; - } +// if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) { + pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; + pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0; + pOffsetInfo->committedOffset.version = INT64_MIN; + pVg->seekUpdated = true; +// } SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));