Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30837

This commit is contained in:
54liuyao 2024-08-15 13:20:54 +08:00
commit ebb5c41d7f
19 changed files with 119 additions and 49 deletions

View File

@ -24,11 +24,11 @@ TDengine 经过特别优化,以适应时间序列数据的独特需求,引
1. 写入数据TDengine 支持多种数据写入方式。首先,它完全兼容 SQL允许用户使用标准的 SQL 语法进行数据写入。而且 TDengine 还支持无模式Schemaless写入包括流行的 InfluxDB Line 协议、OpenTSDB 的 Telnet 和 JSON 协议这些协议的加入使得数据的导入变得更加灵活和高效。更进一步TDengine 与众多第三方工具实现了无缝集成,例如 Telegraf、Prometheus、EMQX、StatsD、collectd 和 HiveMQ 等。在 TDengine Enterprise 中, 还提供了 MQTT、OPC-UA、OPC-DA、PI、Wonderware、Kafka、InfluxDB、OpenTSDB、MySQL、Oracle 和 SQL Server 等连接器。这些工具通过简单的配置,无需一行代码,就可以将来自各种数据源的数据源源不断的写入数据库,极大地简化了数据收集和存储的过程。
2. 查询数据TDengine 提供标准的 SQL 查询语法并针对时序数据和业务的特点优化和新增了许多语法和功能例如降采样、插值、累计求和、时间加权平均、状态窗口、时间窗口、会话窗口、滑动窗口等。TDengine 还支持用户自定义函数UDF
2. 查询数据TDengine 提供标准的 SQL 查询语法并针对时序数据和业务的特点优化和新增了许多语法和功能例如降采样、插值、累计求和、时间加权平均、状态窗口、时间窗口、会话窗口、滑动窗口等。TDengine 还支持用户自定义函数UDF
3. 缓存TDengine 使用时间驱动缓存管理策略First-In-First-OutFIFO将最近到达的当前状态数据保存在缓存中这样便于获取任何监测对象的实时状态而无需使用 Redis 等其他缓存工具,简化系统架构和运营成本。
4. 流式计算TDengine 流式计算引擎提供了实时处理写入的数据流的能力,不仅支持连续查询,还支持事件驱动的流式计算。它提供了替代复杂流处理系统的轻量级解决方案,并能够在高吞吐的数据写入的情况下,提供毫秒级的计算结果延迟
4. 流式计算TDengine 流式计算引擎提供了实时处理写入的数据流的能力,不仅支持连续查询,还支持事件驱动的流式计算。它提供了替代复杂流处理系统的轻量级解决方案,并能够在高吞吐的数据写入的情况下,提供毫秒级的计算结果延迟
5. 数据订阅TDengine 提供了类似 Kafka 的数据订阅功能。但用户可以通过 SQL 来灵活控制订阅的数据内容,并使用 Kafka 相同的 API 来订阅一张表、一组表、全部列或部分列、甚至整个数据库的数据。TDengine 可以替代需要集成消息队列产品的场景, 从而简化系统设计的复杂度,降低运营维护成本。
@ -38,13 +38,13 @@ TDengine 经过特别优化,以适应时间序列数据的独特需求,引
8. 数据迁移TDengine 提供了多种便捷的数据导入导出功能包括脚本文件导入导出、数据文件导入导出、taosdump 工具导入导出等。
9. 编程连接器TDengine 提供不同语言的连接器,包括 C/C++、Java、Go、Node.js、Rust、Python、C#、R、PHP 等。这些连接器大多都支持原生连接和 WebSocket 两种连接方式。TDengine 也提供 REST 接口,任何语言的应用程序可以直接通过 HTTP 请求访问数据库。
9. 编程连接器TDengine 提供不同语言的连接器,包括 C/C++、Java、Go、Node.js、Rust、Python、C#、R、PHP 等。这些连接器大多都支持原生连接和 WebSocket 两种连接方式。TDengine 也提供 RESTful 接口,任何语言的应用程序可以直接通过 HTTP 请求访问数据库。
10. 数据安全TDengine 提供了丰富的用户管理和权限管理功能以控制不同用户对数据库和表的访问权限,提供了 IP 白名单功能以控制不同帐号只能从特定的服务器接入集群。TDengine 支持系统管理员对不同数据库按需加密,数据加密后对读写完全透明且对性能的影响很小。还提供了审计日志功能以记录系统中的敏感操作。
11. 常用工具TDengine 还提供了交互式命令行程序CLI便于管理集群、检查系统状态、做即时查询。压力测试工具 taosBenchmark用于测试 TDengine 的性能。TDengine 还提供了图形化管理界面,简化了操作和管理过程。
12. 零代码数据接入TDengine 企业版提供了丰富的数据接入功能依托强大的数据接入平台无需一行代码只需要做简单的配置即可实现多种数据源的数据接入目前已经支持的数据源包括OPC UA, OPC DA, Pi, MQTT, Kafka, InfluxDB, OpenTSDB, MySql, SQL Server, Oracle, Wonderware Historian, MongoDB。
12. 零代码数据接入TDengine 企业版提供了丰富的数据接入功能依托强大的数据接入平台无需一行代码只需要做简单的配置即可实现多种数据源的数据接入目前已经支持的数据源包括OPC-UA、OPC-DA、PI、MQTT、Kafka、InfluxDB、OpenTSDB、MySQL、SQL Server、Oracle、Wonderware Historian、MongoDB。
## TDengine 与典型时序数据库的区别
@ -58,16 +58,14 @@ TDengine 经过特别优化,以适应时间序列数据的独特需求,引
4. 强大的分析能力TDengine 不仅支持标准 SQL 查询,还为时序数据特有的分析提供了 SQL 扩展。通过超级表、存储计算分离、分区分片、预计算、UDF 等先进技术TDengine 展现出强大的数据分析能力。
5. 简单易用TDengine 安装无依赖,集群部署仅需几秒即可完成。它提供了 REST ful接口和多种编程语言的连接器与众多第三方工具无缝集成。此外命令行程序和丰富的运维工具也极大地方便了用户的管理和即时查询需求。
5. 简单易用TDengine 安装无依赖,集群部署仅需几秒即可完成。它提供了 RESTful 接口和多种编程语言的连接器,与众多第三方工具无缝集成。此外,命令行程序和丰富的运维工具也极大地方便了用户的管理和即时查询需求。
6. 核心开源TDengine 的核心代码包括集群功能均在开源协议下公开发布。它在GitHub 网站全球趋势排行榜上多次位居榜首显示出其受欢迎程度。同时TDengine
拥有一个活跃的开发者社区,为技术的持续发展和创新提供了有力支持。
6. 核心开源TDengine 的核心代码,包括集群功能,均在开源协议下公开发布。它在 GitHub 网站全球趋势排行榜上多次位居榜首显示出其受欢迎程度。同时TDengine 拥有一个活跃的开发者社区,为技术的持续发展和创新提供了有力支持。
采用 TDengine企业可以在物联网、车联网、工业互联网等典型场景中显著降低大数据平台的总拥有成本主要体现在以下几个方面
1. 高性能带来的成本节约TDengine 卓越的写入、查询和存储性能意味着系统所需的计算资源和存储资源可以大幅度减少。这不仅降低了硬件成本,还减少了能源消耗和维护费用。
2. 标准化与兼容性带来的成本效益:由于 TDengine 支持标准 SQL并与众多第三方软件实现了无缝集成用户可以轻松地将现有系统迁移到 TDengine 上,无须重写大量代码。这种标准化和兼容性大大降低了学习和迁移成本,缩短了项目周期。
3. 简化系统架构带来的成本降低作为一个极简的时序数据平台TDengine 集成了消息队列、缓存、流计算等必要功能,避免了额外集成众多其他组件的需要。这
种简化的系统架构显著降低了系统的复杂度,从而减少了研发和运营成本,提高了整体运营效率。
3. 简化系统架构带来的成本降低作为一个极简的时序数据平台TDengine 集成了消息队列、缓存、流计算等必要功能,避免了额外集成众多其他组件的需要。这种简化的系统架构显著降低了系统的复杂度,从而减少了研发和运营成本,提高了整体运营效率。
## 技术生态

View File

@ -146,7 +146,7 @@ Note: 从 3.0.1.7 开始,只提供 TDengine 客户端的 Windows 客户端的
</Tabs>
:::info
下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases/tdengine)。
下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](https://docs.taosdata.com/releases/tdengine/)。
:::
:::note

View File

@ -99,7 +99,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
- **安装前准备**
- 安装 Python。新近版本 taospy 包要求 Python 3.6.2+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) 安装。
- 安装 [pip](https://pypi.org/project/pip/)。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 [pip documentation](https://pip.pypa.io/en/stable/installation/) 安装。
- 如果使用原生连接,还需[安装客户端驱动](../#安装客户端驱动)。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。
- 如果使用原生连接,还需[安装客户端驱动](../connect/#安装客户端驱动-taosc)。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。
- **使用 pip 安装**
- 卸载旧版本

View File

@ -101,7 +101,7 @@ head 文件是时序数据存储文件data 文件)的 BRINBlock Range In
head 文件中存储了多个 BRIN 记录块及其索引。BRIN 记录块采用列存压缩的方式这种方式可以大大减少空间占用同时保持较高的查询性能。BRIN 索引结构如下图所示:
[BRIN 索引结构](./brin.png)
![BRIN 索引结构](./brin.png)
#### data 文件
@ -121,4 +121,4 @@ data 文件是实际存储时序数据的文件。在 data 文件中,时序数
在少表高频的场景下,系统仅维护一个 stt 文件。该文件专门用于存储每次数据落盘后剩余的碎片数据。这样,在下一次数据落盘时,这些碎片数据可以与内存中的新数据合并,形成较大的数据块,随后一并写入 data 文件中。这种机制有效地避免了数据文件的碎片化,确保了数据存储的连续性和高效性。
对于多表低频的场景,建议配置多个 stt 文件。这种场景下的核心思想是,尽管单张表每次落盘的数据量可能不大,但同一超级表下的所有子表累积的数据量却相当可观。通过合并这些数据,可以生成较大的数据块,从而减少数据块的碎片化。这不仅提升了数据的写入效率,还能显著提高查询性能,因为连续的数据存储更有利于快速的数据检索和访问。
对于多表低频的场景,建议配置多个 stt 文件。这种场景下的核心思想是,尽管单张表每次落盘的数据量可能不大,但同一超级表下的所有子表累积的数据量却相当可观。通过合并这些数据,可以生成较大的数据块,从而减少数据块的碎片化。这不仅提升了数据的写入效率,还能显著提高查询性能,因为连续的数据存储更有利于快速的数据检索和访问。

View File

@ -2840,6 +2840,8 @@ typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char clientId[TSDB_CLIENT_ID_LEN];
char user[TSDB_USER_LEN];
char fqdn[TSDB_FQDN_LEN];
SArray* topicNames; // SArray<char**>
int8_t withTbName;
@ -2873,6 +2875,8 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
tlen += taosEncodeFixedI8(buf, pReq->enableBatchMeta);
tlen += taosEncodeFixedI32(buf, pReq->sessionTimeoutMs);
tlen += taosEncodeFixedI32(buf, pReq->maxPollIntervalMs);
tlen += taosEncodeString(buf, pReq->user);
tlen += taosEncodeString(buf, pReq->fqdn);
return tlen;
}
@ -2907,6 +2911,8 @@ static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeR
if ((char*)buf - (char*)start < len) {
buf = taosDecodeFixedI32(buf, &pReq->sessionTimeoutMs);
buf = taosDecodeFixedI32(buf, &pReq->maxPollIntervalMs);
buf = taosDecodeStringTo(buf, pReq->user);
buf = taosDecodeStringTo(buf, pReq->fqdn);
} else {
pReq->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
pReq->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;

View File

@ -82,6 +82,8 @@ struct tmq_t {
int64_t refId;
char groupId[TSDB_CGROUP_LEN];
char clientId[TSDB_CLIENT_ID_LEN];
char user[TSDB_USER_LEN];
char fqdn[TSDB_FQDN_LEN];
int8_t withTbName;
int8_t useSnapshot;
int8_t autoCommit;
@ -1265,6 +1267,10 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->replayEnable = conf->replayEnable;
pTmq->sourceExcluded = conf->sourceExcluded;
pTmq->enableBatchMeta = conf->enableBatchMeta;
tstrncpy(pTmq->user, user, TSDB_USER_LEN);
if (taosGetFqdn(pTmq->fqdn) != 0) {
(void)strcpy(pTmq->fqdn, "localhost");
}
if (conf->replayEnable) {
pTmq->autoCommit = false;
}
@ -1332,6 +1338,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
req.consumerId = tmq->consumerId;
tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN);
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
tstrncpy(req.user, tmq->user, TSDB_USER_LEN);
tstrncpy(req.fqdn, tmq->fqdn, TSDB_FQDN_LEN);
req.topicNames = taosArrayInit(sz, sizeof(void*));
if (req.topicNames == NULL) {

View File

@ -344,7 +344,9 @@ static const SSysDbTableSchema subscriptionSchema[] = {
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "fqdn", .bytes = TSDB_FQDN_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "offset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "rows", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
};
@ -480,11 +482,12 @@ static const SSysDbTableSchema connectionsSchema[] = {
{.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
};
static const SSysDbTableSchema consumerSchema[] = {
{.name = "consumer_id", .bytes = TSDB_CONSUMER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "client_id", .bytes = TSDB_CLIENT_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "fqdn", .bytes = TSDB_FQDN_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
/*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},*/

View File

@ -597,6 +597,8 @@ typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char clientId[TSDB_CLIENT_ID_LEN];
char user[TSDB_USER_LEN];
char fqdn[TSDB_FQDN_LEN];
int8_t updateType; // used only for update
int32_t epoch;
int32_t status;

View File

@ -903,6 +903,22 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
MND_TMQ_NULL_CHECK(pColInfo);
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false));
// user
char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(user, pConsumer->user);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
MND_TMQ_NULL_CHECK(pColInfo);
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)user, false));
// fqdn
char fqdn[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(fqdn, pConsumer->fqdn);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
MND_TMQ_NULL_CHECK(pColInfo);
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, numOfRows, (const char *)fqdn, false));
// status
const char *pStatusName = mndConsumerStatusName(pConsumer->status);
status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);

View File

@ -325,6 +325,8 @@ int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
if (pConsumer->rebNewTopics == NULL){
@ -429,6 +431,8 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
tlen += taosEncodeString(buf, pConsumer->user);
tlen += taosEncodeString(buf, pConsumer->fqdn);
return tlen;
}
@ -503,6 +507,8 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
if (sver > 2){
buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
buf = taosDecodeStringTo(buf, pConsumer->user);
buf = taosDecodeStringTo(buf, pConsumer->fqdn);
} else{
pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;

View File

@ -1330,8 +1330,8 @@ END:
TAOS_RETURN(code);
}
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *topic,
const char *cgroup, SArray *vgs, SArray *offsetRows) {
static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char* user, const char* fqdn,
const char *topic, const char *cgroup, SArray *vgs, SArray *offsetRows) {
int32_t code = 0;
int32_t sz = taosArrayGetSize(vgs);
for (int32_t j = 0; j < sz; j++) {
@ -1355,7 +1355,7 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t cons
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false));
// consumer id
char consumerIdHex[32] = {0};
char consumerIdHex[TSDB_CONSUMER_ID_LEN] = {0};
(void)sprintf(varDataVal(consumerIdHex), "0x%" PRIx64, consumerId);
varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex)));
@ -1363,6 +1363,18 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t cons
MND_TMQ_NULL_CHECK(pColInfo);
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1));
char userStr[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
if (user) STR_TO_VARSTR(userStr, user);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
MND_TMQ_NULL_CHECK(pColInfo);
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, userStr, user == NULL));
char fqdnStr[TSDB_FQDN_LEN + VARSTR_HEADER_SIZE] = {0};
if (fqdn) STR_TO_VARSTR(fqdnStr, fqdn);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
MND_TMQ_NULL_CHECK(pColInfo);
MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, fqdnStr, fqdn == NULL));
mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId,
varDataVal(cgroup), pVgEp->vgId);
@ -1435,16 +1447,25 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
SMqConsumerEp *pConsumerEp = NULL;
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
pConsumerEp = (SMqConsumerEp *)pIter;
MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs,
char *user = NULL;
char *fqdn = NULL;
SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &pConsumerEp->consumerId);
if (pConsumer != NULL) {
user = pConsumer->user;
fqdn = pConsumer->fqdn;
sdbRelease(pSdb, pConsumer);
}
MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, user, fqdn, topic, cgroup, pConsumerEp->vgs,
pConsumerEp->offsetRows));
}
MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));
MND_TMQ_RETURN_CHECK(buildResult(pBlock, &numOfRows, -1, NULL, NULL, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows));
pBlock->info.rows = numOfRows;

