From 2c4bcbe3061c86fc30a5ec66dc12ca0bf7eb2fe5 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sat, 27 Mar 2021 13:30:14 +0800 Subject: [PATCH] Feature/sangshuduo/td 3317 taosdemo interlace (#5596) * [TD-3316] : add testcase for taosdemo limit and offset. check offset 0. * [TD-3316] : add testcase for taosdemo limit and offset. fix sample file import bug. * [TD-3316] : add test case for limit and offset. fix sample data issue. * [TD-3327] : fix taosdemo segfault when import data from sample data file. * [TD-3317] : make taosdemo support interlace mode. json parameter rows_per_tbl support. * [TD-3317] : support interlace mode. refactor * [TD-3317] : support interlace mode. refactor * [TD-3317] : support interlace mode insertion. refactor. * [TD-3317] : support interlace mode insertion. change json file. * [TD-3317] : support interlace mode insertion. fix multithread create table regression. * [TD-3317] : support interlace mode insertion. working but not perfect. * [TD-3317] : support interlace mode insertion. rename lowaTest with taosdemoTestWithJson * [TD-3317] : support interlace mode insertion. perfect * [TD-3317] : support interlace mode insertion. cleanup. * [TD-3317] : support interlace mode insertion. adjust algorithm of loop times. * [TD-3317] : support interlace mode insertion. fix delay time bug. * [TD-3317] : support interlace mode insertion. fix progressive timestamp bug. * [TD-3317] : support interlace mode insertion. add an option for performance print. * [TD-3317] : support interlace mode insertion. change json test case with less table for acceleration. * [TD-3317] : support interlace mode insertion. change progressive mode timestamp step and testcase. * [TD-3197] : fix taosdemo coverity scan issues. * [TD-3197] : fix taosdemo coverity scan issue. fix subscribeTest pids uninitialized. * [TD-3317] : support interlace mode insertion. add time shift for no sleep time. * [TD-3317] : support interlace insert. rework timestamp. * [TD-3317] : support interlace mode insertion. change rows_per_tbl to interlace_rows. * [TD-3317] : taosdemo suppoert interlace mode. remove trailing spaces. * [TD-3317] : taosdemo support interlace insertion. prompt if interlace > num_of_records_per_req Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 522 +++++++++++++++++++----------------- 1 file changed, 273 insertions(+), 249 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 97b661253a..9e8e974096 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -37,10 +37,10 @@ #include #include #include -#else +#else #include #include -#endif +#endif #include #include @@ -100,7 +100,7 @@ typedef enum CREATE_SUB_TALBE_MOD_EN { AUTO_CREATE_SUBTBL, NO_CREATE_SUBTBL } CREATE_SUB_TALBE_MOD_EN; - + typedef enum TALBE_EXISTS_EN { TBL_NO_EXISTS, TBL_ALREADY_EXISTS, @@ -108,7 +108,7 @@ typedef enum TALBE_EXISTS_EN { } TALBE_EXISTS_EN; enum MODE { - SYNC, + SYNC, ASYNC, MODE_BUT }; @@ -131,7 +131,7 @@ enum _show_db_index { TSDB_SHOW_DB_NTABLES_INDEX, TSDB_SHOW_DB_VGROUPS_INDEX, TSDB_SHOW_DB_REPLICA_INDEX, - TSDB_SHOW_DB_QUORUM_INDEX, + TSDB_SHOW_DB_QUORUM_INDEX, TSDB_SHOW_DB_DAYS_INDEX, TSDB_SHOW_DB_KEEP_INDEX, TSDB_SHOW_DB_CACHE_INDEX, @@ -153,10 +153,10 @@ enum _show_stables_index { TSDB_SHOW_STABLES_NAME_INDEX, TSDB_SHOW_STABLES_CREATED_TIME_INDEX, TSDB_SHOW_STABLES_COLUMNS_INDEX, - TSDB_SHOW_STABLES_METRIC_INDEX, - TSDB_SHOW_STABLES_UID_INDEX, + TSDB_SHOW_STABLES_METRIC_INDEX, + TSDB_SHOW_STABLES_UID_INDEX, TSDB_SHOW_STABLES_TID_INDEX, - TSDB_SHOW_STABLES_VGID_INDEX, + TSDB_SHOW_STABLES_VGID_INDEX, TSDB_MAX_SHOW_STABLES }; @@ -220,7 +220,7 @@ typedef struct SColumn_S { char field[TSDB_COL_NAME_LEN + 1]; char dataType[MAX_TB_NAME_SIZE]; int dataLen; - char note[128]; + char note[128]; } StrColumn; typedef struct SSuperTable_S { @@ -269,7 +269,7 @@ typedef struct SSuperTable_S { int tagSampleCount; int tagUsePos; - // statistics + // statistics int64_t totalInsertRows; int64_t totalAffectedRows; } SSuperTable; @@ -278,10 +278,10 @@ typedef struct { char name[TSDB_DB_NAME_LEN + 1]; char create_time[32]; int32_t ntables; - int32_t vgroups; + int32_t vgroups; int16_t replica; int16_t quorum; - int16_t days; + int16_t days; char keeplist[32]; int32_t cache; //MB int32_t blocks; @@ -296,14 +296,14 @@ typedef struct { char status[16]; } SDbInfo; -typedef struct SDbCfg_S { +typedef struct SDbCfg_S { // int maxtablesPerVnode; - int minRows; + int minRows; int maxRows; int comp; int walLevel; int cacheLast; - int fsync; + int fsync; int replica; int update; int keep; @@ -311,7 +311,7 @@ typedef struct SDbCfg_S { int cache; int blocks; int quorum; - char precision[MAX_TB_NAME_SIZE]; + char precision[MAX_TB_NAME_SIZE]; } SDbCfg; typedef struct SDataBase_S { @@ -333,7 +333,7 @@ typedef struct SDbs_S { bool insert_only; bool do_aggreFunc; bool queryMode; - + int threadCount; int threadCountByCreateTbl; int dbCount; @@ -372,7 +372,7 @@ typedef struct SubQueryInfo_S { char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; - + char* childTblName; } SubQueryInfo; @@ -386,7 +386,7 @@ typedef struct SQueryMetaInfo_S { char queryMode[MAX_TB_NAME_SIZE]; // taosc, restful SuperQueryInfo superQueryInfo; - SubQueryInfo subQueryInfo; + SubQueryInfo subQueryInfo; } SQueryMetaInfo; typedef struct SThreadInfo_S { @@ -424,7 +424,7 @@ typedef struct SThreadInfo_S { int64_t avgDelay; int64_t maxDelay; int64_t minDelay; - + } threadInfo; #ifdef WINDOWS @@ -463,12 +463,12 @@ static void setupForAnsiEscape(void) { if(!SetConsoleMode(g_stdoutHandle, mode)) { exit(GetLastError()); - } + } } static void resetAfterAnsiEscape(void) { // Reset colors - printf("\x1b[0m"); + printf("\x1b[0m"); // Reset console mode if(!SetConsoleMode(g_stdoutHandle, g_consoleMode)) { @@ -508,7 +508,7 @@ int32_t randint[MAX_PREPARED_RAND]; int64_t randbigint[MAX_PREPARED_RAND]; float randfloat[MAX_PREPARED_RAND]; double randdouble[MAX_PREPARED_RAND]; -char *aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)", +char *aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)", "max(col0)", "min(col0)", "first(col0)", "last(col0)"}; SArguments g_args = { @@ -517,7 +517,7 @@ SArguments g_args = { "127.0.0.1", // host 6030, // port "root", // user - #ifdef _TD_POWER_ + #ifdef _TD_POWER_ "powerdb", // password #else "taosdata", // password @@ -599,7 +599,7 @@ static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); } #define TD_VERNUMBER "unknown" #endif -#ifndef TAOSDEMO_STATUS +#ifndef TAOSDEMO_STATUS #define TAOSDEMO_STATUS "unknown" #endif @@ -619,62 +619,62 @@ static void printVersion() { static void printHelp() { char indent[10] = " "; - printf("%s%s%s%s\n", indent, "-f", indent, + printf("%s%s%s%s\n", indent, "-f", indent, "The meta file to the execution procedure. Default is './meta.json'."); - printf("%s%s%s%s\n", indent, "-u", indent, + printf("%s%s%s%s\n", indent, "-u", indent, "The TDengine user name to use when connecting to the server. Default is 'root'."); #ifdef _TD_POWER_ - printf("%s%s%s%s\n", indent, "-P", indent, + printf("%s%s%s%s\n", indent, "-P", indent, "The password to use when connecting to the server. Default is 'powerdb'."); - printf("%s%s%s%s\n", indent, "-c", indent, + printf("%s%s%s%s\n", indent, "-c", indent, "Configuration directory. Default is '/etc/power/'."); #else - printf("%s%s%s%s\n", indent, "-P", indent, + printf("%s%s%s%s\n", indent, "-P", indent, "The password to use when connecting to the server. Default is 'taosdata'."); - printf("%s%s%s%s\n", indent, "-c", indent, + printf("%s%s%s%s\n", indent, "-c", indent, "Configuration directory. Default is '/etc/taos/'."); #endif - printf("%s%s%s%s\n", indent, "-h", indent, + printf("%s%s%s%s\n", indent, "-h", indent, "The host to connect to TDengine. Default is localhost."); - printf("%s%s%s%s\n", indent, "-p", indent, + printf("%s%s%s%s\n", indent, "-p", indent, "The TCP/IP port number to use for the connection. Default is 0."); - printf("%s%s%s%s\n", indent, "-d", indent, + printf("%s%s%s%s\n", indent, "-d", indent, "Destination database. Default is 'test'."); - printf("%s%s%s%s\n", indent, "-a", indent, + printf("%s%s%s%s\n", indent, "-a", indent, "Set the replica parameters of the database, Default 1, min: 1, max: 3."); - printf("%s%s%s%s\n", indent, "-m", indent, + printf("%s%s%s%s\n", indent, "-m", indent, "Table prefix name. Default is 't'."); printf("%s%s%s%s\n", indent, "-s", indent, "The select sql file."); printf("%s%s%s%s\n", indent, "-N", indent, "Use normal table flag."); - printf("%s%s%s%s\n", indent, "-o", indent, + printf("%s%s%s%s\n", indent, "-o", indent, "Direct output to the named file. Default is './output.txt'."); - printf("%s%s%s%s\n", indent, "-q", indent, + printf("%s%s%s%s\n", indent, "-q", indent, "Query mode--0: SYNC, 1: ASYNC. Default is SYNC."); - printf("%s%s%s%s\n", indent, "-b", indent, + printf("%s%s%s%s\n", indent, "-b", indent, "The data_type of columns, default: TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,BINARY,NCHAR,BOOL,TIMESTAMP."); - printf("%s%s%s%s\n", indent, "-w", indent, + printf("%s%s%s%s\n", indent, "-w", indent, "The length of data_type 'BINARY' or 'NCHAR'. Default is 16"); - printf("%s%s%s%s\n", indent, "-l", indent, + printf("%s%s%s%s\n", indent, "-l", indent, "The number of columns per record. Default is 10."); - printf("%s%s%s%s\n", indent, "-T", indent, + printf("%s%s%s%s\n", indent, "-T", indent, "The number of threads. Default is 10."); - printf("%s%s%s%s\n", indent, "-i", indent, + printf("%s%s%s%s\n", indent, "-i", indent, "The sleep time (ms) between insertion. Default is 0."); - printf("%s%s%s%s\n", indent, "-r", indent, + printf("%s%s%s%s\n", indent, "-r", indent, "The number of records per request. Default is 100."); - printf("%s%s%s%s\n", indent, "-t", indent, + printf("%s%s%s%s\n", indent, "-t", indent, "The number of tables. Default is 10000."); - printf("%s%s%s%s\n", indent, "-n", indent, + printf("%s%s%s%s\n", indent, "-n", indent, "The number of records per table. Default is 10000."); printf("%s%s%s%s\n", indent, "-x", indent, "Not insert only flag."); printf("%s%s%s%s\n", indent, "-y", indent, "Default input yes for prompt."); - printf("%s%s%s%s\n", indent, "-O", indent, + printf("%s%s%s%s\n", indent, "-O", indent, "Insert mode--0: In order, > 0: disorder ratio. Default is in order."); - printf("%s%s%s%s\n", indent, "-R", indent, + printf("%s%s%s%s\n", indent, "-R", indent, "Out of order data's range, ms, default is 1000."); - printf("%s%s%s%s\n", indent, "-g", indent, + printf("%s%s%s%s\n", indent, "-g", indent, "Print debug info."); - printf("%s%s%s%s\n", indent, "-V, --version", indent, + printf("%s%s%s%s\n", indent, "-V, --version", indent, "Print version info."); /* printf("%s%s%s%s\n", indent, "-D", indent, "if elete database if exists. 0: no, 1: yes, default is 1"); @@ -696,6 +696,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]); wordfree(&full_path); + } else if (strcmp(argv[i], "-h") == 0) { arguments->host = argv[++i]; } else if (strcmp(argv[i], "-p") == 0) { @@ -793,7 +794,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { strcpy(configDir, argv[++i]); } else if (strcmp(argv[i], "-O") == 0) { arguments->disorderRatio = atoi(argv[++i]); - if (arguments->disorderRatio > 1 + if (arguments->disorderRatio > 1 || arguments->disorderRatio < 0) { arguments->disorderRatio = 0; } else if (arguments->disorderRatio == 1) { @@ -801,8 +802,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } } else if (strcmp(argv[i], "-R") == 0) { arguments->disorderRange = atoi(argv[++i]); - if (arguments->disorderRange == 1 - && (arguments->disorderRange > 50 + if (arguments->disorderRange == 1 + && (arguments->disorderRange > 50 || arguments->disorderRange <= 0)) { arguments->disorderRange = 10; } @@ -835,7 +836,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { || arguments->verbose_print) { printf("###################################################################\n"); printf("# meta file: %s\n", arguments->metaFile); - printf("# Server IP: %s:%hu\n", + printf("# Server IP: %s:%hu\n", arguments->host == NULL ? "localhost" : arguments->host, arguments->port ); printf("# User: %s\n", arguments->user); @@ -862,7 +863,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { if (arguments->disorderRatio) { printf("# Data order: %d\n", arguments->disorderRatio); printf("# Data out of order rate: %d\n", arguments->disorderRange); - + } printf("# Delete method: %d\n", arguments->method_of_delete); printf("# Answer yes when prompt: %d\n", arguments->answer_yes); @@ -930,7 +931,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) { return 0; } -static void getResult(TAOS_RES *res, char* resultFileName) { +static void getResult(TAOS_RES *res, char* resultFileName) { TAOS_ROW row = NULL; int num_rows = 0; int num_fields = taos_field_count(res); @@ -940,13 +941,15 @@ static void getResult(TAOS_RES *res, char* resultFileName) { if (resultFileName[0] != 0) { fp = fopen(resultFileName, "at"); if (fp == NULL) { - errorPrint("%s() LN%d, failed to open result file: %s, result will not save to file\n", __func__, __LINE__, resultFileName); + errorPrint("%s() LN%d, failed to open result file: %s, result will not save to file\n", + __func__, __LINE__, resultFileName); } } - + char* databuf = (char*) calloc(1, 100*1024*1024); if (databuf == NULL) { - errorPrint("%s() LN%d, failed to malloc, warning: save result to file slowly!\n", __func__, __LINE__); + errorPrint("%s() LN%d, failed to malloc, warning: save result to file slowly!\n", + __func__, __LINE__); if (fp) fclose(fp); return ; @@ -982,7 +985,7 @@ static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName) taos_free_result(res); return; } - + getResult(res, resultFileName); taos_free_result(res); } @@ -1030,14 +1033,13 @@ static int64_t rand_bigint(){ cursor++; cursor = cursor % MAX_PREPARED_RAND; return randbigint[cursor]; - } static float rand_float(){ static int cursor; cursor++; cursor = cursor % MAX_PREPARED_RAND; - return randfloat[cursor]; + return randfloat[cursor]; } static const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"; @@ -1110,9 +1112,9 @@ static int printfInsertMeta() { printf("database[\033[33m%d\033[0m]:\n", i); printf(" database[%d] name: \033[33m%s\033[0m\n", i, g_Dbs.db[i].dbName); if (0 == g_Dbs.db[i].drop) { - printf(" drop: \033[33mno\033[0m\n"); - }else { - printf(" drop: \033[33myes\033[0m\n"); + printf(" drop: \033[33mno\033[0m\n"); + } else { + printf(" drop: \033[33myes\033[0m\n"); } if (g_Dbs.db[i].dbCfg.blocks > 0) { @@ -1165,8 +1167,8 @@ static int printfInsertMeta() { printf(" super table count: \033[33m%d\033[0m\n", g_Dbs.db[i].superTblCount); for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { printf(" super table[\033[33m%d\033[0m]:\n", j); - - printf(" stbName: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sTblName); + + printf(" stbName: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sTblName); if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { printf(" autoCreateTable: \033[33m%s\033[0m\n", "no"); @@ -1175,7 +1177,7 @@ static int printfInsertMeta() { } else { printf(" autoCreateTable: \033[33m%s\033[0m\n", "error"); } - + if (TBL_NO_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) { printf(" childTblExists: \033[33m%s\033[0m\n", "no"); } else if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) { @@ -1278,9 +1280,9 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, "database[%d]:\n", i); fprintf(fp, " database[%d] name: %s\n", i, g_Dbs.db[i].dbName); if (0 == g_Dbs.db[i].drop) { - fprintf(fp, " drop: no\n"); + fprintf(fp, " drop: no\n"); }else { - fprintf(fp, " drop: yes\n"); + fprintf(fp, " drop: yes\n"); } if (g_Dbs.db[i].dbCfg.blocks > 0) { @@ -1331,8 +1333,8 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " super table count: %d\n", g_Dbs.db[i].superTblCount); for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { fprintf(fp, " super table[%d]:\n", j); - - fprintf(fp, " stbName: %s\n", g_Dbs.db[i].superTbls[j].sTblName); + + fprintf(fp, " stbName: %s\n", g_Dbs.db[i].superTbls[j].sTblName); if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { fprintf(fp, " autoCreateTable: %s\n", "no"); @@ -1341,7 +1343,7 @@ static void printfInsertMetaToFile(FILE* fp) { } else { fprintf(fp, " autoCreateTable: %s\n", "error"); } - + if (TBL_NO_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) { fprintf(fp, " childTblExists: %s\n", "no"); } else if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) { @@ -1349,33 +1351,33 @@ static void printfInsertMetaToFile(FILE* fp) { } else { fprintf(fp, " childTblExists: %s\n", "error"); } - - fprintf(fp, " childTblCount: %d\n", g_Dbs.db[i].superTbls[j].childTblCount); - fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix); - fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); - fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); - fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); + + fprintf(fp, " childTblCount: %d\n", g_Dbs.db[i].superTbls[j].childTblCount); + fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix); + fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); + fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); + fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); fprintf(fp, " interlace rows: %d\n", g_Dbs.db[i].superTbls[j].interlaceRows); if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) { fprintf(fp, " insert interval: %d\n", g_Dbs.db[i].superTbls[j].insertInterval); } if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { - fprintf(fp, " multiThreadWriteOneTbl: no\n"); + fprintf(fp, " multiThreadWriteOneTbl: no\n"); }else { - fprintf(fp, " multiThreadWriteOneTbl: yes\n"); + fprintf(fp, " multiThreadWriteOneTbl: yes\n"); } - fprintf(fp, " interlaceRows: %d\n", g_Dbs.db[i].superTbls[j].interlaceRows); - fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange); + fprintf(fp, " interlaceRows: %d\n", g_Dbs.db[i].superTbls[j].interlaceRows); + fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange); fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio); - fprintf(fp, " maxSqlLen: %d\n", g_Dbs.db[i].superTbls[j].maxSqlLen); - - fprintf(fp, " timeStampStep: %d\n", g_Dbs.db[i].superTbls[j].timeStampStep); - fprintf(fp, " startTimestamp: %s\n", g_Dbs.db[i].superTbls[j].startTimestamp); + fprintf(fp, " maxSqlLen: %d\n", g_Dbs.db[i].superTbls[j].maxSqlLen); + + fprintf(fp, " timeStampStep: %d\n", g_Dbs.db[i].superTbls[j].timeStampStep); + fprintf(fp, " startTimestamp: %s\n", g_Dbs.db[i].superTbls[j].startTimestamp); fprintf(fp, " sampleFormat: %s\n", g_Dbs.db[i].superTbls[j].sampleFormat); - fprintf(fp, " sampleFile: %s\n", g_Dbs.db[i].superTbls[j].sampleFile); - fprintf(fp, " tagsFile: %s\n", g_Dbs.db[i].superTbls[j].tagsFile); - + fprintf(fp, " sampleFile: %s\n", g_Dbs.db[i].superTbls[j].sampleFile); + fprintf(fp, " tagsFile: %s\n", g_Dbs.db[i].superTbls[j].tagsFile); + fprintf(fp, " columnCount: %d\n ", g_Dbs.db[i].superTbls[j].columnCount); for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) { //printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); @@ -1565,7 +1567,7 @@ static int xDumpResultToFile(const char* fname, TAOS_RES* tres) { fprintf(fp, "%s", fields[col].name); } fputc('\n', fp); - + int numOfRows = 0; do { int32_t* length = taos_fetch_lengths(tres); @@ -1586,14 +1588,14 @@ static int xDumpResultToFile(const char* fname, TAOS_RES* tres) { return numOfRows; } -static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) { - TAOS_RES * res; +static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) { + TAOS_RES * res; TAOS_ROW row = NULL; int count = 0; - - res = taos_query(taos, "show databases;"); + + res = taos_query(taos, "show databases;"); int32_t code = taos_errno(res); - + if (code != 0) { errorPrint( "failed to run , reason: %s\n", taos_errstr(res)); return -1; @@ -1617,13 +1619,13 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) { *(int64_t*)row[TSDB_SHOW_DB_CREATED_TIME_INDEX], TSDB_TIME_PRECISION_MILLI); dbInfos[count]->ntables = *((int32_t *)row[TSDB_SHOW_DB_NTABLES_INDEX]); - dbInfos[count]->vgroups = *((int32_t *)row[TSDB_SHOW_DB_VGROUPS_INDEX]); + dbInfos[count]->vgroups = *((int32_t *)row[TSDB_SHOW_DB_VGROUPS_INDEX]); dbInfos[count]->replica = *((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX]); dbInfos[count]->quorum = *((int16_t *)row[TSDB_SHOW_DB_QUORUM_INDEX]); - dbInfos[count]->days = *((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX]); + dbInfos[count]->days = *((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX]); tstrncpy(dbInfos[count]->keeplist, (char *)row[TSDB_SHOW_DB_KEEP_INDEX], - fields[TSDB_SHOW_DB_KEEP_INDEX].bytes); + fields[TSDB_SHOW_DB_KEEP_INDEX].bytes); dbInfos[count]->cache = *((int32_t *)row[TSDB_SHOW_DB_CACHE_INDEX]); dbInfos[count]->blocks = *((int32_t *)row[TSDB_SHOW_DB_BLOCKS_INDEX]); dbInfos[count]->minrows = *((int32_t *)row[TSDB_SHOW_DB_MINROWS_INDEX]); @@ -1633,16 +1635,16 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) { dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX])); dbInfos[count]->cachelast = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX])); - tstrncpy(dbInfos[count]->precision, + tstrncpy(dbInfos[count]->precision, (char *)row[TSDB_SHOW_DB_PRECISION_INDEX], - fields[TSDB_SHOW_DB_PRECISION_INDEX].bytes); + fields[TSDB_SHOW_DB_PRECISION_INDEX].bytes); dbInfos[count]->update = *((int8_t *)row[TSDB_SHOW_DB_UPDATE_INDEX]); tstrncpy(dbInfos[count]->status, (char *)row[TSDB_SHOW_DB_STATUS_INDEX], - fields[TSDB_SHOW_DB_STATUS_INDEX].bytes); + fields[TSDB_SHOW_DB_STATUS_INDEX].bytes); count++; if (count > MAX_DATABASE_COUNT) { - errorPrint( "The database count overflow than %d\n", MAX_DATABASE_COUNT); + errorPrint( "The database count overflow than %d\n", MAX_DATABASE_COUNT); break; } } @@ -1664,11 +1666,11 @@ static void printfDbInfoForQueryToFile(char* filename, SDbInfo* dbInfos, int ind fprintf(fp, "name: %s\n", dbInfos->name); fprintf(fp, "created_time: %s\n", dbInfos->create_time); fprintf(fp, "ntables: %d\n", dbInfos->ntables); - fprintf(fp, "vgroups: %d\n", dbInfos->vgroups); + fprintf(fp, "vgroups: %d\n", dbInfos->vgroups); fprintf(fp, "replica: %d\n", dbInfos->replica); fprintf(fp, "quorum: %d\n", dbInfos->quorum); - fprintf(fp, "days: %d\n", dbInfos->days); - fprintf(fp, "keep0,keep1,keep(D): %s\n", dbInfos->keeplist); + fprintf(fp, "days: %d\n", dbInfos->days); + fprintf(fp, "keep0,keep1,keep(D): %s\n", dbInfos->keeplist); fprintf(fp, "cache(MB): %d\n", dbInfos->cache); fprintf(fp, "blocks: %d\n", dbInfos->blocks); fprintf(fp, "minrows: %d\n", dbInfos->minrows); @@ -1676,10 +1678,10 @@ static void printfDbInfoForQueryToFile(char* filename, SDbInfo* dbInfos, int ind fprintf(fp, "wallevel: %d\n", dbInfos->wallevel); fprintf(fp, "fsync: %d\n", dbInfos->fsync); fprintf(fp, "comp: %d\n", dbInfos->comp); - fprintf(fp, "cachelast: %d\n", dbInfos->cachelast); - fprintf(fp, "precision: %s\n", dbInfos->precision); + fprintf(fp, "cachelast: %d\n", dbInfos->cachelast); + fprintf(fp, "precision: %s\n", dbInfos->precision); fprintf(fp, "update: %d\n", dbInfos->update); - fprintf(fp, "status: %s\n", dbInfos->status); + fprintf(fp, "status: %s\n", dbInfos->status); fprintf(fp, "\n"); fclose(fp); @@ -1729,7 +1731,7 @@ static void printfQuerySystemInfo(TAOS * taos) { snprintf(buffer, MAX_QUERY_SQL_LENGTH, "show %s.vgroups;", dbInfos[i]->name); res = taos_query(taos, buffer); xDumpResultToFile(filename, res); - + // show db.stables snprintf(buffer, MAX_QUERY_SQL_LENGTH, "show %s.stables;", dbInfos[i]->name); res = taos_query(taos, buffer); @@ -1739,7 +1741,6 @@ static void printfQuerySystemInfo(TAOS * taos) { } free(dbInfos); - } static int postProceSql(char* host, uint16_t port, char* sqlstr) @@ -1991,17 +1992,17 @@ static char* generateTagVaulesForStb(SSuperTable* stbInfo) { } dataLen -= 2; - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, ")"); + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, ")"); return dataBuf; } -static int calcRowLen(SSuperTable* superTbls) { +static int calcRowLen(SSuperTable* superTbls) { int colIndex; int lenOfOneRow = 0; - + for (colIndex = 0; colIndex < superTbls->columnCount; colIndex++) { char* dataType = superTbls->columns[colIndex].dataType; - + if (strcasecmp(dataType, "BINARY") == 0) { lenOfOneRow += superTbls->columns[colIndex].dataLen + 3; } else if (strcasecmp(dataType, "NCHAR") == 0) { @@ -2018,9 +2019,9 @@ static int calcRowLen(SSuperTable* superTbls) { lenOfOneRow += 6; } else if (strcasecmp(dataType, "FLOAT") == 0) { lenOfOneRow += 22; - } else if (strcasecmp(dataType, "DOUBLE") == 0) { + } else if (strcasecmp(dataType, "DOUBLE") == 0) { lenOfOneRow += 42; - } else if (strcasecmp(dataType, "TIMESTAMP") == 0) { + } else if (strcasecmp(dataType, "TIMESTAMP") == 0) { lenOfOneRow += 21; } else { printf("get error data type : %s\n", dataType); @@ -2060,7 +2061,7 @@ static int calcRowLen(SSuperTable* superTbls) { } superTbls->lenOfTagOfOneRow = lenOfTagOfOneRow; - + return 0; } @@ -2072,7 +2073,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, char command[BUFFER_SIZE] = "\0"; char limitBuf[100] = "\0"; - TAOS_RES * res; + TAOS_RES * res; TAOS_ROW row = NULL; char* childTblName = *childTblNameOfSuperTbl; @@ -2081,10 +2082,10 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, snprintf(limitBuf, 100, " limit %d offset %d", limit, offset); } - //get all child table name use cmd: select tbname from superTblName; + //get all child table name use cmd: select tbname from superTblName; snprintf(command, BUFFER_SIZE, "select tbname from %s.%s %s", dbName, sTblName, limitBuf); - res = taos_query(taos, command); + res = taos_query(taos, command); int32_t code = taos_errno(res); if (code != 0) { taos_free_result(res); @@ -2153,7 +2154,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, SSuperTable* superTbls) { char command[BUFFER_SIZE] = "\0"; - TAOS_RES * res; + TAOS_RES * res; TAOS_ROW row = NULL; int count = 0; @@ -2474,7 +2475,7 @@ static int createDatabases() { if ((ret != 0) || (g_Dbs.db[i].drop)) { ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric); - + if (0 != ret) { errorPrint("\ncreate super table %d failed!\n\n", j); taos_close(taos); @@ -2618,7 +2619,7 @@ static int startMultiThreadCreateChildTable( int b = 0; b = ntables % threads; - + for (int i = 0; i < threads; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; @@ -2806,7 +2807,7 @@ static int readSampleFromCsvFileToMem( ssize_t readLen = 0; char * line = NULL; int getRows = 0; - + FILE* fp = fopen(superTblInfo->sampleFile, "r"); if (fp == NULL) { errorPrint( "Failed to open sample file: %s, reason:%s\n", @@ -2828,7 +2829,7 @@ static int readSampleFromCsvFileToMem( } continue; } - + if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) { line[--readLen] = 0; } @@ -2939,7 +2940,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile( debugPrint("%s() LN%d, failed to read json, tags not found\n", __func__, __LINE__); goto PARSE_OVER; } - + int tagSize = cJSON_GetArraySize(tags); if (tagSize > MAX_TAG_COUNT) { debugPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n", __func__, __LINE__, MAX_TAG_COUNT); @@ -3025,7 +3026,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { cJSON* user = cJSON_GetObjectItem(root, "user"); if (user && user->type == cJSON_String && user->valuestring != NULL) { - tstrncpy(g_Dbs.user, user->valuestring, MAX_DB_NAME_SIZE); + tstrncpy(g_Dbs.user, user->valuestring, MAX_DB_NAME_SIZE); } else if (!user) { tstrncpy(g_Dbs.user, "root", MAX_DB_NAME_SIZE); } @@ -3052,8 +3053,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else { printf("ERROR: failed to read json, threads not found\n"); goto PARSE_OVER; - } - + } + cJSON* threads2 = cJSON_GetObjectItem(root, "thread_count_create_tbl"); if (threads2 && threads2->type == cJSON_Number) { g_Dbs.threadCountByCreateTbl = threads2->valueint; @@ -3063,7 +3064,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { errorPrint("%s() LN%d, failed to read json, threads2 not found\n", __func__, __LINE__); goto PARSE_OVER; - } + } cJSON* gInsertInterval = cJSON_GetObjectItem(root, "insert_interval"); if (gInsertInterval && gInsertInterval->type == cJSON_Number) { @@ -3088,12 +3089,23 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { cJSON* interlaceRows = cJSON_GetObjectItem(root, "interlace_rows"); if (interlaceRows && interlaceRows->type == cJSON_Number) { g_args.interlace_rows = interlaceRows->valueint; + + // rows per table need be less than insert batch + if (g_args.interlace_rows > g_args.num_of_RPR) { + printf("NOTICE: interlace rows value %d > num_of_records_per_request %d\n\n", + g_args.interlace_rows, g_args.num_of_RPR); + printf(" interlace rows value will be set to num_of_records_per_request %d\n\n", + g_args.num_of_RPR); + printf(" press Enter key to continue or Ctrl+C to stop."); + (void)getchar(); + g_args.interlace_rows = g_args.num_of_RPR; + } } else if (!interlaceRows) { g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req } else { errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n", __func__, __LINE__); goto PARSE_OVER; - } + } cJSON* maxSqlLen = cJSON_GetObjectItem(root, "max_sql_len"); if (maxSqlLen && maxSqlLen->type == cJSON_Number) { @@ -3104,7 +3116,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n", __func__, __LINE__); goto PARSE_OVER; } - cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req"); if (numRecPerReq && numRecPerReq->type == cJSON_Number) { @@ -3117,7 +3128,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no, - if (answerPrompt + if (answerPrompt && answerPrompt->type == cJSON_String && answerPrompt->valuestring != NULL) { if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) { @@ -3132,7 +3143,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else { printf("ERROR: failed to read json, confirm_parameter_prompt not found\n"); goto PARSE_OVER; - } + } cJSON* dbs = cJSON_GetObjectItem(root, "databases"); if (!dbs || dbs->type != cJSON_Array) { @@ -3159,7 +3170,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("ERROR: failed to read json, dbinfo not found\n"); goto PARSE_OVER; } - + cJSON *dbName = cJSON_GetObjectItem(dbinfo, "name"); if (!dbName || dbName->type != cJSON_String || dbName->valuestring == NULL) { printf("ERROR: failed to read json, db name not found\n"); @@ -3173,7 +3184,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { g_Dbs.db[i].drop = true; } else { g_Dbs.db[i].drop = false; - } + } } else if (!drop) { g_Dbs.db[i].drop = g_args.drop_database; } else { @@ -3224,7 +3235,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("ERROR: failed to read json, keep not found\n"); goto PARSE_OVER; } - + cJSON* days = cJSON_GetObjectItem(dbinfo, "days"); if (days && days->type == cJSON_Number) { g_Dbs.db[i].dbCfg.days = days->valueint; @@ -3234,7 +3245,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("ERROR: failed to read json, days not found\n"); goto PARSE_OVER; } - + cJSON* cache = cJSON_GetObjectItem(dbinfo, "cache"); if (cache && cache->type == cJSON_Number) { g_Dbs.db[i].dbCfg.cache = cache->valueint; @@ -3244,7 +3255,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("ERROR: failed to read json, cache not found\n"); goto PARSE_OVER; } - + cJSON* blocks= cJSON_GetObjectItem(dbinfo, "blocks"); if (blocks && blocks->type == cJSON_Number) { g_Dbs.db[i].dbCfg.blocks = blocks->valueint; @@ -3333,15 +3344,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else { printf("ERROR: failed to read json, fsync not found\n"); goto PARSE_OVER; - } + } - // super_talbes + // super_talbes cJSON *stables = cJSON_GetObjectItem(dbinfos, "super_tables"); if (!stables || stables->type != cJSON_Array) { printf("ERROR: failed to read json, super_tables not found\n"); goto PARSE_OVER; - } - + } + int stbSize = cJSON_GetArraySize(stables); if (stbSize > MAX_SUPER_TABLE_COUNT) { errorPrint( @@ -3354,15 +3365,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { for (int j = 0; j < stbSize; ++j) { cJSON* stbInfo = cJSON_GetArrayItem(stables, j); if (stbInfo == NULL) continue; - - // dbinfo + + // dbinfo cJSON *stbName = cJSON_GetObjectItem(stbInfo, "name"); if (!stbName || stbName->type != cJSON_String || stbName->valuestring == NULL) { printf("ERROR: failed to read json, stb name not found\n"); goto PARSE_OVER; } tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring, MAX_TB_NAME_SIZE); - + cJSON *prefix = cJSON_GetObjectItem(stbInfo, "childtable_prefix"); if (!prefix || prefix->type != cJSON_String || prefix->valuestring == NULL) { printf("ERROR: failed to read json, childtable_prefix not found\n"); @@ -3387,7 +3398,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("ERROR: failed to read json, auto_create_table not found\n"); goto PARSE_OVER; } - + cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num"); if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) { g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint; @@ -3396,7 +3407,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else { printf("ERROR: failed to read json, batch_create_tbl_num not found\n"); goto PARSE_OVER; - } + } cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no if (childTblExists @@ -3412,13 +3423,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!childTblExists) { g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS; } else { - errorPrint("%s() LN%d, failed to read json, child_table_exists not found\n", __func__, __LINE__); + errorPrint("%s() LN%d, failed to read json, child_table_exists not found\n", + __func__, __LINE__); goto PARSE_OVER; } - + cJSON* count = cJSON_GetObjectItem(stbInfo, "childtable_count"); if (!count || count->type != cJSON_Number || 0 >= count->valueint) { - errorPrint("%s() LN%d, failed to read json, childtable_count not found\n", __func__, __LINE__); + errorPrint("%s() LN%d, failed to read json, childtable_count not found\n", + __func__, __LINE__); goto PARSE_OVER; } g_Dbs.db[i].superTbls[j].childTblCount = count->valueint; @@ -3502,10 +3515,11 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else { printf("ERROR: failed to read json, sample_buf_size not found\n"); goto PARSE_OVER; - } + } cJSON *sampleFormat = cJSON_GetObjectItem(stbInfo, "sample_format"); - if (sampleFormat && sampleFormat->type == cJSON_String && sampleFormat->valuestring != NULL) { + if (sampleFormat && sampleFormat->type + == cJSON_String && sampleFormat->valuestring != NULL) { tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, sampleFormat->valuestring, MAX_DB_NAME_SIZE); } else if (!sampleFormat) { @@ -3513,7 +3527,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else { printf("ERROR: failed to read json, sample_format not found\n"); goto PARSE_OVER; - } + } cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file"); if (sampleFile && sampleFile->type == cJSON_String && sampleFile->valuestring != NULL) { @@ -3524,7 +3538,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else { printf("ERROR: failed to read json, sample_file not found\n"); goto PARSE_OVER; - } + } cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file"); if (tagsFile && tagsFile->type == cJSON_String && tagsFile->valuestring != NULL) { @@ -3550,14 +3564,14 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { len = TSDB_MAX_ALLOWED_SQL_LEN; } else if (len < TSDB_MAX_SQL_LEN) { len = TSDB_MAX_SQL_LEN; - } + } g_Dbs.db[i].superTbls[j].maxSqlLen = len; } else if (!maxSqlLen) { g_Dbs.db[i].superTbls[j].maxSqlLen = TSDB_MAX_SQL_LEN; } else { printf("ERROR: failed to read json, maxSqlLen not found\n"); goto PARSE_OVER; - } + } cJSON *multiThreadWriteOneTbl = cJSON_GetObjectItem(stbInfo, "multi_thread_write_one_tbl"); // no , yes @@ -3568,7 +3582,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 1; } else { g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 0; - } + } } else if (!multiThreadWriteOneTbl) { g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 0; } else { @@ -3579,6 +3593,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { cJSON* interlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows"); if (interlaceRows && interlaceRows->type == cJSON_Number) { g_Dbs.db[i].superTbls[j].interlaceRows = interlaceRows->valueint; + // rows per table need be less than insert batch + if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) { + printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %d > num_of_records_per_request %d\n\n", + i, j, g_Dbs.db[i].superTbls[j].interlaceRows, g_args.num_of_RPR); + printf(" interlace rows value will be set to num_of_records_per_request %d\n\n", + g_args.num_of_RPR); + printf(" press Enter key to continue or Ctrl+C to stop."); + (void)getchar(); + g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR; + } } else if (!interlaceRows) { g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req } else { @@ -3586,7 +3610,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { "%s() LN%d, failed to read json, interlace rows input mistake\n", __func__, __LINE__); goto PARSE_OVER; - } + } cJSON* disorderRatio = cJSON_GetObjectItem(stbInfo, "disorder_ratio"); if (disorderRatio && disorderRatio->type == cJSON_Number) { @@ -3596,7 +3620,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else { printf("ERROR: failed to read json, disorderRatio not found\n"); goto PARSE_OVER; - } + } cJSON* disorderRange = cJSON_GetObjectItem(stbInfo, "disorder_range"); if (disorderRange && disorderRange->type == cJSON_Number) { @@ -3636,8 +3660,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { stbInfo, &g_Dbs.db[i].superTbls[j]); if (false == retVal) { goto PARSE_OVER; - } - } + } + } } ret = true; @@ -3703,7 +3727,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else { printf("ERROR: failed to read json, confirm_parameter_prompt not found\n"); goto PARSE_OVER; - } + } cJSON* dbs = cJSON_GetObjectItem(root, "databases"); if (dbs && dbs->type == cJSON_String && dbs->valuestring != NULL) { @@ -3722,7 +3746,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { printf("ERROR: failed to read json, query_mode not found\n"); goto PARSE_OVER; } - + // super_table_query cJSON *superQuery = cJSON_GetObjectItem(root, "specified_table_query"); if (!superQuery) { @@ -3731,26 +3755,26 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else if (superQuery->type != cJSON_Object) { printf("ERROR: failed to read json, super_table_query not found\n"); goto PARSE_OVER; - } else { + } else { cJSON* rate = cJSON_GetObjectItem(superQuery, "query_interval"); if (rate && rate->type == cJSON_Number) { g_queryInfo.superQueryInfo.rate = rate->valueint; } else if (!rate) { g_queryInfo.superQueryInfo.rate = 0; } - + cJSON* concurrent = cJSON_GetObjectItem(superQuery, "concurrent"); if (concurrent && concurrent->type == cJSON_Number) { g_queryInfo.superQueryInfo.concurrent = concurrent->valueint; } else if (!concurrent) { g_queryInfo.superQueryInfo.concurrent = 1; } - + cJSON* mode = cJSON_GetObjectItem(superQuery, "mode"); if (mode && mode->type == cJSON_String && mode->valuestring != NULL) { - if (0 == strcmp("sync", mode->valuestring)) { + if (0 == strcmp("sync", mode->valuestring)) { g_queryInfo.superQueryInfo.subscribeMode = 0; - } else if (0 == strcmp("async", mode->valuestring)) { + } else if (0 == strcmp("async", mode->valuestring)) { g_queryInfo.superQueryInfo.subscribeMode = 1; } else { printf("ERROR: failed to read json, subscribe mod error\n"); @@ -3759,21 +3783,21 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else { g_queryInfo.superQueryInfo.subscribeMode = 0; } - + cJSON* interval = cJSON_GetObjectItem(superQuery, "interval"); if (interval && interval->type == cJSON_Number) { g_queryInfo.superQueryInfo.subscribeInterval = interval->valueint; - } else if (!interval) { + } else if (!interval) { //printf("failed to read json, subscribe interval no found\n"); //goto PARSE_OVER; g_queryInfo.superQueryInfo.subscribeInterval = 10000; } - + cJSON* restart = cJSON_GetObjectItem(superQuery, "restart"); if (restart && restart->type == cJSON_String && restart->valuestring != NULL) { - if (0 == strcmp("yes", restart->valuestring)) { + if (0 == strcmp("yes", restart->valuestring)) { g_queryInfo.superQueryInfo.subscribeRestart = 1; - } else if (0 == strcmp("no", restart->valuestring)) { + } else if (0 == strcmp("no", restart->valuestring)) { g_queryInfo.superQueryInfo.subscribeRestart = 0; } else { printf("ERROR: failed to read json, subscribe restart error\n"); @@ -3782,14 +3806,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else { g_queryInfo.superQueryInfo.subscribeRestart = 1; } - + cJSON* keepProgress = cJSON_GetObjectItem(superQuery, "keepProgress"); if (keepProgress && keepProgress->type == cJSON_String && keepProgress->valuestring != NULL) { - if (0 == strcmp("yes", keepProgress->valuestring)) { + if (0 == strcmp("yes", keepProgress->valuestring)) { g_queryInfo.superQueryInfo.subscribeKeepProgress = 1; - } else if (0 == strcmp("no", keepProgress->valuestring)) { + } else if (0 == strcmp("no", keepProgress->valuestring)) { g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; } else { printf("ERROR: failed to read json, subscribe keepProgress error\n"); @@ -3799,14 +3823,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; } - // sqls + // sqls cJSON* superSqls = cJSON_GetObjectItem(superQuery, "sqls"); if (!superSqls) { g_queryInfo.superQueryInfo.sqlCount = 0; } else if (superSqls->type != cJSON_Array) { printf("ERROR: failed to read json, super sqls not found\n"); goto PARSE_OVER; - } else { + } else { int superSqlSize = cJSON_GetArraySize(superSqls); if (superSqlSize > MAX_QUERY_SQL_COUNT) { printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT); @@ -3817,7 +3841,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { for (int j = 0; j < superSqlSize; ++j) { cJSON* sql = cJSON_GetArrayItem(superSqls, j); if (sql == NULL) continue; - + cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql"); if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) { printf("ERROR: failed to read json, sql not found\n"); @@ -3833,7 +3857,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else { printf("ERROR: failed to read json, super query result file not found\n"); goto PARSE_OVER; - } + } } } } @@ -3879,9 +3903,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { cJSON* submode = cJSON_GetObjectItem(subQuery, "mode"); if (submode && submode->type == cJSON_String && submode->valuestring != NULL) { - if (0 == strcmp("sync", submode->valuestring)) { + if (0 == strcmp("sync", submode->valuestring)) { g_queryInfo.subQueryInfo.subscribeMode = 0; - } else if (0 == strcmp("async", submode->valuestring)) { + } else if (0 == strcmp("async", submode->valuestring)) { g_queryInfo.subQueryInfo.subscribeMode = 1; } else { printf("ERROR: failed to read json, subscribe mod error\n"); @@ -3899,12 +3923,12 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { //goto PARSE_OVER; g_queryInfo.subQueryInfo.subscribeInterval = 10000; } - + cJSON* subrestart = cJSON_GetObjectItem(subQuery, "restart"); if (subrestart && subrestart->type == cJSON_String && subrestart->valuestring != NULL) { - if (0 == strcmp("yes", subrestart->valuestring)) { + if (0 == strcmp("yes", subrestart->valuestring)) { g_queryInfo.subQueryInfo.subscribeRestart = 1; - } else if (0 == strcmp("no", subrestart->valuestring)) { + } else if (0 == strcmp("no", subrestart->valuestring)) { g_queryInfo.subQueryInfo.subscribeRestart = 0; } else { printf("ERROR: failed to read json, subscribe restart error\n"); @@ -3913,14 +3937,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else { g_queryInfo.subQueryInfo.subscribeRestart = 1; } - + cJSON* subkeepProgress = cJSON_GetObjectItem(subQuery, "keepProgress"); if (subkeepProgress && subkeepProgress->type == cJSON_String && subkeepProgress->valuestring != NULL) { - if (0 == strcmp("yes", subkeepProgress->valuestring)) { + if (0 == strcmp("yes", subkeepProgress->valuestring)) { g_queryInfo.subQueryInfo.subscribeKeepProgress = 1; - } else if (0 == strcmp("no", subkeepProgress->valuestring)) { + } else if (0 == strcmp("no", subkeepProgress->valuestring)) { g_queryInfo.subQueryInfo.subscribeKeepProgress = 0; } else { printf("ERROR: failed to read json, subscribe keepProgress error\n"); @@ -3928,27 +3952,27 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } } else { g_queryInfo.subQueryInfo.subscribeKeepProgress = 0; - } + } - // sqls + // sqls cJSON* subsqls = cJSON_GetObjectItem(subQuery, "sqls"); if (!subsqls) { g_queryInfo.subQueryInfo.sqlCount = 0; } else if (subsqls->type != cJSON_Array) { printf("ERROR: failed to read json, super sqls not found\n"); goto PARSE_OVER; - } else { + } else { int superSqlSize = cJSON_GetArraySize(subsqls); if (superSqlSize > MAX_QUERY_SQL_COUNT) { printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT); goto PARSE_OVER; } - + g_queryInfo.subQueryInfo.sqlCount = superSqlSize; - for (int j = 0; j < superSqlSize; ++j) { + for (int j = 0; j < superSqlSize; ++j) { cJSON* sql = cJSON_GetArrayItem(subsqls, j); if (sql == NULL) continue; - + cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql"); if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) { printf("ERROR: failed to read json, sql not found\n"); @@ -3964,7 +3988,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else { printf("ERROR: failed to read json, sub query result file not found\n"); goto PARSE_OVER; - } + } } } } @@ -4033,7 +4057,7 @@ static bool getInfoFromJsonFile(char* file) { } else { errorPrint("%s() LN%d, input json file type error! please input correct file type: insert or query or subscribe\n", __func__, __LINE__); goto PARSE_OVER; - } + } PARSE_OVER: free(content); @@ -4043,7 +4067,7 @@ PARSE_OVER: } static void prepareSampleData() { - for (int i = 0; i < g_Dbs.dbCount; i++) { + for (int i = 0; i < g_Dbs.dbCount; i++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) { (void)readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]); @@ -4054,7 +4078,7 @@ static void prepareSampleData() { static void postFreeResource() { tmfclose(g_fpOfInsertResult); - for (int i = 0; i < g_Dbs.dbCount; i++) { + for (int i = 0; i < g_Dbs.dbCount; i++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) { free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); @@ -4105,7 +4129,7 @@ static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp, static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* stbInfo) { int dataLen = 0; dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp); - for (int i = 0; i < stbInfo->columnCount; i++) { + for (int i = 0; i < stbInfo->columnCount; i++) { if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6)) || (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) { if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) { @@ -4316,28 +4340,28 @@ static int generateDataTail(char *tableName, int32_t tableSeq, if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { retLen = getRowDataFromSample( - buffer + len, - superTblInfo->maxSqlLen - len, + buffer + len, + superTblInfo->maxSqlLen - len, startTime + superTblInfo->timeStampStep * k, - superTblInfo, + superTblInfo, pSamplePos); } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { int rand_num = rand_tinyint() % 100; - if (0 != superTblInfo->disorderRatio + if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) { int64_t d = startTime + superTblInfo->timeStampStep * k - taosRandom() % superTblInfo->disorderRange; retLen = generateRowData( - buffer + len, + buffer + len, superTblInfo->maxSqlLen - len, - d, + d, superTblInfo); } else { retLen = generateRowData( - buffer + len, - superTblInfo->maxSqlLen - len, + buffer + len, + superTblInfo->maxSqlLen - len, startTime + superTblInfo->timeStampStep * k, superTblInfo); } @@ -4361,7 +4385,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq, if ((g_args.disorderRatio != 0) && (rand_num < g_args.disorderRange)) { - + int64_t d = startTime + DEFAULT_TIMESTAMP_STEP * k - taosRandom() % 1000000 + rand_num; len = generateData(data, data_type, @@ -4489,6 +4513,18 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pThreadInfo->threadID, __func__, __LINE__); SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows; + + int insertMode; + + if (interlaceRows > 0) { + insertMode = INTERLACE_INSERT_MODE; + } else { + insertMode = PROGRESSIVE_INSERT_MODE; + } + + // TODO: prompt tbl count multple interlace rows and batch + // char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); if (NULL == buffer) { @@ -4497,22 +4533,10 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { strerror(errno)); return NULL; } - - int insertMode; + + char tableName[TSDB_TABLE_NAME_LEN]; - int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows; - - if (interlaceRows > 0) { - insertMode = INTERLACE_INSERT_MODE; - } else { - insertMode = PROGRESSIVE_INSERT_MODE; - } - - // rows per table need be less than insert batch - if (interlaceRows > g_args.num_of_RPR) - interlaceRows = g_args.num_of_RPR; - pThreadInfo->totalInsertRows = 0; pThreadInfo->totalAffectedRows = 0; @@ -4715,7 +4739,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { strerror(errno)); return NULL; } - + int64_t lastPrintTime = taosGetTimestampMs(); int64_t startTs = taosGetTimestampUs(); int64_t endTs; @@ -4856,7 +4880,7 @@ static void callBack(void *param, TAOS_RES *res, int code) { taosMsleep(insert_interval - (winfo->et - winfo->st)/1000); // ms } } - + char *buffer = calloc(1, winfo->superTblInfo->maxSqlLen); char *data = calloc(1, MAX_DATA_SIZE); char *pstr = buffer; @@ -4874,7 +4898,7 @@ static void callBack(void *param, TAOS_RES *res, int code) { taos_free_result(res); return; } - + for (int i = 0; i < g_args.num_of_RPR; i++) { int rand_num = taosRandom() % 100; if (0 != winfo->superTblInfo->disorderRatio @@ -4911,7 +4935,7 @@ static void *asyncWrite(void *sarg) { winfo->st = 0; winfo->et = 0; winfo->lastTs = winfo->start_time; - + int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; if (insert_interval) { @@ -5000,7 +5024,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, } double start = getCurrentTime(); - + int startFrom; if ((superTblInfo) && (superTblInfo->childTblOffset >= 0)) @@ -5192,7 +5216,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, } static void *readTable(void *sarg) { -#if 1 +#if 1 threadInfo *rinfo = (threadInfo *)sarg; TAOS *taos = rinfo->taos; char command[BUFFER_SIZE] = "\0"; @@ -5359,7 +5383,7 @@ static int insertTestProcess() { printf("Press enter key to continue\n\n"); (void)getchar(); } - + init_rand_data(); // create database and super tables @@ -5448,7 +5472,7 @@ static void *superQueryProcess(void *sarg) { //char sqlStr[MAX_TB_NAME_SIZE*2]; //sprintf(sqlStr, "use %s", g_queryInfo.dbName); //queryDB(winfo->taos, sqlStr); - + int64_t st = 0; int64_t et = 0; @@ -5603,7 +5627,7 @@ static int queryTestProcess() { printf("Press enter key to continue\n\n"); (void)getchar(); } - + printfQuerySystemInfo(taos); pthread_t *pids = NULL; @@ -5676,20 +5700,20 @@ static int queryTestProcess() { int ntables = g_queryInfo.subQueryInfo.childTblCount; int threads = g_queryInfo.subQueryInfo.threadCnt; - + int a = ntables / threads; if (a < 1) { threads = ntables; a = 1; } - + int b = 0; if (threads != 0) { b = ntables % threads; } int startFrom = 0; - for (int i = 0; i < threads; i++) { + for (int i = 0; i < threads; i++) { threadInfo *t_info = infosOfSub + i; t_info->threadID = i; @@ -5705,21 +5729,21 @@ static int queryTestProcess() { } else { g_queryInfo.subQueryInfo.threadCnt = 0; } - + for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { pthread_join(pids[i], NULL); } tmfree((char*)pids); tmfree((char*)infos); - + for (int i = 0; i < g_queryInfo.subQueryInfo.threadCnt; i++) { pthread_join(pidsOfSub[i], NULL); } tmfree((char*)pidsOfSub); tmfree((char*)infosOfSub); - + // taos_close(taos);// TODO: workaround to use separate taos connection; return 0; } @@ -5784,7 +5808,7 @@ static void *subSubscribeProcess(void *sarg) { taos_close(winfo->taos); return NULL; } - + //int64_t st = 0; //int64_t et = 0; do { @@ -5871,7 +5895,7 @@ static void *superSubscribeProcess(void *sarg) { taos_close(winfo->taos); return NULL; } - + //int64_t st = 0; //int64_t et = 0; do { @@ -5988,7 +6012,7 @@ static int subscribeTestProcess() { t_info->taos = NULL; // TODO: workaround to use separate taos connection; pthread_create(pids + i, NULL, superSubscribeProcess, t_info); } - + //==== create sub threads for query from sub table pthread_t *pidsOfSub = NULL; threadInfo *infosOfSub = NULL; @@ -6007,18 +6031,18 @@ static int subscribeTestProcess() { int ntables = g_queryInfo.subQueryInfo.childTblCount; int threads = g_queryInfo.subQueryInfo.threadCnt; - + int a = ntables / threads; if (a < 1) { threads = ntables; a = 1; } - + int b = 0; if (threads != 0) { b = ntables % threads; } - + int startFrom = 0; for (int i = 0; i < threads; i++) { threadInfo *t_info = infosOfSub + i; @@ -6033,7 +6057,7 @@ static int subscribeTestProcess() { } g_queryInfo.subQueryInfo.threadCnt = threads; } - + for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { pthread_join(pids[i], NULL); } @@ -6128,7 +6152,7 @@ static void setParaFromArg(){ g_Dbs.threadCount = g_args.num_of_threads; g_Dbs.threadCountByCreateTbl = g_args.num_of_threads; g_Dbs.queryMode = g_args.mode; - + g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL; g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS; g_Dbs.db[0].superTbls[0].disorderRange = g_args.disorderRange; @@ -6149,7 +6173,7 @@ static void setParaFromArg(){ if (data_type[i] == NULL) { break; } - + tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType, data_type[i], MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].columns[i].dataLen = g_args.len_of_binary; @@ -6168,7 +6192,7 @@ static void setParaFromArg(){ tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType, "INT", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0; - + tstrncpy(g_Dbs.db[0].superTbls[0].tags[1].dataType, "BINARY", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary; g_Dbs.db[0].superTbls[0].tagCount = 2;