Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30837

This commit is contained in:
54liuyao 2024-11-06 11:13:15 +08:00
commit 719a3faa12
62 changed files with 973 additions and 323 deletions

View File

@ -8,11 +8,11 @@ sidebar_label: "ARIMA"
## 功能概述
ARIMA 即自回归移动平均模型Autoregressive Integrated Moving Average, ARIMA也记作 ARIMA(p,d,q),是统计模型中最常见的一种用来进行时间序列预测的模型。
ARIMA 模型是一种自回归模型只需要自变量即可预测后续的值。ARIMA 模型要求时序数据**平稳**,或经过差分处理后平稳,如果是不平稳的数据,**无法**获得正确的结果。
ARIMA 模型是一种自回归模型只需要自变量即可预测后续的值。ARIMA 模型要求时间序列**平稳**,或经过差分处理后平稳,如果是不平稳的数据,**无法**获得正确的结果。
>平稳的时间序列:其性质不随观测时间的变化而变化。具有趋势或季节性的时间序列不是平稳时间序列——趋势和季节性使得时间序列在不同时段呈现不同性质。
以下参数可以动态输入控制预测过程中生成 合适的 ARIMA 模型。
以下参数可以动态输入控制预测过程中生成合适的 ARIMA 模型。
- p= 自回归模型阶数
- d= 差分阶数
@ -21,13 +21,13 @@ ARIMA 模型是一种自回归模型,只需要自变量即可预测后续的
### 参数
分析平台中使用自动化的 ARIMA 模型进行计算,因此每次计算的时候会根据输入的数据自动拟合最合适的模型,然后根据该模型进行预测输出结果。
|参数名称|说明|必填项|
|---|---|---|
|period|输入时间序列每个周期包含的数据点个数如果不设置该参数或该参数设置为 0将使用非季节性/周期性的 ARIMA 模型预测|选填|
|start_p|自回归模型阶数的起始值0 开始的整数,不推荐大于 10 |选填|
|max_p|自回归模型阶数的结束值0 开始的整数,不推荐大于 10 |选填|
|start_q|移动平均模型阶数的起始值0 开始的整数,不推荐大于 10 |选填|
|max_q|移动平均模型阶数的结束值0 开始的整数,不推荐大于 10 |选填|
|参数|说明|必填项|
|---|---|-----|
|period|输入时间序列每个周期包含的数据点个数如果不设置该参数或该参数设置为 0将使用非季节性/周期性的 ARIMA 模型预测|选填|
|start_p|自回归模型阶数的起始值0 开始的整数,不推荐大于 10|选填|
|max_p|自回归模型阶数的结束值0 开始的整数,不推荐大于 10|选填|
|start_q|移动平均模型阶数的起始值0 开始的整数,不推荐大于 10|选填|
|max_q|移动平均模型阶数的结束值0 开始的整数,不推荐大于 10|选填|
|d|差分阶数|选填|
`start_p`、`max_p` `start_q` `max_q` 四个参数约束了模型在多大的范围内去搜寻合适的最优解。相同输入数据的条件下,参数范围越大,消耗的资源越多,系统响应的时间越长。
@ -40,11 +40,11 @@ FORECAST(i32, "algo=arima,alpha=95,period=10,start_p=1,max_p=5,start_q=1,max_q=5
```json5
{
"rows": fc_rows, // 预测结果的行数
"rows": fc_rows, // 返回结果的行数
"period": period, // 返回结果的周期性,同输入
"alpha": alpha, // 返回结果的置信区间,同输入
"algo": "arima", // 返回结果使用的算法
"mse":mse, // 拟合输入时序数据时候生成模型的最小均方误差(MSE)
"mse": mse, // 拟合输入时间序列时候生成模型的最小均方误差(MSE)
"res": res // 列模式的结果
}
```

View File

@ -8,15 +8,15 @@ sidebar_label: "HoltWinters"
## 功能概述
HoltWinters 模型又称为多次指数平滑模型EMA。适用于含有线性趋势和周期波动的非平稳序列利用指数平滑法让模型参数不断适应非平稳序列的变化并对未来趋势进行**短期**预测。
HoltWinters 有两种不同的季节性组成部分,当季节变化在该时间序列中大致保持不变时,通常选择**加法模型**;而当季节变化与时间序列的水平成比例变化时,通常选择**乘法模型**。
该模型对于返回数据也不提供计算的置信区间范围结果。在 95% 置信区间的上下界结果与预测结果相同。
该模型对于返回数据不提供计算的置信区间范围结果,在 95% 置信区间的上下界结果与预测结果相同。
### 参数
分析平台中使用自动化的 ARIMA 模型进行计算,因此每次计算的时候会根据输入的数据自动拟合最合适的模型,然后根据该模型进行预测输出结果。
|参数名称|说明|必填项|
分析平台中使用自动化的 HoltWinters 模型进行计算,因此每次计算的时候会根据输入的数据自动拟合最合适的模型,然后根据该模型进行预测输出结果。
|参数|说明|必填项|
|---|---|---|
|period|输入时间序列每个周期包含的数据点个数。如果不设置该参数或该参数设置为 0 将使用一次(简单)指数平滑方式进行数据拟合,并据此进行未来数据的预测|选填|
|period|输入时间序列每个周期包含的数据点个数。如果不设置该参数或该参数设置为 0将使用一次简单指数平滑方式进行数据拟合并据此进行未来数据的预测|选填|
|trend|趋势模型使用加法模型还是乘法模型|选填|
|seasonal|季节性采用加法模型还是乘法模型|选填|
@ -30,11 +30,11 @@ FORECAST(i32, "algo=holtwinters,period=10,trend=mul,seasonal=mul")
```json5
{
"rows": rows, // 结果的行数
"period": period, // 返回结果的周期性, 该结果与输入的周期性相同,如果没有周期性,该值为 0
"rows": rows, // 返回结果的行数
"period": period, // 返回结果的周期性,该结果与输入的周期性相同,如果没有周期性,该值为 0
"algo": 'holtwinters' // 返回结果使用的计算模型
"mse":mse, // 最小均方误差minmum square error
"res": res // 具体的结果,按照列形式返回的结果。一般意义上包含了 两列 [timestamp][fc_results]。
"mse": mse, // 最小均方误差minmum square error
"res": res // 具体的结果,按照列形式返回的结果。一般意义上包含了两列 [timestamp][fc_results]。
}
```

View File

@ -3,7 +3,7 @@ title: "Anomaly-detection"
sidebar_label: "Anomaly-detection"
---
本节讲述 异常检测算法模型的使用方法。
本节讲述异常检测算法模型的使用方法。
## 概述
分析平台提供了 6 种异常检查模型6 种异常检查模型分为 3 个类别,分别属于基于统计的异常检测模型、基于数据密度的检测模型、基于深度学习的异常检测模型。在不指定异常检测使用的方法的情况下,默认调用 iqr 的方法进行计算。
@ -11,20 +11,20 @@ sidebar_label: "Anomaly-detection"
### 统计学异常检测方法
- k-sigma<sup>[1]</sup>: 即 ***689599.7 rule*** 。***k***值默认为 3 即序列均值的 3 倍标准差范围为边界超过边界的是异常值。KSigma 要求数据整体上服从正态分布,如果一个点偏离均值 K 倍标准差,则该点被视为异常点.
- k-sigma<sup>[1]</sup>: 即 ***689599.7 rule*** 。***k***值默认为 3即序列均值的 3 倍标准差范围为边界超过边界的是异常值。KSigma 要求数据整体上服从正态分布,如果一个点偏离均值 K 倍标准差,则该点被视为异常点.
|参数名称|说明|是否必选|默认值|
|参数|说明|是否必选|默认值|
|---|---|---|---|
|k|标准差倍数|选填|3|
- IQR<sup>[2]</sup>:四分位距 (Interquartile range, IQR) 是一种衡量变异性的方法. 四分位数将一个按等级排序的数据集划分为四个相等的部分。即 Q1第 1 个四分位数、Q2第 2 个四分位数)和 Q3第 3 个四分位数。IQR 定义为 Q3Q1位于 Q3+1.5 。无输入参数。
- IQR<sup>[2]</sup>:四分位距 (Interquartile range, IQR) 是一种衡量变异性的方法. 四分位数将一个按等级排序的数据集划分为四个相等的部分。即 Q1第 1 个四分位数、Q2第 2 个四分位数)和 Q3第 3 个四分位数。IQR 定义为 Q3Q1位于 Q3+1.5。无输入参数。
- Grubbs<sup>[3]</sup>: 又称为 Grubbs' test即最大标准残差测试。Grubbs 通常用作检验最大值、最小值偏离均值的程度是否为异常,该单变量数据集遵循近似标准正态分布。非正态分布数据集不能使用该方法。无输入参数。
- SHESD<sup>[4]</sup> 带有季节性的 ESD 检测算法。ESD 可以检测时间序列数据的多异常点。需要指定异常点比例的上界***k***,最差的情况是至多 49.9%。数据集的异常比例一般不超过 5%
|参数名称|说明|是否必选|默认值|
|参数|说明|是否必选|默认值|
|---|---|---|---|
|k|异常点在输入数据集中占比,范围是$`1\le K \le 49.9`$ |选填|5|

View File

@ -3,7 +3,7 @@ title: "addins"
sidebar_label: "addins"
---
本节说明如何将自己开发的预测算法和异常检测算法整合到 TDengine 分析平台, 并能够通过 SQL 语句进行调用。
本节说明如何将自己开发的预测算法和异常检测算法整合到 TDengine 分析平台,并能够通过 SQL 语句进行调用。
## 目录结构
@ -11,14 +11,14 @@ sidebar_label: "addins"
|目录|说明|
|---|---|
|taos|Python 源代码目录,其下包含了算法具体保存目录 algo放置杂项目录 misc 单元测试和集成测试目录 test。 algo 目录下 ad 放置异常检测算法代码, fc 放置预测算法代码|
|taos|Python 源代码目录,其下包含了算法具体保存目录 algo放置杂项目录 misc单元测试和集成测试目录 test。 algo 目录下 ad 放置异常检测算法代码fc 放置预测算法代码|
|script|是安装脚本和发布脚本放置目录|
|model|放置针对数据集完成的训练模型|
|cfg|配置文件目录|
## 约定与限制
定义异常检测算法的 Python 代码文件 需放在 /taos/algo/ad 目录中,预测算法 Python 代码文件需要放在 /taos/algo/fc 目录中,以确保系统启动的时候能够正常加载对应目录下的 Python 文件。
定义异常检测算法的 Python 代码文件需放在 /taos/algo/ad 目录中,预测算法 Python 代码文件需要放在 /taos/algo/fc 目录中,以确保系统启动的时候能够正常加载对应目录下的 Python 文件。
### 类命名规范
@ -27,33 +27,35 @@ sidebar_label: "addins"
### 类继承约定
异常检测算法需要从 `AbstractAnomalyDetectionService` 继承,并实现其核心抽象方法 `execute`.
预测算法需要从 `AbstractForecastService` 继承,同样需要实现其核心抽象方法 `execute`
- 异常检测算法需要从 `AbstractAnomalyDetectionService` 继承,并实现其核心抽象方法 `execute`
- 预测算法需要从 `AbstractForecastService` 继承,同样需要实现其核心抽象方法 `execute`
### 类属性初始化
每个算法实现的类需要静态初始化两个类属性,分别是
每个算法实现的类需要静态初始化两个类属性,分别是
`name`: 的触发调用关键词,全小写英文字母。
`desc`算法的描述信息
- `name`:触发调用的关键词,全小写英文字母
- `desc`:算法的描述信息
### 核心方法输入与输出约定
`execute` 是算法处理的核心方法。调用该方法的时候,`self.list` 已经设置好输入数组。
异常检测输出结果
`execute` 的返回值是长度与 `self.list` 相同的数组,数组位置为 -1 的即为异常值点。例如:输入数组是 [2, 2, 2, 2, 100] 如果 100 是异常点,那么返回值是 [1, 1, 1, 1, -1]。
预测输出结果
对于预测算法,`AbstractForecastService` 的对象属性说明如下:
|属性名称|说明|默认值|
|---|---|---|
|period|输入时序数据的周期性,多少个数据点表示一个完整的周期。如果没有周期性,那么设置为 0 即可| 0|
|start_ts|预测数据的开始时间| 0|
|period|输入时间序列的周期性,多少个数据点表示一个完整的周期。如果没有周期性,那么设置为 0 即可| 0|
|start_ts|预测结果的开始时间| 0|
|time_step|预测结果的两个数据点之间时间间隔|0 |
|fc_rows|预测结果数量| 0 |
|return_conf|返回结果中是否包含执行区间范围,如果算法计算结果不包含置信区间,那么上界和下界与自身相同| 1|
|conf|执行区间分位数 0.05|
|fc_rows|预测结果数量| 0 |
|return_conf|预测结果中是否包含置信区间范围,如果不包含置信区间,那么上界和下界与自身相同| 1|
|conf|置信区间分位数 0.05|
预测返回结果如下:

