From 9a9afc0666548585123a7ed73058e0ff8eaad59c Mon Sep 17 00:00:00 2001 From: zhaoyanggh Date: Tue, 16 Aug 2022 12:06:35 +0800 Subject: [PATCH 01/18] docs: refine python tmq doc --- docs/examples/python/tmq_example.py | 63 +---------- docs/zh/07-develop/07-tmq.mdx | 157 ++++++++++++++++++++++++++++ 2 files changed, 162 insertions(+), 58 deletions(-) diff --git a/docs/examples/python/tmq_example.py b/docs/examples/python/tmq_example.py index 1f6da3d1b6..cee036454e 100644 --- a/docs/examples/python/tmq_example.py +++ b/docs/examples/python/tmq_example.py @@ -1,59 +1,6 @@ import taos -from taos.tmq import * - -conn = taos.connect() - -# create database -conn.execute("drop database if exists py_tmq") -conn.execute("create database if not exists py_tmq vgroups 2") - -# create table and stables -conn.select_db("py_tmq") -conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)") -conn.execute("create table if not exists tb1 using stb1 tags(1)") -conn.execute("create table if not exists tb2 using stb1 tags(2)") -conn.execute("create table if not exists tb3 using stb1 tags(3)") - -# create topic -conn.execute("drop topic if exists topic_ctb_column") -conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1") - -# set consumer configure options -conf = TaosTmqConf() -conf.set("group.id", "tg2") -conf.set("td.connect.user", "root") -conf.set("td.connect.pass", "taosdata") -conf.set("enable.auto.commit", "true") -conf.set("msg.with.table.name", "true") - -def tmq_commit_cb_print(tmq, resp, offset, param=None): - print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}") - -conf.set_auto_commit_cb(tmq_commit_cb_print, None) - -# build consumer -tmq = conf.new_consumer() - -# build topic list -topic_list = TaosTmqList() -topic_list.append("topic_ctb_column") - -# subscribe consumer -tmq.subscribe(topic_list) - -# check subscriptions -sub_list = tmq.subscription() -print("subscribed topics: ",sub_list) - -# start subscribe -while 1: - res = tmq.poll(1000) - if res: - topic = res.get_topic_name() - vg = res.get_vgroup_id() - db = res.get_db_name() - print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}") - for row in res: - print(row) - tb = res.get_table_name() - print(f"from table: {tb}") +from taos.tmq import TaosConsumer +consumer = TaosConsumer('topic_ctb_column', group_id='vg2') +for msg in consumer: + for row in msg: + print(row) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index f36f76fd85..23574e7478 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -88,6 +88,110 @@ void close() throws SQLException; ``` + + + +```python +class TaosConsumer(): + DEFAULT_CONFIG = { + 'group.id', + 'client.id', + 'enable.auto.commit', + 'auto.commit.interval.ms', + 'auto.offset.reset', + 'msg.with.table.name', + 'experimental.snapshot.enable', + 'enable.heartbeat.background', + 'experimental.snapshot.batch.size', + 'td.connect.ip', + 'td.connect.user', + 'td.connect.pass', + 'td.connect.port', + 'td.connect.db', + 'timeout' + } + + def __init__(self, *topics, **configs): + self._closed = True + self._conf = None + self._list = None + self._tmq = None + + keys = list(configs.keys()) + for k in keys: + configs.update({k.replace('_','.'): configs.pop(k)}) + + extra_configs = set(configs).difference(self.DEFAULT_CONFIG) + if extra_configs: + raise TmqError("Unrecognized configs: %s" % (extra_configs,)) + + self._conf = tmq_conf_new() + self._list = tmq_list_new() + + # set poll timeout + if 'timeout' in configs: + self._timeout = configs['timeout'] + del configs['timeout'] + else: + self._timeout = 0 + + # check if group id is set + + if 'group.id' not in configs: + raise TmqError("missing group.id in consumer config setting") + + for key, value in configs.items(): + tmq_conf_set(self._conf, key, value) + + self._tmq = tmq_consumer_new(self._conf) + + if not topics: + raise TmqError("Unset topic for Consumer") + + for topic in topics: + tmq_list_append(self._list, topic) + + tmq_subscribe(self._tmq, self._list) + + + def __iter__(self): + return self + + def __next__(self): + if not self._tmq: + raise StopIteration('TaosConsumer closed') + return next(self.sync_next()) + + def sync_next(self): + while 1: + res = tmq_consumer_poll(self._tmq, self._timeout) + if res: + break + yield TaosResult(res) + + def subscription(self): + if self._tmq is None: + return None + return tmq_subscription(self._tmq) + + def unsubscribe(self): + tmq_unsubscribe(self._tmq) + + def close(self): + if self._tmq: + tmq_consumer_close(self._tmq) + self._tmq = None + + def __del__(self): + if self._conf: + tmq_conf_destroy(self._conf) + if self._list: + tmq_list_destroy(self._list) + if self._tmq: + tmq_consumer_close(self._tmq) +``` + + ## 写入数据 @@ -230,6 +334,27 @@ public class MetersDeserializer extends ReferenceDeserializer { ``` + + + + +| 参数名称 | 类型 | 参数说明 | 备注 | +| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- | +| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | | +| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 | +| `client_id` | string | 客户端 ID | 最大长度:192。 | +| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) | +| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 | +| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | | +| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | +| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` | +| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` | + + + 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 @@ -262,6 +387,14 @@ consumer.subscribe(topics); + + +```python +consumer = TaosConsumer('topic_ctb_column', group_id='vg2') +``` + + + ## 消费 @@ -294,6 +427,17 @@ while(running){ ``` + + + +```python +for msg in consumer: + for row in msg: + print(row) +``` + + + ## 结束消费 @@ -322,6 +466,19 @@ consumer.unsubscribe(); consumer.close(); ``` + + + + + +```python +/* 取消订阅 */ +consumer.unsubscribe(); + +/* 关闭消费 */ +consumer.close(); +``` + From 4b85d81d373ad91d497127b65292049175d79b10 Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Tue, 16 Aug 2022 14:31:42 +0800 Subject: [PATCH 02/18] docs(TMQ): improve tmq document --- docs/zh/07-develop/07-tmq.mdx | 105 ++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index f36f76fd85..aa599c2173 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -87,6 +87,25 @@ void commitSync() throws SQLException; void close() throws SQLException; ``` + + + +```go +func NewConsumer(conf *Config) (*Consumer, error) + +func (c *Consumer) Close() error + +func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error + +func (c *Consumer) FreeMessage(message unsafe.Pointer) + +func (c *Consumer) Poll(timeout time.Duration) (*Result, error) + +func (c *Consumer) Subscribe(topics []string) error + +func (c *Consumer) Unsubscribe() error +``` + @@ -229,6 +248,56 @@ public class MetersDeserializer extends ReferenceDeserializer { } ``` + + + +```go +config := tmq.NewConfig() +defer config.Destroy() +err = config.SetGroupID("test") +if err != nil { + panic(err) +} +err = config.SetAutoOffsetReset("earliest") +if err != nil { + panic(err) +} +err = config.SetConnectIP("127.0.0.1") +if err != nil { + panic(err) +} +err = config.SetConnectUser("root") +if err != nil { + panic(err) +} +err = config.SetConnectPass("taosdata") +if err != nil { + panic(err) +} +err = config.SetConnectPort("6030") +if err != nil { + panic(err) +} +err = config.SetMsgWithTableName(true) +if err != nil { + panic(err) +} +err = config.EnableHeartBeat() +if err != nil { + panic(err) +} +err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) { + if result.ErrCode != 0 { + errStr := wrapper.TMQErr2Str(result.ErrCode) + err := errors.NewError(int(result.ErrCode), errStr) + panic(err) + } +}) +if err != nil { + panic(err) +} +``` + @@ -260,6 +329,20 @@ topics.add("tmq_topic"); consumer.subscribe(topics); ``` + + + +```go +consumer, err := tmq.NewConsumer(config) +if err != nil { + panic(err) +} +err = consumer.Subscribe([]string{"example_tmq_topic"}) +if err != nil { + panic(err) +} +``` + @@ -293,6 +376,21 @@ while(running){ } ``` + + + +```go +for { + result, err := consumer.Poll(time.Second) + if err != nil { + panic(err) + } + fmt.Println(result) + consumer.Commit(context.Background(), result.Message) + consumer.FreeMessage(result.Message) +} +``` + @@ -322,6 +420,13 @@ consumer.unsubscribe(); consumer.close(); ``` + + + +```go +consumer.Close() +``` + From a325c5e7f2b5cf373cc5c265e2ca3d332b4fe568 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 14:58:37 +0800 Subject: [PATCH 03/18] feat: add download --- docs/zh/05-get-started/03-package.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 846cd9e9cd..4a5329323d 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -5,6 +5,7 @@ title: 使用安装包立即开始 import Tabs from "@theme/Tabs"; import TabItem from "@theme/TabItem"; +import PkgListV3 from "/components/PkgListV3"; 在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。您也可以[用Docker立即体验](../../get-started/docker/)。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. @@ -64,6 +65,8 @@ sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm + + 1. 从 [发布历史页面](../../releases) 下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz; 2. 进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本: @@ -85,6 +88,8 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问 + + 1. 从 [发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe; 2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。 From e5194f5f53cc44d45ad75c07b8627a8db427a990 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:03:17 +0800 Subject: [PATCH 04/18] Update 03-package.md --- docs/zh/05-get-started/03-package.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 4a5329323d..59991cc0c5 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -89,7 +89,7 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问 - + 1. 从 [发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe; 2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。 From e4f9a41f4d4f0d0bd0d55d61be1bf6ed031334fa Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 16 Aug 2022 15:03:20 +0800 Subject: [PATCH 05/18] fix(query): fix sample with partition by invalid pageId cause crash issue TD-17499 --- source/libs/function/src/builtinsimpl.c | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index de72c32fa1..cbf81f1d0d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -4918,6 +4918,16 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx) { return numOfElems; } +static SSampleInfo* getSampleOutputInfo(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + pInfo->data = (char*)pInfo + sizeof(SSampleInfo); + pInfo->tuplePos = (STuplePos*)((char*)pInfo + sizeof(SSampleInfo) + pInfo->samples * pInfo->colBytes); + + return pInfo; +} + bool getSampleFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); @@ -4972,7 +4982,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da int32_t sampleFunction(SqlFunctionCtx* pCtx) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + SSampleInfo* pInfo = getSampleOutputInfo(pCtx); SInputColumnInfoData* pInput = &pCtx->input; @@ -4998,7 +5008,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo); + SSampleInfo* pInfo = getSampleOutputInfo(pCtx); pEntryInfo->complete = true; int32_t slotId = pCtx->pExpr->base.resSchema.slotId; From f7a2d8f63c234de88285a3d1fc562a11ed471e6d Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:05:16 +0800 Subject: [PATCH 06/18] Update 03-package.md --- docs/zh/05-get-started/03-package.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 59991cc0c5..08b23b5025 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -89,7 +89,7 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问 - + 1. 从 [发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe; 2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。 From 1b86bd111692052dec19e4c57c0af058ba4fa8d5 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:10:07 +0800 Subject: [PATCH 07/18] Update 03-package.md --- docs/zh/05-get-started/03-package.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 08b23b5025..69424a2d18 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -65,9 +65,8 @@ sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm +1. 从列表中下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz; - -1. 从 [发布历史页面](../../releases) 下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz; 2. 进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本: ```bash @@ -81,6 +80,9 @@ sudo ./install.sh ``` :::info + +下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases) + install.sh 安装脚本在执行过程中,会通过命令行交互界面询问一些配置信息。如果希望采取无交互安装方式,那么可以用 -e no 参数来执行 install.sh 脚本。运行 `./install.sh -h` 指令可以查看所有参数的详细说明信息。 ::: From 5f04c27cc2761c8510d845b668d1a9434b1a2380 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:12:10 +0800 Subject: [PATCH 08/18] Update 03-package.md --- docs/zh/05-get-started/03-package.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 69424a2d18..bce7529989 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -82,7 +82,6 @@ sudo ./install.sh :::info 下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases) - install.sh 安装脚本在执行过程中,会通过命令行交互界面询问一些配置信息。如果希望采取无交互安装方式,那么可以用 -e no 参数来执行 install.sh 脚本。运行 `./install.sh -h` 指令可以查看所有参数的详细说明信息。 ::: From 400ff894174a0e9e8d8b6bc4caa79b7bea59ee46 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:15:23 +0800 Subject: [PATCH 09/18] Update 03-package.md --- docs/zh/05-get-started/03-package.md | 65 ++++++++++++++-------------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index bce7529989..b35872e7ae 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -11,36 +11,11 @@ import PkgListV3 from "/components/PkgListV3"; ## 安装 +:::info +下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases) +::: + - -可以使用 apt-get 工具从官方仓库安装。 - -**安装包仓库** - -```bash -wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add - -echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list -``` - -如果安装 Beta 版需要安装包仓库 - -```bash -wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add - -echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list -``` - -**使用 apt-get 命令安装** - -```bash -sudo apt-get update -apt-cache policy tdengine -sudo apt-get install tdengine -``` - -:::tip -apt-get 方式只适用于 Debian 或 Ubuntu 系统 -:::: - 1. 从 [发布历史页面](../../releases) 下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb; @@ -80,10 +55,7 @@ sudo ./install.sh ``` :::info - -下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases) install.sh 安装脚本在执行过程中,会通过命令行交互界面询问一些配置信息。如果希望采取无交互安装方式,那么可以用 -e no 参数来执行 install.sh 脚本。运行 `./install.sh -h` 指令可以查看所有参数的详细说明信息。 - ::: @@ -95,6 +67,35 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问 1. 从 [发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe; 2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。 + + +可以使用 apt-get 工具从官方仓库安装。 + +**安装包仓库** + +```bash +wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add - +echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list +``` + +如果安装 Beta 版需要安装包仓库 + +```bash +wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add - +echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list +``` + +**使用 apt-get 命令安装** + +```bash +sudo apt-get update +apt-cache policy tdengine +sudo apt-get install tdengine +``` + +:::tip +apt-get 方式只适用于 Debian 或 Ubuntu 系统 +:::: From 29f78522f8a123e40614f4ce88bbdb5f531e18ad Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:18:32 +0800 Subject: [PATCH 10/18] Update _windows_install.mdx --- docs/zh/14-reference/03-connector/_windows_install.mdx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh/14-reference/03-connector/_windows_install.mdx b/docs/zh/14-reference/03-connector/_windows_install.mdx index 755f96b2d7..10cf37a7b4 100644 --- a/docs/zh/14-reference/03-connector/_windows_install.mdx +++ b/docs/zh/14-reference/03-connector/_windows_install.mdx @@ -1,8 +1,8 @@ -import PkgList from "/components/PkgList"; +import PkgListV3 from "/components/PkgListV3"; 1. 下载客户端安装包 - + [所有下载](https://www.taosdata.com/cn/all-downloads/) From 397349156f97bd2765a67c78f50d8978e213b579 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:19:17 +0800 Subject: [PATCH 11/18] Update _linux_install.mdx --- docs/zh/14-reference/03-connector/_linux_install.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/14-reference/03-connector/_linux_install.mdx b/docs/zh/14-reference/03-connector/_linux_install.mdx index eb7f683288..a4667caec9 100644 --- a/docs/zh/14-reference/03-connector/_linux_install.mdx +++ b/docs/zh/14-reference/03-connector/_linux_install.mdx @@ -1,4 +1,4 @@ -import PkgList from "/components/PkgList"; +import PkgListV3 from "/components/PkgListV3"; 1. 下载客户端安装包 From 5b3f85ed3ba569a3cf867603d6747f991b5f5636 Mon Sep 17 00:00:00 2001 From: xleili Date: Tue, 16 Aug 2022 15:22:09 +0800 Subject: [PATCH 12/18] docs(driver):update C# 3.0 tmq doc in develop/tmq.mdx --- docs/zh/07-develop/07-tmq.mdx | 90 +++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index f36f76fd85..957a8bbdda 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -87,6 +87,31 @@ void commitSync() throws SQLException; void close() throws SQLException; ``` + + + +```C# +ConsumerBuilder(IEnumerable> config) + +virtual IConsumer Build() + +Consumer(ConsumerBuilder builder) + +void Subscribe(IEnumerable topics) + +void Subscribe(string topic) + +ConsumeResult Consume(int millisecondsTimeout) + +List Subscription() + +void Unsubscribe() + +void Commit(ConsumeResult consumerResult) + +void Close() +``` + @@ -229,6 +254,30 @@ public class MetersDeserializer extends ReferenceDeserializer { } ``` + + + +```C# +using TDengineTMQ; + +// 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、 +// 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数 +var cfg = new ConsumerConfig + { + EnableAutoCommit = "true" + AutoCommitIntervalMs = "1000" + GourpId = "TDengine-TMQ-C#", + TDConnectUser = "root", + TDConnectPasswd = "taosdata", + AutoOffsetReset = "earliest" + MsgWithTableName = "true", + TDConnectIp = "127.0.0.1", + TDConnectPort = "6030" + }; + +var consumer = new ConsumerBuilder(cfg).Build(); +``` + @@ -262,6 +311,18 @@ consumer.subscribe(topics); + + +```C# +// 创建订阅 topics 列表 +List topics = new List(); +topics.add("tmq_topic"); +// 启动订阅 +consumer.Subscribe(topics); +``` + + + ## 消费 @@ -296,6 +357,23 @@ while(running){ + + +```C# +// 消费数据 +while (true) +{ + var consumerRes = consumer.Consume(100); + // process ConsumeResult + ProcessMsg(consumerRes); + consumer.Commit(consumerRes); +} +``` + + + + + ## 结束消费 消费结束后,应当取消订阅。 @@ -322,6 +400,18 @@ consumer.unsubscribe(); consumer.close(); ``` + + + + +```C# +// 取消订阅 +consumer.Unsubscribe(); + +// 关闭消费 +consumer.Close(); +``` + From 0db3fe1e1e617e03435238e327a31e98c803b72d Mon Sep 17 00:00:00 2001 From: xleili Date: Tue, 16 Aug 2022 15:39:22 +0800 Subject: [PATCH 13/18] docs(driver):update markdown syntax mistake caused by merge --- docs/zh/07-develop/07-tmq.mdx | 105 ++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 49 deletions(-) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index 985b6a6ff2..fb308c5c40 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -89,6 +89,26 @@ void close() throws SQLException; + + +```go +func NewConsumer(conf *Config) (*Consumer, error) + +func (c *Consumer) Close() error + +func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error + +func (c *Consumer) FreeMessage(message unsafe.Pointer) + +func (c *Consumer) Poll(timeout time.Duration) (*Result, error) + +func (c *Consumer) Subscribe(topics []string) error + +func (c *Consumer) Unsubscribe() error + +``` + + ```C# @@ -111,27 +131,10 @@ void Unsubscribe() void Commit(ConsumeResult consumerResult) void Close() - - - - -```go -func NewConsumer(conf *Config) (*Consumer, error) - -func (c *Consumer) Close() error - -func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error - -func (c *Consumer) FreeMessage(message unsafe.Pointer) - -func (c *Consumer) Poll(timeout time.Duration) (*Result, error) - -func (c *Consumer) Subscribe(topics []string) error - -func (c *Consumer) Unsubscribe() error ``` + ## 写入数据 @@ -275,29 +278,6 @@ public class MetersDeserializer extends ReferenceDeserializer { - - -```C# -using TDengineTMQ; - -// 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、 -// 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数 -var cfg = new ConsumerConfig - { - EnableAutoCommit = "true" - AutoCommitIntervalMs = "1000" - GourpId = "TDengine-TMQ-C#", - TDConnectUser = "root", - TDConnectPasswd = "taosdata", - AutoOffsetReset = "earliest" - MsgWithTableName = "true", - TDConnectIp = "127.0.0.1", - TDConnectPort = "6030" - }; - -var consumer = new ConsumerBuilder(cfg).Build(); - - ```go @@ -348,6 +328,32 @@ if err != nil { ``` + + + +```C# +using TDengineTMQ; + +// 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、 +// 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数 +var cfg = new ConsumerConfig + { + EnableAutoCommit = "true" + AutoCommitIntervalMs = "1000" + GourpId = "TDengine-TMQ-C#", + TDConnectUser = "root", + TDConnectPasswd = "taosdata", + AutoOffsetReset = "earliest" + MsgWithTableName = "true", + TDConnectIp = "127.0.0.1", + TDConnectPort = "6030" + }; + +var consumer = new ConsumerBuilder(cfg).Build(); + +``` + + 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 @@ -500,6 +506,15 @@ consumer.close(); + + +```go +consumer.Close() + +``` + + + ```C# @@ -510,14 +525,6 @@ consumer.Unsubscribe(); consumer.Close(); - - -```go -consumer.Close() - -``` - - ## 删除 *topic* From e60359336d0d7ed32c5198dd9215e61708343145 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:42:01 +0800 Subject: [PATCH 14/18] Update _linux_install.mdx --- docs/zh/14-reference/03-connector/_linux_install.mdx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh/14-reference/03-connector/_linux_install.mdx b/docs/zh/14-reference/03-connector/_linux_install.mdx index a4667caec9..c3ddff53cd 100644 --- a/docs/zh/14-reference/03-connector/_linux_install.mdx +++ b/docs/zh/14-reference/03-connector/_linux_install.mdx @@ -2,9 +2,9 @@ import PkgListV3 from "/components/PkgListV3"; 1. 下载客户端安装包 - + - [所有下载](https://www.taosdata.com/cn/all-downloads/) + [所有下载](../../releases) 2. 解压缩软件包 From 85c4994e7b0c6336e70065cb1f581b0a1866bc05 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:42:32 +0800 Subject: [PATCH 15/18] Update _windows_install.mdx --- docs/zh/14-reference/03-connector/_windows_install.mdx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/zh/14-reference/03-connector/_windows_install.mdx b/docs/zh/14-reference/03-connector/_windows_install.mdx index 10cf37a7b4..9fdefa04c0 100644 --- a/docs/zh/14-reference/03-connector/_windows_install.mdx +++ b/docs/zh/14-reference/03-connector/_windows_install.mdx @@ -4,8 +4,7 @@ import PkgListV3 from "/components/PkgListV3"; - [所有下载](https://www.taosdata.com/cn/all-downloads/) - + [所有下载](../../releases) 2. 执行安装程序,按提示选择默认值,完成安装 3. 安装路径 From a0525216c0c3a6d49696b76b7f03f36f2b0e5bf5 Mon Sep 17 00:00:00 2001 From: xleili Date: Tue, 16 Aug 2022 15:44:51 +0800 Subject: [PATCH 16/18] docs(driver):update C# tmq document --- docs/zh/07-develop/07-tmq.mdx | 5 ----- 1 file changed, 5 deletions(-) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index fb308c5c40..c37a3c4737 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -88,7 +88,6 @@ void close() throws SQLException; ``` - ```go @@ -105,7 +104,6 @@ func (c *Consumer) Poll(timeout time.Duration) (*Result, error) func (c *Consumer) Subscribe(topics []string) error func (c *Consumer) Unsubscribe() error - ``` @@ -277,7 +275,6 @@ public class MetersDeserializer extends ReferenceDeserializer { ``` - ```go @@ -505,7 +502,6 @@ consumer.close(); ``` - ```go @@ -514,7 +510,6 @@ consumer.Close() ``` - ```C# From 093ff2bd54b50f41895151773872b9c1ee73069c Mon Sep 17 00:00:00 2001 From: xleili Date: Tue, 16 Aug 2022 15:46:07 +0800 Subject: [PATCH 17/18] docs(driver):update TMQ markdown doc --- docs/zh/07-develop/07-tmq.mdx | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index c37a3c4737..0945f69fcf 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -506,7 +506,6 @@ consumer.close(); ```go consumer.Close() - ``` From dbf93120ab9a8e48dcef5286afc16fcc68c4ed4f Mon Sep 17 00:00:00 2001 From: Yang Zhao Date: Tue, 16 Aug 2022 15:46:51 +0800 Subject: [PATCH 18/18] Update 07-tmq.mdx --- docs/zh/07-develop/07-tmq.mdx | 100 ++++------------------------------ 1 file changed, 11 insertions(+), 89 deletions(-) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index d8966f7798..1ec1922c01 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -92,102 +92,21 @@ void close() throws SQLException; ```python class TaosConsumer(): - DEFAULT_CONFIG = { - 'group.id', - 'client.id', - 'enable.auto.commit', - 'auto.commit.interval.ms', - 'auto.offset.reset', - 'msg.with.table.name', - 'experimental.snapshot.enable', - 'enable.heartbeat.background', - 'experimental.snapshot.batch.size', - 'td.connect.ip', - 'td.connect.user', - 'td.connect.pass', - 'td.connect.port', - 'td.connect.db', - 'timeout' - } + def __init__(self, *topics, **configs) - def __init__(self, *topics, **configs): - self._closed = True - self._conf = None - self._list = None - self._tmq = None + def __iter__(self) - keys = list(configs.keys()) - for k in keys: - configs.update({k.replace('_','.'): configs.pop(k)}) + def __next__(self) - extra_configs = set(configs).difference(self.DEFAULT_CONFIG) - if extra_configs: - raise TmqError("Unrecognized configs: %s" % (extra_configs,)) - - self._conf = tmq_conf_new() - self._list = tmq_list_new() - - # set poll timeout - if 'timeout' in configs: - self._timeout = configs['timeout'] - del configs['timeout'] - else: - self._timeout = 0 - - # check if group id is set - - if 'group.id' not in configs: - raise TmqError("missing group.id in consumer config setting") - - for key, value in configs.items(): - tmq_conf_set(self._conf, key, value) - - self._tmq = tmq_consumer_new(self._conf) - - if not topics: - raise TmqError("Unset topic for Consumer") - - for topic in topics: - tmq_list_append(self._list, topic) - - tmq_subscribe(self._tmq, self._list) - - - def __iter__(self): - return self - - def __next__(self): - if not self._tmq: - raise StopIteration('TaosConsumer closed') - return next(self.sync_next()) - - def sync_next(self): - while 1: - res = tmq_consumer_poll(self._tmq, self._timeout) - if res: - break - yield TaosResult(res) + def sync_next(self) - def subscription(self): - if self._tmq is None: - return None - return tmq_subscription(self._tmq) + def subscription(self) - def unsubscribe(self): - tmq_unsubscribe(self._tmq) + def unsubscribe(self) - def close(self): - if self._tmq: - tmq_consumer_close(self._tmq) - self._tmq = None + def close(self) - def __del__(self): - if self._conf: - tmq_conf_destroy(self._conf) - if self._list: - tmq_list_destroy(self._list) - if self._tmq: - tmq_consumer_close(self._tmq) + def __del__(self) ``` @@ -354,6 +273,8 @@ public class MetersDeserializer extends ReferenceDeserializer { +Python 使用以下配置项创建一个 Consumer 实例。 + | 参数名称 | 类型 | 参数说明 | 备注 | | :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- | | `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | | @@ -368,6 +289,7 @@ public class MetersDeserializer extends ReferenceDeserializer { | `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | | `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` | | `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` | +| `timeout` | int | 消费者拉去的超时时间 | |