Merge pull request #21144 from taosdata/feat/TD-19801

feat(query): support interp with super table
This commit is contained in:
dapan1121 2023-05-12 11:37:20 +08:00 committed by GitHub
commit 9394afec68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 875 additions and 133 deletions

View File

@ -55,7 +55,7 @@ window_clause: {
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)] | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)]
interp_clause: interp_clause:
RANGE(ts_val, ts_val), EVERY(every_val), FILL(fill_mod_and_val) RANGE(ts_val, ts_val) EVERY(every_val) FILL(fill_mod_and_val)
partition_by_clause: partition_by_clause:
PARTITION BY expr [, expr] ... PARTITION BY expr [, expr] ...

View File

@ -886,7 +886,7 @@ INTERP(expr)
- The output time range of `INTERP` is specified by `RANGE(timestamp1,timestamp2)` parameter, with timestamp1 <= timestamp2. timestamp1 is the starting point of the output time range and must be specified. timestamp2 is the ending point of the output time range and must be specified. - The output time range of `INTERP` is specified by `RANGE(timestamp1,timestamp2)` parameter, with timestamp1 <= timestamp2. timestamp1 is the starting point of the output time range and must be specified. timestamp2 is the ending point of the output time range and must be specified.
- The number of rows in the result set of `INTERP` is determined by the parameter `EVERY(time_unit)`. Starting from timestamp1, one interpolation is performed for every time interval specified `time_unit` parameter. The parameter `time_unit` must be an integer, with no quotes, with a time unit of: a(millisecond)), s(second), m(minute), h(hour), d(day), or w(week). For example, `EVERY(500a)` will interpolate every 500 milliseconds. - The number of rows in the result set of `INTERP` is determined by the parameter `EVERY(time_unit)`. Starting from timestamp1, one interpolation is performed for every time interval specified `time_unit` parameter. The parameter `time_unit` must be an integer, with no quotes, with a time unit of: a(millisecond)), s(second), m(minute), h(hour), d(day), or w(week). For example, `EVERY(500a)` will interpolate every 500 milliseconds.
- Interpolation is performed based on `FILL` parameter. For more information about FILL clause, see [FILL Clause](../distinguished/#fill-clause). - Interpolation is performed based on `FILL` parameter. For more information about FILL clause, see [FILL Clause](../distinguished/#fill-clause).
- `INTERP` can only be used to interpolate in single timeline. So it must be used with `partition by tbname` when it's used on a STable. - `INTERP` can be applied to supertable by interpolating primary key sorted data of all its childtables. It can also be used with `partition by tbname` when applied to supertable to generate interpolation on each single timeline.
- Pseudocolumn `_irowts` can be used along with `INTERP` to return the timestamps associated with interpolation points(support after version 3.0.2.0). - Pseudocolumn `_irowts` can be used along with `INTERP` to return the timestamps associated with interpolation points(support after version 3.0.2.0).
- Pseudocolumn `_isfilled` can be used along with `INTERP` to indicate whether the results are original records or data points generated by interpolation algorithm(support after version 3.0.3.0). - Pseudocolumn `_isfilled` can be used along with `INTERP` to indicate whether the results are original records or data points generated by interpolation algorithm(support after version 3.0.3.0).

View File

@ -55,7 +55,7 @@ window_clause: {
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)] | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)]
interp_clause: interp_clause:
RANGE(ts_val, ts_val), EVERY(every_val), FILL(fill_mod_and_val) RANGE(ts_val, ts_val) EVERY(every_val) FILL(fill_mod_and_val)
partition_by_clause: partition_by_clause:
PARTITION BY expr [, expr] ... PARTITION BY expr [, expr] ...

View File

