add jdbc efficient writing docs

sheyanjie-qq 2025-03-25 00:03:58 +08:00
parent 83e4c563d9
commit f66dffe5db
14 changed files with 397 additions and 664 deletions


@ -51,12 +51,10 @@ For more tuning parameters, please refer to [Database Management](../../tdengine
### Scenario Design {#scenario}
The following example program demonstrates how to write data efficiently, with the scenario designed as follows:
- The TDengine client application continuously reads data from other data sources. In the example program, simulated data generation is used to mimic reading from data sources.
- The speed of a single connection writing to TDengine cannot match the speed of reading data, so the client application starts multiple threads, each establishing a connection with TDengine, and each thread has a dedicated fixed-size message queue.
- The client application hashes the received data according to the table name (or subtable name) to different threads, i.e., writing to the message queue corresponding to that thread, ensuring that data belonging to a certain table (or subtable) will always be processed by a fixed thread.
- Each sub-thread empties the data in its associated message queue or reaches a predetermined threshold of data volume, writes that batch of data to TDengine, and continues to process the data received afterwards.
The following sample program demonstrates how to efficiently write data, with the scenario designed as follows:
- The TDengine client program continuously reads data from other data sources. In the sample program, simulated data generation is used to mimic data source reading, while also providing an example of pulling data from Kafka and writing it to TDengine.
- To improve the data reading speed of the TDengine client program, multi-threading is used for reading. To avoid out-of-order issues, the sets of tables read by multiple reading threads should be non-overlapping.
- To match the data reading speed of each data reading thread, a set of write threads is launched in the background. Each write thread has an exclusive fixed-size message queue.
<figure>
<Image img={imgThread} alt="Thread model for efficient writing example"/>
@ -72,94 +70,172 @@ This sample code assumes that the source data belongs to different subtables of
<Tabs defaultValue="java" groupId="lang">
<TabItem label="Java" value="java">
**Introduction to JDBC Efficient Writing Features**
Starting from version 3.6.0, the JDBC driver provides efficient writing over WebSocket connections. For its configuration parameters, refer to [Efficient Writing Configuration](../../reference/connector/java/#properties). The JDBC driver's efficient writing feature has the following characteristics (a minimal usage sketch follows the list below):
- Supports the JDBC standard parameter binding interface.
- When resources are sufficient, writing capacity is linearly correlated with the configuration of write threads.
- Supports configuration of write timeout, number of retries, and retry interval after connection disconnection and reconnection.
- Supports invoking the executeUpdate interface to obtain the number of written records; any exception that occurs during writing can be caught.
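The following is a minimal sketch of these characteristics, assuming a reachable TDengine instance and the `test.meters` supertable used throughout this example; the `batchSizeByRow` and `cacheSizeByRow` property keys are placeholders for the efficient-writing options (see the configuration reference above for the exact names):
```java
import java.sql.*;
import java.util.Properties;

public class EfficientWriteSketch {
    public static void main(String[] args) throws SQLException {
        // e.g. jdbc:TAOS-WS://localhost:6041?user=root&password=taosdata
        String url = System.getenv("TDENGINE_JDBC_URL");
        Properties props = new Properties();
        props.setProperty("batchSizeByRow", "1000");  // placeholder key; see reference docs
        props.setProperty("cacheSizeByRow", "10000"); // placeholder key; see reference docs
        try (Connection conn = DriverManager.getConnection(url, props);
             PreparedStatement pstmt = conn.prepareStatement(
                     "INSERT INTO test.meters (tbname, ts, current, voltage, phase) VALUES (?,?,?,?,?)")) {
            for (int i = 0; i < 10000; i++) {
                pstmt.setString(1, "d_" + (i % 100)); // subtable name
                pstmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
                pstmt.setFloat(3, 10.5f);
                pstmt.setInt(4, 220);
                pstmt.setFloat(5, 0.32f);
                pstmt.addBatch();         // blocks if the backend write queue is full
                if (i % 1000 == 999) {
                    pstmt.executeBatch(); // hand a batch to the background write threads
                }
            }
            pstmt.executeBatch();
            // Per the feature list above, with this driver executeUpdate reports the
            // number of rows written and surfaces write errors as catchable exceptions.
            int rows = pstmt.executeUpdate();
            System.out.println("rows written: " + rows);
        }
    }
}
```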
**Program Listing**
| Class Name | Function Description |
| ----------------- | -------------------------------------------------------------------------------- |
| FastWriteExample | Main program |
| ReadTask | Reads data from a simulated source, hashes the table name to get the Queue Index, writes to the corresponding Queue |
| WriteTask | Retrieves data from the Queue, forms a Batch, writes to TDengine |
| MockDataSource | Simulates generating data for a certain number of meters subtables |
| SQLWriter | WriteTask relies on this class for SQL assembly, automatic table creation, SQL writing, and SQL length checks |
| StmtWriter | Implements parameter binding for batch writing (not yet completed) |
| DataBaseMonitor | Counts the writing speed and prints the current writing speed to the console every 10 seconds |
| Class Name | Functional Description |
| ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| FastWriteExample | The main program responsible for command-line argument parsing, thread pool creation, and waiting for task completion. |
| WorkTask | Reads data from a simulated source and writes it using the JDBC standard interface. |
| MockDataSource | Simulates and generates data for a certain number of `meters` child tables. |
| DataBaseMonitor | Tracks write speed and prints the current write speed to the console every 10 seconds. |
| CreateSubTableTask | Creates child tables within a specified range; invoked by the main program. |
| Meters | Provides serialization and deserialization of single records in the `meters` table, used for sending messages to Kafka and receiving messages from Kafka. |
| ProducerTask | A producer that sends messages to Kafka. |
| ConsumerTask | A consumer that receives messages from Kafka, writes data to TDengine using the JDBC efficient writing interface, and commits offsets according to progress. |
| Util | Provides basic functionalities, including creating connections, creating Kafka topics, and counting write operations. |
Below are the complete codes and more detailed function descriptions for each class.
<details>
<summary>FastWriteExample</summary>
The main program is responsible for:
1. Creating message queues
2. Starting write threads
3. Starting read threads
4. Counting the writing speed every 10 seconds
The main program exposes 4 parameters by default, which can be adjusted each time the program is started, for testing and tuning:
1. Number of read threads. Default is 1.
2. Number of write threads. Default is 3.
3. Total number of simulated tables. Default is 1,000. This will be divided evenly among the read threads. If the total number of tables is large, table creation takes longer and the initially reported write speed may be lower.
4. Maximum number of records written per batch. Default is 3,000.
Queue capacity (taskQueueCapacity) is also a performance-related parameter that can be adjusted by modifying the program. Generally speaking, the larger the queue capacity, the lower the probability of blocking on enqueue and the greater the queue's throughput, but the larger the memory usage. The default value in the sample program is already set sufficiently large.
**Introduction to Main Program Command-Line Arguments:**
```shell
-b,--batchSizeByRow <arg> Specifies the `batchSizeByRow` parameter for Efficient Writing, default is 1000
-c,--cacheSizeByRow <arg> Specifies the `cacheSizeByRow` parameter for Efficient Writing, default is 10000
-d,--dbName <arg> Specifies the database name, default is `test`
--help Prints help information
-K,--useKafka Enables Kafka mode, creating a producer to send messages and a consumer to receive messages for writing to TDengine. Otherwise, uses worker threads to subscribe to simulated data for writing.
-r,--readThreadCount <arg> Specifies the number of worker threads, default is 5. In Kafka mode, this parameter also determines the number of producer and consumer threads.
-R,--rowsPerSubTable <arg> Specifies the number of rows to write per child table, default is 100
-s,--subTableNum <arg> Specifies the total number of child tables, default is 1000000
-w,--writeThreadPerReadThread <arg> Specifies the number of write threads per worker thread, default is 5
```
**JDBC URL and Kafka Cluster Address Configuration:**
1. The JDBC URL is configured via an environment variable, for example:
```shell
export TDENGINE_JDBC_URL="jdbc:TAOS-WS://localhost:6041?user=root&password=taosdata"
```
2. The Kafka cluster address is configured via an environment variable, for example:
```shell
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
```
**Usage:**
```shell
1. Simulated data writing mode:
java -jar highVolume.jar -r 5 -w 5 -b 10000 -c 100000 -s 1000000 -R 1000
2. Kafka subscription writing mode:
java -jar highVolume.jar -r 5 -w 5 -b 10000 -c 100000 -s 1000000 -R 100 -K
```
**Responsibilities of the Main Program:**
1. Parses command-line arguments.
2. Creates child tables.
3. Creates worker threads or Kafka producers and consumers.
4. Tracks write speed.
5. Waits for writing to complete and releases resources; a compressed sketch follows.
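A compressed orchestration sketch of those responsibilities, with argument parsing, subtable creation, and Kafka mode omitted and a print statement standing in for the real WorkTask:
```java
import java.util.concurrent.*;

public class OrchestrationSketch {
    public static void main(String[] args) throws InterruptedException {
        int readThreadCount = 5; // would come from -r on the command line
        ExecutorService workers = Executors.newFixedThreadPool(readThreadCount);
        ScheduledExecutorService stat = Executors.newSingleThreadScheduledExecutor();
        // Stat task: report progress every 10 seconds (the real class is StatTask).
        stat.scheduleAtFixedRate(() -> System.out.println("report write speed here"),
                10, 10, TimeUnit.SECONDS);
        for (int i = 0; i < readThreadCount; i++) {
            final int taskId = i;
            // The real program submits WorkTask (or Kafka producer/consumer tasks).
            workers.submit(() -> System.out.println("worker " + taskId + " writing..."));
        }
        workers.shutdown();
        workers.awaitTermination(1, TimeUnit.MINUTES); // wait for writing to complete
        stat.shutdownNow();                            // then release resources
    }
}
```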
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java}}
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/FastWriteExample.java}}
```
</details>
<details>
<summary>WorkTask</summary>
The worker thread is responsible for reading data from the simulated data source. Each read task is associated with a simulated data source, which can generate data for a specific range of sub-tables. Different simulated data sources generate data for different tables.
The worker thread uses a blocking approach to invoke the JDBC standard interface `addBatch`. This means that if the corresponding efficient writing backend queue is full, the write operation will block.
```java
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/WorkTask.java}}
```
</details>
<details>
<summary>ReadTask</summary>
The read task is responsible for reading data from the data source. Each read task is associated with a simulated data source. Each simulated data source can generate data for a certain number of tables. Different simulated data sources generate data for different tables.
The read task writes to the message queue in a blocking manner. That is, once the queue is full, the write operation will be blocked.
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java}}
```
</details>
<details>
<summary>WriteTask</summary>
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java}}
```
</details>
<details>
<summary>MockDataSource</summary>
A simulated data generator that produces data for a certain range of sub-tables. To mimic real-world scenarios, it generates data in a round-robin fashion, one row per subtable.
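A tiny self-contained sketch of that round-robin order (illustrative only; the real class is included below):
```java
public class RoundRobinSketch {
    public static void main(String[] args) {
        int tableStartIndex = 0, tableEndIndex = 3, rowsPerTable = 2;
        int tableCount = tableEndIndex - tableStartIndex;
        long ts = System.currentTimeMillis();
        for (int row = 0; row < rowsPerTable * tableCount; row++) {
            int tableId = tableStartIndex + row % tableCount; // one row per subtable, in turn
            if (row > 0 && row % tableCount == 0) {
                ts++; // advance the timestamp once per full pass over the subtables
            }
            System.out.println("d_" + tableId + " ts=" + ts);
        }
    }
}
```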
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java}}
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/MockDataSource.java}}
```
</details>
<details>
<summary>CreateSubTableTask</summary>
<summary>SQLWriter</summary>
The SQLWriter class encapsulates the logic of assembling SQL and writing data. Note that none of the tables are created in advance; instead, they are created in batches, using the supertable as a template, when a table-not-found exception is caught, after which the INSERT statement is re-executed. For other exceptions, the SQL being executed at the time is simply logged; you can also log more clues to aid troubleshooting and fault recovery.
Creates sub-tables within a specified range using batched `CREATE TABLE` statements.
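A sketch of what one batched statement can look like, assuming TDengine's multi-table `CREATE TABLE ... USING ... TAGS (...)` syntax; the range and tag values are illustrative, and the real logic is in the class included below:
```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

public class BatchCreateSketch {
    // Build one CREATE TABLE statement covering [start, end) and execute it.
    static void createRange(Connection conn, int start, int end) throws SQLException {
        StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS");
        for (int i = start; i < end; i++) {
            sb.append(" d_").append(i)
              .append(" USING test.meters TAGS(").append(i % 100)
              .append(", 'location_").append(i % 10).append("')");
        }
        try (Statement stmt = conn.createStatement()) {
            stmt.execute(sb.toString()); // one round trip creates the whole range
        }
    }
}
```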
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java}}
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/CreateSubTableTask.java}}
```
</details>
<details>
<summary>Meters</summary>
<summary>DataBaseMonitor</summary>
A data model class that provides serialization and deserialization methods for sending data to Kafka and receiving data from Kafka.
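As a hedged illustration of the idea only — the actual wire format is defined by the `Meters` class included below, so the CSV layout here is an assumption:
```java
public class MetersSketch {
    // Serialize one record to a CSV line (assumed layout, not the real format).
    public static String serialize(String tbName, long ts, float current, int voltage, float phase) {
        return tbName + "," + ts + "," + current + "," + voltage + "," + phase;
    }

    // Deserialize a CSV line back into its fields.
    public static String[] deserialize(String line) {
        return line.split(",");
    }

    public static void main(String[] args) {
        String msg = serialize("d_1", System.currentTimeMillis(), 10.5f, 220, 0.32f);
        System.out.println(msg + " -> " + deserialize(msg).length + " fields");
    }
}
```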
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java}}
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Meters.java}}
```
</details>
<details>
<summary>ProducerTask</summary>
A message producer that distributes the data generated by the simulated data source across all partitions, using a hash method different from the one used by JDBC efficient writing.
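A sketch of explicit partition routing on the producer side, assuming a topic named `meters` and the partition count of 5 used elsewhere in this example; the point is only that the producer hashes by table name, which may differ from the hash used by the JDBC efficient-writing backend:
```java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                System.getenv().getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            String tableName = "d_42";
            int partition = Math.abs(tableName.hashCode()) % 5; // route by table name
            producer.send(new ProducerRecord<>("meters", partition, tableName,
                    "d_42,1699999999000,10.5,220,0.32")); // assumed CSV payload
        }
    }
}
```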
```java
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ProducerTask.java}}
```
</details>
<details>
<summary>ConsumerTask</summary>
A message consumer that receives messages from Kafka and writes them to TDengine.
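A sketch of the consume-then-commit loop, assuming a topic named `meters`, manual offset commits, and a print statement standing in for the JDBC efficient-writing calls; the real class additionally commits offsets according to write progress:
```java
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                System.getenv().getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "highvolume-sketch");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit by progress
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("meters"));
            while (true) { // loop until externally stopped
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> r : records) {
                    // Real code binds parameters and calls addBatch()/executeBatch() here.
                    System.out.println("would write: " + r.value());
                }
                if (!records.isEmpty()) {
                    consumer.commitSync(); // commit offsets only after the batch is written
                }
            }
        }
    }
}
```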
```java
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ConsumerTask.java}}
```
</details>
<details>
<summary>StatTask</summary>
Periodically counts the number of written records.
```java
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/StatTask.java}}
```
</details>
<details>
<summary>Util</summary>
A utility class that provides functions such as creating connections, creating databases, and creating topics.
```java
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Util.java}}
```
</details>
@ -169,12 +245,6 @@ The SQLWriter class encapsulates the logic of SQL stitching and data writing. No
<details>
<summary>Execute the Java Example Program</summary>
Before running the program, configure the environment variable `TDENGINE_JDBC_URL`. If the TDengine Server is deployed on the local machine, and the username, password, and port are all default values, then you can configure:
```shell
TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
**Execute the example program in a local integrated development environment**
1. Clone the TDengine repository
@ -183,268 +253,72 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
git clone git@github.com:taosdata/TDengine.git --depth 1
```
2. Open the `docs/examples/java` directory with the integrated development environment.
2. Open the `TDengine/docs/examples/JDBC/highvolume` directory with the integrated development environment.
3. Configure the environment variable `TDENGINE_JDBC_URL` in the development environment. If the global environment variable `TDENGINE_JDBC_URL` has already been configured, you can skip this step.
4. Run the class `com.taos.example.highvolume.FastWriteExample`.
4. If you want to run the Kafka example, you need to set the environment variable `KAFKA_BOOTSTRAP_SERVERS` for the Kafka cluster address.
5. Specify command-line arguments, such as `-r 3 -w 3 -b 100 -c 1000 -s 1000 -R 100`.
6. Run the class `com.taos.example.highvolume.FastWriteExample`.
**Execute the example program on a remote server**
To execute the example program on a server, follow these steps:
1. Package the example code. Execute in the directory TDengine/docs/examples/java:
1. Package the sample code. Navigate to the directory `TDengine/docs/examples/JDBC/highvolume` and run the following command to generate `highVolume.jar`:
```shell
mvn package
```
2. Create an examples directory on the remote server:
2. Copy the program to the specified directory on the server:
```shell
mkdir -p examples/java
scp -r .\target\highVolume.jar <user>@<host>:~/dest-path
```
3. Copy dependencies to the specified directory on the server:
- Copy dependency packages, only once
```shell
scp -r .\target\lib <user>@<host>:~/examples/java
```
- Copy the jar package of this program, copy every time the code is updated
```shell
scp -r .\target\javaexample-1.0.jar <user>@<host>:~/examples/java
```
4. Configure the environment variable.
3. Configure the environment variable.
Edit `~/.bash_profile` or `~/.bashrc` and add, for example, the following content:
```shell
export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
The above uses the default JDBC URL when TDengine Server is deployed locally. You need to modify it according to your actual situation.
5. Start the example program with the Java command, command template:
The above uses the default JDBC URL for a locally deployed TDengine Server. Modify it according to your actual environment.
If you want to use Kafka subscription mode, additionally configure the Kafka cluster environment variable:
```shell
java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample <read_thread_count> <write_thread_count> <total_table_count> <max_batch_size>
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
```
6. End the test program. The test program will not end automatically; after obtaining a stable writing speed under the current configuration, press <kbd>CTRL</kbd> + <kbd>C</kbd> to end the program.
Below is a log output from an actual run, with machine configuration 16 cores + 64G + SSD.
4. Start the sample program with the Java command. Use the following template (append `-K` for Kafka subscription mode):
```shell
java -jar highVolume.jar -r 5 -w 5 -b 10000 -c 100000 -s 1000000 -R 1000
```
5. Terminate the test program. The program does not exit automatically. Once a stable write speed is achieved under the current configuration, press <kbd>CTRL</kbd> + <kbd>C</kbd> to terminate it.
Below is a sample log output from an actual run on a machine with a 40-core CPU, 256GB RAM, and SSD storage.
```text
root@vm85$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 2 12
18:56:35.896 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=2, writeTaskCount=12 tableCount=1000 maxBatchSize=3000
18:56:36.011 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.015 [WriteThread-0] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.021 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.022 [WriteThread-1] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.031 [WriteThread-2] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.032 [WriteThread-2] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.041 [WriteThread-3] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.042 [WriteThread-3] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.093 [WriteThread-4] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.094 [WriteThread-4] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.099 [WriteThread-5] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.100 [WriteThread-5] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.100 [WriteThread-6] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.101 [WriteThread-6] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.103 [WriteThread-7] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.104 [WriteThread-7] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.105 [WriteThread-8] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.107 [WriteThread-8] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.108 [WriteThread-9] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.109 [WriteThread-9] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.156 [WriteThread-10] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.157 [WriteThread-11] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.158 [WriteThread-10] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.158 [ReadThread-0] INFO com.taos.example.highvolume.ReadTask - started
18:56:36.158 [ReadThread-1] INFO com.taos.example.highvolume.ReadTask - started
18:56:36.158 [WriteThread-11] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:46.369 [main] INFO c.t.e.highvolume.FastWriteExample - count=18554448 speed=1855444
18:56:56.946 [main] INFO c.t.e.highvolume.FastWriteExample - count=39059660 speed=2050521
18:57:07.322 [main] INFO c.t.e.highvolume.FastWriteExample - count=59403604 speed=2034394
18:57:18.032 [main] INFO c.t.e.highvolume.FastWriteExample - count=80262938 speed=2085933
18:57:28.432 [main] INFO c.t.e.highvolume.FastWriteExample - count=101139906 speed=2087696
18:57:38.921 [main] INFO c.t.e.highvolume.FastWriteExample - count=121807202 speed=2066729
18:57:49.375 [main] INFO c.t.e.highvolume.FastWriteExample - count=142952417 speed=2114521
18:58:00.689 [main] INFO c.t.e.highvolume.FastWriteExample - count=163650306 speed=2069788
18:58:11.646 [main] INFO c.t.e.highvolume.FastWriteExample - count=185019808 speed=2136950
---------------$ java -jar highVolume.jar -r 2 -w 10 -b 10000 -c 100000 -s 1000000 -R 100
[INFO ] 2025-03-24 18:03:17.980 com.taos.example.highvolume.FastWriteExample main 309 main readThreadCount=2, writeThreadPerReadThread=10 batchSizeByRow=10000 cacheSizeByRow=100000, subTableNum=1000000, rowsPerSubTable=100
[INFO ] 2025-03-24 18:03:17.983 com.taos.example.highvolume.FastWriteExample main 312 main create database begin.
[INFO ] 2025-03-24 18:03:34.499 com.taos.example.highvolume.FastWriteExample main 315 main create database end.
[INFO ] 2025-03-24 18:03:34.500 com.taos.example.highvolume.FastWriteExample main 317 main create sub tables start.
[INFO ] 2025-03-24 18:03:34.502 com.taos.example.highvolume.FastWriteExample createSubTables 73 main create sub table task started.
[INFO ] 2025-03-24 18:03:55.777 com.taos.example.highvolume.FastWriteExample createSubTables 82 main create sub table task finished.
[INFO ] 2025-03-24 18:03:55.778 com.taos.example.highvolume.FastWriteExample main 319 main create sub tables end.
[INFO ] 2025-03-24 18:03:55.781 com.taos.example.highvolume.WorkTask run 41 FW-work-thread-2 started
[INFO ] 2025-03-24 18:03:55.781 com.taos.example.highvolume.WorkTask run 41 FW-work-thread-1 started
[INFO ] 2025-03-24 18:04:06.580 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=12235906 speed=1223590
[INFO ] 2025-03-24 18:04:17.531 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=31185614 speed=1894970
[INFO ] 2025-03-24 18:04:28.490 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=51464904 speed=2027929
[INFO ] 2025-03-24 18:04:40.851 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=71498113 speed=2003320
[INFO ] 2025-03-24 18:04:51.948 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=91242103 speed=1974399
```
</details>
</TabItem>
<TabItem label="Python" value="python">
**Program Listing**
The Python example program uses a multi-process architecture and employs a cross-process message queue.
| Function or Class | Description |
| ------------------------ | ------------------------------------------------------------------- |
| main function | Entry point of the program, creates various subprocesses and message queues |
| run_monitor_process function | Creates database, supertables, tracks write speed and periodically prints to console |
| run_read_task function | Main logic for read processes, responsible for reading data from other data systems and distributing it to assigned queues |
| MockDataSource class | Simulates a data source, implements iterator interface, returns the next 1,000 records for each table in batches |
| run_write_task function | Main logic for write processes. Retrieves as much data as possible from the queue and writes in batches |
| SQLWriter class | Handles SQL writing and automatic table creation |
| StmtWriter class | Implements batch writing with parameter binding (not yet completed) |
<details>
<summary>main function</summary>
The main function is responsible for creating message queues and launching subprocesses, of which there are 3 types:
1. 1 monitoring process, responsible for database initialization and tracking write speed
2. n read processes, responsible for reading data from other data systems
3. m write processes, responsible for writing to the database
The main function can accept 5 startup parameters, in order:
1. Number of read tasks (processes), default is 1
2. Number of write tasks (processes), default is 1
3. Total number of simulated tables, default is 1,000
4. Queue size (in bytes), default is 1,000,000
5. Maximum number of records written per batch, default is 3,000
```python
{{#include docs/examples/python/fast_write_example.py:main}}
```
</details>
<details>
<summary>run_monitor_process</summary>
The monitoring process is responsible for initializing the database and monitoring the current write speed.
```python
{{#include docs/examples/python/fast_write_example.py:monitor}}
```
</details>
<details>
<summary>run_read_task function</summary>
The read process, responsible for reading data from other data systems and distributing it to assigned queues.
```python
{{#include docs/examples/python/fast_write_example.py:read}}
```
</details>
<details>
<summary>MockDataSource</summary>
Below is the implementation of the mock data source. We assume that each piece of data generated by the data source includes the target table name information. In practice, you might need certain rules to determine the target table name.
```python
{{#include docs/examples/python/mockdatasource.py}}
```
</details>
<details>
<summary>run_write_task function</summary>
The write process retrieves as much data as possible from the queue and writes in batches.
```python
{{#include docs/examples/python/fast_write_example.py:write}}
```
</details>
<details>
<summary>SQLWriter</summary>
The SQLWriter class encapsulates the logic of assembling SQL and writing data. None of the tables are created in advance; instead, they are created in batches, using the supertable as a template, when a table-does-not-exist error occurs, after which the INSERT statement is re-executed. For other errors, the SQL being executed at the time is logged to aid troubleshooting and fault recovery. This class also checks whether the SQL exceeds the maximum length limit; based on the TDengine 3.0 limit, the maximum supported SQL length of 1,048,576 is passed in via the input parameter maxSQLLength.
```python
{{#include docs/examples/python/sql_writer.py}}
```
</details>
**Execution Steps**
<details>
<summary>Execute the Python Example Program</summary>
1. Prerequisites
- TDengine client driver installed
- Python3 installed, recommended version >= 3.8
- taospy installed
2. Install faster-fifo to replace Python's built-in multiprocessing.Queue:
```shell
pip3 install faster-fifo
```
3. Click the "View Source" link above to copy the `fast_write_example.py`, `sql_writer.py`, and `mockdatasource.py` files.
4. Execute the example program
```shell
python3 fast_write_example.py <READ_TASK_COUNT> <WRITE_TASK_COUNT> <TABLE_COUNT> <QUEUE_SIZE> <MAX_BATCH_SIZE>
```
Below is the output of an actual run on a machine with 16 cores, 64 GB RAM, and an SSD.
```text
root@vm85$ python3 fast_write_example.py 8 8
2022-07-14 19:13:45,869 [root] - READ_TASK_COUNT=8, WRITE_TASK_COUNT=8, TABLE_COUNT=1000, QUEUE_SIZE=1000000, MAX_BATCH_SIZE=3000
2022-07-14 19:13:48,882 [root] - WriteTask-0 started with pid 718347
2022-07-14 19:13:48,883 [root] - WriteTask-1 started with pid 718348
2022-07-14 19:13:48,884 [root] - WriteTask-2 started with pid 718349
2022-07-14 19:13:48,884 [root] - WriteTask-3 started with pid 718350
2022-07-14 19:13:48,885 [root] - WriteTask-4 started with pid 718351
2022-07-14 19:13:48,885 [root] - WriteTask-5 started with pid 718352
2022-07-14 19:13:48,886 [root] - WriteTask-6 started with pid 718353
2022-07-14 19:13:48,886 [root] - WriteTask-7 started with pid 718354
2022-07-14 19:13:48,887 [root] - ReadTask-0 started with pid 718355
2022-07-14 19:13:48,888 [root] - ReadTask-1 started with pid 718356
2022-07-14 19:13:48,889 [root] - ReadTask-2 started with pid 718357
2022-07-14 19:13:48,889 [root] - ReadTask-3 started with pid 718358
2022-07-14 19:13:48,890 [root] - ReadTask-4 started with pid 718359
2022-07-14 19:13:48,891 [root] - ReadTask-5 started with pid 718361
2022-07-14 19:13:48,892 [root] - ReadTask-6 started with pid 718364
2022-07-14 19:13:48,893 [root] - ReadTask-7 started with pid 718365
2022-07-14 19:13:56,042 [DataBaseMonitor] - count=6676310 speed=667631.0
2022-07-14 19:14:06,196 [DataBaseMonitor] - count=20004310 speed=1332800.0
2022-07-14 19:14:16,366 [DataBaseMonitor] - count=32290310 speed=1228600.0
2022-07-14 19:14:26,527 [DataBaseMonitor] - count=44438310 speed=1214800.0
2022-07-14 19:14:36,673 [DataBaseMonitor] - count=56608310 speed=1217000.0
2022-07-14 19:14:46,834 [DataBaseMonitor] - count=68757310 speed=1214900.0
2022-07-14 19:14:57,280 [DataBaseMonitor] - count=80992310 speed=1223500.0
2022-07-14 19:15:07,689 [DataBaseMonitor] - count=93805310 speed=1281300.0
2022-07-14 19:15:18,020 [DataBaseMonitor] - count=106111310 speed=1230600.0
2022-07-14 19:15:28,356 [DataBaseMonitor] - count=118394310 speed=1228300.0
2022-07-14 19:15:38,690 [DataBaseMonitor] - count=130742310 speed=1234800.0
2022-07-14 19:15:49,000 [DataBaseMonitor] - count=143051310 speed=1230900.0
2022-07-14 19:15:59,323 [DataBaseMonitor] - count=155276310 speed=1222500.0
2022-07-14 19:16:09,649 [DataBaseMonitor] - count=167603310 speed=1232700.0
2022-07-14 19:16:19,995 [DataBaseMonitor] - count=179976310 speed=1237300.0
```
</details>
:::note
When using the Python connector to connect to TDengine with multiple processes, there is a limitation: connections cannot be established in the parent process; all connections must be created in the child processes.
If a connection is created in the parent process, any connection attempts in the child processes will be perpetually blocked. This is a known issue.
:::
</TabItem>
</Tabs>

Binary image file changed (before: 23 KiB, after: 558 KiB).


@ -16,7 +16,7 @@ import java.util.List;
import java.util.Properties;
class ConsumerTask implements Runnable, Stoppable {
private final static Logger logger = LoggerFactory.getLogger(ConsumerTask.class);
private static final Logger logger = LoggerFactory.getLogger(ConsumerTask.class);
private final int taskId;
private final int writeThreadCount;
private final int batchSizeByRow;
@ -25,12 +25,12 @@ class ConsumerTask implements Runnable, Stoppable {
private volatile boolean active = true;
public ConsumerTask(int taskId,
int writeThradCount,
int writeThreadCount,
int batchSizeByRow,
int cacheSizeByRow,
String dbName) {
this.taskId = taskId;
this.writeThreadCount = writeThradCount;
this.writeThreadCount = writeThreadCount;
this.batchSizeByRow = batchSizeByRow;
this.cacheSizeByRow = cacheSizeByRow;
this.dbName = dbName;
@ -38,8 +38,8 @@ class ConsumerTask implements Runnable, Stoppable {
@Override
public void run() {
logger.info("Consumer Task {} started", taskId);
// Configure the Kafka consumer properties
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Util.getKafkaBootstrapServers());
@ -109,10 +109,12 @@ class ConsumerTask implements Runnable, Stoppable {
} finally {
consumer.close();
}
logger.info("Consumer Task {} stopped", taskId);
}
public void stop() {
logger.info("stop");
logger.info("consumer task {} stopping", taskId);
this.active = false;
}
}


@ -8,7 +8,7 @@ import java.sql.SQLException;
import java.sql.Statement;
class CreateSubTableTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(CreateSubTableTask.class);
private static final Logger logger = LoggerFactory.getLogger(CreateSubTableTask.class);
private final int taskId;
private final int subTableStartIndex;
private final int subTableEndIndex;


@ -1,47 +0,0 @@
package com.taos.example.highvolume;
import java.sql.*;
/**
* Prepare target database.
* Count total records in database periodically so that we can estimate the writing speed.
*/
public class DataBaseMonitor {
private Connection conn;
private Statement stmt;
private String dbName;
public DataBaseMonitor init(String dbName) throws SQLException {
if (conn == null) {
conn = Util.getConnection();
stmt = conn.createStatement();
}
this.dbName = dbName;
return this;
}
public void close() {
try {
stmt.close();
} catch (SQLException e) {
}
try {
conn.close();
} catch (SQLException e) {
}
}
public void prepareDatabase() throws SQLException {
stmt.execute("DROP DATABASE IF EXISTS " + dbName);
stmt.execute("CREATE DATABASE IF NOT EXISTS " + dbName + " vgroups 20");
stmt.execute("use " + dbName);
stmt.execute("CREATE STABLE " + dbName + ".meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(64))");
}
public long count() throws SQLException {
try (ResultSet result = stmt.executeQuery("SELECT count(*) from " + dbName +".meters")) {
result.next();
return result.getLong(1);
}
}
}


@ -15,11 +15,10 @@ import java.util.concurrent.atomic.AtomicInteger;
public class FastWriteExample {
static final Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
static final DataBaseMonitor databaseMonitor = new DataBaseMonitor();
static ThreadPoolExecutor writerThreads;
static ThreadPoolExecutor producerThreads;
static final ThreadPoolExecutor statThread = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
static private final List<Stoppable> allTasks = new ArrayList<>();
private static final List<Stoppable> allTasks = new ArrayList<>();
private static int readThreadCount = 5;
private static int writeThreadPerReadThread = 5;
@ -190,7 +189,7 @@ public class FastWriteExample {
}
}
// Method that prints help information
// print help
private static void printHelp(Options options) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("java -jar highVolume.jar", options);
@ -198,7 +197,6 @@ public class FastWriteExample {
}
public static void main(String[] args) throws SQLException, InterruptedException {
Thread.sleep(10 * 1000);
Options options = new Options();
Option readThdcountOption = new Option("r", "readThreadCount", true, "Specify the readThreadCount, default is 5");
@ -233,6 +231,11 @@ public class FastWriteExample {
kafkaOption.setRequired(false);
options.addOption(kafkaOption);
Option helpOption = new Option(null, "help", false, "print help information");
helpOption.setRequired(false);
options.addOption(helpOption);
CommandLineParser parser = new DefaultParser();
CommandLine cmd;
@ -245,7 +248,6 @@ public class FastWriteExample {
return;
}
// Check whether help information was requested
if (cmd.hasOption("help")) {
printHelp(options);
return;
@ -307,8 +309,7 @@ public class FastWriteExample {
readThreadCount, writeThreadPerReadThread, batchSizeByRow, cacheSizeByRow, subTableNum, rowsPerSubTable);
logger.info("create database begin.");
databaseMonitor.init(dbName).prepareDatabase();
databaseMonitor.close();
Util.prepareDatabase(dbName);
logger.info("create database end.");


@ -4,12 +4,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.Random;
/**
* Generate test data
*/
class MockDataSource implements Iterator<Meters> {
private final static Logger logger = LoggerFactory.getLogger(WorkTask.class);
private final static Logger logger = LoggerFactory.getLogger(MockDataSource.class);
private final int tableStartIndex;
private final int tableEndIndex;
@ -17,6 +18,7 @@ class MockDataSource implements Iterator<Meters> {
long currentMs = System.currentTimeMillis();
private int index = 0;
private Random random;
// mock values
@ -45,7 +47,7 @@ class MockDataSource implements Iterator<Meters> {
meters.setTableName(Util.getTableNamePrefix() + currentTbId);
meters.setTs(new java.sql.Timestamp(currentMs));
meters.setCurrent((float) (Math.random() * 100));
meters.setVoltage((int) (Math.random() * 100));
meters.setVoltage(random.nextInt());
meters.setPhase((float) (Math.random() * 100));
index ++;


@ -28,7 +28,7 @@ class ProducerTask implements Runnable, Stoppable {
@Override
public void run() {
logger.info("started");
logger.info("kafak producer {}, started", taskId);
Iterator<Meters> it = new MockDataSource(subTableStartIndex, subTableEndIndex, rowsPerTable);
Properties props = new Properties();
@ -58,10 +58,11 @@ class ProducerTask implements Runnable, Stoppable {
finally {
producer.close();
}
logger.info("kafka producer {} stopped", taskId);
}
public void stop() {
logger.info("stop");
logger.info("kafka producer {} stopping", taskId);
this.active = false;
}
}


@ -3,29 +3,36 @@ package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
class StatTask implements Runnable, Stoppable {
private final static Logger logger = LoggerFactory.getLogger(StatTask.class);
private final DataBaseMonitor databaseMonitor;
private final int subTableNum;
private final String dbName;
private final Connection conn;
private final Statement stmt;
private volatile boolean active = true;
public StatTask(String dbName,
int subTableNum) throws SQLException {
this.databaseMonitor = new DataBaseMonitor().init(dbName);
this.dbName = dbName;
this.subTableNum = subTableNum;
this.conn = Util.getConnection();
this.stmt = conn.createStatement();
}
@Override
public void run() {
long lastCount = 0;
while (active) {
try {
Thread.sleep(10000);
long count = databaseMonitor.count();
long count = Util.count(stmt, dbName);
logger.info("numberOfTable={} count={} speed={}", subTableNum, count, (count - lastCount) / 10);
lastCount = count;
} catch (InterruptedException e) {
@ -37,10 +44,17 @@ class StatTask implements Runnable, Stoppable {
}
}
databaseMonitor.close();
try {
stmt.close();
conn.close();
} catch (SQLException e) {
logger.error("close connection error: ", e);
}
}
public void stop() {
active = false;
}
}


@ -2,9 +2,7 @@ package com.taos.example.highvolume;
import com.taosdata.jdbc.TSDBDriver;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.*;
import java.util.Properties;
import org.apache.kafka.clients.admin.*;
@ -42,6 +40,23 @@ public class Util {
properties.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
return DriverManager.getConnection(jdbcURL, properties);
}
public static void prepareDatabase(String dbName) throws SQLException {
try (Connection conn = Util.getConnection();
     Statement stmt = conn.createStatement()) {
stmt.execute("DROP DATABASE IF EXISTS " + dbName);
stmt.execute("CREATE DATABASE IF NOT EXISTS " + dbName + " vgroups 20");
stmt.execute("use " + dbName);
stmt.execute("CREATE STABLE " + dbName + ".meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(64))");
}
}
public static long count(Statement stmt, String dbName) throws SQLException {
try (ResultSet result = stmt.executeQuery("SELECT count(*) from " + dbName +".meters")) {
result.next();
return result.getLong(1);
}
}
public static String getKafkaBootstrapServers() {
String kafkaBootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
@ -84,4 +99,6 @@ public class Util {
public static int getPartitionCount() {
return 5;
}
}


@ -8,7 +8,7 @@ import java.sql.PreparedStatement;
import java.util.Iterator;
class WorkTask implements Runnable, Stoppable {
private final static Logger logger = LoggerFactory.getLogger(WorkTask.class);
private static final Logger logger = LoggerFactory.getLogger(WorkTask.class);
private final int taskId;
private final int writeThreadCount;
private final int batchSizeByRow;
@ -31,14 +31,14 @@ class WorkTask implements Runnable, Stoppable {
this.batchSizeByRow = batchSizeByRow;
this.cacheSizeByRow = cacheSizeByRow;
this.rowsPerTable = rowsPerTable;
this.subTableStartIndex = subTableStartIndex;
this.subTableEndIndex = subTableEndIndex;
this.subTableStartIndex = subTableStartIndex; // for this task, the start index of sub table
this.subTableEndIndex = subTableEndIndex; // for this task, the end index of sub table
this.dbName = dbName;
}
@Override
public void run() {
logger.info("started");
logger.info("task {} started", taskId);
Iterator<Meters> it = new MockDataSource(subTableStartIndex, subTableEndIndex, rowsPerTable);
try (Connection connection = Util.getConnection(batchSizeByRow, cacheSizeByRow, writeThreadCount);
PreparedStatement pstmt = connection.prepareStatement("INSERT INTO " + dbName +".meters (tbname, ts, current, voltage, phase) VALUES (?,?,?,?,?)")) {
@ -56,14 +56,19 @@ class WorkTask implements Runnable, Stoppable {
if (i % batchSizeByRow == 0) {
pstmt.executeBatch();
}
if (i % (10L * batchSizeByRow) == 0) {
pstmt.executeUpdate();
}
}
} catch (Exception e) {
logger.error("Work Task {} Error", taskId, e);
}
logger.info("task {} stopped", taskId);
}
public void stop() {
logger.info("stop");
logger.info("task {} stopping", taskId);
this.active = false;
}
}


@ -21,9 +21,9 @@ import TabItem from "@theme/TabItem";
2. Number of concurrent connections. Generally, the more connections writing data concurrently, the more efficient the writing (but beyond a certain threshold performance drops, depending on the server's processing capacity)
3. Distribution of data across different tables (or subtables), i.e., the adjacency of the data to be written. Generally, writing each batch to a single table (or subtable) is more efficient than writing to multiple tables (or subtables)
4. Write method. Generally:
- Parameter binding is more efficient than writing SQL, because it avoids SQL parsing. (However, it increases the number of C interface calls, which also carries a performance cost for the connector)
- Writing SQL without automatic table creation is more efficient than with automatic table creation, because the latter frequently checks whether tables exist
- Writing SQL is more efficient than schemaless writing, because schemaless writing automatically creates tables and supports dynamically changing table structures
- Parameter binding is more efficient than writing SQL, because it avoids SQL parsing.
- Writing SQL without automatic table creation is more efficient than with automatic table creation, because the latter frequently checks whether tables exist.
- Writing SQL is more efficient than schemaless writing, because schemaless writing automatically creates tables and supports dynamically changing table structures.
The client program should make full and appropriate use of the above factors. In a single write, write data to only one table (or subtable) whenever possible; set the number of records per batch, after testing and tuning, to a value best suited to the current system's processing capacity; and likewise set the number of concurrent writing connections, after testing and tuning, to a value best suited to the current system, in order to achieve the best write speed.
@ -54,110 +54,170 @@ import TabItem from "@theme/TabItem";
The following sample program demonstrates how to write data efficiently, with the scenario designed as follows:
- The TDengine client program continuously reads data from other data sources. In the sample program, simulated data generation is used to mimic reading from data sources
- The speed of a single connection writing to TDengine cannot match the speed of reading data, so the client program starts multiple threads, each establishing a connection to TDengine, and each thread has a dedicated fixed-size message queue
- The client program hashes received data by table name (or subtable name) to the different threads, i.e., writes it into the message queue corresponding to that thread, ensuring that data belonging to a given table (or subtable) is always processed by a fixed thread
- Each sub-thread writes the batch of data to TDengine after emptying its associated message queue or after the amount of data read reaches a predetermined threshold, and then continues processing the data received afterwards
- The TDengine client program continuously reads data from other data sources. In the sample program, simulated data generation is used to mimic reading from data sources, and an example of pulling data from Kafka and writing it to TDengine is also provided.
- To increase the speed at which the TDengine client program reads data, multi-threaded reading is used. To avoid out-of-order data, the sets of tables read by the multiple reading threads should not overlap.
- To match the reading speed of each data reading thread, a corresponding group of write threads is started in the background, and each write thread has an exclusive fixed-size message queue.
![Thread model for the efficient writing example scenario](highvolume.webp)
![Thread model for the efficient writing example scenario](highvolume.png)
### Sample Code {#code}
This section provides sample code for the above scenario. For other scenarios, the principle of efficient writing is the same, but the code needs to be modified appropriately.
This sample code assumes that the source data belongs to different subtables of the same supertable (meters). The program creates this supertable in the test database before starting to write data. Subtables are created automatically by the application according to the received data. If the actual scenario involves multiple supertables, simply modify the code for automatic table creation in the write task.
This sample code assumes that the source data belongs to different subtables of the same supertable (meters). Before starting to write data, the program has already created this supertable in the test database, as well as the corresponding subtables. If the actual scenario involves multiple supertables, simply create multiple supertables and start multiple groups of tasks as needed.
<Tabs defaultValue="java" groupId="lang">
<TabItem label="Java" value="java">
**Program Listing**
**Introduction to JDBC Efficient Writing Features**
Starting from version `3.6.0`, the JDBC driver provides efficient writing over WebSocket connections. For its configuration parameters, refer to [Efficient Writing Configuration](../../reference/connector/java/#properties). The JDBC driver's efficient writing feature has the following characteristics:
- Supports the JDBC standard parameter binding interface.
- When resources are sufficient, write capacity scales linearly with the configured number of write threads.
- Supports configuring the write timeout, as well as the number of retries and the retry interval after disconnection and reconnection.
- Supports invoking the executeUpdate interface to obtain the number of written records; any exception during writing can be caught.
| Class Name | Functional Description |
| ---------------- | --------------------------------------------------------------------------- |
| FastWriteExample | Main program |
| ReadTask | Reads data from a simulated source, hashes the table name to get the Queue index, and writes to the corresponding Queue |
| WriteTask | Retrieves data from its Queue, assembles a batch, and writes it to TDengine |
| MockDataSource | Simulates generating data for a certain number of meters subtables |
| SQLWriter | WriteTask relies on this class for SQL assembly, automatic table creation, SQL writing, and SQL length checks |
| StmtWriter | Implements parameter-binding batch writing (not yet completed) |
| DataBaseMonitor | Tracks write speed and prints the current write speed to the console every 10 seconds |
**Program Listing**
| Class Name | Functional Description |
| ------------------ | ----------------------------------------------------------------------------------------- |
| FastWriteExample | Main program; handles command-line argument parsing, thread pool creation, and waiting for task completion |
| WorkTask | Reads data from a simulated source and writes it via the JDBC standard interface |
| MockDataSource | Simulates generating data for a certain number of `meters` subtables |
| DataBaseMonitor | Tracks write speed and prints the current write speed to the console every 10 seconds |
| CreateSubTableTask | Creates subtables within a specified range; invoked by the main program |
| Meters | Provides serialization and deserialization of single `meters` records, used for sending messages to Kafka and receiving messages from Kafka |
| ProducerTask | Producer; sends messages to Kafka |
| ConsumerTask | Consumer; receives messages from Kafka, writes data to TDengine via the JDBC efficient writing interface, and commits offsets according to progress |
| Util | Provides basic functionality, including creating connections, creating Kafka topics, and counting written records |
Below are the complete code and more detailed functional descriptions of each class.
<details>
<summary>FastWriteExample</summary>
Introduction to the main program's command-line arguments:
```shell
-b,--batchSizeByRow <arg>             Specifies the batchSizeByRow parameter of efficient writing, default is 1000
-c,--cacheSizeByRow <arg>             Specifies the cacheSizeByRow parameter of efficient writing, default is 10000
-d,--dbName <arg>                     Specifies the database name, default is `test`
   --help                             Prints help information
-K,--useKafka                         Uses Kafka: creates a producer to send messages and a consumer to receive messages for writing to TDengine; otherwise worker threads subscribe to simulator-generated data for writing
-r,--readThreadCount <arg>            Specifies the number of worker threads, default is 5; in Kafka mode this parameter also determines the number of producer and consumer threads
-R,--rowsPerSubTable <arg>            Specifies the number of rows written per subtable, default is 100
-s,--subTableNum <arg>                Specifies the total number of subtables, default is 1000000
-w,--writeThreadPerReadThread <arg>   Specifies the number of write threads per worker thread, default is 5
```
JDBC URL and Kafka cluster address configuration:
1. The JDBC URL is configured via an environment variable, for example: `export TDENGINE_JDBC_URL="jdbc:TAOS-WS://localhost:6041?user=root&password=taosdata"`
2. The Kafka cluster address is configured via an environment variable, for example: `KAFKA_BOOTSTRAP_SERVERS=localhost:9092`
Usage:
```shell
1. Simulated data write mode: java -jar highVolume.jar -r 5 -w 5 -b 10000 -c 100000 -s 1000000 -R 1000
2. Kafka subscription write mode: java -jar highVolume.jar -r 5 -w 5 -b 10000 -c 100000 -s 1000000 -R 100 -K
```
The main program is responsible for:
1. Creating message queues
2. Starting write threads
3. Starting read threads
4. Counting the write speed every 10 seconds
The main program exposes 4 parameters by default, adjustable at each startup, for testing and tuning:
1. Number of read threads. Default is 1.
2. Number of write threads. Default is 3.
3. Total number of simulated tables. Default is 1,000. This is divided evenly among the read threads. If the total number of tables is large, table creation takes longer and the initially reported write speed may be lower.
4. Maximum number of records written per batch. Default is 3,000.
The queue capacity (taskQueueCapacity) is also a performance-related parameter and can be adjusted by modifying the program. Generally, the larger the queue capacity, the lower the probability of blocking on enqueue and the greater the queue's throughput, but the larger the memory usage. The default value in the sample program is already sufficiently large.
1. Parsing command-line arguments
2. Creating subtables
3. Creating worker threads or Kafka producers and consumers
4. Counting the write speed
5. Waiting for writing to finish and releasing resources
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java}}
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/FastWriteExample.java}}
```
</details>
<details>
<summary>ReadTask</summary>
<summary>WorkTask</summary>
The read task is responsible for reading data from the data source. Each read task is associated with a simulated data source. Each simulated data source can generate data for a certain number of tables. Different simulated data sources generate data for different tables.
The read task writes to the message queue in a blocking manner. That is, once the queue is full, the write operation blocks.
The worker thread is responsible for reading data from the simulated data source. Each read task is associated with a simulated data source. Each simulated data source can generate data for a certain range of subtables. Different simulated data sources generate data for different tables.
The worker thread calls the JDBC standard interface `addBatch` in a blocking manner. That is, once the corresponding efficient-writing backend queue is full, the write operation blocks.
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java}}
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/WorkTask.java}}
```
</details>
<details>
<summary>WriteTask</summary>
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java}}
```
</details>
<details>
<summary>MockDataSource</summary>
A simulated data generator that produces data for a certain range of subtables. To mimic real-world conditions, it generates data in a round-robin fashion, one row per subtable.
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java}}
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/MockDataSource.java}}
```
</details>
<details>
<summary>CreateSubTableTask</summary>
<summary>SQLWriter</summary>
The SQLWriter class encapsulates the logic of assembling SQL and writing data. Note that none of the tables are created in advance; instead, they are created in batches, using the supertable as a template, when a table-not-found exception is caught, after which the INSERT statement is re-executed. For other exceptions, the SQL being executed at the time is simply logged; you can also log more clues to aid troubleshooting and fault recovery.
Creates subtables within a specified range using batched CREATE TABLE statements.
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java}}
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/CreateSubTableTask.java}}
```
</details>
<details>
<summary>Meters</summary>
<summary>DataBaseMonitor</summary>
A data model class that provides serialization and deserialization methods for sending messages to Kafka and receiving messages from Kafka.
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java}}
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Meters.java}}
```
</details>
<details>
<summary>ProducerTask</summary>
A message producer that distributes the data generated by the simulated data generator across all partitions, using a hash method different from the one used by JDBC efficient writing.
```java
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ProducerTask.java}}
```
</details>
<details>
<summary>ConsumerTask</summary>
A message consumer that receives messages from Kafka and writes them to TDengine.
```java
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ConsumerTask.java}}
```
</details>
<details>
<summary>StatTask</summary>
Periodically counts the number of written records.
```java
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/StatTask.java}}
```
</details>
<details>
<summary>Util</summary>
A utility class that provides functions such as creating connections, creating databases, and creating topics.
```java
{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Util.java}}
```
</details>
@ -167,272 +227,76 @@ The SQLWriter class encapsulates the logic of assembling SQL and writing data. Note that none of the tables
<details>
<summary>Execute the Java Example Program</summary>
Before running the program, configure the environment variable `TDENGINE_JDBC_URL`. If TDengine Server is deployed locally and the username, password, and port are all default values, you can configure:
```
TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
**Execute the example program in a local integrated development environment**
1. Clone the TDengine repository
```
git clone git@github.com:taosdata/TDengine.git --depth 1
```
2. Open the `docs/examples/java` directory with the integrated development environment.
2. Open the `TDengine/docs/examples/JDBC/highvolume` directory with the integrated development environment.
3. Configure the environment variable `TDENGINE_JDBC_URL` in the development environment. If the global environment variable `TDENGINE_JDBC_URL` has already been configured, you can skip this step.
4. Run the class `com.taos.example.highvolume.FastWriteExample`.
4. If you want to run the Kafka example, you need to set the environment variable `KAFKA_BOOTSTRAP_SERVERS` for the Kafka cluster address.
5. Specify command-line arguments, such as `-r 3 -w 3 -b 100 -c 1000 -s 1000 -R 100`.
6. Run the class `com.taos.example.highvolume.FastWriteExample`.
**Execute the example program on a remote server**
To execute the example program on a server, follow these steps:
1. Package the example code. In the directory TDengine/docs/examples/java, execute:
```
1. Package the example code. In the directory `TDengine/docs/examples/JDBC/highvolume`, run the following command to generate `highVolume.jar`:
```shell
mvn package
```
2. Create an examples directory on the remote server:
2. Copy the program to the specified directory on the server:
```shell
scp -r .\target\highVolume.jar <user>@<host>:~/dest-path
```
mkdir -p examples/java
```
3. Copy dependencies to the specified directory on the server:
- Copy the dependency packages; this only needs to be done once
```
scp -r .\target\lib <user>@<host>:~/examples/java
```
- Copy this program's jar package; copy it again each time the code is updated
```
scp -r .\target\javaexample-1.0.jar <user>@<host>:~/examples/java
```
4. Configure environment variables.
3. Configure environment variables.
Edit `~/.bash_profile` or `~/.bashrc` and add, for example, the following content:
```
```shell
export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
The above uses the default JDBC URL for a locally deployed TDengine Server. Modify it according to your actual situation.
The above uses the default JDBC URL for a locally deployed TDengine Server. Modify it according to your actual situation.
If you want to use Kafka subscription mode, also add the Kafka cluster environment variable configuration:
5. Start the example program with the Java command, command template:
```
java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample <read_thread_count> <write_thread_count> <total_table_count> <max_batch_size>
```shell
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
```
6. Terminate the test program. The test program does not exit automatically; after a stable write speed is obtained under the current configuration, press <kbd>CTRL</kbd> + <kbd>C</kbd> to terminate it.
Below is the log output from an actual run on a machine with 16 cores, 64 GB RAM, and an SSD.
4. Start the example program with the Java command, using the following template (for Kafka subscription mode, append `-K` at the end):
```shell
java -jar highVolume.jar -r 5 -w 5 -b 10000 -c 100000 -s 1000000 -R 1000
```
root@vm85$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 2 12
18:56:35.896 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=2, writeTaskCount=12 tableCount=1000 maxBatchSize=3000
18:56:36.011 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.015 [WriteThread-0] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.021 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.022 [WriteThread-1] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.031 [WriteThread-2] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.032 [WriteThread-2] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.041 [WriteThread-3] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.042 [WriteThread-3] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.093 [WriteThread-4] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.094 [WriteThread-4] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.099 [WriteThread-5] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.100 [WriteThread-5] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.100 [WriteThread-6] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.101 [WriteThread-6] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.103 [WriteThread-7] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.104 [WriteThread-7] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.105 [WriteThread-8] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.107 [WriteThread-8] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.108 [WriteThread-9] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.109 [WriteThread-9] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.156 [WriteThread-10] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.157 [WriteThread-11] INFO c.taos.example.highvolume.WriteTask - started
18:56:36.158 [WriteThread-10] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:36.158 [ReadThread-0] INFO com.taos.example.highvolume.ReadTask - started
18:56:36.158 [ReadThread-1] INFO com.taos.example.highvolume.ReadTask - started
18:56:36.158 [WriteThread-11] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
18:56:46.369 [main] INFO c.t.e.highvolume.FastWriteExample - count=18554448 speed=1855444
18:56:56.946 [main] INFO c.t.e.highvolume.FastWriteExample - count=39059660 speed=2050521
18:57:07.322 [main] INFO c.t.e.highvolume.FastWriteExample - count=59403604 speed=2034394
18:57:18.032 [main] INFO c.t.e.highvolume.FastWriteExample - count=80262938 speed=2085933
18:57:28.432 [main] INFO c.t.e.highvolume.FastWriteExample - count=101139906 speed=2087696
18:57:38.921 [main] INFO c.t.e.highvolume.FastWriteExample - count=121807202 speed=2066729
18:57:49.375 [main] INFO c.t.e.highvolume.FastWriteExample - count=142952417 speed=2114521
18:58:00.689 [main] INFO c.t.e.highvolume.FastWriteExample - count=163650306 speed=2069788
18:58:11.646 [main] INFO c.t.e.highvolume.FastWriteExample - count=185019808 speed=2136950
5. Terminate the test program. The test program does not exit automatically; after a stable write speed is obtained under the current configuration, press <kbd>CTRL</kbd> + <kbd>C</kbd> to terminate it.
Below is the log output from an actual run on a machine with a 40-core CPU, 256 GB RAM, and an SSD.
```shell
---------------$ java -jar highVolume.jar -r 2 -w 10 -b 10000 -c 100000 -s 1000000 -R 100
[INFO ] 2025-03-24 18:03:17.980 com.taos.example.highvolume.FastWriteExample main 309 main readThreadCount=2, writeThreadPerReadThread=10 batchSizeByRow=10000 cacheSizeByRow=100000, subTableNum=1000000, rowsPerSubTable=100
[INFO ] 2025-03-24 18:03:17.983 com.taos.example.highvolume.FastWriteExample main 312 main create database begin.
[INFO ] 2025-03-24 18:03:34.499 com.taos.example.highvolume.FastWriteExample main 315 main create database end.
[INFO ] 2025-03-24 18:03:34.500 com.taos.example.highvolume.FastWriteExample main 317 main create sub tables start.
[INFO ] 2025-03-24 18:03:34.502 com.taos.example.highvolume.FastWriteExample createSubTables 73 main create sub table task started.
[INFO ] 2025-03-24 18:03:55.777 com.taos.example.highvolume.FastWriteExample createSubTables 82 main create sub table task finished.
[INFO ] 2025-03-24 18:03:55.778 com.taos.example.highvolume.FastWriteExample main 319 main create sub tables end.
[INFO ] 2025-03-24 18:03:55.781 com.taos.example.highvolume.WorkTask run 41 FW-work-thread-2 started
[INFO ] 2025-03-24 18:03:55.781 com.taos.example.highvolume.WorkTask run 41 FW-work-thread-1 started
[INFO ] 2025-03-24 18:04:06.580 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=12235906 speed=1223590
[INFO ] 2025-03-24 18:04:17.531 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=31185614 speed=1894970
[INFO ] 2025-03-24 18:04:28.490 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=51464904 speed=2027929
[INFO ] 2025-03-24 18:04:40.851 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=71498113 speed=2003320
[INFO ] 2025-03-24 18:04:51.948 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=91242103 speed=1974399
```
</details>
</TabItem>
<TabItem label="Python" value="python">
**Program Listing**
The Python example program uses a multi-process architecture and a cross-process message queue.
| Function or Class | Description |
| ------------------------ | -------------------------------------------------------------------- |
| main function | Program entry point; creates the subprocesses and message queues |
| run_monitor_process function | Creates the database and supertable, tracks the write speed, and periodically prints it to the console |
| run_read_task function | Main logic of the read processes; reads data from other data systems and distributes it to the assigned queues |
| MockDataSource class | Simulated data source; implements the iterator interface and returns the next 1,000 records of each table in batches |
| run_write_task function | Main logic of the write processes; retrieves as much data as possible from the queue each time and writes it in batches |
| SQLWriter class | Handles SQL writing and automatic table creation |
| StmtWriter class | Implements parameter-binding batch writing (not yet completed) |
<details>
<summary>main function</summary>
The main function is responsible for creating the message queues and starting the subprocesses, of which there are 3 types:
1. 1 monitoring process, responsible for database initialization and tracking the write speed
2. n read processes, responsible for reading data from other data systems
3. m write processes, responsible for writing to the database
The main function can accept 5 startup parameters, in order:
1. Number of read tasks (processes), default is 1
2. Number of write tasks (processes), default is 1
3. Total number of simulated tables, default is 1,000
4. Queue size (in bytes), default is 1,000,000
5. Maximum number of records written per batch, default is 3,000
```python
{{#include docs/examples/python/fast_write_example.py:main}}
```
</details>
<details>
<summary>run_monitor_process</summary>
The monitoring process is responsible for initializing the database and monitoring the current write speed.
```python
{{#include docs/examples/python/fast_write_example.py:monitor}}
```
</details>
<details>
<summary>run_read_task function</summary>
The read process is responsible for reading data from other data systems and distributing it to the queue assigned to it.
```python
{{#include docs/examples/python/fast_write_example.py:read}}
```
</details>
<details>
<summary>MockDataSource</summary>
Below is the implementation of the mock data source. We assume that each piece of data generated by the data source carries the target table name. In practice, you might need certain rules to determine the target table name.
```python
{{#include docs/examples/python/mockdatasource.py}}
```
</details>
<details>
<summary>run_write_task function</summary>
The write process retrieves as much data as possible from the queue each time and writes it in batches.
```python
{{#include docs/examples/python/fast_write_example.py:write}}
```
</details>
<details>
<summary>SQLWriter</summary>
The SQLWriter class encapsulates the logic of assembling SQL and writing data. None of the tables are created in advance; instead, they are created in batches, using the supertable as a template, when a table-does-not-exist error occurs, after which the INSERT statement is re-executed. For other errors, the SQL being executed at the time is logged to aid troubleshooting and fault recovery. This class also checks whether the SQL exceeds the maximum length limit; based on the TDengine 3.0 limit, the maximum supported SQL length of 1,048,576 is passed in via the input parameter maxSQLLength.
```python
{{#include docs/examples/python/sql_writer.py}}
```
</details>
**Execution Steps**
<details>
<summary>Execute the Python Example Program</summary>
1. Prerequisites
- TDengine client driver installed
- Python3 installed, recommended version >= 3.8
- taospy installed
2. Install faster-fifo to replace Python's built-in multiprocessing.Queue:
```
pip3 install faster-fifo
```
3. Click the "View Source" link above to copy the `fast_write_example.py`, `sql_writer.py`, and `mockdatasource.py` files.
4. Execute the example program:
```
python3 fast_write_example.py <READ_TASK_COUNT> <WRITE_TASK_COUNT> <TABLE_COUNT> <QUEUE_SIZE> <MAX_BATCH_SIZE>
```
Below is the output of an actual run on a machine with 16 cores, 64 GB RAM, and an SSD.
```
root@vm85$ python3 fast_write_example.py 8 8
2022-07-14 19:13:45,869 [root] - READ_TASK_COUNT=8, WRITE_TASK_COUNT=8, TABLE_COUNT=1000, QUEUE_SIZE=1000000, MAX_BATCH_SIZE=3000
2022-07-14 19:13:48,882 [root] - WriteTask-0 started with pid 718347
2022-07-14 19:13:48,883 [root] - WriteTask-1 started with pid 718348
2022-07-14 19:13:48,884 [root] - WriteTask-2 started with pid 718349
2022-07-14 19:13:48,884 [root] - WriteTask-3 started with pid 718350
2022-07-14 19:13:48,885 [root] - WriteTask-4 started with pid 718351
2022-07-14 19:13:48,885 [root] - WriteTask-5 started with pid 718352
2022-07-14 19:13:48,886 [root] - WriteTask-6 started with pid 718353
2022-07-14 19:13:48,886 [root] - WriteTask-7 started with pid 718354
2022-07-14 19:13:48,887 [root] - ReadTask-0 started with pid 718355
2022-07-14 19:13:48,888 [root] - ReadTask-1 started with pid 718356
2022-07-14 19:13:48,889 [root] - ReadTask-2 started with pid 718357
2022-07-14 19:13:48,889 [root] - ReadTask-3 started with pid 718358
2022-07-14 19:13:48,890 [root] - ReadTask-4 started with pid 718359
2022-07-14 19:13:48,891 [root] - ReadTask-5 started with pid 718361
2022-07-14 19:13:48,892 [root] - ReadTask-6 started with pid 718364
2022-07-14 19:13:48,893 [root] - ReadTask-7 started with pid 718365
2022-07-14 19:13:56,042 [DataBaseMonitor] - count=6676310 speed=667631.0
2022-07-14 19:14:06,196 [DataBaseMonitor] - count=20004310 speed=1332800.0
2022-07-14 19:14:16,366 [DataBaseMonitor] - count=32290310 speed=1228600.0
2022-07-14 19:14:26,527 [DataBaseMonitor] - count=44438310 speed=1214800.0
2022-07-14 19:14:36,673 [DataBaseMonitor] - count=56608310 speed=1217000.0
2022-07-14 19:14:46,834 [DataBaseMonitor] - count=68757310 speed=1214900.0
2022-07-14 19:14:57,280 [DataBaseMonitor] - count=80992310 speed=1223500.0
2022-07-14 19:15:07,689 [DataBaseMonitor] - count=93805310 speed=1281300.0
2022-07-14 19:15:18,020 [DataBaseMonitor] - count=106111310 speed=1230600.0
2022-07-14 19:15:28,356 [DataBaseMonitor] - count=118394310 speed=1228300.0
2022-07-14 19:15:38,690 [DataBaseMonitor] - count=130742310 speed=1234800.0
2022-07-14 19:15:49,000 [DataBaseMonitor] - count=143051310 speed=1230900.0
2022-07-14 19:15:59,323 [DataBaseMonitor] - count=155276310 speed=1222500.0
2022-07-14 19:16:09,649 [DataBaseMonitor] - count=167603310 speed=1232700.0
2022-07-14 19:16:19,995 [DataBaseMonitor] - count=179976310 speed=1237300.0
```
</details>
:::note
When using the Python connector to connect to TDengine from multiple processes, there is a limitation: connections cannot be created in the parent process; all connections must be created in the child processes.
If a connection is created in the parent process, any connection attempt in a child process will block forever. This is a known issue.
:::
</TabItem>
</Tabs>

Binary image file added (558 KiB).

Binary image file removed (7.1 KiB).