Merge branch '3.0' into feature/TD-11274-3.0
commit 39dcfb5bd2
@@ -40,6 +40,7 @@ def pre_test(){
 git reset --hard
 cd ${WKC}
 git reset --hard
+git clean -fxd
 '''
 script {
     if (env.CHANGE_TARGET == 'master') {
@@ -2,7 +2,7 @@
 # taosws-rs
 ExternalProject_Add(taosws-rs
     GIT_REPOSITORY https://github.com/taosdata/taosws-rs.git
-    GIT_TAG 9de599d
+    GIT_TAG 24b199e
     SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
    BINARY_DIR ""
    #BUILD_IN_SOURCE TRUE
@@ -4,7 +4,7 @@ sidebar_label: Documentation Home
 slug: /
 ---

-TDengine is an open-source, [high-performance](https://www.taosdata.com/fast), cloud-native time-series database (Time-Series Database) purpose-built and optimized for IoT, Industrial IoT, finance, and similar scenarios. It also comes with built-in caching, stream processing, data subscription, and other system features, which greatly reduce system design complexity and cut development and operating costs, making it a simplified platform for time-series data processing. This document is the TDengine user manual. It covers TDengine's basic concepts, installation, usage, features, developer interfaces, operations and maintenance, and internal design, and is aimed primarily at architects, developers, and system administrators.
+TDengine is an open-source, [high-performance](https://www.taosdata.com/fast), cloud-native time-series database (Time-Series Database, TSDB), purpose-built and optimized for IoT, Industrial IoT, finance, and similar scenarios. It also comes with built-in caching, stream processing, data subscription, and other system features, which greatly reduce system design complexity and cut development and operating costs, making it a simplified platform for time-series data processing. This document is the TDengine user manual. It covers TDengine's basic concepts, installation, usage, features, developer interfaces, operations and maintenance, and internal design, and is aimed primarily at architects, developers, and system administrators.

 TDengine takes full advantage of the characteristics of time-series data and introduces the concepts of "one table per data collection point" and "supertables", along with an innovative storage engine design, so that data ingestion, queries, and storage are all dramatically more efficient. To understand and use TDengine correctly, please be sure to read the [Basic Concepts](./concept) chapter carefully.
@@ -3,7 +3,7 @@ title: Product Introduction
 toc_max_heading_level: 2
 ---

-TDengine is a high-performance, distributed, SQL-enabled time-series database. Its core code, including the clustering feature, is fully open source (under the AGPL v3.0 license). TDengine is widely applicable to IoT, Industrial IoT, connected vehicles, IT operations, finance, and other fields. Beyond the core time-series database capabilities, TDengine also provides [caching](/develop/cache/), [data subscription](/develop/subscribe), [stream processing](/develop/continuous-query), and the other features a big-data platform needs, minimizing development and operations complexity.
+TDengine is an open-source, high-performance, cloud-native time-series database (Time-Series Database, TSDB). TDengine is widely applicable to IoT, Industrial IoT, connected vehicles, IT operations, finance, and other fields. Beyond the core time-series database capabilities, TDengine also provides [caching](/develop/cache/), [data subscription](/develop/subscribe), [stream processing](/develop/continuous-query), and more; it is a simplified time-series data processing platform that minimizes system design complexity and reduces development and operating costs.

 This chapter introduces TDengine's main features, competitive advantages, typical use cases, comparisons with other databases, and more, to give you an overall picture of TDengine.
@@ -16,7 +16,7 @@ TDengine's main features are as follows:
 3. Supports [various queries](/develop/query-data), including aggregation, nested queries, downsampling, interpolation, and more
 4. Supports [user-defined functions](/develop/udf)
 5. Supports [caching](/develop/cache): the latest record of every table is kept in cache, so there is no need for Redis
-6. Supports [continuous queries](/develop/continuous-query) (Continuous Query)
+6. Supports [stream processing](/develop/continuous-query) (Stream Processing)
 7. Supports [data subscription](/develop/subscribe), with configurable filter conditions
 8. Supports [clustering](/cluster/): scale out by adding nodes, with high reliability through multiple replicas
 9. Provides a [command-line program](/reference/taos-shell) for cluster management, system status checks, and ad hoc queries
@@ -33,28 +33,24 @@ TDengine's main features are as follows:

 Because TDengine makes full use of [the characteristics of time-series data](https://www.taosdata.com/blog/2019/07/09/105.html), such as structured data, no need for transactions, rare deletes or updates, and write-heavy/read-light workloads, it was designed with brand-new storage and compute engines dedicated to time-series data. Compared with other time-series databases, TDengine therefore stands out as follows:

-- **[High performance](https://www.taosdata.com/fast)**: Thanks to its innovative storage engine design, TDengine is more than 10x faster than general-purpose databases for both ingestion and queries, and also far faster than other time-series databases, while requiring far less storage space.
+- **High performance**: Thanks to its innovative storage engine design, TDengine is more than 10x faster than general-purpose databases for both ingestion and queries, and also far faster than other time-series databases, with storage consumption under 1/10 that of general-purpose databases.

-- **[Distributed](https://www.taosdata.com/scalable)**: With its native distributed design, TDengine scales horizontally: adding nodes yields more processing capacity, while the multi-replica mechanism ensures high availability.
+- **Cloud native**: With a native distributed design that takes full advantage of cloud platforms, TDengine offers horizontal scalability together with elasticity, resilience, and observability, supports Kubernetes deployment, and runs on public, private, and hybrid clouds.

-- **[SQL support](https://www.taosdata.com/sql-support)**: TDengine uses SQL as its query language, reducing learning and migration costs, while providing SQL extensions for time-series analytics and convenient, flexible schemaless data ingestion.
+- **Simplified time-series data platform**: With built-in message queuing, caching, and stream processing, applications no longer need to integrate Kafka/Redis/HBase/Spark, greatly cutting system complexity and lowering development and operating costs.

-- **All in One**: The database, message queue, cache, stream processing, and other features are fused together, so applications no longer need to integrate Kafka/Redis/HBase/Spark, greatly reducing development and maintenance costs.
+- **Analytics**: Supports SQL, with extensions tailored to time-series analysis. Through supertables, separation of storage and compute, partitioning and sharding, pre-computation, user-defined functions, and other technologies, TDengine delivers powerful analytical capabilities.

-- **Zero management**: Installation and clustering take only seconds, with no dependencies and no need to split databases or tables; system health monitoring integrates seamlessly with Grafana and other operations tools.
+- **Easy to use**: No dependencies; installation and clustering take only seconds. TDengine provides a REST interface and connectors for many programming languages, integrates seamlessly with numerous third-party tools, ships a command-line program for administration and ad hoc queries, and offers a variety of operations tools.

-- **Zero learning cost**: Uses the SQL query language and supports C/C++, Python, Java, Go, Rust, Node.js, C#, Lua (community-contributed), PHP (community-contributed), and other programming languages; being similar to MySQL, it carries zero learning cost.

-- **Seamless integration**: Without a single line of code, TDengine integrates with third-party tools such as Telegraf, Grafana, Prometheus, EMQX, HiveMQ, StatsD, collectd, icinga, TCollector, Matlab, and R.

-- **Interactive console**: Through the command-line console, you can run SQL statements for ad hoc queries, database operations, administration, and cluster maintenance without writing any code.
+- **Open core**: TDengine's core code, including the clustering feature, is fully open source. As of August 1, 2022, there were more than 135.9k running instances worldwide, with 18.7k GitHub stars and 4.4k forks, and an active community.

 Adopting TDengine can substantially lower the total cost of ownership of a typical IoT, connected-vehicle, or Industrial-IoT big-data platform, for several reasons:

 1. Its exceptional performance greatly reduces the compute and storage resources the system requires
-2. Because it uses a SQL interface, it integrates seamlessly with numerous third-party software packages, greatly lowering learning and migration costs
-3. Its All-in-One design reduces system complexity, lowering development costs
-4. Because operations and maintenance are simple, operating costs drop substantially
+2. Because it supports SQL, it integrates seamlessly with numerous third-party software packages, greatly lowering learning and migration costs
+3. As a simplified time-series data platform, it greatly reduces system complexity and development and operating costs
+4. Because maintenance is simple, operating costs drop substantially

 ## Technical Ecosystem
@@ -184,6 +184,7 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
 int32_t getJsonValueLen(const char* data);

 int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
+int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows);
 int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
                         const SColumnInfoData* pSource, int32_t numOfRow2);
 int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
@@ -22,6 +22,9 @@ extern "C" {

 #include "os.h"
 #include "taoserror.h"
+#ifdef GRANTS_CFG
+#include "tgrantCfg.h"
+#endif

 typedef enum {
   TSDB_GRANT_ALL,
@@ -37,10 +40,38 @@ typedef enum {
   TSDB_GRANT_CONNS,
   TSDB_GRANT_STREAMS,
   TSDB_GRANT_CPU_CORES,
+  TSDB_GRANT_STABLE,
+  TSDB_GRANT_TABLE,
 } EGrantType;

 int32_t grantCheck(EGrantType grant);

+#ifndef GRANTS_CFG
+#define GRANTS_SCHEMA static const SSysDbTableSchema grantsSchema[] = { \
+    {.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+    {.name = "expire time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+    {.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+    {.name = "storage(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+    {.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+    {.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+    {.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+    {.name = "accounts", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+    {.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+    {.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+    {.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+    {.name = "cpu cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+    {.name = "speed(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+    {.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+  }
+#define GRANT_CFG_ADD
+#define GRANT_CFG_SET
+#define GRANT_CFG_GET
+#define GRANT_CFG_CHECK
+#define GRANT_CFG_SKIP
+#define GRANT_CFG_DECLARE
+#define GRANT_CFG_EXTERN
+#endif

 #ifdef __cplusplus
 }
 #endif
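Note: the #ifndef GRANTS_CFG block above is a compile-time override point: the CMake changes elsewhere in this commit define GRANTS_CFG whenever GRANT_CFG_INCLUDE_DIR is set, so an alternate build can supply its own tgrantCfg.h with different expansions, while the default build falls back to these no-op macros and the default schema. A minimal sketch of the same pattern, with hypothetical names (illustration only, not part of the commit):

#ifdef FEATURE_CFG
#include "feature_cfg.h"  /* an alternate build supplies its own macros */
#endif

#ifndef FEATURE_CFG
#define FEATURE_SCHEMA static const int featureSchema[] = {1, 2, 3}
#define FEATURE_CHECK  /* expands to nothing in the default build */
#endif

FEATURE_SCHEMA;  /* picks up whichever definition is active */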
@@ -28,10 +28,10 @@ extern bool gRaftDetailLog;

 #define SYNC_RESP_TTL_MS 10000000

-#define SYNC_MAX_BATCH_SIZE 500
-#define SYNC_INDEX_BEGIN 0
-#define SYNC_INDEX_INVALID -1
-#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
+#define SYNC_MAX_BATCH_SIZE 1
+#define SYNC_INDEX_BEGIN 0
+#define SYNC_INDEX_INVALID -1
+#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF

 typedef enum {
   SYNC_STRATEGY_NO_SNAPSHOT = 0,
@@ -406,6 +406,8 @@ int32_t* taosGetErrno();
 #define TSDB_CODE_GRANT_STORAGE_LIMITED   TAOS_DEF_ERROR_CODE(0, 0x0809)
 #define TSDB_CODE_GRANT_QUERYTIME_LIMITED TAOS_DEF_ERROR_CODE(0, 0x080A)
 #define TSDB_CODE_GRANT_CPU_LIMITED       TAOS_DEF_ERROR_CODE(0, 0x080B)
+#define TSDB_CODE_GRANT_STABLE_LIMITED    TAOS_DEF_ERROR_CODE(0, 0x080C)
+#define TSDB_CODE_GRANT_TABLE_LIMITED     TAOS_DEF_ERROR_CODE(0, 0x080D)

 // sync
 #define TSDB_CODE_SYN_TIMEOUT             TAOS_DEF_ERROR_CODE(0, 0x0903)
@@ -1,14 +1,24 @@
 aux_source_directory(src COMMON_SRC)
 add_library(common STATIC ${COMMON_SRC})

+if (DEFINED GRANT_CFG_INCLUDE_DIR)
+  add_definitions(-DGRANTS_CFG)
+endif()
+
 target_include_directories(
     common
     PUBLIC "${TD_SOURCE_DIR}/include/common"
     PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
-    IF(${TD_WINDOWS})
-        PRIVATE "${TD_SOURCE_DIR}/contrib/pthread"
-        PRIVATE "${TD_SOURCE_DIR}/contrib/msvcregex"
-    ENDIF ()
+    PRIVATE "${GRANT_CFG_INCLUDE_DIR}"
 )
+IF(${TD_WINDOWS})
+    target_include_directories(
+        common
+        PRIVATE "${TD_SOURCE_DIR}/contrib/pthread"
+        PRIVATE "${TD_SOURCE_DIR}/contrib/msvcregex"
+    )
+ENDIF ()

 target_link_libraries(
     common
     PUBLIC os
@@ -17,6 +17,7 @@
 #include "taos.h"
 #include "tdef.h"
 #include "types.h"
+#include "tgrant.h"

 #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
 #define SYSTABLE_SCH_DB_NAME_LEN    ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
@@ -188,22 +189,7 @@ static const SSysDbTableSchema userUsersSchema[] = {
     {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
 };

-static const SSysDbTableSchema grantsSchema[] = {
-    {.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
-    {.name = "expire time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
-    {.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
-    {.name = "storage(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
-    {.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
-    {.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
-    {.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
-    {.name = "accounts", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
-    {.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
-    {.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
-    {.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
-    {.name = "cpu cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
-    {.name = "speed(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
-    {.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
-};
+GRANTS_SCHEMA;

 static const SSysDbTableSchema vgroupsSchema[] = {
     {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
@@ -118,6 +118,76 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
   return 0;
 }

+int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
+  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  if (pColumnInfoData->varmeta.allocLen >= newSize) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  if (pColumnInfoData->varmeta.allocLen < newSize) {
+    char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
+    if (buf == NULL) {
+      return TSDB_CODE_OUT_OF_MEMORY;
+    }
+
+    pColumnInfoData->pData = buf;
+    pColumnInfoData->varmeta.allocLen = newSize;
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
+static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData, int32_t itemLen, int32_t numOfRows) {
+  ASSERT(pColumnInfoData->info.bytes >= itemLen);
+  size_t start = 1;
+
+  // the first item
+  memcpy(pColumnInfoData->pData, pData, itemLen);
+
+  int32_t t = 0;
+  int32_t count = log(numOfRows)/log(2);
+  while(t < count) {
+    int32_t xlen = 1 << t;
+    memcpy(pColumnInfoData->pData + start * itemLen + pColumnInfoData->varmeta.length, pColumnInfoData->pData, xlen * itemLen);
+    t += 1;
+    start += xlen;
+  }
+
+  // the tail part
+  if (numOfRows > start) {
+    memcpy(pColumnInfoData->pData + start * itemLen + currentRow * itemLen, pColumnInfoData->pData, (numOfRows - start) * itemLen);
+  }
+
+  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
+    for(int32_t i = 0; i < numOfRows; ++i) {
+      pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen;
+    }
+
+    pColumnInfoData->varmeta.length += numOfRows * itemLen;
+  }
+}
+
+int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows) {
+  ASSERT(pData != NULL && pColumnInfoData != NULL);
+
+  int32_t len = pColumnInfoData->info.bytes;
+  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
+    len = varDataTLen(pData);
+    if (pColumnInfoData->varmeta.allocLen < (numOfRows + currentRow) * len) {
+      int32_t code = colDataReserve(pColumnInfoData, (numOfRows + currentRow) * len);
+      if (code != TSDB_CODE_SUCCESS) {
+        return code;
+      }
+    }
+  }
+
+  doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows);
+  return TSDB_CODE_SUCCESS;
+}
+
 static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
                           int32_t numOfRow2) {
   if (numOfRow2 <= 0) return;
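Note: doCopyNItems above replicates one item n times with roughly log2(n) memcpy calls: after the first copy, each pass duplicates the already-filled prefix, and a final copy handles the non-power-of-two tail. A standalone sketch of that doubling technique (illustration only, not part of the commit):

#include <string.h>

/* Fill dst with n copies of the itemLen-byte item at src,
 * doubling the copied region each pass: O(log n) memcpy calls. */
static void fillNItems(char *dst, const char *src, size_t itemLen, size_t n) {
  if (n == 0) return;
  memcpy(dst, src, itemLen);  /* the first item */
  size_t filled = 1;
  while (filled < n) {
    size_t chunk = (filled <= n - filled) ? filled : (n - filled);
    memcpy(dst + filled * itemLen, dst, chunk * itemLen);
    filled += chunk;
  }
}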
@@ -1113,15 +1183,12 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
 void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
   if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
     pColumn->varmeta.length = 0;
-    if (pColumn->varmeta.offset > 0) {
+    if (pColumn->varmeta.offset != NULL) {
       memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows);
     }
   } else {
     if (pColumn->nullbitmap != NULL) {
       memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
-      if (pColumn->pData != NULL) {
-        memset(pColumn->pData, 0, pColumn->info.bytes * numOfRows);
-      }
     }
   }
 }
@@ -1725,6 +1792,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
     }
     switch (pColInfoData->info.type) {
       case TSDB_DATA_TYPE_TIMESTAMP:
+        memset(pBuf, 0, sizeof(pBuf));
        formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
        len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf);
        if (len >= size - 1) return dumpBuf;
@@ -1753,6 +1821,26 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
         len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
         if (len >= size - 1) return dumpBuf;
         break;
+      case TSDB_DATA_TYPE_BOOL:
+        len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);
+        if (len >= size - 1) return dumpBuf;
+        break;
+      case TSDB_DATA_TYPE_VARCHAR: {
+        memset(pBuf, 0, sizeof(pBuf));
+        char*   pData = colDataGetVarData(pColInfoData, j);
+        int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
+        memcpy(pBuf, varDataVal(pData), dataSize);
+        len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
+        if (len >= size - 1) return dumpBuf;
+      } break;
+      case TSDB_DATA_TYPE_NCHAR: {
+        char*   pData = colDataGetVarData(pColInfoData, j);
+        int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
+        memset(pBuf, 0, sizeof(pBuf));
+        taosUcs4ToMbs((TdUcs4 *)varDataVal(pData), dataSize, pBuf);
+        len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
+        if (len >= size - 1) return dumpBuf;
+      } break;
     }
   }
   len += snprintf(dumpBuf + len, size - len, "\n");
@@ -19,6 +19,9 @@
 #include "tconfig.h"
 #include "tdatablock.h"
 #include "tlog.h"
+#include "tgrant.h"
+
+GRANT_CFG_DECLARE;

 SConfig *tsCfg = NULL;

@@ -441,6 +444,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
   if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 100000, 1) != 0) return -1;

   if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
+  GRANT_CFG_ADD;
   return 0;
 }

@@ -590,7 +594,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
   if (tsQueryBufferSize >= 0) {
     tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
   }
-
+  GRANT_CFG_GET;
   return 0;
 }

@@ -603,6 +607,7 @@ void taosLocalCfgForbiddenToChange(char* name, bool* forbidden) {
     *forbidden = true;
     return;
   }
+  GRANT_CFG_CHECK;

   *forbidden = false;
 }
@@ -1,8 +1,12 @@
 aux_source_directory(src MGMT_DNODE)
 add_library(mgmt_dnode STATIC ${MGMT_DNODE})
+if (DEFINED GRANT_CFG_INCLUDE_DIR)
+  add_definitions(-DGRANTS_CFG)
+endif()
 target_include_directories(
     mgmt_dnode
     PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc"
+    PUBLIC "${GRANT_CFG_INCLUDE_DIR}"
 )
 target_link_libraries(
     mgmt_dnode node_util
@@ -16,6 +16,7 @@
 #define _DEFAULT_SOURCE
 #include "dmInt.h"
 #include "systable.h"
+#include "tgrant.h"

 extern SConfig *tsCfg;

@@ -223,6 +224,7 @@ int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {

   for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
     SConfigItem *pItem = taosArrayGet(tsCfg->array, i);
+    GRANT_CFG_SKIP;

     SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
     colDataAppend(pColInfo, i, (const char *)&dnodeId, false);
@@ -146,7 +146,8 @@ static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanI
 static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
                                 STsdbReader* pReader);
 static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
-static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex);
+static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
+                                    int32_t rowIndex);
 static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
 static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
 static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
@@ -758,7 +759,7 @@
   pReader->cost.blockLoadTime += elapsedTime;

   int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
-  tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
+  tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
             ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
             pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
             pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);
@@ -1444,9 +1445,9 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
   // 2. the direct next point is not an duplicated timestamp
   if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && pReader->order == TSDB_ORDER_ASC) ||
       (pDumpInfo->rowIndex > 0 && pReader->order == TSDB_ORDER_DESC)) {
-    int32_t step = pReader->order == TSDB_ORDER_ASC? 1:-1;
+    int32_t step = pReader->order == TSDB_ORDER_ASC ? 1 : -1;
     int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
-    if (nextKey != key) { // merge is not needed
+    if (nextKey != key) {  // merge is not needed
       doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
       pDumpInfo->rowIndex += step;
       return TSDB_CODE_SUCCESS;
@@ -2134,15 +2135,18 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe

     int32_t   sversion = TSDBROW_SVERSION(pRow);
     STSchema* pTSchema = NULL;
-    if (sversion != pReader->pSchema->version) {
+    if (pReader->pSchema == NULL || sversion != pReader->pSchema->version) {
       metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema);
+      if (pReader->pSchema == NULL) {
+        pReader->pSchema = pTSchema;
+      }
     } else {
       pTSchema = pReader->pSchema;
     }

     tRowMergerAdd(pMerger, pRow, pTSchema);

-    if (sversion != pReader->pSchema->version) {
+    if (pTSchema != pReader->pSchema) {
       taosMemoryFree(pTSchema);
     }
@@ -2230,7 +2234,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
   int32_t step = asc ? 1 : -1;

   pDumpInfo->rowIndex += step;
-  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) ||(pDumpInfo->rowIndex >= 0 && !asc)) {
+  if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
     pDumpInfo->rowIndex =
         doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
   }
@@ -2271,8 +2275,11 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe
   // updateSchema(pRow, uid, pReader);
   int32_t   sversion = TSDBROW_SVERSION(pRow);
   STSchema* pTSchema = NULL;
-  if (sversion != pReader->pSchema->version) {
+  if (pReader->pSchema == NULL || sversion != pReader->pSchema->version) {
     metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema);
+    if (pReader->pSchema == NULL) {
+      pReader->pSchema = pTSchema;
+    }
   } else {
     pTSchema = pReader->pSchema;
   }
@@ -2282,7 +2289,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe
   tRowMergerGetRow(&merge, pTSRow);
   tRowMergerClear(&merge);

-  if (sversion != pReader->pSchema->version) {
+  if (pTSchema != pReader->pSchema) {
     taosMemoryFree(pTSchema);
   }
@@ -2425,9 +2432,9 @@ int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBloc
   int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx);
   int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);

-  while(i < numOfOutputCols && j < numOfInputCols) {
+  while (i < numOfOutputCols && j < numOfInputCols) {
     SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
-    SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j);
+    SColData*        pData = tBlockDataGetColDataByIdx(pBlockData, j);

     if (pData->cid == pCol->info.colId) {
       tColDataGetValue(pData, rowIndex, &cv);
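Note: the recurring change in the tsdbRead hunks above is one ownership rule: fetch a table schema only when the reader's cached schema is missing or has a different version, let the first fetched schema become the cache, and free a fetched schema only when it is not the cached one (pTSchema != pReader->pSchema). This also removes the NULL dereference when pReader->pSchema starts out unset. The same rule in a self-contained sketch (simplified hypothetical types, not the actual TDengine structs):

#include <stdlib.h>

typedef struct Schema { int version; } Schema;
typedef struct Reader { Schema *cached; } Reader;

/* Stand-in for metaGetTbTSchemaEx() in the diff above. */
static Schema *loadSchema(int version) {
  Schema *s = malloc(sizeof(*s));
  if (s) s->version = version;
  return s;
}

static void mergeRow(Reader *r, int sversion) {
  Schema *s;
  if (r->cached == NULL || sversion != r->cached->version) {
    s = loadSchema(sversion);
    if (r->cached == NULL) r->cached = s;  /* first schema becomes the cache */
  } else {
    s = r->cached;  /* reuse the cached schema, no allocation */
  }

  /* ... merge the row using s ... */

  if (s != r->cached) free(s);  /* free only what the cache does not own */
}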
@@ -695,7 +695,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
   SSyncInfo syncInfo = {
       .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
       //.snapshotStrategy = SYNC_STRATEGY_NO_SNAPSHOT,
-      .batchSize = 10,
+      .batchSize = 1,
       .vgId = pVnode->config.vgId,
       .isStandBy = pVnode->config.standby,
       .syncCfg = pVnode->config.syncCfg,
@@ -416,6 +416,7 @@ typedef struct SCtgCacheOperation {
   bool    syncOp;
   tsem_t  rspSem;
   bool    stopQueue;
+  bool    unLocked;
 } SCtgCacheOperation;

 typedef struct SCtgQNode {
@@ -674,7 +674,13 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
   tsem_post(&gCtgMgmt.queue.reqSem);

   if (syncOp) {
+    if (!operation->unLocked) {
+      CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
+    }
     tsem_wait(&operation->rspSem);
+    if (!operation->unLocked) {
+      CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
+    }
     taosMemoryFree(operation);
   }

@@ -1011,6 +1017,7 @@ int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool
   op->opId = CTG_OP_CLEAR_CACHE;
   op->syncOp = syncOp;
   op->stopQueue = stopQueue;
+  op->unLocked = true;

   SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
   if (NULL == msg) {
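Note: for synchronous operations, ctgEnqueue now releases the shared catalog lock before blocking on the response semaphore and re-acquires it afterwards, unless the operation was enqueued without the lock held (unLocked, as ctgClearCacheEnqueue now marks). Waiting on rspSem while holding the lock could stall the queue worker that needs the same lock before it can post the response. A minimal pthread sketch of the idiom, assuming CTG_LOCK/CTG_UNLOCK wrap rwlock-style primitives (an assumption; their definitions are not shown in this diff):

#include <pthread.h>
#include <semaphore.h>

static pthread_rwlock_t lock = PTHREAD_RWLOCK_INITIALIZER;
static sem_t rspSem;  /* posted by the worker when the operation completes */

/* Caller holds the read lock, mirroring ctgEnqueue's sync path. */
static void waitForResponse(void) {
  pthread_rwlock_unlock(&lock);  /* let the worker make progress */
  sem_wait(&rspSem);             /* block until the operation is done */
  pthread_rwlock_rdlock(&lock);  /* restore the caller's locking state */
}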
@@ -19,7 +19,7 @@
 #include "catalogInt.h"

 extern SCatalogMgmt gCtgMgmt;
-SCtgDebug gCTGDebug = {0};
+SCtgDebug gCTGDebug = {.lockEnable = true};

 void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
   ASSERT(*(int32_t*)param == 1);
@@ -331,7 +331,7 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
     SHashObj* pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
     if (NULL == pBatchs) {
       ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
-      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
+      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
     }
     pTask->pBatchs = pBatchs;
 #endif
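Note: swapping CTG_ERR_RET for CTG_ERR_JRET converts an immediate return into a jump to the function's cleanup label, the usual goto-based error path in this codebase, so resources acquired earlier in the function are still released on the error path. A generic sketch of that pattern with a hypothetical macro (the real macros' definitions are not shown in this diff):

#include <stdlib.h>

#define ERR_JRET(c) do { code = (c); goto _return; } while (0)

int doWork(void) {
  int   code = 0;
  char *buf = malloc(64);
  if (buf == NULL) return -1;

  void *table = malloc(128);        /* stand-in for the hash table above */
  if (table == NULL) ERR_JRET(-1);  /* jump so buf is still freed below */

  /* ... use buf and table ... */
  free(table);

_return:
  free(buf);
  return code;
}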
@@ -19,6 +19,7 @@
 #include "scheduler.h"
 #include "tdatablock.h"
 #include "tglobal.h"
+#include "tgrant.h"

 extern SConfig* tsCfg;

@@ -563,7 +564,7 @@ int32_t setLocalVariablesResultIntoDataBlock(SSDataBlock* pBlock) {

   for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
     SConfigItem* pItem = taosArrayGet(tsCfg->array, i);
-
+    GRANT_CFG_SKIP;
     char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
     STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
     SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
@@ -917,9 +917,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
 SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
                                                 SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
                                                 STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
-SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
-                                            SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
-                                            SNode* pCondition, SExecTaskInfo* pTaskInfo);
+SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
+                                            SExecTaskInfo* pTaskInfo);
 SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
                                        SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
                                        SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
@@ -571,6 +571,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
   }

   *numOfExprs = numOfFuncs + numOfGroupKeys;
+  if (*numOfExprs == 0) {
+    return NULL;
+  }
+
   SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));

   for (int32_t i = 0; i < (*numOfExprs); ++i) {
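Note: the early return NULL makes an empty expression list an explicit, uniform result instead of falling through to taosMemoryCalloc(0, ...); in C, a zero-size allocation may return either NULL or a unique non-NULL pointer, so callers could not reliably distinguish "no expressions" from "allocation failed". A tiny illustration of why the guard helps:

#include <stdio.h>
#include <stdlib.h>

int main(void) {
  /* calloc(0, ...) may legally return NULL *or* a non-NULL pointer,
   * so its result cannot portably signal "empty" vs "failed". */
  void *p = calloc(0, sizeof(int));
  printf("calloc(0, ...) returned %p\n", p);
  free(p);  /* still must be freed if non-NULL */
  return 0;
}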
@@ -4171,16 +4171,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
       pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
     } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
       SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
-
-      STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
-                               .calTrigger = pSessionNode->window.triggerType};
-
-      SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
-      SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
-      int32_t      tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
-
-      pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
-                                           pPhyNode->pConditions, pTaskInfo);
+      pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
     } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
       pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
     } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
@@ -405,9 +405,15 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
       data = (char*)p;
     }

-    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
-      colDataAppend(pColInfoData, i, data,
-                    (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
+    bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
+    if (isNullVal) {
+      colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
+    } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
+      colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
+    } else {  // todo opt for json tag
+      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
+        colDataAppend(pColInfoData, i, data, false);
+      }
     }

     if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL &&
@@ -152,7 +152,7 @@ SSDataBlock* loadNextDataBlock(void* param) {
 void applyScalarFunction(SSDataBlock* pBlock, void* param) {
   SOperatorInfo*     pOperator = param;
   SSortOperatorInfo* pSort = pOperator->info;
-  if (pOperator->exprSupp.pExprInfo != NULL) {
+  if (pOperator->exprSupp.pExprInfo != NULL && pOperator->exprSupp.numOfExprs > 0) {
     int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx,
                                          pOperator->exprSupp.numOfExprs, NULL);
     if (code != TSDB_CODE_SUCCESS) {
@@ -2439,12 +2439,14 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
   SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
   cleanupBasicInfo(&pInfo->binfo);

+  colDataDestroy(&pInfo->twAggSup.timeWindowData);
+
   cleanupAggSup(&pInfo->aggSup);
   cleanupGroupResInfo(&pInfo->groupResInfo);
   taosMemoryFreeClear(param);
 }

-SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
-                                            SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
-                                            STimeWindowAggSupp* pTwAggSupp, SNode* pCondition,
+SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
                                             SExecTaskInfo* pTaskInfo) {
   SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
   SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));

@@ -2455,6 +2457,10 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
   size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
   initResultSizeInfo(&pOperator->resultInfo, 4096);

+  int32_t      numOfCols = 0;
+  SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
+  SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc);
+
   int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
   if (code != TSDB_CODE_SUCCESS) {
     goto _error;

@@ -2462,16 +2468,19 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo

   initBasicInfo(&pInfo->binfo, pResBlock);

-  pInfo->twAggSup = *pTwAggSupp;
+  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
+  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
+  pInfo->gap = pSessionNode->gap;

   initResultRowInfo(&pInfo->binfo.resultRowInfo);
   initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

-  pInfo->tsSlotId = tsSlotId;
-  pInfo->gap = gap;
-  pInfo->binfo.pRes = pResBlock;
+  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
+  pInfo->binfo.pRes = pResBlock;
   pInfo->winSup.prevTs = INT64_MIN;
-  pInfo->reptScan = false;
-  pInfo->pCondition = pCondition;
+  pInfo->reptScan = false;
+  pInfo->pCondition = pSessionNode->window.node.pConditions;

   pOperator->name = "SessionWindowAggOperator";
   pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
   pOperator->blocking = true;
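Note: together with the executor change earlier in this commit, the session operator's constructor now takes the physical-plan node itself and derives gap, watermark, trigger type, timestamp slot, and filter condition internally, instead of having every caller unpack them into a long argument list. The shape of that refactor, sketched with simplified hypothetical types (not the actual TDengine structs):

/* Before: init(op, exprs, nExprs, resBlock, gap, tsSlot, twSupp, cond, task)
 * After:  the plan node carries everything the operator needs. */
typedef struct SessionPlanNode {
  long long gap;
  long long watermark;
  int       triggerType;
} SessionPlanNode;

typedef struct SessionOperator {
  long long gap;
  long long watermark;
  int       calTrigger;
} SessionOperator;

static void initSessionOp(SessionOperator *op, const SessionPlanNode *plan) {
  op->gap        = plan->gap;
  op->watermark  = plan->watermark;
  op->calTrigger = plan->triggerType;
}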
@@ -81,10 +81,8 @@ static int32_t addNamespace(STranslateContext* pCxt, void* pTable) {
     SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel);
     taosArrayPush(pTables, &pTable);
     if (hasSameTableAlias(pTables)) {
-      return generateSyntaxErrMsgExt(&pCxt->msgBuf,
-                                     TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS,
-                                     "Not unique table/alias: '%s'",
-                                     ((STableNode*)pTable)->tableAlias);
+      return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS,
+                                     "Not unique table/alias: '%s'", ((STableNode*)pTable)->tableAlias);
     }
   } else {
     do {

@@ -92,10 +90,8 @@ static int32_t addNamespace(STranslateContext* pCxt, void* pTable) {
       if (pCxt->currLevel == currTotalLevel) {
         taosArrayPush(pTables, &pTable);
         if (hasSameTableAlias(pTables)) {
-          return generateSyntaxErrMsgExt(&pCxt->msgBuf,
-                                         TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS,
-                                         "Not unique table/alias: '%s'",
-                                         ((STableNode*)pTable)->tableAlias);
+          return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS,
+                                         "Not unique table/alias: '%s'", ((STableNode*)pTable)->tableAlias);
         }
       }
       taosArrayPush(pCxt->pNsLevel, &pTables);

@@ -1587,6 +1583,7 @@ static EDealRes rewriteExprToGroupKeyFunc(STranslateContext* pCxt, SNode** pNode

   strcpy(pFunc->functionName, "_group_key");
   strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName);
+  strcpy(pFunc->node.userAlias, ((SExprNode*)*pNode)->userAlias);
   pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode);
   if (TSDB_CODE_SUCCESS == pCxt->errCode) {
     *pNode = (SNode*)pFunc;

@@ -2644,27 +2641,6 @@ static int32_t appendTsForImplicitTsFunc(STranslateContext* pCxt, SSelectStmt* p
   return pCxt->errCode;
 }

-typedef struct SRwriteUniqueCxt {
-  STranslateContext* pTranslateCxt;
-  SNode*             pExpr;
-} SRwriteUniqueCxt;
-
-static EDealRes rewriteSeletcValueFunc(STranslateContext* pCxt, SNode** pNode) {
-  SFunctionNode* pFirst = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
-  if (NULL == pFirst) {
-    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
-    return DEAL_RES_ERROR;
-  }
-  strcpy(pFirst->functionName, "first");
-  TSWAP(pFirst->pParameterList, ((SFunctionNode*)*pNode)->pParameterList);
-  strcpy(pFirst->node.aliasName, ((SExprNode*)*pNode)->aliasName);
-  nodesDestroyNode(*pNode);
-  *pNode = (SNode*)pFirst;
-  pCxt->errCode = fmGetFuncInfo(pFirst, pCxt->msgBuf.buf, pCxt->msgBuf.len);
-  ((SSelectStmt*)pCxt->pCurrStmt)->hasAggFuncs = true;
-  return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR;
-}
-
 typedef struct SReplaceOrderByAliasCxt {
   STranslateContext* pTranslateCxt;
   SNodeList*         pProjectionList;
@@ -1727,16 +1727,11 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO

   char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
   metaGetTableNameByUid(pInput->param, uid, str);

-  for(int32_t i = 0; i < pInput->numOfRows; ++i) {
-    colDataAppend(pOutput->columnData, pOutput->numOfRows + i, str, false);
-  }
-
+  colDataAppendNItems(pOutput->columnData, pOutput->numOfRows, str, pInput->numOfRows);
   pOutput->numOfRows += pInput->numOfRows;
   return TSDB_CODE_SUCCESS;
 }

 /** Aggregation functions **/
 int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
   SColumnInfoData *pInputData = pInput->columnData;
@@ -658,7 +658,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
   QUEUE_REMOVE(&conn->q);
   QUEUE_INIT(&conn->q);
   transRemoveExHandle(transGetRefMgt(), conn->refId);
-  transDestroyBuffer(&conn->readBuf);

   conn->refId = -1;
   if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);

@@ -685,7 +684,7 @@ static void cliDestroy(uv_handle_t* handle) {
   transQueueDestroy(&conn->cliMsgs);
   tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
   transReqQueueClear(&conn->wreqQueue);
-
+  transDestroyBuffer(&conn->readBuf);
   taosMemoryFree(conn);
 }
 static bool cliHandleNoResp(SCliConn* conn) {
@@ -830,7 +830,6 @@ static void destroyConn(SSvrConn* conn, bool clear) {
     return;
   }

-  transDestroyBuffer(&conn->readBuf);
   if (clear) {
     if (!uv_is_closing((uv_handle_t*)conn->pTcp)) {
       tTrace("conn %p to be destroyed", conn);

@@ -881,6 +880,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
   QUEUE_REMOVE(&conn->queue);
   taosMemoryFree(conn->pTcp);
   destroyConnRegArg(conn);
+  transDestroyBuffer(&conn->readBuf);
   taosMemoryFree(conn);

   if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
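Note: on both the client (cliDestroy) and server (uvDestroyConn) sides, transDestroyBuffer(&conn->readBuf) moves out of the early teardown path and into the final libuv close callback, right before the connection itself is freed; freeing the read buffer earlier risks a use-after-free, because libuv can still deliver read callbacks until the close callback runs. A minimal sketch of that idiom, assuming a libuv-style handle (illustration only, not the TDengine transport types):

#include <stdlib.h>
#include <uv.h>

typedef struct Conn {
  uv_tcp_t tcp;
  char    *readBuf;
} Conn;

/* Runs only once libuv guarantees no more callbacks for this handle. */
static void onClosed(uv_handle_t *handle) {
  Conn *conn = handle->data;
  free(conn->readBuf);  /* safe: no pending read can touch it now */
  free(conn);
}

static void destroyConn(Conn *conn) {
  conn->tcp.data = conn;
  /* do NOT free readBuf here; a read callback may still fire */
  uv_close((uv_handle_t *)&conn->tcp, onClosed);
}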
@@ -31,6 +31,7 @@ void swapStr(char* j, char* J, int width) {
 }
 #endif

+// todo refactor: 1) move away; 2) use merge sort instead; 3) qsort is not a stable sort actually.
 void taosSort(void* arr, int64_t sz, int64_t width, __compar_fn_t compar) {
 #ifdef WINDOWS
   int64_t i, j;
@@ -1,10 +1,15 @@
 configure_file("${CMAKE_CURRENT_SOURCE_DIR}/src/version.c.in" "${CMAKE_CURRENT_SOURCE_DIR}/src/version.c")
 aux_source_directory(src UTIL_SRC)
 add_library(util STATIC ${UTIL_SRC})
+if (DEFINED GRANT_CFG_INCLUDE_DIR)
+  add_definitions(-DGRANTS_CFG)
+endif()
 target_include_directories(
     util
     PUBLIC "${TD_SOURCE_DIR}/include/util"
     PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
+    PRIVATE "${TD_SOURCE_DIR}/include/common"
+    PRIVATE "${GRANT_CFG_INCLUDE_DIR}"
 )
 target_link_libraries(
     util
@@ -21,6 +21,7 @@
 #include "tenv.h"
 #include "cJSON.h"
 #include "tjson.h"
+#include "tgrant.h"

 #define CFG_NAME_PRINT_LEN 24
 #define CFG_SRC_PRINT_LEN  12

@@ -301,6 +302,7 @@ static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value,
 }

 int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype) {
+  GRANT_CFG_SET;
   SConfigItem *pItem = cfgGetItem(pCfg, name);
   if (pItem == NULL) {
     return -1;
@ -0,0 +1,290 @@
|
|||
# from asyncio.windows_events import NULL
|
||||
import taos
|
||||
import sys
|
||||
import datetime
|
||||
import inspect
|
||||
import random
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.cluster import *
|
||||
from util.common import *
|
||||
|
||||
sys.path.append("./6-cluster/")
|
||||
from clusterCommonCreate import *
|
||||
from clusterCommonCheck import clusterComCheck
|
||||
|
||||
import threading
|
||||
|
||||
class TDTestCase:
|
||||
|
||||
clientCfgDict = {'queryproxy': '1','debugFlag': 135}
|
||||
clientCfgDict["debugFlag"] = 131
|
||||
updatecfgDict = {'clientCfg': {}}
|
||||
updatecfgDict = {'debugFlag': 131}
|
||||
updatecfgDict = {'keepColumnName': 1}
|
||||
updatecfgDict["clientCfg"] = clientCfgDict
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), True)
|
||||
|
||||
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1):
|
||||
tsql.execute("use %s" %dbName)
|
||||
pre_create = "create table"
|
||||
sql = pre_create
|
||||
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
|
||||
for i in range(ctbNum):
|
||||
tagValue = 'beijing'
|
||||
if (i % 10 == 0):
|
||||
sql += " %s%d using %s (name,fleet,driver,device_version) tags('truck_%d', 'South%d','Trish%d','v2.%d')"%(ctbPrefix,i,stbName,i,i,i,i)
|
||||
else:
|
||||
model = 'H-%d'%i
|
||||
sql += " %s%d using %s tags('truck_%d', 'South%d','Trish%d','%s','v2.%d')"%(ctbPrefix,i,stbName,i,i,i,model,i)
|
||||
if (i > 0) and (i%1000 == 0):
|
||||
tsql.execute(sql)
|
||||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
|
||||
return
|
||||
|
||||
def prepareData(self):
|
||||
dbname="db_tsbs"
|
||||
stabname1="readings"
|
||||
stabname2="diagnostics"
|
||||
ctbnamePre1="rct"
|
||||
ctbnamePre2="dct"
|
||||
ctbNums=40
|
||||
self.ctbNums=ctbNums
|
||||
rowNUms=100
|
||||
ts=1451606400000
|
||||
tdSql.execute(f"create database {dbname};")
|
||||
tdSql.execute(f"use {dbname} ")
|
||||
tdSql.execute(f'''
|
||||
create table {stabname1} (ts timestamp,latitude double,longitude double,elevation double,velocity double,heading double,grade double,fuel_consumption double,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30));
|
||||
''')
|
||||
tdSql.execute(f'''
|
||||
create table {stabname2} (ts timestamp,fuel_state double,current_load double,status bigint,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30)) ;
|
||||
''')
|
||||
self.create_ctable(tsql=tdSql,dbName=dbname,stbName=stabname1,ctbPrefix=ctbnamePre1,ctbNum=ctbNums)
|
||||
self.create_ctable(tsql=tdSql,dbName=dbname,stbName=stabname2,ctbPrefix=ctbnamePre2,ctbNum=ctbNums)
|
||||
|
||||
|
||||
for j in range(ctbNums):
|
||||
for i in range(rowNUms):
|
||||
tdSql.execute(
|
||||
f"insert into rct{j} values ( {ts+i*60000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )"
|
||||
)
|
||||
status= random.randint(0,1)
|
||||
tdSql.execute(
|
||||
f"insert into dct{j} values ( {ts+i*60000}, {1+i*0.1},{1400+i*15}, {status},{1500+i*20}, {150+i*2},{5+i} )"
|
||||
)
|
||||
tdSql.execute("insert into dct9 (ts,fuel_state) values('2021-07-13 14:06:33.123Z',1.2) ;")
|
||||
# def check_avg(self ,origin_query , check_query):
|
||||
# avg_result = tdSql.getResult(origin_query)
|
||||
# origin_result = tdSql.getResult(check_query)
|
||||
|
||||
# check_status = True
|
||||
# for row_index , row in enumerate(avg_result):
|
||||
# for col_index , elem in enumerate(row):
|
||||
# if avg_result[row_index][col_index] != origin_result[row_index][col_index]:
|
||||
# check_status = False
|
||||
# if not check_status:
|
||||
# tdLog.notice("avg function value has not as expected , sql is \"%s\" "%origin_query )
|
||||
# sys.exit(1)
|
||||
# else:
|
||||
# tdLog.info("avg value check pass , it work as expected ,sql is \"%s\" "%check_query )
|
||||
|
||||
|
||||
def createCluster(self):
|
||||
tdSql.execute("create mnode on dnode 2")
|
||||
tdSql.execute("create mnode on dnode 3")
|
||||
tdSql.execute("create qnode on dnode 1")
|
||||
tdSql.execute("create qnode on dnode 2")
|
||||
tdSql.execute("create qnode on dnode 3")
|
||||
time.sleep(10)
|
||||
|
||||
def tsbsIotQuery(self,tdSql):
|
||||
tdSql.execute("use db_tsbs")
|
||||
|
||||
# test interval and partition
|
||||
tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet; ")
|
||||
# print(tdSql.queryResult)
|
||||
parRows=tdSql.queryRows
|
||||
tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet interval(10m); ")
|
||||
tdSql.checkRows(parRows)
|
||||
|
||||
|
||||
# # test insert into
|
||||
# tdSql.execute("create table testsnode (ts timestamp, c1 float,c2 binary(30),c3 binary(30),c4 binary(30)) ;")
|
||||
# tdSql.query("insert into testsnode SELECT ts,avg(velocity) as mean_velocity,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet,ts interval(10m);")
|
||||
|
||||
# tdSql.query("insert into testsnode(ts,c1,c2,c3,c4) SELECT ts,avg(velocity) as mean_velocity,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet,ts interval(10m);")
|
||||
|
||||
|
||||
# test paitition interval fill
|
||||
tdSql.query("SELECT name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0) ;")
|
||||
|
||||
|
||||
# test partition interval limit (PRcore-TD-17410)
|
||||
tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings partition BY name,driver,fleet interval (10m) limit 1);")
|
||||
tdSql.checkRows(self.ctbNums)
|
||||
|
||||
# test partition interval Pseudo time-column
|
||||
tdSql.query("SELECT count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
|
||||
|
||||
# 1 high-load:
|
||||
tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name desc, ts DESC;")
|
||||
|
||||
tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name ;")
|
||||
|
||||
# 2 stationary-trucks
|
||||
tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1)")
|
||||
tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1) WHERE fleet = 'West' AND mean_velocity < 1000 partition BY name")
|
||||
|
||||
# 3 long-driving-sessions
|
||||
tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT _wstart as ts,name,driver,avg(velocity) as mean_velocity FROM readings where ts > '2016-01-01T00:00:34Z' AND ts <= '2016-01-01T04:00:34Z' partition BY name,driver interval(10m)) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 22 ;")
|
||||
|
||||
|
||||
#4 long-daily-sessions
|
||||
tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT name,driver,avg(velocity) as mean_velocity FROM readings WHERE fleet ='West' AND ts > '2016-01-01T12:31:37Z' AND ts <= '2016-01-05T12:31:37Z' partition BY name,driver interval(10m) ) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 60")
|
||||
|
||||
# 5. avg-daily-driving-duration
|
||||
tdSql.query("select _wstart as ts,fleet,name,driver,count(mv)/6 as hours_driven from ( select _wstart as ts,fleet,name,driver,avg(velocity) as mv from readings where ts > '2016-01-01T00:00:00Z' and ts < '2016-01-05T00:00:01Z' partition by fleet,name,driver interval(10m)) where ts > '2016-01-01T00:00:00Z' and ts < '2016-01-05T00:00:01Z' partition by fleet,name,driver interval(1d) ;")
|
||||
|
||||
|
||||
# # 6. avg-daily-driving-session
|
||||
# #taosc core dumped
|
||||
# tdSql.execute("create table random_measure2_1 (ts timestamp,ela float, name binary(40))")
|
||||
# tdSql.query("SELECT ts,diff(mv) AS difka FROM (SELECT ts,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name,ts interval(10m) fill(value,0)) GROUP BY name,ts;")
|
||||
# tdSql.query("select name,diff(mv) AS difka FROM (SELECT ts,name,mv FROM (SELECT _wstart as ts,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0))) group BY name ;")
|
||||
# tdSql.query("SELECT _wstart,name,floor(avg(velocity)/10)/floor(avg(velocity)/10) AS mv FROM readings WHERE name!='' AND ts > '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by name interval(10m) fill(value,0)")
|
||||
|
||||
# 7. avg-load
|
||||
tdSql.query("SELECT fleet, model,avg(ml) AS mean_load_percentage FROM (SELECT fleet, model,current_load/load_capacity AS ml FROM diagnostics partition BY name, fleet, model) partition BY fleet, model order by fleet ;")
|
||||
|
||||
# 8. daily-activity
|
||||
tdSql.query(" SELECT model,ms1 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
|
||||
|
||||
tdSql.query(" SELECT model,ms1 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
|
||||
|
||||
tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")
|
||||
|
||||
tdSql.query("SELECT _wstart as ts,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")
|
||||
|
||||
|
||||
# 9. breakdown-frequency
|
||||
# NULL ---count(NULL)=0 expect count(NULL)= 100
|
||||
tdSql.query("SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where model is null partition BY model,state_changed ")
|
||||
parRows=tdSql.queryRows
|
||||
assert parRows != 0 , "query result is wrong"
|
||||
|
||||
|
||||
tdSql.query(" SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where state_changed =1 partition BY model,state_changed ;")
|
||||
|
||||
#it's already supported:
|
||||
# last-loc
|
||||
tdSql.query("SELECT last_row(ts),latitude,longitude,name,driver FROM readings WHERE fleet='South' and name IS NOT NULL partition BY name,driver order by name ;")
|
||||
|
||||
|
||||
#2. low-fuel
|
||||
tdSql.query("SELECT last_row(ts),name,driver,fuel_state,driver FROM diagnostics WHERE fuel_state <= 0.1 AND fleet = 'South' and name IS NOT NULL GROUP BY name,driver order by name;")
|
||||
|
||||
# 3. avg-vs-projected-fuel-consumption
|
||||
tdSql.query("select avg(fuel_consumption) as avg_fuel_consumption,avg(nominal_fuel_consumption) as nominal_fuel_consumption from readings where velocity > 1 group by fleet")
|
||||
|
||||
    def restartFunc(self,func_name,threadNumbers,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
        tdLog.printNoPrefix("======== test case 1: ")
        paraDict = {'dbName':     'db',
                    'dbNumbers':  8,
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    2,
                    'replica':    1,
                    'stbName':    'stb',
                    'stbNumbers': 100,
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':  [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
                    'tagSchema':  [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
                    'ctbPrefix':  'ctb',
                    'ctbNum':     1,
                    }

        dnodeNumbers = int(dnodeNumbers)
        mnodeNums = int(mnodeNums)
        vnodeNumbers = int(dnodeNumbers - mnodeNums)

        tdSql.query("show dnodes;")
        tdLog.debug(tdSql.queryResult)
        clusterComCheck.checkDnodes(dnodeNumbers)

        tdLog.info("create database and stable")
        tdDnodes = cluster.dnodes
        stopcount = 0
        threads = []
        for i in range(threadNumbers):
            # each worker thread gets its own connection
            newTdSql = tdCom.newTdSql()
            threads.append(threading.Thread(target=func_name, args=(newTdSql,)))
        for tr in threads:
            tr.start()

        tdLog.info("Take turns stopping %s " % stopRole)
        while stopcount < restartNumbers:
            tdLog.info(" restart loop: %d" % stopcount)
            if stopRole == "mnode":
                for i in range(mnodeNums):
                    tdDnodes[i].stoptaosd()
                    # sleep(10)
                    tdDnodes[i].starttaosd()
                    # sleep(10)
            elif stopRole == "vnode":
                for i in range(vnodeNumbers):
                    tdDnodes[i + mnodeNums].stoptaosd()
                    # sleep(10)
                    tdDnodes[i + mnodeNums].starttaosd()
                    # sleep(10)
            elif stopRole == "dnode":
                for i in range(dnodeNumbers):
                    tdDnodes[i].stoptaosd()
                    # sleep(10)
                    tdDnodes[i].starttaosd()
                    # sleep(10)

            # dnodeNumbers counts dnodes only, not the databases created by the schema
            if clusterComCheck.checkDnodes(dnodeNumbers):
                tdLog.info("check dnodes status is ready")
            else:
                tdLog.info("check dnodes status is not ready")
                self.stopThread(threads)
                tdLog.exit("one or more of dnodes failed to start ")
                # self.check3mnode()
            stopcount += 1

        for tr in threads:
            tr.join()
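restartFunc accepts any callable for func_name that takes a fresh connection wrapper, so the query workload and the rolling restarts run concurrently. A minimal sketch of a compatible workload, assuming the tdSql-style interface used above (the query text and loop count are illustrative):

# Minimal sketch of a workload usable as restartFunc(func_name=...).
# Each thread receives its own connection wrapper (newTdSql), so the
# concurrent threads never share a cursor; the SQL below is illustrative.
def sampleQueryLoop(tsql):
    for _ in range(100):
        try:
            tsql.query("select avg(velocity) from db_tsbs.readings;")
        except Exception as e:
            # transient errors are expected while a dnode is restarting;
            # a real test would separate retryable errors from failures
            tdLog.info("query retry after: %s" % e)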
    def run(self):
        tdLog.printNoPrefix("==========step1:create database and table,insert data ==============")
        self.createCluster()
        self.prepareData()
        queryPolicy = 2
        simClientCfg = "%s/taos.cfg" % tdDnodes.getSimCfgPath()
        cmd = 'sed -i "s/^queryPolicy.*/queryPolicy 2/g" %s' % simClientCfg
        os.system(cmd)
        # self.tsbsIotQuery()
        self.restartFunc(func_name=self.tsbsIotQuery,threadNumbers=3,dnodeNumbers=5,mnodeNums=3,restartNumbers=3,stopRole='mnode')

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")


tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
@@ -318,7 +318,6 @@ class TDTestCase:
        os.system(cmd)
        # tdDnodes.stop(1)
        # tdDnodes.start(1)
        tdSql.execute("reset query cache")
        tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy)
        tdSql.query("show local variables;")
        for i in range(tdSql.queryRows):
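Both call sites configure queryPolicy 2 before querying: run() rewrites the simulator's taos.cfg with sed, and the hunk above applies the same setting to a live connection via alter local (per the TDengine configuration docs, policy 2 allows query tasks to be scheduled on qnodes rather than only vnodes). An equivalent pure-Python rewrite of the cfg edit, sketched under the assumption that at most one queryPolicy line exists in the file:

# Sketch: the same cfg rewrite as the sed call above, without shelling out.
# Assumes at most one "queryPolicy ..." line; illustrative only.
import re

def setQueryPolicy(cfgPath, policy):
    with open(cfgPath) as f:
        text = f.read()
    # mirror of: sed -i "s/^queryPolicy.*/queryPolicy <policy>/g"
    text = re.sub(r"^queryPolicy.*$", "queryPolicy %d" % policy, text, flags=re.M)
    with open(cfgPath, "w") as f:
        f.write(text)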
@@ -1,3 +1,4 @@
# from asyncio.windows_events import NULL
import taos
import sys
import datetime
@@ -21,38 +22,99 @@ class TDTestCase:
        tdLog.debug(f"start to execute {__file__}")
        tdSql.init(conn.cursor(), True)

    def prepareData(self):
        database = "db_tsbs"
        ts = 1451606400000
        tdSql.execute(f"create database {database};")
        tdSql.execute(f"use {database} ")
        tdSql.execute('''
        create table readings (ts timestamp,latitude double,longitude double,elevation double,velocity double,heading double,grade double,fuel_consumption double,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30));
        ''')
        tdSql.execute('''
        create table diagnostics (ts timestamp,fuel_state double,current_load double,status bigint,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30)) ;
        ''')

    def create_ctable(self, tsql=None, dbName='db', stbName='stb', ctbPrefix='ctb', ctbNum=1):
        tsql.execute("use %s" % dbName)
        pre_create = "create table"
        sql = pre_create
        #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
        for i in range(ctbNum):
            tagValue = 'beijing'
            if (i % 10 == 0):
                # every tenth child table omits the model tag column, leaving it NULL
                sql += " %s%d using %s (name,fleet,driver,device_version) tags('truck_%d', 'South%d','Trish%d','v2.%d')" % (ctbPrefix, i, stbName, i, i, i, i)
            else:
                model = 'H-%d' % i
                sql += " %s%d using %s tags('truck_%d', 'South%d','Trish%d','%s','v2.%d')" % (ctbPrefix, i, stbName, i, i, i, model, i)
            if (i > 0) and (i % 1000 == 0):
                # flush the accumulated statement every 1000 tables
                tsql.execute(sql)
                sql = pre_create
        if sql != pre_create:
            tsql.execute(sql)

        tdLog.debug("complete to create %d child tables in %s.%s" % (ctbNum, dbName, stbName))
        return

        for i in range(10):
            if i == 1 or i == 2:
                tdLog.debug(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}', NULL,'v2.3')")
                tdSql.execute(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}', NULL,'v2.3')")
            else:
                tdSql.execute(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}','H-{i}','v2.3')")
            if i == 1 or i == 2:
                tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}',NULL ,'v2.3')")
            else:
                tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}','H-{i}','v2.3')")
        for j in range(10):
            for i in range(100):
                tdSql.execute(
                    f"insert into rct{j} values ( {ts+i*60000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )"
                )
                status = random.randint(0, 1)
                tdSql.execute(
                    f"insert into dct{j} values ( {ts+i*60000}, {1+i*0.1},{1400+i*15}, {status},{1500+i*20}, {150+i*2},{5+i} )"
                )
        tdSql.execute("insert into dct9 (ts,fuel_state) values('2021-07-13 14:06:33.123Z',1.2) ;")

    def insertData(self, startTs, tsql=None, dbName='db', stbName='stb', ctbPrefix='ctb', ctbNum=1, rowsPerTbl=100, batchNum=1000):
        tsql.execute("use %s" % dbName)
        pre_insert = "insert into "
        sql = pre_insert
        if startTs is None:
            t = time.time()
            startTs = int(round(t * 1000))

        for i in range(ctbNum):
            sql += " %s%d values " % (ctbPrefix, i)
            for j in range(rowsPerTbl):
                if (ctbPrefix == "rct"):
                    sql += f"({startTs+j*60000}, {80+j}, {90+j}, {85+j}, {30+j*10}, {1.2*j}, {221+j*2}, {20+j*0.2}, {1500+j*20}, {150+j*2},{5+j}) "
                elif (ctbPrefix == "dct"):
                    status = random.randint(0, 1)
                    sql += f"( {startTs+j*60000}, {1+j*0.1},{1400+j*15}, {status},{1500+j*20}, {150+j*2},{5+j} ) "
                # tdLog.debug("1insert sql:%s"%sql)
                if (j > 0) and ((j % batchNum == 0) or (j == rowsPerTbl - 1)):
                    # tdLog.debug("2insert sql:%s"%sql)
                    tsql.execute(sql)
                    if j < rowsPerTbl - 1:
                        sql = "insert into %s%d values " % (ctbPrefix, i)
                    else:
                        sql = "insert into "
        if sql != pre_insert:
            # tdLog.debug("3insert sql:%s"%sql)
            tsql.execute(sql)
        tdLog.debug("insert data ............ [OK]")
        return

    def prepareData(self):
        dbname = "db_tsbs"
        stabname1 = "readings"
        stabname2 = "diagnostics"
        ctbnamePre1 = "rct"
        ctbnamePre2 = "dct"
        ctbNums = 40
        self.ctbNums = ctbNums
        rowNUms = 200
        ts = 1451606400000
        tdSql.execute(f"create database {dbname};")
        tdSql.execute(f"use {dbname} ")
        tdSql.execute(f'''
        create table {stabname1} (ts timestamp,latitude double,longitude double,elevation double,velocity double,heading double,grade double,fuel_consumption double,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30));
        ''')
        tdSql.execute(f'''
        create table {stabname2} (ts timestamp,fuel_state double,current_load double,status bigint,load_capacity double,fuel_capacity double,nominal_fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30)) ;
        ''')
        self.create_ctable(tsql=tdSql, dbName=dbname, stbName=stabname1, ctbPrefix=ctbnamePre1, ctbNum=ctbNums)
        self.create_ctable(tsql=tdSql, dbName=dbname, stbName=stabname2, ctbPrefix=ctbnamePre2, ctbNum=ctbNums)
        self.insertData(tsql=tdSql, dbName=dbname, stbName=stabname1, ctbPrefix=ctbnamePre1, ctbNum=ctbNums, rowsPerTbl=rowNUms, startTs=ts, batchNum=1000)
        self.insertData(tsql=tdSql, dbName=dbname, stbName=stabname2, ctbPrefix=ctbnamePre2, ctbNum=ctbNums, rowsPerTbl=rowNUms, startTs=ts, batchNum=1000)
        # for i in range(ctbNum):
        #     if i % 10 == 0:
        #         # tdLog.debug(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}', NULL,'v2.3')")
        #         tdSql.execute(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}', NULL,'v2.3')")
        #     else:
        #         tdSql.execute(f"create table rct{i} using readings (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}','H-{i}','v2.3')")
        #     if i % 10 == 0:
        #         tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}',NULL ,'v2.3')")
        #     else:
        #         tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}','H-{i}','v2.3')")
        # for j in range(ctbNums):
        #     for i in range(rowNUms):
        #         tdSql.execute(
        #             f"insert into rct{j} values ( {ts+i*60000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )"
        #         )
        #         status = random.randint(0, 1)
        #         tdSql.execute(
        #             f"insert into dct{j} values ( {ts+i*60000}, {1+i*0.1},{1400+i*15}, {status},{1500+i*20}, {150+i*2},{5+i} )"
        #         )
        # tdSql.execute("insert into dct9 (ts,fuel_state) values('2021-07-13 14:06:33.123Z',1.2) ;")
    # def check_avg(self, origin_query, check_query):
    #     avg_result = tdSql.getResult(origin_query)
    #     origin_result = tdSql.getResult(check_query)
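For reference, the ts base 1451606400000 used throughout prepareData is the millisecond epoch of 2016-01-01T00:00:00Z, which is why the later queries can filter on that literal window. A quick sanity check:

# Sanity check that the generated rows fall inside the queried window.
import datetime

base_ms = 1451606400000                      # 2016-01-01T00:00:00Z
assert datetime.datetime.utcfromtimestamp(base_ms / 1000) == datetime.datetime(2016, 1, 1)
# 200 rows per table at 60 s spacing end well before 2016-01-05T00:00:01Z
assert base_ms + 200 * 60000 < 1451952001000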
@@ -75,7 +137,6 @@ class TDTestCase:

        # test interval and partition
        tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet; ")
        print(tdSql.queryResult)
        parRows = tdSql.queryRows
        tdSql.query(" SELECT avg(velocity) as mean_velocity ,name,driver,fleet FROM readings WHERE ts > 1451606400000 AND ts <= 1451606460000 partition BY name,driver,fleet interval(10m); ")
        tdSql.checkRows(parRows)
@@ -94,7 +155,7 @@ class TDTestCase:

        # test partition interval limit (PRcore-TD-17410)
        tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings partition BY name,driver,fleet interval (10m) limit 1);")
        tdSql.checkRows(10)
        tdSql.checkRows(self.ctbNums)

        # test partition interval Pseudo time-column
        tdSql.query("SELECT count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;")
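The corrected row-count expectation follows from the data generator rather than a hard-coded 10: create_ctable gives every child table a distinct (name, driver, fleet) tag triple, so the partitioned subquery with limit 1 returns one row per child table, i.e. self.ctbNums. A tiny check of that uniqueness assumption (an illustrative mirror of the tag scheme, not test code):

# Each child table gets unique tags, so "partition BY name,driver,fleet"
# produces ctbNums groups.
ctbNums = 40
tags = {("truck_%d" % i, "South%d" % i, "Trish%d" % i) for i in range(ctbNums)}
assert len(tags) == ctbNums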
@@ -136,17 +197,27 @@ class TDTestCase:

        tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")

        tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")
        tdSql.query("SELECT _wstart as ts,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;")

        # 9. breakdown-frequency
        # NULL --- count(NULL)=0, but count(NULL)=100 was expected
        tdSql.query("SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where model is null partition BY model,state_changed ")
        parRows = tdSql.queryRows
        assert parRows != 0 , "query result is wrong"
        assert parRows != 0 , "query result is wrong, query rows %d but expect > 0 " % parRows

        tdSql.query(" SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where state_changed =1 partition BY model,state_changed ;")
        sql = "select model,ctc from (SELECT model,count(state_changed) as ctc FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= 1451606400000 AND ts < 1451952001000 ) WHERE ts >= 1451606400000 AND ts < 1451952001000 partition BY model interval(10m)) partition BY model) WHERE state_changed = 1 partition BY model )where model is null;"

        # for i in range(2):
        #     tdSql.query("%s"%sql)
        #     quertR1 = tdSql.queryResult
        #     for j in range(50):
        #         tdSql.query("%s"%sql)
        #         quertR2 = tdSql.queryResult
        #         assert quertR1 == quertR2 , "%s != %s ,The results of multiple queries are different" % (quertR1, quertR2)

        # it's already supported:
        # last-loc
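The count(NULL) remark above appears to track standard SQL semantics: count over a column skips NULL values, so a partition whose model tag is NULL reports 0 for count(model) even when it holds rows, while count(*) would report them all. Illustrated on plain Python values:

# count(col) ignores NULLs; count(*) does not.
rows = [None] * 100                                  # partition with model = NULL
count_col = sum(1 for r in rows if r is not None)    # count(model) -> 0
count_star = len(rows)                               # count(*)     -> 100
assert (count_col, count_star) == (0, 100)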
@@ -778,7 +778,7 @@ void shellReadHistory() {
    taosFsyncFile(pFile);
    taosCloseFile(&pFile);
  }
  pHistory->hend = pHistory->hstart;
  pHistory->hstart = pHistory->hend;
}

void shellWriteHistory() {
@@ -33,10 +33,11 @@ int shell_conn_ws_server(bool first) {
  return 0;
}

static int horizontalPrintWebsocket(WS_RES* wres) {
static int horizontalPrintWebsocket(WS_RES* wres, double* execute_time) {
  const void* data = NULL;
  int rows;
  ws_fetch_block(wres, &data, &rows);
  *execute_time += (double)(ws_take_timing(wres)/1E6);
  if (!rows) {
    return 0;
  }
@@ -72,10 +73,11 @@ static int horizontalPrintWebsocket(WS_RES* wres) {
  return numOfRows;
}

static int verticalPrintWebsocket(WS_RES* wres) {
static int verticalPrintWebsocket(WS_RES* wres, double* pexecute_time) {
  int rows = 0;
  const void* data = NULL;
  ws_fetch_block(wres, &data, &rows);
  *pexecute_time += (double)(ws_take_timing(wres)/1E6);
  if (!rows) {
    return 0;
  }
@@ -112,7 +114,7 @@ static int verticalPrintWebsocket(WS_RES* wres) {
  return numOfRows;
}

static int dumpWebsocketToFile(const char* fname, WS_RES* wres) {
static int dumpWebsocketToFile(const char* fname, WS_RES* wres, double* pexecute_time) {
  char fullname[PATH_MAX] = {0};
  if (taosExpandDir(fname, fullname, PATH_MAX) != 0) {
    tstrncpy(fullname, fname, PATH_MAX);
@@ -127,6 +129,7 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres) {
  int rows = 0;
  const void* data = NULL;
  ws_fetch_block(wres, &data, &rows);
  *pexecute_time += (double)(ws_take_timing(wres)/1E6);
  if (!rows) {
    taosCloseFile(&pFile);
    return 0;
@@ -162,14 +165,14 @@ static int dumpWebsocketToFile(const char* fname, WS_RES* wres) {
  return numOfRows;
}

static int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical) {
static int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical, double* pexecute_time) {
  int numOfRows = 0;
  if (fname != NULL) {
    numOfRows = dumpWebsocketToFile(fname, wres);
    numOfRows = dumpWebsocketToFile(fname, wres, pexecute_time);
  } else if (vertical) {
    numOfRows = verticalPrintWebsocket(wres);
    numOfRows = verticalPrintWebsocket(wres, pexecute_time);
  } else {
    numOfRows = horizontalPrintWebsocket(wres);
    numOfRows = horizontalPrintWebsocket(wres, pexecute_time);
  }
  *error_no = ws_errno(wres);
  return numOfRows;
@@ -225,6 +228,8 @@ void shellRunSingleCommandWebsocketImp(char *command) {
    return;
  }

  double execute_time = ws_take_timing(res)/1E6;

  if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
    fprintf(stdout, "Database changed.\r\n\r\n");
    fflush(stdout);
@@ -236,22 +241,27 @@ void shellRunSingleCommandWebsocketImp(char *command) {
  if (ws_is_update_query(res)) {
    numOfRows = ws_affected_rows(res);
    et = taosGetTimestampUs();
    printf("Query Ok, %d of %d row(s) in database (%.6fs)\n", numOfRows, numOfRows,
           (et - st)/1E6);
    double total_time = (et - st)/1E3;
    double net_time = total_time - (double)execute_time;
    printf("Query Ok, %d of %d row(s) in database\n", numOfRows, numOfRows);
    printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time);
  } else {
    int error_no = 0;
    numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode);
    numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode, &execute_time);
    if (numOfRows < 0) {
      ws_free_result(res);
      return;
    }
    et = taosGetTimestampUs();
    double total_time = (et - st) / 1E3;
    double net_time = total_time - execute_time;
    if (error_no == 0 && !shell.stop_query) {
      printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows,
             (et - st)/1E6);
      printf("Query OK, %d row(s) in set\n", numOfRows);
      printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time);
    } else {
      printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows,
             (et - st)/1E6);
      printf("Execute: %.2f ms Network: %.2f ms Total: %.2f ms\n", execute_time, net_time, total_time);
    }
  }
  printf("\n");
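Taken together, these shell hunks decompose the reported latency: ws_take_timing() supplies the server-side execute time (the /1E6 scaling suggests a nanosecond-resolution source) accumulated across every fetched block, taosGetTimestampUs() supplies wall-clock microseconds, and the network share is reported as the residual. The same decomposition, sketched in Python to match the tests above (all names illustrative):

# Illustrative timing decomposition: total = execute (server) + network (residual).
import time

def timed_query(run_on_server):
    st = time.monotonic()            # stands in for taosGetTimestampUs()
    execute_s = run_on_server()      # stands in for summed ws_take_timing() values
    total_ms = (time.monotonic() - st) * 1e3
    execute_ms = execute_s * 1e3
    net_ms = total_ms - execute_ms   # everything not spent executing on the server
    print("Execute: %.2f ms Network: %.2f ms Total: %.2f ms" % (execute_ms, net_ms, total_ms))

timed_query(lambda: (time.sleep(0.01), 0.008)[1])   # pretend 8 ms of server time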