diff --git a/docs/en/07-develop/15-high.md b/docs/en/07-develop/15-high.md
index 145ec93006..26585750ca 100644
--- a/docs/en/07-develop/15-high.md
+++ b/docs/en/07-develop/15-high.md
@@ -51,12 +51,10 @@ For more tuning parameters, please refer to [Database Management](../../tdengine
### Scenario Design {#scenario}
-The following example program demonstrates how to write data efficiently, with the scenario designed as follows:
-
-- The TDengine client application continuously reads data from other data sources. In the example program, simulated data generation is used to mimic reading from data sources.
-- The speed of a single connection writing to TDengine cannot match the speed of reading data, so the client application starts multiple threads, each establishing a connection with TDengine, and each thread has a dedicated fixed-size message queue.
-- The client application hashes the received data according to the table name (or subtable name) to different threads, i.e., writing to the message queue corresponding to that thread, ensuring that data belonging to a certain table (or subtable) will always be processed by a fixed thread.
-- Each sub-thread empties the data in its associated message queue or reaches a predetermined threshold of data volume, writes that batch of data to TDengine, and continues to process the data received afterwards.
+The following sample program demonstrates how to write data efficiently, with the scenario designed as follows:
+
+- The TDengine client program continuously reads data from other data sources. In the sample program, simulated data generation mimics reading from a data source; an example of pulling data from Kafka and writing it to TDengine is also provided.
+- To improve the reading speed of the TDengine client program, multiple threads are used for reading. To avoid out-of-order data, the sets of tables read by different reading threads must not overlap.
+- To keep up with the reading speed of each data reading thread, a pool of write threads is started in the background. Each write thread has an exclusive, fixed-size message queue.
@@ -72,94 +70,172 @@ This sample code assumes that the source data belongs to different subtables of
+**Introduction to JDBC Efficient Writing Features**
+
+Starting from version 3.6.0, the JDBC driver provides efficient writing over WebSocket connections. For the related configuration parameters, refer to Efficient Writing Configuration. The JDBC driver's efficient writing feature has the following characteristics (a minimal connection-setup sketch follows the list below):
+
+- Supports the JDBC standard parameter binding interface.
+- When resources are sufficient, write throughput is linearly correlated with the number of configured write threads.
+- Supports configuring the write timeout, the number of retries, and the retry interval used after the connection is lost and re-established.
+- Supports calling the `executeUpdate` interface to obtain the number of records written; exceptions raised during writing can be caught.
+
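+The following minimal sketch illustrates how these features are enabled when opening a connection and how data is then written through the standard `PreparedStatement` interface. It assumes taos-jdbcdriver 3.6.0, a local TDengine instance reachable over WebSocket, and the `test.meters` supertable created by this example; the class name and parameter values are illustrative, and the property keys mirror those used by the `Util` class later in this example.
+
+```java
+import com.taosdata.jdbc.TSDBDriver;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.util.Properties;
+
+public class EfficientWriteSketch {
+    public static void main(String[] args) throws Exception {
+        Properties props = new Properties();
+        // Switch this connection into efficient (asynchronous stmt) writing mode.
+        props.setProperty(TSDBDriver.PROPERTY_KEY_ASYNC_WRITE, "stmt");
+        // Rows per batch handed to the backend, total rows cached per connection,
+        // and the number of background write threads (illustrative values).
+        props.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_SIZE_BY_ROW, "1000");
+        props.setProperty(TSDBDriver.PROPERTY_KEY_CACHE_SIZE_BY_ROW, "10000");
+        props.setProperty(TSDBDriver.PROPERTY_KEY_BACKEND_WRITE_THREAD_NUM, "5");
+        props.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
+
+        String url = "jdbc:TAOS-WS://localhost:6041?user=root&password=taosdata";
+        try (Connection conn = DriverManager.getConnection(url, props);
+             PreparedStatement pstmt = conn.prepareStatement(
+                     "INSERT INTO test.meters (tbname, ts, current, voltage, phase) VALUES (?,?,?,?,?)")) {
+            pstmt.setString(1, "d_0");
+            pstmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
+            pstmt.setFloat(3, 10.5f);
+            pstmt.setInt(4, 220);
+            pstmt.setFloat(5, 0.32f);
+            pstmt.addBatch();                    // queue the row in the efficient-writing cache
+            pstmt.executeBatch();                // hand the batch to the background write threads
+            int written = pstmt.executeUpdate(); // returns the number of records written
+            System.out.println("rows written: " + written);
+        }
+    }
+}
+```
+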
**Program Listing**
-| Class Name | Function Description |
-| ----------------- | -------------------------------------------------------------------------------- |
-| FastWriteExample | Main program |
-| ReadTask | Reads data from a simulated source, hashes the table name to get the Queue Index, writes to the corresponding Queue |
-| WriteTask | Retrieves data from the Queue, forms a Batch, writes to TDengine |
-| MockDataSource | Simulates generating data for a certain number of meters subtables |
-| SQLWriter | WriteTask relies on this class to complete SQL stitching, automatic table creation, SQL writing, and SQL length checking |
-| StmtWriter | Implements parameter binding for batch writing (not yet completed) |
-| DataBaseMonitor | Counts the writing speed and prints the current writing speed to the console every 10 seconds |
+| Class Name | Functional Description |
+| ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
+| FastWriteExample | The main program responsible for command-line argument parsing, thread pool creation, and waiting for task completion. |
+| WorkTask | Reads data from a simulated source and writes it using the JDBC standard interface. |
+| MockDataSource | Simulates and generates data for a certain number of `meters` child tables. |
+| DataBaseMonitor | Tracks write speed and prints the current write speed to the console every 10 seconds. |
+| CreateSubTableTask | Creates child tables within a specified range for invocation by the main program. |
+| Meters | Provides serialization and deserialization of single records in the `meters` table, used for sending messages to Kafka and receiving messages from Kafka. |
+| ProducerTask | A producer that sends messages to Kafka. |
+| ConsumerTask | A consumer that receives messages from Kafka, writes data to TDengine using the JDBC efficient writing interface, and commits offsets according to progress. |
+| Util | Provides basic functionalities, including creating connections, creating Kafka topics, and counting write operations. |
+
Below are the complete codes and more detailed function descriptions for each class.
FastWriteExample
-The main program is responsible for:
-1. Creating message queues
-2. Starting write threads
-3. Starting read threads
-4. Counting the writing speed every 10 seconds
-The main program exposes 4 parameters by default, which can be adjusted each time the program is started, for testing and tuning:
-1. Number of read threads. Default is 1.
-2. Number of write threads. Default is 3.
-3. Total number of simulated tables. Default is 1,000. This will be evenly divided among the read threads. If the total number of tables is large, table creation will take longer, and the initial writing speed statistics may be slow.
-4. Maximum number of records written per batch. Default is 3,000.
-Queue capacity (taskQueueCapacity) is also a performance-related parameter, which can be adjusted by modifying the program. Generally speaking, the larger the queue capacity, the less likely it is to be blocked when enqueuing, the greater the throughput of the queue, but the larger the memory usage. The default value of the sample program is already set large enough.
+**Introduction to Main Program Command-Line Arguments:**
+
+```shell
+ -b,--batchSizeByRow Specifies the `batchSizeByRow` parameter for Efficient Writing, default is 1000
+ -c,--cacheSizeByRow Specifies the `cacheSizeByRow` parameter for Efficient Writing, default is 10000
+ -d,--dbName Specifies the database name, default is `test`
+ --help Prints help information
+ -K,--useKafka Enables Kafka mode, creating a producer to send messages and a consumer to receive messages for writing to TDengine. Otherwise, uses worker threads to subscribe to simulated data for writing.
+ -r,--readThreadCount Specifies the number of worker threads, default is 5. In Kafka mode, this parameter also determines the number of producer and consumer threads.
+ -R,--rowsPerSubTable Specifies the number of rows to write per child table, default is 100
+ -s,--subTableNum Specifies the total number of child tables, default is 1000000
+ -w,--writeThreadPerReadThread Specifies the number of write threads per worker thread, default is 5
+```
+
+**JDBC URL and Kafka Cluster Address Configuration:**
+
+1. The JDBC URL is configured via an environment variable, for example:
+ ```shell
+ export TDENGINE_JDBC_URL="jdbc:TAOS-WS://localhost:6041?user=root&password=taosdata"
+ ```
+2. The Kafka cluster address is configured via an environment variable, for example:
+ ```shell
+ export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
+ ```
+
+**Usage:**
+```shell
+# 1. Simulated data writing mode:
+java -jar highVolume.jar -r 5 -w 5 -b 10000 -c 100000 -s 1000000 -R 1000
+# 2. Kafka subscription writing mode:
+java -jar highVolume.jar -r 5 -w 5 -b 10000 -c 100000 -s 1000000 -R 100 -K
+```
+
+**Responsibilities of the Main Program:**
+1. Parses command-line arguments.
+2. Creates child tables.
+3. Creates worker threads or Kafka producers and consumers.
+4. Tracks write speed.
+5. Waits for writing to complete and releases resources.
+
```java
-{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java}}
+{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/FastWriteExample.java}}
+```
+
+
+
+
+
+
+WorkTask
+
+The worker thread is responsible for reading data from the simulated data source. Each read task is associated with one simulated data source, which generates data for a specific range of child tables; different simulated data sources generate data for different tables.
+The worker thread invokes the JDBC standard interface `addBatch` in a blocking manner: if the corresponding efficient-writing backend queue is full, the write operation blocks.
+
+```java
+{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/WorkTask.java}}
```
-ReadTask
-
-The read task is responsible for reading data from the data source. Each read task is associated with a simulated data source. Each simulated data source can generate data for a certain number of tables. Different simulated data sources generate data for different tables.
-
-The read task writes to the message queue in a blocking manner. That is, once the queue is full, the write operation will be blocked.
-
-```java
-{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java}}
-```
-
-
-
-
-WriteTask
-
-```java
-{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/WriteTask.java}}
-```
-
-
-
-
-
MockDataSource
+A simulated data generator that produces data for a given range of child tables. To mimic a real-world workload, it generates data in a round-robin, interleaved fashion, one row per child table at a time.
+
```java
-{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java}}
+{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/MockDataSource.java}}
```
+CreateSubTableTask
-SQLWriter
-
-The SQLWriter class encapsulates the logic of SQL stitching and data writing. Note that none of the tables are created in advance; instead, they are created in batches using the supertable as a template when a table not found exception is caught, and then the INSERT statement is re-executed. For other exceptions, this simply logs the SQL statement being executed at the time; you can also log more clues to facilitate error troubleshooting and fault recovery.
+Creates child tables within a specified range, batching multiple table definitions into each CREATE TABLE statement.
```java
-{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java}}
+{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/CreateSubTableTask.java}}
```
+Meters
-DataBaseMonitor
+A data model class that provides serialization and deserialization methods for sending data to Kafka and receiving data from Kafka.
```java
-{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java}}
+{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Meters.java}}
+```
+
+
+
+
+ProducerTask
+
+A message producer that sends the data generated by the simulated data source to all partitions of the Kafka topic. It uses the child table name as the partition key with a hash algorithm different from the String hash used by JDBC efficient writing, so that all rows of the same child table land in the same partition and remain in order.
+
+```java
+{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ProducerTask.java}}
+```
+
+
+
+
+ConsumerTask
+
+A message consumer that receives messages from Kafka and writes them to TDengine.
+
+```java
+{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ConsumerTask.java}}
+```
+
+
+
+
+StatTask
+
+Periodically counts the number of records written and reports the current write speed.
+
+```java
+{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/StatTask.java}}
+```
+
+
+
+
+Util
+
+A utility class that provides functions such as creating connections, creating databases, and creating topics.
+
+```java
+{{#include docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Util.java}}
```
@@ -169,12 +245,6 @@ The SQLWriter class encapsulates the logic of SQL stitching and data writing. No
Execute the Java Example Program
-Before running the program, configure the environment variable `TDENGINE_JDBC_URL`. If the TDengine Server is deployed on the local machine, and the username, password, and port are all default values, then you can configure:
-
-```shell
-TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
-```
-
**Execute the example program in a local integrated development environment**
1. Clone the TDengine repository
@@ -183,268 +253,72 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
git clone git@github.com:taosdata/TDengine.git --depth 1
```
-2. Open the `docs/examples/java` directory with the integrated development environment.
+2. Open the `TDengine/docs/examples/JDBC/highvolume` directory with the integrated development environment.
3. Configure the environment variable `TDENGINE_JDBC_URL` in the development environment. If the global environment variable `TDENGINE_JDBC_URL` has already been configured, you can skip this step.
-4. Run the class `com.taos.example.highvolume.FastWriteExample`.
+4. If you want to run the Kafka example, you need to set the environment variable `KAFKA_BOOTSTRAP_SERVERS` for the Kafka cluster address.
+5. Specify command-line arguments, such as `-r 3 -w 3 -b 100 -c 1000 -s 1000 -R 100`.
+6. Run the class `com.taos.example.highvolume.FastWriteExample`.
**Execute the example program on a remote server**
To execute the example program on a server, follow these steps:
-1. Package the example code. Execute in the directory TDengine/docs/examples/java:
+1. Package the sample code. Navigate to the directory `TDengine/docs/examples/JDBC/highvolume` and run the following command to generate `highVolume.jar`:
```shell
mvn package
```
-2. Create an examples directory on the remote server:
-
+2. Copy the program to the specified directory on the server:
+
```shell
- mkdir -p examples/java
+   scp -r .\target\highVolume.jar <user>@<host>:~/dest-path
```
-3. Copy dependencies to the specified directory on the server:
- - Copy dependency packages, only once
-
- ```shell
- scp -r .\target\lib @:~/examples/java
- ```
-
- - Copy the jar package of this program, copy every time the code is updated
-
- ```shell
- scp -r .\target\javaexample-1.0.jar @:~/examples/java
- ```
-
-4. Configure the environment variable.
+3. Configure the environment variable.
Edit `~/.bash_profile` or `~/.bashrc` and add the following content for example:
```shell
export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
- The above uses the default JDBC URL when TDengine Server is deployed locally. You need to modify it according to your actual situation.
-
-5. Start the example program with the Java command, command template:
+ The above uses the default JDBC URL for a locally deployed TDengine Server. Modify it according to your actual environment.
+ If you want to use Kafka subscription mode, additionally configure the Kafka cluster environment variable:
```shell
- java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample
+ export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
```
-6. End the test program. The test program will not end automatically; after obtaining a stable writing speed under the current configuration, press CTRL + C to end the program.
- Below is a log output from an actual run, with machine configuration 16 cores + 64G + SSD.
+4. Start the sample program with the Java command. Use the following template (append `-K` for Kafka subscription mode):
+
+ ```shell
+ java -jar highVolume.jar -r 5 -w 5 -b 10000 -c 100000 -s 1000000 -R 1000
+ ```
+
+5. Terminate the test program. The program does not exit automatically. Once a stable write speed is achieved under the current configuration, press CTRL + C to terminate it.
+Below is a sample log output from an actual run on a machine with a 40-core CPU, 256GB RAM, and SSD storage.
```text
- root@vm85$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 2 12
- 18:56:35.896 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=2, writeTaskCount=12 tableCount=1000 maxBatchSize=3000
- 18:56:36.011 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started
- 18:56:36.015 [WriteThread-0] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
- 18:56:36.021 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started
- 18:56:36.022 [WriteThread-1] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
- 18:56:36.031 [WriteThread-2] INFO c.taos.example.highvolume.WriteTask - started
- 18:56:36.032 [WriteThread-2] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
- 18:56:36.041 [WriteThread-3] INFO c.taos.example.highvolume.WriteTask - started
- 18:56:36.042 [WriteThread-3] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
- 18:56:36.093 [WriteThread-4] INFO c.taos.example.highvolume.WriteTask - started
- 18:56:36.094 [WriteThread-4] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
- 18:56:36.099 [WriteThread-5] INFO c.taos.example.highvolume.WriteTask - started
- 18:56:36.100 [WriteThread-5] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
- 18:56:36.100 [WriteThread-6] INFO c.taos.example.highvolume.WriteTask - started
- 18:56:36.101 [WriteThread-6] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
- 18:56:36.103 [WriteThread-7] INFO c.taos.example.highvolume.WriteTask - started
- 18:56:36.104 [WriteThread-7] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
- 18:56:36.105 [WriteThread-8] INFO c.taos.example.highvolume.WriteTask - started
- 18:56:36.107 [WriteThread-8] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
- 18:56:36.108 [WriteThread-9] INFO c.taos.example.highvolume.WriteTask - started
- 18:56:36.109 [WriteThread-9] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
- 18:56:36.156 [WriteThread-10] INFO c.taos.example.highvolume.WriteTask - started
- 18:56:36.157 [WriteThread-11] INFO c.taos.example.highvolume.WriteTask - started
- 18:56:36.158 [WriteThread-10] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
- 18:56:36.158 [ReadThread-0] INFO com.taos.example.highvolume.ReadTask - started
- 18:56:36.158 [ReadThread-1] INFO com.taos.example.highvolume.ReadTask - started
- 18:56:36.158 [WriteThread-11] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
- 18:56:46.369 [main] INFO c.t.e.highvolume.FastWriteExample - count=18554448 speed=1855444
- 18:56:56.946 [main] INFO c.t.e.highvolume.FastWriteExample - count=39059660 speed=2050521
- 18:57:07.322 [main] INFO c.t.e.highvolume.FastWriteExample - count=59403604 speed=2034394
- 18:57:18.032 [main] INFO c.t.e.highvolume.FastWriteExample - count=80262938 speed=2085933
- 18:57:28.432 [main] INFO c.t.e.highvolume.FastWriteExample - count=101139906 speed=2087696
- 18:57:38.921 [main] INFO c.t.e.highvolume.FastWriteExample - count=121807202 speed=2066729
- 18:57:49.375 [main] INFO c.t.e.highvolume.FastWriteExample - count=142952417 speed=2114521
- 18:58:00.689 [main] INFO c.t.e.highvolume.FastWriteExample - count=163650306 speed=2069788
- 18:58:11.646 [main] INFO c.t.e.highvolume.FastWriteExample - count=185019808 speed=2136950
+ ---------------$ java -jar highVolume.jar -r 2 -w 10 -b 10000 -c 100000 -s 1000000 -R 100
+ [INFO ] 2025-03-24 18:03:17.980 com.taos.example.highvolume.FastWriteExample main 309 main readThreadCount=2, writeThreadPerReadThread=10 batchSizeByRow=10000 cacheSizeByRow=100000, subTableNum=1000000, rowsPerSubTable=100
+ [INFO ] 2025-03-24 18:03:17.983 com.taos.example.highvolume.FastWriteExample main 312 main create database begin.
+ [INFO ] 2025-03-24 18:03:34.499 com.taos.example.highvolume.FastWriteExample main 315 main create database end.
+ [INFO ] 2025-03-24 18:03:34.500 com.taos.example.highvolume.FastWriteExample main 317 main create sub tables start.
+ [INFO ] 2025-03-24 18:03:34.502 com.taos.example.highvolume.FastWriteExample createSubTables 73 main create sub table task started.
+ [INFO ] 2025-03-24 18:03:55.777 com.taos.example.highvolume.FastWriteExample createSubTables 82 main create sub table task finished.
+ [INFO ] 2025-03-24 18:03:55.778 com.taos.example.highvolume.FastWriteExample main 319 main create sub tables end.
+ [INFO ] 2025-03-24 18:03:55.781 com.taos.example.highvolume.WorkTask run 41 FW-work-thread-2 started
+ [INFO ] 2025-03-24 18:03:55.781 com.taos.example.highvolume.WorkTask run 41 FW-work-thread-1 started
+ [INFO ] 2025-03-24 18:04:06.580 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=12235906 speed=1223590
+ [INFO ] 2025-03-24 18:04:17.531 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=31185614 speed=1894970
+ [INFO ] 2025-03-24 18:04:28.490 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=51464904 speed=2027929
+ [INFO ] 2025-03-24 18:04:40.851 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=71498113 speed=2003320
+ [INFO ] 2025-03-24 18:04:51.948 com.taos.example.highvolume.StatTask run 36 pool-1-thread-1 numberOfTable=1000000 count=91242103 speed=1974399
+
```
-
-**Program Listing**
-
-The Python example program uses a multi-process architecture and employs a cross-process message queue.
-
-| Function or Class | Description |
-| ------------------------ | ------------------------------------------------------------------- |
-| main function | Entry point of the program, creates various subprocesses and message queues |
-| run_monitor_process function | Creates database, supertables, tracks write speed and periodically prints to console |
-| run_read_task function | Main logic for read processes, responsible for reading data from other data systems and distributing it to assigned queues |
-| MockDataSource class | Simulates a data source, implements iterator interface, returns the next 1,000 records for each table in batches |
-| run_write_task function | Main logic for write processes. Retrieves as much data as possible from the queue and writes in batches |
-| SQLWriter class | Handles SQL writing and automatic table creation |
-| StmtWriter class | Implements batch writing with parameter binding (not yet completed) |
-
-
-main function
-
-The main function is responsible for creating message queues and launching subprocesses, which are of 3 types:
-
-1. 1 monitoring process, responsible for database initialization and tracking write speed
-2. n read processes, responsible for reading data from other data systems
-3. m write processes, responsible for writing to the database
-
-The main function can accept 5 startup parameters, in order:
-
-1. Number of read tasks (processes), default is 1
-2. Number of write tasks (processes), default is 1
-3. Total number of simulated tables, default is 1,000
-4. Queue size (in bytes), default is 1,000,000
-5. Maximum number of records written per batch, default is 3,000
-
-```python
-{{#include docs/examples/python/fast_write_example.py:main}}
-```
-
-
-
-
-run_monitor_process
-
-The monitoring process is responsible for initializing the database and monitoring the current write speed.
-
-```python
-{{#include docs/examples/python/fast_write_example.py:monitor}}
-```
-
-
-
-
-
-run_read_task function
-
-The read process, responsible for reading data from other data systems and distributing it to assigned queues.
-
-```python
-{{#include docs/examples/python/fast_write_example.py:read}}
-```
-
-
-
-
-
-MockDataSource
-
-Below is the implementation of the mock data source. We assume that each piece of data generated by the data source includes the target table name information. In practice, you might need certain rules to determine the target table name.
-
-```python
-{{#include docs/examples/python/mockdatasource.py}}
-```
-
-
-
-
-run_write_task function
-
-The write process retrieves as much data as possible from the queue and writes in batches.
-
-```python
-{{#include docs/examples/python/fast_write_example.py:write}}
-```
-
-
-
-
-
-The SQLWriter class encapsulates the logic of SQL stitching and data writing. None of the tables are pre-created; instead, they are batch-created using the supertable as a template when a table does not exist error occurs, and then the INSERT statement is re-executed. For other errors, the SQL executed at the time is recorded for error troubleshooting and fault recovery. This class also checks whether the SQL exceeds the maximum length limit, based on the TDengine 3.0 limit, the supported maximum SQL length of 1,048,576 is passed in by the input parameter maxSQLLength.
-
-SQLWriter
-
-```python
-{{#include docs/examples/python/sql_writer.py}}
-```
-
-
-
-**Execution Steps**
-
-
-
-Execute the Python Example Program
-
-1. Prerequisites
-
- - TDengine client driver installed
- - Python3 installed, recommended version >= 3.8
- - taospy installed
-
-2. Install faster-fifo to replace the built-in multiprocessing.Queue in python
-
- ```shell
- pip3 install faster-fifo
- ```
-
-3. Click the "View Source" link above to copy the `fast_write_example.py`, `sql_writer.py`, and `mockdatasource.py` files.
-
-4. Execute the example program
-
- ```shell
- python3 fast_write_example.py
- ```
-
- Below is an actual output from a run, on a machine configured with 16 cores + 64G + SSD.
-
- ```text
- root@vm85$ python3 fast_write_example.py 8 8
- 2022-07-14 19:13:45,869 [root] - READ_TASK_COUNT=8, WRITE_TASK_COUNT=8, TABLE_COUNT=1000, QUEUE_SIZE=1000000, MAX_BATCH_SIZE=3000
- 2022-07-14 19:13:48,882 [root] - WriteTask-0 started with pid 718347
- 2022-07-14 19:13:48,883 [root] - WriteTask-1 started with pid 718348
- 2022-07-14 19:13:48,884 [root] - WriteTask-2 started with pid 718349
- 2022-07-14 19:13:48,884 [root] - WriteTask-3 started with pid 718350
- 2022-07-14 19:13:48,885 [root] - WriteTask-4 started with pid 718351
- 2022-07-14 19:13:48,885 [root] - WriteTask-5 started with pid 718352
- 2022-07-14 19:13:48,886 [root] - WriteTask-6 started with pid 718353
- 2022-07-14 19:13:48,886 [root] - WriteTask-7 started with pid 718354
- 2022-07-14 19:13:48,887 [root] - ReadTask-0 started with pid 718355
- 2022-07-14 19:13:48,888 [root] - ReadTask-1 started with pid 718356
- 2022-07-14 19:13:48,889 [root] - ReadTask-2 started with pid 718357
- 2022-07-14 19:13:48,889 [root] - ReadTask-3 started with pid 718358
- 2022-07-14 19:13:48,890 [root] - ReadTask-4 started with pid 718359
- 2022-07-14 19:13:48,891 [root] - ReadTask-5 started with pid 718361
- 2022-07-14 19:13:48,892 [root] - ReadTask-6 started with pid 718364
- 2022-07-14 19:13:48,893 [root] - ReadTask-7 started with pid 718365
- 2022-07-14 19:13:56,042 [DataBaseMonitor] - count=6676310 speed=667631.0
- 2022-07-14 19:14:06,196 [DataBaseMonitor] - count=20004310 speed=1332800.0
- 2022-07-14 19:14:16,366 [DataBaseMonitor] - count=32290310 speed=1228600.0
- 2022-07-14 19:14:26,527 [DataBaseMonitor] - count=44438310 speed=1214800.0
- 2022-07-14 19:14:36,673 [DataBaseMonitor] - count=56608310 speed=1217000.0
- 2022-07-14 19:14:46,834 [DataBaseMonitor] - count=68757310 speed=1214900.0
- 2022-07-14 19:14:57,280 [DataBaseMonitor] - count=80992310 speed=1223500.0
- 2022-07-14 19:15:07,689 [DataBaseMonitor] - count=93805310 speed=1281300.0
- 2022-07-14 19:15:18,020 [DataBaseMonitor] - count=106111310 speed=1230600.0
- 2022-07-14 19:15:28,356 [DataBaseMonitor] - count=118394310 speed=1228300.0
- 2022-07-14 19:15:38,690 [DataBaseMonitor] - count=130742310 speed=1234800.0
- 2022-07-14 19:15:49,000 [DataBaseMonitor] - count=143051310 speed=1230900.0
- 2022-07-14 19:15:59,323 [DataBaseMonitor] - count=155276310 speed=1222500.0
- 2022-07-14 19:16:09,649 [DataBaseMonitor] - count=167603310 speed=1232700.0
- 2022-07-14 19:16:19,995 [DataBaseMonitor] - count=179976310 speed=1237300.0
- ```
-
-
-
-:::note
-When using the Python connector to connect to TDengine with multiple processes, there is a limitation: connections cannot be established in the parent process; all connections must be created in the child processes.
-If a connection is created in the parent process, any connection attempts in the child processes will be perpetually blocked. This is a known issue.
-
-:::
-
-
diff --git a/docs/en/08-operation/15-sec.md b/docs/en/08-operation/17-security-suggestions.md
similarity index 99%
rename from docs/en/08-operation/15-sec.md
rename to docs/en/08-operation/17-security-suggestions.md
index ed7ac529f9..62f2a43251 100644
--- a/docs/en/08-operation/15-sec.md
+++ b/docs/en/08-operation/17-security-suggestions.md
@@ -1,6 +1,6 @@
---
-sidebar_label: Security Configuration
-title: Security Configuration
+sidebar_label: Security Suggestions
+title: Security Suggestions
toc_max_heading_level: 4
---
diff --git a/docs/en/assets/ingesting-data-efficiently-01.png b/docs/en/assets/ingesting-data-efficiently-01.png
index b540d64717..fb88e10197 100644
Binary files a/docs/en/assets/ingesting-data-efficiently-01.png and b/docs/en/assets/ingesting-data-efficiently-01.png differ
diff --git a/docs/examples/JDBC/highvolume/.gitignore b/docs/examples/JDBC/highvolume/.gitignore
new file mode 100644
index 0000000000..2931eae3d1
--- /dev/null
+++ b/docs/examples/JDBC/highvolume/.gitignore
@@ -0,0 +1,34 @@
+.mvn/
+target/
+!**/src/main/**
+!**/src/test/**
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+
+### VS Code ###
+.vscode/
+
+
+.DS_Store
+
diff --git a/docs/examples/JDBC/highvolume/pom.xml b/docs/examples/JDBC/highvolume/pom.xml
new file mode 100644
index 0000000000..921df7a52d
--- /dev/null
+++ b/docs/examples/JDBC/highvolume/pom.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.taosdata.jdbc</groupId>
+    <artifactId>highVolume</artifactId>
+    <version>SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.assembly.dir>src/main/resources/assembly</project.assembly.dir>
+        <java.version>1.8</java.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.taosdata.jdbc</groupId>
+            <artifactId>taos-jdbcdriver</artifactId>
+            <version>3.6.0</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>3.9.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.25</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.2.3</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.3.0</version>
+                <executions>
+                    <execution>
+                        <id>highVolume</id>
+                        <configuration>
+                            <finalName>highVolume</finalName>
+                            <appendAssemblyId>false</appendAssemblyId>
+                            <archive>
+                                <manifest>
+                                    <mainClass>com.taos.example.highvolume.FastWriteExample</mainClass>
+                                </manifest>
+                            </archive>
+                            <descriptorRefs>
+                                <descriptorRef>jar-with-dependencies</descriptorRef>
+                            </descriptorRefs>
+                        </configuration>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                    <encoding>UTF-8</encoding>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ConsumerTask.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ConsumerTask.java
new file mode 100644
index 0000000000..3e653d691d
--- /dev/null
+++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ConsumerTask.java
@@ -0,0 +1,120 @@
+package com.taos.example.highvolume;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+class ConsumerTask implements Runnable, Stoppable {
+ private static final Logger logger = LoggerFactory.getLogger(ConsumerTask.class);
+ private final int taskId;
+ private final int writeThreadCount;
+ private final int batchSizeByRow;
+ private final int cacheSizeByRow;
+ private final String dbName;
+ private volatile boolean active = true;
+
+ public ConsumerTask(int taskId,
+ int writeThreadCount,
+ int batchSizeByRow,
+ int cacheSizeByRow,
+ String dbName) {
+ this.taskId = taskId;
+ this.writeThreadCount = writeThreadCount;
+ this.batchSizeByRow = batchSizeByRow;
+ this.cacheSizeByRow = cacheSizeByRow;
+ this.dbName = dbName;
+ }
+
+ @Override
+ public void run() {
+ logger.info("Consumer Task {} started", taskId);
+
+ Properties props = new Properties();
+
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Util.getKafkaBootstrapServers());
+
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
+
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
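+        // Limit each poll to one write batch of records and let the broker wait up to 3 seconds to fill a fetch.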
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(batchSizeByRow));
+ props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "3000");
+
+ props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, String.valueOf(2 * 1024 * 1024));
+
+ props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "15000");
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
+
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+
+        List<String> topics = Collections.singletonList(Util.getKafkaTopic());
+
+ try {
+ consumer.subscribe(topics);
+ } catch (Exception e) {
+ logger.error("Consumer Task {} Error", taskId, e);
+ return;
+ }
+
+ try (Connection connection = Util.getConnection(batchSizeByRow, cacheSizeByRow, writeThreadCount);
+ PreparedStatement pstmt = connection.prepareStatement("INSERT INTO " + dbName +".meters (tbname, ts, current, voltage, phase) VALUES (?,?,?,?,?)")) {
+ long i = 0L;
+ long lastTimePolled = System.currentTimeMillis();
+ while (active) {
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<String, String> metersRecord : records) {
+ i++;
+ Meters meters = Meters.fromString(metersRecord.value());
+ pstmt.setString(1, meters.getTableName());
+ pstmt.setTimestamp(2, meters.getTs());
+ pstmt.setFloat(3, meters.getCurrent());
+ pstmt.setInt(4, meters.getVoltage());
+ pstmt.setFloat(5, meters.getPhase());
+ pstmt.addBatch();
+
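+                    // Hand one batch to the driver every batchSizeByRow rows. Every 10 batches, executeUpdate
+                    // (the efficient-writing call that reports how many rows were written) is invoked before
+                    // committing the Kafka offsets, so committed progress tracks data already written to TDengine.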
+ if (i % batchSizeByRow == 0) {
+ pstmt.executeBatch();
+ }
+ if (i % (10L * batchSizeByRow) == 0){
+ pstmt.executeUpdate();
+ consumer.commitSync();
+ }
+ }
+
+ if (!records.isEmpty()){
+ lastTimePolled = System.currentTimeMillis();
+ } else {
+                    if (System.currentTimeMillis() - lastTimePolled > 1000 * 60) {
+                        lastTimePolled = System.currentTimeMillis();
+                        logger.error("Consumer Task {} has been idle for more than 1 minute", taskId);
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Consumer Task {} Error", taskId, e);
+ } finally {
+ consumer.close();
+ }
+
+ logger.info("Consumer Task {} stopped", taskId);
+ }
+
+ public void stop() {
+ logger.info("consumer task {} stopping", taskId);
+ this.active = false;
+ }
+}
\ No newline at end of file
diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/CreateSubTableTask.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/CreateSubTableTask.java
new file mode 100644
index 0000000000..270f53dfbc
--- /dev/null
+++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/CreateSubTableTask.java
@@ -0,0 +1,56 @@
+package com.taos.example.highvolume;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+class CreateSubTableTask implements Runnable {
+ private static final Logger logger = LoggerFactory.getLogger(CreateSubTableTask.class);
+ private final int taskId;
+ private final int subTableStartIndex;
+ private final int subTableEndIndex;
+ private final String dbName;
+
+ public CreateSubTableTask(int taskId,
+ int subTableStartIndex,
+ int subTableEndIndex,
+ String dbName) {
+ this.taskId = taskId;
+ this.subTableStartIndex = subTableStartIndex;
+ this.subTableEndIndex = subTableEndIndex;
+ this.dbName = dbName;
+ }
+
+ @Override
+ public void run() {
+ try (Connection connection = Util.getConnection();
+ Statement statement = connection.createStatement()){
+ statement.execute("use " + dbName);
+ StringBuilder sql = new StringBuilder();
+ sql.append("create table");
+ int i = 0;
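+            // Append one "if not exists ..." clause per child table and execute the combined CREATE TABLE
+            // statement every 1000 tables to keep each SQL statement reasonably sized.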
+ for (int tableNum = subTableStartIndex; tableNum <= subTableEndIndex; tableNum++) {
+ sql.append(" if not exists " + Util.getTableNamePrefix() + tableNum + " using meters" + " tags(" + tableNum + ", " + "\"location_" + tableNum + "\"" + ")");
+
+ if (i < 1000) {
+ i++;
+ } else {
+ statement.execute(sql.toString());
+ sql = new StringBuilder();
+ sql.append("create table");
+ i = 0;
+ }
+ }
+ if (sql.length() > "create table".length()) {
+ statement.execute(sql.toString());
+ }
+ } catch (SQLException e) {
+ logger.error("task id {}, failed to create sub table", taskId, e);
+ }
+ }
+
+
+}
\ No newline at end of file
diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/FastWriteExample.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/FastWriteExample.java
new file mode 100644
index 0000000000..b118f9d8fa
--- /dev/null
+++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/FastWriteExample.java
@@ -0,0 +1,332 @@
+package com.taos.example.highvolume;
+
+import org.apache.commons.cli.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class FastWriteExample {
+ static final Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
+ static ThreadPoolExecutor writerThreads;
+ static ThreadPoolExecutor producerThreads;
+ static final ThreadPoolExecutor statThread = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
+    private static final List<Stoppable> allTasks = new ArrayList<>();
+
+ private static int readThreadCount = 5;
+ private static int writeThreadPerReadThread = 5;
+ private static int batchSizeByRow = 1000;
+ private static int cacheSizeByRow = 10000;
+ private static int subTableNum = 1000000;
+ private static int rowsPerSubTable = 100;
+ private static String dbName = "test";
+
+
+ public static void forceStopAll() {
+ logger.info("shutting down");
+
+ for (Stoppable task : allTasks) {
+ task.stop();
+ }
+
+ if (producerThreads != null) {
+ producerThreads.shutdown();
+ }
+
+ if (writerThreads != null) {
+ writerThreads.shutdown();
+ }
+
+ statThread.shutdown();
+ }
+
+ private static void createSubTables(){
+ writerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, getNamedThreadFactory("FW-CreateSubTable-thread-"));
+
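+        // Split the child tables evenly across the creation tasks (ceiling division so every table is covered).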
+ int range = (subTableNum + readThreadCount - 1) / readThreadCount;
+
+ for (int i = 0; i < readThreadCount; i++) {
+ int startIndex = i * range;
+ int endIndex;
+ if (i == readThreadCount - 1) {
+ endIndex = subTableNum - 1;
+ } else {
+ endIndex = startIndex + range - 1;
+ }
+
+ logger.debug("create sub table task {} {} {}", i, startIndex, endIndex);
+
+ CreateSubTableTask createSubTableTask = new CreateSubTableTask(i,
+ startIndex,
+ endIndex,
+ dbName);
+ writerThreads.submit(createSubTableTask);
+ }
+
+ logger.info("create sub table task started.");
+
+ while (writerThreads.getActiveCount() != 0) {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ logger.info("create sub table task finished.");
+
+ }
+
+ public static void startStatTask() throws SQLException {
+ StatTask statTask = new StatTask(dbName, subTableNum);
+ allTasks.add(statTask);
+ statThread.submit(statTask);
+ }
+ public static ThreadFactory getNamedThreadFactory(String namePrefix) {
+ return new ThreadFactory() {
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, namePrefix + threadNumber.getAndIncrement());
+ }
+ };
+ }
+
+ private static void invokeKafkaDemo() throws SQLException {
+ producerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, getNamedThreadFactory("FW-kafka-producer-thread-"));
+ writerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, getNamedThreadFactory("FW-kafka-consumer-thread-"));
+
+ int range = (subTableNum + readThreadCount - 1) / readThreadCount;
+
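+        // Each task index gets one Kafka producer covering a disjoint range of child tables and one consumer;
+        // all consumers join the same consumer group and share the topic partitions among themselves.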
+ for (int i = 0; i < readThreadCount; i++) {
+ int startIndex = i * range;
+ int endIndex;
+ if (i == readThreadCount - 1) {
+ endIndex = subTableNum - 1;
+ } else {
+ endIndex = startIndex + range - 1;
+ }
+
+ ProducerTask producerTask = new ProducerTask(i,
+ rowsPerSubTable,
+ startIndex,
+ endIndex);
+ allTasks.add(producerTask);
+ producerThreads.submit(producerTask);
+
+ ConsumerTask consumerTask = new ConsumerTask(i,
+ writeThreadPerReadThread,
+ batchSizeByRow,
+ cacheSizeByRow,
+ dbName);
+ allTasks.add(consumerTask);
+ writerThreads.submit(consumerTask);
+ }
+
+ startStatTask();
+ Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::forceStopAll));
+
+ while (writerThreads.getActiveCount() != 0) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ private static void invokeMockDataDemo() throws SQLException {
+ ThreadFactory namedThreadFactory = new ThreadFactory() {
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final String namePrefix = "FW-work-thread-";
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, namePrefix + threadNumber.getAndIncrement());
+ }
+ };
+
+ writerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, namedThreadFactory);
+
+ int range = (subTableNum + readThreadCount - 1) / readThreadCount;
+
+ for (int i = 0; i < readThreadCount; i++) {
+ int startIndex = i * range;
+ int endIndex;
+ if (i == readThreadCount - 1) {
+ endIndex = subTableNum - 1;
+ } else {
+ endIndex = startIndex + range - 1;
+ }
+
+ WorkTask task = new WorkTask(i,
+ writeThreadPerReadThread,
+ batchSizeByRow,
+ cacheSizeByRow,
+ rowsPerSubTable,
+ startIndex,
+ endIndex,
+ dbName);
+ allTasks.add(task);
+ writerThreads.submit(task);
+ }
+
+ startStatTask();
+ Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::forceStopAll));
+
+ while (writerThreads.getActiveCount() != 0) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ // print help
+ private static void printHelp(Options options) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("java -jar highVolume.jar", options);
+ System.out.println();
+ }
+
+ public static void main(String[] args) throws SQLException, InterruptedException {
+ Options options = new Options();
+
+ Option readThdcountOption = new Option("r", "readThreadCount", true, "Specify the readThreadCount, default is 5");
+ readThdcountOption.setRequired(false);
+ options.addOption(readThdcountOption);
+
+ Option writeThdcountOption = new Option("w", "writeThreadPerReadThread", true, "Specify the writeThreadPerReadThread, default is 5");
+ writeThdcountOption.setRequired(false);
+ options.addOption(writeThdcountOption);
+
+ Option batchSizeOption = new Option("b", "batchSizeByRow", true, "Specify the batchSizeByRow, default is 1000");
+ batchSizeOption.setRequired(false);
+ options.addOption(batchSizeOption);
+
+ Option cacheSizeOption = new Option("c", "cacheSizeByRow", true, "Specify the cacheSizeByRow, default is 10000");
+ cacheSizeOption.setRequired(false);
+ options.addOption(cacheSizeOption);
+
+ Option subTablesOption = new Option("s", "subTableNum", true, "Specify the subTableNum, default is 1000000");
+ subTablesOption.setRequired(false);
+ options.addOption(subTablesOption);
+
+ Option rowsPerTableOption = new Option("R", "rowsPerSubTable", true, "Specify the rowsPerSubTable, default is 100");
+ rowsPerTableOption.setRequired(false);
+ options.addOption(rowsPerTableOption);
+
+ Option dbNameOption = new Option("d", "dbName", true, "Specify the database name, default is test");
+ dbNameOption.setRequired(false);
+ options.addOption(dbNameOption);
+
+ Option kafkaOption = new Option("K", "useKafka", false, "use kafka demo to test");
+ kafkaOption.setRequired(false);
+ options.addOption(kafkaOption);
+
+
+ Option helpOption = new Option(null, "help", false, "print help information");
+ helpOption.setRequired(false);
+ options.addOption(helpOption);
+
+ CommandLineParser parser = new DefaultParser();
+ CommandLine cmd;
+
+ try {
+ cmd = parser.parse(options, args);
+ } catch (ParseException e) {
+ System.out.println(e.getMessage());
+ printHelp(options);
+ System.exit(1);
+ return;
+ }
+
+ if (cmd.hasOption("help")) {
+ printHelp(options);
+ return;
+ }
+
+ if (cmd.getOptionValue("readThreadCount") != null) {
+ readThreadCount = Integer.parseInt(cmd.getOptionValue("readThreadCount"));
+ if (readThreadCount <= 0){
+ logger.error("readThreadCount must be greater than 0");
+ return;
+ }
+ }
+
+ if (cmd.getOptionValue("writeThreadPerReadThread") != null) {
+ writeThreadPerReadThread = Integer.parseInt(cmd.getOptionValue("writeThreadPerReadThread"));
+ if (writeThreadPerReadThread <= 0){
+ logger.error("writeThreadPerReadThread must be greater than 0");
+ return;
+ }
+ }
+
+ if (cmd.getOptionValue("batchSizeByRow") != null) {
+ batchSizeByRow = Integer.parseInt(cmd.getOptionValue("batchSizeByRow"));
+ if (batchSizeByRow <= 0){
+ logger.error("batchSizeByRow must be greater than 0");
+ return;
+ }
+ }
+
+ if (cmd.getOptionValue("cacheSizeByRow") != null) {
+ cacheSizeByRow = Integer.parseInt(cmd.getOptionValue("cacheSizeByRow"));
+ if (cacheSizeByRow <= 0){
+ logger.error("cacheSizeByRow must be greater than 0");
+ return;
+ }
+ }
+
+ if (cmd.getOptionValue("subTableNum") != null) {
+ subTableNum = Integer.parseInt(cmd.getOptionValue("subTableNum"));
+ if (subTableNum <= 0){
+ logger.error("subTableNum must be greater than 0");
+ return;
+ }
+ }
+
+ if (cmd.getOptionValue("rowsPerSubTable") != null) {
+ rowsPerSubTable = Integer.parseInt(cmd.getOptionValue("rowsPerSubTable"));
+ if (rowsPerSubTable <= 0){
+ logger.error("rowsPerSubTable must be greater than 0");
+ return;
+ }
+ }
+
+ if (cmd.getOptionValue("dbName") != null) {
+ dbName = cmd.getOptionValue("dbName");
+ }
+
+ logger.info("readThreadCount={}, writeThreadPerReadThread={} batchSizeByRow={} cacheSizeByRow={}, subTableNum={}, rowsPerSubTable={}",
+ readThreadCount, writeThreadPerReadThread, batchSizeByRow, cacheSizeByRow, subTableNum, rowsPerSubTable);
+
+ logger.info("create database begin.");
+ Util.prepareDatabase(dbName);
+
+ logger.info("create database end.");
+
+ logger.info("create sub tables start.");
+ createSubTables();
+ logger.info("create sub tables end.");
+
+
+ if (cmd.hasOption("K")) {
+ Util.createKafkaTopic();
+ // use kafka demo
+ invokeKafkaDemo();
+
+ } else {
+ // use mock data source demo
+ invokeMockDataDemo();
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Meters.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Meters.java
new file mode 100644
index 0000000000..7db0d9f2a1
--- /dev/null
+++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Meters.java
@@ -0,0 +1,76 @@
+package com.taos.example.highvolume;
+
+import java.sql.Timestamp;
+
+public class Meters {
+ String tableName;
+ Timestamp ts;
+ float current;
+ int voltage;
+ float phase;
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public Timestamp getTs() {
+ return ts;
+ }
+
+ public void setTs(Timestamp ts) {
+ this.ts = ts;
+ }
+
+ public float getCurrent() {
+ return current;
+ }
+
+ public void setCurrent(float current) {
+ this.current = current;
+ }
+
+ public int getVoltage() {
+ return voltage;
+ }
+
+ public void setVoltage(int voltage) {
+ this.voltage = voltage;
+ }
+
+ public float getPhase() {
+ return phase;
+ }
+
+ public void setPhase(float phase) {
+ this.phase = phase;
+ }
+
+ @Override
+ // this is just a demo, so we don't need to implement the full CSV parser
+ public String toString() {
+ return tableName + "," +
+ ts.toString() + "," +
+ current + "," +
+ voltage + "," +
+ phase;
+ }
+
+ public static Meters fromString(String str) {
+ String[] parts = str.split(",");
+ if (parts.length != 5) {
+ throw new IllegalArgumentException("Invalid input format");
+ }
+ Meters meters = new Meters();
+ meters.setTableName(parts[0]);
+ meters.setTs(Timestamp.valueOf(parts[1]));
+ meters.setCurrent(Float.parseFloat(parts[2]));
+ meters.setVoltage(Integer.parseInt(parts[3]));
+ meters.setPhase(Float.parseFloat(parts[4]));
+ return meters;
+ }
+
+}
diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/MockDataSource.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/MockDataSource.java
new file mode 100644
index 0000000000..603f33496c
--- /dev/null
+++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/MockDataSource.java
@@ -0,0 +1,56 @@
+package com.taos.example.highvolume;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * Generate test data
+ */
+class MockDataSource implements Iterator<Meters> {
+ private final static Logger logger = LoggerFactory.getLogger(MockDataSource.class);
+
+ private final int tableStartIndex;
+ private final int tableEndIndex;
+ private final long maxRowsPerTable;
+
+ long currentMs = System.currentTimeMillis();
+ private int index = 0;
+    private final Random random = new Random();
+
+ // mock values
+
+ public MockDataSource(int tableStartIndex, int tableEndIndex, int maxRowsPerTable) {
+ this.tableStartIndex = tableStartIndex;
+ this.tableEndIndex = tableEndIndex;
+ this.maxRowsPerTable = maxRowsPerTable;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < (tableEndIndex - tableStartIndex + 1) * maxRowsPerTable;
+ }
+
+ @Override
+ public Meters next() {
+ // use interlace rows to simulate the data distribution in real world
+ if (index % (tableEndIndex - tableStartIndex + 1) == 0) {
+ currentMs += 1000;
+ }
+
+ long currentTbId = index % (tableEndIndex - tableStartIndex + 1) + tableStartIndex;
+
+ Meters meters = new Meters();
+
+ meters.setTableName(Util.getTableNamePrefix() + currentTbId);
+ meters.setTs(new java.sql.Timestamp(currentMs));
+ meters.setCurrent((float) (Math.random() * 100));
+ meters.setVoltage(random.nextInt());
+ meters.setPhase((float) (Math.random() * 100));
+
+ index ++;
+ return meters;
+ }
+}
diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ProducerTask.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ProducerTask.java
new file mode 100644
index 0000000000..8d0d0a082d
--- /dev/null
+++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ProducerTask.java
@@ -0,0 +1,68 @@
+package com.taos.example.highvolume;
+
+import com.taosdata.jdbc.utils.ReqId;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Properties;
+
+class ProducerTask implements Runnable, Stoppable {
+ private final static Logger logger = LoggerFactory.getLogger(ProducerTask.class);
+ private final int taskId;
+ private final int subTableStartIndex;
+ private final int subTableEndIndex;
+ private final int rowsPerTable;
+ private volatile boolean active = true;
+ public ProducerTask(int taskId,
+ int rowsPerTable,
+ int subTableStartIndex,
+ int subTableEndIndex) {
+ this.taskId = taskId;
+ this.subTableStartIndex = subTableStartIndex;
+ this.subTableEndIndex = subTableEndIndex;
+ this.rowsPerTable = rowsPerTable;
+ }
+
+ @Override
+ public void run() {
+        logger.info("kafka producer {} started", taskId);
+        Iterator<Meters> it = new MockDataSource(subTableStartIndex, subTableEndIndex, rowsPerTable);
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", Util.getKafkaBootstrapServers());
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
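+        // Batch up to 1 MB per partition and wait up to 500 ms before sending, to improve producer throughput.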
+ props.put("batch.size", 1024 * 1024);
+ props.put("linger.ms", 500);
+
+ // create a Kafka producer
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
+
+ try {
+ while (it.hasNext() && active) {
+ Meters meters = it.next();
+ String key = meters.getTableName();
+ String value = meters.toString();
+                // To keep each child table's rows in order, the table name is used as the partition key so that
+                // all rows of the same child table go to the same Kafka partition.
+                // JDBC efficient writing distributes rows by the String hashCode, so a different hash algorithm
+                // (murmurHash32) is used here to pick the partition.
+                long hashCode = Math.abs(ReqId.murmurHash32(key.getBytes(), 0));
+                ProducerRecord<String, String> metersRecord = new ProducerRecord<>(Util.getKafkaTopic(), (int) (hashCode % Util.getPartitionCount()), key, value);
+ producer.send(metersRecord);
+ }
+ } catch (Exception e) {
+ logger.error("task id {}, send message error: ", taskId, e);
+ }
+ finally {
+ producer.close();
+ }
+ logger.info("kafka producer {} stopped", taskId);
+ }
+
+ public void stop() {
+ logger.info("kafka producer {} stopping", taskId);
+ this.active = false;
+ }
+}
\ No newline at end of file
diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/StatTask.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/StatTask.java
new file mode 100644
index 0000000000..b8685188c6
--- /dev/null
+++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/StatTask.java
@@ -0,0 +1,60 @@
+package com.taos.example.highvolume;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+class StatTask implements Runnable, Stoppable {
+ private final static Logger logger = LoggerFactory.getLogger(StatTask.class);
+ private final int subTableNum;
+ private final String dbName;
+ private final Connection conn;
+ private final Statement stmt;
+ private volatile boolean active = true;
+
+
+ public StatTask(String dbName,
+ int subTableNum) throws SQLException {
+ this.dbName = dbName;
+ this.subTableNum = subTableNum;
+ this.conn = Util.getConnection();
+ this.stmt = conn.createStatement();
+ }
+
+ @Override
+ public void run() {
+ long lastCount = 0;
+
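+        // Every 10 seconds, query the total row count and report the average write speed (rows/second) over the interval.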
+ while (active) {
+ try {
+ Thread.sleep(10000);
+
+ long count = Util.count(stmt, dbName);
+ logger.info("numberOfTable={} count={} speed={}", subTableNum, count, (count - lastCount) / 10);
+ lastCount = count;
+ } catch (InterruptedException e) {
+ logger.error("interrupted", e);
+ break;
+ } catch (SQLException e) {
+ logger.error("execute sql error: ", e);
+ break;
+ }
+ }
+
+ try {
+ stmt.close();
+ conn.close();
+ } catch (SQLException e) {
+ logger.error("close connection error: ", e);
+ }
+ }
+
+ public void stop() {
+ active = false;
+ }
+
+
+}
\ No newline at end of file
diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Stoppable.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Stoppable.java
new file mode 100644
index 0000000000..f3b249eb80
--- /dev/null
+++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Stoppable.java
@@ -0,0 +1,5 @@
+package com.taos.example.highvolume;
+
+public interface Stoppable {
+ void stop();
+}
diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Util.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Util.java
new file mode 100644
index 0000000000..5e4214e61b
--- /dev/null
+++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Util.java
@@ -0,0 +1,104 @@
+package com.taos.example.highvolume;
+
+import com.taosdata.jdbc.TSDBDriver;
+
+import java.sql.*;
+import java.util.Properties;
+
+import org.apache.kafka.clients.admin.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+public class Util {
+ private final static Logger logger = LoggerFactory.getLogger(Util.class);
+
+ public static String getTableNamePrefix() {
+ return "d_";
+ }
+
+ public static Connection getConnection() throws SQLException {
+ String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
+ if (jdbcURL == null || jdbcURL.isEmpty()) {
+ jdbcURL = "jdbc:TAOS-WS://localhost:6041/?user=root&password=taosdata";
+ }
+ return DriverManager.getConnection(jdbcURL);
+ }
+
+ public static Connection getConnection(int batchSize, int cacheSize, int writeThreadNum) throws SQLException {
+ String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
+ if (jdbcURL == null || jdbcURL.isEmpty()) {
+ jdbcURL = "jdbc:TAOS-WS://localhost:6041/?user=root&password=taosdata";
+ }
+ Properties properties = new Properties();
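+ // Enable the JDBC efficient-writing (asynchronous stmt) mode and configure the per-connection
+ // batch size, cache size, and number of background write threads; see the efficient writing configuration.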
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_ASYNC_WRITE, "stmt");
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_SIZE_BY_ROW, String.valueOf(batchSize));
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_CACHE_SIZE_BY_ROW, String.valueOf(cacheSize));
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_BACKEND_WRITE_THREAD_NUM, String.valueOf(writeThreadNum));
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
+ return DriverManager.getConnection(jdbcURL, properties);
+ }
+ public static void prepareDatabase(String dbName) throws SQLException {
+ try (Connection conn = Util.getConnection();
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("DROP DATABASE IF EXISTS " + dbName);
+ stmt.execute("CREATE DATABASE IF NOT EXISTS " + dbName + " vgroups 20");
+ stmt.execute("use " + dbName);
+ stmt.execute("CREATE STABLE " + dbName + ".meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(64))");
+ }
+ }
+
+ public static long count(Statement stmt, String dbName) throws SQLException {
+ try (ResultSet result = stmt.executeQuery("SELECT count(*) from " + dbName + ".meters")) {
+ result.next();
+ return result.getLong(1);
+ }
+ }
+
+
+ public static String getKafkaBootstrapServers() {
+ String kafkaBootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
+ if (kafkaBootstrapServers == null || kafkaBootstrapServers.isEmpty()) {
+ kafkaBootstrapServers = "localhost:9092";
+ }
+
+ return kafkaBootstrapServers;
+ }
+
+ public static String getKafkaTopic() {
+ return "test-meters-topic";
+ }
+
+ public static void createKafkaTopic() {
+ Properties config = new Properties();
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers());
+
+ try (AdminClient adminClient = AdminClient.create(config)) {
+ String topicName = getKafkaTopic();
+ int numPartitions = getPartitionCount();
+ short replicationFactor = 1;
+
+ ListTopicsResult topics = adminClient.listTopics();
+ Set<String> existingTopics = topics.names().get();
+
+ if (!existingTopics.contains(topicName)) {
+ NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
+ CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
+ createTopicsResult.all().get(); // wait for topic creation to complete
+ logger.info("Topic " + topicName + " created successfully.");
+ }
+
+ } catch (InterruptedException | ExecutionException e) {
+ logger.error("Failed to delete/create topic: " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static int getPartitionCount() {
+ return 5;
+ }
+
+
+}
diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/WorkTask.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/WorkTask.java
new file mode 100644
index 0000000000..29b9c06484
--- /dev/null
+++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/WorkTask.java
@@ -0,0 +1,74 @@
+package com.taos.example.highvolume;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.Iterator;
+
+class WorkTask implements Runnable, Stoppable {
+ private static final Logger logger = LoggerFactory.getLogger(WorkTask.class);
+ private final int taskId;
+ private final int writeThreadCount;
+ private final int batchSizeByRow;
+ private final int cacheSizeByRow;
+ private final int rowsPerTable;
+ private final int subTableStartIndex;
+ private final int subTableEndIndex;
+ private final String dbName;
+ private volatile boolean active = true;
+ public WorkTask(int taskId,
+ int writeThreadCount,
+ int batchSizeByRow,
+ int cacheSizeByRow,
+ int rowsPerTable,
+ int subTableStartIndex,
+ int subTableEndIndex,
+ String dbName) {
+ this.taskId = taskId;
+ this.writeThreadCount = writeThreadCount;
+ this.batchSizeByRow = batchSizeByRow;
+ this.cacheSizeByRow = cacheSizeByRow;
+ this.rowsPerTable = rowsPerTable;
+ this.subTableStartIndex = subTableStartIndex; // for this task, the start index of sub table
+ this.subTableEndIndex = subTableEndIndex; // for this task, the end index of sub table
+ this.dbName = dbName;
+ }
+
+ @Override
+ public void run() {
+ logger.info("task {} started", taskId);
+ Iterator<Meters> it = new MockDataSource(subTableStartIndex, subTableEndIndex, rowsPerTable);
+ try (Connection connection = Util.getConnection(batchSizeByRow, cacheSizeByRow, writeThreadCount);
+ PreparedStatement pstmt = connection.prepareStatement("INSERT INTO " + dbName +".meters (tbname, ts, current, voltage, phase) VALUES (?,?,?,?,?)")) {
+ long i = 0L;
+ while (it.hasNext() && active) {
+ i++;
+ Meters meters = it.next();
+ pstmt.setString(1, meters.getTableName());
+ pstmt.setTimestamp(2, meters.getTs());
+ pstmt.setFloat(3, meters.getCurrent());
+ pstmt.setInt(4, meters.getVoltage());
+ pstmt.setFloat(5, meters.getPhase());
+ pstmt.addBatch();
+
+ if (i % batchSizeByRow == 0) {
+ pstmt.executeBatch();
+ }
+
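+ // In efficient-writing mode, executeUpdate can be called periodically to obtain the number of rows
+ // actually written; any write error surfaces here as an exception that can be caught.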
+ if (i % (10L * batchSizeByRow) == 0) {
+ pstmt.executeUpdate();
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Work Task {} Error", taskId, e);
+ }
+ logger.info("task {} stopped", taskId);
+ }
+
+ public void stop() {
+ logger.info("task {} stopping", taskId);
+ this.active = false;
+ }
+}
\ No newline at end of file
diff --git a/docs/examples/JDBC/highvolume/src/main/resources/logback.xml b/docs/examples/JDBC/highvolume/src/main/resources/logback.xml
new file mode 100644
index 0000000000..979d901a1c
--- /dev/null
+++ b/docs/examples/JDBC/highvolume/src/main/resources/logback.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+    <property name="logDir" value="logs"/>
+    <property name="pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"/>
+
+    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+        <file>${logDir}/high-volume.log</file>
+        <encoder>
+            <pattern>${pattern}</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <target>System.out</target>
+        <encoder>
+            <pattern>${pattern}</pattern>
+        </encoder>
+    </appender>
+
+    <root level="info">
+        <appender-ref ref="FILE"/>
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>
diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java
deleted file mode 100644
index fa6ebf0858..0000000000
--- a/docs/examples/java/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package com.taos.example.highvolume;
-
-import java.sql.*;
-
-/**
- * Prepare target database.
- * Count total records in database periodically so that we can estimate the writing speed.
- */
-public class DataBaseMonitor {
- private Connection conn;
- private Statement stmt;
-
- public DataBaseMonitor init() throws SQLException {
- if (conn == null) {
- String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
- if (jdbcURL == null || jdbcURL == ""){
- jdbcURL = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
- }
- conn = DriverManager.getConnection(jdbcURL);
- stmt = conn.createStatement();
- }
- return this;
- }
-
- public void close() {
- try {
- stmt.close();
- } catch (SQLException e) {
- }
- try {
- conn.close();
- } catch (SQLException e) {
- }
- }
-
- public void prepareDatabase() throws SQLException {
- stmt.execute("DROP DATABASE IF EXISTS test");
- stmt.execute("CREATE DATABASE test");
- stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
- }
-
- public long count() throws SQLException {
- try (ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters")) {
- result.next();
- return result.getLong(1);
- }
- }
-
- public long getTableCount() throws SQLException {
- try (ResultSet result = stmt.executeQuery("select count(*) from information_schema.ins_tables where db_name = 'test';")) {
- result.next();
- return result.getLong(1);
- }
- }
-}
\ No newline at end of file
diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java
deleted file mode 100644
index 41b59551ca..0000000000
--- a/docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.taos.example.highvolume;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-
-public class FastWriteExample {
- final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
-
- final static int taskQueueCapacity = 1000000;
- final static List<BlockingQueue<String>> taskQueues = new ArrayList<>();
- final static List<ReadTask> readTasks = new ArrayList<>();
- final static List<WriteTask> writeTasks = new ArrayList<>();
- final static DataBaseMonitor databaseMonitor = new DataBaseMonitor();
-
- public static void stopAll() {
- logger.info("shutting down");
- readTasks.forEach(task -> task.stop());
- writeTasks.forEach(task -> task.stop());
- databaseMonitor.close();
- }
-
- public static void main(String[] args) throws InterruptedException, SQLException {
- int readTaskCount = args.length > 0 ? Integer.parseInt(args[0]) : 1;
- int writeTaskCount = args.length > 1 ? Integer.parseInt(args[1]) : 3;
- int tableCount = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
- int maxBatchSize = args.length > 3 ? Integer.parseInt(args[3]) : 3000;
-
- logger.info("readTaskCount={}, writeTaskCount={} tableCount={} maxBatchSize={}",
- readTaskCount, writeTaskCount, tableCount, maxBatchSize);
-
- databaseMonitor.init().prepareDatabase();
-
- // Create task queues, whiting tasks and start writing threads.
- for (int i = 0; i < writeTaskCount; ++i) {
- BlockingQueue<String> queue = new ArrayBlockingQueue<>(taskQueueCapacity);
- taskQueues.add(queue);
- WriteTask task = new WriteTask(queue, maxBatchSize);
- Thread t = new Thread(task);
- t.setName("WriteThread-" + i);
- t.start();
- }
-
- // create reading tasks and start reading threads
- int tableCountPerTask = tableCount / readTaskCount;
- for (int i = 0; i < readTaskCount; ++i) {
- ReadTask task = new ReadTask(i, taskQueues, tableCountPerTask);
- Thread t = new Thread(task);
- t.setName("ReadThread-" + i);
- t.start();
- }
-
- Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::stopAll));
-
- long lastCount = 0;
- while (true) {
- Thread.sleep(10000);
- long numberOfTable = databaseMonitor.getTableCount();
- long count = databaseMonitor.count();
- logger.info("numberOfTable={} count={} speed={}", numberOfTable, count, (count - lastCount) / 10);
- lastCount = count;
- }
- }
-}
\ No newline at end of file
diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java
deleted file mode 100644
index f0ebc53b4b..0000000000
--- a/docs/examples/java/src/main/java/com/taos/example/highvolume/MockDataSource.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package com.taos.example.highvolume;
-
-import java.util.Iterator;
-
-/**
- * Generate test data
- */
-class MockDataSource implements Iterator<String> {
- private String tbNamePrefix;
- private int tableCount;
- private long maxRowsPerTable = 1000000000L;
-
- // 100 milliseconds between two neighbouring rows.
- long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
- private int currentRow = 0;
- private int currentTbId = -1;
-
- // mock values
- String[] location = {"California.LosAngeles", "California.SanDiego", "California.SanJose", "California.Campbell", "California.SanFrancisco"};
- float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
- int[] voltage = {119, 116, 111, 113, 118};
- float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};
-
- public MockDataSource(String tbNamePrefix, int tableCount) {
- this.tbNamePrefix = tbNamePrefix;
- this.tableCount = tableCount;
- }
-
- @Override
- public boolean hasNext() {
- currentTbId += 1;
- if (currentTbId == tableCount) {
- currentTbId = 0;
- currentRow += 1;
- }
- return currentRow < maxRowsPerTable;
- }
-
- @Override
- public String next() {
- long ts = startMs + 100 * currentRow;
- int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
- StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
- sb.append(ts).append(','); // ts
- sb.append(current[currentRow % 5]).append(','); // current
- sb.append(voltage[currentRow % 5]).append(','); // voltage
- sb.append(phase[currentRow % 5]).append(','); // phase
- sb.append(location[currentRow % 5]).append(','); // location
- sb.append(groupId); // groupID
-
- return sb.toString();
- }
-}
diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java
deleted file mode 100644
index a6fcfed1d2..0000000000
--- a/docs/examples/java/src/main/java/com/taos/example/highvolume/ReadTask.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.taos.example.highvolume;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-class ReadTask implements Runnable {
- private final static Logger logger = LoggerFactory.getLogger(ReadTask.class);
- private final int taskId;
- private final List<BlockingQueue<String>> taskQueues;
- private final int queueCount;
- private final int tableCount;
- private boolean active = true;
-
- public ReadTask(int readTaskId, List<BlockingQueue<String>> queues, int tableCount) {
- this.taskId = readTaskId;
- this.taskQueues = queues;
- this.queueCount = queues.size();
- this.tableCount = tableCount;
- }
-
- /**
- * Assign data received to different queues.
- * Here we use the suffix number in table name.
- * You are expected to define your own rule in practice.
- *
- * @param line record received
- * @return which queue to use
- */
- public int getQueueId(String line) {
- String tbName = line.substring(0, line.indexOf(',')); // For example: tb1_101
- String suffixNumber = tbName.split("_")[1];
- return Integer.parseInt(suffixNumber) % this.queueCount;
- }
-
- @Override
- public void run() {
- logger.info("started");
- Iterator<String> it = new MockDataSource("tb" + this.taskId, tableCount);
- try {
- while (it.hasNext() && active) {
- String line = it.next();
- int queueId = getQueueId(line);
- taskQueues.get(queueId).put(line);
- }
- } catch (Exception e) {
- logger.error("Read Task Error", e);
- }
- }
-
- public void stop() {
- logger.info("stop");
- this.active = false;
- }
-}
\ No newline at end of file
diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java
deleted file mode 100644
index 1497992f6b..0000000000
--- a/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java
+++ /dev/null
@@ -1,200 +0,0 @@
-package com.taos.example.highvolume;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.*;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A helper class encapsulate the logic of writing using SQL.
- *
- * The main interfaces are two methods:
- *
- *
{@link SQLWriter#processLine}, which receive raw lines from WriteTask and group them by table names.
- *
{@link SQLWriter#flush}, which assemble INSERT statement and execute it.
- *
- *
- * There is a technical skill worth mentioning: we create table as needed when "table does not exist" error occur instead of creating table automatically using syntax "INSET INTO tb USING stb".
- * This ensure that checking table existence is a one-time-only operation.
- *
- *
- *
- */
-public class SQLWriter {
- final static Logger logger = LoggerFactory.getLogger(SQLWriter.class);
-
- private Connection conn;
- private Statement stmt;
-
- /**
- * current number of buffered records
- */
- private int bufferedCount = 0;
- /**
- * Maximum number of buffered records.
- * Flush action will be triggered if bufferedCount reached this value,
- */
- private int maxBatchSize;
-
-
- /**
- * Maximum SQL length.
- */
- private int maxSQLLength = 800_000;
-
- /**
- * Map from table name to column values. For example:
- * "tb001" -> "(1648432611249,2.1,114,0.09) (1648432611250,2.2,135,0.2)"
- */
- private Map<String, String> tbValues = new HashMap<>();
-
- /**
- * Map from table name to tag values in the same order as creating stable.
- * Used for creating table.
- */
- private Map<String, String> tbTags = new HashMap<>();
-
- public SQLWriter(int maxBatchSize) {
- this.maxBatchSize = maxBatchSize;
- }
-
-
- /**
- * Get Database Connection
- *
- * @return Connection
- * @throws SQLException
- */
- private static Connection getConnection() throws SQLException {
- String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
- if (jdbcURL == null || jdbcURL == ""){
- jdbcURL = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
- }
- return DriverManager.getConnection(jdbcURL);
- }
-
- /**
- * Create Connection and Statement
- *
- * @throws SQLException
- */
- public void init() throws SQLException {
- conn = getConnection();
- stmt = conn.createStatement();
- stmt.execute("use test");
- }
-
- /**
- * Convert raw data to SQL fragments, group them by table name and cache them in a HashMap.
- * Trigger writing when number of buffered records reached maxBachSize.
- *
- * @param line raw data get from task queue in format: tbName,ts,current,voltage,phase,location,groupId
- */
- public void processLine(String line) throws SQLException {
- bufferedCount += 1;
- int firstComma = line.indexOf(',');
- String tbName = line.substring(0, firstComma);
- int lastComma = line.lastIndexOf(',');
- int secondLastComma = line.lastIndexOf(',', lastComma - 1);
- String value = "(" + line.substring(firstComma + 1, secondLastComma) + ") ";
- if (tbValues.containsKey(tbName)) {
- tbValues.put(tbName, tbValues.get(tbName) + value);
- } else {
- tbValues.put(tbName, value);
- }
- if (!tbTags.containsKey(tbName)) {
- String location = line.substring(secondLastComma + 1, lastComma);
- String groupId = line.substring(lastComma + 1);
- String tagValues = "('" + location + "'," + groupId + ')';
- tbTags.put(tbName, tagValues);
- }
- if (bufferedCount == maxBatchSize) {
- flush();
- }
- }
-
-
- /**
- * Assemble INSERT statement using buffered SQL fragments in Map {@link SQLWriter#tbValues} and execute it.
- * In case of "Table does not exit" exception, create all tables in the sql and retry the sql.
- */
- public void flush() throws SQLException {
- StringBuilder sb = new StringBuilder("INSERT INTO ");
- for (Map.Entry<String, String> entry : tbValues.entrySet()) {
- String tableName = entry.getKey();
- String values = entry.getValue();
- String q = tableName + " values " + values + " ";
- if (sb.length() + q.length() > maxSQLLength) {
- executeSQL(sb.toString());
- logger.warn("increase maxSQLLength or decrease maxBatchSize to gain better performance");
- sb = new StringBuilder("INSERT INTO ");
- }
- sb.append(q);
- }
- executeSQL(sb.toString());
- tbValues.clear();
- bufferedCount = 0;
- }
-
- private void executeSQL(String sql) throws SQLException {
- try {
- stmt.executeUpdate(sql);
- } catch (SQLException e) {
- // convert to error code defined in taoserror.h
- int errorCode = e.getErrorCode() & 0xffff;
- if (errorCode == 0x2603) {
- // Table does not exist
- createTables();
- executeSQL(sql);
- } else {
- logger.error("Execute SQL: {}", sql);
- throw e;
- }
- } catch (Throwable throwable) {
- logger.error("Execute SQL: {}", sql);
- throw throwable;
- }
- }
-
- /**
- * Create tables in batch using syntax:
- *
- * CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...;
- *