build: update writing kafka example and document (#19864)
* build: update writing kafka example and document
* fixes kafka demo
* fix: refine wording
---------
Co-authored-by: Shuduo Sang <sangshuduo@gmail.com>
This commit is contained in:
parent
c2591b3215
commit
db6a8cdf60
@@ -53,8 +53,69 @@ for p in ps:
In addition to Python's built-in multithreading and multiprocessing libraries, we can also use the third-party library gunicorn.
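
A minimal sketch of that approach, assuming a WSGI module named `app.py` with a callable named `app`, a local TDengine reachable with the default credentials, and the same `power` schema as the examples (all of these names are assumptions, not part of the examples):

```py
# app.py -- illustrative only; module name, schema, and credentials are assumptions
import json

import taos

# each gunicorn worker process imports this module and so holds its own connection
conn = taos.connect(host="localhost", user="root", password="taosdata", database="power")


def app(environ, start_response):
    # each request body is assumed to be one JSON record, e.g.
    # {"table_name": "d0", "ts": "2023-01-01 00:00:00.001", "current": 3.49, "voltage": 109, "phase": 0.02737}
    length = int(environ.get("CONTENT_LENGTH") or 0)
    data = json.loads(environ["wsgi.input"].read(length))
    conn.execute("insert into {} values ('{}', {}, {}, {})".format(
        data["table_name"], data["ts"], data["current"], data["voltage"], data["phase"]))
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"ok"]
```

Started with, for example, `gunicorn -w 4 app:app`, gunicorn runs four worker processes, each with its own connection, giving the same kind of process-level parallelism as the `multiprocessing` example above.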

### Examples

<details>
<summary>kafka_example_perform</summary>

`kafka_example_perform` is the entry point of the examples.

```py
{{#include docs/examples/python/kafka_example_perform.py}}
```

</details>

<details>
<summary>kafka_example_common</summary>

`kafka_example_common` contains the common code shared by the examples.

```py
{{#include docs/examples/python/kafka_example_common.py}}
```

</details>

<details>
<summary>kafka_example_producer</summary>

`kafka_example_producer` is the producer, which is responsible for generating test data and sending it to Kafka.

```py
{{#include docs/examples/python/kafka_example_producer.py}}
```

</details>

<details>
<summary>kafka_example_consumer</summary>

`kafka_example_consumer` is the consumer, which is responsible for consuming data from Kafka and writing it to TDengine.

```py
{{#include docs/examples/python/kafka_example_consumer.py}}
```

</details>

### Execute the Python examples

<details>
<summary>Execute the Python examples</summary>

1. Install and start up `kafka`.
2. Install Python 3 and pip.
3. Install `taospy` with pip.
4. Install `kafka-python` with pip.
5. Run the example.

The entry point of this example is `kafka_example_perform.py`. For more information about its usage, run it with the `--help` option:

```
python3 kafka_example_perform.py --help
```

For example, the following command creates 100 sub-tables and inserts 20,000 rows into each table, with a Kafka max poll of 100, one process, and one worker thread per process:

```
python3 kafka_example_perform.py -table-count=100 -table-items=20000 -max-poll=100 -threads=1 -processes=1
```
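
For reference, the `-message-type` option switches between the two payload formats written to Kafka by `kafka_example_producer.py`. A minimal sketch of one record of each kind (field values are illustrative):

```py
import json

# 'json' mode: one JSON document per Kafka message; the consumer turns it into
# "insert into d0 values('2023-01-01 00:00:00.001', 3.49, 109, 0.02737)"
json_message = json.dumps({
    'table_name': 'd0',
    'ts': '2023-01-01 00:00:00.001',
    'current': 3.49,
    'voltage': 109,
    'phase': 0.02737,
})

# 'line' mode: one ready-made VALUES clause per message; the consumer only
# prepends "insert into " before executing the batch
line_message = "d0 values('2023-01-01 00:00:00.001', 3.49, 109, 0.02737)"
```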

</details>
@@ -1,241 +0,0 @@
#! encoding = utf-8
|
||||
import json
|
||||
import time
|
||||
from json import JSONDecodeError
|
||||
from typing import Callable
|
||||
import logging
|
||||
from concurrent.futures import ThreadPoolExecutor, Future
|
||||
|
||||
import taos
|
||||
from kafka import KafkaConsumer
|
||||
from kafka.consumer.fetcher import ConsumerRecord
|
||||
|
||||
|
||||
class Consumer(object):
|
||||
DEFAULT_CONFIGS = {
|
||||
'kafka_brokers': 'localhost:9092',
|
||||
'kafka_topic': 'python_kafka',
|
||||
'kafka_group_id': 'taos',
|
||||
'taos_host': 'localhost',
|
||||
'taos_user': 'root',
|
||||
'taos_password': 'taosdata',
|
||||
'taos_database': 'power',
|
||||
'taos_port': 6030,
|
||||
'timezone': None,
|
||||
'clean_after_testing': False,
|
||||
'bath_consume': True,
|
||||
'batch_size': 1000,
|
||||
'async_model': True,
|
||||
'workers': 10,
|
||||
'testing': False
|
||||
}
|
||||
|
||||
LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
|
||||
'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale',
|
||||
'California.SantaClara', 'California.Cupertino']
|
||||
|
||||
CREATE_DATABASE_SQL = 'create database if not exists {} keep 365 duration 10 buffer 16 wal_level 1'
|
||||
USE_DATABASE_SQL = 'use {}'
|
||||
DROP_TABLE_SQL = 'drop table if exists meters'
|
||||
DROP_DATABASE_SQL = 'drop database if exists {}'
|
||||
CREATE_STABLE_SQL = 'create stable meters (ts timestamp, current float, voltage int, phase float) ' \
|
||||
'tags (location binary(64), groupId int)'
|
||||
CREATE_TABLE_SQL = 'create table if not exists {} using meters tags (\'{}\', {})'
|
||||
INSERT_SQL_HEADER = "insert into "
|
||||
INSERT_PART_SQL = 'power.{} values (\'{}\', {}, {}, {})'
|
||||
|
||||
def __init__(self, **configs):
|
||||
self.config: dict = self.DEFAULT_CONFIGS
|
||||
self.config.update(configs)
|
||||
if not self.config.get('testing'):
|
||||
self.consumer = KafkaConsumer(
|
||||
self.config.get('kafka_topic'), # topic
|
||||
bootstrap_servers=self.config.get('kafka_brokers'),
|
||||
group_id=self.config.get('kafka_group_id'),
|
||||
)
|
||||
self.taos = taos.connect(
|
||||
host=self.config.get('taos_host'),
|
||||
user=self.config.get('taos_user'),
|
||||
password=self.config.get('taos_password'),
|
||||
port=self.config.get('taos_port'),
|
||||
timezone=self.config.get('timezone'),
|
||||
)
|
||||
if self.config.get('async_model'):
|
||||
self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))
|
||||
self.tasks = []
|
||||
# tags and table mapping # key: {location}_{groupId} value:
|
||||
self.tag_table_mapping = {}
|
||||
i = 0
|
||||
for location in self.LOCATIONS:
|
||||
for j in range(1, 11):
|
||||
table_name = 'd{}'.format(i)
|
||||
self._cache_table(location=location, group_id=j, table_name=table_name)
|
||||
i += 1
|
||||
|
||||
def init_env(self):
|
||||
# create database and table
|
||||
self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database')))
|
||||
self.taos.execute(self.CREATE_DATABASE_SQL.format(self.config.get('taos_database')))
|
||||
self.taos.execute(self.USE_DATABASE_SQL.format(self.config.get('taos_database')))
|
||||
self.taos.execute(self.DROP_TABLE_SQL)
|
||||
self.taos.execute(self.CREATE_STABLE_SQL)
|
||||
for tags, table_name in self.tag_table_mapping.items():
|
||||
location, group_id = _get_location_and_group(tags)
|
||||
self.taos.execute(self.CREATE_TABLE_SQL.format(table_name, location, group_id))
|
||||
|
||||
def consume(self):
|
||||
logging.warning('## start consumer topic-[%s]', self.config.get('kafka_topic'))
|
||||
try:
|
||||
if self.config.get('bath_consume'):
|
||||
self._run_batch(self._to_taos_batch)
|
||||
else:
|
||||
self._run(self._to_taos)
|
||||
except KeyboardInterrupt:
|
||||
logging.warning("## caught keyboard interrupt, stopping")
|
||||
finally:
|
||||
self.stop()
|
||||
|
||||
def stop(self):
|
||||
# close consumer
|
||||
if self.consumer is not None:
|
||||
self.consumer.commit()
|
||||
self.consumer.close()
|
||||
|
||||
# multi thread
|
||||
if self.config.get('async_model'):
|
||||
for task in self.tasks:
|
||||
while not task.done():
|
||||
pass
|
||||
if self.pool is not None:
|
||||
self.pool.shutdown()
|
||||
|
||||
# clean data
|
||||
if self.config.get('clean_after_testing'):
|
||||
self.taos.execute(self.DROP_TABLE_SQL)
|
||||
self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database')))
|
||||
# close taos
|
||||
if self.taos is not None:
|
||||
self.taos.close()
|
||||
|
||||
def _run(self, f):
|
||||
for message in self.consumer:
|
||||
if self.config.get('async_model'):
|
||||
self.pool.submit(f, message)
|
||||
else:
|
||||
f(message)
|
||||
|
||||
def _run_batch(self, f):
|
||||
while True:
|
||||
messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size'))
|
||||
if messages:
|
||||
if self.config.get('async_model'):
|
||||
self.pool.submit(f, messages.values())
|
||||
else:
|
||||
f(list(messages.values()))
|
||||
if not messages:
|
||||
time.sleep(0.1)
|
||||
|
||||
def _to_taos(self, message: ConsumerRecord) -> bool:
|
||||
sql = self.INSERT_SQL_HEADER + self._build_sql(message.value)
|
||||
if len(sql) == 0: # decode error, skip
|
||||
return True
|
||||
logging.info('## insert sql %s', sql)
|
||||
return self.taos.execute(sql=sql) == 1
|
||||
|
||||
def _to_taos_batch(self, messages):
|
||||
sql = self._build_sql_batch(messages=messages)
|
||||
if len(sql) == 0: # decode error, skip
|
||||
return
|
||||
self.taos.execute(sql=sql)
|
||||
|
||||
def _build_sql(self, msg_value: str) -> str:
|
||||
try:
|
||||
data = json.loads(msg_value)
|
||||
except JSONDecodeError as e:
|
||||
logging.error('## decode message [%s] error: %s', msg_value, e)
|
||||
return ''
|
||||
location = data.get('location')
|
||||
group_id = data.get('groupId')
|
||||
ts = data.get('ts')
|
||||
current = data.get('current')
|
||||
voltage = data.get('voltage')
|
||||
phase = data.get('phase')
|
||||
|
||||
table_name = self._get_table_name(location=location, group_id=group_id)
|
||||
return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)
|
||||
|
||||
def _build_sql_batch(self, messages) -> str:
|
||||
sql_list = []
|
||||
for partition_messages in messages:
|
||||
for message in partition_messages:
|
||||
sql_list.append(self._build_sql(message.value))
|
||||
|
||||
return self.INSERT_SQL_HEADER + ' '.join(sql_list)
|
||||
|
||||
def _cache_table(self, location: str, group_id: int, table_name: str):
|
||||
self.tag_table_mapping[_tag_table_mapping_key(location=location, group_id=group_id)] = table_name
|
||||
|
||||
def _get_table_name(self, location: str, group_id: int) -> str:
|
||||
return self.tag_table_mapping.get(_tag_table_mapping_key(location=location, group_id=group_id))
|
||||
|
||||
|
||||
def _tag_table_mapping_key(location: str, group_id: int):
|
||||
return '{}_{}'.format(location, group_id)
|
||||
|
||||
|
||||
def _get_location_and_group(key: str) -> (str, int):
|
||||
fields = key.split('_')
|
||||
return fields[0], fields[1]
|
||||
|
||||
|
||||
def test_to_taos(consumer: Consumer):
|
||||
msg = {
|
||||
'location': 'California.SanFrancisco',
|
||||
'groupId': 1,
|
||||
'ts': '2022-12-06 15:13:38.643',
|
||||
'current': 3.41,
|
||||
'voltage': 105,
|
||||
'phase': 0.02027,
|
||||
}
|
||||
record = ConsumerRecord(checksum=None, headers=None, offset=1, key=None, value=json.dumps(msg), partition=1,
|
||||
topic='test', serialized_key_size=None, serialized_header_size=None,
|
||||
serialized_value_size=None, timestamp=time.time(), timestamp_type=None)
|
||||
assert consumer._to_taos(message=record)
|
||||
|
||||
|
||||
def test_to_taos_batch(consumer: Consumer):
|
||||
records = [
|
||||
[
|
||||
ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
|
||||
value=json.dumps({'location': 'California.SanFrancisco',
|
||||
'groupId': 1,
|
||||
'ts': '2022-12-06 15:13:38.643',
|
||||
'current': 3.41,
|
||||
'voltage': 105,
|
||||
'phase': 0.02027, }),
|
||||
partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
|
||||
serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
|
||||
ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
|
||||
value=json.dumps({'location': 'California.LosAngles',
|
||||
'groupId': 2,
|
||||
'ts': '2022-12-06 15:13:39.643',
|
||||
'current': 3.41,
|
||||
'voltage': 102,
|
||||
'phase': 0.02027, }),
|
||||
partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
|
||||
serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
|
||||
]
|
||||
]
|
||||
|
||||
consumer._to_taos_batch(messages=records)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
consumer = Consumer(async_model=True, testing=True)
|
||||
# init env
|
||||
consumer.init_env()
|
||||
# consumer.consume()
|
||||
# test build sql
|
||||
# test build sql batch
|
||||
test_to_taos(consumer)
|
||||
test_to_taos_batch(consumer)
|
@@ -0,0 +1,65 @@
#! encoding = utf-8
import taos

LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
             'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale',
             'California.SantaClara', 'California.Cupertino']

CREATE_DATABASE_SQL = 'create database if not exists {} keep 365 duration 10 buffer 16 wal_level 1'
USE_DATABASE_SQL = 'use {}'
DROP_TABLE_SQL = 'drop table if exists meters'
DROP_DATABASE_SQL = 'drop database if exists {}'
CREATE_STABLE_SQL = 'create stable meters (ts timestamp, current float, voltage int, phase float) tags ' \
                    '(location binary(64), groupId int)'
CREATE_TABLE_SQL = 'create table if not exists {} using meters tags (\'{}\', {})'


def create_database_and_tables(host, port, user, password, db, table_count):
    tags_tables = _init_tags_table_names(table_count=table_count)
    conn = taos.connect(host=host, port=port, user=user, password=password)

    conn.execute(DROP_DATABASE_SQL.format(db))
    conn.execute(CREATE_DATABASE_SQL.format(db))
    conn.execute(USE_DATABASE_SQL.format(db))
    conn.execute(DROP_TABLE_SQL)
    conn.execute(CREATE_STABLE_SQL)
    for tags in tags_tables:
        location, group_id = _get_location_and_group(tags)
        tables = tags_tables[tags]
        for table_name in tables:
            conn.execute(CREATE_TABLE_SQL.format(table_name, location, group_id))
    conn.close()


def clean(host, port, user, password, db):
    conn = taos.connect(host=host, port=port, user=user, password=password)
    conn.execute(DROP_DATABASE_SQL.format(db))
    conn.close()
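

# Assign the `table_count` sub-table names d0..d{n-1} to tag pairs: the location
# rotates per table and groupId is incremented on each full pass over LOCATIONS
# (wrapping back after 10), so the result maps '{location}_{groupId}' keys to
# lists of table names.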
def _init_tags_table_names(table_count):
    tags_table_names = {}
    group_id = 0
    for i in range(table_count):
        table_name = 'd{}'.format(i)
        location_idx = i % len(LOCATIONS)
        location = LOCATIONS[location_idx]
        if location_idx == 0:
            group_id += 1
        if group_id > 10:
            group_id -= 10
        key = _tag_table_mapping_key(location=location, group_id=group_id)
        if key not in tags_table_names:
            tags_table_names[key] = []
        tags_table_names[key].append(table_name)

    return tags_table_names


def _tag_table_mapping_key(location, group_id):
    return '{}_{}'.format(location, group_id)


def _get_location_and_group(key):
    fields = key.split('_')
    return fields[0], fields[1]
@@ -0,0 +1,231 @@
#! encoding = utf-8
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, Future
from json import JSONDecodeError
from typing import Callable

import taos
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord

import kafka_example_common as common


class Consumer(object):
    DEFAULT_CONFIGS = {
        'kafka_brokers': 'localhost:9092',  # kafka broker
        'kafka_topic': 'tdengine_kafka_practices',
        'kafka_group_id': 'taos',
        'taos_host': 'localhost',  # TDengine host
        'taos_port': 6030,  # TDengine port
        'taos_user': 'root',  # TDengine user name
        'taos_password': 'taosdata',  # TDengine password
        'taos_database': 'power',  # TDengine database
        'message_type': 'json',  # message format, 'json' or 'line'
        'clean_after_testing': False,  # drop the database after testing
        'max_poll': 1000,  # poll size for batch mode
        'workers': 10,  # thread count for multi-threading
        'testing': False
    }

    INSERT_SQL_HEADER = "insert into "
    INSERT_PART_SQL = '{} values (\'{}\', {}, {}, {})'

    def __init__(self, **configs):
        self.config = self.DEFAULT_CONFIGS
        self.config.update(configs)

        self.consumer = None
        if not self.config.get('testing'):
            self.consumer = KafkaConsumer(
                self.config.get('kafka_topic'),
                bootstrap_servers=self.config.get('kafka_brokers'),
                group_id=self.config.get('kafka_group_id'),
            )

        self.conns = taos.connect(
            host=self.config.get('taos_host'),
            port=self.config.get('taos_port'),
            user=self.config.get('taos_user'),
            password=self.config.get('taos_password'),
            db=self.config.get('taos_database'),
        )
        if self.config.get('workers') > 1:
            self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))
            self.tasks = []
        # tags and table mapping # key: {location}_{groupId} value:

    def consume(self):
        """
        Consume data from Kafka and write it to TDengine. The handler function is
        chosen based on `message_type` ('line' or 'json').
        :return:
        """
        self.conns.execute(common.USE_DATABASE_SQL.format(self.config.get('taos_database')))
        try:
            if self.config.get('message_type') == 'line':  # line
                self._run(self._line_to_taos)
            if self.config.get('message_type') == 'json':  # json
                self._run(self._json_to_taos)
        except KeyboardInterrupt:
            logging.warning("## caught keyboard interrupt, stopping")
        finally:
            self.stop()

    def stop(self):
        """
        Stop consuming: commit and close the Kafka consumer, wait for pending tasks,
        optionally drop the test data, and close the TDengine connection.
        :return:
        """
        # close consumer
        if self.consumer is not None:
            self.consumer.commit()
            self.consumer.close()

        # multi thread
        if self.config.get('workers') > 1:
            if self.pool is not None:
                self.pool.shutdown()
            for task in self.tasks:
                while not task.done():
                    time.sleep(0.01)

        # clean data
        if self.config.get('clean_after_testing'):
            self.conns.execute(common.DROP_TABLE_SQL)
            self.conns.execute(common.DROP_DATABASE_SQL.format(self.config.get('taos_database')))
        # close taos
        if self.conns is not None:
            self.conns.close()

    def _run(self, f):
        """
        Run in batch-consuming mode: poll messages from Kafka and hand each batch
        to `f`, either on the thread pool (workers > 1) or synchronously.
        :param f:
        :return:
        """
        i = 0  # just for test.
        while True:
            messages = self.consumer.poll(timeout_ms=100, max_records=self.config.get('max_poll'))
            if messages:
                if self.config.get('workers') > 1:
                    self.pool.submit(f, messages.values())
                else:
                    f(list(messages.values()))
            if not messages:
                i += 1  # just for test.
                time.sleep(0.1)
                if i > 3:  # just for test.
                    logging.warning('## test over.')  # just for test.
                    return  # just for test.

    def _json_to_taos(self, messages):
        """
        Convert a batch of JSON messages to one SQL statement and insert it into TDengine.
        :param messages:
        :return:
        """
        sql = self._build_sql_from_json(messages=messages)
        self.conns.execute(sql=sql)

    def _line_to_taos(self, messages):
        """
        Convert a batch of line messages to one SQL statement and insert it into TDengine.
        :param messages:
        :return:
        """
        lines = []
        for partition_messages in messages:
            for message in partition_messages:
                lines.append(message.value.decode())
        sql = self.INSERT_SQL_HEADER + ' '.join(lines)
        self.conns.execute(sql=sql)

    def _build_single_sql_from_json(self, msg_value):
        try:
            data = json.loads(msg_value)
        except JSONDecodeError as e:
            logging.error('## decode message [%s] error: %s', msg_value, e)
            return ''
        # location = data.get('location')
        # group_id = data.get('groupId')
        ts = data.get('ts')
        current = data.get('current')
        voltage = data.get('voltage')
        phase = data.get('phase')
        table_name = data.get('table_name')

        return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)

    def _build_sql_from_json(self, messages):
        sql_list = []
        for partition_messages in messages:
            for message in partition_messages:
                sql_list.append(self._build_single_sql_from_json(message.value))
        return self.INSERT_SQL_HEADER + ' '.join(sql_list)


def test_json_to_taos(consumer: Consumer):
    records = [
        [
            ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
                           value=json.dumps({'table_name': 'd0',
                                             'ts': '2022-12-06 15:13:38.643',
                                             'current': 3.41,
                                             'voltage': 105,
                                             'phase': 0.02027, }),
                           partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
                           serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
            ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
                           value=json.dumps({'table_name': 'd1',
                                             'ts': '2022-12-06 15:13:39.643',
                                             'current': 3.41,
                                             'voltage': 102,
                                             'phase': 0.02027, }),
                           partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
                           serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
        ]
    ]

    consumer._json_to_taos(messages=records)


def test_line_to_taos(consumer: Consumer):
    records = [
        [
            ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
                           value="d0 values('2023-01-01 00:00:00.001', 3.49, 109, 0.02737)".encode('utf-8'),
                           partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
                           serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
            ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
                           value="d1 values('2023-01-01 00:00:00.002', 6.19, 112, 0.09171)".encode('utf-8'),
                           partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
                           serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
        ]
    ]
    consumer._line_to_taos(messages=records)
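

# Module-level entry point so that a fresh Consumer can be created and run inside a
# separate process; kafka_example_perform.py passes this function to pool.apply_async.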
def consume(kafka_brokers, kafka_topic, kafka_group_id, taos_host, taos_port, taos_user,
            taos_password, taos_database, message_type, max_poll, workers):
    c = Consumer(kafka_brokers=kafka_brokers, kafka_topic=kafka_topic, kafka_group_id=kafka_group_id,
                 taos_host=taos_host, taos_port=taos_port, taos_user=taos_user, taos_password=taos_password,
                 taos_database=taos_database, message_type=message_type, max_poll=max_poll, workers=workers)
    c.consume()


if __name__ == '__main__':
    consumer = Consumer(testing=True)
    common.create_database_and_tables(host='localhost', port=6030, user='root', password='taosdata',
                                      db='py_kafka_test', table_count=10)
    consumer.conns.execute(common.USE_DATABASE_SQL.format('py_kafka_test'))
    test_json_to_taos(consumer)
    test_line_to_taos(consumer)
    common.clean(host='localhost', port=6030, user='root', password='taosdata', db='py_kafka_test')
@@ -0,0 +1,103 @@
#! encoding=utf-8

import argparse
import logging
import multiprocessing
import time
from multiprocessing import pool

import kafka_example_common as common
import kafka_example_consumer as consumer
import kafka_example_producer as producer

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-kafka-broker', type=str, default='localhost:9092',
                        help='kafka broker host. default is `localhost:9092`')
    parser.add_argument('-kafka-topic', type=str, default='tdengine-kafka-practices',
                        help='kafka topic. default is `tdengine-kafka-practices`')
    parser.add_argument('-kafka-group', type=str, default='kafka_practices',
                        help='kafka consumer group. default is `kafka_practices`')
    parser.add_argument('-taos-host', type=str, default='localhost',
                        help='TDengine host. default is `localhost`')
    parser.add_argument('-taos-port', type=int, default=6030, help='TDengine port. default is 6030')
    parser.add_argument('-taos-user', type=str, default='root', help='TDengine username, default is `root`')
    parser.add_argument('-taos-password', type=str, default='taosdata', help='TDengine password, default is `taosdata`')
    parser.add_argument('-taos-db', type=str, default='tdengine_kafka_practices',
                        help='TDengine db name, default is `tdengine_kafka_practices`')
    parser.add_argument('-table-count', type=int, default=100, help='TDengine sub-table count, default is 100')
    parser.add_argument('-table-items', type=int, default=1000, help='rows per sub-table, default is 1000')
    parser.add_argument('-message-type', type=str, default='line',
                        help='kafka message type. `line` or `json`. default is `line`')
    parser.add_argument('-max-poll', type=int, default=1000, help='max poll for kafka consumer')
    parser.add_argument('-threads', type=int, default=10, help='thread count for handling messages')
    parser.add_argument('-processes', type=int, default=1, help='process count')

    args = parser.parse_args()
    total = args.table_count * args.table_items

    logging.warning("## start to prepare testing data...")
    prepare_data_start = time.time()
    producer.produce_total(100, args.kafka_broker, args.kafka_topic, args.message_type, total, args.table_count)
    prepare_data_end = time.time()
    logging.warning("## prepare testing data finished! spend-[%s]", prepare_data_end - prepare_data_start)

    logging.warning("## start to create database and tables ...")
    create_db_start = time.time()
    # create database and table
    common.create_database_and_tables(host=args.taos_host, port=args.taos_port, user=args.taos_user,
                                      password=args.taos_password, db=args.taos_db, table_count=args.table_count)
    create_db_end = time.time()
    logging.warning("## create database and tables finished! spend [%s]", create_db_end - create_db_start)

    processes = args.processes

    logging.warning("## start to consume data and insert into TDengine...")
    consume_start = time.time()
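
    # In multiprocess mode each worker process builds its own Kafka consumer and
    # TDengine connection inside consumer.consume(); the processes share nothing.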
    if processes > 1:  # multiprocess
        multiprocessing.set_start_method("spawn")
        pool = pool.Pool(processes)

        consume_start = time.time()
        for _ in range(processes):
            pool.apply_async(func=consumer.consume, args=(
                args.kafka_broker, args.kafka_topic, args.kafka_group, args.taos_host, args.taos_port, args.taos_user,
                args.taos_password, args.taos_db, args.message_type, args.max_poll, args.threads))
        pool.close()
        pool.join()
    else:
        consume_start = time.time()
        consumer.consume(kafka_brokers=args.kafka_broker, kafka_topic=args.kafka_topic, kafka_group_id=args.kafka_group,
                         taos_host=args.taos_host, taos_port=args.taos_port, taos_user=args.taos_user,
                         taos_password=args.taos_password, taos_database=args.taos_db, message_type=args.message_type,
                         max_poll=args.max_poll, workers=args.threads)
    consume_end = time.time()
    logging.warning("## consume data and insert into TDengine over! spend-[%s]", consume_end - consume_start)

    # print report
    logging.warning(
        "\n#######################\n"
        " Prepare data \n"
        "#######################\n"
        "# data_type # %s \n"
        "# total # %s \n"
        "# spend # %s s\n"
        "#######################\n"
        " Create database \n"
        "#######################\n"
        "# stable # 1 \n"
        "# sub-table # 100 \n"
        "# spend # %s s \n"
        "#######################\n"
        " Consume \n"
        "#######################\n"
        "# data_type # %s \n"
        "# threads # %s \n"
        "# processes # %s \n"
        "# total_count # %s \n"
        "# spend # %s s\n"
        "# per_second # %s \n"
        "#######################\n",
        args.message_type, total, prepare_data_end - prepare_data_start, create_db_end - create_db_start,
        args.message_type, args.threads, processes, total, consume_end - consume_start,
        total / (consume_end - consume_start))
@@ -0,0 +1,97 @@
#! encoding = utf-8
import json
import random
import threading
from concurrent.futures import ThreadPoolExecutor, Future
from datetime import datetime

from kafka import KafkaProducer

locations = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
             'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale',
             'California.SantaClara', 'California.Cupertino']

producers: list[KafkaProducer] = []

lock = threading.Lock()
start = 1640966400


def produce_total(workers, broker, topic, message_type, total, table_count):
    if len(producers) == 0:
        lock.acquire()
        if len(producers) == 0:
            _init_kafka_producers(broker=broker, count=10)
        lock.release()
    pool = ThreadPoolExecutor(max_workers=workers)
    futures = []
    for _ in range(0, workers):
        futures.append(pool.submit(_produce_total, topic, message_type, int(total / workers), table_count))
    pool.shutdown()
    for f in futures:
        f.result()
    _close_kafka_producers()


def _produce_total(topic, message_type, total, table_count):
    producer = _get_kafka_producer()
    for _ in range(total):
        message = _get_fake_data(message_type=message_type, table_count=table_count)
        producer.send(topic=topic, value=message.encode(encoding='utf-8'))


def _init_kafka_producers(broker, count):
    for _ in range(count):
        p = KafkaProducer(bootstrap_servers=broker, batch_size=64 * 1024, linger_ms=300, acks=0)
        producers.append(p)


def _close_kafka_producers():
    for p in producers:
        p.close()


def _get_kafka_producer():
    return producers[random.randint(0, len(producers) - 1)]


def _get_fake_data(table_count, message_type='json'):
    if message_type == 'json':
        return _get_json_message(table_count=table_count)
    if message_type == 'line':
        return _get_line_message(table_count=table_count)
    return ''


def _get_json_message(table_count):
    return json.dumps({
        'ts': _get_timestamp(),
        'current': random.randint(0, 1000) / 100,
        'voltage': random.randint(105, 115),
        'phase': random.randint(0, 32000) / 100000,
        'location': random.choice(locations),
        'groupId': random.randint(1, 10),
        'table_name': _random_table_name(table_count)
    })


def _get_line_message(table_count):
    return "{} values('{}', {}, {}, {})".format(
        _random_table_name(table_count),  # table
        _get_timestamp(),  # ts
        random.randint(0, 1000) / 100,  # current
        random.randint(105, 115),  # voltage
        random.randint(0, 32000) / 100000,  # phase
    )


def _random_table_name(table_count):
    return 'd{}'.format(random.randint(0, table_count - 1))
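

# Timestamps come from a shared counter advanced under a lock, so concurrent
# producer threads emit strictly increasing, non-duplicated millisecond timestamps.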
def _get_timestamp():
    global start
    lock.acquire(blocking=True)
    start += 0.001
    lock.release()
    return datetime.fromtimestamp(start).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
@@ -55,6 +55,70 @@ for p in ps:

### Complete examples

<details>
<summary>kafka_example_perform</summary>

`kafka_example_perform` is the entry point of the example program.

```py
{{#include docs/examples/python/kafka_example_perform.py}}
```

</details>

<details>
<summary>kafka_example_common</summary>

`kafka_example_common` contains the common code shared by the example program.

```py
{{#include docs/examples/python/kafka_example_common.py}}
```

</details>

<details>
<summary>kafka_example_producer</summary>

`kafka_example_producer` is the producer code of the example program; it generates test data and sends it to Kafka.

```py
{{#include docs/examples/python/kafka_example_producer.py}}
```

</details>

<details>
<summary>kafka_example_consumer</summary>

`kafka_example_consumer` is the consumer code of the example program; it consumes data from Kafka and writes it to TDengine.

```py
{{#include docs/examples/python/kafka_example_consumer.py}}
```

</details>

### Execution steps

<details>
<summary>Run the Python example program</summary>

1. Install and start Kafka.

2. Prepare the Python environment:
   - install Python 3
   - install taospy
   - install kafka-python

3. Run the example program.

The entry point of the program is `kafka_example_perform.py`. To see the full list of its command-line options, run the help command:

```
python3 kafka_example_perform.py --help
```

The following command creates 100 sub-tables with 20,000 rows each, using a Kafka max poll of 100, one process, and one worker thread per process:

```
python3 kafka_example_perform.py -table-count=100 -table-items=20000 -max-poll=100 -threads=1 -processes=1
```

</details>
@@ -83,4 +83,5 @@ python3 fast_write_example.py

# 20
pip3 install kafka-python
python3 kafka_example_consumer.py