Merge pull request #7749 from taosdata/test/TD-6256-d
[TD-6256]<test> add subscribe case for go
This commit is contained in:
commit
2a5cbf770a
|
@ -17,5 +17,5 @@ go env -w GO111MODULE=on
|
|||
go env -w GOPROXY=https://goproxy.io,direct
|
||||
|
||||
bash ./case001/case001.sh $severIp $serverPort
|
||||
#bash ./case002/case002.sh $severIp $serverPort
|
||||
bash ./case002/case002.sh $severIp $serverPort
|
||||
#bash ./case003/case003.sh $severIp $serverPort
|
||||
|
|
|
@ -19,7 +19,7 @@ import (
|
|||
"database/sql"
|
||||
"flag"
|
||||
"fmt"
|
||||
_ "github.com/taosdata/driver-go/taosSql"
|
||||
_ "github.com/taosdata/driver-go/v2/taosSql"
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
|
@ -63,6 +63,7 @@ func main() {
|
|||
|
||||
url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
|
||||
// open connect to taos server
|
||||
fmt.Printf("url:%s",url)
|
||||
db, err := sql.Open(taosDriverName, url)
|
||||
if err != nil {
|
||||
log.Fatalf("Open database error: %s\n", err)
|
||||
|
@ -168,17 +169,18 @@ func insert_data(db *sql.DB, demot string) {
|
|||
|
||||
func select_data(db *sql.DB, demot string) {
|
||||
st := time.Now().Nanosecond()
|
||||
|
||||
fmt.Println(demot)
|
||||
rows, err := db.Query("select * from ? ", demot) // go text mode
|
||||
fmt.Println("end query",err)
|
||||
checkErr(err, "select db.Query")
|
||||
|
||||
fmt.Printf("%10s%s%8s %5s %9s%s %s %8s%s %7s%s %8s%s %4s%s %5s%s\n", " ", "ts", " ", "id", " ", "name", " ", "len", " ", "flag", " ", "notes", " ", "fv", " ", " ", "dv")
|
||||
var affectd int
|
||||
|
||||
//decoder := mahonia.NewDecoder("gbk") // 把原来ANSI格式的文本文件里的字符,用gbk进行解码。
|
||||
|
||||
fmt.Println("start next")
|
||||
for rows.Next() {
|
||||
var ts string
|
||||
var ts time.Time
|
||||
var name string
|
||||
var id int
|
||||
var len int8
|
||||
|
@ -188,6 +190,7 @@ func select_data(db *sql.DB, demot string) {
|
|||
var dv float64
|
||||
|
||||
err = rows.Scan(&ts, &id, &name, &len, &flag, ¬es, &fv, &dv)
|
||||
fmt.Println("rows:",err)
|
||||
checkErr(err, "select rows.Scan")
|
||||
|
||||
fmt.Printf("%s|\t", ts)
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
@echo off
|
||||
echo ==== start run cases001.go
|
||||
|
||||
del go.*
|
||||
go mod init demotest
|
||||
go build
|
||||
demotest.exe -h %1 -p %2
|
||||
cd ..
|
||||
|
|
@ -0,0 +1,81 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
taos "github.com/taosdata/driver-go/v2/af"
|
||||
)
|
||||
|
||||
func Subscribe_check(topic taos.Subscriber, check int) bool {
|
||||
count := 0
|
||||
rows, err := topic.Consume()
|
||||
defer func() { rows.Close(); time.Sleep(time.Second) }()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(3)
|
||||
}
|
||||
for {
|
||||
values := make([]driver.Value, 2)
|
||||
err := rows.Next(values)
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(4)
|
||||
}
|
||||
count++
|
||||
}
|
||||
if count == check {
|
||||
return false
|
||||
} else {
|
||||
return true
|
||||
}
|
||||
}
|
||||
func main() {
|
||||
ts := 1630461600000
|
||||
db, err := taos.Open("127.0.0.1", "", "", "", 0)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer db.Close()
|
||||
db.Exec("drop if exists database test")
|
||||
db.Exec("create if not exists database test")
|
||||
db.Exec("use test")
|
||||
db.Exec("drop if exists database test")
|
||||
db.Exec("create table test (ts timestamp ,level int)")
|
||||
for i := 0; i < 10; i++ {
|
||||
sqlcmd := fmt.Sprintf("insert into test values(%d,%d)", ts+i, i)
|
||||
db.Exec(sqlcmd)
|
||||
}
|
||||
|
||||
fmt.Println("consumption 01.")
|
||||
topic, err := db.Subscribe(false, "test", "select ts, level from test", time.Second)
|
||||
if Subscribe_check(topic, 10) {
|
||||
os.Exit(3)
|
||||
}
|
||||
|
||||
fmt.Println("consumption 02: no new rows inserted")
|
||||
if Subscribe_check(topic, 0) {
|
||||
os.Exit(3)
|
||||
}
|
||||
|
||||
fmt.Println("consumption 03: after one new rows inserted")
|
||||
sqlcmd := fmt.Sprintf("insert into test values(%d,%d)", ts+10, 10)
|
||||
db.Exec(sqlcmd)
|
||||
if Subscribe_check(topic, 1) {
|
||||
os.Exit(3)
|
||||
}
|
||||
|
||||
fmt.Println("consumption 04: keep progress and continue previous subscription")
|
||||
topic.Unsubscribe(true)
|
||||
topic, err = db.Subscribe(false, "test", "select ts, level from test", time.Second)
|
||||
if Subscribe_check(topic, 0) {
|
||||
os.Exit(3)
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
#!/bin/bash
|
||||
|
||||
echo "==== start run cases001.go"
|
||||
|
||||
set +e
|
||||
#set -x
|
||||
|
||||
script_dir="$(dirname $(readlink -f $0))"
|
||||
#echo "pwd: $script_dir, para0: $0"
|
||||
|
||||
#execName=$0
|
||||
#execName=`echo ${execName##*/}`
|
||||
#goName=`echo ${execName%.*}`
|
||||
|
||||
###### step 3: start build
|
||||
cd $script_dir
|
||||
rm -f go.*
|
||||
go mod init demotest > /dev/null 2>&1
|
||||
go mod tidy > /dev/null 2>&1
|
||||
go build > /dev/null 2>&1
|
||||
sleep 1s
|
||||
./demotest -h $1 -p $2
|
Loading…
Reference in New Issue