From fa13844ae6ad5e61f2a5f7f4bef8eda2e8702673 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 1 Mar 2021 08:24:07 +0000 Subject: [PATCH 01/12] Adjusted crash_gen tool to run on ARM32 platform, without Python guppy package --- tests/pytest/crash_gen/crash_gen_main.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index 309c0df910..c3510c7b6c 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -35,7 +35,7 @@ import os import signal import traceback import resource -from guppy import hpy +# from guppy import hpy import gc from crash_gen.service_manager import ServiceManager, TdeInstance @@ -1774,13 +1774,13 @@ class TdSuperTable: ]) # TODO: add more from 'top' - if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?! - sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName()) - if Dice.throw(3) == 0: # 1 in X chance - sql = sql + ' GROUP BY color' - Progress.emit(Progress.QUERY_GROUP_BY) - # Logging.info("Executing GROUP-BY query: " + sql) - ret.append(SqlQuery(sql)) + # if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049) + sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName()) + if Dice.throw(3) == 0: # 1 in X chance + sql = sql + ' GROUP BY color' + Progress.emit(Progress.QUERY_GROUP_BY) + # Logging.info("Executing GROUP-BY query: " + sql) + ret.append(SqlQuery(sql)) return ret From a273c635929d0f2c6d9516fb482187c15e433c54 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Mon, 8 Mar 2021 19:47:56 +0800 Subject: [PATCH 02/12] [TD-3147] : suppoert insert interval. print sleep time. --- src/kit/taosdemo/taosdemo.c | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 4a9038c71e..a8326a23c9 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -3918,7 +3918,7 @@ static void syncWriteForNumberOfTblInOneSql( send_to_server: if (g_args.insert_interval && (g_args.insert_interval > (et - st))) { int sleep_time = g_args.insert_interval - (et -st); - debugPrint("DEBUG sleep: %d ms\n", sleep_time); + printf("sleep: %d ms specified by insert_interval\n", sleep_time); taosMsleep(sleep_time); // ms } @@ -4135,7 +4135,7 @@ static void* syncWrite(void *sarg) { if (i > 0 && g_args.insert_interval && (g_args.insert_interval > (et - st) )) { int sleep_time = g_args.insert_interval - (et -st); - debugPrint("DEBUG sleep: %d ms\n", sleep_time); + printf("sleep: %d ms specified by insert_interval\n", sleep_time); taosMsleep(sleep_time); // ms } @@ -4242,6 +4242,17 @@ static void* syncWriteWithStb(void *sarg) { uint64_t inserted = i; uint64_t tmp_time = time_counter; + if (i > 0 && g_args.insert_interval + && (g_args.insert_interval > (et - st) )) { + int sleep_time = g_args.insert_interval - (et -st); + printf("sleep: %d ms specified by insert_interval\n", sleep_time); + taosMsleep(sleep_time); // ms + } + + if (g_args.insert_interval) { + st = taosGetTimestampMs(); + } + int sampleUsePos = samplePos; int k = 0; debugPrint("DEBUG - %s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); @@ -4326,6 +4337,7 @@ static void* syncWriteWithStb(void *sarg) { */ inserted++; k++; + i++; totalRowsInserted++; if (inserted > superTblInfo->insertRows) @@ -4334,16 +4346,6 @@ static void* syncWriteWithStb(void *sarg) { || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) break; */ - if (i > 0 && g_args.insert_interval - && (g_args.insert_interval > (et - st) )) { - int sleep_time = g_args.insert_interval - (et -st); - debugPrint("DEBUG sleep: %d ms\n", sleep_time); - taosMsleep(sleep_time); // ms - } - - if (g_args.insert_interval) { - st = taosGetTimestampMs(); - } if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { //printf("===== sql: %s \n\n", buffer); From b8bc8759d12f32ef4e74083ee114a5f53de9c04f Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Mon, 8 Mar 2021 23:29:34 +0800 Subject: [PATCH 03/12] [TD-3147] : support insert interval. --- src/kit/taosdemo/taosdemo.c | 55 ++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 373f89ab63..9b2a1e1649 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -227,7 +227,7 @@ typedef struct SSuperTable_S { int disorderRange; // ms or us by database precision int maxSqlLen; // - int64_t insertRows; // 0: no limit +// int64_t insertRows; // 0: no limit int timeStampStep; char startTimestamp[MAX_TB_NAME_SIZE]; // char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json @@ -748,7 +748,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { printf("\n"); } printf("# Insertion interval: %d\n", arguments->insert_interval); - printf("# Number of Columns per record: %d\n", arguments->num_of_RPR); + printf("# Number of records per req: %d\n", arguments->num_of_RPR); printf("# Number of Threads: %d\n", arguments->num_of_threads); printf("# Number of Tables: %d\n", arguments->num_of_tables); printf("# Number of Data per Table: %d\n", arguments->num_of_DPT); @@ -1078,7 +1078,7 @@ static int printfInsertMeta() { printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode); - printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows); +// printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n"); @@ -1225,7 +1225,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); - fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); +// fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { fprintf(fp, " multiThreadWriteOneTbl: no\n"); @@ -3223,8 +3223,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("failed to read json, disorderRange not found"); goto PARSE_OVER; } - - +/* cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows"); if (insertRows && insertRows->type == cJSON_Number) { g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint; @@ -3237,7 +3236,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("failed to read json, insert_rows not found"); goto PARSE_OVER; } - +*/ if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { continue; @@ -3781,7 +3780,8 @@ static void syncWriteForNumberOfTblInOneSql( int64_t st = 0; int64_t et = 0; - for (int i = 0; i < superTblInfo->insertRows;) { +// for (int i = 0; i < superTblInfo->insertRows;) { + for (int i = 0; i < g_args.num_of_RPR;) { int32_t tbl_id = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { int inserted = i; @@ -3904,7 +3904,8 @@ static void syncWriteForNumberOfTblInOneSql( k++; totalRowsInserted++; - if (inserted >= superTblInfo->insertRows || + // if (inserted >= superTblInfo->insertRows || + if (inserted >= g_args.num_of_RPR || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { tID = tbl_id + 1; printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", @@ -4235,13 +4236,16 @@ static void* syncWriteWithStb(void *sarg) { int64_t time_counter = winfo->start_time; uint64_t st = 0; uint64_t et = 0; - - debugPrint("DEBUG - %s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); - +/* + debugPrint("DEBUG - %s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, + superTblInfo->insertRows); for (int i = 0; i < superTblInfo->insertRows;) { +*/ + for (int i = 0; i < g_args.num_of_RPR;) { - for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - uint64_t inserted = i; + for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; + tID++) { + int64_t inserted = i; uint64_t tmp_time = time_counter; if (i > 0 && g_args.insert_interval @@ -4341,8 +4345,10 @@ static void* syncWriteWithStb(void *sarg) { k++; i++; totalRowsInserted++; + debugPrint("DEBUG - %s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); - if (inserted > superTblInfo->insertRows) +// if (inserted > superTblInfo->insertRows) + if (inserted > g_args.num_of_RPR) break; /* if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) @@ -4455,7 +4461,8 @@ void callBack(void *param, TAOS_RES *res, int code) { char *data = calloc(1, MAX_DATA_SIZE); char *pstr = buffer; pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id); - if (winfo->counter >= winfo->superTblInfo->insertRows) { +// if (winfo->counter >= winfo->superTblInfo->insertRows) { + if (winfo->counter >= g_args.num_of_RPR) { winfo->start_table_id++; winfo->counter = 0; } @@ -4481,7 +4488,8 @@ void callBack(void *param, TAOS_RES *res, int code) { pstr += sprintf(pstr, "%s", data); winfo->counter++; - if (winfo->counter >= winfo->superTblInfo->insertRows) { +// if (winfo->counter >= winfo->superTblInfo->insertRows) { + if (winfo->counter >= g_args.num_of_RPR) { break; } } @@ -4700,11 +4708,12 @@ void *readTable(void *sarg) { } int num_of_DPT; - if (rinfo->superTblInfo) { +/* if (rinfo->superTblInfo) { num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; } else { + */ num_of_DPT = g_args.num_of_DPT; - } +// } int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; @@ -4767,7 +4776,8 @@ void *readMetric(void *sarg) { return NULL; } - int num_of_DPT = rinfo->superTblInfo->insertRows; +// int num_of_DPT = rinfo->superTblInfo->insertRows; + int num_of_DPT = g_args.num_of_DPT; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; bool do_aggreFunc = g_Dbs.do_aggreFunc; @@ -4886,7 +4896,8 @@ int insertTestProcess() { if (g_Dbs.db[i].superTblCount > 0) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; - if (0 == g_Dbs.db[i].superTbls[j].insertRows) { +// if (0 == g_Dbs.db[i].superTbls[j].insertRows) { + if (0 == g_args.num_of_DPT) { continue; } startMultiThreadInsertData( @@ -5511,7 +5522,7 @@ void setParaFromArg(){ "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].timeStampStep = 10; - g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; + // g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; g_Dbs.db[0].superTbls[0].columnCount = 0; From 19fc8eb86be706c0ec6650d97602ff802d680fd4 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 9 Mar 2021 00:06:51 +0800 Subject: [PATCH 04/12] [TD-3147] : support insert interval. add verbose print. --- src/kit/taosdemo/taosdemo.c | 62 +++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 9b2a1e1649..d3cda7bef8 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -185,6 +185,7 @@ typedef struct SArguments_S { bool insert_only; bool answer_yes; bool debug_print; + bool verbose_print; char * output_file; int mode; char * datatype[MAX_NUM_DATATYPE + 1]; @@ -489,6 +490,7 @@ SArguments g_args = { false, // use_metric false, // insert_only false, // debug_print + false, // verbose_print false, // answer_yes; "./output.txt", // output_file 0, // mode : sync or async @@ -526,7 +528,11 @@ static SQueryMetaInfo g_queryInfo; static FILE * g_fpOfInsertResult = NULL; #define debugPrint(fmt, ...) \ - do { if (g_args.debug_print) fprintf(stderr, fmt, __VA_ARGS__); } while(0) + do { if (g_args.debug_print || g_args.verbose_print) \ + fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0) +#define verbosePrint(fmt, ...) \ + do { if (g_args.verbose_print) fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0) + /////////////////////////////////////////////////// void printHelp() { @@ -691,6 +697,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->answer_yes = true; } else if (strcmp(argv[i], "-g") == 0) { arguments->debug_print = true; + } else if (strcmp(argv[i], "-gg") == 0) { + arguments->verbose_print = true; } else if (strcmp(argv[i], "-c") == 0) { strcpy(configDir, argv[++i]); } else if (strcmp(argv[i], "-O") == 0) { @@ -805,7 +813,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) { } if (code != 0) { - debugPrint("DEBUG %s() LN%d - command: %s\n", __func__, __LINE__, command); + debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command); fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res)); taos_free_result(res); //taos_close(taos); @@ -1986,7 +1994,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, exit(-1); } snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols); - debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); + debugPrint("%s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); if (use_metric) { char tags[STRING_LEN] = "\0"; @@ -2039,13 +2047,13 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s (ts timestamp%s) tags %s", dbName, superTbls->sTblName, cols, tags); - debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, command); + debugPrint("%s() LN%d: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName); return -1; } - debugPrint("DEBUG - create supertable %s success!\n\n", superTbls->sTblName); + debugPrint("create supertable %s success!\n\n", superTbls->sTblName); } return 0; } @@ -2064,7 +2072,7 @@ static int createDatabases() { for (int i = 0; i < g_Dbs.dbCount; i++) { if (g_Dbs.db[i].drop) { sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName); - debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); + verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { taos_close(taos); return -1; @@ -2132,7 +2140,7 @@ static int createDatabases() { "precision \'%s\';", g_Dbs.db[i].dbCfg.precision); } - debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); + debugPrint("%s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { taos_close(taos); printf("\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName); @@ -2140,11 +2148,11 @@ static int createDatabases() { } printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName); - debugPrint("DEBUG %s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount); + debugPrint("%s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount); for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { // describe super table, if exists sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName); - debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); + verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS; ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric); @@ -2232,7 +2240,7 @@ static void* createTable(void *sarg) } len = 0; - debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); + debugPrint("%s() LN%d %s\n", __func__, __LINE__, buffer); if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ free(buffer); return NULL; @@ -2247,7 +2255,7 @@ static void* createTable(void *sarg) } if (0 != len) { - debugPrint("DEBUG %s() %d buffer: %s\n", __func__, __LINE__, buffer); + debugPrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer); (void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE); } @@ -2285,7 +2293,7 @@ int startMultiThreadCreateChildTable( t_info->threadID = i; tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE); t_info->superTblInfo = superTblInfo; - debugPrint("DEBUG %s() %d db_name: %s\n", __func__, __LINE__, db_name); + verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name); t_info->taos = taos_connect( g_Dbs.host, g_Dbs.user, @@ -2336,7 +2344,7 @@ static void createChildTables() { continue; } - debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, + verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); startMultiThreadCreateChildTable( g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, @@ -2362,7 +2370,7 @@ static void createChildTables() { len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")"); - debugPrint("DEBUG - %s() LN%d: dbName: %s num of tb: %d schema: %s\n", __func__, __LINE__, + verbosePrint("%s() LN%d: dbName: %s num of tb: %d schema: %s\n", __func__, __LINE__, g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf); startMultiThreadCreateChildTable( tblColsBuf, @@ -3586,7 +3594,7 @@ PARSE_OVER: } static bool getInfoFromJsonFile(char* file) { - debugPrint("DEBUG - %s %d %s\n", __func__, __LINE__, file); + debugPrint("%s %d %s\n", __func__, __LINE__, file); FILE *fp = fopen(file, "r"); if (!fp) { @@ -3938,7 +3946,7 @@ send_to_server: int64_t endTs; startTs = taosGetTimestampUs(); - debugPrint("DEBUG %s() LN%d buff: %s\n", __func__, __LINE__, buffer); + debugPrint("%s() LN%d buff: %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec( winfo->taos, buffer, INSERT_TYPE); @@ -4145,7 +4153,7 @@ static void* syncWrite(void *sarg) { if (g_args.insert_interval) { st = taosGetTimestampMs(); } - debugPrint("DEBUG - %s() LN%d %s\n", __func__, __LINE__, buffer); + debugPrint("%s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, 1); if (0 <= affectedRows){ @@ -4237,7 +4245,7 @@ static void* syncWriteWithStb(void *sarg) { uint64_t st = 0; uint64_t et = 0; /* - debugPrint("DEBUG - %s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, + debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); for (int i = 0; i < superTblInfo->insertRows;) { */ @@ -4261,7 +4269,7 @@ static void* syncWriteWithStb(void *sarg) { int sampleUsePos = samplePos; int k = 0; - debugPrint("DEBUG - %s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); + verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); for (k = 0; k < g_args.num_of_RPR;) { int len = 0; memset(buffer, 0, superTblInfo->maxSqlLen); @@ -4345,7 +4353,7 @@ static void* syncWriteWithStb(void *sarg) { k++; i++; totalRowsInserted++; - debugPrint("DEBUG - %s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); + debugPrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); // if (inserted > superTblInfo->insertRows) if (inserted > g_args.num_of_RPR) @@ -4362,7 +4370,7 @@ static void* syncWriteWithStb(void *sarg) { int64_t endTs; startTs = taosGetTimestampUs(); - debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); if (0 > affectedRows){ @@ -4848,7 +4856,7 @@ int insertTestProcess() { if (ret == -1) exit(EXIT_FAILURE); - debugPrint("DEBUG - %d result file: %s\n", __LINE__, g_Dbs.resultFile); + debugPrint("%d result file: %s\n", __LINE__, g_Dbs.resultFile); g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a"); if (NULL == g_fpOfInsertResult) { fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile); @@ -5085,7 +5093,7 @@ static int queryTestProcess() { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); - debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); + verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); (void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE); } else { t_info->taos = NULL; @@ -5196,7 +5204,7 @@ void *subSubscribeProcess(void *sarg) { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); - debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); + debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)){ return NULL; } @@ -5262,7 +5270,7 @@ void *superSubscribeProcess(void *sarg) { char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); - debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); + debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)) { return NULL; } @@ -5627,7 +5635,7 @@ void querySqlFile(TAOS* taos, char* sqlFile) } memcpy(cmd + cmd_len, line, read_len); - debugPrint("DEBUG %s() LN%d cmd: %s\n", __func__, __LINE__, cmd); + verbosePrint("%s() LN%d cmd: %s\n", __func__, __LINE__, cmd); queryDbExec(taos, cmd, NO_INSERT_TYPE); memset(cmd, 0, MAX_SQL_SIZE); cmd_len = 0; @@ -5715,7 +5723,7 @@ static void testCmdLine() { int main(int argc, char *argv[]) { parse_args(argc, argv, &g_args); - debugPrint("DEBUG - meta file: %s\n", g_args.metaFile); + debugPrint("meta file: %s\n", g_args.metaFile); if (g_args.metaFile) { initOfInsertMeta(); From 7df0a86699d3713d0279bdfd6c8b327be220e28c Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 9 Mar 2021 00:36:56 +0800 Subject: [PATCH 05/12] [TD-3147] : support insert interval. cleanup. --- src/kit/taosdemo/taosdemo.c | 44 ++----------------------------------- 1 file changed, 2 insertions(+), 42 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index d3cda7bef8..0ac8269b02 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -228,7 +228,6 @@ typedef struct SSuperTable_S { int disorderRange; // ms or us by database precision int maxSqlLen; // -// int64_t insertRows; // 0: no limit int timeStampStep; char startTimestamp[MAX_TB_NAME_SIZE]; // char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json @@ -1086,7 +1085,6 @@ static int printfInsertMeta() { printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode); -// printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n"); @@ -1233,7 +1231,6 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); -// fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { fprintf(fp, " multiThreadWriteOneTbl: no\n"); @@ -3231,20 +3228,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("failed to read json, disorderRange not found"); goto PARSE_OVER; } -/* - cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows"); - if (insertRows && insertRows->type == cJSON_Number) { - g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint; - //if (0 == g_Dbs.db[i].superTbls[j].insertRows) { - // g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; - //} - } else if (!insertRows) { - g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; - } else { - printf("failed to read json, insert_rows not found"); - goto PARSE_OVER; - } -*/ if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { continue; @@ -3788,7 +3771,6 @@ static void syncWriteForNumberOfTblInOneSql( int64_t st = 0; int64_t et = 0; -// for (int i = 0; i < superTblInfo->insertRows;) { for (int i = 0; i < g_args.num_of_RPR;) { int32_t tbl_id = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { @@ -3912,7 +3894,6 @@ static void syncWriteForNumberOfTblInOneSql( k++; totalRowsInserted++; - // if (inserted >= superTblInfo->insertRows || if (inserted >= g_args.num_of_RPR || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { tID = tbl_id + 1; @@ -4244,16 +4225,11 @@ static void* syncWriteWithStb(void *sarg) { int64_t time_counter = winfo->start_time; uint64_t st = 0; uint64_t et = 0; -/* - debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, - superTblInfo->insertRows); - for (int i = 0; i < superTblInfo->insertRows;) { -*/ for (int i = 0; i < g_args.num_of_RPR;) { for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - int64_t inserted = i; + int64_t inserted = 0; uint64_t tmp_time = time_counter; if (i > 0 && g_args.insert_interval @@ -4355,13 +4331,8 @@ static void* syncWriteWithStb(void *sarg) { totalRowsInserted++; debugPrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); -// if (inserted > superTblInfo->insertRows) if (inserted > g_args.num_of_RPR) break; -/* if (inserted >= superTblInfo->insertRows - || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) - break; -*/ if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { //printf("===== sql: %s \n\n", buffer); @@ -4469,7 +4440,6 @@ void callBack(void *param, TAOS_RES *res, int code) { char *data = calloc(1, MAX_DATA_SIZE); char *pstr = buffer; pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id); -// if (winfo->counter >= winfo->superTblInfo->insertRows) { if (winfo->counter >= g_args.num_of_RPR) { winfo->start_table_id++; winfo->counter = 0; @@ -4496,7 +4466,6 @@ void callBack(void *param, TAOS_RES *res, int code) { pstr += sprintf(pstr, "%s", data); winfo->counter++; -// if (winfo->counter >= winfo->superTblInfo->insertRows) { if (winfo->counter >= g_args.num_of_RPR) { break; } @@ -4715,13 +4684,7 @@ void *readTable(void *sarg) { return NULL; } - int num_of_DPT; -/* if (rinfo->superTblInfo) { - num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; - } else { - */ - num_of_DPT = g_args.num_of_DPT; -// } + int num_of_DPT = g_args.num_of_DPT; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; @@ -4784,7 +4747,6 @@ void *readMetric(void *sarg) { return NULL; } -// int num_of_DPT = rinfo->superTblInfo->insertRows; int num_of_DPT = g_args.num_of_DPT; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; @@ -4904,7 +4866,6 @@ int insertTestProcess() { if (g_Dbs.db[i].superTblCount > 0) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; -// if (0 == g_Dbs.db[i].superTbls[j].insertRows) { if (0 == g_args.num_of_DPT) { continue; } @@ -5530,7 +5491,6 @@ void setParaFromArg(){ "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].timeStampStep = 10; - // g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; g_Dbs.db[0].superTbls[0].columnCount = 0; From ba7d9b559c13012dbd5f5f4aec548a102cfe86b2 Mon Sep 17 00:00:00 2001 From: Elias Soong Date: Tue, 9 Mar 2021 10:47:39 +0800 Subject: [PATCH 06/12] [TD-1424] : update description about Arbitrator. --- documentation20/cn/10.cluster/docs.md | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/documentation20/cn/10.cluster/docs.md b/documentation20/cn/10.cluster/docs.md index 6d7d68fe1b..7b4073a883 100644 --- a/documentation20/cn/10.cluster/docs.md +++ b/documentation20/cn/10.cluster/docs.md @@ -225,7 +225,13 @@ SHOW MNODES; ## Arbitrator的使用 -如果副本数为偶数,当一个vnode group里一半vnode不工作时,是无法从中选出master的。同理,一半mnode不工作时,是无法选出mnode的master的,因为存在“split brain”问题。为解决这个问题,TDengine引入了Arbitrator的概念。Arbitrator模拟一个vnode或mnode在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含Arbitrator在内,超过半数的vnode或mnode工作,那么该vnode group或mnode组就可以正常的提供数据插入或查询服务。比如对于副本数为2的情形,如果一个节点A离线,但另外一个节点B正常,而且能连接到Arbitrator,那么节点B就能正常工作。 +如果副本数为偶数,当一个 vnode group 里一半 vnode 不工作时,是无法从中选出 master 的。同理,一半 mnode 不工作时,是无法选出 mnode 的 master 的,因为存在“split brain”问题。为解决这个问题,TDengine 引入了 Arbitrator 的概念。Arbitrator 模拟一个 vnode 或 mnode 在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含 Arbitrator 在内,超过半数的 vnode 或 mnode 工作,那么该 vnode group 或 mnode 组就可以正常的提供数据插入或查询服务。比如对于副本数为 2 的情形,如果一个节点 A 离线,但另外一个节点 B 正常,而且能连接到 Arbitrator,那么节点 B 就能正常工作。 -TDengine提供一个执行程序,名为 tarbitrator,找任何一台Linux服务器运行它即可。请点击[安装包下载](https://www.taosdata.com/cn/all-downloads/),在TDengine Arbitrator Linux一节中,选择适合的版本下载并安装。该程序对系统资源几乎没有要求,只需要保证有网络连接即可。该应用的命令行参数`-p`可以指定其对外服务的端口号,缺省是6042。配置每个taosd实例时,可以在配置文件taos.cfg里将参数arbitrator设置为Arbitrator的End Point。如果该参数配置了,当副本数为偶数时,系统将自动连接配置的Arbitrator。如果副本数为奇数,即使配置了Arbitrator,系统也不会去建立连接。 +总之,在目前版本下,TDengine 建议在双副本环境要配置 Arbitrator,以提升系统的可用性。 + +Arbitrator 的执行程序名为 tarbitrator。该程序对系统资源几乎没有要求,只需要保证有网络连接,找任何一台 Linux 服务器运行它即可。以下简要描述安装配置的步骤: +1. 请点击 [安装包下载](https://www.taosdata.com/cn/all-downloads/),在 TDengine Arbitrator Linux 一节中,选择合适的版本下载并安装。 +2. 该应用的命令行参数 `-p` 可以指定其对外服务的端口号,缺省是 6042。 +3. 修改每个 taosd 实例的配置文件,在 taos.cfg 里将参数 arbitrator 设置为 tarbitrator 程序所对应的 End Point。(如果该参数配置了,当副本数为偶数时,系统将自动连接配置的 Arbitrator。如果副本数为奇数,即使配置了 Arbitrator,系统也不会去建立连接。) +4. 在配置文件中配置了的 Arbitrator,会出现在 `SHOW DNODES;` 指令的返回结果中,对应的 role 列的值会是“arb”。 From bbf7b47e2942b2bf7345029657e40912dcfd5a5b Mon Sep 17 00:00:00 2001 From: Steven Li Date: Tue, 9 Mar 2021 03:24:48 +0000 Subject: [PATCH 07/12] Added -w option to crash_gen tool to write duplicate data to shadow database --- tests/pytest/crash_gen/crash_gen_main.py | 72 +++++++++++++++++++----- tests/pytest/crash_gen/db.py | 37 +++++++++++- tests/pytest/crash_gen/misc.py | 12 +++- tests/pytest/crash_gen/settings.py | 8 +++ 4 files changed, 112 insertions(+), 17 deletions(-) create mode 100644 tests/pytest/crash_gen/settings.py diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index c3510c7b6c..43506c68d5 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -41,10 +41,13 @@ import gc from crash_gen.service_manager import ServiceManager, TdeInstance from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager +import crash_gen.settings import taos import requests +crash_gen.settings.init() + # Require Python 3 if sys.version_info[0] < 3: raise Exception("Must be using Python 3") @@ -259,6 +262,7 @@ class ThreadCoordinator: self._execStats = ExecutionStats() self._runStatus = Status.STATUS_RUNNING self._initDbs() + self._stepStartTime = None # Track how long it takes to execute each step def getTaskExecutor(self): return self._te @@ -394,6 +398,10 @@ class ThreadCoordinator: try: self._syncAtBarrier() # For now just cross the barrier Progress.emit(Progress.END_THREAD_STEP) + if self._stepStartTime : + stepExecTime = time.time() - self._stepStartTime + Progress.emitStr('{:.3f}s/{}'.format(stepExecTime, DbConnNative.totalRequests)) + DbConnNative.resetTotalRequests() # reset to zero except threading.BrokenBarrierError as err: self._execStats.registerFailure("Aborted due to worker thread timeout") Logging.error("\n") @@ -433,6 +441,7 @@ class ThreadCoordinator: # Then we move on to the next step Progress.emit(Progress.BEGIN_THREAD_STEP) + self._stepStartTime = time.time() self._releaseAllWorkerThreads(transitionFailed) if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate" @@ -691,7 +700,7 @@ class AnyState: def canDropDb(self): # If user requests to run up to a number of DBs, # we'd then not do drop_db operations any more - if gConfig.max_dbs > 0 : + if gConfig.max_dbs > 0 or gConfig.use_shadow_db : return False return self._info[self.CAN_DROP_DB] @@ -699,6 +708,8 @@ class AnyState: return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE] def canDropFixedSuperTable(self): + if gConfig.use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table + return False return self._info[self.CAN_DROP_FIXED_SUPER_TABLE] def canAddData(self): @@ -1037,7 +1048,7 @@ class Database: _clsLock = threading.Lock() # class wide lock _lastInt = 101 # next one is initial integer _lastTick = 0 - _lastLaggingTick = 0 # lagging tick, for unsequenced insersions + _lastLaggingTick = 0 # lagging tick, for out-of-sequence (oos) data insertions def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc self._dbNum = dbNum # we assign a number to databases, for our testing purpose @@ -1093,21 +1104,24 @@ class Database: t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years t4 = datetime.datetime.fromtimestamp( t3.timestamp() + elSec2) # see explanation above - Logging.debug("Setting up TICKS to start from: {}".format(t4)) + Logging.info("Setting up TICKS to start from: {}".format(t4)) return t4 @classmethod - def getNextTick(cls): + def getNextTick(cls): + ''' + Fetch a timestamp tick, with some random factor, may not be unique. + ''' with cls._clsLock: # prevent duplicate tick if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized # 10k at 1/20 chance, should be enough to avoid overlaps tick = cls.setupLastTick() cls._lastTick = tick - cls._lastLaggingTick = tick + datetime.timedelta(0, -10000) + cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast # if : # should be quite a bit into the future - if Dice.throw(20) == 0: # 1 in 20 chance, return lagging tick - cls._lastLaggingTick += datetime.timedelta(0, 1) # Go back in time 100 seconds + if gConfig.mix_oos_data and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick + cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence return cls._lastLaggingTick else: # regular # add one second to it @@ -1334,7 +1348,8 @@ class Task(): elif self._isErrAcceptable(errno2, err.__str__()): self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format( errno2, err, wt.getDbConn().getLastSql())) - print("_", end="", flush=True) + # print("_", end="", flush=True) + Progress.emit(Progress.ACCEPTABLE_ERROR) self._err = err else: # not an acceptable error errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format( @@ -1563,8 +1578,11 @@ class TaskCreateDb(StateTransitionTask): # numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N numReplica = gConfig.max_replicas # fixed, always repStr = "replica {}".format(numReplica) - self.execWtSql(wt, "create database {} {}" - .format(self._db.getName(), repStr) ) + updatePostfix = "update 1" if gConfig.verify_data else "" # allow update only when "verify data" is active + dbName = self._db.getName() + self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) ) + if dbName == "db_0" and gConfig.use_shadow_db: + self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) ) class TaskDropDb(StateTransitionTask): @classmethod @@ -1988,7 +2006,7 @@ class TaskAddData(StateTransitionTask): numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS fullTableName = db.getName() + '.' + regTableName - sql = "insert into {} values ".format(fullTableName) + sql = "INSERT INTO {} VALUES ".format(fullTableName) for j in range(numRecords): # number of records per table nextInt = db.getNextInt() nextTick = db.getNextTick() @@ -2016,12 +2034,24 @@ class TaskAddData(StateTransitionTask): # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written try: - sql = "insert into {} values ('{}', {}, '{}');".format( # removed: tags ('{}', {}) + sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {}) fullTableName, # ds.getFixedSuperTableName(), # ds.getNextBinary(), ds.getNextFloat(), nextTick, nextInt, nextColor) dbc.execute(sql) + + # Quick hack, attach an update statement here. TODO: create an "update" task + if (not gConfig.use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB + nextInt = db.getNextInt() + nextColor = db.getNextColor() + sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here + fullTableName, + nextTick, nextInt, nextColor) + # sql = "UPDATE {} set speed={}, color='{}' WHERE ts='{}'".format( + # fullTableName, db.getNextInt(), db.getNextColor(), nextTick) + dbc.execute(sql) + except: # Any exception at all if gConfig.verify_data: self.unlockTable(fullTableName) @@ -2070,7 +2100,8 @@ class TaskAddData(StateTransitionTask): random.shuffle(tblSeq) # now we have random sequence for i in tblSeq: if (i in self.activeTable): # wow already active - print("x", end="", flush=True) # concurrent insertion + # print("x", end="", flush=True) # concurrent insertion + Progress.emit(Progress.CONCURRENT_INSERTION) else: self.activeTable.add(i) # marking it active @@ -2373,6 +2404,11 @@ class MainExec: '--larger-data', action='store_true', help='Write larger amount of data during write operations (default: false)') + parser.add_argument( + '-m', + '--mix-oos-data', + action='store_false', + help='Mix out-of-sequence data into the test data stream (default: true)') parser.add_argument( '-n', '--dynamic-db-table-names', @@ -2414,6 +2450,11 @@ class MainExec: '--verify-data', action='store_true', help='Verify data written in a number of places by reading back (default: false)') + parser.add_argument( + '-w', + '--use-shadow-db', + action='store_true', + help='Use a shaddow database to verify data integrity (default: false)') parser.add_argument( '-x', '--continue-on-exception', @@ -2422,6 +2463,11 @@ class MainExec: global gConfig gConfig = parser.parse_args() + crash_gen.settings.gConfig = gConfig # TODO: fix this hack, consolidate this global var + + # Sanity check for arguments + if gConfig.use_shadow_db and gConfig.max_dbs>1 : + raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1") Logging.clsInit(gConfig) diff --git a/tests/pytest/crash_gen/db.py b/tests/pytest/crash_gen/db.py index e38692dbe1..62a369c41a 100644 --- a/tests/pytest/crash_gen/db.py +++ b/tests/pytest/crash_gen/db.py @@ -18,6 +18,8 @@ import datetime import traceback # from .service_manager import TdeInstance +import crash_gen.settings + class DbConn: TYPE_NATIVE = "native-c" TYPE_REST = "rest-api" @@ -244,7 +246,7 @@ class MyTDSql: self._conn.close() # TODO: very important, cursor close does NOT close DB connection! self._cursor.close() - def _execInternal(self, sql): + def _execInternal(self, sql): startTime = time.time() # Logging.debug("Executing SQL: " + sql) ret = self._cursor.execute(sql) @@ -257,6 +259,27 @@ class MyTDSql: cls.longestQuery = sql cls.longestQueryTime = queryTime cls.lqStartTime = startTime + + # Now write to the shadow database + if crash_gen.settings.gConfig.use_shadow_db: + if sql[:11] == "INSERT INTO": + if sql[:16] == "INSERT INTO db_0": + sql2 = "INSERT INTO db_s" + sql[16:] + self._cursor.execute(sql2) + else: + raise CrashGenError("Did not find db_0 in INSERT statement: {}".format(sql)) + else: # not an insert statement + pass + + if sql[:12] == "CREATE TABLE": + if sql[:17] == "CREATE TABLE db_0": + sql2 = sql.replace('db_0', 'db_s') + self._cursor.execute(sql2) + else: + raise CrashGenError("Did not find db_0 in CREATE TABLE statement: {}".format(sql)) + else: # not an insert statement + pass + return ret def query(self, sql): @@ -302,12 +325,18 @@ class DbConnNative(DbConn): _lock = threading.Lock() # _connInfoDisplayed = False # TODO: find another way to display this totalConnections = 0 # Not private + totalRequests = 0 def __init__(self, dbTarget): super().__init__(dbTarget) self._type = self.TYPE_NATIVE self._conn = None - # self._cursor = None + # self._cursor = None + + @classmethod + def resetTotalRequests(cls): + with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!! + cls.totalRequests = 0 def openByType(self): # Open connection # global gContainer @@ -356,6 +385,8 @@ class DbConnNative(DbConn): Logging.debug("[SQL] Executing SQL: {}".format(sql)) self._lastSql = sql nRows = self._tdSql.execute(sql) + cls = self.__class__ + cls.totalRequests += 1 Logging.debug( "[SQL] Execution Result, nRows = {}, SQL = {}".format( nRows, sql)) @@ -369,6 +400,8 @@ class DbConnNative(DbConn): Logging.debug("[SQL] Executing SQL: {}".format(sql)) self._lastSql = sql nRows = self._tdSql.query(sql) + cls = self.__class__ + cls.totalRequests += 1 Logging.debug( "[SQL] Query Result, nRows = {}, SQL = {}".format( nRows, sql)) diff --git a/tests/pytest/crash_gen/misc.py b/tests/pytest/crash_gen/misc.py index 6ea5691ce2..9774ec5455 100644 --- a/tests/pytest/crash_gen/misc.py +++ b/tests/pytest/crash_gen/misc.py @@ -176,11 +176,13 @@ class Progress: SERVICE_START_NAP = 7 CREATE_TABLE_ATTEMPT = 8 QUERY_GROUP_BY = 9 + CONCURRENT_INSERTION = 10 + ACCEPTABLE_ERROR = 11 tokens = { STEP_BOUNDARY: '.', - BEGIN_THREAD_STEP: '[', - END_THREAD_STEP: '] ', + BEGIN_THREAD_STEP: ' [', + END_THREAD_STEP: ']', SERVICE_HEART_BEAT: '.Y.', SERVICE_RECONNECT_START: '', @@ -188,8 +190,14 @@ class Progress: SERVICE_START_NAP: '_zz', CREATE_TABLE_ATTEMPT: 'c', QUERY_GROUP_BY: 'g', + CONCURRENT_INSERTION: 'x', + ACCEPTABLE_ERROR: '_', } @classmethod def emit(cls, token): print(cls.tokens[token], end="", flush=True) + + @classmethod + def emitStr(cls, str): + print('({})'.format(str), end="", flush=True) diff --git a/tests/pytest/crash_gen/settings.py b/tests/pytest/crash_gen/settings.py new file mode 100644 index 0000000000..3c4c91e6e0 --- /dev/null +++ b/tests/pytest/crash_gen/settings.py @@ -0,0 +1,8 @@ +from __future__ import annotations +import argparse + +gConfig: argparse.Namespace + +def init(): + global gConfig + gConfig = [] \ No newline at end of file From 9e4f06790402a7be75cf096d36cce1e3048b1848 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 9 Mar 2021 12:59:25 +0800 Subject: [PATCH 08/12] [TD-3147] : support insert interval. normal table working. --- src/kit/taosdemo/taosdemo.c | 115 +++++++++++++++++++++++++----------- 1 file changed, 79 insertions(+), 36 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 0ac8269b02..8bc0ff7f12 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -228,6 +228,7 @@ typedef struct SSuperTable_S { int disorderRange; // ms or us by database precision int maxSqlLen; // + int64_t insertRows; // 0: no limit int timeStampStep; char startTimestamp[MAX_TB_NAME_SIZE]; // char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json @@ -1085,6 +1086,7 @@ static int printfInsertMeta() { printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode); + printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n"); @@ -1231,6 +1233,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); + fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { fprintf(fp, " multiThreadWriteOneTbl: no\n"); @@ -1991,7 +1994,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, exit(-1); } snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols); - debugPrint("%s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); + verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); if (use_metric) { char tags[STRING_LEN] = "\0"; @@ -2044,7 +2047,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s (ts timestamp%s) tags %s", dbName, superTbls->sTblName, cols, tags); - debugPrint("%s() LN%d: %s\n", __func__, __LINE__, command); + verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName); @@ -2237,7 +2240,7 @@ static void* createTable(void *sarg) } len = 0; - debugPrint("%s() LN%d %s\n", __func__, __LINE__, buffer); + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ free(buffer); return NULL; @@ -2252,7 +2255,7 @@ static void* createTable(void *sarg) } if (0 != len) { - debugPrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer); + verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer); (void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE); } @@ -3228,6 +3231,20 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("failed to read json, disorderRange not found"); goto PARSE_OVER; } + + cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows"); + if (insertRows && insertRows->type == cJSON_Number) { + g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint; + //if (0 == g_Dbs.db[i].superTbls[j].insertRows) { + // g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; + //} + } else if (!insertRows) { + g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; + } else { + printf("failed to read json, insert_rows not found"); + goto PARSE_OVER; + } + if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { continue; @@ -3771,7 +3788,7 @@ static void syncWriteForNumberOfTblInOneSql( int64_t st = 0; int64_t et = 0; - for (int i = 0; i < g_args.num_of_RPR;) { + for (int i = 0; i < superTblInfo->insertRows;) { int32_t tbl_id = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { int inserted = i; @@ -3894,7 +3911,7 @@ static void syncWriteForNumberOfTblInOneSql( k++; totalRowsInserted++; - if (inserted >= g_args.num_of_RPR || + if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { tID = tbl_id + 1; printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", @@ -4080,10 +4097,13 @@ static void* syncWrite(void *sarg) { uint64_t st = 0; uint64_t et = 0; - for (int i = 0; i < g_args.num_of_DPT;) { + winfo->totalRowsInserted = 0; + winfo->totalAffectedRows = 0; - for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - int inserted = i; + for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { + for (int i = 0; i < g_args.num_of_DPT;) { + + int tblInserted = i; int64_t tmp_time = time_counter; char *pstr = buffer; @@ -4112,13 +4132,15 @@ static void* syncWrite(void *sarg) { } pstr += sprintf(pstr, " %s", data); - inserted++; + tblInserted++; k++; + i++; - if (inserted >= g_args.num_of_DPT) + if (tblInserted >= g_args.num_of_DPT) break; } + winfo->totalRowsInserted += k; /* puts(buffer); */ int64_t startTs; int64_t endTs; @@ -4134,9 +4156,10 @@ static void* syncWrite(void *sarg) { if (g_args.insert_interval) { st = taosGetTimestampMs(); } - debugPrint("%s() LN%d %s\n", __func__, __LINE__, buffer); + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, 1); - + + verbosePrint("%s() LN%d: affectedRows:%d\n", __func__, __LINE__, affectedRows); if (0 <= affectedRows){ endTs = taosGetTimestampUs(); int64_t delay = endTs - startTs; @@ -4146,20 +4169,26 @@ static void* syncWrite(void *sarg) { winfo->minDelay = delay; winfo->cntDelay++; winfo->totalDelay += delay; - //winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; + winfo->totalAffectedRows += affectedRows; + winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; } + verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64"\n", __func__, __LINE__, winfo->totalAffectedRows); if (g_args.insert_interval) { et = taosGetTimestampMs(); } - if (tID == winfo->end_table_id) { - i = inserted; - time_counter = tmp_time; - } - } + if (tblInserted >= g_args.num_of_DPT) { + break; + } + } // num_of_DPT + } // tId + + printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", + winfo->threadID, + winfo->totalRowsInserted, + winfo->totalAffectedRows); - } return NULL; } @@ -4225,11 +4254,14 @@ static void* syncWriteWithStb(void *sarg) { int64_t time_counter = winfo->start_time; uint64_t st = 0; uint64_t et = 0; - for (int i = 0; i < g_args.num_of_RPR;) { - for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; + debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, + superTblInfo->insertRows); + for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - int64_t inserted = 0; + for (int i = 0; i < superTblInfo->insertRows;) { + + int64_t inserted = i; uint64_t tmp_time = time_counter; if (i > 0 && g_args.insert_interval @@ -4244,9 +4276,8 @@ static void* syncWriteWithStb(void *sarg) { } int sampleUsePos = samplePos; - int k = 0; verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); - for (k = 0; k < g_args.num_of_RPR;) { + for (int k = 0; k < g_args.num_of_RPR;) { int len = 0; memset(buffer, 0, superTblInfo->maxSqlLen); char *pstr = buffer; @@ -4329,10 +4360,14 @@ static void* syncWriteWithStb(void *sarg) { k++; i++; totalRowsInserted++; - debugPrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); + verbosePrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); - if (inserted > g_args.num_of_RPR) + if (inserted > superTblInfo->insertRows) break; +/* if (inserted >= superTblInfo->insertRows + || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) + break; +*/ if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { //printf("===== sql: %s \n\n", buffer); @@ -4440,6 +4475,7 @@ void callBack(void *param, TAOS_RES *res, int code) { char *data = calloc(1, MAX_DATA_SIZE); char *pstr = buffer; pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id); +// if (winfo->counter >= winfo->superTblInfo->insertRows) { if (winfo->counter >= g_args.num_of_RPR) { winfo->start_table_id++; winfo->counter = 0; @@ -4466,7 +4502,7 @@ void callBack(void *param, TAOS_RES *res, int code) { pstr += sprintf(pstr, "%s", data); winfo->counter++; - if (winfo->counter >= g_args.num_of_RPR) { + if (winfo->counter >= winfo->superTblInfo->insertRows) { break; } } @@ -4631,12 +4667,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, if (superTblInfo) { superTblInfo->totalAffectedRows += t_info->totalAffectedRows; superTblInfo->totalRowsInserted += t_info->totalRowsInserted; - - totalDelay += t_info->totalDelay; - cntDelay += t_info->cntDelay; - if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; - if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; } + + totalDelay += t_info->totalDelay; + cntDelay += t_info->cntDelay; + if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; + if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; } cntDelay -= 1; @@ -4684,7 +4720,13 @@ void *readTable(void *sarg) { return NULL; } - int num_of_DPT = g_args.num_of_DPT; + int num_of_DPT; +/* if (rinfo->superTblInfo) { + num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; + } else { + */ + num_of_DPT = g_args.num_of_DPT; +// } int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; @@ -4747,7 +4789,7 @@ void *readMetric(void *sarg) { return NULL; } - int num_of_DPT = g_args.num_of_DPT; + int num_of_DPT = rinfo->superTblInfo->insertRows; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; bool do_aggreFunc = g_Dbs.do_aggreFunc; @@ -4866,7 +4908,7 @@ int insertTestProcess() { if (g_Dbs.db[i].superTblCount > 0) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; - if (0 == g_args.num_of_DPT) { + if (0 == g_Dbs.db[i].superTbls[j].insertRows) { continue; } startMultiThreadInsertData( @@ -5491,6 +5533,7 @@ void setParaFromArg(){ "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].timeStampStep = 10; + g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; g_Dbs.db[0].superTbls[0].columnCount = 0; From 724fb5e03aecc217946fdc2a605ab097608b53b0 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 9 Mar 2021 14:47:17 +0800 Subject: [PATCH 09/12] [TD-3147] : support insert interval. stable works. --- src/kit/taosdemo/taosdemo.c | 164 ++++++++++++++++-------------------- 1 file changed, 74 insertions(+), 90 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 8bc0ff7f12..4f0e4a8766 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -3753,6 +3753,7 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* return (-1); } } + dataLen -= 2; dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); @@ -3820,7 +3821,7 @@ static void syncWriteForNumberOfTblInOneSql( if (0 == len) { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - "insert into %s.%s%d using %s.%s tags %s values ", + "insert into %s.%s%d using %s.%s tags %s values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id, @@ -3830,7 +3831,7 @@ static void syncWriteForNumberOfTblInOneSql( } else { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - " %s.%s%d using %s.%s tags %s values ", + " %s.%s%d using %s.%s tags %s values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id, @@ -3843,13 +3844,13 @@ static void syncWriteForNumberOfTblInOneSql( if (0 == len) { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - "insert into %s.%s values ", + "insert into %s.%s values ", winfo->db_name, superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); } else { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - " %s.%s values ", + " %s.%s values ", winfo->db_name, superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); } @@ -3857,14 +3858,14 @@ static void syncWriteForNumberOfTblInOneSql( if (0 == len) { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - "insert into %s.%s%d values ", + "insert into %s.%s%d values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id); } else { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, - " %s.%s%d values ", + " %s.%s%d values ", winfo->db_name, superTblInfo->childTblPrefix, tbl_id); @@ -3899,7 +3900,7 @@ static void syncWriteForNumberOfTblInOneSql( } else { retLen = generateRowData(pstr + len, superTblInfo->maxSqlLen - len, - tmp_time += superTblInfo->timeStampStep, + tmp_time += superTblInfo->timeStampStep, superTblInfo); } if (retLen < 0) { @@ -3957,16 +3958,16 @@ send_to_server: if (delay < winfo->minDelay) winfo->minDelay = delay; winfo->cntDelay++; winfo->totalDelay += delay; - //winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; + winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; + winfo->totalAffectedRows += affectedRows; } - totalAffectedRows += affectedRows; int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", winfo->threadID, - totalRowsInserted, - totalAffectedRows); + winfo->totalRowsInserted, + winfo->totalAffectedRows); lastPrintTime = currentPrintTime; } //int64_t t2 = taosGetTimestampMs(); @@ -4108,7 +4109,7 @@ static void* syncWrite(void *sarg) { char *pstr = buffer; pstr += sprintf(pstr, - "insert into %s.%s%d values", + "insert into %s.%s%d values ", winfo->db_name, g_args.tb_prefix, tID); int k; for (k = 0; k < g_args.num_of_RPR;) { @@ -4146,16 +4147,16 @@ static void* syncWrite(void *sarg) { int64_t endTs; startTs = taosGetTimestampUs(); //queryDB(winfo->taos, buffer); - if (i > 0 && g_args.insert_interval + if (i > 0 && g_args.insert_interval && (g_args.insert_interval > (et - st) )) { int sleep_time = g_args.insert_interval - (et -st); printf("sleep: %d ms specified by insert_interval\n", sleep_time); taosMsleep(sleep_time); // ms - } + } - if (g_args.insert_interval) { + if (g_args.insert_interval) { st = taosGetTimestampMs(); - } + } verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, 1); @@ -4174,13 +4175,13 @@ static void* syncWrite(void *sarg) { } verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64"\n", __func__, __LINE__, winfo->totalAffectedRows); - if (g_args.insert_interval) { + if (g_args.insert_interval) { et = taosGetTimestampMs(); - } + } - if (tblInserted >= g_args.num_of_DPT) { + if (tblInserted >= g_args.num_of_DPT) { break; - } + } } // num_of_DPT } // tId @@ -4194,8 +4195,6 @@ static void* syncWrite(void *sarg) { static void* syncWriteWithStb(void *sarg) { - uint64_t totalRowsInserted = 0; - uint64_t totalAffectedRows = 0; uint64_t lastPrintTime = taosGetTimestampMs(); threadInfo *winfo = (threadInfo *)sarg; @@ -4255,14 +4254,20 @@ static void* syncWriteWithStb(void *sarg) { uint64_t st = 0; uint64_t et = 0; - debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, - superTblInfo->insertRows); + winfo->totalRowsInserted = 0; + winfo->totalAffectedRows = 0; + + int sampleUsePos; + uint64_t tmp_time; + + debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); + for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { for (int i = 0; i < superTblInfo->insertRows;) { - int64_t inserted = i; - uint64_t tmp_time = time_counter; + int64_t tblInserted = i; + tmp_time = time_counter; if (i > 0 && g_args.insert_interval && (g_args.insert_interval > (et - st) )) { @@ -4275,14 +4280,15 @@ static void* syncWriteWithStb(void *sarg) { st = taosGetTimestampMs(); } - int sampleUsePos = samplePos; + sampleUsePos = samplePos; verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); - for (int k = 0; k < g_args.num_of_RPR;) { - int len = 0; - memset(buffer, 0, superTblInfo->maxSqlLen); - char *pstr = buffer; - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { + memset(buffer, 0, superTblInfo->maxSqlLen); + int len = 0; + + char *pstr = buffer; + + if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { char* tagsValBuf = NULL; if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagVaulesForStb(superTblInfo); @@ -4305,21 +4311,23 @@ static void* syncWriteWithStb(void *sarg) { superTblInfo->sTblName, tagsValBuf); tmfree(tagsValBuf); - } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { + } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s values", winfo->db_name, superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN); - } else { + } else { len += snprintf(pstr + len, superTblInfo->maxSqlLen - len, "insert into %s.%s%d values", winfo->db_name, superTblInfo->childTblPrefix, tID); - } + } + int k; + for (k = 0; k < g_args.num_of_RPR;) { int retLen = 0; if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { retLen = getRowDataFromSample( @@ -4340,7 +4348,8 @@ static void* syncWriteWithStb(void *sarg) { int64_t d = tmp_time - rand() % superTblInfo->disorderRange; retLen = generateRowData( pstr + len, - superTblInfo->maxSqlLen - len, d, + superTblInfo->maxSqlLen - len, + d, superTblInfo); //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d); } else { @@ -4354,24 +4363,21 @@ static void* syncWriteWithStb(void *sarg) { goto free_and_statistics_2; } } -/* len += retLen; -*/ - inserted++; + + len += retLen; + verbosePrint("%s() LN%d retLen=%d len=%d k=%d buffer=%s\n", __func__, __LINE__, retLen, len, k, buffer); + + tblInserted++; k++; i++; - totalRowsInserted++; - verbosePrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); - - if (inserted > superTblInfo->insertRows) - break; -/* if (inserted >= superTblInfo->insertRows - || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) - break; -*/ - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { - //printf("===== sql: %s \n\n", buffer); - //int64_t t1 = taosGetTimestampMs(); + if (tblInserted >= superTblInfo->insertRows) + break; + } + + winfo->totalRowsInserted += k; + + if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { int64_t startTs; int64_t endTs; startTs = taosGetTimestampUs(); @@ -4388,76 +4394,54 @@ static void* syncWriteWithStb(void *sarg) { if (delay < winfo->minDelay) winfo->minDelay = delay; winfo->cntDelay++; winfo->totalDelay += delay; - //winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; } - totalAffectedRows += affectedRows; + winfo->totalAffectedRows += affectedRows; int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", winfo->threadID, - totalRowsInserted, - totalAffectedRows); + winfo->totalRowsInserted, + winfo->totalAffectedRows); lastPrintTime = currentPrintTime; } - //int64_t t2 = taosGetTimestampMs(); - //printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0); - } else { - //int64_t t1 = taosGetTimestampMs(); + } else { int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); - //int64_t t2 = taosGetTimestampMs(); - //printf("http insert sql return, Spent %ld ms \n", t2 - t1); if (0 != retCode) { printf("========restful return fail, threadID[%d]\n", winfo->threadID); goto free_and_statistics_2; } - } - if (g_args.insert_interval) { - et = taosGetTimestampMs(); - } -/* - if (loop_cnt) { - loop_cnt--; - if ((1 == loop_cnt) && (0 != nrecords_last_req)) { - nrecords_cur_req = nrecords_last_req; - } else if (0 == loop_cnt){ - nrecords_cur_req = nrecords_no_last_req; - loop_cnt = loop_cnt_orig; - break; - } - } else { - break; - } - */ + } + if (g_args.insert_interval) { + et = taosGetTimestampMs(); } - if (tID == winfo->end_table_id) { + time_counter = tmp_time; + + if (tblInserted >= superTblInfo->insertRows) + break; + } // num_of_DPT + + if (tID == winfo->end_table_id) { if (0 == strncasecmp( superTblInfo->dataSource, "sample", strlen("sample"))) { samplePos = sampleUsePos; } - i = inserted; - time_counter = tmp_time; - } } - //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); - } + } // tID free_and_statistics_2: tmfree(buffer); tmfree(sampleDataBuf); tmfclose(fp); - winfo->totalRowsInserted = totalRowsInserted; - winfo->totalAffectedRows = totalAffectedRows; - printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", winfo->threadID, - totalRowsInserted, - totalAffectedRows); + winfo->totalRowsInserted, + winfo->totalAffectedRows); return NULL; } From 032e19b1eeb623914739d3af987ab5228e6c7b8e Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 9 Mar 2021 15:26:25 +0800 Subject: [PATCH 10/12] [TD-3197] : fix taosdemo few coverity scan issues. --- src/kit/taosdemo/taosdemo.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 67397d1826..1caacd9f24 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -4992,7 +4992,7 @@ void *subQueryProcess(void *sarg) { int64_t st = 0; int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000; while (1) { - if (g_queryInfo.subQueryInfo.rate && (et - st) < g_queryInfo.subQueryInfo.rate*1000) { + if (g_queryInfo.subQueryInfo.rate && (et - st) < (int64_t)g_queryInfo.subQueryInfo.rate*1000) { taosMsleep(g_queryInfo.subQueryInfo.rate*1000 - (et - st)); // ms //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); } @@ -5615,7 +5615,13 @@ void querySqlFile(TAOS* taos, char* sqlFile) memcpy(cmd + cmd_len, line, read_len); debugPrint("DEBUG %s() LN%d cmd: %s\n", __func__, __LINE__, cmd); - queryDbExec(taos, cmd, NO_INSERT_TYPE); + if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE)) { + printf("queryDbExec %s failed!\n", cmd); + tmfree(cmd); + tmfree(line); + tmfclose(fp); + return; + } memset(cmd, 0, MAX_SQL_SIZE); cmd_len = 0; } From 13baeeac7bb0b8bc0e8ed4cb007ad9b4070e46e2 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 9 Mar 2021 16:22:18 +0800 Subject: [PATCH 11/12] [TD-3045]: exit zombie dropped dnode --- src/dnode/src/dnodeVnodes.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/dnode/src/dnodeVnodes.c b/src/dnode/src/dnodeVnodes.c index 9f32541612..d00314fcbc 100644 --- a/src/dnode/src/dnodeVnodes.c +++ b/src/dnode/src/dnodeVnodes.c @@ -198,6 +198,14 @@ void dnodeCleanupVnodes() { static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { if (pMsg->code != TSDB_CODE_SUCCESS) { dError("status rsp is received, error:%s", tstrerror(pMsg->code)); + if (pMsg->code == TSDB_CODE_MND_DNODE_NOT_EXIST) { + char clusterId[TSDB_CLUSTER_ID_LEN]; + dnodeGetClusterId(clusterId); + if (clusterId[0] != '\0') { + dError("exit zombie dropped dnode"); + exit(EXIT_FAILURE); + } + } taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); return; } From 530401602ab451feada7d3e5101e8a6d03265560 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 9 Mar 2021 17:00:00 +0800 Subject: [PATCH 12/12] [TD-3147] : suppport insert interval. pass test. --- src/kit/taosdemo/taosdemo.c | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 4f0e4a8766..63038725cf 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -3784,7 +3784,6 @@ static void syncWriteForNumberOfTblInOneSql( } uint64_t time_counter = winfo->start_time; - int64_t tmp_time; int sampleUsePos; int64_t st = 0; @@ -3792,6 +3791,7 @@ static void syncWriteForNumberOfTblInOneSql( for (int i = 0; i < superTblInfo->insertRows;) { int32_t tbl_id = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { + int64_t tmp_time = 0; int inserted = i; for (int k = 0; k < g_args.num_of_RPR;) { @@ -4250,7 +4250,6 @@ static void* syncWriteWithStb(void *sarg) { return NULL; } - int64_t time_counter = winfo->start_time; uint64_t st = 0; uint64_t et = 0; @@ -4258,16 +4257,16 @@ static void* syncWriteWithStb(void *sarg) { winfo->totalAffectedRows = 0; int sampleUsePos; - uint64_t tmp_time; debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { + int64_t start_time = winfo->start_time; + for (int i = 0; i < superTblInfo->insertRows;) { int64_t tblInserted = i; - tmp_time = time_counter; if (i > 0 && g_args.insert_interval && (g_args.insert_interval > (et - st) )) { @@ -4333,7 +4332,7 @@ static void* syncWriteWithStb(void *sarg) { retLen = getRowDataFromSample( pstr + len, superTblInfo->maxSqlLen - len, - tmp_time += superTblInfo->timeStampStep, + start_time + superTblInfo->timeStampStep * i, superTblInfo, &sampleUsePos, fp, @@ -4345,7 +4344,7 @@ static void* syncWriteWithStb(void *sarg) { int rand_num = rand_tinyint() % 100; if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) { - int64_t d = tmp_time - rand() % superTblInfo->disorderRange; + int64_t d = start_time - rand() % superTblInfo->disorderRange; retLen = generateRowData( pstr + len, superTblInfo->maxSqlLen - len, @@ -4356,7 +4355,7 @@ static void* syncWriteWithStb(void *sarg) { retLen = generateRowData( pstr + len, superTblInfo->maxSqlLen - len, - tmp_time += superTblInfo->timeStampStep, + start_time + superTblInfo->timeStampStep * i, superTblInfo); } if (retLen < 0) { @@ -4417,8 +4416,6 @@ static void* syncWriteWithStb(void *sarg) { et = taosGetTimestampMs(); } - time_counter = tmp_time; - if (tblInserted >= superTblInfo->insertRows) break; } // num_of_DPT