Merge branch '3.0' into enh/TS-3737-3.0

This commit is contained in:
kailixu 2024-06-06 16:42:04 +08:00
commit 2f5179cf44
50 changed files with 846 additions and 142 deletions

View File

@ -451,6 +451,101 @@ In addition to writing data using the SQL method or the parameter binding API, w
- Within _reqid interfaces can track the entire call chain by passing the reqid parameter.
### Subscription API
- `const char *tmq_err2str(int32_t code)`
**Description**
- This interface is used to convert error codes for data subscriptions into error messages
**Parameter description**
- code: error code
**Return value**
- non NULL, return error message, error message may be empty
- `tmq_conf_t *tmq_conf_new()`
- `tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value)`
- `void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param)`
- `void tmq_conf_destroy(tmq_conf_t *conf)`
tmq_conf_res_t defined as follows:
```
typedef enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
} tmq_conf_res_t;
```
commit callback function defined as follows:
```
typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param))
```
**Description**
- tmq_conf_new : create a tmq_conf_t structure to configure consumption parameters
- tmq_conf_set : set configuration, configuration is key-value pair
- tmq_conf_set_auto_commit_cb : set auto commit callback function
- tmq_conf_destroy : destroy tmq_conf_t structure
**Parameter description**
- tmq_conf_set : key is parameter namevalue is parameter value
- tmq_conf_set_auto_commit_cb : cb is callback function, param is callback function parameter
**Return value**
- tmq_conf_new: structure of tmq_conf_t, NULL failed
- tmq_conf_set: tmq_conf_res_t, TMQ_CONF_OK means success, others means failure
- `tmq_list_t *tmq_list_new()`
- `int32_t tmq_list_append(tmq_list_t *, const char *)`
- `void tmq_list_destroy(tmq_list_t *)`
- `int32_t tmq_list_get_size(const tmq_list_t *)`
- `char **tmq_list_to_c_array(const tmq_list_t *)`
**Description**
- tmq_list_new : build a tmq_list_t constructure, used to save topic
- tmq_list_append : add topic to tmq_list_t
- tmq_list_destroy : destroy tmq_list_t
- tmq_list_get_size : get size of tmq_list_t
- tmq_list_to_c_array : convert tmq_list_t to c array, element is string pointer
**Return value**
- tmq_list_new : structure of tmq_list_t, tmq_list_t is a list of strings, NULL failed
- tmq_list_append : zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_list_get_size : size of tmq_list_t, -1 failed
- tmq_list_to_c_array : c array, element is pointer of string, NULL failed
- `tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen)`
- `int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list)`
- `int32_t tmq_unsubscribe(tmq_t *tmq)`
- `int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topic_list)`
- `TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout)`
- `int32_t tmq_consumer_close(tmq_t *tmq)`
**Description**
- tmq_consumer_new : build a tmq_t constructure, need to be used with tmq_consumer_close
- tmq_subscribe : subscribe topic, need to be used with tmq_unsubscribe
- tmq_unsubscribe : unsubscribe topic, need to be used with tmq_subscribe
- tmq_subscription : obtain a list of topics subscribed by consumer
- tmq_consumer_poll : used to consume data
- tmq_consumer_close : clost tmq_t, need to be used with tmq_consumer_new
**Parameter description**
- conf: sed to configure consume parameters
- errstr: The error information is stored in this string. Allocation and release of memory are the responsibility of the caller
- errstenLen: the length of errstr
- tmq: structure of tmq_t returned by tmq_consumer_new
- topic_list: a list of topics subscribed by consumersneed to be freed by tmq_list_destroy
- timeout: the timeout time, measured in milliseconds, indicates how long it takes for data to expire. If it is negative, it will default to 1 second
**Return value**
- tmq_consumer_new: structure of tmq_t, NULL failed
- tmq_subscribe: zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_unsubscribe: zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_subscription: zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_consumer_poll: structure of TAOS_RES(same like taos_query), NULL if there is no data
- tmq_consumer_close: zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment)`
- `void tmq_free_assignment(tmq_topic_assignment* pAssignment)`
@ -474,6 +569,7 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value**
- zero success,none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**Function description**
@ -482,6 +578,7 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value**
- the value of committed offset, -2147467247 means no committed value, Other values less than 0 indicate failure
- `int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg)`
- `void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param)`
- `int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
@ -499,6 +596,7 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value**
- zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**Function description**
@ -507,6 +605,7 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value**
- the current consumption location, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
**Function description**
@ -514,25 +613,52 @@ In addition to writing data using the SQL method or the parameter binding API, w
**Return value**
- zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
- `int32_t tmq_get_vgroup_id(TAOS_RES *res)`
**Description**
- tmq_get_vgroup_offset : Obtain the starting offset of the consumed data
- tmq_get_vgroup_id : Obtain the vgroup id of the consumed data
- `int32_t int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
**Function description**
- Obtain the starting offset of the consumed data
**Parameter description**
- msgMessage consumed
**Return value**
- the starting offset of the consumed data, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- msg : Message consumed
- `int32_t int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics)`
**Function description**
- Obtain a list of topics subscribed by consumers
**Parameter description**
- topics: a list of topics subscribed by consumersneed to be freed by tmq_list_destroy
**Return value**
- zero successnone zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_get_vgroup_offset : the starting offset of the consumed data, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- tmq_get_vgroup_id : vgroup id of result, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `TAOS *tmq_get_connect(tmq_t *tmq)`
- `const char *tmq_get_table_name(TAOS_RES *res)`
- `tmq_res_t tmq_get_res_type(TAOS_RES *res)`
- `const char *tmq_get_topic_name(TAOS_RES *res)`
- `const char *tmq_get_db_name(TAOS_RES *res)`
tmq_res_t the type of consumed result, defined as follows:
```
typedef enum tmq_res_t {
TMQ_RES_INVALID = -1, // invalid
TMQ_RES_DATA = 1, // data
TMQ_RES_TABLE_META = 2, // meta
TMQ_RES_METADATA = 3 // data & meta
} tmq_res_t;
```
**Description**
- tmq_get_connect : when creating a consumer, a link will be automatically established and saved in the tmq_t structure. This interface allows users to obtain link information(same like taos_connect) from the tmq_t structure
- tmq_get_table_name : get the table name of result
- tmq_get_res_type : get the type of result
- tmq_get_topic_name : get the topic name of result
- tmq_get_db_name : get the db name of result
**Parameter description**
- tmq : tmq_t structure created by tmq_consumer_new
- res : TAOS_RES structure returned by tmq_consumer_poll
**Return value**
- tmq_get_connect : connection info in tmq, NULL if failed
- tmq_get_table_name : table name of result, NULL if failed
- tmq_get_res_type : result type tmq_res_t
- tmq_get_topic_name : topic name of result, NULL if failed
- tmq_get_db_name : db name of result, NULL if failed

View File

@ -1,5 +1,5 @@
if (! "RJDBC" %in% installed.packages()[, "Package"]) {
install.packages('RJDBC', repos='http://cran.us.r-project.org')
install.packages('RJDBC', repos='http://mirrors.tuna.tsinghua.edu.cn/CRAN')
}
# ANCHOR: demo

View File

@ -528,10 +528,103 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- 带_reqid的接口可以通过传递reqid参数来追踪整个的调用链。
### 数据订阅 API
- `const char *tmq_err2str(int32_t code)`
**功能说明**
- 该接口用于将数据订阅的错误码转换为错误信息
**参数说明**
- code: 数据订阅的错误码
**返回值**
- 非NULL返回错误信息错误信息可能为空字符串
- `tmq_conf_t *tmq_conf_new()`
- `tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value)`
- `void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param)`
- `void tmq_conf_destroy(tmq_conf_t *conf)`
tmq_conf_res_t 错误码定义如下:
```
typedef enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
} tmq_conf_res_t;
```
设置自动提交回调函数的定义如下:
```
typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param))
```
**功能说明**
- tmq_conf_new 接口用于创建一个 tmq_conf_t 结构体,用于配置消费参数。
- tmq_conf_set 接口用于设置消费参数。
- tmq_conf_set_auto_commit_cb 接口用于设置自动提交回调函数。
- tmq_conf_destroy 接口用于销毁 tmq_conf_t 结构体。
**参数说明**
- tmq_conf_set : key 为参数名value 为参数值
- tmq_conf_set_auto_commit_cb : cb 回调函数, param 回调函数参数
**返回值**
- tmq_conf_new: 配置 tmq_conf_t 类型指针, NULL 失败
- tmq_conf_set: 结果 tmq_conf_res_t 类型, TMQ_CONF_OK 成功, 其他失败
- `tmq_list_t *tmq_list_new()`
- `int32_t tmq_list_append(tmq_list_t *, const char *)`
- `void tmq_list_destroy(tmq_list_t *)`
- `int32_t tmq_list_get_size(const tmq_list_t *)`
- `char **tmq_list_to_c_array(const tmq_list_t *)`
**功能说明**
- tmq_list_new 接口用于创建一个 tmq_list_t 结构体,用于存储订阅的 topic。
- tmq_list_append 接口用于向 tmq_list_t 结构体中添加一个 topic。
- tmq_list_destroy 接口用于销毁 tmq_list_t 结构体tmq_list_new 的结果需要通过该接口销毁。
- tmq_list_get_size 接口用于获取 tmq_list_t 结构体中 topic 的个数。
- tmq_list_to_c_array 接口用于将 tmq_list_t 结构体转换为 C 数组,数组每个元素为字符串指针。
**返回值**
- tmq_list_new : 返回 tmq_list_t 结果指针, tmq_list_t 是一个数组,每个元素是一个字符串, NULL 失败
- tmq_list_append : 返回 0 表示成功非0表示失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- tmq_list_get_size : 返回 tmq_list_t 结构体中 topic 的个数, -1 失败
- tmq_list_to_c_array : 返回 c 数组, 每个元素是字符串指针, NULL 失败
- `tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen)`
- `int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list)`
- `int32_t tmq_unsubscribe(tmq_t *tmq)`
- `int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topic_list)`
- `TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout)`
- `int32_t tmq_consumer_close(tmq_t *tmq)`
**功能说明**
- tmq_consumer_new 接口用于创建一个 tmq_t 结构体,用于消费数据,消费完数据后需调用 tmq_consumer_close 关闭消费者。
- tmq_subscribe 接口用于订阅 topic 列表,消费完数据后,需调用 tmq_subscribe 取消订阅。
- tmq_unsubscribe 接口用于取消订阅的 topic 列表。需与 tmq_subscribe 配合使用。
- tmq_subscription 接口用于获取订阅的 topic 列表。
- tmq_consumer_poll 接口用于轮询消费数据,每一个消费者,只能单线程调用该接口。
- tmq_consumer_close 接口用于关闭 tmq_t 结构体。需与 tmq_consumer_new 配合使用。
**参数说明**
- conf: 参数用于配置消费参数
- errstr: 错误信息存储在这个字符串中,需自定分配内存,释放内存由调用者负责
- errstenLen: errstr 字符串的长度
- tmq: tmq_consumer_new 函数返回的 tmq_t 结构体
- topic_list: topic 列表
- timeout: 超时时间,单位为毫秒,表示多久没数据的话自动返回 NULL负数的话默认超时1秒
**返回值**
- tmq_consumer_new 返回 tmq_t 结构体,失败返回 NULL
- tmq_subscribe 返回错误码0 表示成功非0表示失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- tmq_unsubscribe 返回错误码0 表示成功非0表示失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- tmq_subscription 返回错误码0 表示成功非0表示失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- tmq_consumer_poll 返回 TAOS_RES 结构体NULL 表示没有数据,非 NULL 表示有数据TAOS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 TAOS_RES 里的信息,比如 schema 等。
- tmq_consumer_close 返回错误码0 表示成功非0表示失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- `int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment)`
- `void tmq_free_assignment(tmq_topic_assignment* pAssignment)`
tmq_topic_assignment结构体定义如下
```c
typedef struct tmq_topic_assignment {
@ -541,6 +634,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
int64_t end;
} tmq_topic_assignment;
```
**功能说明**
- tmq_get_topic_assignment 接口返回当前consumer分配的vgroup的信息每个vgroup的信息包括vgIdwal的最大最小offset以及当前消费到的offset。
@ -551,65 +645,97 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
**返回值**
- 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**功能说明**
- 获取当前 consumer 在某个 topic 和 vgroup上的 commit 位置。
**返回值**
- 当前commit的位置-2147467247表示没有消费进度其他小于0的值表示失败错误码就是返回值
- `int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg)`
- `void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param)`
- `int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
- `void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param)`
**功能说明**
- commit接口分为两种类型每种类型有同步和异步接口
- 第一种类型根据消息提交提交消息里的进度如果消息传NULL提交当前consumer所有消费的vgroup的当前进度 : tmq_commit_sync/tmq_commit_async
- 第二种类型根据某个topic的某个vgroup的offset提交 : tmq_commit_offset_sync/tmq_commit_offset_async
- 第一种类型根据消息提交提交消息里的进度如果消息传NULL提交当前consumer所有消费的vgroup的当前进度 : tmq_commit_sync/tmq_commit_async
- 第二种类型根据某个topic的某个vgroup的offset提交 : tmq_commit_offset_sync/tmq_commit_offset_async
**参数说明**
- msg消费到的消息结构如果msg传NULL提交当前consumer所有消费的vgroup的当前进度
**返回值**
- 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**功能说明**
- 获取当前消费位置,为消费到的数据位置的下一个位置
**返回值**
- 消费位置非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
- `int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
**功能说明**
- 设置 consumer 在某个topic的某个vgroup的 offset位置开始消费
**返回值**
- 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int32_t int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
- `int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
- `int32_t tmq_get_vgroup_id(TAOS_RES *res)`
**功能说明**
- 获取 poll 消费到的数据的起始offset
- tmq_get_vgroup_offset 获取 poll 消费到的数据的起始offset
- tmq_get_vgroup_id 获取 poll 消费到的数据的所属的vgrou id
**参数说明**
- msg消费到的消息结构
**返回值**
- 消费到的offset非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int32_t int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics)`
- tmq_get_vgroup_offset 返回值为消费到的offset非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- tmq_get_vgroup_id 返回值为消费到的数据所属的vgrou id非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `TAOS *tmq_get_connect(tmq_t *tmq)`
- `const char *tmq_get_table_name(TAOS_RES *res)`
- `tmq_res_t tmq_get_res_type(TAOS_RES *res)`
- `const char *tmq_get_topic_name(TAOS_RES *res)`
- `const char *tmq_get_db_name(TAOS_RES *res)`
tmq_res_t 表示消费到的数据类型,定义如下:
```
typedef enum tmq_res_t {
TMQ_RES_INVALID = -1, // invalid
TMQ_RES_DATA = 1, // 数据
TMQ_RES_TABLE_META = 2, // 元数据
TMQ_RES_METADATA = 3 // 既有元数据又有数据,即自动建表
} tmq_res_t;
```
**功能说明**
- 获取消费者订阅的 topic 列表
- tmq_get_connect 创建consumer时会自动建立链接保存在 tmq_t 结构体中,该接口用户获取 tmq_t 结构体中的链接信息类似taos_connect
- tmq_get_table_name 获取返回结果所属的的表名
- tmq_get_res_type 获取返回结果的类型
- tmq_get_topic_name 获取返回结果所属的topic名
- tmq_get_db_name 获取返回结果所属的数据库名
**参数说明**
- topics: 获取的 topic 列表存储在这个结构中接口内分配内存需调用tmq_list_destroy释放
- tmqtmq_consumer_new 返回的消费者handle
- restmq_consumer_poll 返回的消费到的消息
**返回值**
- 错误码0成功非0失败可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- tmq_get_connect 返回值为tmq_t结构体中的链接连接非 NULL 正常NULL 失败
- tmq_get_table_name 返回值为消费到的数据所属的表名,非 NULL 正常NULL 失败
- tmq_get_res_type 返回值为消费到的数据所属的类型,具体见上面 tmq_res_t 的注释说明
- tmq_get_topic_name 返回值为消费到的数据所属的 topic 名,非 NULL 正常NULL 失败
- tmq_get_db_name 返回值为消费到的数据所属的数据库名,非 NULL 正常NULL 失败

View File

@ -91,7 +91,7 @@ SELECT _wstart, count(*), avg(voltage) from meters PARTITION BY tbname COUNT_WIN
CREATE STREAM avg_vol_s INTO avg_vol SUBTABLE(CONCAT('new-', tname)) AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
```
PARTITION 子句中,为 tbname 定义了一个别名 tname, 在PARTITION 子句中的别名可以用于 SUBTABLE 子句中的表达式计算,在上述示例中,流新创建的子表将以前缀 'new-' 连接原表名作为表名(从3.2.3.0开始,为了避免 sutable 中的表达式无法区分各个子表,即误将多个相同时间线写入一个子表,在指定的子表名后面加上 __stableName_groupId)。
PARTITION 子句中,为 tbname 定义了一个别名 tname, 在PARTITION 子句中的别名可以用于 SUBTABLE 子句中的表达式计算,在上述示例中,流新创建的子表将以前缀 'new-' 连接原表名作为表名(从3.2.3.0开始,为了避免 SUBTABLE 中的表达式无法区分各个子表,即误将多个相同时间线写入一个子表,在指定的子表名后面加上 __stableName_groupId)。
注意,子表名的长度若超过 TDengine 的限制,将被截断。若要生成的子表名已经存在于另一超级表,由于 TDengine 的子表名是唯一的,因此对应新子表的创建以及数据的写入将会失败。

View File

@ -45,6 +45,7 @@ extern "C" {
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
typedef enum {
TAOS_WAL_SKIP = 0,
TAOS_WAL_WRITE = 1,
TAOS_WAL_FSYNC = 2,
} EWalType;
@ -145,17 +146,18 @@ typedef struct SWalReader SWalReader;
// todo hide this struct
struct SWalReader {
SWal *pWal;
int64_t readerId;
TdFilePtr pLogFile;
TdFilePtr pIdxFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t skipToVersion; // skip data and jump to destination version, usually used by stream resume ignoring untreated data
SWal *pWal;
int64_t readerId;
TdFilePtr pLogFile;
TdFilePtr pIdxFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t skipToVersion; // skip data and jump to destination version, usually used by stream resume ignoring untreated
// data
int64_t capacity;
TdThreadMutex mutex;
SWalFilterCond cond;
SWalCkHead *pHead;
SWalCkHead *pHead;
};
// module initialization
@ -198,7 +200,7 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond, int64_t id);
void walCloseReader(SWalReader *pRead);
void walReadReset(SWalReader *pReader);
int32_t walReadVer(SWalReader *pRead, int64_t ver);
void decryptBody(SWalCfg* cfg, SWalCkHead* pHead, int32_t plainBodyLen, const char* func);
void decryptBody(SWalCfg *cfg, SWalCkHead *pHead, int32_t plainBodyLen, const char *func);
int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead);
int64_t walReaderGetCurrentVer(const SWalReader *pReader);
@ -206,7 +208,7 @@ int64_t walReaderGetValidFirstVer(const SWalReader *pReader);
int64_t walReaderGetSkipToVersion(SWalReader *pReader);
void walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver);
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever);
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset);
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal *pOffset);
// only for tq usage
int32_t walFetchHead(SWalReader *pRead, int64_t ver);

View File

@ -365,7 +365,7 @@ typedef enum ELogicConditionType {
#define TSDB_MIN_FSYNC_PERIOD 0
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
#define TSDB_MIN_WAL_LEVEL 1
#define TSDB_MIN_WAL_LEVEL 0
#define TSDB_MAX_WAL_LEVEL 2
#define TSDB_DEFAULT_WAL_LEVEL 1
#define TSDB_MIN_PRECISION TSDB_TIME_PRECISION_MILLI

View File

@ -412,11 +412,11 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
tmq_list_t* tmq_list_new() { return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); }
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
if (list == NULL) return -1;
if (list == NULL) return TSDB_CODE_INVALID_PARA;
SArray* container = &list->container;
if (src == NULL || src[0] == 0) return -1;
if (src == NULL || src[0] == 0) return TSDB_CODE_INVALID_PARA;
char* topic = taosStrdup(src);
if (taosArrayPush(container, &topic) == NULL) return -1;
if (taosArrayPush(container, &topic) == NULL) return TSDB_CODE_INVALID_PARA;
return 0;
}
@ -2342,7 +2342,7 @@ const char* tmq_get_db_name(TAOS_RES* res) {
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (res == NULL) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
return ((SMqRspObjCommon*)res)->vgId;
@ -2350,7 +2350,7 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return pMetaRspObj->vgId;
} else {
return -1;
return TSDB_CODE_INVALID_PARA;
}
}
@ -2708,7 +2708,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
if (tmq == NULL) {
taosMemoryFree(pParamSet);
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return -1;
return terrno;
}
// if no more waiting rsp

