Hotfix/sangshuduo/td 3733 taosdemo buffer processing (#5769)
* [TD-3733]<fix>: taosdemo buffer processing refactor. * [TD-3733]<fix>: taosdemo generate SQL head buffer process. * [TD-3733]<fix>: taosdemo generate sql head buffer processing. fix compile issue for windows. * [TD-3733]<fix>: taosdemo generate sql head buffer processing. fix max_sql_len. generate data buffer refactor. * resolve conflict with master. Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
parent
d6ec9c6700
commit
5e43fc0d24
|
@ -146,17 +146,6 @@ typedef struct SInternalField {
|
|||
SExprFilter *pFieldFilters;
|
||||
} SInternalField;
|
||||
|
||||
typedef struct SFieldInfo {
|
||||
int16_t numOfOutput; // number of column in result
|
||||
SArray *internalField; // SArray<SInternalField>
|
||||
} SFieldInfo;
|
||||
|
||||
typedef struct SColumn {
|
||||
SColumnIndex colIndex;
|
||||
int32_t numOfFilters;
|
||||
SColumnFilterInfo *filterInfo;
|
||||
} SColumn;
|
||||
|
||||
typedef struct SCond {
|
||||
uint64_t uid;
|
||||
int32_t len; // length of tag query condition data
|
||||
|
|
|
@ -3211,9 +3211,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
|
||||
// rows per table need be less than insert batch
|
||||
if (g_args.interlace_rows > g_args.num_of_RPR) {
|
||||
printf("NOTICE: interlace rows value %d > num_of_records_per_request %d\n\n",
|
||||
printf("NOTICE: interlace rows value %d > num_of_records_per_req %d\n\n",
|
||||
g_args.interlace_rows, g_args.num_of_RPR);
|
||||
printf(" interlace rows value will be set to num_of_records_per_request %d\n\n",
|
||||
printf(" interlace rows value will be set to num_of_records_per_req %d\n\n",
|
||||
g_args.num_of_RPR);
|
||||
printf(" press Enter key to continue or Ctrl-C to stop.");
|
||||
(void)getchar();
|
||||
|
@ -3682,14 +3682,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
int32_t len = maxSqlLen->valueint;
|
||||
if (len > TSDB_MAX_ALLOWED_SQL_LEN) {
|
||||
len = TSDB_MAX_ALLOWED_SQL_LEN;
|
||||
} else if (len < TSDB_MAX_SQL_LEN) {
|
||||
len = TSDB_MAX_SQL_LEN;
|
||||
} else if (len < 5) {
|
||||
len = 5;
|
||||
}
|
||||
g_Dbs.db[i].superTbls[j].maxSqlLen = len;
|
||||
} else if (!maxSqlLen) {
|
||||
g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len;
|
||||
} else {
|
||||
printf("ERROR: failed to read json, maxSqlLen not found\n");
|
||||
errorPrint("%s() LN%d, failed to read json, maxSqlLen input mistake\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -3715,9 +3716,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
g_Dbs.db[i].superTbls[j].interlaceRows = interlaceRows->valueint;
|
||||
// rows per table need be less than insert batch
|
||||
if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) {
|
||||
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %d > num_of_records_per_request %d\n\n",
|
||||
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %d > num_of_records_per_req %d\n\n",
|
||||
i, j, g_Dbs.db[i].superTbls[j].interlaceRows, g_args.num_of_RPR);
|
||||
printf(" interlace rows value will be set to num_of_records_per_request %d\n\n",
|
||||
printf(" interlace rows value will be set to num_of_records_per_req %d\n\n",
|
||||
g_args.num_of_RPR);
|
||||
printf(" press Enter key to continue or Ctrl-C to stop.");
|
||||
(void)getchar();
|
||||
|
@ -4511,13 +4512,15 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq)
|
|||
}
|
||||
}
|
||||
|
||||
static int generateDataTail(char *tableName, int32_t tableSeq,
|
||||
threadInfo* pThreadInfo, SSuperTable* superTblInfo,
|
||||
static int generateDataTail(
|
||||
SSuperTable* superTblInfo,
|
||||
int batch, char* buffer, int remainderBufLen, int64_t insertRows,
|
||||
int64_t startFrom, uint64_t startTime, int *pSamplePos, int *dataLen) {
|
||||
int len = 0;
|
||||
int ncols_per_record = 1; // count first col ts
|
||||
|
||||
char *pstr = buffer;
|
||||
|
||||
if (superTblInfo == NULL) {
|
||||
int datatypeSeq = 0;
|
||||
while(g_args.datatype[datatypeSeq]) {
|
||||
|
@ -4546,15 +4549,14 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
|
|||
pSamplePos);
|
||||
} else if (0 == strncasecmp(superTblInfo->dataSource,
|
||||
"rand", strlen("rand"))) {
|
||||
int rand_num = taosRandom() % 100;
|
||||
int randTail;
|
||||
if (0 != superTblInfo->disorderRatio
|
||||
&& rand_num < superTblInfo->disorderRatio) {
|
||||
randTail = (superTblInfo->timeStampStep * k
|
||||
+ (taosRandom() % superTblInfo->disorderRange + 1)) * (-1);
|
||||
debugPrint("rand data generated, back %d\n", randTail);
|
||||
} else {
|
||||
randTail = superTblInfo->timeStampStep * k;
|
||||
|
||||
int randTail = superTblInfo->timeStampStep * k;
|
||||
if (superTblInfo->disorderRatio > 0) {
|
||||
int rand_num = taosRandom() % 100;
|
||||
if(rand_num < superTblInfo->disorderRatio) {
|
||||
randTail = (randTail + (taosRandom() % superTblInfo->disorderRange + 1)) * (-1);
|
||||
debugPrint("rand data generated, back %d\n", randTail);
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t d = startTime
|
||||
|
@ -4569,7 +4571,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
|
|||
break;
|
||||
}
|
||||
|
||||
buffer += snprintf(buffer, retLen + 1, "%s", data);
|
||||
pstr += snprintf(pstr , retLen + 1, "%s", data);
|
||||
k++;
|
||||
len += retLen;
|
||||
remainderBufLen -= retLen;
|
||||
|
@ -4597,7 +4599,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
|
|||
if (len > remainderBufLen)
|
||||
break;
|
||||
|
||||
buffer += sprintf(buffer, " %s", data);
|
||||
pstr += sprintf(pstr, " %s", data);
|
||||
k++;
|
||||
len += retLen;
|
||||
remainderBufLen -= retLen;
|
||||
|
@ -4622,6 +4624,10 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
|
|||
char *buffer, int remainderBufLen)
|
||||
{
|
||||
int len;
|
||||
|
||||
#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into ..
|
||||
char headBuf[HEAD_BUFF_LEN];
|
||||
|
||||
if (superTblInfo) {
|
||||
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
|
||||
char* tagsValBuf = NULL;
|
||||
|
@ -4638,8 +4644,9 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
|
|||
return -1;
|
||||
}
|
||||
|
||||
len = snprintf(buffer,
|
||||
superTblInfo->maxSqlLen,
|
||||
len = snprintf(
|
||||
headBuf,
|
||||
HEAD_BUFF_LEN,
|
||||
"insert into %s.%s using %s.%s tags %s values",
|
||||
pThreadInfo->db_name,
|
||||
tableName,
|
||||
|
@ -4648,26 +4655,34 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
|
|||
tagsValBuf);
|
||||
tmfree(tagsValBuf);
|
||||
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
|
||||
len = snprintf(buffer,
|
||||
superTblInfo->maxSqlLen,
|
||||
len = snprintf(
|
||||
headBuf,
|
||||
HEAD_BUFF_LEN,
|
||||
"insert into %s.%s values",
|
||||
pThreadInfo->db_name,
|
||||
tableName);
|
||||
} else {
|
||||
len = snprintf(buffer,
|
||||
superTblInfo->maxSqlLen,
|
||||
len = snprintf(
|
||||
headBuf,
|
||||
HEAD_BUFF_LEN,
|
||||
"insert into %s.%s values",
|
||||
pThreadInfo->db_name,
|
||||
tableName);
|
||||
}
|
||||
} else {
|
||||
len = snprintf(buffer,
|
||||
g_args.max_sql_len,
|
||||
len = snprintf(
|
||||
headBuf,
|
||||
HEAD_BUFF_LEN,
|
||||
"insert into %s.%s values",
|
||||
pThreadInfo->db_name,
|
||||
tableName);
|
||||
}
|
||||
|
||||
if (len > remainderBufLen)
|
||||
return -1;
|
||||
|
||||
tstrncpy(buffer, headBuf, len + 1);
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
|
@ -4709,7 +4724,7 @@ static int generateInterlaceDataBuffer(
|
|||
startTime = 1500000000000;
|
||||
}
|
||||
int k = generateDataTail(
|
||||
tableName, tableSeq, pThreadInfo, superTblInfo,
|
||||
superTblInfo,
|
||||
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
|
||||
startTime,
|
||||
&(pThreadInfo->samplePos), &dataLen);
|
||||
|
@ -4764,7 +4779,7 @@ static int generateProgressiveDataBuffer(
|
|||
remainderBufLen -= headLen;
|
||||
|
||||
int dataLen;
|
||||
k = generateDataTail(tableName, tableSeq, pThreadInfo, superTblInfo,
|
||||
k = generateDataTail(superTblInfo,
|
||||
g_args.num_of_RPR, pstr, remainderBufLen, insertRows, startFrom,
|
||||
startTime,
|
||||
pSamplePos, &dataLen);
|
||||
|
@ -4781,8 +4796,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
|||
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
|
||||
int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows;
|
||||
|
||||
if (interlaceRows > insertRows)
|
||||
interlaceRows = insertRows;
|
||||
if (interlaceRows > g_args.num_of_RPR)
|
||||
interlaceRows = g_args.num_of_RPR;
|
||||
|
||||
int insertMode;
|
||||
|
||||
|
@ -4847,8 +4862,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
|||
bool flagSleep = true;
|
||||
int sleepTimeTotal = 0;
|
||||
|
||||
int remainderBufLen;
|
||||
|
||||
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
|
||||
if ((flagSleep) && (insert_interval)) {
|
||||
st = taosGetTimestampUs();
|
||||
|
@ -4856,7 +4869,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
|||
}
|
||||
// generate data
|
||||
memset(buffer, 0, maxSqlLen);
|
||||
remainderBufLen = maxSqlLen;
|
||||
int remainderBufLen = maxSqlLen;
|
||||
|
||||
char *pstr = buffer;
|
||||
int recOfBatch = 0;
|
||||
|
@ -5102,11 +5115,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
|||
*/
|
||||
} // num_of_DPT
|
||||
|
||||
if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo &&
|
||||
if (g_args.verbose_print) {
|
||||
if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo &&
|
||||
(0 == strncasecmp(
|
||||
superTblInfo->dataSource, "sample", strlen("sample")))) {
|
||||
printf("%s() LN%d samplePos=%d\n",
|
||||
verbosePrint("%s() LN%d samplePos=%d\n",
|
||||
__func__, __LINE__, pThreadInfo->samplePos);
|
||||
}
|
||||
}
|
||||
} // tableSeq
|
||||
|
||||
|
|
Loading…
Reference in New Issue