enh: framework that supports concurrent writing to csv
This commit is contained in:
parent
5cb31be1e6
commit
80ecd4feb4
|
@ -20,6 +20,9 @@
|
||||||
#define CURL_STATICLIB
|
#define CURL_STATICLIB
|
||||||
#define ALLOW_FORBID_FUNC
|
#define ALLOW_FORBID_FUNC
|
||||||
|
|
||||||
|
#define MAX(a, b) ((a) > (b) ? (a) : (b))
|
||||||
|
#define MIN(a, b) ((a) < (b) ? (a) : (b))
|
||||||
|
|
||||||
#ifdef LINUX
|
#ifdef LINUX
|
||||||
|
|
||||||
#ifndef _ALPINE
|
#ifndef _ALPINE
|
||||||
|
@ -787,6 +790,8 @@ typedef struct SArguments_S {
|
||||||
char* csv_ts_format;
|
char* csv_ts_format;
|
||||||
char* csv_ts_interval;
|
char* csv_ts_interval;
|
||||||
long csv_ts_intv_secs;
|
long csv_ts_intv_secs;
|
||||||
|
bool csv_output_header;
|
||||||
|
bool csv_tbname_alias;
|
||||||
|
|
||||||
bool bind_vgroup;
|
bool bind_vgroup;
|
||||||
} SArguments;
|
} SArguments;
|
||||||
|
|
|
@ -20,22 +20,49 @@
|
||||||
|
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
CSV_NAMING_SINGLE,
|
CSV_NAMING_I_SINGLE,
|
||||||
CSV_NAMING_TIME_SLICE,
|
CSV_NAMING_I_TIME_SLICE,
|
||||||
CSV_NAMING_THREAD,
|
CSV_NAMING_B_THREAD,
|
||||||
CSV_NAMING_THREAD_TIME_SLICE
|
CSV_NAMING_B_THREAD_TIME_SLICE
|
||||||
} CsvNamingType;
|
} CsvNamingType;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char* buf;
|
||||||
|
int buf_size;
|
||||||
|
int length;
|
||||||
|
} CsvRowFieldsBuf;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
CsvNamingType naming_type;
|
CsvNamingType naming_type;
|
||||||
time_t start_secs;
|
|
||||||
time_t end_secs;
|
|
||||||
time_t end_ts;
|
|
||||||
size_t thread_id;
|
|
||||||
size_t total_threads;
|
size_t total_threads;
|
||||||
char thread_formatter[TINY_BUFF_LEN];
|
char thread_formatter[TINY_BUFF_LEN];
|
||||||
|
SDataBase* db;
|
||||||
|
SSuperTable* stb;
|
||||||
|
int64_t start_ts;
|
||||||
|
int64_t end_ts;
|
||||||
|
int64_t ts_step;
|
||||||
|
int64_t interlace_step;
|
||||||
} CsvWriteMeta;
|
} CsvWriteMeta;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint64_t ctb_start_idx;
|
||||||
|
uint64_t ctb_end_idx;
|
||||||
|
uint64_t ctb_count;
|
||||||
|
time_t start_secs;
|
||||||
|
time_t end_secs;
|
||||||
|
size_t thread_id;
|
||||||
|
bool output_header;
|
||||||
|
CsvRowFieldsBuf* tags_buf_bucket;
|
||||||
|
CsvRowFieldsBuf* cols_buf;
|
||||||
|
} CsvThreadMeta;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
CsvWriteMeta* write_meta;
|
||||||
|
CsvThreadMeta thread_meta;
|
||||||
|
} CsvThreadArgs;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int csvTestProcess();
|
int csvTestProcess();
|
||||||
|
|
|
@ -25,105 +25,26 @@
|
||||||
//
|
//
|
||||||
|
|
||||||
#define SHOW_CNT 100000
|
#define SHOW_CNT 100000
|
||||||
|
#define GEN_ROW_FIELDS_TAG 0
|
||||||
|
#define GEN_ROW_FIELDS_COL 1
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
static time_t csvGetStartSeconds(int precision, int64_t start_ts) {
|
||||||
|
|
||||||
int32_t writeCsvFile(FILE* f, char * buf, int32_t len) {
|
|
||||||
size_t size = fwrite(buf, 1, len, f);
|
|
||||||
if(size != len) {
|
|
||||||
errorPrint("failed to write csv file. expect write length:%d real write length:%d \n", len, (int32_t)size);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int rows_buf_len, int minRemain) {
|
|
||||||
int ret = 0;
|
|
||||||
int pos = 0;
|
|
||||||
int64_t tk = 0;
|
|
||||||
int64_t show = 0;
|
|
||||||
|
|
||||||
|
|
||||||
uint32_t tags_length = accumulateRowLen(stbInfo->tags, stbInfo->iface);
|
|
||||||
uint32_t cols_length = accumulateRowLen(stbInfo->cols, stbInfo->iface);
|
|
||||||
|
|
||||||
size_t tags_csv_length = tags_length + stb->tags->size;
|
|
||||||
size_t cols_csv_length = cols_length + stb->cols->size;
|
|
||||||
char* tags_csv_buf = (char*)benchCalloc(1, tags_csv_length, true);
|
|
||||||
char* cols_csv_buf = (char*)benchCalloc(1, cols_csv_length, true);
|
|
||||||
|
|
||||||
// gen child name
|
|
||||||
for (int64_t i = 0; i < stb->childTblCount; ++i) {
|
|
||||||
int64_t ts = stb->startTimestamp;
|
|
||||||
int64_t ck = 0;
|
|
||||||
|
|
||||||
// child table
|
|
||||||
|
|
||||||
// tags
|
|
||||||
csvGenRowTagData(tags_csv_buf, stb, i, &tk);
|
|
||||||
// insert child column data
|
|
||||||
for(int64_t j = 0; j < stb->insertRows; j++) {
|
|
||||||
genColumnData(cols_csv_buf, stb, ts, db->precision, &ck);
|
|
||||||
// combine
|
|
||||||
pos += sprintf(buf + pos, "%s,%s.\n", tags_csv_buf, cols_csv_buf);
|
|
||||||
if (rows_buf_len - pos < minRemain) {
|
|
||||||
// submit
|
|
||||||
ret = writeCsvFile(fs, buf, pos);
|
|
||||||
if (ret != 0) {
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
|
|
||||||
pos = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ts move next
|
|
||||||
ts += stb->timestamp_step;
|
|
||||||
|
|
||||||
// check cancel
|
|
||||||
if(g_arguments->terminate) {
|
|
||||||
infoPrint("%s", "You are cancel, exiting ...\n");
|
|
||||||
ret = -1;
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
|
|
||||||
// print show
|
|
||||||
if (++show % SHOW_CNT == 0) {
|
|
||||||
infoPrint("batch write child table cnt = %"PRId64 " all rows = %" PRId64 "\n", i+1, show);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pos > 0) {
|
|
||||||
ret = writeCsvFile(fs, buf, pos);
|
|
||||||
pos = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
END:
|
|
||||||
// free
|
|
||||||
tmfree(tags_csv_buf);
|
|
||||||
tmfree(cols_csv_buf);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static time_t csvGetStartSeconds(SDataBase* db, SSuperTable* stb) {
|
|
||||||
time_t start_seconds = 0;
|
time_t start_seconds = 0;
|
||||||
|
|
||||||
if (db->precision == TSDB_TIME_PRECISION_MICRO) {
|
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||||
start_seconds = stb->startTimestamp / 1000000L;
|
start_seconds = start_ts / 1000000L;
|
||||||
} else if (db->precision == TSDB_TIME_PRECISION_NANO) {
|
} else if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||||
start_seconds = stb->startTimestamp / 1000000000L;
|
start_seconds = start_ts / 1000000000L;
|
||||||
} else {
|
} else {
|
||||||
start_seconds = stb->startTimestamp / 1000L;
|
start_seconds = start_ts / 1000L;
|
||||||
}
|
}
|
||||||
return start_seconds;
|
return start_seconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void csvConvertTime2String(time_t time_value, char* time_buf, size_t buf_size) {
|
static void csvConvertTime2String(time_t time_value, char* time_buf, size_t buf_size) {
|
||||||
struct tm tm_result;
|
struct tm tm_result;
|
||||||
char *old_locale = setlocale(LC_TIME, "C");
|
char *old_locale = setlocale(LC_TIME, "C");
|
||||||
#ifdef _WIN32
|
#ifdef _WIN32
|
||||||
|
@ -133,45 +54,73 @@ void csvConvertTime2String(time_t time_value, char* time_buf, size_t buf_size) {
|
||||||
#endif
|
#endif
|
||||||
strftime(time_buf, buf_size, g_arguments->csv_ts_format, &tm_result);
|
strftime(time_buf, buf_size, g_arguments->csv_ts_format, &tm_result);
|
||||||
if (old_locale) {
|
if (old_locale) {
|
||||||
(LC_TIME, old_locale);
|
setlocale(LC_TIME, old_locale);
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static CsvNamingType csvGetFileNamingType(SSuperTable* stb) {
|
static CsvNamingType csvGetFileNamingType(SSuperTable* stb) {
|
||||||
if (stb->interlaceRows > 0) {
|
if (stb->interlaceRows > 0) {
|
||||||
if (g_arguments->csv_ts_format) {
|
if (g_arguments->csv_ts_format) {
|
||||||
return CSV_NAMING_TIME_SLICE;
|
return CSV_NAMING_I_TIME_SLICE;
|
||||||
} else {
|
} else {
|
||||||
return CSV_NAMING_SINGLE;
|
return CSV_NAMING_I_SINGLE;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (g_arguments->csv_ts_format) {
|
if (g_arguments->csv_ts_format) {
|
||||||
return CSV_NAMING_THREAD_TIME_SLICE;
|
return CSV_NAMING_B_THREAD_TIME_SLICE;
|
||||||
} else {
|
} else {
|
||||||
return CSV_NAMING_THREAD;
|
return CSV_NAMING_B_THREAD;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void csvGenEndTimestamp(CsvWriteMeta* meta, SDataBase* db) {
|
static void csvCalcTimestampStep(CsvWriteMeta* meta) {
|
||||||
time_t end_ts = 0;
|
time_t ts_step = 0;
|
||||||
|
|
||||||
if (db->precision == TSDB_TIME_PRECISION_MICRO) {
|
if (meta->db->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||||
end_ts = meta->end_secs * 1000000L;
|
ts_step = g_arguments->csv_ts_intv_secs * 1000000L;
|
||||||
} else if (db->precision == TSDB_TIME_PRECISION_NANO) {
|
} else if (db->precision == TSDB_TIME_PRECISION_NANO) {
|
||||||
end_ts = meta->end_secs * 1000000000L;
|
ts_step = g_arguments->csv_ts_intv_secs * 1000000000L;
|
||||||
} else {
|
} else {
|
||||||
end_ts = meta->end_secs * 1000L;
|
ts_step = g_arguments->csv_ts_intv_secs * 1000L;
|
||||||
}
|
}
|
||||||
meta->end_ts = end_ts;
|
meta->ts_step = ts_step;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void csvCalcCtbRange(CsvThreadMeta* meta, size_t total_threads, int64_t ctb_offset, int64_t ctb_count) {
|
||||||
|
uint64_t ctb_start_idx = 0;
|
||||||
|
uint64_t ctb_end_idx = 0;
|
||||||
|
size_t tid_idx = meta->thread_id - 1;
|
||||||
|
size_t base = ctb_count / total_threads;
|
||||||
|
size_t remainder = ctb_count % total_threads;
|
||||||
|
|
||||||
|
if (tid_idx < remainder) {
|
||||||
|
ctb_start_idx = ctb_offset + tid_idx * (base + 1);
|
||||||
|
ctb_end_idx = ctb_start_idx + (base + 1);
|
||||||
|
} else {
|
||||||
|
ctb_start_idx = ctb_offset + remainder * (base + 1) + (tid_idx - remainder) * base;
|
||||||
|
ctb_end_idx = ctb_start_idx + base;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ctb_end_idx > ctb_offset + ctb_count) {
|
||||||
|
ctb_end_idx = ctb_offset + ctb_count;
|
||||||
|
}
|
||||||
|
|
||||||
|
meta->ctb_start_idx = ctb_start_idx;
|
||||||
|
meta->ctb_end_idx = ctb_end_idx;
|
||||||
|
meta->ctb_count = ctb_count;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void csvGenThreadFormatter(CsvWriteMeta* meta) {
|
static void csvGenThreadFormatter(CsvWriteMeta* meta) {
|
||||||
int digits = 0;
|
int digits = 0;
|
||||||
|
|
||||||
if (meta->total_threads == 0) {
|
if (meta->total_threads == 0) {
|
||||||
digits = 1;
|
digits = 1;
|
||||||
} else {
|
} else {
|
||||||
|
@ -181,52 +130,50 @@ static void csvGenThreadFormatter(CsvWriteMeta* meta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (digits <= 1) {
|
if (digits <= 1) {
|
||||||
(void)sprintf(meta->thread_formatter, "%%d");
|
(void)snprintf(meta->thread_formatter, sizeof(meta->thread_formatter), "%%d");
|
||||||
} else {
|
} else {
|
||||||
(void)snprintf(meta->thread_formatter, sizeof(meta->thread_formatter), "%%0%dd", digits);
|
(void)snprintf(meta->thread_formatter, sizeof(meta->thread_formatter), "%%0%dd", digits);
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static CsvWriteMeta csvInitFileNamingMeta(SDataBase* db, SSuperTable* stb) {
|
static CsvWriteMeta csvInitWriteMeta(SDataBase* db, SSuperTable* stb) {
|
||||||
CsvWriteMeta meta = {
|
CsvWriteMeta meta = {
|
||||||
.naming_type = CSV_NAMING_SINGLE,
|
.naming_type = CSV_NAMING_I_SINGLE,
|
||||||
.start_secs = 0,
|
|
||||||
.end_secs = 0,
|
|
||||||
.thread_id = 0,
|
|
||||||
.total_threads = 1,
|
.total_threads = 1,
|
||||||
.thread_formatter = {}
|
.thread_formatter = {},
|
||||||
|
.db = db,
|
||||||
|
.stb = stb,
|
||||||
|
.start_ts = stb->startTimestamp,
|
||||||
|
.end_ts = stb->startTimestamp + stb->timestamp_step * stb->insertRows,
|
||||||
|
.ts_step = stb->timestamp_step * stb->insertRows,
|
||||||
|
.interlace_step = stb->timestamp_step * stb->interlaceRows
|
||||||
};
|
};
|
||||||
|
|
||||||
meta.naming_type = csvGetFileNamingType(stb);
|
meta.naming_type = csvGetFileNamingType(stb);
|
||||||
|
|
||||||
switch (meta.naming_type) {
|
switch (meta.naming_type) {
|
||||||
case CSV_NAMING_SINGLE: {
|
case CSV_NAMING_I_SINGLE: {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CSV_NAMING_TIME_SLICE: {
|
case CSV_NAMING_I_TIME_SLICE: {
|
||||||
meta.start_secs = csvGetStartSeconds(db, stb);
|
csvCalcTimestampStep(&meta);
|
||||||
meta.end_secs = meta.start_secs + g_arguments->csv_ts_intv_secs;
|
|
||||||
csvGenEndTimestamp(&meta, db);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CSV_NAMING_THREAD: {
|
case CSV_NAMING_B_THREAD: {
|
||||||
meta.thread_id = 1;
|
|
||||||
meta.total_threads = g_arguments->nthreads;
|
meta.total_threads = g_arguments->nthreads;
|
||||||
csvGenThreadFormatter(&meta);
|
csvGenThreadFormatter(&meta);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CSV_NAMING_THREAD_TIME_SLICE: {
|
case CSV_NAMING_B_THREAD_TIME_SLICE: {
|
||||||
meta.thread_id = 1;
|
|
||||||
meta.total_threads = g_arguments->nthreads;
|
meta.total_threads = g_arguments->nthreads;
|
||||||
csvGenThreadFormatter(&meta);
|
csvGenThreadFormatter(&meta);
|
||||||
meta.start_secs = csvGetStartSeconds(db, stb);
|
csvCalcTimestampStep(&meta);
|
||||||
meta.end_secs = meta.start_secs + g_arguments->csv_ts_intv_secs;
|
|
||||||
csvGenEndTimestamp(&meta, db);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default: {
|
default: {
|
||||||
meta.naming_type = CSV_NAMING_SINGLE;
|
meta.naming_type = CSV_NAMING_I_SINGLE;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -235,7 +182,67 @@ static CsvWriteMeta csvInitFileNamingMeta(SDataBase* db, SSuperTable* stb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int csvGetFileFullname(CsvWriteMeta* meta, char* fullname, size_t size) {
|
static CsvThreadMeta csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id) {
|
||||||
|
SDataBase* db = write_meta->db;
|
||||||
|
SSuperTable* stb = write_meta->stb;
|
||||||
|
CsvThreadMeta meta = {
|
||||||
|
.ctb_start_idx = 0,
|
||||||
|
.ctb_end_idx = 0,
|
||||||
|
.ctb_count = 0,
|
||||||
|
.start_secs = 0,
|
||||||
|
.end_secs = 0,
|
||||||
|
.thread_id = thread_id,
|
||||||
|
.tags_buf_bucket = NULL,
|
||||||
|
.cols_buf = NULL
|
||||||
|
};
|
||||||
|
|
||||||
|
csvCalcCtbRange(&meta, write_meta->total_threads, stb->childTblFrom, stb->childTblCount);
|
||||||
|
|
||||||
|
switch (write_meta->naming_type) {
|
||||||
|
case CSV_NAMING_I_SINGLE:
|
||||||
|
case CSV_NAMING_B_THREAD: {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CSV_NAMING_I_TIME_SLICE:
|
||||||
|
case CSV_NAMING_B_THREAD_TIME_SLICE: {
|
||||||
|
meta.start_secs = csvGetStartSeconds(db->precision, stb->startTimestamp);
|
||||||
|
meta.end_secs = meta.start_secs + g_arguments->csv_ts_intv_secs;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
meta.naming_type = CSV_NAMING_I_SINGLE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return meta;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta, int64_t last_end_ts) {
|
||||||
|
SDataBase* db = write_meta->db;
|
||||||
|
|
||||||
|
switch (write_meta->naming_type) {
|
||||||
|
case CSV_NAMING_I_SINGLE:
|
||||||
|
case CSV_NAMING_B_THREAD: {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CSV_NAMING_I_TIME_SLICE:
|
||||||
|
case CSV_NAMING_B_THREAD_TIME_SLICE: {
|
||||||
|
thread_meta->start_secs = csvGetStartSeconds(db->precision, last_end_ts);
|
||||||
|
thread_meta->end_secs = thread_meta.start_secs + g_arguments->csv_ts_intv_secs;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta, char* fullname, size_t size) {
|
||||||
char thread_buf[SMALL_BUFF_LEN];
|
char thread_buf[SMALL_BUFF_LEN];
|
||||||
char start_time_buf[MIDDLE_BUFF_LEN];
|
char start_time_buf[MIDDLE_BUFF_LEN];
|
||||||
char end_time_buf[MIDDLE_BUFF_LEN];
|
char end_time_buf[MIDDLE_BUFF_LEN];
|
||||||
|
@ -244,22 +251,22 @@ int csvGetFileFullname(CsvWriteMeta* meta, char* fullname, size_t size) {
|
||||||
const char* file_prefix = g_arguments->csv_file_prefix;
|
const char* file_prefix = g_arguments->csv_file_prefix;
|
||||||
|
|
||||||
switch (meta->naming_type) {
|
switch (meta->naming_type) {
|
||||||
case CSV_NAMING_SINGLE: {
|
case CSV_NAMING_I_SINGLE: {
|
||||||
ret = snprintf(fullname, size, "%s%s.csv", base_path, file_prefix);
|
ret = snprintf(fullname, size, "%s%s.csv", base_path, file_prefix);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CSV_NAMING_TIME_SLICE: {
|
case CSV_NAMING_I_TIME_SLICE: {
|
||||||
csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf));
|
csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf));
|
||||||
csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf));
|
csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf));
|
||||||
ret = snprintf(fullname, size, "%s%s_%s_%s.csv", base_path, file_prefix, start_time_buf, end_time_buf);
|
ret = snprintf(fullname, size, "%s%s_%s_%s.csv", base_path, file_prefix, start_time_buf, end_time_buf);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CSV_NAMING_THREAD: {
|
case CSV_NAMING_B_THREAD: {
|
||||||
(void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
|
(void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
|
||||||
ret = snprintf(fullname, size, "%s%s_%s.csv", base_path, file_prefix, thread_buf);
|
ret = snprintf(fullname, size, "%s%s_%s.csv", base_path, file_prefix, thread_buf);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CSV_NAMING_THREAD_TIME_SLICE: {
|
case CSV_NAMING_B_THREAD_TIME_SLICE: {
|
||||||
(void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
|
(void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
|
||||||
csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf));
|
csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf));
|
||||||
csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf));
|
csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf));
|
||||||
|
@ -276,184 +283,55 @@ int csvGetFileFullname(CsvWriteMeta* meta, char* fullname, size_t size) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
uint32_t csvCalcInterlaceRows(CsvWriteMeta* meta, SSuperTable* stb, int64_t ts) {
|
static int64_t csvCalcSliceBatchTimestamp(CsvWriteMeta* write_meta, int64_t slice_cur_ts, int64_t slice_end_ts) {
|
||||||
uint32_t need_rows = 0;
|
int64_t slice_batch_ts = 0;
|
||||||
|
|
||||||
|
switch (write_meta->naming_type) {
|
||||||
switch (meta->naming_type) {
|
case CSV_NAMING_I_SINGLE:
|
||||||
case CSV_NAMING_SINGLE: {
|
case CSV_NAMING_I_TIME_SLICE: {
|
||||||
need_rows = stb->interlaceRows;
|
slice_batch_ts = MIN(slice_cur_ts + write_meta->interlace_step, slice_end_ts);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CSV_NAMING_TIME_SLICE: {
|
case CSV_NAMING_B_THREAD:
|
||||||
(meta->end_ts - ts) / stb->timestamp_step
|
case CSV_NAMING_B_THREAD_TIME_SLICE: {
|
||||||
need_rows = stb->interlaceRows;
|
slice_batch_ts = slice_end_ts;
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CSV_NAMING_THREAD: {
|
|
||||||
(void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
|
|
||||||
ret = snprintf(fullname, size, "%s%s_%s.csv", base_path, file_prefix, thread_buf);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case CSV_NAMING_THREAD_TIME_SLICE: {
|
|
||||||
(void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
|
|
||||||
csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf));
|
|
||||||
csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf));
|
|
||||||
ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default: {
|
default: {
|
||||||
ret = -1;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return slice_batch_ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int csvGenRowFields(char* buf, int size, SSuperTable* stb, int fields_cate, int64_t* k) {
|
||||||
|
int pos = 0;
|
||||||
|
BArray* fields = NULL;
|
||||||
|
int16_t field_count = 0;
|
||||||
|
char* binanry_prefix = stb->binaryPrefex ? stb->binaryPrefex : "";
|
||||||
|
char* nchar_prefix = stb->ncharPrefex ? stb->ncharPrefex : "";
|
||||||
|
|
||||||
|
if (!buf || !stb || !k || size <= 0) {
|
||||||
static int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fp, char* rows_buf, int rows_buf_len) {
|
|
||||||
char fullname[MAX_PATH_LEN] = {};
|
|
||||||
CsvWriteMeta meta = csvInitFileNamingMeta();
|
|
||||||
|
|
||||||
int ret = csvGetFileFullname(&meta, fullname, sizeof(fullname));
|
|
||||||
if (ret < 0) {
|
|
||||||
errorPrint("Failed to generate csv filename. database: %s, super table: %s, naming type: %d.\n",
|
|
||||||
db->dbName, stb->stbName, meta.naming_type);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ret = 0;
|
if (fields_cate == GEN_ROW_FIELDS_TAG) {
|
||||||
int pos = 0;
|
fields = stb->tags;
|
||||||
int64_t n = 0; // already inserted rows for one child table
|
field_count = stb->tags->size;
|
||||||
int64_t tk = 0;
|
} else {
|
||||||
int64_t show = 0;
|
fields = stb->cols;
|
||||||
int64_t ts = 0;
|
field_count = stb->cols->size;
|
||||||
int64_t last_ts = stb->startTimestamp;
|
|
||||||
|
|
||||||
// init buffer
|
|
||||||
char** tags_buf_bucket = (char **)benchCalloc(stb->childTblCount, sizeof(char *), true);
|
|
||||||
int cols_buf_length = stb->lenOfCols + stb->cols->size;
|
|
||||||
char* cols_buf = (char *)benchCalloc(1, cols_buf_length, true);
|
|
||||||
|
|
||||||
for (int64_t i = 0; i < stb->childTblCount; ++i) {
|
|
||||||
int tags_buf_length = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->tags->size;
|
|
||||||
tags_buf_bucket[i] = benchCalloc(1, tags_buf_length, true);
|
|
||||||
if (!tags_buf_bucket[i]) {
|
|
||||||
ret = -1;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = csvGenRowTagData(tags_buf_bucket[i], tags_buf_length, stb, i, &tk);
|
|
||||||
if (!ret) {
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
while (n < stb->insertRows ) {
|
|
||||||
for (int64_t i = 0; i < stb->childTblCount; ++i) {
|
|
||||||
ts = last_ts;
|
|
||||||
int64_t ck = 0;
|
|
||||||
|
|
||||||
|
|
||||||
// calc need insert rows
|
|
||||||
uint32_t need_rows = csvCalcInterlaceRows(&meta, stb, ts)
|
|
||||||
|
|
||||||
int64_t needInserts = stb->interlaceRows;
|
|
||||||
if(needInserts > stb->insertRows - n) {
|
|
||||||
needInserts = stb->insertRows - n;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int64_t j = 0; j < needInserts; j++) {
|
|
||||||
genColumnData(cols_buf, stb, ts, db->precision, &ck);
|
|
||||||
// combine tags,cols
|
|
||||||
pos += sprintf(buf + pos, "%s,%s\n", tags_buf_bucket[i], cols_buf);
|
|
||||||
if (rows_buf_len - pos < minRemain) {
|
|
||||||
// submit
|
|
||||||
ret = writeCsvFile(fp, buf, pos);
|
|
||||||
if (ret != 0) {
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
pos = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ts move next
|
|
||||||
ts += stb->timestamp_step;
|
|
||||||
|
|
||||||
// check cancel
|
|
||||||
if(g_arguments->terminate) {
|
|
||||||
infoPrint("%s", "You are cancel, exiting ... \n");
|
|
||||||
ret = -1;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
// print show
|
|
||||||
if (++show % SHOW_CNT == 0) {
|
|
||||||
infoPrint("interlace write child table index = %"PRId64 " all rows = %"PRId64 "\n", i+1, show);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// if last child table
|
|
||||||
if (i + 1 == stb->childTblCount ) {
|
|
||||||
n += needInserts;
|
|
||||||
last_ts = ts;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pos > 0) {
|
|
||||||
ret = writeCsvFile(fp, buf, pos);
|
|
||||||
pos = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
end:
|
|
||||||
// free
|
|
||||||
for (int64_t m = 0 ; m < stb->childTblCount; m ++) {
|
|
||||||
tmfree(tags_buf_bucket[m]);
|
|
||||||
}
|
|
||||||
tmfree(tags_buf_bucket);
|
|
||||||
tmfree(cols_buf);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// gen tag data
|
|
||||||
int csvGenRowTagData(char* buf, size_t size, SSuperTable* stb, int64_t index, int64_t* k) {
|
|
||||||
// tbname
|
|
||||||
int pos = snprintf(buf, size, "\'%s%"PRId64"\'", stb->childTblPrefix, index);
|
|
||||||
// tags
|
|
||||||
pos += csvGenRowFields(buf + pos, stb->tags, stb->tags->size, stb->binaryPrefex, stb->ncharPrefex, k);
|
|
||||||
|
|
||||||
return (pos > 0 && (size_t)pos < size) ? 0 : -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// gen column data
|
|
||||||
char * genColumnData(char* cols_csv_buf, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k) {
|
|
||||||
char szTime[128] = {0};
|
|
||||||
toolsFormatTimestamp(szTime, ts, precision);
|
|
||||||
int pos = sprintf(cols_csv_buf, "\'%s\'", szTime);
|
|
||||||
|
|
||||||
// columns
|
|
||||||
csvGenRowFields(cols_csv_buf + pos, stb->cols, stb->cols->size, stb->binaryPrefex, stb->ncharPrefex, k);
|
|
||||||
return cols_csv_buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int32_t csvGenRowFields(char* buf, BArray* fields, int16_t field_count, char* binanry_prefix, char* nchar_prefix, int64_t* k) {
|
|
||||||
int32_t pos = 0;
|
|
||||||
|
|
||||||
for (uint16_t i = 0; i < field_count; ++i) {
|
for (uint16_t i = 0; i < field_count; ++i) {
|
||||||
Field* field = benchArrayGet(fields, i);
|
Field* field = benchArrayGet(fields, i);
|
||||||
char* prefix = "";
|
char* prefix = "";
|
||||||
if(field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY) {
|
if(field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY) {
|
||||||
if (binanry_prefix) {
|
prefix = binanry_prefix;
|
||||||
prefix = binanry_prefix;
|
|
||||||
}
|
|
||||||
} else if(field->type == TSDB_DATA_TYPE_NCHAR) {
|
} else if(field->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
if (nchar_prefix) {
|
prefix = nchar_prefix;
|
||||||
prefix = nchar_prefix;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
pos += dataGenByField(field, buf, pos, prefix, k, "");
|
pos += dataGenByField(field, buf, pos, prefix, k, "");
|
||||||
}
|
}
|
||||||
|
@ -462,68 +340,297 @@ int32_t csvGenRowFields(char* buf, BArray* fields, int16_t field_count, char* bi
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int csvGenRowTagData(char* buf, int size, SSuperTable* stb, int64_t index, int64_t* k) {
|
||||||
int csvGenStbInterlace(SDataBase* db, SSuperTable* stb) {
|
if (!buf || !stb || !k || size <= 0) {
|
||||||
|
|
||||||
|
|
||||||
int ret = 0;
|
|
||||||
char outFile[MAX_FILE_NAME_LEN] = {0};
|
|
||||||
obtainCsvFile(outFile, db, stb, outDir);
|
|
||||||
FILE* fp = fopen(outFile, "w");
|
|
||||||
if(fp == NULL) {
|
|
||||||
errorPrint("failed create csv file. file=%s, last errno=%d strerror=%s \n", outFile, errno, strerror(errno));
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int row_buf_len = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->lenOfCols + stb->tags->size + stb->cols->size;
|
// tbname
|
||||||
int rows_buf_len = row_buf_len * g_arguments->interlaceRows;
|
int pos = snprintf(buf, size, "\'%s%"PRId64"\'", stb->childTblPrefix, index);
|
||||||
char* rows_buf = benchCalloc(1, rows_buf_len, true);
|
|
||||||
|
|
||||||
infoPrint("start write csv file: %s \n", outFile);
|
// tags
|
||||||
|
pos += csvGenRowFields(buf + pos, size - pos, stb, GEN_ROW_FIELDS_TAG, k);
|
||||||
|
|
||||||
// interlace mode
|
return (pos > 0 && pos < size) ? pos : -1;
|
||||||
ret = interlaceWriteCsv(db, stb, fp, rows_buf, rows_buf_len);
|
|
||||||
|
|
||||||
|
|
||||||
tmfree(rows_buf);
|
|
||||||
fclose(fp);
|
|
||||||
|
|
||||||
succPrint("end write csv file: %s \n", outFile);
|
|
||||||
|
|
||||||
|
|
||||||
// wait threads
|
|
||||||
for (int i = 0; i < threadCnt; i++) {
|
|
||||||
infoPrint("pthread_join %d ...\n", i);
|
|
||||||
pthread_join(pids[i], NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void csvGenPrepare(SDataBase* db, SSuperTable* stb) {
|
static int csvGenRowColData(char* buf, int size, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k) {
|
||||||
stbInfo->lenOfTags = accumulateRowLen(stbInfo->tags, stbInfo->iface);
|
char ts_fmt[128] = {0};
|
||||||
stbInfo->lenOfCols = accumulateRowLen(stbInfo->cols, stbInfo->iface);
|
toolsFormatTimestamp(ts_fmt, ts, precision);
|
||||||
|
int pos = snprintf(buf, size, "\'%s\'", ts_fmt);
|
||||||
|
|
||||||
|
// columns
|
||||||
|
pos += csvGenRowFields(buf + pos, size - pos, stb, GEN_ROW_FIELDS_COL, k);
|
||||||
|
return (pos > 0 && pos < size) ? pos : -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static CsvRowFieldsBuf* csvGenCtbTagData(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) {
|
||||||
|
SSuperTable* stb = write_meta->stb;
|
||||||
|
int ret = 0;
|
||||||
|
int64_t tk = 0;
|
||||||
|
|
||||||
|
if (!write_meta || !thread_meta) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
CsvRowFieldsBuf* tags_buf_bucket = (CsvRowFieldsBuf*)benchCalloc(thread_meta->ctb_count, sizeof(CsvRowFieldsBuf), true);
|
||||||
|
if (!tags_buf_bucket) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* tags_buf = NULL;
|
||||||
|
int tags_buf_size = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->tags->size;
|
||||||
|
for (uint64_t i = 0; i < thread_meta->ctb_count; ++i) {
|
||||||
|
tags_buf = benchCalloc(1, tags_buf_size, true);
|
||||||
|
if (!tags_buf) {
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
|
||||||
|
tags_buf_bucket[i].buf = tags_buf;
|
||||||
|
tags_buf_bucket[i].buf_size = tags_buf_size;
|
||||||
|
|
||||||
|
ret = csvGenRowTagData(tags_buf, tags_buf_size, stb, thread_meta->ctb_start_idx + i, &tk);
|
||||||
|
if (ret <= 0) {
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
|
||||||
|
tags_buf_bucket[i].length = ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
return tags_buf_bucket;
|
||||||
|
|
||||||
|
error:
|
||||||
|
csvFreeCtbTagData(thread_meta, tags_buf_bucket);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void csvFreeCtbTagData(CsvThreadMeta* thread_meta, CsvRowFieldsBuf* tags_buf_bucket) {
|
||||||
|
if (!thread_meta || !tags_buf_bucket) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (uint64_t i = 0 ; i < thread_meta->ctb_count; ++i) {
|
||||||
|
char* tags_buf = tags_buf_bucket[i].buf;
|
||||||
|
if (tags_buf) {
|
||||||
|
tmfree(tags_buf_bucket);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tmfree(tags_buf_bucket);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int csvGenStb(SDataBase* db, SSuperTable* stb) {
|
static int csvWriteFile(FILE* fp, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck, CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) {
|
||||||
|
SDataBase* db = write_meta->db;
|
||||||
|
SSuperTable* stb = write_meta->stb;
|
||||||
|
CsvRowFieldsBuf* tags_buf_bucket = thread_meta->tags_buf_bucket;
|
||||||
|
CsvRowFieldsBuf* tags_buf = &tags_buf_bucket[ctb_idx];
|
||||||
|
CsvRowFieldsBuf* cols_buf = thread_meta->cols_buf;
|
||||||
|
int ret = 0;
|
||||||
|
|
||||||
|
|
||||||
|
ret = csvGenRowColData(cols_buf->buf, cols_buf->buf_size, stb, cur_ts, db->precision, ck);
|
||||||
|
if (ret <= 0) {
|
||||||
|
errorPrint("Failed to generate csv column data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
|
||||||
|
db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
cols_buf->length = ret;
|
||||||
|
|
||||||
|
|
||||||
|
// write header
|
||||||
|
if (thread_meta->output_header) {
|
||||||
|
// TODO
|
||||||
|
|
||||||
|
thread_meta->output_header = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// write columns
|
||||||
|
size_t written = fwrite(cols_buf->buf, 1, cols_buf->length, fp);
|
||||||
|
if (written != cols_buf->length) {
|
||||||
|
errorPrint("Failed to write csv column data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
|
||||||
|
cols_buf->length, written, db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// write tags
|
||||||
|
size_t written = fwrite(tags_buf->buf, 1, tags_buf->length, fp);
|
||||||
|
if (written != tags_buf->length) {
|
||||||
|
errorPrint("Failed to write csv tag data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
|
||||||
|
tags_buf->length, written, db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void* csvGenStbThread(void* arg) {
|
||||||
|
CsvThreadArgs* thread_arg = (CsvThreadArgs*)arg;
|
||||||
|
CsvWriteMeta* write_meta = thread_arg->write_meta;
|
||||||
|
CsvThreadMeta* thread_meta = &thread_arg->thread_meta;
|
||||||
|
SDataBase* db = write_meta->db;
|
||||||
|
SSuperTable* stb = write_meta->stb;
|
||||||
|
|
||||||
|
int64_t cur_ts = 0;
|
||||||
|
int64_t slice_cur_ts = 0;
|
||||||
|
int64_t slice_end_ts = 0;
|
||||||
|
int64_t slice_batch_ts = 0;
|
||||||
|
int64_t slice_ctb_cur_ts = 0;
|
||||||
|
int64_t ck = 0;
|
||||||
|
uint64_t ctb_idx = 0;
|
||||||
|
int ret = 0;
|
||||||
|
FILE* fp = NULL;
|
||||||
|
char fullname[MAX_PATH_LEN] = {};
|
||||||
|
|
||||||
|
|
||||||
|
// tags buffer
|
||||||
|
CsvRowFieldsBuf* tags_buf_bucket = csvGenCtbTagData(write_meta, thread_meta);
|
||||||
|
if (!tags_buf_bucket) {
|
||||||
|
errorPrint("Failed to generate csv tag data. database: %s, super table: %s, naming type: %d, thread index: %d.\n",
|
||||||
|
db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// column buffer
|
||||||
|
int buf_size = stb->lenOfCols + stb->cols->size;
|
||||||
|
char* buf = (char*)benchCalloc(1, buf_size, true);
|
||||||
|
if (!buf) {
|
||||||
|
errorPrint("Failed to malloc csv column buffer. database: %s, super table: %s, naming type: %d, thread index: %d.\n",
|
||||||
|
db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
CsvRowFieldsBuf cols_buf = {
|
||||||
|
.buf = buf,
|
||||||
|
.buf_size = buf_size,
|
||||||
|
.length = 0
|
||||||
|
};
|
||||||
|
|
||||||
|
thread_meta->tags_buf_bucket = tags_buf_bucket;
|
||||||
|
thread_meta->cols_buf = &cols_buf;
|
||||||
|
|
||||||
|
|
||||||
|
for (cur_ts = write_meta->start_ts; cur_ts < write_meta->end_ts; cur_ts += write_meta->ts_step) {
|
||||||
|
// get filename
|
||||||
|
fullname[MAX_PATH_LEN] = {};
|
||||||
|
ret = csvGetFileFullname(write_meta, thread_meta, fullname, sizeof(fullname));
|
||||||
|
if (ret < 0) {
|
||||||
|
errorPrint("Failed to generate csv filename. database: %s, super table: %s, naming type: %d, thread index: %d.\n",
|
||||||
|
db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
// create fd
|
||||||
|
fp = fopen(fullname, "w");
|
||||||
|
if (fp == NULL) {
|
||||||
|
errorPrint("Failed to create csv file. thread index: %d, file: %s, errno: %d, strerror: %s.\n",
|
||||||
|
thread_meta->thread_id, fullname, errno, strerror(errno));
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
thread_meta->output_header = g_arguments->csv_output_header;
|
||||||
|
slice_cur_ts = cur_ts;
|
||||||
|
slice_end_ts = MIN(cur_ts + write_meta->ts_step, write_meta->end_ts);
|
||||||
|
|
||||||
|
// write data
|
||||||
|
while (slice_cur_ts < slice_end_ts) {
|
||||||
|
slice_batch_ts = csvCalcSliceBatchTimestamp(write_meta, slice_cur_ts, slice_end_ts);
|
||||||
|
|
||||||
|
for (ctb_idx = 0; ctb_idx < thread_meta->ctb_count; ++ctb_idx) {
|
||||||
|
for (slice_ctb_cur_ts = slice_cur_ts; slice_ctb_cur_ts < slice_batch_ts; slice_ctb_cur_ts += write_meta->stb->timestamp_step) {
|
||||||
|
ret = csvWriteFile(fp, ctb_idx, slice_ctb_cur_ts, &ck, write_meta, thread_meta);
|
||||||
|
if (!ret) {
|
||||||
|
errorPrint("Failed to write csv file. thread index: %d, file: %s, errno: %d, strerror: %s.\n",
|
||||||
|
thread_meta->thread_id, fullname, errno, strerror(errno));
|
||||||
|
fclose(fp);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
ck += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
slice_cur_ts = slice_batch_ts;
|
||||||
|
}
|
||||||
|
|
||||||
|
fclose(fp);
|
||||||
|
csvUpdateSliceRange(write_meta, thread_meta, last_end_ts);
|
||||||
|
}
|
||||||
|
|
||||||
|
end:
|
||||||
|
csvFreeCtbTagData(tags_buf_bucket);
|
||||||
|
tmfree(cols_buf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) {
|
||||||
|
int ret = 0;
|
||||||
|
CsvWriteMeta write_meta = csvInitWriteMeta(db, stb);
|
||||||
|
|
||||||
|
pthread_t* pids = benchCalloc(write_meta.total_threads, sizeof(pthread_t), false);
|
||||||
|
if (!pids) {
|
||||||
|
ret = -1;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
CsvThreadArgs* args = benchCalloc(write_meta.total_threads, sizeof(CsvThreadArgs), false);
|
||||||
|
if (!args) {
|
||||||
|
ret = -1;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (uint32_t i = 0; (i < write_meta.total_threads && !g_arguments->terminate); ++i) {
|
||||||
|
CsvThreadArgs* arg = &args[i];
|
||||||
|
arg->write_meta = &write_meta;
|
||||||
|
arg->thread_meta = csvInitThreadMeta(&write_meta, i + 1);
|
||||||
|
|
||||||
|
ret = pthread_create(&pids[i], NULL, csvGenStbThread, arg);
|
||||||
|
if (!ret) {
|
||||||
|
perror("Failed to create thread");
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait threads
|
||||||
|
for (uint32_t i = 0; i < write_meta.total_threads; ++i) {
|
||||||
|
infoPrint("pthread_join %d ...\n", i);
|
||||||
|
pthread_join(pids[i], NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
end:
|
||||||
|
tmfree(pids);
|
||||||
|
tmfree(args);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void csvGenPrepare(SDataBase* db, SSuperTable* stb) {
|
||||||
|
stb->lenOfTags = accumulateRowLen(stb->tags, stb->iface);
|
||||||
|
stb->lenOfCols = accumulateRowLen(stb->cols, stb->iface);
|
||||||
|
|
||||||
|
if (stb->childTblTo) {
|
||||||
|
stb->childTblCount = stb->childTblTo - stb->childTblFrom;
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int csvGenStb(SDataBase* db, SSuperTable* stb) {
|
||||||
// prepare
|
// prepare
|
||||||
csvGenPrepare(db, stb);
|
csvGenPrepare(db, stb);
|
||||||
|
|
||||||
|
return csvGenStbProcess(db, stb);
|
||||||
int ret = 0;
|
|
||||||
if (stb->interlaceRows > 0) {
|
|
||||||
// interlace mode
|
|
||||||
ret = csvGenStbInterlace(db, stb);
|
|
||||||
} else {
|
|
||||||
// batch mode
|
|
||||||
ret = csvGenStbBatch(db, stb);
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1619,6 +1619,23 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) {
|
||||||
g_arguments->csv_ts_interval = "1d";
|
g_arguments->csv_ts_interval = "1d";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// csv output header
|
||||||
|
g_arguments->csv_output_header = false;
|
||||||
|
tools_cJSON* oph = tools_cJSON_GetObjectItem(json, "csv_output_header");
|
||||||
|
if (oph && oph->type == tools_cJSON_String && oph->valuestring != NULL) {
|
||||||
|
if (0 == strcasecmp(oph->valuestring, "yes")) {
|
||||||
|
g_arguments->csv_output_header = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// csv tbname alias
|
||||||
|
tools_cJSON* tba = tools_cJSON_GetObjectItem(json, "csv_tbname_alias");
|
||||||
|
if (tba && tba->type == tools_cJSON_String && tba->valuestring != NULL) {
|
||||||
|
g_arguments->csv_tbname_alias = tba->valuestring;
|
||||||
|
} else {
|
||||||
|
g_arguments->csv_tbname_alias = "device_id";
|
||||||
|
}
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue