add python example

This commit is contained in:
menshibin 2024-08-03 23:32:14 +08:00
parent 67737307e4
commit bae9f78d3f
8 changed files with 29 additions and 46 deletions

View File

@ -1,16 +1,8 @@
const taos = require("@tdengine/websocket"); const taos = require("@tdengine/websocket");
let influxdbData = ["meters,location=California.LosAngeles,groupId=2 current=11.8,voltage=221,phase=0.28 1648432611249", let influxdbData = ["meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639"];
"meters,location=California.LosAngeles,groupId=2 current=13.4,voltage=223,phase=0.29 1648432611250", let jsonData = ["{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"]
"meters,location=California.LosAngeles,groupId=3 current=10.8,voltage=223,phase=0.29 1648432611249"]; let telnetData = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"];
let jsonData = ["{\"metric\": \"meter_current\",\"timestamp\": 1626846402,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}",
"{\"metric\": \"meter_current\",\"timestamp\": 1626846403,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1002\"}}",
"{\"metric\": \"meter_current\",\"timestamp\": 1626846404,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1003\"}}"]
let telnetData = ["meters.current 1648432611249 10.3 location=California.SanFrancisco groupid=2",
"meters.current 1648432611250 12.6 location=California.SanFrancisco groupid=2",
"meters.current 1648432611249 10.8 location=California.LosAngeles groupid=3"];
async function createConnect() { async function createConnect() {
let dsn = 'ws://localhost:6041' let dsn = 'ws://localhost:6041'
@ -30,11 +22,11 @@ async function test() {
try { try {
wsSql = await createConnect() wsSql = await createConnect()
await wsSql.schemalessInsert(influxdbData, taos.SchemalessProto.InfluxDBLineProtocol, taos.Precision.MILLI_SECONDS, ttl); await wsSql.schemalessInsert(influxdbData, taos.SchemalessProto.InfluxDBLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
await wsSql.schemalessInsert(jsonData, taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, ttl);
await wsSql.schemalessInsert(telnetData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, ttl); await wsSql.schemalessInsert(telnetData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
await wsSql.schemalessInsert(jsonData, taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, ttl);
} }
catch (err) { catch (err) {
console.error(err.code, err.message); console.error("Failed to insert data with schemaless, ErrCode:" + err.code + "; ErrMessage: " + err.message);
} }
finally { finally {
if (wsRows) { if (wsRows) {

View File

@ -28,8 +28,7 @@ async function createDbAndTable(wsSql) {
taosResult = await wsSql.exec('describe meters'); taosResult = await wsSql.exec('describe meters');
console.log(taosResult); console.log(taosResult);
} catch (err) { } catch (err) {
console.error("Failed to create db and table, ErrCode:" + err.code + "; ErrMessage: " + err.message);
console.error(err.code, err.message);
} finally { } finally {
if (wsSql) { if (wsSql) {
await wsSql.close(); await wsSql.close();
@ -56,7 +55,7 @@ async function insertData(wsSql) {
taosResult = await wsSql.exec(insertQuery); taosResult = await wsSql.exec(insertQuery);
console.log(taosResult); console.log(taosResult);
} catch (err) { } catch (err) {
console.error(err.code, err.message); console.error("Failed to insert data to power.meters, ErrCode:" + err.code + "; ErrMessage: " + err.message);
} finally { } finally {
if (wsSql) { if (wsSql) {
await wsSql.close(); await wsSql.close();
@ -71,7 +70,7 @@ async function queryData() {
let wsSql = null; let wsSql = null;
try { try {
wsSql = await createConnect(); wsSql = await createConnect();
wsRows = await wsSql.query('select * from meters'); wsRows = await wsSql.query('SELECT ts, current, location FROM power.meters limit 100');
let meta = wsRows.getMeta(); let meta = wsRows.getMeta();
console.log("wsRow:meta:=>", meta); console.log("wsRow:meta:=>", meta);
while (await wsRows.next()) { while (await wsRows.next()) {
@ -80,7 +79,7 @@ async function queryData() {
} }
} }
catch (err) { catch (err) {
console.error(err.code, err.message); console.error("Failed to query data from power.meters," + err.code + "; ErrMessage: " + err.message);
} }
finally { finally {
if (wsRows) { if (wsRows) {
@ -95,22 +94,12 @@ async function queryData() {
// ANCHOR: sqlWithReqid // ANCHOR: sqlWithReqid
async function sqlWithReqid(wsSql) { async function sqlWithReqid(wsSql) {
let insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters (location, groupId) TAGS('California.SanFrancisco', 2) " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";
let wsRows = null; let wsRows = null;
let wsSql = null; let wsSql = null;
try { try {
wsSql = await createConnect(); wsSql = await createConnect();
taosResult = await wsSql.exec(insertQuery, 1); wsRows = await wsSql.query('SELECT ts, current, location FROM power.meters limit 100', 1);
wsRows = await wsSql.query('select * from meters', 2);
let meta = wsRows.getMeta(); let meta = wsRows.getMeta();
console.log("wsRow:meta:=>", meta); console.log("wsRow:meta:=>", meta);
while (await wsRows.next()) { while (await wsRows.next()) {
@ -119,7 +108,7 @@ async function sqlWithReqid(wsSql) {
} }
} }
catch (err) { catch (err) {
console.error(err.code, err.message); console.error("Failed to execute sql with reqId," + err.code + "; ErrMessage: " + err.message);
} }
finally { finally {
if (wsRows) { if (wsRows) {

View File

@ -57,8 +57,8 @@ async function prepare() {
console.log(`d_bind_${i} insert ` + stmt.getLastAffected() + " rows."); console.log(`d_bind_${i} insert ` + stmt.getLastAffected() + " rows.");
} }
} }
catch (e) { catch (err) {
console.error(e); console.error("Failed to insert to table meters using stmt, ErrCode:" + err.code + "; ErrMessage: " + err.message);
} }
finally { finally {
if (stmt) { if (stmt) {

View File

@ -19,7 +19,7 @@ async function createConsumer() {
try { try {
return await taos.tmqConnect(configMap); return await taos.tmqConnect(configMap);
}catch (err) { }catch (err) {
console.log(err); console.log("Failed to create websocket consumer, ErrCode:" + err.code + "; ErrMessage: " + err.message);
throw err; throw err;
} }
@ -60,7 +60,7 @@ async function subscribe(consumer) {
consumer.commit(); consumer.commit();
} }
} catch (err) { } catch (err) {
console.error(err.code, err.message); console.error("Failed to poll data; err.code, ErrCode:" + err.code + "; ErrMessage: " + err.message);
throw err; throw err;
} }
// ANCHOR_END: commit // ANCHOR_END: commit
@ -76,7 +76,7 @@ async function test() {
await consumer.unsubscribe(); await consumer.unsubscribe();
} }
catch (err) { catch (err) {
console.error(err.code, err.message); console.error("Failed to unsubscribe consume, ErrCode:" + err.code + "; ErrMessage: " + err.message);
} }
finally { finally {
if (consumer) { if (consumer) {

View File

@ -59,7 +59,7 @@ async function subscribe(consumer) {
} }
} }
}catch (err) { }catch (err) {
console.error(err.code, err.message); console.error("Failed to poll data; err.code, ErrCode:" + err.code + "; ErrMessage: " + err.message);
throw err; throw err;
} }
@ -91,7 +91,7 @@ async function test() {
await consumer.unsubscribe(); await consumer.unsubscribe();
} }
catch (err) { catch (err) {
console.error(err.code, err.message); console.error("seek example failed, ErrCode:" + err.code + "; ErrMessage: " + err.message);
} }
finally { finally {
if (consumer) { if (consumer) {

View File

@ -73,6 +73,7 @@ def create_consumer():
# ANCHOR: subscribe # ANCHOR: subscribe
def subscribe(consumer): def subscribe(consumer):
try: try:
# subscribe to the topics
consumer.subscribe(["topic_meters"]) consumer.subscribe(["topic_meters"])
print("subscribe topics successfully") print("subscribe topics successfully")
for i in range(50): for i in range(50):
@ -98,8 +99,6 @@ def subscribe(consumer):
def commit_offset(consumer): def commit_offset(consumer):
# ANCHOR: commit_offset # ANCHOR: commit_offset
try: try:
consumer.subscribe(["topic_meters"])
print("subscribe topics successfully")
for i in range(50): for i in range(50):
records = consumer.poll(1) records = consumer.poll(1)
if records: if records:
@ -113,6 +112,7 @@ def commit_offset(consumer):
for block in val: for block in val:
print(block.fetchall()) print(block.fetchall())
# after processing the data, commit the offset manually
consumer.commit(records) consumer.commit(records)
except Exception as err: except Exception as err:

View File

@ -116,6 +116,8 @@ def commit_offset(consumer):
for block in records: for block in records:
for row in block: for row in block:
print(row) print(row)
# after processing the data, commit the offset manually
consumer.commit(records) consumer.commit(records)
except Exception as err: except Exception as err:

View File

@ -59,7 +59,7 @@ Python 连接器可能会产生 4 种异常:
- 原生连接方式的异常 - 原生连接方式的异常
- websocket 连接方式异常 - websocket 连接方式异常
- 数据订阅异常 - 数据订阅异常
- TDengine 其他功能模块的异常 - TDengine 其他功能模块的报错,请参考 [错误码](../../../reference/error-code)
|Error Type|Description|Suggested Actions| |Error Type|Description|Suggested Actions|
|:--------:|:---------:|:---------------:| |:--------:|:---------:|:---------------:|
@ -67,11 +67,11 @@ Python 连接器可能会产生 4 种异常:
|ConnectionError|数据库链接错误|请检查 TDengine 服务端状态和连接参数| |ConnectionError|数据库链接错误|请检查 TDengine 服务端状态和连接参数|
|DatabaseError|数据库错误|请检查 TDengine 服务端版本,并将 Python 连接器升级到最新版| |DatabaseError|数据库错误|请检查 TDengine 服务端版本,并将 Python 连接器升级到最新版|
|OperationalError|操作错误|API 使用错误,请检查代码| |OperationalError|操作错误|API 使用错误,请检查代码|
|ProgrammingError||| |ProgrammingError|接口调用错误|请检查提交的数据是否正确|
|StatementError|stmt 相关异常|| |StatementError|stmt 相关异常|请检查绑定参数与 sql 是否匹配|
|ResultError||| |ResultError|操作数据错误|请检查操作的数据与数据库中的数据类型是否匹配|
|SchemalessError|schemaless 相关异常|| |SchemalessError|schemaless 相关异常|请检查数据格式及对应的协议类型是否正确|
|TmqError|tmq 相关异常|| |TmqError|tmq 相关异常|请检查 Topic 及 consumer 配置是否正确|
Python 中通常通过 try-expect 处理异常,异常处理相关请参考 [Python 错误和异常文档](https://docs.python.org/3/tutorial/errors.html)。 Python 中通常通过 try-expect 处理异常,异常处理相关请参考 [Python 错误和异常文档](https://docs.python.org/3/tutorial/errors.html)。
TDengine 其他功能模块的报错,请参考 [错误码](../../../reference/error-code) TDengine 其他功能模块的报错,请参考 [错误码](../../../reference/error-code)
@ -540,7 +540,7 @@ TaosResult 对象可以通过循环遍历获取查询到的数据。
- `RestClient(self, url: str, token: str = None, database: str = None, user: str = "root", password: str = "taosdata", timeout: int = None, convert_timestamp: bool = True, timezone: Union[str, datetime.tzinfo] = None)` - `RestClient(self, url: str, token: str = None, database: str = None, user: str = "root", password: str = "taosdata", timeout: int = None, convert_timestamp: bool = True, timezone: Union[str, datetime.tzinfo] = None)`
- **接口说明**:建立 taosAdapter 连接 client。 - **接口说明**:建立 taosAdapter 连接 client。
- **参数说明** - **参数说明**
- `url`: taosAdapter REST 服务的 URL。默认是 \<http://localhost:6041>。 - `url`: taosAdapter REST 服务的 URL。
- `user`: 数据库的用户名。 - `user`: 数据库的用户名。
- `password`: 数据库的密码。 - `password`: 数据库的密码。
- `database`: 数据库名称。 - `database`: 数据库名称。