From 19a0b51c1ac2293398aef65f4e27a97677304ad5 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Sun, 28 Jun 2020 18:37:38 +0800 Subject: [PATCH 1/4] add stress test tool --- tests/stress/.gitignore | 3 + tests/stress/README.md | 32 ++++ tests/stress/go.mod | 5 + tests/stress/main.go | 318 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 358 insertions(+) create mode 100644 tests/stress/.gitignore create mode 100644 tests/stress/README.md create mode 100644 tests/stress/go.mod create mode 100644 tests/stress/main.go diff --git a/tests/stress/.gitignore b/tests/stress/.gitignore new file mode 100644 index 0000000000..e13f45af4b --- /dev/null +++ b/tests/stress/.gitignore @@ -0,0 +1,3 @@ +stress +stress.exe +script.json \ No newline at end of file diff --git a/tests/stress/README.md b/tests/stress/README.md new file mode 100644 index 0000000000..e554214ad3 --- /dev/null +++ b/tests/stress/README.md @@ -0,0 +1,32 @@ +# STRESS + +``` bash +$ ./stress [-server=] [-db=] [-concurrent=<1>] [-fetch=] [scriptFile] +``` + +## SCRIPT FILE + +```json +[{ + "sql": "select * from meters where id = %d and a >= %d and a <= %d and tbname='%s'", + "args": [{ + "type": "int", + "min": -10, + "max": 20 + }, { + "type": "range", + "min": 30, + "max": 60 + }, { + "type": "string", + "min": 0, + "max": 10, + "list": [ + "table1", + "table2", + "table3", + "table4" + ] + }] +}] +``` \ No newline at end of file diff --git a/tests/stress/go.mod b/tests/stress/go.mod new file mode 100644 index 0000000000..9cd09db81e --- /dev/null +++ b/tests/stress/go.mod @@ -0,0 +1,5 @@ +module github.com/taosdata/stress + +go 1.14 + +require github.com/taosdata/driver-go v0.0.0-20200606095205-b786bac1857f diff --git a/tests/stress/main.go b/tests/stress/main.go new file mode 100644 index 0000000000..0e271eab72 --- /dev/null +++ b/tests/stress/main.go @@ -0,0 +1,318 @@ +package main + +import ( + "database/sql" + "encoding/json" + "errors" + "flag" + "fmt" + "math/rand" + "os" + "os/signal" + "strings" + "sync" + "sync/atomic" + "time" + + _ "github.com/taosdata/driver-go/taosSql" +) + +type argument struct { + Type string `json:"type"` + Min int `json:"min"` + Max int `json:"max"` + List []interface{} `json:"list, omitempty"` +} + +type script struct { + isQuery bool `json:"-"` + numArgs int `json:"-"` + Sql string `json:"sql"` + Args []argument `json:"args"` +} + +func (arg *argument) check() (int, error) { + if arg.Type == "list" { + if len(arg.List) == 0 { + return 0, errors.New("list cannot be empty") + } + return 1, nil + } + + if arg.Max < arg.Min { + return 0, errors.New("invalid min/max value") + } + + if arg.Type == "string" { + if arg.Min < 0 { + return 0, errors.New("negative string length") + } + } + + if arg.Type == "int" && arg.Min == 0 && arg.Max == 0 { + arg.Max = arg.Min + 100 + } + + if arg.Type == "range" { + return 2, nil + } + + return 1, nil +} + +func (arg *argument) generate(args []interface{}) []interface{} { + const chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" + + switch arg.Type { + case "bool": + if rand.Intn(2) == 1 { + args = append(args, true) + } else { + args = append(args, false) + } + + case "int": + v := rand.Intn(arg.Max-arg.Min+1) + arg.Min + args = append(args, v) + + case "range": + v := rand.Intn(arg.Max-arg.Min) + arg.Min + args = append(args, v) + v = rand.Intn(arg.Max-v+1) + v + args = append(args, v) + + case "string": + l := rand.Intn(arg.Max-arg.Min+1) + arg.Min + sb := strings.Builder{} + for i := 0; i < l; i++ { + sb.WriteByte(chars[rand.Intn(len(chars))]) + } + args = append(args, sb.String()) + + case "list": + v := arg.List[rand.Intn(len(arg.List))] + args = append(args, v) + } + + return args +} + +type statitics struct { + succeeded int64 + failed int64 + succeededDuration int64 + failedDuration int64 +} + +var ( + server string + database string + fetch bool + concurrent uint + startAt time.Time + shouldStop int64 + wg sync.WaitGroup + stat statitics + scripts []script +) + +func loadScript(path string) error { + f, e := os.Open(path) + if e != nil { + return e + } + defer f.Close() + + e = json.NewDecoder(f).Decode(&scripts) + if e != nil { + return e + } + + for i := 0; i < len(scripts); i++ { + s := &scripts[i] + s.Sql = strings.TrimSpace(s.Sql) + s.isQuery = strings.ToLower(s.Sql[:6]) == "select" + + for j := 0; j < len(s.Args); j++ { + arg := &s.Args[j] + arg.Type = strings.ToLower(arg.Type) + n, e := arg.check() + if e != nil { + return fmt.Errorf("script %d argument %d: %s", i, j, e.Error()) + } + s.numArgs += n + } + } + + return nil +} + +func buildSql() (string, bool) { + s := scripts[rand.Intn(len(scripts))] + args := make([]interface{}, 0, s.numArgs) + for i := 0; i < len(s.Args); i++ { + args = s.Args[i].generate(args) + } + return fmt.Sprintf(s.Sql, args...), s.isQuery +} + +func runTest() { + defer wg.Done() + db, e := sql.Open("taosSql", "root:taosdata@tcp("+server+":0)/"+database) + if e != nil { + fmt.Printf("failed to connect to database: %s\n", e.Error()) + return + } + defer db.Close() + + for atomic.LoadInt64(&shouldStop) == 0 { + str, isQuery := buildSql() + start := time.Now() + + if isQuery { + var rows *sql.Rows + if rows, e = db.Query(str); rows != nil { + if fetch { + for rows.Next() { + } + } + rows.Close() + } + } else { + _, e = db.Exec(str) + } + + duration := time.Now().Sub(start).Microseconds() + if e != nil { + atomic.AddInt64(&stat.failed, 1) + atomic.AddInt64(&stat.failedDuration, duration) + } else { + atomic.AddInt64(&stat.succeeded, 1) + atomic.AddInt64(&stat.succeededDuration, duration) + } + } +} + +func getStatPrinter() func(tm time.Time) { + var last statitics + lastPrintAt := startAt + + return func(tm time.Time) { + var current statitics + + current.succeeded = atomic.LoadInt64(&stat.succeeded) + current.failed = atomic.LoadInt64(&stat.failed) + current.succeededDuration = atomic.LoadInt64(&stat.succeededDuration) + current.failedDuration = atomic.LoadInt64(&stat.failedDuration) + + seconds := int64(tm.Sub(startAt).Seconds()) + format := "\033K %02v:%02v:%02v | TOTAL REQ | TOTAL TIME(us) | TOTAL AVG(us) | REQUEST | TIME(us) | AVERAGE(us) |\n" + fmt.Printf(format, seconds/3600, seconds%3600/60, seconds%60) + fmt.Println("------------------------------------------------------------------------------------------------") + + tr := current.succeeded + current.failed + td := current.succeededDuration + current.failedDuration + r := tr - last.succeeded - last.failed + d := td - last.succeededDuration - last.failedDuration + ta, a := 0.0, 0.0 + if tr > 0 { + ta = float64(td) / float64(tr) + } + if r > 0 { + a = float64(d) / float64(r) + } + format = "\033[K TOTAL | %9v | %14v | %13.2f | %7v | %10v | % 13.2f |\n" + fmt.Printf(format, tr, td, ta, r, d, a) + + tr = current.succeeded + td = current.succeededDuration + r = tr - last.succeeded + d = td - last.succeededDuration + ta, a = 0.0, 0.0 + if tr > 0 { + ta = float64(td) / float64(tr) + } + if r > 0 { + a = float64(d) / float64(r) + } + format = "\033[K SUCCESS | \033[32m%9v\033[0m | \033[32m%14v\033[0m | \033[32m%13.2f\033[0m | \033[32m%7v\033[0m | \033[32m%10v\033[0m | \033[32m%13.2f\033[0m |\n" + fmt.Printf(format, tr, td, ta, r, d, a) + + tr = current.failed + td = current.failedDuration + r = tr - last.failed + d = td - last.failedDuration + ta, a = 0.0, 0.0 + if tr > 0 { + ta = float64(td) / float64(tr) + } + if r > 0 { + a = float64(d) / float64(r) + } + format = "\033[K FAIL | \033[31m%9v\033[0m | \033[31m%14v\033[0m | \033[31m%13.2f\033[0m | \033[31m%7v\033[0m | \033[31m%10v\033[0m | \033[31m%13.2f\033[0m |\n" + fmt.Printf(format, tr, td, ta, r, d, a) + + last = current + lastPrintAt = tm + } +} + +func main() { + flag.StringVar(&server, "server", "localhost", "host name or IP address of TDengine server") + flag.StringVar(&database, "db", "test", "database name") + flag.BoolVar(&fetch, "fetch", false, "fetch result or not") + flag.UintVar(&concurrent, "concurrent", 1, "number of concurrent queries") + flag.Parse() + + scriptFile := flag.Arg(0) + if scriptFile == "" { + scriptFile = "script.json" + } + if e := loadScript(scriptFile); e != nil { + fmt.Println("failed to load script file:", e.Error()) + return + } else if len(scripts) == 0 { + fmt.Println("there's no script in the script file") + return + } + + rand.Seed(time.Now().UnixNano()) + + fmt.Println() + fmt.Printf("SERVER: %s DATABASE: %s CONCURRENT QUERIES: %d FETCH DATA: %v\n", server, database, concurrent, fetch) + fmt.Println() + + startAt = time.Now() + printStat := getStatPrinter() + printStat(startAt) + + for i := uint(0); i < concurrent; i++ { + wg.Add(1) + go runTest() + } + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + ticker := time.NewTicker(time.Second) + + fmt.Println("Ctrl + C to exit....\033[1A") + +LOOP: + for { + select { + case <-interrupt: + break LOOP + case tm := <-ticker.C: + fmt.Print("\033[5A") + printStat(tm) + } + } + + atomic.StoreInt64(&shouldStop, 1) + fmt.Print("\033[100D'Ctrl + C' received, Waiting started query to stop...") + + wg.Wait() + fmt.Print("\033[5A\033[100D") + printStat(time.Now()) + fmt.Println() +} From c115bb3dd44ef7093e5e7160c6e87f8f35dd0b30 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Mon, 29 Jun 2020 13:24:20 +0800 Subject: [PATCH 2/4] update stress test tool --- tests/stress/.gitignore | 2 +- tests/stress/README.md | 64 ++++++++++++++++++--- tests/stress/go.mod | 4 +- tests/stress/main.go | 123 +++++++++++++++++++++++++--------------- 4 files changed, 138 insertions(+), 55 deletions(-) diff --git a/tests/stress/.gitignore b/tests/stress/.gitignore index e13f45af4b..25a8031c43 100644 --- a/tests/stress/.gitignore +++ b/tests/stress/.gitignore @@ -1,3 +1,3 @@ stress stress.exe -script.json \ No newline at end of file +cases.json \ No newline at end of file diff --git a/tests/stress/README.md b/tests/stress/README.md index e554214ad3..e23927bd1a 100644 --- a/tests/stress/README.md +++ b/tests/stress/README.md @@ -1,26 +1,43 @@ # STRESS +Stress test tool for TDengine. It run a set of test cases randomly and show statistics. + +## COMMAND LINE + ``` bash -$ ./stress [-server=] [-db=] [-concurrent=<1>] [-fetch=] [scriptFile] +$ ./stress [-h=] [-P=<0>] [-d=] [-u=] [-p=] [-c=<4>] [-f=] [test case file] ``` -## SCRIPT FILE +* **-h**: host name or IP address of TDengine server (default: localhost). +* **-P**: port number of TDengine server (default: 0). +* **-u**: user name (default: root). +* **-p**: password (default: taosdata). +* **-c**: concurrency, number of concurrent goroutines for query (default: 4). +* **-f**: fetch data or not (default: true). +* **test case file**: the path of a JSON file which contains the test cases (default: cases.json). + +## TEST CASE FILE ```json [{ - "sql": "select * from meters where id = %d and a >= %d and a <= %d and tbname='%s'", + "weight": 1, + "sql": "select * from meters where ts>=now+%dm and ts<=now-%dm and c1=%v and c2=%d and c3='%s' and tbname='%s'", "args": [{ - "type": "int", - "min": -10, - "max": 20 - }, { "type": "range", "min": 30, "max": 60 + }, { + "type": "bool" + }, { + "type": "int", + "min": -10, + "max": 20 }, { "type": "string", "min": 0, "max": 10, + }, { + "type": "list", "list": [ "table1", "table2", @@ -29,4 +46,35 @@ $ ./stress [-server=] [-db=] [-concurrent=<1>] [-fetch=] ] }] }] -``` \ No newline at end of file +``` + +The test case file is a standard JSON file which contains an array of test cases. For test cases, field `sql` is mandatory, and it can optionally include a `weight` field and an `args` field which is an array of arguments. + +`sql` is a SQL statement, it can include zero or more arguments (placeholders). + +`weight` defines the possibility of the case being selected, the greater value the higher possibility. It must be an non-negative integer and the default value is zero, but, if all cases have a zero weight, all the weights are regarded as 1. + +Placeholders of `sql` are replaced by arguments in `args` at runtime. There are 5 types of arguments currently: + +* **bool**: generate a `boolean` value randomly. +* **int**: generate an `integer` between [`min`, `max`] randomly, the default value of `min` is 0 and `max` is 100. +* **range**: generate two `integer`s between [`min`, `max`] randomly, the first is less than the second, the default value of `min` is 0 and `max` is 100. +* **string**: generate a `string` with length between [`min`, `max`] randomly, the default value of `min` is 0 and `max` is 100. +* **list**: select an item from `list` randomly. + +## OUTPUT + +``` + 00:00:08 | TOTAL REQ | TOTAL TIME(us) | TOTAL AVG(us) | REQUEST | TIME(us) | AVERAGE(us) | +----------------------------------------------------------------------------------------------- + TOTAL | 3027 | 26183890 | 8650.11 | 287 | 3060935 | 10665.28 | + SUCCESS | 3027 | 26183890 | 8650.11 | 287 | 3060935 | 10665.28 | + FAIL | 0 | 0 | 0.00 | 0 | 0 | 0.00 | +``` + +* **Col 2**: total number of request since test start. +* **Col 3**: total time of all request since test start. +* **Col 4**: average time of all request since test start. +* **Col 5**: number of request in last second. +* **Col 6**: time of all request in last second. +* **Col 7**: average time of all request in last second. diff --git a/tests/stress/go.mod b/tests/stress/go.mod index 9cd09db81e..df9b2806b5 100644 --- a/tests/stress/go.mod +++ b/tests/stress/go.mod @@ -2,4 +2,6 @@ module github.com/taosdata/stress go 1.14 -require github.com/taosdata/driver-go v0.0.0-20200606095205-b786bac1857f +require ( + github.com/taosdata/driver-go v0.0.0-20200606095205-b786bac1857f +) diff --git a/tests/stress/main.go b/tests/stress/main.go index 0e271eab72..2e0775d498 100644 --- a/tests/stress/main.go +++ b/tests/stress/main.go @@ -24,9 +24,10 @@ type argument struct { List []interface{} `json:"list, omitempty"` } -type script struct { +type testCase struct { isQuery bool `json:"-"` numArgs int `json:"-"` + Weight int `json:"weight"` Sql string `json:"sql"` Args []argument `json:"args"` } @@ -97,6 +98,14 @@ func (arg *argument) generate(args []interface{}) []interface{} { return args } +func (tc *testCase) buildSql() string { + args := make([]interface{}, 0, tc.numArgs) + for i := 0; i < len(tc.Args); i++ { + args = tc.Args[i].generate(args) + } + return fmt.Sprintf(tc.Sql, args...) +} + type statitics struct { succeeded int64 failed int64 @@ -105,60 +114,79 @@ type statitics struct { } var ( - server string - database string - fetch bool - concurrent uint - startAt time.Time - shouldStop int64 - wg sync.WaitGroup - stat statitics - scripts []script + host string + port uint + database string + user string + password string + fetch bool + + startAt time.Time + shouldStop int64 + wg sync.WaitGroup + stat statitics + totalWeight int + cases []testCase ) -func loadScript(path string) error { +func loadTestCase(path string) error { f, e := os.Open(path) if e != nil { return e } defer f.Close() - e = json.NewDecoder(f).Decode(&scripts) + e = json.NewDecoder(f).Decode(&cases) if e != nil { return e } - for i := 0; i < len(scripts); i++ { - s := &scripts[i] - s.Sql = strings.TrimSpace(s.Sql) - s.isQuery = strings.ToLower(s.Sql[:6]) == "select" + for i := 0; i < len(cases); i++ { + c := &cases[i] + c.Sql = strings.TrimSpace(c.Sql) + c.isQuery = strings.ToLower(c.Sql[:6]) == "select" + if c.Weight < 0 { + return fmt.Errorf("test %d: negative weight", i) + } + totalWeight += c.Weight - for j := 0; j < len(s.Args); j++ { - arg := &s.Args[j] + for j := 0; j < len(c.Args); j++ { + arg := &c.Args[j] arg.Type = strings.ToLower(arg.Type) n, e := arg.check() if e != nil { - return fmt.Errorf("script %d argument %d: %s", i, j, e.Error()) + return fmt.Errorf("test case %d argument %d: %s", i, j, e.Error()) } - s.numArgs += n + c.numArgs += n } } + if totalWeight == 0 { + for i := 0; i < len(cases); i++ { + cases[i].Weight = 1 + } + totalWeight = len(cases) + } + return nil } -func buildSql() (string, bool) { - s := scripts[rand.Intn(len(scripts))] - args := make([]interface{}, 0, s.numArgs) - for i := 0; i < len(s.Args); i++ { - args = s.Args[i].generate(args) +func selectTestCase() *testCase { + sum, target := 0, rand.Intn(totalWeight) + var c *testCase + for i := 0; i < len(cases); i++ { + c = &cases[i] + sum += c.Weight + if sum > target { + break + } } - return fmt.Sprintf(s.Sql, args...), s.isQuery + return c } func runTest() { defer wg.Done() - db, e := sql.Open("taosSql", "root:taosdata@tcp("+server+":0)/"+database) + db, e := sql.Open("taosSql", fmt.Sprintf("%s:%s@tcp(%s:%v)/%s", user, password, host, port, database)) if e != nil { fmt.Printf("failed to connect to database: %s\n", e.Error()) return @@ -166,10 +194,11 @@ func runTest() { defer db.Close() for atomic.LoadInt64(&shouldStop) == 0 { - str, isQuery := buildSql() - start := time.Now() + c := selectTestCase() + str := c.buildSql() - if isQuery { + start := time.Now() + if c.isQuery { var rows *sql.Rows if rows, e = db.Query(str); rows != nil { if fetch { @@ -181,8 +210,8 @@ func runTest() { } else { _, e = db.Exec(str) } - duration := time.Now().Sub(start).Microseconds() + if e != nil { atomic.AddInt64(&stat.failed, 1) atomic.AddInt64(&stat.failedDuration, duration) @@ -208,7 +237,7 @@ func getStatPrinter() func(tm time.Time) { seconds := int64(tm.Sub(startAt).Seconds()) format := "\033K %02v:%02v:%02v | TOTAL REQ | TOTAL TIME(us) | TOTAL AVG(us) | REQUEST | TIME(us) | AVERAGE(us) |\n" fmt.Printf(format, seconds/3600, seconds%3600/60, seconds%60) - fmt.Println("------------------------------------------------------------------------------------------------") + fmt.Println("-----------------------------------------------------------------------------------------------") tr := current.succeeded + current.failed td := current.succeededDuration + current.failedDuration @@ -258,35 +287,39 @@ func getStatPrinter() func(tm time.Time) { } func main() { - flag.StringVar(&server, "server", "localhost", "host name or IP address of TDengine server") - flag.StringVar(&database, "db", "test", "database name") - flag.BoolVar(&fetch, "fetch", false, "fetch result or not") - flag.UintVar(&concurrent, "concurrent", 1, "number of concurrent queries") + var concurrency uint + flag.StringVar(&host, "h", "localhost", "host name or IP address of TDengine server") + flag.UintVar(&port, "P", 0, "port (default 0)") + flag.StringVar(&database, "d", "test", "database name") + flag.StringVar(&user, "u", "root", "user name") + flag.StringVar(&password, "p", "taosdata", "password") + flag.BoolVar(&fetch, "f", true, "fetch result or not") + flag.UintVar(&concurrency, "c", 4, "concurrency, number of goroutines for query") flag.Parse() - scriptFile := flag.Arg(0) - if scriptFile == "" { - scriptFile = "script.json" + caseFile := flag.Arg(0) + if caseFile == "" { + caseFile = "cases.json" } - if e := loadScript(scriptFile); e != nil { - fmt.Println("failed to load script file:", e.Error()) + if e := loadTestCase(caseFile); e != nil { + fmt.Println("failed to load test cases:", e.Error()) return - } else if len(scripts) == 0 { - fmt.Println("there's no script in the script file") + } else if len(cases) == 0 { + fmt.Println("there's no test case") return } rand.Seed(time.Now().UnixNano()) fmt.Println() - fmt.Printf("SERVER: %s DATABASE: %s CONCURRENT QUERIES: %d FETCH DATA: %v\n", server, database, concurrent, fetch) + fmt.Printf("SERVER: %s DATABASE: %s CONCURRENCY: %d FETCH DATA: %v\n", host, database, concurrency, fetch) fmt.Println() startAt = time.Now() printStat := getStatPrinter() printStat(startAt) - for i := uint(0); i < concurrent; i++ { + for i := uint(0); i < concurrency; i++ { wg.Add(1) go runTest() } From 72ddf72b01b4413a6a001b57137769180fdb924d Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Tue, 30 Jun 2020 10:36:04 +0800 Subject: [PATCH 3/4] td-797: support SQL argument from command line --- tests/stress/README.md | 4 ++-- tests/stress/main.go | 52 +++++++++++++++++++++++++++--------------- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/tests/stress/README.md b/tests/stress/README.md index e23927bd1a..c00e954c24 100644 --- a/tests/stress/README.md +++ b/tests/stress/README.md @@ -5,7 +5,7 @@ Stress test tool for TDengine. It run a set of test cases randomly and show stat ## COMMAND LINE ``` bash -$ ./stress [-h=] [-P=<0>] [-d=] [-u=] [-p=] [-c=<4>] [-f=] [test case file] +$ ./stress [-h=] [-P=<0>] [-d=] [-u=] [-p=] [-c=<4>] [-f=] [path_or_sql] ``` * **-h**: host name or IP address of TDengine server (default: localhost). @@ -14,7 +14,7 @@ $ ./stress [-h=] [-P=<0>] [-d=] [-u=] [-p=] [-c * **-p**: password (default: taosdata). * **-c**: concurrency, number of concurrent goroutines for query (default: 4). * **-f**: fetch data or not (default: true). -* **test case file**: the path of a JSON file which contains the test cases (default: cases.json). +* **path_or_sql**: a SQL statement or path of a JSON file which contains the test cases (default: cases.json). ## TEST CASE FILE diff --git a/tests/stress/main.go b/tests/stress/main.go index 2e0775d498..c32f1660a1 100644 --- a/tests/stress/main.go +++ b/tests/stress/main.go @@ -28,7 +28,7 @@ type testCase struct { isQuery bool `json:"-"` numArgs int `json:"-"` Weight int `json:"weight"` - Sql string `json:"sql"` + SQL string `json:"sql"` Args []argument `json:"args"` } @@ -103,7 +103,7 @@ func (tc *testCase) buildSql() string { for i := 0; i < len(tc.Args); i++ { args = tc.Args[i].generate(args) } - return fmt.Sprintf(tc.Sql, args...) + return fmt.Sprintf(tc.SQL, args...) } type statitics struct { @@ -129,22 +129,19 @@ var ( cases []testCase ) -func loadTestCase(path string) error { - f, e := os.Open(path) - if e != nil { +func loadTestCaseFromFile(file *os.File) error { + if e := json.NewDecoder(file).Decode(&cases); e != nil { return e } - defer f.Close() - e = json.NewDecoder(f).Decode(&cases) - if e != nil { - return e + if len(cases) == 0 { + return fmt.Errorf("no test case loaded.") } for i := 0; i < len(cases); i++ { c := &cases[i] - c.Sql = strings.TrimSpace(c.Sql) - c.isQuery = strings.ToLower(c.Sql[:6]) == "select" + c.SQL = strings.TrimSpace(c.SQL) + c.isQuery = strings.ToLower(c.SQL[:6]) == "select" if c.Weight < 0 { return fmt.Errorf("test %d: negative weight", i) } @@ -171,6 +168,28 @@ func loadTestCase(path string) error { return nil } +func loadTestCase(pathOrSQL string) error { + if f, e := os.Open(pathOrSQL); e == nil { + defer f.Close() + return loadTestCaseFromFile(f) + } + + pathOrSQL = strings.TrimSpace(pathOrSQL) + if strings.ToLower(pathOrSQL[:6]) != "select" { + return fmt.Errorf("'%s' is not a valid file or SQL statement", pathOrSQL) + } + + cases = append(cases, testCase{ + isQuery: true, + Weight: 1, + numArgs: 0, + SQL: pathOrSQL, + }) + totalWeight = 1 + + return nil +} + func selectTestCase() *testCase { sum, target := 0, rand.Intn(totalWeight) var c *testCase @@ -297,16 +316,13 @@ func main() { flag.UintVar(&concurrency, "c", 4, "concurrency, number of goroutines for query") flag.Parse() - caseFile := flag.Arg(0) - if caseFile == "" { - caseFile = "cases.json" + pathOrSQL := flag.Arg(0) + if len(pathOrSQL) == 0 { + pathOrSQL = "cases.json" } - if e := loadTestCase(caseFile); e != nil { + if e := loadTestCase(pathOrSQL); e != nil { fmt.Println("failed to load test cases:", e.Error()) return - } else if len(cases) == 0 { - fmt.Println("there's no test case") - return } rand.Seed(time.Now().UnixNano()) From 96acc6c284671db6bbb57af8c3eacc80d102ecf3 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Thu, 2 Jul 2020 16:06:47 +0800 Subject: [PATCH 4/4] TD-797: add log support --- tests/stress/README.md | 4 +-- tests/stress/main.go | 67 +++++++++++++++++++++++++++++++++--------- 2 files changed, 55 insertions(+), 16 deletions(-) diff --git a/tests/stress/README.md b/tests/stress/README.md index c00e954c24..a7f8a2dac6 100644 --- a/tests/stress/README.md +++ b/tests/stress/README.md @@ -5,7 +5,7 @@ Stress test tool for TDengine. It run a set of test cases randomly and show stat ## COMMAND LINE ``` bash -$ ./stress [-h=] [-P=<0>] [-d=] [-u=] [-p=] [-c=<4>] [-f=] [path_or_sql] +$ ./stress [-h=] [-P=<0>] [-d=] [-u=] [-p=] [-c=<4>] [-f=] [-l=] [path_or_sql] ``` * **-h**: host name or IP address of TDengine server (default: localhost). @@ -14,6 +14,7 @@ $ ./stress [-h=] [-P=<0>] [-d=] [-u=] [-p=] [-c * **-p**: password (default: taosdata). * **-c**: concurrency, number of concurrent goroutines for query (default: 4). * **-f**: fetch data or not (default: true). +* **-l**: log file path (default: no log). * **path_or_sql**: a SQL statement or path of a JSON file which contains the test cases (default: cases.json). ## TEST CASE FILE @@ -66,7 +67,6 @@ Placeholders of `sql` are replaced by arguments in `args` at runtime. There are ``` 00:00:08 | TOTAL REQ | TOTAL TIME(us) | TOTAL AVG(us) | REQUEST | TIME(us) | AVERAGE(us) | ------------------------------------------------------------------------------------------------ TOTAL | 3027 | 26183890 | 8650.11 | 287 | 3060935 | 10665.28 | SUCCESS | 3027 | 26183890 | 8650.11 | 287 | 3060935 | 10665.28 | FAIL | 0 | 0 | 0.00 | 0 | 0 | 0.00 | diff --git a/tests/stress/main.go b/tests/stress/main.go index c32f1660a1..c3b9290a37 100644 --- a/tests/stress/main.go +++ b/tests/stress/main.go @@ -121,9 +121,11 @@ var ( password string fetch bool + chLog chan string + wgLog sync.WaitGroup startAt time.Time shouldStop int64 - wg sync.WaitGroup + wgTest sync.WaitGroup stat statitics totalWeight int cases []testCase @@ -204,7 +206,7 @@ func selectTestCase() *testCase { } func runTest() { - defer wg.Done() + defer wgTest.Done() db, e := sql.Open("taosSql", fmt.Sprintf("%s:%s@tcp(%s:%v)/%s", user, password, host, port, database)) if e != nil { fmt.Printf("failed to connect to database: %s\n", e.Error()) @@ -232,6 +234,9 @@ func runTest() { duration := time.Now().Sub(start).Microseconds() if e != nil { + if chLog != nil { + chLog <- str + ": " + e.Error() + } atomic.AddInt64(&stat.failed, 1) atomic.AddInt64(&stat.failedDuration, duration) } else { @@ -254,9 +259,8 @@ func getStatPrinter() func(tm time.Time) { current.failedDuration = atomic.LoadInt64(&stat.failedDuration) seconds := int64(tm.Sub(startAt).Seconds()) - format := "\033K %02v:%02v:%02v | TOTAL REQ | TOTAL TIME(us) | TOTAL AVG(us) | REQUEST | TIME(us) | AVERAGE(us) |\n" + format := "\033[47;30m %02v:%02v:%02v | TOTAL REQ | TOTAL TIME(us) | TOTAL AVG(us) | REQUEST | TIME(us) | AVERAGE(us) |\033[0m\n" fmt.Printf(format, seconds/3600, seconds%3600/60, seconds%60) - fmt.Println("-----------------------------------------------------------------------------------------------") tr := current.succeeded + current.failed td := current.succeededDuration + current.failedDuration @@ -269,7 +273,7 @@ func getStatPrinter() func(tm time.Time) { if r > 0 { a = float64(d) / float64(r) } - format = "\033[K TOTAL | %9v | %14v | %13.2f | %7v | %10v | % 13.2f |\n" + format = " TOTAL | %9v | %14v | %13.2f | %7v | %10v | % 13.2f |\n" fmt.Printf(format, tr, td, ta, r, d, a) tr = current.succeeded @@ -283,7 +287,7 @@ func getStatPrinter() func(tm time.Time) { if r > 0 { a = float64(d) / float64(r) } - format = "\033[K SUCCESS | \033[32m%9v\033[0m | \033[32m%14v\033[0m | \033[32m%13.2f\033[0m | \033[32m%7v\033[0m | \033[32m%10v\033[0m | \033[32m%13.2f\033[0m |\n" + format = " SUCCESS | \033[32m%9v\033[0m | \033[32m%14v\033[0m | \033[32m%13.2f\033[0m | \033[32m%7v\033[0m | \033[32m%10v\033[0m | \033[32m%13.2f\033[0m |\n" fmt.Printf(format, tr, td, ta, r, d, a) tr = current.failed @@ -297,7 +301,7 @@ func getStatPrinter() func(tm time.Time) { if r > 0 { a = float64(d) / float64(r) } - format = "\033[K FAIL | \033[31m%9v\033[0m | \033[31m%14v\033[0m | \033[31m%13.2f\033[0m | \033[31m%7v\033[0m | \033[31m%10v\033[0m | \033[31m%13.2f\033[0m |\n" + format = " FAIL | \033[31m%9v\033[0m | \033[31m%14v\033[0m | \033[31m%13.2f\033[0m | \033[31m%7v\033[0m | \033[31m%10v\033[0m | \033[31m%13.2f\033[0m |\n" fmt.Printf(format, tr, td, ta, r, d, a) last = current @@ -305,8 +309,35 @@ func getStatPrinter() func(tm time.Time) { } } +func startLogger(path string) error { + if len(path) == 0 { + return nil + } + + f, e := os.Create(path) + if e != nil { + return e + } + + chLog = make(chan string, 100) + wgLog.Add(1) + go func() { + for s := range chLog { + if f != nil { + f.WriteString(s) + f.WriteString("\n") + } + } + f.Close() + wgLog.Done() + }() + + return nil +} + func main() { var concurrency uint + var logPath string flag.StringVar(&host, "h", "localhost", "host name or IP address of TDengine server") flag.UintVar(&port, "P", 0, "port (default 0)") flag.StringVar(&database, "d", "test", "database name") @@ -314,8 +345,14 @@ func main() { flag.StringVar(&password, "p", "taosdata", "password") flag.BoolVar(&fetch, "f", true, "fetch result or not") flag.UintVar(&concurrency, "c", 4, "concurrency, number of goroutines for query") + flag.StringVar(&logPath, "l", "", "path of log file (default: no log)") flag.Parse() + if e := startLogger(logPath); e != nil { + fmt.Println("failed to open log file:", e.Error()) + return + } + pathOrSQL := flag.Arg(0) if len(pathOrSQL) == 0 { pathOrSQL = "cases.json" @@ -327,16 +364,14 @@ func main() { rand.Seed(time.Now().UnixNano()) - fmt.Println() - fmt.Printf("SERVER: %s DATABASE: %s CONCURRENCY: %d FETCH DATA: %v\n", host, database, concurrency, fetch) - fmt.Println() + fmt.Printf("\nSERVER: %s DATABASE: %s CONCURRENCY: %d FETCH DATA: %v\n\n", host, database, concurrency, fetch) startAt = time.Now() printStat := getStatPrinter() printStat(startAt) for i := uint(0); i < concurrency; i++ { - wg.Add(1) + wgTest.Add(1) go runTest() } @@ -352,16 +387,20 @@ LOOP: case <-interrupt: break LOOP case tm := <-ticker.C: - fmt.Print("\033[5A") + fmt.Print("\033[4A") printStat(tm) } } atomic.StoreInt64(&shouldStop, 1) fmt.Print("\033[100D'Ctrl + C' received, Waiting started query to stop...") + wgTest.Wait() - wg.Wait() - fmt.Print("\033[5A\033[100D") + if chLog != nil { + close(chLog) + wgLog.Wait() + } + fmt.Print("\033[4A\033[100D") printStat(time.Now()) fmt.Println() }