Merge branch 'docs/sheyj-3.0' of github.com:taosdata/TDengine into docs/sheyj-3.0
This commit is contained in:
commit
c33705e4c9
|
@ -6,6 +6,11 @@ namespace TMQExample
|
||||||
{
|
{
|
||||||
internal class SubscribeDemo
|
internal class SubscribeDemo
|
||||||
{
|
{
|
||||||
|
private static string _host = "";
|
||||||
|
private static string _groupId = "";
|
||||||
|
private static string _clientId = "";
|
||||||
|
private static string _topic = "";
|
||||||
|
|
||||||
public static void Main(string[] args)
|
public static void Main(string[] args)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
@ -64,9 +69,9 @@ namespace TMQExample
|
||||||
{
|
{
|
||||||
// ANCHOR: create_consumer
|
// ANCHOR: create_consumer
|
||||||
// consumer config
|
// consumer config
|
||||||
var host = "127.0.0.1";
|
_host = "127.0.0.1";
|
||||||
var groupId = "group1";
|
_groupId = "group1";
|
||||||
var clientId = "client1";
|
_clientId = "client1";
|
||||||
var cfg = new Dictionary<string, string>()
|
var cfg = new Dictionary<string, string>()
|
||||||
{
|
{
|
||||||
{ "td.connect.port", "6030" },
|
{ "td.connect.port", "6030" },
|
||||||
|
@ -74,9 +79,9 @@ namespace TMQExample
|
||||||
{ "msg.with.table.name", "true" },
|
{ "msg.with.table.name", "true" },
|
||||||
{ "enable.auto.commit", "true" },
|
{ "enable.auto.commit", "true" },
|
||||||
{ "auto.commit.interval.ms", "1000" },
|
{ "auto.commit.interval.ms", "1000" },
|
||||||
{ "group.id", groupId },
|
{ "group.id", _groupId },
|
||||||
{ "client.id", clientId },
|
{ "client.id", _clientId },
|
||||||
{ "td.connect.ip", host },
|
{ "td.connect.ip", _host },
|
||||||
{ "td.connect.user", "root" },
|
{ "td.connect.user", "root" },
|
||||||
{ "td.connect.pass", "taosdata" },
|
{ "td.connect.pass", "taosdata" },
|
||||||
};
|
};
|
||||||
|
@ -85,20 +90,32 @@ namespace TMQExample
|
||||||
{
|
{
|
||||||
// create consumer
|
// create consumer
|
||||||
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
|
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
|
||||||
Console.WriteLine("Create consumer successfully, host: " + host + ", groupId: " + groupId +
|
Console.WriteLine(
|
||||||
", clientId: " + clientId);
|
$"Create consumer successfully, " +
|
||||||
|
$"host: {_host}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}");
|
||||||
}
|
}
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
{
|
{
|
||||||
// handle TDengine error
|
// handle TDengine error
|
||||||
Console.WriteLine("Failed to create native consumer, host: " + host + ", ErrCode:" + e.Code +
|
Console.WriteLine(
|
||||||
", ErrMessage: " + e.Error);
|
$"Failed to create native consumer, " +
|
||||||
|
$"host: {_host}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"ErrCode: {e.Code}, " +
|
||||||
|
$"ErrMessage: {e.Error}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
// handle other exceptions
|
// handle other exceptions
|
||||||
Console.WriteLine("Failed to create native consumer, host: " + host + ", ErrMessage: " + e.Message);
|
Console.WriteLine($"Failed to create native consumer, " +
|
||||||
|
$"host: {_host}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"ErrMessage: {e.Message}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,11 +126,12 @@ namespace TMQExample
|
||||||
static void Consume(IConsumer<Dictionary<string, object>> consumer)
|
static void Consume(IConsumer<Dictionary<string, object>> consumer)
|
||||||
{
|
{
|
||||||
// ANCHOR: subscribe
|
// ANCHOR: subscribe
|
||||||
|
_topic = "topic_meters";
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// subscribe
|
// subscribe
|
||||||
consumer.Subscribe(new List<string>() { "topic_meters" });
|
consumer.Subscribe(new List<string>() { _topic });
|
||||||
Console.WriteLine("subscribe topics successfully");
|
Console.WriteLine("Subscribe topics successfully");
|
||||||
for (int i = 0; i < 50; i++)
|
for (int i = 0; i < 50; i++)
|
||||||
{
|
{
|
||||||
// consume message with using block to ensure the result is disposed
|
// consume message with using block to ensure the result is disposed
|
||||||
|
@ -133,13 +151,23 @@ namespace TMQExample
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
{
|
{
|
||||||
// handle TDengine error
|
// handle TDengine error
|
||||||
Console.WriteLine("Failed to poll data, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
|
Console.WriteLine(
|
||||||
|
$"Failed to poll data, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"ErrCode: {e.Code}, " +
|
||||||
|
$"ErrMessage: {e.Error}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
// handle other exceptions
|
// handle other exceptions
|
||||||
Console.WriteLine("Failed to poll data, ErrMessage: " + e.Message);
|
Console.WriteLine($"Failed to poll data, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"ErrMessage: {e.Message}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
// ANCHOR_END: subscribe
|
// ANCHOR_END: subscribe
|
||||||
|
@ -152,7 +180,7 @@ namespace TMQExample
|
||||||
{
|
{
|
||||||
// get assignment
|
// get assignment
|
||||||
var assignment = consumer.Assignment;
|
var assignment = consumer.Assignment;
|
||||||
Console.WriteLine($"now assignment: {assignment}");
|
Console.WriteLine($"Now assignment: {assignment}");
|
||||||
// seek to the beginning
|
// seek to the beginning
|
||||||
foreach (var topicPartition in assignment)
|
foreach (var topicPartition in assignment)
|
||||||
{
|
{
|
||||||
|
@ -163,13 +191,25 @@ namespace TMQExample
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
{
|
{
|
||||||
// handle TDengine error
|
// handle TDengine error
|
||||||
Console.WriteLine("Failed to execute seek example, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
|
Console.WriteLine(
|
||||||
|
$"Failed to execute seek example, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"offset: 0, " +
|
||||||
|
$"ErrCode: {e.Code}, " +
|
||||||
|
$"ErrMessage: {e.Error}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
// handle other exceptions
|
// handle other exceptions
|
||||||
Console.WriteLine("Failed to execute seek example, ErrMessage: " + e.Message);
|
Console.WriteLine($"Failed to execute seek example, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"offset: 0, " +
|
||||||
|
$"ErrMessage: {e.Message}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
// ANCHOR_END: seek
|
// ANCHOR_END: seek
|
||||||
|
@ -180,6 +220,7 @@ namespace TMQExample
|
||||||
// ANCHOR: commit_offset
|
// ANCHOR: commit_offset
|
||||||
for (int i = 0; i < 5; i++)
|
for (int i = 0; i < 5; i++)
|
||||||
{
|
{
|
||||||
|
TopicPartitionOffset topicPartitionOffset = null;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// consume message with using block to ensure the result is disposed
|
// consume message with using block to ensure the result is disposed
|
||||||
|
@ -187,9 +228,10 @@ namespace TMQExample
|
||||||
{
|
{
|
||||||
if (cr == null) continue;
|
if (cr == null) continue;
|
||||||
// commit offset
|
// commit offset
|
||||||
|
topicPartitionOffset = cr.TopicPartitionOffset;
|
||||||
consumer.Commit(new List<TopicPartitionOffset>
|
consumer.Commit(new List<TopicPartitionOffset>
|
||||||
{
|
{
|
||||||
cr.TopicPartitionOffset,
|
topicPartitionOffset,
|
||||||
});
|
});
|
||||||
Console.WriteLine("Commit offset manually successfully.");
|
Console.WriteLine("Commit offset manually successfully.");
|
||||||
}
|
}
|
||||||
|
@ -197,13 +239,26 @@ namespace TMQExample
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
{
|
{
|
||||||
// handle TDengine error
|
// handle TDengine error
|
||||||
Console.WriteLine("Failed to execute commit example, ErrCode:" + e.Code + ", ErrMessage: " + e.Error);
|
Console.WriteLine(
|
||||||
|
$"Failed to execute commit example, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"offset: {topicPartitionOffset}, " +
|
||||||
|
$"ErrCode: {e.Code}, " +
|
||||||
|
$"ErrMessage: {e.Error}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
// handle other exceptions
|
// handle other exceptions
|
||||||
Console.WriteLine("Failed to execute commit example, ErrMessage: " + e.Message);
|
Console.WriteLine(
|
||||||
|
$"Failed to execute commit example, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"offset: {topicPartitionOffset}, " +
|
||||||
|
$"ErrMessage: {e.Message}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -221,13 +276,24 @@ namespace TMQExample
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
{
|
{
|
||||||
// handle TDengine error
|
// handle TDengine error
|
||||||
Console.WriteLine("Failed to unsubscribe consumer, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
|
Console.WriteLine(
|
||||||
|
$"Failed to unsubscribe consumer, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"ErrCode: {e.Code}, " +
|
||||||
|
$"ErrMessage: {e.Error}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
// handle other exceptions
|
// handle other exceptions
|
||||||
Console.WriteLine("Failed to unsubscribe consumer, ErrMessage: " + e.Message);
|
Console.WriteLine(
|
||||||
|
$"Failed to execute commit example, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"ErrMessage: {e.Message}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
|
@ -239,4 +305,4 @@ namespace TMQExample
|
||||||
// ANCHOR_END: close
|
// ANCHOR_END: close
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -16,10 +16,10 @@ namespace Examples
|
||||||
var builder = new ConnectionStringBuilder(connectionString);
|
var builder = new ConnectionStringBuilder(connectionString);
|
||||||
using (var client = DbDriver.Open(builder))
|
using (var client = DbDriver.Open(builder))
|
||||||
{
|
{
|
||||||
CreateDatabaseAndTable(client,connectionString);
|
CreateDatabaseAndTable(client, connectionString);
|
||||||
InsertData(client,connectionString);
|
InsertData(client, connectionString);
|
||||||
QueryData(client,connectionString);
|
QueryData(client, connectionString);
|
||||||
QueryWithReqId(client,connectionString);
|
QueryWithReqId(client, connectionString);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
|
@ -52,7 +52,8 @@ namespace Examples
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
{
|
{
|
||||||
// handle TDengine error
|
// handle TDengine error
|
||||||
Console.WriteLine("Failed to create database power or stable meters, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
|
Console.WriteLine("Failed to create database power or stable meters, ErrCode: " + e.Code +
|
||||||
|
", ErrMessage: " + e.Error);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
|
@ -64,40 +65,43 @@ namespace Examples
|
||||||
// ANCHOR_END: create_db_and_table
|
// ANCHOR_END: create_db_and_table
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void InsertData(ITDengineClient client,string connectionString)
|
private static void InsertData(ITDengineClient client, string connectionString)
|
||||||
{
|
{
|
||||||
// ANCHOR: insert_data
|
// ANCHOR: insert_data
|
||||||
|
// insert data, please make sure the database and table are created before
|
||||||
|
var insertQuery = "INSERT INTO " +
|
||||||
|
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
|
||||||
|
"VALUES " +
|
||||||
|
"(NOW + 1a, 10.30000, 219, 0.31000) " +
|
||||||
|
"(NOW + 2a, 12.60000, 218, 0.33000) " +
|
||||||
|
"(NOW + 3a, 12.30000, 221, 0.31000) " +
|
||||||
|
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
|
||||||
|
"VALUES " +
|
||||||
|
"(NOW + 1a, 10.30000, 218, 0.25000) ";
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// insert data, please make sure the database and table are created before
|
|
||||||
var insertQuery = "INSERT INTO " +
|
|
||||||
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
|
|
||||||
"VALUES " +
|
|
||||||
"(NOW + 1a, 10.30000, 219, 0.31000) " +
|
|
||||||
"(NOW + 2a, 12.60000, 218, 0.33000) " +
|
|
||||||
"(NOW + 3a, 12.30000, 221, 0.31000) " +
|
|
||||||
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
|
|
||||||
"VALUES " +
|
|
||||||
"(NOW + 1a, 10.30000, 218, 0.25000) ";
|
|
||||||
var affectedRows = client.Exec(insertQuery);
|
var affectedRows = client.Exec(insertQuery);
|
||||||
Console.WriteLine("Successfully inserted " + affectedRows + " rows to power.meters.");
|
Console.WriteLine("Successfully inserted " + affectedRows + " rows to power.meters.");
|
||||||
}
|
}
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
{
|
{
|
||||||
// handle TDengine error
|
// handle TDengine error
|
||||||
Console.WriteLine("Failed to insert data to power.meters, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
|
Console.WriteLine("Failed to insert data to power.meters, sql: " + insertQuery + ", ErrCode: " +
|
||||||
|
e.Code + ", ErrMessage: " +
|
||||||
|
e.Error);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
// handle other exceptions
|
// handle other exceptions
|
||||||
Console.WriteLine("Failed to insert data to power.meters, ErrMessage: " + e.Message);
|
Console.WriteLine("Failed to insert data to power.meters, sql: " + insertQuery + ", ErrMessage: " +
|
||||||
|
e.Message);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
// ANCHOR_END: insert_data
|
// ANCHOR_END: insert_data
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void QueryData(ITDengineClient client,string connectionString)
|
private static void QueryData(ITDengineClient client, string connectionString)
|
||||||
{
|
{
|
||||||
// ANCHOR: select_data
|
// ANCHOR: select_data
|
||||||
// query data, make sure the database and table are created before
|
// query data, make sure the database and table are created before
|
||||||
|
@ -108,6 +112,7 @@ namespace Examples
|
||||||
{
|
{
|
||||||
while (rows.Read())
|
while (rows.Read())
|
||||||
{
|
{
|
||||||
|
// Add your data processing logic here
|
||||||
var ts = (DateTime)rows.GetValue(0);
|
var ts = (DateTime)rows.GetValue(0);
|
||||||
var current = (float)rows.GetValue(1);
|
var current = (float)rows.GetValue(1);
|
||||||
var location = Encoding.UTF8.GetString((byte[])rows.GetValue(2));
|
var location = Encoding.UTF8.GetString((byte[])rows.GetValue(2));
|
||||||
|
@ -119,28 +124,30 @@ namespace Examples
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
{
|
{
|
||||||
// handle TDengine error
|
// handle TDengine error
|
||||||
Console.WriteLine("Failed to query data from power.meters, sql: " + query + ", ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
|
Console.WriteLine("Failed to query data from power.meters, sql: " + query + ", ErrCode: " + e.Code +
|
||||||
|
", ErrMessage: " + e.Error);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
// handle other exceptions
|
// handle other exceptions
|
||||||
Console.WriteLine("Failed to query data from power.meters, sql: " + query + ", ErrMessage: " + e.Message);
|
Console.WriteLine(
|
||||||
|
"Failed to query data from power.meters, sql: " + query + ", ErrMessage: " + e.Message);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
// ANCHOR_END: select_data
|
// ANCHOR_END: select_data
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void QueryWithReqId(ITDengineClient client,string connectionString)
|
private static void QueryWithReqId(ITDengineClient client, string connectionString)
|
||||||
{
|
{
|
||||||
// ANCHOR: query_id
|
// ANCHOR: query_id
|
||||||
var reqId = (long)3;
|
var reqId = (long)3;
|
||||||
|
// query data
|
||||||
|
var query = "SELECT ts, current, location FROM power.meters limit 1";
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// query data
|
|
||||||
var query = "SELECT ts, current, location FROM power.meters limit 1";
|
|
||||||
// query with request id 3
|
// query with request id 3
|
||||||
using (var rows = client.Query(query,reqId))
|
using (var rows = client.Query(query, reqId))
|
||||||
{
|
{
|
||||||
while (rows.Read())
|
while (rows.Read())
|
||||||
{
|
{
|
||||||
|
@ -155,16 +162,18 @@ namespace Examples
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
{
|
{
|
||||||
// handle TDengine error
|
// handle TDengine error
|
||||||
Console.WriteLine("Failed to execute sql with reqId: " + reqId + ", ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
|
Console.WriteLine("Failed to execute sql with reqId: " + reqId + ", sql: " + query + ", ErrCode: " +
|
||||||
|
e.Code + ", ErrMessage: " + e.Error);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
// handle other exceptions
|
// handle other exceptions
|
||||||
Console.WriteLine("Failed to execute sql with reqId: " + reqId + ", ErrMessage: " + e.Message);
|
Console.WriteLine("Failed to execute sql with reqId: " + reqId + ", sql: " + query + ", ErrMessage: " +
|
||||||
|
e.Message);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
// ANCHOR_END: query_id
|
// ANCHOR_END: query_id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -6,6 +6,11 @@ namespace TMQExample
|
||||||
{
|
{
|
||||||
internal class SubscribeDemo
|
internal class SubscribeDemo
|
||||||
{
|
{
|
||||||
|
private static string _host = "";
|
||||||
|
private static string _groupId = "";
|
||||||
|
private static string _clientId = "";
|
||||||
|
private static string _topic = "";
|
||||||
|
|
||||||
public static void Main(string[] args)
|
public static void Main(string[] args)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
@ -68,9 +73,9 @@ namespace TMQExample
|
||||||
{
|
{
|
||||||
// ANCHOR: create_consumer
|
// ANCHOR: create_consumer
|
||||||
// consumer config
|
// consumer config
|
||||||
var host = "127.0.0.1";
|
_host = "127.0.0.1";
|
||||||
var groupId = "group1";
|
_groupId = "group1";
|
||||||
var clientId = "client1";
|
_clientId = "client1";
|
||||||
var cfg = new Dictionary<string, string>()
|
var cfg = new Dictionary<string, string>()
|
||||||
{
|
{
|
||||||
{ "td.connect.type", "WebSocket" },
|
{ "td.connect.type", "WebSocket" },
|
||||||
|
@ -79,9 +84,9 @@ namespace TMQExample
|
||||||
{ "msg.with.table.name", "true" },
|
{ "msg.with.table.name", "true" },
|
||||||
{ "enable.auto.commit", "true" },
|
{ "enable.auto.commit", "true" },
|
||||||
{ "auto.commit.interval.ms", "1000" },
|
{ "auto.commit.interval.ms", "1000" },
|
||||||
{ "group.id", groupId },
|
{ "group.id", _groupId },
|
||||||
{ "client.id", clientId },
|
{ "client.id", _clientId },
|
||||||
{ "td.connect.ip", host },
|
{ "td.connect.ip", _host },
|
||||||
{ "td.connect.user", "root" },
|
{ "td.connect.user", "root" },
|
||||||
{ "td.connect.pass", "taosdata" },
|
{ "td.connect.pass", "taosdata" },
|
||||||
};
|
};
|
||||||
|
@ -90,20 +95,32 @@ namespace TMQExample
|
||||||
{
|
{
|
||||||
// create consumer
|
// create consumer
|
||||||
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
|
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
|
||||||
Console.WriteLine("Create consumer successfully, host: " + host + ", groupId: " + groupId +
|
Console.WriteLine(
|
||||||
", clientId: " + clientId);
|
$"Create consumer successfully, " +
|
||||||
|
$"host: {_host}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}");
|
||||||
}
|
}
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
{
|
{
|
||||||
// handle TDengine error
|
// handle TDengine error
|
||||||
Console.WriteLine("Failed to create websocket consumer, host: " + host + ", ErrCode: " + e.Code +
|
Console.WriteLine(
|
||||||
", ErrMessage: " + e.Error);
|
$"Failed to create native consumer, " +
|
||||||
|
$"host: {_host}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"ErrCode: {e.Code}, " +
|
||||||
|
$"ErrMessage: {e.Error}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
// handle other exceptions
|
// handle other exceptions
|
||||||
Console.WriteLine("Failed to create websocket consumer, host: " + host + ", ErrMessage: " + e.Message);
|
Console.WriteLine($"Failed to create native consumer, " +
|
||||||
|
$"host: {_host}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"ErrMessage: {e.Message}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,10 +131,11 @@ namespace TMQExample
|
||||||
static void Consume(IConsumer<Dictionary<string, object>> consumer)
|
static void Consume(IConsumer<Dictionary<string, object>> consumer)
|
||||||
{
|
{
|
||||||
// ANCHOR: subscribe
|
// ANCHOR: subscribe
|
||||||
|
_topic = "topic_meters";
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// subscribe
|
// subscribe
|
||||||
consumer.Subscribe(new List<string>() { "topic_meters" });
|
consumer.Subscribe(new List<string>() { _topic });
|
||||||
Console.WriteLine("Subscribe topics successfully");
|
Console.WriteLine("Subscribe topics successfully");
|
||||||
for (int i = 0; i < 50; i++)
|
for (int i = 0; i < 50; i++)
|
||||||
{
|
{
|
||||||
|
@ -138,13 +156,23 @@ namespace TMQExample
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
{
|
{
|
||||||
// handle TDengine error
|
// handle TDengine error
|
||||||
Console.WriteLine("Failed to poll data, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
|
Console.WriteLine(
|
||||||
|
$"Failed to poll data, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"ErrCode: {e.Code}, " +
|
||||||
|
$"ErrMessage: {e.Error}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
// handle other exceptions
|
// handle other exceptions
|
||||||
Console.WriteLine("Failed to poll data, ErrMessage: " + e.Message);
|
Console.WriteLine($"Failed to poll data, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"ErrMessage: {e.Message}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
// ANCHOR_END: subscribe
|
// ANCHOR_END: subscribe
|
||||||
|
@ -168,13 +196,25 @@ namespace TMQExample
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
{
|
{
|
||||||
// handle TDengine error
|
// handle TDengine error
|
||||||
Console.WriteLine("Failed to execute seek example, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
|
Console.WriteLine(
|
||||||
|
$"Failed to execute seek example, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"offset: 0, " +
|
||||||
|
$"ErrCode: {e.Code}, " +
|
||||||
|
$"ErrMessage: {e.Error}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
// handle other exceptions
|
// handle other exceptions
|
||||||
Console.WriteLine("Failed to execute seek example, ErrMessage: " + e.Message);
|
Console.WriteLine($"Failed to execute seek example, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"offset: 0, " +
|
||||||
|
$"ErrMessage: {e.Message}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
// ANCHOR_END: seek
|
// ANCHOR_END: seek
|
||||||
|
@ -185,6 +225,7 @@ namespace TMQExample
|
||||||
// ANCHOR: commit_offset
|
// ANCHOR: commit_offset
|
||||||
for (int i = 0; i < 5; i++)
|
for (int i = 0; i < 5; i++)
|
||||||
{
|
{
|
||||||
|
TopicPartitionOffset topicPartitionOffset = null;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// consume message with using block to ensure the result is disposed
|
// consume message with using block to ensure the result is disposed
|
||||||
|
@ -192,9 +233,10 @@ namespace TMQExample
|
||||||
{
|
{
|
||||||
if (cr == null) continue;
|
if (cr == null) continue;
|
||||||
// commit offset
|
// commit offset
|
||||||
|
topicPartitionOffset = cr.TopicPartitionOffset;
|
||||||
consumer.Commit(new List<TopicPartitionOffset>
|
consumer.Commit(new List<TopicPartitionOffset>
|
||||||
{
|
{
|
||||||
cr.TopicPartitionOffset,
|
topicPartitionOffset,
|
||||||
});
|
});
|
||||||
Console.WriteLine("Commit offset manually successfully.");
|
Console.WriteLine("Commit offset manually successfully.");
|
||||||
}
|
}
|
||||||
|
@ -202,13 +244,26 @@ namespace TMQExample
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
{
|
{
|
||||||
// handle TDengine error
|
// handle TDengine error
|
||||||
Console.WriteLine("Failed to execute commit example, ErrCode: " + e.Code + ", ErrMessage: " + e.Error);
|
Console.WriteLine(
|
||||||
|
$"Failed to execute commit example, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"offset: {topicPartitionOffset}, " +
|
||||||
|
$"ErrCode: {e.Code}, " +
|
||||||
|
$"ErrMessage: {e.Error}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
// handle other exceptions
|
// handle other exceptions
|
||||||
Console.WriteLine("Failed to execute commit example, ErrMessage: " + e.Message);
|
Console.WriteLine(
|
||||||
|
$"Failed to execute commit example, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"offset: {topicPartitionOffset}, " +
|
||||||
|
$"ErrMessage: {e.Message}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -226,13 +281,24 @@ namespace TMQExample
|
||||||
catch (TDengineError e)
|
catch (TDengineError e)
|
||||||
{
|
{
|
||||||
// handle TDengine error
|
// handle TDengine error
|
||||||
Console.WriteLine("Failed to unsubscribe consumer, ErrCode :" + e.Code + ", ErrMessage: " + e.Error);
|
Console.WriteLine(
|
||||||
|
$"Failed to unsubscribe consumer, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"ErrCode: {e.Code}, " +
|
||||||
|
$"ErrMessage: {e.Error}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
// handle other exceptions
|
// handle other exceptions
|
||||||
Console.WriteLine("Failed to unsubscribe consumer, ErrMessage: " + e.Message);
|
Console.WriteLine(
|
||||||
|
$"Failed to execute commit example, " +
|
||||||
|
$"topic: {_topic}, " +
|
||||||
|
$"groupId: {_groupId}, " +
|
||||||
|
$"clientId: {_clientId}, " +
|
||||||
|
$"ErrMessage: {e.Message}");
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
|
@ -244,4 +310,4 @@ namespace TMQExample
|
||||||
// ANCHOR_END: close
|
// ANCHOR_END: close
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,58 +1,60 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
_ "github.com/taosdata/driver-go/v3/taosSql"
|
_ "github.com/taosdata/driver-go/v3/taosSql"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
taosDSN := "root:taosdata@tcp(localhost:6030)/"
|
taosDSN := "root:taosdata@tcp(localhost:6030)/"
|
||||||
db, err := sql.Open("taosSql", taosDSN)
|
db, err := sql.Open("taosSql", taosDSN)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error())
|
log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error())
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
initEnv(db)
|
initEnv(db)
|
||||||
// ANCHOR: query_id
|
// ANCHOR: query_id
|
||||||
// use context to set request id
|
// use context to set request id
|
||||||
reqId := int64(3)
|
reqId := int64(3)
|
||||||
ctx := context.WithValue(context.Background(), "taos_req_id", reqId)
|
ctx := context.WithValue(context.Background(), "taos_req_id", reqId)
|
||||||
// execute query with context
|
// execute query with context
|
||||||
rows, err := db.QueryContext(ctx, "SELECT ts, current, location FROM power.meters limit 1")
|
querySql := "SELECT ts, current, location FROM power.meters limit 1"
|
||||||
if err != nil {
|
rows, err := db.QueryContext(ctx, querySql)
|
||||||
log.Fatalf("Failed to execute sql with reqId: %d, url: %s; ErrMessage: %s\n", reqId, taosDSN, err.Error())
|
if err != nil {
|
||||||
}
|
log.Fatalf("Failed to execute sql with reqId: %d, url: %s, sql: %s, ErrMessage: %s\n", reqId, taosDSN, querySql, err.Error())
|
||||||
for rows.Next() {
|
}
|
||||||
var (
|
for rows.Next() {
|
||||||
ts time.Time
|
// Add your data processing logic here
|
||||||
current float32
|
var (
|
||||||
location string
|
ts time.Time
|
||||||
)
|
current float32
|
||||||
err = rows.Scan(&ts, ¤t, &location)
|
location string
|
||||||
if err != nil {
|
)
|
||||||
log.Fatal("Scan error: ", err)
|
err = rows.Scan(&ts, ¤t, &location)
|
||||||
}
|
if err != nil {
|
||||||
fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location)
|
log.Fatalf("Failed to scan data, reqId: %d, url:%s, sql: %s, ErrMessage: %s\n", reqId, taosDSN, querySql, err)
|
||||||
}
|
}
|
||||||
// ANCHOR_END: query_id
|
fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location)
|
||||||
}
|
}
|
||||||
|
// ANCHOR_END: query_id
|
||||||
func initEnv(conn *sql.DB) {
|
}
|
||||||
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
|
|
||||||
if err != nil {
|
func initEnv(conn *sql.DB) {
|
||||||
log.Fatal("Create database power error: ", err)
|
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
|
||||||
}
|
if err != nil {
|
||||||
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
|
log.Fatal("Create database power error: ", err)
|
||||||
if err != nil {
|
}
|
||||||
log.Fatal("Create stable meters error: ", err)
|
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
|
||||||
}
|
if err != nil {
|
||||||
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
|
log.Fatal("Create stable meters error: ", err)
|
||||||
if err != nil {
|
}
|
||||||
log.Fatal("Insert data to power.meters error: ", err)
|
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
|
||||||
}
|
if err != nil {
|
||||||
}
|
log.Fatal("Insert data to power.meters error: ", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,86 +1,86 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
_ "github.com/taosdata/driver-go/v3/taosSql"
|
_ "github.com/taosdata/driver-go/v3/taosSql"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
var taosDSN = "root:taosdata@tcp(localhost:6030)/"
|
var taosDSN = "root:taosdata@tcp(localhost:6030)/"
|
||||||
db, err := sql.Open("taosSql", taosDSN)
|
db, err := sql.Open("taosSql", taosDSN)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error())
|
log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error())
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
// ANCHOR: create_db_and_table
|
// ANCHOR: create_db_and_table
|
||||||
// create database
|
// create database
|
||||||
res, err := db.Exec("CREATE DATABASE IF NOT EXISTS power")
|
res, err := db.Exec("CREATE DATABASE IF NOT EXISTS power")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
|
log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
|
||||||
}
|
}
|
||||||
rowsAffected, err := res.RowsAffected()
|
rowsAffected, err := res.RowsAffected()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalln("Failed to get create database rowsAffected, ErrMessage: " + err.Error())
|
log.Fatalln("Failed to get create database rowsAffected, ErrMessage: " + err.Error())
|
||||||
}
|
}
|
||||||
// you can check rowsAffected here
|
// you can check rowsAffected here
|
||||||
fmt.Println("Create database power successfully, rowsAffected: ", rowsAffected)
|
fmt.Println("Create database power successfully, rowsAffected: ", rowsAffected)
|
||||||
// create table
|
// create table
|
||||||
res, err = db.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
|
res, err = db.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalln("Failed to create stable meters, ErrMessage: " + err.Error())
|
log.Fatalln("Failed to create stable meters, ErrMessage: " + err.Error())
|
||||||
}
|
}
|
||||||
rowsAffected, err = res.RowsAffected()
|
rowsAffected, err = res.RowsAffected()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalln("Failed to get create stable rowsAffected, ErrMessage: " + err.Error())
|
log.Fatalln("Failed to get create stable rowsAffected, ErrMessage: " + err.Error())
|
||||||
}
|
}
|
||||||
// you can check rowsAffected here
|
// you can check rowsAffected here
|
||||||
fmt.Println("Create stable power.meters successfully, rowsAffected:", rowsAffected)
|
fmt.Println("Create stable power.meters successfully, rowsAffected:", rowsAffected)
|
||||||
// ANCHOR_END: create_db_and_table
|
// ANCHOR_END: create_db_and_table
|
||||||
// ANCHOR: insert_data
|
// ANCHOR: insert_data
|
||||||
// insert data, please make sure the database and table are created before
|
// insert data, please make sure the database and table are created before
|
||||||
insertQuery := "INSERT INTO " +
|
insertQuery := "INSERT INTO " +
|
||||||
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
|
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
|
||||||
"VALUES " +
|
"VALUES " +
|
||||||
"(NOW + 1a, 10.30000, 219, 0.31000) " +
|
"(NOW + 1a, 10.30000, 219, 0.31000) " +
|
||||||
"(NOW + 2a, 12.60000, 218, 0.33000) " +
|
"(NOW + 2a, 12.60000, 218, 0.33000) " +
|
||||||
"(NOW + 3a, 12.30000, 221, 0.31000) " +
|
"(NOW + 3a, 12.30000, 221, 0.31000) " +
|
||||||
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
|
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
|
||||||
"VALUES " +
|
"VALUES " +
|
||||||
"(NOW + 1a, 10.30000, 218, 0.25000) "
|
"(NOW + 1a, 10.30000, 218, 0.25000) "
|
||||||
res, err = db.Exec(insertQuery)
|
res, err = db.Exec(insertQuery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Failed to insert data to power.meters, ErrMessage: " + err.Error())
|
log.Fatalf("Failed to insert data to power.meters, sql: %s, ErrMessage: %s\n", insertQuery, err.Error())
|
||||||
}
|
}
|
||||||
rowsAffected, err = res.RowsAffected()
|
rowsAffected, err = res.RowsAffected()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Failed to get insert rowsAffected, ErrMessage: " + err.Error())
|
log.Fatalf("Failed to get insert rowsAffected, sql: %s, ErrMessage: %s\n", insertQuery, err.Error())
|
||||||
}
|
}
|
||||||
// you can check affectedRows here
|
// you can check affectedRows here
|
||||||
fmt.Printf("Successfully inserted %d rows to power.meters.\n", rowsAffected)
|
fmt.Printf("Successfully inserted %d rows to power.meters.\n", rowsAffected)
|
||||||
// ANCHOR_END: insert_data
|
// ANCHOR_END: insert_data
|
||||||
// ANCHOR: select_data
|
// ANCHOR: select_data
|
||||||
// query data, make sure the database and table are created before
|
// query data, make sure the database and table are created before
|
||||||
sql := "SELECT ts, current, location FROM power.meters limit 100"
|
sql := "SELECT ts, current, location FROM power.meters limit 100"
|
||||||
rows, err := db.Query(sql)
|
rows, err := db.Query(sql)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Failed to query data from power.meters, ErrMessage: " + err.Error())
|
log.Fatalf("Failed to query data from power.meters, sql: %s, ErrMessage: %s\n", sql, err.Error())
|
||||||
}
|
}
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var (
|
// Add your data processing logic here
|
||||||
ts time.Time
|
var (
|
||||||
current float32
|
ts time.Time
|
||||||
location string
|
current float32
|
||||||
)
|
location string
|
||||||
err = rows.Scan(&ts, ¤t, &location)
|
)
|
||||||
if err != nil {
|
err = rows.Scan(&ts, ¤t, &location)
|
||||||
log.Fatal("Failed to scan data, sql:" + sql + ", ErrMessage: " + err.Error())
|
if err != nil {
|
||||||
}
|
log.Fatalf("Failed to scan data, sql: %s, ErrMessage: %s\n", sql, err)
|
||||||
// you can check data here
|
}
|
||||||
fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location)
|
fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location)
|
||||||
}
|
}
|
||||||
// ANCHOR_END: select_data
|
// ANCHOR_END: select_data
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,136 +1,179 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/taosdata/driver-go/v3/af/tmq"
|
"github.com/taosdata/driver-go/v3/af/tmq"
|
||||||
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
|
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
|
||||||
_ "github.com/taosdata/driver-go/v3/taosSql"
|
_ "github.com/taosdata/driver-go/v3/taosSql"
|
||||||
)
|
)
|
||||||
|
|
||||||
var done = make(chan struct{})
|
var done = make(chan struct{})
|
||||||
|
var groupID string
|
||||||
func main() {
|
var clientID string
|
||||||
// init env
|
var host string
|
||||||
taosDSN := "root:taosdata@tcp(127.0.0.1:6030)/"
|
var topic string
|
||||||
conn, err := sql.Open("taosSql", taosDSN)
|
|
||||||
if err != nil {
|
func main() {
|
||||||
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error())
|
// init env
|
||||||
}
|
taosDSN := "root:taosdata@tcp(127.0.0.1:6030)/"
|
||||||
defer func() {
|
conn, err := sql.Open("taosSql", taosDSN)
|
||||||
conn.Close()
|
if err != nil {
|
||||||
}()
|
log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error())
|
||||||
initEnv(conn)
|
}
|
||||||
// ANCHOR: create_consumer
|
defer func() {
|
||||||
// create consumer
|
conn.Close()
|
||||||
groupID := "group1"
|
}()
|
||||||
clientID := "client1"
|
initEnv(conn)
|
||||||
host := "127.0.0.1"
|
// ANCHOR: create_consumer
|
||||||
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
|
// create consumer
|
||||||
"td.connect.user": "root",
|
groupID = "group1"
|
||||||
"td.connect.pass": "taosdata",
|
clientID = "client1"
|
||||||
"auto.offset.reset": "latest",
|
host = "127.0.0.1"
|
||||||
"msg.with.table.name": "true",
|
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
|
||||||
"enable.auto.commit": "true",
|
"td.connect.user": "root",
|
||||||
"auto.commit.interval.ms": "1000",
|
"td.connect.pass": "taosdata",
|
||||||
"group.id": groupID,
|
"auto.offset.reset": "latest",
|
||||||
"client.id": clientID,
|
"msg.with.table.name": "true",
|
||||||
})
|
"enable.auto.commit": "true",
|
||||||
if err != nil {
|
"auto.commit.interval.ms": "1000",
|
||||||
log.Fatalln("Failed to create native consumer, host : " + host + "; ErrMessage: " + err.Error())
|
"group.id": groupID,
|
||||||
}
|
"client.id": clientID,
|
||||||
log.Println("Create consumer successfully, host: " + host + ", groupId: " + groupID + ", clientId: " + clientID)
|
})
|
||||||
|
if err != nil {
|
||||||
// ANCHOR_END: create_consumer
|
log.Fatalf(
|
||||||
// ANCHOR: subscribe
|
"Failed to create native consumer, host: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
|
||||||
err = consumer.Subscribe("topic_meters", nil)
|
host,
|
||||||
if err != nil {
|
groupID,
|
||||||
log.Fatalln("Failed to subscribe topic_meters, ErrMessage: " + err.Error())
|
clientID,
|
||||||
}
|
err.Error(),
|
||||||
log.Println("Subscribe topics successfully")
|
)
|
||||||
for i := 0; i < 50; i++ {
|
}
|
||||||
ev := consumer.Poll(100)
|
log.Printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s\n", host, groupID, clientID)
|
||||||
if ev != nil {
|
|
||||||
switch e := ev.(type) {
|
// ANCHOR_END: create_consumer
|
||||||
case *tmqcommon.DataMessage:
|
// ANCHOR: subscribe
|
||||||
// process your data here
|
topic = "topic_meters"
|
||||||
fmt.Printf("data:%v\n", e)
|
err = consumer.Subscribe(topic, nil)
|
||||||
// ANCHOR: commit_offset
|
if err != nil {
|
||||||
// commit offset
|
log.Fatalf(
|
||||||
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
|
"Failed to subscribe topic_meters, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
|
||||||
if err != nil {
|
topic,
|
||||||
log.Fatalln("Failed to commit offset, ErrMessage: " + err.Error())
|
groupID,
|
||||||
}
|
clientID,
|
||||||
log.Println("Commit offset manually successfully.")
|
err.Error(),
|
||||||
// ANCHOR_END: commit_offset
|
)
|
||||||
case tmqcommon.Error:
|
}
|
||||||
fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
|
log.Println("Subscribe topics successfully")
|
||||||
log.Fatalln("Failed to poll data, ErrMessage: " + err.Error())
|
for i := 0; i < 50; i++ {
|
||||||
}
|
ev := consumer.Poll(100)
|
||||||
}
|
if ev != nil {
|
||||||
}
|
switch e := ev.(type) {
|
||||||
// ANCHOR_END: subscribe
|
case *tmqcommon.DataMessage:
|
||||||
// ANCHOR: seek
|
// process your data here
|
||||||
// get assignment
|
fmt.Printf("data:%v\n", e)
|
||||||
partitions, err := consumer.Assignment()
|
// ANCHOR: commit_offset
|
||||||
if err != nil {
|
// commit offset
|
||||||
log.Fatal("Failed to get assignment, ErrMessage: " + err.Error())
|
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
|
||||||
}
|
if err != nil {
|
||||||
fmt.Println("Now assignment:", partitions)
|
log.Fatalf(
|
||||||
for i := 0; i < len(partitions); i++ {
|
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
|
||||||
// seek to the beginning
|
topic,
|
||||||
err = consumer.Seek(tmqcommon.TopicPartition{
|
groupID,
|
||||||
Topic: partitions[i].Topic,
|
clientID,
|
||||||
Partition: partitions[i].Partition,
|
e.TopicPartition,
|
||||||
Offset: 0,
|
err.Error(),
|
||||||
}, 0)
|
)
|
||||||
if err != nil {
|
}
|
||||||
log.Fatalln("Failed to execute seek example, ErrMessage: " + err.Error())
|
log.Println("Commit offset manually successfully.")
|
||||||
}
|
// ANCHOR_END: commit_offset
|
||||||
}
|
case tmqcommon.Error:
|
||||||
fmt.Println("Assignment seek to beginning successfully")
|
log.Fatalf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", topic, groupID, clientID, e.Error())
|
||||||
// ANCHOR_END: seek
|
}
|
||||||
// ANCHOR: close
|
}
|
||||||
// unsubscribe
|
}
|
||||||
err = consumer.Unsubscribe()
|
// ANCHOR_END: subscribe
|
||||||
if err != nil {
|
// ANCHOR: seek
|
||||||
log.Fatal("Failed to unsubscribe consumer, ErrMessage: " + err.Error())
|
// get assignment
|
||||||
}
|
partitions, err := consumer.Assignment()
|
||||||
fmt.Println("Consumer unsubscribed successfully.")
|
if err != nil {
|
||||||
// close consumer
|
log.Fatalf("Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", topic, groupID, clientID, err.Error())
|
||||||
err = consumer.Close()
|
}
|
||||||
if err != nil {
|
fmt.Println("Now assignment:", partitions)
|
||||||
log.Fatal("Failed to close consumer, ErrMessage: " + err.Error())
|
for i := 0; i < len(partitions); i++ {
|
||||||
}
|
// seek to the beginning
|
||||||
fmt.Println("Consumer closed successfully.")
|
err = consumer.Seek(tmqcommon.TopicPartition{
|
||||||
// ANCHOR_END: close
|
Topic: partitions[i].Topic,
|
||||||
<-done
|
Partition: partitions[i].Partition,
|
||||||
}
|
Offset: 0,
|
||||||
|
}, 0)
|
||||||
func initEnv(conn *sql.DB) {
|
if err != nil {
|
||||||
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
|
log.Fatalf(
|
||||||
if err != nil {
|
"Failed to execute seek example, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n",
|
||||||
log.Fatal("Failed to create database, ErrMessage: " + err.Error())
|
topic,
|
||||||
}
|
groupID,
|
||||||
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
|
clientID,
|
||||||
if err != nil {
|
partitions[i].Partition,
|
||||||
log.Fatal("Failed to create stable, ErrMessage: " + err.Error())
|
0,
|
||||||
}
|
err.Error(),
|
||||||
_, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters")
|
)
|
||||||
if err != nil {
|
}
|
||||||
log.Fatal("Failed to create topic, ErrMessage: " + err.Error())
|
}
|
||||||
}
|
fmt.Println("Assignment seek to beginning successfully")
|
||||||
go func() {
|
// ANCHOR_END: seek
|
||||||
for i := 0; i < 10; i++ {
|
// ANCHOR: close
|
||||||
time.Sleep(time.Second)
|
// unsubscribe
|
||||||
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
|
err = consumer.Unsubscribe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Failed to insert data, ErrMessage: " + err.Error())
|
log.Fatalf(
|
||||||
}
|
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
|
||||||
}
|
topic,
|
||||||
done <- struct{}{}
|
groupID,
|
||||||
}()
|
clientID,
|
||||||
}
|
err.Error(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
fmt.Println("Consumer unsubscribed successfully.")
|
||||||
|
// close consumer
|
||||||
|
err = consumer.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf(
|
||||||
|
"Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
|
||||||
|
topic,
|
||||||
|
groupID,
|
||||||
|
clientID,
|
||||||
|
err.Error(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
fmt.Println("Consumer closed successfully.")
|
||||||
|
// ANCHOR_END: close
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
|
func initEnv(conn *sql.DB) {
|
||||||
|
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to create database, ErrMessage: " + err.Error())
|
||||||
|
}
|
||||||
|
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to create stable, ErrMessage: " + err.Error())
|
||||||
|
}
|
||||||
|
_, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to create topic, ErrMessage: " + err.Error())
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to insert data, ErrMessage: " + err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
done <- struct{}{}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
|
@ -1,141 +1,197 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/taosdata/driver-go/v3/common"
|
"github.com/taosdata/driver-go/v3/common"
|
||||||
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
|
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
|
||||||
_ "github.com/taosdata/driver-go/v3/taosWS"
|
_ "github.com/taosdata/driver-go/v3/taosWS"
|
||||||
"github.com/taosdata/driver-go/v3/ws/tmq"
|
"github.com/taosdata/driver-go/v3/ws/tmq"
|
||||||
)
|
)
|
||||||
|
|
||||||
var done = make(chan struct{})
|
var done = make(chan struct{})
|
||||||
|
var groupID string
|
||||||
func main() {
|
var clientID string
|
||||||
// init env
|
var host string
|
||||||
taosDSN := "root:taosdata@ws(127.0.0.1:6041)/"
|
var topic string
|
||||||
conn, err := sql.Open("taosWS", taosDSN)
|
|
||||||
if err != nil {
|
func main() {
|
||||||
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error())
|
// init env
|
||||||
}
|
taosDSN := "root:taosdata@ws(127.0.0.1:6041)/"
|
||||||
defer func() {
|
conn, err := sql.Open("taosWS", taosDSN)
|
||||||
conn.Close()
|
if err != nil {
|
||||||
}()
|
log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error())
|
||||||
initEnv(conn)
|
}
|
||||||
// ANCHOR: create_consumer
|
defer func() {
|
||||||
// create consumer
|
conn.Close()
|
||||||
wsUrl := "ws://127.0.0.1:6041"
|
}()
|
||||||
groupID := "group1"
|
initEnv(conn)
|
||||||
clientID := "client1"
|
// ANCHOR: create_consumer
|
||||||
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
|
// create consumer
|
||||||
"ws.url": wsUrl,
|
wsUrl := "ws://127.0.0.1:6041"
|
||||||
"ws.message.channelLen": uint(0),
|
groupID = "group1"
|
||||||
"ws.message.timeout": common.DefaultMessageTimeout,
|
clientID = "client1"
|
||||||
"ws.message.writeWait": common.DefaultWriteWait,
|
host = "127.0.0.1"
|
||||||
"td.connect.user": "root",
|
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
|
||||||
"td.connect.pass": "taosdata",
|
"ws.url": wsUrl,
|
||||||
"auto.offset.reset": "latest",
|
"ws.message.channelLen": uint(0),
|
||||||
"msg.with.table.name": "true",
|
"ws.message.timeout": common.DefaultMessageTimeout,
|
||||||
"enable.auto.commit": "true",
|
"ws.message.writeWait": common.DefaultWriteWait,
|
||||||
"auto.commit.interval.ms": "1000",
|
"td.connect.user": "root",
|
||||||
"group.id": groupID,
|
"td.connect.pass": "taosdata",
|
||||||
"client.id": clientID,
|
"auto.offset.reset": "latest",
|
||||||
})
|
"msg.with.table.name": "true",
|
||||||
if err != nil {
|
"enable.auto.commit": "true",
|
||||||
log.Fatalln("Failed to create websocket consumer, host : " + wsUrl + "; ErrMessage: " + err.Error())
|
"auto.commit.interval.ms": "1000",
|
||||||
}
|
"group.id": groupID,
|
||||||
log.Println("Create consumer successfully, host: " + wsUrl + ", groupId: " + groupID + ", clientId: " + clientID)
|
"client.id": clientID,
|
||||||
|
})
|
||||||
// ANCHOR_END: create_consumer
|
if err != nil {
|
||||||
// ANCHOR: subscribe
|
log.Fatalf(
|
||||||
err = consumer.Subscribe("topic_meters", nil)
|
"Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
|
||||||
if err != nil {
|
host,
|
||||||
log.Fatalln("Failed to subscribe topic_meters, ErrMessage: " + err.Error())
|
groupID,
|
||||||
}
|
clientID,
|
||||||
log.Println("Subscribe topics successfully")
|
err.Error(),
|
||||||
for i := 0; i < 50; i++ {
|
)
|
||||||
ev := consumer.Poll(100)
|
}
|
||||||
if ev != nil {
|
log.Printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s\n", host, groupID, clientID)
|
||||||
switch e := ev.(type) {
|
|
||||||
case *tmqcommon.DataMessage:
|
// ANCHOR_END: create_consumer
|
||||||
// process your data here
|
// ANCHOR: subscribe
|
||||||
fmt.Printf("data:%v\n", e)
|
topic = "topic_meters"
|
||||||
// ANCHOR: commit_offset
|
err = consumer.Subscribe(topic, nil)
|
||||||
// commit offset
|
if err != nil {
|
||||||
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
|
log.Fatalf(
|
||||||
if err != nil {
|
"Failed to subscribe topic_meters, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
|
||||||
log.Fatalln("Failed to commit offset, ErrMessage: " + err.Error())
|
topic,
|
||||||
}
|
groupID,
|
||||||
log.Println("Commit offset manually successfully.")
|
clientID,
|
||||||
// ANCHOR_END: commit_offset
|
err.Error(),
|
||||||
case tmqcommon.Error:
|
)
|
||||||
fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
|
}
|
||||||
log.Fatalln("Failed to poll data, ErrMessage: " + err.Error())
|
log.Println("Subscribe topics successfully")
|
||||||
}
|
for i := 0; i < 50; i++ {
|
||||||
}
|
ev := consumer.Poll(100)
|
||||||
}
|
if ev != nil {
|
||||||
// ANCHOR_END: subscribe
|
switch e := ev.(type) {
|
||||||
// ANCHOR: seek
|
case *tmqcommon.DataMessage:
|
||||||
// get assignment
|
// process your data here
|
||||||
partitions, err := consumer.Assignment()
|
fmt.Printf("data:%v\n", e)
|
||||||
if err != nil {
|
// ANCHOR: commit_offset
|
||||||
log.Fatal("Failed to get assignment, ErrMessage: " + err.Error())
|
// commit offset
|
||||||
}
|
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
|
||||||
fmt.Println("Now assignment:", partitions)
|
if err != nil {
|
||||||
for i := 0; i < len(partitions); i++ {
|
log.Fatalf(
|
||||||
// seek to the beginning
|
"Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
|
||||||
err = consumer.Seek(tmqcommon.TopicPartition{
|
topic,
|
||||||
Topic: partitions[i].Topic,
|
groupID,
|
||||||
Partition: partitions[i].Partition,
|
clientID,
|
||||||
Offset: 0,
|
e.TopicPartition,
|
||||||
}, 0)
|
err.Error(),
|
||||||
if err != nil {
|
)
|
||||||
log.Fatalln("Failed to execute seek example, ErrMessage: " + err.Error())
|
}
|
||||||
}
|
log.Println("Commit offset manually successfully.")
|
||||||
}
|
// ANCHOR_END: commit_offset
|
||||||
fmt.Println("Assignment seek to beginning successfully")
|
case tmqcommon.Error:
|
||||||
// ANCHOR_END: seek
|
log.Fatalf(
|
||||||
// ANCHOR: close
|
"Failed to poll data, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
|
||||||
// unsubscribe
|
topic,
|
||||||
err = consumer.Unsubscribe()
|
groupID,
|
||||||
if err != nil {
|
clientID,
|
||||||
log.Fatal("Failed to unsubscribe consumer, ErrMessage: " + err.Error())
|
e.Error(),
|
||||||
}
|
)
|
||||||
fmt.Println("Consumer unsubscribed successfully.")
|
}
|
||||||
// close consumer
|
}
|
||||||
err = consumer.Close()
|
}
|
||||||
if err != nil {
|
// ANCHOR_END: subscribe
|
||||||
log.Fatal("Failed to close consumer, ErrMessage: " + err.Error())
|
// ANCHOR: seek
|
||||||
}
|
// get assignment
|
||||||
fmt.Println("Consumer closed successfully.")
|
partitions, err := consumer.Assignment()
|
||||||
// ANCHOR_END: close
|
if err != nil {
|
||||||
<-done
|
log.Fatalf(
|
||||||
}
|
"Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
|
||||||
|
topic,
|
||||||
func initEnv(conn *sql.DB) {
|
groupID,
|
||||||
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
|
clientID,
|
||||||
if err != nil {
|
err.Error(),
|
||||||
log.Fatal("Failed to create database, ErrMessage: " + err.Error())
|
)
|
||||||
}
|
}
|
||||||
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
|
fmt.Println("Now assignment:", partitions)
|
||||||
if err != nil {
|
for i := 0; i < len(partitions); i++ {
|
||||||
log.Fatal("Failed to create stable, ErrMessage: " + err.Error())
|
// seek to the beginning
|
||||||
}
|
err = consumer.Seek(tmqcommon.TopicPartition{
|
||||||
_, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters")
|
Topic: partitions[i].Topic,
|
||||||
if err != nil {
|
Partition: partitions[i].Partition,
|
||||||
log.Fatal("Failed to create topic, ErrMessage: " + err.Error())
|
Offset: 0,
|
||||||
}
|
}, 0)
|
||||||
go func() {
|
if err != nil {
|
||||||
for i := 0; i < 10; i++ {
|
log.Fatalf(
|
||||||
time.Sleep(time.Second)
|
"Failed to execute seek example, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n",
|
||||||
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
|
topic,
|
||||||
if err != nil {
|
groupID,
|
||||||
log.Fatal("Failed to insert data, ErrMessage: " + err.Error())
|
clientID,
|
||||||
}
|
partitions[i].Partition,
|
||||||
}
|
0,
|
||||||
done <- struct{}{}
|
err.Error(),
|
||||||
}()
|
)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
fmt.Println("Assignment seek to beginning successfully")
|
||||||
|
// ANCHOR_END: seek
|
||||||
|
// ANCHOR: close
|
||||||
|
// unsubscribe
|
||||||
|
err = consumer.Unsubscribe()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf(
|
||||||
|
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
|
||||||
|
topic,
|
||||||
|
groupID,
|
||||||
|
clientID,
|
||||||
|
err.Error(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
fmt.Println("Consumer unsubscribed successfully.")
|
||||||
|
// close consumer
|
||||||
|
err = consumer.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf(
|
||||||
|
"Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
|
||||||
|
topic,
|
||||||
|
groupID,
|
||||||
|
clientID,
|
||||||
|
err.Error(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
fmt.Println("Consumer closed successfully.")
|
||||||
|
// ANCHOR_END: close
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
|
func initEnv(conn *sql.DB) {
|
||||||
|
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to create database, ErrMessage: " + err.Error())
|
||||||
|
}
|
||||||
|
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to create stable, ErrMessage: " + err.Error())
|
||||||
|
}
|
||||||
|
_, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to create topic, ErrMessage: " + err.Error())
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Failed to insert data, ErrMessage: " + err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
done <- struct{}{}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue