support interp fill extension

This commit is contained in:
wangjiaming0909 2024-11-28 18:29:20 +08:00 committed by wangjiaming0909
parent 675008dd18
commit 3239a10b76
26 changed files with 922 additions and 58 deletions

View File

@ -63,6 +63,7 @@ window_clause: {
interp_clause:
RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val)
| RANGE(ts_val, interval_val) FILL(fill_mod_and_val)
partition_by_clause:
PARTITION BY partition_by_expr [, partition_by_expr] ...
@ -256,6 +257,13 @@ The \_irowts pseudo column can only be used with the interp function to return t
SELECT _irowts, interp(current) FROM meters RANGE('2020-01-01 10:00:00', '2020-01-01 10:30:00') EVERY(1s) FILL(linear);
```
** \_IROWTS\_ORIGIN**
Pseudo column `_irowts_origin` is used to get the original timestamp of the row used for filling. It can only be used with the INTERP query. `_irowts_origin` is not supported in stream. Only FILL PREV/NEXT/NEAR is supported. If there is not data in range, return NULL.
```sql
SELECT _irowts_origin, interp(current) FROM meters RANGE('2020-01-01 10:00:00', '2020-01-01 10:30:00') EVERY(1s) FILL(PREV);
```
## Query Objects
After the FROM keyword, there can be several table (supertable) lists, or the results of subqueries.

View File

@ -1869,6 +1869,9 @@ INTERP(expr [, ignore_null_values])
- INTERP can be used with the pseudo-column _irowts to return the timestamp corresponding to the interpolation point (supported from version 3.0.2.0 onwards).
- INTERP can also be used with the pseudo-column _isfilled to show whether the returned result is from the original record or produced by the interpolation algorithm (supported from version 3.0.3.0 onwards).
- When querying a table with a composite primary key, if there are multiple records with the same timestamp, only the data corresponding to the minimum composite primary key will participate in the calculation.
- `INTERP` support NEAR fill mode. When `FILL(NEAR)` is used, the nearest value to the interpolation point is used to fill the missing value. If there are multiple values with the same distance to the interpolation point, previous row is used. NEAR fill is not supported in stream computation. For example, `SELECT _irowts,INTERP(current) FROM test.meters RANGE('2017-07-22 00:00:00','2017-07-24 12:25:00') EVERY(1h) FILL(NEAR)`.
- Psedo column `_irowts_origin` can be used along with `INTERP` only when using NEAR/NEXT/PREV fill mode.
- `INTERP` RANGE clause support INTERVAL extension, like `RANGE('2023-01-01 00:00:00', 1d)`. The second parameter is the interval length, and the unit cannot use y(year), n(month). The interval length must be an integer, with no quotes, the value can't be 0. The interval length is used to restrict the search range from the time point specified. For example, `SELECT _irowts,INTERP(current) FROM test.meters RANGE('2017-07-22 00:00:00', 1d) FILL(NEAR, 1)`. The query will return the interpolation result of the current column within the range of 1 day from the time point '2017-07-22 00:00:00'. If there is no data within the range, the specified value in FILL will be used. Only FILL PREV/NEXT/NEAR is supported in this case. It's illegal to use `EVERY` clause and NOT specify values in FILL clause in this case. None data-point range clause with INTERVAL extension is not supported currently, like `RANGE('2017-07-22 00:00:00', '2017-07-22 12:00:00', 1h)` is not supported.
### LAST

View File

@ -63,6 +63,7 @@ window_clause: {
interp_clause:
RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val)
| RANGE(ts_val, interval_val) FILL(fill_mod_and_val)
partition_by_clause:
PARTITION BY partition_by_expr [, partition_by_expr] ...
@ -255,6 +256,13 @@ select _rowts, max(current) from meters;
select _irowts, interp(current) from meters range('2020-01-01 10:00:00', '2020-01-01 10:30:00') every(1s) fill(linear);
```
**\_IROWTS\_ORIGIN**
`_irowts_origin` 伪列只能与 interp 函数一起使用,不支持在流计算中使用, 仅适用于FILL类型为PREV/NEXT/NEAR, 用于返回 interp 函数所使用的原始数据的时间戳列。若范围内无值, 则返回 NULL。
```sql
select _iorwts_origin, interp(current) from meters range('2020-01-01 10:00:00', '2020-01-01 10:30:00') every(1s) fill(NEXT);
```
## 查询对象
FROM 关键字后面可以是若干个表(超级表)列表,也可以是子查询的结果。

View File

@ -1838,6 +1838,9 @@ ignore_null_values: {
- INTERP 可以与伪列 _irowts 一起使用,返回插值点所对应的时间戳(3.0.2.0 版本以后支持)。
- INTERP 可以与伪列 _isfilled 一起使用,显示返回结果是否为原始记录或插值算法产生的数据(3.0.3.0 版本以后支持)。
- INTERP 对于带复合主键的表的查询,若存在相同时间戳的数据,则只有对应的复合主键最小的数据参与运算。
- INTERP 查询支持新的NEAR FILL模式, 即当需要FILL时, 使用距离当前时间点最近的数据进行插值, 当前后时间戳与当前时间断面一样近时, FILL 前一行的值. 此模式在流计算中和窗口查询中不支持。例如: SELECT INTERP(col) FROM tb RANGE('2023-01-01 00:00:00', '2023-01-01 00:10:00') FILL(NEAR)。
- INTERP 只有在使用FILL PREV/NEXT/NEAR 模式时才可以使用伪列 `_irowts_origin`
- INTERP `RANEG`子句支持时间范围的扩展, 如`RANGE('2023-01-01 00:00:00', 10s)`表示在时间点'2023-01-01 00:00:00'查找前后10s的数据进行插值, FILL PREV/NEXT/NEAR分别表示从时间点向前/向后/前后查找数据, 若时间点周围没有数据, 则使用FILL指定的值进行插值, 因此此时FILL子句必须指定值。例如: SELECT INTERP(col) FROM tb RANGE('2023-01-01 00:00:00', 10s) FILL(PREV, 1). 目前仅支持时间点和时间范围的组合, 不支持时间区间和时间范围的组合, 即不支持RANGE('2023-01-01 00:00:00', '2023-02-01 00:00:00', 1h)。所指定的时间范围规则与EVERY类似, 单位不能是年或月, 值不能为0, 不能带引号。使用该扩展时, 不支持除FILL PREV/NEXT/NEAR外的其他FILL模式, 且不能指定EVERY子句
### LAST

View File

@ -188,6 +188,7 @@ typedef enum _mgmt_table {
#define TSDB_FILL_LINEAR 5
#define TSDB_FILL_PREV 6
#define TSDB_FILL_NEXT 7
#define TSDB_FILL_NEAR 8
#define TSDB_ALTER_USER_PASSWD 0x1
#define TSDB_ALTER_USER_SUPERUSER 0x2
@ -264,6 +265,7 @@ typedef enum ENodeType {
QUERY_NODE_COLUMN_OPTIONS,
QUERY_NODE_TSMA_OPTIONS,
QUERY_NODE_ANOMALY_WINDOW,
QUERY_NODE_RANGE_AROUND,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR = 100,

View File

@ -155,6 +155,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_FORECAST_LOW,
FUNCTION_TYPE_FORECAST_HIGH,
FUNCTION_TYPE_FORECAST_ROWTS,
FUNCTION_TYPE_IROWTS_ORIGIN,
// internal function
FUNCTION_TYPE_SELECT_VALUE = 3750,
@ -289,6 +290,7 @@ bool fmIsPrimaryKeyFunc(int32_t funcId);
bool fmIsProcessByRowFunc(int32_t funcId);
bool fmisSelectGroupConstValueFunc(int32_t funcId);
bool fmIsElapsedFunc(int32_t funcId);
bool fmIsRowTsOriginFunc(int32_t funcId);
void getLastCacheDataType(SDataType* pType, int32_t pkBytes);
int32_t createFunction(const char* pName, SNodeList* pParameterList, SFunctionNode** pFunc);

View File

@ -213,6 +213,8 @@ typedef struct SInterpFuncLogicNode {
EFillMode fillMode;
SNode* pFillValues; // SNodeListNode
SNode* pTimeSeries; // SColumnNode
int64_t rangeInterval;
int8_t rangeIntervalUnit;
SStreamNodeOption streamNodeOption;
} SInterpFuncLogicNode;
@ -528,6 +530,8 @@ typedef struct SInterpFuncPhysiNode {
SNode* pFillValues; // SNodeListNode
SNode* pTimeSeries; // SColumnNode
SStreamNodeOption streamNodeOption;
int64_t rangeInterval;
int8_t rangeIntervalUnit;
} SInterpFuncPhysiNode;
typedef SInterpFuncPhysiNode SStreamInterpFuncPhysiNode;

View File

@ -362,7 +362,8 @@ typedef enum EFillMode {
FILL_MODE_NULL,
FILL_MODE_NULL_F,
FILL_MODE_LINEAR,
FILL_MODE_NEXT
FILL_MODE_NEXT,
FILL_MODE_NEAR,
} EFillMode;
typedef enum ETimeLineMode {
@ -407,6 +408,11 @@ typedef struct SWindowOffsetNode {
SNode* pEndOffset; // SValueNode
} SWindowOffsetNode;
typedef struct SRangeAroundNode {
ENodeType type;
SNode* pTimepoint;
SNode* pInterval;
} SRangeAroundNode;
typedef struct SSelectStmt {
ENodeType type; // QUERY_NODE_SELECT_STMT
@ -421,6 +427,7 @@ typedef struct SSelectStmt {
SNodeList* pGroupByList; // SGroupingSetNode
SNode* pHaving;
SNode* pRange;
SNode* pRangeAround;
SNode* pEvery;
SNode* pFill;
SNodeList* pOrderByList; // SOrderByExprNode

View File

@ -1059,6 +1059,7 @@ void destroyFlusedPos(void* pRes);
bool isIrowtsPseudoColumn(SExprInfo* pExprInfo);
bool isIsfilledPseudoColumn(SExprInfo* pExprInfo);
bool isInterpFunc(SExprInfo* pExprInfo);
bool isIrowtsOriginPseudoColumn(SExprInfo* pExprInfo);
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
void* decodeSSessionKey(void* buf, SSessionKey* key);

View File

@ -2390,6 +2390,9 @@ int32_t convertFillType(int32_t mode) {
case FILL_MODE_LINEAR:
type = TSDB_FILL_LINEAR;
break;
case FILL_MODE_NEAR:
type = TSDB_FILL_NEAR;
break;
default:
type = TSDB_FILL_NONE;
}

View File

@ -48,6 +48,7 @@ typedef struct STimeSliceOperatorInfo {
int32_t remainIndex; // the remaining index in the block to be processed
bool hasPk;
SColumn pkCol;
int64_t rangeInterval;
} STimeSliceOperatorInfo;
static void destroyTimeSliceOperatorInfo(void* param);
@ -179,6 +180,11 @@ bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) {
return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0);
}
bool isIrowtsOriginPseudoColumn(SExprInfo* pExprInfo) {
const char* name = pExprInfo->pExpr->_function.functionName;
return (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_irowts_origin") == 0);
}
static void tRowGetKeyFromColData(int64_t ts, SColumnInfoData* pPkCol, int32_t rowIndex, SRowKey* pKey) {
pKey->ts = ts;
pKey->numOfPKs = 1;
@ -277,6 +283,79 @@ bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bo
return false;
}
static int32_t interpColSetKey(SColumnInfoData* pDst, int32_t rowNum, SGroupKeys* pKey) {
int32_t code = 0;
if (pKey->isNull == false) {
code = colDataSetVal(pDst, rowNum, pKey->pData, false);
} else {
colDataSetNULL(pDst, rowNum);
}
return code;
}
static bool interpSetFillRowWithRangeIntervalCheck(STimeSliceOperatorInfo* pSliceInfo, SArray** ppFillRow, SArray* pFillRefRow, int64_t fillRefRowTs) {
*ppFillRow = NULL;
if (pSliceInfo->rangeInterval <= 0 || llabs(fillRefRowTs - pSliceInfo->current) <= pSliceInfo->rangeInterval) {
*ppFillRow = pFillRefRow;
return true;
}
return false;
}
static bool interpDetermineNearFillRow(STimeSliceOperatorInfo* pSliceInfo, SArray** ppNearRow) {
if (!pSliceInfo->isPrevRowSet && !pSliceInfo->isNextRowSet) {
*ppNearRow = NULL;
return false;
}
SGroupKeys *pPrevTsKey = NULL, *pNextTsKey = NULL;
int64_t* pPrevTs = NULL, *pNextTs = NULL;
if (pSliceInfo->isPrevRowSet) {
pPrevTsKey = taosArrayGet(pSliceInfo->pPrevRow, pSliceInfo->tsCol.slotId);
pPrevTs = (int64_t*)pPrevTsKey->pData;
}
if (pSliceInfo->isNextRowSet) {
pNextTsKey = taosArrayGet(pSliceInfo->pNextRow, pSliceInfo->tsCol.slotId);
pNextTs = (int64_t*)pNextTsKey->pData;
}
if (!pPrevTsKey) {
*ppNearRow = pSliceInfo->pNextRow;
(void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppNearRow, pSliceInfo->pNextRow, *pNextTs);
} else if (!pNextTsKey) {
*ppNearRow = pSliceInfo->pPrevRow;
(void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppNearRow, pSliceInfo->pPrevRow, *pPrevTs);
} else {
if (llabs(pSliceInfo->current - *pPrevTs) <= llabs(*pNextTs - pSliceInfo->current)) {
// take prev if euqal
(void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppNearRow, pSliceInfo->pPrevRow, *pPrevTs);
} else {
(void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppNearRow, pSliceInfo->pNextRow, *pNextTs);
}
}
return true;
}
static bool interpDetermineFillRefRow(STimeSliceOperatorInfo* pSliceInfo, SArray** ppOutRow) {
bool needFill = false;
if (pSliceInfo->fillType == TSDB_FILL_PREV) {
if (pSliceInfo->isPrevRowSet) {
SGroupKeys* pTsCol = taosArrayGet(pSliceInfo->pPrevRow, pSliceInfo->tsCol.slotId);
(void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppOutRow, pSliceInfo->pPrevRow, *(int64_t*)pTsCol->pData);
needFill = true;
}
} else if (pSliceInfo->fillType == TSDB_FILL_NEXT) {
if (pSliceInfo->isNextRowSet) {
SGroupKeys* pTsCol = taosArrayGet(pSliceInfo->pNextRow, pSliceInfo->tsCol.slotId);
(void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppOutRow, pSliceInfo->pNextRow, *(int64_t*)pTsCol->pData);
needFill = true;
}
} else if (pSliceInfo->fillType == TSDB_FILL_NEAR) {
needFill = interpDetermineNearFillRow(pSliceInfo, ppOutRow);
} else {
needFill = true;
}
return needFill;
}
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
SSDataBlock* pSrcBlock, int32_t index, bool beforeTs, SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
@ -290,6 +369,8 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
int32_t fillColIndex = 0;
int32_t groupKeyIndex = 0;
bool hasInterp = true;
SArray* pFillRefRow = NULL;
bool needFill = interpDetermineFillRefRow(pSliceInfo, &pFillRefRow);
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
@ -305,7 +386,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
code = colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false);
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else if (!isInterpFunc(pExprInfo)) {
} else if (!isInterpFunc(pExprInfo) && !isIrowtsOriginPseudoColumn(pExprInfo)) {
if (isGroupKeyFunc(pExprInfo) || isSelectGroupConstValueFunc(pExprInfo)) {
if (pSrcBlock != NULL) {
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
@ -344,7 +425,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
continue;
}
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
int32_t srcSlot = isIrowtsOriginPseudoColumn(pExprInfo) ? pSliceInfo->tsCol.slotId : pExprInfo->base.pParam[0].pCol->slotId;
switch (pSliceInfo->fillType) {
case TSDB_FILL_NULL:
case TSDB_FILL_NULL_F: {
@ -352,6 +433,25 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
break;
}
case TSDB_FILL_PREV:
case TSDB_FILL_NEAR:
case TSDB_FILL_NEXT: {
if (!needFill) {
hasInterp = false;
break;
}
if (pFillRefRow) {
code = interpColSetKey(pDst, rows, taosArrayGet(pFillRefRow, srcSlot));
QUERY_CHECK_CODE(code, lino, _end);
break;
}
// no fillRefRow, fall through to fill specified values
if (srcSlot == pSliceInfo->tsCol.slotId) {
// if is _irowts_origin, there is no value to fill, just set to null
colDataSetNULL(pDst, rows);
break;
}
}
case TSDB_FILL_SET_VALUE:
case TSDB_FILL_SET_VALUE_F: {
SVariant* pVar = &pSliceInfo->pFillColInfo[fillColIndex].fillVal;
@ -444,38 +544,6 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
taosMemoryFree(current.val);
break;
}
case TSDB_FILL_PREV: {
if (!pSliceInfo->isPrevRowSet) {
hasInterp = false;
break;
}
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
if (pkey->isNull == false) {
code = colDataSetVal(pDst, rows, pkey->pData, false);
QUERY_CHECK_CODE(code, lino, _end);
} else {
colDataSetNULL(pDst, rows);
}
break;
}
case TSDB_FILL_NEXT: {
if (!pSliceInfo->isNextRowSet) {
hasInterp = false;
break;
}
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot);
if (pkey->isNull == false) {
code = colDataSetVal(pDst, rows, pkey->pData, false);
QUERY_CHECK_CODE(code, lino, _end);
} else {
colDataSetNULL(pDst, rows);
}
break;
}
case TSDB_FILL_NONE:
default:
break;
@ -507,7 +575,7 @@ static int32_t addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSu
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
if (isIrowtsPseudoColumn(pExprInfo)) {
if (isIrowtsPseudoColumn(pExprInfo) || isIrowtsOriginPseudoColumn(pExprInfo)) {
code = colDataSetVal(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false);
QUERY_CHECK_CODE(code, lino, _end);
} else if (isIsfilledPseudoColumn(pExprInfo)) {
@ -1233,6 +1301,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN
pInfo->pNextGroupRes = NULL;
pInfo->pRemainRes = NULL;
pInfo->remainIndex = 0;
pInfo->rangeInterval = pInterpPhyNode->rangeInterval;
if (pInfo->hasPk) {
pInfo->prevKey.numOfPKs = 1;

View File

@ -5596,6 +5596,20 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
{
.name = "_irowts_origin",
.type = FUNCTION_TYPE_IROWTS_ORIGIN,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_INTERP_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.parameters = {.minParamNum = 0,
.maxParamNum = 0,
.paramInfoPattern = 0,
.outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_TIMESTAMP_TYPE}},
.translateFunc = translateTimePseudoColumn,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
};
// clang-format on

View File

@ -703,3 +703,10 @@ bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId) {
bool fmIsCountLikeFunc(int32_t funcId) {
return isSpecificClassifyFunc(funcId, FUNC_MGT_COUNT_LIKE_FUNC);
}
bool fmIsRowTsOriginFunc(int32_t funcId) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false;
}
return FUNCTION_TYPE_IROWTS_ORIGIN == funcMgtBuiltins[funcId].type;
}

View File

@ -684,6 +684,8 @@ static int32_t logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFunc
CLONE_NODE_FIELD(pFillValues);
CLONE_NODE_FIELD(pTimeSeries);
COPY_OBJECT_FIELD(streamNodeOption, sizeof(SStreamNodeOption));
COPY_SCALAR_FIELD(rangeInterval);
COPY_SCALAR_FIELD(rangeIntervalUnit);
return TSDB_CODE_SUCCESS;
}

View File

@ -1298,6 +1298,8 @@ static const char* jkInterpFuncLogicPlanFillMode = "fillMode";
static const char* jkInterpFuncLogicPlanFillValues = "FillValues";
static const char* jkInterpFuncLogicPlanTimeSeries = "TimeSeries";
static const char* jkInterpFuncLogicPlanStreamNodeOption = "StreamNodeOption";
static const char* jkInterpFuncLogicPlanRangeInterval = "RangeInterval";
static const char* jkInterpFuncLogicPlanRangeIntervalUnit = "RangeIntervalUnit";
static int32_t logicInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
const SInterpFuncLogicNode* pNode = (const SInterpFuncLogicNode*)pObj;
@ -1333,6 +1335,12 @@ static int32_t logicInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, streamNodeOptionToJson, &pNode->streamNodeOption);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkInterpFuncLogicPlanRangeInterval, pNode->rangeInterval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkInterpFuncLogicPlanRangeIntervalUnit, pNode->rangeIntervalUnit);
}
return code;
}
@ -1371,6 +1379,12 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkInterpFuncLogicPlanRangeInterval, &pNode->rangeInterval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkInterpFuncLogicPlanRangeIntervalUnit, &pNode->rangeIntervalUnit);
}
return code;
}
@ -3312,6 +3326,8 @@ static const char* jkInterpFuncPhysiPlanFillMode = "FillMode";
static const char* jkInterpFuncPhysiPlanFillValues = "FillValues";
static const char* jkInterpFuncPhysiPlanTimeSeries = "TimeSeries";
static const char* jkInterpFuncPhysiPlanStreamNodeOption = "StreamNodeOption";
static const char* jkInterpFuncPhysiPlanRangeInterval = "RangeInterval";
static const char* jkInterpFuncPhysiPlanRangeIntervalUnit = "RangeIntervalUnit";
static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
const SInterpFuncPhysiNode* pNode = (const SInterpFuncPhysiNode*)pObj;
@ -3350,6 +3366,12 @@ static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanStreamNodeOption, streamNodeOptionToJson, &pNode->streamNodeOption);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanRangeInterval, pNode->rangeInterval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanRangeIntervalUnit, pNode->rangeIntervalUnit);
}
return code;
}
@ -3391,6 +3413,12 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkInterpFuncPhysiPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkInterpFuncPhysiPlanRangeInterval, &pNode->rangeInterval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkInterpFuncPhysiPlanRangeIntervalUnit, &pNode->rangeIntervalUnit);
}
return code;
}

View File

@ -3746,7 +3746,9 @@ enum {
PHY_INERP_FUNC_CODE_INTERVAL_UNIT,
PHY_INERP_FUNC_CODE_FILL_MODE,
PHY_INERP_FUNC_CODE_FILL_VALUES,
PHY_INERP_FUNC_CODE_TIME_SERIES
PHY_INERP_FUNC_CODE_TIME_SERIES,
PHY_INTERP_FUNC_CODE_RANGE_INTERVAL,
PHY_INTERP_FUNC_CODE_RANGE_INTERVAL_UNIT,
};
static int32_t physiInterpFuncNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -3777,6 +3779,12 @@ static int32_t physiInterpFuncNodeToMsg(const void* pObj, STlvEncoder* pEncoder)
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_INERP_FUNC_CODE_TIME_SERIES, nodeToMsg, pNode->pTimeSeries);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_INTERP_FUNC_CODE_RANGE_INTERVAL, pNode->rangeInterval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_INTERP_FUNC_CODE_RANGE_INTERVAL_UNIT, pNode->rangeIntervalUnit);
}
return code;
}
@ -3815,6 +3823,12 @@ static int32_t msgToPhysiInterpFuncNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_INERP_FUNC_CODE_TIME_SERIES:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTimeSeries);
break;
case PHY_INTERP_FUNC_CODE_RANGE_INTERVAL:
code = tlvDecodeI64(pTlv, &pNode->rangeInterval);
break;
case PHY_INTERP_FUNC_CODE_RANGE_INTERVAL_UNIT:
code = tlvDecodeI8(pTlv, &pNode->rangeIntervalUnit);
break;
default:
break;
}

View File

@ -470,6 +470,8 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
case QUERY_NODE_SET_OPERATOR:
code = makeNode(type, sizeof(SSetOperator), &pNode);
break;
case QUERY_NODE_RANGE_AROUND:
code = makeNode(type, sizeof(SRangeAroundNode), &pNode); break;
case QUERY_NODE_SELECT_STMT:
code = makeNode(type, sizeof(SSelectStmt), &pNode);
break;
@ -1245,6 +1247,12 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pWin->pEndOffset);
break;
}
case QUERY_NODE_RANGE_AROUND: {
SRangeAroundNode* pAround = (SRangeAroundNode*)pNode;
nodesDestroyNode(pAround->pInterval);
nodesDestroyNode(pAround->pTimepoint);
break;
}
case QUERY_NODE_SET_OPERATOR: {
SSetOperator* pStmt = (SSetOperator*)pNode;
nodesDestroyList(pStmt->pProjectionList);
@ -1266,6 +1274,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(pStmt->pGroupByList);
nodesDestroyNode(pStmt->pHaving);
nodesDestroyNode(pStmt->pRange);
nodesDestroyNode(pStmt->pRangeAround);
nodesDestroyNode(pStmt->pEvery);
nodesDestroyNode(pStmt->pFill);
nodesDestroyList(pStmt->pOrderByList);

View File

@ -163,6 +163,7 @@ SNode* createFillNode(SAstCreateContext* pCxt, EFillMode mode, SNode* pValue
SNode* createGroupingSetNode(SAstCreateContext* pCxt, SNode* pNode);
SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd);
SNode* createInterpTimePoint(SAstCreateContext* pCxt, SNode* pPoint);
SNode* createInterpTimeAround(SAstCreateContext* pCxt, SNode* pTimepoint, SNode* pInterval);
SNode* createWhenThenNode(SAstCreateContext* pCxt, SNode* pWhen, SNode* pThen);
SNode* createCaseWhenNode(SAstCreateContext* pCxt, SNode* pCase, SNodeList* pWhenThenList, SNode* pElse);
SNode* createAlterSingleTagColumnNode(SAstCreateContext* pCtx, SToken* token, SNode* pVal);

View File

@ -1209,6 +1209,7 @@ pseudo_column(A) ::= QTAGS(B).
pseudo_column(A) ::= FLOW(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= FHIGH(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= FROWTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= IROWTS_ORIGIN(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
function_expression(A) ::= function_name(B) NK_LP expression_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
function_expression(A) ::= star_func(B) NK_LP star_func_para_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
@ -1454,7 +1455,7 @@ jlimit_clause_opt(A) ::= JLIMIT NK_INTEGER(B).
query_specification(A) ::=
SELECT hint_list(M) set_quantifier_opt(B) tag_mode_opt(N) select_list(C) from_clause_opt(D)
where_clause_opt(E) partition_by_clause_opt(F) range_opt(J) every_opt(K)
fill_opt(L) twindow_clause_opt(G) group_by_clause_opt(H) having_clause_opt(I). {
interp_fill_opt(L) twindow_clause_opt(G) group_by_clause_opt(H) having_clause_opt(I). {
A = createSelectStmt(pCxt, B, C, D, M);
A = setSelectStmtTagMode(pCxt, A, N);
A = addWhereClause(pCxt, A, E);
@ -1539,19 +1540,41 @@ interval_sliding_duration_literal(A) ::= NK_VARIABLE(B).
interval_sliding_duration_literal(A) ::= NK_STRING(B). { A = createRawExprNode(pCxt, &B, createDurationValueNode(pCxt, &B)); }
interval_sliding_duration_literal(A) ::= NK_INTEGER(B). { A = createRawExprNode(pCxt, &B, createDurationValueNode(pCxt, &B)); }
interp_fill_opt(A) ::= . { A = NULL; }
interp_fill_opt(A) ::= fill_value(B). { A = B; }
interp_fill_opt(A) ::=
FILL NK_LP fill_position_mode_extension(B) NK_COMMA expression_list(C) NK_RP. { A = createFillNode(pCxt, B, createNodeListNode(pCxt, C)); }
interp_fill_opt(A) ::= FILL NK_LP interp_fill_mode(B) NK_RP. { A = createFillNode(pCxt, B, NULL); }
fill_opt(A) ::= . { A = NULL; }
fill_opt(A) ::= FILL NK_LP fill_mode(B) NK_RP. { A = createFillNode(pCxt, B, NULL); }
fill_opt(A) ::= FILL NK_LP VALUE NK_COMMA expression_list(B) NK_RP. { A = createFillNode(pCxt, FILL_MODE_VALUE, createNodeListNode(pCxt, B)); }
fill_opt(A) ::= FILL NK_LP VALUE_F NK_COMMA expression_list(B) NK_RP. { A = createFillNode(pCxt, FILL_MODE_VALUE_F, createNodeListNode(pCxt, B)); }
fill_opt(A) ::= fill_value(B). { A = B; }
fill_value(A) ::= FILL NK_LP VALUE NK_COMMA expression_list(B) NK_RP. { A = createFillNode(pCxt, FILL_MODE_VALUE, createNodeListNode(pCxt, B)); }
fill_value(A) ::= FILL NK_LP VALUE_F NK_COMMA expression_list(B) NK_RP. { A = createFillNode(pCxt, FILL_MODE_VALUE_F, createNodeListNode(pCxt, B)); }
%type fill_mode { EFillMode }
%destructor fill_mode { }
fill_mode(A) ::= NONE. { A = FILL_MODE_NONE; }
fill_mode(A) ::= PREV. { A = FILL_MODE_PREV; }
fill_mode(A) ::= NULL. { A = FILL_MODE_NULL; }
fill_mode(A) ::= NULL_F. { A = FILL_MODE_NULL_F; }
fill_mode(A) ::= LINEAR. { A = FILL_MODE_LINEAR; }
fill_mode(A) ::= NEXT. { A = FILL_MODE_NEXT; }
fill_mode(A) ::= fill_position_mode(B). { A = B; }
%type fill_position_mode { EFillMode }
%destructor fill_position_mode { }
fill_position_mode(A) ::= PREV. { A = FILL_MODE_PREV; }
fill_position_mode(A) ::= NEXT. { A = FILL_MODE_NEXT; }
%type fill_position_mode_extension { EFillMode }
%destructor fill_position_mode_extension { }
fill_position_mode_extension(A) ::= fill_position_mode(B). { A = B; }
fill_position_mode_extension(A) ::= NEAR. { A = FILL_MODE_NEAR; }
%type interp_fill_mode { EFillMode }
%destructor interp_fill_mode { }
interp_fill_mode(A) ::= fill_mode(B). { A = B; }
interp_fill_mode(A) ::= NEAR. { A = FILL_MODE_NEAR; }
%type group_by_clause_opt { SNodeList* }
%destructor group_by_clause_opt { nodesDestroyList($$); }

View File

@ -1460,6 +1460,9 @@ _err:
SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd) {
CHECK_PARSER_STATUS(pCxt);
if (pEnd && nodeType(pEnd) == QUERY_NODE_VALUE && ((SValueNode*)pEnd)->flag & VALUE_FLAG_IS_DURATION) {
return createInterpTimeAround(pCxt, pStart, pEnd);
}
return createBetweenAnd(pCxt, createPrimaryKeyCol(pCxt, NULL), pStart, pEnd);
_err:
nodesDestroyNode(pStart);
@ -1475,6 +1478,19 @@ _err:
return NULL;
}
SNode* createInterpTimeAround(SAstCreateContext* pCxt, SNode* pTimepoint, SNode* pInterval) {
CHECK_PARSER_STATUS(pCxt);
SRangeAroundNode* pAround = NULL;
pCxt->errCode = nodesMakeNode(QUERY_NODE_RANGE_AROUND, (SNode**)&pAround);
CHECK_PARSER_STATUS(pCxt);
pAround->pTimepoint = createInterpTimePoint(pCxt, pTimepoint);
pAround->pInterval = pInterval;
CHECK_PARSER_STATUS(pCxt);
return (SNode*)pAround;
_err:
return NULL;
}
SNode* createWhenThenNode(SAstCreateContext* pCxt, SNode* pWhen, SNode* pThen) {
CHECK_PARSER_STATUS(pCxt);
SWhenThenNode* pWhenThen = NULL;
@ -1632,8 +1648,15 @@ _err:
SNode* addRangeClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pRange) {
CHECK_PARSER_STATUS(pCxt);
SSelectStmt* pSelect = (SSelectStmt*)pStmt;
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
((SSelectStmt*)pStmt)->pRange = pRange;
if (pRange && nodeType(pRange) == QUERY_NODE_RANGE_AROUND) {
pSelect->pRangeAround = pRange;
SRangeAroundNode* pAround = (SRangeAroundNode*)pRange;
TSWAP(pSelect->pRange, pAround->pTimepoint);
} else {
pSelect->pRange = pRange;
}
}
return pStmt;
_err:

View File

@ -176,6 +176,7 @@ static SKeyword keywordTable[] = {
{"NORMAL", TK_NORMAL},
{"NCHAR", TK_NCHAR},
{"NEXT", TK_NEXT},
{"NEAR", TK_NEAR},
{"NMATCH", TK_NMATCH},
{"NONE", TK_NONE},
{"NOT", TK_NOT},
@ -326,6 +327,7 @@ static SKeyword keywordTable[] = {
{"WRITE", TK_WRITE},
{"_C0", TK_ROWTS},
{"_IROWTS", TK_IROWTS},
{"_IROWTS_ORIGIN", TK_IROWTS_ORIGIN},
{"_ISFILLED", TK_ISFILLED},
{"_QDURATION", TK_QDURATION},
{"_QEND", TK_QEND},

View File

@ -1260,7 +1260,8 @@ bool isPrimaryKeyImpl(SNode* pExpr) {
FUNCTION_TYPE_LAST_ROW == pFunc->funcType || FUNCTION_TYPE_TIMETRUNCATE == pFunc->funcType) {
return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0));
} else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType ||
FUNCTION_TYPE_IROWTS == pFunc->funcType || FUNCTION_TYPE_FORECAST_ROWTS == pFunc->funcType) {
FUNCTION_TYPE_IROWTS == pFunc->funcType || FUNCTION_TYPE_IROWTS_ORIGIN == pFunc->funcType ||
FUNCTION_TYPE_FORECAST_ROWTS == pFunc->funcType) {
return true;
}
} else if (QUERY_NODE_OPERATOR == nodeType(pExpr)) {
@ -5388,14 +5389,11 @@ static int32_t convertFillValue(STranslateContext* pCxt, SDataType dt, SNodeList
return code;
}
static int32_t checkFillValues(STranslateContext* pCxt, SFillNode* pFill, SNodeList* pProjectionList) {
if (FILL_MODE_VALUE != pFill->mode && FILL_MODE_VALUE_F != pFill->mode) {
return TSDB_CODE_SUCCESS;
}
static int32_t doCheckFillValues(STranslateContext* pCxt, SFillNode* pFill, SNodeList* pProjectionList) {
int32_t fillNo = 0;
SNodeListNode* pFillValues = (SNodeListNode*)pFill->pValues;
SNode* pProject = NULL;
if (!pFillValues) return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Filled values number mismatch");
FOREACH(pProject, pProjectionList) {
if (needFill(pProject)) {
if (fillNo >= LIST_LENGTH(pFillValues->pNodeList)) {
@ -5415,6 +5413,14 @@ static int32_t checkFillValues(STranslateContext* pCxt, SFillNode* pFill, SNodeL
return TSDB_CODE_SUCCESS;
}
static int32_t checkFillValues(STranslateContext* pCxt, SFillNode* pFill, SNodeList* pProjectionList) {
if (FILL_MODE_VALUE != pFill->mode && FILL_MODE_VALUE_F != pFill->mode) {
return TSDB_CODE_SUCCESS;
}
return doCheckFillValues(pCxt, pFill, pProjectionList);
return TSDB_CODE_SUCCESS;
}
static int32_t translateFillValues(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pWindow || QUERY_NODE_INTERVAL_WINDOW != nodeType(pSelect->pWindow) ||
NULL == ((SIntervalWindowNode*)pSelect->pWindow)->pFill) {
@ -6185,6 +6191,31 @@ static int32_t translateInterpEvery(STranslateContext* pCxt, SNode** pEvery) {
return code;
}
static EDealRes hasRowTsOriginFuncWalkNode(SNode* pNode, void* ctx) {
bool *hasRowTsOriginFunc = ctx;
if (nodeType(pNode) == QUERY_NODE_FUNCTION) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
if (fmIsRowTsOriginFunc(pFunc->funcId)) {
*hasRowTsOriginFunc = true;
return DEAL_RES_END;
}
}
return DEAL_RES_CONTINUE;
}
static int32_t checkInterpForStream(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (pCxt->createStream) {
SFillNode* pFill = (SFillNode*)pSelect->pFill;
if (pFill->mode == FILL_MODE_NEAR) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "FILL NEAR is not supported by stream");
}
if (pSelect->pRangeAround) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "RANGE with around is not supported by stream");
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect) {
int32_t code = TSDB_CODE_SUCCESS;
@ -6200,13 +6231,60 @@ static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect
if (TSDB_CODE_SUCCESS == code) {
code = checkFill(pCxt, (SFillNode*)pSelect->pFill, (SValueNode*)pSelect->pEvery, true);
}
bool hasRowTsOriginFunc = false;
nodesWalkExprs(pSelect->pProjectionList, hasRowTsOriginFuncWalkNode, &hasRowTsOriginFunc);
if (hasRowTsOriginFunc && pCxt->createStream) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"_irowts_origin is not supported by stream");
}
if (TSDB_CODE_SUCCESS == code) {
code = checkFillValues(pCxt, (SFillNode*)pSelect->pFill, pSelect->pProjectionList);
SFillNode* pFill = (SFillNode*)pSelect->pFill;
if (pSelect->pRangeAround) {
if (pFill->mode != FILL_MODE_PREV && pFill->mode != FILL_MODE_NEXT && pFill->mode != FILL_MODE_NEAR) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE,
"Range with interval can only used with fill PREV/NEXT/NEAR");
}
if (TSDB_CODE_SUCCESS == code)
code = doCheckFillValues(pCxt, pFill, pSelect->pProjectionList);
} else {
if (FILL_MODE_PREV == pFill->mode || FILL_MODE_NEXT == pFill->mode || FILL_MODE_NEAR == pFill->mode) {
if (pFill->pValues) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Can't specify fill values");
}
} else {
if (hasRowTsOriginFunc) return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC, "_irowts_origin can only be used with FILL PREV/NEXT/NEAR");
}
code = checkFillValues(pCxt, pFill, pSelect->pProjectionList);
}
}
return code;
}
static int32_t translateInterpAround(STranslateContext* pCxt, SSelectStmt* pSelect) {
int32_t code = 0;
if (pSelect->pRangeAround) {
SRangeAroundNode* pAround = (SRangeAroundNode*)pSelect->pRangeAround;
code = translateExpr(pCxt, &pAround->pInterval);
if (TSDB_CODE_SUCCESS == code) {
if (nodeType(pAround->pInterval) == QUERY_NODE_VALUE) {
SValueNode* pVal = (SValueNode*)pAround->pInterval;
if (pVal->datum.i == 0) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE, "Range interval cannot be 0");
}
int8_t unit = pVal->unit;
if (unit == TIME_UNIT_YEAR || unit == TIME_UNIT_MONTH) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE,
"Unsupported time unit in RANGE clause");
}
} else {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE, "Invalid range interval");
}
}
}
return code;
}
static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (!pSelect->hasInterpFunc) {
if (NULL != pSelect->pRange || NULL != pSelect->pEvery || NULL != pSelect->pFill) {
@ -6227,6 +6305,7 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
}
}
int32_t code = 0;
if (pCxt->createStream) {
if (NULL != pSelect->pRange) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
@ -6238,6 +6317,7 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
"Missing EVERY clause or FILL clause");
}
} else {
if (!pSelect->pRangeAround) {
if (NULL == pSelect->pRange || NULL == pSelect->pEvery || NULL == pSelect->pFill) {
if (pSelect->pRange != NULL && QUERY_NODE_OPERATOR == nodeType(pSelect->pRange) && pSelect->pEvery == NULL) {
// single point interp every can be omitted
@ -6246,15 +6326,31 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
"Missing RANGE clause, EVERY clause or FILL clause");
}
}
} else {
if (pSelect->pEvery) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE,
"Range clause with around interval can't be used with EVERY clause");
}
if (!pSelect->pFill) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE,
"Missing FILL clause");
}
}
}
int32_t code = translateExpr(pCxt, &pSelect->pRange);
code = translateExpr(pCxt, &pSelect->pRange);
if (TSDB_CODE_SUCCESS == code) {
code = translateInterpEvery(pCxt, &pSelect->pEvery);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateInterpFill(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateInterpAround(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkInterpForStream(pCxt, pSelect);
}
return code;
}

View File

@ -973,6 +973,16 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
pInterpFunc->precision = pSelect->precision;
}
if (TSDB_CODE_SUCCESS == code && pSelect->pRangeAround) {
SNode* pRangeInterval = ((SRangeAroundNode*)pSelect->pRangeAround)->pInterval;
if (!pRangeInterval || nodeType(pRangeInterval) != QUERY_NODE_VALUE) {
code = TSDB_CODE_PAR_INTERNAL_ERROR;
} else {
pInterpFunc->rangeInterval = ((SValueNode*)pRangeInterval)->datum.i;
pInterpFunc->rangeIntervalUnit = ((SValueNode*)pRangeInterval)->unit;
}
}
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pInterpFunc->pFuncs, &pInterpFunc->node.pTargets);

