Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_optimize
This commit is contained in:
commit
cab6bb1be2
|
@ -33,7 +33,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
|
||||||
关键不同点在于:
|
关键不同点在于:
|
||||||
|
|
||||||
1. 使用 REST 连接,用户无需安装客户端驱动程序 taosc,具有跨平台易用的优势,但性能要下降 30%左右。
|
1. 使用 REST 连接,用户无需安装客户端驱动程序 taosc,具有跨平台易用的优势,但性能要下降 30%左右。
|
||||||
2. 使用原生连接可以体验 TDengine 的全部功能,如[参数绑定接口](../connector/cpp/#参数绑定-api)、[订阅](../connector/cpp/#订阅和消费-api)等等。
|
2. 使用原生连接可以体验 TDengine 的全部功能,如[参数绑定接口](../../connector/cpp/#参数绑定-api)、[订阅](../../connector/cpp/#订阅和消费-api)等等。
|
||||||
|
|
||||||
## 安装客户端驱动 taosc
|
## 安装客户端驱动 taosc
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,7 @@ title: 开发指南
|
||||||
7. 在很多场景下(如车辆管理),应用需要获取每个数据采集点的最新状态,那么建议你采用TDengine的cache功能,而不用单独部署Redis等缓存软件。
|
7. 在很多场景下(如车辆管理),应用需要获取每个数据采集点的最新状态,那么建议你采用TDengine的cache功能,而不用单独部署Redis等缓存软件。
|
||||||
8. 如果你发现TDengine的函数无法满足你的要求,那么你可以使用用户自定义函数来解决问题。
|
8. 如果你发现TDengine的函数无法满足你的要求,那么你可以使用用户自定义函数来解决问题。
|
||||||
|
|
||||||
本部分内容就是按照上述的顺序组织的。为便于理解,TDengine为每个功能为每个支持的编程语言都提供了示例代码。如果你希望深入了解SQL的使用,需要查看[SQL手册](/taos-sql/)。如果想更深入地了解各连接器的使用,请阅读[连接器参考指南](../reference/connector/)。如果还希望想将TDengine与第三方系统集成起来,比如Grafana, 请参考[第三方工具](/third-party/)。
|
本部分内容就是按照上述的顺序组织的。为便于理解,TDengine为每个功能为每个支持的编程语言都提供了示例代码。如果你希望深入了解SQL的使用,需要查看[SQL手册](/taos-sql/)。如果想更深入地了解各连接器的使用,请阅读[连接器参考指南](../connector/)。如果还希望想将TDengine与第三方系统集成起来,比如Grafana, 请参考[第三方工具](../third-party/)。
|
||||||
|
|
||||||
如果在开发过程中遇到任何问题,请点击每个页面下方的["反馈问题"](https://github.com/taosdata/TDengine/issues/new/choose), 在GitHub上直接递交issue。
|
如果在开发过程中遇到任何问题,请点击每个页面下方的["反馈问题"](https://github.com/taosdata/TDengine/issues/new/choose), 在GitHub上直接递交issue。
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ TDengine 客户端驱动的动态库位于:
|
||||||
|
|
||||||
## 支持的平台
|
## 支持的平台
|
||||||
|
|
||||||
请参考[支持的平台列表](../connector#支持的平台)
|
请参考[支持的平台列表](../#支持的平台)
|
||||||
|
|
||||||
## 支持的版本
|
## 支持的版本
|
||||||
|
|
||||||
|
@ -30,7 +30,7 @@ TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一
|
||||||
|
|
||||||
## 安装步骤
|
## 安装步骤
|
||||||
|
|
||||||
TDengine 客户端驱动的安装请参考 [安装指南](../connector#安装步骤)
|
TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤)
|
||||||
|
|
||||||
## 建立连接
|
## 建立连接
|
||||||
|
|
|
@ -35,7 +35,7 @@ REST 连接支持所有能运行 Java 的平台。
|
||||||
|
|
||||||
## 版本支持
|
## 版本支持
|
||||||
|
|
||||||
请参考[版本支持列表](../connector#版本支持)
|
请参考[版本支持列表](../#版本支持)
|
||||||
|
|
||||||
## TDengine DataType 和 Java DataType
|
## TDengine DataType 和 Java DataType
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对
|
||||||
使用 Java Connector 连接数据库前,需要具备以下条件:
|
使用 Java Connector 连接数据库前,需要具备以下条件:
|
||||||
|
|
||||||
- 已安装 Java 1.8 或以上版本运行时环境和 Maven 3.6 或以上版本
|
- 已安装 Java 1.8 或以上版本运行时环境和 Maven 3.6 或以上版本
|
||||||
- 已安装 TDengine 客户端驱动(使用原生连接必须安装,使用 REST 连接无需安装),具体步骤请参考[安装客户端驱动](../connector#安装客户端驱动)
|
- 已安装 TDengine 客户端驱动(使用原生连接必须安装,使用 REST 连接无需安装),具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
|
||||||
|
|
||||||
### 安装连接器
|
### 安装连接器
|
||||||
|
|
||||||
|
@ -630,7 +630,7 @@ public void setNString(int columnIndex, ArrayList<String> list, int size) throws
|
||||||
|
|
||||||
### 无模式写入
|
### 无模式写入
|
||||||
|
|
||||||
TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../schemaless)。
|
TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../reference/schemaless/)。
|
||||||
|
|
||||||
**注意**:
|
**注意**:
|
||||||
|
|
|
@ -30,7 +30,7 @@ REST 连接支持所有能运行 Go 的平台。
|
||||||
|
|
||||||
## 版本支持
|
## 版本支持
|
||||||
|
|
||||||
请参考[版本支持列表](../connector#版本支持)
|
请参考[版本支持列表](../#版本支持)
|
||||||
|
|
||||||
## 支持的功能特性
|
## 支持的功能特性
|
||||||
|
|
||||||
|
@ -56,7 +56,7 @@ REST 连接支持所有能运行 Go 的平台。
|
||||||
### 安装前准备
|
### 安装前准备
|
||||||
|
|
||||||
* 安装 Go 开发环境(Go 1.14 及以上,GCC 4.8.5 及以上)
|
* 安装 Go 开发环境(Go 1.14 及以上,GCC 4.8.5 及以上)
|
||||||
* 如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../connector#安装客户端驱动)
|
* 如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
|
||||||
|
|
||||||
配置好环境变量,检查命令:
|
配置好环境变量,检查命令:
|
||||||
|
|
|
@ -28,7 +28,7 @@ Websocket 连接支持所有能运行 Rust 的平台。
|
||||||
|
|
||||||
## 版本支持
|
## 版本支持
|
||||||
|
|
||||||
请参考[版本支持列表](../connector#版本支持)
|
请参考[版本支持列表](../#版本支持)
|
||||||
|
|
||||||
Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。
|
Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容
|
||||||
### 安装前准备
|
### 安装前准备
|
||||||
|
|
||||||
* 安装 Rust 开发工具链
|
* 安装 Rust 开发工具链
|
||||||
* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../connector#安装客户端驱动)
|
* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
|
||||||
|
|
||||||
### 添加 taos 依赖
|
### 添加 taos 依赖
|
||||||
|
|
|
@ -8,7 +8,7 @@ description: "taospy 是 TDengine 的官方 Python 连接器。taospy 提供了
|
||||||
import Tabs from "@theme/Tabs";
|
import Tabs from "@theme/Tabs";
|
||||||
import TabItem from "@theme/TabItem";
|
import TabItem from "@theme/TabItem";
|
||||||
|
|
||||||
`taospy` 是 TDengine 的官方 Python 连接器。`taospy` 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。`taospy` 对 TDengine 的[原生接口](../connector/cpp)和 [REST 接口](../rest-api)都进行了封装, 分别对应 `taospy` 包的 `taos` 模块 和 `taosrest` 模块。
|
`taospy` 是 TDengine 的官方 Python 连接器。`taospy` 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。`taospy` 对 TDengine 的[原生接口](../cpp)和 [REST 接口](../rest-api)都进行了封装, 分别对应 `taospy` 包的 `taos` 模块 和 `taosrest` 模块。
|
||||||
除了对原生接口和 REST 接口的封装,`taospy` 还提供了符合 [Python 数据访问规范(PEP 249)](https://peps.python.org/pep-0249/) 的编程接口。这使得 `taospy` 和很多第三方工具集成变得简单,比如 [SQLAlchemy](https://www.sqlalchemy.org/) 和 [pandas](https://pandas.pydata.org/)。
|
除了对原生接口和 REST 接口的封装,`taospy` 还提供了符合 [Python 数据访问规范(PEP 249)](https://peps.python.org/pep-0249/) 的编程接口。这使得 `taospy` 和很多第三方工具集成变得简单,比如 [SQLAlchemy](https://www.sqlalchemy.org/) 和 [pandas](https://pandas.pydata.org/)。
|
||||||
|
|
||||||
使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口与服务端建立的连接的方式下文中称为“REST 连接”。
|
使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口与服务端建立的连接的方式下文中称为“REST 连接”。
|
||||||
|
@ -17,7 +17,7 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
|
||||||
|
|
||||||
## 支持的平台
|
## 支持的平台
|
||||||
|
|
||||||
- 原生连接[支持的平台](../connector/#支持的平台)和 TDengine 客户端支持的平台一致。
|
- 原生连接[支持的平台](../#支持的平台)和 TDengine 客户端支持的平台一致。
|
||||||
- REST 连接支持所有能运行 Python 的平台。
|
- REST 连接支持所有能运行 Python 的平台。
|
||||||
|
|
||||||
## 版本选择
|
## 版本选择
|
|
@ -28,7 +28,7 @@ REST 连接器支持所有能运行 Node.js 的平台。
|
||||||
|
|
||||||
## 版本支持
|
## 版本支持
|
||||||
|
|
||||||
请参考[版本支持列表](../connector#版本支持)
|
请参考[版本支持列表](../#版本支持)
|
||||||
|
|
||||||
## 支持的功能特性
|
## 支持的功能特性
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ REST 连接器支持所有能运行 Node.js 的平台。
|
||||||
### 安装前准备
|
### 安装前准备
|
||||||
|
|
||||||
- 安装 Node.js 开发环境
|
- 安装 Node.js 开发环境
|
||||||
- 如果使用 REST 连接器,跳过此步。但如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../connector#安装客户端驱动)。我们使用 [node-gyp](https://github.com/nodejs/node-gyp) 和 TDengine 实例进行交互,还需要根据具体操作系统来安装下文提到的一些依赖工具。
|
- 如果使用 REST 连接器,跳过此步。但如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)。我们使用 [node-gyp](https://github.com/nodejs/node-gyp) 和 TDengine 实例进行交互,还需要根据具体操作系统来安装下文提到的一些依赖工具。
|
||||||
|
|
||||||
<Tabs defaultValue="Linux">
|
<Tabs defaultValue="Linux">
|
||||||
<TabItem value="Linux" label="Linux 系统安装依赖工具">
|
<TabItem value="Linux" label="Linux 系统安装依赖工具">
|
|
@ -32,7 +32,7 @@ import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx"
|
||||||
|
|
||||||
## 版本支持
|
## 版本支持
|
||||||
|
|
||||||
请参考[版本支持列表](../connector/#版本支持)
|
请参考[版本支持列表](../#版本支持)
|
||||||
|
|
||||||
## 支持的功能特性
|
## 支持的功能特性
|
||||||
|
|
||||||
|
@ -49,7 +49,7 @@ import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx"
|
||||||
|
|
||||||
* 安装 [.NET SDK](https://dotnet.microsoft.com/download)
|
* 安装 [.NET SDK](https://dotnet.microsoft.com/download)
|
||||||
* [Nuget 客户端](https://docs.microsoft.com/en-us/nuget/install-nuget-client-tools) (可选安装)
|
* [Nuget 客户端](https://docs.microsoft.com/en-us/nuget/install-nuget-client-tools) (可选安装)
|
||||||
* 安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../connector/#安装客户端驱动)
|
* 安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
|
||||||
|
|
||||||
### 使用 dotnet CLI 安装
|
### 使用 dotnet CLI 安装
|
||||||
|
|
|
@ -38,7 +38,7 @@ TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一
|
||||||
|
|
||||||
### 安装 TDengine 客户端驱动
|
### 安装 TDengine 客户端驱动
|
||||||
|
|
||||||
TDengine 客户端驱动的安装请参考 [安装指南](../connector#安装步骤)
|
TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤)
|
||||||
|
|
||||||
### 编译安装 php-tdengine
|
### 编译安装 php-tdengine
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
---
|
---
|
||||||
|
sidebar_label: 连接器
|
||||||
title: 连接器
|
title: 连接器
|
||||||
|
description: 详细介绍各种语言的连接器及 REST API
|
||||||
---
|
---
|
||||||
|
|
||||||
TDengine 提供了丰富的应用程序开发接口,为了便于用户快速开发自己的应用,TDengine 支持了多种编程语言的连接器,其中官方连接器包括支持 C/C++、Java、Python、Go、Node.js、C# 和 Rust 的连接器。这些连接器支持使用原生接口(taosc)和 REST 接口(部分语言暂不支持)连接 TDengine 集群。社区开发者也贡献了多个非官方连接器,例如 ADO.NET 连接器、Lua 连接器和 PHP 连接器。
|
TDengine 提供了丰富的应用程序开发接口,为了便于用户快速开发自己的应用,TDengine 支持了多种编程语言的连接器,其中官方连接器包括支持 C/C++、Java、Python、Go、Node.js、C# 和 Rust 的连接器。这些连接器支持使用原生接口(taosc)和 REST 接口(部分语言暂不支持)连接 TDengine 集群。社区开发者也贡献了多个非官方连接器,例如 ADO.NET 连接器、Lua 连接器和 PHP 连接器。
|
|
@ -18,7 +18,7 @@ stream_options: {
|
||||||
其中 subquery 是 select 普通查询语法的子集:
|
其中 subquery 是 select 普通查询语法的子集:
|
||||||
|
|
||||||
```sql
|
```sql
|
||||||
subquery: SELECT [DISTINCT] select_list
|
subquery: SELECT select_list
|
||||||
from_clause
|
from_clause
|
||||||
[WHERE condition]
|
[WHERE condition]
|
||||||
[PARTITION BY tag_list]
|
[PARTITION BY tag_list]
|
||||||
|
@ -37,7 +37,7 @@ window_clause: {
|
||||||
|
|
||||||
其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。
|
其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。
|
||||||
|
|
||||||
窗口的定义与时序数据特色查询中的定义完全相同。
|
窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished)
|
||||||
|
|
||||||
例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。
|
例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// clang-format off
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
@ -94,13 +95,8 @@ int32_t create_stream() {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
/*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
|
|
||||||
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
|
||||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
|
||||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
|
||||||
pRes = taos_query(pConn,
|
pRes = taos_query(pConn,
|
||||||
"create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, "
|
"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, k from st1 partition by tbname state_window(k)");
|
||||||
"count(k) from st1 partition by tbname interval(20s) ");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -29,7 +29,7 @@ typedef void* DataSinkHandle;
|
||||||
struct SRpcMsg;
|
struct SRpcMsg;
|
||||||
struct SSubplan;
|
struct SSubplan;
|
||||||
|
|
||||||
typedef struct SReadHandle {
|
typedef struct {
|
||||||
void* tqReader;
|
void* tqReader;
|
||||||
void* meta;
|
void* meta;
|
||||||
void* config;
|
void* config;
|
||||||
|
@ -41,6 +41,7 @@ typedef struct SReadHandle {
|
||||||
bool initTableReader;
|
bool initTableReader;
|
||||||
bool initTqReader;
|
bool initTqReader;
|
||||||
int32_t numOfVgroups;
|
int32_t numOfVgroups;
|
||||||
|
void* pStateBackend;
|
||||||
} SReadHandle;
|
} SReadHandle;
|
||||||
|
|
||||||
// in queue mode, data streams are seperated by msg
|
// in queue mode, data streams are seperated by msg
|
||||||
|
@ -163,7 +164,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
|
||||||
|
|
||||||
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
|
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
|
||||||
|
|
||||||
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList/*,int32_t* resNum, SExplainExecInfo** pRes*/);
|
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/);
|
||||||
|
|
||||||
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
|
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
|
||||||
|
|
||||||
|
|
|
@ -263,6 +263,14 @@ typedef struct {
|
||||||
SArray* checkpointVer;
|
SArray* checkpointVer;
|
||||||
} SStreamRecoveringState;
|
} SStreamRecoveringState;
|
||||||
|
|
||||||
|
// incremental state storage
|
||||||
|
typedef struct {
|
||||||
|
SStreamTask* pOwner;
|
||||||
|
TDB* db;
|
||||||
|
TTB* pStateDb;
|
||||||
|
TXN txn;
|
||||||
|
} SStreamState;
|
||||||
|
|
||||||
typedef struct SStreamTask {
|
typedef struct SStreamTask {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
|
@ -312,6 +320,10 @@ typedef struct SStreamTask {
|
||||||
|
|
||||||
// msg handle
|
// msg handle
|
||||||
SMsgCb* pMsgCb;
|
SMsgCb* pMsgCb;
|
||||||
|
|
||||||
|
// state backend
|
||||||
|
SStreamState* pState;
|
||||||
|
|
||||||
} SStreamTask;
|
} SStreamTask;
|
||||||
|
|
||||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
||||||
|
@ -507,7 +519,7 @@ typedef struct SStreamMeta {
|
||||||
char* path;
|
char* path;
|
||||||
TDB* db;
|
TDB* db;
|
||||||
TTB* pTaskDb;
|
TTB* pTaskDb;
|
||||||
TTB* pStateDb;
|
TTB* pCheckpointDb;
|
||||||
SHashObj* pTasks;
|
SHashObj* pTasks;
|
||||||
SHashObj* pRecoverStatus;
|
SHashObj* pRecoverStatus;
|
||||||
void* ahandle;
|
void* ahandle;
|
||||||
|
@ -528,6 +540,36 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaRollBack(SStreamMeta* pMeta);
|
int32_t streamMetaRollBack(SStreamMeta* pMeta);
|
||||||
int32_t streamLoadTasks(SStreamMeta* pMeta);
|
int32_t streamLoadTasks(SStreamMeta* pMeta);
|
||||||
|
|
||||||
|
SStreamState* streamStateOpen(char* path, SStreamTask* pTask);
|
||||||
|
void streamStateClose(SStreamState* pState);
|
||||||
|
int32_t streamStateBegin(SStreamState* pState);
|
||||||
|
int32_t streamStateCommit(SStreamState* pState);
|
||||||
|
int32_t streamStateAbort(SStreamState* pState);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
TBC* pCur;
|
||||||
|
} SStreamStateCur;
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen);
|
||||||
|
int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen);
|
||||||
|
int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen);
|
||||||
|
|
||||||
|
SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen);
|
||||||
|
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen);
|
||||||
|
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen);
|
||||||
|
void streamStateFreeCur(SStreamStateCur* pCur);
|
||||||
|
|
||||||
|
int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
|
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur);
|
||||||
|
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
|
||||||
|
|
||||||
|
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
|
||||||
|
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -5,7 +5,9 @@ target_link_libraries(
|
||||||
PUBLIC sut
|
PUBLIC sut
|
||||||
)
|
)
|
||||||
|
|
||||||
add_test(
|
if(NOT ${TD_WINDOWS})
|
||||||
NAME smaTest
|
add_test(
|
||||||
COMMAND smaTest
|
NAME smaTest
|
||||||
)
|
COMMAND smaTest
|
||||||
|
)
|
||||||
|
endif(NOT ${TD_WINDOWS})
|
||||||
|
|
|
@ -5,7 +5,9 @@ target_link_libraries(
|
||||||
PUBLIC sut
|
PUBLIC sut
|
||||||
)
|
)
|
||||||
|
|
||||||
add_test(
|
if(NOT ${TD_WINDOWS})
|
||||||
NAME stbTest
|
add_test(
|
||||||
COMMAND stbTest
|
NAME stbTest
|
||||||
)
|
COMMAND stbTest
|
||||||
|
)
|
||||||
|
endif(NOT ${TD_WINDOWS})
|
|
@ -79,6 +79,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (streamLoadTasks(pTq->pStreamMeta) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
return pTq;
|
return pTq;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -664,6 +668,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
||||||
ASSERT(pTask->exec.executor);
|
ASSERT(pTask->exec.executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
|
||||||
|
if (pTask->pState == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
// sink
|
// sink
|
||||||
/*pTask->ahandle = pTq->pVnode;*/
|
/*pTask->ahandle = pTq->pVnode;*/
|
||||||
if (pTask->outputType == TASK_OUTPUT__SMA) {
|
if (pTask->outputType == TASK_OUTPUT__SMA) {
|
||||||
|
|
|
@ -150,6 +150,7 @@ typedef struct {
|
||||||
SQueryTableDataCond tableCond;
|
SQueryTableDataCond tableCond;
|
||||||
int64_t recoverStartVer;
|
int64_t recoverStartVer;
|
||||||
int64_t recoverEndVer;
|
int64_t recoverEndVer;
|
||||||
|
SStreamState* pState;
|
||||||
} SStreamTaskInfo;
|
} SStreamTaskInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -392,7 +392,7 @@ static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
|
||||||
|
|
||||||
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
|
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
|
||||||
pCtx->input.colDataAggIsSet = pStatus->hasAgg;
|
pCtx->input.colDataAggIsSet = pStatus->hasAgg;
|
||||||
pCtx->input.numOfRows = pStatus->numOfRows;
|
pCtx->input.numOfRows = pStatus->numOfRows;
|
||||||
pCtx->input.startRowIndex = pStatus->startOffset;
|
pCtx->input.startRowIndex = pStatus->startOffset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3715,7 +3715,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
||||||
const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
|
const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
|
||||||
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
|
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
|
||||||
|
|
||||||
int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
|
int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
|
||||||
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
|
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
|
||||||
w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);
|
w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);
|
||||||
|
|
||||||
|
@ -3988,15 +3988,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
|
|
||||||
bool assignUid = groupbyTbname(group);
|
bool assignUid = groupbyTbname(group);
|
||||||
|
|
||||||
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
||||||
|
|
||||||
if(assignUid){
|
if (assignUid) {
|
||||||
for (int32_t i = 0; i < numOfTables; i++) {
|
for (int32_t i = 0; i < numOfTables; i++) {
|
||||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
info->groupId = info->uid;
|
info->groupId = info->uid;
|
||||||
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
|
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
|
||||||
}
|
}
|
||||||
}else{
|
} else {
|
||||||
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
|
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -4615,6 +4615,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
|
||||||
goto _complete;
|
goto _complete;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pHandle && pHandle->pStateBackend) {
|
||||||
|
(*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
|
||||||
|
}
|
||||||
|
|
||||||
(*pTaskInfo)->sql = sql;
|
(*pTaskInfo)->sql = sql;
|
||||||
sql = NULL;
|
sql = NULL;
|
||||||
(*pTaskInfo)->pSubplan = pPlan;
|
(*pTaskInfo)->pSubplan = pPlan;
|
||||||
|
|
|
@ -3128,8 +3128,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||||
maxTs = TMAX(maxTs, pBlock->info.watermark);
|
maxTs = TMAX(maxTs, pBlock->info.watermark);
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA ||
|
ASSERT(pBlock->info.type != STREAM_INVERT);
|
||||||
pBlock->info.type == STREAM_INVALID) {
|
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
|
||||||
pInfo->binfo.pRes->info.type = pBlock->info.type;
|
pInfo->binfo.pRes->info.type = pBlock->info.type;
|
||||||
} else if (pBlock->info.type == STREAM_CLEAR) {
|
} else if (pBlock->info.type == STREAM_CLEAR) {
|
||||||
SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes));
|
SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes));
|
||||||
|
|
|
@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: handle version
|
|
||||||
int32_t streamExecForAll(SStreamTask* pTask) {
|
int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t batchCnt = 1;
|
int32_t batchCnt = 1;
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "tstream.h"
|
#include "streamInc.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
|
||||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
|
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
|
||||||
|
@ -23,17 +23,23 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pMeta->path = strdup(path);
|
int32_t len = strlen(path) + 20;
|
||||||
|
char* streamPath = taosMemoryCalloc(1, len);
|
||||||
|
sprintf(streamPath, "%s/%s", path, "stream");
|
||||||
|
pMeta->path = strdup(streamPath);
|
||||||
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
|
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
|
||||||
|
mkdir(streamPath, 0755);
|
||||||
|
taosMemoryFree(streamPath);
|
||||||
|
|
||||||
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) {
|
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// open state storage backend
|
if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) {
|
||||||
if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pStateDb) < 0) {
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,16 +55,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
pMeta->ahandle = ahandle;
|
pMeta->ahandle = ahandle;
|
||||||
pMeta->expandFunc = expandFunc;
|
pMeta->expandFunc = expandFunc;
|
||||||
|
|
||||||
if (streamLoadTasks(pMeta) < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
return pMeta;
|
return pMeta;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
if (pMeta->path) taosMemoryFree(pMeta->path);
|
if (pMeta->path) taosMemoryFree(pMeta->path);
|
||||||
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
|
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
|
||||||
if (pMeta->pStateDb) tdbTbClose(pMeta->pStateDb);
|
|
||||||
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
|
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
|
||||||
|
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
|
||||||
if (pMeta->db) tdbClose(pMeta->db);
|
if (pMeta->db) tdbClose(pMeta->db);
|
||||||
taosMemoryFree(pMeta);
|
taosMemoryFree(pMeta);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -67,7 +70,7 @@ _err:
|
||||||
void streamMetaClose(SStreamMeta* pMeta) {
|
void streamMetaClose(SStreamMeta* pMeta) {
|
||||||
tdbCommit(pMeta->db, &pMeta->txn);
|
tdbCommit(pMeta->db, &pMeta->txn);
|
||||||
tdbTbClose(pMeta->pTaskDb);
|
tdbTbClose(pMeta->pTaskDb);
|
||||||
tdbTbClose(pMeta->pStateDb);
|
tdbTbClose(pMeta->pCheckpointDb);
|
||||||
tdbClose(pMeta->db);
|
tdbClose(pMeta->db);
|
||||||
|
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
|
|
|
@ -176,6 +176,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
|
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
|
#if 0
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
|
|
||||||
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
|
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
|
||||||
|
@ -224,10 +225,12 @@ int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
FAIL:
|
FAIL:
|
||||||
if (buf) taosMemoryFree(buf);
|
if (buf) taosMemoryFree(buf);
|
||||||
return -1;
|
return -1;
|
||||||
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
|
int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
|
#if 0
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t vLen = 0;
|
int32_t vLen = 0;
|
||||||
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
|
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
|
||||||
|
@ -241,7 +244,7 @@ int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
|
|
||||||
pTask->nextCheckId = aggCheckpoint.checkpointId + 1;
|
pTask->nextCheckId = aggCheckpoint.checkpointId + 1;
|
||||||
pTask->checkpointInfo = aggCheckpoint.checkpointVer;
|
pTask->checkpointInfo = aggCheckpoint.checkpointVer;
|
||||||
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,187 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "executor.h"
|
||||||
|
#include "streamInc.h"
|
||||||
|
#include "ttimer.h"
|
||||||
|
|
||||||
|
SStreamState* streamStateOpen(char* path, SStreamTask* pTask) {
|
||||||
|
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||||
|
if (pState == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
char statePath[200];
|
||||||
|
sprintf(statePath, "%s/%d", path, pTask->taskId);
|
||||||
|
if (tdbOpen(statePath, 16 * 1024, 1, &pState->db) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// open state storage backend
|
||||||
|
if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pState->db, &pState->pStateDb) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
pState->pOwner = pTask;
|
||||||
|
|
||||||
|
return pState;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
if (pState->pStateDb) tdbTbClose(pState->pStateDb);
|
||||||
|
if (pState->db) tdbClose(pState->db);
|
||||||
|
taosMemoryFree(pState);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamStateClose(SStreamState* pState) {
|
||||||
|
tdbCommit(pState->db, &pState->txn);
|
||||||
|
tdbTbClose(pState->pStateDb);
|
||||||
|
tdbClose(pState->db);
|
||||||
|
|
||||||
|
taosMemoryFree(pState);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateBegin(SStreamState* pState) {
|
||||||
|
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
||||||
|
0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbBegin(pState->db, &pState->txn) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateCommit(SStreamState* pState) {
|
||||||
|
if (tdbCommit(pState->db, &pState->txn) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
memset(&pState->txn, 0, sizeof(TXN));
|
||||||
|
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
||||||
|
0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (tdbBegin(pState->db, &pState->txn) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateAbort(SStreamState* pState) {
|
||||||
|
if (tdbAbort(pState->db, &pState->txn) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
memset(&pState->txn, 0, sizeof(TXN));
|
||||||
|
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
||||||
|
0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (tdbBegin(pState->db, &pState->txn) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen) {
|
||||||
|
return tdbTbUpsert(pState->pStateDb, key, kLen, value, vLen, &pState->txn);
|
||||||
|
}
|
||||||
|
int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen) {
|
||||||
|
return tdbTbGet(pState->pStateDb, key, kLen, pVal, pVLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen) {
|
||||||
|
return tdbTbDelete(pState->pStateDb, key, kLen, &pState->txn);
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen) {
|
||||||
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
|
if (pCur == NULL) return NULL;
|
||||||
|
tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL);
|
||||||
|
|
||||||
|
int32_t c;
|
||||||
|
tdbTbcMoveTo(pCur->pCur, key, kLen, &c);
|
||||||
|
if (c != 0) {
|
||||||
|
taosMemoryFree(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen) {
|
||||||
|
return tdbTbcGet(pCur->pCur, (const void**)pKey, pKLen, (const void**)pVal, pVLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
|
||||||
|
//
|
||||||
|
return tdbTbcMoveToFirst(pCur->pCur);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
|
||||||
|
//
|
||||||
|
return tdbTbcMoveToLast(pCur->pCur);
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen) {
|
||||||
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
|
if (pCur == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t c;
|
||||||
|
if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) {
|
||||||
|
taosMemoryFree(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (c > 0) return pCur;
|
||||||
|
|
||||||
|
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
|
||||||
|
taosMemoryFree(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pCur;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen) {
|
||||||
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
|
if (pCur == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t c;
|
||||||
|
if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) {
|
||||||
|
taosMemoryFree(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (c < 0) return pCur;
|
||||||
|
|
||||||
|
if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
|
||||||
|
taosMemoryFree(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pCur;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
|
||||||
|
//
|
||||||
|
return tdbTbcMoveToNext(pCur->pCur);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
|
||||||
|
//
|
||||||
|
return tdbTbcMoveToPrev(pCur->pCur);
|
||||||
|
}
|
|
@ -165,5 +165,8 @@ void tFreeSStreamTask(SStreamTask* pTask) {
|
||||||
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
|
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pTask->pState) streamStateClose(pTask->pState);
|
||||||
|
|
||||||
taosMemoryFree(pTask);
|
taosMemoryFree(pTask);
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,7 +121,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
|
||||||
if (found == NULL) {
|
if (found == NULL) {
|
||||||
// file corrupted, no complete log
|
// file corrupted, no complete log
|
||||||
// TODO delete and search in previous files
|
// TODO delete and search in previous files
|
||||||
ASSERT(0);
|
/*ASSERT(0);*/
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -221,7 +221,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
|
|
||||||
int code = walSaveMeta(pWal);
|
int code = walSaveMeta(pWal);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
taosArrayDestroy(actualLog);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue