Merge branch 'docs/sheyj-3.0' of github.com:taosdata/TDengine into docs/sheyj-3.0
This commit is contained in:
commit
27c44318b6
|
@ -5,9 +5,9 @@ namespace TDengineExample
|
|||
{
|
||||
internal class OptsJsonExample
|
||||
{
|
||||
// ANCHOR: main
|
||||
public static void Main(string[] args)
|
||||
{
|
||||
// ANCHOR: main
|
||||
var host = "127.0.0.1";
|
||||
|
||||
var lineDemo =
|
||||
|
@ -38,20 +38,22 @@ namespace TDengineExample
|
|||
client.SchemalessInsert(new []{jsonDemo}, TDengineSchemalessProtocol.TSDB_SML_JSON_PROTOCOL,
|
||||
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NOT_CONFIGURED, 0, ReqId.GetReqId());
|
||||
}
|
||||
|
||||
Console.WriteLine("Inserted data with schemaless successfully.");
|
||||
}
|
||||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to insert data with schemaless; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("Failed to insert data with schemaless, host:" + host + "; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to insert data with schemaless; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to insert data with schemaless, host:" + host + "; ErrMessage: " + e.Message);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: main
|
||||
}
|
||||
}
|
||||
}
|
|
@ -12,9 +12,10 @@ namespace TDengineExample
|
|||
var numOfSubTable = 10;
|
||||
var numOfRow = 10;
|
||||
var random = new Random();
|
||||
var connectionString = $"host={host};port=6030;username=root;password=taosdata";
|
||||
try
|
||||
{
|
||||
var builder = new ConnectionStringBuilder($"host={host};port=6030;username=root;password=taosdata");
|
||||
var builder = new ConnectionStringBuilder(connectionString);
|
||||
using (var client = DbDriver.Open(builder))
|
||||
{
|
||||
// create database
|
||||
|
@ -53,7 +54,7 @@ namespace TDengineExample
|
|||
stmt.Exec();
|
||||
// get affected rows
|
||||
var affectedRows = stmt.Affected();
|
||||
Console.WriteLine($"table {tableName} insert {affectedRows} rows.");
|
||||
Console.WriteLine($"Successfully inserted {affectedRows} rows to {tableName}.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -61,13 +62,13 @@ namespace TDengineExample
|
|||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to insert to table meters using stmt; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("Failed to insert to table meters using stmt, url: " + connectionString + "; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to insert to table meters using stmt; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to insert to table meters using stmt, url: " + connectionString + "; ErrMessage: " + e.Message);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,6 +64,9 @@ namespace TMQExample
|
|||
{
|
||||
// ANCHOR: create_consumer
|
||||
// consumer config
|
||||
var host = "127.0.0.1";
|
||||
var groupId = "group1";
|
||||
var clientId = "client1";
|
||||
var cfg = new Dictionary<string, string>()
|
||||
{
|
||||
{ "td.connect.port", "6030" },
|
||||
|
@ -71,9 +74,9 @@ namespace TMQExample
|
|||
{ "msg.with.table.name", "true" },
|
||||
{ "enable.auto.commit", "true" },
|
||||
{ "auto.commit.interval.ms", "1000" },
|
||||
{ "group.id", "group1" },
|
||||
{ "client.id", "client1" },
|
||||
{ "td.connect.ip", "127.0.0.1" },
|
||||
{ "group.id", groupId },
|
||||
{ "client.id", clientId },
|
||||
{ "td.connect.ip", host },
|
||||
{ "td.connect.user", "root" },
|
||||
{ "td.connect.pass", "taosdata" },
|
||||
};
|
||||
|
@ -82,17 +85,20 @@ namespace TMQExample
|
|||
{
|
||||
// create consumer
|
||||
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
|
||||
Console.WriteLine("Create consumer successfully, host: " + host + ", groupId: " + groupId +
|
||||
", clientId: " + clientId);
|
||||
}
|
||||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to create consumer; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("Failed to create native consumer, host : " + host + "; ErrCode:" + e.Code +
|
||||
"; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to create consumer; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to create native consumer, host : " + host + "; ErrMessage: " + e.Message);
|
||||
throw;
|
||||
}
|
||||
|
||||
|
@ -107,6 +113,7 @@ namespace TMQExample
|
|||
{
|
||||
// subscribe
|
||||
consumer.Subscribe(new List<string>() { "topic_meters" });
|
||||
Console.WriteLine("subscribe topics successfully");
|
||||
for (int i = 0; i < 50; i++)
|
||||
{
|
||||
// consume message with using block to ensure the result is disposed
|
||||
|
@ -117,7 +124,7 @@ namespace TMQExample
|
|||
{
|
||||
// handle message
|
||||
Console.WriteLine(
|
||||
$"data {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
|
||||
$"data: {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
|
||||
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
|
||||
}
|
||||
}
|
||||
|
@ -132,7 +139,7 @@ namespace TMQExample
|
|||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to poll data; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to poll data; ErrMessage:" + e.Message);
|
||||
throw;
|
||||
}
|
||||
// ANCHOR_END: subscribe
|
||||
|
@ -145,6 +152,7 @@ namespace TMQExample
|
|||
{
|
||||
// get assignment
|
||||
var assignment = consumer.Assignment;
|
||||
Console.WriteLine($"now assignment: ${assignment}");
|
||||
// seek to the beginning
|
||||
foreach (var topicPartition in assignment)
|
||||
{
|
||||
|
@ -172,13 +180,13 @@ namespace TMQExample
|
|||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to seek; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("seek example failed; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to seek; Err:" + e.Message);
|
||||
Console.WriteLine("seek example failed; ErrMessage: " + e.Message);
|
||||
throw;
|
||||
}
|
||||
// ANCHOR_END: seek
|
||||
|
@ -200,18 +208,19 @@ namespace TMQExample
|
|||
{
|
||||
cr.TopicPartitionOffset,
|
||||
});
|
||||
Console.WriteLine("commit offset manually successfully.");
|
||||
}
|
||||
}
|
||||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to commit offset; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("Failed to execute consumer functions. ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to commit offset; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to execute consumer functions. ErrMessage:" + e.Message);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
@ -229,19 +238,20 @@ namespace TMQExample
|
|||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to unsubscribe consumer; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("Failed to unsubscribe consumer. ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to unsubscribe consumer; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to unsubscribe consumer. Err: " + e.Message);
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
// close consumer
|
||||
consumer.Close();
|
||||
Console.WriteLine("consumer closed successfully.");
|
||||
}
|
||||
// ANCHOR_END: close
|
||||
}
|
||||
|
|
|
@ -11,13 +11,15 @@ namespace Examples
|
|||
{
|
||||
try
|
||||
{
|
||||
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=127.0.0.1;port=6041;useSSL=false;username=root;password=taosdata");
|
||||
var connectionString =
|
||||
"protocol=WebSocket;host=127.0.0.1;port=6041;useSSL=false;username=root;password=taosdata";
|
||||
var builder = new ConnectionStringBuilder(connectionString);
|
||||
using (var client = DbDriver.Open(builder))
|
||||
{
|
||||
CreateDatabaseAndTable(client);
|
||||
InsertData(client);
|
||||
QueryData(client);
|
||||
QueryWithReqId(client);
|
||||
CreateDatabaseAndTable(client,connectionString);
|
||||
InsertData(client,connectionString);
|
||||
QueryData(client,connectionString);
|
||||
QueryWithReqId(client,connectionString);
|
||||
}
|
||||
}
|
||||
catch (TDengineError e)
|
||||
|
@ -34,40 +36,40 @@ namespace Examples
|
|||
}
|
||||
}
|
||||
|
||||
private static void CreateDatabaseAndTable(ITDengineClient client)
|
||||
private static void CreateDatabaseAndTable(ITDengineClient client, string connectionString)
|
||||
{
|
||||
// ANCHOR: create_db_and_table
|
||||
try
|
||||
{
|
||||
// create database
|
||||
var affected = client.Exec("CREATE DATABASE IF NOT EXISTS power");
|
||||
Console.WriteLine($"Create database power, affected rows: {affected}");
|
||||
Console.WriteLine($"Create database power successfully, rowsAffected: {affected}");
|
||||
// create table
|
||||
affected = client.Exec(
|
||||
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
|
||||
Console.WriteLine($"Create table meters, affected rows: {affected}");
|
||||
Console.WriteLine($"Create stable power.meters successfully, rowsAffected: {affected}");
|
||||
}
|
||||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to create db and table; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("Failed to create db and table,url:" + connectionString +"; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to create db and table; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to create db and table, url:" + connectionString + "; ErrMessage: " + e.Message);
|
||||
throw;
|
||||
}
|
||||
// ANCHOR_END: create_db_and_table
|
||||
}
|
||||
|
||||
private static void InsertData(ITDengineClient client)
|
||||
private static void InsertData(ITDengineClient client,string connectionString)
|
||||
{
|
||||
// ANCHOR: insert_data
|
||||
try
|
||||
{
|
||||
// insert data
|
||||
// insert data, please make sure the database and table are created before
|
||||
var insertQuery = "INSERT INTO " +
|
||||
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
|
||||
"VALUES " +
|
||||
|
@ -78,29 +80,29 @@ namespace Examples
|
|||
"VALUES " +
|
||||
"(NOW + 1a, 10.30000, 218, 0.25000) ";
|
||||
var affectedRows = client.Exec(insertQuery);
|
||||
Console.WriteLine("insert " + affectedRows + " rows to power.meters successfully.");
|
||||
Console.WriteLine("Successfully inserted " + affectedRows + " rows to power.meters.");
|
||||
}
|
||||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to insert data to power.meters; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("Failed to insert data to power.meters, url:" + connectionString + "; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to insert data to power.meters; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to insert data to power.meters, url:" + connectionString + "; ErrMessage: " + e.Message);
|
||||
throw;
|
||||
}
|
||||
// ANCHOR_END: insert_data
|
||||
}
|
||||
|
||||
private static void QueryData(ITDengineClient client)
|
||||
private static void QueryData(ITDengineClient client,string connectionString)
|
||||
{
|
||||
// ANCHOR: select_data
|
||||
try
|
||||
{
|
||||
// query data
|
||||
// query data, make sure the database and table are created before
|
||||
var query = "SELECT ts, current, location FROM power.meters limit 100";
|
||||
using (var rows = client.Query(query))
|
||||
{
|
||||
|
@ -117,27 +119,28 @@ namespace Examples
|
|||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to query data from power.meters; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("Failed to query data from power.meters, url:" + connectionString + "; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to query data from power.meters; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to query data from power.meters, url:" + connectionString + "; ErrMessage: " + e.Message);
|
||||
throw;
|
||||
}
|
||||
// ANCHOR_END: select_data
|
||||
}
|
||||
|
||||
private static void QueryWithReqId(ITDengineClient client)
|
||||
private static void QueryWithReqId(ITDengineClient client,string connectionString)
|
||||
{
|
||||
// ANCHOR: query_id
|
||||
var reqId = (long)3;
|
||||
try
|
||||
{
|
||||
// query data
|
||||
var query = "SELECT ts, current, location FROM power.meters limit 1";
|
||||
// query with request id 3
|
||||
using (var rows = client.Query(query,3))
|
||||
using (var rows = client.Query(query,reqId))
|
||||
{
|
||||
while (rows.Read())
|
||||
{
|
||||
|
@ -152,13 +155,13 @@ namespace Examples
|
|||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to execute sql with reqId; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("Failed to execute sql with reqId: " + reqId + ", url:" + connectionString + "; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to execute sql with reqId; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to execute sql with reqId: " + reqId + ", url:" + connectionString + "; ErrMessage: " + e.Message);
|
||||
throw;
|
||||
}
|
||||
// ANCHOR_END: query_id
|
||||
|
|
|
@ -13,9 +13,10 @@ namespace Examples
|
|||
var numOfSubTable = 10;
|
||||
var numOfRow = 10;
|
||||
var random = new Random();
|
||||
var connectionString = $"protocol=WebSocket;host={host};port=6041;useSSL=false;username=root;password=taosdata";
|
||||
try
|
||||
{
|
||||
var builder = new ConnectionStringBuilder($"protocol=WebSocket;host={host};port=6041;useSSL=false;username=root;password=taosdata");
|
||||
var builder = new ConnectionStringBuilder(connectionString);
|
||||
using (var client = DbDriver.Open(builder))
|
||||
{
|
||||
// create database
|
||||
|
@ -54,7 +55,7 @@ namespace Examples
|
|||
stmt.Exec();
|
||||
// get affected rows
|
||||
var affectedRows = stmt.Affected();
|
||||
Console.WriteLine($"table {tableName} insert {affectedRows} rows.");
|
||||
Console.WriteLine($"Successfully inserted {affectedRows} rows to {tableName}.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -62,13 +63,13 @@ namespace Examples
|
|||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to insert to table meters using stmt; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("Failed to insert to table meters using stmt, url: " + connectionString + "; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to insert to table meters using stmt; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to insert to table meters using stmt, url: " + connectionString + "; ErrMessage: " + e.Message);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,17 +38,20 @@ namespace TDengineExample
|
|||
client.SchemalessInsert(new[] { jsonDemo }, TDengineSchemalessProtocol.TSDB_SML_JSON_PROTOCOL,
|
||||
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NOT_CONFIGURED, 0, ReqId.GetReqId());
|
||||
}
|
||||
|
||||
Console.WriteLine("Inserted data with schemaless successfully.");
|
||||
}
|
||||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to insert data with schemaless; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("Failed to insert data with schemaless, host:" + host + "; ErrCode:" + e.Code +
|
||||
"; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to insert data with schemaless; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to insert data with schemaless, host:" + host + "; ErrMessage: " + e.Message);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,7 +10,9 @@ namespace TMQExample
|
|||
{
|
||||
try
|
||||
{
|
||||
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=127.0.0.1;port=6041;username=root;password=taosdata");
|
||||
var builder =
|
||||
new ConnectionStringBuilder(
|
||||
"protocol=WebSocket;host=127.0.0.1;port=6041;username=root;password=taosdata");
|
||||
using (var client = DbDriver.Open(builder))
|
||||
{
|
||||
client.Exec("CREATE DATABASE IF NOT EXISTS power");
|
||||
|
@ -48,7 +50,9 @@ namespace TMQExample
|
|||
|
||||
static void InsertData()
|
||||
{
|
||||
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=127.0.0.1;port=6041;username=root;password=taosdata");
|
||||
var builder =
|
||||
new ConnectionStringBuilder(
|
||||
"protocol=WebSocket;host=127.0.0.1;port=6041;username=root;password=taosdata");
|
||||
using (var client = DbDriver.Open(builder))
|
||||
{
|
||||
while (true)
|
||||
|
@ -64,6 +68,9 @@ namespace TMQExample
|
|||
{
|
||||
// ANCHOR: create_consumer
|
||||
// consumer config
|
||||
var host = "127.0.0.1";
|
||||
var groupId = "group1";
|
||||
var clientId = "client1";
|
||||
var cfg = new Dictionary<string, string>()
|
||||
{
|
||||
{ "td.connect.type", "WebSocket" },
|
||||
|
@ -72,9 +79,9 @@ namespace TMQExample
|
|||
{ "msg.with.table.name", "true" },
|
||||
{ "enable.auto.commit", "true" },
|
||||
{ "auto.commit.interval.ms", "1000" },
|
||||
{ "group.id", "group1" },
|
||||
{ "client.id", "client1" },
|
||||
{ "td.connect.ip", "127.0.0.1" },
|
||||
{ "group.id", groupId },
|
||||
{ "client.id", clientId },
|
||||
{ "td.connect.ip", host },
|
||||
{ "td.connect.user", "root" },
|
||||
{ "td.connect.pass", "taosdata" },
|
||||
};
|
||||
|
@ -83,17 +90,20 @@ namespace TMQExample
|
|||
{
|
||||
// create consumer
|
||||
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
|
||||
Console.WriteLine("Create consumer successfully, host: " + host + ", groupId: " + groupId +
|
||||
", clientId: " + clientId);
|
||||
}
|
||||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to create consumer; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("Failed to create websocket consumer, host : " + host + "; ErrCode:" + e.Code +
|
||||
"; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to create consumer; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to create websocket consumer, host : " + host + "; ErrMessage: " + e.Message);
|
||||
throw;
|
||||
}
|
||||
|
||||
|
@ -108,6 +118,7 @@ namespace TMQExample
|
|||
{
|
||||
// subscribe
|
||||
consumer.Subscribe(new List<string>() { "topic_meters" });
|
||||
Console.WriteLine("subscribe topics successfully");
|
||||
for (int i = 0; i < 50; i++)
|
||||
{
|
||||
// consume message with using block to ensure the result is disposed
|
||||
|
@ -118,7 +129,7 @@ namespace TMQExample
|
|||
{
|
||||
// handle message
|
||||
Console.WriteLine(
|
||||
$"data {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
|
||||
$"data: {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
|
||||
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
|
||||
}
|
||||
}
|
||||
|
@ -133,7 +144,7 @@ namespace TMQExample
|
|||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to poll data; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to poll data; ErrMessage:" + e.Message);
|
||||
throw;
|
||||
}
|
||||
// ANCHOR_END: subscribe
|
||||
|
@ -146,6 +157,7 @@ namespace TMQExample
|
|||
{
|
||||
// get assignment
|
||||
var assignment = consumer.Assignment;
|
||||
Console.WriteLine($"now assignment: ${assignment}");
|
||||
// seek to the beginning
|
||||
foreach (var topicPartition in assignment)
|
||||
{
|
||||
|
@ -166,6 +178,7 @@ namespace TMQExample
|
|||
$"second data polled: {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
|
||||
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -173,13 +186,13 @@ namespace TMQExample
|
|||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to seek; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("seek example failed; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to seek; Err:" + e.Message);
|
||||
Console.WriteLine("seek example failed; ErrMessage: " + e.Message);
|
||||
throw;
|
||||
}
|
||||
// ANCHOR_END: seek
|
||||
|
@ -201,18 +214,19 @@ namespace TMQExample
|
|||
{
|
||||
cr.TopicPartitionOffset,
|
||||
});
|
||||
Console.WriteLine("commit offset manually successfully.");
|
||||
}
|
||||
}
|
||||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to commit offset; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("Failed to execute consumer functions. ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to commit offset; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to execute consumer functions. ErrMessage:" + e.Message);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
@ -230,19 +244,20 @@ namespace TMQExample
|
|||
catch (TDengineError e)
|
||||
{
|
||||
// handle TDengine error
|
||||
Console.WriteLine("Failed to unsubscribe consumer; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
Console.WriteLine("Failed to unsubscribe consumer. ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
|
||||
throw;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
// handle other exceptions
|
||||
Console.WriteLine("Failed to unsubscribe consumer; Err:" + e.Message);
|
||||
Console.WriteLine("Failed to unsubscribe consumer. Err: " + e.Message);
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
// close consumer
|
||||
consumer.Close();
|
||||
Console.WriteLine("consumer closed successfully.");
|
||||
}
|
||||
// ANCHOR_END: close
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@ func main() {
|
|||
var taosDSN = "root:taosdata@tcp(localhost:6030)/"
|
||||
taos, err := sql.Open("taosSql", taosDSN)
|
||||
if err != nil {
|
||||
log.Fatalln("failed to connect TDengine, err:", err)
|
||||
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
fmt.Println("Connected to " + taosDSN + " successfully.")
|
||||
defer taos.Close()
|
||||
|
|
|
@ -15,7 +15,7 @@ func main() {
|
|||
var taosDSN = "root:taosdata@http(localhost:6041)/"
|
||||
taos, err := sql.Open("taosRestful", taosDSN)
|
||||
if err != nil {
|
||||
log.Fatalln("failed to connect TDengine, err:", err)
|
||||
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
fmt.Println("Connected to " + taosDSN + " successfully.")
|
||||
defer taos.Close()
|
||||
|
|
|
@ -15,7 +15,7 @@ func main() {
|
|||
var taosDSN = "root:taosdata@ws(localhost:6041)/"
|
||||
taos, err := sql.Open("taosWS", taosDSN)
|
||||
if err != nil {
|
||||
log.Fatalln("failed to connect TDengine, err:", err)
|
||||
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
fmt.Println("Connected to " + taosDSN + " successfully.")
|
||||
defer taos.Close()
|
||||
|
|
|
@ -11,19 +11,21 @@ import (
|
|||
)
|
||||
|
||||
func main() {
|
||||
db, err := sql.Open("taosSql", "root:taosdata@tcp(localhost:6030)/")
|
||||
taosDSN := "root:taosdata@tcp(localhost:6030)/"
|
||||
db, err := sql.Open("taosSql", taosDSN)
|
||||
if err != nil {
|
||||
log.Fatal("Open database error: ", err)
|
||||
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
defer db.Close()
|
||||
initEnv(db)
|
||||
// ANCHOR: query_id
|
||||
// use context to set request id
|
||||
ctx := context.WithValue(context.Background(), "taos_req_id", int64(3))
|
||||
reqId := int64(3)
|
||||
ctx := context.WithValue(context.Background(), "taos_req_id", reqId)
|
||||
// execute query with context
|
||||
rows, err := db.QueryContext(ctx, "SELECT ts, current, location FROM power.meters limit 1")
|
||||
if err != nil {
|
||||
log.Fatal("Query error: ", err)
|
||||
log.Fatalf("Failed to execute sql with reqId: %d, url: %s; ErrMessage: %s\n", reqId, taosDSN, err.Error())
|
||||
}
|
||||
for rows.Next() {
|
||||
var (
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/taosdata/driver-go/v3/af"
|
||||
|
@ -14,30 +15,31 @@ func main() {
|
|||
|
||||
conn, err := af.Open(host, "root", "taosdata", "", 0)
|
||||
if err != nil {
|
||||
log.Fatal("failed to connect TDengine, err:", err)
|
||||
log.Fatalln("Failed to connect to host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
defer conn.Close()
|
||||
_, err = conn.Exec("CREATE DATABASE IF NOT EXISTS power")
|
||||
if err != nil {
|
||||
log.Fatal("failed to create database, err:", err)
|
||||
log.Fatalln("Failed to create db host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
_, err = conn.Exec("USE power")
|
||||
if err != nil {
|
||||
log.Fatal("failed to use database, err:", err)
|
||||
log.Fatalln("Failed to use db host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// insert influxdb line protocol
|
||||
err = conn.InfluxDBInsertLines([]string{lineDemo}, "ms")
|
||||
if err != nil {
|
||||
log.Fatal("failed to insert influxdb line protocol, err:", err)
|
||||
log.Fatalln("Failed to insert data with schemaless, host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// insert opentsdb telnet protocol
|
||||
err = conn.OpenTSDBInsertTelnetLines([]string{telnetDemo})
|
||||
if err != nil {
|
||||
log.Fatal("failed to insert opentsdb telnet line protocol, err:", err)
|
||||
log.Fatalln("Failed to insert data with schemaless, host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// insert opentsdb json protocol
|
||||
err = conn.OpenTSDBInsertJsonPayload(jsonDemo)
|
||||
if err != nil {
|
||||
log.Fatal("failed to insert opentsdb json format protocol, err:", err)
|
||||
log.Fatalln("Failed to insert data with schemaless, host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
fmt.Println("Inserted data with schemaless successfully.")
|
||||
}
|
||||
|
|
|
@ -17,14 +17,15 @@ func main() {
|
|||
telnetDemo := "metric_telnet 1707095283260 4 host=host0 interface=eth0"
|
||||
jsonDemo := "{\"metric\": \"metric_json\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"
|
||||
|
||||
db, err := sql.Open("taosWS", fmt.Sprintf("root:taosdata@ws(%s:6041)/", host))
|
||||
taosDSN := fmt.Sprintf("root:taosdata@ws(%s:6041)/", host)
|
||||
db, err := sql.Open("taosWS", taosDSN)
|
||||
if err != nil {
|
||||
log.Fatal("failed to connect TDengine, err:", err)
|
||||
log.Fatalln("Failed to connect to host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
defer db.Close()
|
||||
_, err = db.Exec("CREATE DATABASE IF NOT EXISTS power")
|
||||
if err != nil {
|
||||
log.Fatal("failed to create database, err:", err)
|
||||
log.Fatalln("Failed to create db host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
s, err := schemaless.NewSchemaless(schemaless.NewConfig("ws://localhost:6041", 1,
|
||||
schemaless.SetDb("power"),
|
||||
|
@ -34,21 +35,22 @@ func main() {
|
|||
schemaless.SetPassword("taosdata"),
|
||||
))
|
||||
if err != nil {
|
||||
log.Fatal("failed to create schemaless connection, err:", err)
|
||||
log.Fatalln("Failed to connect to host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// insert influxdb line protocol
|
||||
err = s.Insert(lineDemo, schemaless.InfluxDBLineProtocol, "ms", 0, common.GetReqID())
|
||||
if err != nil {
|
||||
log.Fatal("failed to insert influxdb line protocol, err:", err)
|
||||
log.Fatalln("Failed to insert data with schemaless, host:" + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// insert opentsdb telnet line protocol
|
||||
err = s.Insert(telnetDemo, schemaless.OpenTSDBTelnetLineProtocol, "ms", 0, common.GetReqID())
|
||||
if err != nil {
|
||||
log.Fatal("failed to insert opentsdb telnet line protocol, err:", err)
|
||||
log.Fatalln("Failed to insert data with schemaless, host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// insert opentsdb json format protocol
|
||||
err = s.Insert(jsonDemo, schemaless.OpenTSDBJsonFormatProtocol, "s", 0, common.GetReqID())
|
||||
if err != nil {
|
||||
log.Fatal("failed to insert opentsdb json format protocol, err:", err)
|
||||
log.Fatalln("Failed to insert data with schemaless, host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
fmt.Println("Inserted data with schemaless successfully.")
|
||||
}
|
||||
|
|
|
@ -10,39 +10,35 @@ import (
|
|||
)
|
||||
|
||||
func main() {
|
||||
db, err := sql.Open("taosSql", "root:taosdata@tcp(localhost:6030)/")
|
||||
var taosDSN = "root:taosdata@tcp(localhost:6030)/"
|
||||
db, err := sql.Open("taosSql", taosDSN)
|
||||
if err != nil {
|
||||
log.Fatal("open database failed:", err)
|
||||
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
defer db.Close()
|
||||
// ANCHOR: create_db_and_table
|
||||
// create database
|
||||
res, err := db.Exec("CREATE DATABASE IF NOT EXISTS power")
|
||||
if err != nil {
|
||||
log.Fatal("create database failed:", err)
|
||||
log.Fatalln("Failed to create db, url:" + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
affected, err := res.RowsAffected()
|
||||
rowsAffected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
log.Fatal("get affected rows failed:", err)
|
||||
log.Fatalln("Failed to get create db rowsAffected, url:" + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
fmt.Println("create database affected:", affected)
|
||||
// use database
|
||||
res, err = db.Exec("USE power")
|
||||
if err != nil {
|
||||
log.Fatal("use database failed:", err)
|
||||
}
|
||||
affected, err = res.RowsAffected()
|
||||
if err != nil {
|
||||
log.Fatal("get affected rows failed:", err)
|
||||
}
|
||||
fmt.Println("use database affected:", affected)
|
||||
// you can check rowsAffected here
|
||||
fmt.Println("Create database power successfully, rowsAffected: ", rowsAffected)
|
||||
// create table
|
||||
res, err = db.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
|
||||
affected, err = res.RowsAffected()
|
||||
if err != nil {
|
||||
log.Fatal("create table failed:", err)
|
||||
log.Fatalln("Failed to create db and table, url:" + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
fmt.Println("create table affected:", affected)
|
||||
rowsAffected, err = res.RowsAffected()
|
||||
if err != nil {
|
||||
log.Fatalln("Failed to get create create rowsAffected, url:" + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// you can check rowsAffected here
|
||||
fmt.Println("Create stable power.meters successfully, rowsAffected:", rowsAffected)
|
||||
// ANCHOR_END: create_db_and_table
|
||||
// ANCHOR: insert_data
|
||||
// insert data, please make sure the database and table are created before
|
||||
|
@ -57,14 +53,14 @@ func main() {
|
|||
"(NOW + 1a, 10.30000, 218, 0.25000) "
|
||||
res, err = db.Exec(insertQuery)
|
||||
if err != nil {
|
||||
log.Fatal("insert data failed:", err)
|
||||
log.Fatal("Failed to insert data to power.meters, url:" + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
affected, err = res.RowsAffected()
|
||||
rowsAffected, err = res.RowsAffected()
|
||||
if err != nil {
|
||||
log.Fatal("get affected rows failed:", err)
|
||||
log.Fatal("Failed to get insert rowsAffected, url:" + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// you can check affectedRows here
|
||||
fmt.Println("insert data affected:", affected)
|
||||
fmt.Printf("Successfully inserted %d rows to power.meters.\n", rowsAffected)
|
||||
// ANCHOR_END: insert_data
|
||||
// ANCHOR: select_data
|
||||
// query data, make sure the database and table are created before
|
||||
|
|
|
@ -17,28 +17,28 @@ func main() {
|
|||
numOfRow := 10
|
||||
db, err := af.Open(host, "root", "taosdata", "", 0)
|
||||
if err != nil {
|
||||
log.Fatal("failed to connect TDengine, err:", err)
|
||||
log.Fatalln("Failed to connect to " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
defer db.Close()
|
||||
// prepare database and table
|
||||
_, err = db.Exec("CREATE DATABASE IF NOT EXISTS power")
|
||||
if err != nil {
|
||||
log.Fatal("failed to create database, err:", err)
|
||||
log.Fatalln("Failed to create db, host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
_, err = db.Exec("USE power")
|
||||
if err != nil {
|
||||
log.Fatal("failed to use database, err:", err)
|
||||
log.Fatalln("Failed to use db, host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
_, err = db.Exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
|
||||
if err != nil {
|
||||
log.Fatal("failed to create table, err:", err)
|
||||
log.Fatalln("Failed to create table, host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// prepare statement
|
||||
sql := "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
|
||||
stmt := db.Stmt()
|
||||
err = stmt.Prepare(sql)
|
||||
if err != nil {
|
||||
log.Fatal("failed to prepare statement, err:", err)
|
||||
log.Fatalln("Failed to prepare sql, host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
for i := 1; i <= numOfSubTable; i++ {
|
||||
tableName := fmt.Sprintf("d_bind_%d", i)
|
||||
|
@ -46,7 +46,7 @@ func main() {
|
|||
// set tableName and tags
|
||||
err = stmt.SetTableNameWithTags(tableName, tags)
|
||||
if err != nil {
|
||||
log.Fatal("failed to set table name and tags, err:", err)
|
||||
log.Fatalln("Failed to set table name and tags, host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// bind column data
|
||||
current := time.Now()
|
||||
|
@ -58,23 +58,23 @@ func main() {
|
|||
AddFloat(rand.Float32())
|
||||
err = stmt.BindRow(row)
|
||||
if err != nil {
|
||||
log.Fatal("failed to bind row, err:", err)
|
||||
log.Fatalln("Failed to bind params, host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
}
|
||||
// add batch
|
||||
err = stmt.AddBatch()
|
||||
if err != nil {
|
||||
log.Fatal("failed to add batch, err:", err)
|
||||
log.Fatalln("Failed to add batch, host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// execute batch
|
||||
err = stmt.Execute()
|
||||
if err != nil {
|
||||
log.Fatal("failed to execute batch, err:", err)
|
||||
log.Fatalln("Failed to exec, host: " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// get affected rows
|
||||
affected := stmt.GetAffectedRows()
|
||||
// you can check exeResult here
|
||||
fmt.Printf("table %s insert %d rows.\n", tableName, affected)
|
||||
fmt.Printf("Successfully inserted %d rows to %s.\n", affected, tableName)
|
||||
}
|
||||
err = stmt.Close()
|
||||
if err != nil {
|
||||
|
|
|
@ -17,19 +17,21 @@ func main() {
|
|||
host := "127.0.0.1"
|
||||
numOfSubTable := 10
|
||||
numOfRow := 10
|
||||
db, err := sql.Open("taosRestful", fmt.Sprintf("root:taosdata@http(%s:6041)/", host))
|
||||
|
||||
taosDSN := fmt.Sprintf("root:taosdata@http(%s:6041)/", host)
|
||||
db, err := sql.Open("taosRestful", taosDSN)
|
||||
if err != nil {
|
||||
log.Fatal("failed to connect TDengine, err:", err)
|
||||
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
defer db.Close()
|
||||
// prepare database and table
|
||||
_, err = db.Exec("CREATE DATABASE IF NOT EXISTS power")
|
||||
if err != nil {
|
||||
log.Fatal("failed to create database, err:", err)
|
||||
log.Fatalln("Failed to create db, url: " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
_, err = db.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
|
||||
if err != nil {
|
||||
log.Fatal("failed to create table, err:", err)
|
||||
log.Fatalln("Failed to create table, url: " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
|
||||
config := stmt.NewConfig(fmt.Sprintf("ws://%s:6041", host), 0)
|
||||
|
@ -41,17 +43,17 @@ func main() {
|
|||
|
||||
connector, err := stmt.NewConnector(config)
|
||||
if err != nil {
|
||||
log.Fatal("failed to create stmt connector, err:", err)
|
||||
log.Fatalln("Failed to create stmt connector,url: " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// prepare statement
|
||||
sql := "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"
|
||||
stmt, err := connector.Init()
|
||||
if err != nil {
|
||||
log.Fatal("failed to init stmt, err:", err)
|
||||
log.Fatalln("Failed to init stmt, url: " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
err = stmt.Prepare(sql)
|
||||
if err != nil {
|
||||
log.Fatal("failed to prepare stmt, err:", err)
|
||||
log.Fatal("Failed to prepare sql, url: " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
for i := 1; i <= numOfSubTable; i++ {
|
||||
tableName := fmt.Sprintf("d_bind_%d", i)
|
||||
|
@ -61,12 +63,12 @@ func main() {
|
|||
// set tableName
|
||||
err = stmt.SetTableName(tableName)
|
||||
if err != nil {
|
||||
log.Fatal("failed to set table name, err:", err)
|
||||
log.Fatal("Failed to set table name, url: " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// set tags
|
||||
err = stmt.SetTags(tags, tagsType)
|
||||
if err != nil {
|
||||
log.Fatal("failed to set tags, err:", err)
|
||||
log.Fatal("Failed to set tags, url: " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// bind column data
|
||||
current := time.Now()
|
||||
|
@ -78,26 +80,26 @@ func main() {
|
|||
columnData[3] = param.NewParam(1).AddFloat(rand.Float32())
|
||||
err = stmt.BindParam(columnData, columnType)
|
||||
if err != nil {
|
||||
log.Fatal("failed to bind param, err:", err)
|
||||
log.Fatal("Failed to bind params, url: " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
}
|
||||
// add batch
|
||||
err = stmt.AddBatch()
|
||||
if err != nil {
|
||||
log.Fatal("failed to add batch, err:", err)
|
||||
log.Fatal("Failed to add batch, url: " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// execute batch
|
||||
err = stmt.Exec()
|
||||
if err != nil {
|
||||
log.Fatal("failed to exec stmt, err:", err)
|
||||
log.Fatal("Failed to exec, url: " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// get affected rows
|
||||
affected := stmt.GetAffectedRows()
|
||||
// you can check exeResult here
|
||||
fmt.Printf("table %s insert %d rows.\n", tableName, affected)
|
||||
fmt.Printf("Successfully inserted %d rows to %s.\n", affected, tableName)
|
||||
}
|
||||
err = stmt.Close()
|
||||
if err != nil {
|
||||
log.Fatal("failed to close stmt, err:", err)
|
||||
log.Fatal("Failed to close stmt, url: " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,9 +15,10 @@ var done = make(chan struct{})
|
|||
|
||||
func main() {
|
||||
// init env
|
||||
conn, err := sql.Open("taosSql", "root:taosdata@tcp(127.0.0.1:6030)/")
|
||||
taosDSN := "root:taosdata@tcp(127.0.0.1:6030)/"
|
||||
conn, err := sql.Open("taosSql", taosDSN)
|
||||
if err != nil {
|
||||
log.Fatal("failed to connect TDengine, err:", err)
|
||||
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
defer func() {
|
||||
conn.Close()
|
||||
|
@ -25,6 +26,9 @@ func main() {
|
|||
initEnv(conn)
|
||||
// ANCHOR: create_consumer
|
||||
// create consumer
|
||||
groupID := "group1"
|
||||
clientID := "client1"
|
||||
host := "127.0.0.1"
|
||||
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
|
@ -32,18 +36,21 @@ func main() {
|
|||
"msg.with.table.name": "true",
|
||||
"enable.auto.commit": "true",
|
||||
"auto.commit.interval.ms": "1000",
|
||||
"group.id": "group1",
|
||||
"client.id": "client1",
|
||||
"group.id": groupID,
|
||||
"client.id": clientID,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal("failed to create consumer, err:", err)
|
||||
log.Fatalln("Failed to create native consumer, host : " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
log.Println("Create consumer successfully, host: " + host + ", groupId: " + groupID + ", clientId: " + clientID)
|
||||
|
||||
// ANCHOR_END: create_consumer
|
||||
// ANCHOR: subscribe
|
||||
err = consumer.Subscribe("topic_meters", nil)
|
||||
if err != nil {
|
||||
log.Fatal("failed to subscribe, err:", err)
|
||||
log.Fatalln("Failed to subscribe, host : " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
log.Println("subscribe topics successfully")
|
||||
for i := 0; i < 50; i++ {
|
||||
ev := consumer.Poll(100)
|
||||
if ev != nil {
|
||||
|
@ -53,23 +60,16 @@ func main() {
|
|||
fmt.Printf("data:%v\n", e)
|
||||
// ANCHOR: commit_offset
|
||||
// commit offset
|
||||
topicPartition, err := consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
|
||||
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
|
||||
if err != nil {
|
||||
log.Fatal("failed to commit offset, err:", err)
|
||||
log.Fatalln("Failed to commit offset, host : " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
fmt.Println(topicPartition)
|
||||
log.Println("commit offset manually successfully.")
|
||||
// ANCHOR_END: commit_offset
|
||||
case tmqcommon.Error:
|
||||
fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
|
||||
log.Fatal("failed to get message, err:", e)
|
||||
log.Fatalln("Failed to poll data, host : " + host + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// commit all offsets
|
||||
topicPartition, err := consumer.Commit()
|
||||
if err != nil {
|
||||
log.Fatal("failed to commit, err:", err)
|
||||
}
|
||||
fmt.Println(topicPartition)
|
||||
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: subscribe
|
||||
|
@ -79,8 +79,8 @@ func main() {
|
|||
if err != nil {
|
||||
log.Fatal("failed to get assignment, err:", err)
|
||||
}
|
||||
fmt.Println("now assignment:", partitions)
|
||||
for i := 0; i < len(partitions); i++ {
|
||||
fmt.Println(partitions[i])
|
||||
// seek to the beginning
|
||||
err = consumer.Seek(tmqcommon.TopicPartition{
|
||||
Topic: partitions[i].Topic,
|
||||
|
@ -88,7 +88,7 @@ func main() {
|
|||
Offset: 0,
|
||||
}, 0)
|
||||
if err != nil {
|
||||
log.Fatal("failed to seek, err:", err)
|
||||
log.Fatalln("seek example failed; ErrMessage: " + err.Error())
|
||||
}
|
||||
}
|
||||
fmt.Println("assignment seek to beginning successfully")
|
||||
|
@ -116,12 +116,12 @@ func main() {
|
|||
// unsubscribe
|
||||
err = consumer.Unsubscribe()
|
||||
if err != nil {
|
||||
log.Fatal("failed to unsubscribe, err:", err)
|
||||
log.Fatal("Failed to unsubscribe consumer. ErrMessage: " + err.Error())
|
||||
}
|
||||
// close consumer
|
||||
err = consumer.Close()
|
||||
if err != nil {
|
||||
log.Fatal("failed to close consumer, err:", err)
|
||||
log.Fatal("Failed to close consumer. ErrMessage: " + err.Error())
|
||||
}
|
||||
// ANCHOR_END: close
|
||||
<-done
|
||||
|
|
|
@ -16,9 +16,10 @@ var done = make(chan struct{})
|
|||
|
||||
func main() {
|
||||
// init env
|
||||
conn, err := sql.Open("taosWS", "root:taosdata@ws(127.0.0.1:6041)/")
|
||||
taosDSN := "root:taosdata@ws(127.0.0.1:6041)/"
|
||||
conn, err := sql.Open("taosWS", taosDSN)
|
||||
if err != nil {
|
||||
log.Fatal("failed to connect TDengine, err:", err)
|
||||
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
defer func() {
|
||||
conn.Close()
|
||||
|
@ -26,8 +27,11 @@ func main() {
|
|||
initEnv(conn)
|
||||
// ANCHOR: create_consumer
|
||||
// create consumer
|
||||
wsUrl := "ws://127.0.0.1:6041"
|
||||
groupID := "group1"
|
||||
clientID := "client1"
|
||||
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
|
||||
"ws.url": "ws://127.0.0.1:6041",
|
||||
"ws.url": wsUrl,
|
||||
"ws.message.channelLen": uint(0),
|
||||
"ws.message.timeout": common.DefaultMessageTimeout,
|
||||
"ws.message.writeWait": common.DefaultWriteWait,
|
||||
|
@ -37,18 +41,21 @@ func main() {
|
|||
"msg.with.table.name": "true",
|
||||
"enable.auto.commit": "true",
|
||||
"auto.commit.interval.ms": "1000",
|
||||
"group.id": "group1",
|
||||
"client.id": "client1",
|
||||
"group.id": groupID,
|
||||
"client.id": clientID,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal("failed to create consumer, err:", err)
|
||||
log.Fatalln("Failed to create websocket consumer, host : " + wsUrl + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
log.Println("Create consumer successfully, host: " + wsUrl + ", groupId: " + groupID + ", clientId: " + clientID)
|
||||
|
||||
// ANCHOR_END: create_consumer
|
||||
// ANCHOR: subscribe
|
||||
err = consumer.Subscribe("topic_meters", nil)
|
||||
if err != nil {
|
||||
log.Fatal("failed to subscribe, err:", err)
|
||||
log.Fatalln("Failed to subscribe, host : " + wsUrl + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
log.Println("subscribe topics successfully")
|
||||
for i := 0; i < 50; i++ {
|
||||
ev := consumer.Poll(100)
|
||||
if ev != nil {
|
||||
|
@ -58,23 +65,16 @@ func main() {
|
|||
fmt.Printf("data:%v\n", e)
|
||||
// ANCHOR: commit_offset
|
||||
// commit offset
|
||||
topicPartition, err := consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
|
||||
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
|
||||
if err != nil {
|
||||
log.Fatal("failed to commit offset, err:", err)
|
||||
log.Fatalln("Failed to commit offset, host : " + wsUrl + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
fmt.Println(topicPartition)
|
||||
log.Println("commit offset manually successfully.")
|
||||
// ANCHOR_END: commit_offset
|
||||
case tmqcommon.Error:
|
||||
fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
|
||||
log.Fatal("failed to get message, err:", e)
|
||||
log.Fatalln("Failed to poll data, host : " + wsUrl + "; ErrMessage: " + err.Error())
|
||||
}
|
||||
// commit all offsets
|
||||
topicPartition, err := consumer.Commit()
|
||||
if err != nil {
|
||||
log.Fatal("failed to commit, err:", err)
|
||||
}
|
||||
fmt.Println(topicPartition)
|
||||
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: subscribe
|
||||
|
@ -84,8 +84,8 @@ func main() {
|
|||
if err != nil {
|
||||
log.Fatal("failed to get assignment, err:", err)
|
||||
}
|
||||
fmt.Println("now assignment:", partitions)
|
||||
for i := 0; i < len(partitions); i++ {
|
||||
fmt.Println(partitions[i])
|
||||
// seek to the beginning
|
||||
err = consumer.Seek(tmqcommon.TopicPartition{
|
||||
Topic: partitions[i].Topic,
|
||||
|
@ -93,7 +93,7 @@ func main() {
|
|||
Offset: 0,
|
||||
}, 0)
|
||||
if err != nil {
|
||||
log.Fatal("failed to seek, err:", err)
|
||||
log.Fatalln("seek example failed; ErrMessage: " + err.Error())
|
||||
}
|
||||
}
|
||||
fmt.Println("assignment seek to beginning successfully")
|
||||
|
@ -121,12 +121,12 @@ func main() {
|
|||
// unsubscribe
|
||||
err = consumer.Unsubscribe()
|
||||
if err != nil {
|
||||
log.Fatal("failed to unsubscribe, err:", err)
|
||||
log.Fatal("Failed to unsubscribe consumer. ErrMessage: " + err.Error())
|
||||
}
|
||||
// close consumer
|
||||
err = consumer.Close()
|
||||
if err != nil {
|
||||
log.Fatal("failed to close consumer, err:", err)
|
||||
log.Fatal("Failed to close consumer. ErrMessage: " + err.Error())
|
||||
}
|
||||
// ANCHOR_END: close
|
||||
<-done
|
||||
|
|
|
@ -16,6 +16,7 @@ async function createConnect() {
|
|||
}
|
||||
|
||||
async function test() {
|
||||
let dsn = 'ws://localhost:6041'
|
||||
let wsSql = null;
|
||||
let wsRows = null;
|
||||
let ttl = 0;
|
||||
|
@ -24,9 +25,10 @@ async function test() {
|
|||
await wsSql.schemalessInsert(influxdbData, taos.SchemalessProto.InfluxDBLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
|
||||
await wsSql.schemalessInsert(telnetData, taos.SchemalessProto.OpenTSDBTelnetLineProtocol, taos.Precision.MILLI_SECONDS, ttl);
|
||||
await wsSql.schemalessInsert(jsonData, taos.SchemalessProto.OpenTSDBJsonFormatProtocol, taos.Precision.SECONDS, ttl);
|
||||
console.log("Inserted data with schemaless successfully.")
|
||||
}
|
||||
catch (err) {
|
||||
console.error("Failed to insert data with schemaless, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||
console.error("Failed to insert data with schemaless, url:"+ dsn +", ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||
}
|
||||
finally {
|
||||
if (wsRows) {
|
||||
|
|
|
@ -1,34 +1,39 @@
|
|||
// ANCHOR: createConnect
|
||||
const taos = require("@tdengine/websocket");
|
||||
|
||||
async function createConnect() {
|
||||
let dsn = 'ws://localhost:6041';
|
||||
async function createConnect() {
|
||||
|
||||
try {
|
||||
let conf = new taos.WSConfig(dsn);
|
||||
conf.setUser('root');
|
||||
conf.setPwd('taosdata');
|
||||
conf.setDb('power');
|
||||
return await taos.sqlConnect(conf);
|
||||
conn = await taos.sqlConnect(conf);
|
||||
console.log("Connected to " + dsn + " successfully.");
|
||||
return conn;
|
||||
} catch (err) {
|
||||
console.log("Failed to connect to " + dns + "; ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||
throw err;
|
||||
}
|
||||
|
||||
}
|
||||
// ANCHOR_END: createConnect
|
||||
|
||||
// ANCHOR: create_db_and_table
|
||||
async function createDbAndTable(wsSql) {
|
||||
async function createDbAndTable() {
|
||||
let wsSql = null;
|
||||
try {
|
||||
wsSql = await createConnect();
|
||||
await wsSql.exec('CREATE DATABASE IF NOT EXISTS POWER ' +
|
||||
'KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;');
|
||||
await wsSql.exec('CREATE DATABASE IF NOT EXISTS power');
|
||||
|
||||
await wsSql.exec('USE power');
|
||||
|
||||
await wsSql.exec('CREATE STABLE IF NOT EXISTS meters ' +
|
||||
await wsSql.exec('CREATE STABLE IF NOT EXISTS power.meters ' +
|
||||
'(_ts timestamp, current float, voltage int, phase float) ' +
|
||||
'TAGS (location binary(64), groupId int);');
|
||||
|
||||
taosResult = await wsSql.exec('describe meters');
|
||||
console.log(taosResult);
|
||||
console.log("Create stable power.meters successfully");
|
||||
} catch (err) {
|
||||
console.error("Failed to create db and table, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||
console.error("Failed to create db and table, url:" + dns + "; ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||
} finally {
|
||||
if (wsSql) {
|
||||
await wsSql.close();
|
||||
|
@ -39,8 +44,8 @@ async function createDbAndTable(wsSql) {
|
|||
// ANCHOR_END: create_db_and_table
|
||||
|
||||
// ANCHOR: insertData
|
||||
async function insertData(wsSql) {
|
||||
let wsSql = null;
|
||||
async function insertData() {
|
||||
let wsSql = null
|
||||
try {
|
||||
wsSql = await createConnect();
|
||||
let insertQuery = "INSERT INTO " +
|
||||
|
@ -53,9 +58,9 @@ async function insertData(wsSql) {
|
|||
"VALUES " +
|
||||
"(NOW + 1a, 10.30000, 218, 0.25000) ";
|
||||
taosResult = await wsSql.exec(insertQuery);
|
||||
console.log(taosResult);
|
||||
console.log("Successfully inserted " + taosResult.getAffectRows() + " rows to power.meters.");
|
||||
} catch (err) {
|
||||
console.error("Failed to insert data to power.meters, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||
console.error("Failed to insert data to power.meters, url:" + dsn + "; ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||
} finally {
|
||||
if (wsSql) {
|
||||
await wsSql.close();
|
||||
|
@ -71,15 +76,13 @@ async function queryData() {
|
|||
try {
|
||||
wsSql = await createConnect();
|
||||
wsRows = await wsSql.query('SELECT ts, current, location FROM power.meters limit 100');
|
||||
let meta = wsRows.getMeta();
|
||||
console.log("wsRow:meta:=>", meta);
|
||||
while (await wsRows.next()) {
|
||||
let result = wsRows.getData();
|
||||
console.log('queryRes.Scan().then=>', result);
|
||||
let row = wsRows.getData();
|
||||
console.log('ts: ' + row[0] + ', current: ' + row[1] + ', location: ' + row[2]);
|
||||
}
|
||||
}
|
||||
catch (err) {
|
||||
console.error("Failed to query data from power.meters," + err.code + "; ErrMessage: " + err.message);
|
||||
console.error("Failed to query data from power.meters, url:" + dsn + " ; ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||
}
|
||||
finally {
|
||||
if (wsRows) {
|
||||
|
@ -93,22 +96,20 @@ async function queryData() {
|
|||
// ANCHOR_END: queryData
|
||||
|
||||
// ANCHOR: sqlWithReqid
|
||||
async function sqlWithReqid(wsSql) {
|
||||
|
||||
async function sqlWithReqid() {
|
||||
let wsRows = null;
|
||||
let wsSql = null;
|
||||
let reqId = 1;
|
||||
try {
|
||||
wsSql = await createConnect();
|
||||
wsRows = await wsSql.query('SELECT ts, current, location FROM power.meters limit 100', 1);
|
||||
let meta = wsRows.getMeta();
|
||||
console.log("wsRow:meta:=>", meta);
|
||||
wsRows = await wsSql.query('SELECT ts, current, location FROM power.meters limit 100', reqId);
|
||||
while (await wsRows.next()) {
|
||||
let result = wsRows.getData();
|
||||
console.log('queryRes.Scan().then=>', result);
|
||||
let row = wsRows.getData();
|
||||
console.log('ts: ' + row[0] + ', current: ' + row[1] + ', location: ' + row[2]);
|
||||
}
|
||||
}
|
||||
catch (err) {
|
||||
console.error("Failed to execute sql with reqId," + err.code + "; ErrMessage: " + err.message);
|
||||
console.error("Failed to execute sql with reqId: " + reqId + "," + err.code + "; ErrMessage: " + err.message);
|
||||
}
|
||||
finally {
|
||||
if (wsRows) {
|
||||
|
|
|
@ -4,6 +4,7 @@ let db = 'power';
|
|||
let stable = 'meters';
|
||||
let numOfSubTable = 10;
|
||||
let numOfRow = 10;
|
||||
let dsn = 'ws://localhost:6041'
|
||||
function getRandomInt(min, max) {
|
||||
min = Math.ceil(min);
|
||||
max = Math.floor(max);
|
||||
|
@ -11,7 +12,7 @@ function getRandomInt(min, max) {
|
|||
}
|
||||
|
||||
async function prepare() {
|
||||
let dsn = 'ws://localhost:6041'
|
||||
|
||||
let conf = new taos.WSConfig(dsn);
|
||||
conf.setUser('root')
|
||||
conf.setPwd('taosdata')
|
||||
|
@ -54,11 +55,11 @@ async function prepare() {
|
|||
await stmt.bind(bindParams);
|
||||
await stmt.batch();
|
||||
await stmt.exec();
|
||||
console.log(`d_bind_${i} insert ` + stmt.getLastAffected() + " rows.");
|
||||
console.log("Successfully inserted " + stmt.getLastAffected() + " to power.meters.");
|
||||
}
|
||||
}
|
||||
catch (err) {
|
||||
console.error("Failed to insert to table meters using stmt, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||
console.error("Failed to insert to table meters using stmt, url:" + dsn + "ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||
}
|
||||
finally {
|
||||
if (stmt) {
|
||||
|
|
|
@ -1,23 +1,28 @@
|
|||
const taos = require("@tdengine/websocket");
|
||||
|
||||
// ANCHOR: create_consumer
|
||||
const db = 'power';
|
||||
const stable = 'meters';
|
||||
const topics = ['power_meters_topic'];
|
||||
|
||||
// ANCHOR: create_consumer
|
||||
const url = 'ws://localhost:6041';
|
||||
async function createConsumer() {
|
||||
|
||||
let groupId = "group1";
|
||||
let clientId = "1";
|
||||
let configMap = new Map([
|
||||
[taos.TMQConstants.GROUP_ID, "group1"],
|
||||
[taos.TMQConstants.CLIENT_ID, 'client1'],
|
||||
[taos.TMQConstants.GROUP_ID, groupId],
|
||||
[taos.TMQConstants.CLIENT_ID, clientId],
|
||||
[taos.TMQConstants.CONNECT_USER, "root"],
|
||||
[taos.TMQConstants.CONNECT_PASS, "taosdata"],
|
||||
[taos.TMQConstants.AUTO_OFFSET_RESET, "latest"],
|
||||
[taos.TMQConstants.WS_URL, 'ws://localhost:6041'],
|
||||
[taos.TMQConstants.WS_URL, url],
|
||||
[taos.TMQConstants.ENABLE_AUTO_COMMIT, 'true'],
|
||||
[taos.TMQConstants.AUTO_COMMIT_INTERVAL_MS, '1000']
|
||||
]);
|
||||
try {
|
||||
return await taos.tmqConnect(configMap);
|
||||
conn = await taos.tmqConnect(configMap);
|
||||
console.log(`Create consumer successfully, host: ${url}, groupId: ${groupId}, clientId: ${clientId}`)
|
||||
return conn;
|
||||
}catch (err) {
|
||||
console.log("Failed to create websocket consumer, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||
throw err;
|
||||
|
@ -31,7 +36,7 @@ async function prepare() {
|
|||
conf.setUser('root');
|
||||
conf.setPwd('taosdata');
|
||||
conf.setDb('power');
|
||||
const createDB = `CREATE DATABASE IF NOT EXISTS POWER ${db} KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;`;
|
||||
const createDB = `CREATE DATABASE IF NOT EXISTS ${db}`;
|
||||
const createStable = `CREATE STABLE IF NOT EXISTS ${db}.${stable} (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);`;
|
||||
|
||||
let wsSql = await taos.sqlConnect(conf);
|
||||
|
@ -45,7 +50,7 @@ async function prepare() {
|
|||
for (let i = 0; i < 10; i++) {
|
||||
await wsSql.exec(`INSERT INTO d1001 USING ${stable} (location, groupId) TAGS ("California.SanFrancisco", 3) VALUES (NOW, ${10 + i}, ${200 + i}, ${0.32 + i})`);
|
||||
}
|
||||
wsSql.Close();
|
||||
wsSql.close();
|
||||
}
|
||||
|
||||
async function subscribe(consumer) {
|
||||
|
@ -55,9 +60,10 @@ async function subscribe(consumer) {
|
|||
for (let i = 0; i < 50; i++) {
|
||||
let res = await consumer.poll(100);
|
||||
for (let [key, value] of res) {
|
||||
console.log(key, value);
|
||||
console.log(`data: ${key} ${value}`);
|
||||
}
|
||||
consumer.commit();
|
||||
console.log("commit offset manually successfully.");
|
||||
}
|
||||
} catch (err) {
|
||||
console.error("Failed to poll data; err.code, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||
|
@ -74,6 +80,7 @@ async function test() {
|
|||
let consumer = await createConsumer()
|
||||
await subscribe(consumer)
|
||||
await consumer.unsubscribe();
|
||||
console.log("unsubscribe consumer successfully.");
|
||||
}
|
||||
catch (err) {
|
||||
console.error("Failed to unsubscribe consume, ErrCode:" + err.code + "; ErrMessage: " + err.message);
|
||||
|
|
|
@ -84,6 +84,7 @@ async function test() {
|
|||
}
|
||||
|
||||
await consumer.seekToBeginning(assignment);
|
||||
console.log("assignment seek to beginning successfully");
|
||||
assignment = await consumer.assignment();
|
||||
for (let i in assignment) {
|
||||
console.log("seek after:", assignment[i]);
|
||||
|
|
|
@ -14,7 +14,7 @@ def create_connection():
|
|||
)
|
||||
print(f"Connected to {host}:{port} successfully.");
|
||||
except Exception as err:
|
||||
print(f"Failed to connect to {host}:{port} ; Err:{err}")
|
||||
print(f"Failed to connect to {host}:{port} ; err:{err}")
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -12,7 +12,7 @@ def create_connection():
|
|||
|
||||
print(f"Connected to {url} successfully.");
|
||||
except Exception as err:
|
||||
print(f"Failed to connect to {url} ; Err:{err}")
|
||||
print(f"Failed to connect to {url} ; err:{err}")
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -9,26 +9,18 @@ try:
|
|||
user="root",
|
||||
password="taosdata")
|
||||
|
||||
db = "power"
|
||||
# create database
|
||||
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
|
||||
assert rowsAffected == 0
|
||||
|
||||
# change database. same as execute "USE db"
|
||||
conn.select_db(db)
|
||||
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS power")
|
||||
print(f"Create database power successfully, rowsAffected: {rowsAffected}");
|
||||
|
||||
# create super table
|
||||
rowsAffected = conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
||||
"CREATE TABLE IF NOT EXISTS power.meters (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
||||
)
|
||||
assert rowsAffected == 0
|
||||
|
||||
# create table
|
||||
rowsAffected = conn.execute("CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupid, location) TAGS(0, 'Los Angles')")
|
||||
assert rowsAffected == 0
|
||||
print(f"Create stable power.meters successfully, rowsAffected: {rowsAffected}");
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to create db and table, db addrr:{host}:{port} err:{err}")
|
||||
print(f"Failed to create db and table, db addr:{host}:{port} ; err:{err}")
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -8,22 +8,18 @@ try:
|
|||
password="taosdata",
|
||||
timeout=30)
|
||||
|
||||
db = "power"
|
||||
# create database
|
||||
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
|
||||
assert rowsAffected == 0
|
||||
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS power")
|
||||
print(f"Create database power successfully, rowsAffected: {rowsAffected}");
|
||||
|
||||
# create super table
|
||||
rowsAffected = conn.execute(
|
||||
f"CREATE TABLE IF NOT EXISTS `{db}`.`meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
||||
f"CREATE TABLE IF NOT EXISTS power.meters (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
||||
)
|
||||
assert rowsAffected == 0
|
||||
# create table
|
||||
rowsAffected = conn.execute(f"CREATE TABLE IF NOT EXISTS `{db}`.`d0` USING `{db}`.`meters` (groupid, location) TAGS(0, 'Los Angles')")
|
||||
assert rowsAffected == 0
|
||||
print(f"Create stable power.meters successfully, rowsAffected: {rowsAffected}");
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to create db and table, url:{url} err:{err}")
|
||||
print(f"Failed to create db and table, url:{url} ; err:{err}")
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -9,27 +9,18 @@ try:
|
|||
host=host,
|
||||
port=port)
|
||||
|
||||
db = "power"
|
||||
# create database
|
||||
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
|
||||
assert rowsAffected == 0
|
||||
|
||||
# change database.
|
||||
rowsAffected = conn.execute(f"USE {db}")
|
||||
assert rowsAffected == 0
|
||||
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS power")
|
||||
print(f"Create database power successfully, rowsAffected: {rowsAffected}");
|
||||
|
||||
# create super table
|
||||
rowsAffected = conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
||||
"CREATE TABLE IF NOT EXISTS power.meters (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))"
|
||||
)
|
||||
assert rowsAffected == 0
|
||||
|
||||
# create table
|
||||
rowsAffected = conn.execute("CREATE TABLE IF NOT EXISTS `d0` USING `meters` (groupid, location) TAGS(0, 'Los Angles')")
|
||||
assert rowsAffected == 0
|
||||
print(f"Create stable power.meters successfully, rowsAffected: {rowsAffected}");
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to create db and table, db addrr:{host}:{port} err:{err}")
|
||||
print(f"Failed to create db and table, db addrr:{host}:{port} ; err:{err}")
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
import taos
|
||||
|
||||
conn = None
|
||||
|
||||
host = "localhost"
|
||||
port = 6030
|
||||
try:
|
||||
conn = taos.connect(user="root",
|
||||
password="taosdata",
|
||||
host="localhost",
|
||||
port=6030)
|
||||
conn = taos.connect(host=host,
|
||||
port=port,
|
||||
user="root",
|
||||
password="taosdata")
|
||||
|
||||
sql = """
|
||||
INSERT INTO
|
||||
|
@ -17,10 +18,10 @@ try:
|
|||
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
||||
"""
|
||||
affectedRows = conn.execute(sql)
|
||||
print(f"inserted into {affectedRows} rows to power.meters successfully.")
|
||||
print(f"Successfully inserted {affectedRows} rows to power.meters.")
|
||||
|
||||
except Exception as err:
|
||||
print(err)
|
||||
print(f"Failed to insert data to power.meters, db addr:{host}:{port} ; err:{err}")
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
import taosrest
|
||||
|
||||
conn = None
|
||||
|
||||
url="http://localhost:6041"
|
||||
try:
|
||||
conn = taosrest.connect(url="http://localhost:6041",
|
||||
conn = taosrest.connect(url=url,
|
||||
user="root",
|
||||
password="taosdata",
|
||||
timeout=30)
|
||||
|
@ -17,10 +17,10 @@ try:
|
|||
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
||||
"""
|
||||
affectedRows = conn.execute(sql)
|
||||
print(f"inserted into {affectedRows} rows to power.meters successfully.")
|
||||
print(f"Successfully inserted {affectedRows} rows to power.meters.")
|
||||
|
||||
except Exception as err:
|
||||
print(err)
|
||||
print(f"Failed to insert data to power.meters, url:{url} ; err:{err}")
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
import taosws
|
||||
|
||||
conn = None
|
||||
|
||||
host="localhost"
|
||||
port=6041
|
||||
try:
|
||||
conn = taosws.connect(user="root",
|
||||
password="taosdata",
|
||||
host="localhost",
|
||||
port=6041)
|
||||
host=host,
|
||||
port=port)
|
||||
|
||||
sql = """
|
||||
INSERT INTO
|
||||
|
@ -17,10 +18,10 @@ try:
|
|||
VALUES (NOW + 1a, 10.30000, 218, 0.25000)
|
||||
"""
|
||||
affectedRows = conn.execute(sql)
|
||||
print(f"inserted into {affectedRows} rows to power.meters successfully.")
|
||||
print(f"Successfully inserted {affectedRows} rows to power.meters.")
|
||||
|
||||
except Exception as err:
|
||||
print(err)
|
||||
print(f"Failed to insert data to power.meters, db addr:{host}:{port} ; err:{err}")
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -1,26 +1,21 @@
|
|||
import taos
|
||||
|
||||
host="localhost"
|
||||
port=6030
|
||||
conn = None
|
||||
try:
|
||||
conn = taos.connect(host="localhost",
|
||||
port=6030,
|
||||
conn = taos.connect(host=host,
|
||||
port=port,
|
||||
user="root",
|
||||
password="taosdata")
|
||||
|
||||
result = conn.query("SELECT ts, current, location FROM power.meters limit 100")
|
||||
print(result)
|
||||
# Get fields from result
|
||||
fields = result.fields
|
||||
for field in fields:
|
||||
print(field)
|
||||
|
||||
# Get data from result as list of tuple
|
||||
data = result.fetch_all()
|
||||
for row in data:
|
||||
print(row)
|
||||
print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to query data from power.meters, err:{err}")
|
||||
print(f"Failed to query data from power.meters, db addr:{host}:{port} ; err:{err}")
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
|
@ -1,15 +1,17 @@
|
|||
import taosrest
|
||||
|
||||
client = None
|
||||
|
||||
url="http://localhost:6041"
|
||||
try:
|
||||
client = taosrest.RestClient(url="http://localhost:6041",
|
||||
client = taosrest.RestClient(url=url,
|
||||
user="root",
|
||||
password="taosdata",
|
||||
timeout=30)
|
||||
|
||||
result = client.sql(f"SELECT ts, current, location FROM power.meters limit 100", 1)
|
||||
print(result)
|
||||
result = client.sql(f"SELECT ts, current, location FROM power.meters limit 100")
|
||||
if result["data"]:
|
||||
for row in result["data"]:
|
||||
print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to query data from power.meters, err:{err}")
|
||||
print(f"Failed to query data from power.meters, url:{url} ; err:{err}")
|
||||
|
|
|
@ -1,22 +1,20 @@
|
|||
import taosws
|
||||
|
||||
conn = None
|
||||
|
||||
host="localhost"
|
||||
port=6041
|
||||
try:
|
||||
conn = taosws.connect(user="root",
|
||||
password="taosdata",
|
||||
host="localhost",
|
||||
port=6041)
|
||||
host=host,
|
||||
port=port)
|
||||
|
||||
result = conn.query("SELECT ts, current, location FROM power.meters limit 100")
|
||||
num_of_fields = result.field_count
|
||||
print(num_of_fields)
|
||||
|
||||
for row in result:
|
||||
print(row)
|
||||
print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to query data from power.meters, err:{err}")
|
||||
print(f"Failed to query data from power.meters, db addr:{host}:{port} ; err:{err}")
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -1,25 +1,24 @@
|
|||
import taos
|
||||
|
||||
conn = None
|
||||
reqId = 1
|
||||
host="localhost"
|
||||
port=6030
|
||||
try:
|
||||
conn = taos.connect(host="localhost",
|
||||
port=6030,
|
||||
conn = taos.connect(host=host,
|
||||
port=port,
|
||||
user="root",
|
||||
password="taosdata")
|
||||
|
||||
result = conn.query("SELECT ts, current, location FROM power.meters limit 100", 1)
|
||||
# Get fields from result
|
||||
fields = result.fields
|
||||
for field in fields:
|
||||
print(field)
|
||||
|
||||
result = conn.query("SELECT ts, current, location FROM power.meters limit 100", reqId)
|
||||
# Get data from result as list of tuple
|
||||
data = result.fetch_all()
|
||||
for row in data:
|
||||
print(row)
|
||||
print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to execute sql with reqId, err:{err}")
|
||||
print(f"Failed to execute sql with reqId:{reqId}, db addr:{host}:{port} ; err:{err}")
|
||||
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
|
@ -1,15 +1,18 @@
|
|||
import taosrest
|
||||
|
||||
client = None
|
||||
|
||||
url="http://localhost:6041"
|
||||
reqId = 1
|
||||
try:
|
||||
client = taosrest.RestClient(url="http://localhost:6041",
|
||||
client = taosrest.RestClient(url=url,
|
||||
user="root",
|
||||
password="taosdata",
|
||||
timeout=30)
|
||||
|
||||
result = client.sql(f"SELECT ts, current, location FROM power.meters limit 100", 1)
|
||||
print(result)
|
||||
result = client.sql(f"SELECT ts, current, location FROM power.meters limit 100", reqId)
|
||||
if result["data"]:
|
||||
for row in result["data"]:
|
||||
print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to execute sql with reqId, err:{err}")
|
||||
print(f"Failed to execute sql with reqId:{reqId}, url:{url} ; err:{err}")
|
||||
|
|
|
@ -1,22 +1,24 @@
|
|||
import taosws
|
||||
|
||||
conn = None
|
||||
|
||||
reqId = 1
|
||||
host="localhost"
|
||||
port=6041
|
||||
try:
|
||||
conn = taosws.connect(
|
||||
user="root",
|
||||
password="taosdata",
|
||||
host="localhost",
|
||||
port=6041,
|
||||
host=host,
|
||||
port=port,
|
||||
)
|
||||
|
||||
result = conn.query_with_req_id("SELECT ts, current, location FROM power.meters limit 100", req_id=1)
|
||||
|
||||
# Get data from result as list of tuple
|
||||
for row in result:
|
||||
print(row)
|
||||
print(f"ts: {row[0]}, current: {row[1]}, location: {row[2]}")
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to execute sql with reqId, err:{err}")
|
||||
print(f"Failed to execute sql with reqId:{reqId}, db addr:{host}:{port} ; err:{err}")
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
|
@ -9,13 +9,14 @@ telnetDemo = ["metric_telnet 1707095283260 4 host=host0 interface=eth0"]
|
|||
jsonDemo = [
|
||||
'{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
|
||||
]
|
||||
|
||||
host = "localhost"
|
||||
port = 6030
|
||||
try:
|
||||
conn = taos.connect(
|
||||
host="localhost",
|
||||
user="root",
|
||||
password="taosdata",
|
||||
port=6030
|
||||
host=host,
|
||||
port=port
|
||||
)
|
||||
|
||||
conn.execute("CREATE DATABASE IF NOT EXISTS power")
|
||||
|
@ -31,8 +32,9 @@ try:
|
|||
conn.schemaless_insert(
|
||||
jsonDemo, taos.SmlProtocol.JSON_PROTOCOL, taos.SmlPrecision.MILLI_SECONDS
|
||||
)
|
||||
print("Inserted data with schemaless successfully.");
|
||||
except Exception as err:
|
||||
print(f"Failed to insert data with schemaless, err:{err}")
|
||||
print(f"Failed to insert data with schemaless, addr: {host}:{port} err:{err}")
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import taosws
|
||||
|
||||
db = "power"
|
||||
def prepare():
|
||||
conn = None
|
||||
try:
|
||||
|
@ -10,11 +9,12 @@ def prepare():
|
|||
port=6041)
|
||||
|
||||
# create database
|
||||
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS {db}")
|
||||
rowsAffected = conn.execute(f"CREATE DATABASE IF NOT EXISTS power")
|
||||
assert rowsAffected == 0
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to create db and table, err:{err}")
|
||||
raise err
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
@ -32,13 +32,14 @@ def schemaless_insert():
|
|||
jsonDemo = [
|
||||
'{"metric": "metric_json","timestamp": 1626846400,"value": 10.3, "tags": {"groupid": 2, "location": "California.SanFrancisco", "id": "d1001"}}'
|
||||
]
|
||||
|
||||
host = "localhost"
|
||||
port = 6041
|
||||
try:
|
||||
conn = taosws.connect(user="root",
|
||||
password="taosdata",
|
||||
host="localhost",
|
||||
port=6041,
|
||||
database=db)
|
||||
host=host,
|
||||
port=port,
|
||||
database='power')
|
||||
|
||||
conn.schemaless_insert(
|
||||
lines = lineDemo,
|
||||
|
@ -63,10 +64,18 @@ def schemaless_insert():
|
|||
ttl=1,
|
||||
req_id=3,
|
||||
)
|
||||
|
||||
print("Inserted data with schemaless successfully.");
|
||||
except Exception as err:
|
||||
print(f"Failed to insert data with schemaless, err:{err}")
|
||||
print(f"Failed to insert data with schemaless, addr: {host}:{port} err:{err}")
|
||||
raise err
|
||||
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
prepare()
|
||||
schemaless_insert
|
||||
except Exception as err:
|
||||
print(f"Failed to insert data with schemaless, err:{err}")
|
|
@ -7,13 +7,14 @@ numOfRow = 10
|
|||
|
||||
conn = None
|
||||
stmt = None
|
||||
|
||||
host="localhost",
|
||||
port=6030,
|
||||
try:
|
||||
conn = taos.connect(
|
||||
host="localhost",
|
||||
user="root",
|
||||
password="taosdata",
|
||||
port=6030,
|
||||
host=host,
|
||||
port=port,
|
||||
)
|
||||
|
||||
conn.execute("CREATE DATABASE IF NOT EXISTS power")
|
||||
|
@ -52,10 +53,10 @@ try:
|
|||
params[3].float(phases)
|
||||
stmt.bind_param_batch(params)
|
||||
stmt.execute()
|
||||
print(f"stmt insert successfully.")
|
||||
print(f"Successfully inserted to power.meters.")
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to insert to table meters using stmt, error: {err}")
|
||||
print(f"Failed to insert to table meters using stmt, addr:{host}:{port} ; err:{err}")
|
||||
finally:
|
||||
if stmt:
|
||||
stmt.close()
|
||||
|
|
|
@ -8,6 +8,8 @@ numOfRow = 10
|
|||
|
||||
conn = None
|
||||
stmt = None
|
||||
host="localhost"
|
||||
port=6041
|
||||
try:
|
||||
conn = taosws.connect(user="root",
|
||||
password="taosdata",
|
||||
|
@ -56,10 +58,10 @@ try:
|
|||
stmt.add_batch()
|
||||
stmt.execute()
|
||||
|
||||
print(f"stmt insert successfully.")
|
||||
print(f"Successfully inserted to power.meters.")
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to insert to table meters using stmt, error: {err}")
|
||||
print(f"Failed to insert to table meters using stmt, addr:{host}:{port} ; err:{err}")
|
||||
finally:
|
||||
if stmt:
|
||||
stmt.close()
|
||||
|
|
|
@ -49,23 +49,28 @@ def prepareMeta():
|
|||
from taos.tmq import Consumer
|
||||
|
||||
def create_consumer():
|
||||
host = "localhost"
|
||||
port = 6030
|
||||
groupId = "group1"
|
||||
clientId = "1"
|
||||
try:
|
||||
consumer = Consumer(
|
||||
{
|
||||
"group.id": "group1",
|
||||
"client.id": "1",
|
||||
"group.id": groupId,
|
||||
"client.id": clientId,
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"enable.auto.commit": "true",
|
||||
"auto.commit.interval.ms": "1000",
|
||||
"auto.offset.reset": "latest",
|
||||
"td.connect.ip": "localhost",
|
||||
"td.connect.port": "6030",
|
||||
"td.connect.ip": host,
|
||||
"td.connect.port": port,
|
||||
}
|
||||
)
|
||||
print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}");
|
||||
return consumer
|
||||
except Exception as err:
|
||||
print(f"Failed to poll data, err:{err}")
|
||||
print(f"Failed to create websocket consumer, host: {host}:{port} ; err:{err}");
|
||||
raise err
|
||||
# ANCHOR_END: create_consumer
|
||||
|
||||
|
@ -87,7 +92,8 @@ def subscribe(consumer):
|
|||
val = records.value()
|
||||
if val:
|
||||
for block in val:
|
||||
print(block.fetchall())
|
||||
data = block.fetchall()
|
||||
print(f"data: {data}")
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to poll data, err:{err}")
|
||||
|
@ -114,6 +120,7 @@ def commit_offset(consumer):
|
|||
|
||||
# after processing the data, commit the offset manually
|
||||
consumer.commit(records)
|
||||
print("commit offset manually successfully.");
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to poll data, err:{err}")
|
||||
|
@ -141,6 +148,7 @@ def seek_offset(consumer):
|
|||
def unsubscribe(consumer):
|
||||
try:
|
||||
consumer.unsubscribe()
|
||||
print("unsubscribe consumer successfully.");
|
||||
except Exception as err:
|
||||
print(f"Failed to unsubscribe consumer. err:{err}")
|
||||
|
||||
|
|
|
@ -60,20 +60,25 @@ def prepareMeta():
|
|||
|
||||
# ANCHOR: create_consumer
|
||||
def create_consumer():
|
||||
host = "localhost"
|
||||
port = 6041
|
||||
groupId = "group1"
|
||||
clientId = "1"
|
||||
try:
|
||||
consumer = taosws.Consumer(conf={
|
||||
"td.connect.websocket.scheme": "ws",
|
||||
"group.id": "group1",
|
||||
"client.id": "1",
|
||||
"group.id": groupId,
|
||||
"client.id": clientId,
|
||||
"auto.offset.reset": "latest",
|
||||
"td.connect.ip": "localhost",
|
||||
"td.connect.port": "6041",
|
||||
"td.connect.ip": host,
|
||||
"td.connect.port": port,
|
||||
"enable.auto.commit": "true",
|
||||
"auto.commit.interval.ms": "1000",
|
||||
})
|
||||
print(f"Create consumer successfully, host: {host}:{port}, groupId: {groupId}, clientId: {clientId}");
|
||||
return consumer;
|
||||
except Exception as err:
|
||||
print(f"Failed to create websocket consumer, err:{err}");
|
||||
print(f"Failed to create websocket consumer, host: {host}:{port} ; err:{err}");
|
||||
raise err
|
||||
|
||||
|
||||
|
@ -108,7 +113,7 @@ def subscribe(consumer):
|
|||
if records:
|
||||
for block in records:
|
||||
for row in block:
|
||||
print(row)
|
||||
print(f"data: {row}")
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to poll data, err:{err}")
|
||||
|
@ -125,10 +130,11 @@ def commit_offset(consumer):
|
|||
if records:
|
||||
for block in records:
|
||||
for row in block:
|
||||
print(row)
|
||||
print(f"data: {row}")
|
||||
|
||||
# after processing the data, commit the offset manually
|
||||
consumer.commit(records)
|
||||
print("commit offset manually successfully.");
|
||||
|
||||
except Exception as err:
|
||||
print(f"Failed to poll data, err:{err}")
|
||||
|
@ -141,6 +147,7 @@ def commit_offset(consumer):
|
|||
def unsubscribe(consumer):
|
||||
try:
|
||||
consumer.unsubscribe()
|
||||
print("unsubscribe consumer successfully.");
|
||||
except Exception as err:
|
||||
print("Failed to unsubscribe consumer. err:{err}")
|
||||
|
||||
|
|
|
@ -2,6 +2,21 @@
|
|||
|
||||
set -e
|
||||
|
||||
check_transactions() {
|
||||
for i in {1..30}
|
||||
do
|
||||
output=$(taos -s "show transactions;")
|
||||
if [[ $output == *"Query OK, 0 row(s)"* ]]; then
|
||||
echo "Success: No transactions are in progress."
|
||||
return 0
|
||||
fi
|
||||
sleep 1
|
||||
done
|
||||
|
||||
echo "Error: Transactions are still in progress after 30 attempts."
|
||||
return 1
|
||||
}
|
||||
|
||||
taosd >>/dev/null 2>&1 &
|
||||
taosadapter >>/dev/null 2>&1 &
|
||||
sleep 1
|
||||
|
@ -19,60 +34,61 @@ taos -s "drop database if exists power"
|
|||
go run ./sqlquery/main.go
|
||||
|
||||
taos -s "drop database if exists power"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
go run ./queryreqid/main.go
|
||||
|
||||
taos -s "drop database if exists power"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
go run ./stmt/native/main.go
|
||||
|
||||
taos -s "drop database if exists power"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
go run ./stmt/ws/main.go
|
||||
|
||||
taos -s "drop database if exists power"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
sleep 3
|
||||
go run ./schemaless/native/main.go
|
||||
|
||||
taos -s "drop database if exists power"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
go run ./schemaless/ws/main.go
|
||||
|
||||
taos -s "drop topic if exists topic_meters"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
taos -s "drop database if exists power"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
go run ./tmq/native/main.go
|
||||
|
||||
taos -s "drop topic if exists topic_meters"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
taos -s "drop database if exists power"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
go run ./tmq/ws/main.go
|
||||
|
||||
|
||||
taos -s "drop database if exists test"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
go run ./insert/json/main.go
|
||||
taos -s "drop database if exists test"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
go run ./insert/line/main.go
|
||||
taos -s "drop topic if exists topic_meters"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
taos -s "drop database if exists power"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
go run ./insert/sql/main.go
|
||||
taos -s "drop database if exists power"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
go run ./insert/stmt/main.go
|
||||
taos -s "drop database if exists test"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
go run ./insert/telnet/main.go
|
||||
|
||||
go run ./query/sync/main.go
|
||||
|
||||
taos -s "drop topic if exists example_tmq_topic"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
taos -s "drop database if exists example_tmq"
|
||||
sleep 1
|
||||
check_transactions || exit 1
|
||||
go run ./sub/main.go
|
||||
|
|
Loading…
Reference in New Issue