From 8203389adab54cb76fb9c4e180645a431000964e Mon Sep 17 00:00:00 2001 From: Yaming Pei Date: Thu, 27 Feb 2025 16:18:12 +0800 Subject: [PATCH] feat: csv supports optional table header --- tools/taos-tools/inc/benchCsv.h | 2 + tools/taos-tools/src/benchCsv.c | 83 +++++++++++++++++++++++++-------- 2 files changed, 66 insertions(+), 19 deletions(-) diff --git a/tools/taos-tools/inc/benchCsv.h b/tools/taos-tools/inc/benchCsv.h index 717dfd8a71..c522a12c50 100644 --- a/tools/taos-tools/inc/benchCsv.h +++ b/tools/taos-tools/inc/benchCsv.h @@ -41,6 +41,8 @@ typedef struct { CsvNamingType naming_type; size_t total_threads; char thread_formatter[TINY_BUFF_LEN]; + char csv_header[LARGE_BUFF_LEN]; + int csv_header_length; SDataBase* db; SSuperTable* stb; int64_t start_ts; diff --git a/tools/taos-tools/src/benchCsv.c b/tools/taos-tools/src/benchCsv.c index 673e223959..f1aacaec66 100644 --- a/tools/taos-tools/src/benchCsv.c +++ b/tools/taos-tools/src/benchCsv.c @@ -138,9 +138,44 @@ static void csvGenThreadFormatter(CsvWriteMeta* meta) { } -static void csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write_meta) { +static int csvGenCsvHeader(CsvWriteMeta* write_meta) { + SDataBase* db = write_meta->db; + SSuperTable* stb = write_meta->stb; + char* buf = write_meta->csv_header; + int pos = 0; + int size = sizeof(write_meta->csv_header); + + if (!g_arguments->csv_output_header) { + return 0; + } + + // ts + pos += snprintf(buf + pos, size - pos, "ts"); + + // columns + for (size_t i = 0; i < stb->cols->size; ++i) { + Field* col = benchArrayGet(stb->cols, i); + pos += snprintf(buf + pos, size - pos, ",%s", col->name); + } + + // tbname + pos += snprintf(buf + pos, size - pos, ",%s", g_arguments->csv_tbname_alias); + + // tags + for (size_t i = 0; i < stb->tags->size; ++i) { + Field* tag = benchArrayGet(stb->tags, i); + pos += snprintf(buf + pos, size - pos, ",%s", tag->name); + } + + write_meta->csv_header_length = (pos > 0 && pos < size) ? pos : 0; + return (pos > 0 && pos < size) ? 0 : -1; +} + + +static int csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write_meta) { write_meta->naming_type = csvGetFileNamingType(stb); write_meta->total_threads = 1; + write_meta->csv_header_length = 0; write_meta->db = db; write_meta->stb = stb; write_meta->start_ts = stb->startTimestamp; @@ -148,6 +183,12 @@ static void csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* writ write_meta->ts_step = stb->timestamp_step * stb->insertRows; write_meta->interlace_step = stb->timestamp_step * stb->interlaceRows; + int ret = csvGenCsvHeader(write_meta); + if (ret < 0) { + errorPrint("Failed to generate csv header data. database: %s, super table: %s, naming type: %d.\n", + db->dbName, stb->stbName, write_meta->naming_type); + return -1; + } switch (meta.naming_type) { case CSV_NAMING_I_SINGLE: { @@ -174,7 +215,7 @@ static void csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* writ } } - return; + return 0; } @@ -428,7 +469,8 @@ static int csvWriteFile(FILE* fp, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck, CsvRowTagsBuf* tags_buf_bucket = thread_meta->tags_buf_bucket; CsvRowColsBuf* tags_buf = &tags_buf_bucket[ctb_idx]; CsvRowColsBuf* cols_buf = thread_meta->cols_buf; - int ret = 0; + int ret = 0; + size_t written = 0; ret = csvGenRowColData(cols_buf->buf, cols_buf->buf_size, stb, cur_ts, db->precision, ck); @@ -440,26 +482,22 @@ static int csvWriteFile(FILE* fp, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck, cols_buf->length = ret; - // write header if (thread_meta->output_header) { - // TODO - + written = fwrite(write_meta->csv_header, 1, write_meta->csv_header_length, fp); thread_meta->output_header = false; } - // write columns - size_t written = fwrite(cols_buf->buf, 1, cols_buf->length, fp); + written = fwrite(cols_buf->buf, 1, cols_buf->length, fp); if (written != cols_buf->length) { errorPrint("Failed to write csv column data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", cols_buf->length, written, db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx); return -1; } - // write tags - size_t written = fwrite(tags_buf->buf, 1, tags_buf->length, fp); + written = fwrite(tags_buf->buf, 1, tags_buf->length, fp); if (written != tags_buf->length) { errorPrint("Failed to write csv tag data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", tags_buf->length, written, db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx); @@ -574,23 +612,30 @@ end: static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) { int ret = 0; - pthread_t* pids = benchCalloc(write_meta.total_threads, sizeof(pthread_t), false); - if (!pids) { - ret = -1; - goto end; - } CsvWriteMeta* write_meta = benchCalloc(1, sizeof(CsvWriteMeta), false); - if (!args) { + if (!write_meta) { ret = -1; goto end; } - CsvThreadArgs* args = benchCalloc(write_meta.total_threads, sizeof(CsvThreadArgs), false); + + ret = csvInitWriteMeta(db, stb, write_meta); + if (ret < 0) { + ret = -1; + goto end; + } + + CsvThreadArgs* args = benchCalloc(write_meta->total_threads, sizeof(CsvThreadArgs), false); if (!args) { ret = -1; goto end; } - csvInitWriteMeta(db, stb, write_meta); + pthread_t* pids = benchCalloc(write_meta.total_threads, sizeof(pthread_t), false); + if (!pids) { + ret = -1; + goto end; + } + for (uint32_t i = 0; (i < write_meta->total_threads && !g_arguments->terminate); ++i) { CsvThreadArgs* arg = &args[i]; arg->write_meta = write_meta; @@ -611,8 +656,8 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) { end: tmfree(pids); - tmfree(write_meta); tmfree(args); + tmfree(write_meta); return ret; }