Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize

This commit is contained in:
Hongze Cheng 2023-06-16 13:54:59 +08:00
commit d97f34931d
74 changed files with 1045 additions and 254 deletions

View File

@ -52,7 +52,7 @@ TDengine 还提供一组辅助工具软件 taosTools目前它包含 taosBench
### Ubuntu 18.04 及以上版本 & Debian ### Ubuntu 18.04 及以上版本 & Debian
```bash ```bash
sudo apt-get install -y gcc cmake build-essential git libssl-dev libgflags2.2 libgflags-dev libgeos-dev sudo apt-get install -y gcc cmake build-essential git libssl-dev libgflags2.2 libgflags-dev
``` ```
#### 为 taos-tools 安装编译需要的软件 #### 为 taos-tools 安装编译需要的软件
@ -68,14 +68,14 @@ sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-d
```bash ```bash
sudo yum install epel-release sudo yum install epel-release
sudo yum update sudo yum update
sudo yum install -y gcc gcc-c++ make cmake3 git openssl-devel geos geos-devel sudo yum install -y gcc gcc-c++ make cmake3 git openssl-devel
sudo ln -sf /usr/bin/cmake3 /usr/bin/cmake sudo ln -sf /usr/bin/cmake3 /usr/bin/cmake
``` ```
### CentOS 8/Fedora/Rocky Linux ### CentOS 8/Fedora/Rocky Linux
```bash ```bash
sudo dnf install -y gcc gcc-c++ make cmake epel-release git openssl-devel geos geos-devel sudo dnf install -y gcc gcc-c++ make cmake epel-release git openssl-devel
``` ```
#### 在 CentOS 上构建 taosTools 安装依赖软件 #### 在 CentOS 上构建 taosTools 安装依赖软件
@ -117,7 +117,7 @@ scl enable devtoolset-9 -- bash
### macOS ### macOS
``` ```
brew install argp-standalone pkgconfig geos brew install argp-standalone pkgconfig
``` ```
### 设置 golang 开发环境 ### 设置 golang 开发环境

View File

@ -60,7 +60,7 @@ To build TDengine, use [CMake](https://cmake.org/) 3.0.2 or higher versions in t
### Ubuntu 18.04 and above or Debian ### Ubuntu 18.04 and above or Debian
```bash ```bash
sudo apt-get install -y gcc cmake build-essential git libssl-dev libgflags2.2 libgflags-dev libgeos-dev sudo apt-get install -y gcc cmake build-essential git libssl-dev libgflags2.2 libgflags-dev
``` ```
#### Install build dependencies for taosTools #### Install build dependencies for taosTools
@ -76,14 +76,14 @@ sudo apt install build-essential libjansson-dev libsnappy-dev liblzma-dev libz-d
```bash ```bash
sudo yum install epel-release sudo yum install epel-release
sudo yum update sudo yum update
sudo yum install -y gcc gcc-c++ make cmake3 git openssl-devel geos geos-devel sudo yum install -y gcc gcc-c++ make cmake3 git openssl-devel
sudo ln -sf /usr/bin/cmake3 /usr/bin/cmake sudo ln -sf /usr/bin/cmake3 /usr/bin/cmake
``` ```
### CentOS 8/Fedora/Rocky Linux ### CentOS 8/Fedora/Rocky Linux
```bash ```bash
sudo dnf install -y gcc gcc-c++ make cmake epel-release git openssl-devel geos geos-devel sudo dnf install -y gcc gcc-c++ make cmake epel-release git openssl-devel
``` ```
#### Install build dependencies for taosTools on CentOS #### Install build dependencies for taosTools on CentOS
@ -124,7 +124,7 @@ scl enable devtoolset-9 -- bash
### macOS ### macOS
``` ```
brew install argp-standalone pkgconfig geos brew install argp-standalone pkgconfig
``` ```
### Setup golang environment ### Setup golang environment

View File

@ -115,18 +115,6 @@ ELSE ()
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${GCC_COVERAGE_COMPILE_FLAGS} ${GCC_COVERAGE_LINK_FLAGS}") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${GCC_COVERAGE_COMPILE_FLAGS} ${GCC_COVERAGE_LINK_FLAGS}")
ENDIF () ENDIF ()
IF (${BUILD_SANITIZER})
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3 -Wformat=0")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3 -Wformat=0")
MESSAGE(STATUS "Compile with Address Sanitizer!")
ELSEIF (${BUILD_RELEASE})
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -g3 -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -g3 -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
ENDIF ()
# disable all assert # disable all assert
IF ((${DISABLE_ASSERT} MATCHES "true") OR (${DISABLE_ASSERTS} MATCHES "true")) IF ((${DISABLE_ASSERT} MATCHES "true") OR (${DISABLE_ASSERTS} MATCHES "true"))
ADD_DEFINITIONS(-DDISABLE_ASSERT) ADD_DEFINITIONS(-DDISABLE_ASSERT)
@ -168,4 +156,20 @@ ELSE ()
MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2) is ACTIVATED") MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2) is ACTIVATED")
ENDIF() ENDIF()
# build mode
SET(CMAKE_C_FLAGS_REL "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS_REL "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
IF (${BUILD_SANITIZER})
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3 -Wformat=0")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3 -Wformat=0")
MESSAGE(STATUS "Compile with Address Sanitizer!")
ELSEIF (${BUILD_RELEASE})
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS_REL}")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS_REL}")
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -g3 -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -g3 -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
ENDIF ()
ENDIF () ENDIF ()

View File

@ -229,7 +229,10 @@ endif(${BUILD_WITH_LEVELDB})
# To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev # To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev
if(${BUILD_WITH_ROCKSDB}) if(${BUILD_WITH_ROCKSDB})
if(${TD_LINUX}) if(${TD_LINUX})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized -Wno-error=unused-but-set-variable -Wno-error=unused-variable -Wno-error=unused-function -Wno-errno=unused-private-field -Wno-error=unused-result") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS_REL} -Wno-error=maybe-uninitialized -Wno-error=unused-but-set-variable -Wno-error=unused-variable -Wno-error=unused-function -Wno-errno=unused-private-field -Wno-error=unused-result")
IF ("${CMAKE_BUILD_TYPE}" STREQUAL "")
SET(CMAKE_BUILD_TYPE Release)
endif()
endif(${TD_LINUX}) endif(${TD_LINUX})
MESSAGE(STATUS "CXXXX STATUS CONFIG: " ${CMAKE_CXX_FLAGS}) MESSAGE(STATUS "CXXXX STATUS CONFIG: " ${CMAKE_CXX_FLAGS})
@ -265,7 +268,7 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_FALLOCATE "" OFF) option(WITH_FALLOCATE "" OFF)
option(WITH_JEMALLOC "" OFF) option(WITH_JEMALLOC "" OFF)
option(WITH_GFLAGS "" OFF) option(WITH_GFLAGS "" OFF)
option(PORTABLE "" OFF) option(PORTABLE "" ON)
option(WITH_LIBURING "" OFF) option(WITH_LIBURING "" OFF)
option(FAIL_ON_WARNINGS OFF) option(FAIL_ON_WARNINGS OFF)
@ -485,6 +488,13 @@ endif(${BUILD_ADDR2LINE})
# geos # geos
if(${BUILD_GEOS}) if(${BUILD_GEOS})
if(${TD_LINUX})
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS_REL}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS_REL}")
IF ("${CMAKE_BUILD_TYPE}" STREQUAL "")
SET(CMAKE_BUILD_TYPE Release)
endif()
endif(${TD_LINUX})
option(BUILD_SHARED_LIBS "Build GEOS with shared libraries" OFF) option(BUILD_SHARED_LIBS "Build GEOS with shared libraries" OFF)
add_subdirectory(geos EXCLUDE_FROM_ALL) add_subdirectory(geos EXCLUDE_FROM_ALL)
target_include_directories( target_include_directories(

View File

@ -79,8 +79,6 @@ Usage: taosdump [OPTION...] dbname [tbname ...]
-e, --escape-character Use escaped character for database name -e, --escape-character Use escaped character for database name
-N, --without-property Dump database without its properties. -N, --without-property Dump database without its properties.
-s, --schemaonly Only dump table schemas. -s, --schemaonly Only dump table schemas.
-y, --answer-yes Input yes for prompt. It will skip data file
checking!
-d, --avro-codec=snappy Choose an avro codec among null, deflate, snappy, -d, --avro-codec=snappy Choose an avro codec among null, deflate, snappy,
and lzma. and lzma.
-S, --start-time=START_TIME Start time to dump. Either epoch or -S, --start-time=START_TIME Start time to dump. Either epoch or

30
docs/en/14-reference/12-config/index.md Normal file → Executable file
View File

@ -365,6 +365,16 @@ The charset that takes effect is UTF-8.
| Unit | GB | | Unit | GB |
| Default Value | 2.0 | | Default Value | 2.0 |
### metaCacheMaxSize
| Attribute | Description |
| ------------- | ------------------------------------------------------------------------------------------------- |
| Applicable | Client Only |
| Meaning | Maximum meta cache size in single client process |
| Unit | MB |
| Default Value | -1 (No limitation) |
## Cluster Parameters ## Cluster Parameters
### supportVnodes ### supportVnodes
@ -433,6 +443,26 @@ The charset that takes effect is UTF-8.
| Default Value | 0 | | Default Value | 0 |
| Note | When it's bigger than 0, the log file would be renamed to "taosdlog.xxx" in which "xxx" is the timestamp when the file is changed last time | | Note | When it's bigger than 0, the log file would be renamed to "taosdlog.xxx" in which "xxx" is the timestamp when the file is changed last time |
### slowLogThreshold
| Attribute | Description |
| ------------- | -------------------------------------------------------------------------------------------------------- |
| Applicable | Client only |
| Meaning | When an operation execution time exceeds this threshold, the operation will be logged in slow log file |
| Unit | second |
| Default Value | 3 |
| Note | All slow operations will be logged in file "taosSlowLog" in the log directory |
### slowLogScope
| Attribute | Description |
| --------------- | ----------------------------------------------------------------------- |
| Applicable | Client only |
| Meaning | Slow log type to be logged |
| Optional Values | ALL, QUERY, INSERT, OTHERS, NONE |
| Default Value | ALL |
| Note | All slow operations will be logged by default, one option could be set |
### debugFlag ### debugFlag
| Attribute | Description | | Attribute | Description |

View File

@ -299,7 +299,7 @@ SELECT COUNT(*) FROM test.meters WHERE location = "California.SanFrancisco";
SELECT AVG(current), MAX(voltage), MIN(phase) FROM test.meters WHERE groupId = 10; SELECT AVG(current), MAX(voltage), MIN(phase) FROM test.meters WHERE groupId = 10;
``` ```
对表 `d10` 按 10 秒进行平均值、最大值和最小值聚合统计: 对表 `d10` 10 秒进行平均值、最大值和最小值聚合统计:
```sql ```sql
SELECT FIRST(ts), AVG(current), MAX(voltage), MIN(phase) FROM test.d10 INTERVAL(10s); SELECT FIRST(ts), AVG(current), MAX(voltage), MIN(phase) FROM test.d10 INTERVAL(10s);

View File

@ -82,8 +82,6 @@ Usage: taosdump [OPTION...] dbname [tbname ...]
-e, --escape-character Use escaped character for database name -e, --escape-character Use escaped character for database name
-N, --without-property Dump database without its properties. -N, --without-property Dump database without its properties.
-s, --schemaonly Only dump tables' schema. -s, --schemaonly Only dump tables' schema.
-y, --answer-yes Input yes for prompt. It will skip data file
checking!
-d, --avro-codec=snappy Choose an avro codec among null, deflate, snappy, -d, --avro-codec=snappy Choose an avro codec among null, deflate, snappy,
and lzma. and lzma.
-S, --start-time=START_TIME Start time to dump. Either epoch or -S, --start-time=START_TIME Start time to dump. Either epoch or

29
docs/zh/14-reference/12-config/index.md Normal file → Executable file
View File

@ -384,6 +384,15 @@ charset 的有效值是 UTF-8。
| 单位 | GB | | 单位 | GB |
| 缺省值 | 2.0 | | 缺省值 | 2.0 |
### metaCacheMaxSize
| 属性 | 说明 |
| -------- | ---------------------------------------------- |
| 适用范围 | 仅客户端适用 |
| 含义 | 指定单个客户端元数据缓存大小的最大值 |
| 单位 | MB |
| 缺省值 | -1 (无限制) |
## 集群相关 ## 集群相关
### supportVnodes ### supportVnodes
@ -452,6 +461,26 @@ charset 的有效值是 UTF-8。
| 缺省值 | 0 | | 缺省值 | 0 |
| 补充说明 | 大于 0 时,日志文件会被重命名为 taosdlog.xxx其中 xxx 为日志文件最后修改的时间戳。 | | 补充说明 | 大于 0 时,日志文件会被重命名为 taosdlog.xxx其中 xxx 为日志文件最后修改的时间戳。 |
### slowLogThreshold
| 属性 | 说明 |
| -------- | ------------------------------------------------------------- |
| 适用范围 | 仅客户端适用 |
| 含义 | 指定慢查询门限值,大于等于门限值认为是慢查询 |
| 单位 | 秒 |
| 缺省值 | 3 |
| 补充说明 | 每个客户端中所有慢查询会被记录在日志目录下的taosSlowLog文件中 |
### slowLogScope
| 属性 | 说明 |
| -------- | --------------------------------------------------------------|
| 适用范围 | 仅客户端适用 |
| 含义 | 指定启动记录哪些类型的慢查询 |
| 可选值 | ALL, QUERY, INSERT, OTHERS, NONE |
| 缺省值 | ALL |
| 补充说明 | 默认记录所有类型的慢查询,可通过配置只记录某一类型的慢查询 |
### debugFlag ### debugFlag
| 属性 | 说明 | | 属性 | 说明 |

View File

@ -51,27 +51,27 @@ public class JdbcDemo {
private void createDatabase() { private void createDatabase() {
String sql = "create database if not exists " + dbName; String sql = "create database if not exists " + dbName;
exuete(sql); execute(sql);
} }
private void useDatabase() { private void useDatabase() {
String sql = "use " + dbName; String sql = "use " + dbName;
exuete(sql); execute(sql);
} }
private void dropTable() { private void dropTable() {
final String sql = "drop table if exists " + dbName + "." + tbName + ""; final String sql = "drop table if exists " + dbName + "." + tbName + "";
exuete(sql); execute(sql);
} }
private void createTable() { private void createTable() {
final String sql = "create table if not exists " + dbName + "." + tbName + " (ts timestamp, temperature float, humidity int)"; final String sql = "create table if not exists " + dbName + "." + tbName + " (ts timestamp, temperature float, humidity int)";
exuete(sql); execute(sql);
} }
private void insert() { private void insert() {
final String sql = "insert into " + dbName + "." + tbName + " (ts, temperature, humidity) values(now, 20.5, 34)"; final String sql = "insert into " + dbName + "." + tbName + " (ts, temperature, humidity) values(now, 20.5, 34)";
exuete(sql); execute(sql);
} }
private void select() { private void select() {
@ -120,7 +120,7 @@ public class JdbcDemo {
System.out.println("[ " + (succeed ? "OK" : "ERROR!") + " ] time cost: " + cost + " ms, execute statement ====> " + sql); System.out.println("[ " + (succeed ? "OK" : "ERROR!") + " ] time cost: " + cost + " ms, execute statement ====> " + sql);
} }
private void exuete(String sql) { private void execute(String sql) {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
try (Statement statement = connection.createStatement()) { try (Statement statement = connection.createStatement()) {
boolean execute = statement.execute(sql); boolean execute = statement.execute(sql);

View File

@ -22,7 +22,7 @@
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
<version>30.1.1-jre</version> <version>32.0.0-jre</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -82,6 +82,7 @@ extern int64_t tsVndCommitMaxIntervalMs;
// mnode // mnode
extern int64_t tsMndSdbWriteDelta; extern int64_t tsMndSdbWriteDelta;
extern int64_t tsMndLogRetention; extern int64_t tsMndLogRetention;
extern int8_t tsGrant;
extern bool tsMndSkipGrant; extern bool tsMndSkipGrant;
// monitor // monitor
@ -198,6 +199,7 @@ void taosSetAllDebugFlag(int32_t flag, bool rewrite);
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal, bool rewrite); void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal, bool rewrite);
int32_t taosApplyLocalCfg(SConfig *pCfg, char *name); int32_t taosApplyLocalCfg(SConfig *pCfg, char *name);
void taosLocalCfgForbiddenToChange(char *name, bool *forbidden); void taosLocalCfgForbiddenToChange(char *name, bool *forbidden);
int8_t taosGranted();
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -310,6 +310,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME_PUSH, "vnode-tmq-consume-push", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)

View File

@ -42,8 +42,8 @@ else:
# os.system("rm -rf /var/lib/taos/*") # os.system("rm -rf /var/lib/taos/*")
# os.system("systemctl restart taosd ") # os.system("systemctl restart taosd ")
# wait a moment ,at least 5 seconds # wait a moment ,at least 10 seconds
time.sleep(5) time.sleep(10)
# prepare data by taosBenchmark # prepare data by taosBenchmark

View File

@ -124,12 +124,12 @@ if [ -f ${compile_dir}/build/bin/jemalloc-config ]; then
cp ${compile_dir}/build/lib/libjemalloc.so.2 ${pkg_dir}${install_user_local_path}/lib/ cp ${compile_dir}/build/lib/libjemalloc.so.2 ${pkg_dir}${install_user_local_path}/lib/
ln -sf libjemalloc.so.2 ${pkg_dir}${install_user_local_path}/lib/libjemalloc.so ln -sf libjemalloc.so.2 ${pkg_dir}${install_user_local_path}/lib/libjemalloc.so
fi fi
if [ -f ${compile_dir}/build/lib/libjemalloc.a ]; then # if [ -f ${compile_dir}/build/lib/libjemalloc.a ]; then
cp ${compile_dir}/build/lib/libjemalloc.a ${pkg_dir}${install_user_local_path}/lib/ # cp ${compile_dir}/build/lib/libjemalloc.a ${pkg_dir}${install_user_local_path}/lib/
fi # fi
if [ -f ${compile_dir}/build/lib/libjemalloc_pic.a ]; then # if [ -f ${compile_dir}/build/lib/libjemalloc_pic.a ]; then
cp ${compile_dir}/build/lib/libjemalloc_pic.a ${pkg_dir}${install_user_local_path}/lib/ # cp ${compile_dir}/build/lib/libjemalloc_pic.a ${pkg_dir}${install_user_local_path}/lib/
fi # fi
if [ -f ${compile_dir}/build/lib/pkgconfig/jemalloc.pc ]; then if [ -f ${compile_dir}/build/lib/pkgconfig/jemalloc.pc ]; then
cp ${compile_dir}/build/lib/pkgconfig/jemalloc.pc ${pkg_dir}${install_user_local_path}/lib/pkgconfig/ cp ${compile_dir}/build/lib/pkgconfig/jemalloc.pc ${pkg_dir}${install_user_local_path}/lib/pkgconfig/
fi fi

View File

@ -123,12 +123,12 @@ if [ -f %{_compiledir}/build/bin/jemalloc-config ]; then
cp %{_compiledir}/build/lib/libjemalloc.so.2 %{buildroot}%{homepath}/jemalloc/lib cp %{_compiledir}/build/lib/libjemalloc.so.2 %{buildroot}%{homepath}/jemalloc/lib
ln -sf libjemalloc.so.2 %{buildroot}%{homepath}/jemalloc/lib/libjemalloc.so ln -sf libjemalloc.so.2 %{buildroot}%{homepath}/jemalloc/lib/libjemalloc.so
fi fi
if [ -f %{_compiledir}/build/lib/libjemalloc.a ]; then # if [ -f %{_compiledir}/build/lib/libjemalloc.a ]; then
cp %{_compiledir}/build/lib/libjemalloc.a %{buildroot}%{homepath}/jemalloc/lib # cp %{_compiledir}/build/lib/libjemalloc.a %{buildroot}%{homepath}/jemalloc/lib
fi # fi
if [ -f %{_compiledir}/build/lib/libjemalloc_pic.a ]; then # if [ -f %{_compiledir}/build/lib/libjemalloc_pic.a ]; then
cp %{_compiledir}/build/lib/libjemalloc_pic.a %{buildroot}%{homepath}/jemalloc/lib # cp %{_compiledir}/build/lib/libjemalloc_pic.a %{buildroot}%{homepath}/jemalloc/lib
fi # fi
if [ -f %{_compiledir}/build/lib/pkgconfig/jemalloc.pc ]; then if [ -f %{_compiledir}/build/lib/pkgconfig/jemalloc.pc ]; then
cp %{_compiledir}/build/lib/pkgconfig/jemalloc.pc %{buildroot}%{homepath}/jemalloc/lib/pkgconfig cp %{_compiledir}/build/lib/pkgconfig/jemalloc.pc %{buildroot}%{homepath}/jemalloc/lib/pkgconfig
fi fi

View File

@ -315,13 +315,13 @@ function install_jemalloc() {
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.so.2 /usr/local/lib ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.so.2 /usr/local/lib
${csudo}ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so ${csudo}ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so
${csudo}/usr/bin/install -c -d /usr/local/lib ${csudo}/usr/bin/install -c -d /usr/local/lib
if [ -f ${jemalloc_dir}/lib/libjemalloc.a ]; then # if [ -f ${jemalloc_dir}/lib/libjemalloc.a ]; then
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.a /usr/local/lib # ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.a /usr/local/lib
fi # fi
if [ -f ${jemalloc_dir}/lib/libjemalloc_pic.a ]; then # if [ -f ${jemalloc_dir}/lib/libjemalloc_pic.a ]; then
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc_pic.a /usr/local/lib # ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc_pic.a /usr/local/lib
fi # fi
if [ -f ${jemalloc_dir}/lib/libjemalloc_pic.a ]; then if [ -f ${jemalloc_dir}/lib/pkgconfig/jemalloc.pc ]; then
${csudo}/usr/bin/install -c -d /usr/local/lib/pkgconfig ${csudo}/usr/bin/install -c -d /usr/local/lib/pkgconfig
${csudo}/usr/bin/install -c -m 644 ${jemalloc_dir}/lib/pkgconfig/jemalloc.pc /usr/local/lib/pkgconfig ${csudo}/usr/bin/install -c -m 644 ${jemalloc_dir}/lib/pkgconfig/jemalloc.pc /usr/local/lib/pkgconfig
fi fi

View File

@ -214,13 +214,13 @@ function install_jemalloc() {
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.so.2 /usr/local/lib ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.so.2 /usr/local/lib
${csudo}ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so ${csudo}ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so
${csudo}/usr/bin/install -c -d /usr/local/lib ${csudo}/usr/bin/install -c -d /usr/local/lib
if [ -f ${jemalloc_dir}/lib/libjemalloc.a ]; then # if [ -f ${jemalloc_dir}/lib/libjemalloc.a ]; then
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.a /usr/local/lib # ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.a /usr/local/lib
fi # fi
if [ -f ${jemalloc_dir}/lib/libjemalloc_pic.a ]; then # if [ -f ${jemalloc_dir}/lib/libjemalloc_pic.a ]; then
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc_pic.a /usr/local/lib # ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc_pic.a /usr/local/lib
fi # fi
if [ -f ${jemalloc_dir}/lib/libjemalloc_pic.a ]; then if [ -f ${jemalloc_dir}/lib/pkgconfig/jemalloc.pc ]; then
${csudo}/usr/bin/install -c -d /usr/local/lib/pkgconfig ${csudo}/usr/bin/install -c -d /usr/local/lib/pkgconfig
${csudo}/usr/bin/install -c -m 644 ${jemalloc_dir}/lib/pkgconfig/jemalloc.pc /usr/local/lib/pkgconfig ${csudo}/usr/bin/install -c -m 644 ${jemalloc_dir}/lib/pkgconfig/jemalloc.pc /usr/local/lib/pkgconfig
fi fi

View File

@ -241,10 +241,10 @@ function install_jemalloc() {
${csudo}/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.so.2 /usr/local/lib ${csudo}/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.so.2 /usr/local/lib
${csudo}ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so > /dev/null 2>&1 ${csudo}ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so > /dev/null 2>&1
${csudo}/usr/bin/install -c -d /usr/local/lib ${csudo}/usr/bin/install -c -d /usr/local/lib
[ -f ${binary_dir}/build/lib/libjemalloc.a ] && # [ -f ${binary_dir}/build/lib/libjemalloc.a ] &&
${csudo}/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.a /usr/local/lib # ${csudo}/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.a /usr/local/lib
[ -f ${binary_dir}/build/lib/libjemalloc_pic.a ] && # [ -f ${binary_dir}/build/lib/libjemalloc_pic.a ] &&
${csudo}/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc_pic.a /usr/local/lib # ${csudo}/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc_pic.a /usr/local/lib
if [ -f "${binary_dir}/build/lib/pkgconfig/jemalloc.pc" ]; then if [ -f "${binary_dir}/build/lib/pkgconfig/jemalloc.pc" ]; then
${csudo}/usr/bin/install -c -d /usr/local/lib/pkgconfig ${csudo}/usr/bin/install -c -d /usr/local/lib/pkgconfig
${csudo}/usr/bin/install -c -m 644 ${binary_dir}/build/lib/pkgconfig/jemalloc.pc \ ${csudo}/usr/bin/install -c -m 644 ${binary_dir}/build/lib/pkgconfig/jemalloc.pc \

View File

@ -118,12 +118,12 @@ if [ -f ${build_dir}/bin/jemalloc-config ]; then
cp ${build_dir}/lib/libjemalloc.so.2 ${install_dir}/jemalloc/lib cp ${build_dir}/lib/libjemalloc.so.2 ${install_dir}/jemalloc/lib
ln -sf libjemalloc.so.2 ${install_dir}/jemalloc/lib/libjemalloc.so ln -sf libjemalloc.so.2 ${install_dir}/jemalloc/lib/libjemalloc.so
fi fi
if [ -f ${build_dir}/lib/libjemalloc.a ]; then # if [ -f ${build_dir}/lib/libjemalloc.a ]; then
cp ${build_dir}/lib/libjemalloc.a ${install_dir}/jemalloc/lib # cp ${build_dir}/lib/libjemalloc.a ${install_dir}/jemalloc/lib
fi # fi
if [ -f ${build_dir}/lib/libjemalloc_pic.a ]; then # if [ -f ${build_dir}/lib/libjemalloc_pic.a ]; then
cp ${build_dir}/lib/libjemalloc_pic.a ${install_dir}/jemalloc/lib # cp ${build_dir}/lib/libjemalloc_pic.a ${install_dir}/jemalloc/lib
fi # fi
if [ -f ${build_dir}/lib/pkgconfig/jemalloc.pc ]; then if [ -f ${build_dir}/lib/pkgconfig/jemalloc.pc ]; then
cp ${build_dir}/lib/pkgconfig/jemalloc.pc ${install_dir}/jemalloc/lib/pkgconfig cp ${build_dir}/lib/pkgconfig/jemalloc.pc ${install_dir}/jemalloc/lib/pkgconfig
fi fi

View File

@ -217,12 +217,12 @@ if [ -f ${build_dir}/bin/jemalloc-config ]; then
cp ${build_dir}/lib/libjemalloc.so.2 ${install_dir}/jemalloc/lib cp ${build_dir}/lib/libjemalloc.so.2 ${install_dir}/jemalloc/lib
ln -sf libjemalloc.so.2 ${install_dir}/jemalloc/lib/libjemalloc.so ln -sf libjemalloc.so.2 ${install_dir}/jemalloc/lib/libjemalloc.so
fi fi
if [ -f ${build_dir}/lib/libjemalloc.a ]; then # if [ -f ${build_dir}/lib/libjemalloc.a ]; then
cp ${build_dir}/lib/libjemalloc.a ${install_dir}/jemalloc/lib # cp ${build_dir}/lib/libjemalloc.a ${install_dir}/jemalloc/lib
fi # fi
if [ -f ${build_dir}/lib/libjemalloc_pic.a ]; then # if [ -f ${build_dir}/lib/libjemalloc_pic.a ]; then
cp ${build_dir}/lib/libjemalloc_pic.a ${install_dir}/jemalloc/lib # cp ${build_dir}/lib/libjemalloc_pic.a ${install_dir}/jemalloc/lib
fi # fi
if [ -f ${build_dir}/lib/pkgconfig/jemalloc.pc ]; then if [ -f ${build_dir}/lib/pkgconfig/jemalloc.pc ]; then
cp ${build_dir}/lib/pkgconfig/jemalloc.pc ${install_dir}/jemalloc/lib/pkgconfig cp ${build_dir}/lib/pkgconfig/jemalloc.pc ${install_dir}/jemalloc/lib/pkgconfig
fi fi

View File

@ -169,13 +169,13 @@ function install_jemalloc() {
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.so.2 /usr/local/lib ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.so.2 /usr/local/lib
${csudo}ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so ${csudo}ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so
${csudo}/usr/bin/install -c -d /usr/local/lib ${csudo}/usr/bin/install -c -d /usr/local/lib
if [ -f ${jemalloc_dir}/lib/libjemalloc.a ]; then # if [ -f ${jemalloc_dir}/lib/libjemalloc.a ]; then
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.a /usr/local/lib # ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.a /usr/local/lib
fi # fi
if [ -f ${jemalloc_dir}/lib/libjemalloc_pic.a ]; then # if [ -f ${jemalloc_dir}/lib/libjemalloc_pic.a ]; then
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc_pic.a /usr/local/lib # ${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc_pic.a /usr/local/lib
fi # fi
if [ -f ${jemalloc_dir}/lib/libjemalloc_pic.a ]; then if [ -f ${jemalloc_dir}/lib/pkgconfig/jemalloc.pc ]; then
${csudo}/usr/bin/install -c -d /usr/local/lib/pkgconfig ${csudo}/usr/bin/install -c -d /usr/local/lib/pkgconfig
${csudo}/usr/bin/install -c -m 644 ${jemalloc_dir}/lib/pkgconfig/jemalloc.pc /usr/local/lib/pkgconfig ${csudo}/usr/bin/install -c -m 644 ${jemalloc_dir}/lib/pkgconfig/jemalloc.pc /usr/local/lib/pkgconfig
fi fi

View File

@ -1553,17 +1553,8 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
} }
} }
char cTmp = 0; // for print tmp if is raw
if (info->isRawLine) {
cTmp = tmp[len];
tmp[len] = '\0';
}
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, numLines:%d, protocol:%d, len:%d, sql:%s", info->id, uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, numLines:%d, protocol:%d, len:%d, sql:%s", info->id,
info->isRawLine, numLines, info->protocol, len, tmp); info->isRawLine, numLines, info->protocol, len, info->isRawLine ? "rawdata" : tmp);
if (info->isRawLine) {
tmp[len] = cTmp;
}
if (info->protocol == TSDB_SML_LINE_PROTOCOL) { if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
if (info->dataFormat) { if (info->dataFormat) {
@ -1584,8 +1575,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE; code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tmp[len] = '\0'; uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, info->isRawLine ? "rawdata" : tmp);
uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, tmp);
return code; return code;
} }
if (info->reRun) { if (info->reRun) {

View File

@ -280,7 +280,7 @@ static const SSysDbTableSchema topicSchema[] = {
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "schema", .bytes = TSDB_SHOW_SCHEMA_JSON_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "schema", .bytes = TSDB_MAX_BINARY_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "meta", .bytes = 4 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "meta", .bytes = 4 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "type", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "type", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
}; };

View File

@ -1590,18 +1590,35 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int
int32_t nRows = payloadSize / rowSize; int32_t nRows = payloadSize / rowSize;
ASSERT(nRows >= 1); ASSERT(nRows >= 1);
// the true value must be less than the value of nRows int32_t numVarCols = 0;
int32_t additional = 0; int32_t numFixCols = 0;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pCol->info.type)) { if (IS_VAR_DATA_TYPE(pCol->info.type)) {
additional += nRows * sizeof(int32_t); ++numVarCols;
} else { } else {
additional += BitmapLen(nRows); ++numFixCols;
} }
} }
int32_t newRows = (payloadSize - additional) / rowSize; // find the data payload whose size is greater than payloadSize
int result = -1;
int start = 1;
int end = nRows;
while (start <= end) {
int mid = start + (end - start) / 2;
//data size + var data type columns offset + fixed data type columns bitmap len
int midSize = rowSize * mid + numVarCols * sizeof(int32_t) * mid + numFixCols * BitmapLen(mid);
if (midSize > payloadSize) {
result = mid;
end = mid - 1;
} else {
start = mid + 1;
}
}
int32_t newRows = (result != -1) ? result - 1 : nRows;
// the true value must be less than the value of nRows
ASSERT(newRows <= nRows && newRows >= 1); ASSERT(newRows <= nRows && newRows >= 1);
return newRows; return newRows;

View File

@ -14,6 +14,7 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h" #include "tglobal.h"
#include "tconfig.h" #include "tconfig.h"
#include "tgrant.h" #include "tgrant.h"
@ -73,6 +74,7 @@ int64_t tsVndCommitMaxIntervalMs = 600 * 1000;
// mnode // mnode
int64_t tsMndSdbWriteDelta = 200; int64_t tsMndSdbWriteDelta = 200;
int64_t tsMndLogRetention = 2000; int64_t tsMndLogRetention = 2000;
int8_t tsGrant = 1;
bool tsMndSkipGrant = false; bool tsMndSkipGrant = false;
// monitor // monitor
@ -1525,3 +1527,5 @@ void taosSetAllDebugFlag(int32_t flag, bool rewrite) {
taosSetDebugFlag(&metaDebugFlag, "metaDebugFlag", flag, rewrite); taosSetDebugFlag(&metaDebugFlag, "metaDebugFlag", flag, rewrite);
uInfo("all debug flag are set to %d", flag); uInfo("all debug flag are set to %d", flag);
} }
int8_t taosGranted() { return atomic_load_8(&tsGrant); }

View File

@ -714,6 +714,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -23,10 +23,6 @@ static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
SEpSet epSet = {0}; SEpSet epSet = {0};
dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet); dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);
if (epSet.numOfEps == 1) {
return;
}
const int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); const int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->pCont = rpcMallocCont(contLen); pMsg->pCont = rpcMallocCont(contLen);
if (pMsg->pCont == NULL) { if (pMsg->pCont == NULL) {

View File

@ -105,6 +105,7 @@ typedef struct {
SHashObj *dnodeHash; SHashObj *dnodeHash;
TdThreadRwlock lock; TdThreadRwlock lock;
SMsgCb msgCb; SMsgCb msgCb;
bool validMnodeEps;
} SDnodeData; } SDnodeData;
typedef struct { typedef struct {

View File

@ -288,6 +288,8 @@ static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
taosHashPut(pData->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); taosHashPut(pData->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
} }
pData->validMnodeEps = true;
dmPrintEps(pData); dmPrintEps(pData);
} }
@ -348,6 +350,7 @@ void dmRotateMnodeEpSet(SDnodeData *pData) {
} }
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) { void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) {
if(!pData->validMnodeEps) return;
dmGetMnodeEpSet(pData, pEpSet); dmGetMnodeEpSet(pData, pEpSet);
dTrace("msg is redirected, handle:%p num:%d use:%d", pMsg->info.handle, pEpSet->numOfEps, pEpSet->inUse); dTrace("msg is redirected, handle:%p num:%d use:%d", pMsg->info.handle, pEpSet->numOfEps, pEpSet->inUse);
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {

View File

@ -3159,8 +3159,14 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
int32_t numOfRows = buildSysDbColsInfo(pBlock, pShow->db, pShow->filterTb);
int32_t numOfRows = 0;
if (!pShow->sysDbRsp) {
numOfRows = buildSysDbColsInfo(pBlock, pShow->db, pShow->filterTb);
mDebug("mndRetrieveStbCol get system table cols, rows:%d, db:%s", numOfRows, pShow->db); mDebug("mndRetrieveStbCol get system table cols, rows:%d, db:%s", numOfRows, pShow->db);
pShow->sysDbRsp = true;
}
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
if (strlen(pShow->db) > 0) { if (strlen(pShow->db) > 0) {
pDb = mndAcquireDb(pMnode, pShow->db); pDb = mndAcquireDb(pMnode, pShow->db);

View File

@ -912,12 +912,14 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pTopic->createTime, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; char *sql = taosMemoryMalloc(strlen(pTopic->sql) + VARSTR_HEADER_SIZE);
STR_TO_VARSTR(sql, pTopic->sql); STR_TO_VARSTR(sql, pTopic->sql);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)sql, false); colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
taosMemoryFree(sql);
char *schemaJson = taosMemoryMalloc(TSDB_SHOW_SCHEMA_JSON_LEN + VARSTR_HEADER_SIZE); char *schemaJson = taosMemoryMalloc(TSDB_SHOW_SCHEMA_JSON_LEN + VARSTR_HEADER_SIZE);
if(pTopic->subType == TOPIC_SUB_TYPE__COLUMN){ if(pTopic->subType == TOPIC_SUB_TYPE__COLUMN){
schemaToJson(pTopic->schema.pSchema, pTopic->schema.nCols, schemaJson); schemaToJson(pTopic->schema.pSchema, pTopic->schema.nCols, schemaJson);

View File

@ -229,6 +229,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg);
// tq-stream // tq-stream

View File

@ -264,7 +264,7 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq*
tFormatOffset(buf1, TSDB_OFFSET_LEN, &pRsp->reqOffset); tFormatOffset(buf1, TSDB_OFFSET_LEN, &pRsp->reqOffset);
tFormatOffset(buf2, TSDB_OFFSET_LEN, &pRsp->rspOffset); tFormatOffset(buf2, TSDB_OFFSET_LEN, &pRsp->rspOffset);
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId, tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId,
pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
return 0; return 0;
@ -421,6 +421,35 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
return 0; return 0;
} }
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
taosWLockLatch(&pTq->lock);
if (taosHashGetSize(pTq->pPushMgr) > 0) {
void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
while (pIter) {
STqHandle* pHandle = *(STqHandle**)pIter;
tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
if (ASSERT(pHandle->msg != NULL)) {
tqError("pHandle->msg should not be null");
break;
}else{
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info};
tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
}
pIter = taosHashIterate(pTq->pPushMgr, pIter);
}
taosHashClear(pTq->pPushMgr);
}
taosWUnLockLatch(&pTq->lock);
return 0;
}
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq req = {0}; SMqPollReq req = {0};
int code = 0; int code = 0;

