Merge branch '3.0' into fix/3_liaohj

This commit is contained in:
Haojun Liao 2024-06-10 01:34:55 +08:00 committed by GitHub
commit 20bade24c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
87 changed files with 1234 additions and 347 deletions

View File

@ -11,6 +11,10 @@ ELSE ()
SET(TD_VER_COMPATIBLE "3.0.0.0")
ENDIF ()
IF (TD_PRODUCT_NAME)
ADD_DEFINITIONS(-DTD_PRODUCT_NAME="${TD_PRODUCT_NAME}")
ENDIF ()
find_program(HAVE_GIT NAMES git)
IF (DEFINED GITINFO)

View File

@ -451,6 +451,101 @@ In addition to writing data using the SQL method or the parameter binding API, w
- Within _reqid interfaces can track the entire call chain by passing the reqid parameter.
### Subscription API
- `const char *tmq_err2str(int32_t code)`
**Description**
- This interface is used to convert error codes for data subscriptions into error messages
**Parameter description**
- code: error code
**Return value**
- non NULL, return error message, error message may be empty
- `tmq_conf_t *tmq_conf_new()`
- `tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value)`
- `void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param)`
- `void tmq_conf_destroy(tmq_conf_t *conf)`
tmq_conf_res_t defined as follows:
```
typedef enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
} tmq_conf_res_t;
```
commit callback function defined as follows:
```
typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param))
```
**Description**
- tmq_conf_new : create a tmq_conf_t structure to configure consumption parameters
- tmq_conf_set : set configuration, configuration is key-value pair
- tmq_conf_set_auto_commit_cb : set auto commit callback function
- tmq_conf_destroy : destroy tmq_conf_t structure
**Parameter description**
- tmq_conf_set : key is parameter namevalue is parameter value
- tmq_conf_set_auto_commit_cb : cb is callback function, param is callback function parameter
**Return value**
- tmq_conf_new: structure of tmq_conf_t, NULL failed
- tmq_conf_set: tmq_conf_res_t, TMQ_CONF_OK means success, others means failure
- `tmq_list_t *tmq_list_new()`
- `int32_t tmq_list_append(tmq_list_t *, const char *)`
- `void tmq_list_destroy(tmq_list_t *)`
- `int32_t tmq_list_get_size(const tmq_list_t *)`
- `char **tmq_list_to_c_array(const tmq_list_t *)`
**Description**
- tmq_list_new : build a tmq_list_t constructure, used to save topic
- tmq_list_append : add topic to tmq_list_t
- tmq_list_destroy : destroy tmq_list_t
- tmq_list_get_size : get size of tmq_list_t
- tmq_list_to_c_array : convert tmq_list_t to c array, element is string pointer
**Return value**
- tmq_list_new : structure of tmq_list_t, tmq_list_t is a list of strings, NULL failed
- tmq_list_append : zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_list_get_size : size of tmq_list_t, -1 failed
- tmq_list_to_c_array : c array, element is pointer of string, NULL failed
- `tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen)`
- `int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list)`
- `int32_t tmq_unsubscribe(tmq_t *tmq)`
- `int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topic_list)`
- `TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout)`
- `int32_t tmq_consumer_close(tmq_t *tmq)`
**Description**
- tmq_consumer_new : build a tmq_t constructure, need to be used with tmq_consumer_close
- tmq_subscribe : subscribe topic, need to be used with tmq_unsubscribe
- tmq_unsubscribe : unsubscribe topic, need to be used with tmq_subscribe
- tmq_subscription : obtain a list of topics subscribed by consumer
- tmq_consumer_poll : used to consume data
- tmq_consumer_close : clost tmq_t, need to be used with tmq_consumer_new
**Parameter description**
- conf: sed to configure consume parameters
- errstr: The error information is stored in this string. Allocation and release of memory are the responsibility of the caller
- errstenLen: the length of errstr
- tmq: structure of tmq_t returned by tmq_consumer_new
- topic_list: a list of topics subscribed by consumersneed to be freed by tmq_list_destroy
- timeout: the timeout time, measured in milliseconds, indicates how long it takes for data to expire. If it is negative, it will default to 1 second
**Return value**
- tmq_consumer_new: structure of tmq_t, NULL failed
- tmq_subscribe: zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_unsubscribe: zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_subscription: zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_consumer_poll: structure of TAOS_RES(same like taos_query), NULL if there is no data
- tmq_consumer_close: zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment)`
- `void tmq_free_assignment(tmq_topic_assignment* pAssignment)`
@ -474,6 +569,7 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value**
- zero success,none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**Function description**
@ -482,6 +578,7 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value**
- the value of committed offset, -2147467247 means no committed value, Other values less than 0 indicate failure
- `int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg)`
- `void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param)`
- `int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
@ -499,6 +596,7 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value**
- zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**Function description**
@ -507,6 +605,7 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value**
- the current consumption location, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
**Function description**
@ -515,24 +614,51 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value**
- zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int32_t int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
**Function description**
- Obtain the starting offset of the consumed data
- `int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
- `int32_t tmq_get_vgroup_id(TAOS_RES *res)`
**Description**
- tmq_get_vgroup_offset : Obtain the starting offset of the consumed data
- tmq_get_vgroup_id : Obtain the vgroup id of the consumed data
**Parameter description**
- msgMessage consumed
- msg : Message consumed
**Return value**
- the starting offset of the consumed data, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_get_vgroup_offset : the starting offset of the consumed data, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_get_vgroup_id : vgroup id of result, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int32_t int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics)`
**Function description**
- Obtain a list of topics subscribed by consumers
- `TAOS *tmq_get_connect(tmq_t *tmq)`
- `const char *tmq_get_table_name(TAOS_RES *res)`
- `tmq_res_t tmq_get_res_type(TAOS_RES *res)`
- `const char *tmq_get_topic_name(TAOS_RES *res)`
- `const char *tmq_get_db_name(TAOS_RES *res)`
tmq_res_t the type of consumed result, defined as follows:
```
typedef enum tmq_res_t {
TMQ_RES_INVALID = -1, // invalid
TMQ_RES_DATA = 1, // data
TMQ_RES_TABLE_META = 2, // meta
TMQ_RES_METADATA = 3 // data & meta
} tmq_res_t;
```
**Description**
- tmq_get_connect : when creating a consumer, a link will be automatically established and saved in the tmq_t structure. This interface allows users to obtain link information(same like taos_connect) from the tmq_t structure
- tmq_get_table_name : get the table name of result
- tmq_get_res_type : get the type of result
- tmq_get_topic_name : get the topic name of result
- tmq_get_db_name : get the db name of result
**Parameter description**
- topics: a list of topics subscribed by consumersneed to be freed by tmq_list_destroy
- tmq : tmq_t structure created by tmq_consumer_new
- res : TAOS_RES structure returned by tmq_consumer_poll
**Return value**
- zero successnone zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_get_connect : connection info in tmq, NULL if failed
- tmq_get_table_name : table name of result, NULL if failed
- tmq_get_res_type : result type tmq_res_t
- tmq_get_topic_name : topic name of result, NULL if failed
- tmq_get_db_name : db name of result, NULL if failed

View File

@ -80,7 +80,7 @@ If a stream is created with PARTITION BY clause and SUBTABLE clause, the name of
CREATE STREAM avg_vol_s INTO avg_vol SUBTABLE(CONCAT('new-', tname)) AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
```
IN PARTITION clause, 'tbname', representing each subtable name of source supertable, is given alias 'tname'. And 'tname' is used in SUBTABLE clause. In SUBTABLE clause, each auto created subtable will concat 'new-' and source subtable name as their name(Starting from 3.2.3.0, in order to avoid the expression in subtable being unable to distinguish between different subtables, add '_groupId' to the end of subtable name).
IN PARTITION clause, 'tbname', representing each subtable name of source supertable, is given alias 'tname'. And 'tname' is used in SUBTABLE clause. In SUBTABLE clause, each auto created subtable will concat 'new-' and source subtable name as their name(Starting from 3.2.3.0, in order to avoid the expression in subtable being unable to distinguish between different subtables, add '_stableName_groupId' to the end of subtable name).
If the output length exceeds the limitation of TDengine(192), the name will be truncated. If the generated name is occupied by some other table, the creation and writing of the new subtable will be failed.

View File

@ -1,5 +1,5 @@
if (! "RJDBC" %in% installed.packages()[, "Package"]) {
install.packages('RJDBC', repos='http://cran.us.r-project.org')
install.packages('RJDBC', repos='http://mirrors.tuna.tsinghua.edu.cn/CRAN')
}
# ANCHOR: demo

View File

@ -528,6 +528,99 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- 带_reqid的接口可以通过传递reqid参数来追踪整个的调用链。
### 数据订阅 API
- `const char *tmq_err2str(int32_t code)`
**功能说明**
- 该接口用于将数据订阅的错误码转换为错误信息
**参数说明**
- code: 数据订阅的错误码
**返回值**
- 非NULL返回错误信息错误信息可能为空字符串
- `tmq_conf_t *tmq_conf_new()`
- `tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value)`
- `void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param)`
- `void tmq_conf_destroy(tmq_conf_t *conf)`
tmq_conf_res_t 错误码定义如下:
```
typedef enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
} tmq_conf_res_t;
```
设置自动提交回调函数的定义如下:
```
typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param))
```
**功能说明**
- tmq_conf_new 接口用于创建一个 tmq_conf_t 结构体,用于配置消费参数。
- tmq_conf_set 接口用于设置消费参数。
- tmq_conf_set_auto_commit_cb 接口用于设置自动提交回调函数。
- tmq_conf_destroy 接口用于销毁 tmq_conf_t 结构体。
**参数说明**
- tmq_conf_set : key 为参数名value 为参数值
- tmq_conf_set_auto_commit_cb : cb 回调函数, param 回调函数参数
**返回值**
- tmq_conf_new: 配置 tmq_conf_t 类型指针, NULL 失败
- tmq_conf_set: 结果 tmq_conf_res_t 类型, TMQ_CONF_OK 成功, 其他失败
- `tmq_list_t *tmq_list_new()`
- `int32_t tmq_list_append(tmq_list_t *, const char *)`
- `void tmq_list_destroy(tmq_list_t *)`
- `int32_t tmq_list_get_size(const tmq_list_t *)`
- `char **tmq_list_to_c_array(const tmq_list_t *)`
**功能说明**
- tmq_list_new 接口用于创建一个 tmq_list_t 结构体,用于存储订阅的 topic。
- tmq_list_append 接口用于向 tmq_list_t 结构体中添加一个 topic。
- tmq_list_destroy 接口用于销毁 tmq_list_t 结构体tmq_list_new 的结果需要通过该接口销毁。
- tmq_list_get_size 接口用于获取 tmq_list_t 结构体中 topic 的个数。
- tmq_list_to_c_array 接口用于将 tmq_list_t 结构体转换为 C 数组,数组每个元素为字符串指针。
**返回值**
- tmq_list_new : 返回 tmq_list_t 结果指针, tmq_list_t 是一个数组,每个元素是一个字符串, NULL 失败
- tmq_list_append : 返回 0 表示成功非0表示失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- tmq_list_get_size : 返回 tmq_list_t 结构体中 topic 的个数, -1 失败
- tmq_list_to_c_array : 返回 c 数组, 每个元素是字符串指针, NULL 失败
- `tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen)`
- `int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list)`
- `int32_t tmq_unsubscribe(tmq_t *tmq)`
- `int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topic_list)`
- `TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout)`
- `int32_t tmq_consumer_close(tmq_t *tmq)`
**功能说明**
- tmq_consumer_new 接口用于创建一个 tmq_t 结构体,用于消费数据,消费完数据后需调用 tmq_consumer_close 关闭消费者。
- tmq_subscribe 接口用于订阅 topic 列表,消费完数据后,需调用 tmq_subscribe 取消订阅。
- tmq_unsubscribe 接口用于取消订阅的 topic 列表。需与 tmq_subscribe 配合使用。
- tmq_subscription 接口用于获取订阅的 topic 列表。
- tmq_consumer_poll 接口用于轮询消费数据,每一个消费者,只能单线程调用该接口。
- tmq_consumer_close 接口用于关闭 tmq_t 结构体。需与 tmq_consumer_new 配合使用。
**参数说明**
- conf: 参数用于配置消费参数
- errstr: 错误信息存储在这个字符串中,需自定分配内存,释放内存由调用者负责
- errstenLen: errstr 字符串的长度
- tmq: tmq_consumer_new 函数返回的 tmq_t 结构体
- topic_list: topic 列表
- timeout: 超时时间,单位为毫秒,表示多久没数据的话自动返回 NULL负数的话默认超时1秒
**返回值**
- tmq_consumer_new 返回 tmq_t 结构体,失败返回 NULL
- tmq_subscribe 返回错误码0 表示成功非0表示失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- tmq_unsubscribe 返回错误码0 表示成功非0表示失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- tmq_subscription 返回错误码0 表示成功非0表示失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- tmq_consumer_poll 返回 TAOS_RES 结构体NULL 表示没有数据,非 NULL 表示有数据TAOS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 TAOS_RES 里的信息,比如 schema 等。
- tmq_consumer_close 返回错误码0 表示成功非0表示失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- `int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment)`
- `void tmq_free_assignment(tmq_topic_assignment* pAssignment)`
@ -541,6 +634,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
int64_t end;
} tmq_topic_assignment;
```
**功能说明**
- tmq_get_topic_assignment 接口返回当前consumer分配的vgroup的信息每个vgroup的信息包括vgIdwal的最大最小offset以及当前消费到的offset。
@ -551,6 +645,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
**返回值**
- 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**功能说明**
@ -559,6 +654,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
**返回值**
- 当前commit的位置-2147467247表示没有消费进度其他小于0的值表示失败错误码就是返回值
- `int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg)`
- `void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param)`
- `int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
@ -576,6 +672,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
**返回值**
- 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**功能说明**
@ -584,7 +681,8 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
**返回值**
- 消费位置非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
- `int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
**功能说明**
- 设置 consumer 在某个topic的某个vgroup的 offset位置开始消费
@ -592,24 +690,52 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
**返回值**
- 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int32_t int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
- `int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
- `int32_t tmq_get_vgroup_id(TAOS_RES *res)`
**功能说明**
- 获取 poll 消费到的数据的起始offset
- tmq_get_vgroup_offset 获取 poll 消费到的数据的起始offset
- tmq_get_vgroup_id 获取 poll 消费到的数据的所属的vgrou id
**参数说明**
- msg消费到的消息结构
**返回值**
- 消费到的offset非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- tmq_get_vgroup_offset 返回值为消费到的offset非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- tmq_get_vgroup_id 返回值为消费到的数据所属的vgrou id非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int32_t int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics)`
- `TAOS *tmq_get_connect(tmq_t *tmq)`
- `const char *tmq_get_table_name(TAOS_RES *res)`
- `tmq_res_t tmq_get_res_type(TAOS_RES *res)`
- `const char *tmq_get_topic_name(TAOS_RES *res)`
- `const char *tmq_get_db_name(TAOS_RES *res)`
tmq_res_t 表示消费到的数据类型,定义如下:
```
typedef enum tmq_res_t {
TMQ_RES_INVALID = -1, // invalid
TMQ_RES_DATA = 1, // 数据
TMQ_RES_TABLE_META = 2, // 元数据
TMQ_RES_METADATA = 3 // 既有元数据又有数据,即自动建表
} tmq_res_t;
```
**功能说明**
- 获取消费者订阅的 topic 列表
- tmq_get_connect 创建consumer时会自动建立链接保存在 tmq_t 结构体中,该接口用户获取 tmq_t 结构体中的链接信息类似taos_connect
- tmq_get_table_name 获取返回结果所属的的表名
- tmq_get_res_type 获取返回结果的类型
- tmq_get_topic_name 获取返回结果所属的topic名
- tmq_get_db_name 获取返回结果所属的数据库名
**参数说明**
- topics: 获取的 topic 列表存储在这个结构中接口内分配内存需调用tmq_list_destroy释放
- tmqtmq_consumer_new 返回的消费者handle
- restmq_consumer_poll 返回的消费到的消息
**返回值**
- 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- tmq_get_connect 返回值为tmq_t结构体中的链接连接非 NULL 正常NULL 失败
- tmq_get_table_name 返回值为消费到的数据所属的表名,非 NULL 正常NULL 失败
- tmq_get_res_type 返回值为消费到的数据所属的类型,具体见上面 tmq_res_t 的注释说明
- tmq_get_topic_name 返回值为消费到的数据所属的 topic 名,非 NULL 正常NULL 失败
- tmq_get_db_name 返回值为消费到的数据所属的数据库名,非 NULL 正常NULL 失败

