From 855f1842c2760e038c8f3cda43b1ef1455e7a12c Mon Sep 17 00:00:00 2001 From: tangfangzhi Date: Thu, 16 Jun 2022 15:28:52 +0800 Subject: [PATCH 1/5] ci: allow to run specified cases in windows full test --- tests/system-test/test-all.bat | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/system-test/test-all.bat b/tests/system-test/test-all.bat index 0929b1fc6e..275cbeebbb 100644 --- a/tests/system-test/test-all.bat +++ b/tests/system-test/test-all.bat @@ -5,7 +5,12 @@ set /a a=0 if %1 == full ( echo Windows Taosd Full Test set /a exitNum=0 - for /F "usebackq tokens=*" %%i in (fulltest.bat) do ( + del /Q /F failed.txt + set caseFile="fulltest.bat" + if not "%2" == "" ( + set caseFile="%2" + ) + for /F "usebackq tokens=*" %%i in (!caseFile!) do ( for /f "tokens=1* delims= " %%a in ("%%i") do if not "%%a" == "@REM" ( set /a a+=1 echo !a! Processing %%i @@ -13,7 +18,7 @@ if %1 == full ( set time1=!_timeTemp! echo Start at !time! call %%i ARG1 > result_!a!.txt 2>error_!a!.txt - if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && set /a exitNum=8 ) else ( call :colorEcho 0a "Success" &echo. ) + if errorlevel 1 ( call :colorEcho 0c "failed" &echo. && set /a exitNum=8 && echo %%i >>failed.txt ) else ( call :colorEcho 0a "Success" &echo. ) ) ) exit !exitNum! @@ -77,4 +82,4 @@ for %%a in (%tt%) do ( set /a index=index+1 ) set /a _timeTemp=(%hh%*60+%mm%)*60+%ss% -goto :eof \ No newline at end of file +goto :eof From dbef3dfc55d19e1d3e182ef61f23629fe7261eff Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 16 Jun 2022 15:34:33 +0800 Subject: [PATCH 2/5] fix: not submit for rsma without rollup --- source/common/src/tdatablock.c | 42 ++++++++++++++++---------- source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/sma/smaRollup.c | 14 +++++++-- 3 files changed, 38 insertions(+), 19 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5a2aaed74e..3ec1b98b16 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1583,6 +1583,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks int32_t rowSize = pDataBlock->info.rowSize; int64_t groupId = pDataBlock->info.groupId; + if (colNum <= 1) { + // invalid if only with TS col + continue; + } + if (rb.nCols != colNum) { tdSRowSetTpInfo(&rb, colNum, pTSchema->flen); } @@ -1679,23 +1684,28 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks msgLen += pSubmitBlk->dataLen; } - (*pReq)->length = msgLen; + if (numOfBlks > 0) { + (*pReq)->length = msgLen; - (*pReq)->header.vgId = htonl(vgId); - (*pReq)->header.contLen = htonl(msgLen); - (*pReq)->length = (*pReq)->header.contLen; - (*pReq)->numOfBlocks = htonl(numOfBlks); - SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1); - while (numOfBlks--) { - int32_t dataLen = blk->dataLen; - blk->uid = htobe64(blk->uid); - blk->suid = htobe64(blk->suid); - blk->padding = htonl(blk->padding); - blk->sversion = htonl(blk->sversion); - blk->dataLen = htonl(blk->dataLen); - blk->schemaLen = htonl(blk->schemaLen); - blk->numOfRows = htons(blk->numOfRows); - blk = (SSubmitBlk*)(blk->data + dataLen); + (*pReq)->header.vgId = htonl(vgId); + (*pReq)->header.contLen = htonl(msgLen); + (*pReq)->length = (*pReq)->header.contLen; + (*pReq)->numOfBlocks = htonl(numOfBlks); + SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1); + while (numOfBlks--) { + int32_t dataLen = blk->dataLen; + blk->uid = htobe64(blk->uid); + blk->suid = htobe64(blk->suid); + blk->padding = htonl(blk->padding); + blk->sversion = htonl(blk->sversion); + blk->dataLen = htonl(blk->dataLen); + blk->schemaLen = htonl(blk->schemaLen); + blk->numOfRows = htons(blk->numOfRows); + blk = (SSubmitBlk*)(blk->data + dataLen); + } + } else { + // no valid rows + taosMemoryFreeClear(*pReq); } return TSDB_CODE_SUCCESS; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2fd0815181..553c6a40ab 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -260,6 +260,7 @@ struct SSma { #define SMA_CFG(s) (&(s)->pVnode->config) #define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg) +#define SMA_RETENTION(s) ((SRetention *)&(s)->pVnode->config.tsdbCfg.retentions) #define SMA_LOCKED(s) ((s)->locked) #define SMA_META(s) ((s)->pVnode->pMeta) #define SMA_VID(s) TD_VID((s)->pVnode) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index b2dcce8f4c..0c372dfa70 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -400,22 +400,24 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3 } if (taosArrayGetSize(pResult) > 0) { -#if 1 +#if 0 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, level); blockDebugShowData(pResult, flag); #endif STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); SSubmitReq *pReq = NULL; - if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) != 0) { + if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) { taosArrayDestroy(pResult); return TSDB_CODE_FAILED; } - if (tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) != 0) { + + if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) { taosArrayDestroy(pResult); taosMemoryFreeClear(pReq); return TSDB_CODE_FAILED; } + taosMemoryFreeClear(pReq); } else { smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno)); @@ -469,6 +471,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { return TSDB_CODE_SUCCESS; } + SRetention *pRetention = SMA_RETENTION(pSma); + if (!RETENTION_VALID(pRetention + 1)) { + // return directly if retention level 1 is invalid + return TSDB_CODE_SUCCESS; + } + if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { STbUidStore uidStore = {0}; tdFetchSubmitReqSuids(pMsg, &uidStore); From 528d7d7248cbafb734aec558368cacbcaa832551 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Thu, 16 Jun 2022 15:46:51 +0800 Subject: [PATCH 3/5] os: add dll check --- include/os/osSysinfo.h | 1 + source/client/src/clientHb.c | 4 +++- source/libs/catalog/src/ctgCache.c | 4 +++- source/os/src/osSysinfo.c | 16 ++++++++++++++++ source/util/src/tcache.c | 6 +++++- 5 files changed, 28 insertions(+), 3 deletions(-) diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index c009bcf350..4ec2e2884e 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -68,6 +68,7 @@ typedef struct { } SysNameInfo; SysNameInfo taosGetSysNameInfo(); +bool taosCheckCurrentInDll(); #ifdef __cplusplus } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 5db27ae438..c450e9351f 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -588,7 +588,9 @@ void hbThreadFuncUnexpectedStopped(void) { static void *hbThreadFunc(void *param) { setThreadName("hb"); #ifdef WINDOWS - atexit(hbThreadFuncUnexpectedStopped); + if (taosCheckCurrentInDll()) { + atexit(hbThreadFuncUnexpectedStopped); + } #endif while (1) { int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 277516686b..0948c01270 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -1938,7 +1938,9 @@ void ctgCleanupCacheQueue(void) { void* ctgUpdateThreadFunc(void* param) { setThreadName("catalog"); #ifdef WINDOWS - atexit(ctgUpdateThreadUnexpectedStopped); + if (taosCheckCurrentInDll()) { + atexit(ctgUpdateThreadUnexpectedStopped); + } #endif qInfo("catalog update thread started"); diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index ace870853f..4981f7dc26 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -945,3 +945,19 @@ SysNameInfo taosGetSysNameInfo() { return info; #endif } + + +bool taosCheckCurrentInDll() { +#ifdef WINDOWS + MEMORY_BASIC_INFORMATION mbi; + char path[PATH_MAX] = {0}; + GetModuleFileName(((VirtualQuery(taosCheckCurrentInDll,&mbi,sizeof(mbi)) != 0) ? (HMODULE)mbi.AllocationBase : NULL), path, PATH_MAX); + int strLastIndex = strlen(path); + if ((path[strLastIndex-3] == 'd' || path[strLastIndex-3] == 'D') && (path[strLastIndex-2] == 'l' || path[strLastIndex-2] == 'L') && (path[strLastIndex-1] == 'l' || path[strLastIndex-1] == 'L')) { + return true; + } + return false; +#else + return false; +#endif +} \ No newline at end of file diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 10a5475555..1a716752a9 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -829,7 +829,11 @@ void *taosCacheTimedRefresh(void *handle) { const int32_t SLEEP_DURATION = 500; // 500 ms int64_t count = 0; - atexit(taosCacheRefreshWorkerUnexpectedStopped); +#ifdef WINDOWS + if (taosCheckCurrentInDll()) { + atexit(taosCacheRefreshWorkerUnexpectedStopped); + } +#endif while (1) { taosMsleep(SLEEP_DURATION); From bd518f3f71557f86b9d5b5273603ca6d14e066a3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Jun 2022 17:39:33 +0800 Subject: [PATCH 4/5] fix(query): remove group info during sort. --- source/libs/executor/inc/executorimpl.h | 8 --- source/libs/executor/inc/tsort.h | 6 --- source/libs/executor/src/executorimpl.c | 31 +++-------- source/libs/executor/src/scanoperator.c | 14 +---- source/libs/executor/src/sortoperator.c | 70 +++++-------------------- source/libs/executor/src/tsort.c | 57 ++++++-------------- 6 files changed, 37 insertions(+), 149 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b435446513..034e2893df 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -696,10 +696,6 @@ typedef struct SSortedMergeOperatorInfo { int32_t numOfResPerPage; char** groupVal; SArray *groupInfo; - - bool hasGroupId; - uint64_t groupId; - STupleHandle* prefetchedTuple; } SSortedMergeOperatorInfo; typedef struct SSortOperatorInfo { @@ -712,10 +708,6 @@ typedef struct SSortOperatorInfo { int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. - - STupleHandle *prefetchedTuple; - bool hasGroupId; - uint64_t groupId; } SSortOperatorInfo; typedef struct STagFilterOperatorInfo { diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index e731c55a7d..86ee841cc2 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -130,12 +130,6 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colId); */ void* tsortGetValue(STupleHandle* pVHandle, int32_t colId); -/** - * - * @param pVHandle - * @return - */ -uint64_t tsortGetGroupId(STupleHandle* pVHandle); /** * * @param pSortHandle diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index bcd7b7f86f..daee71b666 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3030,31 +3030,12 @@ SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo blockDataEnsureCapacity(p, capacity); while (1) { - STupleHandle* pTupleHandle = NULL; - if (pInfo->prefetchedTuple == NULL) { - pTupleHandle = tsortNextTuple(pHandle); - } else { - pTupleHandle = pInfo->prefetchedTuple; - pInfo->groupId = tsortGetGroupId(pTupleHandle); - pInfo->prefetchedTuple = NULL; - } - + STupleHandle* pTupleHandle = tsortNextTuple(pHandle); if (pTupleHandle == NULL) { break; } - uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); - if (!pInfo->hasGroupId) { - pInfo->groupId = tupleGroupId; - pInfo->hasGroupId = true; - appendOneRowToDataBlock(p, pTupleHandle); - } else if (pInfo->groupId == tupleGroupId) { - appendOneRowToDataBlock(p, pTupleHandle); - } else { - pInfo->prefetchedTuple = pTupleHandle; - break; - } - + appendOneRowToDataBlock(p, pTupleHandle); if (p->info.rows >= capacity) { break; } @@ -3073,7 +3054,6 @@ SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo pDataBlock->info.rows = p->info.rows; pDataBlock->info.capacity = p->info.rows; - pDataBlock->info.groupId = pInfo->groupId; } blockDataDestroy(p); @@ -3339,7 +3319,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - size_t rows = blockDataGetNumOfRows(pInfo->pRes); // pInfo->pRes : NULL; + size_t rows = blockDataGetNumOfRows(pInfo->pRes); pOperator->resultInfo.totalRows += rows; return (rows == 0) ? NULL : pInfo->pRes; @@ -4920,7 +4900,10 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { } SArray* extractPartitionColInfo(SNodeList* pNodeList) { - if(!pNodeList) return NULL; + if(!pNodeList) { + return NULL; + } + size_t numOfCols = LIST_LENGTH(pNodeList); SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); if (pList == NULL) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1ffb1529d7..ddebba1c9e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2175,25 +2175,13 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa pTupleHandle = tsortNextTuple(pHandle); } else { pTupleHandle = pInfo->prefetchedTuple; - pInfo->groupId = tsortGetGroupId(pTupleHandle); - pInfo->prefetchedTuple = NULL; } if (pTupleHandle == NULL) { break; } - uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); - if (!pInfo->hasGroupId) { - pInfo->groupId = tupleGroupId; - pInfo->hasGroupId = true; - appendOneRowToDataBlock(p, pTupleHandle); - } else if (pInfo->groupId == tupleGroupId) { - appendOneRowToDataBlock(p, pTupleHandle); - } else { - pInfo->prefetchedTuple = pTupleHandle; - break; - } + appendOneRowToDataBlock(p, pTupleHandle); if (p->info.rows >= capacity) { break; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 2f2080d5fe..81899b68cd 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -40,15 +40,13 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR initResultSizeInfo(pOperator, 1024); - pInfo->pSortInfo = pSortInfo; - pInfo->pColMatchInfo = pColMatchColInfo; - pInfo->hasGroupId = false; - pInfo->prefetchedTuple = NULL; - pOperator->name = "SortOperator"; + pInfo->pSortInfo = pSortInfo; + pInfo->pColMatchInfo = pColMatchColInfo; + pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; // lazy evaluation for the following parameter since the input datablock is not known till now. // pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + @@ -97,31 +95,12 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i blockDataEnsureCapacity(p, capacity); while (1) { - STupleHandle* pTupleHandle = NULL; - if (pInfo->prefetchedTuple == NULL) { - pTupleHandle = tsortNextTuple(pHandle); - } else { - pTupleHandle = pInfo->prefetchedTuple; - pInfo->groupId = tsortGetGroupId(pTupleHandle); - pInfo->prefetchedTuple = NULL; - } - + STupleHandle* pTupleHandle = tsortNextTuple(pHandle); if (pTupleHandle == NULL) { break; } - uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); - if (!pInfo->hasGroupId) { - pInfo->groupId = tupleGroupId; - pInfo->hasGroupId = true; - appendOneRowToDataBlock(p, pTupleHandle); - } else if (pInfo->groupId == tupleGroupId) { - appendOneRowToDataBlock(p, pTupleHandle); - } else { - pInfo->prefetchedTuple = pTupleHandle; - break; - } - + appendOneRowToDataBlock(p, pTupleHandle); if (p->info.rows >= capacity) { break; } @@ -140,7 +119,6 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i pDataBlock->info.rows = p->info.rows; pDataBlock->info.capacity = p->info.rows; - pDataBlock->info.groupId = pInfo->groupId; } blockDataDestroy(p); @@ -255,10 +233,7 @@ typedef struct SMultiwaySortMergeOperatorInfo { SSDataBlock* pInputBlock; int64_t startTs; // sort start time - - bool hasGroupId; - uint64_t groupId; - STupleHandle* prefetchedTuple; + uint64_t groupId; } SMultiwaySortMergeOperatorInfo; int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { @@ -312,31 +287,12 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData blockDataEnsureCapacity(p, capacity); while (1) { - STupleHandle* pTupleHandle = NULL; - if (pInfo->prefetchedTuple == NULL) { - pTupleHandle = tsortNextTuple(pHandle); - } else { - pTupleHandle = pInfo->prefetchedTuple; - pInfo->groupId = tsortGetGroupId(pTupleHandle); - pInfo->prefetchedTuple = NULL; - } - + STupleHandle* pTupleHandle = tsortNextTuple(pHandle); if (pTupleHandle == NULL) { break; } - uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); - if (!pInfo->hasGroupId) { - pInfo->groupId = tupleGroupId; - pInfo->hasGroupId = true; - appendOneRowToDataBlock(p, pTupleHandle); - } else if (pInfo->groupId == tupleGroupId) { - appendOneRowToDataBlock(p, pTupleHandle); - } else { - pInfo->prefetchedTuple = pTupleHandle; - break; - } - + appendOneRowToDataBlock(p, pTupleHandle); if (p->info.rows >= capacity) { break; } @@ -432,14 +388,12 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pInfo->hasGroupId = false; - pInfo->prefetchedTuple = NULL; pOperator->pTaskInfo = pTaskInfo; pInfo->bufPageSize = getProperSortPageSize(rowSize); uint32_t numOfSources = taosArrayGetSize(pSortInfo); - numOfSources = TMAX(2, numOfSources); + numOfSources = TMAX(4, numOfSources); pInfo->sortBufSize = numOfSources * pInfo->bufPageSize; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 5bcd58f8db..1502387360 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -557,59 +557,40 @@ static int32_t createInitialSources(SSortHandle* pHandle) { SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0); taosArrayClear(pHandle->pOrderedSource); - bool hasGroupId = false; - SSDataBlock* prefetchedDataBlock = NULL; - while (1) { - SSDataBlock* pBlock = NULL; - if (prefetchedDataBlock == NULL) { - pBlock = pHandle->fetchfp(source->param); - } else { - pBlock = prefetchedDataBlock; - prefetchedDataBlock = NULL; - } - + SSDataBlock* pBlock = pHandle->fetchfp(source->param); if (pBlock == NULL) { break; } - if (!hasGroupId) { - // calculate the buffer pages according to the total available buffers. + if (pHandle->pDataBlock == NULL) { pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock)); // todo, number of pages are set according to the total available sort buffer pHandle->numOfPages = 1024; sortBufSize = pHandle->numOfPages * pHandle->pageSize; - - hasGroupId = true; pHandle->pDataBlock = createOneDataBlock(pBlock, false); } - if (pHandle->pDataBlock->info.groupId == pBlock->info.groupId) { - // perform the scalar function calculation before apply the sort - if (pHandle->beforeFp != NULL) { - pHandle->beforeFp(pBlock, pHandle->param); - } + if (pHandle->beforeFp != NULL) { + pHandle->beforeFp(pBlock, pHandle->param); + } - int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock); - if (code != 0) { - return code; - } + int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock); + if (code != 0) { + return code; + } - size_t size = blockDataGetSize(pHandle->pDataBlock); - if (size > sortBufSize) { - // Perform the in-memory sort and then flush data in the buffer into disk. - int64_t p = taosGetTimestampUs(); - blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); + size_t size = blockDataGetSize(pHandle->pDataBlock); + if (size > sortBufSize) { + // Perform the in-memory sort and then flush data in the buffer into disk. + int64_t p = taosGetTimestampUs(); + blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); - int64_t el = taosGetTimestampUs() - p; - pHandle->sortElapsed += el; + int64_t el = taosGetTimestampUs() - p; + pHandle->sortElapsed += el; - doAddToBuf(pHandle->pDataBlock, pHandle); - } - } else { - prefetchedDataBlock = pBlock; - pHandle->pDataBlock = createOneDataBlock(pBlock, false); + doAddToBuf(pHandle->pDataBlock, pHandle); } } @@ -758,10 +739,6 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) { } } -uint64_t tsortGetGroupId(STupleHandle* pVHandle) { - return pVHandle->pBlock->info.groupId; -} - SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) { SSortExecInfo info = {0}; From 7b98430906e03208282be22ac084585172ac0a01 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 16 Jun 2022 17:26:52 +0800 Subject: [PATCH 5/5] feat(stream): stream partition by --- source/libs/executor/src/timewindowoperator.c | 170 ++++++++++-------- .../tsim/stream/distributeInterval0.sim | 37 +++- tests/script/tsim/stream/partitionby.sim | 62 ++++++- 3 files changed, 195 insertions(+), 74 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index f8972a02a1..d7ae823522 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1257,6 +1257,10 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { return TSDB_CODE_SUCCESS; } +bool isCloseWindow(STimeWindow *pWin, STimeWindowAggSupp* pSup) { + return pWin->ekey < pSup->maxTs - pSup->waterMark; +} + static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, SArray* closeWins) { void* pIte = NULL; @@ -1269,7 +1273,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL); - if (win.ekey < pSup->maxTs - pSup->waterMark) { + if (isCloseWindow(&win, pSup)) { char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); taosHashRemove(pHashMap, keyBuf, keyLen); @@ -2036,59 +2040,6 @@ _error: return NULL; } -static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId, - SArray* pUpdated) { - SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; - SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); - SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - int32_t numOfOutput = pOperatorInfo->numOfExprs; - int32_t step = 1; - bool ascScan = true; - TSKEY* tsCols = NULL; - SResultRow* pResult = NULL; - int32_t forwardRows = 0; - - if (pSDataBlock->pDataBlock != NULL) { - SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); - tsCols = (int64_t*)pColDataInfo->pData; - } else { - return; - } - - int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); - TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); - STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, - pInfo->interval.precision, NULL); - while (1) { - int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx, - numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); - if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); - pos->groupId = tableGroupId; - pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; - *(int64_t*)pos->key = pResult->win.skey; - forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, - TSDB_ORDER_ASC); - if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { - saveResultRow(pResult, tableGroupId, pUpdated); - } - // window start(end) key interpolation - // doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, - // forwardRows); - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, - tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); - int32_t prevEndPos = (forwardRows - 1) * step + startPos; - ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); - startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); - if (startPos < 0) { - break; - } - } -} - bool isFinalInterval(SStreamFinalIntervalOperatorInfo* pInfo) { return pInfo->pChildren != NULL; } void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput, @@ -2130,6 +2081,74 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArra } } +bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { + SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId); + SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, + pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(int64_t))); + return p1 == NULL; +} + +static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId, + SArray* pUpdated) { + SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; + SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); + SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; + int32_t numOfOutput = pOperatorInfo->numOfExprs; + int32_t step = 1; + bool ascScan = true; + TSKEY* tsCols = NULL; + SResultRow* pResult = NULL; + int32_t forwardRows = 0; + + if (pSDataBlock->pDataBlock != NULL) { + SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); + tsCols = (int64_t*)pColDataInfo->pData; + } else { + return; + } + + int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); + TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); + STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, + pInfo->interval.precision, NULL); + while (1) { + if (isFinalInterval(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) && + isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup)) { + SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); + taosArrayPush(pUpWins, &nextWin); + rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId, + pOperatorInfo->numOfExprs, pOperatorInfo->pTaskInfo); + taosArrayDestroy(pUpWins); + } + int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx, + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); + pos->groupId = tableGroupId; + pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; + *(int64_t*)pos->key = pResult->win.skey; + forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, + TSDB_ORDER_ASC); + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { + saveResultRow(pResult, tableGroupId, pUpdated); + } + // window start(end) key interpolation + // doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, + // forwardRows); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + int32_t prevEndPos = (forwardRows - 1) * step + startPos; + ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); + startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); + if (startPos < 0) { + break; + } + } +} + static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) { taosHashClear(pInfo->aggSup.pResultRowHashTable); clearDiskbasedBuf(pInfo->aggSup.pResultBuf); @@ -2169,6 +2188,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); + TSKEY maxTs = INT64_MIN; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2222,6 +2242,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true); + doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated); if (isFinalInterval(pInfo)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -2238,10 +2259,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true); doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL); } - doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated); - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); + maxTs = TMAX(maxTs, pBlock->info.window.ekey); } + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); if (isFinalInterval(pInfo)) { closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); } @@ -2564,7 +2585,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t } static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx, - int32_t groupId, int32_t numOfOutput, int32_t* rowCellInfoOffset, + uint64_t groupId, int32_t numOfOutput, int32_t* rowCellInfoOffset, SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) { assert(pWinInfo->win.skey <= pWinInfo->win.ekey); // too many time window in query @@ -2642,7 +2663,7 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap) return size - startIndex - 1; } -void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, int32_t groupId, +void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num, uint64_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) { SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pCurWins, startIndex); SResultRow* pCurResult = NULL; @@ -2667,13 +2688,18 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, } } +typedef struct SWinRes { + TSKEY ts; + uint64_t groupId; +} SWinRes; + static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated, SHashObj* pStDeleted) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; bool masterScan = true; int32_t numOfOutput = pOperator->numOfExprs; - int64_t groupId = pSDataBlock->info.groupId; + uint64_t groupId = pSDataBlock->info.groupId; int64_t gap = pInfo->gap; int64_t code = TSDB_CODE_SUCCESS; @@ -2693,7 +2719,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; for (int32_t i = 0; i < pSDataBlock->info.rows;) { int32_t winIndex = 0; - SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], pSDataBlock->info.groupId, gap, &winIndex); + SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, tsCols[i], groupId, gap, &winIndex); winRows = updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted); code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pTaskInfo); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { @@ -2709,7 +2735,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } pCurWin->isClosed = false; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &(pCurWin->win.skey), sizeof(TSKEY)); + SWinRes value = {.ts = pCurWin->win.skey, .groupId = groupId}; + code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes)); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -2736,7 +2763,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo* } } -static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated, int32_t groupId) { +static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated) { void* pData = NULL; size_t keyLen = 0; while ((pData = taosHashIterate(pStUpdated, pData)) != NULL) { @@ -2746,9 +2773,9 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated, int32_t if (pos == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - pos->groupId = groupId; + pos->groupId = ((SWinRes*)pData)->groupId; pos->pos = *(SResultRowPosition*)key; - *(int64_t*)pos->key = *(uint64_t*)pData; + *(int64_t*)pos->key = ((SWinRes*)pData)->ts; taosArrayPush(pUpdated, &pos); } return TSDB_CODE_SUCCESS; @@ -2815,7 +2842,9 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra __get_win_info_ fn) { // Todo(liuyao) save window to tdb void **pIte = NULL; + size_t keyLen = 0; while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { + uint64_t* pGroupId = taosHashGetKey(pIte, &keyLen); SArray *pWins = (SArray *) (*pIte); int32_t size = taosArrayGetSize(pWins); for (int32_t i = 0; i < size; i++) { @@ -2825,7 +2854,7 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra if (!pSeWin->isClosed) { pSeWin->isClosed = true; if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed); + int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, *pGroupId, pClosed); pSeWin->isOutput = true; } } @@ -2892,7 +2921,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; doClearSessionWindows(&pChildInfo->streamAggSup, &pChildInfo->binfo, pBlock, 0, pChildOp->numOfExprs, pChildInfo->gap, NULL); - rebuildTimeWindow(pInfo, pWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs, pOperator->pTaskInfo); + rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->numOfExprs, pOperator->pTaskInfo); } taosArrayDestroy(pWins); continue; @@ -2916,7 +2945,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getSessionWinInfo); - copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId); + copyUpdateResult(pStUpdated, pUpdated); taosHashCleanup(pStUpdated); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, @@ -3216,8 +3245,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } pCurWin->winInfo.isClosed = false; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &(pCurWin->winInfo.win.skey), - sizeof(TSKEY)); + SWinRes value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId}; + code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), + &value, sizeof(SWinRes)); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -3274,7 +3304,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getStateWinInfo); - copyUpdateResult(pSeUpdated, pUpdated, pBInfo->pRes->info.groupId); + copyUpdateResult(pSeUpdated, pUpdated); taosHashCleanup(pSeUpdated); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, diff --git a/tests/script/tsim/stream/distributeInterval0.sim b/tests/script/tsim/stream/distributeInterval0.sim index f4f3e04f0a..b720272116 100644 --- a/tests/script/tsim/stream/distributeInterval0.sim +++ b/tests/script/tsim/stream/distributeInterval0.sim @@ -173,4 +173,39 @@ endi sql select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s); -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file + +sql create database test1 vgroups 1; +sql use test1; +sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create stream stream_t2 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ; + +sql insert into ts1 values(1648791211000,1,2,3); +sql insert into ts1 values(1648791222001,2,2,3); +sql insert into ts2 values(1648791211000,1,2,3); +sql insert into ts2 values(1648791222001,2,2,3); + +$loop_count = 0 +loop2: +sql select * from streamtST1; + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop2 +endi + +#rows 1 +if $data11 != 2 then + print =====data11=$data11 + goto loop2 +endi + +system sh/stop_dnodes.sh \ No newline at end of file diff --git a/tests/script/tsim/stream/partitionby.sim b/tests/script/tsim/stream/partitionby.sim index df1e096551..b84a01eb4a 100644 --- a/tests/script/tsim/stream/partitionby.sim +++ b/tests/script/tsim/stream/partitionby.sim @@ -34,6 +34,7 @@ print =====rows=$rows goto loop0 endi +print =====loop0 sql create database test1 vgroups 1; sql use test1; @@ -51,7 +52,7 @@ sql insert into ts2 values(1648791211000,1,2,3); $loop_count = 0 -loop0: +loop1: sleep 300 sql select * from streamt; @@ -62,7 +63,62 @@ endi if $rows != 2 then print =====rows=$rows -goto loop0 +goto loop1 endi -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +print =====loop1 + +sql create database test2 vgroups 1; +sql use test2; +sql create stable st(ts timestamp,a int,b int,c int,id int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); + +sql create stream stream_t2 trigger at_once into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by ta interval(10s) ; +sql insert into ts1 values(1648791211000,1,2,3,1); +sql insert into ts1 values(1648791222001,2,2,3,2); +sql insert into ts2 values(1648791211000,1,2,3,3); +sql insert into ts2 values(1648791222001,2,2,3,4); + +sql insert into ts2 values(1648791222002,2,2,3,5); +sql insert into ts2 values(1648791222002,2,2,3,6); + +sql insert into ts1 values(1648791211000,1,2,3,1); +sql insert into ts1 values(1648791222001,2,2,3,2); +sql insert into ts2 values(1648791211000,1,2,3,3); +sql insert into ts2 values(1648791222001,2,2,3,4); + +$loop_count = 0 + +loop2: +sleep 300 +sql select * from streamtST; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $data01 != 1 then +print =====data01=$data01 +goto loop2 +endi + +if $data02 != 1 then +print =====data02=$data02 +goto loop2 +endi + +if $data03 != 1 then +print =====data03=$data03 +goto loop2 +endi + +if $data04 != 2 then +print =====data04=$data04 +goto loop2 +endi + +print =====loop2 + +system sh/stop_dnodes.sh \ No newline at end of file