This commit is contained in:
sheyanjie-qq 2025-03-21 09:41:10 +08:00
parent a0b3fd296f
commit b38a0a2584
12 changed files with 960 additions and 268 deletions

View File

@ -1,268 +0,0 @@
# 如何在 windows环境下使用jdbc进行TDengine应用开发
本文以windows环境为例介绍java如何进行TDengine开发应用
## 环境准备
1安装jdk
官网下载jdk-1.8下载页面https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
安装配置环境变量把jdk加入到环境变量里。
命令行内查看java的版本。
```shell
>java -version
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
```
2安装配置maven
官网下载maven下载地址http://maven.apache.org/download.cgi
配置环境变量MAVEN_HOME将MAVEN_HOME/bin添加到PATH
命令行里查看maven的版本
```shell
>mvn --version
Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-04T03:39:06+08:00)
Maven home: D:\apache-maven-3.5.0\bin\..
Java version: 1.8.0_131, vendor: Oracle Corporation
Java home: C:\Program Files\Java\jdk1.8.0_131\jre
Default locale: zh_CN, platform encoding: GBK
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"
```
为了加快maven下载依赖的速度可以为maven配置mirror修改MAVEN_HOME\config\settings.xml文件
```xml
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
<!-- 配置本地maven仓库的路径 -->
<localRepository>D:\apache-maven-localRepository</localRepository>
<mirrors>
<!-- 配置阿里云Maven镜像仓库 -->
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
</mirrors>
<profiles>
<!-- 配置jdkmaven会默认使用java1.8 -->
<profile>
<id>jdk-1.8</id>
<activation>
<activeByDefault>true</activeByDefault>
<jdk>1.8</jdk>
</activation>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
</properties>
</profile>
</profiles>
</settings>
```
3在linux服务器上安装TDengine-server
在taosdata官网下载TDengine-server下载地址https://www.taosdata.com/cn/all-downloads/
在linux服务器上安装TDengine-server
```shell
# tar -zxvf package/TDengine-server-2.0.1.1-Linux-x64.tar.gz
# cd TDengine-server/
# ./install.sh
```
启动taosd
```shell
# systemctl start taosd
```
在server上用taos连接taosd
```shell
# taos
taos> show dnodes;
id | end_point | vnodes | cores | status | role | create_time |
==================================================================================================================
1 | td01:6030 | 2 | 4 | ready | any | 2020-08-19 18:40:25.045 |
Query OK, 1 row(s) in set (0.005765s)
```
如果可以正确连接到taosd实例并打印出databases的信息说明TDengine的server已经正确启动。这里查看server的hostname
```shell
# hostname -f
td01
```
注意如果安装TDengine后使用默认的taos.cfg配置文件taosd会使用当前server的hostname创建dnode实例。之后在client也需要使用这个hostname来连接taosd。
4在windows上安装TDengine-client
在taosdata官网下载taos客户端下载地址
https://www.taosdata.com/cn/all-downloads/
下载后双击exe安装。
修改client的hosts文件C:\Windows\System32\drivers\etc\hosts将server的hostname和ip配置到client的hosts文件中
```
192.168.236.136 td01
```
配置完成后在命令行内使用TDengine CLI连接server端
```shell
C:\TDengine>taos -h td01
Welcome to the TDengine shell from Linux, Client Version:2.0.1.1
Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.
taos> show databases;
name | created_time | ntables | vgroups | replica | quorum | days | keep0,keep1,keep(D) | cache(MB) | blocks | minrows | maxrows | wallevel | fsync | comp | precision | status |
===================================================================================================================================================================================================================================================================
test | 2020-08-19 18:43:50.731 | 1 | 1 | 1 | 1 | 2 | 3650,3650,3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | ms | ready |
log | 2020-08-19 18:40:28.064 | 4 | 1 | 1 | 1 | 10 | 30,30,30 | 1 | 3 | 100 | 4096 | 1 | 3000 | 2 | us | ready |
Query OK, 2 row(s) in set (0.068000s)
```
如果windows上的client能够正常连接并打印database信息说明client可以正常连接server了。
## 应用开发
1新建maven工程在pom.xml中引入taos-jdbcdriver依赖。
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.demo</groupId>
<artifactId>JdbcDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.8</version>
</dependency>
</dependencies>
</project>
```
2使用jdbc查询TDengine数据库
下面是示例代码:
```java
public class JdbcDemo {
public static void main(String[] args) throws Exception {
Connection conn = getConn();
Statement stmt = conn.createStatement();
// create database
stmt.executeUpdate("create database if not exists db");
// use database
stmt.executeUpdate("use db");
// create table
stmt.executeUpdate("create table if not exists tb (ts timestamp, temperature int, humidity float)");
// insert data
int affectedRows = stmt.executeUpdate("insert into tb values(now, 23, 10.3) (now + 1s, 20, 9.3)");
System.out.println("insert " + affectedRows + " rows.");
// query data
ResultSet resultSet = stmt.executeQuery("select * from tb");
Timestamp ts = null;
int temperature = 0;
float humidity = 0;
while(resultSet.next()){
ts = resultSet.getTimestamp(1);
temperature = resultSet.getInt(2);
humidity = resultSet.getFloat("humidity");
System.out.printf("%s, %d, %s\n", ts, temperature, humidity);
}
}
public static Connection getConn() throws Exception{
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://td01:0/log?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
return conn;
}
}
```
3测试jdbc访问tdengine的sever实例
console输出
```
insert 2 rows.
2020-08-26 00:06:34.575, 23, 10.3
2020-08-26 00:06:35.575, 20, 9.3
```
## 指南
1如何设置主机名和hosts
在server上查看hostname和fqdn
```shell
查看hostname
# hostname
taos-server
查看fqdn
# hostname -f
taos-server
```
windows下hosts文件位于
C:\\Windows\System32\drivers\etc\hosts
修改hosts文件添加server的ip和hostname
```s
192.168.56.101 node5
```
2什么是fqdn
> 什么是FQDN
>
> FQDNFull qualified domain name全限定域名fqdn由2部分组成hostname+domainname。
>
> 例如一个邮件服务器的fqdn可能是mymail.somecollege.edu其中mymail是hostname主机名somcollege.edu是domainname域名。本例中.edu是顶级域名.somecollege是二级域名。
>
> 当连接服务器时必须指定fqdn然后dns服务器通过查看dns表将hostname解析为相应的ip地址。如果只指定hostname不指定domainname应用程序可能服务解析主机名。因为如果你试图访问不在本地的远程服务器时本地的dns服务器和可能没有远程服务器的hostname列表。
>
> 参考https://kb.iu.edu/d/aiuv

View File

@ -0,0 +1,119 @@
package com.taos.example.highvolume;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
class ConsumerTask implements Runnable, Stoppable {
private final static Logger logger = LoggerFactory.getLogger(ConsumerTask.class);
private final int taskId;
private final int writeThreadCount;
private final int batchSizeByRow;
private final int cacheSizeByRow;
private final String dbName;
private volatile boolean active = true;
public ConsumerTask(int taskId,
int writeThradCount,
int batchSizeByRow,
int cacheSizeByRow,
String dbName) {
this.taskId = taskId;
this.writeThreadCount = writeThradCount;
this.batchSizeByRow = batchSizeByRow;
this.cacheSizeByRow = cacheSizeByRow;
this.dbName = dbName;
}
@Override
public void run() {
// 配置 Kafka 消费者的属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Util.getKafkaBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(batchSizeByRow));
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "3000");
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, String.valueOf(2 * 1024 * 1024));
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "15000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
List<String> topics = Collections.singletonList(Util.getKafkaTopic());
try {
consumer.subscribe(topics);
} catch (Exception e) {
logger.error("Consumer Task {} Error", taskId, e);
return;
}
try (Connection connection = Util.getConnection(batchSizeByRow, cacheSizeByRow, writeThreadCount);
PreparedStatement pstmt = connection.prepareStatement("INSERT INTO " + dbName +".meters (tbname, ts, current, voltage, phase) VALUES (?,?,?,?,?)")) {
long i = 0L;
long lastTimePolled = System.currentTimeMillis();
while (active) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
i++;
Meters meters = Meters.fromString(record.value());
pstmt.setString(1, meters.getTableName());
pstmt.setTimestamp(2, meters.getTs());
pstmt.setFloat(3, meters.getCurrent());
pstmt.setInt(4, meters.getVoltage());
pstmt.setFloat(5, meters.getPhase());
pstmt.addBatch();
if (i % batchSizeByRow == 0) {
pstmt.executeBatch();
}
if (i % (10L * batchSizeByRow) == 0){
//pstmt.executeUpdate();
consumer.commitAsync();
}
}
if (!records.isEmpty()){
lastTimePolled = System.currentTimeMillis();
} else {
if (System.currentTimeMillis() - lastTimePolled > 1000 * 60) {
lastTimePolled = System.currentTimeMillis();
logger.error("Consumer Task {} has been idle for 10 seconds, stopping", taskId);
}
}
}
} catch (Exception e) {
logger.error("Consumer Task {} Error", taskId, e);
} finally {
// 关闭消费者
consumer.close();
}
}
public void stop() {
logger.info("stop");
this.active = false;
}
}

View File

@ -0,0 +1,58 @@
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
class CreateSubTableTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(CreateSubTableTask.class);
private final int taskId;
private final int subTableStartIndex;
private final int subTableEndIndex;
private final String dbName;
public CreateSubTableTask(int taskId,
int subTableStartIndex,
int subTableEndIndex,
String dbName) {
this.taskId = taskId;
this.subTableStartIndex = subTableStartIndex;
this.subTableEndIndex = subTableEndIndex;
this.dbName = dbName;
}
@Override
public void run() {
try (Connection connection = Util.getConnection();
Statement statement = connection.createStatement()){
statement.execute("use " + dbName);
StringBuilder sql = new StringBuilder();
sql.append("create table");
int i = 0;
for (int tableNum = subTableStartIndex; tableNum <= subTableEndIndex; tableNum++) {
sql.append(" if not exists " + Util.getTableNamePrefix() + tableNum + " using meters" + " tags(" + tableNum + ", " + "\"location_" + tableNum + "\"" + ")");
if (i < 1000) {
i++;
} else {
statement.execute(sql.toString());
sql = new StringBuilder();
sql.append("create table");
i = 0;
}
}
if (sql.length() > "create table".length()) {
statement.execute(sql.toString());
}
} catch (SQLException e) {
logger.error("task id {}, failed to create sub table", taskId, e);
}
}
}

View File

@ -0,0 +1,47 @@
package com.taos.example.highvolume;
import java.sql.*;
/**
* Prepare target database.
* Count total records in database periodically so that we can estimate the writing speed.
*/
public class DataBaseMonitor {
private Connection conn;
private Statement stmt;
private String dbName;
public DataBaseMonitor init(String dbName) throws SQLException {
if (conn == null) {
conn = Util.getConnection();
stmt = conn.createStatement();
}
this.dbName = dbName;
return this;
}
public void close() {
try {
stmt.close();
} catch (SQLException e) {
}
try {
conn.close();
} catch (SQLException e) {
}
}
public void prepareDatabase() throws SQLException {
stmt.execute("DROP DATABASE IF EXISTS " + dbName);
stmt.execute("CREATE DATABASE IF NOT EXISTS " + dbName + " vgroups 20");
stmt.execute("use " + dbName);
stmt.execute("CREATE STABLE " + dbName + ".meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(64))");
}
public long count() throws SQLException {
try (ResultSet result = stmt.executeQuery("SELECT count(*) from " + dbName +".meters")) {
result.next();
return result.getLong(1);
}
}
}

View File

@ -0,0 +1,332 @@
package com.taos.example.highvolume;
import org.apache.commons.cli.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
public class FastWriteExample {
final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
final static DataBaseMonitor databaseMonitor = new DataBaseMonitor();
static ThreadPoolExecutor writerThreads;
static ThreadPoolExecutor producerThreads;
final static ThreadPoolExecutor statThread = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
private final static List<Stoppable> allTasks = new ArrayList<>();
private static int readThreadCount = 5;
private static int writeThreadPerReadThread = 5;
private static int batchSizeByRow = 1000;
private static int cacheSizeByRow = 10000;
private static int subTableNum = 1000000;
private static int rowsPerSubTable = 100;
private static String dbName = "test";
public static void forceStopAll() {
logger.info("shutting down");
for (Stoppable task : allTasks) {
task.stop();
}
if (producerThreads != null) {
producerThreads.shutdown();
}
if (writerThreads != null) {
writerThreads.shutdown();
}
statThread.shutdown();
}
private static void createSubTables(){
writerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, getNamedThreadFactory("FW-CreateSubTable-thread-"));
int range = (subTableNum + readThreadCount - 1) / readThreadCount;
for (int i = 0; i < readThreadCount; i++) {
int startIndex = i * range;
int endIndex;
if (i == readThreadCount - 1) {
endIndex = subTableNum - 1;
} else {
endIndex = startIndex + range - 1;
}
logger.debug("create sub table task {} {} {}", i, startIndex, endIndex);
CreateSubTableTask createSubTableTask = new CreateSubTableTask(i,
startIndex,
endIndex,
dbName);
writerThreads.submit(createSubTableTask);
}
logger.info("create sub table task started.");
while (writerThreads.getActiveCount() != 0) {
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
logger.info("create sub table task finished.");
}
public static void startStatTask() throws SQLException {
StatTask statTask = new StatTask(dbName, subTableNum);
allTasks.add(statTask);
statThread.submit(statTask);
}
public static ThreadFactory getNamedThreadFactory(String namePrefix) {
return new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, namePrefix + threadNumber.getAndIncrement());
}
};
}
private static void invokeKafkaDemo() throws SQLException {
producerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, getNamedThreadFactory("FW-kafka-producer-thread-"));
writerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, getNamedThreadFactory("FW-kafka-consumer-thread-"));
int range = (subTableNum + readThreadCount - 1) / readThreadCount;
for (int i = 0; i < readThreadCount; i++) {
int startIndex = i * range;
int endIndex;
if (i == readThreadCount - 1) {
endIndex = subTableNum - 1;
} else {
endIndex = startIndex + range - 1;
}
ProducerTask producerTask = new ProducerTask(i,
rowsPerSubTable,
startIndex,
endIndex);
allTasks.add(producerTask);
producerThreads.submit(producerTask);
ConsumerTask consumerTask = new ConsumerTask(i,
writeThreadPerReadThread,
batchSizeByRow,
cacheSizeByRow,
dbName);
allTasks.add(consumerTask);
writerThreads.submit(consumerTask);
}
startStatTask();
Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::forceStopAll));
while (writerThreads.getActiveCount() != 0) {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
private static void invokeMockDataDemo() throws SQLException {
ThreadFactory namedThreadFactory = new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = "FW-work-thread-";
@Override
public Thread newThread(Runnable r) {
return new Thread(r, namePrefix + threadNumber.getAndIncrement());
}
};
writerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, namedThreadFactory);
int range = (subTableNum + readThreadCount - 1) / readThreadCount;
for (int i = 0; i < readThreadCount; i++) {
int startIndex = i * range;
int endIndex;
if (i == readThreadCount - 1) {
endIndex = subTableNum - 1;
} else {
endIndex = startIndex + range - 1;
}
WorkTask task = new WorkTask(i,
writeThreadPerReadThread,
batchSizeByRow,
cacheSizeByRow,
rowsPerSubTable,
startIndex,
endIndex,
dbName);
allTasks.add(task);
writerThreads.submit(task);
}
startStatTask();
Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::forceStopAll));
while (writerThreads.getActiveCount() != 0) {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
// 打印帮助信息的方法
private static void printHelp(Options options) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("java -jar highVolume.jar", options);
System.out.println();
}
public static void main(String[] args) throws SQLException, InterruptedException {
Thread.sleep(10 * 1000);
Options options = new Options();
Option readThdcountOption = new Option("r", "readThreadCount", true, "Specify the readThreadCount, default is 5");
readThdcountOption.setRequired(false);
options.addOption(readThdcountOption);
Option writeThdcountOption = new Option("w", "writeThreadPerReadThread", true, "Specify the writeThreadPerReadThread, default is 5");
writeThdcountOption.setRequired(false);
options.addOption(writeThdcountOption);
Option batchSizeOption = new Option("b", "batchSizeByRow", true, "Specify the batchSizeByRow, default is 1000");
batchSizeOption.setRequired(false);
options.addOption(batchSizeOption);
Option cacheSizeOption = new Option("c", "cacheSizeByRow", true, "Specify the cacheSizeByRow, default is 10000");
cacheSizeOption.setRequired(false);
options.addOption(cacheSizeOption);
Option subTablesOption = new Option("s", "subTableNum", true, "Specify the subTableNum, default is 1000000");
subTablesOption.setRequired(false);
options.addOption(subTablesOption);
Option rowsPerTableOption = new Option("R", "rowsPerSubTable", true, "Specify the rowsPerSubTable, default is 100");
rowsPerTableOption.setRequired(false);
options.addOption(rowsPerTableOption);
Option dbNameOption = new Option("d", "dbName", true, "Specify the database name, default is test");
dbNameOption.setRequired(false);
options.addOption(dbNameOption);
Option kafkaOption = new Option("K", "useKafka", false, "use kafka demo to test");
kafkaOption.setRequired(false);
options.addOption(kafkaOption);
CommandLineParser parser = new DefaultParser();
CommandLine cmd;
try {
cmd = parser.parse(options, args);
} catch (ParseException e) {
System.out.println(e.getMessage());
printHelp(options);
System.exit(1);
return;
}
// 检查是否请求了帮助信息
if (cmd.hasOption("help")) {
printHelp(options);
return;
}
if (cmd.getOptionValue("readThreadCount") != null) {
readThreadCount = Integer.parseInt(cmd.getOptionValue("readThreadCount"));
if (readThreadCount <= 0){
logger.error("readThreadCount must be greater than 0");
return;
}
}
if (cmd.getOptionValue("writeThreadPerReadThread") != null) {
writeThreadPerReadThread = Integer.parseInt(cmd.getOptionValue("writeThreadPerReadThread"));
if (writeThreadPerReadThread <= 0){
logger.error("writeThreadPerReadThread must be greater than 0");
return;
}
}
if (cmd.getOptionValue("batchSizeByRow") != null) {
batchSizeByRow = Integer.parseInt(cmd.getOptionValue("batchSizeByRow"));
if (batchSizeByRow <= 0){
logger.error("batchSizeByRow must be greater than 0");
return;
}
}
if (cmd.getOptionValue("cacheSizeByRow") != null) {
cacheSizeByRow = Integer.parseInt(cmd.getOptionValue("cacheSizeByRow"));
if (cacheSizeByRow <= 0){
logger.error("cacheSizeByRow must be greater than 0");
return;
}
}
if (cmd.getOptionValue("subTableNum") != null) {
subTableNum = Integer.parseInt(cmd.getOptionValue("subTableNum"));
if (subTableNum <= 0){
logger.error("subTableNum must be greater than 0");
return;
}
}
if (cmd.getOptionValue("rowsPerSubTable") != null) {
rowsPerSubTable = Integer.parseInt(cmd.getOptionValue("rowsPerSubTable"));
if (rowsPerSubTable <= 0){
logger.error("rowsPerSubTable must be greater than 0");
return;
}
}
if (cmd.getOptionValue("dbName") != null) {
dbName = cmd.getOptionValue("dbName");
}
logger.info("readThreadCount={}, writeThreadPerReadThread={} batchSizeByRow={} cacheSizeByRow={}, subTableNum={}, rowsPerSubTable={}",
readThreadCount, writeThreadPerReadThread, batchSizeByRow, cacheSizeByRow, subTableNum, rowsPerSubTable);
logger.info("create database begin.");
databaseMonitor.init(dbName).prepareDatabase();
databaseMonitor.close();
logger.info("create database end.");
logger.info("create sub tables start.");
createSubTables();
logger.info("create sub tables end.");
if (cmd.hasOption("K")) {
Util.createKafkaTopic();
// use kafka demo
invokeKafkaDemo();
} else {
// use mock data source demo
invokeMockDataDemo();
}
}
}

View File

@ -0,0 +1,76 @@
package com.taos.example.highvolume;
import java.sql.Timestamp;
public class Meters {
String tableName;
Timestamp ts;
float current;
int voltage;
float phase;
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public float getCurrent() {
return current;
}
public void setCurrent(float current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public float getPhase() {
return phase;
}
public void setPhase(float phase) {
this.phase = phase;
}
@Override
// this is just a demo, so we don't need to implement the full CSV parser
public String toString() {
return tableName + "," +
ts.toString() + "," +
current + "," +
voltage + "," +
phase;
}
public static Meters fromString(String str) {
String[] parts = str.split(",");
if (parts.length != 5) {
throw new IllegalArgumentException("Invalid input format");
}
Meters meters = new Meters();
meters.setTableName(parts[0]);
meters.setTs(Timestamp.valueOf(parts[1]));
meters.setCurrent(Float.parseFloat(parts[2]));
meters.setVoltage(Integer.parseInt(parts[3]));
meters.setPhase(Float.parseFloat(parts[4]));
return meters;
}
}

View File

@ -0,0 +1,54 @@
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
/**
* Generate test data
*/
class MockDataSource implements Iterator<Meters> {
private final static Logger logger = LoggerFactory.getLogger(WorkTask.class);
private final int tableStartIndex;
private final int tableEndIndex;
private final long maxRowsPerTable;
long currentMs = System.currentTimeMillis();
private int index = 0;
// mock values
public MockDataSource(int tableStartIndex, int tableEndIndex, int maxRowsPerTable) {
this.tableStartIndex = tableStartIndex;
this.tableEndIndex = tableEndIndex;
this.maxRowsPerTable = maxRowsPerTable;
}
@Override
public boolean hasNext() {
return index < (tableEndIndex - tableStartIndex + 1) * maxRowsPerTable;
}
@Override
public Meters next() {
// use interlace rows one to simulate the data distribution in real world
if (index % (tableEndIndex - tableStartIndex + 1) == 0) {
currentMs += 1000;
}
long currentTbId = index % (tableEndIndex - tableStartIndex + 1) + tableStartIndex;
Meters meters = new Meters();
meters.setTableName(Util.getTableNamePrefix() + currentTbId);
meters.setTs(new java.sql.Timestamp(currentMs));
meters.setCurrent((float) (Math.random() * 100));
meters.setVoltage((int) (Math.random() * 100));
meters.setPhase((float) (Math.random() * 100));
index ++;
return meters;
}
}

View File

@ -0,0 +1,67 @@
package com.taos.example.highvolume;
import com.taosdata.jdbc.utils.ReqId;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.Properties;
class ProducerTask implements Runnable, Stoppable {
private final static Logger logger = LoggerFactory.getLogger(ProducerTask.class);
private final int taskId;
private final int subTableStartIndex;
private final int subTableEndIndex;
private final int rowsPerTable;
private volatile boolean active = true;
public ProducerTask(int taskId,
int rowsPerTable,
int subTableStartIndex,
int subTableEndIndex) {
this.taskId = taskId;
this.subTableStartIndex = subTableStartIndex;
this.subTableEndIndex = subTableEndIndex;
this.rowsPerTable = rowsPerTable;
}
@Override
public void run() {
logger.info("started");
Iterator<Meters> it = new MockDataSource(subTableStartIndex, subTableEndIndex, rowsPerTable);
Properties props = new Properties();
props.put("bootstrap.servers", Util.getKafkaBootstrapServers());
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("batch.size", 1024 * 1024);
props.put("linger.ms", 500);
// create a Kafka producer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
while (it.hasNext() && active) {
Meters meters = it.next();
String key = meters.getTableName();
String value = meters.toString();
// to avoid the data of the sub-table out of order. we use the partition key to ensure the data of the same sub-table is sent to the same partition.
// Because efficient writing use String hashcodehere we use another hash algorithm to calculate the partition key.
long hashCode = Math.abs(ReqId.murmurHash32(key.getBytes(), 0));
ProducerRecord<String, String> record = new ProducerRecord<>(Util.getKafkaTopic(), (int)(hashCode % Util.getPartitionCount()), key, value);
producer.send(record);
}
} catch (Exception e) {
logger.error("task id {}, send message error: ", taskId, e);
}
finally {
producer.close();
}
}
public void stop() {
logger.info("stop");
this.active = false;
}
}

View File

@ -0,0 +1,46 @@
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
class StatTask implements Runnable, Stoppable {
private final static Logger logger = LoggerFactory.getLogger(StatTask.class);
private final DataBaseMonitor databaseMonitor;
private final int subTableNum;
private volatile boolean active = true;
public StatTask(String dbName,
int subTableNum) throws SQLException {
this.databaseMonitor = new DataBaseMonitor().init(dbName);
this.subTableNum = subTableNum;
}
@Override
public void run() {
long lastCount = 0;
while (active) {
try {
Thread.sleep(10000);
long count = databaseMonitor.count();
logger.info("numberOfTable={} count={} speed={}", subTableNum, count, (count - lastCount) / 10);
lastCount = count;
} catch (InterruptedException e) {
logger.error("interrupted", e);
break;
} catch (SQLException e) {
logger.error("execute sql error: ", e);
break;
}
}
databaseMonitor.close();
}
public void stop() {
active = false;
}
}

View File

@ -0,0 +1,5 @@
package com.taos.example.highvolume;
public interface Stoppable {
void stop();
}

View File

@ -0,0 +1,87 @@
package com.taos.example.highvolume;
import com.taosdata.jdbc.TSDBDriver;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import org.apache.kafka.clients.admin.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ExecutionException;
public class Util {
private final static Logger logger = LoggerFactory.getLogger(Util.class);
public static String getTableNamePrefix() {
return "d_";
}
public static Connection getConnection() throws SQLException {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
if (jdbcURL == null || jdbcURL == "") {
jdbcURL = "jdbc:TAOS-WS://localhost:6041/?user=root&password=taosdata";
}
return DriverManager.getConnection(jdbcURL);
}
public static Connection getConnection(int batchSize, int cacheSize, int writeThreadNum) throws SQLException {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
if (jdbcURL == null || jdbcURL == "") {
jdbcURL = "jdbc:TAOS-WS://localhost:6041/?user=root&password=taosdata";
}
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_ASYNC_WRITE, "stmt");
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_SIZE_BY_ROW, String.valueOf(batchSize));
properties.setProperty(TSDBDriver.PROPERTY_KEY_CACHE_SIZE_BY_ROW, String.valueOf(cacheSize));
properties.setProperty(TSDBDriver.PROPERTY_KEY_BACKEND_WRITE_THREAD_NUM, String.valueOf(writeThreadNum));
properties.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
return DriverManager.getConnection(jdbcURL, properties);
}
public static String getKafkaBootstrapServers() {
String kafkaBootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
if (kafkaBootstrapServers == null || kafkaBootstrapServers == "") {
kafkaBootstrapServers = "localhost:9092";
}
return kafkaBootstrapServers;
}
public static String getKafkaTopic() {
return "test-meters-topic";
}
public static void createKafkaTopic() {
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers());
try (AdminClient adminClient = AdminClient.create(config)) {
String topicName = getKafkaTopic();
int numPartitions = getPartitionCount();
short replicationFactor = 1;
ListTopicsResult topics = adminClient.listTopics();
Set<String> existingTopics = topics.names().get();
if (!existingTopics.contains(topicName)) {
NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
createTopicsResult.all().get(); // 等待创建完成
logger.info("Topic " + topicName + " created successfully.");
}
} catch (InterruptedException | ExecutionException e) {
logger.error("Failed to delete/create topic: " + e.getMessage());
throw new RuntimeException(e);
}
}
public static int getPartitionCount() {
return 5;
}
}

View File

@ -0,0 +1,69 @@
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Iterator;
class WorkTask implements Runnable, Stoppable {
private final static Logger logger = LoggerFactory.getLogger(WorkTask.class);
private final int taskId;
private final int writeThreadCount;
private final int batchSizeByRow;
private final int cacheSizeByRow;
private final int rowsPerTable;
private final int subTableStartIndex;
private final int subTableEndIndex;
private final String dbName;
private volatile boolean active = true;
public WorkTask(int taskId,
int writeThradCount,
int batchSizeByRow,
int cacheSizeByRow,
int rowsPerTable,
int subTableStartIndex,
int subTableEndIndex,
String dbName) {
this.taskId = taskId;
this.writeThreadCount = writeThradCount;
this.batchSizeByRow = batchSizeByRow;
this.cacheSizeByRow = cacheSizeByRow;
this.rowsPerTable = rowsPerTable;
this.subTableStartIndex = subTableStartIndex;
this.subTableEndIndex = subTableEndIndex;
this.dbName = dbName;
}
@Override
public void run() {
logger.info("started");
Iterator<Meters> it = new MockDataSource(subTableStartIndex, subTableEndIndex, rowsPerTable);
try (Connection connection = Util.getConnection(batchSizeByRow, cacheSizeByRow, writeThreadCount);
PreparedStatement pstmt = connection.prepareStatement("INSERT INTO " + dbName +".meters (tbname, ts, current, voltage, phase) VALUES (?,?,?,?,?)")) {
long i = 0L;
while (it.hasNext() && active) {
i++;
Meters meters = it.next();
pstmt.setString(1, meters.getTableName());
pstmt.setTimestamp(2, meters.getTs());
pstmt.setFloat(3, meters.getCurrent());
pstmt.setInt(4, meters.getVoltage());
pstmt.setFloat(5, meters.getPhase());
pstmt.addBatch();
if (i % batchSizeByRow == 0) {
pstmt.executeBatch();
}
}
} catch (Exception e) {
logger.error("Work Task {} Error", taskId, e);
}
}
public void stop() {
logger.info("stop");
this.active = false;
}
}