Feature/sangshuduo/td 3317 taosdemo interlace (#5500)
* [TD-3316] <fix>: add testcase for taosdemo limit and offset. check offset 0. * [TD-3316] <fix>: add testcase for taosdemo limit and offset. fix sample file import bug. * [TD-3316] <fix>: add test case for limit and offset. fix sample data issue. * [TD-3327] <fix>: fix taosdemo segfault when import data from sample data file. * [TD-3317] <feature>: make taosdemo support interlace mode. json parameter rows_per_tbl support. * [TD-3317] <feature>: support interlace mode. refactor * [TD-3317] <feature>: support interlace mode. refactor * [TD-3317] <feature>: support interlace mode insertion. refactor. * [TD-3317] <feature>: support interlace mode insertion. change json file. * [TD-3317] <feature>: support interlace mode insertion. fix multithread create table regression. * [TD-3317] <feature>: support interlace mode insertion. working but not perfect. * [TD-3317] <feature>: support interlace mode insertion. rename lowaTest with taosdemoTestWithJson * [TD-3317] <feature>: support interlace mode insertion. perfect * [TD-3317] <feature>: support interlace mode insertion. cleanup. * [TD-3317] <feature>: support interlace mode insertion. adjust algorithm of loop times. * [TD-3317] <feature>: support interlace mode insertion. fix delay time bug. * [TD-3317] <feature>: support interlace mode insertion. fix progressive timestamp bug. * [TD-3317] <feature>: support interlace mode insertion. add an option for performance print. * [TD-3317] <feature>: support interlace mode insertion. change json test case with less table for acceleration. * [TD-3317] <feature>: support interlace mode insertion. change progressive mode timestamp step and testcase. Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
parent
8805404cd5
commit
b294d3ebaa
|
@ -191,6 +191,7 @@ typedef struct SArguments_S {
|
||||||
bool answer_yes;
|
bool answer_yes;
|
||||||
bool debug_print;
|
bool debug_print;
|
||||||
bool verbose_print;
|
bool verbose_print;
|
||||||
|
bool performance_print;
|
||||||
char * output_file;
|
char * output_file;
|
||||||
int mode;
|
int mode;
|
||||||
char * datatype[MAX_NUM_DATATYPE + 1];
|
char * datatype[MAX_NUM_DATATYPE + 1];
|
||||||
|
@ -440,7 +441,7 @@ typedef unsigned __int32 uint32_t;
|
||||||
static HANDLE g_stdoutHandle;
|
static HANDLE g_stdoutHandle;
|
||||||
static DWORD g_consoleMode;
|
static DWORD g_consoleMode;
|
||||||
|
|
||||||
void setupForAnsiEscape(void) {
|
static void setupForAnsiEscape(void) {
|
||||||
DWORD mode = 0;
|
DWORD mode = 0;
|
||||||
g_stdoutHandle = GetStdHandle(STD_OUTPUT_HANDLE);
|
g_stdoutHandle = GetStdHandle(STD_OUTPUT_HANDLE);
|
||||||
|
|
||||||
|
@ -462,7 +463,7 @@ void setupForAnsiEscape(void) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void resetAfterAnsiEscape(void) {
|
static void resetAfterAnsiEscape(void) {
|
||||||
// Reset colors
|
// Reset colors
|
||||||
printf("\x1b[0m");
|
printf("\x1b[0m");
|
||||||
|
|
||||||
|
@ -472,7 +473,7 @@ void resetAfterAnsiEscape(void) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosRandom()
|
static int taosRandom()
|
||||||
{
|
{
|
||||||
int number;
|
int number;
|
||||||
rand_s(&number);
|
rand_s(&number);
|
||||||
|
@ -480,14 +481,14 @@ int taosRandom()
|
||||||
return number;
|
return number;
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
void setupForAnsiEscape(void) {}
|
static void setupForAnsiEscape(void) {}
|
||||||
|
|
||||||
void resetAfterAnsiEscape(void) {
|
static void resetAfterAnsiEscape(void) {
|
||||||
// Reset colors
|
// Reset colors
|
||||||
printf("\x1b[0m");
|
printf("\x1b[0m");
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosRandom()
|
static int taosRandom()
|
||||||
{
|
{
|
||||||
return random();
|
return random();
|
||||||
}
|
}
|
||||||
|
@ -526,6 +527,7 @@ SArguments g_args = {
|
||||||
false, // insert_only
|
false, // insert_only
|
||||||
false, // debug_print
|
false, // debug_print
|
||||||
false, // verbose_print
|
false, // verbose_print
|
||||||
|
false, // performance statistic print
|
||||||
false, // answer_yes;
|
false, // answer_yes;
|
||||||
"./output.txt", // output_file
|
"./output.txt", // output_file
|
||||||
0, // mode : sync or async
|
0, // mode : sync or async
|
||||||
|
@ -572,6 +574,10 @@ static FILE * g_fpOfInsertResult = NULL;
|
||||||
do { if (g_args.verbose_print) \
|
do { if (g_args.verbose_print) \
|
||||||
fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0)
|
fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0)
|
||||||
|
|
||||||
|
#define performancePrint(fmt, ...) \
|
||||||
|
do { if (g_args.performance_print) \
|
||||||
|
fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0)
|
||||||
|
|
||||||
#define errorPrint(fmt, ...) \
|
#define errorPrint(fmt, ...) \
|
||||||
do { fprintf(stderr, "ERROR: "fmt, __VA_ARGS__); } while(0)
|
do { fprintf(stderr, "ERROR: "fmt, __VA_ARGS__); } while(0)
|
||||||
|
|
||||||
|
@ -580,7 +586,7 @@ static FILE * g_fpOfInsertResult = NULL;
|
||||||
|
|
||||||
static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); }
|
static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); }
|
||||||
|
|
||||||
void printHelp() {
|
static void printHelp() {
|
||||||
char indent[10] = " ";
|
char indent[10] = " ";
|
||||||
printf("%s%s%s%s\n", indent, "-f", indent,
|
printf("%s%s%s%s\n", indent, "-f", indent,
|
||||||
"The meta file to the execution procedure. Default is './meta.json'.");
|
"The meta file to the execution procedure. Default is './meta.json'.");
|
||||||
|
@ -642,7 +648,7 @@ void printHelp() {
|
||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
void parse_args(int argc, char *argv[], SArguments *arguments) {
|
static void parse_args(int argc, char *argv[], SArguments *arguments) {
|
||||||
char **sptr;
|
char **sptr;
|
||||||
wordexp_t full_path;
|
wordexp_t full_path;
|
||||||
|
|
||||||
|
@ -746,6 +752,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
|
||||||
arguments->debug_print = true;
|
arguments->debug_print = true;
|
||||||
} else if (strcmp(argv[i], "-gg") == 0) {
|
} else if (strcmp(argv[i], "-gg") == 0) {
|
||||||
arguments->verbose_print = true;
|
arguments->verbose_print = true;
|
||||||
|
} else if (strcmp(argv[i], "-pp") == 0) {
|
||||||
|
arguments->performance_print = true;
|
||||||
} else if (strcmp(argv[i], "-c") == 0) {
|
} else if (strcmp(argv[i], "-c") == 0) {
|
||||||
strcpy(configDir, argv[++i]);
|
strcpy(configDir, argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-O") == 0) {
|
} else if (strcmp(argv[i], "-O") == 0) {
|
||||||
|
@ -833,13 +841,13 @@ static bool getInfoFromJsonFile(char* file);
|
||||||
//static int generateOneRowDataForStb(SSuperTable* stbInfo);
|
//static int generateOneRowDataForStb(SSuperTable* stbInfo);
|
||||||
//static int getDataIntoMemForStb(SSuperTable* stbInfo);
|
//static int getDataIntoMemForStb(SSuperTable* stbInfo);
|
||||||
static void init_rand_data();
|
static void init_rand_data();
|
||||||
void tmfclose(FILE *fp) {
|
static void tmfclose(FILE *fp) {
|
||||||
if (NULL != fp) {
|
if (NULL != fp) {
|
||||||
fclose(fp);
|
fclose(fp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmfree(char *buf) {
|
static void tmfree(char *buf) {
|
||||||
if (NULL != buf) {
|
if (NULL != buf) {
|
||||||
free(buf);
|
free(buf);
|
||||||
}
|
}
|
||||||
|
@ -938,7 +946,7 @@ static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName)
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
double getCurrentTime() {
|
static double getCurrentTime() {
|
||||||
struct timeval tv;
|
struct timeval tv;
|
||||||
if (gettimeofday(&tv, NULL) != 0) {
|
if (gettimeofday(&tv, NULL) != 0) {
|
||||||
perror("Failed to get current time in ms");
|
perror("Failed to get current time in ms");
|
||||||
|
@ -992,7 +1000,7 @@ static float rand_float(){
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
|
static const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
|
||||||
void rand_string(char *str, int size) {
|
static void rand_string(char *str, int size) {
|
||||||
str[0] = 0;
|
str[0] = 0;
|
||||||
if (size > 0) {
|
if (size > 0) {
|
||||||
//--size;
|
//--size;
|
||||||
|
@ -2787,20 +2795,6 @@ static int readSampleFromCsvFileToMem(
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
void readSampleFromFileToMem(SSuperTable * supterTblInfo) {
|
|
||||||
int ret;
|
|
||||||
if (0 == strncasecmp(supterTblInfo->sampleFormat, "csv", 3)) {
|
|
||||||
ret = readSampleFromCsvFileToMem(supterTblInfo);
|
|
||||||
} else if (0 == strncasecmp(supterTblInfo->sampleFormat, "json", 4)) {
|
|
||||||
ret = readSampleFromJsonFileToMem(supterTblInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (0 != ret) {
|
|
||||||
exit(-1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
static bool getColumnAndTagTypeFromInsertJsonFile(
|
static bool getColumnAndTagTypeFromInsertJsonFile(
|
||||||
cJSON* stbInfo, SSuperTable* superTbls) {
|
cJSON* stbInfo, SSuperTable* superTbls) {
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
|
@ -3976,10 +3970,6 @@ PARSE_OVER:
|
||||||
static void prepareSampleData() {
|
static void prepareSampleData() {
|
||||||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||||
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
||||||
//if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) {
|
|
||||||
// readSampleFromFileToMem(&g_Dbs.db[i].superTbls[j]);
|
|
||||||
//}
|
|
||||||
|
|
||||||
if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) {
|
if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) {
|
||||||
(void)readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]);
|
(void)readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]);
|
||||||
}
|
}
|
||||||
|
@ -4094,7 +4084,7 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper
|
||||||
return dataLen;
|
return dataLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t generateData(char *res, char **data_type,
|
static int32_t generateData(char *res, char **data_type,
|
||||||
int num_of_cols, int64_t timestamp, int lenOfBinary) {
|
int num_of_cols, int64_t timestamp, int lenOfBinary) {
|
||||||
memset(res, 0, MAX_DATA_SIZE);
|
memset(res, 0, MAX_DATA_SIZE);
|
||||||
char *pstr = res;
|
char *pstr = res;
|
||||||
|
@ -4227,8 +4217,7 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq)
|
||||||
}
|
}
|
||||||
|
|
||||||
static int generateDataTail(char *tableName, int32_t tableSeq,
|
static int generateDataTail(char *tableName, int32_t tableSeq,
|
||||||
threadInfo* pThreadInfo,
|
threadInfo* pThreadInfo, SSuperTable* superTblInfo,
|
||||||
SSuperTable* superTblInfo,
|
|
||||||
int batch, char* buffer, int64_t insertRows,
|
int batch, char* buffer, int64_t insertRows,
|
||||||
int64_t startFrom, uint64_t startTime, int *pSamplePos, int *dataLen) {
|
int64_t startFrom, uint64_t startTime, int *pSamplePos, int *dataLen) {
|
||||||
int len = 0;
|
int len = 0;
|
||||||
|
@ -4254,7 +4243,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
|
||||||
retLen = getRowDataFromSample(
|
retLen = getRowDataFromSample(
|
||||||
buffer + len,
|
buffer + len,
|
||||||
superTblInfo->maxSqlLen - len,
|
superTblInfo->maxSqlLen - len,
|
||||||
startTime + superTblInfo->timeStampStep * startFrom,
|
startTime + superTblInfo->timeStampStep * k,
|
||||||
superTblInfo,
|
superTblInfo,
|
||||||
pSamplePos);
|
pSamplePos);
|
||||||
} else if (0 == strncasecmp(superTblInfo->dataSource,
|
} else if (0 == strncasecmp(superTblInfo->dataSource,
|
||||||
|
@ -4262,7 +4251,9 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
|
||||||
int rand_num = rand_tinyint() % 100;
|
int rand_num = rand_tinyint() % 100;
|
||||||
if (0 != superTblInfo->disorderRatio
|
if (0 != superTblInfo->disorderRatio
|
||||||
&& rand_num < superTblInfo->disorderRatio) {
|
&& rand_num < superTblInfo->disorderRatio) {
|
||||||
int64_t d = startTime - taosRandom() % superTblInfo->disorderRange;
|
int64_t d = startTime
|
||||||
|
+ superTblInfo->timeStampStep * k
|
||||||
|
- taosRandom() % superTblInfo->disorderRange;
|
||||||
retLen = generateRowData(
|
retLen = generateRowData(
|
||||||
buffer + len,
|
buffer + len,
|
||||||
superTblInfo->maxSqlLen - len,
|
superTblInfo->maxSqlLen - len,
|
||||||
|
@ -4272,7 +4263,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
|
||||||
retLen = generateRowData(
|
retLen = generateRowData(
|
||||||
buffer + len,
|
buffer + len,
|
||||||
superTblInfo->maxSqlLen - len,
|
superTblInfo->maxSqlLen - len,
|
||||||
startTime + superTblInfo->timeStampStep * startFrom,
|
startTime + superTblInfo->timeStampStep * k,
|
||||||
superTblInfo);
|
superTblInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4403,7 +4394,8 @@ static int generateDataBuffer(char *pTblName,
|
||||||
|
|
||||||
char *pstr = buffer;
|
char *pstr = buffer;
|
||||||
|
|
||||||
int headLen = generateSQLHead(pTblName, tableSeq, pThreadInfo, superTblInfo, buffer);
|
int headLen = generateSQLHead(pTblName, tableSeq, pThreadInfo, superTblInfo,
|
||||||
|
buffer);
|
||||||
pstr += headLen;
|
pstr += headLen;
|
||||||
|
|
||||||
int k;
|
int k;
|
||||||
|
@ -4448,6 +4440,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
|
|
||||||
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
|
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
|
||||||
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
|
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
|
||||||
|
int timeStempStep = superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
|
||||||
uint64_t st = 0;
|
uint64_t st = 0;
|
||||||
uint64_t et = 0xffffffff;
|
uint64_t et = 0xffffffff;
|
||||||
|
|
||||||
|
@ -4519,8 +4512,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
generateDataTail(
|
generateDataTail(
|
||||||
tableName, tableSeq, pThreadInfo, superTblInfo,
|
tableName, tableSeq, pThreadInfo, superTblInfo,
|
||||||
batchPerTbl, pstr, insertRows, 0,
|
batchPerTbl, pstr, insertRows, 0,
|
||||||
startTime + sleepTimeTotal +
|
startTime + sleepTimeTotal + 0 * timeStempStep,
|
||||||
pThreadInfo->totalInsertRows * superTblInfo->timeStampStep,
|
|
||||||
&(pThreadInfo->samplePos), &dataLen);
|
&(pThreadInfo->samplePos), &dataLen);
|
||||||
pstr += dataLen;
|
pstr += dataLen;
|
||||||
recOfBatch += batchPerTbl;
|
recOfBatch += batchPerTbl;
|
||||||
|
@ -4562,7 +4554,20 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
|
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__, buffer);
|
pThreadInfo->threadID, __func__, __LINE__, buffer);
|
||||||
|
|
||||||
|
startTs = taosGetTimestampUs();
|
||||||
|
|
||||||
int affectedRows = execInsert(pThreadInfo, buffer, recOfBatch);
|
int affectedRows = execInsert(pThreadInfo, buffer, recOfBatch);
|
||||||
|
|
||||||
|
endTs = taosGetTimestampUs();
|
||||||
|
int64_t delay = endTs - startTs;
|
||||||
|
performancePrint("%s() LN%d, insert execution time is %10.6fms\n",
|
||||||
|
__func__, __LINE__, delay/1000.0);
|
||||||
|
|
||||||
|
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
|
||||||
|
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
|
||||||
|
pThreadInfo->cntDelay++;
|
||||||
|
pThreadInfo->totalDelay += delay;
|
||||||
|
|
||||||
verbosePrint("[%d] %s() LN%d affectedRows=%d\n", pThreadInfo->threadID,
|
verbosePrint("[%d] %s() LN%d affectedRows=%d\n", pThreadInfo->threadID,
|
||||||
__func__, __LINE__, affectedRows);
|
__func__, __LINE__, affectedRows);
|
||||||
if ((affectedRows < 0) || (recOfBatch != affectedRows)) {
|
if ((affectedRows < 0) || (recOfBatch != affectedRows)) {
|
||||||
|
@ -4574,13 +4579,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
|
|
||||||
pThreadInfo->totalAffectedRows += affectedRows;
|
pThreadInfo->totalAffectedRows += affectedRows;
|
||||||
|
|
||||||
endTs = taosGetTimestampUs();
|
|
||||||
int64_t delay = endTs - startTs;
|
|
||||||
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
|
|
||||||
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
|
|
||||||
pThreadInfo->cntDelay++;
|
|
||||||
pThreadInfo->totalDelay += delay;
|
|
||||||
|
|
||||||
int64_t currentPrintTime = taosGetTimestampMs();
|
int64_t currentPrintTime = taosGetTimestampMs();
|
||||||
if (currentPrintTime - lastPrintTime > 30*1000) {
|
if (currentPrintTime - lastPrintTime > 30*1000) {
|
||||||
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
|
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
|
||||||
|
@ -4595,8 +4593,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
|
|
||||||
if (insert_interval > ((et - st)/1000) ) {
|
if (insert_interval > ((et - st)/1000) ) {
|
||||||
int sleepTime = insert_interval - (et -st)/1000;
|
int sleepTime = insert_interval - (et -st)/1000;
|
||||||
// verbosePrint("%s() LN%d sleep: %d ms for insert interval\n",
|
performancePrint("%s() LN%d sleep: %d ms for insert interval\n",
|
||||||
// __func__, __LINE__, sleepTime);
|
__func__, __LINE__, sleepTime);
|
||||||
taosMsleep(sleepTime); // ms
|
taosMsleep(sleepTime); // ms
|
||||||
sleepTimeTotal += insert_interval;
|
sleepTimeTotal += insert_interval;
|
||||||
}
|
}
|
||||||
|
@ -4638,6 +4636,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
int64_t startTs = taosGetTimestampUs();
|
int64_t startTs = taosGetTimestampUs();
|
||||||
int64_t endTs;
|
int64_t endTs;
|
||||||
|
|
||||||
|
int timeStampStep = superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
|
||||||
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
|
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
|
||||||
uint64_t st = 0;
|
uint64_t st = 0;
|
||||||
uint64_t et = 0xffffffff;
|
uint64_t et = 0xffffffff;
|
||||||
|
@ -4665,27 +4664,36 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
__func__, __LINE__,
|
__func__, __LINE__,
|
||||||
pThreadInfo->threadID, tableSeq, tableName);
|
pThreadInfo->threadID, tableSeq, tableName);
|
||||||
|
|
||||||
int generated = generateDataBuffer(tableName, tableSeq, pThreadInfo, buffer, insertRows,
|
int generated = generateDataBuffer(
|
||||||
i, start_time, &(pThreadInfo->samplePos));
|
tableName, tableSeq, pThreadInfo, buffer, insertRows,
|
||||||
|
i, start_time + pThreadInfo->totalInsertRows * timeStampStep,
|
||||||
|
&(pThreadInfo->samplePos));
|
||||||
if (generated > 0)
|
if (generated > 0)
|
||||||
i += generated;
|
i += generated;
|
||||||
else
|
else
|
||||||
goto free_and_statistics_2;
|
goto free_and_statistics_2;
|
||||||
|
|
||||||
int affectedRows = execInsert(pThreadInfo, buffer, generated);
|
|
||||||
if (affectedRows < 0)
|
|
||||||
goto free_and_statistics_2;
|
|
||||||
|
|
||||||
pThreadInfo->totalInsertRows += generated;
|
pThreadInfo->totalInsertRows += generated;
|
||||||
pThreadInfo->totalAffectedRows += affectedRows;
|
|
||||||
|
startTs = taosGetTimestampUs();
|
||||||
|
|
||||||
|
int affectedRows = execInsert(pThreadInfo, buffer, generated);
|
||||||
|
|
||||||
endTs = taosGetTimestampUs();
|
endTs = taosGetTimestampUs();
|
||||||
int64_t delay = endTs - startTs;
|
int64_t delay = endTs - startTs;
|
||||||
|
performancePrint("%s() LN%d, insert execution time is %10.6fms\n",
|
||||||
|
__func__, __LINE__, delay/1000.0);
|
||||||
|
|
||||||
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
|
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
|
||||||
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
|
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
|
||||||
pThreadInfo->cntDelay++;
|
pThreadInfo->cntDelay++;
|
||||||
pThreadInfo->totalDelay += delay;
|
pThreadInfo->totalDelay += delay;
|
||||||
|
|
||||||
|
if (affectedRows < 0)
|
||||||
|
goto free_and_statistics_2;
|
||||||
|
|
||||||
|
pThreadInfo->totalAffectedRows += affectedRows;
|
||||||
|
|
||||||
int64_t currentPrintTime = taosGetTimestampMs();
|
int64_t currentPrintTime = taosGetTimestampMs();
|
||||||
if (currentPrintTime - lastPrintTime > 30*1000) {
|
if (currentPrintTime - lastPrintTime > 30*1000) {
|
||||||
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
|
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
|
||||||
|
@ -4703,7 +4711,8 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
|
|
||||||
if (insert_interval > ((et - st)/1000) ) {
|
if (insert_interval > ((et - st)/1000) ) {
|
||||||
int sleep_time = insert_interval - (et -st)/1000;
|
int sleep_time = insert_interval - (et -st)/1000;
|
||||||
verbosePrint("%s() LN%d sleep: %d ms for insert interval\n", __func__, __LINE__, sleep_time);
|
performancePrint("%s() LN%d sleep: %d ms for insert interval\n",
|
||||||
|
__func__, __LINE__, sleep_time);
|
||||||
taosMsleep(sleep_time); // ms
|
taosMsleep(sleep_time); // ms
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4743,7 +4752,7 @@ static void* syncWrite(void *sarg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void callBack(void *param, TAOS_RES *res, int code) {
|
static void callBack(void *param, TAOS_RES *res, int code) {
|
||||||
threadInfo* winfo = (threadInfo*)param;
|
threadInfo* winfo = (threadInfo*)param;
|
||||||
SSuperTable* superTblInfo = winfo->superTblInfo;
|
SSuperTable* superTblInfo = winfo->superTblInfo;
|
||||||
|
|
||||||
|
@ -4802,7 +4811,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *asyncWrite(void *sarg) {
|
static void *asyncWrite(void *sarg) {
|
||||||
threadInfo *winfo = (threadInfo *)sarg;
|
threadInfo *winfo = (threadInfo *)sarg;
|
||||||
SSuperTable* superTblInfo = winfo->superTblInfo;
|
SSuperTable* superTblInfo = winfo->superTblInfo;
|
||||||
|
|
||||||
|
@ -5084,7 +5093,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
||||||
free(infos);
|
free(infos);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *readTable(void *sarg) {
|
static void *readTable(void *sarg) {
|
||||||
#if 1
|
#if 1
|
||||||
threadInfo *rinfo = (threadInfo *)sarg;
|
threadInfo *rinfo = (threadInfo *)sarg;
|
||||||
TAOS *taos = rinfo->taos;
|
TAOS *taos = rinfo->taos;
|
||||||
|
@ -5155,7 +5164,7 @@ void *readTable(void *sarg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *readMetric(void *sarg) {
|
static void *readMetric(void *sarg) {
|
||||||
#if 1
|
#if 1
|
||||||
threadInfo *rinfo = (threadInfo *)sarg;
|
threadInfo *rinfo = (threadInfo *)sarg;
|
||||||
TAOS *taos = rinfo->taos;
|
TAOS *taos = rinfo->taos;
|
||||||
|
@ -5318,7 +5327,7 @@ static int insertTestProcess() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *superQueryProcess(void *sarg) {
|
static void *superQueryProcess(void *sarg) {
|
||||||
threadInfo *winfo = (threadInfo *)sarg;
|
threadInfo *winfo = (threadInfo *)sarg;
|
||||||
|
|
||||||
//char sqlStr[MAX_TB_NAME_SIZE*2];
|
//char sqlStr[MAX_TB_NAME_SIZE*2];
|
||||||
|
@ -5583,7 +5592,7 @@ static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultF
|
||||||
return tsub;
|
return tsub;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *subSubscribeProcess(void *sarg) {
|
static void *subSubscribeProcess(void *sarg) {
|
||||||
threadInfo *winfo = (threadInfo *)sarg;
|
threadInfo *winfo = (threadInfo *)sarg;
|
||||||
char subSqlstr[1024];
|
char subSqlstr[1024];
|
||||||
|
|
||||||
|
@ -5650,7 +5659,7 @@ void *subSubscribeProcess(void *sarg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *superSubscribeProcess(void *sarg) {
|
static void *superSubscribeProcess(void *sarg) {
|
||||||
threadInfo *winfo = (threadInfo *)sarg;
|
threadInfo *winfo = (threadInfo *)sarg;
|
||||||
|
|
||||||
char sqlStr[MAX_TB_NAME_SIZE*2];
|
char sqlStr[MAX_TB_NAME_SIZE*2];
|
||||||
|
@ -5826,7 +5835,7 @@ static int subscribeTestProcess() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void initOfInsertMeta() {
|
static void initOfInsertMeta() {
|
||||||
memset(&g_Dbs, 0, sizeof(SDbs));
|
memset(&g_Dbs, 0, sizeof(SDbs));
|
||||||
|
|
||||||
// set default values
|
// set default values
|
||||||
|
@ -5838,7 +5847,7 @@ void initOfInsertMeta() {
|
||||||
g_Dbs.use_metric = true;
|
g_Dbs.use_metric = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void initOfQueryMeta() {
|
static void initOfQueryMeta() {
|
||||||
memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo));
|
memset(&g_queryInfo, 0, sizeof(SQueryMetaInfo));
|
||||||
|
|
||||||
// set default values
|
// set default values
|
||||||
|
@ -5848,7 +5857,7 @@ void initOfQueryMeta() {
|
||||||
tstrncpy(g_queryInfo.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE);
|
tstrncpy(g_queryInfo.password, TSDB_DEFAULT_PASS, MAX_DB_NAME_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
void setParaFromArg(){
|
static void setParaFromArg(){
|
||||||
if (g_args.host) {
|
if (g_args.host) {
|
||||||
strcpy(g_Dbs.host, g_args.host);
|
strcpy(g_Dbs.host, g_args.host);
|
||||||
} else {
|
} else {
|
||||||
|
@ -5989,7 +5998,7 @@ static int isCommentLine(char *line) {
|
||||||
return regexMatch(line, "^\\s*#.*", REG_EXTENDED);
|
return regexMatch(line, "^\\s*#.*", REG_EXTENDED);
|
||||||
}
|
}
|
||||||
|
|
||||||
void querySqlFile(TAOS* taos, char* sqlFile)
|
static void querySqlFile(TAOS* taos, char* sqlFile)
|
||||||
{
|
{
|
||||||
FILE *fp = fopen(sqlFile, "r");
|
FILE *fp = fopen(sqlFile, "r");
|
||||||
if (fp == NULL) {
|
if (fp == NULL) {
|
||||||
|
|
|
@ -5,11 +5,13 @@
|
||||||
"port": 6030,
|
"port": 6030,
|
||||||
"user": "root",
|
"user": "root",
|
||||||
"password": "taosdata",
|
"password": "taosdata",
|
||||||
"thread_count": 4,
|
"thread_count": 2,
|
||||||
|
"num_of_records_per_req": 10,
|
||||||
"thread_count_create_tbl": 4,
|
"thread_count_create_tbl": 4,
|
||||||
"databases": [{
|
"databases": [{
|
||||||
"dbinfo": {
|
"dbinfo": {
|
||||||
"name": "db01",
|
"name": "db01",
|
||||||
|
"drop": "yes",
|
||||||
"replica": 1,
|
"replica": 1,
|
||||||
"days": 10,
|
"days": 10,
|
||||||
"cache": 16,
|
"cache": 16,
|
||||||
|
@ -20,31 +22,23 @@
|
||||||
},
|
},
|
||||||
"super_tables": [{
|
"super_tables": [{
|
||||||
"name": "stb01",
|
"name": "stb01",
|
||||||
"childtable_count": 100,
|
"childtable_count": 3,
|
||||||
"childtable_prefix": "stb01_",
|
"childtable_prefix": "stb01_",
|
||||||
"auto_create_table": "no",
|
"auto_create_table": "no",
|
||||||
"data_source": "rand",
|
"data_source": "rand",
|
||||||
"insert_mode": "taosc",
|
"insert_mode": "taosc",
|
||||||
"insert_rate": 0,
|
"insert_rate": 0,
|
||||||
"insert_rows": 1000,
|
"insert_rows": 20,
|
||||||
"timestamp_step": 1000,
|
"timestamp_step": 1000,
|
||||||
"start_timestamp": "2020-10-01 00:00:00.000",
|
"start_timestamp": "2020-10-01 00:00:00.000",
|
||||||
"sample_format": "csv",
|
"sample_format": "csv",
|
||||||
"sample_file": "/home/data/sample.csv",
|
"sample_file": "/home/data/sample.csv",
|
||||||
"tags_file": "",
|
"tags_file": "",
|
||||||
"columns": [{
|
"columns": [{
|
||||||
"type": "SMALLINT"
|
"type": "INT"
|
||||||
}, {
|
|
||||||
"type": "BOOL"
|
|
||||||
}, {
|
|
||||||
"type": "BINARY",
|
|
||||||
"len": 6
|
|
||||||
}],
|
}],
|
||||||
"tags": [{
|
"tags": [{
|
||||||
"type": "INT"
|
"type": "INT"
|
||||||
},{
|
|
||||||
"type": "BINARY",
|
|
||||||
"len": 4
|
|
||||||
}]
|
}]
|
||||||
}]
|
}]
|
||||||
}]
|
}]
|
||||||
|
|
|
@ -63,7 +63,7 @@ class TDTestCase:
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
|
|
||||||
tdSql.query(
|
tdSql.query(
|
||||||
"select apercentile(col1, 1) from test.meters interval(10s)")
|
"select apercentile(col1, 1) from test.meters interval(100s)")
|
||||||
tdSql.checkRows(1)
|
tdSql.checkRows(1)
|
||||||
|
|
||||||
tdSql.error("select loc, count(loc) from test.meters")
|
tdSql.error("select loc, count(loc) from test.meters")
|
||||||
|
|
|
@ -24,9 +24,6 @@ class TDTestCase:
|
||||||
tdLog.debug("start to execute %s" % __file__)
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
tdSql.init(conn.cursor(), logSql)
|
tdSql.init(conn.cursor(), logSql)
|
||||||
|
|
||||||
self.numberOfTables = 10000
|
|
||||||
self.numberOfRecords = 100
|
|
||||||
|
|
||||||
def getBuildPath(self):
|
def getBuildPath(self):
|
||||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
@ -55,7 +52,7 @@ class TDTestCase:
|
||||||
|
|
||||||
tdSql.execute("use db01")
|
tdSql.execute("use db01")
|
||||||
tdSql.query("select count(*) from stb01")
|
tdSql.query("select count(*) from stb01")
|
||||||
tdSql.checkData(0, 0, 100000)
|
tdSql.checkData(0, 0, 60)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
Loading…
Reference in New Issue