Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_last

This commit is contained in:
Hongze Cheng 2022-08-08 02:24:28 +00:00
commit b56aedffcb
37 changed files with 504 additions and 372 deletions

View File

@ -1,4 +1,4 @@
const taos = require("td2.0-connector"); const taos = require("@tdengine/client");
const conn = taos.connect({ host: "localhost", database: "power" }); const conn = taos.connect({ host: "localhost", database: "power" });
const cursor = conn.cursor(); const cursor = conn.cursor();
@ -18,4 +18,3 @@ try {
conn.close(); conn.close();
}, 2000); }, 2000);
} }
// bug here: jira 14506

View File

@ -1,13 +1,20 @@
const taos = require("td2.0-connector"); const { options, connect } = require("@tdengine/rest");
var conn = taos.connect({ async function test() {
host: "localhost", options.path = "/rest/sql";
port: 6030, options.host = "localhost";
user: "root", let conn = connect(options);
password: "taosdata", let cursor = conn.cursor();
}); try {
conn.close(); let res = await cursor.query("SELECT server_version()");
res.toString();
} catch (err) {
console.log(err);
}
}
test();
// run with: node connect.js
// output: // output:
// Successfully connected to TDengine // server_version() |
// ===================
// 3.0.0.0 |

View File

@ -1,4 +1,4 @@
const taos = require("td2.0-connector"); const taos = require("@tdengine/client");
const conn = taos.connect({ const conn = taos.connect({
host: "localhost", host: "localhost",

View File

@ -1,4 +1,4 @@
const taos = require("td2.0-connector"); const taos = require("@tdengine/client");
const conn = taos.connect({ const conn = taos.connect({
host: "localhost", host: "localhost",
@ -11,11 +11,11 @@ try {
cursor.execute( cursor.execute(
"CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)" "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
); );
var sql = `INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000) var sql = `INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000) power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000) power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`; power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`;
cursor.execute(sql); cursor.execute(sql,{'quiet':false});
} finally { } finally {
cursor.close(); cursor.close();
conn.close(); conn.close();

View File

@ -1,4 +1,4 @@
const taos = require("td2.0-connector"); const taos = require("@tdengine/client");
const conn = taos.connect({ const conn = taos.connect({
host: "localhost", host: "localhost",
@ -24,10 +24,10 @@ function insertData() {
); );
// bind table name and tags // bind table name and tags
let tagBind = new taos.TaosBind(2); let tagBind = new taos.TaosMultiBindArr(2);
tagBind.bindBinary("California.SanFrancisco"); tagBind.multiBindBinary(["California.SanFrancisco"]);
tagBind.bindInt(2); tagBind.multiBindInt([2]);
cursor.stmtSetTbnameTags("d1001", tagBind.getBind()); cursor.stmtSetTbnameTags("d1001", tagBind.getMultiBindArr());
// bind values // bind values
let valueBind = new taos.TaosMultiBindArr(4); let valueBind = new taos.TaosMultiBindArr(4);

View File

@ -1,4 +1,4 @@
const taos = require("td2.0-connector"); const taos = require("@tdengine/client");
const conn = taos.connect({ const conn = taos.connect({
host: "localhost", host: "localhost",

View File

@ -1,4 +1,4 @@
const taos = require("td2.0-connector"); const taos = require("@tdengine/client");
const conn = taos.connect({ const conn = taos.connect({
host: "localhost", host: "localhost",

View File

@ -1,4 +1,4 @@
const taos = require("td2.0-connector"); const taos = require("@tdengine/client");
const conn = taos.connect({ const conn = taos.connect({
host: "localhost", host: "localhost",
@ -23,25 +23,22 @@ function insertData() {
); );
// bind table name and tags // bind table name and tags
let tagBind = new taos.TaosBind(2); let tagBind = new taos.TaosMultiBindArr(2);
tagBind.bindBinary("California.SanFrancisco"); tagBind.multiBindBinary(["California.SanFrancisco"]);
tagBind.bindInt(2); tagBind.multiBindInt([2]);
cursor.stmtSetTbnameTags("d1001", tagBind.getBind()); cursor.stmtSetTbnameTags("d1001", tagBind.getMultiBindArr());
// bind values // bind values
let rows = [ let rows = [[1648432611249, 1648432611749], [10.3, 12.6], [219, 218], [0.31, 0.33]];
[1648432611249, 10.3, 219, 0.31],
[1648432611749, 12.6, 218, 0.33], let valueBind = new taos.TaosMultiBindArr(4);
]; valueBind.multiBindTimestamp(rows[0]);
for (let row of rows) { valueBind.multiBindFloat(rows[1]);
let valueBind = new taos.TaosBind(4); valueBind.multiBindInt(rows[2]);
valueBind.bindTimestamp(row[0]); valueBind.multiBindFloat(rows[3]);
valueBind.bindFloat(row[1]); cursor.stmtBindParamBatch(valueBind.getMultiBindArr());
valueBind.bindInt(row[2]); cursor.stmtAddBatch();
valueBind.bindFloat(row[3]);
cursor.stmtBindParam(valueBind.getBind());
cursor.stmtAddBatch();
}
// execute // execute
cursor.stmtExecute(); cursor.stmtExecute();

View File

@ -1,4 +1,4 @@
const taos = require("td2.0-connector"); const taos = require("@tdengine/client");
const conn = taos.connect({ host: "localhost", database: "power" }); const conn = taos.connect({ host: "localhost", database: "power" });
const cursor = conn.cursor(); const cursor = conn.cursor();
@ -9,8 +9,6 @@ query.execute().then(function (result) {
// output: // output:
// Successfully connected to TDengine // Successfully connected to TDengine
// Query OK, 2 row(s) in set (0.00317767s)
// ts | current | // ts | current |
// ======================================================= // =======================================================
// 2018-10-03 14:38:05.000 | 10.3 | // 2018-10-03 14:38:05.000 | 10.3 |

View File

@ -1,4 +1,51 @@
const taos = require("td2.0-connector"); const taos = require("@tdengine/client");
const conn = taos.connect({ host: "localhost", database: "power" }); const conn = taos.connect({ host: "localhost", database: "power" });
// 未完成 var cursor = conn.cursor();
function runConsumer() {
// create topic
cursor.execute("create topic topic_name_example as select * from meters");
let consumer = taos.consumer({
'group.id': 'tg2',
'td.connect.user': 'root',
'td.connect.pass': 'taosdata',
'msg.with.table.name': 'true',
'enable.auto.commit': 'true'
});
// subscribe the topic just created.
consumer.subscribe("topic_name_example");
// get subscribe topic list
let topicList = consumer.subscription();
console.log(topicList);
for (let i = 0; i < 5; i++) {
let msg = consumer.consume(100);
console.log(msg.topicPartition);
console.log(msg.block);
console.log(msg.fields)
consumer.commit(msg);
console.log(`=======consumer ${i} done`)
}
consumer.unsubscribe();
consumer.close();
// drop topic
cursor.execute("drop topic topic_name_example");
}
try {
runConsumer();
} finally {
setTimeout(() => {
cursor.close();
conn.close();
}, 2000);
}

View File

@ -4,7 +4,7 @@
"main": "index.js", "main": "index.js",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"td2.0-connector": "^2.0.12", "@tdengine/client": "^3.0.0",
"td2.0-rest-connector": "^1.0.0" "@tdengine/rest": "^3.0.0"
} }
} }

View File

@ -1,4 +1,4 @@
const { options, connect } = require("td2.0-rest-connector"); const { options, connect } = require("@tdengine/rest");
async function test() { async function test() {
options.path = "/rest/sqlt"; options.path = "/rest/sqlt";
@ -17,4 +17,4 @@ test();
// output: // output:
// server_version() | // server_version() |
// =================== // ===================
// 2.4.0.12 | // 3.0.0.0 |

View File

@ -15,11 +15,11 @@ import NodeOpenTSDBTelnet from "../../07-develop/03-insert-data/_js_opts_telnet.
import NodeOpenTSDBJson from "../../07-develop/03-insert-data/_js_opts_json.mdx"; import NodeOpenTSDBJson from "../../07-develop/03-insert-data/_js_opts_json.mdx";
import NodeQuery from "../../07-develop/04-query-data/_js.mdx"; import NodeQuery from "../../07-develop/04-query-data/_js.mdx";
`td2.0-connector` 和 `td2.0-rest-connector` 是 TDengine 的官方 Node.js 语言连接器。Node.js 开发人员可以通过它开发可以存取 TDengine 集群数据的应用软件。 `@tdengine/client` 和 `@tdengine/rest` 是 TDengine 的官方 Node.js 语言连接器。 Node.js 开发人员可以通过它开发可以存取 TDengine 集群数据的应用软件。注意:从 TDengine 3.0 开始 Node.js 原生连接器的包名由 `td2.0-connector` 改名为 `@tdengine/client` 而 rest 连接器的包名由 `td2.0-rest-connector` 改为 `@tdengine/rest`。并且不与 TDengine 2.x 兼容。
`td2.0-connector` 是**原生连接器**,它通过 TDengine 客户端驱动程序taosc连接 TDengine 运行实例支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能。`td2.0-rest-connector` 是 **REST 连接器**,它通过 taosAdapter 提供的 REST 接口连接 TDengine 的运行实例。REST 连接器可以在任何平台运行,但性能略为下降,接口实现的功能特性集合和原生接口有少量不同。 `@tdengine/client` 是**原生连接器**,它通过 TDengine 客户端驱动程序taosc连接 TDengine 运行实例支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能。`@tdengine/rest` 是 **REST 连接器**,它通过 taosAdapter 提供的 REST 接口连接 TDengine 的运行实例。REST 连接器可以在任何平台运行,但性能略为下降,接口实现的功能特性集合和原生接口有少量不同。
Node.js 连接器源码托管在 [GitHub](https://github.com/taosdata/taos-connector-node)。 Node.js 连接器源码托管在 [GitHub](https://github.com/taosdata/taos-connector-node/tree/3.0)。
## 支持的平台 ## 支持的平台
@ -58,7 +58,7 @@ REST 连接器支持所有能运行 Node.js 的平台。
<TabItem value="Linux" label="Linux 系统安装依赖工具"> <TabItem value="Linux" label="Linux 系统安装依赖工具">
- `python` (建议`v2.7` , `v3.x.x` 目前还不支持) - `python` (建议`v2.7` , `v3.x.x` 目前还不支持)
- `td2.0-connector` 2.0.6 支持 Node.js LTS v10.9.0 或更高版本, Node.js LTS v12.8.0 或更高版本;2.0.5 及更早版本支持 Node.js LTS v10.x 版本。其他版本可能存在包兼容性的问题 - `@tdengine/client` 3.0.0 支持 Node.js LTS v10.9.0 或更高版本, Node.js LTS v12.8.0 或更高版本;其他版本可能存在包兼容性的问题
- `make` - `make`
- C 语言编译器,[GCC](https://gcc.gnu.org) v4.8.5 或更高版本 - C 语言编译器,[GCC](https://gcc.gnu.org) v4.8.5 或更高版本
@ -90,14 +90,14 @@ REST 连接器支持所有能运行 Node.js 的平台。
<TabItem value="install_native" label="安装原生连接器"> <TabItem value="install_native" label="安装原生连接器">
```bash ```bash
npm install td2.0-connector npm install @tdengine/client
``` ```
</TabItem> </TabItem>
<TabItem value="install_rest" label="安装 REST 连接器"> <TabItem value="install_rest" label="安装 REST 连接器">
```bash ```bash
npm i td2.0-rest-connector npm install @tdengine/rest
``` ```
</TabItem> </TabItem>
@ -109,13 +109,13 @@ npm i td2.0-rest-connector
验证方法: 验证方法:
- 新建安装验证目录,例如:`~/tdengine-test`,下载 GitHub 上 [nodejsChecker.js 源代码](https://github.com/taosdata/TDengine/tree/develop/examples/nodejs/nodejsChecker.js)到本地。 - 新建安装验证目录,例如:`~/tdengine-test`,下载 GitHub 上 [nodejsChecker.js 源代码](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/nodejsChecker.js)到本地。
- 在命令行中执行以下命令。 - 在命令行中执行以下命令。
```bash ```bash
npm init -y npm init -y
npm install td2.0-connector npm install @tdengine/client
node nodejsChecker.js host=localhost node nodejsChecker.js host=localhost
``` ```
@ -128,11 +128,11 @@ node nodejsChecker.js host=localhost
<Tabs defaultValue="native"> <Tabs defaultValue="native">
<TabItem value="native" label="原生连接"> <TabItem value="native" label="原生连接">
安装并引用 `td2.0-connector` 包。 安装并引用 `@tdengine/client` 包。
```javascript ```javascript
//A cursor also needs to be initialized in order to interact with TDengine from Node.js. //A cursor also needs to be initialized in order to interact with TDengine from Node.js.
const taos = require("td2.0-connector"); const taos = require("@tdengine/client");
var conn = taos.connect({ var conn = taos.connect({
host: "127.0.0.1", host: "127.0.0.1",
user: "root", user: "root",
@ -149,12 +149,12 @@ conn.close();
</TabItem> </TabItem>
<TabItem value="rest" label="REST 连接"> <TabItem value="rest" label="REST 连接">
安装并引用 `td2.0-rest-connector` 包。 安装并引用 `@tdengine/rest` 包。
```javascript ```javascript
//A cursor also needs to be initialized in order to interact with TDengine from Node.js. //A cursor also needs to be initialized in order to interact with TDengine from Node.js.
import { options, connect } from "td2.0-rest-connector"; import { options, connect } from "@tdengine/rest";
options.path = "/rest/sqlt"; options.path = "/rest/sql";
// set host // set host
options.host = "localhost"; options.host = "localhost";
// set other options like user/passwd // set other options like user/passwd
@ -190,26 +190,23 @@ let cursor = conn.cursor();
<NodeQuery /> <NodeQuery />
## 更多示例程序 ## 更多示例程序
| 示例程序 | 示例程序描述 | | 示例程序 | 示例程序描述 |
| ------------------------------------------------------------------------------------------------------------------------------------------ | -------------------------------------- | | ------------------------------------------------------------------------------------------------------------------------------------------ | -------------------------------------- |
| [connection](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/cursorClose.js) | 建立连接的示例。 | | [basicUse](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/queryExample.js) | 基本的使用如如建立连接,执行 SQL 等操作。 |
| [stmtBindBatch](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/stmtBindParamBatchSample.js) | 绑定多行参数插入的示例。 | | [stmtBindBatch](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/bindParamBatch.js) | 绑定多行参数插入的示例。 | |
| [stmtBind](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/stmtBindParamSample.js) | 一行一行绑定参数插入的示例。 | | [stmtBindSingleParamBatch](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/bindSingleParamBatch.js) | 按列绑定参数插入的示例。 |
| [stmtBindSingleParamBatch](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/stmtBindSingleParamBatchSample.js) | 按列绑定参数插入的示例。 | | [stmtQuery](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/stmtQuery.js) | 绑定参数查询的示例。 |
| [stmtUseResult](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/stmtUseResultSample.js) | 绑定参数查询的示例。 | | [schemless insert](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/schemaless.js) | schemless 插入的示例。 |
| [json tag](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/testJsonTag.js) | Json tag 的使用示例。 | | [TMQ](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/tmq.js) | 订阅的使用示例。 |
| [Nanosecond](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/testNanoseconds.js) | 时间戳为纳秒精度的使用的示例。 | | [asyncQuery](https://github.com/taosdata/taos-connector-node/blob/3.0/nodejs/examples/asyncQueryExample.js) | 异步查询的使用示例。 |
| [Microsecond](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/testMicroseconds.js) | 时间戳为微秒精度的使用的示例。 | | [REST](https://github.com/taosdata/taos-connector-node/blob/3.0/typescript-rest/example/example.ts) | 使用 REST 连接的 TypeScript 使用示例。 |
| [schemless insert](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/testSchemalessInsert.js) | schemless 插入的示例。 |
| [subscribe](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/testSubscribe.js) | 订阅的使用示例。 |
| [asyncQuery](https://github.com/taosdata/taos-connector-node/tree/develop/nodejs/examples/tset.js) | 异步查询的使用示例。 |
| [REST](https://github.com/taosdata/taos-connector-node/blob/develop/typescript-rest/example/example.ts) | 使用 REST 连接的 TypeScript 使用示例。 |
## 使用限制 ## 使用限制
Node.js 连接器 >= v2.0.6 目前支持 node 的版本为:支持 >=v12.8.0 <= v12.9.1 || >=v10.20.0 <= v10.9.0 2.0.5 及更早版本支持 v10.x 版本,其他版本可能存在包兼容性的问题。 native 连接器(`@tdengine/client` >= v3.0.0 目前支持 node 的版本为:支持 >=v12.8.0 <= v12.9.1 || >=v10.20.0 <= v10.9.0 2.0.5 及更早版本支持 v10.x 版本,其他版本可能存在包兼容性的问题。
## 其他说明 ## 其他说明
@ -225,7 +222,7 @@ Node.js 连接器的使用参见[视频教程](https://www.taosdata.com/blog/202
2. Node.js 版本 2. Node.js 版本
连接器 >v2.0.6 目前兼容的 Node.js 版本为:>=v10.20.0 <= v10.9.0 || >=v12.8.0 <= v12.9.1 原生连接器 `@tdengine/client` 目前兼容的 Node.js 版本为:>=v10.20.0 <= v10.9.0 || >=v12.8.0 <= v12.9.1
3. "Unable to establish connection""Unable to resolve FQDN" 3. "Unable to establish connection""Unable to resolve FQDN"
@ -235,17 +232,21 @@ Node.js 连接器的使用参见[视频教程](https://www.taosdata.com/blog/202
### 原生连接器 ### 原生连接器
| td2.0-connector 版本 | 说明 | | package name | version | TDengine version | 说明 |
| -------------------- | ---------------------------------------------------------------- | |------------------|---------|---------------------|------------------------------------------------------------------|
| 2.0.12 | 修复 cursor.close() 报错的 bug。 | | @tdengine/client | 3.0.0 | 3.0.0 | 支持TDengine 3.0 且不与2.x 兼容。 |
| 2.0.11 | 支持绑定参数、json tag、schemaless 接口等功能。 | | td2.0-connector | 2.0.12 | 2.4.x2.5.x2.6.x | 修复 cursor.close() 报错的 bug。 |
| 2.0.10 | 支持连接管理,普通查询、连续查询、获取系统信息、订阅功能等功能。 | | td2.0-connector | 2.0.11 | 2.4.x2.5.x2.6.x | 支持绑定参数、json tag、schemaless 接口等功能。 |
| td2.0-connector | 2.0.10 | 2.4.x2.5.x2.6.x | 支持连接管理,普通查询、连续查询、获取系统信息、订阅功能等功能。 |
### REST 连接器 ### REST 连接器
| td2.0-rest-connector 版本 | 说明 | | package name | version | TDengine version | 说明 |
| ------------------------- | ---------------------------------------------------------------- | |----------------------|---------|---------------------|---------------------------------------------------------------------------|
| 1.0.3 | 支持连接管理、普通查询、获取系统信息、错误信息、连续查询等功能。 | | @tdengine/rest | 3.0.0 | 3.0.0 | 支持 TDegnine 3.0且不与2.x 兼容。 |
| td2.0-rest-connector | 1.0.7 | 2.4.x2.5.x2.6.x | 移除默认端口 6041。 |
| td2.0-rest-connector | 1.0.6 | 2.4.x2.5.x2.6.x | 修复createinsertupdatealter 等SQL 执行返回的 affectRows 错误的bug。 |
| td2.0-rest-connector | 1.0.5 | 2.4.x2.5.x2.6.x | 支持云服务 cloud Token |
| td2.0-rest-connector | 1.0.3 | 2.4.x2.5.x2.6.x | 支持连接管理、普通查询、获取系统信息、错误信息、连续查询等功能。 |
## API 参考 ## API 参考

View File

@ -2657,6 +2657,34 @@ typedef struct {
SEpSet epSet; SEpSet epSet;
} SVgEpSet; } SVgEpSet;
typedef struct {
int64_t refId;
int64_t suid;
int8_t level;
} SRSmaFetchMsg;
static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->refId) < 0) return -1;
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeI8(pCoder, pReq->level) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg* pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->refId) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->level) < 0) return -1;
tEndDecode(pCoder);
return 0;
}
typedef struct { typedef struct {
int8_t version; // for compatibility(default 0) int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX int8_t intervalUnit; // MACRO: TIME_UNIT_XXX

View File

@ -202,7 +202,7 @@ bool fmIsForbidStreamFunc(int32_t funcId);
bool fmIsIntervalInterpoFunc(int32_t funcId); bool fmIsIntervalInterpoFunc(int32_t funcId);
bool fmIsInterpFunc(int32_t funcId); bool fmIsInterpFunc(int32_t funcId);
bool fmIsLastRowFunc(int32_t funcId); bool fmIsLastRowFunc(int32_t funcId);
bool fmIsReturnNotNullFunc(int32_t funcId); bool fmIsNotNullOutputFunc(int32_t funcId);
bool fmIsSelectValueFunc(int32_t funcId); bool fmIsSelectValueFunc(int32_t funcId);
bool fmIsSystemInfoFunc(int32_t funcId); bool fmIsSystemInfoFunc(int32_t funcId);
bool fmIsImplicitTsFunc(int32_t funcId); bool fmIsImplicitTsFunc(int32_t funcId);

View File

@ -25,9 +25,9 @@ extern "C" {
// reference counting // reference counting
typedef void (*_ref_fn_t)(const void *pObj); typedef void (*_ref_fn_t)(const void *pObj);
#define T_REF_DECLARE() \ #define T_REF_DECLARE() \
struct { \ struct { \
int32_t val; \ volatile int32_t val; \
} _ref; } _ref;
#define T_REF_REGISTER_FUNC(s, e) \ #define T_REF_REGISTER_FUNC(s, e) \

View File

@ -354,11 +354,19 @@ static const SSysTableMeta perfsMeta[] = {
{TSDB_PERFS_TABLE_APPS, appSchema, tListLen(appSchema)}}; {TSDB_PERFS_TABLE_APPS, appSchema, tListLen(appSchema)}};
void getInfosDbMeta(const SSysTableMeta** pInfosTableMeta, size_t* size) { void getInfosDbMeta(const SSysTableMeta** pInfosTableMeta, size_t* size) {
*pInfosTableMeta = infosMeta; if (pInfosTableMeta) {
*size = tListLen(infosMeta); *pInfosTableMeta = infosMeta;
}
if (size) {
*size = tListLen(infosMeta);
}
} }
void getPerfDbMeta(const SSysTableMeta** pPerfsTableMeta, size_t* size) { void getPerfDbMeta(const SSysTableMeta** pPerfsTableMeta, size_t* size) {
*pPerfsTableMeta = perfsMeta; if (pPerfsTableMeta) {
*size = tListLen(perfsMeta); *pPerfsTableMeta = perfsMeta;
}
if (size) {
*size = tListLen(perfsMeta);
}
} }

View File

@ -1691,13 +1691,17 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
if (!pShow->sysDbRsp) { if (!pShow->sysDbRsp) {
SDbObj infoschemaDb = {0}; SDbObj infoschemaDb = {0};
setInformationSchemaDbCfg(&infoschemaDb); setInformationSchemaDbCfg(&infoschemaDb);
dumpDbInfoData(pBlock, &infoschemaDb, pShow, numOfRows, 14, true, 0, 1); size_t numOfTables = 0;
getInfosDbMeta(NULL, &numOfTables);
dumpDbInfoData(pBlock, &infoschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
numOfRows += 1; numOfRows += 1;
SDbObj perfschemaDb = {0}; SDbObj perfschemaDb = {0};
setPerfSchemaDbCfg(&perfschemaDb); setPerfSchemaDbCfg(&perfschemaDb);
dumpDbInfoData(pBlock, &perfschemaDb, pShow, numOfRows, 3, true, 0, 1); numOfTables = 0;
getPerfDbMeta(NULL, &numOfTables);
dumpDbInfoData(pBlock, &perfschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
numOfRows += 1; numOfRows += 1;
pShow->sysDbRsp = true; pShow->sysDbRsp = true;

View File

@ -489,7 +489,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
ASSERT(smaObj.uid != 0); ASSERT(smaObj.uid != 0);
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0}; char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name); snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb",pCreate->name);
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN); memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN); smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
smaObj.stbUid = pStb->uid; smaObj.stbUid = pStb->uid;
@ -603,6 +603,9 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER; if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
mDebug("mndSma: create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d",
pCreate->name, smaObj.uid, smaObj.stbUid, smaObj.dstTbUid, smaObj.dstTbName, smaObj.dstVgId);
code = 0; code = 0;
_OVER: _OVER:

View File

@ -115,24 +115,29 @@ struct SSmaStat {
#define RSMA_FS_LOCK(r) (&(r)->lock) #define RSMA_FS_LOCK(r) (&(r)->lock)
struct SRSmaInfoItem { struct SRSmaInfoItem {
void *taskInfo; // qTaskInfo_t
int64_t refId;
tmr_h tmrId;
int32_t maxDelay;
int8_t level; int8_t level;
int8_t triggerStat; int8_t triggerStat;
int32_t maxDelay;
tmr_h tmrId;
}; };
struct SRSmaInfo { struct SRSmaInfo {
STSchema *pTSchema; STSchema *pTSchema;
int64_t suid; int64_t suid;
int64_t refId; // refId of SRSmaStat
int8_t delFlag; int8_t delFlag;
T_REF_DECLARE() T_REF_DECLARE()
SRSmaInfoItem items[TSDB_RETENTION_L2]; SRSmaInfoItem items[TSDB_RETENTION_L2];
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable
}; };
#define RSMA_INFO_HEAD_LEN 24
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1) #define RSMA_INFO_HEAD_LEN 32
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1) #define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i])
#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i])
#define RSMA_INFO_ITEM(r, i) (&(r)->items[i])
enum { enum {
TASK_TRIGGER_STAT_INIT = 0, TASK_TRIGGER_STAT_INIT = 0,
@ -168,8 +173,8 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo);
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln); void *tdAcquireSmaRef(int32_t rsetId, int64_t refId);
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln); int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId);
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType); int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
@ -223,12 +228,11 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc); int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc);
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);

View File

@ -381,6 +381,8 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST; terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
metaReaderClear(&mr); metaReaderClear(&mr);
return -1; return -1;
} else if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
terrno = TSDB_CODE_SUCCESS;
} }
metaReaderClear(&mr); metaReaderClear(&mr);

View File

@ -308,12 +308,12 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
* @return int32_t * @return int32_t
*/ */
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) { if (!pEnv) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
// step 1: set rsma stat // step 1: set rsma stat
@ -337,18 +337,26 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
} }
// step 3: swap rsmaInfoHash and iRsmaInfoHash // step 3: swap rsmaInfoHash and iRsmaInfoHash
ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat)); // lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT(RSMA_INFO_HASH(pRSmaStat)); ASSERT(RSMA_INFO_HASH(pRSmaStat));
ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat));
RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat); RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat);
RSMA_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat) =
taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!RSMA_INFO_HASH(pRSmaStat)) { if (!RSMA_INFO_HASH(pRSmaStat)) {
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr()); smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// step 4: others // step 4: others
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
@ -383,26 +391,52 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
* @return int32_t * @return int32_t
*/ */
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) { if (!pEnv) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
// step 1: merge rsmaInfoHash and iRsmaInfoHash // step 1: merge rsmaInfoHash and iRsmaInfoHash
taosWLockLatch(SMA_ENV_LOCK(pSmaEnv)); // lock
taosWLockLatch(SMA_ENV_LOCK(pEnv));
#if 0
if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) { if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) {
// TODO: optimization - just switch the hash pointer if rsmaInfoHash is empty // just switch the hash pointer if rsmaInfoHash is empty
} if (taosHashGetSize(RSMA_IMU_INFO_HASH(pRSmaStat)) > 0) {
SHashObj *infoHash = RSMA_INFO_HASH(pRSmaStat);
RSMA_INFO_HASH(pRSmaStat) = RSMA_IMU_INFO_HASH(pRSmaStat);
RSMA_IMU_INFO_HASH(pRSmaStat) = infoHash;
}
} else {
#endif
#if 1
void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL); void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL);
while (pIter) { while (pIter) {
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) { if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
if (refVal == 0) {
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
smaDebug(
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
"table:%" PRIi64,
SMA_VID(pSma), *pSuid);
} else {
smaDebug(
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
"table:%" PRIi64,
SMA_VID(pSma), refVal, *pSuid);
}
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
continue;
}
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma),
*pSuid); *pSuid);
@ -416,11 +450,14 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter);
} }
#endif
// }
taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat)); taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat));
RSMA_IMU_INFO_HASH(pRSmaStat) = NULL; RSMA_IMU_INFO_HASH(pRSmaStat) = NULL;
taosWUnLockLatch(SMA_ENV_LOCK(pSmaEnv)); // unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// step 2: cleanup outdated qtaskinfo files // step 2: cleanup outdated qtaskinfo files
tdCleanupQTaskInfoFiles(pSma, pRSmaStat); tdCleanupQTaskInfoFiles(pSma, pRSmaStat);

View File

@ -17,7 +17,7 @@
typedef struct SSmaStat SSmaStat; typedef struct SSmaStat SSmaStat;
#define SMA_MGMT_REF_NUM 10240 #define SMA_MGMT_REF_NUM 10240
extern SSmaMgmt smaMgmt; extern SSmaMgmt smaMgmt;
@ -183,9 +183,6 @@ int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
int ref = T_REF_DEC(pRSmaInfo); int ref = T_REF_DEC(pRSmaInfo);
smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
if (ref == 0) {
tdRemoveRSmaInfoBySuid(pSma, pRSmaInfo->suid);
}
return 0; return 0;
} }

View File

@ -32,12 +32,14 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *ui
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
int8_t idx); int8_t idx);
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem, static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid,
STSchema *pTSchema, tb_uid_t suid, int8_t level); int8_t level);
static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid); static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
int8_t blkType);
static void tdRSmaFetchTrigger(void *param, void *tmrId); static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid,
SRSmaStat *pStat, int8_t blkType);
static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
@ -115,17 +117,26 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
if (pInfo) { if (pInfo) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i]; SRSmaInfoItem *pItem = &pInfo->items[i];
if (pItem->taskInfo) {
if (isDeepFree && pItem->tmrId) { if (isDeepFree && pItem->tmrId) {
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid, smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid,
pItem->tmrId, i + 1); pItem->tmrId, i + 1);
taosTmrStopA(&pItem->tmrId); taosTmrStopA(&pItem->tmrId);
} }
tdFreeQTaskInfo(&pItem->taskInfo, SMA_VID(pSma), i + 1);
if (isDeepFree && pInfo->taskInfo[i]) {
tdFreeQTaskInfo(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
} else { } else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
pInfo->suid, i + 1); pInfo->suid, i + 1);
} }
if (pInfo->iTaskInfo[i]) {
tdFreeQTaskInfo(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1);
} else {
smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo",
SMA_VID(pSma), pInfo->suid, i + 1);
}
} }
if (isDeepFree) { if (isDeepFree) {
taosMemoryFreeClear(pInfo->pTSchema); taosMemoryFreeClear(pInfo->pTSchema);
@ -155,7 +166,12 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
pRSmaInfo = tdGetRSmaInfoBySuid(pSma, *suid); if (!taosArrayGetSize(tbUids)) {
smaDebug("vgId:%d, no need to update tbUidList for suid:%" PRIi64 " since Empty tbUids", SMA_VID(pSma), *suid);
return TSDB_CODE_SUCCESS;
}
pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, *suid);
if (!pRSmaInfo) { if (!pRSmaInfo) {
smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid); smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
@ -163,26 +179,21 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (pRSmaInfo->items[0].taskInfo) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if ((qUpdateQualifiedTableId(pRSmaInfo->items[0].taskInfo, tbUids, true) < 0)) { if (pRSmaInfo->taskInfo[i]) {
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr()); if ((qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true) < 0)) {
return TSDB_CODE_FAILED; tdReleaseRSmaInfo(pSma, pRSmaInfo);
} else { smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma), terrstr());
pRSmaInfo->items[0].taskInfo, *suid, *(int64_t *)taosArrayGet(tbUids, 0)); return TSDB_CODE_FAILED;
} } else {
} smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 " uid:%" PRIi64 " level %d",
SMA_VID(pSma), pRSmaInfo->taskInfo[0], *suid, *(int64_t *)taosArrayGet(tbUids, 0), i);
if (pRSmaInfo->items[1].taskInfo) { }
if ((qUpdateQualifiedTableId(pRSmaInfo->items[1].taskInfo, tbUids, true) < 0)) {
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr());
return TSDB_CODE_FAILED;
} else {
smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma),
pRSmaInfo->items[1].taskInfo, *suid, *(int64_t *)taosArrayGet(tbUids, 0));
} }
} }
tdReleaseRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -267,13 +278,12 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
.initTqReader = 1, .initTqReader = 1,
}; };
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
pItem->refId = RSMA_REF_ID(pStat); if (!pRSmaInfo->taskInfo[idx]) {
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
if (!pItem->taskInfo) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
pItem->triggerStat = TASK_TRIGGER_STAT_INACTIVE; pItem->triggerStat = TASK_TRIGGER_STAT_INACTIVE;
if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval = int64_t msInterval =
@ -342,6 +352,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
} }
pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->pTSchema = pTSchema;
pRSmaInfo->suid = suid; pRSmaInfo->suid = suid;
pRSmaInfo->refId = RSMA_REF_ID(pStat);
T_REF_INIT_VAL(pRSmaInfo, 1); T_REF_INIT_VAL(pRSmaInfo, 1);
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) { if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
@ -411,7 +422,7 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, pReq->suid); SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pReq->suid);
if (!pRSmaInfo) { if (!pRSmaInfo) {
smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name, smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name,
@ -423,8 +434,10 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
RSMA_INFO_SET_DEL(pRSmaInfo); RSMA_INFO_SET_DEL(pRSmaInfo);
tdUnRefRSmaInfo(pSma, pRSmaInfo); tdUnRefRSmaInfo(pSma, pRSmaInfo);
// save to file tdReleaseRSmaInfo(pSma, pRSmaInfo);
// save to file
// TODO
smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid); smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -567,8 +580,32 @@ static void tdDestroySDataBlockArray(SArray *pArray) {
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
} }
static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, /**
int8_t blkType) { * @brief retention of rsma1/rsma2
*
* @param pSma
* @param now
* @return int32_t
*/
int32_t smaDoRetention(SSma *pSma, int64_t now) {
int32_t code = TSDB_CODE_SUCCESS;
if (VND_IS_RSMA(pSma->pVnode)) {
return code;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) {
code = tsdbDoRetention(pSma->pRSmaTsdb[i], now);
if (code) goto _end;
}
}
_end:
return code;
}
static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid,
SRSmaStat *pStat, int8_t blkType) {
SArray *pResult = NULL; SArray *pResult = NULL;
SSma *pSma = pStat->pSma; SSma *pSma = pStat->pSma;
@ -576,7 +613,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
SSDataBlock *output = NULL; SSDataBlock *output = NULL;
uint64_t ts; uint64_t ts;
int32_t code = qExecTask(pItem->taskInfo, &output, &ts); int32_t code = qExecTask(taskInfo, &output, &ts);
if (code < 0) { if (code < 0) {
smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid,
pItem->level, terrstr(code)); pItem->level, terrstr(code));
@ -637,29 +674,32 @@ _err:
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid,
STSchema *pTSchema, tb_uid_t suid, int8_t level) { int8_t level) {
if (!pItem || !pItem->taskInfo) { int32_t idx = level - 1;
if (!pInfo || !RSMA_INFO_QTASK(pInfo, idx)) {
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!pTSchema) { if (!pInfo->pTSchema) {
smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
pItem->taskInfo, suid); RSMA_INFO_QTASK(pInfo, idx), suid);
if (qSetMultiStreamInput(pItem->taskInfo, pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT if (qSetMultiStreamInput(RSMA_INFO_QTASK(pInfo, idx), pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat); SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat);
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
tdRSmaFetchAndSubmitResult(pItem, pTSchema, suid, pStat, STREAM_INPUT__DATA_SUBMIT); tdRSmaFetchAndSubmitResult(RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid, pStat,
STREAM_INPUT__DATA_SUBMIT);
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
if (smaMgmt.tmrHandle) { if (smaMgmt.tmrHandle) {
@ -678,7 +718,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
* @param suid * @param suid
* @return SRSmaInfo* * @return SRSmaInfo*
*/ */
static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) { static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL; SRSmaStat *pStat = NULL;
SRSmaInfo *pRSmaInfo = NULL; SRSmaInfo *pRSmaInfo = NULL;
@ -692,94 +732,86 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) {
return NULL; return NULL;
} }
taosRLockLatch(SMA_ENV_LOCK(pEnv));
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
tdRefRSmaInfo(pSma, pRSmaInfo);
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return pRSmaInfo; return pRSmaInfo;
} }
if (RSMA_COMMIT_STAT(pStat) == 0) { if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL; return NULL;
} }
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
// clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat // clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat
SRSmaInfo *pCowRSmaInfo = NULL; SRSmaInfo *pCowRSmaInfo = NULL;
// lock // lock
taosWLockLatch(SMA_ENV_LOCK(pEnv)); taosWLockLatch(SMA_ENV_LOCK(pEnv));
if (!taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t))) { // 2-phase lock if (!(pCowRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)))) { // 2-phase lock
void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (iRSmaInfo) { if (iRSmaInfo) {
SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo; SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo;
if (pIRSmaInfo) { if (pIRSmaInfo && !RSMA_INFO_IS_DEL(pIRSmaInfo)) {
if (tdCloneRSmaInfo(pSma, pCowRSmaInfo, pIRSmaInfo) < 0) { if (tdCloneRSmaInfo(pSma, &pCowRSmaInfo, pIRSmaInfo) < 0) {
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr()); smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr());
return NULL; return NULL;
} }
smaDebug("vgId:%d, clone rsma info succeed for suid:%" PRIu64, SMA_VID(pSma), suid); smaDebug("vgId:%d, clone rsma info succeed for suid:%" PRIu64, SMA_VID(pSma), suid);
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) { if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) {
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr());
return NULL; return NULL;
} }
} }
} }
} else {
pCowRSmaInfo = *(SRSmaInfo **)pCowRSmaInfo;
ASSERT(!pCowRSmaInfo);
}
if(pCowRSmaInfo) {
tdRefRSmaInfo(pSma, pCowRSmaInfo);
} }
// unlock // unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
return pCowRSmaInfo; return pCowRSmaInfo;
} }
/** static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
* @brief During the drop procedure, only need to delete the object in rsmaInfoHash. if (pInfo) {
* tdUnRefRSmaInfo(pSma, pInfo);
* @param pSma
* @param suid
* @return SRSmaInfo*
*/
void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = NULL;
SRSmaInfo *pRSmaInfo = NULL;
if (!pEnv) {
return;
}
pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
if (!pStat || !RSMA_INFO_HASH(pStat)) {
return;
}
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (pRSmaInfo) {
if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
}
taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
smaDebug("vgId:%d, remove from infoHash for table:%" PRIu64 " succeed", SMA_VID(pSma), suid);
} }
} }
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, suid); SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
if (!pRSmaInfo) { if (!pRSmaInfo) {
smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!pRSmaInfo->items[0].taskInfo) { if (!RSMA_INFO_QTASK(pRSmaInfo, 0)) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaDebug("vgId:%d, execute rsma, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid); smaDebug("vgId:%d, execute rsma, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (inputType == STREAM_INPUT__DATA_SUBMIT) { if (inputType == STREAM_INPUT__DATA_SUBMIT) {
tdRefRSmaInfo(pSma, pRSmaInfo); tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L1);
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L2);
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L1);
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L2);
tdUnRefRSmaInfo(pSma, pRSmaInfo);
} }
tdReleaseRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -990,26 +1022,28 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *
SRSmaInfo *pRSmaInfo = NULL; SRSmaInfo *pRSmaInfo = NULL;
void *qTaskInfo = NULL; void *qTaskInfo = NULL;
pRSmaInfo = tdGetRSmaInfoBySuid(pSma, pItem->suid); pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pItem->suid);
if (!pRSmaInfo) { if (!pRSmaInfo) {
smaDebug("vgId:%d, no restore as no rsma info for table:%" PRIu64, SMA_VID(pSma), pItem->suid); smaDebug("vgId:%d, no restore as no rsma info for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pItem->type == TSDB_RETENTION_L1) { if (pItem->type == TSDB_RETENTION_L1) {
qTaskInfo = pRSmaInfo->items[0].taskInfo; qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 0);
} else if (pItem->type == TSDB_RETENTION_L2) { } else if (pItem->type == TSDB_RETENTION_L2) {
qTaskInfo = pRSmaInfo->items[1].taskInfo; qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 1);
} else { } else {
ASSERT(0); ASSERT(0);
} }
if (!qTaskInfo) { if (!qTaskInfo) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for table:%" PRIu64, SMA_VID(pSma), pItem->suid); smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) { if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid, smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid,
pItem->type, terrstr()); pItem->type, terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
@ -1017,6 +1051,7 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *
smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid, smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid,
pItem->type); pItem->type);
tdReleaseRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1195,8 +1230,14 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
while (infoHash) { while (infoHash) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
infoHash = taosHashIterate(pInfoHash, infoHash);
continue;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
qTaskInfo_t taskInfo = pRSmaInfo->items[i].taskInfo; qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i);
if (!taskInfo) { if (!taskInfo) {
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
continue; continue;
@ -1290,11 +1331,17 @@ _err:
static void tdRSmaFetchTrigger(void *param, void *tmrId) { static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaInfoItem *pItem = param; SRSmaInfoItem *pItem = param;
SSma *pSma = NULL; SSma *pSma = NULL;
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem);
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
return;
}
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
if (!pStat) { if (!pStat) {
smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
pItem->refId); pRSmaInfo->refId);
return; return;
} }
@ -1305,10 +1352,10 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
switch (rsmaTriggerStat) { switch (rsmaTriggerStat) {
case TASK_TRIGGER_STAT_PAUSED: case TASK_TRIGGER_STAT_PAUSED:
case TASK_TRIGGER_STAT_CANCELLED: { case TASK_TRIGGER_STAT_CANCELLED: {
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64
" refId:%d", " refId:%d",
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId); SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId);
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay > 5000 ? 5000 : pItem->maxDelay, pItem, smaMgmt.tmrHandle, taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay > 5000 ? 5000 : pItem->maxDelay, pItem, smaMgmt.tmrHandle,
&pItem->tmrId); &pItem->tmrId);
@ -1319,11 +1366,6 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
break; break;
} }
SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem);
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
goto _end;
}
int8_t fetchTriggerStat = int8_t fetchTriggerStat =
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
switch (fetchTriggerStat) { switch (fetchTriggerStat) {
@ -1332,16 +1374,14 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
pItem->level, pRSmaInfo->suid); pItem->level, pRSmaInfo->suid);
// sync procedure => async process // sync procedure => async process
tdRefRSmaInfo(pSma, pRSmaInfo);
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
qSetMultiStreamInput(pItem->taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK); qTaskInfo_t taskInfo = pRSmaInfo->taskInfo[pItem->level - 1];
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK); qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK);
tdCleanupStreamInputDataBlock(pItem->taskInfo); tdRSmaFetchAndSubmitResult(taskInfo, pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat,
STREAM_INPUT__DATA_BLOCK);
tdCleanupStreamInputDataBlock(taskInfo);
tdUnRefRSmaInfo(pSma, pRSmaInfo);
// atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
// taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
} break; } break;
case TASK_TRIGGER_STAT_PAUSED: { case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused", smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
@ -1362,22 +1402,5 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
} }
_end: _end:
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
}
int32_t smaDoRetention(SSma *pSma, int64_t now) {
int32_t code = TSDB_CODE_SUCCESS;
if (VND_IS_RSMA(pSma->pVnode)) {
return code;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) {
code = tsdbDoRetention(pSma->pRSmaTsdb[i], now);
if (code) goto _end;
}
}
_end:
return code;
} }

View File

@ -116,8 +116,10 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
} }
// create stable to save tsma result in dstVgId // create stable to save tsma result in dstVgId
SName stbFullName = {0};
tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
SVCreateStbReq pReq = {0}; SVCreateStbReq pReq = {0};
pReq.name = pCfg->dstTbName; pReq.name = (char*)tNameGetTableName(&stbFullName);
pReq.suid = pCfg->dstTbUid; pReq.suid = pCfg->dstTbUid;
pReq.schemaRow = pCfg->schemaRow; pReq.schemaRow = pCfg->schemaRow;
pReq.schemaTag = pCfg->schemaTag; pReq.schemaTag = pCfg->schemaTag;
@ -125,6 +127,12 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) { if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) {
return -1; return -1;
} }
smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64
" dstTb:%s dstVg:%d",
SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId);
} else {
ASSERT(0);
} }
return 0; return 0;

View File

@ -287,22 +287,22 @@ int32_t tdRemoveTFile(STFile *pTFile) {
} }
// smaXXXUtil ================ // smaXXXUtil ================
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) { void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) {
void *pResult = taosAcquireRef(rsetId, refId); void *pResult = taosAcquireRef(rsetId, refId);
if (!pResult) { if (!pResult) {
smaWarn("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr()); smaWarn("rsma acquire ref for rsetId:%" PRIi64 " refId:%d failed since %s", rsetId, refId, terrstr());
} else { } else {
smaDebug("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId); smaDebug("rsma acquire ref for rsetId:%" PRIi64 " refId:%d success", rsetId, refId);
} }
return pResult; return pResult;
} }
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) { int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) {
if (taosReleaseRef(rsetId, refId) < 0) { if (taosReleaseRef(rsetId, refId) < 0) {
smaWarn("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr()); smaWarn("rsma release ref for rsetId:%" PRIi64 " refId:%d failed since %s", rsetId, refId, terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
smaDebug("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId); smaDebug("rsma release ref for rsetId:%" PRIi64 " refId:%d success", rsetId, refId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -313,7 +313,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
char *pOutput = NULL; char *pOutput = NULL;
int32_t len = 0; int32_t len = 0;
if (qSerializeTaskStatus(srcTaskInfo, &pOutput, &len) < 0) { if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) {
smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid, smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid,
terrstr()); terrstr());
goto _err; goto _err;
@ -337,13 +337,15 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
goto _err; goto _err;
} }
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid); smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid);
taosMemoryFreeClear(pOutput); taosMemoryFreeClear(pOutput);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
taosMemoryFreeClear(pOutput); taosMemoryFreeClear(pOutput);
tdFreeQTaskInfo(dstTaskInfo, TD_VID(pVnode), idx + 1); tdFreeQTaskInfo(dstTaskInfo, TD_VID(pVnode), idx + 1);
smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
@ -355,23 +357,14 @@ _err:
* @param pSrc * @param pSrc
* @return int32_t * @return int32_t
*/ */
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc) { int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc) {
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
SRSmaParam *param = NULL; SRSmaParam *param = NULL;
if (!pSrc) { if (!pSrc) {
*pDest = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!pDest) {
pDest = taosMemoryCalloc(1, sizeof(SRSmaInfo));
if (!pDest) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
}
memcpy(pDest, pSrc, sizeof(SRSmaInfo));
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0); metaReaderInit(&mr, SMA_META(pSma), 0);
smaDebug("vgId:%d, rsma clone, suid is %" PRIi64, TD_VID(pVnode), pSrc->suid); smaDebug("vgId:%d, rsma clone, suid is %" PRIi64, TD_VID(pVnode), pSrc->suid);
@ -384,21 +377,22 @@ int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc) {
ASSERT(mr.me.uid == pSrc->suid); ASSERT(mr.me.uid == pSrc->suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) { if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam; param = &mr.me.stbEntry.rsmaParam;
for (int i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pSrc->items[i]; if (tdCloneQTaskInfo(pSma, pSrc->iTaskInfo[i], pSrc->taskInfo[i], param, pSrc->suid, i) < 0) {
if (pItem->taskInfo) { goto _err;
tdCloneQTaskInfo(pSma, pDest->items[i].taskInfo, pItem->taskInfo, param, pSrc->suid, i);
} }
} }
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid); smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid);
} }
metaReaderClear(&mr); metaReaderClear(&mr);
*pDest = pSrc; // pointer copy
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
*pDest = NULL;
metaReaderClear(&mr); metaReaderClear(&mr);
tdFreeRSmaInfo(pSma, pDest, false); smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
// ...

View File

@ -351,7 +351,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
// TODO: remove the function // TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// TODO // TODO
blockDebugShowDataBlocks(data, __func__); blockDebugShowDataBlocks(data, __func__);
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
} }
@ -852,6 +851,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) { if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
submitBlkRsp.code = terrno; submitBlkRsp.code = terrno;
pRsp->code = terrno;
tDecoderClear(&decoder); tDecoderClear(&decoder);
taosArrayDestroy(createTbReq.ctb.tagName); taosArrayDestroy(createTbReq.ctb.tagName);
goto _exit; goto _exit;

View File

@ -1449,7 +1449,7 @@ static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t nu
pRow->numOfRows = pResInfo->numOfRes; pRow->numOfRows = pResInfo->numOfRes;
} }
if (fmIsReturnNotNullFunc(pCtx[j].functionId)) { if (fmIsNotNullOutputFunc(pCtx[j].functionId)) {
returnNotNull = true; returnNotNull = true;
} }
} }

View File

@ -221,7 +221,7 @@ bool fmIsLastRowFunc(int32_t funcId) {
return FUNCTION_TYPE_LAST_ROW == funcMgtBuiltins[funcId].type; return FUNCTION_TYPE_LAST_ROW == funcMgtBuiltins[funcId].type;
} }
bool fmIsReturnNotNullFunc(int32_t funcId) { bool fmIsNotNullOutputFunc(int32_t funcId) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) { if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false; return false;
} }

View File

@ -84,10 +84,10 @@ class TDSql:
self.queryResult = None self.queryResult = None
tdLog.info("sql:%s, expect error occured" % (sql)) tdLog.info("sql:%s, expect error occured" % (sql))
def query(self, sql, row_tag=None,queyTimes=10): def query(self, sql, row_tag=None,queryTimes=10):
self.sql = sql self.sql = sql
i=1 i=1
while i <= queyTimes: while i <= queryTimes:
try: try:
self.cursor.execute(sql) self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall() self.queryResult = self.cursor.fetchall()
@ -97,26 +97,15 @@ class TDSql:
return self.queryResult return self.queryResult
return self.queryRows return self.queryRows
except Exception as e: except Exception as e:
i+=1
tdLog.notice("Try to query again, query times: %d "%i) tdLog.notice("Try to query again, query times: %d "%i)
if i == queryTimes:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
i+=1
time.sleep(1) time.sleep(1)
pass pass
else:
try:
tdLog.notice("Try the last query ")
self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self.cursor.description)
if row_tag:
return self.queryResult
return self.queryRows
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
traceback.print_exc()
raise Exception(repr(e))
def is_err_sql(self, sql): def is_err_sql(self, sql):
@ -305,28 +294,23 @@ class TDSql:
time.sleep(1) time.sleep(1)
continue continue
def execute(self, sql,queyTimes=10): def execute(self, sql,queryTimes=10):
self.sql = sql self.sql = sql
i=1 i=1
while i <= queyTimes: while i <= queryTimes:
try: try:
self.affectedRows = self.cursor.execute(sql) self.affectedRows = self.cursor.execute(sql)
return self.affectedRows return self.affectedRows
except Exception as e: except Exception as e:
i+=1
tdLog.notice("Try to execute sql again, query times: %d "%i) tdLog.notice("Try to execute sql again, query times: %d "%i)
if i == queryTimes:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
i+=1
time.sleep(1) time.sleep(1)
pass pass
else:
try:
tdLog.notice("Try the last execute sql ")
self.affectedRows = self.cursor.execute(sql)
return self.affectedRows
except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
raise Exception(repr(e))
def checkAffectedRows(self, expectAffectedRows): def checkAffectedRows(self, expectAffectedRows):
if self.affectedRows != expectAffectedRows: if self.affectedRows != expectAffectedRows:

View File

@ -179,6 +179,7 @@
./test.sh -f tsim/query/scalarFunction.sim ./test.sh -f tsim/query/scalarFunction.sim
./test.sh -f tsim/query/scalarNull.sim ./test.sh -f tsim/query/scalarNull.sim
./test.sh -f tsim/query/session.sim ./test.sh -f tsim/query/session.sim
./test.sh -f tsim/query/udf.sim
# ---- qnode # ---- qnode
./test.sh -f tsim/qnode/basic1.sim ./test.sh -f tsim/qnode/basic1.sim

View File

@ -3,8 +3,8 @@ set +e
rm -rf /tmp/udf/libbitand.so /tmp/udf/libsqrsum.so rm -rf /tmp/udf/libbitand.so /tmp/udf/libsqrsum.so
mkdir -p /tmp/udf mkdir -p /tmp/udf
echo "compile udf bit_and and sqr_sum" echo "compile udf bit_and and sqr_sum"
gcc -fPIC -shared sh/bit_and.c -o /tmp/udf/libbitand.so gcc -fPIC -shared sh/bit_and.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libbitand.so
gcc -fPIC -shared sh/sqr_sum.c -o /tmp/udf/libsqrsum.so gcc -fPIC -shared sh/sqr_sum.c -I../../include/libs/function/ -I../../include/client -I../../include/util -o /tmp/udf/libsqrsum.so
echo "debug show /tmp/udf/*.so" echo "debug show /tmp/udf/*.so"
ls /tmp/udf/*.so ls /tmp/udf/*.so

View File

@ -48,7 +48,7 @@ endi
$replica = 3 $replica = 3
$vgroups = 1 $vgroups = 1
print ============= create database print ============= create database db
sql create database db replica $replica vgroups $vgroups sql create database db replica $replica vgroups $vgroups
$loop_cnt = 0 $loop_cnt = 0
@ -135,7 +135,7 @@ endw
$totalTblNum = $tbNum * 2 $totalTblNum = $tbNum * 2
sleep 1000 sleep 1000
sql show tables sql show tables
print ====> expect $totalTblNum and infinsert $rows in fact print ====> expect $totalTblNum and insert $rows in fact
if $rows != $totalTblNum then if $rows != $totalTblNum then
return -1 return -1
endi endi

View File

@ -202,7 +202,7 @@ else
endi endi
vg_ready: vg_ready:
print ====> create stable/child table print ====> create stable stb /child table ctb and general table ntb
sql create table stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int) sql create table stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int)
sql show stables sql show stables
@ -225,7 +225,7 @@ endw
$totalTblNum = $tbNum * 2 $totalTblNum = $tbNum * 2
sleep 1000 sleep 1000
sql show tables sql show tables
print ====> expect $totalTblNum and infinsert $rows in fact print ====> expect $totalTblNum and insert $rows in fact
if $rows != $totalTblNum then if $rows != $totalTblNum then
return -1 return -1
endi endi
@ -237,6 +237,7 @@ sql show vgroups
$dnodeId = $data[0][3] $dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId $dnodeId = dnode . $dnodeId
print ====> switch_leader_to_offine_loop
switch_leader_to_offine_loop: switch_leader_to_offine_loop:
print $dnodeId print $dnodeId
@ -271,22 +272,7 @@ system sh/exec.sh -n $dnodeId -s start
$switch_loop_cnt = $switch_loop_cnt + 1 $switch_loop_cnt = $switch_loop_cnt + 1
print $switch_loop_cnt print $switch_loop_cnt
if $switch_loop_cnt == 1 then if $switch_loop_cnt <= 4 then
sql show vgroups
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
elif $switch_loop_cnt == 2 then
sql show vgroups
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
elif $switch_loop_cnt == 3 then
sql show vgroups
$dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId
goto switch_leader_to_offine_loop
elif $switch_loop_cnt == 4 then
sql show vgroups sql show vgroups
$dnodeId = $data[0][3] $dnodeId = $data[0][3]
$dnodeId = dnode . $dnodeId $dnodeId = dnode . $dnodeId

View File

@ -1,7 +1,7 @@
{ {
"filetype": "insert", "filetype": "insert",
"cfgdir": "/etc/taos/", "cfgdir": "/etc/taos/",
"host": "test216", "host": "localhost",
"port": 6030, "port": 6030,
"user": "root", "user": "root",
"password": "taosdata", "password": "taosdata",
@ -10,14 +10,14 @@
"result_file": "./insert_res.txt", "result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no", "confirm_parameter_prompt": "no",
"insert_interval": 0, "insert_interval": 0,
"interlace_rows": 1000, "interlace_rows": 0,
"num_of_records_per_req": 100000, "num_of_records_per_req": 100000,
"databases": [ "databases": [
{ {
"dbinfo": { "dbinfo": {
"name": "db", "name": "db",
"drop": "yes", "drop": "yes",
"vgroups": 24 "vgroups": 8
}, },
"super_tables": [ "super_tables": [
{ {
@ -29,8 +29,8 @@
"batch_create_tbl_num": 50000, "batch_create_tbl_num": 50000,
"data_source": "rand", "data_source": "rand",
"insert_mode": "taosc", "insert_mode": "taosc",
"insert_rows": 5, "insert_rows": 1000,
"interlace_rows": 100000, "interlace_rows": 0,
"insert_interval": 0, "insert_interval": 0,
"max_sql_len": 10000000, "max_sql_len": 10000000,
"disorder_ratio": 0, "disorder_ratio": 0,

View File

@ -4,6 +4,8 @@ import sys
import datetime import datetime
import inspect import inspect
import random import random
from util.dnodes import TDDnode
from util.dnodes import tdDnodes
from util.log import * from util.log import *
from util.sql import * from util.sql import *
@ -30,10 +32,10 @@ class TDTestCase:
for i in range(ctbNum): for i in range(ctbNum):
tagValue = 'beijing' tagValue = 'beijing'
if (i % 10 == 0): if (i % 10 == 0):
sql += " %s%d using %s (name,fleet,driver,device_version) tags('truck_%d', 'South%d','Trish%d','v2.%d')"%(ctbPrefix,i,stbName,i,i,i,i) sql += " %s%d using %s (name,fleet,driver,device_version,load_capacity,fuel_capacity,nominal_fuel_consumption) tags('truck_%d', 'South%d','Trish%d','v2.%d', 1500+%d*20, 150+%d*2, 5+%d)"%(ctbPrefix,i,stbName,i,i,i,i,(1500+i*20),(150+i*2),(5+i))
else: else:
model = 'H-%d'%i model = 'H-%d'%i
sql += " %s%d using %s tags('truck_%d', 'South%d','Trish%d','%s','v2.%d')"%(ctbPrefix,i,stbName,i,i,i,model,i) sql += " %s%d using %s tags('truck_%d', 'South%d','Trish%d','%s','v2.%d', %d, %d,%d)"%(ctbPrefix,i,stbName,i,i,i,model,i,(1500+i*20),(150+i*2),(5+i))
if (i > 0) and (i%1000 == 0): if (i > 0) and (i%1000 == 0):
tsql.execute(sql) tsql.execute(sql)
sql = pre_create sql = pre_create
@ -55,10 +57,10 @@ class TDTestCase:
sql += " %s%d values "%(ctbPrefix,i) sql += " %s%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl): for j in range(rowsPerTbl):
if(ctbPrefix=="rct"): if(ctbPrefix=="rct"):
sql += f"({startTs+j*60000}, {80+j}, {90+j}, {85+j}, {30+j*10}, {1.2*j}, {221+j*2}, {20+j*0.2}, {1500+j*20}, {150+j*2},{5+j}) " sql += f"({startTs+j*60000}, {80+j}, {90+j}, {85+j}, {30+j*10}, {1.2*j}, {221+j*2}, {20+j*0.2}) "
elif ( ctbPrefix=="dct"): elif ( ctbPrefix=="dct"):
status= random.randint(0,1) status= random.randint(0,1)
sql += f"( {startTs+j*60000}, {1+j*0.1},{1400+j*15}, {status},{1500+j*20}, {150+j*2},{5+j} ) " sql += f"( {startTs+j*60000}, {1+j*0.1},{1400+j*15}, {status} ) "
# tdLog.debug("1insert sql:%s"%sql) # tdLog.debug("1insert sql:%s"%sql)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
# tdLog.debug("2insert sql:%s"%sql) # tdLog.debug("2insert sql:%s"%sql)
@ -79,22 +81,22 @@ class TDTestCase:
stabname2="diagnostics" stabname2="diagnostics"
ctbnamePre1="rct" ctbnamePre1="rct"
ctbnamePre2="dct" ctbnamePre2="dct"
ctbNums=40 ctbNums=50
self.ctbNums=ctbNums self.ctbNums=ctbNums
rowNUms=200 rowNUms=5000
ts=1451606400000 ts=1451606400000
tdSql.execute(f"create database {dbname};") tdSql.execute(f"create database {dbname};")
tdSql.execute(f"use {dbname} ") tdSql.execute(f"use {dbname} ")
tdSql.execute(f''' tdSql.execute(f'''
create table {stabname1} (ts timestamp,latitude double,longitude double,elevation double,velocity double,heading double,grade double,fuel_consumption double,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30)); create table {stabname1} (ts timestamp,latitude double,longitude double,elevation double,velocity double,heading double,grade double,fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30),load_capacity double,fuel_capacity double,nominal_fuel_consumption double);
''') ''')
tdSql.execute(f''' tdSql.execute(f'''
create table {stabname2} (ts timestamp,fuel_state double,current_load double,status bigint,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30)) ; create table {stabname2} (ts timestamp,fuel_state double,current_load double,status bigint) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30),load_capacity double,fuel_capacity double,nominal_fuel_consumption double) ;
''') ''')
self.create_ctable(tsql=tdSql,dbName=dbname,stbName=stabname1,ctbPrefix=ctbnamePre1,ctbNum=ctbNums) self.create_ctable(tsql=tdSql,dbName=dbname,stbName=stabname1,ctbPrefix=ctbnamePre1,ctbNum=ctbNums)
self.create_ctable(tsql=tdSql,dbName=dbname,stbName=stabname2,ctbPrefix=ctbnamePre2,ctbNum=ctbNums) self.create_ctable(tsql=tdSql,dbName=dbname,stbName=stabname2,ctbPrefix=ctbnamePre2,ctbNum=ctbNums)
self.insertData(tsql=tdSql,dbName=dbname,stbName=stabname1,ctbPrefix=ctbnamePre1,ctbNum=ctbNums,rowsPerTbl=rowNUms,startTs=ts,batchNum=1000) self.insertData(tsql=tdSql,dbName=dbname,stbName=stabname1,ctbPrefix=ctbnamePre1,ctbNum=ctbNums,rowsPerTbl=rowNUms,startTs=ts,batchNum=10000)
self.insertData(tsql=tdSql,dbName=dbname,stbName=stabname2,ctbPrefix=ctbnamePre2,ctbNum=ctbNums,rowsPerTbl=rowNUms,startTs=ts,batchNum=1000) self.insertData(tsql=tdSql,dbName=dbname,stbName=stabname2,ctbPrefix=ctbnamePre2,ctbNum=ctbNums,rowsPerTbl=rowNUms,startTs=ts,batchNum=10000)
# for i in range(ctbNum): # for i in range(ctbNum):
# if i %10 == 0 : # if i %10 == 0 :
# # tdLog.debug(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}', NULL,'v2.3')") # # tdLog.debug(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}', NULL,'v2.3')")
@ -131,7 +133,7 @@ class TDTestCase:
# tdLog.info("avg value check pass , it work as expected ,sql is \"%s\" "%check_query ) # tdLog.info("avg value check pass , it work as expected ,sql is \"%s\" "%check_query )
def tsbsIotQuery(self): def tsbsIotQuery(self,insertinto=True):
tdSql.execute("use db_tsbs") tdSql.execute("use db_tsbs")
@ -143,10 +145,11 @@ class TDTestCase:
# test insert into # test insert into
tdSql.execute("create table testsnode (ts timestamp, c1 float,c2 binary(30),c3 binary(30),c4 binary(30)) ;") if insertinto == True :
tdSql.query("insert into testsnode SELECT ts,avg(velocity) as mean_velocity,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet,ts interval(10m);") tdSql.execute("create table testsnode (ts timestamp, c1 float,c2 binary(30),c3 binary(30),c4 binary(30)) ;")
tdSql.query("insert into testsnode SELECT ts,avg(velocity) as mean_velocity,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet,ts interval(10m);")
tdSql.query("insert into testsnode(ts,c1,c2,c3,c4) SELECT ts,avg(velocity) as mean_velocity,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet,ts interval(10m);") tdSql.query("insert into testsnode(ts,c1,c2,c3,c4) SELECT ts,avg(velocity) as mean_velocity,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet,ts interval(10m);")
# test paitition interval fill # test paitition interval fill
@ -182,8 +185,7 @@ class TDTestCase:
# # 6. avg-daily-driving-session # # 6. avg-daily-driving-session
# #taosc core dumped # #taosc core dumped
# tdSql.execute("create table random_measure2_1 (ts timestamp,ela float, name binary(40))") tdSql.query(" SELECT _wstart as ts,name,floor(avg(velocity)/5) AS mv FROM readings WHERE name is not null AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0);")
# tdSql.query("SELECT ts,diff(mv) AS difka FROM (SELECT ts,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name,ts interval(10m) fill(value,0)) GROUP BY name,ts;")
# tdSql.query("select name,diff(mv) AS difka FROM (SELECT ts,name,mv FROM (SELECT _wstart as ts,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0))) group BY name ;") # tdSql.query("select name,diff(mv) AS difka FROM (SELECT ts,name,mv FROM (SELECT _wstart as ts,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0))) group BY name ;")
# tdSql.query("SELECT _wstart,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0)") # tdSql.query("SELECT _wstart,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0)")
@ -234,7 +236,9 @@ class TDTestCase:
tdLog.printNoPrefix("==========step1:create database and table,insert data ==============") tdLog.printNoPrefix("==========step1:create database and table,insert data ==============")
self.prepareData() self.prepareData()
self.tsbsIotQuery() self.tsbsIotQuery()
tdDnodes.stop(1)
tdDnodes.start(1)
self.tsbsIotQuery(False)
def stop(self): def stop(self):
tdSql.close() tdSql.close()

View File

@ -216,7 +216,7 @@ python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_query
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_follower.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_follower.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_leader_force_stop.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_leader_force_stop.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_leader.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_leader.py -N 4 -M 1
python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups_stopOne.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups_stopOne.py -N 4 -M 1