feat(query)[TS-5470]: add syntax to specify minimum duration for event and state windows

Introduce the `true for` syntax to allow users to specify the minimum
duration for event and state windows. Add corresponding tests to
validate the feature. Updated the user manual with usage instructions.
This commit is contained in:
Jinqing Kuang 2025-02-23 21:46:11 +08:00
parent 7f15be7b39
commit 51ffff2079
39 changed files with 862 additions and 131 deletions

View File

@ -55,9 +55,9 @@ join_clause:
window_clause: {
SESSION(ts_col, tol_val)
| STATE_WINDOW(col)
| STATE_WINDOW(col) [TRUE_FOR(true_for_duration)]
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)]
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition [TRUE_FOR(true_for_duration)]
| COUNT_WINDOW(count_val[, sliding_val])
interp_clause:

View File

@ -53,9 +53,9 @@ The syntax for the window clause is as follows:
```sql
window_clause: {
SESSION(ts_col, tol_val)
| STATE_WINDOW(col)
| STATE_WINDOW(col) [TRUE_FOR(true_for_duration)]
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [FILL(fill_mod_and_val)]
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition [TRUE_FOR(true_for_duration)]
| COUNT_WINDOW(count_val[, sliding_val])
}
```
@ -177,6 +177,12 @@ TDengine also supports using CASE expressions in state quantities, which can exp
SELECT tbname, _wstart, CASE WHEN voltage >= 205 and voltage <= 235 THEN 1 ELSE 0 END status FROM meters PARTITION BY tbname STATE_WINDOW(CASE WHEN voltage >= 205 and voltage <= 235 THEN 1 ELSE 0 END);
```
The status window supports using the TRUE_FOR parameter to set its minimum duration. If the window's duration is less than the specified value, it will be discarded automatically and no result will be returned. For example, setting the minimum duration to 3 seconds:
```
SELECT COUNT(*), FIRST(ts), status FROM temp_tb_1 STATE_WINDOW(status) TRUE_FOR (3s);
```
### Session Window
The session window is determined based on the timestamp primary key values of the records. As shown in the diagram below, if the continuous interval of the timestamps is set to be less than or equal to 12 seconds, the following 6 records form 2 session windows, which are: [2019-04-28 14:22:10, 2019-04-28 14:22:30] and [2019-04-28 14:23:10, 2019-04-28 14:23:30]. This is because the interval between 2019-04-28 14:22:30 and 2019-04-28 14:23:10 is 40 seconds, exceeding the continuous interval (12 seconds).
@ -212,6 +218,12 @@ select _wstart, _wend, count(*) from t event_window start with c1 > 0 end with c
<Image img={imgStep04} alt=""/>
</figure>
The event window supports using the TRUE_FOR parameter to set its minimum duration. If the window's duration is less than the specified value, it will be discarded automatically and no result will be returned. For example, setting the minimum duration to 3 seconds:
```
select _wstart, _wend, count(*) from t event_window start with c1 > 0 end with c2 < 10 true_for (3s);
```
### Count Window
Count windows divide data into windows based on a fixed number of data rows. By default, data is sorted by timestamp, then divided into multiple windows based on the value of count_val, and aggregate calculations are performed. count_val represents the maximum number of data rows in each count window; if the total number of data rows is not divisible by count_val, the last window will have fewer rows than count_val. sliding_val is a constant that represents the number of rows the window slides, similar to the SLIDING in interval.

View File

@ -458,6 +458,8 @@ This document details the server error codes that may be encountered when using
| 0x80002665 | The _TAGS pseudocolumn can only be used for subtable and supertable queries | Illegal tag column query | Check and correct the SQL statement |
| 0x80002666 | Subquery does not output primary timestamp column | Check and correct the SQL statement | |
| 0x80002667 | Invalid usage of expr: %s | Illegal expression | Check and correct the SQL statement |
| 0x80002687 | True_for duration cannot be negative | Use negative value as true_for duration | Check and correct the SQL statement |
| 0x80002688 | Cannot use 'year' or 'month' as true_for duration | Use year or month as true_for_duration | Check and correct the SQL statement |
| 0x800026FF | Parser internal error | Internal error in parser | Preserve the scene and logs, report issue on GitHub |
| 0x80002700 | Planner internal error | Internal error in planner | Preserve the scene and logs, report issue on GitHub |
| 0x80002701 | Expect ts equal | JOIN condition validation failed | Preserve the scene and logs, report issue on GitHub |

View File

@ -56,9 +56,9 @@ join_clause:
window_clause: {
SESSION(ts_col, tol_val)
| STATE_WINDOW(col)
| STATE_WINDOW(col) [TRUE_FOR(true_for_duration)]
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)]
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition [TRUE_FOR(true_for_duration)]
| COUNT_WINDOW(count_val[, sliding_val])
interp_clause:

View File

@ -46,9 +46,9 @@ TDengine 支持按时间窗口切分方式进行聚合结果查询,比如温
```sql
window_clause: {
SESSION(ts_col, tol_val)
| STATE_WINDOW(col)
| STATE_WINDOW(col) [TRUE_FOR(true_for_duration)]
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [FILL(fill_mod_and_val)]
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition [TRUE_FOR(true_for_duration)]
| COUNT_WINDOW(count_val[, sliding_val])
}
```
@ -165,6 +165,12 @@ TDengine 还支持将 CASE 表达式用在状态量,可以表达某个状态
SELECT tbname, _wstart, CASE WHEN voltage >= 205 and voltage <= 235 THEN 1 ELSE 0 END status FROM meters PARTITION BY tbname STATE_WINDOW(CASE WHEN voltage >= 205 and voltage <= 235 THEN 1 ELSE 0 END);
```
状态窗口支持使用 TRUE_FOR 参数来设定窗口的最小持续时长。如果某个状态窗口的宽度低于该设定值,则会自动舍弃,不返回任何计算结果。例如,设置最短持续时长为 3s:
```
SELECT COUNT(*), FIRST(ts), status FROM temp_tb_1 STATE_WINDOW(status) TRUE_FOR (3s);
```
### 会话窗口
会话窗口根据记录的时间戳主键的值来确定是否属于同一个会话。如下图所示,如果设置时间戳的连续的间隔小于等于 12 秒,则以下 6 条记录构成 2 个会话窗口,分别是:[2019-04-28 14:22:102019-04-28 14:22:30]和[2019-04-28 14:23:102019-04-28 14:23:30]。因为 2019-04-28 14:22:30 与 2019-04-28 14:23:10 之间的时间间隔是 40 秒超过了连续时间间隔12 秒)。
@ -196,6 +202,12 @@ select _wstart, _wend, count(*) from t event_window start with c1 > 0 end with c
![TDengine Database 事件窗口示意图](./event_window.webp)
事件窗口支持使用 TRUE_FOR 参数来设定窗口的最小持续时长。如果某个事件窗口的宽度低于该设定值,则会自动舍弃,不返回任何计算结果。例如,设置最短持续时长为 3s:
```
select _wstart, _wend, count(*) from t event_window start with c1 > 0 end with c2 < 10 true_for (3s);
```
### 计数窗口
计数窗口按固定的数据行数来划分窗口。默认将数据按时间戳排序再按照count_val的值将数据划分为多个窗口然后做聚合计算。count_val表示每个count window包含的最大数据行数总数据行数不能整除count_val时最后一个窗口的行数会小于count_val。sliding_val是常量表示窗口滑动的数量类似于 interval的SLIDING。

View File

@ -475,6 +475,8 @@ description: TDengine 服务端的错误码列表和详细说明
| 0x80002665 | The _TAGS pseudo column can only be used for subtable and supertable queries | 非法TAG列查询 | 检查并修正SQL语句 |
| 0x80002666 | 子查询不含主键时间戳列输出 | 检查并修正SQL语句 |
| 0x80002667 | Invalid usage of expr: %s | 非法表达式 | 检查并修正SQL语句 |
| 0x80002687 | True_for duration cannot be negative | true_for 的值不能是负数 | 检查并修正SQL语句 |
| 0x80002688 | Cannot use 'year' or 'month' as true_for duration | 不能使用 n(月), y(年) 作为 true_for 的时间单位 | 检查并修正SQL语句 |
| 0x800026FF | Parser internal error | 解析器内部错误 | 保留现场和日志github上报issue |
| 0x80002700 | Planner internal error | 计划期内部错误 | 保留现场和日志github上报issue |
| 0x80002701 | Expect ts equal | JOIN条件校验失败 | 保留现场和日志github上报issue |