@ -888,7 +888,7 @@ INTERP(expr)
- INTERP 的输出时间范围根据 RANGE(timestamp1,timestamp2)字段来指定,需满足 timestamp1 <= timestamp2。其中 timestamp1必选值为输出时间范围的起始值即如果 timestamp1 时刻符合插值条件则 timestamp1 为输出的第一条记录timestamp2必选值为输出时间范围的结束值即输出的最后一条记录的 timestamp 不能大于 timestamp2。 - INTERP 的输出时间范围根据 RANGE(timestamp1,timestamp2)字段来指定,需满足 timestamp1 <= timestamp2。其中 timestamp1必选值为输出时间范围的起始值即如果 timestamp1 时刻符合插值条件则 timestamp1 为输出的第一条记录timestamp2必选值为输出时间范围的结束值即输出的最后一条记录的 timestamp 不能大于 timestamp2。
- INTERP 根据 EVERY(time_unit) 字段来确定输出时间范围内的结果条数,即从 timestamp1 开始每隔固定长度的时间time_unit 值进行插值time_unit 可取值时间单位1a(毫秒)1s(秒)1m(分)1h(小时)1d(天)1w(周)。例如 EVERY(500a) 将对于指定数据每500毫秒间隔进行一次插值. - INTERP 根据 EVERY(time_unit) 字段来确定输出时间范围内的结果条数,即从 timestamp1 开始每隔固定长度的时间time_unit 值进行插值time_unit 可取值时间单位1a(毫秒)1s(秒)1m(分)1h(小时)1d(天)1w(周)。例如 EVERY(500a) 将对于指定数据每500毫秒间隔进行一次插值.
- INTERP 根据 FILL 字段来决定在每个符合输出条件的时刻如何进行插值。关于 FILL 子句如何使用请参考 [FILL 子句](../distinguished/#fill-子句) - INTERP 根据 FILL 字段来决定在每个符合输出条件的时刻如何进行插值。关于 FILL 子句如何使用请参考 [FILL 子句](../distinguished/#fill-子句)
- INTERP 只能在一个时间序列内进行插值,因此当作用于超级表时必须跟 partition by tbname 一起使用 - INTERP 作用于超级表时, 会将该超级表下的所有子表数据按照主键列排序后进行插值计算,也可以搭配 PARTITION BY tbname 使用,将结果强制规约到单个时间线
- INTERP 可以与伪列 _irowts 一起使用,返回插值点所对应的时间戳(3.0.2.0版本以后支持)。 - INTERP 可以与伪列 _irowts 一起使用,返回插值点所对应的时间戳(3.0.2.0版本以后支持)。
- INTERP 可以与伪列 _isfilled 一起使用,显示返回结果是否为原始记录或插值算法产生的数据(3.0.3.0版本以后支持)。 - INTERP 可以与伪列 _isfilled 一起使用,显示返回结果是否为原始记录或插值算法产生的数据(3.0.3.0版本以后支持)。

View File

@ -36,6 +36,11 @@ typedef struct STimeSliceOperatorInfo {
SColumn tsCol; // primary timestamp column SColumn tsCol; // primary timestamp column
SExprSupp scalarSup; // scalar calculation SExprSupp scalarSup; // scalar calculation
struct SFillColInfo* pFillColInfo; // fill column info struct SFillColInfo* pFillColInfo; // fill column info
int64_t prevTs;
bool prevTsSet;
uint64_t groupId;
SGroupKeys* pPrevGroupKey;
SSDataBlock* pNextGroupRes;
} STimeSliceOperatorInfo; } STimeSliceOperatorInfo;
static void destroyTimeSliceOperatorInfo(void* param); static void destroyTimeSliceOperatorInfo(void* param);
@ -166,12 +171,49 @@ static bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) {
return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0); return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0);
} }
static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumnInfoData* pTsCol,
int32_t curIndex, int32_t rows) {
int64_t currentTs = *(int64_t*)colDataGetData(pTsCol, curIndex);
if (currentTs > pSliceInfo->win.ekey) {
return false;
}
if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevTs)) {
return true;
}
pSliceInfo->prevTsSet = true;
pSliceInfo->prevTs = currentTs;
if (currentTs == pSliceInfo->win.ekey && curIndex < rows - 1) {
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1);
if (currentTs == nextTs) {
return true;
}
}
return false;
}
static bool isInterpFunc(SExprInfo* pExprInfo) {
int32_t functionType = pExprInfo->pExpr->_function.functionType;
return (functionType == FUNCTION_TYPE_INTERP);
}
static bool isGroupKeyFunc(SExprInfo* pExprInfo) {
int32_t functionType = pExprInfo->pExpr->_function.functionType;
return (functionType == FUNCTION_TYPE_GROUP_KEY);
}
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
bool beforeTs) { SSDataBlock* pSrcBlock, int32_t index, bool beforeTs) {
int32_t rows = pResBlock->info.rows; int32_t rows = pResBlock->info.rows;
timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock); timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock);
// todo set the correct primary timestamp column // todo set the correct primary timestamp column
// output the result // output the result
bool hasInterp = true; bool hasInterp = true;
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
@ -187,6 +229,30 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
bool isFilled = true; bool isFilled = true;
colDataAppend(pDst, pResBlock->info.rows, (char*)&isFilled, false); colDataAppend(pDst, pResBlock->info.rows, (char*)&isFilled, false);
continue; continue;
} else if (!isInterpFunc(pExprInfo)) {
if (isGroupKeyFunc(pExprInfo)) {
if (pSrcBlock != NULL) {
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
if (colDataIsNull_s(pSrc, index)) {
colDataSetNULL(pDst, pResBlock->info.rows);
continue;
}
char* v = colDataGetData(pSrc, index);
colDataSetVal(pDst, pResBlock->info.rows, v, false);
} else {
// use stored group key
SGroupKeys* pkey = pSliceInfo->pPrevGroupKey;
if (pkey->isNull == false) {
colDataSetVal(pDst, rows, pkey->pData, false);
} else {
colDataSetNULL(pDst, rows);
}
}
}
continue;
} }
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
@ -412,7 +478,31 @@ static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExprSup) {
if (pInfo->pPrevGroupKey != NULL) {
return TSDB_CODE_SUCCESS;
}
pInfo->pPrevGroupKey = taosMemoryCalloc(1, sizeof(SGroupKeys));
if (pInfo->pPrevGroupKey == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[i];
if (isGroupKeyFunc(pExprInfo)) {
pInfo->pPrevGroupKey->bytes = pExprInfo->base.resSchema.bytes;
pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type;
pInfo->pPrevGroupKey->isNull = false;
pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock, SExprSupp* pExprSup) {
int32_t code; int32_t code;
code = initPrevRowsKeeper(pInfo, pBlock); code = initPrevRowsKeeper(pInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -429,9 +519,202 @@ static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
code = initGroupKeyKeeper(pInfo, pExprSup);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t resetPrevRowsKeeper(STimeSliceOperatorInfo* pInfo) {
if (pInfo->pPrevRow == NULL) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
SGroupKeys *pKey = taosArrayGet(pInfo->pPrevRow, i);
pKey->isNull = false;
}
pInfo->isPrevRowSet = false;
return TSDB_CODE_SUCCESS;
}
static int32_t resetNextRowsKeeper(STimeSliceOperatorInfo* pInfo) {
if (pInfo->pNextRow == NULL) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
SGroupKeys *pKey = taosArrayGet(pInfo->pPrevRow, i);
pKey->isNull = false;
}
pInfo->isNextRowSet = false;
return TSDB_CODE_SUCCESS;
}
static int32_t resetFillLinearInfo(STimeSliceOperatorInfo* pInfo) {
if (pInfo->pLinearInfo == NULL) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
SFillLinearInfo *pLinearInfo = taosArrayGet(pInfo->pLinearInfo, i);
pLinearInfo->start.key = INT64_MIN;
pLinearInfo->end.key = INT64_MIN;
pLinearInfo->isStartSet = false;
pLinearInfo->isEndSet = false;
}
return TSDB_CODE_SUCCESS;
}
static int32_t resetKeeperInfo(STimeSliceOperatorInfo* pInfo) {
resetPrevRowsKeeper(pInfo);
resetNextRowsKeeper(pInfo);
resetFillLinearInfo(pInfo);
return TSDB_CODE_SUCCESS;
}
static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock,
SExecTaskInfo* pTaskInfo) {
SSDataBlock* pResBlock = pSliceInfo->pRes;
SInterval* pInterval = &pSliceInfo->interval;
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
// check for duplicate timestamps
if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP);
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
break;
}
if (ts == pSliceInfo->current) {
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
doKeepPrevRows(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
break;
}
} else if (ts < pSliceInfo->current) {
// in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
doKeepPrevRows(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i);
if (i < pBlock->info.rows - 1) {
// in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i + 1);
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false) &&
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
break;
} else {
pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
pInterval->precision);
}
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
break;
}
} else {
// ignore current row, and do nothing
}
} else { // it is the last row of current block
doKeepPrevRows(pSliceInfo, pBlock, i);
}
} else { // ts > pSliceInfo->current
// in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i);
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true) &&
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
break;
} else {
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
}
// add current row if timestamp match
if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) {
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
doKeepPrevRows(pSliceInfo, pBlock, i);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
break;
}
}
}
}
static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) {
SSDataBlock* pResBlock = pSliceInfo->pRes;
SInterval* pInterval = &pSliceInfo->interval;
while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT &&
pSliceInfo->fillType != TSDB_FILL_LINEAR) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
}
static void copyPrevGroupKey(SExprSupp* pExprSup, SGroupKeys* pGroupKey, SSDataBlock* pSrcBlock) {
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
if (isGroupKeyFunc(pExprInfo)) {
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
if (colDataIsNull_s(pSrc, 0)) {
pGroupKey->isNull = true;
break;
}
char* v = colDataGetData(pSrc, 0);
if (IS_VAR_DATA_TYPE(pGroupKey->type)) {
memcpy(pGroupKey->pData, v, varDataTLen(v));
} else {
memcpy(pGroupKey->pData, v, pGroupKey->bytes);
}
pGroupKey->isNull = false;
break;
}
}
}
static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
pSliceInfo->current = pSliceInfo->win.skey;
pSliceInfo->prevTsSet = false;
resetKeeperInfo(pSliceInfo);
}
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
@ -449,118 +732,62 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
blockDataCleanup(pResBlock); blockDataCleanup(pResBlock);
while (1) {
if (pSliceInfo->pNextGroupRes != NULL) {
setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true);
doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo);
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pSliceInfo->pNextGroupRes);
pSliceInfo->pNextGroupRes = NULL;
}
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
setOperatorCompleted(pOperator);
break; break;
} }
if (pSliceInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
pSliceInfo->groupId = pBlock->info.id.groupId;
} else {
if (pSliceInfo->groupId != pBlock->info.id.groupId) {
pSliceInfo->groupId = pBlock->info.id.groupId;
pSliceInfo->pNextGroupRes = pBlock;
break;
}
}
if (pSliceInfo->scalarSup.pExprInfo != NULL) { if (pSliceInfo->scalarSup.pExprInfo != NULL) {
SExprSupp* pExprSup = &pSliceInfo->scalarSup; SExprSupp* pExprSup = &pSliceInfo->scalarSup;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
} }
int32_t code = initKeeperInfo(pSliceInfo, pBlock); int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
setOperatorCompleted(pOperator);
break;
}
if (ts == pSliceInfo->current) {
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
doKeepPrevRows(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
setOperatorCompleted(pOperator);
break;
}
} else if (ts < pSliceInfo->current) {
// in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
doKeepPrevRows(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i);
if (i < pBlock->info.rows - 1) {
// in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i + 1);
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false) &&
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
break;
} else {
pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
pInterval->precision);
}
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
setOperatorCompleted(pOperator);
break;
}
} else {
// ignore current row, and do nothing
}
} else { // it is the last row of current block
doKeepPrevRows(pSliceInfo, pBlock, i);
}
} else { // ts > pSliceInfo->current
// in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
doKeepNextRows(pSliceInfo, pBlock, i);
doKeepLinearInfo(pSliceInfo, pBlock, i);
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, true) &&
pSliceInfo->fillType == TSDB_FILL_LINEAR) {
break;
} else {
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
}
// add current row if timestamp match
if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) {
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
doKeepPrevRows(pSliceInfo, pBlock, i);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
setOperatorCompleted(pOperator);
break;
}
}
}
} }
// check if need to interpolate after last datablock // check if need to interpolate after last datablock
// except for fill(next), fill(linear) // except for fill(next), fill(linear)
while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && genInterpAfterDataBlock(pSliceInfo, pOperator, 0);
pSliceInfo->fillType != TSDB_FILL_LINEAR) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
if (pOperator->status == OP_EXEC_DONE) {
break;
}
// restore initial value for next group
resetTimesliceInfo(pSliceInfo);
if (pResBlock->info.rows >= 4096) {
break;
}
}
// restore the value // restore the value
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
@ -612,6 +839,11 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->win = pInterpPhyNode->timeRange; pInfo->win = pInterpPhyNode->timeRange;
pInfo->interval.interval = pInterpPhyNode->interval; pInfo->interval.interval = pInterpPhyNode->interval;
pInfo->current = pInfo->win.skey; pInfo->current = pInfo->win.skey;
pInfo->prevTsSet = false;
pInfo->prevTs = 0;
pInfo->groupId = 0;
pInfo->pPrevGroupKey = NULL;
pInfo->pNextGroupRes = NULL;
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
@ -659,6 +891,10 @@ void destroyTimeSliceOperatorInfo(void* param) {
taosMemoryFree(pKey->end.val); taosMemoryFree(pKey->end.val);
} }
taosArrayDestroy(pInfo->pLinearInfo); taosArrayDestroy(pInfo->pLinearInfo);
taosMemoryFree(pInfo->pPrevGroupKey->pData);
taosMemoryFree(pInfo->pPrevGroupKey);
cleanupExprSupp(&pInfo->scalarSup); cleanupExprSupp(&pInfo->scalarSup);
for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) { for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) {

View File

@ -1518,9 +1518,7 @@ static int32_t translateInterpFunc(STranslateContext* pCxt, SFunctionNode* pFunc
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt; SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
SNode* pTable = pSelect->pFromTable; SNode* pTable = pSelect->pFromTable;
if ((NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) || if ((NULL != pTable && QUERY_NODE_REAL_TABLE != nodeType(pTable))) {
(TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType)))) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE, return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
"%s is only supported in single table query", pFunc->functionName); "%s is only supported in single table query", pFunc->functionName);
} }

