56 lines
1.9 KiB
Python
56 lines
1.9 KiB
Python
from taos.tmq import Consumer
|
|
import taos
|
|
|
|
|
|
def init_tmq_env(db, topic):
|
|
conn = taos.connect()
|
|
conn.execute("drop topic if exists {}".format(topic))
|
|
conn.execute("drop database if exists {}".format(db))
|
|
conn.execute("create database if not exists {} wal_retention_period 3600 keep 36500".format(db))
|
|
conn.select_db(db)
|
|
conn.execute(
|
|
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
|
|
conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
|
|
conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
|
|
conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
|
|
conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
|
|
conn.execute("insert into tb1 values (now, 1, 1.0, 'tmq test')")
|
|
conn.execute("insert into tb2 values (now, 2, 2.0, 'tmq test')")
|
|
conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')")
|
|
|
|
|
|
def cleanup(db, topic):
|
|
conn = taos.connect()
|
|
conn.execute("drop topic if exists {}".format(topic))
|
|
conn.execute("drop database if exists {}".format(db))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
init_tmq_env("tmq_test", "tmq_test_topic") # init env
|
|
consumer = Consumer(
|
|
{
|
|
"group.id": "tg2",
|
|
"td.connect.user": "root",
|
|
"td.connect.pass": "taosdata",
|
|
"enable.auto.commit": "true",
|
|
}
|
|
)
|
|
consumer.subscribe(["tmq_test_topic"])
|
|
|
|
try:
|
|
while True:
|
|
res = consumer.poll(1)
|
|
if not res:
|
|
break
|
|
err = res.error()
|
|
if err is not None:
|
|
raise err
|
|
val = res.value()
|
|
|
|
for block in val:
|
|
print(block.fetchall())
|
|
finally:
|
|
consumer.unsubscribe()
|
|
consumer.close()
|
|
cleanup("tmq_test", "tmq_test_topic")
|