diff --git a/docs/examples/JDBC/highvolume/README-jdbc-windows.md b/docs/examples/JDBC/highvolume/README-jdbc-windows.md deleted file mode 100644 index e91a953cd1..0000000000 --- a/docs/examples/JDBC/highvolume/README-jdbc-windows.md +++ /dev/null @@ -1,268 +0,0 @@ -# 如何在 windows环境下使用jdbc进行TDengine应用开发 - -本文以windows环境为例,介绍java如何进行TDengine开发应用 - -## 环境准备 - -(1)安装jdk - -官网下载jdk-1.8,下载页面:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html - -安装,配置环境变量,把jdk加入到环境变量里。 - -命令行内查看java的版本。 - -```shell ->java -version -java version "1.8.0_131" -Java(TM) SE Runtime Environment (build 1.8.0_131-b11) -Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode) -``` - - -(2)安装配置maven - -官网下载maven,下载地址:http://maven.apache.org/download.cgi - -配置环境变量MAVEN_HOME,将MAVEN_HOME/bin添加到PATH - -命令行里查看maven的版本 - -```shell ->mvn --version -Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-04T03:39:06+08:00) -Maven home: D:\apache-maven-3.5.0\bin\.. -Java version: 1.8.0_131, vendor: Oracle Corporation -Java home: C:\Program Files\Java\jdk1.8.0_131\jre -Default locale: zh_CN, platform encoding: GBK -OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows" -``` - -为了加快maven下载依赖的速度,可以为maven配置mirror,修改MAVEN_HOME\config\settings.xml文件 - -```xml - - - D:\apache-maven-localRepository - - - - - alimaven - aliyun maven - http://maven.aliyun.com/nexus/content/groups/public/ - central - - - - - - - jdk-1.8 - - true - 1.8 - - - 1.8 - 1.8 - 1.8 - - - - -``` - - - -(3)在linux服务器上安装TDengine-server - -在taosdata官网下载TDengine-server,下载地址:https://www.taosdata.com/cn/all-downloads/ - -在linux服务器上安装TDengine-server - -```shell -# tar -zxvf package/TDengine-server-2.0.1.1-Linux-x64.tar.gz -# cd TDengine-server/ -# ./install.sh -``` - -启动taosd - -```shell -# systemctl start taosd -``` - -在server上用taos连接taosd - -```shell -# taos -taos> show dnodes; - id | end_point | vnodes | cores | status | role | create_time | -================================================================================================================== - 1 | td01:6030 | 2 | 4 | ready | any | 2020-08-19 18:40:25.045 | -Query OK, 1 row(s) in set (0.005765s) -``` - -如果可以正确连接到taosd实例,并打印出databases的信息,说明TDengine的server已经正确启动。这里查看server的hostname - -```shell -# hostname -f -td01 -``` - -注意,如果安装TDengine后,使用默认的taos.cfg配置文件,taosd会使用当前server的hostname创建dnode实例。之后,在client也需要使用这个hostname来连接taosd。 - - - -(4)在windows上安装TDengine-client - -在taosdata官网下载taos客户端,下载地址: -https://www.taosdata.com/cn/all-downloads/ -下载后,双击exe安装。 - -修改client的hosts文件(C:\Windows\System32\drivers\etc\hosts),将server的hostname和ip配置到client的hosts文件中 - -``` -192.168.236.136 td01 -``` - -配置完成后,在命令行内使用TDengine CLI连接server端 - -```shell -C:\TDengine>taos -h td01 -Welcome to the TDengine shell from Linux, Client Version:2.0.1.1 -Copyright (c) 2017 by TAOS Data, Inc. All rights reserved. - -taos> show databases; - name | created_time | ntables | vgroups | replica | quorum | days | keep0,keep1,keep(D) | cache(MB) | blocks | minrows | maxrows | wallevel | fsync | comp | precision | status | -=================================================================================================================================================================================================================================================================== - test | 2020-08-19 18:43:50.731 | 1 | 1 | 1 | 1 | 2 | 3650,3650,3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | ms | ready | - log | 2020-08-19 18:40:28.064 | 4 | 1 | 1 | 1 | 10 | 30,30,30 | 1 | 3 | 100 | 4096 | 1 | 3000 | 2 | us | ready | -Query OK, 2 row(s) in set (0.068000s) -``` - -如果windows上的client能够正常连接,并打印database信息,说明client可以正常连接server了。 - - - -## 应用开发 - -(1)新建maven工程,在pom.xml中引入taos-jdbcdriver依赖。 - -```xml - - - 4.0.0 - - com.taosdata.demo - JdbcDemo - 1.0-SNAPSHOT - - - - com.taosdata.jdbc - taos-jdbcdriver - 2.0.8 - - - -``` - -(2)使用jdbc查询TDengine数据库 - -下面是示例代码: - -```java -public class JdbcDemo { - - public static void main(String[] args) throws Exception { - Connection conn = getConn(); - Statement stmt = conn.createStatement(); - // create database - stmt.executeUpdate("create database if not exists db"); - // use database - stmt.executeUpdate("use db"); - // create table - stmt.executeUpdate("create table if not exists tb (ts timestamp, temperature int, humidity float)"); - // insert data - int affectedRows = stmt.executeUpdate("insert into tb values(now, 23, 10.3) (now + 1s, 20, 9.3)"); - System.out.println("insert " + affectedRows + " rows."); - // query data - ResultSet resultSet = stmt.executeQuery("select * from tb"); - Timestamp ts = null; - int temperature = 0; - float humidity = 0; - while(resultSet.next()){ - ts = resultSet.getTimestamp(1); - temperature = resultSet.getInt(2); - humidity = resultSet.getFloat("humidity"); - System.out.printf("%s, %d, %s\n", ts, temperature, humidity); - } - } - - public static Connection getConn() throws Exception{ - Class.forName("com.taosdata.jdbc.TSDBDriver"); - String jdbcUrl = "jdbc:TAOS://td01:0/log?user=root&password=taosdata"; - Properties connProps = new Properties(); - connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); - connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8"); - connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); - Connection conn = DriverManager.getConnection(jdbcUrl, connProps); - return conn; - } - -} -``` - -(3)测试jdbc访问tdengine的sever实例 - -console输出: - -``` -insert 2 rows. -2020-08-26 00:06:34.575, 23, 10.3 -2020-08-26 00:06:35.575, 20, 9.3 -``` - - - -## 指南 - -(1)如何设置主机名和hosts - -在server上查看hostname和fqdn -```shell -查看hostname -# hostname -taos-server - -查看fqdn -# hostname -f -taos-server -``` - -windows下hosts文件位于: -C:\\Windows\System32\drivers\etc\hosts -修改hosts文件,添加server的ip和hostname - -```s -192.168.56.101 node5 -``` - -(2)什么是fqdn? - - -> 什么是FQDN? -> -> FQDN(Full qualified domain name)全限定域名,fqdn由2部分组成:hostname+domainname。 -> -> 例如,一个邮件服务器的fqdn可能是:mymail.somecollege.edu,其中mymail是hostname(主机名),somcollege.edu是domainname(域名)。本例中,.edu是顶级域名,.somecollege是二级域名。 -> -> 当连接服务器时,必须指定fqdn,然后,dns服务器通过查看dns表,将hostname解析为相应的ip地址。如果只指定hostname(不指定domainname),应用程序可能服务解析主机名。因为如果你试图访问不在本地的远程服务器时,本地的dns服务器和可能没有远程服务器的hostname列表。 -> -> 参考:https://kb.iu.edu/d/aiuv diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ConsumerTask.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ConsumerTask.java new file mode 100644 index 0000000000..c29ae6fdea --- /dev/null +++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ConsumerTask.java @@ -0,0 +1,119 @@ +package com.taos.example.highvolume; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +class ConsumerTask implements Runnable, Stoppable { + private final static Logger logger = LoggerFactory.getLogger(ConsumerTask.class); + private final int taskId; + private final int writeThreadCount; + private final int batchSizeByRow; + private final int cacheSizeByRow; + private final String dbName; + private volatile boolean active = true; + + public ConsumerTask(int taskId, + int writeThradCount, + int batchSizeByRow, + int cacheSizeByRow, + String dbName) { + this.taskId = taskId; + this.writeThreadCount = writeThradCount; + this.batchSizeByRow = batchSizeByRow; + this.cacheSizeByRow = cacheSizeByRow; + this.dbName = dbName; + } + + @Override + public void run() { + + // 配置 Kafka 消费者的属性 + Properties props = new Properties(); + + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Util.getKafkaBootstrapServers()); + + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); + + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(batchSizeByRow)); + props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "3000"); + + props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, String.valueOf(2 * 1024 * 1024)); + + props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "15000"); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000"); + + KafkaConsumer consumer = new KafkaConsumer<>(props); + + List topics = Collections.singletonList(Util.getKafkaTopic()); + + try { + consumer.subscribe(topics); + } catch (Exception e) { + logger.error("Consumer Task {} Error", taskId, e); + return; + } + + try (Connection connection = Util.getConnection(batchSizeByRow, cacheSizeByRow, writeThreadCount); + PreparedStatement pstmt = connection.prepareStatement("INSERT INTO " + dbName +".meters (tbname, ts, current, voltage, phase) VALUES (?,?,?,?,?)")) { + long i = 0L; + long lastTimePolled = System.currentTimeMillis(); + while (active) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + i++; + Meters meters = Meters.fromString(record.value()); + pstmt.setString(1, meters.getTableName()); + pstmt.setTimestamp(2, meters.getTs()); + pstmt.setFloat(3, meters.getCurrent()); + pstmt.setInt(4, meters.getVoltage()); + pstmt.setFloat(5, meters.getPhase()); + pstmt.addBatch(); + + if (i % batchSizeByRow == 0) { + pstmt.executeBatch(); + } + if (i % (10L * batchSizeByRow) == 0){ + //pstmt.executeUpdate(); + consumer.commitAsync(); + } + } + + if (!records.isEmpty()){ + lastTimePolled = System.currentTimeMillis(); + } else { + if (System.currentTimeMillis() - lastTimePolled > 1000 * 60) { + lastTimePolled = System.currentTimeMillis(); + logger.error("Consumer Task {} has been idle for 10 seconds, stopping", taskId); + } + } + } + } catch (Exception e) { + logger.error("Consumer Task {} Error", taskId, e); + } finally { + // 关闭消费者 + consumer.close(); + } + } + + public void stop() { + logger.info("stop"); + this.active = false; + } +} \ No newline at end of file diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/CreateSubTableTask.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/CreateSubTableTask.java new file mode 100644 index 0000000000..5c10483283 --- /dev/null +++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/CreateSubTableTask.java @@ -0,0 +1,58 @@ +package com.taos.example.highvolume; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; + +class CreateSubTableTask implements Runnable { + private final static Logger logger = LoggerFactory.getLogger(CreateSubTableTask.class); + private final int taskId; + private final int subTableStartIndex; + private final int subTableEndIndex; + private final String dbName; + + + + public CreateSubTableTask(int taskId, + int subTableStartIndex, + int subTableEndIndex, + String dbName) { + this.taskId = taskId; + this.subTableStartIndex = subTableStartIndex; + this.subTableEndIndex = subTableEndIndex; + this.dbName = dbName; + } + + @Override + public void run() { + try (Connection connection = Util.getConnection(); + Statement statement = connection.createStatement()){ + statement.execute("use " + dbName); + StringBuilder sql = new StringBuilder(); + sql.append("create table"); + int i = 0; + for (int tableNum = subTableStartIndex; tableNum <= subTableEndIndex; tableNum++) { + sql.append(" if not exists " + Util.getTableNamePrefix() + tableNum + " using meters" + " tags(" + tableNum + ", " + "\"location_" + tableNum + "\"" + ")"); + + if (i < 1000) { + i++; + } else { + statement.execute(sql.toString()); + sql = new StringBuilder(); + sql.append("create table"); + i = 0; + } + } + if (sql.length() > "create table".length()) { + statement.execute(sql.toString()); + } + } catch (SQLException e) { + logger.error("task id {}, failed to create sub table", taskId, e); + } + } + + +} \ No newline at end of file diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java new file mode 100644 index 0000000000..848633a1a3 --- /dev/null +++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/DataBaseMonitor.java @@ -0,0 +1,47 @@ +package com.taos.example.highvolume; + +import java.sql.*; + +/** + * Prepare target database. + * Count total records in database periodically so that we can estimate the writing speed. + */ +public class DataBaseMonitor { + private Connection conn; + private Statement stmt; + private String dbName; + + public DataBaseMonitor init(String dbName) throws SQLException { + if (conn == null) { + conn = Util.getConnection(); + stmt = conn.createStatement(); + } + this.dbName = dbName; + return this; + } + + public void close() { + try { + stmt.close(); + } catch (SQLException e) { + } + try { + conn.close(); + } catch (SQLException e) { + } + } + + public void prepareDatabase() throws SQLException { + stmt.execute("DROP DATABASE IF EXISTS " + dbName); + stmt.execute("CREATE DATABASE IF NOT EXISTS " + dbName + " vgroups 20"); + stmt.execute("use " + dbName); + stmt.execute("CREATE STABLE " + dbName + ".meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(64))"); + } + + public long count() throws SQLException { + try (ResultSet result = stmt.executeQuery("SELECT count(*) from " + dbName +".meters")) { + result.next(); + return result.getLong(1); + } + } +} \ No newline at end of file diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/FastWriteExample.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/FastWriteExample.java new file mode 100644 index 0000000000..892537c499 --- /dev/null +++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/FastWriteExample.java @@ -0,0 +1,332 @@ +package com.taos.example.highvolume; + +import org.apache.commons.cli.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; + + +public class FastWriteExample { + final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class); + final static DataBaseMonitor databaseMonitor = new DataBaseMonitor(); + static ThreadPoolExecutor writerThreads; + static ThreadPoolExecutor producerThreads; + final static ThreadPoolExecutor statThread = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); + + private final static List allTasks = new ArrayList<>(); + + private static int readThreadCount = 5; + private static int writeThreadPerReadThread = 5; + private static int batchSizeByRow = 1000; + private static int cacheSizeByRow = 10000; + private static int subTableNum = 1000000; + private static int rowsPerSubTable = 100; + private static String dbName = "test"; + + + public static void forceStopAll() { + logger.info("shutting down"); + + for (Stoppable task : allTasks) { + task.stop(); + } + + if (producerThreads != null) { + producerThreads.shutdown(); + } + + if (writerThreads != null) { + writerThreads.shutdown(); + } + + statThread.shutdown(); + } + + private static void createSubTables(){ + writerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, getNamedThreadFactory("FW-CreateSubTable-thread-")); + + int range = (subTableNum + readThreadCount - 1) / readThreadCount; + + for (int i = 0; i < readThreadCount; i++) { + int startIndex = i * range; + int endIndex; + if (i == readThreadCount - 1) { + endIndex = subTableNum - 1; + } else { + endIndex = startIndex + range - 1; + } + + logger.debug("create sub table task {} {} {}", i, startIndex, endIndex); + + CreateSubTableTask createSubTableTask = new CreateSubTableTask(i, + startIndex, + endIndex, + dbName); + writerThreads.submit(createSubTableTask); + } + + logger.info("create sub table task started."); + + while (writerThreads.getActiveCount() != 0) { + try { + Thread.sleep(1); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + logger.info("create sub table task finished."); + + } + + public static void startStatTask() throws SQLException { + StatTask statTask = new StatTask(dbName, subTableNum); + allTasks.add(statTask); + statThread.submit(statTask); + } + public static ThreadFactory getNamedThreadFactory(String namePrefix) { + return new ThreadFactory() { + private final AtomicInteger threadNumber = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, namePrefix + threadNumber.getAndIncrement()); + } + }; + } + + private static void invokeKafkaDemo() throws SQLException { + producerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, getNamedThreadFactory("FW-kafka-producer-thread-")); + writerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, getNamedThreadFactory("FW-kafka-consumer-thread-")); + + int range = (subTableNum + readThreadCount - 1) / readThreadCount; + + for (int i = 0; i < readThreadCount; i++) { + int startIndex = i * range; + int endIndex; + if (i == readThreadCount - 1) { + endIndex = subTableNum - 1; + } else { + endIndex = startIndex + range - 1; + } + + ProducerTask producerTask = new ProducerTask(i, + rowsPerSubTable, + startIndex, + endIndex); + allTasks.add(producerTask); + producerThreads.submit(producerTask); + + ConsumerTask consumerTask = new ConsumerTask(i, + writeThreadPerReadThread, + batchSizeByRow, + cacheSizeByRow, + dbName); + allTasks.add(consumerTask); + writerThreads.submit(consumerTask); + } + + startStatTask(); + Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::forceStopAll)); + + while (writerThreads.getActiveCount() != 0) { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + } + private static void invokeMockDataDemo() throws SQLException { + ThreadFactory namedThreadFactory = new ThreadFactory() { + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix = "FW-work-thread-"; + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, namePrefix + threadNumber.getAndIncrement()); + } + }; + + writerThreads = (ThreadPoolExecutor) Executors.newFixedThreadPool(readThreadCount, namedThreadFactory); + + int range = (subTableNum + readThreadCount - 1) / readThreadCount; + + for (int i = 0; i < readThreadCount; i++) { + int startIndex = i * range; + int endIndex; + if (i == readThreadCount - 1) { + endIndex = subTableNum - 1; + } else { + endIndex = startIndex + range - 1; + } + + WorkTask task = new WorkTask(i, + writeThreadPerReadThread, + batchSizeByRow, + cacheSizeByRow, + rowsPerSubTable, + startIndex, + endIndex, + dbName); + allTasks.add(task); + writerThreads.submit(task); + } + + startStatTask(); + Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::forceStopAll)); + + while (writerThreads.getActiveCount() != 0) { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + } + + // 打印帮助信息的方法 + private static void printHelp(Options options) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("java -jar highVolume.jar", options); + System.out.println(); + } + + public static void main(String[] args) throws SQLException, InterruptedException { + Thread.sleep(10 * 1000); + Options options = new Options(); + + Option readThdcountOption = new Option("r", "readThreadCount", true, "Specify the readThreadCount, default is 5"); + readThdcountOption.setRequired(false); + options.addOption(readThdcountOption); + + Option writeThdcountOption = new Option("w", "writeThreadPerReadThread", true, "Specify the writeThreadPerReadThread, default is 5"); + writeThdcountOption.setRequired(false); + options.addOption(writeThdcountOption); + + Option batchSizeOption = new Option("b", "batchSizeByRow", true, "Specify the batchSizeByRow, default is 1000"); + batchSizeOption.setRequired(false); + options.addOption(batchSizeOption); + + Option cacheSizeOption = new Option("c", "cacheSizeByRow", true, "Specify the cacheSizeByRow, default is 10000"); + cacheSizeOption.setRequired(false); + options.addOption(cacheSizeOption); + + Option subTablesOption = new Option("s", "subTableNum", true, "Specify the subTableNum, default is 1000000"); + subTablesOption.setRequired(false); + options.addOption(subTablesOption); + + Option rowsPerTableOption = new Option("R", "rowsPerSubTable", true, "Specify the rowsPerSubTable, default is 100"); + rowsPerTableOption.setRequired(false); + options.addOption(rowsPerTableOption); + + Option dbNameOption = new Option("d", "dbName", true, "Specify the database name, default is test"); + dbNameOption.setRequired(false); + options.addOption(dbNameOption); + + Option kafkaOption = new Option("K", "useKafka", false, "use kafka demo to test"); + kafkaOption.setRequired(false); + options.addOption(kafkaOption); + + CommandLineParser parser = new DefaultParser(); + CommandLine cmd; + + try { + cmd = parser.parse(options, args); + } catch (ParseException e) { + System.out.println(e.getMessage()); + printHelp(options); + System.exit(1); + return; + } + + // 检查是否请求了帮助信息 + if (cmd.hasOption("help")) { + printHelp(options); + return; + } + + if (cmd.getOptionValue("readThreadCount") != null) { + readThreadCount = Integer.parseInt(cmd.getOptionValue("readThreadCount")); + if (readThreadCount <= 0){ + logger.error("readThreadCount must be greater than 0"); + return; + } + } + + if (cmd.getOptionValue("writeThreadPerReadThread") != null) { + writeThreadPerReadThread = Integer.parseInt(cmd.getOptionValue("writeThreadPerReadThread")); + if (writeThreadPerReadThread <= 0){ + logger.error("writeThreadPerReadThread must be greater than 0"); + return; + } + } + + if (cmd.getOptionValue("batchSizeByRow") != null) { + batchSizeByRow = Integer.parseInt(cmd.getOptionValue("batchSizeByRow")); + if (batchSizeByRow <= 0){ + logger.error("batchSizeByRow must be greater than 0"); + return; + } + } + + if (cmd.getOptionValue("cacheSizeByRow") != null) { + cacheSizeByRow = Integer.parseInt(cmd.getOptionValue("cacheSizeByRow")); + if (cacheSizeByRow <= 0){ + logger.error("cacheSizeByRow must be greater than 0"); + return; + } + } + + if (cmd.getOptionValue("subTableNum") != null) { + subTableNum = Integer.parseInt(cmd.getOptionValue("subTableNum")); + if (subTableNum <= 0){ + logger.error("subTableNum must be greater than 0"); + return; + } + } + + if (cmd.getOptionValue("rowsPerSubTable") != null) { + rowsPerSubTable = Integer.parseInt(cmd.getOptionValue("rowsPerSubTable")); + if (rowsPerSubTable <= 0){ + logger.error("rowsPerSubTable must be greater than 0"); + return; + } + } + + if (cmd.getOptionValue("dbName") != null) { + dbName = cmd.getOptionValue("dbName"); + } + + logger.info("readThreadCount={}, writeThreadPerReadThread={} batchSizeByRow={} cacheSizeByRow={}, subTableNum={}, rowsPerSubTable={}", + readThreadCount, writeThreadPerReadThread, batchSizeByRow, cacheSizeByRow, subTableNum, rowsPerSubTable); + + logger.info("create database begin."); + databaseMonitor.init(dbName).prepareDatabase(); + databaseMonitor.close(); + + logger.info("create database end."); + + logger.info("create sub tables start."); + createSubTables(); + logger.info("create sub tables end."); + + + if (cmd.hasOption("K")) { + Util.createKafkaTopic(); + // use kafka demo + invokeKafkaDemo(); + + } else { + // use mock data source demo + invokeMockDataDemo(); + } + + } +} \ No newline at end of file diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Meters.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Meters.java new file mode 100644 index 0000000000..7db0d9f2a1 --- /dev/null +++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Meters.java @@ -0,0 +1,76 @@ +package com.taos.example.highvolume; + +import java.sql.Timestamp; + +public class Meters { + String tableName; + Timestamp ts; + float current; + int voltage; + float phase; + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public Timestamp getTs() { + return ts; + } + + public void setTs(Timestamp ts) { + this.ts = ts; + } + + public float getCurrent() { + return current; + } + + public void setCurrent(float current) { + this.current = current; + } + + public int getVoltage() { + return voltage; + } + + public void setVoltage(int voltage) { + this.voltage = voltage; + } + + public float getPhase() { + return phase; + } + + public void setPhase(float phase) { + this.phase = phase; + } + + @Override + // this is just a demo, so we don't need to implement the full CSV parser + public String toString() { + return tableName + "," + + ts.toString() + "," + + current + "," + + voltage + "," + + phase; + } + + public static Meters fromString(String str) { + String[] parts = str.split(","); + if (parts.length != 5) { + throw new IllegalArgumentException("Invalid input format"); + } + Meters meters = new Meters(); + meters.setTableName(parts[0]); + meters.setTs(Timestamp.valueOf(parts[1])); + meters.setCurrent(Float.parseFloat(parts[2])); + meters.setVoltage(Integer.parseInt(parts[3])); + meters.setPhase(Float.parseFloat(parts[4])); + return meters; + } + +} diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/MockDataSource.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/MockDataSource.java new file mode 100644 index 0000000000..aa74912c34 --- /dev/null +++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/MockDataSource.java @@ -0,0 +1,54 @@ +package com.taos.example.highvolume; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; + +/** + * Generate test data + */ +class MockDataSource implements Iterator { + private final static Logger logger = LoggerFactory.getLogger(WorkTask.class); + + private final int tableStartIndex; + private final int tableEndIndex; + private final long maxRowsPerTable; + + long currentMs = System.currentTimeMillis(); + private int index = 0; + + // mock values + + public MockDataSource(int tableStartIndex, int tableEndIndex, int maxRowsPerTable) { + this.tableStartIndex = tableStartIndex; + this.tableEndIndex = tableEndIndex; + this.maxRowsPerTable = maxRowsPerTable; + } + + @Override + public boolean hasNext() { + return index < (tableEndIndex - tableStartIndex + 1) * maxRowsPerTable; + } + + @Override + public Meters next() { + // use interlace rows one to simulate the data distribution in real world + if (index % (tableEndIndex - tableStartIndex + 1) == 0) { + currentMs += 1000; + } + + long currentTbId = index % (tableEndIndex - tableStartIndex + 1) + tableStartIndex; + + Meters meters = new Meters(); + + meters.setTableName(Util.getTableNamePrefix() + currentTbId); + meters.setTs(new java.sql.Timestamp(currentMs)); + meters.setCurrent((float) (Math.random() * 100)); + meters.setVoltage((int) (Math.random() * 100)); + meters.setPhase((float) (Math.random() * 100)); + + index ++; + return meters; + } +} diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ProducerTask.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ProducerTask.java new file mode 100644 index 0000000000..6c217df8ab --- /dev/null +++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/ProducerTask.java @@ -0,0 +1,67 @@ +package com.taos.example.highvolume; + +import com.taosdata.jdbc.utils.ReqId; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.Properties; + +class ProducerTask implements Runnable, Stoppable { + private final static Logger logger = LoggerFactory.getLogger(ProducerTask.class); + private final int taskId; + private final int subTableStartIndex; + private final int subTableEndIndex; + private final int rowsPerTable; + private volatile boolean active = true; + public ProducerTask(int taskId, + int rowsPerTable, + int subTableStartIndex, + int subTableEndIndex) { + this.taskId = taskId; + this.subTableStartIndex = subTableStartIndex; + this.subTableEndIndex = subTableEndIndex; + this.rowsPerTable = rowsPerTable; + } + + @Override + public void run() { + logger.info("started"); + Iterator it = new MockDataSource(subTableStartIndex, subTableEndIndex, rowsPerTable); + + Properties props = new Properties(); + props.put("bootstrap.servers", Util.getKafkaBootstrapServers()); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("batch.size", 1024 * 1024); + props.put("linger.ms", 500); + + // create a Kafka producer + KafkaProducer producer = new KafkaProducer<>(props); + + try { + while (it.hasNext() && active) { + Meters meters = it.next(); + String key = meters.getTableName(); + String value = meters.toString(); + // to avoid the data of the sub-table out of order. we use the partition key to ensure the data of the same sub-table is sent to the same partition. + // Because efficient writing use String hashcode,here we use another hash algorithm to calculate the partition key. + long hashCode = Math.abs(ReqId.murmurHash32(key.getBytes(), 0)); + ProducerRecord record = new ProducerRecord<>(Util.getKafkaTopic(), (int)(hashCode % Util.getPartitionCount()), key, value); + producer.send(record); + } + } catch (Exception e) { + logger.error("task id {}, send message error: ", taskId, e); + } + finally { + producer.close(); + } + } + + public void stop() { + logger.info("stop"); + this.active = false; + } +} \ No newline at end of file diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/StatTask.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/StatTask.java new file mode 100644 index 0000000000..158fb61bb2 --- /dev/null +++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/StatTask.java @@ -0,0 +1,46 @@ +package com.taos.example.highvolume; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; + +class StatTask implements Runnable, Stoppable { + private final static Logger logger = LoggerFactory.getLogger(StatTask.class); + private final DataBaseMonitor databaseMonitor; + private final int subTableNum; + private volatile boolean active = true; + + + public StatTask(String dbName, + int subTableNum) throws SQLException { + this.databaseMonitor = new DataBaseMonitor().init(dbName); + this.subTableNum = subTableNum; + } + + @Override + public void run() { + long lastCount = 0; + while (active) { + try { + Thread.sleep(10000); + + long count = databaseMonitor.count(); + logger.info("numberOfTable={} count={} speed={}", subTableNum, count, (count - lastCount) / 10); + lastCount = count; + } catch (InterruptedException e) { + logger.error("interrupted", e); + break; + } catch (SQLException e) { + logger.error("execute sql error: ", e); + break; + } + } + + databaseMonitor.close(); + } + + public void stop() { + active = false; + } +} \ No newline at end of file diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Stoppable.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Stoppable.java new file mode 100644 index 0000000000..f3b249eb80 --- /dev/null +++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Stoppable.java @@ -0,0 +1,5 @@ +package com.taos.example.highvolume; + +public interface Stoppable { + void stop(); +} diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Util.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Util.java new file mode 100644 index 0000000000..fa05d59c19 --- /dev/null +++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/Util.java @@ -0,0 +1,87 @@ +package com.taos.example.highvolume; + +import com.taosdata.jdbc.TSDBDriver; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Properties; + +import org.apache.kafka.clients.admin.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ExecutionException; + +public class Util { + private final static Logger logger = LoggerFactory.getLogger(Util.class); + + public static String getTableNamePrefix() { + return "d_"; + } + + public static Connection getConnection() throws SQLException { + String jdbcURL = System.getenv("TDENGINE_JDBC_URL"); + if (jdbcURL == null || jdbcURL == "") { + jdbcURL = "jdbc:TAOS-WS://localhost:6041/?user=root&password=taosdata"; + } + return DriverManager.getConnection(jdbcURL); + } + + public static Connection getConnection(int batchSize, int cacheSize, int writeThreadNum) throws SQLException { + String jdbcURL = System.getenv("TDENGINE_JDBC_URL"); + if (jdbcURL == null || jdbcURL == "") { + jdbcURL = "jdbc:TAOS-WS://localhost:6041/?user=root&password=taosdata"; + } + Properties properties = new Properties(); + properties.setProperty(TSDBDriver.PROPERTY_KEY_ASYNC_WRITE, "stmt"); + properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_SIZE_BY_ROW, String.valueOf(batchSize)); + properties.setProperty(TSDBDriver.PROPERTY_KEY_CACHE_SIZE_BY_ROW, String.valueOf(cacheSize)); + properties.setProperty(TSDBDriver.PROPERTY_KEY_BACKEND_WRITE_THREAD_NUM, String.valueOf(writeThreadNum)); + properties.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true"); + return DriverManager.getConnection(jdbcURL, properties); + } + + public static String getKafkaBootstrapServers() { + String kafkaBootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS"); + if (kafkaBootstrapServers == null || kafkaBootstrapServers == "") { + kafkaBootstrapServers = "localhost:9092"; + } + + return kafkaBootstrapServers; + } + + public static String getKafkaTopic() { + return "test-meters-topic"; + } + + public static void createKafkaTopic() { + Properties config = new Properties(); + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers()); + + try (AdminClient adminClient = AdminClient.create(config)) { + String topicName = getKafkaTopic(); + int numPartitions = getPartitionCount(); + short replicationFactor = 1; + + ListTopicsResult topics = adminClient.listTopics(); + Set existingTopics = topics.names().get(); + + if (!existingTopics.contains(topicName)) { + NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor); + CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic)); + createTopicsResult.all().get(); // 等待创建完成 + logger.info("Topic " + topicName + " created successfully."); + } + + } catch (InterruptedException | ExecutionException e) { + logger.error("Failed to delete/create topic: " + e.getMessage()); + throw new RuntimeException(e); + } + } + + public static int getPartitionCount() { + return 5; + } +} diff --git a/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/WorkTask.java b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/WorkTask.java new file mode 100644 index 0000000000..73cda8bf14 --- /dev/null +++ b/docs/examples/JDBC/highvolume/src/main/java/com/taos/example/highvolume/WorkTask.java @@ -0,0 +1,69 @@ +package com.taos.example.highvolume; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.Iterator; + +class WorkTask implements Runnable, Stoppable { + private final static Logger logger = LoggerFactory.getLogger(WorkTask.class); + private final int taskId; + private final int writeThreadCount; + private final int batchSizeByRow; + private final int cacheSizeByRow; + private final int rowsPerTable; + private final int subTableStartIndex; + private final int subTableEndIndex; + private final String dbName; + private volatile boolean active = true; + public WorkTask(int taskId, + int writeThradCount, + int batchSizeByRow, + int cacheSizeByRow, + int rowsPerTable, + int subTableStartIndex, + int subTableEndIndex, + String dbName) { + this.taskId = taskId; + this.writeThreadCount = writeThradCount; + this.batchSizeByRow = batchSizeByRow; + this.cacheSizeByRow = cacheSizeByRow; + this.rowsPerTable = rowsPerTable; + this.subTableStartIndex = subTableStartIndex; + this.subTableEndIndex = subTableEndIndex; + this.dbName = dbName; + } + + @Override + public void run() { + logger.info("started"); + Iterator it = new MockDataSource(subTableStartIndex, subTableEndIndex, rowsPerTable); + try (Connection connection = Util.getConnection(batchSizeByRow, cacheSizeByRow, writeThreadCount); + PreparedStatement pstmt = connection.prepareStatement("INSERT INTO " + dbName +".meters (tbname, ts, current, voltage, phase) VALUES (?,?,?,?,?)")) { + long i = 0L; + while (it.hasNext() && active) { + i++; + Meters meters = it.next(); + pstmt.setString(1, meters.getTableName()); + pstmt.setTimestamp(2, meters.getTs()); + pstmt.setFloat(3, meters.getCurrent()); + pstmt.setInt(4, meters.getVoltage()); + pstmt.setFloat(5, meters.getPhase()); + pstmt.addBatch(); + + if (i % batchSizeByRow == 0) { + pstmt.executeBatch(); + } + } + } catch (Exception e) { + logger.error("Work Task {} Error", taskId, e); + } + } + + public void stop() { + logger.info("stop"); + this.active = false; + } +} \ No newline at end of file