fix:add api document for tmq

This commit is contained in:
wangmm0220 2024-06-04 14:27:46 +08:00
parent 0c8e9046f6
commit 61e6b25da6
3 changed files with 473 additions and 247 deletions

View File

@ -451,6 +451,88 @@ In addition to writing data using the SQL method or the parameter binding API, w
- Within _reqid interfaces can track the entire call chain by passing the reqid parameter. - Within _reqid interfaces can track the entire call chain by passing the reqid parameter.
### Subscription API ### Subscription API
- `const char *tmq_err2str(int32_t code)`
**Description**
- This interface is used to convert error codes for data subscriptions into error messages
**Parameter description**
- code: error code
**Return value**
- non NULL, return error message, error message may be empty
- `tmq_conf_t *tmq_conf_new()`
- `tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value)`
- `void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param)`
- `void tmq_conf_destroy(tmq_conf_t *conf)`
tmq_conf_res_t defined as follows:
```
typedef enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
} tmq_conf_res_t;
```
commit callback function defined as follows:
```
typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param))
```
**Description**
- tmq_conf_new : create a tmq_conf_t structure to configure consumption parameters
- tmq_conf_set : set configuration, key is parameter namevalue is parameter value
- tmq_conf_set_auto_commit_cb : set automatic commit callback function, cb is call back function, param is callback function parameter
- tmq_conf_destroy : destroy tmq_conf_t structure
- `tmq_list_t *tmq_list_new()`
- `int32_t tmq_list_append(tmq_list_t *, const char *)`
- `void tmq_list_destroy(tmq_list_t *)`
- `int32_t tmq_list_get_size(const tmq_list_t *)`
- `char **tmq_list_to_c_array(const tmq_list_t *)`
**Description**
- tmq_list_new : build a tmq_list_t constructure, used to save topic
- tmq_list_append : add topic to tmq_list_t
- tmq_list_destroy : destroy tmq_list_t
- tmq_list_get_size : get size of tmq_list_t
- tmq_list_to_c_array : convert tmq_list_t to c array, element is string pointer
- `tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen)`
- `int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list)`
- `int32_t tmq_unsubscribe(tmq_t *tmq)`
- `int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topic_list)`
- `TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout)`
- `int32_t tmq_consumer_close(tmq_t *tmq)`
**Description**
- tmq_consumer_new : build a tmq_t constructure, need to be used with tmq_consumer_close
- tmq_subscribe : subscribe topic, need to be used with tmq_unsubscribe
- tmq_unsubscribe : unsubscribe topic, need to be used with tmq_subscribe
- tmq_subscription : obtain a list of topics subscribed by consumer
- tmq_consumer_poll : used to consume data
- tmq_consumer_close : clost tmq_t, need to be used with tmq_consumer_new
**Parameter description**
- conf: sed to configure consume parameters
- errstr: The error information is stored in this string. Allocation and release of memory are the responsibility of the caller
- errstenLen: the length of errstr
- tmq: structure of tmq_t returned by tmq_consumer_new
- topic_list: a list of topics subscribed by consumersneed to be freed by tmq_list_destroy
- timeout: the timeout time, measured in milliseconds, indicates how long it takes for data to expire. If it is negative, it will default to 1 second
**Return value**
- tmq_consumer_new: structure of tmq_t, NULL failed
- tmq_subscribe: zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_unsubscribe: zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_subscription: zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_consumer_poll: structure of TAOS_RES(same like taos_query), NULL if there is no data
- tmq_consumer_close: zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment)` - `int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment)`
- `void tmq_free_assignment(tmq_topic_assignment* pAssignment)` - `void tmq_free_assignment(tmq_topic_assignment* pAssignment)`
@ -474,6 +556,7 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value** **Return value**
- zero success,none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)` - zero success,none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)` - `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**Function description** **Function description**
@ -482,6 +565,7 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value** **Return value**
- the value of committed offset, -2147467247 means no committed value, Other values less than 0 indicate failure - the value of committed offset, -2147467247 means no committed value, Other values less than 0 indicate failure
- `int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg)` - `int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg)`
- `void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param)` - `void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param)`
- `int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)` - `int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
@ -499,6 +583,7 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value** **Return value**
- zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)` - zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId)` - `int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**Function description** **Function description**
@ -507,6 +592,7 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value** **Return value**
- the current consumption location, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)` - the current consumption location, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)` - `int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
**Function description** **Function description**
@ -514,25 +600,52 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value** **Return value**
- zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)` - zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
- `int32_t tmq_get_vgroup_id(TAOS_RES *res)`
**Description**
- tmq_get_vgroup_offset : Obtain the starting offset of the consumed data
- tmq_get_vgroup_id : Obtain the vgroup id of the consumed data
- `int32_t int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
**Function description**
- Obtain the starting offset of the consumed data
**Parameter description** **Parameter description**
- msgMessage consumed - msg : Message consumed
**Return value**
- the starting offset of the consumed data, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int32_t int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics)`
**Function description**
- Obtain a list of topics subscribed by consumers
**Parameter description**
- topics: a list of topics subscribed by consumersneed to be freed by tmq_list_destroy
**Return value** **Return value**
- zero successnone zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)` - tmq_get_vgroup_offset : the starting offset of the consumed data, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_get_vgroup_id : vgroup id of result, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `TAOS *tmq_get_connect(tmq_t *tmq)`
- `const char *tmq_get_table_name(TAOS_RES *res)`
- `tmq_res_t tmq_get_res_type(TAOS_RES *res)`
- `const char *tmq_get_topic_name(TAOS_RES *res)`
- `const char *tmq_get_db_name(TAOS_RES *res)`
tmq_res_t 表示消费到的数据类型,定义如下:
```
typedef enum tmq_res_t {
TMQ_RES_INVALID = -1, // invalid
TMQ_RES_DATA = 1, // data
TMQ_RES_TABLE_META = 2, // meta
TMQ_RES_METADATA = 3 // data & meta
} tmq_res_t;
```
**Description**
- tmq_get_connect : when creating a consumer, a link will be automatically established and saved in the tmq_t structure. This interface allows users to obtain link information(same like taos_connect) from the tmq_t structure
- tmq_get_table_name : get the table name of result
- tmq_get_res_type : get the type of result
- tmq_get_topic_name : get the topic name of result
- tmq_get_db_name : get the db name of result
**Parameter description**
- tmq : tmq_t structure created by tmq_consumer_new
- res : TAOS_RES structure returned by tmq_consumer_poll
**Return value**
- tmq_get_connect : connection info in tmq, NULL if failed
- tmq_get_table_name : table name of result, NULL if failed
- tmq_get_res_type : result type tmq_res_t
- tmq_get_topic_name : topic name of result, NULL if failed
- tmq_get_db_name : db name of result, NULL if failed

View File

@ -68,143 +68,143 @@ TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤)
### 同步查询示例 ### 同步查询示例
<details> <details>
<summary>同步查询</summary> <summary>同步查询</summary>
```c ```c
{{#include examples/c/demo.c}} {{#include examples/c/demo.c}}
``` ```
格式化输出不同类型字段函数 taos_print_row 格式化输出不同类型字段函数 taos_print_row
```c ```c
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) { int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
int32_t len = 0; int32_t len = 0;
for (int i = 0; i < num_fields; ++i) { for (int i = 0; i < num_fields; ++i) {
if (i > 0) { if (i > 0) {
str[len++] = ' '; str[len++] = ' ';
} }
if (row[i] == NULL) { if (row[i] == NULL) {
len += sprintf(str + len, "%s", TSDB_DATA_NULL_STR); len += sprintf(str + len, "%s", TSDB_DATA_NULL_STR);
continue; continue;
} }
switch (fields[i].type) { switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
len += sprintf(str + len, "%d", *((int8_t *)row[i])); len += sprintf(str + len, "%d", *((int8_t *)row[i]));
break; break;
case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_UTINYINT:
len += sprintf(str + len, "%u", *((uint8_t *)row[i])); len += sprintf(str + len, "%u", *((uint8_t *)row[i]));
break; break;
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
len += sprintf(str + len, "%d", *((int16_t *)row[i])); len += sprintf(str + len, "%d", *((int16_t *)row[i]));
break; break;
case TSDB_DATA_TYPE_USMALLINT: case TSDB_DATA_TYPE_USMALLINT:
len += sprintf(str + len, "%u", *((uint16_t *)row[i])); len += sprintf(str + len, "%u", *((uint16_t *)row[i]));
break; break;
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
len += sprintf(str + len, "%d", *((int32_t *)row[i])); len += sprintf(str + len, "%d", *((int32_t *)row[i]));
break; break;
case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UINT:
len += sprintf(str + len, "%u", *((uint32_t *)row[i])); len += sprintf(str + len, "%u", *((uint32_t *)row[i]));
break; break;
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i])); len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
break; break;
case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_UBIGINT:
len += sprintf(str + len, "%" PRIu64, *((uint64_t *)row[i])); len += sprintf(str + len, "%" PRIu64, *((uint64_t *)row[i]));
break; break;
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
float fv = 0; float fv = 0;
fv = GET_FLOAT_VAL(row[i]); fv = GET_FLOAT_VAL(row[i]);
len += sprintf(str + len, "%f", fv); len += sprintf(str + len, "%f", fv);
} break; } break;
case TSDB_DATA_TYPE_DOUBLE: { case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0; double dv = 0;
dv = GET_DOUBLE_VAL(row[i]); dv = GET_DOUBLE_VAL(row[i]);
len += sprintf(str + len, "%lf", dv); len += sprintf(str + len, "%lf", dv);
} break; } break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE); int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
if (fields[i].type == TSDB_DATA_TYPE_BINARY) { if (fields[i].type == TSDB_DATA_TYPE_BINARY) {
assert(charLen <= fields[i].bytes && charLen >= 0); assert(charLen <= fields[i].bytes && charLen >= 0);
} else { } else {
assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0); assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0);
}
memcpy(str + len, row[i], charLen);
len += charLen;
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_BOOL:
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
default:
break;
}
}
str[len] = 0;
return len;
} }
``` memcpy(str + len, row[i], charLen);
len += charLen;
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_BOOL:
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
default:
break;
}
}
str[len] = 0;
return len;
}
```
</details> </details>
### 异步查询示例 ### 异步查询示例
<details> <details>
<summary>异步查询</summary> <summary>异步查询</summary>
```c ```c
{{#include examples/c/asyncdemo.c}} {{#include examples/c/asyncdemo.c}}
``` ```
</details> </details>
### 参数绑定示例 ### 参数绑定示例
<details> <details>
<summary>参数绑定</summary> <summary>参数绑定</summary>
```c ```c
{{#include examples/c/prepare.c}} {{#include examples/c/prepare.c}}
``` ```
</details> </details>
### 无模式写入示例 ### 无模式写入示例
<details> <details>
<summary>无模式写入</summary> <summary>无模式写入</summary>
```c ```c
{{#include examples/c/schemaless.c}} {{#include examples/c/schemaless.c}}
``` ```
</details> </details>
### 订阅和消费示例 ### 订阅和消费示例
<details> <details>
<summary>订阅和消费</summary> <summary>订阅和消费</summary>
```c ```c
{{#include examples/c/tmq.c}} {{#include examples/c/tmq.c}}
``` ```
</details> </details>
@ -225,70 +225,70 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
- `int taos_init()` - `int taos_init()`
初始化运行环境。如果没有主动调用该 API那么调用 `taos_connect()` 时驱动将自动调用该 API故程序一般无需手动调用。 初始化运行环境。如果没有主动调用该 API那么调用 `taos_connect()` 时驱动将自动调用该 API故程序一般无需手动调用。
- `void taos_cleanup()` - `void taos_cleanup()`
清理运行环境,应用退出前应调用。 清理运行环境,应用退出前应调用。
- `int taos_options(TSDB_OPTION option, const void * arg, ...)` - `int taos_options(TSDB_OPTION option, const void * arg, ...)`
设置客户端选项,目前支持区域设置(`TSDB_OPTION_LOCALE`)、字符集设置(`TSDB_OPTION_CHARSET`)、时区设置(`TSDB_OPTION_TIMEZONE`)、配置文件路径设置(`TSDB_OPTION_CONFIGDIR`)。区域设置、字符集、时区默认为操作系统当前设置。 设置客户端选项,目前支持区域设置(`TSDB_OPTION_LOCALE`)、字符集设置(`TSDB_OPTION_CHARSET`)、时区设置(`TSDB_OPTION_TIMEZONE`)、配置文件路径设置(`TSDB_OPTION_CONFIGDIR`)。区域设置、字符集、时区默认为操作系统当前设置。
- `char *taos_get_client_info()` - `char *taos_get_client_info()`
获取客户端版本信息。 获取客户端版本信息。
- `TAOS *taos_connect(const char *host, const char *user, const char *pass, const char *db, int port)` - `TAOS *taos_connect(const char *host, const char *user, const char *pass, const char *db, int port)`
创建数据库连接,初始化连接上下文。其中需要用户提供的参数包含: 创建数据库连接,初始化连接上下文。其中需要用户提供的参数包含:
- hostTDengine 集群中任一节点的 FQDN - hostTDengine 集群中任一节点的 FQDN
- user用户名 - user用户名
- pass密码 - pass密码
- db 数据库名字,如果用户没有提供,也可以正常连接,用户可以通过该连接创建新的数据库,如果用户提供了数据库名字,则说明该数据库用户已经创建好,缺省使用该数据库 - db 数据库名字,如果用户没有提供,也可以正常连接,用户可以通过该连接创建新的数据库,如果用户提供了数据库名字,则说明该数据库用户已经创建好,缺省使用该数据库
- porttaosd 程序监听的端口 - porttaosd 程序监听的端口
返回值为空表示失败。应用程序需要保存返回的参数,以便后续使用。 返回值为空表示失败。应用程序需要保存返回的参数,以便后续使用。
:::info :::info
同一进程可以根据不同的 host/port 连接多个 TDengine 集群 同一进程可以根据不同的 host/port 连接多个 TDengine 集群
::: :::
- `TAOS *taos_connect_auth(const char *host, const char *user, const char *auth, const char *db, uint16_t port)` - `TAOS *taos_connect_auth(const char *host, const char *user, const char *auth, const char *db, uint16_t port)`
功能同 taos_connect。除 pass 参数替换为 auth 外,其他参数同 taos_connect。 功能同 taos_connect。除 pass 参数替换为 auth 外,其他参数同 taos_connect。
- auth: 原始密码取 32 位小写 md5 - auth: 原始密码取 32 位小写 md5
- `char *taos_get_server_info(TAOS *taos)` - `char *taos_get_server_info(TAOS *taos)`
获取服务端版本信息。 获取服务端版本信息。
- `int taos_select_db(TAOS *taos, const char *db)` - `int taos_select_db(TAOS *taos, const char *db)`
将当前的缺省数据库设置为 `db`。 将当前的缺省数据库设置为 `db`。
- `int taos_get_current_db(TAOS *taos, char *database, int len, int *required)` - `int taos_get_current_db(TAOS *taos, char *database, int len, int *required)`
- databaselen为用户在外面申请的空间内部会把当前db赋值到database里。 - databaselen为用户在外面申请的空间内部会把当前db赋值到database里。
- 只要是没有正常把db名赋值到database中包括截断返回错误返回值为-1然后用户可以通过 taos_errstr(NULL) 来获取错误提示。 - 只要是没有正常把db名赋值到database中包括截断返回错误返回值为-1然后用户可以通过 taos_errstr(NULL) 来获取错误提示。
- 如果database == NULL 或者 len<=0 返回错误required里保存存储db需要的空间包含最后的'\0' - 如果database == NULL 或者 len<=0 返回错误required里保存存储db需要的空间包含最后的'\0'
- 如果len 小于 存储db需要的空间包含最后的'\0'返回错误database里赋值截断的数据以'\0'结尾。 - 如果len 小于 存储db需要的空间包含最后的'\0'返回错误database里赋值截断的数据以'\0'结尾。
- 如果len 大于等于 存储db需要的空间包含最后的'\0'返回正常0database里赋值以'\0结尾的db名。 - 如果len 大于等于 存储db需要的空间包含最后的'\0'返回正常0database里赋值以'\0结尾的db名。
- `int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)` - `int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)`
设置事件回调函数。 设置事件回调函数。
- fp 事件回调函数指针。函数声明typedef void (*__taos_notify_fn_t)(void *param, void *ext, int type);其中, param 为用户自定义参数ext 为扩展参数(依赖事件类型,针对 TAOS_NOTIFY_PASSVER 返回用户密码版本)type 为事件类型 - fp 事件回调函数指针。函数声明typedef void (*__taos_notify_fn_t)(void *param, void *ext, int type);其中, param 为用户自定义参数ext 为扩展参数(依赖事件类型,针对 TAOS_NOTIFY_PASSVER 返回用户密码版本)type 为事件类型
- param 用户自定义参数 - param 用户自定义参数
- type 事件类型。取值范围1TAOS_NOTIFY_PASSVER 用户密码改变 - type 事件类型。取值范围1TAOS_NOTIFY_PASSVER 用户密码改变
- `void taos_close(TAOS *taos)` - `void taos_close(TAOS *taos)`
关闭连接,其中`taos`是 `taos_connect()` 返回的句柄。 关闭连接,其中`taos`是 `taos_connect()` 返回的句柄。
### 同步查询 API ### 同步查询 API
@ -296,35 +296,35 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
- `TAOS_RES* taos_query(TAOS *taos, const char *sql)` - `TAOS_RES* taos_query(TAOS *taos, const char *sql)`
执行 SQL 语句,可以是 DQL、DML 或 DDL 语句。 其中的 `taos` 参数是通过 `taos_connect()` 获得的句柄。不能通过返回值是否是 `NULL` 来判断执行结果是否失败,而是需要用 `taos_errno()` 函数解析结果集中的错误代码来进行判断。 执行 SQL 语句,可以是 DQL、DML 或 DDL 语句。 其中的 `taos` 参数是通过 `taos_connect()` 获得的句柄。不能通过返回值是否是 `NULL` 来判断执行结果是否失败,而是需要用 `taos_errno()` 函数解析结果集中的错误代码来进行判断。
- `int taos_result_precision(TAOS_RES *res)` - `int taos_result_precision(TAOS_RES *res)`
返回结果集时间戳字段的精度,`0` 代表毫秒,`1` 代表微秒,`2` 代表纳秒。 返回结果集时间戳字段的精度,`0` 代表毫秒,`1` 代表微秒,`2` 代表纳秒。
- `TAOS_ROW taos_fetch_row(TAOS_RES *res)` - `TAOS_ROW taos_fetch_row(TAOS_RES *res)`
按行获取查询结果集中的数据。 按行获取查询结果集中的数据。
- `int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows)` - `int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows)`
批量获取查询结果集中的数据,返回值为获取到的数据的行数。 批量获取查询结果集中的数据,返回值为获取到的数据的行数。
- `int taos_num_fields(TAOS_RES *res)` 和 `int taos_field_count(TAOS_RES *res)` - `int taos_num_fields(TAOS_RES *res)` 和 `int taos_field_count(TAOS_RES *res)`
这两个 API 等价,用于获取查询结果集中的列数。 这两个 API 等价,用于获取查询结果集中的列数。
- `int* taos_fetch_lengths(TAOS_RES *res)` - `int* taos_fetch_lengths(TAOS_RES *res)`
获取结果集中每个字段的长度。返回值是一个数组,其长度为结果集的列数。 获取结果集中每个字段的长度。返回值是一个数组,其长度为结果集的列数。
- `int taos_affected_rows(TAOS_RES *res)` - `int taos_affected_rows(TAOS_RES *res)`
获取被所执行的 SQL 语句影响的行数。 获取被所执行的 SQL 语句影响的行数。
- `TAOS_FIELD *taos_fetch_fields(TAOS_RES *res)` - `TAOS_FIELD *taos_fetch_fields(TAOS_RES *res)`
获取查询结果集每列数据的属性(列的名称、列的数据类型、列的长度),与 `taos_num_fields()` 配合使用,可用来解析 `taos_fetch_row()` 返回的一个元组(一行)的数据。 `TAOS_FIELD` 的结构如下: 获取查询结果集每列数据的属性(列的名称、列的数据类型、列的长度),与 `taos_num_fields()` 配合使用,可用来解析 `taos_fetch_row()` 返回的一个元组(一行)的数据。 `TAOS_FIELD` 的结构如下:
```c ```c
typedef struct taosField { typedef struct taosField {
@ -336,19 +336,19 @@ typedef struct taosField {
- `void taos_stop_query(TAOS_RES *res)` - `void taos_stop_query(TAOS_RES *res)`
停止当前查询的执行。 停止当前查询的执行。
- `void taos_free_result(TAOS_RES *res)` - `void taos_free_result(TAOS_RES *res)`
释放查询结果集以及相关的资源。查询完成后,务必调用该 API 释放资源,否则可能导致应用内存泄露。但也需注意,释放资源后,如果再调用 `taos_consume()` 等获取查询结果的函数,将导致应用崩溃。 释放查询结果集以及相关的资源。查询完成后,务必调用该 API 释放资源,否则可能导致应用内存泄露。但也需注意,释放资源后,如果再调用 `taos_consume()` 等获取查询结果的函数,将导致应用崩溃。
- `char *taos_errstr(TAOS_RES *res)` - `char *taos_errstr(TAOS_RES *res)`
获取最近一次 API 调用失败的原因,返回值为字符串标识的错误提示信息。 获取最近一次 API 调用失败的原因,返回值为字符串标识的错误提示信息。
- `int taos_errno(TAOS_RES *res)` - `int taos_errno(TAOS_RES *res)`
获取最近一次 API 调用失败的原因,返回值为错误代码。 获取最近一次 API 调用失败的原因,返回值为错误代码。
:::note :::note
2.0 及以上版本 TDengine 推荐数据库应用的每个线程都建立一个独立的连接,或基于线程建立连接池。而不推荐在应用中将该连接 (TAOS\*) 结构体传递到不同的线程共享使用。基于 TAOS 结构体发出的查询、写入等操作具有多线程安全性,但 “USE statement” 等状态量有可能在线程之间相互干扰。此外C 语言的连接器可以按照需求动态建立面向数据库的新连接(该过程对用户不可见),同时建议只有在程序最后退出的时候才调用 `taos_close()` 关闭连接。 2.0 及以上版本 TDengine 推荐数据库应用的每个线程都建立一个独立的连接,或基于线程建立连接池。而不推荐在应用中将该连接 (TAOS\*) 结构体传递到不同的线程共享使用。基于 TAOS 结构体发出的查询、写入等操作具有多线程安全性,但 “USE statement” 等状态量有可能在线程之间相互干扰。此外C 语言的连接器可以按照需求动态建立面向数据库的新连接(该过程对用户不可见),同时建议只有在程序最后退出的时候才调用 `taos_close()` 关闭连接。
@ -366,19 +366,19 @@ TDengine 还提供性能更高的异步 API 处理数据插入、查询操作。
- `void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, int code), void *param);` - `void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, int code), void *param);`
异步执行 SQL 语句。 异步执行 SQL 语句。
- taos调用 `taos_connect()` 返回的数据库连接 - taos调用 `taos_connect()` 返回的数据库连接
- sql需要执行的 SQL 语句 - sql需要执行的 SQL 语句
- fp用户定义的回调函数其第三个参数 `code` 用于指示操作是否成功,`0` 表示成功,负数表示失败(调用 `taos_errstr()` 可获取失败原因)。应用在定义回调函数的时候,主要处理第二个参数 `TAOS_RES *`,该参数是查询返回的结果集 - fp用户定义的回调函数其第三个参数 `code` 用于指示操作是否成功,`0` 表示成功,负数表示失败(调用 `taos_errstr()` 可获取失败原因)。应用在定义回调函数的时候,主要处理第二个参数 `TAOS_RES *`,该参数是查询返回的结果集
- param应用提供一个用于回调的参数 - param应用提供一个用于回调的参数
- `void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);` - `void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);`
批量获取异步查询的结果集,只能与 `taos_query_a()` 配合使用。其中: 批量获取异步查询的结果集,只能与 `taos_query_a()` 配合使用。其中:
- res`taos_query_a()` 回调时返回的结果集 - res`taos_query_a()` 回调时返回的结果集
- fp回调函数。其参数 `param` 是用户可定义的传递给回调函数的参数结构体;`numOfRows` 是获取到的数据的行数(不是整个查询结果集的函数)。 在回调函数中,应用可以通过调用 `taos_fetch_row()` 前向迭代获取批量记录中每一行记录。读完一块内的所有记录后,应用需要在回调函数中继续调用 `taos_fetch_rows_a()` 获取下一批记录进行处理,直到返回的记录数 `numOfRows` 为零(结果返回完成)或记录数为负值(查询出错)。 - fp回调函数。其参数 `param` 是用户可定义的传递给回调函数的参数结构体;`numOfRows` 是获取到的数据的行数(不是整个查询结果集的函数)。 在回调函数中,应用可以通过调用 `taos_fetch_row()` 前向迭代获取批量记录中每一行记录。读完一块内的所有记录后,应用需要在回调函数中继续调用 `taos_fetch_rows_a()` 获取下一批记录进行处理,直到返回的记录数 `numOfRows` 为零(结果返回完成)或记录数为负值(查询出错)。
TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多线程同时打开多张表,并可以同时对每张打开的表进行查询或者插入操作。需要指出的是,**客户端应用必须确保对同一张表的操作完全串行化**,即对同一个表的插入或查询操作未完成时(未返回时),不能够执行第二个插入或查询操作。 TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多线程同时打开多张表,并可以同时对每张打开的表进行查询或者插入操作。需要指出的是,**客户端应用必须确保对同一张表的操作完全串行化**,即对同一个表的插入或查询操作未完成时(未返回时),不能够执行第二个插入或查询操作。
@ -404,71 +404,71 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- `TAOS_STMT* taos_stmt_init(TAOS *taos)` - `TAOS_STMT* taos_stmt_init(TAOS *taos)`
创建一个 TAOS_STMT 对象用于后续调用。 创建一个 TAOS_STMT 对象用于后续调用。
- `int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length)` - `int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length)`
解析一条 SQL 语句,将解析结果和参数信息绑定到 stmt 上,如果参数 length 大于 0将使用此参数作为 SQL 语句的长度,如等于 0将自动判断 SQL 语句的长度。 解析一条 SQL 语句,将解析结果和参数信息绑定到 stmt 上,如果参数 length 大于 0将使用此参数作为 SQL 语句的长度,如等于 0将自动判断 SQL 语句的长度。
- `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind)` - `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind)`
不如 `taos_stmt_bind_param_batch()` 效率高,但可以支持非 INSERT 类型的 SQL 语句。 不如 `taos_stmt_bind_param_batch()` 效率高,但可以支持非 INSERT 类型的 SQL 语句。
进行参数绑定bind 指向一个数组(代表所要绑定的一行数据),需保证此数组中的元素数量和顺序与 SQL 语句中的参数完全一致。TAOS_MULTI_BIND 的使用方法与 MySQL 中的 MYSQL_BIND 类似,具体定义如下: 进行参数绑定bind 指向一个数组(代表所要绑定的一行数据),需保证此数组中的元素数量和顺序与 SQL 语句中的参数完全一致。TAOS_MULTI_BIND 的使用方法与 MySQL 中的 MYSQL_BIND 类似,具体定义如下:
```c ```c
typedef struct TAOS_MULTI_BIND { typedef struct TAOS_MULTI_BIND {
int buffer_type; int buffer_type;
void *buffer; void *buffer;
uintptr_t buffer_length; uintptr_t buffer_length;
uint32_t *length; uint32_t *length;
char *is_null; char *is_null;
int num; // the number of columns int num; // the number of columns
} TAOS_MULTI_BIND; } TAOS_MULTI_BIND;
``` ```
- `int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name)` - `int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name)`
2.1.1.0 版本新增,仅支持用于替换 INSERT 语句中的参数值) 2.1.1.0 版本新增,仅支持用于替换 INSERT 语句中的参数值)
当 SQL 语句中的表名使用了 `?` 占位时,可以使用此函数绑定一个具体的表名。 当 SQL 语句中的表名使用了 `?` 占位时,可以使用此函数绑定一个具体的表名。
- `int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_MULTI_BIND* tags)` - `int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_MULTI_BIND* tags)`
2.1.2.0 版本新增,仅支持用于替换 INSERT 语句中的参数值) 2.1.2.0 版本新增,仅支持用于替换 INSERT 语句中的参数值)
当 SQL 语句中的表名和 TAGS 都使用了 `?` 占位时,可以使用此函数绑定具体的表名和具体的 TAGS 取值。最典型的使用场景是使用了自动建表功能的 INSERT 语句(目前版本不支持指定具体的 TAGS 列。TAGS 参数中的列数量需要与 SQL 语句中要求的 TAGS 数量完全一致。 当 SQL 语句中的表名和 TAGS 都使用了 `?` 占位时,可以使用此函数绑定具体的表名和具体的 TAGS 取值。最典型的使用场景是使用了自动建表功能的 INSERT 语句(目前版本不支持指定具体的 TAGS 列。TAGS 参数中的列数量需要与 SQL 语句中要求的 TAGS 数量完全一致。
- `int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind)` - `int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind)`
2.1.1.0 版本新增,仅支持用于替换 INSERT 语句中的参数值) 2.1.1.0 版本新增,仅支持用于替换 INSERT 语句中的参数值)
以多列的方式传递待绑定的数据,需要保证这里传递的数据列的顺序、列的数量与 SQL 语句中的 VALUES 参数完全一致。TAOS_MULTI_BIND 的具体定义如下: 以多列的方式传递待绑定的数据,需要保证这里传递的数据列的顺序、列的数量与 SQL 语句中的 VALUES 参数完全一致。TAOS_MULTI_BIND 的具体定义如下:
- `int taos_stmt_add_batch(TAOS_STMT *stmt)` - `int taos_stmt_add_batch(TAOS_STMT *stmt)`
将当前绑定的参数加入批处理中,调用此函数后,可以再次调用 `taos_stmt_bind_param()` 或 `taos_stmt_bind_param_batch()` 绑定新的参数。需要注意,此函数仅支持 INSERT/IMPORT 语句,如果是 SELECT 等其他 SQL 语句,将返回错误。 将当前绑定的参数加入批处理中,调用此函数后,可以再次调用 `taos_stmt_bind_param()` 或 `taos_stmt_bind_param_batch()` 绑定新的参数。需要注意,此函数仅支持 INSERT/IMPORT 语句,如果是 SELECT 等其他 SQL 语句,将返回错误。
- `int taos_stmt_execute(TAOS_STMT *stmt)` - `int taos_stmt_execute(TAOS_STMT *stmt)`
执行准备好的语句。目前,一条语句只能执行一次。 执行准备好的语句。目前,一条语句只能执行一次。
- `int taos_stmt_affected_rows(TAOS_STMT *stmt)` - `int taos_stmt_affected_rows(TAOS_STMT *stmt)`
获取执行多次绑定语句影响的行数。 获取执行多次绑定语句影响的行数。
- `int taos_stmt_affected_rows_once(TAOS_STMT *stmt)` - `int taos_stmt_affected_rows_once(TAOS_STMT *stmt)`
获取执行一次绑定语句影响的行数。 获取执行一次绑定语句影响的行数。
- `TAOS_RES* taos_stmt_use_result(TAOS_STMT *stmt)` - `TAOS_RES* taos_stmt_use_result(TAOS_STMT *stmt)`
获取语句的结果集。结果集的使用方式与非参数化调用时一致,使用完成后,应对此结果集调用 `taos_free_result()` 以释放资源。 获取语句的结果集。结果集的使用方式与非参数化调用时一致,使用完成后,应对此结果集调用 `taos_free_result()` 以释放资源。
- `int taos_stmt_close(TAOS_STMT *stmt)` - `int taos_stmt_close(TAOS_STMT *stmt)`
执行完毕,释放所有资源。 执行完毕,释放所有资源。
- `char * taos_stmt_errstr(TAOS_STMT *stmt)` - `char * taos_stmt_errstr(TAOS_STMT *stmt)`
2.1.3.0 版本新增) 2.1.3.0 版本新增)
用于在其他 STMT API 返回错误(返回错误码或空指针)时获取错误信息。 用于在其他 STMT API 返回错误(返回错误码或空指针)时获取错误信息。
### 无模式schemaless写入 API ### 无模式schemaless写入 API
@ -476,43 +476,43 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- `TAOS_RES* taos_schemaless_insert(TAOS* taos, const char* lines[], int numLines, int protocol, int precision)` - `TAOS_RES* taos_schemaless_insert(TAOS* taos, const char* lines[], int numLines, int protocol, int precision)`
**功能说明** **功能说明**
- 该接口将行协议的文本数据写入到 TDengine 中。 - 该接口将行协议的文本数据写入到 TDengine 中。
**参数说明** **参数说明**
- taos: 数据库连接,通过 `taos_connect()` 函数建立的数据库连接。 - taos: 数据库连接,通过 `taos_connect()` 函数建立的数据库连接。
- lines文本数据。满足解析格式要求的无模式文本字符串。 - lines文本数据。满足解析格式要求的无模式文本字符串。
- numLines:文本数据的行数,不能为 0 。 - numLines:文本数据的行数,不能为 0 。
- protocol: 行协议类型,用于标识文本数据格式。 - protocol: 行协议类型,用于标识文本数据格式。
- precision文本数据中的时间戳精度字符串。 - precision文本数据中的时间戳精度字符串。
**返回值** **返回值**
- TAOS_RES 结构体,应用可以通过使用 `taos_errstr()` 获得错误信息,也可以使用 `taos_errno()` 获得错误码。 - TAOS_RES 结构体,应用可以通过使用 `taos_errstr()` 获得错误信息,也可以使用 `taos_errno()` 获得错误码。
在某些情况下,返回的 TAOS_RES 为 `NULL`,此时仍然可以调用 `taos_errno()` 来安全地获得错误码信息。 在某些情况下,返回的 TAOS_RES 为 `NULL`,此时仍然可以调用 `taos_errno()` 来安全地获得错误码信息。
返回的 TAOS_RES 需要调用方来负责释放,否则会出现内存泄漏。 返回的 TAOS_RES 需要调用方来负责释放,否则会出现内存泄漏。
**说明** **说明**
协议类型是枚举类型,包含以下三种格式:
- TSDB_SML_LINE_PROTOCOLInfluxDB 行协议Line Protocol) 协议类型是枚举类型,包含以下三种格式:
- TSDB_SML_TELNET_PROTOCOL: OpenTSDB Telnet 文本行协议
- TSDB_SML_JSON_PROTOCOL: OpenTSDB Json 协议格式
时间戳分辨率的定义,定义在 `taos.h` 文件中,具体内容如下: - TSDB_SML_LINE_PROTOCOLInfluxDB 行协议Line Protocol)
- TSDB_SML_TELNET_PROTOCOL: OpenTSDB Telnet 文本行协议
- TSDB_SML_JSON_PROTOCOL: OpenTSDB Json 协议格式
- TSDB_SML_TIMESTAMP_NOT_CONFIGURED = 0, 时间戳分辨率的定义,定义在 `taos.h` 文件中,具体内容如下:
- TSDB_SML_TIMESTAMP_HOURS,
- TSDB_SML_TIMESTAMP_MINUTES,
- TSDB_SML_TIMESTAMP_SECONDS,
- TSDB_SML_TIMESTAMP_MILLI_SECONDS,
- TSDB_SML_TIMESTAMP_MICRO_SECONDS,
- TSDB_SML_TIMESTAMP_NANO_SECONDS
需要注意的是,时间戳分辨率参数只在协议类型为 `SML_LINE_PROTOCOL` 的时候生效。 - TSDB_SML_TIMESTAMP_NOT_CONFIGURED = 0,
对于 OpenTSDB 的文本协议,时间戳的解析遵循其官方解析规则 — 按照时间戳包含的字符的数量来确认时间精度。 - TSDB_SML_TIMESTAMP_HOURS,
- TSDB_SML_TIMESTAMP_MINUTES,
- TSDB_SML_TIMESTAMP_SECONDS,
- TSDB_SML_TIMESTAMP_MILLI_SECONDS,
- TSDB_SML_TIMESTAMP_MICRO_SECONDS,
- TSDB_SML_TIMESTAMP_NANO_SECONDS
**schemaless 其他相关的接口** 需要注意的是,时间戳分辨率参数只在协议类型为 `SML_LINE_PROTOCOL` 的时候生效。
对于 OpenTSDB 的文本协议,时间戳的解析遵循其官方解析规则 — 按照时间戳包含的字符的数量来确认时间精度。
**schemaless 其他相关的接口**
- `TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int64_t reqid)` - `TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int64_t reqid)`
- `TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision)` - `TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision)`
- `TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int64_t reqid)` - `TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int64_t reqid)`
@ -521,17 +521,97 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- `TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl)` - `TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl)`
- `TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl, int64_t reqid)` - `TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl, int64_t reqid)`
**说明** **说明**
- 上面这7个接口是扩展接口主要用于在schemaless写入时传递ttl、reqid参数可以根据需要使用。 - 上面这7个接口是扩展接口主要用于在schemaless写入时传递ttl、reqid参数可以根据需要使用。
- 带_raw的接口通过传递的参数lines指针和长度len来表示数据为了解决原始接口数据包含'\0'而被截断的问题。totalRows指针返回解析出来的数据行数。 - 带_raw的接口通过传递的参数lines指针和长度len来表示数据为了解决原始接口数据包含'\0'而被截断的问题。totalRows指针返回解析出来的数据行数。
- 带_ttl的接口可以传递ttl参数来控制建表的ttl到期时间。 - 带_ttl的接口可以传递ttl参数来控制建表的ttl到期时间。
- 带_reqid的接口可以通过传递reqid参数来追踪整个的调用链。 - 带_reqid的接口可以通过传递reqid参数来追踪整个的调用链。
### 数据订阅 API ### 数据订阅 API
- `const char *tmq_err2str(int32_t code)`
**功能说明**
- 该接口用于将数据订阅的错误码转换为错误信息
**参数说明**
- code: 数据订阅的错误码
**返回值**
- 非NULL返回错误信息错误信息可能为空字符串
- `tmq_conf_t *tmq_conf_new()`
- `tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value)`
- `void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param)`
- `void tmq_conf_destroy(tmq_conf_t *conf)`
tmq_conf_res_t 错误码定义如下:
```
typedef enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
} tmq_conf_res_t;
```
设置自动提交回调函数的定义如下:
```
typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param))
```
**功能说明**
- tmq_conf_new 接口用于创建一个 tmq_conf_t 结构体,用于配置消费参数。
- tmq_conf_set 接口用于设置消费参数key 为参数名value 为参数值。
- tmq_conf_set_auto_commit_cb 接口用于设置自动提交回调函数,参数为回调函数和回调函数的参数。
- tmq_conf_destroy 接口用于销毁 tmq_conf_t 结构体。
- `tmq_list_t *tmq_list_new()`
- `int32_t tmq_list_append(tmq_list_t *, const char *)`
- `void tmq_list_destroy(tmq_list_t *)`
- `int32_t tmq_list_get_size(const tmq_list_t *)`
- `char **tmq_list_to_c_array(const tmq_list_t *)`
**功能说明**
- tmq_list_new 接口用于创建一个 tmq_list_t 结构体,用于存储订阅的 topic。
- tmq_list_append 接口用于向 tmq_list_t 结构体中添加一个 topic。
- tmq_list_destroy 接口用于销毁 tmq_list_t 结构体tmq_list_new 的结果需要通过该接口销毁。
- tmq_list_get_size 接口用于获取 tmq_list_t 结构体中 topic 的个数。
- tmq_list_to_c_array 接口用于将 tmq_list_t 结构体转换为 C 数组,数组每个元素为字符串指针。
- `tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen)`
- `int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list)`
- `int32_t tmq_unsubscribe(tmq_t *tmq)`
- `int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topic_list)`
- `TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout)`
- `int32_t tmq_consumer_close(tmq_t *tmq)`
**功能说明**
- tmq_consumer_new 接口用于创建一个 tmq_t 结构体,用于消费数据,消费完数据后需调用 tmq_consumer_close 关闭消费者。
- tmq_subscribe 接口用于订阅 topic 列表,消费完数据后,需调用 tmq_subscribe 取消订阅。
- tmq_unsubscribe 接口用于取消订阅的 topic 列表。需与 tmq_subscribe 配合使用。
- tmq_subscription 接口用于获取订阅的 topic 列表。
- tmq_consumer_poll 接口用于轮询消费数据,每一个消费者,只能单线程调用该接口。
- tmq_consumer_close 接口用于关闭 tmq_t 结构体。需与 tmq_consumer_new 配合使用。
**参数说明**
- conf: 参数用于配置消费参数
- errstr: 错误信息存储在这个字符串中,需自定分配内存,释放内存由调用者负责
- errstenLen: errstr 字符串的长度
- tmq: tmq_consumer_new 函数返回的 tmq_t 结构体
- topic_list: topic 列表
- timeout: 超时时间,单位为毫秒,表示多久没数据的话自动返回 NULL负数的话默认超时1秒
**返回值**
- tmq_consumer_new 返回 tmq_t 结构体,失败返回 NULL
- tmq_subscribe 返回错误码0 表示成功非0表示失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- tmq_unsubscribe 返回错误码0 表示成功非0表示失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- tmq_subscription 返回错误码0 表示成功非0表示失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- tmq_consumer_poll 返回 TAOS_RES 结构体NULL 表示没有数据,非 NULL 表示有数据TAOS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 TAOS_RES 里的信息,比如 schema 等。
- tmq_consumer_close 返回错误码0 表示成功非0表示失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- `int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment)` - `int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment)`
- `void tmq_free_assignment(tmq_topic_assignment* pAssignment)` - `void tmq_free_assignment(tmq_topic_assignment* pAssignment)`
tmq_topic_assignment结构体定义如下 tmq_topic_assignment结构体定义如下
```c ```c
typedef struct tmq_topic_assignment { typedef struct tmq_topic_assignment {
@ -541,6 +621,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
int64_t end; int64_t end;
} tmq_topic_assignment; } tmq_topic_assignment;
``` ```
**功能说明** **功能说明**
- tmq_get_topic_assignment 接口返回当前consumer分配的vgroup的信息每个vgroup的信息包括vgIdwal的最大最小offset以及当前消费到的offset。 - tmq_get_topic_assignment 接口返回当前consumer分配的vgroup的信息每个vgroup的信息包括vgIdwal的最大最小offset以及当前消费到的offset。
@ -551,65 +632,97 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
**返回值** **返回值**
- 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。 - 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)` - `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**功能说明** **功能说明**
- 获取当前 consumer 在某个 topic 和 vgroup上的 commit 位置。 - 获取当前 consumer 在某个 topic 和 vgroup上的 commit 位置。
**返回值** **返回值**
- 当前commit的位置-2147467247表示没有消费进度其他小于0的值表示失败错误码就是返回值 - 当前commit的位置-2147467247表示没有消费进度其他小于0的值表示失败错误码就是返回值
- `int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg)` - `int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg)`
- `void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param)` - `void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param)`
- `int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)` - `int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
- `void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param)` - `void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param)`
**功能说明** **功能说明**
- commit接口分为两种类型每种类型有同步和异步接口 - commit接口分为两种类型每种类型有同步和异步接口
- 第一种类型根据消息提交提交消息里的进度如果消息传NULL提交当前consumer所有消费的vgroup的当前进度 : tmq_commit_sync/tmq_commit_async - 第一种类型根据消息提交提交消息里的进度如果消息传NULL提交当前consumer所有消费的vgroup的当前进度 : tmq_commit_sync/tmq_commit_async
- 第二种类型根据某个topic的某个vgroup的offset提交 : tmq_commit_offset_sync/tmq_commit_offset_async - 第二种类型根据某个topic的某个vgroup的offset提交 : tmq_commit_offset_sync/tmq_commit_offset_async
**参数说明** **参数说明**
- msg消费到的消息结构如果msg传NULL提交当前consumer所有消费的vgroup的当前进度 - msg消费到的消息结构如果msg传NULL提交当前consumer所有消费的vgroup的当前进度
**返回值** **返回值**
- 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息 - 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId)` - `int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**功能说明** **功能说明**
- 获取当前消费位置,为消费到的数据位置的下一个位置 - 获取当前消费位置,为消费到的数据位置的下一个位置
**返回值** **返回值**
- 消费位置非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息 - 消费位置非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
- `int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
**功能说明** **功能说明**
- 设置 consumer 在某个topic的某个vgroup的 offset位置开始消费 - 设置 consumer 在某个topic的某个vgroup的 offset位置开始消费
**返回值** **返回值**
- 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息 - 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int32_t int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
- `int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
- `int32_t tmq_get_vgroup_id(TAOS_RES *res)`
**功能说明** **功能说明**
- 获取 poll 消费到的数据的起始offset - tmq_get_vgroup_offset 获取 poll 消费到的数据的起始offset
- tmq_get_vgroup_id 获取 poll 消费到的数据的所属的vgrou id
**参数说明** **参数说明**
- msg消费到的消息结构 - msg消费到的消息结构
**返回值** **返回值**
- 消费到的offset非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息 - tmq_get_vgroup_offset 返回值为消费到的offset非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- tmq_get_vgroup_id 返回值为消费到的数据所属的vgrou id非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int32_t int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics)`
- `TAOS *tmq_get_connect(tmq_t *tmq)`
- `const char *tmq_get_table_name(TAOS_RES *res)`
- `tmq_res_t tmq_get_res_type(TAOS_RES *res)`
- `const char *tmq_get_topic_name(TAOS_RES *res)`
- `const char *tmq_get_db_name(TAOS_RES *res)`
tmq_res_t 表示消费到的数据类型,定义如下:
```
typedef enum tmq_res_t {
TMQ_RES_INVALID = -1, // invalid
TMQ_RES_DATA = 1, // 数据
TMQ_RES_TABLE_META = 2, // 元数据
TMQ_RES_METADATA = 3 // 既有元数据又有数据,即自动建表
} tmq_res_t;
```
**功能说明** **功能说明**
- 获取消费者订阅的 topic 列表 - tmq_get_connect 创建consumer时会自动建立链接保存在 tmq_t 结构体中,该接口用户获取 tmq_t 结构体中的链接信息类似taos_connect
- tmq_get_table_name 获取返回结果所属的的表名
- tmq_get_res_type 获取返回结果的类型
- tmq_get_topic_name 获取返回结果所属的topic名
- tmq_get_db_name 获取返回结果所属的数据库名
**参数说明** **参数说明**
- topics: 获取的 topic 列表存储在这个结构中接口内分配内存需调用tmq_list_destroy释放 - tmqtmq_consumer_new 返回的消费者handle
- restmq_consumer_poll 返回的消费到的消息
**返回值** **返回值**
- 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息 - tmq_get_connect 返回值为tmq_t结构体中的链接连接非 NULL 正常NULL 失败
- tmq_get_table_name 返回值为消费到的数据所属的表名,非 NULL 正常NULL 失败
- tmq_get_res_type 返回值为消费到的数据所属的类型,具体见上面 tmq_res_t 的注释说明
- tmq_get_topic_name 返回值为消费到的数据所属的 topic 名,非 NULL 正常NULL 失败
- tmq_get_db_name 返回值为消费到的数据所属的数据库名,非 NULL 正常NULL 失败

View File

@ -2342,7 +2342,7 @@ const char* tmq_get_db_name(TAOS_RES* res) {
int32_t tmq_get_vgroup_id(TAOS_RES* res) { int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (res == NULL) { if (res == NULL) {
return -1; return TSDB_CODE_INVALID_PARA;
} }
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) { if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
return ((SMqRspObjCommon*)res)->vgId; return ((SMqRspObjCommon*)res)->vgId;
@ -2350,7 +2350,7 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return pMetaRspObj->vgId; return pMetaRspObj->vgId;
} else { } else {
return -1; return TSDB_CODE_INVALID_PARA;
} }
} }