[TD-2551]
This commit is contained in:
parent
5daeac0f00
commit
24dc1ce14b
|
@ -205,10 +205,10 @@ typedef struct DemoArguments {
|
||||||
arguments->tb_prefix = arg;
|
arguments->tb_prefix = arg;
|
||||||
break;
|
break;
|
||||||
case 'M':
|
case 'M':
|
||||||
arguments->use_metric = false;
|
arguments->use_metric = true;
|
||||||
break;
|
break;
|
||||||
case 'x':
|
case 'x':
|
||||||
arguments->insert_only = false;
|
arguments->insert_only = true;
|
||||||
break;
|
break;
|
||||||
case 'c':
|
case 'c':
|
||||||
if (wordexp(arg, &full_path, 0) != 0) {
|
if (wordexp(arg, &full_path, 0) != 0) {
|
||||||
|
@ -406,9 +406,9 @@ typedef struct DemoArguments {
|
||||||
} else if (strcmp(argv[i], "-m") == 0) {
|
} else if (strcmp(argv[i], "-m") == 0) {
|
||||||
arguments->tb_prefix = argv[++i];
|
arguments->tb_prefix = argv[++i];
|
||||||
} else if (strcmp(argv[i], "-M") == 0) {
|
} else if (strcmp(argv[i], "-M") == 0) {
|
||||||
arguments->use_metric = false;
|
arguments->use_metric = true;
|
||||||
} else if (strcmp(argv[i], "-x") == 0) {
|
} else if (strcmp(argv[i], "-x") == 0) {
|
||||||
arguments->insert_only = false;
|
arguments->insert_only = true;
|
||||||
} else if (strcmp(argv[i], "-c") == 0) {
|
} else if (strcmp(argv[i], "-c") == 0) {
|
||||||
strcpy(configDir, argv[++i]);
|
strcpy(configDir, argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-O") == 0) {
|
} else if (strcmp(argv[i], "-O") == 0) {
|
||||||
|
@ -476,6 +476,14 @@ typedef struct {
|
||||||
int notFinished;
|
int notFinished;
|
||||||
tsem_t lock_sem;
|
tsem_t lock_sem;
|
||||||
int counter;
|
int counter;
|
||||||
|
|
||||||
|
// insert delay statitics
|
||||||
|
int64_t cntDelay;
|
||||||
|
int64_t totalDelay;
|
||||||
|
int64_t avgDelay;
|
||||||
|
int64_t maxDelay;
|
||||||
|
int64_t minDelay;
|
||||||
|
|
||||||
} info;
|
} info;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -575,7 +583,7 @@ int main(int argc, char *argv[]) {
|
||||||
arguments.num_of_DPT = 100000;
|
arguments.num_of_DPT = 100000;
|
||||||
arguments.num_of_RPR = 1000;
|
arguments.num_of_RPR = 1000;
|
||||||
arguments.use_metric = true;
|
arguments.use_metric = true;
|
||||||
arguments.insert_only = true;
|
arguments.insert_only = false;
|
||||||
// end change
|
// end change
|
||||||
|
|
||||||
parse_args(argc, argv, &arguments);
|
parse_args(argc, argv, &arguments);
|
||||||
|
@ -740,6 +748,9 @@ int main(int argc, char *argv[]) {
|
||||||
pthread_t *pids = malloc(threads * sizeof(pthread_t));
|
pthread_t *pids = malloc(threads * sizeof(pthread_t));
|
||||||
info *infos = malloc(threads * sizeof(info));
|
info *infos = malloc(threads * sizeof(info));
|
||||||
|
|
||||||
|
memset(pids, 0, threads * sizeof(pthread_t));
|
||||||
|
memset(infos, 0, threads * sizeof(info));
|
||||||
|
|
||||||
int a = ntables / threads;
|
int a = ntables / threads;
|
||||||
if (a < 1) {
|
if (a < 1) {
|
||||||
threads = ntables;
|
threads = ntables;
|
||||||
|
@ -768,6 +779,7 @@ int main(int argc, char *argv[]) {
|
||||||
t_info->end_table_id = i < b ? last + a : last + a - 1;
|
t_info->end_table_id = i < b ? last + a : last + a - 1;
|
||||||
last = t_info->end_table_id + 1;
|
last = t_info->end_table_id + 1;
|
||||||
t_info->counter = 0;
|
t_info->counter = 0;
|
||||||
|
t_info->minDelay = INT16_MAX;
|
||||||
|
|
||||||
tsem_init(&(t_info->mutex_sem), 0, 1);
|
tsem_init(&(t_info->mutex_sem), 0, 1);
|
||||||
t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1;
|
t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1;
|
||||||
|
@ -799,12 +811,29 @@ int main(int argc, char *argv[]) {
|
||||||
t, (int64_t)ntables * nrecords_per_table, nrecords_per_request,
|
t, (int64_t)ntables * nrecords_per_table, nrecords_per_request,
|
||||||
(int64_t)ntables * nrecords_per_table / t);
|
(int64_t)ntables * nrecords_per_table / t);
|
||||||
|
|
||||||
|
int64_t totalDelay = 0;
|
||||||
|
int64_t maxDelay = 0;
|
||||||
|
int64_t minDelay = INT16_MAX;
|
||||||
|
int64_t cntDelay = 0;
|
||||||
|
double avgDelay = 0;
|
||||||
for (int i = 0; i < threads; i++) {
|
for (int i = 0; i < threads; i++) {
|
||||||
info *t_info = infos + i;
|
info *t_info = infos + i;
|
||||||
taos_close(t_info->taos);
|
taos_close(t_info->taos);
|
||||||
tsem_destroy(&(t_info->mutex_sem));
|
tsem_destroy(&(t_info->mutex_sem));
|
||||||
tsem_destroy(&(t_info->lock_sem));
|
tsem_destroy(&(t_info->lock_sem));
|
||||||
|
|
||||||
|
totalDelay += t_info->totalDelay;
|
||||||
|
cntDelay += t_info->cntDelay;
|
||||||
|
if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay;
|
||||||
|
if (t_info->minDelay < minDelay) minDelay = t_info->minDelay;
|
||||||
}
|
}
|
||||||
|
avgDelay = (double)totalDelay / cntDelay;
|
||||||
|
|
||||||
|
fprintf(fp, "insert delay, avg:%10.6fms, max: %10.6fms, min: %10.6fms\n\n",
|
||||||
|
avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0);
|
||||||
|
|
||||||
|
printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n",
|
||||||
|
avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0);
|
||||||
|
|
||||||
free(pids);
|
free(pids);
|
||||||
free(infos);
|
free(infos);
|
||||||
|
@ -859,7 +888,7 @@ int main(int argc, char *argv[]) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (!insert_only) {
|
if (false == insert_only) {
|
||||||
// query data
|
// query data
|
||||||
pthread_t read_id;
|
pthread_t read_id;
|
||||||
info *rInfo = malloc(sizeof(info));
|
info *rInfo = malloc(sizeof(info));
|
||||||
|
@ -998,7 +1027,7 @@ void * createTable(void *sarg)
|
||||||
/* Create all the tables; */
|
/* Create all the tables; */
|
||||||
printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
|
printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
|
||||||
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
|
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
|
||||||
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", winfo->db_name, winfo->tb_prefix, i, winfo->cols);
|
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s);", winfo->db_name, winfo->tb_prefix, i, winfo->cols);
|
||||||
queryDB(winfo->taos, command);
|
queryDB(winfo->taos, command);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1204,6 +1233,41 @@ void *readMetric(void *sarg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int queryDbExec(TAOS *taos, char *command, int type) {
|
||||||
|
int i;
|
||||||
|
TAOS_RES *res = NULL;
|
||||||
|
int32_t code = -1;
|
||||||
|
|
||||||
|
for (i = 0; i < 5; i++) {
|
||||||
|
if (NULL != res) {
|
||||||
|
taos_free_result(res);
|
||||||
|
res = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
res = taos_query(taos, command);
|
||||||
|
code = taos_errno(res);
|
||||||
|
if (0 == code) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res));
|
||||||
|
taos_free_result(res);
|
||||||
|
//taos_close(taos);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (1 == type) {
|
||||||
|
int affectedRows = taos_affected_rows(res);
|
||||||
|
taos_free_result(res);
|
||||||
|
return affectedRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(res);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void queryDB(TAOS *taos, char *command) {
|
void queryDB(TAOS *taos, char *command) {
|
||||||
int i;
|
int i;
|
||||||
TAOS_RES *pSql = NULL;
|
TAOS_RES *pSql = NULL;
|
||||||
|
@ -1273,7 +1337,21 @@ void *syncWrite(void *sarg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* puts(buffer); */
|
/* puts(buffer); */
|
||||||
queryDB(winfo->taos, buffer);
|
int64_t startTs;
|
||||||
|
int64_t endTs;
|
||||||
|
startTs = taosGetTimestampUs();
|
||||||
|
//queryDB(winfo->taos, buffer);
|
||||||
|
int affectedRows = queryDbExec(winfo->taos, buffer, 1);
|
||||||
|
|
||||||
|
if (0 <= affectedRows){
|
||||||
|
endTs = taosGetTimestampUs();
|
||||||
|
int64_t delay = endTs - startTs;
|
||||||
|
if (delay > winfo->maxDelay) winfo->maxDelay = delay;
|
||||||
|
if (delay < winfo->minDelay) winfo->minDelay = delay;
|
||||||
|
winfo->cntDelay++;
|
||||||
|
winfo->totalDelay += delay;
|
||||||
|
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
|
||||||
|
}
|
||||||
|
|
||||||
if (tID == winfo->end_table_id) {
|
if (tID == winfo->end_table_id) {
|
||||||
i = inserted;
|
i = inserted;
|
||||||
|
|
Loading…
Reference in New Issue