From 9c3bfb3890f05215b0b32ea8c3447125ab78fe71 Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Thu, 19 Oct 2023 13:59:41 +0800 Subject: [PATCH] docs(driver-go): update tmq auto.offset.reset configuration --- docs/en/07-develop/07-tmq.mdx | 4 +- docs/en/14-reference/03-connector/05-go.mdx | 88 ++++++++++++--------- docs/examples/go/sub/main.go | 34 ++++---- docs/zh/07-develop/07-tmq.md | 4 +- docs/zh/08-connector/20-go.mdx | 88 ++++++++++++--------- 5 files changed, 122 insertions(+), 96 deletions(-) diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index ecadb5a499..6b23956e8e 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -421,7 +421,7 @@ public class MetersDeserializer extends ReferenceDeserializer { ```go conf := &tmq.ConfigMap{ "group.id": "test", - "auto.offset.reset": "earliest", + "auto.offset.reset": "latest", "td.connect.ip": "127.0.0.1", "td.connect.user": "root", "td.connect.pass": "taosdata", @@ -510,7 +510,7 @@ var cfg = new ConsumerConfig GourpId = "TDengine-TMQ-C#", TDConnectUser = "root", TDConnectPasswd = "taosdata", - AutoOffsetReset = "earliest" + AutoOffsetReset = "latest" MsgWithTableName = "true", TDConnectIp = "127.0.0.1", TDConnectPort = "6030" diff --git a/docs/en/14-reference/03-connector/05-go.mdx b/docs/en/14-reference/03-connector/05-go.mdx index b3d4857d75..a0be7a4a02 100644 --- a/docs/en/14-reference/03-connector/05-go.mdx +++ b/docs/en/14-reference/03-connector/05-go.mdx @@ -794,7 +794,7 @@ The TDengine Go Connector supports subscription functionality with the following ```go consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ "group.id": "test", - "auto.offset.reset": "earliest", + "auto.offset.reset": "latest", "td.connect.ip": "127.0.0.1", "td.connect.user": "root", "td.connect.pass": "taosdata", @@ -870,6 +870,7 @@ package main import ( "fmt" "os" + "time" "github.com/taosdata/driver-go/v3/af" "github.com/taosdata/driver-go/v3/af/tmq" @@ -890,19 +891,16 @@ func main() { if err != nil { panic(err) } - if err != nil { - panic(err) - } consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ - "group.id": "test", - "auto.offset.reset": "earliest", - "td.connect.ip": "127.0.0.1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "td.connect.port": "6030", - "client.id": "test_tmq_client", - "enable.auto.commit": "false", - "msg.with.table.name": "true", + "group.id": "test", + "auto.offset.reset": "latest", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "td.connect.port": "6030", + "client.id": "test_tmq_client", + "enable.auto.commit": "false", + "msg.with.table.name": "true", }) if err != nil { panic(err) @@ -915,10 +913,16 @@ func main() { if err != nil { panic(err) } - _, err = db.Exec("insert into example_tmq.t1 values(now,1)") - if err != nil { - panic(err) - } + go func() { + for { + _, err = db.Exec("insert into example_tmq.t1 values(now,1)") + if err != nil { + panic(err) + } + time.Sleep(time.Millisecond * 100) + } + }() + for i := 0; i < 5; i++ { ev := consumer.Poll(500) if ev != nil { @@ -972,6 +976,7 @@ package main import ( "database/sql" "fmt" + "time" "github.com/taosdata/driver-go/v3/common" tmqcommon "github.com/taosdata/driver-go/v3/common/tmq" @@ -995,7 +1000,7 @@ func main() { "td.connect.pass": "taosdata", "group.id": "example", "client.id": "example_consumer", - "auto.offset.reset": "earliest", + "auto.offset.reset": "latest", }) if err != nil { panic(err) @@ -1004,29 +1009,34 @@ func main() { if err != nil { panic(err) } + + _, err = db.Exec("create table example_ws_tmq.t_all(ts timestamp," + + "c1 bool," + + "c2 tinyint," + + "c3 smallint," + + "c4 int," + + "c5 bigint," + + "c6 tinyint unsigned," + + "c7 smallint unsigned," + + "c8 int unsigned," + + "c9 bigint unsigned," + + "c10 float," + + "c11 double," + + "c12 binary(20)," + + "c13 nchar(20)" + + ")") + if err != nil { + panic(err) + } go func() { - _, err := db.Exec("create table example_ws_tmq.t_all(ts timestamp," + - "c1 bool," + - "c2 tinyint," + - "c3 smallint," + - "c4 int," + - "c5 bigint," + - "c6 tinyint unsigned," + - "c7 smallint unsigned," + - "c8 int unsigned," + - "c9 bigint unsigned," + - "c10 float," + - "c11 double," + - "c12 binary(20)," + - "c13 nchar(20)" + - ")") - if err != nil { - panic(err) - } - _, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')") - if err != nil { - panic(err) + for { + _, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')") + if err != nil { + panic(err) + } + time.Sleep(time.Millisecond * 100) } + }() for i := 0; i < 5; i++ { ev := consumer.Poll(500) diff --git a/docs/examples/go/sub/main.go b/docs/examples/go/sub/main.go index ed335cfdea..41cb33e94d 100644 --- a/docs/examples/go/sub/main.go +++ b/docs/examples/go/sub/main.go @@ -3,6 +3,7 @@ package main import ( "fmt" "os" + "time" "github.com/taosdata/driver-go/v3/af" "github.com/taosdata/driver-go/v3/af/tmq" @@ -27,15 +28,15 @@ func main() { panic(err) } consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ - "group.id": "test", - "auto.offset.reset": "earliest", - "td.connect.ip": "127.0.0.1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "td.connect.port": "6030", - "client.id": "test_tmq_client", - "enable.auto.commit": "false", - "msg.with.table.name": "true", + "group.id": "test", + "auto.offset.reset": "latest", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "td.connect.port": "6030", + "client.id": "test_tmq_client", + "enable.auto.commit": "false", + "msg.with.table.name": "true", }) if err != nil { panic(err) @@ -48,12 +49,17 @@ func main() { if err != nil { panic(err) } - _, err = db.Exec("insert into example_tmq.t1 values(now,1)") - if err != nil { - panic(err) - } + go func() { + for { + _, err = db.Exec("insert into example_tmq.t1 values(now,1)") + if err != nil { + panic(err) + } + time.Sleep(time.Microsecond * 100) + } + }() for i := 0; i < 5; i++ { - ev := consumer.Poll(0) + ev := consumer.Poll(500) if ev != nil { switch e := ev.(type) { case *tmqcommon.DataMessage: diff --git a/docs/zh/07-develop/07-tmq.md b/docs/zh/07-develop/07-tmq.md index 8e43631c9a..6fbdf11f3f 100644 --- a/docs/zh/07-develop/07-tmq.md +++ b/docs/zh/07-develop/07-tmq.md @@ -420,7 +420,7 @@ public class MetersDeserializer extends ReferenceDeserializer { ```go conf := &tmq.ConfigMap{ "group.id": "test", - "auto.offset.reset": "earliest", + "auto.offset.reset": "latest", "td.connect.ip": "127.0.0.1", "td.connect.user": "root", "td.connect.pass": "taosdata", @@ -511,7 +511,7 @@ var cfg = new ConsumerConfig GourpId = "TDengine-TMQ-C#", TDConnectUser = "root", TDConnectPasswd = "taosdata", - AutoOffsetReset = "earliest" + AutoOffsetReset = "latest" MsgWithTableName = "true", TDConnectIp = "127.0.0.1", TDConnectPort = "6030" diff --git a/docs/zh/08-connector/20-go.mdx b/docs/zh/08-connector/20-go.mdx index 90ef4d83ca..3994278eef 100644 --- a/docs/zh/08-connector/20-go.mdx +++ b/docs/zh/08-connector/20-go.mdx @@ -797,7 +797,7 @@ TDengine Go 连接器支持订阅功能,应用 API 如下: ```go consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ "group.id": "test", - "auto.offset.reset": "earliest", + "auto.offset.reset": "latest", "td.connect.ip": "127.0.0.1", "td.connect.user": "root", "td.connect.pass": "taosdata", @@ -873,6 +873,7 @@ package main import ( "fmt" "os" + "time" "github.com/taosdata/driver-go/v3/af" "github.com/taosdata/driver-go/v3/af/tmq" @@ -893,19 +894,16 @@ func main() { if err != nil { panic(err) } - if err != nil { - panic(err) - } consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ - "group.id": "test", - "auto.offset.reset": "earliest", - "td.connect.ip": "127.0.0.1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "td.connect.port": "6030", - "client.id": "test_tmq_client", - "enable.auto.commit": "false", - "msg.with.table.name": "true", + "group.id": "test", + "auto.offset.reset": "latest", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "td.connect.port": "6030", + "client.id": "test_tmq_client", + "enable.auto.commit": "false", + "msg.with.table.name": "true", }) if err != nil { panic(err) @@ -918,10 +916,16 @@ func main() { if err != nil { panic(err) } - _, err = db.Exec("insert into example_tmq.t1 values(now,1)") - if err != nil { - panic(err) - } + go func() { + for { + _, err = db.Exec("insert into example_tmq.t1 values(now,1)") + if err != nil { + panic(err) + } + time.Sleep(time.Millisecond * 100) + } + }() + for i := 0; i < 5; i++ { ev := consumer.Poll(500) if ev != nil { @@ -975,6 +979,7 @@ package main import ( "database/sql" "fmt" + "time" "github.com/taosdata/driver-go/v3/common" tmqcommon "github.com/taosdata/driver-go/v3/common/tmq" @@ -998,7 +1003,7 @@ func main() { "td.connect.pass": "taosdata", "group.id": "example", "client.id": "example_consumer", - "auto.offset.reset": "earliest", + "auto.offset.reset": "latest", }) if err != nil { panic(err) @@ -1007,29 +1012,34 @@ func main() { if err != nil { panic(err) } + + _, err = db.Exec("create table example_ws_tmq.t_all(ts timestamp," + + "c1 bool," + + "c2 tinyint," + + "c3 smallint," + + "c4 int," + + "c5 bigint," + + "c6 tinyint unsigned," + + "c7 smallint unsigned," + + "c8 int unsigned," + + "c9 bigint unsigned," + + "c10 float," + + "c11 double," + + "c12 binary(20)," + + "c13 nchar(20)" + + ")") + if err != nil { + panic(err) + } go func() { - _, err := db.Exec("create table example_ws_tmq.t_all(ts timestamp," + - "c1 bool," + - "c2 tinyint," + - "c3 smallint," + - "c4 int," + - "c5 bigint," + - "c6 tinyint unsigned," + - "c7 smallint unsigned," + - "c8 int unsigned," + - "c9 bigint unsigned," + - "c10 float," + - "c11 double," + - "c12 binary(20)," + - "c13 nchar(20)" + - ")") - if err != nil { - panic(err) - } - _, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')") - if err != nil { - panic(err) + for { + _, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')") + if err != nil { + panic(err) + } + time.Sleep(time.Millisecond * 100) } + }() for i := 0; i < 5; i++ { ev := consumer.Poll(500)