modify node ci example

This commit is contained in:
menshibin 2024-09-25 21:47:13 +08:00
parent 396945f57c
commit 4e2bd03df4
1 changed files with 50 additions and 42 deletions

View File

@ -1,41 +1,46 @@
const { sleep } = require("@tdengine/websocket");
const taos = require("@tdengine/websocket"); const taos = require("@tdengine/websocket");
const db = 'power'; const db = 'power';
const stable = 'meters'; const stable = 'meters';
const url = 'ws://localhost:6041';
const topic = 'topic_meters' const topic = 'topic_meters'
const topics = [topic]; const topics = [topic];
const groupId = "group1"; const groupId = "group1";
const clientId = "client1"; const clientId = "client1";
// ANCHOR: create_consumer
async function createConsumer() { async function createConsumer() {
let groupId = "group1";
let clientId = "client1";
let configMap = new Map([ let configMap = new Map([
[taos.TMQConstants.GROUP_ID, "group1"], [taos.TMQConstants.GROUP_ID, groupId],
[taos.TMQConstants.CLIENT_ID, 'client1'], [taos.TMQConstants.CLIENT_ID, clientId],
[taos.TMQConstants.CONNECT_USER, "root"], [taos.TMQConstants.CONNECT_USER, "root"],
[taos.TMQConstants.CONNECT_PASS, "taosdata"], [taos.TMQConstants.CONNECT_PASS, "taosdata"],
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"], [taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
[taos.TMQConstants.WS_URL, 'ws://localhost:6041'], [taos.TMQConstants.WS_URL, url],
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'], [taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000'] [taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
]); ]);
try { try {
return await taos.tmqConnect(configMap); conn = await taos.tmqConnect(configMap);
console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`)
return conn;
} catch (err) { } catch (err) {
console.error(err); console.error(`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err; throw err;
} }
} }
// ANCHOR_END: create_consumer
async function prepare() { async function prepare() {
let conf = new taos.WSConfig('ws://localhost:6041'); let conf = new taos.WSConfig('ws://192.168.1.98:6041');
conf.setUser('root'); conf.setUser('root');
conf.setPwd('taosdata'); conf.setPwd('taosdata');
conf.setDb('power'); conf.setDb('power');
const createDB = `CREATE DATABASE IF NOT EXISTS ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`; const createDB = `CREATE DATABASE IF NOT EXISTS ${db}`;
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`; const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
let wsSql = await taos.sqlConnect(conf); let wsSql = await taos.sqlConnect(conf);
@ -44,66 +49,69 @@ async function prepare() {
let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`; let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`;
await wsSql.exec(createTopic); await wsSql.exec(createTopic);
await wsSql.close();
}
async function insert() {
for (let i = 0; i < 10; i++) { let conf = new taos.WSConfig('ws://192.168.1.98:6041');
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
let wsSql = await taos.sqlConnect(conf);
for (let i = 0; i < 1; i++) {
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`); await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
} }
await wsSql.close(); await wsSql.close();
} }
// ANCHOR: subscribe // ANCHOR: offset
async function subscribe(consumer) { async function subscribe(consumer) {
try { try {
await consumer.subscribe(['topic_meters']);
for (let i = 0; i < 50; i++) {
let res = await consumer.poll(100);
for (let [key, value] of res) {
// Add your data processing logic here
console.log(`data: ${key} ${value}`);
}
}
} catch (err) {
console.error(`Failed to poll data, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
// ANCHOR_END: subscribe
// ANCHOR: offset
async function consumer() {
let consumer = null;
try {
await prepare();
let consumer = await createConsumer()
await consumer.subscribe(['topic_meters']); await consumer.subscribe(['topic_meters']);
let res = new Map(); let res = new Map();
while (res.size == 0) { while (res.size == 0) {
res = await consumer.poll(100); res = await consumer.poll(100);
await consumer.commit();
} }
let assignment = await consumer.assignment(); let assignment = await consumer.assignment();
await consumer.seekToBeginning(assignment); await consumer.seekToBeginning(assignment);
console.log("Assignment seek to beginning successfully"); console.log("Assignment seek to beginning successfully");
} catch (err) {
console.error(`Failed to seek offset, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err;
}
}
// ANCHOR_END: offset
async function consumer() {
let consumer = null;
try {
await prepare();
consumer = await createConsumer();
const allPromises = [];
allPromises.push(subscribe(consumer));
allPromises.push(insert());
await Promise.all(allPromises);
await consumer.unsubscribe();
console.log("Consumer unsubscribed successfully.");
} }
catch (err) { catch (err) {
console.error(`Failed to seek offset, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`); console.error(`Failed to unsubscribe consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
throw err; throw err;
} }
finally { finally {
if (consumer) { if (consumer) {
await consumer.close(); await consumer.close();
console.log("Consumer closed successfully.");
} }
taos.destroy(); taos.destroy();
} }
} }
// ANCHOR_END: offset
async function test() { async function test() {
console.log("begin tmq_seek_example") console.log("begin tmq_example")
await consumer(); await consumer();
console.log("end tmq_seek_example") console.log("end tmq_example")
} }
test() test()