add node.js example

This commit is contained in:
menshibin 2024-08-02 16:20:50 +08:00
parent 8bb7cf5926
commit 6693b2cc58
7 changed files with 222 additions and 26 deletions

View File

@ -38,10 +38,9 @@ async function prepare() {
await wsSql.exec(createDB);
await wsSql.exec(createStable);
// ANCHOR: create_topic
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);
// ANCHOR_END: create_topic
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);
for (let i = 0; i < 10; i++) {
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
@ -49,37 +48,31 @@ await wsSql.exec(createTopic);
wsSql.Close();
}
// ANCHOR: subscribe
async function subscribe(consumer) {
await consumer.subscribe(topics);
for (let i = 0; i < 5; i++) {
let res = await consumer.poll(500);
for (let [key, value] of res) {
console.log(key, value);
}
if (res.size == 0) {
break;
}
await consumer.commit();
// ANCHOR: commit
try {
await consumer.subscribe(['topic_meters']);
for (let i = 0; i < 50; i++) {
let res = await consumer.poll(100);
for (let [key, value] of res) {
console.log(key, value);
}
consumer.commit();
}
} catch (err) {
console.error(err.code, err.message);
throw err;
}
// ANCHOR_END: commit
}
// ANCHOR_END: subscribe
async function test() {
// ANCHOR: unsubscribe
let consumer = null;
try {
await prepare();
let consumer = await createConsumer()
await subscribe(consumer)
// ANCHOR: assignment
let assignment = await consumer.assignment();
console.log(assignment);
assignment = await consumer.seekToBeginning(assignment);
for(let i in assignment) {
console.log("seek after:", assignment[i])
}
// ANCHOR_END: assignment
await subscribe(consumer)
await consumer.unsubscribe();
}
catch (err) {
@ -91,6 +84,7 @@ async function test() {
}
taos.destroy();
}
// ANCHOR_END: unsubscribe
}
test()

View File

@ -0,0 +1,104 @@
const taos = require("@tdengine/websocket");
const db = 'power';
const stable = 'meters';
const topics = ['power_meters_topic'];
// ANCHOR: create_consumer
async function createConsumer() {
let configMap = new Map([
[taos.TMQConstants.GROUP_ID, "group1"],
[taos.TMQConstants.CLIENT_ID, 'client1'],
[taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.WS_URL, 'ws://localhost:6041'],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]);
try {
return await taos.tmqConnect(configMap);
}catch (err) {
console.log(err);
throw err;
}
}
// ANCHOR_END: create_consumer
async function prepare() {
let conf = new taos.WSConfig('ws://localhost:6041');
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
const createDB = `CREATE DATABASE IF NOT EXISTS POWER ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`;
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
let wsSql = await taos.sqlConnect(conf);
await wsSql.exec(createDB);
await wsSql.exec(createStable);
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic);
for (let i = 0; i < 10; i++) {
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
}
wsSql.Close();
}
// ANCHOR: subscribe
async function subscribe(consumer) {
try {
await consumer.subscribe(['topic_meters']);
for (let i = 0; i < 50; i++) {
let res = await consumer.poll(100);
for (let [key, value] of res) {
console.log(key, value);
}
}
}catch (err) {
console.error(err.code, err.message);
throw err;
}
}
// ANCHOR_END: subscribe
// ANCHOR: offset
async function test() {
let consumer = null;
try {
await prepare();
let consumer = await createConsumer()
await consumer.subscribe(['topic_meters']);
let res = new Map();
while (res.size == 0) {
res = await consumer.poll(100);
}
let assignment = await consumer.assignment();
for (let i in assignment) {
console.log("seek before:", assignment[i]);
}
await consumer.seekToBeginning(assignment);
assignment = await consumer.assignment();
for (let i in assignment) {
console.log("seek after:", assignment[i]);
}
await consumer.unsubscribe();
}
catch (err) {
console.error(err.code, err.message);
}
finally {
if (consumer) {
await consumer.close();
}
taos.destroy();
}
}
// ANCHOR_END: offset
test()

View File

@ -0,0 +1,20 @@
# ANCHOR: connect
import taosrest
def create_connection():
conn = None
try:
conn = taosrest.connect(url="http://localhost:6041",
user="root",
password="taosdata",
timeout=30)
except Exception as err:
print(err)
finally:
if conn:
conn.close()
# ANCHOR_END: connect
if __name__ == "__main__":
create_connection()

View File

@ -0,0 +1,26 @@
import taos
conn = None
try:
conn = taos.connect(host="localhost",
port=6030,
user="root",
password="taosdata")
result = conn.query("SELECT ts, current, location FROM power.meters limit 100")
print(result)
# Get fields from result
fields = result.fields
for field in fields:
print(field)
# Get data from result as list of tuple
data = result.fetch_all()
for row in data:
print(row)
except Exception as err:
print(err)
finally:
if conn:
conn.close()

View File

@ -0,0 +1,15 @@
import taosrest
client = None
try:
client = taosrest.RestClient(url="http://localhost:6041",
user="root",
password="taosdata",
timeout=30)
result = client.sql(f"SELECT ts, current, location FROM power.meters limit 100")
print(result)
except Exception as err:
print(err)

View File

@ -0,0 +1,22 @@
import taosws
conn = None
try:
conn = taosws.connect(user="root",
password="taosdata",
host="localhost",
port=6041)
result = conn.query("SELECT ts, current, location FROM power.meters limit 100")
num_of_fields = result.field_count
print(num_of_fields)
for row in result:
print(row)
except Exception as err:
print(err)
finally:
if conn:
conn.close()

View File

@ -249,6 +249,9 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
<TabItem label="Node.js" value="node">
```js
{{#include docs/examples/node/websocketexample/tmq_seek_example.js:subscribe}}
```
</TabItem>
<TabItem label="C#" value="csharp">
@ -346,6 +349,9 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
<TabItem label="Node.js" value="node">
```js
{{#include docs/examples/node/websocketexample/tmq_seek_example.js:offset}}
```
</TabItem>
<TabItem label="C#" value="csharp">
@ -445,6 +451,9 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
<TabItem label="Node.js" value="node">
```js
{{#include docs/examples/node/websocketexample/tmq_example.js:commit}}
```
</TabItem>
<TabItem label="C#" value="csharp">
@ -550,6 +559,9 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
<TabItem label="Node.js" value="node">
```js
{{#include docs/examples/node/websocketexample/tmq_example.js:unsubscribe}}
```
</TabItem>
<TabItem label="C#" value="csharp">
@ -655,6 +667,9 @@ Rust 连接器创建消费者的参数为 DSN 可以设置的参数列表请
<TabItem label="Node.js" value="node">
```js
{{#include docs/examples/node/websocketexample/tmq_example.js}}
```
</TabItem>
<TabItem label="C#" value="csharp">