Merge pull request #28408 from taosdata/enh/TD-32181/client

Enh/td 32181/client
This commit is contained in:
Shengliang Guan 2024-11-06 09:42:05 +08:00 committed by GitHub
commit 0be118c0aa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 499 additions and 175 deletions

View File

@ -189,7 +189,12 @@ static FORCE_INLINE void colDataSetDouble(SColumnInfoData* pColumnInfoData, uint
int32_t getJsonValueLen(const char* data);
// For the VAR_DATA_TYPE type, new data is inserted strictly according to the position of SVarColAttr.length.
// If the same row is inserted repeatedly, data holes will result.
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
// For the VAR_DATA_TYPE type, if a row already has data before inserting it (judged by offset != -1),
// it will be inserted at the original position and the old data will be overwritten.
int32_t colDataSetValOrCover(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData);
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows,
bool trimValue);
@ -233,7 +238,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
* @brief find how many rows already in order start from first row
*/
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo);
void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk);
int32_t blockDataCheck(const SSDataBlock* pDataBlock);
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
@ -266,7 +271,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId)
int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData);
int32_t blockGetEncodeSize(const SSDataBlock* pBlock);
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataLen, int32_t numOfCols);
int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos);
// for debug

View File

@ -153,6 +153,12 @@ extern bool tsEnableCrashReport;
extern char *tsTelemUri;
extern char *tsClientCrashReportUri;
extern char *tsSvrCrashReportUri;
extern int8_t tsSafetyCheckLevel;
enum {
TSDB_SAFETY_CHECK_LEVELL_NEVER = 0,
TSDB_SAFETY_CHECK_LEVELL_NORMAL = 1,
TSDB_SAFETY_CHECK_LEVELL_BYROW = 2,
};
// query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing

View File

@ -1215,6 +1215,7 @@ typedef struct {
int32_t bytes;
int8_t type;
uint8_t pk;
bool noData;
} SColumnInfo;
typedef struct STimeWindow {

View File

@ -298,6 +298,7 @@ typedef struct {
#define IS_VALID_UINT64(_t) ((_t) >= 0 && (_t) <= UINT64_MAX)
#define IS_VALID_FLOAT(_t) ((_t) >= -FLT_MAX && (_t) <= FLT_MAX)
#define IS_VALID_DOUBLE(_t) ((_t) >= -DBL_MAX && (_t) <= DBL_MAX)
#define IS_INVALID_TYPE(_t) ((_t) < TSDB_DATA_TYPE_NULL || (_t) >= TSDB_DATA_TYPE_MAX)
#define IS_CONVERT_AS_SIGNED(_t) \
(IS_SIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))

View File

@ -292,6 +292,7 @@ bool fmIsElapsedFunc(int32_t funcId);
void getLastCacheDataType(SDataType* pType, int32_t pkBytes);
int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNode** pFunc);
int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFunc, SNodeList* pParameterList, SFunctionNode** pFunc);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc);

View File

@ -105,6 +105,7 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t timeZoneStrLen();
int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t weekdayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t dayofweekFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);

View File

@ -297,8 +297,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
bool convertUcs4);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4);
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq);

View File

