diff --git a/docs/en/07-develop/03-insert-data/_py_kafka.mdx b/docs/en/07-develop/03-insert-data/_py_kafka.mdx
index dc43a0d415..c71821dad1 100644
--- a/docs/en/07-develop/03-insert-data/_py_kafka.mdx
+++ b/docs/en/07-develop/03-insert-data/_py_kafka.mdx
@@ -53,8 +53,69 @@ for p in ps:
 In addition to Python's built-in multithreading and multiprocessing libraries, we can also use the third-party library gunicorn.

 ### Examples
+
+<details>
+<summary>kafka_example_perform</summary>
+
+`kafka_example_perform` is the entry point of the examples.

 ```py
-{{#include docs/examples/python/kafka_example.py}}
+{{#include docs/examples/python/kafka_example_perform.py}}
 ```
+
+</details>
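+
+The overall flow driven by `kafka_example_perform.py` is: produce test data into Kafka, create the database and sub-tables, and then consume the data into TDengine. The sketch below is not part of the example files; it only illustrates that flow with the example's default parameter values:
+
+```py
+import kafka_example_common as common
+import kafka_example_consumer as consumer
+import kafka_example_producer as producer
+
+# 1. produce 100 tables x 20,000 rows = 2,000,000 test messages into Kafka
+producer.produce_total(100, 'localhost:9092', 'tdengine-kafka-practices', 'line', 2_000_000, 100)
+
+# 2. create the database and the 100 sub-tables in TDengine
+common.create_database_and_tables(host='localhost', port=6030, user='root', password='taosdata',
+                                  db='tdengine_kafka_practices', table_count=100)
+
+# 3. consume from Kafka and insert into TDengine with a single worker thread
+consumer.consume(kafka_brokers='localhost:9092', kafka_topic='tdengine-kafka-practices',
+                 kafka_group_id='kafka_practices', taos_host='localhost', taos_port=6030,
+                 taos_user='root', taos_password='taosdata', taos_database='tdengine_kafka_practices',
+                 message_type='line', max_poll=100, workers=1)
+```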
+
+<details>
+<summary>kafka_example_common</summary>
+
+`kafka_example_common` is the common code of the examples.
+
+```py
+{{#include docs/examples/python/kafka_example_common.py}}
+```
+
+</details>
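+
+`kafka_example_common` mainly prepares the target schema. The following sketch shows roughly what `create_database_and_tables()` executes; the database name and tag values are illustrative, and the real function also drops any existing database and creates one sub-table per `table_count`:
+
+```py
+import taos
+
+conn = taos.connect(host='localhost', port=6030, user='root', password='taosdata')
+conn.execute('create database if not exists power keep 365 duration 10 buffer 16 wal_level 1')
+conn.execute('use power')
+conn.execute('create stable meters (ts timestamp, current float, voltage int, phase float) '
+             'tags (location binary(64), groupId int)')
+# every sub-table d0, d1, ... is bound to a (location, groupId) tag pair
+conn.execute("create table if not exists d0 using meters tags ('California.SanFrancisco', 1)")
+conn.close()
+```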
+
+<details>
+<summary>kafka_example_producer</summary>
+
+`kafka_example_producer` is the producer, responsible for generating test data and sending it to Kafka.
+
+```py
+{{#include docs/examples/python/kafka_example_producer.py}}
+```
+
+</details>
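+
+The producer supports two message formats, selected with `-message-type`. The sketch below uses illustrative values and is not part of the example files; it shows what a single message looks like in each format and how it could be sent with `kafka-python`:
+
+```py
+import json
+from kafka import KafkaProducer
+
+producer = KafkaProducer(bootstrap_servers='localhost:9092')
+
+# 'json' format: one JSON object per message; the consumer routes it by 'table_name'
+json_msg = json.dumps({'ts': '2023-01-01 00:00:00.001', 'current': 3.49, 'voltage': 109,
+                       'phase': 0.02737, 'location': 'California.SanFrancisco',
+                       'groupId': 1, 'table_name': 'd0'})
+
+# 'line' format: the sub-table name plus a values clause, ready to be spliced into an INSERT
+line_msg = "d0 values('2023-01-01 00:00:00.001', 3.49, 109, 0.02737)"
+
+for msg in (json_msg, line_msg):
+    producer.send('tdengine-kafka-practices', value=msg.encode('utf-8'))
+producer.flush()
+producer.close()
+```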
+
+<details>
+<summary>kafka_example_consumer</summary>
+
+`kafka_example_consumer` is the consumer, responsible for consuming data from Kafka and writing it to TDengine.
+
+```py
+{{#include docs/examples/python/kafka_example_consumer.py}}
+```
+
+</details>
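+
+At its core, the consumer polls a batch of records from Kafka and turns them into one multi-row `INSERT` statement. The following is a minimal sketch of the `line` path, assuming the database and sub-tables already exist and using the example's default connection values:
+
+```py
+import taos
+from kafka import KafkaConsumer
+
+conn = taos.connect(host='localhost', port=6030, user='root', password='taosdata',
+                    db='tdengine_kafka_practices')
+kafka_consumer = KafkaConsumer('tdengine-kafka-practices',
+                               bootstrap_servers='localhost:9092', group_id='kafka_practices')
+
+while True:
+    # poll() returns a dict of {TopicPartition: [ConsumerRecord, ...]}
+    messages = kafka_consumer.poll(timeout_ms=100, max_records=1000)
+    if not messages:
+        continue
+    lines = [record.value.decode() for records in messages.values() for record in records]
+    # batch all polled rows into one multi-table INSERT
+    conn.execute('insert into ' + ' '.join(lines))
+```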
+
+### Execute the Python examples
+
+<details>
+<summary>Execute the Python examples</summary>
+
+1. Install and start up Kafka.
+2. Install Python 3 and pip.
+3. Install `taospy` with pip.
+4. Install `kafka-python` with pip.
+5. Run the example.
+
+The entry point of the example is `kafka_example_perform.py`. To see all of the supported parameters, run it with `--help`:
+
+```
+python3 kafka_example_perform.py --help
+```
+
+For example, the following command creates 100 sub-tables, inserts 20,000 rows into each sub-table, sets the Kafka max poll to 100 records, and uses 1 process with 1 worker thread:
+
+```
+python3 kafka_example_perform.py -table-count=100 -table-items=20000 -max-poll=100 -threads=1 -processes=1
+```
+
+</details>
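+
+After the run finishes, the reported row count can be cross-checked in TDengine, for example with a query such as the following (a sketch assuming the default database name `tdengine_kafka_practices`):
+
+```py
+import taos
+
+conn = taos.connect(host='localhost', port=6030, user='root', password='taosdata')
+result = conn.query('select count(*) from tdengine_kafka_practices.meters')
+# for -table-count=100 -table-items=20000 this should report 2,000,000 rows
+print(result.fetch_all())
+conn.close()
+```
+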
diff --git a/docs/examples/python/kafka_example.py b/docs/examples/python/kafka_example.py deleted file mode 100644 index 5b81706ef7..0000000000 --- a/docs/examples/python/kafka_example.py +++ /dev/null @@ -1,241 +0,0 @@ -#! encoding = utf-8 -import json -import time -from json import JSONDecodeError -from typing import Callable -import logging -from concurrent.futures import ThreadPoolExecutor, Future - -import taos -from kafka import KafkaConsumer -from kafka.consumer.fetcher import ConsumerRecord - - -class Consumer(object): - DEFAULT_CONFIGS = { - 'kafka_brokers': 'localhost:9092', - 'kafka_topic': 'python_kafka', - 'kafka_group_id': 'taos', - 'taos_host': 'localhost', - 'taos_user': 'root', - 'taos_password': 'taosdata', - 'taos_database': 'power', - 'taos_port': 6030, - 'timezone': None, - 'clean_after_testing': False, - 'bath_consume': True, - 'batch_size': 1000, - 'async_model': True, - 'workers': 10, - 'testing': False - } - - LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose', - 'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale', - 'California.SantaClara', 'California.Cupertino'] - - CREATE_DATABASE_SQL = 'create database if not exists {} keep 365 duration 10 buffer 16 wal_level 1' - USE_DATABASE_SQL = 'use {}' - DROP_TABLE_SQL = 'drop table if exists meters' - DROP_DATABASE_SQL = 'drop database if exists {}' - CREATE_STABLE_SQL = 'create stable meters (ts timestamp, current float, voltage int, phase float) ' \ - 'tags (location binary(64), groupId int)' - CREATE_TABLE_SQL = 'create table if not exists {} using meters tags (\'{}\', {})' - INSERT_SQL_HEADER = "insert into " - INSERT_PART_SQL = 'power.{} values (\'{}\', {}, {}, {})' - - def __init__(self, **configs): - self.config: dict = self.DEFAULT_CONFIGS - self.config.update(configs) - if not self.config.get('testing'): - self.consumer = KafkaConsumer( - self.config.get('kafka_topic'), # topic - bootstrap_servers=self.config.get('kafka_brokers'), - group_id=self.config.get('kafka_group_id'), - ) - self.taos = taos.connect( - host=self.config.get('taos_host'), - user=self.config.get('taos_user'), - password=self.config.get('taos_password'), - port=self.config.get('taos_port'), - timezone=self.config.get('timezone'), - ) - if self.config.get('async_model'): - self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers')) - self.tasks = [] - # tags and table mapping # key: {location}_{groupId} value: - self.tag_table_mapping = {} - i = 0 - for location in self.LOCATIONS: - for j in range(1, 11): - table_name = 'd{}'.format(i) - self._cache_table(location=location, group_id=j, table_name=table_name) - i += 1 - - def init_env(self): - # create database and table - self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database'))) - self.taos.execute(self.CREATE_DATABASE_SQL.format(self.config.get('taos_database'))) - self.taos.execute(self.USE_DATABASE_SQL.format(self.config.get('taos_database'))) - self.taos.execute(self.DROP_TABLE_SQL) - self.taos.execute(self.CREATE_STABLE_SQL) - for tags, table_name in self.tag_table_mapping.items(): - location, group_id = _get_location_and_group(tags) - self.taos.execute(self.CREATE_TABLE_SQL.format(table_name, location, group_id)) - - def consume(self): - logging.warning('## start consumer topic-[%s]', self.config.get('kafka_topic')) - try: - if self.config.get('bath_consume'): - self._run_batch(self._to_taos_batch) - else: - self._run(self._to_taos) - except 
KeyboardInterrupt: - logging.warning("## caught keyboard interrupt, stopping") - finally: - self.stop() - - def stop(self): - # close consumer - if self.consumer is not None: - self.consumer.commit() - self.consumer.close() - - # multi thread - if self.config.get('async_model'): - for task in self.tasks: - while not task.done(): - pass - if self.pool is not None: - self.pool.shutdown() - - # clean data - if self.config.get('clean_after_testing'): - self.taos.execute(self.DROP_TABLE_SQL) - self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database'))) - # close taos - if self.taos is not None: - self.taos.close() - - def _run(self, f): - for message in self.consumer: - if self.config.get('async_model'): - self.pool.submit(f(message)) - else: - f(message) - - def _run_batch(self, f): - while True: - messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size')) - if messages: - if self.config.get('async_model'): - self.pool.submit(f, messages.values()) - else: - f(list(messages.values())) - if not messages: - time.sleep(0.1) - - def _to_taos(self, message: ConsumerRecord) -> bool: - sql = self.INSERT_SQL_HEADER + self._build_sql(message.value) - if len(sql) == 0: # decode error, skip - return True - logging.info('## insert sql %s', sql) - return self.taos.execute(sql=sql) == 1 - - def _to_taos_batch(self, messages): - sql = self._build_sql_batch(messages=messages) - if len(sql) == 0: # decode error, skip - return - self.taos.execute(sql=sql) - - def _build_sql(self, msg_value: str) -> str: - try: - data = json.loads(msg_value) - except JSONDecodeError as e: - logging.error('## decode message [%s] error ', msg_value, e) - return '' - location = data.get('location') - group_id = data.get('groupId') - ts = data.get('ts') - current = data.get('current') - voltage = data.get('voltage') - phase = data.get('phase') - - table_name = self._get_table_name(location=location, group_id=group_id) - return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase) - - def _build_sql_batch(self, messages) -> str: - sql_list = [] - for partition_messages in messages: - for message in partition_messages: - sql_list.append(self._build_sql(message.value)) - - return self.INSERT_SQL_HEADER + ' '.join(sql_list) - - def _cache_table(self, location: str, group_id: int, table_name: str): - self.tag_table_mapping[_tag_table_mapping_key(location=location, group_id=group_id)] = table_name - - def _get_table_name(self, location: str, group_id: int) -> str: - return self.tag_table_mapping.get(_tag_table_mapping_key(location=location, group_id=group_id)) - - -def _tag_table_mapping_key(location: str, group_id: int): - return '{}_{}'.format(location, group_id) - - -def _get_location_and_group(key: str) -> (str, int): - fields = key.split('_') - return fields[0], fields[1] - - -def test_to_taos(consumer: Consumer): - msg = { - 'location': 'California.SanFrancisco', - 'groupId': 1, - 'ts': '2022-12-06 15:13:38.643', - 'current': 3.41, - 'voltage': 105, - 'phase': 0.02027, - } - record = ConsumerRecord(checksum=None, headers=None, offset=1, key=None, value=json.dumps(msg), partition=1, - topic='test', serialized_key_size=None, serialized_header_size=None, - serialized_value_size=None, timestamp=time.time(), timestamp_type=None) - assert consumer._to_taos(message=record) - - -def test_to_taos_batch(consumer: Consumer): - records = [ - [ - ConsumerRecord(checksum=None, headers=None, offset=1, key=None, - value=json.dumps({'location': 'California.SanFrancisco', - 'groupId': 
1, - 'ts': '2022-12-06 15:13:38.643', - 'current': 3.41, - 'voltage': 105, - 'phase': 0.02027, }), - partition=1, topic='test', serialized_key_size=None, serialized_header_size=None, - serialized_value_size=None, timestamp=time.time(), timestamp_type=None), - ConsumerRecord(checksum=None, headers=None, offset=1, key=None, - value=json.dumps({'location': 'California.LosAngles', - 'groupId': 2, - 'ts': '2022-12-06 15:13:39.643', - 'current': 3.41, - 'voltage': 102, - 'phase': 0.02027, }), - partition=1, topic='test', serialized_key_size=None, serialized_header_size=None, - serialized_value_size=None, timestamp=time.time(), timestamp_type=None), - ] - ] - - consumer._to_taos_batch(messages=records) - - -if __name__ == '__main__': - consumer = Consumer(async_model=True, testing=True) - # init env - consumer.init_env() - # consumer.consume() - # test build sql - # test build sql batch - test_to_taos(consumer) - test_to_taos_batch(consumer) diff --git a/docs/examples/python/kafka_example_common.py b/docs/examples/python/kafka_example_common.py new file mode 100644 index 0000000000..566748c94e --- /dev/null +++ b/docs/examples/python/kafka_example_common.py @@ -0,0 +1,65 @@ +#! encoding = utf-8 +import taos + +LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose', + 'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale', + 'California.SantaClara', 'California.Cupertino'] + +CREATE_DATABASE_SQL = 'create database if not exists {} keep 365 duration 10 buffer 16 wal_level 1' +USE_DATABASE_SQL = 'use {}' +DROP_TABLE_SQL = 'drop table if exists meters' +DROP_DATABASE_SQL = 'drop database if exists {}' +CREATE_STABLE_SQL = 'create stable meters (ts timestamp, current float, voltage int, phase float) tags ' \ + '(location binary(64), groupId int)' +CREATE_TABLE_SQL = 'create table if not exists {} using meters tags (\'{}\', {})' + + +def create_database_and_tables(host, port, user, password, db, table_count): + tags_tables = _init_tags_table_names(table_count=table_count) + conn = taos.connect(host=host, port=port, user=user, password=password) + + conn.execute(DROP_DATABASE_SQL.format(db)) + conn.execute(CREATE_DATABASE_SQL.format(db)) + conn.execute(USE_DATABASE_SQL.format(db)) + conn.execute(DROP_TABLE_SQL) + conn.execute(CREATE_STABLE_SQL) + for tags in tags_tables: + location, group_id = _get_location_and_group(tags) + tables = tags_tables[tags] + for table_name in tables: + conn.execute(CREATE_TABLE_SQL.format(table_name, location, group_id)) + conn.close() + + +def clean(host, port, user, password, db): + conn = taos.connect(host=host, port=port, user=user, password=password) + conn.execute(DROP_DATABASE_SQL.format(db)) + conn.close() + + +def _init_tags_table_names(table_count): + tags_table_names = {} + group_id = 0 + for i in range(table_count): + table_name = 'd{}'.format(i) + location_idx = i % len(LOCATIONS) + location = LOCATIONS[location_idx] + if location_idx == 0: + group_id += 1 + if group_id > 10: + group_id -= 10 + key = _tag_table_mapping_key(location=location, group_id=group_id) + if key not in tags_table_names: + tags_table_names[key] = [] + tags_table_names[key].append(table_name) + + return tags_table_names + + +def _tag_table_mapping_key(location, group_id): + return '{}_{}'.format(location, group_id) + + +def _get_location_and_group(key): + fields = key.split('_') + return fields[0], fields[1] diff --git a/docs/examples/python/kafka_example_consumer.py 
b/docs/examples/python/kafka_example_consumer.py new file mode 100644 index 0000000000..e2d5cf535b --- /dev/null +++ b/docs/examples/python/kafka_example_consumer.py @@ -0,0 +1,231 @@ +#! encoding = utf-8 +import json +import logging +import time +from concurrent.futures import ThreadPoolExecutor, Future +from json import JSONDecodeError +from typing import Callable + +import taos +from kafka import KafkaConsumer +from kafka.consumer.fetcher import ConsumerRecord + +import kafka_example_common as common + + +class Consumer(object): + DEFAULT_CONFIGS = { + 'kafka_brokers': 'localhost:9092', # kafka broker + 'kafka_topic': 'tdengine_kafka_practices', + 'kafka_group_id': 'taos', + 'taos_host': 'localhost', # TDengine host + 'taos_port': 6030, # TDengine port + 'taos_user': 'root', # TDengine user name + 'taos_password': 'taosdata', # TDengine password + 'taos_database': 'power', # TDengine database + 'message_type': 'json', # message format, 'json' or 'line' + 'clean_after_testing': False, # if drop database after testing + 'max_poll': 1000, # poll size for batch mode + 'workers': 10, # thread count for multi-threading + 'testing': False + } + + INSERT_SQL_HEADER = "insert into " + INSERT_PART_SQL = '{} values (\'{}\', {}, {}, {})' + + def __init__(self, **configs): + self.config = self.DEFAULT_CONFIGS + self.config.update(configs) + + self.consumer = None + if not self.config.get('testing'): + self.consumer = KafkaConsumer( + self.config.get('kafka_topic'), + bootstrap_servers=self.config.get('kafka_brokers'), + group_id=self.config.get('kafka_group_id'), + ) + + self.conns = taos.connect( + host=self.config.get('taos_host'), + port=self.config.get('taos_port'), + user=self.config.get('taos_user'), + password=self.config.get('taos_password'), + db=self.config.get('taos_database'), + ) + if self.config.get('workers') > 1: + self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers')) + self.tasks = [] + # tags and table mapping # key: {location}_{groupId} value: + + def consume(self): + """ + + consume data from kafka and deal. Base on `message_type`, `bath_consume`, `insert_by_table`, + there are several deal function. + :return: + """ + self.conns.execute(common.USE_DATABASE_SQL.format(self.config.get('taos_database'))) + try: + if self.config.get('message_type') == 'line': # line + self._run(self._line_to_taos) + if self.config.get('message_type') == 'json': # json + self._run(self._json_to_taos) + except KeyboardInterrupt: + logging.warning("## caught keyboard interrupt, stopping") + finally: + self.stop() + + def stop(self): + """ + + stop consuming + :return: + """ + # close consumer + if self.consumer is not None: + self.consumer.commit() + self.consumer.close() + + # multi thread + if self.config.get('workers') > 1: + if self.pool is not None: + self.pool.shutdown() + for task in self.tasks: + while not task.done(): + time.sleep(0.01) + + # clean data + if self.config.get('clean_after_testing'): + self.conns.execute(common.DROP_TABLE_SQL) + self.conns.execute(common.DROP_DATABASE_SQL.format(self.config.get('taos_database'))) + # close taos + if self.conns is not None: + self.conns.close() + + def _run(self, f): + """ + + run in batch consuming mode + :param f: + :return: + """ + i = 0 # just for test. + while True: + messages = self.consumer.poll(timeout_ms=100, max_records=self.config.get('max_poll')) + if messages: + if self.config.get('workers') > 1: + self.pool.submit(f, messages.values()) + else: + f(list(messages.values())) + if not messages: + i += 1 # just for test. 
+ time.sleep(0.1) + if i > 3: # just for test. + logging.warning('## test over.') # just for test. + return # just for test. + + def _json_to_taos(self, messages): + """ + + convert a batch of json data to sql, and insert into TDengine + :param messages: + :return: + """ + sql = self._build_sql_from_json(messages=messages) + self.conns.execute(sql=sql) + + def _line_to_taos(self, messages): + """ + + convert a batch of lines data to sql, and insert into TDengine + :param messages: + :return: + """ + lines = [] + for partition_messages in messages: + for message in partition_messages: + lines.append(message.value.decode()) + sql = self.INSERT_SQL_HEADER + ' '.join(lines) + self.conns.execute(sql=sql) + + def _build_single_sql_from_json(self, msg_value): + try: + data = json.loads(msg_value) + except JSONDecodeError as e: + logging.error('## decode message [%s] error ', msg_value, e) + return '' + # location = data.get('location') + # group_id = data.get('groupId') + ts = data.get('ts') + current = data.get('current') + voltage = data.get('voltage') + phase = data.get('phase') + table_name = data.get('table_name') + + return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase) + + def _build_sql_from_json(self, messages): + sql_list = [] + for partition_messages in messages: + for message in partition_messages: + sql_list.append(self._build_single_sql_from_json(message.value)) + return self.INSERT_SQL_HEADER + ' '.join(sql_list) + + +def test_json_to_taos(consumer: Consumer): + records = [ + [ + ConsumerRecord(checksum=None, headers=None, offset=1, key=None, + value=json.dumps({'table_name': 'd0', + 'ts': '2022-12-06 15:13:38.643', + 'current': 3.41, + 'voltage': 105, + 'phase': 0.02027, }), + partition=1, topic='test', serialized_key_size=None, serialized_header_size=None, + serialized_value_size=None, timestamp=time.time(), timestamp_type=None), + ConsumerRecord(checksum=None, headers=None, offset=1, key=None, + value=json.dumps({'table_name': 'd1', + 'ts': '2022-12-06 15:13:39.643', + 'current': 3.41, + 'voltage': 102, + 'phase': 0.02027, }), + partition=1, topic='test', serialized_key_size=None, serialized_header_size=None, + serialized_value_size=None, timestamp=time.time(), timestamp_type=None), + ] + ] + + consumer._json_to_taos(messages=records) + + +def test_line_to_taos(consumer: Consumer): + records = [ + [ + ConsumerRecord(checksum=None, headers=None, offset=1, key=None, + value="d0 values('2023-01-01 00:00:00.001', 3.49, 109, 0.02737)".encode('utf-8'), + partition=1, topic='test', serialized_key_size=None, serialized_header_size=None, + serialized_value_size=None, timestamp=time.time(), timestamp_type=None), + ConsumerRecord(checksum=None, headers=None, offset=1, key=None, + value="d1 values('2023-01-01 00:00:00.002', 6.19, 112, 0.09171)".encode('utf-8'), + partition=1, topic='test', serialized_key_size=None, serialized_header_size=None, + serialized_value_size=None, timestamp=time.time(), timestamp_type=None), + ] + ] + consumer._line_to_taos(messages=records) + + +def consume(kafka_brokers, kafka_topic, kafka_group_id, taos_host, taos_port, taos_user, + taos_password, taos_database, message_type, max_poll, workers): + c = Consumer(kafka_brokers=kafka_brokers, kafka_topic=kafka_topic, kafka_group_id=kafka_group_id, + taos_host=taos_host, taos_port=taos_port, taos_user=taos_user, taos_password=taos_password, + taos_database=taos_database, message_type=message_type, max_poll=max_poll, workers=workers) + c.consume() + + +if __name__ == '__main__': + consumer = 
Consumer(testing=True)
+    common.create_database_and_tables(host='localhost', port=6030, user='root', password='taosdata',
+                                      db='py_kafka_test', table_count=10)
+    consumer.conns.execute(common.USE_DATABASE_SQL.format('py_kafka_test'))
+    test_json_to_taos(consumer)
+    test_line_to_taos(consumer)
+    common.clean(host='localhost', port=6030, user='root', password='taosdata', db='py_kafka_test')
diff --git a/docs/examples/python/kafka_example_perform.py b/docs/examples/python/kafka_example_perform.py
new file mode 100644
index 0000000000..23ae4b48c8
--- /dev/null
+++ b/docs/examples/python/kafka_example_perform.py
@@ -0,0 +1,103 @@
+#! encoding=utf-8
+
+import argparse
+import logging
+import multiprocessing
+import time
+from multiprocessing import pool
+
+import kafka_example_common as common
+import kafka_example_consumer as consumer
+import kafka_example_producer as producer
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-kafka-broker', type=str, default='localhost:9092',
+                        help='kafka broker host. default is `localhost:9092`')
+    parser.add_argument('-kafka-topic', type=str, default='tdengine-kafka-practices',
+                        help='kafka topic. default is `tdengine-kafka-practices`')
+    parser.add_argument('-kafka-group', type=str, default='kafka_practices',
+                        help='kafka consumer group. default is `kafka_practices`')
+    parser.add_argument('-taos-host', type=str, default='localhost',
+                        help='TDengine host. default is `localhost`')
+    parser.add_argument('-taos-port', type=int, default=6030, help='TDengine port. default is 6030')
+    parser.add_argument('-taos-user', type=str, default='root', help='TDengine username, default is `root`')
+    parser.add_argument('-taos-password', type=str, default='taosdata', help='TDengine password, default is `taosdata`')
+    parser.add_argument('-taos-db', type=str, default='tdengine_kafka_practices',
+                        help='TDengine db name, default is `tdengine_kafka_practices`')
+    parser.add_argument('-table-count', type=int, default=100, help='TDengine sub-table count, default is 100')
+    parser.add_argument('-table-items', type=int, default=1000, help='rows per sub-table, default is 1000')
+    parser.add_argument('-message-type', type=str, default='line',
+                        help='kafka message type. `line` or `json`. default is `line`')
+    parser.add_argument('-max-poll', type=int, default=1000, help='max poll records for the kafka consumer')
+    parser.add_argument('-threads', type=int, default=10, help='thread count for handling messages')
+    parser.add_argument('-processes', type=int, default=1, help='process count')
+
+    args = parser.parse_args()
+    total = args.table_count * args.table_items
+
+    logging.warning("## start to prepare testing data...")
+    prepare_data_start = time.time()
+    producer.produce_total(100, args.kafka_broker, args.kafka_topic, args.message_type, total, args.table_count)
+    prepare_data_end = time.time()
+    logging.warning("## prepare testing data finished! spend-[%s]", prepare_data_end - prepare_data_start)
+
+    logging.warning("## start to create database and tables ...")
+    create_db_start = time.time()
+    # create database and table
+    common.create_database_and_tables(host=args.taos_host, port=args.taos_port, user=args.taos_user,
+                                      password=args.taos_password, db=args.taos_db, table_count=args.table_count)
+    create_db_end = time.time()
+    logging.warning("## create database and tables finished! 
spend [%s]", create_db_end - create_db_start) + + processes = args.processes + + logging.warning("## start to consume data and insert into TDengine...") + consume_start = time.time() + if processes > 1: # multiprocess + multiprocessing.set_start_method("spawn") + pool = pool.Pool(processes) + + consume_start = time.time() + for _ in range(processes): + pool.apply_async(func=consumer.consume, args=( + args.kafka_broker, args.kafka_topic, args.kafka_group, args.taos_host, args.taos_port, args.taos_user, + args.taos_password, args.taos_db, args.message_type, args.max_poll, args.threads)) + pool.close() + pool.join() + else: + consume_start = time.time() + consumer.consume(kafka_brokers=args.kafka_broker, kafka_topic=args.kafka_topic, kafka_group_id=args.kafka_group, + taos_host=args.taos_host, taos_port=args.taos_port, taos_user=args.taos_user, + taos_password=args.taos_password, taos_database=args.taos_db, message_type=args.message_type, + max_poll=args.max_poll, workers=args.threads) + consume_end = time.time() + logging.warning("## consume data and insert into TDengine over! spend-[%s]", consume_end - consume_start) + + # print report + logging.warning( + "\n#######################\n" + " Prepare data \n" + "#######################\n" + "# data_type # %s \n" + "# total # %s \n" + "# spend # %s s\n" + "#######################\n" + " Create database \n" + "#######################\n" + "# stable # 1 \n" + "# sub-table # 100 \n" + "# spend # %s s \n" + "#######################\n" + " Consume \n" + "#######################\n" + "# data_type # %s \n" + "# threads # %s \n" + "# processes # %s \n" + "# total_count # %s \n" + "# spend # %s s\n" + "# per_second # %s \n" + "#######################\n", + args.message_type, total, prepare_data_end - prepare_data_start, create_db_end - create_db_start, + args.message_type, args.threads, processes, total, consume_end - consume_start, + total / (consume_end - consume_start)) diff --git a/docs/examples/python/kafka_example_producer.py b/docs/examples/python/kafka_example_producer.py new file mode 100644 index 0000000000..51468c7e37 --- /dev/null +++ b/docs/examples/python/kafka_example_producer.py @@ -0,0 +1,97 @@ +#! 
encoding = utf-8 +import json +import random +import threading +from concurrent.futures import ThreadPoolExecutor, Future +from datetime import datetime + +from kafka import KafkaProducer + +locations = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose', + 'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale', + 'California.SantaClara', 'California.Cupertino'] + +producers: list[KafkaProducer] = [] + +lock = threading.Lock() +start = 1640966400 + + +def produce_total(workers, broker, topic, message_type, total, table_count): + if len(producers) == 0: + lock.acquire() + if len(producers) == 0: + _init_kafka_producers(broker=broker, count=10) + lock.release() + pool = ThreadPoolExecutor(max_workers=workers) + futures = [] + for _ in range(0, workers): + futures.append(pool.submit(_produce_total, topic, message_type, int(total / workers), table_count)) + pool.shutdown() + for f in futures: + f.result() + _close_kafka_producers() + + +def _produce_total(topic, message_type, total, table_count): + producer = _get_kafka_producer() + for _ in range(total): + message = _get_fake_date(message_type=message_type, table_count=table_count) + producer.send(topic=topic, value=message.encode(encoding='utf-8')) + + +def _init_kafka_producers(broker, count): + for _ in range(count): + p = KafkaProducer(bootstrap_servers=broker, batch_size=64 * 1024, linger_ms=300, acks=0) + producers.append(p) + + +def _close_kafka_producers(): + for p in producers: + p.close() + + +def _get_kafka_producer(): + return producers[random.randint(0, len(producers) - 1)] + + +def _get_fake_date(table_count, message_type='json'): + if message_type == 'json': + return _get_json_message(table_count=table_count) + if message_type == 'line': + return _get_line_message(table_count=table_count) + return '' + + +def _get_json_message(table_count): + return json.dumps({ + 'ts': _get_timestamp(), + 'current': random.randint(0, 1000) / 100, + 'voltage': random.randint(105, 115), + 'phase': random.randint(0, 32000) / 100000, + 'location': random.choice(locations), + 'groupId': random.randint(1, 10), + 'table_name': _random_table_name(table_count) + }) + + +def _get_line_message(table_count): + return "{} values('{}', {}, {}, {})".format( + _random_table_name(table_count), # table + _get_timestamp(), # ts + random.randint(0, 1000) / 100, # current + random.randint(105, 115), # voltage + random.randint(0, 32000) / 100000, # phase + ) + + +def _random_table_name(table_count): + return 'd{}'.format(random.randint(0, table_count - 1)) + + +def _get_timestamp(): + global start + lock.acquire(blocking=True) + start += 0.001 + lock.release() + return datetime.fromtimestamp(start).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] diff --git a/docs/zh/07-develop/03-insert-data/_py_kafka.mdx b/docs/zh/07-develop/03-insert-data/_py_kafka.mdx index cd7edf557d..d656325674 100644 --- a/docs/zh/07-develop/03-insert-data/_py_kafka.mdx +++ b/docs/zh/07-develop/03-insert-data/_py_kafka.mdx @@ -55,6 +55,70 @@ for p in ps: ### 完整示例 +
+<details>
+<summary>kafka_example_perform</summary>
+
+`kafka_example_perform` is the entry point of the example program.

 ```py
-{{#include docs/examples/python/kafka_example.py}}
+{{#include docs/examples/python/kafka_example_perform.py}}
 ```
+
+</details>
+
+<details>
+<summary>kafka_example_common</summary>
+
+`kafka_example_common` is the common code of the example program.
+
+```py
+{{#include docs/examples/python/kafka_example_common.py}}
+```
+
+</details>
+
+<details>
+<summary>kafka_example_producer</summary>
+
+`kafka_example_producer` is the producer code of the example program, responsible for generating test data and sending it to Kafka.
+
+```py
+{{#include docs/examples/python/kafka_example_producer.py}}
+```
+
+</details>
+
+<details>
+<summary>kafka_example_consumer</summary>
+
+`kafka_example_consumer` is the consumer code of the example program, responsible for consuming data from Kafka and writing it to TDengine.
+
+```py
+{{#include docs/examples/python/kafka_example_consumer.py}}
+```
+
+</details>
+
+### Execution steps
+
+<details>
+<summary>Execute the Python example program</summary>
+
+1. Install and start up Kafka.
+
+2. Prepare the Python environment:
+   - Install python3
+   - Install taospy
+   - Install kafka-python
+
+3. Run the example program.
+
+The entry point of the program is `kafka_example_perform.py`. To get the full list of runtime parameters, run the help command:
+
+```
+python3 kafka_example_perform.py --help
+```
+
+The following command creates 100 sub-tables, inserts 20,000 rows into each sub-table, sets the Kafka max poll to 100 records, and uses one process with one worker thread per process:
+
+```
+python3 kafka_example_perform.py -table-count=100 -table-items=20000 -max-poll=100 -threads=1 -processes=1
+```
+
+</details>
diff --git a/tests/docs-examples-test/python.sh b/tests/docs-examples-test/python.sh index ccb391b752..31342b33d7 100644 --- a/tests/docs-examples-test/python.sh +++ b/tests/docs-examples-test/python.sh @@ -83,4 +83,5 @@ python3 fast_write_example.py # 20 pip3 install kafka-python -python3 kafka_example.py +python3 kafka_example_consumer.py +