Merge branch '3.0' of github.com:taosdata/TDengine into szhou/3.0/fix/TD-27089
This commit is contained in:
commit
9a33f0d59a
|
@ -539,7 +539,8 @@ TO_CHAR(ts, format_str_literal)
|
||||||
- When `ms`,`us`,`ns` are used in `to_char`, like `to_char(ts, 'yyyy-mm-dd hh:mi:ss.ms.us.ns')`, The time of `ms`,`us`,`ns` corresponds to the same fraction seconds. When ts is `1697182085123`, the output of `ms` is `123`, `us` is `123000`, `ns` is `123000000`.
|
- When `ms`,`us`,`ns` are used in `to_char`, like `to_char(ts, 'yyyy-mm-dd hh:mi:ss.ms.us.ns')`, The time of `ms`,`us`,`ns` corresponds to the same fraction seconds. When ts is `1697182085123`, the output of `ms` is `123`, `us` is `123000`, `ns` is `123000000`.
|
||||||
- If we want to output some characters of format without converting, surround it with double quotes. `to_char(ts, 'yyyy-mm-dd "is formated by yyyy-mm-dd"')`. If want to output double quotes, add a back slash before double quote, like `to_char(ts, '\"yyyy-mm-dd\"')` will output `"2023-10-10"`.
|
- If we want to output some characters of format without converting, surround it with double quotes. `to_char(ts, 'yyyy-mm-dd "is formated by yyyy-mm-dd"')`. If want to output double quotes, add a back slash before double quote, like `to_char(ts, '\"yyyy-mm-dd\"')` will output `"2023-10-10"`.
|
||||||
- For formats that output digits, the uppercase and lowercase formats are the same.
|
- For formats that output digits, the uppercase and lowercase formats are the same.
|
||||||
- It's recommended to put time zone in the format, if not, the default time zone zone will be that in server or client.
|
- It's recommended to put time zone in the format, if not, the default time zone will be that in server or client.
|
||||||
|
- The precision of the input timestamp will be recognized automatically according to the precision of the table used, milliseconds will be used if no table is specified.
|
||||||
|
|
||||||
#### TO_TIMESTAMP
|
#### TO_TIMESTAMP
|
||||||
|
|
||||||
|
@ -564,9 +565,10 @@ TO_TIMESTAMP(ts_str_literal, format_str_literal)
|
||||||
- The uppercase or lowercase of `MONTH`, `MON`, `DAY`, `DY` and formtas that output digits have same effect when used in `to_timestamp`, like `to_timestamp('2023-JANUARY-01', 'YYYY-month-dd')`, `month` can be replaced by `MONTH`, or `month`. The cases are ignored.
|
- The uppercase or lowercase of `MONTH`, `MON`, `DAY`, `DY` and formtas that output digits have same effect when used in `to_timestamp`, like `to_timestamp('2023-JANUARY-01', 'YYYY-month-dd')`, `month` can be replaced by `MONTH`, or `month`. The cases are ignored.
|
||||||
- If multi times are specified for one component, the previous will be overwritten. Like `to_timestamp('2023-22-10-10', 'yyyy-yy-MM-dd')`, the output year will be `2022`.
|
- If multi times are specified for one component, the previous will be overwritten. Like `to_timestamp('2023-22-10-10', 'yyyy-yy-MM-dd')`, the output year will be `2022`.
|
||||||
- To avoid unexpected time zone used during the convertion, it's recommended to put time zone in the ts string, e.g. '2023-10-10 10:10:10+08'. If time zone not specified, default will be that in server or client.
|
- To avoid unexpected time zone used during the convertion, it's recommended to put time zone in the ts string, e.g. '2023-10-10 10:10:10+08'. If time zone not specified, default will be that in server or client.
|
||||||
- The default timestamp if some components are not specified will be: `1970-01-01 00:00:00` with specified or default local timezone.
|
- The default timestamp if some components are not specified will be: `1970-01-01 00:00:00` with the timezone specified or default to local timezone.
|
||||||
- If `AM` or `PM` is specified in formats, the Hour must between `1-12`.
|
- If `AM` or `PM` is specified in formats, the Hour must between `1-12`.
|
||||||
- In some cases, `to_timestamp` can convert correctly even the format and the timestamp string are not totally matched. Like `to_timetamp('200101/2', 'yyyyMM1/dd')`, the digit `1` in format string are ignored, and the output timestsamp is `2001-01-02 00:00:00`. Spaces and tabs in formats and tiemstamp string are also ignored automatically.
|
- In some cases, `to_timestamp` can convert correctly even the format and the timestamp string are not totally matched. Like `to_timetamp('200101/2', 'yyyyMM1/dd')`, the digit `1` in format string are ignored, and the output timestsamp is `2001-01-02 00:00:00`. Spaces and tabs in formats and tiemstamp string are also ignored automatically.
|
||||||
|
- The precision of the output timestamp will be the same as the table in SELECT stmt, millisecond will be used if no table is specified. The output of `select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns')` will be truncated to millisecond precision. If a nano precision table is specified, no truncation will be applied. Like `select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns') from db_ns.table_ns limit 1`.
|
||||||
|
|
||||||
|
|
||||||
### Time and Date Functions
|
### Time and Date Functions
|
||||||
|
|
|
@ -540,6 +540,7 @@ TO_CHAR(ts, format_str_literal)
|
||||||
- 时间格式中无法匹配规则的内容会直接输出. 如果想要在格式串中指定某些能够匹配规则的部分不做转换, 可以使用双引号, 如`to_char(ts, 'yyyy-mm-dd "is formated by yyyy-mm-dd"')`. 如果想要输出双引号, 那么在双引号之前加一个反斜杠, 如 `to_char(ts, '\"yyyy-mm-dd\"')` 将会输出 `"2023-10-10"`.
|
- 时间格式中无法匹配规则的内容会直接输出. 如果想要在格式串中指定某些能够匹配规则的部分不做转换, 可以使用双引号, 如`to_char(ts, 'yyyy-mm-dd "is formated by yyyy-mm-dd"')`. 如果想要输出双引号, 那么在双引号之前加一个反斜杠, 如 `to_char(ts, '\"yyyy-mm-dd\"')` 将会输出 `"2023-10-10"`.
|
||||||
- 那些输出是数字的格式, 如`YYYY`, `DD`, 大写与小写意义相同, 即`yyyy` 和 `YYYY` 可以互换.
|
- 那些输出是数字的格式, 如`YYYY`, `DD`, 大写与小写意义相同, 即`yyyy` 和 `YYYY` 可以互换.
|
||||||
- 推荐在时间格式中带时区信息,如果不带则默认输出的时区为服务端或客户端所配置的时区.
|
- 推荐在时间格式中带时区信息,如果不带则默认输出的时区为服务端或客户端所配置的时区.
|
||||||
|
- 输入时间戳的精度由所查询表的精度确定, 若未指定表, 则精度为毫秒.
|
||||||
|
|
||||||
#### TO_TIMESTAMP
|
#### TO_TIMESTAMP
|
||||||
|
|
||||||
|
@ -560,13 +561,14 @@ TO_TIMESTAMP(ts_str_literal, format_str_literal)
|
||||||
**支持的格式**: 与`to_char`相同
|
**支持的格式**: 与`to_char`相同
|
||||||
|
|
||||||
**使用说明**:
|
**使用说明**:
|
||||||
- 若`ms`, `us`, `ns`同时指定, 那么结果时间戳包含上述三个字段的和. 如 `to_timestamp('2023-10-10 10:10:10.123.000456.000000789', 'yyyy-mm-dd hh:mi:ss.ms.us.ns')` 输出是 `2023-10-10 10:10:10.123456789`.
|
- 若`ms`, `us`, `ns`同时指定, 那么结果时间戳包含上述三个字段的和. 如 `to_timestamp('2023-10-10 10:10:10.123.000456.000000789', 'yyyy-mm-dd hh:mi:ss.ms.us.ns')` 输出为 `2023-10-10 10:10:10.123456789`对应的时间戳.
|
||||||
- `MONTH`, `MON`, `DAY`, `DY` 以及其他输出为数字的格式的大小写意义相同, 如 `to_timestamp('2023-JANUARY-01', 'YYYY-month-dd')`, `month`可以被替换为`MONTH` 或者`Month`.
|
- `MONTH`, `MON`, `DAY`, `DY` 以及其他输出为数字的格式的大小写意义相同, 如 `to_timestamp('2023-JANUARY-01', 'YYYY-month-dd')`, `month`可以被替换为`MONTH` 或者`Month`.
|
||||||
- 如果同一字段被指定了多次, 那么前面的指定将会被覆盖. 如 `to_timestamp('2023-22-10-10', 'yyyy-yy-MM-dd')`, 输出年份是`2022`.
|
- 如果同一字段被指定了多次, 那么前面的指定将会被覆盖. 如 `to_timestamp('2023-22-10-10', 'yyyy-yy-MM-dd')`, 输出年份是`2022`.
|
||||||
- 为避免转换时使用了非预期的时区,推荐在时间中携带时区信息,例如'2023-10-10 10:10:10+08',如果未指定时区则默认时区为服务端或客户端指定的时区。
|
- 为避免转换时使用了非预期的时区,推荐在时间中携带时区信息,例如'2023-10-10 10:10:10+08',如果未指定时区则默认时区为服务端或客户端指定的时区。
|
||||||
- 如果没有指定完整的时间,那么默认时间值为指定或默认时区的 `1970-01-01 00:00:00`, 未指定部分使用该默认值中的对应部分.
|
- 如果没有指定完整的时间,那么默认时间值为指定或默认时区的 `1970-01-01 00:00:00`, 未指定部分使用该默认值中的对应部分.
|
||||||
- 如果格式串中有`AM`, `PM`等, 那么小时必须是12小时制, 范围必须是01-12.
|
- 如果格式串中有`AM`, `PM`等, 那么小时必须是12小时制, 范围必须是01-12.
|
||||||
- `to_timestamp`转换具有一定的容错机制, 在格式串和时间戳串不完全对应时, 有时也可转换, 如: `to_timestamp('200101/2', 'yyyyMM1/dd')`, 格式串中多出来的1会被丢弃. 格式串与时间戳串中多余的空格字符(空格, tab等)也会被 自动忽略. 如`to_timestamp(' 23 年 - 1 月 - 01 日 ', 'yy 年-MM月-dd日')` 可以被成功转换. 虽然`MM`等字段需要两个数字对应(只有一位时前面补0), 在`to_timestamp`时, 一个数字也可以成功转换.
|
- `to_timestamp`转换具有一定的容错机制, 在格式串和时间戳串不完全对应时, 有时也可转换, 如: `to_timestamp('200101/2', 'yyyyMM1/dd')`, 格式串中多出来的1会被丢弃. 格式串与时间戳串中多余的空格字符(空格, tab等)也会被 自动忽略. 如`to_timestamp(' 23 年 - 1 月 - 01 日 ', 'yy 年-MM月-dd日')` 可以被成功转换. 虽然`MM`等字段需要两个数字对应(只有一位时前面补0), 在`to_timestamp`时, 一个数字也可以成功转换.
|
||||||
|
- 输出时间戳的精度与查询表的精度相同, 若查询未指定表, 则输出精度为毫秒. 如`select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns')`的输出将会把微妙和纳秒进行截断. 如果指定一张纳秒表, 那么就不会发生截断, 如`select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns') from db_ns.table_ns limit 1`.
|
||||||
|
|
||||||
|
|
||||||
### 时间和日期函数
|
### 时间和日期函数
|
||||||
|
|
|
@ -752,7 +752,8 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_FUNC_FUNTION_PARA_VALUE TAOS_DEF_ERROR_CODE(0, 0x2803)
|
#define TSDB_CODE_FUNC_FUNTION_PARA_VALUE TAOS_DEF_ERROR_CODE(0, 0x2803)
|
||||||
#define TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2804)
|
#define TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION TAOS_DEF_ERROR_CODE(0, 0x2804)
|
||||||
#define TSDB_CODE_FUNC_DUP_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x2805)
|
#define TSDB_CODE_FUNC_DUP_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x2805)
|
||||||
#define TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED TAOS_DEF_ERROR_CODE(0, 0x2806)
|
#define TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_FORMAT_ERR TAOS_DEF_ERROR_CODE(0, 0x2806)
|
||||||
|
#define TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_TS_ERR TAOS_DEF_ERROR_CODE(0, 0x2807)
|
||||||
|
|
||||||
//udf
|
//udf
|
||||||
#define TSDB_CODE_UDF_STOPPING TAOS_DEF_ERROR_CODE(0, 0x2901)
|
#define TSDB_CODE_UDF_STOPPING TAOS_DEF_ERROR_CODE(0, 0x2901)
|
||||||
|
|
|
@ -280,8 +280,10 @@ int8_t tsS3Enabled = false;
|
||||||
int8_t tsS3Https = true;
|
int8_t tsS3Https = true;
|
||||||
char tsS3Hostname[TSDB_FQDN_LEN] = "<hostname>";
|
char tsS3Hostname[TSDB_FQDN_LEN] = "<hostname>";
|
||||||
|
|
||||||
int32_t tsS3BlockSize = 4096; // number of tsdb pages
|
int32_t tsS3BlockSize = -1; // number of tsdb pages (4096)
|
||||||
int32_t tsS3BlockCacheSize = 16; // number of blocks
|
int32_t tsS3BlockCacheSize = 16; // number of blocks
|
||||||
|
int32_t tsS3PageCacheSize = 4096; // number of pages
|
||||||
|
int32_t tsS3UploadDelaySec = 60 * 60;
|
||||||
|
|
||||||
#ifndef _STORAGE
|
#ifndef _STORAGE
|
||||||
int32_t taosSetTfsCfg(SConfig *pCfg) {
|
int32_t taosSetTfsCfg(SConfig *pCfg) {
|
||||||
|
@ -460,7 +462,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
||||||
if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
if (cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "smlChildTableName", tsSmlChildTableName, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddString(pCfg, "smlChildTableName", tsSmlChildTableName, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "smlAutoChildTableNameDelimiter", tsSmlAutoChildTableNameDelimiter, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddString(pCfg, "smlAutoChildTableNameDelimiter", tsSmlAutoChildTableNameDelimiter, CFG_SCOPE_CLIENT) != 0)
|
||||||
|
return -1;
|
||||||
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "smlTsDefaultName", tsSmlTsDefaultName, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddString(pCfg, "smlTsDefaultName", tsSmlTsDefaultName, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
if (cfgAddBool(pCfg, "smlDot2Underline", tsSmlDot2Underline, CFG_SCOPE_CLIENT) != 0) return -1;
|
if (cfgAddBool(pCfg, "smlDot2Underline", tsSmlDot2Underline, CFG_SCOPE_CLIENT) != 0) return -1;
|
||||||
|
@ -675,7 +678,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
if (cfgAddFloat(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddFloat(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
|
|
||||||
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0)
|
||||||
|
return -1;
|
||||||
|
|
||||||
if (cfgAddString(pCfg, "lossyColumns", tsLossyColumns, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddString(pCfg, "lossyColumns", tsLossyColumns, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
if (cfgAddFloat(pCfg, "fPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddFloat(pCfg, "fPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
|
@ -693,8 +697,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 2048, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -100, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
|
if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
|
if (cfgAddInt32(pCfg, "s3PageCacheSize", tsS3PageCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER) != 0) return -1;
|
||||||
|
if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 60 * 10, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER) != 0)
|
||||||
|
return -1;
|
||||||
|
|
||||||
// min free disk space used to check if the disk is full [50MB, 1GB]
|
// min free disk space used to check if the disk is full [50MB, 1GB]
|
||||||
if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024,
|
if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024,
|
||||||
|
@ -953,7 +960,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str, TSDB_TABLE_NAME_LEN);
|
tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str,
|
||||||
|
TSDB_TABLE_NAME_LEN);
|
||||||
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
|
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
|
||||||
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
||||||
tstrncpy(tsSmlTsDefaultName, cfgGetItem(pCfg, "smlTsDefaultName")->str, TSDB_COL_NAME_LEN);
|
tstrncpy(tsSmlTsDefaultName, cfgGetItem(pCfg, "smlTsDefaultName")->str, TSDB_COL_NAME_LEN);
|
||||||
|
@ -1125,6 +1133,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
|
|
||||||
tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32;
|
tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32;
|
||||||
tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32;
|
tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32;
|
||||||
|
tsS3PageCacheSize = cfgGetItem(pCfg, "s3PageCacheSize")->i32;
|
||||||
|
tsS3UploadDelaySec = cfgGetItem(pCfg, "s3UploadDelaySec")->i32;
|
||||||
|
|
||||||
GRANT_CFG_GET;
|
GRANT_CFG_GET;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1422,7 +1432,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
|
||||||
} else if (strcasecmp("smlChildTableName", name) == 0) {
|
} else if (strcasecmp("smlChildTableName", name) == 0) {
|
||||||
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
|
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
|
||||||
} else if (strcasecmp("smlAutoChildTableNameDelimiter", name) == 0) {
|
} else if (strcasecmp("smlAutoChildTableNameDelimiter", name) == 0) {
|
||||||
tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str, TSDB_TABLE_NAME_LEN);
|
tstrncpy(tsSmlAutoChildTableNameDelimiter, cfgGetItem(pCfg, "smlAutoChildTableNameDelimiter")->str,
|
||||||
|
TSDB_TABLE_NAME_LEN);
|
||||||
} else if (strcasecmp("smlTagName", name) == 0) {
|
} else if (strcasecmp("smlTagName", name) == 0) {
|
||||||
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
||||||
// } else if (strcasecmp("smlDataFormat", name) == 0) {
|
// } else if (strcasecmp("smlDataFormat", name) == 0) {
|
||||||
|
@ -1717,6 +1728,20 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (strcasecmp(option, "s3PageCacheSize") == 0) {
|
||||||
|
int32_t newS3PageCacheSize = atoi(value);
|
||||||
|
uInfo("s3PageCacheSize set from %d to %d", tsS3PageCacheSize, newS3PageCacheSize);
|
||||||
|
tsS3PageCacheSize = newS3PageCacheSize;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (strcasecmp(option, "s3UploadDelaySec") == 0) {
|
||||||
|
int32_t newS3UploadDelaysec = atoi(value);
|
||||||
|
uInfo("s3UploadDelaySec set from %d to %d", tsS3UploadDelaySec, newS3UploadDelaysec);
|
||||||
|
tsS3UploadDelaySec = newS3UploadDelaysec;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (strcasecmp(option, "ttlPushInterval") == 0) {
|
if (strcasecmp(option, "ttlPushInterval") == 0) {
|
||||||
int32_t newTtlPushInterval = atoi(value);
|
int32_t newTtlPushInterval = atoi(value);
|
||||||
uInfo("ttlPushInterval set from %d to %d", tsTtlPushIntervalSec, newTtlPushInterval);
|
uInfo("ttlPushInterval set from %d to %d", tsTtlPushIntervalSec, newTtlPushInterval);
|
||||||
|
|
|
@ -1320,7 +1320,7 @@ static void tm2char(const SArray* formats, const struct STm* tm, char* s, int32_
|
||||||
s += 4;
|
s += 4;
|
||||||
break;
|
break;
|
||||||
case TSFKW_DDD:
|
case TSFKW_DDD:
|
||||||
sprintf(s, "%d", tm->tm.tm_yday);
|
sprintf(s, "%03d", tm->tm.tm_yday + 1);
|
||||||
s += strlen(s);
|
s += strlen(s);
|
||||||
break;
|
break;
|
||||||
case TSFKW_DD:
|
case TSFKW_DD:
|
||||||
|
|
|
@ -344,7 +344,7 @@ TEST(timeTest, ts2char) {
|
||||||
"day-\"日\"",
|
"day-\"日\"",
|
||||||
TSDB_TIME_PRECISION_MILLI,
|
TSDB_TIME_PRECISION_MILLI,
|
||||||
"2023-023-23-3-2023-023-23-3-年-OCTOBER -OCT-October -Oct-october "
|
"2023-023-23-3-2023-023-23-3-年-OCTOBER -OCT-October -Oct-october "
|
||||||
"-oct-月-285-13-6-285-13-6-FRIDAY -Friday -friday -日");
|
"-oct-月-286-13-6-286-13-6-FRIDAY -Friday -friday -日");
|
||||||
#endif
|
#endif
|
||||||
ts = 1697182085123L; // Friday, October 13, 2023 3:28:05.123 PM GMT+08:00
|
ts = 1697182085123L; // Friday, October 13, 2023 3:28:05.123 PM GMT+08:00
|
||||||
test_ts2char(ts, "HH24:hh24:HH12:hh12:HH:hh:MI:mi:SS:ss:MS:ms:US:us:NS:ns:PM:AM:pm:am", TSDB_TIME_PRECISION_MILLI,
|
test_ts2char(ts, "HH24:hh24:HH12:hh12:HH:hh:MI:mi:SS:ss:MS:ms:US:us:NS:ns:PM:AM:pm:am", TSDB_TIME_PRECISION_MILLI,
|
||||||
|
|
|
@ -1228,6 +1228,70 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
strcpy(dcfgReq.config, "monitor");
|
strcpy(dcfgReq.config, "monitor");
|
||||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||||
|
} else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
|
||||||
|
int32_t optLen = strlen("s3blocksize");
|
||||||
|
int32_t flag = -1;
|
||||||
|
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||||
|
if (code < 0) return code;
|
||||||
|
|
||||||
|
if (flag > 1024 * 1024) {
|
||||||
|
mError("dnode:%d, failed to config s3blocksize since value:%d. Valid range: [4, 1024 * 1024]", cfgReq.dnodeId,
|
||||||
|
flag);
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG;
|
||||||
|
tFreeSMCfgDnodeReq(&cfgReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
strcpy(dcfgReq.config, "s3blocksize");
|
||||||
|
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||||
|
} else if (strncasecmp(cfgReq.config, "s3blockcachesize", 16) == 0) {
|
||||||
|
int32_t optLen = strlen("s3blockcachesize");
|
||||||
|
int32_t flag = -1;
|
||||||
|
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||||
|
if (code < 0) return code;
|
||||||
|
|
||||||
|
if (flag < 4 || flag > 1024 * 1024) {
|
||||||
|
mError("dnode:%d, failed to config s3BlockCacheSize since value:%d. Valid range: [4, 1024 * 1024]",
|
||||||
|
cfgReq.dnodeId, flag);
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG;
|
||||||
|
tFreeSMCfgDnodeReq(&cfgReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
strcpy(dcfgReq.config, "s3blockcachesize");
|
||||||
|
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||||
|
} else if (strncasecmp(cfgReq.config, "s3pagecachesize", 16) == 0) {
|
||||||
|
int32_t optLen = strlen("s3pagecachesize");
|
||||||
|
int32_t flag = -1;
|
||||||
|
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||||
|
if (code < 0) return code;
|
||||||
|
|
||||||
|
if (flag < 4 || flag > 1024 * 1024 * 1024) {
|
||||||
|
mError("dnode:%d, failed to config s3PageCacheSize since value:%d. Valid range: [4, 1024 * 1024]", cfgReq.dnodeId,
|
||||||
|
flag);
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG;
|
||||||
|
tFreeSMCfgDnodeReq(&cfgReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
strcpy(dcfgReq.config, "s3pagecachesize");
|
||||||
|
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||||
|
} else if (strncasecmp(cfgReq.config, "s3uploaddelaysec", 16) == 0) {
|
||||||
|
int32_t optLen = strlen("s3uploaddelaysec");
|
||||||
|
int32_t flag = -1;
|
||||||
|
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||||
|
if (code < 0) return code;
|
||||||
|
|
||||||
|
if (flag < 600 || flag > 60 * 60 * 24 * 30) {
|
||||||
|
mError("dnode:%d, failed to config s3UploadDelaySec since value:%d. Valid range: [600, 60 * 60 * 24 * 30]",
|
||||||
|
cfgReq.dnodeId, flag);
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG;
|
||||||
|
tFreeSMCfgDnodeReq(&cfgReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
strcpy(dcfgReq.config, "s3uploaddelaysec");
|
||||||
|
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||||
} else if (strncasecmp(cfgReq.config, "ttlpushinterval", 14) == 0) {
|
} else if (strncasecmp(cfgReq.config, "ttlpushinterval", 14) == 0) {
|
||||||
int32_t optLen = strlen("ttlpushinterval");
|
int32_t optLen = strlen("ttlpushinterval");
|
||||||
int32_t flag = -1;
|
int32_t flag = -1;
|
||||||
|
|
|
@ -382,6 +382,8 @@ struct STsdb {
|
||||||
TdThreadMutex biMutex;
|
TdThreadMutex biMutex;
|
||||||
SLRUCache *bCache;
|
SLRUCache *bCache;
|
||||||
TdThreadMutex bMutex;
|
TdThreadMutex bMutex;
|
||||||
|
SLRUCache *pgCache;
|
||||||
|
TdThreadMutex pgMutex;
|
||||||
struct STFileSystem *pFS; // new
|
struct STFileSystem *pFS; // new
|
||||||
SRocksCache rCache;
|
SRocksCache rCache;
|
||||||
};
|
};
|
||||||
|
@ -909,7 +911,9 @@ int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHa
|
||||||
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h);
|
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h);
|
||||||
|
|
||||||
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle);
|
int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle);
|
||||||
int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h);
|
int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle);
|
||||||
|
int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage);
|
||||||
|
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
|
||||||
|
|
||||||
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||||
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||||
|
|
|
@ -27,6 +27,8 @@ extern "C" {
|
||||||
extern int8_t tsS3Enabled;
|
extern int8_t tsS3Enabled;
|
||||||
extern int32_t tsS3BlockSize;
|
extern int32_t tsS3BlockSize;
|
||||||
extern int32_t tsS3BlockCacheSize;
|
extern int32_t tsS3BlockCacheSize;
|
||||||
|
extern int32_t tsS3PageCacheSize;
|
||||||
|
extern int32_t tsS3UploadDelaySec;
|
||||||
|
|
||||||
int32_t s3Init();
|
int32_t s3Init();
|
||||||
void s3CleanUp();
|
void s3CleanUp();
|
||||||
|
|
|
@ -87,6 +87,41 @@ static void tsdbCloseBCache(STsdb *pTsdb) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbOpenPgCache(STsdb *pTsdb) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5);
|
||||||
|
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
|
||||||
|
|
||||||
|
SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3PageCacheSize * szPage, 0, .5);
|
||||||
|
if (pCache == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosLRUCacheSetStrictCapacity(pCache, false);
|
||||||
|
|
||||||
|
taosThreadMutexInit(&pTsdb->pgMutex, NULL);
|
||||||
|
|
||||||
|
_err:
|
||||||
|
pTsdb->pgCache = pCache;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void tsdbClosePgCache(STsdb *pTsdb) {
|
||||||
|
SLRUCache *pCache = pTsdb->pgCache;
|
||||||
|
if (pCache) {
|
||||||
|
int32_t elems = taosLRUCacheGetElems(pCache);
|
||||||
|
tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
|
||||||
|
taosLRUCacheEraseUnrefEntries(pCache);
|
||||||
|
elems = taosLRUCacheGetElems(pCache);
|
||||||
|
tsdbTrace("vgId:%d, elems: %d", TD_VID(pTsdb->pVnode), elems);
|
||||||
|
|
||||||
|
taosLRUCacheCleanup(pCache);
|
||||||
|
|
||||||
|
taosThreadMutexDestroy(&pTsdb->bMutex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
|
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -1191,6 +1226,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = tsdbOpenPgCache(pTsdb);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
code = tsdbOpenRocksCache(pTsdb);
|
code = tsdbOpenRocksCache(pTsdb);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1221,6 +1262,7 @@ void tsdbCloseCache(STsdb *pTsdb) {
|
||||||
|
|
||||||
tsdbCloseBICache(pTsdb);
|
tsdbCloseBICache(pTsdb);
|
||||||
tsdbCloseBCache(pTsdb);
|
tsdbCloseBCache(pTsdb);
|
||||||
|
tsdbClosePgCache(pTsdb);
|
||||||
tsdbCloseRocksCache(pTsdb);
|
tsdbCloseRocksCache(pTsdb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3057,7 +3099,6 @@ static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
||||||
// int64_t size = 4096;
|
|
||||||
code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock);
|
code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// taosMemoryFree(pBlock);
|
// taosMemoryFree(pBlock);
|
||||||
|
@ -3123,10 +3164,42 @@ int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle)
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbBCacheRelease(SLRUCache *pCache, LRUHandle *h) {
|
int32_t tsdbCacheGetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, LRUHandle **handle) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
char key[128] = {0};
|
||||||
|
int keyLen = 0;
|
||||||
|
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
|
||||||
|
*handle = taosLRUCacheLookup(pCache, key, keyLen);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *pPage) {
|
||||||
|
int32_t code = 0;
|
||||||
|
char key[128] = {0};
|
||||||
|
int keyLen = 0;
|
||||||
|
LRUHandle *handle = NULL;
|
||||||
|
|
||||||
|
getBCacheKey(pFD->fid, pFD->cid, pgno, key, &keyLen);
|
||||||
|
taosThreadMutexLock(&pFD->pTsdb->pgMutex);
|
||||||
|
handle = taosLRUCacheLookup(pFD->pTsdb->pgCache, key, keyLen);
|
||||||
|
if (!handle) {
|
||||||
|
size_t charge = pFD->szPage;
|
||||||
|
_taos_lru_deleter_t deleter = deleteBCache;
|
||||||
|
uint8_t *pPg = taosMemoryMalloc(charge);
|
||||||
|
memcpy(pPg, pPage, charge);
|
||||||
|
|
||||||
|
LRUStatus status =
|
||||||
|
taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||||
|
if (status != TAOS_LRU_STATUS_OK) {
|
||||||
|
// ignore cache updating if not ok
|
||||||
|
// code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&pFD->pTsdb->pgMutex);
|
||||||
|
|
||||||
|
tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,7 @@ typedef struct {
|
||||||
STFileSet *fset;
|
STFileSet *fset;
|
||||||
TABLEID tbid[1];
|
TABLEID tbid[1];
|
||||||
bool hasTSData;
|
bool hasTSData;
|
||||||
|
bool skipTsRow;
|
||||||
} ctx[1];
|
} ctx[1];
|
||||||
|
|
||||||
// reader
|
// reader
|
||||||
|
@ -127,18 +128,18 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
extern int8_t tsS3Enabled;
|
extern int8_t tsS3Enabled;
|
||||||
|
|
||||||
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
|
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
|
||||||
bool skipRow = false;
|
committer->ctx->skipTsRow = false;
|
||||||
if (tsS3Enabled && nlevel > 1 && committer->ctx->did.level == nlevel - 1) {
|
if (tsS3Enabled && nlevel > 1 && committer->ctx->did.level == nlevel - 1) {
|
||||||
skipRow = true;
|
committer->ctx->skipTsRow = true;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
int64_t ts = TSDBROW_TS(&row->row);
|
int64_t ts = TSDBROW_TS(&row->row);
|
||||||
|
|
||||||
if (skipRow && ts <= committer->ctx->maxKey) {
|
if (committer->ctx->skipTsRow && ts <= committer->ctx->maxKey) {
|
||||||
ts = committer->ctx->maxKey + 1;
|
ts = committer->ctx->maxKey + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -402,6 +403,32 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
|
||||||
// reset nextKey
|
// reset nextKey
|
||||||
committer->ctx->nextKey = TSKEY_MAX;
|
committer->ctx->nextKey = TSKEY_MAX;
|
||||||
|
|
||||||
|
committer->ctx->skipTsRow = false;
|
||||||
|
|
||||||
|
extern int8_t tsS3Enabled;
|
||||||
|
extern int32_t tsS3UploadDelaySec;
|
||||||
|
long s3Size(const char *object_name);
|
||||||
|
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
|
||||||
|
committer->ctx->skipTsRow = false;
|
||||||
|
if (tsS3Enabled && nlevel > 1 && committer->ctx->fset) {
|
||||||
|
STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA];
|
||||||
|
if (fobj && fobj->f->did.level == nlevel - 1) {
|
||||||
|
// if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay
|
||||||
|
const char *object_name = taosDirEntryBaseName((char *)fobj->fname);
|
||||||
|
|
||||||
|
if (taosCheckExistFile(fobj->fname)) {
|
||||||
|
int32_t mtime = 0;
|
||||||
|
taosStatFile(fobj->fname, NULL, &mtime, NULL);
|
||||||
|
if (mtime < committer->ctx->now - tsS3UploadDelaySec) {
|
||||||
|
committer->ctx->skipTsRow = true;
|
||||||
|
}
|
||||||
|
} else if (s3Size(object_name) > 0) {
|
||||||
|
committer->ctx->skipTsRow = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// new fset can be written with ts data
|
||||||
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
|
|
@ -303,6 +303,7 @@ bool tsdbIsSameTFile(const STFile *f1, const STFile *f2) {
|
||||||
if (f1->did.id != f2->did.id) return false;
|
if (f1->did.id != f2->did.id) return false;
|
||||||
if (f1->fid != f2->fid) return false;
|
if (f1->fid != f2->fid) return false;
|
||||||
if (f1->cid != f2->cid) return false;
|
if (f1->cid != f2->cid) return false;
|
||||||
|
if (f1->s3flag != f2->s3flag) return false;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,6 +58,7 @@ int32_t tsdbTFileObjCmpr(const STFileObj **fobj1, const STFileObj **fobj2);
|
||||||
struct STFile {
|
struct STFile {
|
||||||
tsdb_ftype_t type;
|
tsdb_ftype_t type;
|
||||||
SDiskID did; // disk id
|
SDiskID did; // disk id
|
||||||
|
int32_t s3flag;
|
||||||
int32_t fid; // file id
|
int32_t fid; // file id
|
||||||
int64_t cid; // commit id
|
int64_t cid; // commit id
|
||||||
int64_t size;
|
int64_t size;
|
||||||
|
|
|
@ -131,6 +131,7 @@ static int32_t tsdbWriteFilePage(STsdbFD *pFD) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pFD->s3File) {
|
if (pFD->s3File) {
|
||||||
|
tsdbWarn("%s file: %s", __func__, pFD->path);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
if (pFD->pgno > 0) {
|
if (pFD->pgno > 0) {
|
||||||
|
@ -177,7 +178,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
|
||||||
pFD->blkno = (pgno + tsS3BlockSize - 1) / tsS3BlockSize;
|
pFD->blkno = (pgno + tsS3BlockSize - 1) / tsS3BlockSize;
|
||||||
code = tsdbCacheGetBlockS3(pFD->pTsdb->bCache, pFD, &handle);
|
code = tsdbCacheGetBlockS3(pFD->pTsdb->bCache, pFD, &handle);
|
||||||
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
|
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
|
||||||
tsdbBCacheRelease(pFD->pTsdb->bCache, handle);
|
tsdbCacheRelease(pFD->pTsdb->bCache, handle);
|
||||||
if (code == TSDB_CODE_SUCCESS && !handle) {
|
if (code == TSDB_CODE_SUCCESS && !handle) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
@ -189,7 +190,7 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
|
||||||
int64_t blk_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
int64_t blk_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
||||||
memcpy(pFD->pBuf, pBlock + (offset - blk_offset), pFD->szPage);
|
memcpy(pFD->pBuf, pBlock + (offset - blk_offset), pFD->szPage);
|
||||||
|
|
||||||
tsdbBCacheRelease(pFD->pTsdb->bCache, handle);
|
tsdbCacheRelease(pFD->pTsdb->bCache, handle);
|
||||||
} else {
|
} else {
|
||||||
// seek
|
// seek
|
||||||
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
|
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
|
||||||
|
@ -253,7 +254,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
|
static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int64_t n = 0;
|
int64_t n = 0;
|
||||||
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
|
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
|
||||||
|
@ -282,10 +283,122 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int64_t n = 0;
|
||||||
|
int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
|
||||||
|
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
|
||||||
|
int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
|
||||||
|
int64_t bOffset = fOffset % pFD->szPage;
|
||||||
|
|
||||||
|
ASSERT(bOffset < szPgCont);
|
||||||
|
|
||||||
|
// 1, find pgnoStart & pgnoEnd to fetch from s3, if all pgs are local, no need to fetch
|
||||||
|
// 2, fetch pgnoStart ~ pgnoEnd from s3
|
||||||
|
// 3, store pgs to pcache & last pg to pFD->pBuf
|
||||||
|
// 4, deliver pgs to [pBuf, pBuf + size)
|
||||||
|
|
||||||
|
while (n < size) {
|
||||||
|
if (pFD->pgno != pgno) {
|
||||||
|
LRUHandle *handle = NULL;
|
||||||
|
code = tsdbCacheGetPageS3(pFD->pTsdb->pgCache, pFD, pgno, &handle);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
if (handle) {
|
||||||
|
tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
|
||||||
|
}
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!handle) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->pgCache, handle);
|
||||||
|
memcpy(pFD->pBuf, pPage, pFD->szPage);
|
||||||
|
tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
|
||||||
|
|
||||||
|
// check
|
||||||
|
if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
|
||||||
|
code = TSDB_CODE_FILE_CORRUPTED;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
pFD->pgno = pgno;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t nRead = TMIN(szPgCont - bOffset, size - n);
|
||||||
|
memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
|
||||||
|
|
||||||
|
n += nRead;
|
||||||
|
pgno++;
|
||||||
|
bOffset = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (n < size) {
|
||||||
|
// 2, retrieve pgs from s3
|
||||||
|
uint8_t *pBlock = NULL;
|
||||||
|
int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
|
||||||
|
int64_t pgnoEnd = pgno - 1 + (size - n + szPgCont - 1) / szPgCont;
|
||||||
|
int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
|
||||||
|
code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, &pBlock);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3, Store Pages in Cache
|
||||||
|
int nPage = pgnoEnd - pgno + 1;
|
||||||
|
for (int i = 0; i < nPage; ++i) {
|
||||||
|
tsdbCacheSetPageS3(pFD->pTsdb->pgCache, pFD, pgno, pBlock + i * pFD->szPage);
|
||||||
|
|
||||||
|
memcpy(pFD->pBuf, pBlock + i * pFD->szPage, pFD->szPage);
|
||||||
|
|
||||||
|
// check
|
||||||
|
if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
|
||||||
|
code = TSDB_CODE_FILE_CORRUPTED;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
pFD->pgno = pgno;
|
||||||
|
|
||||||
|
int64_t nRead = TMIN(szPgCont - bOffset, size - n);
|
||||||
|
memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
|
||||||
|
|
||||||
|
n += nRead;
|
||||||
|
pgno++;
|
||||||
|
bOffset = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pBlock);
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
|
||||||
|
int32_t code = 0;
|
||||||
|
if (!pFD->pFD) {
|
||||||
|
code = tsdbOpenFileImpl(pFD);
|
||||||
|
if (code) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pFD->s3File && tsS3BlockSize < 0) {
|
||||||
|
return tsdbReadFileS3(pFD, offset, pBuf, size);
|
||||||
|
} else {
|
||||||
|
return tsdbReadFileImp(pFD, offset, pBuf, size);
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbFsyncFile(STsdbFD *pFD) {
|
int32_t tsdbFsyncFile(STsdbFD *pFD) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (pFD->s3File) {
|
if (pFD->s3File) {
|
||||||
|
tsdbWarn("%s file: %s", __func__, pFD->path);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
code = tsdbWriteFilePage(pFD);
|
code = tsdbWriteFilePage(pFD);
|
||||||
|
|
|
@ -206,6 +206,8 @@ static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, const
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
op.nf.s3flag = true;
|
||||||
|
|
||||||
code = TARRAY2_APPEND(rtner->fopArr, op);
|
code = TARRAY2_APPEND(rtner->fopArr, op);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
@ -322,27 +324,40 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
|
||||||
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = fset->farr[ftype], 1); ++ftype) {
|
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = fset->farr[ftype], 1); ++ftype) {
|
||||||
if (fobj == NULL) continue;
|
if (fobj == NULL) continue;
|
||||||
|
|
||||||
if (fobj->f->did.level == did.level) continue;
|
|
||||||
|
|
||||||
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
|
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
|
||||||
|
|
||||||
|
if (fobj->f->did.level == did.level) {
|
||||||
|
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1 &&
|
||||||
|
taosCheckExistFile(fobj->fname)) {
|
||||||
|
int32_t mtime = 0;
|
||||||
|
taosStatFile(fobj->fname, NULL, &mtime, NULL);
|
||||||
|
if (mtime < rtner->now - tsS3UploadDelaySec) {
|
||||||
|
code = tsdbMigrateDataFileS3(rtner, fobj, &did);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
/*
|
||||||
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) {
|
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) {
|
||||||
code = tsdbMigrateDataFileS3(rtner, fobj, &did);
|
code = tsdbMigrateDataFileS3(rtner, fobj, &did);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
if (tsS3Enabled) {
|
if (tsS3Enabled) {
|
||||||
int64_t fsize = 0;
|
int64_t fsize = 0;
|
||||||
if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) {
|
if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(terrno);
|
||||||
tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), __func__,
|
tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode),
|
||||||
fobj->fname, tstrerror(code));
|
__func__, fobj->fname, tstrerror(code)); TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
s3EvictCache(fobj->fname, fsize * 2);
|
s3EvictCache(fobj->fname, fsize * 2);
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
|
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
//}
|
||||||
}
|
}
|
||||||
|
|
||||||
// stt
|
// stt
|
||||||
|
|
|
@ -80,8 +80,8 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
|
||||||
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
|
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
|
||||||
TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
|
TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
|
||||||
int32_t size = taosArrayGetSize(pMsg->aSubmitTbData);
|
int32_t size = taosArrayGetSize(pMsg->aSubmitTbData);
|
||||||
|
/*
|
||||||
int32_t nlevel = tfsGetLevel(pTsdb->pVnode->pTfs);
|
int32_t nlevel = tfsGetLevel(pTsdb->pVnode->pTfs);
|
||||||
|
|
||||||
if (nlevel > 1 && tsS3Enabled) {
|
if (nlevel > 1 && tsS3Enabled) {
|
||||||
if (nlevel == 3) {
|
if (nlevel == 3) {
|
||||||
minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1;
|
minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1;
|
||||||
|
@ -89,7 +89,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
|
||||||
minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep0;
|
minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i);
|
SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i);
|
||||||
if (pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
if (pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
||||||
|
|
|
@ -13,14 +13,14 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "audit.h"
|
||||||
#include "tencode.h"
|
#include "tencode.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
#include "tstrbuild.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
#include "vndCos.h"
|
#include "vndCos.h"
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
#include "audit.h"
|
|
||||||
#include "tstrbuild.h"
|
|
||||||
|
|
||||||
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
|
@ -263,8 +263,9 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
|
||||||
now *= 1000000;
|
now *= 1000000;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t nlevel = tfsGetLevel(pVnode->pTfs);
|
|
||||||
int32_t keep = pVnode->config.tsdbCfg.keep2;
|
int32_t keep = pVnode->config.tsdbCfg.keep2;
|
||||||
|
/*
|
||||||
|
int32_t nlevel = tfsGetLevel(pVnode->pTfs);
|
||||||
if (nlevel > 1 && tsS3Enabled) {
|
if (nlevel > 1 && tsS3Enabled) {
|
||||||
if (nlevel == 3) {
|
if (nlevel == 3) {
|
||||||
keep = pVnode->config.tsdbCfg.keep1;
|
keep = pVnode->config.tsdbCfg.keep1;
|
||||||
|
@ -272,6 +273,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
|
||||||
keep = pVnode->config.tsdbCfg.keep0;
|
keep = pVnode->config.tsdbCfg.keep0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * keep;
|
TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * keep;
|
||||||
TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
|
TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
|
||||||
|
@ -1518,7 +1520,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
|
int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
|
||||||
SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
|
SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
|
||||||
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
|
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
|
||||||
|
|
||||||
if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) {
|
if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) {
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
|
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
|
||||||
|
|
|
@ -3315,7 +3315,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "to_timestamp",
|
.name = "to_timestamp",
|
||||||
.type = FUNCTION_TYPE_TO_TIMESTAMP,
|
.type = FUNCTION_TYPE_TO_TIMESTAMP,
|
||||||
.classification = FUNC_MGT_SCALAR_FUNC,
|
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
|
||||||
.translateFunc = translateToTimestamp,
|
.translateFunc = translateToTimestamp,
|
||||||
.getEnvFunc = NULL,
|
.getEnvFunc = NULL,
|
||||||
.initFunc = NULL,
|
.initFunc = NULL,
|
||||||
|
|
|
@ -434,17 +434,22 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
|
||||||
void *rsp = NULL;
|
void *rsp = NULL;
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
SOutputData sOutput = {0};
|
SOutputData sOutput = {0};
|
||||||
if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == rsp && TSDB_CODE_SUCCESS == code) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rsp) {
|
if (NULL != rsp) {
|
||||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||||
|
|
||||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
||||||
if (qComplete) {
|
if (qComplete) {
|
||||||
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
qwMsg->connInfo = ctx->dataConnInfo;
|
qwMsg->connInfo = ctx->dataConnInfo;
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||||
|
@ -456,7 +461,6 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
|
||||||
dataLen);
|
dataLen);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1230,7 +1230,7 @@ int32_t toTimestampFunction(SScalarParam* pInput, int32_t inputNum, SScalarParam
|
||||||
code = taosChar2Ts(format, &formats, tsStr, &ts, precision, errMsg, 128);
|
code = taosChar2Ts(format, &formats, tsStr, &ts, precision, errMsg, 128);
|
||||||
if (code) {
|
if (code) {
|
||||||
qError("func to_timestamp failed %s", errMsg);
|
qError("func to_timestamp failed %s", errMsg);
|
||||||
code = TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED;
|
code = code == -1 ? TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_FORMAT_ERR : TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_TS_ERR;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
colDataSetVal(pOutput->columnData, i, (char *)&ts, false);
|
colDataSetVal(pOutput->columnData, i, (char *)&ts, false);
|
||||||
|
|
|
@ -23,6 +23,8 @@
|
||||||
#include "syncReplication.h"
|
#include "syncReplication.h"
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
|
|
||||||
|
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t len, int32_t typ);
|
||||||
|
|
||||||
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
|
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
|
||||||
taosThreadMutexLock(&pBuf->mutex);
|
taosThreadMutexLock(&pBuf->mutex);
|
||||||
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
|
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
|
||||||
|
@ -160,8 +162,11 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int dataLen = 0;
|
int dataLen = 0;
|
||||||
if (snapInfo.data) {
|
void *pData = snapInfo.data;
|
||||||
SSyncTLV *datHead = snapInfo.data;
|
int32_t type = 0;
|
||||||
|
if (pData) {
|
||||||
|
type = snapInfo.type;
|
||||||
|
SSyncTLV *datHead = pData;
|
||||||
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
|
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
|
||||||
sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
|
sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
|
||||||
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
||||||
|
@ -170,37 +175,12 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
||||||
dataLen = sizeof(SSyncTLV) + datHead->len;
|
dataLen = sizeof(SSyncTLV) + datHead->len;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
if (syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type) != 0) {
|
||||||
if (syncBuildSnapshotSend(&rpcMsg, dataLen, pSender->pSyncNode->vgId) != 0) {
|
|
||||||
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
|
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));
|
||||||
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
|
||||||
pMsg->term = pSender->term;
|
|
||||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
|
||||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
|
||||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
|
||||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
|
||||||
pMsg->lastConfig = pSender->lastConfig;
|
|
||||||
pMsg->startTime = pSender->startTime;
|
|
||||||
pMsg->seq = pSender->seq;
|
|
||||||
|
|
||||||
if (dataLen > 0) {
|
|
||||||
pMsg->payloadType = snapInfo.type;
|
|
||||||
memcpy(pMsg->data, snapInfo.data, dataLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
// send msg
|
|
||||||
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
|
|
||||||
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
|
|
||||||
goto _out;
|
|
||||||
}
|
|
||||||
|
|
||||||
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&pMsg->destId));
|
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
_out:
|
_out:
|
||||||
if (snapInfo.data) {
|
if (snapInfo.data) {
|
||||||
|
@ -232,6 +212,43 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
|
||||||
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
|
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
|
||||||
|
int32_t code = -1;
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
|
||||||
|
if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
|
||||||
|
sSError(pSender, "failed to build snap replication msg since %s", terrstr());
|
||||||
|
goto _OUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
||||||
|
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||||
|
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||||
|
pMsg->term = pSender->term;
|
||||||
|
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||||
|
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||||
|
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||||
|
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||||
|
pMsg->lastConfig = pSender->lastConfig;
|
||||||
|
pMsg->startTime = pSender->startTime;
|
||||||
|
pMsg->seq = seq;
|
||||||
|
|
||||||
|
if (pBlock != NULL && blockLen > 0) {
|
||||||
|
memcpy(pMsg->data, pBlock, blockLen);
|
||||||
|
}
|
||||||
|
pMsg->payloadType = typ;
|
||||||
|
|
||||||
|
// send msg
|
||||||
|
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
|
||||||
|
sSError(pSender, "failed to send snap replication msg since %s. seq:%d", terrstr(), seq);
|
||||||
|
goto _OUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
_OUT:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
// when sender receive ack, call this function to send msg from seq
|
// when sender receive ack, call this function to send msg from seq
|
||||||
// seq = ack + 1, already updated
|
// seq = ack + 1, already updated
|
||||||
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
||||||
|
@ -273,33 +290,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
||||||
|
|
||||||
ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END);
|
ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END);
|
||||||
|
|
||||||
int32_t blockLen = (pBlk != NULL) ? pBlk->blockLen : 0;
|
|
||||||
// build msg
|
|
||||||
SRpcMsg rpcMsg = {0};
|
|
||||||
if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
|
|
||||||
sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
|
|
||||||
goto _OUT;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
|
||||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
|
||||||
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
|
||||||
pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
|
|
||||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
|
||||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
|
||||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
|
||||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
|
||||||
pMsg->lastConfig = pSender->lastConfig;
|
|
||||||
pMsg->startTime = pSender->startTime;
|
|
||||||
pMsg->seq = pSender->seq;
|
|
||||||
|
|
||||||
if (pBlk != NULL && pBlk->pBlock != NULL && pBlk->blockLen > 0) {
|
|
||||||
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
// send msg
|
// send msg
|
||||||
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
|
int32_t blockLen = (pBlk) ? pBlk->blockLen : 0;
|
||||||
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
|
void *pBlock = (pBlk) ? pBlk->pBlock : NULL;
|
||||||
|
if (syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0) != 0) {
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -336,36 +330,17 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
||||||
if (nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
|
if (nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// build msg
|
if (syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0) != 0) {
|
||||||
SRpcMsg rpcMsg = {0};
|
|
||||||
if (syncBuildSnapshotSend(&rpcMsg, pBlk->blockLen, pSender->pSyncNode->vgId) != 0) {
|
|
||||||
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
|
|
||||||
goto _out;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncSnapshotSend *pMsg = rpcMsg.pCont;
|
|
||||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
|
||||||
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
|
||||||
pMsg->term = pSender->term;
|
|
||||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
|
||||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
|
||||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
|
||||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
|
||||||
pMsg->lastConfig = pSender->lastConfig;
|
|
||||||
pMsg->startTime = pSender->startTime;
|
|
||||||
pMsg->seq = pBlk->seq;
|
|
||||||
|
|
||||||
if (pBlk->pBlock != NULL && pBlk->blockLen > 0) {
|
|
||||||
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
// send msg
|
|
||||||
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
|
|
||||||
sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
|
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
pBlk->sendTimeMs = nowMs;
|
pBlk->sendTimeMs = nowMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
|
||||||
|
if (syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0) != 0) {
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
}
|
||||||
code = 0;
|
code = 0;
|
||||||
_out:;
|
_out:;
|
||||||
taosThreadMutexUnlock(&pSndBuf->mutex);
|
taosThreadMutexUnlock(&pSndBuf->mutex);
|
||||||
|
@ -861,7 +836,7 @@ static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSen
|
||||||
pRspMsg->lastIndex = pMsg->lastIndex;
|
pRspMsg->lastIndex = pMsg->lastIndex;
|
||||||
pRspMsg->lastTerm = pMsg->lastTerm;
|
pRspMsg->lastTerm = pMsg->lastTerm;
|
||||||
pRspMsg->startTime = pMsg->startTime;
|
pRspMsg->startTime = pMsg->startTime;
|
||||||
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
|
pRspMsg->ack = pMsg->seq;
|
||||||
pRspMsg->code = code;
|
pRspMsg->code = code;
|
||||||
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
|
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
|
||||||
|
|
||||||
|
@ -893,13 +868,13 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
|
||||||
pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
|
pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
|
||||||
ppMsg[0] = NULL;
|
ppMsg[0] = NULL;
|
||||||
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
|
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
|
||||||
} else {
|
} else if (pMsg->seq < pRcvBuf->start) {
|
||||||
syncSnapSendRsp(pReceiver, pMsg, code);
|
syncSnapSendRsp(pReceiver, pMsg, code);
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
|
for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
|
||||||
if (pRcvBuf->entries[seq]) {
|
if (pRcvBuf->entries[seq % pRcvBuf->size]) {
|
||||||
pRcvBuf->cursor = seq;
|
pRcvBuf->cursor = seq;
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -613,7 +613,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_FUNTION_PARA_TYPE, "Invalid function par
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid function para value")
|
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid function para value")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION, "Not buildin function")
|
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION, "Not buildin function")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_DUP_TIMESTAMP, "Duplicate timestamps not allowed in function")
|
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_DUP_TIMESTAMP, "Duplicate timestamps not allowed in function")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED, "Func to_timestamp failed, check log for detail")
|
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_FORMAT_ERR, "Func to_timestamp failed, format mismatch")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_FUNC_TO_TIMESTAMP_FAILED_TS_ERR, "Func to_timestamp failed, wrong timestamp")
|
||||||
|
|
||||||
//udf
|
//udf
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping")
|
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_STOPPING, "udf is stopping")
|
||||||
|
|
|
@ -166,6 +166,44 @@ class TDTestCase:
|
||||||
def run(self):
|
def run(self):
|
||||||
self.prepareTestEnv()
|
self.prepareTestEnv()
|
||||||
self.test_to_timestamp()
|
self.test_to_timestamp()
|
||||||
|
self.test_ns_to_timestamp()
|
||||||
|
|
||||||
|
def create_tables(self):
|
||||||
|
tdSql.execute("create database if not exists test_us precision 'us'")
|
||||||
|
tdSql.execute("create database if not exists test_ns precision 'ns'")
|
||||||
|
tdSql.execute("use test_us")
|
||||||
|
tdSql.execute(f"CREATE STABLE `meters_us` (`ts` TIMESTAMP, `ip_value` FLOAT, `ip_quality` INT, `ts2` timestamp) TAGS (`t1` INT)")
|
||||||
|
tdSql.execute(f"CREATE TABLE `ctb1_us` USING `meters_us` (`t1`) TAGS (1)")
|
||||||
|
tdSql.execute(f"CREATE TABLE `ctb2_us` USING `meters_us` (`t1`) TAGS (2)")
|
||||||
|
tdSql.execute("use test_ns")
|
||||||
|
tdSql.execute(f"CREATE STABLE `meters_ns` (`ts` TIMESTAMP, `ip_value` FLOAT, `ip_quality` INT, `ts2` timestamp) TAGS (`t1` INT)")
|
||||||
|
tdSql.execute(f"CREATE TABLE `ctb1_ns` USING `meters_ns` (`t1`) TAGS (1)")
|
||||||
|
tdSql.execute(f"CREATE TABLE `ctb2_ns` USING `meters_ns` (`t1`) TAGS (2)")
|
||||||
|
|
||||||
|
def insert_ns_data(self):
|
||||||
|
tdLog.debug("start to insert data ............")
|
||||||
|
tdSql.execute(f"INSERT INTO `test_us`.`ctb1_us` VALUES ('2023-07-01 00:00:00.123456', 10.30000, 100, '2023-07-01 00:00:00.123456')")
|
||||||
|
tdSql.execute(f"INSERT INTO `test_us`.`ctb2_us` VALUES ('2023-08-01 00:00:00.123456', 20.30000, 200, '2023-07-01 00:00:00.123456')")
|
||||||
|
tdSql.execute(f"INSERT INTO `test_ns`.`ctb1_ns` VALUES ('2023-07-01 00:00:00.123456789', 10.30000, 100, '2023-07-01 00:00:00.123456000')")
|
||||||
|
tdSql.execute(f"INSERT INTO `test_ns`.`ctb2_ns` VALUES ('2023-08-01 00:00:00.123456789', 20.30000, 200, '2023-08-01 00:00:00.123456789')")
|
||||||
|
tdLog.debug("insert data ............ [OK]")
|
||||||
|
|
||||||
|
def test_ns_to_timestamp(self):
|
||||||
|
self.create_tables()
|
||||||
|
self.insert_ns_data()
|
||||||
|
tdSql.query("select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns')", queryTimes=1)
|
||||||
|
tdSql.checkData(0, 0, 1690855810123)
|
||||||
|
tdSql.execute('use test_ns', queryTimes=1)
|
||||||
|
tdSql.query("select to_timestamp('2023-08-1 10:10:10.123456789', 'yyyy-mm-dd hh:mi:ss.ns')", queryTimes=1)
|
||||||
|
tdSql.checkData(0, 0, 1690855810123)
|
||||||
|
tdSql.query("select to_char(ts2, 'yyyy-mm-dd hh:mi:ss.ns') from meters_ns", queryTimes=1)
|
||||||
|
tdSql.checkData(0, 0, '2023-07-01 12:00:00.123456000')
|
||||||
|
tdSql.checkData(1, 0, '2023-08-01 12:00:00.123456789')
|
||||||
|
|
||||||
|
tdSql.query("select to_timestamp(to_char(ts2, 'yyyy-mm-dd hh:mi:ss.ns'), 'yyyy-mm-dd hh:mi:ss.ns') from meters_ns", queryTimes=1)
|
||||||
|
tdSql.checkData(0, 0, 1688140800123456000)
|
||||||
|
tdSql.checkData(1, 0, 1690819200123456789)
|
||||||
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
Loading…
Reference in New Issue