diff --git a/docs/zh/05-get-started/01-docker.md b/docs/zh/05-get-started/01-docker.md index e2be419517..35e7ad1933 100644 --- a/docs/zh/05-get-started/01-docker.md +++ b/docs/zh/05-get-started/01-docker.md @@ -62,7 +62,7 @@ taos> ## 体验查询 -使用上述 taosBenchmark 插入数据后,可以在 TDengine CLI 输入查询命令,体验查询速度。。 +使用上述 taosBenchmark 插入数据后,可以在 TDengine CLI 输入查询命令,体验查询速度。 查询超级表下记录总条数: diff --git a/docs/zh/28-releases/02-tools.md b/docs/zh/28-releases/02-tools.md index c8a99cb40f..9e8757cc4e 100644 --- a/docs/zh/28-releases/02-tools.md +++ b/docs/zh/28-releases/02-tools.md @@ -9,7 +9,3 @@ import Release from "/components/ReleaseV3"; ## 2.1.3 - -## 2.1.2 - - \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index c598b188b7..feb69bf74f 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "mndDb.h" +#include "mndCluster.h" #include "mndDnode.h" #include "mndOffset.h" #include "mndPrivilege.h" @@ -1714,18 +1715,18 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb, taosMemoryFree(buf); } -static void setInformationSchemaDbCfg(SDbObj *pDbObj) { +static void setInformationSchemaDbCfg(SMnode *pMnode, SDbObj *pDbObj) { tstrncpy(pDbObj->name, TSDB_INFORMATION_SCHEMA_DB, tListLen(pDbObj->name)); - pDbObj->createdTime = 0; + pDbObj->createdTime = mndGetClusterCreateTime(pMnode); pDbObj->cfg.numOfVgroups = 0; pDbObj->cfg.strict = 1; pDbObj->cfg.replications = 1; pDbObj->cfg.precision = TSDB_TIME_PRECISION_MILLI; } -static void setPerfSchemaDbCfg(SDbObj *pDbObj) { +static void setPerfSchemaDbCfg(SMnode *pMnode, SDbObj *pDbObj) { tstrncpy(pDbObj->name, TSDB_PERFORMANCE_SCHEMA_DB, tListLen(pDbObj->name)); - pDbObj->createdTime = 0; + pDbObj->createdTime = mndGetClusterCreateTime(pMnode); pDbObj->cfg.numOfVgroups = 0; pDbObj->cfg.strict = 1; pDbObj->cfg.replications = 1; @@ -1756,7 +1757,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc // Append the information_schema database into the result. if (!pShow->sysDbRsp) { SDbObj infoschemaDb = {0}; - setInformationSchemaDbCfg(&infoschemaDb); + setInformationSchemaDbCfg(pMnode, &infoschemaDb); size_t numOfTables = 0; getVisibleInfosTablesNum(sysinfo, &numOfTables); mndDumpDbInfoData(pMnode, pBlock, &infoschemaDb, pShow, numOfRows, numOfTables, true, 0, 1); @@ -1764,7 +1765,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc numOfRows += 1; SDbObj perfschemaDb = {0}; - setPerfSchemaDbCfg(&perfschemaDb); + setPerfSchemaDbCfg(pMnode, &perfschemaDb); numOfTables = 0; getPerfDbMeta(NULL, &numOfTables); mndDumpDbInfoData(pMnode, pBlock, &perfschemaDb, pShow, numOfRows, numOfTables, true, 0, 1); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 38b5bd24be..f6a35da683 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -650,6 +650,8 @@ typedef struct SSttBlockLoadInfo { SArray *aSttBlk; int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. int32_t currentLoadBlockIndex; + int32_t loadBlocks; + double elapsedTime; } SSttBlockLoadInfo; typedef struct SMergeTree { @@ -659,6 +661,7 @@ typedef struct SMergeTree { SLDataIter *pIter; bool destroyLoadInfo; SSttBlockLoadInfo *pLoadInfo; + const char *idStr; } SMergeTree; typedef struct { @@ -668,7 +671,7 @@ typedef struct { } SSkmInfo; int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo); + STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); @@ -676,6 +679,7 @@ void tMergeTreeClose(SMergeTree *pMTree); SSttBlockLoadInfo *tCreateLastBlockLoadInfo(); void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); +void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el); void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); // ========== inline functions ========== diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 05c95df798..8da783a5bd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -457,7 +457,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, - &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL); + &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL, NULL); bool hasVal = tMergeTreeNext(&state->mergeTree); if (!hasVal) { state->state = SFSLASTNEXTROW_FILESET; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index d660dfa99d..45fe29f0fa 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -67,6 +67,16 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) { pLoadInfo[i].blockIndex[1] = -1; taosArrayClear(pLoadInfo[i].aSttBlk); + + pLoadInfo[i].elapsedTime = 0; + pLoadInfo[i].loadBlocks = 0; + } +} + +void getLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo, int64_t* blocks, double* el) { + for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) { + *el += pLoadInfo[i].elapsedTime; + *blocks += pLoadInfo[i].loadBlocks; } } @@ -86,7 +96,7 @@ void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) { return NULL; } -static SBlockData* loadBlockIfMissing(SLDataIter *pIter) { +static SBlockData* loadLastBlock(SLDataIter *pIter, const char* idStr) { int32_t code = 0; SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo; @@ -100,8 +110,13 @@ static SBlockData* loadBlockIfMissing(SLDataIter *pIter) { pInfo->currentLoadBlockIndex ^= 1; if (pIter->pSttBlk != NULL) { // current block not loaded yet + int64_t st = taosGetTimestampUs(); code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, &pInfo->blockData[pInfo->currentLoadBlockIndex]); - tsdbDebug("read last block, index:%d, last file index:%d", pIter->iSttBlk, pIter->iStt); + double el = (taosGetTimestampUs() - st)/ 1000.0; + pInfo->elapsedTime += el; + pInfo->loadBlocks += 1; + + tsdbDebug("read last block, index:%d, last file index:%d, elapsed time:%.2f ms, %s", pIter->iSttBlk, pIter->iStt, el, idStr); if (code != TSDB_CODE_SUCCESS) { goto _exit; } @@ -245,9 +260,8 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); // find the start block - int32_t index = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward); - (*pIter)->iSttBlk = index; - if (index != -1) { + (*pIter)->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward); + if ((*pIter)->iSttBlk != -1) { (*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk); (*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1; } @@ -265,7 +279,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) { pIter->iSttBlk += step; int32_t index = -1; - size_t size = pIter->pBlockLoadInfo->aSttBlk->size;//taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk); + size_t size = pIter->pBlockLoadInfo->aSttBlk->size; for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) { SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i); if ((!pIter->backward) && p->minUid > pIter->uid) { @@ -310,13 +324,13 @@ void tLDataIterNextBlock(SLDataIter *pIter) { } } -static void findNextValidRow(SLDataIter *pIter) { +static void findNextValidRow(SLDataIter *pIter, const char* idStr) { int32_t step = pIter->backward ? -1 : 1; bool hasVal = false; int32_t i = pIter->iRow; - SBlockData *pBlockData = loadBlockIfMissing(pIter); + SBlockData *pBlockData = loadLastBlock(pIter, idStr); // mostly we only need to find the start position for a given table if ((((i == 0) && (!pIter->backward)) || (i == pBlockData->nRow - 1 && pIter->backward)) && pBlockData->aUid != NULL) { @@ -376,7 +390,7 @@ static void findNextValidRow(SLDataIter *pIter) { pIter->iRow = (hasVal) ? i : -1; } -bool tLDataIterNextRow(SLDataIter *pIter) { +bool tLDataIterNextRow(SLDataIter *pIter, const char* idStr) { int32_t code = 0; int32_t step = pIter->backward ? -1 : 1; @@ -386,11 +400,11 @@ bool tLDataIterNextRow(SLDataIter *pIter) { } int32_t iBlockL = pIter->iSttBlk; - SBlockData *pBlockData = loadBlockIfMissing(pIter); + SBlockData *pBlockData = loadLastBlock(pIter, idStr); pIter->iRow += step; while (1) { - findNextValidRow(pIter); + findNextValidRow(pIter, idStr); if (pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) { tLDataIterNextBlock(pIter); @@ -402,7 +416,7 @@ bool tLDataIterNextRow(SLDataIter *pIter) { } if (iBlockL != pIter->iSttBlk) { - pBlockData = loadBlockIfMissing(pIter); + pBlockData = loadLastBlock(pIter, idStr); pIter->iRow += step; } } @@ -445,7 +459,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { } int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo) { + STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, const char* idStr) { pMTree->backward = backward; pMTree->pIter = NULL; pMTree->pIterList = taosArrayInit(4, POINTER_BYTES); @@ -453,6 +467,8 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead return TSDB_CODE_OUT_OF_MEMORY; } + pMTree->idStr = idStr; + tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); int32_t code = TSDB_CODE_SUCCESS; @@ -475,7 +491,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead goto _end; } - bool hasVal = tLDataIterNextRow(pIter); + bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr); if (hasVal) { taosArrayPush(pMTree->pIterList, &pIter); tMergeTreeAddIter(pMTree, pIter); @@ -498,7 +514,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) { if (pMTree->pIter) { SLDataIter *pIter = pMTree->pIter; - bool hasVal = tLDataIterNextRow(pIter); + bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr); if (!hasVal) { pMTree->pIter = NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index dea17f92da..9956339093 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -70,6 +70,8 @@ typedef struct SIOCostSummary { double smaLoadTime; int64_t lastBlockLoad; double lastBlockLoadTime; + int64_t composedBlocks; + double buildComposedBlockTime; } SIOCostSummary; typedef struct SBlockLoadSuppInfo { @@ -365,6 +367,9 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) { return false; } + SIOCostSummary* pSum = &pReader->cost; + getLastBlockLoadInfo(pIter->pLastBlockReader->pInfo, &pSum->lastBlockLoad, &pReader->cost.lastBlockLoadTime); + pIter->pLastBlockReader->uid = 0; tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo); @@ -1434,11 +1439,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, tRowMerge(&merge, &fRow1); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); - // merge with block data if ts == key - if (mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex])) { - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - } - int32_t code = tRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1452,9 +1452,10 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, } else { // not merge block data tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); + ASSERT(mergeBlockData); // merge with block data if ts == key - if (mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex])) { + if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) { doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); } @@ -1942,7 +1943,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader, - pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo); + pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { return false; } @@ -1982,8 +1983,6 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc tRowMergerClear(&merge); return TSDB_CODE_SUCCESS; } - - return TSDB_CODE_SUCCESS; } static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, @@ -2076,13 +2075,16 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { blockDataUpdateTsWindow(pResBlock, 0); setComposedBlockFlag(pReader, true); - int64_t et = taosGetTimestampUs(); + double el = (taosGetTimestampUs() - st)/1000.0; + + pReader->cost.composedBlocks += 1; + pReader->cost.buildComposedBlockTime += el; if (pResBlock->info.rows > 0) { tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, elapsed time:%.2f ms %s", pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, - pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr); + pResBlock->info.rows, el, pReader->idStr); } return TSDB_CODE_SUCCESS; @@ -3364,24 +3366,27 @@ void tsdbReaderClose(STsdbReader* pReader) { tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap); taosMemoryFree(pReader->status.uidCheckInfo.tableUidList); + SIOCostSummary* pCost = &pReader->cost; SFilesetIter* pFilesetIter = &pReader->status.fileIter; if (pFilesetIter->pLastBlockReader != NULL) { - tMergeTreeClose(&pFilesetIter->pLastBlockReader->mergeTree); - pFilesetIter->pLastBlockReader->pInfo = destroyLastBlockLoadInfo(pFilesetIter->pLastBlockReader->pInfo); - taosMemoryFree(pFilesetIter->pLastBlockReader); + SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader; + tMergeTreeClose(&pLReader->mergeTree); + + getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime); + + pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo); + taosMemoryFree(pLReader); } - SIOCostSummary* pCost = &pReader->cost; - - tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 - " SMA-time:%.2f ms, fileBlocks:%" PRId64 - ", fileBlocks-time:%.2f ms, " - "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 - ", lastBlocks-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s", - pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, - pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, - pCost->lastBlockLoadTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr); + tsdbDebug( + "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 + " SMA-time:%.2f ms, fileBlocks:%" PRId64 ", fileBlocks-load-time:%.2f ms, " + "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64 + ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb %s", + pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks, + pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks, + pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr); taosMemoryFree(pReader->idStr); taosMemoryFree(pReader->pSchema); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 0c35ed5335..5eb6557dbd 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -932,6 +932,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { case STREAM_DELETE_DATA: { copyDataBlock(pInfo->pDelRes, pBlock); pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; + return pInfo->pDelRes; } break; default: return pBlock; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index f158b24b58..b3a85807ae 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1424,7 +1424,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey)); } getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); - } while (win.skey < tsEnds[i]); + } while (win.skey <= tsEnds[i]); } } @@ -3595,7 +3595,8 @@ SArray* getWinInfos(SStreamAggSupporter* pAggSup, uint64_t groupId) { // don't add new window SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex) { - SArray* pWinInfos = getWinInfos(pAggSup, groupId); + STimeWindow searchWin = {.skey = startTs, .ekey = endTs}; + SArray* pWinInfos = getWinInfos(pAggSup, groupId); pAggSup->pCurWins = pWinInfos; int32_t size = taosArrayGetSize(pWinInfos); @@ -3607,7 +3608,7 @@ SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY start SResultWindowInfo* pWin = NULL; if (index >= 0) { pWin = taosArrayGet(pWinInfos, index); - if (isInWindow(pWin, startTs, gap)) { + if (isInWindow(pWin, startTs, gap) || isInTimeWindow(&searchWin, pWin->win.skey, gap)) { *pIndex = index; return pWin; } @@ -3615,7 +3616,7 @@ SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY start if (index + 1 < size) { pWin = taosArrayGet(pWinInfos, index + 1); - if (isInWindow(pWin, startTs, gap)) { + if (isInWindow(pWin, startTs, gap) || isInTimeWindow(&searchWin, pWin->win.skey, gap)) { *pIndex = index + 1; return pWin; } else if (endTs != INT64_MIN && isInWindow(pWin, endTs, gap)) { @@ -3793,7 +3794,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->win, true); compactFunctions(pSup->pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); taosHashRemove(pStUpdated, &pWinInfo->pos, sizeof(SResultRowPosition)); - if (pWinInfo->isOutput) { + if (pWinInfo->isOutput && pStDeleted) { SWinKey res = {.ts = pWinInfo->win.skey, .groupId = groupId}; taosHashPut(pStDeleted, &res, sizeof(SWinKey), &res, sizeof(SWinKey)); pWinInfo->isOutput = false; @@ -3886,19 +3887,24 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* gpDatas = (uint64_t*)pGroupCol->pData; for (int32_t i = 0; i < pBlock->info.rows; i++) { - int32_t winIndex = 0; - while (1) { - SResultWindowInfo* pCurWin = getCurSessionWindow(pAggSup, startDatas[i], endDatas[i], gpDatas[i], gap, &winIndex); - if (!pCurWin) { - break; - } + int32_t winIndex = 0; + SResultWindowInfo* pCurWin = getCurSessionWindow(pAggSup, startDatas[i], endDatas[i], gpDatas[i], gap, &winIndex); + if (!pCurWin) { + continue; + } + + do { SResultWindowInfo delWin = *pCurWin; deleteWindow(pAggSup->pCurWins, winIndex, fp); if (result) { delWin.groupId = gpDatas[i]; taosArrayPush(result, &delWin); } - } + if (winIndex >= taosArrayGetSize(pAggSup->pCurWins)) { + break; + } + pCurWin = taosArrayGet(pAggSup->pCurWins, winIndex); + } while (pCurWin->win.skey <= endDatas[i]); } } @@ -3979,26 +3985,16 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It } static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWinArray, int32_t numOfOutput, - SOperatorInfo* pOperator, SHashObj* pStUpdated, bool needCreate) { + SOperatorInfo* pOperator, SHashObj* pStUpdated) { SExprSupp* pSup = &pOperator->exprSupp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - int32_t size = taosArrayGetSize(pWinArray); + int32_t size = taosArrayGetSize(pWinArray); ASSERT(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { SResultWindowInfo* pParentWin = taosArrayGet(pWinArray, i); - SResultRow* pCurResult = NULL; uint64_t groupId = pParentWin->groupId; - int32_t winIndex = 0; - if (needCreate) { - pParentWin = - getSessionTimeWindow(&pInfo->streamAggSup, pParentWin->win.skey, pParentWin->win.ekey, groupId, 0, &winIndex); - } - setWindowOutputBuf(pParentWin, &pCurResult, pSup->pCtx, groupId, numOfOutput, pSup->rowEntryInfoOffset, - &pInfo->streamAggSup, pTaskInfo); - int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); - int32_t num = 0; + int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); for (int32_t j = 0; j < numOfChildren; j++) { SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); SStreamSessionAggOperatorInfo* pChInfo = pChild->info; @@ -4011,31 +4007,36 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin for (int32_t k = index; k < chWinSize; k++) { SResultWindowInfo* pChWin = taosArrayGet(pChWins, k); if (pParentWin->win.skey <= pChWin->win.skey && pChWin->win.ekey <= pParentWin->win.ekey) { + int32_t winIndex = 0; + SResultWindowInfo* pNewParWin = + getSessionTimeWindow(&pInfo->streamAggSup, pChWin->win.skey, pChWin->win.ekey, groupId, 0, &winIndex); + SResultRow* pPareResult = NULL; + setWindowOutputBuf(pNewParWin, &pPareResult, pSup->pCtx, groupId, numOfOutput, pSup->rowEntryInfoOffset, + &pInfo->streamAggSup, pTaskInfo); SResultRow* pChResult = NULL; setWindowOutputBuf(pChWin, &pChResult, pChild->exprSupp.pCtx, groupId, numOfOutput, pChild->exprSupp.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo); - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pChWin->win, true); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pNewParWin->win, true); compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); + + int32_t winNum = getNumCompactWindow(pInfo->streamAggSup.pCurWins, winIndex, pInfo->gap); + if (winNum > 0) { + compactTimeWindow(pInfo, winIndex, winNum, groupId, numOfOutput, pStUpdated, NULL, pOperator); + } + SFilePage* bufPage = getBufPage(pChInfo->streamAggSup.pResultBuf, pChWin->pos.pageId); releaseBufPage(pChInfo->streamAggSup.pResultBuf, bufPage); - num++; - continue; + + bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pNewParWin->pos.pageId); + setBufPageDirty(bufPage, true); + releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage); + SWinKey value = {.ts = pNewParWin->win.skey, .groupId = groupId}; + taosHashPut(pStUpdated, &pNewParWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey)); } else if (!pChWin->isClosed) { break; } } } - if (num == 0 && needCreate) { - deleteWindow(pInfo->streamAggSup.pCurWins, winIndex, NULL); - } - if (pStUpdated && num > 0) { - SWinKey value = {.ts = pParentWin->win.skey, .groupId = groupId}; - taosHashPut(pStUpdated, &pParentWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey)); - } - SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pParentWin->pos.pageId); - ASSERT(size > 0); - setBufPageDirty(bufPage, true); - releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage); } } @@ -4196,7 +4197,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, pChildOp->exprSupp.numOfExprs, 0, NULL); - rebuildTimeWindow(pInfo, pWins, pOperator->exprSupp.numOfExprs, pOperator, NULL, false); + rebuildTimeWindow(pInfo, pWins, pOperator->exprSupp.numOfExprs, pOperator, pStUpdated); } taosArrayDestroy(pWins); continue; @@ -4210,7 +4211,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; // gap must be 0 doDeleteTimeWindows(&pChildInfo->streamAggSup, pBlock, 0, NULL, NULL); - rebuildTimeWindow(pInfo, pWins, pOperator->exprSupp.numOfExprs, pOperator, pStUpdated, true); + rebuildTimeWindow(pInfo, pWins, pOperator->exprSupp.numOfExprs, pOperator, pStUpdated); } copyDeleteWindowInfo(pWins, pInfo->pStDeleted); removeSessionResults(pStUpdated, pWins); @@ -4747,7 +4748,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_CLEAR) { doClearStateWindows(&pInfo->streamAggSup, pBlock, pSeUpdated, pInfo->pSeDeleted); continue; - } else if (pBlock->info.type == STREAM_DELETE_DATA) { + } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, pWins, destroyStateWinInfo); copyDeleteWindowInfo(pWins, pInfo->pSeDeleted); @@ -5674,7 +5675,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { NULL); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); continue; - } else if (pBlock->info.type == STREAM_DELETE_DATA) { + } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval, pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 46bae734ea..57cf477bf8 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -248,6 +248,12 @@ ./test.sh -f tsim/stream/windowClose.sim ./test.sh -f tsim/stream/ignoreExpiredData.sim ./test.sh -f tsim/stream/sliding.sim +#./test.sh -f tsim/stream/partitionbyColumnInterval.sim +#./test.sh -f tsim/stream/partitionbyColumnSession.sim +#./test.sh -f tsim/stream/partitionbyColumnState.sim +#./test.sh -f tsim/stream/deleteInterval.sim +#./test.sh -f tsim/stream/deleteSession.sim +#./test.sh -f tsim/stream/deleteState.sim # ---- transaction ---- ./test.sh -f tsim/trans/lossdata1.sim diff --git a/tests/script/tsim/column/table.sim b/tests/script/tsim/column/table.sim index 4f1d32c373..03c4799681 100644 --- a/tests/script/tsim/column/table.sim +++ b/tests/script/tsim/column/table.sim @@ -159,6 +159,7 @@ if $data01 != 10 then return -1 endi if $data02 != 4.500000000 then + print expect 4.500000000, actual: $data02 return -1 endi if $data03 != 4.500000000 then diff --git a/tests/script/tsim/stream/deleteSession.sim b/tests/script/tsim/stream/deleteSession.sim new file mode 100644 index 0000000000..541609633b --- /dev/null +++ b/tests/script/tsim/stream/deleteSession.sim @@ -0,0 +1,532 @@ +$loop_all = 0 +looptest: + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 200 +sql connect + +sql drop stream if exists streams0; +sql drop stream if exists streams1; +sql drop stream if exists streams2; +sql drop stream if exists streams3; +sql drop stream if exists streams4; +sql drop database if exists test; +sql create database test vgroups 1; +sql use test; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3 from t1 session(ts, 5s); + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sleep 200 +sql delete from t1 where ts = 1648791213000; + +$loop_count = 0 + +loop0: +sleep 200 +sql select * from streamt order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 0 then + print =====rows=$rows + goto loop0 +endi + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); + +$loop_count = 0 + +loop1: +sleep 200 +sql select * from streamt order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != NULL then + print =====data02=$data02 + goto loop1 +endi + +sql insert into t1 values(1648791213000,1,1,1,1.0); +sql insert into t1 values(1648791213001,2,2,2,2.0); +sql insert into t1 values(1648791213002,3,3,3,3.0); +sql insert into t1 values(1648791213003,4,4,4,4.0); + +sleep 200 +sql delete from t1 where ts >= 1648791213001 and ts <= 1648791213002; + +$loop_count = 0 + +loop3: +sleep 200 +sql select * from streamt order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop3 +endi + +if $data02 != 4 then + print =====data02=$data02 + goto loop3 +endi + +sql insert into t1 values(1648791223000,1,2,3,1.0); +sql insert into t1 values(1648791223001,1,2,3,1.0); +sql insert into t1 values(1648791223002,3,2,3,1.0); +sql insert into t1 values(1648791223003,3,2,3,1.0); + +$loop_count = 0 + +loop4: +sleep 200 +sql select * from streamt order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop4 +endi + +sleep 200 + +sql delete from t1 where ts >= 1648791223000 and ts <= 1648791223003; + +$loop_count = 0 + +loop5: +sleep 200 +sql select * from streamt order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop5 +endi + +if $data02 != 4 then + print =====data02=$data02 + goto loop5 +endi + +sql insert into t1 values(1648791213000,1,1,1,1.0); +sql insert into t1 values(1648791213005,2,2,2,2.0); +sql insert into t1 values(1648791213006,3,3,3,3.0); +sql insert into t1 values(1648791213007,4,4,4,4.0); + +sql insert into t1 values(1648791223000,1,1,1,1.0); +sql insert into t1 values(1648791223001,2,2,2,2.0); +sql insert into t1 values(1648791223002,3,3,3,3.0); +sql insert into t1 values(1648791223003,4,4,4,4.0); + +sql insert into t1 values(1648791233000,1,1,1,1.0); +sql insert into t1 values(1648791233001,2,2,2,2.0); +sql insert into t1 values(1648791233008,3,3,3,3.0); +sql insert into t1 values(1648791233009,4,4,4,4.0); + +sql delete from t1 where ts >= 1648791213001 and ts <= 1648791233005; + +$loop_count = 0 + +loop6: +sleep 200 +sql select * from streamt order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop6 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop6 +endi + +if $data11 != 2 then + print =====data11=$data11 + goto loop6 +endi + +if $data12 != 4 then + print =====data12=$data12 + goto loop6 +endi + +sql drop stream if exists streams2; +sql drop database if exists test2; +sql create database test2 vgroups 4; +sql use test2; +sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams2 trigger at_once into test.streamt2 as select _wstart c1, count(*) c2, max(a) c3 from st session(ts,5s); + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL); + +$loop_count = 0 + +loop7: +sleep 200 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop7 +endi + +sleep 200 + +sql delete from t1 where ts = 1648791213000; + +$loop_count = 0 + +loop8: +sleep 200 + +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != NULL then + print =====data02=$data02 + goto loop8 +endi + +sql insert into t1 values(1648791223000,1,2,3,1.0); +sql insert into t1 values(1648791223001,1,2,3,1.0); +sql insert into t1 values(1648791223002,3,2,3,1.0); +sql insert into t1 values(1648791223003,3,2,3,1.0); +sql insert into t2 values(1648791223000,1,2,3,1.0); +sql insert into t2 values(1648791223001,1,2,3,1.0); +sql insert into t2 values(1648791223002,3,2,3,1.0); +sql insert into t2 values(1648791223003,3,2,3,1.0); + +sleep 200 + +sql delete from t2 where ts >= 1648791223000 and ts <= 1648791223001; + +$loop_count = 0 + +loop11: +sleep 200 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop11 +endi + +if $data02 != NULL then + print =====data02=$data02 + goto loop11 +endi + +if $data11 != 6 then + print =====data11=$data11 + goto loop11 +endi + +if $data12 != 3 then + print =====data12=$data12 + goto loop11 +endi + +sleep 200 + +sql delete from st where ts >= 1648791223000 and ts <= 1648791223003; + +$loop_count = 0 + +loop12: +sleep 200 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop12 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop12 +endi + +if $data02 != NULL then + print =====data02=$data02 + goto loop12 +endi + +sql insert into t1 values(1648791213004,3,2,3,1.0); +sql insert into t1 values(1648791213005,3,2,3,1.0); +sql insert into t1 values(1648791213006,3,2,3,1.0); +sql insert into t1 values(1648791223004,1,2,3,1.0); +sql insert into t2 values(1648791213004,3,2,3,1.0); +sql insert into t2 values(1648791213005,3,2,3,1.0); +sql insert into t2 values(1648791213006,3,2,3,1.0); +sql insert into t2 values(1648791223004,1,2,3,1.0); + +sleep 200 + +sql delete from t2 where ts >= 1648791213004 and ts <= 1648791213006; + +$loop_count = 0 + +loop13: +sleep 200 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop13 +endi + +if $data01 != 4 then + print =====data01=$data01 + goto loop13 +endi + +if $data02 != 3 then + print =====data02=$data02 + goto loop13 +endi + +if $data11 != 2 then + print =====data11=$data11 + goto loop13 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop13 +endi + +sql insert into t1 values(1648791223005,1,2,3,1.0); +sql insert into t1 values(1648791223006,1,2,3,1.0); +sql insert into t2 values(1648791223005,1,2,3,1.0); +sql insert into t2 values(1648791223006,1,2,3,1.0); + +sql insert into t1 values(1648791233005,4,2,3,1.0); +sql insert into t1 values(1648791233006,2,2,3,1.0); +sql insert into t2 values(1648791233005,5,2,3,1.0); +sql insert into t2 values(1648791233006,3,2,3,1.0); + +sleep 200 + +sql delete from st where ts >= 1648791213001 and ts <= 1648791233005; + +$loop_count = 0 + +loop14: +sleep 200 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop14 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop14 +endi + +if $data02 != NULL then + print =====data02=$data02 + goto loop14 +endi + +if $data11 != 2 then + print =====data11=$data11 + goto loop14 +endi + +if $data12 != 3 then + print =====data12=$data12 + goto loop14 +endi + +sql drop stream if exists streams1; +sql drop stream if exists streams2; +sql drop stream if exists streams3; +sql drop database if exists test3; +sql drop database if exists test; +sql create database test3 vgroups 4; +sql create database test vgroups 1; +sql use test3; +sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams3 trigger at_once into test.streamt3 as select _wstart c1, count(*) c2, max(a) c3 from st session(ts,5s); + +sql insert into t1 values(1648791210000,1,1,1,NULL); +sql insert into t1 values(1648791210001,2,2,2,NULL); +sql insert into t2 values(1648791213001,3,3,3,NULL); +sql insert into t2 values(1648791213003,4,4,4,NULL); +sql insert into t1 values(1648791216000,5,5,5,NULL); +sql insert into t1 values(1648791216002,6,6,6,NULL); +sql insert into t1 values(1648791216004,7,7,7,NULL); +sql insert into t2 values(1648791218001,8,8,8,NULL); +sql insert into t2 values(1648791218003,9,9,9,NULL); +sql insert into t1 values(1648791222000,10,10,10,NULL); +sql insert into t1 values(1648791222003,11,11,11,NULL); +sql insert into t1 values(1648791222005,12,12,12,NULL); + +sql insert into t1 values(1648791232005,13,13,13,NULL); +sql insert into t2 values(1648791242005,14,14,14,NULL); + +$loop_count = 0 + +loop19: +sleep 200 +sql select * from test.streamt3 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 3 then + print =====rows=$rows + goto loop19 +endi + +sql delete from t2 where ts >= 1648791213001 and ts <= 1648791218003; + +$loop_count = 0 + +loop20: +sleep 200 +sql select * from test.streamt3 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 5 then + print =====rows=$rows + goto loop20 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop20 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop20 +endi + +if $data11 != 3 then + print =====data11=$data11 + goto loop20 +endi + +if $data12 != 7 then + print =====data12=$data12 + goto loop20 +endi + +if $data21 != 3 then + print =====data21=$data21 + goto loop20 +endi + +if $data22 != 12 then + print =====data22=$data22 + goto loop20 +endi + +if $data31 != 1 then + print =====data31=$data31 + goto loop20 +endi + +if $data32 != 13 then + print =====data32=$data32 + goto loop20 +endi + +if $data41 != 1 then + print =====data41=$data41 + goto loop20 +endi + +if $data42 != 14 then + print =====data42=$data42 + goto loop20 +endi + +$loop_all = $loop_all + 1 +print ============loop_all=$loop_all + +system sh/stop_dnodes.sh + +#goto looptest \ No newline at end of file diff --git a/tests/script/tsim/stream/deleteState.sim b/tests/script/tsim/stream/deleteState.sim new file mode 100644 index 0000000000..ecd9f55340 --- /dev/null +++ b/tests/script/tsim/stream/deleteState.sim @@ -0,0 +1,198 @@ +$loop_all = 0 +looptest: + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 200 +sql connect + +sql drop stream if exists streams0; +sql drop stream if exists streams1; +sql drop stream if exists streams2; +sql drop stream if exists streams3; +sql drop stream if exists streams4; +sql drop database if exists test; +sql create database test vgroups 1; +sql use test; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(b) c3 from t1 state_window(a); + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sleep 200 +sql delete from t1 where ts = 1648791213000; + +$loop_count = 0 + +loop0: +sleep 200 +sql select * from streamt order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 0 then + print =====rows=$rows + goto loop0 +endi + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); + +$loop_count = 0 + +loop1: +sleep 200 +sql select * from streamt order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != NULL then + print =====data02=$data02 + goto loop1 +endi + +sql insert into t1 values(1648791213000,1,1,1,1.0); +sql insert into t1 values(1648791213001,1,2,2,2.0); +sql insert into t1 values(1648791213002,1,3,3,3.0); +sql insert into t1 values(1648791213003,1,4,4,4.0); + +sleep 200 +sql delete from t1 where ts >= 1648791213001 and ts <= 1648791213002; + +$loop_count = 0 + +loop3: +sleep 200 +sql select * from streamt order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop3 +endi + +if $data02 != 4 then + print =====data02=$data02 + goto loop3 +endi + +sql insert into t1 values(1648791223000,2,2,3,1.0); +sql insert into t1 values(1648791223001,2,2,3,1.0); +sql insert into t1 values(1648791223002,2,2,3,1.0); +sql insert into t1 values(1648791223003,2,2,3,1.0); + +$loop_count = 0 + +loop4: +sleep 200 +sql select * from streamt order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop4 +endi + +sleep 200 + +sql delete from t1 where ts >= 1648791223000 and ts <= 1648791223003; + +$loop_count = 0 + +loop5: +sleep 200 +sql select * from streamt order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop5 +endi + +if $data02 != 4 then + print =====data02=$data02 + goto loop5 +endi + +sql insert into t1 values(1648791213000,1,1,1,1.0); +sql insert into t1 values(1648791213005,1,2,2,2.0); +sql insert into t1 values(1648791213006,1,3,3,3.0); +sql insert into t1 values(1648791213007,1,4,4,4.0); + +sql insert into t1 values(1648791223000,2,1,1,1.0); +sql insert into t1 values(1648791223001,2,2,2,2.0); +sql insert into t1 values(1648791223002,2,3,3,3.0); +sql insert into t1 values(1648791223003,2,4,4,4.0); + +sql insert into t1 values(1648791233000,3,1,1,1.0); +sql insert into t1 values(1648791233001,3,2,2,2.0); +sql insert into t1 values(1648791233008,3,3,3,3.0); +sql insert into t1 values(1648791233009,3,4,4,4.0); + +sql delete from t1 where ts >= 1648791213001 and ts <= 1648791233005; + +$loop_count = 0 + +loop6: +sleep 200 +sql select * from streamt order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop6 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop6 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop6 +endi + +if $data11 != 2 then + print =====data11=$data11 + goto loop6 +endi + +if $data12 != 4 then + print =====data12=$data12 + goto loop6 +endi + + +$loop_all = $loop_all + 1 +print ============loop_all=$loop_all + +system sh/stop_dnodes.sh + +#goto looptest \ No newline at end of file diff --git a/tests/script/tsim/stream/partitionbyColumn0.sim b/tests/script/tsim/stream/partitionbyColumnInterval.sim similarity index 100% rename from tests/script/tsim/stream/partitionbyColumn0.sim rename to tests/script/tsim/stream/partitionbyColumnInterval.sim diff --git a/tests/script/tsim/stream/partitionbyColumn1.sim b/tests/script/tsim/stream/partitionbyColumnSession.sim similarity index 100% rename from tests/script/tsim/stream/partitionbyColumn1.sim rename to tests/script/tsim/stream/partitionbyColumnSession.sim diff --git a/tests/script/tsim/stream/partitionbyColumn2.sim b/tests/script/tsim/stream/partitionbyColumnState.sim similarity index 100% rename from tests/script/tsim/stream/partitionbyColumn2.sim rename to tests/script/tsim/stream/partitionbyColumnState.sim