View File

@ -156,7 +156,7 @@ SHOW ANODES;
SHOW ANODES FULL;
```
#### 强制刷新 TDengine 集群中分析算法缓存
#### 强制刷新集群中分析算法缓存
```SQL
UPDATE ANODE {node_id}
UPDATE ALL ANODES

View File

@ -163,3 +163,15 @@ s3BucketName td-test
- 认为全部 S3 服务均指向同一数据源,对各个 S3 服务操作完全等价
- 在某一 S3 服务上操作失败后会切换至其他服务,全部服务都失败后将返回最后产生的错误码
- 最大支持的 S3 服务配置数为 10
### 不依赖 Flexify 服务
用户界面同 S3不同的地方在于下面三个参数的配置
| # | 参数 | 示例值 | 描述 |
| :--- | :----------- | :--------------------------------------- | :----------------------------------------------------------- |
| 1 | s3EndPoint | https://fd2d01c73.blob.core.windows.net | Blob URL |
| 2 | s3AccessKey | fd2d01c73:veUy/iRBeWaI2YAerl+AStw6PPqg== | 冒号分隔的用户 accountId:accountKey |
| 3 | s3BucketName | test-container | Container name |
其中 fd2d01c73 是账户 ID微软 Blob 存储服务只支持 Https 协议,不支持 Http。

View File

@ -189,7 +189,12 @@ static FORCE_INLINE void colDataSetDouble(SColumnInfoData* pColumnInfoData, uint
int32_t getJsonValueLen(const char* data);
// For the VAR_DATA_TYPE type, new data is inserted strictly according to the position of SVarColAttr.length.
// If the same row is inserted repeatedly, data holes will result.
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
// For the VAR_DATA_TYPE type, if a row already has data before inserting it (judged by offset != -1),
// it will be inserted at the original position and the old data will be overwritten.
int32_t colDataSetValOrCover(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData);
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows,
bool trimValue);
@ -233,7 +238,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
* @brief find how many rows already in order start from first row
*/
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo);
void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk);
int32_t blockDataCheck(const SSDataBlock* pDataBlock);
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
@ -266,7 +271,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId)
int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData);
int32_t blockGetEncodeSize(const SSDataBlock* pBlock);
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataLen, int32_t numOfCols);
int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos);
// for debug

View File

@ -142,6 +142,7 @@ extern bool tsMonitorForceV2;
// audit
extern bool tsEnableAudit;
extern bool tsEnableAuditCreateTable;
extern bool tsEnableAuditDelete;
extern int32_t tsAuditInterval;
// telem
@ -153,6 +154,12 @@ extern bool tsEnableCrashReport;
extern char *tsTelemUri;
extern char *tsClientCrashReportUri;
extern char *tsSvrCrashReportUri;
extern int8_t tsSafetyCheckLevel;
enum {
TSDB_SAFETY_CHECK_LEVELL_NEVER = 0,
TSDB_SAFETY_CHECK_LEVELL_NORMAL = 1,
TSDB_SAFETY_CHECK_LEVELL_BYROW = 2,
};
// query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing

View File

@ -1024,6 +1024,7 @@ typedef struct {
char sDetailVer[128];
int64_t whiteListVer;
SMonitorParas monitorParas;
int8_t enableAuditDelete;
} SConnectRsp;
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
@ -1217,6 +1218,7 @@ typedef struct {
int32_t bytes;
int8_t type;
uint8_t pk;
bool noData;
} SColumnInfo;
typedef struct STimeWindow {
@ -1828,6 +1830,17 @@ int32_t tSerializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq);
int32_t tDeserializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq);
void tFreeSStatisReq(SStatisReq* pReq);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
char table[TSDB_TABLE_NAME_LEN];
char operation[AUDIT_OPERATION_LEN];
int32_t sqlLen;
char* pSql;
} SAuditReq;
int32_t tSerializeSAuditReq(void* buf, int32_t bufLen, SAuditReq* pReq);
int32_t tDeserializeSAuditReq(void* buf, int32_t bufLen, SAuditReq* pReq);
void tFreeSAuditReq(SAuditReq* pReq);
typedef struct {
int32_t dnodeId;
int64_t clusterId;
@ -3418,6 +3431,7 @@ typedef struct {
int32_t svrTimestamp;
SArray* rsps; // SArray<SClientHbRsp>
SMonitorParas monitorParas;
int8_t enableAuditDelete;
} SClientHbBatchRsp;
static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { return taosIntHash_64(key, keyLen); }

View File

@ -259,6 +259,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_DROP_ORPHANTASKS, "stream-drop-orphan-tasks", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_TASK_RESET, "stream-reset-tasks", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUDIT, "audit", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8

View File

@ -298,6 +298,7 @@ typedef struct {
#define IS_VALID_UINT64(_t) ((_t) >= 0 && (_t) <= UINT64_MAX)
#define IS_VALID_FLOAT(_t) ((_t) >= -FLT_MAX && (_t) <= FLT_MAX)
#define IS_VALID_DOUBLE(_t) ((_t) >= -DBL_MAX && (_t) <= DBL_MAX)
#define IS_INVALID_TYPE(_t) ((_t) < TSDB_DATA_TYPE_NULL || (_t) >= TSDB_DATA_TYPE_MAX)
#define IS_CONVERT_AS_SIGNED(_t) \
(IS_SIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))

View File

@ -29,7 +29,6 @@ extern "C" {
#endif
#define AUDIT_DETAIL_MAX 65472
#define AUDIT_OPERATION_LEN 20
typedef struct {
const char *server;

View File

@ -292,6 +292,7 @@ bool fmIsElapsedFunc(int32_t funcId);
void getLastCacheDataType(SDataType* pType, int32_t pkBytes);
int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNode** pFunc);
int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFunc, SNodeList* pParameterList, SFunctionNode** pFunc);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMidFunc, SFunctionNode** pMergeFunc);

View File

@ -24,6 +24,7 @@ extern "C" {
#include "thash.h"
#include "query.h"
#include "tqueue.h"
#include "clientInt.h"
typedef enum {
SQL_RESULT_SUCCESS = 0,
@ -81,6 +82,8 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name,
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values);
const char* monitorResultStr(SQL_RESULT_CODE code);
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data);
void clientOperateReport(SRequestObj* pRequest);
#ifdef __cplusplus
}
#endif

View File

@ -105,6 +105,7 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t timeZoneStrLen();
int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t weekdayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t dayofweekFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);

View File

@ -653,6 +653,8 @@ enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 };
#define MONITOR_TAG_VALUE_LEN 300
#define MONITOR_METRIC_NAME_LEN 100
#define AUDIT_OPERATION_LEN 20
typedef enum {
ANAL_ALGO_TYPE_ANOMALY_DETECT = 0,
ANAL_ALGO_TYPE_FORECAST = 1,

View File

@ -108,6 +108,10 @@ typedef struct SQueryExecMetric {
int64_t execCostUs;
} SQueryExecMetric;
typedef struct {
SMonitorParas monitorParas;
int8_t enableAuditDelete;
} SAppInstServerCFG;
struct SAppInstInfo {
int64_t numOfConns;
SCorEpSet mgmtEp;
@ -121,7 +125,7 @@ struct SAppInstInfo {
void* pTransporter;
SAppHbMgr* pAppHbMgr;
char* instKey;
SMonitorParas monitorParas;
SAppInstServerCFG serverCfg;
};
typedef struct SAppInfo {
@ -297,8 +301,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
bool convertUcs4);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4);
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq);

View File

@ -166,11 +166,11 @@ static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)));
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(
json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)));
if (pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen) {
char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen];
pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0';
if (pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) {
char tmp = pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen];
pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0';
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = tmp;
pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = tmp;
} else {
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
}
@ -284,7 +284,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
}
}
if (pTscObj->pAppInfo->monitorParas.tsEnableMonitor) {
if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
@ -294,15 +294,15 @@ static void deregisterRequest(SRequestObj *pRequest) {
}
}
if ((duration >= pTscObj->pAppInfo->monitorParas.tsSlowLogThreshold * 1000000UL ||
duration >= pTscObj->pAppInfo->monitorParas.tsSlowLogThresholdTest * 1000000UL) &&
checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->monitorParas.tsSlowLogExceptDb)) {
if ((duration >= pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThreshold * 1000000UL ||
duration >= pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThresholdTest * 1000000UL) &&
checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) {
(void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
if (pTscObj->pAppInfo->monitorParas.tsSlowLogScope & reqType) {
if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) {
taosPrintSlowLog("PID:%d, Conn:%u,QID:0x%" PRIx64 ", Start:%" PRId64 " us, Duration:%" PRId64 "us, SQL:%s",
taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
pRequest->sqlstr);
if (pTscObj->pAppInfo->monitorParas.tsEnableMonitor) {
if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration);
if (TSDB_CODE_SUCCESS != generateWriteSlowLog(pTscObj, pRequest, reqType, duration)) {
tscError("failed to generate write slow log");
@ -689,7 +689,7 @@ void doDestroyRequest(void *p) {
int32_t code = taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to remove request from hash, code:%s", tstrerror(code));
tscWarn("failed to remove request from hash, code:%s", tstrerror(code));
}
schedulerFreeJob(&pRequest->body.queryJob, 0);

View File

@ -605,7 +605,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
return code;
}
pInst->monitorParas = pRsp.monitorParas;
pInst->serverCfg.monitorParas = pRsp.monitorParas;
pInst->serverCfg.enableAuditDelete = pRsp.enableAuditDelete;
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", pInst->clusterId,
pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);

View File

@ -2084,12 +2084,12 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
return TSDB_CODE_SUCCESS;
}
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength) {
int32_t idx = -1;
iconv_t conv = taosAcquireConv(&idx, C2M);
if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR;
for (int32_t i = 0; i < numOfCols; ++i) {
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
int32_t type = pResultInfo->fields[i].type;
int32_t bytes = pResultInfo->fields[i].bytes;
@ -2103,7 +2103,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
pResultInfo->convertBuf[i] = p;
SResultColumn* pCol = &pResultInfo->pCol[i];
for (int32_t j = 0; j < numOfRows; ++j) {
for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
if (pCol->offset[j] != -1) {
char* pStart = pCol->offset[j] + pCol->pData;
@ -2136,10 +2136,13 @@ int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
numOfCols * (sizeof(int8_t) + sizeof(int32_t));
}
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) {
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
char* p = (char*)pResultInfo->pData;
int32_t blockVersion = *(int32_t*)p;
int32_t numOfRows = pResultInfo->numOfRows;
int32_t numOfCols = pResultInfo->numOfCols;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
// length |
int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
@ -2198,10 +2201,16 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i
}
pStart += colLen;
}
// Ensure the complete structure of the block, including the blankfill field,
// even though it is not used on the client side.
len += sizeof(bool);
return len;
}
static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) {
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
int32_t numOfRows = pResultInfo->numOfRows;
int32_t numOfCols = pResultInfo->numOfCols;
bool needConvert = false;
for (int32_t i = 0; i < numOfCols; ++i) {
if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
@ -2218,7 +2227,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
char* p = (char*)pResultInfo->pData;
int32_t blockVersion = *(int32_t*)p;
int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows);
int32_t dataLen = estimateJsonLen(pResultInfo);
if (dataLen <= 0) {
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
@ -2341,27 +2350,36 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
pStart1 += colLen1;
}
// Ensure the complete structure of the block, including the blankfill field,
// even though it is not used on the client side.
// (void)memcpy(pStart1, pStart, sizeof(bool));
totalLen += sizeof(bool);
*(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
pResultInfo->pData = pResultInfo->convertJson;
return TSDB_CODE_SUCCESS;
}
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
bool convertUcs4) {
if (numOfCols <= 0 || pFields == NULL || pResultInfo == NULL) {
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4) {
if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
tscError("setResultDataPtr paras error");
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (numOfRows == 0) {
if (pResultInfo->numOfRows == 0) {
return TSDB_CODE_SUCCESS;
}
if (pResultInfo->pData == NULL) {
tscError("setResultDataPtr error: pData is NULL");
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
int32_t code = doPrepareResPtr(pResultInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doConvertJson(pResultInfo, numOfCols, numOfRows);
code = doConvertJson(pResultInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -2381,9 +2399,9 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
int32_t cols = *(int32_t*)p;
p += sizeof(int32_t);
if (rows != numOfRows || cols != numOfCols) {
tscError("setResultDataPtr paras error:rows;%d numOfRows:%d cols:%d numOfCols:%d", rows, numOfRows, cols,
numOfCols);
if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows, pResultInfo->numOfRows, cols,
pResultInfo->numOfCols);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
@ -2394,7 +2412,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
p += sizeof(uint64_t);
// check fields
for (int32_t i = 0; i < numOfCols; ++i) {
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
int8_t type = *(int8_t*)p;
p += sizeof(int8_t);
@ -2403,10 +2421,14 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
}
int32_t* colLength = (int32_t*)p;
p += sizeof(int32_t) * numOfCols;
p += sizeof(int32_t) * pResultInfo->numOfCols;
char* pStart = p;
for (int32_t i = 0; i < numOfCols; ++i) {
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
if ((pStart - pResultInfo->pData) >= dataLen) {
tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (blockVersion == BLOCK_VERSION_1) {
colLength[i] = htonl(colLength[i]);
}
@ -2414,10 +2436,13 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
tscError("invalid type %d", pResultInfo->fields[i].type);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
pResultInfo->pCol[i].offset = (int32_t*)pStart;
pStart += numOfRows * sizeof(int32_t);
pStart += pResultInfo->numOfRows * sizeof(int32_t);
} else {
pResultInfo->pCol[i].nullbitmap = pStart;
pStart += BitmapLen(pResultInfo->numOfRows);
@ -2430,11 +2455,17 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
pStart += colLength[i];
}
p = pStart;
// bool blankFill = *(bool*)p;
p += sizeof(bool);
int32_t offset = p - pResultInfo->pData;
if (offset > dataLen) {
tscError("invalid offset %d, dataLen %d", offset, dataLen);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (convertUcs4) {
code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
code = doConvertUCS4(pResultInfo, colLength);
}
return code;
@ -2547,7 +2578,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
pResultInfo->totalRows += pResultInfo->numOfRows;
int32_t code =
setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, convertUcs4);
setResultDataPtr(pResultInfo, convertUcs4);
return code;
}
@ -2842,6 +2873,7 @@ void syncQueryFn(void* param, void* res, int32_t code) {
if (pParam->pRequest) {
pParam->pRequest->code = code;
clientOperateReport(pParam->pRequest);
}
if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {

View File

@ -2,8 +2,6 @@
#include "cJSON.h"
#include "clientInt.h"
#include "clientLog.h"
#include "os.h"
#include "tglobal.h"
#include "tmisce.h"
#include "tqueue.h"
#include "ttime.h"
@ -19,6 +17,7 @@ STaosQueue* monitorQueue;
SHashObj* monitorSlowLogHash;
char tmpSlowLogPath[PATH_MAX] = {0};
TdThread monitorThread;
extern bool tsEnableAuditDelete;
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) {
int ret = tsnprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir);
@ -216,7 +215,7 @@ static void reportSendProcess(void* param, void* tmrId) {
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
bool reset =
taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
taosTmrReset(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
tscDebug("reset timer, pMonitor:%p, %d", pMonitor, reset);
taosRUnLockLatch(&monitorLock);
}
@ -289,7 +288,7 @@ void monitorCreateClient(int64_t clusterId) {
goto fail;
}
pMonitor->timer =
taosTmrStart(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer);
taosTmrStart(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer);
if (pMonitor->timer == NULL) {
tscError("failed to start timer");
goto fail;
@ -660,7 +659,7 @@ static void monitorSendAllSlowLog() {
taosHashCancelIterate(monitorSlowLogHash, pIter);
return;
}
if (t - pClient->lastCheckTime > pInst->monitorParas.tsMonitorInterval * 1000) {
if (t - pClient->lastCheckTime > pInst->serverCfg.monitorParas.tsMonitorInterval * 1000) {
pClient->lastCheckTime = t;
} else {
continue;
@ -686,7 +685,7 @@ static void monitorSendAllSlowLog() {
static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
if (pInst == NULL || !pInst->monitorParas.tsEnableMonitor) {
if (pInst == NULL || !pInst->serverCfg.monitorParas.tsEnableMonitor) {
tscInfo("[monitor] monitor is disabled, skip send slow log");
return;
}
@ -932,4 +931,101 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
taosFreeQitem(slowLogData);
}
return 0;
}
}
int32_t reportCB(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tscDebug("[del report]delete reportCB code:%d", code);
return 0;
}
int32_t senAuditInfo(STscObj* pTscObj, void* pReq, int32_t len, uint64_t requestId) {
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
tscError("[del report]failed to allocate memory for sendInfo");
return terrno;
}
sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = len, .handle = NULL};
sendInfo->requestId = requestId;
sendInfo->requestObjRefId = 0;
sendInfo->param = NULL;
sendInfo->fp = reportCB;
sendInfo->msgType = TDMT_MND_AUDIT;
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int32_t code = asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
if (code != 0) {
tscError("[del report]failed to send msg to server, code:%d", code);
taosMemoryFree(sendInfo);
return code;
}
return TSDB_CODE_SUCCESS;
}
static void reportDeleteSql(SRequestObj* pRequest) {
SDeleteStmt* pStmt = (SDeleteStmt*)pRequest->pQuery->pRoot;
STscObj* pTscObj = pRequest->pTscObj;
if (pTscObj == NULL || pTscObj->pAppInfo == NULL) {
tscError("[del report]invalid tsc obj");
return;
}
if(pTscObj->pAppInfo->serverCfg.enableAuditDelete == 0) {
tscDebug("[del report]audit delete is disabled");
return;
}
if (pRequest->code != TSDB_CODE_SUCCESS) {
tscDebug("[del report]delete request result code:%d", pRequest->code);
return;
}
if (nodeType(pStmt->pFromTable) != QUERY_NODE_REAL_TABLE) {
tscError("[del report]invalid from table node type:%d", nodeType(pStmt->pFromTable));
return;
}
SRealTableNode* pTable = (SRealTableNode*)pStmt->pFromTable;
SAuditReq req;
req.pSql = pRequest->sqlstr;
req.sqlLen = pRequest->sqlLen;
TAOS_UNUSED(tsnprintf(req.table, TSDB_TABLE_NAME_LEN, "%s", pTable->table.tableName));
TAOS_UNUSED(tsnprintf(req.db, TSDB_DB_FNAME_LEN, "%s", pTable->table.dbName));
TAOS_UNUSED(tsnprintf(req.operation, AUDIT_OPERATION_LEN, "delete"));
int32_t tlen = tSerializeSAuditReq(NULL, 0, &req);
void* pReq = taosMemoryCalloc(1, tlen);
if (pReq == NULL) {
tscError("[del report]failed to allocate memory for req");
return;
}
if (tSerializeSAuditReq(pReq, tlen, &req) < 0) {
tscError("[del report]failed to serialize req");
taosMemoryFree(pReq);
return;
}
int32_t code = senAuditInfo(pRequest->pTscObj, pReq, tlen, pRequest->requestId);
if (code != 0) {
tscError("[del report]failed to send audit info, code:%d", code);
taosMemoryFree(pReq);
return;
}
tscDebug("[del report]delete data, sql:%s", req.pSql);
}
void clientOperateReport(SRequestObj* pRequest) {
if (pRequest == NULL || pRequest->pQuery == NULL || pRequest->pQuery->pRoot == NULL) {
tscError("[del report]invalid request");
return;
}
if (QUERY_NODE_DELETE_STMT == nodeType(pRequest->pQuery->pRoot)) {
reportDeleteSql(pRequest);
}
}

View File

@ -135,7 +135,8 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
// update the appInstInfo
pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
pTscObj->pAppInfo->monitorParas = connectRsp.monitorParas;
pTscObj->pAppInfo->serverCfg.monitorParas = connectRsp.monitorParas;
pTscObj->pAppInfo->serverCfg.enableAuditDelete = connectRsp.enableAuditDelete;
tscDebug("[monitor] paras from connect rsp, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
connectRsp.clusterId, connectRsp.monitorParas.tsSlowLogThreshold, connectRsp.monitorParas.tsSlowLogScope);
lastClusterId = connectRsp.clusterId;
@ -588,7 +589,8 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
return code;
}
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
*pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) {
code = terrno;
@ -603,7 +605,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, SHOW_VARIABLES_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SHOW_VARIABLES_RESULT_COLS);
if(len < 0) {
uError("buildShowVariablesRsp error, len:%d", len);
code = terrno;
@ -741,7 +743,8 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
return code;
}
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
*pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) {
code = terrno;
@ -757,7 +760,7 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, COMPACT_DB_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, COMPACT_DB_RESULT_COLS);
if(len < 0) {
uError("buildRetriveTableRspForCompactDb error, len:%d", len);
code = terrno;

View File

@ -1570,7 +1570,7 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat
SHashObj* pVgHash = NULL;
SRequestObj* pRequest = NULL;
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, reqid, &pRequest));
RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
rows, pData, tbname, fields, numFields);
@ -1631,7 +1631,7 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha
SHashObj* pVgHash = NULL;
SRequestObj* pRequest = NULL;
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, reqid, &pRequest));
RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);
@ -1835,7 +1835,7 @@ end:
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
int32_t code = 0;
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, pRequest));
RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));
(*pRequest)->syncQuery = true;
if (!(*pRequest)->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
@ -1855,19 +1855,19 @@ end:
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
static int32_t decodeRawData(SDecoder* decoder, void* data, int32_t dataLen, _raw_decode_func_ func,
SMqRspObj* rspObj) {
int8_t dataVersion = *(int8_t*)data;
if (dataVersion >= MQ_DATA_RSP_VERSION) {
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
dataLen -= sizeof(int8_t) + sizeof(int32_t);
int8_t dataVersion = *(int8_t*)data;
if (dataVersion >= MQ_DATA_RSP_VERSION) {
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
dataLen -= sizeof(int8_t) + sizeof(int32_t);
}
rspObj->resIter = -1;
tDecoderInit(decoder, data, dataLen);
int32_t code = func(decoder, &rspObj->dataRsp);
if (code != 0) {
SET_ERROR_MSG("decode mq taosx data rsp failed");
rspObj->resIter = -1;
tDecoderInit(decoder, data, dataLen);
int32_t code = func(decoder, &rspObj->dataRsp);
if (code != 0) {
SET_ERROR_MSG("decode mq taosx data rsp failed");
}
return code;
return code;
}
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
@ -2195,44 +2195,44 @@ static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder = {0};
void* buf = NULL;
tEncodeSize(encodeFunc, rspObj, len, code);
if (code < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
int32_t len = 0;
int32_t code = 0;
SEncoder encoder = {0};
void* buf = NULL;
tEncodeSize(encodeFunc, rspObj, len, code);
if (code < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
len += sizeof(int8_t) + sizeof(int32_t);
buf = taosMemoryCalloc(1, len);
if (buf == NULL) {
code = terrno;
goto FAILED;
len += sizeof(int8_t) + sizeof(int32_t);
buf = taosMemoryCalloc(1, len);
if (buf == NULL) {
code = terrno;
goto FAILED;
}
tEncoderInit(&encoder, buf, len);
if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
tEncoderInit(&encoder, buf, len);
if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
int32_t offsetLen = getOffSetLen(rspObj);
if (offsetLen <= 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
int32_t offsetLen = getOffSetLen(rspObj);
if (offsetLen <= 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
if (tEncodeI32(&encoder, offsetLen) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
if (tEncodeI32(&encoder, offsetLen) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
if (encodeFunc(&encoder, rspObj) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
if (encodeFunc(&encoder, rspObj) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
tEncoderClear(&encoder);
tEncoderClear(&encoder);
raw->raw = buf;
raw->raw_len = len;
return code;
raw->raw = buf;
raw->raw_len = len;
return code;
FAILED:
tEncoderClear(&encoder);
taosMemoryFree(buf);
@ -2380,4 +2380,4 @@ static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen
end:
tDeleteMqBatchMetaRsp(&rsp);
return code;
}
}

View File

@ -1695,7 +1695,7 @@ END:
}
void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd) {
if (request->pTscObj->pAppInfo->monitorParas.tsSlowLogScope & SLOW_LOG_TYPE_INSERT) {
if (request->pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & SLOW_LOG_TYPE_INSERT) {
int32_t len = 0;
int32_t rlen = 0;
char *p = NULL;
@ -1740,7 +1740,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
SSmlHandle *info = NULL;
int cnt = 0;
while (1) {
SML_CHECK_CODE(createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid, &request));
SML_CHECK_CODE(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &request, reqid));
SSmlMsgBuf msg = {request->msgBufLen, request->msgBuf};
request->code = smlBuildSmlInfo(taos, &info);
SML_CHECK_CODE(request->code);

View File

@ -2869,8 +2869,7 @@ int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pRes
pRspObj->resInfo.precision = precision;
pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;
int32_t code = setResultDataPtr(&pRspObj->resInfo, pRspObj->resInfo.fields, pRspObj->resInfo.numOfCols,
pRspObj->resInfo.numOfRows, convertUcs4);
int32_t code = setResultDataPtr(&pRspObj->resInfo, convertUcs4);
if (code != 0) {
return code;
}

View File

@ -165,8 +165,8 @@ static const SSysDbTableSchema userStbsSchema[] = {
static const SSysDbTableSchema streamSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "stream_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "stream_id", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_id", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
@ -190,9 +190,9 @@ static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "process_total", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "process_throughput", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "out_total", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "out_throughput", .bytes = 14, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "process_throughput", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "out_total", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "out_throughput", .bytes = 15, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
// {.name = "dispatch_throughput", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
// {.name = "dispatch_total", .bytes = 12, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},

View File

@ -18,6 +18,7 @@
#include "tcompare.h"
#include "tlog.h"
#include "tname.h"
#include "tglobal.h"
#define MALLOC_ALIGN_BYTES 32
@ -86,8 +87,18 @@ int32_t getJsonValueLen(const char* data) {
return dataLen;
}
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
if (isNull || pData == NULL) {
static int32_t getDataLen(int32_t type, const char* pData) {
int32_t dataLen = 0;
if (type == TSDB_DATA_TYPE_JSON) {
dataLen = getJsonValueLen(pData);
} else {
dataLen = varDataTLen(pData);
}
return dataLen;
}
static int32_t colDataSetValHelp(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
if (isNull || pData == NULL) {
// There is a placehold for each NULL value of binary or nchar type.
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
pColumnInfoData->varmeta.offset[rowIndex] = -1; // it is a null value of VAR type.
@ -101,11 +112,9 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
int32_t type = pColumnInfoData->info.type;
if (IS_VAR_DATA_TYPE(type)) {
int32_t dataLen = 0;
if (type == TSDB_DATA_TYPE_JSON) {
dataLen = getJsonValueLen(pData);
} else {
dataLen = varDataTLen(pData);
int32_t dataLen = getDataLen(type, pData);
if (pColumnInfoData->varmeta.offset[rowIndex] > 0) {
pColumnInfoData->varmeta.length = pColumnInfoData->varmeta.offset[rowIndex];
}
SVarColAttr* pAttr = &pColumnInfoData->varmeta;
@ -134,7 +143,7 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
uint32_t len = pColumnInfoData->varmeta.length;
pColumnInfoData->varmeta.offset[rowIndex] = len;
(void) memmove(pColumnInfoData->pData + len, pData, dataLen);
(void)memmove(pColumnInfoData->pData + len, pData, dataLen);
pColumnInfoData->varmeta.length += dataLen;
} else {
memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes);
@ -144,6 +153,18 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
return 0;
}
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
pColumnInfoData->varmeta.offset[rowIndex] = -1;
}
return colDataSetValHelp(pColumnInfoData, rowIndex, pData, isNull);
}
int32_t colDataSetValOrCover(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
return colDataSetValHelp(pColumnInfoData, rowIndex, pData, isNull);
}
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx,
const char* pData) {
int32_t type = pColumnInfoData->info.type;
@ -3041,8 +3062,12 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
}
// return length of encoded data, return -1 if failed
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
blockDataCheck(pBlock, false);
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
int32_t code = blockDataCheck(pBlock);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return -1;
}
int32_t dataLen = 0;
@ -3106,9 +3131,11 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
size_t metaSize = 0;
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
metaSize = numOfRows * sizeof(int32_t);
if(dataLen + metaSize > dataBuflen) goto _exit;
memcpy(data, pColRes->varmeta.offset, metaSize);
} else {
metaSize = BitmapLen(numOfRows);
if(dataLen + metaSize > dataBuflen) goto _exit;
memcpy(data, pColRes->nullbitmap, metaSize);
}
@ -3127,12 +3154,14 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
}
colSizes[col] += colSize;
dataLen += colSize;
if(dataLen > dataBuflen) goto _exit;
(void) memmove(data, pColData, colSize);
data += colSize;
}
} else {
colSizes[col] = colDataGetLength(pColRes, numOfRows);
dataLen += colSizes[col];
if(dataLen > dataBuflen) goto _exit;
if (pColRes->pData != NULL) {
(void) memmove(data, pColRes->pData, colSizes[col]);
}
@ -3156,7 +3185,14 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
*actualLen = dataLen;
*groupId = pBlock->info.id.groupId;
if (dataLen > dataBuflen) goto _exit;
return dataLen;
_exit:
uError("blockEncode dataLen:%d, dataBuflen:%zu", dataLen, dataBuflen);
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
return -1;
}
int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos) {
@ -3286,9 +3322,13 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos
*pEndPos = pStart;
blockDataCheck(pBlock, false);
code = blockDataCheck(pBlock);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}
return code;
return TSDB_CODE_SUCCESS;
}
int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
@ -3498,20 +3538,19 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
return nextRowIdx;
}
void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) {
return;
if (NULL == pDataBlock || pDataBlock->info.rows == 0) {
return;
#define BLOCK_DATA_CHECK_TRESSA(o) \
if (!(o)) { \
uError("blockDataCheck failed! line:%d", __LINE__); \
return TSDB_CODE_INTERNAL_ERROR; \
}
int32_t blockDataCheck(const SSDataBlock* pDataBlock) {
if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER || NULL == pDataBlock || pDataBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
#define BLOCK_DATA_CHECK_TRESSA(o) ;
//#define BLOCK_DATA_CHECK_TRESSA(o) A S S E R T(o)
BLOCK_DATA_CHECK_TRESSA(pDataBlock->info.rows > 0);
if (!pDataBlock->info.dataLoad && !forceChk) {
return;
if (!pDataBlock->info.dataLoad) {
return TSDB_CODE_SUCCESS;
}
bool isVarType = false;
@ -3522,8 +3561,10 @@ void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) {
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < colNum; ++i) {
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, i);
BLOCK_DATA_CHECK_TRESSA(pCol != NULL);
isVarType = IS_VAR_DATA_TYPE(pCol->info.type);
checkRows = pDataBlock->info.rows;
if (pCol->info.noData == true) continue;
if (isVarType) {
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset);
@ -3531,27 +3572,39 @@ void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) {
BLOCK_DATA_CHECK_TRESSA(pCol->nullbitmap);
}
nextPos = 0;
nextPos = -1;
for (int64_t r = 0; r < checkRows; ++r) {
if (tsSafetyCheckLevel <= TSDB_SAFETY_CHECK_LEVELL_NORMAL) break;
if (!colDataIsNull_s(pCol, r)) {
BLOCK_DATA_CHECK_TRESSA(pCol->pData);
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.length <= pCol->varmeta.allocLen);
if (isVarType) {
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.allocLen > 0);
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] < pCol->varmeta.length);
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] <= pCol->varmeta.length);
if (pCol->reassigned) {
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] >= 0);
} else if (0 == r) {
} else if (0 == r || nextPos == -1) {
nextPos = pCol->varmeta.offset[r];
} else {
BLOCK_DATA_CHECK_TRESSA(pCol->varmeta.offset[r] == nextPos);
}
colLen = varDataTLen(pCol->pData + pCol->varmeta.offset[r]);
BLOCK_DATA_CHECK_TRESSA(colLen >= VARSTR_HEADER_SIZE);
char* pColData = pCol->pData + pCol->varmeta.offset[r];
int32_t colSize = 0;
if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
colLen = getJsonValueLen(pColData);
} else {
colLen = varDataTLen(pColData);
}
if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
BLOCK_DATA_CHECK_TRESSA(colLen >= CHAR_BYTES);
} else {
BLOCK_DATA_CHECK_TRESSA(colLen >= VARSTR_HEADER_SIZE);
}
BLOCK_DATA_CHECK_TRESSA(colLen <= pCol->info.bytes);
if (pCol->reassigned) {
BLOCK_DATA_CHECK_TRESSA((pCol->varmeta.offset[r] + colLen) <= pCol->varmeta.length);
} else {
@ -3561,13 +3614,21 @@ void blockDataCheck(const SSDataBlock* pDataBlock, bool forceChk) {
typeValue = *(char*)(pCol->pData + pCol->varmeta.offset[r] + colLen - 1);
} else {
GET_TYPED_DATA(typeValue, int64_t, pCol->info.type, colDataGetNumData(pCol, r));
if (TSDB_DATA_TYPE_FLOAT == pCol->info.type) {
float v = 0;
GET_TYPED_DATA(v, float, pCol->info.type, colDataGetNumData(pCol, r));
} else if (TSDB_DATA_TYPE_DOUBLE == pCol->info.type) {
double v = 0;
GET_TYPED_DATA(v, double, pCol->info.type, colDataGetNumData(pCol, r));
} else {
GET_TYPED_DATA(typeValue, int64_t, pCol->info.type, colDataGetNumData(pCol, r));
}
}
}
}
}
return;
return TSDB_CODE_SUCCESS;
}