View File

@ -91,7 +91,7 @@ SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname COUNT_WIN
CREATE STREAM avg_vol_s INTO avg_vol SUBTABLE(CONCAT('new-', tname)) AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
```
PARTITION 子句中,为 tbname 定义了一个别名 tname, 在PARTITION 子句中的别名可以用于 SUBTABLE 子句中的表达式计算,在上述示例中,流新创建的子表将以前缀 'new-' 连接原表名作为表名(从3.2.3.0开始,为了避免 sutable 中的表达式无法区分各个子表,即误将多个相同时间线写入一个子表,在指定的子表名后面加上 _groupId)。
PARTITION 子句中,为 tbname 定义了一个别名 tname, 在PARTITION 子句中的别名可以用于 SUBTABLE 子句中的表达式计算,在上述示例中,流新创建的子表将以前缀 'new-' 连接原表名作为表名(从3.2.3.0开始,为了避免 SUBTABLE 中的表达式无法区分各个子表,即误将多个相同时间线写入一个子表,在指定的子表名后面加上 __stableName_groupId)。
注意,子表名的长度若超过 TDengine 的限制,将被截断。若要生成的子表名已经存在于另一超级表,由于 TDengine 的子表名是唯一的,因此对应新子表的创建以及数据的写入将会失败。

View File

@ -73,7 +73,7 @@ char *tGetMachineId();
#ifdef TD_ENTERPRISE
#define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "version", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
@ -85,7 +85,7 @@ char *tGetMachineId();
#else
#define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "version", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, \

View File

@ -224,13 +224,14 @@
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ENCRYPT_KEY, "create-encrypt-key", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_S3MIGRATE_DB, "s3migrate-db", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_S3MIGRATE_DB_TIMER, "s3migrate-db-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "mnd-unused2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TSMA, "create-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TSMA, "drop-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STB_DROP, "drop-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_TSMA, "get-table-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_TSMA, "get-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8
@ -275,7 +276,6 @@
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "vnode-compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "vnode-drop-ttl-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_TTL_EXPIRED_TBS, "vnode-fetch-ttl-expired-tbs", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TRIM, "vnode-trim", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "vnode-commit", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_INDEX, "vnode-create-index", NULL, NULL)
@ -286,6 +286,7 @@
TD_DEF_MSG_TYPE(TDMT_VND_S3MIGRATE, "vnode-s3migrate", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ARB_HEARTBEAT, "vnode-arb-hb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ARB_CHECK_SYNC, "vnode-arb-check-sync", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_TTL_EXPIRED_TBS, "vnode-fetch-ttl-expired-tbs", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_VND_MSG)
@ -318,11 +319,12 @@
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED, "stream-unused", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_CREATE, "stream-create", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG)
TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8
@ -369,9 +371,10 @@
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_UNUSED, "vnd-stream-unused", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_GET_STREAM_PROGRESS, "vnd-stream-progress", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_VND_STREAM_MSG)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) //8 << 8

View File

@ -104,6 +104,8 @@ typedef struct SNodeAllocator SNodeAllocator;
int32_t nodesInitAllocatorSet();
void nodesDestroyAllocatorSet();
int32_t nodesCreateAllocator(int64_t queryId, int32_t chunkSize, int64_t* pAllocatorId);
int32_t nodesSimAcquireAllocator(int64_t allocatorId);
int32_t nodesSimReleaseAllocator(int64_t allocatorId);
int32_t nodesAcquireAllocator(int64_t allocatorId);
int32_t nodesReleaseAllocator(int64_t allocatorId);
int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId);

View File

@ -45,6 +45,7 @@ extern "C" {
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
typedef enum {
TAOS_WAL_SKIP = 0,
TAOS_WAL_WRITE = 1,
TAOS_WAL_FSYNC = 2,
} EWalType;
@ -151,7 +152,8 @@ struct SWalReader {
TdFilePtr pIdxFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t skipToVersion; // skip data and jump to destination version, usually used by stream resume ignoring untreated data
int64_t skipToVersion; // skip data and jump to destination version, usually used by stream resume ignoring untreated
// data
int64_t capacity;
TdThreadMutex mutex;
SWalFilterCond cond;
@ -198,7 +200,7 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id);
void walCloseReader(SWalReader *pRead);
void walReadReset(SWalReader *pReader);
int32_t walReadVer(SWalReader *pRead, int64_t ver);
void decryptBody(SWalCfg* cfg, SWalCkHead* pHead, int32_t plainBodyLen, const char* func);
void decryptBody(SWalCfg *cfg, SWalCkHead *pHead, int32_t plainBodyLen, const char *func);
int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead);
int64_t walReaderGetCurrentVer(const SWalReader *pReader);
@ -206,7 +208,7 @@ int64_t walReaderGetValidFirstVer(const SWalReader *pReader);
int64_t walReaderGetSkipToVersion(SWalReader *pReader);
void walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver);
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever);
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset);
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset);
// only for tq usage
int32_t walFetchHead(SWalReader *pRead, int64_t ver);

View File

@ -114,6 +114,12 @@ int32_t atomic_fetch_xor_32(int32_t volatile *ptr, int32_t val);
int64_t atomic_fetch_xor_64(int64_t volatile *ptr, int64_t val);
void *atomic_fetch_xor_ptr(void *ptr, void *val);
#ifdef _MSC_VER
#define tmemory_barrier(order) MemoryBarrier()
#else
#define tmemory_barrier(order) __sync_synchronize()
#endif
#ifdef __cplusplus
}
#endif

View File

@ -78,6 +78,14 @@ extern const int32_t TYPE_BYTES[21];
#define TSDB_DEFAULT_PASS "taosdata"
#endif
#ifndef TD_PRODUCT_NAME
#ifdef TD_ENTERPRISE
#define TD_PRODUCT_NAME "TDengine Enterprise Edition"
#else
#define TD_PRODUCT_NAME "TDengine Community Edition"
#endif
#endif
#define TSDB_TRUE 1
#define TSDB_FALSE 0
#define TSDB_OK 0
@ -365,7 +373,7 @@ typedef enum ELogicConditionType {
#define TSDB_MIN_FSYNC_PERIOD 0
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
#define TSDB_MIN_WAL_LEVEL 1
#define TSDB_MIN_WAL_LEVEL 0
#define TSDB_MAX_WAL_LEVEL 2
#define TSDB_DEFAULT_WAL_LEVEL 1
#define TSDB_MIN_PRECISION TSDB_TIME_PRECISION_MILLI

View File

@ -89,6 +89,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
"current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
if (pRequest->pQuery && pRequest->pQuery->pRoot) {
if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
(0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) {
@ -109,6 +110,9 @@ static void deregisterRequest(SRequestObj *pRequest) {
}
}
nodesSimReleaseAllocator(pRequest->allocatorRefId);
}
if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
@ -491,7 +495,10 @@ void doDestroyRequest(void *p) {
}
taosMemoryFree(pRequest->body.interParam);
if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
qDestroyQuery(pRequest->pQuery);
nodesSimReleaseAllocator(pRequest->allocatorRefId);
}
nodesDestroyAllocator(pRequest->allocatorRefId);
taosMemoryFreeClear(pRequest->effectiveUser);

View File

@ -939,7 +939,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
pReq.pColumns = taosArrayInit(pReq.numOfColumns, sizeof(SFieldWithOptions));
for (int32_t i = 0; i < pReq.numOfColumns; ++i) {
SField *pField = taosArrayGet(pColumns, i);
SFieldWithOptions fieldWithOption;
SFieldWithOptions fieldWithOption = {0};
setFieldWithOptions(&fieldWithOption, pField);
setDefaultOptionsForField(&fieldWithOption);
taosArrayPush(pReq.pColumns, &fieldWithOption);

View File

@ -233,7 +233,7 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
SSmlKv kvTs = {0};
smlBuildTsKv(&kvTs, ts);
if (needConverTime) {
if (needConverTime && info->currSTableMeta != NULL) {
kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
}

View File

@ -412,11 +412,11 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
tmq_list_t* tmq_list_new() { return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); }
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
if (list == NULL) return -1;
if (list == NULL) return TSDB_CODE_INVALID_PARA;
SArray* container = &list->container;
if (src == NULL || src[0] == 0) return -1;
if (src == NULL || src[0] == 0) return TSDB_CODE_INVALID_PARA;
char* topic = taosStrdup(src);
if (taosArrayPush(container, &topic) == NULL) return -1;
if (taosArrayPush(container, &topic) == NULL) return TSDB_CODE_INVALID_PARA;
return 0;
}
@ -826,7 +826,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
offRows->vgId = pVg->vgId;
offRows->rows = pVg->numOfRows;
offRows->offset = pVg->offsetInfo.endOffset;
offRows->ever = pVg->offsetInfo.walVerEnd;
offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd;
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
tscDebug("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
@ -872,7 +872,10 @@ void tmqSendHbReq(void* param, void* tmrId) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (code != 0) {
tscError("tmqSendHbReq asyncSendMsgToServer failed");
}
OVER:
tDestroySMqHbReq(&req);
@ -1240,12 +1243,15 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(buf);
goto FAIL;
}
SMqSubscribeCbParam param = {.rspErr = 0};
if (tsem_init(&param.rspSem, 0, 0) != 0) {
code = TSDB_CODE_TSC_INTERNAL_ERROR;
taosMemoryFree(buf);
taosMemoryFree(sendInfo);
goto FAIL;
}
@ -1265,10 +1271,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
goto FAIL;
}
// avoid double free if msg is sent
buf = NULL;
sendInfo = NULL;
tsem_wait(&param.rspSem);
tsem_destroy(&param.rspSem);
@ -1280,7 +1282,10 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t retryCnt = 0;
while ((code = syncAskEp(tmq)) != 0) {
if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s", tmq->consumerId, strerror(code));
if(code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
code = 0;
}
goto FAIL;
}
@ -1304,8 +1309,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
FAIL:
taosArrayDestroyP(req.topicNames, taosMemoryFree);
taosMemoryFree(buf);
taosMemoryFree(sendInfo);
return code;
}
@ -1378,7 +1381,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
if(pMsg->pData == NULL){
tscError("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
goto FAIL;
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto END;
}
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
@ -1407,7 +1411,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderClear(&decoder);
taosReleaseRef(tmqMgmt.rsetId, refId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
goto END;
}
tDecoderClear(&decoder);
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
@ -1423,7 +1427,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderClear(&decoder);
taosReleaseRef(tmqMgmt.rsetId, refId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
goto END;
}
tDecoderClear(&decoder);
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
@ -1434,7 +1438,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderClear(&decoder);
taosReleaseRef(tmqMgmt.rsetId, refId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
goto END;
}
tDecoderClear(&decoder);
memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
@ -1792,6 +1796,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
if (pParam == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(msg);
goto FAIL;
}
@ -1803,6 +1808,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(msg);
goto FAIL;
}
@ -1829,7 +1835,6 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
return 0;
FAIL:
taosMemoryFreeClear(msg);
return tmqPollCb(pParam, NULL, code);
}
@ -1935,8 +1940,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
terrno = pRspWrapper->code;
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId,
tstrerror(pRspWrapper->code));
taosFreeQitem(pRspWrapper);
return NULL;
} else {
if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform
askEp(tmq, NULL, false, true);
@ -2339,7 +2342,7 @@ const char* tmq_get_db_name(TAOS_RES* res) {
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (res == NULL) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
return ((SMqRspObjCommon*)res)->vgId;
@ -2347,7 +2350,7 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return pMetaRspObj->vgId;
} else {
return -1;
return TSDB_CODE_INVALID_PARA;
}
}
@ -2650,6 +2653,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
code = TSDB_CODE_INVALID_PARA;
taosMemoryFree(pReq);
goto FAIL;
}
@ -2657,6 +2661,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
if (pParam == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pReq);
goto FAIL;
}
@ -2667,6 +2672,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pReq);
goto FAIL;
}
@ -2688,8 +2694,6 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
}
FAIL:
taosMemoryFreeClear(pParam);
taosMemoryFreeClear(pReq);
askEpCb(pParam, NULL, code);
}
@ -2704,7 +2708,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
if (tmq == NULL) {
taosMemoryFree(pParamSet);
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return -1;
return terrno;
}
// if no more waiting rsp
@ -2796,7 +2800,6 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pParam);
return 0;
}
@ -2900,8 +2903,6 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
int64_t transporterId = 0;
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);
if (code != 0) {
taosMemoryFree(buf);
taosMemoryFree(sendInfo);
tsem_destroy(&pParam->sem);
taosMemoryFree(pParam);
return code;
@ -3158,6 +3159,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->paramFreeFp = taosMemoryFree;
sendInfo->fp = tmqGetWalInfoCb;
sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
@ -3169,8 +3171,6 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
if (code != 0) {
taosMemoryFree(pParam);
taosMemoryFree(msg);
goto end;
}
}
@ -3326,8 +3326,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
int64_t transporterId = 0;
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (code != 0) {
taosMemoryFree(msg);
taosMemoryFree(sendInfo);
tsem_destroy(&pParam->sem);
taosMemoryFree(pParam);
return code;

View File

@ -383,7 +383,9 @@ static int32_t tRowBuildKVRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, c
if (IS_VAR_DATA_TYPE(schema->columns[i].type)) {
payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid);
payloadSize += tPutU32v(payload + payloadSize, colValArray[colValIndex].value.nData);
if (colValArray[colValIndex].value.nData > 0) {
memcpy(payload + payloadSize, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData);
}
payloadSize += colValArray[colValIndex].value.nData;
} else {
payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid);

View File

@ -258,17 +258,12 @@ static void dmPrintArgs(int32_t argc, char const *argv[]) {
static void dmGenerateGrant() { mndGenerateMachineCode(); }
static void dmPrintVersion() {
printf("%s\ntaosd version: %s compatible_version: %s\n", TD_PRODUCT_NAME, version, compatible_version);
printf("git: %s\n", gitinfo);
#ifdef TD_ENTERPRISE
char *releaseName = "enterprise";
#else
char *releaseName = "community";
printf("git: %s\n", gitinfoOfInternal);
#endif
printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
#ifdef TD_ENTERPRISE
printf("gitinfoOfInternal: %s\n", gitinfoOfInternal);
#endif
printf("buildInfo: %s\n", buildinfo);
printf("build: %s\n", buildinfo);
}
static void dmPrintHelp() {
@ -403,13 +398,6 @@ int mainWindows(int argc, char **argv) {
return -1;
}
if(dmGetEncryptKey() != 0){
dError("failed to start since failed to get encrypt key");
taosCloseLog();
taosCleanupArgs();
return -1;
};
if (taosConvInit() != 0) {
dError("failed to init conv");
taosCloseLog();
@ -447,6 +435,13 @@ int mainWindows(int argc, char **argv) {
osSetProcPath(argc, (char **)argv);
taosCleanupArgs();
if(dmGetEncryptKey() != 0){
dError("failed to start since failed to get encrypt key");
taosCloseLog();
taosCleanupArgs();
return -1;
};
if (dmInit() != 0) {
if (terrno == TSDB_CODE_NOT_FOUND) {
dError("failed to init dnode since unsupported platform, please visit https://www.taosdata.com for support");

View File

@ -662,6 +662,14 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumerNew = NULL;
STrans *pTrans = NULL;
if(taosArrayGetSize(subscribe.topicNames) == 0){
SMqConsumerObj *pConsumerTmp = mndAcquireConsumer(pMnode, subscribe.consumerId);
if(pConsumerTmp == NULL){
goto _over;
}
mndReleaseConsumer(pMnode, pConsumerTmp);
}
code = checkAndSortTopic(pMnode, subscribe.topicNames);
if(code != TSDB_CODE_SUCCESS){
goto _over;

View File

@ -36,7 +36,7 @@ static int32_t mndRetrieveGrant(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
if (pShow->numOfRows < 1) {
cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
const char *src = "community";
const char *src = TD_PRODUCT_NAME;
STR_WITH_MAXSIZE_TO_VARSTR(tmp, src, 32);
colDataSetVal(pColInfo, numOfRows, tmp, false);

View File

@ -796,7 +796,13 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
static bool mndCheckDbConflict(const char *conflict, STrans *pTrans) {
if (conflict[0] == 0) return false;
if (strcasecmp(conflict, pTrans->dbname) == 0 || strcasecmp(conflict, pTrans->stbname) == 0) return true;
if (strcasecmp(conflict, pTrans->dbname) == 0) return true;
return false;
}
static bool mndCheckStbConflict(const char *conflict, STrans *pTrans) {
if (conflict[0] == 0) return false;
if (strcasecmp(conflict, pTrans->stbname) == 0) return true;
return false;
}
@ -816,17 +822,17 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (mndCheckDbConflict(pNew->dbname, pTrans)) conflict = true;
if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true;
if (mndCheckStbConflict(pNew->stbname, pTrans)) conflict = true;
}
}
if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB) {
if (mndCheckDbConflict(pNew->dbname, pTrans)) conflict = true;
if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true;
if (mndCheckStbConflict(pNew->stbname, pTrans)) conflict = true;
}
if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true; // for stb
if (mndCheckStbConflict(pNew->stbname, pTrans)) conflict = true; // for stb
}
}

View File

@ -160,6 +160,12 @@ int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
(atExit && (pVnode->inUse->size > 0 || pVnode->pMeta->changed ||
pVnode->state.applied - pVnode->state.committed > 4096));
}
vTrace("vgId:%d, should commit:%d, disk available:%d, buffer size:%" PRId64 ", node size:%" PRId64
", meta changed:%d"
", state:[%" PRId64 ",%" PRId64 "]",
TD_VID(pVnode), needCommit, diskAvail, pVnode->inUse ? pVnode->inUse->size : 0,
pVnode->inUse ? pVnode->inUse->node.size : 0, pVnode->pMeta->changed, pVnode->state.applied,
pVnode->state.committed);
taosThreadMutexUnlock(&pVnode->mutex);
return needCommit;
}

View File

@ -67,7 +67,12 @@ static void clearWinStateBuff(SCountWindowResult* pBuff) {
static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) {
SCountWindowResult* pBuffInfo = taosArrayGet(pCountSup->pWinStates, pCountSup->stateIndex);
pCountSup->stateIndex = (pCountSup->stateIndex + 1) % taosArrayGetSize(pCountSup->pWinStates);
int32_t size = taosArrayGetSize(pCountSup->pWinStates);
// coverity scan
ASSERTS(size > 0, "WinStates is empty");
if (size > 0) {
pCountSup->stateIndex = (pCountSup->stateIndex + 1) % size;
}
return pBuffInfo;
}

View File

@ -93,6 +93,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
goto _error;
}
tmemory_barrier();
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
@ -428,6 +429,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
}
}
tmemory_barrier();
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
code = tsem_post(&pExchangeInfo->ready);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -1314,6 +1314,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint6
pTableScanInfo->tableEndIndex = -1;
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
pTableScanInfo->scanMode = TABLE_SCAN__BLOCK_ORDER;
}
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
@ -1363,6 +1364,12 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
}
bool comparePrimaryKey(SColumnInfoData* pCol, int32_t rowId, void* pVal) {
// coverity scan
ASSERTS(pVal != NULL, "pVal should not be NULL");
if (!pVal) {
qError("failed to compare primary key, since primary key is null");
return false;
}
void* pData = colDataGetData(pCol, rowId);
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
int32_t colLen = varDataLen(pData);
@ -1469,8 +1476,13 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
}
STableScanInfo* pTScanInfo = pInfo->pTableScanOp->info;
qDebug("prepare range scan start:%" PRId64 ",end:%" PRId64 ",maxVer:%" PRIu64, win.skey, win.ekey, pInfo->pUpdateInfo->maxDataVersion);
// coverity scan
ASSERTS(pInfo->pUpdateInfo != NULL, "Failed to set data version, since pInfo->pUpdateInfo is NULL");
if (pInfo->pUpdateInfo) {
qDebug("prepare range scan start:%" PRId64 ",end:%" PRId64 ",maxVer:%" PRIu64, win.skey, win.ekey,
pInfo->pUpdateInfo->maxDataVersion);
resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion);
}
pInfo->pTableScanOp->status = OP_OPENED;
return true;
}

View File

@ -677,6 +677,14 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
goto _error;
}
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pCountNode->window.watermark,
.calTrigger = pCountNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
.deleteMark = getDeleteMark(&pCountNode->window, 0),
};
pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0,
pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
@ -687,13 +695,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->streamAggSup.windowCount = pCountNode->windowCount;
pInfo->streamAggSup.windowSliding = pCountNode->windowSliding;
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pCountNode->window.watermark,
.calTrigger = pCountNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
};
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->binfo.pRes = pResBlock;

View File

@ -364,7 +364,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
code = saveResult(curWin.winInfo, pSeUpdated);
saveResult(curWin.winInfo, pSeUpdated);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {

View File

@ -1427,7 +1427,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
}
if (IS_FINAL_INTERVAL_OP(pOperator) && !pInfo->destHasPrimaryKey) {
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
}
if (IS_FINAL_INTERVAL_OP(pOperator)) {
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
pInfo->pPullDataMap, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator);
@ -2845,7 +2847,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
if (!pInfo->destHasPrimaryKey) {
removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated);
}
if (pInfo->isHistoryOp) {
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
}
@ -4131,7 +4135,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
}
pOperator->status = OP_RES_TO_RETURN;
if (!pInfo->destHasPrimaryKey) {
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
}
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL,
pInfo->pUpdatedMap, pInfo->pDelWins, pOperator);
if (pInfo->destHasPrimaryKey && IS_NORMAL_INTERVAL_OP(pOperator)) {

View File

@ -1242,14 +1242,9 @@ static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
}
static void udfdPrintVersion() {
#ifdef TD_ENTERPRISE
char *releaseName = "enterprise";
#else
char *releaseName = "community";
#endif
printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
printf("buildInfo: %s\n", buildinfo);
printf("udfd version: %s compatible_version: %s\n", version, compatible_version);
printf("git: %s\n", gitinfo);
printf("build: %s\n", buildinfo);
}
static int32_t udfdInitLog() {

View File

@ -240,6 +240,26 @@ int32_t nodesCreateAllocator(int64_t queryId, int32_t chunkSize, int64_t* pAlloc
return code;
}
int32_t nodesSimAcquireAllocator(int64_t allocatorId) {
if (allocatorId <= 0) {
return TSDB_CODE_SUCCESS;
}
SNodeAllocator* pAllocator = taosAcquireRef(g_allocatorReqRefPool, allocatorId);
if (NULL == pAllocator) {
return terrno;
}
return TSDB_CODE_SUCCESS;
}
int32_t nodesSimReleaseAllocator(int64_t allocatorId) {
if (allocatorId <= 0) {
return TSDB_CODE_SUCCESS;
}
return taosReleaseRef(g_allocatorReqRefPool, allocatorId);
}
int32_t nodesAcquireAllocator(int64_t allocatorId) {
if (allocatorId <= 0) {
return TSDB_CODE_SUCCESS;

View File

@ -6771,6 +6771,15 @@ static int32_t checkDbEnumOption(STranslateContext* pCxt, const char* pName, int
return TSDB_CODE_SUCCESS;
}
static int32_t checkDbEnumOption3(STranslateContext* pCxt, const char* pName, int32_t val, int32_t v1, int32_t v2,
int32_t v3) {
if (val >= 0 && val != v1 && val != v2 && val != v3) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option %s: %d, only %d, %d, %d allowed", pName, val, v1, v2, v3);
}
return TSDB_CODE_SUCCESS;
}
static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRetentions, int8_t precision) {
if (NULL == pRetentions) {
return TSDB_CODE_SUCCESS;
@ -6914,7 +6923,12 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa
if ((pOptions->replica == 2) ^ (pOptions->withArbitrator == TSDB_MAX_DB_WITH_ARBITRATOR)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid database option, with_arbitrator should be used with replica 2");
"Invalid option, with_arbitrator should be used with replica 2");
}
if (pOptions->replica > 1 && pOptions->walLevel == 0) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option, wal_level 0 should be used with replica 1");
}
return TSDB_CODE_SUCCESS;
@ -6978,7 +6992,8 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
code = checkDbStrictOption(pCxt, pOptions);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkDbEnumOption(pCxt, "walLevel", pOptions->walLevel, TSDB_MIN_WAL_LEVEL, TSDB_MAX_WAL_LEVEL);
code = checkDbEnumOption3(pCxt, "walLevel", pOptions->walLevel, TSDB_MIN_WAL_LEVEL, TSDB_DEFAULT_WAL_LEVEL,
TSDB_MAX_WAL_LEVEL);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkDbRangeOption(pCxt, "vgroups", pOptions->numOfVgroups, TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
@ -7252,6 +7267,15 @@ static void buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt,
}
static int32_t translateAlterDatabase(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt) {
if (pStmt->pOptions->walLevel == 0) {
SDbCfgInfo dbCfg = {0};
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
if (TSDB_CODE_SUCCESS == code && dbCfg.replications > 1) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option, wal_level 0 should be used with replica 1");
}
}
int32_t code = checkDatabaseOptions(pCxt, pStmt->dbName, pStmt->pOptions);
if (TSDB_CODE_SUCCESS != code) {
return code;

View File

@ -335,7 +335,7 @@ TEST_F(ParserInitialATest, alterDatabaseSemanticCheck) {
run("ALTER DATABASE test KEEP 1000000000s", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test KEEP 1w", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test PAGES 63", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test WAL_LEVEL 0", TSDB_CODE_PAR_INVALID_DB_OPTION);
//run("ALTER DATABASE test WAL_LEVEL 0", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test WAL_LEVEL 3", TSDB_CODE_PAR_INVALID_DB_OPTION);
//run("ALTER DATABASE test REPLICA 2", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test STT_TRIGGER 0", TSDB_CODE_PAR_INVALID_DB_OPTION);

View File

@ -4164,8 +4164,10 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, pLastRowCols, false, cxt.pkBytes);
lastRowScanOptRemoveUslessTargets(pScan->node.pTargets, cxt.pLastCols, cxt.pOtherCols, pLastRowCols);
if (pPKTsCol && ((pScan->node.pTargets->length == 1) || (pScan->node.pTargets->length == 2 && cxt.pkBytes > 0))) {
// when select last(ts),ts from ..., we add another ts to targets
if (pPKTsCol &&
((cxt.pLastCols->length == 1 && nodesEqualNode((SNode*)pPKTsCol, nodesListGetNode(cxt.pLastCols, 0))) ||
(pScan->node.pTargets->length == 2 && cxt.pkBytes > 0))) {
// when select last(ts),tbname,ts from ..., we add another ts to targets
sprintf(pPKTsCol->colName, "#sel_val.%p", pPKTsCol);
nodesListAppend(pScan->node.pTargets, nodesCloneNode((SNode*)pPKTsCol));
}

View File

@ -1213,6 +1213,7 @@ EDealRes sclRewriteFunction(SNode **pNode, SScalarCtx *ctx) {
res->translate = true;
strcpy(res->node.aliasName, node->node.aliasName);
res->node.resType.type = output.columnData->info.type;
res->node.resType.bytes = output.columnData->info.bytes;
res->node.resType.scale = output.columnData->info.scale;
@ -1268,6 +1269,7 @@ EDealRes sclRewriteLogic(SNode **pNode, SScalarCtx *ctx) {
res->node.resType = node->node.resType;
res->translate = true;
strcpy(res->node.aliasName, node->node.aliasName);
int32_t type = output.columnData->info.type;
if (IS_VAR_DATA_TYPE(type)) {
res->datum.p = output.columnData->pData;
@ -1309,6 +1311,7 @@ EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) {
res->translate = true;
strcpy(res->node.aliasName, node->node.aliasName);
res->node.resType = node->node.resType;
if (colDataIsNull_s(output.columnData, 0)) {
res->isNull = true;
@ -1364,6 +1367,7 @@ EDealRes sclRewriteCaseWhen(SNode **pNode, SScalarCtx *ctx) {
res->translate = true;
strcpy(res->node.aliasName, node->node.aliasName);
res->node.resType = node->node.resType;
if (colDataIsNull_s(output.columnData, 0)) {
res->isNull = true;

View File

@ -734,6 +734,21 @@ _end:
return code;
}
int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
streamStateFreeCur(pCur);
if (code == TSDB_CODE_SUCCESS) {
return code;
} else {
pCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey);
}
code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
streamStateFreeCur(pCur);
return code;
}
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) {
SSessionKey* pWinKey = pKey;
const TSKEY gap = 0;
@ -755,14 +770,13 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
if (size == 0) {
void* pFileStore = getStateFileStore(pFileState);
void* pRockVal = NULL;
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId);
code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen);
streamStateFreeCur(pCur);
code = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code);
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
pWinKey->win.ekey, code);
if (code == TSDB_CODE_SUCCESS) {
int32_t valSize = *pVLen;
COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) );
COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
goto _end;
@ -798,20 +812,24 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
}
if (index == -1) {
if (!isDeteled(pFileState, endTs)) {
void* p = NULL;
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) {
SSessionKey tmpKey = *pWinKey;
void* pRockVal = NULL;
void* pFileStore = getStateFileStore(pFileState);
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId);
int32_t code_file = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &p, pVLen);
int32_t code_file = getCountWinStateFromDisc(pFileStore, &tmpKey, &pRockVal, pVLen);
if (code_file == TSDB_CODE_SUCCESS) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
SRowBuffPos* pFirstPos = taosArrayGetP(pWinStates, 0);
SSessionKey* pFirstWinKey = (SSessionKey*)pFirstPos->pKey;
if (tmpKey.win.ekey < pFirstWinKey->win.skey) {
*pWinKey = tmpKey;
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
code = code_file;
qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file);
streamStateFreeCur(pCur);
qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
pWinKey->win.ekey, code_file);
goto _end;
}
taosMemoryFree(p);
streamStateFreeCur(pCur);
}
taosMemoryFree(pRockVal);
}
}

View File

@ -874,6 +874,17 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64, pNode->vgId, terrstr(), index + 1);
return -1;
}
if (pMsg->matchIndex == -1) {
// first time to restore
sInfo("vgId:%d, first time to restore sync log repl. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64
", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), index:%" PRId64
", firstVer:%" PRId64 ", term:%" PRId64 ", lastMatchTerm:%" PRId64,
pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, index, firstVer, term,
pMsg->lastMatchTerm);
}
if ((index + 1 < firstVer) || (term < 0) ||
(term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);

View File

@ -348,6 +348,7 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) {
taosWLockLatch(&exh->latch);
if (exh->handle == NULL) exh->handle = conn;
exh->inited = 1;
exh->pThrd = conn->hostThrd;
if (!QUEUE_IS_EMPTY(&exh->q)) {
queue* h = QUEUE_HEAD(&exh->q);
QUEUE_REMOVE(h);
@ -2514,7 +2515,7 @@ static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t
if (exh == NULL) {
return NULL;
}
taosWLockLatch(&exh->latch);
if (exh->pThrd == NULL && trans != NULL) {
int idx = cliRBChoseIdx(trans);
if (idx < 0) return NULL;
@ -2522,7 +2523,9 @@ static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t
}
pThrd = exh->pThrd;
taosWUnLockLatch(&exh->latch);
transReleaseExHandle(transGetRefMgt(), handle);
return pThrd;
}
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {

View File

@ -20,7 +20,6 @@
#include "tutil.h"
#include "walInt.h"
bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver;
}
@ -154,7 +153,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
// validate body
int32_t cryptedBodyLen = logContent->head.bodyLen;
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
}
recordLen = walCkHeadSz + cryptedBodyLen;
@ -226,7 +225,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
goto _err;
}
if (taosFsyncFile(pFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pFile) < 0) {
wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
@ -626,7 +625,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
int32_t plainBodyLen = ckHead.head.bodyLen;
int32_t cryptedBodyLen = plainBodyLen;
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
}
idxEntry.offset += sizeof(SWalCkHead) + cryptedBodyLen;
@ -636,7 +635,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
pWal->cfg.vgId, terrstr(), idxEntry.ver, idxEntry.offset, fLogNameStr);
goto _err;
}
if (taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, failed to append file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
goto _err;
@ -644,7 +643,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
count++;
}
if (taosFsyncFile(pIdxFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pIdxFile) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, faild to fsync file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
goto _err;
@ -880,13 +879,13 @@ int walSaveMeta(SWal* pWal) {
int n;
// fsync the idx and log file at first to ensure validity of meta
if (taosFsyncFile(pWal->pIdxFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pIdxFile) < 0) {
wError("vgId:%d, failed to sync idx file due to %s", pWal->cfg.vgId, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosFsyncFile(pWal->pLogFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pLogFile) < 0) {
wError("vgId:%d, failed to sync log file due to %s", pWal->cfg.vgId, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@ -901,7 +900,8 @@ int walSaveMeta(SWal* pWal) {
return -1;
}
TdFilePtr pMetaFile = taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
TdFilePtr pMetaFile =
taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
if (pMetaFile == NULL) {
wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
@ -910,13 +910,13 @@ int walSaveMeta(SWal* pWal) {
char* serialized = walMetaSerialize(pWal);
int len = strlen(serialized);
if (len != taosWriteFile(pMetaFile, serialized, len)) {
if (pWal->cfg.level != TAOS_WAL_SKIP && len != taosWriteFile(pMetaFile, serialized, len)) {
wError("vgId:%d, failed to write file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile(pMetaFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pMetaFile) < 0) {
wError("vgId:%d, failed to sync file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;

View File

@ -79,8 +79,7 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
TdFilePtr pIdxTFile, pLogTFile;
char fnameStr[WAL_FILE_LEN];
if (pWal->pLogFile != NULL) {
code = taosFsyncFile(pWal->pLogFile);
if (code != 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pLogFile)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
@ -91,8 +90,7 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
}
}
if (pWal->pIdxFile != NULL) {
code = taosFsyncFile(pWal->pIdxFile);
if (code != 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pIdxFile)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}

View File

@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "crypt.h"
#include "os.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tglobal.h"
#include "walInt.h"
#include "crypt.h"
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
taosThreadMutexLock(&pWal->mutex);
@ -396,8 +396,7 @@ int32_t walRollImpl(SWal *pWal) {
int32_t code = 0;
if (pWal->pIdxFile != NULL) {
code = taosFsyncFile(pWal->pIdxFile);
if (code != 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pIdxFile)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto END;
}
@ -409,8 +408,7 @@ int32_t walRollImpl(SWal *pWal) {
}
if (pWal->pLogFile != NULL) {
code = taosFsyncFile(pWal->pLogFile);
if (code != 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pLogFile)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto END;
}
@ -510,12 +508,15 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
wDebug("vgId:%d, wal write log %" PRId64 ", msgType: %s, cksum head %u cksum body %u", pWal->cfg.vgId, index,
TMSG_INFO(msgType), pWal->writeHead.cksumHead, pWal->writeHead.cksumBody);
if (pWal->cfg.level != TAOS_WAL_SKIP) {
code = walWriteIndex(pWal, index, offset);
if (code < 0) {
goto END;
}
}
if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
if (pWal->cfg.level != TAOS_WAL_SKIP &&
taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
@ -524,15 +525,15 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
}
int32_t cyptedBodyLen = plainBodyLen;
char* buf = (char*)body;
char* newBody = NULL;
char* newBodyEncrypted = NULL;
char *buf = (char *)body;
char *newBody = NULL;
char *newBodyEncrypted = NULL;
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
cyptedBodyLen = ENCRYPTED_LEN(cyptedBodyLen);
newBody = taosMemoryMalloc(cyptedBodyLen);
if(newBody == NULL){
if (newBody == NULL) {
wError("vgId:%d, file:%" PRId64 ".log, failed to malloc since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
code = -1;
@ -542,11 +543,11 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
memcpy(newBody, body, plainBodyLen);
newBodyEncrypted = taosMemoryMalloc(cyptedBodyLen);
if(newBodyEncrypted == NULL){
if (newBodyEncrypted == NULL) {
wError("vgId:%d, file:%" PRId64 ".log, failed to malloc since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
code = -1;
if(newBody != NULL) taosMemoryFreeClear(newBody);
if (newBody != NULL) taosMemoryFreeClear(newBody);
goto END;
}
@ -559,28 +560,28 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
int32_t count = CBC_Encrypt(&opts);
//wDebug("vgId:%d, file:%" PRId64 ".log, index:%" PRId64 ", CBC_Encrypt cryptedBodyLen:%d, plainBodyLen:%d, %s",
// wDebug("vgId:%d, file:%" PRId64 ".log, index:%" PRId64 ", CBC_Encrypt cryptedBodyLen:%d, plainBodyLen:%d, %s",
// pWal->cfg.vgId, walGetLastFileFirstVer(pWal), index, count, plainBodyLen, __FUNCTION__);
buf = newBodyEncrypted;
}
if (taosWriteFile(pWal->pLogFile, (char *)buf, cyptedBodyLen) != cyptedBodyLen) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosWriteFile(pWal->pLogFile, (char *)buf, cyptedBodyLen) != cyptedBodyLen) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
code = -1;
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
taosMemoryFreeClear(newBody);
taosMemoryFreeClear(newBodyEncrypted);
}
goto END;
}
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
taosMemoryFreeClear(newBody);
taosMemoryFreeClear(newBodyEncrypted);
//wInfo("vgId:%d, free newBody newBodyEncrypted %s",
// wInfo("vgId:%d, free newBody newBodyEncrypted %s",
// pWal->cfg.vgId, __FUNCTION__);
}
@ -693,6 +694,10 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
}
void walFsync(SWal *pWal, bool forceFsync) {
if (pWal->cfg.level == TAOS_WAL_SKIP) {
return;
}
taosThreadMutexLock(&pWal->mutex);
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));

View File

@ -2,6 +2,6 @@ char version[64] = "${TD_VER_NUMBER}";
char compatible_version[12] = "${TD_VER_COMPATIBLE}";
char gitinfo[48] = "${TD_VER_GIT}";
char gitinfoOfInternal[48] = "${TD_VER_GIT_INTERNAL}";
char buildinfo[64] = "Built ${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} at ${TD_VER_DATE}";
char buildinfo[64] = "${TD_VER_OSTYPE}-${TD_VER_CPUTYPE} ${TD_VER_DATE}";
void libtaos_${TD_LIB_VER_NUMBER}_${TD_VER_OSTYPE}_${TD_VER_CPUTYPE}_${TD_VER_VERTYPE}() {};

View File

@ -0,0 +1,110 @@
import taos
import sys
import os
import subprocess
import glob
import shutil
import time
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.srvCtl import *
from frame.caseBase import *
from frame import *
from frame.autogen import *
# from frame.server.dnodes import *
# from frame.server.cluster import *
class TDTestCase(TBase):
updatecfgDict = {
'slowLogScope':"query"
}
def init(self, conn, logSql, replicaVar=3):
super(TDTestCase, self).init(conn, logSql, replicaVar=3, db="snapshot", checkColName="c1")
self.valgrind = 0
self.childtable_count = 10
# tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), logSql) # output sql.txt file
def run(self):
tdSql.prepare()
autoGen = AutoGen()
autoGen.create_db(self.db, 2, 3)
tdSql.execute(f"use {self.db}")
autoGen.create_stable(self.stb, 5, 10, 8, 8)
autoGen.create_child(self.stb, "d", self.childtable_count)
autoGen.insert_data(1000)
tdSql.execute(f"flush database {self.db}")
sc.dnodeStop(3)
# clusterDnodes.stoptaosd(1)
# clusterDnodes.starttaosd(3)
# time.sleep(5)
# clusterDnodes.stoptaosd(2)
# clusterDnodes.starttaosd(1)
# time.sleep(5)
autoGen.insert_data(5000, True)
self.flushDb(True)
# wait flush operation over
time.sleep(5)
# sql = 'show vnodes;'
# while True:
# bFinish = True
# param_list = tdSql.query(sql, row_tag=True)
# for param in param_list:
# if param[3] == 'leading' or param[3] == 'following':
# bFinish = False
# break
# if bFinish:
# break
self.snapshotAgg()
time.sleep(10)
sc.dnodeStopAll()
for i in range(1, 4):
path = clusterDnodes.getDnodeDir(i)
dnodesRootDir = os.path.join(path,"data","vnode", "vnode*")
dirs = glob.glob(dnodesRootDir)
for dir in dirs:
if os.path.isdir(dir):
self.remove_directory(os.path.join(dir, "wal"))
sc.dnodeStart(1)
sc.dnodeStart(2)
sc.dnodeStart(3)
sql = "show vnodes;"
time.sleep(10)
while True:
bFinish = True
param_list = tdSql.query(sql, row_tag=True)
for param in param_list:
if param[3] == 'offline':
tdLog.exit(
"dnode synchronous fail dnode id: %d, vgroup id:%d status offline" % (param[0], param[1]))
if param[3] == 'leading' or param[3] == 'following':
bFinish = False
break
if bFinish:
break
self.timestamp_step = 1000
self.insert_rows = 6000
self.checkInsertCorrect()
self.checkAggCorrect()
def remove_directory(self, directory):
try:
shutil.rmtree(directory)
tdLog.debug("delete dir: %s " % (directory))
except OSError as e:
tdLog.exit("delete fail dir: %s " % (directory))
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -19,5 +19,3 @@ import sys
import os
import time
import datetime

28
tests/army/frame/eutil.py Normal file
View File

@ -0,0 +1,28 @@
###################################################################
# Copyright (c) 2023 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
#
# about tools funciton extension
#
import sys
import os
import time
import datetime
import psutil
# cpu frequent as random
def cpuRand(max):
decimal = int(str(psutil.cpu_freq().current).split(".")[1])
return decimal % max

View File

@ -35,18 +35,18 @@ class TDTestCase(TBase):
tdSql.execute("create database db_geometry;")
tdSql.execute("use db_geometry;")
tdSql.execute("create table t_ge (ts timestamp, id int, c1 GEOMETRY(512));")
tdSql.execute("insert into t_ge values(now, 1, 'MULTIPOINT ((0 0), (1 1))');")
tdSql.execute("insert into t_ge values(now, 1, 'MULTIPOINT (0 0, 1 1)');")
tdSql.execute("insert into t_ge values(now, 2, 'POINT (0 0)');")
tdSql.execute("insert into t_ge values(now, 2, 'POINT EMPTY');")
tdSql.execute("insert into t_ge values(now, 3, 'LINESTRING (0 0, 0 1, 1 2)');")
tdSql.execute("insert into t_ge values(now, 3, 'LINESTRING EMPTY');")
tdSql.execute("insert into t_ge values(now, 4, 'POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))');")
tdSql.execute("insert into t_ge values(now, 4, 'POLYGON ((0 0, 4 0, 4 4, 0 4, 0 0), (1 1, 1 2, 2 2, 2 1, 1 1))');")
tdSql.execute("insert into t_ge values(now, 4, 'POLYGON EMPTY');")
tdSql.execute("insert into t_ge values(now, 5, 'MULTILINESTRING ((0 0, 1 1), (2 2, 3 3))');")
tdSql.execute("insert into t_ge values(now, 6, 'MULTIPOLYGON (((1 1, 1 3, 3 3, 3 1, 1 1)), ((4 3, 6 3, 6 1, 4 1, 4 3)))');")
tdSql.execute("insert into t_ge values(now, 7, 'GEOMETRYCOLLECTION (MULTIPOINT((0 0), (1 1)), POINT(3 4), LINESTRING(2 3, 3 4))');")
tdSql.execute("insert into t_ge values(1717122943000, 1, 'MULTIPOINT ((0 0), (1 1))');")
tdSql.execute("insert into t_ge values(1717122944000, 1, 'MULTIPOINT (0 0, 1 1)');")
tdSql.execute("insert into t_ge values(1717122945000, 2, 'POINT (0 0)');")
tdSql.execute("insert into t_ge values(1717122946000, 2, 'POINT EMPTY');")
tdSql.execute("insert into t_ge values(1717122947000, 3, 'LINESTRING (0 0, 0 1, 1 2)');")
tdSql.execute("insert into t_ge values(1717122948000, 3, 'LINESTRING EMPTY');")
tdSql.execute("insert into t_ge values(1717122949000, 4, 'POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))');")
tdSql.execute("insert into t_ge values(1717122950000, 4, 'POLYGON ((0 0, 4 0, 4 4, 0 4, 0 0), (1 1, 1 2, 2 2, 2 1, 1 1))');")
tdSql.execute("insert into t_ge values(1717122951000, 4, 'POLYGON EMPTY');")
tdSql.execute("insert into t_ge values(1717122952000, 5, 'MULTILINESTRING ((0 0, 1 1), (2 2, 3 3))');")
tdSql.execute("insert into t_ge values(1717122953000, 6, 'MULTIPOLYGON (((1 1, 1 3, 3 3, 3 1, 1 1)), ((4 3, 6 3, 6 1, 4 1, 4 3)))');")
tdSql.execute("insert into t_ge values(1717122954000, 7, 'GEOMETRYCOLLECTION (MULTIPOINT((0 0), (1 1)), POINT(3 4), LINESTRING(2 3, 3 4))');")
tdSql.query("select * from t_ge;")
tdSql.checkRows(12)
tdSql.query("select * from t_ge where id=1;")

View File

@ -70,6 +70,14 @@ class TDTestCase(TBase):
tdSql.query(sql1)
tdSql.checkRows(4)
row1 = ['2023-11-30 23:59:27.255', "201000008", 51412.900999999998021]
row2 = ['2023-12-04 23:11:28.179', "201000008", 51458.900999999998021]
row3 = ['2023-12-04 23:12:28.180', "201000008", 51458.800999999999476]
row4 = ['2023-12-31 23:59:36.108', "201000008", 52855.400999999998021]
rows = [row1, row2, row3, row4]
tdSql.checkDataMem(sql1, rows)
# run
def run(self):
tdLog.debug(f"start to excute {__file__}")

View File

@ -19,6 +19,7 @@ import taos
import frame
import frame.etool
import frame.eos
import frame.eutil
from frame.log import *
from frame.cases import *
@ -46,17 +47,20 @@ for test:
class TDTestCase(TBase):
index = eutil.cpuRand(20) + 1
bucketName = f"ci-bucket{index}"
updatecfgDict = {
"supportVnodes":"1000",
's3EndPoint': 'http://192.168.1.52:9000',
's3AccessKey': 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX',
's3BucketName': 'ci-bucket',
's3BucketName': f'{bucketName}',
's3PageCacheSize': '10240',
"s3UploadDelaySec": "10",
's3MigrateIntervalSec': '600',
's3MigrateEnabled': '1'
}
tdLog.info(f"assign bucketName is {bucketName}\n")
maxFileSize = (128 + 10) * 1014 * 1024 # add 10M buffer
def insertData(self):
@ -241,10 +245,9 @@ class TDTestCase(TBase):
#
def preDb(self, vgroups):
cnt = int(time.time())%3 + 1
cnt = int(time.time())%2 + 1
for i in range(cnt):
vg = int(time.time()*1000)%10 + 1
vg = eutil.cpuRand(9) + 1
sql = f"create database predb vgroups {vg}"
tdSql.execute(sql, show=True)
sql = "drop database predb"

View File

@ -48,13 +48,6 @@ class TDTestCase(TBase):
"bigint","bigint unsigned","timestamp","bool","float","double","binary(16)","nchar(16)",
"varchar(16)","varbinary(16)"]
def combineValid(self, datatype, encode, compress):
if datatype != "float" and datatype != "double":
if compress == "tsz":
return False
return True
def genAllSqls(self, stbName, max):
# encode
encodes = [
[["tinyint","tinyint unsigned","smallint","smallint unsigned","int","int unsigned","bigint","bigint unsigned"], ["simple8B"]],
@ -63,6 +56,15 @@ class TDTestCase(TBase):
[["float","double"], ["Delta-d"]]
]
def combineValid(self, datatype, encode, compress):
if datatype != "float" and datatype != "double":
if compress == "tsz":
return False
return True
def genAllSqls(self, stbName, max):
c = 0 # column number
t = 0 # table number
@ -70,7 +72,7 @@ class TDTestCase(TBase):
sql = ""
# loop append sqls
for lines in encodes:
for lines in self.encodes:
for datatype in lines[0]:
for encode in lines[1]:
for compress in self.compresses:
@ -218,6 +220,33 @@ class TDTestCase(TBase):
]
tdSql.errors(sqls)
# add column
def checkAddColumn(self):
c = 0
tbname = f"{self.db}.tbadd"
sql = f"create table {tbname}(ts timestamp, c0 int) tags(area int);"
tdSql.execute(sql)
# loop append sqls
for lines in self.encodes:
for datatype in lines[0]:
for encode in lines[1]:
for compress in self.compresses:
for level in self.levels:
if self.combineValid(datatype, encode, compress):
sql = f"alter table {tbname} add column col{c} {datatype} ENCODE '{encode}' COMPRESS '{compress}' LEVEL '{level}';"
tdSql.execute(sql, 3, True)
c += 1
# alter error
sqls = [
f"alter table {tbname} add column a1 int ENCODE 'simple8bAA';",
f"alter table {tbname} add column a2 int COMPRESS 'AABB';",
f"alter table {tbname} add column a3 bigint LEVEL 'high1';",
f"alter table {tbname} add column a4 BINARY(12) ENCODE 'simple8b' LEVEL 'high2';",
f"alter table {tbname} add column a5 VARCHAR(16) ENCODE 'simple8b' COMPRESS 'gzip' LEVEL 'high3';"
]
tdSql.errors(sqls)
def validCreate(self):
sqls = self.genAllSqls(self.stb, 50)
@ -238,6 +267,9 @@ class TDTestCase(TBase):
# check alter and write
self.checkAlter()
# check add column
self.checkAddColumn()
def checkCorrect(self):
# check data correct
tbname = f"{self.db}.{self.stb}"

View File

@ -10,26 +10,26 @@
#
# army-test
#
,,y,army,./pytest.sh python3 ./test.py -f enterprise/multi-level/mlevel_basic.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f enterprise/db-encrypt/basic.py
,,n,army,python3 ./test.py -f enterprise/s3/s3Basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/snapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f community/query/function/test_func_elapsed.py
,,y,army,./pytest.sh python3 ./test.py -f community/query/test_join.py
,,y,army,./pytest.sh python3 ./test.py -f community/query/test_compare.py
,,y,army,./pytest.sh python3 ./test.py -f community/insert/test_column_tag_boundary.py
,,y,army,./pytest.sh python3 ./test.py -f community/query/fill/fill_desc.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/incSnapshot.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/query/query_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/query/accuracy/test_query_accuracy.py
,,y,army,./pytest.sh python3 ./test.py -f community/insert/insert_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/splitVgroupByLearner.py -N 3
,,n,army,python3 ./test.py -f community/cmdline/fullopt.py
,,n,army,python3 ./test.py -f community/query/show.py -N 3
,,n,army,python3 ./test.py -f enterprise/alter/alterConfig.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/query/subquery/subqueryBugs.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/storage/oneStageComp.py -N 3 -L 3 -D 1
,,y,army,./pytest.sh python3 ./test.py -f community/storage/compressBasic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f multi-level/mlevel_basic.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f db-encrypt/basic.py
,,n,army,python3 ./test.py -f s3/s3Basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f cluster/snapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f query/function/test_func_elapsed.py
,,y,army,./pytest.sh python3 ./test.py -f query/test_join.py
,,y,army,./pytest.sh python3 ./test.py -f query/test_compare.py
,,y,army,./pytest.sh python3 ./test.py -f insert/test_column_tag_boundary.py
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_desc.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f cluster/incSnapshot.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/query_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/accuracy/test_query_accuracy.py
,,y,army,./pytest.sh python3 ./test.py -f insert/insert_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f cluster/splitVgroupByLearner.py -N 3
,,n,army,python3 ./test.py -f cmdline/fullopt.py
,,n,army,python3 ./test.py -f query/show.py -N 3
,,n,army,python3 ./test.py -f alter/alterConfig.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/subquery/subqueryBugs.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f storage/oneStageComp.py -N 3 -L 3 -D 1
,,y,army,./pytest.sh python3 ./test.py -f storage/compressBasic.py -N 3
#
# system test

View File

@ -49,7 +49,7 @@ print ============= create database with all options
# | KEEP value [max(1d ~ 365000d), default: 1d, unit may be minut/hour/day]
# | PRECISION ['ms' | 'us' | 'ns', default: ms]
# | REPLICA value [1 | 3, default: 1]
# | WAL_LEVEL value [1 | 2, default: 1]
# | WAL_LEVEL value [0 | 1 | 2, default: 1]
# | VGROUPS value [default: 2]
# | SINGLE_STABLE [0 | 1, default: ]
#
@ -404,7 +404,7 @@ endi
sql drop database db
sql_error create database db WAL_LEVEL 3
sql_error create database db WAL_LEVEL -1
sql_error create database db WAL_LEVEL 0
#sql_error create database db WAL_LEVEL 0
print ====> VGROUPS value [1~4096, default: 2]
sql create database db VGROUPS 1

View File

@ -61,5 +61,23 @@ if $data02 != 1 then
return -1
endi
sql insert into t2 (ts, b, a) select ts + 1, 11, 12 from t1;
sql select * from t2;
if $rows != 2 then
return -1
endi
if $data01 != 2 then
return -1
endi
if $data02 != 1 then
return -1
endi
if $data11 != 12 then
return -1
endi
if $data12 != 11 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -225,7 +225,7 @@ sql_error create database $db ctime 29
sql_error create database $db ctime 40961
# wal {0, 2}
sql_error create database testwal wal_level 0
#sql_error create database testwal wal_level 0
sql select * from information_schema.ins_databases
if $rows != 2 then
return -1

View File

@ -101,5 +101,248 @@ if $rows != 1 then
return -1
endi
print step 2-------------------------------
sql drop database if exists test;
sql create database test cachemodel 'both';
sql use test;
sql create table stb (ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using stb tags(1,1,1);
sql create table t2 using stb tags(2,2,2);
sql insert into t1 values('2024-06-05 11:00:00',1,2,3);
sql insert into t1 values('2024-06-05 12:00:00',2,2,3);
sql insert into t2 values('2024-06-05 13:00:00',3,2,3);
sql insert into t2 values('2024-06-05 14:00:00',4,2,3);
sql select last(ts) ts1,ts from stb;
if $data00 != $data01 then
print $data00
return -1
endi
sql select last(ts) ts1,ts from stb group by tbname;
if $data00 != $data01 then
print $data00
return -1
endi
sql select last(ts) ts1,tbname, ts from stb;
if $data00 != $data02 then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
sql select last(ts) ts1,tbname, ts from stb group by tbname;
if $data00 != $data02 then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
print step 3-------------------------------
sql drop database if exists test1;
sql create database test1 cachemodel 'both';
sql use test1;
sql create table stb (ts timestamp,a int primary key,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using stb tags(1,1,1);
sql create table t2 using stb tags(2,2,2);
sql insert into t1 values('2024-06-05 11:00:00',1,2,3);
sql insert into t1 values('2024-06-05 12:00:00',2,2,3);
sql insert into t2 values('2024-06-05 13:00:00',3,2,3);
sql insert into t2 values('2024-06-05 14:00:00',4,2,3);
sql select last(ts) ts1,ts from stb;
if $data00 != $data01 then
print $data00
return -1
endi
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
sql select last(ts) ts1,ts from stb group by tbname;
if $data00 != $data01 then
print $data00
return -1
endi
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
sql select last(ts) ts1,tbname, ts from stb;
if $data00 != $data02 then
print $data00
return -1
endi
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
sql select last(ts) ts1,tbname, ts from stb group by tbname;
if $data00 != $data02 then
print $data00
return -1
endi
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
print step 4-------------------------------
sql select last(a) a,ts from stb;
if $data00 != 4 then
print $data00
return -1
endi
if $data01 != @24-06-05 14:00:00.000@ then
print $data01
return -1
endi
sql select last(a) a,ts from stb group by tbname;
if $data00 != 4 then
print $data00
return -1
endi
if $data01 != @24-06-05 14:00:00.000@ then
print $data01
return -1
endi
sql select last(a) a,tbname, ts from stb;
if $data00 != 4 then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
if $data02 != @24-06-05 14:00:00.000@ then
print $data02
return -1
endi
sql select last(a) a,tbname, ts from stb group by tbname;
if $data00 != 4 then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
if $data02 != @24-06-05 14:00:00.000@ then
print $data02
return -1
endi
print step 5-------------------------------
sql select last(ts) ts1,a from stb;
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
if $data01 != 4 then
print $data01
return -1
endi
sql select last(ts) ts1,a from stb group by tbname;
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
if $data01 != 4 then
print $data01
return -1
endi
sql select last(ts) ts1,tbname, a from stb;
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
if $data02 != 4 then
print $data02
return -1
endi
sql select last(ts) ts1,tbname, a from stb group by tbname;
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
if $data02 != 4 then
print $data02
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -148,7 +148,7 @@ class TDTestCase:
@property
def fsync_create_err(self):
return [
"create database db1 wal_level 0",
#"create database db1 wal_level 0",
"create database db1 wal_level 3",
"create database db1 wal_level null",
"create database db1 wal_level true",
@ -162,7 +162,7 @@ class TDTestCase:
@property
def fsync_alter_err(self):
return [
"alter database db1 wal_level 0",
#"alter database db1 wal_level 0",
"alter database db1 wal_level 3",
"alter database db1 wal_level null",
"alter database db1 wal_level true",

View File

@ -269,6 +269,10 @@ class TDTestCase:
def ins_grants_check(self):
grant_name_dict = {
'service':'Service Time',
'timeseries':'Timeseries',
'dnodes':'Dnodes',
'cpu_cores':'CPU Cores',
'stream':'Stream',
'subscription':'Subscription',
'view':'View',
@ -293,6 +297,7 @@ class TDTestCase:
'mysql':'MySQL',
'postgres':'PostgreSQL',
'oracle':'Oracle',
'mssql':'SqlServer'
}
tdSql.execute('drop database if exists db2')
@ -304,7 +309,7 @@ class TDTestCase:
if result[i][0] in grant_name_dict:
tdSql.checkEqual(result[i][1], grant_name_dict[result[i][0]])
index += 1
tdSql.checkEqual(index, 24)
tdSql.checkEqual(index, len(grant_name_dict))
tdSql.query(f'select * from information_schema.ins_grants_logs')
result = tdSql.queryResult
tdSql.checkEqual(True, len(result) >= 0)

View File

@ -186,17 +186,17 @@ class TDTestCase:
tdSql.query('show dnode 1 variables')
for i in tdSql.queryResult:
if i[1].lower() == "gitinfo":
taosd_gitinfo_sql = f"gitinfo: {i[2]}"
taosd_gitinfo_sql = f"git: {i[2]}"
taos_gitinfo_sql = ''
tdSql.query('show local variables')
for i in tdSql.queryResult:
if i[0].lower() == "gitinfo":
taos_gitinfo_sql = f"gitinfo: {i[1]}"
taos_gitinfo_sql = f"git: {i[1]}"
taos_info = os.popen('taos -V').read()
taos_gitinfo = re.findall("^gitinfo.*",taos_info,re.M)
taos_gitinfo = re.findall("^git: .*",taos_info,re.M)
tdSql.checkEqual(taos_gitinfo_sql,taos_gitinfo[0])
taosd_info = os.popen('taosd -V').read()
taosd_gitinfo = re.findall("^gitinfo.*",taosd_info,re.M)
taosd_gitinfo = re.findall("^git: .*",taosd_info,re.M)
tdSql.checkEqual(taosd_gitinfo_sql,taosd_gitinfo[0])
def show_base(self):

View File

@ -5,6 +5,7 @@ import time
import socket
import os
import platform
import re
if platform.system().lower() == 'windows':
import wexpect as taosExpect
else:
@ -370,10 +371,11 @@ class TDTestCase:
if retCode != "TAOS_OK":
tdLog.exit("taos -V fail")
version = 'version: ' + version
retVal = retVal.replace("\n", "")
retVal = retVal.replace("\r", "")
if retVal.startswith(version) == False:
version = 'taos version: ' + version
# retVal = retVal.replace("\n", "")
# retVal = retVal.replace("\r", "")
taosVersion = re.findall((f'^%s'%(version)), retVal,re.M)
if len(taosVersion) == 0:
print ("return version: [%s]"%retVal)
print ("dict version: [%s]"%version)
tdLog.exit("taos -V version not match")

View File

@ -26,30 +26,30 @@ class TDTestCase:
tdSql.execute(f"drop table if exists {table_name}")
tdSql.execute(f"create table {table_name}(ts timestamp, i1 {dtype}, i2 {dtype} unsigned)")
tdSql.execute(f"insert into {table_name} values(now, -16, +6)")
tdSql.execute(f"insert into {table_name} values(now, 80.99, +0042)")
tdSql.execute(f"insert into {table_name} values(now, -0042, +80.99)")
tdSql.execute(f"insert into {table_name} values(now, 52.34354, 18.6)")
tdSql.execute(f"insert into {table_name} values(now, -12., +3.)")
tdSql.execute(f"insert into {table_name} values(now, -0.12, +3.0)")
tdSql.execute(f"insert into {table_name} values(now, -2.3e1, +2.324e2)")
tdSql.execute(f"insert into {table_name} values(now, -2e1, +2e2)")
tdSql.execute(f"insert into {table_name} values(now, -2.e1, +2.e2)")
tdSql.execute(f"insert into {table_name} values(now, -0x40, +0b10000)")
tdSql.execute(f"insert into {table_name} values(now, -0b10000, +0x40)")
tdSql.execute(f"insert into {table_name} values(1717122943000, -16, +6)")
tdSql.execute(f"insert into {table_name} values(1717122944000, 80.99, +0042)")
tdSql.execute(f"insert into {table_name} values(1717122945000, -0042, +80.99)")
tdSql.execute(f"insert into {table_name} values(1717122946000, 52.34354, 18.6)")
tdSql.execute(f"insert into {table_name} values(1717122947000, -12., +3.)")
tdSql.execute(f"insert into {table_name} values(1717122948000, -0.12, +3.0)")
tdSql.execute(f"insert into {table_name} values(1717122949000, -2.3e1, +2.324e2)")
tdSql.execute(f"insert into {table_name} values(1717122950000, -2e1, +2e2)")
tdSql.execute(f"insert into {table_name} values(1717122951000, -2.e1, +2.e2)")
tdSql.execute(f"insert into {table_name} values(1717122952000, -0x40, +0b10000)")
tdSql.execute(f"insert into {table_name} values(1717122953000, -0b10000, +0x40)")
# str support
tdSql.execute(f"insert into {table_name} values(now, '-16', '+6')")
tdSql.execute(f"insert into {table_name} values(now, ' -80.99', ' +0042')")
tdSql.execute(f"insert into {table_name} values(now, ' -0042', ' +80.99')")
tdSql.execute(f"insert into {table_name} values(now, '52.34354', '18.6')")
tdSql.execute(f"insert into {table_name} values(now, '-12.', '+5.')")
tdSql.execute(f"insert into {table_name} values(now, '-.12', '+.5')")
tdSql.execute(f"insert into {table_name} values(now, '-2.e1', '+2.e2')")
tdSql.execute(f"insert into {table_name} values(now, '-2e1', '+2e2')")
tdSql.execute(f"insert into {table_name} values(now, '-2.3e1', '+2.324e2')")
tdSql.execute(f"insert into {table_name} values(now, '-0x40', '+0b10010')")
tdSql.execute(f"insert into {table_name} values(now, '-0b10010', '+0x40')")
tdSql.execute(f"insert into {table_name} values(1717122954000, '-16', '+6')")
tdSql.execute(f"insert into {table_name} values(1717122955000, ' -80.99', ' +0042')")
tdSql.execute(f"insert into {table_name} values(1717122956000, ' -0042', ' +80.99')")
tdSql.execute(f"insert into {table_name} values(1717122957000, '52.34354', '18.6')")
tdSql.execute(f"insert into {table_name} values(1717122958000, '-12.', '+5.')")
tdSql.execute(f"insert into {table_name} values(1717122959000, '-.12', '+.5')")
tdSql.execute(f"insert into {table_name} values(1717122960000, '-2.e1', '+2.e2')")
tdSql.execute(f"insert into {table_name} values(1717122961000, '-2e1', '+2e2')")
tdSql.execute(f"insert into {table_name} values(1717122962000, '-2.3e1', '+2.324e2')")
tdSql.execute(f"insert into {table_name} values(1717122963000, '-0x40', '+0b10010')")
tdSql.execute(f"insert into {table_name} values(1717122964000, '-0b10010', '+0x40')")
tdSql.query(f"select * from {table_name}")
tdSql.checkRows(22)
@ -64,22 +64,22 @@ class TDTestCase:
min_u = 0
print("val:", baseval, negval, posval, max_i)
tdSql.execute(f"insert into {table_name} values(now, {negval}, {posval})")
tdSql.execute(f"insert into {table_name} values(now, -{baseval}, {baseval})")
tdSql.execute(f"insert into {table_name} values(now, {max_i}, {max_u})")
tdSql.execute(f"insert into {table_name} values(now, {min_i}, {min_u})")
tdSql.execute(f"insert into {table_name} values(1717122965000, {negval}, {posval})")
tdSql.execute(f"insert into {table_name} values(1717122966000, -{baseval}, {baseval})")
tdSql.execute(f"insert into {table_name} values(1717122967000, {max_i}, {max_u})")
tdSql.execute(f"insert into {table_name} values(1717122968000, {min_i}, {min_u})")
tdSql.query(f"select * from {table_name}")
tdSql.checkRows(26)
# error case
tdSql.error(f"insert into {table_name} values(now, 0, {max_u+1})")
tdSql.error(f"insert into {table_name} values(now, 0, -1)")
tdSql.error(f"insert into {table_name} values(now, 0, -2.0)")
tdSql.error(f"insert into {table_name} values(now, 0, '-2.0')")
tdSql.error(f"insert into {table_name} values(now, {max_i+1}, 0)")
tdSql.error(f"insert into {table_name} values(now, {min_i-1}, 0)")
tdSql.error(f"insert into {table_name} values(now, '{min_i-1}', 0)")
tdSql.error(f"insert into {table_name} values(1717122969000, 0, {max_u+1})")
tdSql.error(f"insert into {table_name} values(1717122970000, 0, -1)")
tdSql.error(f"insert into {table_name} values(1717122971000, 0, -2.0)")
tdSql.error(f"insert into {table_name} values(1717122972000, 0, '-2.0')")
tdSql.error(f"insert into {table_name} values(1717122973000, {max_i+1}, 0)")
tdSql.error(f"insert into {table_name} values(1717122974000, {min_i-1}, 0)")
tdSql.error(f"insert into {table_name} values(1717122975000, '{min_i-1}', 0)")
def test_tags(self, stable_name, dtype, bits):
tdSql.execute(f"create stable {stable_name}(ts timestamp, i1 {dtype}, i2 {dtype} unsigned) tags(id {dtype})")
@ -93,20 +93,20 @@ class TDTestCase:
max_u = 2*bigval - 1
min_u = 0
tdSql.execute(f"insert into {stable_name}_1 using {stable_name} tags('{negval}') values(now, {negval}, {posval})")
tdSql.execute(f"insert into {stable_name}_2 using {stable_name} tags({posval}) values(now, -{baseval} , {baseval})")
tdSql.execute(f"insert into {stable_name}_3 using {stable_name} tags('0x40') values(now, {max_i}, {max_u})")
tdSql.execute(f"insert into {stable_name}_4 using {stable_name} tags(0b10000) values(now, {min_i}, {min_u})")
tdSql.execute(f"insert into {stable_name}_1 using {stable_name} tags('{negval}') values(1717122976000, {negval}, {posval})")
tdSql.execute(f"insert into {stable_name}_2 using {stable_name} tags({posval}) values(1717122977000, -{baseval} , {baseval})")
tdSql.execute(f"insert into {stable_name}_3 using {stable_name} tags('0x40') values(1717122978000, {max_i}, {max_u})")
tdSql.execute(f"insert into {stable_name}_4 using {stable_name} tags(0b10000) values(1717122979000, {min_i}, {min_u})")
tdSql.execute(f"insert into {stable_name}_5 using {stable_name} tags({max_i}) values(now, '{negval}', '{posval}')")
tdSql.execute(f"insert into {stable_name}_6 using {stable_name} tags('{min_i}') values(now, '-{baseval}' , '{baseval}')")
tdSql.execute(f"insert into {stable_name}_7 using {stable_name} tags(-0x40) values(now, '{max_i}', '{max_u}')")
tdSql.execute(f"insert into {stable_name}_8 using {stable_name} tags('-0b10000') values(now, '{min_i}', '{min_u}')")
tdSql.execute(f"insert into {stable_name}_5 using {stable_name} tags({max_i}) values(1717122980000, '{negval}', '{posval}')")
tdSql.execute(f"insert into {stable_name}_6 using {stable_name} tags('{min_i}') values(1717122981000, '-{baseval}' , '{baseval}')")
tdSql.execute(f"insert into {stable_name}_7 using {stable_name} tags(-0x40) values(1717122982000, '{max_i}', '{max_u}')")
tdSql.execute(f"insert into {stable_name}_8 using {stable_name} tags('-0b10000') values(1717122983000, '{min_i}', '{min_u}')")
tdSql.execute(f"insert into {stable_name}_9 using {stable_name} tags(12.) values(now, {negval}, {posval})")
tdSql.execute(f"insert into {stable_name}_10 using {stable_name} tags('-8.3') values(now, -{baseval} , {baseval})")
tdSql.execute(f"insert into {stable_name}_11 using {stable_name} tags(2.e1) values(now, {max_i}, {max_u})")
tdSql.execute(f"insert into {stable_name}_12 using {stable_name} tags('-2.3e1') values(now, {min_i}, {min_u})")
tdSql.execute(f"insert into {stable_name}_9 using {stable_name} tags(12.) values(1717122984000, {negval}, {posval})")
tdSql.execute(f"insert into {stable_name}_10 using {stable_name} tags('-8.3') values(1717122985000, -{baseval} , {baseval})")
tdSql.execute(f"insert into {stable_name}_11 using {stable_name} tags(2.e1) values(1717122986000, {max_i}, {max_u})")
tdSql.execute(f"insert into {stable_name}_12 using {stable_name} tags('-2.3e1') values(1717122987000, {min_i}, {min_u})")
tdSql.query(f"select * from {stable_name}")
tdSql.checkRows(12)

View File

@ -257,10 +257,10 @@ class TDTestCase:
os.system(f'taos -f {sql_file}')
tdSql.query('select count(c_1) from d2.t2 where c_1 < 10', queryTimes=1)
tdSql.checkData(0, 0, 0)
tdSql.query('select count(c_1), min(c_1),tbname from d2.can partition by tbname', queryTimes=1)
tdSql.query('select count(c_1), min(c_1),tbname from d2.can partition by tbname order by 3', queryTimes=1)
tdSql.checkData(0, 0, 0)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, 't3')
tdSql.checkData(0, 2, 't1')
tdSql.checkData(1, 0, 15)
tdSql.checkData(1, 1, 1471617148940980000)
@ -268,7 +268,7 @@ class TDTestCase:
tdSql.checkData(2, 0, 0)
tdSql.checkData(2, 1, None)
tdSql.checkData(2, 2, 't1')
tdSql.checkData(2, 2, 't3')
def run(self):
self.test_count_with_sma_data()

View File

@ -10,14 +10,14 @@ class TDTestCase:
tdSql.execute("create database td_28068;")
tdSql.execute("create database if not exists td_28068;")
tdSql.execute("create stable td_28068.st (ts timestamp, test_case nchar(10), time_cost float, num float) tags (branch nchar(10), scenario nchar(10));")
tdSql.execute("insert into td_28068.ct1 using td_28068.st (branch, scenario) tags ('3.0', 'scenario1') values (now(), 'query1', 1,2);")
tdSql.execute("insert into td_28068.ct1 using td_28068.st (branch, scenario) tags ('3.0', 'scenario1') values (now(), 'query1', 2,3);")
tdSql.execute("insert into td_28068.ct2 using td_28068.st (branch, scenario) tags ('3.0', 'scenario2') values (now(), 'query1', 10,1);")
tdSql.execute("insert into td_28068.ct2 using td_28068.st (branch, scenario) tags ('3.0', 'scenario2') values (now(), 'query1', 11,5);")
tdSql.execute("insert into td_28068.ct3 using td_28068.st (branch, scenario) tags ('3.1', 'scenario1') values (now(), 'query1', 20,4);")
tdSql.execute("insert into td_28068.ct3 using td_28068.st (branch, scenario) tags ('3.1', 'scenario1') values (now(), 'query1', 30,1);")
tdSql.execute("insert into td_28068.ct4 using td_28068.st (branch, scenario) tags ('3.1', 'scenario2') values (now(), 'query1', 8,8);")
tdSql.execute("insert into td_28068.ct4 using td_28068.st (branch, scenario) tags ('3.1', 'scenario2') values (now(), 'query1', 9,10);")
tdSql.execute("insert into td_28068.ct1 using td_28068.st (branch, scenario) tags ('3.0', 'scenario1') values (1717122943000, 'query1', 1,2);")
tdSql.execute("insert into td_28068.ct1 using td_28068.st (branch, scenario) tags ('3.0', 'scenario1') values (1717122944000, 'query1', 2,3);")
tdSql.execute("insert into td_28068.ct2 using td_28068.st (branch, scenario) tags ('3.0', 'scenario2') values (1717122945000, 'query1', 10,1);")
tdSql.execute("insert into td_28068.ct2 using td_28068.st (branch, scenario) tags ('3.0', 'scenario2') values (1717122946000, 'query1', 11,5);")
tdSql.execute("insert into td_28068.ct3 using td_28068.st (branch, scenario) tags ('3.1', 'scenario1') values (1717122947000, 'query1', 20,4);")
tdSql.execute("insert into td_28068.ct3 using td_28068.st (branch, scenario) tags ('3.1', 'scenario1') values (1717122948000, 'query1', 30,1);")
tdSql.execute("insert into td_28068.ct4 using td_28068.st (branch, scenario) tags ('3.1', 'scenario2') values (1717122949000, 'query1', 8,8);")
tdSql.execute("insert into td_28068.ct4 using td_28068.st (branch, scenario) tags ('3.1', 'scenario2') values (1717122950000, 'query1', 9,10);")
def run(self):
tdSql.query('select last(ts) as ts, last(branch) as branch, last(scenario) as scenario, last(test_case) as test_case from td_28068.st group by branch, scenario order by last(branch);')

View File

@ -45,6 +45,9 @@ class TDTestCase:
for ts_value in [self.tdCom.date_time, window_close_ts-1]:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
if i == 0 and fill_history_value is not None:
for tbname in [self.ctb_stream_des_table, self.tb_stream_des_table]:
tdSql.query(f'select count(*) from {tbname}', count_expected_res=1)
if self.tdCom.update and i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)

View File

@ -435,12 +435,12 @@ int32_t shellParseArgs(int32_t argc, char *argv[]) {
shell.info.promptSize = strlen(shell.info.promptHeader);
#ifdef TD_ENTERPRISE
snprintf(shell.info.programVersion, sizeof(shell.info.programVersion),
"version: %s compatible_version: %s\ngitinfo: %s\ngitinfoOfInternal: %s\nbuildInfo: %s", version,
"%s\ntaos version: %s compatible_version: %s\ngit: %s\ngit: %s\nbuild: %s", TD_PRODUCT_NAME, version,
compatible_version, gitinfo, gitinfoOfInternal, buildinfo);
#else
snprintf(shell.info.programVersion, sizeof(shell.info.programVersion),
"version: %s compatible_version: %s\ngitinfo: %s\nbuildInfo: %s", version, compatible_version, gitinfo,
buildinfo);
"%s\ntaos version: %s compatible_version: %s\ngit: %s\nbuild: %s", TD_PRODUCT_NAME, version,
compatible_version, gitinfo, buildinfo);
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)

View File

@ -1189,12 +1189,11 @@ bool shellGetGrantInfo(char* buf) {
fprintf(stderr, "\r\nFailed to get grant information from server. Abort.\r\n");
exit(0);
}
char serverVersion[32] = {0};
char serverVersion[64] = {0};
char expiretime[32] = {0};
char expired[32] = {0};
memcpy(serverVersion, row[0], fields[0].bytes);
tstrncpy(serverVersion, row[0], 64);
memcpy(expiretime, row[1], fields[1].bytes);
memcpy(expired, row[2], fields[2].bytes);
@ -1202,10 +1201,10 @@ bool shellGetGrantInfo(char* buf) {
community = true;
} else if (strcmp(expiretime, "unlimited") == 0) {
community = false;
sprintf(buf, "Server is Enterprise %s Edition, %s and will never expire.\r\n", serverVersion, sinfo);
sprintf(buf, "Server is %s, %s and will never expire.\r\n", serverVersion, sinfo);
} else {
community = false;
sprintf(buf, "Server is Enterprise %s Edition, %s and will expire at %s.\r\n", serverVersion, sinfo,
sprintf(buf, "Server is %s, %s and will expire at %s.\r\n", serverVersion, sinfo,
expiretime);
}

View File

@ -982,6 +982,9 @@ int smlProcess_18784_Test() {
rowIndex++;
}
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists db_18784");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -1187,6 +1190,9 @@ int sml_ts2164_Test() {
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes);
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists line_test");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -1236,6 +1242,8 @@ int sml_ts3116_Test() {
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
code = taos_errno(pRes);
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists ts3116");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -1449,6 +1457,13 @@ int sml_td24070_Test() {
code = taos_errno(pRes);
ASSERT(code == 0);
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists td24070_write");
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists td24070_read");
taos_free_result(pRes);
taos_close(taos);
// test table privilege
@ -1477,6 +1492,10 @@ int sml_td23881_Test() {
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes);
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists line_23881");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -1691,6 +1710,8 @@ int sml_ts2385_Test() {
code = taos_errno(pRes);
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists ts2385");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -1784,6 +1805,9 @@ int sml_td24559_Test() {
}
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists td24559");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -1889,6 +1913,9 @@ int sml_td29691_Test() {
ASSERT(code == TSDB_CODE_PAR_DUPLICATED_COLUMN);
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists td29691");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -1942,6 +1969,9 @@ int sml_td18789_Test() {
}
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists td18789");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -2047,6 +2077,9 @@ int sml_td29373_Test() {
ASSERT(code == TSDB_CODE_SML_NOT_SUPPORT_PK);
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists td29373");
taos_free_result(pRes);
taos_close(taos);
return code;