refactor: csv init write & thread meta

This commit is contained in:
Yaming Pei 2025-02-27 15:13:58 +08:00
parent 3627a54c13
commit 5ce4bd2465
1 changed files with 42 additions and 39 deletions

View File

@ -138,38 +138,34 @@ static void csvGenThreadFormatter(CsvWriteMeta* meta) {
}
static CsvWriteMeta csvInitWriteMeta(SDataBase* db, SSuperTable* stb) {
CsvWriteMeta meta = {
.naming_type = CSV_NAMING_I_SINGLE,
.total_threads = 1,
.thread_formatter = {},
.db = db,
.stb = stb,
.start_ts = stb->startTimestamp,
.end_ts = stb->startTimestamp + stb->timestamp_step * stb->insertRows,
.ts_step = stb->timestamp_step * stb->insertRows,
.interlace_step = stb->timestamp_step * stb->interlaceRows
};
static void csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write_meta) {
write_meta->naming_type = csvGetFileNamingType(stb);
write_meta->total_threads = 1;
write_meta->db = db;
write_meta->stb = stb;
write_meta->start_ts = stb->startTimestamp;
write_meta->end_ts = stb->startTimestamp + stb->timestamp_step * stb->insertRows;
write_meta->ts_step = stb->timestamp_step * stb->insertRows;
write_meta->interlace_step = stb->timestamp_step * stb->interlaceRows;
meta.naming_type = csvGetFileNamingType(stb);
switch (meta.naming_type) {
case CSV_NAMING_I_SINGLE: {
break;
}
case CSV_NAMING_I_TIME_SLICE: {
csvCalcTimestampStep(&meta);
csvCalcTimestampStep(write_meta);
break;
}
case CSV_NAMING_B_THREAD: {
meta.total_threads = g_arguments->nthreads;
csvGenThreadFormatter(&meta);
csvGenThreadFormatter(write_meta);
break;
}
case CSV_NAMING_B_THREAD_TIME_SLICE: {
meta.total_threads = g_arguments->nthreads;
csvGenThreadFormatter(&meta);
csvCalcTimestampStep(&meta);
csvGenThreadFormatter(write_meta);
csvCalcTimestampStep(write_meta);
break;
}
default: {
@ -178,25 +174,26 @@ static CsvWriteMeta csvInitWriteMeta(SDataBase* db, SSuperTable* stb) {
}
}
return meta;
return;
}
static CsvThreadMeta csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id) {
static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvThreadMeta* thread_meta) {
SDataBase* db = write_meta->db;
SSuperTable* stb = write_meta->stb;
CsvThreadMeta meta = {
.ctb_start_idx = 0,
.ctb_end_idx = 0,
.ctb_count = 0,
.start_secs = 0,
.end_secs = 0,
.thread_id = thread_id,
.tags_buf_bucket = NULL,
.cols_buf = NULL
};
csvCalcCtbRange(&meta, write_meta->total_threads, stb->childTblFrom, stb->childTblCount);
thread_meta->ctb_start_idx = 0;
thread_meta->ctb_end_idx = 0;
thread_meta->ctb_count = 0;
thread_meta->start_secs = 0;
thread_meta->end_secs = 0;
thread_meta->thread_id = thread_id;
thread_meta->output_header = false;
thread_meta->tags_buf_size = 0;
thread_meta->tags_buf_bucket = NULL;
thread_meta->cols_buf = NULL;
csvCalcCtbRange(write_meta, write_meta->total_threads, stb->childTblFrom, stb->childTblCount);
switch (write_meta->naming_type) {
case CSV_NAMING_I_SINGLE:
@ -205,17 +202,17 @@ static CsvThreadMeta csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread
}
case CSV_NAMING_I_TIME_SLICE:
case CSV_NAMING_B_THREAD_TIME_SLICE: {
meta.start_secs = csvGetStartSeconds(db->precision, stb->startTimestamp);
meta.end_secs = meta.start_secs + g_arguments->csv_ts_intv_secs;
thread_meta->start_secs = csvGetStartSeconds(db->precision, stb->startTimestamp);
thread_meta->end_secs = thread_meta->start_secs + g_arguments->csv_ts_intv_secs;
break;
}
default: {
meta.naming_type = CSV_NAMING_I_SINGLE;
thread_meta->naming_type = CSV_NAMING_I_SINGLE;
break;
}
}
return meta;
return;
}
@ -576,23 +573,28 @@ end:
static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) {
int ret = 0;
CsvWriteMeta write_meta = csvInitWriteMeta(db, stb);
pthread_t* pids = benchCalloc(write_meta.total_threads, sizeof(pthread_t), false);
if (!pids) {
ret = -1;
goto end;
}
CsvWriteMeta* write_meta = benchCalloc(1, sizeof(CsvWriteMeta), false);
if (!args) {
ret = -1;
goto end;
}
CsvThreadArgs* args = benchCalloc(write_meta.total_threads, sizeof(CsvThreadArgs), false);
if (!args) {
ret = -1;
goto end;
}
for (uint32_t i = 0; (i < write_meta.total_threads && !g_arguments->terminate); ++i) {
csvInitWriteMeta(db, stb, write_meta);
for (uint32_t i = 0; (i < write_meta->total_threads && !g_arguments->terminate); ++i) {
CsvThreadArgs* arg = &args[i];
arg->write_meta = &write_meta;
arg->thread_meta = csvInitThreadMeta(&write_meta, i + 1);
arg->write_meta = write_meta;
csvInitThreadMeta(write_meta, i + 1, &arg->thread_meta);
ret = pthread_create(&pids[i], NULL, csvGenStbThread, arg);
if (!ret) {
@ -602,13 +604,14 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) {
}
// wait threads
for (uint32_t i = 0; i < write_meta.total_threads; ++i) {
for (uint32_t i = 0; i < write_meta->total_threads; ++i) {
infoPrint("pthread_join %d ...\n", i);
pthread_join(pids[i], NULL);
}
end:
tmfree(pids);
tmfree(write_meta);
tmfree(args);
return ret;
}