add async write
This commit is contained in:
parent
7cce5d8871
commit
d1c9131772
|
@ -475,6 +475,7 @@ typedef struct {
|
|||
tsem_t mutex_sem;
|
||||
int notFinished;
|
||||
tsem_t lock_sem;
|
||||
int counter;
|
||||
} info;
|
||||
|
||||
typedef struct {
|
||||
|
@ -766,6 +767,7 @@ int main(int argc, char *argv[]) {
|
|||
t_info->data_of_rate = rate;
|
||||
t_info->end_table_id = i < b ? last + a : last + a - 1;
|
||||
last = t_info->end_table_id + 1;
|
||||
t_info->counter = 0;
|
||||
|
||||
tsem_init(&(t_info->mutex_sem), 0, 1);
|
||||
t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1;
|
||||
|
@ -793,9 +795,9 @@ int main(int argc, char *argv[]) {
|
|||
(ntables * nrecords_per_table) / (t * nrecords_per_request),
|
||||
t * 1000);
|
||||
|
||||
printf("Spent %.4f seconds to insert %lld records with %d record(s) per request: %.2f records/second\n",
|
||||
t, (long long int)ntables * nrecords_per_table, nrecords_per_request,
|
||||
((long long int)ntables * nrecords_per_table) / t);
|
||||
printf("Spent %.4f seconds to insert %d records with %d record(s) per request: %.2f records/second\n",
|
||||
t, ntables * nrecords_per_table, nrecords_per_request,
|
||||
ntables * nrecords_per_table / t);
|
||||
|
||||
for (int i = 0; i < threads; i++) {
|
||||
info *t_info = infos + i;
|
||||
|
@ -955,7 +957,7 @@ void querySqlFile(TAOS* taos, char* sqlFile)
|
|||
|
||||
double t = getCurrentTime();
|
||||
|
||||
while ((read_len = tgetline(&line, &line_len, fp)) != -1) {
|
||||
while ((read_len = taosGetline(&line, &line_len, fp)) != -1) {
|
||||
if (read_len >= MAX_SQL_SIZE) continue;
|
||||
line[--read_len] = '\0';
|
||||
|
||||
|
@ -1283,68 +1285,37 @@ void *syncWrite(void *sarg) {
|
|||
|
||||
void *asyncWrite(void *sarg) {
|
||||
info *winfo = (info *)sarg;
|
||||
|
||||
sTable *tb_infos = (sTable *)malloc(sizeof(sTable) * (winfo->end_table_id - winfo->start_table_id + 1));
|
||||
|
||||
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
|
||||
sTable *tb_info = tb_infos + tID - winfo->start_table_id;
|
||||
tb_info->data_type = winfo->datatype;
|
||||
tb_info->ncols_per_record = winfo->ncols_per_record;
|
||||
tb_info->taos = winfo->taos;
|
||||
sprintf(tb_info->tb_name, "%s.%s%d", winfo->db_name, winfo->tb_prefix, tID);
|
||||
tb_info->timestamp = winfo->start_time;
|
||||
tb_info->counter = 0;
|
||||
tb_info->target = winfo->nrecords_per_table;
|
||||
tb_info->len_of_binary = winfo->len_of_binary;
|
||||
tb_info->nrecords_per_request = winfo->nrecords_per_request;
|
||||
tb_info->mutex_sem = &(winfo->mutex_sem);
|
||||
tb_info->notFinished = &(winfo->notFinished);
|
||||
tb_info->lock_sem = &(winfo->lock_sem);
|
||||
tb_info->data_of_order = winfo->data_of_order;
|
||||
tb_info->data_of_rate = winfo->data_of_rate;
|
||||
|
||||
/* char buff[BUFFER_SIZE] = "\0"; */
|
||||
/* sprintf(buff, "insert into %s values (0, 0)", tb_info->tb_name); */
|
||||
/* queryDB(tb_info->taos,buff); */
|
||||
|
||||
taos_query_a(winfo->taos, "show databases", callBack, tb_info);
|
||||
}
|
||||
taos_query_a(winfo->taos, "show databases", callBack, winfo);
|
||||
|
||||
tsem_wait(&(winfo->lock_sem));
|
||||
free(tb_infos);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void callBack(void *param, TAOS_RES *res, int code) {
|
||||
sTable *tb_info = (sTable *)param;
|
||||
char **datatype = tb_info->data_type;
|
||||
int ncols_per_record = tb_info->ncols_per_record;
|
||||
int len_of_binary = tb_info->len_of_binary;
|
||||
int64_t tmp_time = tb_info->timestamp;
|
||||
info* winfo = (info*)param;
|
||||
char **datatype = winfo->datatype;
|
||||
int ncols_per_record = winfo->ncols_per_record;
|
||||
int len_of_binary = winfo->len_of_binary;
|
||||
|
||||
if (code < 0) {
|
||||
fprintf(stderr, "failed to insert data %d:reason; %s\n", code, taos_errstr(res));
|
||||
exit(EXIT_FAILURE);
|
||||
int64_t tmp_time = winfo->start_time;
|
||||
char *buffer = calloc(1, BUFFER_SIZE);
|
||||
char *data = calloc(1, MAX_DATA_SIZE);
|
||||
char *pstr = buffer;
|
||||
pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id);
|
||||
if (winfo->counter >= winfo->nrecords_per_table) {
|
||||
winfo->start_table_id++;
|
||||
winfo->counter = 0;
|
||||
}
|
||||
|
||||
// If finished;
|
||||
if (tb_info->counter >= tb_info->target) {
|
||||
tsem_wait(tb_info->mutex_sem);
|
||||
(*(tb_info->notFinished))--;
|
||||
if (*(tb_info->notFinished) == 0) tsem_post(tb_info->lock_sem);
|
||||
tsem_post(tb_info->mutex_sem);
|
||||
if (winfo->start_table_id > winfo->end_table_id) {
|
||||
tsem_post(&winfo->lock_sem);
|
||||
taos_free_result(res);
|
||||
return;
|
||||
}
|
||||
|
||||
char buffer[BUFFER_SIZE] = "\0";
|
||||
char data[MAX_DATA_SIZE];
|
||||
char *pstr = buffer;
|
||||
pstr += sprintf(pstr, "insert into %s values", tb_info->tb_name);
|
||||
|
||||
for (int i = 0; i < tb_info->nrecords_per_request; i++) {
|
||||
for (int i = 0; i < winfo->nrecords_per_request; i++) {
|
||||
int rand_num = rand() % 100;
|
||||
if (tb_info->data_of_order ==1 && rand_num < tb_info->data_of_rate)
|
||||
if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate)
|
||||
{
|
||||
int64_t d = tmp_time - rand() % 1000000 + rand_num;
|
||||
generateData(data, datatype, ncols_per_record, d, len_of_binary);
|
||||
|
@ -1353,15 +1324,15 @@ void callBack(void *param, TAOS_RES *res, int code) {
|
|||
generateData(data, datatype, ncols_per_record, tmp_time += 1000, len_of_binary);
|
||||
}
|
||||
pstr += sprintf(pstr, "%s", data);
|
||||
tb_info->counter++;
|
||||
winfo->counter++;
|
||||
|
||||
if (tb_info->counter >= tb_info->target) {
|
||||
if (winfo->counter >= winfo->nrecords_per_table) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
tb_info->timestamp = tmp_time;
|
||||
|
||||
taos_query_a(tb_info->taos, buffer, callBack, tb_info);
|
||||
taos_query_a(winfo->taos, buffer, callBack, winfo);
|
||||
free(buffer);
|
||||
free(data);
|
||||
|
||||
taos_free_result(res);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue