diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index aad2fe95fa..3b91de32b0 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -295,6 +295,9 @@ typedef struct SSuperTable_S { uint64_t lenOfTagOfOneRow; char* sampleDataBuf; +#if STMT_IFACE_ENABLED == 1 + char* sampleBindArray; +#endif //int sampleRowCount; //int sampleUsePos; @@ -441,6 +444,7 @@ typedef struct SQueryMetaInfo_S { typedef struct SThreadInfo_S { TAOS * taos; TAOS_STMT *stmt; + int64_t *bind_ts; int threadID; char db_name[TSDB_DB_NAME_LEN]; uint32_t time_precision; @@ -454,7 +458,7 @@ typedef struct SThreadInfo_S { int64_t start_time; char* cols; bool use_metric; - SSuperTable* superTblInfo; + SSuperTable* stbInfo; char *buffer; // sql cmd buffer // for async insert @@ -674,7 +678,7 @@ static FILE * g_fpOfInsertResult = NULL; /////////////////////////////////////////////////// -static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); } +static void ERROR_EXIT(const char *msg) { errorPrint("%s", msg); exit(-1); } #ifndef TAOSDEMO_COMMIT_SHA1 #define TAOSDEMO_COMMIT_SHA1 "unknown" @@ -1136,8 +1140,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } if (0 == columnCount) { - perror("data type error!"); - exit(-1); + ERROR_EXIT("data type error!"); } g_args.num_of_CPR = columnCount; @@ -2425,14 +2428,14 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port #endif debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd); free(request_buf); - ERROR_EXIT("ERROR opening socket"); + ERROR_EXIT("opening socket"); } int retConn = connect(sockfd, (struct sockaddr *)pServAddr, sizeof(struct sockaddr)); debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn); if (retConn < 0) { free(request_buf); - ERROR_EXIT("ERROR connecting"); + ERROR_EXIT("connecting"); } memset(base64_buf, 0, INPUT_BUF_LEN); @@ -2465,7 +2468,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port auth, strlen(sqlstr), sqlstr); if (r >= req_buf_len) { free(request_buf); - ERROR_EXIT("ERROR too long request"); + ERROR_EXIT("too long request"); } verbosePrint("%s() LN%d: Request:\n%s\n", __func__, __LINE__, request_buf); @@ -2478,7 +2481,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port bytes = write(sockfd, request_buf + sent, req_str_len - sent); #endif if (bytes < 0) - ERROR_EXIT("ERROR writing message to socket"); + ERROR_EXIT("writing message to socket"); if (bytes == 0) break; sent+=bytes; @@ -2495,7 +2498,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port #endif if (bytes < 0) { free(request_buf); - ERROR_EXIT("ERROR reading response from socket"); + ERROR_EXIT("reading response from socket"); } if (bytes == 0) break; @@ -2504,7 +2507,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port if (received == resp_len) { free(request_buf); - ERROR_EXIT("ERROR storing complete response from socket"); + ERROR_EXIT("storing complete response from socket"); } response_buf[RESP_BUF_LEN - 1] = '\0'; @@ -2667,8 +2670,8 @@ static int calcRowLen(SSuperTable* superTbls) { } else if (strcasecmp(dataType, "TIMESTAMP") == 0) { lenOfOneRow += TIMESTAMP_BUFF_LEN; } else { - printf("get error data type : %s\n", dataType); - exit(-1); + errorPrint("get error data type : %s\n", dataType); + exit(EXIT_FAILURE); } } @@ -2698,8 +2701,8 @@ static int calcRowLen(SSuperTable* superTbls) { } else if (strcasecmp(dataType, "DOUBLE") == 0) { lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + DOUBLE_BUFF_LEN; } else { - printf("get error tag type : %s\n", dataType); - exit(-1); + errorPrint("get error tag type : %s\n", dataType); + exit(EXIT_FAILURE); } } @@ -2737,7 +2740,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, taos_close(taos); errorPrint("%s() LN%d, failed to run command %s\n", __func__, __LINE__, command); - exit(-1); + exit(EXIT_FAILURE); } int64_t childTblCount = (limit < 0)?10000:limit; @@ -2748,7 +2751,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, taos_free_result(res); taos_close(taos); errorPrint("%s() LN%d, failed to allocate memory!\n", __func__, __LINE__); - exit(-1); + exit(EXIT_FAILURE); } } @@ -2759,7 +2762,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, if (0 == strlen((char *)row[0])) { errorPrint("%s() LN%d, No.%"PRId64" table return empty name\n", __func__, __LINE__, count); - exit(-1); + exit(EXIT_FAILURE); } tstrncpy(pTblName, (char *)row[0], len[0]+1); @@ -2775,12 +2778,12 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, (size_t)((childTblCount-count)*TSDB_TABLE_NAME_LEN)); } else { // exit, if allocate more memory failed - errorPrint("%s() LN%d, realloc fail for save child table name of %s.%s\n", - __func__, __LINE__, dbName, sTblName); tmfree(childTblName); taos_free_result(res); taos_close(taos); - exit(-1); + errorPrint("%s() LN%d, realloc fail for save child table name of %s.%s\n", + __func__, __LINE__, dbName, sTblName); + exit(EXIT_FAILURE); } } pTblName = childTblName + count * TSDB_TABLE_NAME_LEN; @@ -2964,10 +2967,10 @@ static int createSuperTable( lenOfOneRow += TIMESTAMP_BUFF_LEN; } else { taos_close(taos); + free(command); errorPrint("%s() LN%d, config error data type : %s\n", __func__, __LINE__, dataType); - free(command); - exit(-1); + exit(EXIT_FAILURE); } } @@ -2976,11 +2979,11 @@ static int createSuperTable( // save for creating child table superTbl->colsOfCreateChildTable = (char*)calloc(len+20, 1); if (NULL == superTbl->colsOfCreateChildTable) { - errorPrint("%s() LN%d, Failed when calloc, size:%d", - __func__, __LINE__, len+1); taos_close(taos); free(command); - exit(-1); + errorPrint("%s() LN%d, Failed when calloc, size:%d", + __func__, __LINE__, len+1); + exit(EXIT_FAILURE); } snprintf(superTbl->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols); @@ -3054,10 +3057,10 @@ static int createSuperTable( lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + DOUBLE_BUFF_LEN; } else { taos_close(taos); + free(command); errorPrint("%s() LN%d, config error tag type : %s\n", __func__, __LINE__, dataType); - free(command); - exit(-1); + exit(EXIT_FAILURE); } } @@ -3089,7 +3092,7 @@ int createDatabasesAndStables(char *command) { errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); return -1; } - + for (int i = 0; i < g_Dbs.dbCount; i++) { if (g_Dbs.db[i].drop) { sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName); @@ -3100,35 +3103,43 @@ int createDatabasesAndStables(char *command) { int dataLen = 0; dataLen += snprintf(command + dataLen, - BUFFER_SIZE - dataLen, "create database if not exists %s", g_Dbs.db[i].dbName); + BUFFER_SIZE - dataLen, "create database if not exists %s", + g_Dbs.db[i].dbName); if (g_Dbs.db[i].dbCfg.blocks > 0) { dataLen += snprintf(command + dataLen, - BUFFER_SIZE - dataLen, " blocks %d", g_Dbs.db[i].dbCfg.blocks); + BUFFER_SIZE - dataLen, " blocks %d", + g_Dbs.db[i].dbCfg.blocks); } if (g_Dbs.db[i].dbCfg.cache > 0) { dataLen += snprintf(command + dataLen, - BUFFER_SIZE - dataLen, " cache %d", g_Dbs.db[i].dbCfg.cache); + BUFFER_SIZE - dataLen, " cache %d", + g_Dbs.db[i].dbCfg.cache); } if (g_Dbs.db[i].dbCfg.days > 0) { dataLen += snprintf(command + dataLen, - BUFFER_SIZE - dataLen, " days %d", g_Dbs.db[i].dbCfg.days); + BUFFER_SIZE - dataLen, " days %d", + g_Dbs.db[i].dbCfg.days); } if (g_Dbs.db[i].dbCfg.keep > 0) { dataLen += snprintf(command + dataLen, - BUFFER_SIZE - dataLen, " keep %d", g_Dbs.db[i].dbCfg.keep); + BUFFER_SIZE - dataLen, " keep %d", + g_Dbs.db[i].dbCfg.keep); } if (g_Dbs.db[i].dbCfg.quorum > 1) { dataLen += snprintf(command + dataLen, - BUFFER_SIZE - dataLen, " quorum %d", g_Dbs.db[i].dbCfg.quorum); + BUFFER_SIZE - dataLen, " quorum %d", + g_Dbs.db[i].dbCfg.quorum); } if (g_Dbs.db[i].dbCfg.replica > 0) { dataLen += snprintf(command + dataLen, - BUFFER_SIZE - dataLen, " replica %d", g_Dbs.db[i].dbCfg.replica); + BUFFER_SIZE - dataLen, " replica %d", + g_Dbs.db[i].dbCfg.replica); } if (g_Dbs.db[i].dbCfg.update > 0) { dataLen += snprintf(command + dataLen, - BUFFER_SIZE - dataLen, " update %d", g_Dbs.db[i].dbCfg.update); + BUFFER_SIZE - dataLen, " update %d", + g_Dbs.db[i].dbCfg.update); } //if (g_Dbs.db[i].dbCfg.maxtablesPerVnode > 0) { // dataLen += snprintf(command + dataLen, @@ -3136,42 +3147,48 @@ int createDatabasesAndStables(char *command) { //} if (g_Dbs.db[i].dbCfg.minRows > 0) { dataLen += snprintf(command + dataLen, - BUFFER_SIZE - dataLen, " minrows %d", g_Dbs.db[i].dbCfg.minRows); + BUFFER_SIZE - dataLen, " minrows %d", + g_Dbs.db[i].dbCfg.minRows); } if (g_Dbs.db[i].dbCfg.maxRows > 0) { dataLen += snprintf(command + dataLen, - BUFFER_SIZE - dataLen, " maxrows %d", g_Dbs.db[i].dbCfg.maxRows); + BUFFER_SIZE - dataLen, " maxrows %d", + g_Dbs.db[i].dbCfg.maxRows); } if (g_Dbs.db[i].dbCfg.comp > 0) { dataLen += snprintf(command + dataLen, - BUFFER_SIZE - dataLen, " comp %d", g_Dbs.db[i].dbCfg.comp); + BUFFER_SIZE - dataLen, " comp %d", + g_Dbs.db[i].dbCfg.comp); } if (g_Dbs.db[i].dbCfg.walLevel > 0) { dataLen += snprintf(command + dataLen, - BUFFER_SIZE - dataLen, " wal %d", g_Dbs.db[i].dbCfg.walLevel); + BUFFER_SIZE - dataLen, " wal %d", + g_Dbs.db[i].dbCfg.walLevel); } if (g_Dbs.db[i].dbCfg.cacheLast > 0) { dataLen += snprintf(command + dataLen, - BUFFER_SIZE - dataLen, " cachelast %d", g_Dbs.db[i].dbCfg.cacheLast); + BUFFER_SIZE - dataLen, " cachelast %d", + g_Dbs.db[i].dbCfg.cacheLast); } if (g_Dbs.db[i].dbCfg.fsync > 0) { dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, " fsync %d", g_Dbs.db[i].dbCfg.fsync); } - if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", strlen("ms"))) + if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) #if NANO_SECOND_ENABLED == 1 || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, - "ns", strlen("ns"))) + "ns", 2)) #endif || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, - "us", strlen("us")))) { + "us", 2))) { dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, " precision \'%s\';", g_Dbs.db[i].dbCfg.precision); } if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) { taos_close(taos); - errorPrint( "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName); + errorPrint( "\ncreate database %s failed!\n\n", + g_Dbs.db[i].dbName); return -1; } printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName); @@ -3218,7 +3235,7 @@ int createDatabasesAndStables(char *command) { static void* createTable(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + SSuperTable* stbInfo = pThreadInfo->stbInfo; setThreadName("createTable"); @@ -3229,7 +3246,7 @@ static void* createTable(void *sarg) pThreadInfo->buffer = calloc(buff_len, 1); if (pThreadInfo->buffer == NULL) { errorPrint("%s() LN%d, Memory allocated failed!\n", __func__, __LINE__); - exit(-1); + exit(EXIT_FAILURE); } int len = 0; @@ -3248,11 +3265,11 @@ static void* createTable(void *sarg) g_args.tb_prefix, i, pThreadInfo->cols); } else { - if (superTblInfo == NULL) { + if (stbInfo == NULL) { + free(pThreadInfo->buffer); errorPrint("%s() LN%d, use metric, but super table info is NULL\n", __func__, __LINE__); - free(pThreadInfo->buffer); - exit(-1); + exit(EXIT_FAILURE); } else { if (0 == len) { batchNum = 0; @@ -3260,29 +3277,35 @@ static void* createTable(void *sarg) len += snprintf(pThreadInfo->buffer + len, buff_len - len, "create table "); } + char* tagsValBuf = NULL; - if (0 == superTblInfo->tagSource) { - tagsValBuf = generateTagValuesForStb(superTblInfo, i); + if (0 == stbInfo->tagSource) { + tagsValBuf = generateTagValuesForStb(stbInfo, i); } else { + if (0 == stbInfo->tagSampleCount) { + free(pThreadInfo->buffer); + ERROR_EXIT("use sample file for tag, but has no content!\n"); + } tagsValBuf = getTagValueFromTagSample( - superTblInfo, - i % superTblInfo->tagSampleCount); + stbInfo, + i % stbInfo->tagSampleCount); } + if (NULL == tagsValBuf) { free(pThreadInfo->buffer); - return NULL; + ERROR_EXIT("use metric, but tag buffer is NULL\n"); } len += snprintf(pThreadInfo->buffer + len, buff_len - len, "if not exists %s.%s%"PRIu64" using %s.%s tags %s ", - pThreadInfo->db_name, superTblInfo->childTblPrefix, + pThreadInfo->db_name, stbInfo->childTblPrefix, i, pThreadInfo->db_name, - superTblInfo->sTblName, tagsValBuf); + stbInfo->sTblName, tagsValBuf); free(tagsValBuf); batchNum++; - if ((batchNum < superTblInfo->batchCreateTableNum) + if ((batchNum < stbInfo->batchCreateTableNum) && ((buff_len - len) - >= (superTblInfo->lenOfTagOfOneRow + 256))) { + >= (stbInfo->lenOfTagOfOneRow + 256))) { continue; } } @@ -3317,14 +3340,13 @@ static void* createTable(void *sarg) static int startMultiThreadCreateChildTable( char* cols, int threads, uint64_t tableFrom, int64_t ntables, - char* db_name, SSuperTable* superTblInfo) { + char* db_name, SSuperTable* stbInfo) { pthread_t *pids = calloc(1, threads * sizeof(pthread_t)); threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { - printf("malloc failed\n"); - exit(-1); + ERROR_EXIT("createChildTable malloc failed\n"); } if (threads < 1) { @@ -3344,7 +3366,7 @@ static int startMultiThreadCreateChildTable( threadInfo *pThreadInfo = infos + i; pThreadInfo->threadID = i; tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN); - pThreadInfo->superTblInfo = superTblInfo; + pThreadInfo->stbInfo = stbInfo; verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name); pThreadInfo->taos = taos_connect( g_Dbs.host, @@ -3451,26 +3473,26 @@ static void createChildTables() { /* Read 10000 lines at most. If more than 10000 lines, continue to read after using */ -static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { +static int readTagFromCsvFileToMem(SSuperTable * stbInfo) { size_t n = 0; ssize_t readLen = 0; char * line = NULL; - FILE *fp = fopen(superTblInfo->tagsFile, "r"); + FILE *fp = fopen(stbInfo->tagsFile, "r"); if (fp == NULL) { printf("Failed to open tags file: %s, reason:%s\n", - superTblInfo->tagsFile, strerror(errno)); + stbInfo->tagsFile, strerror(errno)); return -1; } - if (superTblInfo->tagDataBuf) { - free(superTblInfo->tagDataBuf); - superTblInfo->tagDataBuf = NULL; + if (stbInfo->tagDataBuf) { + free(stbInfo->tagDataBuf); + stbInfo->tagDataBuf = NULL; } int tagCount = 10000; int count = 0; - char* tagDataBuf = calloc(1, superTblInfo->lenOfTagOfOneRow * tagCount); + char* tagDataBuf = calloc(1, stbInfo->lenOfTagOfOneRow * tagCount); if (tagDataBuf == NULL) { printf("Failed to calloc, reason:%s\n", strerror(errno)); fclose(fp); @@ -3486,20 +3508,20 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { continue; } - memcpy(tagDataBuf + count * superTblInfo->lenOfTagOfOneRow, line, readLen); + memcpy(tagDataBuf + count * stbInfo->lenOfTagOfOneRow, line, readLen); count++; if (count >= tagCount - 1) { char *tmp = realloc(tagDataBuf, - (size_t)tagCount*1.5*superTblInfo->lenOfTagOfOneRow); + (size_t)tagCount*1.5*stbInfo->lenOfTagOfOneRow); if (tmp != NULL) { tagDataBuf = tmp; tagCount = (int)(tagCount*1.5); - memset(tagDataBuf + count*superTblInfo->lenOfTagOfOneRow, - 0, (size_t)((tagCount-count)*superTblInfo->lenOfTagOfOneRow)); + memset(tagDataBuf + count*stbInfo->lenOfTagOfOneRow, + 0, (size_t)((tagCount-count)*stbInfo->lenOfTagOfOneRow)); } else { // exit, if allocate more memory failed - printf("realloc fail for save tag val from %s\n", superTblInfo->tagsFile); + printf("realloc fail for save tag val from %s\n", stbInfo->tagsFile); tmfree(tagDataBuf); free(line); fclose(fp); @@ -3508,8 +3530,8 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { } } - superTblInfo->tagDataBuf = tagDataBuf; - superTblInfo->tagSampleCount = count; + stbInfo->tagDataBuf = tagDataBuf; + stbInfo->tagSampleCount = count; free(line); fclose(fp); @@ -3520,28 +3542,28 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { Read 10000 lines at most. If more than 10000 lines, continue to read after using */ static int readSampleFromCsvFileToMem( - SSuperTable* superTblInfo) { + SSuperTable* stbInfo) { size_t n = 0; ssize_t readLen = 0; char * line = NULL; int getRows = 0; - FILE* fp = fopen(superTblInfo->sampleFile, "r"); + FILE* fp = fopen(stbInfo->sampleFile, "r"); if (fp == NULL) { errorPrint( "Failed to open sample file: %s, reason:%s\n", - superTblInfo->sampleFile, strerror(errno)); + stbInfo->sampleFile, strerror(errno)); return -1; } - assert(superTblInfo->sampleDataBuf); - memset(superTblInfo->sampleDataBuf, 0, - MAX_SAMPLES_ONCE_FROM_FILE * superTblInfo->lenOfOneRow); + assert(stbInfo->sampleDataBuf); + memset(stbInfo->sampleDataBuf, 0, + MAX_SAMPLES_ONCE_FROM_FILE * stbInfo->lenOfOneRow); while(1) { readLen = tgetline(&line, &n, fp); if (-1 == readLen) { if(0 != fseek(fp, 0, SEEK_SET)) { errorPrint( "Failed to fseek file: %s, reason:%s\n", - superTblInfo->sampleFile, strerror(errno)); + stbInfo->sampleFile, strerror(errno)); fclose(fp); return -1; } @@ -3556,13 +3578,13 @@ static int readSampleFromCsvFileToMem( continue; } - if (readLen > superTblInfo->lenOfOneRow) { + if (readLen > stbInfo->lenOfOneRow) { printf("sample row len[%d] overflow define schema len[%"PRIu64"], so discard this row\n", - (int32_t)readLen, superTblInfo->lenOfOneRow); + (int32_t)readLen, stbInfo->lenOfOneRow); continue; } - memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow, + memcpy(stbInfo->sampleDataBuf + getRows * stbInfo->lenOfOneRow, line, readLen); getRows++; @@ -5047,6 +5069,23 @@ static void postFreeResource() { free(g_Dbs.db[i].superTbls[j].sampleDataBuf); g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL; } +#if STMT_IFACE_ENABLED == 1 + if (g_Dbs.db[i].superTbls[j].sampleBindArray) { + for (int k = 0; k < MAX_SAMPLES_ONCE_FROM_FILE; k++) { + uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)( + g_Dbs.db[i].superTbls[j].sampleBindArray + + sizeof(uintptr_t *) * k)); + for (int c = 1; c < g_Dbs.db[i].superTbls[j].columnCount + 1; c++) { + TAOS_BIND *bind = (TAOS_BIND *)((char *)tmp + (sizeof(TAOS_BIND) * c)); + if (bind) + tmfree(bind->buffer); + } + tmfree((char *)tmp); + } + } + tmfree((char *)g_Dbs.db[i].superTbls[j].sampleBindArray); +#endif + if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) { free(g_Dbs.db[i].superTbls[j].tagDataBuf); g_Dbs.db[i].superTbls[j].tagDataBuf = NULL; @@ -5067,21 +5106,14 @@ static void postFreeResource() { tmfree(g_randfloat_buff); tmfree(g_rand_current_buff); tmfree(g_rand_phase_buff); - tmfree(g_randdouble_buff); + } static int getRowDataFromSample( char* dataBuf, int64_t maxLen, int64_t timestamp, - SSuperTable* superTblInfo, int64_t* sampleUsePos) + SSuperTable* stbInfo, int64_t* sampleUsePos) { if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) { - /* int ret = readSampleFromCsvFileToMem(superTblInfo); - if (0 != ret) { - tmfree(superTblInfo->sampleDataBuf); - superTblInfo->sampleDataBuf = NULL; - return -1; - } - */ *sampleUsePos = 0; } @@ -5091,8 +5123,8 @@ static int getRowDataFromSample( "(%" PRId64 ", ", timestamp); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%s", - superTblInfo->sampleDataBuf - + superTblInfo->lenOfOneRow * (*sampleUsePos)); + stbInfo->sampleDataBuf + + stbInfo->lenOfOneRow * (*sampleUsePos)); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); (*sampleUsePos)++; @@ -5248,7 +5280,7 @@ static int64_t generateData(char *recBuf, char **data_type, if (s == NULL) { errorPrint("%s() LN%d, memory allocation %d bytes failed\n", __func__, __LINE__, lenOfBinary + 1); - exit(-1); + exit(EXIT_FAILURE); } rand_string(s, lenOfBinary); pstr += sprintf(pstr, ",\"%s\"", s); @@ -5258,7 +5290,7 @@ static int64_t generateData(char *recBuf, char **data_type, if (s == NULL) { errorPrint("%s() LN%d, memory allocation %d bytes failed\n", __func__, __LINE__, lenOfBinary + 1); - exit(-1); + exit(EXIT_FAILURE); } rand_string(s, lenOfBinary); pstr += sprintf(pstr, ",\"%s\"", s); @@ -5266,8 +5298,7 @@ static int64_t generateData(char *recBuf, char **data_type, } if (strlen(recBuf) > MAX_DATA_SIZE) { - perror("column length too long, abort"); - exit(-1); + ERROR_EXIT("column length too long, abort"); } } @@ -5278,27 +5309,27 @@ static int64_t generateData(char *recBuf, char **data_type, return (int32_t)strlen(recBuf); } -static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { +static int prepareSampleDataForSTable(SSuperTable *stbInfo) { char* sampleDataBuf = NULL; sampleDataBuf = calloc( - superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); + stbInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); if (sampleDataBuf == NULL) { errorPrint("%s() LN%d, Failed to calloc %"PRIu64" Bytes, reason:%s\n", __func__, __LINE__, - superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, + stbInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, strerror(errno)); return -1; } - superTblInfo->sampleDataBuf = sampleDataBuf; - int ret = readSampleFromCsvFileToMem(superTblInfo); + stbInfo->sampleDataBuf = sampleDataBuf; + int ret = readSampleFromCsvFileToMem(stbInfo); if (0 != ret) { errorPrint("%s() LN%d, read sample from csv file failed.\n", __func__, __LINE__); tmfree(sampleDataBuf); - superTblInfo->sampleDataBuf = NULL; + stbInfo->sampleDataBuf = NULL; return -1; } @@ -5308,14 +5339,14 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) { int32_t affectedRows; - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + SSuperTable* stbInfo = pThreadInfo->stbInfo; verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer); uint16_t iface; - if (superTblInfo) - iface = superTblInfo->iface; + if (stbInfo) + iface = stbInfo->iface; else { if (g_args.iface == INTERFACE_BUT) iface = TAOSC_IFACE; @@ -5355,7 +5386,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) __func__, __LINE__, taos_stmt_errstr(pThreadInfo->stmt)); fprintf(stderr, "\n\033[31m === Please reduce batch number if WAL size exceeds limit. ===\033[0m\n\n"); - exit(-1); + exit(EXIT_FAILURE); } affectedRows = k; break; @@ -5363,7 +5394,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) default: errorPrint("%s() LN%d: unknown insert mode: %d\n", - __func__, __LINE__, superTblInfo->iface); + __func__, __LINE__, stbInfo->iface); affectedRows = 0; } @@ -5373,24 +5404,24 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) static void getTableName(char *pTblName, threadInfo* pThreadInfo, uint64_t tableSeq) { - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - if (superTblInfo) { - if (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable) { - if (superTblInfo->childTblLimit > 0) { + SSuperTable* stbInfo = pThreadInfo->stbInfo; + if (stbInfo) { + if (AUTO_CREATE_SUBTBL != stbInfo->autoCreateTable) { + if (stbInfo->childTblLimit > 0) { snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", - superTblInfo->childTblName + - (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); + stbInfo->childTblName + + (tableSeq - stbInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); } else { verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n", pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->start_table_from, pThreadInfo->ntables, tableSeq); snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", - superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); + stbInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); } } else { snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", - superTblInfo->childTblPrefix, tableSeq); + stbInfo->childTblPrefix, tableSeq); } } else { snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", @@ -5471,7 +5502,7 @@ static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, } static int32_t generateStbDataTail( - SSuperTable* superTblInfo, + SSuperTable* stbInfo, uint32_t batch, char* buffer, int64_t remainderBufLen, int64_t insertRows, uint64_t recordFrom, int64_t startTime, @@ -5481,7 +5512,7 @@ static int32_t generateStbDataTail( char *pstr = buffer; bool tsRand; - if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { + if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) { tsRand = true; } else { tsRand = false; @@ -5496,26 +5527,26 @@ static int32_t generateStbDataTail( int64_t lenOfRow = 0; if (tsRand) { - if (superTblInfo->disorderRatio > 0) { - lenOfRow = generateStbRowData(superTblInfo, data, + if (stbInfo->disorderRatio > 0) { + lenOfRow = generateStbRowData(stbInfo, data, remainderBufLen, startTime + getTSRandTail( - superTblInfo->timeStampStep, k, - superTblInfo->disorderRatio, - superTblInfo->disorderRange) + stbInfo->timeStampStep, k, + stbInfo->disorderRatio, + stbInfo->disorderRange) ); } else { - lenOfRow = generateStbRowData(superTblInfo, data, + lenOfRow = generateStbRowData(stbInfo, data, remainderBufLen, - startTime + superTblInfo->timeStampStep * k + startTime + stbInfo->timeStampStep * k ); } } else { lenOfRow = getRowDataFromSample( data, (remainderBufLen < MAX_DATA_SIZE)?remainderBufLen:MAX_DATA_SIZE, - startTime + superTblInfo->timeStampStep * k, - superTblInfo, + startTime + stbInfo->timeStampStep * k, + stbInfo, pSamplePos); } @@ -5571,7 +5602,7 @@ static int generateSQLHeadWithoutStb(char *tableName, } static int generateStbSQLHead( - SSuperTable* superTblInfo, + SSuperTable* stbInfo, char *tableName, int64_t tableSeq, char *dbName, char *buffer, int remainderBufLen) @@ -5580,14 +5611,14 @@ static int generateStbSQLHead( char headBuf[HEAD_BUFF_LEN]; - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { + if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) { char* tagsValBuf = NULL; - if (0 == superTblInfo->tagSource) { - tagsValBuf = generateTagValuesForStb(superTblInfo, tableSeq); + if (0 == stbInfo->tagSource) { + tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq); } else { tagsValBuf = getTagValueFromTagSample( - superTblInfo, - tableSeq % superTblInfo->tagSampleCount); + stbInfo, + tableSeq % stbInfo->tagSampleCount); } if (NULL == tagsValBuf) { errorPrint("%s() LN%d, tag buf failed to allocate memory\n", @@ -5602,10 +5633,10 @@ static int generateStbSQLHead( dbName, tableName, dbName, - superTblInfo->sTblName, + stbInfo->sTblName, tagsValBuf); tmfree(tagsValBuf); - } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { + } else if (TBL_ALREADY_EXISTS == stbInfo->childTblExists) { len = snprintf( headBuf, HEAD_BUFF_LEN, @@ -5630,12 +5661,12 @@ static int generateStbSQLHead( } static int32_t generateStbInterlaceData( - SSuperTable *superTblInfo, + threadInfo *pThreadInfo, char *tableName, uint32_t batchPerTbl, uint64_t i, uint32_t batchPerTblTimes, uint64_t tableSeq, - threadInfo *pThreadInfo, char *buffer, + char *buffer, int64_t insertRows, int64_t startTime, uint64_t *pRemainderBufLen) @@ -5643,8 +5674,9 @@ static int32_t generateStbInterlaceData( assert(buffer); char *pstr = buffer; + SSuperTable *stbInfo = pThreadInfo->stbInfo; int headLen = generateStbSQLHead( - superTblInfo, + stbInfo, tableName, tableSeq, pThreadInfo->db_name, pstr, *pRemainderBufLen); @@ -5664,12 +5696,12 @@ static int32_t generateStbInterlaceData( pThreadInfo->threadID, __func__, __LINE__, i, batchPerTblTimes, batchPerTbl); - if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { + if (0 == strncasecmp(stbInfo->startTimestamp, "now", 3)) { startTime = taosGetTimestamp(pThreadInfo->time_precision); } int32_t k = generateStbDataTail( - superTblInfo, + stbInfo, batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, startTime, &(pThreadInfo->samplePos), &dataLen); @@ -5732,8 +5764,206 @@ static int64_t generateInterlaceDataWithoutStb( } #if STMT_IFACE_ENABLED == 1 -static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, - char *dataType, int32_t dataLen, char **ptr, char *value) +static int32_t prepareStmtBindArrayByType( + TAOS_BIND *bind, + char *dataType, int32_t dataLen, + int32_t timePrec, + char *value) +{ + if (0 == strncasecmp(dataType, + "BINARY", strlen("BINARY"))) { + if (dataLen > TSDB_MAX_BINARY_LEN) { + errorPrint( "binary length overflow, max size:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); + return -1; + } + char *bind_binary; + + bind->buffer_type = TSDB_DATA_TYPE_BINARY; + if (value) { + bind_binary = calloc(1, strlen(value) + 1); + strncpy(bind_binary, value, strlen(value)); + bind->buffer_length = strlen(bind_binary); + } else { + bind_binary = calloc(1, dataLen + 1); + rand_string(bind_binary, dataLen); + bind->buffer_length = dataLen; + } + + bind->length = &bind->buffer_length; + bind->buffer = bind_binary; + bind->is_null = NULL; + } else if (0 == strncasecmp(dataType, + "NCHAR", strlen("NCHAR"))) { + if (dataLen > TSDB_MAX_BINARY_LEN) { + errorPrint( "nchar length overflow, max size:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); + return -1; + } + char *bind_nchar; + + bind->buffer_type = TSDB_DATA_TYPE_NCHAR; + if (value) { + bind_nchar = calloc(1, strlen(value) + 1); + strncpy(bind_nchar, value, strlen(value)); + } else { + bind_nchar = calloc(1, dataLen + 1); + rand_string(bind_nchar, dataLen); + } + + bind->buffer_length = strlen(bind_nchar); + bind->buffer = bind_nchar; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + } else if (0 == strncasecmp(dataType, + "INT", strlen("INT"))) { + int32_t *bind_int = malloc(sizeof(int32_t)); + + if (value) { + *bind_int = atoi(value); + } else { + *bind_int = rand_int(); + } + bind->buffer_type = TSDB_DATA_TYPE_INT; + bind->buffer_length = sizeof(int32_t); + bind->buffer = bind_int; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + } else if (0 == strncasecmp(dataType, + "BIGINT", strlen("BIGINT"))) { + int64_t *bind_bigint = malloc(sizeof(int64_t)); + + if (value) { + *bind_bigint = atoll(value); + } else { + *bind_bigint = rand_bigint(); + } + bind->buffer_type = TSDB_DATA_TYPE_BIGINT; + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_bigint; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + } else if (0 == strncasecmp(dataType, + "FLOAT", strlen("FLOAT"))) { + float *bind_float = malloc(sizeof(float)); + + if (value) { + *bind_float = (float)atof(value); + } else { + *bind_float = rand_float(); + } + bind->buffer_type = TSDB_DATA_TYPE_FLOAT; + bind->buffer_length = sizeof(float); + bind->buffer = bind_float; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + } else if (0 == strncasecmp(dataType, + "DOUBLE", strlen("DOUBLE"))) { + double *bind_double = malloc(sizeof(double)); + + if (value) { + *bind_double = atof(value); + } else { + *bind_double = rand_double(); + } + bind->buffer_type = TSDB_DATA_TYPE_DOUBLE; + bind->buffer_length = sizeof(double); + bind->buffer = bind_double; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + } else if (0 == strncasecmp(dataType, + "SMALLINT", strlen("SMALLINT"))) { + int16_t *bind_smallint = malloc(sizeof(int16_t)); + + if (value) { + *bind_smallint = (int16_t)atoi(value); + } else { + *bind_smallint = rand_smallint(); + } + bind->buffer_type = TSDB_DATA_TYPE_SMALLINT; + bind->buffer_length = sizeof(int16_t); + bind->buffer = bind_smallint; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + } else if (0 == strncasecmp(dataType, + "TINYINT", strlen("TINYINT"))) { + int8_t *bind_tinyint = malloc(sizeof(int8_t)); + + if (value) { + *bind_tinyint = (int8_t)atoi(value); + } else { + *bind_tinyint = rand_tinyint(); + } + bind->buffer_type = TSDB_DATA_TYPE_TINYINT; + bind->buffer_length = sizeof(int8_t); + bind->buffer = bind_tinyint; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + } else if (0 == strncasecmp(dataType, + "BOOL", strlen("BOOL"))) { + int8_t *bind_bool = malloc(sizeof(int8_t)); + + if (value) { + if (strncasecmp(value, "true", 4)) { + *bind_bool = true; + } else { + *bind_bool = false; + } + } else { + *bind_bool = rand_bool(); + } + bind->buffer_type = TSDB_DATA_TYPE_BOOL; + bind->buffer_length = sizeof(int8_t); + bind->buffer = bind_bool; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + } else if (0 == strncasecmp(dataType, + "TIMESTAMP", strlen("TIMESTAMP"))) { + int64_t *bind_ts2 = malloc(sizeof(int64_t)); + + if (value) { + if (strchr(value, ':') && strchr(value, '-')) { + int i = 0; + while(value[i] != '\0') { + if (value[i] == '\"' || value[i] == '\'') { + value[i] = ' '; + } + i++; + } + int64_t tmpEpoch; + if (TSDB_CODE_SUCCESS != taosParseTime( + value, &tmpEpoch, strlen(value), + timePrec, 0)) { + errorPrint("Input %s, time format error!\n", value); + return -1; + } + *bind_ts2 = tmpEpoch; + } else { + *bind_ts2 = atoll(value); + } + } else { + *bind_ts2 = rand_bigint(); + } + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_ts2; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + } else { + errorPrint( "No support data type: %s\n", dataType); + return -1; + } + + return 0; +} + +static int32_t prepareStmtBindArrayByTypeForRand( + TAOS_BIND *bind, + char *dataType, int32_t dataLen, + int32_t timePrec, + char **ptr, + char *value) { if (0 == strncasecmp(dataType, "BINARY", strlen("BINARY"))) { @@ -5814,7 +6044,7 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, *ptr += bind->buffer_length; } else if (0 == strncasecmp(dataType, "FLOAT", strlen("FLOAT"))) { - float *bind_float = (float *) *ptr; + float *bind_float = (float *)*ptr; if (value) { *bind_float = (float)atof(value); @@ -5830,7 +6060,7 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, *ptr += bind->buffer_length; } else if (0 == strncasecmp(dataType, "DOUBLE", strlen("DOUBLE"))) { - double *bind_double = (double *)*ptr; + double *bind_double = (double *)*ptr; if (value) { *bind_double = atof(value); @@ -5862,7 +6092,7 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, *ptr += bind->buffer_length; } else if (0 == strncasecmp(dataType, "TINYINT", strlen("TINYINT"))) { - int8_t *bind_tinyint = (int8_t *)*ptr; + int8_t *bind_tinyint = (int8_t *)*ptr; if (value) { *bind_tinyint = (int8_t)atoi(value); @@ -5874,12 +6104,21 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, bind->buffer = bind_tinyint; bind->length = &bind->buffer_length; bind->is_null = NULL; + *ptr += bind->buffer_length; } else if (0 == strncasecmp(dataType, "BOOL", strlen("BOOL"))) { - int8_t *bind_bool = (int8_t *)*ptr; + int8_t *bind_bool = (int8_t *)*ptr; - *bind_bool = rand_bool(); + if (value) { + if (strncasecmp(value, "true", 4)) { + *bind_bool = true; + } else { + *bind_bool = false; + } + } else { + *bind_bool = rand_bool(); + } bind->buffer_type = TSDB_DATA_TYPE_BOOL; bind->buffer_length = sizeof(int8_t); bind->buffer = bind_bool; @@ -5889,10 +6128,28 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, *ptr += bind->buffer_length; } else if (0 == strncasecmp(dataType, "TIMESTAMP", strlen("TIMESTAMP"))) { - int64_t *bind_ts2 = (int64_t *) *ptr; + int64_t *bind_ts2 = (int64_t *)*ptr; if (value) { - *bind_ts2 = atoll(value); + if (strchr(value, ':') && strchr(value, '-')) { + int i = 0; + while(value[i] != '\0') { + if (value[i] == '\"' || value[i] == '\'') { + value[i] = ' '; + } + i++; + } + int64_t tmpEpoch; + if (TSDB_CODE_SUCCESS != taosParseTime( + value, &tmpEpoch, strlen(value), + timePrec, 0)) { + errorPrint("Input %s, time format error!\n", value); + return -1; + } + *bind_ts2 = tmpEpoch; + } else { + *bind_ts2 = atoll(value); + } } else { *bind_ts2 = rand_bigint(); } @@ -5912,13 +6169,14 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, } static int32_t prepareStmtWithoutStb( - TAOS_STMT *stmt, + threadInfo *pThreadInfo, char *tableName, uint32_t batch, int64_t insertRows, int64_t recordFrom, int64_t startTime) { + TAOS_STMT *stmt = pThreadInfo->stmt; int ret = taos_stmt_set_tbname(stmt, tableName); if (ret != 0) { errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n", @@ -5938,15 +6196,11 @@ static int32_t prepareStmtWithoutStb( int32_t k = 0; for (k = 0; k < batch;) { /* columnCount + 1 (ts) */ - char data[MAX_DATA_SIZE]; - memset(data, 0, MAX_DATA_SIZE); - char *ptr = data; TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0); - int64_t *bind_ts; + int64_t *bind_ts = pThreadInfo->bind_ts; - bind_ts = (int64_t *)ptr; bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; if (g_args.disorderRatio) { @@ -5962,8 +6216,6 @@ static int32_t prepareStmtWithoutStb( bind->length = &bind->buffer_length; bind->is_null = NULL; - ptr += bind->buffer_length; - for (int i = 0; i < g_args.num_of_CPR; i ++) { bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1))); @@ -5971,7 +6223,8 @@ static int32_t prepareStmtWithoutStb( bind, data_type[i], g_args.len_of_binary, - &ptr, NULL)) { + pThreadInfo->time_precision, + NULL)) { return -1; } } @@ -5998,10 +6251,42 @@ static int32_t prepareStmtWithoutStb( return k; } -static int32_t prepareStbStmtBind( - char *bindArray, SSuperTable *stbInfo, bool sourceRand, +static int32_t prepareStbStmtBindTag( + char *bindArray, SSuperTable *stbInfo, + char *tagsVal, + int32_t timePrec) +{ + char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary); + if (bindBuffer == NULL) { + errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n", + __func__, __LINE__, g_args.len_of_binary); + return -1; + } + + TAOS_BIND *tag; + + for (int t = 0; t < stbInfo->tagCount; t ++) { + tag = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * t)); + if ( -1 == prepareStmtBindArrayByType( + tag, + stbInfo->tags[t].dataType, + stbInfo->tags[t].dataLen, + timePrec, + NULL)) { + free(bindBuffer); + return -1; + } + } + + free(bindBuffer); + return 0; +} + +static int32_t prepareStbStmtBindRand( + int64_t *ts, + char *bindArray, SSuperTable *stbInfo, int64_t startTime, int32_t recSeq, - bool isColumn) + int32_t timePrec) { char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary); if (bindBuffer == NULL) { @@ -6016,121 +6301,92 @@ static int32_t prepareStbStmtBind( TAOS_BIND *bind; - if (isColumn) { - int cursor = 0; + for (int i = 0; i < stbInfo->columnCount + 1; i ++) { + bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i)); - for (int i = 0; i < stbInfo->columnCount + 1; i ++) { - bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i)); + if (i == 0) { + int64_t *bind_ts = ts; - if (i == 0) { - int64_t *bind_ts; - - bind_ts = (int64_t *)ptr; - bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; - if (stbInfo->disorderRatio) { - *bind_ts = startTime + getTSRandTail( - stbInfo->timeStampStep, recSeq, - stbInfo->disorderRatio, - stbInfo->disorderRange); - } else { - *bind_ts = startTime + stbInfo->timeStampStep * recSeq; - } - bind->buffer_length = sizeof(int64_t); - bind->buffer = bind_ts; - bind->length = &bind->buffer_length; - bind->is_null = NULL; - - ptr += bind->buffer_length; + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + if (stbInfo->disorderRatio) { + *bind_ts = startTime + getTSRandTail( + stbInfo->timeStampStep, recSeq, + stbInfo->disorderRatio, + stbInfo->disorderRange); } else { - - if (sourceRand) { - if ( -1 == prepareStmtBindArrayByType( - bind, - stbInfo->columns[i-1].dataType, - stbInfo->columns[i-1].dataLen, - &ptr, - NULL)) { - free(bindBuffer); - return -1; - } - } else { - char *restStr = stbInfo->sampleDataBuf + cursor; - int lengthOfRest = strlen(restStr); - - int index = 0; - for (index = 0; index < lengthOfRest; index ++) { - if (restStr[index] == ',') { - break; - } - } - - memset(bindBuffer, 0, g_args.len_of_binary); - strncpy(bindBuffer, restStr, index); - cursor += index + 1; // skip ',' too - - if ( -1 == prepareStmtBindArrayByType( - bind, - stbInfo->columns[i-1].dataType, - stbInfo->columns[i-1].dataLen, - &ptr, - bindBuffer)) { - free(bindBuffer); - return -1; - } - } + *bind_ts = startTime + stbInfo->timeStampStep * recSeq; } - } - } else { - TAOS_BIND *tag; + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_ts; + bind->length = &bind->buffer_length; + bind->is_null = NULL; - for (int t = 0; t < stbInfo->tagCount; t ++) { - tag = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * t)); - if ( -1 == prepareStmtBindArrayByType( - tag, - stbInfo->tags[t].dataType, - stbInfo->tags[t].dataLen, - &ptr, - NULL)) { - free(bindBuffer); - return -1; - } + ptr += bind->buffer_length; + } else if ( -1 == prepareStmtBindArrayByTypeForRand( + bind, + stbInfo->columns[i-1].dataType, + stbInfo->columns[i-1].dataLen, + timePrec, + &ptr, + NULL)) { + tmfree(bindBuffer); + return -1; } - } - free(bindBuffer); + tmfree(bindBuffer); return 0; } -static int32_t prepareStbStmt( - SSuperTable *stbInfo, - TAOS_STMT *stmt, +static int32_t prepareStbStmtBindWithSample( + int64_t *ts, + char *bindArray, SSuperTable *stbInfo, + int64_t startTime, int32_t recSeq, + int32_t timePrec, + int64_t samplePos) +{ + TAOS_BIND *bind; + + bind = (TAOS_BIND *)bindArray; + + int64_t *bind_ts = ts; + + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + if (stbInfo->disorderRatio) { + *bind_ts = startTime + getTSRandTail( + stbInfo->timeStampStep, recSeq, + stbInfo->disorderRatio, + stbInfo->disorderRange); + } else { + *bind_ts = startTime + stbInfo->timeStampStep * recSeq; + } + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_ts; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + return 0; +} + +static int32_t prepareStbStmtRand( + threadInfo *pThreadInfo, char *tableName, int64_t tableSeq, uint32_t batch, uint64_t insertRows, uint64_t recordFrom, - int64_t startTime, - int64_t *pSamplePos) + int64_t startTime) { int ret; - - bool sourceRand; - if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) { - sourceRand = true; - } else { - sourceRand = false; // from sample data file - } + SSuperTable *stbInfo = pThreadInfo->stbInfo; + TAOS_STMT *stmt = pThreadInfo->stmt; if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) { char* tagsValBuf = NULL; - bool tagRand; if (0 == stbInfo->tagSource) { - tagRand = true; tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq); } else { - tagRand = false; tagsValBuf = getTagValueFromTagSample( stbInfo, tableSeq % stbInfo->tagSampleCount); @@ -6150,8 +6406,9 @@ static int32_t prepareStbStmt( return -1; } - if (-1 == prepareStbStmtBind( - tagsArray, stbInfo, tagRand, -1, -1, false /* is tag */)) { + if (-1 == prepareStbStmtBindTag( + tagsArray, stbInfo, tagsValBuf, pThreadInfo->time_precision + /* is tag */)) { tmfree(tagsValBuf); tmfree(tagsArray); return -1; @@ -6186,8 +6443,12 @@ static int32_t prepareStbStmt( uint32_t k; for (k = 0; k < batch;) { /* columnCount + 1 (ts) */ - if (-1 == prepareStbStmtBind(bindArray, stbInfo, sourceRand, - startTime, k, true /* is column */)) { + if (-1 == prepareStbStmtBindRand( + pThreadInfo->bind_ts, + bindArray, stbInfo, + startTime, k, + pThreadInfo->time_precision + /* is column */)) { free(bindArray); return -1; } @@ -6210,10 +6471,6 @@ static int32_t prepareStbStmt( k++; recordFrom ++; - if (!sourceRand) { - (*pSamplePos) ++; - } - if (recordFrom >= insertRows) { break; } @@ -6223,9 +6480,8 @@ static int32_t prepareStbStmt( return k; } -static int32_t prepareStbStmtInterlace( - SSuperTable *stbInfo, - TAOS_STMT *stmt, +static int32_t prepareStbStmtWithSample( + threadInfo *pThreadInfo, char *tableName, int64_t tableSeq, uint32_t batch, @@ -6234,41 +6490,109 @@ static int32_t prepareStbStmtInterlace( int64_t startTime, int64_t *pSamplePos) { - return prepareStbStmt( - stbInfo, - stmt, - tableName, - tableSeq, - batch, - insertRows, 0, startTime, - pSamplePos); -} + int ret; + SSuperTable *stbInfo = pThreadInfo->stbInfo; + TAOS_STMT *stmt = pThreadInfo->stmt; -static int32_t prepareStbStmtProgressive( - SSuperTable *stbInfo, - TAOS_STMT *stmt, - char *tableName, - int64_t tableSeq, - uint32_t batch, - uint64_t insertRows, - uint64_t recordFrom, - int64_t startTime, - int64_t *pSamplePos) -{ - return prepareStbStmt( - stbInfo, - stmt, - tableName, - tableSeq, - g_args.num_of_RPR, - insertRows, recordFrom, startTime, - pSamplePos); -} + if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) { + char* tagsValBuf = NULL; + if (0 == stbInfo->tagSource) { + tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq); + } else { + tagsValBuf = getTagValueFromTagSample( + stbInfo, + tableSeq % stbInfo->tagSampleCount); + } + + if (NULL == tagsValBuf) { + errorPrint("%s() LN%d, tag buf failed to allocate memory\n", + __func__, __LINE__); + return -1; + } + + char *tagsArray = calloc(1, sizeof(TAOS_BIND) * stbInfo->tagCount); + if (NULL == tagsArray) { + tmfree(tagsValBuf); + errorPrint("%s() LN%d, tag buf failed to allocate memory\n", + __func__, __LINE__); + return -1; + } + + if (-1 == prepareStbStmtBindTag( + tagsArray, stbInfo, tagsValBuf, pThreadInfo->time_precision + /* is tag */)) { + tmfree(tagsValBuf); + tmfree(tagsArray); + return -1; + } + + ret = taos_stmt_set_tbname_tags(stmt, tableName, (TAOS_BIND *)tagsArray); + + tmfree(tagsValBuf); + tmfree(tagsArray); + + if (0 != ret) { + errorPrint("%s() LN%d, stmt_set_tbname_tags() failed! reason: %s\n", + __func__, __LINE__, taos_stmt_errstr(stmt)); + return -1; + } + } else { + ret = taos_stmt_set_tbname(stmt, tableName); + if (0 != ret) { + errorPrint("%s() LN%d, stmt_set_tbname() failed! reason: %s\n", + __func__, __LINE__, taos_stmt_errstr(stmt)); + return -1; + } + } + + uint32_t k; + for (k = 0; k < batch;) { + char *bindArray = (char *)(*((uintptr_t *) + (stbInfo->sampleBindArray + (sizeof(char *)) * (*pSamplePos)))); + /* columnCount + 1 (ts) */ + if (-1 == prepareStbStmtBindWithSample( + pThreadInfo->bind_ts, + bindArray, stbInfo, + startTime, k, + pThreadInfo->time_precision, + *pSamplePos + /* is column */)) { + return -1; + } + ret = taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray); + if (0 != ret) { + errorPrint("%s() LN%d, stmt_bind_param() failed! reason: %s\n", + __func__, __LINE__, taos_stmt_errstr(stmt)); + return -1; + } + // if msg > 3MB, break + ret = taos_stmt_add_batch(stmt); + if (0 != ret) { + errorPrint("%s() LN%d, stmt_add_batch() failed! reason: %s\n", + __func__, __LINE__, taos_stmt_errstr(stmt)); + return -1; + } + + k++; + recordFrom ++; + + (*pSamplePos) ++; + if ((*pSamplePos) == MAX_SAMPLES_ONCE_FROM_FILE) { + *pSamplePos = 0; + } + + if (recordFrom >= insertRows) { + break; + } + } + + return k; +} #endif static int32_t generateStbProgressiveData( - SSuperTable *superTblInfo, + SSuperTable *stbInfo, char *tableName, int64_t tableSeq, char *dbName, char *buffer, @@ -6282,7 +6606,7 @@ static int32_t generateStbProgressiveData( memset(pstr, 0, *pRemainderBufLen); int64_t headLen = generateStbSQLHead( - superTblInfo, + stbInfo, tableName, tableSeq, dbName, buffer, *pRemainderBufLen); @@ -6294,7 +6618,7 @@ static int32_t generateStbProgressiveData( int64_t dataLen; - return generateStbDataTail(superTblInfo, + return generateStbDataTail(stbInfo, g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, recordFrom, startTime, @@ -6354,26 +6678,34 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { int64_t nTimeStampStep; uint64_t insert_interval; - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + bool sourceRand; - if (superTblInfo) { - insertRows = superTblInfo->insertRows; + SSuperTable* stbInfo = pThreadInfo->stbInfo; - if ((superTblInfo->interlaceRows == 0) + if (stbInfo) { + insertRows = stbInfo->insertRows; + + if ((stbInfo->interlaceRows == 0) && (g_args.interlace_rows > 0)) { interlaceRows = g_args.interlace_rows; } else { - interlaceRows = superTblInfo->interlaceRows; + interlaceRows = stbInfo->interlaceRows; + } + maxSqlLen = stbInfo->maxSqlLen; + nTimeStampStep = stbInfo->timeStampStep; + insert_interval = stbInfo->insertInterval; + if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) { + sourceRand = true; + } else { + sourceRand = false; // from sample data file } - maxSqlLen = superTblInfo->maxSqlLen; - nTimeStampStep = superTblInfo->timeStampStep; - insert_interval = superTblInfo->insertInterval; } else { insertRows = g_args.num_of_DPT; interlaceRows = g_args.interlace_rows; maxSqlLen = g_args.max_sql_len; nTimeStampStep = g_args.timestamp_step; insert_interval = g_args.insert_interval; + sourceRand = true; } debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", @@ -6456,28 +6788,38 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { uint64_t oldRemainderLen = remainderBufLen; int32_t generated; - if (superTblInfo) { - if (superTblInfo->iface == STMT_IFACE) { + if (stbInfo) { + if (stbInfo->iface == STMT_IFACE) { #if STMT_IFACE_ENABLED == 1 - generated = prepareStbStmtInterlace( - superTblInfo, - pThreadInfo->stmt, - tableName, - tableSeq, - batchPerTbl, - insertRows, i, - startTime, - &(pThreadInfo->samplePos)); + if (sourceRand) { + generated = prepareStbStmtRand( + pThreadInfo, + tableName, + tableSeq, + batchPerTbl, + insertRows, 0, + startTime + ); + } else { + generated = prepareStbStmtWithSample( + pThreadInfo, + tableName, + tableSeq, + batchPerTbl, + insertRows, 0, + startTime, + &(pThreadInfo->samplePos)); + } #else generated = -1; #endif } else { generated = generateStbInterlaceData( - superTblInfo, + pThreadInfo, tableName, batchPerTbl, i, batchPerTblTimes, tableSeq, - pThreadInfo, pstr, + pstr, insertRows, startTime, &remainderBufLen); @@ -6490,7 +6832,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { tableName, batchPerTbl, startTime); #if STMT_IFACE_ENABLED == 1 generated = prepareStmtWithoutStb( - pThreadInfo->stmt, tableName, + pThreadInfo, + tableName, batchPerTbl, insertRows, i, startTime); @@ -6639,12 +6982,12 @@ free_of_interlace: static void* syncWriteProgressive(threadInfo *pThreadInfo) { debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; + SSuperTable* stbInfo = pThreadInfo->stbInfo; + uint64_t maxSqlLen = stbInfo?stbInfo->maxSqlLen:g_args.max_sql_len; int64_t timeStampStep = - superTblInfo?superTblInfo->timeStampStep:g_args.timestamp_step; + stbInfo?stbInfo->timeStampStep:g_args.timestamp_step; int64_t insertRows = - (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; + (stbInfo)?stbInfo->insertRows:g_args.num_of_DPT; verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); @@ -6663,6 +7006,17 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { pThreadInfo->totalInsertRows = 0; pThreadInfo->totalAffectedRows = 0; + bool sourceRand; + if (stbInfo) { + if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) { + sourceRand = true; + } else { + sourceRand = false; // from sample data file + } + } else { + sourceRand = true; + } + pThreadInfo->samplePos = 0; int percentComplete = 0; @@ -6696,24 +7050,35 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { remainderBufLen -= len; int32_t generated; - if (superTblInfo) { - if (superTblInfo->iface == STMT_IFACE) { + if (stbInfo) { + if (stbInfo->iface == STMT_IFACE) { #if STMT_IFACE_ENABLED == 1 - generated = prepareStbStmtProgressive( - superTblInfo, - pThreadInfo->stmt, - tableName, - tableSeq, - g_args.num_of_RPR, - insertRows, i, start_time, - &(pThreadInfo->samplePos)); + if (sourceRand) { + generated = prepareStbStmtRand( + pThreadInfo, + tableName, + tableSeq, + g_args.num_of_RPR, + insertRows, + i, start_time + ); + } else { + generated = prepareStbStmtWithSample( + pThreadInfo, + tableName, + tableSeq, + g_args.num_of_RPR, + insertRows, i, start_time, + &(pThreadInfo->samplePos)); + } #else generated = -1; #endif } else { generated = generateStbProgressiveData( - superTblInfo, - tableName, tableSeq, pThreadInfo->db_name, pstr, + stbInfo, + tableName, tableSeq, + pThreadInfo->db_name, pstr, insertRows, i, start_time, &(pThreadInfo->samplePos), &remainderBufLen); @@ -6722,7 +7087,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { if (g_args.iface == STMT_IFACE) { #if STMT_IFACE_ENABLED == 1 generated = prepareStmtWithoutStb( - pThreadInfo->stmt, + pThreadInfo, tableName, g_args.num_of_RPR, insertRows, i, @@ -6792,9 +7157,9 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { } // num_of_DPT if ((g_args.verbose_print) && - (tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) + (tableSeq == pThreadInfo->ntables - 1) && (stbInfo) && (0 == strncasecmp( - superTblInfo->dataSource, + stbInfo->dataSource, "sample", strlen("sample")))) { verbosePrint("%s() LN%d samplePos=%"PRId64"\n", __func__, __LINE__, pThreadInfo->samplePos); @@ -6812,18 +7177,18 @@ free_of_progressive: static void* syncWrite(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + SSuperTable* stbInfo = pThreadInfo->stbInfo; setThreadName("syncWrite"); uint32_t interlaceRows; - if (superTblInfo) { - if ((superTblInfo->interlaceRows == 0) + if (stbInfo) { + if ((stbInfo->interlaceRows == 0) && (g_args.interlace_rows > 0)) { interlaceRows = g_args.interlace_rows; } else { - interlaceRows = superTblInfo->interlaceRows; + interlaceRows = stbInfo->interlaceRows; } } else { interlaceRows = g_args.interlace_rows; @@ -6840,10 +7205,10 @@ static void* syncWrite(void *sarg) { static void callBack(void *param, TAOS_RES *res, int code) { threadInfo* pThreadInfo = (threadInfo*)param; - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + SSuperTable* stbInfo = pThreadInfo->stbInfo; int insert_interval = - superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; + stbInfo?stbInfo->insertInterval:g_args.insert_interval; if (insert_interval) { pThreadInfo->et = taosGetTimestampMs(); if ((pThreadInfo->et - pThreadInfo->st) < insert_interval) { @@ -6851,13 +7216,13 @@ static void callBack(void *param, TAOS_RES *res, int code) { } } - char *buffer = calloc(1, pThreadInfo->superTblInfo->maxSqlLen); + char *buffer = calloc(1, pThreadInfo->stbInfo->maxSqlLen); char data[MAX_DATA_SIZE]; char *pstr = buffer; pstr += sprintf(pstr, "insert into %s.%s%"PRId64" values", pThreadInfo->db_name, pThreadInfo->tb_prefix, pThreadInfo->start_table_from); - // if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) { + // if (pThreadInfo->counter >= pThreadInfo->stbInfo->insertRows) { if (pThreadInfo->counter >= g_args.num_of_RPR) { pThreadInfo->start_table_from++; pThreadInfo->counter = 0; @@ -6871,15 +7236,15 @@ static void callBack(void *param, TAOS_RES *res, int code) { for (int i = 0; i < g_args.num_of_RPR; i++) { int rand_num = taosRandom() % 100; - if (0 != pThreadInfo->superTblInfo->disorderRatio - && rand_num < pThreadInfo->superTblInfo->disorderRatio) { + if (0 != pThreadInfo->stbInfo->disorderRatio + && rand_num < pThreadInfo->stbInfo->disorderRatio) { int64_t d = pThreadInfo->lastTs - - (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1); - generateStbRowData(pThreadInfo->superTblInfo, data, + - (taosRandom() % pThreadInfo->stbInfo->disorderRange + 1); + generateStbRowData(pThreadInfo->stbInfo, data, MAX_DATA_SIZE, d); } else { - generateStbRowData(pThreadInfo->superTblInfo, + generateStbRowData(pThreadInfo->stbInfo, data, MAX_DATA_SIZE, pThreadInfo->lastTs += 1000); @@ -6887,7 +7252,7 @@ static void callBack(void *param, TAOS_RES *res, int code) { pstr += sprintf(pstr, "%s", data); pThreadInfo->counter++; - if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) { + if (pThreadInfo->counter >= pThreadInfo->stbInfo->insertRows) { break; } } @@ -6903,7 +7268,7 @@ static void callBack(void *param, TAOS_RES *res, int code) { static void *asyncWrite(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + SSuperTable* stbInfo = pThreadInfo->stbInfo; setThreadName("asyncWrite"); @@ -6912,7 +7277,7 @@ static void *asyncWrite(void *sarg) { pThreadInfo->lastTs = pThreadInfo->start_time; int insert_interval = - superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; + stbInfo?stbInfo->insertInterval:g_args.insert_interval; if (insert_interval) { pThreadInfo->st = taosGetTimestampMs(); } @@ -6949,8 +7314,81 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * return 0; } +#if STMT_IFACE_ENABLED == 1 +static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec) +{ + stbInfo->sampleBindArray = calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE); + if (stbInfo->sampleBindArray == NULL) { + errorPrint("%s() LN%d, Failed to allocate %"PRIu64" bind array buffer\n", + __func__, __LINE__, (uint64_t)sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE); + return -1; + } + + + for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) { + char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); + if (bindArray == NULL) { + errorPrint("%s() LN%d, Failed to allocate %d bind params\n", + __func__, __LINE__, (stbInfo->columnCount + 1)); + return -1; + } + + + TAOS_BIND *bind; + int cursor = 0; + + for (int c = 0; c < stbInfo->columnCount + 1; c++) { + bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * c)); + + if (c == 0) { + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + bind->buffer_length = sizeof(int64_t); + bind->buffer = NULL; //bind_ts; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + } else { + char *restStr = stbInfo->sampleDataBuf + + stbInfo->lenOfOneRow * i + cursor; + int lengthOfRest = strlen(restStr); + + int index = 0; + for (index = 0; index < lengthOfRest; index ++) { + if (restStr[index] == ',') { + break; + } + } + + char *bindBuffer = calloc(1, index + 1); + if (bindBuffer == NULL) { + errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n", + __func__, __LINE__, DOUBLE_BUFF_LEN); + return -1; + } + + strncpy(bindBuffer, restStr, index); + cursor += index + 1; // skip ',' too + + if (-1 == prepareStmtBindArrayByType( + bind, + stbInfo->columns[c-1].dataType, + stbInfo->columns[c-1].dataLen, + timePrec, + bindBuffer)) { + free(bindBuffer); + return -1; + } + free(bindBuffer); + } + } + *((uintptr_t *)(stbInfo->sampleBindArray + (sizeof(char *)) * i)) = (uintptr_t)bindArray; + } + + return 0; +} +#endif + static void startMultiThreadInsertData(int threads, char* db_name, - char* precision, SSuperTable* superTblInfo) { + char* precision, SSuperTable* stbInfo) { int32_t timePrec = TSDB_TIME_PRECISION_MILLI; if (0 != precision[0]) { @@ -6964,19 +7402,19 @@ static void startMultiThreadInsertData(int threads, char* db_name, #endif } else { errorPrint("Not support precision: %s\n", precision); - exit(-1); + exit(EXIT_FAILURE); } } int64_t start_time; - if (superTblInfo) { - if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { + if (stbInfo) { + if (0 == strncasecmp(stbInfo->startTimestamp, "now", 3)) { start_time = taosGetTimestamp(timePrec); } else { if (TSDB_CODE_SUCCESS != taosParseTime( - superTblInfo->startTimestamp, + stbInfo->startTimestamp, &start_time, - strlen(superTblInfo->startTimestamp), + strlen(stbInfo->startTimestamp), timePrec, 0)) { ERROR_EXIT("failed to parse time!\n"); } @@ -6990,12 +7428,12 @@ static void startMultiThreadInsertData(int threads, char* db_name, int64_t start = taosGetTimestampMs(); // read sample data from file first - if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, + if ((stbInfo) && (0 == strncasecmp(stbInfo->dataSource, "sample", strlen("sample")))) { - if (0 != prepareSampleDataForSTable(superTblInfo)) { + if (0 != prepareSampleDataForSTable(stbInfo)) { errorPrint("%s() LN%d, prepare sample data for stable failed!\n", __func__, __LINE__); - exit(-1); + exit(EXIT_FAILURE); } } @@ -7005,68 +7443,68 @@ static void startMultiThreadInsertData(int threads, char* db_name, if (NULL == taos0) { errorPrint("%s() LN%d, connect to server fail , reason: %s\n", __func__, __LINE__, taos_errstr(NULL)); - exit(-1); + exit(EXIT_FAILURE); } int64_t ntables = 0; uint64_t tableFrom; - if (superTblInfo) { + if (stbInfo) { int64_t limit; uint64_t offset; if ((NULL != g_args.sqlFile) - && (superTblInfo->childTblExists == TBL_NO_EXISTS) - && ((superTblInfo->childTblOffset != 0) - || (superTblInfo->childTblLimit >= 0))) { + && (stbInfo->childTblExists == TBL_NO_EXISTS) + && ((stbInfo->childTblOffset != 0) + || (stbInfo->childTblLimit >= 0))) { printf("WARNING: offset and limit will not be used since the child tables not exists!\n"); } - if (superTblInfo->childTblExists == TBL_ALREADY_EXISTS) { - if ((superTblInfo->childTblLimit < 0) - || ((superTblInfo->childTblOffset - + superTblInfo->childTblLimit) - > (superTblInfo->childTblCount))) { - superTblInfo->childTblLimit = - superTblInfo->childTblCount - superTblInfo->childTblOffset; + if (stbInfo->childTblExists == TBL_ALREADY_EXISTS) { + if ((stbInfo->childTblLimit < 0) + || ((stbInfo->childTblOffset + + stbInfo->childTblLimit) + > (stbInfo->childTblCount))) { + stbInfo->childTblLimit = + stbInfo->childTblCount - stbInfo->childTblOffset; } - offset = superTblInfo->childTblOffset; - limit = superTblInfo->childTblLimit; + offset = stbInfo->childTblOffset; + limit = stbInfo->childTblLimit; } else { - limit = superTblInfo->childTblCount; + limit = stbInfo->childTblCount; offset = 0; } ntables = limit; tableFrom = offset; - if ((superTblInfo->childTblExists != TBL_NO_EXISTS) - && ((superTblInfo->childTblOffset + superTblInfo->childTblLimit ) - > superTblInfo->childTblCount)) { + if ((stbInfo->childTblExists != TBL_NO_EXISTS) + && ((stbInfo->childTblOffset + stbInfo->childTblLimit) + > stbInfo->childTblCount)) { printf("WARNING: specified offset + limit > child table count!\n"); prompt(); } - if ((superTblInfo->childTblExists != TBL_NO_EXISTS) - && (0 == superTblInfo->childTblLimit)) { + if ((stbInfo->childTblExists != TBL_NO_EXISTS) + && (0 == stbInfo->childTblLimit)) { printf("WARNING: specified limit = 0, which cannot find table name to insert or query! \n"); prompt(); } - superTblInfo->childTblName = (char*)calloc(1, + stbInfo->childTblName = (char*)calloc(1, limit * TSDB_TABLE_NAME_LEN); - if (superTblInfo->childTblName == NULL) { - errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__); + if (stbInfo->childTblName == NULL) { taos_close(taos0); - exit(-1); + errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__); + exit(EXIT_FAILURE); } int64_t childTblCount; getChildNameOfSuperTableWithLimitAndOffset( taos0, - db_name, superTblInfo->sTblName, - &superTblInfo->childTblName, &childTblCount, + db_name, stbInfo->sTblName, + &stbInfo->childTblName, &childTblCount, limit, offset); } else { @@ -7087,11 +7525,11 @@ static void startMultiThreadInsertData(int threads, char* db_name, b = ntables % threads; } - if ((superTblInfo) - && (superTblInfo->iface == REST_IFACE)) { + if ((stbInfo) + && (stbInfo->iface == REST_IFACE)) { if (convertHostToServAddr( g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) { - exit(-1); + ERROR_EXIT("convert host to server address"); } } @@ -7104,98 +7542,110 @@ static void startMultiThreadInsertData(int threads, char* db_name, memset(pids, 0, threads * sizeof(pthread_t)); memset(infos, 0, threads * sizeof(threadInfo)); +#if STMT_IFACE_ENABLED == 1 + char *stmtBuffer = calloc(1, BUFFER_SIZE); + assert(stmtBuffer); + if ((g_args.iface == STMT_IFACE) + || ((stbInfo) + && (stbInfo->iface == STMT_IFACE))) { + char *pstr = stmtBuffer; + + if ((stbInfo) + && (AUTO_CREATE_SUBTBL + == stbInfo->autoCreateTable)) { + pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?", + stbInfo->sTblName); + for (int tag = 0; tag < (stbInfo->tagCount - 1); + tag ++ ) { + pstr += sprintf(pstr, ",?"); + } + pstr += sprintf(pstr, ") VALUES(?"); + } else { + pstr += sprintf(pstr, "INSERT INTO ? VALUES(?"); + } + + int columnCount; + if (stbInfo) { + columnCount = stbInfo->columnCount; + } else { + columnCount = g_args.num_of_CPR; + } + + for (int col = 0; col < columnCount; col ++) { + pstr += sprintf(pstr, ",?"); + } + pstr += sprintf(pstr, ")"); + + debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer); + + if ((stbInfo) && (0 == strncasecmp(stbInfo->dataSource, + "sample", strlen("sample")))) { + parseSampleFileToStmt(stbInfo, timePrec); + } + } +#endif + for (int i = 0; i < threads; i++) { threadInfo *pThreadInfo = infos + i; pThreadInfo->threadID = i; tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN); pThreadInfo->time_precision = timePrec; - pThreadInfo->superTblInfo = superTblInfo; + pThreadInfo->stbInfo = stbInfo; pThreadInfo->start_time = start_time; pThreadInfo->minDelay = UINT64_MAX; - if ((NULL == superTblInfo) || - (superTblInfo->iface != REST_IFACE)) { + if ((NULL == stbInfo) || + (stbInfo->iface != REST_IFACE)) { //t_info->taos = taos; pThreadInfo->taos = taos_connect( g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); if (NULL == pThreadInfo->taos) { + free(infos); errorPrint( "%s() LN%d, connect to server fail from insert sub thread, reason: %s\n", __func__, __LINE__, taos_errstr(NULL)); - free(infos); - exit(-1); + exit(EXIT_FAILURE); } #if STMT_IFACE_ENABLED == 1 if ((g_args.iface == STMT_IFACE) - || ((superTblInfo) - && (superTblInfo->iface == STMT_IFACE))) { + || ((stbInfo) + && (stbInfo->iface == STMT_IFACE))) { - int columnCount; - if (superTblInfo) { - columnCount = superTblInfo->columnCount; - } else { - columnCount = g_args.num_of_CPR; - } pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos); if (NULL == pThreadInfo->stmt) { + free(pids); + free(infos); errorPrint( "%s() LN%d, failed init stmt, reason: %s\n", __func__, __LINE__, taos_errstr(NULL)); + exit(EXIT_FAILURE); + } + + int ret = taos_stmt_prepare(pThreadInfo->stmt, stmtBuffer, 0); + if (ret != 0){ free(pids); free(infos); - exit(-1); - } - - char *buffer = calloc(1, BUFFER_SIZE); - assert(buffer); - char *pstr = buffer; - - if ((superTblInfo) - && (AUTO_CREATE_SUBTBL - == superTblInfo->autoCreateTable)) { - pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?", - superTblInfo->sTblName); - for (int tag = 0; tag < (superTblInfo->tagCount - 1); - tag ++ ) { - pstr += sprintf(pstr, ",?"); - } - pstr += sprintf(pstr, ") VALUES(?"); - } else { - pstr += sprintf(pstr, "INSERT INTO ? VALUES(?"); - } - - for (int col = 0; col < columnCount; col ++) { - pstr += sprintf(pstr, ",?"); - } - pstr += sprintf(pstr, ")"); - - debugPrint("%s() LN%d, buffer: %s", __func__, __LINE__, buffer); - int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0); - if (ret != 0){ + free(stmtBuffer); errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n", ret, taos_stmt_errstr(pThreadInfo->stmt)); - free(pids); - free(infos); - free(buffer); - exit(-1); + exit(EXIT_FAILURE); } - - free(buffer); + pThreadInfo->bind_ts = malloc(sizeof(int64_t)); } #endif } else { pThreadInfo->taos = NULL; } - /* if ((NULL == superTblInfo) - || (0 == superTblInfo->multiThreadWriteOneTbl)) { + /* if ((NULL == stbInfo) + || (0 == stbInfo->multiThreadWriteOneTbl)) { */ pThreadInfo->start_table_from = tableFrom; pThreadInfo->ntables = iend_table_to + 1; /* } else { pThreadInfo->start_table_from = 0; - pThreadInfo->ntables = superTblInfo->childTblCount; + pThreadInfo->ntables = stbInfo->childTblCount; pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint(); } */ @@ -7215,6 +7665,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, } } +#if STMT_IFACE_ENABLED == 1 + free(stmtBuffer); +#endif + for (int i = 0; i < threads; i++) { pthread_join(pids[i], NULL); } @@ -7228,11 +7682,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, for (int i = 0; i < threads; i++) { threadInfo *pThreadInfo = infos + i; - tsem_destroy(&(pThreadInfo->lock_sem)); - #if STMT_IFACE_ENABLED == 1 if (pThreadInfo->stmt) { taos_stmt_close(pThreadInfo->stmt); + tmfree((char *)pThreadInfo->bind_ts); } #endif tsem_destroy(&(pThreadInfo->lock_sem)); @@ -7242,9 +7695,9 @@ static void startMultiThreadInsertData(int threads, char* db_name, __func__, __LINE__, pThreadInfo->threadID, pThreadInfo->totalInsertRows, pThreadInfo->totalAffectedRows); - if (superTblInfo) { - superTblInfo->totalAffectedRows += pThreadInfo->totalAffectedRows; - superTblInfo->totalInsertRows += pThreadInfo->totalInsertRows; + if (stbInfo) { + stbInfo->totalAffectedRows += pThreadInfo->totalAffectedRows; + stbInfo->totalInsertRows += pThreadInfo->totalInsertRows; } else { g_args.totalAffectedRows += pThreadInfo->totalAffectedRows; g_args.totalInsertRows += pThreadInfo->totalInsertRows; @@ -7265,22 +7718,22 @@ static void startMultiThreadInsertData(int threads, char* db_name, double tInMs = t/1000.0; - if (superTblInfo) { + if (stbInfo) { fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n", - tInMs, superTblInfo->totalInsertRows, - superTblInfo->totalAffectedRows, - threads, db_name, superTblInfo->sTblName, + tInMs, stbInfo->totalInsertRows, + stbInfo->totalAffectedRows, + threads, db_name, stbInfo->sTblName, (tInMs)? - (double)(superTblInfo->totalInsertRows/tInMs):FLT_MAX); + (double)(stbInfo->totalInsertRows/tInMs):FLT_MAX); if (g_fpOfInsertResult) { fprintf(g_fpOfInsertResult, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n", - tInMs, superTblInfo->totalInsertRows, - superTblInfo->totalAffectedRows, - threads, db_name, superTblInfo->sTblName, + tInMs, stbInfo->totalInsertRows, + stbInfo->totalAffectedRows, + threads, db_name, stbInfo->sTblName, (tInMs)? - (double)(superTblInfo->totalInsertRows/tInMs):FLT_MAX); + (double)(stbInfo->totalInsertRows/tInMs):FLT_MAX); } } else { fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n", @@ -7335,8 +7788,8 @@ static void *readTable(void *sarg) { } int64_t num_of_DPT; - /* if (pThreadInfo->superTblInfo) { - num_of_DPT = pThreadInfo->superTblInfo->insertRows; // nrecords_per_table; + /* if (pThreadInfo->stbInfo) { + num_of_DPT = pThreadInfo->stbInfo->insertRows; // nrecords_per_table; } else { */ num_of_DPT = g_args.num_of_DPT; @@ -7410,7 +7863,7 @@ static void *readMetric(void *sarg) { return NULL; } - int64_t num_of_DPT = pThreadInfo->superTblInfo->insertRows; + int64_t num_of_DPT = pThreadInfo->stbInfo->insertRows; int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1; int64_t totalData = num_of_DPT * num_of_tables; bool do_aggreFunc = g_Dbs.do_aggreFunc; @@ -7549,14 +8002,14 @@ static int insertTestProcess() { if (g_Dbs.db[i].superTblCount > 0) { for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { - SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; + SSuperTable* stbInfo = &g_Dbs.db[i].superTbls[j]; - if (superTblInfo && (superTblInfo->insertRows > 0)) { + if (stbInfo && (stbInfo->insertRows > 0)) { startMultiThreadInsertData( g_Dbs.threadCount, g_Dbs.db[i].dbName, g_Dbs.db[i].dbCfg.precision, - superTblInfo); + stbInfo); } } } @@ -7776,7 +8229,7 @@ static int queryTestProcess() { if (taos == NULL) { errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); - exit(-1); + exit(EXIT_FAILURE); } if (0 != g_queryInfo.superQueryInfo.sqlCount) { @@ -7796,7 +8249,7 @@ static int queryTestProcess() { if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) { if (convertHostToServAddr( g_queryInfo.host, g_queryInfo.port, &g_queryInfo.serv_addr) != 0) - exit(-1); + ERROR_EXIT("convert host to server address"); } pthread_t *pids = NULL; @@ -8000,10 +8453,10 @@ static void *superSubscribe(void *sarg) { setThreadName("superSub"); if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { + free(subSqlStr); errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n", pThreadInfo->ntables, MAX_QUERY_SQL_COUNT); - free(subSqlStr); - exit(-1); + exit(EXIT_FAILURE); } if (pThreadInfo->taos == NULL) { @@ -8269,7 +8722,7 @@ static int subscribeTestProcess() { if (taos == NULL) { errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); - exit(-1); + exit(EXIT_FAILURE); } if (0 != g_queryInfo.superQueryInfo.sqlCount) { @@ -8298,7 +8751,7 @@ static int subscribeTestProcess() { errorPrint("%s() LN%d, sepcified query sqlCount %d.\n", __func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount); - exit(-1); + exit(EXIT_FAILURE); } pids = calloc( @@ -8313,7 +8766,7 @@ static int subscribeTestProcess() { sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); - exit(-1); + exit(EXIT_FAILURE); } for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { @@ -8350,7 +8803,7 @@ static int subscribeTestProcess() { errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); // taos_close(taos); - exit(-1); + exit(EXIT_FAILURE); } int64_t ntables = g_queryInfo.superQueryInfo.childTblCount; @@ -8553,8 +9006,7 @@ static int regexMatch(const char *s, const char *reg, int cflags) { /* Compile regular expression */ if (regcomp(®ex, reg, cflags) != 0) { - printf("Fail to compile regex\n"); - exit(-1); + ERROR_EXIT("Fail to compile regex\n"); } /* Execute regular expression */ @@ -8567,9 +9019,9 @@ static int regexMatch(const char *s, const char *reg, int cflags) { return 0; } else { regerror(reti, ®ex, msgbuf, sizeof(msgbuf)); - printf("Regex match failed: %s\n", msgbuf); regfree(®ex); - exit(-1); + printf("Regex match failed: %s\n", msgbuf); + exit(EXIT_FAILURE); } return 0; @@ -8671,7 +9123,7 @@ static void queryResult() { if (g_args.use_metric) { pThreadInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount; pThreadInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1; - pThreadInfo->superTblInfo = &g_Dbs.db[0].superTbls[0]; + pThreadInfo->stbInfo = &g_Dbs.db[0].superTbls[0]; tstrncpy(pThreadInfo->tb_prefix, g_Dbs.db[0].superTbls[0].childTblPrefix, TBNAME_PREFIX_LEN); } else { @@ -8687,10 +9139,10 @@ static void queryResult() { g_Dbs.db[0].dbName, g_Dbs.port); if (pThreadInfo->taos == NULL) { + free(pThreadInfo); errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); - free(pThreadInfo); - exit(-1); + exit(EXIT_FAILURE); } tstrncpy(pThreadInfo->filePath, g_Dbs.resultFile, MAX_FILE_NAME_LEN);