Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

This commit is contained in:
gccgdb1234 2022-08-11 11:41:23 +08:00
commit 322b8c3006
59 changed files with 717 additions and 4545 deletions

View File

@ -309,6 +309,7 @@ def pre_test_build_win() {
'''
bat '''
cd %WIN_CONNECTOR_ROOT%
python.exe -m pip install --upgrade pip
python -m pip install .
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
'''
@ -327,6 +328,7 @@ def run_win_test() {
bat '''
echo "windows test ..."
cd %WIN_CONNECTOR_ROOT%
python.exe -m pip install --upgrade pip
python -m pip install .
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
ls -l C:\\Windows\\System32\\taos.dll

View File

@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG ed6a160
GIT_TAG 3d21433
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 2d68404
GIT_TAG 57bdfbf
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taosws-rs
ExternalProject_Add(taosws-rs
GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git
GIT_TAG 97c4bac
GIT_TAG 7a54d21
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -47,27 +47,28 @@ If the displayed content is followed by `...` you can use this command to change
You can change the behavior of TDengine CLI by specifying command-line parameters. The following parameters are commonly used.
- -h, --host=HOST: FQDN of the server where the TDengine server is to be connected. Default is to connect to the local service
- -P, --port=PORT: Specify the port number to be used by the server. Default is `6030`
- -u, --user=USER: the user name to use when connecting. Default is `root`
- -p, --password=PASSWORD: the password to use when connecting to the server. Default is `taosdata`
- -h HOST: FQDN of the server where the TDengine server is to be connected. Default is to connect to the local service
- -P PORT: Specify the port number to be used by the server. Default is `6030`
- -u USER: the user name to use when connecting. Default is `root`
- -p PASSWORD: the password to use when connecting to the server. Default is `taosdata`
- -?, --help: print out all command-line arguments
And many more parameters.
- -c, --config-dir: Specify the directory where configuration file exists. The default is `/etc/taos`, and the default name of the configuration file in this directory is `taos.cfg`
- -C, --dump-config: Print the configuration parameters of `taos.cfg` in the default directory or specified by -c
- -d, --database=DATABASE: Specify the database to use when connecting to the server
- -D, --directory=DIRECTORY: Import the SQL script file in the specified path
- -f, --file=FILE: Execute the SQL script file in non-interactive mode
- -k, --check=CHECK: Specify the table to be checked
- -l, --pktlen=PKTLEN: Test package size to be used for network testing
- -n, --netrole=NETROLE: test scope for network connection test, default is `startup`. The value can be `client`, `server`, `rpc`, `startup`, `sync`, `speed`, or `fqdn`.
- -r, --raw-time: output the timestamp format as unsigned 64-bits integer (uint64_t in C language)
- -s, --commands=COMMAND: execute SQL commands in non-interactive mode
- -S, --pkttype=PKTTYPE: Specify the packet type used for network testing. The default is TCP, can be specified as either TCP or UDP when `speed` is specified to `netrole` parameter
- -T, --thread=THREADNUM: The number of threads to import data in multi-threaded mode
- -s, --commands: Run TDengine CLI commands without entering the terminal
- -a AUTHSTR: The auth string to use when connecting to the server
- -A: Generate auth string from password
- -c CONFIGDIR: Specify the directory where configuration file exists. The default is `/etc/taos`, and the default name of the configuration file in this directory is `taos.cfg`
- -C: Print the configuration parameters of `taos.cfg` in the default directory or specified by -c
- -d DATABASE: Specify the database to use when connecting to the server
- -f FILE: Execute the SQL script file in non-interactive mode
- -k: Check the service status, 0: unavailable1: network ok2: service ok3: service degraded4: exiting
- -l PKTLEN: Test package length to be used for network testing
- -n NETROLE: test scope for network connection test, default is `client`. The value can be `client`, `server`
- -N PKTNUM: Test package numbers to be used for network testing
- -r: output the timestamp format as unsigned 64-bits integer (uint64_t in C language)
- -s COMMAND: execute SQL commands in non-interactive mode
- -t: Check the details of the service statusstatus same as -k
- -w DISPLAYWIDTH: 客户端列显示宽度
- -z, --timezone=TIMEZONE: Specify time zone. Default is the value of current configuration file
- -V, --version: Print out the current version number

View File

@ -5,12 +5,11 @@ description: "List of platforms supported by TDengine server, client, and connec
## List of supported platforms for TDengine server
| | **CentOS 7/8** | **Ubuntu 16/18/20** | **Other Linux** |
| ------------ | -------------- | ------------------- | --------------- |
| X64 | ● | ● | |
| MIPS64 | | | ● |
| ARM64 | | ○ | ○ |
| Alpha64 | | | ○ |
| | **Windows 10/11** | **CentOS 7.9/8** | **Ubuntu 18/20** | **Other Linux** | **UOS** | **Kylin** | **Ningsi V60/V80** | **HUAWEI EulerOS** |
| ------------------ | ----------------- | ---------------- | ---------------- | --------------- | ------- | --------- | ------------------ | ------------------ |
| X64 | ● | ● | ● | | ● | ● | ● | |
| Raspberry Pi ARM64 | | | | ● | | | | |
| HUAWEI cloud ARM64 | | | | | | | | ● |
Note: ● means officially tested and verified, ○ means unofficially tested and verified.

View File

@ -25,7 +25,6 @@ All executable files of TDengine are in the _/usr/local/taos/bin_ directory by d
- _taosBenchmark_: TDengine testing tool
- _remove.sh_: script to uninstall TDengine, please execute it carefully, link to the **rmtaos** command in the /usr/bin directory. Will remove the TDengine installation directory `/usr/local/taos`, but will keep `/etc/taos`, `/var/lib/taos`, `/var/log/taos`
- _taosadapter_: server-side executable that provides RESTful services and accepts writing requests from a variety of other softwares
- _tarbitrator_: provides arbitration for two-node cluster deployments
- _TDinsight.sh_: script to download TDinsight and install it
- _set_core.sh_: script for setting up the system to generate core dump files for easy debugging
- _taosd-dump-cfg.gdb_: script to facilitate debugging of taosd's gdb execution.

View File

@ -2,6 +2,9 @@
sidebar_label: Docker
title: 通过 Docker 快速体验 TDengine
---
:::info
如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
:::
本节首先介绍如何通过 Docker 快速体验 TDengine然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。

View File

@ -10,14 +10,14 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
目前 TDengine 的原生接口连接器可支持的平台包括X64/X86/ARM64/ARM32/MIPS/Alpha 等硬件平台,以及 Linux/Win64/Win32 等开发环境。对照矩阵如下:
| **CPU** | **OS** | **JDBC** | **Python** | **Go** | **Node.js** | **C#** | **Rust** | C/C++ |
| **CPU** | **OS** | **Java** | **Python** | **Go** | **Node.js** | **C#** | **Rust** | C/C++ |
| -------------- | --------- | -------- | ---------- | ------ | ----------- | ------ | -------- | ----- |
| **X86 64bit** | **Linux** | ● | ● | ● | ● | ● | ● | ● |
| **X86 64bit** | **Win64** | ● | ● | ● | ● | ● | ● | ● |
| **X86 64bit** | **Win32** | ● | ● | ● | ● | ○ | ○ | ● |
| **X86 32bit** | **Win32** | ○ | ○ | ○ | ○ | ○ | ○ | ● |
| **ARM64** | **Linux** | ● | ● | ● | ● | ○ | ○ | ● |
| **ARM32** | **Linux** | ● | ● | ● | ● | ○ | ○ | ● |
| **ARM32** | **Linux** | ○ | ○ | ○ | ○ | ○ | ○ | ● |
| **MIPS 龙芯** | **Linux** | ○ | ○ | ○ | ○ | ○ | ○ | ○ |
| **Alpha 申威** | **Linux** | ○ | ○ | -- | -- | -- | -- | ○ |
| **X86 海光** | **Linux** | ○ | ○ | ○ | -- | -- | -- | ○ |
@ -32,6 +32,7 @@ TDengine 版本更新往往会增加新的功能特性,列表中的连接器
| **TDengine 版本** | **Java** | **Python** | **Go** | **C#** | **Node.js** | **Rust** |
| --------------------- | -------- | ---------- | ------------ | ------------- | --------------- | -------- |
| **3.0.0.0 及以上** | 3.0.0 | 当前版本 | 3.0 分支 | 3.0.0 | 3.0.0 | 当前版本 |
| **2.4.0.14 及以上** | 2.0.38 | 当前版本 | develop 分支 | 1.0.2 - 1.0.6 | 2.0.10 - 2.0.12 | 当前版本 |
| **2.4.0.6 及以上** | 2.0.37 | 当前版本 | develop 分支 | 1.0.2 - 1.0.6 | 2.0.10 - 2.0.12 | 当前版本 |
| **2.4.0.4 - 2.4.0.5** | 2.0.37 | 当前版本 | develop 分支 | 1.0.2 - 1.0.6 | 2.0.10 - 2.0.12 | 当前版本 |
@ -48,9 +49,8 @@ TDengine 版本更新往往会增加新的功能特性,列表中的连接器
| -------------- | -------- | ---------- | ------ | ------ | ----------- | -------- |
| **连接管理** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **普通查询** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **连续查询** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **参数绑定** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **订阅功能** | 支持 | 支持 | 支持 | 支持 | 支持 | 暂不支持 |
| ** TMQ ** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **Schemaless** | 支持 | 支持 | 支持 | 支持 | 支持 | 支持 |
| **DataFrame** | 不支持 | 支持 | 不支持 | 不支持 | 不支持 | 不支持 |
@ -58,17 +58,17 @@ TDengine 版本更新往往会增加新的功能特性,列表中的连接器
由于不同编程语言数据库框架规范不同,并不意味着所有 C/C++ 接口都需要对应封装支持。
:::
### 使用 REST 接口
### 使用 http (REST 或 WebSocket) 接口
| **功能特性** | **Java** | **Python** | **Go** | **C#(暂不支持)** | **Node.js** | **Rust** |
| ------------------------------ | -------- | ---------- | -------- | ------------------ | ----------- | -------- |
| **连接管理** | 支持 | 支持 | 支持 | N/A | 支持 | 支持 |
| **普通查询** | 支持 | 支持 | 支持 | N/A | 支持 | 支持 |
| **连续查询** | 支持 | 支持 | 支持 | N/A | 支持 | 支持 |
| **参数绑定** | 不支持 | 不支持 | 不支持 | N/A | 不支持 | 支持 |
| **订阅功能** | 不支持 | 不支持 | 不支持 | N/A | 不支持 | 不支持 |
| **Schemaless** | 暂不支持 | 暂不支持 | 暂不支持 | N/A | 不支持 | 暂不支持 |
| **批量拉取(基于 WebSocket** | 支持 | 暂不支持 | 暂不支持 | N/A | 不支持 | 暂不支持 |
| **参数绑定** | 不支持 | 不支持 | 不支持 | N/A | 不支持 | 支持 |
| ** TMQ ** | 不支持 | 暂不支持 | 暂不支持 | N/A | 不支持 | 支持 |
| **Schemaless** | 暂不支持 | 暂不支持 | 暂不支持 | N/A | 不支持 | 暂不支持 |
| **批量拉取(基于 WebSocket** | 支持 | 支持 | 暂不支持 | N/A | 不支持 | 支持 |
| **DataFrame** | 不支持 | 支持 | 不支持 | N/A | 不支持 | 不支持 |
:::warning

View File

@ -48,29 +48,30 @@ taos> SET MAX_BINARY_DISPLAY_WIDTH <nn>;
您可通过配置命令行参数来改变 TDengine CLI 的行为。以下为常用的几个命令行参数:
- -h, --host=HOST: 要连接的 TDengine 服务端所在服务器的 FQDN, 默认为连接本地服务
- -P, --port=PORT: 指定服务端所用端口号
- -u, --user=USER: 连接时使用的用户名
- -p, --password=PASSWORD: 连接服务端时使用的密码
- -h HOST: 要连接的 TDengine 服务端所在服务器的 FQDN, 默认为连接本地服务
- -P PORT: 指定服务端所用端口号
- -u USER: 连接时使用的用户名
- -p PASSWORD: 连接服务端时使用的密码
- -?, --help: 打印出所有命令行参数
还有更多其他参数:
- -c, --config-dir: 指定配置文件目录Linux 环境下默认为 `/etc/taos`,该目录下的配置文件默认名称为 `taos.cfg`
- -C, --dump-config: 打印 -c 指定的目录中 `taos.cfg` 的配置参数
- -d, --database=DATABASE: 指定连接到服务端时使用的数据库
- -D, --directory=DIRECTORY: 导入指定路径中的 SQL 脚本文件
- -f, --file=FILE: 以非交互模式执行 SQL 脚本文件。文件中一个 SQL 语句只能占一行
- -k, --check=CHECK: 指定要检查的表
- -l, --pktlen=PKTLEN: 网络测试时使用的测试包大小
- -n, --netrole=NETROLE: 网络连接测试时的测试范围,默认为 `startup`, 可选值为 `client`、`server`、`rpc`、`startup`、`sync`、`speed` 和 `fqdn` 之一
- -r, --raw-time: 将时间输出出无符号 64 位整数类型(即 C 语音中 uint64_t)
- -s, --commands=COMMAND: 以非交互模式执行的 SQL 命令
- -S, --pkttype=PKTTYPE: 指定网络测试所用的包类型,默认为 TCP。只有 netrole 为 `speed` 时既可以指定为 TCP 也可以指定为 UDP
- -T, --thread=THREADNUM: 以多线程模式导入数据时的线程数
- -s, --commands: 在不进入终端的情况下运行 TDengine 命令
- -z, --timezone=TIMEZONE: 指定时区,默认为本地时区
- -V, --version: 打印出当前版本号
- -a AUTHSTR: 连接服务端的授权信息
- -A: 通过用户名和密码计算授权信息
- -c CONFIGDIR: 指定配置文件目录Linux 环境下默认为 `/etc/taos`,该目录下的配置文件默认名称为 `taos.cfg`
- -C: 打印 -c 指定的目录中 `taos.cfg` 的配置参数
- -d DATABASE: 指定连接到服务端时使用的数据库
- -f FILE: 以非交互模式执行 SQL 脚本文件。文件中一个 SQL 语句只能占一行
- -k: 测试服务端运行状态0: unavailable1: network ok2: service ok3: service degraded4: exiting
- -l PKTLEN: 网络测试时使用的测试包大小
- -n NETROLE: 网络连接测试时的测试范围,默认为 `client`, 可选值为 `client`、`server`
- -N PKTNUM: 网络测试时使用的测试包数量
- -r: 将时间输出出无符号 64 位整数类型(即 C 语音中 uint64_t)
- -s COMMAND: 以非交互模式执行的 SQL 命令
- -t: 测试服务端启动状态,状态同-k
- -w DISPLAYWIDTH: 客户端列显示宽度
- -z TIMEZONE: 指定时区,默认为本地时区
- -V: 打印出当前版本号
示例:

View File

@ -5,18 +5,11 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表"
## TDengine 服务端支持的平台列表
| | **CentOS 7/8** | **Ubuntu 16/18/20** | **Other Linux** | **统信 UOS** | **银河/中标麒麟** | **凝思 V60/V80** | **华为 EulerOS** |
| ------------ | -------------- | ------------------- | --------------- | ------------ | ----------------- | ---------------- | ---------------- |
| X64 | ● | ● | | ○ | ● | ● | ● |
| 龙芯 MIPS64 | | | ● | | | | |
| 鲲鹏 ARM64 | | ○ | ○ | | ● | | |
| 申威 Alpha64 | | | ○ | ● | | | |
| 飞腾 ARM64 | | ○ 优麒麟 | | | | | |
| 海光 X64 | ● | ● | ● | ○ | ● | ● | |
| 瑞芯微 ARM64 | | | ○ | | | | |
| 全志 ARM64 | | | ○ | | | | |
| 炬力 ARM64 | | | ○ | | | | |
| 华为云 ARM64 | | | | | | | ● |
| | **Windows 10/11** | **CentOS 7.9/8** | **Ubuntu 18/20** | **Other Linux** | **统信 UOS** | **银河/中标麒麟** | **凝思 V60/V80** | **华为 EulerOS** |
| ------------ | ----------------- | ---------------- | ---------------- | --------------- | ------------ | ----------------- | ---------------- | ---------------- |
| X64 | ● | ● | ● | | ● | ● | ● | |
| 树莓派 ARM64 | | | | ● | | | | |
| 华为云 ARM64 | | | | | | | | ● |
注: ● 表示经过官方测试验证, ○ 表示非官方测试验证。

View File

@ -25,7 +25,6 @@ TDengine 的所有可执行文件默认存放在 _/usr/local/taos/bin_ 目录下
- _taosBenchmark_TDengine 测试工具
- _remove.sh_:卸载 TDengine 的脚本,请谨慎执行,链接到/usr/bin 目录下的**rmtaos**命令。会删除 TDengine 的安装目录/usr/local/taos但会保留/etc/taos、/var/lib/taos、/var/log/taos
- _taosadapter_: 提供 RESTful 服务和接受其他多种软件写入请求的服务端可执行文件
- _tarbitrator_: 提供双节点集群部署的仲裁功能
- _TDinsight.sh_:用于下载 TDinsight 并安装的脚本
- _set_core.sh_用于方便调试设置系统生成 core dump 文件的脚本
- _taosd-dump-cfg.gdb_:用于方便调试 taosd 的 gdb 执行脚本。

@ -1 +0,0 @@
Subproject commit 7ed7a97715388fa144718764d6bf20f9bfc29a12

View File

@ -196,15 +196,6 @@ DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, vo
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
// Shuduo: temporary enable for app build
#if 1
typedef void (*__taos_sub_fn_t)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, __taos_sub_fn_t fp,
void *param, int interval);
DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
#endif
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
@ -281,10 +272,6 @@ DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
/* ------------------------------ TMQ END -------------------------------- */
#if 1 // Shuduo: temporary enable for app build
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
#endif
typedef enum {
TSDB_SRV_STATUS_UNAVAILABLE = 0,
TSDB_SRV_STATUS_NETWORK_OK = 1,

View File

@ -200,8 +200,6 @@ struct STag {
#if 1 //================================================================================================================================================
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
#define TD_SUPPORT_BITMAP
#define TD_SUPPORT_READ2
#define TD_SUPPORT_BACK2 // suppport back compatibility of 2.0
#define TASSERT(x) ASSERT(x)

View File

@ -1370,7 +1370,7 @@ typedef struct {
int64_t skey;
int64_t ekey;
int64_t version; // for stream
TSKEY watermark;// for stream
TSKEY watermark; // for stream
char data[];
} SRetrieveTableRsp;
@ -3080,6 +3080,22 @@ typedef struct SDeleteRes {
int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
typedef struct {
int64_t uid;
int64_t ts;
} SSingleDeleteReq;
int32_t tEncodeSSingleDeleteReq(SEncoder* pCoder, const SSingleDeleteReq* pReq);
int32_t tDecodeSSingleDeleteReq(SDecoder* pCoder, SSingleDeleteReq* pReq);
typedef struct {
int64_t suid;
SArray* deleteReqs; // SArray<SSingleDeleteReq>
} SBatchDeleteReq;
int32_t tEncodeSBatchDeleteReq(SEncoder* pCoder, const SBatchDeleteReq* pReq);
int32_t tDecodeSBatchDeleteReq(SDecoder* pCoder, SBatchDeleteReq* pReq);
typedef struct {
int32_t msgIdx;
int32_t msgType;

View File

@ -202,6 +202,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIRM, "alter-confirm", NULL, NULL)

View File

@ -319,17 +319,13 @@ typedef struct {
col_id_t kvIdx; // [0, nKvCols)
} STSRowIter;
void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow);
void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema);
void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow);
bool tdSTSRowIterFetch(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal);
bool tdSTSRowIterNext(STSRowIter *pIter, SCellVal *pVal);
int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow);
bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal);
bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colType, int32_t offset, SCellVal *pVal);
bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx, SCellVal *pVal);
bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal);
bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset, col_id_t colIdx,
SCellVal *pVal);
bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, col_id_t colIdx, SCellVal *pVal);
void tdSCellValPrint(SCellVal *pVal, int8_t colType);
void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag);
#ifdef __cplusplus

View File

@ -54,10 +54,6 @@ typedef struct SFuncExecFuncs {
FExecCombine combine;
} SFuncExecFuncs;
typedef struct SFileBlockInfo {
int32_t numBlocksOfStep;
} SFileBlockInfo;
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
#define TOP_BOTTOM_QUERY_LIMIT 100
@ -171,8 +167,6 @@ typedef struct tExprNode {
};
} tExprNode;
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *));
struct SScalarParam {
bool colAlloced;
SColumnInfoData *columnData;
@ -182,10 +176,6 @@ struct SScalarParam {
int32_t numOfRows;
};
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
bool isSuperTable);
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock);
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry);

View File

@ -284,11 +284,13 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EA)
#define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB)
#define TSDB_CODE_MND_CGROUP_USED TAOS_DEF_ERROR_CODE(0, 0x03EC)
#define TSDB_CODE_MND_TOPIC_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03ED)
// mnode-stream
#define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0)
#define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1)
#define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2)
#define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3)
// mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)

View File

@ -611,65 +611,6 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionIm
}
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNIEnv *env, jobject jobj, jlong con,
jboolean restart, jstring jtopic,
jstring jsql, jint jinterval) {
jlong sub = 0;
TAOS *taos = (TAOS *)con;
char *topic = NULL;
char *sql = NULL;
jniGetGlobalMethod(env);
jniDebug("jobj:%p, in TSDBJNIConnector_subscribeImp", jobj);
if (jtopic != NULL) {
topic = (char *)(*env)->GetStringUTFChars(env, jtopic, NULL);
}
if (jsql != NULL) {
sql = (char *)(*env)->GetStringUTFChars(env, jsql, NULL);
}
if (topic == NULL || sql == NULL) {
jniDebug("jobj:%p, invalid argument: topic or sql is NULL", jobj);
return sub;
}
TAOS_SUB *tsub = taos_subscribe(taos, (int)restart, topic, sql, NULL, NULL, jinterval);
sub = (jlong)tsub;
if (sub == 0) {
jniDebug("jobj:%p, failed to subscribe: topic:%s", jobj, topic);
} else {
jniDebug("jobj:%p, successfully subscribe: topic: %s", jobj, topic);
}
(*env)->ReleaseStringUTFChars(env, jtopic, topic);
(*env)->ReleaseStringUTFChars(env, jsql, sql);
return sub;
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub) {
jniDebug("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%lld", jobj, sub);
jniGetGlobalMethod(env);
TAOS_SUB *tsub = (TAOS_SUB *)sub;
TAOS_RES *res = taos_consume(tsub);
if (res == NULL) {
jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub);
return 0l;
}
return (jlong)res;
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp(JNIEnv *env, jobject jobj, jlong sub,
jboolean keepProgress) {
TAOS_SUB *tsub = (TAOS_SUB *)sub;
taos_unsubscribe(tsub, keepProgress);
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_validateCreateTableSqlImp(JNIEnv *env, jobject jobj,
jlong con, jbyteArray jsql) {
TAOS *tscon = (TAOS *)con;

View File

@ -939,21 +939,6 @@ const void *taos_get_raw_block(TAOS_RES *res) {
return pRequest->body.resInfo.pData;
}
TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp,
void *param, int interval) {
// TODO
return NULL;
}
TAOS_RES *taos_consume(TAOS_SUB *tsub) {
// TODO
return NULL;
}
void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
// TODO
}
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;