View File

@ -119,6 +119,7 @@ bool tsMonitorForceV2 = true;
// audit
bool tsEnableAudit = true;
bool tsEnableAuditCreateTable = true;
bool tsEnableAuditDelete = true;
int32_t tsAuditInterval = 5000;
// telem
@ -137,8 +138,9 @@ bool tsEnableCrashReport = false;
#else
bool tsEnableCrashReport = true;
#endif
char *tsClientCrashReportUri = "/ccrashreport";
char *tsSvrCrashReportUri = "/dcrashreport";
char *tsClientCrashReportUri = "/ccrashreport";
char *tsSvrCrashReportUri = "/dcrashreport";
int8_t tsSafetyCheckLevel = TSDB_SAFETY_CHECK_LEVELL_NORMAL;
// schemaless
bool tsSmlDot2Underline = true;
@ -610,6 +612,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(
cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorScope", tsRandErrScope, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "safetyCheckLevel", tsSafetyCheckLevel, 0, 5, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, TSDB_MAX_RPC_THREADS);
@ -777,6 +780,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "monitorForceV2", tsMonitorForceV2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableAuditDelete", tsEnableAuditDelete, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE));
@ -1305,6 +1309,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "tsmaDataDeleteMark");
tsmaDataDeleteMark = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "safetyCheckLevel");
tsSafetyCheckLevel = pItem->i32;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -1490,6 +1497,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "auditCreateTable");
tsEnableAuditCreateTable = pItem->bval;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "enableAuditDelete");
tsEnableAuditDelete = pItem->bval;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "auditInterval");
tsAuditInterval = pItem->i32;
@ -2049,7 +2059,8 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"s3UploadDelaySec", &tsS3UploadDelaySec},
{"supportVnodes", &tsNumOfSupportVnodes},
{"experimental", &tsExperimental},
{"maxTsmaNum", &tsMaxTsmaNum}};
{"maxTsmaNum", &tsMaxTsmaNum},
{"safetyCheckLevel", &tsSafetyCheckLevel}};
if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);
@ -2305,7 +2316,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
{"experimental", &tsExperimental},
{"multiResultFunctionStarReturnTags", &tsMultiResultFunctionStarReturnTags},
{"maxTsmaCalcDelay", &tsMaxTsmaCalcDelay},
{"tsmaDataDeleteMark", &tsmaDataDeleteMark}};
{"tsmaDataDeleteMark", &tsmaDataDeleteMark},
{"safetyCheckLevel", &tsSafetyCheckLevel}};
if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);

View File

@ -567,6 +567,7 @@ int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBa
TAOS_CHECK_EXIT(tSerializeSClientHbRsp(&encoder, pRsp));
}
TAOS_CHECK_EXIT(tSerializeSMonitorParas(&encoder, &pBatchRsp->monitorParas));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pBatchRsp->enableAuditDelete));
tEndEncode(&encoder);
_exit:
@ -609,6 +610,12 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR
TAOS_CHECK_EXIT(tDeserializeSMonitorParas(&decoder, &pBatchRsp->monitorParas));
}
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pBatchRsp->enableAuditDelete));
} else {
pBatchRsp->enableAuditDelete = 0;
}
tEndDecode(&decoder);
_exit:
@ -1813,6 +1820,60 @@ _exit:
void tFreeSDropUserReq(SDropUserReq *pReq) { FREESQL(); }
int32_t tSerializeSAuditReq(void *buf, int32_t bufLen, SAuditReq *pReq) {
SEncoder encoder = {0};
int32_t code = 0;
int32_t lino;
int32_t tlen;
tEncoderInit(&encoder, buf, bufLen);
TAOS_CHECK_EXIT(tStartEncode(&encoder));
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->operation));
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->db));
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->table));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->sqlLen));
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->pSql));
tEndEncode(&encoder);
_exit:
if (code) {
tlen = code;
} else {
tlen = encoder.pos;
}
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSAuditReq(void *buf, int32_t bufLen, SAuditReq *pReq) {
SDecoder decoder = {0};
int32_t code = 0;
int32_t lino;
tDecoderInit(&decoder, buf, bufLen);
TAOS_CHECK_EXIT(tStartDecode(&decoder));
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->operation));
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->db));
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->table));
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->sqlLen));
if (pReq->sqlLen > 0) {
pReq->pSql = taosMemoryMalloc(pReq->sqlLen + 1);
if (pReq->pSql == NULL) {
TAOS_CHECK_EXIT(terrno);
}
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->pSql));
}
tEndDecode(&decoder);
_exit:
tDecoderClear(&decoder);
return code;
}
void tFreeSAuditReq(SAuditReq *pReq) { taosMemoryFreeClear(pReq->pSql); }
SIpWhiteList *cloneIpWhiteList(SIpWhiteList *pIpWhiteList) {
if (pIpWhiteList == NULL) return NULL;
@ -6294,6 +6355,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->authVer));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->whiteListVer));
TAOS_CHECK_EXIT(tSerializeSMonitorParas(&encoder, &pRsp->monitorParas));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRsp->enableAuditDelete));
tEndEncode(&encoder);
_exit:
@ -6345,6 +6407,11 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDeserializeSMonitorParas(&decoder, &pRsp->monitorParas));
}
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRsp->enableAuditDelete));
} else {
pRsp->enableAuditDelete = 0;
}
tEndDecode(&decoder);
_exit:

View File

@ -548,8 +548,8 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols +
blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(numOfCols);
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
@ -574,7 +574,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
pStart += sizeof(SSysTableSchema);
}
int32_t len = blockEncode(pBlock, pStart, numOfCols);
int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
if (len < 0) {
dError("failed to retrieve data since %s", tstrerror(code));
blockDataDestroy(pBlock);

View File

@ -212,6 +212,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_VIEW_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STATIS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUDIT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_COMPACT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_CLUSTER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -86,6 +86,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq);
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
@ -125,6 +126,7 @@ int32_t mndInitDnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq);
mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
@ -604,6 +606,24 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
return 0;
}
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
mTrace("process audit req:%p", pReq);
if (tsEnableAudit && tsEnableAuditDelete) {
SMnode *pMnode = pReq->info.node;
SAuditReq auditReq = {0};
TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));
mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);
auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
auditReq.sqlLen);
tFreeSAuditReq(&auditReq);
}
return 0;
}
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
int32_t code = 0, lino = 0;
SDnodeInfoReq infoReq = {0};

View File

@ -305,6 +305,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
connectRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
connectRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
connectRsp.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest;
connectRsp.enableAuditDelete = tsEnableAuditDelete;
tstrncpy(connectRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
connectRsp.whiteListVer = pUser->ipWhiteListVer;
@ -709,6 +710,7 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
tstrncpy(batchRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
batchRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
batchRsp.monitorParas.tsSlowLogScope = tsSlowLogScope;
batchRsp.enableAuditDelete = tsEnableAuditDelete;
int32_t sz = taosArrayGetSize(batchReq.reqs);
for (int i = 0; i < sz; i++) {

View File

@ -333,8 +333,8 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
}
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock));
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns + dataEncodeBufSize;
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
@ -361,7 +361,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
pStart += sizeof(SSysTableSchema);
}
int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, pShow->pMeta->numOfColumns);
if(len < 0){
mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, tstrerror(code));
code = terrno;

View File

