From a273c635929d0f2c6d9516fb482187c15e433c54 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Mon, 8 Mar 2021 19:47:56 +0800 Subject: [PATCH 1/7] [TD-3147] : suppoert insert interval. print sleep time. --- src/kit/taosdemo/taosdemo.c | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 4a9038c71e..a8326a23c9 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -3918,7 +3918,7 @@ static void syncWriteForNumberOfTblInOneSql( send_to_server: if (g_args.insert_interval && (g_args.insert_interval > (et - st))) { int sleep_time = g_args.insert_interval - (et -st); - debugPrint("DEBUG sleep: %d ms\n", sleep_time); + printf("sleep: %d ms specified by insert_interval\n", sleep_time); taosMsleep(sleep_time); // ms } @@ -4135,7 +4135,7 @@ static void* syncWrite(void *sarg) { if (i > 0 && g_args.insert_interval && (g_args.insert_interval > (et - st) )) { int sleep_time = g_args.insert_interval - (et -st); - debugPrint("DEBUG sleep: %d ms\n", sleep_time); + printf("sleep: %d ms specified by insert_interval\n", sleep_time); taosMsleep(sleep_time); // ms } @@ -4242,6 +4242,17 @@ static void* syncWriteWithStb(void *sarg) { uint64_t inserted = i; uint64_t tmp_time = time_counter; + if (i > 0 && g_args.insert_interval + && (g_args.insert_interval > (et - st) )) { + int sleep_time = g_args.insert_interval - (et -st); + printf("sleep: %d ms specified by insert_interval\n", sleep_time); + taosMsleep(sleep_time); // ms + } + + if (g_args.insert_interval) { + st = taosGetTimestampMs(); + } + int sampleUsePos = samplePos; int k = 0; debugPrint("DEBUG - %s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); @@ -4326,6 +4337,7 @@ static void* syncWriteWithStb(void *sarg) { */ inserted++; k++; + i++; totalRowsInserted++; if (inserted > superTblInfo->insertRows) @@ -4334,16 +4346,6 @@ static void* syncWriteWithStb(void *sarg) { || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) break; */ - if (i > 0 && g_args.insert_interval - && (g_args.insert_interval > (et - st) )) { - int sleep_time = g_args.insert_interval - (et -st); - debugPrint("DEBUG sleep: %d ms\n", sleep_time); - taosMsleep(sleep_time); // ms - } - - if (g_args.insert_interval) { - st = taosGetTimestampMs(); - } if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { //printf("===== sql: %s \n\n", buffer); From b8bc8759d12f32ef4e74083ee114a5f53de9c04f Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Mon, 8 Mar 2021 23:29:34 +0800 Subject: [PATCH 2/7] [TD-3147] : support insert interval. --- src/kit/taosdemo/taosdemo.c | 55 ++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 373f89ab63..9b2a1e1649 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -227,7 +227,7 @@ typedef struct SSuperTable_S { int disorderRange; // ms or us by database precision int maxSqlLen; // - int64_t insertRows; // 0: no limit +// int64_t insertRows; // 0: no limit int timeStampStep; char startTimestamp[MAX_TB_NAME_SIZE]; // char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json @@ -748,7 +748,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { printf("\n"); } printf("# Insertion interval: %d\n", arguments->insert_interval); - printf("# Number of Columns per record: %d\n", arguments->num_of_RPR); + printf("# Number of records per req: %d\n", arguments->num_of_RPR); printf("# Number of Threads: %d\n", arguments->num_of_threads); printf("# Number of Tables: %d\n", arguments->num_of_tables); printf("# Number of Data per Table: %d\n", arguments->num_of_DPT); @@ -1078,7 +1078,7 @@ static int printfInsertMeta() { printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode); - printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows); +// printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n"); @@ -1225,7 +1225,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); - fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); +// fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { fprintf(fp, " multiThreadWriteOneTbl: no\n"); @@ -3223,8 +3223,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("failed to read json, disorderRange not found"); goto PARSE_OVER; } - - +/* cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows"); if (insertRows && insertRows->type == cJSON_Number) { g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint; @@ -3237,7 +3236,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("failed to read json, insert_rows not found"); goto PARSE_OVER; } - +*/ if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { continue; @@ -3781,7 +3780,8 @@ static void syncWriteForNumberOfTblInOneSql( int64_t st = 0; int64_t et = 0; - for (int i = 0; i < superTblInfo->insertRows;) { +// for (int i = 0; i < superTblInfo->insertRows;) { + for (int i = 0; i < g_args.num_of_RPR;) { int32_t tbl_id = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { int inserted = i; @@ -3904,7 +3904,8 @@ static void syncWriteForNumberOfTblInOneSql( k++; totalRowsInserted++; - if (inserted >= superTblInfo->insertRows || + // if (inserted >= superTblInfo->insertRows || + if (inserted >= g_args.num_of_RPR || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { tID = tbl_id + 1; printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", @@ -4235,13 +4236,16 @@ static void* syncWriteWithStb(void *sarg) { int64_t time_counter = winfo->start_time; uint64_t st = 0; uint64_t et = 0; - - debugPrint("DEBUG - %s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); - +/* + debugPrint("DEBUG - %s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, + superTblInfo->insertRows); for (int i = 0; i < superTblInfo->insertRows;) { +*/ + for (int i = 0; i < g_args.num_of_RPR;) { - for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - uint64_t inserted = i; + for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; + tID++) { + int64_t inserted = i; uint64_t tmp_time = time_counter; if (i > 0 && g_args.insert_interval @@ -4341,8 +4345,10 @@ static void* syncWriteWithStb(void *sarg) { k++; i++; totalRowsInserted++; + debugPrint("DEBUG - %s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); - if (inserted > superTblInfo->insertRows) +// if (inserted > superTblInfo->insertRows) + if (inserted > g_args.num_of_RPR) break; /* if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) @@ -4455,7 +4461,8 @@ void callBack(void *param, TAOS_RES *res, int code) { char *data = calloc(1, MAX_DATA_SIZE); char *pstr = buffer; pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id); - if (winfo->counter >= winfo->superTblInfo->insertRows) { +// if (winfo->counter >= winfo->superTblInfo->insertRows) { + if (winfo->counter >= g_args.num_of_RPR) { winfo->start_table_id++; winfo->counter = 0; } @@ -4481,7 +4488,8 @@ void callBack(void *param, TAOS_RES *res, int code) { pstr += sprintf(pstr, "%s", data); winfo->counter++; - if (winfo->counter >= winfo->superTblInfo->insertRows) { +// if (winfo->counter >= winfo->superTblInfo->insertRows) { + if (winfo->counter >= g_args.num_of_RPR) { break; } } @@ -4700,11 +4708,12 @@ void *readTable(void *sarg) { } int num_of_DPT; - if (rinfo->superTblInfo) { +/* if (rinfo->superTblInfo) { num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; } else { + */ num_of_DPT = g_args.num_of_DPT; - } +// } int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; @@ -4767,7 +4776,8 @@ void *readMetric(void *sarg) { return NULL; } - int num_of_DPT = rinfo->superTblInfo->insertRows; +// int num_of_DPT = rinfo->superTblInfo->insertRows; + int num_of_DPT = g_args.num_of_DPT; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; bool do_aggreFunc = g_Dbs.do_aggreFunc; @@ -4886,7 +4896,8 @@ int insertTestProcess() { if (g_Dbs.db[i].superTblCount > 0) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; - if (0 == g_Dbs.db[i].superTbls[j].insertRows) { +// if (0 == g_Dbs.db[i].superTbls[j].insertRows) { + if (0 == g_args.num_of_DPT) { continue; } startMultiThreadInsertData( @@ -5511,7 +5522,7 @@ void setParaFromArg(){ "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].timeStampStep = 10; - g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; + // g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; g_Dbs.db[0].superTbls[0].columnCount = 0; From 19fc8eb86be706c0ec6650d97602ff802d680fd4 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 9 Mar 2021 00:06:51 +0800 Subject: [PATCH 3/7] [TD-3147] : support insert interval. add verbose print. --- src/kit/taosdemo/taosdemo.c | 62 +++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 9b2a1e1649..d3cda7bef8 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -185,6 +185,7 @@ typedef struct SArguments_S { bool insert_only; bool answer_yes; bool debug_print; + bool verbose_print; char * output_file; int mode; char * datatype[MAX_NUM_DATATYPE + 1]; @@ -489,6 +490,7 @@ SArguments g_args = { false, // use_metric false, // insert_only false, // debug_print + false, // verbose_print false, // answer_yes; "./output.txt", // output_file 0, // mode : sync or async @@ -526,7 +528,11 @@ static SQueryMetaInfo g_queryInfo; static FILE * g_fpOfInsertResult = NULL; #define debugPrint(fmt, ...) \ - do { if (g_args.debug_print) fprintf(stderr, fmt, __VA_ARGS__); } while(0) + do { if (g_args.debug_print || g_args.verbose_print) \ + fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0) +#define verbosePrint(fmt, ...) \ + do { if (g_args.verbose_print) fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0) + /////////////////////////////////////////////////// void printHelp() { @@ -691,6 +697,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->answer_yes = true; } else if (strcmp(argv[i], "-g") == 0) { arguments->debug_print = true; + } else if (strcmp(argv[i], "-gg") == 0) { + arguments->verbose_print = true; } else if (strcmp(argv[i], "-c") == 0) { strcpy(configDir, argv[++i]); } else if (strcmp(argv[i], "-O") == 0) { @@ -805,7 +813,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) { } if (code != 0) { - debugPrint("DEBUG %s() LN%d - command: %s\n", __func__, __LINE__, command); + debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command); fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res)); taos_free_result(res); //taos_close(taos); @@ -1986,7 +1994,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, exit(-1); } snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols); - debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); + debugPrint("%s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); if (use_metric) { char tags[STRING_LEN] = "\0"; @@ -2039,13 +2047,13 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s (ts timestamp%s) tags %s", dbName, superTbls->sTblName, cols, tags); - debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, command); + debugPrint("%s() LN%d: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName); return -1; } - debugPrint("DEBUG - create supertable %s success!\n\n", superTbls->sTblName); + debugPrint("create supertable %s success!\n\n", superTbls->sTblName); } return 0; } @@ -2064,7 +2072,7 @@ static int createDatabases() { for (int i = 0; i < g_Dbs.dbCount; i++) { if (g_Dbs.db[i].drop) { sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName); - debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); + verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { taos_close(taos); return -1; @@ -2132,7 +2140,7 @@ static int createDatabases() { "precision \'%s\';", g_Dbs.db[i].dbCfg.precision); } - debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); + debugPrint("%s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { taos_close(taos); printf("\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName); @@ -2140,11 +2148,11 @@ static int createDatabases() { } printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName); - debugPrint("DEBUG %s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount); + debugPrint("%s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount); for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { // describe super table, if exists sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName); - debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); + verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS; ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric); @@ -2232,7 +2240,7 @@ static void* createTable(void *sarg) } len = 0; - debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); + debugPrint("%s() LN%d %s\n", __func__, __LINE__, buffer); if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ free(buffer); return NULL; @@ -2247,7 +2255,7 @@ static void* createTable(void *sarg) } if (0 != len) { - debugPrint("DEBUG %s() %d buffer: %s\n", __func__, __LINE__, buffer); + debugPrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer); (void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE); } @@ -2285,7 +2293,7 @@ int startMultiThreadCreateChildTable( t_info->threadID = i; tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE); t_info->superTblInfo = superTblInfo; - debugPrint("DEBUG %s() %d db_name: %s\n", __func__, __LINE__, db_name); + verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name); t_info->taos = taos_connect( g_Dbs.host, g_Dbs.user, @@ -2336,7 +2344,7 @@ static void createChildTables() { continue; } - debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, + verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); startMultiThreadCreateChildTable( g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, @@ -2362,7 +2370,7 @@ static void createChildTables() { len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")"); - debugPrint("DEBUG - %s() LN%d: dbName: %s num of tb: %d schema: %s\n", __func__, __LINE__, + verbosePrint("%s() LN%d: dbName: %s num of tb: %d schema: %s\n", __func__, __LINE__, g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf); startMultiThreadCreateChildTable( tblColsBuf, @@ -3586,7 +3594,7 @@ PARSE_OVER: } static bool getInfoFromJsonFile(char* file) { - debugPrint("DEBUG - %s %d %s\n", __func__, __LINE__, file); + debugPrint("%s %d %s\n", __func__, __LINE__, file); FILE *fp = fopen(file, "r"); if (!fp) { @@ -3938,7 +3946,7 @@ send_to_server: int64_t endTs; startTs = taosGetTimestampUs(); - debugPrint("DEBUG %s() LN%d buff: %s\n", __func__, __LINE__, buffer); + debugPrint("%s() LN%d buff: %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec( winfo->taos, buffer, INSERT_TYPE); @@ -4145,7 +4153,7 @@ static void* syncWrite(void *sarg) { if (g_args.insert_interval) { st = taosGetTimestampMs(); } - debugPrint("DEBUG - %s() LN%d %s\n", __func__, __LINE__, buffer); + debugPrint("%s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, 1); if (0 <= affectedRows){ @@ -4237,7 +4245,7 @@ static void* syncWriteWithStb(void *sarg) { uint64_t st = 0; uint64_t et = 0; /* - debugPrint("DEBUG - %s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, + debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); for (int i = 0; i < superTblInfo->insertRows;) { */ @@ -4261,7 +4269,7 @@ static void* syncWriteWithStb(void *sarg) { int sampleUsePos = samplePos; int k = 0; - debugPrint("DEBUG - %s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); + verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); for (k = 0; k < g_args.num_of_RPR;) { int len = 0; memset(buffer, 0, superTblInfo->maxSqlLen); @@ -4345,7 +4353,7 @@ static void* syncWriteWithStb(void *sarg) { k++; i++; totalRowsInserted++; - debugPrint("DEBUG - %s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); + debugPrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); // if (inserted > superTblInfo->insertRows) if (inserted > g_args.num_of_RPR) @@ -4362,7 +4370,7 @@ static void* syncWriteWithStb(void *sarg) { int64_t endTs; startTs = taosGetTimestampUs(); - debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); if (0 > affectedRows){ @@ -4848,7 +4856,7 @@ int insertTestProcess() { if (ret == -1) exit(EXIT_FAILURE); - debugPrint("DEBUG - %d result file: %s\n", __LINE__, g_Dbs.resultFile); + debugPrint("%d result file: %s\n", __LINE__, g_Dbs.resultFile); g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a"); if (NULL == g_fpOfInsertResult) { fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile); @@ -5085,7 +5093,7 @@ static int queryTestProcess() { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); - debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); + verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); (void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE); } else { t_info->taos = NULL; @@ -5196,7 +5204,7 @@ void *subSubscribeProcess(void *sarg) { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); - debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); + debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)){ return NULL; } @@ -5262,7 +5270,7 @@ void *superSubscribeProcess(void *sarg) { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); - debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); + debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)) { return NULL; } @@ -5627,7 +5635,7 @@ void querySqlFile(TAOS* taos, char* sqlFile) } memcpy(cmd + cmd_len, line, read_len); - debugPrint("DEBUG %s() LN%d cmd: %s\n", __func__, __LINE__, cmd); + verbosePrint("%s() LN%d cmd: %s\n", __func__, __LINE__, cmd); queryDbExec(taos, cmd, NO_INSERT_TYPE); memset(cmd, 0, MAX_SQL_SIZE); cmd_len = 0; @@ -5715,7 +5723,7 @@ static void testCmdLine() { int main(int argc, char *argv[]) { parse_args(argc, argv, &g_args); - debugPrint("DEBUG - meta file: %s\n", g_args.metaFile); + debugPrint("meta file: %s\n", g_args.metaFile); if (g_args.metaFile) { initOfInsertMeta(); From 7df0a86699d3713d0279bdfd6c8b327be220e28c Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 9 Mar 2021 00:36:56 +0800 Subject: [PATCH 4/7] [TD-3147] : support insert interval. cleanup. --- src/kit/taosdemo/taosdemo.c | 44 ++----------------------------------- 1 file changed, 2 insertions(+), 42 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index d3cda7bef8..0ac8269b02 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -228,7 +228,6 @@ typedef struct SSuperTable_S { int disorderRange; // ms or us by database precision int maxSqlLen; // -// int64_t insertRows; // 0: no limit int timeStampStep; char startTimestamp[MAX_TB_NAME_SIZE]; // char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json @@ -1086,7 +1085,6 @@ static int printfInsertMeta() { printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode); -// printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n"); @@ -1233,7 +1231,6 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); -// fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { fprintf(fp, " multiThreadWriteOneTbl: no\n"); @@ -3231,20 +3228,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("failed to read json, disorderRange not found"); goto PARSE_OVER; } -/* - cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows"); - if (insertRows && insertRows->type == cJSON_Number) { - g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint; - //if (0 == g_Dbs.db[i].superTbls[j].insertRows) { - // g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; - //} - } else if (!insertRows) { - g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; - } else { - printf("failed to read json, insert_rows not found"); - goto PARSE_OVER; - } -*/ if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { continue; @@ -3788,7 +3771,6 @@ static void syncWriteForNumberOfTblInOneSql( int64_t st = 0; int64_t et = 0; -// for (int i = 0; i < superTblInfo->insertRows;) { for (int i = 0; i < g_args.num_of_RPR;) { int32_t tbl_id = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { @@ -3912,7 +3894,6 @@ static void syncWriteForNumberOfTblInOneSql( k++; totalRowsInserted++; - // if (inserted >= superTblInfo->insertRows || if (inserted >= g_args.num_of_RPR || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { tID = tbl_id + 1; @@ -4244,16 +4225,11 @@ static void* syncWriteWithStb(void *sarg) { int64_t time_counter = winfo->start_time; uint64_t st = 0; uint64_t et = 0; -/* - debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, - superTblInfo->insertRows); - for (int i = 0; i < superTblInfo->insertRows;) { -*/ for (int i = 0; i < g_args.num_of_RPR;) { for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - int64_t inserted = i; + int64_t inserted = 0; uint64_t tmp_time = time_counter; if (i > 0 && g_args.insert_interval @@ -4355,13 +4331,8 @@ static void* syncWriteWithStb(void *sarg) { totalRowsInserted++; debugPrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); -// if (inserted > superTblInfo->insertRows) if (inserted > g_args.num_of_RPR) break; -/* if (inserted >= superTblInfo->insertRows - || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) - break; -*/ if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { //printf("===== sql: %s \n\n", buffer); @@ -4469,7 +4440,6 @@ void callBack(void *param, TAOS_RES *res, int code) { char *data = calloc(1, MAX_DATA_SIZE); char *pstr = buffer; pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id); -// if (winfo->counter >= winfo->superTblInfo->insertRows) { if (winfo->counter >= g_args.num_of_RPR) { winfo->start_table_id++; winfo->counter = 0; @@ -4496,7 +4466,6 @@ void callBack(void *param, TAOS_RES *res, int code) { pstr += sprintf(pstr, "%s", data); winfo->counter++; -// if (winfo->counter >= winfo->superTblInfo->insertRows) { if (winfo->counter >= g_args.num_of_RPR) { break; } @@ -4715,13 +4684,7 @@ void *readTable(void *sarg) { return NULL; } - int num_of_DPT; -/* if (rinfo->superTblInfo) { - num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; - } else { - */ - num_of_DPT = g_args.num_of_DPT; -// } + int num_of_DPT = g_args.num_of_DPT; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; @@ -4784,7 +4747,6 @@ void *readMetric(void *sarg) { return NULL; } -// int num_of_DPT = rinfo->superTblInfo->insertRows; int num_of_DPT = g_args.num_of_DPT; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; @@ -4904,7 +4866,6 @@ int insertTestProcess() { if (g_Dbs.db[i].superTblCount > 0) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; -// if (0 == g_Dbs.db[i].superTbls[j].insertRows) { if (0 == g_args.num_of_DPT) { continue; } @@ -5530,7 +5491,6 @@ void setParaFromArg(){ "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].timeStampStep = 10; - // g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; g_Dbs.db[0].superTbls[0].columnCount = 0; From 9e4f06790402a7be75cf096d36cce1e3048b1848 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 9 Mar 2021 12:59:25 +0800 Subject: [PATCH 5/7] [TD-3147] : support insert interval. normal table working. --- src/kit/taosdemo/taosdemo.c | 115 +++++++++++++++++++++++++----------- 1 file changed, 79 insertions(+), 36 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 0ac8269b02..8bc0ff7f12 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -228,6 +228,7 @@ typedef struct SSuperTable_S { int disorderRange; // ms or us by database precision int maxSqlLen; // + int64_t insertRows; // 0: no limit int timeStampStep; char startTimestamp[MAX_TB_NAME_SIZE]; // char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json @@ -1085,6 +1086,7 @@ static int printfInsertMeta() { printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode); + printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n"); @@ -1231,6 +1233,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); + fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { fprintf(fp, " multiThreadWriteOneTbl: no\n"); @@ -1991,7 +1994,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, exit(-1); } snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols); - debugPrint("%s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); + verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); if (use_metric) { char tags[STRING_LEN] = "\0"; @@ -2044,7 +2047,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s (ts timestamp%s) tags %s", dbName, superTbls->sTblName, cols, tags); - debugPrint("%s() LN%d: %s\n", __func__, __LINE__, command); + verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName); @@ -2237,7 +2240,7 @@ static void* createTable(void *sarg) } len = 0; - debugPrint("%s() LN%d %s\n", __func__, __LINE__, buffer); + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ free(buffer); return NULL; @@ -2252,7 +2255,7 @@ static void* createTable(void *sarg) } if (0 != len) { - debugPrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer); + verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer); (void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE); } @@ -3228,6 +3231,20 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("failed to read json, disorderRange not found"); goto PARSE_OVER; } + + cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows"); + if (insertRows && insertRows->type == cJSON_Number) { + g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint; + //if (0 == g_Dbs.db[i].superTbls[j].insertRows) { + // g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; + //} + } else if (!insertRows) { + g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; + } else { + printf("failed to read json, insert_rows not found"); + goto PARSE_OVER; + } + if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { continue; @@ -3771,7 +3788,7 @@ static void syncWriteForNumberOfTblInOneSql( int64_t st = 0; int64_t et = 0; - for (int i = 0; i < g_args.num_of_RPR;) { + for (int i = 0; i < superTblInfo->insertRows;) { int32_t tbl_id = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { int inserted = i; @@ -3894,7 +3911,7 @@ static void syncWriteForNumberOfTblInOneSql( k++; totalRowsInserted++; - if (inserted >= g_args.num_of_RPR || + if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { tID = tbl_id + 1; printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", @@ -4080,10 +4097,13 @@ static void* syncWrite(void *sarg) { uint64_t st = 0; uint64_t et = 0; - for (int i = 0; i < g_args.num_of_DPT;) { + winfo->totalRowsInserted = 0; + winfo->totalAffectedRows = 0; - for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - int inserted = i; + for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { + for (int i = 0; i < g_args.num_of_DPT;) { + + int tblInserted = i; int64_t tmp_time = time_counter; char *pstr = buffer; @@ -4112,13 +4132,15 @@ static void* syncWrite(void *sarg) { } pstr += sprintf(pstr, " %s", data); - inserted++; + tblInserted++; k++; + i++; - if (inserted >= g_args.num_of_DPT) + if (tblInserted >= g_args.num_of_DPT) break; } + winfo->totalRowsInserted += k; /* puts(buffer); */ int64_t startTs; int64_t endTs; @@ -4134,9 +4156,10 @@ static void* syncWrite(void *sarg) { if (g_args.insert_interval) { st = taosGetTimestampMs(); } - debugPrint("%s() LN%d %s\n", __func__, __LINE__, buffer); + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, 1); - + + verbosePrint("%s() LN%d: affectedRows:%d\n", __func__, __LINE__, affectedRows); if (0 <= affectedRows){ endTs = taosGetTimestampUs(); int64_t delay = endTs - startTs; @@ -4146,20 +4169,26 @@ static void* syncWrite(void *sarg) { winfo->minDelay = delay; winfo->cntDelay++; winfo->totalDelay += delay; - //winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; + winfo->totalAffectedRows += affectedRows; + winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; } + verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64"\n", __func__, __LINE__, winfo->totalAffectedRows); if (g_args.insert_interval) { et = taosGetTimestampMs(); } - if (tID == winfo->end_table_id) { - i = inserted; - time_counter = tmp_time; - } - } + if (tblInserted >= g_args.num_of_DPT) { + break; + } + } // num_of_DPT + } // tId + + printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", + winfo->threadID, + winfo->totalRowsInserted, + winfo->totalAffectedRows); - } return NULL; } @@ -4225,11 +4254,14 @@ static void* syncWriteWithStb(void *sarg) { int64_t time_counter = winfo->start_time; uint64_t st = 0; uint64_t et = 0; - for (int i = 0; i < g_args.num_of_RPR;) { - for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; + debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, + superTblInfo->insertRows); + for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - int64_t inserted = 0; + for (int i = 0; i < superTblInfo->insertRows;) { + + int64_t inserted = i; uint64_t tmp_time = time_counter; if (i > 0 && g_args.insert_interval @@ -4244,9 +4276,8 @@ static void* syncWriteWithStb(void *sarg) { } int sampleUsePos = samplePos; - int k = 0; verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); - for (k = 0; k < g_args.num_of_RPR;) { + for (int k = 0; k < g_args.num_of_RPR;) { int len = 0; memset(buffer, 0, superTblInfo->maxSqlLen); char *pstr = buffer; @@ -4329,10 +4360,14 @@ static void* syncWriteWithStb(void *sarg) { k++; i++; totalRowsInserted++; - debugPrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); + verbosePrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); - if (inserted > g_args.num_of_RPR) + if (inserted > superTblInfo->insertRows) break; +/* if (inserted >= superTblInfo->insertRows + || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) + break; +*/ if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { //printf("===== sql: %s \n\n", buffer); @@ -4440,6 +4475,7 @@ void callBack(void *param, TAOS_RES *res, int code) { char *data = calloc(1, MAX_DATA_SIZE); char *pstr = buffer; pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id); +// if (winfo->counter >= winfo->superTblInfo->insertRows) { if (winfo->counter >= g_args.num_of_RPR) { winfo->start_table_id++; winfo->counter = 0; @@ -4466,7 +4502,7 @@ void callBack(void *param, TAOS_RES *res, int code) { pstr += sprintf(pstr, "%s", data); winfo->counter++; - if (winfo->counter >= g_args.num_of_RPR) { + if (winfo->counter >= winfo->superTblInfo->insertRows) { break; } } @@ -4631,12 +4667,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, if (superTblInfo) { superTblInfo->totalAffectedRows += t_info->totalAffectedRows; superTblInfo->totalRowsInserted += t_info->totalRowsInserted; - - totalDelay += t_info->totalDelay; - cntDelay += t_info->cntDelay; - if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; - if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; } + + totalDelay += t_info->totalDelay; + cntDelay += t_info->cntDelay; + if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; + if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; } cntDelay -= 1; @@ -4684,7 +4720,13 @@ void *readTable(void *sarg) { return NULL; } - int num_of_DPT = g_args.num_of_DPT; + int num_of_DPT; +/* if (rinfo->superTblInfo) { + num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; + } else { + */ + num_of_DPT = g_args.num_of_DPT; +// } int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; @@ -4747,7 +4789,7 @@ void *readMetric(void *sarg) { return NULL; } - int num_of_DPT = g_args.num_of_DPT; + int num_of_DPT = rinfo->superTblInfo->insertRows; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; bool do_aggreFunc = g_Dbs.do_aggreFunc; @@ -4866,7 +4908,7 @@ int insertTestProcess() { if (g_Dbs.db[i].superTblCount > 0) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; - if (0 == g_args.num_of_DPT) { + if (0 == g_Dbs.db[i].superTbls[j].insertRows) { continue; } startMultiThreadInsertData( @@ -5491,6 +5533,7 @@ void setParaFromArg(){ "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].timeStampStep = 10; + g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; g_Dbs.db[0].superTbls[0].columnCount = 0; From 724fb5e03aecc217946fdc2a605ab097608b53b0 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 9 Mar 2021 14:47:17 +0800 Subject: [PATCH 6/7] [TD-3147] : support insert interval. stable works. --- src/kit/taosdemo/taosdemo.c | 164 ++++++++++++++++-------------------- 1 file changed, 74 insertions(+), 90 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 8bc0ff7f12..4f0e4a8766 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -3753,6 +3753,7 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* return (-1); } } + dataLen -= 2; dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); @@ -3820,7 +3821,7 @@ static void syncWriteForNumberOfTblInOneSql( if (0 == len) { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - "insert into %s.%s%d using %s.%s tags %s values ", + "insert into %s.%s%d using %s.%s tags %s values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id, @@ -3830,7 +3831,7 @@ static void syncWriteForNumberOfTblInOneSql( } else { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - " %s.%s%d using %s.%s tags %s values ", + " %s.%s%d using %s.%s tags %s values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id, @@ -3843,13 +3844,13 @@ static void syncWriteForNumberOfTblInOneSql( if (0 == len) { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - "insert into %s.%s values ", + "insert into %s.%s values ", winfo->db_name, superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); } else { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - " %s.%s values ", + " %s.%s values ", winfo->db_name, superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); } @@ -3857,14 +3858,14 @@ static void syncWriteForNumberOfTblInOneSql( if (0 == len) { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - "insert into %s.%s%d values ", + "insert into %s.%s%d values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id); } else { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - " %s.%s%d values ", + " %s.%s%d values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id); @@ -3899,7 +3900,7 @@ static void syncWriteForNumberOfTblInOneSql( } else { retLen = generateRowData(pstr + len, superTblInfo->maxSqlLen - len, - tmp_time += superTblInfo->timeStampStep, + tmp_time += superTblInfo->timeStampStep, superTblInfo); } if (retLen < 0) { @@ -3957,16 +3958,16 @@ send_to_server: if (delay < winfo->minDelay) winfo->minDelay = delay; winfo->cntDelay++; winfo->totalDelay += delay; - //winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; + winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; + winfo->totalAffectedRows += affectedRows; } - totalAffectedRows += affectedRows; int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", winfo->threadID, - totalRowsInserted, - totalAffectedRows); + winfo->totalRowsInserted, + winfo->totalAffectedRows); lastPrintTime = currentPrintTime; } //int64_t t2 = taosGetTimestampMs(); @@ -4108,7 +4109,7 @@ static void* syncWrite(void *sarg) { char *pstr = buffer; pstr += sprintf(pstr, - "insert into %s.%s%d values", + "insert into %s.%s%d values ", winfo->db_name, g_args.tb_prefix, tID); int k; for (k = 0; k < g_args.num_of_RPR;) { @@ -4146,16 +4147,16 @@ static void* syncWrite(void *sarg) { int64_t endTs; startTs = taosGetTimestampUs(); //queryDB(winfo->taos, buffer); - if (i > 0 && g_args.insert_interval + if (i > 0 && g_args.insert_interval && (g_args.insert_interval > (et - st) )) { int sleep_time = g_args.insert_interval - (et -st); printf("sleep: %d ms specified by insert_interval\n", sleep_time); taosMsleep(sleep_time); // ms - } + } - if (g_args.insert_interval) { + if (g_args.insert_interval) { st = taosGetTimestampMs(); - } + } verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, 1); @@ -4174,13 +4175,13 @@ static void* syncWrite(void *sarg) { } verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64"\n", __func__, __LINE__, winfo->totalAffectedRows); - if (g_args.insert_interval) { + if (g_args.insert_interval) { et = taosGetTimestampMs(); - } + } - if (tblInserted >= g_args.num_of_DPT) { + if (tblInserted >= g_args.num_of_DPT) { break; - } + } } // num_of_DPT } // tId @@ -4194,8 +4195,6 @@ static void* syncWrite(void *sarg) { static void* syncWriteWithStb(void *sarg) { - uint64_t totalRowsInserted = 0; - uint64_t totalAffectedRows = 0; uint64_t lastPrintTime = taosGetTimestampMs(); threadInfo *winfo = (threadInfo *)sarg; @@ -4255,14 +4254,20 @@ static void* syncWriteWithStb(void *sarg) { uint64_t st = 0; uint64_t et = 0; - debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, - superTblInfo->insertRows); + winfo->totalRowsInserted = 0; + winfo->totalAffectedRows = 0; + + int sampleUsePos; + uint64_t tmp_time; + + debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); + for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { for (int i = 0; i < superTblInfo->insertRows;) { - int64_t inserted = i; - uint64_t tmp_time = time_counter; + int64_t tblInserted = i; + tmp_time = time_counter; if (i > 0 && g_args.insert_interval && (g_args.insert_interval > (et - st) )) { @@ -4275,14 +4280,15 @@ static void* syncWriteWithStb(void *sarg) { st = taosGetTimestampMs(); } - int sampleUsePos = samplePos; + sampleUsePos = samplePos; verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); - for (int k = 0; k < g_args.num_of_RPR;) { - int len = 0; - memset(buffer, 0, superTblInfo->maxSqlLen); - char *pstr = buffer; - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { + memset(buffer, 0, superTblInfo->maxSqlLen); + int len = 0; + + char *pstr = buffer; + + if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { char* tagsValBuf = NULL; if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagVaulesForStb(superTblInfo); @@ -4305,21 +4311,23 @@ static void* syncWriteWithStb(void *sarg) { superTblInfo->sTblName, tagsValBuf); tmfree(tagsValBuf); - } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { + } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s values", winfo->db_name, superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN); - } else { + } else { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s%d values", winfo->db_name, superTblInfo->childTblPrefix, tID); - } + } + int k; + for (k = 0; k < g_args.num_of_RPR;) { int retLen = 0; if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { retLen = getRowDataFromSample( @@ -4340,7 +4348,8 @@ static void* syncWriteWithStb(void *sarg) { int64_t d = tmp_time - rand() % superTblInfo->disorderRange; retLen = generateRowData( pstr + len, - superTblInfo->maxSqlLen - len, d, + superTblInfo->maxSqlLen - len, + d, superTblInfo); //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d); } else { @@ -4354,24 +4363,21 @@ static void* syncWriteWithStb(void *sarg) { goto free_and_statistics_2; } } -/* len += retLen; -*/ - inserted++; + + len += retLen; + verbosePrint("%s() LN%d retLen=%d len=%d k=%d buffer=%s\n", __func__, __LINE__, retLen, len, k, buffer); + + tblInserted++; k++; i++; - totalRowsInserted++; - verbosePrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); - - if (inserted > superTblInfo->insertRows) - break; -/* if (inserted >= superTblInfo->insertRows - || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) - break; -*/ - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { - //printf("===== sql: %s \n\n", buffer); - //int64_t t1 = taosGetTimestampMs(); + if (tblInserted >= superTblInfo->insertRows) + break; + } + + winfo->totalRowsInserted += k; + + if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { int64_t startTs; int64_t endTs; startTs = taosGetTimestampUs(); @@ -4388,76 +4394,54 @@ static void* syncWriteWithStb(void *sarg) { if (delay < winfo->minDelay) winfo->minDelay = delay; winfo->cntDelay++; winfo->totalDelay += delay; - //winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; } - totalAffectedRows += affectedRows; + winfo->totalAffectedRows += affectedRows; int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", winfo->threadID, - totalRowsInserted, - totalAffectedRows); + winfo->totalRowsInserted, + winfo->totalAffectedRows); lastPrintTime = currentPrintTime; } - //int64_t t2 = taosGetTimestampMs(); - //printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0); - } else { - //int64_t t1 = taosGetTimestampMs(); + } else { int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); - //int64_t t2 = taosGetTimestampMs(); - //printf("http insert sql return, Spent %ld ms \n", t2 - t1); if (0 != retCode) { printf("========restful return fail, threadID[%d]\n", winfo->threadID); goto free_and_statistics_2; } - } - if (g_args.insert_interval) { - et = taosGetTimestampMs(); - } -/* - if (loop_cnt) { - loop_cnt--; - if ((1 == loop_cnt) && (0 != nrecords_last_req)) { - nrecords_cur_req = nrecords_last_req; - } else if (0 == loop_cnt){ - nrecords_cur_req = nrecords_no_last_req; - loop_cnt = loop_cnt_orig; - break; - } - } else { - break; - } - */ + } + if (g_args.insert_interval) { + et = taosGetTimestampMs(); } - if (tID == winfo->end_table_id) { + time_counter = tmp_time; + + if (tblInserted >= superTblInfo->insertRows) + break; + } // num_of_DPT + + if (tID == winfo->end_table_id) { if (0 == strncasecmp( superTblInfo->dataSource, "sample", strlen("sample"))) { samplePos = sampleUsePos; } - i = inserted; - time_counter = tmp_time; - } } - //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); - } + } // tID free_and_statistics_2: tmfree(buffer); tmfree(sampleDataBuf); tmfclose(fp); - winfo->totalRowsInserted = totalRowsInserted; - winfo->totalAffectedRows = totalAffectedRows; - printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", winfo->threadID, - totalRowsInserted, - totalAffectedRows); + winfo->totalRowsInserted, + winfo->totalAffectedRows); return NULL; } From 530401602ab451feada7d3e5101e8a6d03265560 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 9 Mar 2021 17:00:00 +0800 Subject: [PATCH 7/7] [TD-3147] : suppport insert interval. pass test. --- src/kit/taosdemo/taosdemo.c | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 4f0e4a8766..63038725cf 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -3784,7 +3784,6 @@ static void syncWriteForNumberOfTblInOneSql( } uint64_t time_counter = winfo->start_time; - int64_t tmp_time; int sampleUsePos; int64_t st = 0; @@ -3792,6 +3791,7 @@ static void syncWriteForNumberOfTblInOneSql( for (int i = 0; i < superTblInfo->insertRows;) { int32_t tbl_id = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { + int64_t tmp_time = 0; int inserted = i; for (int k = 0; k < g_args.num_of_RPR;) { @@ -4250,7 +4250,6 @@ static void* syncWriteWithStb(void *sarg) { return NULL; } - int64_t time_counter = winfo->start_time; uint64_t st = 0; uint64_t et = 0; @@ -4258,16 +4257,16 @@ static void* syncWriteWithStb(void *sarg) { winfo->totalAffectedRows = 0; int sampleUsePos; - uint64_t tmp_time; debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { + int64_t start_time = winfo->start_time; + for (int i = 0; i < superTblInfo->insertRows;) { int64_t tblInserted = i; - tmp_time = time_counter; if (i > 0 && g_args.insert_interval && (g_args.insert_interval > (et - st) )) { @@ -4333,7 +4332,7 @@ static void* syncWriteWithStb(void *sarg) { retLen = getRowDataFromSample( pstr + len, superTblInfo->maxSqlLen - len, - tmp_time += superTblInfo->timeStampStep, + start_time + superTblInfo->timeStampStep * i, superTblInfo, &sampleUsePos, fp, @@ -4345,7 +4344,7 @@ static void* syncWriteWithStb(void *sarg) { int rand_num = rand_tinyint() % 100; if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) { - int64_t d = tmp_time - rand() % superTblInfo->disorderRange; + int64_t d = start_time - rand() % superTblInfo->disorderRange; retLen = generateRowData( pstr + len, superTblInfo->maxSqlLen - len, @@ -4356,7 +4355,7 @@ static void* syncWriteWithStb(void *sarg) { retLen = generateRowData( pstr + len, superTblInfo->maxSqlLen - len, - tmp_time += superTblInfo->timeStampStep, + start_time + superTblInfo->timeStampStep * i, superTblInfo); } if (retLen < 0) { @@ -4417,8 +4416,6 @@ static void* syncWriteWithStb(void *sarg) { et = taosGetTimestampMs(); } - time_counter = tmp_time; - if (tblInserted >= superTblInfo->insertRows) break; } // num_of_DPT