[TD-3147] <fix>: support insert interval. support no-stb in middle.
This commit is contained in:
parent
9f8f962e22
commit
5785339857
|
@ -172,6 +172,7 @@ typedef struct {
|
|||
/* Used by main to communicate with parse_opt. */
|
||||
typedef struct SArguments_S {
|
||||
char * metaFile;
|
||||
int test_mode;
|
||||
char * host;
|
||||
uint16_t port;
|
||||
char * user;
|
||||
|
@ -323,9 +324,6 @@ typedef struct SDbs_S {
|
|||
int dbCount;
|
||||
SDataBase db[MAX_DB_COUNT];
|
||||
|
||||
int insert_interval;
|
||||
int num_of_RPR;
|
||||
|
||||
// statistics
|
||||
int64_t totalRowsInserted;
|
||||
int64_t totalAffectedRows;
|
||||
|
@ -473,7 +471,9 @@ double randdouble[MAX_PREPARED_RAND];
|
|||
char *aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)",
|
||||
"max(col0)", "min(col0)", "first(col0)", "last(col0)"};
|
||||
|
||||
SArguments g_args = {NULL,
|
||||
SArguments g_args = {
|
||||
NULL, // metaFile
|
||||
0, // test_mode
|
||||
"127.0.0.1", // host
|
||||
6030, // port
|
||||
"root", // user
|
||||
|
@ -519,7 +519,6 @@ SArguments g_args = {NULL,
|
|||
};
|
||||
|
||||
|
||||
static int g_jsonType = 0;
|
||||
|
||||
static SDbs g_Dbs;
|
||||
static int g_totalChildTables = 0;
|
||||
|
@ -729,6 +728,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
|
|||
|
||||
if (arguments->debug_print) {
|
||||
printf("###################################################################\n");
|
||||
printf("# meta file: %s\n", arguments->metaFile);
|
||||
printf("# Server IP: %s:%hu\n",
|
||||
arguments->host == NULL ? "localhost" : arguments->host,
|
||||
arguments->port );
|
||||
|
@ -793,6 +793,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) {
|
|||
}
|
||||
|
||||
if (code != 0) {
|
||||
debugPrint("DEBUG %s() %d - command: %s\n", __func__, __LINE__, command);
|
||||
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res));
|
||||
taos_free_result(res);
|
||||
//taos_close(taos);
|
||||
|
@ -949,16 +950,37 @@ static void init_rand_data() {
|
|||
}
|
||||
}
|
||||
|
||||
#define SHOW_PARSE_RESULT_START() \
|
||||
do { if (g_args.metaFile) \
|
||||
printf("\033[1m\033[40;32m================ %s parse result START ================\033[0m\n", \
|
||||
g_args.metaFile); } while(0)
|
||||
|
||||
#define SHOW_PARSE_RESULT_END() \
|
||||
do { if (g_args.metaFile) \
|
||||
printf("\033[1m\033[40;32m================ %s parse result END================\033[0m\n", \
|
||||
g_args.metaFile); } while(0)
|
||||
|
||||
#define SHOW_PARSE_RESULT_START_TO_FILE(fp) \
|
||||
do { if (g_args.metaFile) \
|
||||
fprintf(fp, "\033[1m\033[40;32m================ %s parse result START ================\033[0m\n", \
|
||||
g_args.metaFile); } while(0)
|
||||
|
||||
#define SHOW_PARSE_RESULT_END_TO_FILE(fp) \
|
||||
do { if (g_args.metaFile) \
|
||||
fprintf(fp, "\033[1m\033[40;32m================ %s parse result END================\033[0m\n", \
|
||||
g_args.metaFile); } while(0)
|
||||
|
||||
static int printfInsertMeta() {
|
||||
printf("\033[1m\033[40;32m================ insert.json parse result START ================\033[0m\n");
|
||||
SHOW_PARSE_RESULT_START();
|
||||
|
||||
printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port);
|
||||
printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
|
||||
printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
|
||||
printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile);
|
||||
printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount);
|
||||
printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl);
|
||||
printf("insert interval: \033[33m%d\033[0m\n", g_Dbs.insert_interval);
|
||||
printf("number of records per req: \033[33m%d\033[0m\n", g_Dbs.num_of_RPR);
|
||||
printf("insert interval: \033[33m%d\033[0m\n", g_args.insert_interval);
|
||||
printf("number of records per req: \033[33m%d\033[0m\n", g_args.num_of_RPR);
|
||||
|
||||
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
|
||||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||
|
@ -1007,10 +1029,12 @@ static int printfInsertMeta() {
|
|||
printf(" quorum: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.quorum);
|
||||
}
|
||||
if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
|
||||
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
|
||||
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
|
||||
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
|
||||
printf(" precision: \033[33m%s\033[0m\n", g_Dbs.db[i].dbCfg.precision);
|
||||
} else {
|
||||
printf(" precision error: \033[33m%s\033[0m\n", g_Dbs.db[i].dbCfg.precision);
|
||||
printf("\033[1m\033[40;31m precision error: %s\033[0m\n",
|
||||
g_Dbs.db[i].dbCfg.precision);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -1063,34 +1087,43 @@ static int printfInsertMeta() {
|
|||
printf(" columnCount: \033[33m%d\033[0m\n ", g_Dbs.db[i].superTbls[j].columnCount);
|
||||
for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) {
|
||||
//printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
|
||||
if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "binary", 6)) || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "nchar", 5))) {
|
||||
printf("column[\033[33m%d\033[0m]:\033[33m%s(%d)\033[0m ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
|
||||
if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "binary", 6))
|
||||
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "nchar", 5))) {
|
||||
printf("column[\033[33m%d\033[0m]:\033[33m%s(%d)\033[0m ", k,
|
||||
g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
|
||||
} else {
|
||||
printf("column[%d]:\033[33m%s\033[0m ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType);
|
||||
printf("column[%d]:\033[33m%s\033[0m ", k,
|
||||
g_Dbs.db[i].superTbls[j].columns[k].dataType);
|
||||
}
|
||||
}
|
||||
printf("\n");
|
||||
|
||||
printf(" tagCount: \033[33m%d\033[0m\n ", g_Dbs.db[i].superTbls[j].tagCount);
|
||||
printf(" tagCount: \033[33m%d\033[0m\n ",
|
||||
g_Dbs.db[i].superTbls[j].tagCount);
|
||||
for (int k = 0; k < g_Dbs.db[i].superTbls[j].tagCount; k++) {
|
||||
//printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen);
|
||||
if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "binary", 6)) || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "nchar", 5))) {
|
||||
printf("tag[%d]:\033[33m%s(%d)\033[0m ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen);
|
||||
if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "binary", 6))
|
||||
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "nchar", 5))) {
|
||||
printf("tag[%d]:\033[33m%s(%d)\033[0m ", k,
|
||||
g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen);
|
||||
} else {
|
||||
printf("tag[%d]:\033[33m%s\033[0m ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType);
|
||||
printf("tag[%d]:\033[33m%s\033[0m ", k,
|
||||
g_Dbs.db[i].superTbls[j].tags[k].dataType);
|
||||
}
|
||||
}
|
||||
printf("\n");
|
||||
}
|
||||
printf("\n");
|
||||
}
|
||||
printf("\033[1m\033[40;32m================ insert.json parse result END================\033[0m\n");
|
||||
|
||||
SHOW_PARSE_RESULT_END();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void printfInsertMetaToFile(FILE* fp) {
|
||||
fprintf(fp, "================ insert.json parse result START================\n");
|
||||
SHOW_PARSE_RESULT_START_TO_FILE(fp);
|
||||
|
||||
fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port);
|
||||
fprintf(fp, "user: %s\n", g_Dbs.user);
|
||||
fprintf(fp, "password: %s\n", g_Dbs.password);
|
||||
|
@ -1145,7 +1178,8 @@ static void printfInsertMetaToFile(FILE* fp) {
|
|||
fprintf(fp, " quorum: %d\n", g_Dbs.db[i].dbCfg.quorum);
|
||||
}
|
||||
if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
|
||||
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
|
||||
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
|
||||
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
|
||||
fprintf(fp, " precision: %s\n", g_Dbs.db[i].dbCfg.precision);
|
||||
} else {
|
||||
fprintf(fp, " precision error: %s\n", g_Dbs.db[i].dbCfg.precision);
|
||||
|
@ -1221,11 +1255,11 @@ static void printfInsertMetaToFile(FILE* fp) {
|
|||
}
|
||||
fprintf(fp, "\n");
|
||||
}
|
||||
fprintf(fp, "================ insert.json parse result END ================\n\n");
|
||||
SHOW_PARSE_RESULT_END_TO_FILE(fp);
|
||||
}
|
||||
|
||||
static void printfQueryMeta() {
|
||||
printf("\033[1m\033[40;32m================ query.json parse result ================\033[0m\n");
|
||||
SHOW_PARSE_RESULT_START();
|
||||
printf("host: \033[33m%s:%u\033[0m\n", g_queryInfo.host, g_queryInfo.port);
|
||||
printf("user: \033[33m%s\033[0m\n", g_queryInfo.user);
|
||||
printf("password: \033[33m%s\033[0m\n", g_queryInfo.password);
|
||||
|
@ -1237,14 +1271,13 @@ static void printfQueryMeta() {
|
|||
printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.concurrent);
|
||||
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount);
|
||||
|
||||
if (SUBSCRIBE_MODE == g_jsonType) {
|
||||
if (SUBSCRIBE_MODE == g_args.test_mode) {
|
||||
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeMode);
|
||||
printf("interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeInterval);
|
||||
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeRestart);
|
||||
printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
||||
}
|
||||
|
||||
|
||||
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
||||
printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.superQueryInfo.sql[i]);
|
||||
}
|
||||
|
@ -1255,7 +1288,7 @@ static void printfQueryMeta() {
|
|||
printf("childTblCount: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.childTblCount);
|
||||
printf("stable name: \033[33m%s\033[0m\n", g_queryInfo.subQueryInfo.sTblName);
|
||||
|
||||
if (SUBSCRIBE_MODE == g_jsonType) {
|
||||
if (SUBSCRIBE_MODE == g_args.test_mode) {
|
||||
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeMode);
|
||||
printf("interval: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeInterval);
|
||||
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeRestart);
|
||||
|
@ -1267,7 +1300,8 @@ static void printfQueryMeta() {
|
|||
printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.subQueryInfo.sql[i]);
|
||||
}
|
||||
printf("\n");
|
||||
printf("\033[1m\033[40;32m================ query.json parse result ================\033[0m\n");
|
||||
|
||||
SHOW_PARSE_RESULT_END();
|
||||
}
|
||||
|
||||
|
||||
|
@ -1434,7 +1468,9 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
|
|||
dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
|
||||
dbInfos[count]->cachelast = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX]));
|
||||
|
||||
tstrncpy(dbInfos[count]->precision, (char *)row[TSDB_SHOW_DB_PRECISION_INDEX], fields[TSDB_SHOW_DB_PRECISION_INDEX].bytes);
|
||||
tstrncpy(dbInfos[count]->precision,
|
||||
(char *)row[TSDB_SHOW_DB_PRECISION_INDEX],
|
||||
fields[TSDB_SHOW_DB_PRECISION_INDEX].bytes);
|
||||
dbInfos[count]->update = *((int8_t *)row[TSDB_SHOW_DB_UPDATE_INDEX]);
|
||||
tstrncpy(dbInfos[count]->status, (char *)row[TSDB_SHOW_DB_STATUS_INDEX], fields[TSDB_SHOW_DB_STATUS_INDEX].bytes);
|
||||
|
||||
|
@ -1982,6 +2018,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
|
|||
superTbls->lenOfTagOfOneRow = lenOfTagOfOneRow;
|
||||
|
||||
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s (ts timestamp%s) tags %s", dbName, superTbls->sTblName, cols, tags);
|
||||
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
|
||||
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -2005,6 +2042,7 @@ static int createDatabases() {
|
|||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||
if (g_Dbs.db[i].drop) {
|
||||
sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName);
|
||||
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
|
||||
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
|
||||
taos_close(taos);
|
||||
return -1;
|
||||
|
@ -2053,19 +2091,24 @@ static int createDatabases() {
|
|||
if (g_Dbs.db[i].dbCfg.fsync > 0) {
|
||||
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "fsync %d ", g_Dbs.db[i].dbCfg.fsync);
|
||||
}
|
||||
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
|
||||
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "precision \'%s\';", g_Dbs.db[i].dbCfg.precision);
|
||||
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
|
||||
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
|
||||
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
|
||||
"precision \'%s\';", g_Dbs.db[i].dbCfg.precision);
|
||||
}
|
||||
|
||||
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
|
||||
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
|
||||
taos_close(taos);
|
||||
return -1;
|
||||
}
|
||||
printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName);
|
||||
|
||||
debugPrint("DEBUG %s() %d count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount);
|
||||
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
||||
// describe super table, if exists
|
||||
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName);
|
||||
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
|
||||
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
|
||||
g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS;
|
||||
ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric);
|
||||
|
@ -2085,7 +2128,6 @@ static int createDatabases() {
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void * createTable(void *sarg)
|
||||
{
|
||||
threadInfo *winfo = (threadInfo *)sarg;
|
||||
|
@ -2100,35 +2142,50 @@ void * createTable(void *sarg)
|
|||
//printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
|
||||
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
|
||||
if (0 == g_Dbs.use_metric) {
|
||||
snprintf(buffer, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable);
|
||||
snprintf(buffer, BUFFER_SIZE,
|
||||
"create table if not exists %s.%s%d %s;",
|
||||
winfo->db_name,
|
||||
superTblInfo->childTblPrefix, i,
|
||||
superTblInfo->colsOfCreatChildTable);
|
||||
} else {
|
||||
if (0 == len) {
|
||||
batchNum = 0;
|
||||
memset(buffer, 0, superTblInfo->maxSqlLen);
|
||||
len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "create table ");
|
||||
len += snprintf(buffer + len,
|
||||
superTblInfo->maxSqlLen - len, "create table ");
|
||||
}
|
||||
|
||||
char* tagsValBuf = NULL;
|
||||
if (0 == superTblInfo->tagSource) {
|
||||
tagsValBuf = generateTagVaulesForStb(superTblInfo);
|
||||
} else {
|
||||
tagsValBuf = getTagValueFromTagSample(superTblInfo, i % superTblInfo->tagSampleCount);
|
||||
tagsValBuf = getTagValueFromTagSample(
|
||||
superTblInfo,
|
||||
i % superTblInfo->tagSampleCount);
|
||||
}
|
||||
if (NULL == tagsValBuf) {
|
||||
free(buffer);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "if not exists %s.%s%d using %s.%s tags %s ", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
|
||||
len += snprintf(buffer + len,
|
||||
superTblInfo->maxSqlLen - len,
|
||||
"if not exists %s.%s%d using %s.%s tags %s ",
|
||||
winfo->db_name, superTblInfo->childTblPrefix,
|
||||
i, winfo->db_name,
|
||||
superTblInfo->sTblName, tagsValBuf);
|
||||
free(tagsValBuf);
|
||||
batchNum++;
|
||||
|
||||
if ((batchNum < superTblInfo->batchCreateTableNum) && ((superTblInfo->maxSqlLen - len) >= (superTblInfo->lenOfTagOfOneRow + 256))) {
|
||||
if ((batchNum < superTblInfo->batchCreateTableNum)
|
||||
&& ((superTblInfo->maxSqlLen - len)
|
||||
>= (superTblInfo->lenOfTagOfOneRow + 256))) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
len = 0;
|
||||
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
|
||||
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
|
||||
free(buffer);
|
||||
return NULL;
|
||||
|
@ -2136,12 +2193,14 @@ void * createTable(void *sarg)
|
|||
|
||||
int64_t currentPrintTime = taosGetTimestampMs();
|
||||
if (currentPrintTime - lastPrintTime > 30*1000) {
|
||||
printf("thread[%d] already create %d - %d tables\n", winfo->threadID, winfo->start_table_id, i);
|
||||
printf("thread[%d] already create %d - %d tables\n",
|
||||
winfo->threadID, winfo->start_table_id, i);
|
||||
lastPrintTime = currentPrintTime;
|
||||
}
|
||||
}
|
||||
|
||||
if (0 != len) {
|
||||
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
|
||||
(void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE);
|
||||
}
|
||||
|
||||
|
@ -2149,7 +2208,9 @@ void * createTable(void *sarg)
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void startMultiThreadCreateChildTable(char* cols, int threads, int ntables, char* db_name, SSuperTable* superTblInfo) {
|
||||
void startMultiThreadCreateChildTable(
|
||||
char* cols, int threads, int ntables,
|
||||
char* db_name, SSuperTable* superTblInfo) {
|
||||
pthread_t *pids = malloc(threads * sizeof(pthread_t));
|
||||
threadInfo *infos = malloc(threads * sizeof(threadInfo));
|
||||
|
||||
|
@ -2177,7 +2238,12 @@ void startMultiThreadCreateChildTable(char* cols, int threads, int ntables, char
|
|||
t_info->threadID = i;
|
||||
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
|
||||
t_info->superTblInfo = superTblInfo;
|
||||
t_info->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
|
||||
t_info->taos = taos_connect(
|
||||
g_Dbs.host,
|
||||
g_Dbs.user,
|
||||
g_Dbs.password,
|
||||
db_name,
|
||||
g_Dbs.port);
|
||||
t_info->start_table_id = last;
|
||||
t_info->end_table_id = i < b ? last + a : last + a - 1;
|
||||
last = t_info->end_table_id + 1;
|
||||
|
@ -2204,10 +2270,15 @@ void startMultiThreadCreateChildTable(char* cols, int threads, int ntables, char
|
|||
static void createChildTables() {
|
||||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
||||
if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
|
||||
if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable)
|
||||
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
|
||||
continue;
|
||||
}
|
||||
startMultiThreadCreateChildTable(g_Dbs.db[i].superTbls[j].colsOfCreatChildTable, g_Dbs.threadCountByCreateTbl, g_Dbs.db[i].superTbls[j].childTblCount, g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
|
||||
startMultiThreadCreateChildTable(
|
||||
g_Dbs.db[i].superTbls[j].colsOfCreatChildTable,
|
||||
g_Dbs.threadCountByCreateTbl,
|
||||
g_Dbs.db[i].superTbls[j].childTblCount,
|
||||
g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
|
||||
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
|
||||
}
|
||||
}
|
||||
|
@ -2247,7 +2318,8 @@ int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
|
|||
|
||||
FILE *fp = fopen(superTblInfo->tagsFile, "r");
|
||||
if (fp == NULL) {
|
||||
printf("Failed to open tags file: %s, reason:%s\n", superTblInfo->tagsFile, strerror(errno));
|
||||
printf("Failed to open tags file: %s, reason:%s\n",
|
||||
superTblInfo->tagsFile, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -2278,11 +2350,13 @@ int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
|
|||
count++;
|
||||
|
||||
if (count >= tagCount - 1) {
|
||||
char *tmp = realloc(tagDataBuf, (size_t)tagCount*1.5*superTblInfo->lenOfTagOfOneRow);
|
||||
char *tmp = realloc(tagDataBuf,
|
||||
(size_t)tagCount*1.5*superTblInfo->lenOfTagOfOneRow);
|
||||
if (tmp != NULL) {
|
||||
tagDataBuf = tmp;
|
||||
tagCount = (int)(tagCount*1.5);
|
||||
memset(tagDataBuf + count*superTblInfo->lenOfTagOfOneRow, 0, (size_t)((tagCount-count)*superTblInfo->lenOfTagOfOneRow));
|
||||
memset(tagDataBuf + count*superTblInfo->lenOfTagOfOneRow,
|
||||
0, (size_t)((tagCount-count)*superTblInfo->lenOfTagOfOneRow));
|
||||
} else {
|
||||
// exit, if allocate more memory failed
|
||||
printf("realloc fail for save tag val from %s\n", superTblInfo->tagsFile);
|
||||
|
@ -2322,7 +2396,8 @@ int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sample
|
|||
readLen = tgetline(&line, &n, fp);
|
||||
if (-1 == readLen) {
|
||||
if(0 != fseek(fp, 0, SEEK_SET)) {
|
||||
printf("Failed to fseek file: %s, reason:%s\n", superTblInfo->sampleFile, strerror(errno));
|
||||
printf("Failed to fseek file: %s, reason:%s\n",
|
||||
superTblInfo->sampleFile, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
continue;
|
||||
|
@ -2337,7 +2412,8 @@ int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sample
|
|||
}
|
||||
|
||||
if (readLen > superTblInfo->lenOfOneRow) {
|
||||
printf("sample row len[%d] overflow define schema len[%d], so discard this row\n", (int32_t)readLen, superTblInfo->lenOfOneRow);
|
||||
printf("sample row len[%d] overflow define schema len[%d], so discard this row\n",
|
||||
(int32_t)readLen, superTblInfo->lenOfOneRow);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -2383,7 +2459,8 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
|
|||
|
||||
int columnSize = cJSON_GetArraySize(columns);
|
||||
if (columnSize > MAX_COLUMN_COUNT) {
|
||||
printf("failed to read json, column size overflow, max column size is %d\n", MAX_COLUMN_COUNT);
|
||||
printf("failed to read json, column size overflow, max column size is %d\n",
|
||||
MAX_COLUMN_COUNT);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -2570,9 +2647,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
|
||||
cJSON* insertInterval = cJSON_GetObjectItem(root, "insert_interval");
|
||||
if (insertInterval && insertInterval->type == cJSON_Number) {
|
||||
g_Dbs.insert_interval = insertInterval->valueint;
|
||||
g_args.insert_interval = insertInterval->valueint;
|
||||
} else if (!insertInterval) {
|
||||
g_Dbs.insert_interval = 0;
|
||||
g_args.insert_interval = 0;
|
||||
} else {
|
||||
printf("failed to read json, insert_interval not found");
|
||||
goto PARSE_OVER;
|
||||
|
@ -2580,16 +2657,18 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
|
||||
cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req");
|
||||
if (numRecPerReq && numRecPerReq->type == cJSON_Number) {
|
||||
g_Dbs.num_of_RPR = numRecPerReq->valueint;
|
||||
g_args.num_of_RPR = numRecPerReq->valueint;
|
||||
} else if (!numRecPerReq) {
|
||||
g_Dbs.num_of_RPR = 0;
|
||||
g_args.num_of_RPR = 100;
|
||||
} else {
|
||||
printf("failed to read json, num_of_records_per_req not found");
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no,
|
||||
if (answerPrompt && answerPrompt->type == cJSON_String && answerPrompt->valuestring != NULL) {
|
||||
if (answerPrompt
|
||||
&& answerPrompt->type == cJSON_String
|
||||
&& answerPrompt->valuestring != NULL) {
|
||||
if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) {
|
||||
g_args.answer_yes = false;
|
||||
} else if (0 == strncasecmp(answerPrompt->valuestring, "no", 2)) {
|
||||
|
@ -2834,7 +2913,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring, MAX_DB_NAME_SIZE);
|
||||
|
||||
cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table"); // yes, no, null
|
||||
if (autoCreateTbl && autoCreateTbl->type == cJSON_String && autoCreateTbl->valuestring != NULL) {
|
||||
if (autoCreateTbl
|
||||
&& autoCreateTbl->type == cJSON_String
|
||||
&& autoCreateTbl->valuestring != NULL) {
|
||||
if (0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3)) {
|
||||
g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL;
|
||||
} else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) {
|
||||
|
@ -2860,7 +2941,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
}
|
||||
|
||||
cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
|
||||
if (childTblExists && childTblExists->type == cJSON_String && childTblExists->valuestring != NULL) {
|
||||
if (childTblExists
|
||||
&& childTblExists->type == cJSON_String
|
||||
&& childTblExists->valuestring != NULL) {
|
||||
if (0 == strncasecmp(childTblExists->valuestring, "yes", 3)) {
|
||||
g_Dbs.db[i].superTbls[j].childTblExists = TBL_ALREADY_EXISTS;
|
||||
} else if (0 == strncasecmp(childTblExists->valuestring, "no", 2)) {
|
||||
|
@ -2883,8 +2966,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
g_Dbs.db[i].superTbls[j].childTblCount = count->valueint;
|
||||
|
||||
cJSON *dataSource = cJSON_GetObjectItem(stbInfo, "data_source");
|
||||
if (dataSource && dataSource->type == cJSON_String && dataSource->valuestring != NULL) {
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, dataSource->valuestring, MAX_DB_NAME_SIZE);
|
||||
if (dataSource && dataSource->type == cJSON_String
|
||||
&& dataSource->valuestring != NULL) {
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource,
|
||||
dataSource->valuestring, MAX_DB_NAME_SIZE);
|
||||
} else if (!dataSource) {
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", MAX_DB_NAME_SIZE);
|
||||
} else {
|
||||
|
@ -2893,8 +2978,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
}
|
||||
|
||||
cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , restful
|
||||
if (insertMode && insertMode->type == cJSON_String && insertMode->valuestring != NULL) {
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, insertMode->valuestring, MAX_DB_NAME_SIZE);
|
||||
if (insertMode && insertMode->type == cJSON_String
|
||||
&& insertMode->valuestring != NULL) {
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].insertMode,
|
||||
insertMode->valuestring, MAX_DB_NAME_SIZE);
|
||||
} else if (!insertMode) {
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, "taosc", MAX_DB_NAME_SIZE);
|
||||
} else {
|
||||
|
@ -2937,7 +3024,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
|
||||
cJSON *sampleFormat = cJSON_GetObjectItem(stbInfo, "sample_format");
|
||||
if (sampleFormat && sampleFormat->type == cJSON_String && sampleFormat->valuestring != NULL) {
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, sampleFormat->valuestring, MAX_DB_NAME_SIZE);
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat,
|
||||
sampleFormat->valuestring, MAX_DB_NAME_SIZE);
|
||||
} else if (!sampleFormat) {
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, "csv", MAX_DB_NAME_SIZE);
|
||||
} else {
|
||||
|
@ -2947,7 +3035,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
|
||||
cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file");
|
||||
if (sampleFile && sampleFile->type == cJSON_String && sampleFile->valuestring != NULL) {
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile, sampleFile->valuestring, MAX_FILE_NAME_LEN);
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile,
|
||||
sampleFile->valuestring, MAX_FILE_NAME_LEN);
|
||||
} else if (!sampleFile) {
|
||||
memset(g_Dbs.db[i].superTbls[j].sampleFile, 0, MAX_FILE_NAME_LEN);
|
||||
} else {
|
||||
|
@ -2957,7 +3046,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
|
||||
cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file");
|
||||
if (tagsFile && tagsFile->type == cJSON_String && tagsFile->valuestring != NULL) {
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].tagsFile, tagsFile->valuestring, MAX_FILE_NAME_LEN);
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].tagsFile,
|
||||
tagsFile->valuestring, MAX_FILE_NAME_LEN);
|
||||
if (0 == g_Dbs.db[i].superTbls[j].tagsFile[0]) {
|
||||
g_Dbs.db[i].superTbls[j].tagSource = 0;
|
||||
} else {
|
||||
|
@ -2987,8 +3077,11 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
cJSON *multiThreadWriteOneTbl = cJSON_GetObjectItem(stbInfo, "multi_thread_write_one_tbl"); // no , yes
|
||||
if (multiThreadWriteOneTbl && multiThreadWriteOneTbl->type == cJSON_String && multiThreadWriteOneTbl->valuestring != NULL) {
|
||||
cJSON *multiThreadWriteOneTbl =
|
||||
cJSON_GetObjectItem(stbInfo, "multi_thread_write_one_tbl"); // no , yes
|
||||
if (multiThreadWriteOneTbl
|
||||
&& multiThreadWriteOneTbl->type == cJSON_String
|
||||
&& multiThreadWriteOneTbl->valuestring != NULL) {
|
||||
if (0 == strncasecmp(multiThreadWriteOneTbl->valuestring, "yes", 3)) {
|
||||
g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 1;
|
||||
} else {
|
||||
|
@ -3055,7 +3148,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
|
||||
if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable
|
||||
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -3115,7 +3209,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
|||
}
|
||||
|
||||
cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no,
|
||||
if (answerPrompt && answerPrompt->type == cJSON_String && answerPrompt->valuestring != NULL) {
|
||||
if (answerPrompt && answerPrompt->type == cJSON_String
|
||||
&& answerPrompt->valuestring != NULL) {
|
||||
if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) {
|
||||
g_args.answer_yes = false;
|
||||
} else if (0 == strncasecmp(answerPrompt->valuestring, "no", 2)) {
|
||||
|
@ -3209,7 +3304,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
|||
}
|
||||
|
||||
cJSON* keepProgress = cJSON_GetObjectItem(superQuery, "keepProgress");
|
||||
if (keepProgress && keepProgress->type == cJSON_String && keepProgress->valuestring != NULL) {
|
||||
if (keepProgress
|
||||
&& keepProgress->type == cJSON_String
|
||||
&& keepProgress->valuestring != NULL) {
|
||||
if (0 == strcmp("yes", keepProgress->valuestring)) {
|
||||
g_queryInfo.superQueryInfo.subscribeKeepProgress = 1;
|
||||
} else if (0 == strcmp("no", keepProgress->valuestring)) {
|
||||
|
@ -3400,6 +3497,8 @@ PARSE_OVER:
|
|||
}
|
||||
|
||||
static bool getInfoFromJsonFile(char* file) {
|
||||
debugPrint("DEBUG - %s %d %s\n", __func__, __LINE__, file);
|
||||
|
||||
FILE *fp = fopen(file, "r");
|
||||
if (!fp) {
|
||||
printf("failed to read %s, reason:%s\n", file, strerror(errno));
|
||||
|
@ -3427,27 +3526,27 @@ static bool getInfoFromJsonFile(char* file) {
|
|||
cJSON* filetype = cJSON_GetObjectItem(root, "filetype");
|
||||
if (filetype && filetype->type == cJSON_String && filetype->valuestring != NULL) {
|
||||
if (0 == strcasecmp("insert", filetype->valuestring)) {
|
||||
g_jsonType = INSERT_MODE;
|
||||
g_args.test_mode = INSERT_MODE;
|
||||
} else if (0 == strcasecmp("query", filetype->valuestring)) {
|
||||
g_jsonType = QUERY_MODE;
|
||||
g_args.test_mode = QUERY_MODE;
|
||||
} else if (0 == strcasecmp("subscribe", filetype->valuestring)) {
|
||||
g_jsonType = SUBSCRIBE_MODE;
|
||||
g_args.test_mode = SUBSCRIBE_MODE;
|
||||
} else {
|
||||
printf("failed to read json, filetype not support\n");
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
} else if (!filetype) {
|
||||
g_jsonType = INSERT_MODE;
|
||||
g_args.test_mode = INSERT_MODE;
|
||||
} else {
|
||||
printf("failed to read json, filetype not found\n");
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
if (INSERT_MODE == g_jsonType) {
|
||||
if (INSERT_MODE == g_args.test_mode) {
|
||||
ret = getMetaFromInsertJsonFile(root);
|
||||
} else if (QUERY_MODE == g_jsonType) {
|
||||
} else if (QUERY_MODE == g_args.test_mode) {
|
||||
ret = getMetaFromQueryJsonFile(root);
|
||||
} else if (SUBSCRIBE_MODE == g_jsonType) {
|
||||
} else if (SUBSCRIBE_MODE == g_args.test_mode) {
|
||||
ret = getMetaFromQueryJsonFile(root);
|
||||
} else {
|
||||
printf("input json file type error! please input correct file type: insert or query or subscribe\n");
|
||||
|
@ -3593,11 +3692,11 @@ void syncWriteForNumberOfTblInOneSql(
|
|||
int64_t st = 0;
|
||||
int64_t et = 0;
|
||||
for (int i = 0; i < superTblInfo->insertRows;) {
|
||||
if (g_Dbs.insert_interval && (g_Dbs.insert_interval > (et - st))) {
|
||||
taosMsleep(g_Dbs.insert_interval - (et - st)); // ms
|
||||
if (g_args.insert_interval && (g_args.insert_interval > (et - st))) {
|
||||
taosMsleep(g_args.insert_interval - (et - st)); // ms
|
||||
}
|
||||
|
||||
if (g_Dbs.insert_interval) {
|
||||
if (g_args.insert_interval) {
|
||||
st = taosGetTimestampMs();
|
||||
}
|
||||
|
||||
|
@ -3605,7 +3704,7 @@ void syncWriteForNumberOfTblInOneSql(
|
|||
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) {
|
||||
int inserted = i;
|
||||
|
||||
for (int k = 0; k < g_Dbs.num_of_RPR;) {
|
||||
for (int k = 0; k < g_args.num_of_RPR;) {
|
||||
int len = 0;
|
||||
memset(buffer, 0, superTblInfo->maxSqlLen);
|
||||
char *pstr = buffer;
|
||||
|
@ -3746,6 +3845,7 @@ send_to_server:
|
|||
int64_t endTs;
|
||||
startTs = taosGetTimestampUs();
|
||||
|
||||
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
|
||||
int affectedRows = queryDbExec(
|
||||
winfo->taos, buffer, INSERT_TYPE);
|
||||
if (0 > affectedRows) {
|
||||
|
@ -3795,7 +3895,7 @@ send_to_server:
|
|||
}
|
||||
}
|
||||
|
||||
if (g_Dbs.insert_interval) {
|
||||
if (g_args.insert_interval) {
|
||||
et = taosGetTimestampMs();
|
||||
}
|
||||
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
|
||||
|
@ -3880,12 +3980,12 @@ void *syncWrite(void *sarg) {
|
|||
uint64_t et = 0;
|
||||
|
||||
for (int i = 0; i < superTblInfo->insertRows;) {
|
||||
if (i > 0 && g_Dbs.insert_interval
|
||||
&& (g_Dbs.insert_interval > (et - st) )) {
|
||||
taosMsleep(g_Dbs.insert_interval - (et - st)); // ms
|
||||
if (i > 0 && g_args.insert_interval
|
||||
&& (g_args.insert_interval > (et - st) )) {
|
||||
taosMsleep(g_args.insert_interval - (et - st)); // ms
|
||||
}
|
||||
|
||||
if (g_Dbs.insert_interval) {
|
||||
if (g_args.insert_interval) {
|
||||
st = taosGetTimestampMs();
|
||||
}
|
||||
|
||||
|
@ -3939,7 +4039,7 @@ void *syncWrite(void *sarg) {
|
|||
tID);
|
||||
}
|
||||
|
||||
for (k = 0; k < g_Dbs.num_of_RPR;) {
|
||||
for (k = 0; k < g_args.num_of_RPR;) {
|
||||
int retLen = 0;
|
||||
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
|
||||
retLen = getRowDataFromSample(
|
||||
|
@ -3991,6 +4091,7 @@ void *syncWrite(void *sarg) {
|
|||
int64_t endTs;
|
||||
startTs = taosGetTimestampUs();
|
||||
|
||||
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
|
||||
int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
|
||||
if (0 > affectedRows){
|
||||
goto free_and_statistics_2;
|
||||
|
@ -4038,7 +4139,7 @@ void *syncWrite(void *sarg) {
|
|||
}
|
||||
}
|
||||
|
||||
if (g_Dbs.insert_interval) {
|
||||
if (g_args.insert_interval) {
|
||||
et = taosGetTimestampMs();
|
||||
}
|
||||
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
|
||||
|
@ -4062,7 +4163,7 @@ free_and_statistics_2:
|
|||
void callBack(void *param, TAOS_RES *res, int code) {
|
||||
threadInfo* winfo = (threadInfo*)param;
|
||||
|
||||
if (g_Dbs.insert_interval) {
|
||||
if (g_args.insert_interval) {
|
||||
winfo->et = taosGetTimestampMs();
|
||||
if (winfo->et - winfo->st < 1000) {
|
||||
taosMsleep(1000 - (winfo->et - winfo->st)); // ms
|
||||
|
@ -4085,7 +4186,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
|
|||
return;
|
||||
}
|
||||
|
||||
for (int i = 0; i < g_Dbs.num_of_RPR; i++) {
|
||||
for (int i = 0; i < g_args.num_of_RPR; i++) {
|
||||
int rand_num = rand() % 100;
|
||||
if (0 != winfo->superTblInfo->disorderRatio && rand_num < winfo->superTblInfo->disorderRatio)
|
||||
{
|
||||
|
@ -4104,7 +4205,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
|
|||
}
|
||||
}
|
||||
|
||||
if (g_Dbs.insert_interval) {
|
||||
if (g_args.insert_interval) {
|
||||
winfo->st = taosGetTimestampMs();
|
||||
}
|
||||
taos_query_a(winfo->taos, buffer, callBack, winfo);
|
||||
|
@ -4121,7 +4222,7 @@ void *asyncWrite(void *sarg) {
|
|||
winfo->et = 0;
|
||||
winfo->lastTs = winfo->start_time;
|
||||
|
||||
if (g_Dbs.insert_interval) {
|
||||
if (g_args.insert_interval) {
|
||||
winfo->st = taosGetTimestampMs();
|
||||
}
|
||||
taos_query_a(winfo->taos, "show databases", callBack, winfo);
|
||||
|
@ -4131,12 +4232,18 @@ void *asyncWrite(void *sarg) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSuperTable* superTblInfo) {
|
||||
void startMultiThreadInsertData(int threads, char* db_name, char* precision,
|
||||
SSuperTable* superTblInfo) {
|
||||
pthread_t *pids = malloc(threads * sizeof(pthread_t));
|
||||
threadInfo *infos = malloc(threads * sizeof(threadInfo));
|
||||
memset(pids, 0, threads * sizeof(pthread_t));
|
||||
memset(infos, 0, threads * sizeof(threadInfo));
|
||||
int ntables = superTblInfo->childTblCount;
|
||||
|
||||
int ntables = 0;
|
||||
if (superTblInfo)
|
||||
ntables = superTblInfo->childTblCount;
|
||||
else
|
||||
ntables = g_args.num_of_tables;
|
||||
|
||||
int a = ntables / threads;
|
||||
if (a < 1) {
|
||||
|
@ -4256,21 +4363,25 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
|
|||
double end = getCurrentTime();
|
||||
double t = end - start;
|
||||
printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n",
|
||||
t, superTblInfo->totalRowsInserted, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, superTblInfo->totalRowsInserted / t);
|
||||
t, superTblInfo->totalRowsInserted,
|
||||
superTblInfo->totalAffectedRows,
|
||||
threads, db_name, superTblInfo->sTblName,
|
||||
superTblInfo->totalRowsInserted / t);
|
||||
fprintf(g_fpOfInsertResult, "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n",
|
||||
t, superTblInfo->totalRowsInserted, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, superTblInfo->totalRowsInserted / t);
|
||||
t, superTblInfo->totalRowsInserted,
|
||||
superTblInfo->totalAffectedRows,
|
||||
threads, db_name, superTblInfo->sTblName,
|
||||
superTblInfo->totalRowsInserted / t);
|
||||
|
||||
printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n",
|
||||
avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0);
|
||||
fprintf(g_fpOfInsertResult, "insert delay, avg:%10.6fms, max: %10.6fms, min: %10.6fms\n\n",
|
||||
avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0);
|
||||
|
||||
|
||||
//taos_close(taos);
|
||||
|
||||
free(pids);
|
||||
free(infos);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -4413,6 +4524,8 @@ void *readMetric(void *sarg) {
|
|||
|
||||
int insertTestProcess() {
|
||||
|
||||
debugPrint("DEBUG - %d result file: %s\n", __LINE__, g_Dbs.resultFile);
|
||||
|
||||
g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a");
|
||||
if (NULL == g_fpOfInsertResult) {
|
||||
fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile);
|
||||
|
@ -4422,6 +4535,7 @@ int insertTestProcess() {
|
|||
setupForAnsiEscape();
|
||||
int ret = printfInsertMeta();
|
||||
resetAfterAnsiEscape();
|
||||
|
||||
if (ret == -1)
|
||||
exit(EXIT_FAILURE);
|
||||
|
||||
|
@ -4460,6 +4574,7 @@ int insertTestProcess() {
|
|||
// create sub threads for inserting data
|
||||
//start = getCurrentTime();
|
||||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||
if (g_Dbs.db[i].superTblCount > 0) {
|
||||
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
||||
SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j];
|
||||
if (0 == g_Dbs.db[i].superTbls[j].insertRows) {
|
||||
|
@ -4471,6 +4586,13 @@ int insertTestProcess() {
|
|||
g_Dbs.db[i].dbCfg.precision,
|
||||
superTblInfo);
|
||||
}
|
||||
} else {
|
||||
startMultiThreadInsertData(
|
||||
g_Dbs.threadCount,
|
||||
g_Dbs.db[i].dbName,
|
||||
g_Dbs.db[i].dbCfg.precision,
|
||||
NULL);
|
||||
}
|
||||
}
|
||||
//end = getCurrentTime();
|
||||
|
||||
|
@ -4671,6 +4793,7 @@ int queryTestProcess() {
|
|||
|
||||
char sqlStr[MAX_TB_NAME_SIZE*2];
|
||||
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
||||
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
|
||||
(void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE);
|
||||
} else {
|
||||
t_info->taos = NULL;
|
||||
|
@ -4781,6 +4904,7 @@ void *subSubscribeProcess(void *sarg) {
|
|||
|
||||
char sqlStr[MAX_TB_NAME_SIZE*2];
|
||||
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
||||
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
|
||||
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)){
|
||||
return NULL;
|
||||
}
|
||||
|
@ -4846,6 +4970,7 @@ void *superSubscribeProcess(void *sarg) {
|
|||
|
||||
char sqlStr[MAX_TB_NAME_SIZE*2];
|
||||
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
||||
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
|
||||
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -5068,6 +5193,20 @@ void setParaFromArg(){
|
|||
g_Dbs.use_metric = g_args.use_metric;
|
||||
g_Dbs.insert_only = g_args.insert_only;
|
||||
|
||||
g_Dbs.do_aggreFunc = true;
|
||||
|
||||
char dataString[STRING_LEN];
|
||||
char **data_type = g_args.datatype;
|
||||
|
||||
memset(dataString, 0, STRING_LEN);
|
||||
|
||||
if (strcasecmp(data_type[0], "BINARY") == 0
|
||||
|| strcasecmp(data_type[0], "BOOL") == 0
|
||||
|| strcasecmp(data_type[0], "NCHAR") == 0 ) {
|
||||
g_Dbs.do_aggreFunc = false;
|
||||
}
|
||||
|
||||
if (g_args.use_metric) {
|
||||
g_Dbs.db[0].superTblCount = 1;
|
||||
tstrncpy(g_Dbs.db[0].superTbls[0].sTblName, "meters", MAX_TB_NAME_SIZE);
|
||||
g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables;
|
||||
|
@ -5088,23 +5227,9 @@ void setParaFromArg(){
|
|||
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
|
||||
g_Dbs.db[0].superTbls[0].timeStampStep = 10;
|
||||
|
||||
// g_args.num_of_RPR;
|
||||
g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT;
|
||||
g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE;
|
||||
|
||||
g_Dbs.do_aggreFunc = true;
|
||||
|
||||
char dataString[STRING_LEN];
|
||||
char **data_type = g_args.datatype;
|
||||
|
||||
memset(dataString, 0, STRING_LEN);
|
||||
|
||||
if (strcasecmp(data_type[0], "BINARY") == 0
|
||||
|| strcasecmp(data_type[0], "BOOL") == 0
|
||||
|| strcasecmp(data_type[0], "NCHAR") == 0 ) {
|
||||
g_Dbs.do_aggreFunc = false;
|
||||
}
|
||||
|
||||
g_Dbs.db[0].superTbls[0].columnCount = 0;
|
||||
for (int i = 0; i < MAX_NUM_DATATYPE; i++) {
|
||||
if (data_type[i] == NULL) {
|
||||
|
@ -5127,7 +5252,6 @@ void setParaFromArg(){
|
|||
}
|
||||
}
|
||||
|
||||
if (g_Dbs.use_metric) {
|
||||
tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType, "INT", MAX_TB_NAME_SIZE);
|
||||
g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0;
|
||||
|
||||
|
@ -5137,6 +5261,7 @@ void setParaFromArg(){
|
|||
} else {
|
||||
g_Dbs.db[0].superTbls[0].tagCount = 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/* Function to do regular expression check */
|
||||
|
@ -5206,6 +5331,7 @@ void querySqlFile(TAOS* taos, char* sqlFile)
|
|||
}
|
||||
|
||||
memcpy(cmd + cmd_len, line, read_len);
|
||||
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
|
||||
queryDbExec(taos, cmd, NO_INSERT_TYPE);
|
||||
memset(cmd, 0, MAX_SQL_SIZE);
|
||||
cmd_len = 0;
|
||||
|
@ -5223,6 +5349,8 @@ void querySqlFile(TAOS* taos, char* sqlFile)
|
|||
int main(int argc, char *argv[]) {
|
||||
parse_args(argc, argv, &g_args);
|
||||
|
||||
debugPrint("DEBUG - meta file: %s\n", g_args.metaFile);
|
||||
|
||||
if (g_args.metaFile) {
|
||||
initOfInsertMeta();
|
||||
initOfQueryMeta();
|
||||
|
@ -5232,14 +5360,14 @@ int main(int argc, char *argv[]) {
|
|||
return 1;
|
||||
}
|
||||
|
||||
if (INSERT_MODE == g_jsonType) {
|
||||
if (INSERT_MODE == g_args.test_mode) {
|
||||
if (g_Dbs.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_Dbs.cfgDir);
|
||||
(void)insertTestProcess();
|
||||
} else if (QUERY_MODE == g_jsonType) {
|
||||
} else if (QUERY_MODE == g_args.test_mode) {
|
||||
if (g_queryInfo.cfgDir[0])
|
||||
taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir);
|
||||
(void)queryTestProcess();
|
||||
} else if (SUBSCRIBE_MODE == g_jsonType) {
|
||||
} else if (SUBSCRIBE_MODE == g_args.test_mode) {
|
||||
if (g_queryInfo.cfgDir[0])
|
||||
taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir);
|
||||
(void)subscribeTestProcess();
|
||||
|
@ -5248,7 +5376,7 @@ int main(int argc, char *argv[]) {
|
|||
}
|
||||
} else {
|
||||
memset(&g_Dbs, 0, sizeof(SDbs));
|
||||
g_jsonType = INSERT_MODE;
|
||||
g_args.test_mode = INSERT_MODE;
|
||||
setParaFromArg();
|
||||
|
||||
if (NULL != g_args.sqlFile) {
|
||||
|
|
Loading…
Reference in New Issue