diff --git a/docs/examples/python/connect_websocket_with_req_id_examples.py b/docs/examples/python/connect_websocket_with_req_id_examples.py
index 3588b8e41f..639abd60ad 100644
--- a/docs/examples/python/connect_websocket_with_req_id_examples.py
+++ b/docs/examples/python/connect_websocket_with_req_id_examples.py
@@ -14,7 +14,6 @@ conn.execute("insert into tb1 values (now, 1)", req_id=6)
conn.execute("insert into tb1 values (now, 2)", req_id=7)
conn.execute("insert into tb1 values (now, 3)", req_id=8)
-r = conn.execute("select * from stb", req_id=9)
result = conn.query("select * from stb", req_id=10)
num_of_fields = result.field_count
print(num_of_fields)
diff --git a/docs/examples/python/insert_native.py b/docs/examples/python/insert_native.py
index faf4554437..ca59d8f395 100644
--- a/docs/examples/python/insert_native.py
+++ b/docs/examples/python/insert_native.py
@@ -16,7 +16,7 @@ try:
power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
"""
- inserted = conn.execute(sql)
+ affectedRows = conn.execute(sql)
print("inserted into {affectedRows} rows to power.meters successfully.")
except Exception as err:
diff --git a/docs/examples/python/reqid_native.py b/docs/examples/python/reqid_native.py
new file mode 100644
index 0000000000..f6c27bceb3
--- /dev/null
+++ b/docs/examples/python/reqid_native.py
@@ -0,0 +1,37 @@
+import taos
+
+conn = None
+try:
+ conn = taos.connect(host="localhost",
+ port=6030,
+ user="root",
+ password="taosdata")
+
+ sql = """
+ INSERT INTO
+ power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
+ VALUES (NOW + 1a, 10.30000, 219, 0.31000)
+ (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
+ power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
+ VALUES (NOW + 1a, 10.30000, 218, 0.25000)
+ """
+ inserted = conn.execute(sql, 1)
+ print("inserted into {affectedRows} rows to power.meters successfully.")
+
+ result = conn.query("SELECT ts, current, location FROM power.meters limit 100", 2)
+ print(result)
+ # Get fields from result
+ fields = result.fields
+ for field in fields:
+ print(field)
+
+ # Get data from result as list of tuple
+ data = result.fetch_all()
+ for row in data:
+ print(row)
+
+except Exception as err:
+ print(err)
+finally:
+ if conn:
+ conn.close()
\ No newline at end of file
diff --git a/docs/examples/python/reqid_rest.py b/docs/examples/python/reqid_rest.py
new file mode 100644
index 0000000000..10bc08d12d
--- /dev/null
+++ b/docs/examples/python/reqid_rest.py
@@ -0,0 +1,15 @@
+import taosrest
+
+client = None
+
+try:
+ client = taosrest.RestClient(url="http://localhost:6041",
+ user="root",
+ password="taosdata",
+ timeout=30)
+
+ result = client.sql(f"SELECT ts, current, location FROM power.meters limit 100", 1)
+ print(result)
+
+except Exception as err:
+ print(err)
diff --git a/docs/examples/python/reqid_ws.py b/docs/examples/python/reqid_ws.py
new file mode 100644
index 0000000000..47f4737a81
--- /dev/null
+++ b/docs/examples/python/reqid_ws.py
@@ -0,0 +1,35 @@
+import taosws
+
+conn = None
+
+try:
+ conn = taosws.connect(
+ user="root",
+ password="taosdata",
+ host="localhost",
+ port=6041,
+ )
+
+ sql = """
+ INSERT INTO
+ power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
+ VALUES (NOW + 1a, 10.30000, 219, 0.31000)
+ (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
+ power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
+ VALUES (NOW + 1a, 10.30000, 218, 0.25000)
+ """
+ affectedRows = conn.execute_with_req_id(sql, req_id=1)
+ print("inserted into {affectedRows} rows to power.meters successfully.")
+
+ result = conn.query_with_req_id("SELECT ts, current, location FROM power.meters limit 100", req_id=2)
+ num_of_fields = result.field_count
+ print(num_of_fields)
+
+ for row in result:
+ print(row)
+
+except Exception as err:
+ print(err)
+finally:
+ if conn:
+ conn.close()
\ No newline at end of file
diff --git a/docs/examples/python/schemaless_native.py b/docs/examples/python/schemaless_native.py
index e166cf5af5..7faa2176af 100644
--- a/docs/examples/python/schemaless_native.py
+++ b/docs/examples/python/schemaless_native.py
@@ -1,36 +1,39 @@
import taos
-conn = taos.connect(
- host="localhost",
- user="root",
- password="taosdata",
- port=6030,
-)
-
-db = "power"
-
-conn.execute(f"DROP DATABASE IF EXISTS {db}")
-conn.execute(f"CREATE DATABASE {db}")
-
-# change database. same as execute "USE db"
-conn.select_db(db)
-
lineDemo = [
- "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000"
+ "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
]
-telnetDemo = ["stb0_0 1707095283260 4 host=host0 interface=eth0"]
+
+telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]
+
jsonDemo = [
- '{"metric": "meter_current","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
+ '{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
]
-conn.schemaless_insert(
- lineDemo, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
-)
-conn.schemaless_insert(
- telnetDemo, taos.SmlProtocol.TELNET_PROTOCOL, taos.SmlPrecision.MICRO_SECONDS
-)
-conn.schemaless_insert(
- jsonDemo, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
-)
+try:
+ conn = taos.connect(
+ host="localhost",
+ user="root",
+ password="taosdata",
+ port=6030
+ )
+
+ conn.execute("CREATE DATABASE IF NOT EXISTS power")
+ # change database. same as execute "USE db"
+ conn.select_db("power")
+
+ conn.schemaless_insert(
+ lineDemo, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
+ )
+ conn.schemaless_insert(
+ telnetDemo, taos.SmlProtocol.TELNET_PROTOCOL, taos.SmlPrecision.MICRO_SECONDS
+ )
+ conn.schemaless_insert(
+ jsonDemo, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
+ )
+except Exception as err:
+ print(err)
+finally:
+ if conn:
+ conn.close()
-conn.close()
diff --git a/docs/examples/python/schemaless_ws.py b/docs/examples/python/schemaless_ws.py
index 9e091f02c9..f6c1f1b0ac 100644
--- a/docs/examples/python/schemaless_ws.py
+++ b/docs/examples/python/schemaless_ws.py
@@ -1,46 +1,46 @@
import taosws
-dsn = "taosws://root:taosdata@localhost:6041"
-conn = taosws.connect(dsn)
-
-db = "power"
-
-conn.execute(f"DROP DATABASE IF EXISTS {db}")
-conn.execute(f"CREATE DATABASE {db}")
-
-# change database.
-conn = taosws.connect(f"{dsn}/{db}")
+conn = None
lineDemo = [
- "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000"
+ "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
]
-telnetDemo = ["stb0_0 1707095283260 4 host=host0 interface=eth0"]
+
+telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]
+
jsonDemo = [
- '{"metric": "meter_current","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
+ '{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
]
-conn.schemaless_insert(
- lines=lineDemo,
- protocol=taosws.PySchemalessProtocol.Line,
- precision=taosws.PySchemalessPrecision.Millisecond,
- ttl=1,
- req_id=1,
-)
+try:
+ conn = taosws.connect(user="root",
+ password="taosdata",
+ host="localhost",
+ port=6041)
-conn.schemaless_insert(
- lines=telnetDemo,
- protocol=taosws.PySchemalessProtocol.Telnet,
- precision=taosws.PySchemalessPrecision.Microsecond,
- ttl=1,
- req_id=2,
-)
+ conn.execute("CREATE DATABASE IF NOT EXISTS power")
+ conn = conn.execute("USE power")
-conn.schemaless_insert(
- lines=jsonDemo,
- protocol=taosws.PySchemalessProtocol.Json,
- precision=taosws.PySchemalessPrecision.Millisecond,
- ttl=1,
- req_id=3,
-)
+ conn.schemaless_insert(
+ lines=lineDemo,
+ protocol=taosws.PySchemalessProtocol.Line,
+ precision=taosws.PySchemalessPrecision.Millisecond
+ )
+
+ conn.schemaless_insert(
+ lines=telnetDemo,
+ protocol=taosws.PySchemalessProtocol.Telnet,
+ precision=taosws.PySchemalessPrecision.Microsecond
+ )
+
+ conn.schemaless_insert(
+ lines=jsonDemo,
+ protocol=taosws.PySchemalessProtocol.Json,
+ precision=taosws.PySchemalessPrecision.Millisecond
+ )
+except Exception as err:
+ print(err)
+finally:
+ if conn:
+ conn.close()
-conn.close()
diff --git a/docs/examples/python/stmt_native.py b/docs/examples/python/stmt_native.py
index 0af9d15fca..c51167a299 100644
--- a/docs/examples/python/stmt_native.py
+++ b/docs/examples/python/stmt_native.py
@@ -1,53 +1,64 @@
import taos
+from datetime import datetime
+import random
-conn = taos.connect(
- host="localhost",
- user="root",
- password="taosdata",
- port=6030,
-)
+numOfSubTable = 10
+numOfRow = 10
-db = "power"
+conn = None
+stmt = None
-conn.execute(f"DROP DATABASE IF EXISTS {db}")
-conn.execute(f"CREATE DATABASE {db}")
+try:
+ conn = taos.connect(
+ host="localhost",
+ user="root",
+ password="taosdata",
+ port=6030,
+ )
-# change database. same as execute "USE db"
-conn.select_db(db)
+ conn.execute("CREATE DATABASE IF NOT EXISTS power")
+ conn.execute("USE power")
+ conn.execute(
+ "CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
+ )
-# create super table
-conn.execute(
- "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
-)
+ # ANCHOR: stmt
+ sql = "INSERT INTO ? USING meters (groupid, location) TAGS(?,?) VALUES (?,?,?,?)"
+ stmt = conn.statement(sql)
+ for i in range(numOfSubTable):
+ tbname = f"d_bind_{i}"
-# ANCHOR: stmt
-sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
-stmt = conn.statement(sql)
+ tags = taos.new_bind_params(2)
+ tags[0].int([i])
+ tags[1].binary([f"location_{i}"])
+ stmt.set_tbname_tags(tbname, tags)
-tbname = "power.d1001"
+ current = int(datetime.now().timestamp() * 1000)
+ timestamps = []
+ currents = []
+ voltages = []
+ phases = []
-tags = taos.new_bind_params(2)
-tags[0].binary(["California.SanFrancisco"])
-tags[1].int([2])
+ for j in range (numOfRow):
+ timestamps.append(current + i)
+ currents.append(random.random() * 30)
+ voltages.append(random.random(100, 300))
+ phases.append(random.random())
-stmt.set_tbname_tags(tbname, tags)
+ params = taos.new_bind_params(4)
+ params[0].timestamp(timestamps)
+ params[1].float(currents)
+ params[2].int(voltages)
+ params[3].float(phases)
+ stmt.bind_param_batch(params)
+ stmt.execute()
+ affected = stmt.affected_rows()
+ print(f"table {tbname} insert {affected} rows.")
-params = taos.new_bind_params(4)
-params[0].timestamp((1626861392589, 1626861392591, 1626861392592))
-params[1].float((10.3, 12.6, 12.3))
-params[2].int([194, 200, 201])
-params[3].float([0.31, 0.33, 0.31])
-
-stmt.bind_param_batch(params)
-
-stmt.execute()
-
-stmt.close()
-# ANCHOR_END: stmt
-
-result = conn.query("SELECT * from meters")
-
-for row in result.fetch_all():
- print(row)
-
-conn.close()
+except Exception as err:
+ print(err)
+finally:
+ if stmt:
+ stmt.close()
+ if conn:
+ conn.close()
diff --git a/docs/examples/python/stmt_ws.py b/docs/examples/python/stmt_ws.py
index 3900f71bfb..360833208e 100644
--- a/docs/examples/python/stmt_ws.py
+++ b/docs/examples/python/stmt_ws.py
@@ -1,52 +1,66 @@
+from datetime import datetime
+import random
import taosws
-dsn = "taosws://root:taosdata@localhost:6041"
-conn = taosws.connect(dsn)
+numOfSubTable = 10
-db = "power"
+numOfRow = 10
-conn.execute(f"DROP DATABASE IF EXISTS {db}")
-conn.execute(f"CREATE DATABASE {db}")
+conn = None
+stmt = None
+try:
+ conn = taosws.connect(user="root",
+ password="taosdata",
+ host="localhost",
+ port=6041)
-# change database.
-conn.execute(f"USE {db}")
+ conn.execute("CREATE DATABASE IF NOT EXISTS power")
+ conn.execute("USE power")
+ conn.execute(
+ "CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
+ )
-# create super table
-conn.execute(
- "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
-)
+ # ANCHOR: stmt
+ sql = "INSERT INTO ? USING meters (groupid, location) TAGS(?,?) VALUES (?,?,?,?)"
+ stmt = conn.statement()
+ stmt.prepare(sql)
-# ANCHOR: stmt
-sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
-stmt = conn.statement()
-stmt.prepare(sql)
+ for i in range(numOfSubTable):
+ tbname = f"d_bind_{i}"
-tbname = "power.d1001"
+ tags = [
+ taosws.int_to_tag(i),
+ taosws.varchar_to_tag(f"location_{i}"),
+ ]
+ stmt.set_tbname_tags(tbname, tags)
+ current = int(datetime.now().timestamp() * 1000)
+ timestamps = []
+ currents = []
+ voltages = []
+ phases = []
+ for j in range (numOfRow):
+ timestamps.append(current + i)
+ currents.append(random.random() * 30)
+ voltages.append(random.random(100, 300))
+ phases.append(random.random())
-tags = [
- taosws.varchar_to_tag("California.SanFrancisco"),
- taosws.int_to_tag(2),
-]
+ stmt.bind_param(
+ [
+ taosws.millis_timestamps_to_column(timestamps),
+ taosws.floats_to_column(currents),
+ taosws.ints_to_column(voltages),
+ taosws.floats_to_column(phases),
+ ]
+ )
-stmt.set_tbname_tags(tbname, tags)
-
-stmt.bind_param(
- [
- taosws.millis_timestamps_to_column(
- [1626861392589, 1626861392591, 1626861392592]
- ),
- taosws.floats_to_column([10.3, 12.6, 12.3]),
- taosws.ints_to_column([194, 200, 201]),
- taosws.floats_to_column([0.31, 0.33, 0.31]),
- ]
-)
-
-stmt.add_batch()
-rows = stmt.execute()
-
-assert rows == 3
-
-stmt.close()
-# ANCHOR_END: stmt
-
-conn.close()
+ stmt.add_batch()
+ rows = stmt.execute()
+ print(f"insert {rows} rows.")
+
+except Exception as err:
+ print(err)
+finally:
+ if stmt:
+ stmt.close()
+ if conn:
+ conn.close()
diff --git a/docs/examples/python/tmq_native.py b/docs/examples/python/tmq_native.py
index 7759c7b8e9..d43d8f4b8e 100644
--- a/docs/examples/python/tmq_native.py
+++ b/docs/examples/python/tmq_native.py
@@ -1,84 +1,153 @@
import taos
-conn = taos.connect(
- host="localhost",
- user="root",
- password="taosdata",
- port=6030,
-)
+def prepareMeta():
+ conn = None
+ try:
+ conn = taos.connect(
+ host="localhost",
+ user="root",
+ password="taosdata",
+ port=6030,
+ )
-db = "power"
-topic = "topic_meters"
+ db = "power"
+ topic = "topic_meters"
+ conn.execute(f"CREATE DATABASE IF EXISTS {db}")
-conn.execute(f"DROP TOPIC IF EXISTS {topic}")
-conn.execute(f"DROP DATABASE IF EXISTS {db}")
-conn.execute(f"CREATE DATABASE {db}")
+ # change database. same as execute "USE db"
+ conn.select_db(db)
-# change database. same as execute "USE db"
-conn.select_db(db)
+ # create super table
+ conn.execute(
+ "CREATE STABLE IF EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
+ )
-# create super table
-conn.execute(
- "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
-)
-
-# ANCHOR: create_topic
-# create topic
-conn.execute(
- f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters"
-)
-# ANCHOR_END: create_topic
+ # ANCHOR: create_topic
+ # create topic
+ conn.execute(
+ f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters"
+ )
+ # ANCHOR_END: create_topic
+ sql = """
+ INSERT INTO
+ power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
+ VALUES (NOW + 1a, 10.30000, 219, 0.31000)
+ (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
+ power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
+ VALUES (NOW + 1a, 10.30000, 218, 0.25000)
+ """
+ affectedRows = conn.execute(sql)
+ print("inserted into {affectedRows} rows to power.meters successfully.")
+ except Exception as err:
+ print("prepare meta err:{err}")
+ finally
+ if conn:
+ conn.close()
# ANCHOR: create_consumer
from taos.tmq import Consumer
-
-consumer = Consumer(
- {
- "group.id": "1",
- "td.connect.user": "root",
- "td.connect.pass": "taosdata",
- "enable.auto.commit": "true",
- }
-)
+def create_consumer():
+ try:
+ consumer = Consumer(
+ {
+ "group.id": "group1",
+ "client.id": "1",
+ "td.connect.user": "root",
+ "td.connect.pass": "taosdata",
+ "enable.auto.commit": "true",
+ "auto.commit.interval.ms":"1000",
+ "auto.offset.reset": "latest",
+ "td.connect.ip": "192.168.1.98",
+ "td.connect.port": "6041",
+ }
+ )
+ except Exception as err:
+ print("Failed to poll data, err:{err}")
+ raise err
# ANCHOR_END: create_consumer
# ANCHOR: subscribe
-consumer.subscribe([topic])
+def subscribe(consumer):
+ try:
+ consumer.subscribe(["topic_meters"])
+ print("subscribe topics successfully")
+ for i in range(50):
+ records = consumer.poll(1)
+ if records:
+ err = records.error()
+ if err is not None:
+ print(f"poll data error, {err}")
+ raise err
+
+ val = res.value()
+ if val:
+ for block in val:
+ print(block.fetchall())
+
+ except Exception as err:
+ print("Failed to poll data, err:{err}")
+ raise err
# ANCHOR_END: subscribe
-try:
- # ANCHOR: consume
- while True:
- res = consumer.poll(1)
- if not res:
- break
- err = res.error()
- if err is not None:
- raise err
- val = res.value()
+def commit_offset(consumer):
+ # ANCHOR: commit_offset
+ try:
+ consumer.subscribe(["topic_meters"])
+ print("subscribe topics successfully")
+ for i in range(50):
+ records = consumer.poll(1)
+ if records:
+ err = records.error()
+ if err is not None:
+ print(f"poll data error, {err}")
+ raise err
+
+ val = res.value()
+ if val:
+ for block in val:
+ print(block.fetchall())
- for block in val:
- print(block.fetchall())
- # ANCHOR_END: consume
+ consumer.commit(records)
+ except Exception as err:
+ print("Failed to poll data, err:{err}")
+ raise err
+ # ANCHOR_END: commit_offset
+
+def seek_offset(consumer):
# ANCHOR: assignment
- assignments = consumer.assignment()
- for assignment in assignments:
- print(assignment)
- # ANCHOR_END: assignment
+ try:
+ assignments = consumer.assignment()
+ for partition in assignments:
+ print("first data polled: {partition.offset}")
+ partition.offset = 0
+ consumer.seek(partition)
+ print("assignment seek to beginning successfully");
+ except Exception as err:
+ print("seek example failed; err:{err}")
+ raise err
+ # ANCHOR_END: assignment
- # ANCHOR: seek
- offset = taos.tmq.TopicPartition(
- topic=topic,
- partition=assignment.partition,
- offset=0,
- )
- consumer.seek(offset)
- # ANCHOR_END: seek
-finally:
- # ANCHOR: unsubscribe
- consumer.unsubscribe()
- consumer.close()
- # ANCHOR_END: unsubscribe
+# ANCHOR: unsubscribe
+def unsubscribe(consumer):
+ try:
+ consumer.unsubscribe()
+ except Exception as err:
+ print("Failed to unsubscribe consumer. err:{err}")
+
+# ANCHOR_END: unsubscribe
-conn.close()
+if __name__ == "__main__":
+ consumer = None
+ try:
+ prepareMeta()
+ consumer = create_consumer()
+ subscribe(consumer)
+ seek_offset(consumer)
+ commit_offset(consumer)
+ unsubscribe(consumer)
+ except Exception as err:
+ print("Failed to stmt consumer. err:{err}")
+ finally:
+ if consumer:
+ consumer.close()
\ No newline at end of file
diff --git a/docs/examples/python/tmq_websocket_example.py b/docs/examples/python/tmq_websocket_example.py
index e1dcb0086a..43ca725b56 100644
--- a/docs/examples/python/tmq_websocket_example.py
+++ b/docs/examples/python/tmq_websocket_example.py
@@ -1,31 +1,143 @@
#!/usr/bin/python3
-from taosws import Consumer
+from taosws
-conf = {
- "td.connect.websocket.scheme": "ws",
- "group.id": "0",
-}
-consumer = Consumer(conf)
+def prepareMeta():
+ conn = None
-consumer.subscribe(["test"])
+ try:
+ conn = taosws.connect(user="root",
+ password="taosdata",
+ host="localhost",
+ port=6041)
-while True:
- message = consumer.poll(timeout=1.0)
- if message:
- id = message.vgroup()
- topic = message.topic()
- database = message.database()
+ db = "power"
+ # create database
+ rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
+ assert rowsAffected == 0
- for block in message:
- nrows = block.nrows()
- ncols = block.ncols()
- for row in block:
- print(row)
- values = block.fetchall()
- print(nrows, ncols)
+ # change database.
+ rowsAffected = conn.execute(f"USE {db}")
+ assert rowsAffected == 0
+
+ # create super table
+ rowsAffected = conn.execute(
+ "CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
+ )
+ assert rowsAffected == 0
- # consumer.commit(message)
- else:
- break
+ # create table
+ rowsAffected = conn.execute("CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupid, location) TAGS(0, 'Los Angles')")
+ assert rowsAffected == 0
-consumer.close()
+ sql = """
+ INSERT INTO
+ power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
+ VALUES (NOW + 1a, 10.30000, 219, 0.31000)
+ (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
+ power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
+ VALUES (NOW + 1a, 10.30000, 218, 0.25000)
+ """
+ inserted = conn.execute(sql)
+ print("inserted into {affectedRows} rows to power.meters successfully.")
+
+ except Exception as err:
+ print(err)
+ finally:
+ if conn:
+ conn.close()
+
+
+# ANCHOR: create_consumer
+def create_consumer():
+ try:
+ consumer = taosws.Consumer(conf={
+ "td.connect.websocket.scheme": "ws",
+ "group.id": "group1",
+ "client.id": "1",
+ "auto.offset.reset": "latest",
+ "td.connect.ip": "192.168.1.98",
+ "td.connect.port": "6041",
+ "enable.auto.commit": "true",
+ "auto.commit.interval.ms":"1000",
+ })
+ return consumer;
+ except Exception as err:
+ print(f"Failed to create websocket consumer, err:{err}");
+ raise err
+# ANCHOR_END: create_consumer
+
+def seek_offset(consumer):
+ # ANCHOR: assignment
+ try:
+ assignments = consumer.assignment()
+ for assignment in assignments:
+ topic = assignment.topic()
+ print(f"topic: {topic}")
+ for assign in assignment.assignments():
+ print(f"vg_id: {assign.vg_id()}, offset: {assign.offset()}, begin: {assign.begin()}, end: {assign.end()}")
+ consumer.seek(topic, assign.vg_id(), assign.begin())
+ print("assignment seek to beginning successfully");
+
+ except Exception as err:
+ print("seek example failed; err:{err}")
+ raise err
+ # ANCHOR_END: assignment
+
+# ANCHOR: subscribe
+def subscribe(consumer):
+ try:
+ consumer.subscribe(["topic_meters"])
+ print("subscribe topics successfully")
+ for i in range(50):
+ records = consumer.poll(timeout=1.0)
+ if records:
+ for block in records:
+ for row in block:
+ print(row)
+
+ except Exception as err:
+ print("Failed to poll data, err:{err}")
+ raise err
+# ANCHOR_END: subscribe
+
+# ANCHOR: commit_offset
+def commit_offset(consumer):
+ try:
+ consumer.subscribe(["topic_meters"])
+ print("subscribe topics successfully")
+ for i in range(50):
+ records = consumer.poll(timeout=1.0)
+ if records:
+ for block in records:
+ for row in block:
+ print(row)
+ consumer.commit(records)
+
+ except Exception as err:
+ print("Failed to poll data, err:{err}")
+ raise err
+# ANCHOR_END: commit_offset
+#
+# ANCHOR: unsubscribe
+def unsubscribe(consumer):
+ try:
+ consumer.unsubscribe()
+ except Exception as err:
+ print("Failed to unsubscribe consumer. err:{err}")
+
+# ANCHOR_END: unsubscribe
+
+if __name__ == "__main__":
+ consumer = None
+ try:
+ prepareMeta()
+ consumer = create_consumer()
+ subscribe(consumer)
+ seek_offset(consumer)
+ commit_offset(consumer)
+ unsubscribe(consumer)
+ except Exception as err:
+ print("Failed to stmt consumer. err:{err}")
+ finally:
+ if consumer:
+ consumer.close()
\ No newline at end of file
diff --git a/docs/zh/08-develop/02-sql.md b/docs/zh/08-develop/02-sql.md
index 01d94cca53..eb0de71244 100644
--- a/docs/zh/08-develop/02-sql.md
+++ b/docs/zh/08-develop/02-sql.md
@@ -252,6 +252,19 @@ reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId
+
+```python title="Websocket 连接"
+{{#include docs/examples/python/reqid_ws.py}}
+```
+
+```python title="原生连接"
+{{#include docs/examples/python/reqid_native.py}}
+```
+
+```python title="Rest 连接"
+{{#include docs/examples/python/reqid_rest.py}}
+```
+
```go
diff --git a/docs/zh/08-develop/04-schemaless.md b/docs/zh/08-develop/04-schemaless.md
index 08c2ce70fb..f25162a62a 100644
--- a/docs/zh/08-develop/04-schemaless.md
+++ b/docs/zh/08-develop/04-schemaless.md
@@ -177,6 +177,10 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO
+
+```python
+{{#include docs/examples/python/schemaless_ws.py}}
+```
```go
@@ -219,6 +223,10 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO
+
+```python
+{{#include docs/examples/python/schemaless_native.py}}
+```
```go
@@ -248,6 +256,7 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO
不支持
+ 不支持
不支持
diff --git a/docs/zh/08-develop/05-stmt.md b/docs/zh/08-develop/05-stmt.md
index c13fb96446..6cf8149fc6 100644
--- a/docs/zh/08-develop/05-stmt.md
+++ b/docs/zh/08-develop/05-stmt.md
@@ -34,9 +34,9 @@ import TabItem from "@theme/TabItem";
- ```python
- {{#include docs/examples/python/connect_websocket_examples.py:connect}}
- ```
+```python
+{{#include docs/examples/python/stmt_ws.py}}
+```
```go
@@ -79,6 +79,9 @@ import TabItem from "@theme/TabItem";
+```python
+{{#include docs/examples/python/stmt_native.py}}
+```
```go
diff --git a/docs/zh/08-develop/07-tmq.md b/docs/zh/08-develop/07-tmq.md
index ff7ac5d997..a9c2b391d3 100644
--- a/docs/zh/08-develop/07-tmq.md
+++ b/docs/zh/08-develop/07-tmq.md
@@ -111,7 +111,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
```python
-{{#include docs/examples/python/connect_websocket_examples.py:connect}}
+{{#include docs/examples/python/tmq_websocket_example.py:create_consumer}}
```
@@ -171,7 +171,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
```python
-{{#include docs/examples/python/connect_websocket_examples.py:connect}}
+{{#include docs/examples/python/tmq_native.py:create_consumer}}
```
@@ -223,7 +223,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
```python
-{{#include docs/examples/python/connect_websocket_examples.py:connect}}
+{{#include docs/examples/python/tmq_websocket_example.py:subscribe}}
```
@@ -271,7 +271,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
```python
-{{#include docs/examples/python/connect_websocket_examples.py:connect}}
+{{#include docs/examples/python/tmq_native.py:subscribe}}
```
@@ -317,7 +317,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
```python
-{{#include docs/examples/python/connect_websocket_examples.py:connect}}
+{{#include docs/examples/python/tmq_websocket_example.py:assignment}}
```
@@ -371,7 +371,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
```python
-{{#include docs/examples/python/connect_websocket_examples.py:connect}}
+{{#include docs/examples/python/tmq_native.py:assignment}}
```
@@ -417,7 +417,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
```python
-{{#include docs/examples/python/connect_websocket_examples.py:connect}}
+{{#include docs/examples/python/tmq_websocket_example.py:commit_offset}}
```
@@ -465,7 +465,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
```python
-{{#include docs/examples/python/connect_websocket_examples.py:connect}}
+{{#include docs/examples/python/tmq_native.py:commit_offset}}
```
@@ -511,7 +511,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
```python
-{{#include docs/examples/python/connect_websocket_examples.py:connect}}
+{{#include docs/examples/python/tmq_websocket_example.py:unsubscribe}}
```
@@ -559,7 +559,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
```python
-{{#include docs/examples/python/connect_websocket_examples.py:connect}}
+{{#include docs/examples/python/tmq_native.py:unsubscribe}}
```
@@ -607,7 +607,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
```python
-{{#include docs/examples/python/connect_websocket_examples.py:connect}}
+{{#include docs/examples/python/tmq_websocket_example.py}}
```
@@ -661,7 +661,7 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
```python
-{{#include docs/examples/python/connect_websocket_examples.py:connect}}
+{{#include docs/examples/python/tmq_native.py}}
```