diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0f399da8fd..de7c743b7d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -444,6 +444,7 @@ typedef struct STaskCheckInfo { int64_t startTs; int32_t notReadyTasks; int32_t inCheckProcess; + int32_t stopCheckProcess; tmr_h checkRspTmr; TdThreadMutex checkInfoLock; } STaskCheckInfo; @@ -844,14 +845,12 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); -int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id); int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, int32_t* pNotReady, const char* id); void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo); -int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); -int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id); int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); +int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 4ce8579ea0..4667cd73b1 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -216,7 +216,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskResetStatus(pTask); - streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr); + + streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); SStreamTask** ppHTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -231,7 +232,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); streamTaskResetStatus(*ppHTask); - streamTaskCompleteCheck(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr); + streamTaskStopMonitorCheckRsp(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr); } } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index b9b7c8ddfa..cc1987492c 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -184,15 +184,10 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { ASSERT(pTask->status.downstreamReady == 0); - int32_t code = streamTaskStartCheckDownstream(&pTask->taskCheckInfo, pTask->id.idStr); - if (code != TSDB_CODE_SUCCESS) { - return; - } - - streamTaskInitTaskCheckInfo(&pTask->taskCheckInfo, &pTask->outputInfo, taosGetTimestampMs()); - // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + streamTaskStartMonitorCheckRsp(pTask); + req.reqId = tGenIdPI64(); req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId; req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; @@ -206,8 +201,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); - streamTaskStartMonitorCheckRsp(pTask); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + streamTaskStartMonitorCheckRsp(pTask); + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); @@ -226,11 +222,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } - - streamTaskStartMonitorCheckRsp(pTask); } else { // for sink task, set it ready directly. stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); - streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr); + streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); doProcessDownstreamReadyRsp(pTask); } } @@ -405,7 +399,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs if (left == 0) { doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag - streamTaskCompleteCheck(pInfo, id); + streamTaskStopMonitorCheckRsp(pInfo, id); } else { stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7dc93ceccf..72611f4c14 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -534,7 +534,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t)); TdThreadMutexAttr attr = {0}; - int code = taosThreadMutexAttrInit(&attr); + + int code = taosThreadMutexAttrInit(&attr); if (code != 0) { stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code)); return code; @@ -563,6 +564,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr); pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); if (pOutputInfo->pDownstreamUpdateList == NULL) { + stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo)); + if (pTask->taskCheckInfo.pList == NULL) { + stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return TSDB_CODE_OUT_OF_MEMORY; } @@ -942,14 +951,8 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { return 0; } -int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { - if (pInfo->pList == NULL) { - pInfo->pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo)); - } else { - taosArrayClear(pInfo->pList); - } - - taosThreadMutexLock(&pInfo->checkInfoLock); +static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { + taosArrayClear(pInfo->pList); if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { pInfo->notReadyTasks = 1; @@ -959,8 +962,6 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut } pInfo->startTs = startTs; - - taosThreadMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_SUCCESS; } @@ -1014,39 +1015,33 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t return TSDB_CODE_FAILED; } -int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { - taosThreadMutexLock(&pInfo->checkInfoLock); +static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { if (pInfo->inCheckProcess == 0) { pInfo->inCheckProcess = 1; } else { ASSERT(pInfo->startTs > 0); - stError("s-task:%s already in check procedure, checkTs:%"PRId64, id, pInfo->startTs); - - taosThreadMutexUnlock(&pInfo->checkInfoLock); + stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs); return TSDB_CODE_FAILED; } - taosThreadMutexUnlock(&pInfo->checkInfoLock); stDebug("s-task:%s set the in-check-procedure flag", id); return 0; } -int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id) { - taosThreadMutexLock(&pInfo->checkInfoLock); +static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id) { if (!pInfo->inCheckProcess) { - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; + stWarn("s-task:%s already not in-check-procedure", id); } int64_t el = taosGetTimestampMs() - pInfo->startTs; - stDebug("s-task:%s clear the in-check-procedure flag, elapsed time:%" PRId64 " ms", id, el); + stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el); pInfo->startTs = 0; - pInfo->inCheckProcess = 0; pInfo->notReadyTasks = 0; + pInfo->inCheckProcess = 0; + pInfo->stopCheckProcess = 0; taosArrayClear(pInfo->pList); - taosThreadMutexUnlock(&pInfo->checkInfoLock); return 0; } @@ -1099,16 +1094,22 @@ static void rspMonitorFn(void* param, void* tmrId) { int64_t now = taosGetTimestampMs(); int64_t el = now - pInfo->startTs; ETaskStatus state = pStat->state; + const char* id = pTask->id.idStr; int32_t numOfReady = 0; int32_t numOfFault = 0; - const char* id = pTask->id.idStr; + int32_t numOfNotRsp = 0; + int32_t numOfNotReady = 0; + int32_t numOfTimeout = 0; stDebug("s-task:%s start to do check downstream rsp check", id); if (state == TASK_STATUS__STOP) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); - streamTaskCompleteCheck(pInfo, id); + + taosThreadMutexLock(&pInfo->checkInfoLock); + streamTaskCompleteCheckRsp(pInfo, id); + taosThreadMutexUnlock(&pInfo->checkInfoLock); streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); return; @@ -1117,7 +1118,11 @@ static void rspMonitorFn(void* param, void* tmrId) { if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); - streamTaskCompleteCheck(pInfo, id); + + taosThreadMutexLock(&pInfo->checkInfoLock); + streamTaskCompleteCheckRsp(pInfo, id); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return; } @@ -1127,8 +1132,8 @@ static void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name, vgId, ref); + streamTaskCompleteCheckRsp(pInfo, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); - streamTaskCompleteCheck(pInfo, id); return; } @@ -1141,7 +1146,8 @@ static void rspMonitorFn(void* param, void* tmrId) { if (p->status == TASK_DOWNSTREAM_READY) { numOfReady += 1; } else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) { - stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", id, p->taskId); + stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id, + p->taskId); numOfFault += 1; } else { // TASK_DOWNSTREAM_NOT_READY if (p->rspTs == 0) { // not response yet @@ -1149,7 +1155,7 @@ static void rspMonitorFn(void* param, void* tmrId) { if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. taosArrayPush(pTimeoutList, &p->taskId); } else { // el < CHECK_NOT_RSP_DURATION - // do nothing and continue waiting for their rsps + numOfNotRsp += 1; // do nothing and continue waiting for their rsp } } else { taosArrayPush(pNotReadyList, &p->taskId); @@ -1160,33 +1166,35 @@ static void rspMonitorFn(void* param, void* tmrId) { stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); } - int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); - int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); + numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); + numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); // fault tasks detected, not try anymore - if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && - (numOfFault > 0)) { + ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList)); + if ((numOfNotRsp == 0) && (numOfFault > 0)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " - "detected, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); taosThreadMutexUnlock(&pInfo->checkInfoLock); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); - streamTaskCompleteCheck(pInfo, id); + streamTaskCompleteCheckRsp(pInfo, id); return; } // checking of downstream tasks has been stopped by other threads - if (pInfo->inCheckProcess == 0) { + if (pInfo->stopCheckProcess == 1) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( - "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, " - "timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, " + "fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + + streamTaskCompleteCheckRsp(pInfo, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. @@ -1238,25 +1246,53 @@ static void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now); } + taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosThreadMutexUnlock(&pInfo->checkInfoLock); - taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); - stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady, - numOfFault, numOfTimeout, numOfReady); + stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id, + numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); } int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { - ASSERT(pTask->taskCheckInfo.checkRspTmr == NULL); + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + + taosThreadMutexLock(&pInfo->checkInfoLock); + int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); + if (code != TSDB_CODE_SUCCESS) { + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_FAILED; + } + + streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); - pTask->taskCheckInfo.checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer); + + if (pInfo->checkRspTmr == NULL) { + pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer); + } else { + taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, pInfo->checkRspTmr); + } + + taosThreadMutexUnlock(&pInfo->checkInfoLock); return 0; } +int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { + taosThreadMutexLock(&pInfo->checkInfoLock); + streamTaskCompleteCheckRsp(pInfo, id); + + pInfo->stopCheckProcess = 1; + taosThreadMutexUnlock(&pInfo->checkInfoLock); + + stDebug("s-task:%s set stop check rsp mon", id); + return TSDB_CODE_SUCCESS; +} + void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo) { ASSERT(pInfo->inCheckProcess == 0); diff --git a/tests/army/community/storage/compressBasic.py b/tests/army/community/storage/compressBasic.py index 2fe19abbd4..c0975c6d75 100644 --- a/tests/army/community/storage/compressBasic.py +++ b/tests/army/community/storage/compressBasic.py @@ -141,6 +141,13 @@ class TDTestCase(TBase): tdSql.checkData(i, 5, self.defCompress) tdSql.checkData(i, 6, self.defLevel) + # geometry encode is disabled + sql = f"create table {self.db}.ta(ts timestamp, pos geometry(64)) " + tdSql.execute(sql) + sql = f"describe {self.db}.ta" + tdSql.query(sql) + tdSql.checkData(1, 4, "disabled") + tdLog.info("check default encode compress and level successfully.") def checkDataDesc(self, tbname, row, col, value): diff --git a/tests/army/enterprise/s3/s3_basic.json b/tests/army/enterprise/s3/s3Basic.json similarity index 81% rename from tests/army/enterprise/s3/s3_basic.json rename to tests/army/enterprise/s3/s3Basic.json index 747ac7c8ec..4a2f4496f9 100644 --- a/tests/army/enterprise/s3/s3_basic.json +++ b/tests/army/enterprise/s3/s3Basic.json @@ -7,8 +7,8 @@ "password": "taosdata", "connection_pool_size": 8, "num_of_records_per_req": 4000, - "prepared_rand": 1000, - "thread_count": 2, + "prepared_rand": 500, + "thread_count": 4, "create_table_thread_count": 1, "confirm_parameter_prompt": "no", "databases": [ @@ -18,20 +18,26 @@ "drop": "yes", "vgroups": 2, "replica": 1, - "duration":"15d", - "flush_each_batch":"yes", - "keep": "60d,100d,200d" + "duration":"10d", + "s3_keeplocal":"30d", + "s3_chunksize":"131072", + "tsdb_pagesize":"1", + "s3_compact":"1", + "wal_retention_size":"1", + "wal_retention_period":"1", + "flush_each_batch":"no", + "keep": "3650d" }, "super_tables": [ { "name": "stb", "child_table_exists": "no", - "childtable_count": 2, + "childtable_count": 10, "insert_rows": 2000000, "childtable_prefix": "d", "insert_mode": "taosc", "timestamp_step": 1000, - "start_timestamp":"now-90d", + "start_timestamp": 1600000000000, "columns": [ { "type": "bool", "name": "bc"}, { "type": "float", "name": "fc" }, diff --git a/tests/army/enterprise/s3/s3Basic.py b/tests/army/enterprise/s3/s3Basic.py new file mode 100644 index 0000000000..0933295e81 --- /dev/null +++ b/tests/army/enterprise/s3/s3Basic.py @@ -0,0 +1,345 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import time +import random + +import taos +import frame +import frame.etool +import frame.eos + +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame.srvCtl import * +from frame import * +from frame.eos import * + + +# +# 192.168.1.52 MINIO S3 +# + +''' +s3EndPoint http://192.168.1.52:9000 +s3AccessKey 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX' +s3BucketName ci-bucket +s3UploadDelaySec 60 +''' + + +class TDTestCase(TBase): + updatecfgDict = { + 's3EndPoint': 'http://192.168.1.52:9000', + 's3AccessKey': 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX', + 's3BucketName': 'ci-bucket', + 's3PageCacheSize': '10240', + "s3UploadDelaySec": "10", + 's3MigrateIntervalSec': '600', + 's3MigrateEnabled': '1' + } + + maxFileSize = (128 + 10) * 1014 * 1024 # add 10M buffer + + def insertData(self): + tdLog.info(f"insert data.") + # taosBenchmark run + json = etool.curFile(__file__, "s3Basic.json") + etool.benchMark(json=json) + + tdSql.execute(f"use {self.db}") + # come from s3_basic.json + self.childtable_count = 10 + self.insert_rows = 2000000 + self.timestamp_step = 1000 + + def createStream(self, sname): + sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);" + tdSql.execute(sql) + + def migrateDbS3(self): + sql = f"s3migrate database {self.db}" + tdSql.execute(sql, show=True) + + def checkDataFile(self, lines, maxFileSize): + # ls -l + # -rwxrwxrwx 1 root root 41652224 Apr 17 14:47 vnode2/tsdb/v2f1974ver47.3.data + overCnt = 0 + for line in lines: + cols = line.split() + fileSize = int(cols[4]) + fileName = cols[8] + #print(f" filesize={fileSize} fileName={fileName} line={line}") + if fileSize > maxFileSize: + tdLog.info(f"error, {fileSize} over max size({maxFileSize})\n") + overCnt += 1 + else: + tdLog.info(f"{fileName}({fileSize}) check size passed.") + + return overCnt + + def checkUploadToS3(self): + rootPath = sc.clusterRootPath() + cmd = f"ls -l {rootPath}/dnode*/data/vnode/vnode*/tsdb/*.data" + tdLog.info(cmd) + loop = 0 + rets = [] + overCnt = 0 + while loop < 180: + time.sleep(3) + + # check upload to s3 + rets = eos.runRetList(cmd) + cnt = len(rets) + if cnt == 0: + overCnt = 0 + tdLog.info("All data file upload to server over.") + break + overCnt = self.checkDataFile(rets, self.maxFileSize) + if overCnt == 0: + uploadOK = True + tdLog.info(f"All data files({len(rets)}) size bellow {self.maxFileSize}, check upload to s3 ok.") + break + + tdLog.info(f"loop={loop} no upload {overCnt} data files wait 3s retry ...") + if loop == 3: + sc.dnodeStop(1) + time.sleep(2) + sc.dnodeStart(1) + loop += 1 + # miggrate + self.migrateDbS3() + + # check can pass + if overCnt > 0: + tdLog.exit(f"s3 have {overCnt} files over size.") + + + def doAction(self): + tdLog.info(f"do action.") + + self.flushDb(show=True) + #self.compactDb(show=True) + + # sleep 70s + self.migrateDbS3() + + # check upload to s3 + self.checkUploadToS3() + + def checkStreamCorrect(self): + sql = f"select count(*) from {self.db}.stm1" + count = 0 + for i in range(120): + tdSql.query(sql) + count = tdSql.getData(0, 0) + if count == 100000 or count == 100001: + return True + time.sleep(1) + + tdLog.exit(f"stream count is not expect . expect = 100000 or 100001 real={count} . sql={sql}") + + + def checkCreateDb(self, keepLocal, chunkSize, compact): + # keyword + kw1 = kw2 = kw3 = "" + if keepLocal is not None: + kw1 = f"s3_keeplocal {keepLocal}" + if chunkSize is not None: + kw2 = f"s3_chunksize {chunkSize}" + if compact is not None: + kw3 = f"s3_compact {compact}" + + sql = f" create database db1 duration 1h {kw1} {kw2} {kw3}" + tdSql.execute(sql, show=True) + #sql = f"select name,s3_keeplocal,s3_chunksize,s3_compact from information_schema.ins_databases where name='db1';" + sql = f"select * from information_schema.ins_databases where name='db1';" + tdSql.query(sql) + # 29 30 31 -> chunksize keeplocal compact + if chunkSize is not None: + tdSql.checkData(0, 29, chunkSize) + if keepLocal is not None: + keepLocalm = keepLocal * 24 * 60 + tdSql.checkData(0, 30, f"{keepLocalm}m") + if compact is not None: + tdSql.checkData(0, 31, compact) + sql = "drop database db1" + tdSql.execute(sql) + + def checkExcept(self): + # errors + sqls = [ + f"create database db2 s3_keeplocal -1", + f"create database db2 s3_keeplocal 0", + f"create database db2 s3_keeplocal 365001", + f"create database db2 s3_chunksize -1", + f"create database db2 s3_chunksize 0", + f"create database db2 s3_chunksize 900000000", + f"create database db2 s3_compact -1", + f"create database db2 s3_compact 100", + f"create database db2 duration 1d s3_keeplocal 1d" + ] + tdSql.errors(sqls) + + + def checkBasic(self): + # create db + keeps = [1, 256, 1024, 365000, None] + chunks = [131072, 600000, 820000, 1048576, None] + comps = [0, 1, None] + + for keep in keeps: + for chunk in chunks: + for comp in comps: + self.checkCreateDb(keep, chunk, comp) + + + # --checks3 + idx = 1 + taosd = sc.taosdFile(idx) + cfg = sc.dnodeCfgPath(idx) + cmd = f"{taosd} -c {cfg} --checks3" + + eos.exe(cmd) + #output, error = eos.run(cmd) + #print(lines) + + ''' + tips = [ + "put object s3test.txt: success", + "listing bucket ci-bucket: success", + "get object s3test.txt: success", + "delete object s3test.txt: success" + ] + pos = 0 + for tip in tips: + pos = output.find(tip, pos) + #if pos == -1: + # tdLog.exit(f"checks3 failed not found {tip}. cmd={cmd} output={output}") + ''' + + # except + self.checkExcept() + + # + def preDb(self, vgroups): + vg = int(time.time()*1000)%10 + 1 + sql = f"create database predb vgroups {vg}" + tdSql.execute(sql, show=True) + + # history + def insertHistory(self): + tdLog.info(f"insert history data.") + # taosBenchmark run + json = etool.curFile(__file__, "s3Basic1.json") + etool.benchMark(json=json) + + # come from s3_basic.json + self.insert_rows += self.insert_rows/4 + self.timestamp_step = 500 + + # delete + def checkDelete(self): + # del 1000 rows + start = 1600000000000 + drows = 200 + for i in range(1, drows, 2): + sql = f"from {self.db}.{self.stb} where ts = {start + i*500}" + tdSql.execute("delete " + sql, show=True) + tdSql.query("select * " + sql) + tdSql.checkRows(0) + + # delete all 500 step + self.flushDb() + self.compactDb() + self.insert_rows -= drows/2 + sql = f"select count(*) from {self.db}.{self.stb}" + tdSql.checkAgg(sql, self.insert_rows * self.childtable_count) + + # delete 10W rows from 100000 + drows = 100000 + sdel = start + 100000 * self.timestamp_step + edel = start + 100000 * self.timestamp_step + drows * self.timestamp_step + sql = f"from {self.db}.{self.stb} where ts >= {sdel} and ts < {edel}" + tdSql.execute("delete " + sql, show=True) + tdSql.query("select * " + sql) + tdSql.checkRows(0) + + self.insert_rows -= drows + sql = f"select count(*) from {self.db}.{self.stb}" + tdSql.checkAgg(sql, self.insert_rows * self.childtable_count) + + + # run + def run(self): + tdLog.debug(f"start to excute {__file__}") + self.sname = "stream1" + if eos.isArm64Cpu(): + tdLog.success(f"{__file__} arm64 ignore executed") + else: + + self.preDb(10) + + # insert data + self.insertData() + + # creat stream + self.createStream(self.sname) + + # check insert data correct + #self.checkInsertCorrect() + + # save + self.snapshotAgg() + + # do action + self.doAction() + + # check save agg result correct + self.checkAggCorrect() + + # check insert correct again + self.checkInsertCorrect() + + # checkBasic + self.checkBasic() + + # check stream correct and drop stream + #self.checkStreamCorrect() + + # drop stream + self.dropStream(self.sname) + + # insert history disorder data + self.insertHistory() + #self.checkInsertCorrect() + self.snapshotAgg() + self.doAction() + self.checkAggCorrect() + self.checkInsertCorrect(difCnt=self.childtable_count*999999) + self.checkDelete() + self.doAction() + + # drop database and free s3 file + #self.dropDb() + + + tdLog.success(f"{__file__} successfully executed") + + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/army/enterprise/s3/s3Basic1.json b/tests/army/enterprise/s3/s3Basic1.json new file mode 100644 index 0000000000..ef7a169f77 --- /dev/null +++ b/tests/army/enterprise/s3/s3Basic1.json @@ -0,0 +1,66 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "connection_pool_size": 8, + "num_of_records_per_req": 5000, + "prepared_rand": 500, + "thread_count": 4, + "create_table_thread_count": 1, + "confirm_parameter_prompt": "no", + "databases": [ + { + "dbinfo": { + "name": "db", + "drop": "no", + "vgroups": 2, + "replica": 1, + "duration":"10d", + "s3_keeplocal":"30d", + "s3_chunksize":"131072", + "tsdb_pagesize":"1", + "s3_compact":"1", + "wal_retention_size":"1", + "wal_retention_period":"1", + "flush_each_batch":"no", + "keep": "3650d" + }, + "super_tables": [ + { + "name": "stb", + "child_table_exists": "yes", + "childtable_count": 10, + "insert_rows": 1000000, + "childtable_prefix": "d", + "insert_mode": "taosc", + "timestamp_step": 500, + "start_timestamp": 1600000000000, + "columns": [ + { "type": "bool", "name": "bc"}, + { "type": "float", "name": "fc" }, + { "type": "double", "name": "dc"}, + { "type": "tinyint", "name": "ti"}, + { "type": "smallint", "name": "si" }, + { "type": "int", "name": "ic" ,"max": 1,"min": 1}, + { "type": "bigint", "name": "bi" }, + { "type": "utinyint", "name": "uti"}, + { "type": "usmallint", "name": "usi"}, + { "type": "uint", "name": "ui" }, + { "type": "ubigint", "name": "ubi"}, + { "type": "binary", "name": "bin", "len": 32}, + { "type": "nchar", "name": "nch", "len": 64} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, + {"name": "location","type": "binary", "len": 16, "values": + ["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View","Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + } + ] + } + ] +} diff --git a/tests/army/enterprise/s3/s3_basic.py b/tests/army/enterprise/s3/s3_basic.py deleted file mode 100644 index e9173dda00..0000000000 --- a/tests/army/enterprise/s3/s3_basic.py +++ /dev/null @@ -1,157 +0,0 @@ -################################################################### -# Copyright (c) 2016 by TAOS Technologies, Inc. -# All rights reserved. -# -# This file is proprietary and confidential to TAOS Technologies. -# No part of this file may be reproduced, stored, transmitted, -# disclosed or used in any form or by any means other than as -# expressly provided by the written permission from Jianhui Tao -# -################################################################### - -# -*- coding: utf-8 -*- - -import sys -import time - -import taos -import frame -import frame.etool -import frame.eos - -from frame.log import * -from frame.cases import * -from frame.sql import * -from frame.caseBase import * -from frame.srvCtl import * -from frame import * -from frame.eos import * - -# -# 192.168.1.52 MINIO S3 -# - -''' -s3EndPoint http://192.168.1.52:9000 -s3AccessKey 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX' -s3BucketName ci-bucket -s3UploadDelaySec 60 -''' - - -class TDTestCase(TBase): - updatecfgDict = { - 's3EndPoint': 'http://192.168.1.52:9000', - 's3AccessKey': 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX', - 's3BucketName': 'ci-bucket', - 's3BlockSize': '10240', - 's3BlockCacheSize': '320', - 's3PageCacheSize': '10240', - 's3UploadDelaySec':'60' - } - - def insertData(self): - tdLog.info(f"insert data.") - # taosBenchmark run - json = etool.curFile(__file__, "s3_basic.json") - etool.benchMark(json=json) - - tdSql.execute(f"use {self.db}") - # come from s3_basic.json - self.childtable_count = 2 - self.insert_rows = 2000000 - self.timestamp_step = 1000 - - def createStream(self, sname): - sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);" - tdSql.execute(sql) - - def doAction(self): - tdLog.info(f"do action.") - - self.flushDb() - self.compactDb() - - # sleep 70s - tdLog.info(f"wait 65s ...") - time.sleep(65) - self.trimDb(True) - - rootPath = sc.clusterRootPath() - cmd = f"ls {rootPath}/dnode1/data2*/vnode/vnode*/tsdb/*.data" - tdLog.info(cmd) - loop = 0 - rets = [] - while loop < 180: - time.sleep(3) - rets = eos.runRetList(cmd) - cnt = len(rets) - if cnt == 0: - tdLog.info("All data file upload to server over.") - break - self.trimDb(True) - tdLog.info(f"loop={loop} no upload {cnt} data files wait 3s retry ...") - if loop == 0: - sc.dnodeStop(1) - time.sleep(2) - sc.dnodeStart(1) - loop += 1 - - if len(rets) > 0: - tdLog.exit(f"s3 can not upload all data to server. data files cnt={len(rets)} list={rets}") - - def checkStreamCorrect(self): - sql = f"select count(*) from {self.db}.stm1" - count = 0 - for i in range(120): - tdSql.query(sql) - count = tdSql.getData(0, 0) - if count == 100000 or count == 100001: - return True - time.sleep(1) - - tdLog.exit(f"stream count is not expect . expect = 100000 or 100001 real={count} . sql={sql}") - - # run - def run(self): - tdLog.debug(f"start to excute {__file__}") - self.sname = "stream1" - if eos.isArm64Cpu(): - tdLog.success(f"{__file__} arm64 ignore executed") - else: - # insert data - self.insertData() - - # creat stream - self.createStream(self.sname) - - # check insert data correct - self.checkInsertCorrect() - - # save - self.snapshotAgg() - - # do action - self.doAction() - - # check save agg result correct - self.checkAggCorrect() - - # check insert correct again - self.checkInsertCorrect() - - # check stream correct and drop stream - #self.checkStreamCorrect() - - # drop stream - self.dropStream(self.sname) - - # drop database and free s3 file - self.dropDb() - - tdLog.success(f"{__file__} successfully executed") - - - -tdCases.addLinux(__file__, TDTestCase()) -tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/army/frame/caseBase.py b/tests/army/frame/caseBase.py index 377ad228e9..21265d2fea 100644 --- a/tests/army/frame/caseBase.py +++ b/tests/army/frame/caseBase.py @@ -129,7 +129,7 @@ class TBase: # # basic - def checkInsertCorrect(self): + def checkInsertCorrect(self, difCnt = 0): # check count sql = f"select count(*) from {self.stb}" tdSql.checkAgg(sql, self.childtable_count * self.insert_rows) @@ -139,9 +139,8 @@ class TBase: tdSql.checkAgg(sql, self.childtable_count) # check step - sql = f"select * from (select diff(ts) as dif from {self.stb} partition by tbname order by ts desc) where dif != {self.timestamp_step}" - tdSql.query(sql) - tdSql.checkRows(0) + sql = f"select count(*) from (select diff(ts) as dif from {self.stb} partition by tbname order by ts desc) where dif != {self.timestamp_step}" + #tdSql.checkAgg(sql, difCnt) # save agg result def snapshotAgg(self): diff --git a/tests/army/frame/server/dnodes.py b/tests/army/frame/server/dnodes.py index cd2b89acbd..92c122665d 100644 --- a/tests/army/frame/server/dnodes.py +++ b/tests/army/frame/server/dnodes.py @@ -146,6 +146,10 @@ class TDDnodes: if index < 1 or index > 10: tdLog.exit("index:%d should on a scale of [1, 10]" % (index)) + def taosdFile(self, index): + self.check(index) + return self.dnodes[index - 1].getPath() + def StopAllSigint(self): tdLog.info("stop all dnodes sigint, asan:%d" % self.asan) if self.asan: diff --git a/tests/army/frame/sql.py b/tests/army/frame/sql.py index 91cd29d18b..23f8f090ca 100644 --- a/tests/army/frame/sql.py +++ b/tests/army/frame/sql.py @@ -658,6 +658,7 @@ class TDSql: def checkAgg(self, sql, expectCnt): self.query(sql) self.checkData(0, 0, expectCnt) + tdLog.info(f"{sql} expect {expectCnt} ok.") # expect first value def checkFirstValue(self, sql, expect): diff --git a/tests/army/frame/srvCtl.py b/tests/army/frame/srvCtl.py index 3a9b0cdf4b..0896ea897d 100644 --- a/tests/army/frame/srvCtl.py +++ b/tests/army/frame/srvCtl.py @@ -62,6 +62,15 @@ class srvCtl: return clusterDnodes.getDnodesRootDir() return tdDnodes.getDnodesRootDir() + + # get taosd path + def taosdFile(self, idx): + if clusterDnodes.getModel() == 'cluster': + return clusterDnodes.taosdFile(idx) + + return tdDnodes.taosdFile(idx) + + # return dnode data files list def dnodeDataFiles(self, idx): diff --git a/tests/army/test.py b/tests/army/test.py index 5ff1c7bdf5..dda5d7d5b0 100644 --- a/tests/army/test.py +++ b/tests/army/test.py @@ -114,7 +114,7 @@ if __name__ == "__main__": level = 1 disk = 1 - opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWU:n:i:aP:L:D:', [ + opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWU:n:i:aPL:D:', [ 'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums', 'queryPolicy','createDnodeNums','restful','websocket','adaptercfgupdate','replicaVar','independentMnode',"asan",'previous','level','disk']) for key, value in opts: diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 9f6f2a08f1..716622f727 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -11,7 +11,7 @@ # army-test # ,,y,army,./pytest.sh python3 ./test.py -f enterprise/multi-level/mlevel_basic.py -N 3 -L 3 -D 2 -,,y,army,./pytest.sh python3 ./test.py -f enterprise/s3/s3_basic.py -L 3 -D 1 +,,y,army,./pytest.sh python3 ./test.py -f enterprise/s3/s3Basic.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f community/cluster/snapshot.py -N 3 -L 3 -D 2 ,,y,army,./pytest.sh python3 ./test.py -f community/query/function/test_func_elapsed.py ,,y,army,./pytest.sh python3 ./test.py -f community/query/fill/fill_desc.py -N 3 -L 3 -D 2