Feature/sangshuduo/td 3317 taosdemo interlace (#5515)
* [TD-3316] <fix>: add testcase for taosdemo limit and offset. check offset 0. * [TD-3316] <fix>: add testcase for taosdemo limit and offset. fix sample file import bug. * [TD-3316] <fix>: add test case for limit and offset. fix sample data issue. * [TD-3327] <fix>: fix taosdemo segfault when import data from sample data file. * [TD-3317] <feature>: make taosdemo support interlace mode. json parameter rows_per_tbl support. * [TD-3317] <feature>: support interlace mode. refactor * [TD-3317] <feature>: support interlace mode. refactor * [TD-3317] <feature>: support interlace mode insertion. refactor. * [TD-3317] <feature>: support interlace mode insertion. change json file. * [TD-3317] <feature>: support interlace mode insertion. fix multithread create table regression. * [TD-3317] <feature>: support interlace mode insertion. working but not perfect. * [TD-3317] <feature>: support interlace mode insertion. rename lowaTest with taosdemoTestWithJson * [TD-3317] <feature>: support interlace mode insertion. perfect * [TD-3317] <feature>: support interlace mode insertion. cleanup. * [TD-3317] <feature>: support interlace mode insertion. adjust algorithm of loop times. * [TD-3317] <feature>: support interlace mode insertion. fix delay time bug. * [TD-3317] <feature>: support interlace mode insertion. fix progressive timestamp bug. * [TD-3317] <feature>: support interlace mode insertion. add an option for performance print. * [TD-3317] <feature>: support interlace mode insertion. change json test case with less table for acceleration. * [TD-3317] <feature>: support interlace mode insertion. change progressive mode timestamp step and testcase. * [TD-3197] <fix>: fix taosdemo coverity scan issues. * [TD-3197] <fix>: fix taosdemo coverity scan issue. fix subscribeTest pids uninitialized. * [TD-3317] <feature>: support interlace mode insertion. add time shift for no sleep time. Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
parent
e555bb974c
commit
30b2611257
|
@ -2633,10 +2633,10 @@ static void createChildTables() {
|
||||||
if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
|
if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
|
||||||
|| (strncasecmp(g_args.datatype[j],
|
|| (strncasecmp(g_args.datatype[j],
|
||||||
"NCHAR", strlen("NCHAR")) == 0)) {
|
"NCHAR", strlen("NCHAR")) == 0)) {
|
||||||
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE,
|
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
|
||||||
", COL%d %s(60)", j, g_args.datatype[j]);
|
", COL%d %s(60)", j, g_args.datatype[j]);
|
||||||
} else {
|
} else {
|
||||||
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE,
|
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
|
||||||
", COL%d %s", j, g_args.datatype[j]);
|
", COL%d %s", j, g_args.datatype[j]);
|
||||||
}
|
}
|
||||||
len = strlen(tblColsBuf);
|
len = strlen(tblColsBuf);
|
||||||
|
@ -4319,7 +4319,8 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
|
||||||
return k;
|
return k;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int generateSQLHead(char *tableName, int32_t tableSeq, threadInfo* pThreadInfo, SSuperTable* superTblInfo, char *buffer)
|
static int generateSQLHead(char *tableName, int32_t tableSeq,
|
||||||
|
threadInfo* pThreadInfo, SSuperTable* superTblInfo, char *buffer)
|
||||||
{
|
{
|
||||||
int len;
|
int len;
|
||||||
if (superTblInfo) {
|
if (superTblInfo) {
|
||||||
|
@ -4440,7 +4441,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
|
|
||||||
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
|
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
|
||||||
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
|
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
|
||||||
int timeStempStep = superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
|
|
||||||
uint64_t st = 0;
|
uint64_t st = 0;
|
||||||
uint64_t et = 0xffffffff;
|
uint64_t et = 0xffffffff;
|
||||||
|
|
||||||
|
@ -4475,6 +4475,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
int generatedRecPerTbl = 0;
|
int generatedRecPerTbl = 0;
|
||||||
bool flagSleep = true;
|
bool flagSleep = true;
|
||||||
int sleepTimeTotal = 0;
|
int sleepTimeTotal = 0;
|
||||||
|
int timeShift = 0;
|
||||||
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
|
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
|
||||||
if ((flagSleep) && (insert_interval)) {
|
if ((flagSleep) && (insert_interval)) {
|
||||||
st = taosGetTimestampUs();
|
st = taosGetTimestampUs();
|
||||||
|
@ -4512,7 +4513,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
generateDataTail(
|
generateDataTail(
|
||||||
tableName, tableSeq, pThreadInfo, superTblInfo,
|
tableName, tableSeq, pThreadInfo, superTblInfo,
|
||||||
batchPerTbl, pstr, insertRows, 0,
|
batchPerTbl, pstr, insertRows, 0,
|
||||||
startTime + sleepTimeTotal + 0 * timeStempStep,
|
startTime + timeShift + sleepTimeTotal,
|
||||||
&(pThreadInfo->samplePos), &dataLen);
|
&(pThreadInfo->samplePos), &dataLen);
|
||||||
pstr += dataLen;
|
pstr += dataLen;
|
||||||
recOfBatch += batchPerTbl;
|
recOfBatch += batchPerTbl;
|
||||||
|
@ -4521,6 +4522,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
pThreadInfo->threadID, __func__, __LINE__,
|
pThreadInfo->threadID, __func__, __LINE__,
|
||||||
batchPerTbl, recOfBatch);
|
batchPerTbl, recOfBatch);
|
||||||
|
|
||||||
|
timeShift ++;
|
||||||
tableSeq ++;
|
tableSeq ++;
|
||||||
if (insertMode == INTERLACE_INSERT_MODE) {
|
if (insertMode == INTERLACE_INSERT_MODE) {
|
||||||
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
|
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
|
||||||
|
@ -5486,7 +5488,11 @@ static int queryTestProcess() {
|
||||||
char sqlStr[MAX_TB_NAME_SIZE*2];
|
char sqlStr[MAX_TB_NAME_SIZE*2];
|
||||||
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
||||||
verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
|
verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
|
||||||
(void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE);
|
if (0 != queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE)) {
|
||||||
|
errorPrint( "use database %s failed!\n\n",
|
||||||
|
g_queryInfo.dbName);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
t_info->taos = NULL;
|
t_info->taos = NULL;
|
||||||
}
|
}
|
||||||
|
@ -5756,22 +5762,27 @@ static int subscribeTestProcess() {
|
||||||
pthread_t *pids = NULL;
|
pthread_t *pids = NULL;
|
||||||
threadInfo *infos = NULL;
|
threadInfo *infos = NULL;
|
||||||
//==== create sub threads for query from super table
|
//==== create sub threads for query from super table
|
||||||
if (g_queryInfo.superQueryInfo.sqlCount > 0
|
if ((g_queryInfo.superQueryInfo.sqlCount <= 0) ||
|
||||||
&& g_queryInfo.superQueryInfo.concurrent > 0) {
|
(g_queryInfo.superQueryInfo.concurrent <= 0)) {
|
||||||
pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t));
|
errorPrint("%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n",
|
||||||
infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo));
|
__func__, __LINE__, g_queryInfo.superQueryInfo.sqlCount,
|
||||||
if ((NULL == pids) || (NULL == infos)) {
|
g_queryInfo.superQueryInfo.concurrent);
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t));
|
||||||
|
infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo));
|
||||||
|
if ((NULL == pids) || (NULL == infos)) {
|
||||||
printf("malloc failed for create threads\n");
|
printf("malloc failed for create threads\n");
|
||||||
taos_close(taos);
|
taos_close(taos);
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
|
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
|
||||||
threadInfo *t_info = infos + i;
|
threadInfo *t_info = infos + i;
|
||||||
t_info->threadID = i;
|
t_info->threadID = i;
|
||||||
t_info->taos = taos;
|
t_info->taos = taos;
|
||||||
pthread_create(pids + i, NULL, superSubscribeProcess, t_info);
|
pthread_create(pids + i, NULL, superSubscribeProcess, t_info);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//==== create sub threads for query from sub table
|
//==== create sub threads for query from sub table
|
||||||
|
@ -6031,7 +6042,6 @@ static void querySqlFile(TAOS* taos, char* sqlFile)
|
||||||
|
|
||||||
memcpy(cmd + cmd_len, line, read_len);
|
memcpy(cmd + cmd_len, line, read_len);
|
||||||
verbosePrint("%s() LN%d cmd: %s\n", __func__, __LINE__, cmd);
|
verbosePrint("%s() LN%d cmd: %s\n", __func__, __LINE__, cmd);
|
||||||
queryDbExec(taos, cmd, NO_INSERT_TYPE);
|
|
||||||
if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE)) {
|
if (0 != queryDbExec(taos, cmd, NO_INSERT_TYPE)) {
|
||||||
printf("queryDbExec %s failed!\n", cmd);
|
printf("queryDbExec %s failed!\n", cmd);
|
||||||
tmfree(cmd);
|
tmfree(cmd);
|
||||||
|
|
Loading…
Reference in New Issue