diff --git a/tools/taos-tools/inc/bench.h b/tools/taos-tools/inc/bench.h
index 30973170a3..c413d953b7 100644
--- a/tools/taos-tools/inc/bench.h
+++ b/tools/taos-tools/inc/bench.h
@@ -479,6 +479,13 @@ typedef struct SChildTable_S {
     int32_t pkCnt;
 } SChildTable;
 
+typedef enum {
+    CSV_COMPRESS_NONE = 0,
+    CSV_COMPRESS_FAST = 1,
+    CSV_COMPRESS_BALANCE = 6,
+    CSV_COMPRESS_BEST = 9
+} CsvCompressionLevel;
+
 #define PRIMARY_KEY "PRIMARY KEY"
 typedef struct SSuperTable_S {
     char *stbName;
@@ -581,6 +588,15 @@ typedef struct SSuperTable_S {
 
     // execute sqls after create super table
     char **sqls;
+
+    char*  csv_file_prefix;
+    char*  csv_ts_format;
+    char*  csv_ts_interval;
+    char*  csv_tbname_alias;
+    long   csv_ts_intv_secs;
+    bool   csv_output_header;
+    CsvCompressionLevel  csv_compress_level;
+
 } SSuperTable;
 
 typedef struct SDbCfg_S {
@@ -719,14 +735,6 @@ typedef struct STmqMetaInfo_S {
     uint16_t iface;
 } STmqMetaInfo;
 
-
-typedef enum {
-    CSV_COMPRESS_NONE = 0,
-    CSV_COMPRESS_FAST = 1,
-    CSV_COMPRESS_BALANCE = 6,
-    CSV_COMPRESS_BEST = 9
-} CsvCompressionLevel;
-
 typedef struct SArguments_S {
     uint8_t taosc_version;
     char *  metaFile;
@@ -791,14 +799,6 @@ typedef struct SArguments_S {
     char*   output_path;
     char    output_path_buf[MAX_PATH_LEN];
 
-    char*   csv_file_prefix;
-    char*   csv_ts_format;
-    char*   csv_ts_interval;
-    char*   csv_tbname_alias;
-    long    csv_ts_intv_secs;
-    bool    csv_output_header;
-
-    CsvCompressionLevel csv_compress_level;
 } SArguments;
 
 
diff --git a/tools/taos-tools/src/benchCsv.c b/tools/taos-tools/src/benchCsv.c
index 848cd9a6ef..dd6ce3360a 100644
--- a/tools/taos-tools/src/benchCsv.c
+++ b/tools/taos-tools/src/benchCsv.c
@@ -31,6 +31,111 @@
 
 
 
+static int csvValidateParamTsFormat(const char* csv_ts_format) {
+    if (!csv_ts_format) return 0;
+
+    struct tm test_tm = {
+        .tm_year = 70,
+        .tm_mon = 0,
+        .tm_mday = 1,
+        .tm_hour = 0,
+        .tm_min = 0,
+        .tm_sec = 0,
+        .tm_isdst = -1
+    };
+    mktime(&test_tm);
+
+    char buffer[1024];
+    size_t len = strftime(buffer, sizeof(buffer), csv_ts_format, &test_tm);
+    if (len == 0) {
+        return -1;
+    }
+
+#ifdef _WIN32
+    const char* invalid_chars = "/\\:*?\"<>|";
+#else
+    const char* invalid_chars = "/\\?\"<>|";
+#endif
+    if (strpbrk(buffer, invalid_chars) != NULL) {
+        return -1;
+    }
+
+    return 0;
+}
+
+
+static long csvValidateParamTsInterval(const char* csv_ts_interval) {
+    if (!csv_ts_interval || *csv_ts_interval == '\0') return -1;
+
+    char* endptr;
+    errno = 0;
+    const long num = strtol(csv_ts_interval, &endptr, 10);
+
+    if (errno == ERANGE ||
+        endptr == csv_ts_interval ||
+        num <= 0) {
+        return -1;
+    }
+
+    if (*endptr == '\0' ||
+        *(endptr + 1) != '\0') {
+        return -1;
+    }
+
+    switch (tolower(*endptr)) {
+        case 's': return num;
+        case 'm': return num * 60;
+        case 'h': return num * 60 * 60;
+        case 'd': return num * 60 * 60 * 24;
+        default : return -1;
+    }
+}
+
+
+static int csvParseParameter() {
+    // csv_output_path
+    size_t len = strlen(g_arguments->output_path);
+    if (len == 0) {
+        errorPrint("Failed to generate csv files, the specified output path is empty. Please provide a valid path.\n");
+        return -1;
+    }
+    if (g_arguments->output_path[len - 1] != '/') {
+        int n = snprintf(g_arguments->output_path_buf, sizeof(g_arguments->output_path_buf), "%s/", g_arguments->output_path);
+        if (n < 0 || n >= sizeof(g_arguments->output_path_buf)) {
+            errorPrint("Failed to generate csv files, path buffer overflow risk when appending '/'. path: %s.\n",
+                       g_arguments->output_path);
+            return -1;
+        }
+        g_arguments->output_path = g_arguments->output_path_buf;
+    }
+
+    return 0;
+}
+
+
+static int csvParseStbParameter(SSuperTable* stb) {
+    // csv_ts_format
+    if (stb->csv_ts_format) {
+        if (csvValidateParamTsFormat(stb->csv_ts_format) != 0) {
+            errorPrint("Failed to generate csv files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s.\n",
+                       stb->csv_ts_format);
+            return -1;
+        }
+    }
+
+    // csv_ts_interval
+    long csv_ts_intv_secs = csvValidateParamTsInterval(stb->csv_ts_interval);
+    if (csv_ts_intv_secs <= 0) {
+        errorPrint("Failed to generate csv files, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s.\n",
+                   stb->csv_ts_interval);
+        return -1;
+    }
+    stb->csv_ts_intv_secs = csv_ts_intv_secs;
+
+    return 0;
+}
+
+
 static time_t csvGetStartSeconds(int precision, int64_t start_ts) {
     time_t start_seconds = 0;
 
@@ -45,7 +150,7 @@ static time_t csvGetStartSeconds(int precision, int64_t start_ts) {
 }
 
 
-static void csvConvertTime2String(time_t time_value, char* time_buf, size_t buf_size) {
+static void csvConvertTime2String(time_t time_value, char* ts_format, char* time_buf, size_t buf_size) {
     struct tm tm_result;
     char *old_locale = setlocale(LC_TIME, "C");
 #ifdef _WIN32
@@ -53,7 +158,7 @@ static void csvConvertTime2String(time_t time_value, char* time_buf, size_t buf_
 #else
     gmtime_r(&time_value, &tm_result);
 #endif
-    strftime(time_buf, buf_size, g_arguments->csv_ts_format, &tm_result);
+    strftime(time_buf, buf_size, ts_format, &tm_result);
     if (old_locale) {
         setlocale(LC_TIME, old_locale);
     }
@@ -63,13 +168,13 @@ static void csvConvertTime2String(time_t time_value, char* time_buf, size_t buf_
 
 static CsvNamingType csvGetFileNamingType(SSuperTable* stb) {
     if (stb->interlaceRows > 0) {
-        if (g_arguments->csv_ts_format) {
+        if (stb->csv_ts_format) {
             return CSV_NAMING_I_TIME_SLICE;
         } else {
             return CSV_NAMING_I_SINGLE;
         }
     } else {
-        if (g_arguments->csv_ts_format) {
+        if (stb->csv_ts_format) {
             return CSV_NAMING_B_THREAD_TIME_SLICE;
         } else {
             return CSV_NAMING_B_THREAD;
@@ -82,11 +187,11 @@ static void csvCalcTimestampStep(CsvWriteMeta* write_meta) {
     time_t ts_step = 0;
 
     if (write_meta->db->precision == TSDB_TIME_PRECISION_MICRO) {
-        ts_step = g_arguments->csv_ts_intv_secs * 1000000L;
+        ts_step = write_meta->stb->csv_ts_intv_secs * 1000000L;
     } else if (write_meta->db->precision == TSDB_TIME_PRECISION_NANO) {
-        ts_step = g_arguments->csv_ts_intv_secs * 1000000000L;
+        ts_step = write_meta->stb->csv_ts_intv_secs * 1000000000L;
     } else {
-        ts_step = g_arguments->csv_ts_intv_secs * 1000L;
+        ts_step = write_meta->stb->csv_ts_intv_secs * 1000L;
     }
     write_meta->ts_step = ts_step;
     return;
@@ -145,7 +250,7 @@ static int csvGenCsvHeader(CsvWriteMeta* write_meta) {
     int   pos  = 0;
    int   size = sizeof(write_meta->csv_header);
 
-    if (!g_arguments->csv_output_header) {
+    if (!write_meta->stb->csv_output_header) {
         return 0;
     }
 
@@ -159,7 +264,7 @@ static int csvGenCsvHeader(CsvWriteMeta* write_meta) {
     }
 
     // tbname
-    pos += snprintf(buf + pos, size - pos, ",%s", g_arguments->csv_tbname_alias);
+    pos += snprintf(buf + pos, size - pos, ",%s", write_meta->stb->csv_tbname_alias);
 
     // tags
     for (size_t i = 0; i < stb->tags->size; ++i) {
@@ -479,23 +584,23 @@ static int csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write
 
     switch (write_meta->naming_type) {
         case CSV_NAMING_I_SINGLE: {
-            (void)snprintf(write_meta->mode, sizeof(write_meta->mode), "interlace|no-time-slice");
+            (void)snprintf(write_meta->mode, sizeof(write_meta->mode), "interlace::normal");
             break;
         }
         case CSV_NAMING_I_TIME_SLICE: {
-            (void)snprintf(write_meta->mode, sizeof(write_meta->mode), "interlace|time-slice");
+            (void)snprintf(write_meta->mode, sizeof(write_meta->mode), "interlace::time-slice");
             csvCalcTimestampStep(write_meta);
             break;
         }
         case CSV_NAMING_B_THREAD: {
             write_meta->total_threads = MIN(g_arguments->nthreads, stb->childTblCount);
-            (void)snprintf(write_meta->mode, sizeof(write_meta->mode), "batch[%zu]|no-time-slice", write_meta->total_threads);
+            (void)snprintf(write_meta->mode, sizeof(write_meta->mode), "batch[%zu]::normal", write_meta->total_threads);
             csvGenThreadFormatter(write_meta);
             break;
         }
         case CSV_NAMING_B_THREAD_TIME_SLICE: {
             write_meta->total_threads = MIN(g_arguments->nthreads, stb->childTblCount);
-            (void)snprintf(write_meta->mode, sizeof(write_meta->mode), "batch[%zu]|time-slice", write_meta->total_threads);
+            (void)snprintf(write_meta->mode, sizeof(write_meta->mode), "batch[%zu]::time-slice", write_meta->total_threads);
             csvGenThreadFormatter(write_meta);
             csvCalcTimestampStep(write_meta);
             break;
@@ -535,7 +640,7 @@ static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvT
         case CSV_NAMING_I_TIME_SLICE:
         case CSV_NAMING_B_THREAD_TIME_SLICE: {
             thread_meta->start_secs = csvGetStartSeconds(db->precision, stb->startTimestamp);
-            thread_meta->end_secs   = thread_meta->start_secs + g_arguments->csv_ts_intv_secs;
+            thread_meta->end_secs   = thread_meta->start_secs + write_meta->stb->csv_ts_intv_secs;
             break;
         }
         default: {
@@ -558,7 +663,7 @@ static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_
         case CSV_NAMING_I_TIME_SLICE:
         case CSV_NAMING_B_THREAD_TIME_SLICE: {
             thread_meta->start_secs = csvGetStartSeconds(db->precision, last_end_ts);
-            thread_meta->end_secs   = thread_meta->start_secs + g_arguments->csv_ts_intv_secs;
+            thread_meta->end_secs   = thread_meta->start_secs + write_meta->stb->csv_ts_intv_secs;
             break;
         }
         default: {
@@ -570,8 +675,8 @@ static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_
 }
 
 
-static const char* csvGetGzipFilePrefix() {
-    if (g_arguments->csv_compress_level == CSV_COMPRESS_NONE) {
+static const char* csvGetGzipFilePrefix(CsvCompressionLevel csv_compress_level) {
+    if (csv_compress_level == CSV_COMPRESS_NONE) {
         return "";
     } else {
         return ".gz";
@@ -585,8 +690,8 @@ static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_me
     char end_time_buf[MIDDLE_BUFF_LEN];
     int  ret = -1;
     const char* base_path   = g_arguments->output_path;
-    const char* file_prefix = g_arguments->csv_file_prefix;
-    const char* gzip_suffix = csvGetGzipFilePrefix();
+    const char* file_prefix = write_meta->stb->csv_file_prefix;
+    const char* gzip_suffix = csvGetGzipFilePrefix(write_meta->stb->csv_compress_level);
 
     switch (write_meta->naming_type) {
         case CSV_NAMING_I_SINGLE: {
@@ -594,8 +699,8 @@ static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_me
             break;
         }
         case CSV_NAMING_I_TIME_SLICE: {
-            csvConvertTime2String(thread_meta->start_secs, start_time_buf, sizeof(start_time_buf));
-            csvConvertTime2String(thread_meta->end_secs, end_time_buf, sizeof(end_time_buf));
+            csvConvertTime2String(thread_meta->start_secs, write_meta->stb->csv_ts_format, start_time_buf, sizeof(start_time_buf));
+            csvConvertTime2String(thread_meta->end_secs, write_meta->stb->csv_ts_format, end_time_buf, sizeof(end_time_buf));
             ret = snprintf(fullname, size, "%s%s_%s_%s.csv%s", base_path, file_prefix, start_time_buf, end_time_buf, gzip_suffix);
             break;
         }
@@ -606,8 +711,8 @@ static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_me
         }
         case CSV_NAMING_B_THREAD_TIME_SLICE: {
             (void)snprintf(thread_buf, sizeof(thread_buf), write_meta->thread_formatter, thread_meta->thread_id);
-            csvConvertTime2String(thread_meta->start_secs, start_time_buf, sizeof(start_time_buf));
-            csvConvertTime2String(thread_meta->end_secs, end_time_buf, sizeof(end_time_buf));
+            csvConvertTime2String(thread_meta->start_secs, write_meta->stb->csv_ts_format, start_time_buf, sizeof(start_time_buf));
+            csvConvertTime2String(thread_meta->end_secs, write_meta->stb->csv_ts_format, end_time_buf, sizeof(end_time_buf));
             ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv%s", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf, gzip_suffix);
             break;
         }
@@ -968,7 +1073,7 @@ static void* csvGenStbThread(void* arg) {
         }
 
         // create fd
-        fhdl = csvOpen(fullname, g_arguments->csv_compress_level);
+        fhdl = csvOpen(fullname, stb->csv_compress_level);
         if (fhdl == NULL) {
             errorPrint("Failed to create csv file. thread index: %zu, file: %s, errno: %d, strerror: %s.\n",
                        thread_meta->thread_id, fullname, errno, strerror(errno));
@@ -976,7 +1081,7 @@ static void* csvGenStbThread(void* arg) {
         }
 
 
-        thread_meta->output_header = g_arguments->csv_output_header;
+        thread_meta->output_header = stb->csv_output_header;
         slice_cur_ts = cur_ts;
         slice_end_ts = MIN(cur_ts + write_meta->ts_step, write_meta->end_ts);
         file_rows    = 0;
@@ -1148,106 +1253,6 @@ static int csvGenStb(SDataBase* db, SSuperTable* stb) {
 }
 
 
-static int csvValidateParamTsFormat(const char* csv_ts_format) {
-    if (!csv_ts_format) return 0;
-
-    struct tm test_tm = {
-        .tm_year = 70,
-        .tm_mon = 0,
-        .tm_mday = 1,
-        .tm_hour = 0,
-        .tm_min = 0,
-        .tm_sec = 0,
-        .tm_isdst = -1
-    };
-    mktime(&test_tm);
-
-    char buffer[1024];
-    size_t len = strftime(buffer, sizeof(buffer), csv_ts_format, &test_tm);
-    if (len == 0) {
-        return -1;
-    }
-
-#ifdef _WIN32
-    const char* invalid_chars = "/\\:*?\"<>|";
-#else
-    const char* invalid_chars = "/\\?\"<>|";
-#endif
-    if (strpbrk(buffer, invalid_chars) != NULL) {
-        return -1;
-    }
-
-    return 0;
-}
-
-
-static long csvValidateParamTsInterval(const char* csv_ts_interval) {
-    if (!csv_ts_interval || *csv_ts_interval == '\0') return -1;
-
-    char* endptr;
-    errno = 0;
-    const long num = strtol(csv_ts_interval, &endptr, 10);
-
-    if (errno == ERANGE ||
-        endptr == csv_ts_interval ||
-        num <= 0) {
-        return -1;
-    }
-
-    if (*endptr == '\0' ||
-        *(endptr + 1) != '\0') {
-        return -1;
-    }
-
-    switch (tolower(*endptr)) {
-        case 's': return num;
-        case 'm': return num * 60;
-        case 'h': return num * 60 * 60;
-        case 'd': return num * 60 * 60 * 24;
-        default : return -1;
-    }
-}
-
-
-static int csvParseParameter() {
-    // csv_output_path
-    size_t len = strlen(g_arguments->output_path);
-    if (len == 0) {
-        errorPrint("Failed to generate csv files, the specified output path is empty. Please provide a valid path.\n");
-        return -1;
-    }
-    if (g_arguments->output_path[len - 1] != '/') {
-        int n = snprintf(g_arguments->output_path_buf, sizeof(g_arguments->output_path_buf), "%s/", g_arguments->output_path);
-        if (n < 0 || n >= sizeof(g_arguments->output_path_buf)) {
-            errorPrint("Failed to generate csv files, path buffer overflow risk when appending '/'. path: %s.\n",
-                       g_arguments->output_path);
-            return -1;
-        }
-        g_arguments->output_path = g_arguments->output_path_buf;
-    }
-
-    // csv_ts_format
-    if (g_arguments->csv_ts_format) {
-        if (csvValidateParamTsFormat(g_arguments->csv_ts_format) != 0) {
-            errorPrint("Failed to generate csv files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s.\n",
-                       g_arguments->csv_ts_format);
-            return -1;
-        }
-    }
-
-    // csv_ts_interval
-    long csv_ts_intv_secs = csvValidateParamTsInterval(g_arguments->csv_ts_interval);
-    if (csv_ts_intv_secs <= 0) {
-        errorPrint("Failed to generate csv files, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s.\n",
-                   g_arguments->csv_ts_interval);
-        return -1;
-    }
-    g_arguments->csv_ts_intv_secs = csv_ts_intv_secs;
-
-    return 0;
-}
-
-
 static int csvWriteThread() {
     for (size_t i = 0; i < g_arguments->databases->size && !g_arguments->terminate; ++i) {
         // database
@@ -1260,8 +1265,16 @@ static int csvWriteThread() {
             continue;
         }
 
+        // parsing parameters
+        int ret = csvParseStbParameter(stb);
+        if (ret != 0) {
+            errorPrint("Failed to parse csv parameter. database: %s, super table: %s, error code: %d.\n",
+                       db->dbName, stb->stbName, ret);
+            return -1;
+        }
+
         // gen csv
-        int ret = csvGenStb(db, stb);
+        ret = csvGenStb(db, stb);
         if(ret != 0) {
             errorPrint("Failed to generate csv files. database: %s, super table: %s, error code: %d.\n",
                        db->dbName, stb->stbName, ret);
diff --git a/tools/taos-tools/src/benchJsonOpt.c b/tools/taos-tools/src/benchJsonOpt.c
index 9bc8527130..83edc5c6ef 100644
--- a/tools/taos-tools/src/benchJsonOpt.c
+++ b/tools/taos-tools/src/benchJsonOpt.c
@@ -1405,6 +1405,65 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) {
                 }
             }
         }
+
+        // csv file prefix
+        tools_cJSON* csv_fp = tools_cJSON_GetObjectItem(stbInfo, "csv_file_prefix");
+        if (csv_fp && csv_fp->type == tools_cJSON_String && csv_fp->valuestring != NULL) {
+            superTable->csv_file_prefix = csv_fp->valuestring;
+        } else {
+            superTable->csv_file_prefix = "data";
+        }
+
+        // csv timestamp format
+        tools_cJSON* csv_tf = tools_cJSON_GetObjectItem(stbInfo, "csv_ts_format");
+        if (csv_tf && csv_tf->type == tools_cJSON_String && csv_tf->valuestring != NULL) {
+            superTable->csv_ts_format = csv_tf->valuestring;
+        } else {
+            superTable->csv_ts_format = NULL;
+        }
+
+        // csv timestamp format
+        tools_cJSON* csv_ti = tools_cJSON_GetObjectItem(stbInfo, "csv_ts_interval");
+        if (csv_ti && csv_ti->type == tools_cJSON_String && csv_ti->valuestring != NULL) {
+            superTable->csv_ts_interval = csv_ti->valuestring;
+        } else {
+            superTable->csv_ts_interval = "1d";
+        }
+
+        // csv output header
+        superTable->csv_output_header = true;
+        tools_cJSON* oph = tools_cJSON_GetObjectItem(stbInfo, "csv_output_header");
+        if (oph && oph->type == tools_cJSON_String && oph->valuestring != NULL) {
+            if (0 == strcasecmp(oph->valuestring, "yes") || 0 == strcasecmp(oph->valuestring, "true")) {
+                superTable->csv_output_header = true;
+            } else if (0 == strcasecmp(oph->valuestring, "no") || 0 == strcasecmp(oph->valuestring, "false")) {
+                superTable->csv_output_header = false;
+            }
+        }
+
+        // csv tbname alias
+        tools_cJSON* tba = tools_cJSON_GetObjectItem(stbInfo, "csv_tbname_alias");
+        if (tba && tba->type == tools_cJSON_String && tba->valuestring != NULL) {
+            superTable->csv_tbname_alias = tba->valuestring;
+        } else {
+            superTable->csv_tbname_alias = "device_id";
+        }
+
+        // csv compression level
+        tools_cJSON* cl = tools_cJSON_GetObjectItem(stbInfo, "csv_compress_level");
+        if (cl && cl->type == tools_cJSON_String && cl->valuestring != NULL) {
+            if (0 == strcasecmp(cl->valuestring, "none")) {
+                superTable->csv_compress_level = CSV_COMPRESS_NONE;
+            } else if (0 == strcasecmp(cl->valuestring, "fast")) {
+                superTable->csv_compress_level = CSV_COMPRESS_FAST;
+            } else if (0 == strcasecmp(cl->valuestring, "balance")) {
+                superTable->csv_compress_level = CSV_COMPRESS_BALANCE;
+            } else if (0 == strcasecmp(cl->valuestring, "best")) {
+                superTable->csv_compress_level = CSV_COMPRESS_BEST;
+            }
+        } else {
+            superTable->csv_compress_level = CSV_COMPRESS_NONE;
+        }
     }
     return 0;
 }
@@ -1595,65 +1654,6 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) {
     }
     (void)mkdir(g_arguments->output_path, 0775);
 
-    // csv file prefix
-    tools_cJSON* csv_fp = tools_cJSON_GetObjectItem(json, "csv_file_prefix");
-    if (csv_fp && csv_fp->type == tools_cJSON_String && csv_fp->valuestring != NULL) {
-        g_arguments->csv_file_prefix = csv_fp->valuestring;
-    } else {
-        g_arguments->csv_file_prefix = "data";
-    }
-
-    // csv timestamp format
-    tools_cJSON* csv_tf = tools_cJSON_GetObjectItem(json, "csv_ts_format");
-    if (csv_tf && csv_tf->type == tools_cJSON_String && csv_tf->valuestring != NULL) {
-        g_arguments->csv_ts_format = csv_tf->valuestring;
-    } else {
-        g_arguments->csv_ts_format = NULL;
-    }
-
-    // csv timestamp format
-    tools_cJSON* csv_ti = tools_cJSON_GetObjectItem(json, "csv_ts_interval");
-    if (csv_ti && csv_ti->type == tools_cJSON_String && csv_ti->valuestring != NULL) {
-        g_arguments->csv_ts_interval = csv_ti->valuestring;
-    } else {
-        g_arguments->csv_ts_interval = "1d";
-    }
-
-    // csv output header
-    g_arguments->csv_output_header = true;
-    tools_cJSON* oph = tools_cJSON_GetObjectItem(json, "csv_output_header");
-    if (oph && oph->type == tools_cJSON_String && oph->valuestring != NULL) {
-        if (0 == strcasecmp(oph->valuestring, "yes") || 0 == strcasecmp(oph->valuestring, "true")) {
-            g_arguments->csv_output_header = true;
-        } else if (0 == strcasecmp(oph->valuestring, "no") || 0 == strcasecmp(oph->valuestring, "false")) {
-            g_arguments->csv_output_header = false;
-        }
-    }
-
-    // csv tbname alias
-    tools_cJSON* tba = tools_cJSON_GetObjectItem(json, "csv_tbname_alias");
-    if (tba && tba->type == tools_cJSON_String && tba->valuestring != NULL) {
-        g_arguments->csv_tbname_alias = tba->valuestring;
-    } else {
-        g_arguments->csv_tbname_alias = "device_id";
-    }
-
-    // csv compression level
-    tools_cJSON* cl = tools_cJSON_GetObjectItem(json, "csv_compress_level");
-    if (cl && cl->type == tools_cJSON_String && cl->valuestring != NULL) {
-        if (0 == strcasecmp(cl->valuestring, "none")) {
-            g_arguments->csv_compress_level = CSV_COMPRESS_NONE;
-        } else if (0 == strcasecmp(cl->valuestring, "fast")) {
-            g_arguments->csv_compress_level = CSV_COMPRESS_FAST;
-        } else if (0 == strcasecmp(cl->valuestring, "balance")) {
-            g_arguments->csv_compress_level = CSV_COMPRESS_BALANCE;
-        } else if (0 == strcasecmp(cl->valuestring, "best")) {
-            g_arguments->csv_compress_level = CSV_COMPRESS_BEST;
-        }
-    } else {
-        g_arguments->csv_compress_level = CSV_COMPRESS_NONE;
-    }
-
     code = 0;
     return code;
 }
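
For reference, a minimal sketch of how the per-super-table keys parsed in getStableInfo() above could be set in a taosBenchmark csv job file. Only the csv_* key names, their accepted string values, and the defaults ("data", "1d", "device_id", header on, no compression) come from the hunks above; the surrounding databases/super_tables layout and all concrete values are illustrative assumptions, not part of this change:

    {
        "databases": [{
            "dbinfo": { "name": "csvdb", "precision": "ms" },
            "super_tables": [{
                "name": "meters",
                "csv_file_prefix": "data",
                "csv_ts_format": "%Y%m%d",
                "csv_ts_interval": "1d",
                "csv_output_header": "yes",
                "csv_tbname_alias": "device_id",
                "csv_compress_level": "none"
            }]
        }]
    }

When csv_ts_format is set, output files are named by time slice and the slice length comes from csv_ts_interval, which csvValidateParamTsInterval() accepts with the suffixes s, m, h or d (for example "30m" is 30 * 60 = 1800 seconds). csv_compress_level accepts "none", "fast", "balance" or "best", mapping to the CsvCompressionLevel values 0, 1, 6 and 9.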