fix: fix bug in time slice window calculation

This commit is contained in:
Yaming Pei 2025-03-05 18:15:02 +08:00
parent 47ded1b71d
commit 1b2afe31ed
2 changed files with 74 additions and 17 deletions

View File

@ -76,6 +76,8 @@ typedef struct {
uint64_t total_rows;
time_t start_secs;
time_t end_secs;
int64_t start_ts;
int64_t end_ts;
size_t thread_id;
bool output_header;
int tags_buf_size;

View File

@ -136,7 +136,43 @@ static int csvParseStbParameter(SSuperTable* stb) {
}
static time_t csvGetStartSeconds(int precision, int64_t start_ts) {
static time_t csvAlignTimestamp(time_t seconds, const char* ts_format) {
struct tm aligned_tm;
#ifdef _WIN32
localtime_s(&aligned_tm, &seconds);
#else
localtime_r(&seconds, &aligned_tm);
#endif
int has_Y = 0, has_m = 0, has_d = 0, has_H = 0, has_M = 0, has_S = 0;
const char* p = ts_format;
while (*p) {
if (*p == '%') {
p++;
switch (*p) {
case 'Y': has_Y = 1; break;
case 'm': has_m = 1; break;
case 'd': has_d = 1; break;
case 'H': has_H = 1; break;
case 'M': has_M = 1; break;
case 'S': has_S = 1; break;
}
}
p++;
}
if (!has_S) aligned_tm.tm_sec = 0;
if (!has_M) aligned_tm.tm_min = 0;
if (!has_H) aligned_tm.tm_hour = 0;
if (!has_d) aligned_tm.tm_mday = 1;
if (!has_m) aligned_tm.tm_mon = 0;
if (!has_Y) aligned_tm.tm_year = 0;
return mktime(&aligned_tm);
}
static time_t csvGetStartSeconds(int precision, int64_t start_ts, const char* csv_ts_format) {
time_t start_seconds = 0;
if (precision == TSDB_TIME_PRECISION_MICRO) {
@ -146,17 +182,17 @@ static time_t csvGetStartSeconds(int precision, int64_t start_ts) {
} else {
start_seconds = start_ts / 1000L;
}
return start_seconds;
return csvAlignTimestamp(start_seconds, csv_ts_format);
}
static void csvConvertTime2String(time_t time_value, char* ts_format, char* time_buf, size_t buf_size) {
struct tm tm_result;
char *old_locale = setlocale(LC_TIME, "C");
char* old_locale = setlocale(LC_TIME, "C");
#ifdef _WIN32
gmtime_s(&tm_result, &time_value);
localtime_s(&tm_result, &time_value);
#else
gmtime_r(&time_value, &tm_result);
localtime_r(&time_value, &tm_result);
#endif
strftime(time_buf, buf_size, ts_format, &tm_result);
if (old_locale) {
@ -183,17 +219,29 @@ static CsvNamingType csvGetFileNamingType(SSuperTable* stb) {
}
static void csvCalcTimestampStep(CsvWriteMeta* write_meta) {
time_t ts_step = 0;
static time_t csvCalcTimestampFromSeconds(int precision, time_t secs) {
time_t ts = 0;
if (write_meta->db->precision == TSDB_TIME_PRECISION_MICRO) {
ts_step = write_meta->stb->csv_ts_intv_secs * 1000000L;
} else if (write_meta->db->precision == TSDB_TIME_PRECISION_NANO) {
ts_step = write_meta->stb->csv_ts_intv_secs * 1000000000L;
if (precision == TSDB_TIME_PRECISION_MICRO) {
ts = secs * 1000000L;
} else if (precision == TSDB_TIME_PRECISION_NANO) {
ts = secs * 1000000000L;
} else {
ts_step = write_meta->stb->csv_ts_intv_secs * 1000L;
ts = secs * 1000L;
}
write_meta->ts_step = ts_step;
return ts;
}
static void csvCalcTimestampStep(CsvWriteMeta* write_meta) {
write_meta->ts_step = csvCalcTimestampFromSeconds(write_meta->db->precision, write_meta->stb->csv_ts_intv_secs);
return;
}
static void csvCalcSliceTimestamp(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) {
thread_meta->start_ts = csvCalcTimestampFromSeconds(write_meta->db->precision, thread_meta->start_secs);
thread_meta->end_ts = csvCalcTimestampFromSeconds(write_meta->db->precision, thread_meta->end_secs);
return;
}
@ -624,6 +672,8 @@ static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvT
thread_meta->ctb_count = 0;
thread_meta->start_secs = 0;
thread_meta->end_secs = 0;
thread_meta->start_ts = write_meta->start_ts;
thread_meta->end_ts = write_meta->end_ts;
thread_meta->thread_id = thread_id;
thread_meta->output_header = false;
thread_meta->tags_buf_size = 0;
@ -639,8 +689,9 @@ static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvT
}
case CSV_NAMING_I_TIME_SLICE:
case CSV_NAMING_B_THREAD_TIME_SLICE: {
thread_meta->start_secs = csvGetStartSeconds(db->precision, stb->startTimestamp);
thread_meta->start_secs = csvGetStartSeconds(db->precision, stb->startTimestamp, stb->csv_ts_format);
thread_meta->end_secs = thread_meta->start_secs + write_meta->stb->csv_ts_intv_secs;
csvCalcSliceTimestamp(write_meta, thread_meta);
break;
}
default: {
@ -654,6 +705,7 @@ static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvT
static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta, int64_t last_end_ts) {
SDataBase* db = write_meta->db;
SSuperTable* stb = write_meta->stb;
switch (write_meta->naming_type) {
case CSV_NAMING_I_SINGLE:
@ -662,8 +714,9 @@ static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_
}
case CSV_NAMING_I_TIME_SLICE:
case CSV_NAMING_B_THREAD_TIME_SLICE: {
thread_meta->start_secs = csvGetStartSeconds(db->precision, last_end_ts);
thread_meta->start_secs = csvGetStartSeconds(db->precision, last_end_ts, stb->csv_ts_format);
thread_meta->end_secs = thread_meta->start_secs + write_meta->stb->csv_ts_intv_secs;
csvCalcSliceTimestamp(write_meta, thread_meta);
break;
}
default: {
@ -1063,7 +1116,8 @@ static void* csvGenStbThread(void* arg) {
thread_meta->cols_buf = &cols_buf;
start_print_ts = toolsGetTimestampMs();
for (cur_ts = write_meta->start_ts; cur_ts < write_meta->end_ts; cur_ts += write_meta->ts_step) {
cur_ts = write_meta->start_ts;
while (cur_ts < write_meta->end_ts) {
// get filename
ret = csvGetFileFullname(write_meta, thread_meta, fullname, sizeof(fullname));
if (ret < 0) {
@ -1083,7 +1137,7 @@ static void* csvGenStbThread(void* arg) {
thread_meta->output_header = stb->csv_output_header;
slice_cur_ts = cur_ts;
slice_end_ts = MIN(cur_ts + write_meta->ts_step, write_meta->end_ts);
slice_end_ts = MIN(thread_meta->end_ts, write_meta->end_ts);
file_rows = 0;
pre_print_ts = toolsGetTimestampMs();
@ -1129,6 +1183,7 @@ static void* csvGenStbThread(void* arg) {
}
csvClose(fhdl);
cur_ts = thread_meta->end_ts;
csvUpdateSliceRange(write_meta, thread_meta, slice_end_ts);
}