View File

@ -327,6 +327,7 @@ typedef struct SWindowLogicNode {
SNode* pStateExpr;
SNode* pStartCond;
SNode* pEndCond;
int64_t trueForLimit;
int8_t triggerType;
int64_t watermark;
int64_t deleteMark;
@ -724,6 +725,7 @@ typedef SSessionWinodwPhysiNode SStreamFinalSessionWinodwPhysiNode;
typedef struct SStateWinodwPhysiNode {
SWindowPhysiNode window;
SNode* pStateKey;
int64_t trueForLimit;
} SStateWinodwPhysiNode;
typedef SStateWinodwPhysiNode SStreamStateWinodwPhysiNode;
@ -732,6 +734,7 @@ typedef struct SEventWinodwPhysiNode {
SWindowPhysiNode window;
SNode* pStartCond;
SNode* pEndCond;
int64_t trueForLimit;
} SEventWinodwPhysiNode;
typedef SEventWinodwPhysiNode SStreamEventWinodwPhysiNode;

View File

@ -322,6 +322,7 @@ typedef struct SStateWindowNode {
ENodeType type; // QUERY_NODE_STATE_WINDOW
SNode* pCol; // timestamp primary key
SNode* pExpr;
SNode* pTrueForLimit;
} SStateWindowNode;
typedef struct SSessionWindowNode {
@ -346,6 +347,7 @@ typedef struct SEventWindowNode {
SNode* pCol; // timestamp primary key
SNode* pStartCond;
SNode* pEndCond;
SNode* pTrueForLimit;
} SEventWindowNode;
typedef struct SCountWindowNode {

View File

@ -908,6 +908,8 @@ int32_t taosGetErrSize();
#define TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT TAOS_DEF_ERROR_CODE(0, 0x2684)
#define TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x2685)
#define TSDB_CODE_PAR_INVALID_VGID_LIST TAOS_DEF_ERROR_CODE(0, 0x2686)
#define TSDB_CODE_PAR_TRUE_FOR_NEGATIVE TAOS_DEF_ERROR_CODE(0, 0x2687)
#define TSDB_CODE_PAR_TRUE_FOR_UNIT TAOS_DEF_ERROR_CODE(0, 0x2688)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner

View File

@ -1510,7 +1510,7 @@ TEST(clientCase, sub_tb_mt_test) {
(void)taosThreadCreate(&qid[i], NULL, doConsumeData, NULL);
}
for (int32_t i = 0; i < 4; ++i) {
for (int32_t i = 0; i < 1; ++i) {
(void)taosThreadJoin(qid[i], NULL);
}
}

View File

@ -42,6 +42,7 @@ typedef struct SGroupResInfo {
int32_t index; // rows consumed in func:doCopyToSDataBlockXX
int32_t iter; // relate to index-1, last consumed data's slot id in hash table
void* dataPos; // relate to index-1, last consumed data's position, in the nodelist of cur slot
int32_t delIndex; // rows consumed in func:doBuildDeleteDataBlock
SArray* pRows; // SArray<SResKeyPos>
char* pBuf;
bool freeItem;

View File

@ -686,6 +686,54 @@ typedef struct SResultWindowInfo {
bool isOutput;
} SResultWindowInfo;
typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
bool reptScan; // next round scan
int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
struct SOperatorInfo* pOperator;
bool cleanGroupResInfo;
} SSessionAggOperatorInfo;
typedef struct SStateWindowOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SExprSupp scalarSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
SColumn stateCol; // start row index
bool hasKey;
SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
struct SOperatorInfo* pOperator;
bool cleanGroupResInfo;
int64_t trueForLimit;
} SStateWindowOperatorInfo;
typedef struct SEventWindowOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SExprSupp scalarSup;
SWindowRowsSup winSup;
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
uint64_t groupId; // current group id, used to identify the data block from different groups
SFilterInfo* pStartCondInfo;
SFilterInfo* pEndCondInfo;
bool inWindow;
SResultRow* pRow;
SSDataBlock* pPreDataBlock;
struct SOperatorInfo* pOperator;
int64_t trueForLimit;
} SEventWindowOperatorInfo;
typedef struct SStreamSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SSteamOpBasicInfo basic;
@ -746,6 +794,7 @@ typedef struct SStreamStateAggOperatorInfo {
SSHashObj* pPkDeleted;
bool destHasPrimaryKey;
struct SOperatorInfo* pOperator;
int64_t trueForLimit;
} SStreamStateAggOperatorInfo;
typedef struct SStreamEventAggOperatorInfo {
@ -778,6 +827,7 @@ typedef struct SStreamEventAggOperatorInfo {
struct SOperatorInfo* pOperator;
SNodeList* pStartCondCols;
SNodeList* pEndCondCols;
int64_t trueForLimit;
} SStreamEventAggOperatorInfo;
typedef struct SStreamCountAggOperatorInfo {
@ -1052,7 +1102,8 @@ int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pW
int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated);
int32_t saveDeleteRes(SSHashObj* pStDelete, SSessionKey key);
void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey);
void doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite);
void doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite,
SGroupResInfo* pGroupResInfo);
void doBuildSessionResult(struct SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo,
SSDataBlock* pBlock, SArray* pSessionKeys);
int32_t getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo);
@ -1109,6 +1160,7 @@ int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBl
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status);
bool getIgoreNullRes(SExprSupp* pExprSup);
bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull);
int64_t getMinWindowSize(struct SOperatorInfo* pOperator);
#ifdef __cplusplus
}

View File

@ -24,22 +24,6 @@
#include "tdatablock.h"
#include "ttime.h"
typedef struct SEventWindowOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SExprSupp scalarSup;
SWindowRowsSup winSup;
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
uint64_t groupId; // current group id, used to identify the data block from different groups
SFilterInfo* pStartCondInfo;
SFilterInfo* pEndCondInfo;
bool inWindow;
SResultRow* pRow;
SSDataBlock* pPreDataBlock;
SOperatorInfo* pOperator;
} SEventWindowOperatorInfo;
static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** pRes);
static void destroyEWindowOperatorInfo(void* param);
static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock);
@ -114,8 +98,9 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
pInfo->tsSlotId = tsSlotId;
pInfo->pPreDataBlock = NULL;
pInfo->pOperator = pOperator;
pInfo->trueForLimit = pEventWindowNode->trueForLimit;
setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregateNext, NULL, destroyEWindowOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
@ -297,6 +282,7 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
SWindowRowsSup* pRowSup = &pInfo->winSup;
int32_t rowIndex = 0;
int64_t minWindowSize = getMinWindowSize(pOperator);
pRowSup->numOfRows = 0;
if (pInfo->groupId == 0) {
@ -341,18 +327,23 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
QUERY_CHECK_CODE(code, lino, _return);
doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset);
// check buffer size
if (pRes->info.rows + pInfo->pRow->numOfRows >= pRes->info.capacity) {
int32_t newSize = pRes->info.rows + pInfo->pRow->numOfRows;
code = blockDataEnsureCapacity(pRes, newSize);
if (pRowSup->win.ekey - pRowSup->win.skey < minWindowSize) {
qDebug("skip small window, groupId: %" PRId64 ", windowSize: %" PRId64 ", minWindowSize: %" PRId64,
pInfo->groupId, pRowSup->win.ekey - pRowSup->win.skey, minWindowSize);
} else {
// check buffer size
if (pRes->info.rows + pInfo->pRow->numOfRows >= pRes->info.capacity) {
int32_t newSize = pRes->info.rows + pInfo->pRow->numOfRows;
code = blockDataEnsureCapacity(pRes, newSize);
QUERY_CHECK_CODE(code, lino, _return);
}
code = copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pRes,
pSup->rowEntryInfoOffset, pTaskInfo);
QUERY_CHECK_CODE(code, lino, _return);
pRes->info.rows += pInfo->pRow->numOfRows;
}
code = copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pRes,
pSup->rowEntryInfoOffset, pTaskInfo);
QUERY_CHECK_CODE(code, lino, _return);
pRes->info.rows += pInfo->pRow->numOfRows;
pInfo->pRow->numOfRows = 0;
pInfo->inWindow = false;

