From d007d31c4d205ef6558d59eeda471b3f31bf6574 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 27 Nov 2023 18:15:09 +0800 Subject: [PATCH 01/14] fix:process snode when drop stream --- source/dnode/mnode/impl/src/mndStream.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 235703428f..bc265db766 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -700,8 +700,17 @@ static int32_t mndPersistTaskDropReq(SMnode* pMnode, STrans *pTrans, SStreamTask STransAction action = {0}; SEpSet epset = {0}; if(pTask->info.nodeId == SNODE_HANDLE){ - SSnodeObj* pObj = mndAcquireSnode(pMnode, pTask->info.nodeId); - addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port); + SSnodeObj *pObj = NULL; + void *pIter = NULL; + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); + if (pIter == NULL) { + break; + } + + addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port); + sdbRelease(pMnode->pSdb, pObj); + } }else{ SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); epset = mndGetVgroupEpset(pMnode, pVgObj); From e66a1d6c9aa51aa71a9b58ffa0699813f91a5559 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 29 Nov 2023 09:14:44 +0800 Subject: [PATCH 02/14] enh: limit snap replication msg by size of block data in tsdbSnapReadTimeSeriesData --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index a9da0fbcec..8a7d1f8b97 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -278,6 +278,15 @@ _exit: return code; } +static int64_t tBlockDataSize(SBlockData* pBlockData) { + int64_t nData = 0; + for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) { + SColData* pColData = tBlockDataGetColDataByIdx(pBlockData, iCol); + nData += pColData->nData; + } + return nData; +} + static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) { int32_t code = 0; int32_t lino = 0; @@ -320,8 +329,11 @@ static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** dat code = tsdbIterMergerNext(reader->dataIterMerger); TSDB_CHECK_CODE(code, lino, _exit); - if (reader->blockData->nRow >= 81920) { - break; + if (!(reader->blockData->nRow % 16)) { + int64_t nData = tBlockDataSize(reader->blockData); + if (nData >= 1 * 1024 * 1024) { + break; + } } } From 9e8944d17cf9d7433b956a5073766c1125908bae Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 29 Nov 2023 09:18:05 +0800 Subject: [PATCH 03/14] enh: adjust buffer size of sync snap replication --- include/util/tdef.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index 69d0c1126d..1a440c7268 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -305,7 +305,7 @@ typedef enum ELogicConditionType { #define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512 #define TSDB_SYNC_NEGOTIATION_WIN 512 -#define TSDB_SYNC_SNAP_BUFFER_SIZE 2048 +#define TSDB_SYNC_SNAP_BUFFER_SIZE 1024 #define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta From 4f168b4b2dd579082db6d7b4b449fc400c97afed Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 29 Nov 2023 17:22:43 +0800 Subject: [PATCH 04/14] 
fix:tmqVnodeSplit-column.py failed in arm64 because of rebalance after unsubscribe while split vnode --- source/dnode/mnode/impl/src/mndStream.c | 31 +------------------ source/dnode/mnode/impl/src/mndSubscribe.c | 1 + source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqPush.c | 2 +- .../system-test/7-tmq/tmqVnodeSplit-column.py | 2 +- 5 files changed, 5 insertions(+), 33 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bc265db766..08c60e7afa 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -328,35 +328,6 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } -static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64_t watermark, char **pStr) { - if (NULL == ast) { - return TSDB_CODE_SUCCESS; - } - - SNode * pAst = NULL; - int32_t code = nodesStringToNode(ast, &pAst); - - SQueryPlan *pPlan = NULL; - if (TSDB_CODE_SUCCESS == code) { - SPlanContext cxt = { - .pAstRoot = pAst, - .topicQuery = false, - .streamQuery = true, - .triggerType = (triggerType == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType, - .watermark = watermark, - }; - code = qCreateQueryPlan(&cxt, &pPlan, NULL); - } - - if (TSDB_CODE_SUCCESS == code) { - code = nodesNodeToString((SNode *)pPlan, false, pStr, NULL); - } - nodesDestroyNode(pAst); - nodesDestroyNode((SNode *)pPlan); - terrno = code; - return code; -} - static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) { SNode * pAst = NULL; SQueryPlan *pPlan = NULL; @@ -777,6 +748,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SDbObj * pDb = NULL; SCMCreateStreamReq createStreamReq = {0}; SStreamObj streamObj = {0}; + char* sql = NULL; if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -808,7 +780,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - char* sql = NULL; int32_t sqlLen = 0; if(createStreamReq.sql != NULL){ sqlLen = strlen(createStreamReq.sql); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 408b664e50..44ee804d22 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -161,6 +161,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg, SSubplan* pPlan) { if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { + if(pRebVg->oldConsumerId == -1) return 0; //drop stream, no consumer, while split vnode,all consumerId is -1 terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; return -1; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9d16402ee6..f077caace7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -337,7 +337,7 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) { while (pIter) { STqHandle* pHandle = *(STqHandle**)pIter; - tqInfo("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); + tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); if (ASSERT(pHandle->msg != NULL)) { tqError("pHandle->msg should not be null"); diff --git a/source/dnode/vnode/src/tq/tqPush.c 
b/source/dnode/vnode/src/tq/tqPush.c index f367bc96f8..8fee1d5904 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -72,7 +72,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); - tqInfo("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, + tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); return 0; } diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-column.py b/tests/system-test/7-tmq/tmqVnodeSplit-column.py index 54a43465e7..87a73e981e 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-column.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-column.py @@ -188,7 +188,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) From d9c84b4b720a502fe0b7d2b22709dbf3e3debea9 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 30 Nov 2023 11:55:17 +0800 Subject: [PATCH 05/14] enh: tolerate out-of-order of closeTS in walEndSnapshot --- source/libs/wal/src/walWrite.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 33d8d34514..341d989f8f 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -327,15 +327,19 @@ int32_t walEndSnapshot(SWal *pWal) { // iterate files, until the searched result // delete according to file size or close time + SWalFileInfo *pUntil = NULL; for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) || (pWal->cfg.retentionPeriod == 0 || pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts)) { - deleteCnt++; newTotSize -= iter->fileSize; - taosArrayPush(pWal->toDeleteFiles, iter); + pUntil = iter; } } + for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter <= pUntil; iter++) { + deleteCnt++; + taosArrayPush(pWal->toDeleteFiles, iter); + } // make new array, remove files taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); From 3f6db938f36293da34f4514d336240a9ed068587 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 30 Nov 2023 16:00:02 +0800 Subject: [PATCH 06/14] fix:minus row num to reduce execute time --- tests/system-test/7-tmq/tmqVnodeSplit-column.py | 6 +++--- tests/system-test/7-tmq/tmqVnodeSplit-db.py | 8 ++++---- .../7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py | 8 ++++---- .../7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py | 8 ++++---- tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py | 8 ++++---- tests/system-test/7-tmq/tmqVnodeSplit-stb.py | 8 ++++---- tests/system-test/7-tmq/tmqVnodeTransform-db.py | 8 ++++---- tests/system-test/7-tmq/tmqVnodeTransform-stb.py | 8 ++++---- tests/system-test/7-tmq/tmqVnodeTransform.py | 8 ++++---- 9 files changed, 35 insertions(+), 35 deletions(-) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-column.py 
b/tests/system-test/7-tmq/tmqVnodeSplit-column.py index 87a73e981e..1fe2b5809a 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-column.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-column.py @@ -23,7 +23,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -49,7 +49,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -118,7 +118,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-db.py b/tests/system-test/7-tmq/tmqVnodeSplit-db.py index e4353d3268..f66acf4fcd 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-db.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-db.py @@ -23,7 +23,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -49,7 +49,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -118,7 +118,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -189,7 +189,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py index 8276ae638b..68fb07b813 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py @@ -25,7 +25,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -51,7 +51,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -120,7 +120,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 120, @@ -189,7 +189,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py 
b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py index 0d247b2848..6140e8a544 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py @@ -25,7 +25,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -51,7 +51,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -120,7 +120,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 120, @@ -189,7 +189,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py index cda5a27919..18b80b7f8d 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py @@ -27,7 +27,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -53,7 +53,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -122,7 +122,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 120, @@ -192,7 +192,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb.py index 17a427567e..c203350322 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb.py @@ -25,7 +25,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -51,7 +51,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -120,7 +120,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -190,7 +190,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if 
expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeTransform-db.py b/tests/system-test/7-tmq/tmqVnodeTransform-db.py index 005bca70d6..5c61908d96 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform-db.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform-db.py @@ -20,7 +20,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -46,7 +46,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 30, @@ -138,7 +138,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 10, @@ -217,7 +217,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 10, diff --git a/tests/system-test/7-tmq/tmqVnodeTransform-stb.py b/tests/system-test/7-tmq/tmqVnodeTransform-stb.py index ec1331ae59..64cdf6d153 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform-stb.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform-stb.py @@ -20,7 +20,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -46,7 +46,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -137,7 +137,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -207,7 +207,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeTransform.py b/tests/system-test/7-tmq/tmqVnodeTransform.py index aab94bc7a2..3698297618 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform.py @@ -20,7 +20,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -46,7 +46,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -137,7 +137,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -203,7 +203,7 @@ class TDTestCase: expectRows = 1 resultList = 
tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) From 30d78c6be15bd270b90c739a385c16fcce849e5e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 30 Nov 2023 16:10:09 +0800 Subject: [PATCH 07/14] fix(stream): make sure open operator only once for scan-history task. --- include/libs/stream/tstream.h | 3 ++- source/libs/stream/src/streamExec.c | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7f65ef8358..9f88672231 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -402,7 +402,8 @@ typedef struct SHistoryTaskInfo { int32_t tickCount; int32_t retryTimes; int32_t waitInterval; - int64_t haltVer; // offset in wal when halt the stream task + int64_t haltVer; // offset in wal when halt the stream task + bool operatorOpen; // false by default } SHistoryTaskInfo; typedef struct STaskOutputInfo { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8ab8f3852e..25f32195be 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -256,7 +256,10 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { bool finished = false; const char* id = pTask->id.idStr; - qSetStreamOpOpen(exec); + if (!pTask->hTaskInfo.operatorOpen) { + qSetStreamOpOpen(exec); + pTask->hTaskInfo.operatorOpen = true; + } while (1) { if (streamTaskShouldPause(pTask)) { From 77a48f87113f2a8c3ea81296421c61587a88ddff Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 30 Nov 2023 17:36:03 +0800 Subject: [PATCH 08/14] fix:test case --- tests/script/tsim/snode/basic1.sim | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/script/tsim/snode/basic1.sim b/tests/script/tsim/snode/basic1.sim index 86072215f7..6e553591bd 100644 --- a/tests/script/tsim/snode/basic1.sim +++ b/tests/script/tsim/snode/basic1.sim @@ -113,11 +113,7 @@ sql_error drop snode on dnode 2 print =============== create drop snodes sql create snode on dnode 1 -sql create snode on dnode 2 -sql show snodes -if $rows != 2 then - return -1 -endi +sql_error create snode on dnode 2 print =============== restart system sh/exec.sh -n dnode1 -s stop -x SIGINT From acce0852df15aefcd818c0c1388ab3a920a3b3e6 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 30 Nov 2023 19:52:27 +0800 Subject: [PATCH 09/14] fix:test case --- tests/script/tsim/snode/basic1.sim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/tsim/snode/basic1.sim b/tests/script/tsim/snode/basic1.sim index 6e553591bd..e7346b75a0 100644 --- a/tests/script/tsim/snode/basic1.sim +++ b/tests/script/tsim/snode/basic1.sim @@ -123,7 +123,7 @@ system sh/exec.sh -n dnode2 -s start sleep 2000 sql show snodes -if $rows != 2 then +if $rows != 1 then return -1 endi From 353c8d21a9ba89bec6cbc87e746a3c9e3edb6e48 Mon Sep 17 00:00:00 2001 From: haoranchen Date: Fri, 1 Dec 2023 09:25:41 +0800 Subject: [PATCH 10/14] ci:increase timeout of mac test in Jenkinsfile2 --- Jenkinsfile2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 1ab6a4a8cd..d3fc05a1d2 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -393,7 +393,7 @@ pipeline { agent{label " Mac_catalina "} steps { catchError(buildResult: 'FAILURE', 
stageResult: 'FAILURE') { - timeout(time: 20, unit: 'MINUTES'){ + timeout(time: 30, unit: 'MINUTES'){ pre_test() pre_test_build_mac() } From eac3cbf564ff75085fa7484e3c1033317b6e9962 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Dec 2023 11:12:57 +0800 Subject: [PATCH 11/14] fix(stream): merge 3.0 --- include/libs/stream/tstream.h | 3 ++- source/dnode/vnode/src/tqCommon/tqCommon.c | 5 +++- source/libs/stream/src/streamStart.c | 30 ++++++++++------------ 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9f88672231..2406601722 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -853,7 +853,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); void streamMetaStartHb(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ); +int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, + int64_t endTs, bool ready); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index aee2aaa244..b1d49bf31b 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -475,6 +475,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); if (!isLeader) { + streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false); tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); return code; @@ -482,6 +483,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { + streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false); tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", rsp.streamId, rsp.upstreamTaskId, vgId); terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; @@ -670,7 +672,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) { streamLaunchFillHistoryTask(pTask); } - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + pTask->execInfo.start, true); streamMetaReleaseTask(pMeta, pTask); continue; } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index a4c448f678..6e4fe45684 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -67,7 +67,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfDowns, el, p); - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, 
pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + pTask->execInfo.start, true); return TSDB_CODE_SUCCESS; } @@ -469,14 +470,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false); + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + taosGetTimestampMs(), false); // automatically set the related fill-history task to be failed. if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId); - streamMetaUpdateTaskDownstreamStatus(pHTask, pHTask->execInfo.init, taosGetTimestampMs(), false); + streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, + taosGetTimestampMs(), false); streamMetaReleaseTask(pTask->pMeta, pHTask); } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms @@ -1066,29 +1069,26 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) } } -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool ready) { - SStreamMeta* pMeta = pTask->pMeta; +int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, + int64_t endTs, bool ready) { + STaskStartInfo* pStartInfo = &pMeta->startInfo; + STaskId id = {.streamId = streamId, .taskId = taskId}; streamMetaWLock(pMeta); - - STaskId id = streamTaskExtractKey(pTask); - STaskStartInfo* pStartInfo = &pMeta->startInfo; - - SHashObj* pDst = ready? pStartInfo->pReadyTaskSet:pStartInfo->pFailedTaskSet; + SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); - int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); - if (numOfRecv == numOfTotal) { + if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) { pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; - stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64 + stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x startTs:%" PRId64 ", readyTs:%" PRId64 " total elapsed time:%.2fs", - pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs, + pMeta->vgId, numOfTotal, taskId, pStartInfo->startTs, pStartInfo->readyTs, pStartInfo->elapsedTime / 1000.0); // print the initialization elapsed time and info @@ -1096,8 +1096,6 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); streamMetaResetStartInfo(pStartInfo); - } else { - stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal); } streamMetaWUnLock(pMeta); From 4e5853d9fe305c26b88b7107bd9a6c8fceab7be6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Dec 2023 12:09:49 +0800 Subject: [PATCH 12/14] refactor: do some internal refactor. 
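
The tally of received check-downstream reports (ready + failed) is kept in a
local numOfRecv again, and progress is logged until the last task has reported
back; only then is the start info reset. Below is a minimal standalone sketch
of this accounting pattern (plain C, illustrative only; the names and the tiny
driver are invented and are not the TDengine implementation):

    #include <stdbool.h>
    #include <stdio.h>

    typedef struct {
      int numOfTotal;   // tasks expected to report a check result
      int numOfReady;   // tasks that reported ready
      int numOfFailed;  // tasks that reported failure
    } SStartInfoSketch;

    // Record one report; return true when the whole round is complete.
    static bool recordCheckResult(SStartInfoSketch *info, bool ready) {
      if (ready) {
        info->numOfReady++;
      } else {
        info->numOfFailed++;
      }
      int numOfRecv = info->numOfReady + info->numOfFailed;
      if (numOfRecv == info->numOfTotal) {
        printf("all %d task(s) completed, reset start info\n", info->numOfTotal);
        return true;
      }
      printf("recv check results:%d, total:%d\n", numOfRecv, info->numOfTotal);
      return false;
    }

    int main(void) {
      SStartInfoSketch info = {.numOfTotal = 3};
      recordCheckResult(&info, true);   // 1 of 3
      recordCheckResult(&info, false);  // 2 of 3
      recordCheckResult(&info, true);   // 3 of 3, round complete
      return 0;
    }
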
---
 source/libs/stream/src/streamStart.c | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c
index 6e4fe45684..0b6603cd7b 100644
--- a/source/libs/stream/src/streamStart.c
+++ b/source/libs/stream/src/streamStart.c
@@ -1081,8 +1081,9 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI
   taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
 
   int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
+  int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);
 
-  if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) {
+  if (numOfRecv == numOfTotal) {
     pStartInfo->readyTs = taosGetTimestampMs();
     pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
 
@@ -1094,8 +1095,9 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI
     // print the initialization elapsed time and info
     displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
     displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
-
     streamMetaResetStartInfo(pStartInfo);
+  } else {
+    stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal);
   }
 
   streamMetaWUnLock(pMeta);
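A note on the signature change that the two stream patches above build on:
streamMetaUpdateTaskDownstreamStatus() is now keyed by (streamId, taskId)
instead of a task pointer, so a downstream-check result can be recorded even
on the paths in tqCommon.c where the task object cannot be acquired (the
not-leader branch and the task-not-exist branch). A hedged sketch of that
design choice, with an invented type and a made-up print-only "registry",
not the actual API:

    #include <inttypes.h>
    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef struct {
      int64_t streamId;
      int32_t taskId;
    } STaskIdSketch;

    // Keyed by id only: callable whether or not the task object still exists.
    static void updateDownstreamStatus(STaskIdSketch id, int64_t endTs, bool ready) {
      printf("stream:0x%" PRIx64 " task:0x%x ready:%d endTs:%" PRId64 "\n",
             (uint64_t)id.streamId, (unsigned)id.taskId, (int)ready, endTs);
    }

    int main(void) {
      STaskIdSketch id = {.streamId = 0x1, .taskId = 0x2};
      // e.g. the not-leader path: no task object in hand, record failure anyway
      updateDownstreamStatus(id, 123, false);
      return 0;
    }
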
From 34e79d158cbba5c0d91384758f1b68aae888b0fa Mon Sep 17 00:00:00 2001
From: Alex Duan <417921451@qq.com>
Date: Fri, 1 Dec 2023 16:27:02 +0800
Subject: [PATCH 13/14] case: add stream basic test case

---
 tests/system-test/8-stream/stream_basic.py | 111 +++++++++++++++++++++
 1 file changed, 111 insertions(+)
 create mode 100644 tests/system-test/8-stream/stream_basic.py

diff --git a/tests/system-test/8-stream/stream_basic.py b/tests/system-test/8-stream/stream_basic.py
new file mode 100644
index 0000000000..7f4d1d5ee3
--- /dev/null
+++ b/tests/system-test/8-stream/stream_basic.py
@@ -0,0 +1,111 @@
+###################################################################
+#           Copyright (c) 2016 by TAOS Technologies, Inc.
+#                     All rights reserved.
+#
+#     This file is proprietary and confidential to TAOS Technologies.
+#     No part of this file may be reproduced, stored, transmitted,
+#     disclosed or used in any form or by any means other than as
+#     expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+# -*- coding: utf-8 -*-
+
+
+from util.log import *
+from util.cases import *
+from util.sql import *
+from util.common import *
+from util.sqlset import *
+from util.autogen import *
+
+import platform
+import random
+import time
+import traceback
+import os
+from os import path
+
+
+class TDTestCase:
+    # init
+    def init(self, conn, logSql, replicaVar=1):
+        self.replicaVar = int(replicaVar)
+        tdLog.debug("start to execute %s" % __file__)
+        tdSql.init(conn.cursor(), True)
+
+        # autoGen
+        self.autoGen = AutoGen()
+
+    def waitTranslation(self, waitSeconds):
+        # wait end
+        for i in range(waitSeconds):
+            sql = "show transactions;"
+            rows = tdSql.query(sql)
+            if rows == 0:
+                return True
+            tdLog.info(f"i={i} wait for translation finish ...")
+            time.sleep(1)
+
+        return False
+
+    def getPath(self, tool="taosBenchmark"):
+        if (platform.system().lower() == 'windows'):
+            tool = tool + ".exe"
+        selfPath = os.path.dirname(os.path.realpath(__file__))
+
+        if ("community" in selfPath):
+            projPath = selfPath[:selfPath.find("community")]
+        else:
+            projPath = selfPath[:selfPath.find("tests")]
+
+        paths = []
+        for root, dirs, files in os.walk(projPath):
+            if ((tool) in files):
+                rootRealPath = os.path.dirname(os.path.realpath(root))
+                if ("packaging" not in rootRealPath):
+                    paths.append(os.path.join(root, tool))
+                    break
+        if (len(paths) == 0):
+            tdLog.exit("taosBenchmark not found!")
+            return
+        else:
+            tdLog.info("taosBenchmark found in %s" % paths[0])
+            return paths[0]
+
+    def taosBenchmark(self, param):
+        binPath = self.getPath()
+        cmd = f"{binPath} {param}"
+        tdLog.info(cmd)
+        os.system(cmd)
+
+    # run
+    def run(self):
+        # gen data
+        random.seed(int(time.time()))
+        self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y")
+        # create stream
+        tdSql.execute("use db")
+        tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);", show=True)
+        sql = "select count(*) from sta"
+        # loop wait max 60s to check count is ok
+        tdLog.info("loop wait result ...")
+        tdSql.checkDataLoop(0, 0, 99999, sql, loopCount=120, waitTime=0.5)
+
+        # check all data is correct
+        sql = "select * from sta where cnt != 20;"
+        tdSql.query(sql)
+        tdSql.checkRows(0)
+
+        # check ts interval is correct
+        sql = "select * from ( select diff(_wstart) as tsdif from sta ) where tsdif != 10;"
+        tdSql.query(sql)
+        tdSql.checkRows(0)
+
+    # stop
+    def stop(self):
+        tdSql.close()
+        tdLog.success("%s successfully executed" % __file__)
+
+
+tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file

From a769ccb816de54ed2cc3ad6d798236bfeb57643e Mon Sep 17 00:00:00 2001
From: Alex Duan <417921451@qq.com>
Date: Fri, 1 Dec 2023 16:29:06 +0800
Subject: [PATCH 14/14] case: add stream basic test case

---
 tests/parallel_test/cases.task | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task
index 92eaec52b5..b0129428b2 100644
--- a/tests/parallel_test/cases.task
+++ b/tests/parallel_test/cases.task
@@ -6,6 +6,7 @@
 ,,y,unit-test,bash test.sh
 
 #system test
+,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_basic.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/scalar_function.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_interval.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_session.py