Merge branch 'docs/sheyj-3.0' of github.com:taosdata/TDengine into docs/sheyj-3.0

This commit is contained in:
Yaming Pei 2024-08-16 17:11:07 +08:00
commit ad0e6ef85e
7 changed files with 744 additions and 495 deletions

View File

@ -6,6 +6,11 @@ namespace TMQExample
{ {
internal class SubscribeDemo internal class SubscribeDemo
{ {
private static string _host = "";
private static string _groupId = "";
private static string _clientId = "";
private static string _topic = "";
public static void Main(string[] args) public static void Main(string[] args)
{ {
try try
@ -64,9 +69,9 @@ namespace TMQExample
{ {
// ANCHOR: create_consumer // ANCHOR: create_consumer
// consumer config // consumer config
var host = "127.0.0.1"; _host = "127.0.0.1";
var groupId = "group1"; _groupId = "group1";
var clientId = "client1"; _clientId = "client1";
var cfg = new Dictionary<string, string>() var cfg = new Dictionary<string, string>()
{ {
{ "td.connect.port", "6030" }, { "td.connect.port", "6030" },
@ -74,9 +79,9 @@ namespace TMQExample
{ "msg.with.table.name", "true" }, { "msg.with.table.name", "true" },
{ "enable.auto.commit", "true" }, { "enable.auto.commit", "true" },
{ "auto.commit.interval.ms", "1000" }, { "auto.commit.interval.ms", "1000" },
{ "group.id", groupId }, { "group.id", _groupId },
{ "client.id", clientId }, { "client.id", _clientId },
{ "td.connect.ip", host }, { "td.connect.ip", _host },
{ "td.connect.user", "root" }, { "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" }, { "td.connect.pass", "taosdata" },
}; };
@ -85,20 +90,33 @@ namespace TMQExample
{ {
// create consumer // create consumer
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build(); consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
Console.WriteLine("Create consumer successfully, host: " + host + ", groupId: " + groupId + Console.WriteLine(
", clientId: " + clientId); $"Create consumer successfully, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}");
} }
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("Failed to create native consumer, host: " + host + ", ErrCode:" + e.Code + Console.WriteLine(
", ErrMessage: " + e.Error); $"Failed to create native consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("Failed to create native consumer, host: " + host + ", ErrMessage: " + e.Message); Console.WriteLine(
$"Failed to create native consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
@ -109,11 +127,12 @@ namespace TMQExample
static void Consume(IConsumer<Dictionary<string, object>> consumer) static void Consume(IConsumer<Dictionary<string, object>> consumer)
{ {
// ANCHOR: subscribe // ANCHOR: subscribe
_topic = "topic_meters";
try try
{ {
// subscribe // subscribe
consumer.Subscribe(new List<string>() { "topic_meters" }); consumer.Subscribe(new List<string>() { _topic });
Console.WriteLine("subscribe topics successfully"); Console.WriteLine("Subscribe topics successfully");
for (int i = 0; i < 50; i++) for (int i = 0; i < 50; i++)
{ {
// consume message with using block to ensure the result is disposed // consume message with using block to ensure the result is disposed
@ -133,13 +152,24 @@ namespace TMQExample
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("Failed to poll data, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("Failed to poll data, ErrMessage: " + e.Message); Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
// ANCHOR_END: subscribe // ANCHOR_END: subscribe
@ -152,24 +182,38 @@ namespace TMQExample
{ {
// get assignment // get assignment
var assignment = consumer.Assignment; var assignment = consumer.Assignment;
Console.WriteLine($"now assignment: {assignment}"); Console.WriteLine($"Now assignment: {assignment}");
// seek to the beginning // seek to the beginning
foreach (var topicPartition in assignment) foreach (var topicPartition in assignment)
{ {
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0)); consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
} }
Console.WriteLine("Assignment seek to beginning successfully"); Console.WriteLine("Assignment seek to beginning successfully");
} }
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("Failed to execute seek example, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("Failed to execute seek example, ErrMessage: " + e.Message); Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
// ANCHOR_END: seek // ANCHOR_END: seek
@ -180,6 +224,7 @@ namespace TMQExample
// ANCHOR: commit_offset // ANCHOR: commit_offset
for (int i = 0; i < 5; i++) for (int i = 0; i < 5; i++)
{ {
TopicPartitionOffset topicPartitionOffset = null;
try try
{ {
// consume message with using block to ensure the result is disposed // consume message with using block to ensure the result is disposed
@ -187,9 +232,10 @@ namespace TMQExample
{ {
if (cr == null) continue; if (cr == null) continue;
// commit offset // commit offset
topicPartitionOffset = cr.TopicPartitionOffset;
consumer.Commit(new List<TopicPartitionOffset> consumer.Commit(new List<TopicPartitionOffset>
{ {
cr.TopicPartitionOffset, topicPartitionOffset,
}); });
Console.WriteLine("Commit offset manually successfully."); Console.WriteLine("Commit offset manually successfully.");
} }
@ -197,13 +243,26 @@ namespace TMQExample
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("Failed to execute commit example, ErrCode:" + e.Code + ", ErrMessage: " + e.Error); Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("Failed to execute commit example, ErrMessage: " + e.Message); Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
} }
@ -221,13 +280,24 @@ namespace TMQExample
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("Failed to unsubscribe consumer, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("Failed to unsubscribe consumer, ErrMessage: " + e.Message); Console.WriteLine(
$"Failed to execute commit example, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
finally finally
@ -239,4 +309,4 @@ namespace TMQExample
// ANCHOR_END: close // ANCHOR_END: close
} }
} }
} }

View File

@ -16,10 +16,10 @@ namespace Examples
var builder = new ConnectionStringBuilder(connectionString); var builder = new ConnectionStringBuilder(connectionString);
using (var client = DbDriver.Open(builder)) using (var client = DbDriver.Open(builder))
{ {
CreateDatabaseAndTable(client,connectionString); CreateDatabaseAndTable(client, connectionString);
InsertData(client,connectionString); InsertData(client, connectionString);
QueryData(client,connectionString); QueryData(client, connectionString);
QueryWithReqId(client,connectionString); QueryWithReqId(client, connectionString);
} }
} }
catch (TDengineError e) catch (TDengineError e)
@ -52,7 +52,8 @@ namespace Examples
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("Failed to create database power or stable meters, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); Console.WriteLine("Failed to create database power or stable meters, ErrCode: " + e.Code +
", ErrMessage: " + e.Error);
throw; throw;
} }
catch (Exception e) catch (Exception e)
@ -64,40 +65,43 @@ namespace Examples
// ANCHOR_END: create_db_and_table // ANCHOR_END: create_db_and_table
} }
private static void InsertData(ITDengineClient client,string connectionString) private static void InsertData(ITDengineClient client, string connectionString)
{ {
// ANCHOR: insert_data // ANCHOR: insert_data
// insert data, please make sure the database and table are created before
var insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";
try try
{ {
// insert data, please make sure the database and table are created before
var insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";
var affectedRows = client.Exec(insertQuery); var affectedRows = client.Exec(insertQuery);
Console.WriteLine("Successfully inserted " + affectedRows + " rows to power.meters."); Console.WriteLine("Successfully inserted " + affectedRows + " rows to power.meters.");
} }
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("Failed to insert data to power.meters, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); Console.WriteLine("Failed to insert data to power.meters, sql: " + insertQuery + ", ErrCode: " +
e.Code + ", ErrMessage: " +
e.Error);
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("Failed to insert data to power.meters, ErrMessage: " + e.Message); Console.WriteLine("Failed to insert data to power.meters, sql: " + insertQuery + ", ErrMessage: " +
e.Message);
throw; throw;
} }
// ANCHOR_END: insert_data // ANCHOR_END: insert_data
} }
private static void QueryData(ITDengineClient client,string connectionString) private static void QueryData(ITDengineClient client, string connectionString)
{ {
// ANCHOR: select_data // ANCHOR: select_data
// query data, make sure the database and table are created before // query data, make sure the database and table are created before
@ -108,6 +112,7 @@ namespace Examples
{ {
while (rows.Read()) while (rows.Read())
{ {
// Add your data processing logic here
var ts = (DateTime)rows.GetValue(0); var ts = (DateTime)rows.GetValue(0);
var current = (float)rows.GetValue(1); var current = (float)rows.GetValue(1);
var location = Encoding.UTF8.GetString((byte[])rows.GetValue(2)); var location = Encoding.UTF8.GetString((byte[])rows.GetValue(2));
@ -119,28 +124,30 @@ namespace Examples
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("Failed to query data from power.meters, sql: " + query + ", ErrCode: " + e.Code + ", ErrMessage: " + e.Error); Console.WriteLine("Failed to query data from power.meters, sql: " + query + ", ErrCode: " + e.Code +
", ErrMessage: " + e.Error);
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("Failed to query data from power.meters, sql: " + query + ", ErrMessage: " + e.Message); Console.WriteLine(
"Failed to query data from power.meters, sql: " + query + ", ErrMessage: " + e.Message);
throw; throw;
} }
// ANCHOR_END: select_data // ANCHOR_END: select_data
} }
private static void QueryWithReqId(ITDengineClient client,string connectionString) private static void QueryWithReqId(ITDengineClient client, string connectionString)
{ {
// ANCHOR: query_id // ANCHOR: query_id
var reqId = (long)3; var reqId = (long)3;
// query data
var query = "SELECT ts, current, location FROM power.meters limit 1";
try try
{ {
// query data
var query = "SELECT ts, current, location FROM power.meters limit 1";
// query with request id 3 // query with request id 3
using (var rows = client.Query(query,reqId)) using (var rows = client.Query(query, reqId))
{ {
while (rows.Read()) while (rows.Read())
{ {
@ -155,16 +162,18 @@ namespace Examples
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("Failed to execute sql with reqId: " + reqId + ", ErrCode: " + e.Code + ", ErrMessage: " + e.Error); Console.WriteLine("Failed to execute sql with reqId: " + reqId + ", sql: " + query + ", ErrCode: " +
e.Code + ", ErrMessage: " + e.Error);
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("Failed to execute sql with reqId: " + reqId + ", ErrMessage: " + e.Message); Console.WriteLine("Failed to execute sql with reqId: " + reqId + ", sql: " + query + ", ErrMessage: " +
e.Message);
throw; throw;
} }
// ANCHOR_END: query_id // ANCHOR_END: query_id
} }
} }
} }

View File

@ -6,6 +6,11 @@ namespace TMQExample
{ {
internal class SubscribeDemo internal class SubscribeDemo
{ {
private static string _host = "";
private static string _groupId = "";
private static string _clientId = "";
private static string _topic = "";
public static void Main(string[] args) public static void Main(string[] args)
{ {
try try
@ -68,9 +73,9 @@ namespace TMQExample
{ {
// ANCHOR: create_consumer // ANCHOR: create_consumer
// consumer config // consumer config
var host = "127.0.0.1"; _host = "127.0.0.1";
var groupId = "group1"; _groupId = "group1";
var clientId = "client1"; _clientId = "client1";
var cfg = new Dictionary<string, string>() var cfg = new Dictionary<string, string>()
{ {
{ "td.connect.type", "WebSocket" }, { "td.connect.type", "WebSocket" },
@ -79,9 +84,9 @@ namespace TMQExample
{ "msg.with.table.name", "true" }, { "msg.with.table.name", "true" },
{ "enable.auto.commit", "true" }, { "enable.auto.commit", "true" },
{ "auto.commit.interval.ms", "1000" }, { "auto.commit.interval.ms", "1000" },
{ "group.id", groupId }, { "group.id", _groupId },
{ "client.id", clientId }, { "client.id", _clientId },
{ "td.connect.ip", host }, { "td.connect.ip", _host },
{ "td.connect.user", "root" }, { "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" }, { "td.connect.pass", "taosdata" },
}; };
@ -90,20 +95,33 @@ namespace TMQExample
{ {
// create consumer // create consumer
consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build(); consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
Console.WriteLine("Create consumer successfully, host: " + host + ", groupId: " + groupId + Console.WriteLine(
", clientId: " + clientId); $"Create consumer successfully, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}");
} }
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("Failed to create websocket consumer, host: " + host + ", ErrCode: " + e.Code + Console.WriteLine(
", ErrMessage: " + e.Error); $"Failed to create native consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("Failed to create websocket consumer, host: " + host + ", ErrMessage: " + e.Message); Console.WriteLine(
$"Failed to create native consumer, " +
$"host: {_host}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
@ -114,10 +132,11 @@ namespace TMQExample
static void Consume(IConsumer<Dictionary<string, object>> consumer) static void Consume(IConsumer<Dictionary<string, object>> consumer)
{ {
// ANCHOR: subscribe // ANCHOR: subscribe
_topic = "topic_meters";
try try
{ {
// subscribe // subscribe
consumer.Subscribe(new List<string>() { "topic_meters" }); consumer.Subscribe(new List<string>() { _topic });
Console.WriteLine("Subscribe topics successfully"); Console.WriteLine("Subscribe topics successfully");
for (int i = 0; i < 50; i++) for (int i = 0; i < 50; i++)
{ {
@ -138,13 +157,23 @@ namespace TMQExample
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("Failed to poll data, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); Console.WriteLine(
$"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("Failed to poll data, ErrMessage: " + e.Message); Console.WriteLine($"Failed to poll data, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
// ANCHOR_END: subscribe // ANCHOR_END: subscribe
@ -163,18 +192,32 @@ namespace TMQExample
{ {
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0)); consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
} }
Console.WriteLine("Assignment seek to beginning successfully"); Console.WriteLine("Assignment seek to beginning successfully");
} }
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("Failed to execute seek example, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("Failed to execute seek example, ErrMessage: " + e.Message); Console.WriteLine(
$"Failed to seek offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: 0, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
// ANCHOR_END: seek // ANCHOR_END: seek
@ -185,6 +228,7 @@ namespace TMQExample
// ANCHOR: commit_offset // ANCHOR: commit_offset
for (int i = 0; i < 5; i++) for (int i = 0; i < 5; i++)
{ {
TopicPartitionOffset topicPartitionOffset = null;
try try
{ {
// consume message with using block to ensure the result is disposed // consume message with using block to ensure the result is disposed
@ -192,9 +236,10 @@ namespace TMQExample
{ {
if (cr == null) continue; if (cr == null) continue;
// commit offset // commit offset
topicPartitionOffset = cr.TopicPartitionOffset;
consumer.Commit(new List<TopicPartitionOffset> consumer.Commit(new List<TopicPartitionOffset>
{ {
cr.TopicPartitionOffset, topicPartitionOffset,
}); });
Console.WriteLine("Commit offset manually successfully."); Console.WriteLine("Commit offset manually successfully.");
} }
@ -202,13 +247,26 @@ namespace TMQExample
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("Failed to execute commit example, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("Failed to execute commit example, ErrMessage: " + e.Message); Console.WriteLine(
$"Failed to commit offset, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"offset: {topicPartitionOffset}, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
} }
@ -226,13 +284,24 @@ namespace TMQExample
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("Failed to unsubscribe consumer, ErrCode :" + e.Code + ", ErrMessage: " + e.Error); Console.WriteLine(
$"Failed to unsubscribe consumer, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrCode: {e.Code}, " +
$"ErrMessage: {e.Error}");
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("Failed to unsubscribe consumer, ErrMessage: " + e.Message); Console.WriteLine(
$"Failed to execute commit example, " +
$"topic: {_topic}, " +
$"groupId: {_groupId}, " +
$"clientId: {_clientId}, " +
$"ErrMessage: {e.Message}");
throw; throw;
} }
finally finally
@ -244,4 +313,4 @@ namespace TMQExample
// ANCHOR_END: close // ANCHOR_END: close
} }
} }
} }

View File

@ -1,58 +1,60 @@
package main package main
import ( import (
"context" "context"
"database/sql" "database/sql"
"fmt" "fmt"
"log" "log"
"time" "time"
_ "github.com/taosdata/driver-go/v3/taosSql" _ "github.com/taosdata/driver-go/v3/taosSql"
) )
func main() { func main() {
taosDSN := "root:taosdata@tcp(localhost:6030)/" taosDSN := "root:taosdata@tcp(localhost:6030)/"
db, err := sql.Open("taosSql", taosDSN) db, err := sql.Open("taosSql", taosDSN)
if err != nil { if err != nil {
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error()) log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error())
} }
defer db.Close() defer db.Close()
initEnv(db) initEnv(db)
// ANCHOR: query_id // ANCHOR: query_id
// use context to set request id // use context to set request id
reqId := int64(3) reqId := int64(3)
ctx := context.WithValue(context.Background(), "taos_req_id", reqId) ctx := context.WithValue(context.Background(), "taos_req_id", reqId)
// execute query with context // execute query with context
rows, err := db.QueryContext(ctx, "SELECT ts, current, location FROM power.meters limit 1") querySql := "SELECT ts, current, location FROM power.meters limit 1"
if err != nil { rows, err := db.QueryContext(ctx, querySql)
log.Fatalf("Failed to execute sql with reqId: %d, url: %s; ErrMessage: %s\n", reqId, taosDSN, err.Error()) if err != nil {
} log.Fatalf("Failed to execute sql with reqId: %d, url: %s, sql: %s, ErrMessage: %s\n", reqId, taosDSN, querySql, err.Error())
for rows.Next() { }
var ( for rows.Next() {
ts time.Time // Add your data processing logic here
current float32 var (
location string ts time.Time
) current float32
err = rows.Scan(&ts, &current, &location) location string
if err != nil { )
log.Fatal("Scan error: ", err) err = rows.Scan(&ts, &current, &location)
} if err != nil {
fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location) log.Fatalf("Failed to scan data, reqId: %d, url:%s, sql: %s, ErrMessage: %s\n", reqId, taosDSN, querySql, err)
} }
// ANCHOR_END: query_id fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location)
} }
// ANCHOR_END: query_id
func initEnv(conn *sql.DB) { }
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil { func initEnv(conn *sql.DB) {
log.Fatal("Create database power error: ", err) _, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
} if err != nil {
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") log.Fatal("Create database power error: ", err)
if err != nil { }
log.Fatal("Create stable meters error: ", err) _, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
} if err != nil {
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)") log.Fatal("Create stable meters error: ", err)
if err != nil { }
log.Fatal("Insert data to power.meters error: ", err) _, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
} if err != nil {
} log.Fatal("Insert data to power.meters error: ", err)
}
}

View File

@ -1,86 +1,86 @@
package main package main
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"log" "log"
"time" "time"
_ "github.com/taosdata/driver-go/v3/taosSql" _ "github.com/taosdata/driver-go/v3/taosSql"
) )
func main() { func main() {
var taosDSN = "root:taosdata@tcp(localhost:6030)/" var taosDSN = "root:taosdata@tcp(localhost:6030)/"
db, err := sql.Open("taosSql", taosDSN) db, err := sql.Open("taosSql", taosDSN)
if err != nil { if err != nil {
log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error()) log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error())
} }
defer db.Close() defer db.Close()
// ANCHOR: create_db_and_table // ANCHOR: create_db_and_table
// create database // create database
res, err := db.Exec("CREATE DATABASE IF NOT EXISTS power") res, err := db.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil { if err != nil {
log.Fatalln("Failed to create database power, ErrMessage: " + err.Error()) log.Fatalln("Failed to create database power, ErrMessage: " + err.Error())
} }
rowsAffected, err := res.RowsAffected() rowsAffected, err := res.RowsAffected()
if err != nil { if err != nil {
log.Fatalln("Failed to get create database rowsAffected, ErrMessage: " + err.Error()) log.Fatalln("Failed to get create database rowsAffected, ErrMessage: " + err.Error())
} }
// you can check rowsAffected here // you can check rowsAffected here
fmt.Println("Create database power successfully, rowsAffected: ", rowsAffected) fmt.Println("Create database power successfully, rowsAffected: ", rowsAffected)
// create table // create table
res, err = db.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") res, err = db.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil { if err != nil {
log.Fatalln("Failed to create stable meters, ErrMessage: " + err.Error()) log.Fatalln("Failed to create stable meters, ErrMessage: " + err.Error())
} }
rowsAffected, err = res.RowsAffected() rowsAffected, err = res.RowsAffected()
if err != nil { if err != nil {
log.Fatalln("Failed to get create stable rowsAffected, ErrMessage: " + err.Error()) log.Fatalln("Failed to get create stable rowsAffected, ErrMessage: " + err.Error())
} }
// you can check rowsAffected here // you can check rowsAffected here
fmt.Println("Create stable power.meters successfully, rowsAffected:", rowsAffected) fmt.Println("Create stable power.meters successfully, rowsAffected:", rowsAffected)
// ANCHOR_END: create_db_and_table // ANCHOR_END: create_db_and_table
// ANCHOR: insert_data // ANCHOR: insert_data
// insert data, please make sure the database and table are created before // insert data, please make sure the database and table are created before
insertQuery := "INSERT INTO " + insertQuery := "INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " + "VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " + "(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " + "(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " + "(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " + "VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) " "(NOW + 1a, 10.30000, 218, 0.25000) "
res, err = db.Exec(insertQuery) res, err = db.Exec(insertQuery)
if err != nil { if err != nil {
log.Fatal("Failed to insert data to power.meters, ErrMessage: " + err.Error()) log.Fatalf("Failed to insert data to power.meters, sql: %s, ErrMessage: %s\n", insertQuery, err.Error())
} }
rowsAffected, err = res.RowsAffected() rowsAffected, err = res.RowsAffected()
if err != nil { if err != nil {
log.Fatal("Failed to get insert rowsAffected, ErrMessage: " + err.Error()) log.Fatalf("Failed to get insert rowsAffected, sql: %s, ErrMessage: %s\n", insertQuery, err.Error())
} }
// you can check affectedRows here // you can check affectedRows here
fmt.Printf("Successfully inserted %d rows to power.meters.\n", rowsAffected) fmt.Printf("Successfully inserted %d rows to power.meters.\n", rowsAffected)
// ANCHOR_END: insert_data // ANCHOR_END: insert_data
// ANCHOR: select_data // ANCHOR: select_data
// query data, make sure the database and table are created before // query data, make sure the database and table are created before
sql := "SELECT ts, current, location FROM power.meters limit 100" sql := "SELECT ts, current, location FROM power.meters limit 100"
rows, err := db.Query(sql) rows, err := db.Query(sql)
if err != nil { if err != nil {
log.Fatal("Failed to query data from power.meters, ErrMessage: " + err.Error()) log.Fatalf("Failed to query data from power.meters, sql: %s, ErrMessage: %s\n", sql, err.Error())
} }
for rows.Next() { for rows.Next() {
var ( // Add your data processing logic here
ts time.Time var (
current float32 ts time.Time
location string current float32
) location string
err = rows.Scan(&ts, &current, &location) )
if err != nil { err = rows.Scan(&ts, &current, &location)
log.Fatal("Failed to scan data, sql:" + sql + ", ErrMessage: " + err.Error()) if err != nil {
} log.Fatalf("Failed to scan data, sql: %s, ErrMessage: %s\n", sql, err)
// you can check data here }
fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location) fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location)
} }
// ANCHOR_END: select_data // ANCHOR_END: select_data
} }

View File

@ -1,136 +1,179 @@
package main package main
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"log" "log"
"time" "time"
"github.com/taosdata/driver-go/v3/af/tmq" "github.com/taosdata/driver-go/v3/af/tmq"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq" tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
_ "github.com/taosdata/driver-go/v3/taosSql" _ "github.com/taosdata/driver-go/v3/taosSql"
) )
var done = make(chan struct{}) var done = make(chan struct{})
var groupID string
func main() { var clientID string
// init env var host string
taosDSN := "root:taosdata@tcp(127.0.0.1:6030)/" var topic string
conn, err := sql.Open("taosSql", taosDSN)
if err != nil { func main() {
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error()) // init env
} taosDSN := "root:taosdata@tcp(127.0.0.1:6030)/"
defer func() { conn, err := sql.Open("taosSql", taosDSN)
conn.Close() if err != nil {
}() log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error())
initEnv(conn) }
// ANCHOR: create_consumer defer func() {
// create consumer conn.Close()
groupID := "group1" }()
clientID := "client1" initEnv(conn)
host := "127.0.0.1" // ANCHOR: create_consumer
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ // create consumer
"td.connect.user": "root", groupID = "group1"
"td.connect.pass": "taosdata", clientID = "client1"
"auto.offset.reset": "latest", host = "127.0.0.1"
"msg.with.table.name": "true", consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"enable.auto.commit": "true", "td.connect.user": "root",
"auto.commit.interval.ms": "1000", "td.connect.pass": "taosdata",
"group.id": groupID, "auto.offset.reset": "latest",
"client.id": clientID, "msg.with.table.name": "true",
}) "enable.auto.commit": "true",
if err != nil { "auto.commit.interval.ms": "1000",
log.Fatalln("Failed to create native consumer, host : " + host + "; ErrMessage: " + err.Error()) "group.id": groupID,
} "client.id": clientID,
log.Println("Create consumer successfully, host: " + host + ", groupId: " + groupID + ", clientId: " + clientID) })
if err != nil {
// ANCHOR_END: create_consumer log.Fatalf(
// ANCHOR: subscribe "Failed to create native consumer, host: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
err = consumer.Subscribe("topic_meters", nil) host,
if err != nil { groupID,
log.Fatalln("Failed to subscribe topic_meters, ErrMessage: " + err.Error()) clientID,
} err.Error(),
log.Println("Subscribe topics successfully") )
for i := 0; i < 50; i++ { }
ev := consumer.Poll(100) log.Printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s\n", host, groupID, clientID)
if ev != nil {
switch e := ev.(type) { // ANCHOR_END: create_consumer
case *tmqcommon.DataMessage: // ANCHOR: subscribe
// process your data here topic = "topic_meters"
fmt.Printf("data:%v\n", e) err = consumer.Subscribe(topic, nil)
// ANCHOR: commit_offset if err != nil {
// commit offset log.Fatalf(
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition}) "Failed to subscribe topic_meters, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
if err != nil { topic,
log.Fatalln("Failed to commit offset, ErrMessage: " + err.Error()) groupID,
} clientID,
log.Println("Commit offset manually successfully.") err.Error(),
// ANCHOR_END: commit_offset )
case tmqcommon.Error: }
fmt.Printf("%% Error: %v: %v\n", e.Code(), e) log.Println("Subscribe topics successfully")
log.Fatalln("Failed to poll data, ErrMessage: " + err.Error()) for i := 0; i < 50; i++ {
} ev := consumer.Poll(100)
} if ev != nil {
} switch e := ev.(type) {
// ANCHOR_END: subscribe case *tmqcommon.DataMessage:
// ANCHOR: seek // process your data here
// get assignment fmt.Printf("data:%v\n", e)
partitions, err := consumer.Assignment() // ANCHOR: commit_offset
if err != nil { // commit offset
log.Fatal("Failed to get assignment, ErrMessage: " + err.Error()) _, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
} if err != nil {
fmt.Println("Now assignment:", partitions) log.Fatalf(
for i := 0; i < len(partitions); i++ { "Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
// seek to the beginning topic,
err = consumer.Seek(tmqcommon.TopicPartition{ groupID,
Topic: partitions[i].Topic, clientID,
Partition: partitions[i].Partition, e.TopicPartition,
Offset: 0, err.Error(),
}, 0) )
if err != nil { }
log.Fatalln("Failed to execute seek example, ErrMessage: " + err.Error()) log.Println("Commit offset manually successfully.")
} // ANCHOR_END: commit_offset
} case tmqcommon.Error:
fmt.Println("Assignment seek to beginning successfully") log.Fatalf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", topic, groupID, clientID, e.Error())
// ANCHOR_END: seek }
// ANCHOR: close }
// unsubscribe }
err = consumer.Unsubscribe() // ANCHOR_END: subscribe
if err != nil { // ANCHOR: seek
log.Fatal("Failed to unsubscribe consumer, ErrMessage: " + err.Error()) // get assignment
} partitions, err := consumer.Assignment()
fmt.Println("Consumer unsubscribed successfully.") if err != nil {
// close consumer log.Fatalf("Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", topic, groupID, clientID, err.Error())
err = consumer.Close() }
if err != nil { fmt.Println("Now assignment:", partitions)
log.Fatal("Failed to close consumer, ErrMessage: " + err.Error()) for i := 0; i < len(partitions); i++ {
} // seek to the beginning
fmt.Println("Consumer closed successfully.") err = consumer.Seek(tmqcommon.TopicPartition{
// ANCHOR_END: close Topic: partitions[i].Topic,
<-done Partition: partitions[i].Partition,
} Offset: 0,
}, 0)
func initEnv(conn *sql.DB) { if err != nil {
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power") log.Fatalf(
if err != nil { "Failed to execute seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n",
log.Fatal("Failed to create database, ErrMessage: " + err.Error()) topic,
} groupID,
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") clientID,
if err != nil { partitions[i].Partition,
log.Fatal("Failed to create stable, ErrMessage: " + err.Error()) 0,
} err.Error(),
_, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters") )
if err != nil { }
log.Fatal("Failed to create topic, ErrMessage: " + err.Error()) }
} fmt.Println("Assignment seek to beginning successfully")
go func() { // ANCHOR_END: seek
for i := 0; i < 10; i++ { // ANCHOR: close
time.Sleep(time.Second) // unsubscribe
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)") err = consumer.Unsubscribe()
if err != nil { if err != nil {
log.Fatal("Failed to insert data, ErrMessage: " + err.Error()) log.Fatalf(
} "Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
} topic,
done <- struct{}{} groupID,
}() clientID,
} err.Error(),
)
}
fmt.Println("Consumer unsubscribed successfully.")
// close consumer
err = consumer.Close()
if err != nil {
log.Fatalf(
"Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer closed successfully.")
// ANCHOR_END: close
<-done
}
func initEnv(conn *sql.DB) {
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatal("Failed to create database, ErrMessage: " + err.Error())
}
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
log.Fatal("Failed to create stable, ErrMessage: " + err.Error())
}
_, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters")
if err != nil {
log.Fatal("Failed to create topic, ErrMessage: " + err.Error())
}
go func() {
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
if err != nil {
log.Fatal("Failed to insert data, ErrMessage: " + err.Error())
}
}
done <- struct{}{}
}()
}

View File

@ -1,141 +1,197 @@
package main package main
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"log" "log"
"time" "time"
"github.com/taosdata/driver-go/v3/common" "github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq" tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
_ "github.com/taosdata/driver-go/v3/taosWS" _ "github.com/taosdata/driver-go/v3/taosWS"
"github.com/taosdata/driver-go/v3/ws/tmq" "github.com/taosdata/driver-go/v3/ws/tmq"
) )
var done = make(chan struct{}) var done = make(chan struct{})
var groupID string
func main() { var clientID string
// init env var host string
taosDSN := "root:taosdata@ws(127.0.0.1:6041)/" var topic string
conn, err := sql.Open("taosWS", taosDSN)
if err != nil { func main() {
log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error()) // init env
} taosDSN := "root:taosdata@ws(127.0.0.1:6041)/"
defer func() { conn, err := sql.Open("taosWS", taosDSN)
conn.Close() if err != nil {
}() log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error())
initEnv(conn) }
// ANCHOR: create_consumer defer func() {
// create consumer conn.Close()
wsUrl := "ws://127.0.0.1:6041" }()
groupID := "group1" initEnv(conn)
clientID := "client1" // ANCHOR: create_consumer
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ // create consumer
"ws.url": wsUrl, wsUrl := "ws://127.0.0.1:6041"
"ws.message.channelLen": uint(0), groupID = "group1"
"ws.message.timeout": common.DefaultMessageTimeout, clientID = "client1"
"ws.message.writeWait": common.DefaultWriteWait, host = "127.0.0.1"
"td.connect.user": "root", consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"td.connect.pass": "taosdata", "ws.url": wsUrl,
"auto.offset.reset": "latest", "ws.message.channelLen": uint(0),
"msg.with.table.name": "true", "ws.message.timeout": common.DefaultMessageTimeout,
"enable.auto.commit": "true", "ws.message.writeWait": common.DefaultWriteWait,
"auto.commit.interval.ms": "1000", "td.connect.user": "root",
"group.id": groupID, "td.connect.pass": "taosdata",
"client.id": clientID, "auto.offset.reset": "latest",
}) "msg.with.table.name": "true",
if err != nil { "enable.auto.commit": "true",
log.Fatalln("Failed to create websocket consumer, host : " + wsUrl + "; ErrMessage: " + err.Error()) "auto.commit.interval.ms": "1000",
} "group.id": groupID,
log.Println("Create consumer successfully, host: " + wsUrl + ", groupId: " + groupID + ", clientId: " + clientID) "client.id": clientID,
})
// ANCHOR_END: create_consumer if err != nil {
// ANCHOR: subscribe log.Fatalf(
err = consumer.Subscribe("topic_meters", nil) "Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
if err != nil { host,
log.Fatalln("Failed to subscribe topic_meters, ErrMessage: " + err.Error()) groupID,
} clientID,
log.Println("Subscribe topics successfully") err.Error(),
for i := 0; i < 50; i++ { )
ev := consumer.Poll(100) }
if ev != nil { log.Printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s\n", host, groupID, clientID)
switch e := ev.(type) {
case *tmqcommon.DataMessage: // ANCHOR_END: create_consumer
// process your data here // ANCHOR: subscribe
fmt.Printf("data:%v\n", e) topic = "topic_meters"
// ANCHOR: commit_offset err = consumer.Subscribe(topic, nil)
// commit offset if err != nil {
_, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition}) log.Fatalf(
if err != nil { "Failed to subscribe topic_meters, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
log.Fatalln("Failed to commit offset, ErrMessage: " + err.Error()) topic,
} groupID,
log.Println("Commit offset manually successfully.") clientID,
// ANCHOR_END: commit_offset err.Error(),
case tmqcommon.Error: )
fmt.Printf("%% Error: %v: %v\n", e.Code(), e) }
log.Fatalln("Failed to poll data, ErrMessage: " + err.Error()) log.Println("Subscribe topics successfully")
} for i := 0; i < 50; i++ {
} ev := consumer.Poll(100)
} if ev != nil {
// ANCHOR_END: subscribe switch e := ev.(type) {
// ANCHOR: seek case *tmqcommon.DataMessage:
// get assignment // process your data here
partitions, err := consumer.Assignment() fmt.Printf("data:%v\n", e)
if err != nil { // ANCHOR: commit_offset
log.Fatal("Failed to get assignment, ErrMessage: " + err.Error()) // commit offset
} _, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition})
fmt.Println("Now assignment:", partitions) if err != nil {
for i := 0; i < len(partitions); i++ { log.Fatalf(
// seek to the beginning "Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n",
err = consumer.Seek(tmqcommon.TopicPartition{ topic,
Topic: partitions[i].Topic, groupID,
Partition: partitions[i].Partition, clientID,
Offset: 0, e.TopicPartition,
}, 0) err.Error(),
if err != nil { )
log.Fatalln("Failed to execute seek example, ErrMessage: " + err.Error()) }
} log.Println("Commit offset manually successfully.")
} // ANCHOR_END: commit_offset
fmt.Println("Assignment seek to beginning successfully") case tmqcommon.Error:
// ANCHOR_END: seek log.Fatalf(
// ANCHOR: close "Failed to poll data, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
// unsubscribe topic,
err = consumer.Unsubscribe() groupID,
if err != nil { clientID,
log.Fatal("Failed to unsubscribe consumer, ErrMessage: " + err.Error()) e.Error(),
} )
fmt.Println("Consumer unsubscribed successfully.") }
// close consumer }
err = consumer.Close() }
if err != nil { // ANCHOR_END: subscribe
log.Fatal("Failed to close consumer, ErrMessage: " + err.Error()) // ANCHOR: seek
} // get assignment
fmt.Println("Consumer closed successfully.") partitions, err := consumer.Assignment()
// ANCHOR_END: close if err != nil {
<-done log.Fatalf(
} "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
func initEnv(conn *sql.DB) { groupID,
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power") clientID,
if err != nil { err.Error(),
log.Fatal("Failed to create database, ErrMessage: " + err.Error()) )
} }
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") fmt.Println("Now assignment:", partitions)
if err != nil { for i := 0; i < len(partitions); i++ {
log.Fatal("Failed to create stable, ErrMessage: " + err.Error()) // seek to the beginning
} err = consumer.Seek(tmqcommon.TopicPartition{
_, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters") Topic: partitions[i].Topic,
if err != nil { Partition: partitions[i].Partition,
log.Fatal("Failed to create topic, ErrMessage: " + err.Error()) Offset: 0,
} }, 0)
go func() { if err != nil {
for i := 0; i < 10; i++ { log.Fatalf(
time.Sleep(time.Second) "Failed to seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n",
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)") topic,
if err != nil { groupID,
log.Fatal("Failed to insert data, ErrMessage: " + err.Error()) clientID,
} partitions[i].Partition,
} 0,
done <- struct{}{} err.Error(),
}() )
} }
}
fmt.Println("Assignment seek to beginning successfully")
// ANCHOR_END: seek
// ANCHOR: close
// unsubscribe
err = consumer.Unsubscribe()
if err != nil {
log.Fatalf(
"Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer unsubscribed successfully.")
// close consumer
err = consumer.Close()
if err != nil {
log.Fatalf(
"Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n",
topic,
groupID,
clientID,
err.Error(),
)
}
fmt.Println("Consumer closed successfully.")
// ANCHOR_END: close
<-done
}
func initEnv(conn *sql.DB) {
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil {
log.Fatal("Failed to create database, ErrMessage: " + err.Error())
}
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil {
log.Fatal("Failed to create stable, ErrMessage: " + err.Error())
}
_, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters")
if err != nil {
log.Fatal("Failed to create topic, ErrMessage: " + err.Error())
}
go func() {
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
if err != nil {
log.Fatal("Failed to insert data, ErrMessage: " + err.Error())
}
}
done <- struct{}{}
}()
}