Merge pull request #29999 from taosdata/enh/TS-5089

feat: taosBenchmark supports exporting to CSV files
Linhe Huo 2025-03-08 14:17:35 +08:00 committed by GitHub
commit d8ef73ea90
16 changed files with 2055 additions and 486 deletions

View File

@ -188,9 +188,12 @@ taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\)
The parameters listed in this section apply to all functional modes.
- **filetype**: The function to test, possible values are `insert`, `query`, and `subscribe`. Corresponding to insert, query, and subscribe functions. Only one can be specified in each configuration file.
- **filetype**: The function to test. Possible values are `insert`, `query`, `subscribe`, and `csvfile`, corresponding to the insert, query, subscribe, and CSV file generation functions. Only one can be specified in each configuration file.
- **cfgdir**: Directory where the TDengine client configuration file is located, default path is /etc/taos.
- **output_dir**: The directory where output files are written. When `filetype` is `csvfile`, this is the directory where the generated csv files are saved. The default value is ./output/.
- **host**: Specifies the FQDN of the TDengine server to connect to, default value is localhost.
- **port**: The port number of the TDengine server to connect to, default value is 6030.
@ -283,6 +286,27 @@ Parameters related to supertable creation are configured in the `super_tables` s
- **repeat_ts_max** : Numeric type, when composite primary key is enabled, specifies the maximum number of records with the same timestamp to be generated
- **sqls** : Array of strings type, specifies the SQL statements to be executed after the supertable is successfully created. Table names in the SQL must be prefixed with the database name, otherwise an "unspecified database" error will occur
- **csv_file_prefix**: String type, sets the prefix for the names of the generated csv files. Default value is "data".
- **csv_ts_format**: String type, sets the format of the time string in the names of the generated csv files, following the `strftime` format standard. If not set, files will not be split by time intervals. Supported patterns include:
- %Y: Year as a four-digit number (e.g., 2025)
- %m: Month as a two-digit number (01 to 12)
- %d: Day of the month as a two-digit number (01 to 31)
- %H: Hour in 24-hour format as a two-digit number (00 to 23)
- %M: Minute as a two-digit number (00 to 59)
- %S: Second as a two-digit number (00 to 59)
- **csv_ts_interval**: String type, sets the time interval by which generated csv files are split (reflected in their file names). Takes effect only when `csv_ts_format` is set. Supports daily, hourly, minute, and second intervals such as 1d/2h/30m/40s. The default value is "1d". The sketch after this list illustrates how these options shape the generated file names.
- **csv_output_header**: String type, sets whether the generated csv files should contain column header descriptions. The default value is "yes".
- **csv_tbname_alias**: String type, sets the alias for the tbname field in the column header descriptions of csv files. The default value is "device_id".
- **csv_compress_level**: String type, sets the compression level used when generating csv-encoded data and compressing it into a gzip file. The data is encoded and compressed in a single pass rather than written to a csv file first and compressed afterwards. Possible values are:
- none: No compression
- fast: gzip level 1 compression
- balance: gzip level 6 compression
- best: gzip level 9 compression
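
The way these options combine into output file names is easiest to see in code. Below is a minimal, illustrative sketch (not taosBenchmark's actual implementation) of how `output_dir`, `csv_file_prefix`, `csv_ts_format`, and the per-thread numbering used in batch mode form the file names; the scheme is inferred from the csv-export.py test case included in this PR.

```python
import datetime

# Illustrative sketch only: how the csv_* options shape output file names,
# following the naming scheme exercised by the csv-export.py test in this PR.
def csv_file_name(output_dir, prefix, thread_no=None,
                  slice_start_ts=None, slice_end_ts=None, ts_format=None):
    name = prefix
    if thread_no is not None:
        # batch (interlace_rows = 0) mode writes one file per thread
        name += f"_{thread_no}"
    if ts_format and slice_start_ts is not None:
        # time-slice splitting: the slice boundaries are rendered with strftime
        fmt = lambda ts: datetime.datetime.fromtimestamp(ts / 1000).strftime(ts_format)
        name += f"_{fmt(slice_start_ts)}_{fmt(slice_end_ts)}"
    return f"{output_dir}{name}.csv"

# Interlace mode with csv_ts_format "%Y%m%d", e.g. ./output/data_20231114_20231115.csv
# (the exact dates depend on the local timezone):
print(csv_file_name("./output/", "data",
                    slice_start_ts=1700000000000, slice_end_ts=1700086400000,
                    ts_format="%Y%m%d"))
# Batch mode, thread 3, no time slicing: ./output/data_3.csv
print(csv_file_name("./output/", "data", thread_no=3))
```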
#### Tag and Data Columns
@ -478,6 +502,17 @@ Note: Data types in the taosBenchmark configuration file must be in lowercase to
</details>
### Export CSV File Example
<details>
<summary>csv-export.json</summary>
```json
{{#include /TDengine/tools/taos-tools/example/csv-export.json}}
```
</details>
For more JSON examples, see [here](https://github.com/taosdata/TDengine/tree/main/tools/taos-tools/example)
## Output Performance Indicators

View File

@ -93,14 +93,17 @@ taosBenchmark -f <json file>
The parameters listed in this section apply to all functional modes.
- **filetype**: The function category. Possible values are `insert`, `query`, and `subscribe`, corresponding to the insert, query, and subscribe functions. Only one can be specified in each configuration file.
- **filetype**: The function category. Possible values are `insert`, `query`, `subscribe`, and `csvfile`, corresponding to the insert, query, subscribe, and CSV file generation functions. Only one can be specified in each configuration file.
- **cfgdir**: Directory where the TDengine client configuration file is located; the default path is /etc/taos.
- **output_dir**: The directory for output files. When the function category is `csvfile`, it is the directory where the generated csv files are saved. The default value is ./output/.
- **host**: FQDN of the TDengine server to connect to; the default value is localhost.
- **port**: Port number of the TDengine server to connect to; the default value is 6030.
- **user**: Username for connecting to the TDengine server; the default is root.
- **password**: Password for connecting to the TDengine server; the default value is taosdata.
@ -184,10 +187,34 @@ taosBenchmark -f <json file>
- **tags_file**: Effective only when insert_mode is taosc or rest. The final tag values depend on childtable_count: if the csv file contains fewer tag rows than the specified number of child tables, the csv data is read in a loop until the number of child tables specified by childtable_count has been generated; otherwise only childtable_count rows of tag data are read. In other words, the number of child tables finally generated is the smaller of the two.
- **primary_key**: Specifies whether the supertable has a composite primary key; values are 1 and 0. The composite primary key column can only be the second column of the supertable. After enabling composite primary key generation, make sure the second column has a data type valid for a composite primary key, otherwise an error is reported.
- **repeat_ts_min**: Numeric type. When composite primary key is enabled, specifies the minimum number of records generated with the same timestamp. The number of records sharing a timestamp is a random value in the range [repeat_ts_min, repeat_ts_max]; when the minimum equals the maximum, the count is fixed.
- **repeat_ts_max**: Numeric type. When composite primary key is enabled, specifies the maximum number of records generated with the same timestamp.
- **sqls**: Array of strings. Specifies SQL statements to execute after the supertable is created successfully. Table names in the SQL must be prefixed with the database name, otherwise an "unspecified database" error is reported.
- **csv_file_prefix**: String type, sets the prefix of generated csv file names. The default value is data.
- **csv_ts_format**: String type, sets the format of the time string in generated csv file names, following the `strftime` format standard. If not set, files are not split by time interval. Supported patterns include:
- %Y: Year as a four-digit number (e.g., 2025)
- %m: Month as a two-digit number (01 to 12)
- %d: Day of the month as a two-digit number (01 to 31)
- %H: Hour in 24-hour format as a two-digit number (00 to 23)
- %M: Minute as a two-digit number (00 to 59)
- %S: Second as a two-digit number (00 to 59)
- **csv_ts_interval**: String type, sets the time interval by which generated csv files are split. Takes effect only when `csv_ts_format` is set. Supports day, hour, minute, and second intervals such as 1d/2h/30m/40s; the default value is 1d. See the sketch after this list for how the interval string is interpreted.
- **csv_output_header**: String type, sets whether generated csv files contain a column header line; the default value is yes.
- **csv_tbname_alias**: String type, sets the alias of the tbname field in the csv column header; the default value is device_id.
- **csv_compress_level**: String type, sets the compression level used when generating csv-encoded data and compressing it into a gzip file. The data is encoded and compressed directly rather than written to a csv file first and compressed afterwards. Possible values are:
- none: no compression
- fast: gzip level 1 compression
- balance: gzip level 6 compression
- best: gzip level 9 compression
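
As a rough illustration, the interval string accepted by `csv_ts_interval` is a count followed by a unit suffix. The following is a hedged sketch of the conversion into seconds; the C struct in this PR stores the result in a `csv_ts_intv_secs` field, but the Python below is illustrative only, not the actual parser.

```python
# Illustrative only: convert a csv_ts_interval value such as "1d", "2h", "30m" or "40s"
# into seconds, roughly what taosBenchmark keeps in the csv_ts_intv_secs field.
UNIT_SECONDS = {"d": 86400, "h": 3600, "m": 60, "s": 1}

def interval_to_seconds(interval: str = "1d") -> int:
    count, unit = int(interval[:-1]), interval[-1]
    if unit not in UNIT_SECONDS:
        raise ValueError(f"unsupported csv_ts_interval unit: {unit!r}")
    return count * UNIT_SECONDS[unit]

assert interval_to_seconds("1d") == 86400   # one day
assert interval_to_seconds("2h") == 7200    # two hours
assert interval_to_seconds("30m") == 1800   # thirty minutes
```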
#### Tag and Data Columns
@ -383,6 +410,17 @@ interval controls the sleep time to avoid slow queries continuously consuming CPU; the unit is
</details>
### Generate CSV File JSON Example
<details>
<summary>csv-export.json</summary>
```json
{{#include /TDengine/tools/taos-tools/example/csv-export.json}}
```
</details>
For more JSON configuration file examples, see [here](https://github.com/taosdata/TDengine/tree/main/tools/taos-tools/example)
## Output Performance Indicators

View File

@ -0,0 +1,237 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import os
import json
import csv
import shutil
import datetime
import frame
import frame.eos
import frame.etool
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
class TDTestCase(TBase):
def caseDescription(self):
"""
[TS-5089] taosBenchmark support exporting csv
"""
def clear_directory(self, target_dir: str = 'csv'):
try:
if not os.path.exists(target_dir):
return
for entry in os.listdir(target_dir):
entry_path = os.path.join(target_dir, entry)
if os.path.isfile(entry_path) or os.path.islink(entry_path):
os.unlink(entry_path)
else:
shutil.rmtree(entry_path)
tdLog.debug("clear succ, dir: %s " % (target_dir))
except OSError as e:
tdLog.exit("clear fail, dir: %s " % (target_dir))
def convert_timestamp(self, ts, ts_format):
dt_object = datetime.datetime.fromtimestamp(ts / 1000)
formatted_time = dt_object.strftime(ts_format)
return formatted_time
def calc_time_slice_partitions(self, total_start_ts, total_end_ts, ts_step, ts_format, ts_interval):
interval_days = int(ts_interval[:-1])
n_days_millis = interval_days * 24 * 60 * 60 * 1000
dt_start = datetime.datetime.fromtimestamp(total_start_ts / 1000.0)
formatted_str = dt_start.strftime(ts_format)
s0_dt = datetime.datetime.strptime(formatted_str, ts_format)
s0 = int(s0_dt.timestamp() * 1000)
partitions = []
current_s = s0
while current_s <= total_end_ts:
current_end = current_s + n_days_millis
start_actual = max(current_s, total_start_ts)
end_actual = min(current_end, total_end_ts)
if start_actual >= end_actual:
count = 0
else:
delta = end_actual - start_actual
delta_start = start_actual - total_start_ts
delta_end = end_actual - total_start_ts
if delta % ts_step:
count = delta // ts_step + 1
else:
count = delta // ts_step
partitions.append({
"start_ts": current_s,
"end_ts": current_end,
"start_time": self.convert_timestamp(current_s, ts_format),
"end_time": self.convert_timestamp(current_end, ts_format),
"count": count
})
current_s += n_days_millis
# partitions = [p for p in partitions if p['count'] > 0]
return partitions
def check_stb_csv_correct(self, csv_file_name, all_rows, interlace_rows):
# open as csv
tbname_idx = 14
count = 0
batch = 0
name = ""
header = True
with open(csv_file_name) as file:
rows = csv.reader(file)
for row in rows:
if header:
header = False
continue
# interlace_rows
if name == "":
name = row[tbname_idx]
batch = 1
else:
if name == row[tbname_idx]:
batch += 1
else:
# switch to another child table
if batch != interlace_rows:
tdLog.exit(f"interlace_rows invalid. tbName={name} actual={batch} expected={interlace_rows} i={count} csv_file_name={csv_file_name}")
batch = 1
name = row[tbname_idx]
# count ++
count += 1
# batch
if batch != interlace_rows:
tdLog.exit(f"interlace_rows invalid. tbName={name} actual={batch} expected={interlace_rows} i={count} csv_file_name={csv_file_name}")
# check all rows
if count != all_rows:
tdLog.exit(f"all_rows invalid. actual={count} expected={all_rows} csv_file_name={csv_file_name}")
tdLog.info(f"Check generate csv file successfully. csv_file_name={csv_file_name} count={count} interlace_rows={batch}")
# check correct
def check_stb_correct(self, data, db, stb):
filepath = data["output_dir"]
stbName = stb["name"]
child_count = stb["childtable_to"] - stb["childtable_from"]
insert_rows = stb["insert_rows"]
interlace_rows = stb["interlace_rows"]
csv_file_prefix = stb["csv_file_prefix"]
csv_ts_format = stb.get("csv_ts_format", None)
csv_ts_interval = stb.get("csv_ts_interval", None)
ts_step = stb["timestamp_step"]
total_start_ts = stb["start_timestamp"]
total_end_ts = total_start_ts + ts_step * insert_rows
all_rows = child_count * insert_rows
if interlace_rows > 0:
# interlace
if not csv_ts_format:
# normal
csv_file_name = f"{filepath}{csv_file_prefix}.csv"
self.check_stb_csv_correct(csv_file_name, all_rows, interlace_rows)
else:
# time slice
partitions = self.calc_time_slice_partitions(total_start_ts, total_end_ts, ts_step, csv_ts_format, csv_ts_interval)
for part in partitions:
csv_file_name = f"{filepath}{csv_file_prefix}_{part['start_time']}_{part['end_time']}.csv"
self.check_stb_csv_correct(csv_file_name, part['count'] * child_count, interlace_rows)
else:
# batch
thread_count = stb["thread_count"]
interlace_rows = insert_rows
if not csv_ts_format:
# normal
for i in range(thread_count):
csv_file_name = f"{filepath}{csv_file_prefix}_{i + 1}.csv"
if i < child_count % thread_count:
self.check_stb_csv_correct(csv_file_name, insert_rows * (child_count // thread_count + 1), interlace_rows)
else:
self.check_stb_csv_correct(csv_file_name, insert_rows * (child_count // thread_count), interlace_rows)
else:
# time slice
for i in range(thread_count):
partitions = self.calc_time_slice_partitions(total_start_ts, total_end_ts, ts_step, csv_ts_format, csv_ts_interval)
for part in partitions:
csv_file_name = f"{filepath}{csv_file_prefix}_{i + 1}_{part['start_time']}_{part['end_time']}.csv"
if i < child_count % thread_count:
slice_rows = part['count'] * (child_count // thread_count + 1)
else:
slice_rows = part['count'] * (child_count // thread_count)
self.check_stb_csv_correct(csv_file_name, slice_rows, part['count'])
# check result
def check_result(self, jsonFile):
# csv
with open(jsonFile) as file:
data = json.load(file)
# read json
database = data["databases"][0]
stables = database["super_tables"]
for stable in stables:
# check csv context correct
self.check_stb_correct(data, database, stable)
def check_export_csv(self, benchmark, jsonFile, options=""):
# clear
self.clear_directory()
# exec
cmd = f"{benchmark} {options} -f {jsonFile}"
eos.exe(cmd)
# check result
self.check_result(jsonFile)
def run(self):
# path
benchmark = etool.benchMarkFile()
# do check interlace normal
json = "tools/benchmark/basic/json/csv-export.json"
self.check_export_csv(benchmark, json)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -1,110 +0,0 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import os
import json
import csv
import frame
import frame.etool
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
class TDTestCase(TBase):
def caseDescription(self):
"""
[TD-11510] taosBenchmark test cases
"""
# check correct
def checkCorrect(self, csvFile, allRows, interlaceRows):
# open as csv
count = 0
batch = 0
name = ""
with open(csvFile) as file:
rows = csv.reader(file)
for row in rows:
# interlaceRows
if name == "":
name = row[0]
batch = 1
else:
if name == row[0]:
batch += 1
else:
# switch to another child table
if batch != interlaceRows:
tdLog.exit(f"interlaceRows invalid. tbName={name} real={batch} expect={interlaceRows} i={count} csvFile={csvFile}")
batch = 1
name = row[0]
# count ++
count += 1
# batch
if batch != interlaceRows:
tdLog.exit(f"interlaceRows invalid. tbName={name} real={batch} expect={interlaceRows} i={count} csvFile={csvFile}")
# check all rows
if count != allRows:
tdLog.exit(f"allRows invalid. real={count} expect={allRows} csvFile={csvFile}")
tdLog.info(f"Check generate csv file successfully. csvFile={csvFile} count={count} interlaceRows={batch}")
# check result
def checResult(self, jsonFile):
# csv
with open(jsonFile) as file:
data = json.load(file)
# read json
database = data["databases"][0]
out = data["csvPath"]
dbName = database["dbinfo"]["name"]
stables = database["super_tables"]
for stable in stables:
stbName = stable["name"]
childs = stable["childtable_count"]
insertRows = stable["insert_rows"]
interlaceRows = stable["interlace_rows"]
csvFile = f"{out}{dbName}-{stbName}.csv"
rows = childs * insertRows
if interlaceRows == 0:
interlaceRows = insertRows
# check csv context correct
self.checkCorrect(csvFile, rows, interlaceRows)
def checkExportCsv(self, benchmark, jsonFile, options=""):
# exec
cmd = f"{benchmark} {options} -f {jsonFile}"
os.system(cmd)
# check result
self.checResult(jsonFile)
def run(self):
# path
benchmark = etool.benchMarkFile()
# do check
json = "tools/benchmark/basic/json/exportCsv.json"
self.checkExportCsv(benchmark, json)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -0,0 +1,172 @@
{
"filetype": "csvfile",
"output_dir": "./csv/",
"databases": [
{
"dbinfo": {
"name": "csvdb",
"precision": "ms"
},
"super_tables": [
{
"name": "interlace-normal",
"childtable_count": 1010,
"insert_rows": 1000,
"interlace_rows": 1,
"childtable_prefix": "d",
"timestamp_step": 1000000,
"start_timestamp":1700000000000,
"childtable_from": 1000,
"childtable_to": 1010,
"csv_file_prefix": "data",
"csv_output_header": "yes",
"csv_tbname_alias": "device_id",
"csv_compress_level": "none",
"columns": [
{ "type": "bool", "name": "bc"},
{ "type": "float", "name": "fc", "min": 1},
{ "type": "double", "name": "dc", "min":10, "max":10},
{ "type": "tinyint", "name": "ti"},
{ "type": "smallint", "name": "si"},
{ "type": "int", "name": "ic", "fillNull":"false"},
{ "type": "bigint", "name": "bi"},
{ "type": "utinyint", "name": "uti"},
{ "type": "usmallint", "name": "usi", "min":100, "max":120},
{ "type": "uint", "name": "ui"},
{ "type": "ubigint", "name": "ubi"},
{ "type": "binary", "name": "bin", "len": 16},
{ "type": "nchar", "name": "nch", "len": 16}
],
"tags": [
{"type": "tinyint", "name": "groupid","max": 10,"min": 1},
{"type": "binary", "name": "location", "len": 16,
"values": ["San Francisco", "Los Angles", "San Diego",
"San Jose", "Palo Alto", "Campbell", "Mountain View",
"Sunnyvale", "Santa Clara", "Cupertino"]
}
]
},
{
"name": "interlace-timeslice",
"childtable_count": 1010,
"insert_rows": 1000,
"interlace_rows": 1,
"childtable_prefix": "d",
"timestamp_step": 1000000,
"start_timestamp":1700000000000,
"childtable_from": 1000,
"childtable_to": 1010,
"csv_file_prefix": "data",
"csv_ts_format": "%Y%m%d",
"csv_ts_interval": "1d",
"csv_output_header": "yes",
"csv_tbname_alias": "device_id",
"csv_compress_level": "none",
"columns": [
{ "type": "bool", "name": "bc"},
{ "type": "float", "name": "fc", "min": 1},
{ "type": "double", "name": "dc", "min":10, "max":10},
{ "type": "tinyint", "name": "ti"},
{ "type": "smallint", "name": "si"},
{ "type": "int", "name": "ic", "fillNull":"false"},
{ "type": "bigint", "name": "bi"},
{ "type": "utinyint", "name": "uti"},
{ "type": "usmallint", "name": "usi", "min":100, "max":120},
{ "type": "uint", "name": "ui"},
{ "type": "ubigint", "name": "ubi"},
{ "type": "binary", "name": "bin", "len": 16},
{ "type": "nchar", "name": "nch", "len": 16}
],
"tags": [
{"type": "tinyint", "name": "groupid","max": 10,"min": 1},
{"type": "binary", "name": "location", "len": 16,
"values": ["San Francisco", "Los Angles", "San Diego",
"San Jose", "Palo Alto", "Campbell", "Mountain View",
"Sunnyvale", "Santa Clara", "Cupertino"]
}
]
},
{
"name": "batch-normal",
"childtable_count": 1010,
"insert_rows": 1000,
"interlace_rows": 0,
"thread_count": 8,
"childtable_prefix": "d",
"timestamp_step": 1000000,
"start_timestamp":1700000000000,
"childtable_from": 1000,
"childtable_to": 1010,
"csv_file_prefix": "data",
"csv_output_header": "yes",
"csv_tbname_alias": "device_id",
"csv_compress_level": "none",
"columns": [
{ "type": "bool", "name": "bc"},
{ "type": "float", "name": "fc", "min": 1},
{ "type": "double", "name": "dc", "min":10, "max":10},
{ "type": "tinyint", "name": "ti"},
{ "type": "smallint", "name": "si"},
{ "type": "int", "name": "ic", "fillNull":"false"},
{ "type": "bigint", "name": "bi"},
{ "type": "utinyint", "name": "uti"},
{ "type": "usmallint", "name": "usi", "min":100, "max":120},
{ "type": "uint", "name": "ui"},
{ "type": "ubigint", "name": "ubi"},
{ "type": "binary", "name": "bin", "len": 16},
{ "type": "nchar", "name": "nch", "len": 16}
],
"tags": [
{"type": "tinyint", "name": "groupid","max": 10,"min": 1},
{"type": "binary", "name": "location", "len": 16,
"values": ["San Francisco", "Los Angles", "San Diego",
"San Jose", "Palo Alto", "Campbell", "Mountain View",
"Sunnyvale", "Santa Clara", "Cupertino"]
}
]
},
{
"name": "batch-timeslice",
"childtable_count": 1010,
"insert_rows": 1000,
"interlace_rows": 0,
"thread_count": 8,
"childtable_prefix": "d",
"timestamp_step": 1000000,
"start_timestamp":1700000000000,
"childtable_from": 1000,
"childtable_to": 1010,
"csv_file_prefix": "data",
"csv_ts_format": "%Y%m%d",
"csv_ts_interval": "1d",
"csv_output_header": "yes",
"csv_tbname_alias": "device_id",
"csv_compress_level": "none",
"columns": [
{ "type": "bool", "name": "bc"},
{ "type": "float", "name": "fc", "min": 1},
{ "type": "double", "name": "dc", "min":10, "max":10},
{ "type": "tinyint", "name": "ti"},
{ "type": "smallint", "name": "si"},
{ "type": "int", "name": "ic", "fillNull":"false"},
{ "type": "bigint", "name": "bi"},
{ "type": "utinyint", "name": "uti"},
{ "type": "usmallint", "name": "usi", "min":100, "max":120},
{ "type": "uint", "name": "ui"},
{ "type": "ubigint", "name": "ubi"},
{ "type": "binary", "name": "bin", "len": 16},
{ "type": "nchar", "name": "nch", "len": 16}
],
"tags": [
{"type": "tinyint", "name": "groupid","max": 10,"min": 1},
{"type": "binary", "name": "location", "len": 16,
"values": ["San Francisco", "Los Angles", "San Diego",
"San Jose", "Palo Alto", "Campbell", "Mountain View",
"Sunnyvale", "Santa Clara", "Cupertino"]
}
]
}
]
}
]
}

View File

@ -1,78 +0,0 @@
{
"filetype": "csvfile",
"csvPath": "./csv/",
"num_of_records_per_req": 10000,
"databases": [
{
"dbinfo": {
"name": "csvdb"
},
"super_tables": [
{
"name": "batchTable",
"childtable_count": 5,
"insert_rows": 100,
"interlace_rows": 0,
"childtable_prefix": "d",
"timestamp_step": 10,
"start_timestamp":1600000000000,
"columns": [
{ "type": "bool", "name": "bc"},
{ "type": "float", "name": "fc", "min": 1},
{ "type": "double", "name": "dc", "min":10, "max":10},
{ "type": "tinyint", "name": "ti"},
{ "type": "smallint", "name": "si"},
{ "type": "int", "name": "ic", "fillNull":"false"},
{ "type": "bigint", "name": "bi"},
{ "type": "utinyint", "name": "uti"},
{ "type": "usmallint", "name": "usi", "min":100, "max":120},
{ "type": "uint", "name": "ui"},
{ "type": "ubigint", "name": "ubi"},
{ "type": "binary", "name": "bin", "len": 16},
{ "type": "nchar", "name": "nch", "len": 16}
],
"tags": [
{"type": "tinyint", "name": "groupid","max": 10,"min": 1},
{"type": "binary", "name": "location", "len": 16,
"values": ["San Francisco", "Los Angles", "San Diego",
"San Jose", "Palo Alto", "Campbell", "Mountain View",
"Sunnyvale", "Santa Clara", "Cupertino"]
}
]
},
{
"name": "interlaceTable",
"childtable_count": 5,
"insert_rows": 100,
"interlace_rows": 10,
"childtable_prefix": "d",
"timestamp_step": 1000,
"start_timestamp":1700000000000,
"columns": [
{ "type": "bool", "name": "bc"},
{ "type": "float", "name": "fc", "min":16},
{ "type": "double", "name": "dc", "min":16},
{ "type": "tinyint", "name": "ti"},
{ "type": "smallint", "name": "si"},
{ "type": "int", "name": "ic", "fillNull":"false"},
{ "type": "bigint", "name": "bi"},
{ "type": "utinyint", "name": "uti"},
{ "type": "usmallint", "name": "usi"},
{ "type": "uint", "name": "ui"},
{ "type": "ubigint", "name": "ubi"},
{ "type": "binary", "name": "bin", "len": 32},
{ "type": "nchar", "name": "nch", "len": 64}
],
"tags": [
{"type": "tinyint", "name": "groupid","max": 10,"min": 1},
{"type": "binary", "name": "location", "len": 16,
"values": ["San Francisco", "Los Angles", "San Diego",
"San Jose", "Palo Alto", "Campbell", "Mountain View",
"Sunnyvale", "Santa Clara", "Cupertino"]
}
]
}
]
}
]
}

View File

@ -93,7 +93,8 @@
,,y,army,./pytest.sh python3 ./test.py -f tools/benchmark/basic/default_json.py
,,y,army,./pytest.sh python3 ./test.py -f tools/benchmark/basic/demo.py
,,y,army,./pytest.sh python3 ./test.py -f tools/benchmark/basic/exportCsv.py
,,y,army,./pytest.sh python3 ./test.py -f tools/benchmark/basic/csv-export.py
# ,,y,army,./pytest.sh python3 ./test.py -f tools/benchmark/basic/csv-import.py
,,y,army,./pytest.sh python3 ./test.py -f tools/benchmark/basic/from-to.py
,,y,army,./pytest.sh python3 ./test.py -f tools/benchmark/basic/from-to-continue.py

View File

@ -18,7 +18,7 @@ taosdump is a tool for backing up TDengine data to a local directory and restoring
#### For Ubuntu/Debian systems
```shell
sudo apt install libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g pkg-config libssl-dev
sudo apt install libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g zlib1g-dev pkg-config libssl-dev
```
#### For CentOS 7/RHEL systems

View File

@ -0,0 +1,54 @@
{
"filetype": "csvfile",
"output_dir": "./csv/",
"databases": [
{
"dbinfo": {
"name": "csvdb",
"precision": "ms"
},
"super_tables": [
{
"name": "table",
"childtable_count": 1010,
"insert_rows": 1000,
"interlace_rows": 1,
"childtable_prefix": "d",
"timestamp_step": 1000000,
"start_timestamp": "2020-10-01 00:00:00.000",
"childtable_from": 1000,
"childtable_to": 1010,
"csv_file_prefix": "data",
"csv_ts_format": "%Y%m%d",
"csv_ts_interval": "1d",
"csv_output_header": "true",
"csv_tbname_alias": "device_id",
"csv_compress_level": "none",
"columns": [
{ "type": "bool", "name": "bc"},
{ "type": "float", "name": "fc", "min": 1},
{ "type": "double", "name": "dc", "min":10, "max":10},
{ "type": "tinyint", "name": "ti"},
{ "type": "smallint", "name": "si"},
{ "type": "int", "name": "ic", "fillNull":"false"},
{ "type": "bigint", "name": "bi"},
{ "type": "utinyint", "name": "uti"},
{ "type": "usmallint", "name": "usi", "min":100, "max":120},
{ "type": "uint", "name": "ui"},
{ "type": "ubigint", "name": "ubi"},
{ "type": "binary", "name": "bin", "len": 16},
{ "type": "nchar", "name": "nch", "len": 16}
],
"tags": [
{"type": "tinyint", "name": "groupid","max": 10,"min": 1},
{"type": "binary", "name": "location", "len": 16,
"values": ["San Francisco", "Los Angles", "San Diego",
"San Jose", "Palo Alto", "Campbell", "Mountain View",
"Sunnyvale", "Santa Clara", "Cupertino"]
}
]
}
]
}
]
}

View File

@ -20,6 +20,9 @@
#define CURL_STATICLIB
#define ALLOW_FORBID_FUNC
#define MAX(a, b) ((a) > (b) ? (a) : (b))
#define MIN(a, b) ((a) < (b) ? (a) : (b))
#ifdef LINUX
#ifndef _ALPINE
@ -476,6 +479,13 @@ typedef struct SChildTable_S {
int32_t pkCnt;
} SChildTable;
typedef enum {
CSV_COMPRESS_NONE = 0,
CSV_COMPRESS_FAST = 1,
CSV_COMPRESS_BALANCE = 6,
CSV_COMPRESS_BEST = 9
} CsvCompressionLevel;
#define PRIMARY_KEY "PRIMARY KEY"
typedef struct SSuperTable_S {
char *stbName;
@ -578,6 +588,15 @@ typedef struct SSuperTable_S {
// execute sqls after create super table
char **sqls;
char* csv_file_prefix;
char* csv_ts_format;
char* csv_ts_interval;
char* csv_tbname_alias;
long csv_ts_intv_secs;
bool csv_output_header;
CsvCompressionLevel csv_compress_level;
} SSuperTable;
typedef struct SDbCfg_S {
@ -776,9 +795,11 @@ typedef struct SArguments_S {
bool mistMode;
bool escape_character;
bool pre_load_tb_meta;
char csvPath[MAX_FILE_NAME_LEN];
bool bind_vgroup;
char* output_path;
char output_path_buf[MAX_PATH_LEN];
} SArguments;
typedef struct SBenchConn {

View File

@ -16,21 +16,82 @@
#ifndef INC_BENCHCSV_H_
#define INC_BENCHCSV_H_
#include <bench.h>
#include <zlib.h>
#include "bench.h"
typedef enum {
CSV_NAMING_I_SINGLE,
CSV_NAMING_I_TIME_SLICE,
CSV_NAMING_B_THREAD,
CSV_NAMING_B_THREAD_TIME_SLICE
} CsvNamingType;
typedef enum {
CSV_ERR_OK = 0,
CSV_ERR_OPEN_FAILED,
CSV_ERR_WRITE_FAILED
} CsvIoError;
typedef struct {
const char* filename;
CsvCompressionLevel compress_level;
CsvIoError result;
union {
gzFile gf;
FILE* fp;
} handle;
} CsvFileHandle;
typedef struct {
char* buf;
int length;
} CsvRowTagsBuf;
typedef struct {
char* buf;
int buf_size;
int length;
} CsvRowColsBuf;
typedef struct {
CsvNamingType naming_type;
size_t total_threads;
char mode[MIDDLE_BUFF_LEN];
char thread_formatter[SMALL_BUFF_LEN];
char csv_header[LARGE_BUFF_LEN];
int csv_header_length;
SDataBase* db;
SSuperTable* stb;
int64_t start_ts;
int64_t end_ts;
int64_t ts_step;
int64_t interlace_step;
} CsvWriteMeta;
typedef struct {
uint64_t ctb_start_idx;
uint64_t ctb_end_idx;
uint64_t ctb_count;
uint64_t total_rows;
time_t start_secs;
time_t end_secs;
int64_t start_ts;
int64_t end_ts;
size_t thread_id;
bool output_header;
int tags_buf_size;
CsvRowTagsBuf* tags_buf_array;
CsvRowColsBuf* cols_buf;
} CsvThreadMeta;
typedef struct {
CsvWriteMeta* write_meta;
CsvThreadMeta thread_meta;
} CsvThreadArgs;
int csvTestProcess();
int genWithSTable(SDataBase* db, SSuperTable* stb, char* outDir);
char * genTagData(char* buf, SSuperTable* stb, int64_t i, int64_t *k);
char * genColumnData(char* colData, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k);
int32_t genRowByField(char* buf, BArray* fields, int16_t fieldCnt, char* binanryPrefix, char* ncharPrefix, int64_t *k);
void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir);
int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain);
int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain);
#endif // INC_BENCHCSV_H_

View File

@ -16,6 +16,9 @@
#ifndef INC_BENCHLOG_H_
#define INC_BENCHLOG_H_
#include <stdbool.h>
#include <stdint.h>
//
// support thread-safe log module
//
@ -53,7 +56,7 @@ void exitLog();
(int32_t)timeSecs.tv_usec); \
fprintf(stdout, "DEBG: "); \
fprintf(stdout, "%s(%d) ", __FILE__, __LINE__); \
fprintf(stdout, "" fmt, __VA_ARGS__); \
fprintf(stdout, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_STDOUT); \
} \
} while (0)
@ -74,7 +77,7 @@ void exitLog();
(int32_t)timeSecs.tv_usec); \
fprintf(stdout, "DEBG: "); \
fprintf(stdout, "%s(%d) ", __FILE__, __LINE__); \
fprintf(stdout, "" fmt, __VA_ARGS__); \
fprintf(stdout, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_STDOUT); \
} \
} while (0)
@ -94,7 +97,7 @@ void exitLog();
do { \
if (g_arguments->debug_print) { \
lockLog(LOG_STDOUT); \
fprintf(stdout, "" fmt, __VA_ARGS__); \
fprintf(stdout, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_STDOUT); \
} \
} while (0)
@ -102,14 +105,14 @@ void exitLog();
#define infoPrintNoTimestamp(fmt, ...) \
do { \
lockLog(LOG_STDOUT); \
fprintf(stdout, "" fmt, __VA_ARGS__); \
fprintf(stdout, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_STDOUT); \
} while (0)
#define infoPrintNoTimestampToFile(fmt, ...) \
do { \
lockLog(LOG_RESULT); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_RESULT); \
} while (0)
@ -126,7 +129,7 @@ void exitLog();
ptm->tm_mon + 1, \
ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \
(int32_t)timeSecs.tv_usec); \
fprintf(stdout, "INFO: " fmt, __VA_ARGS__); \
fprintf(stdout, "INFO: " fmt, ##__VA_ARGS__); \
unlockLog(LOG_STDOUT); \
} while (0)
@ -142,7 +145,7 @@ void exitLog();
fprintf(g_arguments->fpOfInsertResult,"[%02d/%02d %02d:%02d:%02d.%06d] ", ptm->tm_mon + 1, \
ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \
(int32_t)timeSecs.tv_usec); \
fprintf(g_arguments->fpOfInsertResult, "INFO: " fmt, __VA_ARGS__);\
fprintf(g_arguments->fpOfInsertResult, "INFO: " fmt, ##__VA_ARGS__);\
unlockLog(LOG_RESULT); \
} while (0)
@ -160,7 +163,7 @@ void exitLog();
ptm->tm_mon + 1, \
ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \
(int32_t)timeSecs.tv_usec); \
fprintf(stderr, "PERF: " fmt, __VA_ARGS__); \
fprintf(stderr, "PERF: " fmt, ##__VA_ARGS__); \
unlockLog(LOG_STDERR); \
if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \
lockLog(LOG_RESULT); \
@ -172,7 +175,7 @@ void exitLog();
(int32_t)timeSecs.tv_usec); \
fprintf(g_arguments->fpOfInsertResult, "PERF: "); \
fprintf(g_arguments->fpOfInsertResult, \
"" fmt, __VA_ARGS__); \
"" fmt, ##__VA_ARGS__); \
unlockLog(LOG_RESULT); \
} \
} \
@ -196,7 +199,7 @@ void exitLog();
if (g_arguments->debug_print) { \
fprintf(stderr, "%s(%d) ", __FILE__, __LINE__); \
} \
fprintf(stderr, "" fmt, __VA_ARGS__); \
fprintf(stderr, "" fmt, ##__VA_ARGS__); \
fprintf(stderr, "\033[0m"); \
unlockLog(LOG_STDERR); \
if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \
@ -206,7 +209,7 @@ void exitLog();
ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \
(int32_t)timeSecs.tv_usec); \
fprintf(g_arguments->fpOfInsertResult, "ERROR: "); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_RESULT); \
} \
} while (0)
@ -229,7 +232,7 @@ void exitLog();
if (g_arguments->debug_print) { \
fprintf(stderr, "%s(%d) ", __FILE__, __LINE__); \
} \
fprintf(stderr, "" fmt, __VA_ARGS__); \
fprintf(stderr, "" fmt, ##__VA_ARGS__); \
fprintf(stderr, "\033[0m"); \
unlockLog(LOG_STDERR); \
if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \
@ -239,7 +242,7 @@ void exitLog();
ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \
(int32_t)timeSecs.tv_usec); \
fprintf(g_arguments->fpOfInsertResult, "WARN: "); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_RESULT); \
} \
} while (0)
@ -262,7 +265,7 @@ void exitLog();
if (g_arguments->debug_print) { \
fprintf(stderr, "%s(%d) ", __FILE__, __LINE__); \
} \
fprintf(stderr, "" fmt, __VA_ARGS__); \
fprintf(stderr, "" fmt, ##__VA_ARGS__); \
fprintf(stderr, "\033[0m"); \
unlockLog(LOG_STDERR); \
if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \
@ -272,7 +275,7 @@ void exitLog();
ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \
(int32_t)timeSecs.tv_usec); \
fprintf(g_arguments->fpOfInsertResult, "SUCC: "); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_RESULT); \
} \
} while (0)

View File

@ -316,6 +316,9 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin
ENDIF ()
ENDIF ()
TARGET_LINK_LIBRARIES(taosBenchmark z)
ELSE ()
ADD_DEFINITIONS(-DWINDOWS)
SET(CMAKE_C_STANDARD 11)
@ -331,6 +334,7 @@ ELSE ()
ADD_DEPENDENCIES(taosdump deps-snappy)
ADD_DEPENDENCIES(taosdump deps-libargp)
ADD_DEPENDENCIES(taosdump apache-avro)
ADD_DEPENDENCIES(taosBenchmark tools-zlib)
IF (${WEBSOCKET})
INCLUDE_DIRECTORIES(/usr/local/include/)
@ -362,4 +366,8 @@ ELSE ()
ENDIF ()
TARGET_LINK_LIBRARIES(taosBenchmark taos msvcregex pthread toolscJson ${WEBSOCKET_LINK_FLAGS})
TARGET_LINK_LIBRARIES(taosBenchmark zlibstatic)
ENDIF ()

File diff suppressed because it is too large.

View File

@ -1405,6 +1405,65 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) {
}
}
}
// csv file prefix
tools_cJSON* csv_fp = tools_cJSON_GetObjectItem(stbInfo, "csv_file_prefix");
if (csv_fp && csv_fp->type == tools_cJSON_String && csv_fp->valuestring != NULL) {
superTable->csv_file_prefix = csv_fp->valuestring;
} else {
superTable->csv_file_prefix = "data";
}
// csv timestamp format
tools_cJSON* csv_tf = tools_cJSON_GetObjectItem(stbInfo, "csv_ts_format");
if (csv_tf && csv_tf->type == tools_cJSON_String && csv_tf->valuestring != NULL) {
superTable->csv_ts_format = csv_tf->valuestring;
} else {
superTable->csv_ts_format = NULL;
}
// csv timestamp interval
tools_cJSON* csv_ti = tools_cJSON_GetObjectItem(stbInfo, "csv_ts_interval");
if (csv_ti && csv_ti->type == tools_cJSON_String && csv_ti->valuestring != NULL) {
superTable->csv_ts_interval = csv_ti->valuestring;
} else {
superTable->csv_ts_interval = "1d";
}
// csv output header
superTable->csv_output_header = true;
tools_cJSON* oph = tools_cJSON_GetObjectItem(stbInfo, "csv_output_header");
if (oph && oph->type == tools_cJSON_String && oph->valuestring != NULL) {
if (0 == strcasecmp(oph->valuestring, "yes")) {
superTable->csv_output_header = true;
} else if (0 == strcasecmp(oph->valuestring, "no")) {
superTable->csv_output_header = false;
}
}
// csv tbname alias
tools_cJSON* tba = tools_cJSON_GetObjectItem(stbInfo, "csv_tbname_alias");
if (tba && tba->type == tools_cJSON_String && tba->valuestring != NULL) {
superTable->csv_tbname_alias = tba->valuestring;
} else {
superTable->csv_tbname_alias = "device_id";
}
// csv compression level
tools_cJSON* cl = tools_cJSON_GetObjectItem(stbInfo, "csv_compress_level");
if (cl && cl->type == tools_cJSON_String && cl->valuestring != NULL) {
if (0 == strcasecmp(cl->valuestring, "none")) {
superTable->csv_compress_level = CSV_COMPRESS_NONE;
} else if (0 == strcasecmp(cl->valuestring, "fast")) {
superTable->csv_compress_level = CSV_COMPRESS_FAST;
} else if (0 == strcasecmp(cl->valuestring, "balance")) {
superTable->csv_compress_level = CSV_COMPRESS_BALANCE;
} else if (0 == strcasecmp(cl->valuestring, "best")) {
superTable->csv_compress_level = CSV_COMPRESS_BEST;
}
} else {
superTable->csv_compress_level = CSV_COMPRESS_NONE;
}
}
return 0;
}
@ -1586,26 +1645,14 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) {
}
}
g_arguments->csvPath[0] = 0;
tools_cJSON *csv = tools_cJSON_GetObjectItem(json, "csvPath");
if (csv && (csv->type == tools_cJSON_String)
&& (csv->valuestring != NULL)) {
tstrncpy(g_arguments->csvPath, csv->valuestring, MAX_FILE_NAME_LEN);
}
size_t len = strlen(g_arguments->csvPath);
if(len == 0) {
// set default with current path
strcpy(g_arguments->csvPath, "./output/");
mkdir(g_arguments->csvPath, 0775);
// output dir
tools_cJSON* opp = tools_cJSON_GetObjectItem(json, "output_dir");
if (opp && opp->type == tools_cJSON_String && opp->valuestring != NULL) {
g_arguments->output_path = opp->valuestring;
} else {
// append end
if (g_arguments->csvPath[len-1] != '/' ) {
strcat(g_arguments->csvPath, "/");
}
mkdir(g_arguments->csvPath, 0775);
g_arguments->output_path = "./output/";
}
(void)mkdir(g_arguments->output_path, 0775);
code = 0;
return code;

View File

@ -153,7 +153,7 @@ int main(int argc, char* argv[]) {
}
} else if (g_arguments->test_mode == CSVFILE_TEST) {
if (csvTestProcess()) {
errorPrint("%s", "query test process failed\n");
errorPrint("%s", "generate csv process failed\n");
ret = -1;
}
} else if (g_arguments->test_mode == QUERY_TEST) {