Hotfix/sangshuduo/td 5300 taosdemo stmt print (#6918)
* [TD-5300]<fix>: taosdemo stmt debug print. * fix default iface is unknown.
This commit is contained in:
parent
0953a60676
commit
b0bdd7c2c6
|
@ -5225,63 +5225,69 @@ static int32_t generateStbDataTail(
|
|||
int64_t remainderBufLen, int64_t insertRows,
|
||||
uint64_t recordFrom, int64_t startTime,
|
||||
int64_t *pSamplePos, int64_t *dataLen) {
|
||||
uint64_t len = 0;
|
||||
uint64_t len = 0;
|
||||
|
||||
char *pstr = buffer;
|
||||
char *pstr = buffer;
|
||||
|
||||
bool tsRand;
|
||||
if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
|
||||
tsRand = true;
|
||||
} else {
|
||||
tsRand = false;
|
||||
}
|
||||
verbosePrint("%s() LN%d batch=%u buflen=%"PRId64"\n",
|
||||
__func__, __LINE__, batch, remainderBufLen);
|
||||
|
||||
int32_t k;
|
||||
for (k = 0; k < batch;) {
|
||||
char data[MAX_DATA_SIZE];
|
||||
memset(data, 0, MAX_DATA_SIZE);
|
||||
|
||||
int64_t lenOfRow = 0;
|
||||
|
||||
if (tsRand) {
|
||||
lenOfRow = generateStbRowData(superTblInfo, data,
|
||||
startTime + getTSRandTail(
|
||||
superTblInfo->timeStampStep, k,
|
||||
superTblInfo->disorderRatio,
|
||||
superTblInfo->disorderRange)
|
||||
);
|
||||
bool tsRand;
|
||||
if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
|
||||
tsRand = true;
|
||||
} else {
|
||||
lenOfRow = getRowDataFromSample(
|
||||
data,
|
||||
(remainderBufLen < MAX_DATA_SIZE)?remainderBufLen:MAX_DATA_SIZE,
|
||||
startTime + superTblInfo->timeStampStep * k,
|
||||
superTblInfo,
|
||||
pSamplePos);
|
||||
tsRand = false;
|
||||
}
|
||||
verbosePrint("%s() LN%d batch=%u buflen=%"PRId64"\n",
|
||||
__func__, __LINE__, batch, remainderBufLen);
|
||||
|
||||
int32_t k;
|
||||
for (k = 0; k < batch;) {
|
||||
char data[MAX_DATA_SIZE];
|
||||
memset(data, 0, MAX_DATA_SIZE);
|
||||
|
||||
int64_t lenOfRow = 0;
|
||||
|
||||
if (tsRand) {
|
||||
if (superTblInfo->disorderRatio > 0) {
|
||||
lenOfRow = generateStbRowData(superTblInfo, data,
|
||||
startTime + getTSRandTail(
|
||||
superTblInfo->timeStampStep, k,
|
||||
superTblInfo->disorderRatio,
|
||||
superTblInfo->disorderRange)
|
||||
);
|
||||
} else {
|
||||
lenOfRow = generateStbRowData(superTblInfo, data,
|
||||
startTime + superTblInfo->timeStampStep * k
|
||||
);
|
||||
}
|
||||
} else {
|
||||
lenOfRow = getRowDataFromSample(
|
||||
data,
|
||||
(remainderBufLen < MAX_DATA_SIZE)?remainderBufLen:MAX_DATA_SIZE,
|
||||
startTime + superTblInfo->timeStampStep * k,
|
||||
superTblInfo,
|
||||
pSamplePos);
|
||||
}
|
||||
|
||||
if ((lenOfRow + 1) > remainderBufLen) {
|
||||
break;
|
||||
}
|
||||
|
||||
pstr += snprintf(pstr , lenOfRow + 1, "%s", data);
|
||||
k++;
|
||||
len += lenOfRow;
|
||||
remainderBufLen -= lenOfRow;
|
||||
|
||||
verbosePrint("%s() LN%d len=%"PRIu64" k=%u \nbuffer=%s\n",
|
||||
__func__, __LINE__, len, k, buffer);
|
||||
|
||||
recordFrom ++;
|
||||
|
||||
if (recordFrom >= insertRows) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ((lenOfRow + 1) > remainderBufLen) {
|
||||
break;
|
||||
}
|
||||
|
||||
pstr += snprintf(pstr , lenOfRow + 1, "%s", data);
|
||||
k++;
|
||||
len += lenOfRow;
|
||||
remainderBufLen -= lenOfRow;
|
||||
|
||||
verbosePrint("%s() LN%d len=%"PRIu64" k=%u \nbuffer=%s\n",
|
||||
__func__, __LINE__, len, k, buffer);
|
||||
|
||||
recordFrom ++;
|
||||
|
||||
if (recordFrom >= insertRows) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
*dataLen = len;
|
||||
return k;
|
||||
*dataLen = len;
|
||||
return k;
|
||||
}
|
||||
|
||||
|
||||
|
@ -5378,52 +5384,52 @@ static int32_t generateStbInterlaceData(
|
|||
int64_t startTime,
|
||||
uint64_t *pRemainderBufLen)
|
||||
{
|
||||
assert(buffer);
|
||||
char *pstr = buffer;
|
||||
assert(buffer);
|
||||
char *pstr = buffer;
|
||||
|
||||
int headLen = generateStbSQLHead(
|
||||
superTblInfo,
|
||||
tableName, tableSeq, pThreadInfo->db_name,
|
||||
pstr, *pRemainderBufLen);
|
||||
int headLen = generateStbSQLHead(
|
||||
superTblInfo,
|
||||
tableName, tableSeq, pThreadInfo->db_name,
|
||||
pstr, *pRemainderBufLen);
|
||||
|
||||
if (headLen <= 0) {
|
||||
return 0;
|
||||
}
|
||||
// generate data buffer
|
||||
verbosePrint("[%d] %s() LN%d i=%"PRIu64" buffer:\n%s\n",
|
||||
if (headLen <= 0) {
|
||||
return 0;
|
||||
}
|
||||
// generate data buffer
|
||||
verbosePrint("[%d] %s() LN%d i=%"PRIu64" buffer:\n%s\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, i, buffer);
|
||||
|
||||
pstr += headLen;
|
||||
*pRemainderBufLen -= headLen;
|
||||
pstr += headLen;
|
||||
*pRemainderBufLen -= headLen;
|
||||
|
||||
int64_t dataLen = 0;
|
||||
int64_t dataLen = 0;
|
||||
|
||||
verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%u batchPerTbl = %u\n",
|
||||
verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%u batchPerTbl = %u\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
i, batchPerTblTimes, batchPerTbl);
|
||||
|
||||
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
|
||||
startTime = taosGetTimestamp(pThreadInfo->time_precision);
|
||||
}
|
||||
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
|
||||
startTime = taosGetTimestamp(pThreadInfo->time_precision);
|
||||
}
|
||||
|
||||
int32_t k = generateStbDataTail(
|
||||
int32_t k = generateStbDataTail(
|
||||
superTblInfo,
|
||||
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
|
||||
startTime,
|
||||
&(pThreadInfo->samplePos), &dataLen);
|
||||
|
||||
if (k == batchPerTbl) {
|
||||
pstr += dataLen;
|
||||
*pRemainderBufLen -= dataLen;
|
||||
} else {
|
||||
debugPrint("%s() LN%d, generated data tail: %u, not equal batch per table: %u\n",
|
||||
__func__, __LINE__, k, batchPerTbl);
|
||||
pstr -= headLen;
|
||||
pstr[0] = '\0';
|
||||
k = 0;
|
||||
}
|
||||
if (k == batchPerTbl) {
|
||||
pstr += dataLen;
|
||||
*pRemainderBufLen -= dataLen;
|
||||
} else {
|
||||
debugPrint("%s() LN%d, generated data tail: %u, not equal batch per table: %u\n",
|
||||
__func__, __LINE__, k, batchPerTbl);
|
||||
pstr -= headLen;
|
||||
pstr[0] = '\0';
|
||||
k = 0;
|
||||
}
|
||||
|
||||
return k;
|
||||
return k;
|
||||
}
|
||||
|
||||
static int64_t generateInterlaceDataWithoutStb(
|
||||
|
@ -5871,7 +5877,7 @@ static int32_t prepareStbStmtInterlace(
|
|||
stbInfo,
|
||||
stmt,
|
||||
tableName,
|
||||
g_args.num_of_RPR,
|
||||
batch,
|
||||
insertRows, 0, startTime,
|
||||
pSamplePos);
|
||||
}
|
||||
|
@ -5905,29 +5911,29 @@ static int32_t generateStbProgressiveData(
|
|||
uint64_t recordFrom, int64_t startTime, int64_t *pSamplePos,
|
||||
int64_t *pRemainderBufLen)
|
||||
{
|
||||
assert(buffer != NULL);
|
||||
char *pstr = buffer;
|
||||
assert(buffer != NULL);
|
||||
char *pstr = buffer;
|
||||
|
||||
memset(buffer, 0, *pRemainderBufLen);
|
||||
memset(buffer, 0, *pRemainderBufLen);
|
||||
|
||||
int64_t headLen = generateStbSQLHead(
|
||||
superTblInfo,
|
||||
tableName, tableSeq, dbName,
|
||||
buffer, *pRemainderBufLen);
|
||||
int64_t headLen = generateStbSQLHead(
|
||||
superTblInfo,
|
||||
tableName, tableSeq, dbName,
|
||||
buffer, *pRemainderBufLen);
|
||||
|
||||
if (headLen <= 0) {
|
||||
return 0;
|
||||
}
|
||||
pstr += headLen;
|
||||
*pRemainderBufLen -= headLen;
|
||||
if (headLen <= 0) {
|
||||
return 0;
|
||||
}
|
||||
pstr += headLen;
|
||||
*pRemainderBufLen -= headLen;
|
||||
|
||||
int64_t dataLen;
|
||||
int64_t dataLen;
|
||||
|
||||
return generateStbDataTail(superTblInfo,
|
||||
g_args.num_of_RPR, pstr, *pRemainderBufLen,
|
||||
insertRows, recordFrom,
|
||||
startTime,
|
||||
pSamplePos, &dataLen);
|
||||
return generateStbDataTail(superTblInfo,
|
||||
g_args.num_of_RPR, pstr, *pRemainderBufLen,
|
||||
insertRows, recordFrom,
|
||||
startTime,
|
||||
pSamplePos, &dataLen);
|
||||
}
|
||||
|
||||
static int32_t generateProgressiveDataWithoutStb(
|
||||
|
@ -5974,283 +5980,283 @@ static void printStatPerThread(threadInfo *pThreadInfo)
|
|||
|
||||
// sync write interlace data
|
||||
static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||
debugPrint("[%d] %s() LN%d: ### interlace write\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__);
|
||||
|
||||
int64_t insertRows;
|
||||
uint32_t interlaceRows;
|
||||
uint64_t maxSqlLen;
|
||||
int64_t nTimeStampStep;
|
||||
uint64_t insert_interval;
|
||||
|
||||
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
||||
|
||||
if (superTblInfo) {
|
||||
insertRows = superTblInfo->insertRows;
|
||||
|
||||
if ((superTblInfo->interlaceRows == 0)
|
||||
&& (g_args.interlace_rows > 0)) {
|
||||
interlaceRows = g_args.interlace_rows;
|
||||
} else {
|
||||
interlaceRows = superTblInfo->interlaceRows;
|
||||
}
|
||||
maxSqlLen = superTblInfo->maxSqlLen;
|
||||
nTimeStampStep = superTblInfo->timeStampStep;
|
||||
insert_interval = superTblInfo->insertInterval;
|
||||
} else {
|
||||
insertRows = g_args.num_of_DPT;
|
||||
interlaceRows = g_args.interlace_rows;
|
||||
maxSqlLen = g_args.max_sql_len;
|
||||
nTimeStampStep = DEFAULT_TIMESTAMP_STEP;
|
||||
insert_interval = g_args.insert_interval;
|
||||
}
|
||||
|
||||
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
pThreadInfo->start_table_from,
|
||||
pThreadInfo->ntables, insertRows);
|
||||
|
||||
if (interlaceRows > insertRows)
|
||||
interlaceRows = insertRows;
|
||||
|
||||
if (interlaceRows > g_args.num_of_RPR)
|
||||
interlaceRows = g_args.num_of_RPR;
|
||||
|
||||
uint32_t batchPerTbl = interlaceRows;
|
||||
uint32_t batchPerTblTimes;
|
||||
|
||||
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
|
||||
batchPerTblTimes =
|
||||
g_args.num_of_RPR / interlaceRows;
|
||||
} else {
|
||||
batchPerTblTimes = 1;
|
||||
}
|
||||
|
||||
pThreadInfo->buffer = calloc(maxSqlLen, 1);
|
||||
if (NULL == pThreadInfo->buffer) {
|
||||
errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n",
|
||||
__func__, __LINE__, maxSqlLen, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pThreadInfo->totalInsertRows = 0;
|
||||
pThreadInfo->totalAffectedRows = 0;
|
||||
|
||||
uint64_t st = 0;
|
||||
uint64_t et = UINT64_MAX;
|
||||
|
||||
uint64_t lastPrintTime = taosGetTimestampMs();
|
||||
uint64_t startTs = taosGetTimestampMs();
|
||||
uint64_t endTs;
|
||||
|
||||
uint64_t tableSeq = pThreadInfo->start_table_from;
|
||||
int64_t startTime = pThreadInfo->start_time;
|
||||
|
||||
uint64_t generatedRecPerTbl = 0;
|
||||
bool flagSleep = true;
|
||||
uint64_t sleepTimeTotal = 0;
|
||||
|
||||
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
|
||||
if ((flagSleep) && (insert_interval)) {
|
||||
st = taosGetTimestampMs();
|
||||
flagSleep = false;
|
||||
}
|
||||
// generate data
|
||||
memset(pThreadInfo->buffer, 0, maxSqlLen);
|
||||
uint64_t remainderBufLen = maxSqlLen;
|
||||
|
||||
char *pstr = pThreadInfo->buffer;
|
||||
|
||||
int len = snprintf(pstr,
|
||||
strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO);
|
||||
pstr += len;
|
||||
remainderBufLen -= len;
|
||||
|
||||
uint32_t recOfBatch = 0;
|
||||
|
||||
for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
|
||||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
|
||||
getTableName(tableName, pThreadInfo, tableSeq);
|
||||
if (0 == strlen(tableName)) {
|
||||
errorPrint("[%d] %s() LN%d, getTableName return null\n",
|
||||
debugPrint("[%d] %s() LN%d: ### interlace write\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__);
|
||||
free(pThreadInfo->buffer);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
uint64_t oldRemainderLen = remainderBufLen;
|
||||
int64_t insertRows;
|
||||
uint32_t interlaceRows;
|
||||
uint64_t maxSqlLen;
|
||||
int64_t nTimeStampStep;
|
||||
uint64_t insert_interval;
|
||||
|
||||
int32_t generated;
|
||||
if (superTblInfo) {
|
||||
if (superTblInfo->iface == STMT_IFACE) {
|
||||
#if STMT_IFACE_ENABLED == 1
|
||||
generated = prepareStbStmtInterlace(
|
||||
superTblInfo,
|
||||
pThreadInfo->stmt,
|
||||
tableName,
|
||||
batchPerTbl,
|
||||
insertRows, i,
|
||||
startTime,
|
||||
&(pThreadInfo->samplePos));
|
||||
#else
|
||||
generated = -1;
|
||||
#endif
|
||||
} else {
|
||||
generated = generateStbInterlaceData(
|
||||
superTblInfo,
|
||||
tableName, batchPerTbl, i,
|
||||
batchPerTblTimes,
|
||||
tableSeq,
|
||||
pThreadInfo, pstr,
|
||||
insertRows,
|
||||
startTime,
|
||||
&remainderBufLen);
|
||||
}
|
||||
} else {
|
||||
if (g_args.iface == STMT_IFACE) {
|
||||
debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n",
|
||||
pThreadInfo->threadID,
|
||||
__func__, __LINE__,
|
||||
tableName, batchPerTbl, startTime);
|
||||
#if STMT_IFACE_ENABLED == 1
|
||||
generated = prepareStmtWithoutStb(
|
||||
pThreadInfo->stmt, tableName,
|
||||
batchPerTbl,
|
||||
insertRows, i,
|
||||
startTime);
|
||||
#else
|
||||
generated = -1;
|
||||
#endif
|
||||
} else {
|
||||
generated = generateInterlaceDataWithoutStb(
|
||||
tableName, batchPerTbl,
|
||||
tableSeq,
|
||||
pThreadInfo->db_name, pstr,
|
||||
insertRows,
|
||||
startTime,
|
||||
&remainderBufLen);
|
||||
}
|
||||
}
|
||||
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
||||
|
||||
debugPrint("[%d] %s() LN%d, generated records is %d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, generated);
|
||||
if (generated < 0) {
|
||||
errorPrint("[%d] %s() LN%d, generated records is %d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, generated);
|
||||
goto free_of_interlace;
|
||||
} else if (generated == 0) {
|
||||
break;
|
||||
}
|
||||
if (superTblInfo) {
|
||||
insertRows = superTblInfo->insertRows;
|
||||
|
||||
tableSeq ++;
|
||||
recOfBatch += batchPerTbl;
|
||||
|
||||
pstr += (oldRemainderLen - remainderBufLen);
|
||||
pThreadInfo->totalInsertRows += batchPerTbl;
|
||||
|
||||
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
batchPerTbl, recOfBatch);
|
||||
|
||||
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
|
||||
// turn to first table
|
||||
tableSeq = pThreadInfo->start_table_from;
|
||||
generatedRecPerTbl += batchPerTbl;
|
||||
|
||||
startTime = pThreadInfo->start_time
|
||||
+ generatedRecPerTbl * nTimeStampStep;
|
||||
|
||||
flagSleep = true;
|
||||
if (generatedRecPerTbl >= insertRows)
|
||||
break;
|
||||
|
||||
int64_t remainRows = insertRows - generatedRecPerTbl;
|
||||
if ((remainRows > 0) && (batchPerTbl > remainRows))
|
||||
batchPerTbl = remainRows;
|
||||
|
||||
if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR)
|
||||
break;
|
||||
}
|
||||
|
||||
verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
generatedRecPerTbl, insertRows);
|
||||
|
||||
if ((g_args.num_of_RPR - recOfBatch) < batchPerTbl)
|
||||
break;
|
||||
}
|
||||
|
||||
verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
|
||||
pThreadInfo->totalInsertRows);
|
||||
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer);
|
||||
|
||||
startTs = taosGetTimestampMs();
|
||||
|
||||
if (recOfBatch == 0) {
|
||||
errorPrint("[%d] %s() LN%d Failed to insert records of batch %d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
batchPerTbl);
|
||||
if (batchPerTbl > 0) {
|
||||
errorPrint("\tIf the batch is %d, the length of the SQL to insert a row must be less then %"PRId64"\n",
|
||||
batchPerTbl, maxSqlLen / batchPerTbl);
|
||||
if ((superTblInfo->interlaceRows == 0)
|
||||
&& (g_args.interlace_rows > 0)) {
|
||||
interlaceRows = g_args.interlace_rows;
|
||||
} else {
|
||||
interlaceRows = superTblInfo->interlaceRows;
|
||||
}
|
||||
errorPrint("\tPlease check if the buffer length(%"PRId64") or batch(%d) is set with proper value!\n",
|
||||
maxSqlLen, batchPerTbl);
|
||||
goto free_of_interlace;
|
||||
}
|
||||
int64_t affectedRows = execInsert(pThreadInfo, recOfBatch);
|
||||
|
||||
endTs = taosGetTimestampMs();
|
||||
uint64_t delay = endTs - startTs;
|
||||
performancePrint("%s() LN%d, insert execution time is %"PRIu64"ms\n",
|
||||
__func__, __LINE__, delay);
|
||||
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
|
||||
pThreadInfo->threadID,
|
||||
__func__, __LINE__, affectedRows);
|
||||
|
||||
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
|
||||
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
|
||||
pThreadInfo->cntDelay++;
|
||||
pThreadInfo->totalDelay += delay;
|
||||
|
||||
if (recOfBatch != affectedRows) {
|
||||
errorPrint("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n%s\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
recOfBatch, affectedRows, pThreadInfo->buffer);
|
||||
goto free_of_interlace;
|
||||
maxSqlLen = superTblInfo->maxSqlLen;
|
||||
nTimeStampStep = superTblInfo->timeStampStep;
|
||||
insert_interval = superTblInfo->insertInterval;
|
||||
} else {
|
||||
insertRows = g_args.num_of_DPT;
|
||||
interlaceRows = g_args.interlace_rows;
|
||||
maxSqlLen = g_args.max_sql_len;
|
||||
nTimeStampStep = DEFAULT_TIMESTAMP_STEP;
|
||||
insert_interval = g_args.insert_interval;
|
||||
}
|
||||
|
||||
pThreadInfo->totalAffectedRows += affectedRows;
|
||||
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
pThreadInfo->start_table_from,
|
||||
pThreadInfo->ntables, insertRows);
|
||||
|
||||
int64_t currentPrintTime = taosGetTimestampMs();
|
||||
if (currentPrintTime - lastPrintTime > 30*1000) {
|
||||
printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n",
|
||||
if (interlaceRows > insertRows)
|
||||
interlaceRows = insertRows;
|
||||
|
||||
if (interlaceRows > g_args.num_of_RPR)
|
||||
interlaceRows = g_args.num_of_RPR;
|
||||
|
||||
uint32_t batchPerTbl = interlaceRows;
|
||||
uint32_t batchPerTblTimes;
|
||||
|
||||
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
|
||||
batchPerTblTimes =
|
||||
g_args.num_of_RPR / interlaceRows;
|
||||
} else {
|
||||
batchPerTblTimes = 1;
|
||||
}
|
||||
|
||||
pThreadInfo->buffer = calloc(maxSqlLen, 1);
|
||||
if (NULL == pThreadInfo->buffer) {
|
||||
errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n",
|
||||
__func__, __LINE__, maxSqlLen, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pThreadInfo->totalInsertRows = 0;
|
||||
pThreadInfo->totalAffectedRows = 0;
|
||||
|
||||
uint64_t st = 0;
|
||||
uint64_t et = UINT64_MAX;
|
||||
|
||||
uint64_t lastPrintTime = taosGetTimestampMs();
|
||||
uint64_t startTs = taosGetTimestampMs();
|
||||
uint64_t endTs;
|
||||
|
||||
uint64_t tableSeq = pThreadInfo->start_table_from;
|
||||
int64_t startTime = pThreadInfo->start_time;
|
||||
|
||||
uint64_t generatedRecPerTbl = 0;
|
||||
bool flagSleep = true;
|
||||
uint64_t sleepTimeTotal = 0;
|
||||
|
||||
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
|
||||
if ((flagSleep) && (insert_interval)) {
|
||||
st = taosGetTimestampMs();
|
||||
flagSleep = false;
|
||||
}
|
||||
// generate data
|
||||
memset(pThreadInfo->buffer, 0, maxSqlLen);
|
||||
uint64_t remainderBufLen = maxSqlLen;
|
||||
|
||||
char *pstr = pThreadInfo->buffer;
|
||||
|
||||
int len = snprintf(pstr,
|
||||
strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO);
|
||||
pstr += len;
|
||||
remainderBufLen -= len;
|
||||
|
||||
uint32_t recOfBatch = 0;
|
||||
|
||||
for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
|
||||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
|
||||
getTableName(tableName, pThreadInfo, tableSeq);
|
||||
if (0 == strlen(tableName)) {
|
||||
errorPrint("[%d] %s() LN%d, getTableName return null\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__);
|
||||
free(pThreadInfo->buffer);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
uint64_t oldRemainderLen = remainderBufLen;
|
||||
|
||||
int32_t generated;
|
||||
if (superTblInfo) {
|
||||
if (superTblInfo->iface == STMT_IFACE) {
|
||||
#if STMT_IFACE_ENABLED == 1
|
||||
generated = prepareStbStmtInterlace(
|
||||
superTblInfo,
|
||||
pThreadInfo->stmt,
|
||||
tableName,
|
||||
batchPerTbl,
|
||||
insertRows, i,
|
||||
startTime,
|
||||
&(pThreadInfo->samplePos));
|
||||
#else
|
||||
generated = -1;
|
||||
#endif
|
||||
} else {
|
||||
generated = generateStbInterlaceData(
|
||||
superTblInfo,
|
||||
tableName, batchPerTbl, i,
|
||||
batchPerTblTimes,
|
||||
tableSeq,
|
||||
pThreadInfo, pstr,
|
||||
insertRows,
|
||||
startTime,
|
||||
&remainderBufLen);
|
||||
}
|
||||
} else {
|
||||
if (g_args.iface == STMT_IFACE) {
|
||||
debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n",
|
||||
pThreadInfo->threadID,
|
||||
__func__, __LINE__,
|
||||
tableName, batchPerTbl, startTime);
|
||||
#if STMT_IFACE_ENABLED == 1
|
||||
generated = prepareStmtWithoutStb(
|
||||
pThreadInfo->stmt, tableName,
|
||||
batchPerTbl,
|
||||
insertRows, i,
|
||||
startTime);
|
||||
#else
|
||||
generated = -1;
|
||||
#endif
|
||||
} else {
|
||||
generated = generateInterlaceDataWithoutStb(
|
||||
tableName, batchPerTbl,
|
||||
tableSeq,
|
||||
pThreadInfo->db_name, pstr,
|
||||
insertRows,
|
||||
startTime,
|
||||
&remainderBufLen);
|
||||
}
|
||||
}
|
||||
|
||||
debugPrint("[%d] %s() LN%d, generated records is %d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, generated);
|
||||
if (generated < 0) {
|
||||
errorPrint("[%d] %s() LN%d, generated records is %d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, generated);
|
||||
goto free_of_interlace;
|
||||
} else if (generated == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
tableSeq ++;
|
||||
recOfBatch += batchPerTbl;
|
||||
|
||||
pstr += (oldRemainderLen - remainderBufLen);
|
||||
pThreadInfo->totalInsertRows += batchPerTbl;
|
||||
|
||||
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
batchPerTbl, recOfBatch);
|
||||
|
||||
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
|
||||
// turn to first table
|
||||
tableSeq = pThreadInfo->start_table_from;
|
||||
generatedRecPerTbl += batchPerTbl;
|
||||
|
||||
startTime = pThreadInfo->start_time
|
||||
+ generatedRecPerTbl * nTimeStampStep;
|
||||
|
||||
flagSleep = true;
|
||||
if (generatedRecPerTbl >= insertRows)
|
||||
break;
|
||||
|
||||
int64_t remainRows = insertRows - generatedRecPerTbl;
|
||||
if ((remainRows > 0) && (batchPerTbl > remainRows))
|
||||
batchPerTbl = remainRows;
|
||||
|
||||
if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR)
|
||||
break;
|
||||
}
|
||||
|
||||
verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
generatedRecPerTbl, insertRows);
|
||||
|
||||
if ((g_args.num_of_RPR - recOfBatch) < batchPerTbl)
|
||||
break;
|
||||
}
|
||||
|
||||
verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
|
||||
pThreadInfo->totalInsertRows);
|
||||
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer);
|
||||
|
||||
startTs = taosGetTimestampMs();
|
||||
|
||||
if (recOfBatch == 0) {
|
||||
errorPrint("[%d] %s() LN%d Failed to insert records of batch %d\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
batchPerTbl);
|
||||
if (batchPerTbl > 0) {
|
||||
errorPrint("\tIf the batch is %d, the length of the SQL to insert a row must be less then %"PRId64"\n",
|
||||
batchPerTbl, maxSqlLen / batchPerTbl);
|
||||
}
|
||||
errorPrint("\tPlease check if the buffer length(%"PRId64") or batch(%d) is set with proper value!\n",
|
||||
maxSqlLen, batchPerTbl);
|
||||
goto free_of_interlace;
|
||||
}
|
||||
int64_t affectedRows = execInsert(pThreadInfo, recOfBatch);
|
||||
|
||||
endTs = taosGetTimestampMs();
|
||||
uint64_t delay = endTs - startTs;
|
||||
performancePrint("%s() LN%d, insert execution time is %"PRIu64"ms\n",
|
||||
__func__, __LINE__, delay);
|
||||
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
|
||||
pThreadInfo->threadID,
|
||||
__func__, __LINE__, affectedRows);
|
||||
|
||||
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
|
||||
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
|
||||
pThreadInfo->cntDelay++;
|
||||
pThreadInfo->totalDelay += delay;
|
||||
|
||||
if (recOfBatch != affectedRows) {
|
||||
errorPrint("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n%s\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
recOfBatch, affectedRows, pThreadInfo->buffer);
|
||||
goto free_of_interlace;
|
||||
}
|
||||
|
||||
pThreadInfo->totalAffectedRows += affectedRows;
|
||||
|
||||
int64_t currentPrintTime = taosGetTimestampMs();
|
||||
if (currentPrintTime - lastPrintTime > 30*1000) {
|
||||
printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n",
|
||||
pThreadInfo->threadID,
|
||||
pThreadInfo->totalInsertRows,
|
||||
pThreadInfo->totalAffectedRows);
|
||||
lastPrintTime = currentPrintTime;
|
||||
}
|
||||
lastPrintTime = currentPrintTime;
|
||||
}
|
||||
|
||||
if ((insert_interval) && flagSleep) {
|
||||
et = taosGetTimestampMs();
|
||||
if ((insert_interval) && flagSleep) {
|
||||
et = taosGetTimestampMs();
|
||||
|
||||
if (insert_interval > (et - st) ) {
|
||||
uint64_t sleepTime = insert_interval - (et -st);
|
||||
performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n",
|
||||
__func__, __LINE__, sleepTime);
|
||||
taosMsleep(sleepTime); // ms
|
||||
sleepTimeTotal += insert_interval;
|
||||
}
|
||||
if (insert_interval > (et - st) ) {
|
||||
uint64_t sleepTime = insert_interval - (et -st);
|
||||
performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n",
|
||||
__func__, __LINE__, sleepTime);
|
||||
taosMsleep(sleepTime); // ms
|
||||
sleepTimeTotal += insert_interval;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
free_of_interlace:
|
||||
tmfree(pThreadInfo->buffer);
|
||||
printStatPerThread(pThreadInfo);
|
||||
return NULL;
|
||||
tmfree(pThreadInfo->buffer);
|
||||
printStatPerThread(pThreadInfo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// sync insertion progressive data
|
||||
|
@ -6417,29 +6423,29 @@ free_of_progressive:
|
|||
|
||||
static void* syncWrite(void *sarg) {
|
||||
|
||||
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
||||
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
||||
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
||||
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
||||
|
||||
uint32_t interlaceRows;
|
||||
uint32_t interlaceRows;
|
||||
|
||||
if (superTblInfo) {
|
||||
if ((superTblInfo->interlaceRows == 0)
|
||||
&& (g_args.interlace_rows > 0)) {
|
||||
interlaceRows = g_args.interlace_rows;
|
||||
if (superTblInfo) {
|
||||
if ((superTblInfo->interlaceRows == 0)
|
||||
&& (g_args.interlace_rows > 0)) {
|
||||
interlaceRows = g_args.interlace_rows;
|
||||
} else {
|
||||
interlaceRows = superTblInfo->interlaceRows;
|
||||
}
|
||||
} else {
|
||||
interlaceRows = superTblInfo->interlaceRows;
|
||||
interlaceRows = g_args.interlace_rows;
|
||||
}
|
||||
} else {
|
||||
interlaceRows = g_args.interlace_rows;
|
||||
}
|
||||
|
||||
if (interlaceRows > 0) {
|
||||
// interlace mode
|
||||
return syncWriteInterlace(pThreadInfo);
|
||||
} else {
|
||||
// progressive mode
|
||||
return syncWriteProgressive(pThreadInfo);
|
||||
}
|
||||
if (interlaceRows > 0) {
|
||||
// interlace mode
|
||||
return syncWriteInterlace(pThreadInfo);
|
||||
} else {
|
||||
// progressive mode
|
||||
return syncWriteProgressive(pThreadInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static void callBack(void *param, TAOS_RES *res, int code) {
|
||||
|
|
Loading…
Reference in New Issue