other: add jdbc consumer demo
This commit is contained in:
parent
92ecdda403
commit
8c00af5c17
|
@ -0,0 +1,70 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.taosdata</groupId>
|
||||
<artifactId>consumer</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>8</maven.compiler.source>
|
||||
<maven.compiler.target>8</maven.compiler.target>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.taosdata.jdbc</groupId>
|
||||
<artifactId>taos-jdbcdriver</artifactId>
|
||||
<version>3.2.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>30.1.1-jre</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<version>3.3.0</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>ConsumerDemo</id>
|
||||
<configuration>
|
||||
<finalName>ConsumerDemo</finalName>
|
||||
<archive>
|
||||
<manifest>
|
||||
<mainClass>com.taosdata.ConsumerDemo</mainClass>
|
||||
</manifest>
|
||||
</archive>
|
||||
<descriptorRefs>
|
||||
<descriptorRef>jar-with-dependencies</descriptorRef>
|
||||
</descriptorRefs>
|
||||
</configuration>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>8</source>
|
||||
<target>8</target>
|
||||
<encoding>UTF-8</encoding>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,52 @@
|
|||
# How to Run the Consumer Demo Code On Linux OS
|
||||
TDengine's Consumer demo project is organized in a Maven way so that users can easily compile, package and run the project. If you don't have Maven on your server, you may install it using
|
||||
```
|
||||
sudo apt-get install maven
|
||||
```
|
||||
|
||||
## Install TDengine Client
|
||||
Make sure you have already installed a tdengine client on your current develop environment.
|
||||
Download the tdengine package on our website: ``https://www.taosdata.com/cn/all-downloads/`` and install the client.
|
||||
|
||||
## Run Consumer Demo using mvn plugin
|
||||
run command:
|
||||
```
|
||||
mvn clean compile exec:java -Dexec.mainClass="com.taosdata.ConsumerDemo"
|
||||
```
|
||||
|
||||
## Custom configuration
|
||||
```shell
|
||||
# the host of TDengine server
|
||||
export TAOS_HOST="127.0.0.1"
|
||||
|
||||
# the port of TDengine server
|
||||
export TAOS_PORT="6041"
|
||||
|
||||
# the consumer type, can be "ws" or "jni"
|
||||
export TAOS_TYPE="ws"
|
||||
|
||||
# the number of consumers
|
||||
export TAOS_JDBC_CONSUMER_NUM="1"
|
||||
|
||||
# the number of processors to consume
|
||||
export TAOS_JDBC_PROCESSOR_NUM="2"
|
||||
|
||||
# the number of records to be consumed per processor per second
|
||||
export TAOS_JDBC_RATE_PER_PROCESSOR="1000"
|
||||
|
||||
# poll wait time in ms
|
||||
export TAOS_JDBC_POLL_SLEEP="100"
|
||||
```
|
||||
|
||||
## Run Consumer Demo using jar
|
||||
|
||||
To compile the demo project, go to the source directory ``TDengine/tests/examples/JDBC/consumer-demo`` and execute
|
||||
```
|
||||
mvn clean package assembly:single
|
||||
```
|
||||
|
||||
To run ConsumerDemo.jar, go to ``TDengine/tests/examples/JDBC/consumer-demo`` and execute
|
||||
```
|
||||
java -jar target/ConsumerDemo-jar-with-dependencies.jar
|
||||
```
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
package com.taosdata;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
|
||||
public class Bean {
|
||||
private Timestamp ts;
|
||||
private Integer c1;
|
||||
private String c2;
|
||||
|
||||
public Timestamp getTs() {
|
||||
return ts;
|
||||
}
|
||||
|
||||
public void setTs(Timestamp ts) {
|
||||
this.ts = ts;
|
||||
}
|
||||
|
||||
public Integer getC1() {
|
||||
return c1;
|
||||
}
|
||||
|
||||
public void setC1(Integer c1) {
|
||||
this.c1 = c1;
|
||||
}
|
||||
|
||||
public String getC2() {
|
||||
return c2;
|
||||
}
|
||||
|
||||
public void setC2(String c2) {
|
||||
this.c2 = c2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder("Bean {");
|
||||
sb.append("ts=").append(ts);
|
||||
sb.append(", c1=").append(c1);
|
||||
sb.append(", c2='").append(c2).append('\'');
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,6 @@
|
|||
package com.taosdata;
|
||||
|
||||
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
|
||||
|
||||
public class BeanDeserializer extends ReferenceDeserializer<Bean> {
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
package com.taosdata;
|
||||
|
||||
public class Config {
|
||||
public static final String TOPIC = "test_consumer";
|
||||
public static final String TAOS_HOST = "127.0.0.1";
|
||||
public static final String TAOS_PORT = "6041";
|
||||
public static final String TAOS_TYPE = "ws";
|
||||
public static final int TAOS_JDBC_CONSUMER_NUM = 1;
|
||||
public static final int TAOS_JDBC_PROCESSOR_NUM = 2;
|
||||
public static final int TAOS_JDBC_RATE_PER_PROCESSOR = 1000;
|
||||
public static final int TAOS_JDBC_POLL_SLEEP = 100;
|
||||
|
||||
private final int consumerNum;
|
||||
private final int processCapacity;
|
||||
private final int rate;
|
||||
private final int pollSleep;
|
||||
private final String type;
|
||||
private final String host;
|
||||
private final String port;
|
||||
|
||||
public Config(String type, String host, String port, int consumerNum, int processCapacity, int rate, int pollSleep) {
|
||||
this.type = type;
|
||||
this.consumerNum = consumerNum;
|
||||
this.processCapacity = processCapacity;
|
||||
this.rate = rate;
|
||||
this.pollSleep = pollSleep;
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
public int getConsumerNum() {
|
||||
return consumerNum;
|
||||
}
|
||||
|
||||
public int getProcessCapacity() {
|
||||
return processCapacity;
|
||||
}
|
||||
|
||||
public int getRate() {
|
||||
return rate;
|
||||
}
|
||||
|
||||
public int getPollSleep() {
|
||||
return pollSleep;
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
public String getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public static Config getFromENV() {
|
||||
String host = System.getenv("TAOS_HOST") != null ? System.getenv("TAOS_HOST") : TAOS_HOST;
|
||||
String port = System.getenv("TAOS_PORT") != null ? System.getenv("TAOS_PORT") : TAOS_PORT;
|
||||
String type = System.getenv("TAOS_TYPE") != null ? System.getenv("TAOS_TYPE") : TAOS_TYPE;
|
||||
|
||||
String c = System.getenv("TAOS_JDBC_CONSUMER_NUM");
|
||||
int num = c != null ? Integer.parseInt(c) : TAOS_JDBC_CONSUMER_NUM;
|
||||
|
||||
String p = System.getenv("TAOS_JDBC_PROCESSOR_NUM");
|
||||
int capacity = p != null ? Integer.parseInt(p) : TAOS_JDBC_PROCESSOR_NUM;
|
||||
|
||||
String r = System.getenv("TAOS_JDBC_RATE_PER_PROCESSOR");
|
||||
int rate = r != null ? Integer.parseInt(r) : TAOS_JDBC_RATE_PER_PROCESSOR;
|
||||
|
||||
String s = System.getenv("TAOS_JDBC_POLL_SLEEP");
|
||||
int sleep = s != null ? Integer.parseInt(s) : TAOS_JDBC_POLL_SLEEP;
|
||||
|
||||
return new Config(type, host, port, num, capacity, rate, sleep);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
package com.taosdata;
|
||||
|
||||
import com.taosdata.jdbc.tmq.TMQConstants;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static com.taosdata.Config.*;
|
||||
|
||||
public class ConsumerDemo {
|
||||
public static void main(String[] args) throws SQLException {
|
||||
// Config
|
||||
Config config = Config.getFromENV();
|
||||
// Generated data
|
||||
mockData();
|
||||
|
||||
Properties prop = new Properties();
|
||||
prop.setProperty(TMQConstants.CONNECT_TYPE, config.getType());
|
||||
prop.setProperty(TMQConstants.BOOTSTRAP_SERVERS, config.getHost() + ":" + config.getPort());
|
||||
prop.setProperty(TMQConstants.CONNECT_USER, "root");
|
||||
prop.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
|
||||
prop.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
|
||||
prop.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
|
||||
prop.setProperty(TMQConstants.GROUP_ID, "gId");
|
||||
prop.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.BeanDeserializer");
|
||||
for (int i = 0; i < config.getConsumerNum() - 1; i++) {
|
||||
new Thread(new Worker(prop, config)).start();
|
||||
}
|
||||
new Worker(prop, config).run();
|
||||
}
|
||||
|
||||
public static void mockData() throws SQLException {
|
||||
String dbName = "test_consumer";
|
||||
String tableName = "st";
|
||||
String url = "jdbc:TAOS-RS://" + TAOS_HOST + ":" + TAOS_PORT + "/?user=root&password=taosdata&batchfetch=true";
|
||||
Connection connection = DriverManager.getConnection(url);
|
||||
Statement statement = connection.createStatement();
|
||||
statement.executeUpdate("create database if not exists " + dbName + " WAL_RETENTION_PERIOD 3650");
|
||||
statement.executeUpdate("use " + dbName);
|
||||
statement.executeUpdate("create table if not exists " + tableName + " (ts timestamp, c1 int, c2 nchar(100)) ");
|
||||
statement.executeUpdate("create topic if not exists " + TOPIC + " as select ts, c1, c2 from " + tableName);
|
||||
|
||||
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> {
|
||||
Thread t = new Thread(r);
|
||||
t.setName("mock-data-thread-" + t.getId());
|
||||
return t;
|
||||
});
|
||||
AtomicInteger atomic = new AtomicInteger();
|
||||
scheduledExecutorService.scheduleWithFixedDelay(() -> {
|
||||
int i = atomic.getAndIncrement();
|
||||
try {
|
||||
statement.executeUpdate("insert into " + tableName + " values(now, " + i + ",'" + i + "')");
|
||||
} catch (SQLException e) {
|
||||
// ignore
|
||||
}
|
||||
}, 0, 10, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
package com.taosdata;
|
||||
|
||||
import com.google.common.util.concurrent.RateLimiter;
|
||||
import com.taosdata.jdbc.tmq.ConsumerRecord;
|
||||
import com.taosdata.jdbc.tmq.ConsumerRecords;
|
||||
import com.taosdata.jdbc.tmq.TaosConsumer;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Collections;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
public class Worker implements Runnable {
|
||||
|
||||
int sleepTime;
|
||||
int rate;
|
||||
|
||||
ForkJoinPool pool = new ForkJoinPool();
|
||||
Semaphore semaphore;
|
||||
|
||||
TaosConsumer<Bean> consumer;
|
||||
|
||||
public Worker(Properties prop, Config config) throws SQLException {
|
||||
consumer = new TaosConsumer<>(prop);
|
||||
consumer.subscribe(Collections.singletonList(Config.TOPIC));
|
||||
semaphore = new Semaphore(config.getProcessCapacity());
|
||||
sleepTime = config.getPollSleep();
|
||||
rate = config.getRate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (!Thread.interrupted()) {
|
||||
try {
|
||||
// 控制请求频率
|
||||
if (semaphore.tryAcquire()) {
|
||||
ConsumerRecords<Bean> records = consumer.poll(Duration.ofMillis(sleepTime));
|
||||
pool.submit(() -> {
|
||||
RateLimiter limiter = RateLimiter.create(rate);
|
||||
try {
|
||||
for (ConsumerRecord<Bean> record : records) {
|
||||
// 流量控制
|
||||
limiter.acquire();
|
||||
// 业务处理数据
|
||||
System.out.println("[" + LocalDateTime.now() + "] Thread id:" + Thread.currentThread().getId() + " -> " + record.value());
|
||||
}
|
||||
} finally {
|
||||
semaphore.release();
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue