homework-jianmu/docs/zh/14-reference/05-connector/40-csharp.mdx

1947 lines
69 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
toc_max_heading_level: 4
sidebar_label: C#
title: TDengine C# Connector
---
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
import RequestId from "./_request_id.mdx";
`TDengine.Connector` 是 TDengine 提供的 C# 语言连接器。C# 开发人员可以通过它开发存取 TDengine 集群数据的 C# 应用软件。
## 连接方式
`TDengine.Connector` 提供两种形式的连接器
* **原生连接**,通过 TDengine 客户端驱动程序taosc原生连接 TDengine 实例支持数据写入、查询、数据订阅、schemaless 接口和参数绑定接口等功能。
* **Websocket 连接**,通过 taosAdapter 提供的 Websocket 接口连接 TDengine 实例WebSocket 连接实现的功能集合和原生连接有少量不同。(自 v3.0.1 起)
连接方式的详细介绍请参考:[连接器建立连接的方式](../../develop/connect/#连接器建立连接的方式)
## 兼容性
* `TDengine.Connector` 3.1.0 版本进行了完整的重构,不再兼容 3.0.2 及以前版本。3.0.2 文档请参考 [nuget](https://www.nuget.org/packages/TDengine.Connector/3.0.2)
* `TDengine.Connector` 3.x 不兼容 TDengine 2.x如果在运行 TDengine 2.x 版本的环境下需要使用 C# 连接器请使用 TDengine.Connector 的 1.x 版本。
## 支持的平台
支持的平台和 TDengine 客户端驱动支持的平台一致。
:::warning
TDengine 不再支持 32 位 Windows 平台。
:::
## 版本支持
| **Connector 版本** | **TDengine 版本** | **主要功能** |
|------------------|------------------|----------------------------|
| 3.1.3 | 3.2.1.0/3.1.1.18 | 支持 WebSocket 自动重连 |
| 3.1.2 | 3.2.1.0/3.1.1.18 | 修复 schemaless 资源释放 |
| 3.1.1 | 3.2.1.0/3.1.1.18 | 支持 varbinary 和 geometry 类型 |
| 3.1.0 | 3.2.1.0/3.1.1.18 | WebSocket 使用原生实现 |
## 处理异常
`TDengine.Connector` 会抛出异常应用程序需要处理异常。taosc 异常类型 `TDengineError`,包含错误码和错误信息,应用程序可以根据错误码和错误信息进行处理。
## TDengine DataType 和 C# DataType
| TDengine DataType | C# Type |
|-------------------|------------------|
| TIMESTAMP | DateTime |
| TINYINT | sbyte |
| SMALLINT | short |
| INT | int |
| BIGINT | long |
| TINYINT UNSIGNED | byte |
| SMALLINT UNSIGNED | ushort |
| INT UNSIGNED | uint |
| BIGINT UNSIGNED | ulong |
| FLOAT | float |
| DOUBLE | double |
| BOOL | bool |
| BINARY | byte[] |
| NCHAR | string (utf-8编码) |
| JSON | byte[] |
| VARBINARY | byte[] |
| GEOMETRY | byte[] |
:::note
JSON 类型仅在 tag 中支持。
:::
## 安装步骤
### 安装前准备
* 安装 [.NET SDK](https://dotnet.microsoft.com/download)
* [Nuget 客户端](https://docs.microsoft.com/en-us/nuget/install-nuget-client-tools) (可选安装)
* 对于 Native 连接方式,需要安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)WebSocket 连接方式无需安装。
### 安装连接器
可以在当前 .NET 项目的路径下,通过 dotnet CLI 添加 Nuget package `TDengine.Connector` 到当前项目。
``` bash
dotnet add package TDengine.Connector
```
也可以修改当前项目的 `.csproj` 文件,添加如下 ItemGroup。
``` XML
<ItemGroup>
<PackageReference Include="TDengine.Connector" Version="3.1.*" />
</ItemGroup>
```
## 建立连接
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">
``` csharp
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
Console.WriteLine("connected");
}
```
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
```csharp
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
Console.WriteLine("connected");
}
```
</TabItem>
</Tabs>
ConnectionStringBuilder 支持的参数如下:
* protocol: 连接协议,可选值为 Native 或 WebSocket默认为 Native
* host: TDengine 或 taosadapter 运行实例的地址
* port: TDengine 或 taosadapter 运行实例的端口
* 当 protocol 为 WebSocket 时 useSSL 为 false 时port 默认为 6041
* 当 protocol 为 WebSocket 时 useSSL 为 true 时port 默认为 443
* useSSL: 是否使用 SSL 连接,仅当 protocol 为 WebSocket 时有效,默认为 false
* token: 连接 TDengine cloud 的 token仅当 protocol 为 WebSocket 时有效
* username: 连接 TDengine 的用户名
* password: 连接 TDengine 的密码
* db: 连接 TDengine 的数据库
* timezone: 解析时间结果的时区,默认为 `TimeZoneInfo.Local`,使用 `TimeZoneInfo.FindSystemTimeZoneById` 方法解析字符串为 `TimeZoneInfo` 对象。
* connTimeout: WebSocket 连接超时时间,仅当 protocol 为 WebSocket 时有效,默认为 1 分钟,使用 `TimeSpan.Parse` 方法解析字符串为 `TimeSpan` 对象。
* readTimeout: WebSocket 读超时时间,仅当 protocol 为 WebSocket 时有效,默认为 5 分钟,使用 `TimeSpan.Parse` 方法解析字符串为 `TimeSpan` 对象。
* writeTimeout: WebSocket 写超时时间,仅当 protocol 为 WebSocket 时有效,默认为 10 秒,使用 `TimeSpan.Parse` 方法解析字符串为 `TimeSpan` 对象。
* autoReconnect: 是否自动重连(连接器版本 3.1.3 及以上生效),默认为 false
> **注意**:启用自动重连仅对简单执行 SQL 语句以及 无模式写入、数据订阅有效。对于参数绑定无效。自动重连仅对连接建立时通过参数指定数据库有效,对后面的 `use db` 语句切换数据库无效。
* reconnectRetryCount: 重连次数(连接器版本 3.1.3 及以上生效),默认为 3
* reconnectIntervalMs: 重连间隔时间(连接器版本 3.1.3 及以上生效),默认为 2000
### 指定 URL 和 Properties 获取连接
C# 连接器不支持此功能
### 配置参数的优先级
C# 连接器不支持此功能
## 使用示例
### 创建数据库和表
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">
```csharp
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace NativeQuery
{
internal class Query
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("create database power");
client.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
```
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
```csharp
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace WSQuery
{
internal class Query
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("create database power");
client.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
```
</TabItem>
</Tabs>
### 插入数据
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">
```csharp
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace NativeQuery
{
internal class Query
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
string insertQuery =
"INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"('2023-10-03 14:38:05.000', 10.30000, 219, 0.31000) " +
"('2023-10-03 14:38:15.000', 12.60000, 218, 0.33000) " +
"('2023-10-03 14:38:16.800', 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"('2023-10-03 14:38:16.650', 10.30000, 218, 0.25000) " +
"power.d1003 USING power.meters TAGS(2,'California.LosAngeles') " +
"VALUES " +
"('2023-10-03 14:38:05.500', 11.80000, 221, 0.28000) " +
"('2023-10-03 14:38:16.600', 13.40000, 223, 0.29000) " +
"power.d1004 USING power.meters TAGS(3,'California.LosAngeles') " +
"VALUES " +
"('2023-10-03 14:38:05.000', 10.80000, 223, 0.29000) " +
"('2023-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
client.Exec(insertQuery);
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
```
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
```csharp
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace WSQuery
{
internal class Query
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
string insertQuery =
"INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"('2023-10-03 14:38:05.000', 10.30000, 219, 0.31000) " +
"('2023-10-03 14:38:15.000', 12.60000, 218, 0.33000) " +
"('2023-10-03 14:38:16.800', 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"('2023-10-03 14:38:16.650', 10.30000, 218, 0.25000) " +
"power.d1003 USING power.meters TAGS(2,'California.LosAngeles') " +
"VALUES " +
"('2023-10-03 14:38:05.500', 11.80000, 221, 0.28000) " +
"('2023-10-03 14:38:16.600', 13.40000, 223, 0.29000) " +
"power.d1004 USING power.meters TAGS(3,'California.LosAngeles') " +
"VALUES " +
"('2023-10-03 14:38:05.000', 10.80000, 223, 0.29000) " +
"('2023-10-03 14:38:06.500', 11.50000, 221, 0.35000)";
client.Exec(insertQuery);
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
```
</TabItem>
</Tabs>
### 查询数据
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">
```csharp
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace NativeQuery
{
internal class Query
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("use power");
string query = "SELECT * FROM meters";
using (var rows = client.Query(query))
{
while (rows.Read())
{
Console.WriteLine($"{((DateTime)rows.GetValue(0)):yyyy-MM-dd HH:mm:ss.fff}, {rows.GetValue(1)}, {rows.GetValue(2)}, {rows.GetValue(3)}, {rows.GetValue(4)}, {Encoding.UTF8.GetString((byte[])rows.GetValue(5))}");
}
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
```
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
```csharp
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace WSQuery
{
internal class Query
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("use power");
string query = "SELECT * FROM meters";
using (var rows = client.Query(query))
{
while (rows.Read())
{
Console.WriteLine($"{((DateTime)rows.GetValue(0)):yyyy-MM-dd HH:mm:ss.fff}, {rows.GetValue(1)}, {rows.GetValue(2)}, {rows.GetValue(3)}, {rows.GetValue(4)}, {Encoding.UTF8.GetString((byte[])rows.GetValue(5))}");
}
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
```
</TabItem>
</Tabs>
### 执行带有 reqId 的 SQL
<RequestId />
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">
```csharp
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace NativeQueryWithReqID
{
internal abstract class QueryWithReqID
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec($"create database if not exists test_db",ReqId.GetReqId());
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
```
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
```csharp
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace WSQueryWithReqID
{
internal abstract class QueryWithReqID
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec($"create database if not exists test_db",ReqId.GetReqId());
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
```
</TabItem>
</Tabs>
### 通过参数绑定写入数据
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">
```csharp
using System;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace NativeStmt
{
internal abstract class NativeStmt
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("create database power");
client.Exec(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
using (var stmt = client.StmtInit())
{
stmt.Prepare(
"Insert into power.d1001 using power.meters tags(2,'California.SanFrancisco') values(?,?,?,?)");
var ts = new DateTime(2023, 10, 03, 14, 38, 05, 000);
stmt.BindRow(new object[] { ts, (float)10.30000, (int)219, (float)0.31000 });
stmt.AddBatch();
stmt.Exec();
var affected = stmt.Affected();
Console.WriteLine($"affected rows: {affected}");
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
}
}
```
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
```csharp
using System;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace WSStmt
{
internal abstract class WSStmt
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("create database power");
client.Exec(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
using (var stmt = client.StmtInit())
{
stmt.Prepare(
"Insert into power.d1001 using power.meters tags(2,'California.SanFrancisco') values(?,?,?,?)");
var ts = new DateTime(2023, 10, 03, 14, 38, 05, 000);
stmt.BindRow(new object[] { ts, (float)10.30000, (int)219, (float)0.31000 });
stmt.AddBatch();
stmt.Exec();
var affected = stmt.Affected();
Console.WriteLine($"affected rows: {affected}");
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
}
}
```
</TabItem>
</Tabs>
注意:使用 BindRow 需要注意原始 C# 列类型与 TDengine 列类型的需要一一对应,具体对应关系请参考 [TDengine DataType 和 C# DataType](#tdengine-datatype-和-c-datatype)。
### 无模式写入
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">
```csharp
using TDengine.Driver;
using TDengine.Driver.Client;
namespace NativeSchemaless
{
internal class Program
{
public static void Main(string[] args)
{
var builder =
new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
client.Exec("create database sml");
client.Exec("use sml");
var influxDBData =
"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000";
client.SchemalessInsert(new string[] { influxDBData },
TDengineSchemalessProtocol.TSDB_SML_LINE_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NANO_SECONDS, 0, ReqId.GetReqId());
var telnetData = "stb0_0 1626006833 4 host=host0 interface=eth0";
client.SchemalessInsert(new string[] { telnetData },
TDengineSchemalessProtocol.TSDB_SML_TELNET_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
var jsonData =
"{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
client.SchemalessInsert(new string[] { jsonData }, TDengineSchemalessProtocol.TSDB_SML_JSON_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
}
}
}
}
```
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
```csharp
using TDengine.Driver;
using TDengine.Driver.Client;
namespace WSSchemaless
{
internal class Program
{
public static void Main(string[] args)
{
var builder =
new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
client.Exec("create database sml");
client.Exec("use sml");
var influxDBData =
"st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000";
client.SchemalessInsert(new string[] { influxDBData },
TDengineSchemalessProtocol.TSDB_SML_LINE_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_NANO_SECONDS, 0, ReqId.GetReqId());
var telnetData = "stb0_0 1626006833 4 host=host0 interface=eth0";
client.SchemalessInsert(new string[] { telnetData },
TDengineSchemalessProtocol.TSDB_SML_TELNET_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
var jsonData =
"{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
client.SchemalessInsert(new string[] { jsonData }, TDengineSchemalessProtocol.TSDB_SML_JSON_PROTOCOL,
TDengineSchemalessPrecision.TSDB_SML_TIMESTAMP_MILLI_SECONDS, 0, ReqId.GetReqId());
}
}
}
}
```
</TabItem>
</Tabs>
### 执行带有 reqId 的无模式写入
```csharp
public void SchemalessInsert(string[] lines, TDengineSchemalessProtocol protocol,
TDengineSchemalessPrecision precision,
int ttl, long reqId)
```
### 数据订阅
#### 创建 Topic
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">
```csharp
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace NativeSubscription
{
internal class Program
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("create database power");
client.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
client.Exec("CREATE TOPIC topic_meters as SELECT * from power.meters");
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
```
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
```csharp
using System;
using System.Text;
using TDengine.Driver;
using TDengine.Driver.Client;
namespace WSSubscription
{
internal class Program
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("create database power");
client.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
client.Exec("CREATE TOPIC topic_meters as SELECT * from power.meters");
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
}
}
```
</TabItem>
</Tabs>
#### 创建 Consumer
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">
```csharp
var cfg = new Dictionary<string, string>()
{
{ "group.id", "group1" },
{ "auto.offset.reset", "latest" },
{ "td.connect.ip", "127.0.0.1" },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
{ "td.connect.port", "6030" },
{ "client.id", "tmq_example" },
{ "enable.auto.commit", "true" },
{ "msg.with.table.name", "false" },
};
var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
```
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
```csharp
var cfg = new Dictionary<string, string>()
{
{ "td.connect.type", "WebSocket" },
{ "group.id", "group1" },
{ "auto.offset.reset", "latest" },
{ "td.connect.ip", "localhost" },
{ "td.connect.port", "6041" },
{ "useSSL", "false" },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
{ "client.id", "tmq_example" },
{ "enable.auto.commit", "true" },
{ "msg.with.table.name", "false" },
};
var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
```
</TabItem>
</Tabs>
consumer 支持的配置参数如下:
* td.connect.type: 连接类型,可选值为 Native 或 WebSocket默认为 Native
* td.connect.ip: TDengine 或 taosadapter 运行实例的地址
* td.connect.port: TDengine 或 taosadapter 运行实例的端口
* 当 td.connect.type 为 WebSocket 且 useSSL 为 false 时td.connect.port 默认为 6041
* 当 td.connect.type 为 WebSocket 且 useSSL 为 true 时td.connect.port 默认为 443
* useSSL: 是否使用 SSL 连接,仅当 td.connect.type 为 WebSocket 时有效,默认为 false
* token: 连接 TDengine cloud 的 token仅当 td.connect.type 为 WebSocket 时有效
* td.connect.user: 连接 TDengine 的用户名
* td.connect.pass: 连接 TDengine 的密码
* group.id: 消费者组 ID
* client.id: 消费者 ID
* enable.auto.commit: 是否自动提交 offset默认为 true
* auto.commit.interval.ms: 自动提交 offset 的间隔时间,默认为 5000 毫秒
* auto.offset.reset: 当 offset 不存在时,从哪里开始消费,可选值为 earliest 或 latest默认为 latest
* msg.with.table.name: 消息是否包含表名
* ws.message.enableCompression: 是否启用 WebSocket 压缩dotnet 版本 6 及以上,连接器版本 3.1.1 及以上生效),默认为 false
* ws.autoReconnect: 是否自动重连(连接器版本 3.1.3 及以上生效),默认为 false
* ws.reconnect.retry.count: 重连次数(连接器版本 3.1.3 及以上生效),默认为 3
* ws.reconnect.interval.ms: 重连间隔时间(连接器版本 3.1.3 及以上生效),默认为 2000
支持订阅结果集 `Dictionary<string, object>` key 为列名value 为列值。
如果使用 object 接收列值,需要注意:
* 原始 C# 列类型与 TDengine 列类型的需要一一对应,具体对应关系请参考 [TDengine DataType 和 C# DataType](#tdengine-datatype-和-c-datatype)。
* 列名与 class 属性名一致,并可读写。
* 明确设置 value 解析器`ConsumerBuilder.SetValueDeserializer(new ReferenceDeserializer<T>());`
样例如下
结果 class
```csharp
class Result
{
public DateTime ts { get; set; }
public float current { get; set; }
public int voltage { get; set; }
public float phase { get; set; }
}
```
设置解析器
```csharp
var tmqBuilder = new ConsumerBuilder<Result>(cfg);
tmqBuilder.SetValueDeserializer(new ReferenceDeserializer<Result>());
var consumer = tmqBuilder.Build();
```
也可实现自定义解析器,实现 `IDeserializer<T>` 接口并通过`ConsumerBuilder.SetValueDeserializer`方法传入。
```csharp
public interface IDeserializer<T>
{
T Deserialize(ITMQRows data, bool isNull, SerializationContext context);
}
```
#### 订阅消费数据
```csharp
consumer.Subscribe(new List<string>() { "topic_meters" });
while (true)
{
using (var cr = consumer.Consume(500))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
Console.WriteLine(
$"message {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
}
}
```
#### 指定订阅 Offset
```csharp
consumer.Assignment.ForEach(a =>
{
Console.WriteLine($"{a}, seek to 0");
consumer.Seek(new TopicPartitionOffset(a.Topic, a.Partition, 0));
Thread.Sleep(TimeSpan.FromSeconds(1));
});
```
#### 提交 Offset
```csharp
public void Commit(ConsumeResult<TValue> consumerResult)
public List<TopicPartitionOffset> Commit()
public void Commit(IEnumerable<TopicPartitionOffset> offsets)
```
#### 关闭订阅
```csharp
consumer.Unsubscribe();
consumer.Close();
```
#### 完整示例
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">
```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using TDengine.Driver;
using TDengine.Driver.Client;
using TDengine.TMQ;
namespace NativeSubscription
{
internal class Program
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("CREATE DATABASE power");
client.Exec("USE power");
client.Exec(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
client.Exec("CREATE TOPIC topic_meters as SELECT * from power.meters");
var cfg = new Dictionary<string, string>()
{
{ "group.id", "group1" },
{ "auto.offset.reset", "latest" },
{ "td.connect.ip", "127.0.0.1" },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
{ "td.connect.port", "6030" },
{ "client.id", "tmq_example" },
{ "enable.auto.commit", "true" },
{ "msg.with.table.name", "false" },
};
var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
consumer.Subscribe(new List<string>() { "topic_meters" });
Task.Run(InsertData);
while (true)
{
using (var cr = consumer.Consume(500))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
Console.WriteLine(
$"message {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
consumer.Commit();
}
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
static void InsertData()
{
var builder = new ConnectionStringBuilder("host=localhost;port=6030;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
while (true)
{
client.Exec("INSERT into power.d1001 using power.meters tags(2,'California.SanFrancisco') values(now,11.5,219,0.30)");
Task.Delay(1000).Wait();
}
}
}
}
}
```
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using TDengine.Driver;
using TDengine.Driver.Client;
using TDengine.TMQ;
namespace WSSubscription
{
internal class Program
{
public static void Main(string[] args)
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
try
{
client.Exec("CREATE DATABASE power");
client.Exec("USE power");
client.Exec(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
client.Exec("CREATE TOPIC topic_meters as SELECT * from power.meters");
var cfg = new Dictionary<string, string>()
{
{ "td.connect.type", "WebSocket" },
{ "group.id", "group1" },
{ "auto.offset.reset", "latest" },
{ "td.connect.ip", "localhost" },
{ "td.connect.port", "6041" },
{ "useSSL", "false" },
{ "td.connect.user", "root" },
{ "td.connect.pass", "taosdata" },
{ "client.id", "tmq_example" },
{ "enable.auto.commit", "true" },
{ "msg.with.table.name", "false" },
};
var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
consumer.Subscribe(new List<string>() { "topic_meters" });
Task.Run(InsertData);
while (true)
{
using (var cr = consumer.Consume(500))
{
if (cr == null) continue;
foreach (var message in cr.Message)
{
Console.WriteLine(
$"message {{{((DateTime)message.Value["ts"]).ToString("yyyy-MM-dd HH:mm:ss.fff")}, " +
$"{message.Value["current"]}, {message.Value["voltage"]}, {message.Value["phase"]}}}");
}
consumer.Commit();
}
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
throw;
}
}
}
static void InsertData()
{
var builder = new ConnectionStringBuilder("protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata");
using (var client = DbDriver.Open(builder))
{
while (true)
{
client.Exec("INSERT into power.d1001 using power.meters tags(2,'California.SanFrancisco') values(now,11.5,219,0.30)");
Task.Delay(1000).Wait();
}
}
}
}
}
```
</TabItem>
</Tabs>
### ADO.NET
C# 连接器支持 ADO.NET 接口,可以通过 ADO.NET 接口连接 TDengine 运行实例,进行数据写入、查询等操作。
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">
```csharp
using System;
using TDengine.Data.Client;
namespace NativeADO
{
internal class Program
{
public static void Main(string[] args)
{
const string connectionString = "host=localhost;port=6030;username=root;password=taosdata";
using (var connection = new TDengineConnection(connectionString))
{
try
{
connection.Open();
using (var command = new TDengineCommand(connection))
{
command.CommandText = "create database power";
command.ExecuteNonQuery();
connection.ChangeDatabase("power");
command.CommandText =
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))";
command.ExecuteNonQuery();
command.CommandText = "INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(?,?,?,?)";
var parameters = command.Parameters;
parameters.Add(new TDengineParameter("@0", new DateTime(2023,10,03,14,38,05,000)));
parameters.Add(new TDengineParameter("@1", (float)10.30000));
parameters.Add(new TDengineParameter("@2", (int)219));
parameters.Add(new TDengineParameter("@3", (float)0.31000));
command.ExecuteNonQuery();
command.Parameters.Clear();
command.CommandText = "SELECT * FROM meters";
using (var reader = command.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine(
$"{((DateTime) reader.GetValue(0)):yyyy-MM-dd HH:mm:ss.fff}, {reader.GetValue(1)}, {reader.GetValue(2)}, {reader.GetValue(3)}, {reader.GetValue(4)}, {System.Text.Encoding.UTF8.GetString((byte[]) reader.GetValue(5))}");
}
}
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
}
}
```
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
```csharp
using System;
using TDengine.Data.Client;
namespace WSADO
{
internal class Program
{
public static void Main(string[] args)
{
const string connectionString = "protocol=WebSocket;host=localhost;port=6041;useSSL=false;username=root;password=taosdata";
using (var connection = new TDengineConnection(connectionString))
{
try
{
connection.Open();
using (var command = new TDengineCommand(connection))
{
command.CommandText = "create database power";
command.ExecuteNonQuery();
connection.ChangeDatabase("power");
command.CommandText =
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))";
command.ExecuteNonQuery();
command.CommandText = "INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(?,?,?,?)";
var parameters = command.Parameters;
parameters.Add(new TDengineParameter("@0", new DateTime(2023,10,03,14,38,05,000)));
parameters.Add(new TDengineParameter("@1", (float)10.30000));
parameters.Add(new TDengineParameter("@2", (int)219));
parameters.Add(new TDengineParameter("@3", (float)0.31000));
command.ExecuteNonQuery();
command.Parameters.Clear();
command.CommandText = "SELECT * FROM meters";
using (var reader = command.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine(
$"{((DateTime) reader.GetValue(0)):yyyy-MM-dd HH:mm:ss.fff}, {reader.GetValue(1)}, {reader.GetValue(2)}, {reader.GetValue(3)}, {reader.GetValue(4)}, {System.Text.Encoding.UTF8.GetString((byte[]) reader.GetValue(5))}");
}
}
}
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
}
}
```
</TabItem>
</Tabs>
* 连接参数与[建立连接](#建立连接)中的连接参数一致。
* TDengineParameter 的 name 需要以 @ 开头,如 @0、@1、@2 等value 需要 C# 列类型与 TDengine 列类型一一对应,具体对应关系请参考 [TDengine DataType 和 C# DataType](#tdengine-datatype-和-c-datatype)。
### 更多示例程序
[示例程序](https://github.com/taosdata/taos-connector-dotnet/tree/3.0/examples)
## API 参考
### ADO.NET 驱动
`TDengine.Data.Client` 接口实现了 ADO.NET 驱动,支持连接 TDengine 数据库,进行数据操作。
#### 参数规范
ConnectionStringBuilder 使用 key-value 对方式设置连接参数key 为参数名value 为参数值,不同参数之间使用分号 `;` 分割。
例如:
```csharp
"protocol=WebSocket;host=127.0.0.1;port=6041;useSSL=false"
```
##### 原生连接
例如:`"host=127.0.0.1;port=6030;username=root;password=taosdata;protocol=Native;db=test"`
支持的参数如下:
- `host`TDengine 运行实例的地址。
- `port`TDengine 运行实例的端口。
- `username`:连接的用户名。
- `password`:连接的密码。
- `protocol`:连接的协议,可选值为 Native 或 WebSocket默认为 Native。
- `db`:连接的数据库。
- `timezone`:时区,默认为本地时区。
##### WebSocket 连接
例如:`"protocol=WebSocket;host=127.0.0.1;port=6041;useSSL=false;enableCompression=true;autoReconnect=true;reconnectIntervalMs=10;reconnectRetryCount=5"`
支持的参数如下:
- `host`TDengine 运行实例的地址。
- `port`TDengine 运行实例的端口。
- `username`:连接的用户名。
- `password`:连接的密码。
- `protocol`:连接的协议,可选值为 Native 或 WebSocket默认为 Native。
- `db`:连接的数据库。
- `timezone`:时区,默认为本地时区。
- `connTimeout`:连接超时时间,默认为 1 分钟。
- `readTimeout`:读取超时时间,默认为 5 分钟。
- `writeTimeout`:发送超时时间,默认为 10 秒。
- `token`:连接 TDengine cloud 的 token。
- `useSSL`:是否使用 SSL 连接,默认为 false。
- `enableCompression`:是否启用 WebSocket 压缩,默认为 false。
- `autoReconnect`:是否自动重连,默认为 false。
- `reconnectRetryCount`:重连次数,默认为 3。
- `reconnectIntervalMs`:重连间隔毫秒时间,默认为 2000。
#### 接口说明
`ConnectionStringBuilder` 类提供了连接配置字符串的解析功能。
- `public ConnectionStringBuilder(string connectionString)`
- **接口说明**ConnectionStringBuilder 构造函数。
- **参数说明**
- `connectionString`:连接配置字符串。
### 连接功能
C# 驱动支持创建 ADO.NET 连接,返回支持 ADO.NET 标准的 `DbConnection` 接口的对象,还提供了 `ITDengineClient` 接口,扩充了一些无模式写入接口。
#### 标准接口
ADO.NET 连接支持的标准接口如下:
- `public TDengineConnection(string connectionString)`
- **接口说明**TDengineConnection 构造函数。
- **参数说明**
- `connectionString`:连接配置字符串。
- **异常**:格式错误抛出 `ArgumentException` 异常。
- `public void ChangeDatabase(string databaseName)`
- **接口说明**:切换数据库。
- **参数说明**
- `databaseName`:数据库名。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `public void Close()`
- **接口说明**:关闭连接。
- `public void Open()`
- **接口说明**:打开连接。
- **异常**:打开失败抛出 `TDengineError` 异常WebSocket 连接可能存在网络异常须注意处理。
- `public string ServerVersion`
- **接口说明**:返回服务器版本。
- **返回值**:服务器版本字符串。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `public string DataSource`
- **接口说明**:返回数据源。
- **返回值**:创建连接 host 配置。
- `public string Database`
- **接口说明**:返回连接数据库。
- **返回值**:创建连接 db 配置。
- `public TDengineCommand(TDengineConnection connection)`
- **接口说明**TDengineCommand 构造函数。
- **参数说明**
- `connection`TDengineConnection 对象。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `public void Prepare()`
- **接口说明**:检查连接和命令文本,并准备命令执行。
- **异常**:未执行 open 或未设置 CommandText 抛出 `InvalidOperationException` 异常。
- `public string CommandText`
- **接口说明**:获取或设置命令文本。
- **返回值**:命令文本。
- `public new virtual TDengineParameterCollection Parameters`
- **接口说明**:获取参数集合。
- **返回值**TDengineParameterCollection 对象。
#### 无模式写入
- `public static ITDengineClient Open(ConnectionStringBuilder builder)`
- **接口说明**:打开连接。
- **参数说明**
- `builder`:连接配置。
- **返回值**ITDengineClient 接口。
- **异常**:打开失败抛出 `TDengineError` 异常WebSocket 连接可能存在网络异常须注意处理。
- `void SchemalessInsert(string[] lines, TDengineSchemalessProtocol protocol,TDengineSchemalessPrecision precision, int ttl, long reqId)`
- **接口说明**:无模式写入。
- **参数说明**
- `lines`:数据行数组。
- `protocol`:数据协议,支持协议:`TSDB_SML_LINE_PROTOCOL = 1` `TSDB_SML_TELNET_PROTOCOL = 2` `TSDB_SML_JSON_PROTOCOL = 3`。
- `precision`:时间精度,支持配置:`TSDB_SML_TIMESTAMP_NOT_CONFIGURED = 0` `TSDB_SML_TIMESTAMP_HOURS = 1` `TSDB_SML_TIMESTAMP_MINUTES = 2` `TSDB_SML_TIMESTAMP_SECONDS = 3` `TSDB_SML_TIMESTAMP_MILLI_SECONDS = 4` `TSDB_SML_TIMESTAMP_MICRO_SECONDS = 5` `TSDB_SML_TIMESTAMP_NANO_SECONDS = 6`。
- `ttl`数据过期时间0表示不配置。
- `reqId`:请求 ID。
- **异常**:执行失败抛出 `TDengineError` 异常。
### 执行 SQL
C# 驱动提供了符合 ADO.NET 标准的 `DbCommand` 接口,支持以下功能:
1. **执行 SQL 语句**:执行静态 SQL 语句,并返回其生成的结果对象。
2. **查询执行**:可以执行返回数据集的查询(`SELECT` 语句)。
3. **更新执行**:可以执行影响行数的 SQL 语句,如 `INSERT`、`UPDATE`、`DELETE` 等。
4. **获取结果**:可以获取查询执行后返回的结果集(`ResultSet` 对象),并遍历查询返回的数据。
5. **获取更新计数**:对于非查询 SQL 语句,可以获取执行后影响的行数。
6. **关闭资源**:提供了关闭的方法,以释放数据库资源。
另外 C# 驱动还提供了用于请求链路跟踪的扩展接口。
#### 标准接口
- `public int ExecuteNonQuery()`
- **接口说明**:执行 SQL 语句,返回受影响的行数。
- **返回值**:受影响的行数。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `public object ExecuteScalar()`
- **接口说明**:执行查询,并返回查询结果的第一行第一列。
- **返回值**:查询结果的第一行第一列。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `public DbDataReader ExecuteReader()`
- **接口说明**:执行查询,并返回查询结果的数据读取器。
- **返回值**:查询结果的数据读取器。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `public void Dispose();`
- **接口说明**:释放资源。
#### 扩展接口
扩展接口主要用于请求链路跟踪。
- `IRows Query(string query, long reqId)`
- **接口说明**:执行查询,返回查询结果。
- **参数说明**
- `query`:查询语句。
- `reqId`:请求 ID。
- **返回值**:查询结果。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `long Exec(string query, long reqId)`
- **接口说明**:执行 SQL 语句。
- **参数说明**
- `query`SQL 语句。
- `reqId`:请求 ID。
- **返回值**:受影响的行数。
- **异常**:执行失败抛出 `TDengineError` 异常。
### 结果获取
C# 驱动提供了符合 ADO.NET 标准的 `DbDataReader` 接口,提供了用于读取结果集中元数据和数据的方法。
#### 结果集
`DbDataReader` 接口提供了以下方法获取结果集:
- `public bool GetBoolean(int ordinal)`
- **接口说明**:获取指定列的布尔值。
- **参数说明**
- `ordinal`:列索引。
- **返回值**:布尔值。
- **异常**:类型不对应抛出 `InvalidCastException` 异常。
- `public byte GetByte(int ordinal)`
- **接口说明**:获取指定列的字节值。
- **参数说明**
- `ordinal`:列索引。
- **返回值**:字节值。
- **异常**:类型不对应抛出 `InvalidCastException` 异常。
- `public long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length)`
- **接口说明**:获取指定列的字节值。
- **参数说明**
- `ordinal`:列索引。
- `dataOffset`:数据偏移量。
- `buffer`:缓冲区。
- `bufferOffset`:缓冲区偏移量。
- `length`:长度。
- **返回值**:字节值。
- **异常**:类型不对应抛出 `InvalidCastException` 异常。
- `public char GetChar(int ordinal)`
- **接口说明**:获取指定列的字符值。
- **参数说明**
- `ordinal`:列索引。
- **返回值**:字符值。
- **异常**:类型不对应抛出 `InvalidCastException` 异常。
- `public long GetChars(int ordinal, long dataOffset, char[] buffer, int bufferOffset, int length)`
- **接口说明**:获取指定列的字符值。
- **参数说明**
- `ordinal`:列索引。
- `dataOffset`:数据偏移量。
- `buffer`:缓冲区。
- `bufferOffset`:缓冲区偏移量。
- `length`:长度。
- **返回值**:字符值。
- **异常**:类型不对应抛出 `InvalidCastException` 异常。
- `public DateTime GetDateTime(int ordinal)`
- **接口说明**:获取指定列的日期时间值。
- **参数说明**
- `ordinal`:列索引。
- **返回值**:日期时间值。
- **异常**:类型不对应抛出 `InvalidCastException` 异常。
- `public double GetDouble(int ordinal)`
- **接口说明**:获取指定列的双精度浮点数值。
- **参数说明**
- `ordinal`:列索引。
- **返回值**:双精度浮点数值。
- **异常**:类型不对应抛出 `InvalidCastException` 异常。
- `public float GetFloat(int ordinal)`
- **接口说明**:获取指定列的单精度浮点数值。
- **参数说明**
- `ordinal`:列索引。
- **返回值**:单精度浮点数值。
- **异常**:类型不对应抛出 `InvalidCastException` 异常。
- `public short GetInt16(int ordinal)`
- **接口说明**:获取指定列的 16 位整数值。
- **参数说明**
- `ordinal`:列索引。
- **返回值**16 位整数值。
- **异常**:类型不对应抛出 `InvalidCastException` 异常。
- `public int GetInt32(int ordinal)`
- **接口说明**:获取指定列的 32 位整数值。
- **参数说明**
- `ordinal`:列索引。
- **返回值**32 位整数值。
- **异常**:类型不对应抛出 `InvalidCastException` 异常。
- `public long GetInt64(int ordinal)`
- **接口说明**:获取指定列的 64 位整数值。
- **参数说明**
- `ordinal`:列索引。
- **返回值**64 位整数值。
- **异常**:类型不对应抛出 `InvalidCastException` 异常。
- `public string GetString(int ordinal)`
- **接口说明**:获取指定列的字符串值。
- **参数说明**
- `ordinal`:列索引。
- **返回值**:字符串值。
- **异常**:类型不对应抛出 `InvalidCastException` 异常。
- `public object GetValue(int ordinal)`
- **接口说明**:获取指定列的值。
- **参数说明**
- `ordinal`:列索引。
- **返回值** 结果对象。
- `public int GetValues(object[] values)`
- **接口说明**:获取所有列的值。
- **参数说明**
- `values`:值数组。
- **返回值**:值数量。
- `public bool IsDBNull(int ordinal)`
- **接口说明**:判断指定列是否为 NULL。
- **参数说明**
- `ordinal`:列索引。
- **返回值**:是否为 NULL。
- `public int RecordsAffected`
- **接口说明**:获取受影响的行数。
- **返回值**:受影响的行数。
- `public bool HasRows`
- **接口说明**:结果是否有行数据。
- **返回值**:结果是否有行数据。
- `public bool Read()`
- **接口说明**:读取下一行。
- **返回值**:是否读取成功。
- `public IEnumerator GetEnumerator()`
- **接口说明**:获取枚举器。
- **返回值**:枚举器。
- `public void Close()`
- **接口说明**:关闭结果集。
#### 结果集元数据
`DbDataReader` 接口提供了以下方法获取结果集元数据:
- `public DataTable GetSchemaTable()`
- **接口说明**:获取结果集元数据。
- **返回值**:结果集元数据。
- `public string GetDataTypeName(int ordinal)`
- **接口说明**:获取指定列的数据类型名称。
- **参数说明**
- `ordinal`:列索引。
- **返回值**:数据类型名称。
- ` public Type GetFieldType(int ordinal)`
- **接口说明**:获取指定列的数据类型。
- **参数说明**
- `ordinal`:列索引。
- **返回值**:数据类型。
- `public string GetName(int ordinal)`
- **接口说明**:获取指定列的名称。
- **参数说明**
- `ordinal`:列索引。
- **返回值**:列名称。
- ` public int GetFieldSize(int ordinal)`
- **接口说明**:获取指定列的大小。
- **参数说明**
- `ordinal`:列索引。
- **返回值**:列大小。
- `public int GetOrdinal(string name)`
- **接口说明**:获取指定列的索引。
- **参数说明**
- `name`:列名称。
- **返回值**:列索引。
- `public int FieldCount`
- **接口说明**:获取列数。
- **返回值**:列数。
### 参数绑定
`TDengineCommand` 类支持参数绑定。
#### 标准接口
`TDengineCommand` 类继承了 `DbCommand` 接口,支持以下功能:
- `public string CommandText`
- **接口说明**:获取或设置命令文本,支持参数绑定。
- **返回值**:命令文本。
- `public new virtual TDengineParameterCollection Parameters`
- **接口说明**:获取参数集合。
- **返回值**`TDengineParameterCollection` 对象。
#### 参数元数据
`TDengineParameterCollection` 继承了 `DbParameterCollection` 接口,支持以下功能:
- `public int Add(object value)`
- **接口说明**:添加参数。
- **参数说明**
- `value`:参数值。
- **返回值**:参数索引。
- `public void Clear()`
- **接口说明**:清空参数。
- `public bool Contains(object value)`
- **接口说明**:是否包含参数。
- **参数说明**
- `value`:参数值。
- **返回值**:是否包含参数。
- `public int IndexOf(object value)`
- **接口说明**:获取参数索引。
- **参数说明**
- `value`:参数值。
- **返回值**:参数索引。
- `public void Insert(int index, object value)`
- **接口说明**:插入参数。
- **参数说明**
- `index`:索引。
- `value`:参数值。
- `public void Remove(object value)`
- **接口说明**:移除参数。
- **参数说明**
- `value`:参数值。
- `public void RemoveAt(int index)`
- **接口说明**:移除参数。
- **参数说明**
- `index`:索引。
- `public void RemoveAt(string parameterName)`
- **接口说明**:移除参数。
- **参数说明**
- `parameterName`:参数名。
- `public int Count`
- **接口说明**:获取参数数量。
- **返回值**:参数数量。
- `public int IndexOf(string parameterName)`
- **接口说明**:获取参数索引。
- **参数说明**
- `parameterName`:参数名。
- **返回值**:参数索引。
- `public bool Contains(string value)`
- **接口说明**:是否包含参数。
- **参数说明**
- `value`:参数名。
- **返回值**:是否包含参数。
- `public void CopyTo(Array array, int index)`
- **接口说明**:复制参数。
- **参数说明**
- `array`:目标数组。
- `index`:索引。
- `public IEnumerator GetEnumerator()`
- **接口说明**:获取枚举器。
- **返回值**:枚举器。
- `public void AddRange(Array values)`
- **接口说明**:添加参数。
- **参数说明**
- `values`:参数数组。
`TDengineParameter` 继承了 `DbParameter` 接口,支持以下功能:
- `public TDengineParameter(string name, object value)`
- **接口说明**TDengineParameter 构造函数。
- **参数说明**
- `name`:参数名,需要以 @ 开头,如 @0、@1、@2 等。
- `value`:参数值,需要 C# 列类型与 TDengine 列类型一一对应。
- `public string ParameterName`
- **接口说明**:获取或设置参数名。
- **返回值**:参数名。
- `public object Value`
- **接口说明**:获取或设置参数值。
- **返回值**:参数值。
#### 扩展接口
`ITDengineClient` 接口提供了扩展的参数绑定接口。
- `IStmt StmtInit(long reqId)`
- **接口说明**:初始化 statement 对象。
- **参数说明**
- `reqId`:请求 ID。
- **返回值**:实现 IStmt 接口的对象。
- **异常**:执行失败抛出 `TDengineError` 异常。
`IStmt` 接口提供了扩展的参数绑定接口。
- `void Prepare(string query)`
- **接口说明**:准备 statement。
- **参数说明**
- `query`:查询语句。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `bool IsInsert()`
- **接口说明**:判断是否为插入语句。
- **返回值**:是否为插入语句。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `void SetTableName(string tableName)`
- **接口说明**:设置表名。
- **参数说明**
- `tableName`:表名。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `void SetTags(object[] tags)`
- **接口说明**:设置标签。
- **参数说明**
- `tags`:标签数组。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `TaosFieldE[] GetTagFields()`
- **接口说明**:获取标签属性。
- **返回值**:标签属性数组。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `TaosFieldE[] GetColFields()`
- **接口说明**:获取列属性。
- **返回值**:列属性数组。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `void BindRow(object[] row)`
- **接口说明**:绑定行。
- **参数说明**
- `row`:行数据数组。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `void BindColumn( TaosFieldE[] fields,params Array[] arrays)`
- **接口说明**:绑定全部列。
- **参数说明**
- `fields`:字段属性数组。
- `arrays`:多列数据数组。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `void AddBatch()`
- **接口说明**:添加批处理。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `void Exec()`
- **接口说明**:执行参数绑定。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `long Affected()`
- **接口说明**:获取受影响的行数。
- **返回值**:受影响的行数。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `IRows Result()`
- **接口说明**:获取结果。
- **返回值**:结果对象。
- **异常**:执行失败抛出 `TDengineError` 异常。
### 数据订阅
`ConsumerBuilder` 类提供了消费者构建相关接口,`ConsumeResult` 类提供了消费结果相关接口,`TopicPartitionOffset` 类提供了分区偏移量相关接口。`ReferenceDeserializer` 和 `DictionaryDeserializer` 提供了反序列化的支持。
#### 消费者
- `public ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)`
- **接口说明**ConsumerBuilder 构造函数。
- **参数说明**
- `config`:消费配置。
创建消费者支持属性列表:
- `useSSL`:是否使用 SSL 连接,默认为 false
- `token`:连接 TDengine cloud 的 token
- `ws.message.enableCompression`:是否启用 WebSocket 压缩,默认为 false
- `ws.autoReconnect`:是否自动重连,默认为 false
- `ws.reconnect.retry.count`:重连次数,默认为 3
- `ws.reconnect.interval.ms`:重连间隔毫秒时间,默认为 2000
其他参数请参考:[Consumer 参数列表](../../develop/tmq/#数据订阅相关参数) 注意TDengine服务端自 3.2.0.0 版本开始消息订阅中的 auto.offset.reset 默认值发生变化。
- `public IConsumer<TValue> Build()`
- **接口说明**:构建消费者。
- **返回值**:消费者对象。
`IConsumer` 接口提供了消费者相关 API
- `ConsumeResult<TValue> Consume(int millisecondsTimeout)`
- **接口说明**:消费消息。
- **参数说明**
- `millisecondsTimeout`:毫秒超时时间。
- **返回值**:消费结果。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `List<TopicPartition> Assignment { get; }`
- **接口说明**:获取分配信息。
- **返回值**:分配信息。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `List<string> Subscription()`
- **接口说明**:获取订阅的主题。
- **返回值**:主题列表。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `void Subscribe(IEnumerable<string> topic)`
- **接口说明**:订阅主题列表。
- **参数说明**
- `topic`:主题列表。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `void Subscribe(string topic)`
- **接口说明**:订阅单个主题。
- **参数说明**
- `topic`:主题。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `void Unsubscribe()`
- **接口说明**:取消订阅。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `void Commit(ConsumeResult<TValue> consumerResult)`
- **接口说明**:提交消费结果。
- **参数说明**
- `consumerResult`:消费结果。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `List<TopicPartitionOffset> Commit()`
- **接口说明**:提交全部消费结果。
- **返回值**:分区偏移量。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `void Commit(IEnumerable<TopicPartitionOffset> offsets)`
- **接口说明**:提交消费结果。
- **参数说明**
- `offsets`:分区偏移量。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `void Seek(TopicPartitionOffset tpo)`
- **接口说明**:跳转到分区偏移量。
- **参数说明**
- `tpo`:分区偏移量。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `List<TopicPartitionOffset> Committed(TimeSpan timeout)`
- **接口说明**:获取分区偏移量。
- **参数说明**
- `timeout`:超时时间(未使用)。
- **返回值**:分区偏移量。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `List<TopicPartitionOffset> Committed(IEnumerable<TopicPartition> partitions, TimeSpan timeout)`
- **接口说明**:获取指定分区偏移量。
- **参数说明**
- `partitions`:分区列表。
- `timeout`:超时时间(未使用)。
- **返回值**:分区偏移量。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `Offset Position(TopicPartition partition)`
- **接口说明**:获取消费位置。
- **参数说明**
- `partition`:分区。
- **返回值**:偏移量。
- **异常**:执行失败抛出 `TDengineError` 异常。
- `void Close()`
- **接口说明**:关闭消费者。
#### 消费记录
`ConsumeResult` 类提供了消费结果相关接口:
- `public List<TmqMessage<TValue>> Message`
- **接口说明**:获取消息列表。
- **返回值**:消息列表。
`TmqMessage` 类提供了消息具体内容:
```csharp
public class TmqMessage<TValue>
{
public string TableName { get; set; }
public TValue Value { get; set; }
}
```
- `TableName`:表名
- `Value`:消息内容
#### 分区信息
从 `ConsumeResult` 获取 `TopicPartitionOffset`
```csharp
public TopicPartitionOffset TopicPartitionOffset
```
`TopicPartitionOffset` 类提供了获取分区信息的接口:
- `public string Topic { get; }`
- **接口说明**:获取主题。
- **返回值**:主题。
- `public Partition Partition { get; }`
- **接口说明**:获取分区。
- **返回值**:分区。
- `public Offset Offset { get; }`
- **接口说明**:获取偏移量。
- **返回值**:偏移量。
- `public TopicPartition TopicPartition`
- **接口说明**:获取主题分区。
- **返回值**:主题分区。
- `public string ToString()`
- **接口说明**:转换为字符串。
- **返回值**:字符串信息。
#### 偏移量元数据
`Offset` 类提供了偏移量相关接口:
- `public long Value`
- **接口说明**:获取偏移量值。
- **返回值**:偏移量值。
#### 反序列化
C# 驱动提供了两个反序列化类:`ReferenceDeserializer` 和 `DictionaryDeserializer`。它们都实现了 `IDeserializer` 接口。
ReferenceDeserializer 用来将消费到的一条记录反序列化为一个对象,需要保证对象类的属性名与消费到的数据的列名能够对应,且类型能够匹配。
DictionaryDeserializer 则会将消费到的一行数据反序列化为一个 `Dictionary<string, object>` 对象,其 key 为列名,值为对象。
ReferenceDeserializer 和 DictionaryDeserializer 的接口不会被用户直接调用,请参考使用样例。