View File

@ -1958,6 +1958,8 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
pInterpFunc->intervalUnit = pFuncLogicNode->intervalUnit;
pInterpFunc->precision = pFuncLogicNode->node.precision;
pInterpFunc->pFillValues = NULL;
pInterpFunc->rangeInterval = pFuncLogicNode->rangeInterval;
pInterpFunc->rangeIntervalUnit = pFuncLogicNode->rangeIntervalUnit;
code = nodesCloneNode(pFuncLogicNode->pFillValues, &pInterpFunc->pFillValues);
if (TSDB_CODE_SUCCESS != code) {
code = code;

View File

@ -156,6 +156,11 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma2.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsma2.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery2.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp_extension.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp_extension.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp_extension.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp_extension.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp_extension.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py

View File

@ -0,0 +1,518 @@
import queue
from random import randrange
import time
import threading
import secrets
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from datetime import timezone
from tzlocal import get_localzone
# from tmqCommon import *
ROUND: int = 500
class TDTestCase:
updatecfgDict = {'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4, 'debugFlag': 143}
check_failed: bool = False
def __init__(self):
self.vgroups = 4
self.ctbNum = 10
self.rowsPerTbl = 10000
self.duraion = '1h'
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def create_database(self, tsql, dbName, dropFlag=1, vgroups=2, replica=1, duration: str = '1d'):
if dropFlag == 1:
tsql.execute("drop database if exists %s" % (dbName))
tsql.execute("create database if not exists %s vgroups %d replica %d duration %s" % (
dbName, vgroups, replica, duration))
tdLog.debug("complete to create database %s" % (dbName))
return
def create_stable(self, tsql, paraDict):
colString = tdCom.gen_column_type_str(
colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"])
tagString = tdCom.gen_tag_type_str(
tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"])
sqlString = f"create table if not exists %s.%s (%s) tags (%s)" % (
paraDict["dbName"], paraDict["stbName"], colString, tagString)
tdLog.debug("%s" % (sqlString))
tsql.execute(sqlString)
return
def create_ctable(self, tsql=None, dbName='dbx', stbName='stb', ctbPrefix='ctb', ctbNum=1, ctbStartIdx=0):
for i in range(ctbNum):
sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % (dbName, ctbPrefix, i+ctbStartIdx, dbName, stbName, (i+ctbStartIdx) % 5, i+ctbStartIdx + random.randint(
1, 100), i+ctbStartIdx + random.randint(1, 100), i+ctbStartIdx + random.randint(1, 100), i+ctbStartIdx + random.randint(1, 100), i+ctbStartIdx + random.randint(1, 100))
tsql.execute(sqlString)
tdLog.debug("complete to create %d child tables by %s.%s" %
(ctbNum, dbName, stbName))
return
def init_normal_tb(self, tsql, db_name: str, tb_name: str, rows: int, start_ts: int, ts_step: int):
sql = 'CREATE TABLE %s.%s (ts timestamp, c1 INT, c2 INT, c3 INT, c4 double, c5 VARCHAR(255))' % (
db_name, tb_name)
tsql.execute(sql)
sql = 'INSERT INTO %s.%s values' % (db_name, tb_name)
for j in range(rows):
sql += f'(%d, %d,%d,%d,{random.random()},"varchar_%d"),' % (start_ts + j * ts_step + randrange(500), j %
10 + randrange(200), j % 10, j % 10, j % 10 + randrange(100))
tsql.execute(sql)
def insert_data(self, tsql, dbName, ctbPrefix, ctbNum, rowsPerTbl, batchNum, startTs, tsStep):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" % dbName)
pre_insert = "insert into "
sql = pre_insert
for i in range(ctbNum):
rowsBatched = 0
sql += " %s.%s%d values " % (dbName, ctbPrefix, i)
for j in range(rowsPerTbl):
if (i < ctbNum/2):
sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') " % (startTs + j*tsStep + randrange(
500), j % 10 + randrange(100), j % 10 + randrange(200), j % 10, j % 10, j % 10, j % 10, j % 10, j % 10)
else:
sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') " % (
startTs + j*tsStep + randrange(500), j % 10, j % 10, j % 10, j % 10, j % 10, j % 10)
rowsBatched += 1
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s.%s%d values " % (dbName, ctbPrefix, i)
else:
sql = "insert into "
if sql != pre_insert:
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def init_data(self, db: str = 'test', ctb_num: int = 10, rows_per_ctb: int = 10000, start_ts: int = 1537146000000, ts_step: int = 500):
tdLog.printNoPrefix(
"======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': db,
'dropFlag': 1,
'vgroups': 4,
'stbName': 'meters',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count': 1}, {'type': 'BIGINT', 'count': 1}, {'type': 'FLOAT', 'count': 1}, {'type': 'DOUBLE', 'count': 1}, {'type': 'smallint', 'count': 1}, {'type': 'tinyint', 'count': 1}, {'type': 'bool', 'count': 1}, {'type': 'binary', 'len': 10, 'count': 1}, {'type': 'nchar', 'len': 10, 'count': 1}],
'tagSchema': [{'type': 'INT', 'count': 1}, {'type': 'nchar', 'len': 20, 'count': 1}, {'type': 'binary', 'len': 20, 'count': 1}, {'type': 'BIGINT', 'count': 1}, {'type': 'smallint', 'count': 1}, {'type': 'DOUBLE', 'count': 1}],
'ctbPrefix': 't',
'ctbStartIdx': 0,
'ctbNum': ctb_num,
'rowsPerTbl': rows_per_ctb,
'batchNum': 3000,
'startTs': start_ts,
'tsStep': ts_step}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = ctb_num
paraDict['rowsPerTbl'] = rows_per_ctb
tdLog.info("create database")
self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"],
vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion)
tdLog.info("create stb")
self.create_stable(tsql=tdSql, paraDict=paraDict)
tdLog.info("create child tables")
self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"],
stbName=paraDict["stbName"], ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"], ctbStartIdx=paraDict["ctbStartIdx"])
self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],
ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"],
rowsPerTbl=paraDict["rowsPerTbl"], batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"], tsStep=paraDict["tsStep"])
self.init_normal_tb(tdSql, paraDict['dbName'], 'norm_tb',
paraDict['rowsPerTbl'], paraDict['startTs'], paraDict['tsStep'])
def run(self):
self.init_data()
self.test_interp_extension()
def datetime_add_tz(self, dt):
if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
return dt.replace(tzinfo=get_localzone())
return dt
def binary_search_ts(self, select_results, ts):
mid = 0
try:
found: bool = False
start = 0
end = len(select_results) - 1
while start <= end:
mid = (start + end) // 2
if self.datetime_add_tz(select_results[mid][0]) == ts:
found = True
return mid
elif self.datetime_add_tz(select_results[mid][0]) < ts:
start = mid + 1
else:
end = mid - 1
if not found:
tdLog.exit(f"cannot find ts in select results {ts} {select_results}")
return start
except Exception as e:
tdLog.debug(f"{select_results[mid][0]}, {ts}, {len(select_results)}, {select_results[mid]}")
self.check_failed = True
tdLog.exit(f"binary_search_ts error: {e}")
def distance(self, ts1, ts2):
return abs(self.datetime_add_tz(ts1) - self.datetime_add_tz(ts2))
## TODO pass last position to avoid search from the beginning
def is_nearest(self, select_results, irowts_origin, irowts):
if len(select_results) <= 1:
return True
try:
#tdLog.debug(f"check is_nearest for: {irowts_origin} {irowts}")
idx = self.binary_search_ts(select_results, irowts_origin)
if idx == 0:
#tdLog.debug(f"prev row: null,cur row: {select_results[idx]}, next row: {select_results[idx + 1]}")
res = self.distance(irowts, select_results[idx][0]) <= self.distance(irowts, select_results[idx + 1][0])
if not res:
tdLog.debug(f"prev row: null,cur row: {select_results[idx]}, next row: {select_results[idx + 1]}, irowts_origin: {irowts_origin}, irowts: {irowts}")
return res
if idx == len(select_results) - 1:
#tdLog.debug(f"prev row: {select_results[idx - 1]},cur row: {select_results[idx]}, next row: null")
res = self.distance(irowts, select_results[idx][0]) <= self.distance(irowts, select_results[idx - 1][0])
if not res:
tdLog.debug(f"prev row: {select_results[idx - 1]},cur row: {select_results[idx]}, next row: null, irowts_origin: {irowts_origin}, irowts: {irowts}")
return res
#tdLog.debug(f"prev row: {select_results[idx - 1]},cur row: {select_results[idx]}, next row: {select_results[idx + 1]}")
res = self.distance(irowts, select_results[idx][0]) <= self.distance(irowts, select_results[idx - 1][0]) and self.distance(irowts, select_results[idx][0]) <= self.distance(irowts, select_results[idx + 1][0])
if not res:
tdLog.debug(f"prev row: {select_results[idx - 1]},cur row: {select_results[idx]}, next row: {select_results[idx + 1]}, irowts_origin: {irowts_origin}, irowts: {irowts}")
return res
except Exception as e:
self.check_failed = True
tdLog.exit(f"is_nearest error: {e}")
## interp_results: _irowts_origin, _irowts, ..., _isfilled
## select_all_results must be sorted by ts in ascending order
def check_result_for_near(self, interp_results, select_all_results, sql, sql_select_all):
#tdLog.debug(f"check_result_for_near for sql: {sql}, sql_select_all{sql_select_all}")
for row in interp_results:
if row[0].tzinfo is None or row[0].tzinfo.utcoffset(row[0]) is None:
irowts_origin = row[0].replace(tzinfo=get_localzone())
irowts = row[1].replace(tzinfo=get_localzone())
else:
irowts_origin = row[0]
irowts = row[1]
if not self.is_nearest(select_all_results, irowts_origin, irowts):
self.check_failed = True
tdLog.exit(f"interp result is not the nearest for row: {row}, {sql}")
def query_routine(self, sql_queue: queue.Queue, output_queue: queue.Queue):
try:
tdcom = TDCom()
cli = tdcom.newTdSql()
while True:
item = sql_queue.get()
if item is None or self.check_failed:
output_queue.put(None)
break
(sql, sql_select_all, _) = item
cli.query(sql, queryTimes=1)
interp_results = cli.queryResult
if sql_select_all is not None:
cli.query(sql_select_all, queryTimes=1)
output_queue.put((sql, interp_results, cli.queryResult, sql_select_all))
cli.close()
except Exception as e:
self.check_failed = True
tdLog.exit(f"query_routine error: {e}")
def interp_check_near_routine(self, select_all_results, output_queue: queue.Queue):
try:
while True:
item = output_queue.get()
if item is None:
break
(sql, interp_results, all_results, sql_select_all) = item
if all_results is not None:
self.check_result_for_near(interp_results, all_results, sql, sql_select_all)
else:
self.check_result_for_near(interp_results, select_all_results, sql, None)
except Exception as e:
self.check_failed = True
tdLog.exit(f"interp_check_near_routine error: {e}")
def create_qt_threads(self, sql_queue: queue.Queue, output_queue: queue.Queue, num: int):
qts = []
for _ in range(0, num):
qt = threading.Thread(target=self.query_routine, args=(sql_queue, output_queue))
qt.start()
qts.append(qt)
return qts
def wait_qt_threads(self, qts: list):
for qt in qts:
qt.join()
### first(ts) | last(ts)
### 2018-09-17 09:00:00.047 | 2018-09-17 10:23:19.863
def test_interp_fill_extension_near(self):
sql = f"select last(ts), c1, c2 from test.t0"
tdSql.query(sql, queryTimes=1)
lastRow = tdSql.queryResult[0]
sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.t0 range('2020-02-01 00:00:00', '2020-02-01 00:01:00') every(1s) fill(near)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(61)
for i in range(0, 61):
tdSql.checkData(i, 0, lastRow[0])
tdSql.checkData(i, 2, lastRow[1])
tdSql.checkData(i, 3, lastRow[2])
tdSql.checkData(i, 4, True)
sql = f"select ts, c1, c2 from test.t0 where ts between '2018-09-17 08:59:59' and '2018-09-17 09:00:06' order by ts asc"
tdSql.query(sql, queryTimes=1)
select_all_results = tdSql.queryResult
sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.t0 range('2018-09-17 09:00:00', '2018-09-17 09:00:05') every(1s) fill(near)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(6)
self.check_result_for_near(tdSql.queryResult, select_all_results, sql, None)
start = 1537146000000
end = 1537151000000
tdSql.query("select ts, c1, c2 from test.t0 order by ts asc", queryTimes=1)
select_all_results = tdSql.queryResult
qt_threads_num = 4
sql_queue = queue.Queue()
output_queue = queue.Queue()
qts = self.create_qt_threads(sql_queue, output_queue, qt_threads_num)
ct = threading.Thread(target=self.interp_check_near_routine, args=(select_all_results, output_queue))
ct.start()
for i in range(0, ROUND):
range_start = random.randint(start, end)
range_end = random.randint(range_start, end)
every = random.randint(1, 15)
#tdLog.debug(f"range_start: {range_start}, range_end: {range_end}")
sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.t0 range({range_start}, {range_end}) every({every}s) fill(near)"
sql_queue.put((sql, None, None))
### no prev only, no next only, no prev and no next, have prev and have next
for i in range(0, ROUND):
range_point = random.randint(start, end)
## all data points are can be filled by near
sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.t0 range({range_point}, 1h) fill(near, 1, 2)"
sql_queue.put((sql, None, None))
for i in range(0, ROUND):
range_start = random.randint(start, end)
range_end = random.randint(range_start, end)
range_where_start = random.randint(start, end)
range_where_end = random.randint(range_where_start, end)
every = random.randint(1, 15)
sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.t0 where ts between {range_where_start} and {range_where_end} range({range_start}, {range_end}) every({every}s) fill(near)"
tdSql.query(f'select to_char(cast({range_where_start} as timestamp), \'YYYY-MM-DD HH24:MI:SS.MS\'), to_char(cast({range_where_end} as timestamp), \'YYYY-MM-DD HH24:MI:SS.MS\')', queryTimes=1)
where_start_str = tdSql.queryResult[0][0]
where_end_str = tdSql.queryResult[0][1]
sql_select_all = f"select ts, c1, c2 from test.t0 where ts between '{where_start_str}' and '{where_end_str}' order by ts asc"
sql_queue.put((sql, sql_select_all, None))
for i in range(0, ROUND):
range_start = random.randint(start, end)
range_end = random.randint(range_start, end)
range_where_start = random.randint(start, end)
range_where_end = random.randint(range_where_start, end)
range_point = random.randint(start, end)
sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.t0 where ts between {range_where_start} and {range_where_end} range({range_point}, 1h) fill(near, 1, 2)"
tdSql.query(f'select to_char(cast({range_where_start} as timestamp), \'YYYY-MM-DD HH24:MI:SS.MS\'), to_char(cast({range_where_end} as timestamp), \'YYYY-MM-DD HH24:MI:SS.MS\')', queryTimes=1)
where_start_str = tdSql.queryResult[0][0]
where_end_str = tdSql.queryResult[0][1]
sql_select_all = f"select ts, c1, c2 from test.t0 where ts between '{where_start_str}' and '{where_end_str}' order by ts asc"
sql_queue.put((sql, sql_select_all, None))
for i in range(0, qt_threads_num):
sql_queue.put(None)
self.wait_qt_threads(qts)
ct.join()
if self.check_failed:
tdLog.exit("interp check near failed")
def test_interp_extension_irowts_origin(self):
sql = f"select _irowts, _irowts_origin, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:01:00') every(1s) fill(near)"
tdSql.query(sql, queryTimes=1)
sql = f"select _irowts, _irowts_origin, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:01:00') every(1s) fill(NULL)"
tdSql.error(sql, -2147473833)
sql = f"select _irowts, _irowts_origin, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:01:00') every(1s) fill(linear)"
tdSql.error(sql, -2147473833)
sql = f"select _irowts, _irowts_origin, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:01:00') every(1s) fill(NULL_F)"
tdSql.error(sql, -2147473833)
def test_interp_fill_extension(self):
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) fill(near, 0, 0)"
tdSql.query(sql, queryTimes=1)
### must specify value
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) fill(near)"
tdSql.error(sql, -2147473915)
### num of fill value mismatch
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) fill(near, 1)"
tdSql.error(sql, -2147473915)
### range with around interval cannot specify two timepoints, currently not supported
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:02:00', 1h) fill(near, 1, 1)"
tdSql.error(sql, -2147473920) ## syntax error
### NULL/linear cannot specify other values
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:02:00') fill(NULL, 1, 1)"
tdSql.error(sql, -2147473920)
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:02:00') fill(linear, 1, 1)"
tdSql.error(sql, -2147473920) ## syntax error
### cannot have every clause with range around
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) every(1s) fill(prev, 1, 1)"
tdSql.error(sql, -2147473827) ## TSDB_CODE_PAR_INVALID_INTERP_CLAUSE
### cannot specify near/prev/next values when using range
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', '2020-02-01 00:01:00') every(1s) fill(near, 1, 1)"
tdSql.error(sql, -2147473915) ## cannot specify values
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00') every(1s) fill(near, 1, 1)"
tdSql.error(sql, -2147473915) ## cannot specify values
### when range around interval is set, only prev/next/near is supported
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) fill(NULL, 1, 1)"
tdSql.error(sql, -2147473920)
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) fill(NULL)"
tdSql.error(sql, -2147473861) ## TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) fill(linear, 1, 1)"
tdSql.error(sql, -2147473920)
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1h) fill(linear)"
tdSql.error(sql, -2147473861) ## TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE
### range interval cannot be 0
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 0h) fill(near, 1, 1)"
tdSql.error(sql, -2147473861) ## TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1y) fill(near, 1, 1)"
tdSql.error(sql, -2147473915) ## TSDB_CODE_PAR_WRONG_VALUE_TYPE
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1n) fill(near, 1, 1)"
tdSql.error(sql, -2147473915) ## TSDB_CODE_PAR_WRONG_VALUE_TYPE
sql = f"select _irowts, interp(c1), interp(c2), _isfilled from test.meters where ts between '2020-02-01 00:00:00' and '2020-02-01 00:00:00' range('2020-02-01 00:00:00', 1h) fill(near, 1, 1)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(0)
### first(ts) | last(ts)
### 2018-09-17 09:00:00.047 | 2018-09-17 10:23:19.863
sql = "select to_char(first(ts), 'YYYY-MM-DD HH24:MI:SS.MS') from test.meters"
tdSql.query(sql, queryTimes=1)
first_ts = tdSql.queryResult[0][0]
sql = "select to_char(last(ts), 'YYYY-MM-DD HH24:MI:SS.MS') from test.meters"
tdSql.query(sql, queryTimes=1)
last_ts = tdSql.queryResult[0][0]
sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2020-02-01 00:00:00', 1d) fill(near, 1, 2)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(1)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 3, 2)
tdSql.checkData(0, 4, True)
sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2018-09-18 10:25:00', 1d) fill(prev, 3, 4)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(1)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, '2018-09-18 10:25:00.000')
tdSql.checkData(0, 2, 3)
tdSql.checkData(0, 3, 4)
tdSql.checkData(0, 4, True)
sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2018-09-16 08:25:00', 1d) fill(next, 5, 6)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(1)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, '2018-09-16 08:25:00.000')
tdSql.checkData(0, 2, 5)
tdSql.checkData(0, 3, 6)
tdSql.checkData(0, 4, True)
sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2018-09-16 09:00:01', 1d) fill(next, 1, 2)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(1)
tdSql.checkData(0, 0, first_ts)
tdSql.checkData(0, 1, '2018-09-16 09:00:01')
tdSql.checkData(0, 4, True)
sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.meters range('2018-09-18 10:23:19', 1d) fill(prev, 1, 2)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, '2018-09-18 10:23:19')
tdSql.checkData(0, 4, True)
sql = f"select _irowts_origin, _irowts, interp(c1), interp(c2), _isfilled from test.meters range('{last_ts}', 1a) fill(next, 1, 2)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, last_ts)
tdSql.checkData(0, 4, False)
def test_interval_fill_extension(self):
## not allowed
sql = f"select count(*) from test.meters interval(1s) fill(near)"
tdSql.error(sql, -2147473920) ## syntax error
sql = f"select count(*) from test.meters interval(1s) fill(prev, 1)"
tdSql.error(sql, -2147473920) ## syntax error
sql = f"select count(*) from test.meters interval(1s) fill(next, 1)"
tdSql.error(sql, -2147473920) ## syntax error
def test_interp_fill_extension_stream(self):
## near is not supported
sql = f"create stream s1 trigger force_window_close into test.s_res_tb as select _irowts, interp(c1), interp(c2)from test.meters partition by tbname every(1s) fill(near);"
tdSql.error(sql, -2147473851) ## TSDB_CODE_PAR_INVALID_STREAM_QUERY
## _irowts_origin is not support
sql = f"create stream s1 trigger force_window_close into test.s_res_tb as select _irowts_origin, interp(c1), interp(c2)from test.meters partition by tbname every(1s) fill(prev);"
tdSql.error(sql, -2147473851) ## TSDB_CODE_PAR_INVALID_STREAM_QUERY
sql = f"create stream s1 trigger force_window_close into test.s_res_tb as select _irowts, interp(c1), interp(c2)from test.meters partition by tbname every(1s) fill(next, 1, 1);"
tdSql.error(sql, -2147473915) ## cannot specify values
def test_interp_extension(self):
self.test_interp_fill_extension_near()
self.test_interp_extension_irowts_origin()
self.test_interp_fill_extension()
self.test_interval_fill_extension()
self.test_interp_fill_extension_stream()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())