From 9a9afc0666548585123a7ed73058e0ff8eaad59c Mon Sep 17 00:00:00 2001 From: zhaoyanggh Date: Tue, 16 Aug 2022 12:06:35 +0800 Subject: [PATCH 1/5] docs: refine python tmq doc --- docs/examples/python/tmq_example.py | 63 +---------- docs/zh/07-develop/07-tmq.mdx | 157 ++++++++++++++++++++++++++++ 2 files changed, 162 insertions(+), 58 deletions(-) diff --git a/docs/examples/python/tmq_example.py b/docs/examples/python/tmq_example.py index 1f6da3d1b6..cee036454e 100644 --- a/docs/examples/python/tmq_example.py +++ b/docs/examples/python/tmq_example.py @@ -1,59 +1,6 @@ import taos -from taos.tmq import * - -conn = taos.connect() - -# create database -conn.execute("drop database if exists py_tmq") -conn.execute("create database if not exists py_tmq vgroups 2") - -# create table and stables -conn.select_db("py_tmq") -conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)") -conn.execute("create table if not exists tb1 using stb1 tags(1)") -conn.execute("create table if not exists tb2 using stb1 tags(2)") -conn.execute("create table if not exists tb3 using stb1 tags(3)") - -# create topic -conn.execute("drop topic if exists topic_ctb_column") -conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1") - -# set consumer configure options -conf = TaosTmqConf() -conf.set("group.id", "tg2") -conf.set("td.connect.user", "root") -conf.set("td.connect.pass", "taosdata") -conf.set("enable.auto.commit", "true") -conf.set("msg.with.table.name", "true") - -def tmq_commit_cb_print(tmq, resp, offset, param=None): - print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}") - -conf.set_auto_commit_cb(tmq_commit_cb_print, None) - -# build consumer -tmq = conf.new_consumer() - -# build topic list -topic_list = TaosTmqList() -topic_list.append("topic_ctb_column") - -# subscribe consumer -tmq.subscribe(topic_list) - -# check subscriptions -sub_list = tmq.subscription() -print("subscribed 
topics: ",sub_list) - -# start subscribe -while 1: - res = tmq.poll(1000) - if res: - topic = res.get_topic_name() - vg = res.get_vgroup_id() - db = res.get_db_name() - print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}") - for row in res: - print(row) - tb = res.get_table_name() - print(f"from table: {tb}") +from taos.tmq import TaosConsumer +consumer = TaosConsumer('topic_ctb_column', group_id='vg2') +for msg in consumer: + for row in msg: + print(row) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index f36f76fd85..23574e7478 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -88,6 +88,110 @@ void close() throws SQLException; ``` + + + +```python +class TaosConsumer(): + DEFAULT_CONFIG = { + 'group.id', + 'client.id', + 'enable.auto.commit', + 'auto.commit.interval.ms', + 'auto.offset.reset', + 'msg.with.table.name', + 'experimental.snapshot.enable', + 'enable.heartbeat.background', + 'experimental.snapshot.batch.size', + 'td.connect.ip', + 'td.connect.user', + 'td.connect.pass', + 'td.connect.port', + 'td.connect.db', + 'timeout' + } + + def __init__(self, *topics, **configs): + self._closed = True + self._conf = None + self._list = None + self._tmq = None + + keys = list(configs.keys()) + for k in keys: + configs.update({k.replace('_','.'): configs.pop(k)}) + + extra_configs = set(configs).difference(self.DEFAULT_CONFIG) + if extra_configs: + raise TmqError("Unrecognized configs: %s" % (extra_configs,)) + + self._conf = tmq_conf_new() + self._list = tmq_list_new() + + # set poll timeout + if 'timeout' in configs: + self._timeout = configs['timeout'] + del configs['timeout'] + else: + self._timeout = 0 + + # check if group id is set + + if 'group.id' not in configs: + raise TmqError("missing group.id in consumer config setting") + + for key, value in configs.items(): + tmq_conf_set(self._conf, key, value) + + self._tmq = tmq_consumer_new(self._conf) + + if not topics: + raise TmqError("Unset topic for 
Consumer") + + for topic in topics: + tmq_list_append(self._list, topic) + + tmq_subscribe(self._tmq, self._list) + + + def __iter__(self): + return self + + def __next__(self): + if not self._tmq: + raise StopIteration('TaosConsumer closed') + return next(self.sync_next()) + + def sync_next(self): + while 1: + res = tmq_consumer_poll(self._tmq, self._timeout) + if res: + break + yield TaosResult(res) + + def subscription(self): + if self._tmq is None: + return None + return tmq_subscription(self._tmq) + + def unsubscribe(self): + tmq_unsubscribe(self._tmq) + + def close(self): + if self._tmq: + tmq_consumer_close(self._tmq) + self._tmq = None + + def __del__(self): + if self._conf: + tmq_conf_destroy(self._conf) + if self._list: + tmq_list_destroy(self._list) + if self._tmq: + tmq_consumer_close(self._tmq) +``` + + ## 写入数据 @@ -230,6 +334,27 @@ public class MetersDeserializer extends ReferenceDeserializer { ``` + + + + +| 参数名称 | 类型 | 参数说明 | 备注 | +| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- | +| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | | +| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 | +| `client_id` | string | 客户端 ID | 最大长度:192。 | +| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) | +| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 | +| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | | +| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | +| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` | +| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` | + + + 上述配置中包括 consumer group ID,如果多个 consumer 指定的 
consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 @@ -262,6 +387,14 @@ consumer.subscribe(topics); + + +```python +consumer = TaosConsumer('topic_ctb_column', group_id='vg2') +``` + + + ## 消费 @@ -294,6 +427,17 @@ while(running){ ``` + + + +```python +for msg in consumer: + for row in msg: + print(row) +``` + + + ## 结束消费 @@ -322,6 +466,19 @@ consumer.unsubscribe(); consumer.close(); ``` + + + + + +```python +# 取消订阅 +consumer.unsubscribe() + +# 关闭消费 +consumer.close() +``` + + From e4f9a41f4d4f0d0bd0d55d61be1bf6ed031334fa Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 16 Aug 2022 15:03:20 +0800 Subject: [PATCH 2/5] fix(query): fix sample with partition by invalid pageId cause crash issue TD-17499 --- source/libs/function/src/builtinsimpl.c | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index de72c32fa1..cbf81f1d0d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -4918,6 +4918,16 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx) { return numOfElems; } +static SSampleInfo* getSampleOutputInfo(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + pInfo->data = (char*)pInfo + sizeof(SSampleInfo); + pInfo->tuplePos = (STuplePos*)((char*)pInfo + sizeof(SSampleInfo) + pInfo->samples * pInfo->colBytes); + + return pInfo; +} + bool getSampleFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); @@ -4972,7 +4982,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da int32_t sampleFunction(SqlFunctionCtx* pCtx) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + SSampleInfo* pInfo
= getSampleOutputInfo(pCtx); SInputColumnInfoData* pInput = &pCtx->input; @@ -4998,7 +5008,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo); + SSampleInfo* pInfo = getSampleOutputInfo(pCtx); pEntryInfo->complete = true; int32_t slotId = pCtx->pExpr->base.resSchema.slotId; From e60359336d0d7ed32c5198dd9215e61708343145 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:42:01 +0800 Subject: [PATCH 3/5] Update _linux_install.mdx --- docs/zh/14-reference/03-connector/_linux_install.mdx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh/14-reference/03-connector/_linux_install.mdx b/docs/zh/14-reference/03-connector/_linux_install.mdx index a4667caec9..c3ddff53cd 100644 --- a/docs/zh/14-reference/03-connector/_linux_install.mdx +++ b/docs/zh/14-reference/03-connector/_linux_install.mdx @@ -2,9 +2,9 @@ import PkgListV3 from "/components/PkgListV3"; 1. 下载客户端安装包 - + - [所有下载](https://www.taosdata.com/cn/all-downloads/) + [所有下载](../../releases) 2. 解压缩软件包 From 85c4994e7b0c6336e70065cb1f581b0a1866bc05 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 16 Aug 2022 15:42:32 +0800 Subject: [PATCH 4/5] Update _windows_install.mdx --- docs/zh/14-reference/03-connector/_windows_install.mdx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/zh/14-reference/03-connector/_windows_install.mdx b/docs/zh/14-reference/03-connector/_windows_install.mdx index 10cf37a7b4..9fdefa04c0 100644 --- a/docs/zh/14-reference/03-connector/_windows_install.mdx +++ b/docs/zh/14-reference/03-connector/_windows_install.mdx @@ -4,8 +4,7 @@ import PkgListV3 from "/components/PkgListV3"; - [所有下载](https://www.taosdata.com/cn/all-downloads/) - + [所有下载](../../releases) 2. 执行安装程序,按提示选择默认值,完成安装 3. 
安装路径 From dbf93120ab9a8e48dcef5286afc16fcc68c4ed4f Mon Sep 17 00:00:00 2001 From: Yang Zhao Date: Tue, 16 Aug 2022 15:46:51 +0800 Subject: [PATCH 5/5] Update 07-tmq.mdx --- docs/zh/07-develop/07-tmq.mdx | 100 ++++------------------------------ 1 file changed, 11 insertions(+), 89 deletions(-) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index d8966f7798..1ec1922c01 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -92,102 +92,21 @@ void close() throws SQLException; ```python class TaosConsumer(): - DEFAULT_CONFIG = { - 'group.id', - 'client.id', - 'enable.auto.commit', - 'auto.commit.interval.ms', - 'auto.offset.reset', - 'msg.with.table.name', - 'experimental.snapshot.enable', - 'enable.heartbeat.background', - 'experimental.snapshot.batch.size', - 'td.connect.ip', - 'td.connect.user', - 'td.connect.pass', - 'td.connect.port', - 'td.connect.db', - 'timeout' - } + def __init__(self, *topics, **configs) - def __init__(self, *topics, **configs): - self._closed = True - self._conf = None - self._list = None - self._tmq = None + def __iter__(self) - keys = list(configs.keys()) - for k in keys: - configs.update({k.replace('_','.'): configs.pop(k)}) + def __next__(self) - extra_configs = set(configs).difference(self.DEFAULT_CONFIG) - if extra_configs: - raise TmqError("Unrecognized configs: %s" % (extra_configs,)) - - self._conf = tmq_conf_new() - self._list = tmq_list_new() - - # set poll timeout - if 'timeout' in configs: - self._timeout = configs['timeout'] - del configs['timeout'] - else: - self._timeout = 0 - - # check if group id is set - - if 'group.id' not in configs: - raise TmqError("missing group.id in consumer config setting") - - for key, value in configs.items(): - tmq_conf_set(self._conf, key, value) - - self._tmq = tmq_consumer_new(self._conf) - - if not topics: - raise TmqError("Unset topic for Consumer") - - for topic in topics: - tmq_list_append(self._list, topic) - - 
tmq_subscribe(self._tmq, self._list) - - - def __iter__(self): - return self - - def __next__(self): - if not self._tmq: - raise StopIteration('TaosConsumer closed') - return next(self.sync_next()) - - def sync_next(self): - while 1: - res = tmq_consumer_poll(self._tmq, self._timeout) - if res: - break - yield TaosResult(res) + def sync_next(self) - def subscription(self): - if self._tmq is None: - return None - return tmq_subscription(self._tmq) + def subscription(self) - def unsubscribe(self): - tmq_unsubscribe(self._tmq) + def unsubscribe(self) - def close(self): - if self._tmq: - tmq_consumer_close(self._tmq) - self._tmq = None + def close(self) - def __del__(self): - if self._conf: - tmq_conf_destroy(self._conf) - if self._list: - tmq_list_destroy(self._list) - if self._tmq: - tmq_consumer_close(self._tmq) + def __del__(self) ``` @@ -354,6 +273,8 @@ public class MetersDeserializer extends ReferenceDeserializer { +Python 使用以下配置项创建一个 Consumer 实例。 + | 参数名称 | 类型 | 参数说明 | 备注 | | :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- | | `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | | @@ -368,6 +289,7 @@ public class MetersDeserializer extends ReferenceDeserializer { | `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | | `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` | | `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` | +| `timeout` | int | 消费者拉取的超时时间 | |