From 5ce4bd2465fd2e58f1d2b0da91c4cb85221aa0a6 Mon Sep 17 00:00:00 2001 From: Yaming Pei Date: Thu, 27 Feb 2025 15:13:58 +0800 Subject: [PATCH] refactor: csv init write & thread meta --- tools/taos-tools/src/benchCsv.c | 81 +++++++++++++++++---------------- 1 file changed, 42 insertions(+), 39 deletions(-) diff --git a/tools/taos-tools/src/benchCsv.c b/tools/taos-tools/src/benchCsv.c index 95c94dc57d..673e223959 100644 --- a/tools/taos-tools/src/benchCsv.c +++ b/tools/taos-tools/src/benchCsv.c @@ -138,38 +138,34 @@ static void csvGenThreadFormatter(CsvWriteMeta* meta) { } -static CsvWriteMeta csvInitWriteMeta(SDataBase* db, SSuperTable* stb) { - CsvWriteMeta meta = { - .naming_type = CSV_NAMING_I_SINGLE, - .total_threads = 1, - .thread_formatter = {}, - .db = db, - .stb = stb, - .start_ts = stb->startTimestamp, - .end_ts = stb->startTimestamp + stb->timestamp_step * stb->insertRows, - .ts_step = stb->timestamp_step * stb->insertRows, - .interlace_step = stb->timestamp_step * stb->interlaceRows - }; +static void csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write_meta) { + write_meta->naming_type = csvGetFileNamingType(stb); + write_meta->total_threads = 1; + write_meta->db = db; + write_meta->stb = stb; + write_meta->start_ts = stb->startTimestamp; + write_meta->end_ts = stb->startTimestamp + stb->timestamp_step * stb->insertRows; + write_meta->ts_step = stb->timestamp_step * stb->insertRows; + write_meta->interlace_step = stb->timestamp_step * stb->interlaceRows; - meta.naming_type = csvGetFileNamingType(stb); switch (meta.naming_type) { case CSV_NAMING_I_SINGLE: { break; } case CSV_NAMING_I_TIME_SLICE: { - csvCalcTimestampStep(&meta); + csvCalcTimestampStep(write_meta); break; } case CSV_NAMING_B_THREAD: { meta.total_threads = g_arguments->nthreads; - csvGenThreadFormatter(&meta); + csvGenThreadFormatter(write_meta); break; } case CSV_NAMING_B_THREAD_TIME_SLICE: { meta.total_threads = g_arguments->nthreads; - csvGenThreadFormatter(&meta); - csvCalcTimestampStep(&meta); + csvGenThreadFormatter(write_meta); + csvCalcTimestampStep(write_meta); break; } default: { @@ -178,25 +174,26 @@ static CsvWriteMeta csvInitWriteMeta(SDataBase* db, SSuperTable* stb) { } } - return meta; + return; } -static CsvThreadMeta csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id) { +static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvThreadMeta* thread_meta) { SDataBase* db = write_meta->db; SSuperTable* stb = write_meta->stb; - CsvThreadMeta meta = { - .ctb_start_idx = 0, - .ctb_end_idx = 0, - .ctb_count = 0, - .start_secs = 0, - .end_secs = 0, - .thread_id = thread_id, - .tags_buf_bucket = NULL, - .cols_buf = NULL - }; - csvCalcCtbRange(&meta, write_meta->total_threads, stb->childTblFrom, stb->childTblCount); + thread_meta->ctb_start_idx = 0; + thread_meta->ctb_end_idx = 0; + thread_meta->ctb_count = 0; + thread_meta->start_secs = 0; + thread_meta->end_secs = 0; + thread_meta->thread_id = thread_id; + thread_meta->output_header = false; + thread_meta->tags_buf_size = 0; + thread_meta->tags_buf_bucket = NULL; + thread_meta->cols_buf = NULL; + + csvCalcCtbRange(write_meta, write_meta->total_threads, stb->childTblFrom, stb->childTblCount); switch (write_meta->naming_type) { case CSV_NAMING_I_SINGLE: @@ -205,17 +202,17 @@ static CsvThreadMeta csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread } case CSV_NAMING_I_TIME_SLICE: case CSV_NAMING_B_THREAD_TIME_SLICE: { - meta.start_secs = csvGetStartSeconds(db->precision, stb->startTimestamp); - meta.end_secs = meta.start_secs + g_arguments->csv_ts_intv_secs; + thread_meta->start_secs = csvGetStartSeconds(db->precision, stb->startTimestamp); + thread_meta->end_secs = thread_meta->start_secs + g_arguments->csv_ts_intv_secs; break; } default: { - meta.naming_type = CSV_NAMING_I_SINGLE; + thread_meta->naming_type = CSV_NAMING_I_SINGLE; break; } } - return meta; + return; } @@ -576,23 +573,28 @@ end: static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) { int ret = 0; - CsvWriteMeta write_meta = csvInitWriteMeta(db, stb); pthread_t* pids = benchCalloc(write_meta.total_threads, sizeof(pthread_t), false); if (!pids) { ret = -1; goto end; } + CsvWriteMeta* write_meta = benchCalloc(1, sizeof(CsvWriteMeta), false); + if (!args) { + ret = -1; + goto end; + } CsvThreadArgs* args = benchCalloc(write_meta.total_threads, sizeof(CsvThreadArgs), false); if (!args) { ret = -1; goto end; } - for (uint32_t i = 0; (i < write_meta.total_threads && !g_arguments->terminate); ++i) { + csvInitWriteMeta(db, stb, write_meta); + for (uint32_t i = 0; (i < write_meta->total_threads && !g_arguments->terminate); ++i) { CsvThreadArgs* arg = &args[i]; - arg->write_meta = &write_meta; - arg->thread_meta = csvInitThreadMeta(&write_meta, i + 1); + arg->write_meta = write_meta; + csvInitThreadMeta(write_meta, i + 1, &arg->thread_meta); ret = pthread_create(&pids[i], NULL, csvGenStbThread, arg); if (!ret) { @@ -602,13 +604,14 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) { } // wait threads - for (uint32_t i = 0; i < write_meta.total_threads; ++i) { + for (uint32_t i = 0; i < write_meta->total_threads; ++i) { infoPrint("pthread_join %d ...\n", i); pthread_join(pids[i], NULL); } end: tmfree(pids); + tmfree(write_meta); tmfree(args); return ret; }