feat: Supports csv parameters at the super table level

This commit is contained in:
Yaming Pei 2025-03-05 10:03:46 +08:00
parent 0c55e8a0ad
commit 47ded1b71d
3 changed files with 214 additions and 201 deletions

View File

@ -479,6 +479,13 @@ typedef struct SChildTable_S {
int32_t pkCnt;
} SChildTable;
typedef enum {
CSV_COMPRESS_NONE = 0,
CSV_COMPRESS_FAST = 1,
CSV_COMPRESS_BALANCE = 6,
CSV_COMPRESS_BEST = 9
} CsvCompressionLevel;
#define PRIMARY_KEY "PRIMARY KEY"
typedef struct SSuperTable_S {
char *stbName;
@ -581,6 +588,15 @@ typedef struct SSuperTable_S {
// execute sqls after create super table
char **sqls;
char* csv_file_prefix;
char* csv_ts_format;
char* csv_ts_interval;
char* csv_tbname_alias;
long csv_ts_intv_secs;
bool csv_output_header;
CsvCompressionLevel csv_compress_level;
} SSuperTable;
typedef struct SDbCfg_S {
@ -719,14 +735,6 @@ typedef struct STmqMetaInfo_S {
uint16_t iface;
} STmqMetaInfo;
typedef enum {
CSV_COMPRESS_NONE = 0,
CSV_COMPRESS_FAST = 1,
CSV_COMPRESS_BALANCE = 6,
CSV_COMPRESS_BEST = 9
} CsvCompressionLevel;
typedef struct SArguments_S {
uint8_t taosc_version;
char * metaFile;
@ -791,14 +799,6 @@ typedef struct SArguments_S {
char* output_path;
char output_path_buf[MAX_PATH_LEN];
char* csv_file_prefix;
char* csv_ts_format;
char* csv_ts_interval;
char* csv_tbname_alias;
long csv_ts_intv_secs;
bool csv_output_header;
CsvCompressionLevel csv_compress_level;
} SArguments;

View File

@ -31,6 +31,111 @@
static int csvValidateParamTsFormat(const char* csv_ts_format) {
if (!csv_ts_format) return 0;
struct tm test_tm = {
.tm_year = 70,
.tm_mon = 0,
.tm_mday = 1,
.tm_hour = 0,
.tm_min = 0,
.tm_sec = 0,
.tm_isdst = -1
};
mktime(&test_tm);
char buffer[1024];
size_t len = strftime(buffer, sizeof(buffer), csv_ts_format, &test_tm);
if (len == 0) {
return -1;
}
#ifdef _WIN32
const char* invalid_chars = "/\\:*?\"<>|";
#else
const char* invalid_chars = "/\\?\"<>|";
#endif
if (strpbrk(buffer, invalid_chars) != NULL) {
return -1;
}
return 0;
}
static long csvValidateParamTsInterval(const char* csv_ts_interval) {
if (!csv_ts_interval || *csv_ts_interval == '\0') return -1;
char* endptr;
errno = 0;
const long num = strtol(csv_ts_interval, &endptr, 10);
if (errno == ERANGE ||
endptr == csv_ts_interval ||
num <= 0) {
return -1;
}
if (*endptr == '\0' ||
*(endptr + 1) != '\0') {
return -1;
}
switch (tolower(*endptr)) {
case 's': return num;
case 'm': return num * 60;
case 'h': return num * 60 * 60;
case 'd': return num * 60 * 60 * 24;
default : return -1;
}
}
static int csvParseParameter() {
// csv_output_path
size_t len = strlen(g_arguments->output_path);
if (len == 0) {
errorPrint("Failed to generate csv files, the specified output path is empty. Please provide a valid path.\n");
return -1;
}
if (g_arguments->output_path[len - 1] != '/') {
int n = snprintf(g_arguments->output_path_buf, sizeof(g_arguments->output_path_buf), "%s/", g_arguments->output_path);
if (n < 0 || n >= sizeof(g_arguments->output_path_buf)) {
errorPrint("Failed to generate csv files, path buffer overflow risk when appending '/'. path: %s.\n",
g_arguments->output_path);
return -1;
}
g_arguments->output_path = g_arguments->output_path_buf;
}
return 0;
}
static int csvParseStbParameter(SSuperTable* stb) {
// csv_ts_format
if (stb->csv_ts_format) {
if (csvValidateParamTsFormat(stb->csv_ts_format) != 0) {
errorPrint("Failed to generate csv files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s.\n",
stb->csv_ts_format);
return -1;
}
}
// csv_ts_interval
long csv_ts_intv_secs = csvValidateParamTsInterval(stb->csv_ts_interval);
if (csv_ts_intv_secs <= 0) {
errorPrint("Failed to generate csv files, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s.\n",
stb->csv_ts_interval);
return -1;
}
stb->csv_ts_intv_secs = csv_ts_intv_secs;
return 0;
}
static time_t csvGetStartSeconds(int precision, int64_t start_ts) {
time_t start_seconds = 0;
@ -45,7 +150,7 @@ static time_t csvGetStartSeconds(int precision, int64_t start_ts) {
}
static void csvConvertTime2String(time_t time_value, char* time_buf, size_t buf_size) {
static void csvConvertTime2String(time_t time_value, char* ts_format, char* time_buf, size_t buf_size) {
struct tm tm_result;
char *old_locale = setlocale(LC_TIME, "C");
#ifdef _WIN32
@ -53,7 +158,7 @@ static void csvConvertTime2String(time_t time_value, char* time_buf, size_t buf_
#else
gmtime_r(&time_value, &tm_result);
#endif
strftime(time_buf, buf_size, g_arguments->csv_ts_format, &tm_result);
strftime(time_buf, buf_size, ts_format, &tm_result);
if (old_locale) {
setlocale(LC_TIME, old_locale);
}
@ -63,13 +168,13 @@ static void csvConvertTime2String(time_t time_value, char* time_buf, size_t buf_
static CsvNamingType csvGetFileNamingType(SSuperTable* stb) {
if (stb->interlaceRows > 0) {
if (g_arguments->csv_ts_format) {
if (stb->csv_ts_format) {
return CSV_NAMING_I_TIME_SLICE;
} else {
return CSV_NAMING_I_SINGLE;
}
} else {
if (g_arguments->csv_ts_format) {
if (stb->csv_ts_format) {
return CSV_NAMING_B_THREAD_TIME_SLICE;
} else {
return CSV_NAMING_B_THREAD;
@ -82,11 +187,11 @@ static void csvCalcTimestampStep(CsvWriteMeta* write_meta) {
time_t ts_step = 0;
if (write_meta->db->precision == TSDB_TIME_PRECISION_MICRO) {
ts_step = g_arguments->csv_ts_intv_secs * 1000000L;
ts_step = write_meta->stb->csv_ts_intv_secs * 1000000L;
} else if (write_meta->db->precision == TSDB_TIME_PRECISION_NANO) {
ts_step = g_arguments->csv_ts_intv_secs * 1000000000L;
ts_step = write_meta->stb->csv_ts_intv_secs * 1000000000L;
} else {
ts_step = g_arguments->csv_ts_intv_secs * 1000L;
ts_step = write_meta->stb->csv_ts_intv_secs * 1000L;
}
write_meta->ts_step = ts_step;
return;
@ -145,7 +250,7 @@ static int csvGenCsvHeader(CsvWriteMeta* write_meta) {
int pos = 0;
int size = sizeof(write_meta->csv_header);
if (!g_arguments->csv_output_header) {
if (!write_meta->stb->csv_output_header) {
return 0;
}
@ -159,7 +264,7 @@ static int csvGenCsvHeader(CsvWriteMeta* write_meta) {
}
// tbname
pos += snprintf(buf + pos, size - pos, ",%s", g_arguments->csv_tbname_alias);
pos += snprintf(buf + pos, size - pos, ",%s", write_meta->stb->csv_tbname_alias);
// tags
for (size_t i = 0; i < stb->tags->size; ++i) {
@ -479,23 +584,23 @@ static int csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write
switch (write_meta->naming_type) {
case CSV_NAMING_I_SINGLE: {
(void)snprintf(write_meta->mode, sizeof(write_meta->mode), "interlace|no-time-slice");
(void)snprintf(write_meta->mode, sizeof(write_meta->mode), "interlace::normal");
break;
}
case CSV_NAMING_I_TIME_SLICE: {
(void)snprintf(write_meta->mode, sizeof(write_meta->mode), "interlace|time-slice");
(void)snprintf(write_meta->mode, sizeof(write_meta->mode), "interlace::time-slice");
csvCalcTimestampStep(write_meta);
break;
}
case CSV_NAMING_B_THREAD: {
write_meta->total_threads = MIN(g_arguments->nthreads, stb->childTblCount);
(void)snprintf(write_meta->mode, sizeof(write_meta->mode), "batch[%zu]|no-time-slice", write_meta->total_threads);
(void)snprintf(write_meta->mode, sizeof(write_meta->mode), "batch[%zu]::normal", write_meta->total_threads);
csvGenThreadFormatter(write_meta);
break;
}
case CSV_NAMING_B_THREAD_TIME_SLICE: {
write_meta->total_threads = MIN(g_arguments->nthreads, stb->childTblCount);
(void)snprintf(write_meta->mode, sizeof(write_meta->mode), "batch[%zu]|time-slice", write_meta->total_threads);
(void)snprintf(write_meta->mode, sizeof(write_meta->mode), "batch[%zu]::time-slice", write_meta->total_threads);
csvGenThreadFormatter(write_meta);
csvCalcTimestampStep(write_meta);
break;
@ -535,7 +640,7 @@ static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvT
case CSV_NAMING_I_TIME_SLICE:
case CSV_NAMING_B_THREAD_TIME_SLICE: {
thread_meta->start_secs = csvGetStartSeconds(db->precision, stb->startTimestamp);
thread_meta->end_secs = thread_meta->start_secs + g_arguments->csv_ts_intv_secs;
thread_meta->end_secs = thread_meta->start_secs + write_meta->stb->csv_ts_intv_secs;
break;
}
default: {
@ -558,7 +663,7 @@ static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_
case CSV_NAMING_I_TIME_SLICE:
case CSV_NAMING_B_THREAD_TIME_SLICE: {
thread_meta->start_secs = csvGetStartSeconds(db->precision, last_end_ts);
thread_meta->end_secs = thread_meta->start_secs + g_arguments->csv_ts_intv_secs;
thread_meta->end_secs = thread_meta->start_secs + write_meta->stb->csv_ts_intv_secs;
break;
}
default: {
@ -570,8 +675,8 @@ static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_
}
static const char* csvGetGzipFilePrefix() {
if (g_arguments->csv_compress_level == CSV_COMPRESS_NONE) {
static const char* csvGetGzipFilePrefix(CsvCompressionLevel csv_compress_level) {
if (csv_compress_level == CSV_COMPRESS_NONE) {
return "";
} else {
return ".gz";
@ -585,8 +690,8 @@ static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_me
char end_time_buf[MIDDLE_BUFF_LEN];
int ret = -1;
const char* base_path = g_arguments->output_path;
const char* file_prefix = g_arguments->csv_file_prefix;
const char* gzip_suffix = csvGetGzipFilePrefix();
const char* file_prefix = write_meta->stb->csv_file_prefix;
const char* gzip_suffix = csvGetGzipFilePrefix(write_meta->stb->csv_compress_level);
switch (write_meta->naming_type) {
case CSV_NAMING_I_SINGLE: {
@ -594,8 +699,8 @@ static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_me
break;
}
case CSV_NAMING_I_TIME_SLICE: {
csvConvertTime2String(thread_meta->start_secs, start_time_buf, sizeof(start_time_buf));
csvConvertTime2String(thread_meta->end_secs, end_time_buf, sizeof(end_time_buf));
csvConvertTime2String(thread_meta->start_secs, write_meta->stb->csv_ts_format, start_time_buf, sizeof(start_time_buf));
csvConvertTime2String(thread_meta->end_secs, write_meta->stb->csv_ts_format, end_time_buf, sizeof(end_time_buf));
ret = snprintf(fullname, size, "%s%s_%s_%s.csv%s", base_path, file_prefix, start_time_buf, end_time_buf, gzip_suffix);
break;
}
@ -606,8 +711,8 @@ static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_me
}
case CSV_NAMING_B_THREAD_TIME_SLICE: {
(void)snprintf(thread_buf, sizeof(thread_buf), write_meta->thread_formatter, thread_meta->thread_id);
csvConvertTime2String(thread_meta->start_secs, start_time_buf, sizeof(start_time_buf));
csvConvertTime2String(thread_meta->end_secs, end_time_buf, sizeof(end_time_buf));
csvConvertTime2String(thread_meta->start_secs, write_meta->stb->csv_ts_format, start_time_buf, sizeof(start_time_buf));
csvConvertTime2String(thread_meta->end_secs, write_meta->stb->csv_ts_format, end_time_buf, sizeof(end_time_buf));
ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv%s", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf, gzip_suffix);
break;
}
@ -968,7 +1073,7 @@ static void* csvGenStbThread(void* arg) {
}
// create fd
fhdl = csvOpen(fullname, g_arguments->csv_compress_level);
fhdl = csvOpen(fullname, stb->csv_compress_level);
if (fhdl == NULL) {
errorPrint("Failed to create csv file. thread index: %zu, file: %s, errno: %d, strerror: %s.\n",
thread_meta->thread_id, fullname, errno, strerror(errno));
@ -976,7 +1081,7 @@ static void* csvGenStbThread(void* arg) {
}
thread_meta->output_header = g_arguments->csv_output_header;
thread_meta->output_header = stb->csv_output_header;
slice_cur_ts = cur_ts;
slice_end_ts = MIN(cur_ts + write_meta->ts_step, write_meta->end_ts);
file_rows = 0;
@ -1148,106 +1253,6 @@ static int csvGenStb(SDataBase* db, SSuperTable* stb) {
}
static int csvValidateParamTsFormat(const char* csv_ts_format) {
if (!csv_ts_format) return 0;
struct tm test_tm = {
.tm_year = 70,
.tm_mon = 0,
.tm_mday = 1,
.tm_hour = 0,
.tm_min = 0,
.tm_sec = 0,
.tm_isdst = -1
};
mktime(&test_tm);
char buffer[1024];
size_t len = strftime(buffer, sizeof(buffer), csv_ts_format, &test_tm);
if (len == 0) {
return -1;
}
#ifdef _WIN32
const char* invalid_chars = "/\\:*?\"<>|";
#else
const char* invalid_chars = "/\\?\"<>|";
#endif
if (strpbrk(buffer, invalid_chars) != NULL) {
return -1;
}
return 0;
}
static long csvValidateParamTsInterval(const char* csv_ts_interval) {
if (!csv_ts_interval || *csv_ts_interval == '\0') return -1;
char* endptr;
errno = 0;
const long num = strtol(csv_ts_interval, &endptr, 10);
if (errno == ERANGE ||
endptr == csv_ts_interval ||
num <= 0) {
return -1;
}
if (*endptr == '\0' ||
*(endptr + 1) != '\0') {
return -1;
}
switch (tolower(*endptr)) {
case 's': return num;
case 'm': return num * 60;
case 'h': return num * 60 * 60;
case 'd': return num * 60 * 60 * 24;
default : return -1;
}
}
static int csvParseParameter() {
// csv_output_path
size_t len = strlen(g_arguments->output_path);
if (len == 0) {
errorPrint("Failed to generate csv files, the specified output path is empty. Please provide a valid path.\n");
return -1;
}
if (g_arguments->output_path[len - 1] != '/') {
int n = snprintf(g_arguments->output_path_buf, sizeof(g_arguments->output_path_buf), "%s/", g_arguments->output_path);
if (n < 0 || n >= sizeof(g_arguments->output_path_buf)) {
errorPrint("Failed to generate csv files, path buffer overflow risk when appending '/'. path: %s.\n",
g_arguments->output_path);
return -1;
}
g_arguments->output_path = g_arguments->output_path_buf;
}
// csv_ts_format
if (g_arguments->csv_ts_format) {
if (csvValidateParamTsFormat(g_arguments->csv_ts_format) != 0) {
errorPrint("Failed to generate csv files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s.\n",
g_arguments->csv_ts_format);
return -1;
}
}
// csv_ts_interval
long csv_ts_intv_secs = csvValidateParamTsInterval(g_arguments->csv_ts_interval);
if (csv_ts_intv_secs <= 0) {
errorPrint("Failed to generate csv files, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s.\n",
g_arguments->csv_ts_interval);
return -1;
}
g_arguments->csv_ts_intv_secs = csv_ts_intv_secs;
return 0;
}
static int csvWriteThread() {
for (size_t i = 0; i < g_arguments->databases->size && !g_arguments->terminate; ++i) {
// database
@ -1260,8 +1265,16 @@ static int csvWriteThread() {
continue;
}
// parsing parameters
int ret = csvParseStbParameter(stb);
if (ret != 0) {
errorPrint("Failed to parse csv parameter. database: %s, super table: %s, error code: %d.\n",
db->dbName, stb->stbName, ret);
return -1;
}
// gen csv
int ret = csvGenStb(db, stb);
ret = csvGenStb(db, stb);
if(ret != 0) {
errorPrint("Failed to generate csv files. database: %s, super table: %s, error code: %d.\n",
db->dbName, stb->stbName, ret);

View File

@ -1405,6 +1405,65 @@ static int getStableInfo(tools_cJSON *dbinfos, int index) {
}
}
}
// csv file prefix
tools_cJSON* csv_fp = tools_cJSON_GetObjectItem(stbInfo, "csv_file_prefix");
if (csv_fp && csv_fp->type == tools_cJSON_String && csv_fp->valuestring != NULL) {
superTable->csv_file_prefix = csv_fp->valuestring;
} else {
superTable->csv_file_prefix = "data";
}
// csv timestamp format
tools_cJSON* csv_tf = tools_cJSON_GetObjectItem(stbInfo, "csv_ts_format");
if (csv_tf && csv_tf->type == tools_cJSON_String && csv_tf->valuestring != NULL) {
superTable->csv_ts_format = csv_tf->valuestring;
} else {
superTable->csv_ts_format = NULL;
}
// csv timestamp format
tools_cJSON* csv_ti = tools_cJSON_GetObjectItem(stbInfo, "csv_ts_interval");
if (csv_ti && csv_ti->type == tools_cJSON_String && csv_ti->valuestring != NULL) {
superTable->csv_ts_interval = csv_ti->valuestring;
} else {
superTable->csv_ts_interval = "1d";
}
// csv output header
superTable->csv_output_header = true;
tools_cJSON* oph = tools_cJSON_GetObjectItem(stbInfo, "csv_output_header");
if (oph && oph->type == tools_cJSON_String && oph->valuestring != NULL) {
if (0 == strcasecmp(oph->valuestring, "yes") || 0 == strcasecmp(oph->valuestring, "true")) {
superTable->csv_output_header = true;
} else if (0 == strcasecmp(oph->valuestring, "no") || 0 == strcasecmp(oph->valuestring, "false")) {
superTable->csv_output_header = false;
}
}
// csv tbname alias
tools_cJSON* tba = tools_cJSON_GetObjectItem(stbInfo, "csv_tbname_alias");
if (tba && tba->type == tools_cJSON_String && tba->valuestring != NULL) {
superTable->csv_tbname_alias = tba->valuestring;
} else {
superTable->csv_tbname_alias = "device_id";
}
// csv compression level
tools_cJSON* cl = tools_cJSON_GetObjectItem(stbInfo, "csv_compress_level");
if (cl && cl->type == tools_cJSON_String && cl->valuestring != NULL) {
if (0 == strcasecmp(cl->valuestring, "none")) {
superTable->csv_compress_level = CSV_COMPRESS_NONE;
} else if (0 == strcasecmp(cl->valuestring, "fast")) {
superTable->csv_compress_level = CSV_COMPRESS_FAST;
} else if (0 == strcasecmp(cl->valuestring, "balance")) {
superTable->csv_compress_level = CSV_COMPRESS_BALANCE;
} else if (0 == strcasecmp(cl->valuestring, "best")) {
superTable->csv_compress_level = CSV_COMPRESS_BEST;
}
} else {
superTable->csv_compress_level = CSV_COMPRESS_NONE;
}
}
return 0;
}
@ -1595,65 +1654,6 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) {
}
(void)mkdir(g_arguments->output_path, 0775);
// csv file prefix
tools_cJSON* csv_fp = tools_cJSON_GetObjectItem(json, "csv_file_prefix");
if (csv_fp && csv_fp->type == tools_cJSON_String && csv_fp->valuestring != NULL) {
g_arguments->csv_file_prefix = csv_fp->valuestring;
} else {
g_arguments->csv_file_prefix = "data";
}
// csv timestamp format
tools_cJSON* csv_tf = tools_cJSON_GetObjectItem(json, "csv_ts_format");
if (csv_tf && csv_tf->type == tools_cJSON_String && csv_tf->valuestring != NULL) {
g_arguments->csv_ts_format = csv_tf->valuestring;
} else {
g_arguments->csv_ts_format = NULL;
}
// csv timestamp format
tools_cJSON* csv_ti = tools_cJSON_GetObjectItem(json, "csv_ts_interval");
if (csv_ti && csv_ti->type == tools_cJSON_String && csv_ti->valuestring != NULL) {
g_arguments->csv_ts_interval = csv_ti->valuestring;
} else {
g_arguments->csv_ts_interval = "1d";
}
// csv output header
g_arguments->csv_output_header = true;
tools_cJSON* oph = tools_cJSON_GetObjectItem(json, "csv_output_header");
if (oph && oph->type == tools_cJSON_String && oph->valuestring != NULL) {
if (0 == strcasecmp(oph->valuestring, "yes") || 0 == strcasecmp(oph->valuestring, "true")) {
g_arguments->csv_output_header = true;
} else if (0 == strcasecmp(oph->valuestring, "no") || 0 == strcasecmp(oph->valuestring, "false")) {
g_arguments->csv_output_header = false;
}
}
// csv tbname alias
tools_cJSON* tba = tools_cJSON_GetObjectItem(json, "csv_tbname_alias");
if (tba && tba->type == tools_cJSON_String && tba->valuestring != NULL) {
g_arguments->csv_tbname_alias = tba->valuestring;
} else {
g_arguments->csv_tbname_alias = "device_id";
}
// csv compression level
tools_cJSON* cl = tools_cJSON_GetObjectItem(json, "csv_compress_level");
if (cl && cl->type == tools_cJSON_String && cl->valuestring != NULL) {
if (0 == strcasecmp(cl->valuestring, "none")) {
g_arguments->csv_compress_level = CSV_COMPRESS_NONE;
} else if (0 == strcasecmp(cl->valuestring, "fast")) {
g_arguments->csv_compress_level = CSV_COMPRESS_FAST;
} else if (0 == strcasecmp(cl->valuestring, "balance")) {
g_arguments->csv_compress_level = CSV_COMPRESS_BALANCE;
} else if (0 == strcasecmp(cl->valuestring, "best")) {
g_arguments->csv_compress_level = CSV_COMPRESS_BEST;
}
} else {
g_arguments->csv_compress_level = CSV_COMPRESS_NONE;
}
code = 0;
return code;
}