@ -16,7 +16,8 @@
#include "tq.h"
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + blockGetEncodeSize(pBlock);
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) {
return terrno;
@ -28,7 +29,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
pRetrieve->compressed = 0;
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols);
if(actualLen < 0){
taosMemoryFree(buf);
return terrno;

View File

@ -628,6 +628,9 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void*
tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
TSDB_CHECK_NULL(tmp, code, line, END, terrno)
colDataSetNULL(tmp, i);
tmp = taosArrayGet(pDelBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
TSDB_CHECK_NULL(tmp, code, line, END, terrno)
colDataSetNULL(tmp, i);
}
if (type == 0) {

View File

@ -22,13 +22,16 @@
#include "taoserror.h"
#include "tglobal.h"
int32_t azBegin() { return TSDB_CODE_SUCCESS; }
void azEnd() {}
#if defined(USE_S3)
#include <azure/core.hpp>
#include <azure/storage/blobs.hpp>
#include "td_block_blob_client.hpp"
// Add appropriate using namespace directives
using namespace Azure::Storage;
using namespace Azure::Storage::Blobs;
@ -40,10 +43,6 @@ extern char tsS3BucketName[TSDB_FQDN_LEN];
extern int8_t tsS3Enabled;
extern int8_t tsS3EpNum;
int32_t azBegin() { return TSDB_CODE_SUCCESS; }
void azEnd() {}
static void checkPrint(const char *fmt, ...) {
va_list arg_ptr;
va_start(arg_ptr, fmt);
@ -223,7 +222,6 @@ static int32_t azPutObjectFromFileOffsetImpl(const char *file, const char *objec
uint8_t blobContent[] = "Hello Azure!";
// Create the block blob client
// BlockBlobClient blobClient = containerClient.GetBlockBlobClient(blobName);
// TDBlockBlobClient blobClient(containerClient.GetBlobClient(blobName));
TDBlockBlobClient blobClient(containerClient.GetBlobClient(object_name));
blobClient.UploadFrom(file, offset, size);
@ -467,7 +465,7 @@ int32_t azGetObjectToFile(const char *object_name, const char *fileName) {
TAOS_RETURN(code);
}
int32_t azGetObjectsByPrefix(const char *prefix, const char *path) {
static int32_t azGetObjectsByPrefixImpl(const char *prefix, const char *path) {
const std::string delimiter = "/";
std::string accountName = tsS3AccessKeyId[0];
std::string accountKey = tsS3AccessKeySecret[0];
@ -514,6 +512,23 @@ int32_t azGetObjectsByPrefix(const char *prefix, const char *path) {
return 0;
}
int32_t azGetObjectsByPrefix(const char *prefix, const char *path) {
int32_t code = 0;
try {
code = azGetObjectsByPrefixImpl(prefix, path);
} catch (const std::exception &e) {
azError("%s: Reason Phrase: %s", __func__, e.what());
code = TAOS_SYSTEM_ERROR(EIO);
azError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
TAOS_RETURN(code);
}
TAOS_RETURN(code);
}
int32_t azDeleteObjects(const char *object_name[], int nobject) {
for (int i = 0; i < nobject; ++i) {
azDeleteObjectsByPrefix(object_name[i]);
@ -524,10 +539,6 @@ int32_t azDeleteObjects(const char *object_name[], int nobject) {
#else
int32_t azBegin() { return TSDB_CODE_SUCCESS; }
void azEnd() {}
int32_t azCheckCfg() { return TSDB_CODE_SUCCESS; }
int32_t azPutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) {

View File

@ -35,7 +35,8 @@
extern SConfig* tsCfg;
static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRetrieveTableRsp** pRsp) {
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
*pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) {
return terrno;
@ -49,8 +50,8 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(numOfCols);
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, numOfCols);
if (len < 0) {
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, numOfCols);
if(len < 0) {
taosMemoryFree(*pRsp);
return terrno;
}

View File

@ -1966,7 +1966,8 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
pBlock->info.rows = rowNum;
int32_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
int32_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
if (NULL == rsp) {
@ -1977,7 +1978,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
rsp->completed = 1;
rsp->numOfRows = htobe64((int64_t)rowNum);
int32_t len = blockEncode(pBlock, rsp->data + PAYLOAD_PREFIX_LEN, taosArrayGetSize(pBlock->pDataBlock));
int32_t len = blockEncode(pBlock, rsp->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, taosArrayGetSize(pBlock->pDataBlock));
if(len < 0) {
qError("qExplainGetRspFromCtx: blockEncode failed");
QRY_ERR_JRET(terrno);

View File

@ -45,6 +45,7 @@ typedef struct SDataDispatchHandle {
SDataBlockDescNode* pSchema;
STaosQueue* pDataBlocks;
SDataDispatchBuf nextOutput;
int32_t outPutColCounts;
int32_t status;
bool queryEnd;
uint64_t useconds;
@ -54,6 +55,65 @@ typedef struct SDataDispatchHandle {
TdThreadMutex mutex;
} SDataDispatchHandle;
static int32_t inputSafetyCheck(SDataDispatchHandle* pHandle, const SInputData* pInput) {
if(tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
return TSDB_CODE_SUCCESS;
}
if (pInput == NULL || pInput->pData == NULL || pInput->pData->info.rows <= 0) {
qError("invalid input data");
return TSDB_CODE_QRY_INVALID_INPUT;
}
SDataBlockDescNode* pSchema = pHandle->pSchema;
if (pSchema == NULL || pSchema->totalRowSize != pInput->pData->info.rowSize) {
qError("invalid schema");
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (pHandle->outPutColCounts > taosArrayGetSize(pInput->pData->pDataBlock)) {
qError("invalid column number, schema:%d, input:%zu", pHandle->outPutColCounts, taosArrayGetSize(pInput->pData->pDataBlock));
return TSDB_CODE_QRY_INVALID_INPUT;
}
SNode* pNode;
int32_t colNum = 0;
FOREACH(pNode, pHandle->pSchema->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
SColumnInfoData* pColInfoData = taosArrayGet(pInput->pData->pDataBlock, colNum);
if (pColInfoData == NULL) {
return -1;
}
if (pColInfoData->info.bytes < 0) {
qError("invalid column bytes, schema:%d, input:%d", pSlotDesc->dataType.bytes, pColInfoData->info.bytes);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (!IS_VAR_DATA_TYPE(pColInfoData->info.type) &&
TYPE_BYTES[pColInfoData->info.type] != pColInfoData->info.bytes) {
qError("invalid column bytes, schema:%d, input:%d", TYPE_BYTES[pColInfoData->info.type],
pColInfoData->info.bytes);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (pColInfoData->info.type != pSlotDesc->dataType.type) {
qError("invalid column type, schema:%d, input:%d", pSlotDesc->dataType.type, pColInfoData->info.type);
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (pColInfoData->info.bytes != pSlotDesc->dataType.bytes) {
qError("invalid column bytes, schema:%d, input:%d", pSlotDesc->dataType.bytes, pColInfoData->info.bytes);
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (IS_INVALID_TYPE(pColInfoData->info.type)) {
qError("invalid column type, type:%d", pColInfoData->info.type);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
++colNum;
}
}
return TSDB_CODE_SUCCESS;
}
// clang-format off
// data format:
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
@ -67,6 +127,12 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
int32_t numOfCols = 0;
SNode* pNode;
int32_t code = inputSafetyCheck(pHandle, pInput);
if (code) {
qError("failed to check input data, code:%d", code);
return code;
}
FOREACH(pNode, pHandle->pSchema->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
@ -84,17 +150,18 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
pBuf->useSize = sizeof(SDataCacheEntry);
{
// allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size
size_t dataEncodeBufSize = pBuf->allocSize + 8;
if ((pBuf->allocSize > tsCompressMsgSize) && (tsCompressMsgSize > 0) && pHandle->pManager->cfg.compress) {
if (pHandle->pCompressBuf == NULL) {
// allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size
pHandle->pCompressBuf = taosMemoryMalloc(pBuf->allocSize + 8);
pHandle->pCompressBuf = taosMemoryMalloc(dataEncodeBufSize);
if (NULL == pHandle->pCompressBuf) {
QRY_RET(terrno);
}
pHandle->bufSize = pBuf->allocSize + 8;
pHandle->bufSize = dataEncodeBufSize;
} else {
if (pHandle->bufSize < pBuf->allocSize + 8) {
pHandle->bufSize = pBuf->allocSize + 8;
if (pHandle->bufSize < dataEncodeBufSize) {
pHandle->bufSize = dataEncodeBufSize;
void* p = taosMemoryRealloc(pHandle->pCompressBuf, pHandle->bufSize);
if (p != NULL) {
pHandle->pCompressBuf = p;
@ -105,7 +172,7 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
}
}
int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, numOfCols);
int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, dataEncodeBufSize, numOfCols);
if(dataLen < 0) {
qError("failed to encode data block, code: %d", dataLen);
return terrno;
@ -123,7 +190,7 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
TAOS_MEMCPY(pEntry->data, pHandle->pCompressBuf, dataLen);
}
} else {
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, pBuf->allocSize, numOfCols);
if(pEntry->dataLen < 0) {
qError("failed to encode data block, code: %d", pEntry->dataLen);
return terrno;
@ -314,8 +381,60 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
return TSDB_CODE_SUCCESS;
}
static int32_t blockDescNodeCheck(SDataBlockDescNode* pInputDataBlockDesc) {
if(tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
return TSDB_CODE_SUCCESS;
}
if (pInputDataBlockDesc == NULL) {
qError("invalid schema");
return TSDB_CODE_QRY_INVALID_INPUT;
}
SNode* pNode;
int32_t realOutputRowSize = 0;
FOREACH(pNode, pInputDataBlockDesc->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
realOutputRowSize += pSlotDesc->dataType.bytes;
} else {
// Slots must be sorted, and slots with 'output' set to true must come first
break;
}
}
if (realOutputRowSize != pInputDataBlockDesc->outputRowSize) {
qError("invalid schema, realOutputRowSize:%d, outputRowSize:%d", realOutputRowSize, pInputDataBlockDesc->outputRowSize);
return TSDB_CODE_QRY_INVALID_INPUT;
}
return TSDB_CODE_SUCCESS;
}
int32_t getOutputColCounts(SDataBlockDescNode* pInputDataBlockDesc) {
if (pInputDataBlockDesc == NULL) {
qError("invalid schema");
return 0;
}
SNode* pNode;
int32_t numOfCols = 0;
FOREACH(pNode, pInputDataBlockDesc->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) {
++numOfCols;
} else {
// Slots must be sorted, and slots with 'output' set to true must come first
break;
}
}
return numOfCols;
}
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
int32_t code;
code = blockDescNodeCheck(pDataSink->pInputDataBlockDesc);
if (code) {
qError("failed to check input data block desc, code:%d", code);
return code;
}
SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
if (NULL == dispatcher) {
@ -333,6 +452,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
dispatcher->pManager = pManager;
pManager = NULL;
dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
dispatcher->outPutColCounts = getOutputColCounts(dispatcher->pSchema);
dispatcher->status = DS_BUF_EMPTY;
dispatcher->queryEnd = false;
code = taosOpenQueue(&dispatcher->pDataBlocks);

View File

@ -528,7 +528,12 @@ static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock**
qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
if (*ppRes && (code == 0)) {
blockDataCheck(*ppRes, false);
code = blockDataCheck(*ppRes);
if (code) {
qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
pPost->isStarted = true;
pStbJoin->execInfo.postBlkNum++;
pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;

View File

@ -390,6 +390,7 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) {
createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
idata.info.scale = pDescNode->dataType.scale;
idata.info.precision = pDescNode->dataType.precision;
idata.info.noData = pDescNode->reserve;
code = blockDataAppendColInfo(pBlock, &idata);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -700,12 +700,12 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
pTaskInfo->paramSet = true;
code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
blockDataCheck(pRes, false);
} else {
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
blockDataCheck(pRes, false);
}
QUERY_CHECK_CODE(code, lino, _end);
code = blockDataCheck(pRes);
QUERY_CHECK_CODE(code, lino, _end);
if (pRes == NULL) {
@ -750,7 +750,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
}
code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
blockDataCheck(pRes, false);
QUERY_CHECK_CODE(code, lino, _end);
code = blockDataCheck(pRes);
QUERY_CHECK_CODE(code, lino, _end);
}
@ -849,7 +850,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
}
blockDataCheck(*pRes, false);
code = blockDataCheck(*pRes);
if (code) {
pTaskInfo->code = code;
qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
}
uint64_t el = (taosGetTimestampUs() - st);

View File

@ -616,11 +616,12 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p
}
}
}
code = TSDB_CODE_SUCCESS;
code = blockDataCheck(pBlock);
QUERY_CHECK_CODE(code, lino, _err);
_err:
blockDataCheck(pBlock, true);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
colDataDestroy(p);
taosMemoryFree(p);
return code;
@ -701,7 +702,7 @@ int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResu
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
code = colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
code = colDataSetValOrCover(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
QUERY_CHECK_CODE(code, lino, _end);
}
}

View File

@ -765,7 +765,7 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p
}
}
blockDataCheck(pBlock, false);
code = blockDataCheck(pBlock);
*ppRes = pBlock;
return code;

View File

@ -65,11 +65,14 @@ static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock);
int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
blockDataCheck(*ppBlock, false);
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
if (code) {
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
}
code = blockDataCheck(*ppBlock);
if (code) {
qError("failed to check data block got from upstream, %s code:%s", __func__, tstrerror(code));
}
return code;
}
@ -526,7 +529,8 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
if ((*pResBlock) != NULL) {
pOperator->resultInfo.totalRows += (*pResBlock)->info.rows;
blockDataCheck(*pResBlock, false);
code = blockDataCheck(*pResBlock);
QUERY_CHECK_CODE(code, lino, _end);
} else {
setOperatorCompleted(pOperator);
}

View File

@ -872,15 +872,25 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInpu
SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) {
SSDataBlock* p = NULL;
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
blockDataCheck(p, false);
return (code == 0)? p:NULL;
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, true, &p);
if (code == TSDB_CODE_SUCCESS) {
code = blockDataCheck(p);
if (code != TSDB_CODE_SUCCESS) {
qError("blockDataCheck failed, code:%s", tstrerror(code));
}
}
return (code == 0) ? p : NULL;
}
SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, int32_t idx) {
SSDataBlock* p = NULL;
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
blockDataCheck(p, false);
int32_t code = getNextBlockFromDownstreamImpl(pOperator, idx, false, &p);
if (code == TSDB_CODE_SUCCESS) {
code = blockDataCheck(p);
if (code != TSDB_CODE_SUCCESS) {
qError("blockDataCheck failed, code:%s", tstrerror(code));
}
}
return (code == 0)? p:NULL;
}

View File

@ -1462,6 +1462,18 @@ static void destroyTableScanOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
static void resetClolumnReserve(SSDataBlock* pBlock, int32_t dataRequireFlag) {
if (pBlock && dataRequireFlag == FUNC_DATA_REQUIRED_NOT_LOAD) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
if (pCol) {
pCol->info.noData = true;
}
}
}
}
int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
@ -1512,6 +1524,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
resetClolumnReserve(pInfo->pResBlock, pInfo->base.dataBlockLoadFlag);
QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno);
code = prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo);
@ -2117,6 +2130,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
SColumnInfoData* pDestTableNameInxCol = taosArrayGet(pDestBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
uint64_t groupId = pSrcGp[i];
if (groupId == 0) {
@ -2154,7 +2168,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
QUERY_CHECK_CODE(code, lino, _end);
colDataSetNULL(pDestCalStartTsCol, i);
colDataSetNULL(pDestCalEndTsCol, i);
colDataSetNULL(pDestTableNameInxCol, i);
pDestBlock->info.rows++;
}
@ -2207,6 +2221,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
SColumnInfoData* pDestTableNameInxCol = taosArrayGet(pDestBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
uint64_t groupId = pSrcGp[i];
if (groupId == 0) {
@ -2235,6 +2250,8 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
code = colDataSetVal(pDestCalEndTsCol, i, (const char*)&range.win.ekey, false);
QUERY_CHECK_CODE(code, lino, _end);
colDataSetNULL(pDestTableNameInxCol, i);
pDestBlock->info.rows++;
}
@ -2450,6 +2467,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
SColumnInfoData* pDestTableNameInxCol = taosArrayGet(pDestBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
for (int32_t i = 0; i < pSrcBlock->info.rows;) {
uint64_t srcUid = srcUidData[i];
uint64_t groupId = srcGp[i];
@ -2482,6 +2500,8 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
code = colDataSetVal(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
QUERY_CHECK_CODE(code, lino, _end);
colDataSetNULL(pDestTableNameInxCol, pDestBlock->info.rows);
pDestBlock->info.rows++;
}
@ -3240,6 +3260,7 @@ static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, S
colDataSetNULL(taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX), j);
colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), j);
colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), j);
colDataSetNULL(taosArrayGet(pDst->pDataBlock, TABLE_NAME_COLUMN_INDEX), j);
j++;
}
}
@ -3948,6 +3969,9 @@ FETCH_NEXT_BLOCK:
// printDataBlock(pInfo->pCheckpointRes, "stream scan ck", GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pCheckpointRes;
return code;
} else {
qError("stream scan error, invalid block type %d, %s", pInfo->blockType, id);
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
_end:

View File

@ -334,10 +334,14 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
blockDataCheck(*ppBlock, false);
int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
if (code) {
qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
} else {
code = blockDataCheck(*ppBlock);
if (code) {
qError("failed to check block data, %s code:%s", __func__, tstrerror(code));
}
}
return code;
}
@ -630,7 +634,8 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
QUERY_CHECK_CODE(code, lino, _end);
if (block != NULL) {
blockDataCheck(block, false);
code = blockDataCheck(block);
QUERY_CHECK_CODE(code, lino, _end);
if (block->info.id.groupId == grpSortOpInfo->currGroupId) {
grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
*ppBlock = block;

View File

@ -1385,9 +1385,12 @@ static int32_t translateRepeat(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
uint8_t type = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type;
int32_t orgLen = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->bytes;
int32_t count = TMAX((int32_t)((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i, 1);
int32_t resLen = orgLen * count;
int32_t resLen;
if (nodeType(nodesListGetNode(pFunc->pParameterList, 1)) == QUERY_NODE_VALUE) {
resLen = orgLen * TMAX((int32_t)((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i, 1);
} else {
resLen = TSDB_MAX_BINARY_LEN;
}
pFunc->node.resType = (SDataType){.bytes = resLen, .type = type};
return TSDB_CODE_SUCCESS;
}
@ -1535,14 +1538,16 @@ static int32_t translateToJson(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
static int32_t translateOutGeom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_GEOMETRY].bytes, .type = TSDB_DATA_TYPE_GEOMETRY};
SDataType dt = *getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0));
pFunc->node.resType = (SDataType){.bytes = dt.bytes, .type = TSDB_DATA_TYPE_GEOMETRY};
return TSDB_CODE_SUCCESS;
}
static int32_t translateInGeomOutStr(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
FUNC_ERR_RET(validateParam(pFunc, pErrBuf, len));
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_VARCHAR].bytes, .type = TSDB_DATA_TYPE_VARCHAR};
SDataType dt = *getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0));
pFunc->node.resType = (SDataType){.bytes = dt.bytes, .type = TSDB_DATA_TYPE_VARCHAR};
return TSDB_CODE_SUCCESS;
}
@ -1613,7 +1618,7 @@ static int32_t translateOutVarchar(SFunctionNode* pFunc, char* pErrBuf, int32_t
break;
case FUNCTION_TYPE_BLOCK_DIST:
case FUNCTION_TYPE_BLOCK_DIST_INFO:
bytes = 128;
bytes = sizeof(STableBlockDistInfo);
break;
case FUNCTION_TYPE_TO_CHAR:
bytes = 4096;
@ -1655,7 +1660,7 @@ static int32_t translateOutVarchar(SFunctionNode* pFunc, char* pErrBuf, int32_t
bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
break;
case FUNCTION_TYPE_TIMEZONE:
bytes = TD_TIMEZONE_LEN;
bytes = timeZoneStrLen();
break;
case FUNCTION_TYPE_IRATE_PARTIAL:
bytes = getIrateInfoSize((pFunc->hasPk) ? pFunc->pkBytes : 0) + VARSTR_HEADER_SIZE;

View File

@ -412,6 +412,27 @@ int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNo
return code;
}
int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFunc, SNodeList* pParameterList, SFunctionNode** ppFunc) {
int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)ppFunc);
if (NULL == *ppFunc) {
return code;
}
(*ppFunc)->hasPk = pSrcFunc->hasPk;
(*ppFunc)->pkBytes = pSrcFunc->pkBytes;
(void)snprintf((*ppFunc)->functionName, sizeof((*ppFunc)->functionName), "%s", pName);
(*ppFunc)->pParameterList = pParameterList;
code = getFuncInfo((*ppFunc));
if (TSDB_CODE_SUCCESS != code) {
(*ppFunc)->pParameterList = NULL;
nodesDestroyNode((SNode*)*ppFunc);
*ppFunc = NULL;
return code;
}
return code;
}
static int32_t createColumnByFunc(const SFunctionNode* pFunc, SColumnNode** ppCol) {
int32_t code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)ppCol);
if (NULL == *ppCol) {
@ -438,7 +459,8 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod
if (NULL == pParameterList) {
return code;
}
code = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pPartialFunc, pParameterList,pPartialFunc );
code =
createFunctionWithSrcFunc(funcMgtBuiltins[pSrcFunc->funcId].pPartialFunc, pSrcFunc, pParameterList, pPartialFunc);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pParameterList);
return code;
@ -452,8 +474,6 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod
return TSDB_CODE_FAILED;
}
tstrncpy((*pPartialFunc)->node.aliasName, name, TSDB_COL_NAME_LEN);
(*pPartialFunc)->hasPk = pSrcFunc->hasPk;
(*pPartialFunc)->pkBytes = pSrcFunc->pkBytes;
return TSDB_CODE_SUCCESS;
}
@ -479,9 +499,9 @@ static int32_t createMidFunction(const SFunctionNode* pSrcFunc, const SFunctionN
int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pParameterList);
if (TSDB_CODE_SUCCESS == code) {
if(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc != NULL){
code = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc, pParameterList, &pFunc);
code = createFunctionWithSrcFunc(funcMgtBuiltins[pSrcFunc->funcId].pMiddleFunc, pSrcFunc, pParameterList, &pFunc);
}else{
code = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList, &pFunc);
code = createFunctionWithSrcFunc(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pSrcFunc, pParameterList, &pFunc);
}
}
if (TSDB_CODE_SUCCESS == code) {
@ -493,8 +513,6 @@ static int32_t createMidFunction(const SFunctionNode* pSrcFunc, const SFunctionN
} else {
nodesDestroyList(pParameterList);
}
(*pMidFunc)->hasPk = pPartialFunc->hasPk;
(*pMidFunc)->pkBytes = pPartialFunc->pkBytes;
return code;
}
@ -505,7 +523,7 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
int32_t code = createMergeFuncPara(pSrcFunc, pPartialFunc, &pParameterList);
if (TSDB_CODE_SUCCESS == code) {
code = createFunction(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pParameterList, &pFunc);
code = createFunctionWithSrcFunc(funcMgtBuiltins[pSrcFunc->funcId].pMergeFunc, pSrcFunc, pParameterList, &pFunc);
}
if (TSDB_CODE_SUCCESS == code) {
pFunc->hasOriginalFunc = true;
@ -522,8 +540,6 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
} else {
nodesDestroyList(pParameterList);
}
(*pMergeFunc)->hasPk = pPartialFunc->hasPk;
(*pMergeFunc)->pkBytes = pPartialFunc->pkBytes;
return code;
}

View File

@ -332,7 +332,7 @@ static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList,
}
static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, false, false);
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, false, true);
}
static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlockDescNode* pDataBlockDesc) {
@ -354,7 +354,7 @@ static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlo
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList,
SDataBlockDescNode* pDataBlockDesc) {
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, false, false);
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, false, true);
}
static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {

View File

@ -2414,7 +2414,7 @@ int32_t toCharFunction(SScalarParam* pInput, int32_t inputNum, SScalarParam* pOu
char *ts = colDataGetData(pInput[0].columnData, i);
char *formatData = colDataGetData(pInput[1].columnData, pInput[1].numOfRows > 1 ? i : 0);
len = TMIN(TS_FORMAT_MAX_LEN - 1, varDataLen(formatData));
len = TMIN(TS_FORMAT_MAX_LEN - VARSTR_HEADER_SIZE, varDataLen(formatData));
if (pInput[1].numOfRows > 1 || i == 0) {
(void)strncpy(format, varDataVal(formatData), len);
format[len] = '\0';
@ -2663,6 +2663,10 @@ int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOut
return TSDB_CODE_SUCCESS;
}
int32_t timeZoneStrLen() {
return sizeof(VarDataLenT) + strlen(tsTimezoneStr);
}
int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
char output[TD_TIMEZONE_LEN + VARSTR_HEADER_SIZE] = {0};
(void)memcpy(varDataVal(output), tsTimezoneStr, TD_TIMEZONE_LEN);

View File

@ -5136,13 +5136,13 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
goto _ERROR;
}
memset(dstBuf, 0, cap);
nBytes = snprintf(dstDir, cap, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta);
nBytes = snprintf(dstBuf, cap, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta);
if (nBytes <= 0 || nBytes >= cap) {
code = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
TdFilePtr pFile = taosOpenFile(dstDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
TdFilePtr pFile = taosOpenFile(dstBuf, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
code = terrno;
stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(code));

View File

@ -145,7 +145,8 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req) {
SRetrieveTableRsp* pRetrieve = NULL;
int32_t len = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
int32_t len = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN;
pRetrieve = taosMemoryCalloc(1, len);
if (pRetrieve == NULL) return terrno;
@ -162,7 +163,7 @@ static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBl
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, dataEncodeSize, numOfCols);
if (actualLen < 0) {
taosMemoryFree(pRetrieve);
return terrno;
@ -1247,7 +1248,8 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
}
int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN;
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) {
return terrno;
@ -1269,7 +1271,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, dataEncodeSize, numOfCols);
if (actualLen < 0) {
taosMemoryFree(buf);
return terrno;

View File

@ -108,13 +108,13 @@ taos> select repeat(nch1, id) from ts_4893.meters where id > 0 order by ts limit
novelnovelnovelnovelnovel |
taos> select repeat(var1, id) from ts_4893.meters where id > 0 order by ts limit 5
repeat(var1, id) |
===================
person |
novelnovel |
plateplateplate |
一二三四五六... |
updateupdateu... |
repeat(var1, id) |
=================================
person |
novelnovel |
plateplateplate |
一二三四五六七八九十一二三... |
updateupdateupdateupdateupdate |
taos> select repeat('nch1', id) from ts_4893.meters where id > 0 order by ts limit 5
repeat('nch1', id) |
@ -229,32 +229,32 @@ taos> select repeat(var1, 3) from ts_4893.meters order by ts limit 10
plateplateplate |
taos> select repeat(name, groupid) from ts_4893.d0 order by ts limit 10
repeat(name, groupid) |
========================
lili |
x |
lili |
x |
lili |
taos |
haha |
taos |
taos |
haha |
repeat(name, groupid) |
=================================
lili |
x |
lili |
x |
lili |
taos |
haha |
taos |
taos |
haha |
taos> select repeat(name, groupid) from ts_4893.meters order by ts limit 10
repeat(name, groupid) |
========================
lili |
x |
lili |
x |
lili |
taos |
haha |
taos |
taos |
haha |
repeat(name, groupid) |
=================================
lili |
x |
lili |
x |
lili |
taos |
haha |
taos |
taos |
haha |
taos> select repeat(nch1, groupid) from ts_4893.d0 order by ts limit 10
repeat(nch1, groupid) |
@ -355,9 +355,9 @@ taos> select repeat('你好', 2)
你好你好 |
taos> select repeat('abc', length('abc'))
repeat('abc', length('abc')) |
===============================
abcabcabc |
repeat('abc', length('abc')) |
=================================
abcabcabc |
taos> select repeat(concat('A', 'B', 'C'), 3)
repeat(concat('A', 'B', 'C'), 3) |

Can't render this file because it has a wrong number of fields in line 4.

View File

@ -31,8 +31,6 @@ from frame.eos import *
class TDTestCase(TBase):
index = eutil.cpuRand(20) + 1
bucketName = f"ci-bucket{index}"
updatecfgDict = {
"supportVnodes":"1000",
's3EndPoint': 'https://<account-id>.blob.core.windows.net',
@ -44,7 +42,6 @@ class TDTestCase(TBase):
's3MigrateEnabled': '1'
}
tdLog.info(f"assign bucketName is {bucketName}\n")
maxFileSize = (128 + 10) * 1014 * 1024 # add 10M buffer
def insertData(self):
@ -152,13 +149,13 @@ class TDTestCase(TBase):
if keepLocal is not None:
kw1 = f"s3_keeplocal {keepLocal}"
if chunkSize is not None:
kw2 = f"s3_chunksize {chunkSize}"
kw2 = f"s3_chunkpages {chunkSize}"
if compact is not None:
kw3 = f"s3_compact {compact}"
sql = f" create database db1 vgroups 1 duration 1h {kw1} {kw2} {kw3}"
tdSql.execute(sql, show=True)
#sql = f"select name,s3_keeplocal,s3_chunksize,s3_compact from information_schema.ins_databases where name='db1';"
#sql = f"select name,s3_keeplocal,s3_chunkpages,s3_compact from information_schema.ins_databases where name='db1';"
sql = f"select * from information_schema.ins_databases where name='db1';"
tdSql.query(sql)
# 29 30 31 -> chunksize keeplocal compact
@ -172,15 +169,32 @@ class TDTestCase(TBase):
sql = "drop database db1"
tdSql.execute(sql)
def checkDefault(self, keepLocal, chunkSize, compact):
sql = f" create database db1 vgroups 1"
tdSql.execute(sql, show=True)
#sql = f"select name,s3_keeplocal,s3_chunkpages,s3_compact from information_schema.ins_databases where name='db1';"
sql = f"select * from information_schema.ins_databases where name='db1';"
tdSql.query(sql)
# 29 30 31 -> chunksize keeplocal compact
if chunkSize is not None:
tdSql.checkData(0, 29, chunkSize)
if keepLocal is not None:
keepLocalm = keepLocal * 24 * 60
tdSql.checkData(0, 30, f"{keepLocalm}m")
if compact is not None:
tdSql.checkData(0, 31, compact)
sql = "drop database db1"
tdSql.execute(sql)
def checkExcept(self):
# errors
sqls = [
f"create database db2 s3_keeplocal -1",
f"create database db2 s3_keeplocal 0",
f"create database db2 s3_keeplocal 365001",
f"create database db2 s3_chunksize -1",
f"create database db2 s3_chunksize 0",
f"create database db2 s3_chunksize 900000000",
f"create database db2 s3_chunkpages -1",
f"create database db2 s3_chunkpages 0",
f"create database db2 s3_chunkpages 900000000",
f"create database db2 s3_compact -1",
f"create database db2 s3_compact 100",
f"create database db2 duration 1d s3_keeplocal 1d"
@ -226,16 +240,7 @@ class TDTestCase(TBase):
# except
self.checkExcept()
#
def preDb(self, vgroups):
cnt = int(time.time())%2 + 1
for i in range(cnt):
vg = eutil.cpuRand(9) + 1
sql = f"create database predb vgroups {vg}"
tdSql.execute(sql, show=True)
sql = "drop database predb"
tdSql.execute(sql, show=True)
self.checkDefault(365, 131072, 1)
# history
def insertHistory(self):
@ -287,9 +292,6 @@ class TDTestCase(TBase):
if eos.isArm64Cpu():
tdLog.success(f"{__file__} arm64 ignore executed")
else:
self.preDb(10)
# insert data
self.insertData()
@ -311,7 +313,6 @@ class TDTestCase(TBase):
# check insert correct again
self.checkInsertCorrect()
# check stream correct and drop stream
#self.checkStreamCorrect()
@ -321,7 +322,7 @@ class TDTestCase(TBase):
# insert history disorder data
self.insertHistory()
# checkBasic
# check db params
self.checkBasic()
#self.checkInsertCorrect()
@ -335,10 +336,8 @@ class TDTestCase(TBase):
# drop database and free s3 file
self.dropDb()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,67 @@
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"connection_pool_size": 8,
"num_of_records_per_req": 4000,
"prepared_rand": 500,
"thread_count": 4,
"create_table_thread_count": 1,
"confirm_parameter_prompt": "no",
"databases": [
{
"dbinfo": {
"name": "db",
"drop": "yes",
"vgroups": 3,
"replica": 3,
"duration":"10d",
"s3_keeplocal":"30d",
"s3_chunkpages":"131072",
"tsdb_pagesize":"1",
"s3_compact":"1",
"wal_retention_size":"1",
"wal_retention_period":"1",
"flush_each_batch":"no",
"keep": "3650d"
},
"super_tables": [
{
"name": "stb",
"child_table_exists": "no",
"childtable_count": 500,
"insert_rows": 200000,
"childtable_prefix": "d",
"insert_mode": "taosc",
"timestamp_step": 100,
"start_timestamp": 1600000000000,
"columns": [
{ "type": "bool", "name": "bc"},
{ "type": "float", "name": "fc" },
{ "type": "double", "name": "dc"},
{ "type": "tinyint", "name": "ti"},
{ "type": "smallint", "name": "si" },
{ "type": "int", "name": "ic" ,"max": 1,"min": 1},
{ "type": "bigint", "name": "bi" },
{ "type": "utinyint", "name": "uti"},
{ "type": "usmallint", "name": "usi"},
{ "type": "uint", "name": "ui" },
{ "type": "ubigint", "name": "ubi"},
{ "type": "binary", "name": "bin", "len": 50},
{ "type": "nchar", "name": "nch", "len": 100}
],
"tags": [
{"type": "tinyint", "name": "groupid","max": 10,"min": 1},
{"name": "location","type": "binary", "len": 16, "values":
["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View","Sunnyvale", "Santa Clara", "Cupertino"]
}
]
}
]
}
]
}

View File

@ -20,7 +20,7 @@
"replica": 1,
"duration":"10d",
"s3_keeplocal":"30d",
"s3_chunksize":"131072",
"s3_chunkpages":"131072",
"tsdb_pagesize":"1",
"s3_compact":"1",
"wal_retention_size":"1",

View File

@ -20,7 +20,7 @@
"replica": 1,
"duration":"10d",
"s3_keeplocal":"30d",
"s3_chunksize":"131072",
"s3_chunkpages":"131072",
"tsdb_pagesize":"1",
"s3_compact":"1",
"wal_retention_size":"1",

View File

@ -48,6 +48,7 @@ class TDSimClient:
"telemetryReporting": "0",
"tqDebugflag": "135",
"stDebugflag":"135",
"safetyCheckLevel":"2"
}
def getLogDir(self):
@ -149,7 +150,8 @@ class TDDnode:
"statusInterval": "1",
"enableQueryHb": "1",
"supportVnodes": "1024",
"telemetryReporting": "0"
"telemetryReporting": "0",
"safetyCheckLevel":"2"
}
def init(self, path, remoteIP = ""):

View File

@ -62,12 +62,30 @@ class TDTestCase:
tdSql.query("show dnode 1 variables like '____debugFlag'")
tdSql.checkRows(2)
tdSql.query("show dnode 1 variables like 's3MigrateEna%'")
tdSql.query("show dnode 1 variables like 's3MigrateEnab%'")
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 's3MigrateEnabled')
tdSql.checkData(0, 2, 0)
tdSql.query("show dnode 1 variables like 's3MigrateIntervalSec%'")
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 's3MigrateIntervalSec')
tdSql.checkData(0, 2, 3600)
tdSql.query("show dnode 1 variables like 's3PageCacheSize%'")
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 's3PageCacheSize')
tdSql.checkData(0, 2, 4096)
tdSql.query("show dnode 1 variables like 's3UploadDelaySec%'")
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 's3UploadDelaySec')
tdSql.checkData(0, 2, 60)
def threadTest(self, threadID):
print(f"Thread {threadID} starting...")
tdsqln = tdCom.newTdSql()