Merge branch 'main' into doc/contrib
This commit is contained in:
commit
1ed7ab7663
|
@ -348,7 +348,7 @@ TDengine 提供了丰富的应用程序开发接口,其中包括 C/C++、Java
|
||||||
|
|
||||||
# 成为社区贡献者
|
# 成为社区贡献者
|
||||||
|
|
||||||
点击 [这里](https://www.taosdata.com/cn/contributor/),了解如何成为 TDengine 的贡献者。
|
点击 [这里](https://www.taosdata.com/contributor),了解如何成为 TDengine 的贡献者。
|
||||||
|
|
||||||
# 加入技术交流群
|
# 加入技术交流群
|
||||||
|
|
||||||
|
|
|
@ -364,6 +364,9 @@ The configuration parameters for specifying super table tag columns and data col
|
||||||
- **min**: The minimum value of the column/label of the data type. The generated value will equal or large than the minimum value.
|
- **min**: The minimum value of the column/label of the data type. The generated value will equal or large than the minimum value.
|
||||||
|
|
||||||
- **max**: The maximum value of the column/label of the data type. The generated value will less than the maximum value.
|
- **max**: The maximum value of the column/label of the data type. The generated value will less than the maximum value.
|
||||||
|
|
||||||
|
- **scalingFactor**: Floating-point precision enhancement factor, which takes effect only when the data type is float/double. It has a valid range of positive integers from 1 to 1,000,000. It is used to enhance the precision of generated floating-point numbers, particularly when the min or max values are small. This property enhances the precision after the decimal point by powers of 10: scalingFactor of 10 indicates an enhancement of 1 decimal precision, 100 indicates an enhancement of 2 decimal precision, and so on.
|
||||||
|
|
||||||
- **fun**: This column of data is filled with functions. Currently, only the sin and cos functions are supported. The input parameter is the timestamp and converted to an angle value. The conversion formula is: angle x=input time column ts value % 360. At the same time, it supports coefficient adjustment and random fluctuation factor adjustment, presented in a fixed format expression, such as fun="10\*sin(x)+100\*random(5)", where x represents the angle, ranging from 0 to 360 degrees, and the growth step size is consistent with the time column step size. 10 represents the coefficient of multiplication, 100 represents the coefficient of addition or subtraction, and 5 represents the fluctuation range within a random range of 5%. The currently supported data types are int, bigint, float, and double. Note: The expression is fixed and cannot be reversed.
|
- **fun**: This column of data is filled with functions. Currently, only the sin and cos functions are supported. The input parameter is the timestamp and converted to an angle value. The conversion formula is: angle x=input time column ts value % 360. At the same time, it supports coefficient adjustment and random fluctuation factor adjustment, presented in a fixed format expression, such as fun="10\*sin(x)+100\*random(5)", where x represents the angle, ranging from 0 to 360 degrees, and the growth step size is consistent with the time column step size. 10 represents the coefficient of multiplication, 100 represents the coefficient of addition or subtraction, and 5 represents the fluctuation range within a random range of 5%. The currently supported data types are int, bigint, float, and double. Note: The expression is fixed and cannot be reversed.
|
||||||
|
|
||||||
- **values**: The value field of the nchar/binary column/label, which will be chosen randomly from the values.
|
- **values**: The value field of the nchar/binary column/label, which will be chosen randomly from the values.
|
||||||
|
|
|
@ -14,7 +14,9 @@ TDengine 完整的软件包包括服务端(taosd)、应用驱动(taosc)
|
||||||
|
|
||||||
为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 Lite 版本的安装包。
|
为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 Lite 版本的安装包。
|
||||||
|
|
||||||
在 Linux 系统上,TDengine 社区版提供 Deb 和 RPM 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 Deb 支持 Debian/Ubuntu 及其衍生系统,RPM 支持 CentOS/RHEL/SUSE 及其衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。需要注意的是,RPM 和 Deb 包不含 `taosdump` 和 TDinsight 安装脚本,这些工具需要通过安装 taosTools 包获得。TDengine 也提供 Windows x64 平台和 macOS x64/m1 平台的安装包。
|
在 Linux 系统上,TDengine 社区版提供 Deb 和 RPM 格式安装包,其中 Deb 支持 Debian/Ubuntu 及其衍生系统,RPM 支持 CentOS/RHEL/SUSE 及其衍生系统,用户可以根据自己的运行环境自行选择。同时我们也提供了 tar.gz 格式安装包,以及 `apt-get` 工具从线上进行安装。
|
||||||
|
|
||||||
|
此外,TDengine 也提供 macOS x64/m1 平台的 pkg 安装包。
|
||||||
|
|
||||||
## 运行环境要求
|
## 运行环境要求
|
||||||
在linux系统中,运行环境最低要求如下:
|
在linux系统中,运行环境最低要求如下:
|
||||||
|
|
|
@ -364,6 +364,8 @@ taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\)
|
||||||
|
|
||||||
- **max** : 数据类型的 列/标签 的最大值。生成的值将小于最小值。
|
- **max** : 数据类型的 列/标签 的最大值。生成的值将小于最小值。
|
||||||
|
|
||||||
|
- **scalingFactor** : 浮点数精度增强因子,仅当数据类型是 float/double 时生效,有效值范围为 1 至 1000000 的正整数。用于增强生成浮点数的精度,特别是在 min 或 max 值较小的情况下。此属性按 10 的幂次增强小数点后的精度:scalingFactor 为 10 表示增强 1 位小数精度,100 表示增强 2 位,依此类推。
|
||||||
|
|
||||||
- **fun** : 此列数据以函数填充,目前只支持 sin 和 cos 两函数,输入参数为时间戳换算成角度值,换算公式: 角度 x = 输入的时间列ts值 % 360。同时支持系数调节,随机波动因子调节,以固定格式的表达式展现,如 fun=“10\*sin(x)+100\*random(5)” , x 表示角度,取值 0 ~ 360度,增长步长与时间列步长一致。10 表示乘的系数,100 表示加或减的系数,5 表示波动幅度在 5% 的随机范围内。目前支持的数据类型为 int, bigint, float, double 四种数据类型。注意:表达式为固定模式,不可前后颠倒。
|
- **fun** : 此列数据以函数填充,目前只支持 sin 和 cos 两函数,输入参数为时间戳换算成角度值,换算公式: 角度 x = 输入的时间列ts值 % 360。同时支持系数调节,随机波动因子调节,以固定格式的表达式展现,如 fun=“10\*sin(x)+100\*random(5)” , x 表示角度,取值 0 ~ 360度,增长步长与时间列步长一致。10 表示乘的系数,100 表示加或减的系数,5 表示波动幅度在 5% 的随机范围内。目前支持的数据类型为 int, bigint, float, double 四种数据类型。注意:表达式为固定模式,不可前后颠倒。
|
||||||
|
|
||||||
- **values** : nchar/binary 列/标签的值域,将从值中随机选择。
|
- **values** : nchar/binary 列/标签的值域,将从值中随机选择。
|
||||||
|
|
|
@ -212,11 +212,11 @@ TDengine 对于修改数据提供两种处理方式,由 IGNORE UPDATE 选项
|
||||||
```sql
|
```sql
|
||||||
[field1_name,...]
|
[field1_name,...]
|
||||||
```
|
```
|
||||||
用来指定stb_name的列与subquery输出结果的对应关系。如果stb_name的列与subquery输出结果的位置、数量全部匹配,则不需要显示指定对应关系。如果stb_name的列与subquery输出结果的数据类型不匹配,会把subquery输出结果的类型转换成对应的stb_name的列的类型。
|
在本页文档顶部的 [field1_name,...] 是用来指定 stb_name 的列与 subquery 输出结果的对应关系的。如果 stb_name 的列与 subquery 输出结果的位置、数量全部匹配,则不需要显示指定对应关系。如果 stb_name 的列与 subquery 输出结果的数据类型不匹配,会把 subquery 输出结果的类型转换成对应的 stb_name 的列的类型。
|
||||||
|
|
||||||
对于已经存在的超级表,检查列的schema信息
|
对于已经存在的超级表,检查列的schema信息
|
||||||
1. 检查列的schema信息是否匹配,对于不匹配的,则自动进行类型转换,当前只有数据长度大于4096byte时才报错,其余场景都能进行类型转换。
|
1. 检查列的 schema 信息是否匹配,对于不匹配的,则自动进行类型转换,当前只有数据长度大于 4096byte 时才报错,其余场景都能进行类型转换。
|
||||||
2. 检查列的个数是否相同,如果不同,需要显示的指定超级表与subquery的列的对应关系,否则报错;如果相同,可以指定对应关系,也可以不指定,不指定则按位置顺序对应。
|
2. 检查列的个数是否相同,如果不同,需要显示的指定超级表与 subquery 的列的对应关系,否则报错;如果相同,可以指定对应关系,也可以不指定,不指定则按位置顺序对应。
|
||||||
|
|
||||||
## 自定义TAG
|
## 自定义TAG
|
||||||
|
|
||||||
|
|
|
@ -6,15 +6,28 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表"
|
||||||
|
|
||||||
## TDengine 服务端支持的平台列表
|
## TDengine 服务端支持的平台列表
|
||||||
|
|
||||||
| | **Windows server 2016/2019** | **Windows 10/11** | **CentOS 7.9/8** | **Ubuntu 18 以上** | **统信 UOS** | **银河/中标麒麟** | **凝思 V60/V80** | **macOS** |
|
| | **版本** | **X64 64bit** | **ARM64** |
|
||||||
| ------------ | ---------------------------- | ----------------- | ---------------- | ------------------ | ------------ | ----------------- | ---------------- | --------- |
|
| ----------------------|----------------| ------------- | --------- |
|
||||||
| X64 | ●/E | ●/E | ● | ● | ●/E | ●/E | ●/E | ● |
|
| **CentOS** | **7.9 以上** | ● | ● |
|
||||||
| 树莓派 ARM64 | | | ● | | | | | |
|
| **Ubuntu** | **18 以上** | ● | ● |
|
||||||
| 华为云 ARM64 | | | | ● | | | | |
|
| **RedHat** | **RHEL 7 以上** | ● | ● |
|
||||||
| M1 | | | | | | | | ● |
|
| **Debian** | **6.0 以上** | ● | ● |
|
||||||
|
| **FreeBSD** | **12 以上** | ● | ● |
|
||||||
|
| **OpenSUSE** | **全部版本** | ● | ● |
|
||||||
|
| **SUSE Linux** | **11 以上** | ● | ● |
|
||||||
|
| **Fedora** | **21 以上** | ● | ● |
|
||||||
|
| **Windows Server** | **2016 以上** | ●/E | |
|
||||||
|
| **Windows** | **10/11** | ●/E | |
|
||||||
|
| **银河麒麟** | **V10 以上** | ●/E | ●/E |
|
||||||
|
| **中标麒麟** | **V7.0 以上** | ●/E | ●/E |
|
||||||
|
| **统信 UOS** | **V20 以上** | ●/E | |
|
||||||
|
| **凝思磐石** | **V8.0 以上** | ●/E | |
|
||||||
|
| **华为欧拉 openEuler** | **V20.03 以上** | ●/E | |
|
||||||
|
| **龙蜥 Anolis OS** | **V8.6 以上** | ●/E | |
|
||||||
|
| **macOS** | **11.0 以上** | | ● |
|
||||||
|
|
||||||
注:1) ● 表示经过官方测试验证, ○ 表示非官方测试验证,E 表示仅企业版支持。
|
注:1) ● 表示经过官方测试验证, ○ 表示非官方测试验证,E 表示仅企业版支持。
|
||||||
2) 社区版仅支持主流操作系统的较新版本,包括 Ubuntu 18+/CentOS 7+/RedHat/Debian/CoreOS/FreeBSD/OpenSUSE/SUSE Linux/Fedora/macOS 等。如果有其他操作系统及版本的需求,请联系企业版支持。
|
2) 社区版仅支持主流操作系统的较新版本,包括 Ubuntu 18+/CentOS 7+/CentOS Stream/RedHat/Debian/CoreOS/FreeBSD/OpenSUSE/SUSE Linux/Fedora/macOS 等。如果有其他操作系统及版本的需求,请联系企业版支持。
|
||||||
|
|
||||||
## TDengine 客户端和连接器支持的平台列表
|
## TDengine 客户端和连接器支持的平台列表
|
||||||
|
|
||||||
|
@ -22,16 +35,16 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表"
|
||||||
|
|
||||||
对照矩阵如下:
|
对照矩阵如下:
|
||||||
|
|
||||||
| **CPU** | **X64 64bit** | **X64 64bit** | **ARM64** | **X64 64bit** | **ARM64** |
|
| **CPU** | **X64 64bit** | **X64 64bit** | **X64 64bit** | **ARM64** | **ARM64** |
|
||||||
| ----------- | ------------- | ------------- | --------- | ------------- | --------- |
|
| ----------- | ------------- | ------------- | ------------- | --------- | --------- |
|
||||||
| **OS** | **Linux** | **Win64** | **Linux** | **macOS** | **macOS** |
|
| **OS** | **Linux** | **Win64** | **macOS** | **Linux** | **macOS** |
|
||||||
| **C/C++** | ● | ● | ● | ● | ● |
|
| **C/C++** | ● | ● | ● | ● | ● |
|
||||||
| **JDBC** | ● | ● | ● | ○ | ○ |
|
| **JDBC** | ● | ● | ○ | ● | ○ |
|
||||||
| **Python** | ● | ● | ● | ● | ● |
|
| **Python** | ● | ● | ● | ● | ● |
|
||||||
| **Go** | ● | ● | ● | ● | ● |
|
| **Go** | ● | ● | ● | ● | ● |
|
||||||
| **NodeJs** | ● | ● | ● | ○ | ○ |
|
| **NodeJs** | ● | ● | ○ | ● | ○ |
|
||||||
| **C#** | ● | ● | ○ | ○ | ○ |
|
| **C#** | ● | ● | ○ | ● | ○ |
|
||||||
| **Rust** | ● | ● | ○ | ● | ● |
|
| **Rust** | ● | ● | ● | ○ | ● |
|
||||||
| **RESTful** | ● | ● | ● | ● | ● |
|
| **RESTful** | ● | ● | ● | ● | ● |
|
||||||
|
|
||||||
注:● 表示官方测试验证通过,○ 表示非官方测试验证通过,-- 表示未经验证。
|
注:● 表示官方测试验证通过,○ 表示非官方测试验证通过,-- 表示未经验证。
|
||||||
|
|
|
@ -178,7 +178,7 @@ TDengine 集群可以容纳单个、多个甚至几千个数据节点。应用
|
||||||
|
|
||||||
TDengine 存储的数据包括采集的时序数据以及库、表相关的元数据、标签数据等,这些数据具体分为三部分:
|
TDengine 存储的数据包括采集的时序数据以及库、表相关的元数据、标签数据等,这些数据具体分为三部分:
|
||||||
|
|
||||||
- 时序数据:TDengine 的核心存储对象,存放于 vnode 里,由 data、head 和 last 三个文件组成,数据量大,查询量取决于应用场景。允许乱序写入,但暂时不支持删除操作,并且仅在 update 参数设置为 1 时允许更新操作。通过采用一个采集点一张表的模型,一个时间段的数据是连续存储,对单张表的写入是简单的追加操作,一次读,可以读到多条记录,这样保证对单个采集点的插入和查询操作,性能达到最优。
|
- 时序数据:时序数据是 TDengine 的核心存储对象,它们被存储在 vnode 中。时序数据由 data、head、sma 和 stt 4 类文件组成,这些文件共同构成了时序数据的完整存储结构。由于时序数据的特点是数据量大且查询需求取决于具体应用场景,因此 TDengine 采用了“一个数据采集点一张表”的模型来优化存储和查询性能。在这种模型下,一个时间段内的数据是连续存储的,对单张表的写入是简单的追加操作,一次读取可以获取多条记录。这种设计确保了单个数据采集点的写入和查询操作都能达到最优性能。
|
||||||
- 数据表元数据:包含标签信息和 Table Schema 信息,存放于 vnode 里的 meta 文件,支持增删改查四个标准操作。数据量很大,有 N 张表,就有 N 条记录,因此采用 LRU 存储,支持标签数据的索引。TDengine 支持多核多线程并发查询。只要计算内存足够,元数据全内存存储,千万级别规模的标签数据过滤结果能毫秒级返回。在内存资源不足的情况下,仍然可以支持数千万张表的快速查询。
|
- 数据表元数据:包含标签信息和 Table Schema 信息,存放于 vnode 里的 meta 文件,支持增删改查四个标准操作。数据量很大,有 N 张表,就有 N 条记录,因此采用 LRU 存储,支持标签数据的索引。TDengine 支持多核多线程并发查询。只要计算内存足够,元数据全内存存储,千万级别规模的标签数据过滤结果能毫秒级返回。在内存资源不足的情况下,仍然可以支持数千万张表的快速查询。
|
||||||
- 数据库元数据:存放于 mnode 里,包含系统节点、用户、DB、STable Schema 等信息,支持增删改查四个标准操作。这部分数据的量不大,可以全内存保存,而且由于客户端有缓存,查询量也不大。因此目前的设计虽是集中式存储管理,但不会构成性能瓶颈。
|
- 数据库元数据:存放于 mnode 里,包含系统节点、用户、DB、STable Schema 等信息,支持增删改查四个标准操作。这部分数据的量不大,可以全内存保存,而且由于客户端有缓存,查询量也不大。因此目前的设计虽是集中式存储管理,但不会构成性能瓶颈。
|
||||||
|
|
||||||
|
|
|
@ -353,6 +353,7 @@ typedef struct {
|
||||||
queue node;
|
queue node;
|
||||||
void (*freeFunc)(void* arg);
|
void (*freeFunc)(void* arg);
|
||||||
int32_t size;
|
int32_t size;
|
||||||
|
int8_t inited;
|
||||||
} STransQueue;
|
} STransQueue;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -127,10 +127,12 @@ typedef struct {
|
||||||
typedef struct SCliReq {
|
typedef struct SCliReq {
|
||||||
SReqCtx* ctx;
|
SReqCtx* ctx;
|
||||||
queue q;
|
queue q;
|
||||||
|
queue sendQ;
|
||||||
STransMsgType type;
|
STransMsgType type;
|
||||||
uint64_t st;
|
uint64_t st;
|
||||||
int64_t seq;
|
int64_t seq;
|
||||||
int32_t sent; //(0: no send, 1: alread sent)
|
int32_t sent; //(0: no send, 1: alread sent)
|
||||||
|
int8_t inSendQ;
|
||||||
STransMsg msg;
|
STransMsg msg;
|
||||||
int8_t inRetry;
|
int8_t inRetry;
|
||||||
|
|
||||||
|
@ -274,6 +276,8 @@ static FORCE_INLINE void destroyReqAndAhanlde(void* cmsg);
|
||||||
static FORCE_INLINE int cliRBChoseIdx(STrans* pInst);
|
static FORCE_INLINE int cliRBChoseIdx(STrans* pInst);
|
||||||
static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx);
|
static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx);
|
||||||
|
|
||||||
|
static FORCE_INLINE void removeReqFromSendQ(SCliReq* pReq);
|
||||||
|
|
||||||
static int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq);
|
static int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq);
|
||||||
static int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead);
|
static int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead);
|
||||||
static int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp);
|
static int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp);
|
||||||
|
@ -453,6 +457,7 @@ static bool filteBySeq(void* key, void* arg) {
|
||||||
SFiterArg* targ = arg;
|
SFiterArg* targ = arg;
|
||||||
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
|
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
|
||||||
if (pReq->seq == targ->seq && pReq->msg.msgType + 1 == targ->msgType) {
|
if (pReq->seq == targ->seq && pReq->msg.msgType + 1 == targ->msgType) {
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
|
@ -539,6 +544,7 @@ bool filterByQid(void* key, void* arg) {
|
||||||
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
|
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
|
||||||
|
|
||||||
if (pReq->msg.info.qId == *qid) {
|
if (pReq->msg.info.qId == *qid) {
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
|
@ -600,7 +606,7 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead
|
||||||
queue* el = QUEUE_HEAD(&set);
|
queue* el = QUEUE_HEAD(&set);
|
||||||
QUEUE_REMOVE(el);
|
QUEUE_REMOVE(el);
|
||||||
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
STraceId* trace = &pReq->msg.info.traceId;
|
STraceId* trace = &pReq->msg.info.traceId;
|
||||||
tGDebug("start to free msg %p", pReq);
|
tGDebug("start to free msg %p", pReq);
|
||||||
destroyReqWrapper(pReq, pThrd);
|
destroyReqWrapper(pReq, pThrd);
|
||||||
|
@ -700,6 +706,7 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
|
|
||||||
code = cliBuildRespFromCont(pReq, &resp, pHead);
|
code = cliBuildRespFromCont(pReq, &resp, pHead);
|
||||||
STraceId* trace = &resp.info.traceId;
|
STraceId* trace = &resp.info.traceId;
|
||||||
|
@ -905,6 +912,10 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SCliThrd* thrd = conn->hostThrd;
|
SCliThrd* thrd = conn->hostThrd;
|
||||||
|
if (thrd->quit == true) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
cliResetConnTimer(conn);
|
cliResetConnTimer(conn);
|
||||||
if (conn->list == NULL && conn->dstAddr != NULL) {
|
if (conn->list == NULL && conn->dstAddr != NULL) {
|
||||||
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr));
|
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr));
|
||||||
|
@ -1092,6 +1103,7 @@ _failed:
|
||||||
transQueueDestroy(&conn->reqsToSend);
|
transQueueDestroy(&conn->reqsToSend);
|
||||||
transQueueDestroy(&conn->reqsSentOut);
|
transQueueDestroy(&conn->reqsSentOut);
|
||||||
taosMemoryFree(conn->dstAddr);
|
taosMemoryFree(conn->dstAddr);
|
||||||
|
taosMemoryFree(conn->ipStr);
|
||||||
}
|
}
|
||||||
tError("failed to create conn, code:%d", code);
|
tError("failed to create conn, code:%d", code);
|
||||||
taosMemoryFree(conn);
|
taosMemoryFree(conn);
|
||||||
|
@ -1216,6 +1228,7 @@ static FORCE_INLINE void destroyReqInQueue(SCliConn* conn, queue* set, int32_t c
|
||||||
QUEUE_REMOVE(el);
|
QUEUE_REMOVE(el);
|
||||||
|
|
||||||
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
notifyAndDestroyReq(conn, pReq, code);
|
notifyAndDestroyReq(conn, pReq, code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1246,8 +1259,8 @@ static void cliHandleException(SCliConn* conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cliDestroyAllQidFromThrd(conn);
|
cliDestroyAllQidFromThrd(conn);
|
||||||
QUEUE_REMOVE(&conn->q);
|
if (pThrd->quit == false && conn->list) {
|
||||||
if (conn->list) {
|
QUEUE_REMOVE(&conn->q);
|
||||||
conn->list->totalSize -= 1;
|
conn->list->totalSize -= 1;
|
||||||
conn->list = NULL;
|
conn->list = NULL;
|
||||||
}
|
}
|
||||||
|
@ -1273,7 +1286,8 @@ static void cliHandleException(SCliConn* conn) {
|
||||||
bool filterToRmReq(void* h, void* arg) {
|
bool filterToRmReq(void* h, void* arg) {
|
||||||
queue* el = h;
|
queue* el = h;
|
||||||
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
||||||
if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) {
|
if (pReq->sent == 1 && pReq->inSendQ == 0 && REQUEST_NO_RESP(&pReq->msg)) {
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -1300,12 +1314,18 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
|
||||||
SCliThrd* pThrd = conn->hostThrd;
|
SCliThrd* pThrd = conn->hostThrd;
|
||||||
STrans* pInst = pThrd->pInst;
|
STrans* pInst = pThrd->pInst;
|
||||||
|
|
||||||
|
while (!QUEUE_IS_EMPTY(&wrapper->node)) {
|
||||||
|
queue* h = QUEUE_HEAD(&wrapper->node);
|
||||||
|
SCliReq* pReq = QUEUE_DATA(h, SCliReq, sendQ);
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
|
}
|
||||||
freeWReqToWQ(&conn->wq, wrapper);
|
freeWReqToWQ(&conn->wq, wrapper);
|
||||||
|
|
||||||
int32_t ref = transUnrefCliHandle(conn);
|
int32_t ref = transUnrefCliHandle(conn);
|
||||||
if (ref <= 0) {
|
if (ref <= 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
cliConnRmReqs(conn);
|
cliConnRmReqs(conn);
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
tDebug("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
|
tDebug("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
|
||||||
|
@ -1340,6 +1360,9 @@ bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msg
|
||||||
}
|
}
|
||||||
STransMsgHead* pHead = *ppHead;
|
STransMsgHead* pHead = *ppHead;
|
||||||
STransMsgHead* tHead = taosMemoryCalloc(1, *msgLen + sizeof(pInst->user));
|
STransMsgHead* tHead = taosMemoryCalloc(1, *msgLen + sizeof(pInst->user));
|
||||||
|
if (tHead == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
memcpy((char*)tHead, (char*)pHead, TRANS_MSG_OVERHEAD);
|
memcpy((char*)tHead, (char*)pHead, TRANS_MSG_OVERHEAD);
|
||||||
memcpy((char*)tHead + TRANS_MSG_OVERHEAD, pInst->user, sizeof(pInst->user));
|
memcpy((char*)tHead + TRANS_MSG_OVERHEAD, pInst->user, sizeof(pInst->user));
|
||||||
|
|
||||||
|
@ -1398,6 +1421,10 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
|
||||||
|
|
||||||
int j = 0;
|
int j = 0;
|
||||||
int32_t batchLimit = 64;
|
int32_t batchLimit = 64;
|
||||||
|
|
||||||
|
queue reqToSend;
|
||||||
|
QUEUE_INIT(&reqToSend);
|
||||||
|
|
||||||
while (!transQueueEmpty(&pConn->reqsToSend)) {
|
while (!transQueueEmpty(&pConn->reqsToSend)) {
|
||||||
queue* h = transQueuePop(&pConn->reqsToSend);
|
queue* h = transQueuePop(&pConn->reqsToSend);
|
||||||
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, q);
|
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, q);
|
||||||
|
@ -1422,6 +1449,10 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
|
||||||
if (cliConnMayAddUserInfo(pConn, &pHead, &msgLen)) {
|
if (cliConnMayAddUserInfo(pConn, &pHead, &msgLen)) {
|
||||||
content = transContFromHead(pHead);
|
content = transContFromHead(pHead);
|
||||||
contLen = transContLenFromMsg(msgLen);
|
contLen = transContLenFromMsg(msgLen);
|
||||||
|
} else {
|
||||||
|
if (pConn->userInited == 0) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (pHead->comp == 0) {
|
if (pHead->comp == 0) {
|
||||||
pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0;
|
pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0;
|
||||||
|
@ -1447,30 +1478,51 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
|
||||||
wb[j++] = uv_buf_init((char*)pHead, msgLen);
|
wb[j++] = uv_buf_init((char*)pHead, msgLen);
|
||||||
totalLen += msgLen;
|
totalLen += msgLen;
|
||||||
|
|
||||||
pCliMsg->sent = 1;
|
|
||||||
pCliMsg->seq = pConn->seq;
|
pCliMsg->seq = pConn->seq;
|
||||||
|
pCliMsg->sent = 1;
|
||||||
|
|
||||||
STraceId* trace = &pCliMsg->msg.info.traceId;
|
STraceId* trace = &pCliMsg->msg.info.traceId;
|
||||||
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn),
|
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn),
|
||||||
pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId);
|
pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId);
|
||||||
|
|
||||||
transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
|
transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
|
||||||
|
QUEUE_INIT(&pCliMsg->sendQ);
|
||||||
|
QUEUE_PUSH(&reqToSend, &pCliMsg->sendQ);
|
||||||
|
|
||||||
|
pCliMsg->inSendQ = 1;
|
||||||
if (j >= batchLimit) {
|
if (j >= batchLimit) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
transRefCliHandle(pConn);
|
transRefCliHandle(pConn);
|
||||||
uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn);
|
uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn);
|
||||||
|
|
||||||
if (req == NULL) {
|
if (req == NULL) {
|
||||||
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno));
|
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno));
|
||||||
|
while (!QUEUE_IS_EMPTY(&reqToSend)) {
|
||||||
|
queue* h = QUEUE_HEAD(&reqToSend);
|
||||||
|
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ);
|
||||||
|
removeReqFromSendQ(pCliMsg);
|
||||||
|
}
|
||||||
|
|
||||||
transRefCliHandle(pConn);
|
transRefCliHandle(pConn);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SWReqsWrapper* pWreq = req->data;
|
||||||
|
|
||||||
|
QUEUE_MOVE(&reqToSend, &pWreq->node);
|
||||||
tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, j, totalLen);
|
tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, j, totalLen);
|
||||||
|
|
||||||
int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb);
|
int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
|
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
|
||||||
|
while (!QUEUE_IS_EMPTY(&pWreq->node)) {
|
||||||
|
queue* h = QUEUE_HEAD(&pWreq->node);
|
||||||
|
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ);
|
||||||
|
removeReqFromSendQ(pCliMsg);
|
||||||
|
}
|
||||||
|
|
||||||
freeWReqToWQ(&pConn->wq, req->data);
|
freeWReqToWQ(&pConn->wq, req->data);
|
||||||
code = TSDB_CODE_THIRDPARTY_ERROR;
|
code = TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
TAOS_UNUSED(transUnrefCliHandle(pConn));
|
TAOS_UNUSED(transUnrefCliHandle(pConn));
|
||||||
|
@ -2182,11 +2234,21 @@ static void cliAsyncCb(uv_async_t* handle) {
|
||||||
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd, pThrd->stopMsg);
|
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd, pThrd->stopMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void removeReqFromSendQ(SCliReq* pReq) {
|
||||||
|
if (pReq == NULL || pReq->inSendQ == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
QUEUE_REMOVE(&pReq->sendQ);
|
||||||
|
pReq->inSendQ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void destroyReq(void* arg) {
|
static FORCE_INLINE void destroyReq(void* arg) {
|
||||||
SCliReq* pReq = arg;
|
SCliReq* pReq = arg;
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
STraceId* trace = &pReq->msg.info.traceId;
|
STraceId* trace = &pReq->msg.info.traceId;
|
||||||
tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx);
|
tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx);
|
||||||
|
|
||||||
|
@ -2961,6 +3023,7 @@ int32_t cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
|
||||||
STrans* pInst = pThrd->pInst;
|
STrans* pInst = pThrd->pInst;
|
||||||
|
|
||||||
if (pReq != NULL) {
|
if (pReq != NULL) {
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
if (pResp->code != TSDB_CODE_SUCCESS) {
|
if (pResp->code != TSDB_CODE_SUCCESS) {
|
||||||
if (cliMayRetry(pConn, pReq, pResp)) {
|
if (cliMayRetry(pConn, pReq, pResp)) {
|
||||||
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
|
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
|
||||||
|
@ -3114,7 +3177,7 @@ static int32_t transInitMsg(void* pInstRef, const SEpSet* pEpSet, STransMsg* pRe
|
||||||
if (ctx != NULL) pCtx->userCtx = *ctx;
|
if (ctx != NULL) pCtx->userCtx = *ctx;
|
||||||
|
|
||||||
pCliReq = taosMemoryCalloc(1, sizeof(SCliReq));
|
pCliReq = taosMemoryCalloc(1, sizeof(SCliReq));
|
||||||
if (pReq == NULL) {
|
if (pCliReq == NULL) {
|
||||||
TAOS_CHECK_GOTO(terrno, NULL, _exception);
|
TAOS_CHECK_GOTO(terrno, NULL, _exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3183,6 +3246,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int8_t transIdInited = 0;
|
||||||
|
|
||||||
STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef);
|
STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef);
|
||||||
if (pInst == NULL) {
|
if (pInst == NULL) {
|
||||||
|
@ -3200,6 +3264,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
|
||||||
if (exh == NULL) {
|
if (exh == NULL) {
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _exception);
|
TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _exception);
|
||||||
}
|
}
|
||||||
|
transIdInited = 1;
|
||||||
|
|
||||||
pReq->info.handle = (void*)(*transpointId);
|
pReq->info.handle = (void*)(*transpointId);
|
||||||
pReq->info.qId = *transpointId;
|
pReq->info.qId = *transpointId;
|
||||||
|
@ -3216,9 +3281,6 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
|
||||||
return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
|
return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (pReq->msgType == TDMT_SCH_DROP_TASK) {
|
|
||||||
// TAOS_UNUSED(transReleaseCliHandle(pReq->info.handle));
|
|
||||||
// }
|
|
||||||
transReleaseExHandle(transGetRefMgt(), *transpointId);
|
transReleaseExHandle(transGetRefMgt(), *transpointId);
|
||||||
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
|
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -3226,6 +3288,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
|
||||||
_exception:
|
_exception:
|
||||||
transFreeMsg(pReq->pCont);
|
transFreeMsg(pReq->pCont);
|
||||||
pReq->pCont = NULL;
|
pReq->pCont = NULL;
|
||||||
|
if (transIdInited) transReleaseExHandle(transGetRefMgt(), *transpointId);
|
||||||
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
|
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
|
||||||
|
|
||||||
tError("failed to send request since %s", tstrerror(code));
|
tError("failed to send request since %s", tstrerror(code));
|
||||||
|
@ -3641,6 +3704,7 @@ bool filterTimeoutReq(void* key, void* arg) {
|
||||||
if (pReq->msg.info.qId == 0 && !REQUEST_NO_RESP(&pReq->msg) && pReq->ctx) {
|
if (pReq->msg.info.qId == 0 && !REQUEST_NO_RESP(&pReq->msg) && pReq->ctx) {
|
||||||
int64_t elapse = ((st - pReq->st) / 1000000);
|
int64_t elapse = ((st - pReq->st) / 1000000);
|
||||||
if (listArg && elapse >= listArg->pInst->readTimeout) {
|
if (listArg && elapse >= listArg->pInst->readTimeout) {
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -423,6 +423,7 @@ int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) {
|
||||||
QUEUE_INIT(&wq->node);
|
QUEUE_INIT(&wq->node);
|
||||||
wq->freeFunc = (void (*)(void*))freeFunc;
|
wq->freeFunc = (void (*)(void*))freeFunc;
|
||||||
wq->size = 0;
|
wq->size = 0;
|
||||||
|
wq->inited = 1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
void transQueuePush(STransQueue* q, void* arg) {
|
void transQueuePush(STransQueue* q, void* arg) {
|
||||||
|
@ -497,6 +498,7 @@ void transQueueRemove(STransQueue* q, void* e) {
|
||||||
bool transQueueEmpty(STransQueue* q) { return q->size == 0 ? true : false; }
|
bool transQueueEmpty(STransQueue* q) { return q->size == 0 ? true : false; }
|
||||||
|
|
||||||
void transQueueClear(STransQueue* q) {
|
void transQueueClear(STransQueue* q) {
|
||||||
|
if (q->inited == 0) return;
|
||||||
while (!QUEUE_IS_EMPTY(&q->node)) {
|
while (!QUEUE_IS_EMPTY(&q->node)) {
|
||||||
queue* h = QUEUE_HEAD(&q->node);
|
queue* h = QUEUE_HEAD(&q->node);
|
||||||
QUEUE_REMOVE(h);
|
QUEUE_REMOVE(h);
|
||||||
|
|
|
@ -1289,8 +1289,8 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SWorkThrd* pThrd = hThrd;
|
SWorkThrd* pThrd = hThrd;
|
||||||
int32_t lino;
|
int32_t lino;
|
||||||
|
int8_t wqInited = 0;
|
||||||
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
|
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
|
||||||
}
|
}
|
||||||
|
@ -1340,6 +1340,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
|
||||||
|
|
||||||
code = initWQ(&pConn->wq);
|
code = initWQ(&pConn->wq);
|
||||||
TAOS_CHECK_GOTO(code, &lino, _end);
|
TAOS_CHECK_GOTO(code, &lino, _end);
|
||||||
|
wqInited = 1;
|
||||||
|
|
||||||
// init client handle
|
// init client handle
|
||||||
pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
|
pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
|
||||||
|
@ -1372,7 +1373,7 @@ _end:
|
||||||
transDestroyBuffer(&pConn->readBuf);
|
transDestroyBuffer(&pConn->readBuf);
|
||||||
taosHashCleanup(pConn->pQTable);
|
taosHashCleanup(pConn->pQTable);
|
||||||
taosMemoryFree(pConn->pTcp);
|
taosMemoryFree(pConn->pTcp);
|
||||||
destroyWQ(&pConn->wq);
|
if (wqInited) destroyWQ(&pConn->wq);
|
||||||
taosMemoryFree(pConn->buf);
|
taosMemoryFree(pConn->buf);
|
||||||
taosMemoryFree(pConn);
|
taosMemoryFree(pConn);
|
||||||
pConn = NULL;
|
pConn = NULL;
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
add_executable(transportTest "")
|
add_executable(transportTest "")
|
||||||
add_executable(transUT "")
|
add_executable(transUT "")
|
||||||
|
add_executable(transUT2 "")
|
||||||
add_executable(svrBench "")
|
add_executable(svrBench "")
|
||||||
add_executable(cliBench "")
|
add_executable(cliBench "")
|
||||||
add_executable(httpBench "")
|
add_executable(httpBench "")
|
||||||
|
@ -9,6 +10,10 @@ target_sources(transUT
|
||||||
"transUT.cpp"
|
"transUT.cpp"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
target_sources(transUT2
|
||||||
|
PRIVATE
|
||||||
|
"transUT2.cpp"
|
||||||
|
)
|
||||||
target_sources(transportTest
|
target_sources(transportTest
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"transportTests.cpp"
|
"transportTests.cpp"
|
||||||
|
@ -56,6 +61,20 @@ target_include_directories(transUT
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
target_link_libraries(transUT2
|
||||||
|
os
|
||||||
|
util
|
||||||
|
common
|
||||||
|
gtest_main
|
||||||
|
transport
|
||||||
|
)
|
||||||
|
|
||||||
|
target_include_directories(transUT2
|
||||||
|
PUBLIC
|
||||||
|
"${TD_SOURCE_DIR}/include/libs/transport"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
|
||||||
target_include_directories(svrBench
|
target_include_directories(svrBench
|
||||||
PUBLIC
|
PUBLIC
|
||||||
"${TD_SOURCE_DIR}/include/libs/transport"
|
"${TD_SOURCE_DIR}/include/libs/transport"
|
||||||
|
|
|
@ -53,8 +53,6 @@ static void processResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
||||||
pMsg->code);
|
pMsg->code);
|
||||||
|
|
||||||
if (pEpSet) pInfo->epSet = *pEpSet;
|
|
||||||
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
tsem_post(&pInfo->rspSem);
|
tsem_post(&pInfo->rspSem);
|
||||||
}
|
}
|
||||||
|
@ -72,12 +70,12 @@ static void *sendRequest(void *param) {
|
||||||
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
||||||
rpcMsg.contLen = pInfo->msgSize;
|
rpcMsg.contLen = pInfo->msgSize;
|
||||||
rpcMsg.info.ahandle = pInfo;
|
rpcMsg.info.ahandle = pInfo;
|
||||||
rpcMsg.info.noResp = 1;
|
rpcMsg.info.noResp = 0;
|
||||||
rpcMsg.msgType = 1;
|
rpcMsg.msgType = 1;
|
||||||
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||||
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
||||||
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||||
// tsem_wait(&pInfo->rspSem);
|
tsem_wait(&pInfo->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
tDebug("thread:%d, it is over", pInfo->index);
|
tDebug("thread:%d, it is over", pInfo->index);
|
||||||
|
@ -110,17 +108,15 @@ int main(int argc, char *argv[]) {
|
||||||
rpcInit.label = "APP";
|
rpcInit.label = "APP";
|
||||||
rpcInit.numOfThreads = 1;
|
rpcInit.numOfThreads = 1;
|
||||||
rpcInit.cfp = processResponse;
|
rpcInit.cfp = processResponse;
|
||||||
rpcInit.sessions = 100;
|
rpcInit.sessions = 1000;
|
||||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
rpcInit.user = "michael";
|
rpcInit.user = "michael";
|
||||||
|
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
rpcInit.connLimitNum = 10;
|
rpcInit.shareConnLimit = tsShareConnLimit;
|
||||||
rpcInit.connLimitLock = 1;
|
|
||||||
rpcInit.shareConnLimit = 16 * 1024;
|
|
||||||
rpcInit.supportBatch = 1;
|
rpcInit.supportBatch = 1;
|
||||||
|
rpcInit.compressSize = -1;
|
||||||
rpcDebugFlag = 135;
|
rpcDebugFlag = 143;
|
||||||
for (int i = 1; i < argc; ++i) {
|
for (int i = 1; i < argc; ++i) {
|
||||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||||
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
|
||||||
|
@ -139,6 +135,10 @@ int main(int argc, char *argv[]) {
|
||||||
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
||||||
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
||||||
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
||||||
|
} else if (strcmp(argv[i], "-l") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.shareConnLimit = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-c") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.compressSize = atoi(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||||
rpcDebugFlag = atoi(argv[++i]);
|
rpcDebugFlag = atoi(argv[++i]);
|
||||||
} else {
|
} else {
|
||||||
|
@ -150,6 +150,8 @@ int main(int argc, char *argv[]) {
|
||||||
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
||||||
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
||||||
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
||||||
|
printf(" [-c compressSize]: compress size, default:%d\n", tsCompressMsgSize);
|
||||||
|
printf(" [-l shareConnLimit]: share conn limit, default:%d\n", tsShareConnLimit);
|
||||||
printf(" [-h help]: print out this help\n\n");
|
printf(" [-h help]: print out this help\n\n");
|
||||||
exit(0);
|
exit(0);
|
||||||
}
|
}
|
||||||
|
@ -168,18 +170,18 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
int64_t now = taosGetTimestampUs();
|
int64_t now = taosGetTimestampUs();
|
||||||
|
|
||||||
SInfo *pInfo = (SInfo *)taosMemoryCalloc(1, sizeof(SInfo) * appThreads);
|
SInfo **pInfo = (SInfo **)taosMemoryCalloc(1, sizeof(SInfo *) * appThreads);
|
||||||
SInfo *p = pInfo;
|
|
||||||
for (int i = 0; i < appThreads; ++i) {
|
for (int i = 0; i < appThreads; ++i) {
|
||||||
pInfo->index = i;
|
SInfo *p = taosMemoryCalloc(1, sizeof(SInfo));
|
||||||
pInfo->epSet = epSet;
|
p->index = i;
|
||||||
pInfo->numOfReqs = numOfReqs;
|
p->epSet = epSet;
|
||||||
pInfo->msgSize = msgSize;
|
p->numOfReqs = numOfReqs;
|
||||||
tsem_init(&pInfo->rspSem, 0, 0);
|
p->msgSize = msgSize;
|
||||||
pInfo->pRpc = pRpc;
|
tsem_init(&p->rspSem, 0, 0);
|
||||||
|
p->pRpc = pRpc;
|
||||||
|
pInfo[i] = p;
|
||||||
|
|
||||||
taosThreadCreate(&pInfo->thread, NULL, sendRequest, pInfo);
|
taosThreadCreate(&p->thread, NULL, sendRequest, pInfo[i]);
|
||||||
pInfo++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
do {
|
do {
|
||||||
|
@ -192,12 +194,14 @@ int main(int argc, char *argv[]) {
|
||||||
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
|
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
|
||||||
|
|
||||||
for (int i = 0; i < appThreads; i++) {
|
for (int i = 0; i < appThreads; i++) {
|
||||||
SInfo *pInfo = p;
|
SInfo *p = pInfo[i];
|
||||||
taosThreadJoin(pInfo->thread, NULL);
|
taosThreadJoin(p->thread, NULL);
|
||||||
p++;
|
taosMemoryFree(p);
|
||||||
}
|
}
|
||||||
int ch = getchar();
|
taosMemoryFree(pInfo);
|
||||||
UNUSED(ch);
|
|
||||||
|
// int ch = getchar();
|
||||||
|
// UNUSED(ch);
|
||||||
|
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
|
|
||||||
|
|
|
@ -76,23 +76,6 @@ void *processShellMsg(void *arg) {
|
||||||
|
|
||||||
for (int i = 0; i < numOfMsgs; ++i) {
|
for (int i = 0; i < numOfMsgs; ++i) {
|
||||||
taosGetQitem(qall, (void **)&pRpcMsg);
|
taosGetQitem(qall, (void **)&pRpcMsg);
|
||||||
|
|
||||||
if (pDataFile != NULL) {
|
|
||||||
if (taosWriteFile(pDataFile, pRpcMsg->pCont, pRpcMsg->contLen) < 0) {
|
|
||||||
tInfo("failed to write data file, reason:%s", strerror(errno));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (commit >= 2) {
|
|
||||||
num += numOfMsgs;
|
|
||||||
// if (taosFsync(pDataFile) < 0) {
|
|
||||||
// tInfo("failed to flush data to file, reason:%s", strerror(errno));
|
|
||||||
//}
|
|
||||||
|
|
||||||
if (num % 10000 == 0) {
|
|
||||||
tInfo("%d request have been written into disk", num);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosResetQitems(qall);
|
taosResetQitems(qall);
|
||||||
|
@ -107,16 +90,7 @@ void *processShellMsg(void *arg) {
|
||||||
rpcMsg.code = 0;
|
rpcMsg.code = 0;
|
||||||
rpcSendResponse(&rpcMsg);
|
rpcSendResponse(&rpcMsg);
|
||||||
|
|
||||||
void *handle = pRpcMsg->info.handle;
|
|
||||||
taosFreeQitem(pRpcMsg);
|
taosFreeQitem(pRpcMsg);
|
||||||
//{
|
|
||||||
// SRpcMsg nRpcMsg = {0};
|
|
||||||
// nRpcMsg.pCont = rpcMallocCont(msgSize);
|
|
||||||
// nRpcMsg.contLen = msgSize;
|
|
||||||
// nRpcMsg.info.handle = handle;
|
|
||||||
// nRpcMsg.code = TSDB_CODE_CTG_NOT_READY;
|
|
||||||
// rpcSendResponse(&nRpcMsg);
|
|
||||||
//}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosUpdateItemSize(qinfo.queue, numOfMsgs);
|
taosUpdateItemSize(qinfo.queue, numOfMsgs);
|
||||||
|
@ -149,12 +123,13 @@ int main(int argc, char *argv[]) {
|
||||||
rpcInit.localPort = 7000;
|
rpcInit.localPort = 7000;
|
||||||
memcpy(rpcInit.localFqdn, "localhost", strlen("localhost"));
|
memcpy(rpcInit.localFqdn, "localhost", strlen("localhost"));
|
||||||
rpcInit.label = "SER";
|
rpcInit.label = "SER";
|
||||||
rpcInit.numOfThreads = 1;
|
rpcInit.numOfThreads = 10;
|
||||||
rpcInit.cfp = processRequestMsg;
|
rpcInit.cfp = processRequestMsg;
|
||||||
rpcInit.idleTime = 2 * 1500;
|
rpcInit.idleTime = 2 * 1500;
|
||||||
|
|
||||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||||
rpcDebugFlag = 131;
|
rpcDebugFlag = 131;
|
||||||
|
rpcInit.compressSize = -1;
|
||||||
|
|
||||||
for (int i = 1; i < argc; ++i) {
|
for (int i = 1; i < argc; ++i) {
|
||||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||||
|
@ -205,8 +180,8 @@ int main(int argc, char *argv[]) {
|
||||||
if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno));
|
if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfAthread = 5;
|
int32_t numOfAthread = 1;
|
||||||
multiQ = taosMemoryMalloc(sizeof(numOfAthread));
|
multiQ = taosMemoryMalloc(sizeof(MultiThreadQhandle));
|
||||||
multiQ->numOfThread = numOfAthread;
|
multiQ->numOfThread = numOfAthread;
|
||||||
multiQ->qhandle = (STaosQueue **)taosMemoryMalloc(sizeof(STaosQueue *) * numOfAthread);
|
multiQ->qhandle = (STaosQueue **)taosMemoryMalloc(sizeof(STaosQueue *) * numOfAthread);
|
||||||
multiQ->qset = (STaosQset **)taosMemoryMalloc(sizeof(STaosQset *) * numOfAthread);
|
multiQ->qset = (STaosQset **)taosMemoryMalloc(sizeof(STaosQset *) * numOfAthread);
|
||||||
|
@ -221,11 +196,6 @@ int main(int argc, char *argv[]) {
|
||||||
threads[i].idx = i;
|
threads[i].idx = i;
|
||||||
taosThreadCreate(&(threads[i].thread), NULL, processShellMsg, (void *)&threads[i]);
|
taosThreadCreate(&(threads[i].thread), NULL, processShellMsg, (void *)&threads[i]);
|
||||||
}
|
}
|
||||||
// qhandle = taosOpenQueue();
|
|
||||||
// qset = taosOpenQset();
|
|
||||||
// taosAddIntoQset(qset, qhandle, NULL);
|
|
||||||
|
|
||||||
// processShellMsg();
|
|
||||||
|
|
||||||
if (pDataFile != NULL) {
|
if (pDataFile != NULL) {
|
||||||
taosCloseFile(&pDataFile);
|
taosCloseFile(&pDataFile);
|
||||||
|
|
|
@ -54,6 +54,7 @@ class Client {
|
||||||
rpcInit_.user = (char *)user;
|
rpcInit_.user = (char *)user;
|
||||||
rpcInit_.parent = this;
|
rpcInit_.parent = this;
|
||||||
rpcInit_.connType = TAOS_CONN_CLIENT;
|
rpcInit_.connType = TAOS_CONN_CLIENT;
|
||||||
|
rpcInit_.shareConnLimit = 200;
|
||||||
|
|
||||||
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||||
this->transCli = rpcOpen(&rpcInit_);
|
this->transCli = rpcOpen(&rpcInit_);
|
||||||
|
@ -85,6 +86,14 @@ class Client {
|
||||||
SemWait();
|
SemWait();
|
||||||
*resp = this->resp;
|
*resp = this->resp;
|
||||||
}
|
}
|
||||||
|
void sendReq(SRpcMsg *req) {
|
||||||
|
SEpSet epSet = {0};
|
||||||
|
epSet.inUse = 0;
|
||||||
|
addEpIntoEpSet(&epSet, "127.0.0.1", 7000);
|
||||||
|
|
||||||
|
rpcSendRequest(this->transCli, &epSet, req, NULL);
|
||||||
|
|
||||||
|
}
|
||||||
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
|
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
|
||||||
if (req->info.handle != NULL) {
|
if (req->info.handle != NULL) {
|
||||||
rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT);
|
rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT);
|
||||||
|
@ -160,6 +169,7 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
rpcMsg.contLen = 100;
|
rpcMsg.contLen = 100;
|
||||||
rpcMsg.info = pMsg->info;
|
rpcMsg.info = pMsg->info;
|
||||||
rpcMsg.code = 0;
|
rpcMsg.code = 0;
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
rpcSendResponse(&rpcMsg);
|
rpcSendResponse(&rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,6 +274,7 @@ class TransObj {
|
||||||
cli->Stop();
|
cli->Stop();
|
||||||
}
|
}
|
||||||
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
|
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
|
||||||
|
void cliSendReq(SRpcMsg *req) { cli->sendReq(req); }
|
||||||
void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); }
|
void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); }
|
||||||
|
|
||||||
~TransObj() {
|
~TransObj() {
|
||||||
|
@ -492,15 +503,16 @@ TEST_F(TransEnv, queryExcept) {
|
||||||
TEST_F(TransEnv, noResp) {
|
TEST_F(TransEnv, noResp) {
|
||||||
SRpcMsg resp = {0};
|
SRpcMsg resp = {0};
|
||||||
SRpcMsg req = {0};
|
SRpcMsg req = {0};
|
||||||
// for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 500000; i++) {
|
||||||
// memset(&req, 0, sizeof(req));
|
memset(&req, 0, sizeof(req));
|
||||||
// req.info.noResp = 1;
|
req.info.noResp = 1;
|
||||||
// req.msgType = 1;
|
req.msgType = 3;
|
||||||
// req.pCont = rpcMallocCont(10);
|
req.pCont = rpcMallocCont(10);
|
||||||
// req.contLen = 10;
|
req.contLen = 10;
|
||||||
// tr->cliSendAndRecv(&req, &resp);
|
tr->cliSendReq(&req);
|
||||||
//}
|
//tr->cliSendAndRecv(&req, &resp);
|
||||||
// taosMsleep(2000);
|
}
|
||||||
|
taosMsleep(2000);
|
||||||
|
|
||||||
// no resp
|
// no resp
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,529 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
|
||||||
|
* Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cstring>
|
||||||
|
#include "tdatablock.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "tmisce.h"
|
||||||
|
#include "transLog.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
#include "tversion.h"
|
||||||
|
using namespace std;
|
||||||
|
|
||||||
|
const char *label = "APP";
|
||||||
|
const char *secret = "secret";
|
||||||
|
const char *user = "user";
|
||||||
|
const char *ckey = "ckey";
|
||||||
|
|
||||||
|
class Server;
|
||||||
|
int port = 7000;
|
||||||
|
// server process
|
||||||
|
// server except
|
||||||
|
|
||||||
|
typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
|
||||||
|
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
// client process;
|
||||||
|
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
class Client {
|
||||||
|
public:
|
||||||
|
void Init(int nThread) {
|
||||||
|
memcpy(tsTempDir, TD_TMP_DIR_PATH, strlen(TD_TMP_DIR_PATH));
|
||||||
|
memset(&rpcInit_, 0, sizeof(rpcInit_));
|
||||||
|
rpcInit_.localPort = 0;
|
||||||
|
rpcInit_.label = (char *)"client";
|
||||||
|
rpcInit_.numOfThreads = nThread;
|
||||||
|
rpcInit_.cfp = processResp;
|
||||||
|
rpcInit_.user = (char *)user;
|
||||||
|
rpcInit_.parent = this;
|
||||||
|
rpcInit_.connType = TAOS_CONN_CLIENT;
|
||||||
|
rpcInit_.shareConnLimit = 200;
|
||||||
|
|
||||||
|
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||||
|
this->transCli = rpcOpen(&rpcInit_);
|
||||||
|
//tsem_init(&this->sem, 0, 0);
|
||||||
|
}
|
||||||
|
void SetResp(SRpcMsg *pMsg) {
|
||||||
|
// set up resp;
|
||||||
|
this->resp = *pMsg;
|
||||||
|
}
|
||||||
|
SRpcMsg *Resp() { return &this->resp; }
|
||||||
|
|
||||||
|
void Restart(CB cb) {
|
||||||
|
rpcClose(this->transCli);
|
||||||
|
rpcInit_.cfp = cb;
|
||||||
|
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||||
|
this->transCli = rpcOpen(&rpcInit_);
|
||||||
|
}
|
||||||
|
void Stop() {
|
||||||
|
rpcClose(this->transCli);
|
||||||
|
this->transCli = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SendAndRecv(SRpcMsg *req, SRpcMsg *resp) {
|
||||||
|
SEpSet epSet = {0};
|
||||||
|
epSet.inUse = 0;
|
||||||
|
addEpIntoEpSet(&epSet, "127.0.0.1", 7000);
|
||||||
|
|
||||||
|
rpcSendRequest(this->transCli, &epSet, req, NULL);
|
||||||
|
SemWait();
|
||||||
|
*resp = this->resp;
|
||||||
|
}
|
||||||
|
void sendReq(SRpcMsg *req) {
|
||||||
|
SEpSet epSet = {0};
|
||||||
|
epSet.inUse = 0;
|
||||||
|
addEpIntoEpSet(&epSet, "127.0.0.1", 7000);
|
||||||
|
|
||||||
|
rpcSendRequest(this->transCli, &epSet, req, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
void sendReqWithId(SRpcMsg *req, int64_t *id) {
|
||||||
|
SEpSet epSet = {0};
|
||||||
|
epSet.inUse = 0;
|
||||||
|
addEpIntoEpSet(&epSet, "127.0.0.1",7000);
|
||||||
|
rpcSendRequestWithCtx(this->transCli, &epSet, req, id, NULL);
|
||||||
|
|
||||||
|
}
|
||||||
|
void freeId(int64_t *id) {
|
||||||
|
rpcFreeConnById(this->transCli, *id);
|
||||||
|
}
|
||||||
|
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
|
||||||
|
if (req->info.handle != NULL) {
|
||||||
|
rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT);
|
||||||
|
req->info.handle = NULL;
|
||||||
|
}
|
||||||
|
SendAndRecv(req, resp);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SemWait() { tsem_wait(&this->sem); }
|
||||||
|
void SemPost() { tsem_post(&this->sem); }
|
||||||
|
void Reset() {}
|
||||||
|
|
||||||
|
~Client() {
|
||||||
|
if (this->transCli) rpcClose(this->transCli);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
tsem_t sem;
|
||||||
|
SRpcInit rpcInit_;
|
||||||
|
void *transCli;
|
||||||
|
SRpcMsg resp;
|
||||||
|
};
|
||||||
|
class Server {
|
||||||
|
public:
|
||||||
|
Server() {
|
||||||
|
memcpy(tsTempDir, TD_TMP_DIR_PATH, strlen(TD_TMP_DIR_PATH));
|
||||||
|
memset(&rpcInit_, 0, sizeof(rpcInit_));
|
||||||
|
|
||||||
|
memcpy(rpcInit_.localFqdn, "localhost", strlen("localhost"));
|
||||||
|
rpcInit_.localPort = port;
|
||||||
|
rpcInit_.label = (char *)"server";
|
||||||
|
rpcInit_.numOfThreads = 5;
|
||||||
|
rpcInit_.cfp = processReq;
|
||||||
|
rpcInit_.user = (char *)user;
|
||||||
|
rpcInit_.connType = TAOS_CONN_SERVER;
|
||||||
|
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||||
|
}
|
||||||
|
void Start() {
|
||||||
|
this->transSrv = rpcOpen(&this->rpcInit_);
|
||||||
|
taosMsleep(1000);
|
||||||
|
}
|
||||||
|
void SetSrvContinueSend(CB cb) {
|
||||||
|
this->Stop();
|
||||||
|
rpcInit_.cfp = cb;
|
||||||
|
this->Start();
|
||||||
|
}
|
||||||
|
void Stop() {
|
||||||
|
if (this->transSrv == NULL) return;
|
||||||
|
rpcClose(this->transSrv);
|
||||||
|
this->transSrv = NULL;
|
||||||
|
}
|
||||||
|
void SetSrvSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
||||||
|
this->Stop();
|
||||||
|
rpcInit_.cfp = cfp;
|
||||||
|
this->Start();
|
||||||
|
}
|
||||||
|
void Restart() {
|
||||||
|
this->Stop();
|
||||||
|
this->Start();
|
||||||
|
}
|
||||||
|
~Server() {
|
||||||
|
if (this->transSrv) rpcClose(this->transSrv);
|
||||||
|
this->transSrv = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
SRpcInit rpcInit_;
|
||||||
|
void *transSrv;
|
||||||
|
};
|
||||||
|
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.pCont = rpcMallocCont(100);
|
||||||
|
rpcMsg.contLen = 100;
|
||||||
|
rpcMsg.info = pMsg->info;
|
||||||
|
rpcMsg.code = 0;
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
rpcSendResponse(&rpcMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
// for (int i = 0; i < 10; i++) {
|
||||||
|
// SRpcMsg rpcMsg = {0};
|
||||||
|
// rpcMsg.pCont = rpcMallocCont(100);
|
||||||
|
// rpcMsg.contLen = 100;
|
||||||
|
// rpcMsg.info = pMsg->info;
|
||||||
|
// rpcMsg.code = 0;
|
||||||
|
// rpcSendResponse(&rpcMsg);
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.pCont = rpcMallocCont(100);
|
||||||
|
rpcMsg.contLen = 100;
|
||||||
|
rpcMsg.info = pMsg->info;
|
||||||
|
rpcMsg.code = 0;
|
||||||
|
rpcSendResponse(&rpcMsg);
|
||||||
|
|
||||||
|
rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER);
|
||||||
|
}
|
||||||
|
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
// {
|
||||||
|
// SRpcMsg rpcMsg1 = {0};
|
||||||
|
// rpcMsg1.pCont = rpcMallocCont(100);
|
||||||
|
// rpcMsg1.contLen = 100;
|
||||||
|
// rpcMsg1.info = pMsg->info;
|
||||||
|
// rpcMsg1.code = 0;
|
||||||
|
// rpcRegisterBrokenLinkArg(&rpcMsg1);
|
||||||
|
// }
|
||||||
|
// taosMsleep(10);
|
||||||
|
|
||||||
|
// SRpcMsg rpcMsg = {0};
|
||||||
|
// rpcMsg.pCont = rpcMallocCont(100);
|
||||||
|
// rpcMsg.contLen = 100;
|
||||||
|
// rpcMsg.info = pMsg->info;
|
||||||
|
// rpcMsg.code = 0;
|
||||||
|
// rpcSendResponse(&rpcMsg);
|
||||||
|
}
|
||||||
|
// client process;
|
||||||
|
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
Client *client = (Client *)parent;
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
STraceId *trace = (STraceId *)&pMsg->info.traceId;
|
||||||
|
tGDebug("received resp %s",tstrerror(pMsg->code));
|
||||||
|
}
|
||||||
|
|
||||||
|
static void initEnv() {
|
||||||
|
dDebugFlag = 143;
|
||||||
|
vDebugFlag = 0;
|
||||||
|
mDebugFlag = 143;
|
||||||
|
cDebugFlag = 0;
|
||||||
|
jniDebugFlag = 0;
|
||||||
|
tmrDebugFlag = 143;
|
||||||
|
uDebugFlag = 143;
|
||||||
|
rpcDebugFlag = 143;
|
||||||
|
qDebugFlag = 0;
|
||||||
|
wDebugFlag = 0;
|
||||||
|
sDebugFlag = 0;
|
||||||
|
tsdbDebugFlag = 0;
|
||||||
|
tsLogEmbedded = 1;
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
|
||||||
|
std::string path = TD_TMP_DIR_PATH "transport";
|
||||||
|
// taosRemoveDir(path.c_str());
|
||||||
|
taosMkDir(path.c_str());
|
||||||
|
|
||||||
|
tstrncpy(tsLogDir, path.c_str(), PATH_MAX);
|
||||||
|
if (taosInitLog("taosdlog", 1, false) != 0) {
|
||||||
|
printf("failed to init log file\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class TransObj {
|
||||||
|
public:
|
||||||
|
TransObj() {
|
||||||
|
initEnv();
|
||||||
|
cli = new Client;
|
||||||
|
cli->Init(1);
|
||||||
|
srv = new Server;
|
||||||
|
srv->Start();
|
||||||
|
}
|
||||||
|
|
||||||
|
void RestartCli(CB cb) {
|
||||||
|
//
|
||||||
|
cli->Restart(cb);
|
||||||
|
}
|
||||||
|
void StopSrv() {
|
||||||
|
//
|
||||||
|
srv->Stop();
|
||||||
|
}
|
||||||
|
// call when link broken, and notify query or fetch stop
|
||||||
|
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
||||||
|
///////
|
||||||
|
srv->SetSrvContinueSend(cfp);
|
||||||
|
}
|
||||||
|
void RestartSrv() { srv->Restart(); }
|
||||||
|
void StopCli() {
|
||||||
|
///////
|
||||||
|
cli->Stop();
|
||||||
|
}
|
||||||
|
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
|
||||||
|
void cliSendReq(SRpcMsg *req) { cli->sendReq(req); }
|
||||||
|
|
||||||
|
void cliSendReqWithId(SRpcMsg *req, int64_t *id) { cli->sendReqWithId(req, id);}
|
||||||
|
void cliFreeReqId(int64_t *id) { cli->freeId(id);}
|
||||||
|
void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); }
|
||||||
|
|
||||||
|
~TransObj() {
|
||||||
|
delete cli;
|
||||||
|
delete srv;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Client *cli;
|
||||||
|
Server *srv;
|
||||||
|
};
|
||||||
|
class TransEnv : public ::testing::Test {
|
||||||
|
protected:
|
||||||
|
virtual void SetUp() {
|
||||||
|
// set up trans obj
|
||||||
|
tr = new TransObj();
|
||||||
|
}
|
||||||
|
virtual void TearDown() {
|
||||||
|
// tear down
|
||||||
|
delete tr;
|
||||||
|
}
|
||||||
|
|
||||||
|
TransObj *tr = NULL;
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(TransEnv, 01sendAndRec) {
|
||||||
|
// for (int i = 0; i < 10; i++) {
|
||||||
|
// SRpcMsg req = {0}, resp = {0};
|
||||||
|
// req.msgType = 0;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// assert(resp.code == 0);
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TransEnv, 02StopServer) {
|
||||||
|
// for (int i = 0; i < 1; i++) {
|
||||||
|
// SRpcMsg req = {0}, resp = {0};
|
||||||
|
// req.msgType = 0;
|
||||||
|
// req.info.ahandle = (void *)0x35;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// assert(resp.code == 0);
|
||||||
|
// }
|
||||||
|
// SRpcMsg req = {0}, resp = {0};
|
||||||
|
// req.info.ahandle = (void *)0x35;
|
||||||
|
// req.msgType = 1;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->StopSrv();
|
||||||
|
// // tr->RestartSrv();
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// assert(resp.code != 0);
|
||||||
|
}
|
||||||
|
TEST_F(TransEnv, clientUserDefined) {
|
||||||
|
// tr->RestartSrv();
|
||||||
|
// for (int i = 0; i < 10; i++) {
|
||||||
|
// SRpcMsg req = {0}, resp = {0};
|
||||||
|
// req.msgType = 0;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// assert(resp.code == 0);
|
||||||
|
// }
|
||||||
|
|
||||||
|
//////////////////
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TransEnv, cliPersistHandle) {
|
||||||
|
// SRpcMsg resp = {0};
|
||||||
|
// void *handle = NULL;
|
||||||
|
// for (int i = 0; i < 10; i++) {
|
||||||
|
// SRpcMsg req = {0};
|
||||||
|
// req.info = resp.info;
|
||||||
|
// req.info.persistHandle = 1;
|
||||||
|
|
||||||
|
// req.msgType = 1;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// // if (i == 5) {
|
||||||
|
// // std::cout << "stop server" << std::endl;
|
||||||
|
// // tr->StopSrv();
|
||||||
|
// //}
|
||||||
|
// // if (i >= 6) {
|
||||||
|
// // EXPECT_TRUE(resp.code != 0);
|
||||||
|
// //}
|
||||||
|
// handle = resp.info.handle;
|
||||||
|
// }
|
||||||
|
// rpcReleaseHandle(handle, TAOS_CONN_CLIENT);
|
||||||
|
// for (int i = 0; i < 10; i++) {
|
||||||
|
// SRpcMsg req = {0};
|
||||||
|
// req.msgType = 1;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// taosMsleep(1000);
|
||||||
|
//////////////////
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TransEnv, srvReleaseHandle) {
|
||||||
|
// SRpcMsg resp = {0};
|
||||||
|
// tr->SetSrvContinueSend(processReleaseHandleCb);
|
||||||
|
// // tr->Restart(processReleaseHandleCb);
|
||||||
|
// void *handle = NULL;
|
||||||
|
// SRpcMsg req = {0};
|
||||||
|
// for (int i = 0; i < 1; i++) {
|
||||||
|
// memset(&req, 0, sizeof(req));
|
||||||
|
// req.info = resp.info;
|
||||||
|
// req.info.persistHandle = 1;
|
||||||
|
// req.msgType = 1;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// // tr->cliSendAndRecvNoHandle(&req, &resp);
|
||||||
|
// EXPECT_TRUE(resp.code == 0);
|
||||||
|
// }
|
||||||
|
//////////////////
|
||||||
|
}
|
||||||
|
// reopen later
|
||||||
|
// TEST_F(TransEnv, cliReleaseHandleExcept) {
|
||||||
|
// SRpcMsg resp = {0};
|
||||||
|
// SRpcMsg req = {0};
|
||||||
|
// for (int i = 0; i < 3; i++) {
|
||||||
|
// memset(&req, 0, sizeof(req));
|
||||||
|
// req.info = resp.info;
|
||||||
|
// req.info.persistHandle = 1;
|
||||||
|
// req.info.ahandle = (void *)1234;
|
||||||
|
// req.msgType = 1;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// if (i == 1) {
|
||||||
|
// std::cout << "stop server" << std::endl;
|
||||||
|
// tr->StopSrv();
|
||||||
|
// }
|
||||||
|
// if (i > 1) {
|
||||||
|
// EXPECT_TRUE(resp.code != 0);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// //////////////////
|
||||||
|
//}
|
||||||
|
TEST_F(TransEnv, srvContinueSend) {
|
||||||
|
// tr->SetSrvContinueSend(processContinueSend);
|
||||||
|
// SRpcMsg req = {0}, resp = {0};
|
||||||
|
// for (int i = 0; i < 10; i++) {
|
||||||
|
// // memset(&req, 0, sizeof(req));
|
||||||
|
// // memset(&resp, 0, sizeof(resp));
|
||||||
|
// // req.msgType = 1;
|
||||||
|
// // req.pCont = rpcMallocCont(10);
|
||||||
|
// // req.contLen = 10;
|
||||||
|
// // tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// }
|
||||||
|
// taosMsleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TransEnv, srvPersistHandleExcept) {
|
||||||
|
// tr->SetSrvContinueSend(processContinueSend);
|
||||||
|
// // tr->SetCliPersistFp(cliPersistHandle);
|
||||||
|
// SRpcMsg resp = {0};
|
||||||
|
// SRpcMsg req = {0};
|
||||||
|
// for (int i = 0; i < 5; i++) {
|
||||||
|
// // memset(&req, 0, sizeof(req));
|
||||||
|
// // req.info = resp.info;
|
||||||
|
// // req.msgType = 1;
|
||||||
|
// // req.pCont = rpcMallocCont(10);
|
||||||
|
// // req.contLen = 10;
|
||||||
|
// // tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// // if (i > 2) {
|
||||||
|
// // tr->StopCli();
|
||||||
|
// // break;
|
||||||
|
// //}
|
||||||
|
// }
|
||||||
|
// taosMsleep(2000);
|
||||||
|
// conn broken
|
||||||
|
//
|
||||||
|
}
|
||||||
|
TEST_F(TransEnv, cliPersistHandleExcept) {
|
||||||
|
// tr->SetSrvContinueSend(processContinueSend);
|
||||||
|
// SRpcMsg resp = {0};
|
||||||
|
// SRpcMsg req = {0};
|
||||||
|
// for (int i = 0; i < 5; i++) {
|
||||||
|
// // memset(&req, 0, sizeof(req));
|
||||||
|
// // req.info = resp.info;
|
||||||
|
// // req.msgType = 1;
|
||||||
|
// // req.pCont = rpcMallocCont(10);
|
||||||
|
// // req.contLen = 10;
|
||||||
|
// // tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// // if (i > 2) {
|
||||||
|
// // tr->StopSrv();
|
||||||
|
// // break;
|
||||||
|
// //}
|
||||||
|
// }
|
||||||
|
// taosMsleep(2000);
|
||||||
|
// // conn broken
|
||||||
|
//
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TransEnv, multiCliPersistHandleExcept) {
|
||||||
|
// conn broken
|
||||||
|
}
|
||||||
|
TEST_F(TransEnv, queryExcept) {
|
||||||
|
//taosMsleep(4 * 1000);
|
||||||
|
}
|
||||||
|
TEST_F(TransEnv, idTest) {
|
||||||
|
SRpcMsg resp = {0};
|
||||||
|
SRpcMsg req = {0};
|
||||||
|
for (int i = 0; i < 50000; i++) {
|
||||||
|
memset(&req, 0, sizeof(req));
|
||||||
|
req.info.noResp = 0;
|
||||||
|
req.msgType = 3;
|
||||||
|
req.pCont = rpcMallocCont(10);
|
||||||
|
req.contLen = 10;
|
||||||
|
int64_t id;
|
||||||
|
tr->cliSendReqWithId(&req, &id);
|
||||||
|
tr->cliFreeReqId(&id);
|
||||||
|
}
|
||||||
|
taosMsleep(1000);
|
||||||
|
// no resp
|
||||||
|
}
|
||||||
|
TEST_F(TransEnv, noResp) {
|
||||||
|
SRpcMsg resp = {0};
|
||||||
|
SRpcMsg req = {0};
|
||||||
|
for (int i = 0; i < 500000; i++) {
|
||||||
|
memset(&req, 0, sizeof(req));
|
||||||
|
req.info.noResp = 0;
|
||||||
|
req.msgType = 3;
|
||||||
|
req.pCont = rpcMallocCont(10);
|
||||||
|
req.contLen = 10;
|
||||||
|
tr->cliSendReq(&req);
|
||||||
|
//tr->cliSendAndRecv(&req, &resp);
|
||||||
|
}
|
||||||
|
taosMsleep(10000);
|
||||||
|
// no resp
|
||||||
|
}
|
Loading…
Reference in New Issue