diff --git a/docs/examples/python/insert_native.py b/docs/examples/python/insert_native.py index ad7a8b85c2..19dafa3f23 100644 --- a/docs/examples/python/insert_native.py +++ b/docs/examples/python/insert_native.py @@ -21,7 +21,7 @@ try: print(f"Successfully inserted {affectedRows} rows to power.meters.") except Exception as err: - print(f"Failed to insert data to power.meters, ErrMessage:{err}") + print(f"Failed to insert data to power.meters, sql: {sql}, ErrMessage: {err}.") finally: if conn: conn.close() diff --git a/docs/examples/python/insert_rest.py b/docs/examples/python/insert_rest.py index 41fd70857f..526c3a6a69 100644 --- a/docs/examples/python/insert_rest.py +++ b/docs/examples/python/insert_rest.py @@ -20,7 +20,7 @@ try: print(f"Successfully inserted {affectedRows} rows to power.meters.") except Exception as err: - print(f"Failed to insert data to power.meters, ErrMessage:{err}") + print(f"Failed to insert data to power.meters, sql:{sql}, ErrMessage:{err}.") finally: if conn: conn.close() diff --git a/docs/examples/python/insert_ws.py b/docs/examples/python/insert_ws.py index 9c03b4857a..886dda1c10 100644 --- a/docs/examples/python/insert_ws.py +++ b/docs/examples/python/insert_ws.py @@ -21,7 +21,7 @@ try: print(f"Successfully inserted {affectedRows} rows to power.meters.") except Exception as err: - print(f"Failed to insert data to power.meters, ErrMessage:{err}") + print(f"Failed to insert data to power.meters, sql: {sql}, ErrMessage: {err}.") finally: if conn: conn.close() diff --git a/docs/examples/python/tmq_native.py b/docs/examples/python/tmq_native.py index 22a9b805bc..d4ccfda138 100644 --- a/docs/examples/python/tmq_native.py +++ b/docs/examples/python/tmq_native.py @@ -1,17 +1,24 @@ +#!/usr/bin/python3 import taos +db = "power" +topic = "topic_meters" +user = "root" +password = "taosdata" +host = "localhost" +port = 6030 +groupId = "group1" +clientId = "1" +tdConnWsScheme = "ws" +autoOffsetReset = "latest" +autoCommitState = "true" +autoCommitIntv = "1000" + + def prepareMeta(): conn = None try: - conn = taos.connect( - host="localhost", - user="root", - password="taosdata", - port=6030, - ) - - db = "power" - topic = "topic_meters" + conn = taos.connect(host=host, user=user, password=password, port=port) conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}") # change database. same as execute "USE db" @@ -33,13 +40,13 @@ def prepareMeta(): power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco') VALUES (NOW + 1a, 10.30000, 219, 0.31000) (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000) - power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco') + power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco') VALUES (NOW + 1a, 10.30000, 218, 0.25000) """ affectedRows = conn.execute(sql) print(f"Inserted into {affectedRows} rows to power.meters successfully.") except Exception as err: - print(f"Prepare insert data error, ErrMessage:{err}") + print(f"Failed to prepareMeta, host: {host}:{port}, db: {db}, topic: {topic}, ErrMessage:{err}.") raise err finally: if conn: @@ -49,28 +56,24 @@ def prepareMeta(): from taos.tmq import Consumer def create_consumer(): - host = "localhost" - port = "6030" - groupId = "group1" - clientId = "1" try: consumer = Consumer( { "group.id": groupId, "client.id": clientId, - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "enable.auto.commit": "true", - "auto.commit.interval.ms": "1000", - "auto.offset.reset": "latest", + "td.connect.user": user, + "td.connect.pass": password, + "enable.auto.commit": autoCommitState, + "auto.commit.interval.ms": autoCommitIntv, + "auto.offset.reset": autoOffsetReset, "td.connect.ip": host, - "td.connect.port": port, + "td.connect.port": str(port), } ) - print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}"); + print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}") return consumer except Exception as err: - print(f"Failed to create native consumer, host: {host}:{port}, ErrMessage:{err}"); + print(f"Failed to create native consumer, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.") raise err # ANCHOR_END: create_consumer @@ -96,7 +99,7 @@ def subscribe(consumer): print(f"data: {data}") except Exception as err: - print(f"Failed to poll data, ErrMessage:{err}") + print(f"Failed to poll data, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.") raise err @@ -123,7 +126,7 @@ def commit_offset(consumer): print("Commit offset manually successfully."); except Exception as err: - print(f"Failed to execute commit example, ErrMessage:{err}") + print(f"Failed to commit offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.") raise err # ANCHOR_END: commit_offset @@ -136,9 +139,9 @@ def seek_offset(consumer): for partition in assignments: partition.offset = 0 consumer.seek(partition) - print(f"Assignment seek to beginning successfully"); + print(f"Assignment seek to beginning successfully.") except Exception as err: - print(f"Failed to execute seek example, ErrMessage:{err}") + print(f"Failed to seek offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.") raise err # ANCHOR_END: assignment @@ -148,7 +151,7 @@ def unsubscribe(consumer): consumer.unsubscribe() print("Consumer unsubscribed successfully."); except Exception as err: - print(f"Failed to unsubscribe consumer. ErrMessage:{err}") + print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.") finally: if consumer: consumer.close() @@ -163,10 +166,7 @@ if __name__ == "__main__": subscribe(consumer) seek_offset(consumer) commit_offset(consumer) - consumer.unsubscribe() - print("Consumer unsubscribed successfully.") except Exception as err: - print(f"Failed to execute consumer example, ErrMessage:{err}") + print(f"Failed to execute consumer example, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.") finally: - consumer.unsubscribe() - + unsubscribe(consumer); diff --git a/docs/examples/python/tmq_websocket_example.py b/docs/examples/python/tmq_websocket_example.py index 15441fbb41..793eb3c416 100644 --- a/docs/examples/python/tmq_websocket_example.py +++ b/docs/examples/python/tmq_websocket_example.py @@ -1,18 +1,26 @@ #!/usr/bin/python3 import taosws -topic = "topic_meters" +db = "power" +topic = "topic_meters" +user = "root" +password = "taosdata" +host = "localhost" +port = 6041 +groupId = "group1" +clientId = "1" +tdConnWsScheme = "ws" +autoOffsetReset = "latest" +autoCommitState = "true" +autoCommitIntv = "1000" + def prepareMeta(): conn = None try: - conn = taosws.connect(user="root", - password="taosdata", - host="localhost", - port=6041) + conn = taosws.connect(user=user, password=password, host=host, port=port) - db = "power" # create database rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}") assert rowsAffected == 0 @@ -51,7 +59,7 @@ def prepareMeta(): print(f"Inserted into {affectedRows} rows to power.meters successfully.") except Exception as err: - print(f"Failed to prepareMeta ErrMessage:{err}") + print(f"Failed to prepareMeta, host: {host}:{port}, db: {db}, topic: {topic}, ErrMessage:{err}.") raise err finally: if conn: @@ -59,26 +67,22 @@ def prepareMeta(): # ANCHOR: create_consumer -def create_consumer(): - host = "localhost" - port = 6041 - groupId = "group1" - clientId = "1" +def create_consumer(): try: consumer = taosws.Consumer(conf={ - "td.connect.websocket.scheme": "ws", + "td.connect.websocket.scheme": tdConnWsScheme, "group.id": groupId, "client.id": clientId, - "auto.offset.reset": "latest", + "auto.offset.reset": autoOffsetReset, "td.connect.ip": host, "td.connect.port": port, - "enable.auto.commit": "true", - "auto.commit.interval.ms": "1000", + "enable.auto.commit": autoCommitState, + "auto.commit.interval.ms": autoCommitIntv, }) - print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}"); + print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}."); return consumer; except Exception as err: - print(f"Failed to create websocket consumer, host: {host}:{port}, ErrMessage:{err}"); + print(f"Failed to create websocket consumer, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}."); raise err @@ -95,10 +99,10 @@ def seek_offset(consumer): print( f"vg_id: {assign.vg_id()}, offset: {assign.offset()}, begin: {assign.begin()}, end: {assign.end()}") consumer.seek(topic, assign.vg_id(), assign.begin()) - print("Assignment seek to beginning successfully"); + print("Assignment seek to beginning successfully.") except Exception as err: - print(f"Failed to execute seek example, ErrMessage:{err}") + print(f"Failed to seek offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.") raise err # ANCHOR_END: assignment @@ -116,7 +120,7 @@ def subscribe(consumer): print(f"data: {row}") except Exception as err: - print(f"Failed to poll data, ErrMessage:{err}") + print(f"Failed to poll data, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.") raise err @@ -134,10 +138,10 @@ def commit_offset(consumer): # after processing the data, commit the offset manually consumer.commit(records) - print("Commit offset manually successfully."); + print("Commit offset manually successfully.") except Exception as err: - print(f"Failed to execute commit example, ErrMessage:{err}") + print(f"Failed to commit offset, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.") raise err @@ -150,7 +154,7 @@ def unsubscribe(consumer): consumer.unsubscribe() print("Consumer unsubscribed successfully."); except Exception as err: - print(f"Failed to unsubscribe consumer. ErrMessage:{err}") + print(f"Failed to unsubscribe consumer. topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.") finally: if consumer: consumer.close() @@ -167,6 +171,6 @@ if __name__ == "__main__": seek_offset(consumer) commit_offset(consumer) except Exception as err: - print(f"Failed to execute consumer example, ErrorMessage:{err}") + print(f"Failed to execute consumer example, topic: {topic}, groupId: {groupId}, clientId: {clientId}, ErrMessage:{err}.") finally: unsubscribe(consumer);