diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index f54237306c..f1dd8975dc 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -769,6 +769,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC index = 0; sToken = tStrGetToken(sql, &index, false); + if (sToken.type == TK_ILLEGAL) { + return tscSQLSyntaxErrMsg(pCmd->payload, "unrecognized token", sToken.z); + } + if (sToken.type == TK_RP) { break; } diff --git a/src/connector/python/taos/__init__.py b/src/connector/python/taos/__init__.py index 9732635738..52c6db311e 100644 --- a/src/connector/python/taos/__init__.py +++ b/src/connector/python/taos/__init__.py @@ -2,6 +2,10 @@ from .connection import TDengineConnection from .cursor import TDengineCursor +# For some reason, the following is needed for VS Code (through PyLance) to +# recognize that "error" is a valid module of the "taos" package. +from .error import ProgrammingError + # Globals threadsafety = 0 paramstyle = 'pyformat' diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 3c528ab26f..dcfed8bc2a 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -19,6 +19,7 @@ */ #include +#include #define _GNU_SOURCE #define CURL_STATICLIB @@ -52,6 +53,8 @@ #include "taoserror.h" #include "tutil.h" +#define STMT_IFACE_ENABLED 0 + #define REQ_EXTRA_BUF_LEN 1024 #define RESP_BUF_LEN 4096 @@ -61,6 +64,8 @@ extern char configDir[]; #define QUERY_JSON_NAME "query.json" #define SUBSCRIBE_JSON_NAME "subscribe.json" +#define STR_INSERT_INTO "INSERT INTO " + enum TEST_MODE { INSERT_TEST, // 0 QUERY_TEST, // 1 @@ -70,6 +75,8 @@ enum TEST_MODE { #define MAX_RECORDS_PER_REQ 32766 +#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into .. + #define MAX_SQL_SIZE 65536 #define BUFFER_SIZE (65536*2) #define COND_BUF_LEN BUFFER_SIZE - 30 @@ -120,17 +127,24 @@ enum enumSYNC_MODE { MODE_BUT }; +enum enum_TAOS_INTERFACE { + TAOSC_IFACE, + REST_IFACE, + STMT_IFACE, + INTERFACE_BUT +}; + typedef enum enumQUERY_CLASS { SPECIFIED_CLASS, STABLE_CLASS, CLASS_BUT } QUERY_CLASS; -typedef enum enum_INSERT_MODE { +typedef enum enum_PROGRESSIVE_OR_INTERLACE { PROGRESSIVE_INSERT_MODE, INTERLACE_INSERT_MODE, INVALID_INSERT_MODE -} INSERT_MODE; +} PROG_OR_INTERLACE_MODE; typedef enum enumQUERY_TYPE { NO_INSERT_TYPE, @@ -196,6 +210,7 @@ typedef struct SArguments_S { uint32_t test_mode; char * host; uint16_t port; + uint16_t iface; char * user; char * password; char * database; @@ -217,13 +232,13 @@ typedef struct SArguments_S { uint32_t num_of_threads; uint64_t insert_interval; int64_t query_times; - uint64_t interlace_rows; - uint64_t num_of_RPR; // num_of_records_per_req + uint32_t interlace_rows; + uint32_t num_of_RPR; // num_of_records_per_req uint64_t max_sql_len; int64_t num_of_tables; int64_t num_of_DPT; int abort; - int disorderRatio; // 0: no disorder, >0: x% + uint32_t disorderRatio; // 0: no disorder, >0: x% int disorderRange; // ms or us by database precision uint32_t method_of_delete; char ** arg_list; @@ -240,18 +255,19 @@ typedef struct SColumn_S { typedef struct SSuperTable_S { char sTblName[MAX_TB_NAME_SIZE+1]; + char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample + char childTblPrefix[MAX_TB_NAME_SIZE]; + char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest + uint16_t childTblExists; int64_t childTblCount; - bool childTblExists; // 0: no, 1: yes uint64_t batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table - char childTblPrefix[MAX_TB_NAME_SIZE]; - char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample - char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest + uint16_t iface; // 0: taosc, 1: rest, 2: stmt int64_t childTblLimit; uint64_t childTblOffset; // int multiThreadWriteOneTbl; // 0: no, 1: yes - uint64_t interlaceRows; // + uint32_t interlaceRows; // int disorderRatio; // 0: no disorder, >0: x% int disorderRange; // ms or us by database precision uint64_t maxSqlLen; // @@ -420,6 +436,7 @@ typedef struct SQueryMetaInfo_S { typedef struct SThreadInfo_S { TAOS * taos; + TAOS_STMT *stmt; int threadID; char db_name[MAX_DB_NAME_SIZE+1]; uint32_t time_precision; @@ -434,6 +451,7 @@ typedef struct SThreadInfo_S { char* cols; bool use_metric; SSuperTable* superTblInfo; + char *buffer; // sql cmd buffer // for async insert tsem_t lock_sem; @@ -541,7 +559,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet); static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port, char* sqlstr, threadInfo *pThreadInfo); static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, - int disorderRatio, int disorderRange); + int disorderRatio, int disorderRange); /* ************ Global variables ************ */ @@ -557,6 +575,7 @@ SArguments g_args = { 0, // test_mode "127.0.0.1", // host 6030, // port + TAOSC_IFACE, // iface "root", // user #ifdef _TD_POWER_ "powerdb", // password @@ -673,6 +692,12 @@ static void printHelp() { "The host to connect to TDengine. Default is localhost."); printf("%s%s%s%s\n", indent, "-p", indent, "The TCP/IP port number to use for the connection. Default is 0."); + printf("%s%s%s%s\n", indent, "-I", indent, +#if STMT_IFACE_ENABLED == 1 + "The interface (taosc, rest, and stmt) taosdemo uses. Default is 'taosc'."); +#else + "The interface (taosc, rest) taosdemo uses. Default is 'taosc'."); +#endif printf("%s%s%s%s\n", indent, "-d", indent, "Destination database. Default is 'test'."); printf("%s%s%s%s\n", indent, "-a", indent, @@ -761,6 +786,25 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { exit(EXIT_FAILURE); } arguments->port = atoi(argv[++i]); + } else if (strcmp(argv[i], "-I") == 0) { + if (argc == i+1) { + printHelp(); + errorPrint("%s", "\n\t-I need a valid string following!\n"); + exit(EXIT_FAILURE); + } + ++i; + if (0 == strcasecmp(argv[i], "taosc")) { + arguments->iface = TAOSC_IFACE; + } else if (0 == strcasecmp(argv[i], "rest")) { + arguments->iface = REST_IFACE; +#if STMT_IFACE_ENABLED == 1 + } else if (0 == strcasecmp(argv[i], "stmt")) { + arguments->iface = STMT_IFACE; +#endif + } else { + errorPrint("%s", "\n\t-I need a valid string following!\n"); + exit(EXIT_FAILURE); + } } else if (strcmp(argv[i], "-u") == 0) { if (argc == i+1) { printHelp(); @@ -793,7 +837,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { if ((argc == i+1) || (!isStringNumber(argv[i+1]))) { printHelp(); - errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, 1: ASYNC. Default is SYNC.\n"); + errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, not-0: ASYNC. Default is SYNC.\n"); exit(EXIT_FAILURE); } arguments->async_mode = atoi(argv[++i]); @@ -897,6 +941,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { && strcasecmp(argv[i], "BIGINT") && strcasecmp(argv[i], "DOUBLE") && strcasecmp(argv[i], "BINARY") + && strcasecmp(argv[i], "TIMESTAMP") && strcasecmp(argv[i], "NCHAR")) { printHelp(); errorPrint("%s", "-b: Invalid data_type!\n"); @@ -918,6 +963,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { && strcasecmp(token, "BIGINT") && strcasecmp(token, "DOUBLE") && strcasecmp(token, "BINARY") + && strcasecmp(token, "TIMESTAMP") && strcasecmp(token, "NCHAR")) { printHelp(); free(g_dupstr); @@ -1019,6 +1065,19 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } } + int columnCount; + for (columnCount = 0; columnCount < MAX_NUM_DATATYPE; columnCount ++) { + if (g_args.datatype[columnCount] == NULL) { + break; + } + } + + if (0 == columnCount) { + perror("data type error!"); + exit(-1); + } + g_args.num_of_CPR = columnCount; + if (((arguments->debug_print) && (arguments->metaFile == NULL)) || arguments->verbose_print) { printf("###################################################################\n"); @@ -1028,7 +1087,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->port ); printf("# User: %s\n", arguments->user); printf("# Password: %s\n", arguments->password); - printf("# Use metric: %s\n", arguments->use_metric ? "true" : "false"); + printf("# Use metric: %s\n", + arguments->use_metric ? "true" : "false"); if (*(arguments->datatype)) { printf("# Specified data type: "); for (int i = 0; i < MAX_NUM_DATATYPE; i++) @@ -1040,7 +1100,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } printf("# Insertion interval: %"PRIu64"\n", arguments->insert_interval); - printf("# Number of records per req: %"PRIu64"\n", + printf("# Number of records per req: %u\n", arguments->num_of_RPR); printf("# Max SQL length: %"PRIu64"\n", arguments->max_sql_len); @@ -1068,8 +1128,6 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } static bool getInfoFromJsonFile(char* file); -//static int generateOneRowDataForStb(SSuperTable* stbInfo); -//static int getDataIntoMemForStb(SSuperTable* stbInfo); static void init_rand_data(); static void tmfclose(FILE *fp) { if (NULL != fp) { @@ -1088,7 +1146,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) { TAOS_RES *res = NULL; int32_t code = -1; - for (i = 0; i < 5; i++) { + for (i = 0; i < 5 /* retry */; i++) { if (NULL != res) { taos_free_result(res); res = NULL; @@ -1104,7 +1162,8 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) { verbosePrint("%s() LN%d - command: %s\n", __func__, __LINE__, command); if (code != 0) { if (!quiet) { - errorPrint("Failed to execute %s, reason: %s\n", command, taos_errstr(res)); + errorPrint("Failed to execute %s, reason: %s\n", + command, taos_errstr(res)); } taos_free_result(res); //taos_close(taos); @@ -1320,6 +1379,8 @@ static void init_rand_data() { static int printfInsertMeta() { SHOW_PARSE_RESULT_START(); + printf("interface: \033[33m%s\033[0m\n", + (g_args.iface==TAOSC_IFACE)?"taosc":(g_args.iface==REST_IFACE)?"rest":"stmt"); printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port); printf("user: \033[33m%s\033[0m\n", g_Dbs.user); @@ -1331,7 +1392,7 @@ static int printfInsertMeta() { g_Dbs.threadCountByCreateTbl); printf("top insert interval: \033[33m%"PRIu64"\033[0m\n", g_args.insert_interval); - printf("number of records per req: \033[33m%"PRIu64"\033[0m\n", + printf("number of records per req: \033[33m%u\033[0m\n", g_args.num_of_RPR); printf("max sql length: \033[33m%"PRIu64"\033[0m\n", g_args.max_sql_len); @@ -1417,7 +1478,8 @@ static int printfInsertMeta() { if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { printf(" autoCreateTable: \033[33m%s\033[0m\n", "no"); - } else if (AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { + } else if (AUTO_CREATE_SUBTBL == + g_Dbs.db[i].superTbls[j].autoCreateTable) { printf(" autoCreateTable: \033[33m%s\033[0m\n", "yes"); } else { printf(" autoCreateTable: \033[33m%s\033[0m\n", "error"); @@ -1437,8 +1499,9 @@ static int printfInsertMeta() { g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); - printf(" insertMode: \033[33m%s\033[0m\n", - g_Dbs.db[i].superTbls[j].insertMode); + printf(" iface: \033[33m%s\033[0m\n", + (g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc": + (g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt"); if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) { printf(" childTblLimit: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].childTblLimit); @@ -1456,7 +1519,7 @@ static int printfInsertMeta() { printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n"); } */ - printf(" interlaceRows: \033[33m%"PRIu64"\033[0m\n", + printf(" interlaceRows: \033[33m%u\033[0m\n", g_Dbs.db[i].superTbls[j].interlaceRows); if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) { @@ -1534,7 +1597,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile); fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount); fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl); - fprintf(fp, "number of records per req: %"PRIu64"\n", g_args.num_of_RPR); + fprintf(fp, "number of records per req: %u\n", g_args.num_of_RPR); fprintf(fp, "max sql length: %"PRIu64"\n", g_args.max_sql_len); fprintf(fp, "database count: %d\n", g_Dbs.dbCount); @@ -1626,11 +1689,12 @@ static void printfInsertMetaToFile(FILE* fp) { g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); - fprintf(fp, " insertMode: %s\n", - g_Dbs.db[i].superTbls[j].insertMode); + fprintf(fp, " iface: %s\n", + (g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc": + (g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt"); fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); - fprintf(fp, " interlace rows: %"PRIu64"\n", + fprintf(fp, " interlace rows: %u\n", g_Dbs.db[i].superTbls[j].interlaceRows); if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) { fprintf(fp, " stable insert interval: %"PRIu64"\n", @@ -1643,7 +1707,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " multiThreadWriteOneTbl: yes\n"); } */ - fprintf(fp, " interlaceRows: %"PRIu64"\n", + fprintf(fp, " interlaceRows: %u\n", g_Dbs.db[i].superTbls[j].interlaceRows); fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange); @@ -2803,7 +2867,7 @@ static int createDatabasesAndStables() { int validStbCount = 0; - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName); ret = queryDbExec(taos, command, NO_INSERT_TYPE, true); @@ -2813,7 +2877,7 @@ static int createDatabasesAndStables() { &g_Dbs.db[i].superTbls[j]); if (0 != ret) { - errorPrint("create super table %d failed!\n\n", j); + errorPrint("create super table %"PRIu64" failed!\n\n", j); continue; } } @@ -2841,13 +2905,13 @@ static void* createTable(void *sarg) threadInfo *pThreadInfo = (threadInfo *)sarg; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - int64_t lastPrintTime = taosGetTimestampMs(); + uint64_t lastPrintTime = taosGetTimestampMs(); int buff_len; buff_len = BUFFER_SIZE / 8; - char *buffer = calloc(buff_len, 1); - if (buffer == NULL) { + pThreadInfo->buffer = calloc(buff_len, 1); + if (pThreadInfo->buffer == NULL) { errorPrint("%s() LN%d, Memory allocated failed!\n", __func__, __LINE__); exit(-1); } @@ -2862,7 +2926,7 @@ static void* createTable(void *sarg) for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { if (0 == g_Dbs.use_metric) { - snprintf(buffer, buff_len, + snprintf(pThreadInfo->buffer, buff_len, "create table if not exists %s.%s%"PRIu64" %s;", pThreadInfo->db_name, g_args.tb_prefix, i, @@ -2871,13 +2935,13 @@ static void* createTable(void *sarg) if (superTblInfo == NULL) { errorPrint("%s() LN%d, use metric, but super table info is NULL\n", __func__, __LINE__); - free(buffer); + free(pThreadInfo->buffer); exit(-1); } else { if (0 == len) { batchNum = 0; - memset(buffer, 0, buff_len); - len += snprintf(buffer + len, + memset(pThreadInfo->buffer, 0, buff_len); + len += snprintf(pThreadInfo->buffer + len, buff_len - len, "create table "); } char* tagsValBuf = NULL; @@ -2889,10 +2953,10 @@ static void* createTable(void *sarg) i % superTblInfo->tagSampleCount); } if (NULL == tagsValBuf) { - free(buffer); + free(pThreadInfo->buffer); return NULL; } - len += snprintf(buffer + len, + len += snprintf(pThreadInfo->buffer + len, buff_len - len, "if not exists %s.%s%"PRIu64" using %s.%s tags %s ", pThreadInfo->db_name, superTblInfo->childTblPrefix, @@ -2909,13 +2973,14 @@ static void* createTable(void *sarg) } len = 0; - if (0 != queryDbExec(pThreadInfo->taos, buffer, NO_INSERT_TYPE, false)){ - errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer); - free(buffer); + if (0 != queryDbExec(pThreadInfo->taos, pThreadInfo->buffer, + NO_INSERT_TYPE, false)){ + errorPrint( "queryDbExec() failed. buffer:\n%s\n", pThreadInfo->buffer); + free(pThreadInfo->buffer); return NULL; } - int64_t currentPrintTime = taosGetTimestampMs(); + uint64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n", pThreadInfo->threadID, pThreadInfo->start_table_from, i); @@ -2924,21 +2989,22 @@ static void* createTable(void *sarg) } if (0 != len) { - if (0 != queryDbExec(pThreadInfo->taos, buffer, NO_INSERT_TYPE, false)) { - errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer); + if (0 != queryDbExec(pThreadInfo->taos, pThreadInfo->buffer, + NO_INSERT_TYPE, false)) { + errorPrint( "queryDbExec() failed. buffer:\n%s\n", pThreadInfo->buffer); } } - free(buffer); + free(pThreadInfo->buffer); return NULL; } static int startMultiThreadCreateChildTable( - char* cols, int threads, uint64_t startFrom, int64_t ntables, + char* cols, int threads, uint64_t tableFrom, int64_t ntables, char* db_name, SSuperTable* superTblInfo) { pthread_t *pids = malloc(threads * sizeof(pthread_t)); - threadInfo *infos = malloc(threads * sizeof(threadInfo)); + threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { printf("malloc failed\n"); @@ -2978,10 +3044,10 @@ static int startMultiThreadCreateChildTable( return -1; } - pThreadInfo->start_table_from = startFrom; + pThreadInfo->start_table_from = tableFrom; pThreadInfo->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; - startFrom = pThreadInfo->end_table_to + 1; + pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; + tableFrom = pThreadInfo->end_table_to + 1; pThreadInfo->use_metric = true; pThreadInfo->cols = cols; pThreadInfo->minDelay = UINT64_MAX; @@ -3007,61 +3073,61 @@ static void createChildTables() { char tblColsBuf[MAX_SQL_SIZE]; int len; - for (int i = 0; i < g_Dbs.dbCount; i++) { - if (g_Dbs.use_metric) { - if (g_Dbs.db[i].superTblCount > 0) { - // with super table - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { - if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) - || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { - continue; - } + for (int i = 0; i < g_Dbs.dbCount; i++) { + if (g_Dbs.use_metric) { + if (g_Dbs.db[i].superTblCount > 0) { + // with super table + for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) + || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { + continue; + } + verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, + g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); + uint64_t startFrom = 0; + g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; - verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, - g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); - uint64_t startFrom = 0; - g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; + verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n", + __func__, __LINE__, g_totalChildTables, startFrom); - verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n", - __func__, __LINE__, g_totalChildTables, startFrom); - startMultiThreadCreateChildTable( - g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, - g_Dbs.threadCountByCreateTbl, - startFrom, - g_Dbs.db[i].superTbls[j].childTblCount, - g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); + startMultiThreadCreateChildTable( + g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, + g_Dbs.threadCountByCreateTbl, + startFrom, + g_Dbs.db[i].superTbls[j].childTblCount, + g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); + } + } + } else { + // normal table + len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP"); + for (int j = 0; j < g_args.num_of_CPR; j++) { + if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0) + || (strncasecmp(g_args.datatype[j], + "NCHAR", strlen("NCHAR")) == 0)) { + snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, + ", COL%d %s(%d)", j, g_args.datatype[j], g_args.len_of_binary); + } else { + snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, + ", COL%d %s", j, g_args.datatype[j]); + } + len = strlen(tblColsBuf); + } + + snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")"); + + verbosePrint("%s() LN%d: dbName: %s num of tb: %"PRId64" schema: %s\n", + __func__, __LINE__, + g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf); + startMultiThreadCreateChildTable( + tblColsBuf, + g_Dbs.threadCountByCreateTbl, + 0, + g_args.num_of_tables, + g_Dbs.db[i].dbName, + NULL); } - } - } else { - // normal table - len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP"); - for (int j = 0; j < g_args.num_of_CPR; j++) { - if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0) - || (strncasecmp(g_args.datatype[j], - "NCHAR", strlen("NCHAR")) == 0)) { - snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, - ", COL%d %s(%d)", j, g_args.datatype[j], g_args.len_of_binary); - } else { - snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, - ", COL%d %s", j, g_args.datatype[j]); - } - len = strlen(tblColsBuf); - } - - snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")"); - - verbosePrint("%s() LN%d: dbName: %s num of tb: %"PRId64" schema: %s\n", - __func__, __LINE__, - g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf); - startMultiThreadCreateChildTable( - tblColsBuf, - g_Dbs.threadCountByCreateTbl, - 0, - g_args.num_of_tables, - g_Dbs.db[i].dbName, - NULL); } - } } /* @@ -3132,10 +3198,12 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { return 0; } +#if 0 int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) { // TODO return 0; } +#endif /* Read 10000 lines at most. If more than 10000 lines, continue to read after using @@ -3520,9 +3588,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { // rows per table need be less than insert batch if (g_args.interlace_rows > g_args.num_of_RPR) { - printf("NOTICE: interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n", + printf("NOTICE: interlace rows value %u > num_of_records_per_req %u\n\n", g_args.interlace_rows, g_args.num_of_RPR); - printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n", + printf(" interlace rows value will be set to num_of_records_per_req %u\n\n", g_args.num_of_RPR); prompt(); g_args.interlace_rows = g_args.num_of_RPR; @@ -3753,36 +3821,40 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { // dbinfo cJSON *stbName = cJSON_GetObjectItem(stbInfo, "name"); - if (!stbName || stbName->type != cJSON_String || stbName->valuestring == NULL) { + if (!stbName || stbName->type != cJSON_String + || stbName->valuestring == NULL) { errorPrint("%s() LN%d, failed to read json, stb name not found\n", __func__, __LINE__); goto PARSE_OVER; } - tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring, MAX_TB_NAME_SIZE); + tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring, + MAX_TB_NAME_SIZE); cJSON *prefix = cJSON_GetObjectItem(stbInfo, "childtable_prefix"); if (!prefix || prefix->type != cJSON_String || prefix->valuestring == NULL) { printf("ERROR: failed to read json, childtable_prefix not found\n"); goto PARSE_OVER; } - tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring, MAX_DB_NAME_SIZE); + tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring, + MAX_DB_NAME_SIZE); - cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table"); // yes, no, null + cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table"); if (autoCreateTbl && autoCreateTbl->type == cJSON_String && autoCreateTbl->valuestring != NULL) { - if (0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3)) { - g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL; - } else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) { - g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; - } else { - g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; - } + if ((0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3)) + && (TBL_ALREADY_EXISTS != g_Dbs.db[i].superTbls[j].childTblExists)) { + g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL; + } else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) { + g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; + } else { + g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; + } } else if (!autoCreateTbl) { - g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; + g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; } else { - printf("ERROR: failed to read json, auto_create_table not found\n"); - goto PARSE_OVER; + printf("ERROR: failed to read json, auto_create_table not found\n"); + goto PARSE_OVER; } cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num"); @@ -3816,6 +3888,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } + if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) { + g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; + } + cJSON* count = cJSON_GetObjectItem(stbInfo, "childtable_count"); if (!count || count->type != cJSON_Number || 0 >= count->valueint) { errorPrint("%s() LN%d, failed to read json, childtable_count input mistake\n", @@ -3837,15 +3913,26 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } - cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest - if (insertMode && insertMode->type == cJSON_String - && insertMode->valuestring != NULL) { - tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, - insertMode->valuestring, MAX_DB_NAME_SIZE); - } else if (!insertMode) { - tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, "taosc", MAX_DB_NAME_SIZE); + cJSON *stbIface = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt + if (stbIface && stbIface->type == cJSON_String + && stbIface->valuestring != NULL) { + if (0 == strcasecmp(stbIface->valuestring, "taosc")) { + g_Dbs.db[i].superTbls[j].iface= TAOSC_IFACE; + } else if (0 == strcasecmp(stbIface->valuestring, "rest")) { + g_Dbs.db[i].superTbls[j].iface= REST_IFACE; +#if STMT_IFACE_ENABLED == 1 + } else if (0 == strcasecmp(stbIface->valuestring, "stmt")) { + g_Dbs.db[i].superTbls[j].iface= STMT_IFACE; +#endif + } else { + errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n", + __func__, __LINE__, stbIface->valuestring); + goto PARSE_OVER; + } + } else if (!stbIface) { + g_Dbs.db[i].superTbls[j].iface = TAOSC_IFACE; } else { - printf("ERROR: failed to read json, insert_mode not found\n"); + errorPrint("%s", "failed to read json, insert_mode not found\n"); goto PARSE_OVER; } @@ -3864,7 +3951,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { cJSON* childTbl_offset = cJSON_GetObjectItem(stbInfo, "childtable_offset"); if ((childTbl_offset) && (g_Dbs.db[i].drop != true) && (g_Dbs.db[i].superTbls[j].childTblExists == TBL_ALREADY_EXISTS)) { - if (childTbl_offset->type != cJSON_Number || 0 > childTbl_offset->valueint) { + if ((childTbl_offset->type != cJSON_Number) + || (0 > childTbl_offset->valueint)) { printf("ERROR: failed to read json, childtable_offset\n"); goto PARSE_OVER; } @@ -3920,7 +4008,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file"); - if (tagsFile && tagsFile->type == cJSON_String && tagsFile->valuestring != NULL) { + if ((tagsFile && tagsFile->type == cJSON_String) + && (tagsFile->valuestring != NULL)) { tstrncpy(g_Dbs.db[i].superTbls[j].tagsFile, tagsFile->valuestring, MAX_FILE_NAME_LEN); if (0 == g_Dbs.db[i].superTbls[j].tagsFile[0]) { @@ -3936,9 +4025,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } - cJSON* maxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len"); - if (maxSqlLen && maxSqlLen->type == cJSON_Number) { - int32_t len = maxSqlLen->valueint; + cJSON* stbMaxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len"); + if (stbMaxSqlLen && stbMaxSqlLen->type == cJSON_Number) { + int32_t len = stbMaxSqlLen->valueint; if (len > TSDB_MAX_ALLOWED_SQL_LEN) { len = TSDB_MAX_ALLOWED_SQL_LEN; } else if (len < 5) { @@ -3948,7 +4037,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!maxSqlLen) { g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len; } else { - errorPrint("%s() LN%d, failed to read json, maxSqlLen input mistake\n", + errorPrint("%s() LN%d, failed to read json, stbMaxSqlLen input mistake\n", __func__, __LINE__); goto PARSE_OVER; } @@ -3970,24 +4059,25 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } */ - cJSON* interlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows"); - if (interlaceRows && interlaceRows->type == cJSON_Number) { - if (interlaceRows->valueint < 0) { + cJSON* stbInterlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows"); + if (stbInterlaceRows && stbInterlaceRows->type == cJSON_Number) { + if (stbInterlaceRows->valueint < 0) { errorPrint("%s() LN%d, failed to read json, interlace rows input mistake\n", __func__, __LINE__); goto PARSE_OVER; } - g_Dbs.db[i].superTbls[j].interlaceRows = interlaceRows->valueint; + g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint; // rows per table need be less than insert batch if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) { - printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n", - i, j, g_Dbs.db[i].superTbls[j].interlaceRows, g_args.num_of_RPR); - printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n", + printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > num_of_records_per_req %u\n\n", + i, j, g_Dbs.db[i].superTbls[j].interlaceRows, + g_args.num_of_RPR); + printf(" interlace rows value will be set to num_of_records_per_req %u\n\n", g_args.num_of_RPR); prompt(); g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR; } - } else if (!interlaceRows) { + } else if (!stbInterlaceRows) { g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req } else { errorPrint( @@ -4311,8 +4401,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { cJSON* resubAfterConsume = cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume"); - if (resubAfterConsume - && resubAfterConsume->type == cJSON_Number) { + if ((resubAfterConsume) + && (resubAfterConsume->type == cJSON_Number) + && (resubAfterConsume->valueint >= 0)) { g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = resubAfterConsume->valueint; } else if (!resubAfterConsume) { @@ -4473,14 +4564,15 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } cJSON* superResubAfterConsume = - cJSON_GetObjectItem(superQuery, "endAfterConsume"); - if (superResubAfterConsume - && superResubAfterConsume->type == cJSON_Number) { - g_queryInfo.superQueryInfo.endAfterConsume = + cJSON_GetObjectItem(superQuery, "resubAfterConsume"); + if ((superResubAfterConsume) + && (superResubAfterConsume->type == cJSON_Number) + && (superResubAfterConsume->valueint >= 0)) { + g_queryInfo.superQueryInfo.resubAfterConsume = superResubAfterConsume->valueint; } else if (!superResubAfterConsume) { // default value is -1, which mean do not resub - g_queryInfo.superQueryInfo.endAfterConsume = -1; + g_queryInfo.superQueryInfo.resubAfterConsume = -1; } // supert table sqls @@ -4613,7 +4705,7 @@ static void prepareSampleData() { static void postFreeResource() { tmfclose(g_fpOfInsertResult); for (int i = 0; i < g_Dbs.dbCount; i++) { - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) { free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL; @@ -4661,16 +4753,22 @@ static int getRowDataFromSample( return dataLen; } -static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stbInfo) { +static int64_t generateStbRowData( + SSuperTable* stbInfo, + char* recBuf, int64_t timestamp) +{ int64_t dataLen = 0; char *pstr = recBuf; int64_t maxLen = MAX_DATA_SIZE; - dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "(%" PRId64 ",", timestamp); + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, + "(%" PRId64 ",", timestamp); for (int i = 0; i < stbInfo->columnCount; i++) { - if ((0 == strncasecmp(stbInfo->columns[i].dataType, "BINARY", strlen("BINARY"))) - || (0 == strncasecmp(stbInfo->columns[i].dataType, "NCHAR", strlen("NCHAR")))) { + if ((0 == strncasecmp(stbInfo->columns[i].dataType, + "BINARY", strlen("BINARY"))) + || (0 == strncasecmp(stbInfo->columns[i].dataType, + "NCHAR", strlen("NCHAR")))) { if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) { errorPrint( "binary or nchar length overflow, max size:%u\n", (uint32_t)TSDB_MAX_BINARY_LEN); @@ -4686,23 +4784,23 @@ static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stb dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "\'%s\',", buf); tmfree(buf); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "INT", 3)) { + "INT", strlen("INT"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%d,", rand_int()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "BIGINT", 6)) { + "BIGINT", strlen("BIGINT"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%"PRId64",", rand_bigint()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "FLOAT", 5)) { + "FLOAT", strlen("FLOAT"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%f,", rand_float()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "DOUBLE", 6)) { + "DOUBLE", strlen("DOUBLE"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%f,", rand_double()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "SMALLINT", 8)) { + "SMALLINT", strlen("SMALLINT"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%d,", rand_smallint()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, @@ -4732,46 +4830,38 @@ static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stb } static int64_t generateData(char *recBuf, char **data_type, - int num_of_cols, int64_t timestamp, int lenOfBinary) { + int64_t timestamp, int lenOfBinary) { memset(recBuf, 0, MAX_DATA_SIZE); char *pstr = recBuf; pstr += sprintf(pstr, "(%" PRId64, timestamp); - int c = 0; - for (; c < MAX_NUM_DATATYPE; c++) { - if (data_type[c] == NULL) { - break; - } - } + int columnCount = g_args.num_of_CPR; - if (0 == c) { - perror("data type error!"); - exit(-1); - } - - for (int i = 0; i < c; i++) { - if (strcasecmp(data_type[i % c], "TINYINT") == 0) { + for (int i = 0; i < columnCount; i++) { + if (strcasecmp(data_type[i % columnCount], "TINYINT") == 0) { pstr += sprintf(pstr, ",%d", rand_tinyint() ); - } else if (strcasecmp(data_type[i % c], "SMALLINT") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "SMALLINT") == 0) { pstr += sprintf(pstr, ",%d", rand_smallint()); - } else if (strcasecmp(data_type[i % c], "INT") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "INT") == 0) { pstr += sprintf(pstr, ",%d", rand_int()); - } else if (strcasecmp(data_type[i % c], "BIGINT") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "BIGINT") == 0) { pstr += sprintf(pstr, ",%" PRId64, rand_bigint()); - } else if (strcasecmp(data_type[i % c], "FLOAT") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "TIMESTAMP") == 0) { + pstr += sprintf(pstr, ",%" PRId64, rand_bigint()); + } else if (strcasecmp(data_type[i % columnCount], "FLOAT") == 0) { pstr += sprintf(pstr, ",%10.4f", rand_float()); - } else if (strcasecmp(data_type[i % c], "DOUBLE") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "DOUBLE") == 0) { double t = rand_double(); pstr += sprintf(pstr, ",%20.8f", t); - } else if (strcasecmp(data_type[i % c], "BOOL") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "BOOL") == 0) { bool b = rand_bool() & 1; pstr += sprintf(pstr, ",%s", b ? "true" : "false"); - } else if (strcasecmp(data_type[i % c], "BINARY") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "BINARY") == 0) { char *s = malloc(lenOfBinary); rand_string(s, lenOfBinary); pstr += sprintf(pstr, ",\"%s\"", s); free(s); - } else if (strcasecmp(data_type[i % c], "NCHAR") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "NCHAR") == 0) { char *s = malloc(lenOfBinary); rand_string(s, lenOfBinary); pstr += sprintf(pstr, ",\"%s\"", s); @@ -4818,146 +4908,134 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { return 0; } -static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k) +static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) { - int affectedRows; - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + int32_t affectedRows; + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, - __func__, __LINE__, buffer); - if (superTblInfo) { - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { - affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false); - } else if (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest"))) { - if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, - buffer, NULL /* not set result file */)) { - affectedRows = -1; - printf("========restful return fail, threadID[%d]\n", - pThreadInfo->threadID); - } else { - affectedRows = k; - } - } else { - errorPrint("%s() LN%d: unknown insert mode: %s\n", - __func__, __LINE__, superTblInfo->insertMode); - affectedRows = 0; + verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, + __func__, __LINE__, pThreadInfo->buffer); + + uint16_t iface; + if (superTblInfo) + iface = superTblInfo->iface; + else + iface = g_args.iface; + + debugPrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, + __func__, __LINE__, + (g_args.iface==TAOSC_IFACE)? + "taosc":(g_args.iface==REST_IFACE)?"rest":"stmt"); + + switch(iface) { + case TAOSC_IFACE: + affectedRows = queryDbExec( + pThreadInfo->taos, + pThreadInfo->buffer, INSERT_TYPE, false); + break; + + case REST_IFACE: + if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, + pThreadInfo->buffer, pThreadInfo)) { + affectedRows = -1; + printf("========restful return fail, threadID[%d]\n", + pThreadInfo->threadID); + } else { + affectedRows = k; + } + break; + +#if STMT_IFACE_ENABLED == 1 + case STMT_IFACE: + debugPrint("%s() LN%d, stmt=%p", __func__, __LINE__, pThreadInfo->stmt); + if (0 != taos_stmt_execute(pThreadInfo->stmt)) { + errorPrint("%s() LN%d, failied to execute insert statement\n", + __func__, __LINE__); + exit(-1); + } + affectedRows = k; + break; +#endif + + default: + errorPrint("%s() LN%d: unknown insert mode: %d\n", + __func__, __LINE__, superTblInfo->iface); + affectedRows = 0; } - } else { - affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false); - } - return affectedRows; + return affectedRows; } static void getTableName(char *pTblName, threadInfo* pThreadInfo, uint64_t tableSeq) { - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - if ((superTblInfo) - && (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable)) { - if (superTblInfo->childTblLimit > 0) { - snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", - superTblInfo->childTblName + - (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + if (superTblInfo) { + if (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable) { + if (superTblInfo->childTblLimit > 0) { + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", + superTblInfo->childTblName + + (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); + } else { + verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n", + pThreadInfo->threadID, __func__, __LINE__, + pThreadInfo->start_table_from, + pThreadInfo->ntables, tableSeq); + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", + superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); + } + } else { + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", + superTblInfo->childTblPrefix, tableSeq); + } } else { - - verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n", - pThreadInfo->threadID, __func__, __LINE__, - pThreadInfo->start_table_from, - pThreadInfo->ntables, tableSeq); - snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", - superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", + g_args.tb_prefix, tableSeq); } - } else { - snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", - g_args.tb_prefix, tableSeq); - } } -static int64_t generateDataTail( - SSuperTable* superTblInfo, - uint64_t batch, char* buffer, int64_t remainderBufLen, int64_t insertRows, - uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, int64_t *dataLen) { - uint64_t len = 0; - uint32_t ncols_per_record = 1; // count first col ts +static int32_t generateDataTailWithoutStb( + uint32_t batch, char* buffer, + int64_t remainderBufLen, int64_t insertRows, + uint64_t recordFrom, int64_t startTime, + /* int64_t *pSamplePos, */int64_t *dataLen) { + uint64_t len = 0; char *pstr = buffer; - if (superTblInfo == NULL) { - uint32_t datatypeSeq = 0; - while(g_args.datatype[datatypeSeq]) { - datatypeSeq ++; - ncols_per_record ++; - } - } + verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch); - verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch); - - bool tsRand; - if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, - "rand", strlen("rand")))) { - tsRand = true; - } else { - tsRand = false; - } - - uint64_t k = 0; + int32_t k = 0; for (k = 0; k < batch;) { char data[MAX_DATA_SIZE]; memset(data, 0, MAX_DATA_SIZE); int64_t retLen = 0; - if (superTblInfo) { - if (tsRand) { - retLen = generateRowData( - data, - startTime + getTSRandTail( - superTblInfo->timeStampStep, k, - superTblInfo->disorderRatio, - superTblInfo->disorderRange), - superTblInfo); - } else { - retLen = getRowDataFromSample( - data, - remainderBufLen, - startTime + superTblInfo->timeStampStep * k, - superTblInfo, - pSamplePos); - } - if (retLen > remainderBufLen) { - break; - } + char **data_type = g_args.datatype; + int lenOfBinary = g_args.len_of_binary; - pstr += snprintf(pstr , retLen + 1, "%s", data); - k++; - len += retLen; - remainderBufLen -= retLen; - } else { - char **data_type = g_args.datatype; - int lenOfBinary = g_args.len_of_binary; - retLen = generateData(data, data_type, - ncols_per_record, - startTime + getTSRandTail( - DEFAULT_TIMESTAMP_STEP, k, - g_args.disorderRatio, - g_args.disorderRange), - lenOfBinary); - if (len > remainderBufLen) - break; + retLen = generateData(data, data_type, + startTime + getTSRandTail( + (int64_t) DEFAULT_TIMESTAMP_STEP, k, + g_args.disorderRatio, + g_args.disorderRange), + lenOfBinary); - pstr += sprintf(pstr, "%s", data); - k++; - len += retLen; - remainderBufLen -= retLen; - } + if (len > remainderBufLen) + break; - verbosePrint("%s() LN%d len=%"PRIu64" k=%"PRIu64" \nbuffer=%s\n", + pstr += sprintf(pstr, "%s", data); + k++; + len += retLen; + remainderBufLen -= retLen; + + verbosePrint("%s() LN%d len=%"PRIu64" k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); - startFrom ++; + recordFrom ++; - if (startFrom >= insertRows) { + if (recordFrom >= insertRows) { break; } } @@ -4966,17 +5044,121 @@ static int64_t generateDataTail( return k; } -static int generateSQLHead(char *tableName, int32_t tableSeq, - threadInfo* pThreadInfo, SSuperTable* superTblInfo, +static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, + int disorderRatio, int disorderRange) +{ + int64_t randTail = timeStampStep * seq; + if (disorderRatio > 0) { + int rand_num = taosRandom() % 100; + if(rand_num < disorderRatio) { + randTail = (randTail + + (taosRandom() % disorderRange + 1)) * (-1); + debugPrint("rand data generated, back %"PRId64"\n", randTail); + } + } + + return randTail; +} + +static int32_t generateStbDataTail( + SSuperTable* superTblInfo, + uint32_t batch, char* buffer, + int64_t remainderBufLen, int64_t insertRows, + uint64_t recordFrom, int64_t startTime, + int64_t *pSamplePos, int64_t *dataLen) { + uint64_t len = 0; + + char *pstr = buffer; + + bool tsRand; + if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { + tsRand = true; + } else { + tsRand = false; + } + verbosePrint("%s() LN%d batch=%u\n", __func__, __LINE__, batch); + + int32_t k = 0; + for (k = 0; k < batch;) { + char data[MAX_DATA_SIZE]; + memset(data, 0, MAX_DATA_SIZE); + + int64_t retLen = 0; + + if (tsRand) { + retLen = generateStbRowData(superTblInfo, data, + startTime + getTSRandTail( + superTblInfo->timeStampStep, k, + superTblInfo->disorderRatio, + superTblInfo->disorderRange) + ); + } else { + retLen = getRowDataFromSample( + data, + remainderBufLen, + startTime + superTblInfo->timeStampStep * k, + superTblInfo, + pSamplePos); + } + + if (retLen > remainderBufLen) { + break; + } + + pstr += snprintf(pstr , retLen + 1, "%s", data); + k++; + len += retLen; + remainderBufLen -= retLen; + + verbosePrint("%s() LN%d len=%"PRIu64" k=%u \nbuffer=%s\n", + __func__, __LINE__, len, k, buffer); + + recordFrom ++; + + if (recordFrom >= insertRows) { + break; + } + } + + *dataLen = len; + return k; +} + + +static int generateSQLHeadWithoutStb(char *tableName, + char *dbName, char *buffer, int remainderBufLen) { int len; -#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into .. char headBuf[HEAD_BUFF_LEN]; - if (superTblInfo) { - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { + len = snprintf( + headBuf, + HEAD_BUFF_LEN, + "%s.%s values", + dbName, + tableName); + + if (len > remainderBufLen) + return -1; + + tstrncpy(buffer, headBuf, len + 1); + + return len; +} + +static int generateStbSQLHead( + SSuperTable* superTblInfo, + char *tableName, int32_t tableSeq, + char *dbName, + char *buffer, int remainderBufLen) +{ + int len; + + char headBuf[HEAD_BUFF_LEN]; + + if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { char* tagsValBuf = NULL; if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagVaulesForStb(superTblInfo, tableSeq); @@ -4995,9 +5177,9 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, headBuf, HEAD_BUFF_LEN, "%s.%s using %s.%s tags %s values", - pThreadInfo->db_name, + dbName, tableName, - pThreadInfo->db_name, + dbName, superTblInfo->sTblName, tagsValBuf); tmfree(tagsValBuf); @@ -5006,22 +5188,14 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, headBuf, HEAD_BUFF_LEN, "%s.%s values", - pThreadInfo->db_name, + dbName, tableName); } else { len = snprintf( headBuf, HEAD_BUFF_LEN, "%s.%s values", - pThreadInfo->db_name, - tableName); - } - } else { - len = snprintf( - headBuf, - HEAD_BUFF_LEN, - "%s.%s values", - pThreadInfo->db_name, + dbName, tableName); } @@ -5033,8 +5207,11 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, return len; } -static int64_t generateInterlaceDataBuffer( - char *tableName, uint64_t batchPerTbl, uint64_t i, uint64_t batchPerTblTimes, +static int32_t generateStbInterlaceData( + SSuperTable *superTblInfo, + char *tableName, uint32_t batchPerTbl, + uint64_t i, + uint32_t batchPerTblTimes, uint64_t tableSeq, threadInfo *pThreadInfo, char *buffer, int64_t insertRows, @@ -5043,10 +5220,11 @@ static int64_t generateInterlaceDataBuffer( { assert(buffer); char *pstr = buffer; - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, - superTblInfo, pstr, *pRemainderBufLen); + int headLen = generateStbSQLHead( + superTblInfo, + tableName, tableSeq, pThreadInfo->db_name, + pstr, *pRemainderBufLen); if (headLen <= 0) { return 0; @@ -5060,29 +5238,25 @@ static int64_t generateInterlaceDataBuffer( int64_t dataLen = 0; - verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%"PRIu64" batchPerTbl = %"PRIu64"\n", + verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%u batchPerTbl = %u\n", pThreadInfo->threadID, __func__, __LINE__, i, batchPerTblTimes, batchPerTbl); - if (superTblInfo) { - if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { + if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { startTime = taosGetTimestamp(pThreadInfo->time_precision); - } - } else { - startTime = 1500000000000; } - int64_t k = generateDataTail( - superTblInfo, - batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, - startTime, - &(pThreadInfo->samplePos), &dataLen); + int32_t k = generateStbDataTail( + superTblInfo, + batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, + startTime, + &(pThreadInfo->samplePos), &dataLen); if (k == batchPerTbl) { pstr += dataLen; *pRemainderBufLen -= dataLen; } else { - debugPrint("%s() LN%d, generated data tail: %"PRIu64", not equal batch per table: %"PRIu64"\n", + debugPrint("%s() LN%d, generated data tail: %u, not equal batch per table: %u\n", __func__, __LINE__, k, batchPerTbl); pstr -= headLen; pstr[0] = '\0'; @@ -5092,50 +5266,363 @@ static int64_t generateInterlaceDataBuffer( return k; } -static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, - int disorderRatio, int disorderRange) +static int64_t generateInterlaceDataWithoutStb( + char *tableName, uint32_t batch, + uint64_t tableSeq, + char *dbName, char *buffer, + int64_t insertRows, + int64_t startTime, + uint64_t *pRemainderBufLen) { - int64_t randTail = timeStampStep * seq; - if (disorderRatio > 0) { - int rand_num = taosRandom() % 100; - if(rand_num < disorderRatio) { - randTail = (randTail + - (taosRandom() % disorderRange + 1)) * (-1); - debugPrint("rand data generated, back %"PRId64"\n", randTail); + assert(buffer); + char *pstr = buffer; + + int headLen = generateSQLHeadWithoutStb( + tableName, dbName, + pstr, *pRemainderBufLen); + + if (headLen <= 0) { + return 0; + } + + pstr += headLen; + *pRemainderBufLen -= headLen; + + int64_t dataLen = 0; + + int32_t k = generateDataTailWithoutStb( + batch, pstr, *pRemainderBufLen, insertRows, 0, + startTime, + &dataLen); + + if (k == batch) { + pstr += dataLen; + *pRemainderBufLen -= dataLen; + } else { + debugPrint("%s() LN%d, generated data tail: %d, not equal batch per table: %u\n", + __func__, __LINE__, k, batch); + pstr -= headLen; + pstr[0] = '\0'; + k = 0; + } + + return k; +} + +#if STMT_IFACE_ENABLED == 1 +static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, + char *dataType, int32_t dataLen, char **ptr) +{ + if (0 == strncasecmp(dataType, + "BINARY", strlen("BINARY"))) { + if (dataLen > TSDB_MAX_BINARY_LEN) { + errorPrint( "binary length overflow, max size:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); + return -1; + } + char *bind_binary = (char *)*ptr; + rand_string(bind_binary, dataLen); + + bind->buffer_type = TSDB_DATA_TYPE_BINARY; + bind->buffer_length = dataLen; + bind->buffer = bind_binary; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "NCHAR", strlen("NCHAR"))) { + if (dataLen > TSDB_MAX_BINARY_LEN) { + errorPrint( "nchar length overflow, max size:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); + return -1; + } + char *bind_nchar = (char *)*ptr; + rand_string(bind_nchar, dataLen); + + bind->buffer_type = TSDB_DATA_TYPE_NCHAR; + bind->buffer_length = strlen(bind_nchar); + bind->buffer = bind_nchar; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "INT", strlen("INT"))) { + int32_t *bind_int = (int32_t *)*ptr; + + *bind_int = rand_int(); + bind->buffer_type = TSDB_DATA_TYPE_INT; + bind->buffer_length = sizeof(int32_t); + bind->buffer = bind_int; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "BIGINT", strlen("BIGINT"))) { + int64_t *bind_bigint = (int64_t *)*ptr; + + *bind_bigint = rand_bigint(); + bind->buffer_type = TSDB_DATA_TYPE_BIGINT; + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_bigint; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "FLOAT", strlen("FLOAT"))) { + float *bind_float = (float *) *ptr; + + *bind_float = rand_float(); + bind->buffer_type = TSDB_DATA_TYPE_FLOAT; + bind->buffer_length = sizeof(float); + bind->buffer = bind_float; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "DOUBLE", strlen("DOUBLE"))) { + double *bind_double = (double *)*ptr; + + *bind_double = rand_double(); + bind->buffer_type = TSDB_DATA_TYPE_DOUBLE; + bind->buffer_length = sizeof(double); + bind->buffer = bind_double; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "SMALLINT", strlen("SMALLINT"))) { + int16_t *bind_smallint = (int16_t *)*ptr; + + *bind_smallint = rand_smallint(); + bind->buffer_type = TSDB_DATA_TYPE_SMALLINT; + bind->buffer_length = sizeof(int16_t); + bind->buffer = bind_smallint; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "TINYINT", strlen("TINYINT"))) { + int8_t *bind_tinyint = (int8_t *)*ptr; + + *bind_tinyint = rand_tinyint(); + bind->buffer_type = TSDB_DATA_TYPE_TINYINT; + bind->buffer_length = sizeof(int8_t); + bind->buffer = bind_tinyint; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "BOOL", strlen("BOOL"))) { + int8_t *bind_bool = (int8_t *)*ptr; + + *bind_bool = rand_bool(); + bind->buffer_type = TSDB_DATA_TYPE_BOOL; + bind->buffer_length = sizeof(int8_t); + bind->buffer = bind_bool; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "TIMESTAMP", strlen("TIMESTAMP"))) { + int64_t *bind_ts2 = (int64_t *) *ptr; + + *bind_ts2 = rand_bigint(); + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_ts2; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else { + errorPrint( "No support data type: %s\n", + dataType); + return -1; + } + + return 0; +} + +static int32_t prepareStmtWithoutStb( + TAOS_STMT *stmt, + char *tableName, + uint32_t batch, + int64_t insertRows, + int64_t recordFrom, + int64_t startTime) +{ + int ret = taos_stmt_set_tbname(stmt, tableName); + if (ret != 0) { + errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n", + tableName, ret, taos_errstr(NULL)); + return ret; + } + + char **data_type = g_args.datatype; + + char *bindArray = malloc(sizeof(TAOS_BIND) * (g_args.num_of_CPR + 1)); + if (bindArray == NULL) { + errorPrint("Failed to allocate %d bind params\n", + (g_args.num_of_CPR + 1)); + return -1; + } + + int32_t k = 0; + for (k = 0; k < batch;) { + /* columnCount + 1 (ts) */ + char data[MAX_DATA_SIZE]; + memset(data, 0, MAX_DATA_SIZE); + + char *ptr = data; + TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0); + + int64_t *bind_ts; + + bind_ts = (int64_t *)ptr; + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + *bind_ts = startTime + getTSRandTail( + (int64_t)DEFAULT_TIMESTAMP_STEP, k, + g_args.disorderRatio, + g_args.disorderRange); + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_ts; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + ptr += bind->buffer_length; + + for (int i = 0; i < g_args.num_of_CPR; i ++) { + bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1))); + if ( -1 == prepareStmtBindArrayByType( + bind, + data_type[i], + g_args.len_of_binary, + &ptr)) { + return -1; + } + } + taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray); + // if msg > 3MB, break + taos_stmt_add_batch(stmt); + + k++; + recordFrom ++; + if (recordFrom >= insertRows) { + break; } } - return randTail; + free(bindArray); + return k; } -static int64_t generateProgressiveDataBuffer( +static int32_t prepareStbStmt(SSuperTable *stbInfo, + TAOS_STMT *stmt, + char *tableName, uint32_t batch, + uint64_t insertRows, + uint64_t recordFrom, + int64_t startTime, char *buffer) +{ + int ret = taos_stmt_set_tbname(stmt, tableName); + if (ret != 0) { + errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n", + tableName, ret, taos_errstr(NULL)); + return ret; + } + + char *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); + if (bindArray == NULL) { + errorPrint("Failed to allocate %d bind params\n", + (stbInfo->columnCount + 1)); + return -1; + } + + bool tsRand; + if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) { + tsRand = true; + } else { + tsRand = false; + } + + uint32_t k; + for (k = 0; k < batch;) { + /* columnCount + 1 (ts) */ + char data[MAX_DATA_SIZE]; + memset(data, 0, MAX_DATA_SIZE); + + char *ptr = data; + TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0); + + int64_t *bind_ts; + + bind_ts = (int64_t *)ptr; + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + if (tsRand) { + *bind_ts = startTime + getTSRandTail( + stbInfo->timeStampStep, k, + stbInfo->disorderRatio, + stbInfo->disorderRange); + } else { + *bind_ts = startTime + stbInfo->timeStampStep * k; + } + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_ts; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + ptr += bind->buffer_length; + + for (int i = 0; i < stbInfo->columnCount; i ++) { + bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1))); + if ( -1 == prepareStmtBindArrayByType( + bind, + stbInfo->columns[i].dataType, + stbInfo->columns[i].dataLen, + &ptr)) { + return -1; + } + } + taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray); + // if msg > 3MB, break + taos_stmt_add_batch(stmt); + + k++; + recordFrom ++; + if (recordFrom >= insertRows) { + break; + } + } + + free(bindArray); + return k; +} +#endif + +static int32_t generateStbProgressiveData( + SSuperTable *superTblInfo, char *tableName, int64_t tableSeq, - threadInfo *pThreadInfo, char *buffer, + char *dbName, char *buffer, int64_t insertRows, - uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, + uint64_t recordFrom, int64_t startTime, int64_t *pSamplePos, int64_t *pRemainderBufLen) { - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - - int ncols_per_record = 1; // count first col ts - - if (superTblInfo == NULL) { - int datatypeSeq = 0; - while(g_args.datatype[datatypeSeq]) { - datatypeSeq ++; - ncols_per_record ++; - } - } - assert(buffer != NULL); char *pstr = buffer; - int64_t k = 0; - memset(buffer, 0, *pRemainderBufLen); - int64_t headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo, + int64_t headLen = generateStbSQLHead( + superTblInfo, + tableName, tableSeq, dbName, buffer, *pRemainderBufLen); if (headLen <= 0) { @@ -5145,29 +5632,64 @@ static int64_t generateProgressiveDataBuffer( *pRemainderBufLen -= headLen; int64_t dataLen; - k = generateDataTail(superTblInfo, - g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom, + + return generateStbDataTail(superTblInfo, + g_args.num_of_RPR, pstr, *pRemainderBufLen, + insertRows, recordFrom, startTime, pSamplePos, &dataLen); +} - return k; +static int32_t generateProgressiveDataWithoutStb( + char *tableName, + /* int64_t tableSeq, */ + threadInfo *pThreadInfo, char *buffer, + int64_t insertRows, + uint64_t recordFrom, int64_t startTime, /*int64_t *pSamplePos, */ + int64_t *pRemainderBufLen) +{ + assert(buffer != NULL); + char *pstr = buffer; + + memset(buffer, 0, *pRemainderBufLen); + + int64_t headLen = generateSQLHeadWithoutStb( + tableName, pThreadInfo->db_name, + buffer, *pRemainderBufLen); + + if (headLen <= 0) { + return 0; + } + pstr += headLen; + *pRemainderBufLen -= headLen; + + int64_t dataLen; + + return generateDataTailWithoutStb( + g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, recordFrom, + startTime, + /*pSamplePos, */&dataLen); } static void printStatPerThread(threadInfo *pThreadInfo) { - fprintf(stderr, "====thread[%d] completed total inserted rows: %"PRIu64 ", total affected rows: %"PRIu64". %.2f records/second====\n", + fprintf(stderr, "====thread[%d] completed total inserted rows: %"PRIu64 ", total affected rows: %"PRIu64". %.2f records/second====\n", pThreadInfo->threadID, pThreadInfo->totalInsertRows, pThreadInfo->totalAffectedRows, (double)(pThreadInfo->totalAffectedRows / (pThreadInfo->totalDelay/1000.0))); } +// sync write interlace data static void* syncWriteInterlace(threadInfo *pThreadInfo) { debugPrint("[%d] %s() LN%d: ### interlace write\n", pThreadInfo->threadID, __func__, __LINE__); int64_t insertRows; - uint64_t interlaceRows; + uint32_t interlaceRows; + uint64_t maxSqlLen; + int64_t nTimeStampStep; + uint64_t insert_interval; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; @@ -5180,62 +5702,30 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { } else { interlaceRows = superTblInfo->interlaceRows; } + maxSqlLen = superTblInfo->maxSqlLen; + nTimeStampStep = superTblInfo->timeStampStep; + insert_interval = superTblInfo->insertInterval; } else { insertRows = g_args.num_of_DPT; interlaceRows = g_args.interlace_rows; + maxSqlLen = g_args.max_sql_len; + nTimeStampStep = DEFAULT_TIMESTAMP_STEP; + insert_interval = g_args.insert_interval; } + debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", + pThreadInfo->threadID, __func__, __LINE__, + pThreadInfo->start_table_from, + pThreadInfo->ntables, insertRows); + if (interlaceRows > insertRows) interlaceRows = insertRows; if (interlaceRows > g_args.num_of_RPR) interlaceRows = g_args.num_of_RPR; - int insertMode; - - if (interlaceRows > 0) { - insertMode = INTERLACE_INSERT_MODE; - } else { - insertMode = PROGRESSIVE_INSERT_MODE; - } - - // TODO: prompt tbl count multple interlace rows and batch - // - - uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; - char* buffer = calloc(maxSqlLen, 1); - if (NULL == buffer) { - errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n", - __func__, __LINE__, maxSqlLen, strerror(errno)); - return NULL; - } - - char tableName[TSDB_TABLE_NAME_LEN]; - - pThreadInfo->totalInsertRows = 0; - pThreadInfo->totalAffectedRows = 0; - - int64_t nTimeStampStep = superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; - - uint64_t insert_interval = - superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; - uint64_t st = 0; - uint64_t et = UINT64_MAX; - - uint64_t lastPrintTime = taosGetTimestampMs(); - uint64_t startTs = taosGetTimestampMs(); - uint64_t endTs; - - uint64_t tableSeq = pThreadInfo->start_table_from; - - debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", - pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->start_table_from, - pThreadInfo->ntables, insertRows); - - int64_t startTime = pThreadInfo->start_time; - - uint64_t batchPerTbl = interlaceRows; - uint64_t batchPerTblTimes; + uint32_t batchPerTbl = interlaceRows; + uint32_t batchPerTblTimes; if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { batchPerTblTimes = @@ -5244,52 +5734,116 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { batchPerTblTimes = 1; } + pThreadInfo->buffer = calloc(maxSqlLen, 1); + if (NULL == pThreadInfo->buffer) { + errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n", + __func__, __LINE__, maxSqlLen, strerror(errno)); + return NULL; + } + + pThreadInfo->totalInsertRows = 0; + pThreadInfo->totalAffectedRows = 0; + + uint64_t st = 0; + uint64_t et = UINT64_MAX; + + uint64_t lastPrintTime = taosGetTimestampMs(); + uint64_t startTs = taosGetTimestampMs(); + uint64_t endTs; + + uint64_t tableSeq = pThreadInfo->start_table_from; + int64_t startTime = pThreadInfo->start_time; + uint64_t generatedRecPerTbl = 0; bool flagSleep = true; uint64_t sleepTimeTotal = 0; - char *strInsertInto = "insert into "; - int nInsertBufLen = strlen(strInsertInto); - while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { if ((flagSleep) && (insert_interval)) { st = taosGetTimestampMs(); flagSleep = false; } // generate data - memset(buffer, 0, maxSqlLen); + memset(pThreadInfo->buffer, 0, maxSqlLen); uint64_t remainderBufLen = maxSqlLen; - char *pstr = buffer; + char *pstr = pThreadInfo->buffer; - int len = snprintf(pstr, nInsertBufLen + 1, "%s", strInsertInto); + int len = snprintf(pstr, + strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO); pstr += len; remainderBufLen -= len; - uint64_t recOfBatch = 0; + uint32_t recOfBatch = 0; + + for (uint32_t i = 0; i < batchPerTblTimes; i ++) { + char tableName[TSDB_TABLE_NAME_LEN]; - for (uint64_t i = 0; i < batchPerTblTimes; i ++) { getTableName(tableName, pThreadInfo, tableSeq); if (0 == strlen(tableName)) { errorPrint("[%d] %s() LN%d, getTableName return null\n", pThreadInfo->threadID, __func__, __LINE__); - free(buffer); + free(pThreadInfo->buffer); return NULL; } uint64_t oldRemainderLen = remainderBufLen; - int64_t generated = generateInterlaceDataBuffer( - tableName, batchPerTbl, i, batchPerTblTimes, - tableSeq, - pThreadInfo, pstr, - insertRows, - startTime, - &remainderBufLen); - debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n", + int32_t generated; + if (superTblInfo) { + if (superTblInfo->iface == STMT_IFACE) { +#if STMT_IFACE_ENABLED == 1 + generated = prepareStbStmt(superTblInfo, + pThreadInfo->stmt, + tableName, + batchPerTbl, + insertRows, i, + startTime, + pThreadInfo->buffer); +#else + generated = -1; +#endif + } else { + generated = generateStbInterlaceData( + superTblInfo, + tableName, batchPerTbl, i, + batchPerTblTimes, + tableSeq, + pThreadInfo, pstr, + insertRows, + startTime, + &remainderBufLen); + } + } else { + if (g_args.iface == STMT_IFACE) { + debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n", + pThreadInfo->threadID, + __func__, __LINE__, + tableName, batchPerTbl, startTime); +#if STMT_IFACE_ENABLED == 1 + generated = prepareStmtWithoutStb( + pThreadInfo->stmt, tableName, + batchPerTbl, + insertRows, i, + startTime); +#else + generated = -1; +#endif + } else { + generated = generateInterlaceDataWithoutStb( + tableName, batchPerTbl, + tableSeq, + pThreadInfo->db_name, pstr, + insertRows, + startTime, + &remainderBufLen); + } + } + + debugPrint("[%d] %s() LN%d, generated records is %d\n", pThreadInfo->threadID, __func__, __LINE__, generated); if (generated < 0) { - errorPrint("[%d] %s() LN%d, generated records is %"PRId64"\n", + errorPrint("[%d] %s() LN%d, generated records is %d\n", pThreadInfo->threadID, __func__, __LINE__, generated); goto free_of_interlace; } else if (generated == 0) { @@ -5298,15 +5852,15 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { tableSeq ++; recOfBatch += batchPerTbl; + pstr += (oldRemainderLen - remainderBufLen); -// startTime += batchPerTbl * superTblInfo->timeStampStep; pThreadInfo->totalInsertRows += batchPerTbl; - verbosePrint("[%d] %s() LN%d batchPerTbl=%"PRId64" recOfBatch=%"PRId64"\n", + + verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n", pThreadInfo->threadID, __func__, __LINE__, batchPerTbl, recOfBatch); - if (insertMode == INTERLACE_INSERT_MODE) { - if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { + if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { // turn to first table tableSeq = pThreadInfo->start_table_from; generatedRecPerTbl += batchPerTbl; @@ -5318,13 +5872,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { if (generatedRecPerTbl >= insertRows) break; - int remainRows = insertRows - generatedRecPerTbl; + int64_t remainRows = insertRows - generatedRecPerTbl; if ((remainRows > 0) && (batchPerTbl > remainRows)) batchPerTbl = remainRows; if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR) break; - } } verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n", @@ -5335,22 +5888,22 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { break; } - verbosePrint("[%d] %s() LN%d recOfBatch=%"PRIu64" totalInsertRows=%"PRIu64"\n", + verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n", pThreadInfo->threadID, __func__, __LINE__, recOfBatch, pThreadInfo->totalInsertRows); verbosePrint("[%d] %s() LN%d, buffer=%s\n", - pThreadInfo->threadID, __func__, __LINE__, buffer); + pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer); startTs = taosGetTimestampMs(); if (recOfBatch == 0) { - errorPrint("[%d] %s() LN%d try inserting records of batch is %"PRIu64"\n", + errorPrint("[%d] %s() LN%d try inserting records of batch is %d\n", pThreadInfo->threadID, __func__, __LINE__, recOfBatch); errorPrint("%s\n", "\tPlease check if the batch or the buffer length is proper value!\n"); goto free_of_interlace; } - int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch); + int64_t affectedRows = execInsert(pThreadInfo, recOfBatch); endTs = taosGetTimestampMs(); uint64_t delay = endTs - startTs; @@ -5366,9 +5919,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pThreadInfo->totalDelay += delay; if (recOfBatch != affectedRows) { - errorPrint("[%d] %s() LN%d execInsert insert %"PRIu64", affected rows: %"PRId64"\n%s\n", + errorPrint("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n%s\n", pThreadInfo->threadID, __func__, __LINE__, - recOfBatch, affectedRows, buffer); + recOfBatch, affectedRows, pThreadInfo->buffer); goto free_of_interlace; } @@ -5387,8 +5940,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { et = taosGetTimestampMs(); if (insert_interval > (et - st) ) { - int sleepTime = insert_interval - (et -st); - performancePrint("%s() LN%d sleep: %d ms for insert interval\n", + uint64_t sleepTime = insert_interval - (et -st); + performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n", __func__, __LINE__, sleepTime); taosMsleep(sleepTime); // ms sleepTimeTotal += insert_interval; @@ -5397,27 +5950,26 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { } free_of_interlace: - tmfree(buffer); + tmfree(pThreadInfo->buffer); printStatPerThread(pThreadInfo); return NULL; } -// sync insertion -/* - 1 thread: 100 tables * 2000 rows/s - 1 thread: 10 tables * 20000 rows/s - 6 thread: 300 tables * 2000 rows/s - - 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s -*/ +// sync insertion progressive data static void* syncWriteProgressive(threadInfo *pThreadInfo) { debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); SSuperTable* superTblInfo = pThreadInfo->superTblInfo; uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; + int64_t timeStampStep = + superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; + int64_t insertRows = + (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; + verbosePrint("%s() LN%d insertRows=%"PRId64"\n", + __func__, __LINE__, insertRows); - char* buffer = calloc(maxSqlLen, 1); - if (NULL == buffer) { + pThreadInfo->buffer = calloc(maxSqlLen, 1); + if (NULL == pThreadInfo->buffer) { errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n", maxSqlLen, strerror(errno)); @@ -5428,35 +5980,17 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { uint64_t startTs = taosGetTimestampMs(); uint64_t endTs; - int64_t timeStampStep = - superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; -/* int insert_interval = - superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; - uint64_t st = 0; - uint64_t et = 0xffffffff; - */ - pThreadInfo->totalInsertRows = 0; pThreadInfo->totalAffectedRows = 0; pThreadInfo->samplePos = 0; - for (uint64_t tableSeq = - pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to; - tableSeq ++) { + for (uint64_t tableSeq = pThreadInfo->start_table_from; + tableSeq <= pThreadInfo->end_table_to; + tableSeq ++) { int64_t start_time = pThreadInfo->start_time; - int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; - - verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); - for (uint64_t i = 0; i < insertRows;) { - /* - if (insert_interval) { - st = taosGetTimestampMs(); - } - */ - char tableName[TSDB_TABLE_NAME_LEN]; getTableName(tableName, pThreadInfo, tableSeq); verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n", @@ -5464,19 +5998,57 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { pThreadInfo->threadID, tableSeq, tableName); int64_t remainderBufLen = maxSqlLen; - char *pstr = buffer; - int nInsertBufLen = strlen("insert into "); + char *pstr = pThreadInfo->buffer; - int len = snprintf(pstr, nInsertBufLen + 1, "%s", "insert into "); + int len = snprintf(pstr, + strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO); pstr += len; remainderBufLen -= len; - int64_t generated = generateProgressiveDataBuffer( - tableName, tableSeq, pThreadInfo, pstr, insertRows, - i, start_time, - &(pThreadInfo->samplePos), - &remainderBufLen); + int32_t generated; + if (superTblInfo) { + if (superTblInfo->iface == STMT_IFACE) { +#if STMT_IFACE_ENABLED == 1 + generated = prepareStbStmt( + superTblInfo, + pThreadInfo->stmt, + tableName, + g_args.num_of_RPR, + insertRows, i, start_time, pstr); +#else + generated = -1; +#endif + } else { + generated = generateStbProgressiveData( + superTblInfo, + tableName, tableSeq, pThreadInfo->db_name, pstr, + insertRows, i, start_time, + &(pThreadInfo->samplePos), + &remainderBufLen); + } + } else { + if (g_args.iface == STMT_IFACE) { +#if STMT_IFACE_ENABLED == 1 + generated = prepareStmtWithoutStb( + pThreadInfo->stmt, + tableName, + g_args.num_of_RPR, + insertRows, i, + start_time); +#else + generated = -1; +#endif + } else { + generated = generateProgressiveDataWithoutStb( + tableName, + /* tableSeq, */ + pThreadInfo, pstr, insertRows, + i, start_time, + /* &(pThreadInfo->samplePos), */ + &remainderBufLen); + } + } if (generated > 0) i += generated; else @@ -5487,13 +6059,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { startTs = taosGetTimestampMs(); - int64_t affectedRows = execInsert(pThreadInfo, buffer, generated); + int32_t affectedRows = execInsert(pThreadInfo, generated); endTs = taosGetTimestampMs(); uint64_t delay = endTs - startTs; performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n", __func__, __LINE__, delay); - verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n", + verbosePrint("[%d] %s() LN%d affectedRows=%d\n", pThreadInfo->threadID, __func__, __LINE__, affectedRows); @@ -5503,7 +6075,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { pThreadInfo->totalDelay += delay; if (affectedRows < 0) { - errorPrint("%s() LN%d, affected rows: %"PRId64"\n", + errorPrint("%s() LN%d, affected rows: %d\n", __func__, __LINE__, affectedRows); goto free_of_progressive; } @@ -5521,32 +6093,19 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { if (i >= insertRows) break; -/* - if (insert_interval) { - et = taosGetTimestampMs(); - - if (insert_interval > ((et - st)) ) { - int sleep_time = insert_interval - (et -st); - performancePrint("%s() LN%d sleep: %d ms for insert interval\n", - __func__, __LINE__, sleep_time); - taosMsleep(sleep_time); // ms - } - } - */ } // num_of_DPT - if (g_args.verbose_print) { - if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo && + if ((g_args.verbose_print) && + (tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) && (0 == strncasecmp( superTblInfo->dataSource, "sample", strlen("sample")))) { verbosePrint("%s() LN%d samplePos=%"PRId64"\n", __func__, __LINE__, pThreadInfo->samplePos); - } } } // tableSeq free_of_progressive: - tmfree(buffer); + tmfree(pThreadInfo->buffer); printStatPerThread(pThreadInfo); return NULL; } @@ -5556,7 +6115,7 @@ static void* syncWrite(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - int interlaceRows; + uint32_t interlaceRows; if (superTblInfo) { if ((superTblInfo->interlaceRows == 0) @@ -5576,7 +6135,6 @@ static void* syncWrite(void *sarg) { // progressive mode return syncWriteProgressive(pThreadInfo); } - } static void callBack(void *param, TAOS_RES *res, int code) { @@ -5616,9 +6174,10 @@ static void callBack(void *param, TAOS_RES *res, int code) { && rand_num < pThreadInfo->superTblInfo->disorderRatio) { int64_t d = pThreadInfo->lastTs - (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1); - generateRowData(data, d, pThreadInfo->superTblInfo); + generateStbRowData(pThreadInfo->superTblInfo, data, d); } else { - generateRowData(data, pThreadInfo->lastTs += 1000, pThreadInfo->superTblInfo); + generateStbRowData(pThreadInfo->superTblInfo, + data, pThreadInfo->lastTs += 1000); } pstr += sprintf(pstr, "%s", data); pThreadInfo->counter++; @@ -5686,24 +6245,6 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * static void startMultiThreadInsertData(int threads, char* db_name, char* precision,SSuperTable* superTblInfo) { - pthread_t *pids = malloc(threads * sizeof(pthread_t)); - assert(pids != NULL); - - threadInfo *infos = malloc(threads * sizeof(threadInfo)); - assert(infos != NULL); - - memset(pids, 0, threads * sizeof(pthread_t)); - memset(infos, 0, threads * sizeof(threadInfo)); - - //TAOS* taos; - //if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { - // taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); - // if (NULL == taos) { - // printf("connect to server fail, reason: %s\n", taos_errstr(NULL)); - // exit(-1); - // } - //} - int32_t timePrec = TSDB_TIME_PRECISION_MILLI; if (0 != precision[0]) { if (0 == strncasecmp(precision, "ms", 2)) { @@ -5755,17 +6296,17 @@ static void startMultiThreadInsertData(int threads, char* db_name, } } - TAOS* taos = taos_connect( + TAOS* taos0 = taos_connect( g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); - if (NULL == taos) { + if (NULL == taos0) { errorPrint("%s() LN%d, connect to server fail , reason: %s\n", __func__, __LINE__, taos_errstr(NULL)); exit(-1); } int64_t ntables = 0; - uint64_t startFrom; + uint64_t tableFrom; if (superTblInfo) { int64_t limit; @@ -5792,7 +6333,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, } ntables = limit; - startFrom = offset; + tableFrom = offset; if ((superTblInfo->childTblExists != TBL_NO_EXISTS) && ((superTblInfo->childTblOffset + superTblInfo->childTblLimit ) @@ -5811,23 +6352,23 @@ static void startMultiThreadInsertData(int threads, char* db_name, limit * TSDB_TABLE_NAME_LEN); if (superTblInfo->childTblName == NULL) { errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__); - taos_close(taos); + taos_close(taos0); exit(-1); } int64_t childTblCount; getChildNameOfSuperTableWithLimitAndOffset( - taos, + taos0, db_name, superTblInfo->sTblName, &superTblInfo->childTblName, &childTblCount, limit, offset); } else { ntables = g_args.num_of_tables; - startFrom = 0; + tableFrom = 0; } - taos_close(taos); + taos_close(taos0); int64_t a = ntables / threads; if (a < 1) { @@ -5841,11 +6382,22 @@ static void startMultiThreadInsertData(int threads, char* db_name, } if ((superTblInfo) - && (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest")))) { - if (convertHostToServAddr(g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) - exit(-1); + && (superTblInfo->iface == REST_IFACE)) { + if (convertHostToServAddr( + g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) { + exit(-1); + } } + pthread_t *pids = malloc(threads * sizeof(pthread_t)); + assert(pids != NULL); + + threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); + assert(infos != NULL); + + memset(pids, 0, threads * sizeof(pthread_t)); + memset(infos, 0, threads * sizeof(threadInfo)); + for (int i = 0; i < threads; i++) { threadInfo *pThreadInfo = infos + i; pThreadInfo->threadID = i; @@ -5857,17 +6409,61 @@ static void startMultiThreadInsertData(int threads, char* db_name, pThreadInfo->minDelay = UINT64_MAX; if ((NULL == superTblInfo) || - (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) { - //pThreadInfo->taos = taos; + (superTblInfo->iface != REST_IFACE)) { + //t_info->taos = taos; pThreadInfo->taos = taos_connect( g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); if (NULL == pThreadInfo->taos) { errorPrint( - "connect to server fail from insert sub thread, reason: %s\n", + "%s() LN%d, connect to server fail from insert sub thread, reason: %s\n", + __func__, __LINE__, taos_errstr(NULL)); + free(infos); exit(-1); } + +#if STMT_IFACE_ENABLED == 1 + if ((g_args.iface == STMT_IFACE) + || ((superTblInfo) && (superTblInfo->iface == STMT_IFACE))) { + + int columnCount; + if (superTblInfo) { + columnCount = superTblInfo->columnCount; + } else { + columnCount = g_args.num_of_CPR; + } + + pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos); + if (NULL == pThreadInfo->stmt) { + errorPrint( + "%s() LN%d, failed init stmt, reason: %s\n", + __func__, __LINE__, + taos_errstr(NULL)); + free(pids); + free(infos); + exit(-1); + } + + char buffer[3000]; + char *pstr = buffer; + pstr += sprintf(pstr, "INSERT INTO ? values(?"); + + for (int col = 0; col < columnCount; col ++) { + pstr += sprintf(pstr, ",?"); + } + pstr += sprintf(pstr, ")"); + + int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0); + if (ret != 0){ + errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n", + ret, taos_errstr(NULL)); + free(pids); + free(infos); + exit(-1); + } + } +#endif } else { pThreadInfo->taos = NULL; } @@ -5875,10 +6471,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, /* if ((NULL == superTblInfo) || (0 == superTblInfo->multiThreadWriteOneTbl)) { */ - pThreadInfo->start_table_from = startFrom; + pThreadInfo->start_table_from = tableFrom; pThreadInfo->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; - startFrom = pThreadInfo->end_table_to + 1; + pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; + tableFrom = pThreadInfo->end_table_to + 1; /* } else { pThreadInfo->start_table_from = 0; pThreadInfo->ntables = superTblInfo->childTblCount; @@ -5907,6 +6503,13 @@ static void startMultiThreadInsertData(int threads, char* db_name, threadInfo *pThreadInfo = infos + i; tsem_destroy(&(pThreadInfo->lock_sem)); + +#if STMT_IFACE_ENABLED == 1 + if (pThreadInfo->stmt) { + taos_stmt_close(pThreadInfo->stmt); + } +#endif + tsem_destroy(&(pThreadInfo->lock_sem)); taos_close(pThreadInfo->taos); debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n", @@ -6182,13 +6785,12 @@ static int insertTestProcess() { } } - taosMsleep(1000); // create sub threads for inserting data //start = taosGetTimestampMs(); for (int i = 0; i < g_Dbs.dbCount; i++) { if (g_Dbs.use_metric) { if (g_Dbs.db[i].superTblCount > 0) { - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; @@ -6512,15 +7114,15 @@ static int queryTestProcess() { b = ntables % threads; } - uint64_t startFrom = 0; + uint64_t tableFrom = 0; for (int i = 0; i < threads; i++) { threadInfo *pThreadInfo = infosOfSub + i; pThreadInfo->threadID = i; - pThreadInfo->start_table_from = startFrom; + pThreadInfo->start_table_from = tableFrom; pThreadInfo->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; - startFrom = pThreadInfo->end_table_to + 1; + pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; + tableFrom = pThreadInfo->end_table_to + 1; pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection; pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo); } @@ -6826,11 +7428,14 @@ static void *specifiedSubscribe(void *sarg) { g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume( g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]); if (g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]) { - if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] + != 0) { sprintf(pThreadInfo->filePath, "%s-%d", g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); - fetchResult(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID], pThreadInfo); + fetchResult( + g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID], + pThreadInfo); } g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++; @@ -6843,16 +7448,17 @@ static void *specifiedSubscribe(void *sarg) { g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0; taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID], g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); - g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = subscribeImpl( - SPECIFIED_CLASS, - pThreadInfo, - g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], - g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID], - g_queryInfo.specifiedQueryInfo.subscribeRestart, - g_queryInfo.specifiedQueryInfo.subscribeInterval); + g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = + subscribeImpl( + SPECIFIED_CLASS, + pThreadInfo, + g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], + g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID], + g_queryInfo.specifiedQueryInfo.subscribeRestart, + g_queryInfo.specifiedQueryInfo.subscribeInterval); if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) { - taos_close(pThreadInfo->taos); - return NULL; + taos_close(pThreadInfo->taos); + return NULL; } } } @@ -6975,17 +7581,17 @@ static int subscribeTestProcess() { } for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { - uint64_t startFrom = 0; + uint64_t tableFrom = 0; for (int j = 0; j < threads; j++) { uint64_t seq = i * threads + j; threadInfo *pThreadInfo = infosOfStable + seq; pThreadInfo->threadID = seq; pThreadInfo->querySeq = i; - pThreadInfo->start_table_from = startFrom; + pThreadInfo->start_table_from = tableFrom; pThreadInfo->ntables = jend_table_to = jend_table_to + 1; + pThreadInfo->end_table_to = jend_table_to + 1; pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection; pthread_create(pidsOfStable + seq, NULL, superSubscribe, pThreadInfo); @@ -7065,7 +7671,7 @@ static void setParaFromArg(){ g_Dbs.threadCountByCreateTbl = g_args.num_of_threads; g_Dbs.dbCount = 1; - g_Dbs.db[0].drop = 1; + g_Dbs.db[0].drop = true; tstrncpy(g_Dbs.db[0].dbName, g_args.database, MAX_DB_NAME_SIZE); g_Dbs.db[0].dbCfg.replica = g_args.replica; @@ -7104,7 +7710,7 @@ static void setParaFromArg(){ tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix, g_args.tb_prefix, MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE); - tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE); + g_Dbs.db[0].superTbls[0].iface = g_args.iface; tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp, "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP; diff --git a/src/os/src/detail/osTime.c b/src/os/src/detail/osTime.c index d9d070218e..67e0c2642e 100644 --- a/src/os/src/detail/osTime.c +++ b/src/os/src/detail/osTime.c @@ -87,12 +87,12 @@ static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t time int32_t taosGetTimestampSec() { return (int32_t)time(NULL); } -int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t daylight) { +int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) { /* parse datatime string in with tz */ if (strnchr(timestr, 'T', len, false) != NULL) { return parseTimeWithTz(timestr, time, timePrec); } else { - return (*parseLocaltimeFp[daylight])(timestr, time, timePrec); + return (*parseLocaltimeFp[day_light])(timestr, time, timePrec); } } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index eca2a25a35..f97a0c4a74 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2845,6 +2845,8 @@ static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, if (tagColId == TSDB_TBNAME_COLUMN_INDEX) { val = tsdbGetTableName(pTable); assert(val != NULL); + } else if (tagColId == TSDB_BLOCK_DIST_COLUMN_INDEX) { + val = NULL; } else { val = tsdbGetTableTagVal(pTable, tagColId, type, bytes); } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 0a4ea5e153..88293cfbbb 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -68,7 +68,7 @@ int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) { TABLE_CHAR_NAME(pMeta->tables[tid]), TABLE_TID(pMeta->tables[tid]), TABLE_UID(pMeta->tables[tid])); return 0; } else { - tsdbError("vgId:%d table %s at tid %d uid %" PRIu64 + tsdbInfo("vgId:%d table %s at tid %d uid %" PRIu64 " exists, replace it with new table, this can be not reasonable", REPO_ID(pRepo), TABLE_CHAR_NAME(pMeta->tables[tid]), TABLE_TID(pMeta->tables[tid]), TABLE_UID(pMeta->tables[tid])); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 1b24405952..1cc0780b9d 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -367,40 +367,39 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC goto out_of_memory; } - assert(pCond != NULL && pCond->numOfCols > 0 && pMemRef != NULL); + assert(pCond != NULL && pMemRef != NULL); if (ASCENDING_TRAVERSE(pCond->order)) { assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey); } else { assert(pQueryHandle->window.skey >= pQueryHandle->window.ekey); } - - // allocate buffer in order to load data blocks from file - pQueryHandle->statis = calloc(pCond->numOfCols, sizeof(SDataStatis)); - if (pQueryHandle->statis == NULL) { - goto out_of_memory; - } - - pQueryHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array? - if (pQueryHandle->pColumns == NULL) { - goto out_of_memory; - } - - for (int32_t i = 0; i < pCond->numOfCols; ++i) { - SColumnInfoData colInfo = {{0}, 0}; - - colInfo.info = pCond->colList[i]; - colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes); - if (colInfo.pData == NULL) { + if (pCond->numOfCols > 0) { + // allocate buffer in order to load data blocks from file + pQueryHandle->statis = calloc(pCond->numOfCols, sizeof(SDataStatis)); + if (pQueryHandle->statis == NULL) { goto out_of_memory; } - taosArrayPush(pQueryHandle->pColumns, &colInfo); - pQueryHandle->statis[i].colId = colInfo.info.colId; - } - if (pCond->numOfCols > 0) { + pQueryHandle->pColumns = + taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array? + if (pQueryHandle->pColumns == NULL) { + goto out_of_memory; + } + + for (int32_t i = 0; i < pCond->numOfCols; ++i) { + SColumnInfoData colInfo = {{0}, 0}; + + colInfo.info = pCond->colList[i]; + colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes); + if (colInfo.pData == NULL) { + goto out_of_memory; + } + taosArrayPush(pQueryHandle->pColumns, &colInfo); + pQueryHandle->statis[i].colId = colInfo.info.colId; + } + pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true); } - STsdbMeta* pMeta = tsdbGetMeta(tsdb); assert(pMeta != NULL); diff --git a/src/vnode/src/vnodeMgmt.c b/src/vnode/src/vnodeMgmt.c index 32f9532138..7e6022fc87 100644 --- a/src/vnode/src/vnodeMgmt.c +++ b/src/vnode/src/vnodeMgmt.c @@ -91,18 +91,18 @@ static void vnodeIncRef(void *ptNode) { } void *vnodeAcquire(int32_t vgId) { - SVnodeObj **ppVnode = NULL; + SVnodeObj *pVnode = NULL; if (tsVnodesHash != NULL) { - ppVnode = taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *)); + taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, &pVnode, sizeof(void *)); } - if (ppVnode == NULL || *ppVnode == NULL) { + if (pVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; vDebug("vgId:%d, not exist", vgId); return NULL; } - return *ppVnode; + return pVnode; } void vnodeRelease(void *vparam) { diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 16089c8e91..555eda6d13 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -303,6 +303,17 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) { } int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { + SVnodeObj *pVnode = vparam; + if (qtype == TAOS_QTYPE_RPC) { + if (!vnodeInReadyStatus(pVnode)) { + return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state + } + + if (pVnode->role != TAOS_SYNC_ROLE_MASTER) { + return TSDB_CODE_APP_NOT_READY; + } + } + SVWriteMsg *pWrite = vnodeBuildVWriteMsg(vparam, wparam, qtype, rparam); if (pWrite == NULL) { assert(terrno != 0); diff --git a/tests/pytest/crash_gen/crash_gen_main.py b/tests/pytest/crash_gen/crash_gen_main.py index 644aa79916..b743eee2ef 100755 --- a/tests/pytest/crash_gen/crash_gen_main.py +++ b/tests/pytest/crash_gen/crash_gen_main.py @@ -37,6 +37,7 @@ import requests import gc import taos + from .shared.types import TdColumns, TdTags # from crash_gen import ServiceManager, TdeInstance, TdeSubProcess @@ -160,6 +161,7 @@ class WorkerThread: Logging.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...") break + # Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more) try: if (Config.getConfig().per_thread_db_connection): # most likely TRUE @@ -1362,9 +1364,12 @@ class Task(): Progress.emit(Progress.ACCEPTABLE_ERROR) self._err = err else: # not an acceptable error - errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format( + shortTid = threading.get_ident() % 10000 + errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, thread={}, msg: {}, SQL: {}".format( self.__class__.__name__, - errno2, err, wt.getDbConn().getLastSql()) + errno2, + shortTid, + err, wt.getDbConn().getLastSql()) self.logDebug(errMsg) if Config.getConfig().debug: # raise # so that we see full stack @@ -1411,21 +1416,31 @@ class Task(): def lockTable(self, ftName): # full table name # print(" <<" + ftName + '_', end="", flush=True) - with Task._lock: - if not ftName in Task._tableLocks: + with Task._lock: # SHORT lock! so we only protect lock creation + if not ftName in Task._tableLocks: # Create new lock and add to list, if needed Task._tableLocks[ftName] = threading.Lock() - Task._tableLocks[ftName].acquire() + # No lock protection, anybody can do this any time + lock = Task._tableLocks[ftName] + # Logging.info("Acquiring lock: {}, {}".format(ftName, lock)) + lock.acquire() + # Logging.info("Acquiring lock successful: {}".format(lock)) def unlockTable(self, ftName): # print('_' + ftName + ">> ", end="", flush=True) - with Task._lock: + with Task._lock: if not ftName in self._tableLocks: raise RuntimeError("Corrupt state, no such lock") lock = Task._tableLocks[ftName] if not lock.locked(): raise RuntimeError("Corrupte state, already unlocked") - lock.release() + + # Important note, we want to protect unlocking under the task level + # locking, because we don't want the lock to be deleted (maybe in the futur) + # while we unlock it + # Logging.info("Releasing lock: {}".format(lock)) + lock.release() + # Logging.info("Releasing lock successful: {}".format(lock)) class ExecutionStats: @@ -1696,6 +1711,11 @@ class TdSuperTable: return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0 def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str): + ''' + Make sure a regular table exists for this super table, creating it if necessary. + If there is an associated "Task" that wants to do this, "lock" this table so that + others don't access it while we create it. + ''' dbName = self._dbName sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName) if dbc.query(sql) >= 1 : # reg table exists already @@ -1703,18 +1723,24 @@ class TdSuperTable: # acquire a lock first, so as to be able to *verify*. More details in TD-1471 fullTableName = dbName + '.' + regTableName - if task is not None: # TODO: what happens if we don't lock the table - task.lockTable(fullTableName) + if task is not None: # Somethime thie operation is requested on behalf of a "task" + # Logging.info("Locking table for creation: {}".format(fullTableName)) + task.lockTable(fullTableName) # in which case we'll lock this table to ensure serialized access + # Logging.info("Table locked for creation".format(fullTableName)) Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table # print("(" + fullTableName[-3:] + ")", end="", flush=True) try: sql = "CREATE TABLE {} USING {}.{} tags ({})".format( fullTableName, dbName, self._stName, self._getTagStrForSql(dbc) ) + # Logging.info("Creating regular with SQL: {}".format(sql)) dbc.execute(sql) + # Logging.info("Regular table created: {}".format(sql)) finally: if task is not None: + # Logging.info("Unlocking table after creation: {}".format(fullTableName)) task.unlockTable(fullTableName) # no matter what + # Logging.info("Table unlocked after creation: {}".format(fullTableName)) def _getTagStrForSql(self, dbc) : tags = self._getTags(dbc) @@ -2011,9 +2037,30 @@ class TaskAddData(StateTransitionTask): def canBeginFrom(cls, state: AnyState): return state.canAddData() + def _lockTableIfNeeded(self, fullTableName, extraMsg = ''): + if Config.getConfig().verify_data: + # Logging.info("Locking table: {}".format(fullTableName)) + self.lockTable(fullTableName) + # Logging.info("Table locked {}: {}".format(extraMsg, fullTableName)) + # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written + else: + # Logging.info("Skipping locking table") + pass + + def _unlockTableIfNeeded(self, fullTableName): + if Config.getConfig().verify_data: + # Logging.info("Unlocking table: {}".format(fullTableName)) + self.unlockTable(fullTableName) + # Logging.info("Table unlocked: {}".format(fullTableName)) + else: + pass + # Logging.info("Skipping unlocking table") + def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor): numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS + fullTableName = db.getName() + '.' + regTableName + self._lockTableIfNeeded(fullTableName, 'batch') sql = "INSERT INTO {} VALUES ".format(fullTableName) for j in range(numRecords): # number of records per table @@ -2021,51 +2068,60 @@ class TaskAddData(StateTransitionTask): nextTick = db.getNextTick() nextColor = db.getNextColor() sql += "('{}', {}, '{}');".format(nextTick, nextInt, nextColor) - dbc.execute(sql) + + # Logging.info("Adding data in batch: {}".format(sql)) + try: + dbc.execute(sql) + finally: + # Logging.info("Data added in batch: {}".format(sql)) + self._unlockTableIfNeeded(fullTableName) + + def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS for j in range(numRecords): # number of records per table - nextInt = db.getNextInt() + intToWrite = db.getNextInt() nextTick = db.getNextTick() nextColor = db.getNextColor() if Config.getConfig().record_ops: self.prepToRecordOps() if self.fAddLogReady is None: raise CrashGenError("Unexpected empty fAddLogReady") - self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) + self.fAddLogReady.write("Ready to write {} to {}\n".format(intToWrite, regTableName)) self.fAddLogReady.flush() os.fsync(self.fAddLogReady.fileno()) # TODO: too ugly trying to lock the table reliably, refactor... fullTableName = db.getName() + '.' + regTableName - if Config.getConfig().verify_data: - self.lockTable(fullTableName) - # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written - + self._lockTableIfNeeded(fullTableName) # so that we are verify read-back. TODO: deal with exceptions before unlock + try: sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {}) fullTableName, # ds.getFixedSuperTableName(), # ds.getNextBinary(), ds.getNextFloat(), - nextTick, nextInt, nextColor) + nextTick, intToWrite, nextColor) + # Logging.info("Adding data: {}".format(sql)) dbc.execute(sql) + # Logging.info("Data added: {}".format(sql)) + intWrote = intToWrite # Quick hack, attach an update statement here. TODO: create an "update" task if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB - nextInt = db.getNextInt() + intToUpdate = db.getNextInt() # Updated, but should not succeed nextColor = db.getNextColor() sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here fullTableName, - nextTick, nextInt, nextColor) + nextTick, intToUpdate, nextColor) # sql = "UPDATE {} set speed={}, color='{}' WHERE ts='{}'".format( # fullTableName, db.getNextInt(), db.getNextColor(), nextTick) dbc.execute(sql) + intWrote = intToUpdate # We updated, seems TDengine non-cluster accepts this. except: # Any exception at all - if Config.getConfig().verify_data: - self.unlockTable(fullTableName) + self._unlockTableIfNeeded(fullTableName) raise # Now read it back and verify, we might encounter an error if table is dropped @@ -2073,33 +2129,41 @@ class TaskAddData(StateTransitionTask): try: readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'". format(db.getName(), regTableName, nextTick)) - if readBack != nextInt : + if readBack != intWrote : raise taos.error.ProgrammingError( "Failed to read back same data, wrote: {}, read: {}" - .format(nextInt, readBack), 0x999) + .format(intWrote, readBack), 0x999) except taos.error.ProgrammingError as err: errno = Helper.convertErrno(err.errno) - if errno in [CrashGenError.INVALID_EMPTY_RESULT, CrashGenError.INVALID_MULTIPLE_RESULT] : # not a single result + if errno == CrashGenError.INVALID_EMPTY_RESULT: # empty result raise taos.error.ProgrammingError( - "Failed to read back same data for tick: {}, wrote: {}, read: {}" - .format(nextTick, nextInt, "Empty Result" if errno == CrashGenError.INVALID_EMPTY_RESULT else "Multiple Result"), + "Failed to read back same data for tick: {}, wrote: {}, read: EMPTY" + .format(nextTick, intWrote), + errno) + elif errno == CrashGenError.INVALID_MULTIPLE_RESULT : # multiple results + raise taos.error.ProgrammingError( + "Failed to read back same data for tick: {}, wrote: {}, read: MULTIPLE RESULTS" + .format(nextTick, intWrote), errno) elif errno in [0x218, 0x362]: # table doesn't exist # do nothing - dummy = 0 + pass else: # Re-throw otherwise raise finally: - self.unlockTable(fullTableName) # Unlock the table no matter what + self._unlockTableIfNeeded(fullTableName) # Quite ugly, refactor lock/unlock + # Done with read-back verification, unlock the table now + else: + self._unlockTableIfNeeded(fullTableName) # Successfully wrote the data into the DB, let's record it somehow - te.recordDataMark(nextInt) + te.recordDataMark(intWrote) if Config.getConfig().record_ops: if self.fAddLogDone is None: raise CrashGenError("Unexpected empty fAddLogDone") - self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) + self.fAddLogDone.write("Wrote {} to {}\n".format(intWrote, regTableName)) self.fAddLogDone.flush() os.fsync(self.fAddLogDone.fileno()) @@ -2137,15 +2201,16 @@ class TaskAddData(StateTransitionTask): class ThreadStacks: # stack info for all threads def __init__(self): self._allStacks = {} - allFrames = sys._current_frames() - for th in threading.enumerate(): + allFrames = sys._current_frames() # All current stack frames + for th in threading.enumerate(): # For each thread if th.ident is None: continue - stack = traceback.extract_stack(allFrames[th.ident]) - self._allStacks[th.native_id] = stack + stack = traceback.extract_stack(allFrames[th.ident]) # Get stack for a thread + shortTid = th.ident % 10000 + self._allStacks[shortTid] = stack # Was using th.native_id def print(self, filteredEndName = None, filterInternal = False): - for thNid, stack in self._allStacks.items(): # for each thread, stack frames top to bottom + for tIdent, stack in self._allStacks.items(): # for each thread, stack frames top to bottom lastFrame = stack[-1] if filteredEndName: # we need to filter out stacks that match this name if lastFrame.name == filteredEndName : # end did not match @@ -2157,7 +2222,7 @@ class ThreadStacks: # stack info for all threads '__init__']: # the thread that extracted the stack continue # ignore # Now print - print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(thNid)) + print("\n<----- Thread Info for LWP/ID: {} (most recent call last) <-----".format(tIdent)) stackFrame = 0 for frame in stack: # was using: reversed(stack) # print(frame) @@ -2376,7 +2441,7 @@ class MainExec: action='store', default=0, type=int, - help='Maximum number of DBs to keep, set to disable dropping DB. (default: 0)') + help='Number of DBs to use, set to disable dropping DB. (default: 0)') parser.add_argument( '-c', '--connector-type', diff --git a/tests/pytest/crash_gen/service_manager.py b/tests/pytest/crash_gen/service_manager.py index 1cd65c1dde..c6685ec469 100644 --- a/tests/pytest/crash_gen/service_manager.py +++ b/tests/pytest/crash_gen/service_manager.py @@ -179,7 +179,7 @@ quorum 2 def getServiceCmdLine(self): # to start the instance if Config.getConfig().track_memory_leaks: Logging.info("Invoking VALGRIND on service...") - return ['exec /usr/bin/valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()] + return ['exec valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()] else: # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen() @@ -310,7 +310,7 @@ class TdeSubProcess: # print("Starting TDengine with env: ", myEnv.items()) print("Starting TDengine: {}".format(cmdLine)) - return Popen( + ret = Popen( ' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine, shell=True, # Always use shell, since we need to pass ENV vars stdout=PIPE, @@ -318,6 +318,10 @@ class TdeSubProcess: close_fds=ON_POSIX, env=myEnv ) # had text=True, which interferred with reading EOF + time.sleep(0.01) # very brief wait, then let's check if sub process started successfully. + if ret.poll(): + raise CrashGenError("Sub process failed to start with command line: {}".format(cmdLine)) + return ret STOP_SIGNAL = signal.SIGINT # signal.SIGKILL/SIGINT # What signal to use (in kill) to stop a taosd process? SIG_KILL_RETCODE = 137 # ref: https://stackoverflow.com/questions/43268156/process-finished-with-exit-code-137-in-pycharm @@ -614,7 +618,7 @@ class ServiceManager: # Find if there's already a taosd service, and then kill it for proc in psutil.process_iter(): - if proc.name() == 'taosd': + if proc.name() == 'taosd' or proc.name() == 'memcheck-amd64-': # Regular or under Valgrind Logging.info("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt") time.sleep(2.0) proc.kill() diff --git a/tests/pytest/crash_gen/shared/misc.py b/tests/pytest/crash_gen/shared/misc.py index 90ad802ff1..78923bcc29 100644 --- a/tests/pytest/crash_gen/shared/misc.py +++ b/tests/pytest/crash_gen/shared/misc.py @@ -35,7 +35,8 @@ class LoggingFilter(logging.Filter): class MyLoggingAdapter(logging.LoggerAdapter): def process(self, msg, kwargs): - return "[{:04d}] {}".format(threading.get_ident() % 10000, msg), kwargs + shortTid = threading.get_ident() % 10000 + return "[{:04d}] {}".format(shortTid, msg), kwargs # return '[%s] %s' % (self.extra['connid'], msg), kwargs diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index d8e2a31e70..24ae0ad557 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -31,7 +31,7 @@ python3 ./test.py -f table/column_name.py python3 ./test.py -f table/column_num.py python3 ./test.py -f table/db_table.py python3 ./test.py -f table/create_sensitive.py -#python3 ./test.py -f table/tablename-boundary.py +python3 ./test.py -f table/tablename-boundary.py python3 ./test.py -f table/max_table_length.py python3 ./test.py -f table/alter_column.py python3 ./test.py -f table/boundary.py @@ -332,5 +332,5 @@ python3 ./test.py -f tag_lite/alter_tag.py python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertWithJson.py python3 test.py -f tools/taosdemoAllTest/taosdemoTestQueryWithJson.py - +python3 ./test.py -f tag_lite/drop_auto_create.py #======================p4-end=============== diff --git a/tests/pytest/import_merge/importCSV.py b/tests/pytest/import_merge/importCSV.py index b4441949a1..24ebffd485 100644 --- a/tests/pytest/import_merge/importCSV.py +++ b/tests/pytest/import_merge/importCSV.py @@ -82,6 +82,8 @@ class TDTestCase: tdSql.execute("import into tbx file \'%s\'"%(self.csvfile)) tdSql.query('select * from tbx') tdSql.checkRows(self.rows) + #TD-4447 import the same csv twice + tdSql.execute("import into tbx file \'%s\'"%(self.csvfile)) def stop(self): self.destroyCSVFile() diff --git a/tests/pytest/insert/nchar.py b/tests/pytest/insert/nchar.py index 3319aa3c56..5ad52b96a1 100644 --- a/tests/pytest/insert/nchar.py +++ b/tests/pytest/insert/nchar.py @@ -36,6 +36,10 @@ class TDTestCase: tdSql.checkData(1, 1, '涛思数据') tdSql.error("insert into tb values (now, 'taosdata001')") + + tdSql.error("insert into tb(now, 😀)") + tdSql.query("select * from tb") + tdSql.checkRows(2) def stop(self): tdSql.close() diff --git a/tests/pytest/table/tablename-boundary.py b/tests/pytest/table/tablename-boundary.py index 0755e75355..dc22c3343b 100644 --- a/tests/pytest/table/tablename-boundary.py +++ b/tests/pytest/table/tablename-boundary.py @@ -14,6 +14,13 @@ class TDTestCase: tdLog.debug("start to execute %s" % __file__) tdSql.init(conn.cursor(), logSql) + self.ts = 1622100000000 + + def get_random_string(self, length): + letters = string.ascii_lowercase + result_str = ''.join(random.choice(letters) for i in range(length)) + return result_str + def run(self): tdSql.prepare() @@ -24,19 +31,62 @@ class TDTestCase: shell=True)) - 1 tdLog.info("table name max length is %d" % tableNameMaxLen) chars = string.ascii_uppercase + string.ascii_lowercase - tb_name = ''.join(random.choices(chars, k=tableNameMaxLen)) + tb_name = ''.join(random.choices(chars, k=tableNameMaxLen + 1)) tdLog.info('tb_name length %d' % len(tb_name)) tdLog.info('create table %s (ts timestamp, value int)' % tb_name) - tdSql.error( - 'create table %s (ts timestamp, speed binary(4089))' % - tb_name) + tdSql.error('create table %s (ts timestamp, speed binary(4089))' % tb_name) - tb_name = ''.join(random.choices(chars, k=191)) + tb_name = ''.join(random.choices(chars, k=tableNameMaxLen)) tdLog.info('tb_name length %d' % len(tb_name)) tdLog.info('create table %s (ts timestamp, value int)' % tb_name) tdSql.execute( 'create table %s (ts timestamp, speed binary(4089))' % tb_name) + + db_name = self.get_random_string(33) + tdSql.error("create database %s" % db_name) + + db_name = self.get_random_string(32) + tdSql.execute("create database %s" % db_name) + tdSql.execute("use %s" % db_name) + + tb_name = self.get_random_string(193) + tdSql.error("create table %s(ts timestamp, val int)" % tb_name) + + tb_name = self.get_random_string(192) + tdSql.execute("create table %s.%s(ts timestamp, val int)" % (db_name, tb_name)) + tdSql.query("show %s.tables" % db_name) + tdSql.checkRows(1) + tdSql.checkData(0, 0, tb_name) + + tdSql.execute("insert into %s.%s values(now, 1)" % (db_name, tb_name)) + tdSql.query("select * from %s.%s" %(db_name, tb_name)) + tdSql.checkRows(1) + + db_name = self.get_random_string(32) + tdSql.execute("create database %s update 1" % db_name) + + stb_name = self.get_random_string(192) + tdSql.execute("create table %s.%s(ts timestamp, val int) tags(id int)" % (db_name, stb_name)) + tb_name1 = self.get_random_string(192) + tdSql.execute("insert into %s.%s using %s.%s tags(1) values(%d, 1)(%d, 2)(%d, 3)" % (db_name, tb_name1, db_name, stb_name, self.ts, self.ts + 1, self.ts + 2)) + tb_name2 = self.get_random_string(192) + tdSql.execute("insert into %s.%s using %s.%s tags(2) values(%d, 1)(%d, 2)(%d, 3)" % (db_name, tb_name2, db_name, stb_name, self.ts, self.ts + 1, self.ts + 2)) + + tdSql.query("show %s.tables" % db_name) + tdSql.checkRows(2) + + tdSql.query("select * from %s.%s" % (db_name, stb_name)) + tdSql.checkRows(6) + + tdSql.execute("insert into %s.%s using %s.%s tags(1) values(%d, null)" % (db_name, tb_name1, db_name, stb_name, self.ts)) + + tdSql.query("select * from %s.%s" % (db_name, stb_name)) + tdSql.checkRows(6) + + + + def stop(self): tdSql.close() diff --git a/tests/pytest/tag_lite/drop_auto_create.py b/tests/pytest/tag_lite/drop_auto_create.py new file mode 100644 index 0000000000..f89b41008b --- /dev/null +++ b/tests/pytest/tag_lite/drop_auto_create.py @@ -0,0 +1,47 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from util.log import * +from util.cases import * +from util.sql import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def run(self): + tdSql.prepare() + + tdSql.execute('create table m1(ts timestamp, k int) tags(a binary(12), b int, c double);') + tdSql.execute('insert into tm0 using m1(b,c) tags(1, 99) values(now, 1);') + tdSql.execute('insert into tm1 using m1(b,c) tags(2, 100) values(now, 2);') + tdLog.info("2 rows inserted") + tdSql.query('select * from m1;') + tdSql.checkRows(2) + tdSql.query('select *,tbname from m1;') + tdSql.execute("drop table tm0; ") + tdSql.query('select * from m1') + tdSql.checkRows(1) + + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/script/fullGeneralSuite.sim b/tests/script/fullGeneralSuite.sim index cde51ebdbf..2cd2236200 100644 --- a/tests/script/fullGeneralSuite.sim +++ b/tests/script/fullGeneralSuite.sim @@ -33,6 +33,7 @@ run general/compute/percentile.sim run general/compute/stddev.sim run general/compute/sum.sim run general/compute/top.sim +run general/compute/block_dist.sim run general/db/alter_option.sim run general/db/alter_tables_d2.sim run general/db/alter_tables_v1.sim diff --git a/tests/script/general/compute/block_dist.sim b/tests/script/general/compute/block_dist.sim new file mode 100644 index 0000000000..51cf903654 --- /dev/null +++ b/tests/script/general/compute/block_dist.sim @@ -0,0 +1,94 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c walLevel -v 1 +system sh/exec.sh -n dnode1 -s start +sleep 2000 +sql connect + +$dbPrefix = m_di_db +$tbPrefix = m_di_tb +$mtPrefix = m_di_mt +$ntPrefix = m_di_nt +$tbNum = 1 +$rowNum = 2000 + +print =============== step1 +$i = 0 +$db = $dbPrefix . $i +$mt = $mtPrefix . $i +$nt = $ntPrefix . $i + +sql drop database $db -x step1 +step1: +sql create database $db +sql use $db +sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int) + +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + sql create table $tb using $mt tags( $i ) + + $x = 0 + while $x < $rowNum + $cc = $x * 60000 + $ms = 1601481600000 + $cc + sql insert into $tb values ($ms , $x ) + $x = $x + 1 + endw + + $i = $i + 1 +endw + +sql create table $nt (ts timestamp, tbcol int) +$x = 0 +while $x < $rowNum + $cc = $x * 60000 + $ms = 1601481600000 + $cc + sql insert into $nt values ($ms , $x ) + $x = $x + 1 +endw + +sleep 100 + +print =============== step2 +$i = 0 +$tb = $tbPrefix . $i + +sql select _block_dist() from $tb + +if $rows != 1 then + print expect 1, actual:$rows + return -1 +endi + +print =============== step3 +$i = 0 +$mt = $mtPrefix . $i +sql select _block_dist() from $mt + +if $rows != 1 then + print expect 1, actual:$rows + return -1 +endi + +print =============== step4 +$i = 0 +$nt = $ntPrefix . $i + +sql select _block_dist() from $nt + +if $rows != 1 then + print expect 1, actual:$rows + return -1 +endi + +print =============== clear +sql drop database $db +sql show databases +if $rows != 0 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/general/compute/testSuite.sim b/tests/script/general/compute/testSuite.sim index 6cd6badaee..91bf4bf0cd 100644 --- a/tests/script/general/compute/testSuite.sim +++ b/tests/script/general/compute/testSuite.sim @@ -14,3 +14,4 @@ run general/compute/percentile.sim run general/compute/stddev.sim run general/compute/sum.sim run general/compute/top.sim +run general/compute/block_dist.sim diff --git a/tests/script/regressionSuite.sim b/tests/script/regressionSuite.sim index e5e2194e87..d5742cd98f 100644 --- a/tests/script/regressionSuite.sim +++ b/tests/script/regressionSuite.sim @@ -32,6 +32,7 @@ run general/compute/percentile.sim run general/compute/stddev.sim run general/compute/sum.sim run general/compute/top.sim +run general/compute/block_dist.sim run general/db/alter_option.sim run general/db/alter_tables_d2.sim run general/db/alter_tables_v1.sim