diff --git a/importSampleData/app/main.go b/importSampleData/app/main.go index aef4133207..d714fc339c 100644 --- a/importSampleData/app/main.go +++ b/importSampleData/app/main.go @@ -18,7 +18,7 @@ import ( "sync" "time" - _ "github.com/taosdata/TDengine/src/connector/go/src/taosSql" + _ "github.com/taosdata/TDengine/src/connector/go/taosSql" ) const ( @@ -634,6 +634,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] if appendRows == batch { // executebatch insertSql := buffers.String() + connection.Exec("use " + db) affectedRows := executeBatchInsert(insertSql, connection) successRows[threadIndex] += affectedRows @@ -658,6 +659,7 @@ func insertData(threadIndex, start, end int, wg *sync.WaitGroup, successRows [] if appendRows > 0 { // executebatch insertSql := buffers.String() + connection.Exec("use " + db) affectedRows := executeBatchInsert(insertSql, connection) successRows[threadIndex] += affectedRows diff --git a/importSampleData/bin/taosimport b/importSampleData/bin/taosimport index b042549341..235fde9f06 100755 Binary files a/importSampleData/bin/taosimport and b/importSampleData/bin/taosimport differ diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 04194c6127..cf84ecad43 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -49,7 +49,7 @@ static struct argp_option options[] = { {0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0}, {0, 'p', "port", 0, "The TCP/IP port number to use for the connection. Default is 0.", 1}, {0, 'u', "user", 0, "The TDEngine user name to use when connecting to the server. Default is 'root'.", 2}, - {0, 'a', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3}, + {0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3}, {0, 'd', "database", 0, "Destination database. Default is 'test'.", 3}, {0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 3}, {0, 'M', 0, 0, "Use metric flag.", 13}, @@ -58,12 +58,15 @@ static struct argp_option options[] = { {0, 'b', "type_of_cols", 0, "The data_type of columns: 'INT', 'TINYINT', 'SMALLINT', 'BIGINT', 'FLOAT', 'DOUBLE', 'BINARY'. Default is 'INT'.", 7}, {0, 'w', "length_of_binary", 0, "The length of data_type 'BINARY'. Only applicable when type of cols is 'BINARY'. Default is 8", 8}, {0, 'l', "num_of_cols_per_record", 0, "The number of columns per record. Default is 3.", 8}, - {0, 'c', "num_of_conns", 0, "The number of connections. Default is 10.", 9}, + {0, 'T', "num_of_threads", 0, "The number of threads. Default is 10.", 9}, {0, 'r', "num_of_records_per_req", 0, "The number of records per request. Default is 1000.", 10}, {0, 't', "num_of_tables", 0, "The number of tables. Default is 10000.", 11}, {0, 'n', "num_of_records_per_table", 0, "The number of records per table. Default is 100000.", 12}, - {0, 'f', "config_directory", 0, "Configuration directory. Default is '/etc/taos/'.", 14}, + {0, 'c', "config_directory", 0, "Configuration directory. Default is '/etc/taos/'.", 14}, {0, 'x', 0, 0, "Insert only flag.", 13}, + {0, 'O', "order", 0, "Insert mode--0: In order, 1: Out of order. Default is in order.", 14}, + {0, 'R', "rate", 0, "Out of order data's rate--if order=1 Default 10, min: 0, max: 50.", 14}, + {0, 'D', "delete table", 0, "Delete data methods——0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database", 14}, {0}}; /* Used by main to communicate with parse_opt. */ @@ -81,11 +84,14 @@ typedef struct DemoArguments { char *datatype[MAX_NUM_DATATYPE]; int len_of_binary; int num_of_CPR; - int num_of_connections; + int num_of_threads; int num_of_RPR; int num_of_tables; int num_of_DPT; int abort; + int order; + int rate; + int method_of_delete; char **arg_list; } SDemoArguments; @@ -106,7 +112,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'u': arguments->user = arg; break; - case 'a': + case 'P': arguments->password = arg; break; case 'o': @@ -115,8 +121,8 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'q': arguments->mode = atoi(arg); break; - case 'c': - arguments->num_of_connections = atoi(arg); + case 'T': + arguments->num_of_threads = atoi(arg); break; case 'r': arguments->num_of_RPR = atoi(arg); @@ -176,7 +182,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'x': arguments->insert_only = true; break; - case 'f': + case 'c': if (wordexp(arg, &full_path, 0) != 0) { fprintf(stderr, "Invalid path %s\n", arg); return -1; @@ -184,6 +190,30 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]); wordfree(&full_path); break; + case 'O': + arguments->order = atoi(arg); + if (arguments->order > 1 || arguments->order < 0) + { + arguments->order = 0; + } else if (arguments->order == 1) + { + arguments->rate = 10; + } + break; + case 'R': + arguments->rate = atoi(arg); + if (arguments->order == 1 && (arguments->rate > 50 || arguments->rate <= 0)) + { + arguments->rate = 10; + } + break; + case 'D': + arguments->method_of_delete = atoi(arg); + if (arguments->method_of_delete < 0 || arguments->method_of_delete > 3) + { + arguments->method_of_delete = 0; + } + break; case OPT_ABORT: arguments->abort = 1; break; @@ -217,6 +247,8 @@ typedef struct { int ncols_per_record; int nrecords_per_table; int nrecords_per_request; + int data_of_order; + int data_of_rate; int64_t start_time; bool do_aggreFunc; @@ -236,6 +268,8 @@ typedef struct { int ncols_per_record; char **data_type; int len_of_binary; + int data_of_order; + int data_of_rate; sem_t *mutex_sem; int *notFinished; @@ -258,6 +292,8 @@ void *readMetric(void *sarg); void *syncWrite(void *sarg); +void *deleteTable(); + void *asyncWrite(void *sarg); void generateData(char *res, char **data_type, int num_of_cols, int64_t timestamp, int len_of_binary); @@ -291,11 +327,14 @@ int main(int argc, char *argv[]) { }, 8, // len_of_binary 1, // num_of_CPR - 1, // num_of_connections + 1, // num_of_connections/thread 1, // num_of_RPR 1, // num_of_tables 50000, // num_of_DPT 0, // abort + 0, // order + 0, // rate + 0, // method_of_delete NULL // arg_list }; @@ -304,7 +343,7 @@ int main(int argc, char *argv[]) { // For demo use, change default values for some parameters; arguments.num_of_tables = 10000; arguments.num_of_CPR = 3; - arguments.num_of_connections = 10; + arguments.num_of_threads = 10; arguments.num_of_DPT = 100000; arguments.num_of_RPR = 1000; arguments.use_metric = true; @@ -330,8 +369,11 @@ int main(int argc, char *argv[]) { char *tb_prefix = arguments.tb_prefix; int len_of_binary = arguments.len_of_binary; int ncols_per_record = arguments.num_of_CPR; + int order = arguments.order; + int rate = arguments.rate; + int method_of_delete = arguments.method_of_delete; int ntables = arguments.num_of_tables; - int nconnections = arguments.num_of_connections; + int threads = arguments.num_of_threads; int nrecords_per_table = arguments.num_of_DPT; int nrecords_per_request = arguments.num_of_RPR; bool use_metric = arguments.use_metric; @@ -371,12 +413,19 @@ int main(int argc, char *argv[]) { printf("# Binary Length(If applicable): %d\n", (strcasestr(dataString, "BINARY") != NULL) ? len_of_binary : -1); printf("# Number of Columns per record: %d\n", ncols_per_record); - printf("# Number of Connections: %d\n", nconnections); + printf("# Number of Threads: %d\n", threads); printf("# Number of Tables: %d\n", ntables); printf("# Number of Data per Table: %d\n", nrecords_per_table); printf("# Records/Request: %d\n", nrecords_per_request); printf("# Database name: %s\n", db_name); printf("# Table prefix: %s\n", tb_prefix); + if (order == 1) + { + printf("# Data order: %d\n", order); + printf("# Data out of order rate: %d\n", rate); + + } + printf("# Delete method: %d\n", method_of_delete); printf("# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); printf("###################################################################\n\n"); @@ -392,12 +441,18 @@ int main(int argc, char *argv[]) { fprintf(fp, "# Binary Length(If applicable): %d\n", (strcasestr(dataString, "BINARY") != NULL) ? len_of_binary : -1); fprintf(fp, "# Number of Columns per record: %d\n", ncols_per_record); - fprintf(fp, "# Number of Connections: %d\n", nconnections); + fprintf(fp, "# Number of Threads: %d\n", threads); fprintf(fp, "# Number of Tables: %d\n", ntables); fprintf(fp, "# Number of Data per Table: %d\n", nrecords_per_table); fprintf(fp, "# Records/Request: %d\n", nrecords_per_request); fprintf(fp, "# Database name: %s\n", db_name); fprintf(fp, "# Table prefix: %s\n", tb_prefix); + if (order == 1) + { + printf("# Data order: %d\n", order); + printf("# Data out of order rate: %d\n", rate); + + } fprintf(fp, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); fprintf(fp, "###################################################################\n\n"); @@ -414,7 +469,7 @@ int main(int argc, char *argv[]) { sprintf(command, "drop database %s;", db_name); taos_query(taos, command); - sleep(3); + sprintf(command, "create database %s;", db_name); taos_query(taos, command); @@ -479,22 +534,22 @@ int main(int argc, char *argv[]) { taos_close(taos); } /* Wait for table to create */ - sleep(5); + /* Insert data */ double ts = getCurrentTime(); printf("Inserting data......\n"); - pthread_t *pids = malloc(nconnections * sizeof(pthread_t)); - info *infos = malloc(nconnections * sizeof(info)); + pthread_t *pids = malloc(threads * sizeof(pthread_t)); + info *infos = malloc(threads * sizeof(info)); - int a = ntables / nconnections; + int a = ntables / threads; if (a < 1) { - nconnections = ntables; + threads = ntables; a = 1; } - int b = ntables % nconnections; + int b = ntables % threads; int last = 0; - for (int i = 0; i < nconnections; i++) { + for (int i = 0; i < threads; i++) { info *t_info = infos + i; t_info->threadID = i; strcpy(t_info->db_name, db_name); @@ -507,6 +562,8 @@ int main(int argc, char *argv[]) { t_info->len_of_binary = len_of_binary; t_info->nrecords_per_request = nrecords_per_request; t_info->start_table_id = last; + t_info->data_of_order = order; + t_info->data_of_rate = rate; t_info->end_table_id = i < b ? last + a : last + a - 1; last = t_info->end_table_id + 1; @@ -520,15 +577,15 @@ int main(int argc, char *argv[]) { pthread_create(pids + i, NULL, asyncWrite, t_info); } } - for (int i = 0; i < nconnections; i++) { + for (int i = 0; i < threads; i++) { pthread_join(pids[i], NULL); } double t = getCurrentTime() - ts; if (query_mode == SYNC) { - printf("SYNC Insert with %d connections:\n", nconnections); + printf("SYNC Insert with %d connections:\n", threads); } else { - printf("ASYNC Insert with %d connections:\n", nconnections); + printf("ASYNC Insert with %d connections:\n", threads); } fprintf(fp, "|%10.d | %10.2f | %10.2f | %10.4f |\n\n", @@ -540,7 +597,7 @@ int main(int argc, char *argv[]) { t, ntables * nrecords_per_table, nrecords_per_request, ntables * nrecords_per_table / t); - for (int i = 0; i < nconnections; i++) { + for (int i = 0; i < threads; i++) { info *t_info = infos + i; taos_close(t_info->taos); sem_destroy(&(t_info->mutex_sem)); @@ -551,6 +608,55 @@ int main(int argc, char *argv[]) { free(infos); fclose(fp); + if (method_of_delete != 0) + { + TAOS *dtaos = taos_connect(ip_addr, user, pass, db_name, port); + double dts = getCurrentTime(); + printf("Deleteing %d table(s)......\n", ntables); + + switch (method_of_delete) + { + case 1: + // delete by table + /* Create all the tables; */ + for (int i = 0; i < ntables; i++) { + sprintf(command, "drop table %s.%s%d;", db_name, tb_prefix, i); + queryDB(dtaos, command); + } + break; + case 2: + // delete by stable + if (!use_metric) { + break; + } + else + { + sprintf(command, "drop table %s.meters;", db_name); + queryDB(dtaos, command); + } + break; + case 3: + // delete by database + sprintf(command, "drop database %s;", db_name); + queryDB(dtaos, command); + break; + default: + break; + } + + printf("Table(s) droped!\n"); + taos_close(dtaos); + + double dt = getCurrentTime() - dts; + printf("Spent %.4f seconds to drop %d tables\n", dt, ntables); + + FILE *fp = fopen(arguments.output_file, "a"); + fprintf(fp, "Spent %.4f seconds to drop %d tables\n", dt, ntables); + fclose(fp); + + } + + if (!insert_only) { // query data pthread_t read_id; @@ -735,7 +841,15 @@ void *syncWrite(void *sarg) { pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, tID); int k; for (k = 0; k < winfo->nrecords_per_request;) { - generateData(data, data_type, ncols_per_record, tmp_time++, len_of_binary); + int rand_num = rand() % 100; + if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate) + { + long d = tmp_time - rand() % 1000000 + rand_num; + generateData(data, data_type, ncols_per_record, d, len_of_binary); + } else + { + generateData(data, data_type, ncols_per_record, tmp_time += 1000, len_of_binary); + } pstr += sprintf(pstr, " %s", data); inserted++; k++; @@ -774,6 +888,8 @@ void *asyncWrite(void *sarg) { tb_info->mutex_sem = &(winfo->mutex_sem); tb_info->notFinished = &(winfo->notFinished); tb_info->lock_sem = &(winfo->lock_sem); + tb_info->data_of_order = winfo->data_of_order; + tb_info->data_of_rate = winfo->data_of_rate; /* char buff[BUFFER_SIZE] = "\0"; */ /* sprintf(buff, "insert into %s values (0, 0)", tb_info->tb_name); */ @@ -815,7 +931,15 @@ void callBack(void *param, TAOS_RES *res, int code) { pstr += sprintf(pstr, "insert into %s values", tb_info->tb_name); for (int i = 0; i < tb_info->nrecords_per_request; i++) { - generateData(data, datatype, ncols_per_record, tmp_time++, len_of_binary); + int rand_num = rand() % 100; + if (tb_info->data_of_order ==1 && rand_num < tb_info->data_of_rate) + { + long d = tmp_time - rand() % 1000000 + rand_num; + generateData(data, datatype, ncols_per_record, d, len_of_binary); + } else + { + generateData(data, datatype, ncols_per_record, tmp_time += 1000, len_of_binary); + } pstr += sprintf(pstr, "%s", data); tb_info->counter++;