From e11adb498bb324aca884dbf836e851649e8bccff Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 24 Nov 2024 21:26:56 +0800 Subject: [PATCH] fix:[TS-5679] auto commit error --- source/client/src/clientTmq.c | 2 +- tests/system-test/7-tmq/tmq_offset.py | 5 ++ utils/test/c/tmq_offset_test.c | 82 ++++++++++++++++++++++++++- 3 files changed, 85 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 4d6a6fbff7..849526e948 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1873,7 +1873,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { if (tmq->epTimer == NULL){ tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer); } - if (tmq->commitTimer == NULL){ + if (tmq->autoCommit && tmq->commitTimer == NULL){ tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer); } if (tmq->epTimer == NULL || tmq->commitTimer == NULL) { diff --git a/tests/system-test/7-tmq/tmq_offset.py b/tests/system-test/7-tmq/tmq_offset.py index 07d1a4bc04..7eabb50be2 100644 --- a/tests/system-test/7-tmq/tmq_offset.py +++ b/tests/system-test/7-tmq/tmq_offset.py @@ -45,6 +45,11 @@ class TDTestCase: tdLog.exit("tmq_offset_test error!") else: buildPath = tdCom.getBuildPath() + cmdStr0 = '%s/build/bin/tmq_offset_test 5679'%(buildPath) + tdLog.info(cmdStr0) + if os.system(cmdStr0) != 0: + tdLog.exit(cmdStr0) + cmdStr1 = '%s/build/bin/taosBenchmark -i 50 -B 1 -t 1000 -n 100000 -y &'%(buildPath) tdLog.info(cmdStr1) os.system(cmdStr1) diff --git a/utils/test/c/tmq_offset_test.c b/utils/test/c/tmq_offset_test.c index 6be9b38979..1060175f3e 100644 --- a/utils/test/c/tmq_offset_test.c +++ b/utils/test/c/tmq_offset_test.c @@ -80,6 +80,77 @@ int buildData(TAOS* pConn){ return 0; } +void test_ts5679(TAOS* pConn){ + TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t_5679"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists db_ts5679"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists db_ts5679 vgroups 1 wal_retention_period 3600"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create topic t_5679 as database db_ts5679"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "use db_ts5679"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn,"CREATE TABLE `t1` (`ts` TIMESTAMP, `voltage` INT)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into t1 values(now, 1)"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + + tmq_conf_t* conf = tmq_conf_new(); + + tmq_conf_set(conf, "enable.auto.commit", "false"); + tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); + tmq_conf_set(conf, "group.id", "group_id_2"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.with.table.name", "false"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "t_5679"); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + while(1){ + pRes = tmq_consumer_poll(tmq, 1000); + if (pRes == NULL){ + break; + } + taosSsleep(3); + } + tmq_topic_assignment* pAssign = NULL; + int32_t numOfAssign = 0; + int32_t code = tmq_get_topic_assignment(tmq, "t_5679", &pAssign, &numOfAssign); + ASSERT (code == 0); + + for(int i = 0; i < numOfAssign; i++){ + int64_t committed = tmq_committed(tmq, "t_5679", pAssign[i].vgId); + printf("committed offset:%"PRId64"\n", committed); + ASSERT(committed == TSDB_CODE_TMQ_NO_COMMITTED); + } + + taos_free_result(pRes); +} + void test_offset(TAOS* pConn){ if(buildData(pConn) != 0){ ASSERT(0); @@ -304,8 +375,13 @@ void test_ts3756(TAOS* pConn){ int main(int argc, char* argv[]) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - test_offset(pConn); - test_ts3756(pConn); + if (argc == 2) { + test_ts5679(pConn); + }else{ + test_offset(pConn); + test_ts3756(pConn); + } + taos_close(pConn); return 0; -} +} \ No newline at end of file