From b5a7c7fc1f207e51fb82d4a884b9b9c886b47286 Mon Sep 17 00:00:00 2001 From: menshibin Date: Sat, 3 Aug 2024 02:51:12 +0800 Subject: [PATCH] add python example --- .../connect_websocket_with_req_id_examples.py | 1 - docs/examples/python/insert_native.py | 2 +- docs/examples/python/reqid_native.py | 37 ++++ docs/examples/python/reqid_rest.py | 15 ++ docs/examples/python/reqid_ws.py | 35 +++ docs/examples/python/schemaless_native.py | 59 +++--- docs/examples/python/schemaless_ws.py | 70 +++--- docs/examples/python/stmt_native.py | 95 +++++---- docs/examples/python/stmt_ws.py | 98 +++++---- docs/examples/python/tmq_native.py | 199 ++++++++++++------ docs/examples/python/tmq_websocket_example.py | 160 +++++++++++--- docs/zh/08-develop/02-sql.md | 13 ++ docs/zh/08-develop/04-schemaless.md | 9 + docs/zh/08-develop/05-stmt.md | 9 +- docs/zh/08-develop/07-tmq.md | 24 +-- 15 files changed, 573 insertions(+), 253 deletions(-) create mode 100644 docs/examples/python/reqid_native.py create mode 100644 docs/examples/python/reqid_rest.py create mode 100644 docs/examples/python/reqid_ws.py diff --git a/docs/examples/python/connect_websocket_with_req_id_examples.py b/docs/examples/python/connect_websocket_with_req_id_examples.py index 3588b8e41f..639abd60ad 100644 --- a/docs/examples/python/connect_websocket_with_req_id_examples.py +++ b/docs/examples/python/connect_websocket_with_req_id_examples.py @@ -14,7 +14,6 @@ conn.execute("insert into tb1 values (now, 1)", req_id=6) conn.execute("insert into tb1 values (now, 2)", req_id=7) conn.execute("insert into tb1 values (now, 3)", req_id=8) -r = conn.execute("select * from stb", req_id=9) result = conn.query("select * from stb", req_id=10) num_of_fields = result.field_count print(num_of_fields) diff --git a/docs/examples/python/insert_native.py b/docs/examples/python/insert_native.py index faf4554437..ca59d8f395 100644 --- a/docs/examples/python/insert_native.py +++ b/docs/examples/python/insert_native.py @@ -16,7 +16,7 @@ try: power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco') VALUES (NOW + 1a, 10.30000, 218, 0.25000) """ - inserted = conn.execute(sql) + affectedRows = conn.execute(sql) print("inserted into {affectedRows} rows to power.meters successfully.") except Exception as err: diff --git a/docs/examples/python/reqid_native.py b/docs/examples/python/reqid_native.py new file mode 100644 index 0000000000..f6c27bceb3 --- /dev/null +++ b/docs/examples/python/reqid_native.py @@ -0,0 +1,37 @@ +import taos + +conn = None +try: + conn = taos.connect(host="localhost", + port=6030, + user="root", + password="taosdata") + + sql = """ + INSERT INTO + power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco') + VALUES (NOW + 1a, 10.30000, 219, 0.31000) + (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000) + power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco') + VALUES (NOW + 1a, 10.30000, 218, 0.25000) + """ + inserted = conn.execute(sql, 1) + print("inserted into {affectedRows} rows to power.meters successfully.") + + result = conn.query("SELECT ts, current, location FROM power.meters limit 100", 2) + print(result) + # Get fields from result + fields = result.fields + for field in fields: + print(field) + + # Get data from result as list of tuple + data = result.fetch_all() + for row in data: + print(row) + +except Exception as err: + print(err) +finally: + if conn: + conn.close() \ No newline at end of file diff --git a/docs/examples/python/reqid_rest.py b/docs/examples/python/reqid_rest.py new file mode 100644 index 0000000000..10bc08d12d --- /dev/null +++ b/docs/examples/python/reqid_rest.py @@ -0,0 +1,15 @@ +import taosrest + +client = None + +try: + client = taosrest.RestClient(url="http://localhost:6041", + user="root", + password="taosdata", + timeout=30) + + result = client.sql(f"SELECT ts, current, location FROM power.meters limit 100", 1) + print(result) + +except Exception as err: + print(err) diff --git a/docs/examples/python/reqid_ws.py b/docs/examples/python/reqid_ws.py new file mode 100644 index 0000000000..47f4737a81 --- /dev/null +++ b/docs/examples/python/reqid_ws.py @@ -0,0 +1,35 @@ +import taosws + +conn = None + +try: + conn = taosws.connect( + user="root", + password="taosdata", + host="localhost", + port=6041, + ) + + sql = """ + INSERT INTO + power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco') + VALUES (NOW + 1a, 10.30000, 219, 0.31000) + (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000) + power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco') + VALUES (NOW + 1a, 10.30000, 218, 0.25000) + """ + affectedRows = conn.execute_with_req_id(sql, req_id=1) + print("inserted into {affectedRows} rows to power.meters successfully.") + + result = conn.query_with_req_id("SELECT ts, current, location FROM power.meters limit 100", req_id=2) + num_of_fields = result.field_count + print(num_of_fields) + + for row in result: + print(row) + +except Exception as err: + print(err) +finally: + if conn: + conn.close() \ No newline at end of file diff --git a/docs/examples/python/schemaless_native.py b/docs/examples/python/schemaless_native.py index e166cf5af5..7faa2176af 100644 --- a/docs/examples/python/schemaless_native.py +++ b/docs/examples/python/schemaless_native.py @@ -1,36 +1,39 @@ import taos -conn = taos.connect( - host="localhost", - user="root", - password="taosdata", - port=6030, -) - -db = "power" - -conn.execute(f"DROP DATABASE IF EXISTS {db}") -conn.execute(f"CREATE DATABASE {db}") - -# change database. same as execute "USE db" -conn.select_db(db) - lineDemo = [ - "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000" + "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639" ] -telnetDemo = ["stb0_0 1707095283260 4 host=host0 interface=eth0"] + +telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"] + jsonDemo = [ - '{"metric": "meter_current","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}' + '{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}' ] -conn.schemaless_insert( - lineDemo, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS -) -conn.schemaless_insert( - telnetDemo, taos.SmlProtocol.TELNET_PROTOCOL, taos.SmlPrecision.MICRO_SECONDS -) -conn.schemaless_insert( - jsonDemo, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS -) +try: + conn = taos.connect( + host="localhost", + user="root", + password="taosdata", + port=6030 + ) + + conn.execute("CREATE DATABASE IF NOT EXISTS power") + # change database. same as execute "USE db" + conn.select_db("power") + + conn.schemaless_insert( + lineDemo, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS + ) + conn.schemaless_insert( + telnetDemo, taos.SmlProtocol.TELNET_PROTOCOL, taos.SmlPrecision.MICRO_SECONDS + ) + conn.schemaless_insert( + jsonDemo, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS + ) +except Exception as err: + print(err) +finally: + if conn: + conn.close() -conn.close() diff --git a/docs/examples/python/schemaless_ws.py b/docs/examples/python/schemaless_ws.py index 9e091f02c9..f6c1f1b0ac 100644 --- a/docs/examples/python/schemaless_ws.py +++ b/docs/examples/python/schemaless_ws.py @@ -1,46 +1,46 @@ import taosws -dsn = "taosws://root:taosdata@localhost:6041" -conn = taosws.connect(dsn) - -db = "power" - -conn.execute(f"DROP DATABASE IF EXISTS {db}") -conn.execute(f"CREATE DATABASE {db}") - -# change database. -conn = taosws.connect(f"{dsn}/{db}") +conn = None lineDemo = [ - "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000" + "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639" ] -telnetDemo = ["stb0_0 1707095283260 4 host=host0 interface=eth0"] + +telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"] + jsonDemo = [ - '{"metric": "meter_current","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}' + '{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}' ] -conn.schemaless_insert( - lines=lineDemo, - protocol=taosws.PySchemalessProtocol.Line, - precision=taosws.PySchemalessPrecision.Millisecond, - ttl=1, - req_id=1, -) +try: + conn = taosws.connect(user="root", + password="taosdata", + host="localhost", + port=6041) -conn.schemaless_insert( - lines=telnetDemo, - protocol=taosws.PySchemalessProtocol.Telnet, - precision=taosws.PySchemalessPrecision.Microsecond, - ttl=1, - req_id=2, -) + conn.execute("CREATE DATABASE IF NOT EXISTS power") + conn = conn.execute("USE power") -conn.schemaless_insert( - lines=jsonDemo, - protocol=taosws.PySchemalessProtocol.Json, - precision=taosws.PySchemalessPrecision.Millisecond, - ttl=1, - req_id=3, -) + conn.schemaless_insert( + lines=lineDemo, + protocol=taosws.PySchemalessProtocol.Line, + precision=taosws.PySchemalessPrecision.Millisecond + ) + + conn.schemaless_insert( + lines=telnetDemo, + protocol=taosws.PySchemalessProtocol.Telnet, + precision=taosws.PySchemalessPrecision.Microsecond + ) + + conn.schemaless_insert( + lines=jsonDemo, + protocol=taosws.PySchemalessProtocol.Json, + precision=taosws.PySchemalessPrecision.Millisecond + ) +except Exception as err: + print(err) +finally: + if conn: + conn.close() -conn.close() diff --git a/docs/examples/python/stmt_native.py b/docs/examples/python/stmt_native.py index 0af9d15fca..c51167a299 100644 --- a/docs/examples/python/stmt_native.py +++ b/docs/examples/python/stmt_native.py @@ -1,53 +1,64 @@ import taos +from datetime import datetime +import random -conn = taos.connect( - host="localhost", - user="root", - password="taosdata", - port=6030, -) +numOfSubTable = 10 +numOfRow = 10 -db = "power" +conn = None +stmt = None -conn.execute(f"DROP DATABASE IF EXISTS {db}") -conn.execute(f"CREATE DATABASE {db}") +try: + conn = taos.connect( + host="localhost", + user="root", + password="taosdata", + port=6030, + ) -# change database. same as execute "USE db" -conn.select_db(db) + conn.execute("CREATE DATABASE IF NOT EXISTS power") + conn.execute("USE power") + conn.execute( + "CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))" + ) -# create super table -conn.execute( - "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)" -) + # ANCHOR: stmt + sql = "INSERT INTO ? USING meters (groupid, location) TAGS(?,?) VALUES (?,?,?,?)" + stmt = conn.statement(sql) + for i in range(numOfSubTable): + tbname = f"d_bind_{i}" -# ANCHOR: stmt -sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)" -stmt = conn.statement(sql) + tags = taos.new_bind_params(2) + tags[0].int([i]) + tags[1].binary([f"location_{i}"]) + stmt.set_tbname_tags(tbname, tags) -tbname = "power.d1001" + current = int(datetime.now().timestamp() * 1000) + timestamps = [] + currents = [] + voltages = [] + phases = [] -tags = taos.new_bind_params(2) -tags[0].binary(["California.SanFrancisco"]) -tags[1].int([2]) + for j in range (numOfRow): + timestamps.append(current + i) + currents.append(random.random() * 30) + voltages.append(random.random(100, 300)) + phases.append(random.random()) -stmt.set_tbname_tags(tbname, tags) + params = taos.new_bind_params(4) + params[0].timestamp(timestamps) + params[1].float(currents) + params[2].int(voltages) + params[3].float(phases) + stmt.bind_param_batch(params) + stmt.execute() + affected = stmt.affected_rows() + print(f"table {tbname} insert {affected} rows.") -params = taos.new_bind_params(4) -params[0].timestamp((1626861392589, 1626861392591, 1626861392592)) -params[1].float((10.3, 12.6, 12.3)) -params[2].int([194, 200, 201]) -params[3].float([0.31, 0.33, 0.31]) - -stmt.bind_param_batch(params) - -stmt.execute() - -stmt.close() -# ANCHOR_END: stmt - -result = conn.query("SELECT * from meters") - -for row in result.fetch_all(): - print(row) - -conn.close() +except Exception as err: + print(err) +finally: + if stmt: + stmt.close() + if conn: + conn.close() diff --git a/docs/examples/python/stmt_ws.py b/docs/examples/python/stmt_ws.py index 3900f71bfb..360833208e 100644 --- a/docs/examples/python/stmt_ws.py +++ b/docs/examples/python/stmt_ws.py @@ -1,52 +1,66 @@ +from datetime import datetime +import random import taosws -dsn = "taosws://root:taosdata@localhost:6041" -conn = taosws.connect(dsn) +numOfSubTable = 10 -db = "power" +numOfRow = 10 -conn.execute(f"DROP DATABASE IF EXISTS {db}") -conn.execute(f"CREATE DATABASE {db}") +conn = None +stmt = None +try: + conn = taosws.connect(user="root", + password="taosdata", + host="localhost", + port=6041) -# change database. -conn.execute(f"USE {db}") + conn.execute("CREATE DATABASE IF NOT EXISTS power") + conn.execute("USE power") + conn.execute( + "CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))" + ) -# create super table -conn.execute( - "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)" -) + # ANCHOR: stmt + sql = "INSERT INTO ? USING meters (groupid, location) TAGS(?,?) VALUES (?,?,?,?)" + stmt = conn.statement() + stmt.prepare(sql) -# ANCHOR: stmt -sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)" -stmt = conn.statement() -stmt.prepare(sql) + for i in range(numOfSubTable): + tbname = f"d_bind_{i}" -tbname = "power.d1001" + tags = [ + taosws.int_to_tag(i), + taosws.varchar_to_tag(f"location_{i}"), + ] + stmt.set_tbname_tags(tbname, tags) + current = int(datetime.now().timestamp() * 1000) + timestamps = [] + currents = [] + voltages = [] + phases = [] + for j in range (numOfRow): + timestamps.append(current + i) + currents.append(random.random() * 30) + voltages.append(random.random(100, 300)) + phases.append(random.random()) -tags = [ - taosws.varchar_to_tag("California.SanFrancisco"), - taosws.int_to_tag(2), -] + stmt.bind_param( + [ + taosws.millis_timestamps_to_column(timestamps), + taosws.floats_to_column(currents), + taosws.ints_to_column(voltages), + taosws.floats_to_column(phases), + ] + ) -stmt.set_tbname_tags(tbname, tags) - -stmt.bind_param( - [ - taosws.millis_timestamps_to_column( - [1626861392589, 1626861392591, 1626861392592] - ), - taosws.floats_to_column([10.3, 12.6, 12.3]), - taosws.ints_to_column([194, 200, 201]), - taosws.floats_to_column([0.31, 0.33, 0.31]), - ] -) - -stmt.add_batch() -rows = stmt.execute() - -assert rows == 3 - -stmt.close() -# ANCHOR_END: stmt - -conn.close() + stmt.add_batch() + rows = stmt.execute() + print(f"insert {rows} rows.") + +except Exception as err: + print(err) +finally: + if stmt: + stmt.close() + if conn: + conn.close() diff --git a/docs/examples/python/tmq_native.py b/docs/examples/python/tmq_native.py index 7759c7b8e9..d43d8f4b8e 100644 --- a/docs/examples/python/tmq_native.py +++ b/docs/examples/python/tmq_native.py @@ -1,84 +1,153 @@ import taos -conn = taos.connect( - host="localhost", - user="root", - password="taosdata", - port=6030, -) +def prepareMeta(): + conn = None + try: + conn = taos.connect( + host="localhost", + user="root", + password="taosdata", + port=6030, + ) -db = "power" -topic = "topic_meters" + db = "power" + topic = "topic_meters" + conn.execute(f"CREATE DATABASE IF EXISTS {db}") -conn.execute(f"DROP TOPIC IF EXISTS {topic}") -conn.execute(f"DROP DATABASE IF EXISTS {db}") -conn.execute(f"CREATE DATABASE {db}") + # change database. same as execute "USE db" + conn.select_db(db) -# change database. same as execute "USE db" -conn.select_db(db) + # create super table + conn.execute( + "CREATE STABLE IF EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)" + ) -# create super table -conn.execute( - "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)" -) - -# ANCHOR: create_topic -# create topic -conn.execute( - f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters" -) -# ANCHOR_END: create_topic + # ANCHOR: create_topic + # create topic + conn.execute( + f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters" + ) + # ANCHOR_END: create_topic + sql = """ + INSERT INTO + power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco') + VALUES (NOW + 1a, 10.30000, 219, 0.31000) + (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000) + power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco') + VALUES (NOW + 1a, 10.30000, 218, 0.25000) + """ + affectedRows = conn.execute(sql) + print("inserted into {affectedRows} rows to power.meters successfully.") + except Exception as err: + print("prepare meta err:{err}") + finally + if conn: + conn.close() # ANCHOR: create_consumer from taos.tmq import Consumer - -consumer = Consumer( - { - "group.id": "1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "enable.auto.commit": "true", - } -) +def create_consumer(): + try: + consumer = Consumer( + { + "group.id": "group1", + "client.id": "1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "enable.auto.commit": "true", + "auto.commit.interval.ms":"1000", + "auto.offset.reset": "latest", + "td.connect.ip": "192.168.1.98", + "td.connect.port": "6041", + } + ) + except Exception as err: + print("Failed to poll data, err:{err}") + raise err # ANCHOR_END: create_consumer # ANCHOR: subscribe -consumer.subscribe([topic]) +def subscribe(consumer): + try: + consumer.subscribe(["topic_meters"]) + print("subscribe topics successfully") + for i in range(50): + records = consumer.poll(1) + if records: + err = records.error() + if err is not None: + print(f"poll data error, {err}") + raise err + + val = res.value() + if val: + for block in val: + print(block.fetchall()) + + except Exception as err: + print("Failed to poll data, err:{err}") + raise err # ANCHOR_END: subscribe -try: - # ANCHOR: consume - while True: - res = consumer.poll(1) - if not res: - break - err = res.error() - if err is not None: - raise err - val = res.value() +def commit_offset(consumer): + # ANCHOR: commit_offset + try: + consumer.subscribe(["topic_meters"]) + print("subscribe topics successfully") + for i in range(50): + records = consumer.poll(1) + if records: + err = records.error() + if err is not None: + print(f"poll data error, {err}") + raise err + + val = res.value() + if val: + for block in val: + print(block.fetchall()) - for block in val: - print(block.fetchall()) - # ANCHOR_END: consume + consumer.commit(records) + except Exception as err: + print("Failed to poll data, err:{err}") + raise err + # ANCHOR_END: commit_offset + +def seek_offset(consumer): # ANCHOR: assignment - assignments = consumer.assignment() - for assignment in assignments: - print(assignment) - # ANCHOR_END: assignment + try: + assignments = consumer.assignment() + for partition in assignments: + print("first data polled: {partition.offset}") + partition.offset = 0 + consumer.seek(partition) + print("assignment seek to beginning successfully"); + except Exception as err: + print("seek example failed; err:{err}") + raise err + # ANCHOR_END: assignment - # ANCHOR: seek - offset = taos.tmq.TopicPartition( - topic=topic, - partition=assignment.partition, - offset=0, - ) - consumer.seek(offset) - # ANCHOR_END: seek -finally: - # ANCHOR: unsubscribe - consumer.unsubscribe() - consumer.close() - # ANCHOR_END: unsubscribe +# ANCHOR: unsubscribe +def unsubscribe(consumer): + try: + consumer.unsubscribe() + except Exception as err: + print("Failed to unsubscribe consumer. err:{err}") + +# ANCHOR_END: unsubscribe -conn.close() +if __name__ == "__main__": + consumer = None + try: + prepareMeta() + consumer = create_consumer() + subscribe(consumer) + seek_offset(consumer) + commit_offset(consumer) + unsubscribe(consumer) + except Exception as err: + print("Failed to stmt consumer. err:{err}") + finally: + if consumer: + consumer.close() \ No newline at end of file diff --git a/docs/examples/python/tmq_websocket_example.py b/docs/examples/python/tmq_websocket_example.py index e1dcb0086a..43ca725b56 100644 --- a/docs/examples/python/tmq_websocket_example.py +++ b/docs/examples/python/tmq_websocket_example.py @@ -1,31 +1,143 @@ #!/usr/bin/python3 -from taosws import Consumer +from taosws -conf = { - "td.connect.websocket.scheme": "ws", - "group.id": "0", -} -consumer = Consumer(conf) +def prepareMeta(): + conn = None -consumer.subscribe(["test"]) + try: + conn = taosws.connect(user="root", + password="taosdata", + host="localhost", + port=6041) -while True: - message = consumer.poll(timeout=1.0) - if message: - id = message.vgroup() - topic = message.topic() - database = message.database() + db = "power" + # create database + rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}") + assert rowsAffected == 0 - for block in message: - nrows = block.nrows() - ncols = block.ncols() - for row in block: - print(row) - values = block.fetchall() - print(nrows, ncols) + # change database. + rowsAffected = conn.execute(f"USE {db}") + assert rowsAffected == 0 + + # create super table + rowsAffected = conn.execute( + "CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))" + ) + assert rowsAffected == 0 - # consumer.commit(message) - else: - break + # create table + rowsAffected = conn.execute("CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupid, location) TAGS(0, 'Los Angles')") + assert rowsAffected == 0 -consumer.close() + sql = """ + INSERT INTO + power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco') + VALUES (NOW + 1a, 10.30000, 219, 0.31000) + (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000) + power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco') + VALUES (NOW + 1a, 10.30000, 218, 0.25000) + """ + inserted = conn.execute(sql) + print("inserted into {affectedRows} rows to power.meters successfully.") + + except Exception as err: + print(err) + finally: + if conn: + conn.close() + + +# ANCHOR: create_consumer +def create_consumer(): + try: + consumer = taosws.Consumer(conf={ + "td.connect.websocket.scheme": "ws", + "group.id": "group1", + "client.id": "1", + "auto.offset.reset": "latest", + "td.connect.ip": "192.168.1.98", + "td.connect.port": "6041", + "enable.auto.commit": "true", + "auto.commit.interval.ms":"1000", + }) + return consumer; + except Exception as err: + print(f"Failed to create websocket consumer, err:{err}"); + raise err +# ANCHOR_END: create_consumer + +def seek_offset(consumer): + # ANCHOR: assignment + try: + assignments = consumer.assignment() + for assignment in assignments: + topic = assignment.topic() + print(f"topic: {topic}") + for assign in assignment.assignments(): + print(f"vg_id: {assign.vg_id()}, offset: {assign.offset()}, begin: {assign.begin()}, end: {assign.end()}") + consumer.seek(topic, assign.vg_id(), assign.begin()) + print("assignment seek to beginning successfully"); + + except Exception as err: + print("seek example failed; err:{err}") + raise err + # ANCHOR_END: assignment + +# ANCHOR: subscribe +def subscribe(consumer): + try: + consumer.subscribe(["topic_meters"]) + print("subscribe topics successfully") + for i in range(50): + records = consumer.poll(timeout=1.0) + if records: + for block in records: + for row in block: + print(row) + + except Exception as err: + print("Failed to poll data, err:{err}") + raise err +# ANCHOR_END: subscribe + +# ANCHOR: commit_offset +def commit_offset(consumer): + try: + consumer.subscribe(["topic_meters"]) + print("subscribe topics successfully") + for i in range(50): + records = consumer.poll(timeout=1.0) + if records: + for block in records: + for row in block: + print(row) + consumer.commit(records) + + except Exception as err: + print("Failed to poll data, err:{err}") + raise err +# ANCHOR_END: commit_offset +# +# ANCHOR: unsubscribe +def unsubscribe(consumer): + try: + consumer.unsubscribe() + except Exception as err: + print("Failed to unsubscribe consumer. err:{err}") + +# ANCHOR_END: unsubscribe + +if __name__ == "__main__": + consumer = None + try: + prepareMeta() + consumer = create_consumer() + subscribe(consumer) + seek_offset(consumer) + commit_offset(consumer) + unsubscribe(consumer) + except Exception as err: + print("Failed to stmt consumer. err:{err}") + finally: + if consumer: + consumer.close() \ No newline at end of file diff --git a/docs/zh/08-develop/02-sql.md b/docs/zh/08-develop/02-sql.md index d6fdbc24b7..2e49ac9f19 100644 --- a/docs/zh/08-develop/02-sql.md +++ b/docs/zh/08-develop/02-sql.md @@ -265,6 +265,19 @@ reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId + +```python title="Websocket 连接" +{{#include docs/examples/python/reqid_ws.py}} +``` + +```python title="原生连接" +{{#include docs/examples/python/reqid_native.py}} +``` + +```python title="Rest 连接" +{{#include docs/examples/python/reqid_rest.py}} +``` + ```go diff --git a/docs/zh/08-develop/04-schemaless.md b/docs/zh/08-develop/04-schemaless.md index 611878960d..4552a4f14a 100644 --- a/docs/zh/08-develop/04-schemaless.md +++ b/docs/zh/08-develop/04-schemaless.md @@ -177,6 +177,10 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO + +```python +{{#include docs/examples/python/schemaless_ws.py}} +``` ```go @@ -223,6 +227,10 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO + +```python +{{#include docs/examples/python/schemaless_native.py}} +``` @@ -250,6 +258,7 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO 不支持 + 不支持 ```go diff --git a/docs/zh/08-develop/05-stmt.md b/docs/zh/08-develop/05-stmt.md index d3e71eed63..a8fd62a87d 100644 --- a/docs/zh/08-develop/05-stmt.md +++ b/docs/zh/08-develop/05-stmt.md @@ -34,9 +34,9 @@ import TabItem from "@theme/TabItem"; - ```python - {{#include docs/examples/python/connect_websocket_examples.py:connect}} - ``` +```python +{{#include docs/examples/python/stmt_ws.py}} +``` ```go @@ -85,6 +85,9 @@ import TabItem from "@theme/TabItem"; +```python +{{#include docs/examples/python/stmt_native.py}} +``` ```go diff --git a/docs/zh/08-develop/07-tmq.md b/docs/zh/08-develop/07-tmq.md index 3f379b52e6..6353a18890 100644 --- a/docs/zh/08-develop/07-tmq.md +++ b/docs/zh/08-develop/07-tmq.md @@ -117,7 +117,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```python -{{#include docs/examples/python/connect_websocket_examples.py:connect}} +{{#include docs/examples/python/tmq_websocket_example.py:create_consumer}} ``` @@ -181,7 +181,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```python -{{#include docs/examples/python/connect_websocket_examples.py:connect}} +{{#include docs/examples/python/tmq_native.py:create_consumer}} ``` @@ -233,7 +233,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```python -{{#include docs/examples/python/connect_websocket_examples.py:connect}} +{{#include docs/examples/python/tmq_websocket_example.py:subscribe}} ``` @@ -284,7 +284,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```python -{{#include docs/examples/python/connect_websocket_examples.py:connect}} +{{#include docs/examples/python/tmq_native.py:subscribe}} ``` @@ -333,7 +333,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```python -{{#include docs/examples/python/connect_websocket_examples.py:connect}} +{{#include docs/examples/python/tmq_websocket_example.py:assignment}} ``` @@ -384,7 +384,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```python -{{#include docs/examples/python/connect_websocket_examples.py:connect}} +{{#include docs/examples/python/tmq_native.py:assignment}} ``` @@ -435,7 +435,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```python -{{#include docs/examples/python/connect_websocket_examples.py:connect}} +{{#include docs/examples/python/tmq_websocket_example.py:commit_offset}} ``` @@ -488,7 +488,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```python -{{#include docs/examples/python/connect_websocket_examples.py:connect}} +{{#include docs/examples/python/tmq_native.py:commit_offset}} ``` @@ -543,7 +543,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```python -{{#include docs/examples/python/connect_websocket_examples.py:connect}} +{{#include docs/examples/python/tmq_websocket_example.py:unsubscribe}} ``` @@ -595,7 +595,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```python -{{#include docs/examples/python/connect_websocket_examples.py:connect}} +{{#include docs/examples/python/tmq_native.py:unsubscribe}} ``` @@ -651,7 +651,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```python -{{#include docs/examples/python/connect_websocket_examples.py:connect}} +{{#include docs/examples/python/tmq_websocket_example.py}} ``` @@ -711,7 +711,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 ```python -{{#include docs/examples/python/connect_websocket_examples.py:connect}} +{{#include docs/examples/python/tmq_native.py}} ```