View File

@ -204,7 +204,7 @@ typedef struct {
typedef struct {
SMqCommitCbParamSet* params;
STqOffset* pOffset;
} SMqCommitCbParam2;
} SMqCommitCbParam;
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
@ -212,6 +212,7 @@ tmq_conf_t* tmq_conf_new() {
conf->autoCommit = true;
conf->autoCommitInterval = 5000;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
conf->hbBgEnable = true;
return conf;
}
@ -368,8 +369,8 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
return sprintf(dst, "%s:%d", topicName, vg);
}
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param;
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
// push into array
#if 0
@ -440,7 +441,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
tEncodeSTqOffset(&encoder, pOffset);
// build param
SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
pParam->params = pParamSet;
pParam->pOffset = pOffset;
@ -465,7 +466,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->fp = tmqCommitCb2;
pMsgSendInfo->fp = tmqCommitCb;
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
// send msg
@ -1313,17 +1314,6 @@ int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
#endif
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
/*int64_t reqOffset;*/
/*if (pVg->currentOffset >= 0) {*/
/*reqOffset = pVg->currentOffset;*/
/*} else {*/
/*if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {*/
/*tscError("unable to poll since no committed offset but reset offset is set to none");*/
/*return NULL;*/
/*}*/
/*reqOffset = tmq->resetOffsetCfg;*/
/*}*/
SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
if (pReq == NULL) {
return NULL;

View File

@ -5770,3 +5770,40 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
}
return 0;
}
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
if (tEncodeI64(pEncoder, pReq->uid) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1;
return 0;
}
int32_t tDecodeSSingleDeleteReq(SDecoder *pDecoder, SSingleDeleteReq *pReq) {
if (tDecodeI64(pDecoder, &pReq->uid) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1;
return 0;
}
int32_t tEncodeSBatchDeleteReq(SEncoder *pEncoder, const SBatchDeleteReq *pReq) {
if (tEncodeI64(pEncoder, pReq->suid) < 0) return -1;
int32_t sz = taosArrayGetSize(pReq->deleteReqs);
if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SSingleDeleteReq *pOneReq = taosArrayGet(pReq->deleteReqs, i);
if (tEncodeSSingleDeleteReq(pEncoder, pOneReq) < 0) return -1;
}
return 0;
}
int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) {
if (tDecodeI64(pDecoder, &pReq->suid) < 0) return -1;
int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
pReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
if (pReq->deleteReqs == NULL) return -1;
for (int32_t i = 0; i < sz; i++) {
SSingleDeleteReq deleteReq;
if (tDecodeSSingleDeleteReq(pDecoder, &deleteReq) < 0) return -1;
taosArrayPush(pReq->deleteReqs, &deleteReq);
}
return 0;
}

View File

@ -33,8 +33,12 @@ const uint8_t tdVTypeByte[2][3] = {{
// declaration
static uint8_t tdGetBitmapByte(uint8_t byte);
static int32_t tdCompareColId(const void *arg1, const void *arg2);
static FORCE_INLINE int32_t compareKvRowColId(const void *key1, const void *key2);
static bool tdSTSRowIterGetTpVal(STSRowIter *pIter, col_type_t colType, int32_t offset, SCellVal *pVal);
static bool tdSTSRowIterGetKvVal(STSRowIter *pIter, col_id_t colId, col_id_t *nIdx, SCellVal *pVal);
static bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset,
col_id_t colIdx, SCellVal *pVal);
static bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, col_id_t colIdx, SCellVal *pVal);
static void tdSCellValPrint(SCellVal *pVal, int8_t colType);
// implementation
/**
@ -330,14 +334,14 @@ void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag) {
tdSTSRowIterInit(&iter, pSchema);
tdSTSRowIterReset(&iter, row);
printf("%s >>>type:%d,sver:%d ", tag, (int32_t)TD_ROW_TYPE(row), (int32_t)TD_ROW_SVER(row));
for (int i = 0; i < pSchema->numOfCols; ++i) {
STColumn *stCol = pSchema->columns + i;
SCellVal sVal = {255, NULL};
if (!tdSTSRowIterNext(&iter, stCol->colId, stCol->type, &sVal)) {
STColumn *cols = (STColumn *)&iter.pSchema->columns;
while (true) {
SCellVal sVal = {.valType = 255, NULL};
if (!tdSTSRowIterNext(&iter, &sVal)) {
break;
}
ASSERT(sVal.valType == 0 || sVal.valType == 1 || sVal.valType == 2);
tdSCellValPrint(&sVal, stCol->type);
tdSCellValPrint(&sVal, cols[iter.colIdx - 1].type);
}
printf("\n");
}
@ -420,6 +424,16 @@ void tdSCellValPrint(SCellVal *pVal, int8_t colType) {
}
}
static FORCE_INLINE int32_t compareKvRowColId(const void *key1, const void *key2) {
if (*(col_id_t *)key1 > ((SKvRowIdx *)key2)->colId) {
return 1;
} else if (*(col_id_t *)key1 < ((SKvRowIdx *)key2)->colId) {
return -1;
} else {
return 0;
}
}
bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, col_id_t colIdx, SCellVal *pVal) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
tdRowSetVal(pVal, TD_VTYPE_NORM, TD_ROW_KEY_ADDR(pRow));
@ -456,7 +470,7 @@ bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t fl
return true;
}
bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal) {
bool tdSTSRowIterFetch(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pVal->val = &pIter->pRow->ts;
pVal->valType = TD_VTYPE_NORM;
@ -477,10 +491,10 @@ bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCe
return false;
}
}
tdGetTpRowDataOfCol(pIter, pCol->type, pCol->offset - sizeof(TSKEY), pVal);
tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset - sizeof(TSKEY), pVal);
++pIter->colIdx;
} else if (TD_IS_KV_ROW(pIter->pRow)) {
return tdGetKvRowValOfColEx(pIter, colId, colType, &pIter->kvIdx, pVal);
return tdSTSRowIterGetKvVal(pIter, colId, &pIter->kvIdx, pVal);
} else {
pVal->valType = TD_VTYPE_NONE;
terrno = TSDB_CODE_INVALID_PARA;
@ -489,13 +503,68 @@ bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCe
return true;
}
bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx, SCellVal *pVal) {
bool tdSTSRowIterNext(STSRowIter *pIter, SCellVal *pVal) {
if (pIter->colIdx >= pIter->pSchema->numOfCols) {
return false;
}
STColumn *pCol = &pIter->pSchema->columns[pIter->colIdx];
if (pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pVal->val = &pIter->pRow->ts;
pVal->valType = TD_VTYPE_NORM;
++pIter->colIdx;
return true;
}
if (TD_IS_TP_ROW(pIter->pRow)) {
tdSTSRowIterGetTpVal(pIter, pCol->type, pCol->offset - sizeof(TSKEY), pVal);
} else if (TD_IS_KV_ROW(pIter->pRow)) {
tdSTSRowIterGetKvVal(pIter, pCol->colId, &pIter->kvIdx, pVal);
} else {
ASSERT(0);
}
++pIter->colIdx;
return true;
}
bool tdSTSRowIterGetTpVal(STSRowIter *pIter, col_type_t colType, int32_t offset, SCellVal *pVal) {
STSRow *pRow = pIter->pRow;
if (pRow->statis == 0) {
pVal->valType = TD_VTYPE_NORM;
if (IS_VAR_DATA_TYPE(colType)) {
pVal->val = POINTER_SHIFT(pRow, *(VarDataOffsetT *)POINTER_SHIFT(TD_ROW_DATA(pRow), offset));
} else {
pVal->val = POINTER_SHIFT(TD_ROW_DATA(pRow), offset);
}
return TSDB_CODE_SUCCESS;
}
if (tdGetBitmapValType(pIter->pBitmap, pIter->colIdx - 1, &pVal->valType, 0) != TSDB_CODE_SUCCESS) {
pVal->valType = TD_VTYPE_NONE;
return terrno;
}
if (pVal->valType == TD_VTYPE_NORM) {
if (IS_VAR_DATA_TYPE(colType)) {
pVal->val = POINTER_SHIFT(pRow, *(VarDataOffsetT *)POINTER_SHIFT(TD_ROW_DATA(pRow), offset));
} else {
pVal->val = POINTER_SHIFT(TD_ROW_DATA(pRow), offset);
}
}
return true;
}
bool tdSTSRowIterGetKvVal(STSRowIter *pIter, col_id_t colId, col_id_t *nIdx, SCellVal *pVal) {
STSRow *pRow = pIter->pRow;
SKvRowIdx *pKvIdx = NULL;
bool colFound = false;
col_id_t kvNCols = tdRowGetNCols(pRow) - 1;
void *pColIdx = TD_ROW_COL_IDX(pRow);
while (*nIdx < kvNCols) {
pKvIdx = (SKvRowIdx *)POINTER_SHIFT(TD_ROW_COL_IDX(pRow), *nIdx * sizeof(SKvRowIdx));
pKvIdx = (SKvRowIdx *)POINTER_SHIFT(pColIdx, *nIdx * sizeof(SKvRowIdx));
if (pKvIdx->colId == colId) {
++(*nIdx);
pVal->val = POINTER_SHIFT(pRow, pKvIdx->offset);
@ -518,48 +587,13 @@ bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType,
}
}
#ifdef TD_SUPPORT_BITMAP
int16_t colIdx = -1;
if (pKvIdx) colIdx = POINTER_DISTANCE(pKvIdx, TD_ROW_COL_IDX(pRow)) / sizeof(SKvRowIdx);
if (tdGetBitmapValType(pIter->pBitmap, colIdx, &pVal->valType, 0) != TSDB_CODE_SUCCESS) {
if (tdGetBitmapValType(pIter->pBitmap, pIter->kvIdx - 1, &pVal->valType, 0) != TSDB_CODE_SUCCESS) {
pVal->valType = TD_VTYPE_NONE;
}
#else
pVal->valType = isNull(pVal->val, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
#endif
return true;
}
bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colType, int32_t offset, SCellVal *pVal) {
STSRow *pRow = pIter->pRow;
if (IS_VAR_DATA_TYPE(colType)) {
pVal->val = POINTER_SHIFT(pRow, *(VarDataOffsetT *)POINTER_SHIFT(TD_ROW_DATA(pRow), offset));
} else {
pVal->val = POINTER_SHIFT(TD_ROW_DATA(pRow), offset);
}
#ifdef TD_SUPPORT_BITMAP
if (tdGetBitmapValType(pIter->pBitmap, pIter->colIdx - 1, &pVal->valType, 0) != TSDB_CODE_SUCCESS) {
pVal->valType = TD_VTYPE_NONE;
}
#else
pVal->valType = isNull(pVal->val, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
#endif
return true;
}
static FORCE_INLINE int32_t compareKvRowColId(const void *key1, const void *key2) {
if (*(col_id_t *)key1 > ((SKvRowIdx *)key2)->colId) {
return 1;
} else if (*(col_id_t *)key1 < ((SKvRowIdx *)key2)->colId) {
return -1;
} else {
return 0;
}
}
int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
STColumn *pTColumn;
SColVal *pColVal;
@ -625,7 +659,7 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
if (maxVarDataLen > 0) {
varBuf = taosMemoryMalloc(maxVarDataLen);
if (!varBuf) {
if(isAlloc) {
if (isAlloc) {
taosMemoryFreeClear(*ppRow);
}
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -673,6 +707,19 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
return 0;
}
static FORCE_INLINE int32_t tdCompareColId(const void *arg1, const void *arg2) {
int32_t colId = *(int32_t *)arg1;
STColumn *pCol = (STColumn *)arg2;
if (colId < pCol->colId) {
return -1;
} else if (colId == pCol->colId) {
return 0;
} else {
return 1;
}
}
bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pVal->val = &pIter->pRow->ts;
@ -712,19 +759,6 @@ bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCell
return true;
}
static int32_t tdCompareColId(const void *arg1, const void *arg2) {
int32_t colId = *(int32_t *)arg1;
STColumn *pCol = (STColumn *)arg2;
if (colId < pCol->colId) {
return -1;
} else if (colId == pCol->colId) {
return 0;
} else {
return 1;
}
}
int32_t tdGetBitmapValTypeII(const void *pBitmap, int16_t colIdx, TDRowValT *pValType) {
if (!pBitmap || colIdx < 0) {
TASSERT(0);
@ -938,7 +972,7 @@ int32_t tdAppendColValToRow(SRowBuilder *pBuilder, col_id_t colId, int8_t colTyp
break;
case TD_VTYPE_NONE:
if (!pBuilder->hasNone) pBuilder->hasNone = true;
break;
return TSDB_CODE_SUCCESS;
default:
ASSERT(0);
break;
@ -970,13 +1004,11 @@ int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const vo
STSRow *row = pBuilder->pBuf;
// No need to store None/Null values.
if (valType == TD_VTYPE_NORM) {
// ts key stored in STSRow.ts
SKvRowIdx *pColIdx = (SKvRowIdx *)POINTER_SHIFT(TD_ROW_COL_IDX(row), offset);
char *ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row));
pColIdx->colId = colId;
pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN
if (valType == TD_VTYPE_NORM) {
char *ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row));
if (IS_VAR_DATA_TYPE(colType)) {
if (isCopyVarData) {
memcpy(ptr, val, varDataTLen(val));
@ -987,26 +1019,6 @@ int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const vo
TD_ROW_LEN(row) += TYPE_BYTES[colType];
}
}
#ifdef TD_SUPPORT_BACK2
// NULL/None value
else {
SKvRowIdx *pColIdx = (SKvRowIdx *)POINTER_SHIFT(TD_ROW_COL_IDX(row), offset);
char *ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row));
pColIdx->colId = colId;
pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN
const void *nullVal = getNullValue(colType);
if (IS_VAR_DATA_TYPE(colType)) {
if (isCopyVarData) {
memcpy(ptr, nullVal, varDataTLen(nullVal));
}
TD_ROW_LEN(row) += varDataTLen(nullVal);
} else {
memcpy(ptr, nullVal, TYPE_BYTES[colType]);
TD_ROW_LEN(row) += TYPE_BYTES[colType];
}
}
#endif
return 0;
}
@ -1044,24 +1056,6 @@ int32_t tdAppendColValToTpRow(SRowBuilder *pBuilder, TDRowValT valType, const vo
memcpy(POINTER_SHIFT(TD_ROW_DATA(row), offset), val, TYPE_BYTES[colType]);
}
}
#ifdef TD_SUPPORT_BACK2
// NULL/None value
else {
// TODO: Null value for new data types imported since 3.0 need to be defined.
const void *nullVal = getNullValue(colType);
if (IS_VAR_DATA_TYPE(colType)) {
// ts key stored in STSRow.ts
*(VarDataOffsetT *)POINTER_SHIFT(TD_ROW_DATA(row), offset) = TD_ROW_LEN(row);
if (isCopyVarData) {
memcpy(POINTER_SHIFT(row, TD_ROW_LEN(row)), nullVal, varDataTLen(nullVal));
}
TD_ROW_LEN(row) += varDataTLen(nullVal);
} else {
memcpy(POINTER_SHIFT(TD_ROW_DATA(row), offset), nullVal, TYPE_BYTES[colType]);
}
}
#endif
return 0;
}
@ -1329,7 +1323,7 @@ void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow) {
pIter->pRow = pRow;
pIter->pBitmap = tdGetBitmapAddr(pRow, pRow->type, pIter->pSchema->flen, tdRowGetNCols(pRow));
pIter->offset = 0;
pIter->colIdx = PRIMARYKEY_TIMESTAMP_COL_ID;
pIter->colIdx = 0; // PRIMARYKEY_TIMESTAMP_COL_ID;
pIter->kvIdx = 0;
}

View File

@ -364,6 +364,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;

View File

@ -32,6 +32,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb);
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);

View File

@ -995,11 +995,13 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndCheckTopicExist(pMnode, pDb) < 0) goto _OVER;
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
/*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
/*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
/*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
if (mndDropStreamByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropSmasByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
@ -1706,7 +1708,7 @@ static void setPerfSchemaDbCfg(SDbObj *pDbObj) {
static bool mndGetTablesOfDbFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
SVgObj *pVgroup = pObj;
int32_t *numOfTables = p1;
int64_t uid = *(int64_t*)p2;
int64_t uid = *(int64_t *)p2;
if (pVgroup->dbUid == uid) {
*numOfTables += pVgroup->numOfTables;
}

View File

@ -837,7 +837,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
sdbCancelFetch(pSdb, pIter);
mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
return -1;
} else {
#if 0
@ -929,14 +929,31 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->status, true);
char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&n, pStream->sourceDb, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&n, varDataVal(sourceDB));
varDataSetLen(sourceDB, strlen(varDataVal(sourceDB)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->sourceDb, true);
colDataAppend(pColInfo, numOfRows, (const char *)&sourceDB, false);
char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&n, pStream->targetDb, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&n, varDataVal(targetDB));
varDataSetLen(targetDB, strlen(varDataVal(targetDB)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->targetDb, true);
colDataAppend(pColInfo, numOfRows, (const char *)&targetDB, false);
if (pStream->targetSTbName[0] == 0) {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->targetSTbName, true);
colDataAppend(pColInfo, numOfRows, NULL, true);
} else {
char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&n, pStream->targetSTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
strcpy(&targetSTB[VARSTR_HEADER_SIZE], tNameGetTableName(&n));
varDataSetLen(targetSTB, strlen(varDataVal(targetSTB)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&targetSTB, false);
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->watermark, false);

View File

@ -398,13 +398,27 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
}
// 8. TODO generate logs
mInfo("rebalance calculation completed, rebalanced vg:");
// 8. generate logs
mInfo("mq rebalance: calculation completed, rebalanced vg:");
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
mInfo("vgId:%d, moved from consumer:%" PRId64 ", to consumer:%" PRId64, pOutputRebVg->pVgEp->vgId,
mInfo("mq rebalance: vgId:%d, moved from consumer:%" PRId64 ", to consumer:%" PRId64, pOutputRebVg->pVgEp->vgId,
pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
}
{
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
mInfo("mq rebalance: final cfg: consumer %ld has %d vg", pConsumerEp->consumerId, sz);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
mInfo("mq rebalance: final cfg: vg %d to consumer %ld", pVgEp->vgId, pConsumerEp->consumerId);
}
}
}
// 9. clear
taosHashCleanup(pHash);

View File

@ -782,6 +782,26 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter);
}
int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqTopicObj *pTopic = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
if (pIter == NULL) break;
if (pTopic->dbUid == pDb->uid) {
sdbRelease(pSdb, pTopic);
terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
return -1;
}
sdbRelease(pSdb, pTopic);
}
return 0;
}
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;

View File

@ -172,7 +172,7 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId);
const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq);
// sma
int32_t smaInit();

View File

@ -119,7 +119,7 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
SName stbFullName = {0};
tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
SVCreateStbReq pReq = {0};
pReq.name = (char*)tNameGetTableName(&stbFullName);
pReq.name = (char *)tNameGetTableName(&stbFullName);
pReq.suid = pCfg->dstTbUid;
pReq.schemaRow = pCfg->schemaRow;
pReq.schemaTag = pCfg->schemaTag;
@ -200,8 +200,9 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
goto _err;
}
SBatchDeleteReq deleteReq;
SSubmitReq *pSubmitReq = tdBlockToSubmit((const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid,
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId);
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq);
if (!pSubmitReq) {
smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),

View File

@ -325,7 +325,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
for (int32_t i = 0; i < colActual; i++) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
SCellVal sVal = {0};
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
if (!tdSTSRowIterFetch(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
break;
}
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) {

View File

@ -16,7 +16,7 @@
#include "tq.h"
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId) {
const char* stbFullName, int32_t vgId, SBatchDeleteReq* deleteReq) {
SSubmitReq* ret = NULL;
SArray* schemaReqs = NULL;
SArray* schemaReqSz = NULL;
@ -33,6 +33,9 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
schemaReqSz = taosArrayInit(sz, sizeof(int32_t));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_DATA) {
//
}
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
@ -179,15 +182,43 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pRes = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
SBatchDeleteReq deleteReq = {0};
tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size);
ASSERT(pTask->tbSink.pTSchema);
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId);
pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
int32_t code;
int32_t len;
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
if (code < 0) {
//
ASSERT(0);
}
SEncoder encoder;
void* buf = taosMemoryCalloc(1, len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, len);
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
tEncoderClear(&encoder);
if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
SRpcMsg msg = {
.msgType = TDMT_VND_BATCH_DEL,
.pCont = buf,
.contLen = len + sizeof(SMsgHead),
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqDebug("failed to put into write-queue since %s", terrstr());
}
}
taosArrayDestroy(deleteReq.deleteReqs);
/*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
// build write msg
SRpcMsg msg = {

View File

@ -182,7 +182,7 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
tsdbInfo("-------------------%d\n", pCol->info.bytes);
// tsdbInfo("-------------------%d\n", pCol->info.bytes);
}
}

View File

@ -29,6 +29,7 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
@ -190,6 +191,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
case TDMT_VND_DELETE:
if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_BATCH_DEL:
if (vnodeProcessBatchDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break;
/* TQ */
case TDMT_VND_MQ_VG_CHANGE:
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
@ -1053,6 +1057,23 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
return 0;
}
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SBatchDeleteReq deleteReq;
SDecoder decoder;
tDecoderInit(&decoder, pReq, len);
tDecodeSBatchDeleteReq(&decoder, &deleteReq);
int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
for (int32_t i = 0; i < sz; i++) {
SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i);
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, pOneReq->uid, pOneReq->ts, pOneReq->ts);
if (code) {
// TODO
}
}
return 0;
}
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = 0;
SDecoder *pCoder = &(SDecoder){0};

