Feature/sangshuduo/td 4068 taosdemo stmt (#6250)
* merge with develop branch. change query/tests/CMakeLists.txt to allow unused function and variable. * refactor data generating. * refactor. * refactor. * refactor. * refactor. * refactor * add prepare stmt function. * refactor get rand timestamp. * fix windows compile error. * copy logic of generate data for stmt. * insert data basically works now. * fix windows compile issue. * [TD-4068]<feature>: taosdemo stmt interface. stb batch insert works. * [TD-4068]<feature>: taosdemo support stmt. normal table insert works. * [TD-4068]<feature>: taosdemo support stmt. interlace write works. Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
parent
703cc9a5e1
commit
dae5a07488
|
@ -1054,6 +1054,19 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int columnCount;
|
||||||
|
for (columnCount = 0; columnCount < MAX_NUM_DATATYPE; columnCount ++) {
|
||||||
|
if (g_args.datatype[columnCount] == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (0 == columnCount) {
|
||||||
|
perror("data type error!");
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
g_args.num_of_CPR = columnCount;
|
||||||
|
|
||||||
if (((arguments->debug_print) && (arguments->metaFile == NULL))
|
if (((arguments->debug_print) && (arguments->metaFile == NULL))
|
||||||
|| arguments->verbose_print) {
|
|| arguments->verbose_print) {
|
||||||
printf("###################################################################\n");
|
printf("###################################################################\n");
|
||||||
|
@ -1076,7 +1089,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
|
||||||
}
|
}
|
||||||
printf("# Insertion interval: %"PRIu64"\n",
|
printf("# Insertion interval: %"PRIu64"\n",
|
||||||
arguments->insert_interval);
|
arguments->insert_interval);
|
||||||
printf("# Number of records per req: %ud\n",
|
printf("# Number of records per req: %u\n",
|
||||||
arguments->num_of_RPR);
|
arguments->num_of_RPR);
|
||||||
printf("# Max SQL length: %"PRIu64"\n",
|
printf("# Max SQL length: %"PRIu64"\n",
|
||||||
arguments->max_sql_len);
|
arguments->max_sql_len);
|
||||||
|
@ -1368,7 +1381,7 @@ static int printfInsertMeta() {
|
||||||
g_Dbs.threadCountByCreateTbl);
|
g_Dbs.threadCountByCreateTbl);
|
||||||
printf("top insert interval: \033[33m%"PRIu64"\033[0m\n",
|
printf("top insert interval: \033[33m%"PRIu64"\033[0m\n",
|
||||||
g_args.insert_interval);
|
g_args.insert_interval);
|
||||||
printf("number of records per req: \033[33m%ud\033[0m\n",
|
printf("number of records per req: \033[33m%u\033[0m\n",
|
||||||
g_args.num_of_RPR);
|
g_args.num_of_RPR);
|
||||||
printf("max sql length: \033[33m%"PRIu64"\033[0m\n",
|
printf("max sql length: \033[33m%"PRIu64"\033[0m\n",
|
||||||
g_args.max_sql_len);
|
g_args.max_sql_len);
|
||||||
|
@ -1494,7 +1507,7 @@ static int printfInsertMeta() {
|
||||||
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
|
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
printf(" interlaceRows: \033[33m%ud\033[0m\n",
|
printf(" interlaceRows: \033[33m%u\033[0m\n",
|
||||||
g_Dbs.db[i].superTbls[j].interlaceRows);
|
g_Dbs.db[i].superTbls[j].interlaceRows);
|
||||||
|
|
||||||
if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) {
|
if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) {
|
||||||
|
@ -1572,7 +1585,7 @@ static void printfInsertMetaToFile(FILE* fp) {
|
||||||
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
|
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
|
||||||
fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
|
fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
|
||||||
fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl);
|
fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl);
|
||||||
fprintf(fp, "number of records per req: %ud\n", g_args.num_of_RPR);
|
fprintf(fp, "number of records per req: %u\n", g_args.num_of_RPR);
|
||||||
fprintf(fp, "max sql length: %"PRIu64"\n", g_args.max_sql_len);
|
fprintf(fp, "max sql length: %"PRIu64"\n", g_args.max_sql_len);
|
||||||
fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
|
fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
|
||||||
|
|
||||||
|
@ -1669,7 +1682,7 @@ static void printfInsertMetaToFile(FILE* fp) {
|
||||||
(g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt");
|
(g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt");
|
||||||
fprintf(fp, " insertRows: %"PRId64"\n",
|
fprintf(fp, " insertRows: %"PRId64"\n",
|
||||||
g_Dbs.db[i].superTbls[j].insertRows);
|
g_Dbs.db[i].superTbls[j].insertRows);
|
||||||
fprintf(fp, " interlace rows: %ud\n",
|
fprintf(fp, " interlace rows: %u\n",
|
||||||
g_Dbs.db[i].superTbls[j].interlaceRows);
|
g_Dbs.db[i].superTbls[j].interlaceRows);
|
||||||
if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) {
|
if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) {
|
||||||
fprintf(fp, " stable insert interval: %"PRIu64"\n",
|
fprintf(fp, " stable insert interval: %"PRIu64"\n",
|
||||||
|
@ -1682,7 +1695,7 @@ static void printfInsertMetaToFile(FILE* fp) {
|
||||||
fprintf(fp, " multiThreadWriteOneTbl: yes\n");
|
fprintf(fp, " multiThreadWriteOneTbl: yes\n");
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
fprintf(fp, " interlaceRows: %ud\n",
|
fprintf(fp, " interlaceRows: %u\n",
|
||||||
g_Dbs.db[i].superTbls[j].interlaceRows);
|
g_Dbs.db[i].superTbls[j].interlaceRows);
|
||||||
fprintf(fp, " disorderRange: %d\n",
|
fprintf(fp, " disorderRange: %d\n",
|
||||||
g_Dbs.db[i].superTbls[j].disorderRange);
|
g_Dbs.db[i].superTbls[j].disorderRange);
|
||||||
|
@ -3075,11 +3088,14 @@ static void createChildTables() {
|
||||||
// normal table
|
// normal table
|
||||||
len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP");
|
len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP");
|
||||||
for (int j = 0; j < g_args.num_of_CPR; j++) {
|
for (int j = 0; j < g_args.num_of_CPR; j++) {
|
||||||
if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
|
if (g_args.datatype[j]
|
||||||
|
&& ((strncasecmp(g_args.datatype[j],
|
||||||
|
"BINARY", strlen("BINARY")) == 0)
|
||||||
|| (strncasecmp(g_args.datatype[j],
|
|| (strncasecmp(g_args.datatype[j],
|
||||||
"NCHAR", strlen("NCHAR")) == 0)) {
|
"NCHAR", strlen("NCHAR")) == 0))) {
|
||||||
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
|
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
|
||||||
", COL%d %s(%d)", j, g_args.datatype[j], g_args.len_of_binary);
|
", COL%d %s(%d)", j, g_args.datatype[j],
|
||||||
|
g_args.len_of_binary);
|
||||||
} else {
|
} else {
|
||||||
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
|
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
|
||||||
", COL%d %s", j, g_args.datatype[j]);
|
", COL%d %s", j, g_args.datatype[j]);
|
||||||
|
@ -3561,9 +3577,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
||||||
|
|
||||||
// rows per table need be less than insert batch
|
// rows per table need be less than insert batch
|
||||||
if (g_args.interlace_rows > g_args.num_of_RPR) {
|
if (g_args.interlace_rows > g_args.num_of_RPR) {
|
||||||
printf("NOTICE: interlace rows value %ud > num_of_records_per_req %ud\n\n",
|
printf("NOTICE: interlace rows value %u > num_of_records_per_req %u\n\n",
|
||||||
g_args.interlace_rows, g_args.num_of_RPR);
|
g_args.interlace_rows, g_args.num_of_RPR);
|
||||||
printf(" interlace rows value will be set to num_of_records_per_req %ud\n\n",
|
printf(" interlace rows value will be set to num_of_records_per_req %u\n\n",
|
||||||
g_args.num_of_RPR);
|
g_args.num_of_RPR);
|
||||||
prompt();
|
prompt();
|
||||||
g_args.interlace_rows = g_args.num_of_RPR;
|
g_args.interlace_rows = g_args.num_of_RPR;
|
||||||
|
@ -4030,10 +4046,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
||||||
g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint;
|
g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint;
|
||||||
// rows per table need be less than insert batch
|
// rows per table need be less than insert batch
|
||||||
if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) {
|
if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) {
|
||||||
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %ud > num_of_records_per_req %ud\n\n",
|
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > num_of_records_per_req %u\n\n",
|
||||||
i, j, g_Dbs.db[i].superTbls[j].interlaceRows,
|
i, j, g_Dbs.db[i].superTbls[j].interlaceRows,
|
||||||
g_args.num_of_RPR);
|
g_args.num_of_RPR);
|
||||||
printf(" interlace rows value will be set to num_of_records_per_req %ud\n\n",
|
printf(" interlace rows value will be set to num_of_records_per_req %u\n\n",
|
||||||
g_args.num_of_RPR);
|
g_args.num_of_RPR);
|
||||||
prompt();
|
prompt();
|
||||||
g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR;
|
g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR;
|
||||||
|
@ -4773,44 +4789,34 @@ static int64_t generateData(char *recBuf, char **data_type,
|
||||||
memset(recBuf, 0, MAX_DATA_SIZE);
|
memset(recBuf, 0, MAX_DATA_SIZE);
|
||||||
char *pstr = recBuf;
|
char *pstr = recBuf;
|
||||||
pstr += sprintf(pstr, "(%" PRId64, timestamp);
|
pstr += sprintf(pstr, "(%" PRId64, timestamp);
|
||||||
int c = 0;
|
|
||||||
|
|
||||||
for (; c < MAX_NUM_DATATYPE; c++) {
|
int columnCount = g_args.num_of_CPR;
|
||||||
if (data_type[c] == NULL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (0 == c) {
|
for (int i = 0; i < columnCount; i++) {
|
||||||
perror("data type error!");
|
if (strcasecmp(data_type[i % columnCount], "TINYINT") == 0) {
|
||||||
exit(-1);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < c; i++) {
|
|
||||||
if (strcasecmp(data_type[i % c], "TINYINT") == 0) {
|
|
||||||
pstr += sprintf(pstr, ",%d", rand_tinyint() );
|
pstr += sprintf(pstr, ",%d", rand_tinyint() );
|
||||||
} else if (strcasecmp(data_type[i % c], "SMALLINT") == 0) {
|
} else if (strcasecmp(data_type[i % columnCount], "SMALLINT") == 0) {
|
||||||
pstr += sprintf(pstr, ",%d", rand_smallint());
|
pstr += sprintf(pstr, ",%d", rand_smallint());
|
||||||
} else if (strcasecmp(data_type[i % c], "INT") == 0) {
|
} else if (strcasecmp(data_type[i % columnCount], "INT") == 0) {
|
||||||
pstr += sprintf(pstr, ",%d", rand_int());
|
pstr += sprintf(pstr, ",%d", rand_int());
|
||||||
} else if (strcasecmp(data_type[i % c], "BIGINT") == 0) {
|
} else if (strcasecmp(data_type[i % columnCount], "BIGINT") == 0) {
|
||||||
pstr += sprintf(pstr, ",%" PRId64, rand_bigint());
|
pstr += sprintf(pstr, ",%" PRId64, rand_bigint());
|
||||||
} else if (strcasecmp(data_type[i % c], "TIMESTAMP") == 0) {
|
} else if (strcasecmp(data_type[i % columnCount], "TIMESTAMP") == 0) {
|
||||||
pstr += sprintf(pstr, ",%" PRId64, rand_bigint());
|
pstr += sprintf(pstr, ",%" PRId64, rand_bigint());
|
||||||
} else if (strcasecmp(data_type[i % c], "FLOAT") == 0) {
|
} else if (strcasecmp(data_type[i % columnCount], "FLOAT") == 0) {
|
||||||
pstr += sprintf(pstr, ",%10.4f", rand_float());
|
pstr += sprintf(pstr, ",%10.4f", rand_float());
|
||||||
} else if (strcasecmp(data_type[i % c], "DOUBLE") == 0) {
|
} else if (strcasecmp(data_type[i % columnCount], "DOUBLE") == 0) {
|
||||||
double t = rand_double();
|
double t = rand_double();
|
||||||
pstr += sprintf(pstr, ",%20.8f", t);
|
pstr += sprintf(pstr, ",%20.8f", t);
|
||||||
} else if (strcasecmp(data_type[i % c], "BOOL") == 0) {
|
} else if (strcasecmp(data_type[i % columnCount], "BOOL") == 0) {
|
||||||
bool b = rand_bool() & 1;
|
bool b = rand_bool() & 1;
|
||||||
pstr += sprintf(pstr, ",%s", b ? "true" : "false");
|
pstr += sprintf(pstr, ",%s", b ? "true" : "false");
|
||||||
} else if (strcasecmp(data_type[i % c], "BINARY") == 0) {
|
} else if (strcasecmp(data_type[i % columnCount], "BINARY") == 0) {
|
||||||
char *s = malloc(lenOfBinary);
|
char *s = malloc(lenOfBinary);
|
||||||
rand_string(s, lenOfBinary);
|
rand_string(s, lenOfBinary);
|
||||||
pstr += sprintf(pstr, ",\"%s\"", s);
|
pstr += sprintf(pstr, ",\"%s\"", s);
|
||||||
free(s);
|
free(s);
|
||||||
} else if (strcasecmp(data_type[i % c], "NCHAR") == 0) {
|
} else if (strcasecmp(data_type[i % columnCount], "NCHAR") == 0) {
|
||||||
char *s = malloc(lenOfBinary);
|
char *s = malloc(lenOfBinary);
|
||||||
rand_string(s, lenOfBinary);
|
rand_string(s, lenOfBinary);
|
||||||
pstr += sprintf(pstr, ",\"%s\"", s);
|
pstr += sprintf(pstr, ",\"%s\"", s);
|
||||||
|
@ -4857,46 +4863,60 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t execInsert(threadInfo *pThreadInfo, uint64_t k)
|
static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
|
||||||
{
|
{
|
||||||
int affectedRows;
|
int32_t affectedRows;
|
||||||
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
||||||
|
|
||||||
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
|
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
|
||||||
__func__, __LINE__, pThreadInfo->buffer);
|
__func__, __LINE__, pThreadInfo->buffer);
|
||||||
if (superTblInfo) {
|
|
||||||
if (superTblInfo->iface == TAOSC_IFACE) {
|
|
||||||
affectedRows = queryDbExec(
|
|
||||||
pThreadInfo->taos,
|
|
||||||
pThreadInfo->buffer, INSERT_TYPE, false);
|
|
||||||
} else if (superTblInfo->iface == REST_IFACE) {
|
|
||||||
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
|
|
||||||
pThreadInfo->buffer, NULL /* not set result file */)) {
|
|
||||||
affectedRows = -1;
|
|
||||||
printf("========restful return fail, threadID[%d]\n",
|
|
||||||
pThreadInfo->threadID);
|
|
||||||
} else {
|
|
||||||
affectedRows = k;
|
|
||||||
}
|
|
||||||
} else if (superTblInfo->iface == STMT_IFACE) {
|
|
||||||
debugPrint("%s() LN%d, stmt=%p", __func__, __LINE__, pThreadInfo->stmt);
|
|
||||||
if (0 != taos_stmt_execute(pThreadInfo->stmt)) {
|
|
||||||
errorPrint("%s() LN%d, failied to execute insert statement\n",
|
|
||||||
__func__, __LINE__);
|
|
||||||
exit(-1);
|
|
||||||
}
|
|
||||||
|
|
||||||
affectedRows = k;
|
uint16_t iface;
|
||||||
} else {
|
if (superTblInfo)
|
||||||
errorPrint("%s() LN%d: unknown insert mode: %d\n",
|
iface = superTblInfo->iface;
|
||||||
__func__, __LINE__, superTblInfo->iface);
|
else
|
||||||
affectedRows = 0;
|
iface = g_args.iface;
|
||||||
|
|
||||||
|
debugPrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
|
||||||
|
__func__, __LINE__,
|
||||||
|
(g_args.iface==TAOSC_IFACE)?
|
||||||
|
"taosc":(g_args.iface==REST_IFACE)?"rest":"stmt");
|
||||||
|
|
||||||
|
switch(iface) {
|
||||||
|
case TAOSC_IFACE:
|
||||||
|
affectedRows = queryDbExec(
|
||||||
|
pThreadInfo->taos,
|
||||||
|
pThreadInfo->buffer, INSERT_TYPE, false);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case REST_IFACE:
|
||||||
|
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
|
||||||
|
pThreadInfo->buffer, NULL /* not set result file */)) {
|
||||||
|
affectedRows = -1;
|
||||||
|
printf("========restful return fail, threadID[%d]\n",
|
||||||
|
pThreadInfo->threadID);
|
||||||
|
} else {
|
||||||
|
affectedRows = k;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case STMT_IFACE:
|
||||||
|
debugPrint("%s() LN%d, stmt=%p", __func__, __LINE__, pThreadInfo->stmt);
|
||||||
|
if (0 != taos_stmt_execute(pThreadInfo->stmt)) {
|
||||||
|
errorPrint("%s() LN%d, failied to execute insert statement\n",
|
||||||
|
__func__, __LINE__);
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
affectedRows = k;
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
errorPrint("%s() LN%d: unknown insert mode: %d\n",
|
||||||
|
__func__, __LINE__, superTblInfo->iface);
|
||||||
|
affectedRows = 0;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
affectedRows = queryDbExec(pThreadInfo->taos, pThreadInfo->buffer, INSERT_TYPE, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
return affectedRows;
|
return affectedRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getTableName(char *pTblName,
|
static void getTableName(char *pTblName,
|
||||||
|
@ -4924,7 +4944,7 @@ static void getTableName(char *pTblName,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t generateDataTailWithoutStb(
|
static int32_t generateDataTailWithoutStb(
|
||||||
uint32_t batch, char* buffer,
|
uint32_t batch, char* buffer,
|
||||||
int64_t remainderBufLen, int64_t insertRows,
|
int64_t remainderBufLen, int64_t insertRows,
|
||||||
uint64_t recordFrom, int64_t startTime,
|
uint64_t recordFrom, int64_t startTime,
|
||||||
|
@ -4935,7 +4955,7 @@ static int64_t generateDataTailWithoutStb(
|
||||||
|
|
||||||
verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch);
|
verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch);
|
||||||
|
|
||||||
int64_t k = 0;
|
int32_t k = 0;
|
||||||
for (k = 0; k < batch;) {
|
for (k = 0; k < batch;) {
|
||||||
char data[MAX_DATA_SIZE];
|
char data[MAX_DATA_SIZE];
|
||||||
memset(data, 0, MAX_DATA_SIZE);
|
memset(data, 0, MAX_DATA_SIZE);
|
||||||
|
@ -4947,7 +4967,7 @@ static int64_t generateDataTailWithoutStb(
|
||||||
|
|
||||||
retLen = generateData(data, data_type,
|
retLen = generateData(data, data_type,
|
||||||
startTime + getTSRandTail(
|
startTime + getTSRandTail(
|
||||||
(int64_t)DEFAULT_TIMESTAMP_STEP, k,
|
(int64_t) DEFAULT_TIMESTAMP_STEP, k,
|
||||||
g_args.disorderRatio,
|
g_args.disorderRatio,
|
||||||
g_args.disorderRange),
|
g_args.disorderRange),
|
||||||
lenOfBinary);
|
lenOfBinary);
|
||||||
|
@ -4960,7 +4980,7 @@ static int64_t generateDataTailWithoutStb(
|
||||||
len += retLen;
|
len += retLen;
|
||||||
remainderBufLen -= retLen;
|
remainderBufLen -= retLen;
|
||||||
|
|
||||||
verbosePrint("%s() LN%d len=%"PRIu64" k=%"PRIu64" \nbuffer=%s\n",
|
verbosePrint("%s() LN%d len=%"PRIu64" k=%d \nbuffer=%s\n",
|
||||||
__func__, __LINE__, len, k, buffer);
|
__func__, __LINE__, len, k, buffer);
|
||||||
|
|
||||||
recordFrom ++;
|
recordFrom ++;
|
||||||
|
@ -5006,7 +5026,7 @@ static int32_t generateStbDataTail(
|
||||||
} else {
|
} else {
|
||||||
tsRand = false;
|
tsRand = false;
|
||||||
}
|
}
|
||||||
verbosePrint("%s() LN%d batch=%ud\n", __func__, __LINE__, batch);
|
verbosePrint("%s() LN%d batch=%u\n", __func__, __LINE__, batch);
|
||||||
|
|
||||||
int32_t k = 0;
|
int32_t k = 0;
|
||||||
for (k = 0; k < batch;) {
|
for (k = 0; k < batch;) {
|
||||||
|
@ -5040,7 +5060,7 @@ static int32_t generateStbDataTail(
|
||||||
len += retLen;
|
len += retLen;
|
||||||
remainderBufLen -= retLen;
|
remainderBufLen -= retLen;
|
||||||
|
|
||||||
verbosePrint("%s() LN%d len=%"PRIu64" k=%ud \nbuffer=%s\n",
|
verbosePrint("%s() LN%d len=%"PRIu64" k=%u \nbuffer=%s\n",
|
||||||
__func__, __LINE__, len, k, buffer);
|
__func__, __LINE__, len, k, buffer);
|
||||||
|
|
||||||
recordFrom ++;
|
recordFrom ++;
|
||||||
|
@ -5168,7 +5188,7 @@ static int32_t generateStbInterlaceData(
|
||||||
|
|
||||||
int64_t dataLen = 0;
|
int64_t dataLen = 0;
|
||||||
|
|
||||||
verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%ud batchPerTbl = %ud\n",
|
verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%u batchPerTbl = %u\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__,
|
pThreadInfo->threadID, __func__, __LINE__,
|
||||||
i, batchPerTblTimes, batchPerTbl);
|
i, batchPerTblTimes, batchPerTbl);
|
||||||
|
|
||||||
|
@ -5186,7 +5206,7 @@ static int32_t generateStbInterlaceData(
|
||||||
pstr += dataLen;
|
pstr += dataLen;
|
||||||
*pRemainderBufLen -= dataLen;
|
*pRemainderBufLen -= dataLen;
|
||||||
} else {
|
} else {
|
||||||
debugPrint("%s() LN%d, generated data tail: %ud, not equal batch per table: %ud\n",
|
debugPrint("%s() LN%d, generated data tail: %u, not equal batch per table: %u\n",
|
||||||
__func__, __LINE__, k, batchPerTbl);
|
__func__, __LINE__, k, batchPerTbl);
|
||||||
pstr -= headLen;
|
pstr -= headLen;
|
||||||
pstr[0] = '\0';
|
pstr[0] = '\0';
|
||||||
|
@ -5197,10 +5217,11 @@ static int32_t generateStbInterlaceData(
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t generateInterlaceDataWithoutStb(
|
static int64_t generateInterlaceDataWithoutStb(
|
||||||
char *tableName, uint32_t batchPerTbl,
|
char *tableName, uint32_t batch,
|
||||||
uint64_t tableSeq,
|
uint64_t tableSeq,
|
||||||
char *dbName, char *buffer,
|
char *dbName, char *buffer,
|
||||||
int64_t insertRows,
|
int64_t insertRows,
|
||||||
|
int64_t startTime,
|
||||||
uint64_t *pRemainderBufLen)
|
uint64_t *pRemainderBufLen)
|
||||||
{
|
{
|
||||||
assert(buffer);
|
assert(buffer);
|
||||||
|
@ -5219,18 +5240,17 @@ static int64_t generateInterlaceDataWithoutStb(
|
||||||
|
|
||||||
int64_t dataLen = 0;
|
int64_t dataLen = 0;
|
||||||
|
|
||||||
int64_t startTime = 1500000000000;
|
int32_t k = generateDataTailWithoutStb(
|
||||||
int64_t k = generateDataTailWithoutStb(
|
batch, pstr, *pRemainderBufLen, insertRows, 0,
|
||||||
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
|
|
||||||
startTime,
|
startTime,
|
||||||
&dataLen);
|
&dataLen);
|
||||||
|
|
||||||
if (k == batchPerTbl) {
|
if (k == batch) {
|
||||||
pstr += dataLen;
|
pstr += dataLen;
|
||||||
*pRemainderBufLen -= dataLen;
|
*pRemainderBufLen -= dataLen;
|
||||||
} else {
|
} else {
|
||||||
debugPrint("%s() LN%d, generated data tail: %"PRIu64", not equal batch per table: %ud\n",
|
debugPrint("%s() LN%d, generated data tail: %d, not equal batch per table: %u\n",
|
||||||
__func__, __LINE__, k, batchPerTbl);
|
__func__, __LINE__, k, batch);
|
||||||
pstr -= headLen;
|
pstr -= headLen;
|
||||||
pstr[0] = '\0';
|
pstr[0] = '\0';
|
||||||
k = 0;
|
k = 0;
|
||||||
|
@ -5239,32 +5259,237 @@ static int64_t generateInterlaceDataWithoutStb(
|
||||||
return k;
|
return k;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
|
||||||
|
char *dataType, int32_t dataLen, char **ptr)
|
||||||
|
{
|
||||||
|
if (0 == strncasecmp(dataType,
|
||||||
|
"BINARY", strlen("BINARY"))) {
|
||||||
|
if (dataLen > TSDB_MAX_BINARY_LEN) {
|
||||||
|
errorPrint( "binary length overflow, max size:%u\n",
|
||||||
|
(uint32_t)TSDB_MAX_BINARY_LEN);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
char *bind_binary = (char *)*ptr;
|
||||||
|
rand_string(bind_binary, dataLen);
|
||||||
|
|
||||||
|
bind->buffer_type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
bind->buffer_length = dataLen;
|
||||||
|
bind->buffer = bind_binary;
|
||||||
|
bind->length = &bind->buffer_length;
|
||||||
|
bind->is_null = NULL;
|
||||||
|
|
||||||
|
*ptr += bind->buffer_length;
|
||||||
|
} else if (0 == strncasecmp(dataType,
|
||||||
|
"NCHAR", strlen("NCHAR"))) {
|
||||||
|
if (dataLen > TSDB_MAX_BINARY_LEN) {
|
||||||
|
errorPrint( "nchar length overflow, max size:%u\n",
|
||||||
|
(uint32_t)TSDB_MAX_BINARY_LEN);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
char *bind_nchar = (char *)*ptr;
|
||||||
|
rand_string(bind_nchar, dataLen);
|
||||||
|
|
||||||
|
bind->buffer_type = TSDB_DATA_TYPE_NCHAR;
|
||||||
|
bind->buffer_length = strlen(bind_nchar);
|
||||||
|
bind->buffer = bind_nchar;
|
||||||
|
bind->length = &bind->buffer_length;
|
||||||
|
bind->is_null = NULL;
|
||||||
|
|
||||||
|
*ptr += bind->buffer_length;
|
||||||
|
} else if (0 == strncasecmp(dataType,
|
||||||
|
"INT", strlen("INT"))) {
|
||||||
|
int32_t *bind_int = (int32_t *)*ptr;
|
||||||
|
|
||||||
|
*bind_int = rand_int();
|
||||||
|
bind->buffer_type = TSDB_DATA_TYPE_INT;
|
||||||
|
bind->buffer_length = sizeof(int32_t);
|
||||||
|
bind->buffer = bind_int;
|
||||||
|
bind->length = &bind->buffer_length;
|
||||||
|
bind->is_null = NULL;
|
||||||
|
|
||||||
|
*ptr += bind->buffer_length;
|
||||||
|
} else if (0 == strncasecmp(dataType,
|
||||||
|
"BIGINT", strlen("BIGINT"))) {
|
||||||
|
int64_t *bind_bigint = (int64_t *)*ptr;
|
||||||
|
|
||||||
|
*bind_bigint = rand_bigint();
|
||||||
|
bind->buffer_type = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
bind->buffer_length = sizeof(int64_t);
|
||||||
|
bind->buffer = bind_bigint;
|
||||||
|
bind->length = &bind->buffer_length;
|
||||||
|
bind->is_null = NULL;
|
||||||
|
|
||||||
|
*ptr += bind->buffer_length;
|
||||||
|
} else if (0 == strncasecmp(dataType,
|
||||||
|
"FLOAT", strlen("FLOAT"))) {
|
||||||
|
float *bind_float = (float *) *ptr;
|
||||||
|
|
||||||
|
*bind_float = rand_float();
|
||||||
|
bind->buffer_type = TSDB_DATA_TYPE_FLOAT;
|
||||||
|
bind->buffer_length = sizeof(float);
|
||||||
|
bind->buffer = bind_float;
|
||||||
|
bind->length = &bind->buffer_length;
|
||||||
|
bind->is_null = NULL;
|
||||||
|
|
||||||
|
*ptr += bind->buffer_length;
|
||||||
|
} else if (0 == strncasecmp(dataType,
|
||||||
|
"DOUBLE", strlen("DOUBLE"))) {
|
||||||
|
double *bind_double = (double *)*ptr;
|
||||||
|
|
||||||
|
*bind_double = rand_double();
|
||||||
|
bind->buffer_type = TSDB_DATA_TYPE_DOUBLE;
|
||||||
|
bind->buffer_length = sizeof(double);
|
||||||
|
bind->buffer = bind_double;
|
||||||
|
bind->length = &bind->buffer_length;
|
||||||
|
bind->is_null = NULL;
|
||||||
|
|
||||||
|
*ptr += bind->buffer_length;
|
||||||
|
} else if (0 == strncasecmp(dataType,
|
||||||
|
"SMALLINT", strlen("SMALLINT"))) {
|
||||||
|
int16_t *bind_smallint = (int16_t *)*ptr;
|
||||||
|
|
||||||
|
*bind_smallint = rand_smallint();
|
||||||
|
bind->buffer_type = TSDB_DATA_TYPE_SMALLINT;
|
||||||
|
bind->buffer_length = sizeof(int16_t);
|
||||||
|
bind->buffer = bind_smallint;
|
||||||
|
bind->length = &bind->buffer_length;
|
||||||
|
bind->is_null = NULL;
|
||||||
|
|
||||||
|
*ptr += bind->buffer_length;
|
||||||
|
} else if (0 == strncasecmp(dataType,
|
||||||
|
"TINYINT", strlen("TINYINT"))) {
|
||||||
|
int8_t *bind_tinyint = (int8_t *)*ptr;
|
||||||
|
|
||||||
|
*bind_tinyint = rand_tinyint();
|
||||||
|
bind->buffer_type = TSDB_DATA_TYPE_TINYINT;
|
||||||
|
bind->buffer_length = sizeof(int8_t);
|
||||||
|
bind->buffer = bind_tinyint;
|
||||||
|
bind->length = &bind->buffer_length;
|
||||||
|
bind->is_null = NULL;
|
||||||
|
*ptr += bind->buffer_length;
|
||||||
|
} else if (0 == strncasecmp(dataType,
|
||||||
|
"BOOL", strlen("BOOL"))) {
|
||||||
|
int8_t *bind_bool = (int8_t *)*ptr;
|
||||||
|
|
||||||
|
*bind_bool = rand_bool();
|
||||||
|
bind->buffer_type = TSDB_DATA_TYPE_BOOL;
|
||||||
|
bind->buffer_length = sizeof(int8_t);
|
||||||
|
bind->buffer = bind_bool;
|
||||||
|
bind->length = &bind->buffer_length;
|
||||||
|
bind->is_null = NULL;
|
||||||
|
|
||||||
|
*ptr += bind->buffer_length;
|
||||||
|
} else if (0 == strncasecmp(dataType,
|
||||||
|
"TIMESTAMP", strlen("TIMESTAMP"))) {
|
||||||
|
int64_t *bind_ts2 = (int64_t *) *ptr;
|
||||||
|
|
||||||
|
*bind_ts2 = rand_bigint();
|
||||||
|
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
bind->buffer_length = sizeof(int64_t);
|
||||||
|
bind->buffer = bind_ts2;
|
||||||
|
bind->length = &bind->buffer_length;
|
||||||
|
bind->is_null = NULL;
|
||||||
|
|
||||||
|
*ptr += bind->buffer_length;
|
||||||
|
} else {
|
||||||
|
errorPrint( "No support data type: %s\n",
|
||||||
|
dataType);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t prepareStmtWithoutStb(
|
||||||
|
TAOS_STMT *stmt,
|
||||||
|
char *tableName,
|
||||||
|
uint32_t batch,
|
||||||
|
int64_t insertRows,
|
||||||
|
int64_t recordFrom,
|
||||||
|
int64_t startTime)
|
||||||
|
{
|
||||||
|
int ret = taos_stmt_set_tbname(stmt, tableName);
|
||||||
|
if (ret != 0) {
|
||||||
|
errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n",
|
||||||
|
tableName, ret, taos_errstr(NULL));
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
char **data_type = g_args.datatype;
|
||||||
|
|
||||||
|
char *bindArray = malloc(sizeof(TAOS_BIND) * (g_args.num_of_CPR + 1));
|
||||||
|
if (bindArray == NULL) {
|
||||||
|
errorPrint("Failed to allocate %d bind params\n",
|
||||||
|
(g_args.num_of_CPR + 1));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t k = 0;
|
||||||
|
for (k = 0; k < batch;) {
|
||||||
|
/* columnCount + 1 (ts) */
|
||||||
|
char data[MAX_DATA_SIZE];
|
||||||
|
memset(data, 0, MAX_DATA_SIZE);
|
||||||
|
|
||||||
|
char *ptr = data;
|
||||||
|
TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0);
|
||||||
|
|
||||||
|
int64_t *bind_ts;
|
||||||
|
|
||||||
|
bind_ts = (int64_t *)ptr;
|
||||||
|
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
*bind_ts = startTime + getTSRandTail(
|
||||||
|
(int64_t)DEFAULT_TIMESTAMP_STEP, k,
|
||||||
|
g_args.disorderRatio,
|
||||||
|
g_args.disorderRange);
|
||||||
|
bind->buffer_length = sizeof(int64_t);
|
||||||
|
bind->buffer = bind_ts;
|
||||||
|
bind->length = &bind->buffer_length;
|
||||||
|
bind->is_null = NULL;
|
||||||
|
|
||||||
|
ptr += bind->buffer_length;
|
||||||
|
|
||||||
|
for (int i = 0; i < g_args.num_of_CPR; i ++) {
|
||||||
|
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1)));
|
||||||
|
if ( -1 == prepareStmtBindArrayByType(
|
||||||
|
bind,
|
||||||
|
data_type[i],
|
||||||
|
g_args.len_of_binary,
|
||||||
|
&ptr)) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
|
||||||
|
// if msg > 3MB, break
|
||||||
|
taos_stmt_add_batch(stmt);
|
||||||
|
|
||||||
|
k++;
|
||||||
|
recordFrom ++;
|
||||||
|
if (recordFrom >= insertRows) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return k;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t prepareStbStmt(SSuperTable *stbInfo,
|
static int32_t prepareStbStmt(SSuperTable *stbInfo,
|
||||||
TAOS_STMT *stmt,
|
TAOS_STMT *stmt,
|
||||||
char *tableName, uint32_t batch, uint64_t insertRows,
|
char *tableName, uint32_t batch,
|
||||||
|
uint64_t insertRows,
|
||||||
uint64_t recordFrom,
|
uint64_t recordFrom,
|
||||||
int64_t startTime, char *buffer)
|
int64_t startTime, char *buffer)
|
||||||
{
|
{
|
||||||
uint32_t k;
|
int ret = taos_stmt_set_tbname(stmt, tableName);
|
||||||
int ret;
|
if (ret != 0) {
|
||||||
char *pstr = buffer;
|
errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n",
|
||||||
pstr += sprintf(pstr, "INSERT INTO %s values(?", tableName);
|
tableName, ret, taos_errstr(NULL));
|
||||||
|
|
||||||
for (int i = 0; i < stbInfo->columnCount; i++) {
|
|
||||||
pstr += sprintf(pstr, ",?");
|
|
||||||
}
|
|
||||||
pstr += sprintf(pstr, ")");
|
|
||||||
|
|
||||||
ret = taos_stmt_prepare(stmt, buffer, 0);
|
|
||||||
if (ret != 0){
|
|
||||||
errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
|
|
||||||
ret, taos_errstr(NULL));
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
|
char *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
|
||||||
if (bindArray == NULL) {
|
if (bindArray == NULL) {
|
||||||
errorPrint("Failed to allocate %d bind params\n", batch);
|
errorPrint("Failed to allocate %d bind params\n",
|
||||||
|
(stbInfo->columnCount + 1));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5274,6 +5499,8 @@ static int32_t prepareStbStmt(SSuperTable *stbInfo,
|
||||||
} else {
|
} else {
|
||||||
tsRand = false;
|
tsRand = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint32_t k;
|
||||||
for (k = 0; k < batch;) {
|
for (k = 0; k < batch;) {
|
||||||
/* columnCount + 1 (ts) */
|
/* columnCount + 1 (ts) */
|
||||||
char data[MAX_DATA_SIZE];
|
char data[MAX_DATA_SIZE];
|
||||||
|
@ -5303,135 +5530,12 @@ static int32_t prepareStbStmt(SSuperTable *stbInfo,
|
||||||
|
|
||||||
for (int i = 0; i < stbInfo->columnCount; i ++) {
|
for (int i = 0; i < stbInfo->columnCount; i ++) {
|
||||||
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1)));
|
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1)));
|
||||||
if (0 == strncasecmp(stbInfo->columns[i].dataType,
|
if ( -1 == prepareStmtBindArrayByType(
|
||||||
"BINARY", strlen("BINARY"))) {
|
bind,
|
||||||
if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
|
stbInfo->columns[i].dataType,
|
||||||
errorPrint( "binary length overflow, max size:%u\n",
|
stbInfo->columns[i].dataLen,
|
||||||
(uint32_t)TSDB_MAX_BINARY_LEN);
|
&ptr)) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
|
||||||
char *bind_binary = (char *)ptr;
|
|
||||||
rand_string(bind_binary, stbInfo->columns[i].dataLen);
|
|
||||||
|
|
||||||
bind->buffer_type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
bind->buffer_length = stbInfo->columns[i].dataLen;
|
|
||||||
bind->buffer = bind_binary;
|
|
||||||
bind->length = &bind->buffer_length;
|
|
||||||
bind->is_null = NULL;
|
|
||||||
|
|
||||||
ptr += bind->buffer_length;
|
|
||||||
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
|
|
||||||
"NCHAR", strlen("NCHAR"))) {
|
|
||||||
if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
|
|
||||||
errorPrint( "nchar length overflow, max size:%u\n",
|
|
||||||
(uint32_t)TSDB_MAX_BINARY_LEN);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
char *bind_nchar = (char *)ptr;
|
|
||||||
rand_string(bind_nchar, stbInfo->columns[i].dataLen);
|
|
||||||
|
|
||||||
bind->buffer_type = TSDB_DATA_TYPE_NCHAR;
|
|
||||||
bind->buffer_length = strlen(bind_nchar);
|
|
||||||
bind->buffer = bind_nchar;
|
|
||||||
bind->length = &bind->buffer_length;
|
|
||||||
bind->is_null = NULL;
|
|
||||||
|
|
||||||
ptr += bind->buffer_length;
|
|
||||||
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
|
|
||||||
"INT", strlen("INT"))) {
|
|
||||||
int32_t *bind_int = (int32_t *)ptr;
|
|
||||||
|
|
||||||
*bind_int = rand_int();
|
|
||||||
bind->buffer_type = TSDB_DATA_TYPE_INT;
|
|
||||||
bind->buffer_length = sizeof(int32_t);
|
|
||||||
bind->buffer = bind_int;
|
|
||||||
bind->length = &bind->buffer_length;
|
|
||||||
bind->is_null = NULL;
|
|
||||||
|
|
||||||
ptr += bind->buffer_length;
|
|
||||||
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
|
|
||||||
"BIGINT", strlen("BIGINT"))) {
|
|
||||||
int64_t *bind_bigint = (int64_t *)ptr;
|
|
||||||
|
|
||||||
*bind_bigint = rand_bigint();
|
|
||||||
bind->buffer_type = TSDB_DATA_TYPE_BIGINT;
|
|
||||||
bind->buffer_length = sizeof(int64_t);
|
|
||||||
bind->buffer = bind_bigint;
|
|
||||||
bind->length = &bind->buffer_length;
|
|
||||||
bind->is_null = NULL;
|
|
||||||
ptr += bind->buffer_length;
|
|
||||||
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
|
|
||||||
"FLOAT", strlen("FLOAT"))) {
|
|
||||||
float *bind_float = (float *)ptr;
|
|
||||||
|
|
||||||
*bind_float = rand_float();
|
|
||||||
bind->buffer_type = TSDB_DATA_TYPE_FLOAT;
|
|
||||||
bind->buffer_length = sizeof(float);
|
|
||||||
bind->buffer = bind_float;
|
|
||||||
bind->length = &bind->buffer_length;
|
|
||||||
bind->is_null = NULL;
|
|
||||||
ptr += bind->buffer_length;
|
|
||||||
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
|
|
||||||
"DOUBLE", strlen("DOUBLE"))) {
|
|
||||||
double *bind_double = (double *)ptr;
|
|
||||||
|
|
||||||
*bind_double = rand_double();
|
|
||||||
bind->buffer_type = TSDB_DATA_TYPE_DOUBLE;
|
|
||||||
bind->buffer_length = sizeof(double);
|
|
||||||
bind->buffer = bind_double;
|
|
||||||
bind->length = &bind->buffer_length;
|
|
||||||
bind->is_null = NULL;
|
|
||||||
ptr += bind->buffer_length;
|
|
||||||
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
|
|
||||||
"SMALLINT", strlen("SMALLINT"))) {
|
|
||||||
int16_t *bind_smallint = (int16_t *)ptr;
|
|
||||||
|
|
||||||
*bind_smallint = rand_smallint();
|
|
||||||
bind->buffer_type = TSDB_DATA_TYPE_SMALLINT;
|
|
||||||
bind->buffer_length = sizeof(int16_t);
|
|
||||||
bind->buffer = bind_smallint;
|
|
||||||
bind->length = &bind->buffer_length;
|
|
||||||
bind->is_null = NULL;
|
|
||||||
ptr += bind->buffer_length;
|
|
||||||
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
|
|
||||||
"TINYINT", strlen("TINYINT"))) {
|
|
||||||
int8_t *bind_tinyint = (int8_t *)ptr;
|
|
||||||
|
|
||||||
*bind_tinyint = rand_tinyint();
|
|
||||||
bind->buffer_type = TSDB_DATA_TYPE_TINYINT;
|
|
||||||
bind->buffer_length = sizeof(int8_t);
|
|
||||||
bind->buffer = bind_tinyint;
|
|
||||||
bind->length = &bind->buffer_length;
|
|
||||||
bind->is_null = NULL;
|
|
||||||
ptr += bind->buffer_length;
|
|
||||||
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
|
|
||||||
"BOOL", strlen("BOOL"))) {
|
|
||||||
int8_t *bind_bool = (int8_t *)ptr;
|
|
||||||
|
|
||||||
*bind_bool = rand_bool();
|
|
||||||
bind->buffer_type = TSDB_DATA_TYPE_BOOL;
|
|
||||||
bind->buffer_length = sizeof(int8_t);
|
|
||||||
bind->buffer = bind_bool;
|
|
||||||
bind->length = &bind->buffer_length;
|
|
||||||
bind->is_null = NULL;
|
|
||||||
|
|
||||||
ptr += bind->buffer_length;
|
|
||||||
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
|
|
||||||
"TIMESTAMP", strlen("TIMESTAMP"))) {
|
|
||||||
int64_t *bind_ts2 = (int64_t *)ptr;
|
|
||||||
|
|
||||||
*bind_ts2 = rand_bigint();
|
|
||||||
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
||||||
bind->buffer_length = sizeof(int64_t);
|
|
||||||
bind->buffer = bind_ts2;
|
|
||||||
bind->length = &bind->buffer_length;
|
|
||||||
bind->is_null = NULL;
|
|
||||||
|
|
||||||
ptr += bind->buffer_length;
|
|
||||||
} else {
|
|
||||||
errorPrint( "No support data type: %s\n",
|
|
||||||
stbInfo->columns[i].dataType);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
|
taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
|
||||||
|
@ -5482,12 +5586,7 @@ static int32_t generateStbProgressiveData(
|
||||||
pSamplePos, &dataLen);
|
pSamplePos, &dataLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t prepareStmtWithoutStb(char *tableName)
|
static int32_t generateProgressiveDataWithoutStb(
|
||||||
{
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int64_t generateProgressiveDataWithoutStb(
|
|
||||||
char *tableName,
|
char *tableName,
|
||||||
/* int64_t tableSeq, */
|
/* int64_t tableSeq, */
|
||||||
threadInfo *pThreadInfo, char *buffer,
|
threadInfo *pThreadInfo, char *buffer,
|
||||||
|
@ -5520,7 +5619,7 @@ static int64_t generateProgressiveDataWithoutStb(
|
||||||
|
|
||||||
static void printStatPerThread(threadInfo *pThreadInfo)
|
static void printStatPerThread(threadInfo *pThreadInfo)
|
||||||
{
|
{
|
||||||
fprintf(stderr, "====thread[%d] completed total inserted rows: %"PRIu64 ", total affected rows: %"PRIu64". %.2f records/second====\n",
|
fprintf(stderr, "====thread[%d] completed total inserted rows: %"PRIu64 ", total affected rows: %"PRIu64". %.2f records/second====\n",
|
||||||
pThreadInfo->threadID,
|
pThreadInfo->threadID,
|
||||||
pThreadInfo->totalInsertRows,
|
pThreadInfo->totalInsertRows,
|
||||||
pThreadInfo->totalAffectedRows,
|
pThreadInfo->totalAffectedRows,
|
||||||
|
@ -5638,22 +5737,45 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
|
|
||||||
int32_t generated;
|
int32_t generated;
|
||||||
if (superTblInfo) {
|
if (superTblInfo) {
|
||||||
generated = generateStbInterlaceData(
|
if (superTblInfo->iface == STMT_IFACE) {
|
||||||
superTblInfo,
|
generated = prepareStbStmt(superTblInfo,
|
||||||
tableName, batchPerTbl, i,
|
pThreadInfo->stmt,
|
||||||
batchPerTblTimes,
|
tableName,
|
||||||
tableSeq,
|
batchPerTbl,
|
||||||
pThreadInfo, pstr,
|
insertRows, i,
|
||||||
insertRows,
|
startTime,
|
||||||
startTime,
|
pThreadInfo->buffer);
|
||||||
&remainderBufLen);
|
} else {
|
||||||
|
generated = generateStbInterlaceData(
|
||||||
|
superTblInfo,
|
||||||
|
tableName, batchPerTbl, i,
|
||||||
|
batchPerTblTimes,
|
||||||
|
tableSeq,
|
||||||
|
pThreadInfo, pstr,
|
||||||
|
insertRows,
|
||||||
|
startTime,
|
||||||
|
&remainderBufLen);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
generated = generateInterlaceDataWithoutStb(
|
if (g_args.iface == STMT_IFACE) {
|
||||||
tableName, batchPerTbl,
|
debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n",
|
||||||
tableSeq,
|
pThreadInfo->threadID,
|
||||||
pThreadInfo->db_name, pstr,
|
__func__, __LINE__,
|
||||||
insertRows,
|
tableName, batchPerTbl, startTime);
|
||||||
&remainderBufLen);
|
generated = prepareStmtWithoutStb(
|
||||||
|
pThreadInfo->stmt, tableName,
|
||||||
|
batchPerTbl,
|
||||||
|
insertRows, i,
|
||||||
|
startTime);
|
||||||
|
} else {
|
||||||
|
generated = generateInterlaceDataWithoutStb(
|
||||||
|
tableName, batchPerTbl,
|
||||||
|
tableSeq,
|
||||||
|
pThreadInfo->db_name, pstr,
|
||||||
|
insertRows,
|
||||||
|
startTime,
|
||||||
|
&remainderBufLen);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debugPrint("[%d] %s() LN%d, generated records is %d\n",
|
debugPrint("[%d] %s() LN%d, generated records is %d\n",
|
||||||
|
@ -5668,9 +5790,10 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
|
|
||||||
tableSeq ++;
|
tableSeq ++;
|
||||||
recOfBatch += batchPerTbl;
|
recOfBatch += batchPerTbl;
|
||||||
|
|
||||||
pstr += (oldRemainderLen - remainderBufLen);
|
pstr += (oldRemainderLen - remainderBufLen);
|
||||||
// startTime += batchPerTbl * superTblInfo->timeStampStep;
|
|
||||||
pThreadInfo->totalInsertRows += batchPerTbl;
|
pThreadInfo->totalInsertRows += batchPerTbl;
|
||||||
|
|
||||||
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
|
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__,
|
pThreadInfo->threadID, __func__, __LINE__,
|
||||||
batchPerTbl, recOfBatch);
|
batchPerTbl, recOfBatch);
|
||||||
|
@ -5824,9 +5947,11 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
int32_t generated;
|
int32_t generated;
|
||||||
if (superTblInfo) {
|
if (superTblInfo) {
|
||||||
if (superTblInfo->iface == STMT_IFACE) {
|
if (superTblInfo->iface == STMT_IFACE) {
|
||||||
generated = prepareStbStmt(superTblInfo,
|
generated = prepareStbStmt(
|
||||||
|
superTblInfo,
|
||||||
pThreadInfo->stmt,
|
pThreadInfo->stmt,
|
||||||
tableName, g_args.num_of_RPR,
|
tableName,
|
||||||
|
g_args.num_of_RPR,
|
||||||
insertRows, i, start_time, pstr);
|
insertRows, i, start_time, pstr);
|
||||||
} else {
|
} else {
|
||||||
generated = generateStbProgressiveData(
|
generated = generateStbProgressiveData(
|
||||||
|
@ -5838,7 +5963,12 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (g_args.iface == STMT_IFACE) {
|
if (g_args.iface == STMT_IFACE) {
|
||||||
generated = prepareStmtWithoutStb(tableName);
|
generated = prepareStmtWithoutStb(
|
||||||
|
pThreadInfo->stmt,
|
||||||
|
tableName,
|
||||||
|
g_args.num_of_RPR,
|
||||||
|
insertRows, i,
|
||||||
|
start_time);
|
||||||
} else {
|
} else {
|
||||||
generated = generateProgressiveDataWithoutStb(
|
generated = generateProgressiveDataWithoutStb(
|
||||||
tableName,
|
tableName,
|
||||||
|
@ -5859,13 +5989,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
|
|
||||||
startTs = taosGetTimestampMs();
|
startTs = taosGetTimestampMs();
|
||||||
|
|
||||||
int64_t affectedRows = execInsert(pThreadInfo, generated);
|
int32_t affectedRows = execInsert(pThreadInfo, generated);
|
||||||
|
|
||||||
endTs = taosGetTimestampMs();
|
endTs = taosGetTimestampMs();
|
||||||
uint64_t delay = endTs - startTs;
|
uint64_t delay = endTs - startTs;
|
||||||
performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n",
|
performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n",
|
||||||
__func__, __LINE__, delay);
|
__func__, __LINE__, delay);
|
||||||
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
|
verbosePrint("[%d] %s() LN%d affectedRows=%d\n",
|
||||||
pThreadInfo->threadID,
|
pThreadInfo->threadID,
|
||||||
__func__, __LINE__, affectedRows);
|
__func__, __LINE__, affectedRows);
|
||||||
|
|
||||||
|
@ -5875,7 +6005,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
pThreadInfo->totalDelay += delay;
|
pThreadInfo->totalDelay += delay;
|
||||||
|
|
||||||
if (affectedRows < 0) {
|
if (affectedRows < 0) {
|
||||||
errorPrint("%s() LN%d, affected rows: %"PRId64"\n",
|
errorPrint("%s() LN%d, affected rows: %d\n",
|
||||||
__func__, __LINE__, affectedRows);
|
__func__, __LINE__, affectedRows);
|
||||||
goto free_of_progressive;
|
goto free_of_progressive;
|
||||||
}
|
}
|
||||||
|
@ -6045,15 +6175,6 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
|
||||||
static void startMultiThreadInsertData(int threads, char* db_name,
|
static void startMultiThreadInsertData(int threads, char* db_name,
|
||||||
char* precision,SSuperTable* superTblInfo) {
|
char* precision,SSuperTable* superTblInfo) {
|
||||||
|
|
||||||
//TAOS* taos;
|
|
||||||
//if (0 == strncasecmp(superTblInfo->iface, "taosc", 5)) {
|
|
||||||
// taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
|
|
||||||
// if (NULL == taos) {
|
|
||||||
// printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
|
|
||||||
// exit(-1);
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
||||||
int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
|
int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
|
||||||
if (0 != precision[0]) {
|
if (0 != precision[0]) {
|
||||||
if (0 == strncasecmp(precision, "ms", 2)) {
|
if (0 == strncasecmp(precision, "ms", 2)) {
|
||||||
|
@ -6232,7 +6353,16 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((superTblInfo) && (superTblInfo->iface == STMT_IFACE)) {
|
if ((g_args.iface == STMT_IFACE)
|
||||||
|
|| ((superTblInfo) && (superTblInfo->iface == STMT_IFACE))) {
|
||||||
|
|
||||||
|
int columnCount;
|
||||||
|
if (superTblInfo) {
|
||||||
|
columnCount = superTblInfo->columnCount;
|
||||||
|
} else {
|
||||||
|
columnCount = g_args.num_of_CPR;
|
||||||
|
}
|
||||||
|
|
||||||
pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
|
pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
|
||||||
if (NULL == pThreadInfo->stmt) {
|
if (NULL == pThreadInfo->stmt) {
|
||||||
errorPrint(
|
errorPrint(
|
||||||
|
@ -6243,6 +6373,24 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
||||||
free(infos);
|
free(infos);
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char buffer[3000];
|
||||||
|
char *pstr = buffer;
|
||||||
|
pstr += sprintf(pstr, "INSERT INTO ? values(?");
|
||||||
|
|
||||||
|
for (int col = 0; col < columnCount; col ++) {
|
||||||
|
pstr += sprintf(pstr, ",?");
|
||||||
|
}
|
||||||
|
pstr += sprintf(pstr, ")");
|
||||||
|
|
||||||
|
int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0);
|
||||||
|
if (ret != 0){
|
||||||
|
errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
|
||||||
|
ret, taos_errstr(NULL));
|
||||||
|
free(pids);
|
||||||
|
free(infos);
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pThreadInfo->taos = NULL;
|
pThreadInfo->taos = NULL;
|
||||||
|
@ -6563,7 +6711,7 @@ static int insertTestProcess() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMsleep(1000);
|
// taosMsleep(1000);
|
||||||
// create sub threads for inserting data
|
// create sub threads for inserting data
|
||||||
//start = taosGetTimestampMs();
|
//start = taosGetTimestampMs();
|
||||||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||||
|
|
Loading…
Reference in New Issue