feat: csv writing statitics

This commit is contained in:
Yaming Pei 2025-02-28 17:39:11 +08:00
parent 13d845935e
commit 695e921105
2 changed files with 65 additions and 14 deletions

View File

@ -79,6 +79,7 @@ typedef struct {
uint64_t ctb_start_idx;
uint64_t ctb_end_idx;
uint64_t ctb_count;
uint64_t total_rows;
time_t start_secs;
time_t end_secs;
size_t thread_id;

View File

@ -622,11 +622,19 @@ static void* csvGenStbThread(void* arg) {
int64_t slice_batch_ts = 0;
int64_t slice_ctb_cur_ts = 0;
int64_t ck = 0;
uint64_t ctb_idx = 0;
uint64_t ctb_idx = 0;
int ret = 0;
CsvFileHandle* fhdl = NULL;
char fullname[MAX_PATH_LEN] = {};
uint64_t total_rows = 0;
uint64_t pre_total_rows = 0;
uint64_t file_rows = 0;
int64_t start_print_ts = 0;
int64_t pre_print_ts = 0;
int64_t cur_print_ts = 0;
int64_t print_ts_elapse = 0;
// tags buffer
CsvRowTagsBuf* tags_buf_bucket = csvGenCtbTagData(write_meta, thread_meta);
@ -653,7 +661,7 @@ static void* csvGenStbThread(void* arg) {
thread_meta->tags_buf_bucket = tags_buf_bucket;
thread_meta->cols_buf = &cols_buf;
start_print_ts = toolsGetTimestampMs();
for (cur_ts = write_meta->start_ts; cur_ts < write_meta->end_ts; cur_ts += write_meta->ts_step) {
// get filename
@ -677,6 +685,9 @@ static void* csvGenStbThread(void* arg) {
thread_meta->output_header = g_arguments->csv_output_header;
slice_cur_ts = cur_ts;
slice_end_ts = MIN(cur_ts + write_meta->ts_step, write_meta->end_ts);
file_rows = 0;
infoPrint("thread[%d] begin to write csv file: %s.\n", thread_meta->thread_id, fullname);
// write data
while (slice_cur_ts < slice_end_ts) {
@ -692,7 +703,20 @@ static void* csvGenStbThread(void* arg) {
goto end;
}
ck += 1;
ck += 1;
total_rows += 1;
file_rows += 1;
cur_print_ts = toolsGetTimestampMs();
print_ts_elapse = cur_print_ts - pre_print_ts;
if (print_ts_elapse > 30000) {
infoPrint("thread[%d] has currently inserted rows: %" PRIu64 ", period insert rate: %.2f rows/s.\n",
thread_meta->thread_id, total_rows, (total_rows - pre_total_rows) * 1000.0 / print_ts_elapse);
pre_print_ts = cur_print_ts;
pre_total_rows = total_rows;
}
if (!g_arguments->terminate) {
csvClose(fhdl);
@ -708,7 +732,14 @@ static void* csvGenStbThread(void* arg) {
csvUpdateSliceRange(write_meta, thread_meta, last_end_ts);
}
cur_print_ts = toolsGetTimestampMs();
print_ts_elapse = cur_print_ts - start_print_ts;
succPrint("thread [%d] has completed inserting rows: %" PRIu64 ", insert rate %.2f rows/s.\n",
thread_meta->thread_id, total_rows, total_rows * 1000.0 / print_ts_elapse);
end:
thread_meta->total_rows = total_rows;
csvFreeCtbTagData(tags_buf_bucket);
tmfree(buf);
return NULL;
@ -716,8 +747,12 @@ end:
static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) {
int ret = 0;
bool prompt = true;
int ret = 0;
bool prompt = true;
uint64_t total_rows = 0;
int64_t start_ts = 0;
int64_t ts_elapse = 0;
CsvWriteMeta* write_meta = benchCalloc(1, sizeof(CsvWriteMeta), false);
if (!write_meta) {
@ -743,6 +778,7 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) {
goto end;
}
start_ts = toolsGetTimestampMs();
for (uint32_t i = 0; (i < write_meta->total_threads && !g_arguments->terminate); ++i) {
CsvThreadArgs* arg = &args[i];
arg->write_meta = write_meta;
@ -766,6 +802,20 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) {
pthread_join(pids[i], NULL);
}
// statistics
total_rows = 0;
for (uint32_t i = 0; i < write_meta->total_threads; ++i) {
CsvThreadArgs* arg = &args[i];
CsvThreadMeta* thread_meta = &arg->thread_meta;
total_rows += thread_meta->total_rows;
}
ts_elapse = toolsGetTimestampMs() - start_ts;
if (ts_elapse > 0) {
succPrint("Spent %.6f seconds to insert rows: %" PRIu64 " with %d thread(s) into %s, at a rate of %.2f rows/s.\n",
ts_elapse / 1000.0, total_rows, write_meta->total_threads, g_arguments->output_path, total_rows * 1000.0 / ts_elapse);
}
end:
tmfree(pids);
tmfree(args);
@ -859,14 +909,14 @@ static int csvParseParameter() {
// csv_output_path
size_t len = strlen(g_arguments->output_path);
if (len == 0) {
errorPrint("Failed to generate CSV files, the specified output path is empty. Please provide a valid path. database: %s, super table: %s.\n",
errorPrint("Failed to generate csv files, the specified output path is empty. Please provide a valid path. database: %s, super table: %s.\n",
db->dbName, stb->stbName);
return -1;
}
if (g_arguments->output_path[len - 1] != '/') {
int n = snprintf(g_arguments->output_path_buf, sizeof(g_arguments->output_path_buf), "%s/", g_arguments->output_path);
if (n < 0 || n >= sizeof(g_arguments->output_path_buf)) {
errorPrint("Failed to generate CSV files, path buffer overflow risk when appending '/'. path: %s, database: %s, super table: %s.\n",
errorPrint("Failed to generate csv files, path buffer overflow risk when appending '/'. path: %s, database: %s, super table: %s.\n",
g_arguments->csv_output_path, db->dbName, stb->stbName);
return -1;
}
@ -876,7 +926,7 @@ static int csvParseParameter() {
// csv_ts_format
if (g_arguments->csv_ts_format) {
if (csvValidateParamTsFormat(g_arguments->csv_ts_format) != 0) {
errorPrint("Failed to generate CSV files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s, database: %s, super table: %s.\n",
errorPrint("Failed to generate csv files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s, database: %s, super table: %s.\n",
g_arguments->csv_ts_format, db->dbName, stb->stbName);
return -1;
}
@ -885,7 +935,7 @@ static int csvParseParameter() {
// csv_ts_interval
long csv_ts_intv_secs = csvValidateParamTsInterval(g_arguments->csv_ts_interval);
if (csv_ts_intv_secs <= 0) {
errorPrint("Failed to generate CSV files, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s, database: %s, super table: %s.\n",
errorPrint("Failed to generate csv files, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s, database: %s, super table: %s.\n",
g_arguments->csv_ts_interval, db->dbName, stb->stbName);
return -1;
}
@ -910,7 +960,7 @@ static int csvWriteThread() {
// gen csv
int ret = csvGenStb(db, stb);
if(ret != 0) {
errorPrint("Failed to generate CSV files. database: %s, super table: %s, error code: %d.\n",
errorPrint("Failed to generate csv files. database: %s, super table: %s, error code: %d.\n",
db->dbName, stb->stbName, ret);
return -1;
}
@ -928,14 +978,14 @@ int csvTestProcess() {
return -1;
}
infoPrint("Starting to output data to CSV files in directory: %s ...\n", g_arguments->output_path);
infoPrint("Starting to output data to csv files in directory: %s ...\n", g_arguments->output_path);
int64_t start = toolsGetTimestampMs();
int ret = csvWriteThread();
if (ret != 0) {
return -1;
}
int64_t delay = toolsGetTimestampMs() - start;
infoPrint("Generating CSV files in directory: %s has been completed. Time elapsed: %.3f seconds\n",
g_arguments->output_path, delay / 1000.0);
int64_t elapse = toolsGetTimestampMs() - start;
infoPrint("Generating csv files in directory: %s has been completed. Time elapsed: %.3f seconds\n",
g_arguments->output_path, elapse / 1000.0);
return 0;
}