Merge branch 'docs/sheyj-3.0' of github.com:taosdata/TDengine into docs/sheyj-3.0
This commit is contained in:
commit
dd3e92d707
|
@ -16,6 +16,7 @@ async function createConnect() {
|
||||||
}
|
}
|
||||||
|
|
||||||
async function test() {
|
async function test() {
|
||||||
|
let dsn = 'ws://localhost:6041'
|
||||||
let wsSql = null;
|
let wsSql = null;
|
||||||
let wsRows = null;
|
let wsRows = null;
|
||||||
let ttl = 0;
|
let ttl = 0;
|
||||||
|
@ -24,9 +25,10 @@ async function test() {
|
||||||
await wsSql.schemalessInsert(influxdbData, taos.SchemalessProto.InfluxDBLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
|
await wsSql.schemalessInsert(influxdbData, taos.SchemalessProto.InfluxDBLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
|
||||||
await wsSql.schemalessInsert(telnetData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
|
await wsSql.schemalessInsert(telnetData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
|
||||||
await wsSql.schemalessInsert(jsonData, taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, ttl);
|
await wsSql.schemalessInsert(jsonData, taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, ttl);
|
||||||
|
console.log("Inserted data with schemaless successfully.")
|
||||||
}
|
}
|
||||||
catch (err) {
|
catch (err) {
|
||||||
console.error("Failed to insert data with schemaless, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
console.error("Failed to insert data with schemaless, url:"+ dsn +", ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
if (wsRows) {
|
if (wsRows) {
|
||||||
|
|
|
@ -1,34 +1,39 @@
|
||||||
// ANCHOR: createConnect
|
// ANCHOR: createConnect
|
||||||
const taos = require("@tdengine/websocket");
|
const taos = require("@tdengine/websocket");
|
||||||
|
|
||||||
|
let dsn = 'ws://localhost:6041';
|
||||||
async function createConnect() {
|
async function createConnect() {
|
||||||
let dsn = 'ws://localhost:6041';
|
|
||||||
let conf = new taos.WSConfig(dsn);
|
try {
|
||||||
conf.setUser('root');
|
let conf = new taos.WSConfig(dsn);
|
||||||
conf.setPwd('taosdata');
|
conf.setUser('root');
|
||||||
conf.setDb('power');
|
conf.setPwd('taosdata');
|
||||||
return await taos.sqlConnect(conf);
|
conf.setDb('power');
|
||||||
|
conn = await taos.sqlConnect(conf);
|
||||||
|
console.log("Connected to " + dsn + " successfully.");
|
||||||
|
return conn;
|
||||||
|
} catch (err) {
|
||||||
|
console.log("Failed to connect to " + dns + "; ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
// ANCHOR_END: createConnect
|
// ANCHOR_END: createConnect
|
||||||
|
|
||||||
// ANCHOR: create_db_and_table
|
// ANCHOR: create_db_and_table
|
||||||
async function createDbAndTable(wsSql) {
|
async function createDbAndTable() {
|
||||||
let wsSql = null;
|
let wsSql = null;
|
||||||
try {
|
try {
|
||||||
wsSql = await createConnect();
|
wsSql = await createConnect();
|
||||||
await wsSql.exec('CREATE DATABASE IF NOT EXISTS POWER ' +
|
await wsSql.exec('CREATE DATABASE IF NOT EXISTS power');
|
||||||
'KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;');
|
|
||||||
|
|
||||||
await wsSql.exec('USE power');
|
await wsSql.exec('CREATE STABLE IF NOT EXISTS power.meters ' +
|
||||||
|
|
||||||
await wsSql.exec('CREATE STABLE IF NOT EXISTS meters ' +
|
|
||||||
'(_ts timestamp, current float, voltage int, phase float) ' +
|
'(_ts timestamp, current float, voltage int, phase float) ' +
|
||||||
'TAGS (location binary(64), groupId int);');
|
'TAGS (location binary(64), groupId int);');
|
||||||
|
|
||||||
taosResult = await wsSql.exec('describe meters');
|
console.log("Create stable power.meters successfully");
|
||||||
console.log(taosResult);
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("Failed to create db and table, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
console.error("Failed to create db and table, url:" + dns + "; ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||||
} finally {
|
} finally {
|
||||||
if (wsSql) {
|
if (wsSql) {
|
||||||
await wsSql.close();
|
await wsSql.close();
|
||||||
|
@ -39,8 +44,8 @@ async function createDbAndTable(wsSql) {
|
||||||
// ANCHOR_END: create_db_and_table
|
// ANCHOR_END: create_db_and_table
|
||||||
|
|
||||||
// ANCHOR: insertData
|
// ANCHOR: insertData
|
||||||
async function insertData(wsSql) {
|
async function insertData() {
|
||||||
let wsSql = null;
|
let wsSql = null
|
||||||
try {
|
try {
|
||||||
wsSql = await createConnect();
|
wsSql = await createConnect();
|
||||||
let insertQuery = "INSERT INTO " +
|
let insertQuery = "INSERT INTO " +
|
||||||
|
@ -53,9 +58,9 @@ async function insertData(wsSql) {
|
||||||
"VALUES " +
|
"VALUES " +
|
||||||
"(NOW + 1a, 10.30000, 218, 0.25000) ";
|
"(NOW + 1a, 10.30000, 218, 0.25000) ";
|
||||||
taosResult = await wsSql.exec(insertQuery);
|
taosResult = await wsSql.exec(insertQuery);
|
||||||
console.log(taosResult);
|
console.log("Successfully inserted " + taosResult.getAffectRows() + " rows to power.meters.");
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("Failed to insert data to power.meters, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
console.error("Failed to insert data to power.meters, url:" + dsn + "; ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||||
} finally {
|
} finally {
|
||||||
if (wsSql) {
|
if (wsSql) {
|
||||||
await wsSql.close();
|
await wsSql.close();
|
||||||
|
@ -71,15 +76,13 @@ async function queryData() {
|
||||||
try {
|
try {
|
||||||
wsSql = await createConnect();
|
wsSql = await createConnect();
|
||||||
wsRows = await wsSql.query('SELECT ts, current, location FROM power.meters limit 100');
|
wsRows = await wsSql.query('SELECT ts, current, location FROM power.meters limit 100');
|
||||||
let meta = wsRows.getMeta();
|
|
||||||
console.log("wsRow:meta:=>", meta);
|
|
||||||
while (await wsRows.next()) {
|
while (await wsRows.next()) {
|
||||||
let result = wsRows.getData();
|
let row = wsRows.getData();
|
||||||
console.log('queryRes.Scan().then=>', result);
|
console.log('ts: ' + row[0] + ', current: ' + row[1] + ', location: ' + row[2]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (err) {
|
catch (err) {
|
||||||
console.error("Failed to query data from power.meters," + err.code + "; ErrMessage: " + err.message);
|
console.error("Failed to query data from power.meters, url:" + dsn + " ; ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
if (wsRows) {
|
if (wsRows) {
|
||||||
|
@ -93,22 +96,20 @@ async function queryData() {
|
||||||
// ANCHOR_END: queryData
|
// ANCHOR_END: queryData
|
||||||
|
|
||||||
// ANCHOR: sqlWithReqid
|
// ANCHOR: sqlWithReqid
|
||||||
async function sqlWithReqid(wsSql) {
|
async function sqlWithReqid() {
|
||||||
|
|
||||||
let wsRows = null;
|
let wsRows = null;
|
||||||
let wsSql = null;
|
let wsSql = null;
|
||||||
|
let reqId = 1;
|
||||||
try {
|
try {
|
||||||
wsSql = await createConnect();
|
wsSql = await createConnect();
|
||||||
wsRows = await wsSql.query('SELECT ts, current, location FROM power.meters limit 100', 1);
|
wsRows = await wsSql.query('SELECT ts, current, location FROM power.meters limit 100', reqId);
|
||||||
let meta = wsRows.getMeta();
|
|
||||||
console.log("wsRow:meta:=>", meta);
|
|
||||||
while (await wsRows.next()) {
|
while (await wsRows.next()) {
|
||||||
let result = wsRows.getData();
|
let row = wsRows.getData();
|
||||||
console.log('queryRes.Scan().then=>', result);
|
console.log('ts: ' + row[0] + ', current: ' + row[1] + ', location: ' + row[2]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (err) {
|
catch (err) {
|
||||||
console.error("Failed to execute sql with reqId," + err.code + "; ErrMessage: " + err.message);
|
console.error("Failed to execute sql with reqId: " + reqId + "," + err.code + "; ErrMessage: " + err.message);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
if (wsRows) {
|
if (wsRows) {
|
||||||
|
|
|
@ -4,6 +4,7 @@ let db = 'power';
|
||||||
let stable = 'meters';
|
let stable = 'meters';
|
||||||
let numOfSubTable = 10;
|
let numOfSubTable = 10;
|
||||||
let numOfRow = 10;
|
let numOfRow = 10;
|
||||||
|
let dsn = 'ws://localhost:6041'
|
||||||
function getRandomInt(min, max) {
|
function getRandomInt(min, max) {
|
||||||
min = Math.ceil(min);
|
min = Math.ceil(min);
|
||||||
max = Math.floor(max);
|
max = Math.floor(max);
|
||||||
|
@ -11,7 +12,7 @@ function getRandomInt(min, max) {
|
||||||
}
|
}
|
||||||
|
|
||||||
async function prepare() {
|
async function prepare() {
|
||||||
let dsn = 'ws://localhost:6041'
|
|
||||||
let conf = new taos.WSConfig(dsn);
|
let conf = new taos.WSConfig(dsn);
|
||||||
conf.setUser('root')
|
conf.setUser('root')
|
||||||
conf.setPwd('taosdata')
|
conf.setPwd('taosdata')
|
||||||
|
@ -54,11 +55,11 @@ async function prepare() {
|
||||||
await stmt.bind(bindParams);
|
await stmt.bind(bindParams);
|
||||||
await stmt.batch();
|
await stmt.batch();
|
||||||
await stmt.exec();
|
await stmt.exec();
|
||||||
console.log(`d_bind_${i} insert ` + stmt.getLastAffected() + " rows.");
|
console.log("Successfully inserted " + stmt.getLastAffected() + " to power.meters.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (err) {
|
catch (err) {
|
||||||
console.error("Failed to insert to table meters using stmt, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
console.error("Failed to insert to table meters using stmt, url:" + dsn + "ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
if (stmt) {
|
if (stmt) {
|
||||||
|
|
|
@ -1,23 +1,28 @@
|
||||||
const taos = require("@tdengine/websocket");
|
const taos = require("@tdengine/websocket");
|
||||||
|
|
||||||
|
// ANCHOR: create_consumer
|
||||||
const db = 'power';
|
const db = 'power';
|
||||||
const stable = 'meters';
|
const stable = 'meters';
|
||||||
const topics = ['power_meters_topic'];
|
const topics = ['power_meters_topic'];
|
||||||
|
const url = 'ws://localhost:6041';
|
||||||
// ANCHOR: create_consumer
|
|
||||||
async function createConsumer() {
|
async function createConsumer() {
|
||||||
|
|
||||||
|
let groupId = "group1";
|
||||||
|
let clientId = "1";
|
||||||
let configMap = new Map([
|
let configMap = new Map([
|
||||||
[taos.TMQConstants.GROUP_ID, "group1"],
|
[taos.TMQConstants.GROUP_ID, groupId],
|
||||||
[taos.TMQConstants.CLIENT_ID, 'client1'],
|
[taos.TMQConstants.CLIENT_ID, clientId],
|
||||||
[taos.TMQConstants.CONNECT_USER, "root"],
|
[taos.TMQConstants.CONNECT_USER, "root"],
|
||||||
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
|
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
|
||||||
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
|
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
|
||||||
[taos.TMQConstants.WS_URL, 'ws://localhost:6041'],
|
[taos.TMQConstants.WS_URL, url],
|
||||||
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
|
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
|
||||||
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
|
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
|
||||||
]);
|
]);
|
||||||
try {
|
try {
|
||||||
return await taos.tmqConnect(configMap);
|
conn = await taos.tmqConnect(configMap);
|
||||||
|
console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`)
|
||||||
|
return conn;
|
||||||
}catch (err) {
|
}catch (err) {
|
||||||
console.log("Failed to create websocket consumer, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
console.log("Failed to create websocket consumer, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||||
throw err;
|
throw err;
|
||||||
|
@ -31,7 +36,7 @@ async function prepare() {
|
||||||
conf.setUser('root');
|
conf.setUser('root');
|
||||||
conf.setPwd('taosdata');
|
conf.setPwd('taosdata');
|
||||||
conf.setDb('power');
|
conf.setDb('power');
|
||||||
const createDB = `CREATE DATABASE IF NOT EXISTS POWER ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`;
|
const createDB = `CREATE DATABASE IF NOT EXISTS ${db}`;
|
||||||
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
|
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
|
||||||
|
|
||||||
let wsSql = await taos.sqlConnect(conf);
|
let wsSql = await taos.sqlConnect(conf);
|
||||||
|
@ -45,7 +50,7 @@ async function prepare() {
|
||||||
for (let i = 0; i < 10; i++) {
|
for (let i = 0; i < 10; i++) {
|
||||||
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
|
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
|
||||||
}
|
}
|
||||||
wsSql.Close();
|
wsSql.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
async function subscribe(consumer) {
|
async function subscribe(consumer) {
|
||||||
|
@ -55,9 +60,10 @@ async function subscribe(consumer) {
|
||||||
for (let i = 0; i < 50; i++) {
|
for (let i = 0; i < 50; i++) {
|
||||||
let res = await consumer.poll(100);
|
let res = await consumer.poll(100);
|
||||||
for (let [key, value] of res) {
|
for (let [key, value] of res) {
|
||||||
console.log(key, value);
|
console.log(`data: ${key} ${value}`);
|
||||||
}
|
}
|
||||||
consumer.commit();
|
consumer.commit();
|
||||||
|
console.log("commit offset manually successfully.");
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("Failed to poll data; err.code, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
console.error("Failed to poll data; err.code, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||||
|
@ -74,6 +80,7 @@ async function test() {
|
||||||
let consumer = await createConsumer()
|
let consumer = await createConsumer()
|
||||||
await subscribe(consumer)
|
await subscribe(consumer)
|
||||||
await consumer.unsubscribe();
|
await consumer.unsubscribe();
|
||||||
|
console.log("unsubscribe consumer successfully.");
|
||||||
}
|
}
|
||||||
catch (err) {
|
catch (err) {
|
||||||
console.error("Failed to unsubscribe consume, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
console.error("Failed to unsubscribe consume, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||||
|
|
|
@ -84,6 +84,7 @@ async function test() {
|
||||||
}
|
}
|
||||||
|
|
||||||
await consumer.seekToBeginning(assignment);
|
await consumer.seekToBeginning(assignment);
|
||||||
|
console.log("assignment seek to beginning successfully");
|
||||||
assignment = await consumer.assignment();
|
assignment = await consumer.assignment();
|
||||||
for (let i in assignment) {
|
for (let i in assignment) {
|
||||||
console.log("seek after:", assignment[i]);
|
console.log("seek after:", assignment[i]);
|
||||||
|
|
|
@ -14,7 +14,7 @@ def create_connection():
|
||||||
)
|
)
|
||||||
print(f"Connected to {host}:{port} successfully.");
|
print(f"Connected to {host}:{port} successfully.");
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to connect to {host}:{port} ; Err:{err}")
|
print(f"Failed to connect to {host}:{port} ; err:{err}")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -12,7 +12,7 @@ def create_connection():
|
||||||
|
|
||||||
print(f"Connected to {url} successfully.");
|
print(f"Connected to {url} successfully.");
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to connect to {url} ; Err:{err}")
|
print(f"Failed to connect to {url} ; err:{err}")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -9,26 +9,18 @@ try:
|
||||||
user="root",
|
user="root",
|
||||||
password="taosdata")
|
password="taosdata")
|
||||||
|
|
||||||
db = "power"
|
|
||||||
# create database
|
# create database
|
||||||
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
|
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS power")
|
||||||
assert rowsAffected == 0
|
print(f"Create database power successfully, rowsAffected: {rowsAffected}");
|
||||||
|
|
||||||
# change database. same as execute "USE db"
|
|
||||||
conn.select_db(db)
|
|
||||||
|
|
||||||
# create super table
|
# create super table
|
||||||
rowsAffected = conn.execute(
|
rowsAffected = conn.execute(
|
||||||
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
"CREATE TABLE IF NOT EXISTS power.meters (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
||||||
)
|
)
|
||||||
assert rowsAffected == 0
|
print(f"Create stable power.meters successfully, rowsAffected: {rowsAffected}");
|
||||||
|
|
||||||
# create table
|
|
||||||
rowsAffected = conn.execute("CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupid, location) TAGS(0, 'Los Angles')")
|
|
||||||
assert rowsAffected == 0
|
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to create db and table, db addrr:{host}:{port} err:{err}")
|
print(f"Failed to create db and table, db addr:{host}:{port} ; err:{err}")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -8,22 +8,18 @@ try:
|
||||||
password="taosdata",
|
password="taosdata",
|
||||||
timeout=30)
|
timeout=30)
|
||||||
|
|
||||||
db = "power"
|
|
||||||
# create database
|
# create database
|
||||||
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
|
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS power")
|
||||||
assert rowsAffected == 0
|
print(f"Create database power successfully, rowsAffected: {rowsAffected}");
|
||||||
|
|
||||||
# create super table
|
# create super table
|
||||||
rowsAffected = conn.execute(
|
rowsAffected = conn.execute(
|
||||||
f"CREATE TABLE IF NOT EXISTS `{db}`.`meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
f"CREATE TABLE IF NOT EXISTS power.meters (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
||||||
)
|
)
|
||||||
assert rowsAffected == 0
|
print(f"Create stable power.meters successfully, rowsAffected: {rowsAffected}");
|
||||||
# create table
|
|
||||||
rowsAffected = conn.execute(f"CREATE TABLE IF NOT EXISTS `{db}`.`d0` USING `{db}`.`meters` (groupid, location) TAGS(0, 'Los Angles')")
|
|
||||||
assert rowsAffected == 0
|
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to create db and table, url:{url} err:{err}")
|
print(f"Failed to create db and table, url:{url} ; err:{err}")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -9,27 +9,18 @@ try:
|
||||||
host=host,
|
host=host,
|
||||||
port=port)
|
port=port)
|
||||||
|
|
||||||
db = "power"
|
|
||||||
# create database
|
# create database
|
||||||
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
|
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS power")
|
||||||
assert rowsAffected == 0
|
print(f"Create database power successfully, rowsAffected: {rowsAffected}");
|
||||||
|
|
||||||
# change database.
|
|
||||||
rowsAffected = conn.execute(f"USE {db}")
|
|
||||||
assert rowsAffected == 0
|
|
||||||
|
|
||||||
# create super table
|
# create super table
|
||||||
rowsAffected = conn.execute(
|
rowsAffected = conn.execute(
|
||||||
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
"CREATE TABLE IF NOT EXISTS power.meters (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
||||||
)
|
)
|
||||||
assert rowsAffected == 0
|
print(f"Create stable power.meters successfully, rowsAffected: {rowsAffected}");
|
||||||
|
|
||||||
# create table
|
|
||||||
rowsAffected = conn.execute("CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupid, location) TAGS(0, 'Los Angles')")
|
|
||||||
assert rowsAffected == 0
|
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to create db and table, db addrr:{host}:{port} err:{err}")
|
print(f"Failed to create db and table, db addrr:{host}:{port} ; err:{err}")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -1,12 +1,13 @@
|
||||||
import taos
|
import taos
|
||||||
|
|
||||||
conn = None
|
conn = None
|
||||||
|
host = "localhost"
|
||||||
|
port = 6030
|
||||||
try:
|
try:
|
||||||
conn = taos.connect(user="root",
|
conn = taos.connect(host=host,
|
||||||
password="taosdata",
|
port=port,
|
||||||
host="localhost",
|
user="root",
|
||||||
port=6030)
|
password="taosdata")
|
||||||
|
|
||||||
sql = """
|
sql = """
|
||||||
INSERT INTO
|
INSERT INTO
|
||||||
|
@ -17,10 +18,10 @@ try:
|
||||||
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
||||||
"""
|
"""
|
||||||
affectedRows = conn.execute(sql)
|
affectedRows = conn.execute(sql)
|
||||||
print(f"inserted into {affectedRows} rows to power.meters successfully.")
|
print(f"Successfully inserted {affectedRows} rows to power.meters.")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(err)
|
print(f"Failed to insert data to power.meters, db addr:{host}:{port} ; err:{err}")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
import taosrest
|
import taosrest
|
||||||
|
|
||||||
conn = None
|
conn = None
|
||||||
|
url="http://localhost:6041"
|
||||||
try:
|
try:
|
||||||
conn = taosrest.connect(url="http://localhost:6041",
|
conn = taosrest.connect(url=url,
|
||||||
user="root",
|
user="root",
|
||||||
password="taosdata",
|
password="taosdata",
|
||||||
timeout=30)
|
timeout=30)
|
||||||
|
@ -17,10 +17,10 @@ try:
|
||||||
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
||||||
"""
|
"""
|
||||||
affectedRows = conn.execute(sql)
|
affectedRows = conn.execute(sql)
|
||||||
print(f"inserted into {affectedRows} rows to power.meters successfully.")
|
print(f"Successfully inserted {affectedRows} rows to power.meters.")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(err)
|
print(f"Failed to insert data to power.meters, url:{url} ; err:{err}")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -1,12 +1,13 @@
|
||||||
import taosws
|
import taosws
|
||||||
|
|
||||||
conn = None
|
conn = None
|
||||||
|
host="localhost"
|
||||||
|
port=6041
|
||||||
try:
|
try:
|
||||||
conn = taosws.connect(user="root",
|
conn = taosws.connect(user="root",
|
||||||
password="taosdata",
|
password="taosdata",
|
||||||
host="localhost",
|
host=host,
|
||||||
port=6041)
|
port=port)
|
||||||
|
|
||||||
sql = """
|
sql = """
|
||||||
INSERT INTO
|
INSERT INTO
|
||||||
|
@ -17,10 +18,10 @@ try:
|
||||||
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
||||||
"""
|
"""
|
||||||
affectedRows = conn.execute(sql)
|
affectedRows = conn.execute(sql)
|
||||||
print(f"inserted into {affectedRows} rows to power.meters successfully.")
|
print(f"Successfully inserted {affectedRows} rows to power.meters.")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(err)
|
print(f"Failed to insert data to power.meters, db addr:{host}:{port} ; err:{err}")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -1,26 +1,21 @@
|
||||||
import taos
|
import taos
|
||||||
|
host="localhost"
|
||||||
|
port=6030
|
||||||
conn = None
|
conn = None
|
||||||
try:
|
try:
|
||||||
conn = taos.connect(host="localhost",
|
conn = taos.connect(host=host,
|
||||||
port=6030,
|
port=port,
|
||||||
user="root",
|
user="root",
|
||||||
password="taosdata")
|
password="taosdata")
|
||||||
|
|
||||||
result = conn.query("SELECT ts, current, location FROM power.meters limit 100")
|
result = conn.query("SELECT ts, current, location FROM power.meters limit 100")
|
||||||
print(result)
|
|
||||||
# Get fields from result
|
|
||||||
fields = result.fields
|
|
||||||
for field in fields:
|
|
||||||
print(field)
|
|
||||||
|
|
||||||
# Get data from result as list of tuple
|
# Get data from result as list of tuple
|
||||||
data = result.fetch_all()
|
data = result.fetch_all()
|
||||||
for row in data:
|
for row in data:
|
||||||
print(row)
|
print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to query data from power.meters, err:{err}")
|
print(f"Failed to query data from power.meters, db addr:{host}:{port} ; err:{err}")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
|
@ -1,15 +1,17 @@
|
||||||
import taosrest
|
import taosrest
|
||||||
|
|
||||||
client = None
|
client = None
|
||||||
|
url="http://localhost:6041"
|
||||||
try:
|
try:
|
||||||
client = taosrest.RestClient(url="http://localhost:6041",
|
client = taosrest.RestClient(url=url,
|
||||||
user="root",
|
user="root",
|
||||||
password="taosdata",
|
password="taosdata",
|
||||||
timeout=30)
|
timeout=30)
|
||||||
|
|
||||||
result = client.sql(f"SELECT ts, current, location FROM power.meters limit 100", 1)
|
result = client.sql(f"SELECT ts, current, location FROM power.meters limit 100")
|
||||||
print(result)
|
if result["data"]:
|
||||||
|
for row in result["data"]:
|
||||||
|
print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to query data from power.meters, err:{err}")
|
print(f"Failed to query data from power.meters, url:{url} ; err:{err}")
|
||||||
|
|
|
@ -1,22 +1,20 @@
|
||||||
import taosws
|
import taosws
|
||||||
|
|
||||||
conn = None
|
conn = None
|
||||||
|
host="localhost"
|
||||||
|
port=6041
|
||||||
try:
|
try:
|
||||||
conn = taosws.connect(user="root",
|
conn = taosws.connect(user="root",
|
||||||
password="taosdata",
|
password="taosdata",
|
||||||
host="localhost",
|
host=host,
|
||||||
port=6041)
|
port=port)
|
||||||
|
|
||||||
result = conn.query("SELECT ts, current, location FROM power.meters limit 100")
|
result = conn.query("SELECT ts, current, location FROM power.meters limit 100")
|
||||||
num_of_fields = result.field_count
|
|
||||||
print(num_of_fields)
|
|
||||||
|
|
||||||
for row in result:
|
for row in result:
|
||||||
print(row)
|
print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to query data from power.meters, err:{err}")
|
print(f"Failed to query data from power.meters, db addr:{host}:{port} ; err:{err}")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -1,25 +1,24 @@
|
||||||
import taos
|
import taos
|
||||||
|
|
||||||
conn = None
|
conn = None
|
||||||
|
reqId = 1
|
||||||
|
host="localhost"
|
||||||
|
port=6030
|
||||||
try:
|
try:
|
||||||
conn = taos.connect(host="localhost",
|
conn = taos.connect(host=host,
|
||||||
port=6030,
|
port=port,
|
||||||
user="root",
|
user="root",
|
||||||
password="taosdata")
|
password="taosdata")
|
||||||
|
|
||||||
result = conn.query("SELECT ts, current, location FROM power.meters limit 100", 1)
|
result = conn.query("SELECT ts, current, location FROM power.meters limit 100", reqId)
|
||||||
# Get fields from result
|
|
||||||
fields = result.fields
|
|
||||||
for field in fields:
|
|
||||||
print(field)
|
|
||||||
|
|
||||||
# Get data from result as list of tuple
|
# Get data from result as list of tuple
|
||||||
data = result.fetch_all()
|
data = result.fetch_all()
|
||||||
for row in data:
|
for row in data:
|
||||||
print(row)
|
print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to execute sql with reqId, err:{err}")
|
print(f"Failed to execute sql with reqId:{reqId}, db addr:{host}:{port} ; err:{err}")
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
|
@ -1,15 +1,18 @@
|
||||||
import taosrest
|
import taosrest
|
||||||
|
|
||||||
client = None
|
client = None
|
||||||
|
url="http://localhost:6041"
|
||||||
|
reqId = 1
|
||||||
try:
|
try:
|
||||||
client = taosrest.RestClient(url="http://localhost:6041",
|
client = taosrest.RestClient(url=url,
|
||||||
user="root",
|
user="root",
|
||||||
password="taosdata",
|
password="taosdata",
|
||||||
timeout=30)
|
timeout=30)
|
||||||
|
|
||||||
result = client.sql(f"SELECT ts, current, location FROM power.meters limit 100", 1)
|
result = client.sql(f"SELECT ts, current, location FROM power.meters limit 100", reqId)
|
||||||
print(result)
|
if result["data"]:
|
||||||
|
for row in result["data"]:
|
||||||
|
print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to execute sql with reqId, err:{err}")
|
print(f"Failed to execute sql with reqId:{reqId}, url:{url} ; err:{err}")
|
||||||
|
|
|
@ -1,22 +1,24 @@
|
||||||
import taosws
|
import taosws
|
||||||
|
|
||||||
conn = None
|
conn = None
|
||||||
|
reqId = 1
|
||||||
|
host="localhost"
|
||||||
|
port=6041
|
||||||
try:
|
try:
|
||||||
conn = taosws.connect(
|
conn = taosws.connect(
|
||||||
user="root",
|
user="root",
|
||||||
password="taosdata",
|
password="taosdata",
|
||||||
host="localhost",
|
host=host,
|
||||||
port=6041,
|
port=port,
|
||||||
)
|
)
|
||||||
|
|
||||||
result = conn.query_with_req_id("SELECT ts, current, location FROM power.meters limit 100", req_id=1)
|
result = conn.query_with_req_id("SELECT ts, current, location FROM power.meters limit 100", req_id=1)
|
||||||
|
# Get data from result as list of tuple
|
||||||
for row in result:
|
for row in result:
|
||||||
print(row)
|
print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to execute sql with reqId, err:{err}")
|
print(f"Failed to execute sql with reqId:{reqId}, db addr:{host}:{port} ; err:{err}")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
|
@ -9,13 +9,14 @@ telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]
|
||||||
jsonDemo = [
|
jsonDemo = [
|
||||||
'{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
|
'{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
|
||||||
]
|
]
|
||||||
|
host = "localhost"
|
||||||
|
port = 6030
|
||||||
try:
|
try:
|
||||||
conn = taos.connect(
|
conn = taos.connect(
|
||||||
host="localhost",
|
|
||||||
user="root",
|
user="root",
|
||||||
password="taosdata",
|
password="taosdata",
|
||||||
port=6030
|
host=host,
|
||||||
|
port=port
|
||||||
)
|
)
|
||||||
|
|
||||||
conn.execute("CREATE DATABASE IF NOT EXISTS power")
|
conn.execute("CREATE DATABASE IF NOT EXISTS power")
|
||||||
|
@ -31,8 +32,9 @@ try:
|
||||||
conn.schemaless_insert(
|
conn.schemaless_insert(
|
||||||
jsonDemo, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
|
jsonDemo, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
|
||||||
)
|
)
|
||||||
|
print("Inserted data with schemaless successfully.");
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to insert data with schemaless, err:{err}")
|
print(f"Failed to insert data with schemaless, addr: {host}:{port} err:{err}")
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
import taosws
|
import taosws
|
||||||
|
|
||||||
db = "power"
|
|
||||||
def prepare():
|
def prepare():
|
||||||
conn = None
|
conn = None
|
||||||
try:
|
try:
|
||||||
|
@ -10,11 +9,12 @@ def prepare():
|
||||||
port=6041)
|
port=6041)
|
||||||
|
|
||||||
# create database
|
# create database
|
||||||
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
|
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS power")
|
||||||
assert rowsAffected == 0
|
assert rowsAffected == 0
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to create db and table, err:{err}")
|
print(f"Failed to create db and table, err:{err}")
|
||||||
|
raise err
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
@ -32,13 +32,14 @@ def schemaless_insert():
|
||||||
jsonDemo = [
|
jsonDemo = [
|
||||||
'{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
|
'{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
|
||||||
]
|
]
|
||||||
|
host = "localhost"
|
||||||
|
port = 6041
|
||||||
try:
|
try:
|
||||||
conn = taosws.connect(user="root",
|
conn = taosws.connect(user="root",
|
||||||
password="taosdata",
|
password="taosdata",
|
||||||
host="localhost",
|
host=host,
|
||||||
port=6041,
|
port=port,
|
||||||
database=db)
|
database='power')
|
||||||
|
|
||||||
conn.schemaless_insert(
|
conn.schemaless_insert(
|
||||||
lines = lineDemo,
|
lines = lineDemo,
|
||||||
|
@ -63,10 +64,18 @@ def schemaless_insert():
|
||||||
ttl=1,
|
ttl=1,
|
||||||
req_id=3,
|
req_id=3,
|
||||||
)
|
)
|
||||||
|
print("Inserted data with schemaless successfully.");
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to insert data with schemaless, err:{err}")
|
print(f"Failed to insert data with schemaless, addr: {host}:{port} err:{err}")
|
||||||
|
raise err
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if conn:
|
if conn:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
prepare()
|
||||||
|
schemaless_insert
|
||||||
|
except Exception as err:
|
||||||
|
print(f"Failed to insert data with schemaless, err:{err}")
|
|
@ -7,13 +7,14 @@ numOfRow = 10
|
||||||
|
|
||||||
conn = None
|
conn = None
|
||||||
stmt = None
|
stmt = None
|
||||||
|
host="localhost",
|
||||||
|
port=6030,
|
||||||
try:
|
try:
|
||||||
conn = taos.connect(
|
conn = taos.connect(
|
||||||
host="localhost",
|
|
||||||
user="root",
|
user="root",
|
||||||
password="taosdata",
|
password="taosdata",
|
||||||
port=6030,
|
host=host,
|
||||||
|
port=port,
|
||||||
)
|
)
|
||||||
|
|
||||||
conn.execute("CREATE DATABASE IF NOT EXISTS power")
|
conn.execute("CREATE DATABASE IF NOT EXISTS power")
|
||||||
|
@ -52,10 +53,10 @@ try:
|
||||||
params[3].float(phases)
|
params[3].float(phases)
|
||||||
stmt.bind_param_batch(params)
|
stmt.bind_param_batch(params)
|
||||||
stmt.execute()
|
stmt.execute()
|
||||||
print(f"stmt insert successfully.")
|
print(f"Successfully inserted to power.meters.")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to insert to table meters using stmt, error: {err}")
|
print(f"Failed to insert to table meters using stmt, addr:{host}:{port} ; err:{err}")
|
||||||
finally:
|
finally:
|
||||||
if stmt:
|
if stmt:
|
||||||
stmt.close()
|
stmt.close()
|
||||||
|
|
|
@ -8,6 +8,8 @@ numOfRow = 10
|
||||||
|
|
||||||
conn = None
|
conn = None
|
||||||
stmt = None
|
stmt = None
|
||||||
|
host="localhost"
|
||||||
|
port=6041
|
||||||
try:
|
try:
|
||||||
conn = taosws.connect(user="root",
|
conn = taosws.connect(user="root",
|
||||||
password="taosdata",
|
password="taosdata",
|
||||||
|
@ -56,10 +58,10 @@ try:
|
||||||
stmt.add_batch()
|
stmt.add_batch()
|
||||||
stmt.execute()
|
stmt.execute()
|
||||||
|
|
||||||
print(f"stmt insert successfully.")
|
print(f"Successfully inserted to power.meters.")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to insert to table meters using stmt, error: {err}")
|
print(f"Failed to insert to table meters using stmt, addr:{host}:{port} ; err:{err}")
|
||||||
finally:
|
finally:
|
||||||
if stmt:
|
if stmt:
|
||||||
stmt.close()
|
stmt.close()
|
||||||
|
|
|
@ -49,23 +49,28 @@ def prepareMeta():
|
||||||
from taos.tmq import Consumer
|
from taos.tmq import Consumer
|
||||||
|
|
||||||
def create_consumer():
|
def create_consumer():
|
||||||
|
host = "localhost"
|
||||||
|
port = 6030
|
||||||
|
groupId = "group1"
|
||||||
|
clientId = "1"
|
||||||
try:
|
try:
|
||||||
consumer = Consumer(
|
consumer = Consumer(
|
||||||
{
|
{
|
||||||
"group.id": "group1",
|
"group.id": groupId,
|
||||||
"client.id": "1",
|
"client.id": clientId,
|
||||||
"td.connect.user": "root",
|
"td.connect.user": "root",
|
||||||
"td.connect.pass": "taosdata",
|
"td.connect.pass": "taosdata",
|
||||||
"enable.auto.commit": "true",
|
"enable.auto.commit": "true",
|
||||||
"auto.commit.interval.ms": "1000",
|
"auto.commit.interval.ms": "1000",
|
||||||
"auto.offset.reset": "latest",
|
"auto.offset.reset": "latest",
|
||||||
"td.connect.ip": "localhost",
|
"td.connect.ip": host,
|
||||||
"td.connect.port": "6030",
|
"td.connect.port": port,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}");
|
||||||
return consumer
|
return consumer
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to poll data, err:{err}")
|
print(f"Failed to create websocket consumer, host: {host}:{port} ; err:{err}");
|
||||||
raise err
|
raise err
|
||||||
# ANCHOR_END: create_consumer
|
# ANCHOR_END: create_consumer
|
||||||
|
|
||||||
|
@ -87,7 +92,8 @@ def subscribe(consumer):
|
||||||
val = records.value()
|
val = records.value()
|
||||||
if val:
|
if val:
|
||||||
for block in val:
|
for block in val:
|
||||||
print(block.fetchall())
|
data = block.fetchall()
|
||||||
|
print(f"data: {data}")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to poll data, err:{err}")
|
print(f"Failed to poll data, err:{err}")
|
||||||
|
@ -114,6 +120,7 @@ def commit_offset(consumer):
|
||||||
|
|
||||||
# after processing the data, commit the offset manually
|
# after processing the data, commit the offset manually
|
||||||
consumer.commit(records)
|
consumer.commit(records)
|
||||||
|
print("commit offset manually successfully.");
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to poll data, err:{err}")
|
print(f"Failed to poll data, err:{err}")
|
||||||
|
@ -141,6 +148,7 @@ def seek_offset(consumer):
|
||||||
def unsubscribe(consumer):
|
def unsubscribe(consumer):
|
||||||
try:
|
try:
|
||||||
consumer.unsubscribe()
|
consumer.unsubscribe()
|
||||||
|
print("unsubscribe consumer successfully.");
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to unsubscribe consumer. err:{err}")
|
print(f"Failed to unsubscribe consumer. err:{err}")
|
||||||
|
|
||||||
|
|
|
@ -60,20 +60,25 @@ def prepareMeta():
|
||||||
|
|
||||||
# ANCHOR: create_consumer
|
# ANCHOR: create_consumer
|
||||||
def create_consumer():
|
def create_consumer():
|
||||||
|
host = "localhost"
|
||||||
|
port = 6041
|
||||||
|
groupId = "group1"
|
||||||
|
clientId = "1"
|
||||||
try:
|
try:
|
||||||
consumer = taosws.Consumer(conf={
|
consumer = taosws.Consumer(conf={
|
||||||
"td.connect.websocket.scheme": "ws",
|
"td.connect.websocket.scheme": "ws",
|
||||||
"group.id": "group1",
|
"group.id": groupId,
|
||||||
"client.id": "1",
|
"client.id": clientId,
|
||||||
"auto.offset.reset": "latest",
|
"auto.offset.reset": "latest",
|
||||||
"td.connect.ip": "localhost",
|
"td.connect.ip": host,
|
||||||
"td.connect.port": "6041",
|
"td.connect.port": port,
|
||||||
"enable.auto.commit": "true",
|
"enable.auto.commit": "true",
|
||||||
"auto.commit.interval.ms": "1000",
|
"auto.commit.interval.ms": "1000",
|
||||||
})
|
})
|
||||||
|
print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}");
|
||||||
return consumer;
|
return consumer;
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to create websocket consumer, err:{err}");
|
print(f"Failed to create websocket consumer, host: {host}:{port} ; err:{err}");
|
||||||
raise err
|
raise err
|
||||||
|
|
||||||
|
|
||||||
|
@ -108,7 +113,7 @@ def subscribe(consumer):
|
||||||
if records:
|
if records:
|
||||||
for block in records:
|
for block in records:
|
||||||
for row in block:
|
for row in block:
|
||||||
print(row)
|
print(f"data: {row}")
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to poll data, err:{err}")
|
print(f"Failed to poll data, err:{err}")
|
||||||
|
@ -125,10 +130,11 @@ def commit_offset(consumer):
|
||||||
if records:
|
if records:
|
||||||
for block in records:
|
for block in records:
|
||||||
for row in block:
|
for row in block:
|
||||||
print(row)
|
print(f"data: {row}")
|
||||||
|
|
||||||
# after processing the data, commit the offset manually
|
# after processing the data, commit the offset manually
|
||||||
consumer.commit(records)
|
consumer.commit(records)
|
||||||
|
print("commit offset manually successfully.");
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print(f"Failed to poll data, err:{err}")
|
print(f"Failed to poll data, err:{err}")
|
||||||
|
@ -141,6 +147,7 @@ def commit_offset(consumer):
|
||||||
def unsubscribe(consumer):
|
def unsubscribe(consumer):
|
||||||
try:
|
try:
|
||||||
consumer.unsubscribe()
|
consumer.unsubscribe()
|
||||||
|
print("unsubscribe consumer successfully.");
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print("Failed to unsubscribe consumer. err:{err}")
|
print("Failed to unsubscribe consumer. err:{err}")
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue