docs: update go and c# example

This commit is contained in:
t_max 2024-08-12 19:48:41 +08:00
parent 15ef067070
commit 0cd09c02de
4 changed files with 33 additions and 106 deletions

View File

@ -152,41 +152,24 @@ namespace TMQExample
{ {
// get assignment // get assignment
var assignment = consumer.Assignment; var assignment = consumer.Assignment;
Console.WriteLine($"now assignment: ${assignment}"); Console.WriteLine($"now assignment: {assignment}");
// seek to the beginning // seek to the beginning
foreach (var topicPartition in assignment) foreach (var topicPartition in assignment)
{ {
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0)); consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
} }
Console.WriteLine("assignment seek to beginning successfully"); Console.WriteLine("Assignment seek to beginning successfully");
// poll data again
for (int i = 0; i < 50; i++)
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
// handle message
Console.WriteLine(
$"second data polled: {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
break;
}
}
} }
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("seek example failed; ErrCode:" + e.Code + "; ErrMessage: " + e.Error); Console.WriteLine("Seek example failed; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("seek example failed; ErrMessage: " + e.Message); Console.WriteLine("Seek example failed; ErrMessage: " + e.Message);
throw; throw;
} }
// ANCHOR_END: seek // ANCHOR_END: seek
@ -208,7 +191,7 @@ namespace TMQExample
{ {
cr.TopicPartitionOffset, cr.TopicPartitionOffset,
}); });
Console.WriteLine("commit offset manually successfully."); Console.WriteLine("Commit offset manually successfully.");
} }
} }
catch (TDengineError e) catch (TDengineError e)
@ -251,7 +234,7 @@ namespace TMQExample
{ {
// close consumer // close consumer
consumer.Close(); consumer.Close();
Console.WriteLine("consumer closed successfully."); Console.WriteLine("Consumer closed successfully.");
} }
// ANCHOR_END: close // ANCHOR_END: close
} }

View File

@ -118,7 +118,7 @@ namespace TMQExample
{ {
// subscribe // subscribe
consumer.Subscribe(new List<string>() { "topic_meters" }); consumer.Subscribe(new List<string>() { "topic_meters" });
Console.WriteLine("subscribe topics successfully"); Console.WriteLine("Subscribe topics successfully");
for (int i = 0; i < 50; i++) for (int i = 0; i < 50; i++)
{ {
// consume message with using block to ensure the result is disposed // consume message with using block to ensure the result is disposed
@ -157,42 +157,24 @@ namespace TMQExample
{ {
// get assignment // get assignment
var assignment = consumer.Assignment; var assignment = consumer.Assignment;
Console.WriteLine($"now assignment: ${assignment}"); Console.WriteLine($"Now assignment: {assignment}");
// seek to the beginning // seek to the beginning
foreach (var topicPartition in assignment) foreach (var topicPartition in assignment)
{ {
consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0)); consumer.Seek(new TopicPartitionOffset(topicPartition.Topic, topicPartition.Partition, 0));
} }
Console.WriteLine("assignment seek to beginning successfully"); Console.WriteLine("Assignment seek to beginning successfully");
// poll data again
for (int i = 0; i < 50; i++)
{
// consume message with using block to ensure the result is disposed
using (var cr = consumer.Consume(100))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
// handle message
Console.WriteLine(
$"second data polled: {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
break;
}
}
} }
catch (TDengineError e) catch (TDengineError e)
{ {
// handle TDengine error // handle TDengine error
Console.WriteLine("seek example failed; ErrCode:" + e.Code + "; ErrMessage: " + e.Error); Console.WriteLine("Seek example failed; ErrCode:" + e.Code + "; ErrMessage: " + e.Error);
throw; throw;
} }
catch (Exception e) catch (Exception e)
{ {
// handle other exceptions // handle other exceptions
Console.WriteLine("seek example failed; ErrMessage: " + e.Message); Console.WriteLine("Seek example failed; ErrMessage: " + e.Message);
throw; throw;
} }
// ANCHOR_END: seek // ANCHOR_END: seek
@ -214,7 +196,7 @@ namespace TMQExample
{ {
cr.TopicPartitionOffset, cr.TopicPartitionOffset,
}); });
Console.WriteLine("commit offset manually successfully."); Console.WriteLine("Commit offset manually successfully.");
} }
} }
catch (TDengineError e) catch (TDengineError e)
@ -257,7 +239,7 @@ namespace TMQExample
{ {
// close consumer // close consumer
consumer.Close(); consumer.Close();
Console.WriteLine("consumer closed successfully."); Console.WriteLine("Consumer closed successfully.");
} }
// ANCHOR_END: close // ANCHOR_END: close
} }

