Merge branch '3.0' of github.com:taosdata/TDengine into szhou/fixbug
This commit is contained in:
commit
d7a9f69cb5
|
@ -20,7 +20,7 @@ English | [简体中文](README-CN.md) | We are hiring, check [here](https://tde
|
|||
|
||||
# What is TDengine?
|
||||
|
||||
TDengine is an open source, cloud native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. It enables efficient, real-time data ingestion, processing, and monitoring of TB and even PB scale data per day, generated by billions of sensors and data collectors. TDengine differentiates itself from other TSDBs with the following advantages.:
|
||||
TDengine is an open source, high performance, cloud native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. It enables efficient, real-time data ingestion, processing, and monitoring of TB and even PB scale data per day, generated by billions of sensors and data collectors. TDengine differentiates itself from other TSDBs with the following advantages.:
|
||||
|
||||
- High-Performance: TDengine is the only time-series database to solve the high cardinality issue to support billions of data collection points while out performing other time-series databases for data ingestion, querying and data compression.
|
||||
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
import taos
|
||||
from taos.tmq import *
|
||||
|
||||
conn = taos.connect()
|
||||
|
||||
# create database
|
||||
conn.execute("drop database if exists py_tmq")
|
||||
conn.execute("create database if not exists py_tmq vgroups 2")
|
||||
|
||||
# create table and stables
|
||||
conn.select_db("py_tmq")
|
||||
conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
|
||||
conn.execute("create table if not exists tb1 using stb1 tags(1)")
|
||||
conn.execute("create table if not exists tb2 using stb1 tags(2)")
|
||||
conn.execute("create table if not exists tb3 using stb1 tags(3)")
|
||||
|
||||
# create topic
|
||||
conn.execute("drop topic if exists topic_ctb_column")
|
||||
conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
|
||||
|
||||
# set consumer configure options
|
||||
conf = TaosTmqConf()
|
||||
conf.set("group.id", "tg2")
|
||||
conf.set("td.connect.user", "root")
|
||||
conf.set("td.connect.pass", "taosdata")
|
||||
conf.set("enable.auto.commit", "true")
|
||||
conf.set("msg.with.table.name", "true")
|
||||
|
||||
def tmq_commit_cb_print(tmq, resp, offset, param=None):
|
||||
print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
|
||||
|
||||
conf.set_auto_commit_cb(tmq_commit_cb_print, None)
|
||||
|
||||
# build consumer
|
||||
tmq = conf.new_consumer()
|
||||
|
||||
# build topic list
|
||||
topic_list = TaosTmqList()
|
||||
topic_list.append("topic_ctb_column")
|
||||
|
||||
# subscribe consumer
|
||||
tmq.subscribe(topic_list)
|
||||
|
||||
# check subscriptions
|
||||
sub_list = tmq.subscription()
|
||||
print("subscribed topics: ",sub_list)
|
||||
|
||||
# start subscribe
|
||||
while 1:
|
||||
res = tmq.poll(1000)
|
||||
if res:
|
||||
topic = res.get_topic_name()
|
||||
vg = res.get_vgroup_id()
|
||||
db = res.get_db_name()
|
||||
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
|
||||
for row in res:
|
||||
print(row)
|
||||
tb = res.get_table_name()
|
||||
print(f"from table: {tb}")
|
|
@ -46,8 +46,8 @@ apt-get 方式只适用于 Debian 或 Ubuntu 系统
|
|||
</TabItem>
|
||||
<TabItem label="Deb 安装" value="debinst">
|
||||
|
||||
1、从官网下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb;
|
||||
2、进入到 TDengine-server-3.0.0.0-Linux-x64.deb 安装包所在目录,执行如下的安装命令:
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb;
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.deb 安装包所在目录,执行如下的安装命令:
|
||||
|
||||
```bash
|
||||
sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb
|
||||
|
@ -57,8 +57,8 @@ sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb
|
|||
|
||||
<TabItem label="RPM 安装" value="rpminst">
|
||||
|
||||
1、从官网下载获得 rpm 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.rpm;
|
||||
2、进入到 TDengine-server-3.0.0.0-Linux-x64.rpm 安装包所在目录,执行如下的安装命令:
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 rpm 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.rpm;
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.rpm 安装包所在目录,执行如下的安装命令:
|
||||
|
||||
```bash
|
||||
sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm
|
||||
|
@ -68,8 +68,8 @@ sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm
|
|||
|
||||
<TabItem label="tar.gz 安装" value="tarinst">
|
||||
|
||||
1、从官网下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz;
|
||||
2、进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本:
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz;
|
||||
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本:
|
||||
|
||||
```bash
|
||||
tar -zxvf TDengine-server-3.0.0.0-Linux-x64.tar.gz
|
||||
|
@ -89,7 +89,9 @@ install.sh 安装脚本在执行过程中,会通过命令行交互界面询问
|
|||
</TabItem>
|
||||
|
||||
<TabItem label="Windows 安装" value="windows">
|
||||
TODO
|
||||
|
||||
1. 从 [发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe;
|
||||
2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
@ -152,14 +154,14 @@ systemctl 命令汇总:
|
|||
|
||||
<TabItem label="Windows 系统" value="windows">
|
||||
|
||||
TODO
|
||||
安装后,在 C:\TDengine 目录下,运行 taosd.exe 来启动 TDengine 服务进程。
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
## TDengine 命令行 (CLI)
|
||||
|
||||
为便于检查 TDengine 的状态,执行数据库 (Database) 的各种即席(Ad Hoc)查询,TDengine 提供一命令行应用程序(以下简称为 TDengine CLI) taos。要进入 TDengine 命令行,您只要在安装有 TDengine 的 Linux 终端执行 `taos` 即可。
|
||||
为便于检查 TDengine 的状态,执行数据库 (Database) 的各种即席(Ad Hoc)查询,TDengine 提供一命令行应用程序(以下简称为 TDengine CLI) taos。要进入 TDengine 命令行,您只要在安装有 TDengine 的 Linux 终端执行 `taos` 即可,也可以在安装有 TDengine 的 Windows 终端的 C:\TDengine 目录下,运行 taos.exe 来启动 TDengine 命令行。
|
||||
|
||||
```bash
|
||||
taos
|
||||
|
|
|
@ -4,6 +4,9 @@ description: "数据订阅与推送服务。写入到 TDengine 中的时序数
|
|||
title: 数据订阅
|
||||
---
|
||||
|
||||
import Tabs from "@theme/Tabs";
|
||||
import TabItem from "@theme/TabItem";
|
||||
|
||||
为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
|
||||
|
||||
与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。
|
||||
|
@ -51,7 +54,7 @@ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
|
|||
DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
|
||||
```
|
||||
|
||||
这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码可以在 [tmq.c](https://github.com/taosdata/TDengine/blob/3.0/examples/c/tmq.c) 看到。
|
||||
这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面C语言的示例代码。
|
||||
|
||||
## 写入数据
|
||||
|
||||
|
@ -63,12 +66,8 @@ create database tmqdb;
|
|||
create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16) tags(t1 int, t3 varchar(16));
|
||||
create table tmqdb.ctb0 using tmqdb.stb tags(0, "subtable0");
|
||||
create table tmqdb.ctb1 using tmqdb.stb tags(1, "subtable1");
|
||||
create table tmqdb.ctb2 using tmqdb.stb tags(2, "subtable2");
|
||||
create table tmqdb.ctb3 using tmqdb.stb tags(3, "subtable3");
|
||||
insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
|
||||
insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
|
||||
insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22');
|
||||
insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33');
|
||||
```
|
||||
|
||||
## 创建topic:
|
||||
|
@ -130,7 +129,6 @@ TMQ支持多种订阅类型:
|
|||
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
tmq_conf_destroy(conf);
|
||||
return tmq;
|
||||
```
|
||||
|
||||
上述配置中包括consumer group ID,如果多个 consumer 指定的 consumer group ID一样,则自动形成一个consumer group,共享消费进度。
|
||||
|
@ -143,67 +141,24 @@ TMQ支持多种订阅类型:
|
|||
```sql
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
tmq_list_append(topicList, "topicName");
|
||||
return topicList;
|
||||
```
|
||||
|
||||
## 启动订阅并开始消费
|
||||
|
||||
```sql
|
||||
```
|
||||
/* 启动订阅 */
|
||||
tmq_subscribe(tmq, topicList);
|
||||
tmq_list_destroy(topicList);
|
||||
|
||||
/* 循环poll消息 */
|
||||
int32_t totalRows = 0;
|
||||
int32_t msgCnt = 0;
|
||||
int32_t timeOut = 5000;
|
||||
while (running) {
|
||||
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeOut);
|
||||
if (tmqmsg) {
|
||||
msgCnt++;
|
||||
totalRows += msg_process(tmqmsg);
|
||||
taos_free_result(tmqmsg);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
```
|
||||
|
||||
这里是一个 **while** 循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析:
|
||||
|
||||
```sql
|
||||
static int32_t msg_process(TAOS_RES* msg) {
|
||||
char buf[1024];
|
||||
int32_t rows = 0;
|
||||
|
||||
const char* topicName = tmq_get_topic_name(msg);
|
||||
const char* dbName = tmq_get_db_name(msg);
|
||||
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||
|
||||
printf("topic: %s\n", topicName);
|
||||
printf("db: %s\n", dbName);
|
||||
printf("vgroup id: %d\n", vgroupId);
|
||||
|
||||
while (1) {
|
||||
TAOS_ROW row = taos_fetch_row(msg);
|
||||
if (row == NULL) break;
|
||||
|
||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||
int32_t numOfFields = taos_field_count(msg);
|
||||
int32_t* length = taos_fetch_lengths(msg);
|
||||
int32_t precision = taos_result_precision(msg);
|
||||
const char* tbName = tmq_get_table_name(msg);
|
||||
rows++;
|
||||
taos_print_row(buf, row, fields, numOfFields);
|
||||
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "null table"), buf);
|
||||
}
|
||||
|
||||
return rows;
|
||||
msg_process(tmqmsg);
|
||||
}
|
||||
```
|
||||
|
||||
这里是一个 **while** 循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析。
|
||||
|
||||
## 结束消费
|
||||
|
||||
```sql
|
||||
|
@ -243,4 +198,45 @@ TMQ支持多种订阅类型:
|
|||
show subscriptions;
|
||||
```
|
||||
|
||||
## 示例代码
|
||||
|
||||
本节展示各种语言的示例代码。
|
||||
|
||||
<Tabs>
|
||||
<TabItem label="C" value="c">
|
||||
|
||||
```c
|
||||
{{#include examples/c/tmq.c}}
|
||||
```
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Java" value="java">
|
||||
|
||||
TODO
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Go" value="Go">
|
||||
TODO
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Rust" value="Rust">
|
||||
TODO
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Python" value="Python">
|
||||
|
||||
```python
|
||||
{{#include docs/examples/python/tmq_example.py}}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Node.JS" value="Node.JS">
|
||||
TODO
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="C#" value="C#">
|
||||
TODO
|
||||
</TabItem>
|
||||
|
||||
</Tabs>
|
||||
|
|
|
@ -358,7 +358,7 @@ JDBC 连接器可能报错的错误码包括 3 种:JDBC driver 本身的报错
|
|||
具体的错误码请参考:
|
||||
|
||||
- [TDengine Java Connector](https://github.com/taosdata/taos-connector-jdbc/blob/main/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java)
|
||||
- [TDengine_ERROR_CODE](../error-code)
|
||||
<!-- - [TDengine_ERROR_CODE](../error-code) -->
|
||||
|
||||
### 通过参数绑定写入数据
|
||||
|
||||
|
|
|
@ -86,7 +86,7 @@ TDengine is removed successfully!
|
|||
|
||||
</TabItem>
|
||||
<TabItem label="Windows 卸载" value="windows">
|
||||
TODO
|
||||
在 C:\TDengine 目录下,通过运行 unins000.exe 卸载程序来卸载 TDengine。
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
|
|
|
@ -87,10 +87,11 @@ typedef struct {
|
|||
typedef struct {
|
||||
tsem_t syncSem;
|
||||
int64_t sync;
|
||||
bool standby;
|
||||
SReplica replica;
|
||||
int32_t errCode;
|
||||
int32_t transId;
|
||||
SRWLatch lock;
|
||||
int8_t standby;
|
||||
int8_t leaderTransferFinish;
|
||||
} SSyncMgmt;
|
||||
|
||||
|
|
|
@ -238,7 +238,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
|
|||
} else {
|
||||
memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1);
|
||||
}
|
||||
if (mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) {
|
||||
if (retrieveReq.db[0] && mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -60,15 +60,19 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
|
|||
sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex);
|
||||
}
|
||||
|
||||
taosRLockLatch(&pMgmt->lock);
|
||||
if (transId <= 0) {
|
||||
taosRUnLockLatch(&pMgmt->lock);
|
||||
mError("trans:%d, invalid commit msg", transId);
|
||||
} else if (transId == pMgmt->transId) {
|
||||
taosRUnLockLatch(&pMgmt->lock);
|
||||
if (pMgmt->errCode != 0) {
|
||||
mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
|
||||
}
|
||||
pMgmt->transId = 0;
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
} else {
|
||||
taosRUnLockLatch(&pMgmt->lock);
|
||||
STrans *pTrans = mndAcquireTrans(pMnode, transId);
|
||||
if (pTrans != NULL) {
|
||||
mDebug("trans:%d, execute in mnode which not leader", transId);
|
||||
|
@ -115,6 +119,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
|
|||
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
|
||||
cbMeta.code, cbMeta.index, cbMeta.term);
|
||||
|
||||
taosWLockLatch(&pMgmt->lock);
|
||||
if (pMgmt->transId == -1) {
|
||||
if (pMgmt->errCode != 0) {
|
||||
mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode));
|
||||
|
@ -122,6 +127,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
|
|||
pMgmt->transId = 0;
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
}
|
||||
taosWUnLockLatch(&pMgmt->lock);
|
||||
}
|
||||
|
||||
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
||||
|
@ -170,12 +176,24 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) {
|
|||
SMnode *pMnode = pFsm->data;
|
||||
mDebug("vgId:1, become follower");
|
||||
|
||||
// clear old leader resource
|
||||
taosWLockLatch(&pMnode->syncMgmt.lock);
|
||||
if (pMnode->syncMgmt.transId != 0) {
|
||||
pMnode->syncMgmt.transId = 0;
|
||||
tsem_post(&pMnode->syncMgmt.syncSem);
|
||||
}
|
||||
taosWUnLockLatch(&pMnode->syncMgmt.lock);
|
||||
}
|
||||
|
||||
static void mndBecomeLeader(struct SSyncFSM *pFsm) {
|
||||
SMnode *pMnode = pFsm->data;
|
||||
mDebug("vgId:1, become leader");
|
||||
SMnode *pMnode = pFsm->data;
|
||||
|
||||
taosWLockLatch(&pMnode->syncMgmt.lock);
|
||||
if (pMnode->syncMgmt.transId != 0) {
|
||||
pMnode->syncMgmt.transId = 0;
|
||||
tsem_post(&pMnode->syncMgmt.syncSem);
|
||||
}
|
||||
taosWUnLockLatch(&pMnode->syncMgmt.lock);
|
||||
}
|
||||
|
||||
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||
|
@ -202,6 +220,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
|||
|
||||
int32_t mndInitSync(SMnode *pMnode) {
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
taosInitRWLatch(&pMgmt->lock);
|
||||
pMgmt->transId = 0;
|
||||
|
||||
SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
|
||||
|
@ -254,11 +274,14 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
|||
memcpy(req.pCont, pRaw, req.contLen);
|
||||
|
||||
pMgmt->errCode = 0;
|
||||
taosWLockLatch(&pMgmt->lock);
|
||||
pMgmt->transId = transId;
|
||||
taosWUnLockLatch(&pMgmt->lock);
|
||||
mTrace("trans:%d, will be proposed", pMgmt->transId);
|
||||
|
||||
const bool isWeak = false;
|
||||
int32_t code = syncPropose(pMgmt->sync, &req, isWeak);
|
||||
|
||||
if (code == 0) {
|
||||
tsem_wait(&pMgmt->syncSem);
|
||||
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
||||
|
@ -286,10 +309,12 @@ void mndSyncStart(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
void mndSyncStop(SMnode *pMnode) {
|
||||
taosWLockLatch(&pMnode->syncMgmt.lock);
|
||||
if (pMnode->syncMgmt.transId != 0) {
|
||||
pMnode->syncMgmt.transId = 0;
|
||||
tsem_post(&pMnode->syncMgmt.syncSem);
|
||||
}
|
||||
taosWUnLockLatch(&pMnode->syncMgmt.lock);
|
||||
}
|
||||
|
||||
bool mndIsMaster(SMnode *pMnode) {
|
||||
|
|
|
@ -308,7 +308,8 @@ struct SVnode {
|
|||
SSink* pSink;
|
||||
tsem_t canCommit;
|
||||
int64_t sync;
|
||||
int32_t blockCount;
|
||||
SRWLatch lock;
|
||||
bool blocked;
|
||||
bool restored;
|
||||
tsem_t syncSem;
|
||||
SQHandle* pQuery;
|
||||
|
|
|
@ -85,7 +85,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
pVnode->state.commitTerm = info.state.commitTerm;
|
||||
pVnode->pTfs = pTfs;
|
||||
pVnode->msgCb = msgCb;
|
||||
pVnode->blockCount = 0;
|
||||
taosInitRWLatch(&pVnode->lock);
|
||||
pVnode->blocked = false;
|
||||
|
||||
tsem_init(&pVnode->syncSem, 0, 0);
|
||||
tsem_init(&(pVnode->canCommit), 0, 1);
|
||||
|
|
|
@ -28,20 +28,28 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
|
|||
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
||||
if (vnodeIsMsgBlock(pMsg->msgType)) {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
taosWLockLatch(&pVnode->lock);
|
||||
if (!pVnode->blocked) {
|
||||
vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||
pVnode->blockCount = 1;
|
||||
pVnode->blocked = true;
|
||||
taosWUnLockLatch(&pVnode->lock);
|
||||
tsem_wait(&pVnode->syncSem);
|
||||
} else {
|
||||
taosWUnLockLatch(&pVnode->lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
||||
if (vnodeIsMsgBlock(pMsg->msgType)) {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
if (pVnode->blockCount) {
|
||||
taosWLockLatch(&pVnode->lock);
|
||||
if (pVnode->blocked) {
|
||||
vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||
pVnode->blockCount = 0;
|
||||
pVnode->blocked = false;
|
||||
tsem_post(&pVnode->syncSem);
|
||||
}
|
||||
taosWUnLockLatch(&pVnode->lock);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -677,6 +685,12 @@ static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
|
|||
vDebug("vgId:%d, become follower", pVnode->config.vgId);
|
||||
|
||||
// clear old leader resource
|
||||
taosWLockLatch(&pVnode->lock);
|
||||
if (pVnode->blocked) {
|
||||
pVnode->blocked = false;
|
||||
tsem_post(&pVnode->syncSem);
|
||||
}
|
||||
taosWUnLockLatch(&pVnode->lock);
|
||||
}
|
||||
|
||||
static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
|
||||
|
|
|
@ -581,6 +581,20 @@ _return:
|
|||
}
|
||||
|
||||
int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass) {
|
||||
char *p = strchr(dbFName, '.');
|
||||
if (p) {
|
||||
++p;
|
||||
} else {
|
||||
p = dbFName;
|
||||
}
|
||||
|
||||
if (IS_SYS_DBNAME(p)) {
|
||||
*inCache = true;
|
||||
*pass = true;
|
||||
ctgDebug("sysdb %s, pass", dbFName);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, user, strlen(user));
|
||||
if (NULL == pUser) {
|
||||
ctgDebug("user not in cache, user:%s", user);
|
||||
|
|
|
@ -4254,8 +4254,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
ASSERT(0);
|
||||
}
|
||||
taosMemoryFree(ops);
|
||||
|
||||
pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
|
||||
if (pOptr) pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
|
||||
return pOptr;
|
||||
}
|
||||
|
||||
|
|
|
@ -2037,7 +2037,17 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName,
|
|||
code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) {
|
||||
if (TSDB_CODE_SUCCESS == code &&
|
||||
0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
|
||||
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) &&
|
||||
isSelectStmt(pCxt->pCurrStmt) &&
|
||||
0 == taosArrayGetSize(vgroupList)) {
|
||||
((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code &&
|
||||
0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
|
||||
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) {
|
||||
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
|
||||
}
|
||||
|
||||
|
|
|
@ -210,6 +210,8 @@ if $rows != 3 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql_error select * from performance_schema.PERF_OFFSETS;
|
||||
|
||||
sql show create stable stb;
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
|
|
|
@ -45,16 +45,16 @@ sql_error drop database db
|
|||
sql_error use db
|
||||
sql_error alter database db replica 1;
|
||||
sql_error show db.vgroups
|
||||
sql_error select * from information_schema.ins_stables where db_name = 'db'
|
||||
sql_error select * from information_schema.ins_tables where db_name = 'db'
|
||||
sql select * from information_schema.ins_stables where db_name = 'db'
|
||||
sql select * from information_schema.ins_tables where db_name = 'db'
|
||||
|
||||
print =============== check show
|
||||
sql_error select * from information_schema.ins_users
|
||||
sql select * from information_schema.ins_users
|
||||
sql_error show cluster
|
||||
sql_error select * from information_schema.ins_dnodes
|
||||
sql_error select * from information_schema.ins_mnodes
|
||||
sql select * from information_schema.ins_dnodes
|
||||
sql select * from information_schema.ins_mnodes
|
||||
sql_error show snodes
|
||||
sql_error select * from information_schema.ins_qnodes
|
||||
sql select * from information_schema.ins_qnodes
|
||||
sql_error show bnodes
|
||||
sql_error show grants
|
||||
sql_error show dnode 1 variables;
|
||||
|
|
Loading…
Reference in New Issue