View File

@ -741,6 +741,7 @@ typedef struct STimeSliceOperatorInfo {
SArray* pPrevRow; // SArray<SGroupValue>
SArray* pNextRow; // SArray<SGroupValue>
SArray* pLinearInfo; // SArray<SFillLinearInfo>
bool fillLastPoint;
bool isPrevRowSet;
bool isNextRowSet;
int32_t fillType; // fill type
@ -1017,6 +1018,7 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid);

View File

@ -37,7 +37,6 @@ typedef struct SFillLinearInfo {
SPoint start;
SPoint end;
bool hasNull;
bool fillLastPoint;
int16_t type;
int32_t bytes;
} SFillLinearInfo;

View File

@ -1277,8 +1277,12 @@ void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
}
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
bool init = false;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
if (init) {
continue;
}
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
@ -1295,6 +1299,8 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
} else {
pResInfo->initialized = true;
}
} else {
init = true;
}
}
}
@ -3416,6 +3422,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
}
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
if (code != TSDB_CODE_SUCCESS) {
qError("Create agg result buf failed since %s", tstrerror(code));
return code;
}
@ -3435,7 +3442,11 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf
return code;
}
doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
for (int32_t i = 0; i < numOfCols; ++i) {
pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
}

View File

@ -132,6 +132,8 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
nodesDestroyNode(pJoinOperator->pCondAfterMerge);
pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes);
taosMemoryFreeClear(param);
}

View File

@ -319,6 +319,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
return TSDB_CODE_SUCCESS;
} else {
qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
*status = FUNC_DATA_REQUIRED_DATA_LOAD;
}
}
@ -326,13 +327,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
// try to filter data block according to sma info
if (pTableScanInfo->pFilterNode != NULL) {
if (!loadSMA) {
doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
}
bool keep = doFilterByBlockSMA(pTableScanInfo->pFilterNode, pBlock->pBlockAgg, taosArrayGetSize(pBlock->pDataBlock),
pBlockInfo->rows);
if (pTableScanInfo->pFilterNode != NULL && (!loadSMA)) {
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
if (success) {
size_t size = taosArrayGetSize(pBlock->pDataBlock);
bool keep = doFilterByBlockSMA(pTableScanInfo->pFilterNode, pBlock->pBlockAgg, size, pBlockInfo->rows);
if (!keep) {
qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
@ -342,6 +341,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
return TSDB_CODE_SUCCESS;
}
}
}
// free the sma info, since it should not be involved in later computing process.
taosMemoryFreeClear(pBlock->pBlockAgg);
@ -1174,10 +1174,15 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
bool isClosed = false;
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
isClosed = isCloseWindow(&win, &pInfo->twAggSup);
}
// must check update info first.
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup) &&
if ((update || (isSignleIntervalWindow(pInfo) && isClosed &&
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) {
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
}
@ -2458,81 +2463,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
#if 0
int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
STagScanInfo *pInfo = pOperator->info;
SSDataBlock *pRes = pInfo->pRes;
int32_t count = 0;
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
int32_t functionId = getExprFunctionId(&pOperator->exprSupp.pExprInfo[0]);
if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id
assert(pQueryAttr->numOfOutput == 1);
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
int32_t rsize = pExprInfo->base.resSchema.bytes;
count = 0;
int16_t bytes = pExprInfo->base.resSchema.bytes;
int16_t type = pExprInfo->base.resSchema.type;
for(int32_t i = 0; i < pQueryAttr->numOfTags; ++i) {
if (pQueryAttr->tagColList[i].colId == pExprInfo->base.pColumns->info.colId) {
bytes = pQueryAttr->tagColList[i].bytes;
type = pQueryAttr->tagColList[i].type;
break;
}
}
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
int32_t i = pInfo->curPos++;
STableQueryInfo *item = taosArrayGetP(pa, i);
char *output = pColInfo->pData + count * rsize;
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
output = varDataVal(output);
STableId* id = TSDB_TABLEID(item->pTable);
*(int16_t *)output = 0;
output += sizeof(int16_t);
*(int64_t *)output = id->uid; // memory align problem, todo serialize
output += sizeof(id->uid);
*(int32_t *)output = id->tid;
output += sizeof(id->tid);
*(int32_t *)output = pQueryAttr->vgId;
output += sizeof(pQueryAttr->vgId);
char* data = NULL;
if (pExprInfo->base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
data = tsdbGetTableName(item->pTable);
} else {
data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.pColumns->info.colId, type, bytes);
}
doSetTagValueToResultBuf(output, data, type, bytes);
count += 1;
}
//qDebug("QInfo:0x%"PRIx64" create (tableId, tag) info completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
} else if (functionId == FUNCTION_COUNT) {// handle the "count(tbname)" query
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
*(int64_t*)pColInfo->pData = pInfo->totalTables;
count = 1;
pOperator->status = OP_EXEC_DONE;
//qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count);
} else { // return only the tags|table name etc.
#endif
STagScanInfo* pInfo = pOperator->info;
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
SSDataBlock* pRes = pInfo->pRes;
@ -2610,6 +2540,8 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
STagScanInfo* pInfo = (STagScanInfo*)param;
pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosArrayDestroy(pInfo->pColMatchInfo);
taosMemoryFreeClear(param);
}

View File

@ -851,23 +851,34 @@ static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t g
return TSDB_CODE_SUCCESS;
}
static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) {
SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
if (newPos == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
newPos->groupId = groupId;
newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
*(int64_t*)newPos->key = ts;
SWinRes key = {.ts = ts, .groupId = groupId};
if (taosHashPut(pUpdatedMap, &key, sizeof(SWinRes), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) {
taosMemoryFree(newPos);
}
return TSDB_CODE_SUCCESS;
}
static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) {
return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);;
}
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated);
}
static void removeResult(SArray* pUpdated, SWinRes* pKey) {
int32_t size = taosArrayGetSize(pUpdated);
int32_t index = binarySearchCom(pUpdated, size, pKey, TSDB_ORDER_DESC, compareResKey);
if (index >= 0 && 0 == compareResKey(pKey, pUpdated, index)) {
taosArrayRemove(pUpdated, index);
}
}
static void removeResults(SArray* pWins, SArray* pUpdated) {
static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) {
SWinRes* pW = taosArrayGet(pWins, i);
removeResult(pUpdated, pW);
taosHashRemove(pUpdatedMap, pW, sizeof(SWinRes));
}
}
@ -894,11 +905,14 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) {
return -1;
}
static void removeDeleteResults(SArray* pUpdated, SArray* pDelWins) {
int32_t upSize = taosArrayGetSize(pUpdated);
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
if (!pUpdatedMap || taosHashGetSize(pUpdatedMap) == 0) {
return;
}
int32_t delSize = taosArrayGetSize(pDelWins);
for (int32_t i = 0; i < upSize; i++) {
SResKeyPos* pResKey = taosArrayGetP(pUpdated, i);
void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
SResKeyPos* pResKey = (SResKeyPos*)pIte;
int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes);
if (index >= 0 && 0 == compareWinRes(pResKey, pDelWins, index)) {
taosArrayRemove(pDelWins, index);
@ -914,7 +928,7 @@ bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) {
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) { return isOverdue(pWin->ekey, pSup); }
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag, SArray* pUpdated) {
int32_t scanFlag, SHashObj* pUpdatedMap) {
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
@ -940,7 +954,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResultRow(pResult, tableGroupId, pUpdated);
saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
}
}
@ -997,7 +1011,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResultRow(pResult, tableGroupId, pUpdated);
saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
}
@ -1437,7 +1451,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
}
}
static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) {
void* pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
@ -1446,7 +1460,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1455,7 +1469,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
}
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
SHashObj* pPullDataMap, SArray* closeWins, SArray* pRecyPages,
SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages,
SDiskbasedBuf* pDiscBuf) {
qDebug("===stream===close interval window");
void* pIte = NULL;
@ -1487,7 +1501,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
}
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins);
int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, closeWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1577,11 +1591,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
break;
}
// qInfo("===stream===%ld", pBlock->info.version);
printDataBlock(pBlock, "single interval recv");
if (pBlock->info.type == STREAM_CLEAR) {
@ -1594,7 +1611,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
continue;
}
@ -1617,17 +1634,24 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap);
}
pOperator->status = OP_RES_TO_RETURN;
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdated,
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
taosArrayPush(pUpdated, pIte);
}
taosHashCleanup(pUpdatedMap);
taosArraySort(pUpdated, resultrowComparAsc);
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
removeDeleteResults(pUpdated, pInfo->pDelWins);
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows > 0) {
return pInfo->pDelRes;
@ -1695,7 +1719,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
for (int32_t i = 0; i < numOfCols; i++) {
if (!fmIsInvertible(pFCtx[i].functionId)) {
if (fmIsUserDefinedFunc(pFCtx[i].functionId) || !fmIsInvertible(pFCtx[i].functionId)) {
return false;
}
}
@ -2088,8 +2112,10 @@ static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock
pSliceInfo->isNextRowSet = true;
}
static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex,
bool isLastRow) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
bool fillLastPoint = pSliceInfo->fillLastPoint;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
@ -2097,16 +2123,22 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
// null data should not be kept since it can not be used to perform interpolation
if (!colDataIsNull_s(pColInfoData, i)) {
int64_t startKey = *(int64_t*)colDataGetData(pTsCol, rowIndex);
int64_t endKey = *(int64_t*)colDataGetData(pTsCol, rowIndex + 1);
pLinearInfo->start.key = startKey;
pLinearInfo->end.key = endKey;
if (isLastRow) {
pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
memcpy(pLinearInfo->start.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes);
} else if (fillLastPoint) {
pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
memcpy(pLinearInfo->end.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes);
} else {
pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex + 1);
char* val;
val = colDataGetData(pColInfoData, rowIndex);
memcpy(pLinearInfo->start.val, val, pLinearInfo->bytes);
val = colDataGetData(pColInfoData, rowIndex + 1);
memcpy(pLinearInfo->end.val, val, pLinearInfo->bytes);
}
pLinearInfo->hasNull = false;
} else {
@ -2114,6 +2146,8 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
}
}
pSliceInfo->fillLastPoint = isLastRow ? true : false;
}
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup,
@ -2270,7 +2304,7 @@ static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB
}
pInfo->pLinearInfo = taosArrayInit(4, sizeof(SFillLinearInfo));
if (pInfo->pNextRow == NULL) {
if (pInfo->pLinearInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -2284,15 +2318,20 @@ static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB
linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes);
linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes);
linearInfo.hasNull = false;
linearInfo.fillLastPoint = false;
linearInfo.type = pColInfo->info.type;
linearInfo.bytes = pColInfo->info.bytes;
taosArrayPush(pInfo->pLinearInfo, &linearInfo);
}
pInfo->fillLastPoint = false;
return TSDB_CODE_SUCCESS;
}
static bool needToFillLastPoint(STimeSliceOperatorInfo* pSliceInfo) {
return (pSliceInfo->fillLastPoint == true && pSliceInfo->fillType == TSDB_FILL_LINEAR);
}
static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
int32_t code;
code = initPrevRowsKeeper(pInfo, pBlock);
@ -2357,6 +2396,23 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
if (i == 0 && needToFillLastPoint(pSliceInfo)) { // first row in current block
doKeepLinearInfo(pSliceInfo, pBlock, i, false);
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator);
break;
}
}
if (ts == pSliceInfo->current) {
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j];
@ -2375,9 +2431,10 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
// for linear interpolation, always fill value between this and next points;
// if its the first point in data block, also fill values between previous(if there's any) and this point;
// if its the last point in data block, no need to fill, but reserve this point as the start value for next data block.
// if its the last point in data block, no need to fill, but reserve this point as the start value and do
// the interpolation when processing next data block.
if (pSliceInfo->fillType == TSDB_FILL_LINEAR) {
doKeepLinearInfo(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i, false);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (i < pBlock->info.rows - 1) {
@ -2397,6 +2454,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
break;
}
}
} else {// it is the last row of current block
//store ts value as start, and calculate interp value when processing next block
doKeepLinearInfo(pSliceInfo, pBlock, i, true);
}
} else { // non-linear interpolation
pSliceInfo->current =
@ -2415,7 +2475,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
doKeepPrevRows(pSliceInfo, pBlock, i);
if (pSliceInfo->fillType == TSDB_FILL_LINEAR) {
doKeepLinearInfo(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i, false);
// no need to increate pSliceInfo->current here
//pSliceInfo->current =
// taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
@ -2495,7 +2555,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pSliceInfo->fillType == TSDB_FILL_LINEAR) {
doKeepLinearInfo(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i, false);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (i < pBlock->info.rows - 1) {
@ -2831,7 +2891,7 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
}
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
SArray* pUpdated) {
SHashObj* pUpdatedMap) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
@ -2913,8 +2973,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
NULL, TSDB_ORDER_ASC);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResultRow(pResult, tableGroupId, pUpdated);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
@ -3020,6 +3080,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
TSKEY maxTs = INT64_MIN;
SExprSupp* pSup = &pOperator->exprSupp;
@ -3077,7 +3139,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
clearSpecialDataBlock(pInfo->pUpdateRes);
removeDeleteResults(pUpdated, pInfo->pDelWins);
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
pOperator->status = OP_RES_TO_RETURN;
qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
break;
@ -3104,7 +3166,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
taosArrayDestroy(pUpWins);
continue;
}
removeResults(pUpWins, pUpdated);
removeResults(pUpWins, pUpdatedMap);
copyDataBlock(pInfo->pUpdateRes, pBlock);
// copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
pInfo->returnUpdate = true;
@ -3122,15 +3184,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, pUpdated);
continue;
}
removeResults(pInfo->pDelWins, pUpdated);
removeResults(pInfo->pDelWins, pUpdatedMap);
break;
} else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins);
removeResults(pUpWins, pUpdated);
removeResults(pUpWins, pUpdatedMap);
taosArrayDestroy(pUpWins);
if (taosArrayGetSize(pUpdated) > 0) {
break;
@ -3146,7 +3208,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated);
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
if (IS_FINAL_OP(pInfo)) {
int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren);
@ -3171,12 +3233,19 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
if (IS_FINAL_OP(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap,
pUpdated, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
} else {
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
}
void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
taosArrayPush(pUpdated, pIte);
}
taosHashCleanup(pUpdatedMap);
taosArraySort(pUpdated, resultrowComparAsc);
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);

