diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 9a6a5329ae..9bbda8309f 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 3.0 + GIT_TAG main SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/docs/en/14-reference/03-connector/07-python.mdx b/docs/en/14-reference/03-connector/07-python.mdx index 43713117f9..2a6cd9ecf7 100644 --- a/docs/en/14-reference/03-connector/07-python.mdx +++ b/docs/en/14-reference/03-connector/07-python.mdx @@ -20,6 +20,11 @@ The source code for the Python connector is hosted on [GitHub](https://github.co - The [supported platforms](/reference/connector/#supported-platforms) for the native connection are the same as the ones supported by the TDengine client. - REST connections are supported on all platforms that can run Python. +### Supported features + +- Native connections support all the core features of TDengine, including connection management, SQL execution, bind interface, subscriptions, and schemaless writing. +- REST connections support features such as connection management and SQL execution. (SQL execution allows you to: manage databases, tables, and supertables, write data, query data, create continuous queries, etc.). + ## Version selection We recommend using the latest version of `taospy`, regardless of the version of TDengine. @@ -64,10 +69,23 @@ All exceptions from the Python Connector are thrown directly. Applications shoul {{#include docs/examples/python/handle_exception.py}} ``` -## Supported features +## TDengine DataType vs. Python DataType -- Native connections support all the core features of TDengine, including connection management, SQL execution, bind interface, subscriptions, and schemaless writing. -- REST connections support features such as connection management and SQL execution. (SQL execution allows you to: manage databases, tables, and supertables, write data, query data, create continuous queries, etc.). +TDengine currently supports timestamp, number, character, Boolean type, and the corresponding type conversion with Python is as follows: + +|TDengine DataType|Python DataType| +|:---------------:|:-------------:| +|TIMESTAMP|datetime| +|INT|int| +|BIGINT|int| +|FLOAT|float| +|DOUBLE|int| +|SMALLINT|int| +|TINYINT|int| +|BOOL|bool| +|BINARY|str| +|NCHAR|str| +|JSON|str| ## Installation @@ -544,7 +562,7 @@ Connector support data subscription. For more information about subscroption, pl The `consumer` in the connector contains the subscription api. -#### Create Consumer +##### Create Consumer The syntax for creating a consumer is `consumer = Consumer(configs)`. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/). @@ -554,7 +572,7 @@ from taos.tmq import Consumer consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) ``` -#### Subscribe topics +##### Subscribe topics The `subscribe` function is used to subscribe to a list of topics. @@ -562,7 +580,7 @@ The `subscribe` function is used to subscribe to a list of topics. consumer.subscribe(['topic1', 'topic2']) ``` -#### Consume +##### Consume The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data. @@ -580,7 +598,7 @@ while True: print(block.fetchall()) ``` -#### assignment +##### assignment The `assignment` function is used to get the assignment of the topic. @@ -588,7 +606,7 @@ The `assignment` function is used to get the assignment of the topic. assignments = consumer.assignment() ``` -#### Seek +##### Seek The `seek` function is used to reset the assignment of the topic. @@ -597,7 +615,7 @@ tp = TopicPartition(topic='topic1', partition=0, offset=0) consumer.seek(tp) ``` -#### After consuming data +##### After consuming data You should unsubscribe to the topics and close the consumer after consuming. @@ -606,13 +624,13 @@ consumer.unsubscribe() consumer.close() ``` -#### Tmq subscription example +##### Tmq subscription example ```python {{#include docs/examples/python/tmq_example.py}} ``` -#### assignment and seek example +##### assignment and seek example ```python {{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} @@ -624,7 +642,7 @@ consumer.close() In addition to native connections, the connector also supports subscriptions via websockets. -#### Create Consumer +##### Create Consumer The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer). @@ -634,7 +652,7 @@ import taosws consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) ``` -#### subscribe topics +##### subscribe topics The `subscribe` function is used to subscribe to a list of topics. @@ -642,7 +660,7 @@ The `subscribe` function is used to subscribe to a list of topics. consumer.subscribe(['topic1', 'topic2']) ``` -#### Consume +##### Consume The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data. @@ -659,7 +677,7 @@ while True: print(row) ``` -#### assignment +##### assignment The `assignment` function is used to get the assignment of the topic. @@ -667,7 +685,7 @@ The `assignment` function is used to get the assignment of the topic. assignments = consumer.assignment() ``` -#### Seek +##### Seek The `seek` function is used to reset the assignment of the topic. @@ -675,7 +693,7 @@ The `seek` function is used to reset the assignment of the topic. consumer.seek(topic='topic1', partition=0, offset=0) ``` -#### After consuming data +##### After consuming data You should unsubscribe to the topics and close the consumer after consuming. @@ -684,13 +702,13 @@ consumer.unsubscribe() consumer.close() ``` -#### Subscription example +##### Subscription example ```python {{#include docs/examples/python/tmq_websocket_example.py}} ``` -#### Assignment and seek example +##### Assignment and seek example ```python {{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} @@ -706,19 +724,19 @@ Connector support schemaless insert. -Simple insert +##### Simple insert ```python {{#include docs/examples/python/schemaless_insert.py}} ``` -Insert with ttl argument +##### Insert with ttl argument ```python {{#include docs/examples/python/schemaless_insert_ttl.py}} ``` -Insert with req_id argument +##### Insert with req_id argument ```python {{#include docs/examples/python/schemaless_insert_req_id.py}} @@ -728,19 +746,19 @@ Insert with req_id argument -Simple insert +##### Simple insert ```python {{#include docs/examples/python/schemaless_insert_raw.py}} ``` -Insert with ttl argument +##### Insert with ttl argument ```python {{#include docs/examples/python/schemaless_insert_raw_ttl.py}} ``` -Insert with req_id argument +##### Insert with req_id argument ```python {{#include docs/examples/python/schemaless_insert_raw_req_id.py}} @@ -756,7 +774,7 @@ The Python connector provides a parameter binding api for inserting data. Simila -#### Create Stmt +##### Create Stmt Call the `statement` method in `Connection` to create the `stmt` for parameter binding. @@ -767,7 +785,7 @@ conn = taos.connect() stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") ``` -#### parameter binding +##### parameter binding Call the `new_multi_binds` function to create the parameter list for parameter bindings. @@ -797,7 +815,7 @@ Call the `bind_param` (for a single row) method or the `bind_param_batch` (for m stmt.bind_param_batch(params) ``` -#### execute sql +##### execute sql Call `execute` method to execute sql. @@ -805,13 +823,13 @@ Call `execute` method to execute sql. stmt.execute() ``` -#### Close Stmt +##### Close Stmt ``` stmt.close() ``` -#### Example +##### Example ```python {{#include docs/examples/python/stmt_example.py}} @@ -820,7 +838,7 @@ stmt.close() -#### Create Stmt +##### Create Stmt Call the `statement` method in `Connection` to create the `stmt` for parameter binding. @@ -831,7 +849,7 @@ conn = taosws.connect('taosws://localhost:6041/test') stmt = conn.statement() ``` -#### Prepare sql +##### Prepare sql Call `prepare` method in stmt to prepare sql. @@ -839,7 +857,7 @@ Call `prepare` method in stmt to prepare sql. stmt.prepare("insert into t1 values (?, ?, ?, ?)") ``` -#### parameter binding +##### parameter binding Call the `bind_param` method to bind parameters. @@ -858,7 +876,7 @@ Call the `add_batch` method to add parameters to the batch. stmt.add_batch() ``` -#### execute sql +##### execute sql Call `execute` method to execute sql. @@ -866,13 +884,13 @@ Call `execute` method to execute sql. stmt.execute() ``` -#### Close Stmt +##### Close Stmt ``` stmt.close() ``` -#### Example +##### Example ```python {{#include docs/examples/python/stmt_websocket_example.py}} diff --git a/docs/zh/08-connector/30-python.mdx b/docs/zh/08-connector/30-python.mdx index 8fdb428384..fcae6e2b6b 100644 --- a/docs/zh/08-connector/30-python.mdx +++ b/docs/zh/08-connector/30-python.mdx @@ -22,7 +22,12 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con - 原生连接[支持的平台](../#支持的平台)和 TDengine 客户端支持的平台一致。 - REST 连接支持所有能运行 Python 的平台。 -## 版本选择 +### 支持的功能 + +- 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。 +- REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。 + +## 历史版本 无论使用什么版本的 TDengine 都建议使用最新版本的 `taospy`。 @@ -66,12 +71,25 @@ Python Connector 的所有数据库操作如果出现异常,都会直接抛出 {{#include docs/examples/python/handle_exception.py}} ``` -## 支持的功能 +TDengine DataType 和 Python DataType -- 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。 -- REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。 +TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对应类型转换如下: -## 安装 +|TDengine DataType|Python DataType| +|:---------------:|:-------------:| +|TIMESTAMP|datetime| +|INT|int| +|BIGINT|int| +|FLOAT|float| +|DOUBLE|int| +|SMALLINT|int| +|TINYINT|int| +|BOOL|bool| +|BINARY|str| +|NCHAR|str| +|JSON|str| + +## 安装步骤 ### 安装前准备 @@ -384,7 +402,7 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线 -#### Connection 类的使用 +##### Connection 类的使用 `Connection` 类既包含对 PEP249 Connection 接口的实现(如:cursor方法和 close 方法),也包含很多扩展功能(如: execute、 query、schemaless_insert 和 subscribe 方法。 @@ -548,7 +566,7 @@ RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方 `Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。 -#### 创建 Consumer +##### 创建 Consumer 创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。 @@ -558,7 +576,7 @@ from taos.tmq import Consumer consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) ``` -#### 订阅 topics +##### 订阅 topics Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。 @@ -566,7 +584,7 @@ Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时 consumer.subscribe(['topic1', 'topic2']) ``` -#### 消费数据 +##### 消费数据 Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 @@ -584,7 +602,7 @@ while True: print(block.fetchall()) ``` -#### 获取消费进度 +##### 获取消费进度 Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。 @@ -592,7 +610,7 @@ Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic assignments = consumer.assignment() ``` -#### 重置消费进度 +##### 指定订阅 Offset Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。 @@ -601,7 +619,7 @@ tp = TopicPartition(topic='topic1', partition=0, offset=0) consumer.seek(tp) ``` -#### 结束消费 +##### 关闭订阅 消费结束后,应当取消订阅,并关闭 Consumer。 @@ -610,13 +628,13 @@ consumer.unsubscribe() consumer.close() ``` -#### tmq 订阅示例代码 +##### 完整示例 ```python {{#include docs/examples/python/tmq_example.py}} ``` -#### 获取和重置消费进度示例代码 +##### 获取和重置消费进度示例代码 ```python {{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} @@ -630,7 +648,7 @@ consumer.close() taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。 -#### 创建 Consumer +##### 创建 Consumer 创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。 @@ -640,7 +658,7 @@ import taosws consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) ``` -#### 订阅 topics +##### 订阅 topics Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。 @@ -648,7 +666,7 @@ Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时 consumer.subscribe(['topic1', 'topic2']) ``` -#### 消费数据 +##### 消费数据 Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 @@ -665,7 +683,7 @@ while True: print(row) ``` -#### 获取消费进度 +##### 获取消费进度 Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。 @@ -673,7 +691,7 @@ Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic assignments = consumer.assignment() ``` -#### 重置消费进度 +##### 重置消费进度 Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。 @@ -681,7 +699,7 @@ Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位 consumer.seek(topic='topic1', partition=0, offset=0) ``` -#### 结束消费 +##### 结束消费 消费结束后,应当取消订阅,并关闭 Consumer。 @@ -690,7 +708,7 @@ consumer.unsubscribe() consumer.close() ``` -#### tmq 订阅示例代码 +##### tmq 订阅示例代码 ```python {{#include docs/examples/python/tmq_websocket_example.py}} @@ -698,7 +716,7 @@ consumer.close() 连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。 -#### 获取和重置消费进度示例代码 +##### 获取和重置消费进度示例代码 ```python {{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} @@ -714,19 +732,19 @@ consumer.close() -简单写入 +##### 简单写入 ```python {{#include docs/examples/python/schemaless_insert.py}} ``` -带有 ttl 参数的写入 +##### 带有 ttl 参数的写入 ```python {{#include docs/examples/python/schemaless_insert_ttl.py}} ``` -带有 req_id 参数的写入 +##### 带有 req_id 参数的写入 ```python {{#include docs/examples/python/schemaless_insert_req_id.py}} @@ -736,19 +754,19 @@ consumer.close() -简单写入 +##### 简单写入 ```python {{#include docs/examples/python/schemaless_insert_raw.py}} ``` -带有 ttl 参数的写入 +##### 带有 ttl 参数的写入 ```python {{#include docs/examples/python/schemaless_insert_raw_ttl.py}} ``` -带有 req_id 参数的写入 +##### 带有 req_id 参数的写入 ```python {{#include docs/examples/python/schemaless_insert_raw_req_id.py}} @@ -764,7 +782,7 @@ TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写 -#### 创建 stmt +##### 创建 stmt Python 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。 @@ -775,7 +793,7 @@ conn = taos.connect() stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") ``` -#### 参数绑定 +##### 参数绑定 调用 `new_multi_binds` 函数创建 params 列表,用于参数绑定。 @@ -805,7 +823,7 @@ params[15].timestamp([None, None, 1626861392591]) stmt.bind_param_batch(params) ``` -#### 执行 sql +##### 执行 sql 调用 stmt 的 `execute` 方法执行 sql @@ -813,7 +831,7 @@ stmt.bind_param_batch(params) stmt.execute() ``` -#### 关闭 stmt +##### 关闭 stmt 最后需要关闭 stmt。 @@ -821,7 +839,7 @@ stmt.execute() stmt.close() ``` -#### 示例代码 +##### 示例代码 ```python {{#include docs/examples/python/stmt_example.py}} @@ -830,7 +848,7 @@ stmt.close() -#### 创建 stmt +##### 创建 stmt Python WebSocket 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。 @@ -841,7 +859,7 @@ conn = taosws.connect('taosws://localhost:6041/test') stmt = conn.statement() ``` -#### 解析 sql +##### 解析 sql 调用 stmt 的 `prepare` 方法来解析 insert 语句。 @@ -849,7 +867,7 @@ stmt = conn.statement() stmt.prepare("insert into t1 values (?, ?, ?, ?)") ``` -#### 参数绑定 +##### 参数绑定 调用 stmt 的 `bind_param` 方法绑定参数。 @@ -868,7 +886,7 @@ stmt.bind_param([ stmt.add_batch() ``` -#### 执行 sql +##### 执行 sql 调用 stmt 的 `execute` 方法执行 sql @@ -876,7 +894,7 @@ stmt.add_batch() stmt.execute() ``` -#### 关闭 stmt +##### 关闭 stmt 最后需要关闭 stmt。 @@ -884,7 +902,7 @@ stmt.execute() stmt.close() ``` -#### 示例代码 +##### 示例代码 ```python {{#include docs/examples/python/stmt_websocket_example.py}} diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fa092a453c..2d75424bb5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -106,7 +106,6 @@ enum { HEARTBEAT_KEY_DBINFO, HEARTBEAT_KEY_STBINFO, HEARTBEAT_KEY_TMQ, - HEARTBEAT_KEY_USER_PASSINFO, }; typedef enum _mgmt_table { @@ -636,6 +635,7 @@ typedef struct { SEpSet epSet; int32_t svrTimestamp; int32_t passVer; + int32_t authVer; char sVer[TSDB_VERSION_LEN]; char sDetailVer[128]; } SConnectRsp; @@ -703,6 +703,7 @@ int32_t tDeserializeSGetUserAuthReq(void* buf, int32_t bufLen, SGetUserAuthReq* typedef struct { char user[TSDB_USER_LEN]; int32_t version; + int32_t passVer; int8_t superAuth; int8_t sysInfo; int8_t enable; @@ -719,14 +720,6 @@ int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pR int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp); void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp); -typedef struct SUserPassVersion { - char user[TSDB_USER_LEN]; - int32_t version; -} SUserPassVersion; - -typedef SGetUserAuthReq SGetUserPassReq; -typedef SUserPassVersion SGetUserPassRsp; - /* * for client side struct, only column id, type, bytes are necessary * But for data in vnode side, we need all the following information. @@ -1070,14 +1063,6 @@ int32_t tSerializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp int32_t tDeserializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp); void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp); -typedef struct { - SArray* pArray; // Array of SGetUserPassRsp -} SUserPassBatchRsp; - -int32_t tSerializeSUserPassBatchRsp(void* buf, int32_t bufLen, SUserPassBatchRsp* pRsp); -int32_t tDeserializeSUserPassBatchRsp(void* buf, int32_t bufLen, SUserPassBatchRsp* pRsp); -void tFreeSUserPassBatchRsp(SUserPassBatchRsp* pRsp); - typedef struct { char db[TSDB_DB_FNAME_LEN]; STimeWindow timeRange; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index fa444779f3..736582dff2 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -63,7 +63,7 @@ typedef struct { // statistics int32_t reportCnt; int32_t connKeyCnt; - int32_t passKeyCnt; // with passVer call back + int8_t connHbFlag; // 0 init, 1 send req, 2 get resp int64_t reportBytes; // not implemented int64_t startTime; // ctl @@ -83,8 +83,9 @@ typedef struct { int8_t threadStop; int8_t quitByKill; TdThread thread; - TdThreadMutex lock; // used when app init and cleanup + TdThreadMutex lock; // used when app init and cleanup SHashObj* appSummary; + SHashObj* appHbHash; // key: clusterId SArray* appHbMgrs; // SArray one for each cluster FHbReqHandle reqHandle[CONN_TYPE__MAX]; FHbRspHandle rspHandle[CONN_TYPE__MAX]; @@ -146,6 +147,7 @@ typedef struct STscObj { int64_t id; // ref ID returned by taosAddRef TdThreadMutex mutex; // used to protect the operation on db int32_t numOfReqs; // number of sqlObj bound to this connection + int32_t authVer; SAppInstInfo* pAppInfo; SHashObj* pRequests; SPassInfo passInfo; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index cbfa48b322..ccdc9bba8e 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -22,10 +22,10 @@ typedef struct { union { struct { - int64_t clusterId; - int32_t passKeyCnt; - int32_t passVer; - int32_t reqCnt; + SAppHbMgr *pAppHbMgr; + int64_t clusterId; + int32_t reqCnt; + int8_t connHbFlag; }; }; } SHbParam; @@ -34,12 +34,14 @@ static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); +static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp); static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } -static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { +static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog, + SAppHbMgr *pAppHbMgr) { int32_t code = 0; SUserAuthBatchRsp batchRsp = {0}; @@ -56,54 +58,61 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC catalogUpdateUserAuthInfo(pCatalog, rsp); } + if (numOfBatchs > 0) hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp); + + atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2); + taosArrayDestroy(batchRsp.pArray); return TSDB_CODE_SUCCESS; } -static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHbKey *connKey, SAppHbMgr *pAppHbMgr) { - int32_t code = 0; - int32_t numOfBatchs = 0; - SUserPassBatchRsp batchRsp = {0}; - if (tDeserializeSUserPassBatchRsp(value, valueLen, &batchRsp) != 0) { - code = TSDB_CODE_INVALID_MSG; - return code; - } - - numOfBatchs = taosArrayGetSize(batchRsp.pArray); - - SClientHbReq *pReq = NULL; - while ((pReq = taosHashIterate(pAppHbMgr->activeInfo, pReq))) { - STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid); - if (!pTscObj) { - continue; - } - SPassInfo *passInfo = &pTscObj->passInfo; - if (!passInfo->fp) { - releaseTscObj(pReq->connKey.tscRid); +static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) { + uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; + for (int i = 0; i < TARRAY_SIZE(clientHbMgr.appHbMgrs); ++i) { + SAppHbMgr *hbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); + if (!hbMgr || hbMgr->pAppInstInfo->clusterId != clusterId) { continue; } - for (int32_t i = 0; i < numOfBatchs; ++i) { - SGetUserPassRsp *rsp = taosArrayGet(batchRsp.pArray, i); - if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) { - int32_t oldVer = atomic_load_32(&passInfo->ver); - if (oldVer < rsp->version) { - atomic_store_32(&passInfo->ver, rsp->version); - if (passInfo->fp) { - (*passInfo->fp)(passInfo->param, &passInfo->ver, TAOS_NOTIFY_PASSVER); - } - tscDebug("update passVer of user %s from %d to %d, tscRid:%" PRIi64, rsp->user, oldVer, - atomic_load_32(&passInfo->ver), pTscObj->id); - } - break; + SClientHbReq *pReq = NULL; + while ((pReq = taosHashIterate(hbMgr->activeInfo, pReq))) { + STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid); + if (!pTscObj) { + continue; } + for (int32_t j = 0; j < TARRAY_SIZE(batchRsp->pArray); ++j) { + SGetUserAuthRsp *rsp = TARRAY_GET_ELEM(batchRsp->pArray, j); + + if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) { + pTscObj->authVer = rsp->version; + +#if 0 // make jenkins happy temporarily. After PR pass, enable these lines again. + if (pTscObj->sysInfo != rsp->sysInfo) { + tscDebug("update sysInfo of user %s from %" PRIi8 " to %" PRIi8 ", tscRid:%" PRIi64, rsp->user, + pTscObj->sysInfo, rsp->sysInfo, pTscObj->id); + pTscObj->sysInfo = rsp->sysInfo; + } +#endif + if (pTscObj->passInfo.fp) { + SPassInfo *passInfo = &pTscObj->passInfo; + int32_t oldVer = atomic_load_32(&passInfo->ver); + if (oldVer < rsp->passVer) { + atomic_store_32(&passInfo->ver, rsp->passVer); + if (passInfo->fp) { + (*passInfo->fp)(passInfo->param, &rsp->passVer, TAOS_NOTIFY_PASSVER); + } + tscDebug("update passVer of user %s from %d to %d, tscRid:%" PRIi64, rsp->user, oldVer, + atomic_load_32(&passInfo->ver), pTscObj->id); + } + } + + break; + } + } + releaseTscObj(pReq->connKey.tscRid); } - releaseTscObj(pReq->connKey.tscRid); } - - taosArrayDestroy(batchRsp.pArray); - - return code; + return 0; } static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) { @@ -316,7 +325,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { break; } - hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog); + hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr); break; } case HEARTBEAT_KEY_DBINFO: { @@ -353,15 +362,6 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog); break; } - case HEARTBEAT_KEY_USER_PASSINFO: { - if (kv->valueLen <= 0 || NULL == kv->value) { - tscError("invalid hb user pass info, len:%d, value:%p", kv->valueLen, kv->value); - break; - } - - hbProcessUserPassInfoRsp(kv->value, kv->valueLen, &pRsp->connKey, pAppHbMgr); - break; - } default: tscError("invalid hb key type:%d", kv->key); break; @@ -543,7 +543,7 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { return TSDB_CODE_SUCCESS; } -static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) { +static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) { STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); if (!pTscObj) { tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); @@ -552,46 +552,61 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClien int32_t code = 0; - if (param && (param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) { - tscDebug("hb got user basic info, no need since passVer %d <= %d", param->passVer, pTscObj->passInfo.ver); + SKv kv = {.key = HEARTBEAT_KEY_USER_AUTHINFO}; + SKv *pKv = NULL; + if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) { + int32_t userNum = pKv->valueLen / sizeof(SUserAuthVersion); + SUserAuthVersion *userAuths = (SUserAuthVersion *)pKv->value; + for (int32_t i = 0; i < userNum; ++i) { + SUserAuthVersion *pUserAuth = userAuths + i; + // both key and user exist, update version + if (strncmp(pUserAuth->user, pTscObj->user, TSDB_USER_LEN) == 0) { + pUserAuth->version = htonl(-1); // force get userAuthInfo + goto _return; + } + } + // key exists, user not exist, append user + SUserAuthVersion *qUserAuth = + (SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion)); + if (qUserAuth) { + strncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN); + (qUserAuth + userNum)->version = htonl(-1); // force get userAuthInfo + pKv->value = qUserAuth; + pKv->valueLen += sizeof(SUserAuthVersion); + } else { + code = TSDB_CODE_OUT_OF_MEMORY; + } goto _return; } - SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion)); + // key/user not exist, add user + SUserAuthVersion *user = taosMemoryMalloc(sizeof(SUserAuthVersion)); if (!user) { code = TSDB_CODE_OUT_OF_MEMORY; goto _return; } strncpy(user->user, pTscObj->user, TSDB_USER_LEN); - user->version = htonl(pTscObj->passInfo.ver); + user->version = htonl(-1); // force get userAuthInfo + kv.valueLen = sizeof(SUserAuthVersion); + kv.value = user; - SKv kv = { - .key = HEARTBEAT_KEY_USER_PASSINFO, - .valueLen = sizeof(SUserPassVersion), - .value = user, - }; - - tscDebug("hb got user basic info, valueLen:%d, user:%s, passVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user, - pTscObj->passInfo.ver, connKey->tscRid); + tscDebug("hb got user auth info, valueLen:%d, user:%s, authVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user, + pTscObj->authVer, connKey->tscRid); if (!req->info) { req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); } if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) < 0) { + taosMemoryFree(user); code = terrno ? terrno : TSDB_CODE_APP_ERROR; goto _return; } - // assign the passVer - if (param) { - param->passVer = pTscObj->passInfo.ver; - } - _return: releaseTscObj(connKey->tscRid); if (code) { - tscError("hb got user basic info failed since %s", terrstr(code)); + tscError("hb got user auth info failed since %s", terrstr(code)); } return code; @@ -749,14 +764,21 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req hbGetQueryBasicInfo(connKey, req); - if (hbParam->passKeyCnt > 0) { - hbGetUserBasicInfo(connKey, hbParam, req); - } - if (hbParam->reqCnt == 0) { - code = hbGetExpiredUserInfo(connKey, pCatalog, req); - if (TSDB_CODE_SUCCESS != code) { - return code; + if (!taosHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) { + code = hbGetExpiredUserInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } + + // invoke after hbGetExpiredUserInfo + if (2 != atomic_load_8(&hbParam->pAppHbMgr->connHbFlag)) { + code = hbGetUserAuthInfo(connKey, hbParam, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + atomic_store_8(&hbParam->pAppHbMgr->connHbFlag, 1); } code = hbGetExpiredDBInfo(connKey, pCatalog, req); @@ -770,7 +792,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req } } - ++hbParam->reqCnt; // success to get catalog info + ++hbParam->reqCnt; // success to get catalog info return TSDB_CODE_SUCCESS; } @@ -815,9 +837,9 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { if (param.clusterId == 0) { // init param.clusterId = pOneReq->clusterId; - param.passVer = INT32_MIN; + param.pAppHbMgr = pAppHbMgr; + param.connHbFlag = atomic_load_8(&pAppHbMgr->connHbFlag); } - param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt); break; } default: @@ -901,6 +923,10 @@ static void *hbThreadFunc(void *param) { int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); if (sz > 0) { hbGatherAppInfo(); + if (sz > 1 && !clientHbMgr.appHbHash) { + clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + } + taosHashClear(clientHbMgr.appHbHash); } for (int i = 0; i < sz; i++) { @@ -953,7 +979,7 @@ static void *hbThreadFunc(void *param) { asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); tFreeClientHbBatchReq(pReq); // hbClearReqInfo(pAppHbMgr); - + taosHashPut(clientHbMgr.appHbHash, &pAppHbMgr->pAppInstInfo->clusterId, sizeof(uint64_t), NULL, 0); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); } @@ -961,6 +987,7 @@ static void *hbThreadFunc(void *param) { taosMsleep(HEARTBEAT_INTERVAL); } + taosHashCleanup(clientHbMgr.appHbHash); return NULL; } @@ -1009,7 +1036,7 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { // init stat pAppHbMgr->startTime = taosGetTimestampMs(); pAppHbMgr->connKeyCnt = 0; - pAppHbMgr->passKeyCnt = 0; + pAppHbMgr->connHbFlag = 0; pAppHbMgr->reportCnt = 0; pAppHbMgr->reportBytes = 0; pAppHbMgr->key = taosStrdup(key); @@ -1127,7 +1154,6 @@ void hbMgrCleanUp() { appHbMgrCleanup(); taosArrayDestroy(clientHbMgr.appHbMgrs); taosThreadMutexUnlock(&clientHbMgr.lock); - clientHbMgr.appHbMgrs = NULL; } @@ -1180,12 +1206,6 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) { } atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); - - taosThreadMutexLock(&pTscObj->mutex); - if (pTscObj->passInfo.fp) { - atomic_sub_fetch_32(&pAppHbMgr->passKeyCnt, 1); - } - taosThreadMutexUnlock(&pTscObj->mutex); } // set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 63b16a30c5..7ce838cd2c 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -135,11 +135,6 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type) switch (type) { case TAOS_NOTIFY_PASSVER: { taosThreadMutexLock(&pObj->mutex); - if (fp && !pObj->passInfo.fp) { - atomic_add_fetch_32(&pObj->pAppInfo->pAppHbMgr->passKeyCnt, 1); - } else if (!fp && pObj->passInfo.fp) { - atomic_sub_fetch_32(&pObj->pAppInfo->pAppHbMgr->passKeyCnt, 1); - } pObj->passInfo.fp = fp; pObj->passInfo.param = param; taosThreadMutexUnlock(&pObj->mutex); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index d6fdb29b59..9ab618cf3a 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -131,6 +131,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { pTscObj->connType = connectRsp.connType; pTscObj->passInfo.ver = connectRsp.passVer; + pTscObj->authVer = connectRsp.authVer; hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index debb93e8ba..38806b6042 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1523,6 +1523,9 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) useDb = taosHashIterate(pRsp->useDbs, useDb); } + // since 3.0.7.0 + if (tEncodeI32(pEncoder, pRsp->passVer) < 0) return -1; + return 0; } @@ -1644,6 +1647,12 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs taosHashPut(pRsp->useDbs, key, strlen(key), &ref, sizeof(ref)); taosMemoryFree(key); } + // since 3.0.7.0 + if (!tDecodeIsEnd(pDecoder)) { + if (tDecodeI32(pDecoder, &pRsp->passVer) < 0) return -1; + } else { + pRsp->passVer = 0; + } } return 0; @@ -3029,59 +3038,6 @@ void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp *pRsp) { taosArrayDestroy(pRsp->pArray); } -int32_t tSerializeSUserPassBatchRsp(void *buf, int32_t bufLen, SUserPassBatchRsp *pRsp) { - SEncoder encoder = {0}; - tEncoderInit(&encoder, buf, bufLen); - - if (tStartEncode(&encoder) < 0) return -1; - - int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); - if (tEncodeI32(&encoder, numOfBatch) < 0) return -1; - for (int32_t i = 0; i < numOfBatch; ++i) { - SGetUserPassRsp *pUserPassRsp = taosArrayGet(pRsp->pArray, i); - if (tEncodeCStr(&encoder, pUserPassRsp->user) < 0) return -1; - if (tEncodeI32(&encoder, pUserPassRsp->version) < 0) return -1; - } - tEndEncode(&encoder); - - int32_t tlen = encoder.pos; - tEncoderClear(&encoder); - return tlen; -} - -int32_t tDeserializeSUserPassBatchRsp(void *buf, int32_t bufLen, SUserPassBatchRsp *pRsp) { - SDecoder decoder = {0}; - tDecoderInit(&decoder, buf, bufLen); - - if (tStartDecode(&decoder) < 0) return -1; - - int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); - if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1; - - pRsp->pArray = taosArrayInit(numOfBatch, sizeof(SGetUserPassRsp)); - if (pRsp->pArray == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - for (int32_t i = 0; i < numOfBatch; ++i) { - SGetUserPassRsp rsp = {0}; - if (tDecodeCStrTo(&decoder, rsp.user) < 0) return -1; - if (tDecodeI32(&decoder, &rsp.version) < 0) return -1; - taosArrayPush(pRsp->pArray, &rsp); - } - tEndDecode(&decoder); - - tDecoderClear(&decoder); - return 0; -} - -void tFreeSUserPassBatchRsp(SUserPassBatchRsp *pRsp) { - if(pRsp) { - taosArrayDestroy(pRsp->pArray); - } -} - int32_t tSerializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -4159,6 +4115,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1; if (tEncodeI32(&encoder, pRsp->passVer) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->authVer) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -4188,6 +4145,12 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { } else { pRsp->passVer = 0; } + // since 3.0.7.0 + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI32(&decoder, &pRsp->authVer) < 0) return -1; + } else { + pRsp->authVer = 0; + } tEndDecode(&decoder); diff --git a/source/dnode/mnode/impl/inc/mndUser.h b/source/dnode/mnode/impl/inc/mndUser.h index 93ae38e554..8b930e7f18 100644 --- a/source/dnode/mnode/impl/inc/mndUser.h +++ b/source/dnode/mnode/impl/inc/mndUser.h @@ -35,8 +35,6 @@ SHashObj *mndDupTableHash(SHashObj *pOld); SHashObj *mndDupTopicHash(SHashObj *pOld); int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp, int32_t *pRspLen); -int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_t numOfUses, void **ppRsp, - int32_t *pRspLen); int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db); int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic); diff --git a/source/dnode/mnode/impl/src/mndPrivilege.c b/source/dnode/mnode/impl/src/mndPrivilege.c index de0374c6e8..bec516b1ee 100644 --- a/source/dnode/mnode/impl/src/mndPrivilege.c +++ b/source/dnode/mnode/impl/src/mndPrivilege.c @@ -36,7 +36,9 @@ int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp memcpy(pRsp->user, pUser->user, TSDB_USER_LEN); pRsp->superAuth = 1; pRsp->enable = pUser->enable; + pRsp->sysInfo = pUser->sysInfo; pRsp->version = pUser->authVersion; + pRsp->passVer = pUser->passVersion; return 0; } #endif \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index fbf54e80f3..01409a3880 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -288,6 +288,7 @@ _CONNECT: connectRsp.dnodeNum = mndGetDnodeSize(pMnode); connectRsp.svrTimestamp = taosGetTimestampSec(); connectRsp.passVer = pUser->passVersion; + connectRsp.authVer = pUser->authVersion; strcpy(connectRsp.sVer, version); snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, @@ -552,16 +553,6 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb } break; } - case HEARTBEAT_KEY_USER_PASSINFO: { - void *rspMsg = NULL; - int32_t rspLen = 0; - mndValidateUserPassInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserPassVersion), &rspMsg, &rspLen); - if (rspMsg && rspLen > 0) { - SKv kv1 = {.key = HEARTBEAT_KEY_USER_PASSINFO, .valueLen = rspLen, .value = rspMsg}; - taosArrayPush(hbRsp.info, &kv1); - } - break; - } default: mError("invalid kv key:%d", kv->key); hbRsp.status = TSDB_CODE_APP_ERROR; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 7a34e0f6a5..2d00e383a2 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -825,7 +825,6 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { if (mndUserDupObj(pUser, &newUser) != 0) goto _OVER; - newUser.passVersion = pUser->passVersion; if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) { char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)alterReq.pass, strlen(alterReq.pass), pass); @@ -1432,69 +1431,6 @@ _OVER: return code; } -int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_t numOfUses, void **ppRsp, - int32_t *pRspLen) { - int32_t code = 0; - SUserPassBatchRsp batchRsp = {0}; - - for (int32_t i = 0; i < numOfUses; ++i) { - SUserObj *pUser = mndAcquireUser(pMnode, pUsers[i].user); - if (pUser == NULL) { - mError("user:%s, failed to validate user pass since %s", pUsers[i].user, terrstr()); - continue; - } - - pUsers[i].version = ntohl(pUsers[i].version); - if (pUser->passVersion <= pUsers[i].version) { - mTrace("user:%s, not update since mnd passVer %d <= client passVer %d", pUsers[i].user, pUser->passVersion, - pUsers[i].version); - mndReleaseUser(pMnode, pUser); - continue; - } - - SGetUserPassRsp rsp = {0}; - memcpy(rsp.user, pUser->user, TSDB_USER_LEN); - rsp.version = pUser->passVersion; - - if (!batchRsp.pArray && !(batchRsp.pArray = taosArrayInit(numOfUses, sizeof(SGetUserPassRsp)))) { - code = TSDB_CODE_OUT_OF_MEMORY; - mndReleaseUser(pMnode, pUser); - goto _OVER; - } - - taosArrayPush(batchRsp.pArray, &rsp); - mndReleaseUser(pMnode, pUser); - } - - if (taosArrayGetSize(batchRsp.pArray) <= 0) { - goto _OVER; - } - - int32_t rspLen = tSerializeSUserPassBatchRsp(NULL, 0, &batchRsp); - if (rspLen < 0) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - void *pRsp = taosMemoryMalloc(rspLen); - if (pRsp == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - tSerializeSUserPassBatchRsp(pRsp, rspLen, &batchRsp); - - *ppRsp = pRsp; - *pRspLen = rspLen; - -_OVER: - if (code) { - *ppRsp = NULL; - *pRspLen = 0; - } - - tFreeSUserPassBatchRsp(&batchRsp); - return code; -} - int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) { int32_t code = 0; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f19fa54cbd..a293858207 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1157,7 +1157,7 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream SDecoder* pCoder = &(SDecoder){0}; SDeleteRes* pRes = &(SDeleteRes){0}; - *pRefBlock = NULL; + (*pRefBlock) = NULL; pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); if (pRes->uidList == NULL) { @@ -1197,7 +1197,7 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream taosArrayDestroy(pRes->uidList); *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); - if (pRefBlock == NULL) { + if ((*pRefBlock) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 84dcde06ac..29e59daeb9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3064,6 +3064,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr // only check here, since the iterate data in memory is very fast. if (pReader->code != TSDB_CODE_SUCCESS) { tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + taosArrayDestroy(pIndexList); return pReader->code; } @@ -5583,58 +5584,3 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { } void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; } - -/*-------------todo:refactor the implementation of those APIs in this file to seperate the API into two files------*/ -// opt perf, do NOT create so many readers -int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr) { - SQueryTableDataCond cond = {.type = TIMEWINDOW_RANGE_CONTAINED, .numOfCols = 1, .order = TSDB_ORDER_DESC, - .startVersion = -1, .endVersion = -1}; - cond.twindows.skey = INT64_MIN; - cond.twindows.ekey = INT64_MAX; - - cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); - cond.pSlotList = taosMemoryMalloc(sizeof(int32_t) * cond.numOfCols); - if (cond.colList == NULL || cond.pSlotList == NULL) { - // todo - } - - cond.colList[0].colId = 1; - cond.colList[0].slotId = 0; - cond.colList[0].type = TSDB_DATA_TYPE_TIMESTAMP; - - cond.pSlotList[0] = 0; - - STableKeyInfo* pTableKeyInfo = pTableList; - STsdbReader* pReader = NULL; - SSDataBlock* pBlock = createDataBlock(); - - SColumnInfoData data = {0}; - data.info = (SColumnInfo) {.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .bytes = TSDB_KEYSIZE}; - blockDataAppendColInfo(pBlock, &data); - - int64_t key = INT64_MIN; - - for(int32_t i = 0; i < numOfTables; ++i) { - int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, (void**)&pReader, pIdStr, false, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - bool hasData = false; - code = tsdbNextDataBlock(pReader, &hasData); - if (!hasData || code != TSDB_CODE_SUCCESS) { - continue; - } - - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0); - int64_t k = *(int64_t*)pCol->pData; - - if (key < k) { - key = k; - } - - tsdbReaderClose(pReader); - } - - return 0; -} diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index cfea233a1c..832750e967 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1109,7 +1109,6 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status, &pStorageAPI->metaFilter); if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid); - code = TSDB_CODE_SUCCESS; } else { qInfo("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList)); } diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 13ab5d05a5..f334ae02f6 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -150,9 +150,12 @@ static int32_t initTagColskeyBuf(int32_t* keyLen, char** keyBuf, const SArray* p int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; (*keyLen) += nullFlagSize; - (*keyBuf) = taosMemoryCalloc(1, (*keyLen)); - if ((*keyBuf) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + if (*keyLen >= 0) { + + (*keyBuf) = taosMemoryCalloc(1, (*keyLen)); + if ((*keyBuf) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 7c06e7deb3..baf2f73f90 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -27,6 +27,7 @@ SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq int32_t blockNum = pReq->blockNum; SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum); if (pArray == NULL) { + taosFreeQitem(pData); return NULL; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 46290c306f..051c664c53 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -212,8 +212,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { } if (taosArrayGetSize(pRes) == 0) { + taosArrayDestroy(pRes); + if (finished) { - taosArrayDestroy(pRes); qDebug("s-task:%s finish recover exec task ", pTask->id.idStr); break; } else { @@ -317,6 +318,61 @@ int32_t updateCheckPointInfo(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } +static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, + const char* id) { + int32_t retryTimes = 0; + int32_t MAX_RETRY_TIMES = 5; + + while (1) { + if (streamTaskShouldPause(&pTask->status)) { + qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks); + return TSDB_CODE_SUCCESS; + } + + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); + if (qItem == NULL) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { + taosMsleep(10); + qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes); + continue; + } + + qDebug("===stream===break batchSize:%d", *numOfBlocks); + return TSDB_CODE_SUCCESS; + } + + // do not merge blocks for sink node + if (pTask->taskLevel == TASK_LEVEL__SINK) { + *numOfBlocks = 1; + *pInput = qItem; + return TSDB_CODE_SUCCESS; + } + + if (*pInput == NULL) { + ASSERT((*numOfBlocks) == 0); + *pInput = qItem; + } else { + // todo we need to sort the data block, instead of just appending into the array list. + void* newRet = streamMergeQueueItem(*pInput, qItem); + if (newRet == NULL) { + qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks); + streamQueueProcessFail(pTask->inputQueue); + return TSDB_CODE_SUCCESS; + } + + *pInput = newRet; + } + + *numOfBlocks += 1; + streamQueueProcessSuccess(pTask->inputQueue); + + if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { + qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); + return TSDB_CODE_SUCCESS; + } + } +} + /** * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. @@ -325,70 +381,15 @@ int32_t streamExecForAll(SStreamTask* pTask) { const char* id = pTask->id.idStr; while (1) { - int32_t batchSize = 1; - int16_t times = 0; - + int32_t batchSize = 0; SStreamQueueItem* pInput = NULL; // merge multiple input data if possible in the input queue. qDebug("s-task:%s start to extract data block from inputQ", id); - while (1) { - if (streamTaskShouldPause(&pTask->status)) { - if (batchSize > 1) { - break; - } else { - return 0; - } - } - - SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); - if (qItem == NULL) { - if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { - times++; - taosMsleep(10); - qDebug("===stream===try again batchSize:%d", batchSize); - continue; - } - - qDebug("===stream===break batchSize:%d", batchSize); - break; - } - - if (pInput == NULL) { - pInput = qItem; - streamQueueProcessSuccess(pTask->inputQueue); - if (pTask->taskLevel == TASK_LEVEL__SINK) { - break; - } - } else { - // todo we need to sort the data block, instead of just appending into the array list. - void* newRet = NULL; - if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) { - streamQueueProcessFail(pTask->inputQueue); - break; - } else { - batchSize++; - pInput = newRet; - streamQueueProcessSuccess(pTask->inputQueue); - - if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) { - qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, - MAX_STREAM_EXEC_BATCH_NUM); - break; - } - } - } - } - - if (streamTaskShouldStop(&pTask->status)) { - if (pInput) { - streamFreeQitem(pInput); - } - return 0; - } - + /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id); if (pInput == NULL) { + ASSERT(batchSize == 0); break; } @@ -403,8 +404,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (pTask->taskLevel == TASK_LEVEL__SOURCE) { int8_t status = atomic_load_8(&pTask->status.taskStatus); if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { - qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, - atomic_load_8(&pTask->status.taskStatus)); + qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, status); taosMsleep(100); } else { break; @@ -457,6 +457,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { double el = (taosGetTimestampMs() - st) / 1000.0; qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, resSize / 1048576.0, totalBlocks); + streamFreeQitem(pInput); } diff --git a/tests/script/api/passwdTest.c b/tests/script/api/passwdTest.c index 1bf4987689..1cc0072dc7 100644 --- a/tests/script/api/passwdTest.c +++ b/tests/script/api/passwdTest.c @@ -32,9 +32,21 @@ #define nRoot 10 #define nUser 10 #define USER_LEN 24 +#define BUF_LEN 256 + +typedef uint16_t VarDataLenT; + +#define TSDB_NCHAR_SIZE sizeof(int32_t) +#define VARSTR_HEADER_SIZE sizeof(VarDataLenT) + +#define GET_FLOAT_VAL(x) (*(float *)(x)) +#define GET_DOUBLE_VAL(x) (*(double *)(x)) + +#define varDataLen(v) ((VarDataLenT *)(v))[0] void createUsers(TAOS *taos, const char *host, char *qstr); void passVerTestMulti(const char *host, char *qstr); +void sysInfoTest(const char *host, char *qstr); int nPassVerNotified = 0; TAOS *taosu[nRoot] = {0}; @@ -83,6 +95,95 @@ static void queryDB(TAOS *taos, char *command) { taos_free_result(pSql); } +int printRow(char *str, TAOS_ROW row, TAOS_FIELD *fields, int numFields) { + int len = 0; + char split = ' '; + + for (int i = 0; i < numFields; ++i) { + if (i > 0) { + str[len++] = split; + } + + if (row[i] == NULL) { + len += sprintf(str + len, "%s", "NULL"); + continue; + } + + switch (fields[i].type) { + case TSDB_DATA_TYPE_TINYINT: + len += sprintf(str + len, "%d", *((int8_t *)row[i])); + break; + case TSDB_DATA_TYPE_UTINYINT: + len += sprintf(str + len, "%u", *((uint8_t *)row[i])); + break; + case TSDB_DATA_TYPE_SMALLINT: + len += sprintf(str + len, "%d", *((int16_t *)row[i])); + break; + case TSDB_DATA_TYPE_USMALLINT: + len += sprintf(str + len, "%u", *((uint16_t *)row[i])); + break; + case TSDB_DATA_TYPE_INT: + len += sprintf(str + len, "%d", *((int32_t *)row[i])); + break; + case TSDB_DATA_TYPE_UINT: + len += sprintf(str + len, "%u", *((uint32_t *)row[i])); + break; + case TSDB_DATA_TYPE_BIGINT: + len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i])); + break; + case TSDB_DATA_TYPE_UBIGINT: + len += sprintf(str + len, "%" PRIu64, *((uint64_t *)row[i])); + break; + case TSDB_DATA_TYPE_FLOAT: { + float fv = 0; + fv = GET_FLOAT_VAL(row[i]); + len += sprintf(str + len, "%f", fv); + } break; + case TSDB_DATA_TYPE_DOUBLE: { + double dv = 0; + dv = GET_DOUBLE_VAL(row[i]); + len += sprintf(str + len, "%lf", dv); + } break; + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_GEOMETRY: { + int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE); + memcpy(str + len, row[i], charLen); + len += charLen; + } break; + case TSDB_DATA_TYPE_TIMESTAMP: + len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i])); + break; + case TSDB_DATA_TYPE_BOOL: + len += sprintf(str + len, "%d", *((int8_t *)row[i])); + default: + break; + } + } + return len; +} + +static int printResult(TAOS_RES *res, char *output) { + int numFields = taos_num_fields(res); + TAOS_FIELD *fields = taos_fetch_fields(res); + char header[BUF_LEN] = {0}; + int len = 0; + for (int i = 0; i < numFields; ++i) { + len += sprintf(header + len, "%s ", fields[i].name); + } + puts(header); + if (output) { + strncpy(output, header, BUF_LEN); + } + + TAOS_ROW row = NULL; + while ((row = taos_fetch_row(res))) { + char temp[BUF_LEN] = {0}; + printRow(temp, row, fields, numFields); + puts(temp); + } +} + int main(int argc, char *argv[]) { char qstr[1024]; @@ -99,6 +200,7 @@ int main(int argc, char *argv[]) { } createUsers(taos, argv[1], qstr); passVerTestMulti(argv[1], qstr); + sysInfoTest(argv[1], qstr); taos_close(taos); taos_cleanup(); @@ -167,6 +269,8 @@ void passVerTestMulti(const char *host, char *qstr) { int nConn = nRoot + nUser; for (int i = 0; i < 15; ++i) { + printf("%s:%d [%d] second(s) elasped, passVer notification received:%d, total:%d\n", __func__, __LINE__, i, + nPassVerNotified, nConn); if (nPassVerNotified >= nConn) break; sleep(1); } @@ -175,19 +279,101 @@ void passVerTestMulti(const char *host, char *qstr) { for (int i = 0; i < nRoot; ++i) { taos_close(taos[i]); printf("%s:%d close taos[%d]\n", __func__, __LINE__, i); - sleep(1); + // sleep(1); } for (int i = 0; i < nUser; ++i) { taos_close(taosu[i]); printf("%s:%d close taosu[%d]\n", __func__, __LINE__, i); + // sleep(1); + } + + fprintf(stderr, "######## %s #########\n", __func__); + if (nPassVerNotified >= nConn) { + fprintf(stderr, ">>> succeed to get passVer notification since nNotify %d >= nConn %d\n", nPassVerNotified, + nConn); + } else { + fprintf(stderr, ">>> failed to get passVer notification since nNotify %d < nConn %d\n", nPassVerNotified, nConn); + } + fprintf(stderr, "######## %s #########\n", __func__); + // sleep(300); +} + +void sysInfoTest(const char *host, char *qstr) { + // root + TAOS *taos[nRoot] = {0}; + char userName[USER_LEN] = "root"; + + for (int i = 0; i < nRoot; ++i) { + taos[i] = taos_connect(host, "root", "taos", NULL, 0); + if (taos[i] == NULL) { + fprintf(stderr, "failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/); + exit(1); + } + } + + queryDB(taos[0], "create database if not exists demo11 vgroups 1 minrows 10"); + queryDB(taos[0], "create database if not exists demo12 vgroups 1 minrows 10"); + queryDB(taos[0], "create database if not exists demo13 vgroups 1 minrows 10"); + + queryDB(taos[0], "create table demo11.stb (ts timestamp, c1 int) tags(t1 int)"); + queryDB(taos[0], "create table demo12.stb (ts timestamp, c1 int) tags(t1 int)"); + queryDB(taos[0], "create table demo13.stb (ts timestamp, c1 int) tags(t1 int)"); + + sprintf(qstr, "show grants"); + char output[BUF_LEN]; + TAOS_RES *res = NULL; + int32_t nRep = 0; + +_REP: + fprintf(stderr, "######## %s loop:%d #########\n", __func__, nRep); + res = taos_query(taos[0], qstr); + if (taos_errno(res) != 0) { + fprintf(stderr, "%s:%d failed to execute: %s since %s\n", __func__, __LINE__, qstr, taos_errstr(res)); + taos_free_result(res); + exit(EXIT_FAILURE); + } + printResult(res, output); + taos_free_result(res); + if (!strstr(output, "timeseries")) { + fprintf(stderr, "%s:%d expected output: 'timeseries' not occur\n", __func__, __LINE__); + exit(EXIT_FAILURE); + } + + queryDB(taos[0], "alter user root sysinfo 0"); + + fprintf(stderr, "%s:%d sleep 2 seconds to wait HB take effect\n", __func__, __LINE__); + for (int i = 1; i <= 2; ++i) { sleep(1); } - if (nPassVerNotified >= nConn) { - fprintf(stderr, "succeed to get passVer notification since nNotify %d >= nConn %d\n", nPassVerNotified, nConn); - } else { - fprintf(stderr, "failed to get passVer notification since nNotify %d < nConn %d\n", nPassVerNotified, nConn); + res = taos_query(taos[0], qstr); + if (taos_errno(res) != 0) { + if (!strstr(taos_errstr(res), "Permission denied")) { + fprintf(stderr, "%s:%d expected error: 'Permission denied' not occur\n", __func__, __LINE__); + taos_free_result(res); + exit(EXIT_FAILURE); + } } - // sleep(300); + taos_free_result(res); + + queryDB(taos[0], "alter user root sysinfo 1"); + fprintf(stderr, "%s:%d sleep 2 seconds to wait HB take effect\n", __func__, __LINE__); + for (int i = 1; i <= 2; ++i) { + sleep(1); + } + + if(++nRep < 5) { + goto _REP; + } + + // close the taos_conn + for (int i = 0; i < nRoot; ++i) { + taos_close(taos[i]); + fprintf(stderr, "%s:%d close taos[%d]\n", __func__, __LINE__, i); + } + + fprintf(stderr, "######## %s #########\n", __func__); + fprintf(stderr, ">>> succeed to run sysInfoTest\n"); + fprintf(stderr, "######## %s #########\n", __func__); } \ No newline at end of file