diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 54c8d092d3..51b3c511dd 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -454,6 +454,27 @@ typedef struct SSteamOpBasicInfo { bool updateOperatorInfo; } SSteamOpBasicInfo; +typedef struct SStreamFillSupporter { + int32_t type; // fill type + SInterval interval; + SResultRowData prev; + TSKEY prevOriginKey; + SResultRowData cur; + SResultRowData next; + TSKEY nextOriginKey; + SResultRowData nextNext; + SFillColInfo* pAllColInfo; // fill exprs and not fill exprs + SExprSupp notFillExprSup; + int32_t numOfAllCols; // number of all exprs, including the tags columns + int32_t numOfFillCols; + int32_t numOfNotFillCols; + int32_t rowSize; + SSHashObj* pResMap; + bool hasDelete; + SStorageAPI* pAPI; + STimeWindow winRange; +} SStreamFillSupporter; + typedef struct SStreamScanInfo { SSteamOpBasicInfo basic; SExprInfo* pPseudoExpr; @@ -494,6 +515,7 @@ typedef struct SStreamScanInfo { STimeWindow updateWin; STimeWindowAggSupp twAggSup; SSDataBlock* pUpdateDataRes; + SStreamFillSupporter* pFillSup; // status for tmq SNodeList* pGroupTags; SNode* pTagCond; @@ -775,27 +797,6 @@ typedef struct SStreamPartitionOperatorInfo { SSDataBlock* pCreateTbRes; } SStreamPartitionOperatorInfo; -typedef struct SStreamFillSupporter { - int32_t type; // fill type - SInterval interval; - SResultRowData prev; - TSKEY prevOriginKey; - SResultRowData cur; - SResultRowData next; - TSKEY nextOriginKey; - SResultRowData nextNext; - SFillColInfo* pAllColInfo; // fill exprs and not fill exprs - SExprSupp notFillExprSup; - int32_t numOfAllCols; // number of all exprs, including the tags columns - int32_t numOfFillCols; - int32_t numOfNotFillCols; - int32_t rowSize; - SSHashObj* pResMap; - bool hasDelete; - SStorageAPI* pAPI; - STimeWindow winRange; -} SStreamFillSupporter; - typedef struct SStreamFillOperatorInfo { SSteamOpBasicInfo basic; SStreamFillSupporter* pFillSup; diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index f4b5030d3a..744d95f1cf 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -27,6 +27,21 @@ extern "C" { #define FILL_POS_MID 2 #define FILL_POS_END 3 +#define HAS_NON_ROW_DATA(pRowData) (pRowData->key == INT64_MIN) +#define HAS_ROW_DATA(pRowData) (pRowData && pRowData->key != INT64_MIN) + +typedef struct SSliceRowData { + TSKEY key; + SResultCellData pRowVal[]; +} SSliceRowData; + +typedef struct SSlicePoint { + SWinKey key; + SSliceRowData* pLeftRow; + SSliceRowData* pRightRow; + SRowBuffPos* pResPos; +} SSlicePoint; + void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type); bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo); void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo); @@ -56,6 +71,7 @@ void destroySPoint(void* ptr); void destroyStreamFillInfo(SStreamFillInfo* pFillInfo); int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes); void resetStreamFillSup(SStreamFillSupporter* pFillSup); +void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup); int winPosCmprImpl(const void* pKey1, const void* pKey2); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 93facbf396..18648e3b57 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2184,8 +2184,38 @@ _end: return code; } -int32_t getTimeSliceWinRange(SStreamAggSupporter* pAggSup, SInterval* pInterval, TSKEY start, TSKEY end, - int64_t groupId, STimeWindow* pScanRange, STimeWindow* pDelRange) { +static int32_t setDelRangeEndKey(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, SWinKey* pEndKey, STimeWindow* pScanRange, bool* pRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SSlicePoint nextPoint = {.key.groupId = pEndKey->groupId}; + int32_t vLen = 0; + int32_t winCode = TSDB_CODE_SUCCESS; + code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, pEndKey, &nextPoint.key, (void**)&nextPoint.pResPos, &vLen, &winCode); + QUERY_CHECK_CODE(code, lino, _end); + if (winCode == TSDB_CODE_SUCCESS) { + setPointBuff(&nextPoint, pFillSup); + if (HAS_ROW_DATA(nextPoint.pLeftRow) && pEndKey->ts < nextPoint.pLeftRow->key) { + pScanRange->ekey = nextPoint.pLeftRow->key; + *pRes = true; + } else if (pEndKey->ts < nextPoint.pRightRow->key) { + pScanRange->ekey = nextPoint.pRightRow->key; + *pRes = true; + } else { + *pEndKey = nextPoint.key; + pScanRange->ekey = TMAX(nextPoint.pRightRow->key, nextPoint.key.ts); + *pRes = false; + } + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t getTimeSliceWinRange(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, SInterval* pInterval, TSKEY start, TSKEY end, + int64_t groupId, STimeWindow* pScanRange) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int32_t winCode = TSDB_CODE_SUCCESS; @@ -2193,28 +2223,36 @@ int32_t getTimeSliceWinRange(SStreamAggSupporter* pAggSup, SInterval* pInterval, dumyInfo.cur.pageId = -1; STimeWindow sWin = getActiveTimeWindow(NULL, &dumyInfo, start, pInterval, TSDB_ORDER_ASC); SWinKey startKey = {.groupId = groupId, .ts = sWin.skey}; - pDelRange->skey = sWin.skey; sWin = getActiveTimeWindow(NULL, &dumyInfo, end, pInterval, TSDB_ORDER_ASC); SWinKey endKey = {.groupId = groupId, .ts = sWin.ekey}; - pDelRange->ekey = sWin.ekey; - SWinKey preKey = {.groupId = groupId}; - code = pAggSup->stateStore.streamStateFillGetPrev(pAggSup->pState, &startKey, &preKey, NULL, NULL, &winCode); + SSlicePoint prevPoint = {.key.groupId = groupId}; + SSlicePoint nextPoint = {.key.groupId = groupId}; + int32_t vLen = 0; + code = pAggSup->stateStore.streamStateFillGetPrev(pAggSup->pState, &startKey, &prevPoint.key, (void**)&prevPoint.pResPos, &vLen, &winCode); QUERY_CHECK_CODE(code, lino, _end); if (winCode == TSDB_CODE_SUCCESS) { - pScanRange->skey = preKey.ts; + setPointBuff(&prevPoint, pFillSup); + if (HAS_ROW_DATA(prevPoint.pRightRow)) { + pScanRange->skey = prevPoint.pRightRow->key; + } else { + pScanRange->skey = prevPoint.pLeftRow->key; + } } else { pScanRange->skey = startKey.ts; } - SWinKey nextKey = {.groupId = groupId}; - code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &endKey, &nextKey, NULL, NULL, &winCode); + bool res = false; + SWinKey curKey = endKey; + code = setDelRangeEndKey(pAggSup, pFillSup, &curKey, pScanRange, &res); QUERY_CHECK_CODE(code, lino, _end); - if (winCode == TSDB_CODE_SUCCESS) { - pScanRange->ekey = nextKey.ts; - } else { - pScanRange->ekey = endKey.ts; + if (res == false) { + code = setDelRangeEndKey(pAggSup, pFillSup, &curKey, pScanRange, &res); + QUERY_CHECK_CODE(code, lino, _end); + } + if (res == false) { + pScanRange->ekey = TMAX(endKey.ts, pScanRange->ekey); } _end: @@ -2277,10 +2315,9 @@ static int32_t generateTimeSliceScanRange(SStreamScanInfo* pInfo, SSDataBlock* p } STimeWindow scanRange = {0}; - STimeWindow delRange = {0}; ASSERT(mode == STREAM_DELETE_RESULT || mode == STREAM_DELETE_DATA); - code = getTimeSliceWinRange(pInfo->windowSup.pStreamAggSup, &pInfo->interval, startData[i], endData[i], groupId, - &scanRange, &delRange); + code = getTimeSliceWinRange(pInfo->windowSup.pStreamAggSup, pInfo->pFillSup, &pInfo->interval, startData[i], endData[i], groupId, + &scanRange); QUERY_CHECK_CODE(code, lino, _end); code = colDataSetVal(pDestStartCol, i, (const char*)&scanRange.skey, false); @@ -2293,10 +2330,10 @@ static int32_t generateTimeSliceScanRange(SStreamScanInfo* pInfo, SSDataBlock* p code = colDataSetVal(pDestGpCol, i, (const char*)&groupId, false); QUERY_CHECK_CODE(code, lino, _end); - code = colDataSetVal(pDestCalStartTsCol, i, (const char*)&delRange.skey, false); + code = colDataSetVal(pDestCalStartTsCol, i, (const char*)&scanRange.skey, false); QUERY_CHECK_CODE(code, lino, _end); - code = colDataSetVal(pDestCalEndTsCol, i, (const char*)&delRange.ekey, false); + code = colDataSetVal(pDestCalEndTsCol, i, (const char*)&scanRange.ekey, false); QUERY_CHECK_CODE(code, lino, _end); pDestBlock->info.rows++; @@ -4330,6 +4367,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pInfo->pState = pTaskInfo->streamInfo.pState; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn; + pInfo->pFillSup = NULL; code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 72a75b9ebf..17840f8790 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -29,23 +29,9 @@ #define STREAM_TIME_SLICE_OP_STATE_NAME "StreamTimeSliceHistoryState" #define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint" -#define HAS_NON_ROW_DATA(pRowData) (pRowData->key == INT64_MIN) -#define HAS_ROW_DATA(pRowData) (pRowData && pRowData->key != INT64_MIN) #define IS_INVALID_WIN_KEY(ts) ((ts) == INT64_MIN) #define SET_WIN_KEY_INVALID(ts) ((ts) = INT64_MIN) -typedef struct SSliceRowData { - TSKEY key; - SResultCellData pRowVal[]; -} SSliceRowData; - -typedef struct SSlicePoint { - SWinKey key; - SSliceRowData* pLeftRow; - SSliceRowData* pRightRow; - SRowBuffPos* pResPos; -} SSlicePoint; - int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap) { return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), NULL, 0); } @@ -628,7 +614,7 @@ static int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, static void setResultRowData(SSliceRowData** ppRowData, void* pBuff) { (*ppRowData) = (SSliceRowData*)pBuff; } -static void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup) { +void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup) { if (pFillSup->type != TSDB_FILL_LINEAR) { setResultRowData(&pPoint->pRightRow, pPoint->pResPos->pRowBuff); pPoint->pLeftRow = pPoint->pRightRow; @@ -1344,14 +1330,6 @@ static int32_t buildTimeSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRe uint16_t opType = pOperator->operatorType; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; - doBuildDeleteResultImpl(&pAggSup->stateStore, pAggSup->pState, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); - if (pInfo->pDelRes->info.rows != 0) { - // process the rest of the data - printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); - (*ppRes) = pInfo->pDelRes; - goto _end; - } - doBuildTimeSlicePointResult(pAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo); if (pInfo->pRes->info.rows != 0) { printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); @@ -1407,16 +1385,16 @@ static int32_t doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* groupIds = (uint64_t*)pGroupCol->pData; - SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); - TSKEY* tsCalStarts = (TSKEY*)pCalStartCol->pData; - SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); - TSKEY* tsCalEnds = (TSKEY*)pCalEndCol->pData; + SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + TSKEY* tsStarts = (TSKEY*)pStartCol->pData; + SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + TSKEY* tsEnds = (TSKEY*)pEndCol->pData; for (int32_t i = 0; i < pBlock->info.rows; i++) { + TSKEY ts = tsStarts[i]; + TSKEY endCalTs = tsEnds[i]; + uint64_t groupId = groupIds[i]; + SWinKey key = {.ts = ts, .groupId = groupId}; while (1) { - TSKEY ts = tsCalStarts[i]; - TSKEY endCalTs = tsCalEnds[i]; - uint64_t groupId = groupIds[i]; - SWinKey key = {.ts = ts, .groupId = groupId}; SWinKey nextKey = {.groupId = groupId}; code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &key, &nextKey, NULL, NULL, &winCode); QUERY_CHECK_CODE(code, lino, _end); @@ -1424,8 +1402,6 @@ static int32_t doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock break; } (void)tSimpleHashRemove(pUpdatedMap, &key, sizeof(SWinKey)); - void* tmp = taosArrayPush(pDelWins, &key); - QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); pAggSup->stateStore.streamStateDel(pAggSup->pState, &key); if (winCode != TSDB_CODE_SUCCESS) { @@ -1526,9 +1502,15 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR setStreamOperatorState(&pInfo->basic, pBlock->info.type); switch (pBlock->info.type) { - case STREAM_DELETE_RESULT: { + case STREAM_DELETE_RESULT: + case STREAM_DELETE_DATA: { code = doDeleteTimeSliceResult(pAggSup, pBlock, pInfo->pUpdatedMap, pInfo->pDelWins); QUERY_CHECK_CODE(code, lino, _end); + copyDataBlock(pInfo->pDelRes, pBlock); + pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; + (*ppRes) = pInfo->pDelRes; + printDataBlock((*ppRes), getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + goto _end; } break; case STREAM_NORMAL: case STREAM_INVALID: { @@ -1560,9 +1542,7 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR doStreamTimeSliceImpl(pOperator, pBlock); } - if (!pInfo->destHasPrimaryKey) { - removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); - } else { + if (pInfo->destHasPrimaryKey) { copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins); } @@ -1665,6 +1645,44 @@ int32_t getDownstreamRes(SOperatorInfo* downstream, SSDataBlock** ppRes) { return TSDB_CODE_FAILED; } +int32_t initTimeSliceDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, + int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic, + SStreamFillSupporter* pFillSup) { + SExecTaskInfo* pTaskInfo = downstream->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) { + SStreamPartitionOperatorInfo* pScanInfo = downstream->info; + pScanInfo->tsColIndex = tsColIndex; + } + + if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + code = initTimeSliceDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic, pFillSup); + return code; + } + SStreamScanInfo* pScanInfo = downstream->info; + pScanInfo->igCheckUpdate = true; + pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; + pScanInfo->pState = pAggSup->pState; + if (!pScanInfo->pUpdateInfo) { + code = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark, + pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen, + &pScanInfo->pUpdateInfo); + QUERY_CHECK_CODE(code, lino, _end); + } + pScanInfo->twAggSup = *pTwSup; + pScanInfo->pFillSup = pFillSup; + pScanInfo->interval = pFillSup->interval; + pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo; + pBasic->primaryPkIndex = pScanInfo->primaryKeyIndex; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + } + return code; +} + int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** ppOptInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -1789,13 +1807,12 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* setOperatorStreamStateFn(pOperator, streamTimeSliceReleaseState, streamTimeSliceReloadState); if (downstream) { - if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - SStreamScanInfo* pScanInfo = downstream->info; - pScanInfo->igCheckUpdate = true; - } - initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, - &pInfo->basic); + code = initTimeSliceDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, + &pInfo->twAggSup, &pInfo->basic, pInfo->pFillSup); + QUERY_CHECK_CODE(code, lino, _error); + code = appendDownstream(pOperator, &downstream, 1); + QUERY_CHECK_CODE(code, lino, _error); } (*ppOptInfo) = pOperator; return code; diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c index 4ad60daec8..82d204c634 100644 --- a/source/libs/stream/src/streamSliceState.c +++ b/source/libs/stream/src/streamSliceState.c @@ -284,7 +284,6 @@ _end: } void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey) { - deleteRowBuff(pFileState, pKey, sizeof(SWinKey)); SSHashObj* pSearchBuff = getSearchBuff(pFileState); void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); if (!ppBuff) { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index ed4c639aaa..259d89b6bd 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -246,10 +246,6 @@ int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKe } void streamStateFillDel(SStreamState* pState, const SWinKey* key) { - if (pState->pFileState) { - deleteHashSortRowBuff(pState->pFileState, key); - return; - } int32_t code = streamStateFillDel_rocksdb(pState, key); qTrace("%s at line %d res %d", __func__, __LINE__, code); } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 522f911f64..af8b1bfc18 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -569,10 +569,12 @@ int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t k pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey)); SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen); if (pos) { - *pVLen = pFileState->rowSize; - *pVal = *pos; - (*pos)->beUsed = true; - (*pos)->beFlushed = false; + if (pVal != NULL) { + *pVLen = pFileState->rowSize; + *pVal = *pos; + (*pos)->beUsed = true; + (*pos)->beFlushed = false; + } goto _end; } SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); @@ -616,11 +618,17 @@ void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLe qTrace("%s at line %d res:%d", __func__, __LINE__, code_buff); int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); qTrace("%s at line %d res:%d", __func__, __LINE__, code_file); + if (pFileState->searchBuff != NULL) { + deleteHashSortRowBuff(pFileState, pKey); + } } int32_t resetRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) { int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen); int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); + if (pFileState->searchBuff != NULL) { + deleteHashSortRowBuff(pFileState, pKey); + } if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) { return TSDB_CODE_SUCCESS; } diff --git a/tests/script/tsim/stream/streamInterpDelete0.sim b/tests/script/tsim/stream/streamInterpDelete0.sim new file mode 100644 index 0000000000..21bac13e4a --- /dev/null +++ b/tests/script/tsim/stream/streamInterpDelete0.sim @@ -0,0 +1,507 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(prev); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791212001,1,1,1,1.0) (1648791214000,8,1,1,1.0) (1648791215000,10,1,1,1.0) (1648791215009,15,1,1,1.0) (1648791217001,4,1,1,1.0); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop0 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop0 +endi + +if $data11 != 8 then + print ======data11=$data11 + goto loop0 +endi + +if $data21 != 10 then + print ======data21=$data21 + goto loop0 +endi + +if $data31 != 15 then + print ======data31=$data31 + goto loop0 +endi + +if $data41 != 15 then + print ======data41=$data41 + goto loop0 +endi + +print 1 sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000; +sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000; + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop1 +endi + +if $data11 != 8 then + print ======data11=$data11 + goto loop1 +endi + +if $data21 != 8 then + print ======data21=$data21 + goto loop1 +endi + +if $data31 != 8 then + print ======data31=$data31 + goto loop1 +endi + +if $data41 != 8 then + print ======data41=$data41 + goto loop1 +endi + + +print 2 sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000; +sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000; + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 4 then + print ======rows=$rows + goto loop2 +endi + +# row 0 +if $data01 != 8 then + print ======data01=$data01 + goto loop2 +endi + +if $data11 != 8 then + print ======data11=$data11 + goto loop2 +endi + +if $data21 != 8 then + print ======data21=$data21 + goto loop2 +endi + +if $data31 != 8 then + print ======data31=$data31 + goto loop2 +endi + +print 3 sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000; +sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000 + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop3 +endi + +# row 0 +if $data01 != 8 then + print ======data01=$data01 + goto loop3 +endi + + +print step2 +print =============== create database +sql create database test2 vgroups 1; +sql use test2; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(next); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791212001,1,1,1,1.0) (1648791214000,8,1,1,1.0) (1648791215000,10,1,1,1.0) (1648791215009,15,1,1,1.0) (1648791217001,4,1,1,1.0); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop4: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop4 +endi + +# row 0 +if $data01 != 8 then + print ======data01=$data01 + goto loop4 +endi + +if $data11 != 8 then + print ======data11=$data11 + goto loop4 +endi + +if $data21 != 10 then + print ======data21=$data21 + goto loop4 +endi + +if $data31 != 4 then + print ======data31=$data31 + goto loop4 +endi + +if $data41 != 4 then + print ======data41=$data41 + goto loop4 +endi + +print 1 sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000; +sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000; + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop5: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop5 +endi + +# row 0 +if $data01 != 8 then + print ======data01=$data01 + goto loop5 +endi + +if $data11 != 8 then + print ======data11=$data11 + goto loop5 +endi + +if $data21 != 4 then + print ======data21=$data21 + goto loop5 +endi + +if $data31 != 4 then + print ======data31=$data31 + goto loop5 +endi + +if $data41 != 4 then + print ======data41=$data41 + goto loop5 +endi + + +print 2 sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000; +sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000; + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop6: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 4 then + print ======rows=$rows + goto loop6 +endi + +# row 0 +if $data01 != 8 then + print ======data01=$data01 + goto loop6 +endi + +if $data11 != 4 then + print ======data11=$data11 + goto loop6 +endi + +if $data21 != 4 then + print ======data21=$data21 + goto loop6 +endi + +if $data31 != 4 then + print ======data31=$data31 + goto loop6 +endi + +print 3 sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000; +sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000 + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop7: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop7 +endi + +# row 0 +if $data01 != 8 then + print ======data01=$data01 + goto loop7 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamInterpDelete1.sim b/tests/script/tsim/stream/streamInterpDelete1.sim new file mode 100644 index 0000000000..162da175e8 --- /dev/null +++ b/tests/script/tsim/stream/streamInterpDelete1.sim @@ -0,0 +1,508 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(NULL); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791212001,1,1,1,1.0) (1648791214000,8,1,1,1.0) (1648791215000,10,1,1,1.0) (1648791215009,15,1,1,1.0) (1648791217001,4,1,1,1.0); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop0 +endi + +# row 0 +if $data01 != NULL then + print ======data01=$data01 + goto loop0 +endi + +if $data11 != 8 then + print ======data11=$data11 + goto loop0 +endi + +if $data21 != 10 then + print ======data21=$data21 + goto loop0 +endi + +if $data31 != NULL then + print ======data31=$data31 + goto loop0 +endi + +if $data41 != NULL then + print ======data41=$data41 + goto loop0 +endi + +print 1 sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000; +sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000; + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != NULL then + print ======data01=$data01 + goto loop1 +endi + +if $data11 != 8 then + print ======data11=$data11 + goto loop1 +endi + +if $data21 != NULL then + print ======data21=$data21 + goto loop1 +endi + +if $data31 != NULL then + print ======data31=$data31 + goto loop1 +endi + +if $data41 != NULL then + print ======data41=$data41 + goto loop1 +endi + + +print 2 sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000; +sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000; + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 4 then + print ======rows=$rows + goto loop2 +endi + +# row 0 +if $data01 != 8 then + print ======data01=$data01 + goto loop2 +endi + +if $data11 != NULL then + print ======data11=$data11 + goto loop2 +endi + +if $data21 != NULL then + print ======data21=$data21 + goto loop2 +endi + +if $data31 != NULL then + print ======data31=$data31 + goto loop2 +endi + +print 3 sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000; +sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000 + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop3 +endi + +# row 0 +if $data01 != 8 then + print ======data01=$data01 + goto loop3 +endi + + +print step2 +print =============== create database +sql create database test2 vgroups 1; +sql use test2; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(value,100,200,300,400); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791212001,1,1,1,1.0) (1648791214000,8,1,1,1.0) (1648791215000,10,1,1,1.0) (1648791215009,15,1,1,1.0) (1648791217001,4,1,1,1.0); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop4: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop4 +endi + +# row 0 +if $data01 != 100 then + print ======data01=$data01 + goto loop4 +endi + +if $data11 != 8 then + print ======data11=$data11 + goto loop4 +endi + +if $data21 != 10 then + print ======data21=$data21 + goto loop4 +endi + +if $data31 != 100 then + print ======data31=$data31 + goto loop4 +endi + +if $data41 != 100 then + print ======data41=$data41 + goto loop4 +endi + +print 1 sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000; +sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000; + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop5: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop5 +endi + +# row 0 +if $data01 != 100 then + print ======data01=$data01 + goto loop5 +endi + +if $data11 != 8 then + print ======data11=$data11 + goto loop5 +endi + +if $data21 != 100 then + print ======data21=$data21 + goto loop5 +endi + +if $data31 != 100 then + print ======data31=$data31 + goto loop5 +endi + +if $data41 != 100 then + print ======data41=$data41 + goto loop5 +endi + + +print 2 sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000; +sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000; + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop6: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 4 then + print ======rows=$rows + goto loop6 +endi + +# row 0 +if $data01 != 8 then + print ======data01=$data01 + goto loop6 +endi + +if $data11 != 100 then + print ======data11=$data11 + goto loop6 +endi + +if $data21 != 100 then + print ======data21=$data21 + goto loop6 +endi + +if $data31 != 100 then + print ======data31=$data31 + goto loop6 +endi + +print 3 sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000; +sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000 + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop7: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop7 +endi + +# row 0 +if $data01 != 8 then + print ======data01=$data01 + goto loop7 +endi + + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamInterpDelete2.sim b/tests/script/tsim/stream/streamInterpDelete2.sim new file mode 100644 index 0000000000..be27dcda49 --- /dev/null +++ b/tests/script/tsim/stream/streamInterpDelete2.sim @@ -0,0 +1,258 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(linear); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791212001,1,1,1,1.0) (1648791214000,8,1,1,1.0) (1648791215000,10,1,1,1.0) (1648791215009,15,1,1,1.0) (1648791217001,4,1,1,1.0); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linera); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop0 +endi + +# row 0 +if $data01 != 4 then + print ======data01=$data01 + goto loop0 +endi + +if $data11 != 8 then + print ======data11=$data11 + goto loop0 +endi + +if $data21 != 10 then + print ======data21=$data21 + goto loop0 +endi + +if $data31 != 9 then + print ======data31=$data31 + goto loop0 +endi + +if $data41 != 4 then + print ======data41=$data41 + goto loop0 +endi + +print 1 sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000; +sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000; + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 4 then + print ======data01=$data01 + goto loop1 +endi + +if $data11 != 8 then + print ======data11=$data11 + goto loop1 +endi + +if $data21 != 6 then + print ======data21=$data21 + goto loop1 +endi + +if $data31 != 5 then + print ======data31=$data31 + goto loop1 +endi + +if $data41 != 4 then + print ======data41=$data41 + goto loop1 +endi + + +print 2 sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000; +sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000; + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 4 then + print ======rows=$rows + goto loop2 +endi + +# row 0 +if $data01 != 8 then + print ======data01=$data01 + goto loop2 +endi + +if $data11 != 6 then + print ======data11=$data11 + goto loop2 +endi + +if $data21 != 5 then + print ======data21=$data21 + goto loop2 +endi + +if $data31 != 4 then + print ======data31=$data31 + goto loop2 +endi + +print 3 sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000; +sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000 + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 + +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop3 +endi + +# row 0 +if $data01 != 8 then + print ======data01=$data01 + goto loop3 +endi + + +system sh/exec.sh -n dnode1 -s stop -x SIGINT