diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 4b47d56a6c..423169c007 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -40,6 +40,7 @@ def pre_test(){ git reset --hard cd ${WKC} git reset --hard + git clean -fxd ''' script { if (env.CHANGE_TARGET == 'master') { diff --git a/cmake/taosws_CMakeLists.txt.in b/cmake/taosws_CMakeLists.txt.in index 3c1a7f5e73..c50e73144f 100644 --- a/cmake/taosws_CMakeLists.txt.in +++ b/cmake/taosws_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosws-rs ExternalProject_Add(taosws-rs GIT_REPOSITORY https://github.com/taosdata/taosws-rs.git - GIT_TAG 9de599d + GIT_TAG 24b199e SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/docs/zh/01-index.md b/docs/zh/01-index.md index 0db1df0de9..27d4270e4c 100644 --- a/docs/zh/01-index.md +++ b/docs/zh/01-index.md @@ -4,7 +4,7 @@ sidebar_label: 文档首页 slug: / --- -TDengine是一款开源、[高性能](https://www.taosdata.com/fast)、云原生的专为物联网、工业互联网、金融等优化设计的时序数据库(Time-Series Database)。同时它还带有内建的缓存、流式计算、数据订阅等系统功能,能大幅减少系统设计的复杂度,降低研发和运营成本,是一极简的时序数据处理平台。本文档是 TDengine 用户手册,主要是介绍 TDengine 的基本概念、安装、使用、功能、开发接口、运营维护、TDengine 内核设计等等,它主要是面向架构师、开发者与系统管理员的。 +TDengine是一款开源、[高性能](https://www.taosdata.com/fast)、云原生的时序数据库(Time-Series Database, TSDB), 它专为物联网、工业互联网、金融等场景优化设计。同时它还带有内建的缓存、流式计算、数据订阅等系统功能,能大幅减少系统设计的复杂度,降低研发和运营成本,是一极简的时序数据处理平台。本文档是 TDengine 用户手册,主要是介绍 TDengine 的基本概念、安装、使用、功能、开发接口、运营维护、TDengine 内核设计等等,它主要是面向架构师、开发者与系统管理员的。 TDengine 充分利用了时序数据的特点,提出了“一个数据采集点一张表”与“超级表”的概念,设计了创新的存储引擎,让数据的写入、查询和存储效率都得到极大的提升。为正确理解并使用TDengine, 无论如何,请您仔细阅读[基本概念](./concept)一章。 diff --git a/docs/zh/02-intro.md b/docs/zh/02-intro.md index 191e1cbcc2..91f0376d8c 100644 --- a/docs/zh/02-intro.md +++ b/docs/zh/02-intro.md @@ -3,7 +3,7 @@ title: 产品简介 toc_max_heading_level: 2 --- -TDengine 是一款高性能、分布式、支持 SQL 的时序数据库 (Database),其核心代码,包括集群功能全部开源(开源协议,AGPL v3.0)。TDengine 能被广泛运用于物联网、工业互联网、车联网、IT 运维、金融等领域。除核心的时序数据库 (Database) 功能外,TDengine 还提供[缓存](/develop/cache/)、[数据订阅](/develop/subscribe)、[流式计算](/develop/continuous-query)等大数据平台所需要的系列功能,最大程度减少研发和运维的复杂度。 +TDengine 是一款开源、高性能、云原生的时序数据库 (Time-Series Database, TSDB)。TDengine 能被广泛运用于物联网、工业互联网、车联网、IT 运维、金融等领域。除核心的时序数据库功能外,TDengine 还提供[缓存](/develop/cache/)、[数据订阅](/develop/subscribe)、[流式计算](/develop/continuous-query)等功能,是一极简的时序数据处理平台,最大程度的减小系统设计的复杂度,降低研发和运营成本。 本章节介绍TDengine的主要功能、竞争优势、适用场景、与其他数据库的对比测试等等,让大家对TDengine有个整体的了解。 @@ -16,7 +16,7 @@ TDengine的主要功能如下: 3. 支持[各种查询](/develop/query-data),包括聚合查询、嵌套查询、降采样查询、插值等 4. 支持[用户自定义函数](/develop/udf) 5. 支持[缓存](/develop/cache),将每张表的最后一条记录缓存起来,这样无需 Redis -6. 支持[连续查询](/develop/continuous-query)(Continuous Query) +6. 支持[流式计算](/develop/continuous-query)(Stream Processing) 7. 支持[数据订阅](/develop/subscribe),而且可以指定过滤条件 8. 支持[集群](/cluster/),可以通过多节点进行水平扩展,并通过多副本实现高可靠 9. 提供[命令行程序](/reference/taos-shell),便于管理集群,检查系统状态,做即席查询 @@ -33,28 +33,24 @@ TDengine的主要功能如下: 由于 TDengine 充分利用了[时序数据特点](https://www.taosdata.com/blog/2019/07/09/105.html),比如结构化、无需事务、很少删除或更新、写多读少等等,设计了全新的针对时序数据的存储引擎和计算引擎,因此与其他时序数据库相比,TDengine 有以下特点: -- **[高性能](https://www.taosdata.com/fast)**:通过创新的存储引擎设计,无论是数据写入还是查询,TDengine 的性能比通用数据库快 10 倍以上,也远超其他时序数据库,而且存储空间也大为节省。 +- **高性能**:通过创新的存储引擎设计,无论是数据写入还是查询,TDengine 的性能比通用数据库快 10 倍以上,也远超其他时序数据库,存储空间不及通用数据库的1/10。 -- **[分布式](https://www.taosdata.com/scalable)**:通过原生分布式的设计,TDengine 提供了水平扩展的能力,只需要增加节点就能获得更强的数据处理能力,同时通过多副本机制保证了系统的高可用。 +- **云原生**:通过原生分布式的设计,充分利用云平台的优势,TDengine 提供了水平扩展能力,具备弹性、韧性和可观测性,支持k8s部署,可运行在公有云、私有云和混合云上。 -- **[支持 SQL](https://www.taosdata.com/sql-support)**:TDengine 采用 SQL 作为数据查询语言,减少学习和迁移成本,同时提供 SQL 扩展来处理时序数据特有的分析,而且支持方便灵活的 schemaless 数据写入。 +- **极简时序数据平台**:TDengine 内建消息队列、缓存、流式计算等功能,应用无需再集成 Kafka/Redis/HBase/Spark 等软件,大幅降低系统的复杂度,降低应用开发和运营成本。 -- **All in One**:将数据库、消息队列、缓存、流式计算等功能融合一起,应用无需再集成 Kafka/Redis/HBase/Spark 等软件,大幅降低应用开发和维护成本。 +- **分析能力**:支持 SQL,同时为时序数据特有的分析提供SQL扩展。通过超级表、存储计算分离、分区分片、预计算、自定义函数等技术,TDengine 具备强大的分析能力。 -- **零管理**:安装、集群几秒搞定,无任何依赖,不用分库分表,系统运行状态监测能与 Grafana 或其他运维工具无缝集成。 +- **简单易用**:无任何依赖,安装、集群几秒搞定;提供REST以及各种语言连接器,与众多第三方工具无缝集成;提供命令行程序,便于管理和即席查询;提供各种运维工具。 -- **零学习成本**:采用 SQL 查询语言,支持 C/C++、Python、Java、Go、Rust、Node.js、C#、Lua(社区贡献)、PHP(社区贡献) 等多种编程语言,与 MySQL 相似,零学习成本。 - -- **无缝集成**:不用一行代码,即可与 Telegraf、Grafana、Prometheus、EMQX、HiveMQ、StatsD、collectd、icinga、TCollector、Matlab、R 等第三方工具无缝集成。 - -- **互动 Console**: 通过命令行 console,不用编程,执行 SQL 语句就能做即席查询、各种数据库的操作、管理以及集群的维护. +- **核心开源**:TDengine 的核心代码包括集群功能全部开源,截止到2022年8月1日,全球超过 135.9k 个运行实例,GitHub Star 18.7k,Fork 4.4k,社区活跃。 采用 TDengine,可将典型的物联网、车联网、工业互联网大数据平台的总拥有成本大幅降低。表现在几个方面: 1. 由于其超强性能,它能将系统需要的计算资源和存储资源大幅降低 -2. 因为采用 SQL 接口,能与众多第三方软件无缝集成,学习迁移成本大幅下降 -3. 因为其 All In One 的特性,系统复杂度降低,能降研发成本 -4. 因为运维维护简单,运营维护成本能大幅降低 +2. 因为支持 SQL,能与众多第三方软件无缝集成,学习迁移成本大幅下降 +3. 因为是一极简的时序数据平台,系统复杂度、研发和运营成本大幅降低 +4. 因为维护简单,运营维护成本能大幅降低 ## 技术生态 diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 22aac46560..7839859e8b 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -184,6 +184,7 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u int32_t getJsonValueLen(const char* data); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); +int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, int32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, diff --git a/include/common/tgrant.h b/include/common/tgrant.h index 09c6e5378e..6392fcf517 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -22,6 +22,9 @@ extern "C" { #include "os.h" #include "taoserror.h" +#ifdef GRANTS_CFG +#include "tgrantCfg.h" +#endif typedef enum { TSDB_GRANT_ALL, @@ -37,10 +40,38 @@ typedef enum { TSDB_GRANT_CONNS, TSDB_GRANT_STREAMS, TSDB_GRANT_CPU_CORES, + TSDB_GRANT_STABLE, + TSDB_GRANT_TABLE, } EGrantType; int32_t grantCheck(EGrantType grant); +#ifndef GRANTS_CFG +#define GRANTS_SCHEMA static const SSysDbTableSchema grantsSchema[] = { \ + {.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "expire time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "storage(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "accounts", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "cpu cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "speed(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ +} +#define GRANT_CFG_ADD +#define GRANT_CFG_SET +#define GRANT_CFG_GET +#define GRANT_CFG_CHECK +#define GRANT_CFG_SKIP +#define GRANT_CFG_DECLARE +#define GRANT_CFG_EXTERN +#endif + #ifdef __cplusplus } #endif diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index aec8a1f73e..d96a55c74c 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -28,10 +28,10 @@ extern bool gRaftDetailLog; #define SYNC_RESP_TTL_MS 10000000 -#define SYNC_MAX_BATCH_SIZE 500 -#define SYNC_INDEX_BEGIN 0 -#define SYNC_INDEX_INVALID -1 -#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF +#define SYNC_MAX_BATCH_SIZE 1 +#define SYNC_INDEX_BEGIN 0 +#define SYNC_INDEX_INVALID -1 +#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF typedef enum { SYNC_STRATEGY_NO_SNAPSHOT = 0, diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 28c1174dfd..b15be12490 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -406,6 +406,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_GRANT_STORAGE_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0809) #define TSDB_CODE_GRANT_QUERYTIME_LIMITED TAOS_DEF_ERROR_CODE(0, 0x080A) #define TSDB_CODE_GRANT_CPU_LIMITED TAOS_DEF_ERROR_CODE(0, 0x080B) +#define TSDB_CODE_GRANT_STABLE_LIMITED TAOS_DEF_ERROR_CODE(0, 0x080C) +#define TSDB_CODE_GRANT_TABLE_LIMITED TAOS_DEF_ERROR_CODE(0, 0x080D) // sync #define TSDB_CODE_SYN_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0903) diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index e01b113a04..2b5d440a73 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -1,14 +1,24 @@ aux_source_directory(src COMMON_SRC) add_library(common STATIC ${COMMON_SRC}) + +if (DEFINED GRANT_CFG_INCLUDE_DIR) + add_definitions(-DGRANTS_CFG) +endif() + target_include_directories( common PUBLIC "${TD_SOURCE_DIR}/include/common" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -IF(${TD_WINDOWS}) - PRIVATE "${TD_SOURCE_DIR}/contrib/pthread" - PRIVATE "${TD_SOURCE_DIR}/contrib/msvcregex" -ENDIF () + PRIVATE "${GRANT_CFG_INCLUDE_DIR}" ) +IF(${TD_WINDOWS}) + target_include_directories( + common + PRIVATE "${TD_SOURCE_DIR}/contrib/pthread" + PRIVATE "${TD_SOURCE_DIR}/contrib/msvcregex" + ) +ENDIF () + target_link_libraries( common PUBLIC os diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 7d07946dc9..c4a3ce2d6b 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -17,6 +17,7 @@ #include "taos.h" #include "tdef.h" #include "types.h" +#include "tgrant.h" #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) @@ -188,22 +189,7 @@ static const SSysDbTableSchema userUsersSchema[] = { {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, }; -static const SSysDbTableSchema grantsSchema[] = { - {.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "expire time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "storage(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "accounts", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "cpu cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "speed(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, -}; +GRANTS_SCHEMA; static const SSysDbTableSchema vgroupsSchema[] = { {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 7cfc1c0b1d..2a8f80f030 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -118,6 +118,76 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con return 0; } +int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) { + if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + return TSDB_CODE_SUCCESS; + } + + if (pColumnInfoData->varmeta.allocLen >= newSize) { + return TSDB_CODE_SUCCESS; + } + + if (pColumnInfoData->varmeta.allocLen < newSize) { + char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pColumnInfoData->pData = buf; + pColumnInfoData->varmeta.allocLen = newSize; + } + + return TSDB_CODE_SUCCESS; +} + +static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData, int32_t itemLen, int32_t numOfRows) { + ASSERT(pColumnInfoData->info.bytes >= itemLen); + size_t start = 1; + + // the first item + memcpy(pColumnInfoData->pData, pData, itemLen); + + int32_t t = 0; + int32_t count = log(numOfRows)/log(2); + while(t < count) { + int32_t xlen = 1 << t; + memcpy(pColumnInfoData->pData + start * itemLen + pColumnInfoData->varmeta.length, pColumnInfoData->pData, xlen * itemLen); + t += 1; + start += xlen; + } + + // the tail part + if (numOfRows > start) { + memcpy(pColumnInfoData->pData + start * itemLen + currentRow * itemLen, pColumnInfoData->pData, (numOfRows - start) * itemLen); + } + + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + for(int32_t i = 0; i < numOfRows; ++i) { + pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen; + } + + pColumnInfoData->varmeta.length += numOfRows * itemLen; + } +} + +int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows) { + ASSERT(pData != NULL && pColumnInfoData != NULL); + + int32_t len = pColumnInfoData->info.bytes; + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + len = varDataTLen(pData); + if (pColumnInfoData->varmeta.allocLen < (numOfRows + currentRow) * len) { + int32_t code = colDataReserve(pColumnInfoData, (numOfRows + currentRow) * len); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + } + + doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows); + return TSDB_CODE_SUCCESS; +} + static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource, int32_t numOfRow2) { if (numOfRow2 <= 0) return; @@ -1113,15 +1183,12 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { if (IS_VAR_DATA_TYPE(pColumn->info.type)) { pColumn->varmeta.length = 0; - if (pColumn->varmeta.offset > 0) { + if (pColumn->varmeta.offset != NULL) { memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows); } } else { if (pColumn->nullbitmap != NULL) { memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows)); - if (pColumn->pData != NULL) { - memset(pColumn->pData, 0, pColumn->info.bytes * numOfRows); - } } } } @@ -1725,6 +1792,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) } switch (pColInfoData->info.type) { case TSDB_DATA_TYPE_TIMESTAMP: + memset(pBuf, 0, sizeof(pBuf)); formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI); len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf); if (len >= size - 1) return dumpBuf; @@ -1753,6 +1821,26 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var); if (len >= size - 1) return dumpBuf; break; + case TSDB_DATA_TYPE_BOOL: + len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var); + if (len >= size - 1) return dumpBuf; + break; + case TSDB_DATA_TYPE_VARCHAR: { + memset(pBuf, 0, sizeof(pBuf)); + char* pData = colDataGetVarData(pColInfoData, j); + int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData)); + memcpy(pBuf, varDataVal(pData), dataSize); + len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); + if (len >= size - 1) return dumpBuf; + } break; + case TSDB_DATA_TYPE_NCHAR: { + char* pData = colDataGetVarData(pColInfoData, j); + int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData)); + memset(pBuf, 0, sizeof(pBuf)); + taosUcs4ToMbs((TdUcs4 *)varDataVal(pData), dataSize, pBuf); + len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); + if (len >= size - 1) return dumpBuf; + } break; } } len += snprintf(dumpBuf + len, size - len, "\n"); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 6f4a3060ed..41e3917937 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -19,6 +19,9 @@ #include "tconfig.h" #include "tdatablock.h" #include "tlog.h" +#include "tgrant.h" + +GRANT_CFG_DECLARE; SConfig *tsCfg = NULL; @@ -441,6 +444,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 100000, 1) != 0) return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; + GRANT_CFG_ADD; return 0; } @@ -590,7 +594,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } - + GRANT_CFG_GET; return 0; } @@ -603,6 +607,7 @@ void taosLocalCfgForbiddenToChange(char* name, bool* forbidden) { *forbidden = true; return; } + GRANT_CFG_CHECK; *forbidden = false; } diff --git a/source/dnode/mgmt/mgmt_dnode/CMakeLists.txt b/source/dnode/mgmt/mgmt_dnode/CMakeLists.txt index a4268fc9f0..fdd0830a58 100644 --- a/source/dnode/mgmt/mgmt_dnode/CMakeLists.txt +++ b/source/dnode/mgmt/mgmt_dnode/CMakeLists.txt @@ -1,8 +1,12 @@ aux_source_directory(src MGMT_DNODE) add_library(mgmt_dnode STATIC ${MGMT_DNODE}) +if (DEFINED GRANT_CFG_INCLUDE_DIR) + add_definitions(-DGRANTS_CFG) +endif() target_include_directories( mgmt_dnode PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc" + PUBLIC "${GRANT_CFG_INCLUDE_DIR}" ) target_link_libraries( mgmt_dnode node_util diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 27a4056249..dd98816161 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "dmInt.h" #include "systable.h" +#include "tgrant.h" extern SConfig *tsCfg; @@ -223,6 +224,7 @@ int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) { for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) { SConfigItem *pItem = taosArrayGet(tsCfg->array, i); + GRANT_CFG_SKIP; SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++); colDataAppend(pColInfo, i, (const char *)&dnodeId, false); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c003f5a63f..bd1b2db145 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -146,7 +146,8 @@ static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanI static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); -static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex); +static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, + int32_t rowIndex); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); @@ -758,7 +759,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn pReader->cost.blockLoadTime += elapsedTime; int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1; - tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 + tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows, pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr); @@ -1444,9 +1445,9 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI // 2. the direct next point is not an duplicated timestamp if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && pReader->order == TSDB_ORDER_ASC) || (pDumpInfo->rowIndex > 0 && pReader->order == TSDB_ORDER_DESC)) { - int32_t step = pReader->order == TSDB_ORDER_ASC? 1:-1; + int32_t step = pReader->order == TSDB_ORDER_ASC ? 1 : -1; int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; - if (nextKey != key) { // merge is not needed + if (nextKey != key) { // merge is not needed doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); pDumpInfo->rowIndex += step; return TSDB_CODE_SUCCESS; @@ -2134,15 +2135,18 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe int32_t sversion = TSDBROW_SVERSION(pRow); STSchema* pTSchema = NULL; - if (sversion != pReader->pSchema->version) { + if (pReader->pSchema == NULL || sversion != pReader->pSchema->version) { metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema); + if (pReader->pSchema == NULL) { + pReader->pSchema = pTSchema; + } } else { pTSchema = pReader->pSchema; } tRowMergerAdd(pMerger, pRow, pTSchema); - if (sversion != pReader->pSchema->version) { + if (pTSchema != pReader->pSchema) { taosMemoryFree(pTSchema); } } @@ -2230,7 +2234,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc int32_t step = asc ? 1 : -1; pDumpInfo->rowIndex += step; - if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) ||(pDumpInfo->rowIndex >= 0 && !asc)) { + if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) { pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); } @@ -2271,8 +2275,11 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe // updateSchema(pRow, uid, pReader); int32_t sversion = TSDBROW_SVERSION(pRow); STSchema* pTSchema = NULL; - if (sversion != pReader->pSchema->version) { + if (pReader->pSchema == NULL || sversion != pReader->pSchema->version) { metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema); + if (pReader->pSchema == NULL) { + pReader->pSchema = pTSchema; + } } else { pTSchema = pReader->pSchema; } @@ -2282,7 +2289,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe tRowMergerGetRow(&merge, pTSRow); tRowMergerClear(&merge); - if (sversion != pReader->pSchema->version) { + if (pTSchema != pReader->pSchema) { taosMemoryFree(pTSchema); } } @@ -2425,9 +2432,9 @@ int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBloc int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx); int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock); - while(i < numOfOutputCols && j < numOfInputCols) { + while (i < numOfOutputCols && j < numOfInputCols) { SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); - SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j); + SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j); if (pData->cid == pCol->info.colId) { tColDataGetValue(pData, rowIndex, &cv); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 13b45f3164..99173e87d7 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -695,7 +695,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { SSyncInfo syncInfo = { .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST, //.snapshotStrategy = SYNC_STRATEGY_NO_SNAPSHOT, - .batchSize = 10, + .batchSize = 1, .vgId = pVnode->config.vgId, .isStandBy = pVnode->config.standby, .syncCfg = pVnode->config.syncCfg, diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 5b5c6010e8..816309beb1 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -416,6 +416,7 @@ typedef struct SCtgCacheOperation { bool syncOp; tsem_t rspSem; bool stopQueue; + bool unLocked; } SCtgCacheOperation; typedef struct SCtgQNode { diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 06e8216e87..2e8e259151 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -674,7 +674,13 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { tsem_post(&gCtgMgmt.queue.reqSem); if (syncOp) { + if (!operation->unLocked) { + CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); + } tsem_wait(&operation->rspSem); + if (!operation->unLocked) { + CTG_LOCK(CTG_READ, &gCtgMgmt.lock); + } taosMemoryFree(operation); } @@ -1011,6 +1017,7 @@ int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool op->opId = CTG_OP_CLEAR_CACHE; op->syncOp = syncOp; op->stopQueue = stopQueue; + op->unLocked = true; SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg)); if (NULL == msg) { diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index bd3402dc39..8333cb28c0 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -19,7 +19,7 @@ #include "catalogInt.h" extern SCatalogMgmt gCtgMgmt; -SCtgDebug gCTGDebug = {0}; +SCtgDebug gCTGDebug = {.lockEnable = true}; void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) { ASSERT(*(int32_t*)param == 1); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 55bfc88a49..0f97b5c5b1 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -331,7 +331,7 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { SHashObj* pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (NULL == pBatchs) { ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); - CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } pTask->pBatchs = pBatchs; #endif diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index a76b457422..2ff76f937d 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -19,6 +19,7 @@ #include "scheduler.h" #include "tdatablock.h" #include "tglobal.h" +#include "tgrant.h" extern SConfig* tsCfg; @@ -563,7 +564,7 @@ int32_t setLocalVariablesResultIntoDataBlock(SSDataBlock* pBlock) { for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) { SConfigItem* pItem = taosArrayGet(tsCfg->array, i); - + GRANT_CFG_SKIP; char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 577f9772be..f16b485240 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -917,9 +917,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, - SNode* pCondition, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 96c20d6136..ed225e1dcb 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -571,6 +571,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* } *numOfExprs = numOfFuncs + numOfGroupKeys; + if (*numOfExprs == 0) { + return NULL; + } + SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo)); for (int32_t i = 0; i < (*numOfExprs); ++i) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e52cbf40a9..6804a1258c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4171,16 +4171,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; - - STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, - .calTrigger = pSessionNode->window.triggerType}; - - SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; - - pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, - pPhyNode->pConditions, pTaskInfo); + pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) { pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8821dbd5a1..a1b00fb1fb 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -405,9 +405,15 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int data = (char*)p; } - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - colDataAppend(pColInfoData, i, data, - (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); + bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)); + if (isNullVal) { + colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows); + } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) { + colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows); + } else { // todo opt for json tag + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + colDataAppend(pColInfoData, i, data, false); + } } if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL && diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 26dd772292..e371e6d9cf 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -152,7 +152,7 @@ SSDataBlock* loadNextDataBlock(void* param) { void applyScalarFunction(SSDataBlock* pBlock, void* param) { SOperatorInfo* pOperator = param; SSortOperatorInfo* pSort = pOperator->info; - if (pOperator->exprSupp.pExprInfo != NULL) { + if (pOperator->exprSupp.pExprInfo != NULL && pOperator->exprSupp.numOfExprs > 0) { int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 802e1f2306..a14f554cf5 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2439,12 +2439,14 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); + colDataDestroy(&pInfo->twAggSup.timeWindowData); + + cleanupAggSup(&pInfo->aggSup); + cleanupGroupResInfo(&pInfo->groupResInfo); taosMemoryFreeClear(param); } -SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, - STimeWindowAggSupp* pTwAggSupp, SNode* pCondition, +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo) { SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -2455,6 +2457,10 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); + SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc); + int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -2462,16 +2468,19 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo initBasicInfo(&pInfo->binfo, pResBlock); - pInfo->twAggSup = *pTwAggSupp; + pInfo->twAggSup.waterMark = pSessionNode->window.watermark; + pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType; + pInfo->gap = pSessionNode->gap; + initResultRowInfo(&pInfo->binfo.resultRowInfo); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->tsSlotId = tsSlotId; - pInfo->gap = gap; - pInfo->binfo.pRes = pResBlock; + pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; + pInfo->binfo.pRes = pResBlock; pInfo->winSup.prevTs = INT64_MIN; - pInfo->reptScan = false; - pInfo->pCondition = pCondition; + pInfo->reptScan = false; + pInfo->pCondition = pSessionNode->window.node.pConditions; + pOperator->name = "SessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION; pOperator->blocking = true; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index e54bc9eb4c..456a0bea85 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -81,10 +81,8 @@ static int32_t addNamespace(STranslateContext* pCxt, void* pTable) { SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel); taosArrayPush(pTables, &pTable); if (hasSameTableAlias(pTables)) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, - TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS, - "Not unique table/alias: '%s'", - ((STableNode*)pTable)->tableAlias); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS, + "Not unique table/alias: '%s'", ((STableNode*)pTable)->tableAlias); } } else { do { @@ -92,10 +90,8 @@ static int32_t addNamespace(STranslateContext* pCxt, void* pTable) { if (pCxt->currLevel == currTotalLevel) { taosArrayPush(pTables, &pTable); if (hasSameTableAlias(pTables)) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, - TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS, - "Not unique table/alias: '%s'", - ((STableNode*)pTable)->tableAlias); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS, + "Not unique table/alias: '%s'", ((STableNode*)pTable)->tableAlias); } } taosArrayPush(pCxt->pNsLevel, &pTables); @@ -1587,6 +1583,7 @@ static EDealRes rewriteExprToGroupKeyFunc(STranslateContext* pCxt, SNode** pNode strcpy(pFunc->functionName, "_group_key"); strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName); + strcpy(pFunc->node.userAlias, ((SExprNode*)*pNode)->userAlias); pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode); if (TSDB_CODE_SUCCESS == pCxt->errCode) { *pNode = (SNode*)pFunc; @@ -2644,27 +2641,6 @@ static int32_t appendTsForImplicitTsFunc(STranslateContext* pCxt, SSelectStmt* p return pCxt->errCode; } -typedef struct SRwriteUniqueCxt { - STranslateContext* pTranslateCxt; - SNode* pExpr; -} SRwriteUniqueCxt; - -static EDealRes rewriteSeletcValueFunc(STranslateContext* pCxt, SNode** pNode) { - SFunctionNode* pFirst = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); - if (NULL == pFirst) { - pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; - return DEAL_RES_ERROR; - } - strcpy(pFirst->functionName, "first"); - TSWAP(pFirst->pParameterList, ((SFunctionNode*)*pNode)->pParameterList); - strcpy(pFirst->node.aliasName, ((SExprNode*)*pNode)->aliasName); - nodesDestroyNode(*pNode); - *pNode = (SNode*)pFirst; - pCxt->errCode = fmGetFuncInfo(pFirst, pCxt->msgBuf.buf, pCxt->msgBuf.len); - ((SSelectStmt*)pCxt->pCurrStmt)->hasAggFuncs = true; - return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR; -} - typedef struct SReplaceOrderByAliasCxt { STranslateContext* pTranslateCxt; SNodeList* pProjectionList; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index a70afe25d6..2d889dd925 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1727,16 +1727,11 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; metaGetTableNameByUid(pInput->param, uid, str); - - for(int32_t i = 0; i < pInput->numOfRows; ++i) { - colDataAppend(pOutput->columnData, pOutput->numOfRows + i, str, false); - } - + colDataAppendNItems(pOutput->columnData, pOutput->numOfRows, str, pInput->numOfRows); pOutput->numOfRows += pInput->numOfRows; return TSDB_CODE_SUCCESS; } - /** Aggregation functions **/ int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { SColumnInfoData *pInputData = pInput->columnData; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 293e3e3c35..431e479123 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -658,7 +658,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); transRemoveExHandle(transGetRefMgt(), conn->refId); - transDestroyBuffer(&conn->readBuf); conn->refId = -1; if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); @@ -685,7 +684,7 @@ static void cliDestroy(uv_handle_t* handle) { transQueueDestroy(&conn->cliMsgs); tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); transReqQueueClear(&conn->wreqQueue); - + transDestroyBuffer(&conn->readBuf); taosMemoryFree(conn); } static bool cliHandleNoResp(SCliConn* conn) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index fd420203e8..8b27d95e52 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -830,7 +830,6 @@ static void destroyConn(SSvrConn* conn, bool clear) { return; } - transDestroyBuffer(&conn->readBuf); if (clear) { if (!uv_is_closing((uv_handle_t*)conn->pTcp)) { tTrace("conn %p to be destroyed", conn); @@ -881,6 +880,7 @@ static void uvDestroyConn(uv_handle_t* handle) { QUEUE_REMOVE(&conn->queue); taosMemoryFree(conn->pTcp); destroyConnRegArg(conn); + transDestroyBuffer(&conn->readBuf); taosMemoryFree(conn); if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { diff --git a/source/os/src/osMath.c b/source/os/src/osMath.c index 98cd63a831..cd2acac261 100644 --- a/source/os/src/osMath.c +++ b/source/os/src/osMath.c @@ -31,6 +31,7 @@ void swapStr(char* j, char* J, int width) { } #endif +// todo refactor: 1) move away; 2) use merge sort instead; 3) qsort is not a stable sort actually. void taosSort(void* arr, int64_t sz, int64_t width, __compar_fn_t compar) { #ifdef WINDOWS int64_t i, j; diff --git a/source/util/CMakeLists.txt b/source/util/CMakeLists.txt index 8553d714fc..8f3bd42a47 100644 --- a/source/util/CMakeLists.txt +++ b/source/util/CMakeLists.txt @@ -1,10 +1,15 @@ configure_file("${CMAKE_CURRENT_SOURCE_DIR}/src/version.c.in" "${CMAKE_CURRENT_SOURCE_DIR}/src/version.c") aux_source_directory(src UTIL_SRC) add_library(util STATIC ${UTIL_SRC}) +if (DEFINED GRANT_CFG_INCLUDE_DIR) + add_definitions(-DGRANTS_CFG) +endif() target_include_directories( util PUBLIC "${TD_SOURCE_DIR}/include/util" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" + PRIVATE "${TD_SOURCE_DIR}/include/common" + PRIVATE "${GRANT_CFG_INCLUDE_DIR}" ) target_link_libraries( util diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index ab7e30bab2..fdb397561d 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -21,6 +21,7 @@ #include "tenv.h" #include "cJSON.h" #include "tjson.h" +#include "tgrant.h" #define CFG_NAME_PRINT_LEN 24 #define CFG_SRC_PRINT_LEN 12 @@ -301,6 +302,7 @@ static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value, } int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype) { + GRANT_CFG_SET; SConfigItem *pItem = cfgGetItem(pCfg, name); if (pItem == NULL) { return -1; diff --git a/tests/system-test/2-query/qnodeCluster.py b/tests/system-test/2-query/qnodeCluster.py new file mode 100644 index 0000000000..23adfb768d --- /dev/null +++ b/tests/system-test/2-query/qnodeCluster.py @@ -0,0 +1,290 @@ +# from asyncio.windows_events import NULL +import taos +import sys +import datetime +import inspect +import random + +from util.log import * +from util.sql import * +from util.cases import * +from util.cluster import * +from util.common import * + +sys.path.append("./6-cluster/") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + +import threading + +class TDTestCase: + + clientCfgDict = {'queryproxy': '1','debugFlag': 135} + clientCfgDict["debugFlag"] = 131 + updatecfgDict = {'clientCfg': {}} + updatecfgDict = {'debugFlag': 131} + updatecfgDict = {'keepColumnName': 1} + updatecfgDict["clientCfg"] = clientCfgDict + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + + def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1): + tsql.execute("use %s" %dbName) + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + for i in range(ctbNum): + tagValue = 'beijing' + if (i % 10 == 0): + sql += " %s%d using %s (name,fleet,driver,device_version) tags('truck_%d', 'South%d','Trish%d','v2.%d')"%(ctbPrefix,i,stbName,i,i,i,i) + else: + model = 'H-%d'%i + sql += " %s%d using %s tags('truck_%d', 'South%d','Trish%d','%s','v2.%d')"%(ctbPrefix,i,stbName,i,i,i,model,i) + if (i > 0) and (i%1000 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) + return + + def prepareData(self): + dbname="db_tsbs" + stabname1="readings" + stabname2="diagnostics" + ctbnamePre1="rct" + ctbnamePre2="dct" + ctbNums=40 + self.ctbNums=ctbNums + rowNUms=100 + ts=1451606400000 + tdSql.execute(f"create database {dbname};") + tdSql.execute(f"use {dbname} ") + tdSql.execute(f''' + create table {stabname1} (ts timestamp,latitude double,longitude double,elevation double,velocity double,heading double,grade double,fuel_consumption double,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30)); + ''') + tdSql.execute(f''' + create table {stabname2} (ts timestamp,fuel_state double,current_load double,status bigint,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30)) ; + ''') + self.create_ctable(tsql=tdSql,dbName=dbname,stbName=stabname1,ctbPrefix=ctbnamePre1,ctbNum=ctbNums) + self.create_ctable(tsql=tdSql,dbName=dbname,stbName=stabname2,ctbPrefix=ctbnamePre2,ctbNum=ctbNums) + + + for j in range(ctbNums): + for i in range(rowNUms): + tdSql.execute( + f"insert into rct{j} values ( {ts+i*60000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )" + ) + status= random.randint(0,1) + tdSql.execute( + f"insert into dct{j} values ( {ts+i*60000}, {1+i*0.1},{1400+i*15}, {status},{1500+i*20}, {150+i*2},{5+i} )" + ) + tdSql.execute("insert into dct9 (ts,fuel_state) values('2021-07-13 14:06:33.123Z',1.2) ;") + # def check_avg(self ,origin_query , check_query): + # avg_result = tdSql.getResult(origin_query) + # origin_result = tdSql.getResult(check_query) + + # check_status = True + # for row_index , row in enumerate(avg_result): + # for col_index , elem in enumerate(row): + # if avg_result[row_index][col_index] != origin_result[row_index][col_index]: + # check_status = False + # if not check_status: + # tdLog.notice("avg function value has not as expected , sql is \"%s\" "%origin_query ) + # sys.exit(1) + # else: + # tdLog.info("avg value check pass , it work as expected ,sql is \"%s\" "%check_query ) + + + def createCluster(self): + tdSql.execute("create mnode on dnode 2") + tdSql.execute("create mnode on dnode 3") + tdSql.execute("create qnode on dnode 1") + tdSql.execute("create qnode on dnode 2") + tdSql.execute("create qnode on dnode 3") + time.sleep(10) + + def tsbsIotQuery(self,tdSql): + tdSql.execute("use db_tsbs") + + # test interval and partition + tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet; ") + # print(tdSql.queryResult) + parRows=tdSql.queryRows + tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet interval(10m); ") + tdSql.checkRows(parRows) + + + # # test insert into + # tdSql.execute("create table testsnode (ts timestamp, c1 float,c2 binary(30),c3 binary(30),c4 binary(30)) ;") + # tdSql.query("insert into testsnode SELECT ts,avg(velocity) as mean_velocity,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet,ts interval(10m);") + + # tdSql.query("insert into testsnode(ts,c1,c2,c3,c4) SELECT ts,avg(velocity) as mean_velocity,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet,ts interval(10m);") + + + # test paitition interval fill + tdSql.query("SELECT name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0) ;") + + + # test partition interval limit (PRcore-TD-17410) + tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings partition BY name,driver,fleet interval (10m) limit 1);") + tdSql.checkRows(self.ctbNums) + + # test partition interval Pseudo time-column + tdSql.query("SELECT count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;") + + # 1 high-load: + tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name desc, ts DESC;") + + tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name ;") + + # 2 stationary-trucks + tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1)") + tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1) WHERE fleet = 'West' AND mean_velocity < 1000 partition BY name") + + # 3 long-driving-sessions + tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT _wstart as ts,name,driver,avg(velocity) as mean_velocity FROM readings where ts > '2016-01-01T00:00:34Z' AND ts <= '2016-01-01T04:00:34Z' partition BY name,driver interval(10m)) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 22 ;") + + + #4 long-daily-sessions + tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT name,driver,avg(velocity) as mean_velocity FROM readings WHERE fleet ='West' AND ts > '2016-01-01T12:31:37Z' AND ts <= '2016-01-05T12:31:37Z' partition BY name,driver interval(10m) ) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 60") + + # 5. avg-daily-driving-duration + tdSql.query("select _wstart as ts,fleet,name,driver,count(mv)/6 as hours_driven from ( select _wstart as ts,fleet,name,driver,avg(velocity) as mv from readings where ts > '2016-01-01T00:00:00Z' and ts < '2016-01-05T00:00:01Z' partition by fleet,name,driver interval(10m)) where ts > '2016-01-01T00:00:00Z' and ts < '2016-01-05T00:00:01Z' partition by fleet,name,driver interval(1d) ;") + + + # # 6. avg-daily-driving-session + # #taosc core dumped + # tdSql.execute("create table random_measure2_1 (ts timestamp,ela float, name binary(40))") + # tdSql.query("SELECT ts,diff(mv) AS difka FROM (SELECT ts,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name,ts interval(10m) fill(value,0)) GROUP BY name,ts;") + # tdSql.query("select name,diff(mv) AS difka FROM (SELECT ts,name,mv FROM (SELECT _wstart as ts,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0))) group BY name ;") + # tdSql.query("SELECT _wstart,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0)") + + # 7. avg-load + tdSql.query("SELECT fleet, model,avg(ml) AS mean_load_percentage FROM (SELECT fleet, model,current_load/load_capacity AS ml FROM diagnostics partition BY name, fleet, model) partition BY fleet, model order by fleet ;") + + # 8. daily-activity + tdSql.query(" SELECT model,ms1 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;") + + tdSql.query(" SELECT model,ms1 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;") + + tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;") + + tdSql.query("SELECT _wstart as ts,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;") + + + # 9. breakdown-frequency + # NULL ---count(NULL)=0 expect count(NULL)= 100 + tdSql.query("SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where model is null partition BY model,state_changed ") + parRows=tdSql.queryRows + assert parRows != 0 , "query result is wrong" + + + tdSql.query(" SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where state_changed =1 partition BY model,state_changed ;") + + #it's already supported: + # last-loc + tdSql.query("SELECT last_row(ts),latitude,longitude,name,driver FROM readings WHERE fleet='South' and name IS NOT NULL partition BY name,driver order by name ;") + + + #2. low-fuel + tdSql.query("SELECT last_row(ts),name,driver,fuel_state,driver FROM diagnostics WHERE fuel_state <= 0.1 AND fleet = 'South' and name IS NOT NULL GROUP BY name,driver order by name;") + + # 3. avg-vs-projected-fuel-consumption + tdSql.query("select avg(fuel_consumption) as avg_fuel_consumption,avg(nominal_fuel_consumption) as nominal_fuel_consumption from readings where velocity > 1 group by fleet") + + def restartFunc(self,func_name,threadNumbers,dnodeNumbers,mnodeNums,restartNumbers,stopRole): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'db', + 'dbNumbers': 8, + 'dropFlag': 1, + 'event': '', + 'vgroups': 2, + 'replica': 1, + 'stbName': 'stb', + 'stbNumbers': 100, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbNum': 1, + } + + dnodeNumbers=int(dnodeNumbers) + mnodeNums=int(mnodeNums) + vnodeNumbers = int(dnodeNumbers-mnodeNums) + + tdSql.query("show dnodes;") + tdLog.debug(tdSql.queryResult) + clusterComCheck.checkDnodes(dnodeNumbers) + + tdLog.info("create database and stable") + tdDnodes=cluster.dnodes + stopcount =0 + threads=[] + for i in range(threadNumbers): + newTdSql=tdCom.newTdSql() + print("123") + threads.append(threading.Thread(target=func_name,args=(newTdSql,))) + print("456") + for tr in threads: + tr.start() + + tdLog.info("Take turns stopping %s "%stopRole) + while stopcount < restartNumbers: + tdLog.info(" restart loop: %d"%stopcount ) + if stopRole == "mnode": + for i in range(mnodeNums): + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + elif stopRole == "vnode": + for i in range(vnodeNumbers): + tdDnodes[i+mnodeNums].stoptaosd() + # sleep(10) + tdDnodes[i+mnodeNums].starttaosd() + # sleep(10) + elif stopRole == "dnode": + for i in range(dnodeNumbers): + tdDnodes[i].stoptaosd() + # sleep(10) + tdDnodes[i].starttaosd() + # sleep(10) + + # dnodeNumbers don't include database of schema + if clusterComCheck.checkDnodes(dnodeNumbers): + tdLog.info("check dnodes status is ready") + else: + tdLog.info("check dnodes status is not ready") + self.stopThread(threads) + tdLog.exit("one or more of dnodes failed to start ") + # self.check3mnode() + stopcount+=1 + + for tr in threads: + tr.join() + + + def run(self): + tdLog.printNoPrefix("==========step1:create database and table,insert data ==============") + self.createCluster() + self.prepareData() + queryPolicy=2 + simClientCfg="%s/taos.cfg"%tdDnodes.getSimCfgPath() + cmd='sed -i "s/^queryPolicy.*/queryPolicy 2/g" %s'%simClientCfg + os.system(cmd) + # self.tsbsIotQuery() + self.restartFunc(func_name=self.tsbsIotQuery,threadNumbers=3,dnodeNumbers=5,mnodeNums=3,restartNumbers=3,stopRole='mnode') + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/2-query/queryQnode.py b/tests/system-test/2-query/queryQnode.py index b5e2bd3328..176c7ccf3e 100644 --- a/tests/system-test/2-query/queryQnode.py +++ b/tests/system-test/2-query/queryQnode.py @@ -318,7 +318,6 @@ class TDTestCase: os.system(cmd) # tdDnodes.stop(1) # tdDnodes.start(1) - tdSql.execute("reset query cache") tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy) tdSql.query("show local variables;") for i in range(tdSql.queryRows): diff --git a/tests/system-test/2-query/tsbsQuery.py b/tests/system-test/2-query/tsbsQuery.py index ca270932b1..75b33f1a5e 100644 --- a/tests/system-test/2-query/tsbsQuery.py +++ b/tests/system-test/2-query/tsbsQuery.py @@ -1,3 +1,4 @@ +# from asyncio.windows_events import NULL import taos import sys import datetime @@ -21,38 +22,99 @@ class TDTestCase: tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), True) - def prepareData(self): - database="db_tsbs" - ts=1451606400000 - tdSql.execute(f"create database {database};") - tdSql.execute(f"use {database} ") - tdSql.execute(''' - create table readings (ts timestamp,latitude double,longitude double,elevation double,velocity double,heading double,grade double,fuel_consumption double,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30)); - ''') - tdSql.execute(''' - create table diagnostics (ts timestamp,fuel_state double,current_load double,status bigint,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30)) ; - ''') + def create_ctable(self,tsql=None, dbName='db',stbName='stb',ctbPrefix='ctb',ctbNum=1): + tsql.execute("use %s" %dbName) + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + for i in range(ctbNum): + tagValue = 'beijing' + if (i % 10 == 0): + sql += " %s%d using %s (name,fleet,driver,device_version) tags('truck_%d', 'South%d','Trish%d','v2.%d')"%(ctbPrefix,i,stbName,i,i,i,i) + else: + model = 'H-%d'%i + sql += " %s%d using %s tags('truck_%d', 'South%d','Trish%d','%s','v2.%d')"%(ctbPrefix,i,stbName,i,i,i,model,i) + if (i > 0) and (i%1000 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) + return - for i in range(10): - if i == 1 or i == 2 : - tdLog.debug(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}', NULL,'v2.3')") - tdSql.execute(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}', NULL,'v2.3')") - else : - tdSql.execute(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}','H-{i}','v2.3')") - if i == 1 or i == 2 : - tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}',NULL ,'v2.3')") - else: - tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}','H-{i}','v2.3')") - for j in range(10): - for i in range(100): - tdSql.execute( - f"insert into rct{j} values ( {ts+i*60000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )" - ) - status= random.randint(0,1) - tdSql.execute( - f"insert into dct{j} values ( {ts+i*60000}, {1+i*0.1},{1400+i*15}, {status},{1500+i*20}, {150+i*2},{5+i} )" - ) - tdSql.execute("insert into dct9 (ts,fuel_state) values('2021-07-13 14:06:33.123Z',1.2) ;") + def insertData(self,startTs,tsql=None, dbName='db',stbName='stb',ctbPrefix='ctb',ctbNum=1,rowsPerTbl=100,batchNum=1000): + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + if startTs is None: + t = time.time() + startTs = int(round(t * 1000)) + + for i in range(ctbNum): + sql += " %s%d values "%(ctbPrefix,i) + for j in range(rowsPerTbl): + if(ctbPrefix=="rct"): + sql += f"({startTs+j*60000}, {80+j}, {90+j}, {85+j}, {30+j*10}, {1.2*j}, {221+j*2}, {20+j*0.2}, {1500+j*20}, {150+j*2},{5+j}) " + elif ( ctbPrefix=="dct"): + status= random.randint(0,1) + sql += f"( {startTs+j*60000}, {1+j*0.1},{1400+j*15}, {status},{1500+j*20}, {150+j*2},{5+j} ) " + # tdLog.debug("1insert sql:%s"%sql) + if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + # tdLog.debug("2insert sql:%s"%sql) + tsql.execute(sql) + if j < rowsPerTbl - 1: + sql = "insert into %s%d values " %(ctbPrefix,i) + else: + sql = "insert into " + if sql != pre_insert: + # tdLog.debug("3insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def prepareData(self): + dbname="db_tsbs" + stabname1="readings" + stabname2="diagnostics" + ctbnamePre1="rct" + ctbnamePre2="dct" + ctbNums=40 + self.ctbNums=ctbNums + rowNUms=200 + ts=1451606400000 + tdSql.execute(f"create database {dbname};") + tdSql.execute(f"use {dbname} ") + tdSql.execute(f''' + create table {stabname1} (ts timestamp,latitude double,longitude double,elevation double,velocity double,heading double,grade double,fuel_consumption double,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30)); + ''') + tdSql.execute(f''' + create table {stabname2} (ts timestamp,fuel_state double,current_load double,status bigint,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30)) ; + ''') + self.create_ctable(tsql=tdSql,dbName=dbname,stbName=stabname1,ctbPrefix=ctbnamePre1,ctbNum=ctbNums) + self.create_ctable(tsql=tdSql,dbName=dbname,stbName=stabname2,ctbPrefix=ctbnamePre2,ctbNum=ctbNums) + self.insertData(tsql=tdSql,dbName=dbname,stbName=stabname1,ctbPrefix=ctbnamePre1,ctbNum=ctbNums,rowsPerTbl=rowNUms,startTs=ts,batchNum=1000) + self.insertData(tsql=tdSql,dbName=dbname,stbName=stabname2,ctbPrefix=ctbnamePre2,ctbNum=ctbNums,rowsPerTbl=rowNUms,startTs=ts,batchNum=1000) + # for i in range(ctbNum): + # if i %10 == 0 : + # # tdLog.debug(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}', NULL,'v2.3')") + # tdSql.execute(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}', NULL,'v2.3')") + # else : + # tdSql.execute(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}','H-{i}','v2.3')") + # if i %10 == 0 : + # tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}',NULL ,'v2.3')") + # else: + # tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}','H-{i}','v2.3')") + # for j in range(ctbNums): + # for i in range(rowNUms): + # tdSql.execute( + # f"insert into rct{j} values ( {ts+i*60000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )" + # ) + # status= random.randint(0,1) + # tdSql.execute( + # f"insert into dct{j} values ( {ts+i*60000}, {1+i*0.1},{1400+i*15}, {status},{1500+i*20}, {150+i*2},{5+i} )" + # ) + # tdSql.execute("insert into dct9 (ts,fuel_state) values('2021-07-13 14:06:33.123Z',1.2) ;") # def check_avg(self ,origin_query , check_query): # avg_result = tdSql.getResult(origin_query) # origin_result = tdSql.getResult(check_query) @@ -75,7 +137,6 @@ class TDTestCase: # test interval and partition tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet; ") - print(tdSql.queryResult) parRows=tdSql.queryRows tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet interval(10m); ") tdSql.checkRows(parRows) @@ -94,7 +155,7 @@ class TDTestCase: # test partition interval limit (PRcore-TD-17410) tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings partition BY name,driver,fleet interval (10m) limit 1);") - tdSql.checkRows(10) + tdSql.checkRows(self.ctbNums) # test partition interval Pseudo time-column tdSql.query("SELECT count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;") @@ -136,17 +197,27 @@ class TDTestCase: tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;") - tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;") + tdSql.query("SELECT _wstart as ts,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;") # 9. breakdown-frequency # NULL ---count(NULL)=0 expect count(NULL)= 100 tdSql.query("SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where model is null partition BY model,state_changed ") parRows=tdSql.queryRows - assert parRows != 0 , "query result is wrong" + assert parRows != 0 , "query result is wrong, query rows %d but expect > 0 " %parRows tdSql.query(" SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where state_changed =1 partition BY model,state_changed ;") + sql="select model,ctc from (SELECT model,count(state_changed) as ctc FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= 1451606400000 AND ts < 1451952001000 ) WHERE ts >= 1451606400000 AND ts < 1451952001000 partition BY model interval(10m)) partition BY model) WHERE state_changed = 1 partition BY model )where model is null;" + + # for i in range(2): + # tdSql.query("%s"%sql) + # quertR1=tdSql.queryResult + # for j in range(50): + # tdSql.query("%s"%sql) + # quertR2=tdSql.queryResult + # assert quertR1 == quertR2 , "%s != %s ,The results of multiple queries are different" %(quertR1,quertR2) + #it's already supported: # last-loc diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index a2310ea9c9..4526ff2230 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -778,7 +778,7 @@ void shellReadHistory() { taosFsyncFile(pFile); taosCloseFile(&pFile); } - pHistory->hend = pHistory->hstart; + pHistory->hstart = pHistory->hend; } void shellWriteHistory() { diff --git a/tools/shell/src/shellWebsocket.c b/tools/shell/src/shellWebsocket.c index fee2325c34..2dcab04b3f 100644 --- a/tools/shell/src/shellWebsocket.c +++ b/tools/shell/src/shellWebsocket.c @@ -33,10 +33,11 @@ int shell_conn_ws_server(bool first) { return 0; } -static int horizontalPrintWebsocket(WS_RES* wres) { +static int horizontalPrintWebsocket(WS_RES* wres, double* execute_time) { const void* data = NULL; int rows; ws_fetch_block(wres, &data, &rows); + *execute_time += (double)(ws_take_timing(wres)/1E6); if (!rows) { return 0; } @@ -72,10 +73,11 @@ static int horizontalPrintWebsocket(WS_RES* wres) { return numOfRows; } -static int verticalPrintWebsocket(WS_RES* wres) { +static int verticalPrintWebsocket(WS_RES* wres, double* pexecute_time) { int rows = 0; const void* data = NULL; ws_fetch_block(wres, &data, &rows); + *pexecute_time += (double)(ws_take_timing(wres)/1E6); if (!rows) { return 0; } @@ -112,7 +114,7 @@ static int verticalPrintWebsocket(WS_RES* wres) { return numOfRows; } -static int dumpWebsocketToFile(const char* fname, WS_RES* wres) { +static int dumpWebsocketToFile(const char* fname, WS_RES* wres, double* pexecute_time) { char fullname[PATH_MAX] = {0}; if (taosExpandDir(fname, fullname, PATH_MAX) != 0) { tstrncpy(fullname, fname, PATH_MAX); @@ -127,6 +129,7 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres) { int rows = 0; const void* data = NULL; ws_fetch_block(wres, &data, &rows); + *pexecute_time += (double)(ws_take_timing(wres)/1E6); if (!rows) { taosCloseFile(&pFile); return 0; @@ -162,14 +165,14 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres) { return numOfRows; } -static int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical) { +static int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical, double* pexecute_time) { int numOfRows = 0; if (fname != NULL) { - numOfRows = dumpWebsocketToFile(fname, wres); + numOfRows = dumpWebsocketToFile(fname, wres, pexecute_time); } else if (vertical) { - numOfRows = verticalPrintWebsocket(wres); + numOfRows = verticalPrintWebsocket(wres, pexecute_time); } else { - numOfRows = horizontalPrintWebsocket(wres); + numOfRows = horizontalPrintWebsocket(wres, pexecute_time); } *error_no = ws_errno(wres); return numOfRows; @@ -225,6 +228,8 @@ void shellRunSingleCommandWebsocketImp(char *command) { return; } + double execute_time = ws_take_timing(res)/1E6; + if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) { fprintf(stdout, "Database changed.\r\n\r\n"); fflush(stdout); @@ -236,22 +241,27 @@ void shellRunSingleCommandWebsocketImp(char *command) { if (ws_is_update_query(res)) { numOfRows = ws_affected_rows(res); et = taosGetTimestampUs(); - printf("Query Ok, %d of %d row(s) in database (%.6fs)\n", numOfRows, numOfRows, - (et - st)/1E6); + double total_time = (et - st)/1E3; + double net_time = total_time - (double)execute_time; + printf("Query Ok, %d of %d row(s) in database\n", numOfRows, numOfRows); + printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time); } else { int error_no = 0; - numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode); + numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode, &execute_time); if (numOfRows < 0) { ws_free_result(res); return; } et = taosGetTimestampUs(); + double total_time = (et - st) / 1E3; + double net_time = total_time - execute_time; if (error_no == 0 && !shell.stop_query) { - printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows, - (et - st)/1E6); + printf("Query OK, %d row(s) in set\n", numOfRows); + printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time); } else { printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows, (et - st)/1E6); + printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time); } } printf("\n");