[TD-3317]<fix>: taosdemo interlace insertion. (#5774)

patch for master.

Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
Shuduo Sang 2021-04-11 21:31:47 +08:00 committed by GitHub
parent 5e43fc0d24
commit 7153904339
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 28 deletions

View File

@ -18,6 +18,7 @@
when in some thread query return error, thread don't exit, but return, otherwise coredump in other thread. when in some thread query return error, thread don't exit, but return, otherwise coredump in other thread.
*/ */
#include <stdint.h>
#define _GNU_SOURCE #define _GNU_SOURCE
#define CURL_STATICLIB #define CURL_STATICLIB
@ -3242,7 +3243,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
if (numRecPerReq && numRecPerReq->type == cJSON_Number) { if (numRecPerReq && numRecPerReq->type == cJSON_Number) {
g_args.num_of_RPR = numRecPerReq->valueint; g_args.num_of_RPR = numRecPerReq->valueint;
} else if (!numRecPerReq) { } else if (!numRecPerReq) {
g_args.num_of_RPR = 0xffff; g_args.num_of_RPR = INT32_MAX;
} else { } else {
errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n", errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n",
__func__, __LINE__); __func__, __LINE__);
@ -4647,7 +4648,7 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
len = snprintf( len = snprintf(
headBuf, headBuf,
HEAD_BUFF_LEN, HEAD_BUFF_LEN,
"insert into %s.%s using %s.%s tags %s values", "%s.%s using %s.%s tags %s values",
pThreadInfo->db_name, pThreadInfo->db_name,
tableName, tableName,
pThreadInfo->db_name, pThreadInfo->db_name,
@ -4658,14 +4659,14 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
len = snprintf( len = snprintf(
headBuf, headBuf,
HEAD_BUFF_LEN, HEAD_BUFF_LEN,
"insert into %s.%s values", "%s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
tableName); tableName);
} else { } else {
len = snprintf( len = snprintf(
headBuf, headBuf,
HEAD_BUFF_LEN, HEAD_BUFF_LEN,
"insert into %s.%s values", "%s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
tableName); tableName);
} }
@ -4673,7 +4674,7 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
len = snprintf( len = snprintf(
headBuf, headBuf,
HEAD_BUFF_LEN, HEAD_BUFF_LEN,
"insert into %s.%s values", "%s.%s values",
pThreadInfo->db_name, pThreadInfo->db_name,
tableName); tableName);
} }
@ -4694,6 +4695,7 @@ static int generateInterlaceDataBuffer(
int64_t startTime, int64_t startTime,
int *pRemainderBufLen) int *pRemainderBufLen)
{ {
assert(buffer);
char *pstr = buffer; char *pstr = buffer;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
@ -4723,18 +4725,20 @@ static int generateInterlaceDataBuffer(
} else { } else {
startTime = 1500000000000; startTime = 1500000000000;
} }
int k = generateDataTail( int k = generateDataTail(
superTblInfo, superTblInfo,
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime, startTime,
&(pThreadInfo->samplePos), &dataLen); &(pThreadInfo->samplePos), &dataLen);
if (k > 0) { if (k == batchPerTbl) {
pstr += dataLen; pstr += dataLen;
*pRemainderBufLen -= dataLen; *pRemainderBufLen -= dataLen;
} else { } else {
pstr -= headLen; pstr -= headLen;
pstr[0] = '\0'; pstr[0] = '\0';
k = 0;
} }
return k; return k;
@ -4745,7 +4749,8 @@ static int generateProgressiveDataBuffer(
int32_t tableSeq, int32_t tableSeq,
threadInfo *pThreadInfo, char *buffer, threadInfo *pThreadInfo, char *buffer,
int64_t insertRows, int64_t insertRows,
int64_t startFrom, int64_t startTime, int *pSamplePos) int64_t startFrom, int64_t startTime, int *pSamplePos,
int *pRemainderBufLen)
{ {
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
@ -4760,27 +4765,24 @@ static int generateProgressiveDataBuffer(
} }
assert(buffer != NULL); assert(buffer != NULL);
int k = 0;
int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
int remainderBufLen = maxSqlLen;
memset(buffer, 0, maxSqlLen);
char *pstr = buffer; char *pstr = buffer;
int k = 0;
memset(buffer, 0, *pRemainderBufLen);
int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo, int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo,
buffer, remainderBufLen); buffer, *pRemainderBufLen);
if (headLen <= 0) { if (headLen <= 0) {
return 0; return 0;
} }
pstr += headLen; pstr += headLen;
remainderBufLen -= headLen; *pRemainderBufLen -= headLen;
int dataLen; int dataLen;
k = generateDataTail(superTblInfo, k = generateDataTail(superTblInfo,
g_args.num_of_RPR, pstr, remainderBufLen, insertRows, startFrom, g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom,
startTime, startTime,
pSamplePos, &dataLen); pSamplePos, &dataLen);
@ -4842,18 +4844,17 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t startTime = pThreadInfo->start_time; int64_t startTime = pThreadInfo->start_time;
int batchPerTblTimes;
int batchPerTbl;
assert(pThreadInfo->ntables > 0); assert(pThreadInfo->ntables > 0);
if (interlaceRows > g_args.num_of_RPR) if (interlaceRows > g_args.num_of_RPR)
interlaceRows = g_args.num_of_RPR; interlaceRows = g_args.num_of_RPR;
batchPerTbl = interlaceRows; int batchPerTbl = interlaceRows;
int batchPerTblTimes;
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes = batchPerTblTimes =
(g_args.num_of_RPR / (interlaceRows * pThreadInfo->ntables)) + 1; g_args.num_of_RPR / interlaceRows;
} else { } else {
batchPerTblTimes = 1; batchPerTblTimes = 1;
} }
@ -4862,6 +4863,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
bool flagSleep = true; bool flagSleep = true;
int sleepTimeTotal = 0; int sleepTimeTotal = 0;
char *strInsertInto = "insert into ";
int nInsertBufLen = strlen(strInsertInto);
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
if ((flagSleep) && (insert_interval)) { if ((flagSleep) && (insert_interval)) {
st = taosGetTimestampUs(); st = taosGetTimestampUs();
@ -4872,6 +4876,11 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int remainderBufLen = maxSqlLen; int remainderBufLen = maxSqlLen;
char *pstr = buffer; char *pstr = buffer;
int len = snprintf(pstr, nInsertBufLen + 1, "%s", strInsertInto);
pstr += len;
remainderBufLen -= len;
int recOfBatch = 0; int recOfBatch = 0;
for (int i = 0; i < batchPerTblTimes; i ++) { for (int i = 0; i < batchPerTblTimes; i ++) {
@ -4883,6 +4892,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
return NULL; return NULL;
} }
int oldRemainderLen = remainderBufLen;
int generated = generateInterlaceDataBuffer( int generated = generateInterlaceDataBuffer(
tableName, batchPerTbl, i, batchPerTblTimes, tableName, batchPerTbl, i, batchPerTblTimes,
tableSeq, tableSeq,
@ -4901,6 +4911,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
tableSeq ++; tableSeq ++;
recOfBatch += batchPerTbl; recOfBatch += batchPerTbl;
pstr += (oldRemainderLen - remainderBufLen);
// startTime += batchPerTbl * superTblInfo->timeStampStep; // startTime += batchPerTbl * superTblInfo->timeStampStep;
pThreadInfo->totalInsertRows += batchPerTbl; pThreadInfo->totalInsertRows += batchPerTbl;
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n", verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
@ -5012,11 +5023,12 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); char* buffer = calloc(maxSqlLen, 1);
if (NULL == buffer) { if (NULL == buffer) {
errorPrint( "Failed to alloc %d Bytes, reason:%s\n", errorPrint( "Failed to alloc %d Bytes, reason:%s\n",
superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, maxSqlLen,
strerror(errno)); strerror(errno));
return NULL; return NULL;
} }
@ -5059,10 +5071,20 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
__func__, __LINE__, __func__, __LINE__,
pThreadInfo->threadID, tableSeq, tableName); pThreadInfo->threadID, tableSeq, tableName);
int remainderBufLen = maxSqlLen;
char *pstr = buffer;
int nInsertBufLen = strlen("insert into ");
int len = snprintf(pstr, nInsertBufLen + 1, "%s", "insert into ");
pstr += len;
remainderBufLen -= len;
int generated = generateProgressiveDataBuffer( int generated = generateProgressiveDataBuffer(
tableName, tableSeq, pThreadInfo, buffer, insertRows, tableName, tableSeq, pThreadInfo, pstr, insertRows,
i, start_time, i, start_time,
&(pThreadInfo->samplePos)); &(pThreadInfo->samplePos),
&remainderBufLen);
if (generated > 0) if (generated > 0)
i += generated; i += generated;
else else

View File

@ -51,7 +51,7 @@ class taosdemoPerformace:
"insert_rows": 100000, "insert_rows": 100000,
"multi_thread_write_one_tbl": "no", "multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0, "number_of_tbl_in_one_sql": 0,
"rows_per_tbl": 100, "interlace_rows": 100,
"max_sql_len": 1024000, "max_sql_len": 1024000,
"disorder_ratio": 0, "disorder_ratio": 0,
"disorder_range": 1000, "disorder_range": 1000,
@ -159,4 +159,4 @@ if __name__ == '__main__':
perftest = taosdemoPerformace(args.commit_id, args.database_name) perftest = taosdemoPerformace(args.commit_id, args.database_name)
perftest.insertData() perftest.insertData()
perftest.createTablesAndStoreData() perftest.createTablesAndStoreData()