python language sample program modification
This commit is contained in:
parent
ad0e6ef85e
commit
a9acc43550
|
@ -21,7 +21,7 @@ try:
|
||||||
print(f"Successfully inserted {affectedRows} rows to power.meters.")
|
print(f"Successfully inserted {affectedRows} rows to power.meters.")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to insert data to power.meters, ErrMessage:{err}")
|
print(f"Failed to insert data to power.meters, sql: {sql}, ErrMessage: {err}.")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -20,7 +20,7 @@ try:
|
||||||
print(f"Successfully inserted {affectedRows} rows to power.meters.")
|
print(f"Successfully inserted {affectedRows} rows to power.meters.")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to insert data to power.meters, ErrMessage:{err}")
|
print(f"Failed to insert data to power.meters, sql:{sql}, ErrMessage:{err}.")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -21,7 +21,7 @@ try:
|
||||||
print(f"Successfully inserted {affectedRows} rows to power.meters.")
|
print(f"Successfully inserted {affectedRows} rows to power.meters.")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to insert data to power.meters, ErrMessage:{err}")
|
print(f"Failed to insert data to power.meters, sql: {sql}, ErrMessage: {err}.")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -1,17 +1,24 @@
|
||||||
|
#!/usr/bin/python3
|
||||||
import taos
|
import taos
|
||||||
|
|
||||||
|
db = "power"
|
||||||
|
topic = "topic_meters"
|
||||||
|
user = "root"
|
||||||
|
password = "taosdata"
|
||||||
|
host = "localhost"
|
||||||
|
port = 6030
|
||||||
|
groupId = "group1"
|
||||||
|
clientId = "1"
|
||||||
|
tdConnWsScheme = "ws"
|
||||||
|
autoOffsetReset = "latest"
|
||||||
|
autoCommitState = "true"
|
||||||
|
autoCommitIntv = "1000"
|
||||||
|
|
||||||
|
|
||||||
def prepareMeta():
|
def prepareMeta():
|
||||||
conn = None
|
conn = None
|
||||||
try:
|
try:
|
||||||
conn = taos.connect(
|
conn = taos.connect(host=host, user=user, password=password, port=port)
|
||||||
host="localhost",
|
|
||||||
user="root",
|
|
||||||
password="taosdata",
|
|
||||||
port=6030,
|
|
||||||
)
|
|
||||||
|
|
||||||
db = "power"
|
|
||||||
topic = "topic_meters"
|
|
||||||
conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
|
conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
|
||||||
|
|
||||||
# change database. same as execute "USE db"
|
# change database. same as execute "USE db"
|
||||||
|
@ -39,7 +46,7 @@ def prepareMeta():
|
||||||
affectedRows = conn.execute(sql)
|
affectedRows = conn.execute(sql)
|
||||||
print(f"Inserted into {affectedRows} rows to power.meters successfully.")
|
print(f"Inserted into {affectedRows} rows to power.meters successfully.")
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Prepare insert data error, ErrMessage:{err}")
|
print(f"Failed to prepareMeta, host: {host}:{port}, db: {db}, topic: {topic}, ErrMessage:{err}.")
|
||||||
raise err
|
raise err
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
|
@ -49,28 +56,24 @@ def prepareMeta():
|
||||||
from taos.tmq import Consumer
|
from taos.tmq import Consumer
|
||||||
|
|
||||||
def create_consumer():
|
def create_consumer():
|
||||||
host = "localhost"
|
|
||||||
port = "6030"
|
|
||||||
groupId = "group1"
|
|
||||||
clientId = "1"
|
|
||||||
try:
|
try:
|
||||||
consumer = Consumer(
|
consumer = Consumer(
|
||||||
{
|
{
|
||||||
"group.id": groupId,
|
"group.id": groupId,
|
||||||
"client.id": clientId,
|
"client.id": clientId,
|
||||||
"td.connect.user": "root",
|
"td.connect.user": user,
|
||||||
"td.connect.pass": "taosdata",
|
"td.connect.pass": password,
|
||||||
"enable.auto.commit": "true",
|
"enable.auto.commit": autoCommitState,
|
||||||
"auto.commit.interval.ms": "1000",
|
"auto.commit.interval.ms": autoCommitIntv,
|
||||||
"auto.offset.reset": "latest",
|
"auto.offset.reset": autoOffsetReset,
|
||||||
"td.connect.ip": host,
|
"td.connect.ip": host,
|
||||||
"td.connect.port": port,
|
"td.connect.port": str(port),
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}");
|
print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}")
|
||||||
return consumer
|
return consumer
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to create native consumer, host: {host}:{port}, ErrMessage:{err}");
|
print(f"Failed to create native consumer, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||||
raise err
|
raise err
|
||||||
# ANCHOR_END: create_consumer
|
# ANCHOR_END: create_consumer
|
||||||
|
|
||||||
|
@ -96,7 +99,7 @@ def subscribe(consumer):
|
||||||
print(f"data: {data}")
|
print(f"data: {data}")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to poll data, ErrMessage:{err}")
|
print(f"Failed to poll data, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||||
raise err
|
raise err
|
||||||
|
|
||||||
|
|
||||||
|
@ -123,7 +126,7 @@ def commit_offset(consumer):
|
||||||
print("Commit offset manually successfully.");
|
print("Commit offset manually successfully.");
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to execute commit example, ErrMessage:{err}")
|
print(f"Failed to commit offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||||
raise err
|
raise err
|
||||||
# ANCHOR_END: commit_offset
|
# ANCHOR_END: commit_offset
|
||||||
|
|
||||||
|
@ -136,9 +139,9 @@ def seek_offset(consumer):
|
||||||
for partition in assignments:
|
for partition in assignments:
|
||||||
partition.offset = 0
|
partition.offset = 0
|
||||||
consumer.seek(partition)
|
consumer.seek(partition)
|
||||||
print(f"Assignment seek to beginning successfully");
|
print(f"Assignment seek to beginning successfully.")
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to execute seek example, ErrMessage:{err}")
|
print(f"Failed to seek offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||||
raise err
|
raise err
|
||||||
# ANCHOR_END: assignment
|
# ANCHOR_END: assignment
|
||||||
|
|
||||||
|
@ -148,7 +151,7 @@ def unsubscribe(consumer):
|
||||||
consumer.unsubscribe()
|
consumer.unsubscribe()
|
||||||
print("Consumer unsubscribed successfully.");
|
print("Consumer unsubscribed successfully.");
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to unsubscribe consumer. ErrMessage:{err}")
|
print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||||
finally:
|
finally:
|
||||||
if consumer:
|
if consumer:
|
||||||
consumer.close()
|
consumer.close()
|
||||||
|
@ -163,10 +166,7 @@ if __name__ == "__main__":
|
||||||
subscribe(consumer)
|
subscribe(consumer)
|
||||||
seek_offset(consumer)
|
seek_offset(consumer)
|
||||||
commit_offset(consumer)
|
commit_offset(consumer)
|
||||||
consumer.unsubscribe()
|
|
||||||
print("Consumer unsubscribed successfully.")
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to execute consumer example, ErrMessage:{err}")
|
print(f"Failed to execute consumer example, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||||
finally:
|
finally:
|
||||||
consumer.unsubscribe()
|
unsubscribe(consumer);
|
||||||
|
|
||||||
|
|
|
@ -1,18 +1,26 @@
|
||||||
#!/usr/bin/python3
|
#!/usr/bin/python3
|
||||||
import taosws
|
import taosws
|
||||||
|
|
||||||
|
db = "power"
|
||||||
topic = "topic_meters"
|
topic = "topic_meters"
|
||||||
|
user = "root"
|
||||||
|
password = "taosdata"
|
||||||
|
host = "localhost"
|
||||||
|
port = 6041
|
||||||
|
groupId = "group1"
|
||||||
|
clientId = "1"
|
||||||
|
tdConnWsScheme = "ws"
|
||||||
|
autoOffsetReset = "latest"
|
||||||
|
autoCommitState = "true"
|
||||||
|
autoCommitIntv = "1000"
|
||||||
|
|
||||||
|
|
||||||
def prepareMeta():
|
def prepareMeta():
|
||||||
conn = None
|
conn = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
conn = taosws.connect(user="root",
|
conn = taosws.connect(user=user, password=password, host=host, port=port)
|
||||||
password="taosdata",
|
|
||||||
host="localhost",
|
|
||||||
port=6041)
|
|
||||||
|
|
||||||
db = "power"
|
|
||||||
# create database
|
# create database
|
||||||
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
|
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
|
||||||
assert rowsAffected == 0
|
assert rowsAffected == 0
|
||||||
|
@ -51,7 +59,7 @@ def prepareMeta():
|
||||||
print(f"Inserted into {affectedRows} rows to power.meters successfully.")
|
print(f"Inserted into {affectedRows} rows to power.meters successfully.")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to prepareMeta ErrMessage:{err}")
|
print(f"Failed to prepareMeta, host: {host}:{port}, db: {db}, topic: {topic}, ErrMessage:{err}.")
|
||||||
raise err
|
raise err
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
|
@ -60,25 +68,21 @@ def prepareMeta():
|
||||||
|
|
||||||
# ANCHOR: create_consumer
|
# ANCHOR: create_consumer
|
||||||
def create_consumer():
|
def create_consumer():
|
||||||
host = "localhost"
|
|
||||||
port = 6041
|
|
||||||
groupId = "group1"
|
|
||||||
clientId = "1"
|
|
||||||
try:
|
try:
|
||||||
consumer = taosws.Consumer(conf={
|
consumer = taosws.Consumer(conf={
|
||||||
"td.connect.websocket.scheme": "ws",
|
"td.connect.websocket.scheme": tdConnWsScheme,
|
||||||
"group.id": groupId,
|
"group.id": groupId,
|
||||||
"client.id": clientId,
|
"client.id": clientId,
|
||||||
"auto.offset.reset": "latest",
|
"auto.offset.reset": autoOffsetReset,
|
||||||
"td.connect.ip": host,
|
"td.connect.ip": host,
|
||||||
"td.connect.port": port,
|
"td.connect.port": port,
|
||||||
"enable.auto.commit": "true",
|
"enable.auto.commit": autoCommitState,
|
||||||
"auto.commit.interval.ms": "1000",
|
"auto.commit.interval.ms": autoCommitIntv,
|
||||||
})
|
})
|
||||||
print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}");
|
print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}.");
|
||||||
return consumer;
|
return consumer;
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to create websocket consumer, host: {host}:{port}, ErrMessage:{err}");
|
print(f"Failed to create websocket consumer, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.");
|
||||||
raise err
|
raise err
|
||||||
|
|
||||||
|
|
||||||
|
@ -95,10 +99,10 @@ def seek_offset(consumer):
|
||||||
print(
|
print(
|
||||||
f"vg_id: {assign.vg_id()}, offset: {assign.offset()}, begin: {assign.begin()}, end: {assign.end()}")
|
f"vg_id: {assign.vg_id()}, offset: {assign.offset()}, begin: {assign.begin()}, end: {assign.end()}")
|
||||||
consumer.seek(topic, assign.vg_id(), assign.begin())
|
consumer.seek(topic, assign.vg_id(), assign.begin())
|
||||||
print("Assignment seek to beginning successfully");
|
print("Assignment seek to beginning successfully.")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to execute seek example, ErrMessage:{err}")
|
print(f"Failed to seek offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||||
raise err
|
raise err
|
||||||
# ANCHOR_END: assignment
|
# ANCHOR_END: assignment
|
||||||
|
|
||||||
|
@ -116,7 +120,7 @@ def subscribe(consumer):
|
||||||
print(f"data: {row}")
|
print(f"data: {row}")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to poll data, ErrMessage:{err}")
|
print(f"Failed to poll data, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||||
raise err
|
raise err
|
||||||
|
|
||||||
|
|
||||||
|
@ -134,10 +138,10 @@ def commit_offset(consumer):
|
||||||
|
|
||||||
# after processing the data, commit the offset manually
|
# after processing the data, commit the offset manually
|
||||||
consumer.commit(records)
|
consumer.commit(records)
|
||||||
print("Commit offset manually successfully.");
|
print("Commit offset manually successfully.")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to execute commit example, ErrMessage:{err}")
|
print(f"Failed to commit offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||||
raise err
|
raise err
|
||||||
|
|
||||||
|
|
||||||
|
@ -150,7 +154,7 @@ def unsubscribe(consumer):
|
||||||
consumer.unsubscribe()
|
consumer.unsubscribe()
|
||||||
print("Consumer unsubscribed successfully.");
|
print("Consumer unsubscribed successfully.");
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to unsubscribe consumer. ErrMessage:{err}")
|
print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||||
finally:
|
finally:
|
||||||
if consumer:
|
if consumer:
|
||||||
consumer.close()
|
consumer.close()
|
||||||
|
@ -167,6 +171,6 @@ if __name__ == "__main__":
|
||||||
seek_offset(consumer)
|
seek_offset(consumer)
|
||||||
commit_offset(consumer)
|
commit_offset(consumer)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to execute consumer example, ErrorMessage:{err}")
|
print(f"Failed to execute consumer example, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.")
|
||||||
finally:
|
finally:
|
||||||
unsubscribe(consumer);
|
unsubscribe(consumer);
|
||||||
|
|
Loading…
Reference in New Issue