View File

@ -50,7 +50,7 @@ func main() {
if err != nil { if err != nil {
log.Fatalln("Failed to subscribe, host : " + host + "; ErrMessage: " + err.Error()) log.Fatalln("Failed to subscribe, host : " + host + "; ErrMessage: " + err.Error())
} }
log.Println("subscribe topics successfully") log.Println("Subscribe topics successfully")
for i := 0; i < 50; i++ { for i := 0; i < 50; i++ {
ev := consumer.Poll(100) ev := consumer.Poll(100)
if ev != nil { if ev != nil {
@ -64,7 +64,7 @@ func main() {
if err != nil { if err != nil {
log.Fatalln("Failed to commit offset, host : " + host + "; ErrMessage: " + err.Error()) log.Fatalln("Failed to commit offset, host : " + host + "; ErrMessage: " + err.Error())
} }
log.Println("commit offset manually successfully.") log.Println("Commit offset manually successfully.")
// ANCHOR_END: commit_offset // ANCHOR_END: commit_offset
case tmqcommon.Error: case tmqcommon.Error:
fmt.Printf("%% Error: %v: %v\n", e.Code(), e) fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
@ -77,9 +77,9 @@ func main() {
// get assignment // get assignment
partitions, err := consumer.Assignment() partitions, err := consumer.Assignment()
if err != nil { if err != nil {
log.Fatal("failed to get assignment, err:", err) log.Fatal("Failed to get assignment; ErrMessage: " + err.Error())
} }
fmt.Println("now assignment:", partitions) fmt.Println("Now assignment:", partitions)
for i := 0; i < len(partitions); i++ { for i := 0; i < len(partitions); i++ {
// seek to the beginning // seek to the beginning
err = consumer.Seek(tmqcommon.TopicPartition{ err = consumer.Seek(tmqcommon.TopicPartition{
@ -88,29 +88,10 @@ func main() {
Offset: 0, Offset: 0,
}, 0) }, 0)
if err != nil { if err != nil {
log.Fatalln("seek example failed; ErrMessage: " + err.Error()) log.Fatalln("Seek example failed; ErrMessage: " + err.Error())
}
}
fmt.Println("assignment seek to beginning successfully")
// poll data again
gotData := false
for i := 0; i < 50; i++ {
if gotData {
break
}
ev := consumer.Poll(100)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
// process your data here
fmt.Printf("second data polled:%v\n", e)
gotData = true
case tmqcommon.Error:
fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
log.Fatal("failed to get message, err:", e)
}
} }
} }
fmt.Println("Assignment seek to beginning successfully")
// ANCHOR_END: seek // ANCHOR_END: seek
// ANCHOR: close // ANCHOR: close
// unsubscribe // unsubscribe
@ -130,22 +111,22 @@ func main() {
func initEnv(conn *sql.DB) { func initEnv(conn *sql.DB) {
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power") _, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil { if err != nil {
log.Fatal("failed to create database, err:", err) log.Fatal("Failed to create database. ErrMessage: " + err.Error())
} }
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") _, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil { if err != nil {
log.Fatal("failed to create stable, err:", err) log.Fatal("Failed to create stable. ErrMessage: " + err.Error())
} }
_, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters") _, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters")
if err != nil { if err != nil {
log.Fatal("failed to create topic, err:", err) log.Fatal("Failed to create topic. ErrMessage: " + err.Error())
} }
go func() { go func() {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
time.Sleep(time.Second) time.Sleep(time.Second)
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)") _, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
if err != nil { if err != nil {
log.Fatal("failed to insert data, err:", err) log.Fatal("Failed to insert data. ErrMessage: " + err.Error())
} }
} }
done <- struct{}{} done <- struct{}{}

View File

@ -55,7 +55,7 @@ func main() {
if err != nil { if err != nil {
log.Fatalln("Failed to subscribe, host : " + wsUrl + "; ErrMessage: " + err.Error()) log.Fatalln("Failed to subscribe, host : " + wsUrl + "; ErrMessage: " + err.Error())
} }
log.Println("subscribe topics successfully") log.Println("Subscribe topics successfully")
for i := 0; i < 50; i++ { for i := 0; i < 50; i++ {
ev := consumer.Poll(100) ev := consumer.Poll(100)
if ev != nil { if ev != nil {
@ -69,7 +69,7 @@ func main() {
if err != nil { if err != nil {
log.Fatalln("Failed to commit offset, host : " + wsUrl + "; ErrMessage: " + err.Error()) log.Fatalln("Failed to commit offset, host : " + wsUrl + "; ErrMessage: " + err.Error())
} }
log.Println("commit offset manually successfully.") log.Println("Commit offset manually successfully.")
// ANCHOR_END: commit_offset // ANCHOR_END: commit_offset
case tmqcommon.Error: case tmqcommon.Error:
fmt.Printf("%% Error: %v: %v\n", e.Code(), e) fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
@ -82,9 +82,9 @@ func main() {
// get assignment // get assignment
partitions, err := consumer.Assignment() partitions, err := consumer.Assignment()
if err != nil { if err != nil {
log.Fatal("failed to get assignment, err:", err) log.Fatal("Failed to get assignment; ErrMessage: " + err.Error())
} }
fmt.Println("now assignment:", partitions) fmt.Println("Now assignment:", partitions)
for i := 0; i < len(partitions); i++ { for i := 0; i < len(partitions); i++ {
// seek to the beginning // seek to the beginning
err = consumer.Seek(tmqcommon.TopicPartition{ err = consumer.Seek(tmqcommon.TopicPartition{
@ -93,29 +93,10 @@ func main() {
Offset: 0, Offset: 0,
}, 0) }, 0)
if err != nil { if err != nil {
log.Fatalln("seek example failed; ErrMessage: " + err.Error()) log.Fatalln("Seek example failed; ErrMessage: " + err.Error())
}
}
fmt.Println("assignment seek to beginning successfully")
// poll data again
gotData := false
for i := 0; i < 50; i++ {
if gotData {
break
}
ev := consumer.Poll(100)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
// process your data here
fmt.Printf("second data polled:%v\n", e)
gotData = true
case tmqcommon.Error:
fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
log.Fatal("failed to get message, err:", e)
}
} }
} }
fmt.Println("Assignment seek to beginning successfully")
// ANCHOR_END: seek // ANCHOR_END: seek
// ANCHOR: close // ANCHOR: close
// unsubscribe // unsubscribe
@ -135,22 +116,22 @@ func main() {
func initEnv(conn *sql.DB) { func initEnv(conn *sql.DB) {
_, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power") _, err := conn.Exec("CREATE DATABASE IF NOT EXISTS power")
if err != nil { if err != nil {
log.Fatal("failed to create database, err:", err) log.Fatal("Failed to create database. ErrMessage: " + err.Error())
} }
_, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))") _, err = conn.Exec("CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))")
if err != nil { if err != nil {
log.Fatal("failed to create stable, err:", err) log.Fatal("Failed to create stable. ErrMessage: " + err.Error())
} }
_, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters") _, err = conn.Exec("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters")
if err != nil { if err != nil {
log.Fatal("failed to create topic, err:", err) log.Fatal("Failed to create topic. ErrMessage: " + err.Error())
} }
go func() { go func() {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
time.Sleep(time.Second) time.Sleep(time.Second)
_, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)") _, err = conn.Exec("INSERT INTO power.d1001 USING power.meters TAGS (2, 'California.SanFrancisco') VALUES (NOW , 10.2, 219, 0.32)")
if err != nil { if err != nil {
log.Fatal("failed to insert data, err:", err) log.Fatal("Failed to insert data. ErrMessage: " + err.Error())
} }
} }
done <- struct{}{} done <- struct{}{}