Merge branch 'docs/wade-3.0-jdbc' of github.com:taosdata/TDengine into docs/wade-3.0-jdbc

This commit is contained in:
sheyanjie-qq 2024-08-01 21:16:19 +08:00
commit d72829aa46
8 changed files with 866 additions and 817 deletions

View File

@ -1,6 +1,6 @@
// ANCHOR: createConnect
const taos = require("@tdengine/websocket");
// ANCHOR: createConnect
async function createConnect() {
let dsn = 'ws://localhost:6041';
let conf = new taos.WSConfig(dsn);

View File

@ -1,19 +1,21 @@
import taos
def test_connection():
def create_connection():
# all parameters are optional.
# if database is specified,
# then it must exist.
conn = taos.connect(host="localhost",
port=6030,
user="root",
password="taosdata",
database="log")
print('client info:', conn.client_info)
print('server info:', conn.server_info)
conn.close()
conn = None
try:
conn = taosws.connect(
user="root",
password="taosdata",
host="192.168.1.98",
port=6041,
)
except Exception as err:
print(f'Exception {err}')
finally:
if conn:
conn.close()
if __name__ == "__main__":
test_connection()
create_connection()

View File

@ -1,26 +1,53 @@
# ANCHOR: connect
import taosws
conn = taosws.connect("taosws://root:taosdata@localhost:6041")
def create_connection():
conn = None
# ANCHOR: connect
try:
conn = taosws.connect(
user="root",
password="taosdata",
host="localhost",
port=6041,
)
except Exception as err:
print(f'Exception {err}')
# ANCHOR_END: connect
return conn
# ANCHOR: basic
conn.execute("drop database if exists connwspy")
conn.execute("create database if not exists connwspy wal_retention_period 3600 keep 36500 ")
conn.execute("use connwspy")
conn.execute("create table if not exists stb (ts timestamp, c1 int) tags (t1 int)")
conn.execute("create table if not exists tb1 using stb tags (1)")
conn.execute("insert into tb1 values (now, 1)")
conn.execute("insert into tb1 values (now+1s, 2)")
conn.execute("insert into tb1 values (now+2s, 3)")
def create_db_table(conn):
# ANCHOR: create_db
try:
conn.execute("CREATE DATABASE IF NOT EXISTS power")
conn.execute("USE power")
conn.execute("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
conn.execute("CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')")
except Exception as err:
print(f'Exception {err}')
# ANCHOR_END: create_db
r = conn.execute("select * from stb")
result = conn.query("select * from stb")
num_of_fields = result.field_count
print(num_of_fields)
def insert(conn):
sql = """
INSERT INTO
power.d1001 USING power.meters TAGS('California.SanFrancisco', 2)
VALUES (NOW + 1a, 10.30000, 219, 0.31000)
(NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3)
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
"""
try:
inserted = conn.execute(sql)
assert inserted == 8
except Exception as err:
print(f'Exception {err}')
for row in result:
print(row)
def query(conn):
result = conn.query("select * from stb")
num_of_fields = result.field_count
print(num_of_fields)
for row in result:
print(row)
# output:
# 3

View File

@ -1,14 +1,3 @@
```text
[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------------|---|-----------|-----------|------|------|------------|-----------------------|
| protocol | | username | password | host | port | database | params |
- **protocol**: Display using websocket protocol to establish connection.
- **username/password**: Database's username and password.
- **host/port**: Declare host and port. eg. `localhost:6041`
- **database**: Optional, use to specify database name.
- **params**: Other parameters. Like cloud Token.
```
```python
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
{{#include docs/examples/python/connect_example.py}}
```

View File

@ -97,18 +97,63 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
</TabItem>
<TabItem label="Python" value="python">
使用 `pip` 从 PyPI 安装:
- **安装前准备**
- 安装 Python。新近版本 taospy 包要求 Python 3.6.2+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) 安装。
- 安装 [pip](https://pypi.org/project/pip/)。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 [pip documentation](https://pip.pypa.io/en/stable/installation/) 安装。
- 如果使用原生连接,还需[安装客户端驱动](../#安装客户端驱动)。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。
```
pip install taospy
```
从 Git URL 安装:
```
pip install git+https://github.com/taosdata/taos-connector-python.git
```
- **使用 pip 安装**
- 卸载旧版本
如果以前安装过旧版本的 Python 连接器, 请提前卸载。
```
pip3 uninstall taos taospy
pip3 uninstall taos taos-ws-py
```
- 安装 `taospy`
- 最新版本
```
pip3 install taospy
```
- 指定某个特定版本安装
```
pip3 install taospy==2.3.0
```
- 从 GitHub 安装
```
pip3 install git+https://github.com/taosdata/taos-connector-python.git
```
:::note 此安装包为原生连接器
- 安装 `taos-ws-py`
```bash
pip3 install taos-ws-py
```
:::note 此安装包为 Websocket 连接器
- 同时安装 `taospy``taos-ws-py`
```bash
pip3 install taospy[ws]
```
- **安装验证**
<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接">
对于原生连接,需要验证客户端驱动和 Python 连接器本身是否都正确安装。如果能成功导入 `taos` 模块,则说明已经正确安装了客户端驱动和 Python 连接器。可在 Python 交互式 Shell 中输入:
```python
import taos
```
</TabItem>
<TabItem value="rest" label="REST 连接">
对于 REST 连接,只需验证是否能成功导入 `taosrest` 模块。可在 Python 交互式 Shell 中输入:
```python
import taosrest
```
</TabItem>
<TabItem value="ws" label="WebSocket 连接">
对于 WebSocket 连接,只需验证是否能成功导入 `taosws` 模块。可在 Python 交互式 Shell 中输入:
```python
import taosws
```
</TabItem>
</Tabs>
</TabItem>
<TabItem label="Go" value="go">
@ -149,23 +194,26 @@ taos = { version = "*", default-features = false, features = ["ws"] }
</TabItem>
<TabItem label="Node.js" value="node">
Node.js 连接器通过不同的包提供不同的连接方式。
- **安装前准备**
- 安装 Node.js 开发环境, 使用14以上版本。下载链接 https://nodejs.org/en/download/
1. 安装 Node.js 原生连接器
- **安装**
- 使用 npm 安装 Node.js 连接器
```
npm install @tdengine/websocket
```
:::note Node.js 目前只支持 Websocket 连接
- **安装验证**
- 新建安装验证目录,例如:`~/tdengine-test`,下载 GitHub 上 [nodejsChecker.js 源代码](https://github.com/taosdata/TDengine/tree/main/docs/examples/node/websocketexample/nodejsChecker.js)到本地。
- 在命令行中执行以下命令。
```bash
npm init -y
npm install @tdengine/websocket
node nodejsChecker.js
```
- 执行以上步骤后,在命令行会输出 nodeChecker.js 连接 TDengine 实例,并执行简单插入和查询的结果。
```
npm install @tdengine/client
```
:::note
推荐 Node 版本大于等于 `node-v12.8.0` 小于 `node-v13.0.0`
:::
2. 安装 Node.js REST 连接器
```
npm install @tdengine/rest
```
</TabItem>
<TabItem label="C#" value="csharp">
@ -303,10 +351,9 @@ URL 和 Properties 的详细参数说明和如何使用详见 [API 说明](../..
```
</TabItem>
<TabItem label="Python" value="python">
```python
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
```
```python
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
```
</TabItem>
<TabItem label="Go" value="go">
<ConnGo />
@ -315,9 +362,9 @@ URL 和 Properties 的详细参数说明和如何使用详见 [API 说明](../..
<ConnRust />
</TabItem>
<TabItem label="Node.js" value="node">
```js
{{#include docs/examples/node/websocketexample/sql_example.js:createConnect}}
```
```js
{{#include docs/examples/node/websocketexample/sql_example.js:createConnect}}
```
</TabItem>
<TabItem label="C#" value="csharp">
<ConnCSNative />
@ -376,7 +423,9 @@ URL 和 Properties 的详细参数说明和如何使用详见 [API 说明](../..
```
</TabItem>
<TabItem label="Python" value="python">
<ConnPythonNative />
```python
{{#include docs/examples/python/connect_rest_examples.py:connect}}
```
</TabItem>
<TabItem label="Go" value="go">
<ConnGo />

View File

@ -25,6 +25,8 @@ TDengine 对 SQL 语言提供了全面的支持,允许用户以熟悉的 SQL
</TabItem>
<TabItem label="Python" value="python">
- Websocket 连接
</TabItem>
<TabItem label="Go" value="go">
</TabItem>

View File

@ -99,507 +99,9 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对
|NCHAR|str|
|JSON|str|
## 安装步骤
## 示例程序汇总
### 安装前准备
1. 安装 Python。新近版本 taospy 包要求 Python 3.6.2+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) 安装。
2. 安装 [pip](https://pypi.org/project/pip/)。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 [pip documentation](https://pip.pypa.io/en/stable/installation/) 安装。
3. 如果使用原生连接,还需[安装客户端驱动](../#安装客户端驱动)。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。
### 使用 pip 安装
#### 卸载旧版本
如果以前安装过旧版本的 Python 连接器, 请提前卸载。
```
pip3 uninstall taos taospy
```
:::note
较早的 TDengine 客户端软件包含了 Python 连接器。如果从客户端软件的安装目录安装了 Python 连接器,那么对应的 Python 包名是 `taos`。 所以上述卸载命令包含了 `taos` 不存在也没关系。
:::
#### 安装 `taospy`
<Tabs>
<TabItem label="从 PyPI 安装" value="pypi">
安装最新版本
```
pip3 install taospy
```
也可以指定某个特定版本安装。
```
pip3 install taospy==2.3.0
```
</TabItem>
<TabItem label="从 GitHub 安装" value="github">
```
pip3 install git+https://github.com/taosdata/taos-connector-python.git
```
</TabItem>
</Tabs>
#### 安装 `taos-ws-py`(可选)
taos-ws-py 包提供了通过 WebSocket 连接 TDengine 的能力,可选安装 taos-ws-py 以获得 WebSocket 连接 TDengine 的能力。
##### 和 taospy 同时安装
```bash
pip3 install taospy[ws]
```
##### 单独安装
```bash
pip3 install taos-ws-py
```
### 安装验证
<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接">
对于原生连接,需要验证客户端驱动和 Python 连接器本身是否都正确安装。如果能成功导入 `taos` 模块,则说明已经正确安装了客户端驱动和 Python 连接器。可在 Python 交互式 Shell 中输入:
```python
import taos
```
</TabItem>
<TabItem value="rest" label="REST 连接">
对于 REST 连接,只需验证是否能成功导入 `taosrest` 模块。可在 Python 交互式 Shell 中输入:
```python
import taosrest
```
</TabItem>
<TabItem value="ws" label="WebSocket 连接">
对于 WebSocket 连接,只需验证是否能成功导入 `taosws` 模块。可在 Python 交互式 Shell 中输入:
```python
import taosws
```
</TabItem>
</Tabs>
:::tip
如果系统上有多个版本的 Python则可能有多个 `pip` 命令。要确保使用的 `pip` 命令路径是正确的。上面我们用 `pip3` 命令安装,排除了使用 Python 2.x 版本对应的 `pip` 的可能性。但是如果系统上有多个 Python 3.x 版本,仍需检查安装路径是否正确。最简单的验证方式是,在命令再次输入 `pip3 install taospy`, 就会打印出 `taospy` 的具体安装位置,比如在 Windows 上:
```
C:\> pip3 install taospy
Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Requirement already satisfied: taospy in c:\users\username\appdata\local\programs\python\python310\lib\site-packages (2.3.0)
```
:::
## 建立连接
### 连通性测试
在用连接器建立连接之前,建议先测试本地 TDengine CLI 到 TDengine 集群的连通性。
<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接">
请确保 TDengine 集群已经启动, 且集群中机器的 FQDN 如果启动的是单机版FQDN 默认为 hostname在本机能够解析, 可用 `ping` 命令进行测试:
```
ping <FQDN>
```
然后测试用 TDengine CLI 能否正常连接集群:
```
taos -h <FQDN> -p <PORT>
```
上面的 FQDN 可以为集群中任意一个 dnode 的 FQDN PORT 为这个 dnode 对应的 serverPort。
</TabItem>
<TabItem value="rest" label="REST 连接" groupId="connect">
对于 REST 连接, 除了确保集群已经启动,还要确保 taosAdapter 组件已经启动。可以使用如下 curl 命令测试:
```
curl -u root:taosdata http://<FQDN>:<PORT>/rest/sql -d "select server_version()"
```
上面的 FQDN 为运行 taosAdapter 的机器的 FQDN PORT 为 taosAdapter 配置的监听端口, 默认为 6041。
如果测试成功,会输出服务器版本信息,比如:
```json
{
"code": 0,
"column_meta": [
[
"server_version()",
"VARCHAR",
7
]
],
"data": [
[
"3.0.0.0"
]
],
"rows": 1
}
```
</TabItem>
<TabItem value="ws" label="WebSocket 连接" groupId="connect">
对于 WebSocket 连接, 除了确保集群已经启动,还要确保 taosAdapter 组件已经启动。可以使用如下 curl 命令测试:
```
curl -i -N -d "show databases" -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -H "Connection: Upgrade" -H "Upgrade: websocket" -H "Host: <FQDN>:<PORT>" -H "Origin: http://<FQDN>:<PORT>" http://<FQDN>:<PORT>/rest/sql
```
上面的 FQDN 为运行 taosAdapter 的机器的 FQDN PORT 为 taosAdapter 配置的监听端口, 默认为 6041。
如果测试成功,会输出服务器版本信息,比如:
```json
HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Date: Tue, 21 Mar 2023 09:29:17 GMT
Transfer-Encoding: chunked
{"status":"succ","head":["server_version()"],"column_meta":[["server_version()",8,8]],"data":[["2.6.0.27"]],"rows":1}
```
</TabItem>
</Tabs>
### 指定 Host 和 Properties 获取连接
以下示例代码假设 TDengine 安装在本机, 且 FQDN 和 serverPort 都使用了默认配置。
<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接" groupId="connect">
```python
{{#include docs/examples/python/connect_native_reference.py}}
```
`connect` 函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明:
- `host` 要连接的节点的 FQDN。 没有默认值。如果不同提供此参数,则会连接客户端配置文件中的 firstEP。
- `user` TDengine 用户名。 默认值是 root。
- `password` TDengine 用户密码。 默认值是 taosdata。
- `port` 要连接的数据节点的起始端口,即 serverPort 配置。默认值是 6030。只有在提供了 host 参数的时候,这个参数才生效。
- `config` 客户端配置文件路径。 在 Windows 系统上默认是 `C:\TDengine\cfg`。 在 Linux/macOS 系统上默认是 `/etc/taos/`。
- `timezone` 查询结果中 TIMESTAMP 类型的数据,转换为 python 的 datetime 对象时使用的时区。默认为本地时区。
:::warning
`config` 和 `timezone` 都是进程级别的配置。建议一个进程建立的所有连接都使用相同的参数值。否则可能产生无法预知的错误。
:::
:::tip
`connect` 函数返回 `taos.TaosConnection` 实例。 在客户端多线程的场景下,推荐每个线程申请一个独立的连接实例,而不建议多线程共享一个连接。
:::
</TabItem>
<TabItem value="rest" label="REST 连接">
```python
{{#include docs/examples/python/connect_rest_examples.py:connect}}
```
`connect()` 函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明:
- `url` taosAdapter REST 服务的 URL。默认是 \<http://localhost:6041>。
- `user` TDengine 用户名。默认是 root。
- `password` TDengine 用户密码。默认是 taosdata。
- `timeout`: HTTP 请求超时时间。单位为秒。默认为 `socket._GLOBAL_DEFAULT_TIMEOUT`。 一般无需配置。
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
```python
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
```
`connect()` 函数参数为连接 url协议为 `taosws` 或 `ws`
</TabItem>
</Tabs>
### 配置参数的优先级
如果配置参数在参数和客户端配置文件中有重复,则参数的优先级由高到低分别如下:
1. 连接参数
2. 使用原生连接时TDengine 客户端驱动的配置文件 taos.cfg
## 使用示例
### 创建数据库和表
<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接">
```python
{{#include docs/examples/python/create_db_native.py}}
```
</TabItem>
<TabItem value="rest" label="REST 连接">
```python
{{#include docs/examples/python/create_db_rest.py}}
```
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
```python
{{#include docs/examples/python/create_db_ws.py}}
```
</TabItem>
</Tabs>
### 插入数据
<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接">
```python
{{#include docs/examples/python/insert_native.py:insert}}
```
</TabItem>
<TabItem value="rest" label="REST 连接">
```python
{{#include docs/examples/python/insert_rest.py:insert}}
```
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
```python
{{#include docs/examples/python/insert_ws.py:insert}}
```
</TabItem>
</Tabs>
> NOW 为系统内部函数,默认为客户端所在计算机当前时间。
> `NOW + 1s` 代表客户端当前时间往后加 1 秒数字后面代表时间单位a(毫秒)s(秒)m(分)h(小时)d(天)w(周)n(月)y(年)。
### 查询数据
<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接">
`TaosConnection` 类的 `query` 方法可以用来查询数据,返回 `TaosResult` 类型的结果数据。
```python
{{#include docs/examples/python/insert_native.py:query}}
```
:::tip
查询结果只能获取一次。比如上面的示例中 `fetch_all()` 和 `fetch_all_into_dict()` 只能用一个。重复获取得到的结果为空列表。
:::
</TabItem>
<TabItem value="rest" label="REST 连接">
RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方法用于执行任意 SQL 语句, 并返回执行结果。
```python
{{#include docs/examples/python/insert_rest.py:query}}
```
对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
`TaosConnection` 类的 `query` 方法可以用来查询数据,返回 `TaosResult` 类型的结果数据。
```python
{{#include docs/examples/python/insert_ws.py:query}}
```
</TabItem>
</Tabs>
### 执行带有 reqId 的 SQL
<RequestId />
<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接">
类似上文介绍的使用方法,增加 `req_id` 参数。
```python
{{#include docs/examples/python/insert_native.py:req_id}}
```
</TabItem>
<TabItem value="rest" label="REST 连接">
类似上文介绍的使用方法,增加 `req_id` 参数。
```python
{{#include docs/examples/python/insert_rest.py:req_id}}
```
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
类似上文介绍的使用方法,增加 `req_id` 参数。
```python
{{#include docs/examples/python/insert_ws.py:req_id}}
```
</TabItem>
</Tabs>
### 通过参数绑定写入数据
TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 `?` 来代表待绑定的参数。
<Tabs>
<TabItem value="native" label="原生连接">
```python
{{#include docs/examples/python/stmt_native.py:stmt}}
```
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
```python
{{#include docs/examples/python/stmt_ws.py:stmt}}
```
</TabItem>
</Tabs>
### 无模式写入
连接器支持无模式写入功能。
<Tabs defaultValue="list">
<TabItem value="list" label="原生连接">
```python
{{#include docs/examples/python/schemaless_native.py}}
```
</TabItem>
<TabItem value="raw" label="WebSocket 连接">
```python
{{#include docs/examples/python/schemaless_ws.py}}
```
</TabItem>
</Tabs>
### 执行带有 reqId 的无模式写入
连接器的 `schemaless_insert` 和 `schemaless_insert_raw` 方法支持 `req_id` 可选参数,此 `req_id` 可用于请求链路追踪。
```python
conn.schemaless_insert(
lines=lineDemo,
protocol=taos.SmlProtocol.LINE_PROTOCOL,
precision=taos.SmlPrecision.NANO_SECONDS,
req_id=1,
)
```
### 数据订阅
连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅文档](../../develop/tmq/)。
#### 创建 Topic
```python
{{#include docs/examples/python/tmq_native.py:create_topic}}
```
#### 创建 Consumer
```python
{{#include docs/examples/python/tmq_native.py:create_consumer}}
```
#### 订阅 topics
```python
{{#include docs/examples/python/tmq_native.py:subscribe}}
```
#### 消费数据
```python
{{#include docs/examples/python/tmq_native.py:consume}}
```
#### 获取消费进度
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
```python
{{#include docs/examples/python/tmq_native.py:assignment}}
```
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。
```python
{{#include docs/examples/python/tmq_native.py:consume}}
```
#### 关闭订阅
消费结束后,应当取消订阅,并关闭 Consumer。
```python
{{#include docs/examples/python/tmq_native.py:unsubscribe}}
```
#### 完整示例
```python
{{#include docs/examples/python/tmq_native.py}}
```
### 更多示例程序
| 示例程序链接 | 示例程序内容 |
| 示例程序链接 | 示例程序内容 |
| ------------------------------------------------------------------------------------------------------------- | ----------------------- |
| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | 参数绑定, 一次绑定多行 |
| [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | 参数绑定,一次绑定一行 |
@ -607,24 +109,450 @@ Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位
| [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py) | 使用 JSON 类型的标签 |
| [tmq_consumer.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/tmq_consumer.py) | tmq 订阅 |
## 其它说明
### 关于纳秒 (nanosecond)
## 关于纳秒 (nanosecond)
由于目前 Python 对 nanosecond 支持的不完善(见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒Python 连接器可能会修改相关接口。
1. https://stackoverflow.com/questions/10611328/parsing-datetime-strings-containing-nanoseconds
2. https://www.python.org/dev/peps/pep-0564/
## 重要更新
[**Release Notes**](https://github.com/taosdata/taos-connector-python/releases)
## API 参考
- [taos](https://docs.taosdata.com/api/taospy/taos/)
- [taosrest](https://docs.taosdata.com/api/taospy/taosrest)
## 常见问题
欢迎[提问或报告问题](https://github.com/taosdata/taos-connector-python/issues)。
## API 参考
### WebSocket 连接
#### URL 规范
```text
[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------------|---|-----------|-----------|------|------|------------|-----------------------|
| protocol | | username | password | host | port | database | params |
```
- **protocol**: 使用 websocket 协议建立连接。例如`ws://localhost:6041`
- **username/password**: 数据库的用户名和密码。
- **host/port**: 主机地址和端口号。例如`localhost:6041`
- **database**: 数据库名称。
- **params**: 其他参数。 例如token。
#### 建立连接
- `fn connect(dsn: Option<&str>, args: Option<&PyDict>) -> PyResult<Connection>`
- **接口说明**:建立 taosAdapter 连接。
- **参数说明**
- `dsn`: 类型 `Option<&str>` 可选数据源名称DSN用于指定要连接的数据库的位置和认证信息。
- `args`: 类型 `Option<&PyDict>` 可选,以 Python 字典的形式提供, 可用于设置
- `user`: 数据库的用户名
- `password`: 数据库的密码。
- `host`: 主机地址
- `port`: 端口号
- `database`: 数据库名称
- **返回值**:连接对象。
- **异常**:操作失败抛出 `ConnectionError` 异常。
- `fn cursor(&self) -> PyResult<Cursor>`
- **接口说明**创建一个新的数据库游标对象用于执行SQL命令和查询。
- **返回值**:数据库游标对象。
- **异常**:操作失败抛出 `ConnectionError` 异常。
#### 执行SQL
- `fn execute(&self, sql: &str) -> PyResult<i32>`
- **接口说明**:执行 sql 语句。
- **参数说明**
- `sql`:待执行的 sql 语句。
- **返回值**:影响的条数。
- **异常**:操作失败抛出 `QueryError` 异常。
- `fn execute_with_req_id(&self, sql: &str, req_id: u64) -> PyResult<i32>`
- **接口说明**:执行带有 req_id 的 sql 语句。
- **参数说明**
- `sql`:待执行的 sql 语句。
- `reqId`: 用于问题追踪。
- **返回值**:影响的条数。
- **异常**:操作失败抛出 `QueryError` 异常。
- `fn query(&self, sql: &str) -> PyResult<TaosResult>`
- **接口说明**:查询数据。
- **参数说明**
- `sql`:待执行的 sql 语句。
- **返回值**`TaosResult` 数据集对象。
- **异常**:操作失败抛出 `QueryError` 异常。
- `fn query_with_req_id(&self, sql: &str, req_id: u64) -> PyResult<TaosResult>`
- **接口说明**:查询带有 req_id 的 sql 语句。
- **参数说明**
- `sql`:待执行的 sql 语句。
- `reqId`: 用于问题追踪。
- **返回值**`TaosResult` 数据集对象。
- **异常**:操作失败抛出 `QueryError` 异常。
#### 数据集
TaosResult 对象可以通过循环遍历获取查询到的数据。
- `fn fields(&self) -> Vec<TaosField>`
- **接口说明**:获取查询数据的字段信息, 包括:名称,类型及字段长度。
- **返回值**`Vec<TaosField>` 字段信息数组。
- `fn field_count(&self) -> i32`
- **接口说明**:获取查询到的记录条数。
- **返回值**`i32` 查询到的记录条数。
#### 无模式写入
- `fn schemaless_insert(&self, lines: Vec<String>, protocol: PySchemalessProtocol, precision: PySchemalessPrecision, ttl: i32, req_id: u64) -> PyResult<()>`
- **接口说明**:无模式写入。
- **参数说明**
- `lines`:待写入的数据数组,无模式具体的数据格式可参考 `Schemaless 写入`。
- `protocol`: 协议类型
- `PySchemalessProtocol::Line` InfluxDB 行协议Line Protocol)。
- `PySchemalessProtocol::Telnet`OpenTSDB 文本行协议。
- `PySchemalessProtocol::Json` JSON 协议格式
- `precision`: 时间精度
- `PySchemalessPrecision::Hour` 小时
- `PySchemalessPrecision::Minute`:分钟
- `PySchemalessPrecision::Second` 秒
- `PySchemalessPrecision::Millisecond`:毫秒
- `PySchemalessPrecision::Microsecond`:微秒
- `PySchemalessPrecision::Nanosecond` 纳秒
- `ttl`:表过期时间,单位天。
- `reqId`: 用于问题追踪。
- **异常**:操作失败抛出 `DataError` 或 `OperationalError` 异常。
#### 参数绑定
- `fn statement(&self) -> PyResult<TaosStmt>`
- **接口说明**:使用 连接 对象创建 stmt 对象。
- **返回值**stmt 对象。
- **异常**:操作失败抛出 `ConnectionError` 异常。
- `fn prepare(&mut self, sql: &str) -> PyResult<()>`
- **接口说明**:绑定预编译 sql 语句。
- **参数说明**
- `sql`: 预编译的 SQL 语句。
- **异常**:操作失败抛出 `ProgrammingError` 异常。
- `fn set_tbname(&mut self, table_name: &str) -> PyResult<()>`
- **接口说明**:设置将要写入数据的表名。
- **参数说明**
- `tableName`: 表名,如果需要指定数据库, 例如: `db_name.table_name` 即可。
- **异常**:操作失败抛出 `ProgrammingError` 异常。
- `fn set_tags(&mut self, tags: Vec<PyTagView>) -> PyResult<()>`
- **接口说明**:设置表 Tags 数据, 用于自动建表。
- **参数说明**
- `paramsArray`: Tags 数据。
- **异常**:操作失败抛出 `ProgrammingError` 异常。
- `fn bind_param(&mut self, params: Vec<PyColumnView>) -> PyResult<()>`
- **接口说明**:绑定数据。
- **参数说明**
- `paramsArray`: 绑定数据。
- **异常**:操作失败抛出 `ProgrammingError` 异常。
- `fn add_batch(&mut self) -> PyResult<()>`
- **接口说明**:提交绑定数据。
- **异常**:操作失败抛出 `ProgrammingError` 异常。
- `fn execute(&mut self) -> PyResult<usize>`
- **接口说明**:执行将绑定的数据全部写入。
- **返回值**:写入条数。
- **异常**:操作失败抛出 `QueryError` 异常。
- `fn affect_rows(&mut self) -> PyResult<usize>`
- **接口说明** 获取写入条数。
- **返回值**:写入条数。
- `fn close(&self) -> PyResult<()>`
- **接口说明** 关闭 stmt 对象。
#### 数据订阅
- **创建消费者支持属性列表**
- host主机地址。
- port端口号。
- group.id所在的 group。
- client.id客户端id。
- td.connect.user: 数据库用户名。
- td.connect.pass: 数据库密码。
- td.connect.token数据库的连接token。
- auto.offset.reset来确定消费位置为最新数据latest还是包含旧数据earliest
- enable.auto.commit是否允许自动提交。
- auto.commit.interval.ms自动提交间隔
- `fn Consumer(conf: Option<&PyDict>, dsn: Option<&str>) -> PyResult<Self>`
- **接口说明** 消费者构造函数。
- `conf`: 类型 `Option<&PyDict>` 可选,以 Python 字典的形式提供, 具体配置参见属性列表。
- `dsn`: 类型 `Option<&str>` 可选数据源名称DSN用于指定要连接的数据库的位置和认证信息。
- **返回值**Consumer 消费者对象。
- **异常**:操作失败抛出 `ConsumerException` 异常。
- `fn subscribe(&mut self, topics: &PyList) -> PyResult<()>`
- **接口说明** 订阅一组主题。
- **参数说明**
- `topics`: 订阅的主题列表。
- **异常**:操作失败抛出 `ConsumerException` 异常。
- `fn unsubscribe(&mut self)`
- **接口说明** 取消订阅。
- **异常**:操作失败抛出 `ConsumerException` 异常。
- `fn poll(&mut self, timeout: Option<f64>) -> PyResult<Option<Message>>`
- **接口说明** 轮询消息。
- **参数说明**
- `timeoutMs`: 表示轮询的超时时间,单位毫秒。
- **返回值**`Message` 每个主题对应的数据。
- **异常**:操作失败抛出 `ConsumerException` 异常。
- `fn commit(&mut self, message: &mut Message) -> PyResult<()>`
- **接口说明** 提交当前处理的消息的偏移量。
- **参数说明**
- `message`: 类型 `Message`, 当前处理的消息的偏移量。
- **异常**:操作失败抛出 `ConsumerException` 异常。
- `fn assignment(&mut self) -> PyResult<Option<Vec<TopicAssignment>>>`
- **接口说明**:获取消费者当前分配的指定的分区或所有分区。
- **返回值**:返回值类型为 `Vec<TopicAssignment>`,即消费者当前分配的所有分区。
- **异常**:操作失败抛出 ConsumerException 异常。
- `fn seek(&mut self, topic: &str, vg_id: i32, offset: i64) -> PyResult<()>`
- **接口说明**:将给定分区的偏移量设置到指定的位置。
- **参数说明**
- `topic`: 订阅的主题。
- `vg_id`: vgroupid。
- `offset`:需要设置的的偏移量。
- **异常**:操作失败抛出 ConsumerException 异常。
- `fn committed(&mut self, topic: &str, vg_id: i32) -> PyResult<i64>`
- **接口说明**获取订阅主题的vgroupid分区最后提交的偏移量。
- **参数说明**
- `topic`: 订阅的主题。
- `vg_id`: vgroupid。
- **返回值**`i64`,分区最后提交的偏移量。
- **异常**:操作失败抛出 ConsumerException 异常。
- `fn position(&mut self, topic: &str, vg_id: i32) -> PyResult<i64>`
- **接口说明**:获取给定分区当前的偏移量。
- **参数说明**
- `topic`: 订阅的主题。
- `vg_id`: vgroupid。
- **返回值**`i64`,分区最后提交的偏移量。
- **异常**:操作失败抛出 ConsumerException 异常。
- `fn close(&mut self)`
- **接口说明**:关闭 tmq 连接。
- **异常**:操作失败抛出 ConsumerException 异常。
### Native 连接
#### 建立连接
- `def connect(*args, **kwargs):`
- **接口说明**:建立 taosAdapter 连接。
- **参数说明**
- `kwargs`: 以 Python 字典的形式提供, 可用于设置
- `user`: 数据库的用户名
- `password`: 数据库的密码。
- `host`: 主机地址
- `port`: 端口号
- `database`: 数据库名称
- `timezone`: 时区
- **返回值**`TaosConnection` 连接对象。
- **异常**:操作失败抛出 `AttributeError` 或 `ConnectionError` 异常。
- `def cursor(self)`
- **接口说明**创建一个新的数据库游标对象用于执行SQL命令和查询。
- **返回值**:数据库游标对象。
#### 执行SQL
- `def execute(self, operation, req_id: Optional[int] = None)`
- **接口说明**:执行 sql 语句。
- **参数说明**
- `operation`:待执行的 sql 语句。
- `reqId`: 用于问题追踪。
- **返回值**:影响的条数。
- **异常**:操作失败抛出 `ProgrammingError` 异常。
- `def query(self, sql: str, req_id: Optional[int] = None) -> TaosResult`
- **接口说明**:查询数据。
- **参数说明**
- `sql`:待执行的 sql 语句。
- `reqId`: 用于问题追踪。
- **返回值**`TaosResult` 数据集对象。
- **异常**:操作失败抛出 `ProgrammingError` 异常。
#### 数据集
TaosResult 对象可以通过循环遍历获取查询到的数据。
- `def fields(&self)`
- **接口说明**:获取查询数据的字段信息, 包括:名称,类型及字段长度。
- **返回值**`TaosFields` 字段信息 list。
- `def field_count(&self)`
- **接口说明**:获取查询到的记录条数。
- **返回值**:查询到的记录条数。
- `def fetch_all_into_dict(self)`
- **接口说明**:将所有的记录转换为字典。
- **返回值**:返回字典列表。
#### 无模式写入
- `def schemaless_insert(&self, lines: List[str], protocol: SmlProtocol, precision: SmlPrecision, req_id: Optional[int] = None, ttl: Optional[int] = None) -> int:`
- **接口说明**:无模式写入。
- **参数说明**
- `lines`:待写入的数据数组,无模式具体的数据格式可参考 `Schemaless 写入`。
- `protocol`: 协议类型
- `SmlProtocol.LINE_PROTOCOL` InfluxDB 行协议Line Protocol)。
- `SmlProtocol.TELNET_PROTOCOL`OpenTSDB 文本行协议。
- `SmlProtocol.JSON_PROTOCOL` JSON 协议格式
- `precision`: 时间精度
- `SmlPrecision.Hour` 小时
- `SmlPrecision.Minute`:分钟
- `SmlPrecision.Second` 秒
- `SmlPrecision.Millisecond`:毫秒
- `SmlPrecision.Microsecond`:微秒
- `SmlPrecision.Nanosecond` 纳秒
- `ttl`:表过期时间,单位天。
- `reqId`: 用于问题追踪。
- **返回值**:影响的条数。
- **异常**:操作失败抛出 `SchemalessError` 异常。
#### 参数绑定
- `def statement(self, sql=None)`
- **接口说明**:使用连接对象创建 stmt 对象, 如果 sql 不空会进行调用 prepare。
- `sql`: 预编译的 SQL 语句。
- **返回值**stmt 对象。
- **异常**:操作失败抛出 `StatementError` 异常。
- `def prepare(self, sql)`
- **接口说明**:绑定预编译 sql 语句。
- **参数说明**
- `sql`: 预编译的 SQL 语句。
- **异常**:操作失败抛出 `StatementError` 异常。
- `def set_tbname(self, name)`
- **接口说明**:设置将要写入数据的表名。
- **参数说明**
- `name`: 表名,如果需要指定数据库, 例如: `db_name.table_name` 即可。
- **异常**:操作失败抛出 `StatementError` 异常。
- `def set_tbname_tags(self, name, tags):`
- **接口说明**:设置表和 Tags 数据, 用于自动建表。
- **参数说明**
- `name`: 表名,如果需要指定数据库, 例如: `db_name.table_name` 即可。
- `tags`: Tags 数据。
- **异常**:操作失败抛出 `StatementError` 异常。
- `def bind_param(self, params, add_batch=True)`
- **接口说明**:绑定一组数据并提交。
- **参数说明**
- `params`: 绑定数据。
- `add_batch`: 是否提交绑定数据。
- **异常**:操作失败抛出 `StatementError` 异常。
- `def bind_param_batch(self, binds, add_batch=True)`
- **接口说明**:绑定多组数据并提交。
- **参数说明**
- `binds`: 绑定数据。
- `add_batch`: 是否提交绑定数据。
- **异常**:操作失败抛出 `StatementError` 异常。
- `def add_batch(self)`
- **接口说明**:提交绑定数据。
- **异常**:操作失败抛出 `StatementError` 异常。
- `def execute(self)`
- **接口说明**:执行将绑定的数据全部写入。
- **异常**:操作失败抛出 `StatementError` 异常。
- `def affected_rows(self)`
- **接口说明** 获取写入条数。
- **返回值**:写入条数。
- `def close(&self)`
- **接口说明** 关闭 stmt 对象。
#### 数据订阅
- **创建消费者支持属性列表**
- td.connect.ip主机地址。
- td.connect.port端口号。
- group.id所在的 group。
- client.id客户端id。
- td.connect.user: 数据库用户名。
- td.connect.pass: 数据库密码。
- td.connect.token数据库的连接token。
- auto.offset.reset来确定消费位置为最新数据latest还是包含旧数据earliest
- enable.auto.commit是否允许自动提交。
- auto.commit.interval.ms自动提交间隔
- `def Consumer(configs)`
- **接口说明** 消费者构造函数。
- `configs`: Python 字典的形式提供, 具体配置参见属性列表。
- **返回值**Consumer 消费者对象。
- **异常**:操作失败抛出 `TmqError` 异常。
- `def subscribe(self, topics)`
- **接口说明** 订阅一组主题。
- **参数说明**
- `topics`: 订阅的主题列表。
- **异常**:操作失败抛出 `TmqError` 异常。
- `def unsubscribe(self)`
- **接口说明** 取消订阅。
- **异常**:操作失败抛出 `TmqError` 异常。
- `def poll(self, timeout: float = 1.0)`
- **接口说明** 轮询消息。
- **参数说明**
- `timeout`: 表示轮询的超时时间,单位毫秒。
- **返回值**`Message` 每个主题对应的数据。
- **异常**:操作失败抛出 `TmqError` 异常。
- `def commit(self, message: Message = None, offsets: [TopicPartition] = None)`
- **接口说明** 提交当前处理的消息的偏移量。
- **参数说明**
- `message`: 类型 `Message`, 当前处理的消息的偏移量。
- `offsets`: 类型 `[TopicPartition]`, 提交一批消息的偏移量。
- **异常**:操作失败抛出 `TmqError` 异常。
- `def assignment(self)`
- **接口说明**:获取消费者当前分配的指定的分区或所有分区。
- **返回值**:返回值类型为 `[TopicPartition]`,即消费者当前分配的所有分区。
- **异常**:操作失败抛出 TmqError 异常。
- `def seek(self, partition)`
- **接口说明**:将给定分区的偏移量设置到指定的位置。
- **参数说明**
- `partition`: 需要设置的的偏移量。
- `topic`: 订阅的主题
- `partition`: 分区
- `offset`: 偏移量
- **异常**:操作失败抛出 `TmqError` 异常。
- `def committed(self, partitions)`
- **接口说明**:获取订阅主题的分区最后提交的偏移量。
- **参数说明**
- `partition`: 需要设置的的偏移量。
- `topic`: 订阅的主题
- `partition`: 分区
- **返回值**`partition`,分区最后提交的偏移量。
- **异常**:操作失败抛出 `TmqError` 异常。
- `def position(self, partitions)`
- **接口说明**:获取给定分区当前的偏移量。
- **参数说明**
- `partition`: 需要设置的的偏移量。
- `topic`: 订阅的主题
- `partition`: 分区
- **返回值**`partition`,分区最后提交的偏移量。
- **异常**:操作失败抛出 TmqError 异常。
- `def close(self)`
- **接口说明**:关闭 tmq 连接。
- **异常**:操作失败抛出 TmqError 异常。
### REST 连接
- `def connect(**kwargs) -> TaosRestConnection`
- **接口说明**:建立 taosAdapter 连接。
- **参数说明**
- `kwargs`: 以 Python 字典的形式提供, 可用于设置
- `user`: 数据库的用户名
- `password`: 数据库的密码。
- `host`: 主机地址
- `port`: 端口号
- `database`: 数据库名称
- **返回值**:连接对象。
- **异常**:操作失败抛出 `ConnectError` 异常。
- `def execute(self, sql: str, req_id: Optional[int] = None) -> Optional[int]`
- **接口说明**:执行 sql 语句。
- **参数说明**
- `sql`:待执行的 sql 语句。
- `reqId`: 用于问题追踪。
- **返回值**:影响的条数。
- **异常**:操作失败抛出 `ConnectError` 或 `HTTPError` 异常。
- `def query(self, sql: str, req_id: Optional[int] = None) -> Result`
- **接口说明**:查询数据。
- **参数说明**
- `sql`:待执行的 sql 语句。
- `reqId`: 用于问题追踪。
- **返回值**`Result` 数据集对象。
- **异常**:操作失败抛出 `ConnectError` 或 `HTTPError` 异常。
- `RestClient(self, url: str, token: str = None, database: str = None, user: str = "root", password: str = "taosdata", timeout: int = None, convert_timestamp: bool = True, timezone: Union[str, datetime.tzinfo] = None)`
- **接口说明**:建立 taosAdapter 连接 client。
- **参数说明**
- `url`: taosAdapter REST 服务的 URL。默认是 \<http://localhost:6041>。
- `user`: 数据库的用户名。
- `password`: 数据库的密码。
- `database`: 数据库名称。
- `timezone`: 时区。
- `timeout`: HTTP 请求超时时间。单位为秒。
- `convert_timestamp`: 是否将时间戳从STR类型转换为datetime类型。
- `timezone`: 时区.
- **返回值**:连接对象。
- **异常**:操作失败抛出 `ConnectError` 异常。
- `def sql(self, q: str, req_id: Optional[int] = None) -> dict`
- **接口说明**:执行 sql 语句。
- **参数说明**
- `sql`:待执行的 sql 语句。
- `reqId`: 用于问题追踪。
- **返回值**:返回字典列表。
- **异常**:操作失败抛出 `ConnectError` 或 `HTTPError` 异常。

View File

@ -28,15 +28,6 @@ Node.js 连接器目前仅支持 Websocket 连接器, 其通过 taosAdapter
| :------------------: | :----------------------: | :----------------: |
| 3.1.0 | 新版本发布,支持 WebSocket 连接 | 3.2.0.0 及更高版本 |
## 支持的功能特性
- 连接管理
- SQL写入
- SQL查询
- 参数绑定
- 数据订阅
- 无模式写入
## 处理异常
在调用连接器 api 报错后,通过 try catch 可以获取到错误的信息和错误码。
@ -85,227 +76,6 @@ Node.js 连接器目前仅支持 Websocket 连接器, 其通过 taosAdapter
**注意**JSON 类型仅在 tag 中支持。
## 安装步骤
### 安装前准备
- 安装 Node.js 开发环境, 使用14以上版本。下载链接 https://nodejs.org/en/download/
### 使用 npm 安装 Node.js 连接器
```bash
npm install @tdengine/websocket
```
### 安装验证
验证方法:
- 新建安装验证目录,例如:`~/tdengine-test`,下载 GitHub 上 [nodejsChecker.js 源代码](https://github.com/taosdata/TDengine/tree/main/docs/examples/node/websocketexample/nodejsChecker.js)到本地。
- 在命令行中执行以下命令。
```bash
npm init -y
npm install @tdengine/websocket
node nodejsChecker.js
```
- 执行以上步骤后,在命令行会输出 nodeChecker.js 连接 TDengine 实例,并执行简单插入和查询的结果。
## 建立连接
安装并引用 `@tdengine/websocket` 包。
**注意**
- 链接器使用结束后,需要调用 taos.destroy() 释放连接器资源
```javascript
const taos = require("@tdengine/websocket");
//数据库操作......
taos.destroy();
```
```javascript
WSConfig配置Websocket参数如下:
getToken(): string | undefined | null;
setToken(token: string): void;
getUser(): string | undefined | null;
setUser(user: string): void;
getPwd(): string | undefined | null;
setPwd(pws: string): void;
getDb(): string | undefined | null;
setDb(db: string): void;
getUrl(): string;
setUrl(url: string): void;
setTimeOut(ms: number): void;
getTimeOut(): number | undefined | null;
```
```javascript
{{#include docs/examples/node/websocketexample/sql_example.js:createConnect}}
```
## 使用示例
### 创建数据库和表
```javascript
{{#include docs/examples/node/websocketexample/sql_example.js:create_db_and_table}}
```
> **注意**:如果不使用 `USE power` 指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 power.meters。
### 插入数据
```javascript
{{#include docs/examples/node/websocketexample/sql_example.js:insertData}}
```
> NOW 为系统内部函数,默认为客户端所在计算机当前时间。
> `NOW + 1s` 代表客户端当前时间往后加 1 秒数字后面代表时间单位a(毫秒)s(秒)m(分)h(小时)d(天)w(周)n(月)y(年)。
### 查询数据
```javascript
{{#include docs/examples/node/websocketexample/sql_example.js:queryData}}
```
> 查询到的数据
```javascript
wsRow:meta:=> [
{ name: 'ts', type: 'TIMESTAMP', length: 8 },
{ name: 'current', type: 'FLOAT', length: 4 },
{ name: 'voltage', type: 'INT', length: 4 },
{ name: 'phase', type: 'FLOAT', length: 4 },
{ name: 'location', type: 'VARCHAR', length: 64},
{ name: 'groupid', type: 'INT', length: 4 }
]
wsRow:data:=> [
[ 1714013737536n, 12.3, 221, 0.31, 'California.SanFrancisco', 3 ]
]
```
### 执行带有 reqId 的 SQL
<RequestId />
```javascript
{{#include docs/examples/node/websocketexample/sql_example.js:sqlWithReqid}}
```
### 通过参数绑定写入数据
TDengine 的 NodeJs 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 ? 来代表待绑定的参数。采用这种方式写入数据时,能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。
**注意**
- 预处理语句中指定数据库与子表名称不要使用 `db.?`,应直接使用 `?`,然后在 setTableName 中指定数据库,如:`stmt.setTableName("db.t1")`。
示例代码:
```javascript
{{#include docs/examples/node/websocketexample/stmt_example.js}}
```
用于设定 TAG/VALUES 数据列的取值的方法总共有:
```javascript
setBoolean(params: any[]): void;
setTinyInt(params: any[]): void;
setUTinyInt(params: any[]): void;
setSmallInt(params: any[]): void;
setUSmallInt(params: any[]): void;
setInt(params: any[]): void;
setUInt(params: any[]): void;
setBigint(params: any[]): void;
setUBigint(params: any[]): void;
setFloat(params: any[]): void;
setDouble(params: any[]): void;
setVarchar(params: any[]): void;
setBinary(params: any[]): void;
setNchar(params: any[]): void;
setJson(params: any[]): void;
setVarBinary(params: any[]): void;
setGeometry(params: any[]): void;
setTimestamp(params: any[]): void;
```
**注意**JSON 类型仅在 tag 中支持。
### 无模式写入
TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议Line Protocol、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../reference/schemaless/)。
```javascript
{{#include docs/examples/node/websocketexample/line_example.js}}
```
### 执行带有 reqId 的无模式写入
此 reqId 可用于请求链路追踪。
```javascript
await wsSchemaless.schemalessInsert([influxdbData], SchemalessProto.InfluxDBLineProtocol, Precision.NANO_SECONDS, ttl, reqId);
await wsSchemaless.schemalessInsert([telnetData], SchemalessProto.OpenTSDBTelnetLineProtocol, Precision.NANO_SECONDS, ttl, reqId);
await wsSchemaless.schemalessInsert([jsonData], SchemalessProto.OpenTSDBJsonFormatProtocol, Precision.NANO_SECONDS, ttl, reqId);
```
### 数据订阅
TDengine NodeJs 连接器支持订阅功能,应用 API 如下:
#### 创建 Topic
```javascript
{{#include docs/examples/node/websocketexample/tmq_example.js:create_topic}}
```
#### 创建 Consumer
```javascript
{{#include docs/examples/node/websocketexample/tmq_example.js:create_consumer}}
```
> 参数说明
- taos.TMQConstants.CONNECT_USER: 用户名。
- taos.TMQConstants.CONNECT_PASS: 密码。
- taos.TMQConstants.GROUP_ID: 所在的 group。
- taos.TMQConstants.CLIENT_ID: client id。
- taos.TMQConstants.WS_URL: taosAdapter 的url地址。
- taos.TMQConstants.AUTO_OFFSET_RESET: 来确定消费位置为最新数据latest还是包含旧数据earliest
- taos.TMQConstants.ENABLE_AUTO_COMMIT: 是否允许自动提交。
- taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS: 自动提交间隔。
- taos.TMQConstants.CONNECT_MESSAGE_TIMEOUT: 数据传输超时参数,单位 ms默认为 10000 ms。
其他参数请参考:[Consumer 参数列表](../../develop/tmq/#数据订阅相关参数) 注意 TDengine 服务端自3.2.0.0版本开始消息订阅中的 auto.offset.reset 默认值发生变化。
#### 订阅消费数据
```javascript
{{#include docs/examples/node/websocketexample/tmq_example.js:subscribe}}
```
#### 指定订阅 Offset
```javascript
{{#include docs/examples/node/websocketexample/tmq_example.js:assignment}}
```
#### 关闭订阅
```javascript
// 取消订阅
consumer.unsubscribe();
// 关闭消费
consumer.close()
// 释放连接器资源
taos.destroy();
```
详情请参考:[数据订阅](../../develop/tmq)
#### 完整示例
```javascript
{{#include docs/examples/node/websocketexample/tmq_example.js}}
```
## 更多示例程序
| 示例程序 | 示例程序描述 |
@ -330,4 +100,286 @@ taos.destroy();
**原因**:一般都是因为配置 FQDN 不正确。 可以参考[如何彻底搞懂 TDengine 的 FQDN](https://www.taosdata.com/blog/2021/07/29/2741.html) 。
## API 参考
Node.js 连接器(`@tdengine/websocket`, 其通过 taosAdapter 提供的 Websocket 接口连接 TDengine 实例。
### URL 规范
```text
[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------------|---|-----------|-----------|------|------|------------|-----------------------|
| protocol | | username | password | host | port | database | params |
```
- **protocol**: 使用 websocket 协议建立连接。例如`ws://localhost:6041`
- **username/password**: 数据库的用户名和密码。
- **host/port**: 主机地址和端口号。例如`localhost:6041`
- **database**: 数据库名称。
- **params**: 其他参数。 例如token。
- 完整 URL 示例:
```js
ws://root:taosdata@localhost:6041
```
### WSConfig
除了通过指定的 URL 获取连接,还可以使用 WSConfig 指定建立连接时的参数。
```js
try {
let url = 'ws://127.0.0.1:6041'
let conf = WsSql.NewConfig(url)
conf.setUser('root')
conf.setPwd('taosdata')
conf.setDb('db')
conf.setTimeOut(500)
let wsSql = await WsSql.open(conf);
} catch (e) {
console.error(e);
}
```
WSConfig 中的配置如下:
- setUrl(url string) 设置 taosAdapter 连接地址 url详见上文 URL 规范。
- setUser(user: string) 设置数据库用户名。
- setPwd(pws:string) 设置数据库密码。
- setDb(db: string) 设置数据库名称。
- setTimeOut(ms : number) 设置连接超时,单位毫秒。
- setToken(token: string) 设置 taosAdapter 认证token。
### 连接功能
- `static async open(wsConfig:WSConfig):Promise<WsSql>`
- **接口说明**:建立 taosAdapter 连接。
- **参数说明**
- `wsConfig`:连接配置,详见上文 WSConfig 章节。
- **返回值**:连接对象。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
- `destroyed()`
- **接口说明**:释放销毁资源。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
### 获取 taosc 版本号
- `async version(): Promise<string>`
- **接口说明**:获取 taosc 客户端版本。
- **返回值**taosc 客户端版本号。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
### 执行 SQL
- `async exec(sql: string, reqId?: number): Promise<TaosResult>`
- **接口说明**:执行 sql 语句。
- **参数说明**
- `sql`:待执行的 sql 语句。
- `reqId`: 请求 id 非必填,用于问题追踪。
- **返回值**:执行结果
```js
TaosResult {
affectRows: number, 影响的条数
timing: number, 执行时长
totalTime: number, 响应总时长
}
```
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
- `async query(sql: string, reqId?:number): Promise<WSRows>`
- **接口说明**:查询数据。
- **参数说明**
- `sql`:待执行的查询 sql 语句。
- `reqId`: 请求 id 非必填,用于问题追踪。
- **返回值**WSRows 数据集对象。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
### 数据集
- `getMeta():Array<TDengineMeta> | null`
- **接口说明**:获取查询结果的的列的数量、类型和长度。
- **返回值**TDengineMeta 数据对象数组。
```js
export interface TDengineMeta {
name: string,
type: string,
length: number,
}
```
- `async next(): Promise<boolean>`
- **接口说明**:将游标从当前位置向后移动一行。用于遍历查询结果集。
- **返回值**:如果新的当前行有效,则返回 true如果结果集中没有更多行则返回 false。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
- `getData(): Array<any>`
- **接口说明**:返回查询的一行数据。
- **返回值**:返回查询的一行数据,此接口需要搭配 next 接口一起使用。
- `async close():Promise<void>`
- **接口说明**:数据读取完成后,释放结果集。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
### 无模式写入
- `async schemalessInsert(lines: Array<string>, protocol: SchemalessProto, precision: Precision, ttl: number, reqId?: number): Promise<void>`
- **接口说明**:无模式写入。
- **参数说明**
- `lines`:待写入的数据数组,无模式具体的数据格式可参考 `Schemaless 写入`。
- `protocol`: 协议类型
- `SchemalessProto.InfluxDBLineProtocol`InfluxDB 行协议Line Protocol)。
- `SchemalessProto.OpenTSDBTelnetLineProtocol`OpenTSDB 文本行协议。
- `SchemalessProto.OpenTSDBJsonFormatProtocol`JSON 协议格式。
- `precision`: 时间精度
- `Precision.HOURS` 小时
- `Precision.MINUTES`:分钟
- `Precision.SECONDS`:秒
- `Precision.MILLI_SECONDS`:毫秒
- `Precision.MICRO_SECONDS`:微秒
- `Precision.NANO_SECONDS` 纳秒
- `ttl`:表过期时间,单位天。
- `reqId`: 用于问题追踪,可选。
- **异常**:连接失败抛出 `TaosResultError` 异常。
### 参数绑定
- `async stmtInit(reqId?:number): Promise<WsStmt>`
- **接口说明** 使用 WsSql 对象创建 stmt 对象。
- **参数说明**
- `reqId`: 请求 id 非必填,用于问题追踪。
- **返回值**stmt 对象。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
- `async prepare(sql: string): Promise<void>`
- **接口说明** 绑定预编译 sql 语句。
- **参数说明**
- `sql`: 预编译的 SQL 语句。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
- `async setTableName(tableName: string): Promise<void>`
- **接口说明** 设置将要写入数据的表名。
- **参数说明**
- `tableName`: 表名,如果需要指定数据库, 例如: `db_name.table_name` 即可。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
通过 StmtBindParams 对象设置绑定数据。
- `setBoolean(params :any[])`
- **接口说明** 设置布尔值。
- **参数说明**
- `params`: 布尔类型列表。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
- 下面接口除了要设置的值类型不同外,其余同 setBoolean
- `setTinyInt(params :any[])`
- `setUTinyInt(params :any[])`
- `setSmallInt(params :any[])`
- `setUSmallInt(params :any[])`
- `setInt(params :any[])`
- `setUInt(params :any[])`
- `setBigint(params :any[])`
- `setUBigint(params :any[])`
- `setFloat(params :any[])`
- `setDouble(params :any[])`
- `setVarchar(params :any[])`
- `setBinary(params :any[])`
- `setNchar(params :any[])`
- `setJson(params :any[])`
- `setVarBinary(params :any[])`
- `setGeometry(params :any[])`
- `setTimestamp(params :any[])`
- `async setTags(paramsArray:StmtBindParams): Promise<void>`
- **接口说明** 设置表 Tags 数据,用于自动建表。
- **参数说明**
- `paramsArray`: Tags 数据。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
- `async bind(paramsArray:StmtBindParams): Promise<void>`
- **接口说明** 绑定数据。
- **参数说明**
- `paramsArray`: 绑定数据。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
- `async batch(): Promise<void>`
- **接口说明** 提交绑定数据。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
- `async exec(): Promise<void>`
- **接口说明** 执行将绑定的数据全部写入。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
- `getLastAffected()`
- **接口说明** 获取写入条数。
- **返回值**:写入条数。
- `async close(): Promise<void>`
- **接口说明** 关闭 stmt 对象。
- **异常**:连接失败抛出 `TDWebSocketClientError` 异常。
### 数据订阅
- **创建消费者支持属性列表**
- taos.TMQConstants.CONNECT_USER: 用户名。
- taos.TMQConstants.CONNECT_PASS: 密码。
- taos.TMQConstants.GROUP_ID: 所在的 group。
- taos.TMQConstants.CLIENT_ID: 客户端id。
- taos.TMQConstants.WS_URL: taosAdapter 的url地址。
- taos.TMQConstants.AUTO_OFFSET_RESET: 来确定消费位置为最新数据latest还是包含旧数据earliest
- taos.TMQConstants.ENABLE_AUTO_COMMIT: 是否允许自动提交。
- taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS: 自动提交间隔。
- taos.TMQConstants.CONNECT_MESSAGE_TIMEOUT: 数据传输超时参数,单位 ms默认为 10000 ms。
- `static async newConsumer(wsConfig:Map<string, any>):Promise<WsConsumer>`
- **接口说明** 消费者构造函数。
- **参数说明**
- `wsConfig`: 创建消费者属性配置。
- **返回值**WsConsumer 消费者对象。
- **异常**:如果在执行过程中出现异常,抛出 `TDWebSocketClientError` 错误。
- `async subscribe(topics: Array<string>, reqId?:number): Promise<void>`
- **接口说明** 订阅一组主题。
- **参数说明**
- `topics`: 订阅的主题列表。
- `reqId`: 请求 id 非必填,用于问题追踪。
- **异常**:失败抛出 `TDWebSocketClientError` 异常。
- `async unsubscribe(reqId?:number): Promise<void>`
- **接口说明** 取消订阅。
- **参数说明**
- `reqId`: 请求 id 非必填,用于问题追踪。
- **异常**:失败抛出 `TDWebSocketClientError` 异常。
- `async poll(timeoutMs: number, reqId?:number):Promise<Map<string, TaosResult>>`
- **接口说明** 轮询消息。
- **参数说明**
- `timeoutMs`: 表示轮询的超时时间,单位毫秒。
- `reqId`: 请求 id 非必填,用于问题追踪。
- **返回值**`Map<string, TaosResult>` 每个主题对应的数据。
- **异常**:失败抛出 `TDWebSocketClientError` 异常。
- `async subscription(reqId?:number):Promise<Array<string>>`
- **接口说明** 获取当前订阅的所有主题。
- **参数说明**
- `reqId`: 请求 id 非必填,用于问题追踪。
- **返回值**`Array<string>` 主题列表。
- **异常**:失败抛出 `TDWebSocketClientError` 异常。
- `async commit(reqId?:number):Promise<Array<TopicPartition>>`
- **接口说明** 提交当前处理的消息的偏移量。
- **参数说明**
- `reqId`: 请求 id 非必填,用于问题追踪。
- **返回值**`Array<TopicPartition>` 每个主题的消费进度。
- **异常**:失败抛出 `TDWebSocketClientError` 异常。
- `async committed(partitions:Array<TopicPartition>, reqId?:number):Promise<Array<TopicPartition>>`
- **接口说明**:获取一组分区最后提交的偏移量。
- **参数说明**
- `partitions`:一个 `Array<TopicPartition>` 类型的参数,表示要查询的分区集合。
- `reqId`: 请求 id 非必填,用于问题追踪。
- **返回值**`Array<TopicPartition>`,即一组分区最后提交的偏移量。
- **异常**:如果在获取提交的偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
- `async seek(partition:TopicPartition, reqId?:number):Promise<void>`
- **接口说明**:将给定分区的偏移量设置到指定的位置。
- **参数说明**
- `partition`:一个 `TopicPartition` 类型的参数,表示要操作的分区和要设置的偏移量。
- `reqId`: 请求 id 非必填,用于问题追踪。
- **异常**:如果在设置偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
- `async positions(partitions:Array<TopicPartition>, reqId?:number):Promise<Array<TopicPartition>>`
- **接口说明**:获取给定分区当前的偏移量。
- **参数说明**
- `partitions`:一个 `TopicPartition` 类型的参数,表示要查询的分区。
- `reqId`: 请求 id 非必填,用于问题追踪。
- **返回值**`Array<TopicPartition>`,即一组分区最后提交的偏移量。
- **异常**:如果在获取偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
- `async seekToBeginning(partitions:Array<TopicPartition>):Promise<void>`
- **接口说明**:将一组分区的偏移量设置到最早的偏移量。
- **参数说明**
- `partitions`:一个 `Array<TopicPartition>` 类型的参数,表示要操作的分区集合。
- **异常**:如果在设置偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
- `async seekToEnd(partitions:Array<TopicPartition>):Promise<void>`
- **接口说明**:将一组分区的偏移量设置到最新的偏移量。
- **参数说明**
- `partitions`:一个 `Array<TopicPartition>` 类型的参数,表示要操作的分区集合。
- **异常**:如果在设置偏移量过程中发生错误,将抛出 TDWebSocketClientError 异常。
- `async assignment(topics?:string[]):Promise<Array<TopicPartition>>`
- **接口说明**:获取消费者当前分配的指定的分区或所有分区。
- **参数说明**
- `topics`:需要获取的分区(非必填),不填表示获取全部的分区
- **返回值**:返回值类型为 `Array<TopicPartition>`,即消费者当前分配的所有分区。
- **异常**:如果在获取分配的分区过程中发生错误,将抛出 TDWebSocketClientError 异常。
- `async close():Promise<void>`
- **接口说明**:关闭 tmq 连接。
- **异常**:操作失败抛出 `TDWebSocketClientError` 异常。