View File

@ -23,6 +23,8 @@ class TDTestCase:
stbname = "stb" stbname = "stb"
ctbname1 = "ctb1" ctbname1 = "ctb1"
ctbname2 = "ctb2" ctbname2 = "ctb2"
ctbname3 = "ctb3"
num_of_ctables = 3
tdSql.prepare() tdSql.prepare()
@ -816,17 +818,26 @@ class TDTestCase:
) )
tdSql.execute( tdSql.execute(
f'''create table if not exists {dbname}.{ctbname2} using {dbname}.{stbname} tags(1) f'''create table if not exists {dbname}.{ctbname2} using {dbname}.{stbname} tags(2)
''' '''
) )
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:05', 5, 5, 5, 5, 5.0, 5.0, true, 'varchar', 'nchar')") tdSql.execute(
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:10', 10, 10, 10, 10, 10.0, 10.0, true, 'varchar', 'nchar')") f'''create table if not exists {dbname}.{ctbname3} using {dbname}.{stbname} tags(3)
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')") '''
)
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-02 00:00:05', 5, 5, 5, 5, 5.0, 5.0, true, 'varchar', 'nchar')") tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:01', 1, 1, 1, 1, 1.0, 1.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-02 00:00:10', 10, 10, 10, 10, 10.0, 10.0, true, 'varchar', 'nchar')") tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:07', 7, 7, 7, 7, 7.0, 7.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-02 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')") tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:13', 13, 13, 13, 13, 13.0, 13.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-01 00:00:03', 3, 3, 3, 3, 3.0, 3.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-01 00:00:09', 9, 9, 9, 9, 9.0, 9.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname2} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname3} values ('2020-02-01 00:00:05', 5, 5, 5, 5, 5.0, 5.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname3} values ('2020-02-01 00:00:11', 11, 11, 11, 11, 11.0, 11.0, true, 'varchar', 'nchar')")
tdSql.execute(f"insert into {dbname}.{ctbname3} values ('2020-02-01 00:00:17', 17, 17, 17, 17, 17.0, 17.0, true, 'varchar', 'nchar')")
tdSql.execute(f"flush database {dbname}"); tdSql.execute(f"flush database {dbname}");
@ -834,7 +845,7 @@ class TDTestCase:
# test fill null # test fill null
## | {. | | .} | ## | {. | | .} |
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(null)") tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(null)")
tdSql.checkRows(11) tdSql.checkRows(11)
tdSql.checkData(0, 0, 5) tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, None) tdSql.checkData(1, 0, None)
@ -881,7 +892,7 @@ class TDTestCase:
# test fill value # test fill value
## | {. | | .} | ## | {. | | .} |
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(value, 1)") tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(value, 1)")
tdSql.checkRows(11) tdSql.checkRows(11)
tdSql.checkData(0, 0, 5) tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, 1) tdSql.checkData(1, 0, 1)
@ -895,7 +906,7 @@ class TDTestCase:
tdSql.checkData(9, 0, 1) tdSql.checkData(9, 0, 1)
tdSql.checkData(10, 0, 15) tdSql.checkData(10, 0, 15)
## | . | {} | . | # | . | {} | . |
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-03 00:00:05', '2020-02-07 00:00:05') every(1d) fill(value, 1)") tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-03 00:00:05', '2020-02-07 00:00:05') every(1d) fill(value, 1)")
tdSql.checkRows(5) tdSql.checkRows(5)
tdSql.checkData(0, 0, 1) tdSql.checkData(0, 0, 1)
@ -928,7 +939,7 @@ class TDTestCase:
# test fill prev # test fill prev
## | {. | | .} | ## | {. | | .} |
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(prev)") tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(prev)")
tdSql.checkRows(11) tdSql.checkRows(11)
tdSql.checkData(0, 0, 5) tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, 5) tdSql.checkData(1, 0, 5)
@ -973,7 +984,7 @@ class TDTestCase:
# test fill next # test fill next
## | {. | | .} | ## | {. | | .} |
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(next)") tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(next)")
tdSql.checkRows(11) tdSql.checkRows(11)
tdSql.checkData(0, 0, 5) tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, 15) tdSql.checkData(1, 0, 15)
@ -1015,7 +1026,7 @@ class TDTestCase:
# test fill linear # test fill linear
## | {. | | .} | ## | {. | | .} |
tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:05') every(1d) fill(linear)") tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-11 00:00:06') every(1d) fill(linear)")
tdSql.checkRows(11) tdSql.checkRows(11)
tdSql.checkData(0, 0, 5) tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, 6) tdSql.checkData(1, 0, 6)
@ -2391,19 +2402,516 @@ class TDTestCase:
tdLog.printNoPrefix("==========step13:stable cases") tdLog.printNoPrefix("==========step13:test stable cases")
tdSql.error(f"select interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(null)") # select interp from supertable
#tdSql.checkRows(13) tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
tdSql.checkRows(19)
#tdSql.query(f"select interp(c0) from {dbname}.{ctbname1} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(null)") tdSql.checkData(0, 2, None)
#tdSql.checkRows(13) tdSql.checkData(1, 2, 1)
tdSql.checkData(2, 2, None)
tdSql.checkData(3, 2, 3)
tdSql.checkData(4, 2, None)
tdSql.checkData(5, 2, 5)
tdSql.checkData(6, 2, None)
tdSql.checkData(7, 2, 7)
tdSql.checkData(8, 2, None)
tdSql.checkData(9, 2, 9)
tdSql.checkData(10, 2, None)
tdSql.checkData(11, 2, 11)
tdSql.checkData(12, 2, None)
tdSql.checkData(13, 2, 13)
tdSql.checkData(14, 2, None)
tdSql.checkData(15, 2, 15)
tdSql.checkData(16, 2, None)
tdSql.checkData(17, 2, 17)
tdSql.checkData(18, 2, None)
tdSql.error(f"select interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1s) fill(null)") tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(value, 0)")
#tdSql.checkRows(13) tdSql.checkRows(19)
tdSql.checkData(0, 2, 0)
tdSql.checkData(1, 2, 1)
tdSql.checkData(2, 2, 0)
tdSql.checkData(3, 2, 3)
tdSql.checkData(4, 2, 0)
tdSql.checkData(5, 2, 5)
tdSql.checkData(6, 2, 0)
tdSql.checkData(7, 2, 7)
tdSql.checkData(8, 2, 0)
tdSql.checkData(9, 2, 9)
tdSql.checkData(10, 2, 0)
tdSql.checkData(11, 2, 11)
tdSql.checkData(12, 2, 0)
tdSql.checkData(13, 2, 13)
tdSql.checkData(14, 2, 0)
tdSql.checkData(15, 2, 15)
tdSql.checkData(16, 2, 0)
tdSql.checkData(17, 2, 17)
tdSql.checkData(18, 2, 0)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(prev)")
tdSql.checkRows(18)
tdSql.checkData(0, 0, '2020-02-01 00:00:01.000')
tdSql.checkData(0, 1, False)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 1)
tdSql.checkData(2, 2, 3)
tdSql.checkData(3, 2, 3)
tdSql.checkData(4, 2, 5)
tdSql.checkData(5, 2, 5)
tdSql.checkData(6, 2, 7)
tdSql.checkData(7, 2, 7)
tdSql.checkData(8, 2, 9)
tdSql.checkData(9, 2, 9)
tdSql.checkData(10, 2, 11)
tdSql.checkData(11, 2, 11)
tdSql.checkData(12, 2, 13)
tdSql.checkData(13, 2, 13)
tdSql.checkData(14, 2, 15)
tdSql.checkData(15, 2, 15)
tdSql.checkData(16, 2, 17)
tdSql.checkData(17, 2, 17)
tdSql.checkData(17, 0, '2020-02-01 00:00:18.000')
tdSql.checkData(17, 1, True)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(18)
tdSql.checkData(0, 0, '2020-02-01 00:00:00.000')
tdSql.checkData(0, 1, True)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 1)
tdSql.checkData(2, 2, 3)
tdSql.checkData(3, 2, 3)
tdSql.checkData(4, 2, 5)
tdSql.checkData(5, 2, 5)
tdSql.checkData(6, 2, 7)
tdSql.checkData(7, 2, 7)
tdSql.checkData(8, 2, 9)
tdSql.checkData(9, 2, 9)
tdSql.checkData(10, 2, 11)
tdSql.checkData(11, 2, 11)
tdSql.checkData(12, 2, 13)
tdSql.checkData(13, 2, 13)
tdSql.checkData(14, 2, 15)
tdSql.checkData(15, 2, 15)
tdSql.checkData(16, 2, 17)
tdSql.checkData(17, 2, 17)
tdSql.checkData(17, 0, '2020-02-01 00:00:17.000')
tdSql.checkData(17, 1, False)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(17)
tdSql.checkData(0, 2, 1)
tdSql.checkData(1, 2, 2)
tdSql.checkData(2, 2, 3)
tdSql.checkData(3, 2, 4)
tdSql.checkData(4, 2, 5)
tdSql.checkData(5, 2, 6)
tdSql.checkData(6, 2, 7)
tdSql.checkData(7, 2, 8)
tdSql.checkData(8, 2, 9)
tdSql.checkData(9, 2, 10)
tdSql.checkData(10, 2, 11)
tdSql.checkData(11, 2, 12)
tdSql.checkData(12, 2, 13)
tdSql.checkData(13, 2, 14)
tdSql.checkData(14, 2, 15)
tdSql.checkData(15, 2, 16)
tdSql.checkData(16, 2, 17)
# select interp from supertable partition by tbname
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
point_idx = {1, 7, 13, 22, 28, 34, 43, 49, 55}
point_dict = {1:1, 7:7, 13:13, 22:3, 28:9, 34:15, 43:5, 49:11, 55:17}
rows_per_partition = 19
tdSql.checkRows(rows_per_partition * num_of_ctables)
for i in range(num_of_ctables):
for j in range(rows_per_partition):
row = j + i * rows_per_partition
tdSql.checkData(row, 0, f'ctb{i + 1}')
tdSql.checkData(j, 1, f'2020-02-01 00:00:{j}.000')
if row in point_idx:
tdSql.checkData(row, 2, False)
else:
tdSql.checkData(row, 2, True)
if row in point_idx:
tdSql.checkData(row, 3, point_dict[row])
else:
tdSql.checkData(row, 3, None)
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(value, 0)")
point_idx = {1, 7, 13, 22, 28, 34, 43, 49, 55}
point_dict = {1:1, 7:7, 13:13, 22:3, 28:9, 34:15, 43:5, 49:11, 55:17}
rows_per_partition = 19
tdSql.checkRows(rows_per_partition * num_of_ctables)
for i in range(num_of_ctables):
for j in range(rows_per_partition):
row = j + i * rows_per_partition
tdSql.checkData(row, 0, f'ctb{i + 1}')
tdSql.checkData(j, 1, f'2020-02-01 00:00:{j}.000')
if row in point_idx:
tdSql.checkData(row, 2, False)
else:
tdSql.checkData(row, 2, True)
if row in point_idx:
tdSql.checkData(row, 3, point_dict[row])
else:
tdSql.checkData(row, 3, 0)
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(prev)")
tdSql.checkRows(48)
for i in range(0, 18):
tdSql.checkData(i, 0, 'ctb1')
for i in range(18, 34):
tdSql.checkData(i, 0, 'ctb2')
for i in range(34, 48):
tdSql.checkData(i, 0, 'ctb3')
tdSql.checkData(0, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(17, 1, '2020-02-01 00:00:18.000')
tdSql.checkData(18, 1, '2020-02-01 00:00:03.000')
tdSql.checkData(33, 1, '2020-02-01 00:00:18.000')
tdSql.checkData(34, 1, '2020-02-01 00:00:05.000')
tdSql.checkData(47, 1, '2020-02-01 00:00:18.000')
for i in range(0, 6):
tdSql.checkData(i, 3, 1)
for i in range(6, 12):
tdSql.checkData(i, 3, 7)
for i in range(12, 18):
tdSql.checkData(i, 3, 13)
for i in range(18, 24):
tdSql.checkData(i, 3, 3)
for i in range(24, 30):
tdSql.checkData(i, 3, 9)
for i in range(30, 34):
tdSql.checkData(i, 3, 15)
for i in range(34, 40):
tdSql.checkData(i, 3, 5)
for i in range(40, 46):
tdSql.checkData(i, 3, 11)
for i in range(46, 48):
tdSql.checkData(i, 3, 17)
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(48)
for i in range(0, 14):
tdSql.checkData(i, 0, 'ctb1')
for i in range(14, 30):
tdSql.checkData(i, 0, 'ctb2')
for i in range(30, 48):
tdSql.checkData(i, 0, 'ctb3')
tdSql.checkData(0, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(13, 1, '2020-02-01 00:00:13.000')
tdSql.checkData(14, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(29, 1, '2020-02-01 00:00:15.000')
tdSql.checkData(30, 1, '2020-02-01 00:00:00.000')
tdSql.checkData(47, 1, '2020-02-01 00:00:17.000')
for i in range(0, 2):
tdSql.checkData(i, 3, 1)
for i in range(2, 8):
tdSql.checkData(i, 3, 7)
for i in range(8, 14):
tdSql.checkData(i, 3, 13)
for i in range(14, 18):
tdSql.checkData(i, 3, 3)
for i in range(18, 24):
tdSql.checkData(i, 3, 9)
for i in range(24, 30):
tdSql.checkData(i, 3, 15)
for i in range(30, 36):
tdSql.checkData(i, 3, 5)
for i in range(36, 42):
tdSql.checkData(i, 3, 11)
for i in range(42, 48):
tdSql.checkData(i, 3, 17)
tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(39)
for i in range(0, 13):
tdSql.checkData(i, 0, 'ctb1')
for i in range(13, 26):
tdSql.checkData(i, 0, 'ctb2')
for i in range(26, 39):
tdSql.checkData(i, 0, 'ctb3')
tdSql.checkData(0, 1, '2020-02-01 00:00:01.000')
tdSql.checkData(12, 1, '2020-02-01 00:00:13.000')
tdSql.checkData(13, 1, '2020-02-01 00:00:03.000')
tdSql.checkData(25, 1, '2020-02-01 00:00:15.000')
tdSql.checkData(26, 1, '2020-02-01 00:00:05.000')
tdSql.checkData(38, 1, '2020-02-01 00:00:17.000')
for i in range(0, 13):
tdSql.checkData(i, 3, i + 1)
for i in range(13, 26):
tdSql.checkData(i, 3, i - 10)
for i in range(26, 39):
tdSql.checkData(i, 3, i - 21)
# select interp from supertable partition by column
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
tdSql.checkRows(171)
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(value, 0)")
tdSql.checkRows(171)
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(prev)")
tdSql.checkRows(90)
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(90)
tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(9)
# select interp from supertable partition by tag
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
tdSql.checkRows(57)
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(value, 0)")
tdSql.checkRows(57)
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(prev)")
tdSql.checkRows(48)
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)")
tdSql.checkRows(48)
tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(39)
# select interp from supertable filter
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where ts between '2020-02-01 00:00:01.000' and '2020-02-01 00:00:13.000' range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where c0 <= 13 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where t1 = 1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where tbname = 'ctb1' range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where ts between '2020-02-01 00:00:01.000' and '2020-02-01 00:00:13.000' partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(27)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where c0 <= 13 partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(27)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where t1 = 1 partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where tbname = 'ctb1' partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
# select interp from supertable filter limit
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 13")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 20")
tdSql.checkRows(17)
for i in range(17):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where ts between '2020-02-01 00:00:01.000' and '2020-02-01 00:00:13.000' range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where c0 <= 13 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where t1 = 1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where tbname = 'ctb1' range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 13")
tdSql.checkRows(13)
for i in range(13):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 40")
tdSql.checkRows(39)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where ts between '2020-02-01 00:00:01.000' and '2020-02-01 00:00:13.000' partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where c0 <= 13 partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where t1 = 1 partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} where tbname = 'ctb1' partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear) limit 10")
tdSql.checkRows(10)
for i in range(10):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 1)
# select interp from supertable with scalar expression
tdSql.query(f"select _irowts, _isfilled, interp(1 + 1) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(17)
for i in range(17):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, 2.0)
tdSql.query(f"select _irowts, _isfilled, interp(c0 + 1) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(17)
for i in range(17):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, i + 2)
tdSql.query(f"select _irowts, _isfilled, interp(c0 * 2) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(17)
for i in range(17):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, (i + 1) * 2)
tdSql.query(f"select _irowts, _isfilled, interp(c0 + c1) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)")
tdSql.checkRows(17)
for i in range(17):
tdSql.checkData(i, 0, f'2020-02-01 00:00:{i + 1}.000')
tdSql.checkData(i, 2, (i + 1) * 2)
# check duplicate timestamp
# add duplicate timestamp for different child tables
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')")
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:14') every(1s) fill(null)")
tdSql.error(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:15') every(1s) fill(null)")
tdSql.error(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
#tdSql.query(f"select _irowts,interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1h) fill(prev)")
#tdSql.query(f"select tbname,_irowts,interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:04', '2020-02-02 00:00:16') every(1h) fill(prev)")
tdLog.printNoPrefix("======step 14: test interp pseudo columns") tdLog.printNoPrefix("======step 14: test interp pseudo columns")
tdSql.error(f"select _irowts, c6 from {dbname}.{tbname}") tdSql.error(f"select _irowts, c6 from {dbname}.{tbname}")