add python example

This commit is contained in:
menshibin 2024-08-03 18:04:12 +08:00 committed by gccgdb1234
parent 83da226048
commit dc44fb7bd1
13 changed files with 260 additions and 151 deletions

View File

@ -8,7 +8,7 @@ def create_connection():
user="root", user="root",
password="taosdata", password="taosdata",
host="localhost", host="localhost",
port=6041, port=6030,
) )
except Exception as err: except Exception as err:
print(err) print(err)

View File

@ -8,6 +8,9 @@ def create_connection():
user="root", user="root",
password="taosdata", password="taosdata",
timeout=30) timeout=30)
print("Connection established")
except Exception as err: except Exception as err:
print(err) print(err)
finally: finally:

View File

@ -12,23 +12,27 @@ def create_connection():
) )
except Exception as err: except Exception as err:
print(err) print(err)
return conn return conn
# ANCHOR_END: connect
# ANCHOR_END: connect
def create_db_table(conn): def create_db_table(conn):
# ANCHOR: create_db # ANCHOR: create_db
try: try:
conn.execute("CREATE DATABASE IF NOT EXISTS power") conn.execute("CREATE DATABASE IF NOT EXISTS power")
conn.execute("USE power") conn.execute("USE power")
conn.execute("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") conn.execute(
"CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
conn.execute("CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupId, location) TAGS(0, 'Los Angles')") conn.execute("CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupId, location) TAGS(0, 'Los Angles')")
except Exception as err: except Exception as err:
print(f'Exception {err}') print(f'Exception {err}')
# ANCHOR_END: create_db # ANCHOR_END: create_db
def insert(conn): def insert(conn):
# ANCHOR: insert # ANCHOR: insert
sql = """ sql = """
INSERT INTO INSERT INTO
power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) power.d1001 USING power.meters TAGS('California.SanFrancisco', 2)
@ -39,22 +43,26 @@ def insert(conn):
""" """
try: try:
inserted = conn.execute(sql) inserted = conn.execute(sql)
assert inserted == 8 assert inserted == 4
except Exception as err: except Exception as err:
print(f'Exception111 {err}') print(f'Exception {err}')
# ANCHOR_END: insert # ANCHOR_END: insert
def query(conn): def query(conn):
# ANCHOR: query # ANCHOR: query
try: try:
result = conn.query("select * from meters") result = conn.query("select * from meters")
num_of_fields = result.field_count num_of_fields = result.field_count
print(num_of_fields) print(f"query field conunt is {num_of_fields}")
for row in result: for row in result:
print(row) print(row)
except Exception as err: except Exception as err:
print(f'Exception {err}') print(f'query Exception {err}')
# ANCHOR_END: query # ANCHOR_END: query
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,34 +1,76 @@
import taosws import taosws
conn = None db = "power"
try: def prepare():
conn = taosws.connect(user="root", conn = None
password="taosdata",
host="localhost",
port=6041)
db = "power" try:
# create database conn = taosws.connect(user="root",
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}") password="taosdata",
assert rowsAffected == 0 host="192.168.1.98",
port=6041)
# change database.
rowsAffected = conn.execute(f"USE {db}")
assert rowsAffected == 0
# create super table
rowsAffected = conn.execute(
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
)
assert rowsAffected == 0
# create table # create database
rowsAffected = conn.execute("CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupid, location) TAGS(0, 'Los Angles')") conn.execute(f"drop database if exists {db}")
assert rowsAffected == 0 conn.execute(f"create database {db}")
except Exception as err:
print(err)
finally:
if conn:
conn.close()
except Exception as err: def schemaless_insert():
print(err) conn = None
finally:
if conn: lineDemo = [
conn.close() "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
]
telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]
jsonDemo = [
'{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
]
try:
conn = taosws.connect(user="root",
password="taosdata",
host="192.168.1.98",
port=6041,
database=db)
conn.schemaless_insert(
lines=lineDemo,
protocol=taosws.PySchemalessProtocol.Line,
precision=taosws.PySchemalessPrecision.Millisecond,
ttl=1,
req_id=1,
)
conn.schemaless_insert(
lines=telnetDemo,
protocol=taosws.PySchemalessProtocol.Telnet,
precision=taosws.PySchemalessPrecision.Microsecond,
ttl=1,
req_id=2,
)
conn.schemaless_insert(
lines=jsonDemo,
protocol=taosws.PySchemalessProtocol.Json,
precision=taosws.PySchemalessPrecision.Millisecond,
ttl=1,
req_id=3,
)
except Exception as err:
print(err)
finally:
if conn:
conn.close()
if __name__ == '__main__':
prepare()
schemaless_insert()

