diff --git a/cmake/mxml_CMakeLists.txt.in b/cmake/mxml_CMakeLists.txt.in index 762df40e10..93fd35bb01 100644 --- a/cmake/mxml_CMakeLists.txt.in +++ b/cmake/mxml_CMakeLists.txt.in @@ -2,6 +2,7 @@ ExternalProject_Add(mxml GIT_REPOSITORY https://github.com/michaelrsweet/mxml.git GIT_TAG v2.12 + SOURCE_DIR "${TD_CONTRIB_DIR}/mxml" #BINARY_DIR "" BUILD_IN_SOURCE TRUE diff --git a/docs/zh/14-reference/05-taosbenchmark.md b/docs/zh/14-reference/05-taosbenchmark.md index 8be3d4d0a8..d5ae95b20b 100644 --- a/docs/zh/14-reference/05-taosbenchmark.md +++ b/docs/zh/14-reference/05-taosbenchmark.md @@ -11,7 +11,11 @@ taosBenchmark (曾用名 taosdemo ) 是一个用于测试 TDengine 产品性能 ## 安装 -- 安装 TDengine 官方安装包的同时会自动安装 taosBenchmark +taosBenchmark 有两种安装方式: + +- 安装 TDengine 官方安装包的同时会自动安装 taosBenchmark, 详情请参考[ TDengine 安装](../../operation/pkg-install)。 + +- 单独编译 taos-tools 并安装, 详情请参考 [taos-tools](https://github.com/taosdata/taos-tools) 仓库。 ## 运行 diff --git a/docs/zh/14-reference/14-taosKeeper.md b/docs/zh/14-reference/14-taosKeeper.md index 7e870def72..738d351d45 100644 --- a/docs/zh/14-reference/14-taosKeeper.md +++ b/docs/zh/14-reference/14-taosKeeper.md @@ -13,7 +13,12 @@ taosKeeper 是 TDengine 3.0 版本监控指标的导出工具,通过简单的 ## 安装 -- 安装 TDengine 官方安装包的同时会自动安装 taosKeeper +taosKeeper 有两种安装方式: +taosKeeper 安装方式: + +- 安装 TDengine 官方安装包的同时会自动安装 taosKeeper, 详情请参考[ TDengine 安装](../../operation/pkg-install)。 + +- 单独编译 taosKeeper 并安装,详情请参考 [taosKeeper](https://github.com/taosdata/taoskeeper) 仓库。 ## 配置和运行方式 diff --git a/docs/zh/17-operation/07-cluster.md b/docs/zh/17-operation/07-cluster.md new file mode 100644 index 0000000000..cf4bfafd53 --- /dev/null +++ b/docs/zh/17-operation/07-cluster.md @@ -0,0 +1,80 @@ +--- +title: 集群运维 +description: TDengine 提供了多种集群运维手段以使集群运行更健康更高效 +--- + +为了使集群运行更健康更高效,TDengine 企业版提供了一些运维手段来帮助系统管理员更好地运维集群。 + +## 数据重整 + +TDengine 面向多种写入场景,在有些写入场景下,TDengine 的存储会导致数据存储的放大或数据文件的空洞等。这一方面影响数据的存储效率,另一方面也会影响查询效率。为了解决上述问题,TDengine 企业版提供了对数据的重整功能,即 DATA COMPACT 功能,将存储的数据文件重新整理,删除文件空洞和无效数据,提高数据的组织度,从而提高存储和查询的效率。 + +**语法** + +```sql +COMPACT DATABASE db_name [start with 'XXXX'] [end with 'YYYY']; +``` + +**效果** + +- 扫描并压缩指定的 DB 中所有 VGROUP 中 VNODE 的所有数据文件 +- COMPCAT 会删除被删除数据以及被删除的表的数据 +- COMPACT 会合并多个 STT 文件 +- 可通过 start with 关键字指定 COMPACT 数据的起始时间 +- 可通过 end with 关键字指定 COMPACT 数据的终止时间 + +**补充说明** + +- COMPACT 为异步,执行 COMPACT 命令后不会等 COMPACT 结束就会返回。如果上一个 COMPACT 没有完成则再发起一个 COMPACT 任务,则会等上一个任务完成后再返回。 +- COMPACT 可能阻塞写入,但不阻塞查询 +- COMPACT 的进度不可观测 + +## 集群负载再平衡 + +当多副本集群中的一个或多个节点因为升级或其它原因而重启后,有可能出现集群中各个 dnode 负载不均衡的现象,极端情况下会出现所有 vgroup 的 leader 都位于同一个 dnode 的情况。为了解决这个问题,可以使用下面的命令 + +```sql +balance vgroup leader; +``` + +**功能** + +让所有的 vgroup 的 leade r在各自的replica节点上均匀分布。这个命令会让 vgroup 强制重新选举,通过重新选举,在选举的过程中,变换 vgroup 的leader,通过这个方式,最终让leader均匀分布。 + +**注意** + +Raft选举本身带有随机性,所以通过选举的重新分布产生的均匀分布也是带有一定的概率,不会完全的均匀。**该命令的副作用是影响查询和写入**,在vgroup重新选举时,从开始选举到选举出新的 leader 这段时间,这 个vgroup 无法写入和查询。选举过程一般在秒级完成。所有的vgroup会依次逐个重新选举。 + +## 恢复数据节点 + +在多节点三副本的集群环境中,如果某个 dnode 的磁盘损坏,该 dnode 会自动退出,但集群中其它的 dnode 仍然能够继续提供写入和查询服务。 + +在更换了损坏的磁盘后,如果想要让曾经主动退出的 dnode 重新加入集群提供服务,可以通过 `restore dnode` 命令来恢复该数据节点上的部分或全部逻辑节点,该功能依赖多副本中的其它副本进行数据复制,所以只在集群中 dnode 数量大于等于 3 且副本数为 3 的情况下能够工作。 + + +```sql +restore dnode ;# 恢复dnode上的mnode,所有vnode和qnode +restore mnode on dnode ;# 恢复dnode上的mnode +restore vnode on dnode ;# 恢复dnode上的所有vnode +restore qnode on dnode ;# 恢复dnode上的qnode +``` + +**限制** +- 该功能是基于已有的复制功能的恢复,不是灾难恢复或者备份恢复,所以对于要恢复的 mnode 和 vnode来说,使用该命令的前提是还存在该 mnode 或 vnode 的其它两个副本仍然能够正常工作。 +- 该命令不能修复数据目录中的个别文件的损坏或者丢失。例如,如果某个 mnode 或者 vnode 中的个别文件或数据损坏,无法单独恢复损坏的某个文件或者某块数据。此时,可以选择将该 mnode/vnode 的数据全部清空再进行恢复。 + + +## 虚拟组分裂 (Scale Out) + +当一个 vgroup 因为子表数过多而导致 CPU 或 Disk 资源使用量负载过高时,增加 dnode 节点后,可通过 `split vgroup` 命令把该 vgroup 分裂为两个虚拟组。分裂完成后,新产生的两个 vgroup 承担原来由一个 vgroup 提供的读写服务。这也是 TDengine 为企业版用户提供的 scale out 集群的能力。 + +```sql +split vgroup +``` + +**注意** +- 单副本库虚拟组,在分裂完成后,历史时序数据总磁盘空间使用量,可能会翻倍。所以,在执行该操作之前,通过增加 dnode 节点方式,确保集群中有足够的 CPU 和磁盘资源,避免资源不足现象发生。 +- 该命令为 DB 级事务;执行过程,当前DB的其它管理事务将会被拒绝。集群中,其它DB不受影响。 +- 分裂任务执行过程中,可持续提供读写服务;期间,可能存在可感知的短暂的读写业务中断。 +- 在分裂过程中,不支持流和订阅。分裂结束后,历史 WAL 会清空。 +- 分裂过程中,可支持节点宕机重启容错;但不支持节点磁盘故障容错。 \ No newline at end of file diff --git a/docs/zh/17-operation/09-storage.md b/docs/zh/17-operation/09-storage.md new file mode 100644 index 0000000000..185b2c40ec --- /dev/null +++ b/docs/zh/17-operation/09-storage.md @@ -0,0 +1,56 @@ +--- +title: 多级存储 +--- + +## 多级存储 + +说明:多级存储功能仅企业版支持。 + +在默认配置下,TDengine 会将所有数据保存在 /var/lib/taos 目录下,而且每个 vnode 的数据文件保存在该目录下的不同目录。为扩大存储空间,尽量减少文件读取的瓶颈,提高数据吞吐率 TDengine 可通过配置系统参数 dataDir 让多个挂载的硬盘被系统同时使用。 + +除此之外,TDengine 也提供了数据分级存储的功能,将不同时间段的数据存储在挂载的不同介质上的目录里,从而实现不同“热度”的数据存储在不同的存储介质上,充分利用存储,节约成本。比如,最新采集的数据需要经常访问,对硬盘的读取性能要求高,那么用户可以配置将这些数据存储在 SSD 盘上。超过一定期限的数据,查询需求量没有那么高,那么可以存储在相对便宜的 HDD 盘上。 + +多级存储支持 3 级,每级最多可配置 16 个挂载点。 + +TDengine 多级存储配置方式如下(在配置文件/etc/taos/taos.cfg 中): + +``` +dataDir [path] +``` + +- path: 挂载点的文件夹路径 +- level: 介质存储等级,取值为 0,1,2。 + 0 级存储最新的数据,1 级存储次新的数据,2 级存储最老的数据,省略默认为 0。 + 各级存储之间的数据流向:0 级存储 -> 1 级存储 -> 2 级存储。 + 同一存储等级可挂载多个硬盘,同一存储等级上的数据文件分布在该存储等级的所有硬盘上。 + 需要说明的是,数据在不同级别的存储介质上的移动,是由系统自动完成的,用户无需干预。 +- primary: 是否为主挂载点,0(否)或 1(是),省略默认为 1。 + +在配置中,只允许一个主挂载点的存在(level=0,primary=1),例如采用如下的配置方式: + +``` +dataDir /mnt/data1 0 1 +dataDir /mnt/data2 0 0 +dataDir /mnt/data3 1 0 +dataDir /mnt/data4 1 0 +dataDir /mnt/data5 2 0 +dataDir /mnt/data6 2 0 +``` + +:::note + +1. 多级存储不允许跨级配置,合法的配置方案有:仅 0 级,仅 0 级+ 1 级,以及 0 级+ 1 级+ 2 级。而不允许只配置 level=0 和 level=2,而不配置 level=1。 +2. 禁止手动移除使用中的挂载盘,挂载盘目前不支持非本地的网络盘。 +3. 多级存储目前不支持删除已经挂载的硬盘的功能。 + +::: + +## 0 级负载均衡 + +在多级存储中,有且只有一个主挂载点,主挂载点承担了系统中最重要的元数据在座,同时各个 vnode 的主目录均存在于当前 dnode 主挂载点上,从而导致该 dnode 的写入性能受限于单个磁盘的 IO 吞吐能力。 + +从 TDengine 3.1.0.0 开始,如果一个 dnode 配置了多个 0 级挂载点,我们将该 dnode 上所有 vnode 的主目录均衡分布在所有的 0 级挂载点上,由这些 0 级挂载点共同承担写入负荷。在网络 I/O 及其它处理资源不成为瓶颈的情况下,通过优化集群配置,测试结果证明整个系统的写入能力和 0 级挂载点的数量呈现线性关系,即随着 0 级挂载点数量的增加,整个系统的写入能力也成倍增加。 + +## 同级挂载点选择策略 + +一般情况下,当 TDengine 要从同级挂载点中选择一个用于生成新的数据文件时,采用 round robin 策略进行选择。但现实中有可能每个磁盘的容量不相同,或者容量相同但写入的数据量不相同,这就导致会出现每个磁盘上的可用空间不均衡,在实际进行选择时有可能会选择到一个剩余空间已经很小的磁盘。为了解决这个问题,从 3.1.1.0 开始引入了一个新的配置 `minDiskFreeSize`,当某块磁盘上的可用空间小于等于这个阈值时,该磁盘将不再被选择用于生成新的数据文件。该配置项的单位为字节,其值应该大于 2GB,即会跳过可用空间小于 2GB 的挂载点。 diff --git a/docs/zh/20-third-party/11-kafka.md b/docs/zh/20-third-party/11-kafka.md index 64391916ab..2f09e50f0b 100644 --- a/docs/zh/20-third-party/11-kafka.md +++ b/docs/zh/20-third-party/11-kafka.md @@ -23,7 +23,7 @@ TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送 1. Linux 操作系统 2. 已安装 Java 8 和 Maven 3. 已安装 Git、curl、vi -4. 已安装并启动 TDengine。 +4. 已安装并启动 TDengine。如果还没有可参考[安装和卸载](../../operation/pkg-install) ## 安装 Kafka diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 99160f1519..baf37f00eb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -248,7 +248,7 @@ typedef struct SStreamChildEpInfo { typedef struct SStreamTaskKey { int64_t streamId; - int32_t taskId; + int64_t taskId; } SStreamTaskKey; typedef struct SStreamTaskId { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ff9e922ee1..f816a333a8 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -128,7 +128,7 @@ int32_t tsQueryPolicy = 1; int32_t tsQueryRspPolicy = 0; int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT bool tsEnableQueryHb = true; -bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true +bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true int32_t tsQuerySmaOptimize = 0; int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data. bool tsQueryPlannerTrace = false; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 8484148642..431864ae97 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1255,12 +1255,12 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { sprintf(detail, "colVer:%d, delay1:%" PRId64 ", delay2:%" PRId64 ", deleteMark1:%" PRId64 ", " "deleteMark2:%" PRId64 ", igExists:%d, numOfColumns:%d, numOfFuncs:%d, numOfTags:%d, " "source:%d, suid:%" PRId64 ", tagVer:%d, ttl:%d, " - "watermark1:%" PRId64 ", watermark2:%" PRId64, - createReq.colVer, createReq.delay1, createReq.delay2, createReq.deleteMark1, + "watermark1:%" PRId64 ", watermark2:%" PRId64, + createReq.colVer, createReq.delay1, createReq.delay2, createReq.deleteMark1, createReq.deleteMark2, createReq.igExists, createReq.numOfColumns, createReq.numOfFuncs, createReq.numOfTags, createReq.source, createReq.suid, createReq.tagVer, createReq.ttl, createReq.watermark1, createReq.watermark2); - + mndAuditFieldStr(detail, createReq.pColumns, createReq.numOfColumns, AUDIT_DETAIL_MAX); mndAuditFieldStr(detail, createReq.pTags, createReq.numOfTags, AUDIT_DETAIL_MAX); @@ -2610,7 +2610,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { char detail[2000] = {0}; sprintf(detail, "igNotExists:%d, source:%d" , dropReq.igNotExists, dropReq.source); - + SName name = {0}; tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7625f8869e..5752dae87b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -876,13 +876,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { char detail[2000] = {0}; sprintf(detail, "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 - ", " - "fillHistory:%d, igExists:%d, " - "igExpired:%d, igUpdate:%d, lastTs:%" PRId64 - ", " - "maxDelay:%" PRId64 - ", numOfTags:%d, sourceDB:%s, " - "targetStbFullName:%s, triggerType:%d, watermark:%" PRId64, + ", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 + ", maxDelay:%" PRId64 ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64, createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark, createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate, createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB, diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 0b243e0a9c..a9fb5096fb 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -642,7 +642,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { SName tableName = {0}; tNameFromString(&tableName, createTopicReq.subStbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - sprintf(detail, "igExists:%d, subStbName:%s, subType:%d, withMeta:%d, sql:%s", + sprintf(detail, "igExists:%d, subStbName:%s, subType:%d, withMeta:%d, sql:%s", createTopicReq.igExists, tableName.tname, createTopicReq.subType, createTopicReq.withMeta, sql); SName dbname = {0}; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index b107442199..95671e5900 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -657,7 +657,7 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) { if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; char detail[1000] = {0}; - sprintf(detail, "createType:%d, enable:%d, superUser:%d, sysInfo:%d", + sprintf(detail, "createType:%d, enable:%d, superUser:%d, sysInfo:%d", createReq.createType, createReq.enable, createReq.superUser, createReq.sysInfo); auditRecord(pReq, pMnode->clusterId, "createUser", createReq.user, "", detail); @@ -1039,16 +1039,16 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; char detail[1000] = {0}; - sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, tabName:%s, password:", + sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, tabName:%s, password:", mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo, alterReq.tabName); if(alterReq.alterType == TSDB_ALTER_USER_PASSWD){ - sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, tabName:%s, password:xxx", - mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo, + sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, tabName:%s, password:xxx", + mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo, alterReq.tabName); auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, "", detail); } - else if(alterReq.alterType == TSDB_ALTER_USER_SUPERUSER || + else if(alterReq.alterType == TSDB_ALTER_USER_SUPERUSER || alterReq.alterType == TSDB_ALTER_USER_ENABLE || alterReq.alterType == TSDB_ALTER_USER_SYSINFO){ auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, "", detail); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 447a7e2d90..f99f88f870 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1760,3 +1760,44 @@ _end: return rsp.code; } +int32_t tqProcessTaskStopReq(STQ* pTq, SRpcMsg* pMsg) { + int32_t vgId = TD_VID(pTq->pVnode); + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; + + SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; + + SStreamMeta* pMeta = pTq->pStreamMeta; + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); + if (pTask == NULL) { + tqError("vgId:%d process stop req, failed to acquire task:0x%x, it may have been dropped already", vgId, + pReq->taskId); + // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active + return TSDB_CODE_SUCCESS; + } + + tqDebug("s-task:%s receive stop msg from mnode", pTask->id.idStr); + streamTaskStop(pTask); + + SStreamTask* pHistoryTask = NULL; + if (pTask->historyTaskId.taskId != 0) { + pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); + if (pHistoryTask == NULL) { + tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already", + pMeta->vgId, pTask->historyTaskId.taskId); + streamMetaReleaseTask(pMeta, pTask); + + // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active + return TSDB_CODE_SUCCESS; + } + + tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); + + streamTaskStop(pHistoryTask); + streamMetaReleaseTask(pMeta, pHistoryTask); + } + + streamMetaReleaseTask(pMeta, pTask); + tmsgSendRsp(&rsp); + return 0; +} diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 3c0321f300..f15bfd26a2 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -386,6 +386,16 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } + taosThreadMutexLock(&pTask->lock); + pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + + if (pTask->status.taskStatus != TASK_STATUS__NORMAL) { + tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + if (pItem != NULL) { noDataInWal = false; code = streamTaskPutDataIntoInputQ(pTask, pItem); diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index f19068ea88..733f073ce9 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -425,6 +425,20 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * if (code) goto _exit; } + if (pWriter->pStreamTaskWriter) { + code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback); + if (code) goto _exit; + } + + if (pWriter->pStreamStateWriter) { + code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback); + if (code) goto _exit; + + code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, 0); + pWriter->pStreamStateWriter = NULL; + if (code) goto _exit; + } + if (pWriter->pRsmaSnapWriter) { code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback); if (code) goto _exit; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index e81d0ef989..9d5a300c23 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3734,7 +3734,7 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t removeConstantValueFromList(SNodeList** pList) { SNode* pNode = NULL; WHERE_EACH(pNode, *pList) { - if (nodeType(pNode) == QUERY_NODE_VALUE || + if (nodeType(pNode) == QUERY_NODE_VALUE || (nodeType(pNode) == QUERY_NODE_FUNCTION && fmIsConstantResFunc((SFunctionNode*)pNode) && fmIsScalarFunc(((SFunctionNode*)pNode)->funcId))) { ERASE_NODE(*pList); continue; @@ -3753,7 +3753,11 @@ static int32_t removeConstantValueFromList(SNodeList** pList) { static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelect) { pCxt->currClause = SQL_CLAUSE_PARTITION_BY; int32_t code = TSDB_CODE_SUCCESS; - + + if (pSelect->pPartitionByList) { + code = removeConstantValueFromList(&pSelect->pPartitionByList); + } + if (TSDB_CODE_SUCCESS == code && pSelect->pPartitionByList) { int8_t typeType = getTableTypeFromTableNode(pSelect->pFromTable); SNode* pPar = nodesListGetNode(pSelect->pPartitionByList, 0); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 24a1ea2377..d460c8074d 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -447,7 +447,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pScan->pScanPseudoCols = pNewScanPseudoCols; } } -*/ +*/ } if (NULL != pScan->pScanCols) { @@ -516,7 +516,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; pJoin->isLowLevelJoin = pJoinTable->isLowLevelJoin; - + int32_t code = TSDB_CODE_SUCCESS; // set left and right node @@ -564,7 +564,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect code = createColumnByRewriteExprs(pColList, &pJoin->node.pTargets); } } - + if (TSDB_CODE_SUCCESS == code) { SNodeList* pColList = NULL; if (QUERY_NODE_REAL_TABLE == nodeType(pJoinTable->pRight) && !pJoin->isLowLevelJoin) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 52c3546a39..5628f5cf0f 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -539,7 +539,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla return TSDB_CODE_OUT_OF_MEMORY; } vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); - + pScan->onlyMetaCtbIdx = pScanLogicNode->onlyMetaCtbIdx; return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); @@ -726,7 +726,7 @@ static int32_t mergeEqCond(SNode** ppDst, SNode** ppSrc) { *ppSrc = NULL; return TSDB_CODE_SUCCESS; } - + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); if (NULL == pLogicCond) { return TSDB_CODE_OUT_OF_MEMORY; @@ -754,7 +754,7 @@ static int32_t getJoinDataBlockDescNode(SNodeList* pChildren, int32_t idx, SData planError("Invalid join children num:%d or child type:%d", pChildren->length, nodeType(nodesListGetNode(pChildren, 0))); return TSDB_CODE_PLAN_INTERNAL_ERROR; } - + return TSDB_CODE_SUCCESS; } @@ -775,12 +775,12 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi if (TSDB_CODE_SUCCESS == code) { code = getJoinDataBlockDescNode(pChildren, 1, &pRightDesc); } - + if (TSDB_CODE_SUCCESS == code) { code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pJoin->pPrimKeyCond); } - + if (TSDB_CODE_SUCCESS == code) { code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, &pJoin->pTargets); @@ -869,7 +869,7 @@ static int32_t createHashJoinColList(int16_t lBlkId, int16_t rBlkId, SNode* pEq1 if (NULL == pJoin->pOnLeft || NULL == pJoin->pOnRight) { return TSDB_CODE_OUT_OF_MEMORY; } - + code = extractHashJoinOnCols(lBlkId, rBlkId, pEq1, pJoin); if (TSDB_CODE_SUCCESS == code) { code = extractHashJoinOnCols(lBlkId, rBlkId, pEq2, pJoin); @@ -893,10 +893,10 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys return TSDB_CODE_OUT_OF_MEMORY; } SNodeList* pNew = nodesMakeList(); - + FOREACH(pNode, pJoin->pTargets) { SColumnNode* pCol = (SColumnNode*)pNode; - int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN); + int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN); tSimpleHashPut(pHash, name, len, &pCol, POINTER_BYTES); } @@ -905,7 +905,7 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys FOREACH(pNode, pJoin->pOnLeft) { SColumnNode* pCol = (SColumnNode*)pNode; - int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN); + int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN); SNode** p = tSimpleHashGet(pHash, name, len); if (p) { nodesListStrictAppend(pJoin->pTargets, *p); @@ -914,7 +914,7 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys } FOREACH(pNode, pJoin->pOnRight) { SColumnNode* pCol = (SColumnNode*)pNode; - int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN); + int32_t len = getSlotKey(pNode, NULL, name, TSDB_COL_FNAME_LEN); SNode** p = tSimpleHashGet(pHash, name, len); if (p) { nodesListStrictAppend(pJoin->pTargets, *p); @@ -930,7 +930,7 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys if (p == NULL) { break; } - + nodesListStrictAppend(pJoin->pTargets, *p); } } @@ -958,10 +958,10 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pJoin->pPrimKeyCond); if (TSDB_CODE_SUCCESS == code) { code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond); - } + } if (TSDB_CODE_SUCCESS == code) { code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond, &pJoin->pTagEqCond); - } + } if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) { code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOtherOnCond, &pJoin->pFilterConditions); } @@ -973,10 +973,10 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil } if (TSDB_CODE_SUCCESS == code) { code = createHashJoinColList(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin->pPrimKeyCond, pJoin->pColEqCond, pJoin->pTagEqCond, pJoin); - } + } if (TSDB_CODE_SUCCESS == code) { code = sortHashJoinTargets(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin); - } + } if (TSDB_CODE_SUCCESS == code) { code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc); } @@ -1001,7 +1001,7 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren planError("Invalid join algorithm:%d", pJoinLogicNode->joinAlgo); break; } - + return TSDB_CODE_FAILED; } @@ -1019,7 +1019,7 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh pGrpCache->batchFetch = pLogicNode->batchFetch; SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; int32_t code = TSDB_CODE_SUCCESS; -/* +/* if (TSDB_CODE_SUCCESS == code) { code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pLogicNode->pGroupCols, &pGrpCache->pGroupCols); } @@ -1045,7 +1045,7 @@ static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList* } if (TSDB_CODE_SUCCESS == code) { memcpy(pDynCtrl->stbJoin.srcScan, pLogicNode->stbJoin.srcScan, sizeof(pDynCtrl->stbJoin.srcScan)); - + SNode* pNode = NULL; int32_t i = 0; FOREACH(pNode, pVgList) { @@ -1062,7 +1062,7 @@ static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList* return code; } - + static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SDynQueryCtrlLogicNode* pLogicNode, SPhysiNode** pPhyNode) { int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index baf319d014..1608a9e244 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -312,7 +312,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); ASSERT(remain >= 0); - if (remain == 0) { // all tasks are ready + if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); pMeta->totalTasks = 0; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0a0ea221d6..3dbb224e79 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2157,7 +2157,6 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { if (rpcDebugFlag & DEBUG_DEBUG) { STraceId* trace = &pMsg->msg.info.traceId; char tbuf[512] = {0}; - EPSET_TO_STR(&pCtx->epSet, tbuf); tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf, pCtx->retryStep, pCtx->retryNextInterval); @@ -2388,7 +2387,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (rpcDebugFlag & DEBUG_TRACE) { char tbuf[512] = {0}; EPSET_TO_STR(&pCtx->epSet, tbuf); - tGTrace("%s conn %p extract epset from msg, use:%s", CONN_GET_INST_LABEL(pConn), pConn, tbuf); + tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); } } if (rpcDebugFlag & DEBUG_TRACE) { diff --git a/tests/script/tsim/query/tag_scan.sim b/tests/script/tsim/query/tag_scan.sim index 4d5b4cfe40..3f0a23fbd6 100644 --- a/tests/script/tsim/query/tag_scan.sim +++ b/tests/script/tsim/query/tag_scan.sim @@ -90,7 +90,7 @@ endi sql select tags tbname,t,b from stt1 order by t print $rows -print $data00 $data01 $data02 $data10 $data11 $data12 $data20 $data21 $data22 $data30 $data31 $data32 +print $data00 $data01 $data02 $data10 $data11 $data12 $data20 $data21 $data22 $data30 $data31 $data32 if $rows != 4 then return -1 endi @@ -103,7 +103,7 @@ endi sql select tags t,b from stt1 where t=1 print $rows -print $data00 $data01 +print $data00 $data01 if $rows != 1 then return -1 endi @@ -116,7 +116,7 @@ endi sql select tags t,b from stt1 where tbname='ctt11' print $rows -print $data00 $data01 +print $data00 $data01 if $rows != 1 then return -1 endi @@ -129,7 +129,7 @@ endi sql select tags t,b from ctt11 print $rows -print $data00 $data01 +print $data00 $data01 if $rows != 1 then return -1 endi