@@ -7,7 +7,6 @@ import (
 	"encoding/json"
 	"flag"
 	"fmt"
-	"hash/crc32"
 	"io"
 	"log"
 	"os"
@@ -17,47 +16,55 @@ import (
 	"sync"
 	"time"

-	dataimport "github.com/taosdata/TDengine/importSampleData/import"
+	dataImport "github.com/taosdata/TDengine/importSampleData/import"

 	_ "github.com/taosdata/driver-go/taosSql"
 )

 const (
-	TIMESTAMP               = "timestamp"
-	DATETIME                = "datetime"
-	MILLISECOND             = "millisecond"
-	DEFAULT_STARTTIME int64 = -1
-	DEFAULT_INTERVAL  int64 = 1 * 1000
-	DEFAULT_DELAY     int64 = -1
-	DEFAULT_STATISTIC_TABLE = "statistic"
-	JSON_FORMAT             = "json"
-	CSV_FORMAT              = "csv"
-	SUPERTABLE_PREFIX       = "s_"
-	SUBTABLE_PREFIX         = "t_"
-	DRIVER_NAME             = "taosSql"
-	STARTTIME_LAYOUT        = "2006-01-02 15:04:05.000"
-	INSERT_PREFIX           = "insert into "
+	// the primary key type must be timestamp
+	TIMESTAMP = "timestamp"
+	// whether the primary key time field in the sample data is in millisecond or datetime format
+	DATETIME    = "datetime"
+	MILLISECOND = "millisecond"
+	DefaultStartTime int64 = -1
+	DefaultInterval  int64 = 1 * 1000 // interval between imported records; only takes effect when auto=1, otherwise the interval is computed from the sample data. Unit: millisecond, default 1000.
+	DefaultDelay     int64 = -1
+	// table that stores the statistics when save is 1, default statistic.
+	DefaultStatisticTable = "statistic"
+	// sample data file format, either json or csv
+	JsonFormat = "json"
+	CsvFormat  = "csv"
+	SuperTablePrefix = "s_" // super table name prefix
+	SubTablePrefix   = "t_" // sub table name prefix
+	DriverName      = "taosSql"
+	StartTimeLayout = "2006-01-02 15:04:05.000"
+	InsertPrefix    = "insert into "
 )
 var (
-	cfg          string
-	cases        string
-	hnum         int
-	vnum         int
-	thread       int
-	batch        int
-	auto         int
-	starttimestr string
-	interval     int64
-	host         string
-	port         int
-	user         string
-	password     string
-	dropdb       int
-	db           string
-	dbparam      string
+	cfg          string // path of the import config file, which describes the sample data files and the target TDengine settings; default config/cfg.toml
+	cases        string // names of the use cases to import, listed under [usecase] in the -cfg file; multiple cases are separated by commas, e.g. sensor_info,camera_detection; default sensor_info
+	hnum         int    // horizontal scale factor of the sample data; if the sample contains one sub table t_0 and hnum is 2, sub tables t_0 and t_1 are created from it; default 100
+	vnum         int    // number of times the sample data is replicated along the time axis; 0 means import the history up to now and then keep importing at the configured interval; default 1000
+	thread       int    // number of threads used for importing; default 10
+	batch        int    // batch size of one write, i.e. how many records per insert; default 100
+	auto         int    // whether to auto-generate the primary key timestamps of the sample data; 1 yes, 0 no; default 0
+	startTimeStr string // start time of the imported records, in the format "yyyy-MM-dd HH:mm:ss.SSS"; if unset, the smallest timestamp in the sample data is used; if set, the sample's primary key timestamps are ignored and import starts at the given time; required when auto is 1; default empty
+	interval     int64  // interval between imported records; only takes effect when auto=1, otherwise computed from the sample data; unit millisecond, default 1000
+	host         string // IP of the target TDengine server, default 127.0.0.1
+	port         int    // port of the target TDengine server, default 6030
+	user         string // TDengine user name, default root
+	password     string // TDengine password, default taosdata
+	dropdb       int    // whether to drop the database before importing; 1 yes, 0 no; default 0
+	db           string // name of the target TDengine database, default test_yyyyMMdd
+	dbparam      string // optional parameters applied when the database is created because it does not exist, e.g. days 10 cache 16000 ablocks 4; default empty
 	dataSourceName string
 	startTime      int64
@@ -72,10 +79,10 @@ var (
 	lastStaticTime time.Time
 	lastTotalRows  int64
 	timeTicker     *time.Ticker
-	delay     int64 // default 10 milliseconds
-	tick      int64
-	save      int
-	saveTable string
+	delay     int64  // interval for continuous import when vnum is 0; defaults to half of the smallest record interval across all cases, unit ms.
+	tick      int64  // interval for printing statistics, default 2000 ms.
+	save      int    // whether to save statistics into the statistic table in TDengine; 1 yes, 0 no; default 0.
+	saveTable string // table that stores the statistics when save is 1, default statistic.
 )

 type superTableConfig struct {
@@ -83,7 +90,7 @@ type superTableConfig struct {
 	endTime     int64
 	cycleTime   int64
 	avgInterval int64
-	config dataimport.CaseConfig
+	config dataImport.CaseConfig
 }

 type scaleTableInfo struct {
@@ -92,14 +99,14 @@ type scaleTableInfo struct {
 	insertRows int64
 }

-type tableRows struct {
-	tableName string // tableName
-	value     string // values(...)
-}
+//type tableRows struct {
+//	tableName string // tableName
+//	value     string // values(...)
+//}

 type dataRows struct {
 	rows   []map[string]interface{}
-	config dataimport.CaseConfig
+	config dataImport.CaseConfig
 }

 func (rows dataRows) Len() int {
@@ -107,9 +114,9 @@ func (rows dataRows) Len() int {
 }

 func (rows dataRows) Less(i, j int) bool {
-	itime := getPrimaryKey(rows.rows[i][rows.config.Timestamp])
-	jtime := getPrimaryKey(rows.rows[j][rows.config.Timestamp])
-	return itime < jtime
+	iTime := getPrimaryKey(rows.rows[i][rows.config.Timestamp])
+	jTime := getPrimaryKey(rows.rows[j][rows.config.Timestamp])
+	return iTime < jTime
 }

 func (rows dataRows) Swap(i, j int) {
@@ -123,26 +130,26 @@ func getPrimaryKey(value interface{}) int64 {
 }

 func init() {
 	parseArg() // parse argument

 	if db == "" {
-		// db = "go"
+		// name of the target TDengine database, default test_yyyyMMdd
 		db = fmt.Sprintf("test_%s", time.Now().Format("20060102"))
 	}

-	if auto == 1 && len(starttimestr) == 0 {
+	if auto == 1 && len(startTimeStr) == 0 {
 		log.Fatalf("startTime must be set when auto is 1, the format is \"yyyy-MM-dd HH:mm:ss.SSS\" ")
 	}

-	if len(starttimestr) != 0 {
-		t, err := time.ParseInLocation(STARTTIME_LAYOUT, strings.TrimSpace(starttimestr), time.Local)
+	if len(startTimeStr) != 0 {
+		t, err := time.ParseInLocation(StartTimeLayout, strings.TrimSpace(startTimeStr), time.Local)
 		if err != nil {
-			log.Fatalf("param startTime %s error, %s\n", starttimestr, err)
+			log.Fatalf("param startTime %s error, %s\n", startTimeStr, err)
 		}
 		startTime = t.UnixNano() / 1e6 // as millisecond
 	} else {
-		startTime = DEFAULT_STARTTIME
+		startTime = DefaultStartTime
 	}

 	dataSourceName = fmt.Sprintf("%s:%s@/tcp(%s:%d)/", user, password, host, port)
@@ -154,9 +161,9 @@ func init() {

 func main() {
-	importConfig := dataimport.LoadConfig(cfg)
-	var caseMinumInterval int64 = -1
+	importConfig := dataImport.LoadConfig(cfg)
+	var caseMinInterval int64 = -1
 	for _, userCase := range strings.Split(cases, ",") {
 		caseConfig, ok := importConfig.UserCases[userCase]
@@ -168,7 +175,7 @@ func main() {
 		checkUserCaseConfig(userCase, &caseConfig)

 		// read file as map array
 		fileRows := readFile(caseConfig)
 		log.Printf("case [%s] sample data file contains %d rows.\n", userCase, len(fileRows.rows))
@@ -177,31 +184,31 @@ func main() {
 			continue
 		}

-		_, exists := superTableConfigMap[caseConfig.Stname]
+		_, exists := superTableConfigMap[caseConfig.StName]
 		if !exists {
-			superTableConfigMap[caseConfig.Stname] = &superTableConfig{config: caseConfig}
+			superTableConfigMap[caseConfig.StName] = &superTableConfig{config: caseConfig}
 		} else {
-			log.Fatalf("the stname of case %s already exist.\n", caseConfig.Stname)
+			log.Fatalf("the stname of case %s already exists.\n", caseConfig.StName)
 		}

 		var start, cycleTime, avgInterval int64 = getSuperTableTimeConfig(fileRows)

 		// set super table's startTime, cycleTime and avgInterval
-		superTableConfigMap[caseConfig.Stname].startTime = start
-		superTableConfigMap[caseConfig.Stname].avgInterval = avgInterval
-		superTableConfigMap[caseConfig.Stname].cycleTime = cycleTime
+		superTableConfigMap[caseConfig.StName].startTime = start
+		superTableConfigMap[caseConfig.StName].cycleTime = cycleTime
+		superTableConfigMap[caseConfig.StName].avgInterval = avgInterval

-		if caseMinumInterval == -1 || caseMinumInterval > avgInterval {
-			caseMinumInterval = avgInterval
+		if caseMinInterval == -1 || caseMinInterval > avgInterval {
+			caseMinInterval = avgInterval
 		}

-		startStr := time.Unix(0, start*int64(time.Millisecond)).Format(STARTTIME_LAYOUT)
+		startStr := time.Unix(0, start*int64(time.Millisecond)).Format(StartTimeLayout)
 		log.Printf("case [%s] startTime %s(%d), average dataInterval %d ms, cycleTime %d ms.\n", userCase, startStr, start, avgInterval, cycleTime)
 	}

-	if DEFAULT_DELAY == delay {
+	if DefaultDelay == delay {
 		// default delay
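+		// half the smallest per-case average interval, floored at 1 ms just below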
-		delay = caseMinumInterval / 2
+		delay = caseMinInterval / 2
 		if delay < 1 {
 			delay = 1
 		}
@@ -218,7 +225,7 @@ func main() {
 	createSuperTable(superTableConfigMap)
 	log.Printf("create %d superTable ,used %d ms.\n", superTableNum, time.Since(start)/1e6)

 	// create sub table
 	start = time.Now()
 	createSubTable(subTableMap)
 	log.Printf("create %d times of %d subtable ,all %d tables, used %d ms.\n", hnum, len(subTableMap), len(scaleTableMap), time.Since(start)/1e6)
@@ -278,7 +285,7 @@ func staticSpeed() {
 	defer connection.Close()

 	if save == 1 {
-		connection.Exec("use " + db)
+		_, _ = connection.Exec("use " + db)
 		_, err := connection.Exec("create table if not exists " + saveTable + "(ts timestamp, speed int)")
 		if err != nil {
 			log.Fatalf("create %s Table error: %s\n", saveTable, err)
@@ -294,12 +301,12 @@ func staticSpeed() {
 		total := getTotalRows(successRows)
 		currentSuccessRows := total - lastTotalRows
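+		// usedTime is in nanoseconds, so rows * 1e9 / usedTime gives rows per second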
-		speed := currentSuccessRows * 1e9 / int64(usedTime)
+		speed := currentSuccessRows * 1e9 / usedTime
 		log.Printf("insert %d rows, used %d ms, speed %d rows/s", currentSuccessRows, usedTime/1e6, speed)

 		if save == 1 {
 			insertSql := fmt.Sprintf("insert into %s values(%d, %d)", saveTable, currentTime.UnixNano()/1e6, speed)
-			connection.Exec(insertSql)
+			_, _ = connection.Exec(insertSql)
 		}

 		lastStaticTime = currentTime
@@ -327,12 +334,13 @@ func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval int64) {
 	} else {
 		// use the sample data primary timestamp
-		sort.Sort(fileRows) // sort the file data by the primary key
+		sort.Sort(fileRows) // sort the file data by the primaryKey
 		minTime := getPrimaryKey(fileRows.rows[0][fileRows.config.Timestamp])
 		maxTime := getPrimaryKey(fileRows.rows[len(fileRows.rows)-1][fileRows.config.Timestamp])

 		start = minTime // default startTime use the minTime
+		// if a start time was specified, use it instead
-		if DEFAULT_STARTTIME != startTime {
+		if DefaultStartTime != startTime {
 			start = startTime
 		}
@@ -350,31 +358,21 @@ func getSuperTableTimeConfig(fileRows dataRows) (start, cycleTime, avgInterval int64) {
 	return
 }

-func createStatisticTable() {
-	connection := getConnection()
-	defer connection.Close()
-
-	_, err := connection.Exec("create table if not exist " + db + "." + saveTable + "(ts timestamp, speed int)")
-	if err != nil {
-		log.Fatalf("createStatisticTable error: %s\n", err)
-	}
-}
-
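+// createSubTable creates hnum scaled sub tables for every sample sub table, using the first sample row of each sub table as its tag values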
 func createSubTable(subTableMaps map[string]*dataRows) {
 	connection := getConnection()
 	defer connection.Close()

-	connection.Exec("use " + db)
+	_, _ = connection.Exec("use " + db)

 	createTablePrefix := "create table if not exists "
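+	// a single reusable buffer for the CREATE TABLE statements; it is Reset() after every Exec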
+	var buffer bytes.Buffer
 	for subTableName := range subTableMaps {

-		superTableName := getSuperTableName(subTableMaps[subTableName].config.Stname)
-		tagValues := subTableMaps[subTableName].rows[0] // the first rows values as tags
-		buffers := bytes.Buffer{}
+		superTableName := getSuperTableName(subTableMaps[subTableName].config.StName)
+		firstRowValues := subTableMaps[subTableName].rows[0] // the first row's values as tags

 		// create table t using superTable tags(...);
 		for i := 0; i < hnum; i++ {
 			tableName := getScaleSubTableName(subTableName, i)
@@ -384,21 +382,21 @@ func createSubTable(subTableMaps map[string]*dataRows) {
 			}
 			scaleTableNames = append(scaleTableNames, tableName)

-			buffers.WriteString(createTablePrefix)
-			buffers.WriteString(tableName)
-			buffers.WriteString(" using ")
-			buffers.WriteString(superTableName)
-			buffers.WriteString(" tags(")
+			buffer.WriteString(createTablePrefix)
+			buffer.WriteString(tableName)
+			buffer.WriteString(" using ")
+			buffer.WriteString(superTableName)
+			buffer.WriteString(" tags(")
 			for _, tag := range subTableMaps[subTableName].config.Tags {
-				tagValue := fmt.Sprintf("%v", tagValues[strings.ToLower(tag.Name)])
-				buffers.WriteString("'" + tagValue + "'")
-				buffers.WriteString(",")
+				tagValue := fmt.Sprintf("%v", firstRowValues[strings.ToLower(tag.Name)])
+				buffer.WriteString("'" + tagValue + "'")
+				buffer.WriteString(",")
 			}
-			buffers.Truncate(buffers.Len() - 1)
-			buffers.WriteString(")")
+			buffer.Truncate(buffer.Len() - 1)
+			buffer.WriteString(")")

-			createTableSql := buffers.String()
-			buffers.Reset()
+			createTableSql := buffer.String()
+			buffer.Reset()

 			//log.Printf("create table: %s\n", createTableSql)
 			_, err := connection.Exec(createTableSql)
@@ -420,7 +418,7 @@ func createSuperTable(superTableConfigMap map[string]*superTableConfig) {
 		if err != nil {
 			log.Fatalf("drop database error: %s\n", err)
 		}
-		log.Printf("drop Db: %s\n", dropDbSql)
+		log.Printf("drop db: %s\n", dropDbSql)
 	}

 	createDbSql := "create database if not exists " + db + " " + dbparam
@@ -431,7 +429,7 @@ func createSuperTable(superTableConfigMap map[string]*superTableConfig) {
 	}
 	log.Printf("createDb: %s\n", createDbSql)

-	connection.Exec("use " + db)
+	_, _ = connection.Exec("use " + db)

 	prefix := "create table if not exists "
 	var buffer bytes.Buffer
@@ -464,7 +462,7 @@ func createSuperTable(superTableConfigMap map[string]*superTableConfig) {
 		createSql := buffer.String()
 		buffer.Reset()

-		//log.Printf("super table: %s\n", createSql)
+		//log.Printf("superTable: %s\n", createSql)
 		_, err = connection.Exec(createSql)
 		if err != nil {
 			log.Fatalf("create supertable error: %s\n", err)
@@ -473,15 +471,15 @@ func createSuperTable(superTableConfigMap map[string]*superTableConfig) {
 	}
 }

-func getScaleSubTableName(subTableName string, hnum int) string {
-	if hnum == 0 {
+func getScaleSubTableName(subTableName string, hNum int) string {
+	if hNum == 0 {
 		return subTableName
 	}
-	return fmt.Sprintf("%s_%d", subTableName, hnum)
+	return fmt.Sprintf("%s_%d", subTableName, hNum)
 }

-func getSuperTableName(stname string) string {
-	return SUPERTABLE_PREFIX + stname
+func getSuperTableName(stName string) string {
+	return SuperTablePrefix + stName
 }

 /**
@@ -499,7 +497,7 @@ func normalizationData(fileRows dataRows, minTime int64) int64 {
 		row[fileRows.config.Timestamp] = getPrimaryKey(row[fileRows.config.Timestamp]) - minTime

-		subTableName := getSubTableName(tableValue, fileRows.config.Stname)
+		subTableName := getSubTableName(tableValue, fileRows.config.StName)

 		value, ok := subTableMap[subTableName]
 		if !ok {
@@ -527,7 +525,7 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int64 {
 			continue
 		}

-		subTableName := getSubTableName(tableValue, fileRows.config.Stname)
+		subTableName := getSubTableName(tableValue, fileRows.config.StName)

 		value, ok := currSubTableMap[subTableName]
 		if !ok {
@@ -543,7 +541,7 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int64 {
 	}

-	var maxRows, tableRows int = 0, 0
+	var maxRows, tableRows = 0, 0
 	for tableName := range currSubTableMap {
 		tableRows = len(currSubTableMap[tableName].rows)
 		subTableMap[tableName] = currSubTableMap[tableName] // add to global subTableMap
@@ -556,7 +554,7 @@ func normalizationDataWithSameInterval(fileRows dataRows, avgInterval int64) int64 {
 }

 func getSubTableName(subTableValue string, superTableName string) string {
-	return SUBTABLE_PREFIX + subTableValue + "_" + superTableName
+	return SubTablePrefix + subTableValue + "_" + superTableName
 }

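+// insertData is the per-thread worker: it walks scaleTableNames[start:end) and batches rows into multi-table insert statements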
 func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) {
@@ -564,25 +562,25 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) {
 	defer connection.Close()
 	defer wg.Done()

-	connection.Exec("use " + db) // use db
+	_, _ = connection.Exec("use " + db) // use db

 	log.Printf("thread-%d start insert into [%d, %d) subtables.\n", threadIndex, start, end)

 	num := 0
 	subTables := scaleTableNames[start:end]
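+	// one reusable statement buffer per worker, reset and re-primed with InsertPrefix after every executed batch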
+	var buffer bytes.Buffer
 	for {
 		var currSuccessRows int64
 		var appendRows int
 		var lastTableName string

-		buffers := bytes.Buffer{}
-		buffers.WriteString(INSERT_PREFIX)
+		buffer.WriteString(InsertPrefix)

 		for _, tableName := range subTables {

 			subTableInfo := subTableMap[scaleTableMap[tableName].subTableName]
 			subTableRows := int64(len(subTableInfo.rows))
-			superTableConf := superTableConfigMap[subTableInfo.config.Stname]
+			superTableConf := superTableConfigMap[subTableInfo.config.StName]

 			tableStartTime := superTableConf.startTime
 			var tableEndTime int64
@@ -605,40 +603,35 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) {
 			// append
 			if lastTableName != tableName {
-				buffers.WriteString(tableName)
-				buffers.WriteString(" values")
+				buffer.WriteString(tableName)
+				buffer.WriteString(" values")
 			}
 			lastTableName = tableName

-			buffers.WriteString("(")
-			buffers.WriteString(fmt.Sprintf("%v", currentTime))
-			buffers.WriteString(",")
+			buffer.WriteString("(")
+			buffer.WriteString(fmt.Sprintf("%v", currentTime))
+			buffer.WriteString(",")

-			// fieldNum := len(subTableInfo.config.Fields)
 			for _, field := range subTableInfo.config.Fields {
-				buffers.WriteString(getFieldValue(currentRow[strings.ToLower(field.Name)]))
-				buffers.WriteString(",")
-				// if( i != fieldNum -1){
-				// }
+				buffer.WriteString(getFieldValue(currentRow[strings.ToLower(field.Name)]))
+				buffer.WriteString(",")
 			}

-			buffers.Truncate(buffers.Len() - 1)
-			buffers.WriteString(") ")
+			buffer.Truncate(buffer.Len() - 1)
+			buffer.WriteString(") ")

 			appendRows++
 			insertRows++
 			if appendRows == batch {
-				// executebatch
-				insertSql := buffers.String()
-				connection.Exec("use " + db)
+				// executeBatch
+				insertSql := buffer.String()
 				affectedRows := executeBatchInsert(insertSql, connection)

 				successRows[threadIndex] += affectedRows
 				currSuccessRows += affectedRows

-				buffers.Reset()
-				buffers.WriteString(INSERT_PREFIX)
+				buffer.Reset()
+				buffer.WriteString(InsertPrefix)
 				lastTableName = ""
 				appendRows = 0
 			}
@@ -654,15 +647,14 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) {
 		// left := len(rows)
 		if appendRows > 0 {
-			// executebatch
-			insertSql := buffers.String()
-			connection.Exec("use " + db)
+			// executeBatch
+			insertSql := buffer.String()
 			affectedRows := executeBatchInsert(insertSql, connection)

 			successRows[threadIndex] += affectedRows
 			currSuccessRows += affectedRows

-			buffers.Reset()
+			buffer.Reset()
 		}

 		// log.Printf("thread-%d finished insert %d rows, used %d ms.", threadIndex, currSuccessRows, time.Since(threadStartTime)/1e6)
@@ -688,65 +680,10 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows []int64) {
 	}
 }

-func buildSql(rows []tableRows) string {
-
-	var lastTableName string
-	buffers := bytes.Buffer{}
-
-	for i, row := range rows {
-		if i == 0 {
-			lastTableName = row.tableName
-			buffers.WriteString(INSERT_PREFIX)
-			buffers.WriteString(row.tableName)
-			buffers.WriteString(" values")
-			buffers.WriteString(row.value)
-			continue
-		}
-
-		if lastTableName == row.tableName {
-			buffers.WriteString(row.value)
-		} else {
-			buffers.WriteString(" ")
-			buffers.WriteString(row.tableName)
-			buffers.WriteString(" values")
-			buffers.WriteString(row.value)
-			lastTableName = row.tableName
-		}
-	}
-
-	inserSql := buffers.String()
-	return inserSql
-}
-
-func buildRow(tableName string, currentTime int64, subTableInfo *dataRows, currentRow map[string]interface{}) tableRows {
-
-	tableRows := tableRows{tableName: tableName}
-	buffers := bytes.Buffer{}
-
-	buffers.WriteString("(")
-	buffers.WriteString(fmt.Sprintf("%v", currentTime))
-	buffers.WriteString(",")
-
-	for _, field := range subTableInfo.config.Fields {
-		buffers.WriteString(getFieldValue(currentRow[strings.ToLower(field.Name)]))
-		buffers.WriteString(",")
-	}
-
-	buffers.Truncate(buffers.Len() - 1)
-	buffers.WriteString(")")
-
-	insertSql := buffers.String()
-	tableRows.value = insertSql
-
-	return tableRows
-}
-
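+// executeBatchInsert executes one batched insert statement and returns the affected row count, or 0 if the statement fails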
 func executeBatchInsert(insertSql string, connection *sql.DB) int64 {
-	result, error := connection.Exec(insertSql)
-	if error != nil {
-		log.Printf("execute insertSql %s error, %s\n", insertSql, error)
+	result, err := connection.Exec(insertSql)
+	if err != nil {
+		log.Printf("execute insertSql %s error, %s\n", insertSql, err)
 		return 0
 	}
 	affected, _ := result.RowsAffected()
@@ -754,7 +691,6 @@ func executeBatchInsert(insertSql string, connection *sql.DB) int64 {
 		affected = 0
 	}
 	return affected
-	// return 0
 }

 func getFieldValue(fieldValue interface{}) string {
@@ -762,7 +698,7 @@ func getFieldValue(fieldValue interface{}) string {
 }

 func getConnection() *sql.DB {
-	db, err := sql.Open(DRIVER_NAME, dataSourceName)
+	db, err := sql.Open(DriverName, dataSourceName)
 	if err != nil {
 		panic(err)
 	}
@@ -773,19 +709,11 @@ func getSubTableNameValue(suffix interface{}) string {
 	return fmt.Sprintf("%v", suffix)
 }

-func hash(s string) int {
-	v := int(crc32.ChecksumIEEE([]byte(s)))
-	if v < 0 {
-		return -v
-	}
-	return v
-}
-
-func readFile(config dataimport.CaseConfig) dataRows {
+func readFile(config dataImport.CaseConfig) dataRows {
 	fileFormat := strings.ToLower(config.Format)
-	if fileFormat == JSON_FORMAT {
+	if fileFormat == JsonFormat {
 		return readJSONFile(config)
-	} else if fileFormat == CSV_FORMAT {
+	} else if fileFormat == CsvFormat {
 		return readCSVFile(config)
 	}
@@ -793,7 +721,7 @@ func readFile(config dataimport.CaseConfig) dataRows {
 	return dataRows{}
 }

-func readCSVFile(config dataimport.CaseConfig) dataRows {
+func readCSVFile(config dataImport.CaseConfig) dataRows {
 	var rows dataRows
 	f, err := os.Open(config.FilePath)
 	if err != nil {
@@ -813,7 +741,7 @@ func readCSVFile(config dataimport.CaseConfig) dataRows {
 	line := strings.ToLower(string(lineBytes))
 	titles := strings.Split(line, config.Separator)
 	if len(titles) < 3 {
-		// need suffix、primary key and at least one other field
+		// need suffix、primaryKey and at least one other field
 		log.Printf("the first line of file %s should be title row, and at least 3 field.\n", config.FilePath)
 		return rows
 	}
@@ -848,7 +776,7 @@ func readCSVFile(config dataimport.CaseConfig) dataRows {
 		}

 		// if the primary key valid
-		primaryKeyValue := getPrimaryKeyMillisec(config.Timestamp, config.TimestampType, config.TimestampTypeFormat, dataMap)
+		primaryKeyValue := getPrimaryKeyMilliSec(config.Timestamp, config.TimestampType, config.TimestampTypeFormat, dataMap)
 		if primaryKeyValue == -1 {
 			log.Printf("the Timestamp[%s] of line %d is not valid, will filtered.\n", config.Timestamp, lineNum)
 			continue
@@ -861,7 +789,7 @@ func readCSVFile(config dataimport.CaseConfig) dataRows {
 	return rows
 }

-func readJSONFile(config dataimport.CaseConfig) dataRows {
+func readJSONFile(config dataImport.CaseConfig) dataRows {

 	var rows dataRows
 	f, err := os.Open(config.FilePath)
@@ -899,7 +827,7 @@ func readJSONFile(config dataimport.CaseConfig) dataRows {
 			continue
 		}

-		primaryKeyValue := getPrimaryKeyMillisec(config.Timestamp, config.TimestampType, config.TimestampTypeFormat, line)
+		primaryKeyValue := getPrimaryKeyMilliSec(config.Timestamp, config.TimestampType, config.TimestampTypeFormat, line)
 		if primaryKeyValue == -1 {
 			log.Printf("the Timestamp[%s] of line %d is not valid, will filtered.\n", config.Timestamp, lineNum)
 			continue
/ * *
/ * *
* get primary key as millisecond , otherwise return - 1
* get primary key as millisecond , otherwise return - 1
* /
* /
func getPrimaryKeyMilli s ec( key string , valueType string , valueFormat string , line map [ string ] interface { } ) int64 {
func getPrimaryKeyMilli S ec( key string , valueType string , valueFormat string , line map [ string ] interface { } ) int64 {
if ! existMapKeyAndNotEmpty ( key , line ) {
if ! existMapKeyAndNotEmpty ( key , line ) {
return - 1
return - 1
}
}
@@ -971,13 +899,13 @@ func existMapKeyAndNotEmpty(key string, maps map[string]interface{}) bool {
 	return true
 }

-func checkUserCaseConfig(caseName string, caseConfig *dataimport.CaseConfig) {
-	if len(caseConfig.Stname) == 0 {
+func checkUserCaseConfig(caseName string, caseConfig *dataImport.CaseConfig) {
+	if len(caseConfig.StName) == 0 {
 		log.Fatalf("the stname of case %s can't be empty\n", caseName)
 	}
-	caseConfig.Stname = strings.ToLower(caseConfig.Stname)
+	caseConfig.StName = strings.ToLower(caseConfig.StName)

 	if len(caseConfig.Tags) == 0 {
 		log.Fatalf("the tags of case %s can't be empty\n", caseName)
@@ -1029,24 +957,24 @@ func checkUserCaseConfig(caseName string, caseConfig *dataimport.CaseConfig) {
 }

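+// parseArg binds all command line flags to their package-level variables and sets the documented defaults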
 func parseArg() {
-	flag.StringVar(&cfg, "cfg", "config/cfg.toml", "configuration file which describes usecase and data format.")
-	flag.StringVar(&cases, "cases", "sensor_info", "usecase for dataset to be imported. Multiple choices can be separated by comma, for example, -cases sensor_info,camera_detection.")
+	flag.StringVar(&cfg, "cfg", "config/cfg.toml", "configuration file which describes useCase and data format.")
+	flag.StringVar(&cases, "cases", "sensor_info", "useCase for dataset to be imported. Multiple choices can be separated by comma, for example, -cases sensor_info,camera_detection.")
 	flag.IntVar(&hnum, "hnum", 100, "magnification factor of the sample tables. For example, if hnum is 100 and in the sample data there are 10 tables, then 10x100=1000 tables will be created in the database.")
 	flag.IntVar(&vnum, "vnum", 1000, "copies of the sample records in each table. If set to 0, this program will never stop simulating and importing data even if the timestamp has passed current time.")
-	flag.Int64Var(&delay, "delay", DEFAULT_DELAY, "the delay time interval(millisecond) to continue generating data when vnum set 0.")
+	flag.Int64Var(&delay, "delay", DefaultDelay, "the delay time interval(millisecond) to continue generating data when vnum set 0.")
 	flag.Int64Var(&tick, "tick", 2000, "the tick time interval(millisecond) to print statistic info.")
 	flag.IntVar(&save, "save", 0, "whether to save the statistical info into 'statistic' table. 0 is disabled and 1 is enabled.")
-	flag.StringVar(&saveTable, "savetb", DEFAULT_STATISTIC_TABLE, "the table to save 'statistic' info when save set 1.")
+	flag.StringVar(&saveTable, "savetb", DefaultStatisticTable, "the table to save 'statistic' info when save set 1.")
 	flag.IntVar(&thread, "thread", 10, "number of threads to import data.")
 	flag.IntVar(&batch, "batch", 100, "rows of records in one import batch.")
-	flag.IntVar(&auto, "auto", 0, "whether to use the starttime and interval specified by users when simulating the data. 0 is disabled and 1 is enabled.")
-	flag.StringVar(&starttimestr, "start", "", "the starting timestamp of simulated data, in the format of yyyy-MM-dd HH:mm:ss.SSS. If not specified, the ealiest timestamp in the sample data will be set as the starttime.")
-	flag.Int64Var(&interval, "interval", DEFAULT_INTERVAL, "time inteval between two consecutive records, in the unit of millisecond. Only valid when auto is 1.")
+	flag.IntVar(&auto, "auto", 0, "whether to use the startTime and interval specified by users when simulating the data. 0 is disabled and 1 is enabled.")
+	flag.StringVar(&startTimeStr, "start", "", "the starting timestamp of simulated data, in the format of yyyy-MM-dd HH:mm:ss.SSS. If not specified, the earliest timestamp in the sample data will be set as the startTime.")
+	flag.Int64Var(&interval, "interval", DefaultInterval, "time interval between two consecutive records, in the unit of millisecond. Only valid when auto is 1.")
 	flag.StringVar(&host, "host", "127.0.0.1", "tdengine server ip.")
 	flag.IntVar(&port, "port", 6030, "tdengine server port.")
 	flag.StringVar(&user, "user", "root", "user name to login into the database.")
 	flag.StringVar(&password, "password", "taosdata", "the import tdengine user password")
-	flag.IntVar(&dropdb, "dropdb", 0, "whether to drop the existing datbase. 1 is yes and 0 otherwise.")
+	flag.IntVar(&dropdb, "dropdb", 0, "whether to drop the existing database. 1 is yes and 0 otherwise.")
 	flag.StringVar(&db, "db", "", "name of the database to store data.")
 	flag.StringVar(&dbparam, "dbparam", "", "database configurations when it is created.")
@@ -1066,7 +994,7 @@ func printArg() {
 	fmt.Println("-thread:", thread)
 	fmt.Println("-batch:", batch)
 	fmt.Println("-auto:", auto)
-	fmt.Println("-start:", starttimestr)
+	fmt.Println("-start:", startTimeStr)
 	fmt.Println("-interval:", interval)
 	fmt.Println("-host:", host)
 	fmt.Println("-port", port)