Merge branch 'main' of https://github.com/taosdata/TDengine into fix/TS-5669

This commit is contained in:
yihaoDeng 2024-11-21 15:00:16 +08:00
commit a5c1f11b04
28 changed files with 302 additions and 180 deletions

View File

@ -5,7 +5,7 @@ node {
}
file_zh_changed = ''
file_en_changed = ''
file_no_doc_changed = ''
file_no_doc_changed = '1'
def abortPreviousBuilds() {
def currentJobName = env.JOB_NAME
def currentBuildNumber = env.BUILD_NUMBER.toInteger()
@ -355,7 +355,7 @@ def pre_test_build_win() {
bat '''
cd %WIN_COMMUNITY_ROOT%/tests/ci
pip3 install taospy==2.7.16
pip3 install taos-ws-py==0.3.3
pip3 install taos-ws-py==0.3.5
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
'''
return 1
@ -655,4 +655,4 @@ pipeline {
)
}
}
}
}

View File

@ -41,12 +41,18 @@ We recommend using the latest version of `taospy`, regardless of the version of
|Python Client Library Version|major changes|
|:-------------------:|:----:|
|2.7.16|add subscription configuration (session.timeout.ms, max.poll.interval.ms)|
|2.7.15|added support for VARBINARY and GEOMETRY types|
|2.7.14|fix known issues|
|2.7.13|add TMQ synchronous submission offset interface|
|2.7.12|1. added support for `varbinary` type (STMT does not yet support)<br/> 2. improved query performance (thanks to contributor [hadrianl](https://github.com/taosdata/taos-connector-python/pull/209))|
|2.7.9|support for getting assignment and seek function on subscription|
|2.7.8|add `execute_many` method|
|Python Websocket Connection Version|major changes|
|:----------------------------:|:-----:|
|0.3.5|1. added support for VARBINARY and GEOMETRY types <br/> 2. Fix known issues|
|0.3.2|1. optimize WebSocket SQL query and insertion performance <br/> 2. Fix known issues <br/> 3. Modify the readme and document|
|0.2.9|bugs fixes|
|0.2.5|1. support for getting assignment and seek function on subscription <br/> 2. support schemaless <br/> 3. support STMT|
|0.2.4|support `unsubscribe` on subscription|

View File

@ -27,6 +27,8 @@ Node.js client library needs to be run with Node.js 14 or higher version.
| Node.js connector version | major changes | TDengine 版本 |
| :-----------------------: | :------------------: | :----------------:|
| 3.1.2 | Optimized the data protocol and parsing, resulting in a significant improvement in performance | 3.2.0.0 or later |
| 3.1.1 | Optimized data transmission performance | 3.2.0.0 or later |
| 3.1.0 | new version, supports websocket | 3.2.0.0 or later |
## Supported features

View File

@ -24,10 +24,6 @@ import Release from "/components/ReleaseV3";
<Release type="tdengine" version="3.3.4.3" />
## 3.3.4.3
<Release type="tdengine" version="3.3.4.3" />
## 3.3.3.0
<Release type="tdengine" version="3.3.3.0" />

View File

@ -41,6 +41,7 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
|Python Connector 版本|主要变化|
|:-------------------:|:----:|
|2.7.16|新增订阅配置 (session.timeout.ms, max.poll.interval.ms)|
|2.7.15|新增 VARBINARY 和 GEOMETRY 类型支持|
|2.7.14|修复已知问题|
|2.7.13|新增 tmq 同步提交 offset 接口|
@ -50,6 +51,7 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
|Python WebSocket Connector 版本|主要变化|
|:----------------------------:|:-----:|
|0.3.5|新增 VARBINARY 和 GEOMETRY 类型支持,修复已知问题|
|0.3.2|优化 WebSocket sql 查询和插入性能,修改 readme 和 文档,修复已知问题|
|0.2.9|已知问题修复|
|0.2.5|1. 数据订阅支持获取消费进度和重置消费进度 <br/> 2. 支持 schemaless <br/> 3. 支持 STMT|

View File

@ -26,6 +26,7 @@ Node.js 连接器目前仅支持 WebSocket 连接器, 其通过 taosAdapter
| Node.js 连接器 版本 | 主要变化 | TDengine 版本 |
| :------------------: | :----------------------: | :----------------: |
| 3.1.2 | 对数据协议和解析进行了优化,性能得到大幅提升| 3.3.2.0 及更高版本 |
| 3.1.1 | 优化了数据传输性能 | 3.3.2.0 及更高版本 |
| 3.1.0 | 新版本发布,支持 WebSocket 连接 | 3.2.0.0 及更高版本 |

View File

@ -28,10 +28,6 @@ import Release from "/components/ReleaseV3";
<Release type="tdengine" version="3.3.4.3" />
## 3.3.4.3
<Release type="tdengine" version="3.3.4.3" />
## 3.3.3.0
<Release type="tdengine" version="3.3.3.0" />

View File

@ -1,60 +1,69 @@
## 新特性
* 新功能:流计算的 TWA 函数支持时间驱动的结果推送模式
* 新功能:流计算的 Interp 函数支持时间驱动的结果推送模式
* 优化:顺序执行 compact 和 split vgroup操作时的日志错误提示
* 新功能:支持微软对象存储
---
title: 3.3.4.3 版本说明
sidebar_label: 3.3.4.3
description: 3.3.4.3 版本说明
---
## 优化
* 优化:提升并发大查询时节点之间互相拉数据的效率
* 优化:支持使用 AVX2 和 AVX512 对 double 、timestamp 和 bigint 类型进行解码优化
* 优化:调整 case when 语句的结果类型判断方法
* 优化:提升查询 “select ... from ... where ts in (...)” 的数据扫描速度
* 优化:增加了流计算的兼容性保证机制,避免后续函数变更产生新的兼容性问题,之前版本的流计算必须重建
* 优化:提升 taosX 在交叉写入场景下的数据同步性能
* 优化:支持关闭整数/浮点数类型的编码
* 优化:多副本流计算中必须使用 snode
* 优化:客户端生成唯一 ID 标识每一个查询任务,避免重复 ID 导致的内存损坏
* 优化:加快数据库的创建时间
* 优化:修改 s3MigrateEnabled 默认值为0
* 优化:支持在审计数据库中记录删除操作
* 优化:支持在指定的 dnode 中创建数据库 [企业版]
* 优化:调整删除超级表数据列时的报错信息
### 行为变更及兼容性
1. 多副本流计算中必须使用 snode
1. 增加了流计算的兼容性保证机制,避免后续函数变更产生新的兼容性问题,但之前版本的流计算必须重建,具体参见 https://docs.taosdata.com/advanced/stream/#流计算升级故障恢复
1. 调整 case when 语句结果类型的判断方法
### 新特性
1. 新功能:流计算的 TWA 函数支持时间驱动的结果推送模式
1. 新功能:流计算的 Interp 函数支持时间驱动的结果推送模式
1. 新功能:支持微软对象存储
## 修复
* 修复last_row 查询性能在 3.3.3.0 中大幅下降的问题
* 修复WAL 条目不完整时 taosd 无法启动的问题
* 修复: partition by 常量时查询结果错误的问题
* 修复:标量函数包含 _wstart 且填充方式为 prev 时计算结果错误
* 修复Windows 平台下的时区设置问题
* 修复:空数据库进行 compact 操作时,事务无法结束【企业版】
* 修复:事务冲突的逻辑错误
* 修复:管理节点某些错误会导致事务无法停止
* 修复:管理节点某些错误会导致事务无法停止
* 修复dnode 数据清空后 taosc 重试错误的问题
* 修复Data Compact 被异常终止后,中间文件未被清理
* 修复新增列后Kafka 连接器的 earliest 模式消费不到新列数据
* 修复interp 函数在 fill(prev) 时行为不正确
* 修复TSMA 在高频元数据操作时异常停止的问题
* 修复show create stable 语句执行结果的标签显示错误
* 修复Percentile 函数在大数据量查询时会崩溃。
* 修复partition by 和 having 联合使用时的语法错误问题
* 修复interp 在 partition by tbname,c1 时 tbname 为空的问题
* 修复:通过 stmt 写入非法布尔数值时 taosd 可能 crash
* 修复Explorer OPC-UA 表名模板说明
* 修复:库符号 version 与使用相同符号的库冲突的问题 https://github.com/taosdata/TDengine/issues/25920
* 修复:在 windows 平台下 JDBC 驱动的句柄数持续升高问题
* 修复3.3.3.1 升级至 3.3.4.0 偶现的启动失败问题
* 修复Windows 平台重复增删表的内存泄漏
* 修复:无法限制并发拉起 checkpoint 数量导致流计算消耗资源过多
* 修复:并发查询时的 too many session 问题
* 修复Windows 平台下 taos shell 在慢查询场景中崩溃的问题
* 修复:当打开 dnode日志时加密数据库无法恢复的问题
* 修复:由于 mnode 同步超时,进而导致 taosd 无法启动的问题
* 修复:由于在快照同步过程中整理文件组数据的速度过慢,从而导致 Vnode虚拟节点无法恢复的问题
* 修复通过行协议向字符串类型的字段中写入带转义符的数据时taosd 会崩溃
* 修复Error Code 逻辑处理错误导致的元数据文件损坏
* 修复:查询语句中包含多个 “not” 条件语句嵌套时,未设置标量模式导致查询错误
* 修复vnode 统计信息上报超时导致的 dnode offline 问题
* 修复:在不支持 avx 指令集的服务器上taosd 启动失败问题
* 修复taosX 数据迁移容错处理 0x09xx 错误码
### 优化
1. 优化:提升并发大查询时节点之间互相拉数据的效率
1. 优化:支持使用 AVX2 和 AVX512 对 double 、timestamp 和 bigint 类型进行解码优化
1. 优化:调整 case when 语句的结果类型判断方法
1. 优化:顺序执行 compact 和 split vgroup操作时的日志错误提示
1. 优化:提升查询 “select ... from ... where ts in (...)” 的数据扫描速度
1. 优化:增加了流计算的兼容性保证机制,避免后续函数变更产生新的兼容性问题,之前版本的流计算必须重建
1. 优化:提升 taosX 在交叉写入场景下的数据同步性能
1. 优化:支持关闭整数/浮点数类型的编码
1. 优化:多副本流计算中必须使用 snode
1. 优化:客户端生成唯一 ID 标识每一个查询任务,避免重复 ID 导致的内存损坏
1. 优化:加快数据库的创建时间
1. 优化:修改 s3MigrateEnabled 默认值为0
1. 优化:支持在审计数据库中记录删除操作
1. 优化:支持在指定的 dnode 中创建数据库 [企业版]
1. 优化:调整删除超级表数据列时的报错信息
### 修复
1. 修复last_row 查询性能在 3.3.3.0 中大幅下降的问题
1. 修复WAL 条目不完整时 taosd 无法启动的问题
1. 修复: partition by 常量时查询结果错误的问题
1. 修复:标量函数包含 _wstart 且填充方式为 prev 时计算结果错误
1. 修复Windows 平台下的时区设置问题
1. 修复:空数据库进行 compact 操作时,事务无法结束【企业版】
1. 修复:事务冲突的逻辑错误
1. 修复:管理节点某些错误会导致事务无法停止
1. 修复:管理节点某些错误会导致事务无法停止
1. 修复dnode 数据清空后 taosc 重试错误的问题
1. 修复Data Compact 被异常终止后,中间文件未被清理
1. 修复新增列后Kafka 连接器的 earliest 模式消费不到新列数据
1. 修复interp 函数在 fill(prev) 时行为不正确
1. 修复TSMA 在高频元数据操作时异常停止的问题
1. 修复show create stable 语句执行结果的标签显示错误
1. 修复Percentile 函数在大数据量查询时会崩溃。
1. 修复partition by 和 having 联合使用时的语法错误问题
1. 修复interp 在 partition by tbname,c1 时 tbname 为空的问题
1. 修复:通过 stmt 写入非法布尔数值时 taosd 可能 crash
1. 修复:库符号 version 与使用相同符号的库冲突的问题
1. 修复:在 windows 平台下 JDBC 驱动的句柄数持续升高问题
1. 修复3.3.3.1 升级至 3.3.4.0 偶现的启动失败问题
1. 修复Windows 平台重复增删表的内存泄漏
1. 修复:无法限制并发拉起 checkpoint 数量导致流计算消耗资源过多
1. 修复:并发查询时的 too many session 问题
1. 修复Windows 平台下 taos shell 在慢查询场景中崩溃的问题
1. 修复:当打开 dnode日志时加密数据库无法恢复的问题
1. 修复:由于 mnode 同步超时,进而导致 taosd 无法启动的问题
1. 修复:由于在快照同步过程中整理文件组数据的速度过慢,从而导致 Vnode虚拟节点无法恢复的问题
1. 修复通过行协议向字符串类型的字段中写入带转义符的数据时taosd 会崩溃
1. 修复Error Code 逻辑处理错误导致的元数据文件损坏
1. 修复:查询语句中包含多个 “not” 条件语句嵌套时,未设置标量模式导致查询错误
1. 修复vnode 统计信息上报超时导致的 dnode offline 问题
1. 修复:在不支持 avx 指令集的服务器上taosd 启动失败问题
1. 修复taosX 数据迁移容错处理 0x09xx 错误码

View File

@ -4,7 +4,6 @@ sidebar_label: 版本说明
description: 各版本版本说明
---
[3.3.4.3](./3.3.4.3)
[3.3.4.3](./3.3.4.3)
[3.3.3.0](./3.3.3.0)
[3.3.2.0](./3.3.2.0)

View File

@ -102,6 +102,7 @@ typedef struct SCatalogReq {
bool svrVerRequired;
bool forceUpdate;
bool cloned;
bool forceFetchViewMeta;
} SCatalogReq;
typedef struct SMetaRes {

View File

@ -186,11 +186,25 @@ static int32_t tBufferGetI16(SBufferReader *reader, int16_t *value) {
}
static int32_t tBufferGetI32(SBufferReader *reader, int32_t *value) {
return tBufferGet(reader, sizeof(*value), value);
if (reader->offset + sizeof(int32_t) > reader->buffer->size) {
return TSDB_CODE_OUT_OF_RANGE;
}
if (value) {
*value = *(int32_t*)BR_PTR(reader);
}
reader->offset += sizeof(int32_t);
return 0;
}
static int32_t tBufferGetI64(SBufferReader *reader, int64_t *value) {
return tBufferGet(reader, sizeof(*value), value);
if (reader->offset + sizeof(int64_t) > reader->buffer->size) {
return TSDB_CODE_OUT_OF_RANGE;
}
if (value) {
*value = *(int64_t*)BR_PTR(reader);
}
reader->offset += sizeof(int64_t);
return 0;
}
static int32_t tBufferGetU8(SBufferReader *reader, uint8_t *value) { return tBufferGet(reader, sizeof(*value), value); }

View File

@ -972,7 +972,7 @@ static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinR
break;
}
if ((writer->brinBlock->numOfRecords) >= writer->config->maxRow) {
if ((writer->brinBlock->numOfRecords) >= 256) {
TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
}

View File

@ -836,6 +836,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
pList = &pReader->status.uidList;
int32_t i = 0;
int32_t j = 0;
while (i < TARRAY2_SIZE(pBlkArray)) {
pBrinBlk = &pBlkArray->data[i];
if (pBrinBlk->maxTbid.suid < pReader->info.suid) {
@ -851,7 +852,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
(pBrinBlk->minTbid.suid <= pReader->info.suid) && (pBrinBlk->maxTbid.suid >= pReader->info.suid), code, lino,
_end, TSDB_CODE_INTERNAL_ERROR);
if (pBrinBlk->maxTbid.suid == pReader->info.suid && pBrinBlk->maxTbid.uid < pList->tableUidList[0]) {
if (pBrinBlk->maxTbid.suid == pReader->info.suid && pBrinBlk->maxTbid.uid < pList->tableUidList[j]) {
i += 1;
continue;
}
@ -864,6 +865,14 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
TSDB_CHECK_NULL(p1, code, lino, _end, terrno);
i += 1;
if (pBrinBlk->maxTbid.suid == pReader->info.suid) {
while (j < numOfTables && pList->tableUidList[j] < pBrinBlk->maxTbid.uid) {
j++;
}
if (j >= numOfTables) {
break;
}
}
}
et2 = taosGetTimestampUs();

View File

@ -271,6 +271,7 @@ typedef struct SCtgViewsCtx {
SArray* pNames;
SArray* pResList;
SArray* pFetchs;
bool forceFetch;
} SCtgViewsCtx;
typedef enum {
@ -831,12 +832,12 @@ typedef struct SCtgCacheItemInfo {
#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgTaskFatal(param, ...) qFatal("qid:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskError(param, ...) qError("qid:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskWarn(param, ...) qWarn("qid:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskInfo(param, ...) qInfo("qid:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskDebug(param, ...) qDebug("qid:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskTrace(param, ...) qTrace("qid:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskFatal(param, ...) qFatal("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskError(param, ...) qError("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskWarn(param, ...) qWarn("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskInfo(param, ...) qInfo("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskDebug(param, ...) qDebug("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define ctgTaskTrace(param, ...) qTrace("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define CTG_LOCK_DEBUG(...) \
do { \

View File

@ -20,6 +20,11 @@
#include "tref.h"
#include "trpc.h"
typedef struct SCtgViewTaskParam {
bool forceFetch;
SArray* pTableReqs;
} SCtgViewTaskParam;
void ctgIsTaskDone(SCtgJob* pJob, CTG_TASK_TYPE type, bool* done) {
SCtgTask* pTask = NULL;
@ -500,7 +505,7 @@ int32_t ctgInitGetTbTagTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SCtgTask task = {0};
SCtgViewTaskParam* p = param;
task.type = CTG_TASK_GET_VIEW;
task.taskId = taskIdx;
task.pJob = pJob;
@ -511,7 +516,8 @@ int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
}
SCtgViewsCtx* ctx = task.taskCtx;
ctx->pNames = param;
ctx->pNames = p->pTableReqs;
ctx->forceFetch = p->forceFetch;
ctx->pResList = taosArrayInit(pJob->viewNum, sizeof(SMetaRes));
if (NULL == ctx->pResList) {
qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->viewNum,
@ -849,13 +855,12 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
int32_t tbTagNum = (int32_t)taosArrayGetSize(pReq->pTableTag);
int32_t viewNum = (int32_t)ctgGetTablesReqNum(pReq->pView);
int32_t tbTsmaNum = (int32_t)taosArrayGetSize(pReq->pTableTSMAs);
int32_t tbTsmaNum = tsQuerySmaOptimize ? (int32_t)taosArrayGetSize(pReq->pTableTSMAs) : 0;
int32_t tsmaNum = (int32_t)taosArrayGetSize(pReq->pTSMAs);
int32_t tbNameNum = (int32_t)ctgGetTablesReqNum(pReq->pTableName);
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum +
userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum + viewNum + tbTsmaNum + tbNameNum;
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
if (NULL == *job) {
ctgError("failed to calloc, size:%d,QID:0x%" PRIx64, (int32_t)sizeof(SCtgJob), pConn->requestId);
@ -1014,7 +1019,8 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
}
if (viewNum > 0) {
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_VIEW, pReq->pView, NULL));
SCtgViewTaskParam param = {.forceFetch = pReq->forceFetchViewMeta, .pTableReqs = pReq->pView};
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_VIEW, &param, NULL));
}
if (tbTsmaNum > 0) {
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TSMA, pReq->pTableTSMAs, NULL));
@ -3712,16 +3718,14 @@ int32_t ctgLaunchGetViewsTask(SCtgTask* pTask) {
bool tbMetaDone = false;
SName* pName = NULL;
/*
ctgIsTaskDone(pJob, CTG_TASK_GET_TB_META_BATCH, &tbMetaDone);
if (tbMetaDone) {
CTG_ERR_RET(ctgBuildViewNullRes(pTask, pCtx));
TSWAP(pTask->res, pCtx->pResList);
ctgIsTaskDone(pJob, CTG_TASK_GET_TB_META_BATCH, &tbMetaDone);
if (tbMetaDone && !pCtx->forceFetch) {
CTG_ERR_RET(ctgBuildViewNullRes(pTask, pCtx));
TSWAP(pTask->res, pCtx->pResList);
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS;
}
*/
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS;
}
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
int32_t fetchIdx = 0;

View File

@ -3410,6 +3410,8 @@ int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff, int32_t*
QUERY_CHECK_CODE(code, lino, _end);
}
qDebug("%s last scan range %d. %" PRId64 ",%" PRId64, __func__, __LINE__, pInfo->lastScanRange.skey, pInfo->lastScanRange.ekey);
*pLen = len;
_end:
@ -3475,21 +3477,20 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
goto _end;
}
if (pInfo->pUpdateInfo != NULL) {
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
if (!pUpInfo) {
lino = __LINE__;
goto _end;
}
code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo);
if (code == TSDB_CODE_SUCCESS) {
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
pInfo->pUpdateInfo = pUpInfo;
} else {
taosMemoryFree(pUpInfo);
lino = __LINE__;
goto _end;
}
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
if (!pUpInfo) {
lino = __LINE__;
goto _end;
}
code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo);
if (code == TSDB_CODE_SUCCESS) {
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
pInfo->pUpdateInfo = pUpInfo;
qDebug("%s line:%d. stream scan updateinfo deserialize success", __func__, __LINE__);
} else {
taosMemoryFree(pUpInfo);
code = TSDB_CODE_SUCCESS;
qDebug("%s line:%d. stream scan did not have updateinfo", __func__, __LINE__);
}
if (tDecodeIsEnd(pDeCoder)) {
@ -3509,6 +3510,7 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
lino = __LINE__;
goto _end;
}
qDebug("%s last scan range %d. %" PRId64 ",%" PRId64, __func__, __LINE__, pInfo->lastScanRange.skey, pInfo->lastScanRange.ekey);
_end:
if (pDeCoder != NULL) {

View File

@ -3037,61 +3037,60 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
#if 0
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
// the optimized version only valid if all tuples in one block are monotonious increasing or descreasing.
// this assumption is NOT always works if project operator exists in downstream.
if (blockDataOrder == TSDB_ORDER_ASC) {
if (pCtx->order == TSDB_ORDER_ASC && !pCtx->hasPrimaryKey) {
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
char* data = colDataGetData(pInputCol, i);
bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL);
char* data = isNull ? NULL : colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
numOfElems++;
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
doSaveLastrow(pCtx, data, i, cts, pInfo);
int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
if (code != TSDB_CODE_SUCCESS) return code;
}
break;
}
} else { // descending order
} else if (!pCtx->hasPrimaryKey && pCtx->order == TSDB_ORDER_DESC) {
// the optimized version only valid if all tuples in one block are monotonious increasing or descreasing.
// this assumption is NOT always works if project operator exists in downstream.
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
char* data = colDataGetData(pInputCol, i);
bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL);
char* data = isNull ? NULL : colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
numOfElems++;
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
doSaveLastrow(pCtx, data, i, cts, pInfo);
int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
if (code != TSDB_CODE_SUCCESS) return code;
}
break;
}
}
#else
} else {
int64_t* pts = (int64_t*)pInput->pPTS->pData;
int from = -1;
int32_t i = -1;
while (funcInputGetNextRowIndex(pInput, from, false, &i, &from)) {
bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL);
char* data = isNull ? NULL : colDataGetData(pInputCol, i);
TSKEY cts = pts[i];
int64_t* pts = (int64_t*)pInput->pPTS->pData;
int from = -1;
int32_t i = -1;
while (funcInputGetNextRowIndex(pInput, from, false, &i, &from)) {
bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL);
char* data = isNull ? NULL : colDataGetData(pInputCol, i);
TSKEY cts = pts[i];
numOfElems++;
char* pkData = NULL;
if (pCtx->hasPrimaryKey) {
pkData = colDataGetData(pkCol, i);
}
if (pResInfo->numOfRes == 0 || pInfo->ts < cts ||
(pInfo->ts == pts[i] && pkCompareFn && pkCompareFn(pkData, pInfo->pkData) < 0)) {
int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
numOfElems++;
char* pkData = NULL;
if (pCtx->hasPrimaryKey) {
pkData = colDataGetData(pkCol, i);
}
if (pResInfo->numOfRes == 0 || pInfo->ts < cts ||
(pInfo->ts == pts[i] && pkCompareFn && pkCompareFn(pkData, pInfo->pkData) < 0)) {
int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResInfo->numOfRes = 1;
}
pResInfo->numOfRes = 1;
}
}
#endif
}
SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;

