Merge branch '3.0' of github.com:taosdata/TDengine into szhou/fixbugs
This commit is contained in:
commit
690f0224b4
|
@ -29,6 +29,7 @@ echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" |
|
|||
如果安装 Beta 版需要安装包仓库
|
||||
|
||||
```bash
|
||||
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
|
||||
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list
|
||||
```
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
---
|
||||
sidebar_label: 消息队列
|
||||
title: 消息队列
|
||||
sidebar_label: 数据订阅
|
||||
title: 数据订阅
|
||||
---
|
||||
|
||||
TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用户的解决方案。
|
||||
|
@ -8,24 +8,17 @@ TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用
|
|||
## 创建订阅主题
|
||||
|
||||
```sql
|
||||
CREATE TOPIC [IF NOT EXISTS] topic_name AS {subquery | DATABASE db_name | STABLE stb_name };
|
||||
CREATE TOPIC [IF NOT EXISTS] topic_name AS subquery;
|
||||
```
|
||||
|
||||
订阅主题包括三种:列订阅、超级表订阅和数据库订阅。
|
||||
|
||||
**列订阅是**用 subquery 描述,支持过滤和标量函数和 UDF 标量函数,不支持 JOIN、GROUP BY、窗口切分子句、聚合函数和 UDF 聚合函数。列订阅规则如下:
|
||||
TOPIC 支持过滤和标量函数和 UDF 标量函数,不支持 JOIN、GROUP BY、窗口切分子句、聚合函数和 UDF 聚合函数。列订阅规则如下:
|
||||
|
||||
1. TOPIC 一旦创建则返回结果的字段确定
|
||||
2. 被订阅或用于计算的列不可被删除、修改
|
||||
3. 列可以新增,但新增的列不出现在订阅结果字段中
|
||||
4. 对于 select \*,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列)
|
||||
|
||||
**超级表订阅和数据库订阅**规则如下:
|
||||
|
||||
1. 被订阅主体的 schema 变更不受限
|
||||
2. 返回消息中 schema 是块级别的,每块的 schema 可能不一样
|
||||
3. 列变更后写入的数据若未落盘,将以写入时的 schema 返回
|
||||
4. 列变更后写入的数据若未已落盘,将以落盘时的 schema 返回
|
||||
|
||||
## 删除订阅主题
|
||||
|
||||
|
|
|
@ -20,8 +20,10 @@ RESTful 接口不依赖于任何 TDengine 的库,因此客户端不需要安
|
|||
|
||||
下面示例是列出所有的数据库,请把 h1.taosdata.com 和 6041(缺省值)替换为实际运行的 TDengine 服务 FQDN 和端口号:
|
||||
|
||||
```html
|
||||
curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -d "show databases;" h1.taosdata.com:6041/rest/sql
|
||||
```bash
|
||||
curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" \
|
||||
-d "select name, ntables, status from information_schema.ins_databases;" \
|
||||
h1.taosdata.com:6041/rest/sql
|
||||
```
|
||||
|
||||
返回值结果如下表示验证通过:
|
||||
|
@ -35,188 +37,27 @@ curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -d "show databases;" h1.t
|
|||
"VARCHAR",
|
||||
64
|
||||
],
|
||||
[
|
||||
"create_time",
|
||||
"TIMESTAMP",
|
||||
8
|
||||
],
|
||||
[
|
||||
"vgroups",
|
||||
"SMALLINT",
|
||||
2
|
||||
],
|
||||
[
|
||||
"ntables",
|
||||
"BIGINT",
|
||||
8
|
||||
],
|
||||
[
|
||||
"replica",
|
||||
"TINYINT",
|
||||
1
|
||||
],
|
||||
[
|
||||
"strict",
|
||||
"VARCHAR",
|
||||
4
|
||||
],
|
||||
[
|
||||
"duration",
|
||||
"VARCHAR",
|
||||
10
|
||||
],
|
||||
[
|
||||
"keep",
|
||||
"VARCHAR",
|
||||
32
|
||||
],
|
||||
[
|
||||
"buffer",
|
||||
"INT",
|
||||
4
|
||||
],
|
||||
[
|
||||
"pagesize",
|
||||
"INT",
|
||||
4
|
||||
],
|
||||
[
|
||||
"pages",
|
||||
"INT",
|
||||
4
|
||||
],
|
||||
[
|
||||
"minrows",
|
||||
"INT",
|
||||
4
|
||||
],
|
||||
[
|
||||
"maxrows",
|
||||
"INT",
|
||||
4
|
||||
],
|
||||
[
|
||||
"comp",
|
||||
"TINYINT",
|
||||
1
|
||||
],
|
||||
[
|
||||
"precision",
|
||||
"VARCHAR",
|
||||
2
|
||||
],
|
||||
[
|
||||
"status",
|
||||
"VARCHAR",
|
||||
10
|
||||
],
|
||||
[
|
||||
"retention",
|
||||
"VARCHAR",
|
||||
60
|
||||
],
|
||||
[
|
||||
"single_stable",
|
||||
"BOOL",
|
||||
1
|
||||
],
|
||||
[
|
||||
"cachemodel",
|
||||
"VARCHAR",
|
||||
11
|
||||
],
|
||||
[
|
||||
"cachesize",
|
||||
"INT",
|
||||
4
|
||||
],
|
||||
[
|
||||
"wal_level",
|
||||
"TINYINT",
|
||||
1
|
||||
],
|
||||
[
|
||||
"wal_fsync_period",
|
||||
"INT",
|
||||
4
|
||||
],
|
||||
[
|
||||
"wal_retention_period",
|
||||
"INT",
|
||||
4
|
||||
],
|
||||
[
|
||||
"wal_retention_size",
|
||||
"BIGINT",
|
||||
8
|
||||
],
|
||||
[
|
||||
"wal_roll_period",
|
||||
"INT",
|
||||
4
|
||||
],
|
||||
[
|
||||
"wal_seg_size",
|
||||
"BIGINT",
|
||||
8
|
||||
]
|
||||
],
|
||||
"data": [
|
||||
[
|
||||
"information_schema",
|
||||
null,
|
||||
null,
|
||||
14,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"ready",
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
16,
|
||||
"ready"
|
||||
],
|
||||
[
|
||||
"performance_schema",
|
||||
null,
|
||||
null,
|
||||
3,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
"ready",
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
9,
|
||||
"ready"
|
||||
]
|
||||
],
|
||||
"rows": 2
|
||||
|
@ -231,21 +72,21 @@ http://<fqdn>:<port>/rest/sql/[db_name]
|
|||
|
||||
参数说明:
|
||||
|
||||
- fqnd: 集群中的任一台主机 FQDN 或 IP 地址
|
||||
- port: 配置文件中 httpPort 配置项,缺省为 6041
|
||||
- fqnd: 集群中的任一台主机 FQDN 或 IP 地址。
|
||||
- port: 配置文件中 httpPort 配置项,缺省为 6041。
|
||||
- db_name: 可选参数,指定本次所执行的 SQL 语句的默认数据库库名。
|
||||
|
||||
例如:`http://h1.taos.com:6041/rest/sql/test` 是指向地址为 `h1.taos.com:6041` 的 URL,并将默认使用的数据库库名设置为 `test`。
|
||||
|
||||
HTTP 请求的 Header 里需带有身份认证信息,TDengine 支持 Basic 认证与自定义认证两种机制,后续版本将提供标准安全的数字签名机制来做身份验证。
|
||||
|
||||
- [自定义身份认证信息](#自定义授权码)如下所示
|
||||
- [自定义身份认证信息](#自定义授权码)如下所示:
|
||||
|
||||
```text
|
||||
Authorization: Taosd <TOKEN>
|
||||
```
|
||||
|
||||
- Basic 身份认证信息如下所示
|
||||
- Basic 身份认证信息如下所示:
|
||||
|
||||
```text
|
||||
Authorization: Basic <TOKEN>
|
||||
|
@ -259,13 +100,13 @@ HTTP 请求的 BODY 里就是一个完整的 SQL 语句,SQL 语句中的数据
|
|||
curl -L -H "Authorization: Basic <TOKEN>" -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
|
||||
```
|
||||
|
||||
或者
|
||||
或者,
|
||||
|
||||
```bash
|
||||
curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
|
||||
```
|
||||
|
||||
其中,`TOKEN` 为 `{username}:{password}` 经过 Base64 编码之后的字符串,例如 `root:taosdata` 编码后为 `cm9vdDp0YW9zZGF0YQ==`
|
||||
其中,`TOKEN` 为 `{username}:{password}` 经过 Base64 编码之后的字符串,例如 `root:taosdata` 编码后为 `cm9vdDp0YW9zZGF0YQ==`。
|
||||
|
||||
## HTTP 返回格式
|
||||
|
||||
|
@ -282,27 +123,9 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
|
|||
|
||||
### HTTP body 结构
|
||||
|
||||
<table>
|
||||
<tr>
|
||||
<th>执行结果</th>
|
||||
<th>说明</th>
|
||||
<th>样例</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>正确执行</td>
|
||||
<td>
|
||||
code:(int)0 代表成功
|
||||
<br/>
|
||||
<br/>
|
||||
column_meta:([][3]any)列信息,每个列会用三个值来说明,分别为:列名(string)、列类型(string)、类型长度(int)
|
||||
<br/>
|
||||
<br/>
|
||||
rows:(int)数据返回行数
|
||||
<br/>
|
||||
<br/>
|
||||
data:([][]any)具体数据内容
|
||||
</td>
|
||||
<td>
|
||||
#### 正确执行
|
||||
|
||||
样例:
|
||||
|
||||
```json
|
||||
{
|
||||
|
@ -313,23 +136,16 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
|
|||
}
|
||||
```
|
||||
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>正确查询</td>
|
||||
<td>
|
||||
code:(int)0 代表成功
|
||||
<br/>
|
||||
<br/>
|
||||
column_meta:([][3]any) 列信息,每个列会用三个值来说明,分别为:列名(string)、列类型(string)、类型长度(int)
|
||||
<br/>
|
||||
<br/>
|
||||
rows:(int)数据返回行数
|
||||
<br/>
|
||||
<br/>
|
||||
data:([][]any)具体数据内容
|
||||
</td>
|
||||
<td>
|
||||
说明:
|
||||
|
||||
- code:(`int`)0 代表成功。
|
||||
- column_meta:(`[1][3]any`)只返回 `[["affected_rows", "INT", 4]]`。
|
||||
- rows:(`int`)只返回 `1`。
|
||||
- data:(`[][]any`)返回受影响行数。
|
||||
|
||||
#### 正确查询
|
||||
|
||||
样例:
|
||||
|
||||
```json
|
||||
{
|
||||
|
@ -385,17 +201,35 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
|
|||
}
|
||||
```
|
||||
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>错误</td>
|
||||
<td>
|
||||
code:(int)错误码
|
||||
<br/>
|
||||
<br/>
|
||||
desc:(string)错误描述
|
||||
</td>
|
||||
<td>
|
||||
说明:
|
||||
|
||||
- code:(`int`)0 代表成功。
|
||||
- column_meta:(`[][3]any`) 列信息,每个列会用三个值来说明,分别为:列名(string)、列类型(string)、类型长度(int)。
|
||||
- rows:(`int`)数据返回行数。
|
||||
- data:(`[][]any`)具体数据内容(时间格式仅支持 RFC3339,结果集为 0 时区)。
|
||||
|
||||
列类型使用如下字符串:
|
||||
|
||||
- "NULL"
|
||||
- "BOOL"
|
||||
- "TINYINT"
|
||||
- "SMALLINT"
|
||||
- "INT"
|
||||
- "BIGINT"
|
||||
- "FLOAT"
|
||||
- "DOUBLE"
|
||||
- "VARCHAR"
|
||||
- "TIMESTAMP"
|
||||
- "NCHAR"
|
||||
- "TINYINT UNSIGNED"
|
||||
- "SMALLINT UNSIGNED"
|
||||
- "INT UNSIGNED"
|
||||
- "BIGINT UNSIGNED"
|
||||
- "JSON"
|
||||
|
||||
#### 错误
|
||||
|
||||
样例:
|
||||
|
||||
```json
|
||||
{
|
||||
|
@ -404,30 +238,10 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
|
|||
}
|
||||
```
|
||||
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
说明:
|
||||
|
||||
### 说明
|
||||
|
||||
- 时间格式仅支持 RFC3339,结果集为 0 时区
|
||||
- 列类型使用如下字符串:
|
||||
> "NULL"
|
||||
> "BOOL"
|
||||
> "TINYINT"
|
||||
> "SMALLINT"
|
||||
> "INT"
|
||||
> "BIGINT"
|
||||
> "FLOAT"
|
||||
> "DOUBLE"
|
||||
> "VARCHAR"
|
||||
> "TIMESTAMP"
|
||||
> "NCHAR"
|
||||
> "TINYINT UNSIGNED"
|
||||
> "SMALLINT UNSIGNED"
|
||||
> "INT UNSIGNED"
|
||||
> "BIGINT UNSIGNED"
|
||||
> "JSON"
|
||||
- code:(`int`)错误码。
|
||||
- desc:(`string`)错误描述。
|
||||
|
||||
## 自定义授权码
|
||||
|
||||
|
@ -439,11 +253,9 @@ curl http://<fqnd>:<port>/rest/login/<username>/<password>
|
|||
|
||||
其中,`fqdn` 是 TDengine 数据库的 FQDN 或 IP 地址,`port` 是 TDengine 服务的端口号,`username` 为数据库用户名,`password` 为数据库密码,返回值为 JSON 格式,各字段含义如下:
|
||||
|
||||
- status:请求结果的标志位
|
||||
|
||||
- code:返回值代码
|
||||
|
||||
- desc:授权码
|
||||
- status:请求结果的标志位。
|
||||
- code:返回值代码。
|
||||
- desc:授权码。
|
||||
|
||||
获取授权码示例:
|
||||
|
||||
|
|
|
@ -404,47 +404,3 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
|
|||
|
||||
**支持版本**
|
||||
该功能接口从 2.3.0.0 版本开始支持。
|
||||
|
||||
### 订阅和消费 API
|
||||
|
||||
订阅 API 目前支持订阅一张或多张表,并通过定期轮询的方式不断获取写入表中的最新数据。
|
||||
|
||||
- `TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval)`
|
||||
|
||||
该函数负责启动订阅服务,成功时返回订阅对象,失败时返回 `NULL`,其参数为:
|
||||
|
||||
- taos:已经建立好的数据库连接
|
||||
- restart:如果订阅已经存在,是重新开始,还是继续之前的订阅
|
||||
- topic:订阅的主题(即名称),此参数是订阅的唯一标识
|
||||
- sql:订阅的查询语句,此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据
|
||||
- fp:收到查询结果时的回调函数(稍后介绍函数原型),只在异步调用时使用,同步调用时此参数应该传 `NULL`
|
||||
- param:调用回调函数时的附加参数,系统 API 将其原样传递到回调函数,不进行任何处理
|
||||
- interval:轮询周期,单位为毫秒。异步调用时,将根据此参数周期性的调用回调函数,为避免对系统性能造成影响,不建议将此参数设置的过小;同步调用时,如两次调用 `taos_consume()` 的间隔小于此周期,API 将会阻塞,直到时间间隔超过此周期。
|
||||
|
||||
- `typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code)`
|
||||
|
||||
异步模式下,回调函数的原型,其参数为:
|
||||
|
||||
- tsub:订阅对象
|
||||
- res:查询结果集,注意结果集中可能没有记录
|
||||
- param:调用 `taos_subscribe()` 时客户程序提供的附加参数
|
||||
- code:错误码
|
||||
|
||||
:::note
|
||||
在这个回调函数里不可以做耗时过长的处理,尤其是对于返回的结果集中数据较多的情况,否则有可能导致客户端阻塞等异常状态。如果必须进行复杂计算,则建议在另外的线程中进行处理。
|
||||
|
||||
:::
|
||||
|
||||
- `TAOS_RES *taos_consume(TAOS_SUB *tsub)`
|
||||
|
||||
同步模式下,该函数用来获取订阅的结果。 用户应用程序将其置于一个循环之中。 如两次调用 `taos_consume()` 的间隔小于订阅的轮询周期,API 将会阻塞,直到时间间隔超过此周期。如果数据库有新记录到达,该 API 将返回该最新的记录,否则返回一个没有记录的空结果集。 如果返回值为 `NULL`,说明系统出错。 异步模式下,用户程序不应调用此 API。
|
||||
|
||||
:::note
|
||||
在调用 `taos_consume()` 之后,用户应用应确保尽快调用 `taos_fetch_row()` 或 `taos_fetch_block()` 来处理订阅结果,否则服务端会持续缓存查询结果数据等待客户端读取,极端情况下会导致服务端内存消耗殆尽,影响服务稳定性。
|
||||
|
||||
:::
|
||||
|
||||
- `void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress)`
|
||||
|
||||
取消订阅。 如参数 `keepProgress` 不为 0,API 会保留订阅的进度信息,后续调用 `taos_subscribe()` 时可以基于此进度继续;否则将删除进度信息,后续只能重新开始读取数据。
|
||||
|
||||
|
|
|
@ -712,7 +712,7 @@ while(true) {
|
|||
}
|
||||
```
|
||||
|
||||
`poll` 每次调用获取一个消息。请按需选择合理的调用 `poll` 的频率(如例子中的 `Duration.ofMillis(100)`),否则会给服务端造成不必要的压力。
|
||||
`poll` 每次调用获取一个消息。
|
||||
|
||||
#### 关闭订阅
|
||||
|
||||
|
|
|
@ -275,12 +275,8 @@ typedef struct SStreamTask {
|
|||
int32_t nodeId;
|
||||
SEpSet epSet;
|
||||
|
||||
// used for task source and sink,
|
||||
// while task agg should have processedVer for each child
|
||||
int64_t recoverSnapVer;
|
||||
int64_t startVer;
|
||||
int64_t checkpointVer;
|
||||
int64_t processedVer;
|
||||
|
||||
// children info
|
||||
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
|
||||
|
|
|
@ -381,6 +381,9 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
|
|||
}
|
||||
#endif
|
||||
|
||||
taosMemoryFree(pParam->pOffset);
|
||||
if (pBuf->pData) taosMemoryFree(pBuf->pData);
|
||||
|
||||
/*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
|
||||
* pOffset->version);*/
|
||||
|
||||
|
@ -402,6 +405,8 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
|
|||
tsem_post(&pParamSet->rspSem);
|
||||
}
|
||||
|
||||
taosMemoryFree(pParamSet);
|
||||
|
||||
#if 0
|
||||
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
|
||||
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
|
||||
|
@ -611,12 +616,12 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
|
|||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (!async) {
|
||||
#if 0
|
||||
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
|
||||
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -1216,6 +1221,7 @@ END:
|
|||
} else {
|
||||
taosMemoryFree(pParam);
|
||||
}
|
||||
taosMemoryFree(pMsg->pData);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -128,19 +128,19 @@ typedef struct STsdbReader STsdbReader;
|
|||
#define LASTROW_RETRIEVE_TYPE_ALL 0x1
|
||||
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
|
||||
|
||||
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
|
||||
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader,
|
||||
const char *idstr);
|
||||
void tsdbReaderClose(STsdbReader *pReader);
|
||||
bool tsdbNextDataBlock(STsdbReader *pReader);
|
||||
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
|
||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
|
||||
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
||||
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
||||
void *tsdbGetIdx(SMeta *pMeta);
|
||||
void *tsdbGetIvtIdx(SMeta *pMeta);
|
||||
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
|
||||
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader,
|
||||
const char *idstr);
|
||||
void tsdbReaderClose(STsdbReader *pReader);
|
||||
bool tsdbNextDataBlock(STsdbReader *pReader);
|
||||
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
|
||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
|
||||
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
||||
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
||||
void *tsdbGetIdx(SMeta *pMeta);
|
||||
void *tsdbGetIvtIdx(SMeta *pMeta);
|
||||
uint64_t getReaderMaxVersion(STsdbReader *pReader);
|
||||
|
||||
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
|
||||
|
|
|
@ -80,7 +80,7 @@ int32_t vnodeQueryOpen(SVnode* pVnode);
|
|||
void vnodeQueryClose(SVnode* pVnode);
|
||||
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
|
||||
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
|
||||
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg);
|
||||
int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
|
||||
|
||||
// vnodeCommit.c
|
||||
int32_t vnodeBegin(SVnode* pVnode);
|
||||
|
@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode);
|
|||
void vnodeSyncClose(SVnode* pVnode);
|
||||
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
|
||||
bool vnodeIsLeader(SVnode* pVnode);
|
||||
bool vnodeIsRoleLeader(SVnode* pVnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -144,6 +144,7 @@ int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb
|
|||
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||
void* pMemRef);
|
||||
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
||||
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
|
||||
|
||||
// tq
|
||||
int tqInit();
|
||||
|
@ -169,10 +170,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
|
|||
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
|
||||
const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq);
|
||||
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
|
||||
const char* stbFullName, SBatchDeleteReq* pDeleteReq);
|
||||
|
||||
// sma
|
||||
int32_t smaInit();
|
||||
|
|
|
@ -201,9 +201,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
|||
}
|
||||
|
||||
SBatchDeleteReq deleteReq;
|
||||
SSubmitReq *pSubmitReq =
|
||||
tdBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid,
|
||||
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq);
|
||||
SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true,
|
||||
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq);
|
||||
|
||||
if (!pSubmitReq) {
|
||||
smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "tq.h"
|
||||
#include "vnd.h"
|
||||
|
||||
#if 0
|
||||
void tqTmrRspFunc(void* param, void* tmrId) {
|
||||
|
@ -212,9 +213,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
|||
#endif
|
||||
|
||||
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
||||
walApplyVer(pTq->pVnode->pWal, ver);
|
||||
|
||||
if (msgType == TDMT_VND_SUBMIT) {
|
||||
if (vnodeIsRoleLeader(pTq->pVnode) && msgType == TDMT_VND_SUBMIT) {
|
||||
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
|
||||
|
||||
void* data = taosMemoryMalloc(msgLen);
|
||||
|
|
|
@ -25,8 +25,7 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
|
|||
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
for (int32_t row = 0; row < totRow; row++) {
|
||||
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
|
||||
/*int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);*/
|
||||
int64_t groupId = 0;
|
||||
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
|
||||
char* name = buildCtbNameByGroupId(stbFullName, groupId);
|
||||
tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name);
|
||||
SMetaReader mr = {0};
|
||||
|
@ -49,8 +48,8 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
|
|||
return 0;
|
||||
}
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb,
|
||||
int64_t suid, const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq) {
|
||||
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb,
|
||||
int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) {
|
||||
SSubmitReq* ret = NULL;
|
||||
SArray* schemaReqs = NULL;
|
||||
SArray* schemaReqSz = NULL;
|
||||
|
@ -153,7 +152,7 @@ SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
|||
// assign data
|
||||
// TODO
|
||||
ret = rpcMallocCont(cap);
|
||||
ret->header.vgId = vgId;
|
||||
ret->header.vgId = pVnode->config.vgId;
|
||||
ret->length = sizeof(SSubmitReq);
|
||||
ret->numOfBlocks = htonl(sz);
|
||||
|
||||
|
@ -234,8 +233,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|||
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
||||
SSubmitReq* pReq = tdBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
|
||||
pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq);
|
||||
SSubmitReq* pReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
|
||||
pTask->tbSink.stbFullName, &deleteReq);
|
||||
|
||||
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
|
||||
|
||||
|
|
|
@ -247,6 +247,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
|
||||
vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
|
||||
|
||||
walApplyVer(pVnode->pWal, version);
|
||||
|
||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
||||
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
|
|
|
@ -764,6 +764,8 @@ void vnodeSyncStart(SVnode *pVnode) {
|
|||
|
||||
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
||||
|
||||
bool vnodeIsRoleLeader(SVnode *pVnode) { return syncGetMyRole(pVnode->sync) == TAOS_SYNC_STATE_LEADER; }
|
||||
|
||||
bool vnodeIsLeader(SVnode *pVnode) {
|
||||
if (!syncIsReady(pVnode->sync)) {
|
||||
vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync),
|
||||
|
|
|
@ -642,7 +642,6 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar
|
|||
int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE;
|
||||
trimFn(input, output, type, charLen);
|
||||
|
||||
varDataSetLen(output, len);
|
||||
colDataAppend(pOutputData, i, output, false);
|
||||
output += varDataTLen(output);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue