enh: add csv writing meta
This commit is contained in:
parent
a1b7986cbd
commit
5cb31be1e6
|
@ -781,8 +781,8 @@ typedef struct SArguments_S {
|
||||||
bool escape_character;
|
bool escape_character;
|
||||||
bool pre_load_tb_meta;
|
bool pre_load_tb_meta;
|
||||||
|
|
||||||
char* csv_output_path;
|
char* output_path;
|
||||||
char csv_output_path_buf[MAX_PATH_LEN];
|
char output_path_buf[MAX_PATH_LEN];
|
||||||
char* csv_file_prefix;
|
char* csv_file_prefix;
|
||||||
char* csv_ts_format;
|
char* csv_ts_format;
|
||||||
char* csv_ts_interval;
|
char* csv_ts_interval;
|
||||||
|
|
|
@ -18,19 +18,26 @@
|
||||||
|
|
||||||
#include <bench.h>
|
#include <bench.h>
|
||||||
|
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
CSV_NAMING_SINGLE,
|
||||||
|
CSV_NAMING_TIME_SLICE,
|
||||||
|
CSV_NAMING_THREAD,
|
||||||
|
CSV_NAMING_THREAD_TIME_SLICE
|
||||||
|
} CsvNamingType;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
CsvNamingType naming_type;
|
||||||
|
time_t start_secs;
|
||||||
|
time_t end_secs;
|
||||||
|
time_t end_ts;
|
||||||
|
size_t thread_id;
|
||||||
|
size_t total_threads;
|
||||||
|
char thread_formatter[TINY_BUFF_LEN];
|
||||||
|
} CsvWriteMeta;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int csvTestProcess();
|
int csvTestProcess();
|
||||||
|
|
||||||
int genWithSTable(SDataBase* db, SSuperTable* stb, char* outDir);
|
|
||||||
|
|
||||||
char * genTagData(char* buf, SSuperTable* stb, int64_t i, int64_t *k);
|
|
||||||
|
|
||||||
char * genColumnData(char* colData, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k);
|
|
||||||
|
|
||||||
int32_t genRowByField(char* buf, BArray* fields, int16_t fieldCnt, char* binanryPrefix, char* ncharPrefix, int64_t *k);
|
|
||||||
|
|
||||||
void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir);
|
|
||||||
|
|
||||||
int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain);
|
|
||||||
int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain);
|
|
||||||
|
|
||||||
#endif // INC_BENCHCSV_H_
|
#endif // INC_BENCHCSV_H_
|
||||||
|
|
|
@ -15,11 +15,10 @@
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
|
|
||||||
#include <bench.h>
|
|
||||||
#include "benchLog.h"
|
#include "benchLog.h"
|
||||||
#include <benchDataMix.h>
|
#include "benchData.h"
|
||||||
#include <benchCsv.h>
|
#include "benchDataMix.h"
|
||||||
|
#include "benchCsv.h"
|
||||||
|
|
||||||
//
|
//
|
||||||
// main etry
|
// main etry
|
||||||
|
@ -29,9 +28,7 @@
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void obtainCsvFile(char * outFile, SDataBase* db, SSuperTable* stb, char* outDir) {
|
|
||||||
sprintf(outFile, "%s%s-%s.csv", outDir, db->dbName, stb->stbName);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t writeCsvFile(FILE* f, char * buf, int32_t len) {
|
int32_t writeCsvFile(FILE* f, char * buf, int32_t len) {
|
||||||
size_t size = fwrite(buf, 1, len, f);
|
size_t size = fwrite(buf, 1, len, f);
|
||||||
|
@ -42,29 +39,36 @@ int32_t writeCsvFile(FILE* f, char * buf, int32_t len) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain) {
|
int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int rows_buf_len, int minRemain) {
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
int pos = 0;
|
int pos = 0;
|
||||||
int64_t tk = 0;
|
int64_t tk = 0;
|
||||||
int64_t show = 0;
|
int64_t show = 0;
|
||||||
|
|
||||||
int tagDataLen = stb->lenOfTags + stb->tags->size + 256;
|
|
||||||
char * tagData = (char *) benchCalloc(1, tagDataLen, true);
|
uint32_t tags_length = accumulateRowLen(stbInfo->tags, stbInfo->iface);
|
||||||
int colDataLen = stb->lenOfCols + stb->cols->size + 256;
|
uint32_t cols_length = accumulateRowLen(stbInfo->cols, stbInfo->iface);
|
||||||
char * colData = (char *) benchCalloc(1, colDataLen, true);
|
|
||||||
|
size_t tags_csv_length = tags_length + stb->tags->size;
|
||||||
|
size_t cols_csv_length = cols_length + stb->cols->size;
|
||||||
|
char* tags_csv_buf = (char*)benchCalloc(1, tags_csv_length, true);
|
||||||
|
char* cols_csv_buf = (char*)benchCalloc(1, cols_csv_length, true);
|
||||||
|
|
||||||
// gen child name
|
// gen child name
|
||||||
for (int64_t i = 0; i < stb->childTblCount; i++) {
|
for (int64_t i = 0; i < stb->childTblCount; ++i) {
|
||||||
int64_t ts = stb->startTimestamp;
|
int64_t ts = stb->startTimestamp;
|
||||||
int64_t ck = 0;
|
int64_t ck = 0;
|
||||||
|
|
||||||
|
// child table
|
||||||
|
|
||||||
// tags
|
// tags
|
||||||
genTagData(tagData, stb, i, &tk);
|
csvGenRowTagData(tags_csv_buf, stb, i, &tk);
|
||||||
// insert child column data
|
// insert child column data
|
||||||
for(int64_t j = 0; j < stb->insertRows; j++) {
|
for(int64_t j = 0; j < stb->insertRows; j++) {
|
||||||
genColumnData(colData, stb, ts, db->precision, &ck);
|
genColumnData(cols_csv_buf, stb, ts, db->precision, &ck);
|
||||||
// combine
|
// combine
|
||||||
pos += sprintf(buf + pos, "%s,%s.\n", tagData, colData);
|
pos += sprintf(buf + pos, "%s,%s.\n", tags_csv_buf, cols_csv_buf);
|
||||||
if (bufLen - pos < minRemain) {
|
if (rows_buf_len - pos < minRemain) {
|
||||||
// submit
|
// submit
|
||||||
ret = writeCsvFile(fs, buf, pos);
|
ret = writeCsvFile(fs, buf, pos);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
|
@ -99,48 +103,277 @@ int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufL
|
||||||
|
|
||||||
END:
|
END:
|
||||||
// free
|
// free
|
||||||
tmfree(tagData);
|
tmfree(tags_csv_buf);
|
||||||
tmfree(colData);
|
tmfree(cols_csv_buf);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int bufLen, int minRemain) {
|
|
||||||
|
static time_t csvGetStartSeconds(SDataBase* db, SSuperTable* stb) {
|
||||||
|
time_t start_seconds = 0;
|
||||||
|
|
||||||
|
if (db->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||||
|
start_seconds = stb->startTimestamp / 1000000L;
|
||||||
|
} else if (db->precision == TSDB_TIME_PRECISION_NANO) {
|
||||||
|
start_seconds = stb->startTimestamp / 1000000000L;
|
||||||
|
} else {
|
||||||
|
start_seconds = stb->startTimestamp / 1000L;
|
||||||
|
}
|
||||||
|
return start_seconds;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void csvConvertTime2String(time_t time_value, char* time_buf, size_t buf_size) {
|
||||||
|
struct tm tm_result;
|
||||||
|
char *old_locale = setlocale(LC_TIME, "C");
|
||||||
|
#ifdef _WIN32
|
||||||
|
gmtime_s(&tm_result, &time_value);
|
||||||
|
#else
|
||||||
|
gmtime_r(&time_value, &tm_result);
|
||||||
|
#endif
|
||||||
|
strftime(time_buf, buf_size, g_arguments->csv_ts_format, &tm_result);
|
||||||
|
if (old_locale) {
|
||||||
|
(LC_TIME, old_locale);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static CsvNamingType csvGetFileNamingType(SSuperTable* stb) {
|
||||||
|
if (stb->interlaceRows > 0) {
|
||||||
|
if (g_arguments->csv_ts_format) {
|
||||||
|
return CSV_NAMING_TIME_SLICE;
|
||||||
|
} else {
|
||||||
|
return CSV_NAMING_SINGLE;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (g_arguments->csv_ts_format) {
|
||||||
|
return CSV_NAMING_THREAD_TIME_SLICE;
|
||||||
|
} else {
|
||||||
|
return CSV_NAMING_THREAD;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void csvGenEndTimestamp(CsvWriteMeta* meta, SDataBase* db) {
|
||||||
|
time_t end_ts = 0;
|
||||||
|
|
||||||
|
if (db->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||||
|
end_ts = meta->end_secs * 1000000L;
|
||||||
|
} else if (db->precision == TSDB_TIME_PRECISION_NANO) {
|
||||||
|
end_ts = meta->end_secs * 1000000000L;
|
||||||
|
} else {
|
||||||
|
end_ts = meta->end_secs * 1000L;
|
||||||
|
}
|
||||||
|
meta->end_ts = end_ts;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void csvGenThreadFormatter(CsvWriteMeta* meta) {
|
||||||
|
int digits = 0;
|
||||||
|
if (meta->total_threads == 0) {
|
||||||
|
digits = 1;
|
||||||
|
} else {
|
||||||
|
for (int n = meta->total_threads; n > 0; n /= 10) {
|
||||||
|
digits++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (digits <= 1) {
|
||||||
|
(void)sprintf(meta->thread_formatter, "%%d");
|
||||||
|
} else {
|
||||||
|
(void)snprintf(meta->thread_formatter, sizeof(meta->thread_formatter), "%%0%dd", digits);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static CsvWriteMeta csvInitFileNamingMeta(SDataBase* db, SSuperTable* stb) {
|
||||||
|
CsvWriteMeta meta = {
|
||||||
|
.naming_type = CSV_NAMING_SINGLE,
|
||||||
|
.start_secs = 0,
|
||||||
|
.end_secs = 0,
|
||||||
|
.thread_id = 0,
|
||||||
|
.total_threads = 1,
|
||||||
|
.thread_formatter = {}
|
||||||
|
};
|
||||||
|
|
||||||
|
meta.naming_type = csvGetFileNamingType(stb);
|
||||||
|
|
||||||
|
switch (meta.naming_type) {
|
||||||
|
case CSV_NAMING_SINGLE: {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CSV_NAMING_TIME_SLICE: {
|
||||||
|
meta.start_secs = csvGetStartSeconds(db, stb);
|
||||||
|
meta.end_secs = meta.start_secs + g_arguments->csv_ts_intv_secs;
|
||||||
|
csvGenEndTimestamp(&meta, db);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CSV_NAMING_THREAD: {
|
||||||
|
meta.thread_id = 1;
|
||||||
|
meta.total_threads = g_arguments->nthreads;
|
||||||
|
csvGenThreadFormatter(&meta);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CSV_NAMING_THREAD_TIME_SLICE: {
|
||||||
|
meta.thread_id = 1;
|
||||||
|
meta.total_threads = g_arguments->nthreads;
|
||||||
|
csvGenThreadFormatter(&meta);
|
||||||
|
meta.start_secs = csvGetStartSeconds(db, stb);
|
||||||
|
meta.end_secs = meta.start_secs + g_arguments->csv_ts_intv_secs;
|
||||||
|
csvGenEndTimestamp(&meta, db);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
meta.naming_type = CSV_NAMING_SINGLE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return meta;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int csvGetFileFullname(CsvWriteMeta* meta, char* fullname, size_t size) {
|
||||||
|
char thread_buf[SMALL_BUFF_LEN];
|
||||||
|
char start_time_buf[MIDDLE_BUFF_LEN];
|
||||||
|
char end_time_buf[MIDDLE_BUFF_LEN];
|
||||||
|
int ret = -1;
|
||||||
|
const char* base_path = g_arguments->output_path;
|
||||||
|
const char* file_prefix = g_arguments->csv_file_prefix;
|
||||||
|
|
||||||
|
switch (meta->naming_type) {
|
||||||
|
case CSV_NAMING_SINGLE: {
|
||||||
|
ret = snprintf(fullname, size, "%s%s.csv", base_path, file_prefix);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CSV_NAMING_TIME_SLICE: {
|
||||||
|
csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf));
|
||||||
|
csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf));
|
||||||
|
ret = snprintf(fullname, size, "%s%s_%s_%s.csv", base_path, file_prefix, start_time_buf, end_time_buf);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CSV_NAMING_THREAD: {
|
||||||
|
(void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
|
||||||
|
ret = snprintf(fullname, size, "%s%s_%s.csv", base_path, file_prefix, thread_buf);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CSV_NAMING_THREAD_TIME_SLICE: {
|
||||||
|
(void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
|
||||||
|
csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf));
|
||||||
|
csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf));
|
||||||
|
ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
ret = -1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return (ret > 0 && (size_t)ret < size) ? 0 : -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
uint32_t csvCalcInterlaceRows(CsvWriteMeta* meta, SSuperTable* stb, int64_t ts) {
|
||||||
|
uint32_t need_rows = 0;
|
||||||
|
|
||||||
|
|
||||||
|
switch (meta->naming_type) {
|
||||||
|
case CSV_NAMING_SINGLE: {
|
||||||
|
need_rows = stb->interlaceRows;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CSV_NAMING_TIME_SLICE: {
|
||||||
|
(meta->end_ts - ts) / stb->timestamp_step
|
||||||
|
need_rows = stb->interlaceRows;
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CSV_NAMING_THREAD: {
|
||||||
|
(void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
|
||||||
|
ret = snprintf(fullname, size, "%s%s_%s.csv", base_path, file_prefix, thread_buf);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CSV_NAMING_THREAD_TIME_SLICE: {
|
||||||
|
(void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
|
||||||
|
csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf));
|
||||||
|
csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf));
|
||||||
|
ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
ret = -1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
static int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fp, char* rows_buf, int rows_buf_len) {
|
||||||
|
char fullname[MAX_PATH_LEN] = {};
|
||||||
|
CsvWriteMeta meta = csvInitFileNamingMeta();
|
||||||
|
|
||||||
|
int ret = csvGetFileFullname(&meta, fullname, sizeof(fullname));
|
||||||
|
if (ret < 0) {
|
||||||
|
errorPrint("Failed to generate csv filename. database: %s, super table: %s, naming type: %d.\n",
|
||||||
|
db->dbName, stb->stbName, meta.naming_type);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
int pos = 0;
|
int pos = 0;
|
||||||
int64_t n = 0; // already inserted rows for one child table
|
int64_t n = 0; // already inserted rows for one child table
|
||||||
int64_t tk = 0;
|
int64_t tk = 0;
|
||||||
int64_t show = 0;
|
int64_t show = 0;
|
||||||
|
int64_t ts = 0;
|
||||||
char **tagDatas = (char **)benchCalloc(stb->childTblCount, sizeof(char *), true);
|
|
||||||
int colDataLen = stb->lenOfCols + stb->cols->size + 256;
|
|
||||||
char * colData = (char *) benchCalloc(1, colDataLen, true);
|
|
||||||
int64_t last_ts = stb->startTimestamp;
|
int64_t last_ts = stb->startTimestamp;
|
||||||
|
|
||||||
while (n < stb->insertRows ) {
|
// init buffer
|
||||||
for (int64_t i = 0; i < stb->childTblCount; i++) {
|
char** tags_buf_bucket = (char **)benchCalloc(stb->childTblCount, sizeof(char *), true);
|
||||||
// start one table
|
int cols_buf_length = stb->lenOfCols + stb->cols->size;
|
||||||
int64_t ts = last_ts;
|
char* cols_buf = (char *)benchCalloc(1, cols_buf_length, true);
|
||||||
int64_t ck = 0;
|
|
||||||
// tags
|
for (int64_t i = 0; i < stb->childTblCount; ++i) {
|
||||||
if (tagDatas[i] == NULL) {
|
int tags_buf_length = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->tags->size;
|
||||||
tagDatas[i] = genTagData(NULL, stb, i, &tk);
|
tags_buf_bucket[i] = benchCalloc(1, tags_buf_length, true);
|
||||||
|
if (!tags_buf_bucket[i]) {
|
||||||
|
ret = -1;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ret = csvGenRowTagData(tags_buf_bucket[i], tags_buf_length, stb, i, &tk);
|
||||||
|
if (!ret) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
while (n < stb->insertRows ) {
|
||||||
|
for (int64_t i = 0; i < stb->childTblCount; ++i) {
|
||||||
|
ts = last_ts;
|
||||||
|
int64_t ck = 0;
|
||||||
|
|
||||||
|
|
||||||
// calc need insert rows
|
// calc need insert rows
|
||||||
|
uint32_t need_rows = csvCalcInterlaceRows(&meta, stb, ts)
|
||||||
|
|
||||||
int64_t needInserts = stb->interlaceRows;
|
int64_t needInserts = stb->interlaceRows;
|
||||||
if(needInserts > stb->insertRows - n) {
|
if(needInserts > stb->insertRows - n) {
|
||||||
needInserts = stb->insertRows - n;
|
needInserts = stb->insertRows - n;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int64_t j = 0; j < needInserts; j++) {
|
for (int64_t j = 0; j < needInserts; j++) {
|
||||||
genColumnData(colData, stb, ts, db->precision, &ck);
|
genColumnData(cols_buf, stb, ts, db->precision, &ck);
|
||||||
// combine tags,cols
|
// combine tags,cols
|
||||||
pos += sprintf(buf + pos, "%s,%s.\n", tagDatas[i], colData);
|
pos += sprintf(buf + pos, "%s,%s\n", tags_buf_bucket[i], cols_buf);
|
||||||
if (bufLen - pos < minRemain) {
|
if (rows_buf_len - pos < minRemain) {
|
||||||
// submit
|
// submit
|
||||||
ret = writeCsvFile(fs, buf, pos);
|
ret = writeCsvFile(fp, buf, pos);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
goto END;
|
goto end;
|
||||||
}
|
}
|
||||||
pos = 0;
|
pos = 0;
|
||||||
}
|
}
|
||||||
|
@ -152,7 +385,7 @@ int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int
|
||||||
if(g_arguments->terminate) {
|
if(g_arguments->terminate) {
|
||||||
infoPrint("%s", "You are cancel, exiting ... \n");
|
infoPrint("%s", "You are cancel, exiting ... \n");
|
||||||
ret = -1;
|
ret = -1;
|
||||||
goto END;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
// print show
|
// print show
|
||||||
|
@ -170,113 +403,131 @@ int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pos > 0) {
|
if (pos > 0) {
|
||||||
ret = writeCsvFile(fs, buf, pos);
|
ret = writeCsvFile(fp, buf, pos);
|
||||||
pos = 0;
|
pos = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
END:
|
end:
|
||||||
// free
|
// free
|
||||||
for (int64_t m = 0 ; m < stb->childTblCount; m ++) {
|
for (int64_t m = 0 ; m < stb->childTblCount; m ++) {
|
||||||
tmfree(tagDatas[m]);
|
tmfree(tags_buf_bucket[m]);
|
||||||
}
|
}
|
||||||
tmfree(tagDatas);
|
tmfree(tags_buf_bucket);
|
||||||
tmfree(colData);
|
tmfree(cols_buf);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// gen tag data
|
// gen tag data
|
||||||
char * genTagData(char* buf, SSuperTable* stb, int64_t i, int64_t *k) {
|
int csvGenRowTagData(char* buf, size_t size, SSuperTable* stb, int64_t index, int64_t* k) {
|
||||||
// malloc
|
|
||||||
char* tagData;
|
|
||||||
if (buf == NULL) {
|
|
||||||
int tagDataLen = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->tags->size + 32;
|
|
||||||
tagData = benchCalloc(1, tagDataLen, true);
|
|
||||||
} else {
|
|
||||||
tagData = buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
int pos = 0;
|
|
||||||
// tbname
|
// tbname
|
||||||
pos += sprintf(tagData, "\'%s%"PRId64"\'", stb->childTblPrefix, i);
|
int pos = snprintf(buf, size, "\'%s%"PRId64"\'", stb->childTblPrefix, index);
|
||||||
// tags
|
// tags
|
||||||
pos += genRowByField(tagData + pos, stb->tags, stb->tags->size, stb->binaryPrefex, stb->ncharPrefex, k);
|
pos += csvGenRowFields(buf + pos, stb->tags, stb->tags->size, stb->binaryPrefex, stb->ncharPrefex, k);
|
||||||
|
|
||||||
return tagData;
|
return (pos > 0 && (size_t)pos < size) ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// gen column data
|
// gen column data
|
||||||
char * genColumnData(char* colData, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k) {
|
char * genColumnData(char* cols_csv_buf, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k) {
|
||||||
char szTime[128] = {0};
|
char szTime[128] = {0};
|
||||||
toolsFormatTimestamp(szTime, ts, precision);
|
toolsFormatTimestamp(szTime, ts, precision);
|
||||||
int pos = sprintf(colData, "\'%s\'", szTime);
|
int pos = sprintf(cols_csv_buf, "\'%s\'", szTime);
|
||||||
|
|
||||||
// columns
|
// columns
|
||||||
genRowByField(colData + pos, stb->cols, stb->cols->size, stb->binaryPrefex, stb->ncharPrefex, k);
|
csvGenRowFields(cols_csv_buf + pos, stb->cols, stb->cols->size, stb->binaryPrefex, stb->ncharPrefex, k);
|
||||||
return colData;
|
return cols_csv_buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t genRowByField(char* buf, BArray* fields, int16_t fieldCnt, char* binanryPrefix, char* ncharPrefix, int64_t *k) {
|
int32_t csvGenRowFields(char* buf, BArray* fields, int16_t field_count, char* binanry_prefix, char* nchar_prefix, int64_t* k) {
|
||||||
|
int32_t pos = 0;
|
||||||
|
|
||||||
// other cols data
|
for (uint16_t i = 0; i < field_count; ++i) {
|
||||||
int32_t pos1 = 0;
|
Field* field = benchArrayGet(fields, i);
|
||||||
for(uint16_t i = 0; i < fieldCnt; i++) {
|
|
||||||
Field* fd = benchArrayGet(fields, i);
|
|
||||||
char* prefix = "";
|
char* prefix = "";
|
||||||
if(fd->type == TSDB_DATA_TYPE_BINARY || fd->type == TSDB_DATA_TYPE_VARBINARY) {
|
if(field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY) {
|
||||||
if(binanryPrefix) {
|
if (binanry_prefix) {
|
||||||
prefix = binanryPrefix;
|
prefix = binanry_prefix;
|
||||||
}
|
}
|
||||||
} else if(fd->type == TSDB_DATA_TYPE_NCHAR) {
|
} else if(field->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
if(ncharPrefix) {
|
if (nchar_prefix) {
|
||||||
prefix = ncharPrefix;
|
prefix = nchar_prefix;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pos += dataGenByField(field, buf, pos, prefix, k, "");
|
||||||
|
}
|
||||||
|
|
||||||
pos1 += dataGenByField(fd, buf, pos1, prefix, k, "");
|
return pos;
|
||||||
}
|
|
||||||
|
|
||||||
return pos1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int genWithSTable(SDataBase* db, SSuperTable* stb) {
|
|
||||||
|
|
||||||
|
|
||||||
|
int csvGenStbInterlace(SDataBase* db, SSuperTable* stb) {
|
||||||
|
|
||||||
|
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
char outFile[MAX_FILE_NAME_LEN] = {0};
|
char outFile[MAX_FILE_NAME_LEN] = {0};
|
||||||
obtainCsvFile(outFile, db, stb, outDir);
|
obtainCsvFile(outFile, db, stb, outDir);
|
||||||
FILE * fs = fopen(outFile, "w");
|
FILE* fp = fopen(outFile, "w");
|
||||||
if(fs == NULL) {
|
if(fp == NULL) {
|
||||||
errorPrint("failed create csv file. file=%s, last errno=%d strerror=%s \n", outFile, errno, strerror(errno));
|
errorPrint("failed create csv file. file=%s, last errno=%d strerror=%s \n", outFile, errno, strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int rowLen = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->lenOfCols + stb->tags->size + stb->cols->size;
|
int row_buf_len = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->lenOfCols + stb->tags->size + stb->cols->size;
|
||||||
int bufLen = rowLen * g_arguments->reqPerReq;
|
int rows_buf_len = row_buf_len * g_arguments->interlaceRows;
|
||||||
char* buf = benchCalloc(1, bufLen, true);
|
char* rows_buf = benchCalloc(1, rows_buf_len, true);
|
||||||
|
|
||||||
infoPrint("start write csv file: %s \n", outFile);
|
infoPrint("start write csv file: %s \n", outFile);
|
||||||
|
|
||||||
if (stb->interlaceRows > 0) {
|
|
||||||
// interlace mode
|
// interlace mode
|
||||||
ret = interlaceWriteCsv(db, stb, fs, buf, bufLen, rowLen * 2);
|
ret = interlaceWriteCsv(db, stb, fp, rows_buf, rows_buf_len);
|
||||||
} else {
|
|
||||||
// batch mode
|
|
||||||
ret = batchWriteCsv(db, stb, fs, buf, bufLen, rowLen * 2);
|
|
||||||
}
|
|
||||||
|
|
||||||
tmfree(buf);
|
|
||||||
fclose(fs);
|
tmfree(rows_buf);
|
||||||
|
fclose(fp);
|
||||||
|
|
||||||
succPrint("end write csv file: %s \n", outFile);
|
succPrint("end write csv file: %s \n", outFile);
|
||||||
|
|
||||||
|
|
||||||
|
// wait threads
|
||||||
|
for (int i = 0; i < threadCnt; i++) {
|
||||||
|
infoPrint("pthread_join %d ...\n", i);
|
||||||
|
pthread_join(pids[i], NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int is_valid_csv_ts_format(const char* csv_ts_format) {
|
void csvGenPrepare(SDataBase* db, SSuperTable* stb) {
|
||||||
|
stbInfo->lenOfTags = accumulateRowLen(stbInfo->tags, stbInfo->iface);
|
||||||
|
stbInfo->lenOfCols = accumulateRowLen(stbInfo->cols, stbInfo->iface);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int csvGenStb(SDataBase* db, SSuperTable* stb) {
|
||||||
|
// prepare
|
||||||
|
csvGenPrepare(db, stb);
|
||||||
|
|
||||||
|
|
||||||
|
int ret = 0;
|
||||||
|
if (stb->interlaceRows > 0) {
|
||||||
|
// interlace mode
|
||||||
|
ret = csvGenStbInterlace(db, stb);
|
||||||
|
} else {
|
||||||
|
// batch mode
|
||||||
|
ret = csvGenStbBatch(db, stb);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int csvValidateParamTsFormat(const char* csv_ts_format) {
|
||||||
if (!csv_ts_format) return 0;
|
if (!csv_ts_format) return 0;
|
||||||
|
|
||||||
struct tm test_tm = {
|
struct tm test_tm = {
|
||||||
|
@ -296,7 +547,11 @@ static int is_valid_csv_ts_format(const char* csv_ts_format) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef _WIN32
|
||||||
const char* invalid_chars = "/\\:*?\"<>|";
|
const char* invalid_chars = "/\\:*?\"<>|";
|
||||||
|
#else
|
||||||
|
const char* invalid_chars = "/\\?\"<>|";
|
||||||
|
#endif
|
||||||
if (strpbrk(buffer, invalid_chars) != NULL) {
|
if (strpbrk(buffer, invalid_chars) != NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -305,7 +560,7 @@ static int is_valid_csv_ts_format(const char* csv_ts_format) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static long validate_csv_ts_interval(const char* csv_ts_interval) {
|
static long csvValidateParamTsInterval(const char* csv_ts_interval) {
|
||||||
if (!csv_ts_interval || *csv_ts_interval == '\0') return -1;
|
if (!csv_ts_interval || *csv_ts_interval == '\0') return -1;
|
||||||
|
|
||||||
char* endptr;
|
char* endptr;
|
||||||
|
@ -335,35 +590,35 @@ static long validate_csv_ts_interval(const char* csv_ts_interval) {
|
||||||
|
|
||||||
static int csvParseParameter() {
|
static int csvParseParameter() {
|
||||||
// csv_output_path
|
// csv_output_path
|
||||||
size_t len = strlen(g_arguments->csv_output_path);
|
size_t len = strlen(g_arguments->output_path);
|
||||||
if (len == 0) {
|
if (len == 0) {
|
||||||
errorPrint("Failed to generate CSV, the specified output path is empty. Please provide a valid path. database: %s, super table: %s.\n",
|
errorPrint("Failed to generate CSV files, the specified output path is empty. Please provide a valid path. database: %s, super table: %s.\n",
|
||||||
db->dbName, stb->stbName);
|
db->dbName, stb->stbName);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (g_arguments->csv_output_path[len - 1] != '/') {
|
if (g_arguments->output_path[len - 1] != '/') {
|
||||||
int n = snprintf(g_arguments->csv_output_path_buf, sizeof(g_arguments->csv_output_path_buf), "%s/", g_arguments->csv_output_path);
|
int n = snprintf(g_arguments->output_path_buf, sizeof(g_arguments->output_path_buf), "%s/", g_arguments->output_path);
|
||||||
if (n < 0 || n >= sizeof(g_arguments->csv_output_path_buf)) {
|
if (n < 0 || n >= sizeof(g_arguments->output_path_buf)) {
|
||||||
errorPrint("Failed to generate CSV, path buffer overflow risk when appending '/'. path: %s, database: %s, super table: %s.\n",
|
errorPrint("Failed to generate CSV files, path buffer overflow risk when appending '/'. path: %s, database: %s, super table: %s.\n",
|
||||||
g_arguments->csv_output_path, db->dbName, stb->stbName);
|
g_arguments->csv_output_path, db->dbName, stb->stbName);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
g_arguments->csv_output_path = g_arguments->csv_output_path_buf;
|
g_arguments->output_path = g_arguments->output_path_buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
// csv_ts_format
|
// csv_ts_format
|
||||||
if (g_arguments->csv_ts_format) {
|
if (g_arguments->csv_ts_format) {
|
||||||
if (is_valid_csv_ts_format(g_arguments->csv_ts_format) != 0) {
|
if (csvValidateParamTsFormat(g_arguments->csv_ts_format) != 0) {
|
||||||
errorPrint("Failed to generate CSV, the parameter `csv_ts_format` is invalid. csv_ts_format: %s, database: %s, super table: %s.\n",
|
errorPrint("Failed to generate CSV files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s, database: %s, super table: %s.\n",
|
||||||
g_arguments->csv_ts_format, db->dbName, stb->stbName);
|
g_arguments->csv_ts_format, db->dbName, stb->stbName);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// csv_ts_interval
|
// csv_ts_interval
|
||||||
long csv_ts_intv_secs = validate_csv_ts_interval(g_arguments->csv_ts_interval);
|
long csv_ts_intv_secs = csvValidateParamTsInterval(g_arguments->csv_ts_interval);
|
||||||
if (csv_ts_intv_secs <= 0) {
|
if (csv_ts_intv_secs <= 0) {
|
||||||
errorPrint("Failed to generate CSV, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s, database: %s, super table: %s.\n",
|
errorPrint("Failed to generate CSV files, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s, database: %s, super table: %s.\n",
|
||||||
g_arguments->csv_ts_interval, db->dbName, stb->stbName);
|
g_arguments->csv_ts_interval, db->dbName, stb->stbName);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -373,12 +628,12 @@ static int csvParseParameter() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void csvWriteThread() {
|
static int csvWriteThread() {
|
||||||
for (size_t i = 0; i < g_arguments->databases->size; ++i) {
|
for (size_t i = 0; i < g_arguments->databases->size && !g_arguments->terminate; ++i) {
|
||||||
// database
|
// database
|
||||||
SDataBase* db = benchArrayGet(g_arguments->databases, i);
|
SDataBase* db = benchArrayGet(g_arguments->databases, i);
|
||||||
if (database->superTbls) {
|
if (database->superTbls) {
|
||||||
for (size_t j = 0; j < db->superTbls->size; ++j) {
|
for (size_t j = 0; j < db->superTbls->size && !g_arguments->terminate; ++j) {
|
||||||
// stb
|
// stb
|
||||||
SSuperTable* stb = benchArrayGet(db->superTbls, j);
|
SSuperTable* stb = benchArrayGet(db->superTbls, j);
|
||||||
if (stb->insertRows == 0) {
|
if (stb->insertRows == 0) {
|
||||||
|
@ -386,37 +641,34 @@ static void csvWriteThread() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// gen csv
|
// gen csv
|
||||||
int ret = genWithSTable(db, stb);
|
int ret = csvGenStb(db, stb);
|
||||||
if(ret != 0) {
|
if(ret != 0) {
|
||||||
errorPrint("Failed to generate CSV files. database: %s, super table: %s, error code: %d.\n",
|
|
||||||
db->dbName, stb->stbName, ret);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int csvTestProcess() {
|
|
||||||
// parse parameter
|
|
||||||
if (csvParseParameter() != 0) {
|
|
||||||
errorPrint("Failed to generate CSV files. database: %s, super table: %s, error code: %d.\n",
|
errorPrint("Failed to generate CSV files. database: %s, super table: %s, error code: %d.\n",
|
||||||
db->dbName, stb->stbName, ret);
|
db->dbName, stb->stbName, ret);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
infoPrint("Starting to output data to CSV files in directory: %s ...\n", g_arguments->csv_output_path);
|
|
||||||
int64_t start = toolsGetTimestampMs();
|
int csvTestProcess() {
|
||||||
csvWriteThread();
|
// parsing parameters
|
||||||
int64_t delay = toolsGetTimestampMs() - start;
|
if (csvParseParameter() != 0) {
|
||||||
infoPrint("Data export to CSV files in directory: %s has been completed. Time elapsed: %.3f seconds\n",
|
return -1;
|
||||||
g_arguments->csv_output_path, delay / 1000.0);
|
}
|
||||||
|
|
||||||
|
infoPrint("Starting to output data to CSV files in directory: %s ...\n", g_arguments->output_path);
|
||||||
|
int64_t start = toolsGetTimestampMs();
|
||||||
|
int ret = csvWriteThread();
|
||||||
|
if (ret != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
int64_t delay = toolsGetTimestampMs() - start;
|
||||||
|
infoPrint("Generating CSV files in directory: %s has been completed. Time elapsed: %.3f seconds\n",
|
||||||
|
g_arguments->output_path, delay / 1000.0);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1586,14 +1586,14 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// csv output dir
|
// output dir
|
||||||
tools_cJSON* csv_op = tools_cJSON_GetObjectItem(json, "csv_output_path");
|
tools_cJSON* opp = tools_cJSON_GetObjectItem(json, "output_path");
|
||||||
if (csv_op && csv_op->type == tools_cJSON_String && csv_op->valuestring != NULL) {
|
if (opp && opp->type == tools_cJSON_String && opp->valuestring != NULL) {
|
||||||
g_arguments->csv_output_path = csv_op->valuestring;
|
g_arguments->output_path = opp->valuestring;
|
||||||
} else {
|
} else {
|
||||||
g_arguments->csv_output_path = "./output/";
|
g_arguments->output_path = "./output/";
|
||||||
}
|
}
|
||||||
(void)mkdir(g_arguments->csv_output_path, 0775);
|
(void)mkdir(g_arguments->output_path, 0775);
|
||||||
|
|
||||||
// csv file prefix
|
// csv file prefix
|
||||||
tools_cJSON* csv_fp = tools_cJSON_GetObjectItem(json, "csv_file_prefix");
|
tools_cJSON* csv_fp = tools_cJSON_GetObjectItem(json, "csv_file_prefix");
|
||||||
|
|
Loading…
Reference in New Issue