build: add python demo to ci (#19641)
* build: add python demo to ci * build: add fast write example to ci * build: add kafka demo to ci * fix kafka demo * fix python demo * fix python demo * fix python demo * fix python demo * fix python demo
This commit is contained in:
parent
6cc6396619
commit
c30cb1f5db
|
@ -0,0 +1,15 @@
|
|||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/taosdata/driver-go/v3 v3.1.0/go.mod h1:H2vo/At+rOPY1aMzUV9P49SVX7NlXb3LAbKw+MCLrmU=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
|
@ -1,8 +1,11 @@
|
|||
import pandas
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy import create_engine, text
|
||||
|
||||
engine = create_engine("taos://root:taosdata@localhost:6030/power")
|
||||
df = pandas.read_sql("SELECT * FROM meters", engine)
|
||||
conn = engine.connect()
|
||||
df = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
|
||||
conn.close()
|
||||
|
||||
|
||||
# print index
|
||||
print(df.index)
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
import pandas
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy import create_engine, text
|
||||
|
||||
engine = create_engine("taosrest://root:taosdata@localhost:6041")
|
||||
df: pandas.DataFrame = pandas.read_sql("SELECT * FROM power.meters", engine)
|
||||
conn = engine.connect()
|
||||
df: pandas.DataFrame = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
|
||||
conn.close()
|
||||
|
||||
# print index
|
||||
print(df.index)
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
# ANCHOR: connect
|
||||
from taosrest import connect, TaosRestConnection, TaosRestCursor
|
||||
|
||||
conn: TaosRestConnection = connect(url="http://localhost:6041",
|
||||
conn = connect(url="http://localhost:6041",
|
||||
user="root",
|
||||
password="taosdata",
|
||||
timeout=30)
|
||||
|
@ -9,10 +9,11 @@ conn: TaosRestConnection = connect(url="http://localhost:6041",
|
|||
# ANCHOR_END: connect
|
||||
# ANCHOR: basic
|
||||
# create STable
|
||||
cursor: TaosRestCursor = conn.cursor()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("DROP DATABASE IF EXISTS power")
|
||||
cursor.execute("CREATE DATABASE power")
|
||||
cursor.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
|
||||
cursor.execute(
|
||||
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
|
||||
|
||||
# insert data
|
||||
cursor.execute("""INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
|
||||
|
@ -28,7 +29,7 @@ print("queried row count:", cursor.rowcount)
|
|||
# get column names from cursor
|
||||
column_names = [meta[0] for meta in cursor.description]
|
||||
# get rows
|
||||
data: list[tuple] = cursor.fetchall()
|
||||
data = cursor.fetchall()
|
||||
print(column_names)
|
||||
for row in data:
|
||||
print(row)
|
||||
|
|
|
@ -8,7 +8,7 @@ conn.execute("CREATE DATABASE test")
|
|||
# change database. same as execute "USE db"
|
||||
conn.select_db("test")
|
||||
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
|
||||
affected_row: int = conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)")
|
||||
affected_row = conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)")
|
||||
print("affected_row", affected_row)
|
||||
# output:
|
||||
# affected_row 3
|
||||
|
@ -16,10 +16,10 @@ print("affected_row", affected_row)
|
|||
|
||||
# ANCHOR: query
|
||||
# Execute a sql and get its result set. It's useful for SELECT statement
|
||||
result: taos.TaosResult = conn.query("SELECT * from weather")
|
||||
result = conn.query("SELECT * from weather")
|
||||
|
||||
# Get fields from result
|
||||
fields: taos.field.TaosFields = result.fields
|
||||
fields = result.fields
|
||||
for field in fields:
|
||||
print(field) # {name: ts, type: 9, bytes: 8}
|
||||
|
||||
|
|
|
@ -1,15 +1,14 @@
|
|||
# install dependencies:
|
||||
# recommend python >= 3.8
|
||||
# pip3 install faster-fifo
|
||||
#
|
||||
|
||||
import logging
|
||||
import math
|
||||
import multiprocessing
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
from multiprocessing import Process
|
||||
from faster_fifo import Queue
|
||||
from multiprocessing import Process, Queue
|
||||
from mockdatasource import MockDataSource
|
||||
from queue import Empty
|
||||
from typing import List
|
||||
|
@ -22,8 +21,7 @@ TABLE_COUNT = 1000
|
|||
QUEUE_SIZE = 1000000
|
||||
MAX_BATCH_SIZE = 3000
|
||||
|
||||
read_processes = []
|
||||
write_processes = []
|
||||
_DONE_MESSAGE = '__DONE__'
|
||||
|
||||
|
||||
def get_connection():
|
||||
|
@ -44,41 +42,64 @@ def get_connection():
|
|||
|
||||
# ANCHOR: read
|
||||
|
||||
def run_read_task(task_id: int, task_queues: List[Queue]):
|
||||
def run_read_task(task_id: int, task_queues: List[Queue], infinity):
|
||||
table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
|
||||
data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
|
||||
data_source = MockDataSource(f"tb{task_id}", table_count_per_task, infinity)
|
||||
try:
|
||||
for batch in data_source:
|
||||
if isinstance(batch, tuple):
|
||||
batch = [batch]
|
||||
for table_id, rows in batch:
|
||||
# hash data to different queue
|
||||
i = table_id % len(task_queues)
|
||||
# block putting forever when the queue is full
|
||||
task_queues[i].put_many(rows, block=True, timeout=-1)
|
||||
for row in rows:
|
||||
task_queues[i].put(row)
|
||||
if not infinity:
|
||||
for queue in task_queues:
|
||||
queue.put(_DONE_MESSAGE)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
logging.info('read task over')
|
||||
|
||||
|
||||
# ANCHOR_END: read
|
||||
|
||||
|
||||
# ANCHOR: write
|
||||
def run_write_task(task_id: int, queue: Queue):
|
||||
def run_write_task(task_id: int, queue: Queue, done_queue: Queue):
|
||||
from sql_writer import SQLWriter
|
||||
log = logging.getLogger(f"WriteTask-{task_id}")
|
||||
writer = SQLWriter(get_connection)
|
||||
lines = None
|
||||
try:
|
||||
while True:
|
||||
over = False
|
||||
lines = []
|
||||
for _ in range(MAX_BATCH_SIZE):
|
||||
try:
|
||||
# get as many as possible
|
||||
lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE)
|
||||
writer.process_lines(lines)
|
||||
line = queue.get_nowait()
|
||||
if line == _DONE_MESSAGE:
|
||||
over = True
|
||||
break
|
||||
if line:
|
||||
lines.append(line)
|
||||
except Empty:
|
||||
time.sleep(0.01)
|
||||
time.sleep(0.1)
|
||||
if len(lines) > 0:
|
||||
writer.process_lines(lines)
|
||||
if over:
|
||||
done_queue.put(_DONE_MESSAGE)
|
||||
break
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except BaseException as e:
|
||||
log.debug(f"lines={lines}")
|
||||
raise e
|
||||
finally:
|
||||
writer.close()
|
||||
log.debug('write task over')
|
||||
|
||||
|
||||
# ANCHOR_END: write
|
||||
|
@ -103,13 +124,11 @@ def set_global_config():
|
|||
|
||||
|
||||
# ANCHOR: monitor
|
||||
def run_monitor_process():
|
||||
def run_monitor_process(done_queue: Queue):
|
||||
log = logging.getLogger("DataBaseMonitor")
|
||||
conn = None
|
||||
try:
|
||||
conn = get_connection()
|
||||
conn.execute("DROP DATABASE IF EXISTS test")
|
||||
conn.execute("CREATE DATABASE test")
|
||||
conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
|
||||
"TAGS (location BINARY(64), groupId INT)")
|
||||
|
||||
def get_count():
|
||||
res = conn.query("SELECT count(*) FROM test.meters")
|
||||
|
@ -118,32 +137,51 @@ def run_monitor_process():
|
|||
|
||||
last_count = 0
|
||||
while True:
|
||||
try:
|
||||
done = done_queue.get_nowait()
|
||||
if done == _DONE_MESSAGE:
|
||||
break
|
||||
except Empty:
|
||||
pass
|
||||
time.sleep(10)
|
||||
count = get_count()
|
||||
log.info(f"count={count} speed={(count - last_count) / 10}")
|
||||
last_count = count
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# ANCHOR_END: monitor
|
||||
# ANCHOR: main
|
||||
def main():
|
||||
def main(infinity):
|
||||
set_global_config()
|
||||
logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
|
||||
f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
|
||||
|
||||
monitor_process = Process(target=run_monitor_process)
|
||||
conn = get_connection()
|
||||
conn.execute("DROP DATABASE IF EXISTS test")
|
||||
conn.execute("CREATE DATABASE IF NOT EXISTS test")
|
||||
conn.execute("CREATE STABLE IF NOT EXISTS test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
|
||||
"TAGS (location BINARY(64), groupId INT)")
|
||||
conn.close()
|
||||
|
||||
done_queue = Queue()
|
||||
monitor_process = Process(target=run_monitor_process, args=(done_queue,))
|
||||
monitor_process.start()
|
||||
time.sleep(3) # waiting for database ready.
|
||||
logging.debug(f"monitor task started with pid {monitor_process.pid}")
|
||||
|
||||
task_queues: List[Queue] = []
|
||||
write_processes = []
|
||||
read_processes = []
|
||||
|
||||
# create task queues
|
||||
for i in range(WRITE_TASK_COUNT):
|
||||
queue = Queue(max_size_bytes=QUEUE_SIZE)
|
||||
queue = Queue()
|
||||
task_queues.append(queue)
|
||||
|
||||
# create write processes
|
||||
for i in range(WRITE_TASK_COUNT):
|
||||
p = Process(target=run_write_task, args=(i, task_queues[i]))
|
||||
p = Process(target=run_write_task, args=(i, task_queues[i], done_queue))
|
||||
p.start()
|
||||
logging.debug(f"WriteTask-{i} started with pid {p.pid}")
|
||||
write_processes.append(p)
|
||||
|
@ -151,13 +189,19 @@ def main():
|
|||
# create read processes
|
||||
for i in range(READ_TASK_COUNT):
|
||||
queues = assign_queues(i, task_queues)
|
||||
p = Process(target=run_read_task, args=(i, queues))
|
||||
p = Process(target=run_read_task, args=(i, queues, infinity))
|
||||
p.start()
|
||||
logging.debug(f"ReadTask-{i} started with pid {p.pid}")
|
||||
read_processes.append(p)
|
||||
|
||||
try:
|
||||
monitor_process.join()
|
||||
for p in read_processes:
|
||||
p.join()
|
||||
for p in write_processes:
|
||||
p.join()
|
||||
time.sleep(1)
|
||||
return
|
||||
except KeyboardInterrupt:
|
||||
monitor_process.terminate()
|
||||
[p.terminate() for p in read_processes]
|
||||
|
@ -176,5 +220,6 @@ def assign_queues(read_task_id, task_queues):
|
|||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
multiprocessing.set_start_method('spawn')
|
||||
main(False)
|
||||
# ANCHOR_END: main
|
||||
|
|
|
@ -26,7 +26,8 @@ class Consumer(object):
|
|||
'bath_consume': True,
|
||||
'batch_size': 1000,
|
||||
'async_model': True,
|
||||
'workers': 10
|
||||
'workers': 10,
|
||||
'testing': False
|
||||
}
|
||||
|
||||
LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
|
||||
|
@ -46,6 +47,7 @@ class Consumer(object):
|
|||
def __init__(self, **configs):
|
||||
self.config: dict = self.DEFAULT_CONFIGS
|
||||
self.config.update(configs)
|
||||
if not self.config.get('testing'):
|
||||
self.consumer = KafkaConsumer(
|
||||
self.config.get('kafka_topic'), # topic
|
||||
bootstrap_servers=self.config.get('kafka_brokers'),
|
||||
|
@ -60,7 +62,7 @@ class Consumer(object):
|
|||
)
|
||||
if self.config.get('async_model'):
|
||||
self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))
|
||||
self.tasks: list[Future] = []
|
||||
self.tasks = []
|
||||
# tags and table mapping # key: {location}_{groupId} value:
|
||||
self.tag_table_mapping = {}
|
||||
i = 0
|
||||
|
@ -115,14 +117,14 @@ class Consumer(object):
|
|||
if self.taos is not None:
|
||||
self.taos.close()
|
||||
|
||||
def _run(self, f: Callable[[ConsumerRecord], bool]):
|
||||
def _run(self, f):
|
||||
for message in self.consumer:
|
||||
if self.config.get('async_model'):
|
||||
self.pool.submit(f(message))
|
||||
else:
|
||||
f(message)
|
||||
|
||||
def _run_batch(self, f: Callable[[list[list[ConsumerRecord]]], None]):
|
||||
def _run_batch(self, f):
|
||||
while True:
|
||||
messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size'))
|
||||
if messages:
|
||||
|
@ -140,7 +142,7 @@ class Consumer(object):
|
|||
logging.info('## insert sql %s', sql)
|
||||
return self.taos.execute(sql=sql) == 1
|
||||
|
||||
def _to_taos_batch(self, messages: list[list[ConsumerRecord]]):
|
||||
def _to_taos_batch(self, messages):
|
||||
sql = self._build_sql_batch(messages=messages)
|
||||
if len(sql) == 0: # decode error, skip
|
||||
return
|
||||
|
@ -162,7 +164,7 @@ class Consumer(object):
|
|||
table_name = self._get_table_name(location=location, group_id=group_id)
|
||||
return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)
|
||||
|
||||
def _build_sql_batch(self, messages: list[list[ConsumerRecord]]) -> str:
|
||||
def _build_sql_batch(self, messages) -> str:
|
||||
sql_list = []
|
||||
for partition_messages in messages:
|
||||
for message in partition_messages:
|
||||
|
@ -186,7 +188,54 @@ def _get_location_and_group(key: str) -> (str, int):
|
|||
return fields[0], fields[1]
|
||||
|
||||
|
||||
def test_to_taos(consumer: Consumer):
|
||||
msg = {
|
||||
'location': 'California.SanFrancisco',
|
||||
'groupId': 1,
|
||||
'ts': '2022-12-06 15:13:38.643',
|
||||
'current': 3.41,
|
||||
'voltage': 105,
|
||||
'phase': 0.02027,
|
||||
}
|
||||
record = ConsumerRecord(checksum=None, headers=None, offset=1, key=None, value=json.dumps(msg), partition=1,
|
||||
topic='test', serialized_key_size=None, serialized_header_size=None,
|
||||
serialized_value_size=None, timestamp=time.time(), timestamp_type=None)
|
||||
assert consumer._to_taos(message=record)
|
||||
|
||||
|
||||
def test_to_taos_batch(consumer: Consumer):
|
||||
records = [
|
||||
[
|
||||
ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
|
||||
value=json.dumps({'location': 'California.SanFrancisco',
|
||||
'groupId': 1,
|
||||
'ts': '2022-12-06 15:13:38.643',
|
||||
'current': 3.41,
|
||||
'voltage': 105,
|
||||
'phase': 0.02027, }),
|
||||
partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
|
||||
serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
|
||||
ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
|
||||
value=json.dumps({'location': 'California.LosAngles',
|
||||
'groupId': 2,
|
||||
'ts': '2022-12-06 15:13:39.643',
|
||||
'current': 3.41,
|
||||
'voltage': 102,
|
||||
'phase': 0.02027, }),
|
||||
partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
|
||||
serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
|
||||
]
|
||||
]
|
||||
|
||||
consumer._to_taos_batch(messages=records)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
consumer = Consumer(async_model=True)
|
||||
consumer = Consumer(async_model=True, testing=True)
|
||||
# init env
|
||||
consumer.init_env()
|
||||
consumer.consume()
|
||||
# consumer.consume()
|
||||
# test build sql
|
||||
# test build sql batch
|
||||
test_to_taos(consumer)
|
||||
test_to_taos_batch(consumer)
|
||||
|
|
|
@ -10,13 +10,14 @@ class MockDataSource:
|
|||
"9.4,118,0.141,California.SanFrancisco,4"
|
||||
]
|
||||
|
||||
def __init__(self, tb_name_prefix, table_count):
|
||||
def __init__(self, tb_name_prefix, table_count, infinity=True):
|
||||
self.table_name_prefix = tb_name_prefix + "_"
|
||||
self.table_count = table_count
|
||||
self.max_rows = 10000000
|
||||
self.current_ts = round(time.time() * 1000) - self.max_rows * 100
|
||||
# [(tableId, tableName, values),]
|
||||
self.data = self._init_data()
|
||||
self.infinity = infinity
|
||||
|
||||
def _init_data(self):
|
||||
lines = self.samples * (self.table_count // 5 + 1)
|
||||
|
@ -28,6 +29,9 @@ class MockDataSource:
|
|||
|
||||
def __iter__(self):
|
||||
self.row = 0
|
||||
if not self.infinity:
|
||||
return iter(self._iter_data())
|
||||
else:
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
|
@ -35,7 +39,9 @@ class MockDataSource:
|
|||
next 1000 rows for each table.
|
||||
return: {tableId:[row,...]}
|
||||
"""
|
||||
# generate 1000 timestamps
|
||||
return self._iter_data()
|
||||
|
||||
def _iter_data(self):
|
||||
ts = []
|
||||
for _ in range(1000):
|
||||
self.current_ts += 100
|
||||
|
@ -47,3 +53,9 @@ class MockDataSource:
|
|||
rows = [table_name + ',' + t + ',' + values for t in ts]
|
||||
result.append((table_id, rows))
|
||||
return result
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
datasource = MockDataSource('t', 10, False)
|
||||
for data in datasource:
|
||||
print(data)
|
||||
|
|
|
@ -10,6 +10,7 @@ class SQLWriter:
|
|||
self._tb_tags = {}
|
||||
self._conn = get_connection_func()
|
||||
self._max_sql_length = self.get_max_sql_length()
|
||||
self._conn.execute("create database if not exists test")
|
||||
self._conn.execute("USE test")
|
||||
|
||||
def get_max_sql_length(self):
|
||||
|
@ -20,7 +21,7 @@ class SQLWriter:
|
|||
return int(r[1])
|
||||
return 1024 * 1024
|
||||
|
||||
def process_lines(self, lines: str):
|
||||
def process_lines(self, lines: [str]):
|
||||
"""
|
||||
:param lines: [[tbName,ts,current,voltage,phase,location,groupId]]
|
||||
"""
|
||||
|
@ -60,6 +61,7 @@ class SQLWriter:
|
|||
buf.append(q)
|
||||
sql_len += len(q)
|
||||
sql += " ".join(buf)
|
||||
self.create_tables()
|
||||
self.execute_sql(sql)
|
||||
self._tb_values.clear()
|
||||
|
||||
|
@ -88,3 +90,22 @@ class SQLWriter:
|
|||
except BaseException as e:
|
||||
self.log.error("Execute SQL: %s", sql)
|
||||
raise e
|
||||
|
||||
def close(self):
|
||||
if self._conn:
|
||||
self._conn.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
def get_connection_func():
|
||||
conn = taos.connect()
|
||||
return conn
|
||||
|
||||
|
||||
writer = SQLWriter(get_connection_func=get_connection_func)
|
||||
writer.execute_sql(
|
||||
"create stable if not exists meters (ts timestamp, current float, voltage int, phase float) "
|
||||
"tags (location binary(64), groupId int)")
|
||||
writer.execute_sql(
|
||||
"INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) "
|
||||
"VALUES ('2021-07-13 14:06:32.272', 10.2, 219, 0.32)")
|
||||
|
|
|
@ -19,6 +19,12 @@ def init_tmq_env(db, topic):
|
|||
conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')")
|
||||
|
||||
|
||||
def cleanup(db, topic):
|
||||
conn = taos.connect()
|
||||
conn.execute("drop topic if exists {}".format(topic))
|
||||
conn.execute("drop database if exists {}".format(db))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
init_tmq_env("tmq_test", "tmq_test_topic") # init env
|
||||
consumer = Consumer(
|
||||
|
@ -33,9 +39,9 @@ if __name__ == '__main__':
|
|||
|
||||
try:
|
||||
while True:
|
||||
res = consumer.poll(100)
|
||||
res = consumer.poll(1)
|
||||
if not res:
|
||||
continue
|
||||
break
|
||||
err = res.error()
|
||||
if err is not None:
|
||||
raise err
|
||||
|
@ -46,3 +52,4 @@ if __name__ == '__main__':
|
|||
finally:
|
||||
consumer.unsubscribe()
|
||||
consumer.close()
|
||||
cleanup("tmq_test", "tmq_test_topic")
|
||||
|
|
|
@ -44,4 +44,43 @@ taos -s "drop database test"
|
|||
python3 json_protocol_example.py
|
||||
|
||||
# 10
|
||||
# python3 subscribe_demo.py
|
||||
pip install SQLAlchemy
|
||||
pip install pandas
|
||||
taosBenchmark -y -d power -t 10 -n 10
|
||||
python3 conn_native_pandas.py
|
||||
python3 conn_rest_pandas.py
|
||||
taos -s "drop database if exists power"
|
||||
|
||||
# 11
|
||||
taos -s "create database if not exists test"
|
||||
python3 connect_native_reference.py
|
||||
|
||||
# 12
|
||||
python3 connect_rest_examples.py
|
||||
|
||||
# 13
|
||||
python3 handle_exception.py
|
||||
|
||||
# 14
|
||||
taosBenchmark -y -d power -t 2 -n 10
|
||||
python3 rest_client_example.py
|
||||
taos -s "drop database if exists power"
|
||||
|
||||
# 15
|
||||
python3 result_set_examples.py
|
||||
|
||||
# 16
|
||||
python3 tmq_example.py
|
||||
|
||||
# 17
|
||||
python3 sql_writer.py
|
||||
|
||||
# 18
|
||||
python3 mockdatasource.py
|
||||
|
||||
# 19
|
||||
python3 fast_write_example.py
|
||||
|
||||
# 20
|
||||
pip3 install kafka-python
|
||||
python3 kafka_example.py
|
||||
|
|
Loading…
Reference in New Issue