Merge remote-tracking branch 'origin/3.0' into feat/TD-24700
|
@ -83,7 +83,7 @@ If `maven` is used to manage the projects, what needs to be done is only adding
|
|||
<dependency>
|
||||
<groupId>com.taosdata.jdbc</groupId>
|
||||
<artifactId>taos-jdbcdriver</artifactId>
|
||||
<version>3.2.1</version>
|
||||
<version>3.2.4</version>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
|
|
|
@ -373,7 +373,7 @@ conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (locat
|
|||
<TabItem value="websocket" label="WebSocket connection">
|
||||
|
||||
```python
|
||||
conn = taosws.connect(url="ws://localhost:6041")
|
||||
conn = taosws.connect("taosws://localhost:6041")
|
||||
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
|
||||
conn.execute("DROP DATABASE IF EXISTS test")
|
||||
conn.execute("CREATE DATABASE test")
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
---
|
||||
sidebar_label: qStudio
|
||||
title: qStudio
|
||||
description: Step-by-Step Guide to Accessing TDengine Data with qStudio
|
||||
---
|
||||
|
||||
qStudio is a free cross-platform SQL data analysis tool that allows easy browsing of tables, variables, functions, and configuration settings in a database. The latest version of qStudio includes built-in support for TDengine.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
To connect TDengine using qStudio, you need to complete the following preparations:
|
||||
|
||||
- Install qStudio: qStudio supports major operating systems, including Windows, macOS, and Linux. Please ensure you download the correct installation package for your platform from the [download page](https://www.timestored.com/qstudio/download/).
|
||||
- Set up TDengine instance: Make sure TDengine is installed and running correctly, and the taosAdapter is installed and running. For detailed information, refer to the taosAdapter User Manual.
|
||||
|
||||
## Connecting to TDengine with qStudio
|
||||
|
||||
1. Launch the qStudio application and select "Server" and then "Add Server..." from the menu. Choose TDengine from the Server Type dropdown.
|
||||
|
||||

|
||||
|
||||
2. Configure the TDengine connection by entering the host address, port number, username, and password. If TDengine is deployed on the local machine, you can fill in the username and password only. The default username is "root," and the default password is "taosdata." Click "Test" to test the connection's availability. If the TDengine Java connector is not installed on the local machine, qStudio will prompt you to download and install it.
|
||||
|
||||

|
||||
|
||||
3. Once connected successfully, the screen will display as shown below. If the connection fails, check that the TDengine service and taosAdapter are running correctly, and ensure that the host address, port number, username, and password are correct.
|
||||
|
||||

|
||||
|
||||
4. Use qStudio to select databases and tables to browse data from the TDengine server.
|
||||
|
||||

|
||||
|
||||
5. You can also perform operations on TDengine data by executing SQL commands.
|
||||
|
||||

|
||||
|
||||
6. qStudio supports charting functions based on the data. For more information, please refer to the [qStudio documentation](https://www.timestored.com/qstudio/help).
|
||||
|
||||

|
After Width: | Height: | Size: 94 KiB |
After Width: | Height: | Size: 148 KiB |
After Width: | Height: | Size: 34 KiB |
After Width: | Height: | Size: 93 KiB |
After Width: | Height: | Size: 39 KiB |
After Width: | Height: | Size: 78 KiB |
|
@ -22,7 +22,7 @@
|
|||
<dependency>
|
||||
<groupId>com.taosdata.jdbc</groupId>
|
||||
<artifactId>taos-jdbcdriver</artifactId>
|
||||
<version>3.2.1</version>
|
||||
<version>3.2.4</version>
|
||||
</dependency>
|
||||
<!-- ANCHOR_END: dep-->
|
||||
<dependency>
|
||||
|
@ -33,4 +33,4 @@
|
|||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
</project>
|
||||
|
|
|
@ -82,7 +82,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
|
|||
<dependency>
|
||||
<groupId>com.taosdata.jdbc</groupId>
|
||||
<artifactId>taos-jdbcdriver</artifactId>
|
||||
<version>3.2.1</version>
|
||||
<version>3.2.4</version>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
|
|
|
@ -446,7 +446,7 @@ TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据
|
|||
**注意**:
|
||||
|
||||
- JDBC REST 连接目前不支持参数绑定
|
||||
- 以下示例代码基于 taos-jdbcdriver-3.2.1
|
||||
- 以下示例代码基于 taos-jdbcdriver-3.2.4
|
||||
- binary 类型数据需要调用 setString 方法,nchar 类型数据需要调用 setNString 方法
|
||||
- 预处理语句中指定数据库与子表名称不要使用 `db.?`,应直接使用 `?`,然后在 setTableName 中指定数据库,如:`prepareStatement.setTableName("db.t1")`。
|
||||
|
||||
|
|
|
@ -375,7 +375,7 @@ conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (locat
|
|||
<TabItem value="websocket" label="WebSocket 连接">
|
||||
|
||||
```python
|
||||
conn = taosws.connect(url="ws://localhost:6041")
|
||||
conn = taosws.connect("taosws://localhost:6041")
|
||||
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
|
||||
conn.execute("DROP DATABASE IF EXISTS test")
|
||||
conn.execute("CREATE DATABASE test")
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
---
|
||||
sidebar_label: qStudio
|
||||
title: qStudio
|
||||
description: 使用 qStudio 存取 TDengine 数据的详细指南
|
||||
---
|
||||
|
||||
qStudio 是一款免费的多平台 SQL 数据分析工具,可以轻松浏览数据库中的表、变量、函数和配置设置。最新版本 qStudio 内嵌支持 TDengine。
|
||||
|
||||
## 前置条件
|
||||
|
||||
使用 qStudio 连接 TDengine 需要以下几方面的准备工作。
|
||||
|
||||
- 安装 qStudio。qStudio 支持主流操作系统包括 Windows、macOS 和 Linux。请注意[下载](https://www.timestored.com/qstudio/download/)正确平台的安装包。
|
||||
- 安装 TDengine 实例,请确认 TDengine 正常运行,并且 taosAdapter 已经安装并正常运行,具体细节请参考 [taosAdapter 的使用手册](/reference/taosadapter)。
|
||||
|
||||
## 使用 qStudio 连接 TDengine
|
||||
|
||||
1. 启动 qStudio 应用,从菜单项选择“Server” 和 “Add Server...”,然后在 Server Type 下拉框中选择 TDengine。
|
||||
|
||||

|
||||
|
||||
2. 配置 TDengine 连接,填入主机地址、端口号、用户名和密码。如果 TDengine 部署在本机,可以只填用户名和密码,默认用户名为 root,默认密码为 taosdata。点击“Test”可以对连接是否可用进行测试。如果本机没有安装 TDengine Java
|
||||
连接器,qStudio 会提示下载安装。
|
||||
|
||||

|
||||
|
||||
3. 连接成功将显示如下图所示。如果显示连接失败,请检查 TDengine 服务和 taosAdapter 是否正确运行,主机地址、端口号、用户名和密码是否正确。
|
||||
|
||||

|
||||
|
||||
4. 使用 qStudio 选择数据库和表可以浏览 TDengine 服务的数据。
|
||||
|
||||

|
||||
|
||||
5. 也可以通过执行 SQL 命令的方式对 TDengine 数据进行操作。
|
||||
|
||||

|
||||
|
||||
6. qStudio 支持根据数据绘制图表等功能,请参考 [qStudio 的帮助文档](https://www.timestored.com/qstudio/help)
|
||||
|
||||

|
After Width: | Height: | Size: 94 KiB |
After Width: | Height: | Size: 148 KiB |
After Width: | Height: | Size: 34 KiB |
After Width: | Height: | Size: 93 KiB |
After Width: | Height: | Size: 39 KiB |
After Width: | Height: | Size: 78 KiB |
|
@ -95,6 +95,8 @@ struct tm *taosLocalTime(const time_t *timep, struct tm *result, char *buf);
|
|||
struct tm *taosLocalTimeNolock(struct tm *result, const time_t *timep, int dst);
|
||||
time_t taosTime(time_t *t);
|
||||
time_t taosMktime(struct tm *timep);
|
||||
int64_t user_mktime64(const uint32_t year, const uint32_t mon, const uint32_t day, const uint32_t hour,
|
||||
const uint32_t min, const uint32_t sec, int64_t time_zone);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -1442,4 +1442,178 @@ TEST(clientCase, sub_tb_mt_test) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST(clientCase, ts_3756) {
|
||||
// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
|
||||
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT_NE(pConn, nullptr);
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
|
||||
tmq_conf_set(conf, "enable.auto.commit", "false");
|
||||
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
|
||||
tmq_conf_set(conf, "group.id", "group_id_2");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "latest");
|
||||
tmq_conf_set(conf, "msg.with.table.name", "false");
|
||||
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
tmq_conf_destroy(conf);
|
||||
|
||||
// 创建订阅 topics 列表
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
tmq_list_append(topicList, "tp");
|
||||
|
||||
// 启动订阅
|
||||
tmq_subscribe(tmq, topicList);
|
||||
tmq_list_destroy(topicList);
|
||||
|
||||
TAOS_FIELD* fields = NULL;
|
||||
int32_t numOfFields = 0;
|
||||
int32_t precision = 0;
|
||||
int32_t totalRows = 0;
|
||||
int32_t msgCnt = 0;
|
||||
int32_t timeout = 200;
|
||||
|
||||
int32_t count = 0;
|
||||
|
||||
tmq_topic_assignment* pAssign = NULL;
|
||||
int32_t numOfAssign = 0;
|
||||
|
||||
int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
return;
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
|
||||
}
|
||||
|
||||
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4);
|
||||
tmq_free_assignment(pAssign);
|
||||
|
||||
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
return;
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
|
||||
}
|
||||
|
||||
tmq_free_assignment(pAssign);
|
||||
|
||||
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
return;
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
printf("start to poll\n");
|
||||
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
|
||||
if (pRes) {
|
||||
char buf[128];
|
||||
|
||||
const char* topicName = tmq_get_topic_name(pRes);
|
||||
// const char* dbName = tmq_get_db_name(pRes);
|
||||
// int32_t vgroupId = tmq_get_vgroup_id(pRes);
|
||||
//
|
||||
// printf("topic: %s\n", topicName);
|
||||
// printf("db: %s\n", dbName);
|
||||
// printf("vgroup id: %d\n", vgroupId);
|
||||
|
||||
printSubResults(pRes, &totalRows);
|
||||
|
||||
tmq_topic_assignment* pAssignTmp = NULL;
|
||||
int32_t numOfAssignTmp = 0;
|
||||
|
||||
code = tmq_get_topic_assignment(tmq, "tp", &pAssignTmp, &numOfAssignTmp);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
return;
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssignTmp[i].vgId, pAssignTmp[i].currentOffset, pAssignTmp[i].begin, pAssignTmp[i].end);
|
||||
}
|
||||
if(numOfAssign != 0){
|
||||
int i = 0;
|
||||
for(; i < numOfAssign; i++){
|
||||
if(pAssign[i].currentOffset != pAssignTmp[i].currentOffset){
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(i == numOfAssign){
|
||||
printf("all position is same\n");
|
||||
break;
|
||||
}
|
||||
tmq_free_assignment(pAssign);
|
||||
}
|
||||
numOfAssign = numOfAssignTmp;
|
||||
pAssign = pAssignTmp;
|
||||
|
||||
} else {
|
||||
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
|
||||
// tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
|
||||
// tmq_commit_sync(tmq, pRes);
|
||||
continue;
|
||||
}
|
||||
|
||||
// tmq_commit_sync(tmq, pRes);
|
||||
if (pRes != NULL) {
|
||||
taos_free_result(pRes);
|
||||
// if ((++count) > 1) {
|
||||
// break;
|
||||
// }
|
||||
} else {
|
||||
// break;
|
||||
}
|
||||
|
||||
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin);
|
||||
}
|
||||
|
||||
tmq_free_assignment(pAssign);
|
||||
|
||||
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
return;
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
|
||||
}
|
||||
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
}
|
||||
#pragma GCC diagnostic pop
|
||||
|
|
|
@ -25,46 +25,6 @@
|
|||
|
||||
#include "tlog.h"
|
||||
|
||||
/*
|
||||
* mktime64 - Converts date to seconds.
|
||||
* Converts Gregorian date to seconds since 1970-01-01 00:00:00.
|
||||
* Assumes input in normal date format, i.e. 1980-12-31 23:59:59
|
||||
* => year=1980, mon=12, day=31, hour=23, min=59, sec=59.
|
||||
*
|
||||
* [For the Julian calendar (which was used in Russia before 1917,
|
||||
* Britain & colonies before 1752, anywhere else before 1582,
|
||||
* and is still in use by some communities) leave out the
|
||||
* -year/100+year/400 terms, and add 10.]
|
||||
*
|
||||
* This algorithm was first published by Gauss (I think).
|
||||
*
|
||||
* A leap second can be indicated by calling this function with sec as
|
||||
* 60 (allowable under ISO 8601). The leap second is treated the same
|
||||
* as the following second since they don't exist in UNIX time.
|
||||
*
|
||||
* An encoding of midnight at the end of the day as 24:00:00 - ie. midnight
|
||||
* tomorrow - (allowable under ISO 8601) is supported.
|
||||
*/
|
||||
static int64_t user_mktime64(const uint32_t year0, const uint32_t mon0, const uint32_t day, const uint32_t hour,
|
||||
const uint32_t min, const uint32_t sec, int64_t time_zone) {
|
||||
uint32_t mon = mon0, year = year0;
|
||||
|
||||
/* 1..12 -> 11,12,1..10 */
|
||||
if (0 >= (int32_t)(mon -= 2)) {
|
||||
mon += 12; /* Puts Feb last since it has leap day */
|
||||
year -= 1;
|
||||
}
|
||||
|
||||
// int64_t res = (((((int64_t) (year/4 - year/100 + year/400 + 367*mon/12 + day) +
|
||||
// year*365 - 719499)*24 + hour)*60 + min)*60 + sec);
|
||||
int64_t res;
|
||||
res = 367 * ((int64_t)mon) / 12;
|
||||
res += year / 4 - year / 100 + year / 400 + day + ((int64_t)year) * 365 - 719499;
|
||||
res = res * 24;
|
||||
res = ((res + hour) * 60 + min) * 60 + sec;
|
||||
|
||||
return (res + time_zone);
|
||||
}
|
||||
|
||||
// ==== mktime() kernel code =================//
|
||||
static int64_t m_deltaUtc = 0;
|
||||
|
|
|
@ -114,9 +114,10 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
|||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg);
|
||||
|
||||
terrno = 0;
|
||||
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
|
||||
if (code != 0) {
|
||||
if (terrno != 0) {
|
||||
if (code == -1 && terrno != 0) {
|
||||
code = terrno;
|
||||
}
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ static const char *offlineReason[] = {
|
|||
"timezone not match",
|
||||
"locale not match",
|
||||
"charset not match",
|
||||
"ttl change on write not match"
|
||||
"ttlChangeOnWrite not match",
|
||||
"unknown",
|
||||
};
|
||||
|
||||
|
|
|
@ -863,6 +863,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
|
|||
mndReleaseDb(pMnode, pDb);
|
||||
goto _OVER;
|
||||
}
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
} else {
|
||||
while (1) {
|
||||
SDbObj *pDb = NULL;
|
||||
|
@ -887,6 +888,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
|
|||
mndReleaseDb(pMnode, pDb);
|
||||
goto _OVER;
|
||||
}
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
} else {
|
||||
while (1) {
|
||||
SDbObj *pDb = NULL;
|
||||
|
@ -908,6 +910,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
taosHashRemove(newUser.readDbs, alterReq.objname, len);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
} else {
|
||||
taosHashClear(newUser.readDbs);
|
||||
}
|
||||
|
@ -922,6 +925,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
taosHashRemove(newUser.writeDbs, alterReq.objname, len);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
} else {
|
||||
taosHashClear(newUser.writeDbs);
|
||||
}
|
||||
|
|
|
@ -183,7 +183,7 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (pTask->status.timerActive == 1) {
|
||||
if (pTask->status.timerActive >= 1) {
|
||||
inTimer = true;
|
||||
}
|
||||
}
|
||||
|
@ -646,7 +646,8 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)data, len);
|
||||
if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -654,19 +655,22 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
STqOffset* pOffset = &vgOffset.offset;
|
||||
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
|
||||
if (pSavedOffset == NULL) {
|
||||
return TSDB_CODE_TMQ_NO_COMMITTED;
|
||||
terrno = TSDB_CODE_TMQ_NO_COMMITTED;
|
||||
return terrno;
|
||||
}
|
||||
vgOffset.offset = *pSavedOffset;
|
||||
|
||||
int32_t code = 0;
|
||||
tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
|
||||
if (code < 0) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
void* buf = rpcMallocCont(len);
|
||||
if (buf == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, buf, len);
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "tsdb.h"
|
||||
#include "tsdbFSet2.h"
|
||||
#include "tsdbMerge.h"
|
||||
#include "tsdbReadUtil.h"
|
||||
#include "tsdbSttFileRW.h"
|
||||
|
||||
|
@ -352,10 +353,14 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t uidComparFn(const void *p1, const void *p2) {
|
||||
const uint64_t *uid1 = p1;
|
||||
static int32_t suidComparFn(const void *target, const void *p2) {
|
||||
const uint64_t *targetUid = target;
|
||||
const uint64_t *uid2 = p2;
|
||||
return (*uid1) - (*uid2);
|
||||
if (*uid2 == (*targetUid)) {
|
||||
return 0;
|
||||
} else {
|
||||
return (*targetUid) < (*uid2) ? -1:1;
|
||||
}
|
||||
}
|
||||
|
||||
static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint64_t suid, uint64_t uid,
|
||||
|
@ -372,29 +377,55 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6
|
|||
}
|
||||
}
|
||||
|
||||
// for (; i < TARRAY2_SIZE(pStatisBlkArray); ++i) {
|
||||
// SStatisBlk *p = &pStatisBlkArray->data[i];
|
||||
// if (p->minTbid.uid <= uid && p->maxTbid.uid >= uid) {
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// if (p->maxTbid.uid < uid) {
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
|
||||
if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SStatisBlk *p = &pStatisBlkArray->data[i];
|
||||
STbStatisBlock block = {0};
|
||||
tsdbSttFileReadStatisBlock(pReader, p, &block);
|
||||
while(i < TARRAY2_SIZE(pStatisBlkArray)) {
|
||||
SStatisBlk *p = &pStatisBlkArray->data[i];
|
||||
if (p->minTbid.suid > suid) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t index = tarray2SearchIdx(block.uid, &uid, sizeof(int64_t), uidComparFn, TD_EQ);
|
||||
tStatisBlockDestroy(&block);
|
||||
STbStatisBlock block = {0};
|
||||
tsdbSttFileReadStatisBlock(pReader, p, &block);
|
||||
|
||||
return (index != -1);
|
||||
int32_t index = tarray2SearchIdx(block.suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ);
|
||||
if (index == -1) {
|
||||
tStatisBlockDestroy(&block);
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t j = index;
|
||||
if (block.uid->data[j] == uid) {
|
||||
tStatisBlockDestroy(&block);
|
||||
return true;
|
||||
} else if (block.uid->data[j] > uid) {
|
||||
while (j >= 0 && block.suid->data[j] == suid) {
|
||||
if (block.uid->data[j] == uid) {
|
||||
tStatisBlockDestroy(&block);
|
||||
return true;
|
||||
} else {
|
||||
j -= 1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
j = index + 1;
|
||||
while (j < block.suid->size && block.suid->data[j] == suid) {
|
||||
if (block.uid->data[j] == uid) {
|
||||
tStatisBlockDestroy(&block);
|
||||
return true;
|
||||
} else {
|
||||
j += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tStatisBlockDestroy(&block);
|
||||
i += 1;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
|
||||
|
@ -452,12 +483,12 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader
|
|||
tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
|
||||
}
|
||||
|
||||
// bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader);
|
||||
// if (!exists) {
|
||||
// pIter->iSttBlk = -1;
|
||||
// pIter->pSttBlk = NULL;
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader);
|
||||
if (!exists) {
|
||||
pIter->iSttBlk = -1;
|
||||
pIter->pSttBlk = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// find the start block, actually we could load the position to avoid repeatly searching for the start position when
|
||||
// the skey is updated.
|
||||
|
|
|
@ -439,7 +439,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
|
|||
return code;
|
||||
|
||||
_end:
|
||||
tsdbReaderClose(pReader);
|
||||
tsdbReaderClose2(pReader);
|
||||
*ppReader = NULL;
|
||||
return code;
|
||||
}
|
||||
|
@ -1729,41 +1729,45 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
|||
|
||||
// row in last file block
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
|
||||
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist
|
||||
if (key < tsLast) {
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||
} else if (key == ts) {
|
||||
SRow* pTSRow = NULL;
|
||||
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||
|
||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
|
||||
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||
|
||||
taosMemoryFree(pTSRow);
|
||||
tsdbRowMergerClear(pMerger);
|
||||
return code;
|
||||
} else { // key > ts
|
||||
} else if (key > tsLast) {
|
||||
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
|
||||
}
|
||||
} else {
|
||||
if (key > tsLast) {
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||
} else if (key < tsLast) {
|
||||
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
|
||||
}
|
||||
} else { // desc order
|
||||
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
|
||||
}
|
||||
// the following for key == tsLast
|
||||
SRow* pTSRow = NULL;
|
||||
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||
|
||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
|
||||
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||
|
||||
taosMemoryFree(pTSRow);
|
||||
tsdbRowMergerClear(pMerger);
|
||||
return code;
|
||||
|
||||
} else { // only last block exists
|
||||
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
|
||||
}
|
||||
|
@ -2190,7 +2194,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
|||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
|
||||
TSDBROW *pRow = NULL, *piRow = NULL;
|
||||
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
|
||||
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] :
|
||||
(ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN);
|
||||
if (pBlockScanInfo->iter.hasVal) {
|
||||
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
|
||||
}
|
||||
|
@ -2564,9 +2569,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
|||
|
||||
// load the last data block of current table
|
||||
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
|
||||
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
|
||||
// reset the index in last block when handing a new file
|
||||
// doCleanupTableScanInfo(pScanInfo);
|
||||
if (pScanInfo == NULL) {
|
||||
tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr);
|
||||
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
||||
if (!hasNexTable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2575,8 +2579,15 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
|||
continue;
|
||||
}
|
||||
|
||||
// reset the index in last block when handing a new file
|
||||
// doCleanupTableScanInfo(pScanInfo);
|
||||
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
|
||||
// reset the index in last block when handing a new file
|
||||
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
||||
if (!hasNexTable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
||||
if (!hasDataInLastFile) {
|
||||
|
@ -2667,16 +2678,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
(ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
|
||||
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
||||
} else {
|
||||
if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
// only return the rows in last block
|
||||
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
ASSERT(tsLast >= pBlockInfo->record.lastKey);
|
||||
bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader);
|
||||
int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;
|
||||
if (!bHasDataInLastBlock || ((ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.lastKey < tsLast) ||
|
||||
(!ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.firstKey > tsLast))) {
|
||||
// whole block is required, return it directly
|
||||
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
|
||||
pInfo->rows = pBlockInfo->record.numRow;
|
||||
pInfo->id.uid = pScanInfo->uid;
|
||||
pInfo->dataLoad = 0;
|
||||
pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey};
|
||||
setComposedBlockFlag(pReader, false);
|
||||
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
|
||||
|
||||
// update the last key for the corresponding table
|
||||
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
|
||||
tsdbDebug("%p uid:%" PRIu64
|
||||
" clean file block retrieved from file, global index:%d, "
|
||||
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
|
||||
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
|
||||
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
|
||||
} else {
|
||||
SBlockData* pBData = &pReader->status.fileBlockData;
|
||||
tBlockDataReset(pBData);
|
||||
|
||||
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||
tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
|
||||
tsdbDebug("load data in last block firstly %s", pReader->idStr);
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
|
@ -2707,23 +2734,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
||||
pResBlock->info.rows, el, pReader->idStr);
|
||||
}
|
||||
} else { // whole block is required, return it directly
|
||||
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
|
||||
pInfo->rows = pBlockInfo->record.numRow;
|
||||
pInfo->id.uid = pScanInfo->uid;
|
||||
pInfo->dataLoad = 0;
|
||||
pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey};
|
||||
setComposedBlockFlag(pReader, false);
|
||||
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
|
||||
|
||||
// update the last key for the corresponding table
|
||||
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
|
||||
tsdbDebug("%p uid:%" PRIu64
|
||||
" clean file block retrieved from file, global index:%d, "
|
||||
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
|
||||
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
|
||||
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code;
|
||||
|
@ -4096,12 +4108,10 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
tsdbDataFileReaderClose(&pReader->pFileReader);
|
||||
|
||||
int64_t loadBlocks = 0;
|
||||
double elapse = 0;
|
||||
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &loadBlocks, &elapse);
|
||||
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
||||
|
||||
// resetDataBlockScanInfo excluding lastKey
|
||||
STableBlockScanInfo** p = NULL;
|
||||
int32_t iter = 0;
|
||||
|
|
|
@ -628,7 +628,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_TMQ_SEEK:
|
||||
return tqProcessSeekReq(pVnode->pTq, pMsg);
|
||||
|
||||
default:
|
||||
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
|
|
|
@ -25,6 +25,7 @@ extern "C" {
|
|||
#include "tsort.h"
|
||||
#include "ttszip.h"
|
||||
#include "tvariant.h"
|
||||
#include "theap.h"
|
||||
|
||||
#include "dataSinkMgt.h"
|
||||
#include "executil.h"
|
||||
|
@ -460,6 +461,14 @@ typedef struct SIntervalAggOperatorInfo {
|
|||
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
||||
STimeWindowAggSupp twAggSup;
|
||||
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
|
||||
// for limit optimization
|
||||
bool limited;
|
||||
int64_t limit;
|
||||
bool slimited;
|
||||
int64_t slimit;
|
||||
uint64_t curGroupId; // initialize to UINT64_MAX
|
||||
uint64_t handledGroupNum;
|
||||
BoundedQueue* pBQ;
|
||||
} SIntervalAggOperatorInfo;
|
||||
|
||||
typedef struct SMergeAlignedIntervalAggOperatorInfo {
|
||||
|
|
|
@ -191,6 +191,8 @@ int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols);
|
|||
bool tsortIsClosed(SSortHandle* pHandle);
|
||||
void tsortSetClosed(SSortHandle* pHandle);
|
||||
|
||||
void setSingleTableMerge(SSortHandle* pHandle);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -2138,8 +2138,9 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
|
||||
|
||||
if (groupSort) {
|
||||
if (groupSort || pScanNode->groupOrderScan) {
|
||||
code = sortTableGroup(pTableListInfo);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -278,7 +278,6 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
|
|||
SNode* pTagIndexCond, const char* pUser, const char* dbname) {
|
||||
int32_t type = nodeType(pPhyNode);
|
||||
const char* idstr = GET_TASKID(pTaskInfo);
|
||||
|
||||
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
||||
SOperatorInfo* pOperator = NULL;
|
||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
||||
|
|
|
@ -3047,17 +3047,22 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
// one table has one data block
|
||||
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
||||
|
||||
STableMergeScanSortSourceParam param = {0};
|
||||
param.pOperator = pOperator;
|
||||
STableMergeScanSortSourceParam *param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam));
|
||||
param->pOperator = pOperator;
|
||||
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
|
||||
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL);
|
||||
|
||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
ps->param = ¶m;
|
||||
ps->onlyRef = true;
|
||||
ps->param = param;
|
||||
ps->onlyRef = false;
|
||||
tsortAddSource(pInfo->pSortHandle, ps);
|
||||
|
||||
int32_t code = tsortOpen(pInfo->pSortHandle);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (numOfTable == 1) {
|
||||
setSingleTableMerge(pInfo->pSortHandle);
|
||||
} else {
|
||||
code = tsortOpen(pInfo->pSortHandle);
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
|
|
|
@ -876,7 +876,67 @@ bool needDeleteWindowBuf(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) {
|
|||
return pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark;
|
||||
}
|
||||
|
||||
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
|
||||
static bool tsKeyCompFn(void* l, void* r, void* param) {
|
||||
TSKEY* lTS = (TSKEY*)l;
|
||||
TSKEY* rTS = (TSKEY*)r;
|
||||
SIntervalAggOperatorInfo* pInfo = param;
|
||||
return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
|
||||
}
|
||||
|
||||
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
|
||||
char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
|
||||
SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);
|
||||
return tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))) != NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief check if cur window should be filtered out by limit info
|
||||
* @retval true if should be filtered out
|
||||
* @retval false if not filtering out
|
||||
* @note If no limit info, we skip filtering.
|
||||
* If input/output ts order mismatch, we skip filtering too.
|
||||
* eg. input ts order: desc, and output ts order: asc, limit: 10
|
||||
* IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
|
||||
* every tuple in every block.
|
||||
* And the boundedQueue keeps refreshing all records with smaller ts key.
|
||||
*/
|
||||
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId) {
|
||||
if (!pOperatorInfo->limited // if no limit info, no filter will be applied
|
||||
|| pOperatorInfo->binfo.inputTsOrder !=
|
||||
pOperatorInfo->binfo.outputTsOrder // if input/output ts order mismatch, no filter
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
if (pOperatorInfo->limit == 0) return true;
|
||||
|
||||
if (pOperatorInfo->pBQ == NULL) {
|
||||
pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosMemoryFree, pOperatorInfo);
|
||||
}
|
||||
|
||||
bool shouldFilter = false;
|
||||
// if BQ has been full, compare it with top of BQ
|
||||
if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
|
||||
PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
|
||||
shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
|
||||
}
|
||||
if (shouldFilter) {
|
||||
return true;
|
||||
} else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// cur win not been filtered out and not been pushed into BQ yet, push it into BQ
|
||||
PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
|
||||
*((TSKEY*)node.data) = win->skey;
|
||||
|
||||
if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
|
||||
taosMemoryFree(node.data);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
|
||||
int32_t scanFlag) {
|
||||
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
|
||||
|
||||
|
@ -891,8 +951,21 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
|
||||
SResultRow* pResult = NULL;
|
||||
|
||||
if (tableGroupId != pInfo->curGroupId) {
|
||||
pInfo->handledGroupNum += 1;
|
||||
if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
|
||||
return true;
|
||||
} else {
|
||||
pInfo->curGroupId = tableGroupId;
|
||||
destroyBoundedQueue(pInfo->pBQ);
|
||||
pInfo->pBQ = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
STimeWindow win =
|
||||
getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
|
||||
if (filterWindowWithLimit(pInfo, &win, tableGroupId)) return false;
|
||||
|
||||
int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
|
||||
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
|
||||
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||
|
@ -929,7 +1002,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
while (1) {
|
||||
int32_t prevEndPos = forwardRows - 1 + startPos;
|
||||
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->binfo.inputTsOrder);
|
||||
if (startPos < 0) {
|
||||
if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId)) {
|
||||
break;
|
||||
}
|
||||
// null data, failed to allocate more memory buffer
|
||||
|
@ -963,6 +1036,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
if (pInfo->timeWindowInterpo) {
|
||||
saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
|
||||
|
@ -1043,7 +1117,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
|
||||
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag);
|
||||
if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
|
||||
}
|
||||
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
|
||||
|
@ -1495,6 +1569,7 @@ void destroyIntervalOperatorInfo(void* param) {
|
|||
|
||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||
destroyBoundedQueue(pInfo->pBQ);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
|
@ -1658,6 +1733,17 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
|
|||
pInfo->interval = interval;
|
||||
pInfo->twAggSup = as;
|
||||
pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
|
||||
if (pPhyNode->window.node.pLimit) {
|
||||
SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
|
||||
pInfo->limited = true;
|
||||
pInfo->limit = pLimit->limit + pLimit->offset;
|
||||
}
|
||||
if (pPhyNode->window.node.pSlimit) {
|
||||
SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
|
||||
pInfo->slimited = true;
|
||||
pInfo->slimit = pLimit->limit + pLimit->offset;
|
||||
pInfo->curGroupId = UINT64_MAX;
|
||||
}
|
||||
|
||||
if (pPhyNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
|
|
|
@ -69,8 +69,14 @@ struct SSortHandle {
|
|||
_sort_fetch_block_fn_t fetchfp;
|
||||
_sort_merge_compar_fn_t comparFn;
|
||||
SMultiwayMergeTreeInfo* pMergeTree;
|
||||
|
||||
bool singleTableMerge;
|
||||
};
|
||||
|
||||
void setSingleTableMerge(SSortHandle* pHandle) {
|
||||
pHandle->singleTableMerge = true;
|
||||
}
|
||||
|
||||
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
|
||||
|
||||
// | offset[0] | offset[1] |....| nullbitmap | data |...|
|
||||
|
@ -1453,6 +1459,26 @@ static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
|
|||
return &pHandle->tupleHandle;
|
||||
}
|
||||
|
||||
static STupleHandle* tsortSingleTableMergeNextTuple(SSortHandle* pHandle) {
|
||||
if (1 == pHandle->numOfCompletedSources) return NULL;
|
||||
if (pHandle->tupleHandle.pBlock && pHandle->tupleHandle.rowIndex + 1 < pHandle->tupleHandle.pBlock->info.rows) {
|
||||
pHandle->tupleHandle.rowIndex++;
|
||||
} else {
|
||||
if (pHandle->tupleHandle.rowIndex == -1) return NULL;
|
||||
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
|
||||
SSortSource* source = *pSource;
|
||||
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
|
||||
if (!pBlock || pBlock->info.rows == 0) {
|
||||
setCurrentSourceDone(source, pHandle);
|
||||
pHandle->tupleHandle.pBlock = NULL;
|
||||
return NULL;
|
||||
}
|
||||
pHandle->tupleHandle.pBlock = pBlock;
|
||||
pHandle->tupleHandle.rowIndex = 0;
|
||||
}
|
||||
return &pHandle->tupleHandle;
|
||||
}
|
||||
|
||||
int32_t tsortOpen(SSortHandle* pHandle) {
|
||||
if (pHandle->opened) {
|
||||
return 0;
|
||||
|
@ -1470,7 +1496,9 @@ int32_t tsortOpen(SSortHandle* pHandle) {
|
|||
}
|
||||
|
||||
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
|
||||
if (pHandle->pBoundedQueue)
|
||||
if (pHandle->singleTableMerge)
|
||||
return tsortSingleTableMergeNextTuple(pHandle);
|
||||
else if (pHandle->pBoundedQueue)
|
||||
return tsortPQSortNextTuple(pHandle);
|
||||
else
|
||||
return tsortBufMergeSortNextTuple(pHandle);
|
||||
|
|
|
@ -821,7 +821,19 @@ static bool isPrimaryKeyImpl(SNode* pExpr) {
|
|||
FUNCTION_TYPE_IROWTS == pFunc->funcType) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
} else if (QUERY_NODE_OPERATOR == nodeType(pExpr)) {
|
||||
SOperatorNode* pOper = (SOperatorNode*)pExpr;
|
||||
if (OP_TYPE_ADD != pOper->opType && OP_TYPE_SUB != pOper->opType) {
|
||||
return false;
|
||||
}
|
||||
if (!isPrimaryKeyImpl(pOper->pLeft)) {
|
||||
return false;
|
||||
}
|
||||
if (QUERY_NODE_VALUE != nodeType(pOper->pRight)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -848,7 +848,6 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
|
|||
: (pSelect->hasTimeLineFunc ? getRequireDataOrder(true, pSelect) : DATA_ORDER_LEVEL_IN_BLOCK);
|
||||
pWindow->node.resultDataOrder =
|
||||
pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_GLOBAL : getRequireDataOrder(true, pSelect);
|
||||
|
||||
pWindow->pTspk = nodesCloneNode(pInterval->pCol);
|
||||
if (NULL == pWindow->pTspk) {
|
||||
nodesDestroyNode((SNode*)pWindow);
|
||||
|
|
|
@ -368,7 +368,7 @@ static void scanPathOptSetGroupOrderScan(SScanLogicNode* pScan) {
|
|||
|
||||
if (pScan->node.pParent && nodeType(pScan->node.pParent) == QUERY_NODE_LOGIC_PLAN_AGG) {
|
||||
SAggLogicNode* pAgg = (SAggLogicNode*)pScan->node.pParent;
|
||||
bool withSlimit = pAgg->node.pSlimit != NULL || (pAgg->node.pParent && pAgg->node.pParent->pSlimit);
|
||||
bool withSlimit = pAgg->node.pSlimit != NULL || (pAgg->node.pParent && pAgg->node.pParent->pSlimit);
|
||||
if (withSlimit && isPartTableAgg(pAgg)) {
|
||||
pScan->groupOrderScan = pAgg->node.forceCreateNonBlockingOptr = true;
|
||||
}
|
||||
|
@ -1651,11 +1651,33 @@ static bool planOptNodeListHasTbname(SNodeList* pKeys) {
|
|||
}
|
||||
|
||||
static bool partTagsIsOptimizableNode(SLogicNode* pNode) {
|
||||
return ((QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) ||
|
||||
(QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && NULL != ((SAggLogicNode*)pNode)->pGroupKeys &&
|
||||
NULL != ((SAggLogicNode*)pNode)->pAggFuncs)) &&
|
||||
1 == LIST_LENGTH(pNode->pChildren) &&
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0)));
|
||||
bool ret = 1 == LIST_LENGTH(pNode->pChildren) &&
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0));
|
||||
if (!ret) return ret;
|
||||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_LOGIC_PLAN_PARTITION: {
|
||||
if (pNode->pParent && nodeType(pNode->pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) {
|
||||
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode->pParent;
|
||||
if (pWindow->winType == WINDOW_TYPE_INTERVAL) {
|
||||
// if interval has slimit, we push down partition node to scan, and scan will set groupOrderScan to true
|
||||
// we want to skip groups of blocks after slimit satisfied
|
||||
// if interval only has limit, we do not push down partition node to scan
|
||||
// we want to get grouped output from partition node and make use of limit
|
||||
// if no slimit and no limit, we push down partition node and groupOrderScan is false, cause we do not need
|
||||
// group ordered output
|
||||
if (!pWindow->node.pSlimit && pWindow->node.pLimit) ret = false;
|
||||
}
|
||||
}
|
||||
} break;
|
||||
case QUERY_NODE_LOGIC_PLAN_AGG: {
|
||||
SAggLogicNode* pAgg = (SAggLogicNode*)pNode;
|
||||
ret = pAgg->pGroupKeys && pAgg->pAggFuncs;
|
||||
} break;
|
||||
default:
|
||||
ret = false;
|
||||
break;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
static SNodeList* partTagsGetPartKeys(SLogicNode* pNode) {
|
||||
|
@ -1796,6 +1818,8 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
|||
scanPathOptSetGroupOrderScan(pScan);
|
||||
pParent->hasGroupKeyOptimized = true;
|
||||
}
|
||||
if (pNode->pParent->pSlimit)
|
||||
pScan->groupOrderScan = true;
|
||||
|
||||
NODES_CLEAR_LIST(pNode->pChildren);
|
||||
nodesDestroyNode((SNode*)pNode);
|
||||
|
@ -2752,23 +2776,79 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
|
|||
}
|
||||
|
||||
static bool pushDownLimitOptShouldBeOptimized(SLogicNode* pNode) {
|
||||
if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren)) {
|
||||
if ((NULL == pNode->pLimit && pNode->pSlimit == NULL) || 1 != LIST_LENGTH(pNode->pChildren)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
|
||||
// push down to sort node
|
||||
if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
|
||||
// if we have pushed down, we skip it
|
||||
if (pChild->pLimit) return false;
|
||||
} else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) {
|
||||
// push down to table scan node
|
||||
// if pNode is sortNode, we skip push down limit info to table scan node
|
||||
return false;
|
||||
}
|
||||
if (pChild->pLimit || pChild->pSlimit) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
static void swapLimit(SLogicNode* pParent, SLogicNode* pChild) {
|
||||
pChild->pLimit = pParent->pLimit;
|
||||
pParent->pLimit = NULL;
|
||||
}
|
||||
|
||||
static void cloneLimit(SLogicNode* pParent, SLogicNode* pChild) {
|
||||
SLimitNode* pLimit = NULL;
|
||||
if (pParent->pLimit) {
|
||||
pChild->pLimit = nodesCloneNode(pParent->pLimit);
|
||||
pLimit = (SLimitNode*)pChild->pLimit;
|
||||
pLimit->limit += pLimit->offset;
|
||||
pLimit->offset = 0;
|
||||
}
|
||||
|
||||
if (pParent->pSlimit) {
|
||||
pChild->pSlimit = nodesCloneNode(pParent->pSlimit);
|
||||
pLimit = (SLimitNode*)pChild->pSlimit;
|
||||
pLimit->limit += pLimit->offset;
|
||||
pLimit->offset = 0;
|
||||
}
|
||||
}
|
||||
|
||||
static bool pushDownLimitHow(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPushTo);
|
||||
static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPushTo) {
|
||||
switch (nodeType(pNodeLimitPushTo)) {
|
||||
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
|
||||
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNodeLimitPushTo;
|
||||
if (pWindow->winType != WINDOW_TYPE_INTERVAL) break;
|
||||
cloneLimit(pNodeWithLimit, pNodeLimitPushTo);
|
||||
return true;
|
||||
}
|
||||
case QUERY_NODE_LOGIC_PLAN_FILL:
|
||||
case QUERY_NODE_LOGIC_PLAN_SORT: {
|
||||
cloneLimit(pNodeWithLimit, pNodeLimitPushTo);
|
||||
SNode* pChild = NULL;
|
||||
FOREACH(pChild, pNodeLimitPushTo->pChildren) { pushDownLimitHow(pNodeLimitPushTo, (SLogicNode*)pChild); }
|
||||
return true;
|
||||
}
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT && pNodeWithLimit->pLimit) {
|
||||
swapLimit(pNodeWithLimit, pNodeLimitPushTo);
|
||||
return true;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool pushDownLimitHow(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPushTo) {
|
||||
switch (nodeType(pNodeWithLimit)) {
|
||||
case QUERY_NODE_LOGIC_PLAN_PROJECT:
|
||||
case QUERY_NODE_LOGIC_PLAN_FILL:
|
||||
return pushDownLimitTo(pNodeWithLimit, pNodeLimitPushTo);
|
||||
case QUERY_NODE_LOGIC_PLAN_SORT: {
|
||||
SSortLogicNode* pSort = (SSortLogicNode*)pNodeWithLimit;
|
||||
if (sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys)) return pushDownLimitTo(pNodeWithLimit, pNodeLimitPushTo);
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||
SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, pushDownLimitOptShouldBeOptimized);
|
||||
if (NULL == pNode) {
|
||||
|
@ -2777,17 +2857,9 @@ static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
|
|||
|
||||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
|
||||
nodesDestroyNode(pChild->pLimit);
|
||||
if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
|
||||
pChild->pLimit = nodesCloneNode(pNode->pLimit);
|
||||
SLimitNode* pLimit = (SLimitNode*)pChild->pLimit;
|
||||
pLimit->limit += pLimit->offset;
|
||||
pLimit->offset = 0;
|
||||
} else {
|
||||
pChild->pLimit = pNode->pLimit;
|
||||
pNode->pLimit = NULL;
|
||||
if (pushDownLimitHow(pNode, pChild)) {
|
||||
pCxt->optimized = true;
|
||||
}
|
||||
pCxt->optimized = true;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -3473,6 +3545,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
|
|||
{.pName = "sortNonPriKeyOptimize", .optimizeFunc = sortNonPriKeyOptimize},
|
||||
{.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize},
|
||||
{.pName = "SmaIndex", .optimizeFunc = smaIndexOptimize},
|
||||
{.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize},
|
||||
{.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
|
||||
{.pName = "StableJoin", .optimizeFunc = stableJoinOptimize},
|
||||
{.pName = "MergeProjects", .optimizeFunc = mergeProjectsOptimize},
|
||||
|
@ -3480,7 +3553,6 @@ static const SOptimizeRule optimizeRuleSet[] = {
|
|||
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
|
||||
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize},
|
||||
{.pName = "TagScan", .optimizeFunc = tagScanOptimize},
|
||||
{.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize},
|
||||
{.pName = "TableCountScan", .optimizeFunc = tableCountScanOptimize},
|
||||
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
|
||||
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
|
||||
|
|
|
@ -499,6 +499,18 @@ static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* p
|
|||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
|
||||
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
|
||||
if (pMerge->node.pLimit) {
|
||||
nodesDestroyNode(pMerge->node.pLimit);
|
||||
pMerge->node.pLimit = NULL;
|
||||
}
|
||||
if (pMerge->node.pSlimit) {
|
||||
nodesDestroyNode(pMerge->node.pSlimit);
|
||||
pMerge->node.pSlimit = NULL;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -218,11 +218,11 @@ _EXIT:
|
|||
}
|
||||
void streamBackendCleanup(void* arg) {
|
||||
SBackendWrapper* pHandle = (SBackendWrapper*)arg;
|
||||
RocksdbCfInst** pIter = (RocksdbCfInst**)taosHashIterate(pHandle->cfInst, NULL);
|
||||
void* pIter = taosHashIterate(pHandle->cfInst, NULL);
|
||||
while (pIter != NULL) {
|
||||
RocksdbCfInst* inst = *pIter;
|
||||
RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
|
||||
destroyRocksdbCfInst(inst);
|
||||
taosHashIterate(pHandle->cfInst, pIter);
|
||||
pIter = taosHashIterate(pHandle->cfInst, pIter);
|
||||
}
|
||||
taosHashCleanup(pHandle->cfInst);
|
||||
|
||||
|
@ -1610,6 +1610,9 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
|
|||
const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen);
|
||||
stateSessionKeyDecode((void*)&ktmp, (char*)curKey);
|
||||
|
||||
if (pVal != NULL) *pVal = NULL;
|
||||
if (pVLen != NULL) *pVLen = 0;
|
||||
|
||||
SStateSessionKey* pKTmp = &ktmp;
|
||||
const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vLen);
|
||||
char* val = NULL;
|
||||
|
@ -1617,19 +1620,23 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
|
|||
if (len < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pKTmp->opNum != pCur->number) {
|
||||
taosMemoryFree(val);
|
||||
return -1;
|
||||
}
|
||||
if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
|
||||
taosMemoryFree(val);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pVal != NULL) {
|
||||
*pVal = (char*)val;
|
||||
} else {
|
||||
taosMemoryFree(val);
|
||||
}
|
||||
if (pVLen != NULL) *pVLen = len;
|
||||
|
||||
if (pKTmp->opNum != pCur->number) {
|
||||
return -1;
|
||||
}
|
||||
if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
|
||||
return -1;
|
||||
}
|
||||
if (pVLen != NULL) *pVLen = len;
|
||||
*pKey = pKTmp->key;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -550,13 +550,27 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
|
|||
|
||||
static void doRetryDispatchData(void* param, void* tmrId) {
|
||||
SStreamTask* pTask = param;
|
||||
|
||||
if (streamTaskShouldStop(&pTask->status)) {
|
||||
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||
qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
|
||||
return;
|
||||
}
|
||||
|
||||
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
|
||||
|
||||
int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
|
||||
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
|
||||
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||
if (!streamTaskShouldStop(&pTask->status)) {
|
||||
qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
|
||||
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
|
||||
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||
} else {
|
||||
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||
qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
|
||||
}
|
||||
} else {
|
||||
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -137,19 +137,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
|||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (pTask->schedTimer) {
|
||||
taosTmrStop(pTask->schedTimer);
|
||||
pTask->schedTimer = NULL;
|
||||
}
|
||||
|
||||
if (pTask->launchTaskTimer) {
|
||||
taosTmrStop(pTask->launchTaskTimer);
|
||||
pTask->launchTaskTimer = NULL;
|
||||
}
|
||||
|
||||
tFreeStreamTask(pTask);
|
||||
tFreeStreamTask(*(SStreamTask**)pIter);
|
||||
}
|
||||
|
||||
taosHashCleanup(pMeta->pTasks);
|
||||
|
@ -362,11 +350,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
|
|||
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
||||
doRemoveIdFromList(pMeta, num, pTask->id.taskId);
|
||||
|
||||
// remove the ref by timer
|
||||
if (pTask->triggerParam != 0) {
|
||||
taosTmrStop(pTask->schedTimer);
|
||||
}
|
||||
|
||||
streamMetaRemoveTask(pMeta, taskId);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
} else {
|
||||
|
|
|
@ -540,14 +540,14 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
|||
taosWLockLatch(&pMeta->lock);
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
ASSERT((*ppTask)->status.timerActive == 1);
|
||||
ASSERT((*ppTask)->status.timerActive >= 1);
|
||||
|
||||
if (streamTaskShouldStop(&(*ppTask)->status)) {
|
||||
const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
|
||||
qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);
|
||||
|
||||
taosMemoryFree(pInfo);
|
||||
(*ppTask)->status.timerActive = 0;
|
||||
atomic_sub_fetch_8(&(*ppTask)->status.timerActive, 1);
|
||||
taosWUnLockLatch(&pMeta->lock);
|
||||
return;
|
||||
}
|
||||
|
@ -556,7 +556,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
|||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
|
||||
if (pTask != NULL) {
|
||||
ASSERT(pTask->status.timerActive == 1);
|
||||
ASSERT(pTask->status.timerActive >= 1);
|
||||
|
||||
// abort the timer if intend to stop task
|
||||
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
|
||||
|
@ -578,7 +578,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
// not in timer anymore
|
||||
pTask->status.timerActive = 0;
|
||||
atomic_sub_fetch_8(&pTask->status.timerActive, 1);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
} else {
|
||||
qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId);
|
||||
|
@ -609,11 +609,11 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
|||
// todo failed to create timer
|
||||
taosMemoryFree(pInfo);
|
||||
} else {
|
||||
pTask->status.timerActive = 1; // timer is active
|
||||
atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active
|
||||
qDebug("s-task:%s set timer active flag", pTask->id.idStr);
|
||||
}
|
||||
} else { // timer exists
|
||||
pTask->status.timerActive = 1;
|
||||
ASSERT(pTask->status.timerActive > 0);
|
||||
qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
|
||||
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
|
||||
}
|
||||
|
|
|
@ -13,11 +13,11 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <libs/transport/trpc.h>
|
||||
#include <streamInt.h>
|
||||
#include "streamInt.h"
|
||||
#include "executor.h"
|
||||
#include "tstream.h"
|
||||
#include "wal.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
|
||||
int32_t childId = taosArrayGetSize(pArray);
|
||||
|
@ -213,6 +213,22 @@ static void freeItem(void* p) {
|
|||
void tFreeStreamTask(SStreamTask* pTask) {
|
||||
qDebug("free s-task:%s, %p", pTask->id.idStr, pTask);
|
||||
|
||||
// remove the ref by timer
|
||||
while(pTask->status.timerActive > 0) {
|
||||
qDebug("s-task:%s wait for task stop timer activities", pTask->id.idStr);
|
||||
taosMsleep(10);
|
||||
}
|
||||
|
||||
if (pTask->schedTimer != NULL) {
|
||||
taosTmrStop(pTask->schedTimer);
|
||||
pTask->schedTimer = NULL;
|
||||
}
|
||||
|
||||
if (pTask->launchTaskTimer != NULL) {
|
||||
taosTmrStop(pTask->launchTaskTimer);
|
||||
pTask->launchTaskTimer = NULL;
|
||||
}
|
||||
|
||||
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
|
||||
if (pTask->inputQueue) {
|
||||
streamQueueClose(pTask->inputQueue);
|
||||
|
|
|
@ -367,8 +367,49 @@ int32_t taosGetTimeOfDay(struct timeval *tv) {
|
|||
|
||||
time_t taosTime(time_t *t) { return time(t); }
|
||||
|
||||
/*
|
||||
* mktime64 - Converts date to seconds.
|
||||
* Converts Gregorian date to seconds since 1970-01-01 00:00:00.
|
||||
* Assumes input in normal date format, i.e. 1980-12-31 23:59:59
|
||||
* => year=1980, mon=12, day=31, hour=23, min=59, sec=59.
|
||||
*
|
||||
* [For the Julian calendar (which was used in Russia before 1917,
|
||||
* Britain & colonies before 1752, anywhere else before 1582,
|
||||
* and is still in use by some communities) leave out the
|
||||
* -year/100+year/400 terms, and add 10.]
|
||||
*
|
||||
* This algorithm was first published by Gauss (I think).
|
||||
*
|
||||
* A leap second can be indicated by calling this function with sec as
|
||||
* 60 (allowable under ISO 8601). The leap second is treated the same
|
||||
* as the following second since they don't exist in UNIX time.
|
||||
*
|
||||
* An encoding of midnight at the end of the day as 24:00:00 - ie. midnight
|
||||
* tomorrow - (allowable under ISO 8601) is supported.
|
||||
*/
|
||||
int64_t user_mktime64(const uint32_t year, const uint32_t mon, const uint32_t day, const uint32_t hour,
|
||||
const uint32_t min, const uint32_t sec, int64_t time_zone) {
|
||||
uint32_t _mon = mon, _year = year;
|
||||
|
||||
/* 1..12 -> 11,12,1..10 */
|
||||
if (0 >= (int32_t)(_mon -= 2)) {
|
||||
_mon += 12; /* Puts Feb last since it has leap day */
|
||||
_year -= 1;
|
||||
}
|
||||
|
||||
// int64_t _res = (((((int64_t) (_year/4 - _year/100 + _year/400 + 367*_mon/12 + day) +
|
||||
// _year*365 - 719499)*24 + hour)*60 + min)*60 + sec);
|
||||
int64_t _res = 367 * ((int64_t)_mon) / 12;
|
||||
_res += _year / 4 - _year / 100 + _year / 400 + day + ((int64_t)_year) * 365 - 719499;
|
||||
_res *= 24;
|
||||
_res = ((_res + hour) * 60 + min) * 60 + sec;
|
||||
|
||||
return _res + time_zone;
|
||||
}
|
||||
|
||||
time_t taosMktime(struct tm *timep) {
|
||||
#ifdef WINDOWS
|
||||
#if 0
|
||||
struct tm tm1 = {0};
|
||||
LARGE_INTEGER t;
|
||||
FILETIME f;
|
||||
|
@ -405,6 +446,19 @@ time_t taosMktime(struct tm *timep) {
|
|||
|
||||
t.QuadPart -= offset.QuadPart;
|
||||
return (time_t)(t.QuadPart / 10000000);
|
||||
#else
|
||||
time_t result = mktime(timep);
|
||||
if (result != -1) {
|
||||
return result;
|
||||
}
|
||||
#ifdef _MSC_VER
|
||||
#if _MSC_VER >= 1900
|
||||
int64_t tz = _timezone;
|
||||
#endif
|
||||
#endif
|
||||
return user_mktime64(timep->tm_year + 1900, timep->tm_mon + 1, timep->tm_mday, timep->tm_hour, timep->tm_min,
|
||||
timep->tm_sec, tz);
|
||||
#endif
|
||||
#else
|
||||
return mktime(timep);
|
||||
#endif
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
|
||||
|
@ -954,6 +955,7 @@
|
|||
,,n,script,./test.sh -f tsim/query/udfpy.sim
|
||||
,,y,script,./test.sh -f tsim/query/udf_with_const.sim
|
||||
,,y,script,./test.sh -f tsim/query/join_interval.sim
|
||||
,,y,script,./test.sh -f tsim/query/join_pk.sim
|
||||
,,y,script,./test.sh -f tsim/query/unionall_as_table.sim
|
||||
,,y,script,./test.sh -f tsim/query/multi_order_by.sim
|
||||
,,y,script,./test.sh -f tsim/query/sys_tbname.sim
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
|
||||
sql create database test;
|
||||
sql use test;
|
||||
sql create table st(ts timestamp, f int) tags(t int);
|
||||
sql insert into ct1 using st tags(1) values(now, 0)(now+1s, 1)
|
||||
sql insert into ct2 using st tags(2) values(now+2s, 2)(now+3s, 3)
|
||||
sql select * from (select _wstart - 1s as ts, count(*) as num1 from st interval(1s)) as t1 inner join (select _wstart as ts, count(*) as num2 from st interval(1s)) as t2 on t1.ts = t2.ts
|
||||
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data21 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data03 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data13 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data23 != 1 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from (select _wstart - 1d as ts, count(*) as num1 from st interval(1s)) as t1 inner join (select _wstart as ts, count(*) as num2 from st interval(1s)) as t2 on t1.ts = t2.ts
|
||||
|
||||
sql select * from (select _wstart + 1a as ts, count(*) as num1 from st interval(1s)) as t1 inner join (select _wstart as ts, count(*) as num2 from st interval(1s)) as t2 on t1.ts = t2.ts
|
||||
|
||||
sql_error select * from (select _wstart * 3 as ts, count(*) as num1 from st interval(1s)) as t1 inner join (select _wstart as ts, count(*) as num2 from st interval(1s)) as t2 on t1.ts = t2.ts
|
||||
|
||||
sql create table sst(ts timestamp, ts2 timestamp, f int) tags(t int);
|
||||
sql insert into sct1 using sst tags(1) values('2023-08-07 13:30:56', '2023-08-07 13:30:56', 0)('2023-08-07 13:30:57', '2023-08-07 13:30:57', 1)
|
||||
sql insert into sct2 using sst tags(2) values('2023-08-07 13:30:58', '2023-08-07 13:30:58', 2)('2023-08-07 13:30:59', '2023-08-07 13:30:59', 3)
|
||||
sql select * from (select ts - 1s as jts from sst) as t1 inner join (select ts-1s as jts from sst) as t2 on t1.jts = t2.jts
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
sql select * from (select ts - 1s as jts from sst) as t1 inner join (select ts as jts from sst) as t2 on t1.jts = t2.jts
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
sql_error select * from (select ts2 - 1s as jts from sst) as t1 inner join (select ts2 as jts from sst) as t2 on t1.jts = t2.jts
|
||||
|
||||
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
@ -1603,58 +1603,58 @@ QUERY_PLAN: Time Range: [-9223372036854775808,
|
|||
taos> select _wstart, last(ts), avg(c2) from meters interval(10s) order by _wstart desc;
|
||||
_wstart | last(ts) | avg(c2) |
|
||||
================================================================================
|
||||
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000 |
|
||||
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000 |
|
||||
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000 |
|
||||
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000 |
|
||||
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000 |
|
||||
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000 |
|
||||
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000 |
|
||||
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000 |
|
||||
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000 |
|
||||
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000 |
|
||||
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000000000 |
|
||||
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000000000 |
|
||||
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000000000 |
|
||||
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000000000 |
|
||||
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000000000 |
|
||||
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000000000 |
|
||||
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000000000 |
|
||||
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000000000 |
|
||||
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000000000 |
|
||||
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000000000 |
|
||||
|
||||
taos> select _wstart, last(ts), avg(c2) from meters interval(10s) order by _wstart asc;
|
||||
_wstart | last(ts) | avg(c2) |
|
||||
================================================================================
|
||||
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000 |
|
||||
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000 |
|
||||
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000 |
|
||||
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000 |
|
||||
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000 |
|
||||
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000 |
|
||||
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000 |
|
||||
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000 |
|
||||
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000 |
|
||||
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000 |
|
||||
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000000000 |
|
||||
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000000000 |
|
||||
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000000000 |
|
||||
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000000000 |
|
||||
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000000000 |
|
||||
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000000000 |
|
||||
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000000000 |
|
||||
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000000000 |
|
||||
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000000000 |
|
||||
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000000000 |
|
||||
|
||||
taos> select _wstart, first(ts), avg(c2) from meters interval(10s) order by _wstart asc;
|
||||
_wstart | first(ts) | avg(c2) |
|
||||
================================================================================
|
||||
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000 |
|
||||
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000 |
|
||||
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000 |
|
||||
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000 |
|
||||
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000 |
|
||||
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000 |
|
||||
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000 |
|
||||
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000 |
|
||||
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000 |
|
||||
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000 |
|
||||
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000000000 |
|
||||
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000000000 |
|
||||
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000000000 |
|
||||
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000000000 |
|
||||
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000000000 |
|
||||
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000000000 |
|
||||
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000000000 |
|
||||
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000000000 |
|
||||
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000000000 |
|
||||
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000000000 |
|
||||
|
||||
taos> select _wstart, first(ts), avg(c2) from meters interval(10s) order by _wstart desc;
|
||||
_wstart | first(ts) | avg(c2) |
|
||||
================================================================================
|
||||
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000 |
|
||||
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000 |
|
||||
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000 |
|
||||
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000 |
|
||||
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000 |
|
||||
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000 |
|
||||
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000 |
|
||||
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000 |
|
||||
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000 |
|
||||
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000 |
|
||||
2022-05-24 00:01:00.000 | 2022-05-24 00:01:08.000 | 210.000000000000000 |
|
||||
2022-05-23 00:01:00.000 | 2022-05-23 00:01:08.000 | 116.000000000000000 |
|
||||
2022-05-22 00:01:00.000 | 2022-05-22 00:01:08.000 | 196.000000000000000 |
|
||||
2022-05-21 00:01:00.000 | 2022-05-21 00:01:08.000 | 11.000000000000000 |
|
||||
2022-05-20 00:01:00.000 | 2022-05-20 00:01:08.000 | 120.000000000000000 |
|
||||
2022-05-19 00:01:00.000 | 2022-05-19 00:01:08.000 | 243.000000000000000 |
|
||||
2022-05-18 00:01:00.000 | 2022-05-18 00:01:08.000 | 58.000000000000000 |
|
||||
2022-05-17 00:01:00.000 | 2022-05-17 00:01:08.000 | 59.000000000000000 |
|
||||
2022-05-16 00:01:00.000 | 2022-05-16 00:01:08.000 | 136.000000000000000 |
|
||||
2022-05-15 00:01:00.000 | 2022-05-15 00:01:08.000 | 234.000000000000000 |
|
||||
|
||||
taos> select last(a) as d from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s)) order by d;
|
||||
d |
|
||||
|
@ -1792,35 +1792,35 @@ taos> select last(b) as d from (select last(ts) as b, avg(c2) as c from meters i
|
|||
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by d desc;
|
||||
_wstart | d | avg(c) |
|
||||
================================================================================
|
||||
2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000 |
|
||||
2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000 |
|
||||
2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000 |
|
||||
2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000 |
|
||||
2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000 |
|
||||
2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.600000000 |
|
||||
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 |
|
||||
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 |
|
||||
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 |
|
||||
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 |
|
||||
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 |
|
||||
2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000 |
|
||||
2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000 |
|
||||
2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000 |
|
||||
2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000 |
|
||||
2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000 |
|
||||
2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.400000000 |
|
||||
2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000 |
|
||||
2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.800000000 |
|
||||
2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000 |
|
||||
2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000 |
|
||||
2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.800000000 |
|
||||
2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000 |
|
||||
2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.600000000 |
|
||||
2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000 |
|
||||
2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000 |
|
||||
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 |
|
||||
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 |
|
||||
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 |
|
||||
2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000000000 |
|
||||
2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000000000 |
|
||||
2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000000000 |
|
||||
2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000000000 |
|
||||
2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000000000 |
|
||||
2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.599999999999994 |
|
||||
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.199999999999989 |
|
||||
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000000011 |
|
||||
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000000006 |
|
||||
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000000000 |
|
||||
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000000000 |
|
||||
2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000000000 |
|
||||
2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000000000 |
|
||||
2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000000000 |
|
||||
2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000000000 |
|
||||
2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000000003 |
|
||||
2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.399999999999999 |
|
||||
2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000000001 |
|
||||
2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.799999999999997 |
|
||||
2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000000000 |
|
||||
2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000000006 |
|
||||
2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.799999999999997 |
|
||||
2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000000003 |
|
||||
2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.599999999999994 |
|
||||
2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000000000 |
|
||||
2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000000000 |
|
||||
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000000000 |
|
||||
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000000000 |
|
||||
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000000000 |
|
||||
|
||||
taos> explain verbose true select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by d desc\G;
|
||||
*************************** 1.row ***************************
|
||||
|
@ -2673,51 +2673,51 @@ taos> select ts, c2 from d1 order by ts asc, c2 desc limit 5,5;
|
|||
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc;
|
||||
_wstart | d | avg(c) |
|
||||
================================================================================
|
||||
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 |
|
||||
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 |
|
||||
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 |
|
||||
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 |
|
||||
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 |
|
||||
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 |
|
||||
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 |
|
||||
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 |
|
||||
2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000 |
|
||||
2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000 |
|
||||
2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.600000000 |
|
||||
2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000 |
|
||||
2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000 |
|
||||
2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.600000000 |
|
||||
2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000 |
|
||||
2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000 |
|
||||
2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000 |
|
||||
2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000 |
|
||||
2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.800000000 |
|
||||
2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000 |
|
||||
2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000 |
|
||||
2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000 |
|
||||
2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.800000000 |
|
||||
2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000 |
|
||||
2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.400000000 |
|
||||
2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000 |
|
||||
2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000 |
|
||||
2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000 |
|
||||
2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000 |
|
||||
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000000000 |
|
||||
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000000000 |
|
||||
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000000006 |
|
||||
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000000000 |
|
||||
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000000000 |
|
||||
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000000011 |
|
||||
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000000000 |
|
||||
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.199999999999989 |
|
||||
2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000000000 |
|
||||
2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000000000 |
|
||||
2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.599999999999994 |
|
||||
2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000000000 |
|
||||
2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000000000 |
|
||||
2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.599999999999994 |
|
||||
2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000000000 |
|
||||
2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000000003 |
|
||||
2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000000000 |
|
||||
2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000000000 |
|
||||
2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.799999999999997 |
|
||||
2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000000006 |
|
||||
2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000000000 |
|
||||
2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000000000 |
|
||||
2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.799999999999997 |
|
||||
2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000000001 |
|
||||
2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.399999999999999 |
|
||||
2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000000003 |
|
||||
2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000000000 |
|
||||
2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000000000 |
|
||||
2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000000000 |
|
||||
|
||||
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2;
|
||||
_wstart | d | avg(c) |
|
||||
================================================================================
|
||||
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 |
|
||||
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 |
|
||||
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000000000 |
|
||||
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000000000 |
|
||||
|
||||
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2,6;
|
||||
_wstart | d | avg(c) |
|
||||
================================================================================
|
||||
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 |
|
||||
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 |
|
||||
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 |
|
||||
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 |
|
||||
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 |
|
||||
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 |
|
||||
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000000006 |
|
||||
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000000000 |
|
||||
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000000000 |
|
||||
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000000011 |
|
||||
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000000000 |
|
||||
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.199999999999989 |
|
||||
|
||||
taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 10;
|
||||
last(ts) | d |
|
||||
|
|
|
@ -0,0 +1,266 @@
|
|||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
import math
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
# from tmqCommon import *
|
||||
|
||||
class TDTestCase:
|
||||
def __init__(self):
|
||||
self.vgroups = 4
|
||||
self.ctbNum = 10
|
||||
self.rowsPerTbl = 10000
|
||||
self.duraion = '1h'
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
|
||||
def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'):
|
||||
if dropFlag == 1:
|
||||
tsql.execute("drop database if exists %s"%(dbName))
|
||||
|
||||
tsql.execute("create database if not exists %s vgroups %d replica %d duration %s"%(dbName, vgroups, replica, duration))
|
||||
tdLog.debug("complete to create database %s"%(dbName))
|
||||
return
|
||||
|
||||
def create_stable(self,tsql, paraDict):
|
||||
colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"])
|
||||
tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"])
|
||||
sqlString = f"create table if not exists %s.%s (%s) tags (%s)"%(paraDict["dbName"], paraDict["stbName"], colString, tagString)
|
||||
tdLog.debug("%s"%(sqlString))
|
||||
tsql.execute(sqlString)
|
||||
return
|
||||
|
||||
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
|
||||
for i in range(ctbNum):
|
||||
sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \
|
||||
(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,(i+ctbStartIdx) % 5,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx)
|
||||
tsql.execute(sqlString)
|
||||
|
||||
tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
|
||||
return
|
||||
|
||||
def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep):
|
||||
tdLog.debug("start to insert data ............")
|
||||
tsql.execute("use %s" %dbName)
|
||||
pre_insert = "insert into "
|
||||
sql = pre_insert
|
||||
|
||||
for i in range(ctbNum):
|
||||
rowsBatched = 0
|
||||
sql += " %s%d values "%(ctbPrefix,i)
|
||||
for j in range(rowsPerTbl):
|
||||
if (i < ctbNum/2):
|
||||
sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10, j%10, j%10)
|
||||
else:
|
||||
sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10)
|
||||
rowsBatched += 1
|
||||
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
|
||||
tsql.execute(sql)
|
||||
rowsBatched = 0
|
||||
if j < rowsPerTbl - 1:
|
||||
sql = "insert into %s%d values " %(ctbPrefix,i)
|
||||
else:
|
||||
sql = "insert into "
|
||||
if sql != pre_insert:
|
||||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
def prepareTestEnv(self):
|
||||
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||
paraDict = {'dbName': 'test',
|
||||
'dropFlag': 1,
|
||||
'vgroups': 2,
|
||||
'stbName': 'meters',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'FLOAT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'smallint', 'count':1},{'type': 'tinyint', 'count':1},{'type': 'bool', 'count':1},{'type': 'binary', 'len':10, 'count':1},{'type': 'nchar', 'len':10, 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
|
||||
'ctbPrefix': 't',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 100,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 3000,
|
||||
'startTs': 1537146000000,
|
||||
'tsStep': 600000}
|
||||
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tdLog.info("create database")
|
||||
self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion)
|
||||
|
||||
tdLog.info("create stb")
|
||||
self.create_stable(tsql=tdSql, paraDict=paraDict)
|
||||
|
||||
tdLog.info("create child tables")
|
||||
self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], \
|
||||
stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],\
|
||||
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict["ctbStartIdx"])
|
||||
self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],\
|
||||
ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],\
|
||||
rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\
|
||||
startTs=paraDict["startTs"],tsStep=paraDict["tsStep"])
|
||||
return
|
||||
|
||||
def check_first_rows(self, all_rows, limited_rows, offset: int = 0):
|
||||
for i in range(0, len(limited_rows) - 1):
|
||||
if limited_rows[i] != all_rows[i + offset]:
|
||||
tdLog.info("row: %d, row in all: %s" % (i+offset+1, str(all_rows[i+offset])))
|
||||
tdLog.info("row: %d, row in limted: %s" % (i+1, str(limited_rows[i])))
|
||||
tdLog.exit("row data check failed")
|
||||
tdLog.info("all rows are the same as query without limit..")
|
||||
|
||||
def query_and_check_with_slimit(self, sql: str, max_limit: int, step: int, offset: int = 0):
|
||||
self.query_and_check_with_limit(sql, max_limit, step, offset, ' slimit ')
|
||||
|
||||
def query_and_check_with_limit(self, sql: str, max_limit: int, step: int, offset: int = 0, limit_str: str = ' limit '):
|
||||
for limit in range(0, max_limit, step):
|
||||
limited_sql = sql + limit_str + str(offset) + "," + str(limit)
|
||||
tdLog.info("query with sql: %s " % (sql) + limit_str + " %d,%d" % (offset, limit))
|
||||
all_rows = tdSql.getResult(sql)
|
||||
limited_rows = tdSql.getResult(limited_sql)
|
||||
tdLog.info("all rows: %d, limited rows: %d" % (len(all_rows), len(limited_rows)))
|
||||
if limit_str == ' limit ':
|
||||
if limit + offset <= len(all_rows) and len(limited_rows) != limit:
|
||||
tdLog.exit("limited sql has less rows than limit value which is not right, \
|
||||
limit: %d, limited_rows: %d, all_rows: %d, offset: %d" % (limit, len(limited_rows), len(all_rows), offset))
|
||||
elif limit + offset > len(all_rows) and offset < len(all_rows) and offset + len(limited_rows) != len(all_rows):
|
||||
tdLog.exit("limited sql has less rows than all_rows which is not right, \
|
||||
limit: %d, limited_rows: %d, all_rows: %d, offset: %d" % (limit, len(limited_rows), len(all_rows), offset))
|
||||
elif offset >= len(all_rows) and len(limited_rows) != 0:
|
||||
tdLog.exit("limited rows should be zero, \
|
||||
limit: %d, limited_rows: %d, all_rows: %d, offset: %d" % (limit, len(limited_rows), len(all_rows), offset))
|
||||
|
||||
self.check_first_rows(all_rows, limited_rows, offset)
|
||||
|
||||
def test_interval_limit_asc(self, offset: int = 0):
|
||||
sqls = ["select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1s) ",
|
||||
"select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1m) ",
|
||||
"select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1h) ",
|
||||
"select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1d) ",
|
||||
"select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1s) ",
|
||||
"select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1m) ",
|
||||
"select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1h) ",
|
||||
"select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1d) "]
|
||||
for sql in sqls:
|
||||
self.query_and_check_with_limit(sql, 5000, 500, offset)
|
||||
|
||||
def test_interval_limit_desc(self, offset: int = 0):
|
||||
sqls = ["select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1s) ",
|
||||
"select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1m) ",
|
||||
"select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1h) ",
|
||||
"select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1d) ",
|
||||
"select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1s) ",
|
||||
"select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1m) ",
|
||||
"select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1h) ",
|
||||
"select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1d) "]
|
||||
for sql in sqls:
|
||||
self.query_and_check_with_limit(sql, 5000, 500, offset)
|
||||
|
||||
def test_interval_limit_offset(self):
|
||||
for offset in range(0, 1000, 500):
|
||||
self.test_interval_limit_asc(offset)
|
||||
self.test_interval_limit_desc(offset)
|
||||
self.test_interval_fill_limit(offset)
|
||||
self.test_interval_order_by_limit(offset)
|
||||
self.test_interval_partition_by_slimit(offset)
|
||||
|
||||
def test_interval_fill_limit(self, offset: int = 0):
|
||||
sqls = [
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
|
||||
where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1s) fill(linear)",
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
|
||||
where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1m) fill(linear)",
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
|
||||
where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1h) fill(linear)",
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
|
||||
where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1d) fill(linear)"
|
||||
]
|
||||
for sql in sqls:
|
||||
self.query_and_check_with_limit(sql, 5000, 1000, offset)
|
||||
|
||||
def test_interval_order_by_limit(self, offset: int = 0):
|
||||
sqls = [
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
|
||||
where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by b",
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
|
||||
where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by a desc",
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), last(ts) from meters \
|
||||
where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by a desc",
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
|
||||
where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by count(*), sum(c1), a",
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
|
||||
where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by a, count(*), sum(c1)",
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
|
||||
where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by b",
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
|
||||
where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by a desc",
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters \
|
||||
where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by a desc",
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
|
||||
where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by count(*), sum(c1), a",
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
|
||||
where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by a, count(*), sum(c1)",
|
||||
]
|
||||
for sql in sqls:
|
||||
self.query_and_check_with_limit(sql, 6000, 2000, offset)
|
||||
|
||||
def test_interval_partition_by_slimit(self, offset: int = 0):
|
||||
sqls = [
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters "
|
||||
"where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by t1 interval(1m)",
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters "
|
||||
"where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by t1 interval(1h)",
|
||||
"select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters "
|
||||
"where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by c3 interval(1m)",
|
||||
]
|
||||
for sql in sqls:
|
||||
self.query_and_check_with_slimit(sql, 10, 2, offset)
|
||||
|
||||
def test_interval_partition_by_slimit_limit(self):
|
||||
sql = "select * from (select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts),c3 from meters " \
|
||||
"where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by c3 interval(1m) slimit 10 limit 2) order by c3 asc"
|
||||
tdSql.query(sql)
|
||||
tdSql.checkRows(20)
|
||||
tdSql.checkData(0, 4, 0)
|
||||
tdSql.checkData(1, 4, 0)
|
||||
tdSql.checkData(2, 4, 1)
|
||||
tdSql.checkData(3, 4, 1)
|
||||
tdSql.checkData(18, 4, 9)
|
||||
tdSql.checkData(19, 4, 9)
|
||||
|
||||
sql = "select * from (select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts),c3 from meters " \
|
||||
"where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by c3 interval(1m) slimit 2,2 limit 2) order by c3 asc"
|
||||
tdSql.query(sql)
|
||||
tdSql.checkRows(4)
|
||||
tdSql.checkData(0, 4, 2)
|
||||
tdSql.checkData(1, 4, 2)
|
||||
tdSql.checkData(2, 4, 9)
|
||||
tdSql.checkData(3, 4, 9)
|
||||
|
||||
def run(self):
|
||||
self.prepareTestEnv()
|
||||
self.test_interval_limit_offset()
|
||||
self.test_interval_partition_by_slimit_limit()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|