From b01f13bbd9c053f9a56fbc7411b267ff57909dbf Mon Sep 17 00:00:00 2001 From: zyyang Date: Tue, 22 Dec 2020 10:36:43 +0800 Subject: [PATCH] change --- .../taosdemo/TaosDemoApplication.java | 73 ++------- .../taosdata/taosdemo/service/InsertTask.java | 101 ------------- .../taosdemo/service/SubTableService.java | 138 +++++++++++++++++- 3 files changed, 145 insertions(+), 167 deletions(-) delete mode 100644 tests/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/InsertTask.java diff --git a/tests/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/TaosDemoApplication.java b/tests/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/TaosDemoApplication.java index e7e1cbc945..502936c961 100644 --- a/tests/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/TaosDemoApplication.java +++ b/tests/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/TaosDemoApplication.java @@ -1,32 +1,27 @@ package com.taosdata.taosdemo; import com.taosdata.taosdemo.components.DataSourceFactory; -import com.taosdata.taosdemo.domain.FieldMeta; +import com.taosdata.taosdemo.components.JdbcTaosdemoConfig; import com.taosdata.taosdemo.domain.SuperTableMeta; -import com.taosdata.taosdemo.domain.TagMeta; import com.taosdata.taosdemo.service.DatabaseService; -import com.taosdata.taosdemo.service.InsertTask; import com.taosdata.taosdemo.service.SubTableService; import com.taosdata.taosdemo.service.SuperTableService; import com.taosdata.taosdemo.service.data.SuperTableMetaGenerator; -import com.taosdata.taosdemo.components.JdbcTaosdemoConfig; import org.apache.log4j.Logger; import javax.sql.DataSource; import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.FutureTask; -import java.util.stream.Collectors; -import java.util.stream.IntStream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; public class TaosDemoApplication { + private static Logger logger = Logger.getLogger(TaosDemoApplication.class); public static void main(String[] args) throws IOException { - // 读配置参数 JdbcTaosdemoConfig config = new JdbcTaosdemoConfig(args); boolean isHelp = Arrays.asList(args).contains("--help"); @@ -34,12 +29,11 @@ public class TaosDemoApplication { JdbcTaosdemoConfig.printHelp(); System.exit(0); } - + // 初始化 final DataSource dataSource = DataSourceFactory.getInstance(config.host, config.port, config.user, config.password); final DatabaseService databaseService = new DatabaseService(dataSource); final SuperTableService superTableService = new SuperTableService(dataSource); final SubTableService subTableService = new SubTableService(dataSource); - // 创建数据库 long start = System.currentTimeMillis(); Map databaseParam = new HashMap<>(); @@ -54,7 +48,7 @@ public class TaosDemoApplication { long end = System.currentTimeMillis(); logger.info(">>> create database time cost : " + (end - start) + " ms."); /**********************************************************************************/ - // 超级表的meta + // 构造超级表的meta SuperTableMeta superTableMeta; // create super table if (config.superTableSQL != null) { @@ -63,19 +57,8 @@ public class TaosDemoApplication { if (config.database != null && !config.database.isEmpty()) superTableMeta.setDatabase(config.database); } else if (config.numOfFields == 0) { - // default sql = "create table test.weather (ts timestamp, temperature float, humidity int) tags(location nchar(64), groupId int)"; - superTableMeta = new SuperTableMeta(); - superTableMeta.setDatabase(config.database); - superTableMeta.setName(config.superTable); - List fields = new ArrayList<>(); - fields.add(new FieldMeta("ts", "timestamp")); - fields.add(new FieldMeta("temperature", "float")); - fields.add(new FieldMeta("humidity", "int")); - superTableMeta.setFields(fields); - List tags = new ArrayList<>(); - tags.add(new TagMeta("location", "nchar(64)")); - tags.add(new TagMeta("groupId", "int")); - superTableMeta.setTags(tags); + String sql = "create table " + config.database + "." + config.superTable + " (ts timestamp, temperature float, humidity int) tags(location nchar(64), groupId int)"; + superTableMeta = SuperTableMetaGenerator.generate(sql); } else { // create super table with specified field size and tag size superTableMeta = SuperTableMetaGenerator.generate(config.database, config.superTable, config.numOfFields, config.prefixOfFields, config.numOfTags, config.prefixOfTags); @@ -104,43 +87,7 @@ public class TaosDemoApplication { start = System.currentTimeMillis(); // multi threads to insert - List taskList = new ArrayList<>(); - List threads = IntStream.range(0, threadSize) - .mapToObj(i -> { - long startInd = i * gap; - long endInd = (i + 1) * gap < tableSize ? (i + 1) * gap : tableSize; - FutureTask task = new FutureTask<>( - new InsertTask(superTableMeta, - startInd, endInd, - startTime, config.timeGap, - config.numOfRowsPerTable, config.numOfTablesPerSQL, config.numOfValuesPerSQL, - config.order, config.rate, config.range, - config.prefixOfTable, config.autoCreateTable, dataSource) - ); - taskList.add(task); - return new Thread(task, "InsertThread-" + i); - }).collect(Collectors.toList()); - - threads.stream().forEach(Thread::start); - for (Thread thread : threads) { - try { - thread.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - int affectedRows = 0; - for (FutureTask task : taskList) { - try { - affectedRows += task.get(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); - } - } - + int affectedRows = subTableService.insertAutoCreateTable(superTableMeta, threadSize, tableSize, startTime, gap, config); end = System.currentTimeMillis(); logger.info("insert " + affectedRows + " rows, time cost: " + (end - start) + " ms"); /**********************************************************************************/ diff --git a/tests/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/InsertTask.java b/tests/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/InsertTask.java deleted file mode 100644 index b213e311ac..0000000000 --- a/tests/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/InsertTask.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.taosdata.taosdemo.service; - -import com.taosdata.taosdemo.domain.SubTableValue; -import com.taosdata.taosdemo.domain.SuperTableMeta; -import com.taosdata.taosdemo.service.data.SubTableValueGenerator; - -import javax.sql.DataSource; -import java.sql.Connection; -import java.util.List; -import java.util.concurrent.Callable; - -public class InsertTask implements Callable { - - private final long startTableInd; // included - private final long endTableInd; // excluded - private final long startTime; - private final long timeGap; - private final long numOfRowsPerTable; - private long numOfTablesPerSQL; - private long numOfValuesPerSQL; - private final SuperTableMeta superTableMeta; - private final int order; - private final int rate; - private final long range; - private final String prefixOfTable; - private final boolean autoCreateTable; - private final DataSource dataSource; - - public InsertTask(SuperTableMeta superTableMeta, long startTableInd, long endTableInd, - long startTime, long timeGap, - long numOfRowsPerTable, long numOfTablesPerSQL, long numOfValuesPerSQL, - int order, int rate, long range, - String prefixOfTable, boolean autoCreateTable, DataSource dataSource) { - this.superTableMeta = superTableMeta; - this.startTableInd = startTableInd; - this.endTableInd = endTableInd; - this.startTime = startTime; - this.timeGap = timeGap; - this.numOfRowsPerTable = numOfRowsPerTable; - this.numOfTablesPerSQL = numOfTablesPerSQL; - this.numOfValuesPerSQL = numOfValuesPerSQL; - this.order = order; - this.rate = rate; - this.range = range; - this.prefixOfTable = prefixOfTable; - this.autoCreateTable = autoCreateTable; - this.dataSource = dataSource; - } - - - @Override - public Integer call() throws Exception { - - Connection connection = dataSource.getConnection(); - - long numOfTables = endTableInd - startTableInd; - if (numOfRowsPerTable < numOfValuesPerSQL) - numOfValuesPerSQL = (int) numOfRowsPerTable; - if (numOfTables < numOfTablesPerSQL) - numOfTablesPerSQL = (int) numOfTables; - - // row - for (long rowCnt = 0; rowCnt < numOfRowsPerTable; ) { - long rowSize = numOfValuesPerSQL; - if (rowCnt + rowSize > numOfRowsPerTable) { - rowSize = numOfRowsPerTable - rowCnt; - } - //table - for (long tableCnt = startTableInd; tableCnt < endTableInd; ) { - long tableSize = numOfTablesPerSQL; - if (tableCnt + tableSize > endTableInd) { - tableSize = endTableInd - tableCnt; - } - long startTime = this.startTime + rowCnt * timeGap; - -// System.out.println(Thread.currentThread().getName() + " >>> " + "rowCnt: " + rowCnt + ", rowSize: " + rowSize + ", " + "tableCnt: " + tableCnt + ",tableSize: " + tableSize + ", " + "startTime: " + startTime + ",timeGap: " + timeGap + ""); - /***********************************************/ - // 生成数据 - List data = SubTableValueGenerator.generate(superTableMeta, prefixOfTable, tableCnt, tableSize, rowSize, startTime, timeGap); - // 乱序 - if (order != 0) { - SubTableValueGenerator.disrupt(data, rate, range); - } - // insert - SubTableService subTableService = new SubTableService(dataSource); - if (autoCreateTable) { - subTableService.insertAutoCreateTable(data); - } else { - subTableService.insert(data); - } - /***********************************************/ - tableCnt += tableSize; - } - rowCnt += rowSize; - } - - connection.close(); - return 1; - } - -} diff --git a/tests/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/SubTableService.java b/tests/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/SubTableService.java index 64732cd257..95887137a2 100644 --- a/tests/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/SubTableService.java +++ b/tests/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/SubTableService.java @@ -1,22 +1,26 @@ package com.taosdata.taosdemo.service; +import com.taosdata.taosdemo.components.JdbcTaosdemoConfig; import com.taosdata.taosdemo.dao.SubTableMapper; import com.taosdata.taosdemo.dao.SubTableMapperImpl; import com.taosdata.taosdemo.domain.SubTableMeta; import com.taosdata.taosdemo.domain.SubTableValue; import com.taosdata.taosdemo.domain.SuperTableMeta; import com.taosdata.taosdemo.service.data.SubTableMetaGenerator; +import com.taosdata.taosdemo.service.data.SubTableValueGenerator; import org.apache.log4j.Logger; import javax.sql.DataSource; +import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; public class SubTableService extends AbstractService { private SubTableMapper mapper; + private static final Logger logger = Logger.getLogger(SubTableService.class); public SubTableService(DataSource datasource) { this.mapper = new SubTableMapperImpl(datasource); @@ -72,4 +76,132 @@ public class SubTableService extends AbstractService { return mapper.insertMultiTableMultiValuesUsingSuperTable(subTableValues); } + public int insertAutoCreateTable(SuperTableMeta superTableMeta, int threadSize, long tableSize, long startTime, long gap, JdbcTaosdemoConfig config) { + long start = System.currentTimeMillis(); + + List taskList = new ArrayList<>(); + List threads = IntStream.range(0, threadSize) + .mapToObj(i -> { + long startInd = i * gap; + long endInd = (i + 1) * gap < tableSize ? (i + 1) * gap : tableSize; + FutureTask task = new FutureTask<>( + new InsertTask(superTableMeta, + startInd, endInd, + startTime, config.timeGap, + config.numOfRowsPerTable, config.numOfTablesPerSQL, config.numOfValuesPerSQL, + config.order, config.rate, config.range, + config.prefixOfTable, config.autoCreateTable) + ); + taskList.add(task); + return new Thread(task, "InsertThread-" + i); + }).collect(Collectors.toList()); + + threads.stream().forEach(Thread::start); + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + int affectedRows = 0; + for (FutureTask task : taskList) { + try { + affectedRows += task.get(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + } + + long end = System.currentTimeMillis(); + logger.info("insert " + affectedRows + " rows, time cost: " + (end - start) + " ms"); + return affectedRows; + } + + private class InsertTask implements Callable { + + private final long startTableInd; // included + private final long endTableInd; // excluded + private final long startTime; + private final long timeGap; + private final long numOfRowsPerTable; + private long numOfTablesPerSQL; + private long numOfValuesPerSQL; + private final SuperTableMeta superTableMeta; + private final int order; + private final int rate; + private final long range; + private final String prefixOfTable; + private final boolean autoCreateTable; + + public InsertTask(SuperTableMeta superTableMeta, long startTableInd, long endTableInd, + long startTime, long timeGap, + long numOfRowsPerTable, long numOfTablesPerSQL, long numOfValuesPerSQL, + int order, int rate, long range, + String prefixOfTable, boolean autoCreateTable) { + this.superTableMeta = superTableMeta; + this.startTableInd = startTableInd; + this.endTableInd = endTableInd; + this.startTime = startTime; + this.timeGap = timeGap; + this.numOfRowsPerTable = numOfRowsPerTable; + this.numOfTablesPerSQL = numOfTablesPerSQL; + this.numOfValuesPerSQL = numOfValuesPerSQL; + this.order = order; + this.rate = rate; + this.range = range; + this.prefixOfTable = prefixOfTable; + this.autoCreateTable = autoCreateTable; + } + + + @Override + public Integer call() { + + long numOfTables = endTableInd - startTableInd; + if (numOfRowsPerTable < numOfValuesPerSQL) + numOfValuesPerSQL = (int) numOfRowsPerTable; + if (numOfTables < numOfTablesPerSQL) + numOfTablesPerSQL = (int) numOfTables; + + int affectRows = 0; + // row + for (long rowCnt = 0; rowCnt < numOfRowsPerTable; ) { + long rowSize = numOfValuesPerSQL; + if (rowCnt + rowSize > numOfRowsPerTable) { + rowSize = numOfRowsPerTable - rowCnt; + } + //table + for (long tableCnt = startTableInd; tableCnt < endTableInd; ) { + long tableSize = numOfTablesPerSQL; + if (tableCnt + tableSize > endTableInd) { + tableSize = endTableInd - tableCnt; + } + long startTime = this.startTime + rowCnt * timeGap; +// System.out.println(Thread.currentThread().getName() + " >>> " + "rowCnt: " + rowCnt + ", rowSize: " + rowSize + ", " + "tableCnt: " + tableCnt + ",tableSize: " + tableSize + ", " + "startTime: " + startTime + ",timeGap: " + timeGap + ""); + /***********************************************/ + // 生成数据 + List data = SubTableValueGenerator.generate(superTableMeta, prefixOfTable, tableCnt, tableSize, rowSize, startTime, timeGap); + // 乱序 + if (order != 0) + SubTableValueGenerator.disrupt(data, rate, range); + // insert + if (autoCreateTable) + affectRows = insertAutoCreateTable(data); + else + affectRows = insert(data); + /***********************************************/ + tableCnt += tableSize; + } + rowCnt += rowSize; + } + + return affectRows; + } + } + + }