From 1b2afe31edeb61051e7a58f510c29feb5d9a3e82 Mon Sep 17 00:00:00 2001 From: Yaming Pei Date: Wed, 5 Mar 2025 18:15:02 +0800 Subject: [PATCH] fix: fix bug in time slice window calculation --- tools/taos-tools/inc/benchCsv.h | 2 + tools/taos-tools/src/benchCsv.c | 89 ++++++++++++++++++++++++++------- 2 files changed, 74 insertions(+), 17 deletions(-) diff --git a/tools/taos-tools/inc/benchCsv.h b/tools/taos-tools/inc/benchCsv.h index e80f73bcda..624bcadedc 100644 --- a/tools/taos-tools/inc/benchCsv.h +++ b/tools/taos-tools/inc/benchCsv.h @@ -76,6 +76,8 @@ typedef struct { uint64_t total_rows; time_t start_secs; time_t end_secs; + int64_t start_ts; + int64_t end_ts; size_t thread_id; bool output_header; int tags_buf_size; diff --git a/tools/taos-tools/src/benchCsv.c b/tools/taos-tools/src/benchCsv.c index dd6ce3360a..b498214468 100644 --- a/tools/taos-tools/src/benchCsv.c +++ b/tools/taos-tools/src/benchCsv.c @@ -136,7 +136,43 @@ static int csvParseStbParameter(SSuperTable* stb) { } -static time_t csvGetStartSeconds(int precision, int64_t start_ts) { +static time_t csvAlignTimestamp(time_t seconds, const char* ts_format) { + struct tm aligned_tm; +#ifdef _WIN32 + localtime_s(&aligned_tm, &seconds); +#else + localtime_r(&seconds, &aligned_tm); +#endif + + int has_Y = 0, has_m = 0, has_d = 0, has_H = 0, has_M = 0, has_S = 0; + const char* p = ts_format; + while (*p) { + if (*p == '%') { + p++; + switch (*p) { + case 'Y': has_Y = 1; break; + case 'm': has_m = 1; break; + case 'd': has_d = 1; break; + case 'H': has_H = 1; break; + case 'M': has_M = 1; break; + case 'S': has_S = 1; break; + } + } + p++; + } + + if (!has_S) aligned_tm.tm_sec = 0; + if (!has_M) aligned_tm.tm_min = 0; + if (!has_H) aligned_tm.tm_hour = 0; + if (!has_d) aligned_tm.tm_mday = 1; + if (!has_m) aligned_tm.tm_mon = 0; + if (!has_Y) aligned_tm.tm_year = 0; + + return mktime(&aligned_tm); +} + + +static time_t csvGetStartSeconds(int precision, int64_t start_ts, const char* csv_ts_format) { time_t start_seconds = 0; if (precision == TSDB_TIME_PRECISION_MICRO) { @@ -146,17 +182,17 @@ static time_t csvGetStartSeconds(int precision, int64_t start_ts) { } else { start_seconds = start_ts / 1000L; } - return start_seconds; + return csvAlignTimestamp(start_seconds, csv_ts_format); } static void csvConvertTime2String(time_t time_value, char* ts_format, char* time_buf, size_t buf_size) { struct tm tm_result; - char *old_locale = setlocale(LC_TIME, "C"); + char* old_locale = setlocale(LC_TIME, "C"); #ifdef _WIN32 - gmtime_s(&tm_result, &time_value); + localtime_s(&tm_result, &time_value); #else - gmtime_r(&time_value, &tm_result); + localtime_r(&time_value, &tm_result); #endif strftime(time_buf, buf_size, ts_format, &tm_result); if (old_locale) { @@ -183,17 +219,29 @@ static CsvNamingType csvGetFileNamingType(SSuperTable* stb) { } -static void csvCalcTimestampStep(CsvWriteMeta* write_meta) { - time_t ts_step = 0; +static time_t csvCalcTimestampFromSeconds(int precision, time_t secs) { + time_t ts = 0; - if (write_meta->db->precision == TSDB_TIME_PRECISION_MICRO) { - ts_step = write_meta->stb->csv_ts_intv_secs * 1000000L; - } else if (write_meta->db->precision == TSDB_TIME_PRECISION_NANO) { - ts_step = write_meta->stb->csv_ts_intv_secs * 1000000000L; + if (precision == TSDB_TIME_PRECISION_MICRO) { + ts = secs * 1000000L; + } else if (precision == TSDB_TIME_PRECISION_NANO) { + ts = secs * 1000000000L; } else { - ts_step = write_meta->stb->csv_ts_intv_secs * 1000L; + ts = secs * 1000L; } - write_meta->ts_step = ts_step; + return ts; +} + + +static void csvCalcTimestampStep(CsvWriteMeta* write_meta) { + write_meta->ts_step = csvCalcTimestampFromSeconds(write_meta->db->precision, write_meta->stb->csv_ts_intv_secs); + return; +} + + +static void csvCalcSliceTimestamp(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) { + thread_meta->start_ts = csvCalcTimestampFromSeconds(write_meta->db->precision, thread_meta->start_secs); + thread_meta->end_ts = csvCalcTimestampFromSeconds(write_meta->db->precision, thread_meta->end_secs); return; } @@ -624,6 +672,8 @@ static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvT thread_meta->ctb_count = 0; thread_meta->start_secs = 0; thread_meta->end_secs = 0; + thread_meta->start_ts = write_meta->start_ts; + thread_meta->end_ts = write_meta->end_ts; thread_meta->thread_id = thread_id; thread_meta->output_header = false; thread_meta->tags_buf_size = 0; @@ -639,8 +689,9 @@ static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvT } case CSV_NAMING_I_TIME_SLICE: case CSV_NAMING_B_THREAD_TIME_SLICE: { - thread_meta->start_secs = csvGetStartSeconds(db->precision, stb->startTimestamp); + thread_meta->start_secs = csvGetStartSeconds(db->precision, stb->startTimestamp, stb->csv_ts_format); thread_meta->end_secs = thread_meta->start_secs + write_meta->stb->csv_ts_intv_secs; + csvCalcSliceTimestamp(write_meta, thread_meta); break; } default: { @@ -654,6 +705,7 @@ static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvT static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta, int64_t last_end_ts) { SDataBase* db = write_meta->db; + SSuperTable* stb = write_meta->stb; switch (write_meta->naming_type) { case CSV_NAMING_I_SINGLE: @@ -662,8 +714,9 @@ static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_ } case CSV_NAMING_I_TIME_SLICE: case CSV_NAMING_B_THREAD_TIME_SLICE: { - thread_meta->start_secs = csvGetStartSeconds(db->precision, last_end_ts); + thread_meta->start_secs = csvGetStartSeconds(db->precision, last_end_ts, stb->csv_ts_format); thread_meta->end_secs = thread_meta->start_secs + write_meta->stb->csv_ts_intv_secs; + csvCalcSliceTimestamp(write_meta, thread_meta); break; } default: { @@ -1063,7 +1116,8 @@ static void* csvGenStbThread(void* arg) { thread_meta->cols_buf = &cols_buf; start_print_ts = toolsGetTimestampMs(); - for (cur_ts = write_meta->start_ts; cur_ts < write_meta->end_ts; cur_ts += write_meta->ts_step) { + cur_ts = write_meta->start_ts; + while (cur_ts < write_meta->end_ts) { // get filename ret = csvGetFileFullname(write_meta, thread_meta, fullname, sizeof(fullname)); if (ret < 0) { @@ -1083,7 +1137,7 @@ static void* csvGenStbThread(void* arg) { thread_meta->output_header = stb->csv_output_header; slice_cur_ts = cur_ts; - slice_end_ts = MIN(cur_ts + write_meta->ts_step, write_meta->end_ts); + slice_end_ts = MIN(thread_meta->end_ts, write_meta->end_ts); file_rows = 0; pre_print_ts = toolsGetTimestampMs(); @@ -1129,6 +1183,7 @@ static void* csvGenStbThread(void* arg) { } csvClose(fhdl); + cur_ts = thread_meta->end_ts; csvUpdateSliceRange(write_meta, thread_meta, slice_end_ts); }