Merge branch 'main' into fix/liaohj

This commit is contained in:
Haojun Liao 2023-10-09 10:14:53 +08:00
commit 74ff0c5b50
35 changed files with 539 additions and 207 deletions

View File

@ -349,7 +349,7 @@ You configure the following parameters when creating a consumer:
| `td.connect.port` | string | Port of the server side | |
| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. Each topic can create up to 100 consumer groups. |
| `client.id` | string | Client ID | Maximum length: 192. |
| `auto.offset.reset` | enum | Initial offset for the consumer group | `earliest`: subscribe from the earliest data, this is the default behavior; `latest`: subscribe from the latest data; or `none`: can't subscribe without committed offset|
| `auto.offset.reset` | enum | Initial offset for the consumer group | `earliest`: subscribe from the earliest data, this is the default behavior (version < 3.2.0.0); `latest`: subscribe from the latest data, this is the default behavior (version >= 3.2.0.0); or `none`: can't subscribe without a committed offset |
| `enable.auto.commit` | boolean | Commit automatically; true: user application doesn't need to explicitly commit; false: user application needs to handle commits by itself | Default value is true |
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds | Default value is 5000 |
| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | Default value: false |
@ -455,7 +455,19 @@ from taos.tmq import Consumer
# Syntax: `consumer = Consumer(configs)`
#
# Example:
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
consumer = Consumer(
{
"group.id": "local",
"client.id": "1",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"msg.with.table.name": "true",
}
)
```
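Once a consumer like the one above is created, it still has to subscribe to a topic and poll before any data arrives. The following is a minimal sketch of that loop using the same `taos.tmq` API; the topic name `topic_meters` and the 1-second poll timeout are illustrative assumptions, not part of this change.

```python
# Minimal sketch only: assumes a topic named "topic_meters" already exists.
consumer.subscribe(["topic_meters"])
try:
    while True:
        message = consumer.poll(1)  # wait up to 1 second for new data
        if message is None:
            continue
        err = message.error()
        if err is not None:
            raise err
        for block in message.value():  # each block holds rows from one table
            print(block.fetchall())
finally:
    consumer.unsubscribe()
    consumer.close()
```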
</TabItem>

View File

@ -10,76 +10,59 @@ description: How to use Seeq and TDengine to perform time series data analysis
Seeq is an advanced analytics software for the manufacturing industry and the Industrial Internet of Things (IIoT). Seeq supports the use of machine learning innovations within process manufacturing organizations. These capabilities enable organizations to deploy their own or third-party machine learning algorithms into advanced analytics applications used by frontline process engineers and subject matter experts, thus extending the efforts of a single data scientist to many frontline workers.
With the TDengine Java connector, Seeq effortlessly supports querying time series data provided by TDengine and offers functionalities such as data visualization, analysis, and forecasting.
TDengine can be added as a data source in Seeq via the JDBC connector. Once the data source is configured, Seeq can read data from TDengine and offer functionalities such as data visualization, analysis, and forecasting.
### Install Seeq
## Prerequisites
Please download Seeq Server and Seeq Data Lab software installation package from the [Seeq official website](https://www.seeq.com/customer-download).
1. Install Seeq Server and Seeq Data Lab software
2. Install TDengine or register TDengine Cloud service
### Install and start Seeq Server
```
tar xvzf seeq-server-xxx.tar.gz
cd seeq-server-installer
sudo ./install
sudo seeq service enable
sudo seeq start
```
### Install and start Seeq Data Lab Server
Seeq Data Lab needs to be installed on a separate server from Seeq Server and connected to Seeq Server through configuration. For detailed installation and configuration instructions, please refer to [the official documentation](https://support.seeq.com/space/KB/1034059842).
```
tar xvf seeq-data-lab-<version>-64bit-linux.tar.gz
sudo seeq-data-lab-installer/install -f /opt/seeq/seeq-data-lab -g /var/opt/seeq -u seeq
sudo seeq config set Network/DataLab/Hostname localhost
sudo seeq config set Network/DataLab/Port 34231 # the port of the Data Lab server (usually 34231)
sudo seeq config set Network/Hostname <value> # the host IP or URL of the main Seeq Server
# If the main Seeq server is configured to listen over HTTPS
sudo seeq config set Network/Webserver/SecurePort 443 # the secure port of the main Seeq Server (usually 443)
# If the main Seeq server is NOT configured to listen over HTTPS
sudo seeq config set Network/Webserver/Port <value>
#On the main Seeq server, open a Seeq Command Prompt and set the hostname of the Data Lab server:
sudo seeq config set Network/DataLab/Hostname <value> # the host IP (not URL) of the Data Lab server
sudo seeq config set Network/DataLab/Port 34231 # the port of the Data Lab server (usually 34231)
```
### Install TDengine on-premise instance
See [Quick Install from Package](../../get-started).
### Or use TDengine Cloud
Register for a [TDengine Cloud](https://cloud.tdengine.com) account and log in to your account.
## Make Seeq be able to access TDengine
1. Get data location configuration
## Install TDengine JDBC connector
1. Get Seeq data location configuration
```
sudo seeq config get Folders/Data
```
2. Download TDengine Java connector from maven.org. Please use the latest version (Current is 3.2.5, https://repo1.maven.org/maven2/com/taosdata/jdbc/taos-jdbcdriver/3.2.5/taos-jdbcdriver-3.2.5-dist.jar).
2. Download the latest TDengine Java connector from maven.org (the current version is [3.2.5](https://repo1.maven.org/maven2/com/taosdata/jdbc/taos-jdbcdriver/3.2.5/taos-jdbcdriver-3.2.5-dist.jar)), and copy the JAR file into the_directory_found_in_step_1/plugins/lib/
3. Restart Seeq server
```
sudo seeq restart
```
4. Input License
## Add TDengine into Seeq's data source
1. Open Seeq, log in as admin, go to Administration, and click "Add Data Source"
2. For the connector, choose SQL connector v2
3. In the "Additional Configuration" input box, copy and paste the following
Use a browser to access ip:34216 and input the license according to the guide.
```
{
  "QueryDefinitions": [],
  "Type": "GENERIC",
  "Hostname": null,
  "Port": 0,
  "DatabaseName": null,
  "Username": null,
  "Password": null,
  "InitialSql": null,
  "TimeZone": null,
  "PrintRows": false,
  "UseWindowsAuth": false,
  "SqlFetchBatchSize": 100000,
  "UseSSL": false,
  "JdbcProperties": null,
  "GenericDatabaseConfig": {
    "DatabaseJdbcUrl": "jdbc:TAOS://localhost:6030/?user=root&password=taosdata",
    "ResolutionInNanoseconds": 1000,
    "ZonedColumnTypes": []
  }
}
```
## How to use Seeq to analyze time-series data that TDengine serves
Note: You need to replace DatabaseJdbcUrl with your own setting. Please log in to TDengine Cloud, or open taosExplorer for the enterprise edition, and click Programming -> Java to find yours. For the "QueryDefinitions", please follow the examples below to write your own.
This chapter demonstrates how to use Seeq software in conjunction with TDengine for time series data analysis.
## Use Seeq to analyze time-series data stored inside TDengine
This chapter demonstrates how to use Seeq with TDengine for time series data analysis.
### Scenario Overview
@ -150,8 +133,8 @@ Please login with Seeq administrator and create a few data sources as following.
"Hostname": null,
"Port": 0,
"DatabaseName": null,
"Username": "root",
"Password": "taosdata",
"Username": null,
"Password": null,
"InitialSql": null,
"TimeZone": null,
"PrintRows": false,
@ -161,7 +144,6 @@ Please login with Seeq administrator and create a few data sources as following.
"JdbcProperties": null,
"GenericDatabaseConfig": {
"DatabaseJdbcUrl": "jdbc:TAOS-RS://127.0.0.1:6041/power?user=root&password=taosdata",
"SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver",
"ResolutionInNanoseconds": 1000,
"ZonedColumnTypes": []
}
@ -210,8 +192,8 @@ Please login with Seeq administrator and create a few data sources as following.
"Hostname": null,
"Port": 0,
"DatabaseName": null,
"Username": "root",
"Password": "taosdata",
"Username": null,
"Password": null,
"InitialSql": null,
"TimeZone": null,
"PrintRows": false,
@ -221,7 +203,6 @@ Please login with Seeq administrator and create a few data sources as following.
"JdbcProperties": null,
"GenericDatabaseConfig": {
"DatabaseJdbcUrl": "jdbc:TAOS-RS://127.0.0.1:6041/power?user=root&password=taosdata",
"SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver",
"ResolutionInNanoseconds": 1000,
"ZonedColumnTypes": []
}
@ -269,8 +250,8 @@ Please login with Seeq administrator and create a few data sources as following.
"Hostname": null,
"Port": 0,
"DatabaseName": null,
"Username": "root",
"Password": "taosdata",
"Username": null,
"Password": null,
"InitialSql": null,
"TimeZone": null,
"PrintRows": false,
@ -280,7 +261,6 @@ Please login with Seeq administrator and create a few data sources as following.
"JdbcProperties": null,
"GenericDatabaseConfig": {
"DatabaseJdbcUrl": "jdbc:TAOS-RS://127.0.0.1:6041/power?user=root&password=taosdata",
"SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver",
"ResolutionInNanoseconds": 1000,
"ZonedColumnTypes": []
}
@ -289,13 +269,13 @@ Please login with Seeq administrator and create a few data sources as following.
#### Launch Seeq Workbench
Please login to Seeq server with IP:port and create a new Seeq Workbench, then select data sources and choose the correct tools to do data visualization and analysis. Please refer to [the official documentation](https://support.seeq.com/space/KB/146440193/Seeq+Workbench) for the details.
Please log in to the Seeq server and create a new Seeq Workbench, then select data sources and choose the correct tools to do data visualization and analysis. Please refer to [the official documentation](https://support.seeq.com/space/KB/146440193/Seeq+Workbench) for the details.
![Seeq Workbench](./seeq/seeq-demo-workbench.webp)
#### Use Seeq Data Lab Server for advanced data analysis
Please login to the Seeq service with IP:port and create a new Seeq Data Lab. Then you can use advanced tools including Python environment and machine learning add-ons for more complex analysis.
Please log in to the Seeq service and create a new Seeq Data Lab. Then you can use advanced tools including the Python environment and machine learning add-ons for more complex analysis.
```Python
from seeq import spy
@ -409,8 +389,8 @@ Please note that when using TDengine Cloud, you need to specify the database nam
"Hostname": null,
"Port": 0,
"DatabaseName": null,
"Username": "root",
"Password": "taosdata",
"Username": null,
"Password": null,
"InitialSql": null,
"TimeZone": null,
"PrintRows": false,
@ -420,7 +400,6 @@ Please note that when using TDengine Cloud, you need to specify the database nam
"JdbcProperties": null,
"GenericDatabaseConfig": {
"DatabaseJdbcUrl": "jdbc:TAOS-RS://gw.cloud.taosdata.com?useSSL=true&token=41ac9d61d641b6b334e8b76f45f5a8XXXXXXXXXX",
"SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver",
"ResolutionInNanoseconds": 1000,
"ZonedColumnTypes": []
}
@ -433,8 +412,8 @@ Please note that when using TDengine Cloud, you need to specify the database nam
## Conclusion
By integrating Seeq and TDengine, it is possible to leverage the efficient storage and querying performance of TDengine while also benefiting from Seeq's powerful data visualization and analysis capabilities provided to users.
By integrating Seeq and TDengine, you can leverage the efficient storage and querying performance of TDengine while also benefiting from Seeq's powerful data visualization and analysis capabilities.
This integration allows users to take advantage of TDengine's high-performance time-series data storage and retrieval, ensuring efficient handling of large volumes of data. At the same time, Seeq provides advanced analytics features such as data visualization, anomaly detection, correlation analysis, and predictive modeling, enabling users to gain valuable insights and make data-driven decisions.
This integration allows users to take advantage of TDengine's high-performance time-series data storage and querying, ensuring efficient handling of large volumes of data. At the same time, Seeq provides advanced analytics features such as data visualization, anomaly detection, correlation analysis, and predictive modeling, enabling users to gain valuable insights and make data-driven decisions.
Together, Seeq and TDengine provide a comprehensive solution for time series data analysis in diverse industries such as manufacturing, IIoT, and power systems. The combination of efficient data storage and advanced analytics empowers users to unlock the full potential of their time series data, driving operational improvements, and enabling predictive and prescriptive analytics applications.

View File

@ -348,7 +348,7 @@ CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
| `td.connect.port` | integer | Port of the server side | |
| `group.id` | string | Consumer group ID; consumers in the same group share consumption progress | <br />**Required**. Maximum length: 192.<br />Each topic can create up to 100 consumer groups |
| `client.id` | string | Client ID | Maximum length: 192. |
| `auto.offset.reset` | enum | Initial position the consumer group subscribes from | <br />`earliest`: default; subscribe from the beginning; <br/>`latest`: subscribe from the latest data only; <br/>`none`: cannot subscribe without a committed offset |
| `auto.offset.reset` | enum | Initial position the consumer group subscribes from | <br />`earliest`: default (version < 3.2.0.0); subscribe from the beginning; <br/>`latest`: default (version >= 3.2.0.0); subscribe from the latest data only; <br/>`none`: cannot subscribe without a committed offset |
| `enable.auto.commit` | boolean | Whether to enable automatic offset commits; true: commit automatically, the client application does not need to commit; false: the client application must commit by itself | Default value is true |
| `auto.commit.interval.ms` | integer | Interval for automatically committing consumption offsets, in milliseconds | Default value is 5000 |
| `msg.with.table.name` | boolean | Whether to allow parsing the table name from the message; not applicable to column subscriptions (in a column subscription, tbname can be written into the subquery as a column) | Disabled by default |
@ -456,7 +456,19 @@ from taos.tmq import Consumer
# Syntax: `consumer = Consumer(configs)`
#
# Example:
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
consumer = Consumer(
{
"group.id": "local",
"client.id": "1",
"enable.auto.commit": "true",
"auto.commit.interval.ms": "1000",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"msg.with.table.name": "true",
}
)
```
</TabItem>

View File

@ -14,40 +14,7 @@ Seeq is advanced analytics software for the manufacturing industry and the Industrial Internet of Things (IIoT). Seeq supports
### How to install Seeq
Download the relevant software, such as Seeq Server and Seeq Data Lab, from the [Seeq official website](https://www.seeq.com/customer-download).
### Install and start Seeq Server
```
tar xvzf seeq-server-xxx.tar.gz
cd seeq-server-installer
sudo ./install
sudo seeq service enable
sudo seeq start
```
### Install and start Seeq Data Lab Server
Seeq Data Lab needs to be installed on a separate server from Seeq Server and connected to Seeq Server through configuration. For detailed installation and configuration instructions, please refer to [the Seeq official documentation](https://support.seeq.com/space/KB/1034059842).
```
tar xvf seeq-data-lab-<version>-64bit-linux.tar.gz
sudo seeq-data-lab-installer/install -f /opt/seeq/seeq-data-lab -g /var/opt/seeq -u seeq
sudo seeq config set Network/DataLab/Hostname localhost
sudo seeq config set Network/DataLab/Port 34231 # the port of the Data Lab server (usually 34231)
sudo seeq config set Network/Hostname <value> # the host IP or URL of the main Seeq Server
# If the main Seeq server is configured to listen over HTTPS
sudo seeq config set Network/Webserver/SecurePort 443 # the secure port of the main Seeq Server (usually 443)
# If the main Seeq server is NOT configured to listen over HTTPS
sudo seeq config set Network/Webserver/Port <value>
#On the main Seeq server, open a Seeq Command Prompt and set the hostname of the Data Lab server:
sudo seeq config set Network/DataLab/Hostname <value> # the host IP (not URL) of the Data Lab server
sudo seeq config set Network/DataLab/Port 34231 # the port of the Data Lab server (usually 34231)
```
Download the relevant software, such as Seeq Server and Seeq Data Lab, from the [Seeq official website](https://www.seeq.com/customer-download). Seeq Data Lab needs to be installed on a separate server from Seeq Server and connected to Seeq Server through configuration. For detailed installation and configuration instructions, please refer to [the Seeq knowledge base](https://support.seeq.com/kb/latest/cloud/).
## How to install a local TDengine instance

View File

@ -683,7 +683,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
taosHashCleanup(hashTmp);
return -1;
return TSDB_CODE_SML_INVALID_DATA;
}
}
taosHashCleanup(hashTmp);

View File

@ -256,7 +256,8 @@ int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset) {
}
if (unlikely(index >= OTD_JSON_FIELDS_NUM)) {
uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start) return -1;
uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start);
return TSDB_CODE_TSC_INVALID_JSON;
}
char *sTmp = *start;
@ -367,7 +368,8 @@ int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset) {
if (unlikely(index != OTD_JSON_FIELDS_NUM) || element->tags == NULL || element->cols == NULL ||
element->measure == NULL || element->timestamp == NULL) {
uError("elements != %d or element parse null", OTD_JSON_FIELDS_NUM) return -1;
uError("elements != %d or element parse null", OTD_JSON_FIELDS_NUM);
return TSDB_CODE_TSC_INVALID_JSON;
}
return 0;
}
@ -381,7 +383,8 @@ int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset) {
}
if (unlikely(index >= OTD_JSON_FIELDS_NUM)) {
uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start) return -1;
uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start);
return TSDB_CODE_TSC_INVALID_JSON;
}
if ((*start)[1] == 'm') {
@ -448,7 +451,8 @@ int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset) {
}
if (unlikely(index != 0 && index != OTD_JSON_FIELDS_NUM)) {
uError("elements != %d", OTD_JSON_FIELDS_NUM) return -1;
uError("elements != %d", OTD_JSON_FIELDS_NUM);
return TSDB_CODE_TSC_INVALID_JSON;
}
return 0;
}
@ -477,7 +481,7 @@ static int32_t smlGetJsonElements(cJSON *root, cJSON ***marks) {
}
if (*marks[i] == NULL) {
uError("smlGetJsonElements error, not find mark:%d:%s", i, jsonName[i]);
return -1;
return TSDB_CODE_TSC_INVALID_JSON;
}
}
return TSDB_CODE_SUCCESS;
@ -816,25 +820,25 @@ static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPr
int32_t size = cJSON_GetArraySize(root);
if (unlikely(size != OTD_JSON_SUB_FIELDS_NUM)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *value = cJSON_GetObjectItem(root, "value");
if (unlikely(!cJSON_IsNumber(value))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *type = cJSON_GetObjectItem(root, "type");
if (unlikely(!cJSON_IsString(type))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
return TSDB_CODE_TSC_INVALID_JSON;
}
double timeDouble = value->valuedouble;
if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return -1;
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
if (timeDouble == 0) {
@ -849,32 +853,29 @@ static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPr
size_t typeLen = strlen(type->valuestring);
if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
// seconds
int8_t fromPrecision = TSDB_TIME_PRECISION_SECONDS;
// int8_t fromPrecision = TSDB_TIME_PRECISION_SECONDS;
if (smlFactorS[toPrecision] < INT64_MAX / tsInt64) {
return tsInt64 * smlFactorS[toPrecision];
}
return -1;
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
} else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
switch (type->valuestring[0]) {
case 'm':
case 'M':
// milliseconds
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MILLI, toPrecision);
break;
case 'u':
case 'U':
// microseconds
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MICRO, toPrecision);
break;
case 'n':
case 'N':
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_NANO, toPrecision);
break;
default:
return -1;
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
} else {
return -1;
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
}
@ -895,7 +896,7 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) {
double timeDouble = timestamp->valuedouble;
if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return -1;
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
if (unlikely(timeDouble < 0)) {
@ -911,14 +912,14 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) {
if (unlikely(fromPrecision == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf,
"timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", NULL);
return -1;
return TSDB_CODE_SML_INVALID_DATA;
}
int64_t tsInt64 = timeDouble;
if (fromPrecision == TSDB_TIME_PRECISION_SECONDS) {
if (smlFactorS[toPrecision] < INT64_MAX / tsInt64) {
return tsInt64 * smlFactorS[toPrecision];
}
return -1;
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
} else {
return convertTimePrecision(timeDouble, fromPrecision, toPrecision);
}
@ -926,7 +927,7 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) {
return smlParseTSFromJSONObj(info, timestamp, toPrecision);
} else {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
return TSDB_CODE_TSC_INVALID_JSON;
}
}

View File

@ -70,7 +70,7 @@ static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t le
int64_t ts = smlGetTimeValue(data, len, fromPrecision, toPrecision);
if (unlikely(ts == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
return -1;
return TSDB_CODE_SML_INVALID_DATA;
}
return ts;
}

View File

@ -26,8 +26,7 @@
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
#define OFFSET_IS_RESET_OFFSET(_of) ((_of) < 0)
#define DEFAULT_HEARTBEAT_INTERVAL 3000
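// interval of the consumer's background heartbeat timer, in milliseconds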
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);
@ -63,8 +62,7 @@ struct tmq_conf_t {
int8_t resetOffset;
int8_t withTbName;
int8_t snapEnable;
int32_t snapBatchSize;
bool hbBgEnable;
// int32_t snapBatchSize;
uint16_t port;
int32_t autoCommitInterval;
char* ip;
@ -84,7 +82,6 @@ struct tmq_t {
int32_t autoCommitInterval;
int8_t resetOffsetCfg;
uint64_t consumerId;
bool hbBgEnable;
tmq_commit_cb* commitCb;
void* commitCbUserParam;
@ -276,8 +273,7 @@ tmq_conf_t* tmq_conf_new() {
conf->withTbName = false;
conf->autoCommit = true;
conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
conf->hbBgEnable = true;
conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
return conf;
}
@ -367,10 +363,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
conf->snapBatchSize = taosStr2int64(value);
return TMQ_CONF_OK;
}
// if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
// conf->snapBatchSize = taosStr2int64(value);
// return TMQ_CONF_OK;
// }
// if (strcasecmp(key, "enable.heartbeat.background") == 0) {
// if (strcasecmp(value, "true") == 0) {
@ -847,7 +843,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
OVER:
tDeatroySMqHbReq(&req);
taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer);
taosReleaseRef(tmqMgmt.rsetId, refId);
}
@ -1106,8 +1102,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->resetOffsetCfg = conf->resetOffset;
taosInitRWLatch(&pTmq->lock);
pTmq->hbBgEnable = conf->hbBgEnable;
// assign consumerId
pTmq->consumerId = tGenIdPI64();
@ -1131,19 +1125,16 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
goto _failed;
}
if (pTmq->hbBgEnable) {
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
}
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, pRefId, tmqMgmt.timer);
char buf[TSDB_OFFSET_LEN] = {0};
STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
tFormatOffset(buf, tListLen(buf), &offset);
tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d",
pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
buf, pTmq->hbBgEnable);
", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s",
pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, buf);
return pTmq;

View File

@ -108,7 +108,6 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
if (pRequest->useSnapshot) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
consumerId, pHandle->subKey, vgId);
if (pHandle->fetchMeta) {
tqOffsetResetToMeta(pOffsetVal, 0);
} else {

View File

@ -34,7 +34,6 @@ typedef struct {
STFileSet *fset;
bool toData;
int32_t level;
SSttLvl *lvl;
TABLEID tbid[1];
} ctx[1];
@ -107,29 +106,63 @@ static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
merger->ctx->toData = true;
merger->ctx->level = 0;
// TODO: optimize merge strategy
for (int32_t i = 0;; ++i) {
if (i >= TARRAY2_SIZE(merger->ctx->fset->lvlArr)) {
merger->ctx->lvl = NULL;
break;
// find the highest level that can be merged to
for (int32_t i = 0, numCarry = 0;;) {
int32_t numFile = numCarry;
if (i < TARRAY2_SIZE(merger->ctx->fset->lvlArr) &&
merger->ctx->level == TARRAY2_GET(merger->ctx->fset->lvlArr, i)->level) {
numFile += TARRAY2_SIZE(TARRAY2_GET(merger->ctx->fset->lvlArr, i)->fobjArr);
i++;
}
merger->ctx->lvl = TARRAY2_GET(merger->ctx->fset->lvlArr, i);
if (merger->ctx->lvl->level != merger->ctx->level ||
TARRAY2_SIZE(merger->ctx->lvl->fobjArr) + 1 < merger->sttTrigger) {
numCarry = numFile / merger->sttTrigger;
if (numCarry == 0) {
break;
} else {
merger->ctx->level++;
}
}
ASSERT(merger->ctx->level > 0);
SSttLvl *lvl;
TARRAY2_FOREACH_REVERSE(merger->ctx->fset->lvlArr, lvl) {
if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
continue;
}
if (lvl->level <= merger->ctx->level) {
merger->ctx->toData = false;
merger->ctx->lvl = NULL;
}
break;
}
// get number of level-0 files to merge
int32_t numFile = pow(merger->sttTrigger, merger->ctx->level);
TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) {
if (lvl->level == 0) continue;
if (lvl->level >= merger->ctx->level) break;
numFile = numFile - TARRAY2_SIZE(lvl->fobjArr) * pow(merger->sttTrigger, lvl->level);
}
ASSERT(numFile >= 0);
// get file system operations
TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) {
if (lvl->level >= merger->ctx->level) {
break;
}
merger->ctx->level++;
int32_t numMergeFile;
if (lvl->level == 0) {
numMergeFile = numFile;
} else {
numMergeFile = TARRAY2_SIZE(lvl->fobjArr);
}
STFileObj *fobj;
int32_t numFile = 0;
TARRAY2_FOREACH(merger->ctx->lvl->fobjArr, fobj) {
if (numFile == merger->sttTrigger) {
break;
}
for (int32_t i = 0; i < numMergeFile; ++i) {
STFileObj *fobj = TARRAY2_GET(lvl->fobjArr, i);
STFileOp op = {
.optype = TSDB_FOP_REMOVE,
@ -151,8 +184,6 @@ static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
code = TARRAY2_APPEND(merger->sttReaderArr, reader);
TSDB_CHECK_CODE(code, lino, _exit);
numFile++;
}
}
@ -434,7 +465,9 @@ int32_t tsdbMerge(void *arg) {
.sttTrigger = tsdb->pVnode->config.sttTrigger,
}};
ASSERT(merger->sttTrigger > 1);
if (merger->sttTrigger <= 1) {
return 0;
}
code = tsdbFSCreateCopySnapshot(tsdb->pFS, &merger->fsetArr);
TSDB_CHECK_CODE(code, lino, _exit);

View File

@ -516,10 +516,13 @@ int32_t tGetDelData(uint8_t *p, void *ph) {
}
int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision) {
int64_t fid;
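// compute the file-group id in 64-bit first; the branches below clamp it to the int32 range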
if (key < 0) {
return (int)((key + 1) / tsTickPerMin[precision] / minutes - 1);
fid = ((key + 1) / tsTickPerMin[precision] / minutes - 1);
return (fid < INT32_MIN) ? INT32_MIN : (int32_t)fid;
} else {
return (int)((key / tsTickPerMin[precision] / minutes));
fid = ((key / tsTickPerMin[precision] / minutes));
return (fid > INT32_MAX) ? INT32_MAX : (int32_t)fid;
}
}

View File

@ -1034,7 +1034,7 @@ int32_t doStreamIntervalEncodeOpState(void** buf, int32_t len, SOperatorInfo* pO
while ((pIte = taosHashIterate(pInfo->pPullDataMap, pIte)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
tlen += encodeSWinKey(buf, key);
SArray* pArray = (SArray*)pIte;
SArray* pArray = *(SArray**)pIte;
int32_t chSize = taosArrayGetSize(pArray);
tlen += taosEncodeFixedI32(buf, chSize);
for (int32_t i = 0; i < chSize; i++) {

View File

@ -103,10 +103,12 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
pInfo->minTS = -1;
pInfo->interval = adjustInterval(interval, precision);
pInfo->watermark = adjustWatermark(pInfo->interval, interval, watermark);
pInfo->numSBFs = 0;
uint64_t bfSize = 0;
if (!igUp) {
bfSize = (uint64_t)(pInfo->watermark / pInfo->interval);
pInfo->numSBFs = bfSize;
pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void *));
if (pInfo->pTsSBFs == NULL) {
@ -130,7 +132,6 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
}
pInfo->numSBFs = bfSize;
pInfo->maxDataVersion = 0;
return pInfo;
}

View File

@ -197,6 +197,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
SyncIndex index = toIndex;
SSyncRaftEntry* pEntry = NULL;
bool takeDummy = false;
int emptySize = (TSDB_SYNC_LOG_BUFFER_SIZE >> 1);
while (true) {
if (index <= pBuf->commitIndex) {
@ -210,7 +211,6 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
}
bool taken = false;
int emptySize = 5;
if (toIndex - index + 1 <= pBuf->size - emptySize) {
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
pBuf->entries[index % pBuf->size] = tmp;

tests/perf-test/build.sh Executable file (22 lines added)
View File

@ -0,0 +1,22 @@
#! /bin/bash
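# Usage: build.sh <source-dir> <branch>: rebuilds TDengine from <branch> in <source-dir>, installs it, and starts taosd.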
set -x
cd $1
git reset --hard HEAD
git checkout -- .
git checkout $2
git pull
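# Flip the last OFF in cmake.options to ON (sed slurps the whole file into one pattern space).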
sed -i ':a;N;$!ba;s/\(.*\)OFF/\1ON/' $1/cmake/cmake.options
mkdir -p $1/debug
rm -rf $1/debug/*
cd $1/debug
cmake .. -DBUILD_TOOLS=true
cd $1/debug
make -j 4
cd $1/debug
make install
systemctl start taosd

View File

@ -0,0 +1,32 @@
import os
import subprocess
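# Builds TDengine from a local source checkout by running build.sh; also provides a helper for capturing shell command output.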
class BuildTDengine:
    def __init__(self, host='vm96', path='/root/pxiao/TDengine', branch='main') -> None:
        self.host = host
        self.path = path
        self.branch = branch

    def build(self):
        parameters = [self.path, self.branch]
        build_file = "./build.sh"
        try:
            # Run the Bash script using subprocess
            subprocess.run(['bash', build_file] + parameters, check=True)
            print("TDengine built successfully.")
        except subprocess.CalledProcessError as e:
            print(f"Error running Bash script: {e}")
        except FileNotFoundError as e:
            print(f"File not found: {e}")

    def get_cmd_output(self, cmd):
        try:
            # Run the Bash command and capture the output
            result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True, text=True)
            # Access the output from the 'result' object
            output = result.stdout
            return output.strip()
        except subprocess.CalledProcessError as e:
            print(f"Error running Bash command: {e}")

View File

@ -0,0 +1,100 @@
import datetime
import json
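# Generates the taosBenchmark insert-configuration JSON for a test scenario (table count, rows per table, interlace rows, stt_trigger).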
class InsertJson:
    def __init__(self, tables=10000, records_per_table=10000, interlace_rows=0, stt_trigger=1) -> None:
        self.tables = tables
        self.records_per_table = records_per_table
        self.interlace_rows = interlace_rows
        self.stt_trigger = stt_trigger

    def get_db_cfg(self) -> dict:
        return {
            "name": "test",
            "drop": "true",
            "replica": 1,
            "precision": "ms",
            "cachemodel": "'both'",
            "keep": 3650,
            "minRows": 100,
            "maxRows": 4096,
            "comp": 2,
            "vgroups": 10,
            "stt_trigger": self.stt_trigger
        }

    def get_stb_cfg(self) -> list:
        return [
            {
                "name": "meters",
                "child_table_exists": "no",
                "childtable_count": self.tables,
                "childtable_prefix": "d",
                "escape_character": "yes",
                "auto_create_table": "no",
                "batch_create_tbl_num": 5,
                "data_source": "rand",
                "insert_mode": "taosc",
                "non_stop_mode": "no",
                "line_protocol": "line",
                "insert_rows": self.records_per_table,
                "childtable_limit": 10000,
                "childtable_offset": 100,
                "interlace_rows": self.interlace_rows,
                "insert_interval": 0,
                "partial_col_num": 0,
                "disorder_ratio": 0,
                "disorder_range": 1000,
                "timestamp_step": 10,
                "start_timestamp": "2022-10-01 00:00:00.000",
                "sample_format": "csv",
                "sample_file": "./sample.csv",
                "use_sample_ts": "no",
                "tags_file": "",
                "columns": self.get_column_list(),
                "tags": self.get_tag_list()
            }
        ]

    def get_column_list(self) -> list:
        return [
            {"type": "FLOAT", "name": "current", "count": 1, "max": 12, "min": 8},
            {"type": "INT", "name": "voltage", "max": 225, "min": 215},
            {"type": "FLOAT", "name": "phase", "max": 1, "min": 0},
        ]

    def get_tag_list(self) -> list:
        return [
            {"type": "TINYINT", "name": "groupid", "max": 10, "min": 1},
            {"name": "location", "type": "BINARY", "len": 16, "values": ["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View", "Sunnyvale", "Santa Clara", "Cupertino"]}
        ]

    def get_insert_cfg(self) -> dict:
        return {
            "filetype": "insert",
            "cfgdir": "/etc/taos",
            "host": "127.0.0.1",
            "port": 6030,
            "user": "root",
            "password": "taosdata",
            "thread_count": 10,
            "create_table_thread_count": 7,
            "result_file": "/tmp/insert_res.txt",
            "confirm_parameter_prompt": "no",
            "insert_interval": 0,
            "num_of_records_per_req": 1000,
            "max_sql_len": 1024000,
            "databases": [{
                "dbinfo": self.get_db_cfg(),
                "super_tables": self.get_stb_cfg()
            }]
        }

    def create_insert_file(self) -> str:
        date = datetime.datetime.now()
        file_create_table = f"/tmp/insert_{date:%F-%H%M}.json"
        with open(file_create_table, 'w') as f:
            json.dump(self.get_insert_cfg(), f)
        return file_create_table

View File

@ -0,0 +1,60 @@
import mysql.connector
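# Thin wrapper around mysql-connector-python used to store and read performance-test results.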
class MySQLDatabase:
    def __init__(self, host='192.168.1.116', port=3306, user='root', password='taosdata', database='perf_data'):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.connection = None

    def connect(self):
        try:
            self.connection = mysql.connector.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database
            )
        except mysql.connector.Error as error:
            print("Failed to connect to database: {}".format(error))

    def execute(self, query, params=None):
        cursor = self.connection.cursor()
        try:
            cursor.execute(query, params)
            self.connection.commit()
        except mysql.connector.Error as error:
            print("Failed to execute query: {}".format(error))
        finally:
            cursor.close()

    def query(self, query, params=None):
        cursor = self.connection.cursor()
        try:
            cursor.execute(query, params)
            result = cursor.fetchall()
            return result
        except mysql.connector.Error as error:
            print("Failed to execute query: {}".format(error))
        finally:
            cursor.close()

    def get_id(self, query, params=None):
        cursor = self.connection.cursor()
        try:
            cursor.execute(query, params)
            cursor.execute("select last_insert_id()")
            id = cursor.fetchone()[0]
            self.connection.commit()
            return id
        except mysql.connector.Error as error:
            print("Failed to execute query: {}".format(error))
        finally:
            cursor.close()

    def disconnect(self):
        self.connection.close()

View File

@ -0,0 +1,41 @@
import datetime
import json
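# Generates the taosBenchmark query-configuration JSON for a single SQL statement.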
class QueryJson:
    def __init__(self, sql, query_times=1) -> None:
        self.sql = sql
        self.query_times = query_times

    def gen_query_json(self) -> dict:
        return {
            "filetype": "query",
            "cfgdir": "/etc/taos",
            "host": "127.0.0.1",
            "port": 6030,
            "user": "root",
            "password": "taosdata",
            "confirm_parameter_prompt": "no",
            "databases": "test",
            "query_times": self.query_times,
            "query_mode": "taosc",
            "specified_table_query": {
                "query_interval": 1,
                "concurrent": 1,
                "sqls": [
                    {
                        "sql": "%s" % self.sql,
                        "result": "./query_res.txt"
                    }
                ]
            }
        }

    def create_query_file(self) -> str:
        date = datetime.datetime.now()
        file_create_table = f"/tmp/query_{date:%F-%H%M}.json"
        with open(file_create_table, 'w') as f:
            json.dump(self.gen_query_json(), f)
        return file_create_table

View File

@ -0,0 +1,75 @@
import os
import socket
import mysqldb
import insert_json
import query_json
import buildTD
if __name__ == "__main__":
    # Build TDengine
    hostname = socket.gethostname()
    new_build = buildTD.BuildTDengine(host=hostname)
    new_build.build()

    cmd = f"cd {new_build.path} && git rev-parse --short @ "
    commit_id = new_build.get_cmd_output(cmd)
    branch = new_build.branch

    num_of_tables = 10000
    records_per_table = 10000
    interlace_rows = 0
    stt_trigger = 1

    # get scenario id
    db = mysqldb.MySQLDatabase()
    db.connect()
    sql = f"select id from scenarios where num_of_tables = {num_of_tables} and records_per_table = {records_per_table} and interlace_rows = {interlace_rows} and stt_trigger = {stt_trigger}"
    row = db.query(sql)
    if not row:  # db.query() returns an empty list when no scenario matches
        id = db.get_id(f"insert into scenarios(num_of_tables, records_per_table, interlace_rows, stt_trigger) values({num_of_tables},{records_per_table}, {interlace_rows}, {stt_trigger})")
    else:
        id = row[0][0]

    print(f"scenario id is {id}")

    # record insert performance data
    insert = insert_json.InsertJson(num_of_tables, records_per_table, interlace_rows, stt_trigger)
    os.system(f"taosBenchmark -f {insert.create_insert_file()}")

    # parse the taosBenchmark summary line: column 5 is the total time spent, column 16 the insert rate
    cmd = "grep Spent /tmp/insert_res.txt | tail -1 | awk {'print $5'}"
    time = new_build.get_cmd_output(cmd)
    cmd = "grep Spent /tmp/insert_res.txt | tail -1 | awk {'print $16'}"
    speed = new_build.get_cmd_output(cmd)

    sql = f"insert into insert_perf(sid, time_cost, records_per_sec, branch, commit_id, date) values({id}, {time}, {speed}, '{branch}', '{commit_id}', now())"
    print(sql)
    db.execute(sql)

    # record query performance data
    sql = "select * from queries"
    res = db.query(sql)
    for row in res:
        json = query_json.QueryJson(row[1], query_times=1)
        print(f"query: {row[1]}")
        os.system(f"taosBenchmark -f {json.create_query_file()} > /tmp/{row[0]}.txt")
        cmd = "grep delay /tmp/%d.txt | awk {'print $11'} | cut -d 's' -f 1" % row[0]
        print(f"cmd is {cmd}")
        avg = new_build.get_cmd_output(cmd)
        print(f"avg is {avg}")
        if avg == "":
            break
        sql = f"insert into query_perf(sid, qid, time_cost, branch, commit_id, date) values({id}, {row[0]}, {avg}, '{branch}', '{commit_id}', now())"
        print(sql)
        db.execute(sql)

    # close connection
    db.disconnect()

View File

@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ,
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ,
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ,
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ,
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ,
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ,
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ,
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ,
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ,
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ,
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
#$keyList = $keyList . ,
#$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . ,
$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList

View File

@ -30,7 +30,7 @@ class TDTestCase:
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
self.deletedDataSql= '''drop database if exists deldata;create database deldata duration 300;use deldata;
self.deletedDataSql = '''drop database if exists deldata;create database deldata duration 300 stt_trigger 4;use deldata;
create table deldata.stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int);
create table deldata.ct1 using deldata.stb1 tags ( 1 );
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
@ -38,7 +38,9 @@ class TDTestCase:
delete from deldata.stb1;
flush database deldata;
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
delete from deldata.ct1;'''
delete from deldata.ct1;
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a );
flush database deldata;'''
def checkProcessPid(self,processName):
i=0
while i<60:
@ -262,7 +264,7 @@ class TDTestCase:
if self.is_list_same_as_ordered_list(resultList,expectList):
print("The unordered list is the same as the ordered list.")
else:
tdlog.error("The unordered list is not the same as the ordered list.")
tdLog.exit("The unordered list is not the same as the ordered list.")
tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);")
tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);")

View File

@ -19,7 +19,7 @@ class TDTestCase:
self.wal_retention_period1 = 3600
self.wal_retention_period2 = 1
self.commit_value_list = ["true", "false"]
self.offset_value_list = ["", "earliest", "latest", "none"]
self.offset_value_list = ["earliest", "latest", "none"]
self.tbname_value_list = ["true", "false"]
self.snapshot_value_list = ["false"]
@ -92,7 +92,7 @@ class TDTestCase:
}
consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0
consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0
consumer_ret = "earliest" if offset_value == "" else offset_value
consumer_ret = "latest" if offset_value == "" else offset_value
expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]}ms,reset:{consumer_ret}'
if len(offset_value) == 0:
del consumer_dict["auto.offset.reset"]

View File

@ -18,7 +18,7 @@ python3 .\test.py -f 1-insert\influxdb_line_taosc_insert.py
@REM #python3 .\test.py -f 1-insert\test_stmt_muti_insert_query.py
@REM python3 .\test.py -f 1-insert\alter_stable.py
@REM python3 .\test.py -f 1-insert\alter_table.py
@REM python3 .\test.py -f 2-query\between.py
python3 .\test.py -f 2-query\between.py
@REM python3 .\test.py -f 2-query\distinct.py
@REM python3 .\test.py -f 2-query\varchar.py
@REM python3 .\test.py -f 2-query\ltrim.py
@ -101,3 +101,4 @@ python3 .\test.py -f 7-tmq\subscribeStb.py
@REM python3 .\test.py -f 7-tmq\subscribeStb3.py
@REM python3 .\test.py -f 7-tmq\subscribeStb4.py
@REM python3 .\test.py -f 7-tmq\db.py
python3 .\test.py -f 6-cluster\5dnode3mnodeSep1VnodeStopDnodeModifyMeta.py -N 6 -M 3

View File

@ -547,6 +547,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (g_conf.snapShot) {
tmq_conf_set(conf, "experimental.snapshot.enable", "true");