feat: csv supports optional table header

This commit is contained in:
Yaming Pei 2025-02-27 16:18:12 +08:00
parent 5ce4bd2465
commit 8203389ada
2 changed files with 66 additions and 19 deletions

View File

@ -41,6 +41,8 @@ typedef struct {
CsvNamingType naming_type; CsvNamingType naming_type;
size_t total_threads; size_t total_threads;
char thread_formatter[TINY_BUFF_LEN]; char thread_formatter[TINY_BUFF_LEN];
char csv_header[LARGE_BUFF_LEN];
int csv_header_length;
SDataBase* db; SDataBase* db;
SSuperTable* stb; SSuperTable* stb;
int64_t start_ts; int64_t start_ts;

View File

@ -138,9 +138,44 @@ static void csvGenThreadFormatter(CsvWriteMeta* meta) {
} }
static void csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write_meta) { static int csvGenCsvHeader(CsvWriteMeta* write_meta) {
SDataBase* db = write_meta->db;
SSuperTable* stb = write_meta->stb;
char* buf = write_meta->csv_header;
int pos = 0;
int size = sizeof(write_meta->csv_header);
if (!g_arguments->csv_output_header) {
return 0;
}
// ts
pos += snprintf(buf + pos, size - pos, "ts");
// columns
for (size_t i = 0; i < stb->cols->size; ++i) {
Field* col = benchArrayGet(stb->cols, i);
pos += snprintf(buf + pos, size - pos, ",%s", col->name);
}
// tbname
pos += snprintf(buf + pos, size - pos, ",%s", g_arguments->csv_tbname_alias);
// tags
for (size_t i = 0; i < stb->tags->size; ++i) {
Field* tag = benchArrayGet(stb->tags, i);
pos += snprintf(buf + pos, size - pos, ",%s", tag->name);
}
write_meta->csv_header_length = (pos > 0 && pos < size) ? pos : 0;
return (pos > 0 && pos < size) ? 0 : -1;
}
static int csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write_meta) {
write_meta->naming_type = csvGetFileNamingType(stb); write_meta->naming_type = csvGetFileNamingType(stb);
write_meta->total_threads = 1; write_meta->total_threads = 1;
write_meta->csv_header_length = 0;
write_meta->db = db; write_meta->db = db;
write_meta->stb = stb; write_meta->stb = stb;
write_meta->start_ts = stb->startTimestamp; write_meta->start_ts = stb->startTimestamp;
@ -148,6 +183,12 @@ static void csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* writ
write_meta->ts_step = stb->timestamp_step * stb->insertRows; write_meta->ts_step = stb->timestamp_step * stb->insertRows;
write_meta->interlace_step = stb->timestamp_step * stb->interlaceRows; write_meta->interlace_step = stb->timestamp_step * stb->interlaceRows;
int ret = csvGenCsvHeader(write_meta);
if (ret < 0) {
errorPrint("Failed to generate csv header data. database: %s, super table: %s, naming type: %d.\n",
db->dbName, stb->stbName, write_meta->naming_type);
return -1;
}
switch (meta.naming_type) { switch (meta.naming_type) {
case CSV_NAMING_I_SINGLE: { case CSV_NAMING_I_SINGLE: {
@ -174,7 +215,7 @@ static void csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* writ
} }
} }
return; return 0;
} }
@ -428,7 +469,8 @@ static int csvWriteFile(FILE* fp, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck,
CsvRowTagsBuf* tags_buf_bucket = thread_meta->tags_buf_bucket; CsvRowTagsBuf* tags_buf_bucket = thread_meta->tags_buf_bucket;
CsvRowColsBuf* tags_buf = &tags_buf_bucket[ctb_idx]; CsvRowColsBuf* tags_buf = &tags_buf_bucket[ctb_idx];
CsvRowColsBuf* cols_buf = thread_meta->cols_buf; CsvRowColsBuf* cols_buf = thread_meta->cols_buf;
int ret = 0; int ret = 0;
size_t written = 0;
ret = csvGenRowColData(cols_buf->buf, cols_buf->buf_size, stb, cur_ts, db->precision, ck); ret = csvGenRowColData(cols_buf->buf, cols_buf->buf_size, stb, cur_ts, db->precision, ck);
@ -440,26 +482,22 @@ static int csvWriteFile(FILE* fp, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck,
cols_buf->length = ret; cols_buf->length = ret;
// write header // write header
if (thread_meta->output_header) { if (thread_meta->output_header) {
// TODO written = fwrite(write_meta->csv_header, 1, write_meta->csv_header_length, fp);
thread_meta->output_header = false; thread_meta->output_header = false;
} }
// write columns // write columns
size_t written = fwrite(cols_buf->buf, 1, cols_buf->length, fp); written = fwrite(cols_buf->buf, 1, cols_buf->length, fp);
if (written != cols_buf->length) { if (written != cols_buf->length) {
errorPrint("Failed to write csv column data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", errorPrint("Failed to write csv column data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
cols_buf->length, written, db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx); cols_buf->length, written, db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx);
return -1; return -1;
} }
// write tags // write tags
size_t written = fwrite(tags_buf->buf, 1, tags_buf->length, fp); written = fwrite(tags_buf->buf, 1, tags_buf->length, fp);
if (written != tags_buf->length) { if (written != tags_buf->length) {
errorPrint("Failed to write csv tag data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", errorPrint("Failed to write csv tag data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
tags_buf->length, written, db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx); tags_buf->length, written, db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx);
@ -574,23 +612,30 @@ end:
static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) { static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) {
int ret = 0; int ret = 0;
pthread_t* pids = benchCalloc(write_meta.total_threads, sizeof(pthread_t), false);
if (!pids) {
ret = -1;
goto end;
}
CsvWriteMeta* write_meta = benchCalloc(1, sizeof(CsvWriteMeta), false); CsvWriteMeta* write_meta = benchCalloc(1, sizeof(CsvWriteMeta), false);
if (!args) { if (!write_meta) {
ret = -1; ret = -1;
goto end; goto end;
} }
CsvThreadArgs* args = benchCalloc(write_meta.total_threads, sizeof(CsvThreadArgs), false);
ret = csvInitWriteMeta(db, stb, write_meta);
if (ret < 0) {
ret = -1;
goto end;
}
CsvThreadArgs* args = benchCalloc(write_meta->total_threads, sizeof(CsvThreadArgs), false);
if (!args) { if (!args) {
ret = -1; ret = -1;
goto end; goto end;
} }
csvInitWriteMeta(db, stb, write_meta); pthread_t* pids = benchCalloc(write_meta.total_threads, sizeof(pthread_t), false);
if (!pids) {
ret = -1;
goto end;
}
for (uint32_t i = 0; (i < write_meta->total_threads && !g_arguments->terminate); ++i) { for (uint32_t i = 0; (i < write_meta->total_threads && !g_arguments->terminate); ++i) {
CsvThreadArgs* arg = &args[i]; CsvThreadArgs* arg = &args[i];
arg->write_meta = write_meta; arg->write_meta = write_meta;
@ -611,8 +656,8 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) {
end: end:
tmfree(pids); tmfree(pids);
tmfree(write_meta);
tmfree(args); tmfree(args);
tmfree(write_meta);
return ret; return ret;
} }