diff --git a/docs/en/06-advanced/03-stream.md b/docs/en/06-advanced/03-stream.md index 3db3a3a18e..9c8dd44655 100644 --- a/docs/en/06-advanced/03-stream.md +++ b/docs/en/06-advanced/03-stream.md @@ -30,7 +30,7 @@ stream_options: { WATERMARK time IGNORE EXPIRED [0|1] DELETE_MARK time - FILL_HISTORY [0|1] + FILL_HISTORY [0|1] [ASYNC] IGNORE UPDATE [0|1] } @@ -109,7 +109,7 @@ Under normal circumstances, stream computation tasks will not process data that By enabling the fill_history option, the created stream computation task will be capable of processing data written before, during, and after the creation of the stream. This means that data written either before or after the creation of the stream will be included in the scope of stream computation, thus ensuring data integrity and consistency. This setting provides users with greater flexibility, allowing them to flexibly handle historical and new data according to actual needs. Tips: -- When enabling fill_ristory, creating a stream requires finding the boundary point of historical data. If there is a lot of historical data, it may cause the task of creating a stream to take a long time. In this case, the parameter streamRunHistorySync (supported since version 3.3.6.0) can be configured to 1 (default is 0), and the task of creating a stream can be processed in the background. The statement of creating a stream can be returned immediately without blocking subsequent operations. +- When enabling fill_history, creating a stream requires finding the boundary point of historical data. If there is a lot of historical data, it may cause the task of creating a stream to take a long time. In this case, you can use fill_history 1 async (supported since version 3.3.6.0) , then the task of creating a stream can be processed in the background. The statement of creating a stream can be returned immediately without blocking subsequent operations. async only takes effect when fill_history 1 is used, and creating a stream with fill_history 0 is very fast and does not require asynchronous processing. - Show streams can be used to view the progress of background stream creation (ready status indicates success, init status indicates stream creation in progress, failed status indicates that the stream creation has failed, and the message column can be used to view the reason for the failure. In the case of failed stream creation, the stream can be deleted and rebuilt). diff --git a/docs/en/14-reference/01-components/02-taosc.md b/docs/en/14-reference/01-components/02-taosc.md index 69c06a5c23..83cb3d765b 100644 --- a/docs/en/14-reference/01-components/02-taosc.md +++ b/docs/en/14-reference/01-components/02-taosc.md @@ -72,12 +72,6 @@ The TDengine client driver provides all the APIs needed for application programm | tempDir | |Supported, effective immediately | Specifies the directory for generating temporary files during operation, default on Linux platform is /tmp | | minimalTmpDirGB | |Supported, effective immediately | Minimum space required to be reserved in the directory specified by tempDir, in GB, default value: 1 | -### Stream Related - -| Parameter Name |Supported Version|Dynamic Modification| Description | -|-----------------------|----------|--------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| streamRunHistoryAsync | 3.3.6.0 |Supported, effective immediately | When creating a stream with the fill_history parameter, should the stream statement be executed asynchronously. Boolean value, async if true, sync if false. default is false | - ### Log Related |Parameter Name|Supported Version|Dynamic Modification|Description| diff --git a/docs/en/14-reference/03-taos-sql/14-stream.md b/docs/en/14-reference/03-taos-sql/14-stream.md index 24d9bae468..6ff081b21e 100644 --- a/docs/en/14-reference/03-taos-sql/14-stream.md +++ b/docs/en/14-reference/03-taos-sql/14-stream.md @@ -15,7 +15,7 @@ stream_options: { WATERMARK time IGNORE EXPIRED [0|1] DELETE_MARK time - FILL_HISTORY [0|1] + FILL_HISTORY [0|1] [ASYNC] IGNORE UPDATE [0|1] } @@ -127,6 +127,13 @@ create stream if not exists s1 fill_history 1 into st1 as select count(*) from If the stream task is completely outdated and you no longer want it to monitor or process data, you can manually delete it. The computed data will still be retained. +Tips: +- When enabling fill_history, creating a stream requires finding the boundary point of historical data. If there is a lot of historical data, it may cause the task of creating a stream to take a long time. In this case, you can use fill_history 1 async (supported since version 3.3.6.0) , then the task of creating a stream can be processed in the background. The statement of creating a stream can be returned immediately without blocking subsequent operations. async only takes effect when fill_history 1 is used, and creating a stream with fill_history 0 is very fast and does not require asynchronous processing. + +- Show streams can be used to view the progress of background stream creation (ready status indicates success, init status indicates stream creation in progress, failed status indicates that the stream creation has failed, and the message column can be used to view the reason for the failure. In the case of failed stream creation, the stream can be deleted and rebuilt). + +- Besides, do not create multiple streams asynchronously at the same time, as transaction conflicts may cause subsequent streams to fail. + ## Deleting Stream Computing ```sql diff --git a/docs/zh/06-advanced/03-stream.md b/docs/zh/06-advanced/03-stream.md index bd112d29d8..ac922353d7 100644 --- a/docs/zh/06-advanced/03-stream.md +++ b/docs/zh/06-advanced/03-stream.md @@ -27,7 +27,7 @@ stream_options: { WATERMARK time IGNORE EXPIRED [0|1] DELETE_MARK time - FILL_HISTORY [0|1] + FILL_HISTORY [0|1] [ASYNC] IGNORE UPDATE [0|1] } @@ -102,7 +102,7 @@ PARTITION 子句中,为 tbname 定义了一个别名 tname, 在 PARTITION 通过启用 fill_history 选项,创建的流计算任务将具备处理创建前、创建过程中以及创建后写入的数据的能力。这意味着,无论数据是在流创建之前还是之后写入的,都将纳入流计算的范围,从而确保数据的完整性和一致性。这一设置为用户提供了更大的灵活性,使其能够根据实际需求灵活处理历史数据和新数据。 注意: -- 开启 fill_history 时,创建流需要找到历史数据的分界点,如果历史数据很多,可能会导致创建流任务耗时较长,此时可以配置参数 streamRunHistoryAsync(3.3.6.0版本开始支持) 为 1 (默认为0),将创建流的任务放在后台处理,创建流的语句可立即返回,不阻塞后面的操作。 +- 开启 fill_history 时,创建流需要找到历史数据的分界点,如果历史数据很多,可能会导致创建流任务耗时较长,此时可以通过 fill_history 1 async(3.3.6.0版本开始支持) 语法将创建流的任务放在后台处理,创建流的语句可立即返回,不阻塞后面的操作。async 只对 fill_history 1 起效,fill_history 0 时建流很快,不需要异步处理。 - 通过 show streams 可查看后台建流的进度(ready 状态表示成功,init 状态表示正在建流,failed 状态表示建流失败,失败时 message 列可以查看原因。对于建流失败的情况可以删除流重新建立)。 diff --git a/docs/zh/14-reference/01-components/02-taosc.md b/docs/zh/14-reference/01-components/02-taosc.md index dfe840b575..8952e35664 100755 --- a/docs/zh/14-reference/01-components/02-taosc.md +++ b/docs/zh/14-reference/01-components/02-taosc.md @@ -305,15 +305,6 @@ TDengine 客户端驱动提供了应用编程所需要的全部 API,并且在 - 动态修改:不支持 - 支持版本:从 v3.1.0.0 版本开始引入 -### 流相关 - -#### streamRunHistoryAsync -- 说明:创建流有 fill_history 参数时,是否异步执行建流语句 -- 类型:布尔;false:同步;true:异步 -- 默认值:false -- 动态修改:支持通过 SQL 修改,立即生效 -- 支持版本:从 v3.3.6.0 版本开始引入 - ### 日志相关 #### logDir diff --git a/docs/zh/14-reference/03-taos-sql/14-stream.md b/docs/zh/14-reference/03-taos-sql/14-stream.md index ec57d54ab1..41013bfd65 100644 --- a/docs/zh/14-reference/03-taos-sql/14-stream.md +++ b/docs/zh/14-reference/03-taos-sql/14-stream.md @@ -14,7 +14,7 @@ stream_options: { WATERMARK time IGNORE EXPIRED [0|1] DELETE_MARK time - FILL_HISTORY [0|1] + FILL_HISTORY [0|1] [ASYNC] IGNORE UPDATE [0|1] } @@ -127,6 +127,13 @@ create stream if not exists s1 fill_history 1 into st1 as select count(*) from 如果该流任务已经彻底过期,并且您不再想让它检测或处理数据,您可以手动删除它,被计算出的数据仍会被保留。 +注意: +- 开启 fill_history 时,创建流需要找到历史数据的分界点,如果历史数据很多,可能会导致创建流任务耗时较长,此时可以通过 fill_history 1 async(3.3.6.0版本开始支持) 语法将创建流的任务放在后台处理,创建流的语句可立即返回,不阻塞后面的操作。async 只对 fill_history 1 起效,fill_history 0 时建流很快,不需要异步处理。 + +- 通过 show streams 可查看后台建流的进度(ready 状态表示成功,init 状态表示正在建流,failed 状态表示建流失败,失败时 message 列可以查看原因。对于建流失败的情况可以删除流重新建立)。 + +- 另外,不要同时异步创建多个流,可能由于事务冲突导致后面创建的流失败。 + ## 删除流式计算 ```sql diff --git a/include/common/tglobal.h b/include/common/tglobal.h index ec28529e12..c916f6764b 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -297,7 +297,6 @@ extern bool tsFilterScalarMode; extern int32_t tsMaxStreamBackendCache; extern int32_t tsPQSortMemThreshold; extern bool tsStreamCoverage; -extern bool tsStreamRunHistoryAsync; extern int8_t tsS3EpNum; extern int32_t tsStreamNotifyMessageSize; extern int32_t tsStreamNotifyFrameSize; diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 0b71846894..246b352d1a 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -608,6 +608,7 @@ typedef struct SStreamOptions { SNode* pDeleteMark; SNode* pRecInterval; int8_t fillHistory; + bool runHistoryAsync; int8_t ignoreExpired; int8_t ignoreUpdate; int64_t setFlag; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index f86f984e0c..03ace921c5 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -956,7 +956,9 @@ int32_t processCreateStreamFirstPhaseRsp(void* param, SDataBuf* pMsg, int32_t co taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); - if (code == 0 && !pRequest->streamRunHistory && tsStreamRunHistoryAsync){ + if (code == 0 && !pRequest->streamRunHistory && + ((SCreateStreamStmt*)(pRequest->pQuery->pRoot))->pOptions->fillHistory && + ((SCreateStreamStmt*)(pRequest->pQuery->pRoot))->pOptions->runHistoryAsync){ processCreateStreamSecondPhase(pRequest); } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d8c4b10e27..24deec8a1d 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -351,7 +351,6 @@ int64_t tsStreamFailedTimeout = 30 * 60 * 1000; bool tsFilterScalarMode = false; int tsStreamAggCnt = 100000; bool tsStreamCoverage = false; -bool tsStreamRunHistoryAsync = false; char tsAdapterFqdn[TSDB_FQDN_LEN] = "localhost"; uint16_t tsAdapterPort = 6041; @@ -784,9 +783,6 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddBool(pCfg, "compareAsStrInGreatest", tsCompareAsStrInGreatest, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT,CFG_CATEGORY_LOCAL)); - TAOS_CHECK_RETURN( - cfgAddBool(pCfg, "streamRunHistoryAsync", tsStreamRunHistoryAsync, CFG_DYN_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); - TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -1529,9 +1525,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamCoverage"); tsStreamCoverage = pItem->bval; - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamRunHistoryAsync"); - tsStreamRunHistoryAsync = pItem->bval; - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "compareAsStrInGreatest"); tsCompareAsStrInGreatest = pItem->bval; @@ -2873,7 +2866,6 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { {"numOfRpcSessions", &tsNumOfRpcSessions}, {"bypassFlag", &tsBypassFlag}, {"safetyCheckLevel", &tsSafetyCheckLevel}, - {"streamRunHistoryAsync", &tsStreamRunHistoryAsync}, {"streamCoverage", &tsStreamCoverage}, {"compareAsStrInGreatest", &tsCompareAsStrInGreatest}}; diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index b7c4b780aa..2f5f344a95 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -326,7 +326,7 @@ SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pFuncName); SNode* createStreamOptions(SAstCreateContext* pCxt); SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken, - SNode* pNode); + SNode* pNode, bool runHistoryAsync); SNode* createStreamNotifyOptions(SAstCreateContext *pCxt, SNodeList* pAddrUrls, SNodeList* pEventTypes); SNode* setStreamNotifyOptions(SAstCreateContext* pCxt, SNode* pNode, EStreamNotifyOptionSetFlag setFlag, SToken* pToken); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 1def8af34a..77b2ae86bf 100755 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -867,17 +867,18 @@ tag_def_or_ref_opt(A) ::= tags_def(B). tag_def_or_ref_opt(A) ::= TAGS NK_LP column_stream_def_list(B) NK_RP. { A = B; } stream_options(A) ::= . { A = createStreamOptions(pCxt); } -stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); } -stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); } -stream_options(A) ::= stream_options(B) TRIGGER FORCE_WINDOW_CLOSE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); } +stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL, false); } +stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL, false); } +stream_options(A) ::= stream_options(B) TRIGGER FORCE_WINDOW_CLOSE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL, false); } stream_options(A) ::= stream_options(B) TRIGGER CONTINUOUS_WINDOW_CLOSE(C) - recalculate_opt(D). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, D); } -stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY(C) duration_literal(D). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, releaseRawExprNode(pCxt, D)); } -stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, C)); } -stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_EXPIRED_SET, &C, NULL); } -stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_FILL_HISTORY_SET, &C, NULL); } -stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_DELETE_MARK_SET, NULL, releaseRawExprNode(pCxt, C)); } -stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_UPDATE_SET, &C, NULL); } + recalculate_opt(D). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, D, false); } +stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY(C) duration_literal(D). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, releaseRawExprNode(pCxt, D), false); } +stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, C), false); } +stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_EXPIRED_SET, &C, NULL, false); } +stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C) ASYNC. { A = setStreamOptions(pCxt, B, SOPT_FILL_HISTORY_SET, &C, NULL, true); } +stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_FILL_HISTORY_SET, &C, NULL, false); } +stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_DELETE_MARK_SET, NULL, releaseRawExprNode(pCxt, C), false); } +stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_UPDATE_SET, &C, NULL, false); } recalculate_opt(A) ::= . { A = NULL; } recalculate_opt(A) ::= RECALCULATE duration_literal(B). { A = releaseRawExprNode(pCxt, B); } @@ -1878,4 +1879,4 @@ column_ref(A) ::= column_name_list(B). %type column_name_list { STokenTriplet* } %destructor column_name_list { } column_name_list(A) ::= NK_ID(B). { A = createTokenTriplet(pCxt, B); } -column_name_list(A) ::= column_name_list(B) NK_DOT NK_ID(C). { A = setColumnName(pCxt, B, C); } \ No newline at end of file +column_name_list(A) ::= column_name_list(B) NK_DOT NK_ID(C). { A = setColumnName(pCxt, B, C); } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 20ff568d10..7c5a08161d 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -4036,6 +4036,7 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) { pOptions->fillHistory = STREAM_DEFAULT_FILL_HISTORY; pOptions->ignoreExpired = STREAM_DEFAULT_IGNORE_EXPIRED; pOptions->ignoreUpdate = STREAM_DEFAULT_IGNORE_UPDATE; + pOptions->runHistoryAsync = false; return (SNode*)pOptions; _err: return NULL; @@ -4060,7 +4061,7 @@ static int8_t getTriggerType(uint32_t tokenType) { } SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken, - SNode* pNode) { + SNode* pNode, bool runHistoryAsync) { SStreamOptions* pStreamOptions = (SStreamOptions*)pOptions; if (BIT_FLAG_TEST_MASK(setflag, pStreamOptions->setFlag)) { pCxt->errCode = @@ -4097,7 +4098,7 @@ SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptions break; } BIT_FLAG_SET_MASK(pStreamOptions->setFlag, setflag); - + pStreamOptions->runHistoryAsync = runHistoryAsync; return pOptions; } diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index 894dd6ba48..7f70a4329c 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -29,6 +29,7 @@ typedef struct SKeyword { // clang-format off // keywords in sql string static SKeyword keywordTable[] = { + {"ASYNC", TK_ASYNC}, {"ACCOUNT", TK_ACCOUNT}, {"ACCOUNTS", TK_ACCOUNTS}, {"ADD", TK_ADD}, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c old mode 100755 new mode 100644 index f4254ad9fa..61521c7ea1 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -13416,7 +13416,7 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB); code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL); } - if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->fillHistory && (pCxt->pParseCxt->streamRunHistory || !tsStreamRunHistoryAsync)) { + if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->fillHistory && (pCxt->pParseCxt->streamRunHistory || !pStmt->pOptions->runHistoryAsync)) { SRealTableNode* pTable = (SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable); code = createLastTsSelectStmt(pTable->table.dbName, pTable->table.tableName, pTable->pMeta->schema[0].name, &pStmt->pPrevQuery); diff --git a/tests/system-test/8-stream/ts-5617.py b/tests/system-test/8-stream/ts-5617.py index 2112c46c0f..3714952b0e 100755 --- a/tests/system-test/8-stream/ts-5617.py +++ b/tests/system-test/8-stream/ts-5617.py @@ -78,7 +78,7 @@ insertJson = '''{ class TDTestCase: updatecfgDict = {'debugFlag': 135, 'asynclog': 0, 'streamFailedTimeout': 10000} - clientCfgDict = {'debugFlag': 135, 'asynclog': 0, 'streamRunHistoryAsync': 1} + clientCfgDict = {'debugFlag': 135, 'asynclog': 0} updatecfgDict["clientCfg"] = clientCfgDict def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -97,7 +97,7 @@ class TDTestCase: tdLog.info("test creating stream with history in normal ......") start_time = time.time() - tdSql.execute(f'create stream s21 fill_history 1 into ts5617.st21 tags(tname varchar(20)) subtable(tname) as select last(val), last(quality) from ts5617.stb_2_2_1 partition by tbname tname interval(1800s);') + tdSql.execute(f'create stream s21 fill_history 1 async into ts5617.st21 tags(tname varchar(20)) subtable(tname) as select last(val), last(quality) from ts5617.stb_2_2_1 partition by tbname tname interval(1800s);') end_time = time.time() if end_time - start_time > 1: tdLog.exit("create history stream sync too long") @@ -118,7 +118,7 @@ class TDTestCase: tdSql.execute(f'drop table if exists ts5617.st21') tdLog.info("test creating stream with history in taosd error ......") - tdSql.execute(f'create stream s211 fill_history 1 into ts5617.st211 tags(tname varchar(20)) subtable(tname) as select last(val), last(quality) from ts5617.stb_2_2_1 partition by tbname tname interval(1800s);') + tdSql.execute(f'create stream s211 fill_history 1 async into ts5617.st211 tags(tname varchar(20)) subtable(tname) as select last(val), last(quality) from ts5617.stb_2_2_1 partition by tbname tname interval(1800s);') tdSql.execute(f'create stable ts5617.st211(ts timestamp, i int) tags(tname varchar(20))') tdSql.query("show streams") @@ -139,8 +139,8 @@ class TDTestCase: tdSql.execute(f'drop table if exists ts5617.st211') tdLog.info("test creating stream with history in taosd error ......") - tdSql.execute(f'create stream s21 fill_history 1 into ts5617.st21 as select last(val), last(quality) from ts5617.d_0 interval(1800s);') - tdSql.execute(f'create stream s211 fill_history 1 into ts5617.st211 as select last(val), last(quality) from ts5617.d_0 interval(1800s);') + tdSql.execute(f'create stream s21 fill_history 1 async into ts5617.st21 as select last(val), last(quality) from ts5617.d_0 interval(1800s);') + tdSql.execute(f'create stream s211 fill_history 1 async into ts5617.st211 as select last(val), last(quality) from ts5617.d_0 interval(1800s);') while 1: tdSql.query("show streams") @@ -158,7 +158,7 @@ class TDTestCase: tdSql.execute(f'drop table if exists ts5617.st211') tdLog.info("test creating stream with history in taosd restart ......") - tdSql.execute(f'create stream s21 fill_history 1 into ts5617.st21 tags(tname varchar(20)) subtable(tname) as select last(val), last(quality) from ts5617.stb_2_2_1 partition by tbname tname interval(1800s);') + tdSql.execute(f'create stream s21 fill_history 1 async into ts5617.st21 tags(tname varchar(20)) subtable(tname) as select last(val), last(quality) from ts5617.stb_2_2_1 partition by tbname tname interval(1800s);') tdSql.query("show streams") tdSql.checkRows(1) tdSql.checkData(0, 1, "init")