View File

@ -17,35 +17,16 @@
#include "vnd.h" #include "vnd.h"
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); if (taosHashGetSize(pTq->pPushMgr) <= 0) {
return 0;
taosWLockLatch(&pTq->lock); }
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME_PUSH};
if (taosHashGetSize(pTq->pPushMgr) > 0) { msg.pCont = rpcMallocCont(sizeof(SMsgHead));
void* pIter = taosHashIterate(pTq->pPushMgr, NULL); msg.contLen = sizeof(SMsgHead);
SMsgHead *pHead = msg.pCont;
while (pIter) { pHead->vgId = TD_VID(pTq->pVnode);
STqHandle* pHandle = *(STqHandle**)pIter; pHead->contLen = msg.contLen;
tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
if (ASSERT(pHandle->msg != NULL)) {
tqError("pHandle->msg should not be null");
break;
}else{
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info};
tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
}
pIter = taosHashIterate(pTq->pPushMgr, pIter);
}
taosHashClear(pTq->pPushMgr);
}
// unlock
taosWUnLockLatch(&pTq->lock);
return 0; return 0;
} }

View File

@ -168,7 +168,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset); code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if(code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) { if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
goto end; goto end;
} }
@ -176,13 +176,17 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) { if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
// lock // lock
taosWLockLatch(&pTq->lock); taosWLockLatch(&pTq->lock);
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
if (pOffset->version >= ver ||
dataRsp.rspOffset.version >= ver) { // check if there are data again to avoid lost data
code = tqRegisterPushHandle(pTq, pHandle, pMsg); code = tqRegisterPushHandle(pTq, pHandle, pMsg);
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
tDeleteMqDataRsp(&dataRsp); goto end;
return code; } else {
taosWUnLockLatch(&pTq->lock);
}
} }
// NOTE: this pHandle->consumerId may have been changed already.
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
end : { end : {
@ -192,9 +196,8 @@ end : {
" code:%d", " code:%d",
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
tDeleteMqDataRsp(&dataRsp); tDeleteMqDataRsp(&dataRsp);
}
return code; return code;
}
} }
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,

View File