View File

@ -13,19 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tsimplehash.h"
#include "os.h"
#include "taoserror.h"
#define SHASH_DEFAULT_LOAD_FACTOR 0.75
#define HASH_MAX_CAPACITY (1024*1024*16)
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
#define SHASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * SHASH_DEFAULT_LOAD_FACTOR)
#define GET_SHASH_NODE_KEY(_n, _dl) ((char*)(_n) + sizeof(SHNode) + (_dl))
#define GET_SHASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHNode))
#define GET_SHASH_NODE_KEY(_n, _dl) ((char *)(_n) + sizeof(SHNode) + (_dl))
#define GET_SHASH_NODE_DATA(_n) ((char *)(_n) + sizeof(SHNode))
#define HASH_INDEX(v, c) ((v) & ((c)-1))
#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * SHASH_DEFAULT_LOAD_FACTOR)
#define FREE_HASH_NODE(_n) \
do { \
@ -62,7 +61,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t
capacity = 4;
}
SSHashObj* pHashObj = (SSHashObj*) taosMemoryCalloc(1, sizeof(SSHashObj));
SSHashObj *pHashObj = (SSHashObj *)taosMemoryCalloc(1, sizeof(SSHashObj));
if (pHashObj == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
@ -91,7 +90,7 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) {
if (pHashObj == NULL) {
return 0;
}
return (int32_t)atomic_load_64((int64_t*)&pHashObj->size);
return (int32_t)atomic_load_64((int64_t *)&pHashObj->size);
}
static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
@ -108,41 +107,42 @@ static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *pDat
}
static void taosHashTableResize(SSHashObj *pHashObj) {
if (!HASH_NEED_RESIZE(pHashObj)) {
if (!SHASH_NEED_RESIZE(pHashObj)) {
return;
}
int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u);
if (newCapacity > HASH_MAX_CAPACITY) {
// uDebug("current capacity:%zu, maximum capacity:%d, no resize applied due to limitation is reached",
// pHashObj->capacity, HASH_MAX_CAPACITY);
// uDebug("current capacity:%zu, maximum capacity:%d, no resize applied due to limitation is reached",
// pHashObj->capacity, HASH_MAX_CAPACITY);
return;
}
int64_t st = taosGetTimestampUs();
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity);
if (pNewEntryList == NULL) {
// qWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
// qWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
return;
}
size_t inc = newCapacity - pHashObj->capacity;
memset((char*)pNewEntryList + pHashObj->capacity * sizeof(void*), 0, inc);
memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc);
pHashObj->hashList = pNewEntryList;
pHashObj->capacity = newCapacity;
for (int32_t idx = 0; idx < pHashObj->capacity; ++idx) {
SHNode* pNode = pHashObj->hashList[idx];
SHNode *pNext;
SHNode *pPrev = NULL;
SHNode *pNode = pHashObj->hashList[idx];
if (pNode == NULL) {
continue;
}
SHNode *pNext;
SHNode *pPrev = NULL;
while (pNode != NULL) {
void* key = GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen);
void *key = GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen);
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->dataLen);
int32_t newIdx = HASH_INDEX(hashVal, pHashObj->capacity);
@ -166,8 +166,9 @@ static void taosHashTableResize(SSHashObj *pHashObj) {
int64_t et = taosGetTimestampUs();
// uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", (int32_t)pHashObj->capacity,
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
// uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms",
// (int32_t)pHashObj->capacity,
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
}
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
@ -210,7 +211,7 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
pNewNode->next = pHashObj->hashList[slot];
pHashObj->hashList[slot] = pNewNode;
atomic_add_fetch_64(&pHashObj->size, 1);
} else { //update data
} else { // update data
memcpy(GET_SHASH_NODE_DATA(pNode), data, pHashObj->dataLen);
}
@ -230,9 +231,7 @@ static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void
return pNode;
}
static FORCE_INLINE bool taosHashTableEmpty(const SSHashObj *pHashObj) {
return tSimpleHashGetSize(pHashObj) == 0;
}
static FORCE_INLINE bool taosHashTableEmpty(const SSHashObj *pHashObj) { return tSimpleHashGetSize(pHashObj) == 0; }
void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || key == NULL) {
@ -299,9 +298,9 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) {
return (pHashObj->capacity * sizeof(void *)) + sizeof(SHNode) * tSimpleHashGetSize(pHashObj) + sizeof(SSHashObj);
}
void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen) {
void *tSimpleHashGetKey(const SSHashObj *pHashObj, void *data, size_t *keyLen) {
int32_t offset = offsetof(SHNode, data);
SHNode *node = ((SHNode*)(char*)data - offset);
SHNode *node = ((SHNode *)(char *)data - offset);
if (keyLen != NULL) {
*keyLen = pHashObj->keyLen;
}

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TAGGFUNCTION_H
#define TDENGINE_TAGGFUNCTION_H
#ifndef TDENGINE_TFUNCTIONINT_H
#define TDENGINE_TFUNCTIONINT_H
#ifdef __cplusplus
extern "C" {
@ -28,17 +28,6 @@ extern "C" {
#include "function.h"
#include "tudf.h"
#define AVG_FUNCTION_INTER_BUFFER_SIZE 50
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value
#define DATA_SET_FLAG_SIZE sizeof(DATA_SET_FLAG)
typedef struct SInterpInfoDetail {
TSKEY ts; // interp specified timestamp
int8_t type;
int8_t primaryCol;
} SInterpInfoDetail;
bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const char *maxval);
/**
@ -57,4 +46,4 @@ static FORCE_INLINE void initResultRowEntry(SResultRowEntryInfo *pResInfo, int32
}
#endif
#endif // TDENGINE_TAGGFUNCTION_H
#endif // TDENGINE_TFUNCTIONINT_H

View File

@ -18,10 +18,10 @@
#include "function.h"
#include "query.h"
#include "querynodes.h"
#include "taggfunction.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tdigest.h"
#include "tfunctionInt.h"
#include "tglobal.h"
#include "thistogram.h"
#include "tpercentile.h"
@ -312,14 +312,6 @@ typedef struct SGroupKeyInfo {
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do { \
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
SqlFunctionCtx* __ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
__ctx->fpSet.process(__ctx); \
} \
} while (0);
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
@ -506,8 +498,7 @@ int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
cleanupResultRowEntry(pResInfo);
pResInfo->isNullRes = (pResInfo->isNullRes == 1) ? 1 : (pResInfo->numOfRes == 0);;
char* in = finalResult;
colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);
@ -4284,9 +4275,9 @@ static int32_t histogramFunctionImpl(SqlFunctionCtx* pCtx, bool isPartial) {
}
if (!isPartial) {
SET_VAL(GET_RES_INFO(pCtx), numOfElems, pInfo->numOfBins);
GET_RES_INFO(pCtx)->numOfRes = pInfo->numOfBins;
} else {
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
GET_RES_INFO(pCtx)->numOfRes = 1;
}
return TSDB_CODE_SUCCESS;
}

File diff suppressed because it is too large Load Diff

View File

@ -1,67 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "function.h"
#include "os.h"
#include "texception.h"
#include "tmsg.h"
static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *));
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)) {
if (pNode == NULL) {
return;
}
if (pNode->nodeType == TEXPR_BINARYEXPR_NODE || pNode->nodeType == TEXPR_UNARYEXPR_NODE) {
doExprTreeDestroy(&pNode, fp);
}
taosMemoryFree(pNode);
}
static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
if (*pExpr == NULL) {
return;
}
taosMemoryFree(*pExpr);
*pExpr = NULL;
}
// TODO: these three functions should be made global
static void* exception_calloc(size_t nmemb, size_t size) {
void* p = taosMemoryCalloc(nmemb, size);
if (p == NULL) {
THROW(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
return p;
}
static void* exception_malloc(size_t size) {
void* p = taosMemoryMalloc(size);
if (p == NULL) {
THROW(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
return p;
}
static UNUSED_FUNC char* exception_strdup(const char* str) {
char* p = strdup(str);
if (p == NULL) {
THROW(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
return p;
}

View File

@ -0,0 +1,86 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosdef.h"
#include "tmsg.h"
#include "thash.h"
#include "ttypes.h"
#include "function.h"
#include "tbuffer.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tfunctionInt.h"
#include "thistogram.h"
#include "tpercentile.h"
#include "ttszip.h"
#include "tudf.h"
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell) {
pCell->initialized = false;
}
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock) {
int32_t maxRows = 0;
for (int32_t j = 0; j < num; ++j) {
#if 0
int32_t id = pCtx[j].functionId;
/*
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
*/
if (id == FUNCTION_TS || id == FUNCTION_TAG || id == FUNCTION_TAGPRJ) {
continue;
}
#endif
SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[j]);
if (pResInfo != NULL && maxRows < pResInfo->numOfRes) {
maxRows = pResInfo->numOfRes;
}
}
assert(maxRows >= 0);
blockDataEnsureCapacity(pResBlock, maxRows);
for(int32_t i = 0; i < num; ++i) {
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[i]);
if (pResInfo->numOfRes == 0) {
for(int32_t j = 0; j < pResInfo->numOfRes; ++j) {
colDataAppend(pCol, j, NULL, true); // TODO add set null data api
}
} else {
for (int32_t j = 0; j < pResInfo->numOfRes; ++j) {
colDataAppend(pCol, j, GET_ROWCELL_INTERBUF(pResInfo), false);
}
}
}
pResBlock->info.rows = maxRows;
return maxRows;
}
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry) {
assert(pEntry != NULL);
return pEntry->complete;
}
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry) {
return pEntry->initialized;
}

