From 3ad486e95bbff527c0a712c884b29a7c4918f9e2 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 9 Jul 2023 11:34:49 +0800 Subject: [PATCH 1/4] fix:seek failed if type is earliest --- source/client/src/clientTmq.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 5fd79a2711..d9f021211c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2709,17 +2709,16 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; -// pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; + pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; + char offsetBuf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); -// char offsetBuf[TSDB_OFFSET_LEN] = {0}; -// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); - - tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset); + tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf); pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerEnd = p->end; -// pOffsetInfo->currentOffset.version = p->currentOffset; -// pOffsetInfo->committedOffset.version = p->currentOffset; + pOffsetInfo->currentOffset.version = p->currentOffset; + pOffsetInfo->committedOffset.version = p->currentOffset; } } } From 0929be690dac0d29254bb3328fec7025f2531d2e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 9 Jul 2023 11:54:53 +0800 Subject: [PATCH 2/4] fix:seek to offset - 1 --- source/client/src/clientTmq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index d9f021211c..53a534b83a 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2794,7 +2794,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ // update the offset, and then commit to vnode if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) { - pOffsetInfo->currentOffset.version = offset; + pOffsetInfo->currentOffset.version = offset - 1; pOffsetInfo->committedOffset.version = INT64_MIN; pVg->seekUpdated = true; } From ee50ed4847dd7fa7c26b4eed16956eee9dd4bda3 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 9 Jul 2023 15:31:19 +0800 Subject: [PATCH 3/4] fix:seek failed in initilized statue --- source/client/src/clientTmq.c | 48 ++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 53a534b83a..2a3ef8b129 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2709,16 +2709,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; - pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; - char offsetBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); +// pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; - tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf); +// char offsetBuf[TSDB_OFFSET_LEN] = {0}; +// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); + + tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset); pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerEnd = p->end; - pOffsetInfo->currentOffset.version = p->currentOffset; - pOffsetInfo->committedOffset.version = p->currentOffset; +// pOffsetInfo->currentOffset.version = p->currentOffset; +// pOffsetInfo->committedOffset.version = p->currentOffset; } } } @@ -2779,25 +2780,26 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; int32_t type = pOffsetInfo->currentOffset.type; - if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) { - tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); - taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_INVALID_PARA; - } - - if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) { - tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", - tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); - taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_INVALID_PARA; - } +// if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) { +// tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); +// taosWUnLockLatch(&tmq->lock); +// return TSDB_CODE_INVALID_PARA; +// } +// +// if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) { +// tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", +// tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); +// taosWUnLockLatch(&tmq->lock); +// return TSDB_CODE_INVALID_PARA; +// } // update the offset, and then commit to vnode - if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) { - pOffsetInfo->currentOffset.version = offset - 1; - pOffsetInfo->committedOffset.version = INT64_MIN; - pVg->seekUpdated = true; - } +// if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) { + pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; + pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0; + pOffsetInfo->committedOffset.version = INT64_MIN; + pVg->seekUpdated = true; +// } SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic)); From 8904f3857b5ef44960072d9d3ec82f9f7f689b06 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 9 Jul 2023 19:16:19 +0800 Subject: [PATCH 4/4] fix:seek failed in initilized status --- source/client/src/clientTmq.c | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 2a3ef8b129..f514bb616c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2780,18 +2780,18 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; int32_t type = pOffsetInfo->currentOffset.type; -// if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) { -// tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); -// taosWUnLockLatch(&tmq->lock); -// return TSDB_CODE_INVALID_PARA; -// } -// -// if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) { -// tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", -// tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); -// taosWUnLockLatch(&tmq->lock); -// return TSDB_CODE_INVALID_PARA; -// } + if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) { + tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_INVALID_PARA; + } + + if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) { + tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", + tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_INVALID_PARA; + } // update the offset, and then commit to vnode // if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) {