Merge pull request #23337 from taosdata/docs/xftan/TD-26829/3.0

docs(driver-go): update tmq auto.offset.reset configuration
This commit is contained in:
wade zhang 2023-10-19 14:43:47 +08:00 committed by GitHub
commit 69fad1c65e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 122 additions and 96 deletions

View File

@ -422,7 +422,7 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
```go ```go
conf := &tmq.ConfigMap{ conf := &tmq.ConfigMap{
"group.id": "test", "group.id": "test",
"auto.offset.reset": "earliest", "auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1", "td.connect.ip": "127.0.0.1",
"td.connect.user": "root", "td.connect.user": "root",
"td.connect.pass": "taosdata", "td.connect.pass": "taosdata",
@ -511,7 +511,7 @@ var cfg = new ConsumerConfig
GourpId = "TDengine-TMQ-C#", GourpId = "TDengine-TMQ-C#",
TDConnectUser = "root", TDConnectUser = "root",
TDConnectPasswd = "taosdata", TDConnectPasswd = "taosdata",
AutoOffsetReset = "earliest" AutoOffsetReset = "latest"
MsgWithTableName = "true", MsgWithTableName = "true",
TDConnectIp = "127.0.0.1", TDConnectIp = "127.0.0.1",
TDConnectPort = "6030" TDConnectPort = "6030"

View File

@ -794,7 +794,7 @@ The TDengine Go Connector supports subscription functionality with the following
```go ```go
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test", "group.id": "test",
"auto.offset.reset": "earliest", "auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1", "td.connect.ip": "127.0.0.1",
"td.connect.user": "root", "td.connect.user": "root",
"td.connect.pass": "taosdata", "td.connect.pass": "taosdata",
@ -870,6 +870,7 @@ package main
import ( import (
"fmt" "fmt"
"os" "os"
"time"
"github.com/taosdata/driver-go/v3/af" "github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq" "github.com/taosdata/driver-go/v3/af/tmq"
@ -890,19 +891,16 @@ func main() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
if err != nil {
panic(err)
}
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test", "group.id": "test",
"auto.offset.reset": "earliest", "auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1", "td.connect.ip": "127.0.0.1",
"td.connect.user": "root", "td.connect.user": "root",
"td.connect.pass": "taosdata", "td.connect.pass": "taosdata",
"td.connect.port": "6030", "td.connect.port": "6030",
"client.id": "test_tmq_client", "client.id": "test_tmq_client",
"enable.auto.commit": "false", "enable.auto.commit": "false",
"msg.with.table.name": "true", "msg.with.table.name": "true",
}) })
if err != nil { if err != nil {
panic(err) panic(err)
@ -915,10 +913,16 @@ func main() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
_, err = db.Exec("insert into example_tmq.t1 values(now,1)") go func() {
if err != nil { for {
panic(err) _, err = db.Exec("insert into example_tmq.t1 values(now,1)")
} if err != nil {
panic(err)
}
time.Sleep(time.Millisecond * 100)
}
}()
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
ev := consumer.Poll(500) ev := consumer.Poll(500)
if ev != nil { if ev != nil {
@ -972,6 +976,7 @@ package main
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"time"
"github.com/taosdata/driver-go/v3/common" "github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq" tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
@ -995,7 +1000,7 @@ func main() {
"td.connect.pass": "taosdata", "td.connect.pass": "taosdata",
"group.id": "example", "group.id": "example",
"client.id": "example_consumer", "client.id": "example_consumer",
"auto.offset.reset": "earliest", "auto.offset.reset": "latest",
}) })
if err != nil { if err != nil {
panic(err) panic(err)
@ -1004,29 +1009,34 @@ func main() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
_, err = db.Exec("create table example_ws_tmq.t_all(ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")")
if err != nil {
panic(err)
}
go func() { go func() {
_, err := db.Exec("create table example_ws_tmq.t_all(ts timestamp," + for {
"c1 bool," + _, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')")
"c2 tinyint," + if err != nil {
"c3 smallint," + panic(err)
"c4 int," + }
"c5 bigint," + time.Sleep(time.Millisecond * 100)
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")")
if err != nil {
panic(err)
}
_, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')")
if err != nil {
panic(err)
} }
}() }()
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
ev := consumer.Poll(500) ev := consumer.Poll(500)

View File

@ -3,6 +3,7 @@ package main
import ( import (
"fmt" "fmt"
"os" "os"
"time"
"github.com/taosdata/driver-go/v3/af" "github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq" "github.com/taosdata/driver-go/v3/af/tmq"
@ -27,15 +28,15 @@ func main() {
panic(err) panic(err)
} }
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test", "group.id": "test",
"auto.offset.reset": "earliest", "auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1", "td.connect.ip": "127.0.0.1",
"td.connect.user": "root", "td.connect.user": "root",
"td.connect.pass": "taosdata", "td.connect.pass": "taosdata",
"td.connect.port": "6030", "td.connect.port": "6030",
"client.id": "test_tmq_client", "client.id": "test_tmq_client",
"enable.auto.commit": "false", "enable.auto.commit": "false",
"msg.with.table.name": "true", "msg.with.table.name": "true",
}) })
if err != nil { if err != nil {
panic(err) panic(err)
@ -48,12 +49,17 @@ func main() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
_, err = db.Exec("insert into example_tmq.t1 values(now,1)") go func() {
if err != nil { for {
panic(err) _, err = db.Exec("insert into example_tmq.t1 values(now,1)")
} if err != nil {
panic(err)
}
time.Sleep(time.Microsecond * 100)
}
}()
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
ev := consumer.Poll(0) ev := consumer.Poll(500)
if ev != nil { if ev != nil {
switch e := ev.(type) { switch e := ev.(type) {
case *tmqcommon.DataMessage: case *tmqcommon.DataMessage:

View File

@ -421,7 +421,7 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
```go ```go
conf := &tmq.ConfigMap{ conf := &tmq.ConfigMap{
"group.id": "test", "group.id": "test",
"auto.offset.reset": "earliest", "auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1", "td.connect.ip": "127.0.0.1",
"td.connect.user": "root", "td.connect.user": "root",
"td.connect.pass": "taosdata", "td.connect.pass": "taosdata",
@ -512,7 +512,7 @@ var cfg = new ConsumerConfig
GourpId = "TDengine-TMQ-C#", GourpId = "TDengine-TMQ-C#",
TDConnectUser = "root", TDConnectUser = "root",
TDConnectPasswd = "taosdata", TDConnectPasswd = "taosdata",
AutoOffsetReset = "earliest" AutoOffsetReset = "latest"
MsgWithTableName = "true", MsgWithTableName = "true",
TDConnectIp = "127.0.0.1", TDConnectIp = "127.0.0.1",
TDConnectPort = "6030" TDConnectPort = "6030"

View File

@ -797,7 +797,7 @@ TDengine Go 连接器支持订阅功能,应用 API 如下:
```go ```go
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test", "group.id": "test",
"auto.offset.reset": "earliest", "auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1", "td.connect.ip": "127.0.0.1",
"td.connect.user": "root", "td.connect.user": "root",
"td.connect.pass": "taosdata", "td.connect.pass": "taosdata",
@ -873,6 +873,7 @@ package main
import ( import (
"fmt" "fmt"
"os" "os"
"time"
"github.com/taosdata/driver-go/v3/af" "github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq" "github.com/taosdata/driver-go/v3/af/tmq"
@ -893,19 +894,16 @@ func main() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
if err != nil {
panic(err)
}
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test", "group.id": "test",
"auto.offset.reset": "earliest", "auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1", "td.connect.ip": "127.0.0.1",
"td.connect.user": "root", "td.connect.user": "root",
"td.connect.pass": "taosdata", "td.connect.pass": "taosdata",
"td.connect.port": "6030", "td.connect.port": "6030",
"client.id": "test_tmq_client", "client.id": "test_tmq_client",
"enable.auto.commit": "false", "enable.auto.commit": "false",
"msg.with.table.name": "true", "msg.with.table.name": "true",
}) })
if err != nil { if err != nil {
panic(err) panic(err)
@ -918,10 +916,16 @@ func main() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
_, err = db.Exec("insert into example_tmq.t1 values(now,1)") go func() {
if err != nil { for {
panic(err) _, err = db.Exec("insert into example_tmq.t1 values(now,1)")
} if err != nil {
panic(err)
}
time.Sleep(time.Millisecond * 100)
}
}()
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
ev := consumer.Poll(500) ev := consumer.Poll(500)
if ev != nil { if ev != nil {
@ -975,6 +979,7 @@ package main
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"time"
"github.com/taosdata/driver-go/v3/common" "github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq" tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
@ -998,7 +1003,7 @@ func main() {
"td.connect.pass": "taosdata", "td.connect.pass": "taosdata",
"group.id": "example", "group.id": "example",
"client.id": "example_consumer", "client.id": "example_consumer",
"auto.offset.reset": "earliest", "auto.offset.reset": "latest",
}) })
if err != nil { if err != nil {
panic(err) panic(err)
@ -1007,29 +1012,34 @@ func main() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
_, err = db.Exec("create table example_ws_tmq.t_all(ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")")
if err != nil {
panic(err)
}
go func() { go func() {
_, err := db.Exec("create table example_ws_tmq.t_all(ts timestamp," + for {
"c1 bool," + _, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')")
"c2 tinyint," + if err != nil {
"c3 smallint," + panic(err)
"c4 int," + }
"c5 bigint," + time.Sleep(time.Millisecond * 100)
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")")
if err != nil {
panic(err)
}
_, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')")
if err != nil {
panic(err)
} }
}() }()
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
ev := consumer.Poll(500) ev := consumer.Poll(500)