@ -2084,12 +2084,12 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
return TSDB_CODE_SUCCESS;
}
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength) {
int32_t idx = -1;
iconv_t conv = taosAcquireConv(&idx, C2M);
if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR;
for (int32_t i = 0; i < numOfCols; ++i) {
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
int32_t type = pResultInfo->fields[i].type;
int32_t bytes = pResultInfo->fields[i].bytes;
@ -2103,7 +2103,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
pResultInfo->convertBuf[i] = p;
SResultColumn* pCol = &pResultInfo->pCol[i];
for (int32_t j = 0; j < numOfRows; ++j) {
for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
if (pCol->offset[j] != -1) {
char* pStart = pCol->offset[j] + pCol->pData;
@ -2136,10 +2136,13 @@ int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
numOfCols * (sizeof(int8_t) + sizeof(int32_t));
}
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) {
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
char* p = (char*)pResultInfo->pData;
int32_t blockVersion = *(int32_t*)p;
int32_t numOfRows = pResultInfo->numOfRows;
int32_t numOfCols = pResultInfo->numOfCols;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
// length |
int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
@ -2198,10 +2201,16 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i
}
pStart += colLen;
}
// Ensure the complete structure of the block, including the blankfill field,
// even though it is not used on the client side.
len += sizeof(bool);
return len;
}
static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) {
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
int32_t numOfRows = pResultInfo->numOfRows;
int32_t numOfCols = pResultInfo->numOfCols;
bool needConvert = false;
for (int32_t i = 0; i < numOfCols; ++i) {
if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
@ -2218,7 +2227,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
char* p = (char*)pResultInfo->pData;
int32_t blockVersion = *(int32_t*)p;
int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows);
int32_t dataLen = estimateJsonLen(pResultInfo);
if (dataLen <= 0) {
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
@ -2341,27 +2350,36 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
pStart1 += colLen1;
}
// Ensure the complete structure of the block, including the blankfill field,
// even though it is not used on the client side.
// (void)memcpy(pStart1, pStart, sizeof(bool));
totalLen += sizeof(bool);
*(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
pResultInfo->pData = pResultInfo->convertJson;
return TSDB_CODE_SUCCESS;
}
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
bool convertUcs4) {
if (numOfCols <= 0 || pFields == NULL || pResultInfo == NULL) {
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4) {
if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
tscError("setResultDataPtr paras error");
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (numOfRows == 0) {
if (pResultInfo->numOfRows == 0) {
return TSDB_CODE_SUCCESS;
}
if (pResultInfo->pData == NULL) {
tscError("setResultDataPtr error: pData is NULL");
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
int32_t code = doPrepareResPtr(pResultInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doConvertJson(pResultInfo, numOfCols, numOfRows);
code = doConvertJson(pResultInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -2381,9 +2399,9 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
int32_t cols = *(int32_t*)p;
p += sizeof(int32_t);
if (rows != numOfRows || cols != numOfCols) {
tscError("setResultDataPtr paras error:rows;%d numOfRows:%d cols:%d numOfCols:%d", rows, numOfRows, cols,
numOfCols);
if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows, pResultInfo->numOfRows, cols,
pResultInfo->numOfCols);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
@ -2394,7 +2412,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
p += sizeof(uint64_t);
// check fields
for (int32_t i = 0; i < numOfCols; ++i) {
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
int8_t type = *(int8_t*)p;
p += sizeof(int8_t);
@ -2403,10 +2421,14 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
}
int32_t* colLength = (int32_t*)p;
p += sizeof(int32_t) * numOfCols;
p += sizeof(int32_t) * pResultInfo->numOfCols;
char* pStart = p;
for (int32_t i = 0; i < numOfCols; ++i) {
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
if ((pStart - pResultInfo->pData) >= dataLen) {
tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (blockVersion == BLOCK_VERSION_1) {
colLength[i] = htonl(colLength[i]);
}
@ -2414,10 +2436,13 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
tscError("invalid type %d", pResultInfo->fields[i].type);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
pResultInfo->pCol[i].offset = (int32_t*)pStart;
pStart += numOfRows * sizeof(int32_t);
pStart += pResultInfo->numOfRows * sizeof(int32_t);
} else {
pResultInfo->pCol[i].nullbitmap = pStart;
pStart += BitmapLen(pResultInfo->numOfRows);
@ -2430,11 +2455,17 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
pStart += colLength[i];
}
p = pStart;
// bool blankFill = *(bool*)p;
p += sizeof(bool);
int32_t offset = p - pResultInfo->pData;
if (offset > dataLen) {
tscError("invalid offset %d, dataLen %d", offset, dataLen);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (convertUcs4) {
code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
code = doConvertUCS4(pResultInfo, colLength);
}
return code;
@ -2547,7 +2578,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
pResultInfo->totalRows += pResultInfo->numOfRows;
int32_t code =
setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, convertUcs4);
setResultDataPtr(pResultInfo, convertUcs4);
return code;
}

View File

@ -588,7 +588,8 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
return code;
}
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
*pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) {
code = terrno;
@ -603,7 +604,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, SHOW_VARIABLES_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SHOW_VARIABLES_RESULT_COLS);
if(len < 0) {
uError("buildShowVariablesRsp error, len:%d", len);
code = terrno;
@ -741,7 +742,8 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
return code;
}
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
*pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) {
code = terrno;
@ -757,7 +759,7 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, COMPACT_DB_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, COMPACT_DB_RESULT_COLS);
if(len < 0) {
uError("buildRetriveTableRspForCompactDb error, len:%d", len);
code = terrno;

View File

@ -2869,8 +2869,7 @@ int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pRes
pRspObj->resInfo.precision = precision;
pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;
int32_t code = setResultDataPtr(&pRspObj->resInfo, pRspObj->resInfo.fields, pRspObj->resInfo.numOfCols,
pRspObj->resInfo.numOfRows, convertUcs4);
int32_t code = setResultDataPtr(&pRspObj->resInfo, convertUcs4);
if (code != 0) {
return code;
}

View File

@ -165,8 +165,8 @@ static const SSysDbTableSchema userStbsSchema[] = {
static const SSysDbTableSchema streamSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "stream_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "stream_id", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_id", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
@ -190,9 +190,9 @@ static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "process_total", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "process_throughput", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "out_total", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "out_throughput", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "process_throughput", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "out_total", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "out_throughput", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
// {.name = "dispatch_throughput", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
// {.name = "dispatch_total", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},

View File

@ -18,6 +18,7 @@
#include "tcompare.h"
#include "tlog.h"
#include "tname.h"
#include "tglobal.h"
#define MALLOC_ALIGN_BYTES 32
@ -86,8 +87,18 @@ int32_t getJsonValueLen(const char* data) {
return dataLen;
}
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
if (isNull || pData == NULL) {
static int32_t getDataLen(int32_t type, const char* pData) {
int32_t dataLen = 0;
if (type == TSDB_DATA_TYPE_JSON) {
dataLen = getJsonValueLen(pData);
} else {
dataLen = varDataTLen(pData);
}
return dataLen;
}
static int32_t colDataSetValHelp(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
if (isNull || pData == NULL) {
// There is a placehold for each NULL value of binary or nchar type.
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
pColumnInfoData->varmeta.offset[rowIndex] = -1; // it is a null value of VAR type.
@ -101,11 +112,9 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
int32_t type = pColumnInfoData->info.type;
if (IS_VAR_DATA_TYPE(type)) {
int32_t dataLen = 0;
if (type == TSDB_DATA_TYPE_JSON) {
dataLen = getJsonValueLen(pData);
} else {
dataLen = varDataTLen(pData);
int32_t dataLen = getDataLen(type, pData);
if (pColumnInfoData->varmeta.offset[rowIndex] > 0) {
pColumnInfoData->varmeta.length = pColumnInfoData->varmeta.offset[rowIndex];
}
SVarColAttr* pAttr = &pColumnInfoData->varmeta;
@ -134,7 +143,7 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
uint32_t len = pColumnInfoData->varmeta.length;
pColumnInfoData->varmeta.offset[rowIndex] = len;
(void) memmove(pColumnInfoData->pData + len, pData, dataLen);
(void)memmove(pColumnInfoData->pData + len, pData, dataLen);
pColumnInfoData->varmeta.length += dataLen;
} else {
memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes);
@ -144,6 +153,18 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
return 0;
}
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
pColumnInfoData->varmeta.offset[rowIndex] = -1;
}
return colDataSetValHelp(pColumnInfoData, rowIndex, pData, isNull);
}
int32_t colDataSetValOrCover(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
return colDataSetValHelp(pColumnInfoData, rowIndex, pData, isNull);
}
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx,
const char* pData) {
int32_t type = pColumnInfoData->info.type;
@ -3041,8 +3062,12 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
}
// return length of encoded data, return -1 if failed
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
blockDataCheck(pBlock, false);
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
int32_t code = blockDataCheck(pBlock);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return -1;
}
int32_t dataLen = 0;
@ -3106,9 +3131,11 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
size_t metaSize = 0;
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
metaSize = numOfRows * sizeof(int32_t);
if(dataLen + metaSize > dataBuflen) goto _exit;
memcpy(data, pColRes->varmeta.offset, metaSize);
} else {
metaSize = BitmapLen(numOfRows);
if(dataLen + metaSize > dataBuflen) goto _exit;
memcpy(data, pColRes->nullbitmap, metaSize);
}
@ -3127,12 +3154,14 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
}
colSizes[col] += colSize;
dataLen += colSize;
if(dataLen > dataBuflen) goto _exit;
(void) memmove(data, pColData, colSize);
data += colSize;
}
} else {
colSizes[col] = colDataGetLength(pColRes, numOfRows);
dataLen += colSizes[col];
if(dataLen > dataBuflen) goto _exit;
if (pColRes->pData != NULL) {
(void) memmove(data, pColRes->pData, colSizes[col]);
}
@ -3156,7 +3185,14 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
*actualLen = dataLen;
*groupId = pBlock->info.id.groupId;
if (dataLen > dataBuflen) goto _exit;
return dataLen;
_exit:
uError("blockEncode dataLen:%d, dataBuflen:%zu", dataLen, dataBuflen);
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
return -1;
}
int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos) {
@ -3286,9 +3322,13 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos
*pEndPos = pStart;
blockDataCheck(pBlock, false);
code = blockDataCheck(pBlock);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}
return code;
return TSDB_CODE_SUCCESS;
}
int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
@ -3498,20 +3538,19 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
return nextRowIdx;
}
void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) {
return;
if (NULL == pDataBlock || pDataBlock->info.rows == 0) {
return;
#define BLOCK_DATA_CHECK_TRESSA(o) \
if (!(o)) { \
uError("blockDataCheck failed! line:%d", __LINE__); \
return TSDB_CODE_INTERNAL_ERROR; \
}
int32_t blockDataCheck(const SSDataBlock* pDataBlock) {
if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER || NULL == pDataBlock || pDataBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
#define BLOCK_DATA_CHECK_TRESSA(o) ;
//#define BLOCK_DATA_CHECK_TRESSA(o) A S S E R T(o)
BLOCK_DATA_CHECK_TRESSA(pDataBlock->info.rows > 0);
if (!pDataBlock->info.dataLoad && !forceChk) {
return;
if (!pDataBlock->info.dataLoad) {
return TSDB_CODE_SUCCESS;
}
bool isVarType = false;
@ -3522,8 +3561,10 @@ void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) {
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < colNum; ++i) {
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, i);
BLOCK_DATA_CHECK_TRESSA(pCol != NULL);
isVarType = IS_VAR_DATA_TYPE(pCol->info.type);
checkRows = pDataBlock->info.rows;
if (pCol->info.noData == true) continue;
if (isVarType) {
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset);
@ -3531,27 +3572,39 @@ void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) {
BLOCK_DATA_CHECK_TRESSA(pCol->nullbitmap);
}
nextPos = 0;
nextPos = -1;
for (int64_t r = 0; r < checkRows; ++r) {
if (tsSafetyCheckLevel <= TSDB_SAFETY_CHECK_LEVELL_NORMAL) break;
if (!colDataIsNull_s(pCol, r)) {
BLOCK_DATA_CHECK_TRESSA(pCol->pData);
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.length <= pCol->varmeta.allocLen);
if (isVarType) {
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.allocLen > 0);
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] < pCol->varmeta.length);
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] <= pCol->varmeta.length);
if (pCol->reassigned) {
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] >= 0);
} else if (0 == r) {
} else if (0 == r || nextPos == -1) {
nextPos = pCol->varmeta.offset[r];
} else {
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] == nextPos);
}
colLen = varDataTLen(pCol->pData + pCol->varmeta.offset[r]);
BLOCK_DATA_CHECK_TRESSA(colLen >= VARSTR_HEADER_SIZE);
char* pColData = pCol->pData + pCol->varmeta.offset[r];
int32_t colSize = 0;
if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
colLen = getJsonValueLen(pColData);
} else {
colLen = varDataTLen(pColData);
}
if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
BLOCK_DATA_CHECK_TRESSA(colLen >= CHAR_BYTES);
} else {
BLOCK_DATA_CHECK_TRESSA(colLen >= VARSTR_HEADER_SIZE);
}
BLOCK_DATA_CHECK_TRESSA(colLen <= pCol->info.bytes);
if (pCol->reassigned) {
BLOCK_DATA_CHECK_TRESSA((pCol->varmeta.offset[r] + colLen) <= pCol->varmeta.length);
} else {
@ -3561,13 +3614,21 @@ void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) {
typeValue = *(char*)(pCol->pData + pCol->varmeta.offset[r] + colLen - 1);
} else {
GET_TYPED_DATA(typeValue, int64_t, pCol->info.type, colDataGetNumData(pCol, r));
if (TSDB_DATA_TYPE_FLOAT == pCol->info.type) {
float v = 0;
GET_TYPED_DATA(v, float, pCol->info.type, colDataGetNumData(pCol, r));
} else if (TSDB_DATA_TYPE_DOUBLE == pCol->info.type) {
double v = 0;
GET_TYPED_DATA(v, double, pCol->info.type, colDataGetNumData(pCol, r));
} else {
GET_TYPED_DATA(typeValue, int64_t, pCol->info.type, colDataGetNumData(pCol, r));
}
}
}
}
}
return;
return TSDB_CODE_SUCCESS;
}

