Merge branch 'main' into merge/mainto3.0
This commit is contained in:
commit
a381eb78a8
|
@ -46,7 +46,7 @@ For more details on features, please read through the entire documentation.
|
||||||
|
|
||||||
By making full use of [characteristics of time series data](https://tdengine.com/characteristics-of-time-series-data/), TDengine differentiates itself from other time series databases with the following advantages.
|
By making full use of [characteristics of time series data](https://tdengine.com/characteristics-of-time-series-data/), TDengine differentiates itself from other time series databases with the following advantages.
|
||||||
|
|
||||||
- **[High-Performance](https://tdengine.com/high-performance/)**: TDengine is the only time-series database to solve the high cardinality issue to support billions of data collection points while out performing other time-series databases for data ingestion, querying and data compression.
|
- **[High-Performance](https://tdengine.com/high-performance/)**: TDengine is the only time-series database to solve the high cardinality issue to support billions of data collection points while outperforming other time-series databases for data ingestion, querying and data compression.
|
||||||
|
|
||||||
- **[Simplified Solution](https://tdengine.com/comprehensive-industrial-data-solution/)**: Through built-in caching, stream processing and data subscription features, TDengine provides a simplified solution for time-series data processing. It reduces system design complexity and operation costs significantly.
|
- **[Simplified Solution](https://tdengine.com/comprehensive-industrial-data-solution/)**: Through built-in caching, stream processing and data subscription features, TDengine provides a simplified solution for time-series data processing. It reduces system design complexity and operation costs significantly.
|
||||||
|
|
||||||
|
|
|
@ -364,6 +364,9 @@ The configuration parameters for specifying super table tag columns and data col
|
||||||
- **min**: The minimum value of the column/label of the data type. The generated value will equal or large than the minimum value.
|
- **min**: The minimum value of the column/label of the data type. The generated value will equal or large than the minimum value.
|
||||||
|
|
||||||
- **max**: The maximum value of the column/label of the data type. The generated value will less than the maximum value.
|
- **max**: The maximum value of the column/label of the data type. The generated value will less than the maximum value.
|
||||||
|
|
||||||
|
- **scalingFactor**: Floating-point precision enhancement factor, which takes effect only when the data type is float/double. It has a valid range of positive integers from 1 to 1,000,000. It is used to enhance the precision of generated floating-point numbers, particularly when the min or max values are small. This property enhances the precision after the decimal point by powers of 10: scalingFactor of 10 indicates an enhancement of 1 decimal precision, 100 indicates an enhancement of 2 decimal precision, and so on.
|
||||||
|
|
||||||
- **fun**: This column of data is filled with functions. Currently, only the sin and cos functions are supported. The input parameter is the timestamp and converted to an angle value. The conversion formula is: angle x=input time column ts value % 360. At the same time, it supports coefficient adjustment and random fluctuation factor adjustment, presented in a fixed format expression, such as fun="10\*sin(x)+100\*random(5)", where x represents the angle, ranging from 0 to 360 degrees, and the growth step size is consistent with the time column step size. 10 represents the coefficient of multiplication, 100 represents the coefficient of addition or subtraction, and 5 represents the fluctuation range within a random range of 5%. The currently supported data types are int, bigint, float, and double. Note: The expression is fixed and cannot be reversed.
|
- **fun**: This column of data is filled with functions. Currently, only the sin and cos functions are supported. The input parameter is the timestamp and converted to an angle value. The conversion formula is: angle x=input time column ts value % 360. At the same time, it supports coefficient adjustment and random fluctuation factor adjustment, presented in a fixed format expression, such as fun="10\*sin(x)+100\*random(5)", where x represents the angle, ranging from 0 to 360 degrees, and the growth step size is consistent with the time column step size. 10 represents the coefficient of multiplication, 100 represents the coefficient of addition or subtraction, and 5 represents the fluctuation range within a random range of 5%. The currently supported data types are int, bigint, float, and double. Note: The expression is fixed and cannot be reversed.
|
||||||
|
|
||||||
- **values**: The value field of the nchar/binary column/label, which will be chosen randomly from the values.
|
- **values**: The value field of the nchar/binary column/label, which will be chosen randomly from the values.
|
||||||
|
|
|
@ -1187,7 +1187,7 @@ CSUM(expr)
|
||||||
### DERIVATIVE
|
### DERIVATIVE
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
DERIVATIVE(expr, time_inerval, ignore_negative)
|
DERIVATIVE(expr, time_interval, ignore_negative)
|
||||||
|
|
||||||
ignore_negative: {
|
ignore_negative: {
|
||||||
0
|
0
|
||||||
|
|
|
@ -41,7 +41,7 @@ In this article, it specifically refers to the level within the secondary compre
|
||||||
### Create Table with Compression
|
### Create Table with Compression
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
CREATE [dbname.]tabname (colName colType [ENCODE 'encode_type'] [COMPRESS 'compress_type' [LEVEL 'level'], [, other cerate_definition]...])
|
CREATE [dbname.]tabname (colName colType [ENCODE 'encode_type'] [COMPRESS 'compress_type' [LEVEL 'level'], [, other create_definition]...])
|
||||||
```
|
```
|
||||||
|
|
||||||
**Parameter Description**
|
**Parameter Description**
|
||||||
|
@ -58,7 +58,7 @@ CREATE [dbname.]tabname (colName colType [ENCODE 'encode_type'] [COMPRESS 'compr
|
||||||
### Change Compression Method
|
### Change Compression Method
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
ALTER TABLE [db_name.]tabName MODIFY COLUMN colName [ENCODE 'ecode_type'] [COMPRESS 'compress_type'] [LEVEL "high"]
|
ALTER TABLE [db_name.]tabName MODIFY COLUMN colName [ENCODE 'encode_type'] [COMPRESS 'compress_type'] [LEVEL "high"]
|
||||||
```
|
```
|
||||||
|
|
||||||
**Parameter Description**
|
**Parameter Description**
|
||||||
|
|
|
@ -125,7 +125,7 @@ where `TOKEN` is the string after Base64 encoding of `{username}:{password}`, e.
|
||||||
Starting from `TDengine 3.0.3.0`, `taosAdapter` provides a configuration parameter `httpCodeServerError` to set whether to return a non-200 http status code when the C interface returns an error
|
Starting from `TDengine 3.0.3.0`, `taosAdapter` provides a configuration parameter `httpCodeServerError` to set whether to return a non-200 http status code when the C interface returns an error
|
||||||
|
|
||||||
| **Description** | **httpCodeServerError false** | **httpCodeServerError true** |
|
| **Description** | **httpCodeServerError false** | **httpCodeServerError true** |
|
||||||
|--------------------|---------------------------- ------|---------------------------------------|
|
|--------------------|----------------------------------|---------------------------------------|
|
||||||
| taos_errno() returns 0 | 200 | 200 |
|
| taos_errno() returns 0 | 200 | 200 |
|
||||||
| taos_errno() returns non-0 | 200 (except authentication error) | 500 (except authentication error and 400/502 error) |
|
| taos_errno() returns non-0 | 200 (except authentication error) | 500 (except authentication error and 400/502 error) |
|
||||||
| Parameter error | 400 (only handle HTTP request URL parameter error) | 400 (handle HTTP request URL parameter error and taosd return error) |
|
| Parameter error | 400 (only handle HTTP request URL parameter error) | 400 (handle HTTP request URL parameter error and taosd return error) |
|
||||||
|
|
|
@ -4,7 +4,7 @@ sidebar_label: Load Balance
|
||||||
description: This document describes how TDengine implements load balancing.
|
description: This document describes how TDengine implements load balancing.
|
||||||
---
|
---
|
||||||
|
|
||||||
The load balance in TDengine is mainly about processing data series data. TDengine employes builtin hash algorithm to distribute all the tables, sub-tables and their data of a database across all the vgroups that belongs to the database. Each table or sub-table can only be handled by a single vgroup, while each vgroup can process multiple table or sub-table.
|
The load balance in TDengine is mainly about processing data series data. TDengine employs builtin hash algorithm to distribute all the tables, sub-tables and their data of a database across all the vgroups that belongs to the database. Each table or sub-table can only be handled by a single vgroup, while each vgroup can process multiple table or sub-table.
|
||||||
|
|
||||||
The number of vgroup can be specified when creating a database, using the parameter `vgroups`.
|
The number of vgroup can be specified when creating a database, using the parameter `vgroups`.
|
||||||
|
|
||||||
|
@ -12,10 +12,10 @@ The number of vgroup can be specified when creating a database, using the parame
|
||||||
create database db0 vgroups 100;
|
create database db0 vgroups 100;
|
||||||
```
|
```
|
||||||
|
|
||||||
The proper value of `vgroups` depends on available system resources. Assuming there is only one database to be created in the system, then the number of `vgroups` is determined by the available resources from all dnodes. In principle more vgroups can be created if you have more CPU and memory. Disk I/O is another important factor to consider. Once the bottleneck shows on disk I/O, more vgroups may downgrad the system performance significantly. If multiple databases are to be created in the system, then the total number of `vroups` of all the databases are dependent on the available system resources. It needs to be careful to distribute vgroups among these databases, you need to consider the number of tables, data writing frequency, size of each data row for all these databases. A recommended practice is to firstly choose a starting number for `vgroups`, for example double of the number of CPU cores, then try to adjust and optimize system configurations to find the best setting for `vgroups`, then distribute these vgroups among databases.
|
The proper value of `vgroups` depends on available system resources. Assuming there is only one database to be created in the system, then the number of `vgroups` is determined by the available resources from all dnodes. In principle more vgroups can be created if you have more CPU and memory. Disk I/O is another important factor to consider. Once the bottleneck shows on disk I/O, more vgroups may degrade the system performance significantly. If multiple databases are to be created in the system, then the total number of `vgroups` of all the databases are dependent on the available system resources. It needs to be careful to distribute vgroups among these databases, you need to consider the number of tables, data writing frequency, size of each data row for all these databases. A recommended practice is to firstly choose a starting number for `vgroups`, for example double of the number of CPU cores, then try to adjust and optimize system configurations to find the best setting for `vgroups`, then distribute these vgroups among databases.
|
||||||
|
|
||||||
Furthermode, TDengine distributes the vgroups of each database equally among all dnodes. In case of replica 3, the distribution is even more complex, TDengine tries its best to prevent any dnode from becoming a bottleneck.
|
Furthermore, TDengine distributes the vgroups of each database equally among all dnodes. In case of replica 3, the distribution is even more complex, TDengine tries its best to prevent any dnode from becoming a bottleneck.
|
||||||
|
|
||||||
TDegnine utilizes the above ways to achieve load balance in a cluster, and finally achieve higher throughput.
|
TDengine utilizes the above ways to achieve load balance in a cluster, and finally achieve higher throughput.
|
||||||
|
|
||||||
Once the load balance is achieved, after some operations like deleting tables or dropping databases, the load across all dnodes may become imbalanced, the method of rebalance will be provided in later versions. However, even without explicit rebalancing, TDengine will try its best to achieve new balance without manual interfering when a new database is created.
|
Once the load balance is achieved, after some operations like deleting tables or dropping databases, the load across all dnodes may become imbalanced, the method of rebalance will be provided in later versions. However, even without explicit rebalancing, TDengine will try its best to achieve new balance without manual interfering when a new database is created.
|
||||||
|
|
|
@ -364,6 +364,8 @@ taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\)
|
||||||
|
|
||||||
- **max** : 数据类型的 列/标签 的最大值。生成的值将小于最小值。
|
- **max** : 数据类型的 列/标签 的最大值。生成的值将小于最小值。
|
||||||
|
|
||||||
|
- **scalingFactor** : 浮点数精度增强因子,仅当数据类型是 float/double 时生效,有效值范围为 1 至 1000000 的正整数。用于增强生成浮点数的精度,特别是在 min 或 max 值较小的情况下。此属性按 10 的幂次增强小数点后的精度:scalingFactor 为 10 表示增强 1 位小数精度,100 表示增强 2 位,依此类推。
|
||||||
|
|
||||||
- **fun** : 此列数据以函数填充,目前只支持 sin 和 cos 两函数,输入参数为时间戳换算成角度值,换算公式: 角度 x = 输入的时间列ts值 % 360。同时支持系数调节,随机波动因子调节,以固定格式的表达式展现,如 fun=“10\*sin(x)+100\*random(5)” , x 表示角度,取值 0 ~ 360度,增长步长与时间列步长一致。10 表示乘的系数,100 表示加或减的系数,5 表示波动幅度在 5% 的随机范围内。目前支持的数据类型为 int, bigint, float, double 四种数据类型。注意:表达式为固定模式,不可前后颠倒。
|
- **fun** : 此列数据以函数填充,目前只支持 sin 和 cos 两函数,输入参数为时间戳换算成角度值,换算公式: 角度 x = 输入的时间列ts值 % 360。同时支持系数调节,随机波动因子调节,以固定格式的表达式展现,如 fun=“10\*sin(x)+100\*random(5)” , x 表示角度,取值 0 ~ 360度,增长步长与时间列步长一致。10 表示乘的系数,100 表示加或减的系数,5 表示波动幅度在 5% 的随机范围内。目前支持的数据类型为 int, bigint, float, double 四种数据类型。注意:表达式为固定模式,不可前后颠倒。
|
||||||
|
|
||||||
- **values** : nchar/binary 列/标签的值域,将从值中随机选择。
|
- **values** : nchar/binary 列/标签的值域,将从值中随机选择。
|
||||||
|
|
|
@ -34,8 +34,8 @@ TDengine 版本更新往往会增加新的功能特性,列表中的连接器
|
||||||
| **3.0.0.0 及以上** | 3.0.2以上 | 当前版本 | 3.0 分支 | 3.0.0 | 3.1.0 | 当前版本 | 与 TDengine 相同版本 |
|
| **3.0.0.0 及以上** | 3.0.2以上 | 当前版本 | 3.0 分支 | 3.0.0 | 3.1.0 | 当前版本 | 与 TDengine 相同版本 |
|
||||||
| **2.4.0.14 及以上** | 2.0.38 | 当前版本 | develop 分支 | 1.0.2 - 1.0.6 | 2.0.10 - 2.0.12 | 当前版本 | 与 TDengine 相同版本 |
|
| **2.4.0.14 及以上** | 2.0.38 | 当前版本 | develop 分支 | 1.0.2 - 1.0.6 | 2.0.10 - 2.0.12 | 当前版本 | 与 TDengine 相同版本 |
|
||||||
| **2.4.0.4 - 2.4.0.13** | 2.0.37 | 当前版本 | develop 分支 | 1.0.2 - 1.0.6 | 2.0.10 - 2.0.12 | 当前版本 | 与 TDengine 相同版本 |
|
| **2.4.0.4 - 2.4.0.13** | 2.0.37 | 当前版本 | develop 分支 | 1.0.2 - 1.0.6 | 2.0.10 - 2.0.12 | 当前版本 | 与 TDengine 相同版本 |
|
||||||
| **2.2.x.x ** | 2.0.36 | 当前版本 | master 分支 | n/a | 2.0.7 - 2.0.9 | 当前版本 | 与 TDengine 相同版本 |
|
| **2.2.x.x** | 2.0.36 | 当前版本 | master 分支 | n/a | 2.0.7 - 2.0.9 | 当前版本 | 与 TDengine 相同版本 |
|
||||||
| **2.0.x.x ** | 2.0.34 | 当前版本 | master 分支 | n/a | 2.0.1 - 2.0.6 | 当前版本 | 与 TDengine 相同版本 |
|
| **2.0.x.x** | 2.0.34 | 当前版本 | master 分支 | n/a | 2.0.1 - 2.0.6 | 当前版本 | 与 TDengine 相同版本 |
|
||||||
|
|
||||||
## 功能特性
|
## 功能特性
|
||||||
|
|
||||||
|
|
|
@ -353,6 +353,7 @@ typedef struct {
|
||||||
queue node;
|
queue node;
|
||||||
void (*freeFunc)(void* arg);
|
void (*freeFunc)(void* arg);
|
||||||
int32_t size;
|
int32_t size;
|
||||||
|
int8_t inited;
|
||||||
} STransQueue;
|
} STransQueue;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -127,10 +127,12 @@ typedef struct {
|
||||||
typedef struct SCliReq {
|
typedef struct SCliReq {
|
||||||
SReqCtx* ctx;
|
SReqCtx* ctx;
|
||||||
queue q;
|
queue q;
|
||||||
|
queue sendQ;
|
||||||
STransMsgType type;
|
STransMsgType type;
|
||||||
uint64_t st;
|
uint64_t st;
|
||||||
int64_t seq;
|
int64_t seq;
|
||||||
int32_t sent; //(0: no send, 1: alread sent)
|
int32_t sent; //(0: no send, 1: alread sent)
|
||||||
|
int8_t inSendQ;
|
||||||
STransMsg msg;
|
STransMsg msg;
|
||||||
int8_t inRetry;
|
int8_t inRetry;
|
||||||
|
|
||||||
|
@ -274,6 +276,8 @@ static FORCE_INLINE void destroyReqAndAhanlde(void* cmsg);
|
||||||
static FORCE_INLINE int cliRBChoseIdx(STrans* pInst);
|
static FORCE_INLINE int cliRBChoseIdx(STrans* pInst);
|
||||||
static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx);
|
static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx);
|
||||||
|
|
||||||
|
static FORCE_INLINE void removeReqFromSendQ(SCliReq* pReq);
|
||||||
|
|
||||||
static int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq);
|
static int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq);
|
||||||
static int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead);
|
static int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead);
|
||||||
static int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp);
|
static int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp);
|
||||||
|
@ -453,6 +457,7 @@ static bool filteBySeq(void* key, void* arg) {
|
||||||
SFiterArg* targ = arg;
|
SFiterArg* targ = arg;
|
||||||
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
|
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
|
||||||
if (pReq->seq == targ->seq && pReq->msg.msgType + 1 == targ->msgType) {
|
if (pReq->seq == targ->seq && pReq->msg.msgType + 1 == targ->msgType) {
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
|
@ -539,6 +544,7 @@ bool filterByQid(void* key, void* arg) {
|
||||||
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
|
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
|
||||||
|
|
||||||
if (pReq->msg.info.qId == *qid) {
|
if (pReq->msg.info.qId == *qid) {
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
|
@ -600,7 +606,7 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead
|
||||||
queue* el = QUEUE_HEAD(&set);
|
queue* el = QUEUE_HEAD(&set);
|
||||||
QUEUE_REMOVE(el);
|
QUEUE_REMOVE(el);
|
||||||
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
STraceId* trace = &pReq->msg.info.traceId;
|
STraceId* trace = &pReq->msg.info.traceId;
|
||||||
tGDebug("start to free msg %p", pReq);
|
tGDebug("start to free msg %p", pReq);
|
||||||
destroyReqWrapper(pReq, pThrd);
|
destroyReqWrapper(pReq, pThrd);
|
||||||
|
@ -700,6 +706,7 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
|
|
||||||
code = cliBuildRespFromCont(pReq, &resp, pHead);
|
code = cliBuildRespFromCont(pReq, &resp, pHead);
|
||||||
STraceId* trace = &resp.info.traceId;
|
STraceId* trace = &resp.info.traceId;
|
||||||
|
@ -905,6 +912,10 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SCliThrd* thrd = conn->hostThrd;
|
SCliThrd* thrd = conn->hostThrd;
|
||||||
|
if (thrd->quit == true) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
cliResetConnTimer(conn);
|
cliResetConnTimer(conn);
|
||||||
if (conn->list == NULL && conn->dstAddr != NULL) {
|
if (conn->list == NULL && conn->dstAddr != NULL) {
|
||||||
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr));
|
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr));
|
||||||
|
@ -1092,6 +1103,7 @@ _failed:
|
||||||
transQueueDestroy(&conn->reqsToSend);
|
transQueueDestroy(&conn->reqsToSend);
|
||||||
transQueueDestroy(&conn->reqsSentOut);
|
transQueueDestroy(&conn->reqsSentOut);
|
||||||
taosMemoryFree(conn->dstAddr);
|
taosMemoryFree(conn->dstAddr);
|
||||||
|
taosMemoryFree(conn->ipStr);
|
||||||
}
|
}
|
||||||
tError("failed to create conn, code:%d", code);
|
tError("failed to create conn, code:%d", code);
|
||||||
taosMemoryFree(conn);
|
taosMemoryFree(conn);
|
||||||
|
@ -1216,6 +1228,7 @@ static FORCE_INLINE void destroyReqInQueue(SCliConn* conn, queue* set, int32_t c
|
||||||
QUEUE_REMOVE(el);
|
QUEUE_REMOVE(el);
|
||||||
|
|
||||||
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
notifyAndDestroyReq(conn, pReq, code);
|
notifyAndDestroyReq(conn, pReq, code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1246,8 +1259,8 @@ static void cliHandleException(SCliConn* conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cliDestroyAllQidFromThrd(conn);
|
cliDestroyAllQidFromThrd(conn);
|
||||||
|
if (pThrd->quit == false && conn->list) {
|
||||||
QUEUE_REMOVE(&conn->q);
|
QUEUE_REMOVE(&conn->q);
|
||||||
if (conn->list) {
|
|
||||||
conn->list->totalSize -= 1;
|
conn->list->totalSize -= 1;
|
||||||
conn->list = NULL;
|
conn->list = NULL;
|
||||||
}
|
}
|
||||||
|
@ -1273,7 +1286,8 @@ static void cliHandleException(SCliConn* conn) {
|
||||||
bool filterToRmReq(void* h, void* arg) {
|
bool filterToRmReq(void* h, void* arg) {
|
||||||
queue* el = h;
|
queue* el = h;
|
||||||
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
|
||||||
if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) {
|
if (pReq->sent == 1 && pReq->inSendQ == 0 && REQUEST_NO_RESP(&pReq->msg)) {
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -1300,12 +1314,18 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
|
||||||
SCliThrd* pThrd = conn->hostThrd;
|
SCliThrd* pThrd = conn->hostThrd;
|
||||||
STrans* pInst = pThrd->pInst;
|
STrans* pInst = pThrd->pInst;
|
||||||
|
|
||||||
|
while (!QUEUE_IS_EMPTY(&wrapper->node)) {
|
||||||
|
queue* h = QUEUE_HEAD(&wrapper->node);
|
||||||
|
SCliReq* pReq = QUEUE_DATA(h, SCliReq, sendQ);
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
|
}
|
||||||
freeWReqToWQ(&conn->wq, wrapper);
|
freeWReqToWQ(&conn->wq, wrapper);
|
||||||
|
|
||||||
int32_t ref = transUnrefCliHandle(conn);
|
int32_t ref = transUnrefCliHandle(conn);
|
||||||
if (ref <= 0) {
|
if (ref <= 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
cliConnRmReqs(conn);
|
cliConnRmReqs(conn);
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
tDebug("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
|
tDebug("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
|
||||||
|
@ -1340,6 +1360,9 @@ bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msg
|
||||||
}
|
}
|
||||||
STransMsgHead* pHead = *ppHead;
|
STransMsgHead* pHead = *ppHead;
|
||||||
STransMsgHead* tHead = taosMemoryCalloc(1, *msgLen + sizeof(pInst->user));
|
STransMsgHead* tHead = taosMemoryCalloc(1, *msgLen + sizeof(pInst->user));
|
||||||
|
if (tHead == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
memcpy((char*)tHead, (char*)pHead, TRANS_MSG_OVERHEAD);
|
memcpy((char*)tHead, (char*)pHead, TRANS_MSG_OVERHEAD);
|
||||||
memcpy((char*)tHead + TRANS_MSG_OVERHEAD, pInst->user, sizeof(pInst->user));
|
memcpy((char*)tHead + TRANS_MSG_OVERHEAD, pInst->user, sizeof(pInst->user));
|
||||||
|
|
||||||
|
@ -1398,6 +1421,10 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
|
||||||
|
|
||||||
int j = 0;
|
int j = 0;
|
||||||
int32_t batchLimit = 64;
|
int32_t batchLimit = 64;
|
||||||
|
|
||||||
|
queue reqToSend;
|
||||||
|
QUEUE_INIT(&reqToSend);
|
||||||
|
|
||||||
while (!transQueueEmpty(&pConn->reqsToSend)) {
|
while (!transQueueEmpty(&pConn->reqsToSend)) {
|
||||||
queue* h = transQueuePop(&pConn->reqsToSend);
|
queue* h = transQueuePop(&pConn->reqsToSend);
|
||||||
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, q);
|
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, q);
|
||||||
|
@ -1422,6 +1449,10 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
|
||||||
if (cliConnMayAddUserInfo(pConn, &pHead, &msgLen)) {
|
if (cliConnMayAddUserInfo(pConn, &pHead, &msgLen)) {
|
||||||
content = transContFromHead(pHead);
|
content = transContFromHead(pHead);
|
||||||
contLen = transContLenFromMsg(msgLen);
|
contLen = transContLenFromMsg(msgLen);
|
||||||
|
} else {
|
||||||
|
if (pConn->userInited == 0) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (pHead->comp == 0) {
|
if (pHead->comp == 0) {
|
||||||
pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0;
|
pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0;
|
||||||
|
@ -1447,30 +1478,51 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) {
|
||||||
wb[j++] = uv_buf_init((char*)pHead, msgLen);
|
wb[j++] = uv_buf_init((char*)pHead, msgLen);
|
||||||
totalLen += msgLen;
|
totalLen += msgLen;
|
||||||
|
|
||||||
pCliMsg->sent = 1;
|
|
||||||
pCliMsg->seq = pConn->seq;
|
pCliMsg->seq = pConn->seq;
|
||||||
|
pCliMsg->sent = 1;
|
||||||
|
|
||||||
STraceId* trace = &pCliMsg->msg.info.traceId;
|
STraceId* trace = &pCliMsg->msg.info.traceId;
|
||||||
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn),
|
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn),
|
||||||
pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId);
|
pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId);
|
||||||
|
|
||||||
transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
|
transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
|
||||||
|
QUEUE_INIT(&pCliMsg->sendQ);
|
||||||
|
QUEUE_PUSH(&reqToSend, &pCliMsg->sendQ);
|
||||||
|
|
||||||
|
pCliMsg->inSendQ = 1;
|
||||||
if (j >= batchLimit) {
|
if (j >= batchLimit) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
transRefCliHandle(pConn);
|
transRefCliHandle(pConn);
|
||||||
uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn);
|
uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn);
|
||||||
|
|
||||||
if (req == NULL) {
|
if (req == NULL) {
|
||||||
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno));
|
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno));
|
||||||
|
while (!QUEUE_IS_EMPTY(&reqToSend)) {
|
||||||
|
queue* h = QUEUE_HEAD(&reqToSend);
|
||||||
|
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ);
|
||||||
|
removeReqFromSendQ(pCliMsg);
|
||||||
|
}
|
||||||
|
|
||||||
transRefCliHandle(pConn);
|
transRefCliHandle(pConn);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SWReqsWrapper* pWreq = req->data;
|
||||||
|
|
||||||
|
QUEUE_MOVE(&reqToSend, &pWreq->node);
|
||||||
tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, j, totalLen);
|
tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, j, totalLen);
|
||||||
|
|
||||||
int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb);
|
int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
|
tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret));
|
||||||
|
while (!QUEUE_IS_EMPTY(&pWreq->node)) {
|
||||||
|
queue* h = QUEUE_HEAD(&pWreq->node);
|
||||||
|
SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ);
|
||||||
|
removeReqFromSendQ(pCliMsg);
|
||||||
|
}
|
||||||
|
|
||||||
freeWReqToWQ(&pConn->wq, req->data);
|
freeWReqToWQ(&pConn->wq, req->data);
|
||||||
code = TSDB_CODE_THIRDPARTY_ERROR;
|
code = TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
TAOS_UNUSED(transUnrefCliHandle(pConn));
|
TAOS_UNUSED(transUnrefCliHandle(pConn));
|
||||||
|
@ -2182,11 +2234,21 @@ static void cliAsyncCb(uv_async_t* handle) {
|
||||||
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd, pThrd->stopMsg);
|
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd, pThrd->stopMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void removeReqFromSendQ(SCliReq* pReq) {
|
||||||
|
if (pReq == NULL || pReq->inSendQ == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
QUEUE_REMOVE(&pReq->sendQ);
|
||||||
|
pReq->inSendQ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void destroyReq(void* arg) {
|
static FORCE_INLINE void destroyReq(void* arg) {
|
||||||
SCliReq* pReq = arg;
|
SCliReq* pReq = arg;
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
STraceId* trace = &pReq->msg.info.traceId;
|
STraceId* trace = &pReq->msg.info.traceId;
|
||||||
tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx);
|
tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx);
|
||||||
|
|
||||||
|
@ -2961,6 +3023,7 @@ int32_t cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
|
||||||
STrans* pInst = pThrd->pInst;
|
STrans* pInst = pThrd->pInst;
|
||||||
|
|
||||||
if (pReq != NULL) {
|
if (pReq != NULL) {
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
if (pResp->code != TSDB_CODE_SUCCESS) {
|
if (pResp->code != TSDB_CODE_SUCCESS) {
|
||||||
if (cliMayRetry(pConn, pReq, pResp)) {
|
if (cliMayRetry(pConn, pReq, pResp)) {
|
||||||
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
|
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
|
||||||
|
@ -3114,7 +3177,7 @@ static int32_t transInitMsg(void* pInstRef, const SEpSet* pEpSet, STransMsg* pRe
|
||||||
if (ctx != NULL) pCtx->userCtx = *ctx;
|
if (ctx != NULL) pCtx->userCtx = *ctx;
|
||||||
|
|
||||||
pCliReq = taosMemoryCalloc(1, sizeof(SCliReq));
|
pCliReq = taosMemoryCalloc(1, sizeof(SCliReq));
|
||||||
if (pReq == NULL) {
|
if (pCliReq == NULL) {
|
||||||
TAOS_CHECK_GOTO(terrno, NULL, _exception);
|
TAOS_CHECK_GOTO(terrno, NULL, _exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3183,6 +3246,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int8_t transIdInited = 0;
|
||||||
|
|
||||||
STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef);
|
STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef);
|
||||||
if (pInst == NULL) {
|
if (pInst == NULL) {
|
||||||
|
@ -3200,6 +3264,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
|
||||||
if (exh == NULL) {
|
if (exh == NULL) {
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _exception);
|
TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _exception);
|
||||||
}
|
}
|
||||||
|
transIdInited = 1;
|
||||||
|
|
||||||
pReq->info.handle = (void*)(*transpointId);
|
pReq->info.handle = (void*)(*transpointId);
|
||||||
pReq->info.qId = *transpointId;
|
pReq->info.qId = *transpointId;
|
||||||
|
@ -3216,9 +3281,6 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
|
||||||
return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
|
return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (pReq->msgType == TDMT_SCH_DROP_TASK) {
|
|
||||||
// TAOS_UNUSED(transReleaseCliHandle(pReq->info.handle));
|
|
||||||
// }
|
|
||||||
transReleaseExHandle(transGetRefMgt(), *transpointId);
|
transReleaseExHandle(transGetRefMgt(), *transpointId);
|
||||||
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
|
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -3226,6 +3288,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg*
|
||||||
_exception:
|
_exception:
|
||||||
transFreeMsg(pReq->pCont);
|
transFreeMsg(pReq->pCont);
|
||||||
pReq->pCont = NULL;
|
pReq->pCont = NULL;
|
||||||
|
if (transIdInited) transReleaseExHandle(transGetRefMgt(), *transpointId);
|
||||||
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
|
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);
|
||||||
|
|
||||||
tError("failed to send request since %s", tstrerror(code));
|
tError("failed to send request since %s", tstrerror(code));
|
||||||
|
@ -3641,6 +3704,7 @@ bool filterTimeoutReq(void* key, void* arg) {
|
||||||
if (pReq->msg.info.qId == 0 && !REQUEST_NO_RESP(&pReq->msg) && pReq->ctx) {
|
if (pReq->msg.info.qId == 0 && !REQUEST_NO_RESP(&pReq->msg) && pReq->ctx) {
|
||||||
int64_t elapse = ((st - pReq->st) / 1000000);
|
int64_t elapse = ((st - pReq->st) / 1000000);
|
||||||
if (listArg && elapse >= listArg->pInst->readTimeout) {
|
if (listArg && elapse >= listArg->pInst->readTimeout) {
|
||||||
|
removeReqFromSendQ(pReq);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -423,6 +423,7 @@ int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) {
|
||||||
QUEUE_INIT(&wq->node);
|
QUEUE_INIT(&wq->node);
|
||||||
wq->freeFunc = (void (*)(void*))freeFunc;
|
wq->freeFunc = (void (*)(void*))freeFunc;
|
||||||
wq->size = 0;
|
wq->size = 0;
|
||||||
|
wq->inited = 1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
void transQueuePush(STransQueue* q, void* arg) {
|
void transQueuePush(STransQueue* q, void* arg) {
|
||||||
|
@ -497,6 +498,7 @@ void transQueueRemove(STransQueue* q, void* e) {
|
||||||
bool transQueueEmpty(STransQueue* q) { return q->size == 0 ? true : false; }
|
bool transQueueEmpty(STransQueue* q) { return q->size == 0 ? true : false; }
|
||||||
|
|
||||||
void transQueueClear(STransQueue* q) {
|
void transQueueClear(STransQueue* q) {
|
||||||
|
if (q->inited == 0) return;
|
||||||
while (!QUEUE_IS_EMPTY(&q->node)) {
|
while (!QUEUE_IS_EMPTY(&q->node)) {
|
||||||
queue* h = QUEUE_HEAD(&q->node);
|
queue* h = QUEUE_HEAD(&q->node);
|
||||||
QUEUE_REMOVE(h);
|
QUEUE_REMOVE(h);
|
||||||
|
|
|
@ -1289,7 +1289,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SWorkThrd* pThrd = hThrd;
|
SWorkThrd* pThrd = hThrd;
|
||||||
int32_t lino;
|
int32_t lino;
|
||||||
|
int8_t wqInited = 0;
|
||||||
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
|
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
|
||||||
|
@ -1340,6 +1340,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
|
||||||
|
|
||||||
code = initWQ(&pConn->wq);
|
code = initWQ(&pConn->wq);
|
||||||
TAOS_CHECK_GOTO(code, &lino, _end);
|
TAOS_CHECK_GOTO(code, &lino, _end);
|
||||||
|
wqInited = 1;
|
||||||
|
|
||||||
// init client handle
|
// init client handle
|
||||||
pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
|
pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
|
||||||
|
@ -1372,7 +1373,7 @@ _end:
|
||||||
transDestroyBuffer(&pConn->readBuf);
|
transDestroyBuffer(&pConn->readBuf);
|
||||||
taosHashCleanup(pConn->pQTable);
|
taosHashCleanup(pConn->pQTable);
|
||||||
taosMemoryFree(pConn->pTcp);
|
taosMemoryFree(pConn->pTcp);
|
||||||
destroyWQ(&pConn->wq);
|
if (wqInited) destroyWQ(&pConn->wq);
|
||||||
taosMemoryFree(pConn->buf);
|
taosMemoryFree(pConn->buf);
|
||||||
taosMemoryFree(pConn);
|
taosMemoryFree(pConn);
|
||||||
pConn = NULL;
|
pConn = NULL;
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
add_executable(transportTest "")
|
add_executable(transportTest "")
|
||||||
add_executable(transUT "")
|
add_executable(transUT "")
|
||||||
|
add_executable(transUT2 "")
|
||||||
add_executable(svrBench "")
|
add_executable(svrBench "")
|
||||||
add_executable(cliBench "")
|
add_executable(cliBench "")
|
||||||
add_executable(httpBench "")
|
add_executable(httpBench "")
|
||||||
|
@ -9,6 +10,10 @@ target_sources(transUT
|
||||||
"transUT.cpp"
|
"transUT.cpp"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
target_sources(transUT2
|
||||||
|
PRIVATE
|
||||||
|
"transUT2.cpp"
|
||||||
|
)
|
||||||
target_sources(transportTest
|
target_sources(transportTest
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"transportTests.cpp"
|
"transportTests.cpp"
|
||||||
|
@ -56,6 +61,20 @@ target_include_directories(transUT
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
target_link_libraries (transUT2
|
||||||
|
os
|
||||||
|
util
|
||||||
|
common
|
||||||
|
gtest_main
|
||||||
|
transport
|
||||||
|
)
|
||||||
|
|
||||||
|
target_include_directories(transUT2
|
||||||
|
PUBLIC
|
||||||
|
"${TD_SOURCE_DIR}/include/libs/transport"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
|
||||||
target_include_directories(svrBench
|
target_include_directories(svrBench
|
||||||
PUBLIC
|
PUBLIC
|
||||||
"${TD_SOURCE_DIR}/include/libs/transport"
|
"${TD_SOURCE_DIR}/include/libs/transport"
|
||||||
|
|
|
@ -53,8 +53,6 @@ static void processResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
||||||
pMsg->code);
|
pMsg->code);
|
||||||
|
|
||||||
if (pEpSet) pInfo->epSet = *pEpSet;
|
|
||||||
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
tsem_post(&pInfo->rspSem);
|
tsem_post(&pInfo->rspSem);
|
||||||
}
|
}
|
||||||
|
@ -72,12 +70,12 @@ static void *sendRequest(void *param) {
|
||||||
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
||||||
rpcMsg.contLen = pInfo->msgSize;
|
rpcMsg.contLen = pInfo->msgSize;
|
||||||
rpcMsg.info.ahandle = pInfo;
|
rpcMsg.info.ahandle = pInfo;
|
||||||
rpcMsg.info.noResp = 1;
|
rpcMsg.info.noResp = 0;
|
||||||
rpcMsg.msgType = 1;
|
rpcMsg.msgType = 1;
|
||||||
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||||
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
||||||
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||||
// tsem_wait(&pInfo->rspSem);
|
tsem_wait(&pInfo->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
tDebug("thread:%d, it is over", pInfo->index);
|
tDebug("thread:%d, it is over", pInfo->index);
|
||||||
|
@ -110,17 +108,15 @@ int main(int argc, char *argv[]) {
|
||||||
rpcInit.label = "APP";
|
rpcInit.label = "APP";
|
||||||
rpcInit.numOfThreads = 1;
|
rpcInit.numOfThreads = 1;
|
||||||
rpcInit.cfp = processResponse;
|
rpcInit.cfp = processResponse;
|
||||||
rpcInit.sessions = 100;
|
rpcInit.sessions = 1000;
|
||||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
rpcInit.user = "michael";
|
rpcInit.user = "michael";
|
||||||
|
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
rpcInit.connLimitNum = 10;
|
rpcInit.shareConnLimit = tsShareConnLimit;
|
||||||
rpcInit.connLimitLock = 1;
|
|
||||||
rpcInit.shareConnLimit = 16 * 1024;
|
|
||||||
rpcInit.supportBatch = 1;
|
rpcInit.supportBatch = 1;
|
||||||
|
rpcInit.compressSize = -1;
|
||||||
rpcDebugFlag = 135;
|
rpcDebugFlag = 143;
|
||||||
for (int i = 1; i < argc; ++i) {
|
for (int i = 1; i < argc; ++i) {
|
||||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||||
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
|
||||||
|
@ -139,6 +135,10 @@ int main(int argc, char *argv[]) {
|
||||||
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
||||||
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
||||||
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
||||||
|
} else if (strcmp(argv[i], "-l") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.shareConnLimit = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-c") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.compressSize = atoi(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||||
rpcDebugFlag = atoi(argv[++i]);
|
rpcDebugFlag = atoi(argv[++i]);
|
||||||
} else {
|
} else {
|
||||||
|
@ -150,6 +150,8 @@ int main(int argc, char *argv[]) {
|
||||||
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
||||||
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
||||||
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
||||||
|
printf(" [-c compressSize]: compress size, default:%d\n", tsCompressMsgSize);
|
||||||
|
printf(" [-l shareConnLimit]: share conn limit, default:%d\n", tsShareConnLimit);
|
||||||
printf(" [-h help]: print out this help\n\n");
|
printf(" [-h help]: print out this help\n\n");
|
||||||
exit(0);
|
exit(0);
|
||||||
}
|
}
|
||||||
|
@ -168,18 +170,18 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
int64_t now = taosGetTimestampUs();
|
int64_t now = taosGetTimestampUs();
|
||||||
|
|
||||||
SInfo *pInfo = (SInfo *)taosMemoryCalloc(1, sizeof(SInfo) * appThreads);
|
SInfo **pInfo = (SInfo **)taosMemoryCalloc(1, sizeof(SInfo *) * appThreads);
|
||||||
SInfo *p = pInfo;
|
|
||||||
for (int i = 0; i < appThreads; ++i) {
|
for (int i = 0; i < appThreads; ++i) {
|
||||||
pInfo->index = i;
|
SInfo *p = taosMemoryCalloc(1, sizeof(SInfo));
|
||||||
pInfo->epSet = epSet;
|
p->index = i;
|
||||||
pInfo->numOfReqs = numOfReqs;
|
p->epSet = epSet;
|
||||||
pInfo->msgSize = msgSize;
|
p->numOfReqs = numOfReqs;
|
||||||
tsem_init(&pInfo->rspSem, 0, 0);
|
p->msgSize = msgSize;
|
||||||
pInfo->pRpc = pRpc;
|
tsem_init(&p->rspSem, 0, 0);
|
||||||
|
p->pRpc = pRpc;
|
||||||
|
pInfo[i] = p;
|
||||||
|
|
||||||
taosThreadCreate(&pInfo->thread, NULL, sendRequest, pInfo);
|
taosThreadCreate(&p->thread, NULL, sendRequest, pInfo[i]);
|
||||||
pInfo++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
do {
|
do {
|
||||||
|
@ -192,12 +194,14 @@ int main(int argc, char *argv[]) {
|
||||||
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
|
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
|
||||||
|
|
||||||
for (int i = 0; i < appThreads; i++) {
|
for (int i = 0; i < appThreads; i++) {
|
||||||
SInfo *pInfo = p;
|
SInfo *p = pInfo[i];
|
||||||
taosThreadJoin(pInfo->thread, NULL);
|
taosThreadJoin(p->thread, NULL);
|
||||||
p++;
|
taosMemoryFree(p);
|
||||||
}
|
}
|
||||||
int ch = getchar();
|
taosMemoryFree(pInfo);
|
||||||
UNUSED(ch);
|
|
||||||
|
// int ch = getchar();
|
||||||
|
// UNUSED(ch);
|
||||||
|
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
|
|
||||||
|
|
|
@ -76,23 +76,6 @@ void *processShellMsg(void *arg) {
|
||||||
|
|
||||||
for (int i = 0; i < numOfMsgs; ++i) {
|
for (int i = 0; i < numOfMsgs; ++i) {
|
||||||
taosGetQitem(qall, (void **)&pRpcMsg);
|
taosGetQitem(qall, (void **)&pRpcMsg);
|
||||||
|
|
||||||
if (pDataFile != NULL) {
|
|
||||||
if (taosWriteFile(pDataFile, pRpcMsg->pCont, pRpcMsg->contLen) < 0) {
|
|
||||||
tInfo("failed to write data file, reason:%s", strerror(errno));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (commit >= 2) {
|
|
||||||
num += numOfMsgs;
|
|
||||||
// if (taosFsync(pDataFile) < 0) {
|
|
||||||
// tInfo("failed to flush data to file, reason:%s", strerror(errno));
|
|
||||||
//}
|
|
||||||
|
|
||||||
if (num % 10000 == 0) {
|
|
||||||
tInfo("%d request have been written into disk", num);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosResetQitems(qall);
|
taosResetQitems(qall);
|
||||||
|
@ -107,16 +90,7 @@ void *processShellMsg(void *arg) {
|
||||||
rpcMsg.code = 0;
|
rpcMsg.code = 0;
|
||||||
rpcSendResponse(&rpcMsg);
|
rpcSendResponse(&rpcMsg);
|
||||||
|
|
||||||
void *handle = pRpcMsg->info.handle;
|
|
||||||
taosFreeQitem(pRpcMsg);
|
taosFreeQitem(pRpcMsg);
|
||||||
//{
|
|
||||||
// SRpcMsg nRpcMsg = {0};
|
|
||||||
// nRpcMsg.pCont = rpcMallocCont(msgSize);
|
|
||||||
// nRpcMsg.contLen = msgSize;
|
|
||||||
// nRpcMsg.info.handle = handle;
|
|
||||||
// nRpcMsg.code = TSDB_CODE_CTG_NOT_READY;
|
|
||||||
// rpcSendResponse(&nRpcMsg);
|
|
||||||
//}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosUpdateItemSize(qinfo.queue, numOfMsgs);
|
taosUpdateItemSize(qinfo.queue, numOfMsgs);
|
||||||
|
@ -149,12 +123,13 @@ int main(int argc, char *argv[]) {
|
||||||
rpcInit.localPort = 7000;
|
rpcInit.localPort = 7000;
|
||||||
memcpy(rpcInit.localFqdn, "localhost", strlen("localhost"));
|
memcpy(rpcInit.localFqdn, "localhost", strlen("localhost"));
|
||||||
rpcInit.label = "SER";
|
rpcInit.label = "SER";
|
||||||
rpcInit.numOfThreads = 1;
|
rpcInit.numOfThreads = 10;
|
||||||
rpcInit.cfp = processRequestMsg;
|
rpcInit.cfp = processRequestMsg;
|
||||||
rpcInit.idleTime = 2 * 1500;
|
rpcInit.idleTime = 2 * 1500;
|
||||||
|
|
||||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||||
rpcDebugFlag = 131;
|
rpcDebugFlag = 131;
|
||||||
|
rpcInit.compressSize = -1;
|
||||||
|
|
||||||
for (int i = 1; i < argc; ++i) {
|
for (int i = 1; i < argc; ++i) {
|
||||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||||
|
@ -205,8 +180,8 @@ int main(int argc, char *argv[]) {
|
||||||
if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno));
|
if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfAthread = 5;
|
int32_t numOfAthread = 1;
|
||||||
multiQ = taosMemoryMalloc(sizeof(numOfAthread));
|
multiQ = taosMemoryMalloc(sizeof(MultiThreadQhandle));
|
||||||
multiQ->numOfThread = numOfAthread;
|
multiQ->numOfThread = numOfAthread;
|
||||||
multiQ->qhandle = (STaosQueue **)taosMemoryMalloc(sizeof(STaosQueue *) * numOfAthread);
|
multiQ->qhandle = (STaosQueue **)taosMemoryMalloc(sizeof(STaosQueue *) * numOfAthread);
|
||||||
multiQ->qset = (STaosQset **)taosMemoryMalloc(sizeof(STaosQset *) * numOfAthread);
|
multiQ->qset = (STaosQset **)taosMemoryMalloc(sizeof(STaosQset *) * numOfAthread);
|
||||||
|
@ -221,11 +196,6 @@ int main(int argc, char *argv[]) {
|
||||||
threads[i].idx = i;
|
threads[i].idx = i;
|
||||||
taosThreadCreate(&(threads[i].thread), NULL, processShellMsg, (void *)&threads[i]);
|
taosThreadCreate(&(threads[i].thread), NULL, processShellMsg, (void *)&threads[i]);
|
||||||
}
|
}
|
||||||
// qhandle = taosOpenQueue();
|
|
||||||
// qset = taosOpenQset();
|
|
||||||
// taosAddIntoQset(qset, qhandle, NULL);
|
|
||||||
|
|
||||||
// processShellMsg();
|
|
||||||
|
|
||||||
if (pDataFile != NULL) {
|
if (pDataFile != NULL) {
|
||||||
taosCloseFile(&pDataFile);
|
taosCloseFile(&pDataFile);
|
||||||
|
|
|
@ -54,6 +54,7 @@ class Client {
|
||||||
rpcInit_.user = (char *)user;
|
rpcInit_.user = (char *)user;
|
||||||
rpcInit_.parent = this;
|
rpcInit_.parent = this;
|
||||||
rpcInit_.connType = TAOS_CONN_CLIENT;
|
rpcInit_.connType = TAOS_CONN_CLIENT;
|
||||||
|
rpcInit_.shareConnLimit = 200;
|
||||||
|
|
||||||
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||||
this->transCli = rpcOpen(&rpcInit_);
|
this->transCli = rpcOpen(&rpcInit_);
|
||||||
|
@ -85,6 +86,14 @@ class Client {
|
||||||
SemWait();
|
SemWait();
|
||||||
*resp = this->resp;
|
*resp = this->resp;
|
||||||
}
|
}
|
||||||
|
void sendReq(SRpcMsg *req) {
|
||||||
|
SEpSet epSet = {0};
|
||||||
|
epSet.inUse = 0;
|
||||||
|
addEpIntoEpSet(&epSet, "127.0.0.1", 7000);
|
||||||
|
|
||||||
|
rpcSendRequest(this->transCli, &epSet, req, NULL);
|
||||||
|
|
||||||
|
}
|
||||||
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
|
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
|
||||||
if (req->info.handle != NULL) {
|
if (req->info.handle != NULL) {
|
||||||
rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT);
|
rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT);
|
||||||
|
@ -160,6 +169,7 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
rpcMsg.contLen = 100;
|
rpcMsg.contLen = 100;
|
||||||
rpcMsg.info = pMsg->info;
|
rpcMsg.info = pMsg->info;
|
||||||
rpcMsg.code = 0;
|
rpcMsg.code = 0;
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
rpcSendResponse(&rpcMsg);
|
rpcSendResponse(&rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,6 +274,7 @@ class TransObj {
|
||||||
cli->Stop();
|
cli->Stop();
|
||||||
}
|
}
|
||||||
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
|
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
|
||||||
|
void cliSendReq(SRpcMsg *req) { cli->sendReq(req); }
|
||||||
void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); }
|
void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); }
|
||||||
|
|
||||||
~TransObj() {
|
~TransObj() {
|
||||||
|
@ -492,15 +503,16 @@ TEST_F(TransEnv, queryExcept) {
|
||||||
TEST_F(TransEnv, noResp) {
|
TEST_F(TransEnv, noResp) {
|
||||||
SRpcMsg resp = {0};
|
SRpcMsg resp = {0};
|
||||||
SRpcMsg req = {0};
|
SRpcMsg req = {0};
|
||||||
// for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 500000; i++) {
|
||||||
// memset(&req, 0, sizeof(req));
|
memset(&req, 0, sizeof(req));
|
||||||
// req.info.noResp = 1;
|
req.info.noResp = 1;
|
||||||
// req.msgType = 1;
|
req.msgType = 3;
|
||||||
// req.pCont = rpcMallocCont(10);
|
req.pCont = rpcMallocCont(10);
|
||||||
// req.contLen = 10;
|
req.contLen = 10;
|
||||||
// tr->cliSendAndRecv(&req, &resp);
|
tr->cliSendReq(&req);
|
||||||
//}
|
//tr->cliSendAndRecv(&req, &resp);
|
||||||
// taosMsleep(2000);
|
}
|
||||||
|
taosMsleep(2000);
|
||||||
|
|
||||||
// no resp
|
// no resp
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,529 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
|
||||||
|
* Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cstring>
|
||||||
|
#include "tdatablock.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "tmisce.h"
|
||||||
|
#include "transLog.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
#include "tversion.h"
|
||||||
|
using namespace std;
|
||||||
|
|
||||||
|
const char *label = "APP";
|
||||||
|
const char *secret = "secret";
|
||||||
|
const char *user = "user";
|
||||||
|
const char *ckey = "ckey";
|
||||||
|
|
||||||
|
class Server;
|
||||||
|
int port = 7000;
|
||||||
|
// server process
|
||||||
|
// server except
|
||||||
|
|
||||||
|
typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
|
||||||
|
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
// client process;
|
||||||
|
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
class Client {
|
||||||
|
public:
|
||||||
|
void Init(int nThread) {
|
||||||
|
memcpy(tsTempDir, TD_TMP_DIR_PATH, strlen(TD_TMP_DIR_PATH));
|
||||||
|
memset(&rpcInit_, 0, sizeof(rpcInit_));
|
||||||
|
rpcInit_.localPort = 0;
|
||||||
|
rpcInit_.label = (char *)"client";
|
||||||
|
rpcInit_.numOfThreads = nThread;
|
||||||
|
rpcInit_.cfp = processResp;
|
||||||
|
rpcInit_.user = (char *)user;
|
||||||
|
rpcInit_.parent = this;
|
||||||
|
rpcInit_.connType = TAOS_CONN_CLIENT;
|
||||||
|
rpcInit_.shareConnLimit = 200;
|
||||||
|
|
||||||
|
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||||
|
this->transCli = rpcOpen(&rpcInit_);
|
||||||
|
//tsem_init(&this->sem, 0, 0);
|
||||||
|
}
|
||||||
|
void SetResp(SRpcMsg *pMsg) {
|
||||||
|
// set up resp;
|
||||||
|
this->resp = *pMsg;
|
||||||
|
}
|
||||||
|
SRpcMsg *Resp() { return &this->resp; }
|
||||||
|
|
||||||
|
void Restart(CB cb) {
|
||||||
|
rpcClose(this->transCli);
|
||||||
|
rpcInit_.cfp = cb;
|
||||||
|
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||||
|
this->transCli = rpcOpen(&rpcInit_);
|
||||||
|
}
|
||||||
|
void Stop() {
|
||||||
|
rpcClose(this->transCli);
|
||||||
|
this->transCli = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SendAndRecv(SRpcMsg *req, SRpcMsg *resp) {
|
||||||
|
SEpSet epSet = {0};
|
||||||
|
epSet.inUse = 0;
|
||||||
|
addEpIntoEpSet(&epSet, "127.0.0.1", 7000);
|
||||||
|
|
||||||
|
rpcSendRequest(this->transCli, &epSet, req, NULL);
|
||||||
|
SemWait();
|
||||||
|
*resp = this->resp;
|
||||||
|
}
|
||||||
|
void sendReq(SRpcMsg *req) {
|
||||||
|
SEpSet epSet = {0};
|
||||||
|
epSet.inUse = 0;
|
||||||
|
addEpIntoEpSet(&epSet, "127.0.0.1", 7000);
|
||||||
|
|
||||||
|
rpcSendRequest(this->transCli, &epSet, req, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
void sendReqWithId(SRpcMsg *req, int64_t *id) {
|
||||||
|
SEpSet epSet = {0};
|
||||||
|
epSet.inUse = 0;
|
||||||
|
addEpIntoEpSet(&epSet, "127.0.0.1",7000);
|
||||||
|
rpcSendRequestWithCtx(this->transCli, &epSet, req, id, NULL);
|
||||||
|
|
||||||
|
}
|
||||||
|
void freeId(int64_t *id) {
|
||||||
|
rpcFreeConnById(this->transCli, *id);
|
||||||
|
}
|
||||||
|
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
|
||||||
|
if (req->info.handle != NULL) {
|
||||||
|
rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT);
|
||||||
|
req->info.handle = NULL;
|
||||||
|
}
|
||||||
|
SendAndRecv(req, resp);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SemWait() { tsem_wait(&this->sem); }
|
||||||
|
void SemPost() { tsem_post(&this->sem); }
|
||||||
|
void Reset() {}
|
||||||
|
|
||||||
|
~Client() {
|
||||||
|
if (this->transCli) rpcClose(this->transCli);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
tsem_t sem;
|
||||||
|
SRpcInit rpcInit_;
|
||||||
|
void *transCli;
|
||||||
|
SRpcMsg resp;
|
||||||
|
};
|
||||||
|
class Server {
|
||||||
|
public:
|
||||||
|
Server() {
|
||||||
|
memcpy(tsTempDir, TD_TMP_DIR_PATH, strlen(TD_TMP_DIR_PATH));
|
||||||
|
memset(&rpcInit_, 0, sizeof(rpcInit_));
|
||||||
|
|
||||||
|
memcpy(rpcInit_.localFqdn, "localhost", strlen("localhost"));
|
||||||
|
rpcInit_.localPort = port;
|
||||||
|
rpcInit_.label = (char *)"server";
|
||||||
|
rpcInit_.numOfThreads = 5;
|
||||||
|
rpcInit_.cfp = processReq;
|
||||||
|
rpcInit_.user = (char *)user;
|
||||||
|
rpcInit_.connType = TAOS_CONN_SERVER;
|
||||||
|
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
|
||||||
|
}
|
||||||
|
void Start() {
|
||||||
|
this->transSrv = rpcOpen(&this->rpcInit_);
|
||||||
|
taosMsleep(1000);
|
||||||
|
}
|
||||||
|
void SetSrvContinueSend(CB cb) {
|
||||||
|
this->Stop();
|
||||||
|
rpcInit_.cfp = cb;
|
||||||
|
this->Start();
|
||||||
|
}
|
||||||
|
void Stop() {
|
||||||
|
if (this->transSrv == NULL) return;
|
||||||
|
rpcClose(this->transSrv);
|
||||||
|
this->transSrv = NULL;
|
||||||
|
}
|
||||||
|
void SetSrvSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
||||||
|
this->Stop();
|
||||||
|
rpcInit_.cfp = cfp;
|
||||||
|
this->Start();
|
||||||
|
}
|
||||||
|
void Restart() {
|
||||||
|
this->Stop();
|
||||||
|
this->Start();
|
||||||
|
}
|
||||||
|
~Server() {
|
||||||
|
if (this->transSrv) rpcClose(this->transSrv);
|
||||||
|
this->transSrv = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
SRpcInit rpcInit_;
|
||||||
|
void *transSrv;
|
||||||
|
};
|
||||||
|
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.pCont = rpcMallocCont(100);
|
||||||
|
rpcMsg.contLen = 100;
|
||||||
|
rpcMsg.info = pMsg->info;
|
||||||
|
rpcMsg.code = 0;
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
rpcSendResponse(&rpcMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
// for (int i = 0; i < 10; i++) {
|
||||||
|
// SRpcMsg rpcMsg = {0};
|
||||||
|
// rpcMsg.pCont = rpcMallocCont(100);
|
||||||
|
// rpcMsg.contLen = 100;
|
||||||
|
// rpcMsg.info = pMsg->info;
|
||||||
|
// rpcMsg.code = 0;
|
||||||
|
// rpcSendResponse(&rpcMsg);
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.pCont = rpcMallocCont(100);
|
||||||
|
rpcMsg.contLen = 100;
|
||||||
|
rpcMsg.info = pMsg->info;
|
||||||
|
rpcMsg.code = 0;
|
||||||
|
rpcSendResponse(&rpcMsg);
|
||||||
|
|
||||||
|
rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER);
|
||||||
|
}
|
||||||
|
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
// {
|
||||||
|
// SRpcMsg rpcMsg1 = {0};
|
||||||
|
// rpcMsg1.pCont = rpcMallocCont(100);
|
||||||
|
// rpcMsg1.contLen = 100;
|
||||||
|
// rpcMsg1.info = pMsg->info;
|
||||||
|
// rpcMsg1.code = 0;
|
||||||
|
// rpcRegisterBrokenLinkArg(&rpcMsg1);
|
||||||
|
// }
|
||||||
|
// taosMsleep(10);
|
||||||
|
|
||||||
|
// SRpcMsg rpcMsg = {0};
|
||||||
|
// rpcMsg.pCont = rpcMallocCont(100);
|
||||||
|
// rpcMsg.contLen = 100;
|
||||||
|
// rpcMsg.info = pMsg->info;
|
||||||
|
// rpcMsg.code = 0;
|
||||||
|
// rpcSendResponse(&rpcMsg);
|
||||||
|
}
|
||||||
|
// client process;
|
||||||
|
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
Client *client = (Client *)parent;
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
STraceId *trace = (STraceId *)&pMsg->info.traceId;
|
||||||
|
tGDebug("received resp %s",tstrerror(pMsg->code));
|
||||||
|
}
|
||||||
|
|
||||||
|
static void initEnv() {
|
||||||
|
dDebugFlag = 143;
|
||||||
|
vDebugFlag = 0;
|
||||||
|
mDebugFlag = 143;
|
||||||
|
cDebugFlag = 0;
|
||||||
|
jniDebugFlag = 0;
|
||||||
|
tmrDebugFlag = 143;
|
||||||
|
uDebugFlag = 143;
|
||||||
|
rpcDebugFlag = 143;
|
||||||
|
qDebugFlag = 0;
|
||||||
|
wDebugFlag = 0;
|
||||||
|
sDebugFlag = 0;
|
||||||
|
tsdbDebugFlag = 0;
|
||||||
|
tsLogEmbedded = 1;
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
|
||||||
|
std::string path = TD_TMP_DIR_PATH "transport";
|
||||||
|
// taosRemoveDir(path.c_str());
|
||||||
|
taosMkDir(path.c_str());
|
||||||
|
|
||||||
|
tstrncpy(tsLogDir, path.c_str(), PATH_MAX);
|
||||||
|
if (taosInitLog("taosdlog", 1, false) != 0) {
|
||||||
|
printf("failed to init log file\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class TransObj {
|
||||||
|
public:
|
||||||
|
TransObj() {
|
||||||
|
initEnv();
|
||||||
|
cli = new Client;
|
||||||
|
cli->Init(1);
|
||||||
|
srv = new Server;
|
||||||
|
srv->Start();
|
||||||
|
}
|
||||||
|
|
||||||
|
void RestartCli(CB cb) {
|
||||||
|
//
|
||||||
|
cli->Restart(cb);
|
||||||
|
}
|
||||||
|
void StopSrv() {
|
||||||
|
//
|
||||||
|
srv->Stop();
|
||||||
|
}
|
||||||
|
// call when link broken, and notify query or fetch stop
|
||||||
|
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
||||||
|
///////
|
||||||
|
srv->SetSrvContinueSend(cfp);
|
||||||
|
}
|
||||||
|
void RestartSrv() { srv->Restart(); }
|
||||||
|
void StopCli() {
|
||||||
|
///////
|
||||||
|
cli->Stop();
|
||||||
|
}
|
||||||
|
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
|
||||||
|
void cliSendReq(SRpcMsg *req) { cli->sendReq(req); }
|
||||||
|
|
||||||
|
void cliSendReqWithId(SRpcMsg *req, int64_t *id) { cli->sendReqWithId(req, id);}
|
||||||
|
void cliFreeReqId(int64_t *id) { cli->freeId(id);}
|
||||||
|
void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); }
|
||||||
|
|
||||||
|
~TransObj() {
|
||||||
|
delete cli;
|
||||||
|
delete srv;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Client *cli;
|
||||||
|
Server *srv;
|
||||||
|
};
|
||||||
|
class TransEnv : public ::testing::Test {
|
||||||
|
protected:
|
||||||
|
virtual void SetUp() {
|
||||||
|
// set up trans obj
|
||||||
|
tr = new TransObj();
|
||||||
|
}
|
||||||
|
virtual void TearDown() {
|
||||||
|
// tear down
|
||||||
|
delete tr;
|
||||||
|
}
|
||||||
|
|
||||||
|
TransObj *tr = NULL;
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(TransEnv, 01sendAndRec) {
|
||||||
|
// for (int i = 0; i < 10; i++) {
|
||||||
|
// SRpcMsg req = {0}, resp = {0};
|
||||||
|
// req.msgType = 0;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// assert(resp.code == 0);
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TransEnv, 02StopServer) {
|
||||||
|
// for (int i = 0; i < 1; i++) {
|
||||||
|
// SRpcMsg req = {0}, resp = {0};
|
||||||
|
// req.msgType = 0;
|
||||||
|
// req.info.ahandle = (void *)0x35;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// assert(resp.code == 0);
|
||||||
|
// }
|
||||||
|
// SRpcMsg req = {0}, resp = {0};
|
||||||
|
// req.info.ahandle = (void *)0x35;
|
||||||
|
// req.msgType = 1;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->StopSrv();
|
||||||
|
// // tr->RestartSrv();
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// assert(resp.code != 0);
|
||||||
|
}
|
||||||
|
TEST_F(TransEnv, clientUserDefined) {
|
||||||
|
// tr->RestartSrv();
|
||||||
|
// for (int i = 0; i < 10; i++) {
|
||||||
|
// SRpcMsg req = {0}, resp = {0};
|
||||||
|
// req.msgType = 0;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// assert(resp.code == 0);
|
||||||
|
// }
|
||||||
|
|
||||||
|
//////////////////
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TransEnv, cliPersistHandle) {
|
||||||
|
// SRpcMsg resp = {0};
|
||||||
|
// void *handle = NULL;
|
||||||
|
// for (int i = 0; i < 10; i++) {
|
||||||
|
// SRpcMsg req = {0};
|
||||||
|
// req.info = resp.info;
|
||||||
|
// req.info.persistHandle = 1;
|
||||||
|
|
||||||
|
// req.msgType = 1;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// // if (i == 5) {
|
||||||
|
// // std::cout << "stop server" << std::endl;
|
||||||
|
// // tr->StopSrv();
|
||||||
|
// //}
|
||||||
|
// // if (i >= 6) {
|
||||||
|
// // EXPECT_TRUE(resp.code != 0);
|
||||||
|
// //}
|
||||||
|
// handle = resp.info.handle;
|
||||||
|
// }
|
||||||
|
// rpcReleaseHandle(handle, TAOS_CONN_CLIENT);
|
||||||
|
// for (int i = 0; i < 10; i++) {
|
||||||
|
// SRpcMsg req = {0};
|
||||||
|
// req.msgType = 1;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// taosMsleep(1000);
|
||||||
|
//////////////////
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TransEnv, srvReleaseHandle) {
|
||||||
|
// SRpcMsg resp = {0};
|
||||||
|
// tr->SetSrvContinueSend(processReleaseHandleCb);
|
||||||
|
// // tr->Restart(processReleaseHandleCb);
|
||||||
|
// void *handle = NULL;
|
||||||
|
// SRpcMsg req = {0};
|
||||||
|
// for (int i = 0; i < 1; i++) {
|
||||||
|
// memset(&req, 0, sizeof(req));
|
||||||
|
// req.info = resp.info;
|
||||||
|
// req.info.persistHandle = 1;
|
||||||
|
// req.msgType = 1;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// // tr->cliSendAndRecvNoHandle(&req, &resp);
|
||||||
|
// EXPECT_TRUE(resp.code == 0);
|
||||||
|
// }
|
||||||
|
//////////////////
|
||||||
|
}
|
||||||
|
// reopen later
|
||||||
|
// TEST_F(TransEnv, cliReleaseHandleExcept) {
|
||||||
|
// SRpcMsg resp = {0};
|
||||||
|
// SRpcMsg req = {0};
|
||||||
|
// for (int i = 0; i < 3; i++) {
|
||||||
|
// memset(&req, 0, sizeof(req));
|
||||||
|
// req.info = resp.info;
|
||||||
|
// req.info.persistHandle = 1;
|
||||||
|
// req.info.ahandle = (void *)1234;
|
||||||
|
// req.msgType = 1;
|
||||||
|
// req.pCont = rpcMallocCont(10);
|
||||||
|
// req.contLen = 10;
|
||||||
|
// tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// if (i == 1) {
|
||||||
|
// std::cout << "stop server" << std::endl;
|
||||||
|
// tr->StopSrv();
|
||||||
|
// }
|
||||||
|
// if (i > 1) {
|
||||||
|
// EXPECT_TRUE(resp.code != 0);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// //////////////////
|
||||||
|
//}
|
||||||
|
TEST_F(TransEnv, srvContinueSend) {
|
||||||
|
// tr->SetSrvContinueSend(processContinueSend);
|
||||||
|
// SRpcMsg req = {0}, resp = {0};
|
||||||
|
// for (int i = 0; i < 10; i++) {
|
||||||
|
// // memset(&req, 0, sizeof(req));
|
||||||
|
// // memset(&resp, 0, sizeof(resp));
|
||||||
|
// // req.msgType = 1;
|
||||||
|
// // req.pCont = rpcMallocCont(10);
|
||||||
|
// // req.contLen = 10;
|
||||||
|
// // tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// }
|
||||||
|
// taosMsleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TransEnv, srvPersistHandleExcept) {
|
||||||
|
// tr->SetSrvContinueSend(processContinueSend);
|
||||||
|
// // tr->SetCliPersistFp(cliPersistHandle);
|
||||||
|
// SRpcMsg resp = {0};
|
||||||
|
// SRpcMsg req = {0};
|
||||||
|
// for (int i = 0; i < 5; i++) {
|
||||||
|
// // memset(&req, 0, sizeof(req));
|
||||||
|
// // req.info = resp.info;
|
||||||
|
// // req.msgType = 1;
|
||||||
|
// // req.pCont = rpcMallocCont(10);
|
||||||
|
// // req.contLen = 10;
|
||||||
|
// // tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// // if (i > 2) {
|
||||||
|
// // tr->StopCli();
|
||||||
|
// // break;
|
||||||
|
// //}
|
||||||
|
// }
|
||||||
|
// taosMsleep(2000);
|
||||||
|
// conn broken
|
||||||
|
//
|
||||||
|
}
|
||||||
|
TEST_F(TransEnv, cliPersistHandleExcept) {
|
||||||
|
// tr->SetSrvContinueSend(processContinueSend);
|
||||||
|
// SRpcMsg resp = {0};
|
||||||
|
// SRpcMsg req = {0};
|
||||||
|
// for (int i = 0; i < 5; i++) {
|
||||||
|
// // memset(&req, 0, sizeof(req));
|
||||||
|
// // req.info = resp.info;
|
||||||
|
// // req.msgType = 1;
|
||||||
|
// // req.pCont = rpcMallocCont(10);
|
||||||
|
// // req.contLen = 10;
|
||||||
|
// // tr->cliSendAndRecv(&req, &resp);
|
||||||
|
// // if (i > 2) {
|
||||||
|
// // tr->StopSrv();
|
||||||
|
// // break;
|
||||||
|
// //}
|
||||||
|
// }
|
||||||
|
// taosMsleep(2000);
|
||||||
|
// // conn broken
|
||||||
|
//
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TransEnv, multiCliPersistHandleExcept) {
|
||||||
|
// conn broken
|
||||||
|
}
|
||||||
|
TEST_F(TransEnv, queryExcept) {
|
||||||
|
//taosMsleep(4 * 1000);
|
||||||
|
}
|
||||||
|
TEST_F(TransEnv, idTest) {
|
||||||
|
SRpcMsg resp = {0};
|
||||||
|
SRpcMsg req = {0};
|
||||||
|
for (int i = 0; i < 50000; i++) {
|
||||||
|
memset(&req, 0, sizeof(req));
|
||||||
|
req.info.noResp = 0;
|
||||||
|
req.msgType = 3;
|
||||||
|
req.pCont = rpcMallocCont(10);
|
||||||
|
req.contLen = 10;
|
||||||
|
int64_t id;
|
||||||
|
tr->cliSendReqWithId(&req, &id);
|
||||||
|
tr->cliFreeReqId(&id);
|
||||||
|
}
|
||||||
|
taosMsleep(1000);
|
||||||
|
// no resp
|
||||||
|
}
|
||||||
|
TEST_F(TransEnv, noResp) {
|
||||||
|
SRpcMsg resp = {0};
|
||||||
|
SRpcMsg req = {0};
|
||||||
|
for (int i = 0; i < 500000; i++) {
|
||||||
|
memset(&req, 0, sizeof(req));
|
||||||
|
req.info.noResp = 0;
|
||||||
|
req.msgType = 3;
|
||||||
|
req.pCont = rpcMallocCont(10);
|
||||||
|
req.contLen = 10;
|
||||||
|
tr->cliSendReq(&req);
|
||||||
|
//tr->cliSendAndRecv(&req, &resp);
|
||||||
|
}
|
||||||
|
taosMsleep(10000);
|
||||||
|
// no resp
|
||||||
|
}
|
Loading…
Reference in New Issue