View File

@ -133,9 +133,9 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
int32_t numOfNotFillCols, const struct SNodeListNode* val);
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
void taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo);
int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo);
void* taosDestroyFillInfo(struct SFillInfo* pFillInfo);
int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);

View File

@ -405,8 +405,12 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
// STimeWindow w = {0};
// getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC);
pInfo->pFillInfo = NULL;
taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo);
int32_t code = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
return code;
}
if (order == TSDB_ORDER_ASC) {
pInfo->win.skey = win.skey;

View File

@ -536,9 +536,10 @@ static int32_t createTableCacheVal(const SMetaReader* pMetaReader, STableCachedV
int32_t lino = 0;
STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
QUERY_CHECK_NULL(pVal, code, lino, _end, terrno);
pVal->pTags = NULL;
pVal->pName = taosStrdup(pMetaReader->me.name);
QUERY_CHECK_NULL(pVal->pName, code, lino, _end, terrno);
pVal->pTags = NULL;
// only child table has tag value
if (pMetaReader->me.type == TSDB_CHILD_TABLE) {

View File

@ -523,14 +523,14 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
return pFillInfo->numOfRows - pFillInfo->index;
}
void taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t primaryTsSlotId,
int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo) {
int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t primaryTsSlotId,
int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (fillType == TSDB_FILL_NONE) {
(*ppFillInfo) = NULL;
return;
return code;
}
SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo));
@ -572,10 +572,9 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pFillInfo->next.pRowVal);
taosArrayDestroy(pFillInfo->prev.pRowVal);
terrno = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppFillInfo) = pFillInfo;
return code;
}
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {

View File

@ -1230,7 +1230,7 @@ void destroyIntervalOperatorInfo(void* param) {
}
static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
SExecTaskInfo* pTaskInfo) {
bool* pRes) {
// the primary timestamp column
bool needed = false;
int32_t code = TSDB_CODE_SUCCESS;
@ -1298,10 +1298,9 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return needed;
*pRes = needed;
return code;
}
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
@ -1390,7 +1389,10 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, pTaskInfo);
pInfo->timeWindowInterpo = false;
code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
QUERY_CHECK_CODE(code, lino, _error);
if (pInfo->timeWindowInterpo) {
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
@ -2085,7 +2087,9 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
QUERY_CHECK_CODE(code, lino, _error);
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, pTaskInfo);
iaInfo->timeWindowInterpo = false;
code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
QUERY_CHECK_CODE(code, lino, _error);
if (iaInfo->timeWindowInterpo) {
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
}
@ -2416,7 +2420,9 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
QUERY_CHECK_CODE(code, lino, _error);
pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, pTaskInfo);
pIntervalInfo->timeWindowInterpo = false;
code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
QUERY_CHECK_CODE(code, lino, _error);
if (pIntervalInfo->timeWindowInterpo) {
pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {

View File

@ -222,10 +222,10 @@ class TDTestCase:
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdLog.info(len(tdSql.queryResult))
tdSql.checkEqual(True, len(tdSql.queryResult) in range(261, 269))
tdSql.checkEqual(True, len(tdSql.queryResult) in range(261, 271))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult))
tdSql.checkEqual(56, len(tdSql.queryResult))
def ins_dnodes_check(self):
tdSql.execute('drop database if exists db2')

View File

@ -85,7 +85,7 @@ class TDTestCase:
time.sleep(4) # wait for heart beat
tdSql.query(f'show subscriptions;')
sub = tdSql.getData(0, 4);
sub = tdSql.getData(0, 6);
print(sub)
if not sub.startswith("tsdb"):
tdLog.exit(f"show subscriptions error")
@ -196,7 +196,7 @@ class TDTestCase:
time.sleep(4) # wait for heart beat
tdSql.query(f'show subscriptions;')
sub = tdSql.getData(0, 4);
sub = tdSql.getData(0, 6);
print(sub)
if not sub.startswith("tsdb"):
tdLog.exit(f"show subscriptions error")
@ -306,7 +306,7 @@ class TDTestCase:
time.sleep(4) # wait for heart beat
tdSql.query(f'show subscriptions;')
sub = tdSql.getData(0, 4);
sub = tdSql.getData(0, 6);
print(sub)
if not sub.startswith("tsdb"):
tdLog.exit(f"show subscriptions error")
@ -416,7 +416,7 @@ class TDTestCase:
time.sleep(4) # wait for heart beat
tdSql.query(f'show subscriptions;')
sub = tdSql.getData(0, 4);
sub = tdSql.getData(0, 6);
print(sub)
if not sub.startswith("tsdb"):
tdLog.exit(f"show subscriptions error")
@ -517,7 +517,7 @@ class TDTestCase:
consumer.close()
tdSql.query(f'show subscriptions;')
sub = tdSql.getData(0, 4);
sub = tdSql.getData(0, 6);
print(sub)
if not sub.startswith("tsdb"):
tdLog.exit(f"show subscriptions error")

View File

@ -598,12 +598,12 @@ class TDTestCase:
tdSql.query(f'show consumers')
tdSql.checkRows(1)
tdSql.checkData(0, 1, 'g1')
tdSql.checkData(0, 4, 't2')
tdSql.checkData(0, 6, 't2')
tdSql.execute(f'drop consumer group g1 on t1')
tdSql.query(f'show consumers')
tdSql.checkRows(1)
tdSql.checkData(0, 1, 'g1')
tdSql.checkData(0, 4, 't2')
tdSql.checkData(0, 6, 't2')
tdSql.query(f'show subscriptions')
tdSql.checkRows(1)
@ -641,7 +641,7 @@ class TDTestCase:
tdSql.query(f'show consumers')
tdSql.checkRows(1)
tdSql.checkData(0, 1, 'g1')
tdSql.checkData(0, 4, 't2')
tdSql.checkData(0, 6, 't2')
tdSql.execute(f'insert into t4 using st tags(3) values(now, 1)')
try: