diff --git a/docs/zh/04-get-started/05-cloud.md b/docs/zh/04-get-started/05-cloud.md
index bd76add527..1bca09ee91 100644
--- a/docs/zh/04-get-started/05-cloud.md
+++ b/docs/zh/04-get-started/05-cloud.md
@@ -15,7 +15,7 @@ TDengine Cloud 大幅减轻了用户在部署、运维等方面的人力负担
要在 TDengine Cloud 注册新用户,请遵循以下简易步骤完成注册流程:
-1. 打开浏览器,访问 TDengine Cloud 的首页:https://cloud.taosdata.com,在右边的“注册”部分,填入自己的姓名以及企业邮箱地址,点击“获取验证码”按钮。
+1. 打开浏览器,访问 [TDengine Cloud](https://cloud.taosdata.com),在右边的“注册”部分,填入自己的姓名以及企业邮箱地址,点击“获取验证码”按钮。
2. 检查企业邮箱,找到主题为“你的 TDengine Cloud 注册账户验证码”的邮件。从邮件内容中复制 6 位验证码,并将其粘贴到注册页面上的“验证码”输入框中。接着,点击“注册 TDengine Cloud”按钮,进入客户信息补全页面。
@@ -32,4 +32,4 @@ TDengine Cloud 大幅减轻了用户在部署、运维等方面的人力负担
3. 第 3 步,创建实例。在此步骤中,你需要填写实例的区域、名称、是否选择高可用选项以及计费方案等必填信息。确认无误后,点击“创建”按钮。大约等待 1min,新的TDengine 实例便会创建完成。随后,你可以在控制台中对该实例进行各种操作,如查询数据、创建订阅、创建流等。
-TDengine Cloud 提供多种级别的计费方案,包括入门版、基础版、标准版、专业版和旗舰版,以满足不同客户的需求。如果你觉得现有计费方案无法满足自己的特定需求,请联系 TDengine Cloud 的客户支持团队,他们将为你量身定制计费方案。注册后,你将获得一定的免费额度,以便体验服务
\ No newline at end of file
+TDengine Cloud 提供多种级别的计费方案,包括入门版、基础版、标准版、专业版和旗舰版,以满足不同客户的需求。如果你觉得现有计费方案无法满足自己的特定需求,请联系 TDengine Cloud 的客户支持团队,他们将为你量身定制计费方案。注册后,你将获得一定的免费额度,以便体验服务
diff --git a/docs/zh/08-operation/03-deployment.md b/docs/zh/08-operation/03-deployment.md
index 83b2c91843..2e0c2a7989 100644
--- a/docs/zh/08-operation/03-deployment.md
+++ b/docs/zh/08-operation/03-deployment.md
@@ -206,11 +206,11 @@ http {
### 部署 taosX
-如果想使用 TDengine 的数据接入能力,需要部署 taosX 服务,关于它的详细说明和部署请参考[taosX 参考手册](../../reference/components/taosx)。
+如果想使用 TDengine 的数据接入能力,需要部署 taosX 服务,关于它的详细说明和部署请参考企业版参考手册。
### 部署 taosX-Agent
-有些数据源如 Pi, OPC 等,因为网络条件和数据源访问的限制,taosX 无法直接访问数据源,这种情况下需要部署一个代理服务 taosX-Agent,关于它的详细说明和部署请参考[taosX-Agent 参考手册](../../reference/components/taosx-agent)。
+有些数据源如 Pi, OPC 等,因为网络条件和数据源访问的限制,taosX 无法直接访问数据源,这种情况下需要部署一个代理服务 taosX-Agent,关于它的详细说明和部署请参考企业版参考手册。
### 部署 taos-Explorer
diff --git a/docs/zh/08-operation/12-multi.md b/docs/zh/08-operation/12-multi.md
index 8f11ee4326..a5608ad5fa 100644
--- a/docs/zh/08-operation/12-multi.md
+++ b/docs/zh/08-operation/12-multi.md
@@ -70,7 +70,7 @@ dataDir /mnt/data6 2 0
|参数名称 | 参数含义 |
|:-------------|:-----------------------------------------------|
-|s3EndPoint | 用户所在地域的 COS 服务域名,支持 http 和 https,bucket 的区域需要与 endpoint 的保持一致,否则无法访问。例如:http://cos.ap-beijing.myqcloud.com |
+|s3EndPoint | 用户所在地域的 COS 服务域名,支持 http 和 https,bucket 的区域需要与 endpoint 的保持一致,否则无法访问。 |
|s3AccessKey |冒号分隔的用户 SecretId:SecretKey。例如:AKIDsQmwsfKxTo2A6nGVXZN0UlofKn6JRRSJ:lIdoy99ygEacU7iHfogaN2Xq0yumSm1E |
|s3BucketName | 存储桶名称,减号后面是用户注册 COS 服务的 AppId。其中 AppId 是 COS 特有,AWS 和阿里云都没有,配置时需要作为 bucket name 的一部分,使用减号分隔。参数值均为字符串类型,但不需要引号。例如:test0711-1309024725 |
|s3UploadDelaySec | data 文件持续多长时间不再变动后上传至 s3,单位:秒。最小值:1;最大值:2592000 (30天),默认值 60 秒 |
diff --git a/docs/zh/08-operation/18-dual.md b/docs/zh/08-operation/18-dual.md
index 354e715602..c7871a8e1e 100644
--- a/docs/zh/08-operation/18-dual.md
+++ b/docs/zh/08-operation/18-dual.md
@@ -83,7 +83,7 @@ taosx replica start
```shell
taosx replica start -f td1:6030 -t td2:6030
```
-该示例命令会自动创建除 information_schema、performance_schema、log、audit 库之外的同步任务。可以使用 http://td2:6041 指定该 endpoint 使用 websocket 接口(默认是原生接口)。也可以指定数据库同步:taosx replica start -f td1:6030 -t td2:6030 db1 仅创建指定的数据库同步任务。
+该示例命令会自动创建除 information_schema、performance_schema、log、audit 库之外的同步任务。可以使用 `http://td2:6041` 指定该 endpoint 使用 websocket 接口(默认是原生接口)。也可以指定数据库同步:taosx replica start -f td1:6030 -t td2:6030 db1 仅创建指定的数据库同步任务。
2. 方法二
diff --git a/docs/zh/14-reference/03-taos-sql/14-stream.md b/docs/zh/14-reference/03-taos-sql/14-stream.md
index c0d14f0455..d995c2a09b 100644
--- a/docs/zh/14-reference/03-taos-sql/14-stream.md
+++ b/docs/zh/14-reference/03-taos-sql/14-stream.md
@@ -99,7 +99,7 @@ PARTITION 子句中,为 tbname 定义了一个别名 tname, 在PARTITION 子
## 流式计算读取历史数据
-正常情况下,流式计算不会处理创建前已经写入源表中的数据,若要处理已经写入的数据,可以在创建流时设置 fill_history 1 选项,这样创建的流式计算会自动处理创建前、创建中、创建后写入的数据。例如:
+正常情况下,流式计算不会处理创建前已经写入源表中的数据,若要处理已经写入的数据,可以在创建流时设置 fill_history 1 选项,这样创建的流式计算会自动处理创建前、创建中、创建后写入的数据。流计算处理历史数据的最大窗口数是2000万,超过限制会报错。例如:
```sql
create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 interval(10s)
diff --git a/packaging/tools/com.taosdata.taos-explorer.plist b/packaging/tools/com.taosdata.taos-explorer.plist
new file mode 100644
index 0000000000..2edb5552ad
--- /dev/null
+++ b/packaging/tools/com.taosdata.taos-explorer.plist
@@ -0,0 +1,33 @@
+
+
+
+
+ Label
+ com.tdengine.taos-explorer
+ ProgramArguments
+
+ /usr/local/bin/taos-explorer
+
+ ProcessType
+ Interactive
+ Disabled
+
+ RunAtLoad
+
+ LaunchOnlyOnce
+
+ SessionCreate
+
+ ExitTimeOut
+ 600
+ KeepAlive
+
+ SuccessfulExit
+
+ AfterInitialDemand
+
+
+ Program
+ /usr/local/bin/taos-explorer
+
+
\ No newline at end of file
diff --git a/packaging/tools/remove.sh b/packaging/tools/remove.sh
index 58a17e2a50..c3f459ca9c 100755
--- a/packaging/tools/remove.sh
+++ b/packaging/tools/remove.sh
@@ -206,10 +206,17 @@ function clean_log() {
}
function clean_service_on_launchctl() {
- ${csudouser}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taosd.plist > /dev/null 2>&1 || :
- ${csudo}rm /Library/LaunchDaemons/com.taosdata.taosd.plist > /dev/null 2>&1 || :
- ${csudouser}launchctl unload -w /Library/LaunchDaemons/com.taosdata.${clientName2}adapter.plist > /dev/null 2>&1 || :
- ${csudo}rm /Library/LaunchDaemons/com.taosdata.${clientName2}adapter.plist > /dev/null 2>&1 || :
+ ${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taosd.plist || :
+ ${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.${PREFIX}adapter.plist || :
+ ${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.${PREFIX}keeper.plist || :
+ ${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.${PREFIX}-explorer.plist || :
+
+ ${csudo}launchctl remove com.tdengine.taosd || :
+ ${csudo}launchctl remove com.tdengine.${PREFIX}adapter || :
+ ${csudo}launchctl remove com.tdengine.${PREFIX}keeper || :
+ ${csudo}launchctl remove com.tdengine.${PREFIX}-explorer || :
+
+ ${csudo}rm /Library/LaunchDaemons/com.taosdata.* > /dev/null 2>&1 || :
}
function remove_data_and_config() {
@@ -250,6 +257,12 @@ if [ -e ${install_main_dir}/uninstall_${PREFIX}x.sh ]; then
fi
fi
+
+if [ "$osType" = "Darwin" ]; then
+ clean_service_on_launchctl
+ ${csudo}rm -rf /Applications/TDengine.app
+fi
+
remove_bin
clean_header
# Remove lib file
@@ -282,10 +295,7 @@ elif echo $osinfo | grep -qwi "centos"; then
# echo "this is centos system"
${csudo}rpm -e --noscripts tdengine >/dev/null 2>&1 || :
fi
-if [ "$osType" = "Darwin" ]; then
- clean_service_on_launchctl
- ${csudo}rm -rf /Applications/TDengine.app
-fi
+
command -v systemctl >/dev/null 2>&1 && ${csudo}systemctl daemon-reload >/dev/null 2>&1 || true
echo
diff --git a/packaging/tools/tdengine.iss b/packaging/tools/tdengine.iss
index 8085c55e3e..c3eb6f9f68 100644
--- a/packaging/tools/tdengine.iss
+++ b/packaging/tools/tdengine.iss
@@ -71,8 +71,8 @@ Source: {#MyAppSourceDir}\taosdump.exe; DestDir: "{app}"; DestName: "{#CusPrompt
Filename: {sys}\sc.exe; Parameters: "create taosd start= DEMAND binPath= ""C:\\TDengine\\taosd.exe --win_service""" ; Flags: runhidden
Filename: {sys}\sc.exe; Parameters: "create taosadapter start= DEMAND binPath= ""C:\\TDengine\\taosadapter.exe""" ; Flags: runhidden
-Filename: "C:\Windows\System32\odbcconf.exe"; Parameters: "/S /F win_odbcinst.ini"; WorkingDir: "{app}\taos_odbc\x64"; Flags: runhidden; StatusMsg: "Configuring ODBC x64"
-Filename: "C:\Windows\SysWOW64\odbcconf.exe"; Parameters: "/S /F win_odbcinst.ini"; WorkingDir: "{app}\taos_odbc\x86"; Flags: runhidden; StatusMsg: "Configuring ODBC x86"
+Filename: "C:\Windows\System32\odbcconf.exe"; Parameters: "/S /F win_odbc_install.ini"; WorkingDir: "{app}\taos_odbc\x64"; Flags: runhidden; StatusMsg: "Configuring ODBC x64"
+Filename: "C:\Windows\SysWOW64\odbcconf.exe"; Parameters: "/S /F win_odbc_install.ini"; WorkingDir: "{app}\taos_odbc\x86"; Flags: runhidden; StatusMsg: "Configuring ODBC x86"
[UninstallRun]
RunOnceId: "stoptaosd"; Filename: {sys}\sc.exe; Parameters: "stop taosd" ; Flags: runhidden
diff --git a/source/common/src/systable.c b/source/common/src/systable.c
index be841d9682..eef38bf18e 100644
--- a/source/common/src/systable.c
+++ b/source/common/src/systable.c
@@ -398,6 +398,21 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = {
{.name = "finished", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
};
+
+static const SSysDbTableSchema anodesSchema[] = {
+ {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
+ {.name = "url", .bytes = TSDB_ANAL_ANODE_URL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
+ {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
+ {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
+ {.name = "update_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
+};
+
+static const SSysDbTableSchema anodesFullSchema[] = {
+ {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
+ {.name = "type", .bytes = TSDB_ANAL_ALGO_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
+ {.name = "algo", .bytes = TSDB_ANAL_ALGO_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
+};
+
static const SSysDbTableSchema tsmaSchema[] = {
{.name = "tsma_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
@@ -472,6 +487,8 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_ARBGROUPS, arbGroupsSchema, tListLen(arbGroupsSchema), true},
{TSDB_INS_TABLE_ENCRYPTIONS, encryptionsSchema, tListLen(encryptionsSchema), true},
{TSDB_INS_TABLE_TSMAS, tsmaSchema, tListLen(tsmaSchema), false},
+ {TSDB_INS_TABLE_ANODES, anodesSchema, tListLen(anodesSchema), true},
+ {TSDB_INS_TABLE_ANODES_FULL, anodesFullSchema, tListLen(anodesFullSchema), true},
};
static const SSysDbTableSchema connectionsSchema[] = {
diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c
index 986747fe58..63fcf900bf 100644
--- a/source/common/src/tmsg.c
+++ b/source/common/src/tmsg.c
@@ -40,6 +40,7 @@
#define TD_MSG_RANGE_CODE_
#include "tmsgdef.h"
+#include "tanal.h"
#include "tcol.h"
#include "tlog.h"
@@ -1453,6 +1454,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
}
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->ipWhiteVer));
+ TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->analVer));
TAOS_CHECK_EXIT(tSerializeSMonitorParas(&encoder, &pReq->clusterCfg.monitorParas));
tEndEncode(&encoder);
@@ -1576,6 +1578,10 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->ipWhiteVer));
}
+ if (!tDecodeIsEnd(&decoder)) {
+ TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->analVer));
+ }
+
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDeserializeSMonitorParas(&decoder, &pReq->clusterCfg.monitorParas));
}
@@ -1652,6 +1658,7 @@ int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->statusSeq));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ipWhiteVer));
+ TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->analVer));
tEndEncode(&encoder);
_exit:
@@ -1703,6 +1710,11 @@ int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ipWhiteVer));
}
+
+ if (!tDecodeIsEnd(&decoder)) {
+ TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->analVer));
+ }
+
tEndDecode(&decoder);
_exit:
tDecoderClear(&decoder);
@@ -2044,6 +2056,156 @@ _exit:
return code;
}
+int32_t tSerializeRetrieveAnalAlgoReq(void *buf, int32_t bufLen, SRetrieveAnalAlgoReq *pReq) {
+ SEncoder encoder = {0};
+ int32_t code = 0;
+ int32_t lino;
+ int32_t tlen;
+ tEncoderInit(&encoder, buf, bufLen);
+
+ TAOS_CHECK_EXIT(tStartEncode(&encoder));
+ TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->dnodeId));
+ TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->analVer));
+ tEndEncode(&encoder);
+
+_exit:
+ if (code) {
+ tlen = code;
+ } else {
+ tlen = encoder.pos;
+ }
+ tEncoderClear(&encoder);
+ return tlen;
+}
+
+int32_t tDeserializeRetrieveAnalAlgoReq(void *buf, int32_t bufLen, SRetrieveAnalAlgoReq *pReq) {
+ SDecoder decoder = {0};
+ int32_t code = 0;
+ int32_t lino;
+
+ tDecoderInit(&decoder, buf, bufLen);
+
+ TAOS_CHECK_EXIT(tStartDecode(&decoder));
+ TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->dnodeId));
+ TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->analVer));
+ tEndDecode(&decoder);
+
+_exit:
+ tDecoderClear(&decoder);
+ return code;
+}
+
+int32_t tSerializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalAlgoRsp *pRsp) {
+ SEncoder encoder = {0};
+ int32_t code = 0;
+ int32_t lino;
+ int32_t tlen;
+ tEncoderInit(&encoder, buf, bufLen);
+
+ int32_t numOfAlgos = 0;
+ void *pIter = taosHashIterate(pRsp->hash, NULL);
+ while (pIter != NULL) {
+ SAnalUrl *pUrl = pIter;
+ size_t nameLen = 0;
+ const char *name = taosHashGetKey(pIter, &nameLen);
+ if (nameLen > 0 && nameLen <= TSDB_ANAL_ALGO_KEY_LEN && pUrl->urlLen > 0) {
+ numOfAlgos++;
+ }
+ pIter = taosHashIterate(pRsp->hash, pIter);
+ }
+
+ TAOS_CHECK_EXIT(tStartEncode(&encoder));
+ TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver));
+ TAOS_CHECK_EXIT(tEncodeI32(&encoder, numOfAlgos));
+
+ pIter = taosHashIterate(pRsp->hash, NULL);
+ while (pIter != NULL) {
+ SAnalUrl *pUrl = pIter;
+ size_t nameLen = 0;
+ const char *name = taosHashGetKey(pIter, &nameLen);
+ if (nameLen > 0 && pUrl->urlLen > 0) {
+ TAOS_CHECK_EXIT(tEncodeI32(&encoder, nameLen));
+ TAOS_CHECK_EXIT(tEncodeBinary(&encoder, (const uint8_t *)name, nameLen));
+ TAOS_CHECK_EXIT(tEncodeI32(&encoder, pUrl->anode));
+ TAOS_CHECK_EXIT(tEncodeI32(&encoder, pUrl->type));
+ TAOS_CHECK_EXIT(tEncodeI32(&encoder, pUrl->urlLen));
+ TAOS_CHECK_EXIT(tEncodeBinary(&encoder, (const uint8_t *)pUrl->url, pUrl->urlLen));
+ }
+ pIter = taosHashIterate(pRsp->hash, pIter);
+ }
+
+ tEndEncode(&encoder);
+
+_exit:
+ if (code) {
+ tlen = code;
+ } else {
+ tlen = encoder.pos;
+ }
+ tEncoderClear(&encoder);
+ return tlen;
+}
+
+int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalAlgoRsp *pRsp) {
+ if (pRsp->hash == NULL) {
+ pRsp->hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
+ if (pRsp->hash == NULL) {
+ terrno = TSDB_CODE_OUT_OF_BUFFER;
+ return terrno;
+ }
+ }
+
+ SDecoder decoder = {0};
+ int32_t code = 0;
+ int32_t lino;
+ tDecoderInit(&decoder, buf, bufLen);
+
+ int32_t numOfAlgos = 0;
+ int32_t nameLen;
+ int32_t type;
+ char name[TSDB_ANAL_ALGO_KEY_LEN];
+ SAnalUrl url = {0};
+
+ TAOS_CHECK_EXIT(tStartDecode(&decoder));
+ TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pRsp->ver));
+ TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numOfAlgos));
+
+ for (int32_t f = 0; f < numOfAlgos; ++f) {
+ TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nameLen));
+ if (nameLen > 0 && nameLen <= TSDB_ANAL_ALGO_NAME_LEN) {
+ TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, name));
+ }
+
+ TAOS_CHECK_EXIT(tDecodeI32(&decoder, &url.anode));
+ TAOS_CHECK_EXIT(tDecodeI32(&decoder, &type));
+ url.type = (EAnalAlgoType)type;
+ TAOS_CHECK_EXIT(tDecodeI32(&decoder, &url.urlLen));
+ if (url.urlLen > 0) {
+ TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void **)&url.url, NULL) < 0);
+ }
+
+ TAOS_CHECK_EXIT(taosHashPut(pRsp->hash, name, nameLen, &url, sizeof(SAnalUrl)));
+ }
+
+ tEndDecode(&decoder);
+
+_exit:
+ tDecoderClear(&decoder);
+ return code;
+}
+
+void tFreeRetrieveAnalAlgoRsp(SRetrieveAnalAlgoRsp *pRsp) {
+ void *pIter = taosHashIterate(pRsp->hash, NULL);
+ while (pIter != NULL) {
+ SAnalUrl *pUrl = (SAnalUrl *)pIter;
+ taosMemoryFree(pUrl->url);
+ pIter = taosHashIterate(pRsp->hash, pIter);
+ }
+ taosHashCleanup(pRsp->hash);
+
+ pRsp->hash = NULL;
+}
+
void tFreeSCreateUserReq(SCreateUserReq *pReq) {
FREESQL();
taosMemoryFreeClear(pReq->pIpRanges);
@@ -2961,6 +3123,108 @@ _exit:
return code;
}
+int32_t tSerializeSMCreateAnodeReq(void *buf, int32_t bufLen, SMCreateAnodeReq *pReq) {
+ SEncoder encoder = {0};
+ int32_t code = 0;
+ int32_t lino;
+ int32_t tlen;
+ tEncoderInit(&encoder, buf, bufLen);
+
+ TAOS_CHECK_EXIT(tStartEncode(&encoder));
+ TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->urlLen));
+ if (pReq->urlLen > 0) {
+ TAOS_CHECK_EXIT(tEncodeBinary(&encoder, (const uint8_t *)pReq->url, pReq->urlLen));
+ }
+ ENCODESQL();
+ tEndEncode(&encoder);
+
+_exit:
+ if (code) {
+ tlen = code;
+ } else {
+ tlen = encoder.pos;
+ }
+ tEncoderClear(&encoder);
+ return tlen;
+}
+
+int32_t tDeserializeSMCreateAnodeReq(void *buf, int32_t bufLen, SMCreateAnodeReq *pReq) {
+ SDecoder decoder = {0};
+ int32_t code = 0;
+ int32_t lino;
+
+ tDecoderInit(&decoder, buf, bufLen);
+
+ TAOS_CHECK_EXIT(tStartDecode(&decoder));
+ TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->urlLen));
+ if (pReq->urlLen > 0) {
+ TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void **)&pReq->url, NULL));
+ }
+
+ DECODESQL();
+ tEndDecode(&decoder);
+
+_exit:
+ tDecoderClear(&decoder);
+ return code;
+}
+
+void tFreeSMCreateAnodeReq(SMCreateAnodeReq *pReq) {
+ taosMemoryFreeClear(pReq->url);
+ FREESQL();
+}
+
+int32_t tSerializeSMDropAnodeReq(void *buf, int32_t bufLen, SMDropAnodeReq *pReq) {
+ SEncoder encoder = {0};
+ int32_t code = 0;
+ int32_t lino;
+ int32_t tlen;
+ tEncoderInit(&encoder, buf, bufLen);
+
+ TAOS_CHECK_EXIT(tStartEncode(&encoder));
+ TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->anodeId));
+ ENCODESQL();
+ tEndEncode(&encoder);
+
+_exit:
+ if (code) {
+ tlen = code;
+ } else {
+ tlen = encoder.pos;
+ }
+ tEncoderClear(&encoder);
+ return tlen;
+}
+
+int32_t tDeserializeSMDropAnodeReq(void *buf, int32_t bufLen, SMDropAnodeReq *pReq) {
+ SDecoder decoder = {0};
+ int32_t code = 0;
+ int32_t lino;
+
+ tDecoderInit(&decoder, buf, bufLen);
+
+ TAOS_CHECK_EXIT(tStartDecode(&decoder));
+ TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->anodeId));
+ DECODESQL();
+ tEndDecode(&decoder);
+
+_exit:
+ tDecoderClear(&decoder);
+ return code;
+}
+
+void tFreeSMDropAnodeReq(SMDropAnodeReq *pReq) { FREESQL(); }
+
+int32_t tSerializeSMUpdateAnodeReq(void *buf, int32_t bufLen, SMUpdateAnodeReq *pReq) {
+ return tSerializeSMDropAnodeReq(buf, bufLen, pReq);
+}
+
+int32_t tDeserializeSMUpdateAnodeReq(void *buf, int32_t bufLen, SMUpdateAnodeReq *pReq) {
+ return tDeserializeSMDropAnodeReq(buf, bufLen, pReq);
+}
+
+void tFreeSMUpdateAnodeReq(SMUpdateAnodeReq *pReq) { tFreeSMDropAnodeReq(pReq); }
+
int32_t tSerializeSCreateDnodeReq(void *buf, int32_t bufLen, SCreateDnodeReq *pReq) {
SEncoder encoder = {0};
int32_t code = 0;
diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
index 419c669103..bc33fc43dc 100644
--- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
+++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
@@ -18,6 +18,7 @@
#include "dmInt.h"
#include "monitor.h"
#include "systable.h"
+#include "tanal.h"
#include "tchecksum.h"
extern SConfig *tsCfg;
@@ -39,6 +40,7 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
(void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
}
}
+
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
int32_t code = 0;
dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
@@ -84,6 +86,47 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
}
}
+
+static void dmMayShouldUpdateAnalFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
+ int32_t code = 0;
+ int64_t oldVer = taosAnalGetVersion();
+ if (oldVer == newVer) return;
+ dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);
+
+ SRetrieveAnalAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
+ int32_t contLen = tSerializeRetrieveAnalAlgoReq(NULL, 0, &req);
+ if (contLen < 0) {
+ dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
+ return;
+ }
+
+ void *pHead = rpcMallocCont(contLen);
+ contLen = tSerializeRetrieveAnalAlgoReq(pHead, contLen, &req);
+ if (contLen < 0) {
+ rpcFreeCont(pHead);
+ dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
+ return;
+ }
+
+ SRpcMsg rpcMsg = {
+ .pCont = pHead,
+ .contLen = contLen,
+ .msgType = TDMT_MND_RETRIEVE_ANAL_ALGO,
+ .info.ahandle = (void *)0x9527,
+ .info.refId = 0,
+ .info.noResp = 0,
+ .info.handle = 0,
+ };
+ SEpSet epset = {0};
+
+ (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
+
+ code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
+ if (code != 0) {
+ dError("failed to send retrieve analysis func ver request since %s", tstrerror(code));
+ }
+}
+
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
const STraceId *trace = &pRsp->info.traceId;
dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
@@ -111,6 +154,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
}
dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer);
+ dmMayShouldUpdateAnalFunc(pMgmt, statusRsp.analVer);
}
tFreeSStatusRsp(&statusRsp);
}
@@ -172,6 +216,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
pMgmt->statusSeq++;
req.statusSeq = pMgmt->statusSeq;
req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
+ req.analVer = taosAnalGetVersion();
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
if (contLen < 0) {
diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c
index b9f4ab54f4..e84d756e0a 100644
--- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c
+++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c
@@ -17,6 +17,7 @@
#include "dmMgmt.h"
#include "qworker.h"
#include "tversion.h"
+#include "tanal.h"
static inline void dmSendRsp(SRpcMsg *pMsg) {
if (rpcSendResponse(pMsg) != 0) {
@@ -105,6 +106,17 @@ static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
return false;
}
}
+
+static void dmUpdateAnalFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
+ SRetrieveAnalAlgoRsp rsp = {0};
+ if (tDeserializeRetrieveAnalAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0) {
+ taosAnalUpdate(rsp.ver, rsp.hash);
+ rsp.hash = NULL;
+ }
+ tFreeRetrieveAnalAlgoRsp(&rsp);
+ rpcFreeCont(pRpc->pCont);
+}
+
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans;
int32_t code = -1;
@@ -150,10 +162,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dmSetMnodeEpSet(&pDnode->data, pEpSet);
}
break;
- case TDMT_MND_RETRIEVE_IP_WHITE_RSP: {
+ case TDMT_MND_RETRIEVE_IP_WHITE_RSP:
dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
return;
- } break;
+ case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
+ dmUpdateAnalFunc(&pDnode->data, pTrans->serverRpc, pRpc);
+ return;
default:
break;
}
diff --git a/source/dnode/mnode/impl/src/mndStreamTransAct.c b/source/dnode/mnode/impl/src/mndStreamTransAct.c
index 3ecd192222..4e0bf97587 100644
--- a/source/dnode/mnode/impl/src/mndStreamTransAct.c
+++ b/source/dnode/mnode/impl/src/mndStreamTransAct.c
@@ -234,6 +234,7 @@ static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
mError("failed to extract epset during create update epset, code:%s", tstrerror(code));
+ taosMemoryFree(pBuf);
return code;
}
diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c
index 40bb99d6b5..e16c6efa47 100644
--- a/source/dnode/mnode/impl/src/mndTrans.c
+++ b/source/dnode/mnode/impl/src/mndTrans.c
@@ -1495,7 +1495,7 @@ static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArr
return code;
}
- mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->actionPos);
+ mInfo("trans:%d, execute %d actions serial, current action:%d", pTrans->id, numOfActions, pTrans->actionPos);
for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pActions, action);
@@ -1768,7 +1768,8 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool to
if (code == 0) {
pTrans->stage = TRN_STAGE_UNDO_ACTION;
- mInfo("trans:%d, stage from rollback to undoAction", pTrans->id);
+ pTrans->actionPos = 0;
+ mInfo("trans:%d, stage from rollback to undoAction, actionPos:%d", pTrans->id, pTrans->actionPos);
continueExec = true;
} else {
pTrans->failedTimes++;
diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c
new file mode 100644
index 0000000000..7267bbbe09
--- /dev/null
+++ b/source/libs/executor/src/anomalywindowoperator.c
@@ -0,0 +1,609 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "executorInt.h"
+#include "filter.h"
+#include "function.h"
+#include "functionMgt.h"
+#include "operator.h"
+#include "querytask.h"
+#include "tanal.h"
+#include "tcommon.h"
+#include "tcompare.h"
+#include "tdatablock.h"
+#include "tjson.h"
+#include "ttime.h"
+
+#ifdef USE_ANAL
+
+typedef struct {
+ SArray* blocks; // SSDataBlock*
+ SArray* windows; // STimeWindow
+ uint64_t groupId;
+ int64_t numOfRows;
+ int32_t curWinIndex;
+ STimeWindow curWin;
+ SResultRow* pResultRow;
+} SAnomalyWindowSupp;
+
+typedef struct {
+ SOptrBasicInfo binfo;
+ SAggSupporter aggSup;
+ SExprSupp scalarSup;
+ int32_t tsSlotId;
+ STimeWindowAggSupp twAggSup;
+ char algoName[TSDB_ANAL_ALGO_NAME_LEN];
+ char algoUrl[TSDB_ANAL_ALGO_URL_LEN];
+ char anomalyOpt[TSDB_ANAL_ALGO_OPTION_LEN];
+ SAnomalyWindowSupp anomalySup;
+ SWindowRowsSup anomalyWinRowSup;
+ SColumn anomalyCol;
+ SStateKeys anomalyKey;
+} SAnomalyWindowOperatorInfo;
+
+static void anomalyDestroyOperatorInfo(void* param);
+static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
+static void anomalyAggregateBlocks(SOperatorInfo* pOperator);
+static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pBlock);
+
+int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
+ SOperatorInfo** pOptrInfo) {
+ QRY_PARAM_CHECK(pOptrInfo);
+
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SAnomalyWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnomalyWindowOperatorInfo));
+ SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
+ SAnomalyWindowPhysiNode* pAnomalyNode = (SAnomalyWindowPhysiNode*)physiNode;
+ SColumnNode* pColNode = (SColumnNode*)(pAnomalyNode->pAnomalyKey);
+ if (pInfo == NULL || pOperator == NULL) {
+ code = terrno;
+ goto _error;
+ }
+
+ if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) {
+ qError("failed to get anomaly_window algorithm name from %s", pAnomalyNode->anomalyOpt);
+ code = TSDB_CODE_ANAL_ALGO_NOT_FOUND;
+ goto _error;
+ }
+ if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) {
+ qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName);
+ code = TSDB_CODE_ANAL_ALGO_NOT_LOAD;
+ goto _error;
+ }
+
+ pOperator->exprSupp.hasWindowOrGroup = true;
+ pInfo->tsSlotId = ((SColumnNode*)pAnomalyNode->window.pTspk)->slotId;
+ strncpy(pInfo->anomalyOpt, pAnomalyNode->anomalyOpt, sizeof(pInfo->anomalyOpt));
+
+ if (pAnomalyNode->window.pExprs != NULL) {
+ int32_t numOfScalarExpr = 0;
+ SExprInfo* pScalarExprInfo = NULL;
+ code = createExprInfo(pAnomalyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
+ QUERY_CHECK_CODE(code, lino, _error);
+ code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
+ QUERY_CHECK_CODE(code, lino, _error);
+ }
+
+ size_t keyBufSize = 0;
+ int32_t num = 0;
+ SExprInfo* pExprInfo = NULL;
+ code = createExprInfo(pAnomalyNode->window.pFuncs, NULL, &pExprInfo, &num);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ initResultSizeInfo(&pOperator->resultInfo, 4096);
+
+ code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
+ pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ SSDataBlock* pResBlock = createDataBlockFromDescNode(pAnomalyNode->window.node.pOutputDataBlockDesc);
+ QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
+ initBasicInfo(&pInfo->binfo, pResBlock);
+
+ code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ initResultRowInfo(&pInfo->binfo.resultRowInfo);
+ pInfo->binfo.inputTsOrder = pAnomalyNode->window.node.inputTsOrder;
+ pInfo->binfo.outputTsOrder = pAnomalyNode->window.node.outputTsOrder;
+
+ pInfo->anomalyCol = extractColumnFromColumnNode(pColNode);
+ pInfo->anomalyKey.type = pInfo->anomalyCol.type;
+ pInfo->anomalyKey.bytes = pInfo->anomalyCol.bytes;
+ pInfo->anomalyKey.pData = taosMemoryCalloc(1, pInfo->anomalyCol.bytes);
+ if (pInfo->anomalyKey.pData == NULL) {
+ goto _error;
+ }
+
+ int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize + pInfo->anomalyKey.bytes;
+ pInfo->anomalySup.pResultRow = taosMemoryCalloc(1, itemSize);
+ pInfo->anomalySup.blocks = taosArrayInit(16, sizeof(SSDataBlock*));
+ pInfo->anomalySup.windows = taosArrayInit(16, sizeof(STimeWindow));
+
+ if (pInfo->anomalySup.windows == NULL || pInfo->anomalySup.blocks == NULL || pInfo->anomalySup.pResultRow == NULL) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _error;
+ }
+
+ code = filterInitFromNode((SNode*)pAnomalyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ setOperatorInfo(pOperator, "AnomalyWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY, true, OP_NOT_OPENED,
+ pInfo, pTaskInfo);
+ pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, anomalyAggregateNext, NULL, anomalyDestroyOperatorInfo,
+ optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
+
+ code = appendDownstream(pOperator, &downstream, 1);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ *pOptrInfo = pOperator;
+
+ qDebug("anomaly_window operator is created, algo:%s url:%s opt:%s", pInfo->algoName, pInfo->algoUrl,
+ pInfo->anomalyOpt);
+ return TSDB_CODE_SUCCESS;
+
+_error:
+ if (pInfo != NULL) {
+ anomalyDestroyOperatorInfo(pInfo);
+ }
+
+ destroyOperatorAndDownstreams(pOperator, &downstream, 1);
+ pTaskInfo->code = code;
+ qError("failed to create anomaly_window operator, algo:%s code:0x%x", pInfo->algoName, code);
+ return code;
+}
+
+static int32_t anomalyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+ SOptrBasicInfo* pBInfo = &pInfo->binfo;
+ SAnomalyWindowSupp* pSupp = &pInfo->anomalySup;
+ SSDataBlock* pRes = pInfo->binfo.pRes;
+ int64_t st = taosGetTimestampUs();
+ int32_t numOfBlocks = taosArrayGetSize(pSupp->blocks);
+
+ blockDataCleanup(pRes);
+
+ while (1) {
+ SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
+ if (pBlock == NULL) {
+ break;
+ }
+
+ if (pSupp->groupId == 0 || pSupp->groupId == pBlock->info.id.groupId) {
+ pSupp->groupId = pBlock->info.id.groupId;
+ numOfBlocks++;
+ qDebug("group:%" PRId64 ", blocks:%d, cache block rows:%" PRId64, pSupp->groupId, numOfBlocks, pBlock->info.rows);
+ code = anomalyCacheBlock(pInfo, pBlock);
+ QUERY_CHECK_CODE(code, lino, _end);
+ } else {
+ qDebug("group:%" PRId64 ", read finish for new group coming, blocks:%d", pSupp->groupId, numOfBlocks);
+ anomalyAggregateBlocks(pOperator);
+ pSupp->groupId = pBlock->info.id.groupId;
+ numOfBlocks = 1;
+ qDebug("group:%" PRId64 ", new group, cache block rows:%" PRId64, pSupp->groupId, pBlock->info.rows);
+ code = anomalyCacheBlock(pInfo, pBlock);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
+
+ if (pRes->info.rows > 0) {
+ (*ppRes) = pRes;
+ qDebug("group:%" PRId64 ", return to upstream, blocks:%d", pRes->info.id.groupId, numOfBlocks);
+ return code;
+ }
+ }
+
+ if (numOfBlocks > 0) {
+ qDebug("group:%" PRId64 ", read finish, blocks:%d", pInfo->anomalySup.groupId, numOfBlocks);
+ anomalyAggregateBlocks(pOperator);
+ }
+
+ int64_t cost = taosGetTimestampUs() - st;
+ qDebug("all groups finished, cost:%" PRId64 "us", cost);
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ pTaskInfo->code = code;
+ T_LONG_JMP(pTaskInfo->env, code);
+ }
+ (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
+ return code;
+}
+
+static void anomalyDestroyOperatorInfo(void* param) {
+ SAnomalyWindowOperatorInfo* pInfo = (SAnomalyWindowOperatorInfo*)param;
+ if (pInfo == NULL) return;
+
+ qDebug("anomaly_window operator is destroyed, algo:%s", pInfo->algoName);
+
+ cleanupBasicInfo(&pInfo->binfo);
+ cleanupAggSup(&pInfo->aggSup);
+ cleanupExprSupp(&pInfo->scalarSup);
+ colDataDestroy(&pInfo->twAggSup.timeWindowData);
+
+ for (int32_t i = 0; i < taosArrayGetSize(pInfo->anomalySup.blocks); ++i) {
+ SSDataBlock* pBlock = taosArrayGetP(pInfo->anomalySup.blocks, i);
+ blockDataDestroy(pBlock);
+ }
+ taosArrayDestroy(pInfo->anomalySup.blocks);
+ taosArrayDestroy(pInfo->anomalySup.windows);
+ taosMemoryFreeClear(pInfo->anomalySup.pResultRow);
+ taosMemoryFreeClear(pInfo->anomalyKey.pData);
+
+ taosMemoryFreeClear(param);
+}
+
+static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* pSrc) {
+ SSDataBlock* pDst = NULL;
+ int32_t code = createOneDataBlock(pSrc, true, &pDst);
+
+ if (code != 0) return code;
+ if (pDst == NULL) return TSDB_CODE_OUT_OF_MEMORY;
+ if (taosArrayPush(pInfo->anomalySup.blocks, &pDst) == NULL) return TSDB_CODE_OUT_OF_MEMORY;
+
+ return 0;
+}
+
+static int32_t anomalyFindWindow(SAnomalyWindowSupp* pSupp, TSKEY key) {
+ for (int32_t i = pSupp->curWinIndex; i < taosArrayGetSize(pSupp->windows); ++i) {
+ STimeWindow* pWindow = taosArrayGet(pSupp->windows, i);
+ if (key >= pWindow->skey && key < pWindow->ekey) {
+ pSupp->curWin = *pWindow;
+ pSupp->curWinIndex = i;
+ return 0;
+ }
+ }
+ return -1;
+}
+
+static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows) {
+ int32_t code = 0;
+ int32_t rows = 0;
+ STimeWindow win = {0};
+
+ taosArrayClear(pWindows);
+
+ tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
+ if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
+ if (rows <= 0) return 0;
+
+ SJson* res = tjsonGetObjectItem(pJson, "res");
+ if (res == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
+
+ int32_t ressize = tjsonGetArraySize(res);
+ if (ressize != rows) return TSDB_CODE_INVALID_JSON_FORMAT;
+
+ for (int32_t i = 0; i < rows; ++i) {
+ SJson* row = tjsonGetArrayItem(res, i);
+ if (row == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
+
+ int32_t colsize = tjsonGetArraySize(row);
+ if (colsize != 2) return TSDB_CODE_INVALID_JSON_FORMAT;
+
+ SJson* start = tjsonGetArrayItem(row, 0);
+ SJson* end = tjsonGetArrayItem(row, 1);
+ if (start == NULL || end == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
+
+ tjsonGetObjectValueBigInt(start, &win.skey);
+ tjsonGetObjectValueBigInt(end, &win.ekey);
+
+ if (win.skey >= win.ekey) {
+ win.ekey = win.skey + 1;
+ }
+
+ if (taosArrayPush(pWindows, &win) == NULL) return TSDB_CODE_OUT_OF_BUFFER;
+ }
+
+ int32_t numOfWins = taosArrayGetSize(pWindows);
+ qDebug("anomaly window recevied, total:%d", numOfWins);
+ for (int32_t i = 0; i < numOfWins; ++i) {
+ STimeWindow* pWindow = taosArrayGet(pWindows, i);
+ qDebug("anomaly win:%d [%" PRId64 ", %" PRId64 ")", i, pWindow->skey, pWindow->ekey);
+ }
+
+ return 0;
+}
+
+static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) {
+ SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
+ SAnomalyWindowSupp* pSupp = &pInfo->anomalySup;
+ SJson* pJson = NULL;
+ SAnalBuf analBuf = {.bufType = ANAL_BUF_TYPE_JSON};
+ char dataBuf[64] = {0};
+ int32_t code = 0;
+
+ int64_t ts = 0;
+ // int64_t ts = taosGetTimestampMs();
+ snprintf(analBuf.fileName, sizeof(analBuf.fileName), "%s/tdengine-anomaly-%" PRId64 "-%" PRId64, tsTempDir, ts,
+ pSupp->groupId);
+ code = tsosAnalBufOpen(&analBuf, 2);
+ if (code != 0) goto _OVER;
+
+ const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
+ if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR;
+ if (pInfo->anomalyCol.precision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR;
+
+ code = taosAnalBufWriteOptStr(&analBuf, "algo", pInfo->algoName);
+ if (code != 0) goto _OVER;
+
+ code = taosAnalBufWriteOptStr(&analBuf, "prec", prec);
+ if (code != 0) goto _OVER;
+
+ code = taosAnalBufWriteColMeta(&analBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts");
+ if (code != 0) goto _OVER;
+
+ code = taosAnalBufWriteColMeta(&analBuf, 1, pInfo->anomalyCol.type, "val");
+ if (code != 0) goto _OVER;
+
+ code = taosAnalBufWriteDataBegin(&analBuf);
+ if (code != 0) goto _OVER;
+
+ int32_t numOfBlocks = (int32_t)taosArrayGetSize(pSupp->blocks);
+
+ // timestamp
+ code = taosAnalBufWriteColBegin(&analBuf, 0);
+ if (code != 0) goto _OVER;
+ for (int32_t i = 0; i < numOfBlocks; ++i) {
+ SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, i);
+ if (pBlock == NULL) break;
+ SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
+ if (pTsCol == NULL) break;
+ for (int32_t j = 0; j < pBlock->info.rows; ++j) {
+ code = taosAnalBufWriteColData(&analBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &((TSKEY*)pTsCol->pData)[j]);
+ if (code != 0) goto _OVER;
+ }
+ }
+ code = taosAnalBufWriteColEnd(&analBuf, 0);
+ if (code != 0) goto _OVER;
+
+ // data
+ code = taosAnalBufWriteColBegin(&analBuf, 1);
+ if (code != 0) goto _OVER;
+ for (int32_t i = 0; i < numOfBlocks; ++i) {
+ SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, i);
+ if (pBlock == NULL) break;
+ SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pInfo->anomalyCol.slotId);
+ if (pValCol == NULL) break;
+
+ for (int32_t j = 0; j < pBlock->info.rows; ++j) {
+ code = taosAnalBufWriteColData(&analBuf, 1, pValCol->info.type, colDataGetData(pValCol, j));
+ if (code != 0) goto _OVER;
+ if (code != 0) goto _OVER;
+ }
+ }
+ code = taosAnalBufWriteColEnd(&analBuf, 1);
+ if (code != 0) goto _OVER;
+
+ code = taosAnalBufWriteDataEnd(&analBuf);
+ if (code != 0) goto _OVER;
+
+ code = taosAnalBufWriteOptStr(&analBuf, "option", pInfo->anomalyOpt);
+ if (code != 0) goto _OVER;
+
+ code = taosAnalBufClose(&analBuf);
+ if (code != 0) goto _OVER;
+
+ pJson = taosAnalSendReqRetJson(pInfo->algoUrl, ANAL_HTTP_TYPE_POST, &analBuf);
+ if (pJson == NULL) {
+ code = terrno;
+ goto _OVER;
+ }
+
+ code = anomalyParseJson(pJson, pSupp->windows);
+ if (code != 0) goto _OVER;
+
+_OVER:
+ if (code != 0) {
+ qError("failed to analysis window since %s", tstrerror(code));
+ }
+ taosAnalBufDestroy(&analBuf);
+ if (pJson != NULL) tjsonDelete(pJson);
+ return code;
+}
+
+static void anomalyAggregateRows(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
+ SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+ SExprSupp* pExprSup = &pOperator->exprSupp;
+ SAnomalyWindowSupp* pSupp = &pInfo->anomalySup;
+ SWindowRowsSup* pRowSup = &pInfo->anomalyWinRowSup;
+ SResultRow* pResRow = pSupp->pResultRow;
+ int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
+
+ if (setResultRowInitCtx(pResRow, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset) == 0) {
+ updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pSupp->curWin, 0);
+ applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
+ pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
+ }
+}
+
+static void anomalyBuildResult(SOperatorInfo* pOperator) {
+ SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+ SExprSupp* pExprSup = &pOperator->exprSupp;
+ SSDataBlock* pRes = pInfo->binfo.pRes;
+ SResultRow* pResRow = pInfo->anomalySup.pResultRow;
+
+ doUpdateNumOfRows(pExprSup->pCtx, pResRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
+ copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pResRow, pExprSup->pCtx, pRes,
+ pExprSup->rowEntryInfoOffset, pTaskInfo);
+ pRes->info.rows += pResRow->numOfRows;
+ clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
+}
+
+static void anomalyAggregateBlocks(SOperatorInfo* pOperator) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SAnomalyWindowOperatorInfo* pInfo = pOperator->info;
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+ SExprSupp* pExprSup = &pOperator->exprSupp;
+ SSDataBlock* pRes = pInfo->binfo.pRes;
+ SAnomalyWindowSupp* pSupp = &pInfo->anomalySup;
+ SWindowRowsSup* pRowSup = &pInfo->anomalyWinRowSup;
+ SResultRow* pResRow = pSupp->pResultRow;
+ int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
+ int32_t rowsInWin = 0;
+ int32_t rowsInBlock = 0;
+ const int64_t gid = pSupp->groupId;
+ const int32_t order = pInfo->binfo.inputTsOrder;
+
+ int32_t numOfBlocks = (int32_t)taosArrayGetSize(pSupp->blocks);
+ if (numOfBlocks == 0) goto _OVER;
+
+ qDebug("group:%" PRId64 ", aggregate blocks, blocks:%d", pSupp->groupId, numOfBlocks);
+ pRes->info.id.groupId = pSupp->groupId;
+
+ code = anomalyAnalysisWindow(pOperator);
+ QUERY_CHECK_CODE(code, lino, _OVER);
+
+ int32_t numOfWins = taosArrayGetSize(pSupp->windows);
+ qDebug("group:%" PRId64 ", wins:%d, rows:%" PRId64, pSupp->groupId, numOfWins, pSupp->numOfRows);
+ for (int32_t w = 0; w < numOfWins; ++w) {
+ STimeWindow* pWindow = taosArrayGet(pSupp->windows, w);
+ if (w == 0) {
+ pSupp->curWin = *pWindow;
+ pRowSup->win.skey = pSupp->curWin.skey;
+ }
+ qDebug("group:%" PRId64 ", win:%d [%" PRId64 ", %" PRId64 ")", pSupp->groupId, w, pWindow->skey, pWindow->ekey);
+ }
+
+ if (numOfWins <= 0) goto _OVER;
+ if (numOfWins > pRes->info.capacity) {
+ code = blockDataEnsureCapacity(pRes, numOfWins);
+ QUERY_CHECK_CODE(code, lino, _OVER);
+ }
+
+ for (int32_t b = 0; b < numOfBlocks; ++b) {
+ SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, b);
+ if (pBlock == NULL) break;
+
+ pRes->info.scanFlag = pBlock->info.scanFlag;
+ code = setInputDataBlock(pExprSup, pBlock, order, MAIN_SCAN, true);
+ if (code != 0) break;
+
+ code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
+ if (code != 0) break;
+
+ // there is an scalar expression that needs to be calculated right before apply the group aggregation.
+ if (pInfo->scalarSup.pExprInfo != NULL) {
+ code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
+ pInfo->scalarSup.numOfExprs, NULL);
+ if (code != 0) break;
+ }
+
+ SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pInfo->anomalyCol.slotId);
+ if (pValCol == NULL) break;
+ SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
+ if (pTsCol == NULL) break;
+ TSKEY* tsList = (TSKEY*)pTsCol->pData;
+ bool lastBlock = (b == numOfBlocks - 1);
+
+ qTrace("group:%" PRId64 ", block:%d win:%d, riwin:%d riblock:%d, rows:%" PRId64, pSupp->groupId, b,
+ pSupp->curWinIndex, rowsInWin, rowsInBlock, pBlock->info.rows);
+
+ for (int32_t r = 0; r < pBlock->info.rows; ++r) {
+ TSKEY key = tsList[r];
+ bool keyInWin = (key >= pSupp->curWin.skey && key < pSupp->curWin.ekey);
+ bool lastRow = (r == pBlock->info.rows - 1);
+
+ if (keyInWin) {
+ if (r < 5) {
+ qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d", pSupp->groupId, b,
+ pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
+ }
+ if (rowsInBlock == 0) {
+ doKeepNewWindowStartInfo(pRowSup, tsList, r, gid);
+ }
+ doKeepTuple(pRowSup, tsList[r], gid);
+ rowsInBlock++;
+ rowsInWin++;
+ } else {
+ if (rowsInBlock > 0) {
+ qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, agg", pSupp->groupId,
+ b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
+ anomalyAggregateRows(pOperator, pBlock);
+ rowsInBlock = 0;
+ }
+ if (rowsInWin > 0) {
+ qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, build result",
+ pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
+ anomalyBuildResult(pOperator);
+ rowsInWin = 0;
+ }
+ if (anomalyFindWindow(pSupp, tsList[r]) == 0) {
+ qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, new window detect",
+ pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
+ doKeepNewWindowStartInfo(pRowSup, tsList, r, gid);
+ doKeepTuple(pRowSup, tsList[r], gid);
+ rowsInBlock = 1;
+ rowsInWin = 1;
+ } else {
+ qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, window not found",
+ pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
+ rowsInBlock = 0;
+ rowsInWin = 0;
+ }
+ }
+
+ if (lastRow && rowsInBlock > 0) {
+ qTrace("group:%" PRId64 ", block:%d win:%d, row:%d ts:%" PRId64 ", riwin:%d riblock:%d, agg since lastrow",
+ pSupp->groupId, b, pSupp->curWinIndex, r, key, rowsInWin, rowsInBlock);
+ anomalyAggregateRows(pOperator, pBlock);
+ rowsInBlock = 0;
+ }
+ }
+
+ if (lastBlock && rowsInWin > 0) {
+ qTrace("group:%" PRId64 ", block:%d win:%d, riwin:%d riblock:%d, build result since lastblock", pSupp->groupId, b,
+ pSupp->curWinIndex, rowsInWin, rowsInBlock);
+ anomalyBuildResult(pOperator);
+ rowsInWin = 0;
+ }
+ }
+
+ code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
+ QUERY_CHECK_CODE(code, lino, _OVER);
+
+_OVER:
+ for (int32_t i = 0; i < numOfBlocks; ++i) {
+ SSDataBlock* pBlock = taosArrayGetP(pSupp->blocks, i);
+ qDebug("%s, clear block, pBlock:%p pBlock->pDataBlock:%p", __func__, pBlock, pBlock->pDataBlock);
+ blockDataDestroy(pBlock);
+ }
+
+ taosArrayClear(pSupp->blocks);
+ taosArrayClear(pSupp->windows);
+ pSupp->numOfRows = 0;
+ pSupp->curWin.ekey = 0;
+ pSupp->curWin.skey = 0;
+ pSupp->curWinIndex = 0;
+}
+
+#else
+
+int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
+ SOperatorInfo** pOptrInfo) {
+ return TSDB_CODE_OPS_NOT_SUPPORT;
+}
+void destroyForecastInfo(void* param) {}
+
+#endif
\ No newline at end of file
diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c
index 05654f65c1..92096b11e6 100644
--- a/source/libs/executor/src/executil.c
+++ b/source/libs/executor/src/executil.c
@@ -1794,9 +1794,13 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
- SDataType* pType = &pFuncNode->node.resType;
- pExp->base.resSchema =
- createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
+ SDataType* pType = &pFuncNode->node.resType;
+ const char* pName = pFuncNode->node.aliasName;
+ if (pFuncNode->funcType == FUNCTION_TYPE_FORECAST_LOW || pFuncNode->funcType == FUNCTION_TYPE_FORECAST_HIGH ||
+ pFuncNode->funcType == FUNCTION_TYPE_FORECAST_ROWTS) {
+ pName = pFuncNode->functionName;
+ }
+ pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pName);
tExprNode* pExprNode = pExp->pExpr;
diff --git a/source/libs/executor/src/forecastoperator.c b/source/libs/executor/src/forecastoperator.c
new file mode 100644
index 0000000000..599678106c
--- /dev/null
+++ b/source/libs/executor/src/forecastoperator.c
@@ -0,0 +1,663 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#include "executorInt.h"
+#include "filter.h"
+#include "function.h"
+#include "functionMgt.h"
+#include "operator.h"
+#include "querytask.h"
+#include "storageapi.h"
+#include "tanal.h"
+#include "tcommon.h"
+#include "tcompare.h"
+#include "tdatablock.h"
+#include "tfill.h"
+#include "ttime.h"
+
+#ifdef USE_ANAL
+
+typedef struct {
+ char algoName[TSDB_ANAL_ALGO_NAME_LEN];
+ char algoUrl[TSDB_ANAL_ALGO_URL_LEN];
+ char algoOpt[TSDB_ANAL_ALGO_OPTION_LEN];
+ int64_t maxTs;
+ int64_t minTs;
+ int64_t numOfRows;
+ uint64_t groupId;
+ int32_t numOfBlocks;
+ int32_t optRows;
+ int16_t resTsSlot;
+ int16_t resValSlot;
+ int16_t resLowSlot;
+ int16_t resHighSlot;
+ int16_t inputTsSlot;
+ int16_t inputValSlot;
+ int8_t inputValType;
+ int8_t inputPrecision;
+ SAnalBuf analBuf;
+} SForecastSupp;
+
+typedef struct SForecastOperatorInfo {
+ SSDataBlock* pRes;
+ SExprSupp scalarSup; // scalar calculation
+ SForecastSupp forecastSupp;
+} SForecastOperatorInfo;
+
+static void destroyForecastInfo(void* param);
+
+static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int32_t newRowsNum) {
+ if (pBlock->info.rows < pBlock->info.capacity) {
+ return TSDB_CODE_SUCCESS;
+ }
+
+ int32_t code = blockDataEnsureCapacity(pBlock, newRowsNum);
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
+ return code;
+ }
+
+ return TSDB_CODE_SUCCESS;
+}
+
+static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SAnalBuf* pBuf = &pSupp->analBuf;
+
+ qDebug("block:%d, %p rows:%" PRId64, pSupp->numOfBlocks, pBlock, pBlock->info.rows);
+ pSupp->numOfBlocks++;
+
+ for (int32_t j = 0; j < pBlock->info.rows; ++j) {
+ SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputValSlot);
+ SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputTsSlot);
+ if (pTsCol == NULL || pValCol == NULL) break;
+
+ int64_t ts = ((TSKEY*)pTsCol->pData)[j];
+ char* val = colDataGetData(pValCol, j);
+ int16_t valType = pValCol->info.type;
+
+ pSupp->minTs = MIN(pSupp->minTs, ts);
+ pSupp->maxTs = MAX(pSupp->maxTs, ts);
+ pSupp->numOfRows++;
+
+ code = taosAnalBufWriteColData(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, &ts);
+ if (TSDB_CODE_SUCCESS != code) return code;
+
+ code = taosAnalBufWriteColData(pBuf, 1, valType, val);
+ if (TSDB_CODE_SUCCESS != code) return code;
+ }
+
+ return 0;
+}
+
+static int32_t forecastCloseBuf(SForecastSupp* pSupp) {
+ SAnalBuf* pBuf = &pSupp->analBuf;
+ int32_t code = 0;
+
+ for (int32_t i = 0; i < 2; ++i) {
+ code = taosAnalBufWriteColEnd(pBuf, i);
+ if (code != 0) return code;
+ }
+
+ code = taosAnalBufWriteDataEnd(pBuf);
+ if (code != 0) return code;
+
+ int32_t len = strlen(pSupp->algoOpt);
+ int64_t every = (pSupp->maxTs - pSupp->minTs) / (pSupp->numOfRows + 1);
+ int64_t start = pSupp->maxTs + every;
+ bool hasStart = taosAnalGetOptStr(pSupp->algoOpt, "start", NULL, 0);
+ if (!hasStart) {
+ qDebug("forecast start not found from %s, use %" PRId64, pSupp->algoOpt, start);
+ code = taosAnalBufWriteOptInt(pBuf, "start", start);
+ if (code != 0) return code;
+ }
+
+ bool hasEvery = taosAnalGetOptStr(pSupp->algoOpt, "every", NULL, 0);
+ if (!hasEvery) {
+ qDebug("forecast every not found from %s, use %" PRId64, pSupp->algoOpt, every);
+ code = taosAnalBufWriteOptInt(pBuf, "every", every);
+ if (code != 0) return code;
+ }
+
+ code = taosAnalBufWriteOptStr(pBuf, "option", pSupp->algoOpt);
+ if (code != 0) return code;
+
+ code = taosAnalBufClose(pBuf);
+ return code;
+}
+
+static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock) {
+ SAnalBuf* pBuf = &pSupp->analBuf;
+ int32_t resCurRow = pBlock->info.rows;
+ int8_t tmpI8;
+ int16_t tmpI16;
+ int32_t tmpI32;
+ int64_t tmpI64;
+ float tmpFloat;
+ double tmpDouble;
+ int32_t code = 0;
+
+ SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot);
+ if (NULL == pResValCol) return TSDB_CODE_OUT_OF_RANGE;
+
+ SColumnInfoData* pResTsCol = (pSupp->resTsSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL);
+ SColumnInfoData* pResLowCol = (pSupp->resLowSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL);
+ SColumnInfoData* pResHighCol =
+ (pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL);
+
+ SJson* pJson = taosAnalSendReqRetJson(pSupp->algoUrl, ANAL_HTTP_TYPE_POST, pBuf);
+ if (pJson == NULL) return terrno;
+
+ int32_t rows = 0;
+ tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
+ if (code < 0) goto _OVER;
+ if (rows <= 0) goto _OVER;
+
+ SJson* res = tjsonGetObjectItem(pJson, "res");
+ if (res == NULL) goto _OVER;
+ int32_t ressize = tjsonGetArraySize(res);
+ bool returnConf = (pSupp->resHighSlot != -1 || pSupp->resLowSlot != -1);
+ if (returnConf) {
+ if (ressize != 4) goto _OVER;
+ } else if (ressize != 2) {
+ goto _OVER;
+ }
+
+ if (pResTsCol != NULL) {
+ resCurRow = pBlock->info.rows;
+ SJson* tsJsonArray = tjsonGetArrayItem(res, 0);
+ if (tsJsonArray == NULL) goto _OVER;
+ int32_t tsSize = tjsonGetArraySize(tsJsonArray);
+ if (tsSize != rows) goto _OVER;
+ for (int32_t i = 0; i < tsSize; ++i) {
+ SJson* tsJson = tjsonGetArrayItem(tsJsonArray, i);
+ tjsonGetObjectValueBigInt(tsJson, &tmpI64);
+ colDataSetInt64(pResTsCol, resCurRow, &tmpI64);
+ resCurRow++;
+ }
+ }
+
+ if (pResLowCol != NULL) {
+ resCurRow = pBlock->info.rows;
+ SJson* lowJsonArray = tjsonGetArrayItem(res, 2);
+ if (lowJsonArray == NULL) goto _OVER;
+ int32_t lowSize = tjsonGetArraySize(lowJsonArray);
+ if (lowSize != rows) goto _OVER;
+ for (int32_t i = 0; i < lowSize; ++i) {
+ SJson* lowJson = tjsonGetArrayItem(lowJsonArray, i);
+ tjsonGetObjectValueDouble(lowJson, &tmpDouble);
+ tmpFloat = (float)tmpDouble;
+ colDataSetFloat(pResLowCol, resCurRow, &tmpFloat);
+ resCurRow++;
+ }
+ }
+
+ if (pResHighCol != NULL) {
+ resCurRow = pBlock->info.rows;
+ SJson* highJsonArray = tjsonGetArrayItem(res, 3);
+ if (highJsonArray == NULL) goto _OVER;
+ int32_t highSize = tjsonGetArraySize(highJsonArray);
+ if (highSize != rows) goto _OVER;
+ for (int32_t i = 0; i < highSize; ++i) {
+ SJson* highJson = tjsonGetArrayItem(highJsonArray, i);
+ tjsonGetObjectValueDouble(highJson, &tmpDouble);
+ tmpFloat = (float)tmpDouble;
+ colDataSetFloat(pResHighCol, resCurRow, &tmpFloat);
+ resCurRow++;
+ }
+ }
+
+ resCurRow = pBlock->info.rows;
+ SJson* valJsonArray = tjsonGetArrayItem(res, 1);
+ if (valJsonArray == NULL) goto _OVER;
+ int32_t valSize = tjsonGetArraySize(valJsonArray);
+ if (valSize != rows) goto _OVER;
+ for (int32_t i = 0; i < valSize; ++i) {
+ SJson* valJson = tjsonGetArrayItem(valJsonArray, i);
+ tjsonGetObjectValueDouble(valJson, &tmpDouble);
+
+ switch (pSupp->inputValType) {
+ case TSDB_DATA_TYPE_BOOL:
+ case TSDB_DATA_TYPE_UTINYINT:
+ case TSDB_DATA_TYPE_TINYINT: {
+ tmpI8 = (int8_t)tmpDouble;
+ colDataSetInt8(pResValCol, resCurRow, &tmpI8);
+ break;
+ }
+ case TSDB_DATA_TYPE_USMALLINT:
+ case TSDB_DATA_TYPE_SMALLINT: {
+ tmpI16 = (int16_t)tmpDouble;
+ colDataSetInt16(pResValCol, resCurRow, &tmpI16);
+ break;
+ }
+ case TSDB_DATA_TYPE_INT:
+ case TSDB_DATA_TYPE_UINT: {
+ tmpI32 = (int32_t)tmpDouble;
+ colDataSetInt32(pResValCol, resCurRow, &tmpI32);
+ break;
+ }
+ case TSDB_DATA_TYPE_TIMESTAMP:
+ case TSDB_DATA_TYPE_UBIGINT:
+ case TSDB_DATA_TYPE_BIGINT: {
+ tmpI64 = (int64_t)tmpDouble;
+ colDataSetInt64(pResValCol, resCurRow, &tmpI64);
+ break;
+ }
+ case TSDB_DATA_TYPE_FLOAT: {
+ tmpFloat = (float)tmpDouble;
+ colDataSetFloat(pResValCol, resCurRow, &tmpFloat);
+ break;
+ }
+ case TSDB_DATA_TYPE_DOUBLE: {
+ colDataSetDouble(pResValCol, resCurRow, &tmpDouble);
+ break;
+ }
+ default:
+ code = TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
+ goto _OVER;
+ }
+ resCurRow++;
+ }
+
+ // for (int32_t i = rows; i < pSupp->optRows; ++i) {
+ // colDataSetNNULL(pResValCol, rows, (pSupp->optRows - rows));
+ // if (pResTsCol != NULL) {
+ // colDataSetNNULL(pResTsCol, rows, (pSupp->optRows - rows));
+ // }
+ // if (pResLowCol != NULL) {
+ // colDataSetNNULL(pResLowCol, rows, (pSupp->optRows - rows));
+ // }
+ // if (pResHighCol != NULL) {
+ // colDataSetNNULL(pResHighCol, rows, (pSupp->optRows - rows));
+ // }
+ // }
+
+ // if (rows == pSupp->optRows) {
+ // pResValCol->hasNull = false;
+ // }
+
+ pBlock->info.rows += rows;
+
+ if (pJson != NULL) tjsonDelete(pJson);
+ return 0;
+
+_OVER:
+ if (pJson != NULL) tjsonDelete(pJson);
+ if (code == 0) {
+ code = TSDB_CODE_INVALID_JSON_FORMAT;
+ }
+ qError("failed to perform forecast finalize since %s", tstrerror(code));
+ return TSDB_CODE_INVALID_JSON_FORMAT;
+}
+
+static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SAnalBuf* pBuf = &pSupp->analBuf;
+
+ code = forecastCloseBuf(pSupp);
+ QUERY_CHECK_CODE(code, lino, _end);
+
+ code = forecastEnsureBlockCapacity(pResBlock, 1);
+ QUERY_CHECK_CODE(code, lino, _end);
+
+ code = forecastAnalysis(pSupp, pResBlock);
+ QUERY_CHECK_CODE(code, lino, _end);
+
+ uInfo("block:%d, forecast finalize", pSupp->numOfBlocks);
+
+_end:
+ pSupp->numOfBlocks = 0;
+ taosAnalBufDestroy(&pSupp->analBuf);
+ return code;
+}
+
+static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+ SForecastOperatorInfo* pInfo = pOperator->info;
+ SSDataBlock* pResBlock = pInfo->pRes;
+ SForecastSupp* pSupp = &pInfo->forecastSupp;
+ SAnalBuf* pBuf = &pSupp->analBuf;
+ int64_t st = taosGetTimestampUs();
+ int32_t numOfBlocks = pSupp->numOfBlocks;
+
+ blockDataCleanup(pResBlock);
+
+ while (1) {
+ SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
+ if (pBlock == NULL) {
+ break;
+ }
+
+ if (pSupp->groupId == 0 || pSupp->groupId == pBlock->info.id.groupId) {
+ pSupp->groupId = pBlock->info.id.groupId;
+ numOfBlocks++;
+ qDebug("group:%" PRId64 ", blocks:%d, cache block rows:%" PRId64, pSupp->groupId, numOfBlocks, pBlock->info.rows);
+ code = forecastCacheBlock(pSupp, pBlock);
+ QUERY_CHECK_CODE(code, lino, _end);
+ } else {
+ qDebug("group:%" PRId64 ", read finish for new group coming, blocks:%d", pSupp->groupId, numOfBlocks);
+ forecastAggregateBlocks(pSupp, pResBlock);
+ pSupp->groupId = pBlock->info.id.groupId;
+ numOfBlocks = 1;
+ qDebug("group:%" PRId64 ", new group, cache block rows:%" PRId64, pSupp->groupId, pBlock->info.rows);
+ code = forecastCacheBlock(pSupp, pBlock);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
+
+ if (pResBlock->info.rows > 0) {
+ (*ppRes) = pResBlock;
+ qDebug("group:%" PRId64 ", return to upstream, blocks:%d", pResBlock->info.id.groupId, numOfBlocks);
+ return code;
+ }
+ }
+
+ if (numOfBlocks > 0) {
+ qDebug("group:%" PRId64 ", read finish, blocks:%d", pSupp->groupId, numOfBlocks);
+ forecastAggregateBlocks(pSupp, pResBlock);
+ }
+
+ int64_t cost = taosGetTimestampUs() - st;
+ qDebug("all groups finished, cost:%" PRId64 "us", cost);
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ pTaskInfo->code = code;
+ T_LONG_JMP(pTaskInfo->env, code);
+ }
+ (*ppRes) = (pResBlock->info.rows == 0) ? NULL : pResBlock;
+ return code;
+}
+
+static int32_t forecastParseOutput(SForecastSupp* pSupp, SExprSupp* pExprSup) {
+ pSupp->resLowSlot = -1;
+ pSupp->resHighSlot = -1;
+ pSupp->resTsSlot = -1;
+ pSupp->resValSlot = -1;
+
+ for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
+ SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
+ int32_t dstSlot = pExprInfo->base.resSchema.slotId;
+ if (pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_FORECAST) {
+ pSupp->resValSlot = dstSlot;
+ } else if (pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_FORECAST_ROWTS) {
+ pSupp->resTsSlot = dstSlot;
+ } else if (pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_FORECAST_LOW) {
+ pSupp->resLowSlot = dstSlot;
+ } else if (pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_FORECAST_HIGH) {
+ pSupp->resHighSlot = dstSlot;
+ } else {
+ }
+ }
+
+ return 0;
+}
+
+static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs) {
+ SNode* pNode = NULL;
+
+ pSupp->inputTsSlot = -1;
+ pSupp->inputValSlot = -1;
+ pSupp->inputValType = -1;
+ pSupp->inputPrecision = -1;
+
+ FOREACH(pNode, pFuncs) {
+ if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
+ SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
+ int32_t numOfParam = LIST_LENGTH(pFunc->pParameterList);
+
+ if (pFunc->funcType == FUNCTION_TYPE_FORECAST) {
+ if (numOfParam == 3) {
+ SNode* p1 = nodesListGetNode(pFunc->pParameterList, 0);
+ SNode* p2 = nodesListGetNode(pFunc->pParameterList, 1);
+ SNode* p3 = nodesListGetNode(pFunc->pParameterList, 2);
+ if (p1 == NULL || p2 == NULL || p3 == NULL) return TSDB_CODE_PLAN_INTERNAL_ERROR;
+ if (p1->type != QUERY_NODE_COLUMN) return TSDB_CODE_PLAN_INTERNAL_ERROR;
+ if (p2->type != QUERY_NODE_VALUE) return TSDB_CODE_PLAN_INTERNAL_ERROR;
+ if (p3->type != QUERY_NODE_COLUMN) return TSDB_CODE_PLAN_INTERNAL_ERROR;
+ SColumnNode* pValNode = (SColumnNode*)p1;
+ SValueNode* pOptNode = (SValueNode*)p2;
+ SColumnNode* pTsNode = (SColumnNode*)p3;
+ pSupp->inputTsSlot = pTsNode->slotId;
+ pSupp->inputPrecision = pTsNode->node.resType.precision;
+ pSupp->inputValSlot = pValNode->slotId;
+ pSupp->inputValType = pValNode->node.resType.type;
+ tstrncpy(pSupp->algoOpt, pOptNode->literal, sizeof(pSupp->algoOpt));
+ } else if (numOfParam == 2) {
+ SNode* p1 = nodesListGetNode(pFunc->pParameterList, 0);
+ SNode* p2 = nodesListGetNode(pFunc->pParameterList, 1);
+ if (p1 == NULL || p2 == NULL) return TSDB_CODE_PLAN_INTERNAL_ERROR;
+ if (p1->type != QUERY_NODE_COLUMN) return TSDB_CODE_PLAN_INTERNAL_ERROR;
+ if (p2->type != QUERY_NODE_COLUMN) return TSDB_CODE_PLAN_INTERNAL_ERROR;
+ SColumnNode* pValNode = (SColumnNode*)p1;
+ SColumnNode* pTsNode = (SColumnNode*)p2;
+ pSupp->inputTsSlot = pTsNode->slotId;
+ pSupp->inputPrecision = pTsNode->node.resType.precision;
+ pSupp->inputValSlot = pValNode->slotId;
+ pSupp->inputValType = pValNode->node.resType.type;
+ tstrncpy(pSupp->algoOpt, "algo=arima", TSDB_ANAL_ALGO_OPTION_LEN);
+ } else {
+ return TSDB_CODE_PLAN_INTERNAL_ERROR;
+ }
+ }
+ }
+ }
+
+ return 0;
+}
+
+static int32_t forecastParseAlgo(SForecastSupp* pSupp) {
+ pSupp->maxTs = 0;
+ pSupp->minTs = INT64_MAX;
+ pSupp->numOfRows = 0;
+
+ if (!taosAnalGetOptStr(pSupp->algoOpt, "algo", pSupp->algoName, sizeof(pSupp->algoName))) {
+ qError("failed to get forecast algorithm name from %s", pSupp->algoOpt);
+ return TSDB_CODE_ANAL_ALGO_NOT_FOUND;
+ }
+
+ if (taosAnalGetAlgoUrl(pSupp->algoName, ANAL_ALGO_TYPE_FORECAST, pSupp->algoUrl, sizeof(pSupp->algoUrl)) != 0) {
+ qError("failed to get forecast algorithm url from %s", pSupp->algoName);
+ return TSDB_CODE_ANAL_ALGO_NOT_LOAD;
+ }
+
+ return 0;
+}
+
+static int32_t forecastCreateBuf(SForecastSupp* pSupp) {
+ SAnalBuf* pBuf = &pSupp->analBuf;
+ int64_t ts = 0; // taosGetTimestampMs();
+
+ pBuf->bufType = ANAL_BUF_TYPE_JSON_COL;
+ snprintf(pBuf->fileName, sizeof(pBuf->fileName), "%s/tdengine-forecast-%" PRId64, tsTempDir, ts);
+ int32_t code = tsosAnalBufOpen(pBuf, 2);
+ if (code != 0) goto _OVER;
+
+ code = taosAnalBufWriteOptStr(pBuf, "algo", pSupp->algoName);
+ if (code != 0) goto _OVER;
+
+ bool returnConf = (pSupp->resHighSlot == -1 || pSupp->resLowSlot == -1);
+ code = taosAnalBufWriteOptStr(pBuf, "return_conf", returnConf ? "true" : "false");
+ if (code != 0) goto _OVER;
+
+ bool hasAlpha = taosAnalGetOptStr(pSupp->algoOpt, "alpha", NULL, 0);
+ if (!hasAlpha) {
+ qDebug("forecast alpha not found from %s, use default:%f", pSupp->algoOpt, ANAL_FORECAST_DEFAULT_ALPHA);
+ code = taosAnalBufWriteOptFloat(pBuf, "alpha", ANAL_FORECAST_DEFAULT_ALPHA);
+ if (code != 0) goto _OVER;
+ }
+
+ char tmpOpt[32] = {0};
+ bool hasParam = taosAnalGetOptStr(pSupp->algoOpt, "param", tmpOpt, sizeof(tmpOpt));
+ if (!hasParam) {
+ qDebug("forecast param not found from %s, use default:%s", pSupp->algoOpt, ANAL_FORECAST_DEFAULT_PARAM);
+ code = taosAnalBufWriteOptStr(pBuf, "param", ANAL_FORECAST_DEFAULT_PARAM);
+ if (code != 0) goto _OVER;
+ }
+
+ bool hasPeriod = taosAnalGetOptInt(pSupp->algoOpt, "period", NULL);
+ if (!hasPeriod) {
+ qDebug("forecast period not found from %s, use default:%d", pSupp->algoOpt, ANAL_FORECAST_DEFAULT_PERIOD);
+ code = taosAnalBufWriteOptInt(pBuf, "period", ANAL_FORECAST_DEFAULT_PERIOD);
+ if (code != 0) goto _OVER;
+ }
+
+ bool hasRows = taosAnalGetOptInt(pSupp->algoOpt, "rows", &pSupp->optRows);
+ if (!hasRows) {
+ pSupp->optRows = ANAL_FORECAST_DEFAULT_ROWS;
+ qDebug("forecast rows not found from %s, use default:%d", pSupp->algoOpt, pSupp->optRows);
+ code = taosAnalBufWriteOptInt(pBuf, "forecast_rows", pSupp->optRows);
+ if (code != 0) goto _OVER;
+ }
+
+ const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
+ if (pSupp->inputPrecision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR;
+ if (pSupp->inputPrecision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR;
+ code = taosAnalBufWriteOptStr(pBuf, "prec", prec);
+ if (code != 0) goto _OVER;
+
+ if (returnConf) {
+ bool hasConf = taosAnalGetOptStr(pSupp->algoOpt, "conf", NULL, 0);
+ if (!hasConf) {
+ qDebug("forecast conf not found from %s, use default:%d", pSupp->algoOpt, ANAL_FORECAST_DEFAULT_CONF);
+ code = taosAnalBufWriteOptInt(pBuf, "conf", ANAL_FORECAST_DEFAULT_CONF);
+ if (code != 0) goto _OVER;
+ }
+ }
+
+ code = taosAnalBufWriteColMeta(pBuf, 0, TSDB_DATA_TYPE_TIMESTAMP, "ts");
+ if (code != 0) goto _OVER;
+
+ code = taosAnalBufWriteColMeta(pBuf, 1, pSupp->inputValType, "val");
+ if (code != 0) goto _OVER;
+
+ code = taosAnalBufWriteDataBegin(pBuf);
+ if (code != 0) goto _OVER;
+
+ for (int32_t i = 0; i < 2; ++i) {
+ code = taosAnalBufWriteColBegin(pBuf, i);
+ if (code != 0) goto _OVER;
+ }
+
+_OVER:
+ if (code != 0) {
+ taosAnalBufClose(pBuf);
+ taosAnalBufDestroy(pBuf);
+ }
+ return code;
+}
+
+int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
+ SOperatorInfo** pOptrInfo) {
+ QRY_PARAM_CHECK(pOptrInfo);
+
+ int32_t code = 0;
+ int32_t lino = 0;
+ SForecastOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SForecastOperatorInfo));
+ SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
+ if (pOperator == NULL || pInfo == NULL) {
+ code = terrno;
+ goto _error;
+ }
+
+ SForecastSupp* pSupp = &pInfo->forecastSupp;
+ SForecastFuncPhysiNode* pForecastPhyNode = (SForecastFuncPhysiNode*)pPhyNode;
+ SExprSupp* pExprSup = &pOperator->exprSupp;
+ int32_t numOfExprs = 0;
+ SExprInfo* pExprInfo = NULL;
+
+ code = createExprInfo(pForecastPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ code = initExprSupp(pExprSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ if (pForecastPhyNode->pExprs != NULL) {
+ int32_t num = 0;
+ SExprInfo* pScalarExprInfo = NULL;
+ code = createExprInfo(pForecastPhyNode->pExprs, NULL, &pScalarExprInfo, &num);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
+ QUERY_CHECK_CODE(code, lino, _error);
+ }
+
+ code = filterInitFromNode((SNode*)pForecastPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ code = forecastParseInput(pSupp, pForecastPhyNode->pFuncs);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ code = forecastParseOutput(pSupp, pExprSup);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ code = forecastParseAlgo(pSupp);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ code = forecastCreateBuf(pSupp);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ initResultSizeInfo(&pOperator->resultInfo, 4096);
+
+ pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
+ QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
+
+ setOperatorInfo(pOperator, "ForecastOperator", QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC, false, OP_NOT_OPENED, pInfo,
+ pTaskInfo);
+ pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, forecastNext, NULL, destroyForecastInfo, optrDefaultBufFn,
+ NULL, optrDefaultGetNextExtFn, NULL);
+
+ code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ code = appendDownstream(pOperator, &downstream, 1);
+ QUERY_CHECK_CODE(code, lino, _error);
+
+ *pOptrInfo = pOperator;
+
+ qDebug("forecast env is initialized, option:%s", pSupp->algoOpt);
+ return TSDB_CODE_SUCCESS;
+
+_error:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (pInfo != NULL) destroyForecastInfo(pInfo);
+ destroyOperatorAndDownstreams(pOperator, &downstream, 1);
+ pTaskInfo->code = code;
+ return code;
+}
+
+static void destroyForecastInfo(void* param) {
+ SForecastOperatorInfo* pInfo = (SForecastOperatorInfo*)param;
+
+ blockDataDestroy(pInfo->pRes);
+ pInfo->pRes = NULL;
+ cleanupExprSupp(&pInfo->scalarSup);
+ taosAnalBufDestroy(&pInfo->forecastSupp.analBuf);
+ taosMemoryFreeClear(param);
+}
+
+#else
+
+int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
+ SOperatorInfo** pOptrInfo) {
+ return TSDB_CODE_OPS_NOT_SUPPORT;
+}
+
+#endif
diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c
index 33f46ec73b..6d892a0ca7 100644
--- a/source/libs/executor/src/operator.c
+++ b/source/libs/executor/src/operator.c
@@ -619,6 +619,8 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
code = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
code = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
+ } else if (QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC == type) {
+ code = createForecastOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) {
code = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
} else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE == type) {
@@ -631,6 +633,8 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
code = createCountwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC == type) {
code = createStreamTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle, &pOptr);
+ } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY == type) {
+ code = createAnomalywindowOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
} else {
code = TSDB_CODE_INVALID_PARA;
pTaskInfo->code = code;
diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c
index 7d1b1bc1c4..3155b512d5 100644
--- a/source/libs/executor/src/streamtimewindowoperator.c
+++ b/source/libs/executor/src/streamtimewindowoperator.c
@@ -49,7 +49,7 @@
#define STREAM_SESSION_OP_CHECKPOINT_NAME "StreamSessionOperator_Checkpoint"
#define STREAM_STATE_OP_CHECKPOINT_NAME "StreamStateOperator_Checkpoint"
-#define MAX_STREAM_HISTORY_RESULT 100000000
+#define MAX_STREAM_HISTORY_RESULT 20000000
typedef struct SStateWindowInfo {
SResultWindowInfo winInfo;
diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c
index 7ca3e5852b..68b6bcb2c0 100644
--- a/source/libs/function/src/builtins.c
+++ b/source/libs/function/src/builtins.c
@@ -16,9 +16,10 @@
#include "builtins.h"
#include "builtinsimpl.h"
#include "cJSON.h"
+#include "geomFunc.h"
#include "querynodes.h"
#include "scalar.h"
-#include "geomFunc.h"
+#include "tanal.h"
#include "taoserror.h"
#include "ttime.h"
@@ -237,7 +238,7 @@ static int32_t addTimezoneParam(SNodeList* pList) {
return terrno;
}
varDataSetLen(pVal->datum.p, len);
- (void)strncpy(varDataVal(pVal->datum.p), pVal->literal, len);
+ tstrncpy(varDataVal(pVal->datum.p), pVal->literal, len + 1);
code = nodesListAppend(pList, (SNode*)pVal);
if (TSDB_CODE_SUCCESS != code) {
@@ -2078,6 +2079,47 @@ static int32_t translateMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateUniqueMode(pFunc, pErrBuf, len, false);
}
+static int32_t translateForecast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
+ int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
+ if (2 != numOfParams && 1 != numOfParams) {
+ return invaildFuncParaNumErrMsg(pErrBuf, len, "FORECAST require 1 or 2 parameters");
+ }
+
+ uint8_t valType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type;
+ if (!IS_MATHABLE_TYPE(valType)) {
+ return invaildFuncParaTypeErrMsg(pErrBuf, len, "FORECAST only support mathable column");
+ }
+
+ if (numOfParams == 2) {
+ uint8_t optionType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 1))->type;
+ if (TSDB_DATA_TYPE_BINARY != optionType) {
+ return invaildFuncParaTypeErrMsg(pErrBuf, len, "FORECAST option should be varchar");
+ }
+
+ SNode* pOption = nodesListGetNode(pFunc->pParameterList, 1);
+ if (QUERY_NODE_VALUE != nodeType(pOption)) {
+ return invaildFuncParaTypeErrMsg(pErrBuf, len, "FORECAST option should be value");
+ }
+
+ SValueNode* pValue = (SValueNode*)pOption;
+ if (!taosAnalGetOptStr(pValue->literal, "algo", NULL, 0) != 0) {
+ return invaildFuncParaValueErrMsg(pErrBuf, len, "FORECAST option should include algo field");
+ }
+
+ pValue->notReserved = true;
+ }
+
+ pFunc->node.resType = (SDataType){.bytes = tDataTypes[valType].bytes, .type = valType};
+ return TSDB_CODE_SUCCESS;
+}
+
+static int32_t translateForecastConf(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
+ pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_FLOAT].bytes, .type = TSDB_DATA_TYPE_FLOAT};
+ return TSDB_CODE_SUCCESS;
+}
+
+static EFuncReturnRows forecastEstReturnRows(SFunctionNode* pFunc) { return FUNC_RETURN_ROWS_N; }
+
static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (numOfParams > 2) {
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index a44b9e3ac2..f13685239a 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -18,6 +18,7 @@
#include "function.h"
#include "query.h"
#include "querynodes.h"
+#include "tanal.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tdigest.h"
@@ -3578,6 +3579,11 @@ bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool f
}
}
+bool getForecastConfEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
+ pEnv->calcMemSize = sizeof(float);
+ return true;
+}
+
int32_t diffResultIsNull(SqlFunctionCtx* pCtx, SFuncInputRow* pRow){
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
@@ -4970,10 +4976,10 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t len;
char buf[512] = {0};
if (!pInfo->normalized) {
- len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%" PRId64 "}",
+ len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%" PRId64 "}",
pInfo->bins[i].lower, pInfo->bins[i].upper, pInfo->bins[i].count);
} else {
- len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}", pInfo->bins[i].lower,
+ len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}", pInfo->bins[i].lower,
pInfo->bins[i].upper, pInfo->bins[i].percentage);
}
varDataSetLen(buf, len);
@@ -6601,7 +6607,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
compRatio = pData->totalSize * 100 / (double)totalRawSize;
}
- int32_t len = sprintf(st + VARSTR_HEADER_SIZE,
+ int32_t len = snprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE,
"Total_Blocks=[%d] Total_Size=[%.2f KiB] Average_size=[%.2f KiB] Compression_Ratio=[%.2f %c]",
pData->numOfBlocks, pData->totalSize / 1024.0, averageSize / 1024.0, compRatio, '%');
@@ -6616,7 +6622,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
avgRows = pData->totalRows / pData->numOfBlocks;
}
- len = sprintf(st + VARSTR_HEADER_SIZE, "Block_Rows=[%" PRId64 "] MinRows=[%d] MaxRows=[%d] AvgRows=[%" PRId64 "]",
+ len = snprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE, "Block_Rows=[%" PRId64 "] MinRows=[%d] MaxRows=[%d] AvgRows=[%" PRId64 "]",
pData->totalRows, pData->minRows, pData->maxRows, avgRows);
varDataSetLen(st, len);
code = colDataSetVal(pColInfo, row++, st, false);
@@ -6624,14 +6630,14 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return code;
}
- len = sprintf(st + VARSTR_HEADER_SIZE, "Inmem_Rows=[%d] Stt_Rows=[%d] ", pData->numOfInmemRows, pData->numOfSttRows);
+ len = snprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE, "Inmem_Rows=[%d] Stt_Rows=[%d] ", pData->numOfInmemRows, pData->numOfSttRows);
varDataSetLen(st, len);
code = colDataSetVal(pColInfo, row++, st, false);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
- len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Filesets=[%d] Total_Vgroups=[%d]", pData->numOfTables,
+ len = snprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Filesets=[%d] Total_Vgroups=[%d]", pData->numOfTables,
pData->numOfFiles, pData->numOfVgroups);
varDataSetLen(st, len);
@@ -6640,7 +6646,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return code;
}
- len = sprintf(st + VARSTR_HEADER_SIZE,
+ len = snprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE,
"--------------------------------------------------------------------------------");
varDataSetLen(st, len);
code = colDataSetVal(pColInfo, row++, st, false);
@@ -6667,7 +6673,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t bucketRange = ceil(((double) (pData->defMaxRows - pData->defMinRows)) / numOfBuckets);
for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) {
- len = sprintf(st + VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * (i + 1));
+ len = snprintf(varDataVal(st), sizeof(st) - VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * (i + 1));
int32_t num = 0;
if (pData->blockRowsHisto[i] > 0) {
@@ -6675,13 +6681,13 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
}
for (int32_t j = 0; j < num; ++j) {
- int32_t x = sprintf(st + VARSTR_HEADER_SIZE + len, "%c", '|');
+ int32_t x = snprintf(varDataVal(st) + len, sizeof(st) - VARSTR_HEADER_SIZE - len, "%c", '|');
len += x;
}
if (pData->blockRowsHisto[i] > 0) {
double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks;
- len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%');
+ len += snprintf(varDataVal(st) + len, sizeof(st) - VARSTR_HEADER_SIZE - len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%');
}
varDataSetLen(st, len);
diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c
index 886772b36c..1717702df7 100644
--- a/source/libs/function/src/functionMgt.c
+++ b/source/libs/function/src/functionMgt.c
@@ -232,6 +232,15 @@ bool fmIsInterpFunc(int32_t funcId) {
bool fmIsInterpPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INTERP_PC_FUNC); }
+bool fmIsForecastFunc(int32_t funcId) {
+ if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
+ return false;
+ }
+ return FUNCTION_TYPE_FORECAST == funcMgtBuiltins[funcId].type;
+}
+
+bool fmIsForecastPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORECAST_PC_FUNC); }
+
bool fmIsLastRowFunc(int32_t funcId) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false;
@@ -408,7 +417,7 @@ static int32_t createColumnByFunc(const SFunctionNode* pFunc, SColumnNode** ppCo
if (NULL == *ppCol) {
return code;
}
- (void)strcpy((*ppCol)->colName, pFunc->node.aliasName);
+ tstrncpy((*ppCol)->colName, pFunc->node.aliasName, TSDB_COL_NAME_LEN);
(*ppCol)->node.resType = pFunc->node.resType;
return TSDB_CODE_SUCCESS;
}
@@ -437,11 +446,11 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod
(*pPartialFunc)->hasOriginalFunc = true;
(*pPartialFunc)->originalFuncId = pSrcFunc->hasOriginalFunc ? pSrcFunc->originalFuncId : pSrcFunc->funcId;
char name[TSDB_FUNC_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_POINTER_PRINT_BYTES + 1] = {0};
- int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", (*pPartialFunc)->functionName, pSrcFunc);
+ int32_t len = snprintf(name, sizeof(name), "%s.%p", (*pPartialFunc)->functionName, pSrcFunc);
if (taosHashBinary(name, len) < 0) {
return TSDB_CODE_FAILED;
}
- (void)strncpy((*pPartialFunc)->node.aliasName, name, TSDB_COL_NAME_LEN - 1);
+ tstrncpy((*pPartialFunc)->node.aliasName, name, TSDB_COL_NAME_LEN);
(*pPartialFunc)->hasPk = pSrcFunc->hasPk;
(*pPartialFunc)->pkBytes = pSrcFunc->pkBytes;
return TSDB_CODE_SUCCESS;
@@ -475,7 +484,7 @@ static int32_t createMidFunction(const SFunctionNode* pSrcFunc, const SFunctionN
}
}
if (TSDB_CODE_SUCCESS == code) {
- (void)strcpy(pFunc->node.aliasName, pPartialFunc->node.aliasName);
+ tstrncpy(pFunc->node.aliasName, pPartialFunc->node.aliasName, TSDB_COL_NAME_LEN);
}
if (TSDB_CODE_SUCCESS == code) {
@@ -504,7 +513,7 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
if (fmIsSameInOutType(pSrcFunc->funcId)) {
pFunc->node.resType = pSrcFunc->node.resType;
}
- (void)strcpy(pFunc->node.aliasName, pSrcFunc->node.aliasName);
+ tstrncpy(pFunc->node.aliasName, pSrcFunc->node.aliasName, TSDB_COL_NAME_LEN);
}
if (TSDB_CODE_SUCCESS == code) {
@@ -558,8 +567,8 @@ static int32_t fmCreateStateFunc(const SFunctionNode* pFunc, SFunctionNode** pSt
nodesDestroyList(pParams);
return code;
}
- (void)strcpy((*pStateFunc)->node.aliasName, pFunc->node.aliasName);
- (void)strcpy((*pStateFunc)->node.userAlias, pFunc->node.userAlias);
+ tstrncpy((*pStateFunc)->node.aliasName, pFunc->node.aliasName, TSDB_COL_NAME_LEN);
+ tstrncpy((*pStateFunc)->node.userAlias, pFunc->node.userAlias, TSDB_COL_NAME_LEN);
}
return TSDB_CODE_SUCCESS;
}
@@ -605,8 +614,8 @@ static int32_t fmCreateStateMergeFunc(SFunctionNode* pFunc, SFunctionNode** pSta
nodesDestroyList(pParams);
return code;
}
- (void)strcpy((*pStateMergeFunc)->node.aliasName, pFunc->node.aliasName);
- (void)strcpy((*pStateMergeFunc)->node.userAlias, pFunc->node.userAlias);
+ tstrncpy((*pStateMergeFunc)->node.aliasName, pFunc->node.aliasName, TSDB_COL_NAME_LEN);
+ tstrncpy((*pStateMergeFunc)->node.userAlias, pFunc->node.userAlias, TSDB_COL_NAME_LEN);
}
return TSDB_CODE_SUCCESS;
}
diff --git a/source/libs/function/src/tscript.c b/source/libs/function/src/tscript.c
index 768581285b..eecc66d6d6 100644
--- a/source/libs/function/src/tscript.c
+++ b/source/libs/function/src/tscript.c
@@ -92,7 +92,7 @@ void taosValueToLuaType(lua_State *lua, int32_t type, char *val) {
int taosLoadScriptInit(void* pInit) {
ScriptCtx *pCtx = pInit;
char funcName[MAX_FUNC_NAME] = {0};
- sprintf(funcName, "%s_init", pCtx->funcName);
+ snprintf(funcName, MAX_FUNC_NAME, "%s_init", pCtx->funcName);
lua_State* lua = pCtx->pEnv->lua_state;
lua_getglobal(lua, funcName);
@@ -106,7 +106,7 @@ void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iByt
int64_t *ptsList, int64_t key, char* pOutput, char *ptsOutput, int32_t *numOfOutput, int16_t oType, int16_t oBytes) {
ScriptCtx* pCtx = pInit;
char funcName[MAX_FUNC_NAME] = {0};
- sprintf(funcName, "%s_add", pCtx->funcName);
+ snprintf(funcName, MAX_FUNC_NAME, "%s_add", pCtx->funcName);
lua_State* lua = pCtx->pEnv->lua_state;
lua_getglobal(lua, funcName);
@@ -143,7 +143,7 @@ void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iByt
void taosLoadScriptMerge(void *pInit, char* data, int32_t numOfRows, char* pOutput, int32_t* numOfOutput) {
ScriptCtx *pCtx = pInit;
char funcName[MAX_FUNC_NAME] = {0};
- sprintf(funcName, "%s_merge", pCtx->funcName);
+ snprintf(funcName, MAX_FUNC_NAME, "%s_merge", pCtx->funcName);
lua_State* lua = pCtx->pEnv->lua_state;
lua_getglobal(lua, funcName);
@@ -167,7 +167,7 @@ void taosLoadScriptMerge(void *pInit, char* data, int32_t numOfRows, char* pOutp
void taosLoadScriptFinalize(void *pInit,int64_t key, char *pOutput, int32_t* numOfOutput) {
ScriptCtx *pCtx = pInit;
char funcName[MAX_FUNC_NAME] = {0};
- sprintf(funcName, "%s_finalize", pCtx->funcName);
+ snprintf(funcName, MAX_FUNC_NAME, "%s_finalize", pCtx->funcName);
lua_State* lua = pCtx->pEnv->lua_state;
lua_getglobal(lua, funcName);
diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c
index 7851bd0fed..29196c1df6 100644
--- a/source/libs/parser/src/parAstCreater.c
+++ b/source/libs/parser/src/parAstCreater.c
@@ -1367,6 +1367,25 @@ _err:
return NULL;
}
+SNode* createAnomalyWindowNode(SAstCreateContext* pCxt, SNode* pExpr, const SToken* pFuncOpt) {
+ SAnomalyWindowNode* pAnomaly = NULL;
+ CHECK_PARSER_STATUS(pCxt);
+ pCxt->errCode = nodesMakeNode(QUERY_NODE_ANOMALY_WINDOW, (SNode**)&pAnomaly);
+ CHECK_MAKE_NODE(pAnomaly);
+ pAnomaly->pCol = createPrimaryKeyCol(pCxt, NULL);
+ CHECK_MAKE_NODE(pAnomaly->pCol);
+ pAnomaly->pExpr = pExpr;
+ if (pFuncOpt == NULL) {
+ tstrncpy(pAnomaly->anomalyOpt, "algo=iqr", TSDB_ANAL_ALGO_OPTION_LEN);
+ } else {
+ (void)trimString(pFuncOpt->z, pFuncOpt->n, pAnomaly->anomalyOpt, sizeof(pAnomaly->anomalyOpt));
+ }
+ return (SNode*)pAnomaly;
+_err:
+ nodesDestroyNode((SNode*)pAnomaly);
+ return NULL;
+}
+
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding,
SNode* pFill) {
SIntervalWindowNode* interval = NULL;
@@ -2997,6 +3016,47 @@ _err:
return NULL;
}
+SNode* createCreateAnodeStmt(SAstCreateContext* pCxt, const SToken* pUrl) {
+ CHECK_PARSER_STATUS(pCxt);
+ SCreateAnodeStmt* pStmt = NULL;
+ pCxt->errCode = nodesMakeNode(QUERY_NODE_CREATE_ANODE_STMT, (SNode**)&pStmt);
+ CHECK_MAKE_NODE(pStmt);
+ (void)trimString(pUrl->z, pUrl->n, pStmt->url, sizeof(pStmt->url));
+ return (SNode*)pStmt;
+_err:
+ return NULL;
+}
+
+SNode* createDropAnodeStmt(SAstCreateContext* pCxt, const SToken* pAnode) {
+ CHECK_PARSER_STATUS(pCxt);
+ SUpdateAnodeStmt* pStmt = NULL;
+ pCxt->errCode = nodesMakeNode(QUERY_NODE_DROP_ANODE_STMT, (SNode**)&pStmt);
+ CHECK_MAKE_NODE(pStmt);
+ if (NULL != pAnode) {
+ pStmt->anodeId = taosStr2Int32(pAnode->z, NULL, 10);
+ } else {
+ pStmt->anodeId = -1;
+ }
+ return (SNode*)pStmt;
+_err:
+ return NULL;
+}
+
+SNode* createUpdateAnodeStmt(SAstCreateContext* pCxt, const SToken* pAnode, bool updateAll) {
+ CHECK_PARSER_STATUS(pCxt);
+ SUpdateAnodeStmt* pStmt = NULL;
+ pCxt->errCode = nodesMakeNode(QUERY_NODE_UPDATE_ANODE_STMT, (SNode**)&pStmt);
+ CHECK_MAKE_NODE(pStmt);
+ if (NULL != pAnode) {
+ pStmt->anodeId = taosStr2Int32(pAnode->z, NULL, 10);
+ } else {
+ pStmt->anodeId = -1;
+ }
+ return (SNode*)pStmt;
+_err:
+ return NULL;
+}
+
SNode* createEncryptKeyStmt(SAstCreateContext* pCxt, const SToken* pValue) {
SToken config;
config.type = TK_NK_STRING;
diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c
index 1ce8b04324..189afdfcd3 100644
--- a/source/libs/parser/src/parUtil.c
+++ b/source/libs/parser/src/parUtil.c
@@ -185,6 +185,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "%s is not supported in system table query";
case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE:
return "Invalid usage of RANGE clause, EVERY clause or FILL clause";
+ case TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE:
+ return "Invalid usage of forecast clause";
case TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN:
return "No valid function in window query";
case TSDB_CODE_PAR_INVALID_OPTR_USAGE:
diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c
index 1bcec86385..3b4e835465 100644
--- a/source/libs/planner/src/planOptimizer.c
+++ b/source/libs/planner/src/planOptimizer.c
@@ -2380,6 +2380,8 @@ static bool sortPriKeyOptHasUnsupportedPkFunc(SLogicNode* pLogicNode, EOrder sor
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
pFuncList = ((SInterpFuncLogicNode*)pLogicNode)->pFuncs;
break;
+ case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
+ pFuncList = ((SForecastFuncLogicNode*)pLogicNode)->pFuncs;
default:
break;
}
diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c
index 706394507a..755dd8739b 100644
--- a/source/libs/planner/src/planSpliter.c
+++ b/source/libs/planner/src/planSpliter.c
@@ -939,6 +939,18 @@ static int32_t stbSplSplitCount(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
}
}
+static int32_t stbSplSplitAnomalyForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
+ return TSDB_CODE_PLAN_INTERNAL_ERROR;
+}
+
+static int32_t stbSplSplitAnomaly(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
+ if (pCxt->pPlanCxt->streamQuery) {
+ return stbSplSplitAnomalyForStream(pCxt, pInfo);
+ } else {
+ return stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
+ }
+}
+
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) {
case WINDOW_TYPE_INTERVAL:
@@ -951,6 +963,8 @@ static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitI
return stbSplSplitEvent(pCxt, pInfo);
case WINDOW_TYPE_COUNT:
return stbSplSplitCount(pCxt, pInfo);
+ case WINDOW_TYPE_ANOMALY:
+ return stbSplSplitAnomaly(pCxt, pInfo);
default:
break;
}
@@ -2000,7 +2014,8 @@ typedef struct SQnodeSplitInfo {
static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
SQnodeSplitInfo* pInfo) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent &&
- QUERY_NODE_LOGIC_PLAN_INTERP_FUNC != nodeType(pNode->pParent) && ((SScanLogicNode*)pNode)->scanSeq[0] <= 1 &&
+ QUERY_NODE_LOGIC_PLAN_INTERP_FUNC != nodeType(pNode->pParent) &&
+ QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC != nodeType(pNode->pParent) && ((SScanLogicNode*)pNode)->scanSeq[0] <= 1 &&
((SScanLogicNode*)pNode)->scanSeq[1] <= 1) {
pInfo->pSplitNode = pNode;
pInfo->pSubplan = pSubplan;
diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c
index a3608cc1dc..e07ef69990 100644
--- a/source/libs/scalar/src/filter.c
+++ b/source/libs/scalar/src/filter.c
@@ -1764,41 +1764,41 @@ _return:
return DEAL_RES_ERROR;
}
-int32_t fltConverToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len) {
+int32_t fltConverToStr(char *str, int32_t strMaxLen, int type, void *buf, int32_t bufSize, int32_t *len) {
int32_t n = 0;
switch (type) {
case TSDB_DATA_TYPE_NULL:
- n = sprintf(str, "null");
+ n = snprintf(str, strMaxLen, "null");
break;
case TSDB_DATA_TYPE_BOOL:
- n = sprintf(str, (*(int8_t *)buf) ? "true" : "false");
+ n = snprintf(str, strMaxLen, (*(int8_t *)buf) ? "true" : "false");
break;
case TSDB_DATA_TYPE_TINYINT:
- n = sprintf(str, "%d", *(int8_t *)buf);
+ n = snprintf(str, strMaxLen, "%d", *(int8_t *)buf);
break;
case TSDB_DATA_TYPE_SMALLINT:
- n = sprintf(str, "%d", *(int16_t *)buf);
+ n = snprintf(str, strMaxLen, "%d", *(int16_t *)buf);
break;
case TSDB_DATA_TYPE_INT:
- n = sprintf(str, "%d", *(int32_t *)buf);
+ n = snprintf(str, strMaxLen, "%d", *(int32_t *)buf);
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
- n = sprintf(str, "%" PRId64, *(int64_t *)buf);
+ n = snprintf(str, strMaxLen, "%" PRId64, *(int64_t *)buf);
break;
case TSDB_DATA_TYPE_FLOAT:
- n = sprintf(str, "%e", GET_FLOAT_VAL(buf));
+ n = snprintf(str, strMaxLen, "%e", GET_FLOAT_VAL(buf));
break;
case TSDB_DATA_TYPE_DOUBLE:
- n = sprintf(str, "%e", GET_DOUBLE_VAL(buf));
+ n = snprintf(str, strMaxLen, "%e", GET_DOUBLE_VAL(buf));
break;
case TSDB_DATA_TYPE_BINARY:
@@ -1817,19 +1817,19 @@ int32_t fltConverToStr(char *str, int type, void *buf, int32_t bufSize, int32_t
break;
case TSDB_DATA_TYPE_UTINYINT:
- n = sprintf(str, "%d", *(uint8_t *)buf);
+ n = snprintf(str, strMaxLen, "%d", *(uint8_t *)buf);
break;
case TSDB_DATA_TYPE_USMALLINT:
- n = sprintf(str, "%d", *(uint16_t *)buf);
+ n = snprintf(str, strMaxLen, "%d", *(uint16_t *)buf);
break;
case TSDB_DATA_TYPE_UINT:
- n = sprintf(str, "%u", *(uint32_t *)buf);
+ n = snprintf(str, strMaxLen, "%u", *(uint32_t *)buf);
break;
case TSDB_DATA_TYPE_UBIGINT:
- n = sprintf(str, "%" PRIu64, *(uint64_t *)buf);
+ n = snprintf(str, strMaxLen, "%" PRIu64, *(uint64_t *)buf);
break;
default:
@@ -1886,8 +1886,8 @@ int32_t filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t optio
SFilterField *left = FILTER_UNIT_LEFT_FIELD(info, unit);
SColumnNode *refNode = (SColumnNode *)left->desc;
if (unit->compare.optr <= OP_TYPE_JSON_CONTAINS) {
- len = sprintf(str, "UNIT[%d] => [%d][%d] %s [", i, refNode->dataBlockId, refNode->slotId,
- operatorTypeStr(unit->compare.optr));
+ len += snprintf(str, sizeof(str), "UNIT[%d] => [%d][%d] %s [", i, refNode->dataBlockId, refNode->slotId,
+ operatorTypeStr(unit->compare.optr));
}
if (unit->right.type == FLD_TYPE_VALUE && FILTER_UNIT_OPTR(unit) != OP_TYPE_IN) {
@@ -1898,18 +1898,22 @@ int32_t filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t optio
data += VARSTR_HEADER_SIZE;
}
if (data) {
- FLT_ERR_RET(fltConverToStr(str + len, type, data, tlen > 32 ? 32 : tlen, &tlen));
+ FLT_ERR_RET(fltConverToStr(str + len, sizeof(str) - len, type, data, tlen > 32 ? 32 : tlen, &tlen));
+ len += tlen;
}
} else {
- (void)strcat(str, "NULL");
+ (void)strncat(str, "NULL", sizeof(str) - len - 1);
+ len += 4;
}
- (void)strcat(str, "]");
+ (void)strncat(str, "]", sizeof(str) - len - 1);
+ len += 1;
if (unit->compare.optr2) {
- (void)strcat(str, " && ");
+ (void)strncat(str, " && ", sizeof(str) - len - 1);
+ len += 4;
if (unit->compare.optr2 <= OP_TYPE_JSON_CONTAINS) {
- (void)sprintf(str + strlen(str), "[%d][%d] %s [", refNode->dataBlockId, refNode->slotId,
- operatorTypeStr(unit->compare.optr2));
+ len += snprintf(str + len, sizeof(str) - len, "[%d][%d] %s [", refNode->dataBlockId,
+ refNode->slotId, operatorTypeStr(unit->compare.optr2));
}
if (unit->right2.type == FLD_TYPE_VALUE && FILTER_UNIT_OPTR(unit) != OP_TYPE_IN) {
@@ -1919,11 +1923,14 @@ int32_t filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t optio
tlen = varDataLen(data);
data += VARSTR_HEADER_SIZE;
}
- FLT_ERR_RET(fltConverToStr(str + strlen(str), type, data, tlen > 32 ? 32 : tlen, &tlen));
+ FLT_ERR_RET(fltConverToStr(str + len, sizeof(str) - len, type, data, tlen > 32 ? 32 : tlen, &tlen));
+ len += tlen;
} else {
- (void)strcat(str, "NULL");
+ (void)strncat(str, "NULL", sizeof(str) - len - 1);
+ len += 4;
}
- (void)strcat(str, "]");
+ (void)strncat(str, "]", sizeof(str) - len - 1);
+ len += 1;
}
qDebug("%s", str); // TODO
@@ -1955,21 +1962,39 @@ int32_t filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t optio
SFilterRangeNode *r = ctx->rs;
int32_t tlen = 0;
while (r) {
- char str[256] = {0};
+ char str[256] = {0};
+ int32_t len = 0;
if (FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_NULL)) {
- (void)strcat(str, "(NULL)");
+ (void)strncat(str, "(NULL)", sizeof(str) - len - 1);
+ len += 6;
} else {
- FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_EXCLUDE) ? strcat(str, "(") : strcat(str, "[");
- FLT_ERR_RET(fltConverToStr(str + strlen(str), ctx->type, &r->ra.s, tlen > 32 ? 32 : tlen, &tlen));
- FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_EXCLUDE) ? strcat(str, ")") : strcat(str, "]");
+ FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_EXCLUDE) ?
+ (void)strncat(str, "(", sizeof(str) - len - 1) :
+ (void)strncat(str, "[", sizeof(str) - len - 1);
+ len += 1;
+ FLT_ERR_RET(fltConverToStr(str + len, sizeof(str) - len, ctx->type, &r->ra.s, tlen > 32 ? 32 : tlen, &tlen));
+ len += tlen;
+ FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_EXCLUDE) ?
+ (void)strncat(str, ")", sizeof(str) - len - 1) :
+ (void)strncat(str, "]", sizeof(str) - len - 1);
+ len += 1;
}
- (void)strcat(str, " - ");
+ (void)strncat(str, " - ", sizeof(str) - len - 1);
+ len += 3;
if (FILTER_GET_FLAG(r->ra.eflag, RANGE_FLG_NULL)) {
- (void)strcat(str, "(NULL)");
+ (void)strncat(str, "(NULL)", sizeof(str) - len - 1);
+ len += 6;
} else {
- FILTER_GET_FLAG(r->ra.eflag, RANGE_FLG_EXCLUDE) ? strcat(str, "(") : strcat(str, "[");
- FLT_ERR_RET(fltConverToStr(str + strlen(str), ctx->type, &r->ra.e, tlen > 32 ? 32 : tlen, &tlen));
- FILTER_GET_FLAG(r->ra.eflag, RANGE_FLG_EXCLUDE) ? strcat(str, ")") : strcat(str, "]");
+ FILTER_GET_FLAG(r->ra.eflag, RANGE_FLG_EXCLUDE) ?
+ (void)strncat(str, "(", sizeof(str) - len - 1) :
+ (void)strncat(str, "[", sizeof(str) - len - 1);
+ len += 1;
+ FLT_ERR_RET(fltConverToStr(str + len, sizeof(str) - len, ctx->type, &r->ra.e, tlen > 32 ? 32 : tlen, &tlen));
+ len += tlen;
+ FILTER_GET_FLAG(r->ra.eflag, RANGE_FLG_EXCLUDE) ?
+ (void)strncat(str, ")", sizeof(str) - len - 1) :
+ (void)strncat(str, "]", sizeof(str) - len - 1);
+ len += 1;
}
qDebug("range: %s", str);
diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c
index 2a4951d237..209110b014 100644
--- a/source/libs/scalar/src/scalar.c
+++ b/source/libs/scalar/src/scalar.c
@@ -1211,7 +1211,7 @@ EDealRes sclRewriteFunction(SNode **pNode, SScalarCtx *ctx) {
res->translate = true;
- (void)strcpy(res->node.aliasName, node->node.aliasName);
+ tstrncpy(res->node.aliasName, node->node.aliasName, TSDB_COL_NAME_LEN);
res->node.resType.type = output.columnData->info.type;
res->node.resType.bytes = output.columnData->info.bytes;
res->node.resType.scale = output.columnData->info.scale;
@@ -1286,7 +1286,7 @@ EDealRes sclRewriteLogic(SNode **pNode, SScalarCtx *ctx) {
res->node.resType = node->node.resType;
res->translate = true;
- (void)strcpy(res->node.aliasName, node->node.aliasName);
+ tstrncpy(res->node.aliasName, node->node.aliasName, TSDB_COL_NAME_LEN);
int32_t type = output.columnData->info.type;
if (IS_VAR_DATA_TYPE(type)) {
res->datum.p = output.columnData->pData;
@@ -1356,7 +1356,7 @@ EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) {
res->translate = true;
- (void)strcpy(res->node.aliasName, node->node.aliasName);
+ tstrncpy(res->node.aliasName, node->node.aliasName, TSDB_COL_NAME_LEN);
res->node.resType = node->node.resType;
if (colDataIsNull_s(output.columnData, 0)) {
res->isNull = true;
@@ -1419,7 +1419,7 @@ EDealRes sclRewriteCaseWhen(SNode **pNode, SScalarCtx *ctx) {
res->translate = true;
- (void)strcpy(res->node.aliasName, node->node.aliasName);
+ tstrncpy(res->node.aliasName, node->node.aliasName, TSDB_COL_NAME_LEN);
res->node.resType = node->node.resType;
if (colDataIsNull_s(output.columnData, 0)) {
res->isNull = true;
diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c
index ff47c091b7..f408314fad 100644
--- a/source/libs/scalar/src/sclfunc.c
+++ b/source/libs/scalar/src/sclfunc.c
@@ -2067,9 +2067,9 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_GEOMETRY: {
if (inputType == TSDB_DATA_TYPE_BOOL) {
- // NOTE: sprintf will append '\0' at the end of string
- int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE),
- *(int8_t *)input ? "true" : "false");
+ // NOTE: snprintf will append '\0' at the end of string
+ int32_t len = snprintf(varDataVal(output), outputLen + TSDB_NCHAR_SIZE - VARSTR_HEADER_SIZE, "%.*s",
+ (int32_t)(outputLen - VARSTR_HEADER_SIZE), *(int8_t *)input ? "true" : "false");
varDataSetLen(output, len);
} else if (inputType == TSDB_DATA_TYPE_BINARY) {
int32_t len = TMIN(varDataLen(input), outputLen - VARSTR_HEADER_SIZE);
@@ -2109,7 +2109,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int32_t len;
if (inputType == TSDB_DATA_TYPE_BOOL) {
char tmp[8] = {0};
- len = sprintf(tmp, "%.*s", outputCharLen, *(int8_t *)input ? "true" : "false");
+ len = snprintf(tmp, sizeof(tmp), "%.*s", outputCharLen, *(int8_t *)input ? "true" : "false");
bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
if (!ret) {
code = TSDB_CODE_SCALAR_CONVERT_ERROR;
@@ -4411,11 +4411,11 @@ int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
int32_t len;
char buf[512] = {0};
if (!normalized) {
- len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%" PRId64 "}", bins[k].lower,
- bins[k].upper, bins[k].count);
+ len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%" PRId64 "}",
+ bins[k].lower, bins[k].upper, bins[k].count);
} else {
- len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}", bins[k].lower,
- bins[k].upper, bins[k].percentage);
+ len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}",
+ bins[k].lower, bins[k].upper, bins[k].percentage);
}
varDataSetLen(buf, len);
SCL_ERR_JRET(colDataSetVal(pOutputData, k, buf, false));
diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c
index 230454483d..a7c842172a 100644
--- a/source/libs/scalar/src/sclvector.c
+++ b/source/libs/scalar/src/sclvector.c
@@ -734,7 +734,7 @@ int32_t vectorConvertToVarData(SSclVectorConvCtx *pCtx) {
int64_t value = 0;
GET_TYPED_DATA(value, int64_t, pCtx->inType, colDataGetData(pInputCol, i));
- int32_t len = sprintf(varDataVal(tmp), "%" PRId64, value);
+ int32_t len = snprintf(varDataVal(tmp), sizeof(tmp) - VARSTR_HEADER_SIZE, "%" PRId64, value);
varDataLen(tmp) = len;
if (pCtx->outType == TSDB_DATA_TYPE_NCHAR) {
SCL_ERR_RET(varToNchar(tmp, pCtx->pOut, i, NULL));
@@ -751,7 +751,7 @@ int32_t vectorConvertToVarData(SSclVectorConvCtx *pCtx) {
uint64_t value = 0;
GET_TYPED_DATA(value, uint64_t, pCtx->inType, colDataGetData(pInputCol, i));
- int32_t len = sprintf(varDataVal(tmp), "%" PRIu64, value);
+ int32_t len = snprintf(varDataVal(tmp), sizeof(tmp) - VARSTR_HEADER_SIZE, "%" PRIu64, value);
varDataLen(tmp) = len;
if (pCtx->outType == TSDB_DATA_TYPE_NCHAR) {
SCL_ERR_RET(varToNchar(tmp, pCtx->pOut, i, NULL));
@@ -768,7 +768,7 @@ int32_t vectorConvertToVarData(SSclVectorConvCtx *pCtx) {
double value = 0;
GET_TYPED_DATA(value, double, pCtx->inType, colDataGetData(pInputCol, i));
- int32_t len = sprintf(varDataVal(tmp), "%lf", value);
+ int32_t len = snprintf(varDataVal(tmp), sizeof(tmp) - VARSTR_HEADER_SIZE, "%lf", value);
varDataLen(tmp) = len;
if (pCtx->outType == TSDB_DATA_TYPE_NCHAR) {
SCL_ERR_RET(varToNchar(tmp, pCtx->pOut, i, NULL));
diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp
index 70d6f7d0ae..8bbadd0e22 100644
--- a/source/libs/scalar/test/filter/filterTests.cpp
+++ b/source/libs/scalar/test/filter/filterTests.cpp
@@ -55,7 +55,7 @@ void flttInitLogFile() {
tsAsyncLog = 0;
qDebugFlag = 159;
- (void)strcpy(tsLogDir, TD_LOG_DIR_PATH);
+ tstrncpy(tsLogDir, TD_LOG_DIR_PATH, PATH_MAX);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum, false) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
@@ -101,7 +101,7 @@ int32_t flttMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType,
rnode->node.resType.bytes = dataBytes;
rnode->dataBlockId = 0;
- sprintf(rnode->dbName, "%" PRIu64, dbidx++);
+ snprintf(rnode->dbName, TSDB_DB_NAME_LEN, "%" PRIu64, dbidx++);
if (NULL == block) {
rnode->slotId = 2;
@@ -666,7 +666,7 @@ TEST(columnTest, binary_column_like_binary) {
int32_t rowNum = sizeof(leftv) / sizeof(leftv[0]);
flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_BINARY, 3, rowNum, leftv);
- sprintf(&rightv[2], "%s", "__0");
+ snprintf(&rightv[2], sizeof(rightv) - 2, "%s", "__0");
varDataSetLen(rightv, strlen(&rightv[2]));
flttMakeValueNode(&pRight, TSDB_DATA_TYPE_BINARY, rightv);
flttMakeOpNode(&opNode, OP_TYPE_LIKE, TSDB_DATA_TYPE_BOOL, pLeft, pRight);
diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp
index e14b772ea8..4cab644582 100644
--- a/source/libs/scalar/test/scalar/scalarTests.cpp
+++ b/source/libs/scalar/test/scalar/scalarTests.cpp
@@ -81,7 +81,7 @@ void scltInitLogFile() {
tsAsyncLog = 0;
qDebugFlag = 159;
- (void)strcpy(tsLogDir, TD_LOG_DIR_PATH);
+ tstrncpy(tsLogDir, TD_LOG_DIR_PATH, PATH_MAX);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum, false) < 0) {
(void)printf("failed to open log file in directory:%s\n", tsLogDir);
diff --git a/source/libs/tdb/src/db/tdbPage.c b/source/libs/tdb/src/db/tdbPage.c
index be391a75f1..49a15070a6 100644
--- a/source/libs/tdb/src/db/tdbPage.c
+++ b/source/libs/tdb/src/db/tdbPage.c
@@ -102,6 +102,10 @@ void tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg
tdbOsFree(pPage->apOvfl[iOvfl]);
}
+ if (TDB_DESTROY_PAGE_LOCK(pPage) != 0) {
+ tdbError("tdb/page-destroy: destroy page lock failed.");
+ }
+
ptr = pPage->pData;
xFree(arg, ptr);
diff --git a/tests/develop-test/2-query/table_count_scan.py b/tests/develop-test/2-query/table_count_scan.py
index 38e35a175e..b2b48c1f0b 100644
--- a/tests/develop-test/2-query/table_count_scan.py
+++ b/tests/develop-test/2-query/table_count_scan.py
@@ -65,7 +65,7 @@ class TDTestCase:
tdSql.query('select count(*),db_name, stable_name from information_schema.ins_tables group by db_name, stable_name;')
tdSql.checkRows(3)
- tdSql.checkData(0, 0, 32)
+ tdSql.checkData(0, 0, 34)
tdSql.checkData(0, 1, 'information_schema')
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 0, 3)
@@ -77,7 +77,7 @@ class TDTestCase:
tdSql.query('select count(1) v,db_name, stable_name from information_schema.ins_tables group by db_name, stable_name order by v desc;')
tdSql.checkRows(3)
- tdSql.checkData(0, 0, 32)
+ tdSql.checkData(0, 0, 34)
tdSql.checkData(0, 1, 'information_schema')
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 0, 5)
@@ -93,7 +93,7 @@ class TDTestCase:
tdSql.checkData(1, 1, 'performance_schema')
tdSql.checkData(0, 0, 3)
tdSql.checkData(0, 1, 'tbl_count')
- tdSql.checkData(2, 0, 32)
+ tdSql.checkData(2, 0, 34)
tdSql.checkData(2, 1, 'information_schema')
tdSql.query("select count(*) from information_schema.ins_tables where db_name='tbl_count'")
@@ -106,7 +106,7 @@ class TDTestCase:
tdSql.query('select count(*) from information_schema.ins_tables')
tdSql.checkRows(1)
- tdSql.checkData(0, 0, 40)
+ tdSql.checkData(0, 0, 42)
tdSql.execute('create table stba (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);')
@@ -189,7 +189,7 @@ class TDTestCase:
tdSql.checkData(2, 0, 5)
tdSql.checkData(2, 1, 'performance_schema')
tdSql.checkData(2, 2, None)
- tdSql.checkData(3, 0, 32)
+ tdSql.checkData(3, 0, 34)
tdSql.checkData(3, 1, 'information_schema')
tdSql.checkData(3, 2, None)
@@ -204,7 +204,7 @@ class TDTestCase:
tdSql.checkData(2, 0, 5)
tdSql.checkData(2, 1, 'performance_schema')
tdSql.checkData(2, 2, None)
- tdSql.checkData(3, 0, 32)
+ tdSql.checkData(3, 0, 34)
tdSql.checkData(3, 1, 'information_schema')
tdSql.checkData(3, 2, None)
@@ -215,7 +215,7 @@ class TDTestCase:
tdSql.checkData(0, 1, 'tbl_count')
tdSql.checkData(1, 0, 5)
tdSql.checkData(1, 1, 'performance_schema')
- tdSql.checkData(2, 0, 32)
+ tdSql.checkData(2, 0, 34)
tdSql.checkData(2, 1, 'information_schema')
tdSql.query("select count(*) from information_schema.ins_tables where db_name='tbl_count'")
@@ -228,7 +228,7 @@ class TDTestCase:
tdSql.query('select count(*) from information_schema.ins_tables')
tdSql.checkRows(1)
- tdSql.checkData(0, 0, 41)
+ tdSql.checkData(0, 0, 43)
tdSql.execute('drop database tbl_count')
diff --git a/tests/script/tsim/query/sys_tbname.sim b/tests/script/tsim/query/sys_tbname.sim
index dabe4fcdde..9736893428 100644
--- a/tests/script/tsim/query/sys_tbname.sim
+++ b/tests/script/tsim/query/sys_tbname.sim
@@ -58,7 +58,7 @@ endi
sql select tbname from information_schema.ins_tables;
print $rows $data00
-if $rows != 41 then
+if $rows != 43 then
return -1
endi
if $data00 != @ins_tables@ then
diff --git a/tests/script/tsim/query/tableCount.sim b/tests/script/tsim/query/tableCount.sim
index 5a3dd0714f..87f72eb3b6 100644
--- a/tests/script/tsim/query/tableCount.sim
+++ b/tests/script/tsim/query/tableCount.sim
@@ -53,7 +53,7 @@ sql select stable_name,count(table_name) from information_schema.ins_tables grou
if $rows != 3 then
return -1
endi
-if $data01 != 38 then
+if $data01 != 40 then
return -1
endi
if $data11 != 10 then
@@ -72,7 +72,7 @@ endi
if $data11 != 5 then
return -1
endi
-if $data21 != 32 then
+if $data21 != 34 then
return -1
endi
if $data31 != 5 then
@@ -97,7 +97,7 @@ endi
if $data42 != 3 then
return -1
endi
-if $data52 != 32 then
+if $data52 != 34 then
return -1
endi
if $data62 != 5 then
diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py
index f59410b552..01e416bb26 100644
--- a/tests/system-test/0-others/information_schema.py
+++ b/tests/system-test/0-others/information_schema.py
@@ -61,7 +61,7 @@ class TDTestCase:
self.ins_list = ['ins_dnodes','ins_mnodes','ins_qnodes','ins_snodes','ins_cluster','ins_databases','ins_functions',\
'ins_indexes','ins_stables','ins_tables','ins_tags','ins_columns','ins_users','ins_grants','ins_vgroups','ins_configs','ins_dnode_variables',\
'ins_topics','ins_subscriptions','ins_streams','ins_stream_tasks','ins_vnodes','ins_user_privileges','ins_views',
- 'ins_compacts', 'ins_compact_details', 'ins_grants_full','ins_grants_logs', 'ins_machines', 'ins_arbgroups', 'ins_tsmas', "ins_encryptions"]
+ 'ins_compacts', 'ins_compact_details', 'ins_grants_full','ins_grants_logs', 'ins_machines', 'ins_arbgroups', 'ins_tsmas', "ins_encryptions", "ins_anodes", "ins_anodes_full"]
self.perf_list = ['perf_connections','perf_queries','perf_consumers','perf_trans','perf_apps']
def insert_data(self,column_dict,tbname,row_num):
insert_sql = self.setsql.set_insertsql(column_dict,tbname,self.binary_str,self.nchar_str)
@@ -222,7 +222,7 @@ class TDTestCase:
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdLog.info(len(tdSql.queryResult))
- tdSql.checkEqual(True, len(tdSql.queryResult) in range(272, 273))
+ tdSql.checkEqual(True, len(tdSql.queryResult) in range(280, 281))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(56, len(tdSql.queryResult))