feat: csv supports gzip

This commit is contained in:
Yaming Pei 2025-02-28 15:26:20 +08:00
parent 8203389ada
commit 32b575f73a
4 changed files with 166 additions and 23 deletions

View File

@ -783,6 +783,7 @@ typedef struct SArguments_S {
bool mistMode; bool mistMode;
bool escape_character; bool escape_character;
bool pre_load_tb_meta; bool pre_load_tb_meta;
bool bind_vgroup;
char* output_path; char* output_path;
char output_path_buf[MAX_PATH_LEN]; char output_path_buf[MAX_PATH_LEN];
@ -792,8 +793,8 @@ typedef struct SArguments_S {
long csv_ts_intv_secs; long csv_ts_intv_secs;
bool csv_output_header; bool csv_output_header;
bool csv_tbname_alias; bool csv_tbname_alias;
CsvCompressionLevel csv_compress_level;
bool bind_vgroup;
} SArguments; } SArguments;
typedef struct SBenchConn { typedef struct SBenchConn {

View File

@ -16,7 +16,8 @@
#ifndef INC_BENCHCSV_H_ #ifndef INC_BENCHCSV_H_
#define INC_BENCHCSV_H_ #define INC_BENCHCSV_H_
#include <bench.h> #include <zlib.h>
#include "bench.h"
typedef enum { typedef enum {
@ -26,6 +27,29 @@ typedef enum {
CSV_NAMING_B_THREAD_TIME_SLICE CSV_NAMING_B_THREAD_TIME_SLICE
} CsvNamingType; } CsvNamingType;
/*
 * Compression level used for csv output files.
 * CSV_COMPRESS_NONE selects plain stdio output; every other value is
 * formatted into the gzopen() mode string ("wb<level>") by csvOpen().
 */
typedef enum {
    CSV_COMPRESS_NONE    = 0,
    CSV_COMPRESS_FAST    = 1,
    CSV_COMPRESS_BALANCE = 6,
    CSV_COMPRESS_BEST    = 9,
} CsvCompressionLevel;
/*
 * Status codes reported by the csv file I/O helpers.
 * CSV_ERR_OK is zero, so a result can be tested against it directly.
 */
typedef enum {
    CSV_ERR_OK           = 0,
    CSV_ERR_OPEN_FAILED  = 1,
    CSV_ERR_WRITE_FAILED = 2,
} CsvIoError;
// Handle for one csv output file, written either through stdio or through zlib.
typedef struct {
// Path given to csvOpen(); the pointer is stored as-is, the string is not copied.
const char* filename;
// Selects which member of `handle` is active (see below).
CsvCompressionLevel compress_level;
// Set to CSV_ERR_OK on open and to CSV_ERR_WRITE_FAILED by csvWrite() on a failed write.
CsvIoError result;
// Underlying stream: `fp` when compress_level is CSV_COMPRESS_NONE, `gf` otherwise.
union {
gzFile gf;
FILE* fp;
} handle;
} CsvFileHandle;
typedef struct { typedef struct {
char* buf; char* buf;
int length; int length;

View File

@ -280,6 +280,15 @@ static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_
} }
/**
 * Returns the filename extension that marks a gzip-compressed csv file.
 *
 * NOTE: despite the "Prefix" in the name, callers append the returned string
 * after ".csv", i.e. it is used as a suffix.
 *
 * @return "" when g_arguments->csv_compress_level is CSV_COMPRESS_NONE,
 *         ".gz" otherwise. The result is a string literal and must not be freed.
 */
static const char* csvGetGzipFilePrefix(void) {
    if (g_arguments->csv_compress_level == CSV_COMPRESS_NONE) {
        return "";
    }
    return ".gz";
}
static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta, char* fullname, size_t size) { static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta, char* fullname, size_t size) {
char thread_buf[SMALL_BUFF_LEN]; char thread_buf[SMALL_BUFF_LEN];
char start_time_buf[MIDDLE_BUFF_LEN]; char start_time_buf[MIDDLE_BUFF_LEN];
@ -287,28 +296,29 @@ static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_me
int ret = -1; int ret = -1;
const char* base_path = g_arguments->output_path; const char* base_path = g_arguments->output_path;
const char* file_prefix = g_arguments->csv_file_prefix; const char* file_prefix = g_arguments->csv_file_prefix;
const char* gzip_suffix = csvGetGzipFilePrefix();
switch (meta->naming_type) { switch (meta->naming_type) {
case CSV_NAMING_I_SINGLE: { case CSV_NAMING_I_SINGLE: {
ret = snprintf(fullname, size, "%s%s.csv", base_path, file_prefix); ret = snprintf(fullname, size, "%s%s.csv%s", base_path, file_prefix, g_arguments->csv_compress_level, gzip_suffix);
break; break;
} }
case CSV_NAMING_I_TIME_SLICE: { case CSV_NAMING_I_TIME_SLICE: {
csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf)); csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf));
csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf)); csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf));
ret = snprintf(fullname, size, "%s%s_%s_%s.csv", base_path, file_prefix, start_time_buf, end_time_buf); ret = snprintf(fullname, size, "%s%s_%s_%s.csv%s", base_path, file_prefix, start_time_buf, end_time_buf, gzip_suffix);
break; break;
} }
case CSV_NAMING_B_THREAD: { case CSV_NAMING_B_THREAD: {
(void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id); (void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
ret = snprintf(fullname, size, "%s%s_%s.csv", base_path, file_prefix, thread_buf); ret = snprintf(fullname, size, "%s%s_%s.csv%s", base_path, file_prefix, thread_buf, gzip_suffix);
break; break;
} }
case CSV_NAMING_B_THREAD_TIME_SLICE: { case CSV_NAMING_B_THREAD_TIME_SLICE: {
(void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id); (void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf)); csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf));
csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf)); csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf));
ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf); ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv%s", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf, gzip_suffix);
break; break;
} }
default: { default: {
@ -463,14 +473,91 @@ static void csvFreeCtbTagData(CsvThreadMeta* thread_meta, CsvRowTagsBuf* tags_bu
} }
static int csvWriteFile(FILE* fp, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck, CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) { static CsvFileHandle* csvOpen(const char* filename, CsvCompressionLevel compress_level) {
CsvFileHandle* fhdl = NULL;
bool failed = false;
fhdl = (CsvFileHandle*)benchCalloc(1, sizeof(CsvFileHandle), false);
if (!fhdl) {
errorPrint("Failed to malloc csv file handle. filename: %s, compress level: %d.\n",
filename, compress_level);
return NULL;
}
if (compress_level == CSV_COMPRESS_NONE) {
fhdl.handle.fp = fopen(filename, "w");
failed = (!fhdl.handle.fp);
} else {
char mode[TINY_BUFF_LEN];
(void)snprintf(mode, sizeof(mode), "wb%d", compress_level);
fhdl.handle.gf = gzopen(filename, mode);
failed = (!fhdl.handle.gf);
}
if (failed) {
tmfree(fhdl);
errorPrint("Failed to open csv file handle. filename: %s, compress level: %d.\n",
filename, compress_level);
return NULL;
} else {
fhdl->filename = filename;
fhdl->compress_level = compress_level;
fhdl->result = CSV_ERR_OK;
return fhdl;
}
}
/**
 * Writes `size` bytes from `buf` to the file behind `fhdl`.
 *
 * Uses fwrite() when the handle was opened with CSV_COMPRESS_NONE and gzwrite()
 * otherwise. A short or failed write is logged and recorded in fhdl->result.
 *
 * @param fhdl  Open handle; must not be NULL.
 * @param buf   Data to write.
 * @param size  Number of bytes to write.
 * @return CSV_ERR_OK if all bytes were written, CSV_ERR_WRITE_FAILED otherwise.
 */
static CsvIoError csvWrite(CsvFileHandle* fhdl, const char* buf, size_t size) {
    if (fhdl->compress_level == CSV_COMPRESS_NONE) {
        size_t ret = fwrite(buf, 1, size, fhdl->handle.fp);
        if (ret != size) {
            errorPrint("Failed to write csv file: %s. expected written %zu but %zu.\n",
                    fhdl->filename, size, ret);
            if (ferror(fhdl->handle.fp)) {
                perror("error");
            }
            fhdl->result = CSV_ERR_WRITE_FAILED;
            return CSV_ERR_WRITE_FAILED;
        }
        return CSV_ERR_OK;
    }

    // gzwrite() takes an unsigned length and returns an int (0 on error),
    // so the result is kept as int and printed with %d rather than %zu.
    int ret = gzwrite(fhdl->handle.gf, buf, (unsigned int)size);
    if (ret < 0 || (size_t)ret != size) {
        errorPrint("Failed to write csv file: %s. expected written %zu but %d.\n",
                fhdl->filename, size, ret);
        int errnum;
        const char* errmsg = gzerror(fhdl->handle.gf, &errnum);
        errorPrint("gzwrite error: %s\n", errmsg);
        fhdl->result = CSV_ERR_WRITE_FAILED;
        return CSV_ERR_WRITE_FAILED;
    }

    return CSV_ERR_OK;
}
/**
 * Closes the underlying stream of a csv file handle and releases the handle.
 *
 * The handle is allocated by csvOpen(); this function takes ownership and frees
 * it, so the pointer must not be used after the call. Passing NULL is a no-op.
 * Close failures (which can mean buffered data was not flushed) are logged.
 *
 * @param fhdl  Handle returned by csvOpen(), or NULL.
 */
static void csvClose(CsvFileHandle* fhdl) {
    if (!fhdl) {
        return;
    }

    if (fhdl->compress_level == CSV_COMPRESS_NONE) {
        if (fhdl->handle.fp) {
            if (fclose(fhdl->handle.fp) != 0) {
                errorPrint("Failed to close csv file: %s.\n", fhdl->filename);
            }
            fhdl->handle.fp = NULL;
        }
    } else {
        if (fhdl->handle.gf) {
            if (gzclose(fhdl->handle.gf) != Z_OK) {
                errorPrint("Failed to close csv file: %s.\n", fhdl->filename);
            }
            fhdl->handle.gf = NULL;
        }
    }

    // Release the handle itself; without this every opened file leaks one CsvFileHandle.
    tmfree(fhdl);
}
static int csvWriteFile(CsvFileHandle* fhdl, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck, CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) {
SDataBase* db = write_meta->db; SDataBase* db = write_meta->db;
SSuperTable* stb = write_meta->stb; SSuperTable* stb = write_meta->stb;
CsvRowTagsBuf* tags_buf_bucket = thread_meta->tags_buf_bucket; CsvRowTagsBuf* tags_buf_bucket = thread_meta->tags_buf_bucket;
CsvRowColsBuf* tags_buf = &tags_buf_bucket[ctb_idx]; CsvRowColsBuf* tags_buf = &tags_buf_bucket[ctb_idx];
CsvRowColsBuf* cols_buf = thread_meta->cols_buf; CsvRowColsBuf* cols_buf = thread_meta->cols_buf;
int ret = 0; int ret = 0;
size_t written = 0;
ret = csvGenRowColData(cols_buf->buf, cols_buf->buf_size, stb, cur_ts, db->precision, ck); ret = csvGenRowColData(cols_buf->buf, cols_buf->buf_size, stb, cur_ts, db->precision, ck);
@ -484,23 +571,37 @@ static int csvWriteFile(FILE* fp, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck,
// write header // write header
if (thread_meta->output_header) { if (thread_meta->output_header) {
written = fwrite(write_meta->csv_header, 1, write_meta->csv_header_length, fp); ret = csvWrite(fhdl, write_meta->csv_header, write_meta->csv_header_length);
if (ret != CSV_ERR_OK) {
errorPrint("Failed to write csv header data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx);
return -1;
}
thread_meta->output_header = false; thread_meta->output_header = false;
} }
// write columns // write columns
written = fwrite(cols_buf->buf, 1, cols_buf->length, fp); ret = csvWrite(fhdl, cols_buf->buf, cols_buf->length);
if (written != cols_buf->length) { if (ret != CSV_ERR_OK) {
errorPrint("Failed to write csv column data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", errorPrint("Failed to write csv column data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
cols_buf->length, written, db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx); db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx);
return -1; return -1;
} }
// write tags // write tags
written = fwrite(tags_buf->buf, 1, tags_buf->length, fp); ret = csvWrite(fhdl, tags_buf->buf, tags_buf->length);
if (written != tags_buf->length) { if (ret != CSV_ERR_OK) {
errorPrint("Failed to write csv tag data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n", errorPrint("Failed to write csv tag data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
tags_buf->length, written, db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx); db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx);
return -1;
}
// write line break
ret = csvWrite(fhdl, "\n", 1);
if (ret != CSV_ERR_OK) {
errorPrint("Failed to write csv line break data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx);
return -1; return -1;
} }
@ -523,7 +624,7 @@ static void* csvGenStbThread(void* arg) {
int64_t ck = 0; int64_t ck = 0;
uint64_t ctb_idx = 0; uint64_t ctb_idx = 0;
int ret = 0; int ret = 0;
FILE* fp = NULL; CsvFileHandle* fhdl = NULL;
char fullname[MAX_PATH_LEN] = {}; char fullname[MAX_PATH_LEN] = {};
@ -565,8 +666,8 @@ static void* csvGenStbThread(void* arg) {
} }
// create fd // create fd
fp = fopen(fullname, "w"); fhdl = csvOpen(fullname, g_arguments->csv_compress_level);
if (fp == NULL) { if (fhdl == NULL) {
errorPrint("Failed to create csv file. thread index: %d, file: %s, errno: %d, strerror: %s.\n", errorPrint("Failed to create csv file. thread index: %d, file: %s, errno: %d, strerror: %s.\n",
thread_meta->thread_id, fullname, errno, strerror(errno)); thread_meta->thread_id, fullname, errno, strerror(errno));
goto end; goto end;
@ -583,11 +684,11 @@ static void* csvGenStbThread(void* arg) {
for (ctb_idx = 0; ctb_idx < thread_meta->ctb_count; ++ctb_idx) { for (ctb_idx = 0; ctb_idx < thread_meta->ctb_count; ++ctb_idx) {
for (slice_ctb_cur_ts = slice_cur_ts; slice_ctb_cur_ts < slice_batch_ts; slice_ctb_cur_ts += write_meta->stb->timestamp_step) { for (slice_ctb_cur_ts = slice_cur_ts; slice_ctb_cur_ts < slice_batch_ts; slice_ctb_cur_ts += write_meta->stb->timestamp_step) {
ret = csvWriteFile(fp, ctb_idx, slice_ctb_cur_ts, &ck, write_meta, thread_meta); ret = csvWriteFile(fhdl, ctb_idx, slice_ctb_cur_ts, &ck, write_meta, thread_meta);
if (!ret) { if (!ret) {
errorPrint("Failed to write csv file. thread index: %d, file: %s, errno: %d, strerror: %s.\n", errorPrint("Failed to write csv file. thread index: %d, file: %s, errno: %d, strerror: %s.\n",
thread_meta->thread_id, fullname, errno, strerror(errno)); thread_meta->thread_id, fullname, errno, strerror(errno));
fclose(fp); csvClose(fhdl);
goto end; goto end;
} }
@ -598,7 +699,7 @@ static void* csvGenStbThread(void* arg) {
slice_cur_ts = slice_batch_ts; slice_cur_ts = slice_batch_ts;
} }
fclose(fp); csvClose(fhdl);
csvUpdateSliceRange(write_meta, thread_meta, last_end_ts); csvUpdateSliceRange(write_meta, thread_meta, last_end_ts);
} }

View File

@ -14,6 +14,7 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <bench.h> #include <bench.h>
#include "benchLog.h" #include "benchLog.h"
#include "benchCsv.h"
extern char g_configDir[MAX_PATH_LEN]; extern char g_configDir[MAX_PATH_LEN];
@ -1636,6 +1637,22 @@ static int getMetaFromCommonJsonFile(tools_cJSON *json) {
g_arguments->csv_tbname_alias = "device_id"; g_arguments->csv_tbname_alias = "device_id";
} }
// csv compression level
tools_cJSON* cl = tools_cJSON_GetObjectItem(json, "csv_compress_level");
if (cl && cl->type == tools_cJSON_String && cl->valuestring != NULL) {
if (0 == strcasecmp(cl->valuestring, "none")) {
g_arguments->csv_compress_level = CSV_COMPRESS_NONE;
} else if (0 == strcasecmp(cl->valuestring, "fast")) {
g_arguments->csv_compress_level = CSV_COMPRESS_FAST;
} else if (0 == strcasecmp(cl->valuestring, "balance")) {
g_arguments->csv_compress_level = CSV_COMPRESS_BALANCE;
} else if (0 == strcasecmp(cl->valuestring, "best")) {
g_arguments->csv_compress_level = CSV_COMPRESS_BEST;
}
} else {
g_arguments->csv_compress_level = CSV_COMPRESS_NONE;
}
code = 0; code = 0;
return code; return code;
} }