View File

@ -796,7 +796,13 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
static bool mndCheckDbConflict(const char *conflict, STrans *pTrans) {
if (conflict[0] == 0) return false;
if (strcasecmp(conflict, pTrans->dbname) == 0 || strcasecmp(conflict, pTrans->stbname) == 0) return true;
if (strcasecmp(conflict, pTrans->dbname) == 0) return true;
return false;
}
static bool mndCheckStbConflict(const char *conflict, STrans *pTrans) {
if (conflict[0] == 0) return false;
if (strcasecmp(conflict, pTrans->stbname) == 0) return true;
return false;
}
@ -816,17 +822,17 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (mndCheckDbConflict(pNew->dbname, pTrans)) conflict = true;
if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true;
if (mndCheckStbConflict(pNew->stbname, pTrans)) conflict = true;
}
}
if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB) {
if (mndCheckDbConflict(pNew->dbname, pTrans)) conflict = true;
if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true;
if (mndCheckStbConflict(pNew->stbname, pTrans)) conflict = true;
}
if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true; // for stb
if (mndCheckStbConflict(pNew->stbname, pTrans)) conflict = true; // for stb
}
}

View File

@ -6771,6 +6771,15 @@ static int32_t checkDbEnumOption(STranslateContext* pCxt, const char* pName, int
return TSDB_CODE_SUCCESS;
}
static int32_t checkDbEnumOption3(STranslateContext* pCxt, const char* pName, int32_t val, int32_t v1, int32_t v2,
int32_t v3) {
if (val >= 0 && val != v1 && val != v2 && val != v3) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option %s: %d, only %d, %d, %d allowed", pName, val, v1, v2, v3);
}
return TSDB_CODE_SUCCESS;
}
static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRetentions, int8_t precision) {
if (NULL == pRetentions) {
return TSDB_CODE_SUCCESS;
@ -6914,7 +6923,12 @@ static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbNa
if ((pOptions->replica == 2) ^ (pOptions->withArbitrator == TSDB_MAX_DB_WITH_ARBITRATOR)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid database option, with_arbitrator should be used with replica 2");
"Invalid option, with_arbitrator should be used with replica 2");
}
if (pOptions->replica > 1 && pOptions->walLevel == 0) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option, wal_level 0 should be used with replica 1");
}
return TSDB_CODE_SUCCESS;
@ -6978,7 +6992,8 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
code = checkDbStrictOption(pCxt, pOptions);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkDbEnumOption(pCxt, "walLevel", pOptions->walLevel, TSDB_MIN_WAL_LEVEL, TSDB_MAX_WAL_LEVEL);
code = checkDbEnumOption3(pCxt, "walLevel", pOptions->walLevel, TSDB_MIN_WAL_LEVEL, TSDB_DEFAULT_WAL_LEVEL,
TSDB_MAX_WAL_LEVEL);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkDbRangeOption(pCxt, "vgroups", pOptions->numOfVgroups, TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
@ -7252,6 +7267,15 @@ static void buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt,
}
static int32_t translateAlterDatabase(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt) {
if (pStmt->pOptions->walLevel == 0) {
SDbCfgInfo dbCfg = {0};
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
if (TSDB_CODE_SUCCESS == code && dbCfg.replications > 1) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
"Invalid option, wal_level 0 should be used with replica 1");
}
}
int32_t code = checkDatabaseOptions(pCxt, pStmt->dbName, pStmt->pOptions);
if (TSDB_CODE_SUCCESS != code) {
return code;

View File

@ -335,7 +335,7 @@ TEST_F(ParserInitialATest, alterDatabaseSemanticCheck) {
run("ALTER DATABASE test KEEP 1000000000s", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test KEEP 1w", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test PAGES 63", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test WAL_LEVEL 0", TSDB_CODE_PAR_INVALID_DB_OPTION);
//run("ALTER DATABASE test WAL_LEVEL 0", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test WAL_LEVEL 3", TSDB_CODE_PAR_INVALID_DB_OPTION);
//run("ALTER DATABASE test REPLICA 2", TSDB_CODE_PAR_INVALID_DB_OPTION);
run("ALTER DATABASE test STT_TRIGGER 0", TSDB_CODE_PAR_INVALID_DB_OPTION);

View File

@ -4159,8 +4159,10 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, pLastRowCols, false, cxt.pkBytes);
lastRowScanOptRemoveUslessTargets(pScan->node.pTargets, cxt.pLastCols, cxt.pOtherCols, pLastRowCols);
if (pPKTsCol && ((pScan->node.pTargets->length == 1) || (pScan->node.pTargets->length == 2 && cxt.pkBytes > 0))) {
// when select last(ts),ts from ..., we add another ts to targets
if (pPKTsCol &&
((cxt.pLastCols->length == 1 && nodesEqualNode((SNode*)pPKTsCol, nodesListGetNode(cxt.pLastCols, 0))) ||
(pScan->node.pTargets->length == 2 && cxt.pkBytes > 0))) {
// when select last(ts),tbname,ts from ..., we add another ts to targets
sprintf(pPKTsCol->colName, "#sel_val.%p", pPKTsCol);
nodesListAppend(pScan->node.pTargets, nodesCloneNode((SNode*)pPKTsCol));
}

View File

@ -20,7 +20,6 @@
#include "tutil.h"
#include "walInt.h"
bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver;
}
@ -154,7 +153,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
// validate body
int32_t cryptedBodyLen = logContent->head.bodyLen;
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
}
recordLen = walCkHeadSz + cryptedBodyLen;
@ -226,7 +225,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
goto _err;
}
if (taosFsyncFile(pFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pFile) < 0) {
wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
@ -626,7 +625,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
int32_t plainBodyLen = ckHead.head.bodyLen;
int32_t cryptedBodyLen = plainBodyLen;
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
cryptedBodyLen = ENCRYPTED_LEN(cryptedBodyLen);
}
idxEntry.offset += sizeof(SWalCkHead) + cryptedBodyLen;
@ -636,7 +635,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
pWal->cfg.vgId, terrstr(), idxEntry.ver, idxEntry.offset, fLogNameStr);
goto _err;
}
if (taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, failed to append file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
goto _err;
@ -644,7 +643,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
count++;
}
if (taosFsyncFile(pIdxFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pIdxFile) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, faild to fsync file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
goto _err;
@ -880,13 +879,13 @@ int walSaveMeta(SWal* pWal) {
int n;
// fsync the idx and log file at first to ensure validity of meta
if (taosFsyncFile(pWal->pIdxFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pIdxFile) < 0) {
wError("vgId:%d, failed to sync idx file due to %s", pWal->cfg.vgId, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosFsyncFile(pWal->pLogFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pWal->pLogFile) < 0) {
wError("vgId:%d, failed to sync log file due to %s", pWal->cfg.vgId, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@ -901,7 +900,8 @@ int walSaveMeta(SWal* pWal) {
return -1;
}
TdFilePtr pMetaFile = taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
TdFilePtr pMetaFile =
taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
if (pMetaFile == NULL) {
wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
@ -910,13 +910,13 @@ int walSaveMeta(SWal* pWal) {
char* serialized = walMetaSerialize(pWal);
int len = strlen(serialized);
if (len != taosWriteFile(pMetaFile, serialized, len)) {
if (pWal->cfg.level != TAOS_WAL_SKIP && len != taosWriteFile(pMetaFile, serialized, len)) {
wError("vgId:%d, failed to write file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile(pMetaFile) < 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosFsyncFile(pMetaFile) < 0) {
wError("vgId:%d, failed to sync file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;

View File

@ -79,8 +79,7 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
TdFilePtr pIdxTFile, pLogTFile;
char fnameStr[WAL_FILE_LEN];
if (pWal->pLogFile != NULL) {
code = taosFsyncFile(pWal->pLogFile);
if (code != 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pLogFile)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
@ -91,8 +90,7 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
}
}
if (pWal->pIdxFile != NULL) {
code = taosFsyncFile(pWal->pIdxFile);
if (code != 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pIdxFile)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}

View File

@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "crypt.h"
#include "os.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tglobal.h"
#include "walInt.h"
#include "crypt.h"
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
taosThreadMutexLock(&pWal->mutex);
@ -295,9 +295,9 @@ int32_t walEndSnapshot(SWal *pWal) {
ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1);
// compatible mode for refVer
bool hasTopic = false;
bool hasTopic = false;
int64_t refVer = INT64_MAX;
void *pIter = NULL;
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pWal->pRefHash, pIter);
if (pIter == NULL) break;
@ -396,8 +396,7 @@ int32_t walRollImpl(SWal *pWal) {
int32_t code = 0;
if (pWal->pIdxFile != NULL) {
code = taosFsyncFile(pWal->pIdxFile);
if (code != 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pIdxFile)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto END;
}
@ -409,8 +408,7 @@ int32_t walRollImpl(SWal *pWal) {
}
if (pWal->pLogFile != NULL) {
code = taosFsyncFile(pWal->pLogFile);
if (code != 0) {
if (pWal->cfg.level != TAOS_WAL_SKIP && (code = taosFsyncFile(pWal->pLogFile)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto END;
}
@ -510,12 +508,15 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
wDebug("vgId:%d, wal write log %" PRId64 ", msgType: %s, cksum head %u cksum body %u", pWal->cfg.vgId, index,
TMSG_INFO(msgType), pWal->writeHead.cksumHead, pWal->writeHead.cksumBody);
code = walWriteIndex(pWal, index, offset);
if (code < 0) {
goto END;
if (pWal->cfg.level != TAOS_WAL_SKIP) {
code = walWriteIndex(pWal, index, offset);
if (code < 0) {
goto END;
}
}
if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
if (pWal->cfg.level != TAOS_WAL_SKIP &&
taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
@ -524,17 +525,17 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
}
int32_t cyptedBodyLen = plainBodyLen;
char* buf = (char*)body;
char* newBody = NULL;
char* newBodyEncrypted = NULL;
char *buf = (char *)body;
char *newBody = NULL;
char *newBodyEncrypted = NULL;
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
cyptedBodyLen = ENCRYPTED_LEN(cyptedBodyLen);
newBody = taosMemoryMalloc(cyptedBodyLen);
if(newBody == NULL){
if (newBody == NULL) {
wError("vgId:%d, file:%" PRId64 ".log, failed to malloc since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
strerror(errno));
code = -1;
goto END;
}
@ -542,11 +543,11 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
memcpy(newBody, body, plainBodyLen);
newBodyEncrypted = taosMemoryMalloc(cyptedBodyLen);
if(newBodyEncrypted == NULL){
if (newBodyEncrypted == NULL) {
wError("vgId:%d, file:%" PRId64 ".log, failed to malloc since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
strerror(errno));
code = -1;
if(newBody != NULL) taosMemoryFreeClear(newBody);
if (newBody != NULL) taosMemoryFreeClear(newBody);
goto END;
}
@ -559,29 +560,29 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
int32_t count = CBC_Encrypt(&opts);
//wDebug("vgId:%d, file:%" PRId64 ".log, index:%" PRId64 ", CBC_Encrypt cryptedBodyLen:%d, plainBodyLen:%d, %s",
// pWal->cfg.vgId, walGetLastFileFirstVer(pWal), index, count, plainBodyLen, __FUNCTION__);
// wDebug("vgId:%d, file:%" PRId64 ".log, index:%" PRId64 ", CBC_Encrypt cryptedBodyLen:%d, plainBodyLen:%d, %s",
// pWal->cfg.vgId, walGetLastFileFirstVer(pWal), index, count, plainBodyLen, __FUNCTION__);
buf = newBodyEncrypted;
}
if (taosWriteFile(pWal->pLogFile, (char *)buf, cyptedBodyLen) != cyptedBodyLen) {
if (pWal->cfg.level != TAOS_WAL_SKIP && taosWriteFile(pWal->pLogFile, (char *)buf, cyptedBodyLen) != cyptedBodyLen) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
code = -1;
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
taosMemoryFreeClear(newBody);
taosMemoryFreeClear(newBodyEncrypted);
}
goto END;
}
if(pWal->cfg.encryptAlgorithm == DND_CA_SM4){
if (pWal->cfg.encryptAlgorithm == DND_CA_SM4) {
taosMemoryFreeClear(newBody);
taosMemoryFreeClear(newBodyEncrypted);
//wInfo("vgId:%d, free newBody newBodyEncrypted %s",
// pWal->cfg.vgId, __FUNCTION__);
taosMemoryFreeClear(newBodyEncrypted);
// wInfo("vgId:%d, free newBody newBodyEncrypted %s",
// pWal->cfg.vgId, __FUNCTION__);
}
// set status
@ -693,6 +694,10 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
}
void walFsync(SWal *pWal, bool forceFsync) {
if (pWal->cfg.level == TAOS_WAL_SKIP) {
return;
}
taosThreadMutexLock(&pWal->mutex);
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));

View File

@ -0,0 +1,110 @@
import taos
import sys
import os
import subprocess
import glob
import shutil
import time
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.srvCtl import *
from frame.caseBase import *
from frame import *
from frame.autogen import *
# from frame.server.dnodes import *
# from frame.server.cluster import *
class TDTestCase(TBase):
updatecfgDict = {
'slowLogScope':"query"
}
def init(self, conn, logSql, replicaVar=3):
super(TDTestCase, self).init(conn, logSql, replicaVar=3, db="snapshot", checkColName="c1")
self.valgrind = 0
self.childtable_count = 10
# tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), logSql) # output sql.txt file
def run(self):
tdSql.prepare()
autoGen = AutoGen()
autoGen.create_db(self.db, 2, 3)
tdSql.execute(f"use {self.db}")
autoGen.create_stable(self.stb, 5, 10, 8, 8)
autoGen.create_child(self.stb, "d", self.childtable_count)
autoGen.insert_data(1000)
tdSql.execute(f"flush database {self.db}")
sc.dnodeStop(3)
# clusterDnodes.stoptaosd(1)
# clusterDnodes.starttaosd(3)
# time.sleep(5)
# clusterDnodes.stoptaosd(2)
# clusterDnodes.starttaosd(1)
# time.sleep(5)
autoGen.insert_data(5000, True)
self.flushDb(True)
# wait flush operation over
time.sleep(5)
# sql = 'show vnodes;'
# while True:
# bFinish = True
# param_list = tdSql.query(sql, row_tag=True)
# for param in param_list:
# if param[3] == 'leading' or param[3] == 'following':
# bFinish = False
# break
# if bFinish:
# break
self.snapshotAgg()
time.sleep(10)
sc.dnodeStopAll()
for i in range(1, 4):
path = clusterDnodes.getDnodeDir(i)
dnodesRootDir = os.path.join(path,"data","vnode", "vnode*")
dirs = glob.glob(dnodesRootDir)
for dir in dirs:
if os.path.isdir(dir):
self.remove_directory(os.path.join(dir, "wal"))
sc.dnodeStart(1)
sc.dnodeStart(2)
sc.dnodeStart(3)
sql = "show vnodes;"
time.sleep(10)
while True:
bFinish = True
param_list = tdSql.query(sql, row_tag=True)
for param in param_list:
if param[3] == 'offline':
tdLog.exit(
"dnode synchronous fail dnode id: %d, vgroup id:%d status offline" % (param[0], param[1]))
if param[3] == 'leading' or param[3] == 'following':
bFinish = False
break
if bFinish:
break
self.timestamp_step = 1000
self.insert_rows = 6000
self.checkInsertCorrect()
self.checkAggCorrect()
def remove_directory(self, directory):
try:
shutil.rmtree(directory)
tdLog.debug("delete dir: %s " % (directory))
except OSError as e:
tdLog.exit("delete fail dir: %s " % (directory))
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -19,5 +19,3 @@ import sys
import os
import time
import datetime

28
tests/army/frame/eutil.py Normal file
View File

@ -0,0 +1,28 @@
###################################################################
# Copyright (c) 2023 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
#
# about tools funciton extension
#
import sys
import os
import time
import datetime
import psutil
# cpu frequent as random
def cpuRand(max):
decimal = int(str(psutil.cpu_freq().current).split(".")[1])
return decimal % max

View File

@ -19,6 +19,7 @@ import taos
import frame
import frame.etool
import frame.eos
import frame.eutil
from frame.log import *
from frame.cases import *
@ -46,17 +47,20 @@ for test:
class TDTestCase(TBase):
index = eutil.cpuRand(20) + 1
bucketName = f"ci-bucket{index}"
updatecfgDict = {
"supportVnodes":"1000",
's3EndPoint': 'http://192.168.1.52:9000',
's3AccessKey': 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX',
's3BucketName': 'ci-bucket',
's3BucketName': f'{bucketName}',
's3PageCacheSize': '10240',
"s3UploadDelaySec": "10",
's3MigrateIntervalSec': '600',
's3MigrateEnabled': '1'
}
tdLog.info(f"assign bucketName is {bucketName}\n")
maxFileSize = (128 + 10) * 1014 * 1024 # add 10M buffer
def insertData(self):
@ -241,10 +245,9 @@ class TDTestCase(TBase):
#
def preDb(self, vgroups):
cnt = int(time.time())%3 + 1
cnt = int(time.time())%2 + 1
for i in range(cnt):
vg = int(time.time()*1000)%10 + 1
vg = eutil.cpuRand(9) + 1
sql = f"create database predb vgroups {vg}"
tdSql.execute(sql, show=True)
sql = "drop database predb"

View File

@ -10,26 +10,26 @@
#
# army-test
#
,,y,army,./pytest.sh python3 ./test.py -f enterprise/multi-level/mlevel_basic.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f enterprise/db-encrypt/basic.py
,,n,army,python3 ./test.py -f enterprise/s3/s3Basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/snapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f community/query/function/test_func_elapsed.py
,,y,army,./pytest.sh python3 ./test.py -f community/query/test_join.py
,,y,army,./pytest.sh python3 ./test.py -f community/query/test_compare.py
,,y,army,./pytest.sh python3 ./test.py -f community/insert/test_column_tag_boundary.py
,,y,army,./pytest.sh python3 ./test.py -f community/query/fill/fill_desc.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/incSnapshot.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/query/query_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/query/accuracy/test_query_accuracy.py
,,y,army,./pytest.sh python3 ./test.py -f community/insert/insert_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/splitVgroupByLearner.py -N 3
,,n,army,python3 ./test.py -f community/cmdline/fullopt.py
,,n,army,python3 ./test.py -f community/query/show.py -N 3
,,n,army,python3 ./test.py -f enterprise/alter/alterConfig.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/query/subquery/subqueryBugs.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/storage/oneStageComp.py -N 3 -L 3 -D 1
,,y,army,./pytest.sh python3 ./test.py -f community/storage/compressBasic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f multi-level/mlevel_basic.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f db-encrypt/basic.py
,,n,army,python3 ./test.py -f s3/s3Basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f cluster/snapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f query/function/test_func_elapsed.py
,,y,army,./pytest.sh python3 ./test.py -f query/test_join.py
,,y,army,./pytest.sh python3 ./test.py -f query/test_compare.py
,,y,army,./pytest.sh python3 ./test.py -f insert/test_column_tag_boundary.py
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_desc.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f cluster/incSnapshot.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/query_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/accuracy/test_query_accuracy.py
,,y,army,./pytest.sh python3 ./test.py -f insert/insert_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f cluster/splitVgroupByLearner.py -N 3
,,n,army,python3 ./test.py -f cmdline/fullopt.py
,,n,army,python3 ./test.py -f query/show.py -N 3
,,n,army,python3 ./test.py -f alter/alterConfig.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/subquery/subqueryBugs.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f storage/oneStageComp.py -N 3 -L 3 -D 1
,,y,army,./pytest.sh python3 ./test.py -f storage/compressBasic.py -N 3
#
# system test

View File

@ -49,7 +49,7 @@ print ============= create database with all options
# | KEEP value [max(1d ~ 365000d), default: 1d, unit may be minut/hour/day]
# | PRECISION ['ms' | 'us' | 'ns', default: ms]
# | REPLICA value [1 | 3, default: 1]
# | WAL_LEVEL value [1 | 2, default: 1]
# | WAL_LEVEL value [0 | 1 | 2, default: 1]
# | VGROUPS value [default: 2]
# | SINGLE_STABLE [0 | 1, default: ]
#
@ -404,7 +404,7 @@ endi
sql drop database db
sql_error create database db WAL_LEVEL 3
sql_error create database db WAL_LEVEL -1
sql_error create database db WAL_LEVEL 0
#sql_error create database db WAL_LEVEL 0
print ====> VGROUPS value [1~4096, default: 2]
sql create database db VGROUPS 1

View File

@ -225,7 +225,7 @@ sql_error create database $db ctime 29
sql_error create database $db ctime 40961
# wal {0, 2}
sql_error create database testwal wal_level 0
#sql_error create database testwal wal_level 0
sql select * from information_schema.ins_databases
if $rows != 2 then
return -1

View File

@ -101,5 +101,248 @@ if $rows != 1 then
return -1
endi
print step 2-------------------------------
sql drop database if exists test;
sql create database test cachemodel 'both';
sql use test;
sql create table stb (ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using stb tags(1,1,1);
sql create table t2 using stb tags(2,2,2);
sql insert into t1 values('2024-06-05 11:00:00',1,2,3);
sql insert into t1 values('2024-06-05 12:00:00',2,2,3);
sql insert into t2 values('2024-06-05 13:00:00',3,2,3);
sql insert into t2 values('2024-06-05 14:00:00',4,2,3);
sql select last(ts) ts1,ts from stb;
if $data00 != $data01 then
print $data00
return -1
endi
sql select last(ts) ts1,ts from stb group by tbname;
if $data00 != $data01 then
print $data00
return -1
endi
sql select last(ts) ts1,tbname, ts from stb;
if $data00 != $data02 then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
sql select last(ts) ts1,tbname, ts from stb group by tbname;
if $data00 != $data02 then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
print step 3-------------------------------
sql drop database if exists test1;
sql create database test1 cachemodel 'both';
sql use test1;
sql create table stb (ts timestamp,a int primary key,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using stb tags(1,1,1);
sql create table t2 using stb tags(2,2,2);
sql insert into t1 values('2024-06-05 11:00:00',1,2,3);
sql insert into t1 values('2024-06-05 12:00:00',2,2,3);
sql insert into t2 values('2024-06-05 13:00:00',3,2,3);
sql insert into t2 values('2024-06-05 14:00:00',4,2,3);
sql select last(ts) ts1,ts from stb;
if $data00 != $data01 then
print $data00
return -1
endi
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
sql select last(ts) ts1,ts from stb group by tbname;
if $data00 != $data01 then
print $data00
return -1
endi
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
sql select last(ts) ts1,tbname, ts from stb;
if $data00 != $data02 then
print $data00
return -1
endi
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
sql select last(ts) ts1,tbname, ts from stb group by tbname;
if $data00 != $data02 then
print $data00
return -1
endi
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
print step 4-------------------------------
sql select last(a) a,ts from stb;
if $data00 != 4 then
print $data00
return -1
endi
if $data01 != @24-06-05 14:00:00.000@ then
print $data01
return -1
endi
sql select last(a) a,ts from stb group by tbname;
if $data00 != 4 then
print $data00
return -1
endi
if $data01 != @24-06-05 14:00:00.000@ then
print $data01
return -1
endi
sql select last(a) a,tbname, ts from stb;
if $data00 != 4 then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
if $data02 != @24-06-05 14:00:00.000@ then
print $data02
return -1
endi
sql select last(a) a,tbname, ts from stb group by tbname;
if $data00 != 4 then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
if $data02 != @24-06-05 14:00:00.000@ then
print $data02
return -1
endi
print step 5-------------------------------
sql select last(ts) ts1,a from stb;
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
if $data01 != 4 then
print $data01
return -1
endi
sql select last(ts) ts1,a from stb group by tbname;
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
if $data01 != 4 then
print $data01
return -1
endi
sql select last(ts) ts1,tbname, a from stb;
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
if $data02 != 4 then
print $data02
return -1
endi
sql select last(ts) ts1,tbname, a from stb group by tbname;
if $data00 != @24-06-05 14:00:00.000@ then
print $data00
return -1
endi
if $data01 != t2 then
print $data01
return -1
endi
if $data02 != 4 then
print $data02
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -148,7 +148,7 @@ class TDTestCase:
@property
def fsync_create_err(self):
return [
"create database db1 wal_level 0",
#"create database db1 wal_level 0",
"create database db1 wal_level 3",
"create database db1 wal_level null",
"create database db1 wal_level true",
@ -162,7 +162,7 @@ class TDTestCase:
@property
def fsync_alter_err(self):
return [
"alter database db1 wal_level 0",
#"alter database db1 wal_level 0",
"alter database db1 wal_level 3",
"alter database db1 wal_level null",
"alter database db1 wal_level true",

View File

@ -982,6 +982,9 @@ int smlProcess_18784_Test() {
rowIndex++;
}
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists db_18784");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -1187,6 +1190,9 @@ int sml_ts2164_Test() {
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes);
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists line_test");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -1236,6 +1242,8 @@ int sml_ts3116_Test() {
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
code = taos_errno(pRes);
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists ts3116");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -1449,6 +1457,13 @@ int sml_td24070_Test() {
code = taos_errno(pRes);
ASSERT(code == 0);
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists td24070_write");
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists td24070_read");
taos_free_result(pRes);
taos_close(taos);
// test table privilege
@ -1477,6 +1492,10 @@ int sml_td23881_Test() {
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes);
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists line_23881");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -1691,6 +1710,8 @@ int sml_ts2385_Test() {
code = taos_errno(pRes);
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists ts2385");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -1784,6 +1805,9 @@ int sml_td24559_Test() {
}
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists td24559");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -1889,6 +1913,9 @@ int sml_td29691_Test() {
ASSERT(code == TSDB_CODE_PAR_DUPLICATED_COLUMN);
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists td29691");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -1942,6 +1969,9 @@ int sml_td18789_Test() {
}
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists td18789");
taos_free_result(pRes);
taos_close(taos);
return code;
@ -2047,6 +2077,9 @@ int sml_td29373_Test() {
ASSERT(code == TSDB_CODE_SML_NOT_SUPPORT_PK);
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists td29373");
taos_free_result(pRes);
taos_close(taos);
return code;