@ -691,6 +691,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
.colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)}; .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)};
if (!pLastCol) { if (!pLastCol) {
pLastCol = &noneCol; pLastCol = &noneCol;
reallocVarData(&pLastCol->colVal);
} }
taosArraySet(pLastArray, idxKey->idx, pLastCol); taosArraySet(pLastArray, idxKey->idx, pLastCol);
@ -2848,15 +2849,17 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal}; *pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { if (IS_VAR_DATA_TYPE(pColVal->type) /*&& pColVal->value.nData > 0*/) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
if (pCol->colVal.value.pData == NULL) { if (pCol->colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
if (pColVal->value.nData > 0) {
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
} }
}
if (!COL_VAL_IS_VALUE(pColVal)) { if (!COL_VAL_IS_VALUE(pColVal)) {
if (!setNoneCol) { if (!setNoneCol) {
@ -3016,15 +3019,17 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal}; *pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { if (IS_VAR_DATA_TYPE(pColVal->type) /*&& pColVal->value.nData > 0*/) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
if (pCol->colVal.value.pData == NULL) { if (pCol->colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
if (pColVal->value.nData > 0) {
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
} }
}
/*if (COL_VAL_IS_NONE(pColVal)) { /*if (COL_VAL_IS_NONE(pColVal)) {
if (!setNoneCol) { if (!setNoneCol) {

View File

@ -238,6 +238,10 @@ static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
tEndDecode(pCoder); tEndDecode(pCoder);
_exit: _exit:
if (code) {
vError("vgId:%d, failed to preprocess submit request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code),
pMsg->msgType);
}
tDecoderClear(pCoder); tDecoderClear(pCoder);
return code; return code;
} }
@ -297,7 +301,7 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
_exit: _exit:
if (code) { if (code) {
vError("vgId%d failed to preprocess write request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code), vError("vgId:%d, failed to preprocess write request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code),
pMsg->msgType); pMsg->msgType);
} }
return code; return code;
@ -505,7 +509,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing"); vTrace("message in vnode query queue is processing");
if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME) && !syncIsReadyForRead(pVnode->sync)) { if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && !syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno); vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0; return 0;
} }
@ -526,6 +530,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_VND_TMQ_CONSUME: case TDMT_VND_TMQ_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_CONSUME_PUSH:
return tqProcessPollPush(pVnode->pTq, pMsg);
default: default:
vError("unknown msg type:%d in query queue", pMsg->msgType); vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
@ -559,8 +565,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return vnodeGetTableCfg(pVnode, pMsg, true); return vnodeGetTableCfg(pVnode, pMsg, true);
case TDMT_VND_BATCH_META: case TDMT_VND_BATCH_META:
return vnodeGetBatchMeta(pVnode, pMsg); return vnodeGetBatchMeta(pVnode, pMsg);
case TDMT_VND_TMQ_CONSUME: // case TDMT_VND_TMQ_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg); // return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_WALINFO: case TDMT_VND_TMQ_VG_WALINFO:
return tqProcessVgWalInfoReq(pVnode->pTq, pMsg); return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:

View File

@ -950,8 +950,8 @@ int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput)
int32_t ctgGenerateVgList(SCatalog* pCtg, SHashObj* vgHash, SArray** pList); int32_t ctgGenerateVgList(SCatalog* pCtg, SHashObj* vgHash, SArray** pList);
void ctgFreeJob(void* job); void ctgFreeJob(void* job);
void ctgFreeHandleImpl(SCatalog* pCtg); void ctgFreeHandleImpl(SCatalog* pCtg);
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup); int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SEpSet* pMgmtEps, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup);
int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx, int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SEpSet* pMgmgEpSet, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx,
char* dbFName, SArray* pNames, bool update); char* dbFName, SArray* pNames, bool update);
int32_t ctgGetVgIdsFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, char* dbFName, const char* pTbs[], int32_t tbNum, int32_t ctgGetVgIdsFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, char* dbFName, const char* pTbs[], int32_t tbNum,
int32_t* vgId); int32_t* vgId);

View File

@ -568,7 +568,7 @@ int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, pTableName, pVgroup)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pConn ? &pConn->mgmtEps : NULL, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, pTableName, pVgroup));
_return: _return:
@ -629,7 +629,7 @@ int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInf
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, pVgroup)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, NULL, dbCache->vgCache.vgInfo, pTableName, pVgroup));
ctgRUnlockVgInfo(dbCache); ctgRUnlockVgInfo(dbCache);

View File

@ -1112,7 +1112,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out; SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, pName, &vgInfo)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, &pConn->mgmtEps, pOut->dbVgroup, pName, &vgInfo));
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
@ -1132,7 +1132,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
if (NULL != dbCache) { if (NULL != dbCache) {
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, &pConn->mgmtEps, dbCache->vgCache.vgInfo, pName, &vgInfo));
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
@ -1282,7 +1282,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out; SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, pName, &vgInfo)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, &pConn->mgmtEps, pOut->dbVgroup, pName, &vgInfo));
ctgTaskDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); ctgTaskDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
@ -1302,7 +1302,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
if (NULL != dbCache) { if (NULL != dbCache) {
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, &pConn->mgmtEps, dbCache->vgCache.vgInfo, pName, &vgInfo));
ctgTaskDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); ctgTaskDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
@ -1501,7 +1501,7 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, ctx->pName, (SVgroupInfo*)pTask->res)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, &pTask->pJob->conn.mgmtEps, pOut->dbVgroup, ctx->pName, (SVgroupInfo*)pTask->res));
CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pOut->dbId, pOut->dbVgroup, false)); CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pOut->dbId, pOut->dbVgroup, false));
pOut->dbVgroup = NULL; pOut->dbVgroup = NULL;
@ -1536,7 +1536,7 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out; SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;
STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx); STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx);
CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, tReq, pOut->dbVgroup, ctx, pMsgCtx->target, pReq->pTables, true)); CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, &pTask->pJob->conn.mgmtEps, tReq, pOut->dbVgroup, ctx, pMsgCtx->target, pReq->pTables, true));
CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, pMsgCtx->target, pOut->dbId, pOut->dbVgroup, false)); CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, pMsgCtx->target, pOut->dbId, pOut->dbVgroup, false));
pOut->dbVgroup = NULL; pOut->dbVgroup = NULL;
@ -1799,7 +1799,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq* tReq, int32_t flag, SName* pName, int
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
if (dbCache) { if (dbCache) {
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, &pConn->mgmtEps, dbCache->vgCache.vgInfo, pName, &vgInfo));
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
@ -1948,7 +1948,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask* pTask) {
if (NULL == pTask->res) { if (NULL == pTask->res) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pCtx->pName, (SVgroupInfo*)pTask->res)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, &pConn->mgmtEps, dbCache->vgCache.vgInfo, pCtx->pName, (SVgroupInfo*)pTask->res));
ctgReleaseVgInfoToCache(pCtg, dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL; dbCache = NULL;
@ -1996,7 +1996,7 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask* pTask) {
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_ERR_JRET( CTG_ERR_JRET(
ctgGetVgInfosFromHashValue(pCtg, &tReq, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false)); ctgGetVgInfosFromHashValue(pCtg, &pConn->mgmtEps, &tReq, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false));
ctgReleaseVgInfoToCache(pCtg, dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL; dbCache = NULL;
@ -2375,7 +2375,7 @@ int32_t ctgGetTbCfgCb(SCtgTask* pTask) {
SDBVgInfo* pDb = (SDBVgInfo*)pTask->subRes.res; SDBVgInfo* pDb = (SDBVgInfo*)pTask->subRes.res;
pCtx->pVgInfo = taosMemoryCalloc(1, sizeof(SVgroupInfo)); pCtx->pVgInfo = taosMemoryCalloc(1, sizeof(SVgroupInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pTask->pJob->pCtg, pDb, pCtx->pName, pCtx->pVgInfo)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pTask->pJob->pCtg, &pTask->pJob->conn.mgmtEps, pDb, pCtx->pName, pCtx->pVgInfo));
} }
CTG_RET(ctgLaunchGetTbCfgTask(pTask)); CTG_RET(ctgLaunchGetTbCfgTask(pTask));
@ -2395,7 +2395,7 @@ int32_t ctgGetTbTagCb(SCtgTask* pTask) {
if (NULL == pCtx->pVgInfo) { if (NULL == pCtx->pVgInfo) {
pCtx->pVgInfo = taosMemoryCalloc(1, sizeof(SVgroupInfo)); pCtx->pVgInfo = taosMemoryCalloc(1, sizeof(SVgroupInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pTask->pJob->pCtg, pDb, pCtx->pName, pCtx->pVgInfo)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pTask->pJob->pCtg, &pTask->pJob->conn.mgmtEps, pDb, pCtx->pName, pCtx->pVgInfo));
} }
CTG_RET(ctgLaunchGetTbTagTask(pTask)); CTG_RET(ctgLaunchGetTbTagTask(pTask));

View File

@ -2989,7 +2989,7 @@ int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVg
} }
*pVgroup = taosMemoryCalloc(1, sizeof(SVgroupInfo)); *pVgroup = taosMemoryCalloc(1, sizeof(SVgroupInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, *pVgroup)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, NULL, dbCache->vgCache.vgInfo, pTableName, *pVgroup));
_return: _return:

View File

@ -969,7 +969,7 @@ int32_t ctgHashValueComp(void const* lp, void const* rp) {
return 0; return 0;
} }
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup) { int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SEpSet* pMgmtEps, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup) {
int32_t code = 0; int32_t code = 0;
CTG_ERR_RET(ctgMakeVgArray(dbInfo)); CTG_ERR_RET(ctgMakeVgArray(dbInfo));
@ -977,6 +977,14 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName
char db[TSDB_DB_FNAME_LEN] = {0}; char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db); tNameGetFullDbName(pTableName, db);
if (IS_SYS_DBNAME(pTableName->dbname)) {
pVgroup->vgId = MNODE_HANDLE;
if (pMgmtEps) {
memcpy(&pVgroup->epSet, pMgmtEps, sizeof(pVgroup->epSet));
}
return TSDB_CODE_SUCCESS;
}
if (vgNum <= 0) { if (vgNum <= 0) {
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", db, vgNum); ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", db, vgNum);
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED); CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
@ -1020,23 +1028,53 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx, int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SEpSet* pMgmgEpSet, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx,
char* dbFName, SArray* pNames, bool update) { char* dbFName, SArray* pNames, bool update) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SMetaRes res = {0}; SMetaRes res = {0};
SVgroupInfo* vgInfo = NULL;
CTG_ERR_RET(ctgMakeVgArray(dbInfo)); CTG_ERR_RET(ctgMakeVgArray(dbInfo));
int32_t tbNum = taosArrayGetSize(pNames);
char* pSep = strchr(dbFName, '.');
if (pSep && IS_SYS_DBNAME(pSep + 1)) {
SVgroupInfo mgmtInfo = {0};
mgmtInfo.vgId = MNODE_HANDLE;
if (pMgmgEpSet) {
memcpy(&mgmtInfo.epSet, pMgmgEpSet, sizeof(mgmtInfo.epSet));
}
for (int32_t i = 0; i < tbNum; ++i) {
vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo));
if (NULL == vgInfo) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(vgInfo, &mgmtInfo, sizeof(mgmtInfo));
ctgDebug("Got tb hash vgroup, vgId:%d, epNum %d, current %s port %d", vgInfo->vgId, vgInfo->epSet.numOfEps,
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
if (update) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, tReq->msgIdx);
SMetaRes* pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i);
pRes->pRes = vgInfo;
} else {
res.pRes = vgInfo;
taosArrayPush(pCtx->pResList, &res);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t vgNum = taosArrayGetSize(dbInfo->vgArray); int32_t vgNum = taosArrayGetSize(dbInfo->vgArray);
if (vgNum <= 0) { if (vgNum <= 0) {
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum); ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
SVgroupInfo* vgInfo = NULL;
int32_t tbNum = taosArrayGetSize(pNames);
if (1 == vgNum) { if (1 == vgNum) {
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo)); vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo));

View File

@ -461,7 +461,11 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
uint32_t defaultPgsz = 0; uint32_t defaultPgsz = 0;
uint32_t defaultBufsz = 0; uint32_t defaultBufsz = 0;
getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); code = getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
if (code) {
qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize);
return code;
}
if (!osTempSpaceAvailable()) { if (!osTempSpaceAvailable()) {
code = TSDB_CODE_NO_DISKSPACE; code = TSDB_CODE_NO_DISKSPACE;

View File

@ -174,6 +174,7 @@ void destroyEWindowOperatorInfo(void* param) {
colDataDestroy(&pInfo->twAggSup.timeWindowData); colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSup);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }

View File

@ -922,8 +922,13 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) { int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
*defaultPgsz = 4096; *defaultPgsz = 4096;
uint32_t last = *defaultPgsz;
while (*defaultPgsz < rowSize * 4) { while (*defaultPgsz < rowSize * 4) {
*defaultPgsz <<= 1u; *defaultPgsz <<= 1u;
if (*defaultPgsz < last) {
return TSDB_CODE_INVALID_PARA;
}
last = *defaultPgsz;
} }
// The default buffer for each operator in query is 10MB. // The default buffer for each operator in query is 10MB.
@ -932,6 +937,9 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
*defaultBufsz = 4096 * 2560; *defaultBufsz = 4096 * 2560;
if ((*defaultBufsz) <= (*defaultPgsz)) { if ((*defaultBufsz) <= (*defaultPgsz)) {
(*defaultBufsz) = (*defaultPgsz) * 4; (*defaultBufsz) = (*defaultPgsz) * 4;
if (*defaultBufsz < ((int64_t)(*defaultPgsz)) * 4) {
return TSDB_CODE_INVALID_PARA;
}
} }
return 0; return 0;

View File

@ -871,7 +871,12 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
uint32_t defaultBufsz = 0; uint32_t defaultBufsz = 0;
pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc); pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc);
getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz); int32_t code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
pTaskInfo->code = code;
goto _error;
}
if (!osTempSpaceAvailable()) { if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_DISKSPACE; terrno = TSDB_CODE_NO_DISKSPACE;
@ -880,7 +885,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
goto _error; goto _error;
} }
int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir); code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
pTaskInfo->code = code; pTaskInfo->code = code;

View File

@ -2660,7 +2660,7 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
} else { } else {
pDiffInfo->ignoreNegative = false; pDiffInfo->ignoreNegative = false;
} }
pDiffInfo->includeNull = false; pDiffInfo->includeNull = true;
pDiffInfo->firstOutput = false; pDiffInfo->firstOutput = false;
return true; return true;
} }

View File

@ -210,7 +210,7 @@ SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, ST
SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable, SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable,
bool withMeta, SNode* pWhere); bool withMeta, SNode* pWhere);
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName); SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName);
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pCGroupId, SToken* pTopicName); SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId, SToken* pTopicName);
SNode* createAlterLocalStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue); SNode* createAlterLocalStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue);
SNode* createDefaultExplainOptions(SAstCreateContext* pCxt); SNode* createDefaultExplainOptions(SAstCreateContext* pCxt);
SNode* setExplainVerbose(SAstCreateContext* pCxt, SNode* pOptions, const SToken* pVal); SNode* setExplainVerbose(SAstCreateContext* pCxt, SNode* pOptions, const SToken* pVal);

View File

@ -210,6 +210,15 @@ static bool checkTopicName(SAstCreateContext* pCxt, SToken* pTopicName) {
return true; return true;
} }
static bool checkCGroupName(SAstCreateContext* pCxt, SToken* pCGroup) {
trimEscape(pCGroup);
if (pCGroup->n >= TSDB_CGROUP_LEN) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pCGroup->z);
return false;
}
return true;
}
static bool checkStreamName(SAstCreateContext* pCxt, SToken* pStreamName) { static bool checkStreamName(SAstCreateContext* pCxt, SToken* pStreamName) {
trimEscape(pStreamName); trimEscape(pStreamName);
if (pStreamName->n >= TSDB_STREAM_NAME_LEN) { if (pStreamName->n >= TSDB_STREAM_NAME_LEN) {
@ -1751,12 +1760,15 @@ SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken
return (SNode*)pStmt; return (SNode*)pStmt;
} }
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pCGroupId, SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId,
SToken* pTopicName) { SToken* pTopicName) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
if (!checkTopicName(pCxt, pTopicName)) { if (!checkTopicName(pCxt, pTopicName)) {
return NULL; return NULL;
} }
if (!checkCGroupName(pCxt, pCGroupId)) {
return NULL;
}
SDropCGroupStmt* pStmt = (SDropCGroupStmt*)nodesMakeNode(QUERY_NODE_DROP_CGROUP_STMT); SDropCGroupStmt* pStmt = (SDropCGroupStmt*)nodesMakeNode(QUERY_NODE_DROP_CGROUP_STMT);
CHECK_OUT_OF_MEM(pStmt); CHECK_OUT_OF_MEM(pStmt);
pStmt->ignoreNotExists = ignoreNotExists; pStmt->ignoreNotExists = ignoreNotExists;

View File

@ -311,6 +311,9 @@ static int32_t calcConstDelete(SCalcConstContext* pCxt, SDeleteStmt* pDelete) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = calcConstStmtCondition(pCxt, &pDelete->pWhere, &pDelete->deleteZeroRows); code = calcConstStmtCondition(pCxt, &pDelete->pWhere, &pDelete->deleteZeroRows);
} }
if (code == TSDB_CODE_SUCCESS && pDelete->timeRange.skey > pDelete->timeRange.ekey) {
pDelete->deleteZeroRows = true;
}
return code; return code;
} }
@ -465,6 +468,9 @@ static bool isEmptyResultQuery(SNode* pStmt) {
} }
break; break;
} }
case QUERY_NODE_DELETE_STMT:
isEmptyResult = ((SDeleteStmt*)pStmt)->deleteZeroRows;
break;
default: default:
break; break;
} }

View File

@ -1384,13 +1384,33 @@ static bool isCountStar(SFunctionNode* pFunc) {
return (QUERY_NODE_COLUMN == nodeType(pPara) && 0 == strcmp(((SColumnNode*)pPara)->colName, "*")); return (QUERY_NODE_COLUMN == nodeType(pPara) && 0 == strcmp(((SColumnNode*)pPara)->colName, "*"));
} }
static int32_t rewriteCountStarAsCount1(STranslateContext* pCxt, SFunctionNode* pCount) {
int32_t code = TSDB_CODE_SUCCESS;
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == pVal) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pVal->node.resType.type = TSDB_DATA_TYPE_INT;
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
const int32_t val = 1;
nodesSetValueNodeValue(pVal, (void*)&val);
pVal->translate = true;
nodesListErase(pCount->pParameterList, nodesListGetCell(pCount->pParameterList, 0));
code = nodesListAppend(pCount->pParameterList, (SNode*)pVal);
return code;
}
// count(*) is rewritten as count(ts) for scannning optimization // count(*) is rewritten as count(ts) for scannning optimization
static int32_t rewriteCountStar(STranslateContext* pCxt, SFunctionNode* pCount) { static int32_t rewriteCountStar(STranslateContext* pCxt, SFunctionNode* pCount) {
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pCount->pParameterList, 0); SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pCount->pParameterList, 0);
STableNode* pTable = NULL; STableNode* pTable = NULL;
int32_t code = findTable(pCxt, ('\0' == pCol->tableAlias[0] ? NULL : pCol->tableAlias), &pTable); int32_t code = findTable(pCxt, ('\0' == pCol->tableAlias[0] ? NULL : pCol->tableAlias), &pTable);
if (TSDB_CODE_SUCCESS == code && QUERY_NODE_REAL_TABLE == nodeType(pTable)) { if (TSDB_CODE_SUCCESS == code) {
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
setColumnInfoBySchema((SRealTableNode*)pTable, ((SRealTableNode*)pTable)->pMeta->schema, -1, pCol); setColumnInfoBySchema((SRealTableNode*)pTable, ((SRealTableNode*)pTable)->pMeta->schema, -1, pCol);
} else {
code = rewriteCountStarAsCount1(pCxt, pCount);
}
} }
return code; return code;
} }
@ -3055,13 +3075,13 @@ static bool needFill(SNode* pNode) {
static int32_t convertFillValue(STranslateContext* pCxt, SDataType dt, SNodeList* pValues, int32_t index) { static int32_t convertFillValue(STranslateContext* pCxt, SDataType dt, SNodeList* pValues, int32_t index) {
SListCell* pCell = nodesListGetCell(pValues, index); SListCell* pCell = nodesListGetCell(pValues, index);
if (dataTypeEqual(&dt, &((SExprNode*)pCell->pNode)->resType)) { if (dataTypeEqual(&dt, &((SExprNode*)pCell->pNode)->resType) && (QUERY_NODE_VALUE == nodeType(pCell->pNode))) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SNode* pCaseFunc = NULL; SNode* pCastFunc = NULL;
int32_t code = createCastFunc(pCxt, pCell->pNode, dt, &pCaseFunc); int32_t code = createCastFunc(pCxt, pCell->pNode, dt, &pCastFunc);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = scalarCalculateConstants(pCaseFunc, &pCell->pNode); code = scalarCalculateConstants(pCastFunc, &pCell->pNode);
} }
if (TSDB_CODE_SUCCESS == code && QUERY_NODE_VALUE != nodeType(pCell->pNode)) { if (TSDB_CODE_SUCCESS == code && QUERY_NODE_VALUE != nodeType(pCell->pNode)) {
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Fill value can only accept constant"); code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Fill value can only accept constant");

View File

@ -7,15 +7,9 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
IF (TD_GRANT) TARGET_LINK_LIBRARIES(qworker
TARGET_LINK_LIBRARIES(qworker
PRIVATE os util transport nodes planner qcom executor index grant
)
ELSE ()
TARGET_LINK_LIBRARIES(qworker
PRIVATE os util transport nodes planner qcom executor index PRIVATE os util transport nodes planner qcom executor index
) )
ENDIF()
if(${BUILD_TEST}) if(${BUILD_TEST})
ADD_SUBDIRECTORY(test) ADD_SUBDIRECTORY(test)

View File

@ -366,7 +366,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg.msgMask)) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) { if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg.msgMask)) && !taosGranted()) {
QW_ELOG("query failed cause of grant expired, msgMask:%d", msg.msgMask); QW_ELOG("query failed cause of grant expired, msgMask:%d", msg.msgMask);
tFreeSSubQueryMsg(&msg); tFreeSSubQueryMsg(&msg);
QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED); QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);

View File

@ -824,6 +824,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
break; break;
} }
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
queryStop = false;
} while (true); } while (true);
input.code = code; input.code = code;

View File

@ -89,6 +89,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
} }
pMeta->streamBackend = streamBackendInit(streamPath); pMeta->streamBackend = streamBackendInit(streamPath);
if (pMeta->streamBackend == NULL) {
goto _err;
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
taosMemoryFree(streamPath); taosMemoryFree(streamPath);

View File

@ -8,20 +8,9 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
# bloomFilterTest # bloomFilterTest
ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp") ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
#TARGET_LINK_LIBRARIES( TARGET_LINK_LIBRARIES(streamUpdateTest
# streamUpdateTest
# PUBLIC os util common gtest gtest_main stream executor
#)
IF (TD_GRANT)
TARGET_LINK_LIBRARIES(streamUpdateTest
PUBLIC os util common gtest gtest_main stream executor index grant
)
ELSE ()
TARGET_LINK_LIBRARIES(streamUpdateTest
PUBLIC os util common gtest gtest_main stream executor index PUBLIC os util common gtest gtest_main stream executor index
) )
ENDIF()
TARGET_INCLUDE_DIRECTORIES( TARGET_INCLUDE_DIRECTORIES(
streamUpdateTest streamUpdateTest

View File

@ -64,7 +64,7 @@ else()
endif() endif()
IF (JEMALLOC_ENABLED) IF (JEMALLOC_ENABLED)
target_link_libraries(os PUBLIC -ljemalloc) target_link_libraries(os PUBLIC -L${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
ENDIF () ENDIF ()
if(${BUILD_TEST}) if(${BUILD_TEST})

View File

@ -449,6 +449,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb2.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb2.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb3.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb3.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py
@ -823,6 +824,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tagFilter.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tagFilter.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/projectionDesc.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/projectionDesc.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts_3398.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts_3398.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts_3405.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts_3423.py -N 3 -n 3
,,n,system-test,python3 ./test.py -f 2-query/queryQnode.py ,,n,system-test,python3 ./test.py -f 2-query/queryQnode.py
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode1mnode.py ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode1mnode.py
@ -932,7 +935,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mavg.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mavg.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sample.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sample.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py -Q 2 #,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/unique.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/unique.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stateduration.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stateduration.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_stateduration.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_stateduration.py -Q 2
@ -1027,7 +1030,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mavg.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mavg.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sample.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sample.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py -Q 3 #,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/unique.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/unique.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stateduration.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stateduration.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_stateduration.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_stateduration.py -Q 3
@ -1123,7 +1126,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mavg.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mavg.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sample.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sample.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/cast.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/cast.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py -Q 4 #,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/unique.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/unique.py -Q 4
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stateduration.py -Q 4 #,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stateduration.py -Q 4
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_stateduration.py -Q 4 #,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_stateduration.py -Q 4

View File

@ -380,10 +380,10 @@ if $row != 8 then
endi endi
sql select diff(k) from tm0 sql select diff(k) from tm0
if $row != 3 then if $row != 4 then
return -1 return -1
endi endi
if $data20 != -1 then if $data20 != NULL then
return -1 return -1
endi endi

View File

@ -0,0 +1,25 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database if not exists test
sql use test
sql create table t1 (ts timestamp, c2 int)
sql insert into t1 values(now, 1)
sql delete from t1 where ts is null
sql delete from t1 where ts < now
sql select ts from t1 order by ts asc
print ----------rows: $rows
if $rows != 0 then
return -1
endi
sql select ts from t1 order by ts desc
print ----------rows: $rows
if $rows != 0 then
return -1
endi

View File

@ -131,4 +131,8 @@ print $rows
if $rows != 9 then if $rows != 9 then
return -1 return -1
endi endi
print =========================== td-24781
sql select DISTINCT (`precision`) from `information_schema`.`ins_databases` PARTITION BY `precision`
#system sh/exec.sh -n dnode1 -s stop -x SIGINT #system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -42,4 +42,12 @@ endi
if $data00 != 4 then if $data00 != 4 then
return -1 return -1
endi endi
sql create table ctcount(ts timestamp, f int);
sql insert into ctcount(ts) values(now)(now+1s);
sql select count(*) from (select f from ctcount);
print $data00
if $data00 != 2 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -116,7 +116,7 @@ class TDTestCase:
tdSql.checkRows(1000) tdSql.checkRows(1000)
tdLog.info("================= step3") tdLog.info("================= step3")
tdSql.execute('drop database test') tdSql.execute('drop database test')
for i in range(50): for i in range(10):
tdSql.execute("create database test%d duration 1" %(i)) tdSql.execute("create database test%d duration 1" %(i))
tdSql.execute("use test%d" %(i)) tdSql.execute("use test%d" %(i))
tdSql.execute("create table tb (ts timestamp,i int)") tdSql.execute("create table tb (ts timestamp,i int)")

View File

@ -0,0 +1,267 @@
from itertools import product
import taos
from taos.tmq import *
from util.cases import *
from util.common import *
from util.log import *
from util.sql import *
from util.sqlset import *
class TDTestCase:
"""This test case is used to veirfy the show create stable/table command for
the different user privilege(TS-3469)
"""
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
# init the tdsql
tdSql.init(conn.cursor())
self.setsql = TDSetSql()
# user info
self.username = 'test'
self.password = 'test'
# db info
self.dbname = "user_privilege_show"
self.stbname = 'stb'
self.common_tbname = "tb"
self.ctbname_list = ["ct1", "ct2"]
self.column_dict = {
'ts': 'timestamp',
'col1': 'float',
'col2': 'int',
}
self.tag_dict = {
'ctbname': 'binary(10)'
}
# privilege check scenario info
self.privilege_check_dic = {}
self.senario_type = ["stable", "table", "ctable"]
self.priv_type = ["read", "write", "all", "none"]
# stable senarios
# include the show stable xxx command test senarios and expect result, true as have privilege, false as no privilege
# the list element is (db_privilege, stable_privilege, expect_res)
st_senarios_list = []
for senario in list(product(self.priv_type, repeat=2)):
expect_res = True
if senario == ("write", "write") or senario == ("none", "none") or senario == ("none", "write") or senario == ("write", "none"):
expect_res = False
st_senarios_list.append(senario + (expect_res,))
# self.privilege_check_dic["stable"] = st_senarios_list
# table senarios
# the list element is (db_privilege, table_privilege, expect_res)
self.privilege_check_dic["table"] = st_senarios_list
# child table senarios
# the list element is (db_privilege, stable_privilege, ctable_privilege, expect_res)
ct_senarios_list = []
for senario in list(product(self.priv_type, repeat=3)):
expect_res = True
if senario[2] == "write" or (senario[2] == "none" and senario[1] == "write") or (senario[2] == "none" and senario[1] == "none" and senario[0] == "write"):
expect_res = False
ct_senarios_list.append(senario + (expect_res,))
self.privilege_check_dic["ctable"] = ct_senarios_list
def prepare_data(self, senario_type):
"""Create the db and data for test
"""
if senario_type == "stable":
# db name
self.dbname = self.dbname + '_stable'
elif senario_type == "table":
# db name
self.dbname = self.dbname + '_table'
else:
# db name
self.dbname = self.dbname + '_ctable'
# create datebase
tdSql.execute(f"create database {self.dbname}")
tdLog.debug("sql:" + f"create database {self.dbname}")
tdSql.execute(f"use {self.dbname}")
tdLog.debug("sql:" + f"use {self.dbname}")
# create tables
if "_stable" in self.dbname:
# create stable
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
tdLog.debug("Create stable {} successfully".format(self.stbname))
elif "_table" in self.dbname:
# create common table
tdSql.execute(f"create table {self.common_tbname}(ts timestamp, col1 float, col2 int)")
tdLog.debug("sql:" + f"create table {self.common_tbname}(ts timestamp, col1 float, col2 int)")
else:
# create stable and child table
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
tdLog.debug("Create stable {} successfully".format(self.stbname))
for ctname in self.ctbname_list:
tdSql.execute(f"create table {ctname} using {self.stbname} tags('{ctname}')")
tdLog.debug("sql:" + f"create table {ctname} using {self.stbname} tags('{ctname}')")
def create_user(self):
"""Create the user for test
"""
tdSql.execute(f'create user {self.username} pass "{self.password}"')
tdLog.debug("sql:" + f'create user {self.username} pass "{self.password}"')
def grant_privilege(self, username, privilege, privilege_obj, ctable_include=False, tag_condition=None):
"""Add the privilege for the user
"""
try:
if ctable_include and tag_condition:
tdSql.execute(f'grant {privilege} on {self.dbname}.{privilege_obj} with {tag_condition} to {username}')
tdLog.debug("sql:" + f'grant {privilege} on {self.dbname}.{privilege_obj} with {tag_condition} to {username}')
else:
tdSql.execute(f'grant {privilege} on {self.dbname}.{privilege_obj} to {username}')
tdLog.debug("sql:" + f'grant {privilege} on {self.dbname}.{privilege_obj} to {username}')
except Exception as ex:
tdLog.exit(ex)
def remove_privilege(self, username, privilege, privilege_obj, ctable_include=False, tag_condition=None):
"""Remove the privilege for the user
"""
try:
if ctable_include and tag_condition:
tdSql.execute(f'revoke {privilege} on {self.dbname}.{privilege_obj} with {tag_condition} from {username}')
tdLog.debug("sql:" + f'revoke {privilege} on {self.dbname}.{privilege_obj} with {tag_condition} from {username}')
else:
tdSql.execute(f'revoke {privilege} on {self.dbname}.{privilege_obj} from {username}')
tdLog.debug("sql:" + f'revoke {privilege} on {self.dbname}.{privilege_obj} from {username}')
except Exception as ex:
tdLog.exit(ex)
def run(self):
"""Currently, the test case can't be executed for all of the privilege combinations cause
the table privilege isn't finished by dev team, only left one senario:
db read privilege for user and show create table command; will udpate the test case once
the table privilege function is finished
"""
self.create_user()
# temp solution only for the db read privilege verification
self.prepare_data("table")
# grant db read privilege
self.grant_privilege(self.username, "read", "*")
# create the taos connection with -utest -ptest
testconn = taos.connect(user=self.username, password=self.password)
testconn.execute("use %s;" % self.dbname)
# show the user privileges
res = testconn.query("select * from information_schema.ins_user_privileges;")
tdLog.debug("Current information_schema.ins_user_privileges values: {}".format(res.fetch_all()))
# query execution
sql = "show create table " + self.common_tbname + ";"
tdLog.debug("sql: %s" % sql)
res = testconn.query(sql)
# query result
tdLog.debug("sql res:" + str(res.fetch_all()))
# remove the privilege
self.remove_privilege(self.username, "read", "*")
# clear env
testconn.close()
tdSql.execute(f"drop database {self.dbname}")
"""
for senario_type in self.privilege_check_dic.keys():
tdLog.debug(f"---------check the {senario_type} privilege----------")
self.prepare_data(senario_type)
for senario in self.privilege_check_dic[senario_type]:
# grant db privilege
if senario[0] != "none":
self.grant_privilege(self.username, senario[0], "*")
# grant stable privilege
if senario[1] != "none":
self.grant_privilege(self.username, senario[1], self.stbname if senario_type == "stable" or senario_type == "ctable" else self.common_tbname)
if senario_type == "stable" or senario_type == "table":
tdLog.debug(f"check the db privilege: {senario[0]}, (s)table privilege: {senario[1]}")
else:
if senario[2] != "none":
# grant child table privilege
self.grant_privilege(self.username, senario[2], self.stbname, True, "ctbname='ct1'")
tdLog.debug(f"check the db privilege: {senario[0]}, (s)table privilege: {senario[1]}, ctable privilege: {senario[2]}")
testconn = taos.connect(user=self.username, password=self.password)
tdLog.debug("Create taos connection with user: {}, password: {}".format(self.username, self.password))
try:
testconn.execute("use %s;" % self.dbname)
except BaseException as ex:
if (senario_type in ["stable", "table"] and senario[0] == "none" and senario[1] == "none") or (senario_type == "ctable" and senario[0] == "none" and senario[1] == "none" and senario[2] == "none"):
continue
else:
tdLog.exit(ex)
# query privileges for user
res = testconn.query("select * from information_schema.ins_user_privileges;")
tdLog.debug("Current information_schema.ins_user_privileges values: {}".format(res.fetch_all()))
if senario_type == "stable" or senario_type == "table":
sql = "show create " + (("stable " + self.stbname) if senario_type == "stable" else (f"table {self.dbname}." + self.common_tbname + ";"))
if senario[2]:
tdLog.debug("sql: %s" % sql)
tdLog.debug(f"expected result: {senario[2]}")
res = testconn.query(sql)
tdLog.debug("sql res:" + res.fetch_all())
else:
exception_flag = False
try:
tdLog.debug("sql: %s" % sql)
tdLog.debug(f"expected result: {senario[2]}")
res = testconn.query(sql)
tdLog.debug("sql res:" + res.fetch_all())
except BaseException:
exception_flag = True
caller = inspect.getframeinfo(inspect.stack()[1][0])
tdLog.debug(f"{caller.filename}({caller.lineno}) failed to check the db privilege {senario[0]} and stable privilege {senario[1]} failed as expected")
if not exception_flag:
pass
# tdLog.exit("The expected exception isn't occurred")
else:
sql = f"show create table {self.dbname}.{self.ctbname_list[0]};"
if senario[3]:
tdLog.debug("sql: %s" % sql)
tdLog.debug(f"expected result: {senario[3]}")
res = testconn.query(sql)
tdLog.debug(res.fetch_all())
else:
exception_flag = False
try:
tdLog.debug("sql: %s" % sql)
tdLog.debug(f"expected result: {senario[3]}")
res = testconn.query(sql)
tdLog.debug(res.fetch_all())
except BaseException:
exception_flag = True
caller = inspect.getframeinfo(inspect.stack()[1][0])
tdLog.debug(f"{caller.filename}({caller.lineno}) failed to check the db privilege {senario[0]}, stable privilege {senario[1]} and ctable privilege {senario[2]} failed as expected")
if not exception_flag:
pass
# tdLog.exit("The expected exception isn't occurred")
# remove db privilege
if senario[0] != "none":
self.remove_privilege(self.username, senario[0], "*")
# remove stable privilege
if senario[1] != "none":
self.remove_privilege(self.username, senario[1], self.stbname if senario_type == "stable" else self.common_tbname)
# remove child table privilege
if senario_type == "ctable":
if senario[2] != "none":
self.remove_privilege(self.username, senario[2], self.ctbname_list[0], True, "ctbname='ct1'")
testconn.close()
# remove the database
tdSql.execute(f"drop database {self.dbname}")
# reset the dbname
self.dbname = "user_privilege_show"
"""
def stop(self):
# remove the user
tdSql.execute(f'drop user {self.username}')
# close the connection
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -52,6 +52,95 @@ class TDTestCase:
tdSql.checkData(0, 0, None) tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None) tdSql.checkData(1, 0, None)
# handle null values
tdSql.execute(
f"create table {dbname}.ntb_null(ts timestamp,c1 int,c2 double,c3 float,c4 bool)")
tdSql.execute(f"insert into {dbname}.ntb_null values(now, 1, 1.0, NULL, NULL)")
tdSql.execute(f"insert into {dbname}.ntb_null values(now, NULL, 2.0, 2.0, NULL)")
tdSql.execute(f"insert into {dbname}.ntb_null values(now, 2, NULL, NULL, false)")
tdSql.execute(f"insert into {dbname}.ntb_null values(now, NULL, 1.0, 1.0, NULL)")
tdSql.execute(f"insert into {dbname}.ntb_null values(now, NULL, 3.0, NULL, true)")
tdSql.execute(f"insert into {dbname}.ntb_null values(now, 3, NULL, 3.0, NULL)")
tdSql.execute(f"insert into {dbname}.ntb_null values(now, 1, NULL, NULL, true)")
tdSql.query(f"select diff(c1) from {dbname}.ntb_null")
tdSql.checkRows(6)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, 1)
tdSql.checkData(2, 0, None)
tdSql.checkData(3, 0, None)
tdSql.checkData(4, 0, 1)
tdSql.checkData(5, 0, -2)
tdSql.query(f"select diff(c2) from {dbname}.ntb_null")
tdSql.checkRows(6)
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, None)
tdSql.checkData(2, 0, -1)
tdSql.checkData(3, 0, 2)
tdSql.checkData(4, 0, None)
tdSql.checkData(5, 0, None)
tdSql.query(f"select diff(c3) from {dbname}.ntb_null")
tdSql.checkRows(6)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None)
tdSql.checkData(2, 0, -1)
tdSql.checkData(3, 0, None)
tdSql.checkData(4, 0, 2)
tdSql.checkData(5, 0, None)
tdSql.query(f"select diff(c4) from {dbname}.ntb_null")
tdSql.checkRows(6)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None)
tdSql.checkData(2, 0, None)
tdSql.checkData(3, 0, 1)
tdSql.checkData(4, 0, None)
tdSql.checkData(5, 0, 0)
tdSql.query(f"select diff(c1),diff(c2),diff(c3),diff(c4) from {dbname}.ntb_null")
tdSql.checkRows(6)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, 1)
tdSql.checkData(2, 0, None)
tdSql.checkData(3, 0, None)
tdSql.checkData(4, 0, 1)
tdSql.checkData(5, 0, -2)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, None)
tdSql.checkData(2, 1, -1)
tdSql.checkData(3, 1, 2)
tdSql.checkData(4, 1, None)
tdSql.checkData(5, 1, None)
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 2, None)
tdSql.checkData(2, 2, -1)
tdSql.checkData(3, 2, None)
tdSql.checkData(4, 2, 2)
tdSql.checkData(5, 2, None)
tdSql.checkData(0, 3, None)
tdSql.checkData(1, 3, None)
tdSql.checkData(2, 3, None)
tdSql.checkData(3, 3, 1)
tdSql.checkData(4, 3, None)
tdSql.checkData(5, 3, 0)
tdSql.query(f"select diff(c1),diff(c2),diff(c3),diff(c4) from {dbname}.ntb_null where c1 is not null")
tdSql.checkRows(3)
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 1)
tdSql.checkData(2, 0, -2)
tdSql.checkData(0, 1, None)
tdSql.checkData(1, 1, None)
tdSql.checkData(2, 1, None)
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 2, None)
tdSql.checkData(2, 2, None)
tdSql.checkData(0, 3, None)
tdSql.checkData(1, 3, None)
tdSql.checkData(2, 3, 1)
tdSql.execute(f'''create table {dbname}.stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, tdSql.execute(f'''create table {dbname}.stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''') col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))''')
tdSql.execute(f"create table {dbname}.stb_1 using {dbname}.stb tags('beijing')") tdSql.execute(f"create table {dbname}.stb_1 using {dbname}.stb tags('beijing')")
@ -103,6 +192,9 @@ class TDTestCase:
tdSql.error(f"select diff(col1,1.23) from {dbname}.stb_1") tdSql.error(f"select diff(col1,1.23) from {dbname}.stb_1")
tdSql.error(f"select diff(col1,-1) from {dbname}.stb_1") tdSql.error(f"select diff(col1,-1) from {dbname}.stb_1")
tdSql.query(f"select ts,diff(col1),ts from {dbname}.stb_1") tdSql.query(f"select ts,diff(col1),ts from {dbname}.stb_1")
tdSql.error(f"select diff(col1, 1),diff(col2) from {dbname}.stb_1")
tdSql.error(f"select diff(col1, 1),diff(col2, 0) from {dbname}.stb_1")
tdSql.error(f"select diff(col1, 1),diff(col2, 1) from {dbname}.stb_1")
tdSql.query(f"select diff(ts) from {dbname}.stb_1") tdSql.query(f"select diff(ts) from {dbname}.stb_1")
tdSql.checkRows(10) tdSql.checkRows(10)

View File

@ -127,22 +127,33 @@ class TDTestCase:
return return
else: else:
tdSql.query(f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}") sql = f"select {col} from {table_expr} {re.sub('limit [0-9]*|offset [0-9]*','',condition)}"
tdSql.query(sql)
offset_val = condition.split("offset")[1].split(" ")[1] if "offset" in condition else 0 offset_val = condition.split("offset")[1].split(" ")[1] if "offset" in condition else 0
pre_result = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None] pre_result = np.array(tdSql.queryResult)[np.array(tdSql.queryResult) != None]
if (platform.system().lower() == 'windows' and pre_result.dtype == 'int32'): if (platform.system().lower() == 'windows' and pre_result.dtype == 'int32'):
pre_result = np.array(pre_result, dtype = 'int64') pre_result = np.array(pre_result, dtype = 'int64')
pre_diff = np.diff(pre_result)[offset_val:] pre_diff = np.diff(pre_result)[offset_val:]
tdSql.query(self.diff_query_form( if len(pre_diff) > 0:
col=col, alias=alias, table_expr=table_expr, condition=condition sql =self.diff_query_form(col=col, alias=alias, table_expr=table_expr, condition=condition)
)) tdSql.query(sql)
j = 0
diff_cnt = len(pre_diff)
for i in range(tdSql.queryRows): for i in range(tdSql.queryRows):
print(f"case in {line}: ", end='') print(f"case in {line}: i={i} j={j} pre_diff[j]={pre_diff[j]} ", end='')
if isinstance(pre_diff[i] , float ): if isinstance(pre_diff[j] , float ):
if j + 1 < diff_cnt:
j += 1
pass pass
else: else:
tdSql.checkData(i, 0, pre_diff[i]) if tdSql.getData(i,0) != None:
tdSql.checkData(i, 0, pre_diff[j])
if j + 1 < diff_cnt:
j += 1
else:
print(f"getData i={i} is None j={j} ")
else:
print("pre_diff len is zero.")
pass pass
@ -354,31 +365,31 @@ class TDTestCase:
tdSql.checkRows(229) tdSql.checkRows(229)
tdSql.checkData(0,0,0) tdSql.checkData(0,0,0)
tdSql.query("select diff(c1) from db.stb1 partition by tbname ") tdSql.query("select diff(c1) from db.stb1 partition by tbname ")
tdSql.checkRows(190) tdSql.checkRows(220)
tdSql.query("select diff(st1+c1) from db.stb1 partition by tbname") tdSql.query("select diff(st1+c1) from db.stb1 partition by tbname")
tdSql.checkRows(190) tdSql.checkRows(220)
tdSql.query("select diff(st1+c1) from db.stb1 partition by tbname") tdSql.query("select diff(st1+c1) from db.stb1 partition by tbname")
tdSql.checkRows(190) tdSql.checkRows(220)
tdSql.query("select diff(st1+c1) from db.stb1 partition by tbname") tdSql.query("select diff(st1+c1) from db.stb1 partition by tbname")
tdSql.checkRows(190) tdSql.checkRows(220)
# bug need fix # bug need fix
tdSql.query("select diff(st1+c1) from db.stb1 partition by tbname") tdSql.query("select diff(st1+c1) from db.stb1 partition by tbname")
tdSql.checkRows(190) tdSql.checkRows(220)
# bug need fix # bug need fix
tdSql.query("select tbname , diff(c1) from db.stb1 partition by tbname") tdSql.query("select tbname , diff(c1) from db.stb1 partition by tbname")
tdSql.checkRows(190) tdSql.checkRows(220)
tdSql.query("select tbname , diff(st1) from db.stb1 partition by tbname") tdSql.query("select tbname , diff(st1) from db.stb1 partition by tbname")
tdSql.checkRows(220) tdSql.checkRows(220)
# partition by tags # partition by tags
tdSql.query("select st1 , diff(c1) from db.stb1 partition by st1") tdSql.query("select st1 , diff(c1) from db.stb1 partition by st1")
tdSql.checkRows(190) tdSql.checkRows(220)
tdSql.query("select diff(c1) from db.stb1 partition by st1") tdSql.query("select diff(c1) from db.stb1 partition by st1")
tdSql.checkRows(190) tdSql.checkRows(220)
def diff_test_run(self) : def diff_test_run(self) :

View File

@ -226,6 +226,7 @@ class TDTestCase:
tdSql.checkData(3, 0, 12) tdSql.checkData(3, 0, 12)
## test fill value with scalar expression ## test fill value with scalar expression
# data types
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1 + 2)") tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1 + 2)")
tdSql.checkRows(4) tdSql.checkRows(4)
tdSql.checkData(0, 0, 3) tdSql.checkData(0, 0, 3)
@ -233,6 +234,49 @@ class TDTestCase:
tdSql.checkData(2, 0, 3) tdSql.checkData(2, 0, 3)
tdSql.checkData(3, 0, 3) tdSql.checkData(3, 0, 3)
tdSql.query(f"select interp(c1) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1 + 2)")
tdSql.checkRows(4)
tdSql.checkData(0, 0, 3)
tdSql.checkData(1, 0, 3)
tdSql.checkData(2, 0, 3)
tdSql.checkData(3, 0, 3)
tdSql.query(f"select interp(c2) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1 + 2)")
tdSql.checkRows(4)
tdSql.checkData(0, 0, 3)
tdSql.checkData(1, 0, 3)
tdSql.checkData(2, 0, 3)
tdSql.checkData(3, 0, 3)
tdSql.query(f"select interp(c3) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1 + 2)")
tdSql.checkRows(4)
tdSql.checkData(0, 0, 3)
tdSql.checkData(1, 0, 3)
tdSql.checkData(2, 0, 3)
tdSql.checkData(3, 0, 3)
tdSql.query(f"select interp(c4) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1 + 2)")
tdSql.checkRows(4)
tdSql.checkData(0, 0, 3.0)
tdSql.checkData(1, 0, 3.0)
tdSql.checkData(2, 0, 3.0)
tdSql.checkData(3, 0, 3.0)
tdSql.query(f"select interp(c5) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1 + 2)")
tdSql.checkRows(4)
tdSql.checkData(0, 0, 3.0)
tdSql.checkData(1, 0, 3.0)
tdSql.checkData(2, 0, 3.0)
tdSql.checkData(3, 0, 3.0)
tdSql.query(f"select interp(c6) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1 + 2)")
tdSql.checkRows(4)
tdSql.checkData(0, 0, True)
tdSql.checkData(1, 0, True)
tdSql.checkData(2, 0, True)
tdSql.checkData(3, 0, True)
# expr types
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1.0 + 2.0)") tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(value, 1.0 + 2.0)")
tdSql.checkRows(4) tdSql.checkRows(4)
tdSql.checkData(0, 0, 3) tdSql.checkData(0, 0, 3)
@ -275,6 +319,7 @@ class TDTestCase:
tdSql.checkData(2, 0, 3) tdSql.checkData(2, 0, 3)
tdSql.checkData(3, 0, 3) tdSql.checkData(3, 0, 3)
tdLog.printNoPrefix("==========step5:fill prev") tdLog.printNoPrefix("==========step5:fill prev")
## {. . .} ## {. . .}

View File

@ -172,7 +172,7 @@ class TDTestCase:
tdSql.checkRows(90) tdSql.checkRows(90)
tdSql.query(f"select c1 , diff(c1 , 0) from {dbname}.stb partition by c1") tdSql.query(f"select c1 , diff(c1 , 0) from {dbname}.stb partition by c1")
tdSql.checkRows(90) tdSql.checkRows(140)
tdSql.query(f"select c1 , csum(c1) from {dbname}.stb partition by c1") tdSql.query(f"select c1 , csum(c1) from {dbname}.stb partition by c1")
tdSql.checkRows(100) tdSql.checkRows(100)

View File

@ -0,0 +1,59 @@
from util.log import *
from util.sql import *
from util.cases import *
from util.sqlset import *
import datetime
class TDTestCase:
"""This test case is used to verify the query performance for the merge scans process of
multiple tables join
"""
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), False)
def run(self):
# test case for https://jira.taosdata.com:18080/browse/TS-3405:
# create db
ret = tdSql.execute("CREATE DATABASE IF NOT EXISTS statistics2 REPLICA {} DURATION 14400m KEEP 5256000m,5256000m,5256000m PRECISION 'ms' MINROWS 100 MAXROWS 4096 COMP 2;".format(self.replicaVar))
tdSql.execute("use statistics2;")
# create stable
ret = tdSql.execute("CREATE STABLE IF NOT EXISTS statistics2.`pg`(`day` timestamp,`lt_3` int,`c3_3` int,`c6_3` int,`c9_3` int,`c12_3` int,`c15_3` int,`c18_3` int,`c21_3` int,`c24_3` int,`c27_3` int,`ge_3` int) TAGS(`vin` binary(32));")
ret = tdSql.execute("CREATE STABLE IF NOT EXISTS statistics2.`b`(`day` timestamp, `month` int) TAGS(`group_path` binary(32),`vin` binary(32));")
ret = tdSql.execute("CREATE STABLE IF NOT EXISTS statistics2.`g`(`day` timestamp,`run_state` tinyint) TAGS(`vin` binary(32));")
# insert the data to table
insertRows = 30000
for i in range(insertRows):
ts = datetime.datetime.strptime('2023-05-01 00:00:00.000', '%Y-%m-%d %H:%M:%S.%f') + datetime.timedelta(seconds=i)
tdSql.execute("insert into d1001 using statistics2.`pg` tags('test') values ('{}', {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}) \
d2001 using statistics2.`b` tags('1#%', 'test') values ('{}', {}) \
d3001 using statistics2.`g` tags('test') values ('{}', {});".format(ts, i, i, i+1, i+2, i+3, i+4, i+5, i+6, i+7, i+8, i+9, ts, 5, ts, 1))
tdLog.info("insert %d rows" % (insertRows))
# execute the sql statements
ret = tdSql.query("SELECT sum(pg.lt_3) es1,sum(pg.c3_3) es2,sum(pg.c6_3) es3,sum(pg.c9_3) es4,sum(pg.c12_3) es5,sum(pg.c15_3) es6,sum(pg.c18_3) es7,sum(pg.c21_3) es8,sum(pg.c24_3) es9,sum(pg.c27_3) es10,sum(pg.ge_3) es11 FROM statistics2.b b,statistics2.pg pg,statistics2.g g WHERE b.`day` = pg.`day` AND b.`day` = g.`day` AND b.vin = pg.vin AND b.vin = g.vin AND b.vin IS NOT NULL AND b.`group_path` LIKE '1#%';")
# check the first query result
if (449985000, 449985000, 450015000, 450045000, 450075000, 450105000, 450135000, 450165000, 450195000, 450225000, 450255000) in tdSql.queryResult:
tdLog.info("first query result is correct")
else:
tdLog.info("first query result is wrong")
ret = tdSql.query("SELECT sum(pg.lt_3) es1, sum(pg.c3_3) es2, sum(pg.c6_3) es3, sum(pg.c9_3) es4, sum(pg.c12_3) es5, sum(pg.c15_3) es6, sum(pg.c18_3) es7, sum(pg.c21_3) es8, sum(pg.c24_3) es9, sum(pg.c27_3) es10, sum(pg.ge_3) es11 FROM (select * from statistics2.b order by day,month) b, (select * from statistics2.pg order by day,lt_3 ) pg, (select * from statistics2.g order by day,run_state) g WHERE b.`day` = pg.`day` AND b.`day` = g.`day` AND b.vin = pg.vin AND b.vin = g.vin AND b.vin IS NOT NULL;")
# check the second query result
if (449985000, 449985000, 450015000, 450045000, 450075000, 450105000, 450135000, 450165000, 450195000, 450225000, 450255000) in tdSql.queryResult:
tdLog.info("second query result is correct")
else:
tdLog.info("second query result is wrong")
def stop(self):
# clear the db
tdSql.execute("drop database if exists statistics2;")
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -0,0 +1,69 @@
from util.log import *
from util.sql import *
from util.cases import *
from util.sqlset import *
import datetime
import random
class TDTestCase:
"""This test case is used to verify last(*) query result is correct when the data
is group by tag for stable
"""
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), False)
def run(self):
# test case for https://jira.taosdata.com:18080/browse/TS-3423:
# create db
ret = tdSql.execute("CREATE DATABASE IF NOT EXISTS ts_3423 REPLICA {} DURATION 14400m KEEP 5256000m,5256000m,5256000m PRECISION 'ms' MINROWS 100 MAXROWS 4096 COMP 2;".format(self.replicaVar))
tdSql.execute("use ts_3423;")
# create stable
ret = tdSql.execute("CREATE STABLE IF NOT EXISTS ts_3423.`st_last`(`ts` timestamp,`n1` int,`n2` float) TAGS(`groupname` binary(32));")
# insert the data to table
insertRows = 10
child_table_num = 10
for i in range(insertRows):
ts = datetime.datetime.strptime('2023-05-01 00:00:00.000', '%Y-%m-%d %H:%M:%S.%f') + datetime.timedelta(seconds=i)
for j in range(child_table_num):
ret = tdSql.execute("insert into {} using ts_3423.`st_last` tags('{}') values ('{}', {}, {})".format("d" + str(j), "group" + str(j), str(ts), str(i+1), random.random()))
tdLog.info("insert %d rows for every child table" % (insertRows))
# cache model list
cache_model = ["none", "last_row", "last_value", "both"]
query_res = []
# execute the sql statements first
ret = tdSql.query("select `cachemodel` from information_schema.ins_databases where name='ts_3423'")
current_cache_model = tdSql.queryResult[0][0]
tdLog.info("query on cache model {}".format(current_cache_model))
ret = tdSql.query("select last(*) from st_last group by groupname;")
# save the results
query_res.append(len(tdSql.queryResult))
# remove the current cache model
cache_model.remove(current_cache_model)
for item in cache_model:
tdSql.execute("alter database ts_3423 cachemodel '{}';".format(item))
# execute the sql statements
ret = tdSql.query("select last(*) from st_last group by groupname;")
tdLog.info("query on cache model {}".format(item))
query_res.append(len(tdSql.queryResult))
# check the result
res = True if query_res.count(child_table_num) == 4 else False
if res:
tdLog.info("query result is correct and same among different cache model")
else:
tdLog.info("query result is wrong")
def stop(self):
# clear the db
tdSql.execute("drop database if exists ts_3423;")
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -45,6 +45,8 @@
#define SHELL_MAX_PKG_NUM 1 * 1024 * 1024 #define SHELL_MAX_PKG_NUM 1 * 1024 * 1024
#define SHELL_MIN_PKG_NUM 1 #define SHELL_MIN_PKG_NUM 1
#define SHELL_DEF_PKG_NUM 100 #define SHELL_DEF_PKG_NUM 100
#define SHELL_FLOAT_WIDTH 20
#define SHELL_DOUBLE_WIDTH 25
typedef struct { typedef struct {
char* hist[SHELL_MAX_HISTORY_SIZE]; char* hist[SHELL_MAX_HISTORY_SIZE];

View File

@ -326,6 +326,7 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
char quotationStr[2]; char quotationStr[2];
quotationStr[0] = '\"'; quotationStr[0] = '\"';
quotationStr[1] = 0; quotationStr[1] = 0;
int32_t width;
int n; int n;
char buf[TSDB_MAX_BYTES_PER_ROW]; char buf[TSDB_MAX_BYTES_PER_ROW];
@ -358,20 +359,27 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
taosFprintfFile(pFile, "%" PRIu64, *((uint64_t *)val)); taosFprintfFile(pFile, "%" PRIu64, *((uint64_t *)val));
break; break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
width = SHELL_FLOAT_WIDTH;
if (tsEnableScience) { if (tsEnableScience) {
taosFprintfFile(pFile, "%e", GET_FLOAT_VAL(val)); taosFprintfFile(pFile, "%*e", width, GET_FLOAT_VAL(val));
} else { } else {
taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val)); n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.5f", width, GET_FLOAT_VAL(val));
if (n > SHELL_FLOAT_WIDTH) {
taosFprintfFile(pFile, "%*e", width, GET_FLOAT_VAL(val));
} else {
taosFprintfFile(pFile, "%s", buf);
}
} }
break; break;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
width = SHELL_DOUBLE_WIDTH;
if (tsEnableScience) { if (tsEnableScience) {
snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9e", 23, GET_DOUBLE_VAL(val)); snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%.9e", GET_DOUBLE_VAL(val));
taosFprintfFile(pFile, "%s", buf); taosFprintfFile(pFile, "%*s", width, buf);
} else { } else {
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", length, GET_DOUBLE_VAL(val)); n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val));
if (n > TMAX(25, length)) { if (n > SHELL_DOUBLE_WIDTH) {
taosFprintfFile(pFile, "%*.15e", length, GET_DOUBLE_VAL(val)); taosFprintfFile(pFile, "%*.15e", width, GET_DOUBLE_VAL(val));
} else { } else {
taosFprintfFile(pFile, "%s", buf); taosFprintfFile(pFile, "%s", buf);
} }
@ -607,7 +615,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
printf("%*e", width, GET_FLOAT_VAL(val)); printf("%*e", width, GET_FLOAT_VAL(val));
} else { } else {
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.5f", width, GET_FLOAT_VAL(val)); n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.5f", width, GET_FLOAT_VAL(val));
if (n > TMAX(20, width)) { if (n > SHELL_FLOAT_WIDTH) {
printf("%*e", width, GET_FLOAT_VAL(val)); printf("%*e", width, GET_FLOAT_VAL(val));
} else { } else {
printf("%s", buf); printf("%s", buf);
@ -620,7 +628,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
printf("%*s", width, buf); printf("%*s", width, buf);
} else { } else {
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val)); n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val));
if (n > TMAX(25, width)) { if (n > SHELL_DOUBLE_WIDTH) {
printf("%*.15e", width, GET_DOUBLE_VAL(val)); printf("%*.15e", width, GET_DOUBLE_VAL(val));
} else { } else {
printf("%s", buf); printf("%s", buf);
@ -757,10 +765,10 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) {
return TMAX(21, width); // '-9223372036854775807' return TMAX(21, width); // '-9223372036854775807'
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
return TMAX(20, width); return TMAX(SHELL_FLOAT_WIDTH, width);
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
return TMAX(25, width); return TMAX(SHELL_DOUBLE_WIDTH, width);
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_GEOMETRY: case TSDB_DATA_TYPE_GEOMETRY:

View File

@ -2,7 +2,7 @@ aux_source_directory(src TSIM_SRC)
add_executable(tsim ${TSIM_SRC}) add_executable(tsim ${TSIM_SRC})
target_link_libraries( target_link_libraries(
tsim tsim
PUBLIC taos_static PUBLIC taos
PUBLIC util PUBLIC util
PUBLIC common PUBLIC common
PUBLIC os PUBLIC os