merge from master

This commit is contained in:
Shengliang Guan 2021-08-03 17:21:41 +08:00
commit 6bdf22f737
4 changed files with 34 additions and 15 deletions

View File

@ -992,9 +992,7 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
index = 0;
sToken = tStrGetToken(*str, &index, false);
if (sToken.n == 0 || sToken.type != TK_RP) {
tscSQLSyntaxErrMsg(pInsertParam->msg, ") expected", *str);
code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return code;
return tscSQLSyntaxErrMsg(pCmd->payload, ") expected", *str);
}
*str += index;

View File

@ -5095,7 +5095,9 @@ static int getRowDataFromSample(
static int64_t generateStbRowData(
SSuperTable* stbInfo,
char* recBuf, int64_t timestamp)
char* recBuf,
int64_t remainderBufLen,
int64_t timestamp)
{
int64_t dataLen = 0;
char *pstr = recBuf;
@ -5123,6 +5125,7 @@ static int64_t generateStbRowData(
rand_string(buf, stbInfo->columns[i].dataLen);
dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "\'%s\',", buf);
tmfree(buf);
} else {
char *tmp;
@ -5179,6 +5182,9 @@ static int64_t generateStbRowData(
tstrncpy(pstr + dataLen, ",", 2);
dataLen += 1;
}
if (dataLen > remainderBufLen)
return 0;
}
dataLen -= 1;
@ -5385,7 +5391,7 @@ static int32_t generateDataTailWithoutStb(
int32_t k = 0;
for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE];
char *data = pstr;
memset(data, 0, MAX_DATA_SIZE);
int64_t retLen = 0;
@ -5409,7 +5415,7 @@ static int32_t generateDataTailWithoutStb(
if (len > remainderBufLen)
break;
pstr += sprintf(pstr, "%s", data);
pstr += retLen;
k++;
len += retLen;
remainderBufLen -= retLen;
@ -5465,14 +5471,14 @@ static int32_t generateStbDataTail(
int32_t k;
for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *data = pstr;
int64_t lenOfRow = 0;
if (tsRand) {
if (superTblInfo->disorderRatio > 0) {
lenOfRow = generateStbRowData(superTblInfo, data,
remainderBufLen,
startTime + getTSRandTail(
superTblInfo->timeStampStep, k,
superTblInfo->disorderRatio,
@ -5480,6 +5486,7 @@ static int32_t generateStbDataTail(
);
} else {
lenOfRow = generateStbRowData(superTblInfo, data,
remainderBufLen,
startTime + superTblInfo->timeStampStep * k
);
}
@ -5492,11 +5499,15 @@ static int32_t generateStbDataTail(
pSamplePos);
}
if (lenOfRow == 0) {
data[0] = '\0';
break;
}
if ((lenOfRow + 1) > remainderBufLen) {
break;
}
pstr += snprintf(pstr , lenOfRow + 1, "%s", data);
pstr += lenOfRow;
k++;
len += lenOfRow;
remainderBufLen -= lenOfRow;
@ -6248,7 +6259,7 @@ static int32_t generateStbProgressiveData(
assert(buffer != NULL);
char *pstr = buffer;
memset(buffer, 0, *pRemainderBufLen);
memset(pstr, 0, *pRemainderBufLen);
int64_t headLen = generateStbSQLHead(
superTblInfo,
@ -6642,7 +6653,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
return NULL;
}
int64_t remainderBufLen = maxSqlLen;
int64_t remainderBufLen = maxSqlLen - 2000;
char *pstr = pThreadInfo->buffer;
int len = snprintf(pstr,
@ -6824,10 +6835,14 @@ static void callBack(void *param, TAOS_RES *res, int code) {
&& rand_num < pThreadInfo->superTblInfo->disorderRatio) {
int64_t d = pThreadInfo->lastTs
- (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1);
generateStbRowData(pThreadInfo->superTblInfo, data, d);
generateStbRowData(pThreadInfo->superTblInfo, data,
MAX_DATA_SIZE,
d);
} else {
generateStbRowData(pThreadInfo->superTblInfo,
data, pThreadInfo->lastTs += 1000);
data,
MAX_DATA_SIZE,
pThreadInfo->lastTs += 1000);
}
pstr += sprintf(pstr, "%s", data);
pThreadInfo->counter++;
@ -7052,6 +7067,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
pThreadInfo->threadID = i;
tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN);
pThreadInfo->time_precision = timePrec;
pThreadInfo->superTblInfo = superTblInfo;

View File

@ -253,11 +253,15 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
int32_t connId = htonl(pHBMsg->connId);
SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort);
if (pConn == NULL) {
pHBMsg->pid = htonl(pHBMsg->pid);
pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pHBMsg->pid, pHBMsg->appName);
}
if (pConn == NULL) {
// do not close existing links, otherwise
// mError("failed to create connId, close connect");
// pRsp->killConnection = 1;
// pRsp->killConnection = 1;
} else {
pRsp->connId = htonl(pConn->connId);
mnodeSaveQueryStreamList(pConn, pHBMsg);

View File

@ -2696,7 +2696,7 @@ static void destroyHelper(void* param) {
free(param);
}
static bool loadBlockOfActiveTable(STsdbQueryHandle* pQueryHandle) {
static bool loadBlockOfActiveTable(STsdbQueryHandle* pQueryHandle) {
if (pQueryHandle->checkFiles) {
// check if the query range overlaps with the file data block
bool exists = true;
@ -2708,6 +2708,7 @@ static bool loadBlockOfActiveTable(STsdbQueryHandle* pQueryHandle) {
}
if (exists) {
tsdbRetrieveDataBlock((TsdbQueryHandleT) pQueryHandle, NULL);
if (pQueryHandle->currentLoadExternalRows && pQueryHandle->window.skey == pQueryHandle->window.ekey) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, 0);
assert(*(int64_t*)pColInfo->pData == pQueryHandle->window.skey);