From 13004a574010119332f99c59d8ed1ac591a149c4 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 25 May 2021 19:41:36 +0800 Subject: [PATCH] Feature/sangshuduo/td 4068 taosdemo stmt (#6225) * merge with develop branch. change query/tests/CMakeLists.txt to allow unused function and variable. * refactor data generating. * refactor. * refactor. * refactor. * refactor. * refactor * add prepare stmt function. * refactor get rand timestamp. * fix windows compile error. Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 341 ++++++++++++++++++++++-------------- 1 file changed, 214 insertions(+), 127 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 6f651c18cf..bae0476b17 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -19,6 +19,7 @@ */ #include +#include #define _GNU_SOURCE #define CURL_STATICLIB @@ -229,13 +230,13 @@ typedef struct SArguments_S { uint32_t num_of_threads; uint64_t insert_interval; int64_t query_times; - uint64_t interlace_rows; - uint64_t num_of_RPR; // num_of_records_per_req + uint32_t interlace_rows; + uint32_t num_of_RPR; // num_of_records_per_req uint64_t max_sql_len; int64_t num_of_tables; int64_t num_of_DPT; int abort; - int disorderRatio; // 0: no disorder, >0: x% + uint32_t disorderRatio; // 0: no disorder, >0: x% int disorderRange; // ms or us by database precision uint32_t method_of_delete; char ** arg_list; @@ -258,12 +259,12 @@ typedef struct SSuperTable_S { uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table char childTblPrefix[MAX_TB_NAME_SIZE]; char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample - uint16_t insertMode; // 0: taosc, 1: rest, 2: stmt + uint16_t iface; // 0: taosc, 1: rest, 2: stmt int64_t childTblLimit; uint64_t childTblOffset; // int multiThreadWriteOneTbl; // 0: no, 1: yes - uint64_t interlaceRows; // + uint32_t interlaceRows; // int disorderRatio; // 0: no disorder, >0: x% int disorderRange; // ms or us by database precision uint64_t maxSqlLen; // @@ -551,6 +552,8 @@ static void createChildTables(); static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet); static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port, char* sqlstr, threadInfo *pThreadInfo); +static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, + int disorderRatio, int disorderRange); /* ************ Global variables ************ */ @@ -1070,7 +1073,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } printf("# Insertion interval: %"PRIu64"\n", arguments->insert_interval); - printf("# Number of records per req: %"PRIu64"\n", + printf("# Number of records per req: %ud\n", arguments->num_of_RPR); printf("# Max SQL length: %"PRIu64"\n", arguments->max_sql_len); @@ -1362,7 +1365,7 @@ static int printfInsertMeta() { g_Dbs.threadCountByCreateTbl); printf("top insert interval: \033[33m%"PRIu64"\033[0m\n", g_args.insert_interval); - printf("number of records per req: \033[33m%"PRIu64"\033[0m\n", + printf("number of records per req: \033[33m%ud\033[0m\n", g_args.num_of_RPR); printf("max sql length: \033[33m%"PRIu64"\033[0m\n", g_args.max_sql_len); @@ -1468,9 +1471,9 @@ static int printfInsertMeta() { g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); - printf(" insertMode: \033[33m%s\033[0m\n", - (g_Dbs.db[i].superTbls[j].insertMode==TAOSC_IFACE)?"taosc": - (g_Dbs.db[i].superTbls[j].insertMode==REST_IFACE)?"rest":"stmt"); + printf(" iface: \033[33m%s\033[0m\n", + (g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc": + (g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt"); if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) { printf(" childTblLimit: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].childTblLimit); @@ -1488,7 +1491,7 @@ static int printfInsertMeta() { printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n"); } */ - printf(" interlaceRows: \033[33m%"PRIu64"\033[0m\n", + printf(" interlaceRows: \033[33m%ud\033[0m\n", g_Dbs.db[i].superTbls[j].interlaceRows); if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) { @@ -1566,7 +1569,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile); fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount); fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl); - fprintf(fp, "number of records per req: %"PRIu64"\n", g_args.num_of_RPR); + fprintf(fp, "number of records per req: %ud\n", g_args.num_of_RPR); fprintf(fp, "max sql length: %"PRIu64"\n", g_args.max_sql_len); fprintf(fp, "database count: %d\n", g_Dbs.dbCount); @@ -1658,12 +1661,12 @@ static void printfInsertMetaToFile(FILE* fp) { g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); - fprintf(fp, " insertMode: %s\n", - (g_Dbs.db[i].superTbls[j].insertMode==TAOSC_IFACE)?"taosc": - (g_Dbs.db[i].superTbls[j].insertMode==REST_IFACE)?"rest":"stmt"); + fprintf(fp, " iface: %s\n", + (g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc": + (g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt"); fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); - fprintf(fp, " interlace rows: %"PRIu64"\n", + fprintf(fp, " interlace rows: %ud\n", g_Dbs.db[i].superTbls[j].interlaceRows); if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) { fprintf(fp, " stable insert interval: %"PRIu64"\n", @@ -1676,7 +1679,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " multiThreadWriteOneTbl: yes\n"); } */ - fprintf(fp, " interlaceRows: %"PRIu64"\n", + fprintf(fp, " interlaceRows: %ud\n", g_Dbs.db[i].superTbls[j].interlaceRows); fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange); @@ -3563,9 +3566,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { // rows per table need be less than insert batch if (g_args.interlace_rows > g_args.num_of_RPR) { - printf("NOTICE: interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n", + printf("NOTICE: interlace rows value %ud > num_of_records_per_req %ud\n\n", g_args.interlace_rows, g_args.num_of_RPR); - printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n", + printf(" interlace rows value will be set to num_of_records_per_req %ud\n\n", g_args.num_of_RPR); prompt(); g_args.interlace_rows = g_args.num_of_RPR; @@ -3880,22 +3883,22 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } - cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt - if (insertMode && insertMode->type == cJSON_String - && insertMode->valuestring != NULL) { - if (0 == strcasecmp(insertMode->valuestring, "taosc")) { - g_Dbs.db[i].superTbls[j].insertMode = TAOSC_IFACE; - } else if (0 == strcasecmp(insertMode->valuestring, "rest")) { - g_Dbs.db[i].superTbls[j].insertMode = REST_IFACE; - } else if (0 == strcasecmp(insertMode->valuestring, "stmt")) { - g_Dbs.db[i].superTbls[j].insertMode = STMT_IFACE; + cJSON *stbIface = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt + if (stbIface && stbIface->type == cJSON_String + && stbIface->valuestring != NULL) { + if (0 == strcasecmp(stbIface->valuestring, "taosc")) { + g_Dbs.db[i].superTbls[j].iface= TAOSC_IFACE; + } else if (0 == strcasecmp(stbIface->valuestring, "rest")) { + g_Dbs.db[i].superTbls[j].iface= REST_IFACE; + } else if (0 == strcasecmp(stbIface->valuestring, "stmt")) { + g_Dbs.db[i].superTbls[j].iface= STMT_IFACE; } else { errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n", - __func__, __LINE__, insertMode->valuestring); + __func__, __LINE__, stbIface->valuestring); goto PARSE_OVER; } - } else if (!insertMode) { - g_Dbs.db[i].superTbls[j].insertMode = TAOSC_IFACE; + } else if (!stbIface) { + g_Dbs.db[i].superTbls[j].iface = TAOSC_IFACE; } else { errorPrint("%s", "failed to read json, insert_mode not found\n"); goto PARSE_OVER; @@ -4032,9 +4035,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint; // rows per table need be less than insert batch if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) { - printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n", - i, j, g_Dbs.db[i].superTbls[j].interlaceRows, g_args.num_of_RPR); - printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n", + printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %ud > num_of_records_per_req %ud\n\n", + i, j, g_Dbs.db[i].superTbls[j].interlaceRows, + g_args.num_of_RPR); + printf(" interlace rows value will be set to num_of_records_per_req %ud\n\n", g_args.num_of_RPR); prompt(); g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR; @@ -4864,11 +4868,11 @@ static int64_t execInsert(threadInfo *pThreadInfo, uint64_t k) verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer); if (superTblInfo) { - if (superTblInfo->insertMode == TAOSC_IFACE) { + if (superTblInfo->iface == TAOSC_IFACE) { affectedRows = queryDbExec( pThreadInfo->taos, pThreadInfo->buffer, INSERT_TYPE, false); - } else if (superTblInfo->insertMode == REST_IFACE) { + } else if (superTblInfo->iface == REST_IFACE) { if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, pThreadInfo->buffer, NULL /* not set result file */)) { affectedRows = -1; @@ -4877,7 +4881,7 @@ static int64_t execInsert(threadInfo *pThreadInfo, uint64_t k) } else { affectedRows = k; } - } else if (superTblInfo->insertMode == STMT_IFACE) { + } else if (superTblInfo->iface == STMT_IFACE) { debugPrint("%s() LN%d, stmt=%p", __func__, __LINE__, pThreadInfo->stmt); if (0 != taos_stmt_execute(pThreadInfo->stmt)) { errorPrint("%s() LN%d, failied to execute insert statement\n", @@ -4888,7 +4892,7 @@ static int64_t execInsert(threadInfo *pThreadInfo, uint64_t k) affectedRows = k; } else { errorPrint("%s() LN%d: unknown insert mode: %d\n", - __func__, __LINE__, superTblInfo->insertMode); + __func__, __LINE__, superTblInfo->iface); affectedRows = 0; } } else { @@ -4924,7 +4928,7 @@ static void getTableName(char *pTblName, } static int64_t generateDataTailWithoutStb( - uint64_t batch, char* buffer, + uint32_t batch, char* buffer, int64_t remainderBufLen, int64_t insertRows, uint64_t startFrom, int64_t startTime, /* int64_t *pSamplePos, */int64_t *dataLen) { @@ -4932,7 +4936,7 @@ static int64_t generateDataTailWithoutStb( uint64_t len = 0; char *pstr = buffer; - verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch); + verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch); int64_t k = 0; for (k = 0; k < batch;) { @@ -4944,22 +4948,11 @@ static int64_t generateDataTailWithoutStb( char **data_type = g_args.datatype; int lenOfBinary = g_args.len_of_binary; - int64_t randTail = DEFAULT_TIMESTAMP_STEP * k; - - if (g_args.disorderRatio != 0) { - int rand_num = taosRandom() % 100; - if (rand_num < g_args.disorderRatio) { - randTail = (randTail + - (taosRandom() % g_args.disorderRange + 1)) * (-1); - - debugPrint("rand data generated, back %"PRId64"\n", randTail); - } - } else { - randTail = DEFAULT_TIMESTAMP_STEP * k; - } - retLen = generateData(data, data_type, - startTime + randTail, + startTime + getTSRandTail( + (int64_t)DEFAULT_TIMESTAMP_STEP, k, + g_args.disorderRatio, + g_args.disorderRange), lenOfBinary); if (len > remainderBufLen) @@ -4984,9 +4977,25 @@ static int64_t generateDataTailWithoutStb( return k; } -static int64_t generateStbDataTail( +static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, + int disorderRatio, int disorderRange) +{ + int64_t randTail = timeStampStep * seq; + if (disorderRatio > 0) { + int rand_num = taosRandom() % 100; + if(rand_num < disorderRatio) { + randTail = (randTail + + (taosRandom() % disorderRange + 1)) * (-1); + debugPrint("rand data generated, back %"PRId64"\n", randTail); + } + } + + return randTail; +} + +static int32_t generateStbDataTail( SSuperTable* superTblInfo, - uint64_t batch, char* buffer, + uint32_t batch, char* buffer, int64_t remainderBufLen, int64_t insertRows, uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, int64_t *dataLen) { @@ -4994,37 +5003,35 @@ static int64_t generateStbDataTail( char *pstr = buffer; - verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch); + bool tsRand; + if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { + tsRand = true; + } else { + tsRand = false; + } + verbosePrint("%s() LN%d batch=%ud\n", __func__, __LINE__, batch); - int64_t k = 0; + int32_t k = 0; for (k = 0; k < batch;) { char data[MAX_DATA_SIZE]; memset(data, 0, MAX_DATA_SIZE); int64_t retLen = 0; - if (0 == strncasecmp(superTblInfo->dataSource, - "sample", strlen("sample"))) { - retLen = getRowDataFromSample( + if (tsRand) { + retLen = generateStbRowData(superTblInfo, data, + startTime + getTSRandTail( + superTblInfo->timeStampStep, k, + superTblInfo->disorderRatio, + superTblInfo->disorderRange) + ); + } else { + retLen = getRowDataFromSample( data, remainderBufLen, startTime + superTblInfo->timeStampStep * k, superTblInfo, pSamplePos); - } else if (0 == strncasecmp(superTblInfo->dataSource, - "rand", strlen("rand"))) { - int64_t randTail = superTblInfo->timeStampStep * k; - if (superTblInfo->disorderRatio > 0) { - int rand_num = taosRandom() % 100; - if(rand_num < superTblInfo->disorderRatio) { - randTail = (randTail + - (taosRandom() % superTblInfo->disorderRange + 1)) * (-1); - debugPrint("rand data generated, back %"PRId64"\n", randTail); - } - } - - int64_t d = startTime + randTail; - retLen = generateStbRowData(superTblInfo, data, d); } if (retLen > remainderBufLen) { @@ -5036,7 +5043,7 @@ static int64_t generateStbDataTail( len += retLen; remainderBufLen -= retLen; - verbosePrint("%s() LN%d len=%"PRIu64" k=%"PRIu64" \nbuffer=%s\n", + verbosePrint("%s() LN%d len=%"PRIu64" k=%ud \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); startFrom ++; @@ -5133,9 +5140,11 @@ static int generateStbSQLHead( return len; } -static int64_t generateStbInterlaceData( +static int32_t generateStbInterlaceData( SSuperTable *superTblInfo, - char *tableName, uint64_t batchPerTbl, uint64_t i, uint64_t batchPerTblTimes, + char *tableName, uint32_t batchPerTbl, + uint64_t i, + uint32_t batchPerTblTimes, uint64_t tableSeq, threadInfo *pThreadInfo, char *buffer, int64_t insertRows, @@ -5162,7 +5171,7 @@ static int64_t generateStbInterlaceData( int64_t dataLen = 0; - verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%"PRIu64" batchPerTbl = %"PRIu64"\n", + verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%ud batchPerTbl = %ud\n", pThreadInfo->threadID, __func__, __LINE__, i, batchPerTblTimes, batchPerTbl); @@ -5170,7 +5179,7 @@ static int64_t generateStbInterlaceData( startTime = taosGetTimestamp(pThreadInfo->time_precision); } - int64_t k = generateStbDataTail( + int32_t k = generateStbDataTail( superTblInfo, batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, startTime, @@ -5180,7 +5189,7 @@ static int64_t generateStbInterlaceData( pstr += dataLen; *pRemainderBufLen -= dataLen; } else { - debugPrint("%s() LN%d, generated data tail: %"PRIu64", not equal batch per table: %"PRIu64"\n", + debugPrint("%s() LN%d, generated data tail: %ud, not equal batch per table: %ud\n", __func__, __LINE__, k, batchPerTbl); pstr -= headLen; pstr[0] = '\0'; @@ -5191,7 +5200,7 @@ static int64_t generateStbInterlaceData( } static int64_t generateInterlaceDataWithoutStb( - char *tableName, uint64_t batchPerTbl, + char *tableName, uint32_t batchPerTbl, uint64_t tableSeq, char *dbName, char *buffer, int64_t insertRows, @@ -5223,7 +5232,7 @@ static int64_t generateInterlaceDataWithoutStb( pstr += dataLen; *pRemainderBufLen -= dataLen; } else { - debugPrint("%s() LN%d, generated data tail: %"PRIu64", not equal batch per table: %"PRIu64"\n", + debugPrint("%s() LN%d, generated data tail: %"PRIu64", not equal batch per table: %ud\n", __func__, __LINE__, k, batchPerTbl); pstr -= headLen; pstr[0] = '\0'; @@ -5233,7 +5242,71 @@ static int64_t generateInterlaceDataWithoutStb( return k; } -static int64_t generateStbProgressiveData( +static int32_t prepareStbStmt(SSuperTable *stbInfo, + TAOS_STMT *stmt, + char *tableName, uint32_t batch, uint64_t insertRows, + int64_t startTime, char *buffer) +{ + uint32_t k; + int ret; + char *pstr = buffer; + pstr += sprintf(pstr, "INSERT INTO %s values(?", tableName); + + for (int i = 0; i < stbInfo->columnCount; i++) { + pstr += sprintf(pstr, ",?"); + } + pstr += sprintf(pstr, ")"); + + ret = taos_stmt_prepare(stmt, buffer, 0); + if (ret != 0){ + errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n", + ret, taos_errstr(NULL)); + return ret; + } + + void *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); + if (bindArray == NULL) { + errorPrint("Failed to allocate %d bind params\n", batch); + return -1; + } + + bool tsRand; + if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) { + tsRand = true; + } else { + tsRand = false; + } + for (k = 0; k < batch; k++) { + /* columnCount + 1 (ts) */ + for (int i = 0; i <= stbInfo->columnCount; i ++) { + TAOS_BIND *bind = (TAOS_BIND *)bindArray + (sizeof(TAOS_BIND) * i); + if (i == 0) { + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + int64_t ts; + if (tsRand) { + ts = startTime + getTSRandTail( + stbInfo->timeStampStep, k, + stbInfo->disorderRatio, + stbInfo->disorderRange); + } else { + ts = startTime + stbInfo->timeStampStep * k; + } + bind->buffer = &ts; + + } else { + + } + } + // if msg > 3MB, break + } + + taos_stmt_bind_param(stmt, bindArray); + taos_stmt_add_batch(stmt); + + return k; +} + +static int32_t generateStbProgressiveData( SSuperTable *superTblInfo, char *tableName, int64_t tableSeq, @@ -5267,12 +5340,17 @@ static int64_t generateStbProgressiveData( pSamplePos, &dataLen); } +static int64_t prepareStmtWithoutStb(char *tableName) +{ + return -1; +} + static int64_t generateProgressiveDataWithoutStb( char *tableName, - int64_t tableSeq, + /* int64_t tableSeq, */ threadInfo *pThreadInfo, char *buffer, int64_t insertRows, - uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, + uint64_t startFrom, int64_t startTime, /*int64_t *pSamplePos, */ int64_t *pRemainderBufLen) { assert(buffer != NULL); @@ -5313,7 +5391,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pThreadInfo->threadID, __func__, __LINE__); int64_t insertRows; - uint64_t interlaceRows; + uint32_t interlaceRows; uint64_t maxSqlLen; int64_t nTimeStampStep; uint64_t insert_interval; @@ -5351,8 +5429,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { if (interlaceRows > g_args.num_of_RPR) interlaceRows = g_args.num_of_RPR; - uint64_t batchPerTbl = interlaceRows; - uint64_t batchPerTblTimes; + uint32_t batchPerTbl = interlaceRows; + uint32_t batchPerTblTimes; if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { batchPerTblTimes = @@ -5401,9 +5479,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pstr += len; remainderBufLen -= len; - uint64_t recOfBatch = 0; + uint32_t recOfBatch = 0; - for (uint64_t i = 0; i < batchPerTblTimes; i ++) { + for (uint32_t i = 0; i < batchPerTblTimes; i ++) { char tableName[TSDB_TABLE_NAME_LEN]; getTableName(tableName, pThreadInfo, tableSeq); @@ -5416,11 +5494,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { uint64_t oldRemainderLen = remainderBufLen; - int64_t generated; + int32_t generated; if (superTblInfo) { generated = generateStbInterlaceData( superTblInfo, - tableName, batchPerTbl, i, batchPerTblTimes, + tableName, batchPerTbl, i, + batchPerTblTimes, tableSeq, pThreadInfo, pstr, insertRows, @@ -5435,10 +5514,10 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { &remainderBufLen); } - debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n", + debugPrint("[%d] %s() LN%d, generated records is %d\n", pThreadInfo->threadID, __func__, __LINE__, generated); if (generated < 0) { - errorPrint("[%d] %s() LN%d, generated records is %"PRId64"\n", + errorPrint("[%d] %s() LN%d, generated records is %d\n", pThreadInfo->threadID, __func__, __LINE__, generated); goto free_of_interlace; } else if (generated == 0) { @@ -5450,7 +5529,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pstr += (oldRemainderLen - remainderBufLen); // startTime += batchPerTbl * superTblInfo->timeStampStep; pThreadInfo->totalInsertRows += batchPerTbl; - verbosePrint("[%d] %s() LN%d batchPerTbl=%"PRId64" recOfBatch=%"PRId64"\n", + verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n", pThreadInfo->threadID, __func__, __LINE__, batchPerTbl, recOfBatch); @@ -5466,7 +5545,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { if (generatedRecPerTbl >= insertRows) break; - int remainRows = insertRows - generatedRecPerTbl; + int64_t remainRows = insertRows - generatedRecPerTbl; if ((remainRows > 0) && (batchPerTbl > remainRows)) batchPerTbl = remainRows; @@ -5482,7 +5561,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { break; } - verbosePrint("[%d] %s() LN%d recOfBatch=%"PRIu64" totalInsertRows=%"PRIu64"\n", + verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n", pThreadInfo->threadID, __func__, __LINE__, recOfBatch, pThreadInfo->totalInsertRows); verbosePrint("[%d] %s() LN%d, buffer=%s\n", @@ -5491,7 +5570,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { startTs = taosGetTimestampMs(); if (recOfBatch == 0) { - errorPrint("[%d] %s() LN%d try inserting records of batch is %"PRIu64"\n", + errorPrint("[%d] %s() LN%d try inserting records of batch is %d\n", pThreadInfo->threadID, __func__, __LINE__, recOfBatch); errorPrint("%s\n", "\tPlease check if the batch or the buffer length is proper value!\n"); @@ -5513,7 +5592,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pThreadInfo->totalDelay += delay; if (recOfBatch != affectedRows) { - errorPrint("[%d] %s() LN%d execInsert insert %"PRIu64", affected rows: %"PRId64"\n%s\n", + errorPrint("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n%s\n", pThreadInfo->threadID, __func__, __LINE__, recOfBatch, affectedRows, pThreadInfo->buffer); goto free_of_interlace; @@ -5574,12 +5653,6 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { uint64_t startTs = taosGetTimestampMs(); uint64_t endTs; -/* int insert_interval = - superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; - uint64_t st = 0; - uint64_t et = 0xffffffff; - */ - pThreadInfo->totalInsertRows = 0; pThreadInfo->totalAffectedRows = 0; @@ -5606,20 +5679,33 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { pstr += len; remainderBufLen -= len; - int64_t generated; + int32_t generated; if (superTblInfo) { - generated = generateStbProgressiveData( - superTblInfo, - tableName, tableSeq, pThreadInfo->db_name, pstr, insertRows, - i, start_time, - &(pThreadInfo->samplePos), - &remainderBufLen); + if (superTblInfo->iface == STMT_IFACE) { + generated = prepareStbStmt(superTblInfo, + pThreadInfo->stmt, + tableName, g_args.num_of_RPR, + insertRows, start_time, pstr); + } else { + generated = generateStbProgressiveData( + superTblInfo, + tableName, tableSeq, pThreadInfo->db_name, pstr, + insertRows, i, start_time, + &(pThreadInfo->samplePos), + &remainderBufLen); + } } else { - generated = generateProgressiveDataWithoutStb( - tableName, tableSeq, pThreadInfo, pstr, insertRows, - i, start_time, - &(pThreadInfo->samplePos), - &remainderBufLen); + if (g_args.iface == STMT_IFACE) { + generated = prepareStmtWithoutStb(tableName); + } else { + generated = generateProgressiveDataWithoutStb( + tableName, + /* tableSeq, */ + pThreadInfo, pstr, insertRows, + i, start_time, + /* &(pThreadInfo->samplePos), */ + &remainderBufLen); + } } if (generated > 0) i += generated; @@ -5687,7 +5773,7 @@ static void* syncWrite(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - int interlaceRows; + uint32_t interlaceRows; if (superTblInfo) { if ((superTblInfo->interlaceRows == 0) @@ -5818,7 +5904,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, char* precision,SSuperTable* superTblInfo) { //TAOS* taos; - //if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { + //if (0 == strncasecmp(superTblInfo->iface, "taosc", 5)) { // taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); // if (NULL == taos) { // printf("connect to server fail, reason: %s\n", taos_errstr(NULL)); @@ -5963,8 +6049,9 @@ static void startMultiThreadInsertData(int threads, char* db_name, } if ((superTblInfo) - && (superTblInfo->insertMode == REST_IFACE)) { - if (convertHostToServAddr(g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) { + && (superTblInfo->iface == REST_IFACE)) { + if (convertHostToServAddr( + g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) { exit(-1); } } @@ -5989,8 +6076,8 @@ static void startMultiThreadInsertData(int threads, char* db_name, pThreadInfo->minDelay = UINT64_MAX; if ((NULL == superTblInfo) || - (superTblInfo->insertMode != REST_IFACE)) { - //pThreadInfo->taos = taos; + (superTblInfo->iface != REST_IFACE)) { + //t_info->taos = taos; pThreadInfo->taos = taos_connect( g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); @@ -6003,7 +6090,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, exit(-1); } - if ((superTblInfo) && (superTblInfo->insertMode == STMT_IFACE)) { + if ((superTblInfo) && (superTblInfo->iface == STMT_IFACE)) { pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos); if (NULL == pThreadInfo->stmt) { errorPrint( @@ -7252,7 +7339,7 @@ static void setParaFromArg(){ tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix, g_args.tb_prefix, MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE); - g_Dbs.db[0].superTbls[0].insertMode = g_args.iface; + g_Dbs.db[0].superTbls[0].iface = g_args.iface; tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp, "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;