feat: csv supports exporting create sql stmt

This commit is contained in:
Yaming Pei 2025-03-03 09:06:08 +08:00
parent 695e921105
commit 4d7c1a4067
2 changed files with 288 additions and 3 deletions

View File

@ -95,9 +95,6 @@ typedef struct {
} CsvThreadArgs; } CsvThreadArgs;
int csvTestProcess(); int csvTestProcess();
#endif // INC_BENCHCSV_H_ #endif // INC_BENCHCSV_H_

View File

@ -167,11 +167,296 @@ static int csvGenCsvHeader(CsvWriteMeta* write_meta) {
pos += snprintf(buf + pos, size - pos, ",%s", tag->name); pos += snprintf(buf + pos, size - pos, ",%s", tag->name);
} }
// line break
pos += snprintf(buf + pos, size - pos, "\n");
write_meta->csv_header_length = (pos > 0 && pos < size) ? pos : 0; write_meta->csv_header_length = (pos > 0 && pos < size) ? pos : 0;
return (pos > 0 && pos < size) ? 0 : -1; return (pos > 0 && pos < size) ? 0 : -1;
} }
// Build the "CREATE DATABASE IF NOT EXISTS ..." statement for `db` into `buf`.
//
// The statement is assembled in this order:
//   1. the database name (wrapped in back-quotes when
//      g_arguments->escape_character is set),
//   2. " VGROUPS <n>" when g_arguments->inputted_vgroups is not -1,
//   3. one " <name> <value>" pair per entry of db->cfgs (string value when
//      cfg->valuestring is set, integer value otherwise),
//   4. the PRECISION clause matching db->precision,
//   5. the ";\n" terminator.
//
// buf  : destination buffer
// size : capacity of `buf` in bytes
//
// Returns the number of characters written (terminating NUL excluded), or -1
// when the statement does not fit into `buf` or formatting failed.
int csvGenCreateDbSql(SDataBase* db, char* buf, int size) {
    int pos = 0;

    pos += snprintf(buf + pos, size - pos, "CREATE DATABASE IF NOT EXISTS ");
    if (pos <= 0 || pos >= size) return -1;

    pos += snprintf(buf + pos, size - pos, g_arguments->escape_character ? "`%s`" : "%s", db->dbName);
    if (pos <= 0 || pos >= size) return -1;

    if (-1 != g_arguments->inputted_vgroups) {
        pos += snprintf(buf + pos, size - pos, " VGROUPS %d", g_arguments->inputted_vgroups);
        if (pos <= 0 || pos >= size) return -1;
    }

    if (db->cfgs) {
        // `size` is the buffer capacity parameter, so the index must be a size_t.
        for (size_t i = 0; i < db->cfgs->size; ++i) {
            SDbCfg* cfg = benchArrayGet(db->cfgs, i);
            if (cfg->valuestring) {
                pos += snprintf(buf + pos, size - pos, " %s %s", cfg->name, cfg->valuestring);
            } else {
                pos += snprintf(buf + pos, size - pos, " %s %d", cfg->name, cfg->valueint);
            }
            if (pos <= 0 || pos >= size) return -1;
        }
    }

    switch (db->precision) {
        case TSDB_TIME_PRECISION_MILLI:
            pos += snprintf(buf + pos, size - pos, " PRECISION 'ms';\n");
            break;
        case TSDB_TIME_PRECISION_MICRO:
            pos += snprintf(buf + pos, size - pos, " PRECISION 'us';\n");
            break;
        case TSDB_TIME_PRECISION_NANO:
            pos += snprintf(buf + pos, size - pos, " PRECISION 'ns';\n");
            break;
        default:
            // No PRECISION clause for an unrecognised value, but the statement
            // must still be terminated so the next statement written after it
            // is not glued onto this one.
            pos += snprintf(buf + pos, size - pos, ";\n");
            break;
    }

    return (pos > 0 && pos < size) ? pos : -1;
}
// Generate the CREATE DATABASE statement for write_meta->db and write it to `fp`.
//
// fp : stream opened for writing by the caller; it is not closed here.
//
// Returns 0 on success, -1 when the statement could not be generated or was
// not written completely.
static int csvExportCreateDbSql(CsvWriteMeta* write_meta, FILE* fp) {
    char buf[LARGE_BUFF_LEN] = {0};

    int length = csvGenCreateDbSql(write_meta->db, buf, sizeof(buf));
    if (length < 0) {
        // sizeof yields a size_t, which must be printed with %zu.
        errorPrint("Failed to generate create db sql, maybe buffer[%zu] not enough.\n", sizeof(buf));
        return -1;
    }

    // fwrite returns a size_t; keep it unsigned instead of narrowing to int.
    size_t written = fwrite(buf, 1, (size_t)length, fp);
    if (written != (size_t)length) {
        errorPrint("Failed to write create db sql: %s. expected written %d but %zu.\n",
                buf, length, written);
        if (ferror(fp)) {
            perror("error");
        }
        return -1;
    }

    return 0;
}
// Build the "CREATE TABLE IF NOT EXISTS <db>.<stb> (...) TAGS (...)" statement
// for super table `stb` into `buf`.
//
// The statement is assembled in this order:
//   1. the qualified table name (each part back-quoted when
//      g_arguments->escape_character is set),
//   2. the column list, starting with "ts TIMESTAMP"; BINARY / NCHAR /
//      VARBINARY / GEOMETRY columns carry their length, the first column gets
//      PRIMARY_KEY when stb->primary_key is set, and non-empty encode /
//      compress / level attributes are appended per column,
//   3. the tag list,
//   4. the optional COMMENT, DELAY, FILE_FACTOR, ROLLUP, MAX_DELAY and
//      WATERMARK clauses,
//   5. an SMA(...) clause listing every column whose `sma` flag is set,
//   6. the ";\n" terminator.
//
// buf  : destination buffer
// size : capacity of `buf` in bytes
//
// Returns the number of characters written (terminating NUL excluded), or -1
// when the statement does not fit into `buf`, formatting failed, or a
// GEOMETRY column/tag has a length below 21.
int csvGenCreateStbSql(SDataBase* db, SSuperTable* stb, char* buf, int size) {
    int pos = 0;

    pos += snprintf(buf + pos, size - pos, "CREATE TABLE IF NOT EXISTS ");
    if (pos <= 0 || pos >= size) return -1;

    pos += snprintf(buf + pos, size - pos, g_arguments->escape_character ? "`%s`.`%s`" : "%s.%s", db->dbName, stb->stbName);
    if (pos <= 0 || pos >= size) return -1;

    pos += snprintf(buf + pos, size - pos, " (ts TIMESTAMP");
    if (pos <= 0 || pos >= size) return -1;

    // columns
    for (size_t i = 0; i < stb->cols->size; ++i) {
        Field* col = benchArrayGet(stb->cols, i);

        if (col->type == TSDB_DATA_TYPE_BINARY
                || col->type == TSDB_DATA_TYPE_NCHAR
                || col->type == TSDB_DATA_TYPE_VARBINARY
                || col->type == TSDB_DATA_TYPE_GEOMETRY) {
            if (col->type == TSDB_DATA_TYPE_GEOMETRY && col->length < 21) {
                errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %zu\n", __func__, __LINE__, i);
                return -1;
            }
            pos += snprintf(buf + pos, size - pos, ",%s %s(%d)", col->name, convertDatatypeToString(col->type), col->length);
        } else {
            pos += snprintf(buf + pos, size - pos, ",%s %s", col->name, convertDatatypeToString(col->type));
        }
        if (pos <= 0 || pos >= size) return -1;

        // primary key: only the first data column can carry it
        if (stb->primary_key && i == 0) {
            pos += snprintf(buf + pos, size - pos, " %s", PRIMARY_KEY);
            if (pos <= 0 || pos >= size) return -1;
        }

        // per-column compression attributes
        if (strlen(col->encode) > 0) {
            pos += snprintf(buf + pos, size - pos, " encode '%s'", col->encode);
            if (pos <= 0 || pos >= size) return -1;
        }
        if (strlen(col->compress) > 0) {
            pos += snprintf(buf + pos, size - pos, " compress '%s'", col->compress);
            if (pos <= 0 || pos >= size) return -1;
        }
        if (strlen(col->level) > 0) {
            pos += snprintf(buf + pos, size - pos, " level '%s'", col->level);
            if (pos <= 0 || pos >= size) return -1;
        }
    }

    pos += snprintf(buf + pos, size - pos, ") TAGS (");
    if (pos <= 0 || pos >= size) return -1;

    // tags
    for (size_t i = 0; i < stb->tags->size; ++i) {
        Field* tag = benchArrayGet(stb->tags, i);

        if (i > 0) {
            pos += snprintf(buf + pos, size - pos, ",");
            if (pos <= 0 || pos >= size) return -1;
        }

        if (tag->type == TSDB_DATA_TYPE_BINARY
                || tag->type == TSDB_DATA_TYPE_NCHAR
                || tag->type == TSDB_DATA_TYPE_VARBINARY
                || tag->type == TSDB_DATA_TYPE_GEOMETRY) {
            if (tag->type == TSDB_DATA_TYPE_GEOMETRY && tag->length < 21) {
                errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %zu\n", __func__, __LINE__, i);
                return -1;
            }
            pos += snprintf(buf + pos, size - pos, "%s %s(%d)", tag->name, convertDatatypeToString(tag->type), tag->length);
        } else {
            pos += snprintf(buf + pos, size - pos, "%s %s", tag->name, convertDatatypeToString(tag->type));
        }
        if (pos <= 0 || pos >= size) return -1;
    }

    pos += snprintf(buf + pos, size - pos, ")");
    if (pos <= 0 || pos >= size) return -1;

    // comment
    if (stb->comment != NULL) {
        pos += snprintf(buf + pos, size - pos," COMMENT '%s'", stb->comment);
        if (pos <= 0 || pos >= size) return -1;
    }

    // delay
    if (stb->delay >= 0) {
        pos += snprintf(buf + pos, size - pos, " DELAY %d", stb->delay);
        if (pos <= 0 || pos >= size) return -1;
    }

    // file factor: divided by 100.0 before being printed
    if (stb->file_factor >= 0) {
        pos += snprintf(buf + pos, size - pos, " FILE_FACTOR %f", stb->file_factor / 100.0);
        if (pos <= 0 || pos >= size) return -1;
    }

    // rollup
    if (stb->rollup != NULL) {
        pos += snprintf(buf + pos, size - pos, " ROLLUP(%s)", stb->rollup);
        if (pos <= 0 || pos >= size) return -1;
    }

    // max delay
    if (stb->max_delay != NULL) {
        pos += snprintf(buf + pos, size - pos, " MAX_DELAY %s", stb->max_delay);
        if (pos <= 0 || pos >= size) return -1;
    }

    // watermark
    if (stb->watermark != NULL) {
        pos += snprintf(buf + pos, size - pos, " WATERMARK %s", stb->watermark);
        if (pos <= 0 || pos >= size) return -1;
    }

    // sma: the clause is opened by the first flagged column and closed below
    bool first_sma = true;
    for (size_t i = 0; i < stb->cols->size; ++i) {
        Field* col = benchArrayGet(stb->cols, i);
        if (col->sma) {
            if (first_sma) {
                pos += snprintf(buf + pos, size - pos, " SMA(%s", col->name);
                first_sma = false;
            } else {
                pos += snprintf(buf + pos, size - pos, ",%s", col->name);
            }
            if (pos <= 0 || pos >= size) return -1;
        }
    }
    if (!first_sma) {
        pos += snprintf(buf + pos, size - pos, ")");
        if (pos <= 0 || pos >= size) return -1;
    }

    // Log before appending the terminator so the log line stays on one line.
    infoPrint("create stable: <%s>\n", buf);

    // Terminate the statement the same way the CREATE DATABASE statement is
    // terminated, so the exported file holds one complete statement per line.
    pos += snprintf(buf + pos, size - pos, ";\n");

    return (pos > 0 && pos < size) ? pos : -1;
}
// Generate the CREATE TABLE statement for write_meta->stb and write it to `fp`.
//
// fp : stream opened for writing by the caller; it is not closed here.
//
// Returns 0 on success, -1 when the statement could not be generated or was
// not written completely.
static int csvExportCreateStbSql(CsvWriteMeta* write_meta, FILE* fp) {
    char buf[4096] = {0};

    int length = csvGenCreateStbSql(write_meta->db, write_meta->stb, buf, sizeof(buf));
    if (length < 0) {
        // sizeof yields a size_t, which must be printed with %zu.
        errorPrint("Failed to generate create stb sql, maybe buffer[%zu] not enough.\n", sizeof(buf));
        return -1;
    }

    // fwrite returns a size_t; keep it unsigned instead of narrowing to int.
    size_t written = fwrite(buf, 1, (size_t)length, fp);
    if (written != (size_t)length) {
        errorPrint("Failed to write create stb sql: %s. expected written %d but %zu.\n",
                buf, length, written);
        if (ferror(fp)) {
            perror("error");
        }
        return -1;
    }

    return 0;
}
// Export the CREATE DATABASE and CREATE TABLE statements described by
// `write_meta` into "<output_path>create_stmt.txt".
//
// The file name is built by concatenating g_arguments->output_path with
// "create_stmt.txt"; no separator is inserted here. The file is opened with
// mode "w" and always closed before returning.
//
// Returns 0 on success and a negative value when the path does not fit into
// MAX_PATH_LEN, the file cannot be opened, either statement fails to export,
// or the file cannot be closed.
static int csvExportCreateSql(CsvWriteMeta* write_meta) {
    char fullname[MAX_PATH_LEN] = {0};
    int  ret = 0;

    int length = snprintf(fullname, sizeof(fullname), "%s%s.txt", g_arguments->output_path, "create_stmt");
    if (length <= 0 || (size_t)length >= sizeof(fullname)) {
        errorPrint("Failed to build create sql file name under: %s.\n", g_arguments->output_path);
        return -1;
    }

    FILE* fp = fopen(fullname, "w");
    if (!fp) {
        errorPrint("Failed to open file: %s.\n", fullname);
        return -1;
    }

    // export db
    ret = csvExportCreateDbSql(write_meta, fp);

    // export stb, only when the db statement went through
    if (ret >= 0) {
        ret = csvExportCreateStbSql(write_meta, fp);
    }

    // fclose flushes buffered data, so a failure here means the file may be
    // incomplete even though every fwrite reported success.
    if (fclose(fp) != 0 && ret >= 0) {
        errorPrint("Failed to close file: %s.\n", fullname);
        ret = -1;
    }

    if (ret >= 0) {
        succPrint("Export create sql to file: %s successfully..\n", fullname);
    }

    return ret;
}
static int csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write_meta) { static int csvInitWriteMeta(SDataBase* db, SSuperTable* stb, CsvWriteMeta* write_meta) {
write_meta->naming_type = csvGetFileNamingType(stb); write_meta->naming_type = csvGetFileNamingType(stb);
write_meta->total_threads = 1; write_meta->total_threads = 1;
@ -816,6 +1101,9 @@ static int csvGenStbProcess(SDataBase* db, SSuperTable* stb) {
ts_elapse / 1000.0, total_rows, write_meta->total_threads, g_arguments->output_path, total_rows * 1000.0 / ts_elapse); ts_elapse / 1000.0, total_rows, write_meta->total_threads, g_arguments->output_path, total_rows * 1000.0 / ts_elapse);
} }
// export create db/stb sql
ret = csvExportCreateSql(write_meta);
end: end:
tmfree(pids); tmfree(pids);
tmfree(args); tmfree(args);