View File

@ -1192,7 +1192,10 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
pBuilder->hasNone = true;
}
tdSRowEnd(pBuilder);
*gotRow = true;
#ifdef TD_DEBUG_PRINT_ROW
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(schema, spd->numOfCols, 1);
tdSRowPrint(row, pSTSchema, __func__);
@ -1201,7 +1204,6 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
}
// *len = pBuilder->extendedRowSize;
tdSRowEnd(pBuilder);
return TSDB_CODE_SUCCESS;
}

View File

@ -124,7 +124,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
}
pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
pInfo->pCloseWinSBF = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
pInfo->maxVersion = 0;
pInfo->scanGroupId = 0;

View File

@ -163,6 +163,7 @@ typedef struct SSyncNode {
bool changing;
int64_t startTime;
int64_t leaderTime;
int64_t lastReplicateTime;
} SSyncNode;

View File

@ -32,9 +32,9 @@ typedef struct SRespStub {
} SRespStub;
typedef struct SSyncRespMgr {
SHashObj * pRespHash;
SHashObj *pRespHash;
int64_t ttl;
void * data;
void *data;
TdThreadMutex mutex;
uint64_t seqNum;
} SSyncRespMgr;
@ -46,7 +46,8 @@ int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t index);
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub);
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub);
void syncRespClean(SSyncRespMgr *pObj);
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl);
void syncRespCleanRsp(SSyncRespMgr *pObj);
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp);
#ifdef __cplusplus
}

View File

@ -1100,6 +1100,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
int64_t timeNow = taosGetTimestampMs();
pSyncNode->startTime = timeNow;
pSyncNode->leaderTime = timeNow;
pSyncNode->lastReplicateTime = timeNow;
syncNodeEventLog(pSyncNode, "sync open");
@ -2015,6 +2016,8 @@ void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
}
}
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); }
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
@ -2028,6 +2031,9 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// reset elect timer
syncNodeResetElectTimer(pSyncNode);
// send rsp to client
syncNodeLeaderChangeRsp(pSyncNode);
// call back
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) {
pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
@ -2068,6 +2074,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->leaderTime = taosGetTimestampMs();
// reset restoreFinish
pSyncNode->restoreFinish = false;
@ -2954,8 +2962,11 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
}
ths->restoreFinish = true;
int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "restore finish, index:%" PRId64, pEntry->index);
snprintf(eventLog, sizeof(eventLog), "restore finish, index:%ld, elapsed:%ld ms, ", pEntry->index,
restoreDelay);
syncNodeEventLog(ths, eventLog);
}
}

View File

@ -108,13 +108,19 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
return 0; // get none object
}
void syncRespClean(SSyncRespMgr *pObj) {
void syncRespCleanRsp(SSyncRespMgr *pObj) {
taosThreadMutexLock(&(pObj->mutex));
syncRespCleanByTTL(pObj, pObj->ttl);
syncRespCleanByTTL(pObj, -1, true);
taosThreadMutexUnlock(&(pObj->mutex));
}
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
void syncRespClean(SSyncRespMgr *pObj) {
taosThreadMutexLock(&(pObj->mutex));
syncRespCleanByTTL(pObj, pObj->ttl, false);
taosThreadMutexUnlock(&(pObj->mutex));
}
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
int cnt = 0;
int sum = 0;
@ -126,12 +132,12 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
while (pStub) {
size_t len;
void * key = taosHashGetKey(pStub, &len);
void *key = taosHashGetKey(pStub, &len);
uint64_t *pSeqNum = (uint64_t *)key;
sum++;
int64_t nowMS = taosGetTimestampMs();
if (nowMS - pStub->createTime > ttl) {
if (nowMS - pStub->createTime > ttl || -1 == ttl) {
taosArrayPush(delIndexArray, pSeqNum);
cnt++;
@ -148,7 +154,14 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
pStub->rpcMsg.pCont = NULL;
pStub->rpcMsg.contLen = 0;
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta);
// TODO: and make rpcMsg body, call commit cb
// pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta);
pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER;
if (pStub->rpcMsg.info.handle != NULL) {
tmsgSendRsp(&(pStub->rpcMsg));
}
}
pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub);

View File

@ -281,11 +281,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC, "Invalid topic")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_QUERY, "Topic with invalid query")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Topic with invalid option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED, "Consumer unchanged")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED, "Topic unchanged")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST, "Subcribe not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_OFFSET_NOT_EXIST, "Offset not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_MUST_BE_DELETED, "Topic must be dropped first")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer")
// mnode-stream

View File

@ -112,7 +112,7 @@ ELSE ()
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
INSTALL_COMMAND
COMMAND wget -c https://github.com/upx/upx/releases/download/v3.96/upx-3.96-${PLATFORM_ARCH_STR}_linux.tar.xz -O $ENV{HOME}/upx.tar.xz && tar -xvJf $ENV{HOME}/upx.tar.xz -C $ENV{HOME}/ --strip-components 1 > /dev/null && $ENV{HOME}/upx taosadapter || :
COMMAND wget -nc https://github.com/upx/upx/releases/download/v3.96/upx-3.96-${PLATFORM_ARCH_STR}_linux.tar.xz -O $ENV{HOME}/upx.tar.xz && tar -xvJf $ENV{HOME}/upx.tar.xz -C $ENV{HOME}/ --strip-components 1 > /dev/null && $ENV{HOME}/upx taosadapter || :
COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/
COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/

View File

@ -38,7 +38,7 @@
#define SHELL_STARTUP "Check the details of the service status."
#define SHELL_WIDTH "Set the default binary display width, default is 30."
#define SHELL_NET_ROLE "Net role when network connectivity test, options: client|server."
#define SHELL_PKG_LEN "Packet length used for net test, default is 1024 bytes."
#define SHELL_PKT_LEN "Packet length used for net test, default is 1024 bytes."
#define SHELL_PKT_NUM "Packet numbers used for net test, default is 100."
#define SHELL_VERSION "Print program version."
#define SHELL_EMAIL "<support@taosdata.com>"
@ -62,7 +62,7 @@ void shellPrintHelp() {
printf("%s%s%s%s\r\n", indent, "-f,", indent, SHELL_FILE);
printf("%s%s%s%s\r\n", indent, "-h,", indent, SHELL_HOST);
printf("%s%s%s%s\r\n", indent, "-k,", indent, SHELL_CHECK);
printf("%s%s%s%s\r\n", indent, "-l,", indent, SHELL_PKG_LEN);
printf("%s%s%s%s\r\n", indent, "-l,", indent, SHELL_PKT_LEN);
printf("%s%s%s%s\r\n", indent, "-n,", indent, SHELL_NET_ROLE);
printf("%s%s%s%s\r\n", indent, "-N,", indent, SHELL_PKT_NUM);
printf("%s%s%s%s\r\n", indent, "-p,", indent, SHELL_PASSWORD);
@ -105,7 +105,7 @@ static struct argp_option shellOptions[] = {
{"startup", 't', 0, 0, SHELL_STARTUP},
{"display-width", 'w', "WIDTH", 0, SHELL_WIDTH},
{"netrole", 'n', "NETROLE", 0, SHELL_NET_ROLE},
{"pktlen", 'l', "PKTLEN", 0, SHELL_PKG_LEN},
{"pktlen", 'l', "PKTLEN", 0, SHELL_PKT_LEN},
#ifdef WEBSOCKET
{"dsn", 'E', "DSN", 0, SHELL_DSN},
{"restful", 'R', 0, 0, SHELL_REST},
@ -228,7 +228,7 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
SShellArgs *pArgs = &shell.args;
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "--usage") == 0 || strcmp(argv[i], "-?") == 0) {
if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "--usage") == 0 || strcmp(argv[i], "-?") == 0 || strcmp(argv[i], "/?") == 0) {
shellParseSingleOpt('?', NULL);
return 0;
}

@ -1 +0,0 @@
Subproject commit 3c7dafeea3e558968165b73bee0f51024898e3da