change
This commit is contained in:
parent
acd270836c
commit
c5b53e9700
|
@ -1,153 +0,0 @@
|
|||
package com.taosdata.example.jdbcTaosdemo.domain;
|
||||
|
||||
public final class JdbcTaosdemoConfig {
|
||||
|
||||
//The host to connect to TDengine. Must insert one
|
||||
private String host;
|
||||
//The TCP/IP port number to use for the connection. Default is 6030.
|
||||
private int port = 6030;
|
||||
//The TDengine user name to use when connecting to the server. Default is 'root'
|
||||
private String user = "root";
|
||||
//The password to use when connecting to the server. Default is 'taosdata'
|
||||
private String password = "taosdata";
|
||||
|
||||
//Destination database. Default is 'test'
|
||||
private String dbName = "test";
|
||||
//keep
|
||||
private int keep = 36500;
|
||||
//days
|
||||
private int days = 120;
|
||||
|
||||
//Super table Name. Default is 'meters'
|
||||
private String stbName = "meters";
|
||||
//Table name prefix. Default is 'd'
|
||||
private String tbPrefix = "d";
|
||||
//The number of tables. Default is 10.
|
||||
private int numberOfTable = 10;
|
||||
//The number of records per table. Default is 2
|
||||
private int numberOfRecordsPerTable = 2;
|
||||
//The number of records per request. Default is 100
|
||||
private int numberOfRecordsPerRequest = 100;
|
||||
|
||||
//The number of threads. Default is 1.
|
||||
private int numberOfThreads = 1;
|
||||
//Delete data. Default is false
|
||||
private boolean deleteTable = false;
|
||||
|
||||
public static void printHelp() {
|
||||
System.out.println("Usage: java -jar JdbcTaosDemo.jar [OPTION...]");
|
||||
System.out.println("-h host The host to connect to TDengine. you must input one");
|
||||
System.out.println("-p port The TCP/IP port number to use for the connection. Default is 6030");
|
||||
System.out.println("-u user The TDengine user name to use when connecting to the server. Default is 'root'");
|
||||
System.out.println("-P password The password to use when connecting to the server.Default is 'taosdata'");
|
||||
System.out.println("-d database Destination database. Default is 'test'");
|
||||
System.out.println("-m tablePrefix Table prefix name. Default is 'd'");
|
||||
System.out.println("-t num_of_tables The number of tables. Default is 10");
|
||||
System.out.println("-n num_of_records_per_table The number of records per table. Default is 2");
|
||||
System.out.println("-r num_of_records_per_req The number of records per request. Default is 100");
|
||||
System.out.println("-T num_of_threads The number of threads. Default is 1");
|
||||
System.out.println("-D delete table Delete data methods. Default is false");
|
||||
System.out.println("--help Give this help list");
|
||||
// System.out.println("--infinite infinite insert mode");
|
||||
}
|
||||
|
||||
/**
|
||||
* parse args from command line
|
||||
*
|
||||
* @param args command line args
|
||||
* @return JdbcTaosdemoConfig
|
||||
*/
|
||||
public JdbcTaosdemoConfig(String[] args) {
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
if ("-h".equals(args[i]) && i < args.length - 1) {
|
||||
host = args[++i];
|
||||
}
|
||||
if ("-p".equals(args[i]) && i < args.length - 1) {
|
||||
port = Integer.parseInt(args[++i]);
|
||||
}
|
||||
if ("-u".equals(args[i]) && i < args.length - 1) {
|
||||
user = args[++i];
|
||||
}
|
||||
if ("-P".equals(args[i]) && i < args.length - 1) {
|
||||
password = args[++i];
|
||||
}
|
||||
if ("-d".equals(args[i]) && i < args.length - 1) {
|
||||
dbName = args[++i];
|
||||
}
|
||||
if ("-m".equals(args[i]) && i < args.length - 1) {
|
||||
tbPrefix = args[++i];
|
||||
}
|
||||
if ("-t".equals(args[i]) && i < args.length - 1) {
|
||||
numberOfTable = Integer.parseInt(args[++i]);
|
||||
}
|
||||
if ("-n".equals(args[i]) && i < args.length - 1) {
|
||||
numberOfRecordsPerTable = Integer.parseInt(args[++i]);
|
||||
}
|
||||
if ("-r".equals(args[i]) && i < args.length - 1) {
|
||||
numberOfRecordsPerRequest = Integer.parseInt(args[++i]);
|
||||
}
|
||||
if ("-T".equals(args[i]) && i < args.length - 1) {
|
||||
numberOfThreads = Integer.parseInt(args[++i]);
|
||||
}
|
||||
if ("-D".equals(args[i]) && i < args.length - 1) {
|
||||
deleteTable = Boolean.parseBoolean(args[++i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
public String getUser() {
|
||||
return user;
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
public String getDbName() {
|
||||
return dbName;
|
||||
}
|
||||
|
||||
public int getKeep() {
|
||||
return keep;
|
||||
}
|
||||
|
||||
public int getDays() {
|
||||
return days;
|
||||
}
|
||||
|
||||
public String getStbName() {
|
||||
return stbName;
|
||||
}
|
||||
|
||||
public String getTbPrefix() {
|
||||
return tbPrefix;
|
||||
}
|
||||
|
||||
public int getNumberOfTable() {
|
||||
return numberOfTable;
|
||||
}
|
||||
|
||||
public int getNumberOfRecordsPerTable() {
|
||||
return numberOfRecordsPerTable;
|
||||
}
|
||||
|
||||
public int getNumberOfThreads() {
|
||||
return numberOfThreads;
|
||||
}
|
||||
|
||||
public boolean isDeleteTable() {
|
||||
return deleteTable;
|
||||
}
|
||||
|
||||
public int getNumberOfRecordsPerRequest() {
|
||||
return numberOfRecordsPerRequest;
|
||||
}
|
||||
}
|
|
@ -89,6 +89,12 @@
|
|||
|
||||
<build>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>src/main/resources/lib</directory>
|
||||
<includes>
|
||||
<include>**/*.jar</include>
|
||||
</includes>
|
||||
</resource>
|
||||
<resource>
|
||||
<directory>src/main/resources</directory>
|
||||
<includes>
|
||||
|
@ -97,6 +103,7 @@
|
|||
</includes>
|
||||
<filtering>true</filtering>
|
||||
</resource>
|
||||
|
||||
<resource>
|
||||
<directory>src/main/java</directory>
|
||||
<includes>
|
||||
|
|
|
@ -1,13 +1,12 @@
|
|||
package com.taosdata.taosdemo.components;
|
||||
|
||||
import com.taosdata.taosdemo.domain.FieldMeta;
|
||||
import com.taosdata.taosdemo.domain.SubTableValue;
|
||||
import com.taosdata.taosdemo.domain.SuperTableMeta;
|
||||
import com.taosdata.taosdemo.domain.TagMeta;
|
||||
import com.taosdata.taosdemo.service.DatabaseService;
|
||||
import com.taosdata.taosdemo.service.InsertTask;
|
||||
import com.taosdata.taosdemo.service.SubTableService;
|
||||
import com.taosdata.taosdemo.service.SuperTableService;
|
||||
import com.taosdata.taosdemo.service.data.SubTableValueGenerator;
|
||||
import com.taosdata.taosdemo.service.data.SuperTableMetaGenerator;
|
||||
import com.taosdata.taosdemo.utils.JdbcTaosdemoConfig;
|
||||
import org.apache.log4j.Logger;
|
||||
|
@ -15,25 +14,23 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
|
||||
@Component
|
||||
public class TaosDemoCommandLineRunner implements CommandLineRunner {
|
||||
|
||||
private static Logger logger = Logger.getLogger(TaosDemoCommandLineRunner.class);
|
||||
@Autowired
|
||||
private DatabaseService databaseService;
|
||||
@Autowired
|
||||
private SuperTableService superTableService;
|
||||
@Autowired
|
||||
private SubTableService subTableService;
|
||||
private DataSource dataSource;
|
||||
|
||||
private SuperTableMeta superTableMeta;
|
||||
|
||||
|
@ -42,30 +39,27 @@ public class TaosDemoCommandLineRunner implements CommandLineRunner {
|
|||
// 读配置参数
|
||||
JdbcTaosdemoConfig config = new JdbcTaosdemoConfig(args);
|
||||
boolean isHelp = Arrays.asList(args).contains("--help");
|
||||
if (isHelp) {
|
||||
if (isHelp || config.host == null || config.host.isEmpty()) {
|
||||
JdbcTaosdemoConfig.printHelp();
|
||||
System.exit(0);
|
||||
}
|
||||
|
||||
// 准备数据
|
||||
prepareMetaData(config);
|
||||
// 超级表的meta
|
||||
superTableMeta = createSupertable(config);
|
||||
// 子表的meta
|
||||
// subTableMetaList = SubTableMetaGenerator.generate(superTableMeta, config.numOfTables, config.tablePrefix);
|
||||
dataSource = DataSourceFactory.getInstance(config.host, config.port, config.user, config.password);
|
||||
databaseService = new DatabaseService(dataSource);
|
||||
superTableService = new SuperTableService(dataSource);
|
||||
|
||||
// 创建数据库
|
||||
createDatabaseTask(config);
|
||||
// createDatabaseTask(config);
|
||||
// 超级表的meta
|
||||
superTableMeta = buildSuperTableMeta(config);
|
||||
// 建表
|
||||
createTableTask(config);
|
||||
// createTableTask(config);
|
||||
// 插入
|
||||
insertTask(config);
|
||||
// 查询: 1. 生成查询语句, 2. 执行查询
|
||||
|
||||
// 删除表
|
||||
if (config.dropTable) {
|
||||
superTableService.drop(config.database, config.superTable);
|
||||
}
|
||||
|
||||
System.exit(0);
|
||||
}
|
||||
|
||||
|
@ -92,119 +86,84 @@ public class TaosDemoCommandLineRunner implements CommandLineRunner {
|
|||
if (config.autoCreateTable)
|
||||
return;
|
||||
// 批量建子表
|
||||
subTableService.createSubTable(superTableMeta, config.numOfTables, config.prefixOfTable, config.numOfThreadsForCreate);
|
||||
// subTableService.createSubTable(superTableMeta, config.numOfTables, config.prefixOfTable, config.numOfThreadsForCreate);
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
logger.info(">>> create table time cost : " + (end - start) + " ms.");
|
||||
}
|
||||
|
||||
private void insertTask(JdbcTaosdemoConfig config) {
|
||||
long numOfTables = config.numOfTables;
|
||||
int numOfTablesPerSQL = config.numOfTablesPerSQL;
|
||||
long numOfRowsPerTable = config.numOfRowsPerTable;
|
||||
int numOfValuesPerSQL = config.numOfValuesPerSQL;
|
||||
|
||||
private long getProperStartTime(JdbcTaosdemoConfig config) {
|
||||
Instant now = Instant.now();
|
||||
long earliest = now.minus(Duration.ofDays(config.keep)).toEpochMilli();
|
||||
if (config.startTime == 0 || config.startTime < earliest) {
|
||||
config.startTime = earliest;
|
||||
long startTime = config.startTime;
|
||||
if (startTime == 0 || startTime < earliest) {
|
||||
startTime = earliest;
|
||||
}
|
||||
|
||||
if (numOfRowsPerTable < numOfValuesPerSQL)
|
||||
numOfValuesPerSQL = (int) numOfRowsPerTable;
|
||||
if (numOfTables < numOfTablesPerSQL)
|
||||
numOfTablesPerSQL = (int) numOfTables;
|
||||
|
||||
|
||||
ExecutorService executors = Executors.newFixedThreadPool(config.numOfThreadsForInsert);
|
||||
List<Future<Integer>> futureList = new ArrayList<>();
|
||||
long start = System.currentTimeMillis();
|
||||
long affectRows = 0;
|
||||
|
||||
// row
|
||||
for (long rowCnt = 0; rowCnt < numOfRowsPerTable; ) {
|
||||
long rowSize = numOfValuesPerSQL;
|
||||
if (rowCnt + rowSize > numOfRowsPerTable) {
|
||||
rowSize = numOfRowsPerTable - rowCnt;
|
||||
}
|
||||
|
||||
//table
|
||||
for (long tableCnt = 0; tableCnt < numOfTables; ) {
|
||||
long tableSize = numOfTablesPerSQL;
|
||||
if (tableCnt + tableSize > numOfTables) {
|
||||
tableSize = numOfTables - tableCnt;
|
||||
}
|
||||
/***********************************************/
|
||||
long startTime = config.startTime + rowCnt * config.timeGap;
|
||||
// 生成数据
|
||||
List<SubTableValue> data = SubTableValueGenerator.generate(superTableMeta, config.prefixOfTable, tableCnt, tableSize, rowSize, startTime, config.timeGap);
|
||||
// 乱序
|
||||
if (config.order != 0) {
|
||||
SubTableValueGenerator.disrupt(data, config.rate, config.range);
|
||||
}
|
||||
// insert
|
||||
if (config.autoCreateTable) {
|
||||
Future<Integer> future = executors.submit(() -> subTableService.insertAutoCreateTable(data));
|
||||
try {
|
||||
affectRows += future.get();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
} else {
|
||||
subTableService.insert(data, config.numOfThreadsForInsert, config.frequency);
|
||||
}
|
||||
/***********************************************/
|
||||
tableCnt += tableSize;
|
||||
}
|
||||
rowCnt += rowSize;
|
||||
}
|
||||
executors.shutdown();
|
||||
long end = System.currentTimeMillis();
|
||||
logger.info(">>> insert " + affectRows + " rows with time cost: " + (end - start) + "ms");
|
||||
/*********************************************************************************/
|
||||
// 批量插入,自动建表
|
||||
// dataList.stream().forEach(subTableValues -> {
|
||||
// subTableService.insertAutoCreateTable(subTableValues, config.numOfThreadsForInsert, config.frequency);
|
||||
// });
|
||||
|
||||
// subTableService.insertAutoCreateTable(subTableMetaList, config.numOfTables, config.tablePrefix, config.numOfThreadsForInsert, config.frequency);
|
||||
// } else {
|
||||
// dataList.stream().forEach(subTableValues -> {
|
||||
// subTableService.insert(subTableValues, config.numOfThreadsForInsert, config.frequency);
|
||||
// });
|
||||
|
||||
// subTableService.insert(subTableMetaList, config.numOfTables, config.tablePrefix, config.numOfThreadsForInsert, config.frequency);
|
||||
// }
|
||||
return startTime;
|
||||
}
|
||||
|
||||
private void prepareMetaData(JdbcTaosdemoConfig config) {
|
||||
long start = System.currentTimeMillis();
|
||||
// 超级表的meta
|
||||
superTableMeta = createSupertable(config);
|
||||
// 子表的meta
|
||||
// subTableMetaList = SubTableMetaGenerator.generate(superTableMeta, config.numOfTables, config.prefixOfTable);
|
||||
private void insertTask(JdbcTaosdemoConfig config) {
|
||||
long tableSize = config.numOfTables;
|
||||
int threadSize = config.numOfThreadsForInsert;
|
||||
long startTime = getProperStartTime(config);
|
||||
|
||||
/*
|
||||
// 子表的data
|
||||
subTableValueList = SubTableValueGenerator.generate(subTableMetaList, config.numOfRowsPerTable, config.startTime, config.timeGap);
|
||||
// 如果有乱序,给数据搞乱
|
||||
if (config.order != 0) {
|
||||
SubTableValueGenerator.disrupt(subTableValueList, config.rate, config.range);
|
||||
if (tableSize < threadSize)
|
||||
threadSize = (int) tableSize;
|
||||
long gap = (long) Math.ceil((0.0d + tableSize) / threadSize);
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
List<FutureTask> taskList = new ArrayList<>();
|
||||
List<Thread> threads = IntStream.range(0, threadSize)
|
||||
.mapToObj(i -> {
|
||||
long startInd = i * gap;
|
||||
long endInd = (i + 1) * gap < tableSize ? (i + 1) * gap : tableSize;
|
||||
FutureTask<Integer> task = new FutureTask<>(
|
||||
new InsertTask(superTableMeta,
|
||||
startInd, endInd,
|
||||
startTime, config.timeGap,
|
||||
config.numOfRowsPerTable, config.numOfTablesPerSQL, config.numOfValuesPerSQL,
|
||||
config.order, config.rate, config.range,
|
||||
config.prefixOfTable, config.autoCreateTable, dataSource)
|
||||
);
|
||||
taskList.add(task);
|
||||
return new Thread(task, "InsertThread-" + i);
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
threads.stream().forEach(Thread::start);
|
||||
for (Thread thread : threads) {
|
||||
try {
|
||||
thread.join();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
// 分割数据
|
||||
int numOfTables = config.numOfTables;
|
||||
int numOfTablesPerSQL = config.numOfTablesPerSQL;
|
||||
int numOfRowsPerTable = config.numOfRowsPerTable;
|
||||
int numOfValuesPerSQL = config.numOfValuesPerSQL;
|
||||
dataList = SubTableValueGenerator.split(subTableValueList, numOfTables, numOfTablesPerSQL, numOfRowsPerTable, numOfValuesPerSQL);
|
||||
*/
|
||||
|
||||
int affectedRows = 0;
|
||||
for (FutureTask<Integer> task : taskList) {
|
||||
try {
|
||||
affectedRows += task.get();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
logger.info(">>> prepare meta data time cost : " + (end - start) + " ms.");
|
||||
logger.info("insert " + affectedRows + " rows, time cost: " + (end - start) + " ms");
|
||||
// long numOfTables = config.numOfTables;
|
||||
// int numOfTablesPerSQL = config.numOfTablesPerSQL;
|
||||
// long numOfRowsPerTable = config.numOfRowsPerTable;
|
||||
// int numOfValuesPerSQL = config.numOfValuesPerSQL;
|
||||
|
||||
// long start = System.currentTimeMillis();
|
||||
// long affectRows = 0;
|
||||
// long end = System.currentTimeMillis();
|
||||
// logger.info(">>> insert " + affectRows + " rows with time cost: " + (end - start) + "ms");
|
||||
}
|
||||
|
||||
private SuperTableMeta createSupertable(JdbcTaosdemoConfig config) {
|
||||
private SuperTableMeta buildSuperTableMeta(JdbcTaosdemoConfig config) {
|
||||
SuperTableMeta tableMeta;
|
||||
// create super table
|
||||
if (config.superTableSQL != null) {
|
||||
|
|
|
@ -3,7 +3,6 @@ package com.taosdata.taosdemo.mapper.impl;
|
|||
import com.taosdata.taosdemo.domain.SubTableMeta;
|
||||
import com.taosdata.taosdemo.domain.SubTableValue;
|
||||
import com.taosdata.taosdemo.mapper.SubTableMapper;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import java.util.List;
|
||||
|
|
|
@ -1,9 +1,14 @@
|
|||
package com.taosdata.taosdemo.service;
|
||||
|
||||
import com.taosdata.taosdemo.mapper.DatabaseMapper;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.Map;
|
||||
|
||||
@Service
|
||||
|
@ -12,9 +17,26 @@ public class DatabaseService {
|
|||
@Autowired
|
||||
private DatabaseMapper databaseMapper;
|
||||
|
||||
private DataSource dataSource;
|
||||
private static Logger logger = Logger.getLogger(DatabaseService.class);
|
||||
|
||||
public DatabaseService(DataSource dataSource) {
|
||||
this.dataSource = dataSource;
|
||||
}
|
||||
|
||||
// 建库,指定 name
|
||||
public int createDatabase(String database) {
|
||||
return databaseMapper.createDatabase(database);
|
||||
try {
|
||||
Connection connection = dataSource.getConnection();
|
||||
Statement statement = connection.createStatement();
|
||||
statement.execute("create database " + database);
|
||||
statement.close();
|
||||
connection.close();
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return 0;
|
||||
// return databaseMapper.createDatabase(database);
|
||||
}
|
||||
|
||||
// 建库,指定参数 keep,days,replica等
|
||||
|
@ -22,17 +44,56 @@ public class DatabaseService {
|
|||
if (map.isEmpty())
|
||||
return 0;
|
||||
if (map.containsKey("database") && map.size() == 1)
|
||||
return databaseMapper.createDatabase(map.get("database"));
|
||||
return databaseMapper.createDatabaseWithParameters(map);
|
||||
createDatabase(map.get("database"));
|
||||
// return databaseMapper.createDatabase(map.get("database"));
|
||||
// return databaseMapper.createDatabaseWithParameters(map);
|
||||
try {
|
||||
Connection connection = dataSource.getConnection();
|
||||
Statement statement = connection.createStatement();
|
||||
String sql = "create database if not exists " + map.get("database")
|
||||
+ " keep " + map.get("keep")
|
||||
+ " days " + map.get("days")
|
||||
+ " replica " + map.get("replica");
|
||||
logger.info(">>> " + sql);
|
||||
statement.execute(sql);
|
||||
statement.close();
|
||||
connection.close();
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// drop database
|
||||
public int dropDatabase(String dbname) {
|
||||
return databaseMapper.dropDatabase(dbname);
|
||||
try {
|
||||
Connection connection = dataSource.getConnection();
|
||||
Statement statement = connection.createStatement();
|
||||
String sql = "drop database if exists " + dbname;
|
||||
logger.info(">>> " + sql);
|
||||
statement.execute(sql);
|
||||
statement.close();
|
||||
connection.close();
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// use database
|
||||
public int useDatabase(String dbname) {
|
||||
return databaseMapper.useDatabase(dbname);
|
||||
try {
|
||||
Connection connection = dataSource.getConnection();
|
||||
Statement statement = connection.createStatement();
|
||||
String sql = "use " + dbname;
|
||||
logger.info(">>> " + sql);
|
||||
statement.execute(sql);
|
||||
statement.close();
|
||||
connection.close();
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return 0;
|
||||
// return databaseMapper.useDatabase(dbname);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
package com.taosdata.taosdemo.service;
|
||||
|
||||
import com.taosdata.taosdemo.domain.SubTableValue;
|
||||
import com.taosdata.taosdemo.domain.SuperTableMeta;
|
||||
import com.taosdata.taosdemo.service.data.SubTableValueGenerator;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.Connection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
public class InsertTask implements Callable<Integer> {
|
||||
|
||||
private final long startTableInd; // included
|
||||
private final long endTableInd; // excluded
|
||||
private final long startTime;
|
||||
private final long timeGap;
|
||||
private final long numOfRowsPerTable;
|
||||
private long numOfTablesPerSQL;
|
||||
private long numOfValuesPerSQL;
|
||||
private final SuperTableMeta superTableMeta;
|
||||
private final int order;
|
||||
private final int rate;
|
||||
private final long range;
|
||||
private final String prefixOfTable;
|
||||
private final boolean autoCreateTable;
|
||||
private final DataSource dataSource;
|
||||
|
||||
public InsertTask(SuperTableMeta superTableMeta, long startTableInd, long endTableInd,
|
||||
long startTime, long timeGap,
|
||||
long numOfRowsPerTable, long numOfTablesPerSQL, long numOfValuesPerSQL,
|
||||
int order, int rate, long range,
|
||||
String prefixOfTable, boolean autoCreateTable, DataSource dataSource) {
|
||||
this.superTableMeta = superTableMeta;
|
||||
this.startTableInd = startTableInd;
|
||||
this.endTableInd = endTableInd;
|
||||
this.startTime = startTime;
|
||||
this.timeGap = timeGap;
|
||||
this.numOfRowsPerTable = numOfRowsPerTable;
|
||||
this.numOfTablesPerSQL = numOfTablesPerSQL;
|
||||
this.numOfValuesPerSQL = numOfValuesPerSQL;
|
||||
this.order = order;
|
||||
this.rate = rate;
|
||||
this.range = range;
|
||||
this.prefixOfTable = prefixOfTable;
|
||||
this.autoCreateTable = autoCreateTable;
|
||||
this.dataSource = dataSource;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
|
||||
Connection connection = dataSource.getConnection();
|
||||
|
||||
long numOfTables = endTableInd - startTableInd;
|
||||
if (numOfRowsPerTable < numOfValuesPerSQL)
|
||||
numOfValuesPerSQL = (int) numOfRowsPerTable;
|
||||
if (numOfTables < numOfTablesPerSQL)
|
||||
numOfTablesPerSQL = (int) numOfTables;
|
||||
|
||||
// row
|
||||
for (long rowCnt = 0; rowCnt < numOfRowsPerTable; ) {
|
||||
long rowSize = numOfValuesPerSQL;
|
||||
if (rowCnt + rowSize > numOfRowsPerTable) {
|
||||
rowSize = numOfRowsPerTable - rowCnt;
|
||||
}
|
||||
//table
|
||||
for (long tableCnt = startTableInd; tableCnt < endTableInd; ) {
|
||||
long tableSize = numOfTablesPerSQL;
|
||||
if (tableCnt + tableSize > endTableInd) {
|
||||
tableSize = endTableInd - tableCnt;
|
||||
}
|
||||
long startTime = this.startTime + rowCnt * timeGap;
|
||||
|
||||
// System.out.println(Thread.currentThread().getName() + " >>> " + "rowCnt: " + rowCnt + ", rowSize: " + rowSize + ", " + "tableCnt: " + tableCnt + ",tableSize: " + tableSize + ", " + "startTime: " + startTime + ",timeGap: " + timeGap + "");
|
||||
/***********************************************/
|
||||
// 生成数据
|
||||
List<SubTableValue> data = SubTableValueGenerator.generate(superTableMeta, prefixOfTable, tableCnt, tableSize, rowSize, startTime, timeGap);
|
||||
// 乱序
|
||||
if (order != 0) {
|
||||
SubTableValueGenerator.disrupt(data, rate, range);
|
||||
}
|
||||
// insert
|
||||
SubTableService subTableService = new SubTableService(connection);
|
||||
if (autoCreateTable) {
|
||||
subTableService.insertAutoCreateTable(data);
|
||||
} else {
|
||||
subTableService.insert(data);
|
||||
}
|
||||
/***********************************************/
|
||||
tableCnt += tableSize;
|
||||
}
|
||||
rowCnt += rowSize;
|
||||
}
|
||||
|
||||
connection.close();
|
||||
return 1;
|
||||
}
|
||||
|
||||
}
|
|
@ -30,6 +30,15 @@ public class SubTableService extends AbstractService {
|
|||
@Autowired
|
||||
private SubTableMapper mapper;
|
||||
|
||||
private Connection connection;
|
||||
|
||||
public SubTableService() {
|
||||
}
|
||||
|
||||
public SubTableService(Connection connection) {
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* 1. 选择database,找到所有supertable
|
||||
* 2. 选择supertable,可以拿到表结构,包括field和tag
|
||||
|
@ -113,8 +122,6 @@ public class SubTableService extends AbstractService {
|
|||
return mapper.insertOneTableMultiValuesUsingSuperTable(subTableValue);
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private SqlSessionFactory sqlSessionFactory;
|
||||
@Autowired
|
||||
private DataSource dataSource;
|
||||
|
||||
|
@ -123,13 +130,11 @@ public class SubTableService extends AbstractService {
|
|||
|
||||
int affectRows = 0;
|
||||
try {
|
||||
Connection connection = dataSource.getConnection();
|
||||
String sql = sql(subTableValues);
|
||||
// logger.info(">>> SQL : " + sql);
|
||||
logger.info(">>> SQL : " + sql);
|
||||
Statement statement = connection.createStatement();
|
||||
affectRows = statement.executeUpdate(sql);
|
||||
statement.close();
|
||||
connection.close();
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
@ -142,7 +147,7 @@ public class SubTableService extends AbstractService {
|
|||
sb.append("insert into ");
|
||||
for (int i = 0; i < subTableValues.size(); i++) {
|
||||
SubTableValue subTableValue = subTableValues.get(i);
|
||||
sb.append(subTableValue.getDatabase() + "." + subTableValue.getName() + " using " + subTableValue.getSupertable() + " tags (");
|
||||
sb.append(subTableValue.getDatabase() + "." + subTableValue.getName() + " using " + subTableValue.getDatabase() + "." + subTableValue.getSupertable() + " tags (");
|
||||
for (int j = 0; j < subTableValue.getTags().size(); j++) {
|
||||
TagValue tagValue = subTableValue.getTags().get(j);
|
||||
if (j == 0)
|
||||
|
|
|
@ -1,19 +1,74 @@
|
|||
package com.taosdata.taosdemo.service;
|
||||
|
||||
import com.taosdata.taosdemo.domain.FieldMeta;
|
||||
import com.taosdata.taosdemo.domain.SuperTableMeta;
|
||||
import com.taosdata.taosdemo.domain.TagMeta;
|
||||
import com.taosdata.taosdemo.mapper.SuperTableMapper;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.List;
|
||||
|
||||
@Service
|
||||
public class SuperTableService {
|
||||
|
||||
private static Logger logger = Logger.getLogger(SuperTableService.class);
|
||||
@Autowired
|
||||
private SuperTableMapper superTableMapper;
|
||||
|
||||
private DataSource dataSource;
|
||||
|
||||
public SuperTableService(DataSource dataSource) {
|
||||
this.dataSource = dataSource;
|
||||
}
|
||||
|
||||
// 创建超级表,指定每个field的名称和类型,每个tag的名称和类型
|
||||
public int create(SuperTableMeta superTableMeta) {
|
||||
return superTableMapper.createSuperTable(superTableMeta);
|
||||
int result = 0;
|
||||
try {
|
||||
Connection connection = dataSource.getConnection();
|
||||
Statement statement = connection.createStatement();
|
||||
String sql = sql(superTableMeta);
|
||||
logger.info(">>> " + sql);
|
||||
result = statement.executeUpdate(sql);
|
||||
statement.close();
|
||||
connection.close();
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return result;
|
||||
// return superTableMapper.createSuperTable(superTableMeta);
|
||||
}
|
||||
|
||||
private String sql(SuperTableMeta superTableMeta) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("create table " + superTableMeta.getDatabase() + "." + superTableMeta.getName() + "(");
|
||||
List<FieldMeta> fields = superTableMeta.getFields();
|
||||
for (int i = 0; i < fields.size(); i++) {
|
||||
FieldMeta fieldMeta = fields.get(i);
|
||||
if (i == 0) {
|
||||
sb.append(fieldMeta.getName() + " " + fieldMeta.getType());
|
||||
} else {
|
||||
sb.append(", " + fieldMeta.getName() + " " + fieldMeta.getType());
|
||||
}
|
||||
}
|
||||
sb.append(") tags(");
|
||||
List<TagMeta> tags = superTableMeta.getTags();
|
||||
for (int i = 0; i < tags.size(); i++) {
|
||||
TagMeta tagMeta = tags.get(i);
|
||||
if (i == 0) {
|
||||
sb.append(tagMeta.getName() + " " + tagMeta.getType());
|
||||
} else {
|
||||
sb.append(", " + tagMeta.getName() + " " + tagMeta.getType() + "");
|
||||
}
|
||||
}
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public void drop(String database, String name) {
|
||||
|
|
|
@ -23,10 +23,10 @@ public final class JdbcTaosdemoConfig {
|
|||
public String prefixOfTable = "t";
|
||||
// insert task
|
||||
public boolean autoCreateTable = true;
|
||||
public long numOfTables = 100;
|
||||
public long numOfRowsPerTable = 100;
|
||||
public int numOfTablesPerSQL = 10;
|
||||
public int numOfValuesPerSQL = 10;
|
||||
public long numOfTables = 10;
|
||||
public long numOfRowsPerTable = 10;
|
||||
public int numOfTablesPerSQL = 2;
|
||||
public int numOfValuesPerSQL = 2;
|
||||
public int numOfThreadsForCreate = 1;
|
||||
public int numOfThreadsForInsert = 1;
|
||||
public long startTime;
|
||||
|
|
|
@ -10,9 +10,8 @@ spring.datasource.password=taosdata
|
|||
#spring.datasource.driver-class-name=com.taosdata.jdbc.rs.RestfulDriver
|
||||
#spring.datasource.username=root
|
||||
#spring.datasource.password=taosdata
|
||||
spring.datasource.hikari.maximum-pool-size=500
|
||||
spring.datasource.hikari.minimum-idle=500
|
||||
spring.datasource.hikari.maximum-pool-size=1
|
||||
spring.datasource.hikari.minimum-idle=1
|
||||
spring.datasource.hikari.max-lifetime=0
|
||||
logging.level.com.taosdata.taosdemo.mapper=error
|
||||
|
||||
server.port=8888
|
|
@ -0,0 +1,119 @@
|
|||
{
|
||||
"filetype": "insert",
|
||||
"cfgdir": "/etc/taos",
|
||||
"host": "127.0.0.1",
|
||||
"port": 6030,
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"thread_count": 2,
|
||||
"databases": [
|
||||
{
|
||||
"dbinfo": {
|
||||
"name": "db04",
|
||||
"drop": "no",
|
||||
"replica": 1,
|
||||
"days": 2,
|
||||
"cache": 16,
|
||||
"blocks": 8,
|
||||
"precision": "ms",
|
||||
"keep": 365,
|
||||
"minRows": 100,
|
||||
"maxRows": 4096,
|
||||
"comp": 2,
|
||||
"walLevel": 1,
|
||||
"quorum": 1,
|
||||
"fsync": 3000,
|
||||
"update": 0
|
||||
},
|
||||
"super_tables": [
|
||||
{
|
||||
"name": "stb04",
|
||||
"child_table_exists": "no",
|
||||
"childtable_count": 10,
|
||||
"childtable_prefix": "stb01_",
|
||||
"auto_create_table": "no",
|
||||
"data_source": "rand",
|
||||
"insert_mode": "taosc",
|
||||
"insert_rate": 0,
|
||||
"insert_rows": 100,
|
||||
"multi_thread_write_one_tbl": "no",
|
||||
"number_of_tbl_in_one_sql": 0,
|
||||
"rows_per_tbl": 3,
|
||||
"max_sql_len": 1024,
|
||||
"disorder_ratio": 0,
|
||||
"disorder_range": 1000,
|
||||
"timestamp_step": 10,
|
||||
"start_timestamp": "2020-10-01 00:00:00.000",
|
||||
"sample_format": "csv",
|
||||
"sample_file": "./sample.csv",
|
||||
"tags_file": "",
|
||||
"columns": [
|
||||
{
|
||||
"type": "TINYINT"
|
||||
},
|
||||
{
|
||||
"type": "SMALLINT"
|
||||
},
|
||||
{
|
||||
"type": "INT"
|
||||
},
|
||||
{
|
||||
"type": "BIGINT"
|
||||
},
|
||||
{
|
||||
"type": "BOOL"
|
||||
},
|
||||
{
|
||||
"type": "FLOAT"
|
||||
},
|
||||
{
|
||||
"type": "DOUBLE"
|
||||
},
|
||||
{
|
||||
"type": "TIMESTAMP"
|
||||
},
|
||||
{
|
||||
"type": "BINARY",
|
||||
"len": 16
|
||||
},
|
||||
{
|
||||
"type": "NCHAR",
|
||||
"len": 4
|
||||
}
|
||||
],
|
||||
"tags": [
|
||||
{
|
||||
"type": "TINYINT"
|
||||
},
|
||||
{
|
||||
"type": "SMALLINT"
|
||||
},
|
||||
{
|
||||
"type": "INT"
|
||||
},
|
||||
{
|
||||
"type": "BIGINT"
|
||||
},
|
||||
{
|
||||
"type": "BOOL"
|
||||
},
|
||||
{
|
||||
"type": "FLOAT"
|
||||
},
|
||||
{
|
||||
"type": "DOUBLE"
|
||||
},
|
||||
{
|
||||
"type": "BINARY",
|
||||
"len": 16
|
||||
},
|
||||
{
|
||||
"type": "NCHAR",
|
||||
"len": 4
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
|
@ -1,5 +1,5 @@
|
|||
### 设置###
|
||||
log4j.rootLogger=debug,stdout,DebugLog,ErrorLog
|
||||
log4j.rootLogger=debug,stdout
|
||||
### 输出信息到控制抬 ###
|
||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.Target=System.out
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
{
|
||||
"filetype":"query",
|
||||
"cfgdir": "/etc/taos",
|
||||
"host": "127.0.0.1",
|
||||
"port": 6030,
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"databases": "db01",
|
||||
"super_table_query":
|
||||
{"rate":1, "concurrent":1,
|
||||
"sqls": [{"sql": "select count(*) from stb01", "result": "./query_res0.txt"}]
|
||||
},
|
||||
"sub_table_query":
|
||||
{"stblname": "stb01", "rate":1, "threads":1,
|
||||
"sqls": [{"sql": "select count(*) from xxxx", "result": "./query_res1.txt"}]
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue