fix: resolve csv compilation errors

This commit is contained in:
Yaming Pei 2025-03-03 14:34:15 +08:00
parent 5105f27b02
commit 8ba478cad0
5 changed files with 139 additions and 136 deletions

View File

@ -719,6 +719,14 @@ typedef struct STmqMetaInfo_S {
uint16_t iface;
} STmqMetaInfo;
typedef enum {
CSV_COMPRESS_NONE = 0,
CSV_COMPRESS_FAST = 1,
CSV_COMPRESS_BALANCE = 6,
CSV_COMPRESS_BEST = 9
} CsvCompressionLevel;
typedef struct SArguments_S {
uint8_t taosc_version;
char * metaFile;
@ -786,9 +794,10 @@ typedef struct SArguments_S {
char* csv_file_prefix;
char* csv_ts_format;
char* csv_ts_interval;
char* csv_tbname_alias;
long csv_ts_intv_secs;
bool csv_output_header;
bool csv_tbname_alias;
CsvCompressionLevel csv_compress_level;
} SArguments;

View File

@ -27,13 +27,6 @@ typedef enum {
CSV_NAMING_B_THREAD_TIME_SLICE
} CsvNamingType;
typedef enum {
CSV_COMPRESS_NONE = 0,
CSV_COMPRESS_FAST = 1,
CSV_COMPRESS_BALANCE = 6,
CSV_COMPRESS_BEST = 9
} CsvCompressionLevel;
typedef enum {
CSV_ERR_OK = 0,
CSV_ERR_OPEN_FAILED,
@ -85,7 +78,7 @@ typedef struct {
size_t thread_id;
bool output_header;
int tags_buf_size;
CsvRowTagsBuf* tags_buf_bucket;
CsvRowTagsBuf* tags_buf_array;
CsvRowColsBuf* cols_buf;
} CsvThreadMeta;

View File

@ -16,6 +16,8 @@
#ifndef INC_BENCHLOG_H_
#define INC_BENCHLOG_H_
#include <stdbool.h>
//
// suport thread safe log module
//
@ -53,7 +55,7 @@ void exitLog();
(int32_t)timeSecs.tv_usec); \
fprintf(stdout, "DEBG: "); \
fprintf(stdout, "%s(%d) ", __FILE__, __LINE__); \
fprintf(stdout, "" fmt, __VA_ARGS__); \
fprintf(stdout, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_STDOUT); \
} \
} while (0)
@ -74,7 +76,7 @@ void exitLog();
(int32_t)timeSecs.tv_usec); \
fprintf(stdout, "DEBG: "); \
fprintf(stdout, "%s(%d) ", __FILE__, __LINE__); \
fprintf(stdout, "" fmt, __VA_ARGS__); \
fprintf(stdout, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_STDOUT); \
} \
} while (0)
@ -94,7 +96,7 @@ void exitLog();
do { \
if (g_arguments->debug_print) { \
lockLog(LOG_STDOUT); \
fprintf(stdout, "" fmt, __VA_ARGS__); \
fprintf(stdout, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_STDOUT); \
} \
} while (0)
@ -102,14 +104,14 @@ void exitLog();
#define infoPrintNoTimestamp(fmt, ...) \
do { \
lockLog(LOG_STDOUT); \
fprintf(stdout, "" fmt, __VA_ARGS__); \
fprintf(stdout, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_STDOUT); \
} while (0)
#define infoPrintNoTimestampToFile(fmt, ...) \
do { \
lockLog(LOG_RESULT); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_RESULT); \
} while (0)
@ -126,7 +128,7 @@ void exitLog();
ptm->tm_mon + 1, \
ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \
(int32_t)timeSecs.tv_usec); \
fprintf(stdout, "INFO: " fmt, __VA_ARGS__); \
fprintf(stdout, "INFO: " fmt, ##__VA_ARGS__); \
unlockLog(LOG_STDOUT); \
} while (0)
@ -142,7 +144,7 @@ void exitLog();
fprintf(g_arguments->fpOfInsertResult,"[%02d/%02d %02d:%02d:%02d.%06d] ", ptm->tm_mon + 1, \
ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \
(int32_t)timeSecs.tv_usec); \
fprintf(g_arguments->fpOfInsertResult, "INFO: " fmt, __VA_ARGS__);\
fprintf(g_arguments->fpOfInsertResult, "INFO: " fmt, ##__VA_ARGS__);\
unlockLog(LOG_RESULT); \
} while (0)
@ -160,7 +162,7 @@ void exitLog();
ptm->tm_mon + 1, \
ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \
(int32_t)timeSecs.tv_usec); \
fprintf(stderr, "PERF: " fmt, __VA_ARGS__); \
fprintf(stderr, "PERF: " fmt, ##__VA_ARGS__); \
unlockLog(LOG_STDERR); \
if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \
lockLog(LOG_RESULT); \
@ -172,7 +174,7 @@ void exitLog();
(int32_t)timeSecs.tv_usec); \
fprintf(g_arguments->fpOfInsertResult, "PERF: "); \
fprintf(g_arguments->fpOfInsertResult, \
"" fmt, __VA_ARGS__); \
"" fmt, ##__VA_ARGS__); \
unlockLog(LOG_RESULT); \
} \
} \
@ -196,7 +198,7 @@ void exitLog();
if (g_arguments->debug_print) { \
fprintf(stderr, "%s(%d) ", __FILE__, __LINE__); \
} \
fprintf(stderr, "" fmt, __VA_ARGS__); \
fprintf(stderr, "" fmt, ##__VA_ARGS__); \
fprintf(stderr, "\033[0m"); \
unlockLog(LOG_STDERR); \
if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \
@ -206,7 +208,7 @@ void exitLog();
ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \
(int32_t)timeSecs.tv_usec); \
fprintf(g_arguments->fpOfInsertResult, "ERROR: "); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_RESULT); \
} \
} while (0)
@ -229,7 +231,7 @@ void exitLog();
if (g_arguments->debug_print) { \
fprintf(stderr, "%s(%d) ", __FILE__, __LINE__); \
} \
fprintf(stderr, "" fmt, __VA_ARGS__); \
fprintf(stderr, "" fmt, ##__VA_ARGS__); \
fprintf(stderr, "\033[0m"); \
unlockLog(LOG_STDERR); \
if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \
@ -239,7 +241,7 @@ void exitLog();
ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \
(int32_t)timeSecs.tv_usec); \
fprintf(g_arguments->fpOfInsertResult, "WARN: "); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_RESULT); \
} \
} while (0)
@ -262,7 +264,7 @@ void exitLog();
if (g_arguments->debug_print) { \
fprintf(stderr, "%s(%d) ", __FILE__, __LINE__); \
} \
fprintf(stderr, "" fmt, __VA_ARGS__); \
fprintf(stderr, "" fmt, ##__VA_ARGS__); \
fprintf(stderr, "\033[0m"); \
unlockLog(LOG_STDERR); \
if (g_arguments->fpOfInsertResult && !g_arguments->terminate) { \
@ -272,7 +274,7 @@ void exitLog();
ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, \
(int32_t)timeSecs.tv_usec); \
fprintf(g_arguments->fpOfInsertResult, "SUCC: "); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, __VA_ARGS__); \
fprintf(g_arguments->fpOfInsertResult, "" fmt, ##__VA_ARGS__); \
unlockLog(LOG_RESULT); \
} \
} while (0)