View File

@ -6,7 +6,7 @@ try:
conn = taos.connect(user="root", conn = taos.connect(user="root",
password="taosdata", password="taosdata",
host="localhost", host="localhost",
port=6041) port=6030)
sql = """ sql = """
INSERT INTO INSERT INTO
@ -17,7 +17,7 @@ try:
VALUES (NOW + 1a, 10.30000, 218, 0.25000) VALUES (NOW + 1a, 10.30000, 218, 0.25000)
""" """
affectedRows = conn.execute(sql) affectedRows = conn.execute(sql)
print("inserted into {affectedRows} rows to power.meters successfully.") print(f"inserted into {affectedRows} rows to power.meters successfully.")
except Exception as err: except Exception as err:
print(err) print(err)

View File

@ -16,8 +16,8 @@ try:
power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco') power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
VALUES (NOW + 1a, 10.30000, 218, 0.25000) VALUES (NOW + 1a, 10.30000, 218, 0.25000)
""" """
inserted = conn.execute(sql) affectedRows = conn.execute(sql)
print("inserted into {affectedRows} rows to power.meters successfully.") print(f"inserted into {affectedRows} rows to power.meters successfully.")
except Exception as err: except Exception as err:
print(err) print(err)

View File

@ -15,8 +15,8 @@ try:
power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco') power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
VALUES (NOW + 1a, 10.30000, 218, 0.25000) VALUES (NOW + 1a, 10.30000, 218, 0.25000)
""" """
inserted = conn.execute(sql, 1) affectedRows = conn.execute(sql, 1)
print("inserted into {affectedRows} rows to power.meters successfully.") print(f"inserted into {affectedRows} rows to power.meters successfully.")
result = conn.query("SELECT ts, current, location FROM power.meters limit 100", 2) result = conn.query("SELECT ts, current, location FROM power.meters limit 100", 2)
print(result) print(result)

View File

@ -19,7 +19,7 @@ try:
VALUES (NOW + 1a, 10.30000, 218, 0.25000) VALUES (NOW + 1a, 10.30000, 218, 0.25000)
""" """
affectedRows = conn.execute_with_req_id(sql, req_id=1) affectedRows = conn.execute_with_req_id(sql, req_id=1)
print("inserted into {affectedRows} rows to power.meters successfully.") print(f"inserted into {affectedRows} rows to power.meters successfully.")
result = conn.query_with_req_id("SELECT ts, current, location FROM power.meters limit 100", req_id=2) result = conn.query_with_req_id("SELECT ts, current, location FROM power.meters limit 100", req_id=2)
num_of_fields = result.field_count num_of_fields = result.field_count

View File

@ -1,46 +1,81 @@
import taosws import taosws
conn = None db = "power"
lineDemo = [ def prepare():
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639" conn = None
]
telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"] try:
conn = taosws.connect(user="root",
password="taosdata",
host="taosdata",
port=6041)
jsonDemo = [
'{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
]
try: # create database
conn = taosws.connect(user="root", conn.execute(f"drop database if exists {db}")
password="taosdata", conn.execute(f"create database {db}")
host="localhost", except Exception as err:
port=6041) print(err)
raise err
finally:
if conn:
conn.close()
conn.execute("CREATE DATABASE IF NOT EXISTS power") def schemaless_insert():
conn = conn.execute("USE power") conn = None
conn.schemaless_insert( lineDemo = [
lines=lineDemo, "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
protocol=taosws.PySchemalessProtocol.Line, ]
precision=taosws.PySchemalessPrecision.Millisecond
)
conn.schemaless_insert( telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]
lines=telnetDemo,
protocol=taosws.PySchemalessProtocol.Telnet,
precision=taosws.PySchemalessPrecision.Microsecond
)
conn.schemaless_insert( jsonDemo = [
lines=jsonDemo, '{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
protocol=taosws.PySchemalessProtocol.Json, ]
precision=taosws.PySchemalessPrecision.Millisecond
)
except Exception as err:
print(err)
finally:
if conn:
conn.close()
try:
conn = taosws.connect(user="root",
password="taosdata",
host="192.168.1.98",
port=6041,
database=db)
conn.schemaless_insert(
lines=lineDemo,
protocol=taosws.PySchemalessProtocol.Line,
precision=taosws.PySchemalessPrecision.Millisecond,
ttl=1,
req_id=1,
)
conn.schemaless_insert(
lines=telnetDemo,
protocol=taosws.PySchemalessProtocol.Telnet,
precision=taosws.PySchemalessPrecision.Microsecond,
ttl=1,
req_id=2,
)
conn.schemaless_insert(
lines=jsonDemo,
protocol=taosws.PySchemalessProtocol.Json,
precision=taosws.PySchemalessPrecision.Millisecond,
ttl=1,
req_id=3,
)
except Exception as err:
print(err)
raise err
finally:
if conn:
conn.close()
if __name__ == '__main__':
try:
prepare()
schemaless_insert()
except Exception as err:
print(err)

