From 8ba478cad0ffd60897601c78ca8c245c1b93a833 Mon Sep 17 00:00:00 2001 From: Yaming Pei Date: Mon, 3 Mar 2025 14:34:15 +0800 Subject: [PATCH] fix: resolve csv compilation errors --- tools/taos-tools/inc/bench.h | 11 +- tools/taos-tools/inc/benchCsv.h | 9 +- tools/taos-tools/inc/benchLog.h | 32 ++-- tools/taos-tools/src/benchCsv.c | 222 ++++++++++++++-------------- tools/taos-tools/src/benchJsonOpt.c | 1 - 5 files changed, 139 insertions(+), 136 deletions(-) diff --git a/tools/taos-tools/inc/bench.h b/tools/taos-tools/inc/bench.h index ac187d2575..30973170a3 100644 --- a/tools/taos-tools/inc/bench.h +++ b/tools/taos-tools/inc/bench.h @@ -719,6 +719,14 @@ typedef struct STmqMetaInfo_S { uint16_t iface; } STmqMetaInfo; + +typedef enum { + CSV_COMPRESS_NONE = 0, + CSV_COMPRESS_FAST = 1, + CSV_COMPRESS_BALANCE = 6, + CSV_COMPRESS_BEST = 9 +} CsvCompressionLevel; + typedef struct SArguments_S { uint8_t taosc_version; char * metaFile; @@ -786,9 +794,10 @@ typedef struct SArguments_S { char* csv_file_prefix; char* csv_ts_format; char* csv_ts_interval; + char* csv_tbname_alias; long csv_ts_intv_secs; bool csv_output_header; - bool csv_tbname_alias; + CsvCompressionLevel csv_compress_level; } SArguments; diff --git a/tools/taos-tools/inc/benchCsv.h b/tools/taos-tools/inc/benchCsv.h index 62e0dea7d7..f9f87aa341 100644 --- a/tools/taos-tools/inc/benchCsv.h +++ b/tools/taos-tools/inc/benchCsv.h @@ -27,13 +27,6 @@ typedef enum { CSV_NAMING_B_THREAD_TIME_SLICE } CsvNamingType; -typedef enum { - CSV_COMPRESS_NONE = 0, - CSV_COMPRESS_FAST = 1, - CSV_COMPRESS_BALANCE = 6, - CSV_COMPRESS_BEST = 9 -} CsvCompressionLevel; - typedef enum { CSV_ERR_OK = 0, CSV_ERR_OPEN_FAILED, @@ -85,7 +78,7 @@ typedef struct { size_t thread_id; bool output_header; int tags_buf_size; - CsvRowTagsBuf* tags_buf_bucket; + CsvRowTagsBuf* tags_buf_array; CsvRowColsBuf* cols_buf; } CsvThreadMeta; diff --git a/tools/taos-tools/inc/benchLog.h b/tools/taos-tools/inc/benchLog.h index 426112bcd8..961a037e3c 100644 --- a/tools/taos-tools/inc/benchLog.h +++ b/tools/taos-tools/inc/benchLog.h @@ -16,6 +16,8 @@ #ifndef INC_BENCHLOG_H_ #define INC_BENCHLOG_H_ +#include + // // suport thread safe log module // @@ -53,7 +55,7 @@ void exitLog(); (int32_t)timeSecs.tv_usec); \ fprintf(stdout, "DEBG: "); \ fprintf(stdout, "%s(%d) ", __FILE__, __LINE__); \ - fprintf(stdout, "" fmt, __VA_ARGS__); \ + fprintf(stdout, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_STDOUT); \ } \ } while (0) @@ -74,7 +76,7 @@ void exitLog(); (int32_t)timeSecs.tv_usec); \ fprintf(stdout, "DEBG: "); \ fprintf(stdout, "%s(%d) ", __FILE__, __LINE__); \ - fprintf(stdout, "" fmt, __VA_ARGS__); \ + fprintf(stdout, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_STDOUT); \ } \ } while (0) @@ -94,7 +96,7 @@ void exitLog(); do { \ if (g_arguments->debug_print) { \ lockLog(LOG_STDOUT); \ - fprintf(stdout, "" fmt, __VA_ARGS__); \ + fprintf(stdout, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_STDOUT); \ } \ } while (0) @@ -102,14 +104,14 @@ void exitLog(); #define infoPrintNoTimestamp(fmt, ...) \ do { \ lockLog(LOG_STDOUT); \ - fprintf(stdout, "" fmt, __VA_ARGS__); \ + fprintf(stdout, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_STDOUT); \ } while (0) #define infoPrintNoTimestampToFile(fmt, ...) \ do { \ lockLog(LOG_RESULT); \ - fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \ + fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_RESULT); \ } while (0) @@ -126,7 +128,7 @@ void exitLog(); ptm->tm_mon + 1, \ ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \ (int32_t)timeSecs.tv_usec); \ - fprintf(stdout, "INFO: " fmt, __VA_ARGS__); \ + fprintf(stdout, "INFO: " fmt, ##__VA_ARGS__); \ unlockLog(LOG_STDOUT); \ } while (0) @@ -142,7 +144,7 @@ void exitLog(); fprintf(g_arguments->fpOfInsertResult,"[%02d/%02d %02d:%02d:%02d.%06d] ", ptm->tm_mon + 1, \ ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \ (int32_t)timeSecs.tv_usec); \ - fprintf(g_arguments->fpOfInsertResult, "INFO: " fmt, __VA_ARGS__);\ + fprintf(g_arguments->fpOfInsertResult, "INFO: " fmt, ##__VA_ARGS__);\ unlockLog(LOG_RESULT); \ } while (0) @@ -160,7 +162,7 @@ void exitLog(); ptm->tm_mon + 1, \ ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \ (int32_t)timeSecs.tv_usec); \ - fprintf(stderr, "PERF: " fmt, __VA_ARGS__); \ + fprintf(stderr, "PERF: " fmt, ##__VA_ARGS__); \ unlockLog(LOG_STDERR); \ if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \ lockLog(LOG_RESULT); \ @@ -172,7 +174,7 @@ void exitLog(); (int32_t)timeSecs.tv_usec); \ fprintf(g_arguments->fpOfInsertResult, "PERF: "); \ fprintf(g_arguments->fpOfInsertResult, \ - "" fmt, __VA_ARGS__); \ + "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_RESULT); \ } \ } \ @@ -196,7 +198,7 @@ void exitLog(); if (g_arguments->debug_print) { \ fprintf(stderr, "%s(%d) ", __FILE__, __LINE__); \ } \ - fprintf(stderr, "" fmt, __VA_ARGS__); \ + fprintf(stderr, "" fmt, ##__VA_ARGS__); \ fprintf(stderr, "\033[0m"); \ unlockLog(LOG_STDERR); \ if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \ @@ -206,7 +208,7 @@ void exitLog(); ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \ (int32_t)timeSecs.tv_usec); \ fprintf(g_arguments->fpOfInsertResult, "ERROR: "); \ - fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \ + fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_RESULT); \ } \ } while (0) @@ -229,7 +231,7 @@ void exitLog(); if (g_arguments->debug_print) { \ fprintf(stderr, "%s(%d) ", __FILE__, __LINE__); \ } \ - fprintf(stderr, "" fmt, __VA_ARGS__); \ + fprintf(stderr, "" fmt, ##__VA_ARGS__); \ fprintf(stderr, "\033[0m"); \ unlockLog(LOG_STDERR); \ if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \ @@ -239,7 +241,7 @@ void exitLog(); ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \ (int32_t)timeSecs.tv_usec); \ fprintf(g_arguments->fpOfInsertResult, "WARN: "); \ - fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \ + fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_RESULT); \ } \ } while (0) @@ -262,7 +264,7 @@ void exitLog(); if (g_arguments->debug_print) { \ fprintf(stderr, "%s(%d) ", __FILE__, __LINE__); \ } \ - fprintf(stderr, "" fmt, __VA_ARGS__); \ + fprintf(stderr, "" fmt, ##__VA_ARGS__); \ fprintf(stderr, "\033[0m"); \ unlockLog(LOG_STDERR); \ if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \ @@ -272,7 +274,7 @@ void exitLog(); ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \ (int32_t)timeSecs.tv_usec); \ fprintf(g_arguments->fpOfInsertResult, "SUCC: "); \ - fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \ + fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \ unlockLog(LOG_RESULT); \ } \ } while (0) diff --git a/tools/taos-tools/src/benchCsv.c b/tools/taos-tools/src/benchCsv.c index 246ff79287..c491e94606 100644 --- a/tools/taos-tools/src/benchCsv.c +++ b/tools/taos-tools/src/benchCsv.c @@ -14,6 +14,7 @@ #include #include #include +#include #include "benchLog.h" #include "benchData.h" @@ -77,25 +78,25 @@ static CsvNamingType csvGetFileNamingType(SSuperTable* stb) { } -static void csvCalcTimestampStep(CsvWriteMeta* meta) { +static void csvCalcTimestampStep(CsvWriteMeta* write_meta) { time_t ts_step = 0; - if (meta->db->precision == TSDB_TIME_PRECISION_MICRO) { + if (write_meta->db->precision == TSDB_TIME_PRECISION_MICRO) { ts_step = g_arguments->csv_ts_intv_secs * 1000000L; - } else if (db->precision == TSDB_TIME_PRECISION_NANO) { + } else if (write_meta->db->precision == TSDB_TIME_PRECISION_NANO) { ts_step = g_arguments->csv_ts_intv_secs * 1000000000L; } else { ts_step = g_arguments->csv_ts_intv_secs * 1000L; } - meta->ts_step = ts_step; + write_meta->ts_step = ts_step; return; } -static void csvCalcCtbRange(CsvThreadMeta* meta, size_t total_threads, int64_t ctb_offset, int64_t ctb_count) { +static void csvCalcCtbRange(CsvThreadMeta* thread_meta, size_t total_threads, int64_t ctb_offset, int64_t ctb_count) { uint64_t ctb_start_idx = 0; uint64_t ctb_end_idx = 0; - size_t tid_idx = meta->thread_id - 1; + size_t tid_idx = thread_meta->thread_id - 1; size_t base = ctb_count / total_threads; size_t remainder = ctb_count % total_threads; @@ -111,35 +112,34 @@ static void csvCalcCtbRange(CsvThreadMeta* meta, size_t total_threads, int64_t c ctb_end_idx = ctb_offset + ctb_count; } - meta->ctb_start_idx = ctb_start_idx; - meta->ctb_end_idx = ctb_end_idx; - meta->ctb_count = ctb_count; + thread_meta->ctb_start_idx = ctb_start_idx; + thread_meta->ctb_end_idx = ctb_end_idx; + thread_meta->ctb_count = ctb_count; return; } -static void csvGenThreadFormatter(CsvWriteMeta* meta) { +static void csvGenThreadFormatter(CsvWriteMeta* write_meta) { int digits = 0; - if (meta->total_threads == 0) { + if (write_meta->total_threads == 0) { digits = 1; } else { - for (int n = meta->total_threads; n > 0; n /= 10) { + for (int n = write_meta->total_threads; n > 0; n /= 10) { digits++; } } if (digits <= 1) { - (void)snprintf(meta->thread_formatter, sizeof(meta->thread_formatter), "%%d"); + (void)snprintf(write_meta->thread_formatter, sizeof(write_meta->thread_formatter), "%%d"); } else { - (void)snprintf(meta->thread_formatter, sizeof(meta->thread_formatter), "%%0%dd", digits); + (void)snprintf(write_meta->thread_formatter, sizeof(write_meta->thread_formatter), "%%0%dd", digits); } return; } static int csvGenCsvHeader(CsvWriteMeta* write_meta) { - SDataBase* db = write_meta->db; SSuperTable* stb = write_meta->stb; char* buf = write_meta->csv_header; int pos = 0; @@ -190,7 +190,7 @@ int csvGenCreateDbSql(SDataBase* db, char* buf, int size) { } if (db->cfgs) { - for (size i = 0; i < db->cfgs->size; ++i) { + for (size_t i = 0; i < db->cfgs->size; ++i) { SDbCfg* cfg = benchArrayGet(db->cfgs, i); if (cfg->valuestring) { pos += snprintf(buf + pos, size - pos, " %s %s", cfg->name, cfg->valuestring); @@ -224,7 +224,7 @@ static int csvExportCreateDbSql(CsvWriteMeta* write_meta, FILE* fp) { length = csvGenCreateDbSql(write_meta->db, buf, sizeof(buf)); if (length < 0) { - errorPrint("Failed to generate create db sql, maybe buffer[%d] not enough.\n", sizeof(buf)); + errorPrint("Failed to generate create db sql, maybe buffer[%zu] not enough.\n", sizeof(buf)); return -1; } @@ -256,7 +256,7 @@ int csvGenCreateStbSql(SDataBase* db, SSuperTable* stb, char* buf, int size) { // columns - for (sizt_t i = 0; i < stb->cols->size; ++i) { + for (size_t i = 0; i < stb->cols->size; ++i) { Field* col = benchArrayGet(stb->cols, i); if (col->type == TSDB_DATA_TYPE_BINARY @@ -265,7 +265,7 @@ int csvGenCreateStbSql(SDataBase* db, SSuperTable* stb, char* buf, int size) { || col->type == TSDB_DATA_TYPE_GEOMETRY) { if (col->type == TSDB_DATA_TYPE_GEOMETRY && col->length < 21) { - errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__, i); + errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %zu.\n", __func__, __LINE__, i); return -1; } @@ -301,7 +301,7 @@ int csvGenCreateStbSql(SDataBase* db, SSuperTable* stb, char* buf, int size) { // tags - for (sizt_t i = 0; i < stb->tags->size; ++i) { + for (size_t i = 0; i < stb->tags->size; ++i) { Field* tag = benchArrayGet(stb->tags, i); if (i > 0) { @@ -315,7 +315,7 @@ int csvGenCreateStbSql(SDataBase* db, SSuperTable* stb, char* buf, int size) { || tag->type == TSDB_DATA_TYPE_GEOMETRY) { if (tag->type == TSDB_DATA_TYPE_GEOMETRY && tag->length < 21) { - errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__, i); + errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %zu.\n", __func__, __LINE__, i); return -1; } @@ -397,7 +397,7 @@ static int csvExportCreateStbSql(CsvWriteMeta* write_meta, FILE* fp) { length = csvGenCreateStbSql(write_meta->db, write_meta->stb, buf, sizeof(buf)); if (length < 0) { - errorPrint("Failed to generate create stb sql, maybe buffer[%d] not enough.\n", sizeof(buf)); + errorPrint("Failed to generate create stb sql, maybe buffer[%zu] not enough.\n", sizeof(buf)); return -1; } @@ -417,7 +417,6 @@ static int csvExportCreateStbSql(CsvWriteMeta* write_meta, FILE* fp) { static int csvExportCreateSql(CsvWriteMeta* write_meta) { char fullname[MAX_PATH_LEN] = {}; - char buf[LARGE_BUFF_LEN] = {}; int ret = 0; int length = 0; FILE* fp = NULL; @@ -428,7 +427,7 @@ static int csvExportCreateSql(CsvWriteMeta* write_meta) { return -1; } - FILE* fp = fopen(fullname, "w"); + fp = fopen(fullname, "w"); if (!fp) { return -1; } @@ -475,7 +474,7 @@ static int csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write return -1; } - switch (meta.naming_type) { + switch (write_meta->naming_type) { case CSV_NAMING_I_SINGLE: { break; } @@ -484,18 +483,18 @@ static int csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write break; } case CSV_NAMING_B_THREAD: { - meta.total_threads = g_arguments->nthreads; + write_meta->total_threads = g_arguments->nthreads; csvGenThreadFormatter(write_meta); break; } case CSV_NAMING_B_THREAD_TIME_SLICE: { - meta.total_threads = g_arguments->nthreads; + write_meta->total_threads = g_arguments->nthreads; csvGenThreadFormatter(write_meta); csvCalcTimestampStep(write_meta); break; } default: { - meta.naming_type = CSV_NAMING_I_SINGLE; + write_meta->naming_type = CSV_NAMING_I_SINGLE; break; } } @@ -516,10 +515,10 @@ static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvT thread_meta->thread_id = thread_id; thread_meta->output_header = false; thread_meta->tags_buf_size = 0; - thread_meta->tags_buf_bucket = NULL; + thread_meta->tags_buf_array = NULL; thread_meta->cols_buf = NULL; - csvCalcCtbRange(write_meta, write_meta->total_threads, stb->childTblFrom, stb->childTblCount); + csvCalcCtbRange(thread_meta, write_meta->total_threads, stb->childTblFrom, stb->childTblCount); switch (write_meta->naming_type) { case CSV_NAMING_I_SINGLE: @@ -533,7 +532,6 @@ static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvT break; } default: { - thread_meta->naming_type = CSV_NAMING_I_SINGLE; break; } } @@ -553,7 +551,7 @@ static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_ case CSV_NAMING_I_TIME_SLICE: case CSV_NAMING_B_THREAD_TIME_SLICE: { thread_meta->start_secs = csvGetStartSeconds(db->precision, last_end_ts); - thread_meta->end_secs = thread_meta.start_secs + g_arguments->csv_ts_intv_secs; + thread_meta->end_secs = thread_meta->start_secs + g_arguments->csv_ts_intv_secs; break; } default: { @@ -569,7 +567,7 @@ static const char* csvGetGzipFilePrefix() { if (g_arguments->csv_compress_level == CSV_COMPRESS_NONE) { return ""; } else { - return ".gz" + return ".gz"; } } @@ -583,26 +581,26 @@ static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_me const char* file_prefix = g_arguments->csv_file_prefix; const char* gzip_suffix = csvGetGzipFilePrefix(); - switch (meta->naming_type) { + switch (write_meta->naming_type) { case CSV_NAMING_I_SINGLE: { - ret = snprintf(fullname, size, "%s%s.csv%s", base_path, file_prefix, g_arguments->csv_compress_level, gzip_suffix); + ret = snprintf(fullname, size, "%s%s.csv%s", base_path, file_prefix, gzip_suffix); break; } case CSV_NAMING_I_TIME_SLICE: { - csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf)); - csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf)); + csvConvertTime2String(thread_meta->start_secs, start_time_buf, sizeof(start_time_buf)); + csvConvertTime2String(thread_meta->end_secs, end_time_buf, sizeof(end_time_buf)); ret = snprintf(fullname, size, "%s%s_%s_%s.csv%s", base_path, file_prefix, start_time_buf, end_time_buf, gzip_suffix); break; } case CSV_NAMING_B_THREAD: { - (void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id); + (void)snprintf(thread_buf, sizeof(thread_buf), write_meta->thread_formatter, thread_meta->thread_id); ret = snprintf(fullname, size, "%s%s_%s.csv%s", base_path, file_prefix, thread_buf, gzip_suffix); break; } case CSV_NAMING_B_THREAD_TIME_SLICE: { - (void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id); - csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf)); - csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf)); + (void)snprintf(thread_buf, sizeof(thread_buf), write_meta->thread_formatter, thread_meta->thread_id); + csvConvertTime2String(thread_meta->start_secs, start_time_buf, sizeof(start_time_buf)); + csvConvertTime2String(thread_meta->end_secs, end_time_buf, sizeof(end_time_buf)); ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv%s", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf, gzip_suffix); break; } @@ -699,6 +697,24 @@ static int csvGenRowColData(char* buf, int size, SSuperTable* stb, int64_t ts, i } +static void csvFreeCtbTagData(CsvThreadMeta* thread_meta, CsvRowTagsBuf* tags_buf_array) { + if (!thread_meta || !tags_buf_array) { + return; + } + + for (uint64_t i = 0 ; i < thread_meta->ctb_count; ++i) { + char* tags_buf = tags_buf_array[i].buf; + if (tags_buf) { + tmfree(tags_buf_array); + } else { + break; + } + } + tmfree(tags_buf_array); + return; +} + + static CsvRowTagsBuf* csvGenCtbTagData(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) { SSuperTable* stb = write_meta->stb; int ret = 0; @@ -708,8 +724,8 @@ static CsvRowTagsBuf* csvGenCtbTagData(CsvWriteMeta* write_meta, CsvThreadMeta* return NULL; } - CsvRowTagsBuf* tags_buf_bucket = (CsvRowTagsBuf*)benchCalloc(thread_meta->ctb_count, sizeof(CsvRowTagsBuf), true); - if (!tags_buf_bucket) { + CsvRowTagsBuf* tags_buf_array = (CsvRowTagsBuf*)benchCalloc(thread_meta->ctb_count, sizeof(CsvRowTagsBuf), true); + if (!tags_buf_array) { return NULL; } @@ -721,43 +737,25 @@ static CsvRowTagsBuf* csvGenCtbTagData(CsvWriteMeta* write_meta, CsvThreadMeta* goto error; } - tags_buf_bucket[i].buf = tags_buf; - write_meta->tags_buf_size = tags_buf_size; + tags_buf_array[i].buf = tags_buf; + thread_meta->tags_buf_size = tags_buf_size; ret = csvGenRowTagData(tags_buf, tags_buf_size, stb, thread_meta->ctb_start_idx + i, &tk); if (ret <= 0) { goto error; } - tags_buf_bucket[i].length = ret; + tags_buf_array[i].length = ret; } - return tags_buf_bucket; + return tags_buf_array; error: - csvFreeCtbTagData(thread_meta, tags_buf_bucket); + csvFreeCtbTagData(thread_meta, tags_buf_array); return NULL; } -static void csvFreeCtbTagData(CsvThreadMeta* thread_meta, CsvRowTagsBuf* tags_buf_bucket) { - if (!thread_meta || !tags_buf_bucket) { - return; - } - - for (uint64_t i = 0 ; i < thread_meta->ctb_count; ++i) { - char* tags_buf = tags_buf_bucket[i].buf; - if (tags_buf) { - tmfree(tags_buf_bucket); - } else { - break; - } - } - tmfree(tags_buf_bucket); - return; -} - - static CsvFileHandle* csvOpen(const char* filename, CsvCompressionLevel compress_level) { CsvFileHandle* fhdl = NULL; bool failed = false; @@ -770,13 +768,13 @@ static CsvFileHandle* csvOpen(const char* filename, CsvCompressionLevel compress } if (compress_level == CSV_COMPRESS_NONE) { - fhdl.handle.fp = fopen(filename, "w"); - failed = (!fhdl.handle.fp); + fhdl->handle.fp = fopen(filename, "w"); + failed = (!fhdl->handle.fp); } else { char mode[TINY_BUFF_LEN]; (void)snprintf(mode, sizeof(mode), "wb%d", compress_level); - fhdl.handle.gf = gzopen(filename, mode); - failed = (!fhdl.handle.gf); + fhdl->handle.gf = gzopen(filename, mode); + failed = (!fhdl->handle.gf); } if (failed) { @@ -806,9 +804,9 @@ static CsvIoError csvWrite(CsvFileHandle* fhdl, const char* buf, size_t size) { return CSV_ERR_WRITE_FAILED; } } else { - unsigned int ret = gzwrite(fhdl->handle.gf, buf, size); + int ret = gzwrite(fhdl->handle.gf, buf, size); if (ret != size) { - errorPrint("Failed to write csv file: %s. expected written %zu but %zu.\n", + errorPrint("Failed to write csv file: %s. expected written %zu but %d.\n", fhdl->filename, size, ret); int errnum; const char* errmsg = gzerror(fhdl->handle.gf, &errnum); @@ -839,16 +837,16 @@ static void csvClose(CsvFileHandle* fhdl) { static int csvWriteFile(CsvFileHandle* fhdl, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck, CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) { SDataBase* db = write_meta->db; SSuperTable* stb = write_meta->stb; - CsvRowTagsBuf* tags_buf_bucket = thread_meta->tags_buf_bucket; - CsvRowColsBuf* tags_buf = &tags_buf_bucket[ctb_idx]; + CsvRowTagsBuf* tags_buf_array = thread_meta->tags_buf_array; + CsvRowTagsBuf* tags_buf = &tags_buf_array[ctb_idx]; CsvRowColsBuf* cols_buf = thread_meta->cols_buf; int ret = 0; ret = csvGenRowColData(cols_buf->buf, cols_buf->buf_size, stb, cur_ts, db->precision, ck); if (ret <= 0) { - errorPrint("Failed to generate csv column data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", - db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx); + errorPrint("Failed to generate csv column data. database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx); return -1; } @@ -858,7 +856,7 @@ static int csvWriteFile(CsvFileHandle* fhdl, uint64_t ctb_idx, int64_t cur_ts, i if (thread_meta->output_header) { ret = csvWrite(fhdl, write_meta->csv_header, write_meta->csv_header_length); if (ret != CSV_ERR_OK) { - errorPrint("Failed to write csv header data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", + errorPrint("Failed to write csv header data. database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n", db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx); return -1; } @@ -869,7 +867,7 @@ static int csvWriteFile(CsvFileHandle* fhdl, uint64_t ctb_idx, int64_t cur_ts, i // write columns ret = csvWrite(fhdl, cols_buf->buf, cols_buf->length); if (ret != CSV_ERR_OK) { - errorPrint("Failed to write csv column data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", + errorPrint("Failed to write csv column data. database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n", db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx); return -1; } @@ -877,7 +875,7 @@ static int csvWriteFile(CsvFileHandle* fhdl, uint64_t ctb_idx, int64_t cur_ts, i // write tags ret = csvWrite(fhdl, tags_buf->buf, tags_buf->length); if (ret != CSV_ERR_OK) { - errorPrint("Failed to write csv tag data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", + errorPrint("Failed to write csv tag data. database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n", db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx); return -1; } @@ -885,7 +883,7 @@ static int csvWriteFile(CsvFileHandle* fhdl, uint64_t ctb_idx, int64_t cur_ts, i // write line break ret = csvWrite(fhdl, "\n", 1); if (ret != CSV_ERR_OK) { - errorPrint("Failed to write csv line break data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", + errorPrint("Failed to write csv line break data. database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n", db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx); return -1; } @@ -922,10 +920,10 @@ static void* csvGenStbThread(void* arg) { // tags buffer - CsvRowTagsBuf* tags_buf_bucket = csvGenCtbTagData(write_meta, thread_meta); - if (!tags_buf_bucket) { - errorPrint("Failed to generate csv tag data. database: %s, super table: %s, naming type: %d, thread index: %d.\n", - db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id); + CsvRowTagsBuf* tags_buf_array = csvGenCtbTagData(write_meta, thread_meta); + if (!tags_buf_array) { + errorPrint("Failed to generate csv tag data. database: %s, super table: %s, naming type: %d, thread index: %zu.\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id); return NULL; } @@ -933,8 +931,8 @@ static void* csvGenStbThread(void* arg) { int buf_size = stb->lenOfCols + stb->cols->size; char* buf = (char*)benchCalloc(1, buf_size, true); if (!buf) { - errorPrint("Failed to malloc csv column buffer. database: %s, super table: %s, naming type: %d, thread index: %d.\n", - db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id); + errorPrint("Failed to malloc csv column buffer. database: %s, super table: %s, naming type: %d, thread index: %zu.\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id); goto end; } @@ -944,24 +942,23 @@ static void* csvGenStbThread(void* arg) { .length = 0 }; - thread_meta->tags_buf_bucket = tags_buf_bucket; + thread_meta->tags_buf_array = tags_buf_array; thread_meta->cols_buf = &cols_buf; start_print_ts = toolsGetTimestampMs(); for (cur_ts = write_meta->start_ts; cur_ts < write_meta->end_ts; cur_ts += write_meta->ts_step) { // get filename - fullname[MAX_PATH_LEN] = {}; ret = csvGetFileFullname(write_meta, thread_meta, fullname, sizeof(fullname)); if (ret < 0) { - errorPrint("Failed to generate csv filename. database: %s, super table: %s, naming type: %d, thread index: %d.\n", - db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id); + errorPrint("Failed to generate csv filename. database: %s, super table: %s, naming type: %d, thread index: %zu.\n", + db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id); goto end; } // create fd fhdl = csvOpen(fullname, g_arguments->csv_compress_level); if (fhdl == NULL) { - errorPrint("Failed to create csv file. thread index: %d, file: %s, errno: %d, strerror: %s.\n", + errorPrint("Failed to create csv file. thread index: %zu, file: %s, errno: %d, strerror: %s.\n", thread_meta->thread_id, fullname, errno, strerror(errno)); goto end; } @@ -972,7 +969,7 @@ static void* csvGenStbThread(void* arg) { slice_end_ts = MIN(cur_ts + write_meta->ts_step, write_meta->end_ts); file_rows = 0; - infoPrint("thread[%d] begin to write csv file: %s.\n", thread_meta->thread_id, fullname); + infoPrint("thread[%zu] begin to write csv file: %s.\n", thread_meta->thread_id, fullname); // write data while (slice_cur_ts < slice_end_ts) { @@ -982,7 +979,7 @@ static void* csvGenStbThread(void* arg) { for (slice_ctb_cur_ts = slice_cur_ts; slice_ctb_cur_ts < slice_batch_ts; slice_ctb_cur_ts += write_meta->stb->timestamp_step) { ret = csvWriteFile(fhdl, ctb_idx, slice_ctb_cur_ts, &ck, write_meta, thread_meta); if (!ret) { - errorPrint("Failed to write csv file. thread index: %d, file: %s, errno: %d, strerror: %s.\n", + errorPrint("Failed to write csv file. thread index: %zu, file: %s, errno: %d, strerror: %s.\n", thread_meta->thread_id, fullname, errno, strerror(errno)); csvClose(fhdl); goto end; @@ -995,7 +992,7 @@ static void* csvGenStbThread(void* arg) { cur_print_ts = toolsGetTimestampMs(); print_ts_elapse = cur_print_ts - pre_print_ts; if (print_ts_elapse > 30000) { - infoPrint("thread[%d] has currently inserted rows: %" PRIu64 ", period insert rate: %.2f rows/s.\n", + infoPrint("thread[%zu] has currently inserted rows: %" PRIu64 ", period insert rate: %.2f rows/s.\n", thread_meta->thread_id, total_rows, (total_rows - pre_total_rows) * 1000.0 / print_ts_elapse); pre_print_ts = cur_print_ts; @@ -1014,18 +1011,18 @@ static void* csvGenStbThread(void* arg) { } csvClose(fhdl); - csvUpdateSliceRange(write_meta, thread_meta, last_end_ts); + csvUpdateSliceRange(write_meta, thread_meta, slice_end_ts); } cur_print_ts = toolsGetTimestampMs(); print_ts_elapse = cur_print_ts - start_print_ts; - succPrint("thread [%d] has completed inserting rows: %" PRIu64 ", insert rate %.2f rows/s.\n", + succPrint("thread [%zu] has completed inserting rows: %" PRIu64 ", insert rate %.2f rows/s.\n", thread_meta->thread_id, total_rows, total_rows * 1000.0 / print_ts_elapse); end: thread_meta->total_rows = total_rows; - csvFreeCtbTagData(tags_buf_bucket); + csvFreeCtbTagData(thread_meta, tags_buf_array); tmfree(buf); return NULL; } @@ -1038,8 +1035,12 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) { int64_t start_ts = 0; int64_t ts_elapse = 0; + CsvWriteMeta* write_meta = NULL; + CsvThreadArgs* args = NULL; + pthread_t* pids = NULL; - CsvWriteMeta* write_meta = benchCalloc(1, sizeof(CsvWriteMeta), false); + + write_meta = benchCalloc(1, sizeof(CsvWriteMeta), false); if (!write_meta) { ret = -1; goto end; @@ -1051,13 +1052,13 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) { goto end; } - CsvThreadArgs* args = benchCalloc(write_meta->total_threads, sizeof(CsvThreadArgs), false); + args = benchCalloc(write_meta->total_threads, sizeof(CsvThreadArgs), false); if (!args) { ret = -1; goto end; } - pthread_t* pids = benchCalloc(write_meta.total_threads, sizeof(pthread_t), false); + pids = benchCalloc(write_meta->total_threads, sizeof(pthread_t), false); if (!pids) { ret = -1; goto end; @@ -1083,7 +1084,7 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) { prompt = false; } - infoPrint("pthread_join %d ...\n", i); + infoPrint("pthread_join %u ...\n", i); pthread_join(pids[i], NULL); } @@ -1097,7 +1098,7 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) { ts_elapse = toolsGetTimestampMs() - start_ts; if (ts_elapse > 0) { - succPrint("Spent %.6f seconds to insert rows: %" PRIu64 " with %d thread(s) into %s, at a rate of %.2f rows/s.\n", + succPrint("Spent %.6f seconds to insert rows: %" PRIu64 " with %zu thread(s) into %s, at a rate of %.2f rows/s.\n", ts_elapse / 1000.0, total_rows, write_meta->total_threads, g_arguments->output_path, total_rows * 1000.0 / ts_elapse); } @@ -1197,15 +1198,14 @@ static int csvParseParameter() { // csv_output_path size_t len = strlen(g_arguments->output_path); if (len == 0) { - errorPrint("Failed to generate csv files, the specified output path is empty. Please provide a valid path. database: %s, super table: %s.\n", - db->dbName, stb->stbName); + errorPrint("Failed to generate csv files, the specified output path is empty. Please provide a valid path.\n"); return -1; } if (g_arguments->output_path[len - 1] != '/') { int n = snprintf(g_arguments->output_path_buf, sizeof(g_arguments->output_path_buf), "%s/", g_arguments->output_path); if (n < 0 || n >= sizeof(g_arguments->output_path_buf)) { - errorPrint("Failed to generate csv files, path buffer overflow risk when appending '/'. path: %s, database: %s, super table: %s.\n", - g_arguments->csv_output_path, db->dbName, stb->stbName); + errorPrint("Failed to generate csv files, path buffer overflow risk when appending '/'. path: %s.\n", + g_arguments->output_path); return -1; } g_arguments->output_path = g_arguments->output_path_buf; @@ -1214,8 +1214,8 @@ static int csvParseParameter() { // csv_ts_format if (g_arguments->csv_ts_format) { if (csvValidateParamTsFormat(g_arguments->csv_ts_format) != 0) { - errorPrint("Failed to generate csv files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s, database: %s, super table: %s.\n", - g_arguments->csv_ts_format, db->dbName, stb->stbName); + errorPrint("Failed to generate csv files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s.\n", + g_arguments->csv_ts_format); return -1; } } @@ -1223,8 +1223,8 @@ static int csvParseParameter() { // csv_ts_interval long csv_ts_intv_secs = csvValidateParamTsInterval(g_arguments->csv_ts_interval); if (csv_ts_intv_secs <= 0) { - errorPrint("Failed to generate csv files, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s, database: %s, super table: %s.\n", - g_arguments->csv_ts_interval, db->dbName, stb->stbName); + errorPrint("Failed to generate csv files, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s.\n", + g_arguments->csv_ts_interval); return -1; } g_arguments->csv_ts_intv_secs = csv_ts_intv_secs; @@ -1237,7 +1237,7 @@ static int csvWriteThread() { for (size_t i = 0; i < g_arguments->databases->size && !g_arguments->terminate; ++i) { // database SDataBase* db = benchArrayGet(g_arguments->databases, i); - if (database->superTbls) { + if (db->superTbls) { for (size_t j = 0; j < db->superTbls->size && !g_arguments->terminate; ++j) { // stb SSuperTable* stb = benchArrayGet(db->superTbls, j); diff --git a/tools/taos-tools/src/benchJsonOpt.c b/tools/taos-tools/src/benchJsonOpt.c index 21393b8d29..967b465dff 100644 --- a/tools/taos-tools/src/benchJsonOpt.c +++ b/tools/taos-tools/src/benchJsonOpt.c @@ -14,7 +14,6 @@ #include #include #include "benchLog.h" -#include "benchCsv.h" extern char g_configDir[MAX_PATH_LEN];