diff --git a/tools/taos-tools/inc/bench.h b/tools/taos-tools/inc/bench.h
index e8c94016f8..4dd19d83b9 100644
--- a/tools/taos-tools/inc/bench.h
+++ b/tools/taos-tools/inc/bench.h
@@ -20,6 +20,9 @@
 #define CURL_STATICLIB
 #define ALLOW_FORBID_FUNC
 
+#define MAX(a, b) ((a) > (b) ? (a) : (b))
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
+
 #ifdef LINUX
 #ifndef _ALPINE
@@ -787,6 +790,8 @@ typedef struct SArguments_S {
     char*    csv_ts_format;
     char*    csv_ts_interval;
     long     csv_ts_intv_secs;
+    bool     csv_output_header;
+    char*    csv_tbname_alias;
     bool     bind_vgroup;
 } SArguments;
diff --git a/tools/taos-tools/inc/benchCsv.h b/tools/taos-tools/inc/benchCsv.h
index a65d5d1c9c..19331b8976 100644
--- a/tools/taos-tools/inc/benchCsv.h
+++ b/tools/taos-tools/inc/benchCsv.h
@@ -20,22 +20,49 @@
 typedef enum {
-    CSV_NAMING_SINGLE,
-    CSV_NAMING_TIME_SLICE,
-    CSV_NAMING_THREAD,
-    CSV_NAMING_THREAD_TIME_SLICE
+    CSV_NAMING_I_SINGLE,
+    CSV_NAMING_I_TIME_SLICE,
+    CSV_NAMING_B_THREAD,
+    CSV_NAMING_B_THREAD_TIME_SLICE
 } CsvNamingType;
 
+typedef struct {
+    char* buf;
+    int   buf_size;
+    int   length;
+} CsvRowFieldsBuf;
+
 typedef struct {
     CsvNamingType naming_type;
-    time_t  start_secs;
-    time_t  end_secs;
-    time_t  end_ts;
-    size_t  thread_id;
     size_t  total_threads;
     char    thread_formatter[TINY_BUFF_LEN];
+    SDataBase*   db;
+    SSuperTable* stb;
+    int64_t start_ts;
+    int64_t end_ts;
+    int64_t ts_step;
+    int64_t interlace_step;
 } CsvWriteMeta;
 
+typedef struct {
+    uint64_t ctb_start_idx;
+    uint64_t ctb_end_idx;
+    uint64_t ctb_count;
+    time_t   start_secs;
+    time_t   end_secs;
+    size_t   thread_id;
+    bool     output_header;
+    CsvRowFieldsBuf* tags_buf_bucket;
+    CsvRowFieldsBuf* cols_buf;
+} CsvThreadMeta;
+
+typedef struct {
+    CsvWriteMeta* write_meta;
+    CsvThreadMeta thread_meta;
+} CsvThreadArgs;
+
+
 int csvTestProcess();
diff --git a/tools/taos-tools/src/benchCsv.c b/tools/taos-tools/src/benchCsv.c
index c7d455c66a..cec38628ad 100644
--- a/tools/taos-tools/src/benchCsv.c
+++ b/tools/taos-tools/src/benchCsv.c
@@ -25,105 +25,26 @@
 // #define SHOW_CNT 100000
+#define GEN_ROW_FIELDS_TAG 0
+#define GEN_ROW_FIELDS_COL 1
-
-
-int32_t writeCsvFile(FILE* f, char * buf, int32_t len) {
-    size_t size = fwrite(buf, 1, len, f);
-    if(size != len) {
-        errorPrint("failed to write csv file. 
expect write length:%d real write length:%d \n", len, (int32_t)size); - return -1; - } - return 0; -} - -int batchWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fs, char* buf, int rows_buf_len, int minRemain) { - int ret = 0; - int pos = 0; - int64_t tk = 0; - int64_t show = 0; - - - uint32_t tags_length = accumulateRowLen(stbInfo->tags, stbInfo->iface); - uint32_t cols_length = accumulateRowLen(stbInfo->cols, stbInfo->iface); - - size_t tags_csv_length = tags_length + stb->tags->size; - size_t cols_csv_length = cols_length + stb->cols->size; - char* tags_csv_buf = (char*)benchCalloc(1, tags_csv_length, true); - char* cols_csv_buf = (char*)benchCalloc(1, cols_csv_length, true); - - // gen child name - for (int64_t i = 0; i < stb->childTblCount; ++i) { - int64_t ts = stb->startTimestamp; - int64_t ck = 0; - - // child table - - // tags - csvGenRowTagData(tags_csv_buf, stb, i, &tk); - // insert child column data - for(int64_t j = 0; j < stb->insertRows; j++) { - genColumnData(cols_csv_buf, stb, ts, db->precision, &ck); - // combine - pos += sprintf(buf + pos, "%s,%s.\n", tags_csv_buf, cols_csv_buf); - if (rows_buf_len - pos < minRemain) { - // submit - ret = writeCsvFile(fs, buf, pos); - if (ret != 0) { - goto END; - } - - pos = 0; - } - - // ts move next - ts += stb->timestamp_step; - - // check cancel - if(g_arguments->terminate) { - infoPrint("%s", "You are cancel, exiting ...\n"); - ret = -1; - goto END; - } - - // print show - if (++show % SHOW_CNT == 0) { - infoPrint("batch write child table cnt = %"PRId64 " all rows = %" PRId64 "\n", i+1, show); - } - - } - } - - if (pos > 0) { - ret = writeCsvFile(fs, buf, pos); - pos = 0; - } - -END: - // free - tmfree(tags_csv_buf); - tmfree(cols_csv_buf); - return ret; -} - - -static time_t csvGetStartSeconds(SDataBase* db, SSuperTable* stb) { +static time_t csvGetStartSeconds(int precision, int64_t start_ts) { time_t start_seconds = 0; - if (db->precision == TSDB_TIME_PRECISION_MICRO) { - start_seconds = stb->startTimestamp / 1000000L; - } else if (db->precision == TSDB_TIME_PRECISION_NANO) { - start_seconds = stb->startTimestamp / 1000000000L; + if (precision == TSDB_TIME_PRECISION_MICRO) { + start_seconds = start_ts / 1000000L; + } else if (precision == TSDB_TIME_PRECISION_NANO) { + start_seconds = start_ts / 1000000000L; } else { - start_seconds = stb->startTimestamp / 1000L; + start_seconds = start_ts / 1000L; } return start_seconds; } -void csvConvertTime2String(time_t time_value, char* time_buf, size_t buf_size) { +static void csvConvertTime2String(time_t time_value, char* time_buf, size_t buf_size) { struct tm tm_result; char *old_locale = setlocale(LC_TIME, "C"); #ifdef _WIN32 @@ -133,45 +54,73 @@ void csvConvertTime2String(time_t time_value, char* time_buf, size_t buf_size) { #endif strftime(time_buf, buf_size, g_arguments->csv_ts_format, &tm_result); if (old_locale) { - (LC_TIME, old_locale); + setlocale(LC_TIME, old_locale); } + return; } static CsvNamingType csvGetFileNamingType(SSuperTable* stb) { if (stb->interlaceRows > 0) { if (g_arguments->csv_ts_format) { - return CSV_NAMING_TIME_SLICE; + return CSV_NAMING_I_TIME_SLICE; } else { - return CSV_NAMING_SINGLE; + return CSV_NAMING_I_SINGLE; } } else { if (g_arguments->csv_ts_format) { - return CSV_NAMING_THREAD_TIME_SLICE; + return CSV_NAMING_B_THREAD_TIME_SLICE; } else { - return CSV_NAMING_THREAD; + return CSV_NAMING_B_THREAD; } } } -static void csvGenEndTimestamp(CsvWriteMeta* meta, SDataBase* db) { - time_t end_ts = 0; +static void csvCalcTimestampStep(CsvWriteMeta* meta) { + 
time_t ts_step = 0; - if (db->precision == TSDB_TIME_PRECISION_MICRO) { - end_ts = meta->end_secs * 1000000L; + if (meta->db->precision == TSDB_TIME_PRECISION_MICRO) { + ts_step = g_arguments->csv_ts_intv_secs * 1000000L; } else if (db->precision == TSDB_TIME_PRECISION_NANO) { - end_ts = meta->end_secs * 1000000000L; + ts_step = g_arguments->csv_ts_intv_secs * 1000000000L; } else { - end_ts = meta->end_secs * 1000L; + ts_step = g_arguments->csv_ts_intv_secs * 1000L; } - meta->end_ts = end_ts; + meta->ts_step = ts_step; + return; +} + + +static void csvCalcCtbRange(CsvThreadMeta* meta, size_t total_threads, int64_t ctb_offset, int64_t ctb_count) { + uint64_t ctb_start_idx = 0; + uint64_t ctb_end_idx = 0; + size_t tid_idx = meta->thread_id - 1; + size_t base = ctb_count / total_threads; + size_t remainder = ctb_count % total_threads; + + if (tid_idx < remainder) { + ctb_start_idx = ctb_offset + tid_idx * (base + 1); + ctb_end_idx = ctb_start_idx + (base + 1); + } else { + ctb_start_idx = ctb_offset + remainder * (base + 1) + (tid_idx - remainder) * base; + ctb_end_idx = ctb_start_idx + base; + } + + if (ctb_end_idx > ctb_offset + ctb_count) { + ctb_end_idx = ctb_offset + ctb_count; + } + + meta->ctb_start_idx = ctb_start_idx; + meta->ctb_end_idx = ctb_end_idx; + meta->ctb_count = ctb_count; return; } static void csvGenThreadFormatter(CsvWriteMeta* meta) { int digits = 0; + if (meta->total_threads == 0) { digits = 1; } else { @@ -181,52 +130,50 @@ static void csvGenThreadFormatter(CsvWriteMeta* meta) { } if (digits <= 1) { - (void)sprintf(meta->thread_formatter, "%%d"); + (void)snprintf(meta->thread_formatter, sizeof(meta->thread_formatter), "%%d"); } else { (void)snprintf(meta->thread_formatter, sizeof(meta->thread_formatter), "%%0%dd", digits); } + return; } -static CsvWriteMeta csvInitFileNamingMeta(SDataBase* db, SSuperTable* stb) { +static CsvWriteMeta csvInitWriteMeta(SDataBase* db, SSuperTable* stb) { CsvWriteMeta meta = { - .naming_type = CSV_NAMING_SINGLE, - .start_secs = 0, - .end_secs = 0, - .thread_id = 0, + .naming_type = CSV_NAMING_I_SINGLE, .total_threads = 1, - .thread_formatter = {} + .thread_formatter = {}, + .db = db, + .stb = stb, + .start_ts = stb->startTimestamp, + .end_ts = stb->startTimestamp + stb->timestamp_step * stb->insertRows, + .ts_step = stb->timestamp_step * stb->insertRows, + .interlace_step = stb->timestamp_step * stb->interlaceRows }; meta.naming_type = csvGetFileNamingType(stb); switch (meta.naming_type) { - case CSV_NAMING_SINGLE: { + case CSV_NAMING_I_SINGLE: { break; } - case CSV_NAMING_TIME_SLICE: { - meta.start_secs = csvGetStartSeconds(db, stb); - meta.end_secs = meta.start_secs + g_arguments->csv_ts_intv_secs; - csvGenEndTimestamp(&meta, db); + case CSV_NAMING_I_TIME_SLICE: { + csvCalcTimestampStep(&meta); break; } - case CSV_NAMING_THREAD: { - meta.thread_id = 1; + case CSV_NAMING_B_THREAD: { meta.total_threads = g_arguments->nthreads; csvGenThreadFormatter(&meta); break; } - case CSV_NAMING_THREAD_TIME_SLICE: { - meta.thread_id = 1; + case CSV_NAMING_B_THREAD_TIME_SLICE: { meta.total_threads = g_arguments->nthreads; csvGenThreadFormatter(&meta); - meta.start_secs = csvGetStartSeconds(db, stb); - meta.end_secs = meta.start_secs + g_arguments->csv_ts_intv_secs; - csvGenEndTimestamp(&meta, db); + csvCalcTimestampStep(&meta); break; } default: { - meta.naming_type = CSV_NAMING_SINGLE; + meta.naming_type = CSV_NAMING_I_SINGLE; break; } } @@ -235,7 +182,67 @@ static CsvWriteMeta csvInitFileNamingMeta(SDataBase* db, SSuperTable* stb) { } -int 
csvGetFileFullname(CsvWriteMeta* meta, char* fullname, size_t size) {
+static CsvThreadMeta csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id) {
+    SDataBase*   db  = write_meta->db;
+    SSuperTable* stb = write_meta->stb;
+    CsvThreadMeta meta = {
+        .ctb_start_idx   = 0,
+        .ctb_end_idx     = 0,
+        .ctb_count       = 0,
+        .start_secs      = 0,
+        .end_secs        = 0,
+        .thread_id       = thread_id,
+        .tags_buf_bucket = NULL,
+        .cols_buf        = NULL
+    };
+
+    csvCalcCtbRange(&meta, write_meta->total_threads, stb->childTblFrom, stb->childTblCount);
+
+    switch (write_meta->naming_type) {
+        case CSV_NAMING_I_SINGLE:
+        case CSV_NAMING_B_THREAD: {
+            break;
+        }
+        case CSV_NAMING_I_TIME_SLICE:
+        case CSV_NAMING_B_THREAD_TIME_SLICE: {
+            meta.start_secs = csvGetStartSeconds(db->precision, stb->startTimestamp);
+            meta.end_secs   = meta.start_secs + g_arguments->csv_ts_intv_secs;
+            break;
+        }
+        default: {
+            break;
+        }
+    }
+
+    return meta;
+}
+
+
+static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta, int64_t last_end_ts) {
+    SDataBase* db = write_meta->db;
+
+    switch (write_meta->naming_type) {
+        case CSV_NAMING_I_SINGLE:
+        case CSV_NAMING_B_THREAD: {
+            break;
+        }
+        case CSV_NAMING_I_TIME_SLICE:
+        case CSV_NAMING_B_THREAD_TIME_SLICE: {
+            thread_meta->start_secs = csvGetStartSeconds(db->precision, last_end_ts);
+            thread_meta->end_secs   = thread_meta->start_secs + g_arguments->csv_ts_intv_secs;
+            break;
+        }
+        default: {
+            break;
+        }
+    }
+
+    return;
+}
+
+
+static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta, char* fullname, size_t size) {
     char thread_buf[SMALL_BUFF_LEN];
     char start_time_buf[MIDDLE_BUFF_LEN];
     char end_time_buf[MIDDLE_BUFF_LEN];
@@ -244,22 +251,22 @@ int csvGetFileFullname(CsvWriteMeta* meta, char* fullname, size_t size) {
     const char* file_prefix = g_arguments->csv_file_prefix;
 
-    switch (meta->naming_type) {
-        case CSV_NAMING_SINGLE: {
+    switch (write_meta->naming_type) {
+        case CSV_NAMING_I_SINGLE: {
             ret = snprintf(fullname, size, "%s%s.csv", base_path, file_prefix);
             break;
         }
-        case CSV_NAMING_TIME_SLICE: {
-            csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf));
-            csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf));
+        case CSV_NAMING_I_TIME_SLICE: {
+            csvConvertTime2String(thread_meta->start_secs, start_time_buf, sizeof(start_time_buf));
+            csvConvertTime2String(thread_meta->end_secs, end_time_buf, sizeof(end_time_buf));
             ret = snprintf(fullname, size, "%s%s_%s_%s.csv", base_path, file_prefix, start_time_buf, end_time_buf);
             break;
         }
-        case CSV_NAMING_THREAD: {
-            (void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
+        case CSV_NAMING_B_THREAD: {
+            (void)snprintf(thread_buf, sizeof(thread_buf), write_meta->thread_formatter, thread_meta->thread_id);
             ret = snprintf(fullname, size, "%s%s_%s.csv", base_path, file_prefix, thread_buf);
             break;
         }
-        case CSV_NAMING_THREAD_TIME_SLICE: {
-            (void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
-            csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf));
-            csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf));
+        case CSV_NAMING_B_THREAD_TIME_SLICE: {
+            (void)snprintf(thread_buf, sizeof(thread_buf), write_meta->thread_formatter, thread_meta->thread_id);
+            csvConvertTime2String(thread_meta->start_secs, start_time_buf, sizeof(start_time_buf));
+            csvConvertTime2String(thread_meta->end_secs, end_time_buf, sizeof(end_time_buf));
@@ -276,184 +283,55 @@ int csvGetFileFullname(CsvWriteMeta* meta, char* fullname, size_t size) {
 }
 
-uint32_t csvCalcInterlaceRows(CsvWriteMeta* meta, SSuperTable* stb, int64_t ts) {
-    uint32_t need_rows = 0;
+static int64_t csvCalcSliceBatchTimestamp(CsvWriteMeta* write_meta, int64_t slice_cur_ts, int64_t slice_end_ts) {
+    int64_t slice_batch_ts = 0;
-
-    switch (meta->naming_type) {
-        case CSV_NAMING_SINGLE: {
-            need_rows = stb->interlaceRows;
+    switch (write_meta->naming_type) {
+        case CSV_NAMING_I_SINGLE:
+        case CSV_NAMING_I_TIME_SLICE: {
+            slice_batch_ts = MIN(slice_cur_ts + write_meta->interlace_step,
slice_end_ts); break; } - case CSV_NAMING_TIME_SLICE: { - (meta->end_ts - ts) / stb->timestamp_step - need_rows = stb->interlaceRows; - - break; - } - case CSV_NAMING_THREAD: { - (void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id); - ret = snprintf(fullname, size, "%s%s_%s.csv", base_path, file_prefix, thread_buf); - break; - } - case CSV_NAMING_THREAD_TIME_SLICE: { - (void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id); - csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf)); - csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf)); - ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf); + case CSV_NAMING_B_THREAD: + case CSV_NAMING_B_THREAD_TIME_SLICE: { + slice_batch_ts = slice_end_ts; break; } default: { - ret = -1; break; } } + + return slice_batch_ts; } +static int csvGenRowFields(char* buf, int size, SSuperTable* stb, int fields_cate, int64_t* k) { + int pos = 0; + BArray* fields = NULL; + int16_t field_count = 0; + char* binanry_prefix = stb->binaryPrefex ? stb->binaryPrefex : ""; + char* nchar_prefix = stb->ncharPrefex ? stb->ncharPrefex : ""; - -static int interlaceWriteCsv(SDataBase* db, SSuperTable* stb, FILE* fp, char* rows_buf, int rows_buf_len) { - char fullname[MAX_PATH_LEN] = {}; - CsvWriteMeta meta = csvInitFileNamingMeta(); - - int ret = csvGetFileFullname(&meta, fullname, sizeof(fullname)); - if (ret < 0) { - errorPrint("Failed to generate csv filename. database: %s, super table: %s, naming type: %d.\n", - db->dbName, stb->stbName, meta.naming_type); + if (!buf || !stb || !k || size <= 0) { return -1; } - int ret = 0; - int pos = 0; - int64_t n = 0; // already inserted rows for one child table - int64_t tk = 0; - int64_t show = 0; - int64_t ts = 0; - int64_t last_ts = stb->startTimestamp; - - // init buffer - char** tags_buf_bucket = (char **)benchCalloc(stb->childTblCount, sizeof(char *), true); - int cols_buf_length = stb->lenOfCols + stb->cols->size; - char* cols_buf = (char *)benchCalloc(1, cols_buf_length, true); - - for (int64_t i = 0; i < stb->childTblCount; ++i) { - int tags_buf_length = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->tags->size; - tags_buf_bucket[i] = benchCalloc(1, tags_buf_length, true); - if (!tags_buf_bucket[i]) { - ret = -1; - goto end; - } - - ret = csvGenRowTagData(tags_buf_bucket[i], tags_buf_length, stb, i, &tk); - if (!ret) { - goto end; - } + if (fields_cate == GEN_ROW_FIELDS_TAG) { + fields = stb->tags; + field_count = stb->tags->size; + } else { + fields = stb->cols; + field_count = stb->cols->size; } - while (n < stb->insertRows ) { - for (int64_t i = 0; i < stb->childTblCount; ++i) { - ts = last_ts; - int64_t ck = 0; - - - // calc need insert rows - uint32_t need_rows = csvCalcInterlaceRows(&meta, stb, ts) - - int64_t needInserts = stb->interlaceRows; - if(needInserts > stb->insertRows - n) { - needInserts = stb->insertRows - n; - } - - for (int64_t j = 0; j < needInserts; j++) { - genColumnData(cols_buf, stb, ts, db->precision, &ck); - // combine tags,cols - pos += sprintf(buf + pos, "%s,%s\n", tags_buf_bucket[i], cols_buf); - if (rows_buf_len - pos < minRemain) { - // submit - ret = writeCsvFile(fp, buf, pos); - if (ret != 0) { - goto end; - } - pos = 0; - } - - // ts move next - ts += stb->timestamp_step; - - // check cancel - if(g_arguments->terminate) { - infoPrint("%s", "You are cancel, exiting ... 
\n"); - ret = -1; - goto end; - } - - // print show - if (++show % SHOW_CNT == 0) { - infoPrint("interlace write child table index = %"PRId64 " all rows = %"PRId64 "\n", i+1, show); - } - } - - // if last child table - if (i + 1 == stb->childTblCount ) { - n += needInserts; - last_ts = ts; - } - } - } - - if (pos > 0) { - ret = writeCsvFile(fp, buf, pos); - pos = 0; - } - -end: - // free - for (int64_t m = 0 ; m < stb->childTblCount; m ++) { - tmfree(tags_buf_bucket[m]); - } - tmfree(tags_buf_bucket); - tmfree(cols_buf); - return ret; -} - - -// gen tag data -int csvGenRowTagData(char* buf, size_t size, SSuperTable* stb, int64_t index, int64_t* k) { - // tbname - int pos = snprintf(buf, size, "\'%s%"PRId64"\'", stb->childTblPrefix, index); - // tags - pos += csvGenRowFields(buf + pos, stb->tags, stb->tags->size, stb->binaryPrefex, stb->ncharPrefex, k); - - return (pos > 0 && (size_t)pos < size) ? 0 : -1; -} - -// gen column data -char * genColumnData(char* cols_csv_buf, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k) { - char szTime[128] = {0}; - toolsFormatTimestamp(szTime, ts, precision); - int pos = sprintf(cols_csv_buf, "\'%s\'", szTime); - - // columns - csvGenRowFields(cols_csv_buf + pos, stb->cols, stb->cols->size, stb->binaryPrefex, stb->ncharPrefex, k); - return cols_csv_buf; -} - - -int32_t csvGenRowFields(char* buf, BArray* fields, int16_t field_count, char* binanry_prefix, char* nchar_prefix, int64_t* k) { - int32_t pos = 0; - for (uint16_t i = 0; i < field_count; ++i) { Field* field = benchArrayGet(fields, i); char* prefix = ""; if(field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY) { - if (binanry_prefix) { - prefix = binanry_prefix; - } + prefix = binanry_prefix; } else if(field->type == TSDB_DATA_TYPE_NCHAR) { - if (nchar_prefix) { - prefix = nchar_prefix; - } + prefix = nchar_prefix; } pos += dataGenByField(field, buf, pos, prefix, k, ""); } @@ -462,68 +340,297 @@ int32_t csvGenRowFields(char* buf, BArray* fields, int16_t field_count, char* bi } - -int csvGenStbInterlace(SDataBase* db, SSuperTable* stb) { - - - int ret = 0; - char outFile[MAX_FILE_NAME_LEN] = {0}; - obtainCsvFile(outFile, db, stb, outDir); - FILE* fp = fopen(outFile, "w"); - if(fp == NULL) { - errorPrint("failed create csv file. file=%s, last errno=%d strerror=%s \n", outFile, errno, strerror(errno)); +static int csvGenRowTagData(char* buf, int size, SSuperTable* stb, int64_t index, int64_t* k) { + if (!buf || !stb || !k || size <= 0) { return -1; } - int row_buf_len = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->lenOfCols + stb->tags->size + stb->cols->size; - int rows_buf_len = row_buf_len * g_arguments->interlaceRows; - char* rows_buf = benchCalloc(1, rows_buf_len, true); + // tbname + int pos = snprintf(buf, size, "\'%s%"PRId64"\'", stb->childTblPrefix, index); - infoPrint("start write csv file: %s \n", outFile); + // tags + pos += csvGenRowFields(buf + pos, size - pos, stb, GEN_ROW_FIELDS_TAG, k); - // interlace mode - ret = interlaceWriteCsv(db, stb, fp, rows_buf, rows_buf_len); - - - tmfree(rows_buf); - fclose(fp); - - succPrint("end write csv file: %s \n", outFile); - - - // wait threads - for (int i = 0; i < threadCnt; i++) { - infoPrint("pthread_join %d ...\n", i); - pthread_join(pids[i], NULL); - } - - - return ret; + return (pos > 0 && pos < size) ? 
pos : -1;
 }
 
-void csvGenPrepare(SDataBase* db, SSuperTable* stb) {
-    stbInfo->lenOfTags = accumulateRowLen(stbInfo->tags, stbInfo->iface);
-    stbInfo->lenOfCols = accumulateRowLen(stbInfo->cols, stbInfo->iface);
+static int csvGenRowColData(char* buf, int size, SSuperTable* stb, int64_t ts, int32_t precision, int64_t *k) {
+    char ts_fmt[128] = {0};
+    toolsFormatTimestamp(ts_fmt, ts, precision);
+    int pos = snprintf(buf, size, "\'%s\'", ts_fmt);
+
+    // columns
+    pos += csvGenRowFields(buf + pos, size - pos, stb, GEN_ROW_FIELDS_COL, k);
+    return (pos > 0 && pos < size) ? pos : -1;
+}
+
+
+static void csvFreeCtbTagData(CsvThreadMeta* thread_meta, CsvRowFieldsBuf* tags_buf_bucket);
+
+static CsvRowFieldsBuf* csvGenCtbTagData(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) {
+    if (!write_meta || !thread_meta) {
+        return NULL;
+    }
+
+    SSuperTable* stb = write_meta->stb;
+    int     ret = 0;
+    int64_t tk  = 0;
+
+    CsvRowFieldsBuf* tags_buf_bucket = (CsvRowFieldsBuf*)benchCalloc(thread_meta->ctb_count, sizeof(CsvRowFieldsBuf), true);
+    if (!tags_buf_bucket) {
+        return NULL;
+    }
+
+    char* tags_buf      = NULL;
+    int   tags_buf_size = TSDB_TABLE_NAME_LEN + stb->lenOfTags + stb->tags->size;
+    for (uint64_t i = 0; i < thread_meta->ctb_count; ++i) {
+        tags_buf = benchCalloc(1, tags_buf_size, true);
+        if (!tags_buf) {
+            goto error;
+        }
+
+        tags_buf_bucket[i].buf      = tags_buf;
+        tags_buf_bucket[i].buf_size = tags_buf_size;
+
+        ret = csvGenRowTagData(tags_buf, tags_buf_size, stb, thread_meta->ctb_start_idx + i, &tk);
+        if (ret <= 0) {
+            goto error;
+        }
+
+        tags_buf_bucket[i].length = ret;
+    }
+
+    return tags_buf_bucket;
+
+error:
+    csvFreeCtbTagData(thread_meta, tags_buf_bucket);
+    return NULL;
+}
+
+
+static void csvFreeCtbTagData(CsvThreadMeta* thread_meta, CsvRowFieldsBuf* tags_buf_bucket) {
+    if (!thread_meta || !tags_buf_bucket) {
+        return;
+    }
+
+    for (uint64_t i = 0 ; i < thread_meta->ctb_count; ++i) {
+        char* tags_buf = tags_buf_bucket[i].buf;
+        if (tags_buf) {
+            tmfree(tags_buf);
+        } else {
+            break;
+        }
+    }
+    tmfree(tags_buf_bucket);
     return;
 }
 
-int csvGenStb(SDataBase* db, SSuperTable* stb) {
+static int csvWriteFile(FILE* fp, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck, CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) {
+    SDataBase*       db              = write_meta->db;
+    SSuperTable*     stb             = write_meta->stb;
+    CsvRowFieldsBuf* tags_buf_bucket = thread_meta->tags_buf_bucket;
+    CsvRowFieldsBuf* tags_buf        = &tags_buf_bucket[ctb_idx];
+    CsvRowFieldsBuf* cols_buf        = thread_meta->cols_buf;
+    int              ret             = 0;
+
+
+    ret = csvGenRowColData(cols_buf->buf, cols_buf->buf_size, stb, cur_ts, db->precision, ck);
+    if (ret <= 0) {
+        errorPrint("Failed to generate csv column data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
+                db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx);
+        return -1;
+    }
+
+    cols_buf->length = ret;
+
+
+    // write header
+    if (thread_meta->output_header) {
+        // TODO
+
+        thread_meta->output_header = false;
+    }
+
+
+    // write columns
+    size_t written = fwrite(cols_buf->buf, 1, cols_buf->length, fp);
+    if (written != cols_buf->length) {
+        errorPrint("Failed to write csv column data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
+                cols_buf->length, written, db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx);
+        return -1;
+    }
+
+
+    // write tags
+    written = fwrite(tags_buf->buf, 1, tags_buf->length, fp);
+    if (written != tags_buf->length) {
+        errorPrint("Failed to write csv tag data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
+                tags_buf->length, written, db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx);
+        return -1;
+    }
+
+    return 0;
+}
+
+
+static void* csvGenStbThread(void* arg) {
+    CsvThreadArgs* thread_arg  = (CsvThreadArgs*)arg;
+    CsvWriteMeta*  write_meta  = thread_arg->write_meta;
+    CsvThreadMeta* thread_meta = &thread_arg->thread_meta;
+    SDataBase*     db          = write_meta->db;
+    SSuperTable*   stb         = write_meta->stb;
+
+    int64_t  cur_ts           = 0;
+    int64_t  slice_cur_ts     = 0;
+    int64_t  slice_end_ts     = 0;
+    int64_t  slice_batch_ts   = 0;
+    int64_t  slice_ctb_cur_ts = 0;
+    int64_t  ck               = 0;
+    uint64_t ctb_idx          = 0;
+    int      ret              = 0;
+    FILE*    fp               = NULL;
+    char     fullname[MAX_PATH_LEN] = {};
+
+
+    // tags buffer
+    CsvRowFieldsBuf* tags_buf_bucket = csvGenCtbTagData(write_meta, thread_meta);
+    if (!tags_buf_bucket) {
+        errorPrint("Failed to generate csv tag data. database: %s, super table: %s, naming type: %d, thread index: %d.\n",
+                db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id);
+        return NULL;
+    }
+
+    // column buffer
+    int   buf_size = stb->lenOfCols + stb->cols->size;
+    char* buf      = (char*)benchCalloc(1, buf_size, true);
+    if (!buf) {
+        errorPrint("Failed to malloc csv column buffer. database: %s, super table: %s, naming type: %d, thread index: %d.\n",
+                db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id);
+        goto end;
+    }
+
+    CsvRowFieldsBuf cols_buf = {
+        .buf      = buf,
+        .buf_size = buf_size,
+        .length   = 0
+    };
+
+    thread_meta->tags_buf_bucket = tags_buf_bucket;
+    thread_meta->cols_buf        = &cols_buf;
+
+
+    for (cur_ts = write_meta->start_ts; cur_ts < write_meta->end_ts; cur_ts += write_meta->ts_step) {
+        // get filename
+        memset(fullname, 0, sizeof(fullname));
+        ret = csvGetFileFullname(write_meta, thread_meta, fullname, sizeof(fullname));
+        if (ret < 0) {
+            errorPrint("Failed to generate csv filename. database: %s, super table: %s, naming type: %d, thread index: %d.\n",
+                    db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id);
+            goto end;
+        }
+
+        // create fd
+        fp = fopen(fullname, "w");
+        if (fp == NULL) {
+            errorPrint("Failed to create csv file. thread index: %d, file: %s, errno: %d, strerror: %s.\n",
+                    thread_meta->thread_id, fullname, errno, strerror(errno));
+            goto end;
+        }
+
+
+        thread_meta->output_header = g_arguments->csv_output_header;
+        slice_cur_ts = cur_ts;
+        slice_end_ts = MIN(cur_ts + write_meta->ts_step, write_meta->end_ts);
+
+        // write data
+        while (slice_cur_ts < slice_end_ts) {
+            slice_batch_ts = csvCalcSliceBatchTimestamp(write_meta, slice_cur_ts, slice_end_ts);
+
+            for (ctb_idx = 0; ctb_idx < thread_meta->ctb_count; ++ctb_idx) {
+                for (slice_ctb_cur_ts = slice_cur_ts; slice_ctb_cur_ts < slice_batch_ts; slice_ctb_cur_ts += write_meta->stb->timestamp_step) {
+                    ret = csvWriteFile(fp, ctb_idx, slice_ctb_cur_ts, &ck, write_meta, thread_meta);
+                    if (ret < 0) {
+                        errorPrint("Failed to write csv file. thread index: %d, file: %s, errno: %d, strerror: %s.\n",
+                            thread_meta->thread_id, fullname, errno, strerror(errno));
+                        fclose(fp);
+                        goto end;
+                    }
+
+                    ck += 1;
+                }
+            }
+
+            slice_cur_ts = slice_batch_ts;
+        }
+
+        fclose(fp);
+        csvUpdateSliceRange(write_meta, thread_meta, slice_end_ts);
+    }
+
+end:
+    csvFreeCtbTagData(thread_meta, tags_buf_bucket);
+    tmfree(buf);
+    return NULL;
+}
+
+
+static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) {
+    int ret = 0;
+    CsvWriteMeta write_meta = csvInitWriteMeta(db, stb);
+
+    pthread_t*     pids = benchCalloc(write_meta.total_threads, sizeof(pthread_t), false);
+    CsvThreadArgs* args = benchCalloc(write_meta.total_threads, sizeof(CsvThreadArgs), false);
+    if (!pids || !args) {
+        ret = -1;
+        goto end;
+    }
+
+    for (uint32_t i = 0; (i < write_meta.total_threads && !g_arguments->terminate); ++i) {
+        CsvThreadArgs* arg = &args[i];
+        arg->write_meta  = &write_meta;
+        arg->thread_meta = csvInitThreadMeta(&write_meta, i + 1);
+
+        ret = pthread_create(&pids[i], NULL, csvGenStbThread, arg);
+        if (ret != 0) {
+            errorPrint("Failed to create thread, ret: %d\n", ret);
+            goto end;
+        }
+    }
+
+    // wait threads
+    for (uint32_t i = 0; i < write_meta.total_threads; ++i) {
+        infoPrint("pthread_join %d ...\n", i);
+        pthread_join(pids[i], NULL);
+    }
+
+end:
+    tmfree(pids);
+    tmfree(args);
+    return ret;
+}
+
+
+static void csvGenPrepare(SDataBase* db, SSuperTable* stb) {
+    stb->lenOfTags = accumulateRowLen(stb->tags, stb->iface);
+    stb->lenOfCols = accumulateRowLen(stb->cols, stb->iface);
+
+    if (stb->childTblTo) {
+        stb->childTblCount = stb->childTblTo - stb->childTblFrom;
+    }
+
+    return;
+}
+
+
+static int csvGenStb(SDataBase* db, SSuperTable* stb) {
     // prepare
     csvGenPrepare(db, stb);
-
-    int ret = 0;
-    if (stb->interlaceRows > 0) {
-        // interlace mode
-        ret = csvGenStbInterlace(db, stb);
-    } else {
-        // batch mode
-        ret = csvGenStbBatch(db, stb);
-    }
-
-    return ret;
+    return csvGenStbProcess(db, stb);
 }
diff --git a/tools/taos-tools/src/benchJsonOpt.c b/tools/taos-tools/src/benchJsonOpt.c
index a88526c278..4cf690204e 100644
--- a/tools/taos-tools/src/benchJsonOpt.c
+++ b/tools/taos-tools/src/benchJsonOpt.c
@@ -1619,6 +1619,23 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) {
         g_arguments->csv_ts_interval = "1d";
     }
 
+    // csv output header
+    g_arguments->csv_output_header = false;
+    tools_cJSON* oph = tools_cJSON_GetObjectItem(json, "csv_output_header");
+    if (oph && oph->type == tools_cJSON_String && oph->valuestring != NULL) {
+        if (0 == strcasecmp(oph->valuestring, "yes")) {
+            g_arguments->csv_output_header = true;
+        }
+    }
+
+    // csv tbname alias
+    tools_cJSON* tba = tools_cJSON_GetObjectItem(json, "csv_tbname_alias");
+    if (tba && tba->type == tools_cJSON_String && tba->valuestring != NULL) {
+        g_arguments->csv_tbname_alias = tba->valuestring;
+    } else {
+        g_arguments->csv_tbname_alias = "device_id";
+    }
+
     code = 0;
     return code;
 }
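
Reviewer note (not part of the patch): csvCalcCtbRange() splits the child tables across the writer threads so that every range is contiguous and half-open, with the first ctb_count % total_threads threads taking one extra table. Below is a minimal standalone C sketch of that arithmetic; the calc_ctb_range() helper and the main() driver are illustrative only and do not exist in the patch.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Sketch of the partitioning used by csvCalcCtbRange(): thread ids are
 * 1-based, every thread gets `base` tables, and the first `remainder`
 * threads get one extra, keeping the ranges contiguous.
 */
static void calc_ctb_range(size_t thread_id, size_t total_threads,
                           uint64_t ctb_offset, uint64_t ctb_count,
                           uint64_t* start_idx, uint64_t* end_idx) {
    size_t tid_idx   = thread_id - 1;
    size_t base      = ctb_count / total_threads;
    size_t remainder = ctb_count % total_threads;

    if (tid_idx < remainder) {
        *start_idx = ctb_offset + tid_idx * (base + 1);
        *end_idx   = *start_idx + (base + 1);
    } else {
        *start_idx = ctb_offset + remainder * (base + 1) + (tid_idx - remainder) * base;
        *end_idx   = *start_idx + base;
    }
    if (*end_idx > ctb_offset + ctb_count) {
        *end_idx = ctb_offset + ctb_count;
    }
}

int main(void) {
    /* 10 child tables starting at offset 0, written by 3 threads:
     * expected half-open ranges [0,4) [4,7) [7,10). */
    for (size_t tid = 1; tid <= 3; ++tid) {
        uint64_t s = 0, e = 0;
        calc_ctb_range(tid, 3, 0, 10, &s, &e);
        printf("thread %zu -> child tables [%" PRIu64 ", %" PRIu64 ")\n", tid, s, e);
    }
    return 0;
}

These per-thread ranges are what csvInitThreadMeta() stores in ctb_start_idx/ctb_end_idx, so each thread (and therefore each CSV file in the per-thread naming modes) covers a contiguous block of child tables.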