View File

@ -42,7 +42,7 @@ try:
for j in range (numOfRow): for j in range (numOfRow):
timestamps.append(current + i) timestamps.append(current + i)
currents.append(random.random() * 30) currents.append(random.random() * 30)
voltages.append(random.random(100, 300)) voltages.append(random.randint(100, 300))
phases.append(random.random()) phases.append(random.random())
params = taos.new_bind_params(4) params = taos.new_bind_params(4)
@ -52,8 +52,7 @@ try:
params[3].float(phases) params[3].float(phases)
stmt.bind_param_batch(params) stmt.bind_param_batch(params)
stmt.execute() stmt.execute()
affected = stmt.affected_rows() print(f"table {tbname} insert ok.")
print(f"table {tbname} insert {affected} rows.")
except Exception as err: except Exception as err:
print(err) print(err)

View File

@ -38,10 +38,10 @@ try:
currents = [] currents = []
voltages = [] voltages = []
phases = [] phases = []
for j in range (numOfRow): for j in range(numOfRow):
timestamps.append(current + i) timestamps.append(current + i)
currents.append(random.random() * 30) currents.append(random.random() * 30)
voltages.append(random.random(100, 300)) voltages.append(random.randint(100, 300))
phases.append(random.random()) phases.append(random.random())
stmt.bind_param( stmt.bind_param(
@ -55,12 +55,12 @@ try:
stmt.add_batch() stmt.add_batch()
rows = stmt.execute() rows = stmt.execute()
print(f"insert {rows} rows.") print(f"table {tbname} insert ok.")
except Exception as err: except Exception as err:
print(err) print(err)
finally: finally:
if stmt: if stmt:
stmt.close() stmt.close()
if conn: if conn:
conn.close() conn.close()

View File

@ -1,8 +1,9 @@
import taos import taos
def prepareMeta(): def prepareMeta():
conn = None conn = None
try: try:
conn = taos.connect( conn = taos.connect(
host="localhost", host="localhost",
user="root", user="root",
@ -12,14 +13,14 @@ def prepareMeta():
db = "power" db = "power"
topic = "topic_meters" topic = "topic_meters"
conn.execute(f"CREATE DATABASE IF EXISTS {db}") conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
# change database. same as execute "USE db" # change database. same as execute "USE db"
conn.select_db(db) conn.select_db(db)
# create super table # create super table
conn.execute( conn.execute(
"CREATE STABLE IF EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)" "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
) )
# ANCHOR: create_topic # ANCHOR: create_topic
@ -37,15 +38,20 @@ def prepareMeta():
VALUES (NOW + 1a, 10.30000, 218, 0.25000) VALUES (NOW + 1a, 10.30000, 218, 0.25000)
""" """
affectedRows = conn.execute(sql) affectedRows = conn.execute(sql)
print("inserted into {affectedRows} rows to power.meters successfully.") print(f"inserted into {affectedRows} rows to power.meters successfully.")
except Exception as err: except Exception as err:
print("prepare meta err:{err}") print(f"prepare meta err:{err}")
finally raise err
finally:
if conn: if conn:
conn.close() conn.close()
# ANCHOR: create_consumer
# ANCHOR: create_consumer
from taos.tmq import Consumer from taos.tmq import Consumer
def create_consumer(): def create_consumer():
try: try:
consumer = Consumer( consumer = Consumer(
@ -55,16 +61,18 @@ def create_consumer():
"td.connect.user": "root", "td.connect.user": "root",
"td.connect.pass": "taosdata", "td.connect.pass": "taosdata",
"enable.auto.commit": "true", "enable.auto.commit": "true",
"auto.commit.interval.ms":"1000", "auto.commit.interval.ms": "1000",
"auto.offset.reset": "latest", "auto.offset.reset": "latest",
"td.connect.ip": "192.168.1.98", "td.connect.ip": "localhost",
"td.connect.port": "6041", "td.connect.port": "6030",
} }
) )
return consumer
except Exception as err: except Exception as err:
print("Failed to poll data, err:{err}") print(f"Failed to poll data, err:{err}")
raise err raise err
# ANCHOR_END: create_consumer # ANCHOR_END: create_consumer
# ANCHOR: subscribe # ANCHOR: subscribe
def subscribe(consumer): def subscribe(consumer):
@ -78,15 +86,17 @@ def subscribe(consumer):
if err is not None: if err is not None:
print(f"poll data error, {err}") print(f"poll data error, {err}")
raise err raise err
val = res.value() val = records.value()
if val: if val:
for block in val: for block in val:
print(block.fetchall()) print(block.fetchall())
except Exception as err: except Exception as err:
print("Failed to poll data, err:{err}") print(f"Failed to poll data, err:{err}")
raise err raise err
# ANCHOR_END: subscribe # ANCHOR_END: subscribe
def commit_offset(consumer): def commit_offset(consumer):
@ -101,45 +111,49 @@ def commit_offset(consumer):
if err is not None: if err is not None:
print(f"poll data error, {err}") print(f"poll data error, {err}")
raise err raise err
val = res.value() val = records.value()
if val: if val:
for block in val: for block in val:
print(block.fetchall()) print(block.fetchall())
consumer.commit(records) consumer.commit(records)
except Exception as err: except Exception as err:
print("Failed to poll data, err:{err}") print(f"Failed to poll data, err:{err}")
raise err raise err
# ANCHOR_END: commit_offset # ANCHOR_END: commit_offset
def seek_offset(consumer): def seek_offset(consumer):
# ANCHOR: assignment # ANCHOR: assignment
try: try:
assignments = consumer.assignment() assignments = consumer.assignment()
for partition in assignments: if assignments:
print("first data polled: {partition.offset}") for partition in assignments:
partition.offset = 0 print(f"first data polled: {partition.offset}")
consumer.seek(partition) partition.offset = 0
print("assignment seek to beginning successfully"); consumer.seek(partition)
print(f"assignment seek to beginning successfully");
except Exception as err: except Exception as err:
print("seek example failed; err:{err}") print(f"seek example failed; err:{err}")
raise err raise err
# ANCHOR_END: assignment # ANCHOR_END: assignment
# ANCHOR: unsubscribe # ANCHOR: unsubscribe
def unsubscribe(consumer): def unsubscribe(consumer):
try: try:
consumer.unsubscribe() consumer.unsubscribe()
except Exception as err: except Exception as err:
print("Failed to unsubscribe consumer. err:{err}") print(f"Failed to unsubscribe consumer. err:{err}")
# ANCHOR_END: unsubscribe # ANCHOR_END: unsubscribe
if __name__ == "__main__": if __name__ == "__main__":
consumer = None consumer = None
try: try:
prepareMeta() prepareMeta()
consumer = create_consumer() consumer = create_consumer()
subscribe(consumer) subscribe(consumer)
@ -147,7 +161,7 @@ if __name__ == "__main__":
commit_offset(consumer) commit_offset(consumer)
unsubscribe(consumer) unsubscribe(consumer)
except Exception as err: except Exception as err:
print("Failed to stmt consumer. err:{err}") print(f"Failed to stmt consumer. err:{err}")
finally: finally:
if consumer: if consumer:
consumer.close() consumer.close()

View File

@ -1,24 +1,23 @@
#!/usr/bin/python3 #!/usr/bin/python3
from taosws import taosws
def prepareMeta(): def prepareMeta():
conn = None conn = None
try: try:
conn = taosws.connect(user="root", conn = taosws.connect(user="root",
password="taosdata", password="taosdata",
host="localhost", host="localhost",
port=6041) port=6041)
db = "power" db = "power"
# create database # create database
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}") rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
assert rowsAffected == 0 assert rowsAffected == 0
# change database. # change database.
rowsAffected = conn.execute(f"USE {db}") rowsAffected = conn.execute(f"USE {db}")
assert rowsAffected == 0 assert rowsAffected == 0
# create super table # create super table
rowsAffected = conn.execute( rowsAffected = conn.execute(
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))" "CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
@ -26,7 +25,8 @@ def prepareMeta():
assert rowsAffected == 0 assert rowsAffected == 0
# create table # create table
rowsAffected = conn.execute("CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupid, location) TAGS(0, 'Los Angles')") rowsAffected = conn.execute(
"CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupid, location) TAGS(0, 'Los Angles')")
assert rowsAffected == 0 assert rowsAffected == 0
sql = """ sql = """
@ -37,15 +37,16 @@ def prepareMeta():
power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco') power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
VALUES (NOW + 1a, 10.30000, 218, 0.25000) VALUES (NOW + 1a, 10.30000, 218, 0.25000)
""" """
inserted = conn.execute(sql) affectedRows = conn.execute(sql)
print("inserted into {affectedRows} rows to power.meters successfully.") print(f"inserted into {affectedRows} rows to power.meters successfully.")
except Exception as err: except Exception as err:
print(err) print(f"Failed to prepareMeta {err}")
raise err
finally: finally:
if conn: if conn:
conn.close() conn.close()
# ANCHOR: create_consumer # ANCHOR: create_consumer
def create_consumer(): def create_consumer():
@ -55,15 +56,17 @@ def create_consumer():
"group.id": "group1", "group.id": "group1",
"client.id": "1", "client.id": "1",
"auto.offset.reset": "latest", "auto.offset.reset": "latest",
"td.connect.ip": "192.168.1.98", "td.connect.ip": "localhost",
"td.connect.port": "6041", "td.connect.port": "6041",
"enable.auto.commit": "true", "enable.auto.commit": "true",
"auto.commit.interval.ms":"1000", "auto.commit.interval.ms": "1000",
}) })
return consumer; return consumer;
except Exception as err: except Exception as err:
print(f"Failed to create websocket consumer, err:{err}"); print(f"Failed to create websocket consumer, err:{err}");
raise err raise err
# ANCHOR_END: create_consumer # ANCHOR_END: create_consumer
def seek_offset(consumer): def seek_offset(consumer):
@ -74,21 +77,23 @@ def seek_offset(consumer):
topic = assignment.topic() topic = assignment.topic()
print(f"topic: {topic}") print(f"topic: {topic}")
for assign in assignment.assignments(): for assign in assignment.assignments():
print(f"vg_id: {assign.vg_id()}, offset: {assign.offset()}, begin: {assign.begin()}, end: {assign.end()}") print(
f"vg_id: {assign.vg_id()}, offset: {assign.offset()}, begin: {assign.begin()}, end: {assign.end()}")
consumer.seek(topic, assign.vg_id(), assign.begin()) consumer.seek(topic, assign.vg_id(), assign.begin())
print("assignment seek to beginning successfully"); print("assignment seek to beginning successfully");
except Exception as err: except Exception as err:
print("seek example failed; err:{err}") print(f"seek example failed; err:{err}")
raise err raise err
# ANCHOR_END: assignment # ANCHOR_END: assignment
# ANCHOR: subscribe # ANCHOR: subscribe
def subscribe(consumer): def subscribe(consumer):
try: try:
consumer.subscribe(["topic_meters"]) consumer.subscribe(["topic_meters"])
print("subscribe topics successfully") print("subscribe topics successfully")
for i in range(50): for i in range(5):
records = consumer.poll(timeout=1.0) records = consumer.poll(timeout=1.0)
if records: if records:
for block in records: for block in records:
@ -96,16 +101,16 @@ def subscribe(consumer):
print(row) print(row)
except Exception as err: except Exception as err:
print("Failed to poll data, err:{err}") print(f"Failed to poll data, err:{err}")
raise err raise err
# ANCHOR_END: subscribe # ANCHOR_END: subscribe
# ANCHOR: commit_offset # ANCHOR: commit_offset
def commit_offset(consumer): def commit_offset(consumer):
try: try:
consumer.subscribe(["topic_meters"]) for i in range(5):
print("subscribe topics successfully")
for i in range(50):
records = consumer.poll(timeout=1.0) records = consumer.poll(timeout=1.0)
if records: if records:
for block in records: for block in records:
@ -114,22 +119,25 @@ def commit_offset(consumer):
consumer.commit(records) consumer.commit(records)
except Exception as err: except Exception as err:
print("Failed to poll data, err:{err}") print(f"Failed to poll data, err:{err}")
raise err raise err
# ANCHOR_END: commit_offset
#
# ANCHOR_END: commit_offset
#
# ANCHOR: unsubscribe # ANCHOR: unsubscribe
def unsubscribe(consumer): def unsubscribe(consumer):
try: try:
consumer.unsubscribe() consumer.unsubscribe()
except Exception as err: except Exception as err:
print("Failed to unsubscribe consumer. err:{err}") print("Failed to unsubscribe consumer. err:{err}")
# ANCHOR_END: unsubscribe # ANCHOR_END: unsubscribe
if __name__ == "__main__": if __name__ == "__main__":
consumer = None consumer = None
try: try:
prepareMeta() prepareMeta()
consumer = create_consumer() consumer = create_consumer()
subscribe(consumer) subscribe(consumer)
@ -137,7 +145,7 @@ if __name__ == "__main__":
commit_offset(consumer) commit_offset(consumer)
unsubscribe(consumer) unsubscribe(consumer)
except Exception as err: except Exception as err:
print("Failed to stmt consumer. err:{err}") print(f"Failed to stmt consumer. err:{err}")
finally: finally:
if consumer: if consumer:
consumer.close() consumer.close()