diff --git a/examples/JDBC/consumer-demo/pom.xml b/examples/JDBC/consumer-demo/pom.xml
new file mode 100644
index 0000000000..aa3cb154e5
--- /dev/null
+++ b/examples/JDBC/consumer-demo/pom.xml
@@ -0,0 +1,70 @@
+
+
+ 4.0.0
+
+ com.taosdata
+ consumer
+ 1.0-SNAPSHOT
+
+
+ 8
+ 8
+
+
+
+
+ com.taosdata.jdbc
+ taos-jdbcdriver
+ 3.2.1
+
+
+ com.google.guava
+ guava
+ 30.1.1-jre
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ 3.3.0
+
+
+ ConsumerDemo
+
+ ConsumerDemo
+
+
+ com.taosdata.ConsumerDemo
+
+
+
+ jar-with-dependencies
+
+
+ package
+
+ single
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ 8
+ 8
+ UTF-8
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/examples/JDBC/consumer-demo/readme.md b/examples/JDBC/consumer-demo/readme.md
new file mode 100644
index 0000000000..77742ab605
--- /dev/null
+++ b/examples/JDBC/consumer-demo/readme.md
@@ -0,0 +1,52 @@
+# How to Run the Consumer Demo Code On Linux OS
+TDengine's Consumer demo project is organized in a Maven way so that users can easily compile, package and run the project. If you don't have Maven on your server, you may install it using
+```
+sudo apt-get install maven
+```
+
+## Install TDengine Client
+Make sure you have already installed a tdengine client on your current develop environment.
+Download the tdengine package on our website: ``https://www.taosdata.com/cn/all-downloads/`` and install the client.
+
+## Run Consumer Demo using mvn plugin
+run command:
+```
+mvn clean compile exec:java -Dexec.mainClass="com.taosdata.ConsumerDemo"
+```
+
+## Custom configuration
+```shell
+# the host of TDengine server
+export TAOS_HOST="127.0.0.1"
+
+# the port of TDengine server
+export TAOS_PORT="6041"
+
+# the consumer type, can be "ws" or "jni"
+export TAOS_TYPE="ws"
+
+# the number of consumers
+export TAOS_JDBC_CONSUMER_NUM="1"
+
+# the number of processors to consume
+export TAOS_JDBC_PROCESSOR_NUM="2"
+
+# the number of records to be consumed per processor per second
+export TAOS_JDBC_RATE_PER_PROCESSOR="1000"
+
+# poll wait time in ms
+export TAOS_JDBC_POLL_SLEEP="100"
+```
+
+## Run Consumer Demo using jar
+
+To compile the demo project, go to the source directory ``TDengine/tests/examples/JDBC/consumer-demo`` and execute
+```
+mvn clean package assembly:single
+```
+
+To run ConsumerDemo.jar, go to ``TDengine/tests/examples/JDBC/consumer-demo`` and execute
+```
+java -jar target/ConsumerDemo-jar-with-dependencies.jar
+```
+
diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Bean.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Bean.java
new file mode 100644
index 0000000000..2f2467b371
--- /dev/null
+++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Bean.java
@@ -0,0 +1,43 @@
+package com.taosdata;
+
+import java.sql.Timestamp;
+
+public class Bean {
+ private Timestamp ts;
+ private Integer c1;
+ private String c2;
+
+ public Timestamp getTs() {
+ return ts;
+ }
+
+ public void setTs(Timestamp ts) {
+ this.ts = ts;
+ }
+
+ public Integer getC1() {
+ return c1;
+ }
+
+ public void setC1(Integer c1) {
+ this.c1 = c1;
+ }
+
+ public String getC2() {
+ return c2;
+ }
+
+ public void setC2(String c2) {
+ this.c2 = c2;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("Bean {");
+ sb.append("ts=").append(ts);
+ sb.append(", c1=").append(c1);
+ sb.append(", c2='").append(c2).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/BeanDeserializer.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/BeanDeserializer.java
new file mode 100644
index 0000000000..478af9e70d
--- /dev/null
+++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/BeanDeserializer.java
@@ -0,0 +1,6 @@
+package com.taosdata;
+
+import com.taosdata.jdbc.tmq.ReferenceDeserializer;
+
+public class BeanDeserializer extends ReferenceDeserializer {
+}
diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Config.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Config.java
new file mode 100644
index 0000000000..08579926e3
--- /dev/null
+++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Config.java
@@ -0,0 +1,78 @@
+package com.taosdata;
+
+public class Config {
+ public static final String TOPIC = "test_consumer";
+ public static final String TAOS_HOST = "127.0.0.1";
+ public static final String TAOS_PORT = "6041";
+ public static final String TAOS_TYPE = "ws";
+ public static final int TAOS_JDBC_CONSUMER_NUM = 1;
+ public static final int TAOS_JDBC_PROCESSOR_NUM = 2;
+ public static final int TAOS_JDBC_RATE_PER_PROCESSOR = 1000;
+ public static final int TAOS_JDBC_POLL_SLEEP = 100;
+
+ private final int consumerNum;
+ private final int processCapacity;
+ private final int rate;
+ private final int pollSleep;
+ private final String type;
+ private final String host;
+ private final String port;
+
+ public Config(String type, String host, String port, int consumerNum, int processCapacity, int rate, int pollSleep) {
+ this.type = type;
+ this.consumerNum = consumerNum;
+ this.processCapacity = processCapacity;
+ this.rate = rate;
+ this.pollSleep = pollSleep;
+ this.host = host;
+ this.port = port;
+ }
+
+ public int getConsumerNum() {
+ return consumerNum;
+ }
+
+ public int getProcessCapacity() {
+ return processCapacity;
+ }
+
+ public int getRate() {
+ return rate;
+ }
+
+ public int getPollSleep() {
+ return pollSleep;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getPort() {
+ return port;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public static Config getFromENV() {
+ String host = System.getenv("TAOS_HOST") != null ? System.getenv("TAOS_HOST") : TAOS_HOST;
+ String port = System.getenv("TAOS_PORT") != null ? System.getenv("TAOS_PORT") : TAOS_PORT;
+ String type = System.getenv("TAOS_TYPE") != null ? System.getenv("TAOS_TYPE") : TAOS_TYPE;
+
+ String c = System.getenv("TAOS_JDBC_CONSUMER_NUM");
+ int num = c != null ? Integer.parseInt(c) : TAOS_JDBC_CONSUMER_NUM;
+
+ String p = System.getenv("TAOS_JDBC_PROCESSOR_NUM");
+ int capacity = p != null ? Integer.parseInt(p) : TAOS_JDBC_PROCESSOR_NUM;
+
+ String r = System.getenv("TAOS_JDBC_RATE_PER_PROCESSOR");
+ int rate = r != null ? Integer.parseInt(r) : TAOS_JDBC_RATE_PER_PROCESSOR;
+
+ String s = System.getenv("TAOS_JDBC_POLL_SLEEP");
+ int sleep = s != null ? Integer.parseInt(s) : TAOS_JDBC_POLL_SLEEP;
+
+ return new Config(type, host, port, num, capacity, rate, sleep);
+ }
+}
diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/ConsumerDemo.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/ConsumerDemo.java
new file mode 100644
index 0000000000..7c7719c639
--- /dev/null
+++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/ConsumerDemo.java
@@ -0,0 +1,65 @@
+package com.taosdata;
+
+import com.taosdata.jdbc.tmq.TMQConstants;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.taosdata.Config.*;
+
+public class ConsumerDemo {
+ public static void main(String[] args) throws SQLException {
+ // Config
+ Config config = Config.getFromENV();
+ // Generated data
+ mockData();
+
+ Properties prop = new Properties();
+ prop.setProperty(TMQConstants.CONNECT_TYPE, config.getType());
+ prop.setProperty(TMQConstants.BOOTSTRAP_SERVERS, config.getHost() + ":" + config.getPort());
+ prop.setProperty(TMQConstants.CONNECT_USER, "root");
+ prop.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
+ prop.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
+ prop.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
+ prop.setProperty(TMQConstants.GROUP_ID, "gId");
+ prop.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.BeanDeserializer");
+ for (int i = 0; i < config.getConsumerNum() - 1; i++) {
+ new Thread(new Worker(prop, config)).start();
+ }
+ new Worker(prop, config).run();
+ }
+
+ public static void mockData() throws SQLException {
+ String dbName = "test_consumer";
+ String tableName = "st";
+ String url = "jdbc:TAOS-RS://" + TAOS_HOST + ":" + TAOS_PORT + "/?user=root&password=taosdata&batchfetch=true";
+ Connection connection = DriverManager.getConnection(url);
+ Statement statement = connection.createStatement();
+ statement.executeUpdate("create database if not exists " + dbName + " WAL_RETENTION_PERIOD 3650");
+ statement.executeUpdate("use " + dbName);
+ statement.executeUpdate("create table if not exists " + tableName + " (ts timestamp, c1 int, c2 nchar(100)) ");
+ statement.executeUpdate("create topic if not exists " + TOPIC + " as select ts, c1, c2 from " + tableName);
+
+ ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r);
+ t.setName("mock-data-thread-" + t.getId());
+ return t;
+ });
+ AtomicInteger atomic = new AtomicInteger();
+ scheduledExecutorService.scheduleWithFixedDelay(() -> {
+ int i = atomic.getAndIncrement();
+ try {
+ statement.executeUpdate("insert into " + tableName + " values(now, " + i + ",'" + i + "')");
+ } catch (SQLException e) {
+ // ignore
+ }
+ }, 0, 10, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Worker.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Worker.java
new file mode 100644
index 0000000000..f6e21cd729
--- /dev/null
+++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Worker.java
@@ -0,0 +1,60 @@
+package com.taosdata;
+
+import com.google.common.util.concurrent.RateLimiter;
+import com.taosdata.jdbc.tmq.ConsumerRecord;
+import com.taosdata.jdbc.tmq.ConsumerRecords;
+import com.taosdata.jdbc.tmq.TaosConsumer;
+
+import java.sql.SQLException;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Semaphore;
+
+public class Worker implements Runnable {
+
+ int sleepTime;
+ int rate;
+
+ ForkJoinPool pool = new ForkJoinPool();
+ Semaphore semaphore;
+
+ TaosConsumer consumer;
+
+ public Worker(Properties prop, Config config) throws SQLException {
+ consumer = new TaosConsumer<>(prop);
+ consumer.subscribe(Collections.singletonList(Config.TOPIC));
+ semaphore = new Semaphore(config.getProcessCapacity());
+ sleepTime = config.getPollSleep();
+ rate = config.getRate();
+ }
+
+ @Override
+ public void run() {
+ while (!Thread.interrupted()) {
+ try {
+ // 控制请求频率
+ if (semaphore.tryAcquire()) {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(sleepTime));
+ pool.submit(() -> {
+ RateLimiter limiter = RateLimiter.create(rate);
+ try {
+ for (ConsumerRecord record : records) {
+ // 流量控制
+ limiter.acquire();
+ // 业务处理数据
+ System.out.println("[" + LocalDateTime.now() + "] Thread id:" + Thread.currentThread().getId() + " -> " + record.value());
+ }
+ } finally {
+ semaphore.release();
+ }
+ });
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}