View File

@ -212,6 +212,7 @@ void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
pGroupResInfo->pRows = NULL;
}
pGroupResInfo->index = 0;
pGroupResInfo->delIndex = 0;
}
int32_t resultrowComparAsc(const void* p1, const void* p2) {
@ -303,6 +304,7 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
pGroupResInfo->freeItem = true;
pGroupResInfo->pRows = pArrayList;
pGroupResInfo->index = 0;
pGroupResInfo->delIndex = 0;
}
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {

View File

@ -80,7 +80,8 @@ static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* p
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
bool createDummyCol);
static void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup);
SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup,
int64_t minWindowSize);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
SFilePage* pData = NULL;
@ -846,7 +847,7 @@ _end:
}
void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup) {
SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup, int64_t minWindowSize) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExprInfo* pExprInfo = pSup->pExprInfo;
@ -874,6 +875,14 @@ void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp
releaseBufPage(pBuf, page);
continue;
}
// skip the window which is less than the windowMinSize
if (pRow->win.ekey - pRow->win.skey < minWindowSize) {
qDebug("skip small window, groupId: %" PRId64 ", windowSize: %" PRId64 ", minWindowSize: %" PRId64, pPos->groupId,
pRow->win.ekey - pRow->win.skey, minWindowSize);
pGroupResInfo->index += 1;
releaseBufPage(pBuf, page);
continue;
}
if (!ignoreGroup) {
if (pBlock->info.id.groupId == 0) {
@ -937,11 +946,11 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
pBlock->info.id.groupId = 0;
if (!pbInfo->mergeResultBlock) {
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
false);
false, getMinWindowSize(pOperator));
} else {
while (hasRemainResults(pGroupResInfo)) {
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
true);
true, getMinWindowSize(pOperator));
if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
break;
}

View File

@ -395,7 +395,7 @@ static int32_t buildCountResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
STaskNotifyEventStat* pNotifyEventStat = pTaskInfo->streamInfo.pNotifyEventStat;
bool addNotifyEvent = false;
addNotifyEvent = BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE);
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator, &pInfo->groupResInfo);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
if (addNotifyEvent) {

View File

@ -616,8 +616,8 @@ static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SStreamNotifyEventSupp* pNotifySup = &pInfo->basic.notifyEventSup;
STaskNotifyEventStat* pNotifyEventStat = pTaskInfo->streamInfo.pNotifyEventStat;
bool addNotifyEvent = false;
addNotifyEvent = BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE);
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
addNotifyEvent = BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE);
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator, &pInfo->groupResInfo);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
if (addNotifyEvent) {
@ -1075,6 +1075,8 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
code = nodesCollectColumnsFromNode((SNode*)pEventNode->pEndCond, NULL, COLLECT_COL_TYPE_ALL, &pInfo->pEndCondCols);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->trueForLimit = pEventNode->trueForLimit;
*pOptrInfo = pOperator;
return TSDB_CODE_SUCCESS;

View File

@ -22,13 +22,13 @@
#define NOTIFY_EVENT_NAME_CACHE_LIMIT_MB 16
typedef struct SStreamNotifyEvent {
uint64_t gid;
int64_t eventType;
uint64_t gid;
int64_t eventType;
STimeWindow win;
cJSON* pJson;
cJSON* pJson;
} SStreamNotifyEvent;
#define NOTIFY_EVENT_KEY_SIZE \
#define NOTIFY_EVENT_KEY_SIZE \
((sizeof(((struct SStreamNotifyEvent*)0)->gid) + sizeof(((struct SStreamNotifyEvent*)0)->eventType)) + \
sizeof(((struct SStreamNotifyEvent*)0)->win.skey))

View File

@ -462,6 +462,7 @@ void clearGroupResInfo(SGroupResInfo* pGroupResInfo) {
taosArrayDestroy(pGroupResInfo->pRows);
pGroupResInfo->pRows = NULL;
pGroupResInfo->index = 0;
pGroupResInfo->delIndex = 0;
}
void destroyStreamFinalIntervalOperatorInfo(void* param) {
@ -2864,14 +2865,86 @@ inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) {
return 0;
}
void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
static int32_t appendToDeleteDataBlock(SOperatorInfo* pOp, SSDataBlock *pBlock, SSessionKey *pKey) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStorageAPI* pAPI = &pOp->pTaskInfo->storageAPI;
SExecTaskInfo* pTaskInfo = pOp->pTaskInfo;
QUERY_CHECK_NULL(pBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pKey, code, lino, _end, TSDB_CODE_INVALID_PARA);
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
code = colDataSetVal(pStartTsCol, pBlock->info.rows, (const char*)&pKey->win.skey, false);
QUERY_CHECK_CODE(code, lino, _end);
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
code = colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)&pKey->win.skey, false);
QUERY_CHECK_CODE(code, lino, _end);
SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
colDataSetNULL(pUidCol, pBlock->info.rows);
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
code = colDataSetVal(pGpCol, pBlock->info.rows, (const char*)&pKey->groupId, false);
QUERY_CHECK_CODE(code, lino, _end);
SColumnInfoData* pCalStCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
colDataSetNULL(pCalStCol, pBlock->info.rows);
SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
colDataSetNULL(pCalEdCol, pBlock->info.rows);
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
if (!pTableCol) {
QUERY_CHECK_CODE(code, lino, _end);
}
void* tbname = NULL;
int32_t winCode = TSDB_CODE_SUCCESS;
SStorageAPI* pAPI = &pOp->pTaskInfo->storageAPI;
code =
pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, pKey->groupId, &tbname, false, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
if (winCode != TSDB_CODE_SUCCESS) {
colDataSetNULL(pTableCol, pBlock->info.rows);
} else {
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
code = colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
QUERY_CHECK_CODE(code, lino, _end);
pAPI->stateStore.streamStateFreeVal(tbname);
}
pBlock->info.rows += 1;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
}
return code;
}
void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite,
SGroupResInfo* pGroupResInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOp->pTaskInfo;
int64_t minWindowSize = getMinWindowSize(pOp);
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
blockDataCleanup(pBlock);
int32_t size = tSimpleHashGetSize(pStDeleted);
if (minWindowSize > 0) {
// Add the number of windows that are below the minimum width limit.
for (int32_t i = pGroupResInfo->delIndex; i < numOfRows; ++i) {
SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i);
SRowBuffPos* pPos = pWinInfo->pStatePos;
SSessionKey* pKey = (SSessionKey*)pPos->pKey;
if (pKey->win.ekey - pKey->win.skey < minWindowSize) {
size++;
}
}
}
if (size == 0) {
return;
}
@ -2884,48 +2957,21 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
break;
}
SSessionKey* res = tSimpleHashGetKey(*Ite, NULL);
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
code = colDataSetVal(pStartTsCol, pBlock->info.rows, (const char*)&res->win.skey, false);
code = appendToDeleteDataBlock(pOp, pBlock, res);
QUERY_CHECK_CODE(code, lino, _end);
}
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
code = colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)&res->win.skey, false);
QUERY_CHECK_CODE(code, lino, _end);
SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
colDataSetNULL(pUidCol, pBlock->info.rows);
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
code = colDataSetVal(pGpCol, pBlock->info.rows, (const char*)&res->groupId, false);
QUERY_CHECK_CODE(code, lino, _end);
SColumnInfoData* pCalStCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
colDataSetNULL(pCalStCol, pBlock->info.rows);
SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
colDataSetNULL(pCalEdCol, pBlock->info.rows);
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
if (!pTableCol) {
QUERY_CHECK_CODE(code, lino, _end);
if (minWindowSize > 0) {
for (int32_t i = pGroupResInfo->delIndex; i < numOfRows; ++i) {
SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i);
SRowBuffPos* pPos = pWinInfo->pStatePos;
SSessionKey* pKey = (SSessionKey*)pPos->pKey;
if (pKey->win.ekey - pKey->win.skey < minWindowSize) {
code = appendToDeleteDataBlock(pOp, pBlock, pKey);
QUERY_CHECK_CODE(code, lino, _end);
}
}
void* tbname = NULL;
int32_t winCode = TSDB_CODE_SUCCESS;
code = pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, res->groupId, &tbname, false,
&winCode);
QUERY_CHECK_CODE(code, lino, _end);
if (winCode != TSDB_CODE_SUCCESS) {
colDataSetNULL(pTableCol, pBlock->info.rows);
} else {
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
code = colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
QUERY_CHECK_CODE(code, lino, _end);
pAPI->stateStore.streamStateFreeVal(tbname);
}
pBlock->info.rows += 1;
pGroupResInfo->delIndex = numOfRows;
}
_end:
@ -3118,6 +3164,7 @@ void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
pGroupResInfo->index = 0;
pGroupResInfo->pBuf = NULL;
pGroupResInfo->freeItem = false;
pGroupResInfo->delIndex = 0;
}
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup,
@ -3130,6 +3177,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
SqlFunctionCtx* pCtx = pSup->pCtx;
int64_t minWindowSize = getMinWindowSize(pOperator);
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
@ -3170,6 +3218,13 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
pGroupResInfo->index += 1;
continue;
}
// skip the window which is less than the windowMinSize
if (pKey->win.ekey - pKey->win.skey < minWindowSize) {
qDebug("skip small window, groupId: %" PRId64 ", windowSize: %" PRId64 ", minWindowSize: %" PRId64, pKey->groupId,
pKey->win.ekey - pKey->win.skey, minWindowSize);
pGroupResInfo->index += 1;
continue;
}
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
break;
@ -3263,7 +3318,7 @@ static int32_t buildSessionResult(SOperatorInfo* pOperator, SSDataBlock** ppRes)
bool addNotifyEvent = false;
addNotifyEvent = IS_NORMAL_SESSION_OP(pOperator) &&
BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE);
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator, &pInfo->groupResInfo);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
if (addNotifyEvent) {
@ -4905,7 +4960,7 @@ static int32_t buildStateResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
STaskNotifyEventStat* pNotifyEventStat = pTaskInfo->streamInfo.pNotifyEventStat;
bool addNotifyEvent = false;
addNotifyEvent = BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE);
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator, &pInfo->groupResInfo);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
if (addNotifyEvent) {
@ -5340,6 +5395,8 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
code = appendDownstream(pOperator, &downstream, 1);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->trueForLimit = pStateNode->trueForLimit;
*pOptrInfo = pOperator;
return TSDB_CODE_SUCCESS;

View File

@ -1390,3 +1390,22 @@ void destroyTimeSliceOperatorInfo(void* param) {
}
taosMemoryFreeClear(param);
}
int64_t getMinWindowSize(struct SOperatorInfo* pOperator) {
if (pOperator == NULL) {
return 0;
}
switch (pOperator->operatorType) {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
return ((SStateWindowOperatorInfo*)pOperator->info)->trueForLimit;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return ((SStreamStateAggOperatorInfo*)pOperator->info)->trueForLimit;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
return ((SEventWindowOperatorInfo*)pOperator->info)->trueForLimit;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return ((SStreamEventAggOperatorInfo*)pOperator->info)->trueForLimit;
default:
return 0;
}
}

View File

@ -27,35 +27,6 @@
#include "tlog.h"
#include "ttime.h"
typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
bool reptScan; // next round scan
int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
SOperatorInfo* pOperator;
bool cleanGroupResInfo;
} SSessionAggOperatorInfo;
typedef struct SStateWindowOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SExprSupp scalarSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
SColumn stateCol; // start row index
bool hasKey;
SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
SOperatorInfo* pOperator;
bool cleanGroupResInfo;
} SStateWindowOperatorInfo;
typedef enum SResultTsInterpType {
RESULT_ROW_START_INTERP = 1,
RESULT_ROW_END_INTERP = 2,
@ -1743,6 +1714,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy
pInfo->tsSlotId = tsSlotId;
pInfo->pOperator = pOperator;
pInfo->cleanGroupResInfo = false;
pInfo->trueForLimit = pStateNode->trueForLimit;
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,

View File

@ -354,6 +354,7 @@ static int32_t limitNodeCopy(const SLimitNode* pSrc, SLimitNode* pDst) {
static int32_t stateWindowNodeCopy(const SStateWindowNode* pSrc, SStateWindowNode* pDst) {
CLONE_NODE_FIELD(pCol);
CLONE_NODE_FIELD(pExpr);
CLONE_NODE_FIELD(pTrueForLimit);
return TSDB_CODE_SUCCESS;
}
@ -361,6 +362,7 @@ static int32_t eventWindowNodeCopy(const SEventWindowNode* pSrc, SEventWindowNod
CLONE_NODE_FIELD(pCol);
CLONE_NODE_FIELD(pStartCond);
CLONE_NODE_FIELD(pEndCond);
CLONE_NODE_FIELD(pTrueForLimit);
return TSDB_CODE_SUCCESS;
}
@ -627,6 +629,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
CLONE_NODE_FIELD(pStateExpr);
CLONE_NODE_FIELD(pStartCond);
CLONE_NODE_FIELD(pEndCond);
COPY_SCALAR_FIELD(trueForLimit);
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(deleteMark);

View File

@ -3127,6 +3127,7 @@ static int32_t jsonToPhysiSessionWindowNode(const SJson* pJson, void* pObj) {
}
static const char* jkStateWindowPhysiPlanStateKey = "StateKey";
static const char* jkStateWindowPhysiPlanTrueForLimit = "TrueForLimit";
static int32_t physiStateWindowNodeToJson(const void* pObj, SJson* pJson) {
const SStateWinodwPhysiNode* pNode = (const SStateWinodwPhysiNode*)pObj;
@ -3135,6 +3136,9 @@ static int32_t physiStateWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkStateWindowPhysiPlanStateKey, nodeToJson, pNode->pStateKey);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkStateWindowPhysiPlanTrueForLimit, pNode->trueForLimit);
}
return code;
}
@ -3146,12 +3150,16 @@ static int32_t jsonToPhysiStateWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkStateWindowPhysiPlanStateKey, &pNode->pStateKey);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkStateWindowPhysiPlanTrueForLimit, &pNode->trueForLimit);
}
return code;
}
static const char* jkEventWindowPhysiPlanStartCond = "StartCond";
static const char* jkEventWindowPhysiPlanEndCond = "EndCond";
static const char* jkEventWindowPhysiPlanTrueForLimit = "TrueForLimit";
static int32_t physiEventWindowNodeToJson(const void* pObj, SJson* pJson) {
const SEventWinodwPhysiNode* pNode = (const SEventWinodwPhysiNode*)pObj;
@ -3163,6 +3171,9 @@ static int32_t physiEventWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowPhysiPlanEndCond, nodeToJson, pNode->pEndCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkEventWindowPhysiPlanTrueForLimit, pNode->trueForLimit);
}
return code;
}
@ -3177,6 +3188,9 @@ static int32_t jsonToPhysiEventWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowPhysiPlanEndCond, &pNode->pEndCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkEventWindowPhysiPlanTrueForLimit, &pNode->trueForLimit);
}
return code;
}
@ -4960,6 +4974,7 @@ static int32_t jsonToLimitNode(const SJson* pJson, void* pObj) {
static const char* jkStateWindowCol = "StateWindowCol";
static const char* jkStateWindowExpr = "StateWindowExpr";
static const char* jkStateWindowTrueForLimit = "TrueForLimit";
static int32_t stateWindowNodeToJson(const void* pObj, SJson* pJson) {
const SStateWindowNode* pNode = (const SStateWindowNode*)pObj;
@ -4967,6 +4982,9 @@ static int32_t stateWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkStateWindowExpr, nodeToJson, pNode->pExpr);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkStateWindowTrueForLimit, nodeToJson, pNode->pTrueForLimit);
}
return code;
}
@ -4977,6 +4995,9 @@ static int32_t jsonToStateWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkStateWindowExpr, (SNode**)&pNode->pExpr);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkStateWindowTrueForLimit, (SNode**)&pNode->pTrueForLimit);
}
return code;
}
@ -5006,6 +5027,7 @@ static int32_t jsonToSessionWindowNode(const SJson* pJson, void* pObj) {
static const char* jkEventWindowTsPrimaryKey = "TsPrimaryKey";
static const char* jkEventWindowStartCond = "StartCond";
static const char* jkEventWindowEndCond = "EndCond";
static const char* jkEventWindowTrueForLimit = "TrueForLimit";
static int32_t eventWindowNodeToJson(const void* pObj, SJson* pJson) {
const SEventWindowNode* pNode = (const SEventWindowNode*)pObj;
@ -5017,6 +5039,9 @@ static int32_t eventWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowEndCond, nodeToJson, pNode->pEndCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkEventWindowTrueForLimit, nodeToJson, pNode->pTrueForLimit);
}
return code;
}
@ -5030,6 +5055,9 @@ static int32_t jsonToEventWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowEndCond, &pNode->pEndCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkEventWindowTrueForLimit, &pNode->pTrueForLimit);
}
return code;
}

