From 695e92110568fad2381d613a1f9469c1b0300333 Mon Sep 17 00:00:00 2001 From: Yaming Pei Date: Fri, 28 Feb 2025 17:39:11 +0800 Subject: [PATCH] feat: csv writing statitics --- tools/taos-tools/inc/benchCsv.h | 1 + tools/taos-tools/src/benchCsv.c | 78 +++++++++++++++++++++++++++------ 2 files changed, 65 insertions(+), 14 deletions(-) diff --git a/tools/taos-tools/inc/benchCsv.h b/tools/taos-tools/inc/benchCsv.h index 2db2ec324e..11666a0b45 100644 --- a/tools/taos-tools/inc/benchCsv.h +++ b/tools/taos-tools/inc/benchCsv.h @@ -79,6 +79,7 @@ typedef struct { uint64_t ctb_start_idx; uint64_t ctb_end_idx; uint64_t ctb_count; + uint64_t total_rows; time_t start_secs; time_t end_secs; size_t thread_id; diff --git a/tools/taos-tools/src/benchCsv.c b/tools/taos-tools/src/benchCsv.c index e7db3481b1..92641f8ea8 100644 --- a/tools/taos-tools/src/benchCsv.c +++ b/tools/taos-tools/src/benchCsv.c @@ -622,11 +622,19 @@ static void* csvGenStbThread(void* arg) { int64_t slice_batch_ts = 0; int64_t slice_ctb_cur_ts = 0; int64_t ck = 0; - uint64_t ctb_idx = 0; + uint64_t ctb_idx = 0; int ret = 0; CsvFileHandle* fhdl = NULL; char fullname[MAX_PATH_LEN] = {}; + uint64_t total_rows = 0; + uint64_t pre_total_rows = 0; + uint64_t file_rows = 0; + int64_t start_print_ts = 0; + int64_t pre_print_ts = 0; + int64_t cur_print_ts = 0; + int64_t print_ts_elapse = 0; + // tags buffer CsvRowTagsBuf* tags_buf_bucket = csvGenCtbTagData(write_meta, thread_meta); @@ -653,7 +661,7 @@ static void* csvGenStbThread(void* arg) { thread_meta->tags_buf_bucket = tags_buf_bucket; thread_meta->cols_buf = &cols_buf; - + start_print_ts = toolsGetTimestampMs(); for (cur_ts = write_meta->start_ts; cur_ts < write_meta->end_ts; cur_ts += write_meta->ts_step) { // get filename @@ -677,6 +685,9 @@ static void* csvGenStbThread(void* arg) { thread_meta->output_header = g_arguments->csv_output_header; slice_cur_ts = cur_ts; slice_end_ts = MIN(cur_ts + write_meta->ts_step, write_meta->end_ts); + file_rows = 0; + + infoPrint("thread[%d] begin to write csv file: %s.\n", thread_meta->thread_id, fullname); // write data while (slice_cur_ts < slice_end_ts) { @@ -692,7 +703,20 @@ static void* csvGenStbThread(void* arg) { goto end; } - ck += 1; + ck += 1; + total_rows += 1; + file_rows += 1; + + cur_print_ts = toolsGetTimestampMs(); + print_ts_elapse = cur_print_ts - pre_print_ts; + if (print_ts_elapse > 30000) { + infoPrint("thread[%d] has currently inserted rows: %" PRIu64 ", period insert rate: %.2f rows/s.\n", + thread_meta->thread_id, total_rows, (total_rows - pre_total_rows) * 1000.0 / print_ts_elapse); + + pre_print_ts = cur_print_ts; + pre_total_rows = total_rows; + } + if (!g_arguments->terminate) { csvClose(fhdl); @@ -708,7 +732,14 @@ static void* csvGenStbThread(void* arg) { csvUpdateSliceRange(write_meta, thread_meta, last_end_ts); } + cur_print_ts = toolsGetTimestampMs(); + print_ts_elapse = cur_print_ts - start_print_ts; + + succPrint("thread [%d] has completed inserting rows: %" PRIu64 ", insert rate %.2f rows/s.\n", + thread_meta->thread_id, total_rows, total_rows * 1000.0 / print_ts_elapse); + end: + thread_meta->total_rows = total_rows; csvFreeCtbTagData(tags_buf_bucket); tmfree(buf); return NULL; @@ -716,8 +747,12 @@ end: static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) { - int ret = 0; - bool prompt = true; + int ret = 0; + bool prompt = true; + uint64_t total_rows = 0; + int64_t start_ts = 0; + int64_t ts_elapse = 0; + CsvWriteMeta* write_meta = benchCalloc(1, sizeof(CsvWriteMeta), false); if (!write_meta) { @@ -743,6 +778,7 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) { goto end; } + start_ts = toolsGetTimestampMs(); for (uint32_t i = 0; (i < write_meta->total_threads && !g_arguments->terminate); ++i) { CsvThreadArgs* arg = &args[i]; arg->write_meta = write_meta; @@ -766,6 +802,20 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) { pthread_join(pids[i], NULL); } + // statistics + total_rows = 0; + for (uint32_t i = 0; i < write_meta->total_threads; ++i) { + CsvThreadArgs* arg = &args[i]; + CsvThreadMeta* thread_meta = &arg->thread_meta; + total_rows += thread_meta->total_rows; + } + + ts_elapse = toolsGetTimestampMs() - start_ts; + if (ts_elapse > 0) { + succPrint("Spent %.6f seconds to insert rows: %" PRIu64 " with %d thread(s) into %s, at a rate of %.2f rows/s.\n", + ts_elapse / 1000.0, total_rows, write_meta->total_threads, g_arguments->output_path, total_rows * 1000.0 / ts_elapse); + } + end: tmfree(pids); tmfree(args); @@ -859,14 +909,14 @@ static int csvParseParameter() { // csv_output_path size_t len = strlen(g_arguments->output_path); if (len == 0) { - errorPrint("Failed to generate CSV files, the specified output path is empty. Please provide a valid path. database: %s, super table: %s.\n", + errorPrint("Failed to generate csv files, the specified output path is empty. Please provide a valid path. database: %s, super table: %s.\n", db->dbName, stb->stbName); return -1; } if (g_arguments->output_path[len - 1] != '/') { int n = snprintf(g_arguments->output_path_buf, sizeof(g_arguments->output_path_buf), "%s/", g_arguments->output_path); if (n < 0 || n >= sizeof(g_arguments->output_path_buf)) { - errorPrint("Failed to generate CSV files, path buffer overflow risk when appending '/'. path: %s, database: %s, super table: %s.\n", + errorPrint("Failed to generate csv files, path buffer overflow risk when appending '/'. path: %s, database: %s, super table: %s.\n", g_arguments->csv_output_path, db->dbName, stb->stbName); return -1; } @@ -876,7 +926,7 @@ static int csvParseParameter() { // csv_ts_format if (g_arguments->csv_ts_format) { if (csvValidateParamTsFormat(g_arguments->csv_ts_format) != 0) { - errorPrint("Failed to generate CSV files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s, database: %s, super table: %s.\n", + errorPrint("Failed to generate csv files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s, database: %s, super table: %s.\n", g_arguments->csv_ts_format, db->dbName, stb->stbName); return -1; } @@ -885,7 +935,7 @@ static int csvParseParameter() { // csv_ts_interval long csv_ts_intv_secs = csvValidateParamTsInterval(g_arguments->csv_ts_interval); if (csv_ts_intv_secs <= 0) { - errorPrint("Failed to generate CSV files, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s, database: %s, super table: %s.\n", + errorPrint("Failed to generate csv files, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s, database: %s, super table: %s.\n", g_arguments->csv_ts_interval, db->dbName, stb->stbName); return -1; } @@ -910,7 +960,7 @@ static int csvWriteThread() { // gen csv int ret = csvGenStb(db, stb); if(ret != 0) { - errorPrint("Failed to generate CSV files. database: %s, super table: %s, error code: %d.\n", + errorPrint("Failed to generate csv files. database: %s, super table: %s, error code: %d.\n", db->dbName, stb->stbName, ret); return -1; } @@ -928,14 +978,14 @@ int csvTestProcess() { return -1; } - infoPrint("Starting to output data to CSV files in directory: %s ...\n", g_arguments->output_path); + infoPrint("Starting to output data to csv files in directory: %s ...\n", g_arguments->output_path); int64_t start = toolsGetTimestampMs(); int ret = csvWriteThread(); if (ret != 0) { return -1; } - int64_t delay = toolsGetTimestampMs() - start; - infoPrint("Generating CSV files in directory: %s has been completed. Time elapsed: %.3f seconds\n", - g_arguments->output_path, delay / 1000.0); + int64_t elapse = toolsGetTimestampMs() - start; + infoPrint("Generating csv files in directory: %s has been completed. Time elapsed: %.3f seconds\n", + g_arguments->output_path, elapse / 1000.0); return 0; }