diff --git a/tools/taos-tools/inc/bench.h b/tools/taos-tools/inc/bench.h index 4dd19d83b9..4f41abb903 100644 --- a/tools/taos-tools/inc/bench.h +++ b/tools/taos-tools/inc/bench.h @@ -783,6 +783,7 @@ typedef struct SArguments_S { bool mistMode; bool escape_character; bool pre_load_tb_meta; + bool bind_vgroup; char* output_path; char output_path_buf[MAX_PATH_LEN]; @@ -792,8 +793,8 @@ typedef struct SArguments_S { long csv_ts_intv_secs; bool csv_output_header; bool csv_tbname_alias; + CsvCompressionLevel csv_compress_level; - bool bind_vgroup; } SArguments; typedef struct SBenchConn { diff --git a/tools/taos-tools/inc/benchCsv.h b/tools/taos-tools/inc/benchCsv.h index c522a12c50..2db2ec324e 100644 --- a/tools/taos-tools/inc/benchCsv.h +++ b/tools/taos-tools/inc/benchCsv.h @@ -16,7 +16,8 @@ #ifndef INC_BENCHCSV_H_ #define INC_BENCHCSV_H_ -#include +#include +#include "bench.h" typedef enum { @@ -26,6 +27,29 @@ typedef enum { CSV_NAMING_B_THREAD_TIME_SLICE } CsvNamingType; +typedef enum { + CSV_COMPRESS_NONE = 0, + CSV_COMPRESS_FAST = 1, + CSV_COMPRESS_BALANCE = 6, + CSV_COMPRESS_BEST = 9 +} CsvCompressionLevel; + +typedef enum { + CSV_ERR_OK = 0, + CSV_ERR_OPEN_FAILED, + CSV_ERR_WRITE_FAILED +} CsvIoError; + +typedef struct { + const char* filename; + CsvCompressionLevel compress_level; + CsvIoError result; + union { + gzFile gf; + FILE* fp; + } handle; +} CsvFileHandle; + typedef struct { char* buf; int length; diff --git a/tools/taos-tools/src/benchCsv.c b/tools/taos-tools/src/benchCsv.c index f1aacaec66..97a1a74c0f 100644 --- a/tools/taos-tools/src/benchCsv.c +++ b/tools/taos-tools/src/benchCsv.c @@ -280,6 +280,15 @@ static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_ } +static const char* csvGetGzipFilePrefix() { + if (g_arguments->csv_compress_level == CSV_COMPRESS_NONE) { + return ""; + } else { + return ".gz" + } +} + + static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta, char* fullname, size_t size) { char thread_buf[SMALL_BUFF_LEN]; char start_time_buf[MIDDLE_BUFF_LEN]; @@ -287,28 +296,29 @@ static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_me int ret = -1; const char* base_path = g_arguments->output_path; const char* file_prefix = g_arguments->csv_file_prefix; + const char* gzip_suffix = csvGetGzipFilePrefix(); switch (meta->naming_type) { case CSV_NAMING_I_SINGLE: { - ret = snprintf(fullname, size, "%s%s.csv", base_path, file_prefix); + ret = snprintf(fullname, size, "%s%s.csv%s", base_path, file_prefix, g_arguments->csv_compress_level, gzip_suffix); break; } case CSV_NAMING_I_TIME_SLICE: { csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf)); csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf)); - ret = snprintf(fullname, size, "%s%s_%s_%s.csv", base_path, file_prefix, start_time_buf, end_time_buf); + ret = snprintf(fullname, size, "%s%s_%s_%s.csv%s", base_path, file_prefix, start_time_buf, end_time_buf, gzip_suffix); break; } case CSV_NAMING_B_THREAD: { (void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id); - ret = snprintf(fullname, size, "%s%s_%s.csv", base_path, file_prefix, thread_buf); + ret = snprintf(fullname, size, "%s%s_%s.csv%s", base_path, file_prefix, thread_buf, gzip_suffix); break; } case CSV_NAMING_B_THREAD_TIME_SLICE: { (void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id); csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf)); csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf)); - ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf); + ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv%s", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf, gzip_suffix); break; } default: { @@ -463,14 +473,91 @@ static void csvFreeCtbTagData(CsvThreadMeta* thread_meta, CsvRowTagsBuf* tags_bu } -static int csvWriteFile(FILE* fp, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck, CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) { +static CsvFileHandle* csvOpen(const char* filename, CsvCompressionLevel compress_level) { + CsvFileHandle* fhdl = NULL; + bool failed = false; + + fhdl = (CsvFileHandle*)benchCalloc(1, sizeof(CsvFileHandle), false); + if (!fhdl) { + errorPrint("Failed to malloc csv file handle. filename: %s, compress level: %d.\n", + filename, compress_level); + return NULL; + } + + if (compress_level == CSV_COMPRESS_NONE) { + fhdl.handle.fp = fopen(filename, "w"); + failed = (!fhdl.handle.fp); + } else { + char mode[TINY_BUFF_LEN]; + (void)snprintf(mode, sizeof(mode), "wb%d", compress_level); + fhdl.handle.gf = gzopen(filename, mode); + failed = (!fhdl.handle.gf); + } + + if (failed) { + tmfree(fhdl); + errorPrint("Failed to open csv file handle. filename: %s, compress level: %d.\n", + filename, compress_level); + return NULL; + } else { + fhdl->filename = filename; + fhdl->compress_level = compress_level; + fhdl->result = CSV_ERR_OK; + return fhdl; + } +} + + +static CsvIoError csvWrite(CsvFileHandle* fhdl, const char* buf, size_t size) { + if (fhdl->compress_level == CSV_COMPRESS_NONE) { + size_t ret = fwrite(buf, 1, size, fhdl->handle.fp); + if (ret != size) { + errorPrint("Failed to write csv file: %s. expected written %zu but %zu.\n", + fhdl->filename, size, ret); + if (ferror(fhdl->handle.fp)) { + perror("error"); + } + fhdl->result = CSV_ERR_WRITE_FAILED; + return CSV_ERR_WRITE_FAILED; + } + } else { + unsigned int ret = gzwrite(fhdl->handle.gf, buf, size); + if (ret != size) { + errorPrint("Failed to write csv file: %s. expected written %zu but %zu.\n", + fhdl->filename, size, ret); + int errnum; + const char* errmsg = gzerror(fhdl->handle.gf, &errnum); + errorPrint("gzwrite error: %s\n", errmsg); + fhdl->result = CSV_ERR_WRITE_FAILED; + return CSV_ERR_WRITE_FAILED; + } + } + return CSV_ERR_OK; +} + + +static void csvClose(CsvFileHandle* fhdl) { + if (fhdl->compress_level == CSV_COMPRESS_NONE) { + if (fhdl->handle.fp) { + fclose(fhdl->handle.fp); + fhdl->handle.fp = NULL; + } + } else { + if (fhdl->handle.gf) { + gzclose(fhdl->handle.gf); + fhdl->handle.gf = NULL; + } + } +} + + +static int csvWriteFile(CsvFileHandle* fhdl, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck, CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) { SDataBase* db = write_meta->db; SSuperTable* stb = write_meta->stb; CsvRowTagsBuf* tags_buf_bucket = thread_meta->tags_buf_bucket; CsvRowColsBuf* tags_buf = &tags_buf_bucket[ctb_idx]; CsvRowColsBuf* cols_buf = thread_meta->cols_buf; - int ret = 0; - size_t written = 0; + int ret = 0; ret = csvGenRowColData(cols_buf->buf, cols_buf->buf_size, stb, cur_ts, db->precision, ck); @@ -484,23 +571,37 @@ static int csvWriteFile(FILE* fp, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck, // write header if (thread_meta->output_header) { - written = fwrite(write_meta->csv_header, 1, write_meta->csv_header_length, fp); + ret = csvWrite(fhdl, write_meta->csv_header, write_meta->csv_header_length); + if (ret != CSV_ERR_OK) { + errorPrint("Failed to write csv header data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx); + return -1; + } + thread_meta->output_header = false; } // write columns - written = fwrite(cols_buf->buf, 1, cols_buf->length, fp); - if (written != cols_buf->length) { + ret = csvWrite(fhdl, cols_buf->buf, cols_buf->length); + if (ret != CSV_ERR_OK) { errorPrint("Failed to write csv column data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", - cols_buf->length, written, db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx); + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx); return -1; } // write tags - written = fwrite(tags_buf->buf, 1, tags_buf->length, fp); - if (written != tags_buf->length) { - errorPrint("Failed to write csv tag data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", - tags_buf->length, written, db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx); + ret = csvWrite(fhdl, tags_buf->buf, tags_buf->length); + if (ret != CSV_ERR_OK) { + errorPrint("Failed to write csv tag data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx); + return -1; + } + + // write line break + ret = csvWrite(fhdl, "\n", 1); + if (ret != CSV_ERR_OK) { + errorPrint("Failed to write csv line break data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx); return -1; } @@ -523,7 +624,7 @@ static void* csvGenStbThread(void* arg) { int64_t ck = 0; uint64_t ctb_idx = 0; int ret = 0; - FILE* fp = NULL; + CsvFileHandle* fhdl = NULL; char fullname[MAX_PATH_LEN] = {}; @@ -565,8 +666,8 @@ static void* csvGenStbThread(void* arg) { } // create fd - fp = fopen(fullname, "w"); - if (fp == NULL) { + fhdl = csvOpen(fullname, g_arguments->csv_compress_level); + if (fhdl == NULL) { errorPrint("Failed to create csv file. thread index: %d, file: %s, errno: %d, strerror: %s.\n", thread_meta->thread_id, fullname, errno, strerror(errno)); goto end; @@ -583,11 +684,11 @@ static void* csvGenStbThread(void* arg) { for (ctb_idx = 0; ctb_idx < thread_meta->ctb_count; ++ctb_idx) { for (slice_ctb_cur_ts = slice_cur_ts; slice_ctb_cur_ts < slice_batch_ts; slice_ctb_cur_ts += write_meta->stb->timestamp_step) { - ret = csvWriteFile(fp, ctb_idx, slice_ctb_cur_ts, &ck, write_meta, thread_meta); + ret = csvWriteFile(fhdl, ctb_idx, slice_ctb_cur_ts, &ck, write_meta, thread_meta); if (!ret) { errorPrint("Failed to write csv file. thread index: %d, file: %s, errno: %d, strerror: %s.\n", thread_meta->thread_id, fullname, errno, strerror(errno)); - fclose(fp); + csvClose(fhdl); goto end; } @@ -598,7 +699,7 @@ static void* csvGenStbThread(void* arg) { slice_cur_ts = slice_batch_ts; } - fclose(fp); + csvClose(fhdl); csvUpdateSliceRange(write_meta, thread_meta, last_end_ts); } diff --git a/tools/taos-tools/src/benchJsonOpt.c b/tools/taos-tools/src/benchJsonOpt.c index 4cf690204e..26c6200157 100644 --- a/tools/taos-tools/src/benchJsonOpt.c +++ b/tools/taos-tools/src/benchJsonOpt.c @@ -14,6 +14,7 @@ #include #include #include "benchLog.h" +#include "benchCsv.h" extern char g_configDir[MAX_PATH_LEN]; @@ -1636,6 +1637,22 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) { g_arguments->csv_tbname_alias = "device_id"; } + // csv compression level + tools_cJSON* cl = tools_cJSON_GetObjectItem(json, "csv_compress_level"); + if (cl && cl->type == tools_cJSON_String && cl->valuestring != NULL) { + if (0 == strcasecmp(cl->valuestring, "none")) { + g_arguments->csv_compress_level = CSV_COMPRESS_NONE; + } else if (0 == strcasecmp(cl->valuestring, "fast")) { + g_arguments->csv_compress_level = CSV_COMPRESS_FAST; + } else if (0 == strcasecmp(cl->valuestring, "balance")) { + g_arguments->csv_compress_level = CSV_COMPRESS_BALANCE; + } else if (0 == strcasecmp(cl->valuestring, "best")) { + g_arguments->csv_compress_level = CSV_COMPRESS_BEST; + } + } else { + g_arguments->csv_compress_level = CSV_COMPRESS_NONE; + } + code = 0; return code; }