View File

@ -3443,7 +3443,7 @@ static int32_t msgToPhysiSessionWindowNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { PHY_STATE_CODE_WINDOW = 1, PHY_STATE_CODE_KEY };
enum { PHY_STATE_CODE_WINDOW = 1, PHY_STATE_CODE_KEY, PHY_STATE_CODE_TRUE_FOR_LIMIT };
static int32_t physiStateWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SStateWinodwPhysiNode* pNode = (const SStateWinodwPhysiNode*)pObj;
@ -3452,6 +3452,9 @@ static int32_t physiStateWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_STATE_CODE_KEY, nodeToMsg, pNode->pStateKey);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_STATE_CODE_TRUE_FOR_LIMIT, pNode->trueForLimit);
}
return code;
}
@ -3469,6 +3472,9 @@ static int32_t msgToPhysiStateWindowNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_STATE_CODE_KEY:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pStateKey);
break;
case PHY_STATE_CODE_TRUE_FOR_LIMIT:
code = tlvDecodeI64(pTlv, &pNode->trueForLimit);
break;
default:
break;
}
@ -3477,7 +3483,7 @@ static int32_t msgToPhysiStateWindowNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { PHY_EVENT_CODE_WINDOW = 1, PHY_EVENT_CODE_START_COND, PHY_EVENT_CODE_END_COND };
enum { PHY_EVENT_CODE_WINDOW = 1, PHY_EVENT_CODE_START_COND, PHY_EVENT_CODE_END_COND, PHY_EVENT_CODE_TRUE_FOR_LIMIT };
static int32_t physiEventWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SEventWinodwPhysiNode* pNode = (const SEventWinodwPhysiNode*)pObj;
@ -3489,6 +3495,9 @@ static int32_t physiEventWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_EVENT_CODE_END_COND, nodeToMsg, pNode->pEndCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_EVENT_CODE_TRUE_FOR_LIMIT, pNode->trueForLimit);
}
return code;
}
@ -3509,6 +3518,9 @@ static int32_t msgToPhysiEventWindowNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_EVENT_CODE_END_COND:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pEndCond);
break;
case PHY_EVENT_CODE_TRUE_FOR_LIMIT:
code = tlvDecodeI64(pTlv, &pNode->trueForLimit);
break;
default:
break;
}

View File

@ -102,6 +102,9 @@ static EDealRes dispatchExpr(SNode* pNode, ETraversalOrder order, FNodeWalker wa
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkExpr(pState->pCol, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkExpr(pState->pTrueForLimit, order, walker, pContext);
}
break;
}
case QUERY_NODE_SESSION_WINDOW: {
@ -174,6 +177,9 @@ static EDealRes dispatchExpr(SNode* pNode, ETraversalOrder order, FNodeWalker wa
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkExpr(pEvent->pEndCond, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkExpr(pEvent->pTrueForLimit, order, walker, pContext);
}
break;
}
case QUERY_NODE_COUNT_WINDOW: {
@ -313,6 +319,9 @@ static EDealRes rewriteExpr(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteExpr(&pState->pCol, order, rewriter, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteExpr(&pState->pTrueForLimit, order, rewriter, pContext);
}
break;
}
case QUERY_NODE_SESSION_WINDOW: {
@ -385,6 +394,9 @@ static EDealRes rewriteExpr(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteExpr(&pEvent->pEndCond, order, rewriter, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteExpr(&pEvent->pTrueForLimit, order, rewriter, pContext);
}
break;
}
case QUERY_NODE_WINDOW_OFFSET: {

View File

@ -1125,6 +1125,7 @@ void nodesDestroyNode(SNode* pNode) {
SStateWindowNode* pState = (SStateWindowNode*)pNode;
nodesDestroyNode(pState->pCol);
nodesDestroyNode(pState->pExpr);
nodesDestroyNode(pState->pTrueForLimit);
break;
}
case QUERY_NODE_SESSION_WINDOW: {
@ -1239,6 +1240,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pEvent->pCol);
nodesDestroyNode(pEvent->pStartCond);
nodesDestroyNode(pEvent->pEndCond);
nodesDestroyNode(pEvent->pTrueForLimit);
break;
}
case QUERY_NODE_COUNT_WINDOW: {

View File

@ -91,6 +91,7 @@ TEST_F(NodesCloneTest, stateWindow) {
SStateWindowNode* pDstNode = (SStateWindowNode*)pDst;
ASSERT_EQ(nodeType(pSrcNode->pCol), nodeType(pDstNode->pCol));
ASSERT_EQ(nodeType(pSrcNode->pExpr), nodeType(pDstNode->pExpr));
ASSERT_EQ(nodeType(pSrcNode->pTrueForLimit), nodeType(pDstNode->pTrueForLimit));
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
@ -102,6 +103,7 @@ TEST_F(NodesCloneTest, stateWindow) {
SStateWindowNode* pNode = (SStateWindowNode*)srcNode.get();
code = nodesMakeNode(QUERY_NODE_COLUMN, &pNode->pCol);
code = nodesMakeNode(QUERY_NODE_OPERATOR, &pNode->pExpr);
code = nodesMakeNode(QUERY_NODE_VALUE, &pNode->pTrueForLimit);
return srcNode.get();
}());
}

View File

@ -155,8 +155,8 @@ SNode* createViewNode(SAstCreateContext* pCxt, SToken* pDbName, SToken* pVie
SNode* createLimitNode(SAstCreateContext* pCxt, SNode* pLimit, SNode* pOffset);
SNode* createOrderByExprNode(SAstCreateContext* pCxt, SNode* pExpr, EOrder order, ENullOrder nullOrder);
SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* pGap);
SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr);
SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond);
SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr, SNode *pTrueForLimit);
SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond, SNode *pTrueForLimit);
SNode* createCountWindowNode(SAstCreateContext* pCxt, const SToken* pCountToken, const SToken* pSlidingToken);
SNode* createAnomalyWindowNode(SAstCreateContext* pCxt, SNode* pExpr, const SToken* pFuncOpt);
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding,

View File

@ -1628,7 +1628,8 @@ partition_item(A) ::= expr_or_subquery(B) AS column_alias(C).
twindow_clause_opt(A) ::= . { A = NULL; }
twindow_clause_opt(A) ::= SESSION NK_LP column_reference(B) NK_COMMA
interval_sliding_duration_literal(C) NK_RP. { A = createSessionWindowNode(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)); }
twindow_clause_opt(A) ::= STATE_WINDOW NK_LP expr_or_subquery(B) NK_RP. { A = createStateWindowNode(pCxt, releaseRawExprNode(pCxt, B)); }
twindow_clause_opt(A) ::=
STATE_WINDOW NK_LP expr_or_subquery(B) NK_RP true_for_opt(C). { A = createStateWindowNode(pCxt, releaseRawExprNode(pCxt, B), C); }
twindow_clause_opt(A) ::= INTERVAL NK_LP interval_sliding_duration_literal(B)
NK_RP sliding_opt(C) fill_opt(D). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), NULL, C, D); }
twindow_clause_opt(A) ::=
@ -1637,9 +1638,9 @@ twindow_clause_opt(A) ::=
sliding_opt(D) fill_opt(E). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C), D, E); }
twindow_clause_opt(A) ::=
INTERVAL NK_LP interval_sliding_duration_literal(B) NK_COMMA
AUTO(C) NK_RP sliding_opt(D) fill_opt(E). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), createDurationValueNode(pCxt, &C), D, E); }
twindow_clause_opt(A) ::=
EVENT_WINDOW START WITH search_condition(B) END WITH search_condition(C). { A = createEventWindowNode(pCxt, B, C); }
AUTO(C) NK_RP sliding_opt(D) fill_opt(E). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), createDurationValueNode(pCxt, &C), D, E); }
twindow_clause_opt(A) ::= EVENT_WINDOW START WITH search_condition(B)
END WITH search_condition(C) true_for_opt(D). { A = createEventWindowNode(pCxt, B, C, D); }
twindow_clause_opt(A) ::=
COUNT_WINDOW NK_LP NK_INTEGER(B) NK_RP. { A = createCountWindowNode(pCxt, &B, &B); }
twindow_clause_opt(A) ::=
@ -1717,6 +1718,9 @@ range_opt(A) ::=
every_opt(A) ::= . { A = NULL; }
every_opt(A) ::= EVERY NK_LP duration_literal(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
true_for_opt(A) ::= . { A = NULL; }
true_for_opt(A) ::= TRUE_FOR NK_LP interval_sliding_duration_literal(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
/************************************************ query_expression ****************************************************/
query_expression(A) ::= query_simple(B)
order_by_clause_opt(C) slimit_clause_opt(D) limit_clause_opt(E). {

View File

@ -1332,7 +1332,7 @@ _err:
return NULL;
}
SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr) {
SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr, SNode* pTrueForLimit) {
SStateWindowNode* state = NULL;
CHECK_PARSER_STATUS(pCxt);
pCxt->errCode = nodesMakeNode(QUERY_NODE_STATE_WINDOW, (SNode**)&state);
@ -1340,14 +1340,16 @@ SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr) {
state->pCol = createPrimaryKeyCol(pCxt, NULL);
CHECK_MAKE_NODE(state->pCol);
state->pExpr = pExpr;
state->pTrueForLimit = pTrueForLimit;
return (SNode*)state;
_err:
nodesDestroyNode((SNode*)state);
nodesDestroyNode(pExpr);
nodesDestroyNode(pTrueForLimit);
return NULL;
}
SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond) {
SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond, SNode* pTrueForLimit) {
SEventWindowNode* pEvent = NULL;
CHECK_PARSER_STATUS(pCxt);
pCxt->errCode = nodesMakeNode(QUERY_NODE_EVENT_WINDOW, (SNode**)&pEvent);
@ -1356,11 +1358,13 @@ SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode*
CHECK_MAKE_NODE(pEvent->pCol);
pEvent->pStartCond = pStartCond;
pEvent->pEndCond = pEndCond;
pEvent->pTrueForLimit = pTrueForLimit;
return (SNode*)pEvent;
_err:
nodesDestroyNode((SNode*)pEvent);
nodesDestroyNode(pStartCond);
nodesDestroyNode(pEndCond);
nodesDestroyNode(pTrueForLimit);
return NULL;
}

View File

@ -359,6 +359,7 @@ static SKeyword keywordTable[] = {
{"ON_FAILURE", TK_ON_FAILURE},
{"NOTIFY_HISTORY", TK_NOTIFY_HISTORY},
{"REGEXP", TK_REGEXP},
{"TRUE_FOR", TK_TRUE_FOR}
};
// clang-format on

View File

@ -6043,6 +6043,20 @@ static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* p
return TSDB_CODE_SUCCESS;
}
static int32_t checkTrueForLimit(STranslateContext *pCxt, SNode *pNode) {
SValueNode *pTrueForLimit = (SValueNode *)pNode;
if (pTrueForLimit == NULL) {
return TSDB_CODE_SUCCESS;
}
if (pTrueForLimit->datum.i < 0) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TRUE_FOR_NEGATIVE);
}
if (IS_CALENDAR_TIME_DURATION(pTrueForLimit->unit)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TRUE_FOR_UNIT);
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateStateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
!isGlobalTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
@ -6055,6 +6069,9 @@ static int32_t translateStateWindow(STranslateContext* pCxt, SSelectStmt* pSelec
if (TSDB_CODE_SUCCESS == code) {
code = checkStateWindowForStream(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkTrueForLimit(pCxt, pState->pTrueForLimit);
}
return code;
}
@ -6081,7 +6098,7 @@ static int32_t translateEventWindow(STranslateContext* pCxt, SSelectStmt* pSelec
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TIMELINE_QUERY,
"EVENT_WINDOW requires valid time series input");
}
return TSDB_CODE_SUCCESS;
return checkTrueForLimit(pCxt, ((SEventWindowNode*)pSelect->pWindow)->pTrueForLimit);
}
static int32_t translateCountWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {

View File

@ -227,6 +227,10 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Some functions cannot appear in the select list at the same time";
case TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR:
return "Syntax error in regular expression";
case TSDB_CODE_PAR_TRUE_FOR_NEGATIVE:
return "True_for duration cannot be negative";
case TSDB_CODE_PAR_TRUE_FOR_UNIT:
return "Cannot use 'year' or 'month' as true_for duration";
default:
return "Unknown error";
}

View File

@ -1159,6 +1159,9 @@ static int32_t createWindowLogicNodeByState(SLogicPlanContext* pCxt, SStateWindo
nodesDestroyNode((SNode*)pWindow);
return code;
}
if (pState->pTrueForLimit) {
pWindow->trueForLimit = ((SValueNode*)pState->pTrueForLimit)->datum.i;
}
// rewrite the expression in subsequent clauses
code = rewriteExprForSelect(pWindow->pStateExpr, pSelect, SQL_CLAUSE_WINDOW);
if (TSDB_CODE_SUCCESS == code) {
@ -1272,6 +1275,9 @@ static int32_t createWindowLogicNodeByEvent(SLogicPlanContext* pCxt, SEventWindo
nodesDestroyNode((SNode*)pWindow);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pEvent->pTrueForLimit) {
pWindow->trueForLimit = ((SValueNode*)pEvent->pTrueForLimit)->datum.i;
}
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}

View File

@ -2328,6 +2328,8 @@ static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
// }
}
pState->trueForLimit = pWindowLogicNode->trueForLimit;
if (TSDB_CODE_SUCCESS == code) {
code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode);
}
@ -2358,6 +2360,7 @@ static int32_t createEventWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pEndCond, &pEvent->pEndCond);
}
pEvent->trueForLimit = pWindowLogicNode->trueForLimit;
if (TSDB_CODE_SUCCESS == code) {
code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pEvent->window, pWindowLogicNode);
}

View File

@ -752,6 +752,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT, "ANOMALY_WINDOW opti
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE, "Invalid forecast clause")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VGID_LIST, "Invalid vgid list")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TRUE_FOR_NEGATIVE, "True_for duration cannot be negative")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TRUE_FOR_UNIT, "Cannot use 'year' or 'month' as true_for duration")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
//planner

View File

@ -554,6 +554,8 @@ TSDB_CODE_PAR_COL_PK_TYPE = 0x80002679
TSDB_CODE_PAR_INVALID_PK_OP = 0x8000267A
TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL = 0x8000267B
TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE = 0x8000267C
TSDB_CODE_PAR_TRUE_FOR_NEGATIVE = 0x80002687
TSDB_CODE_PAR_TRUE_FOR_UNIT = 0x80002688
TSDB_CODE_PAR_INTERNAL_ERROR = 0x800026FF
TSDB_CODE_PLAN_INTERNAL_ERROR = 0x80002700
TSDB_CODE_PLAN_EXPECTED_TS_EQUAL = 0x80002701

View File

@ -1252,6 +1252,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/operator.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/operator.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f eco-system/manager/schema_change.py -N 3 -M 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_window_true_for.py
#tsim test
,,y,script,./test.sh -f tsim/query/timeline.sim

View File

