Merge branch '3.0' of github.com:taosdata/TDengine into szhou/fixbugs
This commit is contained in:
commit
3f3c1b9e8d
|
@ -1,59 +1,6 @@
|
|||
import taos
|
||||
from taos.tmq import *
|
||||
|
||||
conn = taos.connect()
|
||||
|
||||
# create database
|
||||
conn.execute("drop database if exists py_tmq")
|
||||
conn.execute("create database if not exists py_tmq vgroups 2")
|
||||
|
||||
# create table and stables
|
||||
conn.select_db("py_tmq")
|
||||
conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
|
||||
conn.execute("create table if not exists tb1 using stb1 tags(1)")
|
||||
conn.execute("create table if not exists tb2 using stb1 tags(2)")
|
||||
conn.execute("create table if not exists tb3 using stb1 tags(3)")
|
||||
|
||||
# create topic
|
||||
conn.execute("drop topic if exists topic_ctb_column")
|
||||
conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
|
||||
|
||||
# set consumer configure options
|
||||
conf = TaosTmqConf()
|
||||
conf.set("group.id", "tg2")
|
||||
conf.set("td.connect.user", "root")
|
||||
conf.set("td.connect.pass", "taosdata")
|
||||
conf.set("enable.auto.commit", "true")
|
||||
conf.set("msg.with.table.name", "true")
|
||||
|
||||
def tmq_commit_cb_print(tmq, resp, offset, param=None):
|
||||
print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
|
||||
|
||||
conf.set_auto_commit_cb(tmq_commit_cb_print, None)
|
||||
|
||||
# build consumer
|
||||
tmq = conf.new_consumer()
|
||||
|
||||
# build topic list
|
||||
topic_list = TaosTmqList()
|
||||
topic_list.append("topic_ctb_column")
|
||||
|
||||
# subscribe consumer
|
||||
tmq.subscribe(topic_list)
|
||||
|
||||
# check subscriptions
|
||||
sub_list = tmq.subscription()
|
||||
print("subscribed topics: ",sub_list)
|
||||
|
||||
# start subscribe
|
||||
while 1:
|
||||
res = tmq.poll(1000)
|
||||
if res:
|
||||
topic = res.get_topic_name()
|
||||
vg = res.get_vgroup_id()
|
||||
db = res.get_db_name()
|
||||
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
|
||||
for row in res:
|
||||
print(row)
|
||||
tb = res.get_table_name()
|
||||
print(f"from table: {tb}")
|
||||
from taos.tmq import TaosConsumer
|
||||
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
|
||||
for msg in consumer:
|
||||
for row in msg:
|
||||
print(row)
|
||||
|
|
|
@ -2,11 +2,8 @@
|
|||
sidebar_label: Docker
|
||||
title: 通过 Docker 快速体验 TDengine
|
||||
---
|
||||
:::info
|
||||
如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考[TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
|
||||
:::
|
||||
|
||||
本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。
|
||||
本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用[安装包的方式快速体验](../../get-started/package/)。如果您希望为 TDengine 贡献代码或对内部技术实现感兴趣,请参考 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
|
||||
|
||||
## 启动 TDengine
|
||||
|
||||
|
|
|
@ -5,17 +5,70 @@ title: 使用安装包立即开始
|
|||
|
||||
import Tabs from "@theme/Tabs";
|
||||
import TabItem from "@theme/TabItem";
|
||||
import PkgListV3 from "/components/PkgListV3";
|
||||
|
||||
:::info
|
||||
如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
|
||||
|
||||
:::
|
||||
|
||||
在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。
|
||||
在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。您也可以[用Docker立即体验](../../get-started/docker/)。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
|
||||
|
||||
## 安装
|
||||
|
||||
:::info
|
||||
下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases)
|
||||
:::
|
||||
|
||||
<Tabs>
|
||||
<TabItem label="Deb 安装" value="debinst">
|
||||
|
||||
1. 从列表中下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb;
|
||||
<PkgListV3 type={6}/>
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.deb 安装包所在目录,执行如下的安装命令:
|
||||
|
||||
```bash
|
||||
sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="RPM 安装" value="rpminst">
|
||||
|
||||
1. 从列表中下载获得 rpm 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.rpm;
|
||||
<PkgListV3 type={5}/>
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.rpm 安装包所在目录,执行如下的安装命令:
|
||||
|
||||
```bash
|
||||
sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="tar.gz 安装" value="tarinst">
|
||||
|
||||
1. 从列表中下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz;
|
||||
<PkgListV3 type={0}/>
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本:
|
||||
|
||||
```bash
|
||||
tar -zxvf TDengine-server-3.0.0.0-Linux-x64.tar.gz
|
||||
```
|
||||
|
||||
解压后进入相应路径,执行
|
||||
|
||||
```bash
|
||||
sudo ./install.sh
|
||||
```
|
||||
|
||||
:::info
|
||||
install.sh 安装脚本在执行过程中,会通过命令行交互界面询问一些配置信息。如果希望采取无交互安装方式,那么可以用 -e no 参数来执行 install.sh 脚本。运行 `./install.sh -h` 指令可以查看所有参数的详细说明信息。
|
||||
:::
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Windows 安装" value="windows">
|
||||
|
||||
1. 从列表中下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe;
|
||||
<PkgListV3 type={3}/>
|
||||
2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="apt-get" label="apt-get">
|
||||
可以使用 apt-get 工具从官方仓库安装。
|
||||
|
||||
|
@ -44,56 +97,6 @@ sudo apt-get install tdengine
|
|||
:::tip
|
||||
apt-get 方式只适用于 Debian 或 Ubuntu 系统
|
||||
::::
|
||||
</TabItem>
|
||||
<TabItem label="Deb 安装" value="debinst">
|
||||
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb;
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.deb 安装包所在目录,执行如下的安装命令:
|
||||
|
||||
```bash
|
||||
sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="RPM 安装" value="rpminst">
|
||||
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 rpm 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.rpm;
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.rpm 安装包所在目录,执行如下的安装命令:
|
||||
|
||||
```bash
|
||||
sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="tar.gz 安装" value="tarinst">
|
||||
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz;
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本:
|
||||
|
||||
```bash
|
||||
tar -zxvf TDengine-server-3.0.0.0-Linux-x64.tar.gz
|
||||
```
|
||||
|
||||
解压后进入相应路径,执行
|
||||
|
||||
```bash
|
||||
sudo ./install.sh
|
||||
```
|
||||
|
||||
:::info
|
||||
install.sh 安装脚本在执行过程中,会通过命令行交互界面询问一些配置信息。如果希望采取无交互安装方式,那么可以用 -e no 参数来执行 install.sh 脚本。运行 `./install.sh -h` 指令可以查看所有参数的详细说明信息。
|
||||
|
||||
:::
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Windows 安装" value="windows">
|
||||
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe;
|
||||
2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
|
|
|
@ -88,6 +88,72 @@ void close() throws SQLException;
|
|||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
```python
|
||||
class TaosConsumer():
|
||||
def __init__(self, *topics, **configs)
|
||||
|
||||
def __iter__(self)
|
||||
|
||||
def __next__(self)
|
||||
|
||||
def sync_next(self)
|
||||
|
||||
def subscription(self)
|
||||
|
||||
def unsubscribe(self)
|
||||
|
||||
def close(self)
|
||||
|
||||
def __del__(self)
|
||||
```
|
||||
</TabItem>
|
||||
<TabItem label="Go" value="Go">
|
||||
|
||||
```go
|
||||
func NewConsumer(conf *Config) (*Consumer, error)
|
||||
|
||||
func (c *Consumer) Close() error
|
||||
|
||||
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
|
||||
|
||||
func (c *Consumer) FreeMessage(message unsafe.Pointer)
|
||||
|
||||
func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
|
||||
|
||||
func (c *Consumer) Subscribe(topics []string) error
|
||||
|
||||
func (c *Consumer) Unsubscribe() error
|
||||
```
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="C#" label="C#">
|
||||
|
||||
```C#
|
||||
ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
|
||||
|
||||
virtual IConsumer Build()
|
||||
|
||||
Consumer(ConsumerBuilder builder)
|
||||
|
||||
void Subscribe(IEnumerable<string> topics)
|
||||
|
||||
void Subscribe(string topic)
|
||||
|
||||
ConsumeResult Consume(int millisecondsTimeout)
|
||||
|
||||
List<string> Subscription()
|
||||
|
||||
void Unsubscribe()
|
||||
|
||||
void Commit(ConsumeResult consumerResult)
|
||||
|
||||
void Close()
|
||||
|
||||
```
|
||||
</TabItem>
|
||||
|
||||
</Tabs>
|
||||
|
||||
## 写入数据
|
||||
|
@ -230,6 +296,105 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
|
|||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
|
||||
Python 使用以下配置项创建一个 Consumer 实例。
|
||||
|
||||
| 参数名称 | 类型 | 参数说明 | 备注 |
|
||||
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
|
||||
| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
|
||||
| `client_id` | string | 客户端 ID | 最大长度:192。 |
|
||||
| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
|
||||
| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 |
|
||||
| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | |
|
||||
| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
|
||||
| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` |
|
||||
| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` |
|
||||
| `timeout` | int | 消费者拉去的超时时间 | |
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Go" value="Go">
|
||||
|
||||
```go
|
||||
config := tmq.NewConfig()
|
||||
defer config.Destroy()
|
||||
err = config.SetGroupID("test")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetAutoOffsetReset("earliest")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetConnectIP("127.0.0.1")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetConnectUser("root")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetConnectPass("taosdata")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetConnectPort("6030")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetMsgWithTableName(true)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.EnableHeartBeat()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
|
||||
if result.ErrCode != 0 {
|
||||
errStr := wrapper.TMQErr2Str(result.ErrCode)
|
||||
err := errors.NewError(int(result.ErrCode), errStr)
|
||||
panic(err)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="C#" label="C#">
|
||||
|
||||
```C#
|
||||
using TDengineTMQ;
|
||||
|
||||
// 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、
|
||||
// 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数
|
||||
var cfg = new ConsumerConfig
|
||||
{
|
||||
EnableAutoCommit = "true"
|
||||
AutoCommitIntervalMs = "1000"
|
||||
GourpId = "TDengine-TMQ-C#",
|
||||
TDConnectUser = "root",
|
||||
TDConnectPasswd = "taosdata",
|
||||
AutoOffsetReset = "earliest"
|
||||
MsgWithTableName = "true",
|
||||
TDConnectIp = "127.0.0.1",
|
||||
TDConnectPort = "6030"
|
||||
};
|
||||
|
||||
var consumer = new ConsumerBuilder(cfg).Build();
|
||||
|
||||
```
|
||||
</TabItem>
|
||||
|
||||
</Tabs>
|
||||
|
||||
上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。
|
||||
|
@ -261,6 +426,42 @@ consumer.subscribe(topics);
|
|||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="Go" label="Go">
|
||||
|
||||
```go
|
||||
consumer, err := tmq.NewConsumer(config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = consumer.Subscribe([]string{"example_tmq_topic"})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
|
||||
<TabItem value="C#" label="C#">
|
||||
|
||||
```C#
|
||||
// 创建订阅 topics 列表
|
||||
List<String> topics = new List<string>();
|
||||
topics.add("tmq_topic");
|
||||
// 启动订阅
|
||||
consumer.Subscribe(topics);
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
```python
|
||||
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
|
||||
```
|
||||
</TabItem>
|
||||
|
||||
|
||||
|
||||
</Tabs>
|
||||
|
||||
|
@ -293,9 +494,50 @@ while(running){
|
|||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
```python
|
||||
for msg in consumer:
|
||||
for row in msg:
|
||||
print(row)
|
||||
```
|
||||
</TabItem>
|
||||
|
||||
|
||||
<TabItem value="Go" label="Go">
|
||||
|
||||
```go
|
||||
for {
|
||||
result, err := consumer.Poll(time.Second)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fmt.Println(result)
|
||||
consumer.Commit(context.Background(), result.Message)
|
||||
consumer.FreeMessage(result.Message)
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="C#" label="C#">
|
||||
|
||||
```C#
|
||||
// 消费数据
|
||||
while (true)
|
||||
{
|
||||
var consumerRes = consumer.Consume(100);
|
||||
// process ConsumeResult
|
||||
ProcessMsg(consumerRes);
|
||||
consumer.Commit(consumerRes);
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
|
||||
## 结束消费
|
||||
|
||||
消费结束后,应当取消订阅。
|
||||
|
@ -323,6 +565,35 @@ consumer.close();
|
|||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
|
||||
```python
|
||||
/* 取消订阅 */
|
||||
consumer.unsubscribe();
|
||||
|
||||
/* 关闭消费 */
|
||||
consumer.close();
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Go" label="Go">
|
||||
|
||||
```go
|
||||
consumer.Close()
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="C#" label="C#">
|
||||
|
||||
```C#
|
||||
// 取消订阅
|
||||
consumer.Unsubscribe();
|
||||
|
||||
// 关闭消费
|
||||
consumer.Close();
|
||||
</TabItem>
|
||||
|
||||
</Tabs>
|
||||
|
||||
## 删除 *topic*
|
||||
|
@ -670,65 +941,15 @@ int main(int argc, char* argv[]) {
|
|||
<TabItem label="Python" value="Python">
|
||||
|
||||
```python
|
||||
import taos
|
||||
from taos.tmq import TaosConsumer
|
||||
|
||||
import taos
|
||||
from taos.tmq import *
|
||||
|
||||
conn = taos.connect()
|
||||
|
||||
# create database
|
||||
conn.execute("drop database if exists py_tmq")
|
||||
conn.execute("create database if not exists py_tmq vgroups 2")
|
||||
|
||||
# create table and stables
|
||||
conn.select_db("py_tmq")
|
||||
conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
|
||||
conn.execute("create table if not exists tb1 using stb1 tags(1)")
|
||||
conn.execute("create table if not exists tb2 using stb1 tags(2)")
|
||||
conn.execute("create table if not exists tb3 using stb1 tags(3)")
|
||||
|
||||
# create topic
|
||||
conn.execute("drop topic if exists topic_ctb_column")
|
||||
conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
|
||||
|
||||
# set consumer configure options
|
||||
conf = TaosTmqConf()
|
||||
conf.set("group.id", "tg2")
|
||||
conf.set("td.connect.user", "root")
|
||||
conf.set("td.connect.pass", "taosdata")
|
||||
conf.set("enable.auto.commit", "true")
|
||||
conf.set("msg.with.table.name", "true")
|
||||
|
||||
def tmq_commit_cb_print(tmq, resp, offset, param=None):
|
||||
print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
|
||||
|
||||
conf.set_auto_commit_cb(tmq_commit_cb_print, None)
|
||||
|
||||
# build consumer
|
||||
tmq = conf.new_consumer()
|
||||
|
||||
# build topic list
|
||||
topic_list = TaosTmqList()
|
||||
topic_list.append("topic_ctb_column")
|
||||
|
||||
# subscribe consumer
|
||||
tmq.subscribe(topic_list)
|
||||
|
||||
# check subscriptions
|
||||
sub_list = tmq.subscription()
|
||||
print("subscribed topics: ",sub_list)
|
||||
|
||||
# start subscribe
|
||||
while 1:
|
||||
res = tmq.poll(1000)
|
||||
if res:
|
||||
topic = res.get_topic_name()
|
||||
vg = res.get_vgroup_id()
|
||||
db = res.get_db_name()
|
||||
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
|
||||
for row in res:
|
||||
print(row)
|
||||
tb = res.get_table_name()
|
||||
print(f"from table: {tb}")
|
||||
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
|
||||
for msg in consumer:
|
||||
for row in msg:
|
||||
print(row)
|
||||
|
||||
```
|
||||
|
||||
|
@ -745,3 +966,5 @@ while 1:
|
|||
</TabItem>
|
||||
|
||||
</Tabs>
|
||||
|
||||
|
||||
|
|
|
@ -6,9 +6,9 @@ description: "TDengine 3.0 版本的语法变更说明"
|
|||
|
||||
## SQL 基本元素变更
|
||||
|
||||
| # | **元素** | **差异性** | **说明** |
|
||||
| - | :------- | :--------: | :------- |
|
||||
| 1 | VARCHAR | <div style="width: 40pt">新增</div> | BINARY类型的别名。
|
||||
| # | **元素** | **<div style={{width: 60}}>差异性</div>** | **说明** |
|
||||
| - | :------- | :-------- | :------- |
|
||||
| 1 | VARCHAR | 新增 | BINARY类型的别名。
|
||||
| 2 | TIMESTAMP字面量 | 新增 | 新增支持 TIMESTAMP 'timestamp format' 语法。
|
||||
| 3 | _ROWTS伪列 | 新增 | 表示时间戳主键。是_C0伪列的别名。
|
||||
| 4 | INFORMATION_SCHEMA | 新增 | 包含各种SCHEMA定义的系统数据库。
|
||||
|
@ -22,9 +22,9 @@ description: "TDengine 3.0 版本的语法变更说明"
|
|||
|
||||
在 TDengine 中,普通表的数据模型中可使用以下数据类型。
|
||||
|
||||
| # | **语句** | **差异性** | **说明** |
|
||||
| - | :------- | :--------: | :------- |
|
||||
| 1 | ALTER ACCOUNT | <div style="width: 40pt">废除</div> | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。
|
||||
| # | **语句** | **<div style={{width: 60}}>差异性</div>** | **说明** |
|
||||
| - | :------- | :-------- | :------- |
|
||||
| 1 | ALTER ACCOUNT | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。
|
||||
| 2 | ALTER ALL DNODES | 新增 | 修改所有DNODE的参数。
|
||||
| 3 | ALTER DATABASE | 调整 | 废除<ul><li>QUORUM:写入需要的副本确认数。3.0版本使用STRICT来指定强一致还是弱一致。3.0.0版本STRICT暂不支持修改。</li><li>BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。</li><li>UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。</li><li>CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。</li><li>COMP:3.0版本暂不支持修改。<br/>新增</li><li>CACHEMODEL:表示是否在内存中缓存子表的最近数据。</li><li>CACHESIZE:表示缓存子表最近数据的内存大小。</li><li>WAL_FSYNC_PERIOD:代替原FSYNC参数。</li><li>WAL_LEVEL:代替原WAL参数。<br/>调整</li><li>REPLICA:3.0.0版本暂不支持修改。</li><li>KEEP:3.0版本新增支持带单位的设置方式。</li></ul>
|
||||
| 4 | ALTER STABLE | 调整 | 废除<ul><li>CHANGE TAG:修改标签列的名称。3.0版本使用RENAME TAG代替。<br/>新增</li><li>RENAME TAG:代替原CHANGE TAG子句。</li><li>COMMENT:修改超级表的注释。</li></ul>
|
||||
|
@ -80,9 +80,9 @@ description: "TDengine 3.0 版本的语法变更说明"
|
|||
|
||||
## SQL 函数变更
|
||||
|
||||
| # | **函数** | **差异性** | **说明** |
|
||||
| - | :------- | :--------: | :------- |
|
||||
| 1 | TWA | <div style="width: 40pt">增强</div> | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
|
||||
| # | **函数** | ** <div style={{width: 60}}>差异性</div> ** | **说明** |
|
||||
| - | :------- | :-------- | :------- |
|
||||
| 1 | TWA | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
|
||||
| 2 | IRATE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
|
||||
| 3 | LEASTSQUARES | 增强 | 可以用于超级表了。
|
||||
| 4 | ELAPSED | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
import PkgList from "/components/PkgList";
|
||||
import PkgListV3 from "/components/PkgListV3";
|
||||
|
||||
1. 下载客户端安装包
|
||||
|
||||
<PkgList type={1} sys="Linux" />
|
||||
<PkgListV3 type={1} sys="Linux" />
|
||||
|
||||
[所有下载](https://www.taosdata.com/cn/all-downloads/)
|
||||
[所有下载](../../releases)
|
||||
|
||||
2. 解压缩软件包
|
||||
|
||||
|
|
|
@ -1,11 +1,10 @@
|
|||
import PkgList from "/components/PkgList";
|
||||
import PkgListV3 from "/components/PkgListV3";
|
||||
|
||||
1. 下载客户端安装包
|
||||
|
||||
<PkgList type={1} sys="Windows" />
|
||||
|
||||
[所有下载](https://www.taosdata.com/cn/all-downloads/)
|
||||
<PkgListV3 type={4} sys="Windows" />
|
||||
|
||||
[所有下载](../../releases)
|
||||
2. 执行安装程序,按提示选择默认值,完成安装
|
||||
3. 安装路径
|
||||
|
||||
|
|
|
@ -93,12 +93,12 @@ Maven 项目中,在 pom.xml 中添加以下依赖:
|
|||
可以通过下载 TDengine 的源码,自己编译最新版本的 Java connector
|
||||
|
||||
```shell
|
||||
git clone https://github.com/taosdata/taos-connector-jdbc.git --branch 2.0
|
||||
git clone https://github.com/taosdata/taos-connector-jdbc.git
|
||||
cd taos-connector-jdbc
|
||||
mvn clean install -Dmaven.test.skip=true
|
||||
```
|
||||
|
||||
编译后,在 target 目录下会产生 taos-jdbcdriver-2.0.XX-dist.jar 的 jar 包,并自动将编译的 jar 文件放在本地的 Maven 仓库中。
|
||||
编译后,在 target 目录下会产生 taos-jdbcdriver-3.0.*-dist.jar 的 jar 包,并自动将编译的 jar 文件放在本地的 Maven 仓库中。
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
@ -198,7 +198,7 @@ url 中的配置参数如下:
|
|||
|
||||
- user:登录 TDengine 用户名,默认值 'root'。
|
||||
- password:用户登录密码,默认值 'taosdata'。
|
||||
- batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。从 taos-jdbcdriver-2.0.38 开始,JDBC REST 连接增加批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
|
||||
- batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。JDBC REST 连接支持批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
|
||||
- charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。
|
||||
- batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false:不再执行失败 SQL 后的任何语句。默认值为:false。
|
||||
- httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 5000。
|
||||
|
@ -216,7 +216,7 @@ url 中的配置参数如下:
|
|||
INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFrancisco') VALUES(now, 24.6);
|
||||
```
|
||||
|
||||
- 从 taos-jdbcdriver-2.0.36 开始,如果在 url 中指定了 dbname,那么,JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。例如:url 为 jdbc:TAOS-RS://127.0.0.1:6041/test,那么,可以执行 sql:insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);
|
||||
- 如果在 url 中指定了 dbname,那么,JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。例如:url 为 jdbc:TAOS-RS://127.0.0.1:6041/test,那么,可以执行 sql:insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);
|
||||
|
||||
:::
|
||||
|
||||
|
@ -230,7 +230,7 @@ INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFra
|
|||
**注意**:
|
||||
|
||||
- 应用中设置的 client parameter 为进程级别的,即如果要更新 client 的参数,需要重启应用。这是因为 client parameter 是全局参数,仅在应用程序的第一次设置生效。
|
||||
- 以下示例代码基于 taos-jdbcdriver-2.0.36。
|
||||
- 以下示例代码基于 taos-jdbcdriver-3.0.0。
|
||||
|
||||
```java
|
||||
public Connection getConn() throws Exception{
|
||||
|
@ -367,7 +367,7 @@ TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据
|
|||
**注意**:
|
||||
|
||||
- JDBC REST 连接目前不支持参数绑定
|
||||
- 以下示例代码基于 taos-jdbcdriver-2.0.36
|
||||
- 以下示例代码基于 taos-jdbcdriver-3.0.0
|
||||
- binary 类型数据需要调用 setString 方法,nchar 类型数据需要调用 setNString 方法
|
||||
- setString 和 setNString 都要求用户在 size 参数里声明表定义中对应列的列宽
|
||||
|
||||
|
@ -635,7 +635,7 @@ TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协
|
|||
**注意**:
|
||||
|
||||
- JDBC REST 连接目前不支持无模式写入
|
||||
- 以下示例代码基于 taos-jdbcdriver-2.0.36
|
||||
- 以下示例代码基于 taos-jdbcdriver-3.0.0
|
||||
|
||||
```java
|
||||
public class SchemalessInsertTest {
|
||||
|
@ -666,7 +666,7 @@ public class SchemalessInsertTest {
|
|||
}
|
||||
```
|
||||
|
||||
### 订阅
|
||||
### 数据订阅
|
||||
|
||||
TDengine Java 连接器支持订阅功能,应用 API 如下:
|
||||
|
||||
|
@ -717,9 +717,14 @@ while(true) {
|
|||
#### 关闭订阅
|
||||
|
||||
```java
|
||||
// 取消订阅
|
||||
consumer.unsubscribe();
|
||||
// 关闭消费
|
||||
consumer.close()
|
||||
```
|
||||
|
||||
详情请参考:[数据订阅](../../../develop/tmq)
|
||||
|
||||
### 使用示例如下:
|
||||
|
||||
```java
|
||||
|
@ -734,7 +739,7 @@ public abstract class ConsumerLoop {
|
|||
config.setProperty("msg.with.table.name", "true");
|
||||
config.setProperty("enable.auto.commit", "true");
|
||||
config.setProperty("group.id", "group1");
|
||||
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer");
|
||||
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
|
||||
|
||||
this.consumer = new TaosConsumer<>(config);
|
||||
this.topics = Collections.singletonList("topic_speed");
|
||||
|
@ -754,8 +759,9 @@ public abstract class ConsumerLoop {
|
|||
process(record);
|
||||
}
|
||||
}
|
||||
consumer.unsubscribe();
|
||||
} finally {
|
||||
consumer.close();
|
||||
consumer.close();
|
||||
shutdownLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
@ -875,6 +881,7 @@ public static void main(String[] args) throws Exception {
|
|||
|
||||
| taos-jdbcdriver 版本 | 主要变化 |
|
||||
| :------------------: | :----------------------------: |
|
||||
| 3.0.0 | 支持 TDengine 3.0 |
|
||||
| 2.0.39 - 2.0.40 | 增加 REST 连接/请求 超时设置 |
|
||||
| 2.0.38 | JDBC REST 连接增加批量拉取功能 |
|
||||
| 2.0.37 | 增加对 json tag 支持 |
|
||||
|
|
|
@ -25,33 +25,34 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
typedef struct SUpdateInfo {
|
||||
SArray *pTsBuckets;
|
||||
uint64_t numBuckets;
|
||||
SArray *pTsSBFs;
|
||||
uint64_t numSBFs;
|
||||
int64_t interval;
|
||||
int64_t watermark;
|
||||
TSKEY minTS;
|
||||
SScalableBf* pCloseWinSBF;
|
||||
SHashObj* pMap;
|
||||
STimeWindow scanWindow;
|
||||
uint64_t scanGroupId;
|
||||
uint64_t maxVersion;
|
||||
SArray *pTsBuckets;
|
||||
uint64_t numBuckets;
|
||||
SArray *pTsSBFs;
|
||||
uint64_t numSBFs;
|
||||
int64_t interval;
|
||||
int64_t watermark;
|
||||
TSKEY minTS;
|
||||
SScalableBf *pCloseWinSBF;
|
||||
SHashObj *pMap;
|
||||
STimeWindow scanWindow;
|
||||
uint64_t scanGroupId;
|
||||
uint64_t maxVersion;
|
||||
} SUpdateInfo;
|
||||
|
||||
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
|
||||
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark);
|
||||
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
|
||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
||||
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version);
|
||||
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version);
|
||||
void updateInfoDestroy(SUpdateInfo *pInfo);
|
||||
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
|
||||
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
|
||||
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
|
||||
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
|
||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
||||
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
|
||||
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
|
||||
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
|
||||
void updateInfoDestroy(SUpdateInfo *pInfo);
|
||||
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
|
||||
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
|
||||
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
|
||||
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* ifndef _TSTREAMUPDATE_H_ */
|
||||
#endif /* ifndef _TSTREAMUPDATE_H_ */
|
||||
|
|
|
@ -30,6 +30,7 @@ extern bool gRaftDetailLog;
|
|||
#define SYNC_SPEED_UP_HB_TIMER 400
|
||||
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
|
||||
#define SYNC_SLOW_DOWN_RANGE 100
|
||||
#define SYNC_MAX_READ_RANGE 10
|
||||
|
||||
#define SYNC_MAX_BATCH_SIZE 1
|
||||
#define SYNC_INDEX_BEGIN 0
|
||||
|
@ -210,9 +211,12 @@ void syncStop(int64_t rid);
|
|||
int32_t syncSetStandby(int64_t rid);
|
||||
ESyncState syncGetMyRole(int64_t rid);
|
||||
bool syncIsReady(int64_t rid);
|
||||
bool syncIsReadyForRead(int64_t rid);
|
||||
const char* syncGetMyRoleStr(int64_t rid);
|
||||
bool syncRestoreFinish(int64_t rid);
|
||||
SyncTerm syncGetMyTerm(int64_t rid);
|
||||
SyncIndex syncGetLastIndex(int64_t rid);
|
||||
SyncIndex syncGetCommitIndex(int64_t rid);
|
||||
SyncGroupId syncGetVgId(int64_t rid);
|
||||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
|
|
|
@ -56,8 +56,8 @@ struct tmq_conf_t {
|
|||
int8_t autoCommit;
|
||||
int8_t resetOffset;
|
||||
int8_t withTbName;
|
||||
int8_t ssEnable;
|
||||
int32_t ssBatchSize;
|
||||
int8_t snapEnable;
|
||||
int32_t snapBatchSize;
|
||||
|
||||
bool hbBgEnable;
|
||||
|
||||
|
@ -287,16 +287,21 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
|
||||
if (strcmp(key, "experimental.snapshot.enable") == 0) {
|
||||
if (strcmp(value, "true") == 0) {
|
||||
conf->ssEnable = true;
|
||||
conf->snapEnable = true;
|
||||
return TMQ_CONF_OK;
|
||||
} else if (strcmp(value, "false") == 0) {
|
||||
conf->ssEnable = false;
|
||||
conf->snapEnable = false;
|
||||
return TMQ_CONF_OK;
|
||||
} else {
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
}
|
||||
|
||||
if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
|
||||
conf->snapBatchSize = atoi(value);
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
if (strcmp(key, "enable.heartbeat.background") == 0) {
|
||||
if (strcmp(value, "true") == 0) {
|
||||
conf->hbBgEnable = true;
|
||||
|
@ -310,11 +315,6 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
|
||||
conf->ssBatchSize = atoi(value);
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
if (strcmp(key, "td.connect.ip") == 0) {
|
||||
conf->ip = strdup(value);
|
||||
return TMQ_CONF_OK;
|
||||
|
@ -889,7 +889,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
strcpy(pTmq->clientId, conf->clientId);
|
||||
strcpy(pTmq->groupId, conf->groupId);
|
||||
pTmq->withTbName = conf->withTbName;
|
||||
pTmq->useSnapshot = conf->ssEnable;
|
||||
pTmq->useSnapshot = conf->snapEnable;
|
||||
pTmq->autoCommit = conf->autoCommit;
|
||||
pTmq->autoCommitInterval = conf->autoCommitInterval;
|
||||
pTmq->commitCb = conf->commitCb;
|
||||
|
|
|
@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode);
|
|||
void vnodeSyncClose(SVnode* pVnode);
|
||||
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
|
||||
bool vnodeIsLeader(SVnode* pVnode);
|
||||
bool vnodeIsReadyForRead(SVnode* pVnode);
|
||||
bool vnodeIsRoleLeader(SVnode* pVnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#include "tmsg.h"
|
||||
#include "tq.h"
|
||||
|
||||
int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock,
|
||||
int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock,
|
||||
SBatchDeleteReq* deleteReq) {
|
||||
ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT);
|
||||
int32_t totRow = pDataBlock->info.rows;
|
||||
|
@ -68,9 +68,10 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
|||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
||||
int32_t padding1 = 0;
|
||||
void* padding2 = taosMemoryMalloc(1);
|
||||
void* padding2 = NULL;
|
||||
taosArrayPush(schemaReqSz, &padding1);
|
||||
taosArrayPush(schemaReqs, &padding2);
|
||||
continue;
|
||||
}
|
||||
|
||||
STagVal tagVal = {
|
||||
|
@ -138,8 +139,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
|||
continue;
|
||||
}
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
// TODO min
|
||||
int32_t rowSize = pDataBlock->info.rowSize;
|
||||
/*int32_t rowSize = pDataBlock->info.rowSize;*/
|
||||
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
|
||||
|
||||
int32_t schemaLen = 0;
|
||||
|
@ -150,7 +150,6 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
|||
}
|
||||
|
||||
// assign data
|
||||
// TODO
|
||||
ret = rpcMallocCont(cap);
|
||||
ret->header.vgId = pVnode->config.vgId;
|
||||
ret->length = sizeof(SSubmitReq);
|
||||
|
@ -161,13 +160,12 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
|||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
||||
pDeleteReq->suid = suid;
|
||||
tdBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
|
||||
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
|
||||
continue;
|
||||
}
|
||||
|
||||
blkHead->numOfRows = htonl(pDataBlock->info.rows);
|
||||
blkHead->sversion = htonl(pTSchema->version);
|
||||
// TODO
|
||||
blkHead->suid = htobe64(suid);
|
||||
// uid is assigned by vnode
|
||||
blkHead->uid = 0;
|
||||
|
|
|
@ -196,9 +196,9 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
|||
tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey);
|
||||
}
|
||||
|
||||
tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
||||
" since %s",
|
||||
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
|
||||
tsdbInfo("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
||||
" since %s",
|
||||
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
|
||||
return code;
|
||||
|
||||
_err:
|
||||
|
|
|
@ -283,7 +283,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
vTrace("message in vnode query queue is processing");
|
||||
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
|
||||
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsReadyForRead(pVnode)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
@ -307,7 +307,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
|
||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||
!vnodeIsLeader(pVnode)) {
|
||||
!vnodeIsReadyForRead(pVnode)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -781,3 +781,17 @@ bool vnodeIsLeader(SVnode *pVnode) {
|
|||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool vnodeIsReadyForRead(SVnode *pVnode) {
|
||||
if (syncIsReady(pVnode->sync)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (syncIsReadyForRead(pVnode->sync)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId,
|
||||
syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync));
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -13,11 +13,11 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "executorimpl.h"
|
||||
#include "filter.h"
|
||||
#include "function.h"
|
||||
#include "functionMgt.h"
|
||||
#include "os.h"
|
||||
#include "querynodes.h"
|
||||
#include "systable.h"
|
||||
#include "tname.h"
|
||||
|
@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
|
|||
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
|
||||
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf,
|
||||
GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
|
||||
|
||||
if (p1 == NULL) {
|
||||
return NULL;
|
||||
|
@ -238,7 +238,7 @@ static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDat
|
|||
|
||||
// todo move to the initialization function
|
||||
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
|
||||
bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows);
|
||||
bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows);
|
||||
|
||||
filterFreeInfo(filter);
|
||||
return keep;
|
||||
|
@ -312,9 +312,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
|||
return TSDB_CODE_SUCCESS;
|
||||
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
||||
pCost->loadBlockStatis += 1;
|
||||
loadSMA = true; // mark the operation of load sma;
|
||||
loadSMA = true; // mark the operation of load sma;
|
||||
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
|
||||
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
||||
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
||||
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -453,7 +453,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
|
|||
colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
|
||||
} else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
|
||||
colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
|
||||
} else { // todo opt for json tag
|
||||
} else { // todo opt for json tag
|
||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
colDataAppend(pColInfoData, i, data, false);
|
||||
}
|
||||
|
@ -570,7 +570,10 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
|||
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
||||
qDebug("%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks due to query func required", GET_TASKID(pTaskInfo));
|
||||
qDebug(
|
||||
"%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks "
|
||||
"due to query func required",
|
||||
GET_TASKID(pTaskInfo));
|
||||
|
||||
// do prepare for the next round table scan operation
|
||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||
|
@ -1174,16 +1177,18 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
|
|||
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
|
||||
SResultRowInfo dumyInfo;
|
||||
dumyInfo.cur.pageId = -1;
|
||||
bool isClosed = false;
|
||||
bool isClosed = false;
|
||||
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||
if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
|
||||
win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
|
||||
isClosed = isCloseWindow(&win, &pInfo->twAggSup);
|
||||
}
|
||||
bool inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid);
|
||||
// must check update info first.
|
||||
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
|
||||
if ((update || (isSignleIntervalWindow(pInfo) && isClosed &&
|
||||
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) {
|
||||
bool closedWin = isClosed && inserted && isSignleIntervalWindow(pInfo) &&
|
||||
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup);
|
||||
if ((update || closedWin) && out) {
|
||||
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
|
||||
}
|
||||
}
|
||||
|
@ -1390,8 +1395,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||
if (pSDB) {
|
||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader);
|
||||
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId,version);
|
||||
uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader);
|
||||
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId, version);
|
||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||
checkUpdateData(pInfo, true, pSDB, false);
|
||||
return pSDB;
|
||||
|
@ -1445,7 +1450,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
|
||||
setBlockIntoRes(pInfo, &block);
|
||||
|
||||
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, pInfo->pRes->info.version)) {
|
||||
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId,
|
||||
pInfo->pRes->info.version)) {
|
||||
printDataBlock(pInfo->pRes, "stream scan ignore");
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
continue;
|
||||
|
@ -2248,7 +2254,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
|||
// build message and send to mnode to fetch the content of system tables.
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SSysTableScanInfo* pInfo = pOperator->info;
|
||||
char dbName[TSDB_DB_NAME_LEN] = {0};
|
||||
char dbName[TSDB_DB_NAME_LEN] = {0};
|
||||
|
||||
const char* name = tNameGetTableName(&pInfo->name);
|
||||
if (pInfo->showRewrite) {
|
||||
|
@ -2260,8 +2266,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
|||
return sysTableScanUserTables(pOperator);
|
||||
} else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
|
||||
return sysTableScanUserTags(pOperator);
|
||||
} else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 &&
|
||||
pInfo->showRewrite && IS_SYS_DBNAME(dbName)) {
|
||||
} else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && pInfo->showRewrite &&
|
||||
IS_SYS_DBNAME(dbName)) {
|
||||
return sysTableScanUserSTables(pOperator);
|
||||
} else { // load the meta from mnode of the given epset
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -2541,7 +2547,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||
|
||||
taosArrayDestroy(pInfo->pColMatchInfo);
|
||||
|
||||
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
|
@ -2597,7 +2603,6 @@ _error:
|
|||
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
|
||||
const char* idStr) {
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
|
||||
|
@ -2606,7 +2611,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
|||
}
|
||||
|
||||
int64_t st1 = taosGetTimestampUs();
|
||||
qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1-st)/1000.0, idStr);
|
||||
qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr);
|
||||
|
||||
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
|
||||
qDebug("no table qualified for query, %s" PRIx64, idStr);
|
||||
|
@ -2620,7 +2625,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
|||
}
|
||||
|
||||
int64_t st2 = taosGetTimestampUs();
|
||||
qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2-st1)/1000.0, idStr);
|
||||
qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -4918,6 +4918,16 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx) {
|
|||
return numOfElems;
|
||||
}
|
||||
|
||||
static SSampleInfo* getSampleOutputInfo(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
pInfo->data = (char*)pInfo + sizeof(SSampleInfo);
|
||||
pInfo->tuplePos = (STuplePos*)((char*)pInfo + sizeof(SSampleInfo) + pInfo->samples * pInfo->colBytes);
|
||||
|
||||
return pInfo;
|
||||
}
|
||||
|
||||
bool getSampleFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||
SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
|
||||
|
@ -4972,7 +4982,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
|
|||
|
||||
int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
SSampleInfo* pInfo = getSampleOutputInfo(pCtx);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
||||
|
@ -4998,7 +5008,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
|||
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
SSampleInfo* pInfo = getSampleOutputInfo(pCtx);
|
||||
pEntryInfo->complete = true;
|
||||
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
|
|
|
@ -1399,7 +1399,7 @@ static int32_t translateTimelineFunc(STranslateContext* pCxt, SFunctionNode* pFu
|
|||
"%s function must be used in select statements", pFunc->functionName);
|
||||
}
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
|
||||
if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
|
||||
if (NULL != pSelect->pFromTable && QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
|
||||
!isTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
|
||||
"%s function requires valid time series input", pFunc->functionName);
|
||||
|
@ -2037,16 +2037,13 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName,
|
|||
code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code &&
|
||||
0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
|
||||
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) &&
|
||||
isSelectStmt(pCxt->pCurrStmt) &&
|
||||
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
|
||||
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) && isSelectStmt(pCxt->pCurrStmt) &&
|
||||
0 == taosArrayGetSize(vgroupList)) {
|
||||
((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code &&
|
||||
0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
|
||||
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
|
||||
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) {
|
||||
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
|
||||
}
|
||||
|
|
|
@ -13,33 +13,31 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tstreamUpdate.h"
|
||||
#include "tencode.h"
|
||||
#include "ttime.h"
|
||||
#include "query.h"
|
||||
#include "tencode.h"
|
||||
#include "tstreamUpdate.h"
|
||||
#include "ttime.h"
|
||||
|
||||
#define DEFAULT_FALSE_POSITIVE 0.01
|
||||
#define DEFAULT_BUCKET_SIZE 1310720
|
||||
#define DEFAULT_MAP_CAPACITY 1310720
|
||||
#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10)
|
||||
#define ROWS_PER_MILLISECOND 1
|
||||
#define MAX_NUM_SCALABLE_BF 100000
|
||||
#define MIN_NUM_SCALABLE_BF 10
|
||||
#define DEFAULT_PREADD_BUCKET 1
|
||||
#define MAX_INTERVAL MILLISECOND_PER_MINUTE
|
||||
#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
|
||||
#define DEFAULT_EXPECTED_ENTRIES 10000
|
||||
#define DEFAULT_FALSE_POSITIVE 0.01
|
||||
#define DEFAULT_BUCKET_SIZE 1310720
|
||||
#define DEFAULT_MAP_CAPACITY 1310720
|
||||
#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10)
|
||||
#define ROWS_PER_MILLISECOND 1
|
||||
#define MAX_NUM_SCALABLE_BF 100000
|
||||
#define MIN_NUM_SCALABLE_BF 10
|
||||
#define DEFAULT_PREADD_BUCKET 1
|
||||
#define MAX_INTERVAL MILLISECOND_PER_MINUTE
|
||||
#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
|
||||
#define DEFAULT_EXPECTED_ENTRIES 10000
|
||||
|
||||
static int64_t adjustExpEntries(int64_t entries) {
|
||||
return TMIN(DEFAULT_EXPECTED_ENTRIES, entries);
|
||||
}
|
||||
static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); }
|
||||
|
||||
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
|
||||
if (pInfo->numSBFs < count) {
|
||||
count = pInfo->numSBFs;
|
||||
}
|
||||
for (uint64_t i = 0; i < count; ++i) {
|
||||
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
|
||||
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
|
||||
SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
|
||||
taosArrayPush(pInfo->pTsSBFs, &tsSBF);
|
||||
}
|
||||
|
@ -78,7 +76,7 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) {
|
|||
|
||||
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
|
||||
if (watermark <= adjInterval) {
|
||||
watermark = TMAX(originInt/adjInterval, 1) * adjInterval;
|
||||
watermark = TMAX(originInt / adjInterval, 1) * adjInterval;
|
||||
} else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
|
||||
watermark = MAX_NUM_SCALABLE_BF * adjInterval;
|
||||
}/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) {
|
||||
|
@ -158,11 +156,17 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
|
|||
return res;
|
||||
}
|
||||
|
||||
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) {
|
||||
void *pVal = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
|
||||
if (pVal || taosHashGetSize(pInfo->pMap) >= DEFAULT_MAP_SIZE) return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
||||
int32_t res = TSDB_CODE_FAILED;
|
||||
TSKEY* pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
|
||||
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
|
||||
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
|
||||
int32_t res = TSDB_CODE_FAILED;
|
||||
TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
|
||||
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
|
||||
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
|
||||
if (ts < maxTs - pInfo->watermark) {
|
||||
// this window has been closed.
|
||||
if (pInfo->pCloseWinSBF) {
|
||||
|
@ -178,42 +182,47 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
|||
}
|
||||
|
||||
int32_t size = taosHashGetSize(pInfo->pMap);
|
||||
if ( (!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) {
|
||||
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) {
|
||||
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
|
||||
return false;
|
||||
}
|
||||
|
||||
if ( !pMapMaxTs && maxTs < ts ) {
|
||||
if (!pMapMaxTs && maxTs < ts) {
|
||||
taosArraySet(pInfo->pTsBuckets, index, &ts);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (ts < pInfo->minTS) {
|
||||
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts);
|
||||
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId,
|
||||
maxTs, *pMapMaxTs, ts);
|
||||
return true;
|
||||
} else if (res == TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts);
|
||||
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64, tableId,
|
||||
maxTs, *pMapMaxTs, ts);
|
||||
// check from tsdb api
|
||||
return true;
|
||||
}
|
||||
|
||||
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) {
|
||||
qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
|
||||
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) {
|
||||
qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
|
||||
pWin->skey, pWin->ekey, version);
|
||||
pInfo->scanWindow = *pWin;
|
||||
pInfo->scanGroupId = groupId;
|
||||
pInfo->maxVersion = version;
|
||||
}
|
||||
|
||||
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) {
|
||||
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) {
|
||||
if (!pInfo) {
|
||||
return false;
|
||||
}
|
||||
qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
|
||||
if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey &&
|
||||
pWin->ekey <= pInfo->scanWindow.ekey && version <= pInfo->maxVersion ) {
|
||||
qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
|
||||
qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
|
||||
pWin->skey, pWin->ekey, version);
|
||||
if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && pWin->ekey <= pInfo->scanWindow.ekey &&
|
||||
version <= pInfo->maxVersion) {
|
||||
qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
|
||||
pWin->skey, pWin->ekey, version);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -261,7 +270,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
|
|||
int32_t size = taosArrayGetSize(pInfo->pTsBuckets);
|
||||
if (tEncodeI32(&encoder, size) < 0) return -1;
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i);
|
||||
TSKEY *pTs = (TSKEY *)taosArrayGet(pInfo->pTsBuckets, i);
|
||||
if (tEncodeI64(&encoder, *pTs) < 0) return -1;
|
||||
}
|
||||
|
||||
|
@ -270,7 +279,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
|
|||
int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs);
|
||||
if (tEncodeI32(&encoder, sBfSize) < 0) return -1;
|
||||
for (int32_t i = 0; i < sBfSize; i++) {
|
||||
SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
|
||||
SScalableBf *pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
|
||||
if (tScalableBfEncode(pSBf, &encoder) < 0) return -1;
|
||||
}
|
||||
|
||||
|
@ -278,17 +287,17 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
|
|||
if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1;
|
||||
|
||||
|
||||
if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1;
|
||||
|
||||
int32_t mapSize = taosHashGetSize(pInfo->pMap);
|
||||
if (tEncodeI32(&encoder, mapSize) < 0) return -1;
|
||||
void* pIte = NULL;
|
||||
void *pIte = NULL;
|
||||
size_t keyLen = 0;
|
||||
while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) {
|
||||
void* key = taosHashGetKey(pIte, &keyLen);
|
||||
if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, *(TSKEY*)pIte) < 0) return -1;
|
||||
void *key = taosHashGetKey(pIte, &keyLen);
|
||||
if (tEncodeU64(&encoder, *(uint64_t *)key) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1;
|
||||
}
|
||||
|
||||
if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1;
|
||||
|
@ -311,7 +320,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
|
|||
|
||||
int32_t size = 0;
|
||||
if (tDecodeI32(&decoder, &size) < 0) return -1;
|
||||
pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
|
||||
pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
|
||||
TSKEY ts = INT64_MIN;
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
if (tDecodeI64(&decoder, &ts) < 0) return -1;
|
||||
|
@ -324,7 +333,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
|
|||
if (tDecodeI32(&decoder, &sBfSize) < 0) return -1;
|
||||
pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *));
|
||||
for (int32_t i = 0; i < sBfSize; i++) {
|
||||
SScalableBf* pSBf = tScalableBfDecode(&decoder);
|
||||
SScalableBf *pSBf = tScalableBfDecode(&decoder);
|
||||
if (!pSBf) return -1;
|
||||
taosArrayPush(pInfo->pTsSBFs, &pSBf);
|
||||
}
|
||||
|
@ -337,11 +346,11 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
|
|||
|
||||
int32_t mapSize = 0;
|
||||
if (tDecodeI32(&decoder, &mapSize) < 0) return -1;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
|
||||
pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK);
|
||||
uint64_t uid = 0;
|
||||
ts = INT64_MIN;
|
||||
for(int32_t i = 0; i < mapSize; i++) {
|
||||
for (int32_t i = 0; i < mapSize; i++) {
|
||||
if (tDecodeU64(&decoder, &uid) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &ts) < 0) return -1;
|
||||
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY));
|
||||
|
|
|
@ -392,6 +392,29 @@ bool syncIsReady(int64_t rid) {
|
|||
return b;
|
||||
}
|
||||
|
||||
bool syncIsReadyForRead(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return false;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
|
||||
// TODO: last not noop?
|
||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||
bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->commitIndex >= lastIndex - SYNC_MAX_READ_RANGE);
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
|
||||
// if false, set error code
|
||||
if (false == b) {
|
||||
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
||||
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||
} else {
|
||||
terrno = TSDB_CODE_APP_NOT_READY;
|
||||
}
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
||||
bool syncIsRestoreFinish(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
|
@ -519,6 +542,30 @@ SyncTerm syncGetMyTerm(int64_t rid) {
|
|||
return term;
|
||||
}
|
||||
|
||||
SyncIndex syncGetLastIndex(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return SYNC_INDEX_INVALID;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return lastIndex;
|
||||
}
|
||||
|
||||
SyncIndex syncGetCommitIndex(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return SYNC_INDEX_INVALID;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
SyncIndex cmtIndex = pSyncNode->commitIndex;
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return cmtIndex;
|
||||
}
|
||||
|
||||
SyncGroupId syncGetVgId(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
|
@ -828,6 +875,15 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
|||
pSyncNode->changing = true;
|
||||
}
|
||||
|
||||
// not restored, vnode enable
|
||||
if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
|
||||
ret = -1;
|
||||
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
|
||||
sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%ld, cmt:%ld", pSyncNode->vgId,
|
||||
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
|
||||
goto _END;
|
||||
}
|
||||
|
||||
SRespStub stub;
|
||||
stub.createTime = taosGetTimestampMs();
|
||||
stub.rpcMsg = *pMsg;
|
||||
|
|
|
@ -237,8 +237,8 @@
|
|||
./test.sh -f tsim/stream/distributeInterval0.sim
|
||||
./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
|
||||
./test.sh -f tsim/stream/distributeSession0.sim
|
||||
#./test.sh -f tsim/stream/session0.sim
|
||||
#./test.sh -f tsim/stream/session1.sim
|
||||
./test.sh -f tsim/stream/session0.sim
|
||||
./test.sh -f tsim/stream/session1.sim
|
||||
./test.sh -f tsim/stream/state0.sim
|
||||
./test.sh -f tsim/stream/triggerInterval0.sim
|
||||
./test.sh -f tsim/stream/triggerSession0.sim
|
||||
|
|
|
@ -83,22 +83,22 @@ if $data11 != 3 then
|
|||
goto loop0
|
||||
endi
|
||||
|
||||
if $data12 != NULL then
|
||||
if $data12 != 10 then
|
||||
print ======data12=$data12
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data13 != NULL then
|
||||
if $data13 != 10 then
|
||||
print ======data13=$data13
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data14 != NULL then
|
||||
if $data14 != 1.100000000 then
|
||||
print ======data14=$data14
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data15 != NULL then
|
||||
if $data15 != 0.000000000 then
|
||||
print ======data15=$data15
|
||||
return -1
|
||||
endi
|
||||
|
@ -141,38 +141,38 @@ if $data01 != 7 then
|
|||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
if $data02 != 18 then
|
||||
print =====data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data03 != NULL then
|
||||
if $data03 != 4 then
|
||||
print =====data03=$data03
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data04 != NULL then
|
||||
print ======$data04
|
||||
if $data04 != 1.000000000 then
|
||||
print ======data04=$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data05 != NULL then
|
||||
print ======$data05
|
||||
if $data05 != 1.154700538 then
|
||||
print ======data05=$data05
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data06 != 4 then
|
||||
print ======$data06
|
||||
print ======data06=$data06
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data07 != 1.000000000 then
|
||||
print ======$data07
|
||||
print ======data07=$data07
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data08 != 13 then
|
||||
print ======$data08
|
||||
print ======data08=$data08
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
Loading…
Reference in New Issue