enh: csv-related parameters validity check

This commit is contained in:
Yaming Pei 2025-02-21 17:04:41 +08:00
parent 4fe7ed9a75
commit b93428432c
4 changed files with 205 additions and 77 deletions

View File

@ -781,10 +781,12 @@ typedef struct SArguments_S {
bool escape_character; bool escape_character;
bool pre_load_tb_meta; bool pre_load_tb_meta;
char* csv_output_dir; char* csv_output_path;
char csv_output_path_buf[MAX_PATH_LEN];
char* csv_file_prefix; char* csv_file_prefix;
char* csv_ts_format; char* csv_ts_format;
char* csv_ts_interval; char* csv_ts_interval;
long csv_ts_intv_secs;
bool bind_vgroup; bool bind_vgroup;
} SArguments; } SArguments;

View File

@ -10,6 +10,11 @@
* FITNESS FOR A PARTICULAR PURPOSE. * FITNESS FOR A PARTICULAR PURPOSE.
*/ */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <bench.h> #include <bench.h>
#include "benchLog.h" #include "benchLog.h"
#include <benchDataMix.h> #include <benchDataMix.h>
@ -22,73 +27,6 @@
#define SHOW_CNT 100000 #define SHOW_CNT 100000
static void *csvWriteThread(void *param) {
// write thread
for (int i = 0; i < g_arguments->databases->size; i++) {
// database
SDataBase * db = benchArrayGet(g_arguments->databases, i);
for (int j=0; j < db->superTbls->size; j++) {
// stb
SSuperTable* stb = benchArrayGet(db->superTbls, j);
// gen csv
int ret = genWithSTable(db, stb, g_arguments->csvPath);
if(ret != 0) {
errorPrint("failed generate to csv. db=%s stb=%s error code=%d \n", db->dbName, stb->stbName, ret);
return NULL;
}
}
}
return NULL;
}
int csvTestProcess() {
pthread_t handle;
int ret = pthread_create(&handle, NULL, csvWriteThread, NULL);
if (ret != 0) {
errorPrint("pthread_create failed. error code =%d \n", ret);
return -1;
}
infoPrint("start output to csv %s ...\n", g_arguments->csvPath);
int64_t start = toolsGetTimestampMs();
pthread_join(handle, NULL);
int64_t delay = toolsGetTimestampMs() - start;
infoPrint("output to csv %s finished. delay:%"PRId64"s \n", g_arguments->csvPath, delay/1000);
return 0;
}
int genWithSTable(SDataBase* db, SSuperTable* stb, char* outDir) {
// filename
int ret = 0;
char outFile[MAX_FILE_NAME_LEN] = {0};
obtainCsvFile(outFile, db, stb, outDir);
FILE * fs = fopen(outFile, "w");
if(fs == NULL) {
errorPrint("failed create csv file. file=%s, last errno=%d strerror=%s \n", outFile, errno, strerror(errno));
return -1;
}
int rowLen = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->lenOfCols + stb->tags->size + stb->cols->size;
int bufLen = rowLen * g_arguments->reqPerReq;
char* buf = benchCalloc(1, bufLen, true);
infoPrint("start write csv file: %s \n", outFile);
if (stb->interlaceRows > 0) {
// interlace mode
ret = interlaceWriteCsv(db, stb, fs, buf, bufLen, rowLen * 2);
} else {
// batch mode
ret = batchWriteCsv(db, stb, fs, buf, bufLen, rowLen * 2);
}
tmfree(buf);
fclose(fs);
succPrint("end write csv file: %s \n", outFile);
return ret;
}
void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir) { void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir) {
@ -125,7 +63,7 @@ int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufL
for(int64_t j = 0; j < stb->insertRows; j++) { for(int64_t j = 0; j < stb->insertRows; j++) {
genColumnData(colData, stb, ts, db->precision, &ck); genColumnData(colData, stb, ts, db->precision, &ck);
// combine // combine
pos += sprintf(buf + pos, "%s,%s\n", tagData, colData); pos += sprintf(buf + pos, "%s,%s.\n", tagData, colData);
if (bufLen - pos < minRemain) { if (bufLen - pos < minRemain) {
// submit // submit
ret = writeCsvFile(fs, buf, pos); ret = writeCsvFile(fs, buf, pos);
@ -197,7 +135,7 @@ int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int
for (int64_t j = 0; j < needInserts; j++) { for (int64_t j = 0; j < needInserts; j++) {
genColumnData(colData, stb, ts, db->precision, &ck); genColumnData(colData, stb, ts, db->precision, &ck);
// combine tags,cols // combine tags,cols
pos += sprintf(buf + pos, "%s,%s\n", tagDatas[i], colData); pos += sprintf(buf + pos, "%s,%s.\n", tagDatas[i], colData);
if (bufLen - pos < minRemain) { if (bufLen - pos < minRemain) {
// submit // submit
ret = writeCsvFile(fs, buf, pos); ret = writeCsvFile(fs, buf, pos);
@ -300,3 +238,191 @@ int32_t genRowByField(char* buf, BArray* fields, int16_t fieldCnt, char* binanry
return pos1; return pos1;
} }
int genWithSTable(SDataBase* db, SSuperTable* stb) {
int ret = 0;
char outFile[MAX_FILE_NAME_LEN] = {0};
obtainCsvFile(outFile, db, stb, outDir);
FILE * fs = fopen(outFile, "w");
if(fs == NULL) {
errorPrint("failed create csv file. file=%s, last errno=%d strerror=%s \n", outFile, errno, strerror(errno));
return -1;
}
int rowLen = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->lenOfCols + stb->tags->size + stb->cols->size;
int bufLen = rowLen * g_arguments->reqPerReq;
char* buf = benchCalloc(1, bufLen, true);
infoPrint("start write csv file: %s \n", outFile);
if (stb->interlaceRows > 0) {
// interlace mode
ret = interlaceWriteCsv(db, stb, fs, buf, bufLen, rowLen * 2);
} else {
// batch mode
ret = batchWriteCsv(db, stb, fs, buf, bufLen, rowLen * 2);
}
tmfree(buf);
fclose(fs);
succPrint("end write csv file: %s \n", outFile);
return ret;
}
static int is_valid_csv_ts_format(const char* csv_ts_format) {
if (!csv_ts_format) return 0;
struct tm test_tm = {
.tm_year = 70,
.tm_mon = 0,
.tm_mday = 1,
.tm_hour = 0,
.tm_min = 0,
.tm_sec = 0,
.tm_isdst = -1
};
mktime(&test_tm);
char buffer[1024];
size_t len = strftime(buffer, sizeof(buffer), csv_ts_format, &test_tm);
if (len == 0) {
return -1;
}
const char* invalid_chars = "/\\:*?\"<>|";
if (strpbrk(buffer, invalid_chars) != NULL) {
return -1;
}
return 0;
}
static long validate_csv_ts_interval(const char* csv_ts_interval) {
if (!csv_ts_interval || *csv_ts_interval == '\0') return -1;
char* endptr;
errno = 0;
const long num = strtol(csv_ts_interval, &endptr, 10);
if (errno == ERANGE ||
endptr == csv_ts_interval ||
num <= 0) {
return -1;
}
if (*endptr == '\0' ||
*(endptr + 1) != '\0') {
return -1;
}
switch (tolower(*endptr)) {
case 's': return num;
case 'm': return num * 60;
case 'h': return num * 60 * 60;
case 'd': return num * 60 * 60 * 24;
default : return -1;
}
}
static int csvParseParameter() {
// csv_output_path
{
size_t len = strlen(g_arguments->csv_output_path);
if (len == 0) {
errorPrint("Failed to generate CSV, the specified output path is empty. Please provide a valid path. database: %s, super table: %s.\n",
db->dbName, stb->stbName);
return -1;
}
if (g_arguments->csv_output_path[len - 1] != '/') {
int n = snprintf(g_arguments->csv_output_path_buf, sizeof(g_arguments->csv_output_path_buf), "%s/", g_arguments->csv_output_path);
if (n < 0 || n >= sizeof(g_arguments->csv_output_path_buf)) {
errorPrint("Failed to generate CSV, path buffer overflow risk when appending '/'. path: %s, database: %s, super table: %s.\n",
g_arguments->csv_output_path db->dbName, stb->stbName);
return -1;
}
g_arguments->csv_output_path = g_arguments->csv_output_path_buf;
}
}
// csv_ts_format
{
if (g_arguments->csv_ts_format) {
if (is_valid_csv_ts_format(g_arguments->csv_ts_format) != 0) {
errorPrint("Failed to generate CSV, the parameter `csv_ts_format` is invalid. csv_ts_format: %s, database: %s, super table: %s.\n",
g_arguments->csv_ts_format, db->dbName, stb->stbName);
return -1;
}
}
}
// csv_ts_interval
{
long csv_ts_intv_secs = validate_csv_ts_interval(g_arguments->csv_ts_interval);
if (csv_ts_intv_secs <= 0) {
errorPrint("Failed to generate CSV, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s, database: %s, super table: %s.\n",
g_arguments->csv_ts_interval, db->dbName, stb->stbName);
return -1;
}
g_arguments->csv_ts_intv_secs = csv_ts_intv_secs;
}
return 0;
}
static void csvWriteThread() {
for (size_t i = 0; i < g_arguments->databases->size; ++i) {
// database
SDataBase* db = benchArrayGet(g_arguments->databases, i);
if (database->superTbls) {
for (size_t j = 0; j < db->superTbls->size; ++j) {
// stb
SSuperTable* stb = benchArrayGet(db->superTbls, j);
if (stb->insertRows == 0) {
continue;
}
// gen csv
int ret = genWithSTable(db, stb);
if(ret != 0) {
errorPrint("Failed to generate CSV files. database: %s, super table: %s, error code: %d.\n",
db->dbName, stb->stbName, ret);
return;
}
}
}
}
return;
}
int csvTestProcess() {
// parse parameter
if (csvParseParameter() != 0) {
errorPrint("Failed to generate CSV files. database: %s, super table: %s, error code: %d.\n",
db->dbName, stb->stbName, ret);
return -1;
}
infoPrint("Starting to output data to CSV files in directory: %s ...\n", g_arguments->csv_output_path);
int64_t start = toolsGetTimestampMs();
csvWriteThread();
int64_t delay = toolsGetTimestampMs() - start;
infoPrint("Data export to CSV files in directory: %s has been completed. Time elapsed: %.3f seconds\n",
g_arguments->csv_output_path, delay / 1000.0);
return 0;
}

View File

@ -1587,13 +1587,13 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) {
} }
// csv output dir // csv output dir
tools_cJSON* csv_od = tools_cJSON_GetObjectItem(json, "csv_output_dir"); tools_cJSON* csv_op = tools_cJSON_GetObjectItem(json, "csv_output_path");
if (csv_od && csv_od->type == tools_cJSON_String && csv_od->valuestring != NULL) { if (csv_op && csv_op->type == tools_cJSON_String && csv_op->valuestring != NULL) {
g_arguments->csv_output_dir = csv_od->valuestring; g_arguments->csv_output_path = csv_op->valuestring;
} else { } else {
g_arguments->csv_output_dir = "./output/"; g_arguments->csv_output_path = "./output/";
} }
(void)mkdir(g_arguments->csv_output_dir, 0775); (void)mkdir(g_arguments->csv_output_path, 0775);
// csv file prefix // csv file prefix
tools_cJSON* csv_fp = tools_cJSON_GetObjectItem(json, "csv_file_prefix"); tools_cJSON* csv_fp = tools_cJSON_GetObjectItem(json, "csv_file_prefix");
@ -1608,7 +1608,7 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) {
if (csv_tf && csv_tf->type == tools_cJSON_String && csv_tf->valuestring != NULL) { if (csv_tf && csv_tf->type == tools_cJSON_String && csv_tf->valuestring != NULL) {
g_arguments->csv_ts_format = csv_tf->valuestring; g_arguments->csv_ts_format = csv_tf->valuestring;
} else { } else {
g_arguments->csv_ts_format = "YYYYMMDDHHmmSS"; g_arguments->csv_ts_format = NULL;
} }
// csv timestamp format // csv timestamp format

View File

@ -153,7 +153,7 @@ int main(int argc, char* argv[]) {
} }
} else if (g_arguments->test_mode == CSVFILE_TEST) { } else if (g_arguments->test_mode == CSVFILE_TEST) {
if (csvTestProcess()) { if (csvTestProcess()) {
errorPrint("%s", "query test process failed\n"); errorPrint("%s", "generate csv process failed\n");
ret = -1; ret = -1;
} }
} else if (g_arguments->test_mode == QUERY_TEST) { } else if (g_arguments->test_mode == QUERY_TEST) {