Feature/sangshuduo/td 3317 taosdemo interlace (#5593)
* [TD-3316] <fix>: add testcase for taosdemo limit and offset. check offset 0. * [TD-3316] <fix>: add testcase for taosdemo limit and offset. fix sample file import bug. * [TD-3316] <fix>: add test case for limit and offset. fix sample data issue. * [TD-3327] <fix>: fix taosdemo segfault when import data from sample data file. * [TD-3317] <feature>: make taosdemo support interlace mode. json parameter rows_per_tbl support. * [TD-3317] <feature>: support interlace mode. refactor * [TD-3317] <feature>: support interlace mode. refactor * [TD-3317] <feature>: support interlace mode insertion. refactor. * [TD-3317] <feature>: support interlace mode insertion. change json file. * [TD-3317] <feature>: support interlace mode insertion. fix multithread create table regression. * [TD-3317] <feature>: support interlace mode insertion. working but not perfect. * [TD-3317] <feature>: support interlace mode insertion. rename lowaTest with taosdemoTestWithJson * [TD-3317] <feature>: support interlace mode insertion. perfect * [TD-3317] <feature>: support interlace mode insertion. cleanup. * [TD-3317] <feature>: support interlace mode insertion. adjust algorithm of loop times. * [TD-3317] <feature>: support interlace mode insertion. fix delay time bug. * [TD-3317] <feature>: support interlace mode insertion. fix progressive timestamp bug. * [TD-3317] <feature>: support interlace mode insertion. add an option for performance print. * [TD-3317] <feature>: support interlace mode insertion. change json test case with less table for acceleration. * [TD-3317] <feature>: support interlace mode insertion. change progressive mode timestamp step and testcase. * [TD-3197] <fix>: fix taosdemo coverity scan issues. * [TD-3197] <fix>: fix taosdemo coverity scan issue. fix subscribeTest pids uninitialized. * [TD-3317] <feature>: support interlace mode insertion. add time shift for no sleep time. * [TD-3317] <feature>: support interlace insert. rework timestamp. * [TD-3317] <feature>: support interlace mode insertion. change rows_per_tbl to interlace_rows. Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
parent
c89ada70fd
commit
4acaa5dfc0
|
@ -393,6 +393,7 @@ typedef struct SThreadInfo_S {
|
||||||
TAOS *taos;
|
TAOS *taos;
|
||||||
int threadID;
|
int threadID;
|
||||||
char db_name[MAX_DB_NAME_SIZE+1];
|
char db_name[MAX_DB_NAME_SIZE+1];
|
||||||
|
uint32_t time_precision;
|
||||||
char fp[4096];
|
char fp[4096];
|
||||||
char tb_prefix[MAX_TB_NAME_SIZE];
|
char tb_prefix[MAX_TB_NAME_SIZE];
|
||||||
int start_table_from;
|
int start_table_from;
|
||||||
|
@ -1269,7 +1270,6 @@ static void printfInsertMetaToFile(FILE* fp) {
|
||||||
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
|
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
|
||||||
fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
|
fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
|
||||||
fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl);
|
fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl);
|
||||||
fprintf(fp, "insert interval: %d\n", g_args.insert_interval);
|
|
||||||
fprintf(fp, "number of records per req: %d\n", g_args.num_of_RPR);
|
fprintf(fp, "number of records per req: %d\n", g_args.num_of_RPR);
|
||||||
fprintf(fp, "max sql length: %d\n", g_args.max_sql_len);
|
fprintf(fp, "max sql length: %d\n", g_args.max_sql_len);
|
||||||
fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
|
fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
|
||||||
|
@ -1355,7 +1355,10 @@ static void printfInsertMetaToFile(FILE* fp) {
|
||||||
fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource);
|
fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource);
|
||||||
fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode);
|
fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode);
|
||||||
fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows);
|
fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows);
|
||||||
|
fprintf(fp, " interlace rows: %d\n", g_Dbs.db[i].superTbls[j].interlaceRows);
|
||||||
|
if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) {
|
||||||
fprintf(fp, " insert interval: %d\n", g_Dbs.db[i].superTbls[j].insertInterval);
|
fprintf(fp, " insert interval: %d\n", g_Dbs.db[i].superTbls[j].insertInterval);
|
||||||
|
}
|
||||||
|
|
||||||
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
|
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
|
||||||
fprintf(fp, " multiThreadWriteOneTbl: no\n");
|
fprintf(fp, " multiThreadWriteOneTbl: no\n");
|
||||||
|
@ -4444,7 +4447,7 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int generateDataBuffer(char *pTblName,
|
static int generateProgressiveDataBuffer(char *pTblName,
|
||||||
int32_t tableSeq,
|
int32_t tableSeq,
|
||||||
threadInfo *pThreadInfo, char *buffer,
|
threadInfo *pThreadInfo, char *buffer,
|
||||||
int64_t insertRows,
|
int64_t insertRows,
|
||||||
|
@ -4584,6 +4587,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
verbosePrint("[%d] %s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n",
|
verbosePrint("[%d] %s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__,
|
pThreadInfo->threadID, __func__, __LINE__,
|
||||||
i, batchPerTblTimes, batchPerTbl);
|
i, batchPerTblTimes, batchPerTbl);
|
||||||
|
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
|
||||||
|
startTime = taosGetTimestamp(pThreadInfo->time_precision);
|
||||||
|
}
|
||||||
generateDataTail(
|
generateDataTail(
|
||||||
tableName, tableSeq, pThreadInfo, superTblInfo,
|
tableName, tableSeq, pThreadInfo, superTblInfo,
|
||||||
batchPerTbl, pstr, insertRows, 0,
|
batchPerTbl, pstr, insertRows, 0,
|
||||||
|
@ -4716,10 +4722,11 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
|
|
||||||
int timeStampStep =
|
int timeStampStep =
|
||||||
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
|
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
|
||||||
int insert_interval =
|
/* int insert_interval =
|
||||||
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
|
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
|
||||||
uint64_t st = 0;
|
uint64_t st = 0;
|
||||||
uint64_t et = 0xffffffff;
|
uint64_t et = 0xffffffff;
|
||||||
|
*/
|
||||||
|
|
||||||
pThreadInfo->totalInsertRows = 0;
|
pThreadInfo->totalInsertRows = 0;
|
||||||
pThreadInfo->totalAffectedRows = 0;
|
pThreadInfo->totalAffectedRows = 0;
|
||||||
|
@ -4735,9 +4742,11 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
|
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
|
||||||
|
|
||||||
for (int64_t i = 0; i < insertRows;) {
|
for (int64_t i = 0; i < insertRows;) {
|
||||||
|
/*
|
||||||
if (insert_interval) {
|
if (insert_interval) {
|
||||||
st = taosGetTimestampUs();
|
st = taosGetTimestampUs();
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
char tableName[TSDB_TABLE_NAME_LEN];
|
char tableName[TSDB_TABLE_NAME_LEN];
|
||||||
getTableName(tableName, pThreadInfo, tableSeq);
|
getTableName(tableName, pThreadInfo, tableSeq);
|
||||||
|
@ -4745,7 +4754,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
__func__, __LINE__,
|
__func__, __LINE__,
|
||||||
pThreadInfo->threadID, tableSeq, tableName);
|
pThreadInfo->threadID, tableSeq, tableName);
|
||||||
|
|
||||||
int generated = generateDataBuffer(
|
int generated = generateProgressiveDataBuffer(
|
||||||
tableName, tableSeq, pThreadInfo, buffer, insertRows,
|
tableName, tableSeq, pThreadInfo, buffer, insertRows,
|
||||||
i, start_time,
|
i, start_time,
|
||||||
&(pThreadInfo->samplePos));
|
&(pThreadInfo->samplePos));
|
||||||
|
@ -4787,7 +4796,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
|
|
||||||
if (i >= insertRows)
|
if (i >= insertRows)
|
||||||
break;
|
break;
|
||||||
|
/*
|
||||||
if (insert_interval) {
|
if (insert_interval) {
|
||||||
et = taosGetTimestampUs();
|
et = taosGetTimestampUs();
|
||||||
|
|
||||||
|
@ -4798,6 +4807,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
taosMsleep(sleep_time); // ms
|
taosMsleep(sleep_time); // ms
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
} // num_of_DPT
|
} // num_of_DPT
|
||||||
|
|
||||||
if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo &&
|
if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo &&
|
||||||
|
@ -5061,6 +5071,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
||||||
threadInfo *t_info = infos + i;
|
threadInfo *t_info = infos + i;
|
||||||
t_info->threadID = i;
|
t_info->threadID = i;
|
||||||
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
|
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
|
||||||
|
t_info->time_precision = timePrec;
|
||||||
t_info->superTblInfo = superTblInfo;
|
t_info->superTblInfo = superTblInfo;
|
||||||
|
|
||||||
t_info->start_time = start_time;
|
t_info->start_time = start_time;
|
||||||
|
|
Loading…
Reference in New Issue