View File

@ -115,6 +115,7 @@ typedef struct SParseMetaCache {
SHashObj* pTableName; // key is tbFUid, elements is STableMeta*(append with tbName)
SArray* pDnodes; // element is SEpSet
bool dnodeRequired;
bool forceFetchViewMeta;
} SParseMetaCache;
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);

View File

@ -810,7 +810,7 @@ static int32_t collectMetaKeyFromShowCreateView(SCollectMetaKeyCxt* pCxt, SShowC
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache);
}
pCxt->pMetaCache->forceFetchViewMeta = true;
return code;
}
@ -888,6 +888,7 @@ static int32_t collectMetaKeyFromCreateViewStmt(SCollectMetaKeyCxt* pCxt, SCreat
static int32_t collectMetaKeyFromDropViewStmt(SCollectMetaKeyCxt* pCxt, SDropViewStmt* pStmt) {
int32_t code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName,
pStmt->viewName, AUTH_TYPE_ALTER, pCxt->pMetaCache);
pCxt->pMetaCache->forceFetchViewMeta = true;
return code;
}

View File

@ -817,6 +817,7 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
}
#endif
pCatalogReq->dNodeRequired = pMetaCache->dnodeRequired;
pCatalogReq->forceFetchViewMeta = pMetaCache->forceFetchViewMeta;
return code;
}