@ -0,0 +1,488 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import taos
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
def create_objects(self):
tdSql.execute("drop database if exists test", show=True)
tdSql.execute("create database test keep 36500 precision 'ms'", show=True)
tdSql.execute("use test", show=True)
tdSql.execute("create stable st (ts timestamp, c1 int) tags (gid int)", show=True)
tdSql.execute("create table ct_0 using st(gid) tags (0)")
tdSql.execute("create table ct_1 using st(gid) tags (1)")
tdSql.execute(f'''create stream s_event_1 into d_event_1 as
select _wstart, _wend, count(*) from ct_0
event_window start with c1 > 0 end with c1 < 0 true_for(3s);''', show=True)
tdSql.execute(f'''create stream s_event_2 ignore update 0 ignore expired 0 into d_event_2 as
select _wstart, _wend, count(*) from ct_0
event_window start with c1 > 0 end with c1 < 0 true_for(3s);''', show=True)
tdSql.execute(f'''create stream s_event_3 into d_event_3 as
select _wstart, _wend, count(*) from ct_0
event_window start with c1 > 0 end with c1 < 0 true_for(2999);''', show=True)
tdSql.execute(f'''create stream s_event_4 ignore update 0 ignore expired 0 into d_event_4 as
select _wstart, _wend, count(*) from ct_0
event_window start with c1 > 0 end with c1 < 0 true_for(2999);''', show=True)
tdSql.execute(f'''create stream s_event_5 into d_event_5 as
select _wstart, _wend, count(*) from ct_0
event_window start with c1 > 0 end with c1 < 0 true_for('3001a');''', show=True)
tdSql.execute(f'''create stream s_event_6 ignore update 0 ignore expired 0 into d_event_6 as
select _wstart, _wend, count(*) from ct_0
event_window start with c1 > 0 end with c1 < 0 true_for('3001a');''', show=True)
tdSql.execute(f'''create stream s_state_1 into d_state_1 as
select _wstart, _wend, count(*) from ct_1
state_window(c1) true_for (3s);''', show=True)
tdSql.execute(f'''create stream s_state_2 ignore update 0 ignore expired 0 into d_state_2 as
select _wstart, _wend, count(*) from ct_1
state_window(c1) true_for (3s);''', show=True)
tdSql.execute(f'''create stream s_state_3 into d_state_3 as
select _wstart, _wend, count(*) from ct_1
state_window(c1) true_for (2999);''', show=True)
tdSql.execute(f'''create stream s_state_4 ignore update 0 ignore expired 0 into d_state_4 as
select _wstart, _wend, count(*) from ct_1
state_window(c1) true_for (2999);''', show=True)
tdSql.execute(f'''create stream s_state_5 into d_state_5 as
select _wstart, _wend, count(*) from ct_1
state_window(c1) true_for ('3001a');''', show=True)
tdSql.execute(f'''create stream s_state_6 ignore update 0 ignore expired 0 into d_state_6 as
select _wstart, _wend, count(*) from ct_1
state_window(c1) true_for ('3001a');''', show=True)
# Wait for the stream tasks to be ready
for i in range(50):
tdLog.info(f"i={i} wait for stream tasks ready ...")
time.sleep(1)
rows = tdSql.query("select * from information_schema.ins_stream_tasks where status <> 'ready';")
if rows == 0:
break
def insert_data(self):
tdSql.execute(f'''insert into ct_0 values
('2025-01-01 00:00:00.000', -1),
('2025-01-01 00:00:01.000', 1),
('2025-01-01 00:00:02.000', -1),
('2025-01-01 00:00:03.000', 1),
('2025-01-01 00:00:04.000', 1),
('2025-01-01 00:00:05.000', -1),
('2025-01-01 00:00:06.000', 1),
('2025-01-01 00:00:07.000', 1),
('2025-01-01 00:00:08.000', 1),
('2025-01-01 00:00:08.999', -1),
('2025-01-01 00:00:10.000', 1),
('2025-01-01 00:00:11.000', 1),
('2025-01-01 00:00:12.000', 1),
('2025-01-01 00:00:13.000', -1),
('2025-01-01 00:00:14.000', 1),
('2025-01-01 00:00:15.000', 1),
('2025-01-01 00:00:16.000', 1),
('2025-01-01 00:00:17.001', -1),
('2025-01-01 00:00:18.000', 1),
('2025-01-01 00:00:19.000', 1),
('2025-01-01 00:00:20.000', 1),
('2025-01-01 00:00:21.000', 1),
('2025-01-01 00:00:22.000', -1),
('2025-01-01 00:00:23.000', -1),
('2025-01-01 00:00:24.000', 1),
('2025-01-01 00:00:25.000', 1),
('2025-01-01 00:00:26.000', 1),
('2025-01-01 00:00:27.000', 1),
('2025-01-01 00:00:28.000', 1),
('2025-01-01 00:00:29.000', 1),
('2025-01-01 00:00:30.000', -1),
('2025-01-01 00:00:31.000', 0);''', show=True)
tdSql.execute(f'''insert into ct_1 values
('2025-01-01 00:00:00.000', 0),
('2025-01-01 00:00:01.000', 1),
('2025-01-01 00:00:02.000', 1),
('2025-01-01 00:00:03.000', 2),
('2025-01-01 00:00:04.000', 2),
('2025-01-01 00:00:05.000', 2),
('2025-01-01 00:00:06.000', 3),
('2025-01-01 00:00:07.000', 3),
('2025-01-01 00:00:08.000', 3),
('2025-01-01 00:00:08.999', 3),
('2025-01-01 00:00:10.000', 4),
('2025-01-01 00:00:11.000', 4),
('2025-01-01 00:00:12.000', 4),
('2025-01-01 00:00:13.000', 4),
('2025-01-01 00:00:14.000', 5),
('2025-01-01 00:00:15.000', 5),
('2025-01-01 00:00:16.000', 5),
('2025-01-01 00:00:17.001', 5),
('2025-01-01 00:00:18.000', 6),
('2025-01-01 00:00:19.000', 6),
('2025-01-01 00:00:20.000', 6),
('2025-01-01 00:00:21.000', 6),
('2025-01-01 00:00:22.000', 6),
('2025-01-01 00:00:23.000', 0),
('2025-01-01 00:00:24.000', 7),
('2025-01-01 00:00:25.000', 7),
('2025-01-01 00:00:26.000', 7),
('2025-01-01 00:00:27.000', 7),
('2025-01-01 00:00:28.000', 7),
('2025-01-01 00:00:29.000', 7),
('2025-01-01 00:00:30.000', 7),
('2025-01-01 00:00:31.000', 0);''', show=True)
tdLog.info("wait for all stream tasks to be ready ...")
time.sleep(10)
def update_data(self):
tdSql.execute(f'''insert into ct_0 values
('2025-01-01 00:00:00.000', 1),
('2025-01-01 00:00:22.000', 1),
('2025-01-01 00:00:28.000', -1);''', show=True)
tdSql.execute(f'''insert into ct_1 values
('2025-01-01 00:00:00.000', 1),
('2025-01-01 00:00:23.000', 6),
('2025-01-01 00:00:29.000', 8),
('2025-01-01 00:00:30.000', 8);''', show=True)
tdLog.info("wait for all stream tasks to be ready ...")
time.sleep(5)
def check_result(self):
tdSql.query("select * from d_event_1", show=True)
tdSql.checkRows(4)
tdSql.checkData(0, 0, '2025-01-01 00:00:10.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:13.000')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(1, 2, 4)
tdSql.checkData(2, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:22.000')
tdSql.checkData(2, 2, 5)
tdSql.checkData(3, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(3, 1, '2025-01-01 00:00:30.000')
tdSql.checkData(3, 2, 7)
tdSql.query("select * from d_event_2", show=True)
tdSql.checkRows(4)
tdSql.checkData(0, 0, '2025-01-01 00:00:10.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:13.000')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(1, 2, 4)
tdSql.checkData(2, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:23.000')
tdSql.checkData(2, 2, 6)
tdSql.checkData(3, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(3, 1, '2025-01-01 00:00:28.000')
tdSql.checkData(3, 2, 5)
tdSql.query("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(3s);", show=True)
tdSql.checkRows(4)
tdSql.checkData(0, 0, '2025-01-01 00:00:10.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:13.000')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(1, 2, 4)
tdSql.checkData(2, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:23.000')
tdSql.checkData(2, 2, 6)
tdSql.checkData(3, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(3, 1, '2025-01-01 00:00:28.000')
tdSql.checkData(3, 2, 5)
tdSql.query("select * from d_event_3", show=True)
tdSql.checkRows(5)
tdSql.checkData(0, 0, '2025-01-01 00:00:06.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:08.999')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:10.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:13.000')
tdSql.checkData(1, 2, 4)
tdSql.checkData(2, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(2, 2, 4)
tdSql.checkData(3, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(3, 1, '2025-01-01 00:00:22.000')
tdSql.checkData(3, 2, 5)
tdSql.checkData(4, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(4, 1, '2025-01-01 00:00:30.000')
tdSql.checkData(4, 2, 7)
tdSql.query("select * from d_event_4", show=True)
tdSql.checkRows(5)
tdSql.checkData(0, 0, '2025-01-01 00:00:06.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:08.999')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:10.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:13.000')
tdSql.checkData(1, 2, 4)
tdSql.checkData(2, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(2, 2, 4)
tdSql.checkData(3, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(3, 1, '2025-01-01 00:00:23.000')
tdSql.checkData(3, 2, 6)
tdSql.checkData(4, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(4, 1, '2025-01-01 00:00:28.000')
tdSql.checkData(4, 2, 5)
tdSql.query("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(2999);", show=True)
tdSql.checkRows(5)
tdSql.checkData(0, 0, '2025-01-01 00:00:06.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:08.999')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:10.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:13.000')
tdSql.checkData(1, 2, 4)
tdSql.checkData(2, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(2, 2, 4)
tdSql.checkData(3, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(3, 1, '2025-01-01 00:00:23.000')
tdSql.checkData(3, 2, 6)
tdSql.checkData(4, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(4, 1, '2025-01-01 00:00:28.000')
tdSql.checkData(4, 2, 5)
tdSql.query("select * from d_event_5", show=True)
tdSql.checkRows(3)
tdSql.checkData(0, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:22.000')
tdSql.checkData(1, 2, 5)
tdSql.checkData(2, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:30.000')
tdSql.checkData(2, 2, 7)
tdSql.query("select * from d_event_6", show=True)
tdSql.checkRows(3)
tdSql.checkData(0, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:23.000')
tdSql.checkData(1, 2, 6)
tdSql.checkData(2, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:28.000')
tdSql.checkData(2, 2, 5)
tdSql.query("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for('3001a');", show=True)
tdSql.checkRows(3)
tdSql.checkData(0, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:23.000')
tdSql.checkData(1, 2, 6)
tdSql.checkData(2, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:28.000')
tdSql.checkData(2, 2, 5)
tdSql.query("select * from d_state_1", show=True)
tdSql.checkRows(4)
tdSql.checkData(0, 0, '2025-01-01 00:00:10.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:13.000')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(1, 2, 4)
tdSql.checkData(2, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:22.000')
tdSql.checkData(2, 2, 5)
tdSql.checkData(3, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(3, 1, '2025-01-01 00:00:30.000')
tdSql.checkData(3, 2, 7)
tdSql.query("select * from d_state_2", show=True)
tdSql.checkRows(4)
tdSql.checkData(0, 0, '2025-01-01 00:00:10.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:13.000')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(1, 2, 4)
tdSql.checkData(2, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:23.000')
tdSql.checkData(2, 2, 6)
tdSql.checkData(3, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(3, 1, '2025-01-01 00:00:28.000')
tdSql.checkData(3, 2, 5)
tdSql.query("select _wstart, _wend, count(*) from ct_1 state_window(c1) true_for(3s);", show=True)
tdSql.checkRows(4)
tdSql.checkData(0, 0, '2025-01-01 00:00:10.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:13.000')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(1, 2, 4)
tdSql.checkData(2, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:23.000')
tdSql.checkData(2, 2, 6)
tdSql.checkData(3, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(3, 1, '2025-01-01 00:00:28.000')
tdSql.checkData(3, 2, 5)
tdSql.query("select * from d_state_3", show=True)
tdSql.checkRows(5)
tdSql.checkData(0, 0, '2025-01-01 00:00:06.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:08.999')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:10.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:13.000')
tdSql.checkData(1, 2, 4)
tdSql.checkData(2, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(2, 2, 4)
tdSql.checkData(3, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(3, 1, '2025-01-01 00:00:22.000')
tdSql.checkData(3, 2, 5)
tdSql.checkData(4, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(4, 1, '2025-01-01 00:00:30.000')
tdSql.checkData(4, 2, 7)
tdSql.query("select * from d_state_4", show=True)
tdSql.checkRows(5)
tdSql.checkData(0, 0, '2025-01-01 00:00:06.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:08.999')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:10.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:13.000')
tdSql.checkData(1, 2, 4)
tdSql.checkData(2, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(2, 2, 4)
tdSql.checkData(3, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(3, 1, '2025-01-01 00:00:23.000')
tdSql.checkData(3, 2, 6)
tdSql.checkData(4, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(4, 1, '2025-01-01 00:00:28.000')
tdSql.checkData(4, 2, 5)
tdSql.query("select _wstart, _wend, count(*) from ct_1 state_window(c1) true_for(2999);", show=True)
tdSql.checkRows(5)
tdSql.checkData(0, 0, '2025-01-01 00:00:06.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:08.999')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:10.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:13.000')
tdSql.checkData(1, 2, 4)
tdSql.checkData(2, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(2, 2, 4)
tdSql.checkData(3, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(3, 1, '2025-01-01 00:00:23.000')
tdSql.checkData(3, 2, 6)
tdSql.checkData(4, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(4, 1, '2025-01-01 00:00:28.000')
tdSql.checkData(4, 2, 5)
tdSql.query("select * from d_state_5", show=True)
tdSql.checkRows(3)
tdSql.checkData(0, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:22.000')
tdSql.checkData(1, 2, 5)
tdSql.checkData(2, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:30.000')
tdSql.checkData(2, 2, 7)
tdSql.query("select * from d_state_6", show=True)
tdSql.checkRows(3)
tdSql.checkData(0, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:23.000')
tdSql.checkData(1, 2, 6)
tdSql.checkData(2, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:28.000')
tdSql.checkData(2, 2, 5)
tdSql.query("select _wstart, _wend, count(*) from ct_1 state_window(c1) true_for('3001a');", show=True)
tdSql.checkRows(3)
tdSql.checkData(0, 0, '2025-01-01 00:00:14.000')
tdSql.checkData(0, 1, '2025-01-01 00:00:17.001')
tdSql.checkData(0, 2, 4)
tdSql.checkData(1, 0, '2025-01-01 00:00:18.000')
tdSql.checkData(1, 1, '2025-01-01 00:00:23.000')
tdSql.checkData(1, 2, 6)
tdSql.checkData(2, 0, '2025-01-01 00:00:24.000')
tdSql.checkData(2, 1, '2025-01-01 00:00:28.000')
tdSql.checkData(2, 2, 5)
def test_abnormal_query(self):
tdLog.info("test abnormal window true_for limit")
tdSql.error("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(3n);")
tdSql.error("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(3y);")
tdSql.error("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(-1);")
tdSql.error("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(-1a);")
tdSql.error("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for('-1a');")
tdSql.error("create stream s_ab into dst as select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(3n);")
tdSql.error("create stream s_ab into dst as select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(3y);")
tdSql.error("create stream s_ab into dst as select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(-1);")
tdSql.error("create stream s_ab into dst as select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(-1a);")
tdSql.error("create stream s_ab into dst as select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for('-1a');")
def test_window_true_for_limit(self):
""" Test the functionality of the true_for window function.
This test covers:
1. Both batch query and stream computing scenarios.
2. Two types of windows: event_window and state_window.
3. Parameter types for true_for: numeric and string.
4. Boundary value tests.
5. Error case tests.
Since: v3.3.6.0
Labels: true_for, state_window, event_window
Jira: TS-5470
History:
- 2025-02-21 Kuang Jinqing Created
"""
self.create_objects()
self.insert_data()
self.update_data()
self.check_result()
self.test_abnormal_query()
# run
def run(self):
self.test_window_true_for_limit()
# stop
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())