diff --git a/documentation20/cn/00.index/docs.md b/documentation20/cn/00.index/docs.md index aba10a14e3..c16673cba5 100644 --- a/documentation20/cn/00.index/docs.md +++ b/documentation20/cn/00.index/docs.md @@ -120,7 +120,7 @@ TDengine是一个高效的存储、查询、分析时序大数据的平台,专 * [TDengine性能对比测试工具](https://www.taosdata.com/blog/2020/01/18/1166.html) * [IDEA数据库管理工具可视化使用TDengine](https://www.taosdata.com/blog/2020/08/27/1767.html) * [基于eletron开发的跨平台TDengine图形化管理工具](https://github.com/skye0207/TDengineGUI) -* [DataX,支持TDengine的离线数据采集/同步工具](https://github.com/wgzhao/DataX)(文档:[读取插件](https://github.com/wgzhao/DataX/blob/master/docs/src/main/sphinx/reader/tdenginereader.md)、[写入插件](https://github.com/wgzhao/DataX/blob/master/docs/src/main/sphinx/writer/tdenginewriter.md)) +* [DataX,支持TDengine的离线数据采集/同步工具](https://github.com/alibaba/DataX) ## TDengine与其他数据库的对比测试 diff --git a/src/kit/taosdemo/insert.json b/src/kit/taosdemo/insert.json index 5276b9fb61..5ed744cde9 100644 --- a/src/kit/taosdemo/insert.json +++ b/src/kit/taosdemo/insert.json @@ -11,6 +11,7 @@ "confirm_parameter_prompt": "no", "insert_interval": 0, "num_of_records_per_req": 100, + "max_sql_len": 1024000, "databases": [{ "dbinfo": { "name": "db", @@ -38,7 +39,9 @@ "auto_create_table": "no", "data_source": "rand", "insert_mode": "taosc", - "insert_rows": 100000, + "childtable_limit": 33, + "childtable_offset": 33, + "insert_rows": 1000, "multi_thread_write_one_tbl": "no", "number_of_tbl_in_one_sql": 0, "rows_per_tbl": 100, diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 807bef76cd..615b0aebe0 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -98,6 +98,8 @@ extern char configDir[]; #define MAX_DATABASE_COUNT 256 #define INPUT_BUF_LEN 256 +#define DEFAULT_TIMESTAMP_STEP 10 + typedef enum CREATE_SUB_TALBE_MOD_EN { PRE_CREATE_SUBTBL, AUTO_CREATE_SUBTBL, @@ -196,6 +198,7 @@ typedef struct SArguments_S { int num_of_threads; int insert_interval; int num_of_RPR; + int max_sql_len; int num_of_tables; int num_of_DPT; int abort; @@ -222,6 +225,8 @@ typedef struct SSuperTable_S { char childTblPrefix[MAX_TB_NAME_SIZE]; char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample char insertMode[MAX_TB_NAME_SIZE]; // taosc, restful + int childTblLimit; + int childTblOffset; int multiThreadWriteOneTbl; // 0: no, 1: yes int numberOfTblInOneSql; // 0/1: one table, > 1: number of tbl @@ -259,7 +264,7 @@ typedef struct SSuperTable_S { int tagUsePos; // statistics - int64_t totalRowsInserted; + int64_t totalInsertRows; int64_t totalAffectedRows; } SSuperTable; @@ -329,7 +334,7 @@ typedef struct SDbs_S { SDataBase db[MAX_DB_COUNT]; // statistics - int64_t totalRowsInserted; + int64_t totalInsertRows; int64_t totalAffectedRows; } SDbs; @@ -400,7 +405,7 @@ typedef struct SThreadInfo_S { int64_t lastTs; // statistics - int64_t totalRowsInserted; + int64_t totalInsertRows; int64_t totalAffectedRows; // insert delay statistics @@ -514,6 +519,7 @@ SArguments g_args = { 10, // num_of_connections/thread 0, // insert_interval 100, // num_of_RPR + TSDB_PAYLOAD_SIZE, // max_sql_len 10000, // num_of_tables 10000, // num_of_DPT 0, // abort @@ -761,6 +767,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { } printf("# Insertion interval: %d\n", arguments->insert_interval); printf("# Number of records per req: %d\n", arguments->num_of_RPR); + printf("# Max SQL length: %d\n", arguments->max_sql_len); + printf("# Length of Binary: %d\n", arguments->len_of_binary); printf("# Number of Threads: %d\n", arguments->num_of_threads); printf("# Number of Tables: %d\n", arguments->num_of_tables); printf("# Number of Data per Table: %d\n", arguments->num_of_DPT); @@ -774,6 +782,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { printf("# Delete method: %d\n", arguments->method_of_delete); printf("# Answer yes when prompt: %d\n", arguments->answer_yes); printf("# Print debug info: %d\n", arguments->debug_print); + printf("# Print verbose info: %d\n", arguments->verbose_print); printf("###################################################################\n"); if (!arguments->answer_yes) { printf("Press enter key to continue\n\n"); @@ -1006,6 +1015,7 @@ static int printfInsertMeta() { printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl); printf("insert interval: \033[33m%d\033[0m\n", g_args.insert_interval); printf("number of records per req: \033[33m%d\033[0m\n", g_args.num_of_RPR); + printf("max sql length: \033[33m%d\033[0m\n", g_args.max_sql_len); printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount); for (int i = 0; i < g_Dbs.dbCount; i++) { @@ -1085,37 +1095,55 @@ static int printfInsertMeta() { } else { printf(" childTblExists: \033[33m%s\033[0m\n", "error"); } - - printf(" childTblCount: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].childTblCount); - printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix); - printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); - printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode); - printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows); + + printf(" childTblCount: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].childTblCount); + printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix); + printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); + printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode); + if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) { + printf(" childTblLimit: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].childTblLimit); + } + if (g_Dbs.db[i].superTbls[j].childTblOffset > 0) { + printf(" childTblOffset: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].childTblOffset); + } + printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows); if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { - printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n"); + printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n"); }else { - printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n"); + printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n"); } - printf(" numberOfTblInOneSql: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].numberOfTblInOneSql); - printf(" rowsPerTbl: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].rowsPerTbl); - printf(" disorderRange: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].disorderRange); - printf(" disorderRatio: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].disorderRatio); - printf(" maxSqlLen: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].maxSqlLen); - - printf(" timeStampStep: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].timeStampStep); - printf(" startTimestamp: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].startTimestamp); - printf(" sampleFormat: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sampleFormat); - printf(" sampleFile: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sampleFile); - printf(" tagsFile: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].tagsFile); - - printf(" columnCount: \033[33m%d\033[0m\n ", g_Dbs.db[i].superTbls[j].columnCount); + printf(" numberOfTblInOneSql: \033[33m%d\033[0m\n", + g_Dbs.db[i].superTbls[j].numberOfTblInOneSql); + printf(" rowsPerTbl: \033[33m%d\033[0m\n", + g_Dbs.db[i].superTbls[j].rowsPerTbl); + printf(" disorderRange: \033[33m%d\033[0m\n", + g_Dbs.db[i].superTbls[j].disorderRange); + printf(" disorderRatio: \033[33m%d\033[0m\n", + g_Dbs.db[i].superTbls[j].disorderRatio); + printf(" maxSqlLen: \033[33m%d\033[0m\n", + g_Dbs.db[i].superTbls[j].maxSqlLen); + printf(" timeStampStep: \033[33m%d\033[0m\n", + g_Dbs.db[i].superTbls[j].timeStampStep); + printf(" startTimestamp: \033[33m%s\033[0m\n", + g_Dbs.db[i].superTbls[j].startTimestamp); + printf(" sampleFormat: \033[33m%s\033[0m\n", + g_Dbs.db[i].superTbls[j].sampleFormat); + printf(" sampleFile: \033[33m%s\033[0m\n", + g_Dbs.db[i].superTbls[j].sampleFile); + printf(" tagsFile: \033[33m%s\033[0m\n", + g_Dbs.db[i].superTbls[j].tagsFile); + printf(" columnCount: \033[33m%d\033[0m\n ", + g_Dbs.db[i].superTbls[j].columnCount); for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) { //printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); - if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "binary", 6)) - || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "nchar", 5))) { + if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, + "binary", 6)) + || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, + "nchar", 5))) { printf("column[\033[33m%d\033[0m]:\033[33m%s(%d)\033[0m ", k, - g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); + g_Dbs.db[i].superTbls[j].columns[k].dataType, + g_Dbs.db[i].superTbls[j].columns[k].dataLen); } else { printf("column[%d]:\033[33m%s\033[0m ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType); @@ -1130,11 +1158,12 @@ static int printfInsertMeta() { if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "binary", 6)) || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "nchar", 5))) { printf("tag[%d]:\033[33m%s(%d)\033[0m ", k, - g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen); + g_Dbs.db[i].superTbls[j].tags[k].dataType, + g_Dbs.db[i].superTbls[j].tags[k].dataLen); } else { printf("tag[%d]:\033[33m%s\033[0m ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType); - } + } } printf("\n"); } @@ -1768,21 +1797,21 @@ int postProceSql(char* host, uint16_t port, char* sqlstr) return 0; } - -char* getTagValueFromTagSample( SSuperTable* stbInfo, int tagUsePos) { +static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) { char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1); if (NULL == dataBuf) { printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1); return NULL; } - + int dataLen = 0; - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "(%s)", stbInfo->tagDataBuf + stbInfo->lenOfTagOfOneRow * tagUsePos); - + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "(%s)", stbInfo->tagDataBuf + stbInfo->lenOfTagOfOneRow * tagUsePos); + return dataBuf; } -char* generateTagVaulesForStb(SSuperTable* stbInfo) { +static char* generateTagVaulesForStb(SSuperTable* stbInfo) { char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1); if (NULL == dataBuf) { printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1); @@ -1792,13 +1821,15 @@ char* generateTagVaulesForStb(SSuperTable* stbInfo) { int dataLen = 0; dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "("); for (int i = 0; i < stbInfo->tagCount; i++) { - if ((0 == strncasecmp(stbInfo->tags[i].dataType, "binary", 6)) || (0 == strncasecmp(stbInfo->tags[i].dataType, "nchar", 5))) { + if ((0 == strncasecmp(stbInfo->tags[i].dataType, "binary", strlen("binary"))) + || (0 == strncasecmp(stbInfo->tags[i].dataType, "nchar", strlen("nchar")))) { if (stbInfo->tags[i].dataLen > TSDB_MAX_BINARY_LEN) { - printf("binary or nchar length overflow, max size:%u\n", (uint32_t)TSDB_MAX_BINARY_LEN); + printf("binary or nchar length overflow, max size:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); tmfree(dataBuf); return NULL; } - + char* buf = (char*)calloc(stbInfo->tags[i].dataLen+1, 1); if (NULL == buf) { printf("calloc failed! size:%d\n", stbInfo->tags[i].dataLen); @@ -1806,30 +1837,48 @@ char* generateTagVaulesForStb(SSuperTable* stbInfo) { return NULL; } rand_string(buf, stbInfo->tags[i].dataLen); - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "\'%s\', ", buf); + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "\'%s\', ", buf); tmfree(buf); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "int", 3)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_int()); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "bigint", 6)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%"PRId64", ", rand_bigint()); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "float", 5)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%f, ", rand_float()); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "double", 6)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%f, ", rand_double()); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "smallint", 8)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_smallint()); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "tinyint", 7)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_tinyint()); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "bool", 4)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_bool()); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "timestamp", 4)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%"PRId64", ", rand_bigint()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "int", strlen("int"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%d, ", rand_int()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "bigint", strlen("bigint"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%"PRId64", ", rand_bigint()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "float", strlen("float"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%f, ", rand_float()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "double", strlen("double"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%f, ", rand_double()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "smallint", strlen("smallint"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%d, ", rand_smallint()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "tinyint", strlen("tinyint"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%d, ", rand_tinyint()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "bool", strlen("bool"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%d, ", rand_bool()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "timestamp", strlen("timestamp"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%"PRId64", ", rand_bigint()); } else { printf("No support data type: %s\n", stbInfo->tags[i].dataType); tmfree(dataBuf); return NULL; } } + dataLen -= 2; dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, ")"); return dataBuf; @@ -1874,7 +1923,7 @@ static int calcRowLen(SSuperTable* superTbls) { int lenOfTagOfOneRow = 0; for (tagIndex = 0; tagIndex < superTbls->tagCount; tagIndex++) { char* dataType = superTbls->tags[tagIndex].dataType; - + if (strcasecmp(dataType, "BINARY") == 0) { lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 3; } else if (strcasecmp(dataType, "NCHAR") == 0) { @@ -1905,15 +1954,25 @@ static int calcRowLen(SSuperTable* superTbls) { } -static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, char* sTblName, char** childTblNameOfSuperTbl, int* childTblCountOfSuperTbl) { +static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, + char* dbName, char* sTblName, char** childTblNameOfSuperTbl, + int* childTblCountOfSuperTbl, int limit, int offset) { + char command[BUFFER_SIZE] = "\0"; + char limitBuf[100] = "\0"; + TAOS_RES * res; TAOS_ROW row = NULL; char* childTblName = *childTblNameOfSuperTbl; - + + if (offset >= 0) { + snprintf(limitBuf, 100, " limit %d offset %d", limit, offset); + } + //get all child table name use cmd: select tbname from superTblName; - snprintf(command, BUFFER_SIZE, "select tbname from %s.%s", dbName, sTblName); + snprintf(command, BUFFER_SIZE, "select tbname from %s.%s %s", dbName, sTblName, limitBuf); + res = taos_query(taos, command); int32_t code = taos_errno(res); if (code != 0) { @@ -1923,7 +1982,7 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, char* sTblName exit(-1); } - int childTblCount = 10000; + int childTblCount = (limit < 0)?10000:limit; int count = 0; childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN); char* pTblName = childTblName; @@ -1937,7 +1996,8 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, char* sTblName if (tmp != NULL) { childTblName = tmp; childTblCount = (int)(childTblCount*1.5); - memset(childTblName + count*TSDB_TABLE_NAME_LEN, 0, (size_t)((childTblCount-count)*TSDB_TABLE_NAME_LEN)); + memset(childTblName + count*TSDB_TABLE_NAME_LEN, 0, + (size_t)((childTblCount-count)*TSDB_TABLE_NAME_LEN)); } else { // exit, if allocate more memory failed printf("realloc fail for save child table name of %s.%s\n", dbName, sTblName); @@ -1949,7 +2009,7 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, char* sTblName } pTblName = childTblName + count * TSDB_TABLE_NAME_LEN; } - + *childTblCountOfSuperTbl = count; *childTblNameOfSuperTbl = childTblName; @@ -1957,19 +2017,29 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, char* sTblName return 0; } -static int getSuperTableFromServer(TAOS * taos, char* dbName, SSuperTable* superTbls) { +static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, + char* sTblName, char** childTblNameOfSuperTbl, + int* childTblCountOfSuperTbl) { + + return getChildNameOfSuperTableWithLimitAndOffset(taos, dbName, sTblName, + childTblNameOfSuperTbl, childTblCountOfSuperTbl, + -1, -1); +} + +static int getSuperTableFromServer(TAOS * taos, char* dbName, + SSuperTable* superTbls) { char command[BUFFER_SIZE] = "\0"; TAOS_RES * res; TAOS_ROW row = NULL; int count = 0; - - //get schema use cmd: describe superTblName; + + //get schema use cmd: describe superTblName; snprintf(command, BUFFER_SIZE, "describe %s.%s", dbName, superTbls->sTblName); - res = taos_query(taos, command); + res = taos_query(taos, command); int32_t code = taos_errno(res); if (code != 0) { printf("failed to run command %s\n", command); - taos_free_result(res); + taos_free_result(res); return -1; } @@ -1980,19 +2050,33 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, SSuperTable* supe if (0 == count) { count++; continue; - } + } if (strcmp((char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], "TAG") == 0) { - tstrncpy(superTbls->tags[tagIndex].field, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX], fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes); - tstrncpy(superTbls->tags[tagIndex].dataType, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes); - superTbls->tags[tagIndex].dataLen = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]); - tstrncpy(superTbls->tags[tagIndex].note, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes); + tstrncpy(superTbls->tags[tagIndex].field, + (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX], + fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes); + tstrncpy(superTbls->tags[tagIndex].dataType, + (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], + fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes); + superTbls->tags[tagIndex].dataLen = + *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]); + tstrncpy(superTbls->tags[tagIndex].note, + (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], + fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes); tagIndex++; - } else { - tstrncpy(superTbls->columns[columnIndex].field, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX], fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes); - tstrncpy(superTbls->columns[columnIndex].dataType, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes); - superTbls->columns[columnIndex].dataLen = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]); - tstrncpy(superTbls->columns[columnIndex].note, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes); + } else { + tstrncpy(superTbls->columns[columnIndex].field, + (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX], + fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes); + tstrncpy(superTbls->columns[columnIndex].dataType, + (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], + fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes); + superTbls->columns[columnIndex].dataLen = + *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]); + tstrncpy(superTbls->columns[columnIndex].note, + (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], + fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes); columnIndex++; } count++; @@ -2006,14 +2090,17 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, SSuperTable* supe if (TBL_ALREADY_EXISTS == superTbls->childTblExists) { //get all child table name use cmd: select tbname from superTblName; - getAllChildNameOfSuperTable(taos, dbName, superTbls->sTblName, &superTbls->childTblName, &superTbls->childTblCount); + getAllChildNameOfSuperTable(taos, dbName, + superTbls->sTblName, + &superTbls->childTblName, + &superTbls->childTblCount); } return 0; } -static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, bool use_metric) { +static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, bool use_metric) { char command[BUFFER_SIZE] = "\0"; - + char cols[STRING_LEN] = "\0"; int colIndex; int len = 0; @@ -2021,7 +2108,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, int lenOfOneRow = 0; for (colIndex = 0; colIndex < superTbls->columnCount; colIndex++) { char* dataType = superTbls->columns[colIndex].dataType; - + if (strcasecmp(dataType, "BINARY") == 0) { len += snprintf(cols + len, STRING_LEN - len, ", col%d %s(%d)", colIndex, "BINARY", @@ -2050,10 +2137,10 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, } else if (strcasecmp(dataType, "FLOAT") == 0) { len += snprintf(cols + len, STRING_LEN - len, ", col%d %s", colIndex, "FLOAT"); lenOfOneRow += 22; - } else if (strcasecmp(dataType, "DOUBLE") == 0) { + } else if (strcasecmp(dataType, "DOUBLE") == 0) { len += snprintf(cols + len, STRING_LEN - len, ", col%d %s", colIndex, "DOUBLE"); lenOfOneRow += 42; - } else if (strcasecmp(dataType, "TIMESTAMP") == 0) { + } else if (strcasecmp(dataType, "TIMESTAMP") == 0) { len += snprintf(cols + len, STRING_LEN - len, ", col%d %s", colIndex, "TIMESTAMP"); lenOfOneRow += 21; } else { @@ -2085,33 +2172,42 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, len += snprintf(tags + len, STRING_LEN - len, "("); for (tagIndex = 0; tagIndex < superTbls->tagCount; tagIndex++) { char* dataType = superTbls->tags[tagIndex].dataType; - + if (strcasecmp(dataType, "BINARY") == 0) { - len += snprintf(tags + len, STRING_LEN - len, "t%d %s(%d), ", tagIndex, "BINARY", superTbls->tags[tagIndex].dataLen); + len += snprintf(tags + len, STRING_LEN - len, "t%d %s(%d), ", tagIndex, + "BINARY", superTbls->tags[tagIndex].dataLen); lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 3; } else if (strcasecmp(dataType, "NCHAR") == 0) { - len += snprintf(tags + len, STRING_LEN - len, "t%d %s(%d), ", tagIndex, "NCHAR", superTbls->tags[tagIndex].dataLen); + len += snprintf(tags + len, STRING_LEN - len, "t%d %s(%d), ", tagIndex, + "NCHAR", superTbls->tags[tagIndex].dataLen); lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 3; } else if (strcasecmp(dataType, "INT") == 0) { - len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "INT"); + len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, + "INT"); lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 11; } else if (strcasecmp(dataType, "BIGINT") == 0) { - len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "BIGINT"); + len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, + "BIGINT"); lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 21; } else if (strcasecmp(dataType, "SMALLINT") == 0) { - len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "SMALLINT"); + len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, + "SMALLINT"); lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 6; } else if (strcasecmp(dataType, "TINYINT") == 0) { - len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "TINYINT"); + len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, + "TINYINT"); lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 4; } else if (strcasecmp(dataType, "BOOL") == 0) { - len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "BOOL"); + len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, + "BOOL"); lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 6; } else if (strcasecmp(dataType, "FLOAT") == 0) { - len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "FLOAT"); + len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, + "FLOAT"); lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 22; } else if (strcasecmp(dataType, "DOUBLE") == 0) { - len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "DOUBLE"); + len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, + "DOUBLE"); lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 42; } else { taos_close(taos); @@ -2130,7 +2226,8 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { - fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName); + fprintf(stderr, "create supertable %s failed!\n\n", + superTbls->sTblName); return -1; } debugPrint("create supertable %s success!\n\n", superTbls->sTblName); @@ -2138,7 +2235,6 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, return 0; } - static int createDatabases() { TAOS * taos = NULL; int ret = 0; @@ -2352,7 +2448,7 @@ static void* createTable(void *sarg) } int startMultiThreadCreateChildTable( - char* cols, int threads, int ntables, + char* cols, int threads, int startFrom, int ntables, char* db_name, SSuperTable* superTblInfo) { pthread_t *pids = malloc(threads * sizeof(pthread_t)); threadInfo *infos = malloc(threads * sizeof(threadInfo)); @@ -2375,7 +2471,7 @@ int startMultiThreadCreateChildTable( int b = 0; b = ntables % threads; - int last = 0; + int last = startFrom; for (int i = 0; i < threads; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; @@ -2423,7 +2519,7 @@ static void createChildTables() { char tblColsBuf[MAX_SQL_SIZE]; int len; - for (int i = 0; i < g_Dbs.dbCount; i++) { + for (int i = 0; i < g_Dbs.dbCount; i++) { if (g_Dbs.db[i].superTblCount > 0) { // with super table for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { @@ -2434,12 +2530,17 @@ static void createChildTables() { verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); + int startFrom = 0; + g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; + + verbosePrint("%s() LN%d: create %d child tables from %d\n", __func__, __LINE__, + g_totalChildTables, startFrom); startMultiThreadCreateChildTable( g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, g_Dbs.threadCountByCreateTbl, - g_Dbs.db[i].superTbls[j].childTblCount, + startFrom, + g_totalChildTables, g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); - g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; } } else { // normal table @@ -2447,10 +2548,13 @@ static void createChildTables() { int j = 0; while (g_args.datatype[j]) { if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0) - || (strncasecmp(g_args.datatype[j], "NCHAR", strlen("NCHAR")) == 0)) { - len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s(60)", j, g_args.datatype[j]); + || (strncasecmp(g_args.datatype[j], + "NCHAR", strlen("NCHAR")) == 0)) { + len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, + ", COL%d %s(60)", j, g_args.datatype[j]); } else { - len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s", j, g_args.datatype[j]); + len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, + ", COL%d %s", j, g_args.datatype[j]); } len = strlen(tblColsBuf); j++; @@ -2458,11 +2562,13 @@ static void createChildTables() { len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")"); - verbosePrint("%s() LN%d: dbName: %s num of tb: %d schema: %s\n", __func__, __LINE__, + verbosePrint("%s() LN%d: dbName: %s num of tb: %d schema: %s\n", + __func__, __LINE__, g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf); startMultiThreadCreateChildTable( tblColsBuf, g_Dbs.threadCountByCreateTbl, + 0, g_args.num_of_tables, g_Dbs.db[i].dbName, NULL); @@ -2470,30 +2576,6 @@ static void createChildTables() { } } -/* -static int taosGetLineNum(const char *fileName) -{ - int lineNum = 0; - char cmd[1024] = { 0 }; - char buf[1024] = { 0 }; - sprintf(cmd, "wc -l %s", fileName); - - FILE *fp = popen(cmd, "r"); - if (fp == NULL) { - fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); - return lineNum; - } - - if (fgets(buf, sizeof(buf), fp)) { - int index = strchr((const char*)buf, ' ') - buf; - buf[index] = '\0'; - lineNum = atoi(buf); - } - pclose(fp); - return lineNum; -} -*/ - /* Read 10000 lines at most. If more than 10000 lines, continue to read after using */ @@ -2571,19 +2653,29 @@ int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) { /* Read 10000 lines at most. If more than 10000 lines, continue to read after using */ -int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sampleBuf) { +static int readSampleFromCsvFileToMem( + SSuperTable* superTblInfo) { size_t n = 0; ssize_t readLen = 0; char * line = NULL; int getRows = 0; + + FILE* fp = fopen(superTblInfo->sampleFile, "r"); + if (fp == NULL) { + fprintf(stderr, "Failed to open sample file: %s, reason:%s\n", + superTblInfo->sampleFile, strerror(errno)); + return -1; + } - memset(sampleBuf, 0, MAX_SAMPLES_ONCE_FROM_FILE* superTblInfo->lenOfOneRow); + memset(superTblInfo->sampleDataBuf, 0, + MAX_SAMPLES_ONCE_FROM_FILE * superTblInfo->lenOfOneRow); while (1) { readLen = tgetline(&line, &n, fp); if (-1 == readLen) { if(0 != fseek(fp, 0, SEEK_SET)) { - printf("Failed to fseek file: %s, reason:%s\n", + fprintf(stderr, "Failed to fseek file: %s, reason:%s\n", superTblInfo->sampleFile, strerror(errno)); + fclose(fp); return -1; } continue; @@ -2603,7 +2695,8 @@ int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sample continue; } - memcpy(sampleBuf + getRows * superTblInfo->lenOfOneRow, line, readLen); + memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow, + line, readLen); getRows++; if (getRows == MAX_SAMPLES_ONCE_FROM_FILE) { @@ -2611,6 +2704,7 @@ int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sample } } + fclose(fp); tmfree(line); return 0; } @@ -2635,7 +2729,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s // columns cJSON *columns = cJSON_GetObjectItem(stbInfo, "columns"); if (columns && columns->type != cJSON_Array) { - printf("failed to read json, columns not found\n"); + printf("ERROR: failed to read json, columns not found\n"); goto PARSE_OVER; } else if (NULL == columns) { superTbls->columnCount = 0; @@ -2645,7 +2739,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s int columnSize = cJSON_GetArraySize(columns); if (columnSize > MAX_COLUMN_COUNT) { - printf("failed to read json, column size overflow, max column size is %d\n", + printf("ERROR: failed to read json, column size overflow, max column size is %d\n", MAX_COLUMN_COUNT); goto PARSE_OVER; } @@ -2664,7 +2758,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s if (countObj && countObj->type == cJSON_Number) { count = countObj->valueint; } else if (countObj && countObj->type != cJSON_Number) { - printf("failed to read json, column count not found"); + printf("ERROR: failed to read json, column count not found\n"); goto PARSE_OVER; } else { count = 1; @@ -2674,7 +2768,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s memset(&columnCase, 0, sizeof(StrColumn)); cJSON *dataType = cJSON_GetObjectItem(column, "type"); if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) { - printf("failed to read json, column type not found"); + printf("ERROR: failed to read json, column type not found\n"); goto PARSE_OVER; } //tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE); @@ -2684,7 +2778,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s if (dataLen && dataLen->type == cJSON_Number) { columnCase.dataLen = dataLen->valueint; } else if (dataLen && dataLen->type != cJSON_Number) { - printf("failed to read json, column len not found"); + printf("ERROR: failed to read json, column len not found\n"); goto PARSE_OVER; } else { columnCase.dataLen = 8; @@ -2703,13 +2797,13 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s // tags cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags"); if (!tags || tags->type != cJSON_Array) { - printf("failed to read json, tags not found"); + printf("ERROR: failed to read json, tags not found\n"); goto PARSE_OVER; } int tagSize = cJSON_GetArraySize(tags); if (tagSize > MAX_TAG_COUNT) { - printf("failed to read json, tags size overflow, max tag size is %d\n", MAX_TAG_COUNT); + printf("ERROR: failed to read json, tags size overflow, max tag size is %d\n", MAX_TAG_COUNT); goto PARSE_OVER; } @@ -2723,7 +2817,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s if (countObj && countObj->type == cJSON_Number) { count = countObj->valueint; } else if (countObj && countObj->type != cJSON_Number) { - printf("failed to read json, column count not found"); + printf("ERROR: failed to read json, column count not found\n"); goto PARSE_OVER; } else { count = 1; @@ -2733,7 +2827,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s memset(&columnCase, 0, sizeof(StrColumn)); cJSON *dataType = cJSON_GetObjectItem(tag, "type"); if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) { - printf("failed to read json, tag type not found"); + printf("ERROR: failed to read json, tag type not found\n"); goto PARSE_OVER; } tstrncpy(columnCase.dataType, dataType->valuestring, MAX_TB_NAME_SIZE); @@ -2742,7 +2836,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s if (dataLen && dataLen->type == cJSON_Number) { columnCase.dataLen = dataLen->valueint; } else if (dataLen && dataLen->type != cJSON_Number) { - printf("failed to read json, column len not found"); + printf("ERROR: failed to read json, column len not found\n"); goto PARSE_OVER; } else { columnCase.dataLen = 0; @@ -2779,7 +2873,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!host) { tstrncpy(g_Dbs.host, "127.0.0.1", MAX_DB_NAME_SIZE); } else { - printf("failed to read json, host not found\n"); + printf("ERROR: failed to read json, host not found\n"); goto PARSE_OVER; } @@ -2817,17 +2911,17 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!threads) { g_Dbs.threadCount = 1; } else { - printf("failed to read json, threads not found"); + printf("ERROR: failed to read json, threads not found\n"); goto PARSE_OVER; } - + cJSON* threads2 = cJSON_GetObjectItem(root, "thread_count_create_tbl"); if (threads2 && threads2->type == cJSON_Number) { g_Dbs.threadCountByCreateTbl = threads2->valueint; } else if (!threads2) { g_Dbs.threadCountByCreateTbl = 1; } else { - printf("failed to read json, threads2 not found"); + printf("ERROR: failed to read json, threads2 not found\n"); goto PARSE_OVER; } @@ -2837,17 +2931,28 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!gInsertInterval) { g_args.insert_interval = 0; } else { - printf("failed to read json, insert_interval input mistake"); + fprintf(stderr, "ERROR: failed to read json, insert_interval input mistake\n"); goto PARSE_OVER; } + cJSON* maxSqlLen = cJSON_GetObjectItem(root, "max_sql_len"); + if (maxSqlLen && maxSqlLen->type == cJSON_Number) { + g_args.max_sql_len = maxSqlLen->valueint; + } else if (!maxSqlLen) { + g_args.max_sql_len = TSDB_PAYLOAD_SIZE; + } else { + fprintf(stderr, "ERROR: failed to read json, max_sql_len input mistake\n"); + goto PARSE_OVER; + } + + cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req"); if (numRecPerReq && numRecPerReq->type == cJSON_Number) { g_args.num_of_RPR = numRecPerReq->valueint; } else if (!numRecPerReq) { g_args.num_of_RPR = 100; } else { - printf("failed to read json, num_of_records_per_req not found"); + printf("ERROR: failed to read json, num_of_records_per_req not found\n"); goto PARSE_OVER; } @@ -2865,19 +2970,19 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!answerPrompt) { g_args.answer_yes = false; } else { - printf("failed to read json, confirm_parameter_prompt not found"); + printf("ERROR: failed to read json, confirm_parameter_prompt not found\n"); goto PARSE_OVER; } cJSON* dbs = cJSON_GetObjectItem(root, "databases"); if (!dbs || dbs->type != cJSON_Array) { - printf("failed to read json, databases not found\n"); + printf("ERROR: failed to read json, databases not found\n"); goto PARSE_OVER; } int dbSize = cJSON_GetArraySize(dbs); if (dbSize > MAX_DB_COUNT) { - printf("failed to read json, databases size overflow, max database is %d\n", MAX_DB_COUNT); + printf("ERROR: failed to read json, databases size overflow, max database is %d\n", MAX_DB_COUNT); goto PARSE_OVER; } @@ -2889,13 +2994,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { // dbinfo cJSON *dbinfo = cJSON_GetObjectItem(dbinfos, "dbinfo"); if (!dbinfo || dbinfo->type != cJSON_Object) { - printf("failed to read json, dbinfo not found"); + printf("ERROR: failed to read json, dbinfo not found\n"); goto PARSE_OVER; } cJSON *dbName = cJSON_GetObjectItem(dbinfo, "name"); if (!dbName || dbName->type != cJSON_String || dbName->valuestring == NULL) { - printf("failed to read json, db name not found"); + printf("ERROR: failed to read json, db name not found\n"); goto PARSE_OVER; } tstrncpy(g_Dbs.db[i].dbName, dbName->valuestring, MAX_DB_NAME_SIZE); @@ -2910,7 +3015,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!drop) { g_Dbs.db[i].drop = 0; } else { - printf("failed to read json, drop not found"); + printf("ERROR: failed to read json, drop not found\n"); goto PARSE_OVER; } @@ -2921,7 +3026,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { //tstrncpy(g_Dbs.db[i].dbCfg.precision, "ms", MAX_DB_NAME_SIZE); memset(g_Dbs.db[i].dbCfg.precision, 0, MAX_DB_NAME_SIZE); } else { - printf("failed to read json, precision not found"); + printf("ERROR: failed to read json, precision not found\n"); goto PARSE_OVER; } @@ -2931,7 +3036,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!update) { g_Dbs.db[i].dbCfg.update = -1; } else { - printf("failed to read json, update not found"); + printf("ERROR: failed to read json, update not found\n"); goto PARSE_OVER; } @@ -2941,7 +3046,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!replica) { g_Dbs.db[i].dbCfg.replica = -1; } else { - printf("failed to read json, replica not found"); + printf("ERROR: failed to read json, replica not found\n"); goto PARSE_OVER; } @@ -2951,7 +3056,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!keep) { g_Dbs.db[i].dbCfg.keep = -1; } else { - printf("failed to read json, keep not found"); + printf("ERROR: failed to read json, keep not found\n"); goto PARSE_OVER; } @@ -2961,7 +3066,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!days) { g_Dbs.db[i].dbCfg.days = -1; } else { - printf("failed to read json, days not found"); + printf("ERROR: failed to read json, days not found\n"); goto PARSE_OVER; } @@ -2971,7 +3076,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!cache) { g_Dbs.db[i].dbCfg.cache = -1; } else { - printf("failed to read json, cache not found"); + printf("ERROR: failed to read json, cache not found\n"); goto PARSE_OVER; } @@ -2981,7 +3086,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!blocks) { g_Dbs.db[i].dbCfg.blocks = -1; } else { - printf("failed to read json, block not found"); + printf("ERROR: failed to read json, block not found\n"); goto PARSE_OVER; } @@ -3001,7 +3106,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!minRows) { g_Dbs.db[i].dbCfg.minRows = -1; } else { - printf("failed to read json, minRows not found"); + printf("ERROR: failed to read json, minRows not found\n"); goto PARSE_OVER; } @@ -3011,7 +3116,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!maxRows) { g_Dbs.db[i].dbCfg.maxRows = -1; } else { - printf("failed to read json, maxRows not found"); + printf("ERROR: failed to read json, maxRows not found\n"); goto PARSE_OVER; } @@ -3021,7 +3126,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!comp) { g_Dbs.db[i].dbCfg.comp = -1; } else { - printf("failed to read json, comp not found"); + printf("ERROR: failed to read json, comp not found\n"); goto PARSE_OVER; } @@ -3031,7 +3136,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!walLevel) { g_Dbs.db[i].dbCfg.walLevel = -1; } else { - printf("failed to read json, walLevel not found"); + printf("ERROR: failed to read json, walLevel not found\n"); goto PARSE_OVER; } @@ -3041,7 +3146,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!cacheLast) { g_Dbs.db[i].dbCfg.cacheLast = -1; } else { - printf("failed to read json, cacheLast not found"); + printf("ERROR: failed to read json, cacheLast not found\n"); goto PARSE_OVER; } @@ -3061,20 +3166,20 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!fsync) { g_Dbs.db[i].dbCfg.fsync = -1; } else { - printf("failed to read json, fsync not found"); + printf("ERROR: failed to read json, fsync not found\n"); goto PARSE_OVER; } // super_talbes cJSON *stables = cJSON_GetObjectItem(dbinfos, "super_tables"); if (!stables || stables->type != cJSON_Array) { - printf("failed to read json, super_tables not found"); + printf("ERROR: failed to read json, super_tables not found\n"); goto PARSE_OVER; } int stbSize = cJSON_GetArraySize(stables); if (stbSize > MAX_SUPER_TABLE_COUNT) { - printf("failed to read json, databases size overflow, max database is %d\n", MAX_SUPER_TABLE_COUNT); + printf("ERROR: failed to read json, databases size overflow, max database is %d\n", MAX_SUPER_TABLE_COUNT); goto PARSE_OVER; } @@ -3086,14 +3191,14 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { // dbinfo cJSON *stbName = cJSON_GetObjectItem(stbInfo, "name"); if (!stbName || stbName->type != cJSON_String || stbName->valuestring == NULL) { - printf("failed to read json, stb name not found"); + printf("ERROR: failed to read json, stb name not found\n"); goto PARSE_OVER; } tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring, MAX_TB_NAME_SIZE); cJSON *prefix = cJSON_GetObjectItem(stbInfo, "childtable_prefix"); if (!prefix || prefix->type != cJSON_String || prefix->valuestring == NULL) { - printf("failed to read json, childtable_prefix not found"); + printf("ERROR: failed to read json, childtable_prefix not found\n"); goto PARSE_OVER; } tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring, MAX_DB_NAME_SIZE); @@ -3112,7 +3217,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!autoCreateTbl) { g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; } else { - printf("failed to read json, auto_create_table not found"); + printf("ERROR: failed to read json, auto_create_table not found\n"); goto PARSE_OVER; } @@ -3122,7 +3227,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!batchCreateTbl) { g_Dbs.db[i].superTbls[j].batchCreateTableNum = 1000; } else { - printf("failed to read json, batch_create_tbl_num not found"); + printf("ERROR: failed to read json, batch_create_tbl_num not found\n"); goto PARSE_OVER; } @@ -3140,13 +3245,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!childTblExists) { g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS; } else { - printf("failed to read json, child_table_exists not found"); + printf("ERROR: failed to read json, child_table_exists not found\n"); goto PARSE_OVER; } cJSON* count = cJSON_GetObjectItem(stbInfo, "childtable_count"); if (!count || count->type != cJSON_Number || 0 >= count->valueint) { - printf("failed to read json, childtable_count not found"); + printf("ERROR: failed to read json, childtable_count not found\n"); goto PARSE_OVER; } g_Dbs.db[i].superTbls[j].childTblCount = count->valueint; @@ -3159,7 +3264,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!dataSource) { tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", MAX_DB_NAME_SIZE); } else { - printf("failed to read json, data_source not found"); + printf("ERROR: failed to read json, data_source not found\n"); goto PARSE_OVER; } @@ -3171,27 +3276,49 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!insertMode) { tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, "taosc", MAX_DB_NAME_SIZE); } else { - printf("failed to read json, insert_mode not found"); + printf("ERROR: failed to read json, insert_mode not found\n"); goto PARSE_OVER; } + cJSON* childTbl_limit = cJSON_GetObjectItem(stbInfo, "childtable_limit"); + if (childTbl_limit) { + if (childTbl_limit->type != cJSON_Number) { + printf("ERROR: failed to read json, childtable_limit\n"); + goto PARSE_OVER; + } + g_Dbs.db[i].superTbls[j].childTblLimit = childTbl_limit->valueint; + } else { + g_Dbs.db[i].superTbls[j].childTblLimit = -1; // select ... limit -1 means all query result + } + + cJSON* childTbl_offset = cJSON_GetObjectItem(stbInfo, "childtable_offset"); + if (childTbl_offset) { + if (childTbl_offset->type != cJSON_Number || 0 > childTbl_offset->valueint) { + printf("ERROR: failed to read json, childtable_offset\n"); + goto PARSE_OVER; + } + g_Dbs.db[i].superTbls[j].childTblOffset = childTbl_offset->valueint; + } else { + g_Dbs.db[i].superTbls[j].childTblOffset = 0; + } + cJSON *ts = cJSON_GetObjectItem(stbInfo, "start_timestamp"); if (ts && ts->type == cJSON_String && ts->valuestring != NULL) { tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp, ts->valuestring, MAX_DB_NAME_SIZE); } else if (!ts) { tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp, "now", MAX_DB_NAME_SIZE); } else { - printf("failed to read json, start_timestamp not found"); + printf("ERROR: failed to read json, start_timestamp not found\n"); goto PARSE_OVER; } - + cJSON* timestampStep = cJSON_GetObjectItem(stbInfo, "timestamp_step"); if (timestampStep && timestampStep->type == cJSON_Number) { g_Dbs.db[i].superTbls[j].timeStampStep = timestampStep->valueint; } else if (!timestampStep) { - g_Dbs.db[i].superTbls[j].timeStampStep = 1000; + g_Dbs.db[i].superTbls[j].timeStampStep = DEFAULT_TIMESTAMP_STEP; } else { - printf("failed to read json, timestamp_step not found"); + printf("ERROR: failed to read json, timestamp_step not found\n"); goto PARSE_OVER; } @@ -3204,10 +3331,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!sampleDataBufSize) { g_Dbs.db[i].superTbls[j].sampleDataBufSize = 1024*1024 + 1024; } else { - printf("failed to read json, sample_buf_size not found"); + printf("ERROR: failed to read json, sample_buf_size not found\n"); goto PARSE_OVER; } - + cJSON *sampleFormat = cJSON_GetObjectItem(stbInfo, "sample_format"); if (sampleFormat && sampleFormat->type == cJSON_String && sampleFormat->valuestring != NULL) { tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, @@ -3215,10 +3342,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!sampleFormat) { tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, "csv", MAX_DB_NAME_SIZE); } else { - printf("failed to read json, sample_format not found"); + printf("ERROR: failed to read json, sample_format not found\n"); goto PARSE_OVER; } - + cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file"); if (sampleFile && sampleFile->type == cJSON_String && sampleFile->valuestring != NULL) { tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile, @@ -3226,10 +3353,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!sampleFile) { memset(g_Dbs.db[i].superTbls[j].sampleFile, 0, MAX_FILE_NAME_LEN); } else { - printf("failed to read json, sample_file not found"); + printf("ERROR: failed to read json, sample_file not found\n"); goto PARSE_OVER; } - + cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file"); if (tagsFile && tagsFile->type == cJSON_String && tagsFile->valuestring != NULL) { tstrncpy(g_Dbs.db[i].superTbls[j].tagsFile, @@ -3243,10 +3370,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { memset(g_Dbs.db[i].superTbls[j].tagsFile, 0, MAX_FILE_NAME_LEN); g_Dbs.db[i].superTbls[j].tagSource = 0; } else { - printf("failed to read json, tags_file not found"); + printf("ERROR: failed to read json, tags_file not found\n"); goto PARSE_OVER; } - + cJSON* maxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len"); if (maxSqlLen && maxSqlLen->type == cJSON_Number) { int32_t len = maxSqlLen->valueint; @@ -3259,7 +3386,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!maxSqlLen) { g_Dbs.db[i].superTbls[j].maxSqlLen = TSDB_MAX_SQL_LEN; } else { - printf("failed to read json, maxSqlLen not found"); + printf("ERROR: failed to read json, maxSqlLen not found\n"); goto PARSE_OVER; } @@ -3276,7 +3403,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!multiThreadWriteOneTbl) { g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 0; } else { - printf("failed to read json, multiThreadWriteOneTbl not found"); + printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n"); goto PARSE_OVER; } @@ -3286,7 +3413,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!numberOfTblInOneSql) { g_Dbs.db[i].superTbls[j].numberOfTblInOneSql = 0; } else { - printf("failed to read json, numberOfTblInOneSql not found"); + printf("ERROR: failed to read json, numberOfTblInOneSql not found\n"); goto PARSE_OVER; } @@ -3296,7 +3423,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!rowsPerTbl) { g_Dbs.db[i].superTbls[j].rowsPerTbl = 1; } else { - printf("failed to read json, rowsPerTbl not found"); + printf("ERROR: failed to read json, rowsPerTbl not found\n"); goto PARSE_OVER; } @@ -3306,7 +3433,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!disorderRatio) { g_Dbs.db[i].superTbls[j].disorderRatio = 0; } else { - printf("failed to read json, disorderRatio not found"); + printf("ERROR: failed to read json, disorderRatio not found\n"); goto PARSE_OVER; } @@ -3316,7 +3443,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!disorderRange) { g_Dbs.db[i].superTbls[j].disorderRange = 1000; } else { - printf("failed to read json, disorderRange not found"); + printf("ERROR: failed to read json, disorderRange not found\n"); goto PARSE_OVER; } @@ -3326,7 +3453,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!insertRows) { g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; } else { - printf("failed to read json, insert_rows input mistake"); + fprintf(stderr, "failed to read json, insert_rows input mistake"); goto PARSE_OVER; } @@ -3338,7 +3465,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { __func__, __LINE__, g_args.insert_interval); g_Dbs.db[i].superTbls[j].insertInterval = g_args.insert_interval; } else { - printf("failed to read json, insert_interval input mistake"); + fprintf(stderr, "failed to read json, insert_interval input mistake"); goto PARSE_OVER; } @@ -3377,7 +3504,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else if (!host) { tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_DB_NAME_SIZE); } else { - printf("failed to read json, host not found\n"); + printf("ERROR: failed to read json, host not found\n"); goto PARSE_OVER; } @@ -3415,7 +3542,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else if (!answerPrompt) { g_args.answer_yes = false; } else { - printf("failed to read json, confirm_parameter_prompt not found"); + printf("ERROR: failed to read json, confirm_parameter_prompt not found\n"); goto PARSE_OVER; } @@ -3423,7 +3550,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { if (dbs && dbs->type == cJSON_String && dbs->valuestring != NULL) { tstrncpy(g_queryInfo.dbName, dbs->valuestring, MAX_DB_NAME_SIZE); } else if (!dbs) { - printf("failed to read json, databases not found\n"); + printf("ERROR: failed to read json, databases not found\n"); goto PARSE_OVER; } @@ -3433,7 +3560,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else if (!queryMode) { tstrncpy(g_queryInfo.queryMode, "taosc", MAX_TB_NAME_SIZE); } else { - printf("failed to read json, query_mode not found\n"); + printf("ERROR: failed to read json, query_mode not found\n"); goto PARSE_OVER; } @@ -3443,7 +3570,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { g_queryInfo.superQueryInfo.concurrent = 0; g_queryInfo.superQueryInfo.sqlCount = 0; } else if (superQuery->type != cJSON_Object) { - printf("failed to read json, super_table_query not found"); + printf("ERROR: failed to read json, super_table_query not found\n"); goto PARSE_OVER; } else { cJSON* rate = cJSON_GetObjectItem(superQuery, "query_interval"); @@ -3467,7 +3594,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else if (0 == strcmp("async", mode->valuestring)) { g_queryInfo.superQueryInfo.subscribeMode = 1; } else { - printf("failed to read json, subscribe mod error\n"); + printf("ERROR: failed to read json, subscribe mod error\n"); goto PARSE_OVER; } } else { @@ -3490,7 +3617,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else if (0 == strcmp("no", restart->valuestring)) { g_queryInfo.superQueryInfo.subscribeRestart = 0; } else { - printf("failed to read json, subscribe restart error\n"); + printf("ERROR: failed to read json, subscribe restart error\n"); goto PARSE_OVER; } } else { @@ -3506,7 +3633,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else if (0 == strcmp("no", keepProgress->valuestring)) { g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; } else { - printf("failed to read json, subscribe keepProgress error\n"); + printf("ERROR: failed to read json, subscribe keepProgress error\n"); goto PARSE_OVER; } } else { @@ -3518,15 +3645,15 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { if (!superSqls) { g_queryInfo.superQueryInfo.sqlCount = 0; } else if (superSqls->type != cJSON_Array) { - printf("failed to read json, super sqls not found\n"); + printf("ERROR: failed to read json, super sqls not found\n"); goto PARSE_OVER; } else { int superSqlSize = cJSON_GetArraySize(superSqls); if (superSqlSize > MAX_QUERY_SQL_COUNT) { - printf("failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT); + printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT); goto PARSE_OVER; } - + g_queryInfo.superQueryInfo.sqlCount = superSqlSize; for (int j = 0; j < superSqlSize; ++j) { cJSON* sql = cJSON_GetArrayItem(superSqls, j); @@ -3534,7 +3661,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql"); if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) { - printf("failed to read json, sql not found\n"); + printf("ERROR: failed to read json, sql not found\n"); goto PARSE_OVER; } tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); @@ -3545,20 +3672,20 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else if (NULL == result) { memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); } else { - printf("failed to read json, super query result file not found\n"); + printf("ERROR: failed to read json, super query result file not found\n"); goto PARSE_OVER; } } } } - + // sub_table_query cJSON *subQuery = cJSON_GetObjectItem(root, "super_table_query"); if (!subQuery) { g_queryInfo.subQueryInfo.threadCnt = 0; g_queryInfo.subQueryInfo.sqlCount = 0; } else if (subQuery->type != cJSON_Object) { - printf("failed to read json, sub_table_query not found"); + printf("ERROR: failed to read json, sub_table_query not found\n"); ret = true; goto PARSE_OVER; } else { @@ -3568,29 +3695,29 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else if (!subrate) { g_queryInfo.subQueryInfo.rate = 0; } - + cJSON* threads = cJSON_GetObjectItem(subQuery, "threads"); if (threads && threads->type == cJSON_Number) { g_queryInfo.subQueryInfo.threadCnt = threads->valueint; } else if (!threads) { g_queryInfo.subQueryInfo.threadCnt = 1; } - + //cJSON* subTblCnt = cJSON_GetObjectItem(subQuery, "childtable_count"); //if (subTblCnt && subTblCnt->type == cJSON_Number) { // g_queryInfo.subQueryInfo.childTblCount = subTblCnt->valueint; //} else if (!subTblCnt) { // g_queryInfo.subQueryInfo.childTblCount = 0; //} - + cJSON* stblname = cJSON_GetObjectItem(subQuery, "stblname"); if (stblname && stblname->type == cJSON_String && stblname->valuestring != NULL) { tstrncpy(g_queryInfo.subQueryInfo.sTblName, stblname->valuestring, MAX_TB_NAME_SIZE); } else { - printf("failed to read json, super table name not found\n"); + printf("ERROR: failed to read json, super table name not found\n"); goto PARSE_OVER; } - + cJSON* submode = cJSON_GetObjectItem(subQuery, "mode"); if (submode && submode->type == cJSON_String && submode->valuestring != NULL) { if (0 == strcmp("sync", submode->valuestring)) { @@ -3598,13 +3725,13 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else if (0 == strcmp("async", submode->valuestring)) { g_queryInfo.subQueryInfo.subscribeMode = 1; } else { - printf("failed to read json, subscribe mod error\n"); + printf("ERROR: failed to read json, subscribe mod error\n"); goto PARSE_OVER; } } else { g_queryInfo.subQueryInfo.subscribeMode = 0; } - + cJSON* subinterval = cJSON_GetObjectItem(subQuery, "interval"); if (subinterval && subinterval->type == cJSON_Number) { g_queryInfo.subQueryInfo.subscribeInterval = subinterval->valueint; @@ -3621,7 +3748,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else if (0 == strcmp("no", subrestart->valuestring)) { g_queryInfo.subQueryInfo.subscribeRestart = 0; } else { - printf("failed to read json, subscribe restart error\n"); + printf("ERROR: failed to read json, subscribe restart error\n"); goto PARSE_OVER; } } else { @@ -3635,7 +3762,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else if (0 == strcmp("no", subkeepProgress->valuestring)) { g_queryInfo.subQueryInfo.subscribeKeepProgress = 0; } else { - printf("failed to read json, subscribe keepProgress error\n"); + printf("ERROR: failed to read json, subscribe keepProgress error\n"); goto PARSE_OVER; } } else { @@ -3647,12 +3774,12 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { if (!subsqls) { g_queryInfo.subQueryInfo.sqlCount = 0; } else if (subsqls->type != cJSON_Array) { - printf("failed to read json, super sqls not found\n"); + printf("ERROR: failed to read json, super sqls not found\n"); goto PARSE_OVER; } else { int superSqlSize = cJSON_GetArraySize(subsqls); if (superSqlSize > MAX_QUERY_SQL_COUNT) { - printf("failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT); + printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT); goto PARSE_OVER; } @@ -3663,7 +3790,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql"); if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) { - printf("failed to read json, sql not found\n"); + printf("ERROR: failed to read json, sql not found\n"); goto PARSE_OVER; } tstrncpy(g_queryInfo.subQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); @@ -3674,7 +3801,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } else if (NULL == result) { memset(g_queryInfo.subQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); } else { - printf("failed to read json, sub query result file not found\n"); + printf("ERROR: failed to read json, sub query result file not found\n"); goto PARSE_OVER; } } @@ -3713,7 +3840,7 @@ static bool getInfoFromJsonFile(char* file) { content[len] = 0; cJSON* root = cJSON_Parse(content); if (root == NULL) { - printf("failed to cjson parse %s, invalid json format", file); + printf("ERROR: failed to cjson parse %s, invalid json format\n", file); goto PARSE_OVER; } @@ -3726,13 +3853,13 @@ static bool getInfoFromJsonFile(char* file) { } else if (0 == strcasecmp("subscribe", filetype->valuestring)) { g_args.test_mode = SUBSCRIBE_MODE; } else { - printf("failed to read json, filetype not support\n"); + printf("ERROR: failed to read json, filetype not support\n"); goto PARSE_OVER; } } else if (!filetype) { g_args.test_mode = INSERT_MODE; } else { - printf("failed to read json, filetype not found\n"); + printf("ERROR: failed to read json, filetype not found\n"); goto PARSE_OVER; } @@ -3743,7 +3870,7 @@ static bool getInfoFromJsonFile(char* file) { } else if (SUBSCRIBE_MODE == g_args.test_mode) { ret = getMetaFromQueryJsonFile(root); } else { - printf("input json file type error! please input correct file type: insert or query or subscribe\n"); + printf("ERROR: input json file type error! please input correct file type: insert or query or subscribe\n"); goto PARSE_OVER; } @@ -3754,7 +3881,7 @@ PARSE_OVER: return ret; } -void prePareSampleData() { +void prepareSampleData() { for (int i = 0; i < g_Dbs.dbCount; i++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { //if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) { @@ -3792,22 +3919,26 @@ void postFreeResource() { } } -int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* superTblInfo, int* sampleUsePos, FILE *fp, char* sampleBuf) { +static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp, + SSuperTable* superTblInfo, int* sampleUsePos) { if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) { - int ret = readSampleFromCsvFileToMem(fp, superTblInfo, sampleBuf); + int ret = readSampleFromCsvFileToMem(superTblInfo); if (0 != ret) { + tmfree(superTblInfo->sampleDataBuf); return -1; } *sampleUsePos = 0; } int dataLen = 0; - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp); - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%s", sampleBuf + superTblInfo->lenOfOneRow * (*sampleUsePos)); + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + "(%" PRId64 ", ", timestamp); + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + "%s", superTblInfo->sampleDataBuf + superTblInfo->lenOfOneRow * (*sampleUsePos)); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); (*sampleUsePos)++; - + return dataLen; } @@ -3858,14 +3989,12 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* } static void syncWriteForNumberOfTblInOneSql( - threadInfo *winfo, FILE *fp, char* sampleDataBuf) { + threadInfo *winfo, char* sampleDataBuf) { SSuperTable* superTblInfo = winfo->superTblInfo; int samplePos = 0; //printf("========threadID[%d], table rang: %d - %d \n", winfo->threadID, winfo->start_table_id, winfo->end_table_id); - int64_t totalRowsInserted = 0; - int64_t totalAffectedRows = 0; int64_t lastPrintTime = taosGetTimestampMs(); char* buffer = calloc(superTblInfo->maxSqlLen+1, 1); @@ -3883,12 +4012,13 @@ static void syncWriteForNumberOfTblInOneSql( uint64_t time_counter = winfo->start_time; int sampleUsePos; + int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; int64_t st = 0; - int64_t et = 0; + int64_t et = 0xffffffff; for (int i = 0; i < superTblInfo->insertRows;) { int32_t tbl_id = 0; - for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { - int64_t tmp_time = 0; + for (int tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id; ) { + int64_t start_time = 0; int inserted = i; for (int k = 0; k < g_args.num_of_RPR;) { @@ -3896,12 +4026,12 @@ static void syncWriteForNumberOfTblInOneSql( memset(buffer, 0, superTblInfo->maxSqlLen); char *pstr = buffer; - int32_t end_tbl_id = tID + numberOfTblInOneSql; + int32_t end_tbl_id = tableSeq + numberOfTblInOneSql; if (end_tbl_id > winfo->end_table_id) { end_tbl_id = winfo->end_table_id+1; } - for (tbl_id = tID; tbl_id < end_tbl_id; tbl_id++) { + for (tbl_id = tableSeq ; tbl_id < end_tbl_id; tbl_id++) { sampleUsePos = samplePos; if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { char* tagsValBuf = NULL; @@ -3969,18 +4099,16 @@ static void syncWriteForNumberOfTblInOneSql( } } - tmp_time = time_counter; - for (k = 0; k < superTblInfo->rowsPerTbl;) { + start_time = time_counter; + for (int j = 0; j < superTblInfo->rowsPerTbl;) { int retLen = 0; if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { - retLen = getRowDataFromSample(pstr + len, - superTblInfo->maxSqlLen - len, - tmp_time += superTblInfo->timeStampStep, - superTblInfo, - &sampleUsePos, - fp, - sampleDataBuf); + retLen = getRowDataFromSample(pstr + len, + superTblInfo->maxSqlLen - len, + start_time += superTblInfo->timeStampStep, + superTblInfo, + &sampleUsePos); if (retLen < 0) { goto free_and_statistics; } @@ -3989,15 +4117,15 @@ static void syncWriteForNumberOfTblInOneSql( int rand_num = rand_tinyint() % 100; if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) { - int64_t d = tmp_time - rand() % superTblInfo->disorderRange; - retLen = generateRowData(pstr + len, - superTblInfo->maxSqlLen - len, - d, + int64_t d = start_time - rand() % superTblInfo->disorderRange; + retLen = generateRowData(pstr + len, + superTblInfo->maxSqlLen - len, + d, superTblInfo); } else { - retLen = generateRowData(pstr + len, - superTblInfo->maxSqlLen - len, - tmp_time += superTblInfo->timeStampStep, + retLen = generateRowData(pstr + len, + superTblInfo->maxSqlLen - len, + start_time += superTblInfo->timeStampStep, superTblInfo); } if (retLen < 0) { @@ -4006,12 +4134,12 @@ static void syncWriteForNumberOfTblInOneSql( } len += retLen; //inserted++; - k++; - totalRowsInserted++; + j++; + winfo->totalInsertRows++; if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { - tID = tbl_id + 1; + tableSeq = tbl_id + 1; printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", superTblInfo->lenOfOneRow); goto send_to_server; @@ -4019,18 +4147,18 @@ static void syncWriteForNumberOfTblInOneSql( } } - tID = tbl_id; + tableSeq = tbl_id; inserted += superTblInfo->rowsPerTbl; send_to_server: - if (g_args.insert_interval && (g_args.insert_interval > (et - st))) { - int sleep_time = g_args.insert_interval - (et -st); + if (insert_interval) { + st = taosGetTimestampUs(); + + if (insert_interval > ((et - st)/1000)) { + int sleep_time = insert_interval - (et -st); printf("sleep: %d ms insert interval\n", sleep_time); taosMsleep(sleep_time); // ms - } - - if (g_args.insert_interval) { - st = taosGetTimestampMs(); + } } if (0 == strncasecmp(superTblInfo->insertMode, @@ -4064,7 +4192,7 @@ send_to_server: if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", winfo->threadID, - winfo->totalRowsInserted, + winfo->totalInsertRows, winfo->totalAffectedRows); lastPrintTime = currentPrintTime; } @@ -4081,19 +4209,19 @@ send_to_server: goto free_and_statistics; } } - if (g_args.insert_interval) { - et = taosGetTimestampMs(); + if (insert_interval) { + et = taosGetTimestampUs(); } break; } - if (tID > winfo->end_table_id) { + if (tableSeq > winfo->end_table_id) { if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { samplePos = sampleUsePos; } i = inserted; - time_counter = tmp_time; + time_counter = start_time; } } @@ -4102,14 +4230,13 @@ send_to_server: free_and_statistics: tmfree(buffer); - winfo->totalRowsInserted = totalRowsInserted; - winfo->totalAffectedRows = totalAffectedRows; - printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n", winfo->threadID, totalRowsInserted, totalAffectedRows); + printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n", + winfo->threadID, winfo->totalInsertRows, winfo->totalAffectedRows); return; } int32_t generateData(char *res, char **data_type, - int num_of_cols, int64_t timestamp, int len_of_binary) { + int num_of_cols, int64_t timestamp, int lenOfBinary) { memset(res, 0, MAX_DATA_SIZE); char *pstr = res; pstr += sprintf(pstr, "(%" PRId64, timestamp); @@ -4144,13 +4271,13 @@ int32_t generateData(char *res, char **data_type, bool b = rand() & 1; pstr += sprintf(pstr, ", %s", b ? "true" : "false"); } else if (strcasecmp(data_type[i % c], "binary") == 0) { - char *s = malloc(len_of_binary); - rand_string(s, len_of_binary); + char *s = malloc(lenOfBinary); + rand_string(s, lenOfBinary); pstr += sprintf(pstr, ", \"%s\"", s); free(s); }else if (strcasecmp(data_type[i % c], "nchar") == 0) { - char *s = malloc(len_of_binary); - rand_string(s, len_of_binary); + char *s = malloc(lenOfBinary); + rand_string(s, lenOfBinary); pstr += sprintf(pstr, ", \"%s\"", s); free(s); } @@ -4166,6 +4293,237 @@ int32_t generateData(char *res, char **data_type, return (int32_t)(pstr - res); } +static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { + char* sampleDataBuf = NULL; + + // each thread read sample data from csv file + if (0 == strncasecmp(superTblInfo->dataSource, + "sample", + strlen("sample"))) { + sampleDataBuf = calloc( + superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); + if (sampleDataBuf == NULL) { + fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", + superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, + strerror(errno)); + return -1; + } + + int ret = readSampleFromCsvFileToMem(superTblInfo); + if (0 != ret) { + tmfree(superTblInfo->sampleDataBuf); + return -1; + } + } + + superTblInfo->sampleDataBuf = sampleDataBuf; + + return 0; +} + +static int execInsert(threadInfo *winfo, char *buffer, int k) +{ + int affectedRows; + SSuperTable* superTblInfo = winfo->superTblInfo; + + if (superTblInfo) { + if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); + affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); + } else { + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); + int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); + + if (0 != retCode) { + affectedRows = -1; + printf("========restful return fail, threadID[%d]\n", winfo->threadID); + } else { + affectedRows = k; + } + } + } else { + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); + affectedRows = queryDbExec(winfo->taos, buffer, 1); + } + + if (0 > affectedRows){ + return affectedRows; + } + + return affectedRows; +} + +static int generateDataBuffer(int32_t tableSeq, + threadInfo *pThreadInfo, char *buffer, + int64_t insertRows, + int64_t startFrom, int64_t startTime, int *pSampleUsePos) +{ + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + + int ncols_per_record = 1; // count first col ts + + if (superTblInfo == NULL) { + int datatypeSeq = 0; + while(g_args.datatype[datatypeSeq]) { + datatypeSeq ++; + ncols_per_record ++; + } + } + + assert(buffer != NULL); + + char *pChildTblName; + int childTblCount; + + if (superTblInfo && (superTblInfo->childTblOffset > 0)) { + // TODO + // select tbname from stb limit 1 offset tableSeq + getChildNameOfSuperTableWithLimitAndOffset(pThreadInfo->taos, + pThreadInfo->db_name, superTblInfo->sTblName, + &pChildTblName, &childTblCount, + 1, tableSeq); + } else { + pChildTblName = calloc(TSDB_TABLE_NAME_LEN, 1); + if (NULL == pChildTblName) { + fprintf(stderr, "failed to alloc memory %d\n", TSDB_TABLE_NAME_LEN); + return -1; + } + snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s%d", + superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, tableSeq); + } + + memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); + + char *pstr = buffer; + + if (superTblInfo) { + if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { + char* tagsValBuf = NULL; + if (0 == superTblInfo->tagSource) { + tagsValBuf = generateTagVaulesForStb(superTblInfo); + } else { + tagsValBuf = getTagValueFromTagSample( + superTblInfo, + tableSeq % superTblInfo->tagSampleCount); + } + if (NULL == tagsValBuf) { + fprintf(stderr, "tag buf failed to allocate memory\n"); + free(pChildTblName); + return -1; + } + + pstr += snprintf(pstr, + superTblInfo->maxSqlLen, + "insert into %s.%s using %s.%s tags %s values", + pThreadInfo->db_name, + pChildTblName, + pThreadInfo->db_name, + superTblInfo->sTblName, + tagsValBuf); + tmfree(tagsValBuf); + } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { + pstr += snprintf(pstr, + superTblInfo->maxSqlLen, + "insert into %s.%s values", + pThreadInfo->db_name, + superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); + } else { + pstr += snprintf(pstr, + (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), + "insert into %s.%s values", + pThreadInfo->db_name, + pChildTblName); + } + } else { + pstr += snprintf(pstr, + (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), + "insert into %s.%s values", + pThreadInfo->db_name, + pChildTblName); + } + + int k; + int len = 0; + + verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); + for (k = 0; k < g_args.num_of_RPR;) { + if (superTblInfo) { + int retLen = 0; + + if (0 == strncasecmp(superTblInfo->dataSource, + "sample", strlen("sample"))) { + retLen = getRowDataFromSample( + pstr + len, + superTblInfo->maxSqlLen - len, + startTime + superTblInfo->timeStampStep * startFrom, + superTblInfo, + pSampleUsePos); + } else if (0 == strncasecmp(superTblInfo->dataSource, + "rand", strlen("rand"))) { + int rand_num = rand_tinyint() % 100; + if (0 != superTblInfo->disorderRatio + && rand_num < superTblInfo->disorderRatio) { + int64_t d = startTime - rand() % superTblInfo->disorderRange; + retLen = generateRowData( + pstr + len, + superTblInfo->maxSqlLen - len, + d, + superTblInfo); + //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, start_time, d); + } else { + retLen = generateRowData( + pstr + len, + superTblInfo->maxSqlLen - len, + startTime + superTblInfo->timeStampStep * startFrom, + superTblInfo); + } + + if (retLen < 0) { + free(pChildTblName); + return -1; + } + + len += retLen; + } + } else { + int rand_num = rand() % 100; + char data[MAX_DATA_SIZE]; + char **data_type = g_args.datatype; + int lenOfBinary = g_args.len_of_binary; + + if ((g_args.disorderRatio != 0) + && (rand_num < g_args.disorderRange)) { + + int64_t d = startTime - rand() % 1000000 + rand_num; + len = generateData(data, data_type, + ncols_per_record, d, lenOfBinary); + } else { + len = generateData(data, data_type, + ncols_per_record, + startTime + DEFAULT_TIMESTAMP_STEP * startFrom, + lenOfBinary); + } + + //assert(len + pstr - buffer < BUFFER_SIZE); + if (len + pstr - buffer >= g_args.max_sql_len) { // too long + break; + } + + pstr += sprintf(pstr, " %s", data); + } + + verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); + + k++; + startFrom ++; + + if (startFrom >= insertRows) + break; + } + + return k; +} + // sync insertion /* 1 thread: 100 tables * 2000 rows/s @@ -4176,327 +4534,73 @@ int32_t generateData(char *res, char **data_type, */ static void* syncWrite(void *sarg) { - threadInfo *winfo = (threadInfo *)sarg; - - char buffer[BUFFER_SIZE] = "\0"; - char data[MAX_DATA_SIZE]; - char **data_type = g_args.datatype; - int len_of_binary = g_args.len_of_binary; - - int ncols_per_record = 1; // count first col ts - int i = 0; - while(g_args.datatype[i]) { - i ++; - ncols_per_record ++; - } - - srand((uint32_t)time(NULL)); - int64_t time_counter = winfo->start_time; - - uint64_t st = 0; - uint64_t et = 0; - - winfo->totalRowsInserted = 0; - winfo->totalAffectedRows = 0; - - for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - int64_t tmp_time = time_counter; - - for (int i = 0; i < g_args.num_of_DPT;) { - - int tblInserted = i; - - char *pstr = buffer; - pstr += sprintf(pstr, - "insert into %s.%s%d values ", - winfo->db_name, g_args.tb_prefix, tID); - int k; - for (k = 0; k < g_args.num_of_RPR;) { - int rand_num = rand() % 100; - int len = -1; - - if ((g_args.disorderRatio != 0) - && (rand_num < g_args.disorderRange)) { - - int64_t d = tmp_time - rand() % 1000000 + rand_num; - len = generateData(data, data_type, - ncols_per_record, d, len_of_binary); - } else { - len = generateData(data, data_type, - ncols_per_record, tmp_time += 1000, len_of_binary); - } - - //assert(len + pstr - buffer < BUFFER_SIZE); - if (len + pstr - buffer >= BUFFER_SIZE) { // too long - break; - } - - pstr += sprintf(pstr, " %s", data); - tblInserted++; - k++; - i++; - - if (tblInserted >= g_args.num_of_DPT) - break; - } - - winfo->totalRowsInserted += k; - /* puts(buffer); */ - int64_t startTs; - int64_t endTs; - startTs = taosGetTimestampUs(); - //queryDB(winfo->taos, buffer); - if (i > 0 && g_args.insert_interval - && (g_args.insert_interval > (et - st) )) { - int sleep_time = g_args.insert_interval - (et -st); - printf("sleep: %d ms specified by insert_interval\n", sleep_time); - taosMsleep(sleep_time); // ms - } - - if (g_args.insert_interval) { - st = taosGetTimestampMs(); - } - verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); - int affectedRows = queryDbExec(winfo->taos, buffer, 1); - - if (0 < affectedRows){ - endTs = taosGetTimestampUs(); - int64_t delay = endTs - startTs; - if (delay > winfo->maxDelay) - winfo->maxDelay = delay; - if (delay < winfo->minDelay) - winfo->minDelay = delay; - winfo->cntDelay++; - winfo->totalDelay += delay; - winfo->totalAffectedRows += affectedRows; - winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; - } else { - fprintf(stderr, "queryDbExec() buffer:\n%s\naffected rows is %d", buffer, affectedRows); - } - - verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64" tblInserted=%d\n", __func__, __LINE__, winfo->totalAffectedRows, tblInserted); - if (g_args.insert_interval) { - et = taosGetTimestampMs(); - } - - if (tblInserted >= g_args.num_of_DPT) { - break; - } - } // num_of_DPT - } // tId - - printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", - winfo->threadID, - winfo->totalRowsInserted, - winfo->totalAffectedRows); - - return NULL; -} - - -static void* syncWriteWithStb(void *sarg) { - uint64_t lastPrintTime = taosGetTimestampMs(); - threadInfo *winfo = (threadInfo *)sarg; SSuperTable* superTblInfo = winfo->superTblInfo; - FILE *fp = NULL; - char* sampleDataBuf = NULL; + + if (superTblInfo) { + if (0 != prepareSampleDataForSTable(superTblInfo)) + return NULL; + + if (superTblInfo->numberOfTblInOneSql > 0) { + syncWriteForNumberOfTblInOneSql(winfo, superTblInfo->sampleDataBuf); + tmfree(superTblInfo->sampleDataBuf); + return NULL; + } + } + int samplePos = 0; - // each thread read sample data from csv file - if (0 == strncasecmp(superTblInfo->dataSource, - "sample", - strlen("sample"))) { - sampleDataBuf = calloc( - superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); - if (sampleDataBuf == NULL) { - printf("Failed to calloc %d Bytes, reason:%s\n", - superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, - strerror(errno)); - return NULL; - } - - fp = fopen(superTblInfo->sampleFile, "r"); - if (fp == NULL) { - printf("Failed to open sample file: %s, reason:%s\n", - superTblInfo->sampleFile, strerror(errno)); - tmfree(sampleDataBuf); - return NULL; - } - int ret = readSampleFromCsvFileToMem(fp, - superTblInfo, sampleDataBuf); - if (0 != ret) { - tmfree(sampleDataBuf); - tmfclose(fp); - return NULL; - } - } - - if (superTblInfo->numberOfTblInOneSql > 0) { - syncWriteForNumberOfTblInOneSql(winfo, fp, sampleDataBuf); - tmfree(sampleDataBuf); - tmfclose(fp); - return NULL; - } - - char* buffer = calloc(superTblInfo->maxSqlLen, 1); + char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); if (NULL == buffer) { - printf("Failed to calloc %d Bytes, reason:%s\n", + fprintf(stderr, "Failed to alloc %d Bytes, reason:%s\n", superTblInfo->maxSqlLen, strerror(errno)); - tmfree(sampleDataBuf); - tmfclose(fp); + tmfree(superTblInfo->sampleDataBuf); return NULL; } - uint64_t st = 0; - uint64_t et = 0; + int64_t lastPrintTime = taosGetTimestampMs(); + int64_t startTs = taosGetTimestampUs(); + int64_t endTs; - winfo->totalRowsInserted = 0; + int insert_interval = superTblInfo?superTblInfo->insertInterval: + g_args.insert_interval; + uint64_t st = 0; + uint64_t et = 0xffffffff; + + winfo->totalInsertRows = 0; winfo->totalAffectedRows = 0; int sampleUsePos; - verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); - - for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; - tID++) { + for (uint32_t tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id; + tableSeq ++) { int64_t start_time = winfo->start_time; - for (int i = 0; i < superTblInfo->insertRows;) { + int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; + verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); - int64_t tblInserted = i; - - if (i > 0 && superTblInfo->insertInterval - && (superTblInfo->insertInterval > (et - st) )) { - int sleep_time = superTblInfo->insertInterval - (et -st); - printf("sleep: %d ms insert interval\n", sleep_time); - taosMsleep(sleep_time); // ms - } - - if (superTblInfo->insertInterval) { - st = taosGetTimestampMs(); + for (int64_t i = 0; i < insertRows;) { + if (insert_interval) { + st = taosGetTimestampUs(); } sampleUsePos = samplePos; - verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); - memset(buffer, 0, superTblInfo->maxSqlLen); - int len = 0; + int generated = generateDataBuffer(tableSeq, winfo, buffer, insertRows, + i, start_time, &sampleUsePos); + if (generated > 0) + i += generated; + else + goto free_and_statistics_2; - char *pstr = buffer; + int affectedRows = execInsert(winfo, buffer, generated); + if (affectedRows < 0) + goto free_and_statistics_2; - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { - char* tagsValBuf = NULL; - if (0 == superTblInfo->tagSource) { - tagsValBuf = generateTagVaulesForStb(superTblInfo); - } else { - tagsValBuf = getTagValueFromTagSample( - superTblInfo, - tID % superTblInfo->tagSampleCount); - } - if (NULL == tagsValBuf) { - goto free_and_statistics_2; - } - - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - "insert into %s.%s%d using %s.%s tags %s values", - winfo->db_name, - superTblInfo->childTblPrefix, - tID, - winfo->db_name, - superTblInfo->sTblName, - tagsValBuf); - tmfree(tagsValBuf); - } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - "insert into %s.%s values", - winfo->db_name, - superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN); - } else { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - "insert into %s.%s%d values", - winfo->db_name, - superTblInfo->childTblPrefix, - tID); - } - - int k; - for (k = 0; k < g_args.num_of_RPR;) { - int retLen = 0; - if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { - retLen = getRowDataFromSample( - pstr + len, - superTblInfo->maxSqlLen - len, - start_time + superTblInfo->timeStampStep * i, - superTblInfo, - &sampleUsePos, - fp, - sampleDataBuf); - if (retLen < 0) { - goto free_and_statistics_2; - } - } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { - int rand_num = rand_tinyint() % 100; - if (0 != superTblInfo->disorderRatio - && rand_num < superTblInfo->disorderRatio) { - int64_t d = start_time - rand() % superTblInfo->disorderRange; - retLen = generateRowData( - pstr + len, - superTblInfo->maxSqlLen - len, - d, - superTblInfo); - //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d); - } else { - retLen = generateRowData( - pstr + len, - superTblInfo->maxSqlLen - len, - start_time + superTblInfo->timeStampStep * i, - superTblInfo); - } - if (retLen < 0) { - goto free_and_statistics_2; - } - } - - len += retLen; - verbosePrint("%s() LN%d retLen=%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, retLen, len, k, buffer); - - tblInserted++; - k++; - i++; - - if (tblInserted >= superTblInfo->insertRows) - break; - } - - winfo->totalRowsInserted += k; - - int64_t startTs = taosGetTimestampUs(); - int64_t endTs; - int affectedRows; - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { - verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); - affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); - - if (0 > affectedRows){ - goto free_and_statistics_2; - } - } else { - verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); - int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); - - if (0 != retCode) { - printf("========restful return fail, threadID[%d]\n", winfo->threadID); - goto free_and_statistics_2; - } - - affectedRows = k; - } + winfo->totalInsertRows += generated; + winfo->totalAffectedRows += affectedRows; endTs = taosGetTimestampUs(); int64_t delay = endTs - startTs; @@ -4505,53 +4609,57 @@ static void* syncWriteWithStb(void *sarg) { winfo->cntDelay++; winfo->totalDelay += delay; - winfo->totalAffectedRows += affectedRows; - int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", winfo->threadID, - winfo->totalRowsInserted, + winfo->totalInsertRows, winfo->totalAffectedRows); lastPrintTime = currentPrintTime; } - if (superTblInfo->insertInterval) { - et = taosGetTimestampMs(); - } - - if (tblInserted >= superTblInfo->insertRows) + if (i >= insertRows) break; + + if (insert_interval) { + et = taosGetTimestampUs(); + + if (insert_interval > ((et - st)/1000) ) { + int sleep_time = insert_interval - (et -st)/1000; + verbosePrint("%s() LN%d sleep: %d ms for insert interval\n", __func__, __LINE__, sleep_time); + taosMsleep(sleep_time); // ms + } + } } // num_of_DPT - if (tID == winfo->end_table_id) { - if (0 == strncasecmp( - superTblInfo->dataSource, "sample", strlen("sample"))) { + if ((tableSeq == winfo->end_table_id) && superTblInfo && + (0 == strncasecmp( + superTblInfo->dataSource, "sample", strlen("sample")))) { samplePos = sampleUsePos; - } - } - } // tID + } // tableSeq free_and_statistics_2: tmfree(buffer); - tmfree(sampleDataBuf); - tmfclose(fp); + if (superTblInfo) + tmfree(superTblInfo->sampleDataBuf); printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", winfo->threadID, - winfo->totalRowsInserted, + winfo->totalInsertRows, winfo->totalAffectedRows); return NULL; } void callBack(void *param, TAOS_RES *res, int code) { threadInfo* winfo = (threadInfo*)param; + SSuperTable* superTblInfo = winfo->superTblInfo; - if (g_args.insert_interval) { - winfo->et = taosGetTimestampMs(); - if (winfo->et - winfo->st < 1000) { - taosMsleep(1000 - (winfo->et - winfo->st)); // ms + int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; + if (insert_interval) { + winfo->et = taosGetTimestampUs(); + if (((winfo->et - winfo->st)/1000) < insert_interval) { + taosMsleep(insert_interval - (winfo->et - winfo->st)/1000); // ms } } @@ -4580,7 +4688,7 @@ void callBack(void *param, TAOS_RES *res, int code) { //generateData(data, datatype, ncols_per_record, d, len_of_binary); (void)generateRowData(data, MAX_DATA_SIZE, d, winfo->superTblInfo); } else { - //generateData(data, datatype, ncols_per_record, tmp_time += 1000, len_of_binary); + //generateData(data, datatype, ncols_per_record, start_time += 1000, len_of_binary); (void)generateRowData(data, MAX_DATA_SIZE, winfo->lastTs += 1000, winfo->superTblInfo); } pstr += sprintf(pstr, "%s", data); @@ -4591,8 +4699,8 @@ void callBack(void *param, TAOS_RES *res, int code) { } } - if (g_args.insert_interval) { - winfo->st = taosGetTimestampMs(); + if (insert_interval) { + winfo->st = taosGetTimestampUs(); } taos_query_a(winfo->taos, buffer, callBack, winfo); free(buffer); @@ -4603,13 +4711,15 @@ void callBack(void *param, TAOS_RES *res, int code) { void *asyncWrite(void *sarg) { threadInfo *winfo = (threadInfo *)sarg; + SSuperTable* superTblInfo = winfo->superTblInfo; winfo->st = 0; winfo->et = 0; winfo->lastTs = winfo->start_time; - if (g_args.insert_interval) { - winfo->st = taosGetTimestampMs(); + int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; + if (insert_interval) { + winfo->st = taosGetTimestampUs(); } taos_query_a(winfo->taos, "show databases", callBack, winfo); @@ -4618,17 +4728,24 @@ void *asyncWrite(void *sarg) { return NULL; } -void startMultiThreadInsertData(int threads, char* db_name, char* precision, - SSuperTable* superTblInfo) { +static void startMultiThreadInsertData(int threads, char* db_name, + char* precision,SSuperTable* superTblInfo) { pthread_t *pids = malloc(threads * sizeof(pthread_t)); + assert(pids != NULL); + threadInfo *infos = malloc(threads * sizeof(threadInfo)); + assert(infos != NULL); + memset(pids, 0, threads * sizeof(pthread_t)); memset(infos, 0, threads * sizeof(threadInfo)); int ntables = 0; if (superTblInfo) - ntables = superTblInfo->childTblCount; + if (superTblInfo->childTblOffset) + ntables = superTblInfo->childTblLimit; + else + ntables = superTblInfo->childTblCount; else ntables = g_args.num_of_tables; @@ -4659,7 +4776,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, } else if (0 == strncasecmp(precision, "us", 2)) { timePrec = TSDB_TIME_PRECISION_MICRO; } else { - printf("No support precision: %s\n", precision); + fprintf(stderr, "No support precision: %s\n", precision); exit(-1); } } @@ -4668,11 +4785,11 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, if (superTblInfo) { if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { start_time = taosGetTimestamp(timePrec); - } else { + } else { if (TSDB_CODE_SUCCESS != taosParseTime( - superTblInfo->startTimestamp, - &start_time, - strlen(superTblInfo->startTimestamp), + superTblInfo->startTimestamp, + &start_time, + strlen(superTblInfo->startTimestamp), timePrec, 0)) { printf("ERROR to parse time!\n"); exit(-1); @@ -4683,8 +4800,14 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, } double start = getCurrentTime(); - - int last = 0; + + int last; + + if ((superTblInfo) && (superTblInfo->childTblOffset)) + last = superTblInfo->childTblOffset; + else + last = 0; + for (int i = 0; i < threads; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; @@ -4698,7 +4821,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) { //t_info->taos = taos; t_info->taos = taos_connect( - g_Dbs.host, g_Dbs.user, + g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); if (NULL == t_info->taos) { printf("connect to server fail from insert sub thread, reason: %s\n", @@ -4722,16 +4845,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, tsem_init(&(t_info->lock_sem), 0, 0); if (SYNC == g_Dbs.queryMode) { - if (superTblInfo) { - pthread_create(pids + i, NULL, syncWriteWithStb, t_info); - } else { - pthread_create(pids + i, NULL, syncWrite, t_info); - } - } else { + pthread_create(pids + i, NULL, syncWrite, t_info); + } else { pthread_create(pids + i, NULL, asyncWrite, t_info); } } - + for (int i = 0; i < threads; i++) { pthread_join(pids[i], NULL); } @@ -4750,13 +4869,13 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, if (superTblInfo) { superTblInfo->totalAffectedRows += t_info->totalAffectedRows; - superTblInfo->totalRowsInserted += t_info->totalRowsInserted; + superTblInfo->totalInsertRows += t_info->totalInsertRows; } totalDelay += t_info->totalDelay; cntDelay += t_info->cntDelay; if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; - if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; + if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; } cntDelay -= 1; @@ -4767,16 +4886,17 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, double t = end - start; if (superTblInfo) { - printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", - t, superTblInfo->totalRowsInserted, + printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", + t, superTblInfo->totalInsertRows, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, - superTblInfo->totalRowsInserted / t); - fprintf(g_fpOfInsertResult, "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", - t, superTblInfo->totalRowsInserted, + superTblInfo->totalInsertRows / t); + fprintf(g_fpOfInsertResult, + "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", + t, superTblInfo->totalInsertRows, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, - superTblInfo->totalRowsInserted / t); + superTblInfo->totalInsertRows/ t); } printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n", @@ -4787,10 +4907,9 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, //taos_close(taos); free(pids); - free(infos); + free(infos); } - void *readTable(void *sarg) { #if 1 threadInfo *rinfo = (threadInfo *)sarg; @@ -4935,7 +5054,7 @@ void *readMetric(void *sarg) { } -int insertTestProcess() { +static int insertTestProcess() { setupForAnsiEscape(); int ret = printfInsertMeta(); @@ -4949,15 +5068,15 @@ int insertTestProcess() { if (NULL == g_fpOfInsertResult) { fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile); return -1; - } { - printfInsertMetaToFile(g_fpOfInsertResult); } + printfInsertMetaToFile(g_fpOfInsertResult); + if (!g_args.answer_yes) { printf("Press enter key to continue\n\n"); (void)getchar(); } - + init_rand_data(); // create database and super tables @@ -4967,7 +5086,7 @@ int insertTestProcess() { } // pretreatement - prePareSampleData(); + prepareSampleData(); double start; double end; @@ -4978,47 +5097,47 @@ int insertTestProcess() { end = getCurrentTime(); if (g_totalChildTables > 0) { - printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n", + printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n", end - start, g_totalChildTables, g_Dbs.threadCount); - fprintf(g_fpOfInsertResult, - "Spent %.4f seconds to create %d tables with %d thread(s)\n\n", + fprintf(g_fpOfInsertResult, + "Spent %.4f seconds to create %d tables with %d thread(s)\n\n", end - start, g_totalChildTables, g_Dbs.threadCount); } taosMsleep(1000); // create sub threads for inserting data //start = getCurrentTime(); - for (int i = 0; i < g_Dbs.dbCount; i++) { - if (g_Dbs.db[i].superTblCount > 0) { - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { - SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; - if (0 == g_Dbs.db[i].superTbls[j].insertRows) { - continue; - } - startMultiThreadInsertData( - g_Dbs.threadCount, - g_Dbs.db[i].dbName, - g_Dbs.db[i].dbCfg.precision, - superTblInfo); - } - } else { - startMultiThreadInsertData( - g_Dbs.threadCount, - g_Dbs.db[i].dbName, - g_Dbs.db[i].dbCfg.precision, - NULL); + for (int i = 0; i < g_Dbs.dbCount; i++) { + if (g_Dbs.db[i].superTblCount > 0) { + for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; + if (0 == g_Dbs.db[i].superTbls[j].insertRows) { + continue; } + startMultiThreadInsertData( + g_Dbs.threadCount, + g_Dbs.db[i].dbName, + g_Dbs.db[i].dbCfg.precision, + superTblInfo); + } + } else { + startMultiThreadInsertData( + g_Dbs.threadCount, + g_Dbs.db[i].dbName, + g_Dbs.db[i].dbCfg.precision, + NULL); } + } //end = getCurrentTime(); - //int64_t totalRowsInserted = 0; + //int64_t totalInsertRows = 0; //int64_t totalAffectedRows = 0; //for (int i = 0; i < g_Dbs.dbCount; i++) { // for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { - // totalRowsInserted += g_Dbs.db[i].superTbls[j].totalRowsInserted; + // totalInsertRows+= g_Dbs.db[i].superTbls[j].totalInsertRows; // totalAffectedRows += g_Dbs.db[i].superTbls[j].totalAffectedRows; //} - //printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s)\n\n", end - start, totalRowsInserted, totalAffectedRows, g_Dbs.threadCount); + //printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s)\n\n", end - start, totalInsertRows, totalAffectedRows, g_Dbs.threadCount); postFreeResource(); return 0; @@ -5039,7 +5158,7 @@ void *superQueryProcess(void *sarg) { //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); } - st = taosGetTimestampMs(); + st = taosGetTimestampUs(); for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { int64_t t1 = taosGetTimestampUs(); @@ -5065,14 +5184,14 @@ void *superQueryProcess(void *sarg) { } } } - et = taosGetTimestampMs(); + et = taosGetTimestampUs(); printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0); } return NULL; } -void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { +static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { char sourceString[32] = "xxxx"; char subTblName[MAX_TB_NAME_SIZE*3]; sprintf(subTblName, "%s.%s", @@ -5094,7 +5213,7 @@ void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { //printf("3: %s\n", outSql); } -void *subQueryProcess(void *sarg) { +static void *subQueryProcess(void *sarg) { char sqlstr[1024]; threadInfo *winfo = (threadInfo *)sarg; int64_t st = 0; @@ -5105,7 +5224,7 @@ void *subQueryProcess(void *sarg) { //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); } - st = taosGetTimestampMs(); + st = taosGetTimestampUs(); for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { for (int j = 0; j < g_queryInfo.subQueryInfo.sqlCount; j++) { memset(sqlstr,0,sizeof(sqlstr)); @@ -5119,12 +5238,12 @@ void *subQueryProcess(void *sarg) { selectAndGetResult(winfo->taos, sqlstr, tmpFile); } } - et = taosGetTimestampMs(); + et = taosGetTimestampUs(); printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n", taosGetSelfPthreadId(), winfo->start_table_id, winfo->end_table_id, - (double)(et - st)/1000.0); + (double)(et - st)/1000000.0); } return NULL; } @@ -5142,10 +5261,10 @@ static int queryTestProcess() { } if (0 != g_queryInfo.subQueryInfo.sqlCount) { - (void)getAllChildNameOfSuperTable(taos, - g_queryInfo.dbName, - g_queryInfo.subQueryInfo.sTblName, - &g_queryInfo.subQueryInfo.childTblName, + getAllChildNameOfSuperTable(taos, + g_queryInfo.dbName, + g_queryInfo.subQueryInfo.sTblName, + &g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.subQueryInfo.childTblCount); } @@ -5162,7 +5281,7 @@ static int queryTestProcess() { threadInfo *infos = NULL; //==== create sub threads for query from specify table if (g_queryInfo.superQueryInfo.sqlCount > 0 && g_queryInfo.superQueryInfo.concurrent > 0) { - + pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t)); infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { @@ -5170,14 +5289,14 @@ static int queryTestProcess() { taos_close(taos); exit(-1); } - - for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { + + for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { threadInfo *t_info = infos + i; - t_info->threadID = i; + t_info->threadID = i; if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { t_info->taos = taos; - + char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); @@ -5186,16 +5305,17 @@ static int queryTestProcess() { t_info->taos = NULL; } - pthread_create(pids + i, NULL, superQueryProcess, t_info); - } + pthread_create(pids + i, NULL, superQueryProcess, t_info); + } }else { g_queryInfo.superQueryInfo.concurrent = 0; } - + pthread_t *pidsOfSub = NULL; threadInfo *infosOfSub = NULL; //==== create sub threads for query from all sub table of the super table - if ((g_queryInfo.subQueryInfo.sqlCount > 0) && (g_queryInfo.subQueryInfo.threadCnt > 0)) { + if ((g_queryInfo.subQueryInfo.sqlCount > 0) + && (g_queryInfo.subQueryInfo.threadCnt > 0)) { pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t)); infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo)); if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { @@ -5203,7 +5323,7 @@ static int queryTestProcess() { taos_close(taos); exit(-1); } - + int ntables = g_queryInfo.subQueryInfo.childTblCount; int threads = g_queryInfo.subQueryInfo.threadCnt; @@ -5212,12 +5332,12 @@ static int queryTestProcess() { threads = ntables; a = 1; } - + int b = 0; if (threads != 0) { b = ntables % threads; } - + int last = 0; for (int i = 0; i < threads; i++) { threadInfo *t_info = infosOfSub + i; @@ -5411,7 +5531,7 @@ void *superSubscribeProcess(void *sarg) { } } taos_free_result(res); - + for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress); @@ -5439,14 +5559,13 @@ static int subscribeTestProcess() { } if (0 != g_queryInfo.subQueryInfo.sqlCount) { - (void)getAllChildNameOfSuperTable(taos, + getAllChildNameOfSuperTable(taos, g_queryInfo.dbName, g_queryInfo.subQueryInfo.sTblName, &g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.subQueryInfo.childTblCount); } - pthread_t *pids = NULL; threadInfo *infos = NULL; //==== create sub threads for query from super table @@ -5455,53 +5574,53 @@ static int subscribeTestProcess() { pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t)); infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { - printf("malloc failed for create threads\n"); + printf("malloc failed for create threads\n"); taos_close(taos); exit(-1); } - - for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { + + for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; t_info->taos = taos; pthread_create(pids + i, NULL, superSubscribeProcess, t_info); } } - + //==== create sub threads for query from sub table pthread_t *pidsOfSub = NULL; threadInfo *infosOfSub = NULL; - if ((g_queryInfo.subQueryInfo.sqlCount > 0) + if ((g_queryInfo.subQueryInfo.sqlCount > 0) && (g_queryInfo.subQueryInfo.threadCnt > 0)) { - pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * + pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t)); infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo)); if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { - printf("malloc failed for create threads\n"); + printf("malloc failed for create threads\n"); taos_close(taos); exit(-1); } - + int ntables = g_queryInfo.subQueryInfo.childTblCount; int threads = g_queryInfo.subQueryInfo.threadCnt; - + int a = ntables / threads; if (a < 1) { threads = ntables; a = 1; } - + int b = 0; if (threads != 0) { b = ntables % threads; } - + int last = 0; - for (int i = 0; i < threads; i++) { + for (int i = 0; i < threads; i++) { threadInfo *t_info = infosOfSub + i; t_info->threadID = i; - + t_info->start_table_id = last; t_info->end_table_id = i < b ? last + a : last + a - 1; t_info->taos = taos; @@ -5509,20 +5628,20 @@ static int subscribeTestProcess() { } g_queryInfo.subQueryInfo.threadCnt = threads; } - + for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { pthread_join(pids[i], NULL); } tmfree((char*)pids); - tmfree((char*)infos); + tmfree((char*)infos); for (int i = 0; i < g_queryInfo.subQueryInfo.threadCnt; i++) { pthread_join(pidsOfSub[i], NULL); } tmfree((char*)pidsOfSub); - tmfree((char*)infosOfSub); + tmfree((char*)infosOfSub); taos_close(taos); return 0; } @@ -5615,9 +5734,9 @@ void setParaFromArg(){ tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp, "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); - g_Dbs.db[0].superTbls[0].timeStampStep = 10; + g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP; - g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; + g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; g_Dbs.db[0].superTbls[0].columnCount = 0; @@ -5761,14 +5880,7 @@ static void testMetaFile() { } } -static void testCmdLine() { - - g_args.test_mode = INSERT_MODE; - insertTestProcess(); - - if (g_Dbs.insert_only) - return; - +static void queryResult() { // select if (false == g_Dbs.insert_only) { // query data @@ -5814,6 +5926,17 @@ static void testCmdLine() { } } +static void testCmdLine() { + + g_args.test_mode = INSERT_MODE; + insertTestProcess(); + + if (g_Dbs.insert_only) + return; + else + queryResult(); +} + int main(int argc, char *argv[]) { parse_args(argc, argv, &g_args); diff --git a/tests/pytest/tools/taosdemoTestWithoutMetric.py b/tests/pytest/tools/taosdemoTestWithoutMetric.py new file mode 100644 index 0000000000..647d6a37cb --- /dev/null +++ b/tests/pytest/tools/taosdemoTestWithoutMetric.py @@ -0,0 +1,72 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + self.numberOfTables = 100 + self.numberOfRecords = 1000 + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def run(self): + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + binPath = buildPath + "/build/bin/" + os.system("%staosdemo -y -t %d -n %d -x" % + (binPath, self.numberOfTables, self.numberOfRecords)) + + tdSql.query("show databases") + for i in range(18): + print(tdSql.getData(0, i) ) + tdSql.checkData(0, 2, self.numberOfTables) + + tdSql.execute("use test") + tdSql.query( + "select count(*) from test.t%d" % (self.numberOfTables -1)) + tdSql.checkData(0, 0, self.numberOfRecords) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/test-all.sh b/tests/test-all.sh index 4f7afe7d17..db9d6523a0 100755 --- a/tests/test-all.sh +++ b/tests/test-all.sh @@ -316,6 +316,7 @@ if [ "$2" != "sim" ] && [ "$2" != "python" ] && [ "$2" != "unit" ] && [ "$1" == cd debug/ stopTaosd + rm -rf /var/lib/taos/* nohup build/bin/taosd -c /etc/taos/ > /dev/null 2>&1 & sleep 30 @@ -358,6 +359,7 @@ if [ "$2" != "sim" ] && [ "$2" != "python" ] && [ "$2" != "jdbc" ] && [ "$1" == pwd cd debug/build/bin + rm -rf /var/lib/taos/* nohup ./taosd -c /etc/taos/ > /dev/null 2>&1 & sleep 30