Merge branch '3.0' into docs/xiaolei/TD-18414-update-node-subscribe-doc
This commit is contained in:
commit
8b97a3f042
|
@ -5,12 +5,69 @@ title: 使用安装包立即开始
|
|||
|
||||
import Tabs from "@theme/Tabs";
|
||||
import TabItem from "@theme/TabItem";
|
||||
import PkgListV3 from "/components/PkgListV3";
|
||||
|
||||
在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。您也可以[用Docker立即体验](../../get-started/docker/)。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
|
||||
|
||||
## 安装
|
||||
|
||||
:::info
|
||||
下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases)
|
||||
:::
|
||||
|
||||
<Tabs>
|
||||
<TabItem label="Deb 安装" value="debinst">
|
||||
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb;
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.deb 安装包所在目录,执行如下的安装命令:
|
||||
|
||||
```bash
|
||||
sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="RPM 安装" value="rpminst">
|
||||
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 rpm 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.rpm;
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.rpm 安装包所在目录,执行如下的安装命令:
|
||||
|
||||
```bash
|
||||
sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="tar.gz 安装" value="tarinst">
|
||||
|
||||
1. 从列表中下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz;
|
||||
<PkgListV3 type={0}/>
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本:
|
||||
|
||||
```bash
|
||||
tar -zxvf TDengine-server-3.0.0.0-Linux-x64.tar.gz
|
||||
```
|
||||
|
||||
解压后进入相应路径,执行
|
||||
|
||||
```bash
|
||||
sudo ./install.sh
|
||||
```
|
||||
|
||||
:::info
|
||||
install.sh 安装脚本在执行过程中,会通过命令行交互界面询问一些配置信息。如果希望采取无交互安装方式,那么可以用 -e no 参数来执行 install.sh 脚本。运行 `./install.sh -h` 指令可以查看所有参数的详细说明信息。
|
||||
:::
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Windows 安装" value="windows">
|
||||
|
||||
<PkgListV3 type={3}/>
|
||||
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe;
|
||||
2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="apt-get" label="apt-get">
|
||||
可以使用 apt-get 工具从官方仓库安装。
|
||||
|
||||
|
@ -39,56 +96,6 @@ sudo apt-get install tdengine
|
|||
:::tip
|
||||
apt-get 方式只适用于 Debian 或 Ubuntu 系统
|
||||
::::
|
||||
</TabItem>
|
||||
<TabItem label="Deb 安装" value="debinst">
|
||||
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb;
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.deb 安装包所在目录,执行如下的安装命令:
|
||||
|
||||
```bash
|
||||
sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="RPM 安装" value="rpminst">
|
||||
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 rpm 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.rpm;
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.rpm 安装包所在目录,执行如下的安装命令:
|
||||
|
||||
```bash
|
||||
sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="tar.gz 安装" value="tarinst">
|
||||
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz;
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本:
|
||||
|
||||
```bash
|
||||
tar -zxvf TDengine-server-3.0.0.0-Linux-x64.tar.gz
|
||||
```
|
||||
|
||||
解压后进入相应路径,执行
|
||||
|
||||
```bash
|
||||
sudo ./install.sh
|
||||
```
|
||||
|
||||
:::info
|
||||
install.sh 安装脚本在执行过程中,会通过命令行交互界面询问一些配置信息。如果希望采取无交互安装方式,那么可以用 -e no 参数来执行 install.sh 脚本。运行 `./install.sh -h` 指令可以查看所有参数的详细说明信息。
|
||||
|
||||
:::
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Windows 安装" value="windows">
|
||||
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe;
|
||||
2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
|
|
|
@ -88,6 +88,7 @@ void close() throws SQLException;
|
|||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="C#" label="C#">
|
||||
|
||||
```C#
|
||||
|
@ -110,8 +111,26 @@ void Unsubscribe()
|
|||
void Commit(ConsumeResult consumerResult)
|
||||
|
||||
void Close()
|
||||
```
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Go" value="Go">
|
||||
|
||||
```go
|
||||
func NewConsumer(conf *Config) (*Consumer, error)
|
||||
|
||||
func (c *Consumer) Close() error
|
||||
|
||||
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
|
||||
|
||||
func (c *Consumer) FreeMessage(message unsafe.Pointer)
|
||||
|
||||
func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
|
||||
|
||||
func (c *Consumer) Subscribe(topics []string) error
|
||||
|
||||
func (c *Consumer) Unsubscribe() error
|
||||
|
||||
```
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
|
@ -255,6 +274,7 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
|
|||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="C#" label="C#">
|
||||
|
||||
```C#
|
||||
|
@ -276,6 +296,55 @@ var cfg = new ConsumerConfig
|
|||
};
|
||||
|
||||
var consumer = new ConsumerBuilder(cfg).Build();
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Go" value="Go">
|
||||
|
||||
```go
|
||||
config := tmq.NewConfig()
|
||||
defer config.Destroy()
|
||||
err = config.SetGroupID("test")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetAutoOffsetReset("earliest")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetConnectIP("127.0.0.1")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetConnectUser("root")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetConnectPass("taosdata")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetConnectPort("6030")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.SetMsgWithTableName(true)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.EnableHeartBeat()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
|
||||
if result.ErrCode != 0 {
|
||||
errStr := wrapper.TMQErr2Str(result.ErrCode)
|
||||
err := errors.NewError(int(result.ErrCode), errStr)
|
||||
panic(err)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
@ -309,6 +378,20 @@ topics.add("tmq_topic");
|
|||
consumer.subscribe(topics);
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="Go" label="Go">
|
||||
|
||||
```go
|
||||
consumer, err := tmq.NewConsumer(config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = consumer.Subscribe([]string{"example_tmq_topic"})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="C#" label="C#">
|
||||
|
@ -354,6 +437,21 @@ while(running){
|
|||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="Go" label="Go">
|
||||
|
||||
```go
|
||||
for {
|
||||
result, err := consumer.Poll(time.Second)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fmt.Println(result)
|
||||
consumer.Commit(context.Background(), result.Message)
|
||||
consumer.FreeMessage(result.Message)
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
|
@ -410,6 +508,13 @@ consumer.Unsubscribe();
|
|||
|
||||
// 关闭消费
|
||||
consumer.Close();
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Go" label="Go">
|
||||
|
||||
```go
|
||||
consumer.Close()
|
||||
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
|
|
@ -6,7 +6,7 @@ description: "TDengine 3.0 版本的语法变更说明"
|
|||
|
||||
## SQL 基本元素变更
|
||||
|
||||
| # | **元素** | ****<div style={{width: 100}}>差异性</div>**** | **说明** |
|
||||
| # | **元素** | **<div style={{width: 60}}>差异性</div>** | **说明** |
|
||||
| - | :------- | :-------- | :------- |
|
||||
| 1 | VARCHAR | 新增 | BINARY类型的别名。
|
||||
| 2 | TIMESTAMP字面量 | 新增 | 新增支持 TIMESTAMP 'timestamp format' 语法。
|
||||
|
@ -22,7 +22,7 @@ description: "TDengine 3.0 版本的语法变更说明"
|
|||
|
||||
在 TDengine 中,普通表的数据模型中可使用以下数据类型。
|
||||
|
||||
| # | **语句** | **<div style={{width: 100}}>差异性</div>** | **说明** |
|
||||
| # | **语句** | **<div style={{width: 60}}>差异性</div>** | **说明** |
|
||||
| - | :------- | :-------- | :------- |
|
||||
| 1 | ALTER ACCOUNT | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。
|
||||
| 2 | ALTER ALL DNODES | 新增 | 修改所有DNODE的参数。
|
||||
|
@ -80,7 +80,7 @@ description: "TDengine 3.0 版本的语法变更说明"
|
|||
|
||||
## SQL 函数变更
|
||||
|
||||
| # | **函数** | **<div style={{width: 100}}>差异性</div>** | **说明** |
|
||||
| # | **函数** | ** <div style={{width: 60}}>差异性</div> ** | **说明** |
|
||||
| - | :------- | :-------- | :------- |
|
||||
| 1 | TWA | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
|
||||
| 2 | IRATE | 增强 | 可以直接用于超级表了。没有PARTITION BY时,超级表的数据会被合并成一条时间线。
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import PkgList from "/components/PkgList";
|
||||
import PkgListV3 from "/components/PkgListV3";
|
||||
|
||||
1. 下载客户端安装包
|
||||
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
import PkgList from "/components/PkgList";
|
||||
import PkgListV3 from "/components/PkgListV3";
|
||||
|
||||
1. 下载客户端安装包
|
||||
|
||||
<PkgList type={1} sys="Windows" />
|
||||
<PkgListV3 type={4} sys="Windows" />
|
||||
|
||||
[所有下载](https://www.taosdata.com/cn/all-downloads/)
|
||||
|
||||
|
|
|
@ -93,12 +93,12 @@ Maven 项目中,在 pom.xml 中添加以下依赖:
|
|||
可以通过下载 TDengine 的源码,自己编译最新版本的 Java connector
|
||||
|
||||
```shell
|
||||
git clone https://github.com/taosdata/taos-connector-jdbc.git --branch 2.0
|
||||
git clone https://github.com/taosdata/taos-connector-jdbc.git
|
||||
cd taos-connector-jdbc
|
||||
mvn clean install -Dmaven.test.skip=true
|
||||
```
|
||||
|
||||
编译后,在 target 目录下会产生 taos-jdbcdriver-2.0.XX-dist.jar 的 jar 包,并自动将编译的 jar 文件放在本地的 Maven 仓库中。
|
||||
编译后,在 target 目录下会产生 taos-jdbcdriver-3.0.*-dist.jar 的 jar 包,并自动将编译的 jar 文件放在本地的 Maven 仓库中。
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
@ -198,7 +198,7 @@ url 中的配置参数如下:
|
|||
|
||||
- user:登录 TDengine 用户名,默认值 'root'。
|
||||
- password:用户登录密码,默认值 'taosdata'。
|
||||
- batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。从 taos-jdbcdriver-2.0.38 开始,JDBC REST 连接增加批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
|
||||
- batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。JDBC REST 连接支持批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
|
||||
- charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。
|
||||
- batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false:不再执行失败 SQL 后的任何语句。默认值为:false。
|
||||
- httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 5000。
|
||||
|
@ -216,7 +216,7 @@ url 中的配置参数如下:
|
|||
INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFrancisco') VALUES(now, 24.6);
|
||||
```
|
||||
|
||||
- 从 taos-jdbcdriver-2.0.36 开始,如果在 url 中指定了 dbname,那么,JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。例如:url 为 jdbc:TAOS-RS://127.0.0.1:6041/test,那么,可以执行 sql:insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);
|
||||
- 如果在 url 中指定了 dbname,那么,JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。例如:url 为 jdbc:TAOS-RS://127.0.0.1:6041/test,那么,可以执行 sql:insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);
|
||||
|
||||
:::
|
||||
|
||||
|
@ -230,7 +230,7 @@ INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFra
|
|||
**注意**:
|
||||
|
||||
- 应用中设置的 client parameter 为进程级别的,即如果要更新 client 的参数,需要重启应用。这是因为 client parameter 是全局参数,仅在应用程序的第一次设置生效。
|
||||
- 以下示例代码基于 taos-jdbcdriver-2.0.36。
|
||||
- 以下示例代码基于 taos-jdbcdriver-3.0.0。
|
||||
|
||||
```java
|
||||
public Connection getConn() throws Exception{
|
||||
|
@ -367,7 +367,7 @@ TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据
|
|||
**注意**:
|
||||
|
||||
- JDBC REST 连接目前不支持参数绑定
|
||||
- 以下示例代码基于 taos-jdbcdriver-2.0.36
|
||||
- 以下示例代码基于 taos-jdbcdriver-3.0.0
|
||||
- binary 类型数据需要调用 setString 方法,nchar 类型数据需要调用 setNString 方法
|
||||
- setString 和 setNString 都要求用户在 size 参数里声明表定义中对应列的列宽
|
||||
|
||||
|
@ -635,7 +635,7 @@ TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协
|
|||
**注意**:
|
||||
|
||||
- JDBC REST 连接目前不支持无模式写入
|
||||
- 以下示例代码基于 taos-jdbcdriver-2.0.36
|
||||
- 以下示例代码基于 taos-jdbcdriver-3.0.0
|
||||
|
||||
```java
|
||||
public class SchemalessInsertTest {
|
||||
|
@ -666,7 +666,7 @@ public class SchemalessInsertTest {
|
|||
}
|
||||
```
|
||||
|
||||
### 订阅
|
||||
### 数据订阅
|
||||
|
||||
TDengine Java 连接器支持订阅功能,应用 API 如下:
|
||||
|
||||
|
@ -717,9 +717,14 @@ while(true) {
|
|||
#### 关闭订阅
|
||||
|
||||
```java
|
||||
// 取消订阅
|
||||
consumer.unsubscribe();
|
||||
// 关闭消费
|
||||
consumer.close()
|
||||
```
|
||||
|
||||
详情请参考:[数据订阅](../../../develop/tmq)
|
||||
|
||||
### 使用示例如下:
|
||||
|
||||
```java
|
||||
|
@ -734,7 +739,7 @@ public abstract class ConsumerLoop {
|
|||
config.setProperty("msg.with.table.name", "true");
|
||||
config.setProperty("enable.auto.commit", "true");
|
||||
config.setProperty("group.id", "group1");
|
||||
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer");
|
||||
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
|
||||
|
||||
this.consumer = new TaosConsumer<>(config);
|
||||
this.topics = Collections.singletonList("topic_speed");
|
||||
|
@ -754,6 +759,7 @@ public abstract class ConsumerLoop {
|
|||
process(record);
|
||||
}
|
||||
}
|
||||
consumer.unsubscribe();
|
||||
} finally {
|
||||
consumer.close();
|
||||
shutdownLatch.countDown();
|
||||
|
@ -875,6 +881,7 @@ public static void main(String[] args) throws Exception {
|
|||
|
||||
| taos-jdbcdriver 版本 | 主要变化 |
|
||||
| :------------------: | :----------------------------: |
|
||||
| 3.0.0 | 支持 TDengine 3.0 |
|
||||
| 2.0.39 - 2.0.40 | 增加 REST 连接/请求 超时设置 |
|
||||
| 2.0.38 | JDBC REST 连接增加批量拉取功能 |
|
||||
| 2.0.37 | 增加对 json tag 支持 |
|
||||
|
|
|
@ -30,6 +30,7 @@ extern bool gRaftDetailLog;
|
|||
#define SYNC_SPEED_UP_HB_TIMER 400
|
||||
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
|
||||
#define SYNC_SLOW_DOWN_RANGE 100
|
||||
#define SYNC_MAX_READ_RANGE 10
|
||||
|
||||
#define SYNC_MAX_BATCH_SIZE 1
|
||||
#define SYNC_INDEX_BEGIN 0
|
||||
|
@ -210,9 +211,12 @@ void syncStop(int64_t rid);
|
|||
int32_t syncSetStandby(int64_t rid);
|
||||
ESyncState syncGetMyRole(int64_t rid);
|
||||
bool syncIsReady(int64_t rid);
|
||||
bool syncIsReadyForRead(int64_t rid);
|
||||
const char* syncGetMyRoleStr(int64_t rid);
|
||||
bool syncRestoreFinish(int64_t rid);
|
||||
SyncTerm syncGetMyTerm(int64_t rid);
|
||||
SyncIndex syncGetLastIndex(int64_t rid);
|
||||
SyncIndex syncGetCommitIndex(int64_t rid);
|
||||
SyncGroupId syncGetVgId(int64_t rid);
|
||||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
|
|
|
@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode);
|
|||
void vnodeSyncClose(SVnode* pVnode);
|
||||
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
|
||||
bool vnodeIsLeader(SVnode* pVnode);
|
||||
bool vnodeIsReadyForRead(SVnode* pVnode);
|
||||
bool vnodeIsRoleLeader(SVnode* pVnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -283,7 +283,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
vTrace("message in vnode query queue is processing");
|
||||
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
|
||||
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsReadyForRead(pVnode)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
@ -307,7 +307,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
|
||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||
!vnodeIsLeader(pVnode)) {
|
||||
!vnodeIsReadyForRead(pVnode)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -781,3 +781,17 @@ bool vnodeIsLeader(SVnode *pVnode) {
|
|||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool vnodeIsReadyForRead(SVnode *pVnode) {
|
||||
if (syncIsReady(pVnode->sync)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (syncIsReadyForRead(pVnode->sync)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId,
|
||||
syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync));
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -1399,7 +1399,7 @@ static int32_t translateTimelineFunc(STranslateContext* pCxt, SFunctionNode* pFu
|
|||
"%s function must be used in select statements", pFunc->functionName);
|
||||
}
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
|
||||
if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
|
||||
if (NULL != pSelect->pFromTable && QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
|
||||
!isTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
|
||||
"%s function requires valid time series input", pFunc->functionName);
|
||||
|
@ -2037,16 +2037,13 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName,
|
|||
code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code &&
|
||||
0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
|
||||
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) &&
|
||||
isSelectStmt(pCxt->pCurrStmt) &&
|
||||
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
|
||||
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) && isSelectStmt(pCxt->pCurrStmt) &&
|
||||
0 == taosArrayGetSize(vgroupList)) {
|
||||
((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code &&
|
||||
0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
|
||||
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
|
||||
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) {
|
||||
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
|
||||
}
|
||||
|
|
|
@ -392,6 +392,29 @@ bool syncIsReady(int64_t rid) {
|
|||
return b;
|
||||
}
|
||||
|
||||
bool syncIsReadyForRead(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return false;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
|
||||
// TODO: last not noop?
|
||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||
bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->commitIndex >= lastIndex - SYNC_MAX_READ_RANGE);
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
|
||||
// if false, set error code
|
||||
if (false == b) {
|
||||
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
||||
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||
} else {
|
||||
terrno = TSDB_CODE_APP_NOT_READY;
|
||||
}
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
||||
bool syncIsRestoreFinish(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
|
@ -519,6 +542,30 @@ SyncTerm syncGetMyTerm(int64_t rid) {
|
|||
return term;
|
||||
}
|
||||
|
||||
SyncIndex syncGetLastIndex(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return SYNC_INDEX_INVALID;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return lastIndex;
|
||||
}
|
||||
|
||||
SyncIndex syncGetCommitIndex(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return SYNC_INDEX_INVALID;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
SyncIndex cmtIndex = pSyncNode->commitIndex;
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return cmtIndex;
|
||||
}
|
||||
|
||||
SyncGroupId syncGetVgId(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
|
@ -828,6 +875,15 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
|||
pSyncNode->changing = true;
|
||||
}
|
||||
|
||||
// not restored, vnode enable
|
||||
if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
|
||||
ret = -1;
|
||||
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
|
||||
sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%ld, cmt:%ld", pSyncNode->vgId,
|
||||
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
|
||||
goto _END;
|
||||
}
|
||||
|
||||
SRespStub stub;
|
||||
stub.createTime = taosGetTimestampMs();
|
||||
stub.rpcMsg = *pMsg;
|
||||
|
|
Loading…
Reference in New Issue