81 lines
1.7 KiB
Go
81 lines
1.7 KiB
Go
package main
|
|
|
|
import (
|
|
"database/sql/driver"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"time"
|
|
|
|
taos "github.com/taosdata/driver-go/v2/af"
|
|
)
|
|
|
|
func Subscribe_check(topic taos.Subscriber, check int) bool {
|
|
count := 0
|
|
rows, err := topic.Consume()
|
|
defer func() { rows.Close(); time.Sleep(time.Second) }()
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
os.Exit(3)
|
|
}
|
|
for {
|
|
values := make([]driver.Value, 2)
|
|
err := rows.Next(values)
|
|
if err == io.EOF {
|
|
break
|
|
} else if err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
os.Exit(4)
|
|
}
|
|
count++
|
|
}
|
|
if count == check {
|
|
return false
|
|
} else {
|
|
return true
|
|
}
|
|
}
|
|
func main() {
|
|
ts := 1630461600000
|
|
db, err := taos.Open("127.0.0.1", "", "", "", 0)
|
|
if err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
os.Exit(1)
|
|
}
|
|
defer db.Close()
|
|
db.Exec("drop database if exists test")
|
|
db.Exec("create database if not exists test ")
|
|
db.Exec("use test")
|
|
db.Exec("create table test (ts timestamp ,level int)")
|
|
for i := 0; i < 10; i++ {
|
|
sqlcmd := fmt.Sprintf("insert into test values(%d,%d)", ts+i, i)
|
|
db.Exec(sqlcmd)
|
|
}
|
|
|
|
fmt.Println("consumption 01.")
|
|
topic, err := db.Subscribe(false, "test", "select ts, level from test", time.Second)
|
|
if Subscribe_check(topic, 10) {
|
|
os.Exit(3)
|
|
}
|
|
|
|
fmt.Println("consumption 02: no new rows inserted")
|
|
if Subscribe_check(topic, 0) {
|
|
os.Exit(3)
|
|
}
|
|
|
|
fmt.Println("consumption 03: after one new rows inserted")
|
|
sqlcmd := fmt.Sprintf("insert into test values(%d,%d)", ts+10, 10)
|
|
db.Exec(sqlcmd)
|
|
if Subscribe_check(topic, 1) {
|
|
os.Exit(3)
|
|
}
|
|
|
|
fmt.Println("consumption 04: keep progress and continue previous subscription")
|
|
topic.Unsubscribe(true)
|
|
topic, err = db.Subscribe(false, "test", "select ts, level from test", time.Second)
|
|
if Subscribe_check(topic, 0) {
|
|
os.Exit(3)
|
|
}
|
|
|
|
}
|