View File

@ -14,6 +14,7 @@
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <locale.h>
#include "benchLog.h"
#include "benchData.h"
@ -77,25 +78,25 @@ static CsvNamingType csvGetFileNamingType(SSuperTable* stb) {
}
static void csvCalcTimestampStep(CsvWriteMeta* meta) {
static void csvCalcTimestampStep(CsvWriteMeta* write_meta) {
time_t ts_step = 0;
if (meta->db->precision == TSDB_TIME_PRECISION_MICRO) {
if (write_meta->db->precision == TSDB_TIME_PRECISION_MICRO) {
ts_step = g_arguments->csv_ts_intv_secs * 1000000L;
} else if (db->precision == TSDB_TIME_PRECISION_NANO) {
} else if (write_meta->db->precision == TSDB_TIME_PRECISION_NANO) {
ts_step = g_arguments->csv_ts_intv_secs * 1000000000L;
} else {
ts_step = g_arguments->csv_ts_intv_secs * 1000L;
}
meta->ts_step = ts_step;
write_meta->ts_step = ts_step;
return;
}
static void csvCalcCtbRange(CsvThreadMeta* meta, size_t total_threads, int64_t ctb_offset, int64_t ctb_count) {
static void csvCalcCtbRange(CsvThreadMeta* thread_meta, size_t total_threads, int64_t ctb_offset, int64_t ctb_count) {
uint64_t ctb_start_idx = 0;
uint64_t ctb_end_idx = 0;
size_t tid_idx = meta->thread_id - 1;
size_t tid_idx = thread_meta->thread_id - 1;
size_t base = ctb_count / total_threads;
size_t remainder = ctb_count % total_threads;
@ -111,35 +112,34 @@ static void csvCalcCtbRange(CsvThreadMeta* meta, size_t total_threads, int64_t c
ctb_end_idx = ctb_offset + ctb_count;
}
meta->ctb_start_idx = ctb_start_idx;
meta->ctb_end_idx = ctb_end_idx;
meta->ctb_count = ctb_count;
thread_meta->ctb_start_idx = ctb_start_idx;
thread_meta->ctb_end_idx = ctb_end_idx;
thread_meta->ctb_count = ctb_count;
return;
}
static void csvGenThreadFormatter(CsvWriteMeta* meta) {
static void csvGenThreadFormatter(CsvWriteMeta* write_meta) {
int digits = 0;
if (meta->total_threads == 0) {
if (write_meta->total_threads == 0) {
digits = 1;
} else {
for (int n = meta->total_threads; n > 0; n /= 10) {
for (int n = write_meta->total_threads; n > 0; n /= 10) {
digits++;
}
}
if (digits <= 1) {
(void)snprintf(meta->thread_formatter, sizeof(meta->thread_formatter), "%%d");
(void)snprintf(write_meta->thread_formatter, sizeof(write_meta->thread_formatter), "%%d");
} else {
(void)snprintf(meta->thread_formatter, sizeof(meta->thread_formatter), "%%0%dd", digits);
(void)snprintf(write_meta->thread_formatter, sizeof(write_meta->thread_formatter), "%%0%dd", digits);
}
return;
}
static int csvGenCsvHeader(CsvWriteMeta* write_meta) {
SDataBase* db = write_meta->db;
SSuperTable* stb = write_meta->stb;
char* buf = write_meta->csv_header;
int pos = 0;
@ -190,7 +190,7 @@ int csvGenCreateDbSql(SDataBase* db, char* buf, int size) {
}
if (db->cfgs) {
for (size i = 0; i < db->cfgs->size; ++i) {
for (size_t i = 0; i < db->cfgs->size; ++i) {
SDbCfg* cfg = benchArrayGet(db->cfgs, i);
if (cfg->valuestring) {
pos += snprintf(buf + pos, size - pos, " %s %s", cfg->name, cfg->valuestring);
@ -224,7 +224,7 @@ static int csvExportCreateDbSql(CsvWriteMeta* write_meta, FILE* fp) {
length = csvGenCreateDbSql(write_meta->db, buf, sizeof(buf));
if (length < 0) {
errorPrint("Failed to generate create db sql, maybe buffer[%d] not enough.\n", sizeof(buf));
errorPrint("Failed to generate create db sql, maybe buffer[%zu] not enough.\n", sizeof(buf));
return -1;
}
@ -256,7 +256,7 @@ int csvGenCreateStbSql(SDataBase* db, SSuperTable* stb, char* buf, int size) {
// columns
for (sizt_t i = 0; i < stb->cols->size; ++i) {
for (size_t i = 0; i < stb->cols->size; ++i) {
Field* col = benchArrayGet(stb->cols, i);
if (col->type == TSDB_DATA_TYPE_BINARY
@ -265,7 +265,7 @@ int csvGenCreateStbSql(SDataBase* db, SSuperTable* stb, char* buf, int size) {
|| col->type == TSDB_DATA_TYPE_GEOMETRY) {
if (col->type == TSDB_DATA_TYPE_GEOMETRY && col->length < 21) {
errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__, i);
errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %zu.\n", __func__, __LINE__, i);
return -1;
}
@ -301,7 +301,7 @@ int csvGenCreateStbSql(SDataBase* db, SSuperTable* stb, char* buf, int size) {
// tags
for (sizt_t i = 0; i < stb->tags->size; ++i) {
for (size_t i = 0; i < stb->tags->size; ++i) {
Field* tag = benchArrayGet(stb->tags, i);
if (i > 0) {
@ -315,7 +315,7 @@ int csvGenCreateStbSql(SDataBase* db, SSuperTable* stb, char* buf, int size) {
|| tag->type == TSDB_DATA_TYPE_GEOMETRY) {
if (tag->type == TSDB_DATA_TYPE_GEOMETRY && tag->length < 21) {
errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__, i);
errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %zu.\n", __func__, __LINE__, i);
return -1;
}
@ -397,7 +397,7 @@ static int csvExportCreateStbSql(CsvWriteMeta* write_meta, FILE* fp) {
length = csvGenCreateStbSql(write_meta->db, write_meta->stb, buf, sizeof(buf));
if (length < 0) {
errorPrint("Failed to generate create stb sql, maybe buffer[%d] not enough.\n", sizeof(buf));
errorPrint("Failed to generate create stb sql, maybe buffer[%zu] not enough.\n", sizeof(buf));
return -1;
}
@ -417,7 +417,6 @@ static int csvExportCreateStbSql(CsvWriteMeta* write_meta, FILE* fp) {
static int csvExportCreateSql(CsvWriteMeta* write_meta) {
char fullname[MAX_PATH_LEN] = {};
char buf[LARGE_BUFF_LEN] = {};
int ret = 0;
int length = 0;
FILE* fp = NULL;
@ -428,7 +427,7 @@ static int csvExportCreateSql(CsvWriteMeta* write_meta) {
return -1;
}
FILE* fp = fopen(fullname, "w");
fp = fopen(fullname, "w");
if (!fp) {
return -1;
}
@ -475,7 +474,7 @@ static int csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write
return -1;
}
switch (meta.naming_type) {
switch (write_meta->naming_type) {
case CSV_NAMING_I_SINGLE: {
break;
}
@ -484,18 +483,18 @@ static int csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write
break;
}
case CSV_NAMING_B_THREAD: {
meta.total_threads = g_arguments->nthreads;
write_meta->total_threads = g_arguments->nthreads;
csvGenThreadFormatter(write_meta);
break;
}
case CSV_NAMING_B_THREAD_TIME_SLICE: {
meta.total_threads = g_arguments->nthreads;
write_meta->total_threads = g_arguments->nthreads;
csvGenThreadFormatter(write_meta);
csvCalcTimestampStep(write_meta);
break;
}
default: {
meta.naming_type = CSV_NAMING_I_SINGLE;
write_meta->naming_type = CSV_NAMING_I_SINGLE;
break;
}
}
@ -516,10 +515,10 @@ static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvT
thread_meta->thread_id = thread_id;
thread_meta->output_header = false;
thread_meta->tags_buf_size = 0;
thread_meta->tags_buf_bucket = NULL;
thread_meta->tags_buf_array = NULL;
thread_meta->cols_buf = NULL;
csvCalcCtbRange(write_meta, write_meta->total_threads, stb->childTblFrom, stb->childTblCount);
csvCalcCtbRange(thread_meta, write_meta->total_threads, stb->childTblFrom, stb->childTblCount);
switch (write_meta->naming_type) {
case CSV_NAMING_I_SINGLE:
@ -533,7 +532,6 @@ static void csvInitThreadMeta(CsvWriteMeta* write_meta, uint32_t thread_id, CsvT
break;
}
default: {
thread_meta->naming_type = CSV_NAMING_I_SINGLE;
break;
}
}
@ -553,7 +551,7 @@ static void csvUpdateSliceRange(CsvWriteMeta* write_meta, CsvThreadMeta* thread_
case CSV_NAMING_I_TIME_SLICE:
case CSV_NAMING_B_THREAD_TIME_SLICE: {
thread_meta->start_secs = csvGetStartSeconds(db->precision, last_end_ts);
thread_meta->end_secs = thread_meta.start_secs + g_arguments->csv_ts_intv_secs;
thread_meta->end_secs = thread_meta->start_secs + g_arguments->csv_ts_intv_secs;
break;
}
default: {
@ -569,7 +567,7 @@ static const char* csvGetGzipFilePrefix() {
if (g_arguments->csv_compress_level == CSV_COMPRESS_NONE) {
return "";
} else {
return ".gz"
return ".gz";
}
}
@ -583,26 +581,26 @@ static int csvGetFileFullname(CsvWriteMeta* write_meta, CsvThreadMeta* thread_me
const char* file_prefix = g_arguments->csv_file_prefix;
const char* gzip_suffix = csvGetGzipFilePrefix();
switch (meta->naming_type) {
switch (write_meta->naming_type) {
case CSV_NAMING_I_SINGLE: {
ret = snprintf(fullname, size, "%s%s.csv%s", base_path, file_prefix, g_arguments->csv_compress_level, gzip_suffix);
ret = snprintf(fullname, size, "%s%s.csv%s", base_path, file_prefix, gzip_suffix);
break;
}
case CSV_NAMING_I_TIME_SLICE: {
csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf));
csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf));
csvConvertTime2String(thread_meta->start_secs, start_time_buf, sizeof(start_time_buf));
csvConvertTime2String(thread_meta->end_secs, end_time_buf, sizeof(end_time_buf));
ret = snprintf(fullname, size, "%s%s_%s_%s.csv%s", base_path, file_prefix, start_time_buf, end_time_buf, gzip_suffix);
break;
}
case CSV_NAMING_B_THREAD: {
(void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
(void)snprintf(thread_buf, sizeof(thread_buf), write_meta->thread_formatter, thread_meta->thread_id);
ret = snprintf(fullname, size, "%s%s_%s.csv%s", base_path, file_prefix, thread_buf, gzip_suffix);
break;
}
case CSV_NAMING_B_THREAD_TIME_SLICE: {
(void)snprintf(thread_buf, sizeof(thread_buf), meta->thread_formatter, meta->thread_id);
csvConvertTime2String(meta->start_secs, start_time_buf, sizeof(start_time_buf));
csvConvertTime2String(meta->end_secs, end_time_buf, sizeof(end_time_buf));
(void)snprintf(thread_buf, sizeof(thread_buf), write_meta->thread_formatter, thread_meta->thread_id);
csvConvertTime2String(thread_meta->start_secs, start_time_buf, sizeof(start_time_buf));
csvConvertTime2String(thread_meta->end_secs, end_time_buf, sizeof(end_time_buf));
ret = snprintf(fullname, size, "%s%s_%s_%s_%s.csv%s", base_path, file_prefix, thread_buf, start_time_buf, end_time_buf, gzip_suffix);
break;
}
@ -699,6 +697,24 @@ static int csvGenRowColData(char* buf, int size, SSuperTable* stb, int64_t ts, i
}
static void csvFreeCtbTagData(CsvThreadMeta* thread_meta, CsvRowTagsBuf* tags_buf_array) {
if (!thread_meta || !tags_buf_array) {
return;
}
for (uint64_t i = 0 ; i < thread_meta->ctb_count; ++i) {
char* tags_buf = tags_buf_array[i].buf;
if (tags_buf) {
tmfree(tags_buf_array);
} else {
break;
}
}
tmfree(tags_buf_array);
return;
}
static CsvRowTagsBuf* csvGenCtbTagData(CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) {
SSuperTable* stb = write_meta->stb;
int ret = 0;
@ -708,8 +724,8 @@ static CsvRowTagsBuf* csvGenCtbTagData(CsvWriteMeta* write_meta, CsvThreadMeta*
return NULL;
}
CsvRowTagsBuf* tags_buf_bucket = (CsvRowTagsBuf*)benchCalloc(thread_meta->ctb_count, sizeof(CsvRowTagsBuf), true);
if (!tags_buf_bucket) {
CsvRowTagsBuf* tags_buf_array = (CsvRowTagsBuf*)benchCalloc(thread_meta->ctb_count, sizeof(CsvRowTagsBuf), true);
if (!tags_buf_array) {
return NULL;
}
@ -721,43 +737,25 @@ static CsvRowTagsBuf* csvGenCtbTagData(CsvWriteMeta* write_meta, CsvThreadMeta*
goto error;
}
tags_buf_bucket[i].buf = tags_buf;
write_meta->tags_buf_size = tags_buf_size;
tags_buf_array[i].buf = tags_buf;
thread_meta->tags_buf_size = tags_buf_size;
ret = csvGenRowTagData(tags_buf, tags_buf_size, stb, thread_meta->ctb_start_idx + i, &tk);
if (ret <= 0) {
goto error;
}
tags_buf_bucket[i].length = ret;
tags_buf_array[i].length = ret;
}
return tags_buf_bucket;
return tags_buf_array;
error:
csvFreeCtbTagData(thread_meta, tags_buf_bucket);
csvFreeCtbTagData(thread_meta, tags_buf_array);
return NULL;
}
static void csvFreeCtbTagData(CsvThreadMeta* thread_meta, CsvRowTagsBuf* tags_buf_bucket) {
if (!thread_meta || !tags_buf_bucket) {
return;
}
for (uint64_t i = 0 ; i < thread_meta->ctb_count; ++i) {
char* tags_buf = tags_buf_bucket[i].buf;
if (tags_buf) {
tmfree(tags_buf_bucket);
} else {
break;
}
}
tmfree(tags_buf_bucket);
return;
}
static CsvFileHandle* csvOpen(const char* filename, CsvCompressionLevel compress_level) {
CsvFileHandle* fhdl = NULL;
bool failed = false;
@ -770,13 +768,13 @@ static CsvFileHandle* csvOpen(const char* filename, CsvCompressionLevel compress
}
if (compress_level == CSV_COMPRESS_NONE) {
fhdl.handle.fp = fopen(filename, "w");
failed = (!fhdl.handle.fp);
fhdl->handle.fp = fopen(filename, "w");
failed = (!fhdl->handle.fp);
} else {
char mode[TINY_BUFF_LEN];
(void)snprintf(mode, sizeof(mode), "wb%d", compress_level);
fhdl.handle.gf = gzopen(filename, mode);
failed = (!fhdl.handle.gf);
fhdl->handle.gf = gzopen(filename, mode);
failed = (!fhdl->handle.gf);
}
if (failed) {
@ -806,9 +804,9 @@ static CsvIoError csvWrite(CsvFileHandle* fhdl, const char* buf, size_t size) {
return CSV_ERR_WRITE_FAILED;
}
} else {
unsigned int ret = gzwrite(fhdl->handle.gf, buf, size);
int ret = gzwrite(fhdl->handle.gf, buf, size);
if (ret != size) {
errorPrint("Failed to write csv file: %s. expected written %zu but %zu.\n",
errorPrint("Failed to write csv file: %s. expected written %zu but %d.\n",
fhdl->filename, size, ret);
int errnum;
const char* errmsg = gzerror(fhdl->handle.gf, &errnum);
@ -839,16 +837,16 @@ static void csvClose(CsvFileHandle* fhdl) {
static int csvWriteFile(CsvFileHandle* fhdl, uint64_t ctb_idx, int64_t cur_ts, int64_t* ck, CsvWriteMeta* write_meta, CsvThreadMeta* thread_meta) {
SDataBase* db = write_meta->db;
SSuperTable* stb = write_meta->stb;
CsvRowTagsBuf* tags_buf_bucket = thread_meta->tags_buf_bucket;
CsvRowColsBuf* tags_buf = &tags_buf_bucket[ctb_idx];
CsvRowTagsBuf* tags_buf_array = thread_meta->tags_buf_array;
CsvRowTagsBuf* tags_buf = &tags_buf_array[ctb_idx];
CsvRowColsBuf* cols_buf = thread_meta->cols_buf;
int ret = 0;
ret = csvGenRowColData(cols_buf->buf, cols_buf->buf_size, stb, cur_ts, db->precision, ck);
if (ret <= 0) {
errorPrint("Failed to generate csv column data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id, ctb_idx);
errorPrint("Failed to generate csv column data. database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n",
db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx);
return -1;
}
@ -858,7 +856,7 @@ static int csvWriteFile(CsvFileHandle* fhdl, uint64_t ctb_idx, int64_t cur_ts, i
if (thread_meta->output_header) {
ret = csvWrite(fhdl, write_meta->csv_header, write_meta->csv_header_length);
if (ret != CSV_ERR_OK) {
errorPrint("Failed to write csv header data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
errorPrint("Failed to write csv header data. database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n",
db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx);
return -1;
}
@ -869,7 +867,7 @@ static int csvWriteFile(CsvFileHandle* fhdl, uint64_t ctb_idx, int64_t cur_ts, i
// write columns
ret = csvWrite(fhdl, cols_buf->buf, cols_buf->length);
if (ret != CSV_ERR_OK) {
errorPrint("Failed to write csv column data, expected written %d but got %zu. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
errorPrint("Failed to write csv column data. database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n",
db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx);
return -1;
}
@ -877,7 +875,7 @@ static int csvWriteFile(CsvFileHandle* fhdl, uint64_t ctb_idx, int64_t cur_ts, i
// write tags
ret = csvWrite(fhdl, tags_buf->buf, tags_buf->length);
if (ret != CSV_ERR_OK) {
errorPrint("Failed to write csv tag data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
errorPrint("Failed to write csv tag data. database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n",
db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx);
return -1;
}
@ -885,7 +883,7 @@ static int csvWriteFile(CsvFileHandle* fhdl, uint64_t ctb_idx, int64_t cur_ts, i
// write line break
ret = csvWrite(fhdl, "\n", 1);
if (ret != CSV_ERR_OK) {
errorPrint("Failed to write csv line break data. database: %s, super table: %s, naming type: %d, thread index: %d, ctb index: %" PRIu64 ".\n",
errorPrint("Failed to write csv line break data. database: %s, super table: %s, naming type: %d, thread index: %zu, ctb index: %" PRIu64 ".\n",
db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id, ctb_idx);
return -1;
}
@ -922,10 +920,10 @@ static void* csvGenStbThread(void* arg) {
// tags buffer
CsvRowTagsBuf* tags_buf_bucket = csvGenCtbTagData(write_meta, thread_meta);
if (!tags_buf_bucket) {
errorPrint("Failed to generate csv tag data. database: %s, super table: %s, naming type: %d, thread index: %d.\n",
db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id);
CsvRowTagsBuf* tags_buf_array = csvGenCtbTagData(write_meta, thread_meta);
if (!tags_buf_array) {
errorPrint("Failed to generate csv tag data. database: %s, super table: %s, naming type: %d, thread index: %zu.\n",
db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id);
return NULL;
}
@ -933,8 +931,8 @@ static void* csvGenStbThread(void* arg) {
int buf_size = stb->lenOfCols + stb->cols->size;
char* buf = (char*)benchCalloc(1, buf_size, true);
if (!buf) {
errorPrint("Failed to malloc csv column buffer. database: %s, super table: %s, naming type: %d, thread index: %d.\n",
db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id);
errorPrint("Failed to malloc csv column buffer. database: %s, super table: %s, naming type: %d, thread index: %zu.\n",
db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id);
goto end;
}
@ -944,24 +942,23 @@ static void* csvGenStbThread(void* arg) {
.length = 0
};
thread_meta->tags_buf_bucket = tags_buf_bucket;
thread_meta->tags_buf_array = tags_buf_array;
thread_meta->cols_buf = &cols_buf;
start_print_ts = toolsGetTimestampMs();
for (cur_ts = write_meta->start_ts; cur_ts < write_meta->end_ts; cur_ts += write_meta->ts_step) {
// get filename
fullname[MAX_PATH_LEN] = {};
ret = csvGetFileFullname(write_meta, thread_meta, fullname, sizeof(fullname));
if (ret < 0) {
errorPrint("Failed to generate csv filename. database: %s, super table: %s, naming type: %d, thread index: %d.\n",
db->dbName, stb->stbName, write_meta.naming_type, thread_meta->thread_id);
errorPrint("Failed to generate csv filename. database: %s, super table: %s, naming type: %d, thread index: %zu.\n",
db->dbName, stb->stbName, write_meta->naming_type, thread_meta->thread_id);
goto end;
}
// create fd
fhdl = csvOpen(fullname, g_arguments->csv_compress_level);
if (fhdl == NULL) {
errorPrint("Failed to create csv file. thread index: %d, file: %s, errno: %d, strerror: %s.\n",
errorPrint("Failed to create csv file. thread index: %zu, file: %s, errno: %d, strerror: %s.\n",
thread_meta->thread_id, fullname, errno, strerror(errno));
goto end;
}
@ -972,7 +969,7 @@ static void* csvGenStbThread(void* arg) {
slice_end_ts = MIN(cur_ts + write_meta->ts_step, write_meta->end_ts);
file_rows = 0;
infoPrint("thread[%d] begin to write csv file: %s.\n", thread_meta->thread_id, fullname);
infoPrint("thread[%zu] begin to write csv file: %s.\n", thread_meta->thread_id, fullname);
// write data
while (slice_cur_ts < slice_end_ts) {
@ -982,7 +979,7 @@ static void* csvGenStbThread(void* arg) {
for (slice_ctb_cur_ts = slice_cur_ts; slice_ctb_cur_ts < slice_batch_ts; slice_ctb_cur_ts += write_meta->stb->timestamp_step) {
ret = csvWriteFile(fhdl, ctb_idx, slice_ctb_cur_ts, &ck, write_meta, thread_meta);
if (!ret) {
errorPrint("Failed to write csv file. thread index: %d, file: %s, errno: %d, strerror: %s.\n",
errorPrint("Failed to write csv file. thread index: %zu, file: %s, errno: %d, strerror: %s.\n",
thread_meta->thread_id, fullname, errno, strerror(errno));
csvClose(fhdl);
goto end;
@ -995,7 +992,7 @@ static void* csvGenStbThread(void* arg) {
cur_print_ts = toolsGetTimestampMs();
print_ts_elapse = cur_print_ts - pre_print_ts;
if (print_ts_elapse > 30000) {
infoPrint("thread[%d] has currently inserted rows: %" PRIu64 ", period insert rate: %.2f rows/s.\n",
infoPrint("thread[%zu] has currently inserted rows: %" PRIu64 ", period insert rate: %.2f rows/s.\n",
thread_meta->thread_id, total_rows, (total_rows - pre_total_rows) * 1000.0 / print_ts_elapse);
pre_print_ts = cur_print_ts;
@ -1014,18 +1011,18 @@ static void* csvGenStbThread(void* arg) {
}
csvClose(fhdl);
csvUpdateSliceRange(write_meta, thread_meta, last_end_ts);
csvUpdateSliceRange(write_meta, thread_meta, slice_end_ts);
}
cur_print_ts = toolsGetTimestampMs();
print_ts_elapse = cur_print_ts - start_print_ts;
succPrint("thread [%d] has completed inserting rows: %" PRIu64 ", insert rate %.2f rows/s.\n",
succPrint("thread [%zu] has completed inserting rows: %" PRIu64 ", insert rate %.2f rows/s.\n",
thread_meta->thread_id, total_rows, total_rows * 1000.0 / print_ts_elapse);
end:
thread_meta->total_rows = total_rows;
csvFreeCtbTagData(tags_buf_bucket);
csvFreeCtbTagData(thread_meta, tags_buf_array);
tmfree(buf);
return NULL;
}
@ -1038,8 +1035,12 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) {
int64_t start_ts = 0;
int64_t ts_elapse = 0;
CsvWriteMeta* write_meta = NULL;
CsvThreadArgs* args = NULL;
pthread_t* pids = NULL;
CsvWriteMeta* write_meta = benchCalloc(1, sizeof(CsvWriteMeta), false);
write_meta = benchCalloc(1, sizeof(CsvWriteMeta), false);
if (!write_meta) {
ret = -1;
goto end;
@ -1051,13 +1052,13 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) {
goto end;
}
CsvThreadArgs* args = benchCalloc(write_meta->total_threads, sizeof(CsvThreadArgs), false);
args = benchCalloc(write_meta->total_threads, sizeof(CsvThreadArgs), false);
if (!args) {
ret = -1;
goto end;
}
pthread_t* pids = benchCalloc(write_meta.total_threads, sizeof(pthread_t), false);
pids = benchCalloc(write_meta->total_threads, sizeof(pthread_t), false);
if (!pids) {
ret = -1;
goto end;
@ -1083,7 +1084,7 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) {
prompt = false;
}
infoPrint("pthread_join %d ...\n", i);
infoPrint("pthread_join %u ...\n", i);
pthread_join(pids[i], NULL);
}
@ -1097,7 +1098,7 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) {
ts_elapse = toolsGetTimestampMs() - start_ts;
if (ts_elapse > 0) {
succPrint("Spent %.6f seconds to insert rows: %" PRIu64 " with %d thread(s) into %s, at a rate of %.2f rows/s.\n",
succPrint("Spent %.6f seconds to insert rows: %" PRIu64 " with %zu thread(s) into %s, at a rate of %.2f rows/s.\n",
ts_elapse / 1000.0, total_rows, write_meta->total_threads, g_arguments->output_path, total_rows * 1000.0 / ts_elapse);
}
@ -1197,15 +1198,14 @@ static int csvParseParameter() {
// csv_output_path
size_t len = strlen(g_arguments->output_path);
if (len == 0) {
errorPrint("Failed to generate csv files, the specified output path is empty. Please provide a valid path. database: %s, super table: %s.\n",
db->dbName, stb->stbName);
errorPrint("Failed to generate csv files, the specified output path is empty. Please provide a valid path.\n");
return -1;
}
if (g_arguments->output_path[len - 1] != '/') {
int n = snprintf(g_arguments->output_path_buf, sizeof(g_arguments->output_path_buf), "%s/", g_arguments->output_path);
if (n < 0 || n >= sizeof(g_arguments->output_path_buf)) {
errorPrint("Failed to generate csv files, path buffer overflow risk when appending '/'. path: %s, database: %s, super table: %s.\n",
g_arguments->csv_output_path, db->dbName, stb->stbName);
errorPrint("Failed to generate csv files, path buffer overflow risk when appending '/'. path: %s.\n",
g_arguments->output_path);
return -1;
}
g_arguments->output_path = g_arguments->output_path_buf;
@ -1214,8 +1214,8 @@ static int csvParseParameter() {
// csv_ts_format
if (g_arguments->csv_ts_format) {
if (csvValidateParamTsFormat(g_arguments->csv_ts_format) != 0) {
errorPrint("Failed to generate csv files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s, database: %s, super table: %s.\n",
g_arguments->csv_ts_format, db->dbName, stb->stbName);
errorPrint("Failed to generate csv files, the parameter `csv_ts_format` is invalid. csv_ts_format: %s.\n",
g_arguments->csv_ts_format);
return -1;
}
}
@ -1223,8 +1223,8 @@ static int csvParseParameter() {
// csv_ts_interval
long csv_ts_intv_secs = csvValidateParamTsInterval(g_arguments->csv_ts_interval);
if (csv_ts_intv_secs <= 0) {
errorPrint("Failed to generate csv files, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s, database: %s, super table: %s.\n",
g_arguments->csv_ts_interval, db->dbName, stb->stbName);
errorPrint("Failed to generate csv files, the parameter `csv_ts_interval` is invalid. csv_ts_interval: %s.\n",
g_arguments->csv_ts_interval);
return -1;
}
g_arguments->csv_ts_intv_secs = csv_ts_intv_secs;
@ -1237,7 +1237,7 @@ static int csvWriteThread() {
for (size_t i = 0; i < g_arguments->databases->size && !g_arguments->terminate; ++i) {
// database
SDataBase* db = benchArrayGet(g_arguments->databases, i);
if (database->superTbls) {
if (db->superTbls) {
for (size_t j = 0; j < db->superTbls->size && !g_arguments->terminate; ++j) {
// stb
SSuperTable* stb = benchArrayGet(db->superTbls, j);

View File

@ -14,7 +14,6 @@
#include <sys/stat.h>
#include <bench.h>
#include "benchLog.h"
#include "benchCsv.h"
extern char g_configDir[MAX_PATH_LEN];