From 4e2bd03df4ed11d93b7a27f8fbf9efb0b1596db5 Mon Sep 17 00:00:00 2001 From: menshibin Date: Wed, 25 Sep 2024 21:47:13 +0800 Subject: [PATCH] modify node ci example --- .../node/websocketexample/tmq_seek_example.js | 92 ++++++++++--------- 1 file changed, 50 insertions(+), 42 deletions(-) diff --git a/docs/examples/node/websocketexample/tmq_seek_example.js b/docs/examples/node/websocketexample/tmq_seek_example.js index 892483e718..e15e9acc6e 100644 --- a/docs/examples/node/websocketexample/tmq_seek_example.js +++ b/docs/examples/node/websocketexample/tmq_seek_example.js @@ -1,41 +1,46 @@ +const { sleep } = require("@tdengine/websocket"); const taos = require("@tdengine/websocket"); const db = 'power'; const stable = 'meters'; +const url = 'ws://localhost:6041'; const topic = 'topic_meters' const topics = [topic]; const groupId = "group1"; const clientId = "client1"; - -// ANCHOR: create_consumer async function createConsumer() { + + let groupId = "group1"; + let clientId = "client1"; let configMap = new Map([ - [taos.TMQConstants.GROUP_ID, "group1"], - [taos.TMQConstants.CLIENT_ID, 'client1'], + [taos.TMQConstants.GROUP_ID, groupId], + [taos.TMQConstants.CLIENT_ID, clientId], [taos.TMQConstants.CONNECT_USER, "root"], [taos.TMQConstants.CONNECT_PASS, "taosdata"], [taos.TMQConstants.AUTO_OFFSET_RESET, "latest"], - [taos.TMQConstants.WS_URL, 'ws://localhost:6041'], + [taos.TMQConstants.WS_URL, url], [taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'], [taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000'] ]); try { - return await taos.tmqConnect(configMap); + conn = await taos.tmqConnect(configMap); + console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`) + return conn; } catch (err) { - console.error(err); + console.error(`Failed to create websocket consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`); throw err; } } -// ANCHOR_END: create_consumer + async function prepare() { - let conf = new taos.WSConfig('ws://localhost:6041'); + let conf = new taos.WSConfig('ws://192.168.1.98:6041'); conf.setUser('root'); conf.setPwd('taosdata'); conf.setDb('power'); - const createDB = `CREATE DATABASE IF NOT EXISTS ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`; + const createDB = `CREATE DATABASE IF NOT EXISTS ${db}`; const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`; let wsSql = await taos.sqlConnect(conf); @@ -44,66 +49,69 @@ async function prepare() { let createTopic = `CREATE TOPIC IF NOT EXISTS ${topics[0]} AS SELECT * FROM ${db}.${stable}`; await wsSql.exec(createTopic); + await wsSql.close(); +} - - for (let i = 0; i < 10; i++) { +async function insert() { + let conf = new taos.WSConfig('ws://192.168.1.98:6041'); + conf.setUser('root'); + conf.setPwd('taosdata'); + conf.setDb('power'); + let wsSql = await taos.sqlConnect(conf); + for (let i = 0; i < 1; i++) { await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`); } await wsSql.close(); } -// ANCHOR: subscribe +// ANCHOR: offset async function subscribe(consumer) { try { - await consumer.subscribe(['topic_meters']); - for (let i = 0; i < 50; i++) { - let res = await consumer.poll(100); - for (let [key, value] of res) { - // Add your data processing logic here - console.log(`data: ${key} ${value}`); - } - } - } catch (err) { - console.error(`Failed to poll data, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`); - throw err; - } - -} -// ANCHOR_END: subscribe - -// ANCHOR: offset -async function consumer() { - let consumer = null; - try { - await prepare(); - let consumer = await createConsumer() await consumer.subscribe(['topic_meters']); let res = new Map(); while (res.size == 0) { res = await consumer.poll(100); + await consumer.commit(); } - let assignment = await consumer.assignment(); await consumer.seekToBeginning(assignment); console.log("Assignment seek to beginning successfully"); + } catch (err) { + console.error(`Failed to seek offset, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`); + throw err; + } +} +// ANCHOR_END: offset + +async function consumer() { + let consumer = null; + try { + await prepare(); + consumer = await createConsumer(); + const allPromises = []; + allPromises.push(subscribe(consumer)); + allPromises.push(insert()); + await Promise.all(allPromises); + await consumer.unsubscribe(); + console.log("Consumer unsubscribed successfully."); } catch (err) { - console.error(`Failed to seek offset, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`); + console.error(`Failed to unsubscribe consumer, topic: ${topic}, groupId: ${groupId}, clientId: ${clientId}, ErrCode: ${err.code}, ErrMessage: ${err.message}`); throw err; } finally { if (consumer) { await consumer.close(); + console.log("Consumer closed successfully."); } taos.destroy(); } } -// ANCHOR_END: offset async function test() { - console.log("begin tmq_seek_example") + console.log("begin tmq_example") await consumer(); - console.log("end tmq_seek_example") - } - + console.log("end tmq_example") +} + test()