From e3b5c61699d33f38619fc727f6bcf938b13e6b1c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 24 Nov 2024 21:05:29 +0800 Subject: [PATCH] fix:[TS-5679] auto commit error --- source/client/src/clientTmq.c | 2 +- utils/test/c/tmq_offset_test.c | 71 ++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 950e0f7f34..bd7b1463d1 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1873,7 +1873,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { if (tmq->epTimer == NULL){ tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer); } - if (tmq->commitTimer == NULL){ + if (tmq->autoCommit && tmq->commitTimer == NULL){ tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); } if (tmq->epTimer == NULL || tmq->commitTimer == NULL) { diff --git a/utils/test/c/tmq_offset_test.c b/utils/test/c/tmq_offset_test.c index 6be9b38979..43b5af19dc 100644 --- a/utils/test/c/tmq_offset_test.c +++ b/utils/test/c/tmq_offset_test.c @@ -80,6 +80,76 @@ int buildData(TAOS* pConn){ return 0; } +void test_ts5679(TAOS* pConn){ + TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t_5679"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists db_ts5679"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists db_ts5679 vgroups 1 wal_retention_period 3600"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create topic t_5679 as database db_ts5679"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "use db_ts5679"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn,"CREATE TABLE `t1` (`ts` TIMESTAMP, `voltage` INT)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into t1 values(now, 1)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + tmq_conf_t* conf = tmq_conf_new(); + + tmq_conf_set(conf, "enable.auto.commit", "false"); + tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); + tmq_conf_set(conf, "group.id", "group_id_2"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.with.table.name", "false"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "t_5679"); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + while(1){ + pRes = tmq_consumer_poll(tmq, 1000); + if (pRes == NULL){ + break; + } + taosSsleep(3); + } + tmq_topic_assignment* pAssign = NULL; + int32_t numOfAssign = 0; + int32_t code = tmq_get_topic_assignment(tmq, "t_5679", &pAssign, &numOfAssign); + ASSERT (code == 0); + + for(int i = 0; i < numOfAssign; i++){ + int64_t committed = tmq_committed(tmq, "t_5679", pAssign[i].vgId); + ASSERT(committed == TSDB_CODE_TMQ_NO_COMMITTED); + } + + taos_free_result(pRes); +} + void test_offset(TAOS* pConn){ if(buildData(pConn) != 0){ ASSERT(0); @@ -306,6 +376,7 @@ int main(int argc, char* argv[]) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); test_offset(pConn); test_ts3756(pConn); + test_ts5679(pConn); taos_close(pConn); return 0; }