Hotfix/sangshuduo/td 5872 taosdemo stmt improve (#7853)
* [TD-5872]<fix>: taosdemo stmt improve. * refactor stmt functions. * [TD-5872]<fix>: taosdemo stmt csv perf improve. * rand func back to early impl. * fix windows/mac compile error. * fix empty tag sample. * [TD-5873]<test>add stmt’performance taosdemo testcase * add data_type enum and stmt_batch framework. * use data type enum and fix test case limit/offset. * revert thread number. * rename MAX_SAMPLES_ONCE_FROM_FILE to reflect reality. * split func for stmt interlace. Co-authored-by: Shuduo Sang <sdsang@taosdata.com> Co-authored-by: tomchon <haoran920c@163.com>
This commit is contained in:
parent
875d569131
commit
d42ed29138
|
@ -244,7 +244,7 @@ typedef struct SArguments_S {
|
|||
uint64_t insert_interval;
|
||||
uint64_t timestamp_step;
|
||||
int64_t query_times;
|
||||
uint32_t interlace_rows;
|
||||
uint32_t interlaceRows;
|
||||
uint32_t reqPerReq; // num_of_records_per_req
|
||||
uint64_t max_sql_len;
|
||||
int64_t ntables;
|
||||
|
@ -451,14 +451,13 @@ typedef struct SQueryMetaInfo_S {
|
|||
typedef struct SThreadInfo_S {
|
||||
TAOS * taos;
|
||||
TAOS_STMT *stmt;
|
||||
int64_t *bind_ts;
|
||||
|
||||
#if STMT_BIND_PARAM_BATCH == 1
|
||||
int64_t *bind_ts;
|
||||
int64_t *bind_ts_array;
|
||||
char *bindParams;
|
||||
char *is_null;
|
||||
#else
|
||||
int64_t *bind_ts;
|
||||
char* sampleBindArray;
|
||||
#endif
|
||||
|
||||
|
@ -607,8 +606,8 @@ char *g_rand_current_buff = NULL;
|
|||
char *g_rand_phase_buff = NULL;
|
||||
char *g_randdouble_buff = NULL;
|
||||
|
||||
char *g_aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)",
|
||||
"max(col0)", "min(col0)", "first(col0)", "last(col0)"};
|
||||
char *g_aggreFunc[] = {"*", "count(*)", "avg(C0)", "sum(C0)",
|
||||
"max(C0)", "min(C0)", "first(C0)", "last(C0)"};
|
||||
|
||||
SArguments g_args = {
|
||||
NULL, // metaFile
|
||||
|
@ -652,7 +651,7 @@ SArguments g_args = {
|
|||
0, // insert_interval
|
||||
DEFAULT_TIMESTAMP_STEP, // timestamp_step
|
||||
1, // query_times
|
||||
DEFAULT_INTERLACE_ROWS, // interlace_rows;
|
||||
DEFAULT_INTERLACE_ROWS, // interlaceRows;
|
||||
30000, // reqPerReq
|
||||
(1024*1024), // max_sql_len
|
||||
DEFAULT_CHILDTABLES, // ntables
|
||||
|
@ -1310,17 +1309,17 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
|
|||
errorPrintReqArg2(argv[0], "B");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
arguments->interlace_rows = atoi(argv[++i]);
|
||||
arguments->interlaceRows = atoi(argv[++i]);
|
||||
} else if (0 == strncmp(argv[i], "--interlace-rows=", strlen("--interlace-rows="))) {
|
||||
if (isStringNumber((char *)(argv[i] + strlen("--interlace-rows=")))) {
|
||||
arguments->interlace_rows = atoi((char *)(argv[i]+strlen("--interlace-rows=")));
|
||||
arguments->interlaceRows = atoi((char *)(argv[i]+strlen("--interlace-rows=")));
|
||||
} else {
|
||||
errorPrintReqArg2(argv[0], "--interlace-rows");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
} else if (0 == strncmp(argv[i], "-B", strlen("-B"))) {
|
||||
if (isStringNumber((char *)(argv[i] + strlen("-B")))) {
|
||||
arguments->interlace_rows = atoi((char *)(argv[i]+strlen("-B")));
|
||||
arguments->interlaceRows = atoi((char *)(argv[i]+strlen("-B")));
|
||||
} else {
|
||||
errorPrintReqArg2(argv[0], "-B");
|
||||
exit(EXIT_FAILURE);
|
||||
|
@ -1333,7 +1332,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
|
|||
errorPrintReqArg2(argv[0], "--interlace-rows");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
arguments->interlace_rows = atoi(argv[++i]);
|
||||
arguments->interlaceRows = atoi(argv[++i]);
|
||||
} else {
|
||||
errorUnrecognized(argv[0], argv[i]);
|
||||
exit(EXIT_FAILURE);
|
||||
|
@ -4859,15 +4858,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
cJSON* interlaceRows = cJSON_GetObjectItem(root, "interlace_rows");
|
||||
if (interlaceRows && interlaceRows->type == cJSON_Number) {
|
||||
if (interlaceRows->valueint < 0) {
|
||||
errorPrint("%s", "failed to read json, interlace_rows input mistake\n");
|
||||
errorPrint("%s", "failed to read json, interlaceRows input mistake\n");
|
||||
goto PARSE_OVER;
|
||||
|
||||
}
|
||||
g_args.interlace_rows = interlaceRows->valueint;
|
||||
g_args.interlaceRows = interlaceRows->valueint;
|
||||
} else if (!interlaceRows) {
|
||||
g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
|
||||
g_args.interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
|
||||
} else {
|
||||
errorPrint("%s", "failed to read json, interlace_rows input mistake\n");
|
||||
errorPrint("%s", "failed to read json, interlaceRows input mistake\n");
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -4929,13 +4928,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
}
|
||||
|
||||
// rows per table need be less than insert batch
|
||||
if (g_args.interlace_rows > g_args.reqPerReq) {
|
||||
if (g_args.interlaceRows > g_args.reqPerReq) {
|
||||
printf("NOTICE: interlace rows value %u > num_of_records_per_req %u\n\n",
|
||||
g_args.interlace_rows, g_args.reqPerReq);
|
||||
g_args.interlaceRows, g_args.reqPerReq);
|
||||
printf(" interlace rows value will be set to num_of_records_per_req %u\n\n",
|
||||
g_args.reqPerReq);
|
||||
prompt();
|
||||
g_args.interlace_rows = g_args.reqPerReq;
|
||||
g_args.interlaceRows = g_args.reqPerReq;
|
||||
}
|
||||
|
||||
cJSON* dbs = cJSON_GetObjectItem(root, "databases");
|
||||
|
@ -8462,13 +8461,13 @@ static void printStatPerThread(threadInfo *pThreadInfo)
|
|||
);
|
||||
}
|
||||
|
||||
// sync write interlace data
|
||||
static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||
debugPrint("[%d] %s() LN%d: ### interlace write\n",
|
||||
#if STMT_BIND_PARAM_BATCH == 1
|
||||
// stmt sync write interlace data
|
||||
static void* syncWriteInterlaceStmtBatch(threadInfo *pThreadInfo, uint32_t interlaceRows) {
|
||||
debugPrint("[%d] %s() LN%d: ### stmt interlace write\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__);
|
||||
|
||||
int64_t insertRows;
|
||||
uint32_t interlaceRows;
|
||||
uint64_t maxSqlLen;
|
||||
int64_t nTimeStampStep;
|
||||
uint64_t insert_interval;
|
||||
|
@ -8477,19 +8476,11 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
|||
|
||||
if (stbInfo) {
|
||||
insertRows = stbInfo->insertRows;
|
||||
|
||||
if ((stbInfo->interlaceRows == 0)
|
||||
&& (g_args.interlace_rows > 0)) {
|
||||
interlaceRows = g_args.interlace_rows;
|
||||
} else {
|
||||
interlaceRows = stbInfo->interlaceRows;
|
||||
}
|
||||
maxSqlLen = stbInfo->maxSqlLen;
|
||||
nTimeStampStep = stbInfo->timeStampStep;
|
||||
insert_interval = stbInfo->insertInterval;
|
||||
} else {
|
||||
insertRows = g_args.insertRows;
|
||||
interlaceRows = g_args.interlace_rows;
|
||||
maxSqlLen = g_args.max_sql_len;
|
||||
nTimeStampStep = g_args.timestamp_step;
|
||||
insert_interval = g_args.insert_interval;
|
||||
|
@ -8500,9 +8491,456 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
|||
pThreadInfo->start_table_from,
|
||||
pThreadInfo->ntables, insertRows);
|
||||
|
||||
if (interlaceRows > insertRows)
|
||||
interlaceRows = insertRows;
|
||||
uint32_t batchPerTbl = interlaceRows;
|
||||
uint32_t batchPerTblTimes;
|
||||
|
||||
if (interlaceRows > g_args.reqPerReq)
|
||||
interlaceRows = g_args.reqPerReq;
|
||||
|
||||
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
|
||||
batchPerTblTimes =
|
||||
g_args.reqPerReq / interlaceRows;
|
||||
} else {
|
||||
batchPerTblTimes = 1;
|
||||
}
|
||||
|
||||
pThreadInfo->totalInsertRows = 0;
|
||||
pThreadInfo->totalAffectedRows = 0;
|
||||
|
||||
uint64_t st = 0;
|
||||
uint64_t et = UINT64_MAX;
|
||||
|
||||
uint64_t lastPrintTime = taosGetTimestampMs();
|
||||
uint64_t startTs = taosGetTimestampMs();
|
||||
uint64_t endTs;
|
||||
|
||||
uint64_t tableSeq = pThreadInfo->start_table_from;
|
||||
int64_t startTime = pThreadInfo->start_time;
|
||||
|
||||
uint64_t generatedRecPerTbl = 0;
|
||||
bool flagSleep = true;
|
||||
uint64_t sleepTimeTotal = 0;
|
||||
|
||||
int percentComplete = 0;
|
||||
int64_t totalRows = insertRows * pThreadInfo->ntables;
|
||||
|
||||
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
|
||||
if ((flagSleep) && (insert_interval)) {
|
||||
st = taosGetTimestampMs();
|
||||
flagSleep = false;
|
||||
}
|
||||
|
||||
uint32_t recOfBatch = 0;
|
||||
|
||||
int32_t generated;
|
||||
for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
|
||||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
|
||||
getTableName(tableName, pThreadInfo, tableSeq);
|
||||
if (0 == strlen(tableName)) {
|
||||
errorPrint2("[%d] %s() LN%d, getTableName return null\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (stbInfo) {
|
||||
generated = prepareStbStmtWithSample(
|
||||
pThreadInfo,
|
||||
tableName,
|
||||
tableSeq,
|
||||
batchPerTbl,
|
||||
insertRows, 0,
|
||||
startTime,
|
||||
&(pThreadInfo->samplePos));
|
||||
} else {
|
||||
debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n",
|
||||
pThreadInfo->threadID,
|
||||
__func__, __LINE__,
|
||||
tableName, batchPerTbl, startTime);
|
||||
generated = prepareStmtWithoutStb(
|
||||
pThreadInfo,
|
||||
tableName,
|
||||
batchPerTbl,
|
||||
insertRows, i,
|
||||
startTime);
|
||||
}
|
||||
|
||||
debugPrint("[%d] %s() LN%d, generated records is %d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, generated);
|
||||
if (generated < 0) {
|
||||
errorPrint2("[%d] %s() LN%d, generated records is %d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, generated);
|
||||
goto free_of_interlace_stmt;
|
||||
} else if (generated == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
tableSeq ++;
|
||||
recOfBatch += batchPerTbl;
|
||||
|
||||
pThreadInfo->totalInsertRows += batchPerTbl;
|
||||
|
||||
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
batchPerTbl, recOfBatch);
|
||||
|
||||
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
|
||||
// turn to first table
|
||||
tableSeq = pThreadInfo->start_table_from;
|
||||
generatedRecPerTbl += batchPerTbl;
|
||||
|
||||
startTime = pThreadInfo->start_time
|
||||
+ generatedRecPerTbl * nTimeStampStep;
|
||||
|
||||
flagSleep = true;
|
||||
if (generatedRecPerTbl >= insertRows)
|
||||
break;
|
||||
|
||||
int64_t remainRows = insertRows - generatedRecPerTbl;
|
||||
if ((remainRows > 0) && (batchPerTbl > remainRows))
|
||||
batchPerTbl = remainRows;
|
||||
|
||||
if (pThreadInfo->ntables * batchPerTbl < g_args.reqPerReq)
|
||||
break;
|
||||
}
|
||||
|
||||
verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
generatedRecPerTbl, insertRows);
|
||||
|
||||
if ((g_args.reqPerReq - recOfBatch) < batchPerTbl)
|
||||
break;
|
||||
}
|
||||
|
||||
verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
|
||||
pThreadInfo->totalInsertRows);
|
||||
|
||||
startTs = taosGetTimestampUs();
|
||||
|
||||
if (recOfBatch == 0) {
|
||||
errorPrint2("[%d] %s() LN%d Failed to insert records of batch %d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
batchPerTbl);
|
||||
if (batchPerTbl > 0) {
|
||||
errorPrint("\tIf the batch is %d, the length of the SQL to insert a row must be less then %"PRId64"\n",
|
||||
batchPerTbl, maxSqlLen / batchPerTbl);
|
||||
}
|
||||
goto free_of_interlace_stmt;
|
||||
}
|
||||
int64_t affectedRows = execInsert(pThreadInfo, recOfBatch);
|
||||
|
||||
endTs = taosGetTimestampUs();
|
||||
uint64_t delay = endTs - startTs;
|
||||
performancePrint("%s() LN%d, insert execution time is %10.2f ms\n",
|
||||
__func__, __LINE__, delay / 1000.0);
|
||||
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
|
||||
pThreadInfo->threadID,
|
||||
__func__, __LINE__, affectedRows);
|
||||
|
||||
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
|
||||
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
|
||||
pThreadInfo->cntDelay++;
|
||||
pThreadInfo->totalDelay += delay;
|
||||
|
||||
if (recOfBatch != affectedRows) {
|
||||
errorPrint2("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
recOfBatch, affectedRows);
|
||||
goto free_of_interlace_stmt;
|
||||
}
|
||||
|
||||
pThreadInfo->totalAffectedRows += affectedRows;
|
||||
|
||||
int currentPercent = pThreadInfo->totalAffectedRows * 100 / totalRows;
|
||||
if (currentPercent > percentComplete ) {
|
||||
printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent);
|
||||
percentComplete = currentPercent;
|
||||
}
|
||||
int64_t currentPrintTime = taosGetTimestampMs();
|
||||
if (currentPrintTime - lastPrintTime > 30*1000) {
|
||||
printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n",
|
||||
pThreadInfo->threadID,
|
||||
pThreadInfo->totalInsertRows,
|
||||
pThreadInfo->totalAffectedRows);
|
||||
lastPrintTime = currentPrintTime;
|
||||
}
|
||||
|
||||
if ((insert_interval) && flagSleep) {
|
||||
et = taosGetTimestampMs();
|
||||
|
||||
if (insert_interval > (et - st) ) {
|
||||
uint64_t sleepTime = insert_interval - (et -st);
|
||||
performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n",
|
||||
__func__, __LINE__, sleepTime);
|
||||
taosMsleep(sleepTime); // ms
|
||||
sleepTimeTotal += insert_interval;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (percentComplete < 100)
|
||||
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
|
||||
|
||||
free_of_interlace_stmt:
|
||||
printStatPerThread(pThreadInfo);
|
||||
return NULL;
|
||||
}
|
||||
#else
|
||||
// stmt sync write interlace data
|
||||
static void* syncWriteInterlaceStmt(threadInfo *pThreadInfo, uint32_t interlaceRows) {
|
||||
debugPrint("[%d] %s() LN%d: ### stmt interlace write\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__);
|
||||
|
||||
int64_t insertRows;
|
||||
uint64_t maxSqlLen;
|
||||
int64_t nTimeStampStep;
|
||||
uint64_t insert_interval;
|
||||
|
||||
SSuperTable* stbInfo = pThreadInfo->stbInfo;
|
||||
|
||||
if (stbInfo) {
|
||||
insertRows = stbInfo->insertRows;
|
||||
maxSqlLen = stbInfo->maxSqlLen;
|
||||
nTimeStampStep = stbInfo->timeStampStep;
|
||||
insert_interval = stbInfo->insertInterval;
|
||||
} else {
|
||||
insertRows = g_args.insertRows;
|
||||
maxSqlLen = g_args.max_sql_len;
|
||||
nTimeStampStep = g_args.timestamp_step;
|
||||
insert_interval = g_args.insert_interval;
|
||||
}
|
||||
|
||||
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
pThreadInfo->start_table_from,
|
||||
pThreadInfo->ntables, insertRows);
|
||||
|
||||
uint32_t batchPerTbl = interlaceRows;
|
||||
uint32_t batchPerTblTimes;
|
||||
|
||||
if (interlaceRows > g_args.reqPerReq)
|
||||
interlaceRows = g_args.reqPerReq;
|
||||
|
||||
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
|
||||
batchPerTblTimes =
|
||||
g_args.reqPerReq / interlaceRows;
|
||||
} else {
|
||||
batchPerTblTimes = 1;
|
||||
}
|
||||
|
||||
pThreadInfo->totalInsertRows = 0;
|
||||
pThreadInfo->totalAffectedRows = 0;
|
||||
|
||||
uint64_t st = 0;
|
||||
uint64_t et = UINT64_MAX;
|
||||
|
||||
uint64_t lastPrintTime = taosGetTimestampMs();
|
||||
uint64_t startTs = taosGetTimestampMs();
|
||||
uint64_t endTs;
|
||||
|
||||
uint64_t tableSeq = pThreadInfo->start_table_from;
|
||||
int64_t startTime = pThreadInfo->start_time;
|
||||
|
||||
uint64_t generatedRecPerTbl = 0;
|
||||
bool flagSleep = true;
|
||||
uint64_t sleepTimeTotal = 0;
|
||||
|
||||
int percentComplete = 0;
|
||||
int64_t totalRows = insertRows * pThreadInfo->ntables;
|
||||
|
||||
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
|
||||
if ((flagSleep) && (insert_interval)) {
|
||||
st = taosGetTimestampMs();
|
||||
flagSleep = false;
|
||||
}
|
||||
|
||||
uint32_t recOfBatch = 0;
|
||||
|
||||
int32_t generated;
|
||||
for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
|
||||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
|
||||
getTableName(tableName, pThreadInfo, tableSeq);
|
||||
if (0 == strlen(tableName)) {
|
||||
errorPrint2("[%d] %s() LN%d, getTableName return null\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (stbInfo) {
|
||||
generated = prepareStbStmtWithSample(
|
||||
pThreadInfo,
|
||||
tableName,
|
||||
tableSeq,
|
||||
batchPerTbl,
|
||||
insertRows, 0,
|
||||
startTime,
|
||||
&(pThreadInfo->samplePos));
|
||||
} else {
|
||||
debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n",
|
||||
pThreadInfo->threadID,
|
||||
__func__, __LINE__,
|
||||
tableName, batchPerTbl, startTime);
|
||||
generated = prepareStmtWithoutStb(
|
||||
pThreadInfo,
|
||||
tableName,
|
||||
batchPerTbl,
|
||||
insertRows, i,
|
||||
startTime);
|
||||
}
|
||||
|
||||
debugPrint("[%d] %s() LN%d, generated records is %d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, generated);
|
||||
if (generated < 0) {
|
||||
errorPrint2("[%d] %s() LN%d, generated records is %d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, generated);
|
||||
goto free_of_interlace_stmt;
|
||||
} else if (generated == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
tableSeq ++;
|
||||
recOfBatch += batchPerTbl;
|
||||
|
||||
pThreadInfo->totalInsertRows += batchPerTbl;
|
||||
|
||||
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
batchPerTbl, recOfBatch);
|
||||
|
||||
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
|
||||
// turn to first table
|
||||
tableSeq = pThreadInfo->start_table_from;
|
||||
generatedRecPerTbl += batchPerTbl;
|
||||
|
||||
startTime = pThreadInfo->start_time
|
||||
+ generatedRecPerTbl * nTimeStampStep;
|
||||
|
||||
flagSleep = true;
|
||||
if (generatedRecPerTbl >= insertRows)
|
||||
break;
|
||||
|
||||
int64_t remainRows = insertRows - generatedRecPerTbl;
|
||||
if ((remainRows > 0) && (batchPerTbl > remainRows))
|
||||
batchPerTbl = remainRows;
|
||||
|
||||
if (pThreadInfo->ntables * batchPerTbl < g_args.reqPerReq)
|
||||
break;
|
||||
}
|
||||
|
||||
verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
generatedRecPerTbl, insertRows);
|
||||
|
||||
if ((g_args.reqPerReq - recOfBatch) < batchPerTbl)
|
||||
break;
|
||||
}
|
||||
|
||||
verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
|
||||
pThreadInfo->totalInsertRows);
|
||||
|
||||
startTs = taosGetTimestampUs();
|
||||
|
||||
if (recOfBatch == 0) {
|
||||
errorPrint2("[%d] %s() LN%d Failed to insert records of batch %d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
batchPerTbl);
|
||||
if (batchPerTbl > 0) {
|
||||
errorPrint("\tIf the batch is %d, the length of the SQL to insert a row must be less then %"PRId64"\n",
|
||||
batchPerTbl, maxSqlLen / batchPerTbl);
|
||||
}
|
||||
goto free_of_interlace_stmt;
|
||||
}
|
||||
int64_t affectedRows = execInsert(pThreadInfo, recOfBatch);
|
||||
|
||||
endTs = taosGetTimestampUs();
|
||||
uint64_t delay = endTs - startTs;
|
||||
performancePrint("%s() LN%d, insert execution time is %10.2f ms\n",
|
||||
__func__, __LINE__, delay / 1000.0);
|
||||
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
|
||||
pThreadInfo->threadID,
|
||||
__func__, __LINE__, affectedRows);
|
||||
|
||||
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
|
||||
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
|
||||
pThreadInfo->cntDelay++;
|
||||
pThreadInfo->totalDelay += delay;
|
||||
|
||||
if (recOfBatch != affectedRows) {
|
||||
errorPrint2("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
recOfBatch, affectedRows);
|
||||
goto free_of_interlace_stmt;
|
||||
}
|
||||
|
||||
pThreadInfo->totalAffectedRows += affectedRows;
|
||||
|
||||
int currentPercent = pThreadInfo->totalAffectedRows * 100 / totalRows;
|
||||
if (currentPercent > percentComplete ) {
|
||||
printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent);
|
||||
percentComplete = currentPercent;
|
||||
}
|
||||
int64_t currentPrintTime = taosGetTimestampMs();
|
||||
if (currentPrintTime - lastPrintTime > 30*1000) {
|
||||
printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n",
|
||||
pThreadInfo->threadID,
|
||||
pThreadInfo->totalInsertRows,
|
||||
pThreadInfo->totalAffectedRows);
|
||||
lastPrintTime = currentPrintTime;
|
||||
}
|
||||
|
||||
if ((insert_interval) && flagSleep) {
|
||||
et = taosGetTimestampMs();
|
||||
|
||||
if (insert_interval > (et - st) ) {
|
||||
uint64_t sleepTime = insert_interval - (et -st);
|
||||
performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n",
|
||||
__func__, __LINE__, sleepTime);
|
||||
taosMsleep(sleepTime); // ms
|
||||
sleepTimeTotal += insert_interval;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (percentComplete < 100)
|
||||
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
|
||||
|
||||
free_of_interlace_stmt:
|
||||
printStatPerThread(pThreadInfo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
// sync write interlace data
|
||||
static void* syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) {
|
||||
debugPrint("[%d] %s() LN%d: ### interlace write\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__);
|
||||
|
||||
int64_t insertRows;
|
||||
uint64_t maxSqlLen;
|
||||
int64_t nTimeStampStep;
|
||||
uint64_t insert_interval;
|
||||
|
||||
SSuperTable* stbInfo = pThreadInfo->stbInfo;
|
||||
|
||||
if (stbInfo) {
|
||||
insertRows = stbInfo->insertRows;
|
||||
maxSqlLen = stbInfo->maxSqlLen;
|
||||
nTimeStampStep = stbInfo->timeStampStep;
|
||||
insert_interval = stbInfo->insertInterval;
|
||||
} else {
|
||||
insertRows = g_args.insertRows;
|
||||
maxSqlLen = g_args.max_sql_len;
|
||||
nTimeStampStep = g_args.timestamp_step;
|
||||
insert_interval = g_args.insert_interval;
|
||||
}
|
||||
|
||||
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
pThreadInfo->start_table_from,
|
||||
pThreadInfo->ntables, insertRows);
|
||||
#if 1
|
||||
if (interlaceRows > g_args.reqPerReq)
|
||||
interlaceRows = g_args.reqPerReq;
|
||||
|
||||
|
@ -8515,7 +8953,22 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
|||
} else {
|
||||
batchPerTblTimes = 1;
|
||||
}
|
||||
#else
|
||||
uint32_t batchPerTbl;
|
||||
if (interlaceRows > g_args.reqPerReq)
|
||||
batchPerTbl = g_args.reqPerReq;
|
||||
else
|
||||
batchPerTbl = interlaceRows;
|
||||
|
||||
uint32_t batchPerTblTimes;
|
||||
|
||||
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
|
||||
batchPerTblTimes =
|
||||
interlaceRows / batchPerTbl;
|
||||
} else {
|
||||
batchPerTblTimes = 1;
|
||||
}
|
||||
#endif
|
||||
pThreadInfo->buffer = calloc(maxSqlLen, 1);
|
||||
if (NULL == pThreadInfo->buffer) {
|
||||
errorPrint2( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n",
|
||||
|
@ -8548,6 +9001,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
|||
st = taosGetTimestampMs();
|
||||
flagSleep = false;
|
||||
}
|
||||
|
||||
// generate data
|
||||
memset(pThreadInfo->buffer, 0, maxSqlLen);
|
||||
uint64_t remainderBufLen = maxSqlLen;
|
||||
|
@ -8576,47 +9030,23 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
|||
uint64_t oldRemainderLen = remainderBufLen;
|
||||
|
||||
if (stbInfo) {
|
||||
if (stbInfo->iface == STMT_IFACE) {
|
||||
generated = prepareStbStmtWithSample(
|
||||
pThreadInfo,
|
||||
tableName,
|
||||
tableSeq,
|
||||
batchPerTbl,
|
||||
insertRows, 0,
|
||||
startTime,
|
||||
&(pThreadInfo->samplePos));
|
||||
} else {
|
||||
generated = generateStbInterlaceData(
|
||||
pThreadInfo,
|
||||
tableName, batchPerTbl, i,
|
||||
batchPerTblTimes,
|
||||
tableSeq,
|
||||
pstr,
|
||||
insertRows,
|
||||
startTime,
|
||||
&remainderBufLen);
|
||||
}
|
||||
generated = generateStbInterlaceData(
|
||||
pThreadInfo,
|
||||
tableName, batchPerTbl, i,
|
||||
batchPerTblTimes,
|
||||
tableSeq,
|
||||
pstr,
|
||||
insertRows,
|
||||
startTime,
|
||||
&remainderBufLen);
|
||||
} else {
|
||||
if (g_args.iface == STMT_IFACE) {
|
||||
debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n",
|
||||
pThreadInfo->threadID,
|
||||
__func__, __LINE__,
|
||||
tableName, batchPerTbl, startTime);
|
||||
generated = prepareStmtWithoutStb(
|
||||
pThreadInfo,
|
||||
tableName,
|
||||
batchPerTbl,
|
||||
insertRows, i,
|
||||
startTime);
|
||||
} else {
|
||||
generated = generateInterlaceDataWithoutStb(
|
||||
tableName, batchPerTbl,
|
||||
tableSeq,
|
||||
pThreadInfo->db_name, pstr,
|
||||
insertRows,
|
||||
startTime,
|
||||
&remainderBufLen);
|
||||
}
|
||||
generated = generateInterlaceDataWithoutStb(
|
||||
tableName, batchPerTbl,
|
||||
tableSeq,
|
||||
pThreadInfo->db_name, pstr,
|
||||
insertRows,
|
||||
startTime,
|
||||
&remainderBufLen);
|
||||
}
|
||||
|
||||
debugPrint("[%d] %s() LN%d, generated records is %d\n",
|
||||
|
@ -8932,23 +9362,29 @@ static void* syncWrite(void *sarg) {
|
|||
|
||||
setThreadName("syncWrite");
|
||||
|
||||
uint32_t interlaceRows;
|
||||
uint32_t interlaceRows = 0;
|
||||
|
||||
if (stbInfo) {
|
||||
if ((stbInfo->interlaceRows == 0)
|
||||
&& (g_args.interlace_rows > 0)) {
|
||||
interlaceRows = g_args.interlace_rows;
|
||||
} else {
|
||||
if (stbInfo->interlaceRows < stbInfo->insertRows)
|
||||
interlaceRows = stbInfo->interlaceRows;
|
||||
}
|
||||
} else {
|
||||
interlaceRows = g_args.interlace_rows;
|
||||
if (g_args.interlaceRows < g_args.insertRows)
|
||||
interlaceRows = g_args.interlaceRows;
|
||||
}
|
||||
|
||||
if (interlaceRows > 0) {
|
||||
// interlace mode
|
||||
return syncWriteInterlace(pThreadInfo);
|
||||
} else {
|
||||
if (((stbInfo) && (STMT_IFACE == stbInfo->iface))
|
||||
|| (STMT_IFACE == g_args.iface)) {
|
||||
#if STMT_BIND_PARAM_BATCH == 1
|
||||
return syncWriteInterlaceStmtBatch(pThreadInfo, interlaceRows);
|
||||
#else
|
||||
return syncWriteInterlaceStmt(pThreadInfo, interlaceRows);
|
||||
#endif
|
||||
} else {
|
||||
return syncWriteInterlace(pThreadInfo, interlaceRows);
|
||||
}
|
||||
}else {
|
||||
// progressive mode
|
||||
return syncWriteProgressive(pThreadInfo);
|
||||
}
|
||||
|
@ -9231,22 +9667,25 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
|||
assert(stmtBuffer);
|
||||
|
||||
#if STMT_BIND_PARAM_BATCH == 1
|
||||
uint32_t interlaceRows;
|
||||
uint32_t interlaceRows = 0;
|
||||
uint32_t batch;
|
||||
|
||||
if (stbInfo) {
|
||||
if ((stbInfo->interlaceRows == 0)
|
||||
&& (g_args.interlace_rows > 0)) {
|
||||
interlaceRows = g_args.interlace_rows;
|
||||
&& (g_args.interlaceRows > 0)
|
||||
) {
|
||||
interlaceRows = g_args.interlaceRows;
|
||||
|
||||
if (interlaceRows > stbInfo->insertRows) {
|
||||
interlaceRows = stbInfo->insertRows;
|
||||
}
|
||||
} else {
|
||||
interlaceRows = stbInfo->interlaceRows;
|
||||
}
|
||||
|
||||
if (interlaceRows > stbInfo->insertRows) {
|
||||
interlaceRows = 0;
|
||||
}
|
||||
} else {
|
||||
interlaceRows = g_args.interlace_rows;
|
||||
if (g_args.interlaceRows < g_args.insertRows)
|
||||
interlaceRows = g_args.interlaceRows;
|
||||
}
|
||||
|
||||
if (interlaceRows > 0) {
|
||||
|
@ -9408,13 +9847,12 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
|||
taos_stmt_close(pThreadInfo->stmt);
|
||||
}
|
||||
|
||||
#if STMT_BIND_PARAM_BATCH == 1
|
||||
tmfree((char *)pThreadInfo->bind_ts);
|
||||
#if STMT_BIND_PARAM_BATCH == 1
|
||||
tmfree((char *)pThreadInfo->bind_ts_array);
|
||||
tmfree(pThreadInfo->bindParams);
|
||||
tmfree(pThreadInfo->is_null);
|
||||
#else
|
||||
tmfree((char *)pThreadInfo->bind_ts);
|
||||
if (pThreadInfo->sampleBindArray) {
|
||||
for (int k = 0; k < MAX_SAMPLES; k++) {
|
||||
uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)(
|
||||
|
|
|
@ -41,7 +41,7 @@
|
|||
"batch_create_tbl_num": 10,
|
||||
"data_source": "rand",
|
||||
"insert_mode": "taosc",
|
||||
"insert_rows": 1000,
|
||||
"insert_rows": 1001,
|
||||
"childtable_limit": 0,
|
||||
"childtable_offset":0,
|
||||
"multi_thread_write_one_tbl": "no",
|
||||
|
|
Loading…
Reference in New Issue