diff --git a/Jenkinsfile2 b/Jenkinsfile2
index 7ba9b4a933..4bb0754343 100644
--- a/Jenkinsfile2
+++ b/Jenkinsfile2
@@ -5,7 +5,7 @@ node {
}
file_zh_changed = ''
file_en_changed = ''
-file_no_doc_changed = ''
+file_no_doc_changed = '1'
def abortPreviousBuilds() {
def currentJobName = env.JOB_NAME
def currentBuildNumber = env.BUILD_NUMBER.toInteger()
@@ -355,7 +355,7 @@ def pre_test_build_win() {
bat '''
cd %WIN_COMMUNITY_ROOT%/tests/ci
pip3 install taospy==2.7.16
- pip3 install taos-ws-py==0.3.3
+ pip3 install taos-ws-py==0.3.5
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
'''
return 1
@@ -655,4 +655,4 @@ pipeline {
)
}
}
-}
\ No newline at end of file
+}
diff --git a/docs/en/14-reference/05-connectors/30-python.mdx b/docs/en/14-reference/05-connectors/30-python.mdx
index 7263a3caa6..4f17261b33 100644
--- a/docs/en/14-reference/05-connectors/30-python.mdx
+++ b/docs/en/14-reference/05-connectors/30-python.mdx
@@ -41,12 +41,18 @@ We recommend using the latest version of `taospy`, regardless of the version of
|Python Client Library Version|major changes|
|:-------------------:|:----:|
+|2.7.16|add subscription configuration (session.timeout.ms, max.poll.interval.ms)|
+|2.7.15|added support for VARBINARY and GEOMETRY types|
+|2.7.14|fix known issues|
+|2.7.13|add TMQ synchronous offset commit interface|
|2.7.12|1. added support for `varbinary` type (STMT does not yet support)<br/> 2. improved query performance (thanks to contributor [hadrianl](https://github.com/taosdata/taos-connector-python/pull/209))|
|2.7.9|support for getting assignment and seek function on subscription|
|2.7.8|add `execute_many` method|
|Python Websocket Connection Version|major changes|
|:----------------------------:|:-----:|
+|0.3.5|1. added support for VARBINARY and GEOMETRY types<br/> 2. Fix known issues|
+|0.3.2|1. optimize WebSocket SQL query and insertion performance<br/> 2. Fix known issues<br/> 3. Modify the readme and document|
|0.2.9|bugs fixes|
|0.2.5|1. support for getting assignment and seek function on subscription<br/> 2. support schemaless<br/> 3. support STMT|
|0.2.4|support `unsubscribe` on subscription|
diff --git a/docs/en/14-reference/05-connectors/35-node.mdx b/docs/en/14-reference/05-connectors/35-node.mdx
index 476f9bab71..2aeef7af1e 100644
--- a/docs/en/14-reference/05-connectors/35-node.mdx
+++ b/docs/en/14-reference/05-connectors/35-node.mdx
@@ -27,6 +27,8 @@ Node.js client library needs to be run with Node.js 14 or higher version.
| Node.js connector version | major changes | TDengine version |
| :-----------------------: | :------------------: | :----------------:|
+| 3.1.2 | Optimized the data protocol and parsing, resulting in a significant improvement in performance | 3.3.2.0 or later |
+| 3.1.1 | Optimized data transmission performance | 3.3.2.0 or later |
| 3.1.0 | new version, supports websocket | 3.2.0.0 or later |
## Supported features
diff --git a/docs/en/28-releases/01-tdengine.md b/docs/en/28-releases/01-tdengine.md
index 161efe0d61..b24931b166 100644
--- a/docs/en/28-releases/01-tdengine.md
+++ b/docs/en/28-releases/01-tdengine.md
@@ -24,10 +24,6 @@ import Release from "/components/ReleaseV3";
-## 3.3.4.3
-
-
-
## 3.3.3.0
diff --git a/docs/zh/14-reference/05-connector/30-python.mdx b/docs/zh/14-reference/05-connector/30-python.mdx
index 8436c30249..3991477635 100644
--- a/docs/zh/14-reference/05-connector/30-python.mdx
+++ b/docs/zh/14-reference/05-connector/30-python.mdx
@@ -41,6 +41,7 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
|Python Connector 版本|主要变化|
|:-------------------:|:----:|
+|2.7.16|新增订阅配置 (session.timeout.ms, max.poll.interval.ms)|
|2.7.15|新增 VARBINARY 和 GEOMETRY 类型支持|
|2.7.14|修复已知问题|
|2.7.13|新增 tmq 同步提交 offset 接口|
@@ -50,6 +51,7 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
|Python WebSocket Connector 版本|主要变化|
|:----------------------------:|:-----:|
+|0.3.5|新增 VARBINARY 和 GEOMETRY 类型支持,修复已知问题|
|0.3.2|优化 WebSocket sql 查询和插入性能,修改 readme 和 文档,修复已知问题|
|0.2.9|已知问题修复|
|0.2.5|1. 数据订阅支持获取消费进度和重置消费进度<br/> 2. 支持 schemaless<br/> 3. 支持 STMT|
diff --git a/docs/zh/14-reference/05-connector/35-node.mdx b/docs/zh/14-reference/05-connector/35-node.mdx
index d9512eae78..df2abfab3d 100644
--- a/docs/zh/14-reference/05-connector/35-node.mdx
+++ b/docs/zh/14-reference/05-connector/35-node.mdx
@@ -26,6 +26,7 @@ Node.js 连接器目前仅支持 WebSocket 连接器, 其通过 taosAdapter
| Node.js 连接器 版本 | 主要变化 | TDengine 版本 |
| :------------------: | :----------------------: | :----------------: |
+| 3.1.2 | 对数据协议和解析进行了优化,性能得到大幅提升 | 3.3.2.0 及更高版本 |
| 3.1.1 | 优化了数据传输性能 | 3.3.2.0 及更高版本 |
| 3.1.0 | 新版本发布,支持 WebSocket 连接 | 3.2.0.0 及更高版本 |
diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md
index d285cb36ae..cf9d7b6878 100644
--- a/docs/zh/28-releases/01-tdengine.md
+++ b/docs/zh/28-releases/01-tdengine.md
@@ -28,10 +28,6 @@ import Release from "/components/ReleaseV3";
-## 3.3.4.3
-
-
-
## 3.3.3.0
diff --git a/docs/zh/28-releases/03-notes/3.3.4.3.md b/docs/zh/28-releases/03-notes/3.3.4.3.md
index 364732b478..8ffd5802ed 100644
--- a/docs/zh/28-releases/03-notes/3.3.4.3.md
+++ b/docs/zh/28-releases/03-notes/3.3.4.3.md
@@ -1,60 +1,69 @@
-## 新特性
-* 新功能:流计算的 TWA 函数支持时间驱动的结果推送模式
-* 新功能:流计算的 Interp 函数支持时间驱动的结果推送模式
-* 优化:顺序执行 compact 和 split vgroup操作时的日志错误提示
-* 新功能:支持微软对象存储
+---
+title: 3.3.4.3 版本说明
+sidebar_label: 3.3.4.3
+description: 3.3.4.3 版本说明
+---
-## 优化
-* 优化:提升并发大查询时节点之间互相拉数据的效率
-* 优化:支持使用 AVX2 和 AVX512 对 double 、timestamp 和 bigint 类型进行解码优化
-* 优化:调整 case when 语句的结果类型判断方法
-* 优化:提升查询 “select ... from ... where ts in (...)” 的数据扫描速度
-* 优化:增加了流计算的兼容性保证机制,避免后续函数变更产生新的兼容性问题,之前版本的流计算必须重建
-* 优化:提升 taosX 在交叉写入场景下的数据同步性能
-* 优化:支持关闭整数/浮点数类型的编码
-* 优化:多副本流计算中必须使用 snode
-* 优化:客户端生成唯一 ID 标识每一个查询任务,避免重复 ID 导致的内存损坏
-* 优化:加快数据库的创建时间
-* 优化:修改 s3MigrateEnabled 默认值为0
-* 优化:支持在审计数据库中记录删除操作
-* 优化:支持在指定的 dnode 中创建数据库 [企业版]
-* 优化:调整删除超级表数据列时的报错信息
+### 行为变更及兼容性
+1. 多副本流计算中必须使用 snode
+1. 增加了流计算的兼容性保证机制,避免后续函数变更产生新的兼容性问题,但之前版本的流计算必须重建,具体参见 https://docs.taosdata.com/advanced/stream/#流计算升级故障恢复
+1. 调整 case when 语句结果类型的判断方法
+
+### 新特性
+1. 新功能:流计算的 TWA 函数支持时间驱动的结果推送模式
+1. 新功能:流计算的 Interp 函数支持时间驱动的结果推送模式
+1. 新功能:支持微软对象存储
-## 修复
-* 修复:last_row 查询性能在 3.3.3.0 中大幅下降的问题
-* 修复:WAL 条目不完整时 taosd 无法启动的问题
-* 修复: partition by 常量时查询结果错误的问题
-* 修复:标量函数包含 _wstart 且填充方式为 prev 时计算结果错误
-* 修复:Windows 平台下的时区设置问题
-* 修复:空数据库进行 compact 操作时,事务无法结束【企业版】
-* 修复:事务冲突的逻辑错误
-* 修复:管理节点某些错误会导致事务无法停止
-* 修复:管理节点某些错误会导致事务无法停止
-* 修复:dnode 数据清空后 taosc 重试错误的问题
-* 修复:Data Compact 被异常终止后,中间文件未被清理
-* 修复:新增列后,Kafka 连接器的 earliest 模式消费不到新列数据
-* 修复:interp 函数在 fill(prev) 时行为不正确
-* 修复:TSMA 在高频元数据操作时异常停止的问题
-* 修复:show create stable 语句执行结果的标签显示错误
-* 修复:Percentile 函数在大数据量查询时会崩溃。
-* 修复:partition by 和 having 联合使用时的语法错误问题
-* 修复:interp 在 partition by tbname,c1 时 tbname 为空的问题
-* 修复:通过 stmt 写入非法布尔数值时 taosd 可能 crash
-* 修复:Explorer OPC-UA 表名模板说明
-* 修复:库符号 version 与使用相同符号的库冲突的问题 https://github.com/taosdata/TDengine/issues/25920
-* 修复:在 windows 平台下 JDBC 驱动的句柄数持续升高问题
-* 修复:3.3.3.1 升级至 3.3.4.0 偶现的启动失败问题
-* 修复:Windows 平台重复增删表的内存泄漏
-* 修复:无法限制并发拉起 checkpoint 数量导致流计算消耗资源过多
-* 修复:并发查询时的 too many session 问题
-* 修复:Windows 平台下 taos shell 在慢查询场景中崩溃的问题
-* 修复:当打开 dnode日志时,加密数据库无法恢复的问题
-* 修复:由于 mnode 同步超时,进而导致 taosd 无法启动的问题
-* 修复:由于在快照同步过程中整理文件组数据的速度过慢,从而导致 Vnode(虚拟节点)无法恢复的问题
-* 修复:通过行协议向字符串类型的字段中写入带转义符的数据时,taosd 会崩溃
-* 修复:Error Code 逻辑处理错误导致的元数据文件损坏
-* 修复:查询语句中包含多个 “not” 条件语句嵌套时,未设置标量模式导致查询错误
-* 修复:vnode 统计信息上报超时导致的 dnode offline 问题
-* 修复:在不支持 avx 指令集的服务器上,taosd 启动失败问题
-* 修复:taosX 数据迁移容错处理 0x09xx 错误码
+### 优化
+1. 优化:提升并发大查询时节点之间互相拉数据的效率
+1. 优化:支持使用 AVX2 和 AVX512 对 double、timestamp 和 bigint 类型进行解码优化
+1. 优化:调整 case when 语句的结果类型判断方法
+1. 优化:顺序执行 compact 和 split vgroup 操作时的日志错误提示
+1. 优化:提升查询 “select ... from ... where ts in (...)” 的数据扫描速度
+1. 优化:增加了流计算的兼容性保证机制,避免后续函数变更产生新的兼容性问题,之前版本的流计算必须重建
+1. 优化:提升 taosX 在交叉写入场景下的数据同步性能
+1. 优化:支持关闭整数/浮点数类型的编码
+1. 优化:多副本流计算中必须使用 snode
+1. 优化:客户端生成唯一 ID 标识每一个查询任务,避免重复 ID 导致的内存损坏
+1. 优化:加快数据库的创建时间
+1. 优化:修改 s3MigrateEnabled 默认值为 0
+1. 优化:支持在审计数据库中记录删除操作
+1. 优化:支持在指定的 dnode 中创建数据库 [企业版]
+1. 优化:调整删除超级表数据列时的报错信息
+
+### 修复
+1. 修复:last_row 查询性能在 3.3.3.0 中大幅下降的问题
+1. 修复:WAL 条目不完整时 taosd 无法启动的问题
+1. 修复: partition by 常量时查询结果错误的问题
+1. 修复:标量函数包含 _wstart 且填充方式为 prev 时计算结果错误
+1. 修复:Windows 平台下的时区设置问题
+1. 修复:空数据库进行 compact 操作时,事务无法结束【企业版】
+1. 修复:事务冲突的逻辑错误
+1. 修复:管理节点某些错误会导致事务无法停止
+1. 修复:dnode 数据清空后 taosc 重试错误的问题
+1. 修复:Data Compact 被异常终止后,中间文件未被清理
+1. 修复:新增列后,Kafka 连接器的 earliest 模式消费不到新列数据
+1. 修复:interp 函数在 fill(prev) 时行为不正确
+1. 修复:TSMA 在高频元数据操作时异常停止的问题
+1. 修复:show create stable 语句执行结果的标签显示错误
+1. 修复:Percentile 函数在大数据量查询时会崩溃
+1. 修复:partition by 和 having 联合使用时的语法错误问题
+1. 修复:interp 在 partition by tbname,c1 时 tbname 为空的问题
+1. 修复:通过 stmt 写入非法布尔数值时 taosd 可能 crash
+1. 修复:库符号 version 与使用相同符号的库冲突的问题
+1. 修复:在 windows 平台下 JDBC 驱动的句柄数持续升高问题
+1. 修复:3.3.3.1 升级至 3.3.4.0 偶现的启动失败问题
+1. 修复:Windows 平台重复增删表的内存泄漏
+1. 修复:无法限制并发拉起 checkpoint 数量导致流计算消耗资源过多
+1. 修复:并发查询时的 too many session 问题
+1. 修复:Windows 平台下 taos shell 在慢查询场景中崩溃的问题
+1. 修复:当打开 dnode 日志时,加密数据库无法恢复的问题
+1. 修复:由于 mnode 同步超时,进而导致 taosd 无法启动的问题
+1. 修复:由于在快照同步过程中整理文件组数据的速度过慢,从而导致 Vnode(虚拟节点)无法恢复的问题
+1. 修复:通过行协议向字符串类型的字段中写入带转义符的数据时,taosd 会崩溃
+1. 修复:Error Code 逻辑处理错误导致的元数据文件损坏
+1. 修复:查询语句中包含多个 “not” 条件语句嵌套时,未设置标量模式导致查询错误
+1. 修复:vnode 统计信息上报超时导致的 dnode offline 问题
+1. 修复:在不支持 avx 指令集的服务器上,taosd 启动失败问题
+1. 修复:taosX 数据迁移容错处理 0x09xx 错误码
diff --git a/docs/zh/28-releases/03-notes/index.md b/docs/zh/28-releases/03-notes/index.md
index 1cdd3239e7..d1a48ab9a8 100644
--- a/docs/zh/28-releases/03-notes/index.md
+++ b/docs/zh/28-releases/03-notes/index.md
@@ -4,7 +4,6 @@ sidebar_label: 版本说明
description: 各版本版本说明
---
-[3.3.4.3](./3.3.4.3)
[3.3.4.3](./3.3.4.3)
[3.3.3.0](./3.3.3.0)
[3.3.2.0](./3.3.2.0)
diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h
index df3f87973f..7c6f02513e 100644
--- a/include/libs/catalog/catalog.h
+++ b/include/libs/catalog/catalog.h
@@ -102,6 +102,7 @@ typedef struct SCatalogReq {
bool svrVerRequired;
bool forceUpdate;
bool cloned;
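+  // when true, view meta is fetched from the server even if cached results could satisfy the request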
+ bool forceFetchViewMeta;
} SCatalogReq;
typedef struct SMetaRes {
diff --git a/include/util/tbuffer.inc b/include/util/tbuffer.inc
index 39090fb7fa..633517ca58 100644
--- a/include/util/tbuffer.inc
+++ b/include/util/tbuffer.inc
@@ -186,11 +186,25 @@ static int32_t tBufferGetI16(SBufferReader *reader, int16_t *value) {
}
static int32_t tBufferGetI32(SBufferReader *reader, int32_t *value) {
- return tBufferGet(reader, sizeof(*value), value);
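+  // inline the bounds check and direct read on this hot path instead of going through the generic tBufferGet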
+ if (reader->offset + sizeof(int32_t) > reader->buffer->size) {
+ return TSDB_CODE_OUT_OF_RANGE;
+ }
+ if (value) {
+ *value = *(int32_t*)BR_PTR(reader);
+ }
+ reader->offset += sizeof(int32_t);
+ return 0;
}
static int32_t tBufferGetI64(SBufferReader *reader, int64_t *value) {
- return tBufferGet(reader, sizeof(*value), value);
+ if (reader->offset + sizeof(int64_t) > reader->buffer->size) {
+ return TSDB_CODE_OUT_OF_RANGE;
+ }
+ if (value) {
+ *value = *(int64_t*)BR_PTR(reader);
+ }
+ reader->offset += sizeof(int64_t);
+ return 0;
}
static int32_t tBufferGetU8(SBufferReader *reader, uint8_t *value) { return tBufferGet(reader, sizeof(*value), value); }
diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c
index 720ba68414..f51ffe0c83 100644
--- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c
+++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c
@@ -972,7 +972,7 @@ static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinR
break;
}
- if ((writer->brinBlock->numOfRecords) >= writer->config->maxRow) {
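+    // flush the brin block once it reaches 256 records, a fixed cap replacing the configurable maxRow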
+ if ((writer->brinBlock->numOfRecords) >= 256) {
TAOS_CHECK_GOTO(tsdbDataFileWriteBrinBlock(writer), &lino, _exit);
}
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c
index 527486270c..ac8e8505e4 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead2.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c
@@ -836,6 +836,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
pList = &pReader->status.uidList;
int32_t i = 0;
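+  // j walks the sorted table uid list in step with the brin blocks, so earlier uids are not rescanned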
+ int32_t j = 0;
while (i < TARRAY2_SIZE(pBlkArray)) {
pBrinBlk = &pBlkArray->data[i];
if (pBrinBlk->maxTbid.suid < pReader->info.suid) {
@@ -851,7 +852,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
(pBrinBlk->minTbid.suid <= pReader->info.suid) && (pBrinBlk->maxTbid.suid >= pReader->info.suid), code, lino,
_end, TSDB_CODE_INTERNAL_ERROR);
- if (pBrinBlk->maxTbid.suid == pReader->info.suid && pBrinBlk->maxTbid.uid < pList->tableUidList[0]) {
+ if (pBrinBlk->maxTbid.suid == pReader->info.suid && pBrinBlk->maxTbid.uid < pList->tableUidList[j]) {
i += 1;
continue;
}
@@ -864,6 +865,14 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
TSDB_CHECK_NULL(p1, code, lino, _end, terrno);
i += 1;
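+    // advance j past uids covered by this block; once all tables are accounted for, stop scanning brin blocks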
+ if (pBrinBlk->maxTbid.suid == pReader->info.suid) {
+ while (j < numOfTables && pList->tableUidList[j] < pBrinBlk->maxTbid.uid) {
+ j++;
+ }
+ if (j >= numOfTables) {
+ break;
+ }
+ }
}
et2 = taosGetTimestampUs();
diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h
index e757163ba8..b581e31919 100644
--- a/source/libs/catalog/inc/catalogInt.h
+++ b/source/libs/catalog/inc/catalogInt.h
@@ -271,6 +271,7 @@ typedef struct SCtgViewsCtx {
SArray* pNames;
SArray* pResList;
SArray* pFetchs;
+ bool forceFetch;
} SCtgViewsCtx;
typedef enum {
@@ -831,12 +832,12 @@ typedef struct SCtgCacheItemInfo {
#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCtg, __VA_ARGS__)
-#define ctgTaskFatal(param, ...) qFatal("qid:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
-#define ctgTaskError(param, ...) qError("qid:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
-#define ctgTaskWarn(param, ...) qWarn("qid:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
-#define ctgTaskInfo(param, ...) qInfo("qid:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
-#define ctgTaskDebug(param, ...) qDebug("qid:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
-#define ctgTaskTrace(param, ...) qTrace("qid:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
+#define ctgTaskFatal(param, ...) qFatal("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
+#define ctgTaskError(param, ...) qError("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
+#define ctgTaskWarn(param, ...) qWarn("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
+#define ctgTaskInfo(param, ...) qInfo("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
+#define ctgTaskDebug(param, ...) qDebug("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
+#define ctgTaskTrace(param, ...) qTrace("QID:%" PRIx64 " CTG:%p " param, pTask->pJob->queryId, pCtg, __VA_ARGS__)
#define CTG_LOCK_DEBUG(...) \
do { \
diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c
index c1dcdf2741..9bfb4102aa 100644
--- a/source/libs/catalog/src/ctgAsync.c
+++ b/source/libs/catalog/src/ctgAsync.c
@@ -20,6 +20,11 @@
#include "tref.h"
#include "trpc.h"
+typedef struct SCtgViewTaskParam {
+ bool forceFetch;
+ SArray* pTableReqs;
+} SCtgViewTaskParam;
+
void ctgIsTaskDone(SCtgJob* pJob, CTG_TASK_TYPE type, bool* done) {
SCtgTask* pTask = NULL;
@@ -500,7 +505,7 @@ int32_t ctgInitGetTbTagTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SCtgTask task = {0};
-
+ SCtgViewTaskParam* p = param;
task.type = CTG_TASK_GET_VIEW;
task.taskId = taskIdx;
task.pJob = pJob;
@@ -511,7 +516,8 @@ int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
}
SCtgViewsCtx* ctx = task.taskCtx;
- ctx->pNames = param;
+ ctx->pNames = p->pTableReqs;
+ ctx->forceFetch = p->forceFetch;
ctx->pResList = taosArrayInit(pJob->viewNum, sizeof(SMetaRes));
if (NULL == ctx->pResList) {
qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->viewNum,
@@ -849,13 +855,12 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
int32_t tbTagNum = (int32_t)taosArrayGetSize(pReq->pTableTag);
int32_t viewNum = (int32_t)ctgGetTablesReqNum(pReq->pView);
- int32_t tbTsmaNum = (int32_t)taosArrayGetSize(pReq->pTableTSMAs);
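+  // with SMA optimization disabled there is no point fetching table TSMA meta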
+ int32_t tbTsmaNum = tsQuerySmaOptimize ? (int32_t)taosArrayGetSize(pReq->pTableTSMAs) : 0;
int32_t tsmaNum = (int32_t)taosArrayGetSize(pReq->pTSMAs);
int32_t tbNameNum = (int32_t)ctgGetTablesReqNum(pReq->pTableName);
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum +
userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum + viewNum + tbTsmaNum + tbNameNum;
-
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
if (NULL == *job) {
ctgError("failed to calloc, size:%d,QID:0x%" PRIx64, (int32_t)sizeof(SCtgJob), pConn->requestId);
@@ -1014,7 +1019,8 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
}
if (viewNum > 0) {
- CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_VIEW, pReq->pView, NULL));
+ SCtgViewTaskParam param = {.forceFetch = pReq->forceFetchViewMeta, .pTableReqs = pReq->pView};
+ CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_VIEW, &param, NULL));
}
if (tbTsmaNum > 0) {
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TSMA, pReq->pTableTSMAs, NULL));
@@ -3712,16 +3718,14 @@ int32_t ctgLaunchGetViewsTask(SCtgTask* pTask) {
bool tbMetaDone = false;
SName* pName = NULL;
- /*
- ctgIsTaskDone(pJob, CTG_TASK_GET_TB_META_BATCH, &tbMetaDone);
- if (tbMetaDone) {
- CTG_ERR_RET(ctgBuildViewNullRes(pTask, pCtx));
- TSWAP(pTask->res, pCtx->pResList);
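+  // with table meta already resolved and no forced fetch, answer the view task with null results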
+ ctgIsTaskDone(pJob, CTG_TASK_GET_TB_META_BATCH, &tbMetaDone);
+ if (tbMetaDone && !pCtx->forceFetch) {
+ CTG_ERR_RET(ctgBuildViewNullRes(pTask, pCtx));
+ TSWAP(pTask->res, pCtx->pResList);
- CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
- return TSDB_CODE_SUCCESS;
- }
- */
+ CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
+ return TSDB_CODE_SUCCESS;
+ }
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
int32_t fetchIdx = 0;
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index eac4f87b14..095d7e1a2b 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -3410,6 +3410,8 @@ int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff, int32_t*
QUERY_CHECK_CODE(code, lino, _end);
}
+ qDebug("%s last scan range %d. %" PRId64 ",%" PRId64, __func__, __LINE__, pInfo->lastScanRange.skey, pInfo->lastScanRange.ekey);
+
*pLen = len;
_end:
@@ -3475,21 +3477,20 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
goto _end;
}
- if (pInfo->pUpdateInfo != NULL) {
- void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
- if (!pUpInfo) {
- lino = __LINE__;
- goto _end;
- }
- code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo);
- if (code == TSDB_CODE_SUCCESS) {
- pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
- pInfo->pUpdateInfo = pUpInfo;
- } else {
- taosMemoryFree(pUpInfo);
- lino = __LINE__;
- goto _end;
- }
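+  // always attempt to deserialize; a failed decode now simply means no update info was serialized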
+ void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
+ if (!pUpInfo) {
+ lino = __LINE__;
+ goto _end;
+ }
+ code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo);
+ if (code == TSDB_CODE_SUCCESS) {
+ pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
+ pInfo->pUpdateInfo = pUpInfo;
+ qDebug("%s line:%d. stream scan updateinfo deserialize success", __func__, __LINE__);
+ } else {
+ taosMemoryFree(pUpInfo);
+ code = TSDB_CODE_SUCCESS;
+ qDebug("%s line:%d. stream scan did not have updateinfo", __func__, __LINE__);
}
if (tDecodeIsEnd(pDeCoder)) {
@@ -3509,6 +3510,7 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
lino = __LINE__;
goto _end;
}
+ qDebug("%s last scan range %d. %" PRId64 ",%" PRId64, __func__, __LINE__, pInfo->lastScanRange.skey, pInfo->lastScanRange.ekey);
_end:
if (pDeCoder != NULL) {
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index c2e2e9c17c..83227dea9e 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -3037,61 +3037,60 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
-#if 0
- int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
-
- // the optimized version only valid if all tuples in one block are monotonious increasing or descreasing.
- // this assumption is NOT always works if project operator exists in downstream.
- if (blockDataOrder == TSDB_ORDER_ASC) {
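+  // fast paths: without a primary key, an ASC block's last row is found scanning from the tail, a DESC block's from the head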
+ if (pCtx->order == TSDB_ORDER_ASC && !pCtx->hasPrimaryKey) {
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
- char* data = colDataGetData(pInputCol, i);
+ bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL);
+ char* data = isNull ? NULL : colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
numOfElems++;
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
- doSaveLastrow(pCtx, data, i, cts, pInfo);
+ int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
+ if (code != TSDB_CODE_SUCCESS) return code;
}
break;
}
- } else { // descending order
+ } else if (!pCtx->hasPrimaryKey && pCtx->order == TSDB_ORDER_DESC) {
+ // the optimized version is only valid if all tuples in one block are monotonically increasing or decreasing.
+ // this assumption does NOT always hold if a project operator exists downstream.
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
- char* data = colDataGetData(pInputCol, i);
+ bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL);
+ char* data = isNull ? NULL : colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
numOfElems++;
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
- doSaveLastrow(pCtx, data, i, cts, pInfo);
+ int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
+ if (code != TSDB_CODE_SUCCESS) return code;
}
break;
}
- }
-#else
+ } else {
+ int64_t* pts = (int64_t*)pInput->pPTS->pData;
+ int from = -1;
+ int32_t i = -1;
+ while (funcInputGetNextRowIndex(pInput, from, false, &i, &from)) {
+ bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL);
+ char* data = isNull ? NULL : colDataGetData(pInputCol, i);
+ TSKEY cts = pts[i];
- int64_t* pts = (int64_t*)pInput->pPTS->pData;
- int from = -1;
- int32_t i = -1;
- while (funcInputGetNextRowIndex(pInput, from, false, &i, &from)) {
- bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL);
- char* data = isNull ? NULL : colDataGetData(pInputCol, i);
- TSKEY cts = pts[i];
-
- numOfElems++;
- char* pkData = NULL;
- if (pCtx->hasPrimaryKey) {
- pkData = colDataGetData(pkCol, i);
- }
- if (pResInfo->numOfRes == 0 || pInfo->ts < cts ||
- (pInfo->ts == pts[i] && pkCompareFn && pkCompareFn(pkData, pInfo->pkData) < 0)) {
- int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
- if (code != TSDB_CODE_SUCCESS) {
- return code;
+ numOfElems++;
+ char* pkData = NULL;
+ if (pCtx->hasPrimaryKey) {
+ pkData = colDataGetData(pkCol, i);
+ }
+ if (pResInfo->numOfRes == 0 || pInfo->ts < cts ||
+ (pInfo->ts == pts[i] && pkCompareFn && pkCompareFn(pkData, pInfo->pkData) < 0)) {
+ int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
+ if (code != TSDB_CODE_SUCCESS) {
+ return code;
+ }
+ pResInfo->numOfRes = 1;
}
- pResInfo->numOfRes = 1;
}
- }
-#endif
+ }
SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;
diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h
index 857c7604a9..7298b04eb0 100644
--- a/source/libs/parser/inc/parUtil.h
+++ b/source/libs/parser/inc/parUtil.h
@@ -115,6 +115,7 @@ typedef struct SParseMetaCache {
SHashObj* pTableName; // key is tbFUid, elements is STableMeta*(append with tbName)
SArray* pDnodes; // element is SEpSet
bool dnodeRequired;
+ bool forceFetchViewMeta;
} SParseMetaCache;
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c
index eecc04658b..b78e10768f 100644
--- a/source/libs/parser/src/parAstParser.c
+++ b/source/libs/parser/src/parAstParser.c
@@ -810,7 +810,7 @@ static int32_t collectMetaKeyFromShowCreateView(SCollectMetaKeyCxt* pCxt, SShowC
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache);
}
-
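+  // show create view must read the freshest view meta, so bypass the cache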
+ pCxt->pMetaCache->forceFetchViewMeta = true;
return code;
}
@@ -888,6 +888,7 @@ static int32_t collectMetaKeyFromCreateViewStmt(SCollectMetaKeyCxt* pCxt, SCreat
static int32_t collectMetaKeyFromDropViewStmt(SCollectMetaKeyCxt* pCxt, SDropViewStmt* pStmt) {
int32_t code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName,
pStmt->viewName, AUTH_TYPE_ALTER, pCxt->pMetaCache);
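+  // dropping a view likewise needs up-to-date view meta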
+ pCxt->pMetaCache->forceFetchViewMeta = true;
return code;
}
diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c
index e35eea9e72..44e44982a3 100644
--- a/source/libs/parser/src/parUtil.c
+++ b/source/libs/parser/src/parUtil.c
@@ -817,6 +817,7 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
}
#endif
pCatalogReq->dNodeRequired = pMetaCache->dnodeRequired;
+ pCatalogReq->forceFetchViewMeta = pMetaCache->forceFetchViewMeta;
return code;
}
diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c
index 39024731ed..a1809ff137 100644
--- a/source/libs/planner/src/planOptimizer.c
+++ b/source/libs/planner/src/planOptimizer.c
@@ -204,6 +204,7 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order, SLogicNode* pNode
// case QUERY_NODE_LOGIC_PLAN_WINDOW:
case QUERY_NODE_LOGIC_PLAN_AGG:
case QUERY_NODE_LOGIC_PLAN_SORT:
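+    // fill nodes also expose outputTsOrder, so the forced order must propagate through them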
+ case QUERY_NODE_LOGIC_PLAN_FILL:
if (pNode == pNodeForcePropagate) {
pNode->outputTsOrder = order;
break;
diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h
index 708c285aea..6d81baf91a 100644
--- a/source/libs/qworker/inc/qwInt.h
+++ b/source/libs/qworker/inc/qwInt.h
@@ -313,29 +313,29 @@ typedef struct SQWorkerMgmt {
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) \
- qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
+ qError("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) \
- qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
+ qWarn("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) \
- qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
+ qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_DLOGL(param, ...) \
- qDebugL("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
+ qDebugL("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) \
- qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
+ qError("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
#define QW_TASK_WLOG_E(param) \
- qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
+ qWarn("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
#define QW_TASK_DLOG_E(param) \
- qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
+ qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
#define QW_SCH_TASK_ELOG(param, ...) \
- qError("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
+ qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
qId, cId, tId, eId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) \
- qWarn("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, \
+ qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, \
cId, tId, eId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) \
- qDebug("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
+ qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
qId, cId, tId, eId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) \
diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h
index 6a910453f0..ef643852ea 100644
--- a/source/libs/scheduler/inc/schInt.h
+++ b/source/libs/scheduler/inc/schInt.h
@@ -62,7 +62,7 @@ typedef enum {
#define SCH_DEFAULT_MAX_RETRY_NUM 6
#define SCH_MIN_AYSNC_EXEC_NUM 3
#define SCH_DEFAULT_RETRY_TOTAL_ROUND 3
-#define SCH_DEFAULT_TASK_CAPACITY_NUM 1000
+#define SCH_DEFAULT_TASK_CAPACITY_NUM 1000
typedef struct SSchDebug {
bool lockEnable;
@@ -333,12 +333,13 @@ extern SSchedulerMgmt schMgmt;
#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock)
#define SCH_CLIENT_ID(_task) ((_task) ? (_task)->clientId : -1)
-#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
-#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
+#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
+#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
#define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
-#define SCH_IS_DATA_BIND_PLAN(_plan) (((_plan)->subplanType == SUBPLAN_TYPE_SCAN) || ((_plan)->subplanType == SUBPLAN_TYPE_MODIFY))
-#define SCH_IS_DATA_BIND_TASK(task) SCH_IS_DATA_BIND_PLAN((task)->plan)
+#define SCH_IS_DATA_BIND_PLAN(_plan) \
+ (((_plan)->subplanType == SUBPLAN_TYPE_SCAN) || ((_plan)->subplanType == SUBPLAN_TYPE_MODIFY))
+#define SCH_IS_DATA_BIND_TASK(task) SCH_IS_DATA_BIND_PLAN((task)->plan)
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task))
#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) \
@@ -419,15 +420,15 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SWITCH_EPSET(_addr) ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps)
#define SCH_TASK_NUM_OF_EPS(_addr) ((_addr)->epSet.numOfEps)
-#define SCH_LOG_TASK_START_TS(_task) \
- do { \
- int64_t us = taosGetTimestampUs(); \
- if (NULL == taosArrayPush((_task)->profile.execTime, &us)) { \
- qError("taosArrayPush task execTime failed, error:%s", tstrerror(terrno)); \
- } \
- if (0 == (_task)->execId) { \
- (_task)->profile.startTs = us; \
- } \
+#define SCH_LOG_TASK_START_TS(_task) \
+ do { \
+ int64_t us = taosGetTimestampUs(); \
+ if (NULL == taosArrayPush((_task)->profile.execTime, &us)) { \
+ qError("taosArrayPush task execTime failed, error:%s", tstrerror(terrno)); \
+ } \
+ if (0 == (_task)->execId) { \
+ (_task)->profile.startTs = us; \
+ } \
} while (0)
#define SCH_LOG_TASK_WAIT_TS(_task) \
@@ -450,23 +451,23 @@ extern SSchedulerMgmt schMgmt;
(_task)->profile.endTs = us; \
} while (0)
-#define SCH_JOB_ELOG(param, ...) qError("qid:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
-#define SCH_JOB_DLOG(param, ...) qDebug("qid:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
+#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
+#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) \
- qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
+ qError("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) \
- qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
+ qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_TASK_TLOG(param, ...) \
- qTrace("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
+ qTrace("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_TASK_DLOGL(param, ...) \
- qDebugL("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
+ qDebugL("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \
- qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
+ qWarn("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_SET_ERRNO(_err) \
@@ -580,7 +581,7 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void *param);
int32_t schAcquireJob(int64_t refId, SSchJob **ppJob);
int32_t schReleaseJob(int64_t refId);
-int32_t schReleaseJobEx(int64_t refId, int32_t* released);
+int32_t schReleaseJobEx(int64_t refId, int32_t *released);
void schFreeFlowCtrl(SSchJob *pJob);
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
@@ -648,7 +649,7 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pTask);
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
-int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum);
+int32_t schValidateSubplan(SSchJob *pJob, SSubplan *pSubplan, int32_t level, int32_t idx, int32_t taskNum);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode);
diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c
index a3cfa00127..49d5041369 100644
--- a/source/libs/stream/src/streamUpdate.c
+++ b/source/libs/stream/src/streamUpdate.c
@@ -445,6 +445,11 @@ int32_t updateInfoSerialize(SEncoder* pEncoder, const SUpdateInfo* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (!pInfo) {
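+    // encode -1 as a sentinel so the decoder can tell "no update info" apart from an empty one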
+ if (tEncodeI32(pEncoder, -1) < 0) {
+ code = TSDB_CODE_FAILED;
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
+ uDebug("%s line:%d. it did not have updateinfo", __func__, __LINE__);
return TSDB_CODE_SUCCESS;
}
@@ -550,6 +555,10 @@ int32_t updateInfoDeserialize(SDecoder* pDeCoder, SUpdateInfo* pInfo) {
int32_t size = 0;
if (tDecodeI32(pDeCoder, &size) < 0) return -1;
+
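+  // -1 is the serializer's sentinel for a NULL SUpdateInfo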
+ if (size < 0) {
+ return -1;
+ }
pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
QUERY_CHECK_NULL(pInfo->pTsBuckets, code, lino, _error, terrno);
diff --git a/tests/ci/Dockerfile b/tests/ci/Dockerfile
index d3d574b484..1caa6fea9e 100644
--- a/tests/ci/Dockerfile
+++ b/tests/ci/Dockerfile
@@ -7,7 +7,7 @@ RUN apt-get install -y locales psmisc sudo tree libgeos-dev libgflags2.2 libgfl
RUN sed -i 's/# en_US.UTF-8/en_US.UTF-8/' /etc/locale.gen && locale-gen
RUN pip3 config set global.index-url http://admin:123456@192.168.0.212:3141/admin/dev/+simple/
RUN pip3 config set global.trusted-host 192.168.0.212
-RUN pip3 install taospy==2.7.16 taos-ws-py==0.3.3 pandas psutil fabric2 requests faker simplejson toml pexpect tzlocal distro decorator loguru hyperloglog
+RUN pip3 install taospy==2.7.16 taos-ws-py==0.3.5 pandas psutil fabric2 requests faker simplejson toml pexpect tzlocal distro decorator loguru hyperloglog
ENV LANG=en_US.UTF-8 LANGUAGE=en_US.UTF-8 LC_ALL=en_US.UTF-8
RUN apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9
RUN add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/'
diff --git a/tests/docs-examples-test/python.sh b/tests/docs-examples-test/python.sh
index 6a25683b58..3a9812637c 100644
--- a/tests/docs-examples-test/python.sh
+++ b/tests/docs-examples-test/python.sh
@@ -130,7 +130,7 @@ pip3 install kafka-python
python3 kafka_example_consumer.py
# 21
-pip3 install taos-ws-py==0.3.3
+pip3 install taos-ws-py==0.3.5
python3 conn_websocket_pandas.py
# 22
diff --git a/tests/parallel_test/run_case.sh b/tests/parallel_test/run_case.sh
index 5b0d34fc0a..a78d0aa4a4 100755
--- a/tests/parallel_test/run_case.sh
+++ b/tests/parallel_test/run_case.sh
@@ -76,9 +76,9 @@ ulimit -c unlimited
md5sum /usr/lib/libtaos.so.1
md5sum /home/TDinternal/debug/build/lib/libtaos.so
-#get python connector and update: taospy 2.7.16 taos-ws-py 0.3.3
+#get python connector and update: taospy 2.7.16 taos-ws-py 0.3.5
pip3 install taospy==2.7.16
-pip3 install taos-ws-py==0.3.3
+pip3 install taos-ws-py==0.3.5
$TIMEOUT_CMD $cmd
RET=$?
echo "cmd exit code: $RET"
diff --git a/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim b/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim
new file mode 100644
index 0000000000..ed72d87e9a
--- /dev/null
+++ b/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim
@@ -0,0 +1,67 @@
+system sh/stop_dnodes.sh
+system sh/deploy.sh -n dnode1 -i 1
+
+system sh/cfg.sh -n dnode1 -c checkpointInterval -v 60
+
+system sh/exec.sh -n dnode1 -s start
+sleep 50
+sql connect
+
+print step1
+print =============== create database
+sql create database test vgroups 4;
+sql use test;
+
+sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int);
+sql create table t1 using st tags(1,1,1);
+sql create table t2 using st tags(2,2,2);
+
+sql create stream streams1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt1 as select _wstart, count(a) from st partition by tbname interval(2s);
+sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(a) from st interval(2s);
+
+run tsim/stream/checkTaskStatus.sim
+
+sleep 70000
+
+
+print restart taosd 01 ......
+
+system sh/stop_dnodes.sh
+
+system sh/exec.sh -n dnode1 -s start
+
+run tsim/stream/checkTaskStatus.sim
+
+sql insert into t1 values(now + 3000a,1,1,1);
+
+$loop_count = 0
+loop0:
+
+sleep 2000
+
+$loop_count = $loop_count + 1
+if $loop_count == 20 then
+ return -1
+endi
+
+print select * from streamt1;
+sql select * from streamt1;
+
+print $data00 $data01 $data02
+
+if $rows == 0 then
+ goto loop0
+endi
+
+print select * from streamt2;
+sql select * from streamt2;
+
+print $data00 $data01 $data02
+
+if $rows == 0 then
+ goto loop0
+endi
+
+print end
+
+system sh/exec.sh -n dnode1 -s stop -x SIGINT