Feature/sangshuduo/td 4068 taosdemo stmt (#6166)

* merge with develop branch.

change query/tests/CMakeLists.txt to allow unused function and variable.

* refactor data generating.

* refactor.

* refactor.

* refactor.

Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
Shuduo Sang 2021-05-20 13:11:48 +08:00 committed by GitHub
parent b05e9dc640
commit 0a07ea10ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 213 additions and 170 deletions

View File

@ -70,6 +70,8 @@ enum TEST_MODE {
#define MAX_RECORDS_PER_REQ 32766
#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into ..
#define MAX_SQL_SIZE 65536
#define BUFFER_SIZE (65536*2)
#define COND_BUF_LEN BUFFER_SIZE - 30
@ -1079,8 +1081,6 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
static bool getInfoFromJsonFile(char* file);
//static int generateOneRowDataForStb(SSuperTable* stbInfo);
//static int getDataIntoMemForStb(SSuperTable* stbInfo);
static void init_rand_data();
static void tmfclose(FILE *fp) {
if (NULL != fp) {
@ -1180,7 +1180,8 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) {
totalLen += len;
}
verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n", __func__, __LINE__, databuf, resultFile);
verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n",
__func__, __LINE__, databuf, resultFile);
appendResultBufToFile(databuf, resultFile);
free(databuf);
}
@ -1640,8 +1641,10 @@ static void printfInsertMetaToFile(FILE* fp) {
*/
fprintf(fp, " interlaceRows: %"PRIu64"\n",
g_Dbs.db[i].superTbls[j].interlaceRows);
fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange);
fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio);
fprintf(fp, " disorderRange: %d\n",
g_Dbs.db[i].superTbls[j].disorderRange);
fprintf(fp, " disorderRatio: %d\n",
g_Dbs.db[i].superTbls[j].disorderRatio);
fprintf(fp, " maxSqlLen: %"PRIu64"\n",
g_Dbs.db[i].superTbls[j].maxSqlLen);
@ -1649,11 +1652,15 @@ static void printfInsertMetaToFile(FILE* fp) {
g_Dbs.db[i].superTbls[j].timeStampStep);
fprintf(fp, " startTimestamp: %s\n",
g_Dbs.db[i].superTbls[j].startTimestamp);
fprintf(fp, " sampleFormat: %s\n", g_Dbs.db[i].superTbls[j].sampleFormat);
fprintf(fp, " sampleFile: %s\n", g_Dbs.db[i].superTbls[j].sampleFile);
fprintf(fp, " tagsFile: %s\n", g_Dbs.db[i].superTbls[j].tagsFile);
fprintf(fp, " sampleFormat: %s\n",
g_Dbs.db[i].superTbls[j].sampleFormat);
fprintf(fp, " sampleFile: %s\n",
g_Dbs.db[i].superTbls[j].sampleFile);
fprintf(fp, " tagsFile: %s\n",
g_Dbs.db[i].superTbls[j].tagsFile);
fprintf(fp, " columnCount: %d\n ", g_Dbs.db[i].superTbls[j].columnCount);
fprintf(fp, " columnCount: %d\n ",
g_Dbs.db[i].superTbls[j].columnCount);
for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) {
//printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
if ((0 == strncasecmp(
@ -1665,7 +1672,8 @@ static void printfInsertMetaToFile(FILE* fp) {
g_Dbs.db[i].superTbls[j].columns[k].dataType,
g_Dbs.db[i].superTbls[j].columns[k].dataLen);
} else {
fprintf(fp, "column[%d]:%s ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType);
fprintf(fp, "column[%d]:%s ",
k, g_Dbs.db[i].superTbls[j].columns[k].dataType);
}
}
fprintf(fp, "\n");
@ -4637,16 +4645,22 @@ static int getRowDataFromSample(
return dataLen;
}
static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stbInfo) {
static int64_t generateStbRowData(
SSuperTable* stbInfo,
char* recBuf, int64_t timestamp
) {
int64_t dataLen = 0;
char *pstr = recBuf;
int64_t maxLen = MAX_DATA_SIZE;
dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "(%" PRId64 ",", timestamp);
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"(%" PRId64 ",", timestamp);
for (int i = 0; i < stbInfo->columnCount; i++) {
if ((0 == strncasecmp(stbInfo->columns[i].dataType, "BINARY", strlen("BINARY")))
|| (0 == strncasecmp(stbInfo->columns[i].dataType, "NCHAR", strlen("NCHAR")))) {
if ((0 == strncasecmp(stbInfo->columns[i].dataType,
"BINARY", strlen("BINARY")))
|| (0 == strncasecmp(stbInfo->columns[i].dataType,
"NCHAR", strlen("NCHAR")))) {
if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint( "binary or nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
@ -4708,7 +4722,7 @@ static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stb
}
static int64_t generateData(char *recBuf, char **data_type,
int num_of_cols, int64_t timestamp, int lenOfBinary) {
int64_t timestamp, int lenOfBinary) {
memset(recBuf, 0, MAX_DATA_SIZE);
char *pstr = recBuf;
pstr += sprintf(pstr, "(%" PRId64, timestamp);
@ -4859,70 +4873,24 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, uint64_t table
}
}
static int64_t generateDataTail(
SSuperTable* superTblInfo,
uint64_t batch, char* buffer, int64_t remainderBufLen, int64_t insertRows,
uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, int64_t *dataLen) {
static int64_t generateDataTailWithoutStb(
uint64_t batch, char* buffer,
int64_t remainderBufLen, int64_t insertRows,
uint64_t startFrom, int64_t startTime,
/* int64_t *pSamplePos, */int64_t *dataLen) {
uint64_t len = 0;
uint32_t ncols_per_record = 1; // count first col ts
char *pstr = buffer;
if (superTblInfo == NULL) {
uint32_t datatypeSeq = 0;
while(g_args.datatype[datatypeSeq]) {
datatypeSeq ++;
ncols_per_record ++;
}
}
verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch);
uint64_t k = 0;
int64_t k = 0;
for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
int64_t retLen = 0;
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample"))) {
retLen = getRowDataFromSample(
data,
remainderBufLen,
startTime + superTblInfo->timeStampStep * k,
superTblInfo,
pSamplePos);
} else if (0 == strncasecmp(superTblInfo->dataSource,
"rand", strlen("rand"))) {
int64_t randTail = superTblInfo->timeStampStep * k;
if (superTblInfo->disorderRatio > 0) {
int rand_num = taosRandom() % 100;
if(rand_num < superTblInfo->disorderRatio) {
randTail = (randTail + (taosRandom() % superTblInfo->disorderRange + 1)) * (-1);
debugPrint("rand data generated, back %"PRId64"\n", randTail);
}
}
int64_t d = startTime
+ randTail;
retLen = generateRowData(
data,
d,
superTblInfo);
}
if (retLen > remainderBufLen) {
break;
}
pstr += snprintf(pstr , retLen + 1, "%s", data);
k++;
len += retLen;
remainderBufLen -= retLen;
} else {
char **data_type = g_args.datatype;
int lenOfBinary = g_args.len_of_binary;
@ -4931,7 +4899,8 @@ static int64_t generateDataTail(
if (g_args.disorderRatio != 0) {
int rand_num = taosRandom() % 100;
if (rand_num < g_args.disorderRatio) {
randTail = (randTail + (taosRandom() % g_args.disorderRange + 1)) * (-1);
randTail = (randTail +
(taosRandom() % g_args.disorderRange + 1)) * (-1);
debugPrint("rand data generated, back %"PRId64"\n", randTail);
}
@ -4940,7 +4909,6 @@ static int64_t generateDataTail(
}
retLen = generateData(data, data_type,
ncols_per_record,
startTime + randTail,
lenOfBinary);
@ -4951,7 +4919,6 @@ static int64_t generateDataTail(
k++;
len += retLen;
remainderBufLen -= retLen;
}
verbosePrint("%s() LN%d len=%"PRIu64" k=%"PRIu64" \nbuffer=%s\n",
__func__, __LINE__, len, k, buffer);
@ -4967,16 +4934,106 @@ static int64_t generateDataTail(
return k;
}
static int generateSQLHead(char *tableName, int32_t tableSeq,
threadInfo* pThreadInfo, SSuperTable* superTblInfo,
static int64_t generateStbDataTail(
SSuperTable* superTblInfo,
uint64_t batch, char* buffer,
int64_t remainderBufLen, int64_t insertRows,
uint64_t startFrom, int64_t startTime,
int64_t *pSamplePos, int64_t *dataLen) {
uint64_t len = 0;
char *pstr = buffer;
verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch);
int64_t k = 0;
for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
int64_t retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample"))) {
retLen = getRowDataFromSample(
data,
remainderBufLen,
startTime + superTblInfo->timeStampStep * k,
superTblInfo,
pSamplePos);
} else if (0 == strncasecmp(superTblInfo->dataSource,
"rand", strlen("rand"))) {
int64_t randTail = superTblInfo->timeStampStep * k;
if (superTblInfo->disorderRatio > 0) {
int rand_num = taosRandom() % 100;
if(rand_num < superTblInfo->disorderRatio) {
randTail = (randTail +
(taosRandom() % superTblInfo->disorderRange + 1)) * (-1);
debugPrint("rand data generated, back %"PRId64"\n", randTail);
}
}
int64_t d = startTime + randTail;
retLen = generateStbRowData(superTblInfo, data, d);
}
if (retLen > remainderBufLen) {
break;
}
pstr += snprintf(pstr , retLen + 1, "%s", data);
k++;
len += retLen;
remainderBufLen -= retLen;
verbosePrint("%s() LN%d len=%"PRIu64" k=%"PRIu64" \nbuffer=%s\n",
__func__, __LINE__, len, k, buffer);
startFrom ++;
if (startFrom >= insertRows) {
break;
}
}
*dataLen = len;
return k;
}
static int generateSQLHeadWithoutStb(char *tableName,
char *dbName,
char *buffer, int remainderBufLen)
{
int len;
char headBuf[HEAD_BUFF_LEN];
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
dbName,
tableName);
if (len > remainderBufLen)
return -1;
tstrncpy(buffer, headBuf, len + 1);
return len;
}
static int generateStbSQLHead(
SSuperTable* superTblInfo,
char *tableName, int32_t tableSeq,
char *dbName,
char *buffer, int remainderBufLen)
{
int len;
#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into ..
char headBuf[HEAD_BUFF_LEN];
if (superTblInfo) {
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
@ -4996,9 +5053,9 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
headBuf,
HEAD_BUFF_LEN,
"%s.%s using %s.%s tags %s values",
pThreadInfo->db_name,
dbName,
tableName,
pThreadInfo->db_name,
dbName,
superTblInfo->sTblName,
tagsValBuf);
tmfree(tagsValBuf);
@ -5007,22 +5064,14 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
pThreadInfo->db_name,
dbName,
tableName);
} else {
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
pThreadInfo->db_name,
tableName);
}
} else {
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
pThreadInfo->db_name,
dbName,
tableName);
}
@ -5046,8 +5095,10 @@ static int64_t generateInterlaceDataBuffer(
char *pstr = buffer;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo,
superTblInfo, pstr, *pRemainderBufLen);
int headLen = generateStbSQLHead(
superTblInfo,
tableName, tableSeq, pThreadInfo->db_name,
pstr, *pRemainderBufLen);
if (headLen <= 0) {
return 0;
@ -5065,19 +5116,24 @@ static int64_t generateInterlaceDataBuffer(
pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl);
int64_t k;
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
startTime = taosGetTimestamp(pThreadInfo->time_precision);
}
} else {
startTime = 1500000000000;
}
int64_t k = generateDataTail(
k = generateStbDataTail(
superTblInfo,
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime,
&(pThreadInfo->samplePos), &dataLen);
} else {
startTime = 1500000000000;
k = generateDataTailWithoutStb(
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime,
/* &(pThreadInfo->samplePos), */&dataLen);
}
if (k == batchPerTbl) {
pstr += dataLen;
@ -5103,25 +5159,23 @@ static int64_t generateProgressiveDataBuffer(
{
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int ncols_per_record = 1; // count first col ts
if (superTblInfo == NULL) {
int datatypeSeq = 0;
while(g_args.datatype[datatypeSeq]) {
datatypeSeq ++;
ncols_per_record ++;
}
}
assert(buffer != NULL);
char *pstr = buffer;
int64_t k = 0;
memset(buffer, 0, *pRemainderBufLen);
int64_t headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo,
int64_t headLen;
if (superTblInfo) {
headLen = generateStbSQLHead(
superTblInfo,
tableName, tableSeq, pThreadInfo->db_name,
buffer, *pRemainderBufLen);
} else {
headLen = generateSQLHeadWithoutStb(
tableName, pThreadInfo->db_name,
buffer, *pRemainderBufLen);
}
if (headLen <= 0) {
return 0;
@ -5130,10 +5184,20 @@ static int64_t generateProgressiveDataBuffer(
*pRemainderBufLen -= headLen;
int64_t dataLen;
k = generateDataTail(superTblInfo,
g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom,
int64_t k;
if (superTblInfo) {
k = generateStbDataTail(superTblInfo,
g_args.num_of_RPR, pstr, *pRemainderBufLen,
insertRows, startFrom,
startTime,
pSamplePos, &dataLen);
} else {
k = generateDataTailWithoutStb(
g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom,
startTime,
/*pSamplePos, */&dataLen);
}
return k;
}
@ -5147,6 +5211,7 @@ static void printStatPerThread(threadInfo *pThreadInfo)
(double)(pThreadInfo->totalAffectedRows / (pThreadInfo->totalDelay/1000.0)));
}
// sync write interlace data
static void* syncWriteInterlace(threadInfo *pThreadInfo) {
debugPrint("[%d] %s() LN%d: ### interlace write\n",
pThreadInfo->threadID, __func__, __LINE__);
@ -5197,7 +5262,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
int64_t nTimeStampStep = superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
int64_t nTimeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
uint64_t insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
@ -5211,7 +5277,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
uint64_t tableSeq = pThreadInfo->start_table_from;
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->start_table_from,
pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from,
pThreadInfo->ntables, insertRows);
int64_t startTime = pThreadInfo->start_time;
@ -5384,19 +5451,18 @@ free_of_interlace:
return NULL;
}
// sync insertion
/*
1 thread: 100 tables * 2000 rows/s
1 thread: 10 tables * 20000 rows/s
6 thread: 300 tables * 2000 rows/s
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
*/
// sync insertion progressive data
static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
int64_t insertRows =
(superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n",
__func__, __LINE__, insertRows);
pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == pThreadInfo->buffer) {
@ -5410,8 +5476,6 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
uint64_t startTs = taosGetTimestampMs();
uint64_t endTs;
int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
/* int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0;
@ -5423,21 +5487,12 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->samplePos = 0;
for (uint64_t tableSeq =
pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to;
for (uint64_t tableSeq = pThreadInfo->start_table_from;
tableSeq <= pThreadInfo->end_table_to;
tableSeq ++) {
int64_t start_time = pThreadInfo->start_time;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
for (uint64_t i = 0; i < insertRows;) {
/*
if (insert_interval) {
st = taosGetTimestampMs();
}
*/
char tableName[TSDB_TABLE_NAME_LEN];
getTableName(tableName, pThreadInfo, tableSeq);
verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n",
@ -5502,28 +5557,15 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if (i >= insertRows)
break;
/*
if (insert_interval) {
et = taosGetTimestampMs();
if (insert_interval > ((et - st)) ) {
int sleep_time = insert_interval - (et -st);
performancePrint("%s() LN%d sleep: %d ms for insert interval\n",
__func__, __LINE__, sleep_time);
taosMsleep(sleep_time); // ms
}
}
*/
} // num_of_DPT
if (g_args.verbose_print) {
if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo &&
if ((g_args.verbose_print) &&
(tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) &&
(0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample")))) {
verbosePrint("%s() LN%d samplePos=%"PRId64"\n",
__func__, __LINE__, pThreadInfo->samplePos);
}
}
} // tableSeq
free_of_progressive:
@ -5557,7 +5599,6 @@ static void* syncWrite(void *sarg) {
// progressive mode
return syncWriteProgressive(pThreadInfo);
}
}
static void callBack(void *param, TAOS_RES *res, int code) {
@ -5595,10 +5636,12 @@ static void callBack(void *param, TAOS_RES *res, int code) {
int rand_num = taosRandom() % 100;
if (0 != pThreadInfo->superTblInfo->disorderRatio
&& rand_num < pThreadInfo->superTblInfo->disorderRatio) {
int64_t d = pThreadInfo->lastTs - (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1);
generateRowData(data, d, pThreadInfo->superTblInfo);
int64_t d = pThreadInfo->lastTs
- (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1);
generateStbRowData(pThreadInfo->superTblInfo, data, d);
} else {
generateRowData(data, pThreadInfo->lastTs += 1000, pThreadInfo->superTblInfo);
generateStbRowData(pThreadInfo->superTblInfo,
data, pThreadInfo->lastTs += 1000);
}
pstr += sprintf(pstr, "%s", data);
pThreadInfo->counter++;