diff --git a/docs/zh/03-intro.md b/docs/zh/03-intro.md index 47d00cf01a..0167f9323b 100644 --- a/docs/zh/03-intro.md +++ b/docs/zh/03-intro.md @@ -24,11 +24,11 @@ TDengine 经过特别优化,以适应时间序列数据的独特需求,引 1. 写入数据:TDengine 支持多种数据写入方式。首先,它完全兼容 SQL,允许用户使用标准的 SQL 语法进行数据写入。而且 TDengine 还支持无模式(Schemaless)写入,包括流行的 InfluxDB Line 协议、OpenTSDB 的 Telnet 和 JSON 协议,这些协议的加入使得数据的导入变得更加灵活和高效。更进一步,TDengine 与众多第三方工具实现了无缝集成,例如 Telegraf、Prometheus、EMQX、StatsD、collectd 和 HiveMQ 等。在 TDengine Enterprise 中, 还提供了 MQTT、OPC-UA、OPC-DA、PI、Wonderware、Kafka、InfluxDB、OpenTSDB、MySQL、Oracle 和 SQL Server 等连接器。这些工具通过简单的配置,无需一行代码,就可以将来自各种数据源的数据源源不断的写入数据库,极大地简化了数据收集和存储的过程。 -2. 查询数据:TDengine 提供标准的 SQL 查询语法,并针对时序数据和业务的特点优化和新增了许多语法和功能,例如降采样、插值、累计求和、时间加权平均、状态窗口、时间窗口、会话窗口、滑动窗口等。TDengine 还支持用户自定义函数(UDF) +2. 查询数据:TDengine 提供标准的 SQL 查询语法,并针对时序数据和业务的特点优化和新增了许多语法和功能,例如降采样、插值、累计求和、时间加权平均、状态窗口、时间窗口、会话窗口、滑动窗口等。TDengine 还支持用户自定义函数(UDF)。 3. 缓存:TDengine 使用时间驱动缓存管理策略(First-In-First-Out,FIFO),将最近到达的(当前状态)数据保存在缓存中,这样便于获取任何监测对象的实时状态,而无需使用 Redis 等其他缓存工具,简化系统架构和运营成本。 -4. 流式计算:TDengine 流式计算引擎提供了实时处理写入的数据流的能力,不仅支持连续查询,还支持事件驱动的流式计算。它提供了替代复杂流处理系统的轻量级解决方案,并能够在高吞吐的数据写入的情况下,提供毫秒级的计算结果延迟 +4. 流式计算:TDengine 流式计算引擎提供了实时处理写入的数据流的能力,不仅支持连续查询,还支持事件驱动的流式计算。它提供了替代复杂流处理系统的轻量级解决方案,并能够在高吞吐的数据写入的情况下,提供毫秒级的计算结果延迟。 5. 数据订阅:TDengine 提供了类似 Kafka 的数据订阅功能。但用户可以通过 SQL 来灵活控制订阅的数据内容,并使用 Kafka 相同的 API 来订阅一张表、一组表、全部列或部分列、甚至整个数据库的数据。TDengine 可以替代需要集成消息队列产品的场景, 从而简化系统设计的复杂度,降低运营维护成本。 @@ -38,13 +38,13 @@ TDengine 经过特别优化,以适应时间序列数据的独特需求,引 8. 数据迁移:TDengine 提供了多种便捷的数据导入导出功能,包括脚本文件导入导出、数据文件导入导出、taosdump 工具导入导出等。 -9. 编程连接器:TDengine 提供不同语言的连接器,包括 C/C++、Java、Go、Node.js、Rust、Python、C#、R、PHP 等。这些连接器大多都支持原生连接和 WebSocket 两种连接方式。TDengine 也提供 REST 接口,任何语言的应用程序可以直接通过 HTTP 请求访问数据库。 +9. 编程连接器:TDengine 提供不同语言的连接器,包括 C/C++、Java、Go、Node.js、Rust、Python、C#、R、PHP 等。这些连接器大多都支持原生连接和 WebSocket 两种连接方式。TDengine 也提供 RESTful 接口,任何语言的应用程序可以直接通过 HTTP 请求访问数据库。 10. 数据安全:TDengine 提供了丰富的用户管理和权限管理功能以控制不同用户对数据库和表的访问权限,提供了 IP 白名单功能以控制不同帐号只能从特定的服务器接入集群。TDengine 支持系统管理员对不同数据库按需加密,数据加密后对读写完全透明且对性能的影响很小。还提供了审计日志功能以记录系统中的敏感操作。 11. 常用工具:TDengine 还提供了交互式命令行程序(CLI),便于管理集群、检查系统状态、做即时查询。压力测试工具 taosBenchmark,用于测试 TDengine 的性能。TDengine 还提供了图形化管理界面,简化了操作和管理过程。 -12. 零代码数据接入:TDengine 企业版提供了丰富的数据接入功能,依托强大的数据接入平台,无需一行代码,只需要做简单的配置即可实现多种数据源的数据接入,目前已经支持的数据源包括:OPC UA, OPC DA, Pi, MQTT, Kafka, InfluxDB, OpenTSDB, MySql, SQL Server, Oracle, Wonderware Historian, MongoDB。 +12. 零代码数据接入:TDengine 企业版提供了丰富的数据接入功能,依托强大的数据接入平台,无需一行代码,只需要做简单的配置即可实现多种数据源的数据接入,目前已经支持的数据源包括:OPC-UA、OPC-DA、PI、MQTT、Kafka、InfluxDB、OpenTSDB、MySQL、SQL Server、Oracle、Wonderware Historian、MongoDB。 ## TDengine 与典型时序数据库的区别 @@ -58,16 +58,14 @@ TDengine 经过特别优化,以适应时间序列数据的独特需求,引 4. 强大的分析能力:TDengine 不仅支持标准 SQL 查询,还为时序数据特有的分析提供了 SQL 扩展。通过超级表、存储计算分离、分区分片、预计算、UDF 等先进技术,TDengine 展现出强大的数据分析能力。 -5. 简单易用:TDengine 安装无依赖,集群部署仅需几秒即可完成。它提供了 REST ful接口和多种编程语言的连接器,与众多第三方工具无缝集成。此外,命令行程序和丰富的运维工具也极大地方便了用户的管理和即时查询需求。 +5. 简单易用:TDengine 安装无依赖,集群部署仅需几秒即可完成。它提供了 RESTful 接口和多种编程语言的连接器,与众多第三方工具无缝集成。此外,命令行程序和丰富的运维工具也极大地方便了用户的管理和即时查询需求。 -6. 核心开源:TDengine 的核心代码,包括集群功能,均在开源协议下公开发布。它在GitHub 网站全球趋势排行榜上多次位居榜首,显示出其受欢迎程度。同时,TDengine -拥有一个活跃的开发者社区,为技术的持续发展和创新提供了有力支持。 +6. 核心开源:TDengine 的核心代码,包括集群功能,均在开源协议下公开发布。它在 GitHub 网站全球趋势排行榜上多次位居榜首,显示出其受欢迎程度。同时,TDengine 拥有一个活跃的开发者社区,为技术的持续发展和创新提供了有力支持。 采用 TDengine,企业可以在物联网、车联网、工业互联网等典型场景中显著降低大数据平台的总拥有成本,主要体现在以下几个方面: 1. 高性能带来的成本节约:TDengine 卓越的写入、查询和存储性能意味着系统所需的计算资源和存储资源可以大幅度减少。这不仅降低了硬件成本,还减少了能源消耗和维护费用。 2. 标准化与兼容性带来的成本效益:由于 TDengine 支持标准 SQL,并与众多第三方软件实现了无缝集成,用户可以轻松地将现有系统迁移到 TDengine 上,无须重写大量代码。这种标准化和兼容性大大降低了学习和迁移成本,缩短了项目周期。 -3. 简化系统架构带来的成本降低:作为一个极简的时序数据平台,TDengine 集成了消息队列、缓存、流计算等必要功能,避免了额外集成众多其他组件的需要。这 -种简化的系统架构显著降低了系统的复杂度,从而减少了研发和运营成本,提高了整体运营效率。 +3. 简化系统架构带来的成本降低:作为一个极简的时序数据平台,TDengine 集成了消息队列、缓存、流计算等必要功能,避免了额外集成众多其他组件的需要。这种简化的系统架构显著降低了系统的复杂度,从而减少了研发和运营成本,提高了整体运营效率。 ## 技术生态 diff --git a/docs/zh/04-get-started/03-package.md b/docs/zh/04-get-started/03-package.md index 4906a2fcfa..7df41af831 100644 --- a/docs/zh/04-get-started/03-package.md +++ b/docs/zh/04-get-started/03-package.md @@ -146,7 +146,7 @@ Note: 从 3.0.1.7 开始,只提供 TDengine 客户端的 Windows 客户端的 :::info -下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases/tdengine)。 +下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](https://docs.taosdata.com/releases/tdengine/)。 ::: :::note diff --git a/docs/zh/08-develop/01-connect/index.md b/docs/zh/08-develop/01-connect/index.md index 755a9e7f74..160e4cd40c 100644 --- a/docs/zh/08-develop/01-connect/index.md +++ b/docs/zh/08-develop/01-connect/index.md @@ -99,7 +99,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速 - **安装前准备** - 安装 Python。新近版本 taospy 包要求 Python 3.6.2+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) 安装。 - 安装 [pip](https://pypi.org/project/pip/)。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 [pip documentation](https://pip.pypa.io/en/stable/installation/) 安装。 - - 如果使用原生连接,还需[安装客户端驱动](../#安装客户端驱动)。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。 + - 如果使用原生连接,还需[安装客户端驱动](../connect/#安装客户端驱动-taosc)。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。 - **使用 pip 安装** - 卸载旧版本 diff --git a/docs/zh/26-tdinternal/03-storage.md b/docs/zh/26-tdinternal/03-storage.md index f65f06e85b..e402babdd7 100644 --- a/docs/zh/26-tdinternal/03-storage.md +++ b/docs/zh/26-tdinternal/03-storage.md @@ -101,7 +101,7 @@ head 文件是时序数据存储文件(data 文件)的 BRIN(Block Range In head 文件中存储了多个 BRIN 记录块及其索引。BRIN 记录块采用列存压缩的方式,这种方式可以大大减少空间占用,同时保持较高的查询性能。BRIN 索引结构如下图所示: -![BRIN 索引结构](./brin.png) +![BRIN 索引结构](./brin.png) #### data 文件 @@ -121,4 +121,4 @@ data 文件是实际存储时序数据的文件。在 data 文件中,时序数 在少表高频的场景下,系统仅维护一个 stt 文件。该文件专门用于存储每次数据落盘后剩余的碎片数据。这样,在下一次数据落盘时,这些碎片数据可以与内存中的新数据合并,形成较大的数据块,随后一并写入 data 文件中。这种机制有效地避免了数据文件的碎片化,确保了数据存储的连续性和高效性。 -对于多表低频的场景,建议配置多个 stt 文件。这种场景下的核心思想是,尽管单张表每次落盘的数据量可能不大,但同一超级表下的所有子表累积的数据量却相当可观。通过合并这些数据,可以生成较大的数据块,从而减少数据块的碎片化。这不仅提升了数据的写入效率,还能显著提高查询性能,因为连续的数据存储更有利于快速的数据检索和访问。 \ No newline at end of file +对于多表低频的场景,建议配置多个 stt 文件。这种场景下的核心思想是,尽管单张表每次落盘的数据量可能不大,但同一超级表下的所有子表累积的数据量却相当可观。通过合并这些数据,可以生成较大的数据块,从而减少数据块的碎片化。这不仅提升了数据的写入效率,还能显著提高查询性能,因为连续的数据存储更有利于快速的数据检索和访问。 diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9c681e937e..b695290f6a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2840,6 +2840,8 @@ typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; char clientId[TSDB_CLIENT_ID_LEN]; + char user[TSDB_USER_LEN]; + char fqdn[TSDB_FQDN_LEN]; SArray* topicNames; // SArray int8_t withTbName; @@ -2873,6 +2875,8 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta); tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs); tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs); + tlen += taosEncodeString(buf, pReq->user); + tlen += taosEncodeString(buf, pReq->fqdn); return tlen; } @@ -2907,6 +2911,8 @@ static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeR if ((char*)buf - (char*)start < len) { buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs); buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs); + buf = taosDecodeStringTo(buf, pReq->user); + buf = taosDecodeStringTo(buf, pReq->fqdn); } else { pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 8f35a2fad1..0224a20109 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -82,6 +82,8 @@ struct tmq_t { int64_t refId; char groupId[TSDB_CGROUP_LEN]; char clientId[TSDB_CLIENT_ID_LEN]; + char user[TSDB_USER_LEN]; + char fqdn[TSDB_FQDN_LEN]; int8_t withTbName; int8_t useSnapshot; int8_t autoCommit; @@ -1265,6 +1267,10 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->replayEnable = conf->replayEnable; pTmq->sourceExcluded = conf->sourceExcluded; pTmq->enableBatchMeta = conf->enableBatchMeta; + tstrncpy(pTmq->user, user, TSDB_USER_LEN); + if (taosGetFqdn(pTmq->fqdn) != 0) { + (void)strcpy(pTmq->fqdn, "localhost"); + } if (conf->replayEnable) { pTmq->autoCommit = false; } @@ -1332,6 +1338,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); + tstrncpy(req.user, tmq->user, TSDB_USER_LEN); + tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN); req.topicNames = taosArrayInit(sz, sizeof(void*)); if (req.topicNames == NULL) { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 2d69a687a6..3f27ab2b2b 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -344,7 +344,9 @@ static const SSysDbTableSchema subscriptionSchema[] = { {.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "consumer_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "fqdn", .bytes = TSDB_FQDN_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "offset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "rows", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, }; @@ -480,11 +482,12 @@ static const SSysDbTableSchema connectionsSchema[] = { {.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, }; - static const SSysDbTableSchema consumerSchema[] = { {.name = "consumer_id", .bytes = TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "client_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "fqdn", .bytes = TSDB_FQDN_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, /*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/ diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 62e77867f6..99e59662ac 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -597,6 +597,8 @@ typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; char clientId[TSDB_CLIENT_ID_LEN]; + char user[TSDB_USER_LEN]; + char fqdn[TSDB_FQDN_LEN]; int8_t updateType; // used only for update int32_t epoch; int32_t status; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 8f2523d50e..65617ddf31 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -903,6 +903,22 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * MND_TMQ_NULL_CHECK(pColInfo); MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false)); + // user + char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(user, pConsumer->user); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + MND_TMQ_NULL_CHECK(pColInfo); + MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false)); + + // fqdn + char fqdn[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(fqdn, pConsumer->fqdn); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + MND_TMQ_NULL_CHECK(pColInfo); + MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)fqdn, false)); + // status const char *pStatusName = mndConsumerStatusName(pConsumer->status); status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 695bf4d30d..c604e58588 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -325,6 +325,8 @@ int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg; pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs; pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs; + tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN); + tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN); pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup); if (pConsumer->rebNewTopics == NULL){ @@ -429,6 +431,8 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg); tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs); tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs); + tlen += taosEncodeString(buf, pConsumer->user); + tlen += taosEncodeString(buf, pConsumer->fqdn); return tlen; } @@ -503,6 +507,8 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s if (sver > 2){ buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs); buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs); + buf = taosDecodeStringTo(buf, pConsumer->user); + buf = taosDecodeStringTo(buf, pConsumer->fqdn); } else{ pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index b3f8cf0aea..22ac58c9db 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -1330,8 +1330,8 @@ END: TAOS_RETURN(code); } -static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *topic, - const char *cgroup, SArray *vgs, SArray *offsetRows) { +static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char* user, const char* fqdn, + const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) { int32_t code = 0; int32_t sz = taosArrayGetSize(vgs); for (int32_t j = 0; j < sz; j++) { @@ -1355,7 +1355,7 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t cons MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false)); // consumer id - char consumerIdHex[32] = {0}; + char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0}; (void)sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, consumerId); varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex))); @@ -1363,6 +1363,18 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t cons MND_TMQ_NULL_CHECK(pColInfo); MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1)); + char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; + if (user) STR_TO_VARSTR(userStr, user); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + MND_TMQ_NULL_CHECK(pColInfo); + MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, userStr, user == NULL)); + + char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0}; + if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + MND_TMQ_NULL_CHECK(pColInfo); + MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, fqdnStr, fqdn == NULL)); + mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId, varDataVal(cgroup), pVgEp->vgId); @@ -1435,16 +1447,25 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock SMqConsumerEp *pConsumerEp = NULL; void *pIter = NULL; + while (1) { pIter = taosHashIterate(pSub->consumerHash, pIter); if (pIter == NULL) break; pConsumerEp = (SMqConsumerEp *)pIter; - MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs, + char *user = NULL; + char *fqdn = NULL; + SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId); + if (pConsumer != NULL) { + user = pConsumer->user; + fqdn = pConsumer->fqdn; + sdbRelease(pSdb, pConsumer); + } + MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs, pConsumerEp->offsetRows)); } - MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows)); + MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows)); pBlock->info.rows = numOfRows; diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index d5284cd3fc..0a80cc817d 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -133,9 +133,9 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn int32_t numOfNotFillCols, const struct SNodeListNode* val); bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); -void taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity, - SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId, - int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo); +int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity, + SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId, + int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo); void* taosDestroyFillInfo(struct SFillInfo* pFillInfo); int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 34f174377b..0b66834d45 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -405,8 +405,12 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t // STimeWindow w = {0}; // getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC); pInfo->pFillInfo = NULL; - taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, - pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo); + int32_t code = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, + pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + return code; + } if (order == TSDB_ORDER_ASC) { pInfo->win.skey = win.skey; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 99ceb2ab59..93facbf396 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -536,9 +536,10 @@ static int32_t createTableCacheVal(const SMetaReader* pMetaReader, STableCachedV int32_t lino = 0; STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal)); QUERY_CHECK_NULL(pVal, code, lino, _end, terrno); + + pVal->pTags = NULL; pVal->pName = taosStrdup(pMetaReader->me.name); QUERY_CHECK_NULL(pVal->pName, code, lino, _end, terrno); - pVal->pTags = NULL; // only child table has tag value if (pMetaReader->me.type == TSDB_CHILD_TABLE) { diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index c2ccdf62f5..e346946a7a 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -523,14 +523,14 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { return pFillInfo->numOfRows - pFillInfo->index; } -void taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity, - SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t primaryTsSlotId, - int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo) { +int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity, + SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t primaryTsSlotId, + int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (fillType == TSDB_FILL_NONE) { (*ppFillInfo) = NULL; - return; + return code; } SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo)); @@ -572,10 +572,9 @@ _end: if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pFillInfo->next.pRowVal); taosArrayDestroy(pFillInfo->prev.pRowVal); - terrno = code; - T_LONG_JMP(pTaskInfo->env, code); } (*ppFillInfo) = pFillInfo; + return code; } void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 5e5d6494cf..fa9dc79cc3 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1230,7 +1230,7 @@ void destroyIntervalOperatorInfo(void* param) { } static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo, - SExecTaskInfo* pTaskInfo) { + bool* pRes) { // the primary timestamp column bool needed = false; int32_t code = TSDB_CODE_SUCCESS; @@ -1298,10 +1298,9 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); } - return needed; + *pRes = needed; + return code; } int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, @@ -1390,7 +1389,10 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); QUERY_CHECK_CODE(code, lino, _error); - pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, pTaskInfo); + + pInfo->timeWindowInterpo = false; + code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo); + QUERY_CHECK_CODE(code, lino, _error); if (pInfo->timeWindowInterpo) { pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); if (pInfo->binfo.resultRowInfo.openWindow == NULL) { @@ -2085,7 +2087,9 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); QUERY_CHECK_CODE(code, lino, _error); - iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, pTaskInfo); + iaInfo->timeWindowInterpo = false; + code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo); + QUERY_CHECK_CODE(code, lino, _error); if (iaInfo->timeWindowInterpo) { iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); } @@ -2416,7 +2420,9 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win); QUERY_CHECK_CODE(code, lino, _error); - pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, pTaskInfo); + pIntervalInfo->timeWindowInterpo = false; + code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo); + QUERY_CHECK_CODE(code, lino, _error); if (pIntervalInfo->timeWindowInterpo) { pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) { diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index d7a5540544..616cd034ab 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -222,10 +222,10 @@ class TDTestCase: tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdLog.info(len(tdSql.queryResult)) - tdSql.checkEqual(True, len(tdSql.queryResult) in range(261, 269)) + tdSql.checkEqual(True, len(tdSql.queryResult) in range(261, 271)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") - tdSql.checkEqual(54, len(tdSql.queryResult)) + tdSql.checkEqual(56, len(tdSql.queryResult)) def ins_dnodes_check(self): tdSql.execute('drop database if exists db2') diff --git a/tests/system-test/7-tmq/tmq_primary_key.py b/tests/system-test/7-tmq/tmq_primary_key.py index 80888ddbe6..13d6bd565d 100644 --- a/tests/system-test/7-tmq/tmq_primary_key.py +++ b/tests/system-test/7-tmq/tmq_primary_key.py @@ -85,7 +85,7 @@ class TDTestCase: time.sleep(4) # wait for heart beat tdSql.query(f'show subscriptions;') - sub = tdSql.getData(0, 4); + sub = tdSql.getData(0, 6); print(sub) if not sub.startswith("tsdb"): tdLog.exit(f"show subscriptions error") @@ -196,7 +196,7 @@ class TDTestCase: time.sleep(4) # wait for heart beat tdSql.query(f'show subscriptions;') - sub = tdSql.getData(0, 4); + sub = tdSql.getData(0, 6); print(sub) if not sub.startswith("tsdb"): tdLog.exit(f"show subscriptions error") @@ -306,7 +306,7 @@ class TDTestCase: time.sleep(4) # wait for heart beat tdSql.query(f'show subscriptions;') - sub = tdSql.getData(0, 4); + sub = tdSql.getData(0, 6); print(sub) if not sub.startswith("tsdb"): tdLog.exit(f"show subscriptions error") @@ -416,7 +416,7 @@ class TDTestCase: time.sleep(4) # wait for heart beat tdSql.query(f'show subscriptions;') - sub = tdSql.getData(0, 4); + sub = tdSql.getData(0, 6); print(sub) if not sub.startswith("tsdb"): tdLog.exit(f"show subscriptions error") @@ -517,7 +517,7 @@ class TDTestCase: consumer.close() tdSql.query(f'show subscriptions;') - sub = tdSql.getData(0, 4); + sub = tdSql.getData(0, 6); print(sub) if not sub.startswith("tsdb"): tdLog.exit(f"show subscriptions error") diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index d0e682cffb..4e90aefe7c 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -598,12 +598,12 @@ class TDTestCase: tdSql.query(f'show consumers') tdSql.checkRows(1) tdSql.checkData(0, 1, 'g1') - tdSql.checkData(0, 4, 't2') + tdSql.checkData(0, 6, 't2') tdSql.execute(f'drop consumer group g1 on t1') tdSql.query(f'show consumers') tdSql.checkRows(1) tdSql.checkData(0, 1, 'g1') - tdSql.checkData(0, 4, 't2') + tdSql.checkData(0, 6, 't2') tdSql.query(f'show subscriptions') tdSql.checkRows(1) @@ -641,7 +641,7 @@ class TDTestCase: tdSql.query(f'show consumers') tdSql.checkRows(1) tdSql.checkData(0, 1, 'g1') - tdSql.checkData(0, 4, 't2') + tdSql.checkData(0, 6, 't2') tdSql.execute(f'insert into t4 using st tags(3) values(now, 1)') try: