From 3e64ef252f99f09b4a92e37f6f2b9c021554d986 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 12 Mar 2021 13:51:55 +0800 Subject: [PATCH] [TD-3192] : support stb limit and offset. generate data refactor. --- src/kit/taosdemo/taosdemo.c | 310 +++++++++++++++++++----------------- 1 file changed, 164 insertions(+), 146 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 4e5a59550b..083752ec7e 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -98,6 +98,8 @@ extern char configDir[]; #define MAX_DATABASE_COUNT 256 #define INPUT_BUF_LEN 256 +#define DEFAULT_TIMESTAMP_STEP 10 + typedef enum CREATE_SUB_TALBE_MOD_EN { PRE_CREATE_SUBTBL, AUTO_CREATE_SUBTBL, @@ -3307,7 +3309,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { if (timestampStep && timestampStep->type == cJSON_Number) { g_Dbs.db[i].superTbls[j].timeStampStep = timestampStep->valueint; } else if (!timestampStep) { - g_Dbs.db[i].superTbls[j].timeStampStep = 1000; + g_Dbs.db[i].superTbls[j].timeStampStep = DEFAULT_TIMESTAMP_STEP; } else { printf("ERROR: failed to read json, timestamp_step not found\n"); goto PARSE_OVER; @@ -4344,6 +4346,154 @@ static int execInsert(threadInfo *winfo, char *buffer, int k) return affectedRows; } +static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *buffer, + int64_t insertRows, + int64_t startFrom, int64_t startTime, int *pSampleUsePos) +{ + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + + int ncols_per_record = 1; // count first col ts + + if (superTblInfo == NULL) { + int datatypeSeq = 0; + while(g_args.datatype[datatypeSeq]) { + datatypeSeq ++; + ncols_per_record ++; + } + } + + memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); + + char *pstr = buffer; + + if (superTblInfo) { + if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { + char* tagsValBuf = NULL; + if (0 == superTblInfo->tagSource) { + tagsValBuf = generateTagVaulesForStb(superTblInfo); + } else { + tagsValBuf = getTagValueFromTagSample( + superTblInfo, + threadID % superTblInfo->tagSampleCount); + } + if (NULL == tagsValBuf) { + fprintf(stderr, "tag buf failed to allocate memory\n"); + return -1; + } + + pstr += snprintf(pstr, + superTblInfo->maxSqlLen, + "insert into %s.%s%d using %s.%s tags %s values", + pThreadInfo->db_name, + superTblInfo->childTblPrefix, + threadID, + pThreadInfo->db_name, + superTblInfo->sTblName, + tagsValBuf); + tmfree(tagsValBuf); + } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { + pstr += snprintf(pstr, + superTblInfo->maxSqlLen, + "insert into %s.%s values", + pThreadInfo->db_name, + superTblInfo->childTblName + threadID * TSDB_TABLE_NAME_LEN); + } else { + pstr += snprintf(pstr, + (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), + "insert into %s.%s%d values", + pThreadInfo->db_name, + superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, + threadID); + } + } else { + pstr += snprintf(pstr, + (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), + "insert into %s.%s%d values", + pThreadInfo->db_name, + superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, + threadID); + } + + int k; + int len = 0; + + verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); + for (k = 0; k < g_args.num_of_RPR;) { + if (superTblInfo) { + int retLen = 0; + + if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { + retLen = getRowDataFromSample( + pstr + len, + superTblInfo->maxSqlLen - len, + startTime + superTblInfo->timeStampStep * startFrom, + superTblInfo, + pSampleUsePos); + } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { + int rand_num = rand_tinyint() % 100; + if (0 != superTblInfo->disorderRatio + && rand_num < superTblInfo->disorderRatio) { + int64_t d = startTime - rand() % superTblInfo->disorderRange; + retLen = generateRowData( + pstr + len, + superTblInfo->maxSqlLen - len, + d, + superTblInfo); + //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, start_time, d); + } else { + retLen = generateRowData( + pstr + len, + superTblInfo->maxSqlLen - len, + startTime + superTblInfo->timeStampStep * startFrom, + superTblInfo); + } + + if (retLen < 0) { + return -1; + } + + len += retLen; + } + } else { + int rand_num = rand() % 100; + char data[MAX_DATA_SIZE]; + char **data_type = g_args.datatype; + int lenOfBinary = g_args.len_of_binary; + + if ((g_args.disorderRatio != 0) + && (rand_num < g_args.disorderRange)) { + + int64_t d = startTime - rand() % 1000000 + rand_num; + len = generateData(data, data_type, + ncols_per_record, d, lenOfBinary); + } else { + len = generateData(data, data_type, + ncols_per_record, + startTime + DEFAULT_TIMESTAMP_STEP * startFrom, + lenOfBinary); + } + + //assert(len + pstr - buffer < BUFFER_SIZE); + if (len + pstr - buffer >= g_args.max_sql_len) { // too long + break; + } + + pstr += sprintf(pstr, " %s", data); + } + + verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); + + k++; + startFrom ++; + + debugPrint("%s() LN%d k=%d startFrom=%ld insertRows=%ld\n", __func__, __LINE__, k, startFrom, insertRows); + if (startFrom >= insertRows) + break; + } + + return k; +} + // sync insertion /* 1 thread: 100 tables * 2000 rows/s @@ -4357,9 +4507,6 @@ static void* syncWrite(void *sarg) { threadInfo *winfo = (threadInfo *)sarg; SSuperTable* superTblInfo = winfo->superTblInfo; - int ncols_per_record = 1; // count first col ts - - int samplePos = 0; if (superTblInfo) { if (0 != prepareSampleData(superTblInfo)) @@ -4370,14 +4517,9 @@ static void* syncWrite(void *sarg) { tmfree(superTblInfo->sampleDataBuf); return NULL; } - } else { - int datatypeSeq = 0; - while(g_args.datatype[datatypeSeq]) { - datatypeSeq ++; - ncols_per_record ++; - } - } + + int samplePos = 0; char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); if (NULL == buffer) { @@ -4413,148 +4555,24 @@ static void* syncWrite(void *sarg) { verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); for (int64_t i = 0; i < insertRows;) { - int64_t prepared = i; - if (insert_interval) { st = taosGetTimestampUs(); } sampleUsePos = samplePos; - memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); + int generated = generateDataBuffer(tID, winfo, buffer, insertRows, + i, start_time, &sampleUsePos); + if (generated > 0) + i += generated; + else + goto free_and_statistics_2; - char *pstr = buffer; - - if (superTblInfo) { - - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { - char* tagsValBuf = NULL; - if (0 == superTblInfo->tagSource) { - tagsValBuf = generateTagVaulesForStb(superTblInfo); - } else { - tagsValBuf = getTagValueFromTagSample( - superTblInfo, - tID % superTblInfo->tagSampleCount); - } - if (NULL == tagsValBuf) { - goto free_and_statistics_2; - } - - pstr += snprintf(pstr, - superTblInfo->maxSqlLen, - "insert into %s.%s%d using %s.%s tags %s values", - winfo->db_name, - superTblInfo->childTblPrefix, - tID, - winfo->db_name, - superTblInfo->sTblName, - tagsValBuf); - tmfree(tagsValBuf); - } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { - pstr += snprintf(pstr, - superTblInfo->maxSqlLen, - "insert into %s.%s values", - winfo->db_name, - superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN); - } else { - pstr += snprintf(pstr, - (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), - "insert into %s.%s%d values", - winfo->db_name, - superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, - tID); - } - } else { - - pstr += snprintf(pstr, - (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), - "insert into %s.%s%d values", - winfo->db_name, - superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, - tID); - } - - int k; - int len = 0; - - verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); - for (k = 0; k < g_args.num_of_RPR;) { - - if (superTblInfo) { - int retLen = 0; - - if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { - retLen = getRowDataFromSample( - pstr + len, - superTblInfo->maxSqlLen - len, - start_time + superTblInfo->timeStampStep * i, - superTblInfo, - &sampleUsePos); - } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { - int rand_num = rand_tinyint() % 100; - if (0 != superTblInfo->disorderRatio - && rand_num < superTblInfo->disorderRatio) { - int64_t d = start_time - rand() % superTblInfo->disorderRange; - retLen = generateRowData( - pstr + len, - superTblInfo->maxSqlLen - len, - d, - superTblInfo); - //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, start_time, d); - } else { - retLen = generateRowData( - pstr + len, - superTblInfo->maxSqlLen - len, - start_time + superTblInfo->timeStampStep * i, - superTblInfo); - } - - if (retLen < 0) { - goto free_and_statistics_2; - } - - len += retLen; - } - } else { - int rand_num = rand() % 100; - char data[MAX_DATA_SIZE]; - char **data_type = g_args.datatype; - int lenOfBinary = g_args.len_of_binary; - - if ((g_args.disorderRatio != 0) - && (rand_num < g_args.disorderRange)) { - - int64_t d = start_time - rand() % 1000000 + rand_num; - len = generateData(data, data_type, - ncols_per_record, d, lenOfBinary); - } else { - len = generateData(data, data_type, - ncols_per_record, start_time += 1000, lenOfBinary); - } - - //assert(len + pstr - buffer < BUFFER_SIZE); - if (len + pstr - buffer >= g_args.max_sql_len) { // too long - break; - } - - pstr += sprintf(pstr, " %s", data); - } - - verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); - - prepared ++; - k++; - i++; - - if (prepared >= insertRows) - break; - } - - int affectedRows = execInsert(winfo, buffer, k); + int affectedRows = execInsert(winfo, buffer, generated); if (affectedRows < 0) goto free_and_statistics_2; - winfo->totalInsertRows += k; + winfo->totalInsertRows += generated; winfo->totalAffectedRows += affectedRows; endTs = taosGetTimestampUs(); @@ -4573,7 +4591,7 @@ static void* syncWrite(void *sarg) { lastPrintTime = currentPrintTime; } - if (prepared >= insertRows) + if (i >= insertRows) break; if (insert_interval) { @@ -4581,7 +4599,7 @@ static void* syncWrite(void *sarg) { if (insert_interval > ((et - st)/1000) ) { int sleep_time = insert_interval - (et -st)/1000; - printf("sleep: %d ms for insert interval\n", sleep_time); + verbosePrint("%s() LN%d sleep: %d ms for insert interval\n", __func__, __LINE__, sleep_time); taosMsleep(sleep_time); // ms } } @@ -5675,7 +5693,7 @@ void setParaFromArg(){ tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp, "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); - g_Dbs.db[0].superTbls[0].timeStampStep = 10; + g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP; g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE;