Merge branch 'docs/wade-3.0' of github.com:taosdata/TDengine into docs/wade-3.0

This commit is contained in:
sheyanjie-qq 2024-08-05 13:25:50 +08:00
commit baf21aa3ec
6 changed files with 82 additions and 48 deletions

View File

@ -15,8 +15,7 @@ try:
assert rowsAffected == 0 assert rowsAffected == 0
# change database. same as execute "USE db" # change database. same as execute "USE db"
rowsAffected = conn.select_db(db) conn.select_db(db)
assert rowsAffected == 0
# create super table # create super table
rowsAffected = conn.execute( rowsAffected = conn.execute(

View File

@ -1,47 +1,72 @@
import taosws import taosws
conn = None db = "power"
def prepare():
conn = None
try:
conn = taosws.connect(user="root",
password="taosdata",
host="localhost",
port=6041)
lineDemo = [ # create database
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639" rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
] assert rowsAffected == 0
telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"] except Exception as err:
print(f"Failed to create db and table, err:{err}")
finally:
if conn:
conn.close()
jsonDemo = [ def schemaless_insert():
'{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
]
try: conn = None
conn = taosws.connect(user="root",
password="taosdata",
host="localhost",
port=6041)
conn.execute("CREATE DATABASE IF NOT EXISTS power") lineDemo = [
conn = conn.execute("USE power") "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"
]
conn.schemaless_insert( telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]
lines=lineDemo,
protocol=taosws.PySchemalessProtocol.Line,
precision=taosws.PySchemalessPrecision.Millisecond
)
conn.schemaless_insert( jsonDemo = [
lines=telnetDemo, '{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
protocol=taosws.PySchemalessProtocol.Telnet, ]
precision=taosws.PySchemalessPrecision.Microsecond
)
conn.schemaless_insert( try:
lines=jsonDemo, conn = taosws.connect(user="root",
protocol=taosws.PySchemalessProtocol.Json, password="taosdata",
precision=taosws.PySchemalessPrecision.Millisecond host="localhost",
) port=6041,
except Exception as err: database=db)
print(f"Failed to insert data with schemaless, err:{err}")
finally: conn.schemaless_insert(
if conn: lines = lineDemo,
conn.close() protocol = taosws.PySchemalessProtocol.Line,
precision = taosws.PySchemalessPrecision.Millisecond,
ttl=1,
req_id=1,
)
conn.schemaless_insert(
lines=telnetDemo,
protocol=taosws.PySchemalessProtocol.Telnet,
precision=taosws.PySchemalessPrecision.Microsecond,
ttl=1,
req_id=2,
)
conn.schemaless_insert(
lines=jsonDemo,
protocol=taosws.PySchemalessProtocol.Json,
precision=taosws.PySchemalessPrecision.Millisecond,
ttl=1,
req_id=3,
)
except Exception as err:
print(f"Failed to insert data with schemaless, err:{err}")
finally:
if conn:
conn.close()

View File

@ -42,7 +42,7 @@ try:
for j in range (numOfRow): for j in range (numOfRow):
timestamps.append(current + i) timestamps.append(current + i)
currents.append(random.random() * 30) currents.append(random.random() * 30)
voltages.append(random.random(100, 300)) voltages.append(random.randint(100, 300))
phases.append(random.random()) phases.append(random.random())
params = taos.new_bind_params(4) params = taos.new_bind_params(4)
@ -52,8 +52,7 @@ try:
params[3].float(phases) params[3].float(phases)
stmt.bind_param_batch(params) stmt.bind_param_batch(params)
stmt.execute() stmt.execute()
affected = stmt.affected_rows() print(f"stmt insert successfully.")
print(f"table {tbname} insert {affected} rows.")
except Exception as err: except Exception as err:
print(f"Failed to insert to table meters using stmt, error: {err}") print(f"Failed to insert to table meters using stmt, error: {err}")

View File

@ -41,7 +41,7 @@ try:
for j in range (numOfRow): for j in range (numOfRow):
timestamps.append(current + i) timestamps.append(current + i)
currents.append(random.random() * 30) currents.append(random.random() * 30)
voltages.append(random.random(100, 300)) voltages.append(random.randint(100, 300))
phases.append(random.random()) phases.append(random.random())
stmt.bind_param( stmt.bind_param(
@ -54,8 +54,9 @@ try:
) )
stmt.add_batch() stmt.add_batch()
rows = stmt.execute() stmt.execute()
print(f"insert {rows} rows.")
print(f"stmt insert successfully.")
except Exception as err: except Exception as err:
print(f"Failed to insert to table meters using stmt, error: {err}") print(f"Failed to insert to table meters using stmt, error: {err}")

View File

@ -1,5 +1,8 @@
#!/usr/bin/python3 #!/usr/bin/python3
import taosws import taosws
topic = "topic_meters"
def prepareMeta(): def prepareMeta():
conn = None conn = None
@ -29,6 +32,13 @@ def prepareMeta():
"CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupid, location) TAGS(0, 'Los Angles')") "CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupid, location) TAGS(0, 'Los Angles')")
assert rowsAffected == 0 assert rowsAffected == 0
# ANCHOR: create_topic
# create topic
conn.execute(
f"CREATE TOPIC IF NOT EXISTS {topic} AS SELECT ts, current, voltage, phase, groupid, location FROM meters"
)
# ANCHOR_END: create_topic
sql = """ sql = """
INSERT INTO INSERT INTO
power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco') power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
@ -91,9 +101,9 @@ def seek_offset(consumer):
# ANCHOR: subscribe # ANCHOR: subscribe
def subscribe(consumer): def subscribe(consumer):
try: try:
consumer.subscribe(["topic_meters"]) consumer.subscribe([topic])
print("subscribe topics successfully") print("subscribe topics successfully")
for i in range(5): for i in range(50):
records = consumer.poll(timeout=1.0) records = consumer.poll(timeout=1.0)
if records: if records:
for block in records: for block in records:
@ -110,7 +120,7 @@ def subscribe(consumer):
# ANCHOR: commit_offset # ANCHOR: commit_offset
def commit_offset(consumer): def commit_offset(consumer):
try: try:
for i in range(5): for i in range(50):
records = consumer.poll(timeout=1.0) records = consumer.poll(timeout=1.0)
if records: if records:
for block in records: for block in records: