From c6e62d7fb893787f053dc074cc26fc7ee10022f4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 May 2023 17:40:17 +0800 Subject: [PATCH] fix(tmq): update the local offset when retrieving offset from vnode. --- source/client/src/clientTmq.c | 25 +++++++++++++++++++++++++ source/client/test/clientTests.cpp | 2 ++ 2 files changed, 27 insertions(+) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index acf878632d..b9d4e4edab 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2517,6 +2517,31 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a *numOfAssignment = num; } + for (int32_t j = 0; j < (*numOfAssignment); ++j) { + tmq_topic_assignment* p = &(*assignment)[j]; + + for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) { + SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); + if (pClientVg->vgId != p->vgId) { + continue; + } + + SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; + + pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; + + char offsetBuf[80] = {0}; + tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); + + tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf); + + pOffsetInfo->walVerBegin = p->begin; + pOffsetInfo->walVerEnd = p->end; + pOffsetInfo->currentOffset.version = p->currentOffset; + pOffsetInfo->committedOffset.version = p->currentOffset; + } + } + destroyCommonInfo(pCommon); return code; } else { diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index f15a93cb2c..f1d1ad0865 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1122,6 +1122,8 @@ TEST(clientCase, sub_tb_test) { return; } + tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgId, 0); + while (1) { TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); if (pRes != NULL) {