diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index 1ec1922c01..200c1258a9 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -126,8 +126,34 @@ func (c *Consumer) Subscribe(topics []string) error func (c *Consumer) Unsubscribe() error ``` - + + + +```C# +ConsumerBuilder(IEnumerable> config) + +virtual IConsumer Build() + +Consumer(ConsumerBuilder builder) + +void Subscribe(IEnumerable topics) + +void Subscribe(string topic) + +ConsumeResult Consume(int millisecondsTimeout) + +List Subscription() + +void Unsubscribe() + +void Commit(ConsumeResult consumerResult) + +void Close() + +``` + + ## 写入数据 @@ -343,6 +369,32 @@ if err != nil { ``` + + + +```C# +using TDengineTMQ; + +// 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、 +// 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数 +var cfg = new ConsumerConfig + { + EnableAutoCommit = "true" + AutoCommitIntervalMs = "1000" + GourpId = "TDengine-TMQ-C#", + TDConnectUser = "root", + TDConnectPasswd = "taosdata", + AutoOffsetReset = "earliest" + MsgWithTableName = "true", + TDConnectIp = "127.0.0.1", + TDConnectPort = "6030" + }; + +var consumer = new ConsumerBuilder(cfg).Build(); + +``` + + 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 @@ -389,6 +441,20 @@ if err != nil { + + + +```C# +// 创建订阅 topics 列表 +List topics = new List(); +topics.add("tmq_topic"); +// 启动订阅 +consumer.Subscribe(topics); +``` + + + + ```python consumer = TaosConsumer('topic_ctb_column', group_id='vg2') @@ -396,6 +462,7 @@ consumer = TaosConsumer('topic_ctb_column', group_id='vg2') + ## 消费 @@ -455,6 +522,23 @@ for { + + +```C# +// 消费数据 +while (true) +{ + var consumerRes = consumer.Consume(100); + // process ConsumeResult + ProcessMsg(consumerRes); + consumer.Commit(consumerRes); +} +``` + + + + + ## 结束消费 消费结束后,应当取消订阅。 @@ -498,6 +582,16 @@ consumer.Close() ``` + + +```C# +// 取消订阅 +consumer.Unsubscribe(); + +// 关闭消费 +consumer.Close(); + + ## 删除 *topic*