diff --git a/docs/examples/python/tmq_example.py b/docs/examples/python/tmq_example.py
index 1f6da3d1b6..cee036454e 100644
--- a/docs/examples/python/tmq_example.py
+++ b/docs/examples/python/tmq_example.py
@@ -1,59 +1,6 @@
import taos
-from taos.tmq import *
-
-conn = taos.connect()
-
-# create database
-conn.execute("drop database if exists py_tmq")
-conn.execute("create database if not exists py_tmq vgroups 2")
-
-# create table and stables
-conn.select_db("py_tmq")
-conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
-conn.execute("create table if not exists tb1 using stb1 tags(1)")
-conn.execute("create table if not exists tb2 using stb1 tags(2)")
-conn.execute("create table if not exists tb3 using stb1 tags(3)")
-
-# create topic
-conn.execute("drop topic if exists topic_ctb_column")
-conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
-
-# set consumer configure options
-conf = TaosTmqConf()
-conf.set("group.id", "tg2")
-conf.set("td.connect.user", "root")
-conf.set("td.connect.pass", "taosdata")
-conf.set("enable.auto.commit", "true")
-conf.set("msg.with.table.name", "true")
-
-def tmq_commit_cb_print(tmq, resp, offset, param=None):
- print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
-
-conf.set_auto_commit_cb(tmq_commit_cb_print, None)
-
-# build consumer
-tmq = conf.new_consumer()
-
-# build topic list
-topic_list = TaosTmqList()
-topic_list.append("topic_ctb_column")
-
-# subscribe consumer
-tmq.subscribe(topic_list)
-
-# check subscriptions
-sub_list = tmq.subscription()
-print("subscribed topics: ",sub_list)
-
-# start subscribe
-while 1:
- res = tmq.poll(1000)
- if res:
- topic = res.get_topic_name()
- vg = res.get_vgroup_id()
- db = res.get_db_name()
- print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
- for row in res:
- print(row)
- tb = res.get_table_name()
- print(f"from table: {tb}")
+from taos.tmq import TaosConsumer
+consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
+for msg in consumer:
+ for row in msg:
+ print(row)
diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx
index 0945f69fcf..200c1258a9 100644
--- a/docs/zh/07-develop/07-tmq.mdx
+++ b/docs/zh/07-develop/07-tmq.mdx
@@ -87,6 +87,27 @@ void commitSync() throws SQLException;
void close() throws SQLException;
```
+
+
+
+```python
+class TaosConsumer():
+ def __init__(self, *topics, **configs)
+
+ def __iter__(self)
+
+ def __next__(self)
+
+ def sync_next(self)
+
+ def subscription(self)
+
+ def unsubscribe(self)
+
+ def close(self)
+
+ def __del__(self)
+```
@@ -275,6 +296,29 @@ public class MetersDeserializer extends ReferenceDeserializer {
```
+
+
+
+Python 使用以下配置项创建一个 Consumer 实例。
+
+| 参数名称 | 类型 | 参数说明 | 备注 |
+| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
+| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | |
+| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | |
+| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | |
+| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | |
+| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
+| `client_id` | string | 客户端 ID | 最大长度:192。 |
+| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
+| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 |
+| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | |
+| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
+| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` |
+| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` |
+| `timeout` | int | 消费者拉去的超时时间 | |
+
+
+
```go
@@ -397,6 +441,7 @@ if err != nil {
+
```C#
@@ -409,6 +454,15 @@ consumer.Subscribe(topics);
+
+
+```python
+consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
+```
+
+
+
+
## 消费
@@ -441,6 +495,16 @@ while(running){
```
+
+
+```python
+for msg in consumer:
+ for row in msg:
+ print(row)
+```
+
+
+
```go
@@ -502,6 +566,15 @@ consumer.close();
```
+
+
+
+```python
+/* 取消订阅 */
+consumer.unsubscribe();
+
+/* 关闭消费 */
+consumer.close();
```go
@@ -866,65 +939,15 @@ int main(int argc, char* argv[]) {
```python
+import taos
+from taos.tmq import TaosConsumer
+
import taos
from taos.tmq import *
-
-conn = taos.connect()
-
-# create database
-conn.execute("drop database if exists py_tmq")
-conn.execute("create database if not exists py_tmq vgroups 2")
-
-# create table and stables
-conn.select_db("py_tmq")
-conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
-conn.execute("create table if not exists tb1 using stb1 tags(1)")
-conn.execute("create table if not exists tb2 using stb1 tags(2)")
-conn.execute("create table if not exists tb3 using stb1 tags(3)")
-
-# create topic
-conn.execute("drop topic if exists topic_ctb_column")
-conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
-
-# set consumer configure options
-conf = TaosTmqConf()
-conf.set("group.id", "tg2")
-conf.set("td.connect.user", "root")
-conf.set("td.connect.pass", "taosdata")
-conf.set("enable.auto.commit", "true")
-conf.set("msg.with.table.name", "true")
-
-def tmq_commit_cb_print(tmq, resp, offset, param=None):
- print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
-
-conf.set_auto_commit_cb(tmq_commit_cb_print, None)
-
-# build consumer
-tmq = conf.new_consumer()
-
-# build topic list
-topic_list = TaosTmqList()
-topic_list.append("topic_ctb_column")
-
-# subscribe consumer
-tmq.subscribe(topic_list)
-
-# check subscriptions
-sub_list = tmq.subscription()
-print("subscribed topics: ",sub_list)
-
-# start subscribe
-while 1:
- res = tmq.poll(1000)
- if res:
- topic = res.get_topic_name()
- vg = res.get_vgroup_id()
- db = res.get_db_name()
- print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
- for row in res:
- print(row)
- tb = res.get_table_name()
- print(f"from table: {tb}")
+consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
+for msg in consumer:
+ for row in msg:
+ print(row)
```
diff --git a/docs/zh/14-reference/03-connector/_linux_install.mdx b/docs/zh/14-reference/03-connector/_linux_install.mdx
index a4667caec9..c3ddff53cd 100644
--- a/docs/zh/14-reference/03-connector/_linux_install.mdx
+++ b/docs/zh/14-reference/03-connector/_linux_install.mdx
@@ -2,9 +2,9 @@ import PkgListV3 from "/components/PkgListV3";
1. 下载客户端安装包
-
+
- [所有下载](https://www.taosdata.com/cn/all-downloads/)
+ [所有下载](../../releases)
2. 解压缩软件包
diff --git a/docs/zh/14-reference/03-connector/_windows_install.mdx b/docs/zh/14-reference/03-connector/_windows_install.mdx
index 10cf37a7b4..9fdefa04c0 100644
--- a/docs/zh/14-reference/03-connector/_windows_install.mdx
+++ b/docs/zh/14-reference/03-connector/_windows_install.mdx
@@ -4,8 +4,7 @@ import PkgListV3 from "/components/PkgListV3";
- [所有下载](https://www.taosdata.com/cn/all-downloads/)
-
+ [所有下载](../../releases)
2. 执行安装程序,按提示选择默认值,完成安装
3. 安装路径
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index de72c32fa1..cbf81f1d0d 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -4918,6 +4918,16 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx) {
return numOfElems;
}
+static SSampleInfo* getSampleOutputInfo(SqlFunctionCtx* pCtx) {
+ SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
+ SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
+
+ pInfo->data = (char*)pInfo + sizeof(SSampleInfo);
+ pInfo->tuplePos = (STuplePos*)((char*)pInfo + sizeof(SSampleInfo) + pInfo->samples * pInfo->colBytes);
+
+ return pInfo;
+}
+
bool getSampleFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
@@ -4972,7 +4982,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
int32_t sampleFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
- SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
+ SSampleInfo* pInfo = getSampleOutputInfo(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
@@ -4998,7 +5008,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
- SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo);
+ SSampleInfo* pInfo = getSampleOutputInfo(pCtx);
pEntryInfo->complete = true;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;