commit
5890091582
|
@ -32,6 +32,7 @@
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <wordexp.h>
|
#include <wordexp.h>
|
||||||
|
#include <regex.h>
|
||||||
|
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
@ -54,6 +55,7 @@ static struct argp_option options[] = {
|
||||||
{0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3},
|
{0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3},
|
||||||
{0, 'd', "database", 0, "Destination database. Default is 'test'.", 3},
|
{0, 'd', "database", 0, "Destination database. Default is 'test'.", 3},
|
||||||
{0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 3},
|
{0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 3},
|
||||||
|
{0, 's', "sql file", 0, "The select sql file.", 3},
|
||||||
{0, 'M', 0, 0, "Use metric flag.", 13},
|
{0, 'M', 0, 0, "Use metric flag.", 13},
|
||||||
{0, 'o', "outputfile", 0, "Direct output to the named file. Default is './output.txt'.", 14},
|
{0, 'o', "outputfile", 0, "Direct output to the named file. Default is './output.txt'.", 14},
|
||||||
{0, 'q', "query_mode", 0, "Query mode--0: SYNC, 1: ASYNC. Default is SYNC.", 6},
|
{0, 'q', "query_mode", 0, "Query mode--0: SYNC, 1: ASYNC. Default is SYNC.", 6},
|
||||||
|
@ -79,6 +81,7 @@ typedef struct DemoArguments {
|
||||||
char *password;
|
char *password;
|
||||||
char *database;
|
char *database;
|
||||||
char *tb_prefix;
|
char *tb_prefix;
|
||||||
|
char *sqlFile;
|
||||||
bool use_metric;
|
bool use_metric;
|
||||||
bool insert_only;
|
bool insert_only;
|
||||||
char *output_file;
|
char *output_file;
|
||||||
|
@ -120,6 +123,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
||||||
case 'o':
|
case 'o':
|
||||||
arguments->output_file = arg;
|
arguments->output_file = arg;
|
||||||
break;
|
break;
|
||||||
|
case 's':
|
||||||
|
arguments->sqlFile = arg;
|
||||||
|
break;
|
||||||
case 'q':
|
case 'q':
|
||||||
arguments->mode = atoi(arg);
|
arguments->mode = atoi(arg);
|
||||||
break;
|
break;
|
||||||
|
@ -179,10 +185,10 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
||||||
arguments->tb_prefix = arg;
|
arguments->tb_prefix = arg;
|
||||||
break;
|
break;
|
||||||
case 'M':
|
case 'M':
|
||||||
arguments->use_metric = true;
|
arguments->use_metric = false;
|
||||||
break;
|
break;
|
||||||
case 'x':
|
case 'x':
|
||||||
arguments->insert_only = true;
|
arguments->insert_only = false;
|
||||||
break;
|
break;
|
||||||
case 'c':
|
case 'c':
|
||||||
if (wordexp(arg, &full_path, 0) != 0) {
|
if (wordexp(arg, &full_path, 0) != 0) {
|
||||||
|
@ -254,6 +260,9 @@ typedef struct {
|
||||||
int64_t start_time;
|
int64_t start_time;
|
||||||
bool do_aggreFunc;
|
bool do_aggreFunc;
|
||||||
|
|
||||||
|
char* cols;
|
||||||
|
bool use_metric;
|
||||||
|
|
||||||
sem_t mutex_sem;
|
sem_t mutex_sem;
|
||||||
int notFinished;
|
int notFinished;
|
||||||
sem_t lock_sem;
|
sem_t lock_sem;
|
||||||
|
@ -305,6 +314,8 @@ void rand_string(char *str, int size);
|
||||||
double getCurrentTime();
|
double getCurrentTime();
|
||||||
|
|
||||||
void callBack(void *param, TAOS_RES *res, int code);
|
void callBack(void *param, TAOS_RES *res, int code);
|
||||||
|
void multiThreadCreateTable(char* cols, bool use_metric, int threads, int ntables, char* db_name, char* tb_prefix, char *ip_addr, uint16_t port, char *user, char *pass);
|
||||||
|
void querySqlFile(TAOS* taos, char* sqlFile);
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
SDemoArguments arguments = { NULL, // host
|
SDemoArguments arguments = { NULL, // host
|
||||||
|
@ -313,6 +324,7 @@ int main(int argc, char *argv[]) {
|
||||||
"taosdata", // password
|
"taosdata", // password
|
||||||
"test", // database
|
"test", // database
|
||||||
"t", // tb_prefix
|
"t", // tb_prefix
|
||||||
|
NULL,
|
||||||
false, // use_metric
|
false, // use_metric
|
||||||
false, // insert_only
|
false, // insert_only
|
||||||
"./output.txt", // output_file
|
"./output.txt", // output_file
|
||||||
|
@ -385,6 +397,13 @@ int main(int argc, char *argv[]) {
|
||||||
char dataString[STRING_LEN];
|
char dataString[STRING_LEN];
|
||||||
bool do_aggreFunc = true;
|
bool do_aggreFunc = true;
|
||||||
|
|
||||||
|
if (NULL != arguments.sqlFile) {
|
||||||
|
TAOS* qtaos = taos_connect(ip_addr, user, pass, db_name, port);
|
||||||
|
querySqlFile(qtaos, arguments.sqlFile);
|
||||||
|
taos_close(qtaos);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
memset(dataString, 0, STRING_LEN);
|
memset(dataString, 0, STRING_LEN);
|
||||||
int len = 0;
|
int len = 0;
|
||||||
|
|
||||||
|
@ -495,46 +514,18 @@ int main(int argc, char *argv[]) {
|
||||||
len += snprintf(cols + len, STRING_LEN - len, ",f%d %s(%d))", colIndex + 1, data_type[colIndex % count_data_type], len_of_binary);
|
len += snprintf(cols + len, STRING_LEN - len, ",f%d %s(%d))", colIndex + 1, data_type[colIndex % count_data_type], len_of_binary);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!use_metric) {
|
if (use_metric) {
|
||||||
/* Create all the tables; */
|
|
||||||
printf("Creating %d table(s)......\n", ntables);
|
|
||||||
for (int i = 0; i < ntables; i++) {
|
|
||||||
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", db_name, tb_prefix, i, cols);
|
|
||||||
queryDB(taos, command);
|
|
||||||
}
|
|
||||||
|
|
||||||
printf("Table(s) created!\n");
|
|
||||||
taos_close(taos);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
/* Create metric table */
|
/* Create metric table */
|
||||||
printf("Creating meters super table...\n");
|
printf("Creating meters super table...\n");
|
||||||
snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols);
|
snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols);
|
||||||
queryDB(taos, command);
|
queryDB(taos, command);
|
||||||
printf("meters created!\n");
|
printf("meters created!\n");
|
||||||
|
|
||||||
/* Create all the tables; */
|
|
||||||
printf("Creating %d table(s)......\n", ntables);
|
|
||||||
for (int i = 0; i < ntables; i++) {
|
|
||||||
int j;
|
|
||||||
if (i % 10 == 0) {
|
|
||||||
j = 10;
|
|
||||||
} else {
|
|
||||||
j = i % 10;
|
|
||||||
}
|
|
||||||
if (j % 2 == 0) {
|
|
||||||
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "shanghai");
|
|
||||||
} else {
|
|
||||||
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "beijing");
|
|
||||||
}
|
|
||||||
queryDB(taos, command);
|
|
||||||
}
|
|
||||||
|
|
||||||
printf("Table(s) created!\n");
|
|
||||||
taos_close(taos);
|
taos_close(taos);
|
||||||
}
|
}
|
||||||
/* Wait for table to create */
|
|
||||||
|
|
||||||
|
/* Wait for table to create */
|
||||||
|
multiThreadCreateTable(cols, use_metric, threads, ntables, db_name, tb_prefix, ip_addr, port, user, pass);
|
||||||
|
|
||||||
/* Insert data */
|
/* Insert data */
|
||||||
double ts = getCurrentTime();
|
double ts = getCurrentTime();
|
||||||
|
@ -685,6 +676,198 @@ int main(int argc, char *argv[]) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define MAX_SQL_SIZE 65536
|
||||||
|
void selectSql(TAOS* taos, char* sqlcmd)
|
||||||
|
{
|
||||||
|
TAOS_RES *pSql = taos_query(taos, sqlcmd);
|
||||||
|
int32_t code = taos_errno(pSql);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
printf("Failed to sqlcmd:%s, reason:%s\n", sqlcmd, taos_errstr(pSql));
|
||||||
|
taos_free_result(pSql);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while (taos_fetch_row(pSql) != NULL) {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(pSql);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* Function to do regular expression check */
|
||||||
|
static int regexMatch(const char *s, const char *reg, int cflags) {
|
||||||
|
regex_t regex;
|
||||||
|
char msgbuf[100] = {0};
|
||||||
|
|
||||||
|
/* Compile regular expression */
|
||||||
|
if (regcomp(®ex, reg, cflags) != 0) {
|
||||||
|
printf("Fail to compile regex\n");
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Execute regular expression */
|
||||||
|
int reti = regexec(®ex, s, 0, NULL, 0);
|
||||||
|
if (!reti) {
|
||||||
|
regfree(®ex);
|
||||||
|
return 1;
|
||||||
|
} else if (reti == REG_NOMATCH) {
|
||||||
|
regfree(®ex);
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
regerror(reti, ®ex, msgbuf, sizeof(msgbuf));
|
||||||
|
printf("Regex match failed: %s\n", msgbuf);
|
||||||
|
regfree(®ex);
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int isCommentLine(char *line) {
|
||||||
|
if (line == NULL) return 1;
|
||||||
|
|
||||||
|
return regexMatch(line, "^\\s*#.*", REG_EXTENDED);
|
||||||
|
}
|
||||||
|
|
||||||
|
void querySqlFile(TAOS* taos, char* sqlFile)
|
||||||
|
{
|
||||||
|
FILE *fp = fopen(sqlFile, "r");
|
||||||
|
if (fp == NULL) {
|
||||||
|
printf("failed to open file %s, reason:%s\n", sqlFile, strerror(errno));
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int read_len = 0;
|
||||||
|
char * cmd = calloc(1, MAX_SQL_SIZE);
|
||||||
|
size_t cmd_len = 0;
|
||||||
|
char * line = NULL;
|
||||||
|
size_t line_len = 0;
|
||||||
|
|
||||||
|
double t = getCurrentTime();
|
||||||
|
|
||||||
|
while ((read_len = getline(&line, &line_len, fp)) != -1) {
|
||||||
|
if (read_len >= MAX_SQL_SIZE) continue;
|
||||||
|
line[--read_len] = '\0';
|
||||||
|
|
||||||
|
if (read_len == 0 || isCommentLine(line)) { // line starts with #
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (line[read_len - 1] == '\\') {
|
||||||
|
line[read_len - 1] = ' ';
|
||||||
|
memcpy(cmd + cmd_len, line, read_len);
|
||||||
|
cmd_len += read_len;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(cmd + cmd_len, line, read_len);
|
||||||
|
selectSql(taos, cmd);
|
||||||
|
memset(cmd, 0, MAX_SQL_SIZE);
|
||||||
|
cmd_len = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
t = getCurrentTime() - t;
|
||||||
|
printf("run %s took %.6f second(s)\n\n", sqlFile, t);
|
||||||
|
|
||||||
|
free(cmd);
|
||||||
|
if (line) free(line);
|
||||||
|
fclose(fp);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
void * createTable(void *sarg)
|
||||||
|
{
|
||||||
|
char command[BUFFER_SIZE] = "\0";
|
||||||
|
|
||||||
|
info *winfo = (info *)sarg;
|
||||||
|
|
||||||
|
if (!winfo->use_metric) {
|
||||||
|
/* Create all the tables; */
|
||||||
|
printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
|
||||||
|
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
|
||||||
|
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", winfo->db_name, winfo->tb_prefix, i, winfo->cols);
|
||||||
|
queryDB(winfo->taos, command);
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_close(winfo->taos);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
/* Create all the tables; */
|
||||||
|
printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
|
||||||
|
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
|
||||||
|
int j;
|
||||||
|
if (i % 10 == 0) {
|
||||||
|
j = 10;
|
||||||
|
} else {
|
||||||
|
j = i % 10;
|
||||||
|
}
|
||||||
|
if (j % 2 == 0) {
|
||||||
|
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", winfo->db_name, winfo->tb_prefix, i, winfo->db_name, j, "shanghai");
|
||||||
|
} else {
|
||||||
|
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", winfo->db_name, winfo->tb_prefix, i, winfo->db_name, j, "beijing");
|
||||||
|
}
|
||||||
|
queryDB(winfo->taos, command);
|
||||||
|
}
|
||||||
|
taos_close(winfo->taos);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void multiThreadCreateTable(char* cols, bool use_metric, int threads, int ntables, char* db_name, char* tb_prefix, char *ip_addr, uint16_t port, char *user, char *pass) {
|
||||||
|
double ts = getCurrentTime();
|
||||||
|
printf("create table......\n");
|
||||||
|
pthread_t *pids = malloc(threads * sizeof(pthread_t));
|
||||||
|
info *infos = malloc(threads * sizeof(info));
|
||||||
|
|
||||||
|
int a = ntables / threads;
|
||||||
|
if (a < 1) {
|
||||||
|
threads = ntables;
|
||||||
|
a = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int b = 0;
|
||||||
|
if (threads != 0)
|
||||||
|
b = ntables % threads;
|
||||||
|
int last = 0;
|
||||||
|
for (int i = 0; i < threads; i++) {
|
||||||
|
info *t_info = infos + i;
|
||||||
|
t_info->threadID = i;
|
||||||
|
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
|
||||||
|
tstrncpy(t_info->tb_prefix, tb_prefix, MAX_TB_NAME_SIZE);
|
||||||
|
t_info->taos = taos_connect(ip_addr, user, pass, db_name, port);
|
||||||
|
t_info->start_table_id = last;
|
||||||
|
t_info->end_table_id = i < b ? last + a : last + a - 1;
|
||||||
|
last = t_info->end_table_id + 1;
|
||||||
|
t_info->use_metric = use_metric;
|
||||||
|
t_info->cols = cols;
|
||||||
|
pthread_create(pids + i, NULL, createTable, t_info);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < threads; i++) {
|
||||||
|
pthread_join(pids[i], NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
double t = getCurrentTime() - ts;
|
||||||
|
printf("Spent %.4f seconds to create %d tables with %d connections\n", t, ntables, threads);
|
||||||
|
|
||||||
|
for (int i = 0; i < threads; i++) {
|
||||||
|
info *t_info = infos + i;
|
||||||
|
taos_close(t_info->taos);
|
||||||
|
sem_destroy(&(t_info->mutex_sem));
|
||||||
|
sem_destroy(&(t_info->lock_sem));
|
||||||
|
}
|
||||||
|
|
||||||
|
free(pids);
|
||||||
|
free(infos);
|
||||||
|
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
|
||||||
void *readTable(void *sarg) {
|
void *readTable(void *sarg) {
|
||||||
info *rinfo = (info *)sarg;
|
info *rinfo = (info *)sarg;
|
||||||
TAOS *taos = rinfo->taos;
|
TAOS *taos = rinfo->taos;
|
||||||
|
|
Loading…
Reference in New Issue