Hotfix/sangshuduo/td 5872 taosdemo stmt improve (#7875)

* [TD-5872]<fix>: taosdemo stmt improve.

* refactor stmt functions.

* [TD-5872]<fix>: taosdemo stmt csv perf improve.

* rand func back to early impl.

* fix windows/mac compile error.

* fix empty tag sample.

* [TD-5873]<test>add stmt’performance taosdemo testcase

* add data_type enum and stmt_batch framework.

* use data type enum and fix test case limit/offset.

* revert thread number.

* rename MAX_SAMPLES_ONCE_FROM_FILE to reflect reality.

* split func for stmt interlace.

* fix bug that get build path.

* stmt batch interlace works.

* fix start time issue.

Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
Co-authored-by: tomchon <haoran920c@163.com>
This commit is contained in:
Shuduo Sang 2021-09-13 22:24:25 +08:00 committed by GitHub
parent d86c5910ec
commit f449f25078
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 301 additions and 183 deletions

View File

@ -109,8 +109,7 @@ extern char configDir[];
#define DEFAULT_DATATYPE_NUM 1 #define DEFAULT_DATATYPE_NUM 1
#define DEFAULT_CHILDTABLES 10000 #define DEFAULT_CHILDTABLES 10000
#define STMT_BIND_PARAM_BATCH 1
#define STMT_BIND_PARAM_BATCH 0
char* g_sampleDataBuf = NULL; char* g_sampleDataBuf = NULL;
#if STMT_BIND_PARAM_BATCH == 1 #if STMT_BIND_PARAM_BATCH == 1
@ -5430,7 +5429,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
g_Dbs.db[i].superTbls[j].interlaceRows = g_Dbs.db[i].superTbls[j].insertRows; g_Dbs.db[i].superTbls[j].interlaceRows = g_Dbs.db[i].superTbls[j].insertRows;
} }
} else if (!stbInterlaceRows) { } else if (!stbInterlaceRows) {
g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req g_Dbs.db[i].superTbls[j].interlaceRows = g_args.interlaceRows; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else { } else {
errorPrint( errorPrint(
"%s", "failed to read json, interlace rows input mistake\n"); "%s", "failed to read json, interlace rows input mistake\n");
@ -8294,7 +8293,7 @@ static uint32_t execBindParam(
} }
#endif #endif
static int32_t prepareStbStmtWithSample( static int32_t prepareStbStmt(
threadInfo *pThreadInfo, threadInfo *pThreadInfo,
char *tableName, char *tableName,
int64_t tableSeq, int64_t tableSeq,
@ -8468,21 +8467,18 @@ static void* syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, uint32_t inter
pThreadInfo->threadID, __func__, __LINE__); pThreadInfo->threadID, __func__, __LINE__);
int64_t insertRows; int64_t insertRows;
uint64_t maxSqlLen; int64_t timeStampStep;
int64_t nTimeStampStep;
uint64_t insert_interval; uint64_t insert_interval;
SSuperTable* stbInfo = pThreadInfo->stbInfo; SSuperTable* stbInfo = pThreadInfo->stbInfo;
if (stbInfo) { if (stbInfo) {
insertRows = stbInfo->insertRows; insertRows = stbInfo->insertRows;
maxSqlLen = stbInfo->maxSqlLen; timeStampStep = stbInfo->timeStampStep;
nTimeStampStep = stbInfo->timeStampStep;
insert_interval = stbInfo->insertInterval; insert_interval = stbInfo->insertInterval;
} else { } else {
insertRows = g_args.insertRows; insertRows = g_args.insertRows;
maxSqlLen = g_args.max_sql_len; timeStampStep = g_args.timestamp_step;
nTimeStampStep = g_args.timestamp_step;
insert_interval = g_args.insert_interval; insert_interval = g_args.insert_interval;
} }
@ -8491,18 +8487,14 @@ static void* syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, uint32_t inter
pThreadInfo->start_table_from, pThreadInfo->start_table_from,
pThreadInfo->ntables, insertRows); pThreadInfo->ntables, insertRows);
uint32_t batchPerTbl = interlaceRows; uint64_t timesInterlace = (insertRows / interlaceRows) + 1;
uint32_t batchPerTblTimes; uint32_t precalcBatch = interlaceRows;
if (interlaceRows > g_args.reqPerReq) if (precalcBatch > g_args.reqPerReq)
interlaceRows = g_args.reqPerReq; precalcBatch = g_args.reqPerReq;
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { if (precalcBatch > MAX_SAMPLES)
batchPerTblTimes = precalcBatch = MAX_SAMPLES;
g_args.reqPerReq / interlaceRows;
} else {
batchPerTblTimes = 1;
}
pThreadInfo->totalInsertRows = 0; pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0; pThreadInfo->totalAffectedRows = 0;
@ -8515,27 +8507,27 @@ static void* syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, uint32_t inter
uint64_t endTs; uint64_t endTs;
uint64_t tableSeq = pThreadInfo->start_table_from; uint64_t tableSeq = pThreadInfo->start_table_from;
int64_t startTime = pThreadInfo->start_time; int64_t startTime;
uint64_t generatedRecPerTbl = 0;
bool flagSleep = true; bool flagSleep = true;
uint64_t sleepTimeTotal = 0; uint64_t sleepTimeTotal = 0;
int percentComplete = 0; int percentComplete = 0;
int64_t totalRows = insertRows * pThreadInfo->ntables; int64_t totalRows = insertRows * pThreadInfo->ntables;
pThreadInfo->samplePos = 0;
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { for (int64_t interlace = 0;
interlace < timesInterlace; interlace ++) {
if ((flagSleep) && (insert_interval)) { if ((flagSleep) && (insert_interval)) {
st = taosGetTimestampMs(); st = taosGetTimestampMs();
flagSleep = false; flagSleep = false;
} }
uint32_t recOfBatch = 0; int64_t generated = 0;
int64_t samplePos;
int32_t generated; for (; tableSeq < pThreadInfo->start_table_from + pThreadInfo->ntables; tableSeq ++) {
for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
char tableName[TSDB_TABLE_NAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN];
getTableName(tableName, pThreadInfo, tableSeq); getTableName(tableName, pThreadInfo, tableSeq);
if (0 == strlen(tableName)) { if (0 == strlen(tableName)) {
errorPrint2("[%d] %s() LN%d, getTableName return null\n", errorPrint2("[%d] %s() LN%d, getTableName return null\n",
@ -8543,127 +8535,121 @@ static void* syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, uint32_t inter
return NULL; return NULL;
} }
if (stbInfo) { samplePos = pThreadInfo->samplePos;
generated = prepareStbStmtWithSample( startTime = pThreadInfo->start_time
pThreadInfo, + interlace * interlaceRows * timeStampStep;
tableName, uint64_t remainRecPerTbl =
tableSeq, insertRows - interlaceRows * interlace;
batchPerTbl, uint64_t recPerTbl = 0;
insertRows, 0,
startTime, uint64_t remainPerInterlace;
&(pThreadInfo->samplePos)); if (remainRecPerTbl > interlaceRows) {
remainPerInterlace = interlaceRows;
} else { } else {
remainPerInterlace = remainRecPerTbl;
}
while(remainPerInterlace > 0) {
uint32_t batch;
if (remainPerInterlace > precalcBatch) {
batch = precalcBatch;
} else {
batch = remainPerInterlace;
}
debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n", debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n",
pThreadInfo->threadID, pThreadInfo->threadID,
__func__, __LINE__, __func__, __LINE__,
tableName, batchPerTbl, startTime); tableName, batch, startTime);
generated = prepareStmtWithoutStb(
pThreadInfo,
tableName,
batchPerTbl,
insertRows, i,
startTime);
}
debugPrint("[%d] %s() LN%d, generated records is %d\n", if (stbInfo) {
pThreadInfo->threadID, __func__, __LINE__, generated); generated = prepareStbStmt(
if (generated < 0) { pThreadInfo,
errorPrint2("[%d] %s() LN%d, generated records is %d\n", tableName,
tableSeq,
batch,
insertRows, 0,
startTime,
&samplePos);
} else {
generated = prepareStmtWithoutStb(
pThreadInfo,
tableName,
batch,
insertRows,
interlaceRows * interlace + recPerTbl,
startTime);
}
debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__, generated); pThreadInfo->threadID, __func__, __LINE__, generated);
goto free_of_interlace_stmt; if (generated < 0) {
} else if (generated == 0) { errorPrint2("[%d] %s() LN%d, generated records is %"PRId64"\n",
break; pThreadInfo->threadID, __func__, __LINE__, generated);
} goto free_of_interlace_stmt;
} else if (generated == 0) {
tableSeq ++;
recOfBatch += batchPerTbl;
pThreadInfo->totalInsertRows += batchPerTbl;
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl, recOfBatch);
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table
tableSeq = pThreadInfo->start_table_from;
generatedRecPerTbl += batchPerTbl;
startTime = pThreadInfo->start_time
+ generatedRecPerTbl * nTimeStampStep;
flagSleep = true;
if (generatedRecPerTbl >= insertRows)
break; break;
}
int64_t remainRows = insertRows - generatedRecPerTbl; recPerTbl += generated;
if ((remainRows > 0) && (batchPerTbl > remainRows)) remainPerInterlace -= generated;
batchPerTbl = remainRows; pThreadInfo->totalInsertRows += generated;
if (pThreadInfo->ntables * batchPerTbl < g_args.reqPerReq) verbosePrint("[%d] %s() LN%d totalInsertRows=%"PRIu64"\n",
break; pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->totalInsertRows);
startTs = taosGetTimestampUs();
int64_t affectedRows = execInsert(pThreadInfo, generated);
endTs = taosGetTimestampUs();
uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %10.2f ms\n",
__func__, __LINE__, delay / 1000.0);
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay;
if (generated != affectedRows) {
errorPrint2("[%d] %s() LN%d execInsert() insert %"PRId64", affected rows: %"PRId64"\n\n",
pThreadInfo->threadID, __func__, __LINE__,
generated, affectedRows);
goto free_of_interlace_stmt;
}
pThreadInfo->totalAffectedRows += affectedRows;
int currentPercent = pThreadInfo->totalAffectedRows * 100 / totalRows;
if (currentPercent > percentComplete ) {
printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent);
percentComplete = currentPercent;
}
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
lastPrintTime = currentPrintTime;
}
startTime += (generated * timeStampStep);
} }
verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__,
generatedRecPerTbl, insertRows);
if ((g_args.reqPerReq - recOfBatch) < batchPerTbl)
break;
} }
pThreadInfo->samplePos = samplePos;
verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n", if (tableSeq == pThreadInfo->start_table_from
pThreadInfo->threadID, __func__, __LINE__, recOfBatch, + pThreadInfo->ntables) {
pThreadInfo->totalInsertRows); // turn to first table
tableSeq = pThreadInfo->start_table_from;
startTs = taosGetTimestampUs(); flagSleep = true;
if (recOfBatch == 0) {
errorPrint2("[%d] %s() LN%d Failed to insert records of batch %d\n",
pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl);
if (batchPerTbl > 0) {
errorPrint("\tIf the batch is %d, the length of the SQL to insert a row must be less then %"PRId64"\n",
batchPerTbl, maxSqlLen / batchPerTbl);
}
goto free_of_interlace_stmt;
}
int64_t affectedRows = execInsert(pThreadInfo, recOfBatch);
endTs = taosGetTimestampUs();
uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %10.2f ms\n",
__func__, __LINE__, delay / 1000.0);
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay;
if (recOfBatch != affectedRows) {
errorPrint2("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n\n",
pThreadInfo->threadID, __func__, __LINE__,
recOfBatch, affectedRows);
goto free_of_interlace_stmt;
}
pThreadInfo->totalAffectedRows += affectedRows;
int currentPercent = pThreadInfo->totalAffectedRows * 100 / totalRows;
if (currentPercent > percentComplete ) {
printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent);
percentComplete = currentPercent;
}
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
lastPrintTime = currentPrintTime;
} }
if ((insert_interval) && flagSleep) { if ((insert_interval) && flagSleep) {
@ -8693,7 +8679,7 @@ static void* syncWriteInterlaceStmt(threadInfo *pThreadInfo, uint32_t interlaceR
int64_t insertRows; int64_t insertRows;
uint64_t maxSqlLen; uint64_t maxSqlLen;
int64_t nTimeStampStep; int64_t timeStampStep;
uint64_t insert_interval; uint64_t insert_interval;
SSuperTable* stbInfo = pThreadInfo->stbInfo; SSuperTable* stbInfo = pThreadInfo->stbInfo;
@ -8701,12 +8687,12 @@ static void* syncWriteInterlaceStmt(threadInfo *pThreadInfo, uint32_t interlaceR
if (stbInfo) { if (stbInfo) {
insertRows = stbInfo->insertRows; insertRows = stbInfo->insertRows;
maxSqlLen = stbInfo->maxSqlLen; maxSqlLen = stbInfo->maxSqlLen;
nTimeStampStep = stbInfo->timeStampStep; timeStampStep = stbInfo->timeStampStep;
insert_interval = stbInfo->insertInterval; insert_interval = stbInfo->insertInterval;
} else { } else {
insertRows = g_args.insertRows; insertRows = g_args.insertRows;
maxSqlLen = g_args.max_sql_len; maxSqlLen = g_args.max_sql_len;
nTimeStampStep = g_args.timestamp_step; timeStampStep = g_args.timestamp_step;
insert_interval = g_args.insert_interval; insert_interval = g_args.insert_interval;
} }
@ -8767,8 +8753,12 @@ static void* syncWriteInterlaceStmt(threadInfo *pThreadInfo, uint32_t interlaceR
return NULL; return NULL;
} }
debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n",
pThreadInfo->threadID,
__func__, __LINE__,
tableName, batchPerTbl, startTime);
if (stbInfo) { if (stbInfo) {
generated = prepareStbStmtWithSample( generated = prepareStbStmt(
pThreadInfo, pThreadInfo,
tableName, tableName,
tableSeq, tableSeq,
@ -8777,10 +8767,6 @@ static void* syncWriteInterlaceStmt(threadInfo *pThreadInfo, uint32_t interlaceR
startTime, startTime,
&(pThreadInfo->samplePos)); &(pThreadInfo->samplePos));
} else { } else {
debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n",
pThreadInfo->threadID,
__func__, __LINE__,
tableName, batchPerTbl, startTime);
generated = prepareStmtWithoutStb( generated = prepareStmtWithoutStb(
pThreadInfo, pThreadInfo,
tableName, tableName,
@ -8814,7 +8800,7 @@ static void* syncWriteInterlaceStmt(threadInfo *pThreadInfo, uint32_t interlaceR
generatedRecPerTbl += batchPerTbl; generatedRecPerTbl += batchPerTbl;
startTime = pThreadInfo->start_time startTime = pThreadInfo->start_time
+ generatedRecPerTbl * nTimeStampStep; + generatedRecPerTbl * timeStampStep;
flagSleep = true; flagSleep = true;
if (generatedRecPerTbl >= insertRows) if (generatedRecPerTbl >= insertRows)
@ -8919,7 +8905,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows)
int64_t insertRows; int64_t insertRows;
uint64_t maxSqlLen; uint64_t maxSqlLen;
int64_t nTimeStampStep; int64_t timeStampStep;
uint64_t insert_interval; uint64_t insert_interval;
SSuperTable* stbInfo = pThreadInfo->stbInfo; SSuperTable* stbInfo = pThreadInfo->stbInfo;
@ -8927,12 +8913,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows)
if (stbInfo) { if (stbInfo) {
insertRows = stbInfo->insertRows; insertRows = stbInfo->insertRows;
maxSqlLen = stbInfo->maxSqlLen; maxSqlLen = stbInfo->maxSqlLen;
nTimeStampStep = stbInfo->timeStampStep; timeStampStep = stbInfo->timeStampStep;
insert_interval = stbInfo->insertInterval; insert_interval = stbInfo->insertInterval;
} else { } else {
insertRows = g_args.insertRows; insertRows = g_args.insertRows;
maxSqlLen = g_args.max_sql_len; maxSqlLen = g_args.max_sql_len;
nTimeStampStep = g_args.timestamp_step; timeStampStep = g_args.timestamp_step;
insert_interval = g_args.insert_interval; insert_interval = g_args.insert_interval;
} }
@ -9075,7 +9061,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows)
generatedRecPerTbl += batchPerTbl; generatedRecPerTbl += batchPerTbl;
startTime = pThreadInfo->start_time startTime = pThreadInfo->start_time
+ generatedRecPerTbl * nTimeStampStep; + generatedRecPerTbl * timeStampStep;
flagSleep = true; flagSleep = true;
if (generatedRecPerTbl >= insertRows) if (generatedRecPerTbl >= insertRows)
@ -9176,6 +9162,144 @@ free_of_interlace:
return NULL; return NULL;
} }
static void* syncWriteProgressiveStmt(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### stmt progressive write\n", __func__, __LINE__);
SSuperTable* stbInfo = pThreadInfo->stbInfo;
int64_t timeStampStep =
stbInfo?stbInfo->timeStampStep:g_args.timestamp_step;
int64_t insertRows =
(stbInfo)?stbInfo->insertRows:g_args.insertRows;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n",
__func__, __LINE__, insertRows);
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
uint64_t endTs;
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
pThreadInfo->samplePos = 0;
int percentComplete = 0;
int64_t totalRows = insertRows * pThreadInfo->ntables;
for (uint64_t tableSeq = pThreadInfo->start_table_from;
tableSeq <= pThreadInfo->end_table_to;
tableSeq ++) {
int64_t start_time = pThreadInfo->start_time;
for (uint64_t i = 0; i < insertRows;) {
char tableName[TSDB_TABLE_NAME_LEN];
getTableName(tableName, pThreadInfo, tableSeq);
verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n",
__func__, __LINE__,
pThreadInfo->threadID, tableSeq, tableName);
if (0 == strlen(tableName)) {
errorPrint2("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__);
return NULL;
}
// measure prepare + insert
startTs = taosGetTimestampUs();
int32_t generated;
if (stbInfo) {
generated = prepareStbStmt(
pThreadInfo,
tableName,
tableSeq,
(g_args.reqPerReq>stbInfo->insertRows)?
stbInfo->insertRows:
g_args.reqPerReq,
insertRows, i, start_time,
&(pThreadInfo->samplePos));
} else {
generated = prepareStmtWithoutStb(
pThreadInfo,
tableName,
g_args.reqPerReq,
insertRows, i,
start_time);
}
verbosePrint("[%d] %s() LN%d generated=%d\n",
pThreadInfo->threadID,
__func__, __LINE__, generated);
if (generated > 0)
i += generated;
else
goto free_of_stmt_progressive;
start_time += generated * timeStampStep;
pThreadInfo->totalInsertRows += generated;
// only measure insert
// startTs = taosGetTimestampUs();
int32_t affectedRows = execInsert(pThreadInfo, generated);
endTs = taosGetTimestampUs();
uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %10.f ms\n",
__func__, __LINE__, delay/1000.0);
verbosePrint("[%d] %s() LN%d affectedRows=%d\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay;
if (affectedRows < 0) {
errorPrint2("%s() LN%d, affected rows: %d\n",
__func__, __LINE__, affectedRows);
goto free_of_stmt_progressive;
}
pThreadInfo->totalAffectedRows += affectedRows;
int currentPercent = pThreadInfo->totalAffectedRows * 100 / totalRows;
if (currentPercent > percentComplete ) {
printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent);
percentComplete = currentPercent;
}
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
lastPrintTime = currentPrintTime;
}
if (i >= insertRows)
break;
} // insertRows
if ((g_args.verbose_print) &&
(tableSeq == pThreadInfo->ntables - 1) && (stbInfo)
&& (0 == strncasecmp(
stbInfo->dataSource,
"sample", strlen("sample")))) {
verbosePrint("%s() LN%d samplePos=%"PRId64"\n",
__func__, __LINE__, pThreadInfo->samplePos);
}
} // tableSeq
if (percentComplete < 100) {
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
}
free_of_stmt_progressive:
tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo);
return NULL;
}
// sync insertion progressive data // sync insertion progressive data
static void* syncWriteProgressive(threadInfo *pThreadInfo) { static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
@ -9242,7 +9366,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
int32_t generated; int32_t generated;
if (stbInfo) { if (stbInfo) {
if (stbInfo->iface == STMT_IFACE) { if (stbInfo->iface == STMT_IFACE) {
generated = prepareStbStmtWithSample( generated = prepareStbStmt(
pThreadInfo, pThreadInfo,
tableName, tableName,
tableSeq, tableSeq,
@ -9374,20 +9498,28 @@ static void* syncWrite(void *sarg) {
if (interlaceRows > 0) { if (interlaceRows > 0) {
// interlace mode // interlace mode
if (((stbInfo) && (STMT_IFACE == stbInfo->iface)) if (stbInfo) {
|| (STMT_IFACE == g_args.iface)) { if (STMT_IFACE == stbInfo->iface) {
#if STMT_BIND_PARAM_BATCH == 1 #if STMT_BIND_PARAM_BATCH == 1
return syncWriteInterlaceStmtBatch(pThreadInfo, interlaceRows); return syncWriteInterlaceStmtBatch(pThreadInfo, interlaceRows);
#else #else
return syncWriteInterlaceStmt(pThreadInfo, interlaceRows); return syncWriteInterlaceStmt(pThreadInfo, interlaceRows);
#endif #endif
} else { } else {
return syncWriteInterlace(pThreadInfo, interlaceRows); return syncWriteInterlace(pThreadInfo, interlaceRows);
}
} }
}else { } else {
// progressive mode // progressive mode
return syncWriteProgressive(pThreadInfo); if (((stbInfo) && (STMT_IFACE == stbInfo->iface))
|| (STMT_IFACE == g_args.iface)) {
return syncWriteProgressiveStmt(pThreadInfo);
} else {
return syncWriteProgressive(pThreadInfo);
}
} }
return NULL;
} }
static void callBack(void *param, TAOS_RES *res, int code) { static void callBack(void *param, TAOS_RES *res, int code) {
@ -9518,24 +9650,24 @@ static void startMultiThreadInsertData(int threads, char* db_name,
} }
} }
int64_t start_time; int64_t startTime;
if (stbInfo) { if (stbInfo) {
if (0 == strncasecmp(stbInfo->startTimestamp, "now", 3)) { if (0 == strncasecmp(stbInfo->startTimestamp, "now", 3)) {
start_time = taosGetTimestamp(timePrec); startTime = taosGetTimestamp(timePrec);
} else { } else {
if (TSDB_CODE_SUCCESS != taosParseTime( if (TSDB_CODE_SUCCESS != taosParseTime(
stbInfo->startTimestamp, stbInfo->startTimestamp,
&start_time, &startTime,
strlen(stbInfo->startTimestamp), strlen(stbInfo->startTimestamp),
timePrec, 0)) { timePrec, 0)) {
ERROR_EXIT("failed to parse time!\n"); ERROR_EXIT("failed to parse time!\n");
} }
} }
} else { } else {
start_time = DEFAULT_START_TIME; startTime = DEFAULT_START_TIME;
} }
debugPrint("%s() LN%d, start_time= %"PRId64"\n", debugPrint("%s() LN%d, startTime= %"PRId64"\n",
__func__, __LINE__, start_time); __func__, __LINE__, startTime);
// read sample data from file first // read sample data from file first
int ret; int ret;
@ -9655,14 +9787,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
} }
pthread_t *pids = calloc(1, threads * sizeof(pthread_t)); pthread_t *pids = calloc(1, threads * sizeof(pthread_t));
assert(pids != NULL);
threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
assert(pids != NULL);
assert(infos != NULL); assert(infos != NULL);
memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo));
char *stmtBuffer = calloc(1, BUFFER_SIZE); char *stmtBuffer = calloc(1, BUFFER_SIZE);
assert(stmtBuffer); assert(stmtBuffer);
@ -9671,18 +9799,8 @@ static void startMultiThreadInsertData(int threads, char* db_name,
uint32_t batch; uint32_t batch;
if (stbInfo) { if (stbInfo) {
if ((stbInfo->interlaceRows == 0) if (stbInfo->interlaceRows < stbInfo->insertRows)
&& (g_args.interlaceRows > 0)
) {
interlaceRows = g_args.interlaceRows;
} else {
interlaceRows = stbInfo->interlaceRows; interlaceRows = stbInfo->interlaceRows;
}
if (interlaceRows > stbInfo->insertRows) {
interlaceRows = 0;
}
} else { } else {
if (g_args.interlaceRows < g_args.insertRows) if (g_args.interlaceRows < g_args.insertRows)
interlaceRows = g_args.interlaceRows; interlaceRows = g_args.interlaceRows;
@ -9739,7 +9857,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pThreadInfo->time_precision = timePrec; pThreadInfo->time_precision = timePrec;
pThreadInfo->stbInfo = stbInfo; pThreadInfo->stbInfo = stbInfo;
pThreadInfo->start_time = start_time; pThreadInfo->start_time = startTime;
pThreadInfo->minDelay = UINT64_MAX; pThreadInfo->minDelay = UINT64_MAX;
if ((NULL == stbInfo) || if ((NULL == stbInfo) ||
@ -9955,7 +10073,7 @@ static void *readTable(void *sarg) {
char *command = calloc(1, BUFFER_SIZE); char *command = calloc(1, BUFFER_SIZE);
assert(command); assert(command);
uint64_t sTime = pThreadInfo->start_time; uint64_t startTime = pThreadInfo->start_time;
char *tb_prefix = pThreadInfo->tb_prefix; char *tb_prefix = pThreadInfo->tb_prefix;
FILE *fp = fopen(pThreadInfo->filePath, "a"); FILE *fp = fopen(pThreadInfo->filePath, "a");
if (NULL == fp) { if (NULL == fp) {
@ -9988,7 +10106,7 @@ static void *readTable(void *sarg) {
uint64_t count = 0; uint64_t count = 0;
for (int64_t i = 0; i < ntables; i++) { for (int64_t i = 0; i < ntables; i++) {
sprintf(command, "SELECT %s FROM %s%"PRId64" WHERE ts>= %" PRIu64, sprintf(command, "SELECT %s FROM %s%"PRId64" WHERE ts>= %" PRIu64,
g_aggreFunc[j], tb_prefix, i, sTime); g_aggreFunc[j], tb_prefix, i, startTime);
double t = taosGetTimestampMs(); double t = taosGetTimestampMs();
TAOS_RES *pSql = taos_query(taos, command); TAOS_RES *pSql = taos_query(taos, command);