add python example
This commit is contained in:
parent
6693b2cc58
commit
b5a7c7fc1f
|
@ -14,7 +14,6 @@ conn.execute("insert into tb1 values (now, 1)", req_id=6)
|
||||||
conn.execute("insert into tb1 values (now, 2)", req_id=7)
|
conn.execute("insert into tb1 values (now, 2)", req_id=7)
|
||||||
conn.execute("insert into tb1 values (now, 3)", req_id=8)
|
conn.execute("insert into tb1 values (now, 3)", req_id=8)
|
||||||
|
|
||||||
r = conn.execute("select * from stb", req_id=9)
|
|
||||||
result = conn.query("select * from stb", req_id=10)
|
result = conn.query("select * from stb", req_id=10)
|
||||||
num_of_fields = result.field_count
|
num_of_fields = result.field_count
|
||||||
print(num_of_fields)
|
print(num_of_fields)
|
||||||
|
|
|
@ -16,7 +16,7 @@ try:
|
||||||
power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
|
power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
|
||||||
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
||||||
"""
|
"""
|
||||||
inserted = conn.execute(sql)
|
affectedRows = conn.execute(sql)
|
||||||
print("inserted into {affectedRows} rows to power.meters successfully.")
|
print("inserted into {affectedRows} rows to power.meters successfully.")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
import taos
|
||||||
|
|
||||||
|
conn = None
|
||||||
|
try:
|
||||||
|
conn = taos.connect(host="localhost",
|
||||||
|
port=6030,
|
||||||
|
user="root",
|
||||||
|
password="taosdata")
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
INSERT INTO
|
||||||
|
power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
|
||||||
|
VALUES (NOW + 1a, 10.30000, 219, 0.31000)
|
||||||
|
(NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
|
||||||
|
power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
|
||||||
|
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
||||||
|
"""
|
||||||
|
inserted = conn.execute(sql, 1)
|
||||||
|
print("inserted into {affectedRows} rows to power.meters successfully.")
|
||||||
|
|
||||||
|
result = conn.query("SELECT ts, current, location FROM power.meters limit 100", 2)
|
||||||
|
print(result)
|
||||||
|
# Get fields from result
|
||||||
|
fields = result.fields
|
||||||
|
for field in fields:
|
||||||
|
print(field)
|
||||||
|
|
||||||
|
# Get data from result as list of tuple
|
||||||
|
data = result.fetch_all()
|
||||||
|
for row in data:
|
||||||
|
print(row)
|
||||||
|
|
||||||
|
except Exception as err:
|
||||||
|
print(err)
|
||||||
|
finally:
|
||||||
|
if conn:
|
||||||
|
conn.close()
|
|
@ -0,0 +1,15 @@
|
||||||
|
import taosrest
|
||||||
|
|
||||||
|
client = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
client = taosrest.RestClient(url="http://localhost:6041",
|
||||||
|
user="root",
|
||||||
|
password="taosdata",
|
||||||
|
timeout=30)
|
||||||
|
|
||||||
|
result = client.sql(f"SELECT ts, current, location FROM power.meters limit 100", 1)
|
||||||
|
print(result)
|
||||||
|
|
||||||
|
except Exception as err:
|
||||||
|
print(err)
|
|
@ -0,0 +1,35 @@
|
||||||
|
import taosws
|
||||||
|
|
||||||
|
conn = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
conn = taosws.connect(
|
||||||
|
user="root",
|
||||||
|
password="taosdata",
|
||||||
|
host="localhost",
|
||||||
|
port=6041,
|
||||||
|
)
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
INSERT INTO
|
||||||
|
power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
|
||||||
|
VALUES (NOW + 1a, 10.30000, 219, 0.31000)
|
||||||
|
(NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
|
||||||
|
power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
|
||||||
|
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
||||||
|
"""
|
||||||
|
affectedRows = conn.execute_with_req_id(sql, req_id=1)
|
||||||
|
print("inserted into {affectedRows} rows to power.meters successfully.")
|
||||||
|
|
||||||
|
result = conn.query_with_req_id("SELECT ts, current, location FROM power.meters limit 100", req_id=2)
|
||||||
|
num_of_fields = result.field_count
|
||||||
|
print(num_of_fields)
|
||||||
|
|
||||||
|
for row in result:
|
||||||
|
print(row)
|
||||||
|
|
||||||
|
except Exception as err:
|
||||||
|
print(err)
|
||||||
|
finally:
|
||||||
|
if conn:
|
||||||
|
conn.close()
|
|
@ -1,27 +1,26 @@
|
||||||
import taos
|
import taos
|
||||||
|
|
||||||
|
lineDemo = [
|
||||||
|
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
|
||||||
|
]
|
||||||
|
|
||||||
|
telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]
|
||||||
|
|
||||||
|
jsonDemo = [
|
||||||
|
'{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
|
||||||
|
]
|
||||||
|
|
||||||
|
try:
|
||||||
conn = taos.connect(
|
conn = taos.connect(
|
||||||
host="localhost",
|
host="localhost",
|
||||||
user="root",
|
user="root",
|
||||||
password="taosdata",
|
password="taosdata",
|
||||||
port=6030,
|
port=6030
|
||||||
)
|
)
|
||||||
|
|
||||||
db = "power"
|
conn.execute("CREATE DATABASE IF NOT EXISTS power")
|
||||||
|
|
||||||
conn.execute(f"DROP DATABASE IF EXISTS {db}")
|
|
||||||
conn.execute(f"CREATE DATABASE {db}")
|
|
||||||
|
|
||||||
# change database. same as execute "USE db"
|
# change database. same as execute "USE db"
|
||||||
conn.select_db(db)
|
conn.select_db("power")
|
||||||
|
|
||||||
lineDemo = [
|
|
||||||
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000"
|
|
||||||
]
|
|
||||||
telnetDemo = ["stb0_0 1707095283260 4 host=host0 interface=eth0"]
|
|
||||||
jsonDemo = [
|
|
||||||
'{"metric": "meter_current","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
|
|
||||||
]
|
|
||||||
|
|
||||||
conn.schemaless_insert(
|
conn.schemaless_insert(
|
||||||
lineDemo, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
|
lineDemo, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
|
||||||
|
@ -32,5 +31,9 @@ conn.schemaless_insert(
|
||||||
conn.schemaless_insert(
|
conn.schemaless_insert(
|
||||||
jsonDemo, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
|
jsonDemo, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
|
||||||
)
|
)
|
||||||
|
except Exception as err:
|
||||||
|
print(err)
|
||||||
|
finally:
|
||||||
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
|
@ -1,46 +1,46 @@
|
||||||
import taosws
|
import taosws
|
||||||
|
|
||||||
dsn = "taosws://root:taosdata@localhost:6041"
|
conn = None
|
||||||
conn = taosws.connect(dsn)
|
|
||||||
|
|
||||||
db = "power"
|
|
||||||
|
|
||||||
conn.execute(f"DROP DATABASE IF EXISTS {db}")
|
|
||||||
conn.execute(f"CREATE DATABASE {db}")
|
|
||||||
|
|
||||||
# change database.
|
|
||||||
conn = taosws.connect(f"{dsn}/{db}")
|
|
||||||
|
|
||||||
lineDemo = [
|
lineDemo = [
|
||||||
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000"
|
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
|
||||||
]
|
]
|
||||||
telnetDemo = ["stb0_0 1707095283260 4 host=host0 interface=eth0"]
|
|
||||||
|
telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]
|
||||||
|
|
||||||
jsonDemo = [
|
jsonDemo = [
|
||||||
'{"metric": "meter_current","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
|
'{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
|
||||||
]
|
]
|
||||||
|
|
||||||
|
try:
|
||||||
|
conn = taosws.connect(user="root",
|
||||||
|
password="taosdata",
|
||||||
|
host="localhost",
|
||||||
|
port=6041)
|
||||||
|
|
||||||
|
conn.execute("CREATE DATABASE IF NOT EXISTS power")
|
||||||
|
conn = conn.execute("USE power")
|
||||||
|
|
||||||
conn.schemaless_insert(
|
conn.schemaless_insert(
|
||||||
lines=lineDemo,
|
lines=lineDemo,
|
||||||
protocol=taosws.PySchemalessProtocol.Line,
|
protocol=taosws.PySchemalessProtocol.Line,
|
||||||
precision=taosws.PySchemalessPrecision.Millisecond,
|
precision=taosws.PySchemalessPrecision.Millisecond
|
||||||
ttl=1,
|
|
||||||
req_id=1,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
conn.schemaless_insert(
|
conn.schemaless_insert(
|
||||||
lines=telnetDemo,
|
lines=telnetDemo,
|
||||||
protocol=taosws.PySchemalessProtocol.Telnet,
|
protocol=taosws.PySchemalessProtocol.Telnet,
|
||||||
precision=taosws.PySchemalessPrecision.Microsecond,
|
precision=taosws.PySchemalessPrecision.Microsecond
|
||||||
ttl=1,
|
|
||||||
req_id=2,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
conn.schemaless_insert(
|
conn.schemaless_insert(
|
||||||
lines=jsonDemo,
|
lines=jsonDemo,
|
||||||
protocol=taosws.PySchemalessProtocol.Json,
|
protocol=taosws.PySchemalessProtocol.Json,
|
||||||
precision=taosws.PySchemalessPrecision.Millisecond,
|
precision=taosws.PySchemalessPrecision.Millisecond
|
||||||
ttl=1,
|
|
||||||
req_id=3,
|
|
||||||
)
|
)
|
||||||
|
except Exception as err:
|
||||||
|
print(err)
|
||||||
|
finally:
|
||||||
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,14 @@
|
||||||
import taos
|
import taos
|
||||||
|
from datetime import datetime
|
||||||
|
import random
|
||||||
|
|
||||||
|
numOfSubTable = 10
|
||||||
|
numOfRow = 10
|
||||||
|
|
||||||
|
conn = None
|
||||||
|
stmt = None
|
||||||
|
|
||||||
|
try:
|
||||||
conn = taos.connect(
|
conn = taos.connect(
|
||||||
host="localhost",
|
host="localhost",
|
||||||
user="root",
|
user="root",
|
||||||
|
@ -7,47 +16,49 @@ conn = taos.connect(
|
||||||
port=6030,
|
port=6030,
|
||||||
)
|
)
|
||||||
|
|
||||||
db = "power"
|
conn.execute("CREATE DATABASE IF NOT EXISTS power")
|
||||||
|
conn.execute("USE power")
|
||||||
conn.execute(f"DROP DATABASE IF EXISTS {db}")
|
|
||||||
conn.execute(f"CREATE DATABASE {db}")
|
|
||||||
|
|
||||||
# change database. same as execute "USE db"
|
|
||||||
conn.select_db(db)
|
|
||||||
|
|
||||||
# create super table
|
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
|
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
||||||
)
|
)
|
||||||
|
|
||||||
# ANCHOR: stmt
|
# ANCHOR: stmt
|
||||||
sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
|
sql = "INSERT INTO ? USING meters (groupid, location) TAGS(?,?) VALUES (?,?,?,?)"
|
||||||
stmt = conn.statement(sql)
|
stmt = conn.statement(sql)
|
||||||
|
for i in range(numOfSubTable):
|
||||||
tbname = "power.d1001"
|
tbname = f"d_bind_{i}"
|
||||||
|
|
||||||
tags = taos.new_bind_params(2)
|
tags = taos.new_bind_params(2)
|
||||||
tags[0].binary(["California.SanFrancisco"])
|
tags[0].int([i])
|
||||||
tags[1].int([2])
|
tags[1].binary([f"location_{i}"])
|
||||||
|
|
||||||
stmt.set_tbname_tags(tbname, tags)
|
stmt.set_tbname_tags(tbname, tags)
|
||||||
|
|
||||||
|
current = int(datetime.now().timestamp() * 1000)
|
||||||
|
timestamps = []
|
||||||
|
currents = []
|
||||||
|
voltages = []
|
||||||
|
phases = []
|
||||||
|
|
||||||
|
for j in range (numOfRow):
|
||||||
|
timestamps.append(current + i)
|
||||||
|
currents.append(random.random() * 30)
|
||||||
|
voltages.append(random.random(100, 300))
|
||||||
|
phases.append(random.random())
|
||||||
|
|
||||||
params = taos.new_bind_params(4)
|
params = taos.new_bind_params(4)
|
||||||
params[0].timestamp((1626861392589, 1626861392591, 1626861392592))
|
params[0].timestamp(timestamps)
|
||||||
params[1].float((10.3, 12.6, 12.3))
|
params[1].float(currents)
|
||||||
params[2].int([194, 200, 201])
|
params[2].int(voltages)
|
||||||
params[3].float([0.31, 0.33, 0.31])
|
params[3].float(phases)
|
||||||
|
|
||||||
stmt.bind_param_batch(params)
|
stmt.bind_param_batch(params)
|
||||||
|
|
||||||
stmt.execute()
|
stmt.execute()
|
||||||
|
affected = stmt.affected_rows()
|
||||||
|
print(f"table {tbname} insert {affected} rows.")
|
||||||
|
|
||||||
|
except Exception as err:
|
||||||
|
print(err)
|
||||||
|
finally:
|
||||||
|
if stmt:
|
||||||
stmt.close()
|
stmt.close()
|
||||||
# ANCHOR_END: stmt
|
if conn:
|
||||||
|
|
||||||
result = conn.query("SELECT * from meters")
|
|
||||||
|
|
||||||
for row in result.fetch_all():
|
|
||||||
print(row)
|
|
||||||
|
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -1,52 +1,66 @@
|
||||||
|
from datetime import datetime
|
||||||
|
import random
|
||||||
import taosws
|
import taosws
|
||||||
|
|
||||||
dsn = "taosws://root:taosdata@localhost:6041"
|
numOfSubTable = 10
|
||||||
conn = taosws.connect(dsn)
|
|
||||||
|
|
||||||
db = "power"
|
numOfRow = 10
|
||||||
|
|
||||||
conn.execute(f"DROP DATABASE IF EXISTS {db}")
|
conn = None
|
||||||
conn.execute(f"CREATE DATABASE {db}")
|
stmt = None
|
||||||
|
try:
|
||||||
|
conn = taosws.connect(user="root",
|
||||||
|
password="taosdata",
|
||||||
|
host="localhost",
|
||||||
|
port=6041)
|
||||||
|
|
||||||
# change database.
|
conn.execute("CREATE DATABASE IF NOT EXISTS power")
|
||||||
conn.execute(f"USE {db}")
|
conn.execute("USE power")
|
||||||
|
|
||||||
# create super table
|
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
|
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
||||||
)
|
)
|
||||||
|
|
||||||
# ANCHOR: stmt
|
# ANCHOR: stmt
|
||||||
sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
|
sql = "INSERT INTO ? USING meters (groupid, location) TAGS(?,?) VALUES (?,?,?,?)"
|
||||||
stmt = conn.statement()
|
stmt = conn.statement()
|
||||||
stmt.prepare(sql)
|
stmt.prepare(sql)
|
||||||
|
|
||||||
tbname = "power.d1001"
|
for i in range(numOfSubTable):
|
||||||
|
tbname = f"d_bind_{i}"
|
||||||
|
|
||||||
tags = [
|
tags = [
|
||||||
taosws.varchar_to_tag("California.SanFrancisco"),
|
taosws.int_to_tag(i),
|
||||||
taosws.int_to_tag(2),
|
taosws.varchar_to_tag(f"location_{i}"),
|
||||||
]
|
]
|
||||||
|
|
||||||
stmt.set_tbname_tags(tbname, tags)
|
stmt.set_tbname_tags(tbname, tags)
|
||||||
|
current = int(datetime.now().timestamp() * 1000)
|
||||||
|
timestamps = []
|
||||||
|
currents = []
|
||||||
|
voltages = []
|
||||||
|
phases = []
|
||||||
|
for j in range (numOfRow):
|
||||||
|
timestamps.append(current + i)
|
||||||
|
currents.append(random.random() * 30)
|
||||||
|
voltages.append(random.random(100, 300))
|
||||||
|
phases.append(random.random())
|
||||||
|
|
||||||
stmt.bind_param(
|
stmt.bind_param(
|
||||||
[
|
[
|
||||||
taosws.millis_timestamps_to_column(
|
taosws.millis_timestamps_to_column(timestamps),
|
||||||
[1626861392589, 1626861392591, 1626861392592]
|
taosws.floats_to_column(currents),
|
||||||
),
|
taosws.ints_to_column(voltages),
|
||||||
taosws.floats_to_column([10.3, 12.6, 12.3]),
|
taosws.floats_to_column(phases),
|
||||||
taosws.ints_to_column([194, 200, 201]),
|
|
||||||
taosws.floats_to_column([0.31, 0.33, 0.31]),
|
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
stmt.add_batch()
|
stmt.add_batch()
|
||||||
rows = stmt.execute()
|
rows = stmt.execute()
|
||||||
|
print(f"insert {rows} rows.")
|
||||||
|
|
||||||
assert rows == 3
|
except Exception as err:
|
||||||
|
print(err)
|
||||||
|
finally:
|
||||||
|
if stmt:
|
||||||
stmt.close()
|
stmt.close()
|
||||||
# ANCHOR_END: stmt
|
if conn:
|
||||||
|
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -1,5 +1,8 @@
|
||||||
import taos
|
import taos
|
||||||
|
|
||||||
|
def prepareMeta():
|
||||||
|
conn = None
|
||||||
|
try:
|
||||||
conn = taos.connect(
|
conn = taos.connect(
|
||||||
host="localhost",
|
host="localhost",
|
||||||
user="root",
|
user="root",
|
||||||
|
@ -9,17 +12,14 @@ conn = taos.connect(
|
||||||
|
|
||||||
db = "power"
|
db = "power"
|
||||||
topic = "topic_meters"
|
topic = "topic_meters"
|
||||||
|
conn.execute(f"CREATE DATABASE IF EXISTS {db}")
|
||||||
conn.execute(f"DROP TOPIC IF EXISTS {topic}")
|
|
||||||
conn.execute(f"DROP DATABASE IF EXISTS {db}")
|
|
||||||
conn.execute(f"CREATE DATABASE {db}")
|
|
||||||
|
|
||||||
# change database. same as execute "USE db"
|
# change database. same as execute "USE db"
|
||||||
conn.select_db(db)
|
conn.select_db(db)
|
||||||
|
|
||||||
# create super table
|
# create super table
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
|
"CREATE STABLE IF EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
|
||||||
)
|
)
|
||||||
|
|
||||||
# ANCHOR: create_topic
|
# ANCHOR: create_topic
|
||||||
|
@ -28,57 +28,126 @@ conn.execute(
|
||||||
f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters"
|
f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters"
|
||||||
)
|
)
|
||||||
# ANCHOR_END: create_topic
|
# ANCHOR_END: create_topic
|
||||||
|
sql = """
|
||||||
|
INSERT INTO
|
||||||
|
power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
|
||||||
|
VALUES (NOW + 1a, 10.30000, 219, 0.31000)
|
||||||
|
(NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
|
||||||
|
power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
|
||||||
|
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
||||||
|
"""
|
||||||
|
affectedRows = conn.execute(sql)
|
||||||
|
print("inserted into {affectedRows} rows to power.meters successfully.")
|
||||||
|
except Exception as err:
|
||||||
|
print("prepare meta err:{err}")
|
||||||
|
finally
|
||||||
|
if conn:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
# ANCHOR: create_consumer
|
# ANCHOR: create_consumer
|
||||||
from taos.tmq import Consumer
|
from taos.tmq import Consumer
|
||||||
|
def create_consumer():
|
||||||
|
try:
|
||||||
consumer = Consumer(
|
consumer = Consumer(
|
||||||
{
|
{
|
||||||
"group.id": "1",
|
"group.id": "group1",
|
||||||
|
"client.id": "1",
|
||||||
"td.connect.user": "root",
|
"td.connect.user": "root",
|
||||||
"td.connect.pass": "taosdata",
|
"td.connect.pass": "taosdata",
|
||||||
"enable.auto.commit": "true",
|
"enable.auto.commit": "true",
|
||||||
|
"auto.commit.interval.ms":"1000",
|
||||||
|
"auto.offset.reset": "latest",
|
||||||
|
"td.connect.ip": "192.168.1.98",
|
||||||
|
"td.connect.port": "6041",
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
except Exception as err:
|
||||||
|
print("Failed to poll data, err:{err}")
|
||||||
|
raise err
|
||||||
# ANCHOR_END: create_consumer
|
# ANCHOR_END: create_consumer
|
||||||
|
|
||||||
# ANCHOR: subscribe
|
# ANCHOR: subscribe
|
||||||
consumer.subscribe([topic])
|
def subscribe(consumer):
|
||||||
# ANCHOR_END: subscribe
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# ANCHOR: consume
|
consumer.subscribe(["topic_meters"])
|
||||||
while True:
|
print("subscribe topics successfully")
|
||||||
res = consumer.poll(1)
|
for i in range(50):
|
||||||
if not res:
|
records = consumer.poll(1)
|
||||||
break
|
if records:
|
||||||
err = res.error()
|
err = records.error()
|
||||||
if err is not None:
|
if err is not None:
|
||||||
|
print(f"poll data error, {err}")
|
||||||
raise err
|
raise err
|
||||||
val = res.value()
|
|
||||||
|
|
||||||
|
val = res.value()
|
||||||
|
if val:
|
||||||
for block in val:
|
for block in val:
|
||||||
print(block.fetchall())
|
print(block.fetchall())
|
||||||
# ANCHOR_END: consume
|
|
||||||
|
|
||||||
|
except Exception as err:
|
||||||
|
print("Failed to poll data, err:{err}")
|
||||||
|
raise err
|
||||||
|
# ANCHOR_END: subscribe
|
||||||
|
|
||||||
|
def commit_offset(consumer):
|
||||||
|
# ANCHOR: commit_offset
|
||||||
|
try:
|
||||||
|
consumer.subscribe(["topic_meters"])
|
||||||
|
print("subscribe topics successfully")
|
||||||
|
for i in range(50):
|
||||||
|
records = consumer.poll(1)
|
||||||
|
if records:
|
||||||
|
err = records.error()
|
||||||
|
if err is not None:
|
||||||
|
print(f"poll data error, {err}")
|
||||||
|
raise err
|
||||||
|
|
||||||
|
val = res.value()
|
||||||
|
if val:
|
||||||
|
for block in val:
|
||||||
|
print(block.fetchall())
|
||||||
|
|
||||||
|
consumer.commit(records)
|
||||||
|
|
||||||
|
except Exception as err:
|
||||||
|
print("Failed to poll data, err:{err}")
|
||||||
|
raise err
|
||||||
|
# ANCHOR_END: commit_offset
|
||||||
|
|
||||||
|
def seek_offset(consumer):
|
||||||
# ANCHOR: assignment
|
# ANCHOR: assignment
|
||||||
|
try:
|
||||||
assignments = consumer.assignment()
|
assignments = consumer.assignment()
|
||||||
for assignment in assignments:
|
for partition in assignments:
|
||||||
print(assignment)
|
print("first data polled: {partition.offset}")
|
||||||
|
partition.offset = 0
|
||||||
|
consumer.seek(partition)
|
||||||
|
print("assignment seek to beginning successfully");
|
||||||
|
except Exception as err:
|
||||||
|
print("seek example failed; err:{err}")
|
||||||
|
raise err
|
||||||
# ANCHOR_END: assignment
|
# ANCHOR_END: assignment
|
||||||
|
|
||||||
# ANCHOR: seek
|
|
||||||
offset = taos.tmq.TopicPartition(
|
|
||||||
topic=topic,
|
|
||||||
partition=assignment.partition,
|
|
||||||
offset=0,
|
|
||||||
)
|
|
||||||
consumer.seek(offset)
|
|
||||||
# ANCHOR_END: seek
|
|
||||||
finally:
|
|
||||||
# ANCHOR: unsubscribe
|
# ANCHOR: unsubscribe
|
||||||
|
def unsubscribe(consumer):
|
||||||
|
try:
|
||||||
consumer.unsubscribe()
|
consumer.unsubscribe()
|
||||||
consumer.close()
|
except Exception as err:
|
||||||
|
print("Failed to unsubscribe consumer. err:{err}")
|
||||||
|
|
||||||
# ANCHOR_END: unsubscribe
|
# ANCHOR_END: unsubscribe
|
||||||
|
|
||||||
conn.close()
|
if __name__ == "__main__":
|
||||||
|
consumer = None
|
||||||
|
try:
|
||||||
|
prepareMeta()
|
||||||
|
consumer = create_consumer()
|
||||||
|
subscribe(consumer)
|
||||||
|
seek_offset(consumer)
|
||||||
|
commit_offset(consumer)
|
||||||
|
unsubscribe(consumer)
|
||||||
|
except Exception as err:
|
||||||
|
print("Failed to stmt consumer. err:{err}")
|
||||||
|
finally:
|
||||||
|
if consumer:
|
||||||
|
consumer.close()
|
|
@ -1,31 +1,143 @@
|
||||||
#!/usr/bin/python3
|
#!/usr/bin/python3
|
||||||
from taosws import Consumer
|
from taosws
|
||||||
|
|
||||||
conf = {
|
def prepareMeta():
|
||||||
|
conn = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
conn = taosws.connect(user="root",
|
||||||
|
password="taosdata",
|
||||||
|
host="localhost",
|
||||||
|
port=6041)
|
||||||
|
|
||||||
|
db = "power"
|
||||||
|
# create database
|
||||||
|
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
|
||||||
|
assert rowsAffected == 0
|
||||||
|
|
||||||
|
# change database.
|
||||||
|
rowsAffected = conn.execute(f"USE {db}")
|
||||||
|
assert rowsAffected == 0
|
||||||
|
|
||||||
|
# create super table
|
||||||
|
rowsAffected = conn.execute(
|
||||||
|
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
||||||
|
)
|
||||||
|
assert rowsAffected == 0
|
||||||
|
|
||||||
|
# create table
|
||||||
|
rowsAffected = conn.execute("CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupid, location) TAGS(0, 'Los Angles')")
|
||||||
|
assert rowsAffected == 0
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
INSERT INTO
|
||||||
|
power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
|
||||||
|
VALUES (NOW + 1a, 10.30000, 219, 0.31000)
|
||||||
|
(NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
|
||||||
|
power.d1002 USING power.meters (groupid, location) TAGS(3, 'California.SanFrancisco')
|
||||||
|
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
||||||
|
"""
|
||||||
|
inserted = conn.execute(sql)
|
||||||
|
print("inserted into {affectedRows} rows to power.meters successfully.")
|
||||||
|
|
||||||
|
except Exception as err:
|
||||||
|
print(err)
|
||||||
|
finally:
|
||||||
|
if conn:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
# ANCHOR: create_consumer
|
||||||
|
def create_consumer():
|
||||||
|
try:
|
||||||
|
consumer = taosws.Consumer(conf={
|
||||||
"td.connect.websocket.scheme": "ws",
|
"td.connect.websocket.scheme": "ws",
|
||||||
"group.id": "0",
|
"group.id": "group1",
|
||||||
}
|
"client.id": "1",
|
||||||
consumer = Consumer(conf)
|
"auto.offset.reset": "latest",
|
||||||
|
"td.connect.ip": "192.168.1.98",
|
||||||
|
"td.connect.port": "6041",
|
||||||
|
"enable.auto.commit": "true",
|
||||||
|
"auto.commit.interval.ms":"1000",
|
||||||
|
})
|
||||||
|
return consumer;
|
||||||
|
except Exception as err:
|
||||||
|
print(f"Failed to create websocket consumer, err:{err}");
|
||||||
|
raise err
|
||||||
|
# ANCHOR_END: create_consumer
|
||||||
|
|
||||||
consumer.subscribe(["test"])
|
def seek_offset(consumer):
|
||||||
|
# ANCHOR: assignment
|
||||||
|
try:
|
||||||
|
assignments = consumer.assignment()
|
||||||
|
for assignment in assignments:
|
||||||
|
topic = assignment.topic()
|
||||||
|
print(f"topic: {topic}")
|
||||||
|
for assign in assignment.assignments():
|
||||||
|
print(f"vg_id: {assign.vg_id()}, offset: {assign.offset()}, begin: {assign.begin()}, end: {assign.end()}")
|
||||||
|
consumer.seek(topic, assign.vg_id(), assign.begin())
|
||||||
|
print("assignment seek to beginning successfully");
|
||||||
|
|
||||||
while True:
|
except Exception as err:
|
||||||
message = consumer.poll(timeout=1.0)
|
print("seek example failed; err:{err}")
|
||||||
if message:
|
raise err
|
||||||
id = message.vgroup()
|
# ANCHOR_END: assignment
|
||||||
topic = message.topic()
|
|
||||||
database = message.database()
|
|
||||||
|
|
||||||
for block in message:
|
# ANCHOR: subscribe
|
||||||
nrows = block.nrows()
|
def subscribe(consumer):
|
||||||
ncols = block.ncols()
|
try:
|
||||||
|
consumer.subscribe(["topic_meters"])
|
||||||
|
print("subscribe topics successfully")
|
||||||
|
for i in range(50):
|
||||||
|
records = consumer.poll(timeout=1.0)
|
||||||
|
if records:
|
||||||
|
for block in records:
|
||||||
for row in block:
|
for row in block:
|
||||||
print(row)
|
print(row)
|
||||||
values = block.fetchall()
|
|
||||||
print(nrows, ncols)
|
|
||||||
|
|
||||||
# consumer.commit(message)
|
except Exception as err:
|
||||||
else:
|
print("Failed to poll data, err:{err}")
|
||||||
break
|
raise err
|
||||||
|
# ANCHOR_END: subscribe
|
||||||
|
|
||||||
|
# ANCHOR: commit_offset
|
||||||
|
def commit_offset(consumer):
|
||||||
|
try:
|
||||||
|
consumer.subscribe(["topic_meters"])
|
||||||
|
print("subscribe topics successfully")
|
||||||
|
for i in range(50):
|
||||||
|
records = consumer.poll(timeout=1.0)
|
||||||
|
if records:
|
||||||
|
for block in records:
|
||||||
|
for row in block:
|
||||||
|
print(row)
|
||||||
|
consumer.commit(records)
|
||||||
|
|
||||||
|
except Exception as err:
|
||||||
|
print("Failed to poll data, err:{err}")
|
||||||
|
raise err
|
||||||
|
# ANCHOR_END: commit_offset
|
||||||
|
#
|
||||||
|
# ANCHOR: unsubscribe
|
||||||
|
def unsubscribe(consumer):
|
||||||
|
try:
|
||||||
|
consumer.unsubscribe()
|
||||||
|
except Exception as err:
|
||||||
|
print("Failed to unsubscribe consumer. err:{err}")
|
||||||
|
|
||||||
|
# ANCHOR_END: unsubscribe
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
consumer = None
|
||||||
|
try:
|
||||||
|
prepareMeta()
|
||||||
|
consumer = create_consumer()
|
||||||
|
subscribe(consumer)
|
||||||
|
seek_offset(consumer)
|
||||||
|
commit_offset(consumer)
|
||||||
|
unsubscribe(consumer)
|
||||||
|
except Exception as err:
|
||||||
|
print("Failed to stmt consumer. err:{err}")
|
||||||
|
finally:
|
||||||
|
if consumer:
|
||||||
consumer.close()
|
consumer.close()
|
|
@ -265,6 +265,19 @@ reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId
|
||||||
|
|
||||||
</TabItem>
|
</TabItem>
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
|
```python title="Websocket 连接"
|
||||||
|
{{#include docs/examples/python/reqid_ws.py}}
|
||||||
|
```
|
||||||
|
|
||||||
|
```python title="原生连接"
|
||||||
|
{{#include docs/examples/python/reqid_native.py}}
|
||||||
|
```
|
||||||
|
|
||||||
|
```python title="Rest 连接"
|
||||||
|
{{#include docs/examples/python/reqid_rest.py}}
|
||||||
|
```
|
||||||
|
|
||||||
</TabItem>
|
</TabItem>
|
||||||
<TabItem label="Go" value="go">
|
<TabItem label="Go" value="go">
|
||||||
```go
|
```go
|
||||||
|
|
|
@ -177,6 +177,10 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO
|
||||||
|
|
||||||
</TabItem>
|
</TabItem>
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
|
```python
|
||||||
|
{{#include docs/examples/python/schemaless_ws.py}}
|
||||||
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
<TabItem label="Go" value="go">
|
<TabItem label="Go" value="go">
|
||||||
```go
|
```go
|
||||||
|
@ -223,6 +227,10 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO
|
||||||
|
|
||||||
</TabItem>
|
</TabItem>
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
|
```python
|
||||||
|
{{#include docs/examples/python/schemaless_native.py}}
|
||||||
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
<TabItem label="Go" value="go">
|
<TabItem label="Go" value="go">
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
@ -250,6 +258,7 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO
|
||||||
不支持
|
不支持
|
||||||
</TabItem>
|
</TabItem>
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
不支持
|
||||||
</TabItem>
|
</TabItem>
|
||||||
<TabItem label="Go" value="go">
|
<TabItem label="Go" value="go">
|
||||||
```go
|
```go
|
||||||
|
|
|
@ -35,7 +35,7 @@ import TabItem from "@theme/TabItem";
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
|
{{#include docs/examples/python/stmt_ws.py}}
|
||||||
```
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
<TabItem label="Go" value="go">
|
<TabItem label="Go" value="go">
|
||||||
|
@ -85,6 +85,9 @@ import TabItem from "@theme/TabItem";
|
||||||
</TabItem>
|
</TabItem>
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
|
```python
|
||||||
|
{{#include docs/examples/python/stmt_native.py}}
|
||||||
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
<TabItem label="Go" value="go">
|
<TabItem label="Go" value="go">
|
||||||
```go
|
```go
|
||||||
|
|
|
@ -117,7 +117,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
|
{{#include docs/examples/python/tmq_websocket_example.py:create_consumer}}
|
||||||
```
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
||||||
|
@ -181,7 +181,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
|
{{#include docs/examples/python/tmq_native.py:create_consumer}}
|
||||||
```
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
||||||
|
@ -233,7 +233,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
|
{{#include docs/examples/python/tmq_websocket_example.py:subscribe}}
|
||||||
```
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
||||||
|
@ -284,7 +284,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
|
{{#include docs/examples/python/tmq_native.py:subscribe}}
|
||||||
```
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
||||||
|
@ -333,7 +333,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
|
{{#include docs/examples/python/tmq_websocket_example.py:assignment}}
|
||||||
```
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
||||||
|
@ -384,7 +384,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
|
{{#include docs/examples/python/tmq_native.py:assignment}}
|
||||||
```
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
||||||
|
@ -435,7 +435,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
|
{{#include docs/examples/python/tmq_websocket_example.py:commit_offset}}
|
||||||
```
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
||||||
|
@ -488,7 +488,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
|
{{#include docs/examples/python/tmq_native.py:commit_offset}}
|
||||||
```
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
||||||
|
@ -543,7 +543,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
|
{{#include docs/examples/python/tmq_websocket_example.py:unsubscribe}}
|
||||||
```
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
||||||
|
@ -595,7 +595,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
|
{{#include docs/examples/python/tmq_native.py:unsubscribe}}
|
||||||
```
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
||||||
|
@ -651,7 +651,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
|
{{#include docs/examples/python/tmq_websocket_example.py}}
|
||||||
```
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
||||||
|
@ -711,7 +711,7 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
|
||||||
<TabItem label="Python" value="python">
|
<TabItem label="Python" value="python">
|
||||||
|
|
||||||
```python
|
```python
|
||||||
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
|
{{#include docs/examples/python/tmq_native.py}}
|
||||||
```
|
```
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue