From 80711ce100735c1d8f8adb02d9c5b311579cf5fc Mon Sep 17 00:00:00 2001 From: kevin men Date: Thu, 20 Mar 2025 20:27:04 +0800 Subject: [PATCH 1/2] docs: use source code reference in the Perspective documentation. (#30301) * docs(zh): add perspective document * docs(zh): fix errors in the document * docs(zh): increase the font size of the image * docs(en): write a Perspective English document * docs(en): add Perspective English document * docs(en): add perspective docsument images * docs: add perspective docsument source code use reference * docs: add perspective docsument images notes * docs: adjust perspective docsument format * docs: delete blank lines * Update 02-perspective.md docs: delete blank lines in multiple documents --- .../03-visual/02-perspective.md | 113 +--------- .../perspective/perspective_server.py | 207 ++++++++++++++++++ docs/examples/perspective/prsp-viewer.html | 135 ++++++++++++ .../03-visual/02-perspective.mdx | 116 +--------- 4 files changed, 356 insertions(+), 215 deletions(-) create mode 100644 docs/examples/perspective/perspective_server.py create mode 100644 docs/examples/perspective/prsp-viewer.html diff --git a/docs/en/10-third-party/03-visual/02-perspective.md b/docs/en/10-third-party/03-visual/02-perspective.md index abb2baae23..056abed8f6 100644 --- a/docs/en/10-third-party/03-visual/02-perspective.md +++ b/docs/en/10-third-party/03-visual/02-perspective.md @@ -19,7 +19,7 @@ Perform the following installation operations in the Linux system: - Python version 3.10 or higher has been installed (if not installed, please refer to [Python Installation](https://docs.python.org/)). - Download or clone the [perspective-connect-demo](https://github.com/taosdata/perspective-connect-demo) project. After entering the root directory of the project, run the "install.sh" script to download and install the TDengine client library and related dependencies locally. 
-## Data Analysis +## Visualize data **Step 1**, Run the "run.sh" script in the root directory of the [perspective-connect-demo](https://github.com/taosdata/perspective-connect-demo) project to start the Perspective service. This service will retrieve data from the TDengine database every 300 milliseconds and transmit the data in a streaming form to the web-based `Perspective Viewer`. @@ -33,6 +33,8 @@ sh run.sh python -m http.server 8081 ``` +The effect presented after accessing the web page through the browser is shown in the following figure: + ![perspective-viewer](./perspective/prsp_view.webp) ## Instructions for use @@ -56,40 +58,9 @@ The `perspective_server.py` script in the root directory of the [perspective-con 3. Create a Perspective table (the table structure needs to match the type of the table in the TDengine database). 4. Call the `Tornado.PeriodicCallback` function to start a scheduled task, thereby achieving the update of the data in the Perspective table. The sample code is as follows: - ```python - def perspective_thread(perspective_server: perspective.Server, tdengine_conn: taosws.Connection): - """ - Create a new Perspective table and update it with new data every 50ms - """ - # create a new Perspective table - client = perspective_server.new_local_client() - schema = { - "timestamp": datetime, - "location": str, - "groupid": int, - "current": float, - "voltage": int, - "phase": float, - } - # define the table schema - table = client.table( - schema, - limit=1000, # maximum number of rows in the table - name=PERSPECTIVE_TABLE_NAME, # table name. 
Use this with perspective-viewer on the client side - ) - logger.info("Created new Perspective table") - - # update with new data - def updater(): - data = read_tdengine(tdengine_conn) - table.update(data) - logger.debug(f"Updated Perspective table: {len(data)} rows") - - logger.info(f"Starting tornado ioloop update loop every {PERSPECTIVE_REFRESH_RATE} milliseconds") - # start the periodic callback to update the table data - callback = tornado.ioloop.PeriodicCallback(callback=updater, callback_time=PERSPECTIVE_REFRESH_RATE) - callback.start() - ``` +```python +{{#include docs/examples/perspective/perspective_server.py:perspective_server}} +``` ### HTML Page Configuration @@ -100,77 +71,7 @@ The `prsp-viewer.html` file in the root directory of the [perspective-connect-de - Import the Perspective library, connect to the Perspective server via a WebSocket, and load the `meters_values` table to display dynamic data. ```html - - - -
-
- -
-
+{{#include docs/examples/perspective/prsp-viewer.html:perspective_viewer}} ``` ## Reference Materials diff --git a/docs/examples/perspective/perspective_server.py b/docs/examples/perspective/perspective_server.py new file mode 100644 index 0000000000..919828379e --- /dev/null +++ b/docs/examples/perspective/perspective_server.py @@ -0,0 +1,207 @@ +# ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +# ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃ +# ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃ +# ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃ +# ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃ +# ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫ +# ┃ Copyright (c) 2017, the Perspective Authors. ┃ +# ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃ +# ┃ This file is part of the Perspective library, distributed under the terms ┃ +# ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). 
┃ +# ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + +import logging +import tornado.websocket +import tornado.web +import tornado.ioloop +from datetime import date, datetime +import perspective +import perspective.handlers.tornado +import json +import taosws + + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger('main') + + +# ============================================================================= +# TDengine connection parameters +# ============================================================================= +TAOS_HOST = "localhost" # TDengine server host +TAOS_PORT = 6041 # TDengine server port +TAOS_USER = "root" # TDengine username +TAOS_PASSWORD = "taosdata" # TDengine password + +TAOS_DATABASE = "power" # TDengine database name +TAOS_TABLENAME = "meters" # TDengine table name + +# ============================================================================= +# Perspective server parameters +# ============================================================================= +PERSPECTIVE_TABLE_NAME = "meters_values" # name of the Perspective table +PERSPECTIVE_REFRESH_RATE = 250 # refresh rate in milliseconds + + +class CustomJSONEncoder(json.JSONEncoder): + """ + Custom JSON encoder that serializes datetime and date objects + """ + def default(self, obj): + if isinstance(obj, datetime): + return obj.isoformat() + elif isinstance(obj, date): + return obj.isoformat() + return super().default(obj) + + +json.JSONEncoder.default = CustomJSONEncoder().default + + +def convert_ts(ts) -> datetime: + """ + Convert a timestamp string to a datetime object + """ + for fmt in ('%Y-%m-%d %H:%M:%S.%f %z', '%Y-%m-%d %H:%M:%S %z'): + try: + return datetime.strptime(ts, fmt) + except ValueError: + continue + raise ValueError(f"Time data '{ts}' does not match any format") + + +def create_tdengine_connection( + host: str = TAOS_HOST, + port: int = TAOS_PORT, + user: str = 
TAOS_USER, + password: str = TAOS_PASSWORD, + ) -> taosws.Connection: + try: + # connect to the tdengine server + conn = taosws.connect( + user=user, + password=password, + host=host, + port=port, + ) + # switch to the right database + conn.execute(f"USE {TAOS_DATABASE}") + # connection successful + logger.info(f"Connected to tdengine successfully: {host}:{port}") + return conn + except Exception as err: + logger.error(f"Failed to connect to tdengine: {host}:{port} -- ErrMessage: {err}") + raise err + + +def read_tdengine( + conn: taosws.Connection, + ) -> list[dict]: + try: + # query the database + sql = f""" + SELECT `ts`, location, groupid, current, voltage, phase + FROM {TAOS_TABLENAME} + WHERE `ts` >= NOW() - 12h + ORDER BY `ts` DESC + LIMIT 1000 + """ + logger.debug(f"Executing query: {sql}") + res = conn.query(sql) + data = [ + { + "timestamp": convert_ts(row[0]), + "location": row[1], + "groupid": row[2], + "current": row[3], + "voltage": row[4], + "phase": row[5], + } + for row in res + ] + logger.info(f"select result: {data}") + return data + except Exception as err: + logger.error(f"Failed to query tdengine: {err}") + raise err + + +// ANCHOR: perspective_server +def perspective_thread(perspective_server: perspective.Server, tdengine_conn: taosws.Connection): + """ + Create a new Perspective table and update it with new data every 50ms + """ + # create a new Perspective table + client = perspective_server.new_local_client() + schema = { + "timestamp": datetime, + "location": str, + "groupid": int, + "current": float, + "voltage": int, + "phase": float, + } + # define the table schema + table = client.table( + schema, + limit=1000, # maximum number of rows in the table + name=PERSPECTIVE_TABLE_NAME, # table name. 
Use this with perspective-viewer on the client side + ) + logger.info("Created new Perspective table") + + # update with new data + def updater(): + data = read_tdengine(tdengine_conn) + table.update(data) + logger.debug(f"Updated Perspective table: {len(data)} rows") + + logger.info(f"Starting tornado ioloop update loop every {PERSPECTIVE_REFRESH_RATE} milliseconds") + # start the periodic callback to update the table data + callback = tornado.ioloop.PeriodicCallback(callback=updater, callback_time=PERSPECTIVE_REFRESH_RATE) + callback.start() + +// ANCHOR_END: perspective_server + +def make_app(perspective_server): + """ + Create a new Tornado application with a websocket handler that + serves a Perspective table. PerspectiveTornadoHandler handles + the websocket connection and streams the Perspective table changes + to the client. + """ + return tornado.web.Application([ + ( + r"/websocket", # websocket endpoint. Use this URL to configure the websocket client OR Prospective Server adapter + perspective.handlers.tornado.PerspectiveTornadoHandler, # PerspectiveTornadoHandler handles perspective table updates <-> websocket client + {"perspective_server": perspective_server}, # pass the perspective server to the handler + ), + ]) + + +if __name__ == "__main__": + logger.info("TDEngine <-> Perspective Demo") + + # create a new Perspective server + logger.info("Creating new Perspective server") + perspective_server = perspective.Server() + # create the tdengine connection + logger.info("Creating new TDEngine connection") + tdengine_conn = create_tdengine_connection() + + # setup and start the Tornado app + logger.info("Creating Tornado server") + app = make_app(perspective_server) + app.listen(8085, address='0.0.0.0') + logger.info("Listening on http://localhost:8080") + + try: + # start the io loop + logger.info("Starting ioloop to update Perspective table data via tornado websocket...") + loop = tornado.ioloop.IOLoop.current() + loop.call_later(0, perspective_thread, 
perspective_server, tdengine_conn) + loop.start() + except KeyboardInterrupt: + logger.warning("Keyboard interrupt detected. Shutting down tornado server...") + loop.stop() + loop.close() + logging.info("Shut down") diff --git a/docs/examples/perspective/prsp-viewer.html b/docs/examples/perspective/prsp-viewer.html new file mode 100644 index 0000000000..e6b1a6e734 --- /dev/null +++ b/docs/examples/perspective/prsp-viewer.html @@ -0,0 +1,135 @@ + + + + + + + Perspective Viewer Dashboard + + + + + + + + + + + +// ANCHOR: perspective_viewer + + + +
+
+ +
+
+// ANCHOR_END: perspective_viewer + + + \ No newline at end of file diff --git a/docs/zh/10-third-party/03-visual/02-perspective.mdx b/docs/zh/10-third-party/03-visual/02-perspective.mdx index 8e92e7209b..1db8474570 100644 --- a/docs/zh/10-third-party/03-visual/02-perspective.mdx +++ b/docs/zh/10-third-party/03-visual/02-perspective.mdx @@ -22,7 +22,7 @@ Perspective 是一款开源且强大的数据可视化库,由 [Prospective.co] - Python 3.10 及以上版本已安装(如未安装,可参考 [Python 安装](https://docs.python.org/)。 - 下载或克隆 [perspective-connect-demo](https://github.com/taosdata/perspective-connect-demo) 项目,进入项目根目录后运行 “install.sh” 脚本,以便在本地下载并安装 TDengine 客户端库以及相关的依赖项。 -## 数据分析 +## 可视化数据 **第 1 步**,运行 [perspective-connect-demo](https://github.com/taosdata/perspective-connect-demo) 项目根目录中的 “run.sh” 脚本,以此启动 Perspective 服务。该服务会每隔 300 毫秒从 TDengine 数据库中获取一次数据,并将数据以流的形式传输至基于 Web 的 `Perspective Viewer` 。 @@ -36,6 +36,8 @@ sh run.sh python -m http.server 8081 ``` +通过浏览器访问该 Web 页面后所呈现出的效果如下图所示: + ![perspective-viewer](./perspective/prsp_view.webp) ## 使用说明 @@ -59,40 +61,9 @@ Python 连接器详细写入说明可参见 [Python 参数绑定](../../../refer 1. 创建一个 Perspective 表(表结构需要与 TDengine 数据库中表的类型保持匹配)。 1. 调用 `Tornado.PeriodicCallback` 函数来启动定时任务,进而实现对 Perspective 表数据的更新,示例代码如下: - ```python - def perspective_thread(perspective_server: perspective.Server, tdengine_conn: taosws.Connection): - """ - Create a new Perspective table and update it with new data every 50ms - """ - # create a new Perspective table - client = perspective_server.new_local_client() - schema = { - "timestamp": datetime, - "location": str, - "groupid": int, - "current": float, - "voltage": int, - "phase": float, - } - # define the table schema - table = client.table( - schema, - limit=1000, # maximum number of rows in the table - name=PERSPECTIVE_TABLE_NAME, # table name. 
Use this with perspective-viewer on the client side - ) - logger.info("Created new Perspective table") - - # update with new data - def updater(): - data = read_tdengine(tdengine_conn) - table.update(data) - logger.debug(f"Updated Perspective table: {len(data)} rows") - - logger.info(f"Starting tornado ioloop update loop every {PERSPECTIVE_REFRESH_RATE} milliseconds") - # start the periodic callback to update the table data - callback = tornado.ioloop.PeriodicCallback(callback=updater, callback_time=PERSPECTIVE_REFRESH_RATE) - callback.start() - ``` +```python +{{#include docs/examples/perspective/perspective_server.py:perspective_server}} +``` ### HTML 页面配置 @@ -103,80 +74,7 @@ Python 连接器详细写入说明可参见 [Python 参数绑定](../../../refer - 引入 Perspective 库,通过 WebSocket 连接到 Perspective 服务器,加载 meters_values 表来展示动态数据。 ```html - - - - -
-
- -
-
+{{#include docs/examples/perspective/prsp-viewer.html:perspective_viewer}} ``` ## 参考资料 From 7cffc7aff249cc364a446d1ee7a7cb65c1b4b518 Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Thu, 20 Mar 2025 22:42:27 +0800 Subject: [PATCH 2/2] fix: [TD-34074] Forbid virtual table in tq and stmt. --- docs/en/14-reference/09-error-code.md | 17 ++- docs/zh/14-reference/09-error-code.md | 15 +- include/libs/nodes/plannodes.h | 2 + include/util/taoserror.h | 3 + source/libs/executor/inc/dynqueryctrl.h | 1 + .../libs/executor/src/dynqueryctrloperator.c | 79 +++++++--- source/libs/executor/src/exchangeoperator.c | 2 +- source/libs/executor/src/scanoperator.c | 21 ++- source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/nodes/src/nodesMsgFuncs.c | 7 + source/libs/parser/src/parInsertSql.c | 6 + source/libs/parser/src/parTranslater.c | 29 ++-- source/libs/planner/src/planLogicCreater.c | 1 + source/libs/planner/src/planOptimizer.c | 3 + source/libs/planner/src/planPhysiCreater.c | 1 + source/util/src/terror.c | 3 + tests/system-test/1-insert/stmt_error.py | 141 ++++++++++++++++++ 17 files changed, 276 insertions(+), 56 deletions(-) diff --git a/docs/en/14-reference/09-error-code.md b/docs/en/14-reference/09-error-code.md index 1aa62fbfce..9307b7f240 100644 --- a/docs/en/14-reference/09-error-code.md +++ b/docs/en/14-reference/09-error-code.md @@ -559,10 +559,13 @@ This document details the server error codes that may be encountered when using ## virtual table -| Error Code | Description | Possible Error Scenarios or Reasons | Recommended Actions for Users | -|------------|---------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------| -| 0x80006200 | Virtual table scan internal error | virtual table scan operator internal error, generally does not occur | Check 
error logs, contact development for handling | -| 0x80006201 | Virtual table scan invalid downstream operator type | The incorrect execution plan generated causes the downstream operator type of the virtual table scan operator to be incorrect. | Check error logs, contact development for handling | -| 0x80006202 | Virtual table prim timestamp column should not has ref | The timestamp primary key column of a virtual table should not have a data source. If it does, this error will occur during subsequent queries on the virtual table. | Check error logs, contact development for handling | -| 0x80006203 | Create virtual child table must use virtual super table | Create virtual child table using non-virtual super table | create virtual child table using virtual super table | -| 0x80006204 | Virtual table not support decimal type | Create virtual table using decimal type | create virtual table without using decimal type | +| Error Code | Description | Possible Error Scenarios or Reasons | Recommended Actions for Users | +|------------|---------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------| +| 0x80006200 | Virtual table scan internal error | virtual table scan operator internal error, generally does not occur | Check error logs, contact development for handling | +| 0x80006201 | Virtual table scan invalid downstream operator type | The incorrect execution plan generated causes the downstream operator type of the virtual table scan operator to be incorrect. | Check error logs, contact development for handling | +| 0x80006202 | Virtual table prim timestamp column should not has ref | The timestamp primary key column of a virtual table should not have a data source. 
If it does, this error will occur during subsequent queries on the virtual table. | Check error logs, contact development for handling | +| 0x80006203 | Create virtual child table must use virtual super table | Create virtual child table using non-virtual super table | create virtual child table using virtual super table | +| 0x80006204 | Virtual table not support decimal type | Create virtual table using decimal type | create virtual table without using decimal type | +| 0x80006205 | Virtual table not support in STMT query and STMT insert | Use virtual table in stmt query and stmt insert | do not use virtual table in stmt query and insert | +| 0x80006206 | Virtual table not support in Topic | Use virtual table in topic | do not use virtual table in topic | +| 0x80006206 | Virtual super table query not support origin table from different databases | Virtual super table ‘s child table's origin table from different databases | make sure virtual super table's child table's origin table from same database | diff --git a/docs/zh/14-reference/09-error-code.md b/docs/zh/14-reference/09-error-code.md index 4cf853283e..e2bf5b2776 100644 --- a/docs/zh/14-reference/09-error-code.md +++ b/docs/zh/14-reference/09-error-code.md @@ -578,11 +578,14 @@ description: TDengine 服务端的错误码列表和详细说明 ## virtual table -| 错误码 | 错误描述 | 可能的出错场景或者可能的原因 | 建议用户采取的措施 | -|------------|---------------------------------------------------------|------------------------------------------------|----------------------------| -| 0x80006200 | Virtual table scan 算子内部错误 | virtual table scan 算子内部逻辑错误,一般不会出现 | 具体查看client端的错误日志提示 | -| 0x80006201 | Virtual table scan invalid downstream operator type | 由于生成的执行计划不对,导致 virtual table scan 算子的下游算子类型不正确 | 保留 explain 执行计划,联系开发处理 | -| 0x80006202 | Virtual table prim timestamp column should not has ref | 虚拟表的时间戳主键列不应该有数据源,如果有,后续查询虚拟表的时候就会出现该错误 | 检查错误日志,联系开发处理 | -| 0x80006203 | Create virtual child table must use virtual super table | 虚拟子表必须建在虚拟超级表下,否则就会出现该错误 | 
创建虚拟子表的时候,USING 虚拟超级表 | +| 错误码 | 错误描述 | 可能的出错场景或者可能的原因 | 建议用户采取的措施 | +|------------|---------------------------------------------------------|------------------------------------------------|-------------------------| +| 0x80006200 | Virtual table scan 算子内部错误 | virtual table scan 算子内部逻辑错误,一般不会出现 | 具体查看client端的错误日志提示 | +| 0x80006201 | Virtual table scan invalid downstream operator type | 由于生成的执行计划不对,导致 virtual table scan 算子的下游算子类型不正确 | 保留 explain 执行计划,联系开发处理 | +| 0x80006202 | Virtual table prim timestamp column should not has ref | 虚拟表的时间戳主键列不应该有数据源,如果有,后续查询虚拟表的时候就会出现该错误 | 检查错误日志,联系开发处理 | +| 0x80006203 | Create virtual child table must use virtual super table | 虚拟子表必须建在虚拟超级表下,否则就会出现该错误 | 创建虚拟子表的时候,USING 虚拟超级表 | | 0x80006204 | Virtual table not support decimal type | 虚拟表不支持 decimal 类型 | 创建虚拟表时不使用 decimal 类型的列/tag | +| 0x80006205 | Virtual table not support in STMT query and STMT insert | 不支持在 stmt 写入和查询中使用虚拟表 | 不在 stmt 写入和查询中使用虚拟表 | +| 0x80006206 | Virtual table not support in Topic | 不支持在订阅中使用虚拟表 | 不在订阅中使用虚拟表 | +| 0x80006207 | Virtual super table query not support origin table from different databases | 虚拟超级表不支持子表的数据源来自不同的数据库 | 确保虚拟超级表的子表的数据源都来自同一个数据库 | diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 328f7f6a8f..4d05bb8c80 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -263,6 +263,7 @@ typedef struct SDynQueryCtrlStbJoin { typedef struct SDynQueryCtrlVtbScan { bool scanAllCols; + char dbName[TSDB_DB_NAME_LEN]; uint64_t suid; SVgroupsInfo* pVgroupList; } SDynQueryCtrlVtbScan; @@ -666,6 +667,7 @@ typedef struct SStbJoinDynCtrlBasic { typedef struct SVtbScanDynCtrlBasic { bool scanAllCols; + char dbName[TSDB_DB_NAME_LEN]; uint64_t suid; int32_t accountId; SEpSet mgmtEpSet; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ee523166a6..1a9a5583df 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -1072,6 +1072,9 @@ int32_t taosGetErrSize(); #define 
TSDB_CODE_VTABLE_PRIMTS_HAS_REF TAOS_DEF_ERROR_CODE(0, 0x6202) #define TSDB_CODE_VTABLE_NOT_VIRTUAL_SUPER_TABLE TAOS_DEF_ERROR_CODE(0, 0x6203) #define TSDB_CODE_VTABLE_NOT_SUPPORT_DATA_TYPE TAOS_DEF_ERROR_CODE(0, 0x6204) +#define TSDB_CODE_VTABLE_NOT_SUPPORT_STMT TAOS_DEF_ERROR_CODE(0, 0x6205) +#define TSDB_CODE_VTABLE_NOT_SUPPORT_TOPIC TAOS_DEF_ERROR_CODE(0, 0x6206) +#define TSDB_CODE_VTABLE_NOT_SUPPORT_CROSS_DB TAOS_DEF_ERROR_CODE(0, 0x6207) #ifdef __cplusplus } #endif diff --git a/source/libs/executor/inc/dynqueryctrl.h b/source/libs/executor/inc/dynqueryctrl.h index feb2dca76f..f8c1675c42 100755 --- a/source/libs/executor/inc/dynqueryctrl.h +++ b/source/libs/executor/inc/dynqueryctrl.h @@ -76,6 +76,7 @@ typedef struct SStbJoinDynCtrlInfo { typedef struct SVtbScanDynCtrlInfo { bool scanAllCols; + char* dbName; tsem_t ready; SEpSet epSet; SUseDbRsp* pRsp; diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 97d0cf4a0e..78899d52bc 100644 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -100,6 +100,9 @@ void freeUseDbOutput(void* pOutput) { } static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) { + if (pVtbScan->dbName) { + taosMemoryFreeClear(pVtbScan->dbName); + } if (pVtbScan->childTableList) { taosArrayDestroy(pVtbScan->childTableList); } @@ -1136,13 +1139,15 @@ int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp)); QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno); - QUERY_CHECK_CODE(tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp), lino, _return); + code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp); + QUERY_CHECK_CODE(code, lino, _return); taosMemoryFreeClear(pMsg->pData); - QUERY_CHECK_CODE(tsem_post(&pScanResInfo->vtbScan.ready), lino, 
_return); + code = tsem_post(&pScanResInfo->vtbScan.ready); + QUERY_CHECK_CODE(code, lino, _return); - return TSDB_CODE_SUCCESS; + return code; _return: qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; @@ -1157,7 +1162,8 @@ static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SReadHandle* pHandle, pReq = taosMemoryMalloc(sizeof(SUseDbReq)); QUERY_CHECK_NULL(pReq, code, lino, _return, terrno); - QUERY_CHECK_CODE(tNameGetFullDbName(name, pReq->db), lino, _return); + code = tNameGetFullDbName(name, pReq->db); + QUERY_CHECK_CODE(code, lino, _return); int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq); buf1 = taosMemoryCalloc(1, contLen); QUERY_CHECK_NULL(buf1, code, lino, _return, terrno); @@ -1177,11 +1183,14 @@ static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SReadHandle* pHandle, pMsgSendInfo->fp = dynProcessUseDbRsp; pMsgSendInfo->requestId = pTaskInfo->id.queryId; - QUERY_CHECK_CODE(asyncSendMsgToServer(pHandle->pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo), lino, _return); + code = asyncSendMsgToServer(pHandle->pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo); + QUERY_CHECK_CODE(code, lino, _return); - QUERY_CHECK_CODE(tsem_wait(&pScanResInfo->vtbScan.ready), lino, _return); + code = tsem_wait(&pScanResInfo->vtbScan.ready); + QUERY_CHECK_CODE(code, lino, _return); - QUERY_CHECK_CODE(queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp), lino, _return); + code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp); + QUERY_CHECK_CODE(code, lino, _return); _return: if (code) { @@ -1250,12 +1259,13 @@ int32_t dynHashValueComp(void const* lp, void const* rp) { int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) { int32_t code = 0; int32_t lino = 0; - QUERY_CHECK_CODE(dynMakeVgArraySortBy(dbInfo, dynVgInfoComp), lino, _return); + code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp); + QUERY_CHECK_CODE(code, lino, _return); int32_t vgNum = 
(int32_t)taosArrayGetSize(dbInfo->vgArray); if (vgNum <= 0) { qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum); - QUERY_CHECK_CODE(TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return); + QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return); } SVgroupInfo* vgInfo = NULL; @@ -1309,8 +1319,10 @@ int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) if (find == NULL) { output = taosMemoryMalloc(sizeof(SUseDbOutput)); - QUERY_CHECK_CODE(buildDbVgInfoMap(pOperator, pHandle, name, pTaskInfo, output), line, _return); - QUERY_CHECK_CODE(taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES), line, _return); + code = buildDbVgInfoMap(pOperator, pHandle, name, pTaskInfo, output); + QUERY_CHECK_CODE(code, line, _return); + code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES); + QUERY_CHECK_CODE(code, line, _return); } else { output = *find; } @@ -1357,12 +1369,14 @@ int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) { while (true) { if (pVtbScan->curTableIdx == pVtbScan->lastTableIdx) { - QUERY_CHECK_CODE(pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0], pRes), line, _return); + code = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0], pRes); + QUERY_CHECK_CODE(code, line, _return); } else { uint64_t* id = taosArrayGet(pVtbScan->childTableList, pVtbScan->curTableIdx); QUERY_CHECK_NULL(id, code, line, _return, terrno); pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_LOCK, &pHandle->api.metaFn); - QUERY_CHECK_CODE(pHandle->api.metaReaderFn.getTableEntryByUid(&mr, *id), line, _return); + code = pHandle->api.metaReaderFn.getTableEntryByUid(&mr, *id); + QUERY_CHECK_CODE(code, line, _return); for (int32_t j = 0; j < mr.me.colRef.nCols; j++) { if (mr.me.colRef.pColRef[j].hasRef && colNeedScan(pOperator, mr.me.colRef.pColRef[j].id)) { @@ -1370,15 +1384,22 
@@ int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) { char dbFname[TSDB_DB_FNAME_LEN] = {0}; char orgTbFName[TSDB_TABLE_FNAME_LEN] = {0}; + if (strncmp(mr.me.colRef.pColRef[j].refDbName, pVtbScan->dbName, strlen(pVtbScan->dbName)) != 0) { + QUERY_CHECK_CODE(code = TSDB_CODE_VTABLE_NOT_SUPPORT_CROSS_DB, line, _return); + } toName(pInfo->vtbScan.acctId, mr.me.colRef.pColRef[j].refDbName, mr.me.colRef.pColRef[j].refTableName, &name); - QUERY_CHECK_CODE(getDbVgInfo(pOperator, &name, &dbVgInfo), line, _return); - QUERY_CHECK_CODE(tNameGetFullDbName(&name, dbFname), line, _return); - QUERY_CHECK_CODE(tNameGetFullTableName(&name, orgTbFName), line, _return); + code = getDbVgInfo(pOperator, &name, &dbVgInfo); + QUERY_CHECK_CODE(code, line, _return); + code = tNameGetFullDbName(&name, dbFname); + QUERY_CHECK_CODE(code, line, _return); + code = tNameGetFullTableName(&name, orgTbFName); + QUERY_CHECK_CODE(code, line, _return); void *pVal = taosHashGet(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName)); if (!pVal) { SOrgTbInfo map = {0}; - QUERY_CHECK_CODE(getVgId(dbVgInfo, dbFname, &map.vgId, name.tname), line, _return); + code = getVgId(dbVgInfo, dbFname, &map.vgId, name.tname); + QUERY_CHECK_CODE(code, line, _return); tstrncpy(map.tbName, orgTbFName, sizeof(map.tbName)); map.colMap = taosArrayInit(10, sizeof(SColIdNameKV)); QUERY_CHECK_NULL(map.colMap, code, line, _return, terrno); @@ -1386,7 +1407,8 @@ int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) { colIdNameKV.colId = mr.me.colRef.pColRef[j].id; tstrncpy(colIdNameKV.colName, mr.me.colRef.pColRef[j].refColName, sizeof(colIdNameKV.colName)); QUERY_CHECK_NULL(taosArrayPush(map.colMap, &colIdNameKV), code, line, _return, terrno); - QUERY_CHECK_CODE(taosHashPut(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map)), line, _return); + code = taosHashPut(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map)); + QUERY_CHECK_CODE(code, line, _return); } else { 
SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal; SColIdNameKV colIdNameKV = {0}; @@ -1398,13 +1420,15 @@ int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) { } pVtbScan->vtbScanParam = NULL; - QUERY_CHECK_CODE(buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, *id), line, _return); + code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, *id); + QUERY_CHECK_CODE(code, line, _return); void* pIter = taosHashIterate(pVtbScan->orgTbVgColMap, NULL); while (pIter != NULL) { SOrgTbInfo* pMap = (SOrgTbInfo*)pIter; SOperatorParam* pExchangeParam = NULL; - QUERY_CHECK_CODE(buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap), line, _return); + code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap); + QUERY_CHECK_CODE(code, line, _return); QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno); pIter = taosHashIterate(pVtbScan->orgTbVgColMap, pIter); } @@ -1412,7 +1436,8 @@ int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) { // reset downstream operator's status pOperator->pDownstream[0]->status = OP_NOT_OPENED; - QUERY_CHECK_CODE(pOperator->pDownstream[0]->fpSet.getNextExtFn(pOperator->pDownstream[0], pVtbScan->vtbScanParam, pRes), line, _return); + code = pOperator->pDownstream[0]->fpSet.getNextExtFn(pOperator->pDownstream[0], pVtbScan->vtbScanParam, pRes); + QUERY_CHECK_CODE(code, line, _return); } if (*pRes) { @@ -1478,7 +1503,8 @@ static int32_t initVtbScanInfo(SOperatorInfo* pOperator, SDynQueryCtrlOperatorIn int32_t code = TSDB_CODE_SUCCESS; int32_t line = 0; - QUERY_CHECK_CODE(tsem_init(&pInfo->vtbScan.ready, 0, 0), line, _return); + code = tsem_init(&pInfo->vtbScan.ready, 0, 0); + QUERY_CHECK_CODE(code, line, _return); pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols; pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid; @@ -1487,6 +1513,8 @@ static int32_t initVtbScanInfo(SOperatorInfo* pOperator, 
SDynQueryCtrlOperatorIn pInfo->vtbScan.readHandle = *pHandle; pInfo->vtbScan.curTableIdx = 0; pInfo->vtbScan.lastTableIdx = -1; + pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName); + QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno); pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t)); QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno); @@ -1499,7 +1527,8 @@ static int32_t initVtbScanInfo(SOperatorInfo* pOperator, SDynQueryCtrlOperatorIn pInfo->vtbScan.childTableList = taosArrayInit(10, sizeof(uint64_t)); QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno); - QUERY_CHECK_CODE(pHandle->api.metaFn.getChildTableList(pHandle->vnode, pInfo->vtbScan.suid, pInfo->vtbScan.childTableList), line, _return); + code = pHandle->api.metaFn.getChildTableList(pHandle->vnode, pInfo->vtbScan.suid, pInfo->vtbScan.childTableList); + QUERY_CHECK_CODE(code, line, _return); pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno); @@ -1518,6 +1547,7 @@ int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numO QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; + int32_t line = 0; __optr_fn_t nextFp = NULL; SOperatorInfo* pOperator = NULL; SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo)); @@ -1554,7 +1584,8 @@ int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numO nextFp = seqStableJoin; break; case DYN_QTYPE_VTB_SCAN: - QUERY_CHECK_CODE(initVtbScanInfo(pOperator, pInfo, pHandle, pPhyciNode, pTaskInfo), code, _error); + code = initVtbScanInfo(pOperator, pInfo, pHandle, pPhyciNode, pTaskInfo); + QUERY_CHECK_CODE(code, line, _error); nextFp = vtbScan; break; default: diff --git 
a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index d20d072668..7f299dfef8 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -1239,7 +1239,7 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM); pOperator->pOperatorGetParam = NULL; - return TSDB_CODE_SUCCESS; + return code; } int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 11390ba38b..e5d2ce0735 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1237,11 +1237,13 @@ static int32_t createVTableScanInfoFromParam(SOperatorInfo* pOperator) { } pAPI->metaReaderFn.initReader(&orgTable, pInfo->base.readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); - QUERY_CHECK_CODE(pAPI->metaReaderFn.getTableEntryByName(&orgTable, strstr(pParam->pOrgTbInfo->tbName, ".") + 1), lino, _return); + code = pAPI->metaReaderFn.getTableEntryByName(&orgTable, strstr(pParam->pOrgTbInfo->tbName, ".") + 1); + QUERY_CHECK_CODE(code, lino, _return); switch (orgTable.me.type) { case TSDB_CHILD_TABLE: pAPI->metaReaderFn.initReader(&superTable, pInfo->base.readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); - QUERY_CHECK_CODE(pAPI->metaReaderFn.getTableEntryByUid(&superTable, orgTable.me.ctbEntry.suid), lino, _return); + code = pAPI->metaReaderFn.getTableEntryByUid(&superTable, orgTable.me.ctbEntry.suid); + QUERY_CHECK_CODE(code, lino, _return); schema = &superTable.me.stbEntry.schemaRow; break; case TSDB_NORMAL_TABLE: @@ -1289,8 +1291,10 @@ static int32_t createVTableScanInfoFromParam(SOperatorInfo* pOperator) { blockDataDestroy(pInfo->pResBlock); pInfo->pResBlock = NULL; } - QUERY_CHECK_CODE(createOneDataBlockWithColArray(pInfo->pOrgBlock, pBlockColArray, &pInfo->pResBlock), lino, _return); - 
QUERY_CHECK_CODE(initQueryTableDataCondWithColArray(&pInfo->base.cond, &pInfo->base.orgCond, &pInfo->base.readHandle, pColArray), lino, _return); + code = createOneDataBlockWithColArray(pInfo->pOrgBlock, pBlockColArray, &pInfo->pResBlock); + QUERY_CHECK_CODE(code, lino, _return); + code = initQueryTableDataCondWithColArray(&pInfo->base.cond, &pInfo->base.orgCond, &pInfo->base.readHandle, pColArray); + QUERY_CHECK_CODE(code, lino, _return); pInfo->base.cond.twindows.skey = pParam->window.ekey + 1; pInfo->base.cond.suid = orgTable.me.type == TSDB_CHILD_TABLE ? superTable.me.uid : 0; pInfo->currentGroupId = 0; @@ -1304,7 +1308,8 @@ static int32_t createVTableScanInfoFromParam(SOperatorInfo* pOperator) { uint64_t pUid = orgTable.me.uid; STableKeyInfo info = {.groupId = 0, .uid = pUid}; int32_t tableIdx = 0; - QUERY_CHECK_CODE(taosHashPut(pListInfo->map, &pUid, sizeof(uint64_t), &tableIdx, sizeof(int32_t)), lino, _return); + code = taosHashPut(pListInfo->map, &pUid, sizeof(uint64_t), &tableIdx, sizeof(int32_t)); + QUERY_CHECK_CODE(code, lino, _return); QUERY_CHECK_NULL(taosArrayPush(pListInfo->pTableList, &info), code, lino, _return, terrno); qDebug("add dynamic table scan uid:%" PRIu64 ", %s", info.uid, GET_TASKID(pTaskInfo)); @@ -1470,12 +1475,14 @@ int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SSDataBlock* result = NULL; while (true) { - QUERY_CHECK_CODE(startNextGroupScan(pOperator, &result), lino, _end); + code = startNextGroupScan(pOperator, &result); + QUERY_CHECK_CODE(code, lino, _end); if (result || pOperator->status == OP_EXEC_DONE) { SSDataBlock* res = NULL; if (result) { - QUERY_CHECK_CODE(createOneDataBlockWithTwoBlock(result, pInfo->pOrgBlock, &res), lino, _end); + code = createOneDataBlockWithTwoBlock(result, pInfo->pOrgBlock, &res); + QUERY_CHECK_CODE(code, lino, _end); pInfo->pResBlock = res; blockDataDestroy(result); } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 
0a80ece8f8..5344168ceb 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -773,6 +773,7 @@ static int32_t logicDynQueryCtrlCopy(const SDynQueryCtrlLogicNode* pSrc, SDynQue COPY_OBJECT_FIELD(stbJoin.srcScan, sizeof(pDst->stbJoin.srcScan)); COPY_SCALAR_FIELD(vtbScan.scanAllCols); COPY_SCALAR_FIELD(vtbScan.suid); + COPY_CHAR_ARRAY_FIELD(vtbScan.dbName); CLONE_OBJECT_FIELD(vtbScan.pVgroupList, vgroupsInfoClone); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 2dbf3025da..276dfbe525 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -4386,6 +4386,7 @@ enum { PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_SRC_SCAN1, PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_SCAN_ALL_COLS, PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_SUID, + PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_DBNAME, PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_ACCOUNT_ID, PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_EP_SET, PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_SCAN_COLS, @@ -4427,6 +4428,9 @@ static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncode if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeU64(pEncoder, PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_SUID, pNode->vtbScan.suid); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_DBNAME, pNode->vtbScan.dbName); + } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeI32(pEncoder, PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_ACCOUNT_ID, pNode->vtbScan.accountId); } @@ -4485,6 +4489,9 @@ static int32_t msgToPhysiDynQueryCtrlNode(STlvDecoder* pDecoder, void* pObj) { case PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_SUID: code = tlvDecodeU64(pTlv, &pNode->vtbScan.suid); break; + case PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_DBNAME: + code = tlvDecodeCStr(pTlv, pNode->vtbScan.dbName, sizeof(pNode->vtbScan.dbName)); + break; case PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_ACCOUNT_ID: code = tlvDecodeI32(pTlv, &pNode->vtbScan.accountId); break; diff 
--git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 970751d827..81129a6f8f 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1206,6 +1206,10 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMet *pMissCache = true; } else if (bUsingTable && TSDB_SUPER_TABLE != (*pTableMeta)->tableType) { code = buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed"); + } else if (((*pTableMeta)->virtualStb) || + TSDB_VIRTUAL_CHILD_TABLE == (*pTableMeta)->tableType || + TSDB_VIRTUAL_NORMAL_TABLE == (*pTableMeta)->tableType) { + code = TSDB_CODE_VTABLE_NOT_SUPPORT_STMT; } } return code; @@ -1341,6 +1345,8 @@ static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt code = getTableMeta(pCxt, &pStmt->usingTableName, &pStableMeta, &pCxt->missCache, bUsingTable); if (TSDB_CODE_SUCCESS == code) { code = taosHashPut(pStmt->pSuperTableHashObj, tbFName, strlen(tbFName), &pStableMeta, POINTER_BYTES); + } else { + taosMemoryFreeClear(pStableMeta); } } } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f16760c6b1..1b8f0e034c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5479,6 +5479,15 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable, bool inJoin) { code = TSDB_CODE_TSC_INVALID_OPERATION; break; } + + if (pCxt->pParseCxt->isStmtBind) { + code = TSDB_CODE_VTABLE_NOT_SUPPORT_STMT; + break; + } + if (pCxt->pParseCxt->topicQuery) { + code = TSDB_CODE_VTABLE_NOT_SUPPORT_TOPIC; + break; + } PAR_ERR_RET(translateVirtualTable(pCxt, pTable, &name)); SVirtualTableNode *pVirtualTable = (SVirtualTableNode*)*pTable; pVirtualTable->table.singleTable = true; @@ -8566,7 +8575,7 @@ static int32_t translateInsertTable(STranslateContext* pCxt, SNode** pTable) { int32_t code = translateFrom(pCxt, pTable); if 
(TSDB_CODE_SUCCESS == code && TSDB_CHILD_TABLE != ((SRealTableNode*)*pTable)->pMeta->tableType && TSDB_NORMAL_TABLE != ((SRealTableNode*)*pTable)->pMeta->tableType) { - code = buildInvalidOperationMsg(&pCxt->msgBuf, "insert data into super table is not supported"); + code = buildInvalidOperationMsg(&pCxt->msgBuf, "insert data into super table or virtual table is not supported"); } return code; } @@ -11734,6 +11743,14 @@ static int32_t buildQueryForTableTopic(STranslateContext* pCxt, SCreateTopicStmt return code; } +static bool isVirtualTable(int8_t tableType) { + if (tableType == TSDB_VIRTUAL_CHILD_TABLE || tableType == TSDB_VIRTUAL_NORMAL_TABLE) { + return true; + } else { + return false; + } +} + static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) { if (NULL == pStmt->pQuery && NULL == pStmt->pWhere) { return TSDB_CODE_SUCCESS; @@ -12010,16 +12027,6 @@ static bool crossTableWithUdaf(SSelectStmt* pSelect) { !hasTbnameFunction(pSelect->pPartitionByList); } - -static bool isVirtualTable(int8_t tableType) { - if (tableType == TSDB_VIRTUAL_CHILD_TABLE || tableType == TSDB_VIRTUAL_NORMAL_TABLE) { - return true; - } else { - return false; - } -} - - static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { if (NULL == pStmt->pQuery) { return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index a5f32d7159..69c6f03e99 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1023,6 +1023,7 @@ static int32_t createVirtualSuperTableLogicNode(SLogicPlanContext* pCxt, SSelect pDynCtrl->qType = DYN_QTYPE_VTB_SCAN; pDynCtrl->vtbScan.scanAllCols = pVtableScan->scanAllCols; pDynCtrl->vtbScan.suid = pVtableScan->stableId; + tstrncpy(pDynCtrl->vtbScan.dbName, pVtableScan->tableName.dbname, TSDB_DB_NAME_LEN); PLAN_ERR_JRET(nodesListMakeStrictAppend(&pDynCtrl->node.pChildren, (SNode*)pVtableScan)); 
PLAN_ERR_JRET(nodesCloneList(pVtableScan->node.pTargets, &pDynCtrl->node.pTargets)); TSWAP(pVtableScan->pVgroupList, pDynCtrl->vtbScan.pVgroupList); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 4901b916f3..46442f84f1 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -7861,6 +7861,9 @@ static int32_t findDepTableScanNode(SColumnNode* pCol, SVirtualScanLogicNode *pV FOREACH(pScanCol, pScanNode->pScanCols) { if (QUERY_NODE_COLUMN == nodeType(pScanCol)) { SColumnNode *pScanColNode = (SColumnNode *)pScanCol; + if (pScanColNode->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + continue; + } if (pScanColNode->hasDep && pCol->hasRef) { if (strcmp(pScanColNode->dbName, pCol->refDbName) == 0 && strcmp(pScanColNode->tableAlias, pCol->refTableName) == 0 && diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 528d88c604..3b19b8d2be 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1919,6 +1919,7 @@ static int32_t updateDynQueryCtrlVtbScanInfo(SPhysiPlanContext* pCxt, SNodeList* pDynCtrl->vtbScan.suid = pLogicNode->vtbScan.suid; pDynCtrl->vtbScan.mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet; pDynCtrl->vtbScan.accountId = pCxt->pPlanCxt->acctId; + tstrncpy(pDynCtrl->vtbScan.dbName, pLogicNode->vtbScan.dbName, TSDB_DB_NAME_LEN); return code; _return: diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 3b6766fc27..fd88a53b23 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -910,6 +910,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM, "Virtual table scan TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_PRIMTS_HAS_REF, "Virtual table prim timestamp column should not has ref column") TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_NOT_VIRTUAL_SUPER_TABLE, "Create virtual child table must use virtual super table") 
TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_NOT_SUPPORT_DATA_TYPE, "Virtual table not support decimal type") +TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_NOT_SUPPORT_STMT, "Virtual table not support in STMT query and STMT insert") +TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_NOT_SUPPORT_TOPIC, "Virtual table not support in topic") +TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_NOT_SUPPORT_CROSS_DB, "Virtual super table query not support origin table from different databases") #ifdef TAOS_ERROR_C }; #endif diff --git a/tests/system-test/1-insert/stmt_error.py b/tests/system-test/1-insert/stmt_error.py index 0bfbedb9a1..ce243edd72 100644 --- a/tests/system-test/1-insert/stmt_error.py +++ b/tests/system-test/1-insert/stmt_error.py @@ -154,6 +154,129 @@ class TDTestCase: conn.close() raise err + def test_stmt_insert_vtb_error(self,conn): + # type: (TaosConnection) -> None + + dbname = "pytest_taos_stmt_vtb_error" + try: + conn.execute("drop database if exists %s" % dbname) + conn.execute("create database if not exists %s" % dbname) + conn.select_db(dbname) + + conn.execute( + "create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\ + bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \ + ff float, dd double, bb binary(65059), nn nchar(100), tt timestamp)", + ) + + conn.execute( + "create vtable if not exists log_v(ts timestamp, bo bool from pytest_taos_stmt_vtb_error.log.bo, " + "nil tinyint, ti tinyint, si smallint, ii int,\ + bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \ + ff float, dd double, bb binary(65059), nn nchar(100), tt timestamp)", + ) + conn.load_table_info("log_v") + + + stmt = conn.statement("insert into log_v values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + params = new_bind_params(16) + params[0].timestamp(1626861392589, PrecisionEnum.Milliseconds) + params[1].bool(True) + params[2].tinyint(None) + params[3].tinyint(2) + params[4].smallint(3) + params[5].int(4) + 
params[6].bigint(5) + params[7].tinyint_unsigned(6) + params[8].smallint_unsigned(7) + params[9].int_unsigned(8) + params[10].bigint_unsigned(9) + params[11].float(10.1) + params[12].double(10.11) + binaryStr = '123456789' + for i in range(1301): + binaryStr += "1234567890abcdefghij1234567890abcdefghij12345hello" + params[13].binary(binaryStr) + params[14].nchar("stmt") + params[15].timestamp(1626861392589, PrecisionEnum.Milliseconds) + + stmt.bind_param(params) + stmt.execute() + + assert stmt.affected_rows == 1 + stmt.close() + + querystmt=conn.statement("select ?, bo, nil, ti, si, ii,bi, tu, su, iu, bu, ff, dd, bb, nn, tt from log") + queryparam=new_bind_params(1) + print(type(queryparam)) + queryparam[0].binary("ts") + querystmt.bind_param(queryparam) + querystmt.execute() + result=querystmt.use_result() + + row=result.fetch_all() + print(row) + + assert row[0][1] == True + assert row[0][2] == None + for i in range(3, 10): + assert row[0][i] == i - 1 + #float == may not work as expected + # assert row[0][11] == c_float(10.1) + assert row[0][12] == 10.11 + assert row[0][13][65054:] == "hello" + assert row[0][14] == "stmt" + + conn.execute("drop database if exists %s" % dbname) + conn.close() + + except Exception as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + raise err + + def test_stmt_insert_vstb_error(self,conn): + + dbname = "pytest_taos_stmt_vstb_error" + try: + conn.execute("drop database if exists %s" % dbname) + conn.execute("create database if not exists %s" % dbname) + conn.execute("alter database %s keep 36500" % dbname) + conn.select_db(dbname) + + conn.execute("create stable STB_v(ts timestamp, n int) tags(b int) virtual 1") + + stmt = conn.statement("insert into ? using STB_v tags(?) 
values(?, ?)") + params = new_bind_params(1) + params[0].int(4); + stmt.set_tbname_tags("ct", params); + + multi_params = new_multi_binds(2); + multi_params[0].timestamp([9223372036854775808]) + multi_params[1].int([123]) + stmt.bind_param_batch(multi_params) + + stmt.execute() + result = stmt.use_result() + + result.close() + stmt.close() + + stmt = conn.statement("select * from STB") + stmt.execute() + result = stmt.use_result() + print(result.affected_rows) + row = result.next() + print(row) + + result.close() + stmt.close() + conn.close() + + except Exception as err: + conn.close() + raise err + def test_stmt_insert_error_null_timestamp(self,conn): dbname = "pytest_taos_stmt_error_null_ts" @@ -270,6 +393,24 @@ class TDTestCase: tdLog.info('=========stmt error occured for bind part column(NULL Timestamp) ==============') else: tdLog.exit("expect error(%s) not occured - 2" % str(error)) + + try: + self.test_stmt_insert_vtb_error(self.conn()) + except Exception as error : + + if str(error)=='[0x6205]: Virtual table not support in STMT query and STMT insert': + tdLog.info('=========stmt error occured for bind part column ==============') + else: + tdLog.exit("expect error(%s) not occured" % str(error)) + + try: + self.test_stmt_insert_vstb_error(self.conn()) + except Exception as error : + + if str(error)=='[0x6205]: Virtual table not support in STMT query and STMT insert': + tdLog.info('=========stmt error occured for bind part column ==============') + else: + tdLog.exit("expect error(%s) not occured" % str(error)) def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__)