enh:[TD-32187]test tmq api

This commit is contained in:
wangmm0220 2024-09-25 16:17:21 +08:00
parent c9c5c24d85
commit 4eba8df3b3
3 changed files with 85 additions and 4 deletions

View File

@ -41,7 +41,7 @@
#define SET_ERROR_MSG_TMQ(MSG) \ #define SET_ERROR_MSG_TMQ(MSG) \
if (errstr != NULL) (void)snprintf(errstr, errstrLen, MSG); if (errstr != NULL && errstrLen > 0) (void)snprintf(errstr, errstrLen, MSG);
#define PROCESS_POLL_RSP(FUNC,DATA) \ #define PROCESS_POLL_RSP(FUNC,DATA) \
SDecoder decoder = {0}; \ SDecoder decoder = {0}; \

View File

@ -33,9 +33,10 @@ int main(int argc, char** argv) {
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }
TEST(testCase, driverInit_Test) { TEST(testCase, tmq_api_Test) {
// taosInitGlobalCfg(); tmq_conf_t *conf = tmq_conf_new();
// taos_init(); char msg[128] = {0};
tmq_consumer_new(NULL, msg, -1);
} }
TEST(testCase, create_topic_ctb_Test) { TEST(testCase, create_topic_ctb_Test) {

View File

@ -0,0 +1,80 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from taos.tmq import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
clientCfgDict = {'debugFlag': 135}
updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict}
# updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict, 'tmqRowSize':1}
def init(self, conn, logSql, replicaVar=1):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def consume_test(self):
tdSql.execute(f'create database if not exists d1')
tdSql.execute(f'use d1')
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)')
tdSql.execute(f'create topic topic_all as select * from st')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_dict)
try:
consumer.unsubscribe()
consumer.unsubscribe()
consumer.subscribe(["topic_all"])
consumer.subscribe(["topic_all"])
except TmqError:
tdLog.exit(f"subscribe error")
cnt = 0
try:
while True:
res = consumer.poll(2)
if not res:
break
val = res.value()
if val is None:
print(f"null val")
continue
for block in val:
cnt += len(block.fetchall())
print(f"block {cnt} rows")
finally:
consumer.unsubscribe();
consumer.close()
def run(self):
self.consume_test()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())