From 4b85d81d373ad91d497127b65292049175d79b10 Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Tue, 16 Aug 2022 14:31:42 +0800 Subject: [PATCH] docs(TMQ): improve tmq document --- docs/zh/07-develop/07-tmq.mdx | 105 ++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index f36f76fd85..aa599c2173 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -87,6 +87,25 @@ void commitSync() throws SQLException; void close() throws SQLException; ``` + + + +```go +func NewConsumer(conf *Config) (*Consumer, error) + +func (c *Consumer) Close() error + +func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error + +func (c *Consumer) FreeMessage(message unsafe.Pointer) + +func (c *Consumer) Poll(timeout time.Duration) (*Result, error) + +func (c *Consumer) Subscribe(topics []string) error + +func (c *Consumer) Unsubscribe() error +``` + @@ -229,6 +248,56 @@ public class MetersDeserializer extends ReferenceDeserializer { } ``` + + + +```go +config := tmq.NewConfig() +defer config.Destroy() +err = config.SetGroupID("test") +if err != nil { + panic(err) +} +err = config.SetAutoOffsetReset("earliest") +if err != nil { + panic(err) +} +err = config.SetConnectIP("127.0.0.1") +if err != nil { + panic(err) +} +err = config.SetConnectUser("root") +if err != nil { + panic(err) +} +err = config.SetConnectPass("taosdata") +if err != nil { + panic(err) +} +err = config.SetConnectPort("6030") +if err != nil { + panic(err) +} +err = config.SetMsgWithTableName(true) +if err != nil { + panic(err) +} +err = config.EnableHeartBeat() +if err != nil { + panic(err) +} +err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) { + if result.ErrCode != 0 { + errStr := wrapper.TMQErr2Str(result.ErrCode) + err := errors.NewError(int(result.ErrCode), errStr) + panic(err) + } +}) +if err != nil { + panic(err) +} +``` + @@ -260,6 +329,20 @@ topics.add("tmq_topic"); consumer.subscribe(topics); ``` + + + +```go +consumer, err := tmq.NewConsumer(config) +if err != nil { + panic(err) +} +err = consumer.Subscribe([]string{"example_tmq_topic"}) +if err != nil { + panic(err) +} +``` + @@ -293,6 +376,21 @@ while(running){ } ``` + + + +```go +for { + result, err := consumer.Poll(time.Second) + if err != nil { + panic(err) + } + fmt.Println(result) + consumer.Commit(context.Background(), result.Message) + consumer.FreeMessage(result.Message) +} +``` + @@ -322,6 +420,13 @@ consumer.unsubscribe(); consumer.close(); ``` + + + +```go +consumer.Close() +``` +