diff --git a/tools/taos-tools/inc/bench.h b/tools/taos-tools/inc/bench.h index caabd39d3b..e8c94016f8 100644 --- a/tools/taos-tools/inc/bench.h +++ b/tools/taos-tools/inc/bench.h @@ -781,8 +781,8 @@ typedef struct SArguments_S { bool escape_character; bool pre_load_tb_meta; - char* csv_output_path; - char csv_output_path_buf[MAX_PATH_LEN]; + char* output_path; + char output_path_buf[MAX_PATH_LEN]; char* csv_file_prefix; char* csv_ts_format; char* csv_ts_interval; diff --git a/tools/taos-tools/inc/benchCsv.h b/tools/taos-tools/inc/benchCsv.h index 25d0c55eba..a65d5d1c9c 100644 --- a/tools/taos-tools/inc/benchCsv.h +++ b/tools/taos-tools/inc/benchCsv.h @@ -18,19 +18,26 @@ #include + +typedef enum { + CSV_NAMING_SINGLE, + CSV_NAMING_TIME_SLICE, + CSV_NAMING_THREAD, + CSV_NAMING_THREAD_TIME_SLICE +} CsvNamingType; + +typedef struct { + CsvNamingType naming_type; + time_t start_secs; + time_t end_secs; + time_t end_ts; + size_t thread_id; + size_t total_threads; + char thread_formatter[TINY_BUFF_LEN]; +} CsvWriteMeta; + + + int csvTestProcess(); -int genWithSTable(SDataBase* db, SSuperTable* stb, char* outDir); - -char * genTagData(char* buf, SSuperTable* stb, int64_t i, int64_t *k); - -char * genColumnData(char* colData, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k); - -int32_t genRowByField(char* buf, BArray* fields, int16_t fieldCnt, char* binanryPrefix, char* ncharPrefix, int64_t *k); - -void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir); - -int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain); -int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain); - #endif // INC_BENCHCSV_H_ diff --git a/tools/taos-tools/src/benchCsv.c b/tools/taos-tools/src/benchCsv.c index 6f88d2864d..c7d455c66a 100644 --- a/tools/taos-tools/src/benchCsv.c +++ b/tools/taos-tools/src/benchCsv.c @@ -15,11 +15,10 @@ #include #include -#include #include "benchLog.h" -#include -#include - +#include "benchData.h" +#include "benchDataMix.h" +#include "benchCsv.h" // // main etry @@ -29,9 +28,7 @@ -void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir) { - sprintf(outFile, "%s%s-%s.csv", outDir, db->dbName, stb->stbName); -} + int32_t writeCsvFile(FILE* f, char * buf, int32_t len) { size_t size = fwrite(buf, 1, len, f); @@ -42,29 +39,36 @@ int32_t writeCsvFile(FILE* f, char * buf, int32_t len) { return 0; } -int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain) { +int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int rows_buf_len, int minRemain) { int ret = 0; int pos = 0; int64_t tk = 0; int64_t show = 0; - int tagDataLen = stb->lenOfTags + stb->tags->size + 256; - char * tagData = (char *) benchCalloc(1, tagDataLen, true); - int colDataLen = stb->lenOfCols + stb->cols->size + 256; - char * colData = (char *) benchCalloc(1, colDataLen, true); + + uint32_t tags_length = accumulateRowLen(stbInfo->tags, stbInfo->iface); + uint32_t cols_length = accumulateRowLen(stbInfo->cols, stbInfo->iface); + + size_t tags_csv_length = tags_length + stb->tags->size; + size_t cols_csv_length = cols_length + stb->cols->size; + char* tags_csv_buf = (char*)benchCalloc(1, tags_csv_length, true); + char* cols_csv_buf = (char*)benchCalloc(1, cols_csv_length, true); // gen child name - for (int64_t i = 0; i < stb->childTblCount; i++) { + for (int64_t i = 0; i < stb->childTblCount; ++i) { int64_t ts = stb->startTimestamp; int64_t ck = 0; + + // child table + // tags - genTagData(tagData, stb, i, &tk); + csvGenRowTagData(tags_csv_buf, stb, i, &tk); // insert child column data for(int64_t j = 0; j < stb->insertRows; j++) { - genColumnData(colData, stb, ts, db->precision, &ck); + genColumnData(cols_csv_buf, stb, ts, db->precision, &ck); // combine - pos += sprintf(buf + pos, "%s,%s.\n", tagData, colData); - if (bufLen - pos < minRemain) { + pos += sprintf(buf + pos, "%s,%s.\n", tags_csv_buf, cols_csv_buf); + if (rows_buf_len - pos < minRemain) { // submit ret = writeCsvFile(fs, buf, pos); if (ret != 0) { @@ -99,48 +103,277 @@ int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufL END: // free - tmfree(tagData); - tmfree(colData); + tmfree(tags_csv_buf); + tmfree(cols_csv_buf); return ret; } -int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain) { + +static time_t csvGetStartSeconds(SDataBase* db, SSuperTable* stb) { + time_t start_seconds = 0; + + if (db->precision == TSDB_TIME_PRECISION_MICRO) { + start_seconds = stb->startTimestamp / 1000000L; + } else if (db->precision == TSDB_TIME_PRECISION_NANO) { + start_seconds = stb->startTimestamp / 1000000000L; + } else { + start_seconds = stb->startTimestamp / 1000L; + } + return start_seconds; +} + + +void csvConvertTime2String(time_t time_value, char* time_buf, size_t buf_size) { + struct tm tm_result; + char *old_locale = setlocale(LC_TIME, "C"); +#ifdef _WIN32 + gmtime_s(&tm_result, &time_value); +#else + gmtime_r(&time_value, &tm_result); +#endif + strftime(time_buf, buf_size, g_arguments->csv_ts_format, &tm_result); + if (old_locale) { + (LC_TIME, old_locale); + } +} + + +static CsvNamingType csvGetFileNamingType(SSuperTable* stb) { + if (stb->interlaceRows > 0) { + if (g_arguments->csv_ts_format) { + return CSV_NAMING_TIME_SLICE; + } else { + return CSV_NAMING_SINGLE; + } + } else { + if (g_arguments->csv_ts_format) { + return CSV_NAMING_THREAD_TIME_SLICE; + } else { + return CSV_NAMING_THREAD; + } + } +} + + +static void csvGenEndTimestamp(CsvWriteMeta* meta, SDataBase* db) { + time_t end_ts = 0; + + if (db->precision == TSDB_TIME_PRECISION_MICRO) { + end_ts = meta->end_secs * 1000000L; + } else if (db->precision == TSDB_TIME_PRECISION_NANO) { + end_ts = meta->end_secs * 1000000000L; + } else { + end_ts = meta->end_secs * 1000L; + } + meta->end_ts = end_ts; + return; +} + + +static void csvGenThreadFormatter(CsvWriteMeta* meta) { + int digits = 0; + if (meta->total_threads == 0) { + digits = 1; + } else { + for (int n = meta->total_threads; n > 0; n /= 10) { + digits++; + } + } + + if (digits <= 1) { + (void)sprintf(meta->thread_formatter, "%%d"); + } else { + (void)snprintf(meta->thread_formatter, sizeof(meta->thread_formatter), "%%0%dd", digits); + } +} + + +static CsvWriteMeta csvInitFileNamingMeta(SDataBase* db, SSuperTable* stb) { + CsvWriteMeta meta = { + .naming_type = CSV_NAMING_SINGLE, + .start_secs = 0, + .end_secs = 0, + .thread_id = 0, + .total_threads = 1, + .thread_formatter = {} + }; + + meta.naming_type = csvGetFileNamingType(stb); + + switch (meta.naming_type) { + case CSV_NAMING_SINGLE: { + break; + } + case CSV_NAMING_TIME_SLICE: { + meta.start_secs = csvGetStartSeconds(db, stb); + meta.end_secs = meta.start_secs + g_arguments->csv_ts_intv_secs; + csvGenEndTimestamp(&meta, db); + break; + } + case CSV_NAMING_THREAD: { + meta.thread_id = 1; + meta.total_threads = g_arguments->nthreads; + csvGenThreadFormatter(&meta); + break; + } + case CSV_NAMING_THREAD_TIME_SLICE: { + meta.thread_id = 1; + meta.total_threads = g_arguments->nthreads; + csvGenThreadFormatter(&meta); + meta.start_secs = csvGetStartSeconds(db, stb); + meta.end_secs = meta.start_secs + g_arguments->csv_ts_intv_secs; + csvGenEndTimestamp(&meta, db); + break; + } + default: { + meta.naming_type = CSV_NAMING_SINGLE; + break; + } + } + + return meta; +} + + +int csvGetFileFullname(CsvWriteMeta* meta, char* fullname, size_t size) { + char thread_buf[SMALL_BUFF_LEN]; + char start_time_buf[MIDDLE_BUFF_LEN]; + char end_time_buf[MIDDLE_BUFF_LEN]; + int ret = -1; + const char* base_path = g_arguments->output_path; + const char* file_prefix = g_arguments->csv_file_prefix; + + switch (meta->naming_type) { + case CSV_NAMING_SINGLE: { + ret = snprintf(fullname, size, "%s%s.csv", base_path, file_prefix); + break; + } + case CSV_NAMING_TIME_SLICE: { + csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf)); + csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf)); + ret = snprintf(fullname, size, "%s%s_%s_%s.csv", base_path, file_prefix, start_time_buf, end_time_buf); + break; + } + case CSV_NAMING_THREAD: { + (void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id); + ret = snprintf(fullname, size, "%s%s_%s.csv", base_path, file_prefix, thread_buf); + break; + } + case CSV_NAMING_THREAD_TIME_SLICE: { + (void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id); + csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf)); + csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf)); + ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf); + break; + } + default: { + ret = -1; + break; + } + } + + return (ret > 0 && (size_t)ret < size) ? 0 : -1; +} + + +uint32_t csvCalcInterlaceRows(CsvWriteMeta* meta, SSuperTable* stb, int64_t ts) { + uint32_t need_rows = 0; + + + switch (meta->naming_type) { + case CSV_NAMING_SINGLE: { + need_rows = stb->interlaceRows; + break; + } + case CSV_NAMING_TIME_SLICE: { + (meta->end_ts - ts) / stb->timestamp_step + need_rows = stb->interlaceRows; + + break; + } + case CSV_NAMING_THREAD: { + (void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id); + ret = snprintf(fullname, size, "%s%s_%s.csv", base_path, file_prefix, thread_buf); + break; + } + case CSV_NAMING_THREAD_TIME_SLICE: { + (void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id); + csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf)); + csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf)); + ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf); + break; + } + default: { + ret = -1; + break; + } + } +} + + + + +static int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fp, char* rows_buf, int rows_buf_len) { + char fullname[MAX_PATH_LEN] = {}; + CsvWriteMeta meta = csvInitFileNamingMeta(); + + int ret = csvGetFileFullname(&meta, fullname, sizeof(fullname)); + if (ret < 0) { + errorPrint("Failed to generate csv filename. database: %s, super table: %s, naming type: %d.\n", + db->dbName, stb->stbName, meta.naming_type); + return -1; + } + int ret = 0; int pos = 0; int64_t n = 0; // already inserted rows for one child table int64_t tk = 0; int64_t show = 0; + int64_t ts = 0; + int64_t last_ts = stb->startTimestamp; + + // init buffer + char** tags_buf_bucket = (char **)benchCalloc(stb->childTblCount, sizeof(char *), true); + int cols_buf_length = stb->lenOfCols + stb->cols->size; + char* cols_buf = (char *)benchCalloc(1, cols_buf_length, true); + + for (int64_t i = 0; i < stb->childTblCount; ++i) { + int tags_buf_length = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->tags->size; + tags_buf_bucket[i] = benchCalloc(1, tags_buf_length, true); + if (!tags_buf_bucket[i]) { + ret = -1; + goto end; + } + + ret = csvGenRowTagData(tags_buf_bucket[i], tags_buf_length, stb, i, &tk); + if (!ret) { + goto end; + } + } - char **tagDatas = (char **)benchCalloc(stb->childTblCount, sizeof(char *), true); - int colDataLen = stb->lenOfCols + stb->cols->size + 256; - char * colData = (char *) benchCalloc(1, colDataLen, true); - int64_t last_ts = stb->startTimestamp; - while (n < stb->insertRows ) { - for (int64_t i = 0; i < stb->childTblCount; i++) { - // start one table - int64_t ts = last_ts; + for (int64_t i = 0; i < stb->childTblCount; ++i) { + ts = last_ts; int64_t ck = 0; - // tags - if (tagDatas[i] == NULL) { - tagDatas[i] = genTagData(NULL, stb, i, &tk); - } + // calc need insert rows + uint32_t need_rows = csvCalcInterlaceRows(&meta, stb, ts) + int64_t needInserts = stb->interlaceRows; if(needInserts > stb->insertRows - n) { needInserts = stb->insertRows - n; - } + } for (int64_t j = 0; j < needInserts; j++) { - genColumnData(colData, stb, ts, db->precision, &ck); + genColumnData(cols_buf, stb, ts, db->precision, &ck); // combine tags,cols - pos += sprintf(buf + pos, "%s,%s.\n", tagDatas[i], colData); - if (bufLen - pos < minRemain) { + pos += sprintf(buf + pos, "%s,%s\n", tags_buf_bucket[i], cols_buf); + if (rows_buf_len - pos < minRemain) { // submit - ret = writeCsvFile(fs, buf, pos); + ret = writeCsvFile(fp, buf, pos); if (ret != 0) { - goto END; + goto end; } pos = 0; } @@ -152,7 +385,7 @@ int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int if(g_arguments->terminate) { infoPrint("%s", "You are cancel, exiting ... \n"); ret = -1; - goto END; + goto end; } // print show @@ -170,113 +403,131 @@ int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int } if (pos > 0) { - ret = writeCsvFile(fs, buf, pos); + ret = writeCsvFile(fp, buf, pos); pos = 0; } -END: +end: // free for (int64_t m = 0 ; m < stb->childTblCount; m ++) { - tmfree(tagDatas[m]); + tmfree(tags_buf_bucket[m]); } - tmfree(tagDatas); - tmfree(colData); + tmfree(tags_buf_bucket); + tmfree(cols_buf); return ret; } + // gen tag data -char * genTagData(char* buf, SSuperTable* stb, int64_t i, int64_t *k) { - // malloc - char* tagData; - if (buf == NULL) { - int tagDataLen = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->tags->size + 32; - tagData = benchCalloc(1, tagDataLen, true); - } else { - tagData = buf; - } - - int pos = 0; +int csvGenRowTagData(char* buf, size_t size, SSuperTable* stb, int64_t index, int64_t* k) { // tbname - pos += sprintf(tagData, "\'%s%"PRId64"\'", stb->childTblPrefix, i); + int pos = snprintf(buf, size, "\'%s%"PRId64"\'", stb->childTblPrefix, index); // tags - pos += genRowByField(tagData + pos, stb->tags, stb->tags->size, stb->binaryPrefex, stb->ncharPrefex, k); + pos += csvGenRowFields(buf + pos, stb->tags, stb->tags->size, stb->binaryPrefex, stb->ncharPrefex, k); - return tagData; + return (pos > 0 && (size_t)pos < size) ? 0 : -1; } // gen column data -char * genColumnData(char* colData, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k) { +char * genColumnData(char* cols_csv_buf, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k) { char szTime[128] = {0}; toolsFormatTimestamp(szTime, ts, precision); - int pos = sprintf(colData, "\'%s\'", szTime); + int pos = sprintf(cols_csv_buf, "\'%s\'", szTime); // columns - genRowByField(colData + pos, stb->cols, stb->cols->size, stb->binaryPrefex, stb->ncharPrefex, k); - return colData; + csvGenRowFields(cols_csv_buf + pos, stb->cols, stb->cols->size, stb->binaryPrefex, stb->ncharPrefex, k); + return cols_csv_buf; } -int32_t genRowByField(char* buf, BArray* fields, int16_t fieldCnt, char* binanryPrefix, char* ncharPrefix, int64_t *k) { +int32_t csvGenRowFields(char* buf, BArray* fields, int16_t field_count, char* binanry_prefix, char* nchar_prefix, int64_t* k) { + int32_t pos = 0; - // other cols data - int32_t pos1 = 0; - for(uint16_t i = 0; i < fieldCnt; i++) { - Field* fd = benchArrayGet(fields, i); - char* prefix = ""; - if(fd->type == TSDB_DATA_TYPE_BINARY || fd->type == TSDB_DATA_TYPE_VARBINARY) { - if(binanryPrefix) { - prefix = binanryPrefix; - } - } else if(fd->type == TSDB_DATA_TYPE_NCHAR) { - if(ncharPrefix) { - prefix = ncharPrefix; - } + for (uint16_t i = 0; i < field_count; ++i) { + Field* field = benchArrayGet(fields, i); + char* prefix = ""; + if(field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY) { + if (binanry_prefix) { + prefix = binanry_prefix; + } + } else if(field->type == TSDB_DATA_TYPE_NCHAR) { + if (nchar_prefix) { + prefix = nchar_prefix; + } + } + pos += dataGenByField(field, buf, pos, prefix, k, ""); } - pos1 += dataGenByField(fd, buf, pos1, prefix, k, ""); - } - - return pos1; + return pos; } -int genWithSTable(SDataBase* db, SSuperTable* stb) { - +int csvGenStbInterlace(SDataBase* db, SSuperTable* stb) { int ret = 0; char outFile[MAX_FILE_NAME_LEN] = {0}; obtainCsvFile(outFile, db, stb, outDir); - FILE * fs = fopen(outFile, "w"); - if(fs == NULL) { + FILE* fp = fopen(outFile, "w"); + if(fp == NULL) { errorPrint("failed create csv file. file=%s, last errno=%d strerror=%s \n", outFile, errno, strerror(errno)); return -1; } - int rowLen = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->lenOfCols + stb->tags->size + stb->cols->size; - int bufLen = rowLen * g_arguments->reqPerReq; - char* buf = benchCalloc(1, bufLen, true); + int row_buf_len = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->lenOfCols + stb->tags->size + stb->cols->size; + int rows_buf_len = row_buf_len * g_arguments->interlaceRows; + char* rows_buf = benchCalloc(1, rows_buf_len, true); infoPrint("start write csv file: %s \n", outFile); - if (stb->interlaceRows > 0) { - // interlace mode - ret = interlaceWriteCsv(db, stb, fs, buf, bufLen, rowLen * 2); - } else { - // batch mode - ret = batchWriteCsv(db, stb, fs, buf, bufLen, rowLen * 2); - } + // interlace mode + ret = interlaceWriteCsv(db, stb, fp, rows_buf, rows_buf_len); - tmfree(buf); - fclose(fs); + + tmfree(rows_buf); + fclose(fp); succPrint("end write csv file: %s \n", outFile); + + + // wait threads + for (int i = 0; i < threadCnt; i++) { + infoPrint("pthread_join %d ...\n", i); + pthread_join(pids[i], NULL); + } + + return ret; } -static int is_valid_csv_ts_format(const char* csv_ts_format) { +void csvGenPrepare(SDataBase* db, SSuperTable* stb) { + stbInfo->lenOfTags = accumulateRowLen(stbInfo->tags, stbInfo->iface); + stbInfo->lenOfCols = accumulateRowLen(stbInfo->cols, stbInfo->iface); + return; +} + + +int csvGenStb(SDataBase* db, SSuperTable* stb) { + // prepare + csvGenPrepare(db, stb); + + + int ret = 0; + if (stb->interlaceRows > 0) { + // interlace mode + ret = csvGenStbInterlace(db, stb); + } else { + // batch mode + ret = csvGenStbBatch(db, stb); + } + + return ret; +} + + +static int csvValidateParamTsFormat(const char* csv_ts_format) { if (!csv_ts_format) return 0; struct tm test_tm = { @@ -296,7 +547,11 @@ static int is_valid_csv_ts_format(const char* csv_ts_format) { return -1; } +#ifdef _WIN32 const char* invalid_chars = "/\\:*?\"<>|"; +#else + const char* invalid_chars = "/\\?\"<>|"; +#endif if (strpbrk(buffer, invalid_chars) != NULL) { return -1; } @@ -305,7 +560,7 @@ static int is_valid_csv_ts_format(const char* csv_ts_format) { } -static long validate_csv_ts_interval(const char* csv_ts_interval) { +static long csvValidateParamTsInterval(const char* csv_ts_interval) { if (!csv_ts_interval || *csv_ts_interval == '\0') return -1; char* endptr; @@ -335,35 +590,35 @@ static long validate_csv_ts_interval(const char* csv_ts_interval) { static int csvParseParameter() { // csv_output_path - size_t len = strlen(g_arguments->csv_output_path); + size_t len = strlen(g_arguments->output_path); if (len == 0) { - errorPrint("Failed to generate CSV, the specified output path is empty. Please provide a valid path. database: %s, super table: %s.\n", + errorPrint("Failed to generate CSV files, the specified output path is empty. Please provide a valid path. database: %s, super table: %s.\n", db->dbName, stb->stbName); return -1; } - if (g_arguments->csv_output_path[len - 1] != '/') { - int n = snprintf(g_arguments->csv_output_path_buf, sizeof(g_arguments->csv_output_path_buf), "%s/", g_arguments->csv_output_path); - if (n < 0 || n >= sizeof(g_arguments->csv_output_path_buf)) { - errorPrint("Failed to generate CSV, path buffer overflow risk when appending '/'. path: %s, database: %s, super table: %s.\n", - g_arguments->csv_output_path, db->dbName, stb->stbName); + if (g_arguments->output_path[len - 1] != '/') { + int n = snprintf(g_arguments->output_path_buf, sizeof(g_arguments->output_path_buf), "%s/", g_arguments->output_path); + if (n < 0 || n >= sizeof(g_arguments->output_path_buf)) { + errorPrint("Failed to generate CSV files, path buffer overflow risk when appending '/'. path: %s, database: %s, super table: %s.\n", + g_arguments->csv_output_path, db->dbName, stb->stbName); return -1; } - g_arguments->csv_output_path = g_arguments->csv_output_path_buf; + g_arguments->output_path = g_arguments->output_path_buf; } // csv_ts_format if (g_arguments->csv_ts_format) { - if (is_valid_csv_ts_format(g_arguments->csv_ts_format) != 0) { - errorPrint("Failed to generate CSV, the parameter `csv_ts_format` is invalid. csv_ts_format: %s, database: %s, super table: %s.\n", + if (csvValidateParamTsFormat(g_arguments->csv_ts_format) != 0) { + errorPrint("Failed to generate CSV files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s, database: %s, super table: %s.\n", g_arguments->csv_ts_format, db->dbName, stb->stbName); return -1; } } // csv_ts_interval - long csv_ts_intv_secs = validate_csv_ts_interval(g_arguments->csv_ts_interval); + long csv_ts_intv_secs = csvValidateParamTsInterval(g_arguments->csv_ts_interval); if (csv_ts_intv_secs <= 0) { - errorPrint("Failed to generate CSV, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s, database: %s, super table: %s.\n", + errorPrint("Failed to generate CSV files, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s, database: %s, super table: %s.\n", g_arguments->csv_ts_interval, db->dbName, stb->stbName); return -1; } @@ -373,12 +628,12 @@ static int csvParseParameter() { } -static void csvWriteThread() { - for (size_t i = 0; i < g_arguments->databases->size; ++i) { +static int csvWriteThread() { + for (size_t i = 0; i < g_arguments->databases->size && !g_arguments->terminate; ++i) { // database SDataBase* db = benchArrayGet(g_arguments->databases, i); if (database->superTbls) { - for (size_t j = 0; j < db->superTbls->size; ++j) { + for (size_t j = 0; j < db->superTbls->size && !g_arguments->terminate; ++j) { // stb SSuperTable* stb = benchArrayGet(db->superTbls, j); if (stb->insertRows == 0) { @@ -386,37 +641,34 @@ static void csvWriteThread() { } // gen csv - int ret = genWithSTable(db, stb); + int ret = csvGenStb(db, stb); if(ret != 0) { errorPrint("Failed to generate CSV files. database: %s, super table: %s, error code: %d.\n", db->dbName, stb->stbName, ret); - return; + return -1; } } } } - return; + return 0; } - int csvTestProcess() { - // parse parameter + // parsing parameters if (csvParseParameter() != 0) { - errorPrint("Failed to generate CSV files. database: %s, super table: %s, error code: %d.\n", - db->dbName, stb->stbName, ret); return -1; } - - - - infoPrint("Starting to output data to CSV files in directory: %s ...\n", g_arguments->csv_output_path); + infoPrint("Starting to output data to CSV files in directory: %s ...\n", g_arguments->output_path); int64_t start = toolsGetTimestampMs(); - csvWriteThread(); + int ret = csvWriteThread(); + if (ret != 0) { + return -1; + } int64_t delay = toolsGetTimestampMs() - start; - infoPrint("Data export to CSV files in directory: %s has been completed. Time elapsed: %.3f seconds\n", - g_arguments->csv_output_path, delay / 1000.0); + infoPrint("Generating CSV files in directory: %s has been completed. Time elapsed: %.3f seconds\n", + g_arguments->output_path, delay / 1000.0); return 0; } diff --git a/tools/taos-tools/src/benchJsonOpt.c b/tools/taos-tools/src/benchJsonOpt.c index a2bf4f07d8..a88526c278 100644 --- a/tools/taos-tools/src/benchJsonOpt.c +++ b/tools/taos-tools/src/benchJsonOpt.c @@ -1586,14 +1586,14 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) { } } - // csv output dir - tools_cJSON* csv_op = tools_cJSON_GetObjectItem(json, "csv_output_path"); - if (csv_op && csv_op->type == tools_cJSON_String && csv_op->valuestring != NULL) { - g_arguments->csv_output_path = csv_op->valuestring; + // output dir + tools_cJSON* opp = tools_cJSON_GetObjectItem(json, "output_path"); + if (opp && opp->type == tools_cJSON_String && opp->valuestring != NULL) { + g_arguments->output_path = opp->valuestring; } else { - g_arguments->csv_output_path = "./output/"; + g_arguments->output_path = "./output/"; } - (void)mkdir(g_arguments->csv_output_path, 0775); + (void)mkdir(g_arguments->output_path, 0775); // csv file prefix tools_cJSON* csv_fp = tools_cJSON_GetObjectItem(json, "csv_file_prefix");