From e1c4cca33dc1ecb9df30a73e5b0c7031661ff0bb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 20 Jul 2023 00:07:04 +0800 Subject: [PATCH] feat:add committed & position & commite_offset interface --- source/client/src/clientTmq.c | 2 +- source/client/test/clientTests.cpp | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index f2ea7309e4..3576df434b 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -3106,7 +3106,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } - code = checkWalRange(pOffsetInfo, -1); + code = checkWalRange(pOffsetInfo, offset); if (code != 0) { taosWUnLockLatch(&tmq->lock); return code; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 02443a696c..d88a26cbb2 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1123,6 +1123,9 @@ TEST(clientCase, tmq_commit) { for(int i = 0; i < numOfAssign; i++){ printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId); + printf("committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed); + int64_t position = tmq_position(tmq, topicName, pAssign[i].vgId); printf("position vgId:%d, position:%lld\n", pAssign[i].vgId, position); tmq_offset_seek(tmq, topicName, pAssign[i].vgId, 1); @@ -1317,6 +1320,7 @@ TEST(clientCase, td_25129) { printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); } + tmq_free_assignment(pAssign); tmq_consumer_close(tmq); taos_close(pConn); fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);