From 51ffff207967682e680fdd79dfe0c8eb8a60565b Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Sun, 23 Feb 2025 21:46:11 +0800 Subject: [PATCH] feat(query)[TS-5470]: add syntax to specify minimum duration for event and state windows Introduce the `true for` syntax to allow users to specify the minimum duration for event and state windows. Add corresponding tests to validate the feature. Updated the user manual with usage instructions. --- docs/en/14-reference/03-taos-sql/06-select.md | 4 +- .../03-taos-sql/12-distinguished.md | 16 +- docs/en/14-reference/09-error-code.md | 2 + docs/zh/14-reference/03-taos-sql/06-select.md | 4 +- .../03-taos-sql/12-distinguished.md | 16 +- docs/zh/14-reference/09-error-code.md | 2 + include/libs/nodes/plannodes.h | 3 + include/libs/nodes/querynodes.h | 2 + include/util/taoserror.h | 2 + source/client/test/clientTests.cpp | 2 +- source/libs/executor/inc/executil.h | 1 + source/libs/executor/inc/executorInt.h | 54 +- .../libs/executor/src/eventwindowoperator.c | 45 +- source/libs/executor/src/executil.c | 2 + source/libs/executor/src/executorInt.c | 17 +- .../executor/src/streamcountwindowoperator.c | 2 +- .../executor/src/streameventwindowoperator.c | 6 +- source/libs/executor/src/streamexecutorInt.c | 8 +- .../executor/src/streamtimewindowoperator.c | 143 +++-- source/libs/executor/src/timesliceoperator.c | 19 + source/libs/executor/src/timewindowoperator.c | 30 +- source/libs/nodes/src/nodesCloneFuncs.c | 3 + source/libs/nodes/src/nodesCodeFuncs.c | 28 + source/libs/nodes/src/nodesMsgFuncs.c | 16 +- source/libs/nodes/src/nodesTraverseFuncs.c | 12 + source/libs/nodes/src/nodesUtilFuncs.c | 2 + source/libs/nodes/test/nodesCloneTest.cpp | 2 + source/libs/parser/inc/parAst.h | 4 +- source/libs/parser/inc/sql.y | 12 +- source/libs/parser/src/parAstCreater.c | 8 +- source/libs/parser/src/parTokenizer.c | 1 + source/libs/parser/src/parTranslater.c | 19 +- source/libs/parser/src/parUtil.c | 4 + source/libs/planner/src/planLogicCreater.c | 6 + source/libs/planner/src/planPhysiCreater.c | 3 + source/util/src/terror.c | 2 + source/util/test/errorCodeTable.ini | 2 + tests/parallel_test/cases.task | 1 + .../2-query/test_window_true_for.py | 488 ++++++++++++++++++ 39 files changed, 862 insertions(+), 131 deletions(-) create mode 100644 tests/system-test/2-query/test_window_true_for.py diff --git a/docs/en/14-reference/03-taos-sql/06-select.md b/docs/en/14-reference/03-taos-sql/06-select.md index 21cb419bed..54c4248ff0 100644 --- a/docs/en/14-reference/03-taos-sql/06-select.md +++ b/docs/en/14-reference/03-taos-sql/06-select.md @@ -55,9 +55,9 @@ join_clause: window_clause: { SESSION(ts_col, tol_val) - | STATE_WINDOW(col) + | STATE_WINDOW(col) [TRUE_FOR(true_for_duration)] | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)] - | EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition + | EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition [TRUE_FOR(true_for_duration)] | COUNT_WINDOW(count_val[, sliding_val]) interp_clause: diff --git a/docs/en/14-reference/03-taos-sql/12-distinguished.md b/docs/en/14-reference/03-taos-sql/12-distinguished.md index e98b654be3..e2afdef0aa 100644 --- a/docs/en/14-reference/03-taos-sql/12-distinguished.md +++ b/docs/en/14-reference/03-taos-sql/12-distinguished.md @@ -53,9 +53,9 @@ The syntax for the window clause is as follows: ```sql window_clause: { SESSION(ts_col, tol_val) - | STATE_WINDOW(col) + | STATE_WINDOW(col) [TRUE_FOR(true_for_duration)] | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [FILL(fill_mod_and_val)] - | EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition + | EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition [TRUE_FOR(true_for_duration)] | COUNT_WINDOW(count_val[, sliding_val]) } ``` @@ -177,6 +177,12 @@ TDengine also supports using CASE expressions in state quantities, which can exp SELECT tbname, _wstart, CASE WHEN voltage >= 205 and voltage <= 235 THEN 1 ELSE 0 END status FROM meters PARTITION BY tbname STATE_WINDOW(CASE WHEN voltage >= 205 and voltage <= 235 THEN 1 ELSE 0 END); ``` +The status window supports using the TRUE_FOR parameter to set its minimum duration. If the window's duration is less than the specified value, it will be discarded automatically and no result will be returned. For example, setting the minimum duration to 3 seconds: + +``` +SELECT COUNT(*), FIRST(ts), status FROM temp_tb_1 STATE_WINDOW(status) TRUE_FOR (3s); +``` + ### Session Window The session window is determined based on the timestamp primary key values of the records. As shown in the diagram below, if the continuous interval of the timestamps is set to be less than or equal to 12 seconds, the following 6 records form 2 session windows, which are: [2019-04-28 14:22:10, 2019-04-28 14:22:30] and [2019-04-28 14:23:10, 2019-04-28 14:23:30]. This is because the interval between 2019-04-28 14:22:30 and 2019-04-28 14:23:10 is 40 seconds, exceeding the continuous interval (12 seconds). @@ -212,6 +218,12 @@ select _wstart, _wend, count(*) from t event_window start with c1 > 0 end with c +The event window supports using the TRUE_FOR parameter to set its minimum duration. If the window's duration is less than the specified value, it will be discarded automatically and no result will be returned. For example, setting the minimum duration to 3 seconds: + +``` +select _wstart, _wend, count(*) from t event_window start with c1 > 0 end with c2 < 10 true_for (3s); +``` + ### Count Window Count windows divide data into windows based on a fixed number of data rows. By default, data is sorted by timestamp, then divided into multiple windows based on the value of count_val, and aggregate calculations are performed. count_val represents the maximum number of data rows in each count window; if the total number of data rows is not divisible by count_val, the last window will have fewer rows than count_val. sliding_val is a constant that represents the number of rows the window slides, similar to the SLIDING in interval. diff --git a/docs/en/14-reference/09-error-code.md b/docs/en/14-reference/09-error-code.md index 233ac78a19..e52251fe6c 100644 --- a/docs/en/14-reference/09-error-code.md +++ b/docs/en/14-reference/09-error-code.md @@ -458,6 +458,8 @@ This document details the server error codes that may be encountered when using | 0x80002665 | The _TAGS pseudocolumn can only be used for subtable and supertable queries | Illegal tag column query | Check and correct the SQL statement | | 0x80002666 | Subquery does not output primary timestamp column | Check and correct the SQL statement | | | 0x80002667 | Invalid usage of expr: %s | Illegal expression | Check and correct the SQL statement | +| 0x80002687 | True_for duration cannot be negative | Use negative value as true_for duration | Check and correct the SQL statement | +| 0x80002688 | Cannot use 'year' or 'month' as true_for duration | Use year or month as true_for_duration | Check and correct the SQL statement | | 0x800026FF | Parser internal error | Internal error in parser | Preserve the scene and logs, report issue on GitHub | | 0x80002700 | Planner internal error | Internal error in planner | Preserve the scene and logs, report issue on GitHub | | 0x80002701 | Expect ts equal | JOIN condition validation failed | Preserve the scene and logs, report issue on GitHub | diff --git a/docs/zh/14-reference/03-taos-sql/06-select.md b/docs/zh/14-reference/03-taos-sql/06-select.md index 3375fe6134..15b9196f1a 100644 --- a/docs/zh/14-reference/03-taos-sql/06-select.md +++ b/docs/zh/14-reference/03-taos-sql/06-select.md @@ -56,9 +56,9 @@ join_clause: window_clause: { SESSION(ts_col, tol_val) - | STATE_WINDOW(col) + | STATE_WINDOW(col) [TRUE_FOR(true_for_duration)] | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)] - | EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition + | EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition [TRUE_FOR(true_for_duration)] | COUNT_WINDOW(count_val[, sliding_val]) interp_clause: diff --git a/docs/zh/14-reference/03-taos-sql/12-distinguished.md b/docs/zh/14-reference/03-taos-sql/12-distinguished.md index bd6705b33b..8ac941dd77 100644 --- a/docs/zh/14-reference/03-taos-sql/12-distinguished.md +++ b/docs/zh/14-reference/03-taos-sql/12-distinguished.md @@ -46,9 +46,9 @@ TDengine 支持按时间窗口切分方式进行聚合结果查询,比如温 ```sql window_clause: { SESSION(ts_col, tol_val) - | STATE_WINDOW(col) + | STATE_WINDOW(col) [TRUE_FOR(true_for_duration)] | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [FILL(fill_mod_and_val)] - | EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition + | EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition [TRUE_FOR(true_for_duration)] | COUNT_WINDOW(count_val[, sliding_val]) } ``` @@ -165,6 +165,12 @@ TDengine 还支持将 CASE 表达式用在状态量,可以表达某个状态 SELECT tbname, _wstart, CASE WHEN voltage >= 205 and voltage <= 235 THEN 1 ELSE 0 END status FROM meters PARTITION BY tbname STATE_WINDOW(CASE WHEN voltage >= 205 and voltage <= 235 THEN 1 ELSE 0 END); ``` +状态窗口支持使用 TRUE_FOR 参数来设定窗口的最小持续时长。如果某个状态窗口的宽度低于该设定值,则会自动舍弃,不返回任何计算结果。例如,设置最短持续时长为 3s: + +``` +SELECT COUNT(*), FIRST(ts), status FROM temp_tb_1 STATE_WINDOW(status) TRUE_FOR (3s); +``` + ### 会话窗口 会话窗口根据记录的时间戳主键的值来确定是否属于同一个会话。如下图所示,如果设置时间戳的连续的间隔小于等于 12 秒,则以下 6 条记录构成 2 个会话窗口,分别是:[2019-04-28 14:22:10,2019-04-28 14:22:30]和[2019-04-28 14:23:10,2019-04-28 14:23:30]。因为 2019-04-28 14:22:30 与 2019-04-28 14:23:10 之间的时间间隔是 40 秒,超过了连续时间间隔(12 秒)。 @@ -196,6 +202,12 @@ select _wstart, _wend, count(*) from t event_window start with c1 > 0 end with c ![TDengine Database 事件窗口示意图](./event_window.webp) +事件窗口支持使用 TRUE_FOR 参数来设定窗口的最小持续时长。如果某个事件窗口的宽度低于该设定值,则会自动舍弃,不返回任何计算结果。例如,设置最短持续时长为 3s: + +``` +select _wstart, _wend, count(*) from t event_window start with c1 > 0 end with c2 < 10 true_for (3s); +``` + ### 计数窗口 计数窗口按固定的数据行数来划分窗口。默认将数据按时间戳排序,再按照count_val的值,将数据划分为多个窗口,然后做聚合计算。count_val表示每个count window包含的最大数据行数,总数据行数不能整除count_val时,最后一个窗口的行数会小于count_val。sliding_val是常量,表示窗口滑动的数量,类似于 interval的SLIDING。 diff --git a/docs/zh/14-reference/09-error-code.md b/docs/zh/14-reference/09-error-code.md index d29ff542eb..5037c631eb 100644 --- a/docs/zh/14-reference/09-error-code.md +++ b/docs/zh/14-reference/09-error-code.md @@ -475,6 +475,8 @@ description: TDengine 服务端的错误码列表和详细说明 | 0x80002665 | The _TAGS pseudo column can only be used for subtable and supertable queries | 非法TAG列查询 | 检查并修正SQL语句 | | 0x80002666 | 子查询不含主键时间戳列输出 | 检查并修正SQL语句 | | 0x80002667 | Invalid usage of expr: %s | 非法表达式 | 检查并修正SQL语句 | +| 0x80002687 | True_for duration cannot be negative | true_for 的值不能是负数 | 检查并修正SQL语句 | +| 0x80002688 | Cannot use 'year' or 'month' as true_for duration | 不能使用 n(月), y(年) 作为 true_for 的时间单位 | 检查并修正SQL语句 | | 0x800026FF | Parser internal error | 解析器内部错误 | 保留现场和日志,github上报issue | | 0x80002700 | Planner internal error | 计划期内部错误 | 保留现场和日志,github上报issue | | 0x80002701 | Expect ts equal | JOIN条件校验失败 | 保留现场和日志,github上报issue | diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 1afec35c3c..9ed565090e 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -327,6 +327,7 @@ typedef struct SWindowLogicNode { SNode* pStateExpr; SNode* pStartCond; SNode* pEndCond; + int64_t trueForLimit; int8_t triggerType; int64_t watermark; int64_t deleteMark; @@ -724,6 +725,7 @@ typedef SSessionWinodwPhysiNode SStreamFinalSessionWinodwPhysiNode; typedef struct SStateWinodwPhysiNode { SWindowPhysiNode window; SNode* pStateKey; + int64_t trueForLimit; } SStateWinodwPhysiNode; typedef SStateWinodwPhysiNode SStreamStateWinodwPhysiNode; @@ -732,6 +734,7 @@ typedef struct SEventWinodwPhysiNode { SWindowPhysiNode window; SNode* pStartCond; SNode* pEndCond; + int64_t trueForLimit; } SEventWinodwPhysiNode; typedef SEventWinodwPhysiNode SStreamEventWinodwPhysiNode; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index c80dda863f..2ffdea6eeb 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -322,6 +322,7 @@ typedef struct SStateWindowNode { ENodeType type; // QUERY_NODE_STATE_WINDOW SNode* pCol; // timestamp primary key SNode* pExpr; + SNode* pTrueForLimit; } SStateWindowNode; typedef struct SSessionWindowNode { @@ -346,6 +347,7 @@ typedef struct SEventWindowNode { SNode* pCol; // timestamp primary key SNode* pStartCond; SNode* pEndCond; + SNode* pTrueForLimit; } SEventWindowNode; typedef struct SCountWindowNode { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 397118411c..cc8c49d109 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -908,6 +908,8 @@ int32_t taosGetErrSize(); #define TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT TAOS_DEF_ERROR_CODE(0, 0x2684) #define TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x2685) #define TSDB_CODE_PAR_INVALID_VGID_LIST TAOS_DEF_ERROR_CODE(0, 0x2686) +#define TSDB_CODE_PAR_TRUE_FOR_NEGATIVE TAOS_DEF_ERROR_CODE(0, 0x2687) +#define TSDB_CODE_PAR_TRUE_FOR_UNIT TAOS_DEF_ERROR_CODE(0, 0x2688) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 54c0e59817..8ebbe62857 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1510,7 +1510,7 @@ TEST(clientCase, sub_tb_mt_test) { (void)taosThreadCreate(&qid[i], NULL, doConsumeData, NULL); } - for (int32_t i = 0; i < 4; ++i) { + for (int32_t i = 0; i < 1; ++i) { (void)taosThreadJoin(qid[i], NULL); } } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 9e36a29476..726ab6eaea 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -42,6 +42,7 @@ typedef struct SGroupResInfo { int32_t index; // rows consumed in func:doCopyToSDataBlockXX int32_t iter; // relate to index-1, last consumed data's slot id in hash table void* dataPos; // relate to index-1, last consumed data's position, in the nodelist of cur slot + int32_t delIndex; // rows consumed in func:doBuildDeleteDataBlock SArray* pRows; // SArray char* pBuf; bool freeItem; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index e7bc1f67e1..2ad625e309 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -686,6 +686,54 @@ typedef struct SResultWindowInfo { bool isOutput; } SResultWindowInfo; +typedef struct SSessionAggOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SExprSupp scalarSupp; // supporter for perform scalar function + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + bool reptScan; // next round scan + int64_t gap; // session window gap + int32_t tsSlotId; // primary timestamp slot id + STimeWindowAggSupp twAggSup; + struct SOperatorInfo* pOperator; + bool cleanGroupResInfo; +} SSessionAggOperatorInfo; + +typedef struct SStateWindowOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SExprSupp scalarSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + SColumn stateCol; // start row index + bool hasKey; + SStateKeys stateKey; + int32_t tsSlotId; // primary timestamp column slot id + STimeWindowAggSupp twAggSup; + struct SOperatorInfo* pOperator; + bool cleanGroupResInfo; + int64_t trueForLimit; +} SStateWindowOperatorInfo; + + +typedef struct SEventWindowOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SExprSupp scalarSup; + SWindowRowsSup winSup; + int32_t tsSlotId; // primary timestamp column slot id + STimeWindowAggSupp twAggSup; + uint64_t groupId; // current group id, used to identify the data block from different groups + SFilterInfo* pStartCondInfo; + SFilterInfo* pEndCondInfo; + bool inWindow; + SResultRow* pRow; + SSDataBlock* pPreDataBlock; + struct SOperatorInfo* pOperator; + int64_t trueForLimit; +} SEventWindowOperatorInfo; + typedef struct SStreamSessionAggOperatorInfo { SOptrBasicInfo binfo; SSteamOpBasicInfo basic; @@ -746,6 +794,7 @@ typedef struct SStreamStateAggOperatorInfo { SSHashObj* pPkDeleted; bool destHasPrimaryKey; struct SOperatorInfo* pOperator; + int64_t trueForLimit; } SStreamStateAggOperatorInfo; typedef struct SStreamEventAggOperatorInfo { @@ -778,6 +827,7 @@ typedef struct SStreamEventAggOperatorInfo { struct SOperatorInfo* pOperator; SNodeList* pStartCondCols; SNodeList* pEndCondCols; + int64_t trueForLimit; } SStreamEventAggOperatorInfo; typedef struct SStreamCountAggOperatorInfo { @@ -1052,7 +1102,8 @@ int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pW int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated); int32_t saveDeleteRes(SSHashObj* pStDelete, SSessionKey key); void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey); -void doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite); +void doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite, + SGroupResInfo* pGroupResInfo); void doBuildSessionResult(struct SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, SSDataBlock* pBlock, SArray* pSessionKeys); int32_t getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo); @@ -1109,6 +1160,7 @@ int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBl int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status); bool getIgoreNullRes(SExprSupp* pExprSup); bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull); +int64_t getMinWindowSize(struct SOperatorInfo* pOperator); #ifdef __cplusplus } diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index e68a91d97d..925a4bd7ff 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -24,22 +24,6 @@ #include "tdatablock.h" #include "ttime.h" -typedef struct SEventWindowOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SExprSupp scalarSup; - SWindowRowsSup winSup; - int32_t tsSlotId; // primary timestamp column slot id - STimeWindowAggSupp twAggSup; - uint64_t groupId; // current group id, used to identify the data block from different groups - SFilterInfo* pStartCondInfo; - SFilterInfo* pEndCondInfo; - bool inWindow; - SResultRow* pRow; - SSDataBlock* pPreDataBlock; - SOperatorInfo* pOperator; -} SEventWindowOperatorInfo; - static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** pRes); static void destroyEWindowOperatorInfo(void* param); static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); @@ -114,8 +98,9 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy pInfo->tsSlotId = tsSlotId; pInfo->pPreDataBlock = NULL; pInfo->pOperator = pOperator; + pInfo->trueForLimit = pEventWindowNode->trueForLimit; - setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, + setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregateNext, NULL, destroyEWindowOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); @@ -297,6 +282,7 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p TSKEY* tsList = (TSKEY*)pColInfoData->pData; SWindowRowsSup* pRowSup = &pInfo->winSup; int32_t rowIndex = 0; + int64_t minWindowSize = getMinWindowSize(pOperator); pRowSup->numOfRows = 0; if (pInfo->groupId == 0) { @@ -341,18 +327,23 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p QUERY_CHECK_CODE(code, lino, _return); doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset); - // check buffer size - if (pRes->info.rows + pInfo->pRow->numOfRows >= pRes->info.capacity) { - int32_t newSize = pRes->info.rows + pInfo->pRow->numOfRows; - code = blockDataEnsureCapacity(pRes, newSize); + if (pRowSup->win.ekey - pRowSup->win.skey < minWindowSize) { + qDebug("skip small window, groupId: %" PRId64 ", windowSize: %" PRId64 ", minWindowSize: %" PRId64, + pInfo->groupId, pRowSup->win.ekey - pRowSup->win.skey, minWindowSize); + } else { + // check buffer size + if (pRes->info.rows + pInfo->pRow->numOfRows >= pRes->info.capacity) { + int32_t newSize = pRes->info.rows + pInfo->pRow->numOfRows; + code = blockDataEnsureCapacity(pRes, newSize); + QUERY_CHECK_CODE(code, lino, _return); + } + + code = copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pRes, + pSup->rowEntryInfoOffset, pTaskInfo); QUERY_CHECK_CODE(code, lino, _return); + + pRes->info.rows += pInfo->pRow->numOfRows; } - - code = copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pRes, - pSup->rowEntryInfoOffset, pTaskInfo); - QUERY_CHECK_CODE(code, lino, _return); - - pRes->info.rows += pInfo->pRow->numOfRows; pInfo->pRow->numOfRows = 0; pInfo->inWindow = false; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 147d62d245..db63f74af4 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -212,6 +212,7 @@ void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) { pGroupResInfo->pRows = NULL; } pGroupResInfo->index = 0; + pGroupResInfo->delIndex = 0; } int32_t resultrowComparAsc(const void* p1, const void* p2) { @@ -303,6 +304,7 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL pGroupResInfo->freeItem = true; pGroupResInfo->pRows = pArrayList; pGroupResInfo->index = 0; + pGroupResInfo->delIndex = 0; } bool hasRemainResults(SGroupResInfo* pGroupResInfo) { diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 51e7cd896a..a7b1b296ac 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -80,7 +80,8 @@ static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* p static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); static void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup); + SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup, + int64_t minWindowSize); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) { SFilePage* pData = NULL; @@ -846,7 +847,7 @@ _end: } void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup) { + SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup, int64_t minWindowSize) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SExprInfo* pExprInfo = pSup->pExprInfo; @@ -874,6 +875,14 @@ void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp releaseBufPage(pBuf, page); continue; } + // skip the window which is less than the windowMinSize + if (pRow->win.ekey - pRow->win.skey < minWindowSize) { + qDebug("skip small window, groupId: %" PRId64 ", windowSize: %" PRId64 ", minWindowSize: %" PRId64, pPos->groupId, + pRow->win.ekey - pRow->win.skey, minWindowSize); + pGroupResInfo->index += 1; + releaseBufPage(pBuf, page); + continue; + } if (!ignoreGroup) { if (pBlock->info.id.groupId == 0) { @@ -937,11 +946,11 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG pBlock->info.id.groupId = 0; if (!pbInfo->mergeResultBlock) { doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, - false); + false, getMinWindowSize(pOperator)); } else { while (hasRemainResults(pGroupResInfo)) { doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, - true); + true, getMinWindowSize(pOperator)); if (pBlock->info.rows >= pOperator->resultInfo.threshold) { break; } diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index c33abb3d89..63ff2fa92b 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -395,7 +395,7 @@ static int32_t buildCountResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { STaskNotifyEventStat* pNotifyEventStat = pTaskInfo->streamInfo.pNotifyEventStat; bool addNotifyEvent = false; addNotifyEvent = BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE); - doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator, &pInfo->groupResInfo); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); if (addNotifyEvent) { diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index ab2aa600bb..d258eb08ff 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -616,8 +616,8 @@ static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SStreamNotifyEventSupp* pNotifySup = &pInfo->basic.notifyEventSup; STaskNotifyEventStat* pNotifyEventStat = pTaskInfo->streamInfo.pNotifyEventStat; bool addNotifyEvent = false; - addNotifyEvent = BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE); - doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + addNotifyEvent = BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE); + doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator, &pInfo->groupResInfo); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); if (addNotifyEvent) { @@ -1075,6 +1075,8 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = nodesCollectColumnsFromNode((SNode*)pEventNode->pEndCond, NULL, COLLECT_COL_TYPE_ALL, &pInfo->pEndCondCols); QUERY_CHECK_CODE(code, lino, _error); + pInfo->trueForLimit = pEventNode->trueForLimit; + *pOptrInfo = pOperator; return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/streamexecutorInt.c b/source/libs/executor/src/streamexecutorInt.c index 635de21b6e..a928521aeb 100644 --- a/source/libs/executor/src/streamexecutorInt.c +++ b/source/libs/executor/src/streamexecutorInt.c @@ -22,13 +22,13 @@ #define NOTIFY_EVENT_NAME_CACHE_LIMIT_MB 16 typedef struct SStreamNotifyEvent { - uint64_t gid; - int64_t eventType; + uint64_t gid; + int64_t eventType; STimeWindow win; - cJSON* pJson; + cJSON* pJson; } SStreamNotifyEvent; -#define NOTIFY_EVENT_KEY_SIZE \ +#define NOTIFY_EVENT_KEY_SIZE \ ((sizeof(((struct SStreamNotifyEvent*)0)->gid) + sizeof(((struct SStreamNotifyEvent*)0)->eventType)) + \ sizeof(((struct SStreamNotifyEvent*)0)->win.skey)) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index fbb55301cd..0f65146a1b 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -462,6 +462,7 @@ void clearGroupResInfo(SGroupResInfo* pGroupResInfo) { taosArrayDestroy(pGroupResInfo->pRows); pGroupResInfo->pRows = NULL; pGroupResInfo->index = 0; + pGroupResInfo->delIndex = 0; } void destroyStreamFinalIntervalOperatorInfo(void* param) { @@ -2864,14 +2865,86 @@ inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) { return 0; } -void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) { +static int32_t appendToDeleteDataBlock(SOperatorInfo* pOp, SSDataBlock *pBlock, SSessionKey *pKey) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - SStorageAPI* pAPI = &pOp->pTaskInfo->storageAPI; SExecTaskInfo* pTaskInfo = pOp->pTaskInfo; + QUERY_CHECK_NULL(pBlock, code, lino, _end, TSDB_CODE_INVALID_PARA); + QUERY_CHECK_NULL(pKey, code, lino, _end, TSDB_CODE_INVALID_PARA); + + SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + code = colDataSetVal(pStartTsCol, pBlock->info.rows, (const char*)&pKey->win.skey, false); + QUERY_CHECK_CODE(code, lino, _end); + + SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + code = colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)&pKey->win.skey, false); + QUERY_CHECK_CODE(code, lino, _end); + + SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); + colDataSetNULL(pUidCol, pBlock->info.rows); + + SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + code = colDataSetVal(pGpCol, pBlock->info.rows, (const char*)&pKey->groupId, false); + QUERY_CHECK_CODE(code, lino, _end); + + SColumnInfoData* pCalStCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); + colDataSetNULL(pCalStCol, pBlock->info.rows); + + SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + colDataSetNULL(pCalEdCol, pBlock->info.rows); + + SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); + if (!pTableCol) { + QUERY_CHECK_CODE(code, lino, _end); + } + + void* tbname = NULL; + int32_t winCode = TSDB_CODE_SUCCESS; + SStorageAPI* pAPI = &pOp->pTaskInfo->storageAPI; + code = + pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, pKey->groupId, &tbname, false, &winCode); + QUERY_CHECK_CODE(code, lino, _end); + + if (winCode != TSDB_CODE_SUCCESS) { + colDataSetNULL(pTableCol, pBlock->info.rows); + } else { + char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; + STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); + code = colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false); + QUERY_CHECK_CODE(code, lino, _end); + pAPI->stateStore.streamStateFreeVal(tbname); + } + pBlock->info.rows += 1; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + } + return code; +} + +void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite, + SGroupResInfo* pGroupResInfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SExecTaskInfo* pTaskInfo = pOp->pTaskInfo; + int64_t minWindowSize = getMinWindowSize(pOp); + int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); + blockDataCleanup(pBlock); int32_t size = tSimpleHashGetSize(pStDeleted); + if (minWindowSize > 0) { + // Add the number of windows that are below the minimum width limit. + for (int32_t i = pGroupResInfo->delIndex; i < numOfRows; ++i) { + SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i); + SRowBuffPos* pPos = pWinInfo->pStatePos; + SSessionKey* pKey = (SSessionKey*)pPos->pKey; + if (pKey->win.ekey - pKey->win.skey < minWindowSize) { + size++; + } + } + } if (size == 0) { return; } @@ -2884,48 +2957,21 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo break; } SSessionKey* res = tSimpleHashGetKey(*Ite, NULL); - SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); - code = colDataSetVal(pStartTsCol, pBlock->info.rows, (const char*)&res->win.skey, false); + code = appendToDeleteDataBlock(pOp, pBlock, res); QUERY_CHECK_CODE(code, lino, _end); + } - SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); - code = colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)&res->win.skey, false); - QUERY_CHECK_CODE(code, lino, _end); - - SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); - colDataSetNULL(pUidCol, pBlock->info.rows); - - SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); - code = colDataSetVal(pGpCol, pBlock->info.rows, (const char*)&res->groupId, false); - QUERY_CHECK_CODE(code, lino, _end); - - SColumnInfoData* pCalStCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); - colDataSetNULL(pCalStCol, pBlock->info.rows); - - SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); - colDataSetNULL(pCalEdCol, pBlock->info.rows); - - SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); - if (!pTableCol) { - QUERY_CHECK_CODE(code, lino, _end); + if (minWindowSize > 0) { + for (int32_t i = pGroupResInfo->delIndex; i < numOfRows; ++i) { + SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i); + SRowBuffPos* pPos = pWinInfo->pStatePos; + SSessionKey* pKey = (SSessionKey*)pPos->pKey; + if (pKey->win.ekey - pKey->win.skey < minWindowSize) { + code = appendToDeleteDataBlock(pOp, pBlock, pKey); + QUERY_CHECK_CODE(code, lino, _end); + } } - - void* tbname = NULL; - int32_t winCode = TSDB_CODE_SUCCESS; - code = pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, res->groupId, &tbname, false, - &winCode); - QUERY_CHECK_CODE(code, lino, _end); - - if (winCode != TSDB_CODE_SUCCESS) { - colDataSetNULL(pTableCol, pBlock->info.rows); - } else { - char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; - STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); - code = colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false); - QUERY_CHECK_CODE(code, lino, _end); - pAPI->stateStore.streamStateFreeVal(tbname); - } - pBlock->info.rows += 1; + pGroupResInfo->delIndex = numOfRows; } _end: @@ -3118,6 +3164,7 @@ void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL pGroupResInfo->index = 0; pGroupResInfo->pBuf = NULL; pGroupResInfo->freeItem = false; + pGroupResInfo->delIndex = 0; } int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup, @@ -3130,6 +3177,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa int32_t numOfExprs = pSup->numOfExprs; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; SqlFunctionCtx* pCtx = pSup->pCtx; + int64_t minWindowSize = getMinWindowSize(pOperator); int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); @@ -3170,6 +3218,13 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa pGroupResInfo->index += 1; continue; } + // skip the window which is less than the windowMinSize + if (pKey->win.ekey - pKey->win.skey < minWindowSize) { + qDebug("skip small window, groupId: %" PRId64 ", windowSize: %" PRId64 ", minWindowSize: %" PRId64, pKey->groupId, + pKey->win.ekey - pKey->win.skey, minWindowSize); + pGroupResInfo->index += 1; + continue; + } if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { break; @@ -3263,7 +3318,7 @@ static int32_t buildSessionResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) bool addNotifyEvent = false; addNotifyEvent = IS_NORMAL_SESSION_OP(pOperator) && BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE); - doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator, &pInfo->groupResInfo); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); if (addNotifyEvent) { @@ -4905,7 +4960,7 @@ static int32_t buildStateResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { STaskNotifyEventStat* pNotifyEventStat = pTaskInfo->streamInfo.pNotifyEventStat; bool addNotifyEvent = false; addNotifyEvent = BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE); - doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator, &pInfo->groupResInfo); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); if (addNotifyEvent) { @@ -5340,6 +5395,8 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = appendDownstream(pOperator, &downstream, 1); QUERY_CHECK_CODE(code, lino, _error); + pInfo->trueForLimit = pStateNode->trueForLimit; + *pOptrInfo = pOperator; return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 49fd557fe3..96ad51149c 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1390,3 +1390,22 @@ void destroyTimeSliceOperatorInfo(void* param) { } taosMemoryFreeClear(param); } + +int64_t getMinWindowSize(struct SOperatorInfo* pOperator) { + if (pOperator == NULL) { + return 0; + } + + switch (pOperator->operatorType) { + case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: + return ((SStateWindowOperatorInfo*)pOperator->info)->trueForLimit; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: + return ((SStreamStateAggOperatorInfo*)pOperator->info)->trueForLimit; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT: + return ((SEventWindowOperatorInfo*)pOperator->info)->trueForLimit; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: + return ((SStreamEventAggOperatorInfo*)pOperator->info)->trueForLimit; + default: + return 0; + } +} diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 71c71a547e..96a3b02464 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -27,35 +27,6 @@ #include "tlog.h" #include "ttime.h" -typedef struct SSessionAggOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SExprSupp scalarSupp; // supporter for perform scalar function - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - bool reptScan; // next round scan - int64_t gap; // session window gap - int32_t tsSlotId; // primary timestamp slot id - STimeWindowAggSupp twAggSup; - SOperatorInfo* pOperator; - bool cleanGroupResInfo; -} SSessionAggOperatorInfo; - -typedef struct SStateWindowOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SExprSupp scalarSup; - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - SColumn stateCol; // start row index - bool hasKey; - SStateKeys stateKey; - int32_t tsSlotId; // primary timestamp column slot id - STimeWindowAggSupp twAggSup; - SOperatorInfo* pOperator; - bool cleanGroupResInfo; -} SStateWindowOperatorInfo; - typedef enum SResultTsInterpType { RESULT_ROW_START_INTERP = 1, RESULT_ROW_END_INTERP = 2, @@ -1743,6 +1714,7 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy pInfo->tsSlotId = tsSlotId; pInfo->pOperator = pOperator; pInfo->cleanGroupResInfo = false; + pInfo->trueForLimit = pStateNode->trueForLimit; setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo, diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 0e7a719c2c..a0dd5ba061 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -354,6 +354,7 @@ static int32_t limitNodeCopy(const SLimitNode* pSrc, SLimitNode* pDst) { static int32_t stateWindowNodeCopy(const SStateWindowNode* pSrc, SStateWindowNode* pDst) { CLONE_NODE_FIELD(pCol); CLONE_NODE_FIELD(pExpr); + CLONE_NODE_FIELD(pTrueForLimit); return TSDB_CODE_SUCCESS; } @@ -361,6 +362,7 @@ static int32_t eventWindowNodeCopy(const SEventWindowNode* pSrc, SEventWindowNod CLONE_NODE_FIELD(pCol); CLONE_NODE_FIELD(pStartCond); CLONE_NODE_FIELD(pEndCond); + CLONE_NODE_FIELD(pTrueForLimit); return TSDB_CODE_SUCCESS; } @@ -627,6 +629,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p CLONE_NODE_FIELD(pStateExpr); CLONE_NODE_FIELD(pStartCond); CLONE_NODE_FIELD(pEndCond); + COPY_SCALAR_FIELD(trueForLimit); COPY_SCALAR_FIELD(triggerType); COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(deleteMark); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 6966f6a463..e906c79a08 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -3127,6 +3127,7 @@ static int32_t jsonToPhysiSessionWindowNode(const SJson* pJson, void* pObj) { } static const char* jkStateWindowPhysiPlanStateKey = "StateKey"; +static const char* jkStateWindowPhysiPlanTrueForLimit = "TrueForLimit"; static int32_t physiStateWindowNodeToJson(const void* pObj, SJson* pJson) { const SStateWinodwPhysiNode* pNode = (const SStateWinodwPhysiNode*)pObj; @@ -3135,6 +3136,9 @@ static int32_t physiStateWindowNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkStateWindowPhysiPlanStateKey, nodeToJson, pNode->pStateKey); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkStateWindowPhysiPlanTrueForLimit, pNode->trueForLimit); + } return code; } @@ -3146,12 +3150,16 @@ static int32_t jsonToPhysiStateWindowNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkStateWindowPhysiPlanStateKey, &pNode->pStateKey); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkStateWindowPhysiPlanTrueForLimit, &pNode->trueForLimit); + } return code; } static const char* jkEventWindowPhysiPlanStartCond = "StartCond"; static const char* jkEventWindowPhysiPlanEndCond = "EndCond"; +static const char* jkEventWindowPhysiPlanTrueForLimit = "TrueForLimit"; static int32_t physiEventWindowNodeToJson(const void* pObj, SJson* pJson) { const SEventWinodwPhysiNode* pNode = (const SEventWinodwPhysiNode*)pObj; @@ -3163,6 +3171,9 @@ static int32_t physiEventWindowNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkEventWindowPhysiPlanEndCond, nodeToJson, pNode->pEndCond); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkEventWindowPhysiPlanTrueForLimit, pNode->trueForLimit); + } return code; } @@ -3177,6 +3188,9 @@ static int32_t jsonToPhysiEventWindowNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkEventWindowPhysiPlanEndCond, &pNode->pEndCond); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkEventWindowPhysiPlanTrueForLimit, &pNode->trueForLimit); + } return code; } @@ -4960,6 +4974,7 @@ static int32_t jsonToLimitNode(const SJson* pJson, void* pObj) { static const char* jkStateWindowCol = "StateWindowCol"; static const char* jkStateWindowExpr = "StateWindowExpr"; +static const char* jkStateWindowTrueForLimit = "TrueForLimit"; static int32_t stateWindowNodeToJson(const void* pObj, SJson* pJson) { const SStateWindowNode* pNode = (const SStateWindowNode*)pObj; @@ -4967,6 +4982,9 @@ static int32_t stateWindowNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkStateWindowExpr, nodeToJson, pNode->pExpr); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkStateWindowTrueForLimit, nodeToJson, pNode->pTrueForLimit); + } return code; } @@ -4977,6 +4995,9 @@ static int32_t jsonToStateWindowNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkStateWindowExpr, (SNode**)&pNode->pExpr); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkStateWindowTrueForLimit, (SNode**)&pNode->pTrueForLimit); + } return code; } @@ -5006,6 +5027,7 @@ static int32_t jsonToSessionWindowNode(const SJson* pJson, void* pObj) { static const char* jkEventWindowTsPrimaryKey = "TsPrimaryKey"; static const char* jkEventWindowStartCond = "StartCond"; static const char* jkEventWindowEndCond = "EndCond"; +static const char* jkEventWindowTrueForLimit = "TrueForLimit"; static int32_t eventWindowNodeToJson(const void* pObj, SJson* pJson) { const SEventWindowNode* pNode = (const SEventWindowNode*)pObj; @@ -5017,6 +5039,9 @@ static int32_t eventWindowNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkEventWindowEndCond, nodeToJson, pNode->pEndCond); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkEventWindowTrueForLimit, nodeToJson, pNode->pTrueForLimit); + } return code; } @@ -5030,6 +5055,9 @@ static int32_t jsonToEventWindowNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkEventWindowEndCond, &pNode->pEndCond); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkEventWindowTrueForLimit, &pNode->pTrueForLimit); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 1becd07aba..6f15322a24 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -3443,7 +3443,7 @@ static int32_t msgToPhysiSessionWindowNode(STlvDecoder* pDecoder, void* pObj) { return code; } -enum { PHY_STATE_CODE_WINDOW = 1, PHY_STATE_CODE_KEY }; +enum { PHY_STATE_CODE_WINDOW = 1, PHY_STATE_CODE_KEY, PHY_STATE_CODE_TRUE_FOR_LIMIT }; static int32_t physiStateWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { const SStateWinodwPhysiNode* pNode = (const SStateWinodwPhysiNode*)pObj; @@ -3452,6 +3452,9 @@ static int32_t physiStateWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_STATE_CODE_KEY, nodeToMsg, pNode->pStateKey); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_STATE_CODE_TRUE_FOR_LIMIT, pNode->trueForLimit); + } return code; } @@ -3469,6 +3472,9 @@ static int32_t msgToPhysiStateWindowNode(STlvDecoder* pDecoder, void* pObj) { case PHY_STATE_CODE_KEY: code = msgToNodeFromTlv(pTlv, (void**)&pNode->pStateKey); break; + case PHY_STATE_CODE_TRUE_FOR_LIMIT: + code = tlvDecodeI64(pTlv, &pNode->trueForLimit); + break; default: break; } @@ -3477,7 +3483,7 @@ static int32_t msgToPhysiStateWindowNode(STlvDecoder* pDecoder, void* pObj) { return code; } -enum { PHY_EVENT_CODE_WINDOW = 1, PHY_EVENT_CODE_START_COND, PHY_EVENT_CODE_END_COND }; +enum { PHY_EVENT_CODE_WINDOW = 1, PHY_EVENT_CODE_START_COND, PHY_EVENT_CODE_END_COND, PHY_EVENT_CODE_TRUE_FOR_LIMIT }; static int32_t physiEventWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { const SEventWinodwPhysiNode* pNode = (const SEventWinodwPhysiNode*)pObj; @@ -3489,6 +3495,9 @@ static int32_t physiEventWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_EVENT_CODE_END_COND, nodeToMsg, pNode->pEndCond); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_EVENT_CODE_TRUE_FOR_LIMIT, pNode->trueForLimit); + } return code; } @@ -3509,6 +3518,9 @@ static int32_t msgToPhysiEventWindowNode(STlvDecoder* pDecoder, void* pObj) { case PHY_EVENT_CODE_END_COND: code = msgToNodeFromTlv(pTlv, (void**)&pNode->pEndCond); break; + case PHY_EVENT_CODE_TRUE_FOR_LIMIT: + code = tlvDecodeI64(pTlv, &pNode->trueForLimit); + break; default: break; } diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index f3f7395a37..33b27d1414 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -102,6 +102,9 @@ static EDealRes dispatchExpr(SNode* pNode, ETraversalOrder order, FNodeWalker wa if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkExpr(pState->pCol, order, walker, pContext); } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkExpr(pState->pTrueForLimit, order, walker, pContext); + } break; } case QUERY_NODE_SESSION_WINDOW: { @@ -174,6 +177,9 @@ static EDealRes dispatchExpr(SNode* pNode, ETraversalOrder order, FNodeWalker wa if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkExpr(pEvent->pEndCond, order, walker, pContext); } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkExpr(pEvent->pTrueForLimit, order, walker, pContext); + } break; } case QUERY_NODE_COUNT_WINDOW: { @@ -313,6 +319,9 @@ static EDealRes rewriteExpr(SNode** pRawNode, ETraversalOrder order, FNodeRewrit if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteExpr(&pState->pCol, order, rewriter, pContext); } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = rewriteExpr(&pState->pTrueForLimit, order, rewriter, pContext); + } break; } case QUERY_NODE_SESSION_WINDOW: { @@ -385,6 +394,9 @@ static EDealRes rewriteExpr(SNode** pRawNode, ETraversalOrder order, FNodeRewrit if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteExpr(&pEvent->pEndCond, order, rewriter, pContext); } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = rewriteExpr(&pEvent->pTrueForLimit, order, rewriter, pContext); + } break; } case QUERY_NODE_WINDOW_OFFSET: { diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 9473e75642..01a3d41f61 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1125,6 +1125,7 @@ void nodesDestroyNode(SNode* pNode) { SStateWindowNode* pState = (SStateWindowNode*)pNode; nodesDestroyNode(pState->pCol); nodesDestroyNode(pState->pExpr); + nodesDestroyNode(pState->pTrueForLimit); break; } case QUERY_NODE_SESSION_WINDOW: { @@ -1239,6 +1240,7 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pEvent->pCol); nodesDestroyNode(pEvent->pStartCond); nodesDestroyNode(pEvent->pEndCond); + nodesDestroyNode(pEvent->pTrueForLimit); break; } case QUERY_NODE_COUNT_WINDOW: { diff --git a/source/libs/nodes/test/nodesCloneTest.cpp b/source/libs/nodes/test/nodesCloneTest.cpp index ea0201f2ef..6ac6c72f48 100644 --- a/source/libs/nodes/test/nodesCloneTest.cpp +++ b/source/libs/nodes/test/nodesCloneTest.cpp @@ -91,6 +91,7 @@ TEST_F(NodesCloneTest, stateWindow) { SStateWindowNode* pDstNode = (SStateWindowNode*)pDst; ASSERT_EQ(nodeType(pSrcNode->pCol), nodeType(pDstNode->pCol)); ASSERT_EQ(nodeType(pSrcNode->pExpr), nodeType(pDstNode->pExpr)); + ASSERT_EQ(nodeType(pSrcNode->pTrueForLimit), nodeType(pDstNode->pTrueForLimit)); }); std::unique_ptr srcNode(nullptr, nodesDestroyNode); @@ -102,6 +103,7 @@ TEST_F(NodesCloneTest, stateWindow) { SStateWindowNode* pNode = (SStateWindowNode*)srcNode.get(); code = nodesMakeNode(QUERY_NODE_COLUMN, &pNode->pCol); code = nodesMakeNode(QUERY_NODE_OPERATOR, &pNode->pExpr); + code = nodesMakeNode(QUERY_NODE_VALUE, &pNode->pTrueForLimit); return srcNode.get(); }()); } diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index d99dfc977a..0931ca69cd 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -155,8 +155,8 @@ SNode* createViewNode(SAstCreateContext* pCxt, SToken* pDbName, SToken* pVie SNode* createLimitNode(SAstCreateContext* pCxt, SNode* pLimit, SNode* pOffset); SNode* createOrderByExprNode(SAstCreateContext* pCxt, SNode* pExpr, EOrder order, ENullOrder nullOrder); SNode* createSessionWindowNode(SAstCreateContext* pCxt, SNode* pCol, SNode* pGap); -SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr); -SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond); +SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr, SNode *pTrueForLimit); +SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond, SNode *pTrueForLimit); SNode* createCountWindowNode(SAstCreateContext* pCxt, const SToken* pCountToken, const SToken* pSlidingToken); SNode* createAnomalyWindowNode(SAstCreateContext* pCxt, SNode* pExpr, const SToken* pFuncOpt); SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding, diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index f7dcd1b4ce..8dc33f14e1 100755 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -1628,7 +1628,8 @@ partition_item(A) ::= expr_or_subquery(B) AS column_alias(C). twindow_clause_opt(A) ::= . { A = NULL; } twindow_clause_opt(A) ::= SESSION NK_LP column_reference(B) NK_COMMA interval_sliding_duration_literal(C) NK_RP. { A = createSessionWindowNode(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)); } -twindow_clause_opt(A) ::= STATE_WINDOW NK_LP expr_or_subquery(B) NK_RP. { A = createStateWindowNode(pCxt, releaseRawExprNode(pCxt, B)); } +twindow_clause_opt(A) ::= + STATE_WINDOW NK_LP expr_or_subquery(B) NK_RP true_for_opt(C). { A = createStateWindowNode(pCxt, releaseRawExprNode(pCxt, B), C); } twindow_clause_opt(A) ::= INTERVAL NK_LP interval_sliding_duration_literal(B) NK_RP sliding_opt(C) fill_opt(D). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), NULL, C, D); } twindow_clause_opt(A) ::= @@ -1637,9 +1638,9 @@ twindow_clause_opt(A) ::= sliding_opt(D) fill_opt(E). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C), D, E); } twindow_clause_opt(A) ::= INTERVAL NK_LP interval_sliding_duration_literal(B) NK_COMMA - AUTO(C) NK_RP sliding_opt(D) fill_opt(E). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), createDurationValueNode(pCxt, &C), D, E); } -twindow_clause_opt(A) ::= - EVENT_WINDOW START WITH search_condition(B) END WITH search_condition(C). { A = createEventWindowNode(pCxt, B, C); } + AUTO(C) NK_RP sliding_opt(D) fill_opt(E). { A = createIntervalWindowNode(pCxt, releaseRawExprNode(pCxt, B), createDurationValueNode(pCxt, &C), D, E); } +twindow_clause_opt(A) ::= EVENT_WINDOW START WITH search_condition(B) + END WITH search_condition(C) true_for_opt(D). { A = createEventWindowNode(pCxt, B, C, D); } twindow_clause_opt(A) ::= COUNT_WINDOW NK_LP NK_INTEGER(B) NK_RP. { A = createCountWindowNode(pCxt, &B, &B); } twindow_clause_opt(A) ::= @@ -1717,6 +1718,9 @@ range_opt(A) ::= every_opt(A) ::= . { A = NULL; } every_opt(A) ::= EVERY NK_LP duration_literal(B) NK_RP. { A = releaseRawExprNode(pCxt, B); } +true_for_opt(A) ::= . { A = NULL; } +true_for_opt(A) ::= TRUE_FOR NK_LP interval_sliding_duration_literal(B) NK_RP. { A = releaseRawExprNode(pCxt, B); } + /************************************************ query_expression ****************************************************/ query_expression(A) ::= query_simple(B) order_by_clause_opt(C) slimit_clause_opt(D) limit_clause_opt(E). { diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index ec0b38a5e7..c42f81f231 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -1332,7 +1332,7 @@ _err: return NULL; } -SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr) { +SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr, SNode* pTrueForLimit) { SStateWindowNode* state = NULL; CHECK_PARSER_STATUS(pCxt); pCxt->errCode = nodesMakeNode(QUERY_NODE_STATE_WINDOW, (SNode**)&state); @@ -1340,14 +1340,16 @@ SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pExpr) { state->pCol = createPrimaryKeyCol(pCxt, NULL); CHECK_MAKE_NODE(state->pCol); state->pExpr = pExpr; + state->pTrueForLimit = pTrueForLimit; return (SNode*)state; _err: nodesDestroyNode((SNode*)state); nodesDestroyNode(pExpr); + nodesDestroyNode(pTrueForLimit); return NULL; } -SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond) { +SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* pEndCond, SNode* pTrueForLimit) { SEventWindowNode* pEvent = NULL; CHECK_PARSER_STATUS(pCxt); pCxt->errCode = nodesMakeNode(QUERY_NODE_EVENT_WINDOW, (SNode**)&pEvent); @@ -1356,11 +1358,13 @@ SNode* createEventWindowNode(SAstCreateContext* pCxt, SNode* pStartCond, SNode* CHECK_MAKE_NODE(pEvent->pCol); pEvent->pStartCond = pStartCond; pEvent->pEndCond = pEndCond; + pEvent->pTrueForLimit = pTrueForLimit; return (SNode*)pEvent; _err: nodesDestroyNode((SNode*)pEvent); nodesDestroyNode(pStartCond); nodesDestroyNode(pEndCond); + nodesDestroyNode(pTrueForLimit); return NULL; } diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index 3b08d403dc..7d748463a2 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -359,6 +359,7 @@ static SKeyword keywordTable[] = { {"ON_FAILURE", TK_ON_FAILURE}, {"NOTIFY_HISTORY", TK_NOTIFY_HISTORY}, {"REGEXP", TK_REGEXP}, + {"TRUE_FOR", TK_TRUE_FOR} }; // clang-format on diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 142529830a..c46d0ffdc4 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6043,6 +6043,20 @@ static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* p return TSDB_CODE_SUCCESS; } +static int32_t checkTrueForLimit(STranslateContext *pCxt, SNode *pNode) { + SValueNode *pTrueForLimit = (SValueNode *)pNode; + if (pTrueForLimit == NULL) { + return TSDB_CODE_SUCCESS; + } + if (pTrueForLimit->datum.i < 0) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TRUE_FOR_NEGATIVE); + } + if (IS_CALENDAR_TIME_DURATION(pTrueForLimit->unit)) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TRUE_FOR_UNIT); + } + return TSDB_CODE_SUCCESS; +} + static int32_t translateStateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) && !isGlobalTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) { @@ -6055,6 +6069,9 @@ static int32_t translateStateWindow(STranslateContext* pCxt, SSelectStmt* pSelec if (TSDB_CODE_SUCCESS == code) { code = checkStateWindowForStream(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + code = checkTrueForLimit(pCxt, pState->pTrueForLimit); + } return code; } @@ -6081,7 +6098,7 @@ static int32_t translateEventWindow(STranslateContext* pCxt, SSelectStmt* pSelec return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TIMELINE_QUERY, "EVENT_WINDOW requires valid time series input"); } - return TSDB_CODE_SUCCESS; + return checkTrueForLimit(pCxt, ((SEventWindowNode*)pSelect->pWindow)->pTrueForLimit); } static int32_t translateCountWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 0cda428487..a5758a17c3 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -227,6 +227,10 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Some functions cannot appear in the select list at the same time"; case TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR: return "Syntax error in regular expression"; + case TSDB_CODE_PAR_TRUE_FOR_NEGATIVE: + return "True_for duration cannot be negative"; + case TSDB_CODE_PAR_TRUE_FOR_UNIT: + return "Cannot use 'year' or 'month' as true_for duration"; default: return "Unknown error"; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 3f064f2b66..ba2f29a240 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1159,6 +1159,9 @@ static int32_t createWindowLogicNodeByState(SLogicPlanContext* pCxt, SStateWindo nodesDestroyNode((SNode*)pWindow); return code; } + if (pState->pTrueForLimit) { + pWindow->trueForLimit = ((SValueNode*)pState->pTrueForLimit)->datum.i; + } // rewrite the expression in subsequent clauses code = rewriteExprForSelect(pWindow->pStateExpr, pSelect, SQL_CLAUSE_WINDOW); if (TSDB_CODE_SUCCESS == code) { @@ -1272,6 +1275,9 @@ static int32_t createWindowLogicNodeByEvent(SLogicPlanContext* pCxt, SEventWindo nodesDestroyNode((SNode*)pWindow); return TSDB_CODE_OUT_OF_MEMORY; } + if (pEvent->pTrueForLimit) { + pWindow->trueForLimit = ((SValueNode*)pEvent->pTrueForLimit)->datum.i; + } return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode); } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 316699bba1..4c825889f0 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -2328,6 +2328,8 @@ static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC // } } + pState->trueForLimit = pWindowLogicNode->trueForLimit; + if (TSDB_CODE_SUCCESS == code) { code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode); } @@ -2358,6 +2360,7 @@ static int32_t createEventWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC if (TSDB_CODE_SUCCESS == code) { code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pEndCond, &pEvent->pEndCond); } + pEvent->trueForLimit = pWindowLogicNode->trueForLimit; if (TSDB_CODE_SUCCESS == code) { code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pEvent->window, pWindowLogicNode); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index f34b00bec5..cc9e8d7154 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -752,6 +752,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT, "ANOMALY_WINDOW opti TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE, "Invalid forecast clause") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_REGULAR_EXPRESSION_ERROR, "Syntax error in regular expression") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VGID_LIST, "Invalid vgid list") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TRUE_FOR_NEGATIVE, "True_for duration cannot be negative") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TRUE_FOR_UNIT, "Cannot use 'year' or 'month' as true_for duration") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") //planner diff --git a/source/util/test/errorCodeTable.ini b/source/util/test/errorCodeTable.ini index f67c8ab834..d8b8efcd70 100644 --- a/source/util/test/errorCodeTable.ini +++ b/source/util/test/errorCodeTable.ini @@ -554,6 +554,8 @@ TSDB_CODE_PAR_COL_PK_TYPE = 0x80002679 TSDB_CODE_PAR_INVALID_PK_OP = 0x8000267A TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL = 0x8000267B TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE = 0x8000267C +TSDB_CODE_PAR_TRUE_FOR_NEGATIVE = 0x80002687 +TSDB_CODE_PAR_TRUE_FOR_UNIT = 0x80002688 TSDB_CODE_PAR_INTERNAL_ERROR = 0x800026FF TSDB_CODE_PLAN_INTERNAL_ERROR = 0x80002700 TSDB_CODE_PLAN_EXPECTED_TS_EQUAL = 0x80002701 diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 26cc122021..de87aa803f 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1252,6 +1252,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/operator.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/operator.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f eco-system/manager/schema_change.py -N 3 -M 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_window_true_for.py #tsim test ,,y,script,./test.sh -f tsim/query/timeline.sim diff --git a/tests/system-test/2-query/test_window_true_for.py b/tests/system-test/2-query/test_window_true_for.py new file mode 100644 index 0000000000..2db1ac6531 --- /dev/null +++ b/tests/system-test/2-query/test_window_true_for.py @@ -0,0 +1,488 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import time + +import taos +from util.log import * +from util.cases import * +from util.sql import * + +class TDTestCase: + # init + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + + def create_objects(self): + tdSql.execute("drop database if exists test", show=True) + tdSql.execute("create database test keep 36500 precision 'ms'", show=True) + tdSql.execute("use test", show=True) + tdSql.execute("create stable st (ts timestamp, c1 int) tags (gid int)", show=True) + tdSql.execute("create table ct_0 using st(gid) tags (0)") + tdSql.execute("create table ct_1 using st(gid) tags (1)") + + tdSql.execute(f'''create stream s_event_1 into d_event_1 as + select _wstart, _wend, count(*) from ct_0 + event_window start with c1 > 0 end with c1 < 0 true_for(3s);''', show=True) + tdSql.execute(f'''create stream s_event_2 ignore update 0 ignore expired 0 into d_event_2 as + select _wstart, _wend, count(*) from ct_0 + event_window start with c1 > 0 end with c1 < 0 true_for(3s);''', show=True) + + tdSql.execute(f'''create stream s_event_3 into d_event_3 as + select _wstart, _wend, count(*) from ct_0 + event_window start with c1 > 0 end with c1 < 0 true_for(2999);''', show=True) + tdSql.execute(f'''create stream s_event_4 ignore update 0 ignore expired 0 into d_event_4 as + select _wstart, _wend, count(*) from ct_0 + event_window start with c1 > 0 end with c1 < 0 true_for(2999);''', show=True) + + tdSql.execute(f'''create stream s_event_5 into d_event_5 as + select _wstart, _wend, count(*) from ct_0 + event_window start with c1 > 0 end with c1 < 0 true_for('3001a');''', show=True) + tdSql.execute(f'''create stream s_event_6 ignore update 0 ignore expired 0 into d_event_6 as + select _wstart, _wend, count(*) from ct_0 + event_window start with c1 > 0 end with c1 < 0 true_for('3001a');''', show=True) + + tdSql.execute(f'''create stream s_state_1 into d_state_1 as + select _wstart, _wend, count(*) from ct_1 + state_window(c1) true_for (3s);''', show=True) + tdSql.execute(f'''create stream s_state_2 ignore update 0 ignore expired 0 into d_state_2 as + select _wstart, _wend, count(*) from ct_1 + state_window(c1) true_for (3s);''', show=True) + + tdSql.execute(f'''create stream s_state_3 into d_state_3 as + select _wstart, _wend, count(*) from ct_1 + state_window(c1) true_for (2999);''', show=True) + tdSql.execute(f'''create stream s_state_4 ignore update 0 ignore expired 0 into d_state_4 as + select _wstart, _wend, count(*) from ct_1 + state_window(c1) true_for (2999);''', show=True) + + tdSql.execute(f'''create stream s_state_5 into d_state_5 as + select _wstart, _wend, count(*) from ct_1 + state_window(c1) true_for ('3001a');''', show=True) + tdSql.execute(f'''create stream s_state_6 ignore update 0 ignore expired 0 into d_state_6 as + select _wstart, _wend, count(*) from ct_1 + state_window(c1) true_for ('3001a');''', show=True) + # Wait for the stream tasks to be ready + for i in range(50): + tdLog.info(f"i={i} wait for stream tasks ready ...") + time.sleep(1) + rows = tdSql.query("select * from information_schema.ins_stream_tasks where status <> 'ready';") + if rows == 0: + break + + def insert_data(self): + tdSql.execute(f'''insert into ct_0 values + ('2025-01-01 00:00:00.000', -1), + ('2025-01-01 00:00:01.000', 1), + ('2025-01-01 00:00:02.000', -1), + ('2025-01-01 00:00:03.000', 1), + ('2025-01-01 00:00:04.000', 1), + ('2025-01-01 00:00:05.000', -1), + ('2025-01-01 00:00:06.000', 1), + ('2025-01-01 00:00:07.000', 1), + ('2025-01-01 00:00:08.000', 1), + ('2025-01-01 00:00:08.999', -1), + ('2025-01-01 00:00:10.000', 1), + ('2025-01-01 00:00:11.000', 1), + ('2025-01-01 00:00:12.000', 1), + ('2025-01-01 00:00:13.000', -1), + ('2025-01-01 00:00:14.000', 1), + ('2025-01-01 00:00:15.000', 1), + ('2025-01-01 00:00:16.000', 1), + ('2025-01-01 00:00:17.001', -1), + ('2025-01-01 00:00:18.000', 1), + ('2025-01-01 00:00:19.000', 1), + ('2025-01-01 00:00:20.000', 1), + ('2025-01-01 00:00:21.000', 1), + ('2025-01-01 00:00:22.000', -1), + ('2025-01-01 00:00:23.000', -1), + ('2025-01-01 00:00:24.000', 1), + ('2025-01-01 00:00:25.000', 1), + ('2025-01-01 00:00:26.000', 1), + ('2025-01-01 00:00:27.000', 1), + ('2025-01-01 00:00:28.000', 1), + ('2025-01-01 00:00:29.000', 1), + ('2025-01-01 00:00:30.000', -1), + ('2025-01-01 00:00:31.000', 0);''', show=True) + tdSql.execute(f'''insert into ct_1 values + ('2025-01-01 00:00:00.000', 0), + ('2025-01-01 00:00:01.000', 1), + ('2025-01-01 00:00:02.000', 1), + ('2025-01-01 00:00:03.000', 2), + ('2025-01-01 00:00:04.000', 2), + ('2025-01-01 00:00:05.000', 2), + ('2025-01-01 00:00:06.000', 3), + ('2025-01-01 00:00:07.000', 3), + ('2025-01-01 00:00:08.000', 3), + ('2025-01-01 00:00:08.999', 3), + ('2025-01-01 00:00:10.000', 4), + ('2025-01-01 00:00:11.000', 4), + ('2025-01-01 00:00:12.000', 4), + ('2025-01-01 00:00:13.000', 4), + ('2025-01-01 00:00:14.000', 5), + ('2025-01-01 00:00:15.000', 5), + ('2025-01-01 00:00:16.000', 5), + ('2025-01-01 00:00:17.001', 5), + ('2025-01-01 00:00:18.000', 6), + ('2025-01-01 00:00:19.000', 6), + ('2025-01-01 00:00:20.000', 6), + ('2025-01-01 00:00:21.000', 6), + ('2025-01-01 00:00:22.000', 6), + ('2025-01-01 00:00:23.000', 0), + ('2025-01-01 00:00:24.000', 7), + ('2025-01-01 00:00:25.000', 7), + ('2025-01-01 00:00:26.000', 7), + ('2025-01-01 00:00:27.000', 7), + ('2025-01-01 00:00:28.000', 7), + ('2025-01-01 00:00:29.000', 7), + ('2025-01-01 00:00:30.000', 7), + ('2025-01-01 00:00:31.000', 0);''', show=True) + tdLog.info("wait for all stream tasks to be ready ...") + time.sleep(10) + + def update_data(self): + tdSql.execute(f'''insert into ct_0 values + ('2025-01-01 00:00:00.000', 1), + ('2025-01-01 00:00:22.000', 1), + ('2025-01-01 00:00:28.000', -1);''', show=True) + tdSql.execute(f'''insert into ct_1 values + ('2025-01-01 00:00:00.000', 1), + ('2025-01-01 00:00:23.000', 6), + ('2025-01-01 00:00:29.000', 8), + ('2025-01-01 00:00:30.000', 8);''', show=True) + tdLog.info("wait for all stream tasks to be ready ...") + time.sleep(5) + + def check_result(self): + tdSql.query("select * from d_event_1", show=True) + tdSql.checkRows(4) + tdSql.checkData(0, 0, '2025-01-01 00:00:10.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:13.000') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(1, 2, 4) + tdSql.checkData(2, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:22.000') + tdSql.checkData(2, 2, 5) + tdSql.checkData(3, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(3, 1, '2025-01-01 00:00:30.000') + tdSql.checkData(3, 2, 7) + + tdSql.query("select * from d_event_2", show=True) + tdSql.checkRows(4) + tdSql.checkData(0, 0, '2025-01-01 00:00:10.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:13.000') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(1, 2, 4) + tdSql.checkData(2, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:23.000') + tdSql.checkData(2, 2, 6) + tdSql.checkData(3, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(3, 1, '2025-01-01 00:00:28.000') + tdSql.checkData(3, 2, 5) + + tdSql.query("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(3s);", show=True) + tdSql.checkRows(4) + tdSql.checkData(0, 0, '2025-01-01 00:00:10.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:13.000') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(1, 2, 4) + tdSql.checkData(2, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:23.000') + tdSql.checkData(2, 2, 6) + tdSql.checkData(3, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(3, 1, '2025-01-01 00:00:28.000') + tdSql.checkData(3, 2, 5) + + tdSql.query("select * from d_event_3", show=True) + tdSql.checkRows(5) + tdSql.checkData(0, 0, '2025-01-01 00:00:06.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:08.999') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:10.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:13.000') + tdSql.checkData(1, 2, 4) + tdSql.checkData(2, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(2, 2, 4) + tdSql.checkData(3, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(3, 1, '2025-01-01 00:00:22.000') + tdSql.checkData(3, 2, 5) + tdSql.checkData(4, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(4, 1, '2025-01-01 00:00:30.000') + tdSql.checkData(4, 2, 7) + + tdSql.query("select * from d_event_4", show=True) + tdSql.checkRows(5) + tdSql.checkData(0, 0, '2025-01-01 00:00:06.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:08.999') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:10.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:13.000') + tdSql.checkData(1, 2, 4) + tdSql.checkData(2, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(2, 2, 4) + tdSql.checkData(3, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(3, 1, '2025-01-01 00:00:23.000') + tdSql.checkData(3, 2, 6) + tdSql.checkData(4, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(4, 1, '2025-01-01 00:00:28.000') + tdSql.checkData(4, 2, 5) + + tdSql.query("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(2999);", show=True) + tdSql.checkRows(5) + tdSql.checkData(0, 0, '2025-01-01 00:00:06.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:08.999') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:10.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:13.000') + tdSql.checkData(1, 2, 4) + tdSql.checkData(2, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(2, 2, 4) + tdSql.checkData(3, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(3, 1, '2025-01-01 00:00:23.000') + tdSql.checkData(3, 2, 6) + tdSql.checkData(4, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(4, 1, '2025-01-01 00:00:28.000') + tdSql.checkData(4, 2, 5) + + tdSql.query("select * from d_event_5", show=True) + tdSql.checkRows(3) + tdSql.checkData(0, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:22.000') + tdSql.checkData(1, 2, 5) + tdSql.checkData(2, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:30.000') + tdSql.checkData(2, 2, 7) + + tdSql.query("select * from d_event_6", show=True) + tdSql.checkRows(3) + tdSql.checkData(0, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:23.000') + tdSql.checkData(1, 2, 6) + tdSql.checkData(2, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:28.000') + tdSql.checkData(2, 2, 5) + + tdSql.query("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for('3001a');", show=True) + tdSql.checkRows(3) + tdSql.checkData(0, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:23.000') + tdSql.checkData(1, 2, 6) + tdSql.checkData(2, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:28.000') + tdSql.checkData(2, 2, 5) + + tdSql.query("select * from d_state_1", show=True) + tdSql.checkRows(4) + tdSql.checkData(0, 0, '2025-01-01 00:00:10.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:13.000') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(1, 2, 4) + tdSql.checkData(2, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:22.000') + tdSql.checkData(2, 2, 5) + tdSql.checkData(3, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(3, 1, '2025-01-01 00:00:30.000') + tdSql.checkData(3, 2, 7) + + tdSql.query("select * from d_state_2", show=True) + tdSql.checkRows(4) + tdSql.checkData(0, 0, '2025-01-01 00:00:10.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:13.000') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(1, 2, 4) + tdSql.checkData(2, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:23.000') + tdSql.checkData(2, 2, 6) + tdSql.checkData(3, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(3, 1, '2025-01-01 00:00:28.000') + tdSql.checkData(3, 2, 5) + + tdSql.query("select _wstart, _wend, count(*) from ct_1 state_window(c1) true_for(3s);", show=True) + tdSql.checkRows(4) + tdSql.checkData(0, 0, '2025-01-01 00:00:10.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:13.000') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(1, 2, 4) + tdSql.checkData(2, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:23.000') + tdSql.checkData(2, 2, 6) + tdSql.checkData(3, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(3, 1, '2025-01-01 00:00:28.000') + tdSql.checkData(3, 2, 5) + + tdSql.query("select * from d_state_3", show=True) + tdSql.checkRows(5) + tdSql.checkData(0, 0, '2025-01-01 00:00:06.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:08.999') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:10.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:13.000') + tdSql.checkData(1, 2, 4) + tdSql.checkData(2, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(2, 2, 4) + tdSql.checkData(3, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(3, 1, '2025-01-01 00:00:22.000') + tdSql.checkData(3, 2, 5) + tdSql.checkData(4, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(4, 1, '2025-01-01 00:00:30.000') + tdSql.checkData(4, 2, 7) + + tdSql.query("select * from d_state_4", show=True) + tdSql.checkRows(5) + tdSql.checkData(0, 0, '2025-01-01 00:00:06.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:08.999') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:10.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:13.000') + tdSql.checkData(1, 2, 4) + tdSql.checkData(2, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(2, 2, 4) + tdSql.checkData(3, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(3, 1, '2025-01-01 00:00:23.000') + tdSql.checkData(3, 2, 6) + tdSql.checkData(4, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(4, 1, '2025-01-01 00:00:28.000') + tdSql.checkData(4, 2, 5) + + tdSql.query("select _wstart, _wend, count(*) from ct_1 state_window(c1) true_for(2999);", show=True) + tdSql.checkRows(5) + tdSql.checkData(0, 0, '2025-01-01 00:00:06.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:08.999') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:10.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:13.000') + tdSql.checkData(1, 2, 4) + tdSql.checkData(2, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(2, 2, 4) + tdSql.checkData(3, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(3, 1, '2025-01-01 00:00:23.000') + tdSql.checkData(3, 2, 6) + tdSql.checkData(4, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(4, 1, '2025-01-01 00:00:28.000') + tdSql.checkData(4, 2, 5) + + tdSql.query("select * from d_state_5", show=True) + tdSql.checkRows(3) + tdSql.checkData(0, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:22.000') + tdSql.checkData(1, 2, 5) + tdSql.checkData(2, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:30.000') + tdSql.checkData(2, 2, 7) + + tdSql.query("select * from d_state_6", show=True) + tdSql.checkRows(3) + tdSql.checkData(0, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:23.000') + tdSql.checkData(1, 2, 6) + tdSql.checkData(2, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:28.000') + tdSql.checkData(2, 2, 5) + + tdSql.query("select _wstart, _wend, count(*) from ct_1 state_window(c1) true_for('3001a');", show=True) + tdSql.checkRows(3) + tdSql.checkData(0, 0, '2025-01-01 00:00:14.000') + tdSql.checkData(0, 1, '2025-01-01 00:00:17.001') + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 0, '2025-01-01 00:00:18.000') + tdSql.checkData(1, 1, '2025-01-01 00:00:23.000') + tdSql.checkData(1, 2, 6) + tdSql.checkData(2, 0, '2025-01-01 00:00:24.000') + tdSql.checkData(2, 1, '2025-01-01 00:00:28.000') + tdSql.checkData(2, 2, 5) + + def test_abnormal_query(self): + tdLog.info("test abnormal window true_for limit") + tdSql.error("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(3n);") + tdSql.error("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(3y);") + tdSql.error("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(-1);") + tdSql.error("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(-1a);") + tdSql.error("select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for('-1a');") + tdSql.error("create stream s_ab into dst as select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(3n);") + tdSql.error("create stream s_ab into dst as select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(3y);") + tdSql.error("create stream s_ab into dst as select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(-1);") + tdSql.error("create stream s_ab into dst as select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for(-1a);") + tdSql.error("create stream s_ab into dst as select _wstart, _wend, count(*) from ct_0 event_window start with c1 > 0 end with c1 < 0 true_for('-1a');") + + def test_window_true_for_limit(self): + """ Test the functionality of the true_for window function. + + This test covers: + 1. Both batch query and stream computing scenarios. + 2. Two types of windows: event_window and state_window. + 3. Parameter types for true_for: numeric and string. + 4. Boundary value tests. + 5. Error case tests. + + Since: v3.3.6.0 + + Labels: true_for, state_window, event_window + + Jira: TS-5470 + + History: + - 2025-02-21 Kuang Jinqing Created + """ + self.create_objects() + self.insert_data() + self.update_data() + self.check_result() + self.test_abnormal_query() + + # run + def run(self): + self.test_window_true_for_limit() + + # stop + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())