85 lines
1.8 KiB
Python
85 lines
1.8 KiB
Python
import taos
|
|
|
|
conn = taos.connect(
|
|
host="localhost",
|
|
user="root",
|
|
password="taosdata",
|
|
port=6030,
|
|
)
|
|
|
|
db = "power"
|
|
topic = "topic_meters"
|
|
|
|
conn.execute(f"DROP TOPIC IF EXISTS {topic}")
|
|
conn.execute(f"DROP DATABASE IF EXISTS {db}")
|
|
conn.execute(f"CREATE DATABASE {db}")
|
|
|
|
# change database. same as execute "USE db"
|
|
conn.select_db(db)
|
|
|
|
# create super table
|
|
conn.execute(
|
|
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
|
|
)
|
|
|
|
# ANCHOR: create_topic
|
|
# create topic
|
|
conn.execute(
|
|
f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters"
|
|
)
|
|
# ANCHOR_END: create_topic
|
|
|
|
# ANCHOR: create_consumer
|
|
from taos.tmq import Consumer
|
|
|
|
consumer = Consumer(
|
|
{
|
|
"group.id": "1",
|
|
"td.connect.user": "root",
|
|
"td.connect.pass": "taosdata",
|
|
"enable.auto.commit": "true",
|
|
}
|
|
)
|
|
# ANCHOR_END: create_consumer
|
|
|
|
# ANCHOR: subscribe
|
|
consumer.subscribe([topic])
|
|
# ANCHOR_END: subscribe
|
|
|
|
try:
|
|
# ANCHOR: consume
|
|
while True:
|
|
res = consumer.poll(1)
|
|
if not res:
|
|
break
|
|
err = res.error()
|
|
if err is not None:
|
|
raise err
|
|
val = res.value()
|
|
|
|
for block in val:
|
|
print(block.fetchall())
|
|
# ANCHOR_END: consume
|
|
|
|
# ANCHOR: assignment
|
|
assignments = consumer.assignment()
|
|
for assignment in assignments:
|
|
print(assignment)
|
|
# ANCHOR_END: assignment
|
|
|
|
# ANCHOR: seek
|
|
offset = taos.tmq.TopicPartition(
|
|
topic=topic,
|
|
partition=assignment.partition,
|
|
offset=0,
|
|
)
|
|
consumer.seek(offset)
|
|
# ANCHOR_END: seek
|
|
finally:
|
|
# ANCHOR: unsubscribe
|
|
consumer.unsubscribe()
|
|
consumer.close()
|
|
# ANCHOR_END: unsubscribe
|
|
|
|
conn.close()
|