From 4eba8df3b3559cec8840e77b751f2aa015349f62 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 25 Sep 2024 16:17:21 +0800 Subject: [PATCH] enh:[TD-32187]test tmq api --- source/client/src/clientTmq.c | 2 +- source/client/test/tmqTest.cpp | 7 +-- tests/system-test/7-tmq/td-32187.py | 80 +++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 4 deletions(-) create mode 100644 tests/system-test/7-tmq/td-32187.py diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 975d14f3ee..71864d8b9c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -41,7 +41,7 @@ #define SET_ERROR_MSG_TMQ(MSG) \ - if (errstr != NULL) (void)snprintf(errstr, errstrLen, MSG); + if (errstr != NULL && errstrLen > 0) (void)snprintf(errstr, errstrLen, MSG); #define PROCESS_POLL_RSP(FUNC,DATA) \ SDecoder decoder = {0}; \ diff --git a/source/client/test/tmqTest.cpp b/source/client/test/tmqTest.cpp index c7a75e352a..7505e81884 100644 --- a/source/client/test/tmqTest.cpp +++ b/source/client/test/tmqTest.cpp @@ -33,9 +33,10 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } -TEST(testCase, driverInit_Test) { - // taosInitGlobalCfg(); - // taos_init(); +TEST(testCase, tmq_api_Test) { + tmq_conf_t *conf = tmq_conf_new(); + char msg[128] = {0}; + tmq_consumer_new(NULL, msg, -1); } TEST(testCase, create_topic_ctb_Test) { diff --git a/tests/system-test/7-tmq/td-32187.py b/tests/system-test/7-tmq/td-32187.py new file mode 100644 index 0000000000..73c7a88f65 --- /dev/null +++ b/tests/system-test/7-tmq/td-32187.py @@ -0,0 +1,80 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from taos.tmq import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + clientCfgDict = {'debugFlag': 135} + updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict} + # updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict, 'tmqRowSize':1} + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + + def consume_test(self): + + tdSql.execute(f'create database if not exists d1') + tdSql.execute(f'use d1') + tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)') + tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)') + + + tdSql.execute(f'create topic topic_all as select * from st') + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + } + consumer = Consumer(consumer_dict) + + try: + consumer.unsubscribe() + consumer.unsubscribe() + consumer.subscribe(["topic_all"]) + consumer.subscribe(["topic_all"]) + except TmqError: + tdLog.exit(f"subscribe error") + + cnt = 0 + try: + while True: + res = consumer.poll(2) + if not res: + break + val = res.value() + if val is None: + print(f"null val") + continue + for block in val: + cnt += len(block.fetchall()) + + print(f"block {cnt} rows") + + finally: + consumer.unsubscribe(); + consumer.close() + def run(self): + self.consume_test() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())