From 3239a10b76c43e88b6e4fc12fff39d6c433115df Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Thu, 28 Nov 2024 18:29:20 +0800 Subject: [PATCH] support interp fill extension --- docs/en/14-reference/03-taos-sql/06-select.md | 10 +- .../14-reference/03-taos-sql/10-function.md | 3 + docs/zh/14-reference/03-taos-sql/06-select.md | 10 +- .../14-reference/03-taos-sql/10-function.md | 3 + include/common/tmsg.h | 2 + include/libs/function/functionMgt.h | 2 + include/libs/nodes/plannodes.h | 4 + include/libs/nodes/querynodes.h | 9 +- source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/src/executil.c | 3 + source/libs/executor/src/timesliceoperator.c | 139 +++-- source/libs/function/src/builtins.c | 14 + source/libs/function/src/functionMgt.c | 7 + source/libs/nodes/src/nodesCloneFuncs.c | 2 + source/libs/nodes/src/nodesCodeFuncs.c | 28 + source/libs/nodes/src/nodesMsgFuncs.c | 16 +- source/libs/nodes/src/nodesUtilFuncs.c | 9 + source/libs/parser/inc/parAst.h | 1 + source/libs/parser/inc/sql.y | 33 +- source/libs/parser/src/parAstCreater.c | 25 +- source/libs/parser/src/parTokenizer.c | 2 + source/libs/parser/src/parTranslater.c | 122 ++++- source/libs/planner/src/planLogicCreater.c | 10 + source/libs/planner/src/planPhysiCreater.c | 2 + tests/parallel_test/cases.task | 5 + tests/system-test/2-query/interp_extension.py | 518 ++++++++++++++++++ 26 files changed, 922 insertions(+), 58 deletions(-) create mode 100644 tests/system-test/2-query/interp_extension.py diff --git a/docs/en/14-reference/03-taos-sql/06-select.md b/docs/en/14-reference/03-taos-sql/06-select.md index e6e1ca65c2..6b3ab751ec 100644 --- a/docs/en/14-reference/03-taos-sql/06-select.md +++ b/docs/en/14-reference/03-taos-sql/06-select.md @@ -62,7 +62,8 @@ window_clause: { | COUNT_WINDOW(count_val[, sliding_val]) interp_clause: - RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val) + RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val) + | RANGE(ts_val, interval_val) FILL(fill_mod_and_val) partition_by_clause: PARTITION BY partition_by_expr [, partition_by_expr] ... @@ -256,6 +257,13 @@ The \_irowts pseudo column can only be used with the interp function to return t SELECT _irowts, interp(current) FROM meters RANGE('2020-01-01 10:00:00', '2020-01-01 10:30:00') EVERY(1s) FILL(linear); ``` +** \_IROWTS\_ORIGIN** +Pseudo column `_irowts_origin` is used to get the original timestamp of the row used for filling. It can only be used with the INTERP query. `_irowts_origin` is not supported in stream. Only FILL PREV/NEXT/NEAR is supported. If there is not data in range, return NULL. + +```sql +SELECT _irowts_origin, interp(current) FROM meters RANGE('2020-01-01 10:00:00', '2020-01-01 10:30:00') EVERY(1s) FILL(PREV); +``` + ## Query Objects After the FROM keyword, there can be several table (supertable) lists, or the results of subqueries. diff --git a/docs/en/14-reference/03-taos-sql/10-function.md b/docs/en/14-reference/03-taos-sql/10-function.md index 521a8bc8ca..ff52125cfd 100644 --- a/docs/en/14-reference/03-taos-sql/10-function.md +++ b/docs/en/14-reference/03-taos-sql/10-function.md @@ -1869,6 +1869,9 @@ INTERP(expr [, ignore_null_values]) - INTERP can be used with the pseudo-column _irowts to return the timestamp corresponding to the interpolation point (supported from version 3.0.2.0 onwards). - INTERP can also be used with the pseudo-column _isfilled to show whether the returned result is from the original record or produced by the interpolation algorithm (supported from version 3.0.3.0 onwards). - When querying a table with a composite primary key, if there are multiple records with the same timestamp, only the data corresponding to the minimum composite primary key will participate in the calculation. +- `INTERP` support NEAR fill mode. When `FILL(NEAR)` is used, the nearest value to the interpolation point is used to fill the missing value. If there are multiple values with the same distance to the interpolation point, previous row is used. NEAR fill is not supported in stream computation. For example, `SELECT _irowts,INTERP(current) FROM test.meters RANGE('2017-07-22 00:00:00','2017-07-24 12:25:00') EVERY(1h) FILL(NEAR)`. +- Psedo column `_irowts_origin` can be used along with `INTERP` only when using NEAR/NEXT/PREV fill mode. +- `INTERP` RANGE clause support INTERVAL extension, like `RANGE('2023-01-01 00:00:00', 1d)`. The second parameter is the interval length, and the unit cannot use y(year), n(month). The interval length must be an integer, with no quotes, the value can't be 0. The interval length is used to restrict the search range from the time point specified. For example, `SELECT _irowts,INTERP(current) FROM test.meters RANGE('2017-07-22 00:00:00', 1d) FILL(NEAR, 1)`. The query will return the interpolation result of the current column within the range of 1 day from the time point '2017-07-22 00:00:00'. If there is no data within the range, the specified value in FILL will be used. Only FILL PREV/NEXT/NEAR is supported in this case. It's illegal to use `EVERY` clause and NOT specify values in FILL clause in this case. None data-point range clause with INTERVAL extension is not supported currently, like `RANGE('2017-07-22 00:00:00', '2017-07-22 12:00:00', 1h)` is not supported. ### LAST diff --git a/docs/zh/14-reference/03-taos-sql/06-select.md b/docs/zh/14-reference/03-taos-sql/06-select.md index af19559c81..28b9346225 100644 --- a/docs/zh/14-reference/03-taos-sql/06-select.md +++ b/docs/zh/14-reference/03-taos-sql/06-select.md @@ -62,7 +62,8 @@ window_clause: { | COUNT_WINDOW(count_val[, sliding_val]) interp_clause: - RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val) + RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val) + | RANGE(ts_val, interval_val) FILL(fill_mod_and_val) partition_by_clause: PARTITION BY partition_by_expr [, partition_by_expr] ... @@ -255,6 +256,13 @@ select _rowts, max(current) from meters; select _irowts, interp(current) from meters range('2020-01-01 10:00:00', '2020-01-01 10:30:00') every(1s) fill(linear); ``` +**\_IROWTS\_ORIGIN** +`_irowts_origin` 伪列只能与 interp 函数一起使用,不支持在流计算中使用, 仅适用于FILL类型为PREV/NEXT/NEAR, 用于返回 interp 函数所使用的原始数据的时间戳列。若范围内无值, 则返回 NULL。 + +```sql +select _iorwts_origin, interp(current) from meters range('2020-01-01 10:00:00', '2020-01-01 10:30:00') every(1s) fill(NEXT); +``` + ## 查询对象 FROM 关键字后面可以是若干个表(超级表)列表,也可以是子查询的结果。 diff --git a/docs/zh/14-reference/03-taos-sql/10-function.md b/docs/zh/14-reference/03-taos-sql/10-function.md index 2f4b739447..244c1e156a 100644 --- a/docs/zh/14-reference/03-taos-sql/10-function.md +++ b/docs/zh/14-reference/03-taos-sql/10-function.md @@ -1838,6 +1838,9 @@ ignore_null_values: { - INTERP 可以与伪列 _irowts 一起使用,返回插值点所对应的时间戳(3.0.2.0 版本以后支持)。 - INTERP 可以与伪列 _isfilled 一起使用,显示返回结果是否为原始记录或插值算法产生的数据(3.0.3.0 版本以后支持)。 - INTERP 对于带复合主键的表的查询,若存在相同时间戳的数据,则只有对应的复合主键最小的数据参与运算。 +- INTERP 查询支持新的NEAR FILL模式, 即当需要FILL时, 使用距离当前时间点最近的数据进行插值, 当前后时间戳与当前时间断面一样近时, FILL 前一行的值. 此模式在流计算中和窗口查询中不支持。例如: SELECT INTERP(col) FROM tb RANGE('2023-01-01 00:00:00', '2023-01-01 00:10:00') FILL(NEAR)。 +- INTERP 只有在使用FILL PREV/NEXT/NEAR 模式时才可以使用伪列 `_irowts_origin`。 +- INTERP `RANEG`子句支持时间范围的扩展, 如`RANGE('2023-01-01 00:00:00', 10s)`表示在时间点'2023-01-01 00:00:00'查找前后10s的数据进行插值, FILL PREV/NEXT/NEAR分别表示从时间点向前/向后/前后查找数据, 若时间点周围没有数据, 则使用FILL指定的值进行插值, 因此此时FILL子句必须指定值。例如: SELECT INTERP(col) FROM tb RANGE('2023-01-01 00:00:00', 10s) FILL(PREV, 1). 目前仅支持时间点和时间范围的组合, 不支持时间区间和时间范围的组合, 即不支持RANGE('2023-01-01 00:00:00', '2023-02-01 00:00:00', 1h)。所指定的时间范围规则与EVERY类似, 单位不能是年或月, 值不能为0, 不能带引号。使用该扩展时, 不支持除FILL PREV/NEXT/NEAR外的其他FILL模式, 且不能指定EVERY子句 ### LAST diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 26b15c2b76..3793b2b0f3 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -188,6 +188,7 @@ typedef enum _mgmt_table { #define TSDB_FILL_LINEAR 5 #define TSDB_FILL_PREV 6 #define TSDB_FILL_NEXT 7 +#define TSDB_FILL_NEAR 8 #define TSDB_ALTER_USER_PASSWD 0x1 #define TSDB_ALTER_USER_SUPERUSER 0x2 @@ -264,6 +265,7 @@ typedef enum ENodeType { QUERY_NODE_COLUMN_OPTIONS, QUERY_NODE_TSMA_OPTIONS, QUERY_NODE_ANOMALY_WINDOW, + QUERY_NODE_RANGE_AROUND, // Statement nodes are used in parser and planner module. QUERY_NODE_SET_OPERATOR = 100, diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index f71c2210be..bf40632953 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -155,6 +155,7 @@ typedef enum EFunctionType { FUNCTION_TYPE_FORECAST_LOW, FUNCTION_TYPE_FORECAST_HIGH, FUNCTION_TYPE_FORECAST_ROWTS, + FUNCTION_TYPE_IROWTS_ORIGIN, // internal function FUNCTION_TYPE_SELECT_VALUE = 3750, @@ -289,6 +290,7 @@ bool fmIsPrimaryKeyFunc(int32_t funcId); bool fmIsProcessByRowFunc(int32_t funcId); bool fmisSelectGroupConstValueFunc(int32_t funcId); bool fmIsElapsedFunc(int32_t funcId); +bool fmIsRowTsOriginFunc(int32_t funcId); void getLastCacheDataType(SDataType* pType, int32_t pkBytes); int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNode** pFunc); diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 89bc27a1fa..09eec2a17a 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -213,6 +213,8 @@ typedef struct SInterpFuncLogicNode { EFillMode fillMode; SNode* pFillValues; // SNodeListNode SNode* pTimeSeries; // SColumnNode + int64_t rangeInterval; + int8_t rangeIntervalUnit; SStreamNodeOption streamNodeOption; } SInterpFuncLogicNode; @@ -528,6 +530,8 @@ typedef struct SInterpFuncPhysiNode { SNode* pFillValues; // SNodeListNode SNode* pTimeSeries; // SColumnNode SStreamNodeOption streamNodeOption; + int64_t rangeInterval; + int8_t rangeIntervalUnit; } SInterpFuncPhysiNode; typedef SInterpFuncPhysiNode SStreamInterpFuncPhysiNode; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 7af74a347a..bf84ffd3df 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -362,7 +362,8 @@ typedef enum EFillMode { FILL_MODE_NULL, FILL_MODE_NULL_F, FILL_MODE_LINEAR, - FILL_MODE_NEXT + FILL_MODE_NEXT, + FILL_MODE_NEAR, } EFillMode; typedef enum ETimeLineMode { @@ -407,6 +408,11 @@ typedef struct SWindowOffsetNode { SNode* pEndOffset; // SValueNode } SWindowOffsetNode; +typedef struct SRangeAroundNode { + ENodeType type; + SNode* pTimepoint; + SNode* pInterval; +} SRangeAroundNode; typedef struct SSelectStmt { ENodeType type; // QUERY_NODE_SELECT_STMT @@ -421,6 +427,7 @@ typedef struct SSelectStmt { SNodeList* pGroupByList; // SGroupingSetNode SNode* pHaving; SNode* pRange; + SNode* pRangeAround; SNode* pEvery; SNode* pFill; SNodeList* pOrderByList; // SOrderByExprNode diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 271e9c91a1..1483ee0b46 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -1059,6 +1059,7 @@ void destroyFlusedPos(void* pRes); bool isIrowtsPseudoColumn(SExprInfo* pExprInfo); bool isIsfilledPseudoColumn(SExprInfo* pExprInfo); bool isInterpFunc(SExprInfo* pExprInfo); +bool isIrowtsOriginPseudoColumn(SExprInfo* pExprInfo); int32_t encodeSSessionKey(void** buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 814833fca8..732e69ae09 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2390,6 +2390,9 @@ int32_t convertFillType(int32_t mode) { case FILL_MODE_LINEAR: type = TSDB_FILL_LINEAR; break; + case FILL_MODE_NEAR: + type = TSDB_FILL_NEAR; + break; default: type = TSDB_FILL_NONE; } diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 50deba932f..6a3052c75c 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -48,6 +48,7 @@ typedef struct STimeSliceOperatorInfo { int32_t remainIndex; // the remaining index in the block to be processed bool hasPk; SColumn pkCol; + int64_t rangeInterval; } STimeSliceOperatorInfo; static void destroyTimeSliceOperatorInfo(void* param); @@ -179,6 +180,11 @@ bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) { return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0); } +bool isIrowtsOriginPseudoColumn(SExprInfo* pExprInfo) { + const char* name = pExprInfo->pExpr->_function.functionName; + return (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_irowts_origin") == 0); +} + static void tRowGetKeyFromColData(int64_t ts, SColumnInfoData* pPkCol, int32_t rowIndex, SRowKey* pKey) { pKey->ts = ts; pKey->numOfPKs = 1; @@ -277,6 +283,79 @@ bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bo return false; } +static int32_t interpColSetKey(SColumnInfoData* pDst, int32_t rowNum, SGroupKeys* pKey) { + int32_t code = 0; + if (pKey->isNull == false) { + code = colDataSetVal(pDst, rowNum, pKey->pData, false); + } else { + colDataSetNULL(pDst, rowNum); + } + return code; +} + +static bool interpSetFillRowWithRangeIntervalCheck(STimeSliceOperatorInfo* pSliceInfo, SArray** ppFillRow, SArray* pFillRefRow, int64_t fillRefRowTs) { + *ppFillRow = NULL; + if (pSliceInfo->rangeInterval <= 0 || llabs(fillRefRowTs - pSliceInfo->current) <= pSliceInfo->rangeInterval) { + *ppFillRow = pFillRefRow; + return true; + } + return false; +} + +static bool interpDetermineNearFillRow(STimeSliceOperatorInfo* pSliceInfo, SArray** ppNearRow) { + if (!pSliceInfo->isPrevRowSet && !pSliceInfo->isNextRowSet) { + *ppNearRow = NULL; + return false; + } + SGroupKeys *pPrevTsKey = NULL, *pNextTsKey = NULL; + int64_t* pPrevTs = NULL, *pNextTs = NULL; + if (pSliceInfo->isPrevRowSet) { + pPrevTsKey = taosArrayGet(pSliceInfo->pPrevRow, pSliceInfo->tsCol.slotId); + pPrevTs = (int64_t*)pPrevTsKey->pData; + } + if (pSliceInfo->isNextRowSet) { + pNextTsKey = taosArrayGet(pSliceInfo->pNextRow, pSliceInfo->tsCol.slotId); + pNextTs = (int64_t*)pNextTsKey->pData; + } + if (!pPrevTsKey) { + *ppNearRow = pSliceInfo->pNextRow; + (void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppNearRow, pSliceInfo->pNextRow, *pNextTs); + } else if (!pNextTsKey) { + *ppNearRow = pSliceInfo->pPrevRow; + (void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppNearRow, pSliceInfo->pPrevRow, *pPrevTs); + } else { + if (llabs(pSliceInfo->current - *pPrevTs) <= llabs(*pNextTs - pSliceInfo->current)) { + // take prev if euqal + (void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppNearRow, pSliceInfo->pPrevRow, *pPrevTs); + } else { + (void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppNearRow, pSliceInfo->pNextRow, *pNextTs); + } + } + return true; +} + +static bool interpDetermineFillRefRow(STimeSliceOperatorInfo* pSliceInfo, SArray** ppOutRow) { + bool needFill = false; + if (pSliceInfo->fillType == TSDB_FILL_PREV) { + if (pSliceInfo->isPrevRowSet) { + SGroupKeys* pTsCol = taosArrayGet(pSliceInfo->pPrevRow, pSliceInfo->tsCol.slotId); + (void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppOutRow, pSliceInfo->pPrevRow, *(int64_t*)pTsCol->pData); + needFill = true; + } + } else if (pSliceInfo->fillType == TSDB_FILL_NEXT) { + if (pSliceInfo->isNextRowSet) { + SGroupKeys* pTsCol = taosArrayGet(pSliceInfo->pNextRow, pSliceInfo->tsCol.slotId); + (void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppOutRow, pSliceInfo->pNextRow, *(int64_t*)pTsCol->pData); + needFill = true; + } + } else if (pSliceInfo->fillType == TSDB_FILL_NEAR) { + needFill = interpDetermineNearFillRow(pSliceInfo, ppOutRow); + } else { + needFill = true; + } + return needFill; +} + static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, SSDataBlock* pSrcBlock, int32_t index, bool beforeTs, SExecTaskInfo* pTaskInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -290,6 +369,8 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp int32_t fillColIndex = 0; int32_t groupKeyIndex = 0; bool hasInterp = true; + SArray* pFillRefRow = NULL; + bool needFill = interpDetermineFillRefRow(pSliceInfo, &pFillRefRow); for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; @@ -305,7 +386,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp code = colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false); QUERY_CHECK_CODE(code, lino, _end); continue; - } else if (!isInterpFunc(pExprInfo)) { + } else if (!isInterpFunc(pExprInfo) && !isIrowtsOriginPseudoColumn(pExprInfo)) { if (isGroupKeyFunc(pExprInfo) || isSelectGroupConstValueFunc(pExprInfo)) { if (pSrcBlock != NULL) { int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; @@ -344,7 +425,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp continue; } - int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + int32_t srcSlot = isIrowtsOriginPseudoColumn(pExprInfo) ? pSliceInfo->tsCol.slotId : pExprInfo->base.pParam[0].pCol->slotId; switch (pSliceInfo->fillType) { case TSDB_FILL_NULL: case TSDB_FILL_NULL_F: { @@ -352,6 +433,25 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp break; } + case TSDB_FILL_PREV: + case TSDB_FILL_NEAR: + case TSDB_FILL_NEXT: { + if (!needFill) { + hasInterp = false; + break; + } + if (pFillRefRow) { + code = interpColSetKey(pDst, rows, taosArrayGet(pFillRefRow, srcSlot)); + QUERY_CHECK_CODE(code, lino, _end); + break; + } + // no fillRefRow, fall through to fill specified values + if (srcSlot == pSliceInfo->tsCol.slotId) { + // if is _irowts_origin, there is no value to fill, just set to null + colDataSetNULL(pDst, rows); + break; + } + } case TSDB_FILL_SET_VALUE: case TSDB_FILL_SET_VALUE_F: { SVariant* pVar = &pSliceInfo->pFillColInfo[fillColIndex].fillVal; @@ -444,38 +544,6 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp taosMemoryFree(current.val); break; } - case TSDB_FILL_PREV: { - if (!pSliceInfo->isPrevRowSet) { - hasInterp = false; - break; - } - - SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); - if (pkey->isNull == false) { - code = colDataSetVal(pDst, rows, pkey->pData, false); - QUERY_CHECK_CODE(code, lino, _end); - } else { - colDataSetNULL(pDst, rows); - } - break; - } - - case TSDB_FILL_NEXT: { - if (!pSliceInfo->isNextRowSet) { - hasInterp = false; - break; - } - - SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot); - if (pkey->isNull == false) { - code = colDataSetVal(pDst, rows, pkey->pData, false); - QUERY_CHECK_CODE(code, lino, _end); - } else { - colDataSetNULL(pDst, rows); - } - break; - } - case TSDB_FILL_NONE: default: break; @@ -507,7 +575,7 @@ static int32_t addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSu int32_t dstSlot = pExprInfo->base.resSchema.slotId; SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); - if (isIrowtsPseudoColumn(pExprInfo)) { + if (isIrowtsPseudoColumn(pExprInfo) || isIrowtsOriginPseudoColumn(pExprInfo)) { code = colDataSetVal(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false); QUERY_CHECK_CODE(code, lino, _end); } else if (isIsfilledPseudoColumn(pExprInfo)) { @@ -1233,6 +1301,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN pInfo->pNextGroupRes = NULL; pInfo->pRemainRes = NULL; pInfo->remainIndex = 0; + pInfo->rangeInterval = pInterpPhyNode->rangeInterval; if (pInfo->hasPk) { pInfo->prevKey.numOfPKs = 1; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 5ce15a32b2..f8f8fc705e 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -5596,6 +5596,20 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .sprocessFunc = NULL, .finalizeFunc = NULL }, + { + .name = "_irowts_origin", + .type = FUNCTION_TYPE_IROWTS_ORIGIN, + .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_INTERP_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, + .parameters = {.minParamNum = 0, + .maxParamNum = 0, + .paramInfoPattern = 0, + .outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_TIMESTAMP_TYPE}}, + .translateFunc = translateTimePseudoColumn, + .getEnvFunc = getTimePseudoFuncEnv, + .initFunc = NULL, + .sprocessFunc = NULL, + .finalizeFunc = NULL + }, }; // clang-format on diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 5dfb94ba6e..ccff881e49 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -703,3 +703,10 @@ bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId) { bool fmIsCountLikeFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_COUNT_LIKE_FUNC); } + +bool fmIsRowTsOriginFunc(int32_t funcId) { + if (funcId < 0 || funcId >= funcMgtBuiltinsNum) { + return false; + } + return FUNCTION_TYPE_IROWTS_ORIGIN == funcMgtBuiltins[funcId].type; +} diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index ba87912670..c2deec9c68 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -684,6 +684,8 @@ static int32_t logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFunc CLONE_NODE_FIELD(pFillValues); CLONE_NODE_FIELD(pTimeSeries); COPY_OBJECT_FIELD(streamNodeOption, sizeof(SStreamNodeOption)); + COPY_SCALAR_FIELD(rangeInterval); + COPY_SCALAR_FIELD(rangeIntervalUnit); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index f7f858db78..2d074dfc68 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1298,6 +1298,8 @@ static const char* jkInterpFuncLogicPlanFillMode = "fillMode"; static const char* jkInterpFuncLogicPlanFillValues = "FillValues"; static const char* jkInterpFuncLogicPlanTimeSeries = "TimeSeries"; static const char* jkInterpFuncLogicPlanStreamNodeOption = "StreamNodeOption"; +static const char* jkInterpFuncLogicPlanRangeInterval = "RangeInterval"; +static const char* jkInterpFuncLogicPlanRangeIntervalUnit = "RangeIntervalUnit"; static int32_t logicInterpFuncNodeToJson(const void* pObj, SJson* pJson) { const SInterpFuncLogicNode* pNode = (const SInterpFuncLogicNode*)pObj; @@ -1333,6 +1335,12 @@ static int32_t logicInterpFuncNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, streamNodeOptionToJson, &pNode->streamNodeOption); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkInterpFuncLogicPlanRangeInterval, pNode->rangeInterval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkInterpFuncLogicPlanRangeIntervalUnit, pNode->rangeIntervalUnit); + } return code; } @@ -1371,6 +1379,12 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonToObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkInterpFuncLogicPlanRangeInterval, &pNode->rangeInterval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkInterpFuncLogicPlanRangeIntervalUnit, &pNode->rangeIntervalUnit); + } return code; } @@ -3312,6 +3326,8 @@ static const char* jkInterpFuncPhysiPlanFillMode = "FillMode"; static const char* jkInterpFuncPhysiPlanFillValues = "FillValues"; static const char* jkInterpFuncPhysiPlanTimeSeries = "TimeSeries"; static const char* jkInterpFuncPhysiPlanStreamNodeOption = "StreamNodeOption"; +static const char* jkInterpFuncPhysiPlanRangeInterval = "RangeInterval"; +static const char* jkInterpFuncPhysiPlanRangeIntervalUnit = "RangeIntervalUnit"; static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { const SInterpFuncPhysiNode* pNode = (const SInterpFuncPhysiNode*)pObj; @@ -3350,6 +3366,12 @@ static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanStreamNodeOption, streamNodeOptionToJson, &pNode->streamNodeOption); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanRangeInterval, pNode->rangeInterval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanRangeIntervalUnit, pNode->rangeIntervalUnit); + } return code; } @@ -3391,6 +3413,12 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonToObject(pJson, jkInterpFuncPhysiPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkInterpFuncPhysiPlanRangeInterval, &pNode->rangeInterval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkInterpFuncPhysiPlanRangeIntervalUnit, &pNode->rangeIntervalUnit); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index bf3ea66e47..fe95322577 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -3746,7 +3746,9 @@ enum { PHY_INERP_FUNC_CODE_INTERVAL_UNIT, PHY_INERP_FUNC_CODE_FILL_MODE, PHY_INERP_FUNC_CODE_FILL_VALUES, - PHY_INERP_FUNC_CODE_TIME_SERIES + PHY_INERP_FUNC_CODE_TIME_SERIES, + PHY_INTERP_FUNC_CODE_RANGE_INTERVAL, + PHY_INTERP_FUNC_CODE_RANGE_INTERVAL_UNIT, }; static int32_t physiInterpFuncNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -3777,6 +3779,12 @@ static int32_t physiInterpFuncNodeToMsg(const void* pObj, STlvEncoder* pEncoder) if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_INERP_FUNC_CODE_TIME_SERIES, nodeToMsg, pNode->pTimeSeries); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_INTERP_FUNC_CODE_RANGE_INTERVAL, pNode->rangeInterval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_INTERP_FUNC_CODE_RANGE_INTERVAL_UNIT, pNode->rangeIntervalUnit); + } return code; } @@ -3815,6 +3823,12 @@ static int32_t msgToPhysiInterpFuncNode(STlvDecoder* pDecoder, void* pObj) { case PHY_INERP_FUNC_CODE_TIME_SERIES: code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTimeSeries); break; + case PHY_INTERP_FUNC_CODE_RANGE_INTERVAL: + code = tlvDecodeI64(pTlv, &pNode->rangeInterval); + break; + case PHY_INTERP_FUNC_CODE_RANGE_INTERVAL_UNIT: + code = tlvDecodeI8(pTlv, &pNode->rangeIntervalUnit); + break; default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index ac29021e83..04b0d56a63 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -470,6 +470,8 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { case QUERY_NODE_SET_OPERATOR: code = makeNode(type, sizeof(SSetOperator), &pNode); break; + case QUERY_NODE_RANGE_AROUND: + code = makeNode(type, sizeof(SRangeAroundNode), &pNode); break; case QUERY_NODE_SELECT_STMT: code = makeNode(type, sizeof(SSelectStmt), &pNode); break; @@ -1245,6 +1247,12 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pWin->pEndOffset); break; } + case QUERY_NODE_RANGE_AROUND: { + SRangeAroundNode* pAround = (SRangeAroundNode*)pNode; + nodesDestroyNode(pAround->pInterval); + nodesDestroyNode(pAround->pTimepoint); + break; + } case QUERY_NODE_SET_OPERATOR: { SSetOperator* pStmt = (SSetOperator*)pNode; nodesDestroyList(pStmt->pProjectionList); @@ -1266,6 +1274,7 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyList(pStmt->pGroupByList); nodesDestroyNode(pStmt->pHaving); nodesDestroyNode(pStmt->pRange); + nodesDestroyNode(pStmt->pRangeAround); nodesDestroyNode(pStmt->pEvery); nodesDestroyNode(pStmt->pFill); nodesDestroyList(pStmt->pOrderByList); diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index fb0529e6d1..09b07c3fcb 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -163,6 +163,7 @@ SNode* createFillNode(SAstCreateContext* pCxt, EFillMode mode, SNode* pValue SNode* createGroupingSetNode(SAstCreateContext* pCxt, SNode* pNode); SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd); SNode* createInterpTimePoint(SAstCreateContext* pCxt, SNode* pPoint); +SNode* createInterpTimeAround(SAstCreateContext* pCxt, SNode* pTimepoint, SNode* pInterval); SNode* createWhenThenNode(SAstCreateContext* pCxt, SNode* pWhen, SNode* pThen); SNode* createCaseWhenNode(SAstCreateContext* pCxt, SNode* pCase, SNodeList* pWhenThenList, SNode* pElse); SNode* createAlterSingleTagColumnNode(SAstCreateContext* pCtx, SToken* token, SNode* pVal); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index b8a5e6f98f..19d34c99b0 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -1209,6 +1209,7 @@ pseudo_column(A) ::= QTAGS(B). pseudo_column(A) ::= FLOW(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); } pseudo_column(A) ::= FHIGH(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); } pseudo_column(A) ::= FROWTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); } +pseudo_column(A) ::= IROWTS_ORIGIN(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); } function_expression(A) ::= function_name(B) NK_LP expression_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); } function_expression(A) ::= star_func(B) NK_LP star_func_para_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); } @@ -1454,7 +1455,7 @@ jlimit_clause_opt(A) ::= JLIMIT NK_INTEGER(B). query_specification(A) ::= SELECT hint_list(M) set_quantifier_opt(B) tag_mode_opt(N) select_list(C) from_clause_opt(D) where_clause_opt(E) partition_by_clause_opt(F) range_opt(J) every_opt(K) - fill_opt(L) twindow_clause_opt(G) group_by_clause_opt(H) having_clause_opt(I). { + interp_fill_opt(L) twindow_clause_opt(G) group_by_clause_opt(H) having_clause_opt(I). { A = createSelectStmt(pCxt, B, C, D, M); A = setSelectStmtTagMode(pCxt, A, N); A = addWhereClause(pCxt, A, E); @@ -1539,19 +1540,41 @@ interval_sliding_duration_literal(A) ::= NK_VARIABLE(B). interval_sliding_duration_literal(A) ::= NK_STRING(B). { A = createRawExprNode(pCxt, &B, createDurationValueNode(pCxt, &B)); } interval_sliding_duration_literal(A) ::= NK_INTEGER(B). { A = createRawExprNode(pCxt, &B, createDurationValueNode(pCxt, &B)); } +interp_fill_opt(A) ::= . { A = NULL; } +interp_fill_opt(A) ::= fill_value(B). { A = B; } +interp_fill_opt(A) ::= + FILL NK_LP fill_position_mode_extension(B) NK_COMMA expression_list(C) NK_RP. { A = createFillNode(pCxt, B, createNodeListNode(pCxt, C)); } +interp_fill_opt(A) ::= FILL NK_LP interp_fill_mode(B) NK_RP. { A = createFillNode(pCxt, B, NULL); } + fill_opt(A) ::= . { A = NULL; } fill_opt(A) ::= FILL NK_LP fill_mode(B) NK_RP. { A = createFillNode(pCxt, B, NULL); } -fill_opt(A) ::= FILL NK_LP VALUE NK_COMMA expression_list(B) NK_RP. { A = createFillNode(pCxt, FILL_MODE_VALUE, createNodeListNode(pCxt, B)); } -fill_opt(A) ::= FILL NK_LP VALUE_F NK_COMMA expression_list(B) NK_RP. { A = createFillNode(pCxt, FILL_MODE_VALUE_F, createNodeListNode(pCxt, B)); } +fill_opt(A) ::= fill_value(B). { A = B; } + +fill_value(A) ::= FILL NK_LP VALUE NK_COMMA expression_list(B) NK_RP. { A = createFillNode(pCxt, FILL_MODE_VALUE, createNodeListNode(pCxt, B)); } +fill_value(A) ::= FILL NK_LP VALUE_F NK_COMMA expression_list(B) NK_RP. { A = createFillNode(pCxt, FILL_MODE_VALUE_F, createNodeListNode(pCxt, B)); } %type fill_mode { EFillMode } %destructor fill_mode { } fill_mode(A) ::= NONE. { A = FILL_MODE_NONE; } -fill_mode(A) ::= PREV. { A = FILL_MODE_PREV; } fill_mode(A) ::= NULL. { A = FILL_MODE_NULL; } fill_mode(A) ::= NULL_F. { A = FILL_MODE_NULL_F; } fill_mode(A) ::= LINEAR. { A = FILL_MODE_LINEAR; } -fill_mode(A) ::= NEXT. { A = FILL_MODE_NEXT; } +fill_mode(A) ::= fill_position_mode(B). { A = B; } + +%type fill_position_mode { EFillMode } +%destructor fill_position_mode { } +fill_position_mode(A) ::= PREV. { A = FILL_MODE_PREV; } +fill_position_mode(A) ::= NEXT. { A = FILL_MODE_NEXT; } + +%type fill_position_mode_extension { EFillMode } +%destructor fill_position_mode_extension { } +fill_position_mode_extension(A) ::= fill_position_mode(B). { A = B; } +fill_position_mode_extension(A) ::= NEAR. { A = FILL_MODE_NEAR; } + +%type interp_fill_mode { EFillMode } +%destructor interp_fill_mode { } +interp_fill_mode(A) ::= fill_mode(B). { A = B; } +interp_fill_mode(A) ::= NEAR. { A = FILL_MODE_NEAR; } %type group_by_clause_opt { SNodeList* } %destructor group_by_clause_opt { nodesDestroyList($$); } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index e05a399d32..7503e91dbe 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -1460,6 +1460,9 @@ _err: SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd) { CHECK_PARSER_STATUS(pCxt); + if (pEnd && nodeType(pEnd) == QUERY_NODE_VALUE && ((SValueNode*)pEnd)->flag & VALUE_FLAG_IS_DURATION) { + return createInterpTimeAround(pCxt, pStart, pEnd); + } return createBetweenAnd(pCxt, createPrimaryKeyCol(pCxt, NULL), pStart, pEnd); _err: nodesDestroyNode(pStart); @@ -1475,6 +1478,19 @@ _err: return NULL; } +SNode* createInterpTimeAround(SAstCreateContext* pCxt, SNode* pTimepoint, SNode* pInterval) { + CHECK_PARSER_STATUS(pCxt); + SRangeAroundNode* pAround = NULL; + pCxt->errCode = nodesMakeNode(QUERY_NODE_RANGE_AROUND, (SNode**)&pAround); + CHECK_PARSER_STATUS(pCxt); + pAround->pTimepoint = createInterpTimePoint(pCxt, pTimepoint); + pAround->pInterval = pInterval; + CHECK_PARSER_STATUS(pCxt); + return (SNode*)pAround; +_err: + return NULL; +} + SNode* createWhenThenNode(SAstCreateContext* pCxt, SNode* pWhen, SNode* pThen) { CHECK_PARSER_STATUS(pCxt); SWhenThenNode* pWhenThen = NULL; @@ -1632,8 +1648,15 @@ _err: SNode* addRangeClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pRange) { CHECK_PARSER_STATUS(pCxt); + SSelectStmt* pSelect = (SSelectStmt*)pStmt; if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { - ((SSelectStmt*)pStmt)->pRange = pRange; + if (pRange && nodeType(pRange) == QUERY_NODE_RANGE_AROUND) { + pSelect->pRangeAround = pRange; + SRangeAroundNode* pAround = (SRangeAroundNode*)pRange; + TSWAP(pSelect->pRange, pAround->pTimepoint); + } else { + pSelect->pRange = pRange; + } } return pStmt; _err: diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index 1db139b8d4..9100e83f42 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -176,6 +176,7 @@ static SKeyword keywordTable[] = { {"NORMAL", TK_NORMAL}, {"NCHAR", TK_NCHAR}, {"NEXT", TK_NEXT}, + {"NEAR", TK_NEAR}, {"NMATCH", TK_NMATCH}, {"NONE", TK_NONE}, {"NOT", TK_NOT}, @@ -326,6 +327,7 @@ static SKeyword keywordTable[] = { {"WRITE", TK_WRITE}, {"_C0", TK_ROWTS}, {"_IROWTS", TK_IROWTS}, + {"_IROWTS_ORIGIN", TK_IROWTS_ORIGIN}, {"_ISFILLED", TK_ISFILLED}, {"_QDURATION", TK_QDURATION}, {"_QEND", TK_QEND}, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index dd08ee7654..7e5d9375ac 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1260,7 +1260,8 @@ bool isPrimaryKeyImpl(SNode* pExpr) { FUNCTION_TYPE_LAST_ROW == pFunc->funcType || FUNCTION_TYPE_TIMETRUNCATE == pFunc->funcType) { return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0)); } else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType || - FUNCTION_TYPE_IROWTS == pFunc->funcType || FUNCTION_TYPE_FORECAST_ROWTS == pFunc->funcType) { + FUNCTION_TYPE_IROWTS == pFunc->funcType || FUNCTION_TYPE_IROWTS_ORIGIN == pFunc->funcType || + FUNCTION_TYPE_FORECAST_ROWTS == pFunc->funcType) { return true; } } else if (QUERY_NODE_OPERATOR == nodeType(pExpr)) { @@ -5388,14 +5389,11 @@ static int32_t convertFillValue(STranslateContext* pCxt, SDataType dt, SNodeList return code; } -static int32_t checkFillValues(STranslateContext* pCxt, SFillNode* pFill, SNodeList* pProjectionList) { - if (FILL_MODE_VALUE != pFill->mode && FILL_MODE_VALUE_F != pFill->mode) { - return TSDB_CODE_SUCCESS; - } - +static int32_t doCheckFillValues(STranslateContext* pCxt, SFillNode* pFill, SNodeList* pProjectionList) { int32_t fillNo = 0; SNodeListNode* pFillValues = (SNodeListNode*)pFill->pValues; SNode* pProject = NULL; + if (!pFillValues) return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Filled values number mismatch"); FOREACH(pProject, pProjectionList) { if (needFill(pProject)) { if (fillNo >= LIST_LENGTH(pFillValues->pNodeList)) { @@ -5415,6 +5413,14 @@ static int32_t checkFillValues(STranslateContext* pCxt, SFillNode* pFill, SNodeL return TSDB_CODE_SUCCESS; } +static int32_t checkFillValues(STranslateContext* pCxt, SFillNode* pFill, SNodeList* pProjectionList) { + if (FILL_MODE_VALUE != pFill->mode && FILL_MODE_VALUE_F != pFill->mode) { + return TSDB_CODE_SUCCESS; + } + return doCheckFillValues(pCxt, pFill, pProjectionList); + return TSDB_CODE_SUCCESS; +} + static int32_t translateFillValues(STranslateContext* pCxt, SSelectStmt* pSelect) { if (NULL == pSelect->pWindow || QUERY_NODE_INTERVAL_WINDOW != nodeType(pSelect->pWindow) || NULL == ((SIntervalWindowNode*)pSelect->pWindow)->pFill) { @@ -6185,6 +6191,31 @@ static int32_t translateInterpEvery(STranslateContext* pCxt, SNode** pEvery) { return code; } +static EDealRes hasRowTsOriginFuncWalkNode(SNode* pNode, void* ctx) { + bool *hasRowTsOriginFunc = ctx; + if (nodeType(pNode) == QUERY_NODE_FUNCTION) { + SFunctionNode* pFunc = (SFunctionNode*)pNode; + if (fmIsRowTsOriginFunc(pFunc->funcId)) { + *hasRowTsOriginFunc = true; + return DEAL_RES_END; + } + } + return DEAL_RES_CONTINUE; +} + +static int32_t checkInterpForStream(STranslateContext* pCxt, SSelectStmt* pSelect) { + if (pCxt->createStream) { + SFillNode* pFill = (SFillNode*)pSelect->pFill; + if (pFill->mode == FILL_MODE_NEAR) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "FILL NEAR is not supported by stream"); + } + if (pSelect->pRangeAround) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "RANGE with around is not supported by stream"); + } + } + return TSDB_CODE_SUCCESS; +} + static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect) { int32_t code = TSDB_CODE_SUCCESS; @@ -6200,13 +6231,60 @@ static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect if (TSDB_CODE_SUCCESS == code) { code = checkFill(pCxt, (SFillNode*)pSelect->pFill, (SValueNode*)pSelect->pEvery, true); } + bool hasRowTsOriginFunc = false; + nodesWalkExprs(pSelect->pProjectionList, hasRowTsOriginFuncWalkNode, &hasRowTsOriginFunc); + if (hasRowTsOriginFunc && pCxt->createStream) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "_irowts_origin is not supported by stream"); + } if (TSDB_CODE_SUCCESS == code) { - code = checkFillValues(pCxt, (SFillNode*)pSelect->pFill, pSelect->pProjectionList); + SFillNode* pFill = (SFillNode*)pSelect->pFill; + if (pSelect->pRangeAround) { + if (pFill->mode != FILL_MODE_PREV && pFill->mode != FILL_MODE_NEXT && pFill->mode != FILL_MODE_NEAR) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE, + "Range with interval can only used with fill PREV/NEXT/NEAR"); + } + if (TSDB_CODE_SUCCESS == code) + code = doCheckFillValues(pCxt, pFill, pSelect->pProjectionList); + } else { + if (FILL_MODE_PREV == pFill->mode || FILL_MODE_NEXT == pFill->mode || FILL_MODE_NEAR == pFill->mode) { + if (pFill->pValues) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Can't specify fill values"); + } + } else { + if (hasRowTsOriginFunc) return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC, "_irowts_origin can only be used with FILL PREV/NEXT/NEAR"); + } + code = checkFillValues(pCxt, pFill, pSelect->pProjectionList); + } } return code; } +static int32_t translateInterpAround(STranslateContext* pCxt, SSelectStmt* pSelect) { + int32_t code = 0; + if (pSelect->pRangeAround) { + SRangeAroundNode* pAround = (SRangeAroundNode*)pSelect->pRangeAround; + code = translateExpr(pCxt, &pAround->pInterval); + if (TSDB_CODE_SUCCESS == code) { + if (nodeType(pAround->pInterval) == QUERY_NODE_VALUE) { + SValueNode* pVal = (SValueNode*)pAround->pInterval; + if (pVal->datum.i == 0) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE, "Range interval cannot be 0"); + } + int8_t unit = pVal->unit; + if (unit == TIME_UNIT_YEAR || unit == TIME_UNIT_MONTH) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, + "Unsupported time unit in RANGE clause"); + } + } else { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE, "Invalid range interval"); + } + } + } + return code; +} + static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { if (!pSelect->hasInterpFunc) { if (NULL != pSelect->pRange || NULL != pSelect->pEvery || NULL != pSelect->pFill) { @@ -6227,6 +6305,7 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { } } + int32_t code = 0; if (pCxt->createStream) { if (NULL != pSelect->pRange) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, @@ -6238,23 +6317,40 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { "Missing EVERY clause or FILL clause"); } } else { - if (NULL == pSelect->pRange || NULL == pSelect->pEvery || NULL == pSelect->pFill) { - if (pSelect->pRange != NULL && QUERY_NODE_OPERATOR == nodeType(pSelect->pRange) && pSelect->pEvery == NULL) { - // single point interp every can be omitted - } else { + if (!pSelect->pRangeAround) { + if (NULL == pSelect->pRange || NULL == pSelect->pEvery || NULL == pSelect->pFill) { + if (pSelect->pRange != NULL && QUERY_NODE_OPERATOR == nodeType(pSelect->pRange) && pSelect->pEvery == NULL) { + // single point interp every can be omitted + } else { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE, + "Missing RANGE clause, EVERY clause or FILL clause"); + } + } + } else { + if (pSelect->pEvery) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE, - "Missing RANGE clause, EVERY clause or FILL clause"); + "Range clause with around interval can't be used with EVERY clause"); + } + if (!pSelect->pFill) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE, + "Missing FILL clause"); } } } - int32_t code = translateExpr(pCxt, &pSelect->pRange); + code = translateExpr(pCxt, &pSelect->pRange); if (TSDB_CODE_SUCCESS == code) { code = translateInterpEvery(pCxt, &pSelect->pEvery); } if (TSDB_CODE_SUCCESS == code) { code = translateInterpFill(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + code = translateInterpAround(pCxt, pSelect); + } + if (TSDB_CODE_SUCCESS == code) { + code = checkInterpForStream(pCxt, pSelect); + } return code; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index f4c4926ca1..47a0144243 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -973,6 +973,16 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p pInterpFunc->precision = pSelect->precision; } + if (TSDB_CODE_SUCCESS == code && pSelect->pRangeAround) { + SNode* pRangeInterval = ((SRangeAroundNode*)pSelect->pRangeAround)->pInterval; + if (!pRangeInterval || nodeType(pRangeInterval) != QUERY_NODE_VALUE) { + code = TSDB_CODE_PAR_INTERNAL_ERROR; + } else { + pInterpFunc->rangeInterval = ((SValueNode*)pRangeInterval)->datum.i; + pInterpFunc->rangeIntervalUnit = ((SValueNode*)pRangeInterval)->unit; + } + } + // set the output if (TSDB_CODE_SUCCESS == code) { code = createColumnByRewriteExprs(pInterpFunc->pFuncs, &pInterpFunc->node.pTargets); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 347aeba95e..a0912ec8c7 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1958,6 +1958,8 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh pInterpFunc->intervalUnit = pFuncLogicNode->intervalUnit; pInterpFunc->precision = pFuncLogicNode->node.precision; pInterpFunc->pFillValues = NULL; + pInterpFunc->rangeInterval = pFuncLogicNode->rangeInterval; + pInterpFunc->rangeIntervalUnit = pFuncLogicNode->rangeIntervalUnit; code = nodesCloneNode(pFuncLogicNode->pFillValues, &pInterpFunc->pFillValues); if (TSDB_CODE_SUCCESS != code) { code = code; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index ab6bfcfd1a..34ff38eb43 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -156,6 +156,11 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma2.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma2.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery2.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp_extension.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp_extension.py -R +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp_extension.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp_extension.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp_extension.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py diff --git a/tests/system-test/2-query/interp_extension.py b/tests/system-test/2-query/interp_extension.py new file mode 100644 index 0000000000..b9283f7ca1 --- /dev/null +++ b/tests/system-test/2-query/interp_extension.py @@ -0,0 +1,518 @@ +import queue +from random import randrange +import time +import threading +import secrets +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from datetime import timezone +from tzlocal import get_localzone +# from tmqCommon import * + + +ROUND: int = 500 + +class TDTestCase: + updatecfgDict = {'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4, 'debugFlag': 143} + check_failed: bool = False + + def __init__(self): + self.vgroups = 4 + self.ctbNum = 10 + self.rowsPerTbl = 10000 + self.duraion = '1h' + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def create_database(self, tsql, dbName, dropFlag=1, vgroups=2, replica=1, duration: str = '1d'): + if dropFlag == 1: + tsql.execute("drop database if exists %s" % (dbName)) + + tsql.execute("create database if not exists %s vgroups %d replica %d duration %s" % ( + dbName, vgroups, replica, duration)) + tdLog.debug("complete to create database %s" % (dbName)) + return + + def create_stable(self, tsql, paraDict): + colString = tdCom.gen_column_type_str( + colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"]) + tagString = tdCom.gen_tag_type_str( + tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"]) + sqlString = f"create table if not exists %s.%s (%s) tags (%s)" % ( + paraDict["dbName"], paraDict["stbName"], colString, tagString) + tdLog.debug("%s" % (sqlString)) + tsql.execute(sqlString) + return + + def create_ctable(self, tsql=None, dbName='dbx', stbName='stb', ctbPrefix='ctb', ctbNum=1, ctbStartIdx=0): + for i in range(ctbNum): + sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % (dbName, ctbPrefix, i+ctbStartIdx, dbName, stbName, (i+ctbStartIdx) % 5, i+ctbStartIdx + random.randint( + 1, 100), i+ctbStartIdx + random.randint(1, 100), i+ctbStartIdx + random.randint(1, 100), i+ctbStartIdx + random.randint(1, 100), i+ctbStartIdx + random.randint(1, 100)) + tsql.execute(sqlString) + + tdLog.debug("complete to create %d child tables by %s.%s" % + (ctbNum, dbName, stbName)) + return + + def init_normal_tb(self, tsql, db_name: str, tb_name: str, rows: int, start_ts: int, ts_step: int): + sql = 'CREATE TABLE %s.%s (ts timestamp, c1 INT, c2 INT, c3 INT, c4 double, c5 VARCHAR(255))' % ( + db_name, tb_name) + tsql.execute(sql) + sql = 'INSERT INTO %s.%s values' % (db_name, tb_name) + for j in range(rows): + sql += f'(%d, %d,%d,%d,{random.random()},"varchar_%d"),' % (start_ts + j * ts_step + randrange(500), j % + 10 + randrange(200), j % 10, j % 10, j % 10 + randrange(100)) + tsql.execute(sql) + + def insert_data(self, tsql, dbName, ctbPrefix, ctbNum, rowsPerTbl, batchNum, startTs, tsStep): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" % dbName) + pre_insert = "insert into " + sql = pre_insert + + for i in range(ctbNum): + rowsBatched = 0 + sql += " %s.%s%d values " % (dbName, ctbPrefix, i) + for j in range(rowsPerTbl): + if (i < ctbNum/2): + sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') " % (startTs + j*tsStep + randrange( + 500), j % 10 + randrange(100), j % 10 + randrange(200), j % 10, j % 10, j % 10, j % 10, j % 10, j % 10) + else: + sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') " % ( + startTs + j*tsStep + randrange(500), j % 10, j % 10, j % 10, j % 10, j % 10, j % 10) + rowsBatched += 1 + if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + rowsBatched = 0 + if j < rowsPerTbl - 1: + sql = "insert into %s.%s%d values " % (dbName, ctbPrefix, i) + else: + sql = "insert into " + if sql != pre_insert: + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def init_data(self, db: str = 'test', ctb_num: int = 10, rows_per_ctb: int = 10000, start_ts: int = 1537146000000, ts_step: int = 500): + tdLog.printNoPrefix( + "======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': db, + 'dropFlag': 1, + 'vgroups': 4, + 'stbName': 'meters', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count': 1}, {'type': 'BIGINT', 'count': 1}, {'type': 'FLOAT', 'count': 1}, {'type': 'DOUBLE', 'count': 1}, {'type': 'smallint', 'count': 1}, {'type': 'tinyint', 'count': 1}, {'type': 'bool', 'count': 1}, {'type': 'binary', 'len': 10, 'count': 1}, {'type': 'nchar', 'len': 10, 'count': 1}], + 'tagSchema': [{'type': 'INT', 'count': 1}, {'type': 'nchar', 'len': 20, 'count': 1}, {'type': 'binary', 'len': 20, 'count': 1}, {'type': 'BIGINT', 'count': 1}, {'type': 'smallint', 'count': 1}, {'type': 'DOUBLE', 'count': 1}], + 'ctbPrefix': 't', + 'ctbStartIdx': 0, + 'ctbNum': ctb_num, + 'rowsPerTbl': rows_per_ctb, + 'batchNum': 3000, + 'startTs': start_ts, + 'tsStep': ts_step} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = ctb_num + paraDict['rowsPerTbl'] = rows_per_ctb + + tdLog.info("create database") + self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], + vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion) + + tdLog.info("create stb") + self.create_stable(tsql=tdSql, paraDict=paraDict) + + tdLog.info("create child tables") + self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], + stbName=paraDict["stbName"], ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"], ctbStartIdx=paraDict["ctbStartIdx"]) + self.insert_data(tsql=tdSql, dbName=paraDict["dbName"], + ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"], + rowsPerTbl=paraDict["rowsPerTbl"], batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"], tsStep=paraDict["tsStep"]) + self.init_normal_tb(tdSql, paraDict['dbName'], 'norm_tb', + paraDict['rowsPerTbl'], paraDict['startTs'], paraDict['tsStep']) + + def run(self): + self.init_data() + self.test_interp_extension() + + + def datetime_add_tz(self, dt): + if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None: + return dt.replace(tzinfo=get_localzone()) + return dt + + def binary_search_ts(self, select_results, ts): + mid = 0 + try: + found: bool = False + start = 0 + end = len(select_results) - 1 + while start <= end: + mid = (start + end) // 2 + if self.datetime_add_tz(select_results[mid][0]) == ts: + found = True + return mid + elif self.datetime_add_tz(select_results[mid][0]) < ts: + start = mid + 1 + else: + end = mid - 1 + + if not found: + tdLog.exit(f"cannot find ts in select results {ts} {select_results}") + return start + except Exception as e: + tdLog.debug(f"{select_results[mid][0]}, {ts}, {len(select_results)}, {select_results[mid]}") + self.check_failed = True + tdLog.exit(f"binary_search_ts error: {e}") + + def distance(self, ts1, ts2): + return abs(self.datetime_add_tz(ts1) - self.datetime_add_tz(ts2)) + + ## TODO pass last position to avoid search from the beginning + def is_nearest(self, select_results, irowts_origin, irowts): + if len(select_results) <= 1: + return True + try: + #tdLog.debug(f"check is_nearest for: {irowts_origin} {irowts}") + idx = self.binary_search_ts(select_results, irowts_origin) + if idx == 0: + #tdLog.debug(f"prev row: null,cur row: {select_results[idx]}, next row: {select_results[idx + 1]}") + res = self.distance(irowts, select_results[idx][0]) <= self.distance(irowts, select_results[idx + 1][0]) + if not res: + tdLog.debug(f"prev row: null,cur row: {select_results[idx]}, next row: {select_results[idx + 1]}, irowts_origin: {irowts_origin}, irowts: {irowts}") + return res + if idx == len(select_results) - 1: + #tdLog.debug(f"prev row: {select_results[idx - 1]},cur row: {select_results[idx]}, next row: null") + res = self.distance(irowts, select_results[idx][0]) <= self.distance(irowts, select_results[idx - 1][0]) + if not res: + tdLog.debug(f"prev row: {select_results[idx - 1]},cur row: {select_results[idx]}, next row: null, irowts_origin: {irowts_origin}, irowts: {irowts}") + return res + #tdLog.debug(f"prev row: {select_results[idx - 1]},cur row: {select_results[idx]}, next row: {select_results[idx + 1]}") + res = self.distance(irowts, select_results[idx][0]) <= self.distance(irowts, select_results[idx - 1][0]) and self.distance(irowts, select_results[idx][0]) <= self.distance(irowts, select_results[idx + 1][0]) + if not res: + tdLog.debug(f"prev row: {select_results[idx - 1]},cur row: {select_results[idx]}, next row: {select_results[idx + 1]}, irowts_origin: {irowts_origin}, irowts: {irowts}") + return res + except Exception as e: + self.check_failed = True + tdLog.exit(f"is_nearest error: {e}") + + ## interp_results: _irowts_origin, _irowts, ..., _isfilled + ## select_all_results must be sorted by ts in ascending order + def check_result_for_near(self, interp_results, select_all_results, sql, sql_select_all): + #tdLog.debug(f"check_result_for_near for sql: {sql}, sql_select_all{sql_select_all}") + for row in interp_results: + if row[0].tzinfo is None or row[0].tzinfo.utcoffset(row[0]) is None: + irowts_origin = row[0].replace(tzinfo=get_localzone()) + irowts = row[1].replace(tzinfo=get_localzone()) + else: + irowts_origin = row[0] + irowts = row[1] + if not self.is_nearest(select_all_results, irowts_origin, irowts): + self.check_failed = True + tdLog.exit(f"interp result is not the nearest for row: {row}, {sql}") + + def query_routine(self, sql_queue: queue.Queue, output_queue: queue.Queue): + try: + tdcom = TDCom() + cli = tdcom.newTdSql() + while True: + item = sql_queue.get() + if item is None or self.check_failed: + output_queue.put(None) + break + (sql, sql_select_all, _) = item + cli.query(sql, queryTimes=1) + interp_results = cli.queryResult + if sql_select_all is not None: + cli.query(sql_select_all, queryTimes=1) + output_queue.put((sql, interp_results, cli.queryResult, sql_select_all)) + cli.close() + except Exception as e: + self.check_failed = True + tdLog.exit(f"query_routine error: {e}") + + def interp_check_near_routine(self, select_all_results, output_queue: queue.Queue): + try: + while True: + item = output_queue.get() + if item is None: + break + (sql, interp_results, all_results, sql_select_all) = item + if all_results is not None: + self.check_result_for_near(interp_results, all_results, sql, sql_select_all) + else: + self.check_result_for_near(interp_results, select_all_results, sql, None) + except Exception as e: + self.check_failed = True + tdLog.exit(f"interp_check_near_routine error: {e}") + + def create_qt_threads(self, sql_queue: queue.Queue, output_queue: queue.Queue, num: int): + qts = [] + for _ in range(0, num): + qt = threading.Thread(target=self.query_routine, args=(sql_queue, output_queue)) + qt.start() + qts.append(qt) + return qts + + def wait_qt_threads(self, qts: list): + for qt in qts: + qt.join() + + ### first(ts) | last(ts) + ### 2018-09-17 09:00:00.047 | 2018-09-17 10:23:19.863 + def test_interp_fill_extension_near(self): + sql = f"select last(ts), c1, c2 from test.t0" + tdSql.query(sql, queryTimes=1) + lastRow = tdSql.queryResult[0] + sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.t0 range('2020-02-01 00:00:00', '2020-02-01 00:01:00') every(1s) fill(near)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(61) + for i in range(0, 61): + tdSql.checkData(i, 0, lastRow[0]) + tdSql.checkData(i, 2, lastRow[1]) + tdSql.checkData(i, 3, lastRow[2]) + tdSql.checkData(i, 4, True) + + sql = f"select ts, c1, c2 from test.t0 where ts between '2018-09-17 08:59:59' and '2018-09-17 09:00:06' order by ts asc" + tdSql.query(sql, queryTimes=1) + select_all_results = tdSql.queryResult + sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.t0 range('2018-09-17 09:00:00', '2018-09-17 09:00:05') every(1s) fill(near)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(6) + self.check_result_for_near(tdSql.queryResult, select_all_results, sql, None) + + + start = 1537146000000 + end = 1537151000000 + + tdSql.query("select ts, c1, c2 from test.t0 order by ts asc", queryTimes=1) + select_all_results = tdSql.queryResult + + qt_threads_num = 4 + sql_queue = queue.Queue() + output_queue = queue.Queue() + qts = self.create_qt_threads(sql_queue, output_queue, qt_threads_num) + ct = threading.Thread(target=self.interp_check_near_routine, args=(select_all_results, output_queue)) + ct.start() + for i in range(0, ROUND): + range_start = random.randint(start, end) + range_end = random.randint(range_start, end) + every = random.randint(1, 15) + #tdLog.debug(f"range_start: {range_start}, range_end: {range_end}") + sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.t0 range({range_start}, {range_end}) every({every}s) fill(near)" + sql_queue.put((sql, None, None)) + + ### no prev only, no next only, no prev and no next, have prev and have next + for i in range(0, ROUND): + range_point = random.randint(start, end) + ## all data points are can be filled by near + sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.t0 range({range_point}, 1h) fill(near, 1, 2)" + sql_queue.put((sql, None, None)) + + for i in range(0, ROUND): + range_start = random.randint(start, end) + range_end = random.randint(range_start, end) + range_where_start = random.randint(start, end) + range_where_end = random.randint(range_where_start, end) + every = random.randint(1, 15) + sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.t0 where ts between {range_where_start} and {range_where_end} range({range_start}, {range_end}) every({every}s) fill(near)" + tdSql.query(f'select to_char(cast({range_where_start} as timestamp), \'YYYY-MM-DD HH24:MI:SS.MS\'), to_char(cast({range_where_end} as timestamp), \'YYYY-MM-DD HH24:MI:SS.MS\')', queryTimes=1) + where_start_str = tdSql.queryResult[0][0] + where_end_str = tdSql.queryResult[0][1] + sql_select_all = f"select ts, c1, c2 from test.t0 where ts between '{where_start_str}' and '{where_end_str}' order by ts asc" + sql_queue.put((sql, sql_select_all, None)) + + for i in range(0, ROUND): + range_start = random.randint(start, end) + range_end = random.randint(range_start, end) + range_where_start = random.randint(start, end) + range_where_end = random.randint(range_where_start, end) + range_point = random.randint(start, end) + sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.t0 where ts between {range_where_start} and {range_where_end} range({range_point}, 1h) fill(near, 1, 2)" + tdSql.query(f'select to_char(cast({range_where_start} as timestamp), \'YYYY-MM-DD HH24:MI:SS.MS\'), to_char(cast({range_where_end} as timestamp), \'YYYY-MM-DD HH24:MI:SS.MS\')', queryTimes=1) + where_start_str = tdSql.queryResult[0][0] + where_end_str = tdSql.queryResult[0][1] + sql_select_all = f"select ts, c1, c2 from test.t0 where ts between '{where_start_str}' and '{where_end_str}' order by ts asc" + sql_queue.put((sql, sql_select_all, None)) + for i in range(0, qt_threads_num): + sql_queue.put(None) + self.wait_qt_threads(qts) + ct.join() + + if self.check_failed: + tdLog.exit("interp check near failed") + + def test_interp_extension_irowts_origin(self): + sql = f"select _irowts, _irowts_origin, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:01:00') every(1s) fill(near)" + tdSql.query(sql, queryTimes=1) + + sql = f"select _irowts, _irowts_origin, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:01:00') every(1s) fill(NULL)" + tdSql.error(sql, -2147473833) + sql = f"select _irowts, _irowts_origin, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:01:00') every(1s) fill(linear)" + tdSql.error(sql, -2147473833) + sql = f"select _irowts, _irowts_origin, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:01:00') every(1s) fill(NULL_F)" + tdSql.error(sql, -2147473833) + + + def test_interp_fill_extension(self): + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) fill(near, 0, 0)" + tdSql.query(sql, queryTimes=1) + + ### must specify value + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) fill(near)" + tdSql.error(sql, -2147473915) + ### num of fill value mismatch + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) fill(near, 1)" + tdSql.error(sql, -2147473915) + + ### range with around interval cannot specify two timepoints, currently not supported + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:02:00', 1h) fill(near, 1, 1)" + tdSql.error(sql, -2147473920) ## syntax error + + ### NULL/linear cannot specify other values + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:02:00') fill(NULL, 1, 1)" + tdSql.error(sql, -2147473920) + + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:02:00') fill(linear, 1, 1)" + tdSql.error(sql, -2147473920) ## syntax error + + ### cannot have every clause with range around + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) every(1s) fill(prev, 1, 1)" + tdSql.error(sql, -2147473827) ## TSDB_CODE_PAR_INVALID_INTERP_CLAUSE + + ### cannot specify near/prev/next values when using range + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:01:00') every(1s) fill(near, 1, 1)" + tdSql.error(sql, -2147473915) ## cannot specify values + + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00') every(1s) fill(near, 1, 1)" + tdSql.error(sql, -2147473915) ## cannot specify values + + ### when range around interval is set, only prev/next/near is supported + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) fill(NULL, 1, 1)" + tdSql.error(sql, -2147473920) + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) fill(NULL)" + tdSql.error(sql, -2147473861) ## TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE + + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) fill(linear, 1, 1)" + tdSql.error(sql, -2147473920) + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) fill(linear)" + tdSql.error(sql, -2147473861) ## TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE + + ### range interval cannot be 0 + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 0h) fill(near, 1, 1)" + tdSql.error(sql, -2147473861) ## TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE + + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1y) fill(near, 1, 1)" + tdSql.error(sql, -2147473915) ## TSDB_CODE_PAR_WRONG_VALUE_TYPE + + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1n) fill(near, 1, 1)" + tdSql.error(sql, -2147473915) ## TSDB_CODE_PAR_WRONG_VALUE_TYPE + + sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters where ts between '2020-02-01 00:00:00' and '2020-02-01 00:00:00' range('2020-02-01 00:00:00', 1h) fill(near, 1, 1)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(0) + + ### first(ts) | last(ts) + ### 2018-09-17 09:00:00.047 | 2018-09-17 10:23:19.863 + sql = "select to_char(first(ts), 'YYYY-MM-DD HH24:MI:SS.MS') from test.meters" + tdSql.query(sql, queryTimes=1) + first_ts = tdSql.queryResult[0][0] + sql = "select to_char(last(ts), 'YYYY-MM-DD HH24:MI:SS.MS') from test.meters" + tdSql.query(sql, queryTimes=1) + last_ts = tdSql.queryResult[0][0] + sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1d) fill(near, 1, 2)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(1) + tdSql.checkData(0, 0, None) + tdSql.checkData(0, 1, '2020-02-01 00:00:00.000') + tdSql.checkData(0, 2, 1) + tdSql.checkData(0, 3, 2) + tdSql.checkData(0, 4, True) + + sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2018-09-18 10:25:00', 1d) fill(prev, 3, 4)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(1) + tdSql.checkData(0, 0, None) + tdSql.checkData(0, 1, '2018-09-18 10:25:00.000') + tdSql.checkData(0, 2, 3) + tdSql.checkData(0, 3, 4) + tdSql.checkData(0, 4, True) + + sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2018-09-16 08:25:00', 1d) fill(next, 5, 6)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(1) + tdSql.checkData(0, 0, None) + tdSql.checkData(0, 1, '2018-09-16 08:25:00.000') + tdSql.checkData(0, 2, 5) + tdSql.checkData(0, 3, 6) + tdSql.checkData(0, 4, True) + + sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2018-09-16 09:00:01', 1d) fill(next, 1, 2)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(1) + tdSql.checkData(0, 0, first_ts) + tdSql.checkData(0, 1, '2018-09-16 09:00:01') + tdSql.checkData(0, 4, True) + + sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2018-09-18 10:23:19', 1d) fill(prev, 1, 2)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, '2018-09-18 10:23:19') + tdSql.checkData(0, 4, True) + + sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.meters range('{last_ts}', 1a) fill(next, 1, 2)" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, last_ts) + tdSql.checkData(0, 4, False) + + + def test_interval_fill_extension(self): + ## not allowed + sql = f"select count(*) from test.meters interval(1s) fill(near)" + tdSql.error(sql, -2147473920) ## syntax error + + sql = f"select count(*) from test.meters interval(1s) fill(prev, 1)" + tdSql.error(sql, -2147473920) ## syntax error + sql = f"select count(*) from test.meters interval(1s) fill(next, 1)" + tdSql.error(sql, -2147473920) ## syntax error + + def test_interp_fill_extension_stream(self): + ## near is not supported + sql = f"create stream s1 trigger force_window_close into test.s_res_tb as select _irowts, interp(c1), interp(c2)from test.meters partition by tbname every(1s) fill(near);" + tdSql.error(sql, -2147473851) ## TSDB_CODE_PAR_INVALID_STREAM_QUERY + + ## _irowts_origin is not support + sql = f"create stream s1 trigger force_window_close into test.s_res_tb as select _irowts_origin, interp(c1), interp(c2)from test.meters partition by tbname every(1s) fill(prev);" + tdSql.error(sql, -2147473851) ## TSDB_CODE_PAR_INVALID_STREAM_QUERY + + sql = f"create stream s1 trigger force_window_close into test.s_res_tb as select _irowts, interp(c1), interp(c2)from test.meters partition by tbname every(1s) fill(next, 1, 1);" + tdSql.error(sql, -2147473915) ## cannot specify values + + def test_interp_extension(self): + self.test_interp_fill_extension_near() + self.test_interp_extension_irowts_origin() + self.test_interp_fill_extension() + self.test_interval_fill_extension() + self.test_interp_fill_extension_stream() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())