View File

@ -137,8 +137,9 @@ bool tsEnableCrashReport = false;
#else
bool tsEnableCrashReport = true;
#endif
char *tsClientCrashReportUri = "/ccrashreport";
char *tsSvrCrashReportUri = "/dcrashreport";
char *tsClientCrashReportUri = "/ccrashreport";
char *tsSvrCrashReportUri = "/dcrashreport";
int8_t tsSafetyCheckLevel = TSDB_SAFETY_CHECK_LEVELL_NORMAL;
// schemaless
bool tsSmlDot2Underline = true;
@ -610,6 +611,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(
cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorScope", tsRandErrScope, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "safetyCheckLevel", tsSafetyCheckLevel, 0, 5, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, TSDB_MAX_RPC_THREADS);
@ -1305,6 +1307,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "tsmaDataDeleteMark");
tsmaDataDeleteMark = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "safetyCheckLevel");
tsSafetyCheckLevel = pItem->i32;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -2049,7 +2054,8 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"s3UploadDelaySec", &tsS3UploadDelaySec},
{"supportVnodes", &tsNumOfSupportVnodes},
{"experimental", &tsExperimental},
{"maxTsmaNum", &tsMaxTsmaNum}};
{"maxTsmaNum", &tsMaxTsmaNum},
{"safetyCheckLevel", &tsSafetyCheckLevel}};
if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);
@ -2305,7 +2311,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
{"experimental", &tsExperimental},
{"multiResultFunctionStarReturnTags", &tsMultiResultFunctionStarReturnTags},
{"maxTsmaCalcDelay", &tsMaxTsmaCalcDelay},
{"tsmaDataDeleteMark", &tsmaDataDeleteMark}};
{"tsmaDataDeleteMark", &tsmaDataDeleteMark},
{"safetyCheckLevel", &tsSafetyCheckLevel}};
if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);

View File

@ -548,8 +548,8 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols +
blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(numOfCols);
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
@ -574,7 +574,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
pStart += sizeof(SSysTableSchema);
}
int32_t len = blockEncode(pBlock, pStart, numOfCols);
int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
if (len < 0) {
dError("failed to retrieve data since %s", tstrerror(code));
blockDataDestroy(pBlock);

View File

@ -333,8 +333,8 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
}
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock));
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns + dataEncodeBufSize;
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
@ -361,7 +361,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
pStart += sizeof(SSysTableSchema);
}
int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, pShow->pMeta->numOfColumns);
if(len < 0){
mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, tstrerror(code));
code = terrno;

View File

@ -16,7 +16,8 @@
#include "tq.h"
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + blockGetEncodeSize(pBlock);
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) {
return terrno;
@ -28,7 +29,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
pRetrieve->compressed = 0;
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols);
if(actualLen < 0){
taosMemoryFree(buf);
return terrno;

View File

@ -628,6 +628,9 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void*
tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
TSDB_CHECK_NULL(tmp, code, line, END, terrno)
colDataSetNULL(tmp, i);
tmp = taosArrayGet(pDelBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
TSDB_CHECK_NULL(tmp, code, line, END, terrno)
colDataSetNULL(tmp, i);
}
if (type == 0) {

View File

@ -35,7 +35,8 @@
extern SConfig* tsCfg;
static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRetrieveTableRsp** pRsp) {
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
*pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) {
return terrno;
@ -49,8 +50,8 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(numOfCols);
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, numOfCols);
if (len < 0) {
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, numOfCols);
if(len < 0) {
taosMemoryFree(*pRsp);
return terrno;
}

View File

@ -1966,7 +1966,8 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
pBlock->info.rows = rowNum;
int32_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
int32_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
if (NULL == rsp) {
@ -1977,7 +1978,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
rsp->completed = 1;
rsp->numOfRows = htobe64((int64_t)rowNum);
int32_t len = blockEncode(pBlock, rsp->data + PAYLOAD_PREFIX_LEN, taosArrayGetSize(pBlock->pDataBlock));
int32_t len = blockEncode(pBlock, rsp->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, taosArrayGetSize(pBlock->pDataBlock));
if(len < 0) {
qError("qExplainGetRspFromCtx: blockEncode failed");
QRY_ERR_JRET(terrno);

View File

@ -45,6 +45,7 @@ typedef struct SDataDispatchHandle {
SDataBlockDescNode* pSchema;
STaosQueue* pDataBlocks;
SDataDispatchBuf nextOutput;
int32_t outPutColCounts;
int32_t status;
bool queryEnd;
uint64_t useconds;
@ -54,6 +55,65 @@ typedef struct SDataDispatchHandle {
TdThreadMutex mutex;
} SDataDispatchHandle;
static int32_t inputSafetyCheck(SDataDispatchHandle* pHandle, const SInputData* pInput) {
if(tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
return TSDB_CODE_SUCCESS;
}
if (pInput == NULL || pInput->pData == NULL || pInput->pData->info.rows <= 0) {
qError("invalid input data");
return TSDB_CODE_QRY_INVALID_INPUT;
}
SDataBlockDescNode* pSchema = pHandle->pSchema;
if (pSchema == NULL || pSchema->totalRowSize != pInput->pData->info.rowSize) {
qError("invalid schema");
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (pHandle->outPutColCounts > taosArrayGetSize(pInput->pData->pDataBlock)) {
qError("invalid column number, schema:%d, input:%zu", pHandle->outPutColCounts, taosArrayGetSize(pInput->pData->pDataBlock));
return TSDB_CODE_QRY_INVALID_INPUT;
}
SNode* pNode;
int32_t colNum = 0;
FOREACH(pNode, pHandle->pSchema->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
SColumnInfoData* pColInfoData = taosArrayGet(pInput->pData->pDataBlock, colNum);
if (pColInfoData == NULL) {
return -1;
}
if (pColInfoData->info.bytes < 0) {
qError("invalid column bytes, schema:%d, input:%d", pSlotDesc->dataType.bytes, pColInfoData->info.bytes);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (!IS_VAR_DATA_TYPE(pColInfoData->info.type) &&
TYPE_BYTES[pColInfoData->info.type] != pColInfoData->info.bytes) {
qError("invalid column bytes, schema:%d, input:%d", TYPE_BYTES[pColInfoData->info.type],
pColInfoData->info.bytes);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (pColInfoData->info.type != pSlotDesc->dataType.type) {
qError("invalid column type, schema:%d, input:%d", pSlotDesc->dataType.type, pColInfoData->info.type);
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (pColInfoData->info.bytes != pSlotDesc->dataType.bytes) {
qError("invalid column bytes, schema:%d, input:%d", pSlotDesc->dataType.bytes, pColInfoData->info.bytes);
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (IS_INVALID_TYPE(pColInfoData->info.type)) {
qError("invalid column type, type:%d", pColInfoData->info.type);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
++colNum;
}
}
return TSDB_CODE_SUCCESS;
}
// clang-format off
// data format:
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
@ -67,6 +127,12 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
int32_t numOfCols = 0;
SNode* pNode;
int32_t code = inputSafetyCheck(pHandle, pInput);
if (code) {
qError("failed to check input data, code:%d", code);
return code;
}
FOREACH(pNode, pHandle->pSchema->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
@ -84,17 +150,18 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
pBuf->useSize = sizeof(SDataCacheEntry);
{
// allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size
size_t dataEncodeBufSize = pBuf->allocSize + 8;
if ((pBuf->allocSize > tsCompressMsgSize) && (tsCompressMsgSize > 0) && pHandle->pManager->cfg.compress) {
if (pHandle->pCompressBuf == NULL) {
// allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size
pHandle->pCompressBuf = taosMemoryMalloc(pBuf->allocSize + 8);
pHandle->pCompressBuf = taosMemoryMalloc(dataEncodeBufSize);
if (NULL == pHandle->pCompressBuf) {
QRY_RET(terrno);
}
pHandle->bufSize = pBuf->allocSize + 8;
pHandle->bufSize = dataEncodeBufSize;
} else {
if (pHandle->bufSize < pBuf->allocSize + 8) {
pHandle->bufSize = pBuf->allocSize + 8;
if (pHandle->bufSize < dataEncodeBufSize) {
pHandle->bufSize = dataEncodeBufSize;
void* p = taosMemoryRealloc(pHandle->pCompressBuf, pHandle->bufSize);
if (p != NULL) {
pHandle->pCompressBuf = p;
@ -105,7 +172,7 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
}
}
int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, numOfCols);
int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, dataEncodeBufSize, numOfCols);
if(dataLen < 0) {
qError("failed to encode data block, code: %d", dataLen);
return terrno;
@ -123,7 +190,7 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
TAOS_MEMCPY(pEntry->data, pHandle->pCompressBuf, dataLen);
}
} else {
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, pBuf->allocSize, numOfCols);
if(pEntry->dataLen < 0) {
qError("failed to encode data block, code: %d", pEntry->dataLen);
return terrno;
@ -314,8 +381,60 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
return TSDB_CODE_SUCCESS;
}
static int32_t blockDescNodeCheck(SDataBlockDescNode* pInputDataBlockDesc) {
if(tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
return TSDB_CODE_SUCCESS;
}
if (pInputDataBlockDesc == NULL) {
qError("invalid schema");
return TSDB_CODE_QRY_INVALID_INPUT;
}
SNode* pNode;
int32_t realOutputRowSize = 0;
FOREACH(pNode, pInputDataBlockDesc->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
realOutputRowSize += pSlotDesc->dataType.bytes;
} else {
// Slots must be sorted, and slots with 'output' set to true must come first
break;
}
}
if (realOutputRowSize != pInputDataBlockDesc->outputRowSize) {
qError("invalid schema, realOutputRowSize:%d, outputRowSize:%d", realOutputRowSize, pInputDataBlockDesc->outputRowSize);
return TSDB_CODE_QRY_INVALID_INPUT;
}
return TSDB_CODE_SUCCESS;
}
int32_t getOutputColCounts(SDataBlockDescNode* pInputDataBlockDesc) {
if (pInputDataBlockDesc == NULL) {
qError("invalid schema");
return 0;
}
SNode* pNode;
int32_t numOfCols = 0;
FOREACH(pNode, pInputDataBlockDesc->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
++numOfCols;
} else {
// Slots must be sorted, and slots with 'output' set to true must come first
break;
}
}
return numOfCols;
}
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
int32_t code;
code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc);
if (code) {
qError("failed to check input data block desc, code:%d", code);
return code;
}
SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
if (NULL == dispatcher) {
@ -333,6 +452,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
dispatcher->pManager = pManager;
pManager = NULL;
dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
dispatcher->outPutColCounts = getOutputColCounts(dispatcher->pSchema);
dispatcher->status = DS_BUF_EMPTY;
dispatcher->queryEnd = false;
code = taosOpenQueue(&dispatcher->pDataBlocks);

View File

@ -528,7 +528,12 @@ static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock**
qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
if (*ppRes && (code == 0)) {
blockDataCheck(*ppRes, false);
code = blockDataCheck(*ppRes);
if (code) {
qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
pPost->isStarted = true;
pStbJoin->execInfo.postBlkNum++;
pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;

View File

@ -390,6 +390,7 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) {
createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
idata.info.scale = pDescNode->dataType.scale;
idata.info.precision = pDescNode->dataType.precision;
idata.info.noData = pDescNode->reserve;
code = blockDataAppendColInfo(pBlock, &idata);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -700,12 +700,12 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
pTaskInfo->paramSet = true;
code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
blockDataCheck(pRes, false);
} else {
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
blockDataCheck(pRes, false);
}
QUERY_CHECK_CODE(code, lino, _end);
code = blockDataCheck(pRes);
QUERY_CHECK_CODE(code, lino, _end);
if (pRes == NULL) {
@ -750,7 +750,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
}
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
blockDataCheck(pRes, false);
QUERY_CHECK_CODE(code, lino, _end);
code = blockDataCheck(pRes);
QUERY_CHECK_CODE(code, lino, _end);
}
@ -849,7 +850,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
}
blockDataCheck(*pRes, false);
code = blockDataCheck(*pRes);
if (code) {
pTaskInfo->code = code;
qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
}
uint64_t el = (taosGetTimestampUs() - st);

View File

@ -616,11 +616,12 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p
}
}
}
code = TSDB_CODE_SUCCESS;
code = blockDataCheck(pBlock);
QUERY_CHECK_CODE(code, lino, _err);
_err:
blockDataCheck(pBlock, true);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
colDataDestroy(p);
taosMemoryFree(p);
return code;
@ -701,7 +702,7 @@ int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResu
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
code = colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
code = colDataSetValOrCover(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
QUERY_CHECK_CODE(code, lino, _end);
}
}

View File

@ -765,7 +765,7 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p
}
}
blockDataCheck(pBlock, false);
code = blockDataCheck(pBlock);
*ppRes = pBlock;
return code;

View File

@ -65,11 +65,14 @@ static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock);
int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
blockDataCheck(*ppBlock, false);
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
if (code) {
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
}
code = blockDataCheck(*ppBlock);
if (code) {
qError("failed to check data block got from upstream, %s code:%s", __func__, tstrerror(code));
}
return code;
}
@ -526,7 +529,8 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
if ((*pResBlock) != NULL) {
pOperator->resultInfo.totalRows += (*pResBlock)->info.rows;
blockDataCheck(*pResBlock, false);
code = blockDataCheck(*pResBlock);
QUERY_CHECK_CODE(code, lino, _end);
} else {
setOperatorCompleted(pOperator);
}

View File

@ -870,15 +870,25 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
SSDataBlock* p = NULL;
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
blockDataCheck(p, false);
return (code == 0)? p:NULL;
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
if (code == TSDB_CODE_SUCCESS) {
code = blockDataCheck(p);
if (code != TSDB_CODE_SUCCESS) {
qError("blockDataCheck failed, code:%s", tstrerror(code));
}
}
return (code == 0) ? p : NULL;
}
SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) {
SSDataBlock* p = NULL;
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
blockDataCheck(p, false);
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
if (code == TSDB_CODE_SUCCESS) {
code = blockDataCheck(p);
if (code != TSDB_CODE_SUCCESS) {
qError("blockDataCheck failed, code:%s", tstrerror(code));
}
}
return (code == 0)? p:NULL;
}

View File

@ -1461,6 +1461,18 @@ static void destroyTableScanOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
static void resetClolumnReserve(SSDataBlock* pBlock, int32_t dataRequireFlag) {
if (pBlock && dataRequireFlag == FUNC_DATA_REQUIRED_NOT_LOAD) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
if (pCol) {
pCol->info.noData = true;
}
}
}
}
int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
@ -1511,6 +1523,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
resetClolumnReserve(pInfo->pResBlock, pInfo->base.dataBlockLoadFlag);
QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno);
code = prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo);
@ -2102,6 +2115,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
SColumnInfoData* pDestTableNameInxCol = taosArrayGet(pDestBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
uint64_t groupId = pSrcGp[i];
if (groupId == 0) {
@ -2139,7 +2153,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
QUERY_CHECK_CODE(code, lino, _end);
colDataSetNULL(pDestCalStartTsCol, i);
colDataSetNULL(pDestCalEndTsCol, i);
colDataSetNULL(pDestTableNameInxCol, i);
pDestBlock->info.rows++;
}
@ -2192,6 +2206,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
SColumnInfoData* pDestTableNameInxCol = taosArrayGet(pDestBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
uint64_t groupId = pSrcGp[i];
if (groupId == 0) {
@ -2220,6 +2235,8 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
code = colDataSetVal(pDestCalEndTsCol, i, (const char*)&range.win.ekey, false);
QUERY_CHECK_CODE(code, lino, _end);
colDataSetNULL(pDestTableNameInxCol, i);
pDestBlock->info.rows++;
}
@ -2274,6 +2291,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
SColumnInfoData* pDestTableNameInxCol = taosArrayGet(pDestBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
for (int32_t i = 0; i < pSrcBlock->info.rows;) {
uint64_t srcUid = srcUidData[i];
uint64_t groupId = srcGp[i];
@ -2306,6 +2324,8 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
code = colDataSetVal(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
QUERY_CHECK_CODE(code, lino, _end);
colDataSetNULL(pDestTableNameInxCol, pDestBlock->info.rows);
pDestBlock->info.rows++;
}
@ -3061,6 +3081,7 @@ static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, S
colDataSetNULL(taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX), j);
colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), j);
colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), j);
colDataSetNULL(taosArrayGet(pDst->pDataBlock, TABLE_NAME_COLUMN_INDEX), j);
j++;
}
}
@ -3657,6 +3678,9 @@ FETCH_NEXT_BLOCK:
// printDataBlock(pInfo->pCheckpointRes, "stream scan ck", GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pCheckpointRes;
return code;
} else {
qError("stream scan error, invalid block type %d, %s", pInfo->blockType, id);
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
_end:

View File

@ -334,10 +334,14 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
blockDataCheck(*ppBlock, false);
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
if (code) {
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
} else {
code = blockDataCheck(*ppBlock);
if (code) {
qError("failed to check block data, %s code:%s", __func__, tstrerror(code));
}
}
return code;
}
@ -630,7 +634,8 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
QUERY_CHECK_CODE(code, lino, _end);
if (block != NULL) {
blockDataCheck(block, false);
code = blockDataCheck(block);
QUERY_CHECK_CODE(code, lino, _end);
if (block->info.id.groupId == grpSortOpInfo->currGroupId) {
grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
*ppBlock = block;

View File

@ -1385,9 +1385,12 @@ static int32_t translateRepeat(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
uint8_t type = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type;
int32_t orgLen = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->bytes;
int32_t count = TMAX((int32_t)((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i, 1);
int32_t resLen = orgLen * count;
int32_t resLen;
if (nodeType(nodesListGetNode(pFunc->pParameterList, 1)) == QUERY_NODE_VALUE) {
resLen = orgLen * TMAX((int32_t)((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i, 1);
} else {
resLen = TSDB_MAX_BINARY_LEN;
}
pFunc->node.resType = (SDataType){.bytes = resLen, .type = type};
return TSDB_CODE_SUCCESS;
}
@ -1535,14 +1538,16 @@ static int32_t translateToJson(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
static int32_t translateOutGeom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_GEOMETRY].bytes, .type = TSDB_DATA_TYPE_GEOMETRY};
SDataType dt = *getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0));
pFunc->node.resType = (SDataType){.bytes = dt.bytes, .type = TSDB_DATA_TYPE_GEOMETRY};
return TSDB_CODE_SUCCESS;
}
static int32_t translateInGeomOutStr(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_VARCHAR].bytes, .type = TSDB_DATA_TYPE_VARCHAR};
SDataType dt = *getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0));
pFunc->node.resType = (SDataType){.bytes = dt.bytes, .type = TSDB_DATA_TYPE_VARCHAR};
return TSDB_CODE_SUCCESS;
}
@ -1613,7 +1618,7 @@ static int32_t translateOutVarchar(SFunctionNode* pFunc, char* pErrBuf, int32_t
break;
case FUNCTION_TYPE_BLOCK_DIST:
case FUNCTION_TYPE_BLOCK_DIST_INFO:
bytes = 128;
bytes = sizeof(STableBlockDistInfo);
break;
case FUNCTION_TYPE_TO_CHAR:
bytes = 4096;
@ -1655,7 +1660,7 @@ static int32_t translateOutVarchar(SFunctionNode* pFunc, char* pErrBuf, int32_t
bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
break;
case FUNCTION_TYPE_TIMEZONE:
bytes = TD_TIMEZONE_LEN;
bytes = timeZoneStrLen();
break;
case FUNCTION_TYPE_IRATE_PARTIAL:
bytes = getIrateInfoSize((pFunc->hasPk) ? pFunc->pkBytes : 0) + VARSTR_HEADER_SIZE;

View File

@ -412,6 +412,27 @@ int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNo
return code;
}
int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFunc, SNodeList* pParameterList, SFunctionNode** ppFunc) {
int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)ppFunc);
if (NULL == *ppFunc) {
return code;
}
(*ppFunc)->hasPk = pSrcFunc->hasPk;
(*ppFunc)->pkBytes = pSrcFunc->pkBytes;
(void)snprintf((*ppFunc)->functionName, sizeof((*ppFunc)->functionName), "%s", pName);
(*ppFunc)->pParameterList = pParameterList;
code = getFuncInfo((*ppFunc));
if (TSDB_CODE_SUCCESS != code) {
(*ppFunc)->pParameterList = NULL;
nodesDestroyNode((SNode*)*ppFunc);
*ppFunc = NULL;
return code;
}
return code;
}
static int32_t createColumnByFunc(const SFunctionNode* pFunc, SColumnNode** ppCol) {
int32_t code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)ppCol);
if (NULL == *ppCol) {
@ -438,7 +459,8 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod
if (NULL == pParameterList) {
return code;
}
code = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pPartialFunc, pParameterList,pPartialFunc );
code =
createFunctionWithSrcFunc(funcMgtBuiltins[pSrcFunc->funcId].pPartialFunc, pSrcFunc, pParameterList, pPartialFunc);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pParameterList);
return code;
@ -452,8 +474,6 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod
return TSDB_CODE_FAILED;
}
tstrncpy((*pPartialFunc)->node.aliasName, name, TSDB_COL_NAME_LEN);
(*pPartialFunc)->hasPk = pSrcFunc->hasPk;
(*pPartialFunc)->pkBytes = pSrcFunc->pkBytes;
return TSDB_CODE_SUCCESS;
}
@ -479,9 +499,9 @@ static int32_t createMidFunction(const SFunctionNode* pSrcFunc, const SFunctionN
int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pParameterList);
if (TSDB_CODE_SUCCESS == code) {
if(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc != NULL){
code = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc, pParameterList, &pFunc);
code = createFunctionWithSrcFunc(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc, pSrcFunc, pParameterList, &pFunc);
}else{
code = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList, &pFunc);
code = createFunctionWithSrcFunc(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pSrcFunc, pParameterList, &pFunc);
}
}
if (TSDB_CODE_SUCCESS == code) {
@ -493,8 +513,6 @@ static int32_t createMidFunction(const SFunctionNode* pSrcFunc, const SFunctionN
} else {
nodesDestroyList(pParameterList);
}
(*pMidFunc)->hasPk = pPartialFunc->hasPk;
(*pMidFunc)->pkBytes = pPartialFunc->pkBytes;
return code;
}
@ -505,7 +523,7 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pParameterList);
if (TSDB_CODE_SUCCESS == code) {
code = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList, &pFunc);
code = createFunctionWithSrcFunc(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pSrcFunc, pParameterList, &pFunc);
}
if (TSDB_CODE_SUCCESS == code) {
pFunc->hasOriginalFunc = true;
@ -522,8 +540,6 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
} else {
nodesDestroyList(pParameterList);
}
(*pMergeFunc)->hasPk = pPartialFunc->hasPk;
(*pMergeFunc)->pkBytes = pPartialFunc->pkBytes;
return code;
}

View File

@ -332,7 +332,7 @@ static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList,
}
static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, false, false);
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, false, true);
}
static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlockDescNode* pDataBlockDesc) {
@ -354,7 +354,7 @@ static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlo
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList,
SDataBlockDescNode* pDataBlockDesc) {
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, false, false);
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, false, true);
}
static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {

View File

@ -2414,7 +2414,7 @@ int32_t toCharFunction(SScalarParam* pInput, int32_t inputNum, SScalarParam* pOu
char *ts = colDataGetData(pInput[0].columnData, i);
char *formatData = colDataGetData(pInput[1].columnData, pInput[1].numOfRows > 1 ? i : 0);
len = TMIN(TS_FORMAT_MAX_LEN - 1, varDataLen(formatData));
len = TMIN(TS_FORMAT_MAX_LEN - VARSTR_HEADER_SIZE, varDataLen(formatData));
if (pInput[1].numOfRows > 1 || i == 0) {
(void)strncpy(format, varDataVal(formatData), len);
format[len] = '\0';
@ -2663,6 +2663,10 @@ int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOut
return TSDB_CODE_SUCCESS;
}
int32_t timeZoneStrLen() {
return sizeof(VarDataLenT) + strlen(tsTimezoneStr);
}
int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
char output[TD_TIMEZONE_LEN + VARSTR_HEADER_SIZE] = {0};
(void)memcpy(varDataVal(output), tsTimezoneStr, TD_TIMEZONE_LEN);

View File

@ -145,7 +145,8 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req) {
SRetrieveTableRsp* pRetrieve = NULL;
int32_t len = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
int32_t len = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN;
pRetrieve = taosMemoryCalloc(1, len);
if (pRetrieve == NULL) return terrno;
@ -162,7 +163,7 @@ static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBl
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, dataEncodeSize, numOfCols);
if (actualLen < 0) {
taosMemoryFree(pRetrieve);
return terrno;
@ -1247,7 +1248,8 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
}
int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN;
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) {
return terrno;
@ -1269,7 +1271,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, dataEncodeSize, numOfCols);
if (actualLen < 0) {
taosMemoryFree(buf);
return terrno;

View File

@ -108,13 +108,13 @@ taos> select repeat(nch1, id) from ts_4893.meters where id > 0 order by ts limit
novelnovelnovelnovelnovel |
taos> select repeat(var1, id) from ts_4893.meters where id > 0 order by ts limit 5
repeat(var1, id) |
===================
person |
novelnovel |
plateplateplate |
一二三四五六... |
updateupdateu... |
repeat(var1, id) |
=================================
person |
novelnovel |
plateplateplate |
一二三四五六七八九十一二三... |
updateupdateupdateupdateupdate |
taos> select repeat('nch1', id) from ts_4893.meters where id > 0 order by ts limit 5
repeat('nch1', id) |
@ -229,32 +229,32 @@ taos> select repeat(var1, 3) from ts_4893.meters order by ts limit 10
plateplateplate |
taos> select repeat(name, groupid) from ts_4893.d0 order by ts limit 10
repeat(name, groupid) |
========================
lili |
x |
lili |
x |
lili |
taos |
haha |
taos |
taos |
haha |
repeat(name, groupid) |
=================================
lili |
x |
lili |
x |
lili |
taos |
haha |
taos |
taos |
haha |
taos> select repeat(name, groupid) from ts_4893.meters order by ts limit 10
repeat(name, groupid) |
========================
lili |
x |
lili |
x |
lili |
taos |
haha |
taos |
taos |
haha |
repeat(name, groupid) |
=================================
lili |
x |
lili |
x |
lili |
taos |
haha |
taos |
taos |
haha |
taos> select repeat(nch1, groupid) from ts_4893.d0 order by ts limit 10
repeat(nch1, groupid) |
@ -355,9 +355,9 @@ taos> select repeat('你好', 2)
你好你好 |
taos> select repeat('abc', length('abc'))
repeat('abc', length('abc')) |
===============================
abcabcabc |
repeat('abc', length('abc')) |
=================================
abcabcabc |
taos> select repeat(concat('A', 'B', 'C'), 3)
repeat(concat('A', 'B', 'C'), 3) |

Can't render this file because it has a wrong number of fields in line 4.

View File

@ -48,6 +48,7 @@ class TDSimClient:
"telemetryReporting": "0",
"tqDebugflag": "135",
"stDebugflag":"135",
"safetyCheckLevel":"2"
}
def getLogDir(self):
@ -149,7 +150,8 @@ class TDDnode:
"statusInterval": "1",
"enableQueryHb": "1",
"supportVnodes": "1024",
"telemetryReporting": "0"
"telemetryReporting": "0",
"safetyCheckLevel":"2"
}
def init(self, path, remoteIP = ""):