diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index f6f3943240..17c01bad94 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -134,7 +134,7 @@ int32_t hashSortFileRemoveFn(SStreamFileState* pFileState, const void* pKey); void clearSearchBuff(SStreamFileState* pFileState); int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen, int32_t* pWinCode); -int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, +int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal, int32_t* pVLen, int32_t* pWinCode); int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId); void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey); diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index b659c12315..4044cc0b9b 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -107,6 +107,7 @@ typedef struct SStreamFillInfo { TSKEY current; // current Key for fill TSKEY preRowKey; TSKEY nextRowKey; + TSKEY nextPointKey; SResultRowData* pResRow; SStreamFillLinearInfo* pLinearInfo; bool needFill; diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 7a17b2d4b5..7b0e316b2f 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -54,15 +54,18 @@ void streamTimeSliceReleaseState(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; - int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SWinKey); - int32_t resSize = winSize + sizeof(TSKEY); - char* pBuff = taosMemoryCalloc(1, resSize); + int32_t winNum = taosArrayGetSize(pInfo->historyWins); + + int32_t winSize = winNum * sizeof(SWinKey); + int32_t resSize = winSize + sizeof(TSKEY); + char* pBuff = taosMemoryCalloc(1, resSize); QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno); - memcpy(pBuff, pInfo->historyWins->pData, winSize); + if (winNum > 0) { + memcpy(pBuff, pInfo->historyWins->pData, winSize); + } memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); - qDebug("===stream=== time slice operator relase state. save result count:%d", - (int32_t)taosArrayGetSize(pInfo->historyWins)); + qDebug("===stream=== time slice operator relase state. save result count:%d", winNum); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_TIME_SLICE_OP_STATE_NAME, strlen(STREAM_TIME_SLICE_OP_STATE_NAME), pBuff, resSize); pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); @@ -169,6 +172,8 @@ void destroyStreamTimeSliceOperatorInfo(void* param) { tSimpleHashCleanup(pInfo->pDeletedMap); clearGroupResInfo(&pInfo->groupResInfo); + taosArrayDestroy(pInfo->historyWins); + taosMemoryFreeClear(param); } @@ -404,11 +409,11 @@ static void fillNormalRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi int32_t lino = 0; while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) { STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current}; - if (inWinRange(&pFillSup->winRange, &st)) { - bool res = true; - code = fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->current, pBlock, &res, true); - QUERY_CHECK_CODE(code, lino, _end); - } + // if (inWinRange(&pFillSup->winRange, &st)) { + bool res = true; + code = fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->current, pBlock, &res, true); + QUERY_CHECK_CODE(code, lino, _end); + // } pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, pFillSup->interval.precision); } @@ -905,6 +910,8 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo if (hasNextWindow(pFillSup)) { nextWKey = pFillSup->next.key; } + TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval); + TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval); pFillInfo->needFill = true; pFillInfo->pos = FILL_POS_INVALID; @@ -914,11 +921,9 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo case TSDB_FILL_SET_VALUE: case TSDB_FILL_SET_VALUE_F: { if (hasPrevWindow(pFillSup)) { - TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval); setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; } else { - TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval); setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; } @@ -927,7 +932,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo } break; case TSDB_FILL_PREV: { if (hasNextWindow(pFillSup)) { - TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval); setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; resetFillWindow(&pFillSup->prev); @@ -935,7 +939,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo pFillSup->prev.pRowVal = pFillSup->cur.pRowVal; } else { ASSERT(hasPrevWindow(pFillSup)); - TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval); setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; } @@ -943,7 +946,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo } break; case TSDB_FILL_NEXT: { if (hasPrevWindow(pFillSup)) { - TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval); setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; resetFillWindow(&pFillSup->next); @@ -951,7 +953,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo pFillSup->next.pRowVal = pFillSup->cur.pRowVal; } else { ASSERT(hasNextWindow(pFillSup)); - TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval); setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; resetFillWindow(&pFillSup->prev); @@ -970,7 +971,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pLinearInfo->hasNext = false; } else if (hasPrevWindow(pFillSup)) { - TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval); setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); @@ -981,7 +981,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo pFillInfo->pLinearInfo->hasNext = false; } else { ASSERT(hasNextWindow(pFillSup)); - TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval); setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); @@ -1205,6 +1204,18 @@ _end: } } +void getNextResKey(int64_t curGroupId, SArray* pKeyArray, int32_t curIndex, TSKEY* pNextKey) { + int32_t nextIndex = curIndex + 1; + if (nextIndex < taosArrayGetSize(pKeyArray)) { + SWinKey* pKey = (SWinKey*)taosArrayGet(pKeyArray, nextIndex); + if (pKey->groupId == curGroupId) { + *pNextKey = pKey->ts; + return; + } + } + *pNextKey = INT64_MIN; +} + void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -1235,6 +1246,10 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor } QUERY_CHECK_CODE(code, lino, _end); + getNextResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index, &pFillInfo->nextRowKey); + if (hasNextWindow(pFillSup)) { + pFillInfo->nextPointKey = nextPoint.key.ts; + } setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts); doStreamFillRange(pFillSup, pFillInfo, pBlock); releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); @@ -1326,13 +1341,13 @@ static int32_t doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock for (int32_t i = 0; i < pBlock->info.rows; i++) { while (1) { TSKEY ts = tsCalStarts[i]; - TSKEY endTs = tsCalEnds[i]; + TSKEY endCalTs = tsCalEnds[i]; uint64_t groupId = groupIds[i]; SWinKey key = {.ts = ts, .groupId = groupId}; SWinKey nextKey = {.groupId = groupId}; code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &key, &nextKey, NULL, NULL, &winCode); QUERY_CHECK_CODE(code, lino, _end); - if (key.ts > endTs) { + if (key.ts > endCalTs) { break; } (void)tSimpleHashRemove(pUpdatedMap, &key, sizeof(SWinKey)); @@ -1675,6 +1690,9 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* copyFillValueInfo(pInfo->pFillSup, pInfo->pFillInfo); pInfo->ignoreNull = getIgoreNullRes(pExpSup); + pInfo->historyWins = taosArrayInit(4, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->historyWins, code, lino, _error, terrno); + if (pHandle) { pInfo->isHistoryOp = pHandle->fillHistory; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 5ac833bc69..8492fba70e 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10384,9 +10384,23 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm } } - if (pStmt->pOptions->fillHistory && pSelect->hasInterpFunc) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "Stream interp unsupported Fill history"); + if (pSelect->hasInterpFunc) { + if (pStmt->pOptions->fillHistory && pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "When trigger was force window close, Stream interp unsupported Fill history"); + } + + if (pStmt->pOptions->triggerType == STREAM_TRIGGER_WINDOW_CLOSE) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Stream interp unsupported window close"); + } + + if ((SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta && + TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && + !hasTbnameFunction(pSelect->pPartitionByList)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Interp for stream on super table must patitioned by table name"); + } } return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c index ea5e35e245..9eb4f57887 100644 --- a/source/libs/stream/src/streamSliceState.c +++ b/source/libs/stream/src/streamSliceState.c @@ -123,9 +123,34 @@ void clearSearchBuff(SStreamFileState* pFileState) { } } -int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, +int32_t getStateFromRocksdbByCur(SStreamFileState* pFileState, SStreamStateCur* pCur, SWinKey* pResKey, SRowBuffPos** ppPos, int32_t* pVLen, int32_t* pWinCode) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + void* tmpVal = NULL; + int32_t len = 0; + (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); + if ((*pWinCode) == TSDB_CODE_SUCCESS) { + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + QUERY_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pRowBuff, tmpVal, len); + taosMemoryFreeClear(tmpVal); + *pVLen = getRowStateRowSize(pFileState); + (*ppPos) = pNewPos; + } +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal, int32_t* pVLen, int32_t* pWinCode) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SArray* pWinStates = NULL; SSHashObj* pSearchBuff = getSearchBuff(pFileState); void* pState = getStateFileStore(pFileState); @@ -134,7 +159,20 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW pWinStates = (SArray*)(*ppBuff); } else { SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey); - (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); + void* tmpVal = NULL; + int32_t len = 0; + (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); + if ((*pWinCode) == TSDB_CODE_SUCCESS) { + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + QUERY_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pRowBuff, tmpVal, len); + taosMemoryFreeClear(tmpVal); + *pVLen = getRowStateRowSize(pFileState); + (*ppVal) = pNewPos; + } streamStateFreeCur(pCur); return code; } @@ -142,7 +180,20 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); if (index == -1) { SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey); - (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); + void* tmpVal = NULL; + int32_t len = 0; + (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); + if ((*pWinCode) == TSDB_CODE_SUCCESS) { + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + QUERY_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pRowBuff, tmpVal, len); + taosMemoryFreeClear(tmpVal); + *pVLen = getRowStateRowSize(pFileState); + (*ppVal) = pNewPos; + } streamStateFreeCur(pCur); return code; } else { @@ -152,15 +203,21 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW } SWinKey* pNext = taosArrayGet(pWinStates, index + 1); *pResKey = *pNext; - return getHashSortRowBuff(pFileState, pResKey, pVal, pVLen, pWinCode); + return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode); } (*pWinCode) = TSDB_CODE_FAILED; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } return code; } -int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, +int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal, int32_t* pVLen, int32_t* pWinCode) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SArray* pWinStates = NULL; SSHashObj* pSearchBuff = getSearchBuff(pFileState); void* pState = getStateFileStore(pFileState); @@ -169,7 +226,20 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW pWinStates = (SArray*)(*ppBuff); } else { SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); - (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); + void* tmpVal = NULL; + int32_t len = 0; + (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); + if ((*pWinCode) == TSDB_CODE_SUCCESS) { + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + QUERY_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pRowBuff, tmpVal, len); + taosMemoryFreeClear(tmpVal); + *pVLen = getRowStateRowSize(pFileState); + (*ppVal) = pNewPos; + } streamStateFreeCur(pCur); return code; } @@ -177,15 +247,33 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); if (index == -1 || index == 0) { SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); - (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); + void* tmpVal = NULL; + int32_t len = 0; + (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); + if ((*pWinCode) == TSDB_CODE_SUCCESS) { + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + QUERY_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pRowBuff, tmpVal, len); + taosMemoryFreeClear(tmpVal); + *pVLen = getRowStateRowSize(pFileState); + (*ppVal) = pNewPos; + } streamStateFreeCur(pCur); return code; } else { SWinKey* pNext = taosArrayGet(pWinStates, index - 1); *pResKey = *pNext; - return getHashSortRowBuff(pFileState, pResKey, pVal, pVLen, pWinCode); + return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode); } (*pWinCode) = TSDB_CODE_FAILED; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } return code; } diff --git a/tests/script/tsim/stream/streamInterpError.sim b/tests/script/tsim/stream/streamInterpError.sim new file mode 100644 index 0000000000..49366dc9ee --- /dev/null +++ b/tests/script/tsim/stream/streamInterpError.sim @@ -0,0 +1,82 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step2 + +sql create database test2 vgroups 1; +sql use test2; + +sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +print step2_0 + +sql create stream streams2_0_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_0_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(prev); +sql create stream streams2_0_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_0_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(next); +sql create stream streams2_0_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_0_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(linear); +sql create stream streams2_0_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_0_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL); +sql create stream streams2_0_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_0_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44); + +print step2_1 + +sql_error create stream streams2_1_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(prev); +sql_error create stream streams2_1_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(next); +sql_error create stream streams2_1_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(linear); +sql_error create stream streams2_1_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(NULL); +sql_error create stream streams2_1_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(value,11,22,33,44); + +print step2_2 + +sql_error create stream streams2_2_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(prev); +sql_error create stream streams2_2_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(next); +sql_error create stream streams2_2_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(linear); +sql_error create stream streams2_2_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(NULL); +sql_error create stream streams2_2_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(value,11,22,33,44); + +print step2_3 + +sql_error create stream streams2_3_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st partition by a every(1s) fill(prev); +sql_error create stream streams2_3_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st partition by a every(1s) fill(next); +sql_error create stream streams2_3_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st partition by a every(1s) fill(linear); +sql_error create stream streams2_3_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st partition by a every(1s) fill(NULL); +sql_error create stream streams2_3_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st partition by a every(1s) fill(value,11,22,33,44); + +print step2_4 + +sql_error create stream streams2_4_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4_1 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(prev); +sql_error create stream streams2_4_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4_2 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(next); +sql_error create stream streams2_4_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4_3 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(linear); +sql_error create stream streams2_4_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4_4 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(NULL); +sql_error create stream streams2_4_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4_5 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(value,11,22,33,44); + +print step2_5 + +sql_error create stream streams2_5_1 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(prev); +sql_error create stream streams2_5_2 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(next); +sql_error create stream streams2_5_3 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(linear); +sql_error create stream streams2_5_4 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL); +sql_error create stream streams2_5_5 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44); + +print step2_6 + +sql create stream streams2_6_1 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(prev); +sql create stream streams2_6_2 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(next); +sql create stream streams2_6_3 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(linear); +sql create stream streams2_6_4 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL); +sql create stream streams2_6_5 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44); + +sql_error create stream streams2_6_6 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_6 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(prev); +sql_error create stream streams2_6_7 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_7 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(next); +sql_error create stream streams2_6_8 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_8 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(linear); +sql_error create stream streams2_6_9 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_9 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL); +sql_error create stream streams2_6_10 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_10 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44); + +run tsim/stream/checkTaskStatus.sim + +run tsim/stream/checkTaskStatus.sim + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamInterpOther.sim b/tests/script/tsim/stream/streamInterpOther.sim index c135a31992..8af1274bb6 100644 --- a/tests/script/tsim/stream/streamInterpOther.sim +++ b/tests/script/tsim/stream/streamInterpOther.sim @@ -211,39 +211,6 @@ if $data17 != 44.000000000 then goto loop0_2 endi -print step2 - -sql create database test2 vgroups 1; -sql use test2; - -sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); -sql create table t1 using st tags(1,1,1); -sql create table t2 using st tags(2,2,2); - -sql_error create stream streams2_1_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(prev); -sql_error create stream streams2_1_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(next); -sql_error create stream streams2_1_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(linear); -sql_error create stream streams2_1_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(NULL); -sql_error create stream streams2_1_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(value,11,22,33,44); - -sql_error create stream streams2_2_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(prev); -sql_error create stream streams2_2_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(next); -sql_error create stream streams2_2_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(linear); -sql_error create stream streams2_2_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(NULL); -sql_error create stream streams2_2_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(value,11,22,33,44); - -sql_error create stream streams2_3_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(prev); -sql_error create stream streams2_3_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(next); -sql_error create stream streams2_3_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(linear); -sql_error create stream streams2_3_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(NULL); -sql_error create stream streams2_3_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(value,11,22,33,44); - -sql_error create stream streams2_4_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_1 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(prev); -sql_error create stream streams2_4_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_2 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(next); -sql_error create stream streams2_4_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_3 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(linear); -sql_error create stream streams2_4_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_4 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(NULL); -sql_error create stream streams2_4_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_5 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(value,11,22,33,44); - print step3 sql create database test3 vgroups 4;