diff --git a/docs/zh/01-index.md b/docs/zh/01-index.md index 1fda72024c..02c6105d43 100644 --- a/docs/zh/01-index.md +++ b/docs/zh/01-index.md @@ -16,7 +16,7 @@ TDengine 采用 SQL 作为查询语言,大大降低学习成本、降低迁移 如果你是系统管理员,关心安装、升级、容错灾备、关心数据导入、导出、配置参数,如何监测 TDengine 是否健康运行,如何提升系统运行的性能,请仔细参考[运维指南](./operation)一章。 -如果你对数据库内核设计感兴趣,或是开源爱好者,建议仔细阅读[技术内幕](./tdinterna)一章。该章从分布式架构到存储引擎、查询引擎、数据订阅,再到流计算引擎都做了详细阐述。建议对照文档,查看TDengine在GitHub的源代码,对TDengine的设计和编码做深入了解,更欢迎加入开源社区,贡献代码。 +如果你对数据库内核设计感兴趣,或是开源爱好者,建议仔细阅读[技术内幕](./tdinternal)一章。该章从分布式架构到存储引擎、查询引擎、数据订阅,再到流计算引擎都做了详细阐述。建议对照文档,查看TDengine在GitHub的源代码,对TDengine的设计和编码做深入了解,更欢迎加入开源社区,贡献代码。 最后,作为一个开源软件,欢迎大家的参与。如果发现文档有任何错误、描述不清晰的地方,请在每个页面的最下方,点击“编辑本文档”直接进行修改。 diff --git a/docs/zh/04-get-started/03-package.md b/docs/zh/04-get-started/03-package.md index 9724c0a1c9..344b9412df 100644 --- a/docs/zh/04-get-started/03-package.md +++ b/docs/zh/04-get-started/03-package.md @@ -8,8 +8,6 @@ import Tabs from "@theme/Tabs"; import TabItem from "@theme/TabItem"; import PkgListV3 from "/components/PkgListV3"; -您可以[用 Docker 立即体验](../../get-started/docker/) TDengine。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. - TDengine 完整的软件包包括服务端(taosd)、应用驱动(taosc)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、命令行程序(CLI,taos)和一些工具软件。目前 TDinsight 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/components/taosadapter/) 提供 [RESTful 接口](../../reference/connector/rest-api/)。 为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 Lite 版本的安装包。 @@ -319,4 +317,4 @@ SELECT AVG(current), MAX(voltage), MIN(phase) FROM test.meters WHERE groupId = 1 SELECT _wstart, AVG(current), MAX(voltage), MIN(phase) FROM test.d1001 INTERVAL(10s); ``` -在上面的查询中,使用系统提供的伪列 _wstart 来给出每个窗口的开始时间。 \ No newline at end of file +在上面的查询中,使用系统提供的伪列 _wstart 来给出每个窗口的开始时间。 diff --git a/docs/zh/04-get-started/index.md b/docs/zh/04-get-started/index.md index 4d9f7ceae5..5a7192f2c6 100644 --- a/docs/zh/04-get-started/index.md +++ b/docs/zh/04-get-started/index.md @@ -10,7 +10,7 @@ import official_account from './official-account.webp' TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../reference/components/taosadapter) 提供 [RESTful 接口](../reference/connector/rest-api)。 -本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。 +本章主要介绍如何快速设置 TDengine 环境并体验其高效写入和查询。 ```mdx-code-block import DocCardList from '@theme/DocCardList'; @@ -34,4 +34,4 @@ import {useCurrentSidebarCategory} from '@docusaurus/theme-common'; 关注 TDengine 视频号
收看技术直播与教学视频 关注 TDengine 公众号
阅读技术文章与行业案例 - \ No newline at end of file + diff --git a/docs/zh/06-advanced/03-stream.md b/docs/zh/06-advanced/03-stream.md index f5202cddad..333dabd189 100644 --- a/docs/zh/06-advanced/03-stream.md +++ b/docs/zh/06-advanced/03-stream.md @@ -175,7 +175,7 @@ TDengine 对于修改数据提供两种处理方式,由 IGNORE UPDATE 选项 用户可以为每个 partition 对应的子表生成自定义的 TAG 值,如下创建流的语句, ```sql -CREATE STREAM output_tag trigger at_once INTO output_tag_s TAGS(alias_tag varchar(100)) as select _wstart, count(*) from power.meters partition by concat("tag-", tbname) as alias_tag interval(10s)); +CREATE STREAM output_tag trigger at_once INTO output_tag_s TAGS(alias_tag varchar(100)) as select _wstart, count(*) from power.meters partition by concat("tag-", tbname) as alias_tag interval(10s); ``` 在 PARTITION 子句中,为 concat("tag-", tbname)定义了一个别名 alias_tag, 对应超级表 output_tag_s 的自定义 TAG 的名字。在上述示例中,流新创建的子表的 TAG 将以前缀 'tag-' 连接原表名作为 TAG 的值。会对 TAG 信息进行如下检查。 diff --git a/docs/zh/06-advanced/05-data-in/health-options.png b/docs/zh/06-advanced/05-data-in/health-options.png new file mode 100644 index 0000000000..d20a520a95 Binary files /dev/null and b/docs/zh/06-advanced/05-data-in/health-options.png differ diff --git a/docs/zh/06-advanced/05-data-in/index.md b/docs/zh/06-advanced/05-data-in/index.md index 0dfa04db56..8f23fe2a81 100644 --- a/docs/zh/06-advanced/05-data-in/index.md +++ b/docs/zh/06-advanced/05-data-in/index.md @@ -294,9 +294,32 @@ let v3 = data["voltage"].split(","); 在任务列表页面,还可以对任务进行启动、停止、查看、删除、复制等操作,也可以查看各个任务的运行情况,包括写入的记录条数、流量等。 +### 健康状态 + +从 3.3.5.0 开始,在任务管理列表中,增加了一项 ”健康状态“,用于指示当前任务运行过程中的健康状态。 + +在数据源的”高级选项“列表中,增加了多项健康状态监测的配置项,包括: + +![health options](./health-options.png) + +1. 健康监测时段(Health Check Duration):可选项,表示对最近多长时间的任务状态进行统计。 +2. Busy 状态阈值(Busy State Threshold):百分比,表示写入队列中入队元素数量与队列长度之比,默认 100%。 +3. 写入队列长度(Max Write Queue Length):表示对应的写入队列长度最大值。 +4. 写入错误阈值(Write Error Threshold):数值类型,表示健康监测时段中允许写入错误的数量。超出阈值,则报错。 + +在任务管理列表展示中,有如下状态: + +- Ready:数据源和目标端健康检查通过,可以进行数据读取和写入。 +- Idle:表示监测时段内无数据处理(没有数据进入处理流程)。 +- Busy:表示写入队列已满(超出一定阈值,表示写入繁忙,在一定程度上意味着当前可能存在性能瓶颈,需要调整参数或配置等来进行改善,但并不说明存在错误)。 +- Bounce:数据源和目标端均正常,但在写入过程中存在错误,一定周期内超出阈值,可能意味着存在大量非正常数据或正在发生数据丢失。 +- SourceError: 数据源错误导致无法进行读取。此时工作负载将尝试重连数据源。 +- SinkError:写入端错误导致无法进行写入。此时工作负载将尝试重连数据库,恢复后进入 Ready 状态。 +- Fatal:严重或无法恢复的错误。 + ```mdx-code-block import DocCardList from '@theme/DocCardList'; import {useCurrentSidebarCategory} from '@docusaurus/theme-common'; -``` \ No newline at end of file +``` diff --git a/docs/zh/14-reference/03-taos-sql/24-show.md b/docs/zh/14-reference/03-taos-sql/24-show.md index 110c9cee6e..4596a03281 100644 --- a/docs/zh/14-reference/03-taos-sql/24-show.md +++ b/docs/zh/14-reference/03-taos-sql/24-show.md @@ -155,7 +155,7 @@ SHOW QNODES; SHOW QUERIES; ``` -显示当前系统中正在进行的查询。 +显示当前系统中正在进行的写入(更新)/查询/删除。(由于内部 API 命名原因,所以统称 QUERIES) ## SHOW SCORES diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index cb05f98f45..0b34e882c8 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -378,7 +378,7 @@ typedef struct { TAOS_MULTI_BIND *bind; } SBindInfo; int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema, - SArray *rowArray); + SArray *rowArray, bool *pOrdered, bool *pDupTs); // stmt2 binding int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int32_t buffMaxLen, initGeosFn igeos, @@ -392,7 +392,7 @@ typedef struct { } SBindInfo2; int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema, - SArray *rowArray); + SArray *rowArray, bool *pOrdered, bool *pDupTs); #endif diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index dfc18da8cd..b311a9e06d 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -2168,19 +2168,38 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col pStmt->semWaited = true; } - int32_t code = 0; + SSHashObj *hashTbnames = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR)); + if (NULL == hashTbnames) { + tscError("stmt2 bind failed: %s", tstrerror(terrno)); + return terrno; + } + + int32_t code = TSDB_CODE_SUCCESS; for (int i = 0; i < bindv->count; ++i) { if (bindv->tbnames && bindv->tbnames[i]) { + if (pStmt->sql.stbInterlaceMode) { + if (tSimpleHashGet(hashTbnames, bindv->tbnames[i], strlen(bindv->tbnames[i])) != NULL) { + code = terrno = TSDB_CODE_PAR_TBNAME_DUPLICATED; + tscError("stmt2 bind failed: %s %s", tstrerror(terrno), bindv->tbnames[i]); + goto out; + } + + code = tSimpleHashPut(hashTbnames, bindv->tbnames[i], strlen(bindv->tbnames[i]), NULL, 0); + if (code) { + goto out; + } + } + code = stmtSetTbName2(stmt, bindv->tbnames[i]); if (code) { - return code; + goto out; } } if (bindv->tags && bindv->tags[i]) { code = stmtSetTbTags2(stmt, bindv->tags[i]); if (code) { - return code; + goto out; } } @@ -2189,26 +2208,29 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col if (bind->num <= 0 || bind->num > INT16_MAX) { tscError("invalid bind num %d", bind->num); - terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR; - return terrno; + code = terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR; + goto out; } int32_t insert = 0; (void)stmtIsInsert2(stmt, &insert); if (0 == insert && bind->num > 1) { tscError("only one row data allowed for query"); - terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR; - return terrno; + code = terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR; + goto out; } code = stmtBindBatch2(stmt, bind, col_idx); if (TSDB_CODE_SUCCESS != code) { - return code; + goto out; } } } - return TSDB_CODE_SUCCESS; +out: + tSimpleHashCleanup(hashTbnames); + + return code; } int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) { diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index c1ab7ccff0..f1aacfed15 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -449,9 +449,11 @@ static int32_t tBindInfoCompare(const void *p1, const void *p2, const void *para * `infoSorted` is whether the bind information is sorted by column id * `pTSchema` is the schema of the table * `rowArray` is the array to store the rows + * `pOrdered` is the pointer to store ordered + * `pDupTs` is the pointer to store duplicateTs */ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema, - SArray *rowArray) { + SArray *rowArray, bool *pOrdered, bool *pDupTs) { if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) { return TSDB_CODE_INVALID_PARA; } @@ -469,6 +471,7 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, return terrno; } + SRowKey rowKey, lastRowKey; for (int32_t iRow = 0; iRow < numOfRows; iRow++) { taosArrayClear(colValArray); @@ -507,6 +510,22 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, code = terrno; goto _exit; } + + if (pOrdered && pDupTs) { + tRowGetKey(row, &rowKey); + if (iRow == 0) { + *pOrdered = true; + *pDupTs = false; + } else { + // no more compare if we already get disordered or duplicate rows + if (*pOrdered && !*pDupTs) { + int32_t code = tRowKeyCompare(&rowKey, &lastRowKey); + *pOrdered = (code >= 0); + *pDupTs = (code == 0); + } + } + lastRowKey = rowKey; + } } _exit: @@ -3235,9 +3254,11 @@ _exit: * `infoSorted` is whether the bind information is sorted by column id * `pTSchema` is the schema of the table * `rowArray` is the array to store the rows + * `pOrdered` is the pointer to store ordered + * `pDupTs` is the pointer to store duplicateTs */ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema, - SArray *rowArray) { + SArray *rowArray, bool *pOrdered, bool *pDupTs) { if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) { return TSDB_CODE_INVALID_PARA; } @@ -3266,6 +3287,7 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte } } + SRowKey rowKey, lastRowKey; for (int32_t iRow = 0; iRow < numOfRows; iRow++) { taosArrayClear(colValArray); @@ -3317,6 +3339,22 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte code = terrno; goto _exit; } + + if (pOrdered && pDupTs) { + tRowGetKey(row, &rowKey); + if (iRow == 0) { + *pOrdered = true; + *pDupTs = false; + } else { + // no more compare if we already get disordered or duplicate rows + if (*pOrdered && !*pDupTs) { + int32_t code = tRowKeyCompare(&rowKey, &lastRowKey); + *pOrdered = (code >= 0); + *pDupTs = (code == 0); + } + } + lastRowKey = rowKey; + } } _exit: diff --git a/source/dnode/mnode/impl/src/mndIndex.c b/source/dnode/mnode/impl/src/mndIndex.c index 718c34e85a..f5dac9df65 100644 --- a/source/dnode/mnode/impl/src/mndIndex.c +++ b/source/dnode/mnode/impl/src/mndIndex.c @@ -673,8 +673,6 @@ static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb code = TSDB_CODE_MND_TAG_NOT_EXIST; TAOS_RETURN(code); } - col_id_t colId = pOld->pTags[tag].colId; - TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId)); TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew)); SSchema *pTag = pNew->pTags + tag; @@ -806,16 +804,7 @@ static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *re TAOS_RETURN(code); } - col_id_t colId = pStb->pTags[tag].colId; - TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pStb->name, pStb->uid, colId)); - - // SSchema *pTag = pStb->pTags + tag; - // if (IS_IDX_ON(pTag)) { - // terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST; - // return -1; - // } code = mndAddIndexImpl(pMnode, pReq, pDb, pStb, &idxObj); - TAOS_RETURN(code); } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 2db76f6312..d46968a22d 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1805,7 +1805,6 @@ static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj * } SSchema *pTarget = &pOld->pColumns[idx]; col_id_t colId = pTarget->colId; - TAOS_CHECK_RETURN(mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId)); TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew)); code = validColCmprByType(pTarget->type, p->bytes); @@ -3702,10 +3701,6 @@ static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *ta terrno = TSDB_CODE_MND_TAG_NOT_EXIST; return -1; } - col_id_t colId = pOld->pTags[tag].colId; - if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) { - return -1; - } if (mndAllocStbSchemas(pOld, pNew) != 0) { return -1; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3bee82e3e7..9e6188e9d9 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1685,11 +1685,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid); - if (pStream->status == STREAM_STATUS__PAUSE) { - sdbRelease(pMnode->pSdb, pStream); - return 0; - } - if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) { sdbRelease(pMnode->pSdb, pStream); return code; @@ -1778,7 +1773,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { // pause stream taosWLockLatch(&pStream->lock); - pStream->status = STREAM_STATUS__PAUSE; code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); if (code) { taosWUnLockLatch(&pStream->lock); @@ -1829,11 +1823,6 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } } - if (pStream->status != STREAM_STATUS__PAUSE) { - sdbRelease(pMnode->pSdb, pStream); - return 0; - } - mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid); if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { sdbRelease(pMnode->pSdb, pStream); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index f3d3a85211..7a38e68744 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -914,8 +914,7 @@ int32_t mndResetChkptReportInfo(SHashObj *pHash, int64_t streamId) { return TSDB_CODE_MND_STREAM_NOT_EXIST; } -static void mndShowStreamStatus(char *dst, SStreamObj *pStream) { - int8_t status = atomic_load_8(&pStream->status); +static void mndShowStreamStatus(char *dst, int8_t status) { if (status == STREAM_STATUS__NORMAL) { tstrncpy(dst, "ready", MND_STREAM_TRIGGER_NAME_SIZE); } else if (status == STREAM_STATUS__STOP) { @@ -951,6 +950,41 @@ static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) { varDataSetLen(pBuf, len + 2); } +static int32_t isAllTaskPaused(SStreamObj *pStream, bool *pRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamTaskIter *pIter = NULL; + bool isPaused = true; + + taosRLockLatch(&pStream->lock); + code = createStreamTaskIter(pStream, &pIter); + TSDB_CHECK_CODE(code, lino, _end); + + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = NULL; + code = streamTaskIterGetCurrent(pIter, &pTask); + TSDB_CHECK_CODE(code, lino, _end); + + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (pe == NULL) { + continue; + } + if (pe->status != TASK_STATUS__PAUSE) { + isPaused = false; + } + } + (*pRes) = isPaused; + +_end: + destroyStreamTaskIter(pIter); + taosRUnLockLatch(&pStream->lock); + if (code != TSDB_CODE_SUCCESS) { + mError("error happens when get stream status, lino:%d, code:%s", lino, tstrerror(code)); + } + return code; +} + int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) { int32_t code = 0; int32_t cols = 0; @@ -999,7 +1033,15 @@ int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_ char status[20 + VARSTR_HEADER_SIZE] = {0}; char status2[MND_STREAM_TRIGGER_NAME_SIZE] = {0}; - mndShowStreamStatus(status2, pStream); + bool isPaused = false; + code = isAllTaskPaused(pStream, &isPaused); + TSDB_CHECK_CODE(code, lino, _end); + + int8_t streamStatus = atomic_load_8(&pStream->status); + if (isPaused) { + streamStatus = STREAM_STATUS__PAUSE; + } + mndShowStreamStatus(status2, streamStatus); STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index 4ecc18d189..37d7b2f431 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -368,7 +368,7 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind // } } - code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols); + code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, &pDataBlock->ordered, &pDataBlock->duplicateTs); qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum); @@ -745,7 +745,7 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin pBindInfos[c].bytes = pColSchema->bytes; } - code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols); + code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, &pDataBlock->ordered, &pDataBlock->duplicateTs); qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 74274ab1ba..00a17c8c96 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -315,6 +315,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-32187.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-33225.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td32526.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py diff --git a/tests/script/api/makefile b/tests/script/api/makefile index b871c5f3ff..a270a6c0ed 100644 --- a/tests/script/api/makefile +++ b/tests/script/api/makefile @@ -29,6 +29,8 @@ exe: # gcc $(CFLAGS) ./stmt2-get-fields.c -o $(ROOT)stmt2-get-fields $(LFLAGS) # gcc $(CFLAGS) ./stmt2-nohole.c -o $(ROOT)stmt2-nohole $(LFLAGS) gcc $(CFLAGS) ./stmt-crash.c -o $(ROOT)stmt-crash $(LFLAGS) + gcc $(CFLAGS) ./stmt-insert-dupkeys.c -o $(ROOT)stmt-insert-dupkeys $(LFLAGS) + gcc $(CFLAGS) ./stmt2-insert-dupkeys.c -o $(ROOT)stmt2-insert-dupkeys $(LFLAGS) clean: rm $(ROOT)batchprepare @@ -47,3 +49,5 @@ clean: rm $(ROOT)stmt2-get-fields rm $(ROOT)stmt2-nohole rm $(ROOT)stmt-crash + rm $(ROOT)stmt-insert-dupkeys + rm $(ROOT)stmt2-insert-dupkeys diff --git a/tests/script/api/stmt-insert-dupkeys.c b/tests/script/api/stmt-insert-dupkeys.c new file mode 100644 index 0000000000..b564fbb21d --- /dev/null +++ b/tests/script/api/stmt-insert-dupkeys.c @@ -0,0 +1,234 @@ +// compile with +// gcc -o stmt-insert-dupkeys stmt-insert-dupkeys.c -ltaos +#include +#include +#include +#include "taos.h" + +#define NUMROWS 3 + +/** + * @brief execute sql only and ignore result set + * + * @param taos + * @param sql + */ +void executeSQL(TAOS *taos, const char *sql) { + TAOS_RES *res = taos_query(taos, sql); + int code = taos_errno(res); + if (code != 0) { + printf("%s\n", taos_errstr(res)); + taos_free_result(res); + taos_close(taos); + exit(EXIT_FAILURE); + } + taos_free_result(res); +} + +/** + * @brief exit program when error occur. + * + * @param stmt + * @param code + * @param msg + */ +void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) { + if (code != 0) { + printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt)); + exit(EXIT_FAILURE); + } +} + +void prepareBindTags(TAOS_MULTI_BIND *tags) { + // bind table name and tags + char *location = "California.SanFrancisco"; + int groupId = 2; + tags[0].buffer_type = TSDB_DATA_TYPE_BINARY; + tags[0].buffer_length = strlen(location); + tags[0].length = (int32_t *)&tags[0].buffer_length; + tags[0].buffer = location; + tags[0].is_null = NULL; + + tags[1].buffer_type = TSDB_DATA_TYPE_INT; + tags[1].buffer_length = sizeof(int); + tags[1].length = (int32_t *)&tags[1].buffer_length; + tags[1].buffer = &groupId; + tags[1].is_null = NULL; +} + +void prepareBindParams(TAOS_MULTI_BIND *params, int64_t *ts, float *current, int *voltage, float *phase) { + // is_null array + char is_null[NUMROWS] = {0}; + // length array + int32_t int64Len[NUMROWS] = {sizeof(int64_t)}; + int32_t floatLen[NUMROWS] = {sizeof(float)}; + int32_t intLen[NUMROWS] = {sizeof(int)}; + + params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[0].buffer_length = sizeof(int64_t); + params[0].buffer = ts; + params[0].length = int64Len; + params[0].is_null = is_null; + params[0].num = NUMROWS; + + params[1].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[1].buffer_length = sizeof(float); + params[1].buffer = current; + params[1].length = floatLen; + params[1].is_null = is_null; + params[1].num = NUMROWS; + + params[2].buffer_type = TSDB_DATA_TYPE_INT; + params[2].buffer_length = sizeof(int); + params[2].buffer = voltage; + params[2].length = intLen; + params[2].is_null = is_null; + params[2].num = NUMROWS; + + params[3].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[3].buffer_length = sizeof(float); + params[3].buffer = phase; + params[3].length = floatLen; + params[3].is_null = is_null; + params[3].num = NUMROWS; +} + +/** + * @brief insert data using stmt API + * + * @param taos + */ +void insertData(TAOS *taos, int64_t *ts, float *current, int *voltage, float *phase) { + // init + TAOS_STMT *stmt = taos_stmt_init(taos); + + // prepare + const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) values(?, ?, ?, ?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare"); + + // bind table name and tags + TAOS_MULTI_BIND tags[2]; + prepareBindTags(tags); + code = taos_stmt_set_tbname_tags(stmt, "d1001", tags); + checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags"); + + TAOS_MULTI_BIND params[4]; + prepareBindParams(params, ts, current, voltage, phase); + + code = taos_stmt_bind_param_batch(stmt, params); // bind batch + checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch"); + + code = taos_stmt_add_batch(stmt); // add batch + checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch"); + + // execute + code = taos_stmt_execute(stmt); + checkErrorCode(stmt, code, "failed to execute taos_stmt_execute"); + + int affectedRows = taos_stmt_affected_rows(stmt); + printf("successfully inserted %d rows\n", affectedRows); + + // close + (void)taos_stmt_close(stmt); +} + +void insertDataInterlace(TAOS *taos, int64_t *ts, float *current, int *voltage, float *phase) { + // init with interlace mode + TAOS_STMT_OPTIONS op; + op.reqId = 0; + op.singleStbInsert = true; + op.singleTableBindOnce = true; + TAOS_STMT *stmt = taos_stmt_init_with_options(taos, &op); + + // prepare + const char *sql = "INSERT INTO ? values(?, ?, ?, ?)"; + int code = taos_stmt_prepare(stmt, sql, 0); + checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare"); + + // bind table name and tags + TAOS_MULTI_BIND tags[2]; + prepareBindTags(tags); + code = taos_stmt_set_tbname_tags(stmt, "d1001", tags); + checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags"); + + TAOS_MULTI_BIND params[4]; + prepareBindParams(params, ts, current, voltage, phase); + + code = taos_stmt_bind_param_batch(stmt, params); // bind batch + checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch"); + + code = taos_stmt_add_batch(stmt); // add batch + checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch"); + + // execute + code = taos_stmt_execute(stmt); + checkErrorCode(stmt, code, "failed to execute taos_stmt_execute"); + + int affectedRows = taos_stmt_affected_rows(stmt); + printf("successfully inserted %d rows\n", affectedRows); + + // close + (void)taos_stmt_close(stmt); +} + +int main() { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 6030); + if (taos == NULL) { + printf("failed to connect to server\n"); + exit(EXIT_FAILURE); + } + executeSQL(taos, "DROP DATABASE IF EXISTS power"); + executeSQL(taos, "CREATE DATABASE power"); + executeSQL(taos, "USE power"); + executeSQL(taos, + "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), " + "groupId INT)"); + + // initial insert, expect insert 3 rows + int64_t ts0[] = {1648432611234, 1648432611345, 1648432611456}; + float current0[] = {10.1f, 10.2f, 10.3f}; + int voltage0[] = {216, 217, 218}; + float phase0[] = {0.31f, 0.32f, 0.33f}; + insertData(taos, ts0, current0, voltage0, phase0); + + // insert with interlace mode, send non-duplicate ts, expect insert 3 overlapped rows + int64_t ts1[] = {1648432611234, 1648432611345, 1648432611456}; + int voltage1[] = {219, 220, 221}; + insertDataInterlace(taos, ts1, current0, voltage1, phase0); + + // insert with interlace mode, send duplicate ts, expect insert 2 rows with dups merged + int64_t ts2[] = {1648432611678, 1648432611678, 1648432611789}; + int voltage2[] = {222, 223, 224}; + insertDataInterlace(taos, ts2, current0, voltage2, phase0); + + // insert with interlace mode, send disordered rows, expect insert 3 sorted rows + int64_t ts3[] = {1648432611900, 1648432611890, 1648432611910}; + int voltage3[] = {225, 226, 227}; + insertDataInterlace(taos, ts3, current0, voltage3, phase0); + + // insert with interlace mode, send disordered and duplicate rows, expect insert 2 sorted and dup-merged rows + int64_t ts4[] = {1648432611930, 1648432611920, 1648432611930}; + int voltage4[] = {228, 229, 230}; + insertDataInterlace(taos, ts4, current0, voltage4, phase0); + + taos_close(taos); + taos_cleanup(); + + // final results + // taos> select * from d1001; + // ts | current | voltage | phase | + // ====================================================================================== + // 2022-03-28 09:56:51.234 | 10.1000004 | 219 | 0.3100000 | + // 2022-03-28 09:56:51.345 | 10.1999998 | 220 | 0.3200000 | + // 2022-03-28 09:56:51.456 | 10.3000002 | 221 | 0.3300000 | + // 2022-03-28 09:56:51.678 | 10.1999998 | 223 | 0.3200000 | + // 2022-03-28 09:56:51.789 | 10.3000002 | 224 | 0.3300000 | + // 2022-03-28 09:56:51.890 | 10.1999998 | 226 | 0.3200000 | + // 2022-03-28 09:56:51.900 | 10.1000004 | 225 | 0.3100000 | + // 2022-03-28 09:56:51.910 | 10.3000002 | 227 | 0.3300000 | + // 2022-03-28 09:56:51.920 | 10.1999998 | 229 | 0.3200000 | + // 2022-03-28 09:56:51.930 | 10.3000002 | 230 | 0.3300000 | + // Query OK, 10 row(s) in set (0.005083s) +} + diff --git a/tests/script/api/stmt2-insert-dupkeys.c b/tests/script/api/stmt2-insert-dupkeys.c new file mode 100644 index 0000000000..adab3ddf39 --- /dev/null +++ b/tests/script/api/stmt2-insert-dupkeys.c @@ -0,0 +1,235 @@ +#include +#include +#include +#include +#include +#include "taos.h" + +int CTB_NUMS = 3; +int ROW_NUMS = 3; + +void do_query(TAOS* taos, const char* sql) { + TAOS_RES* result = taos_query(taos, sql); + int code = taos_errno(result); + if (code) { + printf("failed to query: %s, reason:%s\n", sql, taos_errstr(result)); + taos_free_result(result); + return; + } + taos_free_result(result); +} + +void createdb(TAOS* taos) { + do_query(taos, "drop database if exists db"); + do_query(taos, "create database db"); + do_query(taos, "create stable db.stb (ts timestamp, b binary(10)) tags(t1 int, t2 binary(10))"); + do_query(taos, "use db"); +} + +#define INIT(tbs, ts, ts_len, b, b_len, tags, paramv) \ +do { \ + /* tbname */ \ + tbs = (char**)malloc(CTB_NUMS * sizeof(char*)); \ + for (int i = 0; i < CTB_NUMS; i++) { \ + tbs[i] = (char*)malloc(sizeof(char) * 20); \ + sprintf(tbs[i], "ctb_%d", i); \ + } \ + /* col params */ \ + ts = (int64_t**)malloc(CTB_NUMS * sizeof(int64_t*)); \ + b = (char**)malloc(CTB_NUMS * sizeof(char*)); \ + ts_len = (int*)malloc(ROW_NUMS * sizeof(int)); \ + b_len = (int*)malloc(ROW_NUMS * sizeof(int)); \ + for (int i = 0; i < ROW_NUMS; i++) { \ + ts_len[i] = sizeof(int64_t); \ + b_len[i] = 1; \ + } \ + for (int i = 0; i < CTB_NUMS; i++) { \ + ts[i] = (int64_t*)malloc(ROW_NUMS * sizeof(int64_t)); \ + b[i] = (char*)malloc(ROW_NUMS * sizeof(char)); \ + for (int j = 0; j < ROW_NUMS; j++) { \ + ts[i][j] = 1591060628000 + j; \ + b[i][j] = (char)('a' + j); \ + } \ + } \ + /*tag params */ \ + int t1 = 0; \ + int t1len = sizeof(int); \ + int t2len = 3; \ + /* bind params */ \ + paramv = (TAOS_STMT2_BIND**)malloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*)); \ + tags = (TAOS_STMT2_BIND**)malloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*)); \ + for (int i = 0; i < CTB_NUMS; i++) { \ + /* create tags */ \ + tags[i] = (TAOS_STMT2_BIND*)malloc(2 * sizeof(TAOS_STMT2_BIND)); \ + tags[i][0] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_INT, &t1, &t1len, NULL, 0}; \ + tags[i][1] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_BINARY, "after", &t2len, NULL, 0}; \ + /* create col params */ \ + paramv[i] = (TAOS_STMT2_BIND*)malloc(2 * sizeof(TAOS_STMT2_BIND)); \ + paramv[i][0] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS}; \ + paramv[i][1] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS}; \ + } \ +} while (0) + +#define UINIT(tbs, ts, ts_len, b, b_len, tags, paramv) \ +do { \ + for (int i = 0; i < CTB_NUMS; i++) { \ + free(tbs[i]); \ + } \ + free(tbs); \ + for (int i = 0; i < CTB_NUMS; i++) { \ + free(ts[i]); \ + free(b[i]); \ + } \ + free(ts); \ + free(b); \ + free(ts_len); \ + free(b_len); \ + for (int i = 0; i < CTB_NUMS; i++) { \ + free(tags[i]); \ + free(paramv[i]); \ + } \ + free(tags); \ + free(paramv); \ +} while (0) + +void insert(TAOS* taos, char **tbs, TAOS_STMT2_BIND **tags, TAOS_STMT2_BIND **paramv, const char* sql) +{ + clock_t start, end; + double cpu_time_used; + + TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL}; + TAOS_STMT2 *stmt = taos_stmt2_init(taos, &option); + int code = taos_stmt2_prepare(stmt, sql, 0); + if (code != 0) { + printf("failed to execute taos_stmt2_prepare. error:%s\n", taos_stmt2_error(stmt)); + taos_stmt2_close(stmt); + exit(EXIT_FAILURE); + } + + // bind + start = clock(); + TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, tags, paramv}; + if (taos_stmt2_bind_param(stmt, &bindv, -1)) { + printf("failed to execute taos_stmt2_bind_param statement.error:%s\n", taos_stmt2_error(stmt)); + taos_stmt2_close(stmt); + exit(EXIT_FAILURE); + } + end = clock(); + cpu_time_used = ((double)(end - start)) / CLOCKS_PER_SEC; + printf("stmt2-bind [%s] insert Time used: %f seconds\n", sql, cpu_time_used); + start = clock(); + + // exec + if (taos_stmt2_exec(stmt, NULL)) { + printf("failed to execute taos_stmt2_exec statement.error:%s\n", taos_stmt2_error(stmt)); + taos_stmt2_close(stmt); + exit(EXIT_FAILURE); + } + end = clock(); + cpu_time_used = ((double)(end - start)) / CLOCKS_PER_SEC; + printf("stmt2-exec [%s] insert Time used: %f seconds\n", sql, cpu_time_used); + + taos_stmt2_close(stmt); +} + +void insert_dist(TAOS* taos, const char *sql) { + char **tbs, **b; + int64_t **ts; + int *ts_len, *b_len; + TAOS_STMT2_BIND **paramv, **tags; + + INIT(tbs, ts, ts_len, b, b_len, tags, paramv); + + insert(taos, tbs, tags, paramv, sql); + + UINIT(tbs, ts, ts_len, b, b_len, tags, paramv); +} + +void insert_dup_rows(TAOS* taos, const char *sql) { + char **tbs, **b; + int64_t **ts; + int *ts_len, *b_len; + TAOS_STMT2_BIND **paramv, **tags; + + INIT(tbs, ts, ts_len, b, b_len, tags, paramv); + + // insert duplicate rows + for (int i = 0; i < CTB_NUMS; i++) { + for (int j = 0; j < ROW_NUMS; j++) { + ts[i][j] = 1591060628000; + b[i][j] = (char)('x' + j); + } + } + for (int i = 0; i < CTB_NUMS; i++) { + paramv[i][0] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS}; + paramv[i][1] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS}; + } + insert(taos, tbs, tags, paramv, sql); + + UINIT(tbs, ts, ts_len, b, b_len, tags, paramv); +} + +void insert_dup_tables(TAOS* taos, const char *sql) { + char **tbs, **b; + int64_t **ts; + int *ts_len, *b_len; + TAOS_STMT2_BIND **paramv, **tags; + + INIT(tbs, ts, ts_len, b, b_len, tags, paramv); + + for (int i = 0; i < CTB_NUMS; i++) { + sprintf(tbs[i], "ctb_%d", i % 2); + } + + for (int i = 0; i < CTB_NUMS; i++) { + paramv[i][0] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS}; + paramv[i][1] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS}; + } + insert(taos, tbs, tags, paramv, sql); + + UINIT(tbs, ts, ts_len, b, b_len, tags, paramv); +} + +int main() { + TAOS* taos = taos_connect("localhost", "root", "taosdata", "", 0); + if (!taos) { + printf("failed to connect to db, reason:%s\n", taos_errstr(taos)); + exit(EXIT_FAILURE); + } + + createdb(taos); + // insert distinct rows + insert_dist(taos, "insert into db.? using db.stb tags(?,?)values(?,?)"); + // insert duplicate rows + insert_dup_rows(taos, "insert into db.? values(?,?)"); + // insert duplicate tables + insert_dup_tables(taos, "insert into db.? values(?,?)"); + + taos_close(taos); + taos_cleanup(); +} + +// final results +// taos> select * from ctb_0; +// ts | b | +// ========================================= +// 2020-06-02 09:17:08.000 | z | +// 2020-06-02 09:17:08.001 | b | +// 2020-06-02 09:17:08.002 | c | +// Query OK, 3 row(s) in set (0.003975s) +// +// taos> select * from ctb_1; +// ts | b | +// ========================================= +// 2020-06-02 09:17:08.000 | z | +// 2020-06-02 09:17:08.001 | b | +// 2020-06-02 09:17:08.002 | c | +// Query OK, 3 row(s) in set (0.007241s) + +// taos> select * from ctb_2; +// ts | b | +// ========================================= +// 2020-06-02 09:17:08.000 | z | +// 2020-06-02 09:17:08.001 | b | +// 2020-06-02 09:17:08.002 | c | +// Query OK, 3 row(s) in set (0.005443s) diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index 1f4caf5c03..4cc193dd79 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -398,4 +398,204 @@ endi print ===== step5 over +print ===== step6 +sql drop database if exists test6; +sql create database test7 vgroups 1; +sql use test7; +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); + +sql create stream streams8 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt8 as select _wstart, count(*) c1 from st interval(10s); +sql create stream streams9 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt9 as select _wstart, count(*) c1 from st partition by tbname interval(10s); + +run tsim/stream/checkTaskStatus.sim + +$loop_count = 0 +loop7: + +$loop_count = $loop_count + 1 +if $loop_count == 40 then + return -1 +endi + +sleep 500 + +sql select status, * from information_schema.ins_streams where status != "ready"; + +if $rows != 0 then + print $data00 $data01 $data02 $data03 $data04 + print $data10 $data11 $data12 $data13 $data14 + print $data20 $data21 $data22 $data23 $data24 + print $data30 $data31 $data32 $data33 $data34 + print $data40 $data41 $data42 $data43 $data44 + print $data50 $data51 $data52 $data53 $data54 + goto loop7 +endi + +sql pause stream streams8; + +sql pause stream streams9; + +sql pause stream streams8; + +sql pause stream streams9; + +sleep 1000 + +sql pause stream streams8; + +sql pause stream streams9; + +sleep 1000 + +$loop_count = 0 +loop80: + +$loop_count = $loop_count + 1 +if $loop_count == 40 then + print pause stream failed + goto end_step_6 +endi + +sleep 1000 + +sql select status, * from information_schema.ins_stream_tasks where status != "paused"; + +if $rows != 2 then + print $data00 $data01 $data02 $data03 $data04 + print $data10 $data11 $data12 $data13 $data14 + print $data20 $data21 $data22 $data23 $data24 + print $data30 $data31 $data32 $data33 $data34 + print $data40 $data41 $data42 $data43 $data44 + print $data50 $data51 $data52 $data53 $data54 + goto loop80 +endi + +$loop_count = 0 +loop8: + +$loop_count = $loop_count + 1 +if $loop_count == 40 then + return -1 +endi + +sleep 1000 + +sql select status, * from information_schema.ins_streams where status == "paused"; + +if $rows != 2 then + print $data00 $data01 $data02 $data03 $data04 + print $data10 $data11 $data12 $data13 $data14 + print $data20 $data21 $data22 $data23 $data24 + print $data30 $data31 $data32 $data33 $data34 + print $data40 $data41 $data42 $data43 $data44 + print $data50 $data51 $data52 $data53 $data54 + goto loop8 +endi + + +sql resume stream streams8; + +sql resume stream streams9; + +sql resume stream streams8; + +sql resume stream streams9; + +sleep 1000 + +sql resume stream streams8; + +sql resume stream streams9; + +sleep 1000 + + +$loop_count = 0 +loop90: + +$loop_count = $loop_count + 1 +if $loop_count == 40 then + print pause stream failed + goto end_step_6 +endi + +sleep 1000 + +sql select status, * from information_schema.ins_stream_tasks where status == "paused"; + +if $rows != 0 then + print $data00 $data01 $data02 $data03 $data04 + print $data10 $data11 $data12 $data13 $data14 + print $data20 $data21 $data22 $data23 $data24 + print $data30 $data31 $data32 $data33 $data34 + print $data40 $data41 $data42 $data43 $data44 + print $data50 $data51 $data52 $data53 $data54 + goto loop90 +endi + +$loop_count = 0 +loop9: + +$loop_count = $loop_count + 1 +if $loop_count == 40 then + return -1 +endi + +sleep 1000 + +sql select status, * from information_schema.ins_streams where status != "paused"; + +if $rows != 2 then + print $data00 $data01 $data02 $data03 $data04 + print $data10 $data11 $data12 $data13 $data14 + print $data20 $data21 $data22 $data23 $data24 + print $data30 $data31 $data32 $data33 $data34 + print $data40 $data41 $data42 $data43 $data44 + print $data50 $data51 $data52 $data53 $data54 + goto loop9 +endi + +run tsim/stream/checkTaskStatus.sim + +sql insert into ts1 values(1648791213001,1,12,3,1.0); + +$loop_count = 0 +loop11: + +$loop_count = $loop_count + 1 +if $loop_count == 40 then + return -1 +endi + +sleep 1000 + +sql select * from streamt8; + +if $rows != 1 then + print $data00 $data01 $data02 $data03 $data04 + print $data10 $data11 $data12 $data13 $data14 + print $data20 $data21 $data22 $data23 $data24 + print $data30 $data31 $data32 $data33 $data34 + print $data40 $data41 $data42 $data43 $data44 + print $data50 $data51 $data52 $data53 $data54 + goto loop11 +endi + +sql select * from streamt9; + +if $rows != 1 then + print $data00 $data01 $data02 $data03 $data04 + print $data10 $data11 $data12 $data13 $data14 + print $data20 $data21 $data22 $data23 $data24 + print $data30 $data31 $data32 $data33 $data34 + print $data40 $data41 $data42 $data43 $data44 + print $data50 $data51 $data52 $data53 $data54 + goto loop11 +endi + +end_step_6: + +print ===== step6 over + system sh/stop_dnodes.sh diff --git a/tests/system-test/7-tmq/td-33225.py b/tests/system-test/7-tmq/td-33225.py new file mode 100644 index 0000000000..f39e402b55 --- /dev/null +++ b/tests/system-test/7-tmq/td-33225.py @@ -0,0 +1,44 @@ +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from taos.tmq import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def run(self): + tdSql.execute(f'create database if not exists db_33225') + tdSql.execute(f'use db_33225') + tdSql.execute(f'create stable if not exists s33225 (ts timestamp, c1 int, c2 int) tags (t binary(32), t2 int)') + tdSql.execute(f'insert into t1 using s33225 tags("__devicid__", 1) values(1669092069068, 0, 1)') + + tdSql.execute("create topic db_33225_topic as select ts,c1,t2 from s33225") + tdSql.execute(f'create stream s1 into st1 as select _wstart, count(*), avg(c2),t2 from s33225 PARTITION BY tbname INTERVAL(1m)') + + tdSql.execute(f'alter table s33225 modify column c2 COMPRESS "zlib"') + tdSql.execute(f'create index dex1 on s33225(t2)') + + return + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file