diff --git a/docs/examples/csharp/subscribe/Program.cs b/docs/examples/csharp/subscribe/Program.cs index 4138194800..50988d0c5d 100644 --- a/docs/examples/csharp/subscribe/Program.cs +++ b/docs/examples/csharp/subscribe/Program.cs @@ -6,6 +6,11 @@ namespace TMQExample { internal class SubscribeDemo { + private static string _host = ""; + private static string _groupId = ""; + private static string _clientId = ""; + private static string _topic = ""; + public static void Main(string[] args) { try @@ -64,9 +69,9 @@ namespace TMQExample { // ANCHOR: create_consumer // consumer config - var host = "127.0.0.1"; - var groupId = "group1"; - var clientId = "client1"; + _host = "127.0.0.1"; + _groupId = "group1"; + _clientId = "client1"; var cfg = new Dictionary() { { "td.connect.port", "6030" }, @@ -74,9 +79,9 @@ namespace TMQExample { "msg.with.table.name", "true" }, { "enable.auto.commit", "true" }, { "auto.commit.interval.ms", "1000" }, - { "group.id", groupId }, - { "client.id", clientId }, - { "td.connect.ip", host }, + { "group.id", _groupId }, + { "client.id", _clientId }, + { "td.connect.ip", _host }, { "td.connect.user", "root" }, { "td.connect.pass", "taosdata" }, }; @@ -85,20 +90,33 @@ namespace TMQExample { // create consumer consumer = new ConsumerBuilder>(cfg).Build(); - Console.WriteLine("Create consumer successfully, host: " + host + ", groupId: " + groupId + - ", clientId: " + clientId); + Console.WriteLine( + $"Create consumer successfully, " + + $"host: {_host}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}"); } catch (TDengineError e) { // handle TDengine error - Console.WriteLine("Failed to create native consumer, host: " + host + ", ErrCode:" + e.Code + - ", ErrMessage: " + e.Error); + Console.WriteLine( + $"Failed to create native consumer, " + + $"host: {_host}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrCode: {e.Code}, " + + $"ErrMessage: {e.Error}"); throw; } catch (Exception e) { // handle other exceptions - Console.WriteLine("Failed to create native consumer, host: " + host + ", ErrMessage: " + e.Message); + Console.WriteLine( + $"Failed to create native consumer, " + + $"host: {_host}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrMessage: {e.Message}"); throw; } @@ -109,11 +127,12 @@ namespace TMQExample static void Consume(IConsumer> consumer) { // ANCHOR: subscribe + _topic = "topic_meters"; try { // subscribe - consumer.Subscribe(new List() { "topic_meters" }); - Console.WriteLine("subscribe topics successfully"); + consumer.Subscribe(new List() { _topic }); + Console.WriteLine("Subscribe topics successfully"); for (int i = 0; i < 50; i++) { // consume message with using block to ensure the result is disposed @@ -133,13 +152,24 @@ namespace TMQExample catch (TDengineError e) { // handle TDengine error - Console.WriteLine("Failed to poll data, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); + Console.WriteLine( + $"Failed to poll data, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrCode: {e.Code}, " + + $"ErrMessage: {e.Error}"); throw; } catch (Exception e) { // handle other exceptions - Console.WriteLine("Failed to poll data, ErrMessage: " + e.Message); + Console.WriteLine( + $"Failed to poll data, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrMessage: {e.Message}"); throw; } // ANCHOR_END: subscribe @@ -152,24 +182,38 @@ namespace TMQExample { // get assignment var assignment = consumer.Assignment; - Console.WriteLine($"now assignment: {assignment}"); + Console.WriteLine($"Now assignment: {assignment}"); // seek to the beginning foreach (var topicPartition in assignment) { consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0)); } + Console.WriteLine("Assignment seek to beginning successfully"); } catch (TDengineError e) { // handle TDengine error - Console.WriteLine("Failed to execute seek example, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); + Console.WriteLine( + $"Failed to seek offset, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"offset: 0, " + + $"ErrCode: {e.Code}, " + + $"ErrMessage: {e.Error}"); throw; } catch (Exception e) { // handle other exceptions - Console.WriteLine("Failed to execute seek example, ErrMessage: " + e.Message); + Console.WriteLine( + $"Failed to seek offset, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"offset: 0, " + + $"ErrMessage: {e.Message}"); throw; } // ANCHOR_END: seek @@ -180,6 +224,7 @@ namespace TMQExample // ANCHOR: commit_offset for (int i = 0; i < 5; i++) { + TopicPartitionOffset topicPartitionOffset = null; try { // consume message with using block to ensure the result is disposed @@ -187,9 +232,10 @@ namespace TMQExample { if (cr == null) continue; // commit offset + topicPartitionOffset = cr.TopicPartitionOffset; consumer.Commit(new List { - cr.TopicPartitionOffset, + topicPartitionOffset, }); Console.WriteLine("Commit offset manually successfully."); } @@ -197,13 +243,26 @@ namespace TMQExample catch (TDengineError e) { // handle TDengine error - Console.WriteLine("Failed to execute commit example, ErrCode:" + e.Code + ", ErrMessage: " + e.Error); + Console.WriteLine( + $"Failed to commit offset, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"offset: {topicPartitionOffset}, " + + $"ErrCode: {e.Code}, " + + $"ErrMessage: {e.Error}"); throw; } catch (Exception e) { // handle other exceptions - Console.WriteLine("Failed to execute commit example, ErrMessage: " + e.Message); + Console.WriteLine( + $"Failed to commit offset, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"offset: {topicPartitionOffset}, " + + $"ErrMessage: {e.Message}"); throw; } } @@ -221,13 +280,24 @@ namespace TMQExample catch (TDengineError e) { // handle TDengine error - Console.WriteLine("Failed to unsubscribe consumer, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); + Console.WriteLine( + $"Failed to unsubscribe consumer, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrCode: {e.Code}, " + + $"ErrMessage: {e.Error}"); throw; } catch (Exception e) { // handle other exceptions - Console.WriteLine("Failed to unsubscribe consumer, ErrMessage: " + e.Message); + Console.WriteLine( + $"Failed to execute commit example, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrMessage: {e.Message}"); throw; } finally @@ -239,4 +309,4 @@ namespace TMQExample // ANCHOR_END: close } } -} +} \ No newline at end of file diff --git a/docs/examples/csharp/wsInsert/Program.cs b/docs/examples/csharp/wsInsert/Program.cs index 36b884a522..9bc47d97f1 100644 --- a/docs/examples/csharp/wsInsert/Program.cs +++ b/docs/examples/csharp/wsInsert/Program.cs @@ -16,10 +16,10 @@ namespace Examples var builder = new ConnectionStringBuilder(connectionString); using (var client = DbDriver.Open(builder)) { - CreateDatabaseAndTable(client,connectionString); - InsertData(client,connectionString); - QueryData(client,connectionString); - QueryWithReqId(client,connectionString); + CreateDatabaseAndTable(client, connectionString); + InsertData(client, connectionString); + QueryData(client, connectionString); + QueryWithReqId(client, connectionString); } } catch (TDengineError e) @@ -52,7 +52,8 @@ namespace Examples catch (TDengineError e) { // handle TDengine error - Console.WriteLine("Failed to create database power or stable meters, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); + Console.WriteLine("Failed to create database power or stable meters, ErrCode: " + e.Code + + ", ErrMessage: " + e.Error); throw; } catch (Exception e) @@ -64,40 +65,43 @@ namespace Examples // ANCHOR_END: create_db_and_table } - private static void InsertData(ITDengineClient client,string connectionString) + private static void InsertData(ITDengineClient client, string connectionString) { // ANCHOR: insert_data + // insert data, please make sure the database and table are created before + var insertQuery = "INSERT INTO " + + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 219, 0.31000) " + + "(NOW + 2a, 12.60000, 218, 0.33000) " + + "(NOW + 3a, 12.30000, 221, 0.31000) " + + "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 218, 0.25000) "; try { - // insert data, please make sure the database and table are created before - var insertQuery = "INSERT INTO " + - "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 219, 0.31000) " + - "(NOW + 2a, 12.60000, 218, 0.33000) " + - "(NOW + 3a, 12.30000, 221, 0.31000) " + - "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 218, 0.25000) "; var affectedRows = client.Exec(insertQuery); Console.WriteLine("Successfully inserted " + affectedRows + " rows to power.meters."); } catch (TDengineError e) { // handle TDengine error - Console.WriteLine("Failed to insert data to power.meters, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); + Console.WriteLine("Failed to insert data to power.meters, sql: " + insertQuery + ", ErrCode: " + + e.Code + ", ErrMessage: " + + e.Error); throw; } catch (Exception e) { // handle other exceptions - Console.WriteLine("Failed to insert data to power.meters, ErrMessage: " + e.Message); + Console.WriteLine("Failed to insert data to power.meters, sql: " + insertQuery + ", ErrMessage: " + + e.Message); throw; } // ANCHOR_END: insert_data } - private static void QueryData(ITDengineClient client,string connectionString) + private static void QueryData(ITDengineClient client, string connectionString) { // ANCHOR: select_data // query data, make sure the database and table are created before @@ -108,6 +112,7 @@ namespace Examples { while (rows.Read()) { + // Add your data processing logic here var ts = (DateTime)rows.GetValue(0); var current = (float)rows.GetValue(1); var location = Encoding.UTF8.GetString((byte[])rows.GetValue(2)); @@ -119,28 +124,30 @@ namespace Examples catch (TDengineError e) { // handle TDengine error - Console.WriteLine("Failed to query data from power.meters, sql: " + query + ", ErrCode: " + e.Code + ", ErrMessage: " + e.Error); + Console.WriteLine("Failed to query data from power.meters, sql: " + query + ", ErrCode: " + e.Code + + ", ErrMessage: " + e.Error); throw; } catch (Exception e) { // handle other exceptions - Console.WriteLine("Failed to query data from power.meters, sql: " + query + ", ErrMessage: " + e.Message); + Console.WriteLine( + "Failed to query data from power.meters, sql: " + query + ", ErrMessage: " + e.Message); throw; } // ANCHOR_END: select_data } - private static void QueryWithReqId(ITDengineClient client,string connectionString) + private static void QueryWithReqId(ITDengineClient client, string connectionString) { // ANCHOR: query_id var reqId = (long)3; + // query data + var query = "SELECT ts, current, location FROM power.meters limit 1"; try { - // query data - var query = "SELECT ts, current, location FROM power.meters limit 1"; // query with request id 3 - using (var rows = client.Query(query,reqId)) + using (var rows = client.Query(query, reqId)) { while (rows.Read()) { @@ -155,16 +162,18 @@ namespace Examples catch (TDengineError e) { // handle TDengine error - Console.WriteLine("Failed to execute sql with reqId: " + reqId + ", ErrCode: " + e.Code + ", ErrMessage: " + e.Error); + Console.WriteLine("Failed to execute sql with reqId: " + reqId + ", sql: " + query + ", ErrCode: " + + e.Code + ", ErrMessage: " + e.Error); throw; } catch (Exception e) { // handle other exceptions - Console.WriteLine("Failed to execute sql with reqId: " + reqId + ", ErrMessage: " + e.Message); + Console.WriteLine("Failed to execute sql with reqId: " + reqId + ", sql: " + query + ", ErrMessage: " + + e.Message); throw; } // ANCHOR_END: query_id } } -} +} \ No newline at end of file diff --git a/docs/examples/csharp/wssubscribe/Program.cs b/docs/examples/csharp/wssubscribe/Program.cs index 21abe10847..939189cabd 100644 --- a/docs/examples/csharp/wssubscribe/Program.cs +++ b/docs/examples/csharp/wssubscribe/Program.cs @@ -6,6 +6,11 @@ namespace TMQExample { internal class SubscribeDemo { + private static string _host = ""; + private static string _groupId = ""; + private static string _clientId = ""; + private static string _topic = ""; + public static void Main(string[] args) { try @@ -68,9 +73,9 @@ namespace TMQExample { // ANCHOR: create_consumer // consumer config - var host = "127.0.0.1"; - var groupId = "group1"; - var clientId = "client1"; + _host = "127.0.0.1"; + _groupId = "group1"; + _clientId = "client1"; var cfg = new Dictionary() { { "td.connect.type", "WebSocket" }, @@ -79,9 +84,9 @@ namespace TMQExample { "msg.with.table.name", "true" }, { "enable.auto.commit", "true" }, { "auto.commit.interval.ms", "1000" }, - { "group.id", groupId }, - { "client.id", clientId }, - { "td.connect.ip", host }, + { "group.id", _groupId }, + { "client.id", _clientId }, + { "td.connect.ip", _host }, { "td.connect.user", "root" }, { "td.connect.pass", "taosdata" }, }; @@ -90,20 +95,33 @@ namespace TMQExample { // create consumer consumer = new ConsumerBuilder>(cfg).Build(); - Console.WriteLine("Create consumer successfully, host: " + host + ", groupId: " + groupId + - ", clientId: " + clientId); + Console.WriteLine( + $"Create consumer successfully, " + + $"host: {_host}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}"); } catch (TDengineError e) { // handle TDengine error - Console.WriteLine("Failed to create websocket consumer, host: " + host + ", ErrCode: " + e.Code + - ", ErrMessage: " + e.Error); + Console.WriteLine( + $"Failed to create native consumer, " + + $"host: {_host}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrCode: {e.Code}, " + + $"ErrMessage: {e.Error}"); throw; } catch (Exception e) { // handle other exceptions - Console.WriteLine("Failed to create websocket consumer, host: " + host + ", ErrMessage: " + e.Message); + Console.WriteLine( + $"Failed to create native consumer, " + + $"host: {_host}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrMessage: {e.Message}"); throw; } @@ -114,10 +132,11 @@ namespace TMQExample static void Consume(IConsumer> consumer) { // ANCHOR: subscribe + _topic = "topic_meters"; try { // subscribe - consumer.Subscribe(new List() { "topic_meters" }); + consumer.Subscribe(new List() { _topic }); Console.WriteLine("Subscribe topics successfully"); for (int i = 0; i < 50; i++) { @@ -138,13 +157,23 @@ namespace TMQExample catch (TDengineError e) { // handle TDengine error - Console.WriteLine("Failed to poll data, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); + Console.WriteLine( + $"Failed to poll data, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrCode: {e.Code}, " + + $"ErrMessage: {e.Error}"); throw; } catch (Exception e) { // handle other exceptions - Console.WriteLine("Failed to poll data, ErrMessage: " + e.Message); + Console.WriteLine($"Failed to poll data, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrMessage: {e.Message}"); throw; } // ANCHOR_END: subscribe @@ -163,18 +192,32 @@ namespace TMQExample { consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0)); } + Console.WriteLine("Assignment seek to beginning successfully"); } catch (TDengineError e) { // handle TDengine error - Console.WriteLine("Failed to execute seek example, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); + Console.WriteLine( + $"Failed to seek offset, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"offset: 0, " + + $"ErrCode: {e.Code}, " + + $"ErrMessage: {e.Error}"); throw; } catch (Exception e) { // handle other exceptions - Console.WriteLine("Failed to execute seek example, ErrMessage: " + e.Message); + Console.WriteLine( + $"Failed to seek offset, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"offset: 0, " + + $"ErrMessage: {e.Message}"); throw; } // ANCHOR_END: seek @@ -185,6 +228,7 @@ namespace TMQExample // ANCHOR: commit_offset for (int i = 0; i < 5; i++) { + TopicPartitionOffset topicPartitionOffset = null; try { // consume message with using block to ensure the result is disposed @@ -192,9 +236,10 @@ namespace TMQExample { if (cr == null) continue; // commit offset + topicPartitionOffset = cr.TopicPartitionOffset; consumer.Commit(new List { - cr.TopicPartitionOffset, + topicPartitionOffset, }); Console.WriteLine("Commit offset manually successfully."); } @@ -202,13 +247,26 @@ namespace TMQExample catch (TDengineError e) { // handle TDengine error - Console.WriteLine("Failed to execute commit example, ErrCode: " + e.Code + ", ErrMessage: " + e.Error); + Console.WriteLine( + $"Failed to commit offset, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"offset: {topicPartitionOffset}, " + + $"ErrCode: {e.Code}, " + + $"ErrMessage: {e.Error}"); throw; } catch (Exception e) { // handle other exceptions - Console.WriteLine("Failed to execute commit example, ErrMessage: " + e.Message); + Console.WriteLine( + $"Failed to commit offset, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"offset: {topicPartitionOffset}, " + + $"ErrMessage: {e.Message}"); throw; } } @@ -226,13 +284,24 @@ namespace TMQExample catch (TDengineError e) { // handle TDengine error - Console.WriteLine("Failed to unsubscribe consumer, ErrCode :" + e.Code + ", ErrMessage: " + e.Error); + Console.WriteLine( + $"Failed to unsubscribe consumer, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrCode: {e.Code}, " + + $"ErrMessage: {e.Error}"); throw; } catch (Exception e) { // handle other exceptions - Console.WriteLine("Failed to unsubscribe consumer, ErrMessage: " + e.Message); + Console.WriteLine( + $"Failed to execute commit example, " + + $"topic: {_topic}, " + + $"groupId: {_groupId}, " + + $"clientId: {_clientId}, " + + $"ErrMessage: {e.Message}"); throw; } finally @@ -244,4 +313,4 @@ namespace TMQExample // ANCHOR_END: close } } -} +} \ No newline at end of file diff --git a/docs/examples/go/queryreqid/main.go b/docs/examples/go/queryreqid/main.go index 39d1d6bd5e..0763feceff 100644 --- a/docs/examples/go/queryreqid/main.go +++ b/docs/examples/go/queryreqid/main.go @@ -1,58 +1,60 @@ -package main - -import ( - "context" - "database/sql" - "fmt" - "log" - "time" - - _ "github.com/taosdata/driver-go/v3/taosSql" -) - -func main() { - taosDSN := "root:taosdata@tcp(localhost:6030)/" - db, err := sql.Open("taosSql", taosDSN) - if err != nil { - log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error()) - } - defer db.Close() - initEnv(db) - // ANCHOR: query_id - // use context to set request id - reqId := int64(3) - ctx := context.WithValue(context.Background(), "taos_req_id", reqId) - // execute query with context - rows, err := db.QueryContext(ctx, "SELECT ts, current, location FROM power.meters limit 1") - if err != nil { - log.Fatalf("Failed to execute sql with reqId: %d, url: %s; ErrMessage: %s\n", reqId, taosDSN, err.Error()) - } - for rows.Next() { - var ( - ts time.Time - current float32 - location string - ) - err = rows.Scan(&ts, ¤t, &location) - if err != nil { - log.Fatal("Scan error: ", err) - } - fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location) - } - // ANCHOR_END: query_id -} - -func initEnv(conn *sql.DB) { - _, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power") - if err != nil { - log.Fatal("Create database power error: ", err) - } - _, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") - if err != nil { - log.Fatal("Create stable meters error: ", err) - } - _, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)") - if err != nil { - log.Fatal("Insert data to power.meters error: ", err) - } -} +package main + +import ( + "context" + "database/sql" + "fmt" + "log" + "time" + + _ "github.com/taosdata/driver-go/v3/taosSql" +) + +func main() { + taosDSN := "root:taosdata@tcp(localhost:6030)/" + db, err := sql.Open("taosSql", taosDSN) + if err != nil { + log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error()) + } + defer db.Close() + initEnv(db) + // ANCHOR: query_id + // use context to set request id + reqId := int64(3) + ctx := context.WithValue(context.Background(), "taos_req_id", reqId) + // execute query with context + querySql := "SELECT ts, current, location FROM power.meters limit 1" + rows, err := db.QueryContext(ctx, querySql) + if err != nil { + log.Fatalf("Failed to execute sql with reqId: %d, url: %s, sql: %s, ErrMessage: %s\n", reqId, taosDSN, querySql, err.Error()) + } + for rows.Next() { + // Add your data processing logic here + var ( + ts time.Time + current float32 + location string + ) + err = rows.Scan(&ts, ¤t, &location) + if err != nil { + log.Fatalf("Failed to scan data, reqId: %d, url:%s, sql: %s, ErrMessage: %s\n", reqId, taosDSN, querySql, err) + } + fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location) + } + // ANCHOR_END: query_id +} + +func initEnv(conn *sql.DB) { + _, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power") + if err != nil { + log.Fatal("Create database power error: ", err) + } + _, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") + if err != nil { + log.Fatal("Create stable meters error: ", err) + } + _, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)") + if err != nil { + log.Fatal("Insert data to power.meters error: ", err) + } +} diff --git a/docs/examples/go/sqlquery/main.go b/docs/examples/go/sqlquery/main.go index f0e0f1c97e..1bfb74ca87 100644 --- a/docs/examples/go/sqlquery/main.go +++ b/docs/examples/go/sqlquery/main.go @@ -1,86 +1,86 @@ -package main - -import ( - "database/sql" - "fmt" - "log" - "time" - - _ "github.com/taosdata/driver-go/v3/taosSql" -) - -func main() { - var taosDSN = "root:taosdata@tcp(localhost:6030)/" - db, err := sql.Open("taosSql", taosDSN) - if err != nil { - log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error()) - } - defer db.Close() - // ANCHOR: create_db_and_table - // create database - res, err := db.Exec("CREATE DATABASE IF NOT EXISTS power") - if err != nil { - log.Fatalln("Failed to create database power, ErrMessage: " + err.Error()) - } - rowsAffected, err := res.RowsAffected() - if err != nil { - log.Fatalln("Failed to get create database rowsAffected, ErrMessage: " + err.Error()) - } - // you can check rowsAffected here - fmt.Println("Create database power successfully, rowsAffected: ", rowsAffected) - // create table - res, err = db.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") - if err != nil { - log.Fatalln("Failed to create stable meters, ErrMessage: " + err.Error()) - } - rowsAffected, err = res.RowsAffected() - if err != nil { - log.Fatalln("Failed to get create stable rowsAffected, ErrMessage: " + err.Error()) - } - // you can check rowsAffected here - fmt.Println("Create stable power.meters successfully, rowsAffected:", rowsAffected) - // ANCHOR_END: create_db_and_table - // ANCHOR: insert_data - // insert data, please make sure the database and table are created before - insertQuery := "INSERT INTO " + - "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 219, 0.31000) " + - "(NOW + 2a, 12.60000, 218, 0.33000) " + - "(NOW + 3a, 12.30000, 221, 0.31000) " + - "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + - "VALUES " + - "(NOW + 1a, 10.30000, 218, 0.25000) " - res, err = db.Exec(insertQuery) - if err != nil { - log.Fatal("Failed to insert data to power.meters, ErrMessage: " + err.Error()) - } - rowsAffected, err = res.RowsAffected() - if err != nil { - log.Fatal("Failed to get insert rowsAffected, ErrMessage: " + err.Error()) - } - // you can check affectedRows here - fmt.Printf("Successfully inserted %d rows to power.meters.\n", rowsAffected) - // ANCHOR_END: insert_data - // ANCHOR: select_data - // query data, make sure the database and table are created before - sql := "SELECT ts, current, location FROM power.meters limit 100" - rows, err := db.Query(sql) - if err != nil { - log.Fatal("Failed to query data from power.meters, ErrMessage: " + err.Error()) - } - for rows.Next() { - var ( - ts time.Time - current float32 - location string - ) - err = rows.Scan(&ts, ¤t, &location) - if err != nil { - log.Fatal("Failed to scan data, sql:" + sql + ", ErrMessage: " + err.Error()) - } - // you can check data here - fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location) - } - // ANCHOR_END: select_data -} +package main + +import ( + "database/sql" + "fmt" + "log" + "time" + + _ "github.com/taosdata/driver-go/v3/taosSql" +) + +func main() { + var taosDSN = "root:taosdata@tcp(localhost:6030)/" + db, err := sql.Open("taosSql", taosDSN) + if err != nil { + log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error()) + } + defer db.Close() + // ANCHOR: create_db_and_table + // create database + res, err := db.Exec("CREATE DATABASE IF NOT EXISTS power") + if err != nil { + log.Fatalln("Failed to create database power, ErrMessage: " + err.Error()) + } + rowsAffected, err := res.RowsAffected() + if err != nil { + log.Fatalln("Failed to get create database rowsAffected, ErrMessage: " + err.Error()) + } + // you can check rowsAffected here + fmt.Println("Create database power successfully, rowsAffected: ", rowsAffected) + // create table + res, err = db.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") + if err != nil { + log.Fatalln("Failed to create stable meters, ErrMessage: " + err.Error()) + } + rowsAffected, err = res.RowsAffected() + if err != nil { + log.Fatalln("Failed to get create stable rowsAffected, ErrMessage: " + err.Error()) + } + // you can check rowsAffected here + fmt.Println("Create stable power.meters successfully, rowsAffected:", rowsAffected) + // ANCHOR_END: create_db_and_table + // ANCHOR: insert_data + // insert data, please make sure the database and table are created before + insertQuery := "INSERT INTO " + + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 219, 0.31000) " + + "(NOW + 2a, 12.60000, 218, 0.33000) " + + "(NOW + 3a, 12.30000, 221, 0.31000) " + + "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 218, 0.25000) " + res, err = db.Exec(insertQuery) + if err != nil { + log.Fatalf("Failed to insert data to power.meters, sql: %s, ErrMessage: %s\n", insertQuery, err.Error()) + } + rowsAffected, err = res.RowsAffected() + if err != nil { + log.Fatalf("Failed to get insert rowsAffected, sql: %s, ErrMessage: %s\n", insertQuery, err.Error()) + } + // you can check affectedRows here + fmt.Printf("Successfully inserted %d rows to power.meters.\n", rowsAffected) + // ANCHOR_END: insert_data + // ANCHOR: select_data + // query data, make sure the database and table are created before + sql := "SELECT ts, current, location FROM power.meters limit 100" + rows, err := db.Query(sql) + if err != nil { + log.Fatalf("Failed to query data from power.meters, sql: %s, ErrMessage: %s\n", sql, err.Error()) + } + for rows.Next() { + // Add your data processing logic here + var ( + ts time.Time + current float32 + location string + ) + err = rows.Scan(&ts, ¤t, &location) + if err != nil { + log.Fatalf("Failed to scan data, sql: %s, ErrMessage: %s\n", sql, err) + } + fmt.Printf("ts: %s, current: %f, location: %s\n", ts, current, location) + } + // ANCHOR_END: select_data +} diff --git a/docs/examples/go/tmq/native/main.go b/docs/examples/go/tmq/native/main.go index 638a07d235..8d667abc18 100644 --- a/docs/examples/go/tmq/native/main.go +++ b/docs/examples/go/tmq/native/main.go @@ -1,136 +1,179 @@ -package main - -import ( - "database/sql" - "fmt" - "log" - "time" - - "github.com/taosdata/driver-go/v3/af/tmq" - tmqcommon "github.com/taosdata/driver-go/v3/common/tmq" - _ "github.com/taosdata/driver-go/v3/taosSql" -) - -var done = make(chan struct{}) - -func main() { - // init env - taosDSN := "root:taosdata@tcp(127.0.0.1:6030)/" - conn, err := sql.Open("taosSql", taosDSN) - if err != nil { - log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error()) - } - defer func() { - conn.Close() - }() - initEnv(conn) - // ANCHOR: create_consumer - // create consumer - groupID := "group1" - clientID := "client1" - host := "127.0.0.1" - consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "auto.offset.reset": "latest", - "msg.with.table.name": "true", - "enable.auto.commit": "true", - "auto.commit.interval.ms": "1000", - "group.id": groupID, - "client.id": clientID, - }) - if err != nil { - log.Fatalln("Failed to create native consumer, host : " + host + "; ErrMessage: " + err.Error()) - } - log.Println("Create consumer successfully, host: " + host + ", groupId: " + groupID + ", clientId: " + clientID) - - // ANCHOR_END: create_consumer - // ANCHOR: subscribe - err = consumer.Subscribe("topic_meters", nil) - if err != nil { - log.Fatalln("Failed to subscribe topic_meters, ErrMessage: " + err.Error()) - } - log.Println("Subscribe topics successfully") - for i := 0; i < 50; i++ { - ev := consumer.Poll(100) - if ev != nil { - switch e := ev.(type) { - case *tmqcommon.DataMessage: - // process your data here - fmt.Printf("data:%v\n", e) - // ANCHOR: commit_offset - // commit offset - _, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition}) - if err != nil { - log.Fatalln("Failed to commit offset, ErrMessage: " + err.Error()) - } - log.Println("Commit offset manually successfully.") - // ANCHOR_END: commit_offset - case tmqcommon.Error: - fmt.Printf("%% Error: %v: %v\n", e.Code(), e) - log.Fatalln("Failed to poll data, ErrMessage: " + err.Error()) - } - } - } - // ANCHOR_END: subscribe - // ANCHOR: seek - // get assignment - partitions, err := consumer.Assignment() - if err != nil { - log.Fatal("Failed to get assignment, ErrMessage: " + err.Error()) - } - fmt.Println("Now assignment:", partitions) - for i := 0; i < len(partitions); i++ { - // seek to the beginning - err = consumer.Seek(tmqcommon.TopicPartition{ - Topic: partitions[i].Topic, - Partition: partitions[i].Partition, - Offset: 0, - }, 0) - if err != nil { - log.Fatalln("Failed to execute seek example, ErrMessage: " + err.Error()) - } - } - fmt.Println("Assignment seek to beginning successfully") - // ANCHOR_END: seek - // ANCHOR: close - // unsubscribe - err = consumer.Unsubscribe() - if err != nil { - log.Fatal("Failed to unsubscribe consumer, ErrMessage: " + err.Error()) - } - fmt.Println("Consumer unsubscribed successfully.") - // close consumer - err = consumer.Close() - if err != nil { - log.Fatal("Failed to close consumer, ErrMessage: " + err.Error()) - } - fmt.Println("Consumer closed successfully.") - // ANCHOR_END: close - <-done -} - -func initEnv(conn *sql.DB) { - _, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power") - if err != nil { - log.Fatal("Failed to create database, ErrMessage: " + err.Error()) - } - _, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") - if err != nil { - log.Fatal("Failed to create stable, ErrMessage: " + err.Error()) - } - _, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters") - if err != nil { - log.Fatal("Failed to create topic, ErrMessage: " + err.Error()) - } - go func() { - for i := 0; i < 10; i++ { - time.Sleep(time.Second) - _, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)") - if err != nil { - log.Fatal("Failed to insert data, ErrMessage: " + err.Error()) - } - } - done <- struct{}{} - }() -} +package main + +import ( + "database/sql" + "fmt" + "log" + "time" + + "github.com/taosdata/driver-go/v3/af/tmq" + tmqcommon "github.com/taosdata/driver-go/v3/common/tmq" + _ "github.com/taosdata/driver-go/v3/taosSql" +) + +var done = make(chan struct{}) +var groupID string +var clientID string +var host string +var topic string + +func main() { + // init env + taosDSN := "root:taosdata@tcp(127.0.0.1:6030)/" + conn, err := sql.Open("taosSql", taosDSN) + if err != nil { + log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error()) + } + defer func() { + conn.Close() + }() + initEnv(conn) + // ANCHOR: create_consumer + // create consumer + groupID = "group1" + clientID = "client1" + host = "127.0.0.1" + consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "latest", + "msg.with.table.name": "true", + "enable.auto.commit": "true", + "auto.commit.interval.ms": "1000", + "group.id": groupID, + "client.id": clientID, + }) + if err != nil { + log.Fatalf( + "Failed to create native consumer, host: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", + host, + groupID, + clientID, + err.Error(), + ) + } + log.Printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s\n", host, groupID, clientID) + + // ANCHOR_END: create_consumer + // ANCHOR: subscribe + topic = "topic_meters" + err = consumer.Subscribe(topic, nil) + if err != nil { + log.Fatalf( + "Failed to subscribe topic_meters, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", + topic, + groupID, + clientID, + err.Error(), + ) + } + log.Println("Subscribe topics successfully") + for i := 0; i < 50; i++ { + ev := consumer.Poll(100) + if ev != nil { + switch e := ev.(type) { + case *tmqcommon.DataMessage: + // process your data here + fmt.Printf("data:%v\n", e) + // ANCHOR: commit_offset + // commit offset + _, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition}) + if err != nil { + log.Fatalf( + "Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n", + topic, + groupID, + clientID, + e.TopicPartition, + err.Error(), + ) + } + log.Println("Commit offset manually successfully.") + // ANCHOR_END: commit_offset + case tmqcommon.Error: + log.Fatalf("Failed to poll data, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", topic, groupID, clientID, e.Error()) + } + } + } + // ANCHOR_END: subscribe + // ANCHOR: seek + // get assignment + partitions, err := consumer.Assignment() + if err != nil { + log.Fatalf("Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", topic, groupID, clientID, err.Error()) + } + fmt.Println("Now assignment:", partitions) + for i := 0; i < len(partitions); i++ { + // seek to the beginning + err = consumer.Seek(tmqcommon.TopicPartition{ + Topic: partitions[i].Topic, + Partition: partitions[i].Partition, + Offset: 0, + }, 0) + if err != nil { + log.Fatalf( + "Failed to execute seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n", + topic, + groupID, + clientID, + partitions[i].Partition, + 0, + err.Error(), + ) + } + } + fmt.Println("Assignment seek to beginning successfully") + // ANCHOR_END: seek + // ANCHOR: close + // unsubscribe + err = consumer.Unsubscribe() + if err != nil { + log.Fatalf( + "Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", + topic, + groupID, + clientID, + err.Error(), + ) + } + fmt.Println("Consumer unsubscribed successfully.") + // close consumer + err = consumer.Close() + if err != nil { + log.Fatalf( + "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", + topic, + groupID, + clientID, + err.Error(), + ) + } + fmt.Println("Consumer closed successfully.") + // ANCHOR_END: close + <-done +} + +func initEnv(conn *sql.DB) { + _, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power") + if err != nil { + log.Fatal("Failed to create database, ErrMessage: " + err.Error()) + } + _, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") + if err != nil { + log.Fatal("Failed to create stable, ErrMessage: " + err.Error()) + } + _, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters") + if err != nil { + log.Fatal("Failed to create topic, ErrMessage: " + err.Error()) + } + go func() { + for i := 0; i < 10; i++ { + time.Sleep(time.Second) + _, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)") + if err != nil { + log.Fatal("Failed to insert data, ErrMessage: " + err.Error()) + } + } + done <- struct{}{} + }() +} diff --git a/docs/examples/go/tmq/ws/main.go b/docs/examples/go/tmq/ws/main.go index 70ea3af0b3..9ea4d72b39 100644 --- a/docs/examples/go/tmq/ws/main.go +++ b/docs/examples/go/tmq/ws/main.go @@ -1,141 +1,197 @@ -package main - -import ( - "database/sql" - "fmt" - "log" - "time" - - "github.com/taosdata/driver-go/v3/common" - tmqcommon "github.com/taosdata/driver-go/v3/common/tmq" - _ "github.com/taosdata/driver-go/v3/taosWS" - "github.com/taosdata/driver-go/v3/ws/tmq" -) - -var done = make(chan struct{}) - -func main() { - // init env - taosDSN := "root:taosdata@ws(127.0.0.1:6041)/" - conn, err := sql.Open("taosWS", taosDSN) - if err != nil { - log.Fatalln("Failed to connect to " + taosDSN + "; ErrMessage: " + err.Error()) - } - defer func() { - conn.Close() - }() - initEnv(conn) - // ANCHOR: create_consumer - // create consumer - wsUrl := "ws://127.0.0.1:6041" - groupID := "group1" - clientID := "client1" - consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ - "ws.url": wsUrl, - "ws.message.channelLen": uint(0), - "ws.message.timeout": common.DefaultMessageTimeout, - "ws.message.writeWait": common.DefaultWriteWait, - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "auto.offset.reset": "latest", - "msg.with.table.name": "true", - "enable.auto.commit": "true", - "auto.commit.interval.ms": "1000", - "group.id": groupID, - "client.id": clientID, - }) - if err != nil { - log.Fatalln("Failed to create websocket consumer, host : " + wsUrl + "; ErrMessage: " + err.Error()) - } - log.Println("Create consumer successfully, host: " + wsUrl + ", groupId: " + groupID + ", clientId: " + clientID) - - // ANCHOR_END: create_consumer - // ANCHOR: subscribe - err = consumer.Subscribe("topic_meters", nil) - if err != nil { - log.Fatalln("Failed to subscribe topic_meters, ErrMessage: " + err.Error()) - } - log.Println("Subscribe topics successfully") - for i := 0; i < 50; i++ { - ev := consumer.Poll(100) - if ev != nil { - switch e := ev.(type) { - case *tmqcommon.DataMessage: - // process your data here - fmt.Printf("data:%v\n", e) - // ANCHOR: commit_offset - // commit offset - _, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition}) - if err != nil { - log.Fatalln("Failed to commit offset, ErrMessage: " + err.Error()) - } - log.Println("Commit offset manually successfully.") - // ANCHOR_END: commit_offset - case tmqcommon.Error: - fmt.Printf("%% Error: %v: %v\n", e.Code(), e) - log.Fatalln("Failed to poll data, ErrMessage: " + err.Error()) - } - } - } - // ANCHOR_END: subscribe - // ANCHOR: seek - // get assignment - partitions, err := consumer.Assignment() - if err != nil { - log.Fatal("Failed to get assignment, ErrMessage: " + err.Error()) - } - fmt.Println("Now assignment:", partitions) - for i := 0; i < len(partitions); i++ { - // seek to the beginning - err = consumer.Seek(tmqcommon.TopicPartition{ - Topic: partitions[i].Topic, - Partition: partitions[i].Partition, - Offset: 0, - }, 0) - if err != nil { - log.Fatalln("Failed to execute seek example, ErrMessage: " + err.Error()) - } - } - fmt.Println("Assignment seek to beginning successfully") - // ANCHOR_END: seek - // ANCHOR: close - // unsubscribe - err = consumer.Unsubscribe() - if err != nil { - log.Fatal("Failed to unsubscribe consumer, ErrMessage: " + err.Error()) - } - fmt.Println("Consumer unsubscribed successfully.") - // close consumer - err = consumer.Close() - if err != nil { - log.Fatal("Failed to close consumer, ErrMessage: " + err.Error()) - } - fmt.Println("Consumer closed successfully.") - // ANCHOR_END: close - <-done -} - -func initEnv(conn *sql.DB) { - _, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power") - if err != nil { - log.Fatal("Failed to create database, ErrMessage: " + err.Error()) - } - _, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") - if err != nil { - log.Fatal("Failed to create stable, ErrMessage: " + err.Error()) - } - _, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters") - if err != nil { - log.Fatal("Failed to create topic, ErrMessage: " + err.Error()) - } - go func() { - for i := 0; i < 10; i++ { - time.Sleep(time.Second) - _, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)") - if err != nil { - log.Fatal("Failed to insert data, ErrMessage: " + err.Error()) - } - } - done <- struct{}{} - }() -} +package main + +import ( + "database/sql" + "fmt" + "log" + "time" + + "github.com/taosdata/driver-go/v3/common" + tmqcommon "github.com/taosdata/driver-go/v3/common/tmq" + _ "github.com/taosdata/driver-go/v3/taosWS" + "github.com/taosdata/driver-go/v3/ws/tmq" +) + +var done = make(chan struct{}) +var groupID string +var clientID string +var host string +var topic string + +func main() { + // init env + taosDSN := "root:taosdata@ws(127.0.0.1:6041)/" + conn, err := sql.Open("taosWS", taosDSN) + if err != nil { + log.Fatalln("Failed to connect to " + taosDSN + ", ErrMessage: " + err.Error()) + } + defer func() { + conn.Close() + }() + initEnv(conn) + // ANCHOR: create_consumer + // create consumer + wsUrl := "ws://127.0.0.1:6041" + groupID = "group1" + clientID = "client1" + host = "127.0.0.1" + consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ + "ws.url": wsUrl, + "ws.message.channelLen": uint(0), + "ws.message.timeout": common.DefaultMessageTimeout, + "ws.message.writeWait": common.DefaultWriteWait, + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "latest", + "msg.with.table.name": "true", + "enable.auto.commit": "true", + "auto.commit.interval.ms": "1000", + "group.id": groupID, + "client.id": clientID, + }) + if err != nil { + log.Fatalf( + "Failed to create websocket consumer, host: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", + host, + groupID, + clientID, + err.Error(), + ) + } + log.Printf("Create consumer successfully, host: %s, groupId: %s, clientId: %s\n", host, groupID, clientID) + + // ANCHOR_END: create_consumer + // ANCHOR: subscribe + topic = "topic_meters" + err = consumer.Subscribe(topic, nil) + if err != nil { + log.Fatalf( + "Failed to subscribe topic_meters, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", + topic, + groupID, + clientID, + err.Error(), + ) + } + log.Println("Subscribe topics successfully") + for i := 0; i < 50; i++ { + ev := consumer.Poll(100) + if ev != nil { + switch e := ev.(type) { + case *tmqcommon.DataMessage: + // process your data here + fmt.Printf("data:%v\n", e) + // ANCHOR: commit_offset + // commit offset + _, err = consumer.CommitOffsets([]tmqcommon.TopicPartition{e.TopicPartition}) + if err != nil { + log.Fatalf( + "Failed to commit offset, topic: %s, groupId: %s, clientId: %s, offset %s, ErrMessage: %s\n", + topic, + groupID, + clientID, + e.TopicPartition, + err.Error(), + ) + } + log.Println("Commit offset manually successfully.") + // ANCHOR_END: commit_offset + case tmqcommon.Error: + log.Fatalf( + "Failed to poll data, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", + topic, + groupID, + clientID, + e.Error(), + ) + } + } + } + // ANCHOR_END: subscribe + // ANCHOR: seek + // get assignment + partitions, err := consumer.Assignment() + if err != nil { + log.Fatalf( + "Failed to get assignment, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", + topic, + groupID, + clientID, + err.Error(), + ) + } + fmt.Println("Now assignment:", partitions) + for i := 0; i < len(partitions); i++ { + // seek to the beginning + err = consumer.Seek(tmqcommon.TopicPartition{ + Topic: partitions[i].Topic, + Partition: partitions[i].Partition, + Offset: 0, + }, 0) + if err != nil { + log.Fatalf( + "Failed to seek offset, topic: %s, groupId: %s, clientId: %s, partition: %d, offset: %d, ErrMessage: %s\n", + topic, + groupID, + clientID, + partitions[i].Partition, + 0, + err.Error(), + ) + } + } + fmt.Println("Assignment seek to beginning successfully") + // ANCHOR_END: seek + // ANCHOR: close + // unsubscribe + err = consumer.Unsubscribe() + if err != nil { + log.Fatalf( + "Failed to unsubscribe consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", + topic, + groupID, + clientID, + err.Error(), + ) + } + fmt.Println("Consumer unsubscribed successfully.") + // close consumer + err = consumer.Close() + if err != nil { + log.Fatalf( + "Failed to close consumer, topic: %s, groupId: %s, clientId: %s, ErrMessage: %s\n", + topic, + groupID, + clientID, + err.Error(), + ) + } + fmt.Println("Consumer closed successfully.") + // ANCHOR_END: close + <-done +} + +func initEnv(conn *sql.DB) { + _, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power") + if err != nil { + log.Fatal("Failed to create database, ErrMessage: " + err.Error()) + } + _, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") + if err != nil { + log.Fatal("Failed to create stable, ErrMessage: " + err.Error()) + } + _, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters") + if err != nil { + log.Fatal("Failed to create topic, ErrMessage: " + err.Error()) + } + go func() { + for i := 0; i < 10; i++ { + time.Sleep(time.Second) + _, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)") + if err != nil { + log.Fatal("Failed to insert data, ErrMessage: " + err.Error()) + } + } + done <- struct{}{} + }() +}