diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 1ab6a4a8cd..d3fc05a1d2 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -393,7 +393,7 @@ pipeline { agent{label " Mac_catalina "} steps { catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') { - timeout(time: 20, unit: 'MINUTES'){ + timeout(time: 30, unit: 'MINUTES'){ pre_test() pre_test_build_mac() } diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7f65ef8358..2406601722 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -402,7 +402,8 @@ typedef struct SHistoryTaskInfo { int32_t tickCount; int32_t retryTimes; int32_t waitInterval; - int64_t haltVer; // offset in wal when halt the stream task + int64_t haltVer; // offset in wal when halt the stream task + bool operatorOpen; // false by default } SHistoryTaskInfo; typedef struct STaskOutputInfo { @@ -852,7 +853,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); void streamMetaStartHb(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ); +int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, + int64_t endTs, bool ready); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); diff --git a/include/util/tdef.h b/include/util/tdef.h index 69d0c1126d..1a440c7268 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -305,7 +305,7 @@ typedef enum ELogicConditionType { #define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512 #define TSDB_SYNC_NEGOTIATION_WIN 512 -#define TSDB_SYNC_SNAP_BUFFER_SIZE 2048 +#define TSDB_SYNC_SNAP_BUFFER_SIZE 1024 #define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index f4f9cbb535..1275ba7962 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -291,12 +291,17 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) { goto _OVER; } - pObj = mndAcquireSnode(pMnode, createReq.dnodeId); - if (pObj != NULL) { +// pObj = mndAcquireSnode(pMnode, createReq.dnodeId); +// if (pObj != NULL) { +// terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST; +// goto _OVER; +// } else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) { +// goto _OVER; +// } + + if (sdbGetSize(pMnode->pSdb, SDB_SNODE) >= 1){ terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST; goto _OVER; - } else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) { - goto _OVER; } pDnode = mndAcquireDnode(pMnode, createReq.dnodeId); @@ -314,7 +319,7 @@ _OVER: return -1; } - mndReleaseSnode(pMnode, pObj); +// mndReleaseSnode(pMnode, pObj); mndReleaseDnode(pMnode, pDnode); tFreeSMCreateQnodeReq(&createReq); return code; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 21969cc404..e8d5dfd1f5 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -363,35 +363,6 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } -static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64_t watermark, char **pStr) { - if (NULL == ast) { - return TSDB_CODE_SUCCESS; - } - - SNode * pAst = NULL; - int32_t code = nodesStringToNode(ast, &pAst); - - SQueryPlan *pPlan = NULL; - if (TSDB_CODE_SUCCESS == code) { - SPlanContext cxt = { - .pAstRoot = pAst, - .topicQuery = false, - .streamQuery = true, - .triggerType = (triggerType == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType, - .watermark = watermark, - }; - code = qCreateQueryPlan(&cxt, &pPlan, NULL); - } - - if (TSDB_CODE_SUCCESS == code) { - code = nodesNodeToString((SNode *)pPlan, false, pStr, NULL); - } - nodesDestroyNode(pAst); - nodesDestroyNode((SNode *)pPlan); - terrno = code; - return code; -} - static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) { SNode * pAst = NULL; SQueryPlan *pPlan = NULL; @@ -733,11 +704,20 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask pReq->streamId = pTask->id.streamId; STransAction action = {0}; - SEpSet epset = {0}; - if (pTask->info.nodeId == SNODE_HANDLE) { - SSnodeObj *pObj = mndAcquireSnode(pMnode, pTask->info.nodeId); - addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port); - } else { + SEpSet epset = {0}; + if(pTask->info.nodeId == SNODE_HANDLE){ + SSnodeObj *pObj = NULL; + void *pIter = NULL; + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); + if (pIter == NULL) { + break; + } + + addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port); + sdbRelease(pMnode->pSdb, pObj); + } + }else{ SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); epset = mndGetVgroupEpset(pMnode, pVgObj); mndReleaseVgroup(pMnode, pVgObj); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 408b664e50..44ee804d22 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -161,6 +161,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg, SSubplan* pPlan) { if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { + if(pRebVg->oldConsumerId == -1) return 0; //drop stream, no consumer, while split vnode,all consumerId is -1 terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; return -1; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2b35628663..e9943d2abf 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -337,7 +337,7 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) { while (pIter) { STqHandle* pHandle = *(STqHandle**)pIter; - tqInfo("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); + tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); if (ASSERT(pHandle->msg != NULL)) { tqError("pHandle->msg should not be null"); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index f367bc96f8..8fee1d5904 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -72,7 +72,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); - tqInfo("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, + tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); return 0; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index aee2aaa244..b1d49bf31b 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -475,6 +475,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); if (!isLeader) { + streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false); tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); return code; @@ -482,6 +483,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { + streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false); tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", rsp.streamId, rsp.upstreamTaskId, vgId); terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; @@ -670,7 +672,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) { streamLaunchFillHistoryTask(pTask); } - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + pTask->execInfo.start, true); streamMetaReleaseTask(pMeta, pTask); continue; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 104c9b2f35..faea881f72 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -278,6 +278,15 @@ _exit: return code; } +static int64_t tBlockDataSize(SBlockData* pBlockData) { + int64_t nData = 0; + for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) { + SColData* pColData = tBlockDataGetColDataByIdx(pBlockData, iCol); + nData += pColData->nData; + } + return nData; +} + static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) { int32_t code = 0; int32_t lino = 0; @@ -320,8 +329,11 @@ static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** dat code = tsdbIterMergerNext(reader->dataIterMerger); TSDB_CHECK_CODE(code, lino, _exit); - if (reader->blockData->nRow >= 81920) { - break; + if (!(reader->blockData->nRow % 16)) { + int64_t nData = tBlockDataSize(reader->blockData); + if (nData >= 1 * 1024 * 1024) { + break; + } } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8ab8f3852e..25f32195be 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -256,7 +256,10 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { bool finished = false; const char* id = pTask->id.idStr; - qSetStreamOpOpen(exec); + if (!pTask->hTaskInfo.operatorOpen) { + qSetStreamOpOpen(exec); + pTask->hTaskInfo.operatorOpen = true; + } while (1) { if (streamTaskShouldPause(pTask)) { diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index a4c448f678..0b6603cd7b 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -67,7 +67,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfDowns, el, p); - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + pTask->execInfo.start, true); return TSDB_CODE_SUCCESS; } @@ -469,14 +470,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false); + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + taosGetTimestampMs(), false); // automatically set the related fill-history task to be failed. if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId); - streamMetaUpdateTaskDownstreamStatus(pHTask, pHTask->execInfo.init, taosGetTimestampMs(), false); + streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, + taosGetTimestampMs(), false); streamMetaReleaseTask(pTask->pMeta, pHTask); } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms @@ -1066,15 +1069,13 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) } } -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool ready) { - SStreamMeta* pMeta = pTask->pMeta; +int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, + int64_t endTs, bool ready) { + STaskStartInfo* pStartInfo = &pMeta->startInfo; + STaskId id = {.streamId = streamId, .taskId = taskId}; streamMetaWLock(pMeta); - - STaskId id = streamTaskExtractKey(pTask); - STaskStartInfo* pStartInfo = &pMeta->startInfo; - - SHashObj* pDst = ready? pStartInfo->pReadyTaskSet:pStartInfo->pFailedTaskSet; + SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); @@ -1086,15 +1087,14 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; - stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64 + stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x startTs:%" PRId64 ", readyTs:%" PRId64 " total elapsed time:%.2fs", - pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs, + pMeta->vgId, numOfTotal, taskId, pStartInfo->startTs, pStartInfo->readyTs, pStartInfo->elapsedTime / 1000.0); // print the initialization elapsed time and info displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); - streamMetaResetStartInfo(pStartInfo); } else { stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 33d8d34514..341d989f8f 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -327,15 +327,19 @@ int32_t walEndSnapshot(SWal *pWal) { // iterate files, until the searched result // delete according to file size or close time + SWalFileInfo *pUntil = NULL; for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) || (pWal->cfg.retentionPeriod == 0 || pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts)) { - deleteCnt++; newTotSize -= iter->fileSize; - taosArrayPush(pWal->toDeleteFiles, iter); + pUntil = iter; } } + for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter <= pUntil; iter++) { + deleteCnt++; + taosArrayPush(pWal->toDeleteFiles, iter); + } // make new array, remove files taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index cc6647e463..a869af7d5d 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -247,7 +247,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists" TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_NOT_EXIST, "Mnode not there") TAOS_DEFINE_ERROR(TSDB_CODE_MND_QNODE_ALREADY_EXIST, "Qnode already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_QNODE_NOT_EXIST, "Qnode not there") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode already exists") +//TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode already exists") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode can only be created 1") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_NOT_EXIST, "Snode not there") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_FEW_MNODES, "The replica of mnode cannot less than 1") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_MNODES, "The replica of mnode cannot exceed 3") diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 7d3efbf181..d4579c20bc 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -6,6 +6,7 @@ ,,y,unit-test,bash test.sh #system test +,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_basic.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/scalar_function.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_interval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_session.py diff --git a/tests/script/tsim/snode/basic1.sim b/tests/script/tsim/snode/basic1.sim index 86072215f7..e7346b75a0 100644 --- a/tests/script/tsim/snode/basic1.sim +++ b/tests/script/tsim/snode/basic1.sim @@ -113,11 +113,7 @@ sql_error drop snode on dnode 2 print =============== create drop snodes sql create snode on dnode 1 -sql create snode on dnode 2 -sql show snodes -if $rows != 2 then - return -1 -endi +sql_error create snode on dnode 2 print =============== restart system sh/exec.sh -n dnode1 -s stop -x SIGINT @@ -127,7 +123,7 @@ system sh/exec.sh -n dnode2 -s start sleep 2000 sql show snodes -if $rows != 2 then +if $rows != 1 then return -1 endi diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-column.py b/tests/system-test/7-tmq/tmqVnodeSplit-column.py index 87a73e981e..1fe2b5809a 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-column.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-column.py @@ -23,7 +23,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -49,7 +49,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -118,7 +118,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-db.py b/tests/system-test/7-tmq/tmqVnodeSplit-db.py index e4353d3268..f66acf4fcd 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-db.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-db.py @@ -23,7 +23,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -49,7 +49,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -118,7 +118,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -189,7 +189,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py index 8276ae638b..68fb07b813 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py @@ -25,7 +25,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -51,7 +51,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -120,7 +120,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 120, @@ -189,7 +189,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py index 0d247b2848..6140e8a544 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py @@ -25,7 +25,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -51,7 +51,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -120,7 +120,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 120, @@ -189,7 +189,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py index cda5a27919..18b80b7f8d 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py @@ -27,7 +27,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -53,7 +53,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -122,7 +122,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 120, @@ -192,7 +192,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb.py index 17a427567e..c203350322 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb.py @@ -25,7 +25,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -51,7 +51,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -120,7 +120,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -190,7 +190,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeTransform-db.py b/tests/system-test/7-tmq/tmqVnodeTransform-db.py index 005bca70d6..5c61908d96 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform-db.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform-db.py @@ -20,7 +20,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -46,7 +46,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 30, @@ -138,7 +138,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 10, @@ -217,7 +217,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 10, diff --git a/tests/system-test/7-tmq/tmqVnodeTransform-stb.py b/tests/system-test/7-tmq/tmqVnodeTransform-stb.py index ec1331ae59..64cdf6d153 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform-stb.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform-stb.py @@ -20,7 +20,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -46,7 +46,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -137,7 +137,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -207,7 +207,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeTransform.py b/tests/system-test/7-tmq/tmqVnodeTransform.py index aab94bc7a2..3698297618 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform.py @@ -20,7 +20,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -46,7 +46,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -137,7 +137,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -203,7 +203,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/8-stream/stream_basic.py b/tests/system-test/8-stream/stream_basic.py new file mode 100644 index 0000000000..7f4d1d5ee3 --- /dev/null +++ b/tests/system-test/8-stream/stream_basic.py @@ -0,0 +1,110 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + + +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * +from util.sqlset import * +from util.autogen import * + +import random +import time +import traceback +import os +from os import path + + +class TDTestCase: + # init + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + + # autoGen + self.autoGen = AutoGen() + + def waitTranslation(self, waitSeconds): + # wait end + for i in range(waitSeconds): + sql ="show transactions;" + rows = tdSql.query(sql) + if rows == 0: + return True + tdLog.info(f"i={i} wait for translation finish ...") + time.sleep(1) + + return False + + def getPath(self, tool="taosBenchmark"): + if (platform.system().lower() == 'windows'): + tool = tool + ".exe" + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + paths = [] + for root, dirs, files in os.walk(projPath): + if ((tool) in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + paths.append(os.path.join(root, tool)) + break + if (len(paths) == 0): + tdLog.exit("taosBenchmark not found!") + return + else: + tdLog.info("taosBenchmark found in %s" % paths[0]) + return paths[0] + + def taosBenchmark(self, param): + binPath = self.getPath() + cmd = f"{binPath} {param}" + tdLog.info(cmd) + os.system(cmd) + + # run + def run(self): + # gen data + random.seed(int(time.time())) + self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y") + # create stream + tdSql.execute("use db") + tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True) + sql = "select count(*) from sta" + # loop wait max 60s to check count is ok + tdLog.info("loop wait result ...") + tdSql.checkDataLoop(0, 0, 99999, sql, loopCount=120, waitTime=0.5) + + # check all data is correct + sql = "select * from sta where cnt != 20;" + tdSql.query(sql) + tdSql.checkRows(0) + + # check ts interval is correct + sql = "select * from ( select diff(_wstart) as tsdif from sta ) where tsdif != 10;" + tdSql.query(sql) + tdSql.checkRows(0) + + # stop + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file