[TD-3192] <feature>: support stb limit and offset. refactoring.
stb and tb insert works.
This commit is contained in:
parent
b17b5edc60
commit
1489c458f0
|
@ -764,6 +764,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
|
||||||
printf("# Insertion interval: %d\n", arguments->insert_interval);
|
printf("# Insertion interval: %d\n", arguments->insert_interval);
|
||||||
printf("# Number of records per req: %d\n", arguments->num_of_RPR);
|
printf("# Number of records per req: %d\n", arguments->num_of_RPR);
|
||||||
printf("# Max SQL length: %d\n", arguments->max_sql_len);
|
printf("# Max SQL length: %d\n", arguments->max_sql_len);
|
||||||
|
printf("# Length of Binary: %d\n", arguments->len_of_binary);
|
||||||
printf("# Number of Threads: %d\n", arguments->num_of_threads);
|
printf("# Number of Threads: %d\n", arguments->num_of_threads);
|
||||||
printf("# Number of Tables: %d\n", arguments->num_of_tables);
|
printf("# Number of Tables: %d\n", arguments->num_of_tables);
|
||||||
printf("# Number of Data per Table: %d\n", arguments->num_of_DPT);
|
printf("# Number of Data per Table: %d\n", arguments->num_of_DPT);
|
||||||
|
@ -3997,7 +3998,7 @@ static void syncWriteForNumberOfTblInOneSql(
|
||||||
for (int i = 0; i < superTblInfo->insertRows;) {
|
for (int i = 0; i < superTblInfo->insertRows;) {
|
||||||
int32_t tbl_id = 0;
|
int32_t tbl_id = 0;
|
||||||
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) {
|
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) {
|
||||||
int64_t tmp_time = 0;
|
int64_t start_time = 0;
|
||||||
int inserted = i;
|
int inserted = i;
|
||||||
|
|
||||||
for (int k = 0; k < g_args.num_of_RPR;) {
|
for (int k = 0; k < g_args.num_of_RPR;) {
|
||||||
|
@ -4078,14 +4079,14 @@ static void syncWriteForNumberOfTblInOneSql(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tmp_time = time_counter;
|
start_time = time_counter;
|
||||||
for (int j = 0; j < superTblInfo->rowsPerTbl;) {
|
for (int j = 0; j < superTblInfo->rowsPerTbl;) {
|
||||||
int retLen = 0;
|
int retLen = 0;
|
||||||
if (0 == strncasecmp(superTblInfo->dataSource,
|
if (0 == strncasecmp(superTblInfo->dataSource,
|
||||||
"sample", strlen("sample"))) {
|
"sample", strlen("sample"))) {
|
||||||
retLen = getRowDataFromSample(pstr + len,
|
retLen = getRowDataFromSample(pstr + len,
|
||||||
superTblInfo->maxSqlLen - len,
|
superTblInfo->maxSqlLen - len,
|
||||||
tmp_time += superTblInfo->timeStampStep,
|
start_time += superTblInfo->timeStampStep,
|
||||||
superTblInfo,
|
superTblInfo,
|
||||||
&sampleUsePos);
|
&sampleUsePos);
|
||||||
if (retLen < 0) {
|
if (retLen < 0) {
|
||||||
|
@ -4096,7 +4097,7 @@ static void syncWriteForNumberOfTblInOneSql(
|
||||||
int rand_num = rand_tinyint() % 100;
|
int rand_num = rand_tinyint() % 100;
|
||||||
if (0 != superTblInfo->disorderRatio
|
if (0 != superTblInfo->disorderRatio
|
||||||
&& rand_num < superTblInfo->disorderRatio) {
|
&& rand_num < superTblInfo->disorderRatio) {
|
||||||
int64_t d = tmp_time - rand() % superTblInfo->disorderRange;
|
int64_t d = start_time - rand() % superTblInfo->disorderRange;
|
||||||
retLen = generateRowData(pstr + len,
|
retLen = generateRowData(pstr + len,
|
||||||
superTblInfo->maxSqlLen - len,
|
superTblInfo->maxSqlLen - len,
|
||||||
d,
|
d,
|
||||||
|
@ -4104,7 +4105,7 @@ static void syncWriteForNumberOfTblInOneSql(
|
||||||
} else {
|
} else {
|
||||||
retLen = generateRowData(pstr + len,
|
retLen = generateRowData(pstr + len,
|
||||||
superTblInfo->maxSqlLen - len,
|
superTblInfo->maxSqlLen - len,
|
||||||
tmp_time += superTblInfo->timeStampStep,
|
start_time += superTblInfo->timeStampStep,
|
||||||
superTblInfo);
|
superTblInfo);
|
||||||
}
|
}
|
||||||
if (retLen < 0) {
|
if (retLen < 0) {
|
||||||
|
@ -4200,7 +4201,7 @@ send_to_server:
|
||||||
samplePos = sampleUsePos;
|
samplePos = sampleUsePos;
|
||||||
}
|
}
|
||||||
i = inserted;
|
i = inserted;
|
||||||
time_counter = tmp_time;
|
time_counter = start_time;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4216,7 +4217,7 @@ free_and_statistics:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t generateData(char *res, char **data_type,
|
int32_t generateData(char *res, char **data_type,
|
||||||
int num_of_cols, int64_t timestamp, int len_of_binary) {
|
int num_of_cols, int64_t timestamp, int lenOfBinary) {
|
||||||
memset(res, 0, MAX_DATA_SIZE);
|
memset(res, 0, MAX_DATA_SIZE);
|
||||||
char *pstr = res;
|
char *pstr = res;
|
||||||
pstr += sprintf(pstr, "(%" PRId64, timestamp);
|
pstr += sprintf(pstr, "(%" PRId64, timestamp);
|
||||||
|
@ -4251,13 +4252,13 @@ int32_t generateData(char *res, char **data_type,
|
||||||
bool b = rand() & 1;
|
bool b = rand() & 1;
|
||||||
pstr += sprintf(pstr, ", %s", b ? "true" : "false");
|
pstr += sprintf(pstr, ", %s", b ? "true" : "false");
|
||||||
} else if (strcasecmp(data_type[i % c], "binary") == 0) {
|
} else if (strcasecmp(data_type[i % c], "binary") == 0) {
|
||||||
char *s = malloc(len_of_binary);
|
char *s = malloc(lenOfBinary);
|
||||||
rand_string(s, len_of_binary);
|
rand_string(s, lenOfBinary);
|
||||||
pstr += sprintf(pstr, ", \"%s\"", s);
|
pstr += sprintf(pstr, ", \"%s\"", s);
|
||||||
free(s);
|
free(s);
|
||||||
}else if (strcasecmp(data_type[i % c], "nchar") == 0) {
|
}else if (strcasecmp(data_type[i % c], "nchar") == 0) {
|
||||||
char *s = malloc(len_of_binary);
|
char *s = malloc(lenOfBinary);
|
||||||
rand_string(s, len_of_binary);
|
rand_string(s, lenOfBinary);
|
||||||
pstr += sprintf(pstr, ", \"%s\"", s);
|
pstr += sprintf(pstr, ", \"%s\"", s);
|
||||||
free(s);
|
free(s);
|
||||||
}
|
}
|
||||||
|
@ -4273,142 +4274,6 @@ int32_t generateData(char *res, char **data_type,
|
||||||
return (int32_t)(pstr - res);
|
return (int32_t)(pstr - res);
|
||||||
}
|
}
|
||||||
|
|
||||||
// sync insertion
|
|
||||||
/*
|
|
||||||
1 thread: 100 tables * 2000 rows/s
|
|
||||||
1 thread: 10 tables * 20000 rows/s
|
|
||||||
6 thread: 300 tables * 2000 rows/s
|
|
||||||
|
|
||||||
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
|
|
||||||
*/
|
|
||||||
static void* syncWrite(void *sarg) {
|
|
||||||
|
|
||||||
threadInfo *winfo = (threadInfo *)sarg;
|
|
||||||
|
|
||||||
char* buffer = calloc(g_args.max_sql_len, 1);
|
|
||||||
if (NULL == buffer) {
|
|
||||||
fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n",
|
|
||||||
g_args.max_sql_len,
|
|
||||||
strerror(errno));
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
char data[MAX_DATA_SIZE];
|
|
||||||
char **data_type = g_args.datatype;
|
|
||||||
int len_of_binary = g_args.len_of_binary;
|
|
||||||
|
|
||||||
int ncols_per_record = 1; // count first col ts
|
|
||||||
int i = 0;
|
|
||||||
while(g_args.datatype[i]) {
|
|
||||||
i ++;
|
|
||||||
ncols_per_record ++;
|
|
||||||
}
|
|
||||||
|
|
||||||
srand((uint32_t)time(NULL));
|
|
||||||
int64_t time_counter = winfo->start_time;
|
|
||||||
|
|
||||||
uint64_t st = 0;
|
|
||||||
uint64_t et = 0;
|
|
||||||
|
|
||||||
winfo->totalRowsInserted = 0;
|
|
||||||
winfo->totalAffectedRows = 0;
|
|
||||||
|
|
||||||
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
|
|
||||||
int64_t tmp_time = time_counter;
|
|
||||||
|
|
||||||
for (int i = 0; i < g_args.num_of_DPT;) {
|
|
||||||
|
|
||||||
int tblInserted = i;
|
|
||||||
|
|
||||||
memset(buffer, 0, g_args.max_sql_len);
|
|
||||||
char *pstr = buffer;
|
|
||||||
pstr += sprintf(pstr,
|
|
||||||
"insert into %s.%s%d values ",
|
|
||||||
winfo->db_name, g_args.tb_prefix, tID);
|
|
||||||
int k;
|
|
||||||
for (k = 0; k < g_args.num_of_RPR;) {
|
|
||||||
int rand_num = rand() % 100;
|
|
||||||
int len = -1;
|
|
||||||
|
|
||||||
if ((g_args.disorderRatio != 0)
|
|
||||||
&& (rand_num < g_args.disorderRange)) {
|
|
||||||
|
|
||||||
int64_t d = tmp_time - rand() % 1000000 + rand_num;
|
|
||||||
len = generateData(data, data_type,
|
|
||||||
ncols_per_record, d, len_of_binary);
|
|
||||||
} else {
|
|
||||||
len = generateData(data, data_type,
|
|
||||||
ncols_per_record, tmp_time += 1000, len_of_binary);
|
|
||||||
}
|
|
||||||
|
|
||||||
//assert(len + pstr - buffer < BUFFER_SIZE);
|
|
||||||
if (len + pstr - buffer >= BUFFER_SIZE) { // too long
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
pstr += sprintf(pstr, " %s", data);
|
|
||||||
tblInserted++;
|
|
||||||
k++;
|
|
||||||
i++;
|
|
||||||
|
|
||||||
if (tblInserted >= g_args.num_of_DPT)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
winfo->totalRowsInserted += k;
|
|
||||||
/* puts(buffer); */
|
|
||||||
int64_t startTs;
|
|
||||||
int64_t endTs;
|
|
||||||
startTs = taosGetTimestampUs();
|
|
||||||
//queryDB(winfo->taos, buffer);
|
|
||||||
if (i > 0 && g_args.insert_interval
|
|
||||||
&& (g_args.insert_interval > (et - st) )) {
|
|
||||||
int sleep_time = g_args.insert_interval - (et -st);
|
|
||||||
printf("sleep: %d ms specified by insert_interval\n", sleep_time);
|
|
||||||
taosMsleep(sleep_time); // ms
|
|
||||||
}
|
|
||||||
|
|
||||||
if (g_args.insert_interval) {
|
|
||||||
st = taosGetTimestampMs();
|
|
||||||
}
|
|
||||||
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
|
|
||||||
int affectedRows = queryDbExec(winfo->taos, buffer, 1);
|
|
||||||
|
|
||||||
if (0 < affectedRows){
|
|
||||||
endTs = taosGetTimestampUs();
|
|
||||||
int64_t delay = endTs - startTs;
|
|
||||||
if (delay > winfo->maxDelay)
|
|
||||||
winfo->maxDelay = delay;
|
|
||||||
if (delay < winfo->minDelay)
|
|
||||||
winfo->minDelay = delay;
|
|
||||||
winfo->cntDelay++;
|
|
||||||
winfo->totalDelay += delay;
|
|
||||||
winfo->totalAffectedRows += affectedRows;
|
|
||||||
winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
|
|
||||||
} else {
|
|
||||||
fprintf(stderr, "queryDbExec() buffer:\n%s\naffected rows is %d", buffer, affectedRows);
|
|
||||||
}
|
|
||||||
|
|
||||||
verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64" tblInserted=%d\n", __func__, __LINE__, winfo->totalAffectedRows, tblInserted);
|
|
||||||
if (g_args.insert_interval) {
|
|
||||||
et = taosGetTimestampMs();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tblInserted >= g_args.num_of_DPT) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} // num_of_DPT
|
|
||||||
} // tId
|
|
||||||
|
|
||||||
tmfree(buffer);
|
|
||||||
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
|
|
||||||
winfo->threadID,
|
|
||||||
winfo->totalRowsInserted,
|
|
||||||
winfo->totalAffectedRows);
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int prepareSampleData(SSuperTable *superTblInfo) {
|
static int prepareSampleData(SSuperTable *superTblInfo) {
|
||||||
char* sampleDataBuf = NULL;
|
char* sampleDataBuf = NULL;
|
||||||
|
|
||||||
|
@ -4437,12 +4302,22 @@ static int prepareSampleData(SSuperTable *superTblInfo) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* syncWriteWithStb(void *sarg) {
|
// sync insertion
|
||||||
|
/*
|
||||||
|
1 thread: 100 tables * 2000 rows/s
|
||||||
|
1 thread: 10 tables * 20000 rows/s
|
||||||
|
6 thread: 300 tables * 2000 rows/s
|
||||||
|
|
||||||
|
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
|
||||||
|
*/
|
||||||
|
static void* syncWrite(void *sarg) {
|
||||||
uint64_t lastPrintTime = taosGetTimestampMs();
|
uint64_t lastPrintTime = taosGetTimestampMs();
|
||||||
|
|
||||||
threadInfo *winfo = (threadInfo *)sarg;
|
threadInfo *winfo = (threadInfo *)sarg;
|
||||||
SSuperTable* superTblInfo = winfo->superTblInfo;
|
SSuperTable* superTblInfo = winfo->superTblInfo;
|
||||||
|
|
||||||
|
int ncols_per_record = 1; // count first col ts
|
||||||
|
|
||||||
int samplePos = 0;
|
int samplePos = 0;
|
||||||
|
|
||||||
if (superTblInfo) {
|
if (superTblInfo) {
|
||||||
|
@ -4454,9 +4329,16 @@ static void* syncWriteWithStb(void *sarg) {
|
||||||
tmfree(superTblInfo->sampleDataBuf);
|
tmfree(superTblInfo->sampleDataBuf);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
int datatypeSeq = 0;
|
||||||
|
while(g_args.datatype[datatypeSeq]) {
|
||||||
|
datatypeSeq ++;
|
||||||
|
ncols_per_record ++;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
char* buffer = calloc(superTblInfo->maxSqlLen, 1);
|
char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1);
|
||||||
if (NULL == buffer) {
|
if (NULL == buffer) {
|
||||||
fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n",
|
fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n",
|
||||||
superTblInfo->maxSqlLen,
|
superTblInfo->maxSqlLen,
|
||||||
|
@ -4473,91 +4355,92 @@ static void* syncWriteWithStb(void *sarg) {
|
||||||
|
|
||||||
int sampleUsePos;
|
int sampleUsePos;
|
||||||
|
|
||||||
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows);
|
if (superTblInfo && superTblInfo->childTblLimit ) {
|
||||||
|
// TODO
|
||||||
if (superTblInfo->childTblLimit ) {
|
|
||||||
// CBD
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id;
|
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id;
|
||||||
tID++) {
|
tID++) {
|
||||||
int64_t start_time = winfo->start_time;
|
int64_t start_time = winfo->start_time;
|
||||||
|
|
||||||
for (int i = 0; i < superTblInfo->insertRows;) {
|
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
|
||||||
|
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
|
||||||
|
|
||||||
int64_t tblInserted = i;
|
for (int64_t i = 0; i < insertRows;) {
|
||||||
|
int64_t prepared = i;
|
||||||
if (i > 0 && g_args.insert_interval
|
|
||||||
&& (g_args.insert_interval > (et - st) )) {
|
|
||||||
int sleep_time = g_args.insert_interval - (et -st);
|
|
||||||
printf("sleep: %d ms specified by insert_interval\n", sleep_time);
|
|
||||||
taosMsleep(sleep_time); // ms
|
|
||||||
}
|
|
||||||
|
|
||||||
if (g_args.insert_interval) {
|
|
||||||
st = taosGetTimestampMs();
|
|
||||||
}
|
|
||||||
|
|
||||||
sampleUsePos = samplePos;
|
sampleUsePos = samplePos;
|
||||||
verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR);
|
|
||||||
|
|
||||||
memset(buffer, 0, superTblInfo->maxSqlLen);
|
memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
|
||||||
int len = 0;
|
|
||||||
|
|
||||||
char *pstr = buffer;
|
char *pstr = buffer;
|
||||||
|
|
||||||
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
|
if (superTblInfo) {
|
||||||
|
|
||||||
|
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
|
||||||
char* tagsValBuf = NULL;
|
char* tagsValBuf = NULL;
|
||||||
if (0 == superTblInfo->tagSource) {
|
if (0 == superTblInfo->tagSource) {
|
||||||
tagsValBuf = generateTagVaulesForStb(superTblInfo);
|
tagsValBuf = generateTagVaulesForStb(superTblInfo);
|
||||||
} else {
|
} else {
|
||||||
tagsValBuf = getTagValueFromTagSample(
|
tagsValBuf = getTagValueFromTagSample(
|
||||||
superTblInfo,
|
superTblInfo,
|
||||||
tID % superTblInfo->tagSampleCount);
|
tID % superTblInfo->tagSampleCount);
|
||||||
}
|
}
|
||||||
if (NULL == tagsValBuf) {
|
if (NULL == tagsValBuf) {
|
||||||
goto free_and_statistics_2;
|
goto free_and_statistics_2;
|
||||||
}
|
}
|
||||||
|
|
||||||
len += snprintf(pstr + len,
|
pstr += snprintf(pstr,
|
||||||
superTblInfo->maxSqlLen - len,
|
superTblInfo->maxSqlLen,
|
||||||
"insert into %s.%s%d using %s.%s tags %s values",
|
"insert into %s.%s%d using %s.%s tags %s values",
|
||||||
winfo->db_name,
|
winfo->db_name,
|
||||||
superTblInfo->childTblPrefix,
|
superTblInfo->childTblPrefix,
|
||||||
tID,
|
tID,
|
||||||
winfo->db_name,
|
winfo->db_name,
|
||||||
superTblInfo->sTblName,
|
superTblInfo->sTblName,
|
||||||
tagsValBuf);
|
tagsValBuf);
|
||||||
tmfree(tagsValBuf);
|
tmfree(tagsValBuf);
|
||||||
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
|
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
|
||||||
len += snprintf(pstr + len,
|
pstr += snprintf(pstr,
|
||||||
superTblInfo->maxSqlLen - len,
|
superTblInfo->maxSqlLen,
|
||||||
"insert into %s.%s values",
|
"insert into %s.%s values",
|
||||||
winfo->db_name,
|
winfo->db_name,
|
||||||
superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN);
|
superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN);
|
||||||
|
} else {
|
||||||
|
pstr += snprintf(pstr,
|
||||||
|
(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len),
|
||||||
|
"insert into %s.%s%d values",
|
||||||
|
winfo->db_name,
|
||||||
|
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix,
|
||||||
|
tID);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
len += snprintf(pstr + len,
|
|
||||||
superTblInfo->maxSqlLen - len,
|
pstr += snprintf(pstr,
|
||||||
"insert into %s.%s%d values",
|
(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len),
|
||||||
winfo->db_name,
|
"insert into %s.%s%d values",
|
||||||
superTblInfo->childTblPrefix,
|
winfo->db_name,
|
||||||
|
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix,
|
||||||
tID);
|
tID);
|
||||||
}
|
}
|
||||||
|
|
||||||
int k;
|
int k;
|
||||||
|
int len = 0;
|
||||||
|
|
||||||
|
verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR);
|
||||||
for (k = 0; k < g_args.num_of_RPR;) {
|
for (k = 0; k < g_args.num_of_RPR;) {
|
||||||
int retLen = 0;
|
|
||||||
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
|
if (superTblInfo) {
|
||||||
|
int retLen = 0;
|
||||||
|
|
||||||
|
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
|
||||||
retLen = getRowDataFromSample(
|
retLen = getRowDataFromSample(
|
||||||
pstr + len,
|
pstr + len,
|
||||||
superTblInfo->maxSqlLen - len,
|
superTblInfo->maxSqlLen - len,
|
||||||
start_time + superTblInfo->timeStampStep * i,
|
start_time + superTblInfo->timeStampStep * i,
|
||||||
superTblInfo,
|
superTblInfo,
|
||||||
&sampleUsePos);
|
&sampleUsePos);
|
||||||
if (retLen < 0) {
|
} else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
|
||||||
goto free_and_statistics_2;
|
|
||||||
}
|
|
||||||
} else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
|
|
||||||
int rand_num = rand_tinyint() % 100;
|
int rand_num = rand_tinyint() % 100;
|
||||||
if (0 != superTblInfo->disorderRatio
|
if (0 != superTblInfo->disorderRatio
|
||||||
&& rand_num < superTblInfo->disorderRatio) {
|
&& rand_num < superTblInfo->disorderRatio) {
|
||||||
|
@ -4567,43 +4450,82 @@ static void* syncWriteWithStb(void *sarg) {
|
||||||
superTblInfo->maxSqlLen - len,
|
superTblInfo->maxSqlLen - len,
|
||||||
d,
|
d,
|
||||||
superTblInfo);
|
superTblInfo);
|
||||||
//printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d);
|
//printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, start_time, d);
|
||||||
} else {
|
} else {
|
||||||
retLen = generateRowData(
|
retLen = generateRowData(
|
||||||
pstr + len,
|
pstr + len,
|
||||||
superTblInfo->maxSqlLen - len,
|
superTblInfo->maxSqlLen - len,
|
||||||
start_time + superTblInfo->timeStampStep * i,
|
start_time + superTblInfo->timeStampStep * i,
|
||||||
superTblInfo);
|
superTblInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (retLen < 0) {
|
if (retLen < 0) {
|
||||||
goto free_and_statistics_2;
|
goto free_and_statistics_2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
len += retLen;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
int rand_num = rand() % 100;
|
||||||
|
char data[MAX_DATA_SIZE];
|
||||||
|
char **data_type = g_args.datatype;
|
||||||
|
int lenOfBinary = g_args.len_of_binary;
|
||||||
|
|
||||||
|
if ((g_args.disorderRatio != 0)
|
||||||
|
&& (rand_num < g_args.disorderRange)) {
|
||||||
|
|
||||||
|
int64_t d = start_time - rand() % 1000000 + rand_num;
|
||||||
|
len = generateData(data, data_type,
|
||||||
|
ncols_per_record, d, lenOfBinary);
|
||||||
|
} else {
|
||||||
|
len = generateData(data, data_type,
|
||||||
|
ncols_per_record, start_time += 1000, lenOfBinary);
|
||||||
|
}
|
||||||
|
|
||||||
|
//assert(len + pstr - buffer < BUFFER_SIZE);
|
||||||
|
if (len + pstr - buffer >= g_args.max_sql_len) { // too long
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pstr += sprintf(pstr, " %s", data);
|
||||||
}
|
}
|
||||||
|
|
||||||
len += retLen;
|
verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer);
|
||||||
verbosePrint("%s() LN%d retLen=%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, retLen, len, k, buffer);
|
|
||||||
|
|
||||||
tblInserted++;
|
prepared ++;
|
||||||
k++;
|
k++;
|
||||||
i++;
|
i++;
|
||||||
|
|
||||||
if (tblInserted >= superTblInfo->insertRows)
|
if (prepared >= insertRows)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
winfo->totalRowsInserted += k;
|
winfo->totalRowsInserted += k;
|
||||||
|
|
||||||
|
if (g_args.insert_interval) {
|
||||||
|
st = taosGetTimestampMs();
|
||||||
|
|
||||||
|
if (i > 0 && g_args.insert_interval
|
||||||
|
&& (g_args.insert_interval > (et - st) )) {
|
||||||
|
int sleep_time = g_args.insert_interval - (et -st);
|
||||||
|
printf("sleep: %d ms for insert interval\n", sleep_time);
|
||||||
|
taosMsleep(sleep_time); // ms
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int64_t startTs = taosGetTimestampUs();
|
int64_t startTs = taosGetTimestampUs();
|
||||||
int64_t endTs;
|
int64_t endTs;
|
||||||
int affectedRows;
|
int affectedRows;
|
||||||
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
|
|
||||||
|
if (superTblInfo) {
|
||||||
|
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
|
||||||
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
|
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
|
||||||
affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
|
affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
|
||||||
|
|
||||||
if (0 > affectedRows){
|
if (0 > affectedRows){
|
||||||
goto free_and_statistics_2;
|
goto free_and_statistics_2;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
|
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
|
||||||
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
|
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
|
||||||
|
|
||||||
|
@ -4613,6 +4535,10 @@ static void* syncWriteWithStb(void *sarg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
affectedRows = k;
|
affectedRows = k;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
|
||||||
|
affectedRows = queryDbExec(winfo->taos, buffer, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
endTs = taosGetTimestampUs();
|
endTs = taosGetTimestampUs();
|
||||||
|
@ -4637,23 +4563,21 @@ static void* syncWriteWithStb(void *sarg) {
|
||||||
et = taosGetTimestampMs();
|
et = taosGetTimestampMs();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tblInserted >= superTblInfo->insertRows)
|
if (prepared >= insertRows)
|
||||||
break;
|
break;
|
||||||
} // num_of_DPT
|
} // num_of_DPT
|
||||||
|
|
||||||
if (tID == winfo->end_table_id) {
|
if ((tID == winfo->end_table_id) && superTblInfo &&
|
||||||
if (0 == strncasecmp(
|
(0 == strncasecmp(
|
||||||
superTblInfo->dataSource, "sample", strlen("sample"))) {
|
superTblInfo->dataSource, "sample", strlen("sample")))) {
|
||||||
samplePos = sampleUsePos;
|
samplePos = sampleUsePos;
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
|
|
||||||
} // tID
|
} // tID
|
||||||
|
|
||||||
free_and_statistics_2:
|
free_and_statistics_2:
|
||||||
tmfree(buffer);
|
tmfree(buffer);
|
||||||
tmfree(superTblInfo->sampleDataBuf);
|
if (superTblInfo)
|
||||||
|
tmfree(superTblInfo->sampleDataBuf);
|
||||||
|
|
||||||
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
|
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
|
||||||
winfo->threadID,
|
winfo->threadID,
|
||||||
|
@ -4697,7 +4621,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
|
||||||
//generateData(data, datatype, ncols_per_record, d, len_of_binary);
|
//generateData(data, datatype, ncols_per_record, d, len_of_binary);
|
||||||
(void)generateRowData(data, MAX_DATA_SIZE, d, winfo->superTblInfo);
|
(void)generateRowData(data, MAX_DATA_SIZE, d, winfo->superTblInfo);
|
||||||
} else {
|
} else {
|
||||||
//generateData(data, datatype, ncols_per_record, tmp_time += 1000, len_of_binary);
|
//generateData(data, datatype, ncols_per_record, start_time += 1000, len_of_binary);
|
||||||
(void)generateRowData(data, MAX_DATA_SIZE, winfo->lastTs += 1000, winfo->superTblInfo);
|
(void)generateRowData(data, MAX_DATA_SIZE, winfo->lastTs += 1000, winfo->superTblInfo);
|
||||||
}
|
}
|
||||||
pstr += sprintf(pstr, "%s", data);
|
pstr += sprintf(pstr, "%s", data);
|
||||||
|
@ -4839,11 +4763,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
||||||
|
|
||||||
tsem_init(&(t_info->lock_sem), 0, 0);
|
tsem_init(&(t_info->lock_sem), 0, 0);
|
||||||
if (SYNC == g_Dbs.queryMode) {
|
if (SYNC == g_Dbs.queryMode) {
|
||||||
if (superTblInfo) {
|
pthread_create(pids + i, NULL, syncWrite, t_info);
|
||||||
pthread_create(pids + i, NULL, syncWriteWithStb, t_info);
|
|
||||||
} else {
|
|
||||||
pthread_create(pids + i, NULL, syncWrite, t_info);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
pthread_create(pids + i, NULL, asyncWrite, t_info);
|
pthread_create(pids + i, NULL, asyncWrite, t_info);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue