From 6693b2cc58af66fe67cf2aa9931e98d8b6754a74 Mon Sep 17 00:00:00 2001 From: menshibin Date: Fri, 2 Aug 2024 16:20:50 +0800 Subject: [PATCH] add node.js example --- .../node/websocketexample/tmq_example.js | 46 ++++---- .../node/websocketexample/tmq_seek_example.js | 104 ++++++++++++++++++ docs/examples/python/connect_rest_example.py | 20 ++++ docs/examples/python/query_native.py | 26 +++++ docs/examples/python/query_rest.py | 15 +++ docs/examples/python/query_ws.py | 22 ++++ docs/zh/08-develop/07-tmq.md | 15 +++ 7 files changed, 222 insertions(+), 26 deletions(-) create mode 100644 docs/examples/node/websocketexample/tmq_seek_example.js create mode 100644 docs/examples/python/connect_rest_example.py create mode 100644 docs/examples/python/query_native.py create mode 100644 docs/examples/python/query_rest.py create mode 100644 docs/examples/python/query_ws.py diff --git a/docs/examples/node/websocketexample/tmq_example.js b/docs/examples/node/websocketexample/tmq_example.js index b7a8d13809..12c303acf4 100644 --- a/docs/examples/node/websocketexample/tmq_example.js +++ b/docs/examples/node/websocketexample/tmq_example.js @@ -38,10 +38,9 @@ async function prepare() { await wsSql.exec(createDB); await wsSql.exec(createStable); -// ANCHOR: create_topic -let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`; -await wsSql.exec(createTopic); -// ANCHOR_END: create_topic + let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`; + await wsSql.exec(createTopic); + for (let i = 0; i < 10; i++) { await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`); @@ -49,37 +48,31 @@ await wsSql.exec(createTopic); wsSql.Close(); } -// ANCHOR: subscribe async function subscribe(consumer) { - await consumer.subscribe(topics); - for (let i = 0; i < 5; i++) { - let res = await consumer.poll(500); - for (let [key, value] of res) { - console.log(key, value); - } - if (res.size == 0) { - break; - } - await consumer.commit(); + // ANCHOR: commit + try { + await consumer.subscribe(['topic_meters']); + for (let i = 0; i < 50; i++) { + let res = await consumer.poll(100); + for (let [key, value] of res) { + console.log(key, value); + } + consumer.commit(); + } + } catch (err) { + console.error(err.code, err.message); + throw err; } + // ANCHOR_END: commit } -// ANCHOR_END: subscribe async function test() { + // ANCHOR: unsubscribe let consumer = null; try { await prepare(); let consumer = await createConsumer() - await subscribe(consumer) - // ANCHOR: assignment - let assignment = await consumer.assignment(); - console.log(assignment); - - assignment = await consumer.seekToBeginning(assignment); - for(let i in assignment) { - console.log("seek after:", assignment[i]) - } - // ANCHOR_END: assignment + await subscribe(consumer) await consumer.unsubscribe(); } catch (err) { @@ -91,6 +84,7 @@ async function test() { } taos.destroy(); } + // ANCHOR_END: unsubscribe } test() diff --git a/docs/examples/node/websocketexample/tmq_seek_example.js b/docs/examples/node/websocketexample/tmq_seek_example.js new file mode 100644 index 0000000000..17242dc870 --- /dev/null +++ b/docs/examples/node/websocketexample/tmq_seek_example.js @@ -0,0 +1,104 @@ +const taos = require("@tdengine/websocket"); + +const db = 'power'; +const stable = 'meters'; +const topics = ['power_meters_topic']; + +// ANCHOR: create_consumer +async function createConsumer() { + let configMap = new Map([ + [taos.TMQConstants.GROUP_ID, "group1"], + [taos.TMQConstants.CLIENT_ID, 'client1'], + [taos.TMQConstants.CONNECT_USER, "root"], + [taos.TMQConstants.CONNECT_PASS, "taosdata"], + [taos.TMQConstants.AUTO_OFFSET_RESET, "latest"], + [taos.TMQConstants.WS_URL, 'ws://localhost:6041'], + [taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'], + [taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000'] + ]); + try { + return await taos.tmqConnect(configMap); + }catch (err) { + console.log(err); + throw err; + } + +} +// ANCHOR_END: create_consumer + +async function prepare() { + let conf = new taos.WSConfig('ws://localhost:6041'); + conf.setUser('root'); + conf.setPwd('taosdata'); + conf.setDb('power'); + const createDB = `CREATE DATABASE IF NOT EXISTS POWER ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`; + const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`; + + let wsSql = await taos.sqlConnect(conf); + await wsSql.exec(createDB); + await wsSql.exec(createStable); + + let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`; + await wsSql.exec(createTopic); + + + for (let i = 0; i < 10; i++) { + await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`); + } + wsSql.Close(); +} + +// ANCHOR: subscribe +async function subscribe(consumer) { + try { + await consumer.subscribe(['topic_meters']); + for (let i = 0; i < 50; i++) { + let res = await consumer.poll(100); + for (let [key, value] of res) { + console.log(key, value); + } + } + }catch (err) { + console.error(err.code, err.message); + throw err; + } + +} +// ANCHOR_END: subscribe + +// ANCHOR: offset +async function test() { + let consumer = null; + try { + await prepare(); + let consumer = await createConsumer() + await consumer.subscribe(['topic_meters']); + let res = new Map(); + while (res.size == 0) { + res = await consumer.poll(100); + } + + let assignment = await consumer.assignment(); + for (let i in assignment) { + console.log("seek before:", assignment[i]); + } + + await consumer.seekToBeginning(assignment); + assignment = await consumer.assignment(); + for (let i in assignment) { + console.log("seek after:", assignment[i]); + } + await consumer.unsubscribe(); + } + catch (err) { + console.error(err.code, err.message); + } + finally { + if (consumer) { + await consumer.close(); + } + taos.destroy(); + } +} +// ANCHOR_END: offset +test() diff --git a/docs/examples/python/connect_rest_example.py b/docs/examples/python/connect_rest_example.py new file mode 100644 index 0000000000..c2b8f38431 --- /dev/null +++ b/docs/examples/python/connect_rest_example.py @@ -0,0 +1,20 @@ +# ANCHOR: connect +import taosrest + +def create_connection(): + conn = None + try: + conn = taosrest.connect(url="http://localhost:6041", + user="root", + password="taosdata", + timeout=30) + except Exception as err: + print(err) + finally: + if conn: + conn.close() +# ANCHOR_END: connect + +if __name__ == "__main__": + create_connection() + diff --git a/docs/examples/python/query_native.py b/docs/examples/python/query_native.py new file mode 100644 index 0000000000..6e361405e3 --- /dev/null +++ b/docs/examples/python/query_native.py @@ -0,0 +1,26 @@ +import taos + +conn = None +try: + conn = taos.connect(host="localhost", + port=6030, + user="root", + password="taosdata") + + result = conn.query("SELECT ts, current, location FROM power.meters limit 100") + print(result) + # Get fields from result + fields = result.fields + for field in fields: + print(field) + + # Get data from result as list of tuple + data = result.fetch_all() + for row in data: + print(row) + +except Exception as err: + print(err) +finally: + if conn: + conn.close() \ No newline at end of file diff --git a/docs/examples/python/query_rest.py b/docs/examples/python/query_rest.py new file mode 100644 index 0000000000..fc31e9db33 --- /dev/null +++ b/docs/examples/python/query_rest.py @@ -0,0 +1,15 @@ +import taosrest + +client = None + +try: + client = taosrest.RestClient(url="http://localhost:6041", + user="root", + password="taosdata", + timeout=30) + + result = client.sql(f"SELECT ts, current, location FROM power.meters limit 100") + print(result) + +except Exception as err: + print(err) diff --git a/docs/examples/python/query_ws.py b/docs/examples/python/query_ws.py new file mode 100644 index 0000000000..ee0d40167b --- /dev/null +++ b/docs/examples/python/query_ws.py @@ -0,0 +1,22 @@ +import taosws + +conn = None + +try: + conn = taosws.connect(user="root", + password="taosdata", + host="localhost", + port=6041) + + result = conn.query("SELECT ts, current, location FROM power.meters limit 100") + num_of_fields = result.field_count + print(num_of_fields) + + for row in result: + print(row) + +except Exception as err: + print(err) +finally: + if conn: + conn.close() diff --git a/docs/zh/08-develop/07-tmq.md b/docs/zh/08-develop/07-tmq.md index cf88f5fc2e..3f379b52e6 100644 --- a/docs/zh/08-develop/07-tmq.md +++ b/docs/zh/08-develop/07-tmq.md @@ -249,6 +249,9 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +```js + {{#include docs/examples/node/websocketexample/tmq_seek_example.js:subscribe}} +``` @@ -346,6 +349,9 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +```js + {{#include docs/examples/node/websocketexample/tmq_seek_example.js:offset}} +``` @@ -445,6 +451,9 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +```js + {{#include docs/examples/node/websocketexample/tmq_example.js:commit}} +``` @@ -550,6 +559,9 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +```js + {{#include docs/examples/node/websocketexample/tmq_example.js:unsubscribe}} +``` @@ -655,6 +667,9 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请 +```js + {{#include docs/examples/node/websocketexample/tmq_example.js}} +```