View File

@ -204,6 +204,7 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order, SLogicNode* pNode
// case QUERY_NODE_LOGIC_PLAN_WINDOW:
case QUERY_NODE_LOGIC_PLAN_AGG:
case QUERY_NODE_LOGIC_PLAN_SORT:
case QUERY_NODE_LOGIC_PLAN_FILL:
if (pNode == pNodeForcePropagate) {
pNode->outputTsOrder = order;
break;

View File

@ -313,29 +313,29 @@ typedef struct SQWorkerMgmt {
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) \
qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
qError("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) \
qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
qWarn("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) \
qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_DLOGL(param, ...) \
qDebugL("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
qDebugL("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) \
qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
qError("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
#define QW_TASK_WLOG_E(param) \
qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
qWarn("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
#define QW_TASK_DLOG_E(param) \
qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
#define QW_SCH_TASK_ELOG(param, ...) \
qError("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
qId, cId, tId, eId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) \
qWarn("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, \
qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, \
cId, tId, eId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) \
qDebug("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
qId, cId, tId, eId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) \

View File

@ -62,7 +62,7 @@ typedef enum {
#define SCH_DEFAULT_MAX_RETRY_NUM 6
#define SCH_MIN_AYSNC_EXEC_NUM 3
#define SCH_DEFAULT_RETRY_TOTAL_ROUND 3
#define SCH_DEFAULT_TASK_CAPACITY_NUM 1000
#define SCH_DEFAULT_TASK_CAPACITY_NUM 1000
typedef struct SSchDebug {
bool lockEnable;
@ -333,12 +333,13 @@ extern SSchedulerMgmt schMgmt;
#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock)
#define SCH_CLIENT_ID(_task) ((_task) ? (_task)->clientId : -1)
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
#define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_IS_DATA_BIND_PLAN(_plan) (((_plan)->subplanType == SUBPLAN_TYPE_SCAN) || ((_plan)->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_DATA_BIND_TASK(task) SCH_IS_DATA_BIND_PLAN((task)->plan)
#define SCH_IS_DATA_BIND_PLAN(_plan) \
(((_plan)->subplanType == SUBPLAN_TYPE_SCAN) || ((_plan)->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_DATA_BIND_TASK(task) SCH_IS_DATA_BIND_PLAN((task)->plan)
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task))
#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) \
@ -419,15 +420,15 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SWITCH_EPSET(_addr) ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps)
#define SCH_TASK_NUM_OF_EPS(_addr) ((_addr)->epSet.numOfEps)
#define SCH_LOG_TASK_START_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
if (NULL == taosArrayPush((_task)->profile.execTime, &us)) { \
qError("taosArrayPush task execTime failed, error:%s", tstrerror(terrno)); \
} \
if (0 == (_task)->execId) { \
(_task)->profile.startTs = us; \
} \
#define SCH_LOG_TASK_START_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
if (NULL == taosArrayPush((_task)->profile.execTime, &us)) { \
qError("taosArrayPush task execTime failed, error:%s", tstrerror(terrno)); \
} \
if (0 == (_task)->execId) { \
(_task)->profile.startTs = us; \
} \
} while (0)
#define SCH_LOG_TASK_WAIT_TS(_task) \
@ -450,23 +451,23 @@ extern SSchedulerMgmt schMgmt;
(_task)->profile.endTs = us; \
} while (0)
#define SCH_JOB_ELOG(param, ...) qError("qid:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("qid:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) \
qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
qError("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) \
qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_TASK_TLOG(param, ...) \
qTrace("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
qTrace("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_TASK_DLOGL(param, ...) \
qDebugL("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
qDebugL("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \
qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
qWarn("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_SET_ERRNO(_err) \
@ -580,7 +581,7 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void *param);
int32_t schAcquireJob(int64_t refId, SSchJob **ppJob);
int32_t schReleaseJob(int64_t refId);
int32_t schReleaseJobEx(int64_t refId, int32_t* released);
int32_t schReleaseJobEx(int64_t refId, int32_t *released);
void schFreeFlowCtrl(SSchJob *pJob);
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
@ -648,7 +649,7 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pTask);
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum);
int32_t schValidateSubplan(SSchJob *pJob, SSubplan *pSubplan, int32_t level, int32_t idx, int32_t taskNum);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode);

View File

@ -445,6 +445,11 @@ int32_t updateInfoSerialize(SEncoder* pEncoder, const SUpdateInfo* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (!pInfo) {
if (tEncodeI32(pEncoder, -1) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
uDebug("%s line:%d. it did not have updateinfo", __func__, __LINE__);
return TSDB_CODE_SUCCESS;
}
@ -550,6 +555,10 @@ int32_t updateInfoDeserialize(SDecoder* pDeCoder, SUpdateInfo* pInfo) {
int32_t size = 0;
if (tDecodeI32(pDeCoder, &size) < 0) return -1;
if (size < 0) {
return -1;
}
pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
QUERY_CHECK_NULL(pInfo->pTsBuckets, code, lino, _error, terrno);

View File

@ -7,7 +7,7 @@ RUN apt-get install -y locales psmisc sudo tree libgeos-dev libgflags2.2 libgfl
RUN sed -i 's/# en_US.UTF-8/en_US.UTF-8/' /etc/locale.gen && locale-gen
RUN pip3 config set global.index-url http://admin:123456@192.168.0.212:3141/admin/dev/+simple/
RUN pip3 config set global.trusted-host 192.168.0.212
RUN pip3 install taospy==2.7.16 taos-ws-py==0.3.3 pandas psutil fabric2 requests faker simplejson toml pexpect tzlocal distro decorator loguru hyperloglog
RUN pip3 install taospy==2.7.16 taos-ws-py==0.3.5 pandas psutil fabric2 requests faker simplejson toml pexpect tzlocal distro decorator loguru hyperloglog
ENV LANG=en_US.UTF-8 LANGUAGE=en_US.UTF-8 LC_ALL=en_US.UTF-8
RUN apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9
RUN add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/'

View File

@ -130,7 +130,7 @@ pip3 install kafka-python
python3 kafka_example_consumer.py
# 21
pip3 install taos-ws-py==0.3.3
pip3 install taos-ws-py==0.3.5
python3 conn_websocket_pandas.py
# 22

View File

@ -76,9 +76,9 @@ ulimit -c unlimited
md5sum /usr/lib/libtaos.so.1
md5sum /home/TDinternal/debug/build/lib/libtaos.so
#get python connector and update: taospy 2.7.16 taos-ws-py 0.3.3
#get python connector and update: taospy 2.7.16 taos-ws-py 0.3.5
pip3 install taospy==2.7.16
pip3 install taos-ws-py==0.3.3
pip3 install taos-ws-py==0.3.5
$TIMEOUT_CMD $cmd
RET=$?
echo "cmd exit code: $RET"

View File

@ -0,0 +1,67 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c checkpointInterval -v 60
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt1 as select _wstart, count(a) from st partition by tbname interval(2s);
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(a) from st interval(2s);
run tsim/stream/checkTaskStatus.sim
sleep 70000
print restart taosd 01 ......
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(now + 3000a,1,1,1);
$loop_count = 0
loop0:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02
if $rows == 0 then
goto loop0
endi
print select * from streamt2;
sql select * from streamt2;
print $data00 $data01 $data02
if $rows == 0 then
goto loop0
endi
print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT