move highVolume jdbc to docs/examples/JDBC

This commit is contained in:
sheyanjie-qq 2025-03-18 16:49:33 +08:00
parent 6aa3b254e6
commit a0b3fd296f
11 changed files with 417 additions and 498 deletions

View File

@@ -0,0 +1,268 @@
# How to Develop a TDengine Application with JDBC on Windows
This article uses a Windows environment as an example to show how to develop a TDengine application in Java.
## Environment Setup
1. Install the JDK
Download JDK 1.8 from the official download page: https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
Install it and add the JDK to the system environment variables.
Check the Java version from the command line:
```shell
>java -version
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
```
2. Install and configure Maven
Download Maven from the official site: http://maven.apache.org/download.cgi
Set the MAVEN_HOME environment variable and add MAVEN_HOME/bin to PATH.
Check the Maven version from the command line:
```shell
>mvn --version
Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-04T03:39:06+08:00)
Maven home: D:\apache-maven-3.5.0\bin\..
Java version: 1.8.0_131, vendor: Oracle Corporation
Java home: C:\Program Files\Java\jdk1.8.0_131\jre
Default locale: zh_CN, platform encoding: GBK
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"
```
To speed up Maven's dependency downloads, you can configure a mirror for Maven by editing the MAVEN_HOME\conf\settings.xml file:
```xml
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
<!-- Path to the local Maven repository -->
<localRepository>D:\apache-maven-localRepository</localRepository>
<mirrors>
<!-- Aliyun Maven mirror repository -->
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
</mirrors>
<profiles>
<!-- JDK profile: Maven will use Java 1.8 by default -->
<profile>
<id>jdk-1.8</id>
<activation>
<activeByDefault>true</activeByDefault>
<jdk>1.8</jdk>
</activation>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
</properties>
</profile>
</profiles>
</settings>
```
3. Install TDengine-server on a Linux server
Download TDengine-server from the TAOS Data website: https://www.taosdata.com/cn/all-downloads/
Install TDengine-server on the Linux server:
```shell
# tar -zxvf package/TDengine-server-2.0.1.1-Linux-x64.tar.gz
# cd TDengine-server/
# ./install.sh
```
Start taosd:
```shell
# systemctl start taosd
```
On the server, connect to taosd using the TDengine CLI (taos):
```shell
# taos
taos> show dnodes;
id | end_point | vnodes | cores | status | role | create_time |
==================================================================================================================
1 | td01:6030 | 2 | 4 | ready | any | 2020-08-19 18:40:25.045 |
Query OK, 1 row(s) in set (0.005765s)
```
If you can connect to the taosd instance and the dnode information is printed correctly, the TDengine server has started successfully. Check the server's hostname:
```shell
# hostname -f
td01
```
Note: if you use the default taos.cfg after installing TDengine, taosd creates its dnode instance using the server's current hostname, and clients must later use this same hostname to connect to taosd.
4. Install the TDengine client on Windows
Download the TDengine client from the TAOS Data website:
https://www.taosdata.com/cn/all-downloads/
After downloading, double-click the .exe to install it.
Edit the client's hosts file (C:\Windows\System32\drivers\etc\hosts) and add the server's hostname and IP:
```
192.168.236.136 td01
```
After configuration, use the TDengine CLI from the command line to connect to the server:
```shell
C:\TDengine>taos -h td01
Welcome to the TDengine shell from Linux, Client Version:2.0.1.1
Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.
taos> show databases;
name | created_time | ntables | vgroups | replica | quorum | days | keep0,keep1,keep(D) | cache(MB) | blocks | minrows | maxrows | wallevel | fsync | comp | precision | status |
===================================================================================================================================================================================================================================================================
test | 2020-08-19 18:43:50.731 | 1 | 1 | 1 | 1 | 2 | 3650,3650,3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | ms | ready |
log | 2020-08-19 18:40:28.064 | 4 | 1 | 1 | 1 | 10 | 30,30,30 | 1 | 3 | 100 | 4096 | 1 | 3000 | 2 | us | ready |
Query OK, 2 row(s) in set (0.068000s)
```
If the client on Windows can connect and print the database information, the client can reach the server.
## Application Development
1. Create a Maven project and add the taos-jdbcdriver dependency to pom.xml:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.demo</groupId>
<artifactId>JdbcDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.8</version>
</dependency>
</dependencies>
</project>
```
2. Query the TDengine database using JDBC
Sample code:
```java
import com.taosdata.jdbc.TSDBDriver;

import java.sql.*;
import java.util.Properties;

public class JdbcDemo {
public static void main(String[] args) throws Exception {
Connection conn = getConn();
Statement stmt = conn.createStatement();
// create database
stmt.executeUpdate("create database if not exists db");
// use database
stmt.executeUpdate("use db");
// create table
stmt.executeUpdate("create table if not exists tb (ts timestamp, temperature int, humidity float)");
// insert data
int affectedRows = stmt.executeUpdate("insert into tb values(now, 23, 10.3) (now + 1s, 20, 9.3)");
System.out.println("insert " + affectedRows + " rows.");
// query data
ResultSet resultSet = stmt.executeQuery("select * from tb");
Timestamp ts = null;
int temperature = 0;
float humidity = 0;
while(resultSet.next()){
ts = resultSet.getTimestamp(1);
temperature = resultSet.getInt(2);
humidity = resultSet.getFloat("humidity");
System.out.printf("%s, %d, %s\n", ts, temperature, humidity);
}
}
public static Connection getConn() throws Exception{
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://td01:0/log?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
return conn;
}
}
```
3. Test JDBC access to the TDengine server instance
Console output:
```
insert 2 rows.
2020-08-26 00:06:34.575, 23, 10.3
2020-08-26 00:06:35.575, 20, 9.3
```
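One detail worth noting: the demo above never closes its Connection or Statement. In a real application, use try-with-resources so the JDBC resources are released even when a query fails. Below is a minimal sketch under the same assumptions as the demo (the example host td01 and the db.tb table created above; 6030 is TDengine's default port):
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class JdbcDemoWithCleanup {
    public static void main(String[] args) throws Exception {
        Class.forName("com.taosdata.jdbc.TSDBDriver");
        // Assumes the host td01 and database db created in the demo above.
        String jdbcUrl = "jdbc:TAOS://td01:6030/db?user=root&password=taosdata";
        // try-with-resources closes the ResultSet, Statement and Connection
        // in reverse order, even if the query throws an exception.
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select * from tb")) {
            while (rs.next()) {
                System.out.printf("%s, %d, %s%n",
                        rs.getTimestamp(1), rs.getInt(2), rs.getFloat("humidity"));
            }
        }
    }
}
```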
## Guide
1. How to set the hostname and hosts file
Check the hostname and FQDN on the server:
```shell
show the hostname
# hostname
taos-server
show the FQDN
# hostname -f
taos-server
```
On Windows, the hosts file is located at:
C:\Windows\System32\drivers\etc\hosts
Edit the hosts file and add the server's IP and hostname:
```
192.168.56.101 node5
```
2. What is an FQDN?
> What is an FQDN?
>
> An FQDN (fully qualified domain name) consists of two parts: the hostname and the domain name.
>
> For example, a mail server's FQDN might be mymail.somecollege.edu, where mymail is the hostname and somecollege.edu is the domain name. In this example, .edu is the top-level domain and somecollege is the second-level domain.
>
> When connecting to a server, you must specify the FQDN; a DNS server then resolves the hostname to the corresponding IP address by looking it up in its DNS tables. If you specify only the hostname without the domain name, the application may fail to resolve the hostname, because the local DNS server may not have a record for the remote server's hostname.
>
> Reference: https://kb.iu.edu/d/aiuv
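When a JDBC connection fails, the cause is often that the client cannot resolve the server's hostname at all. Here is a quick way to verify name resolution from Java before debugging the driver itself (a sketch; td01 is the example hostname used throughout this article):
```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class ResolveCheck {
    public static void main(String[] args) {
        // Use the first argument as the hostname, falling back to the example host.
        String host = args.length > 0 ? args[0] : "td01";
        try {
            InetAddress addr = InetAddress.getByName(host);
            System.out.println(host + " resolves to " + addr.getHostAddress());
        } catch (UnknownHostException e) {
            System.out.println("Cannot resolve " + host
                    + " - check the hosts file or DNS configuration.");
        }
    }
}
```
If the hostname does not resolve, fix the hosts file (or DNS) first; no JDBC setting can work around a missing name record.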

View File

@@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>highVolume</artifactId>
<version>SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.assembly.dir>src/main/resources/assembly</project.assembly.dir>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>highVolume</id>
<configuration>
<finalName>highVolume</finalName>
<appendAssemblyId>false</appendAssemblyId>
<archive>
<manifest>
<mainClass>com.taos.example.highvolume.FastWriteExample</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,31 @@
# How to Run the JDBC Demo Code on Linux
TDengine's JDBC demo project is organized as a Maven project so that users can easily compile, package, and run it. If you don't have Maven on your server, you can install it with:
```
sudo apt-get install maven
```
## Install TDengine Client
Make sure you have already installed the TDengine client in your current development environment.
Download the TDengine package from the website (``https://www.taosdata.com/cn/all-downloads/``) and install the client.
## Run JdbcDemo Using the Maven exec Plugin
Run the command:
```
mvn clean compile exec:java -Dexec.mainClass="com.taosdata.example.JdbcDemo"
```
or run it with your own arguments:
```
mvn clean compile exec:java -Dexec.mainClass="com.taosdata.example.JdbcDemo" -Dexec.args="-host [HOSTNAME]"
```
## Compile the Demo Code and Run It
```
mvn clean package -Dmaven.test.skip=true
```
To run JDBCDemo.jar, execute:
```
java -jar target/JDBCDemo-SNAPSHOT-jar-with-dependencies.jar -host [HOSTNAME]
```

View File

@@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="pattern" value="[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} %c %M %L %thread %m%n"/>
<property name="logDir" value="/var/log/taos"/>
<appender name="fileAppender" class="ch.qos.logback.core.FileAppender">
<file>${logDir}/high-volume.log</file>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${pattern}</pattern>
</encoder>
</appender>
<appender name="consoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${pattern}</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="consoleAppender"/>
<appender-ref ref="fileAppender"/>
</root>
</configuration>

View File

@@ -1,55 +0,0 @@
package com.taos.example.highvolume;
import java.sql.*;
/**
* Prepare target database.
* Count total records in database periodically so that we can estimate the writing speed.
*/
public class DataBaseMonitor {
private Connection conn;
private Statement stmt;
public DataBaseMonitor init() throws SQLException {
if (conn == null) {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
if (jdbcURL == null || jdbcURL.isEmpty()) {
jdbcURL = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
}
conn = DriverManager.getConnection(jdbcURL);
stmt = conn.createStatement();
}
return this;
}
public void close() {
try {
stmt.close();
} catch (SQLException e) {
}
try {
conn.close();
} catch (SQLException e) {
}
}
public void prepareDatabase() throws SQLException {
stmt.execute("DROP DATABASE IF EXISTS test");
stmt.execute("CREATE DATABASE test");
stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
}
public long count() throws SQLException {
try (ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters")) {
result.next();
return result.getLong(1);
}
}
public long getTableCount() throws SQLException {
try (ResultSet result = stmt.executeQuery("select count(*) from information_schema.ins_tables where db_name = 'test';")) {
result.next();
return result.getLong(1);
}
}
}

View File

@@ -1,70 +0,0 @@
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class FastWriteExample {
final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
final static int taskQueueCapacity = 1000000;
final static List<BlockingQueue<String>> taskQueues = new ArrayList<>();
final static List<ReadTask> readTasks = new ArrayList<>();
final static List<WriteTask> writeTasks = new ArrayList<>();
final static DataBaseMonitor databaseMonitor = new DataBaseMonitor();
public static void stopAll() {
logger.info("shutting down");
readTasks.forEach(task -> task.stop());
writeTasks.forEach(task -> task.stop());
databaseMonitor.close();
}
public static void main(String[] args) throws InterruptedException, SQLException {
int readTaskCount = args.length > 0 ? Integer.parseInt(args[0]) : 1;
int writeTaskCount = args.length > 1 ? Integer.parseInt(args[1]) : 3;
int tableCount = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
int maxBatchSize = args.length > 3 ? Integer.parseInt(args[3]) : 3000;
logger.info("readTaskCount={}, writeTaskCount={} tableCount={} maxBatchSize={}",
readTaskCount, writeTaskCount, tableCount, maxBatchSize);
databaseMonitor.init().prepareDatabase();
// Create task queues and writing tasks, and start the writing threads.
for (int i = 0; i < writeTaskCount; ++i) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(taskQueueCapacity);
taskQueues.add(queue);
WriteTask task = new WriteTask(queue, maxBatchSize);
Thread t = new Thread(task);
t.setName("WriteThread-" + i);
t.start();
}
// create reading tasks and start reading threads
int tableCountPerTask = tableCount / readTaskCount;
for (int i = 0; i < readTaskCount; ++i) {
ReadTask task = new ReadTask(i, taskQueues, tableCountPerTask);
Thread t = new Thread(task);
t.setName("ReadThread-" + i);
t.start();
}
Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::stopAll));
long lastCount = 0;
while (true) {
Thread.sleep(10000);
long numberOfTable = databaseMonitor.getTableCount();
long count = databaseMonitor.count();
logger.info("numberOfTable={} count={} speed={}", numberOfTable, count, (count - lastCount) / 10);
lastCount = count;
}
}
}

View File

@@ -1,53 +0,0 @@
package com.taos.example.highvolume;
import java.util.Iterator;
/**
* Generate test data
*/
class MockDataSource implements Iterator {
private String tbNamePrefix;
private int tableCount;
private long maxRowsPerTable = 1000000000L;
// 100 milliseconds between two neighbouring rows.
long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
private int currentRow = 0;
private int currentTbId = -1;
// mock values
String[] location = {"California.LosAngeles", "California.SanDiego", "California.SanJose", "California.Campbell", "California.SanFrancisco"};
float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
int[] voltage = {119, 116, 111, 113, 118};
float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};
public MockDataSource(String tbNamePrefix, int tableCount) {
this.tbNamePrefix = tbNamePrefix;
this.tableCount = tableCount;
}
@Override
public boolean hasNext() {
currentTbId += 1;
if (currentTbId == tableCount) {
currentTbId = 0;
currentRow += 1;
}
return currentRow < maxRowsPerTable;
}
@Override
public String next() {
long ts = startMs + 100 * currentRow;
int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
sb.append(ts).append(','); // ts
sb.append(current[currentRow % 5]).append(','); // current
sb.append(voltage[currentRow % 5]).append(','); // voltage
sb.append(phase[currentRow % 5]).append(','); // phase
sb.append(location[currentRow % 5]).append(','); // location
sb.append(groupId); // groupID
return sb.toString();
}
}

View File

@@ -1,58 +0,0 @@
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
class ReadTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(ReadTask.class);
private final int taskId;
private final List<BlockingQueue<String>> taskQueues;
private final int queueCount;
private final int tableCount;
private volatile boolean active = true; // volatile: set to false from the shutdown hook thread
public ReadTask(int readTaskId, List<BlockingQueue<String>> queues, int tableCount) {
this.taskId = readTaskId;
this.taskQueues = queues;
this.queueCount = queues.size();
this.tableCount = tableCount;
}
/**
* Assign data received to different queues.
* Here we use the numeric suffix of the table name.
* You are expected to define your own rule in practice.
*
* @param line record received
* @return which queue to use
*/
public int getQueueId(String line) {
String tbName = line.substring(0, line.indexOf(',')); // For example: tb1_101
String suffixNumber = tbName.split("_")[1];
return Integer.parseInt(suffixNumber) % this.queueCount;
}
@Override
public void run() {
logger.info("started");
Iterator<String> it = new MockDataSource("tb" + this.taskId, tableCount);
try {
while (it.hasNext() && active) {
String line = it.next();
int queueId = getQueueId(line);
taskQueues.get(queueId).put(line);
}
} catch (Exception e) {
logger.error("Read Task Error", e);
}
}
public void stop() {
logger.info("stop");
this.active = false;
}
}

View File

@@ -1,200 +0,0 @@
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.HashMap;
import java.util.Map;
/**
* A helper class encapsulating the logic of writing with SQL.
* <p>
* The main interface consists of two methods:
* <ol>
* <li>{@link SQLWriter#processLine}, which receives raw lines from WriteTask and groups them by table name.</li>
* <li>{@link SQLWriter#flush}, which assembles the INSERT statement and executes it.</li>
* </ol>
* <p>
* One technique is worth mentioning: tables are created on demand when a "table does not exist" error occurs, instead of being created automatically with the "INSERT INTO tb USING stb" syntax.
* This ensures that checking for table existence is a one-time-only operation.
* </p>
*/
public class SQLWriter {
final static Logger logger = LoggerFactory.getLogger(SQLWriter.class);
private Connection conn;
private Statement stmt;
/**
* current number of buffered records
*/
private int bufferedCount = 0;
/**
* Maximum number of buffered records.
* A flush is triggered when bufferedCount reaches this value.
*/
private int maxBatchSize;
/**
* Maximum SQL length.
*/
private int maxSQLLength = 800_000;
/**
* Map from table name to column values. For example:
* "tb001" -> "(1648432611249,2.1,114,0.09) (1648432611250,2.2,135,0.2)"
*/
private Map<String, String> tbValues = new HashMap<>();
/**
* Map from table name to tag values in the same order as creating stable.
* Used for creating table.
*/
private Map<String, String> tbTags = new HashMap<>();
public SQLWriter(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}
/**
* Get Database Connection
*
* @return Connection
* @throws SQLException
*/
private static Connection getConnection() throws SQLException {
String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
if (jdbcURL == null || jdbcURL.isEmpty()) {
jdbcURL = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
}
return DriverManager.getConnection(jdbcURL);
}
/**
* Create Connection and Statement
*
* @throws SQLException
*/
public void init() throws SQLException {
conn = getConnection();
stmt = conn.createStatement();
stmt.execute("use test");
}
/**
* Convert raw data to SQL fragments, group them by table name, and cache them in a HashMap.
* Trigger writing when the number of buffered records reaches maxBatchSize.
*
* @param line raw data taken from the task queue, in the format: tbName,ts,current,voltage,phase,location,groupId
*/
public void processLine(String line) throws SQLException {
bufferedCount += 1;
int firstComma = line.indexOf(',');
String tbName = line.substring(0, firstComma);
int lastComma = line.lastIndexOf(',');
int secondLastComma = line.lastIndexOf(',', lastComma - 1);
String value = "(" + line.substring(firstComma + 1, secondLastComma) + ") ";
if (tbValues.containsKey(tbName)) {
tbValues.put(tbName, tbValues.get(tbName) + value);
} else {
tbValues.put(tbName, value);
}
if (!tbTags.containsKey(tbName)) {
String location = line.substring(secondLastComma + 1, lastComma);
String groupId = line.substring(lastComma + 1);
String tagValues = "('" + location + "'," + groupId + ')';
tbTags.put(tbName, tagValues);
}
if (bufferedCount == maxBatchSize) {
flush();
}
}
/**
* Assemble the INSERT statement from the buffered SQL fragments in {@link SQLWriter#tbValues} and execute it.
* In case of a "table does not exist" exception, create all tables referenced in the SQL and retry it.
*/
public void flush() throws SQLException {
StringBuilder sb = new StringBuilder("INSERT INTO ");
for (Map.Entry<String, String> entry : tbValues.entrySet()) {
String tableName = entry.getKey();
String values = entry.getValue();
String q = tableName + " values " + values + " ";
if (sb.length() + q.length() > maxSQLLength) {
executeSQL(sb.toString());
logger.warn("increase maxSQLLength or decrease maxBatchSize to gain better performance");
sb = new StringBuilder("INSERT INTO ");
}
sb.append(q);
}
executeSQL(sb.toString());
tbValues.clear();
bufferedCount = 0;
}
private void executeSQL(String sql) throws SQLException {
try {
stmt.executeUpdate(sql);
} catch (SQLException e) {
// convert to error code defined in taoserror.h
int errorCode = e.getErrorCode() & 0xffff;
if (errorCode == 0x2603) {
// Table does not exist
createTables();
executeSQL(sql);
} else {
logger.error("Execute SQL: {}", sql);
throw e;
}
} catch (Throwable throwable) {
logger.error("Execute SQL: {}", sql);
throw throwable;
}
}
/**
* Create tables in batch using syntax:
* <p>
* CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...;
* </p>
*/
private void createTables() throws SQLException {
StringBuilder sb = new StringBuilder("CREATE TABLE ");
for (String tbName : tbValues.keySet()) {
String tagValues = tbTags.get(tbName);
sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" ");
}
String sql = sb.toString();
try {
stmt.executeUpdate(sql);
} catch (Throwable throwable) {
logger.error("Execute SQL: {}", sql);
throw throwable;
}
}
public boolean hasBufferedValues() {
return bufferedCount > 0;
}
public int getBufferedCount() {
return bufferedCount;
}
public void close() {
try {
stmt.close();
} catch (SQLException e) {
}
try {
conn.close();
} catch (SQLException e) {
}
}
}

View File

@@ -1,4 +0,0 @@
package com.taos.example.highvolume;
public class StmtWriter {
}

View File

@@ -1,58 +0,0 @@
package com.taos.example.highvolume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
class WriteTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(WriteTask.class);
private final int maxBatchSize;
// the queue from which this write task gets raw data.
private final BlockingQueue<String> queue;
// A flag indicating whether to continue running; volatile because it is written by the shutdown hook thread.
private volatile boolean active = true;
public WriteTask(BlockingQueue<String> taskQueue, int maxBatchSize) {
this.queue = taskQueue;
this.maxBatchSize = maxBatchSize;
}
@Override
public void run() {
logger.info("started");
String line = null; // the line most recently taken from the queue.
SQLWriter writer = new SQLWriter(maxBatchSize);
try {
writer.init();
while (active) {
line = queue.poll();
if (line != null) {
// parse raw data and buffer the data.
writer.processLine(line);
} else if (writer.hasBufferedValues()) {
// write data immediately if no more data in the queue
writer.flush();
} else {
// sleep a while to avoid high CPU usage when the queue is empty and there are no buffered records.
Thread.sleep(100);
}
}
if (writer.hasBufferedValues()) {
writer.flush();
}
} catch (Exception e) {
String msg = String.format("line=%s, bufferedCount=%s", line, writer.getBufferedCount());
logger.error(msg, e);
} finally {
writer.close();
}
}
public void stop() {
logger.info("stop");
this.active = false;
}
}