diff --git a/include/common/tglobal.h b/include/common/tglobal.h index e6333d2ddc..8180243ff0 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -291,6 +291,7 @@ extern bool tsFilterScalarMode; extern int32_t tsMaxStreamBackendCache; extern int32_t tsPQSortMemThreshold; extern int32_t tsResolveFQDNRetryTime; +extern bool tsStreamCoverage; extern bool tsExperimental; // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 905dcb4fda..b732b7140a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -328,6 +328,7 @@ int64_t tsStreamBufferSize = 128 * 1024 * 1024; bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds int tsStreamAggCnt = 100000; +bool tsStreamCoverage = false; bool tsUpdateCacheBatch = true; @@ -733,6 +734,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tsmaDataDeleteMark", tsmaDataDeleteMark, 60 * 60 * 1000, INT64_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); + + TAOS_CHECK_RETURN(cfgAddBool(pCfg, "streamCoverage", tsStreamCoverage, CFG_DYN_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); + TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -1463,6 +1467,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "bypassFlag"); tsBypassFlag = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamCoverage"); + tsStreamCoverage = pItem->bval; + TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -2735,7 +2742,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { {"maxTsmaCalcDelay", &tsMaxTsmaCalcDelay}, {"tsmaDataDeleteMark", &tsmaDataDeleteMark}, {"numOfRpcSessions", &tsNumOfRpcSessions}, - {"bypassFlag", &tsBypassFlag}}; + {"bypassFlag", &tsBypassFlag}, + {"streamCoverage", &tsStreamCoverage}}; if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) { code = taosCfgSetOption(options, tListLen(options), pItem, false); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index feaa6910f2..342bd6d66e 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -11411,7 +11411,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm if (pSelect->hasInterpFunc) { // Temporary code - if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + if (tsStreamCoverage == false && pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Stream interp function only support force window close"); } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index c25b0f34fc..41ac0117f3 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -250,6 +250,9 @@ void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId); // inject stream errors void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId); +int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type); +int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 876cd1b472..0ec66cd2ce 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -21,7 +21,9 @@ static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t deleteCheckpointFile(const char* id, const char* name); static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId); +#ifdef BUILD_NO_CALL static int32_t deleteCheckpoint(const char* id); +#endif static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, @@ -998,7 +1000,7 @@ static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** return 0; } -static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList) { +int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList) { const char* id = pTask->id.idStr; SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; @@ -1492,6 +1494,7 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t che return 0; } +#ifdef BUILD_NO_CALL int32_t deleteCheckpoint(const char* id) { if (id == NULL || strlen(id) == 0) { stError("deleteCheckpoint parameters invalid"); @@ -1504,6 +1507,7 @@ int32_t deleteCheckpoint(const char* id) { } return 0; } +#endif int32_t deleteCheckpointFile(const char* id, const char* name) { char object[128] = {0}; diff --git a/source/libs/stream/test/streamCheckPointTest.cpp b/source/libs/stream/test/streamCheckPointTest.cpp new file mode 100644 index 0000000000..80dd3ec142 --- /dev/null +++ b/source/libs/stream/test/streamCheckPointTest.cpp @@ -0,0 +1,270 @@ +#include +#include "tstream.h" +#include "streamInt.h" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" +#pragma GCC diagnostic ignored "-Wsign-compare" +#pragma GCC diagnostic ignored "-Wformat" +#pragma GCC diagnostic ignored "-Wint-to-pointer-cast" +#pragma GCC diagnostic ignored "-Wpointer-arith" + +void initTaskLock(SStreamTask* pTask) { + TdThreadMutexAttr attr = {0}; + int32_t code = taosThreadMutexAttrInit(&attr); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = taosThreadMutexInit(&pTask->lock, &attr); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = taosThreadMutexAttrDestroy(&attr); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); +} + +TEST(streamCheckpointTest, StreamTaskProcessCheckpointTriggerRsp) { + SStreamTask* pTask = NULL; + int64_t uid = 1111111111111111; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 123111; + pTask->chkInfo.pActiveInfo->transId = 4561111; + + streamTaskSetStatusReady(pTask); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SCheckpointTriggerRsp pRsp; + memset(&pRsp, 0, sizeof(SCheckpointTriggerRsp)); + pRsp.rspCode = TSDB_CODE_SUCCESS; + pRsp.checkpointId = 123; + pRsp.transId = 456; + pRsp.upstreamTaskId = 789; + + code = streamTaskProcessCheckpointTriggerRsp(pTask, &pRsp); + ASSERT_NE(code, TSDB_CODE_SUCCESS); + + pRsp.rspCode = TSDB_CODE_FAILED; + code = streamTaskProcessCheckpointTriggerRsp(pTask, &pRsp); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + tFreeStreamTask(pTask); + taosArrayDestroy(array); +} + +TEST(streamCheckpointTest, StreamTaskSetFailedCheckpointId) { + SStreamTask* pTask = NULL; + int64_t uid = 1111111111111111; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + pInfo->failedId = 0; + + int64_t failedCheckpointId = 123; + + streamTaskSetFailedCheckpointId(pTask, failedCheckpointId); + ASSERT_EQ(pInfo->failedId, failedCheckpointId); + + streamTaskSetFailedCheckpointId(pTask, 0); + ASSERT_EQ(pInfo->failedId, failedCheckpointId); + + streamTaskSetFailedCheckpointId(pTask, pInfo->failedId - 1); + ASSERT_EQ(pInfo->failedId, failedCheckpointId); + tFreeStreamTask(pTask); + taosArrayDestroy(array); +} + +TEST(UploadCheckpointDataTest, UploadSuccess) { + streamMetaInit(); + SStreamTask* pTask = NULL; + int64_t uid = 1111111111111111; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + int64_t checkpointId = 123; + int64_t dbRefId = 1; + ECHECKPOINT_BACKUP_TYPE type = DATA_UPLOAD_S3; + + STaskDbWrapper* pBackend = NULL; + int64_t processVer = -1; + const char *path = "/tmp/backend3/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0); + ASSERT(pState != NULL); + + pTask->pBackend = pState->pTdbState->pOwner->pBackend; + + code = taskDbDoCheckpoint(pTask->pBackend, checkpointId, 0); + ASSERT(code == 0); + + int32_t result = uploadCheckpointData(pTask, checkpointId, dbRefId, type); + + EXPECT_EQ(result, TSDB_CODE_SUCCESS) << "uploadCheckpointData should return 0 on success"; + tFreeStreamTask(pTask); + taosRemoveDir(path); + streamStateClose(pState, true); + taosArrayDestroy(array); +} + +TEST(UploadCheckpointDataTest, UploadDisabled) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + int64_t checkpointId = 123; + int64_t dbRefId = 1; + + STaskDbWrapper* pBackend = NULL; + int64_t processVer = -1; + const char *path = "/tmp/backend4/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0); + ASSERT(pState != NULL); + + pTask->pBackend = pState->pTdbState->pOwner->pBackend; + + code = taskDbDoCheckpoint(pTask->pBackend, checkpointId, 0); + ASSERT(code == 0); + + ECHECKPOINT_BACKUP_TYPE type = DATA_UPLOAD_DISABLE; + + int32_t result = uploadCheckpointData(pTask, checkpointId, dbRefId, type); + + EXPECT_NE(result, TSDB_CODE_SUCCESS) << "uploadCheckpointData should return 0 when backup type is disabled"; + + streamStateClose(pState, true); + tFreeStreamTask(pTask); + taosArrayDestroy(array); +} + +TEST(StreamTaskAlreadySendTriggerTest, AlreadySendTrigger) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 123111; + pTask->chkInfo.pActiveInfo->transId = 4561111; + + streamTaskSetStatusReady(pTask); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + int32_t downstreamNodeId = 1; + int64_t sendingCheckpointId = 123; + TSKEY ts = taosGetTimestampMs(); + + STaskTriggerSendInfo triggerInfo; + triggerInfo.sendTs = ts; + triggerInfo.recved = false; + triggerInfo.nodeId = downstreamNodeId; + + taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo); + + pTask->chkInfo.pActiveInfo->dispatchTrigger = true; + bool result = streamTaskAlreadySendTrigger(pTask, downstreamNodeId); + + EXPECT_TRUE(result) << "The trigger message should have been sent to the downstream node"; + + tFreeStreamTask(pTask); + taosArrayDestroy(array); +} + +TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + const char *path = "/tmp/backend5/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 123111; + pTask->chkInfo.pActiveInfo->chkptTriggerMsgTmr.launchChkptId = pTask->chkInfo.pActiveInfo->activeId; + pTask->chkInfo.pActiveInfo->transId = 4561111; + pTask->chkInfo.startTs = 11111; + + streamTaskSetStatusReady(pTask); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + int32_t downstreamNodeId = 1; + int64_t sendingCheckpointId = 123; + TSKEY ts = taosGetTimestampMs(); + + STaskTriggerSendInfo triggerInfo; + triggerInfo.sendTs = ts; + triggerInfo.recved = false; + triggerInfo.nodeId = downstreamNodeId; + + taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo); + + pTask->chkInfo.pActiveInfo->dispatchTrigger = true; + SArray* array1 = NULL; + code = chkptTriggerRecvMonitorHelper(pTask, NULL, &array1); + EXPECT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->pMeta->fatalInfo.code = TSDB_CODE_SUCCESS; + streamSetFatalError(pTask->pMeta, code, __func__, __LINE__); + + pTask->pMeta->fatalInfo.code = TSDB_CODE_FAILED; + streamSetFatalError(pTask->pMeta, code, __func__, __LINE__); + tFreeStreamTask(pTask); + taosArrayDestroy(array); + taosArrayDestroy(array1); +} diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 00a17c8c96..1aef2195db 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1420,34 +1420,36 @@ ,,y,script,./test.sh -f tsim/stream/sliding.sim ,,y,script,./test.sh -f tsim/stream/state0.sim ,,y,script,./test.sh -f tsim/stream/state1.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpDelete0.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpDelete1.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpDelete2.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpError.sim +,,y,script,./test.sh -f tsim/stream/streamInterpDelete0.sim +,,y,script,./test.sh -f tsim/stream/streamInterpDelete1.sim +,,y,script,./test.sh -f tsim/stream/streamInterpDelete2.sim +,,y,script,./test.sh -f tsim/stream/streamInterpError.sim ,,y,script,./test.sh -f tsim/stream/streamInterpForceWindowClose.sim ,,y,script,./test.sh -f tsim/stream/streamInterpForceWindowClose1.sim ,,y,script,./test.sh -f tsim/stream/streamInterpFwcError.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpHistory.sim +,,y,script,./test.sh -f tsim/stream/streamInterpHistory.sim #,,y,script,./test.sh -f tsim/stream/streamInterpHistory1.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpLarge.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpLinear0.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpNext0.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpOther.sim +,,y,script,./test.sh -f tsim/stream/streamInterpLarge.sim +,,y,script,./test.sh -f tsim/stream/streamInterpLinear0.sim +,,y,script,./test.sh -f tsim/stream/streamInterpNext0.sim +,,y,script,./test.sh -f tsim/stream/streamInterpOther.sim #,,y,script,./test.sh -f tsim/stream/streamInterpOther1.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpPartitionBy0.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPartitionBy0.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPartitionBy1.sim #,,y,script,./test.sh -f tsim/stream/streamInterpPrev0.sim #,,y,script,./test.sh -f tsim/stream/streamInterpPrev1.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey0.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey1.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey2.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey3.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpUpdate.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpUpdate1.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpValue0.sim -#,,y,script,./test.sh -f tsim/stream/streamPrimaryKey0.sim -#,,y,script,./test.sh -f tsim/stream/streamPrimaryKey1.sim -#,,y,script,./test.sh -f tsim/stream/streamPrimaryKey2.sim -#,,y,script,./test.sh -f tsim/stream/streamPrimaryKey3.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey0.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey1.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey2.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey3.sim +,,y,script,./test.sh -f tsim/stream/streamInterpUpdate.sim +,,y,script,./test.sh -f tsim/stream/streamInterpUpdate1.sim +,,y,script,./test.sh -f tsim/stream/streamInterpUpdate2.sim +,,y,script,./test.sh -f tsim/stream/streamInterpValue0.sim +,,y,script,./test.sh -f tsim/stream/streamPrimaryKey0.sim +,,y,script,./test.sh -f tsim/stream/streamPrimaryKey1.sim +,,y,script,./test.sh -f tsim/stream/streamPrimaryKey2.sim +,,y,script,./test.sh -f tsim/stream/streamPrimaryKey3.sim ,,y,script,./test.sh -f tsim/stream/streamTwaError.sim ,,y,script,./test.sh -f tsim/stream/streamTwaFwcFill.sim ,,y,script,./test.sh -f tsim/stream/streamTwaFwcFillPrimaryKey.sim diff --git a/tests/script/tsim/stream/streamInterpDelete0.sim b/tests/script/tsim/stream/streamInterpDelete0.sim index 21bac13e4a..440d7ce413 100644 --- a/tests/script/tsim/stream/streamInterpDelete0.sim +++ b/tests/script/tsim/stream/streamInterpDelete0.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpDelete1.sim b/tests/script/tsim/stream/streamInterpDelete1.sim index 162da175e8..9413cf8918 100644 --- a/tests/script/tsim/stream/streamInterpDelete1.sim +++ b/tests/script/tsim/stream/streamInterpDelete1.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpDelete2.sim b/tests/script/tsim/stream/streamInterpDelete2.sim index be27dcda49..fb53678eff 100644 --- a/tests/script/tsim/stream/streamInterpDelete2.sim +++ b/tests/script/tsim/stream/streamInterpDelete2.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpError.sim b/tests/script/tsim/stream/streamInterpError.sim index 53a92df772..f0f4e80ade 100644 --- a/tests/script/tsim/stream/streamInterpError.sim +++ b/tests/script/tsim/stream/streamInterpError.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step2 sql create database test2 vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpHistory.sim b/tests/script/tsim/stream/streamInterpHistory.sim index b9685ebf05..9737e7d155 100644 --- a/tests/script/tsim/stream/streamInterpHistory.sim +++ b/tests/script/tsim/stream/streamInterpHistory.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpLarge.sim b/tests/script/tsim/stream/streamInterpLarge.sim index 85203d2d9e..2626f49b6a 100644 --- a/tests/script/tsim/stream/streamInterpLarge.sim +++ b/tests/script/tsim/stream/streamInterpLarge.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpLinear0.sim b/tests/script/tsim/stream/streamInterpLinear0.sim index 7d4b28d545..c52540895b 100644 --- a/tests/script/tsim/stream/streamInterpLinear0.sim +++ b/tests/script/tsim/stream/streamInterpLinear0.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpNext0.sim b/tests/script/tsim/stream/streamInterpNext0.sim index abdbeda634..4395031aec 100644 --- a/tests/script/tsim/stream/streamInterpNext0.sim +++ b/tests/script/tsim/stream/streamInterpNext0.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpOther.sim b/tests/script/tsim/stream/streamInterpOther.sim index 8553e67ec8..4572bfca56 100644 --- a/tests/script/tsim/stream/streamInterpOther.sim +++ b/tests/script/tsim/stream/streamInterpOther.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 4; diff --git a/tests/script/tsim/stream/streamInterpPartitionBy0.sim b/tests/script/tsim/stream/streamInterpPartitionBy0.sim index 6b222de228..543bb48a1c 100644 --- a/tests/script/tsim/stream/streamInterpPartitionBy0.sim +++ b/tests/script/tsim/stream/streamInterpPartitionBy0.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step prev print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpPartitionBy1.sim b/tests/script/tsim/stream/streamInterpPartitionBy1.sim index ecb5e0ee62..c8138ac05f 100644 --- a/tests/script/tsim/stream/streamInterpPartitionBy1.sim +++ b/tests/script/tsim/stream/streamInterpPartitionBy1.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step NULL print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpPrimaryKey0.sim b/tests/script/tsim/stream/streamInterpPrimaryKey0.sim index 9edddff6db..1bbc2a9b5d 100644 --- a/tests/script/tsim/stream/streamInterpPrimaryKey0.sim +++ b/tests/script/tsim/stream/streamInterpPrimaryKey0.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpPrimaryKey1.sim b/tests/script/tsim/stream/streamInterpPrimaryKey1.sim index 04a1f299be..0db33c9767 100644 --- a/tests/script/tsim/stream/streamInterpPrimaryKey1.sim +++ b/tests/script/tsim/stream/streamInterpPrimaryKey1.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpPrimaryKey2.sim b/tests/script/tsim/stream/streamInterpPrimaryKey2.sim index f06e1ecd03..0574a1ceec 100644 --- a/tests/script/tsim/stream/streamInterpPrimaryKey2.sim +++ b/tests/script/tsim/stream/streamInterpPrimaryKey2.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpPrimaryKey3.sim b/tests/script/tsim/stream/streamInterpPrimaryKey3.sim index 725cf8d850..23cb0a58e6 100644 --- a/tests/script/tsim/stream/streamInterpPrimaryKey3.sim +++ b/tests/script/tsim/stream/streamInterpPrimaryKey3.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpUpdate.sim b/tests/script/tsim/stream/streamInterpUpdate.sim index 59a188c2a6..394ac1a341 100644 --- a/tests/script/tsim/stream/streamInterpUpdate.sim +++ b/tests/script/tsim/stream/streamInterpUpdate.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpUpdate1.sim b/tests/script/tsim/stream/streamInterpUpdate1.sim index 45f16af35d..3987afa21e 100644 --- a/tests/script/tsim/stream/streamInterpUpdate1.sim +++ b/tests/script/tsim/stream/streamInterpUpdate1.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpUpdate2.sim b/tests/script/tsim/stream/streamInterpUpdate2.sim index 2a71474dd7..cde5b589e8 100644 --- a/tests/script/tsim/stream/streamInterpUpdate2.sim +++ b/tests/script/tsim/stream/streamInterpUpdate2.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpValue0.sim b/tests/script/tsim/stream/streamInterpValue0.sim index bce7f0ece6..2cbf61f4bd 100644 --- a/tests/script/tsim/stream/streamInterpValue0.sim +++ b/tests/script/tsim/stream/streamInterpValue0.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1;