diff --git a/docs/en/14-reference/02-tools/10-taosbenchmark.md b/docs/en/14-reference/02-tools/10-taosbenchmark.md
index cfc92b4e0b..19f498eab1 100644
--- a/docs/en/14-reference/02-tools/10-taosbenchmark.md
+++ b/docs/en/14-reference/02-tools/10-taosbenchmark.md
@@ -188,9 +188,12 @@ taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\)

The parameters listed in this section apply to all functional modes.

-- **filetype**: The function to test, possible values are `insert`, `query`, and `subscribe`. Corresponding to insert, query, and subscribe functions. Only one can be specified in each configuration file.
+- **filetype**: The function to test; possible values are `insert`, `query`, `subscribe`, and `csvfile`, corresponding to the insert, query, subscribe, and CSV file generation functions. Only one can be specified in each configuration file.
+
- **cfgdir**: Directory where the TDengine client configuration file is located, default path is /etc/taos.
+- **output_dir**: The directory for output files. When `filetype` is `csvfile`, this is the directory where the generated CSV files are saved. The default value is ./output/.
+
- **host**: Specifies the FQDN of the TDengine server to connect to, default value is localhost.

- **port**: The port number of the TDengine server to connect to, default value is 6030.
@@ -283,6 +286,27 @@ Parameters related to supertable creation are configured in the `super_tables` s
- **repeat_ts_max** : Numeric type, when composite primary key is enabled, specifies the maximum number of records with the same timestamp to be generated
- **sqls** : Array of strings type, specifies the array of sql to be executed after the supertable is successfully created, the table name specified in sql must be prefixed with the database name, otherwise an unspecified database error will occur
+- **csv_file_prefix**: String type, sets the prefix for the names of the generated CSV files. The default value is "data".
+
+- **csv_ts_format**: String type, sets the format of the time string embedded in the names of the generated CSV files, following the `strftime` format standard. If not set, the files are not split into time slices. Supported patterns include:
+  - %Y: Year as a four-digit number (e.g., 2025)
+  - %m: Month as a two-digit number (01 to 12)
+  - %d: Day of the month as a two-digit number (01 to 31)
+  - %H: Hour in 24-hour format as a two-digit number (00 to 23)
+  - %M: Minute as a two-digit number (00 to 59)
+  - %S: Second as a two-digit number (00 to 59)
+
+- **csv_ts_interval**: String type, sets the length of the time slice used to split the generated CSV files. Supports day, hour, minute, and second intervals such as 1d/2h/30m/40s. The default value is "1d".
+
+- **csv_output_header**: String type, sets whether the generated CSV files contain a column header line. The default value is "yes".
+
+- **csv_tbname_alias**: String type, sets the alias for the tbname field in the column header of the CSV files. The default value is "device_id".
+
+- **csv_compress_level**: String type, sets the compression level used when generating CSV-encoded data and compressing it into a gzip file. The data is encoded and compressed in a single pass, rather than written to a CSV file first and then compressed. Possible values are:
+  - none: No compression
+  - fast: gzip level 1 compression
+  - balance: gzip level 6 compression
+  - best: gzip level 9 compression
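As an editor's aside (not part of the patch): a minimal Python sketch of how a time-sliced file name composes from `csv_file_prefix`, `csv_ts_format`, and `csv_ts_interval`. The `<prefix>_<slice start>_<slice end>.csv` shape mirrors the names checked by the new test later in this diff; the concrete dates are made up for illustration.

```python
# Sketch: compose the expected name of one daily csv slice.
import datetime

csv_file_prefix = "data"      # csv_file_prefix
csv_ts_format = "%Y%m%d"      # csv_ts_format
slice_start = datetime.datetime(2023, 11, 15)
slice_end = slice_start + datetime.timedelta(days=1)  # csv_ts_interval = "1d"

filename = "%s_%s_%s.csv" % (
    csv_file_prefix,
    slice_start.strftime(csv_ts_format),
    slice_end.strftime(csv_ts_format),
)
print(filename)  # data_20231115_20231116.csv
```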

#### Tag and Data Columns

@@ -478,6 +502,17 @@ Note: Data types in the taosBenchmark configuration file must be in lowercase to

+### Export CSV File Example
+
+<details>
+<summary>csv-export.json</summary>
+
+```json
+{{#include /TDengine/tools/taos-tools/example/csv-export.json}}
+```
+
+</details>
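A quick way to exercise this example (an editor's sketch, not part of the patch; it assumes `taosBenchmark` is on `PATH` and the config above is saved as `csv-export.json` with its `output_path` of "./csv/"):

```python
# Sketch: run the csv export and list what lands in the output directory.
import glob
import subprocess

subprocess.run(["taosBenchmark", "-f", "csv-export.json"], check=True)
for path in sorted(glob.glob("./csv/*.csv*")):
    print(path)  # e.g. ./csv/data_20201001_20201002.csv with csv_ts_format "%Y%m%d"
```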
+ Other json examples see [here](https://github.com/taosdata/TDengine/tree/main/tools/taos-tools/example) ## Output Performance Indicators diff --git a/docs/zh/14-reference/02-tools/10-taosbenchmark.md b/docs/zh/14-reference/02-tools/10-taosbenchmark.md index 56f9e5b122..1f97b0702a 100644 --- a/docs/zh/14-reference/02-tools/10-taosbenchmark.md +++ b/docs/zh/14-reference/02-tools/10-taosbenchmark.md @@ -93,14 +93,17 @@ taosBenchmark -f 本节所列参数适用于所有功能模式。 -- **filetype**:功能分类,可选值为 `insert`、`query` 和 `subscribe`。分别对应插入、查询和订阅功能。每个配置文件中只能指定其中之一。 +- **filetype**:功能分类,可选值为 `insert`、`query`、`subscribe` 和 `csvfile`。分别对应插入、查询、订阅和生成 csv 文件功能。每个配置文件中只能指定其中之一。 + - **cfgdir**:TDengine 客户端配置文件所在的目录,默认路径是 /etc/taos 。 +- **output_dir**:指定输出文件的目录,当功能分类是 `csvfile` 时,指生成的 csv 文件的保存目录,默认值为 ./output/ 。 + - **host**:指定要连接的 TDengine 服务端的 FQDN,默认值为 localhost 。 - **port**:要连接的 TDengine 服务器的端口号,默认值为 6030 。 -- **user**:用于连接 TDengine 服务端的用户名,默认为 root 。 +- **user**:用于连接 TDengine 服务端的用户名,默认值为 root 。 - **password**:用于连接 TDengine 服务端的密码,默认值为 taosdata。 @@ -184,10 +187,34 @@ taosBenchmark -f - **tags_file**:仅当 insert_mode 为 taosc,rest 的模式下生效。最终的 tag 的数值与 childtable_count 有关,如果 csv 文件内的 tag 数据行小于给定的子表数量,那么会循环读取 csv 文件数据直到生成 childtable_count 指定的子表数量;否则则只会读取 childtable_count 行 tag 数据。也即最终生成的子表数量为二者取小。 - **primary_key**:指定超级表是否有复合主键,取值 1 和 0,复合主键列只能是超级表的第二列,指定生成复合主键后要确保第二列符合复合主键的数据类型,否则会报错。 + - **repeat_ts_min**:数值类型,复合主键开启情况下指定生成相同时间戳记录的最小个数,生成相同时间戳记录的个数是在范围[repeat_ts_min, repeat_ts_max] 内的随机值,最小值等于最大值时为固定个数。 + - **repeat_ts_max**:数值类型,复合主键开启情况下指定生成相同时间戳记录的最大个数。 + - **sqls**:字符串数组类型,指定超级表创建成功后要执行的 sql 数组,sql 中指定表名前面要带数据库名,否则会报未指定数据库错误。 +- **csv_file_prefix**:字符串类型,设置生成的 csv 文件名称的前缀,默认值为 data 。 + +- **csv_ts_format**:字符串类型,设置生成的 csv 文件名称中时间字符串的格式,格式遵循 `strftime` 格式标准,如果没有设置表示不按照时间段切分文件。支持的模式有: + - %Y: 年份,四位数表示(例如:2025) + - %m: 月份,两位数表示(01到12) + - %d: 一个月中的日子,两位数表示(01到31) + - %H: 小时,24小时制,两位数表示(00到23) + - %M: 分钟,两位数表示(00到59) + - %S: 秒,两位数表示(00到59) + +- **csv_ts_interval**:字符串类型,设置生成的 csv 文件名称中时间段间隔,支持天、小时、分钟、秒级间隔,如 1d/2h/30m/40s,默认值为 1d 。 + +- **csv_output_header**:字符串类型,设置生成的 csv 文件是否包含列头描述,默认值为 yes 。 + +- **csv_tbname_alias**:字符串类型,设置 csv 文件列头描述中 tbname 字段的别名,默认值为 device_id 。 + +- **csv_compress_level**:字符串类型,设置生成 csv 编码数据并自动压缩成 gzip 格式文件的压缩等级。此过程直接编码并压缩,而非先生成 csv 文件再压缩。可选值为: + - none:不压缩 + - fast:gzip 1级压缩 + - balance:gzip 6级压缩 + - best:gzip 9级压缩 #### 标签列与数据列 @@ -383,6 +410,17 @@ interval 控制休眠时间,避免持续查询慢查询消耗 CPU,单位为 +### 生成 CSV 文件 JSON 示例 + +
+<summary>csv-export.json</summary>
+
+```json
+{{#include /TDengine/tools/taos-tools/example/csv-export.json}}
+```
+
+</details>
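Editor's aside (not part of the patch): the `csv_compress_level` names map onto standard gzip levels (fast=1, balance=6, best=9, per the docs above and the `CsvCompressionLevel` enum added in bench.h), so a generated `.csv.gz` can be read back with Python's gzip module. The file name below is hypothetical.

```python
# Sketch: read the header row back out of a gzip-compressed csv export.
import csv
import gzip

with gzip.open("./csv/data.csv.gz", "rt", newline="") as f:
    reader = csv.reader(f)
    header = next(reader)
    # header layout per csvGenCsvHeader: ts, data columns, tbname alias, tags
    print(header)
```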
+
 查看更多 json 配置文件示例可 [点击这里](https://github.com/taosdata/TDengine/tree/main/tools/taos-tools/example)

 ## 输出性能指标

diff --git a/tests/army/tools/benchmark/basic/csv-export.py b/tests/army/tools/benchmark/basic/csv-export.py
new file mode 100644
index 0000000000..65ffb3e541
--- /dev/null
+++ b/tests/army/tools/benchmark/basic/csv-export.py
@@ -0,0 +1,237 @@
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################

+# -*- coding: utf-8 -*-
+import os
+import json
+import csv
+import shutil
+import datetime
+
+import frame
+import frame.eos
+import frame.etool
+from frame.log import *
+from frame.cases import *
+from frame.sql import *
+from frame.caseBase import *
+from frame import *
+
+
+class TDTestCase(TBase):
+    def caseDescription(self):
+        """
+        [TS-5089] taosBenchmark support exporting csv
+        """
+
+
+    def clear_directory(self, target_dir: str = 'csv'):
+        try:
+            if not os.path.exists(target_dir):
+                return
+            for entry in os.listdir(target_dir):
+                entry_path = os.path.join(target_dir, entry)
+                if os.path.isfile(entry_path) or os.path.islink(entry_path):
+                    os.unlink(entry_path)
+                else:
+                    shutil.rmtree(entry_path)
+
+            tdLog.debug("clear succ, dir: %s " % (target_dir))
+        except OSError as e:
+            tdLog.exit("clear fail, dir: %s, error: %s" % (target_dir, e))
+
+
+    def convert_timestamp(self, ts, ts_format):
+        dt_object = datetime.datetime.fromtimestamp(ts / 1000)
+        formatted_time = dt_object.strftime(ts_format)
+        return formatted_time
+
+
+    def calc_time_slice_partitions(self, total_start_ts, total_end_ts, ts_step, ts_format, ts_interval):
+        interval_days = int(ts_interval[:-1])
+        n_days_millis = interval_days * 24 * 60 * 60 * 1000
+
+        dt_start = datetime.datetime.fromtimestamp(total_start_ts / 1000.0)
+        formatted_str = dt_start.strftime(ts_format)
+        s0_dt = datetime.datetime.strptime(formatted_str, ts_format)
+        s0 = int(s0_dt.timestamp() * 1000)
+
+        partitions = []
+        current_s = s0
+
+        while current_s <= total_end_ts:
+            current_end = current_s + n_days_millis
+            start_actual = max(current_s, total_start_ts)
+            end_actual = min(current_end, total_end_ts)
+
+            if start_actual >= end_actual:
+                count = 0
+            else:
+                delta = end_actual - start_actual
+                delta_start = start_actual - total_start_ts
+                delta_end = end_actual - total_start_ts
+                if delta % ts_step:
+                    count = delta // ts_step + 1
+                else:
+                    count = delta // ts_step
+
+            partitions.append({
+                "start_ts": current_s,
+                "end_ts": current_end,
+                "start_time": self.convert_timestamp(current_s, ts_format),
+                "end_time": self.convert_timestamp(current_end, ts_format),
+                "count": count
+            })
+
+            current_s += n_days_millis
+
+        # partitions = [p for p in partitions if p['count'] > 0]
+        return partitions
+
+
+    def check_stb_csv_correct(self, csv_file_name, all_rows, interlace_rows):
+        # open as csv
+        tbname_idx = 14
+        count = 0
+        batch = 0
+        name = ""
+        header = True
+        with open(csv_file_name) as file:
+            rows = csv.reader(file)
+            for row in rows:
+                if header:
+                    header = False
+                    continue
+
+                # interlace_rows
+                if name == "":
+                    name = row[tbname_idx]
+                    batch = 1
+                else:
+                    if name == row[tbname_idx]:
+                        batch += 1
+                    else:
+                        # switch to another child table
+                        if batch != 
interlace_rows: + tdLog.exit(f"interlace_rows invalid. tbName={name} actual={batch} expected={interlace_rows} i={count} csv_file_name={csv_file_name}") + batch = 1 + name = row[tbname_idx] + # count ++ + count += 1 + # batch + if batch != interlace_rows: + tdLog.exit(f"interlace_rows invalid. tbName={name} actual={batch} expected={interlace_rows} i={count} csv_file_name={csv_file_name}") + + # check all rows + if count != all_rows: + tdLog.exit(f"all_rows invalid. actual={count} expected={all_rows} csv_file_name={csv_file_name}") + + tdLog.info(f"Check generate csv file successfully. csv_file_name={csv_file_name} count={count} interlace_rows={batch}") + + + # check correct + def check_stb_correct(self, data, db, stb): + filepath = data["output_dir"] + stbName = stb["name"] + child_count = stb["childtable_to"] - stb["childtable_from"] + insert_rows = stb["insert_rows"] + interlace_rows = stb["interlace_rows"] + csv_file_prefix = stb["csv_file_prefix"] + csv_ts_format = stb.get("csv_ts_format", None) + csv_ts_interval = stb.get("csv_ts_interval", None) + + ts_step = stb["timestamp_step"] + total_start_ts = stb["start_timestamp"] + total_end_ts = total_start_ts + ts_step * insert_rows + + + all_rows = child_count * insert_rows + if interlace_rows > 0: + # interlace + + if not csv_ts_format: + # normal + csv_file_name = f"{filepath}{csv_file_prefix}.csv" + self.check_stb_csv_correct(csv_file_name, all_rows, interlace_rows) + else: + # time slice + partitions = self.calc_time_slice_partitions(total_start_ts, total_end_ts, ts_step, csv_ts_format, csv_ts_interval) + for part in partitions: + csv_file_name = f"{filepath}{csv_file_prefix}_{part['start_time']}_{part['end_time']}.csv" + self.check_stb_csv_correct(csv_file_name, part['count'] * child_count, interlace_rows) + else: + # batch + thread_count = stb["thread_count"] + interlace_rows = insert_rows + if not csv_ts_format: + # normal + for i in range(thread_count): + csv_file_name = f"{filepath}{csv_file_prefix}_{i + 1}.csv" + if i < child_count % thread_count: + self.check_stb_csv_correct(csv_file_name, insert_rows * (child_count // thread_count + 1), interlace_rows) + else: + self.check_stb_csv_correct(csv_file_name, insert_rows * (child_count // thread_count), interlace_rows) + else: + # time slice + for i in range(thread_count): + partitions = self.calc_time_slice_partitions(total_start_ts, total_end_ts, ts_step, csv_ts_format, csv_ts_interval) + for part in partitions: + csv_file_name = f"{filepath}{csv_file_prefix}_{i + 1}_{part['start_time']}_{part['end_time']}.csv" + if i < child_count % thread_count: + slice_rows = part['count'] * (child_count // thread_count + 1) + else: + slice_rows = part['count'] * (child_count // thread_count) + + self.check_stb_csv_correct(csv_file_name, slice_rows, part['count']) + + + # check result + def check_result(self, jsonFile): + # csv + with open(jsonFile) as file: + data = json.load(file) + + # read json + database = data["databases"][0] + stables = database["super_tables"] + + for stable in stables: + # check csv context correct + self.check_stb_correct(data, database, stable) + + + def check_export_csv(self, benchmark, jsonFile, options=""): + # clear + self.clear_directory() + + # exec + cmd = f"{benchmark} {options} -f {jsonFile}" + eos.exe(cmd) + + # check result + self.check_result(jsonFile) + + + def run(self): + # path + benchmark = etool.benchMarkFile() + + # do check interlace normal + json = "tools/benchmark/basic/json/csv-export.json" + self.check_export_csv(benchmark, json) + + def 
stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/army/tools/benchmark/basic/exportCsv.py b/tests/army/tools/benchmark/basic/exportCsv.py deleted file mode 100644 index b8b3828ea6..0000000000 --- a/tests/army/tools/benchmark/basic/exportCsv.py +++ /dev/null @@ -1,110 +0,0 @@ -################################################################### -# Copyright (c) 2016 by TAOS Technologies, Inc. -# All rights reserved. -# -# This file is proprietary and confidential to TAOS Technologies. -# No part of this file may be reproduced, stored, transmitted, -# disclosed or used in any form or by any means other than as -# expressly provided by the written permission from Jianhui Tao -# -################################################################### - -# -*- coding: utf-8 -*- -import os -import json -import csv - -import frame -import frame.etool -from frame.log import * -from frame.cases import * -from frame.sql import * -from frame.caseBase import * -from frame import * - - -class TDTestCase(TBase): - def caseDescription(self): - """ - [TD-11510] taosBenchmark test cases - """ - # check correct - def checkCorrect(self, csvFile, allRows, interlaceRows): - # open as csv - count = 0 - batch = 0 - name = "" - with open(csvFile) as file: - rows = csv.reader(file) - for row in rows: - # interlaceRows - if name == "": - name = row[0] - batch = 1 - else: - if name == row[0]: - batch += 1 - else: - # switch to another child table - if batch != interlaceRows: - tdLog.exit(f"interlaceRows invalid. tbName={name} real={batch} expect={interlaceRows} i={count} csvFile={csvFile}") - batch = 1 - name = row[0] - # count ++ - count += 1 - # batch - if batch != interlaceRows: - tdLog.exit(f"interlaceRows invalid. tbName={name} real={batch} expect={interlaceRows} i={count} csvFile={csvFile}") - - # check all rows - if count != allRows: - tdLog.exit(f"allRows invalid. real={count} expect={allRows} csvFile={csvFile}") - - tdLog.info(f"Check generate csv file successfully. 
csvFile={csvFile} count={count} interlaceRows={batch}") - - # check result - def checResult(self, jsonFile): - # csv - with open(jsonFile) as file: - data = json.load(file) - - # read json - database = data["databases"][0] - out = data["csvPath"] - dbName = database["dbinfo"]["name"] - stables = database["super_tables"] - for stable in stables: - stbName = stable["name"] - childs = stable["childtable_count"] - insertRows = stable["insert_rows"] - interlaceRows = stable["interlace_rows"] - csvFile = f"{out}{dbName}-{stbName}.csv" - rows = childs * insertRows - if interlaceRows == 0: - interlaceRows = insertRows - # check csv context correct - self.checkCorrect(csvFile, rows, interlaceRows) - - def checkExportCsv(self, benchmark, jsonFile, options=""): - # exec - cmd = f"{benchmark} {options} -f {jsonFile}" - os.system(cmd) - - # check result - self.checResult(jsonFile) - - def run(self): - # path - benchmark = etool.benchMarkFile() - - # do check - json = "tools/benchmark/basic/json/exportCsv.json" - self.checkExportCsv(benchmark, json) - - def stop(self): - tdSql.close() - tdLog.success("%s successfully executed" % __file__) - - -tdCases.addWindows(__file__, TDTestCase()) -tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/army/tools/benchmark/basic/json/csv-export.json b/tests/army/tools/benchmark/basic/json/csv-export.json new file mode 100644 index 0000000000..88beab0de1 --- /dev/null +++ b/tests/army/tools/benchmark/basic/json/csv-export.json @@ -0,0 +1,172 @@ +{ + "filetype": "csvfile", + "output_dir": "./csv/", + "databases": [ + { + "dbinfo": { + "name": "csvdb", + "precision": "ms" + }, + "super_tables": [ + { + "name": "interlace-normal", + "childtable_count": 1010, + "insert_rows": 1000, + "interlace_rows": 1, + "childtable_prefix": "d", + "timestamp_step": 1000000, + "start_timestamp":1700000000000, + "childtable_from": 1000, + "childtable_to": 1010, + "csv_file_prefix": "data", + "csv_output_header": "yes", + "csv_tbname_alias": "device_id", + "csv_compress_level": "none", + "columns": [ + { "type": "bool", "name": "bc"}, + { "type": "float", "name": "fc", "min": 1}, + { "type": "double", "name": "dc", "min":10, "max":10}, + { "type": "tinyint", "name": "ti"}, + { "type": "smallint", "name": "si"}, + { "type": "int", "name": "ic", "fillNull":"false"}, + { "type": "bigint", "name": "bi"}, + { "type": "utinyint", "name": "uti"}, + { "type": "usmallint", "name": "usi", "min":100, "max":120}, + { "type": "uint", "name": "ui"}, + { "type": "ubigint", "name": "ubi"}, + { "type": "binary", "name": "bin", "len": 16}, + { "type": "nchar", "name": "nch", "len": 16} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, + {"type": "binary", "name": "location", "len": 16, + "values": ["San Francisco", "Los Angles", "San Diego", + "San Jose", "Palo Alto", "Campbell", "Mountain View", + "Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + }, + { + "name": "interlace-timeslice", + "childtable_count": 1010, + "insert_rows": 1000, + "interlace_rows": 1, + "childtable_prefix": "d", + "timestamp_step": 1000000, + "start_timestamp":1700000000000, + "childtable_from": 1000, + "childtable_to": 1010, + "csv_file_prefix": "data", + "csv_ts_format": "%Y%m%d", + "csv_ts_interval": "1d", + "csv_output_header": "yes", + "csv_tbname_alias": "device_id", + "csv_compress_level": "none", + "columns": [ + { "type": "bool", "name": "bc"}, + { "type": "float", "name": "fc", "min": 1}, + { "type": "double", "name": "dc", "min":10, "max":10}, + { "type": "tinyint", "name": "ti"}, + { 
"type": "smallint", "name": "si"}, + { "type": "int", "name": "ic", "fillNull":"false"}, + { "type": "bigint", "name": "bi"}, + { "type": "utinyint", "name": "uti"}, + { "type": "usmallint", "name": "usi", "min":100, "max":120}, + { "type": "uint", "name": "ui"}, + { "type": "ubigint", "name": "ubi"}, + { "type": "binary", "name": "bin", "len": 16}, + { "type": "nchar", "name": "nch", "len": 16} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, + {"type": "binary", "name": "location", "len": 16, + "values": ["San Francisco", "Los Angles", "San Diego", + "San Jose", "Palo Alto", "Campbell", "Mountain View", + "Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + }, + { + "name": "batch-normal", + "childtable_count": 1010, + "insert_rows": 1000, + "interlace_rows": 0, + "thread_count": 8, + "childtable_prefix": "d", + "timestamp_step": 1000000, + "start_timestamp":1700000000000, + "childtable_from": 1000, + "childtable_to": 1010, + "csv_file_prefix": "data", + "csv_output_header": "yes", + "csv_tbname_alias": "device_id", + "csv_compress_level": "none", + "columns": [ + { "type": "bool", "name": "bc"}, + { "type": "float", "name": "fc", "min": 1}, + { "type": "double", "name": "dc", "min":10, "max":10}, + { "type": "tinyint", "name": "ti"}, + { "type": "smallint", "name": "si"}, + { "type": "int", "name": "ic", "fillNull":"false"}, + { "type": "bigint", "name": "bi"}, + { "type": "utinyint", "name": "uti"}, + { "type": "usmallint", "name": "usi", "min":100, "max":120}, + { "type": "uint", "name": "ui"}, + { "type": "ubigint", "name": "ubi"}, + { "type": "binary", "name": "bin", "len": 16}, + { "type": "nchar", "name": "nch", "len": 16} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, + {"type": "binary", "name": "location", "len": 16, + "values": ["San Francisco", "Los Angles", "San Diego", + "San Jose", "Palo Alto", "Campbell", "Mountain View", + "Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + }, + { + "name": "batch-timeslice", + "childtable_count": 1010, + "insert_rows": 1000, + "interlace_rows": 0, + "thread_count": 8, + "childtable_prefix": "d", + "timestamp_step": 1000000, + "start_timestamp":1700000000000, + "childtable_from": 1000, + "childtable_to": 1010, + "csv_file_prefix": "data", + "csv_ts_format": "%Y%m%d", + "csv_ts_interval": "1d", + "csv_output_header": "yes", + "csv_tbname_alias": "device_id", + "csv_compress_level": "none", + "columns": [ + { "type": "bool", "name": "bc"}, + { "type": "float", "name": "fc", "min": 1}, + { "type": "double", "name": "dc", "min":10, "max":10}, + { "type": "tinyint", "name": "ti"}, + { "type": "smallint", "name": "si"}, + { "type": "int", "name": "ic", "fillNull":"false"}, + { "type": "bigint", "name": "bi"}, + { "type": "utinyint", "name": "uti"}, + { "type": "usmallint", "name": "usi", "min":100, "max":120}, + { "type": "uint", "name": "ui"}, + { "type": "ubigint", "name": "ubi"}, + { "type": "binary", "name": "bin", "len": 16}, + { "type": "nchar", "name": "nch", "len": 16} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, + {"type": "binary", "name": "location", "len": 16, + "values": ["San Francisco", "Los Angles", "San Diego", + "San Jose", "Palo Alto", "Campbell", "Mountain View", + "Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + } + ] + } + ] +} diff --git a/tests/army/tools/benchmark/basic/json/exportCsv.json b/tests/army/tools/benchmark/basic/json/exportCsv.json deleted file mode 100644 index 05a7341eb6..0000000000 --- 
a/tests/army/tools/benchmark/basic/json/exportCsv.json +++ /dev/null @@ -1,78 +0,0 @@ -{ - "filetype": "csvfile", - "csvPath": "./csv/", - "num_of_records_per_req": 10000, - "databases": [ - { - "dbinfo": { - "name": "csvdb" - }, - "super_tables": [ - { - "name": "batchTable", - "childtable_count": 5, - "insert_rows": 100, - "interlace_rows": 0, - "childtable_prefix": "d", - "timestamp_step": 10, - "start_timestamp":1600000000000, - "columns": [ - { "type": "bool", "name": "bc"}, - { "type": "float", "name": "fc", "min": 1}, - { "type": "double", "name": "dc", "min":10, "max":10}, - { "type": "tinyint", "name": "ti"}, - { "type": "smallint", "name": "si"}, - { "type": "int", "name": "ic", "fillNull":"false"}, - { "type": "bigint", "name": "bi"}, - { "type": "utinyint", "name": "uti"}, - { "type": "usmallint", "name": "usi", "min":100, "max":120}, - { "type": "uint", "name": "ui"}, - { "type": "ubigint", "name": "ubi"}, - { "type": "binary", "name": "bin", "len": 16}, - { "type": "nchar", "name": "nch", "len": 16} - ], - "tags": [ - {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, - {"type": "binary", "name": "location", "len": 16, - "values": ["San Francisco", "Los Angles", "San Diego", - "San Jose", "Palo Alto", "Campbell", "Mountain View", - "Sunnyvale", "Santa Clara", "Cupertino"] - } - ] - }, - { - "name": "interlaceTable", - "childtable_count": 5, - "insert_rows": 100, - "interlace_rows": 10, - "childtable_prefix": "d", - "timestamp_step": 1000, - "start_timestamp":1700000000000, - "columns": [ - { "type": "bool", "name": "bc"}, - { "type": "float", "name": "fc", "min":16}, - { "type": "double", "name": "dc", "min":16}, - { "type": "tinyint", "name": "ti"}, - { "type": "smallint", "name": "si"}, - { "type": "int", "name": "ic", "fillNull":"false"}, - { "type": "bigint", "name": "bi"}, - { "type": "utinyint", "name": "uti"}, - { "type": "usmallint", "name": "usi"}, - { "type": "uint", "name": "ui"}, - { "type": "ubigint", "name": "ubi"}, - { "type": "binary", "name": "bin", "len": 32}, - { "type": "nchar", "name": "nch", "len": 64} - ], - "tags": [ - {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, - {"type": "binary", "name": "location", "len": 16, - "values": ["San Francisco", "Los Angles", "San Diego", - "San Jose", "Palo Alto", "Campbell", "Mountain View", - "Sunnyvale", "Santa Clara", "Cupertino"] - } - ] - } - ] - } - ] -} diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 1bfd86df4d..f79c4c291b 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -93,7 +93,8 @@ ,,y,army,./pytest.sh python3 ./test.py -f tools/benchmark/basic/default_json.py ,,y,army,./pytest.sh python3 ./test.py -f tools/benchmark/basic/demo.py -,,y,army,./pytest.sh python3 ./test.py -f tools/benchmark/basic/exportCsv.py +,,y,army,./pytest.sh python3 ./test.py -f tools/benchmark/basic/csv-export.py +# ,,y,army,./pytest.sh python3 ./test.py -f tools/benchmark/basic/csv-import.py ,,y,army,./pytest.sh python3 ./test.py -f tools/benchmark/basic/from-to.py ,,y,army,./pytest.sh python3 ./test.py -f tools/benchmark/basic/from-to-continue.py diff --git a/tools/taos-tools/README-CN.md b/tools/taos-tools/README-CN.md index 3def035f68..da14e81cd1 100644 --- a/tools/taos-tools/README-CN.md +++ b/tools/taos-tools/README-CN.md @@ -18,7 +18,7 @@ taosdump 是用于备份 TDengine 数据到本地目录和从本地目录恢复 #### 对于 Ubuntu/Debian 系统 ```shell -sudo apt install libjansson-dev libsnappy-dev liblzma-dev libz-dev zlib1g pkg-config libssl-dev +sudo apt install libjansson-dev 
libsnappy-dev liblzma-dev libz-dev zlib1g zlib1g-dev pkg-config libssl-dev ``` #### 对于 CentOS 7/RHEL 系统 diff --git a/tools/taos-tools/example/csv-export.json b/tools/taos-tools/example/csv-export.json new file mode 100644 index 0000000000..7fa3e96f2f --- /dev/null +++ b/tools/taos-tools/example/csv-export.json @@ -0,0 +1,54 @@ +{ + "filetype": "csvfile", + "output_path": "./csv/", + "databases": [ + { + "dbinfo": { + "name": "csvdb", + "precision": "ms" + }, + "super_tables": [ + { + "name": "table", + "childtable_count": 1010, + "insert_rows": 1000, + "interlace_rows": 1, + "childtable_prefix": "d", + "timestamp_step": 1000000, + "start_timestamp": "2020-10-01 00:00:00.000", + "childtable_from": 1000, + "childtable_to": 1010, + "csv_file_prefix": "data", + "csv_ts_format": "%Y%m%d", + "csv_ts_interval": "1d", + "csv_output_header": "true", + "csv_tbname_alias": "device_id", + "csv_compress_level": "none", + "columns": [ + { "type": "bool", "name": "bc"}, + { "type": "float", "name": "fc", "min": 1}, + { "type": "double", "name": "dc", "min":10, "max":10}, + { "type": "tinyint", "name": "ti"}, + { "type": "smallint", "name": "si"}, + { "type": "int", "name": "ic", "fillNull":"false"}, + { "type": "bigint", "name": "bi"}, + { "type": "utinyint", "name": "uti"}, + { "type": "usmallint", "name": "usi", "min":100, "max":120}, + { "type": "uint", "name": "ui"}, + { "type": "ubigint", "name": "ubi"}, + { "type": "binary", "name": "bin", "len": 16}, + { "type": "nchar", "name": "nch", "len": 16} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, + {"type": "binary", "name": "location", "len": 16, + "values": ["San Francisco", "Los Angles", "San Diego", + "San Jose", "Palo Alto", "Campbell", "Mountain View", + "Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + } + ] + } + ] +} diff --git a/tools/taos-tools/inc/bench.h b/tools/taos-tools/inc/bench.h index 97c82ac2fa..c413d953b7 100644 --- a/tools/taos-tools/inc/bench.h +++ b/tools/taos-tools/inc/bench.h @@ -20,6 +20,9 @@ #define CURL_STATICLIB #define ALLOW_FORBID_FUNC +#define MAX(a, b) ((a) > (b) ? (a) : (b)) +#define MIN(a, b) ((a) < (b) ? 
(a) : (b)) + #ifdef LINUX #ifndef _ALPINE @@ -476,6 +479,13 @@ typedef struct SChildTable_S { int32_t pkCnt; } SChildTable; +typedef enum { + CSV_COMPRESS_NONE = 0, + CSV_COMPRESS_FAST = 1, + CSV_COMPRESS_BALANCE = 6, + CSV_COMPRESS_BEST = 9 +} CsvCompressionLevel; + #define PRIMARY_KEY "PRIMARY KEY" typedef struct SSuperTable_S { char *stbName; @@ -578,6 +588,15 @@ typedef struct SSuperTable_S { // execute sqls after create super table char **sqls; + + char* csv_file_prefix; + char* csv_ts_format; + char* csv_ts_interval; + char* csv_tbname_alias; + long csv_ts_intv_secs; + bool csv_output_header; + CsvCompressionLevel csv_compress_level; + } SSuperTable; typedef struct SDbCfg_S { @@ -776,9 +795,11 @@ typedef struct SArguments_S { bool mistMode; bool escape_character; bool pre_load_tb_meta; - char csvPath[MAX_FILE_NAME_LEN]; - bool bind_vgroup; + + char* output_path; + char output_path_buf[MAX_PATH_LEN]; + } SArguments; typedef struct SBenchConn { diff --git a/tools/taos-tools/inc/benchCsv.h b/tools/taos-tools/inc/benchCsv.h index 25d0c55eba..6bf531cf14 100644 --- a/tools/taos-tools/inc/benchCsv.h +++ b/tools/taos-tools/inc/benchCsv.h @@ -16,21 +16,82 @@ #ifndef INC_BENCHCSV_H_ #define INC_BENCHCSV_H_ -#include +#include + +#include "bench.h" + + +typedef enum { + CSV_NAMING_I_SINGLE, + CSV_NAMING_I_TIME_SLICE, + CSV_NAMING_B_THREAD, + CSV_NAMING_B_THREAD_TIME_SLICE +} CsvNamingType; + +typedef enum { + CSV_ERR_OK = 0, + CSV_ERR_OPEN_FAILED, + CSV_ERR_WRITE_FAILED +} CsvIoError; + +typedef struct { + const char* filename; + CsvCompressionLevel compress_level; + CsvIoError result; + union { + gzFile gf; + FILE* fp; + } handle; +} CsvFileHandle; + +typedef struct { + char* buf; + int length; +} CsvRowTagsBuf; + +typedef struct { + char* buf; + int buf_size; + int length; +} CsvRowColsBuf; + +typedef struct { + CsvNamingType naming_type; + size_t total_threads; + char mode[MIDDLE_BUFF_LEN]; + char thread_formatter[SMALL_BUFF_LEN]; + char csv_header[LARGE_BUFF_LEN]; + int csv_header_length; + SDataBase* db; + SSuperTable* stb; + int64_t start_ts; + int64_t end_ts; + int64_t ts_step; + int64_t interlace_step; +} CsvWriteMeta; + +typedef struct { + uint64_t ctb_start_idx; + uint64_t ctb_end_idx; + uint64_t ctb_count; + uint64_t total_rows; + time_t start_secs; + time_t end_secs; + int64_t start_ts; + int64_t end_ts; + size_t thread_id; + bool output_header; + int tags_buf_size; + CsvRowTagsBuf* tags_buf_array; + CsvRowColsBuf* cols_buf; +} CsvThreadMeta; + +typedef struct { + CsvWriteMeta* write_meta; + CsvThreadMeta thread_meta; +} CsvThreadArgs; + int csvTestProcess(); -int genWithSTable(SDataBase* db, SSuperTable* stb, char* outDir); - -char * genTagData(char* buf, SSuperTable* stb, int64_t i, int64_t *k); - -char * genColumnData(char* colData, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k); - -int32_t genRowByField(char* buf, BArray* fields, int16_t fieldCnt, char* binanryPrefix, char* ncharPrefix, int64_t *k); - -void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir); - -int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain); -int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain); - #endif // INC_BENCHCSV_H_ diff --git a/tools/taos-tools/inc/benchLog.h b/tools/taos-tools/inc/benchLog.h index 426112bcd8..ab74aaff75 100644 --- a/tools/taos-tools/inc/benchLog.h +++ b/tools/taos-tools/inc/benchLog.h @@ -16,6 +16,9 @@ #ifndef INC_BENCHLOG_H_ #define 
INC_BENCHLOG_H_ +#include +#include + // // suport thread safe log module // @@ -53,7 +56,7 @@ void exitLog(); (int32_t)timeSecs.tv_usec); \ fprintf(stdout, "DEBG: "); \ fprintf(stdout, "%s(%d) ", __FILE__, __LINE__); \ - fprintf(stdout, "" fmt, __VA_ARGS__); \ + fprintf(stdout, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_STDOUT); \ } \ } while (0) @@ -74,7 +77,7 @@ void exitLog(); (int32_t)timeSecs.tv_usec); \ fprintf(stdout, "DEBG: "); \ fprintf(stdout, "%s(%d) ", __FILE__, __LINE__); \ - fprintf(stdout, "" fmt, __VA_ARGS__); \ + fprintf(stdout, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_STDOUT); \ } \ } while (0) @@ -94,7 +97,7 @@ void exitLog(); do { \ if (g_arguments->debug_print) { \ lockLog(LOG_STDOUT); \ - fprintf(stdout, "" fmt, __VA_ARGS__); \ + fprintf(stdout, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_STDOUT); \ } \ } while (0) @@ -102,14 +105,14 @@ void exitLog(); #define infoPrintNoTimestamp(fmt, ...) \ do { \ lockLog(LOG_STDOUT); \ - fprintf(stdout, "" fmt, __VA_ARGS__); \ + fprintf(stdout, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_STDOUT); \ } while (0) #define infoPrintNoTimestampToFile(fmt, ...) \ do { \ lockLog(LOG_RESULT); \ - fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \ + fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_RESULT); \ } while (0) @@ -126,7 +129,7 @@ void exitLog(); ptm->tm_mon + 1, \ ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \ (int32_t)timeSecs.tv_usec); \ - fprintf(stdout, "INFO: " fmt, __VA_ARGS__); \ + fprintf(stdout, "INFO: " fmt, ##__VA_ARGS__); \ unlockLog(LOG_STDOUT); \ } while (0) @@ -142,7 +145,7 @@ void exitLog(); fprintf(g_arguments->fpOfInsertResult,"[%02d/%02d %02d:%02d:%02d.%06d] ", ptm->tm_mon + 1, \ ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \ (int32_t)timeSecs.tv_usec); \ - fprintf(g_arguments->fpOfInsertResult, "INFO: " fmt, __VA_ARGS__);\ + fprintf(g_arguments->fpOfInsertResult, "INFO: " fmt, ##__VA_ARGS__);\ unlockLog(LOG_RESULT); \ } while (0) @@ -160,7 +163,7 @@ void exitLog(); ptm->tm_mon + 1, \ ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \ (int32_t)timeSecs.tv_usec); \ - fprintf(stderr, "PERF: " fmt, __VA_ARGS__); \ + fprintf(stderr, "PERF: " fmt, ##__VA_ARGS__); \ unlockLog(LOG_STDERR); \ if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \ lockLog(LOG_RESULT); \ @@ -172,7 +175,7 @@ void exitLog(); (int32_t)timeSecs.tv_usec); \ fprintf(g_arguments->fpOfInsertResult, "PERF: "); \ fprintf(g_arguments->fpOfInsertResult, \ - "" fmt, __VA_ARGS__); \ + "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_RESULT); \ } \ } \ @@ -196,7 +199,7 @@ void exitLog(); if (g_arguments->debug_print) { \ fprintf(stderr, "%s(%d) ", __FILE__, __LINE__); \ } \ - fprintf(stderr, "" fmt, __VA_ARGS__); \ + fprintf(stderr, "" fmt, ##__VA_ARGS__); \ fprintf(stderr, "\033[0m"); \ unlockLog(LOG_STDERR); \ if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \ @@ -206,7 +209,7 @@ void exitLog(); ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \ (int32_t)timeSecs.tv_usec); \ fprintf(g_arguments->fpOfInsertResult, "ERROR: "); \ - fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \ + fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_RESULT); \ } \ } while (0) @@ -229,7 +232,7 @@ void exitLog(); if (g_arguments->debug_print) { \ fprintf(stderr, "%s(%d) ", __FILE__, __LINE__); \ } \ - fprintf(stderr, "" fmt, __VA_ARGS__); \ + fprintf(stderr, "" fmt, ##__VA_ARGS__); \ fprintf(stderr, "\033[0m"); \ unlockLog(LOG_STDERR); \ if 
(g_arguments->fpOfInsertResult && !g_arguments->terminate) { \ @@ -239,7 +242,7 @@ void exitLog(); ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \ (int32_t)timeSecs.tv_usec); \ fprintf(g_arguments->fpOfInsertResult, "WARN: "); \ - fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \ + fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_RESULT); \ } \ } while (0) @@ -262,7 +265,7 @@ void exitLog(); if (g_arguments->debug_print) { \ fprintf(stderr, "%s(%d) ", __FILE__, __LINE__); \ } \ - fprintf(stderr, "" fmt, __VA_ARGS__); \ + fprintf(stderr, "" fmt, ##__VA_ARGS__); \ fprintf(stderr, "\033[0m"); \ unlockLog(LOG_STDERR); \ if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \ @@ -272,7 +275,7 @@ void exitLog(); ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \ (int32_t)timeSecs.tv_usec); \ fprintf(g_arguments->fpOfInsertResult, "SUCC: "); \ - fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \ + fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_RESULT); \ } \ } while (0) diff --git a/tools/taos-tools/src/CMakeLists.txt b/tools/taos-tools/src/CMakeLists.txt index 1f0899db5c..320fb1f413 100644 --- a/tools/taos-tools/src/CMakeLists.txt +++ b/tools/taos-tools/src/CMakeLists.txt @@ -316,6 +316,9 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin ENDIF () ENDIF () + + TARGET_LINK_LIBRARIES(taosBenchmark z) + ELSE () ADD_DEFINITIONS(-DWINDOWS) SET(CMAKE_C_STANDARD 11) @@ -331,6 +334,7 @@ ELSE () ADD_DEPENDENCIES(taosdump deps-snappy) ADD_DEPENDENCIES(taosdump deps-libargp) ADD_DEPENDENCIES(taosdump apache-avro) + ADD_DEPENDENCIES(taosBenchmark tools-zlib) IF (${WEBSOCKET}) INCLUDE_DIRECTORIES(/usr/local/include/) @@ -362,4 +366,8 @@ ELSE () ENDIF () TARGET_LINK_LIBRARIES(taosBenchmark taos msvcregex pthread toolscJson ${WEBSOCKET_LINK_FLAGS}) + + TARGET_LINK_LIBRARIES(taosBenchmark zlibstatic) + ENDIF () + diff --git a/tools/taos-tools/src/benchCsv.c b/tools/taos-tools/src/benchCsv.c index 8186438643..f8c43dbb97 100644 --- a/tools/taos-tools/src/benchCsv.c +++ b/tools/taos-tools/src/benchCsv.c @@ -10,293 +10,1373 @@ * FITNESS FOR A PARTICULAR PURPOSE. */ -#include -#include "benchLog.h" -#include -#include +#include +#include +#include +#include +#include +#include "benchLog.h" +#include "benchData.h" +#include "benchDataMix.h" +#include "benchCsv.h" // // main etry // #define SHOW_CNT 100000 +#define GEN_ROW_FIELDS_TAG 0 +#define GEN_ROW_FIELDS_COL 1 -static void *csvWriteThread(void *param) { - // write thread - for (int i = 0; i < g_arguments->databases->size; i++) { - // database - SDataBase * db = benchArrayGet(g_arguments->databases, i); - for (int j=0; j < db->superTbls->size; j++) { - // stb - SSuperTable* stb = benchArrayGet(db->superTbls, j); - // gen csv - int ret = genWithSTable(db, stb, g_arguments->csvPath); - if(ret != 0) { - errorPrint("failed generate to csv. 
db=%s stb=%s error code=%d \n", db->dbName, stb->stbName, ret); - return NULL; - } + + +static int csvValidateParamTsFormat(const char* csv_ts_format) { + if (!csv_ts_format) return 0; + + struct tm test_tm = { + .tm_year = 70, + .tm_mon = 0, + .tm_mday = 1, + .tm_hour = 0, + .tm_min = 0, + .tm_sec = 0, + .tm_isdst = -1 + }; + mktime(&test_tm); + + char buffer[1024]; + size_t len = strftime(buffer, sizeof(buffer), csv_ts_format, &test_tm); + if (len == 0) { + return -1; + } + +#ifdef _WIN32 + const char* invalid_chars = "/\\:*?\"<>|"; +#else + const char* invalid_chars = "/\\?\"<>|"; +#endif + if (strpbrk(buffer, invalid_chars) != NULL) { + return -1; + } + + int has_Y = 0, has_m = 0, has_d = 0; + const char* p = csv_ts_format; + while (*p) { + if (*p == '%') { + p++; + switch (*p) { + case 'Y': has_Y = 1; break; + case 'm': has_m = 1; break; + case 'd': has_d = 1; break; + } + } + p++; + } + + if (has_Y == 0 || has_m == 0 || has_d == 0) { + return -1; + } + + return 0; +} + + +static long csvValidateParamTsInterval(const char* csv_ts_interval) { + if (!csv_ts_interval || *csv_ts_interval == '\0') return -1; + + char* endptr; + errno = 0; + const long num = strtol(csv_ts_interval, &endptr, 10); + + if (errno == ERANGE || + endptr == csv_ts_interval || + num <= 0) { + return -1; + } + + if (*endptr == '\0' || + *(endptr + 1) != '\0') { + return -1; + } + + switch (tolower(*endptr)) { + case 's': return num; + case 'm': return num * 60; + case 'h': return num * 60 * 60; + case 'd': return num * 60 * 60 * 24; + default : return -1; + } +} + + +static int csvParseParameter() { + // csv_output_path + size_t len = strlen(g_arguments->output_path); + if (len == 0) { + errorPrint("Failed to generate csv files, the specified output path is empty. Please provide a valid path.\n"); + return -1; + } + if (g_arguments->output_path[len - 1] != '/') { + int n = snprintf(g_arguments->output_path_buf, sizeof(g_arguments->output_path_buf), "%s/", g_arguments->output_path); + if (n < 0 || n >= sizeof(g_arguments->output_path_buf)) { + errorPrint("Failed to generate csv files, path buffer overflow risk when appending '/'. path: %s.\n", + g_arguments->output_path); + return -1; + } + g_arguments->output_path = g_arguments->output_path_buf; + } + + return 0; +} + + +static int csvParseStbParameter(SSuperTable* stb) { + // csv_ts_format + if (stb->csv_ts_format) { + if (csvValidateParamTsFormat(stb->csv_ts_format) != 0) { + errorPrint("Failed to generate csv files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s.\n", + stb->csv_ts_format); + return -1; } } + + // csv_ts_interval + long csv_ts_intv_secs = csvValidateParamTsInterval(stb->csv_ts_interval); + if (csv_ts_intv_secs <= 0) { + errorPrint("Failed to generate csv files, the parameter `csv_ts_interval` is invalid. 
csv_ts_interval: %s.\n", + stb->csv_ts_interval); + return -1; + } + stb->csv_ts_intv_secs = csv_ts_intv_secs; + + return 0; +} + + +static time_t csvAlignTimestamp(time_t seconds, const char* ts_format) { + struct tm aligned_tm; +#ifdef _WIN32 + localtime_s(&aligned_tm, &seconds); +#else + localtime_r(&seconds, &aligned_tm); +#endif + + int has_Y = 0, has_m = 0, has_d = 0, has_H = 0, has_M = 0, has_S = 0; + const char* p = ts_format; + while (*p) { + if (*p == '%') { + p++; + switch (*p) { + case 'Y': has_Y = 1; break; + case 'm': has_m = 1; break; + case 'd': has_d = 1; break; + case 'H': has_H = 1; break; + case 'M': has_M = 1; break; + case 'S': has_S = 1; break; + } + } + p++; + } + + if (!has_S) aligned_tm.tm_sec = 0; + if (!has_M) aligned_tm.tm_min = 0; + if (!has_H) aligned_tm.tm_hour = 0; + if (!has_d) aligned_tm.tm_mday = 1; + if (!has_m) aligned_tm.tm_mon = 0; + if (!has_Y) aligned_tm.tm_year = 0; + + return mktime(&aligned_tm); +} + + +static time_t csvGetStartSeconds(int precision, int64_t start_ts, const char* csv_ts_format) { + time_t start_seconds = 0; + + if (precision == TSDB_TIME_PRECISION_MICRO) { + start_seconds = start_ts / 1000000L; + } else if (precision == TSDB_TIME_PRECISION_NANO) { + start_seconds = start_ts / 1000000000L; + } else { + start_seconds = start_ts / 1000L; + } + return csvAlignTimestamp(start_seconds, csv_ts_format); +} + + +static void csvConvertTime2String(time_t time_value, char* ts_format, char* time_buf, size_t buf_size) { + struct tm tm_result; + char* old_locale = setlocale(LC_TIME, "C"); +#ifdef _WIN32 + localtime_s(&tm_result, &time_value); +#else + localtime_r(&time_value, &tm_result); +#endif + strftime(time_buf, buf_size, ts_format, &tm_result); + if (old_locale) { + setlocale(LC_TIME, old_locale); + } + return; +} + + +static CsvNamingType csvGetFileNamingType(SSuperTable* stb) { + if (stb->interlaceRows > 0) { + if (stb->csv_ts_format) { + return CSV_NAMING_I_TIME_SLICE; + } else { + return CSV_NAMING_I_SINGLE; + } + } else { + if (stb->csv_ts_format) { + return CSV_NAMING_B_THREAD_TIME_SLICE; + } else { + return CSV_NAMING_B_THREAD; + } + } +} + + +static time_t csvCalcTimestampFromSeconds(int precision, time_t secs) { + time_t ts = 0; + + if (precision == TSDB_TIME_PRECISION_MICRO) { + ts = secs * 1000000L; + } else if (precision == TSDB_TIME_PRECISION_NANO) { + ts = secs * 1000000000L; + } else { + ts = secs * 1000L; + } + return ts; +} + + +static void csvCalcTimestampStep(CsvWriteMeta* write_meta) { + write_meta->ts_step = csvCalcTimestampFromSeconds(write_meta->db->precision, write_meta->stb->csv_ts_intv_secs); + return; +} + + +static void csvCalcSliceTimestamp(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) { + thread_meta->start_ts = csvCalcTimestampFromSeconds(write_meta->db->precision, thread_meta->start_secs); + thread_meta->end_ts = csvCalcTimestampFromSeconds(write_meta->db->precision, thread_meta->end_secs); + return; +} + + +static void csvCalcCtbRange(CsvThreadMeta* thread_meta, size_t total_threads, int64_t ctb_offset, int64_t ctb_count) { + uint64_t ctb_start_idx = 0; + uint64_t ctb_end_idx = 0; + size_t tid_idx = thread_meta->thread_id - 1; + size_t base = ctb_count / total_threads; + size_t remainder = ctb_count % total_threads; + + if (tid_idx < remainder) { + ctb_start_idx = ctb_offset + tid_idx * (base + 1); + ctb_end_idx = ctb_start_idx + (base + 1); + } else { + ctb_start_idx = ctb_offset + remainder * (base + 1) + (tid_idx - remainder) * base; + ctb_end_idx = ctb_start_idx + base; + } + + if (ctb_end_idx 
> ctb_offset + ctb_count) { + ctb_end_idx = ctb_offset + ctb_count; + } + + thread_meta->ctb_start_idx = ctb_start_idx; + thread_meta->ctb_end_idx = ctb_end_idx; + thread_meta->ctb_count = ctb_end_idx - ctb_start_idx; + return; +} + + +static void csvGenThreadFormatter(CsvWriteMeta* write_meta) { + int digits = 0; + + if (write_meta->total_threads == 0) { + digits = 1; + } else { + for (int n = write_meta->total_threads; n > 0; n /= 10) { + digits++; + } + } + + if (digits <= 1) { + (void)snprintf(write_meta->thread_formatter, sizeof(write_meta->thread_formatter), "%%d"); + } else { + (void)snprintf(write_meta->thread_formatter, sizeof(write_meta->thread_formatter), "%%0%dd", digits); + } + return; +} + + +static int csvGenCsvHeader(CsvWriteMeta* write_meta) { + SSuperTable* stb = write_meta->stb; + char* buf = write_meta->csv_header; + int pos = 0; + int size = sizeof(write_meta->csv_header); + + if (!write_meta->stb->csv_output_header) { + return 0; + } + + // ts + pos += snprintf(buf + pos, size - pos, "ts"); + + // columns + for (size_t i = 0; i < stb->cols->size; ++i) { + Field* col = benchArrayGet(stb->cols, i); + pos += snprintf(buf + pos, size - pos, ",%s", col->name); + } + + // tbname + pos += snprintf(buf + pos, size - pos, ",%s", write_meta->stb->csv_tbname_alias); + + // tags + for (size_t i = 0; i < stb->tags->size; ++i) { + Field* tag = benchArrayGet(stb->tags, i); + pos += snprintf(buf + pos, size - pos, ",%s", tag->name); + } + + // line break + pos += snprintf(buf + pos, size - pos, "\n"); + + write_meta->csv_header_length = (pos > 0 && pos < size) ? pos : 0; + return (pos > 0 && pos < size) ? 0 : -1; +} + + +int csvGenCreateDbSql(SDataBase* db, char* buf, int size) { + int pos = 0; + + pos += snprintf(buf + pos, size - pos, "CREATE DATABASE IF NOT EXISTS "); + if (pos <= 0 || pos >= size) return -1; + + pos += snprintf(buf + pos, size - pos, g_arguments->escape_character ? "`%s`" : "%s", db->dbName); + if (pos <= 0 || pos >= size) return -1; + + if (-1 != g_arguments->inputted_vgroups) { + pos += snprintf(buf + pos, size - pos, " VGROUPS %d", g_arguments->inputted_vgroups); + if (pos <= 0 || pos >= size) return -1; + } + + if (db->cfgs) { + for (size_t i = 0; i < db->cfgs->size; ++i) { + SDbCfg* cfg = benchArrayGet(db->cfgs, i); + if (cfg->valuestring) { + pos += snprintf(buf + pos, size - pos, " %s %s", cfg->name, cfg->valuestring); + } else { + pos += snprintf(buf + pos, size - pos, " %s %d", cfg->name, cfg->valueint); + } + if (pos <= 0 || pos >= size) return -1; + } + } + + switch (db->precision) { + case TSDB_TIME_PRECISION_MILLI: + pos += snprintf(buf + pos, size - pos, " PRECISION 'ms';\n"); + break; + case TSDB_TIME_PRECISION_MICRO: + pos += snprintf(buf + pos, size - pos, " PRECISION 'us';\n"); + break; + case TSDB_TIME_PRECISION_NANO: + pos += snprintf(buf + pos, size - pos, " PRECISION 'ns';\n"); + break; + } + + return (pos > 0 && pos < size) ? pos : -1; +} + + +static int csvExportCreateDbSql(CsvWriteMeta* write_meta, FILE* fp) { + char buf[LARGE_BUFF_LEN] = {0}; + int ret = 0; + int length = 0; + + length = csvGenCreateDbSql(write_meta->db, buf, sizeof(buf)); + if (length < 0) { + errorPrint("Failed to generate create db sql, maybe buffer[%zu] not enough.\n", sizeof(buf)); + return -1; + } + + ret = fwrite(buf, 1, length, fp); + if (ret != length) { + errorPrint("Failed to write create db sql: %s. 
expected written %d but %d.\n", + buf, length, ret); + if (ferror(fp)) { + perror("error"); + } + return -1; + } + + return 0; +} + + +int csvGenCreateStbSql(SDataBase* db, SSuperTable* stb, char* buf, int size) { + int pos = 0; + + pos += snprintf(buf + pos, size - pos, "CREATE TABLE IF NOT EXISTS "); + if (pos <= 0 || pos >= size) return -1; + + pos += snprintf(buf + pos, size - pos, g_arguments->escape_character ? "`%s`.`%s`" : "%s.%s", db->dbName, stb->stbName); + if (pos <= 0 || pos >= size) return -1; + + pos += snprintf(buf + pos, size - pos, " (ts TIMESTAMP"); + if (pos <= 0 || pos >= size) return -1; + + + // columns + for (size_t i = 0; i < stb->cols->size; ++i) { + Field* col = benchArrayGet(stb->cols, i); + + if (col->type == TSDB_DATA_TYPE_BINARY + || col->type == TSDB_DATA_TYPE_NCHAR + || col->type == TSDB_DATA_TYPE_VARBINARY + || col->type == TSDB_DATA_TYPE_GEOMETRY) { + + if (col->type == TSDB_DATA_TYPE_GEOMETRY && col->length < 21) { + errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %zu.\n", __func__, __LINE__, i); + return -1; + } + + pos += snprintf(buf + pos, size - pos, ",%s %s(%d)", col->name, convertDatatypeToString(col->type), col->length); + } else { + pos += snprintf(buf + pos, size - pos, ",%s %s", col->name, convertDatatypeToString(col->type)); + } + if (pos <= 0 || pos >= size) return -1; + + // primary key + if (stb->primary_key && i == 0) { + pos += snprintf(buf + pos, size - pos, " %s", PRIMARY_KEY); + if (pos <= 0 || pos >= size) return -1; + } + + // compress key + if (strlen(col->encode) > 0) { + pos += snprintf(buf + pos, size - pos, " encode '%s'", col->encode); + if (pos <= 0 || pos >= size) return -1; + } + if (strlen(col->compress) > 0) { + pos += snprintf(buf + pos, size - pos, " compress '%s'", col->compress); + if (pos <= 0 || pos >= size) return -1; + } + if (strlen(col->level) > 0) { + pos += snprintf(buf + pos, size - pos, " level '%s'", col->level); + if (pos <= 0 || pos >= size) return -1; + } + } + + pos += snprintf(buf + pos, size - pos, ") TAGS ("); + if (pos <= 0 || pos >= size) return -1; + + + // tags + for (size_t i = 0; i < stb->tags->size; ++i) { + Field* tag = benchArrayGet(stb->tags, i); + + if (i > 0) { + pos += snprintf(buf + pos, size - pos, ","); + if (pos <= 0 || pos >= size) return -1; + } + + if (tag->type == TSDB_DATA_TYPE_BINARY + || tag->type == TSDB_DATA_TYPE_NCHAR + || tag->type == TSDB_DATA_TYPE_VARBINARY + || tag->type == TSDB_DATA_TYPE_GEOMETRY) { + + if (tag->type == TSDB_DATA_TYPE_GEOMETRY && tag->length < 21) { + errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %zu.\n", __func__, __LINE__, i); + return -1; + } + + pos += snprintf(buf + pos, size - pos, "%s %s(%d)", tag->name, convertDatatypeToString(tag->type), tag->length); + + } else { + pos += snprintf(buf + pos, size - pos, "%s %s", tag->name, convertDatatypeToString(tag->type)); + } + if (pos <= 0 || pos >= size) return -1; + } + + pos += snprintf(buf + pos, size - pos, ")"); + if (pos <= 0 || pos >= size) return -1; + + + // comment + if (stb->comment != NULL) { + pos += snprintf(buf + pos, size - pos," COMMENT '%s'", stb->comment); + if (pos <= 0 || pos >= size) return -1; + } + + // delay + if (stb->delay >= 0) { + pos += snprintf(buf + pos, size - pos, " DELAY %d", stb->delay); + if (pos <= 0 || pos >= size) return -1; + } + + // file factor + if (stb->file_factor >= 0) { + pos += snprintf(buf + pos, size - pos, " FILE_FACTOR %f", stb->file_factor / 100.0); + if (pos <= 0 || pos >= size) return -1; + } + + // 
rollup + if (stb->rollup != NULL) { + pos += snprintf(buf + pos, size - pos, " ROLLUP(%s)", stb->rollup); + if (pos <= 0 || pos >= size) return -1; + } + + // max delay + if (stb->max_delay != NULL) { + pos += snprintf(buf + pos, size - pos, " MAX_DELAY %s", stb->max_delay); + if (pos <= 0 || pos >= size) return -1; + } + + // watermark + if (stb->watermark != NULL) { + pos += snprintf(buf + pos, size - pos, " WATERMARK %s", stb->watermark); + if (pos <= 0 || pos >= size) return -1; + } + + bool first_sma = true; + for (size_t i = 0; i < stb->cols->size; ++i) { + Field* col = benchArrayGet(stb->cols, i); + if (col->sma) { + if (first_sma) { + pos += snprintf(buf + pos, size - pos, " SMA(%s", col->name); + first_sma = false; + } else { + pos += snprintf(buf + pos, size - pos, ",%s", col->name); + } + if (pos <= 0 || pos >= size) return -1; + } + } + if (!first_sma) { + pos += snprintf(buf + pos, size - pos, ")"); + if (pos <= 0 || pos >= size) return -1; + } + + pos += snprintf(buf + pos, size - pos, ";\n"); + if (pos <= 0 || pos >= size) return -1; + + // infoPrint("create stable: <%s>.\n", buf); + return (pos > 0 && pos < size) ? pos : -1; +} + + +static int csvExportCreateStbSql(CsvWriteMeta* write_meta, FILE* fp) { + char buf[4096] = {0}; + int ret = 0; + int length = 0; + + length = csvGenCreateStbSql(write_meta->db, write_meta->stb, buf, sizeof(buf)); + if (length < 0) { + errorPrint("Failed to generate create stb sql, maybe buffer[%zu] not enough.\n", sizeof(buf)); + return -1; + } + + ret = fwrite(buf, 1, length, fp); + if (ret != length) { + errorPrint("Failed to write create stb sql: %s. expected written %d but %d.\n", + buf, length, ret); + if (ferror(fp)) { + perror("error"); + } + return -1; + } + + return 0; +} + + +static int csvExportCreateSql(CsvWriteMeta* write_meta) { + char fullname[MAX_PATH_LEN] = {0}; + int ret = 0; + int length = 0; + FILE* fp = NULL; + + + length = snprintf(fullname, sizeof(fullname), "%s%s.txt", g_arguments->output_path, "create_stmt"); + if (length <= 0 || length >= sizeof(fullname)) { + return -1; + } + + fp = fopen(fullname, "w"); + if (!fp) { + return -1; + } + + + // export db + ret = csvExportCreateDbSql(write_meta, fp); + if (ret < 0) { + goto end; + } + + // export stb + ret = csvExportCreateStbSql(write_meta, fp); + if (ret < 0) { + goto end; + } + + succPrint("Export create sql to file: %s successfully.\n", fullname); + +end: + if (fp) { + fclose(fp); + } + + return ret; +} + + +static int csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write_meta) { + write_meta->naming_type = csvGetFileNamingType(stb); + write_meta->total_threads = 1; + write_meta->csv_header_length = 0; + write_meta->db = db; + write_meta->stb = stb; + write_meta->start_ts = stb->startTimestamp; + write_meta->end_ts = stb->startTimestamp + stb->timestamp_step * stb->insertRows; + write_meta->ts_step = stb->timestamp_step * stb->insertRows; + write_meta->interlace_step = stb->timestamp_step * stb->interlaceRows; + + int ret = csvGenCsvHeader(write_meta); + if (ret < 0) { + errorPrint("Failed to generate csv header data. 
database: %s, super table: %s, naming type: %d.\n", + db->dbName, stb->stbName, write_meta->naming_type); + return -1; + } + + switch (write_meta->naming_type) { + case CSV_NAMING_I_SINGLE: { + (void)snprintf(write_meta->mode, sizeof(write_meta->mode), "interlace::normal"); + break; + } + case CSV_NAMING_I_TIME_SLICE: { + (void)snprintf(write_meta->mode, sizeof(write_meta->mode), "interlace::time-slice"); + csvCalcTimestampStep(write_meta); + break; + } + case CSV_NAMING_B_THREAD: { + write_meta->total_threads = MIN(g_arguments->nthreads, stb->childTblCount); + (void)snprintf(write_meta->mode, sizeof(write_meta->mode), "batch[%zu]::normal", write_meta->total_threads); + csvGenThreadFormatter(write_meta); + break; + } + case CSV_NAMING_B_THREAD_TIME_SLICE: { + write_meta->total_threads = MIN(g_arguments->nthreads, stb->childTblCount); + (void)snprintf(write_meta->mode, sizeof(write_meta->mode), "batch[%zu]::time-slice", write_meta->total_threads); + csvGenThreadFormatter(write_meta); + csvCalcTimestampStep(write_meta); + break; + } + default: { + write_meta->naming_type = CSV_NAMING_I_SINGLE; + break; + } + } + + return 0; +} + + +static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvThreadMeta* thread_meta) { + SDataBase* db = write_meta->db; + SSuperTable* stb = write_meta->stb; + + thread_meta->ctb_start_idx = 0; + thread_meta->ctb_end_idx = 0; + thread_meta->ctb_count = 0; + thread_meta->start_secs = 0; + thread_meta->end_secs = 0; + thread_meta->start_ts = write_meta->start_ts; + thread_meta->end_ts = write_meta->end_ts; + thread_meta->thread_id = thread_id; + thread_meta->output_header = false; + thread_meta->tags_buf_size = 0; + thread_meta->tags_buf_array = NULL; + thread_meta->cols_buf = NULL; + + csvCalcCtbRange(thread_meta, write_meta->total_threads, stb->childTblFrom, stb->childTblCount); + + switch (write_meta->naming_type) { + case CSV_NAMING_I_SINGLE: + case CSV_NAMING_B_THREAD: { + break; + } + case CSV_NAMING_I_TIME_SLICE: + case CSV_NAMING_B_THREAD_TIME_SLICE: { + thread_meta->start_secs = csvGetStartSeconds(db->precision, stb->startTimestamp, stb->csv_ts_format); + thread_meta->end_secs = thread_meta->start_secs + write_meta->stb->csv_ts_intv_secs; + csvCalcSliceTimestamp(write_meta, thread_meta); + break; + } + default: { + break; + } + } + + return; +} + + +static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta, int64_t last_end_ts) { + SDataBase* db = write_meta->db; + SSuperTable* stb = write_meta->stb; + + switch (write_meta->naming_type) { + case CSV_NAMING_I_SINGLE: + case CSV_NAMING_B_THREAD: { + break; + } + case CSV_NAMING_I_TIME_SLICE: + case CSV_NAMING_B_THREAD_TIME_SLICE: { + thread_meta->start_secs = csvGetStartSeconds(db->precision, last_end_ts, stb->csv_ts_format); + thread_meta->end_secs = thread_meta->start_secs + write_meta->stb->csv_ts_intv_secs; + csvCalcSliceTimestamp(write_meta, thread_meta); + break; + } + default: { + break; + } + } + + return; +} + + +static const char* csvGetGzipFilePrefix(CsvCompressionLevel csv_compress_level) { + if (csv_compress_level == CSV_COMPRESS_NONE) { + return ""; + } else { + return ".gz"; + } +} + + +static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta, char* fullname, size_t size) { + char thread_buf[SMALL_BUFF_LEN]; + char start_time_buf[MIDDLE_BUFF_LEN]; + char end_time_buf[MIDDLE_BUFF_LEN]; + int ret = -1; + const char* base_path = g_arguments->output_path; + const char* file_prefix = write_meta->stb->csv_file_prefix; + const 
char* gzip_suffix = csvGetGzipFilePrefix(write_meta->stb->csv_compress_level);
+
+    switch (write_meta->naming_type) {
+        case CSV_NAMING_I_SINGLE: {
+            ret = snprintf(fullname, size, "%s%s.csv%s", base_path, file_prefix, gzip_suffix);
+            break;
+        }
+        case CSV_NAMING_I_TIME_SLICE: {
+            csvConvertTime2String(thread_meta->start_secs, write_meta->stb->csv_ts_format, start_time_buf, sizeof(start_time_buf));
+            csvConvertTime2String(thread_meta->end_secs, write_meta->stb->csv_ts_format, end_time_buf, sizeof(end_time_buf));
+            ret = snprintf(fullname, size, "%s%s_%s_%s.csv%s", base_path, file_prefix, start_time_buf, end_time_buf, gzip_suffix);
+            break;
+        }
+        case CSV_NAMING_B_THREAD: {
+            (void)snprintf(thread_buf, sizeof(thread_buf), write_meta->thread_formatter, thread_meta->thread_id);
+            ret = snprintf(fullname, size, "%s%s_%s.csv%s", base_path, file_prefix, thread_buf, gzip_suffix);
+            break;
+        }
+        case CSV_NAMING_B_THREAD_TIME_SLICE: {
+            (void)snprintf(thread_buf, sizeof(thread_buf), write_meta->thread_formatter, thread_meta->thread_id);
+            csvConvertTime2String(thread_meta->start_secs, write_meta->stb->csv_ts_format, start_time_buf, sizeof(start_time_buf));
+            csvConvertTime2String(thread_meta->end_secs, write_meta->stb->csv_ts_format, end_time_buf, sizeof(end_time_buf));
+            ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv%s", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf, gzip_suffix);
+            break;
+        }
+        default: {
+            ret = -1;
+            break;
+        }
+    }
+
+    return (ret > 0 && (size_t)ret < size) ? 0 : -1;
+}
+
+
+static int64_t csvCalcSliceBatchTimestamp(CsvWriteMeta* write_meta, int64_t slice_cur_ts, int64_t slice_end_ts) {
+    int64_t slice_batch_ts = 0;
+
+    switch (write_meta->naming_type) {
+        case CSV_NAMING_I_SINGLE:
+        case CSV_NAMING_I_TIME_SLICE: {
+            slice_batch_ts = MIN(slice_cur_ts + write_meta->interlace_step, slice_end_ts);
+            break;
+        }
+        case CSV_NAMING_B_THREAD:
+        case CSV_NAMING_B_THREAD_TIME_SLICE: {
+            slice_batch_ts = slice_end_ts;
+            break;
+        }
+        default: {
+            break;
+        }
+    }
+
+    return slice_batch_ts;
+}
+
+
+static int csvGenRowFields(char* buf, int size, SSuperTable* stb, int fields_cate, int64_t* k) {
+    int pos = 0;
+    BArray* fields = NULL;
+    uint16_t field_count = 0;
+
+    // validate arguments before dereferencing stb
+    if (!buf || !stb || !k || size <= 0) {
+        return -1;
+    }
+
+    char* binary_prefix = stb->binaryPrefex ? stb->binaryPrefex : "";
+    char* nchar_prefix = stb->ncharPrefex ? stb->ncharPrefex : "";
+
+    if (fields_cate == GEN_ROW_FIELDS_TAG) {
+        fields = stb->tags;
+        field_count = stb->tags->size;
+    } else {
+        fields = stb->cols;
+        field_count = stb->cols->size;
+    }
+
+    for (uint16_t i = 0; i < field_count; ++i) {
+        Field* field = benchArrayGet(fields, i);
+        char* prefix = "";
+        if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY) {
+            prefix = binary_prefix;
+        } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
+            prefix = nchar_prefix;
+        }
+        pos += dataGenByField(field, buf, pos, prefix, k, "");
+    }
+
+    return pos;
+}
+
+
+static int csvGenRowTagData(char* buf, int size, SSuperTable* stb, int64_t index, int64_t* k) {
+    if (!buf || !stb || !k || size <= 0) {
+        return -1;
+    }
+
+    // tbname
+    int pos = snprintf(buf, size, ",'%s%"PRId64"'", stb->childTblPrefix, index);
+
+    // tags
+    pos += csvGenRowFields(buf + pos, size - pos, stb, GEN_ROW_FIELDS_TAG, k);
+
+    return (pos > 0 && pos < size) ? 
pos : -1; +} + + +static int csvGenRowColData(char* buf, int size, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k) { + char ts_fmt[128] = {0}; + toolsFormatTimestamp(ts_fmt, ts, precision); + int pos = snprintf(buf, size, "\'%s\'", ts_fmt); + + // columns + pos += csvGenRowFields(buf + pos, size - pos, stb, GEN_ROW_FIELDS_COL, k); + return (pos > 0 && pos < size) ? pos : -1; +} + + +static void csvFreeCtbTagData(CsvThreadMeta* thread_meta, CsvRowTagsBuf* tags_buf_array) { + if (!thread_meta || !tags_buf_array) { + return; + } + + for (uint64_t i = 0 ; i < thread_meta->ctb_count; ++i) { + char* tags_buf = tags_buf_array[i].buf; + if (tags_buf) { + tmfree(tags_buf); + } else { + break; + } + } + tmfree(tags_buf_array); + return; +} + + +static CsvRowTagsBuf* csvGenCtbTagData(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) { + SSuperTable* stb = write_meta->stb; + int ret = 0; + int64_t tk = 0; + + if (!write_meta || !thread_meta) { + return NULL; + } + + CsvRowTagsBuf* tags_buf_array = (CsvRowTagsBuf*)benchCalloc(thread_meta->ctb_count, sizeof(CsvRowTagsBuf), true); + if (!tags_buf_array) { + return NULL; + } + + char* tags_buf = NULL; + int tags_buf_size = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->tags->size; + for (uint64_t i = 0; i < thread_meta->ctb_count; ++i) { + tags_buf = benchCalloc(1, tags_buf_size, true); + if (!tags_buf) { + goto error; + } + + tags_buf_array[i].buf = tags_buf; + + ret = csvGenRowTagData(tags_buf, tags_buf_size, stb, thread_meta->ctb_start_idx + i, &tk); + if (ret <= 0) { + goto error; + } + + tags_buf_array[i].length = ret; + } + thread_meta->tags_buf_size = tags_buf_size; + + return tags_buf_array; + +error: + csvFreeCtbTagData(thread_meta, tags_buf_array); return NULL; } -int csvTestProcess() { - pthread_t handle; - int ret = pthread_create(&handle, NULL, csvWriteThread, NULL); - if (ret != 0) { - errorPrint("pthread_create failed. error code =%d \n", ret); - return -1; + +static CsvFileHandle* csvOpen(const char* filename, CsvCompressionLevel compress_level) { + CsvFileHandle* fhdl = NULL; + bool failed = false; + + fhdl = (CsvFileHandle*)benchCalloc(1, sizeof(CsvFileHandle), false); + if (!fhdl) { + errorPrint("Failed to malloc csv file handle. filename: %s, compress level: %d.\n", + filename, compress_level); + return NULL; } - infoPrint("start output to csv %s ...\n", g_arguments->csvPath); - int64_t start = toolsGetTimestampMs(); - pthread_join(handle, NULL); - int64_t delay = toolsGetTimestampMs() - start; - infoPrint("output to csv %s finished. delay:%"PRId64"s \n", g_arguments->csvPath, delay/1000); - - return 0; -} - -int genWithSTable(SDataBase* db, SSuperTable* stb, char* outDir) { - // filename - int ret = 0; - char outFile[MAX_FILE_NAME_LEN] = {0}; - obtainCsvFile(outFile, db, stb, outDir); - FILE * fs = fopen(outFile, "w"); - if(fs == NULL) { - errorPrint("failed create csv file. 
file=%s, last errno=%d strerror=%s \n", outFile, errno, strerror(errno)); - return -1; - } - - int rowLen = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->lenOfCols + stb->tags->size + stb->cols->size; - int bufLen = rowLen * g_arguments->reqPerReq; - char* buf = benchCalloc(1, bufLen, true); - - infoPrint("start write csv file: %s \n", outFile); - - if (stb->interlaceRows > 0) { - // interlace mode - ret = interlaceWriteCsv(db, stb, fs, buf, bufLen, rowLen * 2); + if (compress_level == CSV_COMPRESS_NONE) { + fhdl->handle.fp = fopen(filename, "w"); + failed = (!fhdl->handle.fp); } else { - // batch mode - ret = batchWriteCsv(db, stb, fs, buf, bufLen, rowLen * 2); + char mode[TINY_BUFF_LEN]; + (void)snprintf(mode, sizeof(mode), "wb%d", compress_level); + fhdl->handle.gf = gzopen(filename, mode); + failed = (!fhdl->handle.gf); } - tmfree(buf); - fclose(fs); - - succPrint("end write csv file: %s \n", outFile); - return ret; -} - - -void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir) { - sprintf(outFile, "%s%s-%s.csv", outDir, db->dbName, stb->stbName); -} - -int32_t writeCsvFile(FILE* f, char * buf, int32_t len) { - size_t size = fwrite(buf, 1, len, f); - if(size != len) { - errorPrint("failed to write csv file. expect write length:%d real write length:%d \n", len, (int32_t)size); - return -1; + if (failed) { + tmfree(fhdl); + errorPrint("Failed to open csv file handle. filename: %s, compress level: %d.\n", + filename, compress_level); + return NULL; + } else { + fhdl->filename = filename; + fhdl->compress_level = compress_level; + fhdl->result = CSV_ERR_OK; + return fhdl; } - return 0; } -int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain) { - int ret = 0; - int pos = 0; - int64_t tk = 0; - int64_t show = 0; - int tagDataLen = stb->lenOfTags + stb->tags->size + 256; - char * tagData = (char *) benchCalloc(1, tagDataLen, true); - int colDataLen = stb->lenOfCols + stb->cols->size + 256; - char * colData = (char *) benchCalloc(1, colDataLen, true); - - // gen child name - for (int64_t i = 0; i < stb->childTblCount; i++) { - int64_t ts = stb->startTimestamp; - int64_t ck = 0; - // tags - genTagData(tagData, stb, i, &tk); - // insert child column data - for(int64_t j = 0; j < stb->insertRows; j++) { - genColumnData(colData, stb, ts, db->precision, &ck); - // combine - pos += sprintf(buf + pos, "%s,%s\n", tagData, colData); - if (bufLen - pos < minRemain) { - // submit - ret = writeCsvFile(fs, buf, pos); - if (ret != 0) { - goto END; - } - - pos = 0; +static CsvIoError csvWrite(CsvFileHandle* fhdl, const char* buf, size_t size) { + if (fhdl->compress_level == CSV_COMPRESS_NONE) { + size_t ret = fwrite(buf, 1, size, fhdl->handle.fp); + if (ret != size) { + errorPrint("Failed to write csv file: %s. expected written %zu but %zu.\n", + fhdl->filename, size, ret); + if (ferror(fhdl->handle.fp)) { + perror("error"); } - - // ts move next - ts += stb->timestamp_step; - - // check cancel - if(g_arguments->terminate) { - infoPrint("%s", "You are cancel, exiting ...\n"); - ret = -1; - goto END; - } - - // print show - if (++show % SHOW_CNT == 0) { - infoPrint("batch write child table cnt = %"PRId64 " all rows = %" PRId64 "\n", i+1, show); - } - + fhdl->result = CSV_ERR_WRITE_FAILED; + return CSV_ERR_WRITE_FAILED; + } + } else { + int ret = gzwrite(fhdl->handle.gf, buf, size); + if (ret != size) { + errorPrint("Failed to write csv file: %s. 
expected written %zu but %d.\n", + fhdl->filename, size, ret); + int errnum; + const char* errmsg = gzerror(fhdl->handle.gf, &errnum); + errorPrint("gzwrite error: %s\n", errmsg); + fhdl->result = CSV_ERR_WRITE_FAILED; + return CSV_ERR_WRITE_FAILED; } } - if (pos > 0) { - ret = writeCsvFile(fs, buf, pos); - pos = 0; - } - -END: - // free - tmfree(tagData); - tmfree(colData); - return ret; + return CSV_ERR_OK; } -int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain) { - int ret = 0; - int pos = 0; - int64_t n = 0; // already inserted rows for one child table - int64_t tk = 0; - int64_t show = 0; - char **tagDatas = (char **)benchCalloc(stb->childTblCount, sizeof(char *), true); - int colDataLen = stb->lenOfCols + stb->cols->size + 256; - char * colData = (char *) benchCalloc(1, colDataLen, true); - int64_t last_ts = stb->startTimestamp; - - while (n < stb->insertRows ) { - for (int64_t i = 0; i < stb->childTblCount; i++) { - // start one table - int64_t ts = last_ts; - int64_t ck = 0; - // tags - if (tagDatas[i] == NULL) { - tagDatas[i] = genTagData(NULL, stb, i, &tk); +static void csvClose(CsvFileHandle* fhdl) { + if (!fhdl) { + return; + } + + if (fhdl->compress_level == CSV_COMPRESS_NONE) { + if (fhdl->handle.fp) { + fclose(fhdl->handle.fp); + fhdl->handle.fp = NULL; + } + } else { + if (fhdl->handle.gf) { + gzclose(fhdl->handle.gf); + fhdl->handle.gf = NULL; + } + } + + tmfree(fhdl); +} + + +static int csvWriteFile(CsvFileHandle* fhdl, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck, CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) { + SDataBase* db = write_meta->db; + SSuperTable* stb = write_meta->stb; + CsvRowTagsBuf* tags_buf_array = thread_meta->tags_buf_array; + CsvRowTagsBuf* tags_buf = &tags_buf_array[ctb_idx]; + CsvRowColsBuf* cols_buf = thread_meta->cols_buf; + int ret = 0; + + + ret = csvGenRowColData(cols_buf->buf, cols_buf->buf_size, stb, cur_ts, db->precision, ck); + if (ret <= 0) { + errorPrint("Failed to generate csv column data. database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx); + return -1; + } + + cols_buf->length = ret; + + // write header + if (thread_meta->output_header) { + ret = csvWrite(fhdl, write_meta->csv_header, write_meta->csv_header_length); + if (ret != CSV_ERR_OK) { + errorPrint("Failed to write csv header data. database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx); + return -1; + } + + thread_meta->output_header = false; + } + + // write columns + ret = csvWrite(fhdl, cols_buf->buf, cols_buf->length); + if (ret != CSV_ERR_OK) { + errorPrint("Failed to write csv column data. database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx); + return -1; + } + + // write tags + ret = csvWrite(fhdl, tags_buf->buf, tags_buf->length); + if (ret != CSV_ERR_OK) { + errorPrint("Failed to write csv tag data. database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx); + return -1; + } + + // write line break + ret = csvWrite(fhdl, "\n", 1); + if (ret != CSV_ERR_OK) { + errorPrint("Failed to write csv line break data. 
database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx); + return -1; + } + + return 0; +} + + +static void* csvGenStbThread(void* arg) { + CsvThreadArgs* thread_arg = (CsvThreadArgs*)arg; + CsvWriteMeta* write_meta = thread_arg->write_meta; + CsvThreadMeta* thread_meta = &thread_arg->thread_meta; + SDataBase* db = write_meta->db; + SSuperTable* stb = write_meta->stb; + + int64_t cur_ts = 0; + int64_t slice_cur_ts = 0; + int64_t slice_end_ts = 0; + int64_t slice_batch_ts = 0; + int64_t slice_ctb_cur_ts = 0; + int64_t ck = 0; + uint64_t ctb_idx = 0; + int ret = 0; + CsvFileHandle* fhdl = NULL; + char fullname[MAX_PATH_LEN] = {0}; + + uint64_t total_rows = 0; + uint64_t pre_total_rows = 0; + uint64_t file_rows = 0; + int64_t start_print_ts = 0; + int64_t pre_print_ts = 0; + int64_t cur_print_ts = 0; + int64_t print_ts_elapse = 0; + + + // tags buffer + CsvRowTagsBuf* tags_buf_array = csvGenCtbTagData(write_meta, thread_meta); + if (!tags_buf_array) { + errorPrint("Failed to generate csv tag data. database: %s, super table: %s, naming type: %d, thread index: %zu.\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id); + return NULL; + } + + // column buffer + int buf_size = stb->lenOfCols + stb->cols->size; + char* buf = (char*)benchCalloc(1, buf_size, true); + if (!buf) { + errorPrint("Failed to malloc csv column buffer. database: %s, super table: %s, naming type: %d, thread index: %zu.\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id); + goto end; + } + + CsvRowColsBuf cols_buf = { + .buf = buf, + .buf_size = buf_size, + .length = 0 + }; + + thread_meta->tags_buf_array = tags_buf_array; + thread_meta->cols_buf = &cols_buf; + start_print_ts = toolsGetTimestampMs(); + + cur_ts = write_meta->start_ts; + while (cur_ts < write_meta->end_ts) { + // get filename + ret = csvGetFileFullname(write_meta, thread_meta, fullname, sizeof(fullname)); + if (ret < 0) { + errorPrint("Failed to generate csv filename. database: %s, super table: %s, naming type: %d, thread index: %zu.\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id); + goto end; + } + + // create fd + fhdl = csvOpen(fullname, stb->csv_compress_level); + if (fhdl == NULL) { + errorPrint("Failed to create csv file. thread index: %zu, file: %s, errno: %d, strerror: %s.\n", + thread_meta->thread_id, fullname, errno, strerror(errno)); + goto end; + } + + + thread_meta->output_header = stb->csv_output_header; + slice_cur_ts = cur_ts; + slice_end_ts = MIN(thread_meta->end_ts, write_meta->end_ts); + file_rows = 0; + pre_print_ts = toolsGetTimestampMs(); + + infoPrint("thread[%zu] begin to write csv file: %s.\n", thread_meta->thread_id, fullname); + + // write data + while (slice_cur_ts < slice_end_ts) { + slice_batch_ts = csvCalcSliceBatchTimestamp(write_meta, slice_cur_ts, slice_end_ts); + + for (ctb_idx = 0; ctb_idx < thread_meta->ctb_count; ++ctb_idx) { + for (slice_ctb_cur_ts = slice_cur_ts; slice_ctb_cur_ts < slice_batch_ts; slice_ctb_cur_ts += write_meta->stb->timestamp_step) { + ret = csvWriteFile(fhdl, ctb_idx, slice_ctb_cur_ts, &ck, write_meta, thread_meta); + if (ret) { + errorPrint("Failed to write csv file. 
thread index: %zu, file: %s, errno: %d, strerror: %s.\n", + thread_meta->thread_id, fullname, errno, strerror(errno)); + csvClose(fhdl); + goto end; + } + + ck += 1; + total_rows += 1; + file_rows += 1; + + cur_print_ts = toolsGetTimestampMs(); + print_ts_elapse = cur_print_ts - pre_print_ts; + if (print_ts_elapse > 30000) { + infoPrint("thread[%zu] has currently inserted rows: %" PRIu64 ", period insert rate: %.2f rows/s.\n", + thread_meta->thread_id, total_rows, (total_rows - pre_total_rows) * 1000.0 / print_ts_elapse); + + pre_print_ts = cur_print_ts; + pre_total_rows = total_rows; + } + + + if (g_arguments->terminate) { + csvClose(fhdl); + goto end; + } + } } - // calc need insert rows - int64_t needInserts = stb->interlaceRows; - if(needInserts > stb->insertRows - n) { - needInserts = stb->insertRows - n; - } + slice_cur_ts = slice_batch_ts; + } - for (int64_t j = 0; j < needInserts; j++) { - genColumnData(colData, stb, ts, db->precision, &ck); - // combine tags,cols - pos += sprintf(buf + pos, "%s,%s\n", tagDatas[i], colData); - if (bufLen - pos < minRemain) { - // submit - ret = writeCsvFile(fs, buf, pos); - if (ret != 0) { - goto END; - } - pos = 0; + csvClose(fhdl); + cur_ts = thread_meta->end_ts; + csvUpdateSliceRange(write_meta, thread_meta, slice_end_ts); + } + + cur_print_ts = toolsGetTimestampMs(); + print_ts_elapse = cur_print_ts - start_print_ts; + + succPrint("thread [%zu] has completed inserting rows: %" PRIu64 ", insert rate %.2f rows/s.\n", + thread_meta->thread_id, total_rows, total_rows * 1000.0 / print_ts_elapse); + +end: + thread_meta->total_rows = total_rows; + csvFreeCtbTagData(thread_meta, tags_buf_array); + tmfree(buf); + return NULL; +} + + +static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) { + int ret = 0; + bool prompt = true; + uint64_t total_rows = 0; + int64_t start_ts = 0; + int64_t ts_elapse = 0; + + CsvWriteMeta* write_meta = NULL; + CsvThreadArgs* args = NULL; + pthread_t* pids = NULL; + + + write_meta = benchCalloc(1, sizeof(CsvWriteMeta), false); + if (!write_meta) { + ret = -1; + goto end; + } + + ret = csvInitWriteMeta(db, stb, write_meta); + if (ret < 0) { + ret = -1; + goto end; + } + + infoPrint("export csv mode: %s.\n", write_meta->mode); + + args = benchCalloc(write_meta->total_threads, sizeof(CsvThreadArgs), false); + if (!args) { + ret = -1; + goto end; + } + + pids = benchCalloc(write_meta->total_threads, sizeof(pthread_t), false); + if (!pids) { + ret = -1; + goto end; + } + + start_ts = toolsGetTimestampMs(); + for (uint32_t i = 0; (i < write_meta->total_threads && !g_arguments->terminate); ++i) { + CsvThreadArgs* arg = &args[i]; + arg->write_meta = write_meta; + csvInitThreadMeta(write_meta, i + 1, &arg->thread_meta); + + ret = pthread_create(&pids[i], NULL, csvGenStbThread, arg); + if (ret) { + perror("Failed to create thread"); + goto end; + } + } + + // wait threads + for (uint32_t i = 0; i < write_meta->total_threads; ++i) { + if (g_arguments->terminate && prompt) { + infoPrint("Operation cancelled by user, exiting gracefully...\n"); + prompt = false; + } + + infoPrint("pthread_join %u ...\n", i); + pthread_join(pids[i], NULL); + } + + // statistics + total_rows = 0; + for (uint32_t i = 0; i < write_meta->total_threads; ++i) { + CsvThreadArgs* arg = &args[i]; + CsvThreadMeta* thread_meta = &arg->thread_meta; + total_rows += thread_meta->total_rows; + } + + ts_elapse = toolsGetTimestampMs() - start_ts; + if (ts_elapse > 0) { + succPrint("Spent %.6f seconds to insert rows: %" PRIu64 " with %zu thread(s) into %s, at a rate of 
%.2f rows/s.\n", + ts_elapse / 1000.0, total_rows, write_meta->total_threads, g_arguments->output_path, total_rows * 1000.0 / ts_elapse); + } + + // export create db/stb sql + ret = csvExportCreateSql(write_meta); + +end: + tmfree(pids); + tmfree(args); + tmfree(write_meta); + return ret; +} + + +static void csvGenPrepare(SDataBase* db, SSuperTable* stb) { + stb->lenOfTags = accumulateRowLen(stb->tags, stb->iface); + stb->lenOfCols = accumulateRowLen(stb->cols, stb->iface); + + if (stb->childTblTo) { + stb->childTblCount = stb->childTblTo - stb->childTblFrom; + } + + return; +} + + +static int csvGenStb(SDataBase* db, SSuperTable* stb) { + // prepare + csvGenPrepare(db, stb); + + return csvGenStbProcess(db, stb); +} + + +static int csvWriteThread() { + for (size_t i = 0; i < g_arguments->databases->size && !g_arguments->terminate; ++i) { + // database + SDataBase* db = benchArrayGet(g_arguments->databases, i); + if (db->superTbls) { + for (size_t j = 0; j < db->superTbls->size && !g_arguments->terminate; ++j) { + // stb + SSuperTable* stb = benchArrayGet(db->superTbls, j); + if (stb->insertRows == 0) { + continue; } - // ts move next - ts += stb->timestamp_step; - - // check cancel - if(g_arguments->terminate) { - infoPrint("%s", "You are cancel, exiting ... \n"); - ret = -1; - goto END; + // parsing parameters + int ret = csvParseStbParameter(stb); + if (ret != 0) { + errorPrint("Failed to parse csv parameter. database: %s, super table: %s, error code: %d.\n", + db->dbName, stb->stbName, ret); + return -1; } - // print show - if (++show % SHOW_CNT == 0) { - infoPrint("interlace write child table index = %"PRId64 " all rows = %"PRId64 "\n", i+1, show); + // gen csv + ret = csvGenStb(db, stb); + if(ret != 0) { + errorPrint("Failed to generate csv files. 
database: %s, super table: %s, error code: %d.\n", + db->dbName, stb->stbName, ret); + return -1; } } - - // if last child table - if (i + 1 == stb->childTblCount ) { - n += needInserts; - last_ts = ts; - } } } - if (pos > 0) { - ret = writeCsvFile(fs, buf, pos); - pos = 0; - } - -END: - // free - for (int64_t m = 0 ; m < stb->childTblCount; m ++) { - tmfree(tagDatas[m]); - } - tmfree(tagDatas); - tmfree(colData); - return ret; -} - -// gen tag data -char * genTagData(char* buf, SSuperTable* stb, int64_t i, int64_t *k) { - // malloc - char* tagData; - if (buf == NULL) { - int tagDataLen = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->tags->size + 32; - tagData = benchCalloc(1, tagDataLen, true); - } else { - tagData = buf; - } - - int pos = 0; - // tbname - pos += sprintf(tagData, "\'%s%"PRId64"\'", stb->childTblPrefix, i); - // tags - pos += genRowByField(tagData + pos, stb->tags, stb->tags->size, stb->binaryPrefex, stb->ncharPrefex, k); - - return tagData; -} - -// gen column data -char * genColumnData(char* colData, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k) { - char szTime[128] = {0}; - toolsFormatTimestamp(szTime, ts, precision); - int pos = sprintf(colData, "\'%s\'", szTime); - - // columns - genRowByField(colData + pos, stb->cols, stb->cols->size, stb->binaryPrefex, stb->ncharPrefex, k); - return colData; + return 0; } -int32_t genRowByField(char* buf, BArray* fields, int16_t fieldCnt, char* binanryPrefix, char* ncharPrefix, int64_t *k) { - - // other cols data - int32_t pos1 = 0; - for(uint16_t i = 0; i < fieldCnt; i++) { - Field* fd = benchArrayGet(fields, i); - char* prefix = ""; - if(fd->type == TSDB_DATA_TYPE_BINARY || fd->type == TSDB_DATA_TYPE_VARBINARY) { - if(binanryPrefix) { - prefix = binanryPrefix; - } - } else if(fd->type == TSDB_DATA_TYPE_NCHAR) { - if(ncharPrefix) { - prefix = ncharPrefix; - } +int csvTestProcess() { + // parsing parameters + if (csvParseParameter() != 0) { + return -1; } - pos1 += dataGenByField(fd, buf, pos1, prefix, k, ""); - } - - return pos1; + infoPrint("Starting to output data to csv files in directory: %s ...\n", g_arguments->output_path); + int64_t start = toolsGetTimestampMs(); + int ret = csvWriteThread(); + if (ret != 0) { + return -1; + } + int64_t elapse = toolsGetTimestampMs() - start; + infoPrint("Generating csv files in directory: %s has been completed. 
Time elapsed: %.3f seconds\n",
+            g_arguments->output_path, elapse / 1000.0);
+    return 0;
 }
diff --git a/tools/taos-tools/src/benchJsonOpt.c b/tools/taos-tools/src/benchJsonOpt.c
index 4791385741..5b992b388e 100644
--- a/tools/taos-tools/src/benchJsonOpt.c
+++ b/tools/taos-tools/src/benchJsonOpt.c
@@ -1405,6 +1405,65 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) {
             }
         }
     }
+
+    // csv file prefix
+    tools_cJSON* csv_fp = tools_cJSON_GetObjectItem(stbInfo, "csv_file_prefix");
+    if (csv_fp && csv_fp->type == tools_cJSON_String && csv_fp->valuestring != NULL) {
+        superTable->csv_file_prefix = csv_fp->valuestring;
+    } else {
+        superTable->csv_file_prefix = "data";
+    }
+
+    // csv timestamp format
+    tools_cJSON* csv_tf = tools_cJSON_GetObjectItem(stbInfo, "csv_ts_format");
+    if (csv_tf && csv_tf->type == tools_cJSON_String && csv_tf->valuestring != NULL) {
+        superTable->csv_ts_format = csv_tf->valuestring;
+    } else {
+        superTable->csv_ts_format = NULL;
+    }
+
+    // csv timestamp interval
+    tools_cJSON* csv_ti = tools_cJSON_GetObjectItem(stbInfo, "csv_ts_interval");
+    if (csv_ti && csv_ti->type == tools_cJSON_String && csv_ti->valuestring != NULL) {
+        superTable->csv_ts_interval = csv_ti->valuestring;
+    } else {
+        superTable->csv_ts_interval = "1d";
+    }
+
+    // csv output header
+    superTable->csv_output_header = true;
+    tools_cJSON* oph = tools_cJSON_GetObjectItem(stbInfo, "csv_output_header");
+    if (oph && oph->type == tools_cJSON_String && oph->valuestring != NULL) {
+        if (0 == strcasecmp(oph->valuestring, "yes")) {
+            superTable->csv_output_header = true;
+        } else if (0 == strcasecmp(oph->valuestring, "no")) {
+            superTable->csv_output_header = false;
+        }
+    }
+
+    // csv tbname alias
+    tools_cJSON* tba = tools_cJSON_GetObjectItem(stbInfo, "csv_tbname_alias");
+    if (tba && tba->type == tools_cJSON_String && tba->valuestring != NULL) {
+        superTable->csv_tbname_alias = tba->valuestring;
+    } else {
+        superTable->csv_tbname_alias = "device_id";
+    }
+
+    // csv compression level
+    tools_cJSON* cl = tools_cJSON_GetObjectItem(stbInfo, "csv_compress_level");
+    if (cl && cl->type == tools_cJSON_String && cl->valuestring != NULL) {
+        if (0 == strcasecmp(cl->valuestring, "none")) {
+            superTable->csv_compress_level = CSV_COMPRESS_NONE;
+        } else if (0 == strcasecmp(cl->valuestring, "fast")) {
+            superTable->csv_compress_level = CSV_COMPRESS_FAST;
+        } else if (0 == strcasecmp(cl->valuestring, "balance")) {
+            superTable->csv_compress_level = CSV_COMPRESS_BALANCE;
+        } else if (0 == strcasecmp(cl->valuestring, "best")) {
+            superTable->csv_compress_level = CSV_COMPRESS_BEST;
+        }
+    } else {
+        superTable->csv_compress_level = CSV_COMPRESS_NONE;
+    }
     }
     return 0;
 }
@@ -1586,26 +1645,14 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) {
         }
     }
 
-    g_arguments->csvPath[0] = 0;
-    tools_cJSON *csv = tools_cJSON_GetObjectItem(json, "csvPath");
-    if (csv && (csv->type == tools_cJSON_String)
-            && (csv->valuestring != NULL)) {
-        tstrncpy(g_arguments->csvPath, csv->valuestring, MAX_FILE_NAME_LEN);
-    }
-
-    size_t len = strlen(g_arguments->csvPath);
-
-    if(len == 0) {
-        // set default with current path
-        strcpy(g_arguments->csvPath, "./output/");
-        mkdir(g_arguments->csvPath, 0775);
+    // output dir
+    tools_cJSON* opp = tools_cJSON_GetObjectItem(json, "output_dir");
+    if (opp && opp->type == tools_cJSON_String && opp->valuestring != NULL) {
+        g_arguments->output_path = opp->valuestring;
     } else {
-        // append end
-        if (g_arguments->csvPath[len-1] != '/' ) {
-            strcat(g_arguments->csvPath, "/");
-        }
-        mkdir(g_arguments->csvPath, 0775);
+        g_arguments->output_path = "./output/";
     }
+    (void)mkdir(g_arguments->output_path, 0775);
 
     code = 0;
     return code;
diff --git a/tools/taos-tools/src/benchMain.c b/tools/taos-tools/src/benchMain.c
index 86ad795d05..e82da29468 100644
--- a/tools/taos-tools/src/benchMain.c
+++ b/tools/taos-tools/src/benchMain.c
@@ -153,7 +153,7 @@ int main(int argc, char* argv[]) {
         }
     } else if (g_arguments->test_mode == CSVFILE_TEST) {
         if (csvTestProcess()) {
-            errorPrint("%s", "query test process failed\n");
+            errorPrint("%s", "csv file generation failed\n");
             ret = -1;
         }
     } else if (g_arguments->test_mode == QUERY_TEST) {
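
Taken together, the options parsed in `getStableInfo` and `getMetaFromCommonJsonFile` above imply a configuration file shaped roughly like the sketch below. The `filetype`, `output_dir`, and `csv_*` keys are exactly the ones this patch parses; the remaining keys (`databases`, `dbinfo`, `super_tables`, `insert_rows`, `timestamp_step`, and so on) are assumed to follow taosBenchmark's existing insert-mode configuration layout, so treat this as an illustrative sketch rather than a verified config:

```json
{
    "filetype": "csvfile",
    "output_dir": "./csv/",
    "databases": [
        {
            "dbinfo": {
                "name": "csvdb",
                "precision": "ms"
            },
            "super_tables": [
                {
                    "name": "meters",
                    "childtable_count": 1000,
                    "childtable_prefix": "d",
                    "insert_rows": 86400,
                    "timestamp_step": 1000,
                    "start_timestamp": "2025-01-01 00:00:00.000",
                    "csv_file_prefix": "data",
                    "csv_ts_format": "%Y%m%d",
                    "csv_ts_interval": "1d",
                    "csv_output_header": "yes",
                    "csv_tbname_alias": "device_id",
                    "csv_compress_level": "fast",
                    "columns": [
                        { "type": "float", "name": "current" },
                        { "type": "int", "name": "voltage" }
                    ],
                    "tags": [
                        { "type": "int", "name": "groupid" }
                    ]
                }
            ]
        }
    ]
}
```

With `csv_ts_format` set, the time-slice branches of `csvGetFileFullname` take effect: each slice goes to a file whose name embeds the slice boundaries, e.g. `./csv/data_20250101_20250102.csv.gz` (the per-thread batch modes additionally insert a thread chunk between the prefix and the timestamps), and a non-`none` `csv_compress_level` makes `csvOpen` route all writes through `gzopen`/`gzwrite` instead of `fopen`/`fwrite`.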