Feature/sangshuduo/td 4068 taosdemo stmt (#6225)
* merge with develop branch. change query/tests/CMakeLists.txt to allow unused function and variable. * refactor data generating. * refactor. * refactor. * refactor. * refactor. * refactor * add prepare stmt function. * refactor get rand timestamp. * fix windows compile error. Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
parent
93e481f7ee
commit
13004a5740
|
@ -19,6 +19,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
#include <taos.h>
|
||||||
#define _GNU_SOURCE
|
#define _GNU_SOURCE
|
||||||
#define CURL_STATICLIB
|
#define CURL_STATICLIB
|
||||||
|
|
||||||
|
@ -229,13 +230,13 @@ typedef struct SArguments_S {
|
||||||
uint32_t num_of_threads;
|
uint32_t num_of_threads;
|
||||||
uint64_t insert_interval;
|
uint64_t insert_interval;
|
||||||
int64_t query_times;
|
int64_t query_times;
|
||||||
uint64_t interlace_rows;
|
uint32_t interlace_rows;
|
||||||
uint64_t num_of_RPR; // num_of_records_per_req
|
uint32_t num_of_RPR; // num_of_records_per_req
|
||||||
uint64_t max_sql_len;
|
uint64_t max_sql_len;
|
||||||
int64_t num_of_tables;
|
int64_t num_of_tables;
|
||||||
int64_t num_of_DPT;
|
int64_t num_of_DPT;
|
||||||
int abort;
|
int abort;
|
||||||
int disorderRatio; // 0: no disorder, >0: x%
|
uint32_t disorderRatio; // 0: no disorder, >0: x%
|
||||||
int disorderRange; // ms or us by database precision
|
int disorderRange; // ms or us by database precision
|
||||||
uint32_t method_of_delete;
|
uint32_t method_of_delete;
|
||||||
char ** arg_list;
|
char ** arg_list;
|
||||||
|
@ -258,12 +259,12 @@ typedef struct SSuperTable_S {
|
||||||
uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
|
uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
|
||||||
char childTblPrefix[MAX_TB_NAME_SIZE];
|
char childTblPrefix[MAX_TB_NAME_SIZE];
|
||||||
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
|
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
|
||||||
uint16_t insertMode; // 0: taosc, 1: rest, 2: stmt
|
uint16_t iface; // 0: taosc, 1: rest, 2: stmt
|
||||||
int64_t childTblLimit;
|
int64_t childTblLimit;
|
||||||
uint64_t childTblOffset;
|
uint64_t childTblOffset;
|
||||||
|
|
||||||
// int multiThreadWriteOneTbl; // 0: no, 1: yes
|
// int multiThreadWriteOneTbl; // 0: no, 1: yes
|
||||||
uint64_t interlaceRows; //
|
uint32_t interlaceRows; //
|
||||||
int disorderRatio; // 0: no disorder, >0: x%
|
int disorderRatio; // 0: no disorder, >0: x%
|
||||||
int disorderRange; // ms or us by database precision
|
int disorderRange; // ms or us by database precision
|
||||||
uint64_t maxSqlLen; //
|
uint64_t maxSqlLen; //
|
||||||
|
@ -551,6 +552,8 @@ static void createChildTables();
|
||||||
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
|
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
|
||||||
static int postProceSql(char *host, struct sockaddr_in *pServAddr,
|
static int postProceSql(char *host, struct sockaddr_in *pServAddr,
|
||||||
uint16_t port, char* sqlstr, threadInfo *pThreadInfo);
|
uint16_t port, char* sqlstr, threadInfo *pThreadInfo);
|
||||||
|
static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
|
||||||
|
int disorderRatio, int disorderRange);
|
||||||
|
|
||||||
/* ************ Global variables ************ */
|
/* ************ Global variables ************ */
|
||||||
|
|
||||||
|
@ -1070,7 +1073,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
|
||||||
}
|
}
|
||||||
printf("# Insertion interval: %"PRIu64"\n",
|
printf("# Insertion interval: %"PRIu64"\n",
|
||||||
arguments->insert_interval);
|
arguments->insert_interval);
|
||||||
printf("# Number of records per req: %"PRIu64"\n",
|
printf("# Number of records per req: %ud\n",
|
||||||
arguments->num_of_RPR);
|
arguments->num_of_RPR);
|
||||||
printf("# Max SQL length: %"PRIu64"\n",
|
printf("# Max SQL length: %"PRIu64"\n",
|
||||||
arguments->max_sql_len);
|
arguments->max_sql_len);
|
||||||
|
@ -1362,7 +1365,7 @@ static int printfInsertMeta() {
|
||||||
g_Dbs.threadCountByCreateTbl);
|
g_Dbs.threadCountByCreateTbl);
|
||||||
printf("top insert interval: \033[33m%"PRIu64"\033[0m\n",
|
printf("top insert interval: \033[33m%"PRIu64"\033[0m\n",
|
||||||
g_args.insert_interval);
|
g_args.insert_interval);
|
||||||
printf("number of records per req: \033[33m%"PRIu64"\033[0m\n",
|
printf("number of records per req: \033[33m%ud\033[0m\n",
|
||||||
g_args.num_of_RPR);
|
g_args.num_of_RPR);
|
||||||
printf("max sql length: \033[33m%"PRIu64"\033[0m\n",
|
printf("max sql length: \033[33m%"PRIu64"\033[0m\n",
|
||||||
g_args.max_sql_len);
|
g_args.max_sql_len);
|
||||||
|
@ -1468,9 +1471,9 @@ static int printfInsertMeta() {
|
||||||
g_Dbs.db[i].superTbls[j].childTblPrefix);
|
g_Dbs.db[i].superTbls[j].childTblPrefix);
|
||||||
printf(" dataSource: \033[33m%s\033[0m\n",
|
printf(" dataSource: \033[33m%s\033[0m\n",
|
||||||
g_Dbs.db[i].superTbls[j].dataSource);
|
g_Dbs.db[i].superTbls[j].dataSource);
|
||||||
printf(" insertMode: \033[33m%s\033[0m\n",
|
printf(" iface: \033[33m%s\033[0m\n",
|
||||||
(g_Dbs.db[i].superTbls[j].insertMode==TAOSC_IFACE)?"taosc":
|
(g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc":
|
||||||
(g_Dbs.db[i].superTbls[j].insertMode==REST_IFACE)?"rest":"stmt");
|
(g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt");
|
||||||
if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) {
|
if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) {
|
||||||
printf(" childTblLimit: \033[33m%"PRId64"\033[0m\n",
|
printf(" childTblLimit: \033[33m%"PRId64"\033[0m\n",
|
||||||
g_Dbs.db[i].superTbls[j].childTblLimit);
|
g_Dbs.db[i].superTbls[j].childTblLimit);
|
||||||
|
@ -1488,7 +1491,7 @@ static int printfInsertMeta() {
|
||||||
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
|
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
printf(" interlaceRows: \033[33m%"PRIu64"\033[0m\n",
|
printf(" interlaceRows: \033[33m%ud\033[0m\n",
|
||||||
g_Dbs.db[i].superTbls[j].interlaceRows);
|
g_Dbs.db[i].superTbls[j].interlaceRows);
|
||||||
|
|
||||||
if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) {
|
if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) {
|
||||||
|
@ -1566,7 +1569,7 @@ static void printfInsertMetaToFile(FILE* fp) {
|
||||||
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
|
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
|
||||||
fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
|
fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
|
||||||
fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl);
|
fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl);
|
||||||
fprintf(fp, "number of records per req: %"PRIu64"\n", g_args.num_of_RPR);
|
fprintf(fp, "number of records per req: %ud\n", g_args.num_of_RPR);
|
||||||
fprintf(fp, "max sql length: %"PRIu64"\n", g_args.max_sql_len);
|
fprintf(fp, "max sql length: %"PRIu64"\n", g_args.max_sql_len);
|
||||||
fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
|
fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
|
||||||
|
|
||||||
|
@ -1658,12 +1661,12 @@ static void printfInsertMetaToFile(FILE* fp) {
|
||||||
g_Dbs.db[i].superTbls[j].childTblPrefix);
|
g_Dbs.db[i].superTbls[j].childTblPrefix);
|
||||||
fprintf(fp, " dataSource: %s\n",
|
fprintf(fp, " dataSource: %s\n",
|
||||||
g_Dbs.db[i].superTbls[j].dataSource);
|
g_Dbs.db[i].superTbls[j].dataSource);
|
||||||
fprintf(fp, " insertMode: %s\n",
|
fprintf(fp, " iface: %s\n",
|
||||||
(g_Dbs.db[i].superTbls[j].insertMode==TAOSC_IFACE)?"taosc":
|
(g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc":
|
||||||
(g_Dbs.db[i].superTbls[j].insertMode==REST_IFACE)?"rest":"stmt");
|
(g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt");
|
||||||
fprintf(fp, " insertRows: %"PRId64"\n",
|
fprintf(fp, " insertRows: %"PRId64"\n",
|
||||||
g_Dbs.db[i].superTbls[j].insertRows);
|
g_Dbs.db[i].superTbls[j].insertRows);
|
||||||
fprintf(fp, " interlace rows: %"PRIu64"\n",
|
fprintf(fp, " interlace rows: %ud\n",
|
||||||
g_Dbs.db[i].superTbls[j].interlaceRows);
|
g_Dbs.db[i].superTbls[j].interlaceRows);
|
||||||
if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) {
|
if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) {
|
||||||
fprintf(fp, " stable insert interval: %"PRIu64"\n",
|
fprintf(fp, " stable insert interval: %"PRIu64"\n",
|
||||||
|
@ -1676,7 +1679,7 @@ static void printfInsertMetaToFile(FILE* fp) {
|
||||||
fprintf(fp, " multiThreadWriteOneTbl: yes\n");
|
fprintf(fp, " multiThreadWriteOneTbl: yes\n");
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
fprintf(fp, " interlaceRows: %"PRIu64"\n",
|
fprintf(fp, " interlaceRows: %ud\n",
|
||||||
g_Dbs.db[i].superTbls[j].interlaceRows);
|
g_Dbs.db[i].superTbls[j].interlaceRows);
|
||||||
fprintf(fp, " disorderRange: %d\n",
|
fprintf(fp, " disorderRange: %d\n",
|
||||||
g_Dbs.db[i].superTbls[j].disorderRange);
|
g_Dbs.db[i].superTbls[j].disorderRange);
|
||||||
|
@ -3563,9 +3566,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
||||||
|
|
||||||
// rows per table need be less than insert batch
|
// rows per table need be less than insert batch
|
||||||
if (g_args.interlace_rows > g_args.num_of_RPR) {
|
if (g_args.interlace_rows > g_args.num_of_RPR) {
|
||||||
printf("NOTICE: interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n",
|
printf("NOTICE: interlace rows value %ud > num_of_records_per_req %ud\n\n",
|
||||||
g_args.interlace_rows, g_args.num_of_RPR);
|
g_args.interlace_rows, g_args.num_of_RPR);
|
||||||
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
|
printf(" interlace rows value will be set to num_of_records_per_req %ud\n\n",
|
||||||
g_args.num_of_RPR);
|
g_args.num_of_RPR);
|
||||||
prompt();
|
prompt();
|
||||||
g_args.interlace_rows = g_args.num_of_RPR;
|
g_args.interlace_rows = g_args.num_of_RPR;
|
||||||
|
@ -3880,22 +3883,22 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt
|
cJSON *stbIface = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt
|
||||||
if (insertMode && insertMode->type == cJSON_String
|
if (stbIface && stbIface->type == cJSON_String
|
||||||
&& insertMode->valuestring != NULL) {
|
&& stbIface->valuestring != NULL) {
|
||||||
if (0 == strcasecmp(insertMode->valuestring, "taosc")) {
|
if (0 == strcasecmp(stbIface->valuestring, "taosc")) {
|
||||||
g_Dbs.db[i].superTbls[j].insertMode = TAOSC_IFACE;
|
g_Dbs.db[i].superTbls[j].iface= TAOSC_IFACE;
|
||||||
} else if (0 == strcasecmp(insertMode->valuestring, "rest")) {
|
} else if (0 == strcasecmp(stbIface->valuestring, "rest")) {
|
||||||
g_Dbs.db[i].superTbls[j].insertMode = REST_IFACE;
|
g_Dbs.db[i].superTbls[j].iface= REST_IFACE;
|
||||||
} else if (0 == strcasecmp(insertMode->valuestring, "stmt")) {
|
} else if (0 == strcasecmp(stbIface->valuestring, "stmt")) {
|
||||||
g_Dbs.db[i].superTbls[j].insertMode = STMT_IFACE;
|
g_Dbs.db[i].superTbls[j].iface= STMT_IFACE;
|
||||||
} else {
|
} else {
|
||||||
errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n",
|
errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n",
|
||||||
__func__, __LINE__, insertMode->valuestring);
|
__func__, __LINE__, stbIface->valuestring);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
} else if (!insertMode) {
|
} else if (!stbIface) {
|
||||||
g_Dbs.db[i].superTbls[j].insertMode = TAOSC_IFACE;
|
g_Dbs.db[i].superTbls[j].iface = TAOSC_IFACE;
|
||||||
} else {
|
} else {
|
||||||
errorPrint("%s", "failed to read json, insert_mode not found\n");
|
errorPrint("%s", "failed to read json, insert_mode not found\n");
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
|
@ -4032,9 +4035,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
||||||
g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint;
|
g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint;
|
||||||
// rows per table need be less than insert batch
|
// rows per table need be less than insert batch
|
||||||
if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) {
|
if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) {
|
||||||
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n",
|
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %ud > num_of_records_per_req %ud\n\n",
|
||||||
i, j, g_Dbs.db[i].superTbls[j].interlaceRows, g_args.num_of_RPR);
|
i, j, g_Dbs.db[i].superTbls[j].interlaceRows,
|
||||||
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
|
g_args.num_of_RPR);
|
||||||
|
printf(" interlace rows value will be set to num_of_records_per_req %ud\n\n",
|
||||||
g_args.num_of_RPR);
|
g_args.num_of_RPR);
|
||||||
prompt();
|
prompt();
|
||||||
g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR;
|
g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR;
|
||||||
|
@ -4864,11 +4868,11 @@ static int64_t execInsert(threadInfo *pThreadInfo, uint64_t k)
|
||||||
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
|
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
|
||||||
__func__, __LINE__, pThreadInfo->buffer);
|
__func__, __LINE__, pThreadInfo->buffer);
|
||||||
if (superTblInfo) {
|
if (superTblInfo) {
|
||||||
if (superTblInfo->insertMode == TAOSC_IFACE) {
|
if (superTblInfo->iface == TAOSC_IFACE) {
|
||||||
affectedRows = queryDbExec(
|
affectedRows = queryDbExec(
|
||||||
pThreadInfo->taos,
|
pThreadInfo->taos,
|
||||||
pThreadInfo->buffer, INSERT_TYPE, false);
|
pThreadInfo->buffer, INSERT_TYPE, false);
|
||||||
} else if (superTblInfo->insertMode == REST_IFACE) {
|
} else if (superTblInfo->iface == REST_IFACE) {
|
||||||
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
|
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
|
||||||
pThreadInfo->buffer, NULL /* not set result file */)) {
|
pThreadInfo->buffer, NULL /* not set result file */)) {
|
||||||
affectedRows = -1;
|
affectedRows = -1;
|
||||||
|
@ -4877,7 +4881,7 @@ static int64_t execInsert(threadInfo *pThreadInfo, uint64_t k)
|
||||||
} else {
|
} else {
|
||||||
affectedRows = k;
|
affectedRows = k;
|
||||||
}
|
}
|
||||||
} else if (superTblInfo->insertMode == STMT_IFACE) {
|
} else if (superTblInfo->iface == STMT_IFACE) {
|
||||||
debugPrint("%s() LN%d, stmt=%p", __func__, __LINE__, pThreadInfo->stmt);
|
debugPrint("%s() LN%d, stmt=%p", __func__, __LINE__, pThreadInfo->stmt);
|
||||||
if (0 != taos_stmt_execute(pThreadInfo->stmt)) {
|
if (0 != taos_stmt_execute(pThreadInfo->stmt)) {
|
||||||
errorPrint("%s() LN%d, failied to execute insert statement\n",
|
errorPrint("%s() LN%d, failied to execute insert statement\n",
|
||||||
|
@ -4888,7 +4892,7 @@ static int64_t execInsert(threadInfo *pThreadInfo, uint64_t k)
|
||||||
affectedRows = k;
|
affectedRows = k;
|
||||||
} else {
|
} else {
|
||||||
errorPrint("%s() LN%d: unknown insert mode: %d\n",
|
errorPrint("%s() LN%d: unknown insert mode: %d\n",
|
||||||
__func__, __LINE__, superTblInfo->insertMode);
|
__func__, __LINE__, superTblInfo->iface);
|
||||||
affectedRows = 0;
|
affectedRows = 0;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -4924,7 +4928,7 @@ static void getTableName(char *pTblName,
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t generateDataTailWithoutStb(
|
static int64_t generateDataTailWithoutStb(
|
||||||
uint64_t batch, char* buffer,
|
uint32_t batch, char* buffer,
|
||||||
int64_t remainderBufLen, int64_t insertRows,
|
int64_t remainderBufLen, int64_t insertRows,
|
||||||
uint64_t startFrom, int64_t startTime,
|
uint64_t startFrom, int64_t startTime,
|
||||||
/* int64_t *pSamplePos, */int64_t *dataLen) {
|
/* int64_t *pSamplePos, */int64_t *dataLen) {
|
||||||
|
@ -4932,7 +4936,7 @@ static int64_t generateDataTailWithoutStb(
|
||||||
uint64_t len = 0;
|
uint64_t len = 0;
|
||||||
char *pstr = buffer;
|
char *pstr = buffer;
|
||||||
|
|
||||||
verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch);
|
verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch);
|
||||||
|
|
||||||
int64_t k = 0;
|
int64_t k = 0;
|
||||||
for (k = 0; k < batch;) {
|
for (k = 0; k < batch;) {
|
||||||
|
@ -4944,22 +4948,11 @@ static int64_t generateDataTailWithoutStb(
|
||||||
char **data_type = g_args.datatype;
|
char **data_type = g_args.datatype;
|
||||||
int lenOfBinary = g_args.len_of_binary;
|
int lenOfBinary = g_args.len_of_binary;
|
||||||
|
|
||||||
int64_t randTail = DEFAULT_TIMESTAMP_STEP * k;
|
|
||||||
|
|
||||||
if (g_args.disorderRatio != 0) {
|
|
||||||
int rand_num = taosRandom() % 100;
|
|
||||||
if (rand_num < g_args.disorderRatio) {
|
|
||||||
randTail = (randTail +
|
|
||||||
(taosRandom() % g_args.disorderRange + 1)) * (-1);
|
|
||||||
|
|
||||||
debugPrint("rand data generated, back %"PRId64"\n", randTail);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
randTail = DEFAULT_TIMESTAMP_STEP * k;
|
|
||||||
}
|
|
||||||
|
|
||||||
retLen = generateData(data, data_type,
|
retLen = generateData(data, data_type,
|
||||||
startTime + randTail,
|
startTime + getTSRandTail(
|
||||||
|
(int64_t)DEFAULT_TIMESTAMP_STEP, k,
|
||||||
|
g_args.disorderRatio,
|
||||||
|
g_args.disorderRange),
|
||||||
lenOfBinary);
|
lenOfBinary);
|
||||||
|
|
||||||
if (len > remainderBufLen)
|
if (len > remainderBufLen)
|
||||||
|
@ -4984,9 +4977,25 @@ static int64_t generateDataTailWithoutStb(
|
||||||
return k;
|
return k;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t generateStbDataTail(
|
static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
|
||||||
|
int disorderRatio, int disorderRange)
|
||||||
|
{
|
||||||
|
int64_t randTail = timeStampStep * seq;
|
||||||
|
if (disorderRatio > 0) {
|
||||||
|
int rand_num = taosRandom() % 100;
|
||||||
|
if(rand_num < disorderRatio) {
|
||||||
|
randTail = (randTail +
|
||||||
|
(taosRandom() % disorderRange + 1)) * (-1);
|
||||||
|
debugPrint("rand data generated, back %"PRId64"\n", randTail);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return randTail;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t generateStbDataTail(
|
||||||
SSuperTable* superTblInfo,
|
SSuperTable* superTblInfo,
|
||||||
uint64_t batch, char* buffer,
|
uint32_t batch, char* buffer,
|
||||||
int64_t remainderBufLen, int64_t insertRows,
|
int64_t remainderBufLen, int64_t insertRows,
|
||||||
uint64_t startFrom, int64_t startTime,
|
uint64_t startFrom, int64_t startTime,
|
||||||
int64_t *pSamplePos, int64_t *dataLen) {
|
int64_t *pSamplePos, int64_t *dataLen) {
|
||||||
|
@ -4994,37 +5003,35 @@ static int64_t generateStbDataTail(
|
||||||
|
|
||||||
char *pstr = buffer;
|
char *pstr = buffer;
|
||||||
|
|
||||||
verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch);
|
bool tsRand;
|
||||||
|
if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
|
||||||
|
tsRand = true;
|
||||||
|
} else {
|
||||||
|
tsRand = false;
|
||||||
|
}
|
||||||
|
verbosePrint("%s() LN%d batch=%ud\n", __func__, __LINE__, batch);
|
||||||
|
|
||||||
int64_t k = 0;
|
int32_t k = 0;
|
||||||
for (k = 0; k < batch;) {
|
for (k = 0; k < batch;) {
|
||||||
char data[MAX_DATA_SIZE];
|
char data[MAX_DATA_SIZE];
|
||||||
memset(data, 0, MAX_DATA_SIZE);
|
memset(data, 0, MAX_DATA_SIZE);
|
||||||
|
|
||||||
int64_t retLen = 0;
|
int64_t retLen = 0;
|
||||||
|
|
||||||
if (0 == strncasecmp(superTblInfo->dataSource,
|
if (tsRand) {
|
||||||
"sample", strlen("sample"))) {
|
retLen = generateStbRowData(superTblInfo, data,
|
||||||
retLen = getRowDataFromSample(
|
startTime + getTSRandTail(
|
||||||
|
superTblInfo->timeStampStep, k,
|
||||||
|
superTblInfo->disorderRatio,
|
||||||
|
superTblInfo->disorderRange)
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
retLen = getRowDataFromSample(
|
||||||
data,
|
data,
|
||||||
remainderBufLen,
|
remainderBufLen,
|
||||||
startTime + superTblInfo->timeStampStep * k,
|
startTime + superTblInfo->timeStampStep * k,
|
||||||
superTblInfo,
|
superTblInfo,
|
||||||
pSamplePos);
|
pSamplePos);
|
||||||
} else if (0 == strncasecmp(superTblInfo->dataSource,
|
|
||||||
"rand", strlen("rand"))) {
|
|
||||||
int64_t randTail = superTblInfo->timeStampStep * k;
|
|
||||||
if (superTblInfo->disorderRatio > 0) {
|
|
||||||
int rand_num = taosRandom() % 100;
|
|
||||||
if(rand_num < superTblInfo->disorderRatio) {
|
|
||||||
randTail = (randTail +
|
|
||||||
(taosRandom() % superTblInfo->disorderRange + 1)) * (-1);
|
|
||||||
debugPrint("rand data generated, back %"PRId64"\n", randTail);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t d = startTime + randTail;
|
|
||||||
retLen = generateStbRowData(superTblInfo, data, d);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (retLen > remainderBufLen) {
|
if (retLen > remainderBufLen) {
|
||||||
|
@ -5036,7 +5043,7 @@ static int64_t generateStbDataTail(
|
||||||
len += retLen;
|
len += retLen;
|
||||||
remainderBufLen -= retLen;
|
remainderBufLen -= retLen;
|
||||||
|
|
||||||
verbosePrint("%s() LN%d len=%"PRIu64" k=%"PRIu64" \nbuffer=%s\n",
|
verbosePrint("%s() LN%d len=%"PRIu64" k=%ud \nbuffer=%s\n",
|
||||||
__func__, __LINE__, len, k, buffer);
|
__func__, __LINE__, len, k, buffer);
|
||||||
|
|
||||||
startFrom ++;
|
startFrom ++;
|
||||||
|
@ -5133,9 +5140,11 @@ static int generateStbSQLHead(
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t generateStbInterlaceData(
|
static int32_t generateStbInterlaceData(
|
||||||
SSuperTable *superTblInfo,
|
SSuperTable *superTblInfo,
|
||||||
char *tableName, uint64_t batchPerTbl, uint64_t i, uint64_t batchPerTblTimes,
|
char *tableName, uint32_t batchPerTbl,
|
||||||
|
uint64_t i,
|
||||||
|
uint32_t batchPerTblTimes,
|
||||||
uint64_t tableSeq,
|
uint64_t tableSeq,
|
||||||
threadInfo *pThreadInfo, char *buffer,
|
threadInfo *pThreadInfo, char *buffer,
|
||||||
int64_t insertRows,
|
int64_t insertRows,
|
||||||
|
@ -5162,7 +5171,7 @@ static int64_t generateStbInterlaceData(
|
||||||
|
|
||||||
int64_t dataLen = 0;
|
int64_t dataLen = 0;
|
||||||
|
|
||||||
verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%"PRIu64" batchPerTbl = %"PRIu64"\n",
|
verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%ud batchPerTbl = %ud\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__,
|
pThreadInfo->threadID, __func__, __LINE__,
|
||||||
i, batchPerTblTimes, batchPerTbl);
|
i, batchPerTblTimes, batchPerTbl);
|
||||||
|
|
||||||
|
@ -5170,7 +5179,7 @@ static int64_t generateStbInterlaceData(
|
||||||
startTime = taosGetTimestamp(pThreadInfo->time_precision);
|
startTime = taosGetTimestamp(pThreadInfo->time_precision);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t k = generateStbDataTail(
|
int32_t k = generateStbDataTail(
|
||||||
superTblInfo,
|
superTblInfo,
|
||||||
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
|
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
|
||||||
startTime,
|
startTime,
|
||||||
|
@ -5180,7 +5189,7 @@ static int64_t generateStbInterlaceData(
|
||||||
pstr += dataLen;
|
pstr += dataLen;
|
||||||
*pRemainderBufLen -= dataLen;
|
*pRemainderBufLen -= dataLen;
|
||||||
} else {
|
} else {
|
||||||
debugPrint("%s() LN%d, generated data tail: %"PRIu64", not equal batch per table: %"PRIu64"\n",
|
debugPrint("%s() LN%d, generated data tail: %ud, not equal batch per table: %ud\n",
|
||||||
__func__, __LINE__, k, batchPerTbl);
|
__func__, __LINE__, k, batchPerTbl);
|
||||||
pstr -= headLen;
|
pstr -= headLen;
|
||||||
pstr[0] = '\0';
|
pstr[0] = '\0';
|
||||||
|
@ -5191,7 +5200,7 @@ static int64_t generateStbInterlaceData(
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t generateInterlaceDataWithoutStb(
|
static int64_t generateInterlaceDataWithoutStb(
|
||||||
char *tableName, uint64_t batchPerTbl,
|
char *tableName, uint32_t batchPerTbl,
|
||||||
uint64_t tableSeq,
|
uint64_t tableSeq,
|
||||||
char *dbName, char *buffer,
|
char *dbName, char *buffer,
|
||||||
int64_t insertRows,
|
int64_t insertRows,
|
||||||
|
@ -5223,7 +5232,7 @@ static int64_t generateInterlaceDataWithoutStb(
|
||||||
pstr += dataLen;
|
pstr += dataLen;
|
||||||
*pRemainderBufLen -= dataLen;
|
*pRemainderBufLen -= dataLen;
|
||||||
} else {
|
} else {
|
||||||
debugPrint("%s() LN%d, generated data tail: %"PRIu64", not equal batch per table: %"PRIu64"\n",
|
debugPrint("%s() LN%d, generated data tail: %"PRIu64", not equal batch per table: %ud\n",
|
||||||
__func__, __LINE__, k, batchPerTbl);
|
__func__, __LINE__, k, batchPerTbl);
|
||||||
pstr -= headLen;
|
pstr -= headLen;
|
||||||
pstr[0] = '\0';
|
pstr[0] = '\0';
|
||||||
|
@ -5233,7 +5242,71 @@ static int64_t generateInterlaceDataWithoutStb(
|
||||||
return k;
|
return k;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t generateStbProgressiveData(
|
static int32_t prepareStbStmt(SSuperTable *stbInfo,
|
||||||
|
TAOS_STMT *stmt,
|
||||||
|
char *tableName, uint32_t batch, uint64_t insertRows,
|
||||||
|
int64_t startTime, char *buffer)
|
||||||
|
{
|
||||||
|
uint32_t k;
|
||||||
|
int ret;
|
||||||
|
char *pstr = buffer;
|
||||||
|
pstr += sprintf(pstr, "INSERT INTO %s values(?", tableName);
|
||||||
|
|
||||||
|
for (int i = 0; i < stbInfo->columnCount; i++) {
|
||||||
|
pstr += sprintf(pstr, ",?");
|
||||||
|
}
|
||||||
|
pstr += sprintf(pstr, ")");
|
||||||
|
|
||||||
|
ret = taos_stmt_prepare(stmt, buffer, 0);
|
||||||
|
if (ret != 0){
|
||||||
|
errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
|
||||||
|
ret, taos_errstr(NULL));
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
|
||||||
|
if (bindArray == NULL) {
|
||||||
|
errorPrint("Failed to allocate %d bind params\n", batch);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool tsRand;
|
||||||
|
if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) {
|
||||||
|
tsRand = true;
|
||||||
|
} else {
|
||||||
|
tsRand = false;
|
||||||
|
}
|
||||||
|
for (k = 0; k < batch; k++) {
|
||||||
|
/* columnCount + 1 (ts) */
|
||||||
|
for (int i = 0; i <= stbInfo->columnCount; i ++) {
|
||||||
|
TAOS_BIND *bind = (TAOS_BIND *)bindArray + (sizeof(TAOS_BIND) * i);
|
||||||
|
if (i == 0) {
|
||||||
|
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
int64_t ts;
|
||||||
|
if (tsRand) {
|
||||||
|
ts = startTime + getTSRandTail(
|
||||||
|
stbInfo->timeStampStep, k,
|
||||||
|
stbInfo->disorderRatio,
|
||||||
|
stbInfo->disorderRange);
|
||||||
|
} else {
|
||||||
|
ts = startTime + stbInfo->timeStampStep * k;
|
||||||
|
}
|
||||||
|
bind->buffer = &ts;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// if msg > 3MB, break
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_stmt_bind_param(stmt, bindArray);
|
||||||
|
taos_stmt_add_batch(stmt);
|
||||||
|
|
||||||
|
return k;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t generateStbProgressiveData(
|
||||||
SSuperTable *superTblInfo,
|
SSuperTable *superTblInfo,
|
||||||
char *tableName,
|
char *tableName,
|
||||||
int64_t tableSeq,
|
int64_t tableSeq,
|
||||||
|
@ -5267,12 +5340,17 @@ static int64_t generateStbProgressiveData(
|
||||||
pSamplePos, &dataLen);
|
pSamplePos, &dataLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int64_t prepareStmtWithoutStb(char *tableName)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
static int64_t generateProgressiveDataWithoutStb(
|
static int64_t generateProgressiveDataWithoutStb(
|
||||||
char *tableName,
|
char *tableName,
|
||||||
int64_t tableSeq,
|
/* int64_t tableSeq, */
|
||||||
threadInfo *pThreadInfo, char *buffer,
|
threadInfo *pThreadInfo, char *buffer,
|
||||||
int64_t insertRows,
|
int64_t insertRows,
|
||||||
uint64_t startFrom, int64_t startTime, int64_t *pSamplePos,
|
uint64_t startFrom, int64_t startTime, /*int64_t *pSamplePos, */
|
||||||
int64_t *pRemainderBufLen)
|
int64_t *pRemainderBufLen)
|
||||||
{
|
{
|
||||||
assert(buffer != NULL);
|
assert(buffer != NULL);
|
||||||
|
@ -5313,7 +5391,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
pThreadInfo->threadID, __func__, __LINE__);
|
pThreadInfo->threadID, __func__, __LINE__);
|
||||||
|
|
||||||
int64_t insertRows;
|
int64_t insertRows;
|
||||||
uint64_t interlaceRows;
|
uint32_t interlaceRows;
|
||||||
uint64_t maxSqlLen;
|
uint64_t maxSqlLen;
|
||||||
int64_t nTimeStampStep;
|
int64_t nTimeStampStep;
|
||||||
uint64_t insert_interval;
|
uint64_t insert_interval;
|
||||||
|
@ -5351,8 +5429,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
if (interlaceRows > g_args.num_of_RPR)
|
if (interlaceRows > g_args.num_of_RPR)
|
||||||
interlaceRows = g_args.num_of_RPR;
|
interlaceRows = g_args.num_of_RPR;
|
||||||
|
|
||||||
uint64_t batchPerTbl = interlaceRows;
|
uint32_t batchPerTbl = interlaceRows;
|
||||||
uint64_t batchPerTblTimes;
|
uint32_t batchPerTblTimes;
|
||||||
|
|
||||||
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
|
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
|
||||||
batchPerTblTimes =
|
batchPerTblTimes =
|
||||||
|
@ -5401,9 +5479,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
pstr += len;
|
pstr += len;
|
||||||
remainderBufLen -= len;
|
remainderBufLen -= len;
|
||||||
|
|
||||||
uint64_t recOfBatch = 0;
|
uint32_t recOfBatch = 0;
|
||||||
|
|
||||||
for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
|
for (uint32_t i = 0; i < batchPerTblTimes; i ++) {
|
||||||
char tableName[TSDB_TABLE_NAME_LEN];
|
char tableName[TSDB_TABLE_NAME_LEN];
|
||||||
|
|
||||||
getTableName(tableName, pThreadInfo, tableSeq);
|
getTableName(tableName, pThreadInfo, tableSeq);
|
||||||
|
@ -5416,11 +5494,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
|
|
||||||
uint64_t oldRemainderLen = remainderBufLen;
|
uint64_t oldRemainderLen = remainderBufLen;
|
||||||
|
|
||||||
int64_t generated;
|
int32_t generated;
|
||||||
if (superTblInfo) {
|
if (superTblInfo) {
|
||||||
generated = generateStbInterlaceData(
|
generated = generateStbInterlaceData(
|
||||||
superTblInfo,
|
superTblInfo,
|
||||||
tableName, batchPerTbl, i, batchPerTblTimes,
|
tableName, batchPerTbl, i,
|
||||||
|
batchPerTblTimes,
|
||||||
tableSeq,
|
tableSeq,
|
||||||
pThreadInfo, pstr,
|
pThreadInfo, pstr,
|
||||||
insertRows,
|
insertRows,
|
||||||
|
@ -5435,10 +5514,10 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
&remainderBufLen);
|
&remainderBufLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n",
|
debugPrint("[%d] %s() LN%d, generated records is %d\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__, generated);
|
pThreadInfo->threadID, __func__, __LINE__, generated);
|
||||||
if (generated < 0) {
|
if (generated < 0) {
|
||||||
errorPrint("[%d] %s() LN%d, generated records is %"PRId64"\n",
|
errorPrint("[%d] %s() LN%d, generated records is %d\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__, generated);
|
pThreadInfo->threadID, __func__, __LINE__, generated);
|
||||||
goto free_of_interlace;
|
goto free_of_interlace;
|
||||||
} else if (generated == 0) {
|
} else if (generated == 0) {
|
||||||
|
@ -5450,7 +5529,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
pstr += (oldRemainderLen - remainderBufLen);
|
pstr += (oldRemainderLen - remainderBufLen);
|
||||||
// startTime += batchPerTbl * superTblInfo->timeStampStep;
|
// startTime += batchPerTbl * superTblInfo->timeStampStep;
|
||||||
pThreadInfo->totalInsertRows += batchPerTbl;
|
pThreadInfo->totalInsertRows += batchPerTbl;
|
||||||
verbosePrint("[%d] %s() LN%d batchPerTbl=%"PRId64" recOfBatch=%"PRId64"\n",
|
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__,
|
pThreadInfo->threadID, __func__, __LINE__,
|
||||||
batchPerTbl, recOfBatch);
|
batchPerTbl, recOfBatch);
|
||||||
|
|
||||||
|
@ -5466,7 +5545,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
if (generatedRecPerTbl >= insertRows)
|
if (generatedRecPerTbl >= insertRows)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
int remainRows = insertRows - generatedRecPerTbl;
|
int64_t remainRows = insertRows - generatedRecPerTbl;
|
||||||
if ((remainRows > 0) && (batchPerTbl > remainRows))
|
if ((remainRows > 0) && (batchPerTbl > remainRows))
|
||||||
batchPerTbl = remainRows;
|
batchPerTbl = remainRows;
|
||||||
|
|
||||||
|
@ -5482,7 +5561,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
verbosePrint("[%d] %s() LN%d recOfBatch=%"PRIu64" totalInsertRows=%"PRIu64"\n",
|
verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
|
pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
|
||||||
pThreadInfo->totalInsertRows);
|
pThreadInfo->totalInsertRows);
|
||||||
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
|
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
|
||||||
|
@ -5491,7 +5570,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
startTs = taosGetTimestampMs();
|
startTs = taosGetTimestampMs();
|
||||||
|
|
||||||
if (recOfBatch == 0) {
|
if (recOfBatch == 0) {
|
||||||
errorPrint("[%d] %s() LN%d try inserting records of batch is %"PRIu64"\n",
|
errorPrint("[%d] %s() LN%d try inserting records of batch is %d\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__,
|
pThreadInfo->threadID, __func__, __LINE__,
|
||||||
recOfBatch);
|
recOfBatch);
|
||||||
errorPrint("%s\n", "\tPlease check if the batch or the buffer length is proper value!\n");
|
errorPrint("%s\n", "\tPlease check if the batch or the buffer length is proper value!\n");
|
||||||
|
@ -5513,7 +5592,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
pThreadInfo->totalDelay += delay;
|
pThreadInfo->totalDelay += delay;
|
||||||
|
|
||||||
if (recOfBatch != affectedRows) {
|
if (recOfBatch != affectedRows) {
|
||||||
errorPrint("[%d] %s() LN%d execInsert insert %"PRIu64", affected rows: %"PRId64"\n%s\n",
|
errorPrint("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n%s\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__,
|
pThreadInfo->threadID, __func__, __LINE__,
|
||||||
recOfBatch, affectedRows, pThreadInfo->buffer);
|
recOfBatch, affectedRows, pThreadInfo->buffer);
|
||||||
goto free_of_interlace;
|
goto free_of_interlace;
|
||||||
|
@ -5574,12 +5653,6 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
uint64_t startTs = taosGetTimestampMs();
|
uint64_t startTs = taosGetTimestampMs();
|
||||||
uint64_t endTs;
|
uint64_t endTs;
|
||||||
|
|
||||||
/* int insert_interval =
|
|
||||||
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
|
|
||||||
uint64_t st = 0;
|
|
||||||
uint64_t et = 0xffffffff;
|
|
||||||
*/
|
|
||||||
|
|
||||||
pThreadInfo->totalInsertRows = 0;
|
pThreadInfo->totalInsertRows = 0;
|
||||||
pThreadInfo->totalAffectedRows = 0;
|
pThreadInfo->totalAffectedRows = 0;
|
||||||
|
|
||||||
|
@ -5606,20 +5679,33 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
pstr += len;
|
pstr += len;
|
||||||
remainderBufLen -= len;
|
remainderBufLen -= len;
|
||||||
|
|
||||||
int64_t generated;
|
int32_t generated;
|
||||||
if (superTblInfo) {
|
if (superTblInfo) {
|
||||||
generated = generateStbProgressiveData(
|
if (superTblInfo->iface == STMT_IFACE) {
|
||||||
superTblInfo,
|
generated = prepareStbStmt(superTblInfo,
|
||||||
tableName, tableSeq, pThreadInfo->db_name, pstr, insertRows,
|
pThreadInfo->stmt,
|
||||||
i, start_time,
|
tableName, g_args.num_of_RPR,
|
||||||
&(pThreadInfo->samplePos),
|
insertRows, start_time, pstr);
|
||||||
&remainderBufLen);
|
} else {
|
||||||
|
generated = generateStbProgressiveData(
|
||||||
|
superTblInfo,
|
||||||
|
tableName, tableSeq, pThreadInfo->db_name, pstr,
|
||||||
|
insertRows, i, start_time,
|
||||||
|
&(pThreadInfo->samplePos),
|
||||||
|
&remainderBufLen);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
generated = generateProgressiveDataWithoutStb(
|
if (g_args.iface == STMT_IFACE) {
|
||||||
tableName, tableSeq, pThreadInfo, pstr, insertRows,
|
generated = prepareStmtWithoutStb(tableName);
|
||||||
i, start_time,
|
} else {
|
||||||
&(pThreadInfo->samplePos),
|
generated = generateProgressiveDataWithoutStb(
|
||||||
&remainderBufLen);
|
tableName,
|
||||||
|
/* tableSeq, */
|
||||||
|
pThreadInfo, pstr, insertRows,
|
||||||
|
i, start_time,
|
||||||
|
/* &(pThreadInfo->samplePos), */
|
||||||
|
&remainderBufLen);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (generated > 0)
|
if (generated > 0)
|
||||||
i += generated;
|
i += generated;
|
||||||
|
@ -5687,7 +5773,7 @@ static void* syncWrite(void *sarg) {
|
||||||
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
||||||
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
||||||
|
|
||||||
int interlaceRows;
|
uint32_t interlaceRows;
|
||||||
|
|
||||||
if (superTblInfo) {
|
if (superTblInfo) {
|
||||||
if ((superTblInfo->interlaceRows == 0)
|
if ((superTblInfo->interlaceRows == 0)
|
||||||
|
@ -5818,7 +5904,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
||||||
char* precision,SSuperTable* superTblInfo) {
|
char* precision,SSuperTable* superTblInfo) {
|
||||||
|
|
||||||
//TAOS* taos;
|
//TAOS* taos;
|
||||||
//if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
|
//if (0 == strncasecmp(superTblInfo->iface, "taosc", 5)) {
|
||||||
// taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
|
// taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
|
||||||
// if (NULL == taos) {
|
// if (NULL == taos) {
|
||||||
// printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
|
// printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
|
||||||
|
@ -5963,8 +6049,9 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((superTblInfo)
|
if ((superTblInfo)
|
||||||
&& (superTblInfo->insertMode == REST_IFACE)) {
|
&& (superTblInfo->iface == REST_IFACE)) {
|
||||||
if (convertHostToServAddr(g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) {
|
if (convertHostToServAddr(
|
||||||
|
g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) {
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5989,8 +6076,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
||||||
pThreadInfo->minDelay = UINT64_MAX;
|
pThreadInfo->minDelay = UINT64_MAX;
|
||||||
|
|
||||||
if ((NULL == superTblInfo) ||
|
if ((NULL == superTblInfo) ||
|
||||||
(superTblInfo->insertMode != REST_IFACE)) {
|
(superTblInfo->iface != REST_IFACE)) {
|
||||||
//pThreadInfo->taos = taos;
|
//t_info->taos = taos;
|
||||||
pThreadInfo->taos = taos_connect(
|
pThreadInfo->taos = taos_connect(
|
||||||
g_Dbs.host, g_Dbs.user,
|
g_Dbs.host, g_Dbs.user,
|
||||||
g_Dbs.password, db_name, g_Dbs.port);
|
g_Dbs.password, db_name, g_Dbs.port);
|
||||||
|
@ -6003,7 +6090,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((superTblInfo) && (superTblInfo->insertMode == STMT_IFACE)) {
|
if ((superTblInfo) && (superTblInfo->iface == STMT_IFACE)) {
|
||||||
pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
|
pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
|
||||||
if (NULL == pThreadInfo->stmt) {
|
if (NULL == pThreadInfo->stmt) {
|
||||||
errorPrint(
|
errorPrint(
|
||||||
|
@ -7252,7 +7339,7 @@ static void setParaFromArg(){
|
||||||
tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix,
|
tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix,
|
||||||
g_args.tb_prefix, MAX_TB_NAME_SIZE);
|
g_args.tb_prefix, MAX_TB_NAME_SIZE);
|
||||||
tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE);
|
tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE);
|
||||||
g_Dbs.db[0].superTbls[0].insertMode = g_args.iface;
|
g_Dbs.db[0].superTbls[0].iface = g_args.iface;
|
||||||
tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
|
tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
|
||||||
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
|
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
|
||||||
g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;
|
g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;
|
||||||
|
|
Loading…
Reference in New Issue