Merge remote-tracking branch 'origin/main' into fix/m23.1

This commit is contained in:
dapan1121 2023-08-30 11:03:33 +08:00
commit f6321d0c4f
26 changed files with 681 additions and 209 deletions

View File

@ -19,6 +19,9 @@ index_option:
functions:
function [, function] ...
```
### tag Indexing
[tag index](../tag-index)
### SMA Indexing

View File

@ -20,6 +20,9 @@ index_option:
functions:
function [, function] ...
```
### tag 索引
[tag 索引](../tag-index)
### SMA 索引

437
docs/zh/20-third-party/70-seeq.md vendored Normal file
View File

@ -0,0 +1,437 @@
---
sidebar_label: Seeq
title: Seeq
description: 如何使用 Seeq 和 TDengine 进行时序数据分析
---
# 如何使用 Seeq 和 TDengine 进行时序数据分析
## 方案介绍
Seeq 是制造业和工业互联网IIOT高级分析软件。Seeq 支持在工艺制造组织中使用机器学习创新的新功能。这些功能使组织能够将自己或第三方机器学习算法部署到前线流程工程师和主题专家使用的高级分析应用程序,从而使单个数据科学家的努力扩展到许多前线员工。
通过 TDengine Java connector Seeq 可以轻松支持查询 TDengine 提供的时序数据,并提供数据展现、分析、预测等功能。
### Seeq 安装方法
从 (Seeq 官网)[https://www.seeq.com/customer-download]下载相关软件,例如 Seeq Server 和 Seeq Data Lab 等。
### Seeq Server 安装和启动
```
tar xvzf seeq-server-xxx.tar.gz
cd seeq-server-installer
sudo ./install
sudo seeq service enable
sudo seeq start
```
### Seeq Data Lab Server 安装和启动
Seeq Data Lab 需要安装在和 Seeq Server 不同的服务器上,并通过配置和 Seeq Server 互联。详细安装配置指令参见(Seeq 官方文档)[https://support.seeq.com/space/KB/1034059842]。
```
tar xvf seeq-data-lab-<version>-64bit-linux.tar.gz
sudo seeq-data-lab-installer/install -f /opt/seeq/seeq-data-lab -g /var/opt/seeq -u seeq
sudo seeq config set Network/DataLab/Hostname localhost
sudo seeq config set Network/DataLab/Port 34231 # the port of the Data Lab server (usually 34231)
sudo seeq config set Network/Hostname <value> # the host IP or URL of the main Seeq Server
# If the main Seeq server is configured to listen over HTTPS
sudo seeq config set Network/Webserver/SecurePort 443 # the secure port of the main Seeq Server (usually 443)
# If the main Seeq server is NOT configured to listen over HTTPS
sudo seeq config set Network/Webserver/Port <value>
#On the main Seeq server, open a Seeq Command Prompt and set the hostname of the Data Lab server:
sudo seeq config set Network/DataLab/Hostname <value> # the host IP (not URL) of the Data Lab server
sudo seeq config set Network/DataLab/Port 34231 # the port of the Data Lab server (usually 34231
```
## TDengine 本地实例安装方法
请参考(官网文档)[https://docs.taosdata.com/get-started/package/]。
## TDengine Cloud 访问方法
如果使用 Seeq 连接 TDengine Cloud请在 https://cloud.taosdata.com 申请帐号并登录查看如何访问 TDengine Cloud。
## 如何配置 Seeq 访问 TDengine
1. 查看 data 存储位置
```
sudo seeq config get Folders/Data
```
2. 从 maven.org 下载 TDengine Java connector 包,目前最新版本为(3.2.4)[https://repo1.maven.org/maven2/com/taosdata/jdbc/taos-jdbcdriver/3.2.4/taos-jdbcdriver-3.2.4-dist.jar],并拷贝至 data 存储位置的 plugins\lib 中。
3. 重新启动 seeq server
```
sudo seeq restart
```
4. 输入 License
使用浏览器访问 ip:34216 并按照说明输入 license。
## 使用 Seeq 分析 TDengine 时序数据
本章节演示如何使用 Seeq 软件配合 TDengine 进行时序数据分析。
### 场景介绍
示例场景为一个电力系统,用户每天从电站仪表收集用电量数据,并将其存储在 TDengine 集群中。现在用户想要预测电力消耗将会如何发展,并购买更多设备来支持它。用户电力消耗随着每月订单变化而不同,另外考虑到季节变化,电力消耗量会有所不同。这个城市位于北半球,所以在夏天会使用更多的电力。我们模拟数据来反映这些假定。
### 数据 Schema
```
CREATE STABLE meters (ts TIMESTAMP, num INT, temperature FLOAT, goods INT) TAGS (device NCHAR(20));
CREATE TABLE goods (ts1 TIMESTAMP, ts2 TIMESTAMP, goods FLOAT);
```
!(Seeq demo schema)[./seeq/seeq-demo-schema.webp]
### 构造数据方法
```
python mockdata.py
taos -s "insert into power.goods select _wstart, _wstart + 10d, avg(goods) from power.meters interval(10d);"
```
源代码托管在(github 仓库)[https://github.com/sangshuduo/td-forecasting]。
### 使用 Seeq 进行数据分析
#### 配置数据源Data Source
使用 Seeq 管理员角色的帐号登录,并新建数据源。
- Power
```
{
"QueryDefinitions": [
{
"Name": "PowerNum",
"Type": "SIGNAL",
"Sql": "SELECT ts, num FROM meters",
"Enabled": true,
"TestMode": false,
"TestQueriesDuringSync": true,
"InProgressCapsulesEnabled": false,
"Variables": null,
"Properties": [
{
"Name": "Name",
"Value": "Num",
"Sql": null,
"Uom": "string"
},
{
"Name": "Interpolation Method",
"Value": "linear",
"Sql": null,
"Uom": "string"
},
{
"Name": "Maximum Interpolation",
"Value": "2day",
"Sql": null,
"Uom": "string"
}
],
"CapsuleProperties": null
}
],
"Type": "GENERIC",
"Hostname": null,
"Port": 0,
"DatabaseName": null,
"Username": "root",
"Password": "taosdata",
"InitialSql": null,
"TimeZone": null,
"PrintRows": false,
"UseWindowsAuth": false,
"SqlFetchBatchSize": 100000,
"UseSSL": false,
"JdbcProperties": null,
"GenericDatabaseConfig": {
"DatabaseJdbcUrl": "jdbc:TAOS-RS://127.0.0.1:6041/power?user=root&password=taosdata",
"SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver",
"ResolutionInNanoseconds": 1000,
"ZonedColumnTypes": []
}
}
```
- Goods
```
{
"QueryDefinitions": [
{
"Name": "PowerGoods",
"Type": "CONDITION",
"Sql": "SELECT ts1, ts2, goods FROM power.goods",
"Enabled": true,
"TestMode": false,
"TestQueriesDuringSync": true,
"InProgressCapsulesEnabled": false,
"Variables": null,
"Properties": [
{
"Name": "Name",
"Value": "Goods",
"Sql": null,
"Uom": "string"
},
{
"Name": "Maximum Duration",
"Value": "10days",
"Sql": null,
"Uom": "string"
}
],
"CapsuleProperties": [
{
"Name": "goods",
"Value": "${columnResult}",
"Column": "goods",
"Uom": "string"
}
]
}
],
"Type": "GENERIC",
"Hostname": null,
"Port": 0,
"DatabaseName": null,
"Username": "root",
"Password": "taosdata",
"InitialSql": null,
"TimeZone": null,
"PrintRows": false,
"UseWindowsAuth": false,
"SqlFetchBatchSize": 100000,
"UseSSL": false,
"JdbcProperties": null,
"GenericDatabaseConfig": {
"DatabaseJdbcUrl": "jdbc:TAOS-RS://127.0.0.1:6041/power?user=root&password=taosdata",
"SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver",
"ResolutionInNanoseconds": 1000,
"ZonedColumnTypes": []
}
}
```
- Temperature
```
{
"QueryDefinitions": [
{
"Name": "PowerNum",
"Type": "SIGNAL",
"Sql": "SELECT ts, temperature FROM meters",
"Enabled": true,
"TestMode": false,
"TestQueriesDuringSync": true,
"InProgressCapsulesEnabled": false,
"Variables": null,
"Properties": [
{
"Name": "Name",
"Value": "Temperature",
"Sql": null,
"Uom": "string"
},
{
"Name": "Interpolation Method",
"Value": "linear",
"Sql": null,
"Uom": "string"
},
{
"Name": "Maximum Interpolation",
"Value": "2day",
"Sql": null,
"Uom": "string"
}
],
"CapsuleProperties": null
}
],
"Type": "GENERIC",
"Hostname": null,
"Port": 0,
"DatabaseName": null,
"Username": "root",
"Password": "taosdata",
"InitialSql": null,
"TimeZone": null,
"PrintRows": false,
"UseWindowsAuth": false,
"SqlFetchBatchSize": 100000,
"UseSSL": false,
"JdbcProperties": null,
"GenericDatabaseConfig": {
"DatabaseJdbcUrl": "jdbc:TAOS-RS://127.0.0.1:6041/power?user=root&password=taosdata",
"SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver",
"ResolutionInNanoseconds": 1000,
"ZonedColumnTypes": []
}
}
```
#### 使用 Seeq Workbench
登录 Seeq 服务页面并新建 Seeq Workbench通过选择数据源搜索结果和根据需要选择不同的工具可以进行数据展现或预测详细使用方法参见(官方知识库)[https://support.seeq.com/space/KB/146440193/Seeq+Workbench]。
!(Seeq Workbench)[./seeq/seeq-demo-workbench.webp]
#### 用 Seeq Data Lab Server 进行进一步的数据分析
登录 Seeq 服务页面并新建 Seeq Data Lab可以进一步使用 Python 编程或其他机器学习工具进行更复杂的数据挖掘功能。
```Python
from seeq import spy
spy.options.compatibility = 189
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import mlforecast
import lightgbm as lgb
from mlforecast.target_transforms import Differences
from sklearn.linear_model import LinearRegression
ds = spy.search({'ID': "8C91A9C7-B6C2-4E18-AAAF-XXXXXXXXX"})
print(ds)
sig = ds.loc[ds['Name'].isin(['Num'])]
print(sig)
data = spy.pull(sig, start='2015-01-01', end='2022-12-31', grid=None)
print("data.info()")
data.info()
print(data)
#data.plot()
print("data[Num].info()")
data['Num'].info()
da = data['Num'].index.tolist()
#print(da)
li = data['Num'].tolist()
#print(li)
data2 = pd.DataFrame()
data2['ds'] = da
print('1st data2 ds info()')
data2['ds'].info()
#data2['ds'] = pd.to_datetime(data2['ds']).to_timestamp()
data2['ds'] = pd.to_datetime(data2['ds']).astype('int64')
data2['y'] = li
print('2nd data2 ds info()')
data2['ds'].info()
print(data2)
data2.insert(0, column = "unique_id", value="unique_id")
print("Forecasting ...")
forecast = mlforecast.MLForecast(
models = lgb.LGBMRegressor(),
freq = 1,
lags=[365],
target_transforms=[Differences([365])],
)
forecast.fit(data2)
predicts = forecast.predict(365)
pd.concat([data2, predicts]).set_index("ds").plot(title = "current data with forecast")
plt.show()
```
运行程序输出结果:
!(Seeq forecast result)[./seeq/seeq-forecast-result.webp]
### 配置 Seeq 数据源连接 TDengine Cloud
配置 Seeq 数据源连接 TDengine Cloud 和连接 TDengine 本地安装实例没有本质的不同,只要登录 TDengine Cloud 后选择“编程 - Java”并拷贝带 token 字符串的 JDBC 填写为 Seeq Data Source 的 DatabaseJdbcUrl 值。
注意使用 TDengine Cloud 时 SQL 命令中需要指定数据库名称。
#### 用 TDengine Cloud 作为数据源的配置内容示例:
```
{
"QueryDefinitions": [
{
"Name": "CloudVoltage",
"Type": "SIGNAL",
"Sql": "SELECT ts, voltage FROM test.meters",
"Enabled": true,
"TestMode": false,
"TestQueriesDuringSync": true,
"InProgressCapsulesEnabled": false,
"Variables": null,
"Properties": [
{
"Name": "Name",
"Value": "Voltage",
"Sql": null,
"Uom": "string"
},
{
"Name": "Interpolation Method",
"Value": "linear",
"Sql": null,
"Uom": "string"
},
{
"Name": "Maximum Interpolation",
"Value": "2day",
"Sql": null,
"Uom": "string"
}
],
"CapsuleProperties": null
}
],
"Type": "GENERIC",
"Hostname": null,
"Port": 0,
"DatabaseName": null,
"Username": "root",
"Password": "taosdata",
"InitialSql": null,
"TimeZone": null,
"PrintRows": false,
"UseWindowsAuth": false,
"SqlFetchBatchSize": 100000,
"UseSSL": false,
"JdbcProperties": null,
"GenericDatabaseConfig": {
"DatabaseJdbcUrl": "jdbc:TAOS-RS://gw.cloud.taosdata.com?useSSL=true&token=41ac9d61d641b6b334e8b76f45f5a8XXXXXXXXXX",
"SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver",
"ResolutionInNanoseconds": 1000,
"ZonedColumnTypes": []
}
}
```
#### TDengine Cloud 作为数据源的 Seeq Workbench 界面示例
!(Seeq workbench with TDengine cloud)[./seeq/seeq-workbench-with-tdengine-cloud.webp]
## 方案总结
通过集成Seeq和TDengine可以充分利用TDengine高效的存储和查询性能同时也可以受益于Seeq提供给用户的强大数据可视化和分析功能。
这种集成使用户能够充分利用TDengine的高性能时序数据存储和检索确保高效处理大量数据。同时Seeq提供高级分析功能如数据可视化、异常检测、相关性分析和预测建模使用户能够获得有价值的洞察并基于数据进行决策。
综合来看Seeq和TDengine共同为制造业、工业物联网和电力系统等各行各业的时序数据分析提供了综合解决方案。高效数据存储和先进的分析相结合赋予用户充分发挥时序数据潜力的能力推动运营改进并支持预测和规划分析应用。

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 47 KiB

View File

@ -83,7 +83,7 @@ void taosRemoveDir(const char *dirname);
bool taosDirExist(const char *dirname);
int32_t taosMkDir(const char *dirname);
int32_t taosMulMkDir(const char *dirname);
int32_t taosMulModeMkDir(const char *dirname, int mode);
int32_t taosMulModeMkDir(const char *dirname, int mode, bool checkAccess);
void taosRemoveOldFiles(const char *dirname, int32_t keepDays);
int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen);
int32_t taosRealPath(char *dirname, char *realPath, int32_t maxlen);

View File

@ -1523,7 +1523,7 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
taosSetAllDebugFlag(cfgGetItem(pCfg, "debugFlag")->i32, false);
if (taosMulModeMkDir(tsLogDir, 0777) != 0) {
if (taosMulModeMkDir(tsLogDir, 0777, true) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
printf("failed to create dir:%s since %s", tsLogDir, terrstr());
cfgCleanup(pCfg);

View File

@ -33,7 +33,7 @@ enum {
int32_t mndInitConsumer(SMnode *pMnode);
void mndCleanupConsumer(SMnode *pMnode);
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId);
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info);
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);

View File

@ -99,6 +99,8 @@ typedef enum {
TRN_CONFLICT_GLOBAL = 1,
TRN_CONFLICT_DB = 2,
TRN_CONFLICT_DB_INSIDE = 3,
TRN_CONFLICT_TOPIC = 4,
TRN_CONFLICT_TOPIC_INSIDE = 5,
} ETrnConflct;
typedef enum {

View File

@ -37,7 +37,6 @@ static const char *mndConsumerStatusName(int status);
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg);
static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
@ -45,7 +44,6 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
@ -64,7 +62,6 @@ int32_t mndInitConsumer(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
@ -76,7 +73,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {}
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info){
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
if (pClearMsg == NULL) {
mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
@ -85,7 +82,11 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
pClearMsg->consumerId = consumerId;
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)};
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR,
.pCont = pClearMsg,
.contLen = sizeof(SMqConsumerClearMsg),
.info = *info,
};
mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
@ -122,48 +123,31 @@ void mndRebCntDec() {
}
}
//static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
// SMnode *pMnode = pMsg->info.node;
// SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
// SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
// if (pConsumer == NULL) {
// return 0;
// }
//
// mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status,
// mndConsumerStatusName(pConsumer->status));
//
// if (pConsumer->status != MQ_CONSUMER_STATUS_READY) {
// mndReleaseConsumer(pMnode, pConsumer);
// return -1;
// }
//
// SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
//
// mndReleaseConsumer(pMnode, pConsumer);
//
// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm");
// if (pTrans == NULL) {
// goto FAIL;
// }
//
// if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
// goto FAIL;
// }
//
// if (mndTransPrepare(pMnode, pTrans) != 0) {
// goto FAIL;
// }
//
// tDeleteSMqConsumerObj(pConsumerNew, true);
// mndTransDrop(pTrans);
// return 0;
//FAIL:
// tDeleteSMqConsumerObj(pConsumerNew, true);
// mndTransDrop(pTrans);
// return -1;
//}
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
int32_t numOfTopics = taosArrayGetSize(pTopicList);
for (int32_t i = 0; i < numOfTopics; i++) {
char *pOneTopic = taosArrayGetP(pTopicList, i);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic);
if (pTopic == NULL) { // terrno has been set by callee function
return -1;
}
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
mndReleaseTopic(pMnode, pTopic);
return -1;
}
mndTransSetDbName(pTrans, pOneTopic, NULL);
if(mndTransCheckConflict(pMnode, pTrans) != 0){
mndReleaseTopic(pMnode, pTopic);
return -1;
}
mndReleaseTopic(pMnode, pTopic);
}
return 0;
}
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
@ -188,10 +172,13 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
mndReleaseConsumer(pMnode, pConsumer);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm");
if (pTrans == NULL) {
goto FAIL;
}
if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user) != 0){
goto FAIL;
}
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
@ -221,19 +208,12 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
mndConsumerStatusName(pConsumer->status));
// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
// mndReleaseConsumer(pMnode, pConsumer);
// return -1;
// }
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST;
mndReleaseConsumer(pMnode, pConsumer);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
if (pTrans == NULL) goto FAIL;
// this is the drop action, not the update action
if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
@ -318,7 +298,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
if (status == MQ_CONSUMER_STATUS_READY) {
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
} else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
taosRLockLatch(&pConsumer->lock);
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
@ -333,7 +313,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
}
} else if (status == MQ_CONSUMER_STATUS_LOST) {
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
}
} else { // MQ_CONSUMER_STATUS_REBALANCE
taosRLockLatch(&pConsumer->lock);
@ -410,6 +390,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
.msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
.pCont = pRecoverMsg,
.contLen = sizeof(SMqConsumerRecoverMsg),
.info = pMsg->info,
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
}
@ -484,6 +465,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
.msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
.pCont = pRecoverMsg,
.contLen = sizeof(SMqConsumerRecoverMsg),
.info = pMsg->info,
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
@ -629,27 +611,6 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj
return 0;
}
static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) {
int32_t numOfTopics = taosArrayGetSize(pTopicList);
for (int32_t i = 0; i < numOfTopics; i++) {
char *pOneTopic = taosArrayGetP(pTopicList, i);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic);
if (pTopic == NULL) { // terrno has been set by callee function
return -1;
}
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
mndReleaseTopic(pMnode, pTopic);
return -1;
}
mndReleaseTopic(pMnode, pTopic);
}
return 0;
}
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
static void freeItem(void *param) {
@ -688,12 +649,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
}
// check topic existence
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "subscribe");
if (pTrans == NULL) {
goto _over;
}
code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user);
code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
@ -722,7 +683,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
} else {
int32_t status = atomic_load_32(&pExistedConsumer->status);
@ -802,7 +762,6 @@ _over:
tDeleteSMqConsumerObj(pConsumerNew, true);
// TODO: replace with destroy subscribe msg
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
return code;
}

View File

@ -553,13 +553,17 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
}
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
char cgroup[TSDB_CGROUP_LEN] = {0};
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "tmq-reb");
if (pTrans == NULL) {
nodesDestroyNode((SNode*)pPlan);
return -1;
}
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
mndTransSetDbName(pTrans, topic, cgroup);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mndTransDrop(pTrans);
nodesDestroyNode((SNode*)pPlan);
@ -587,10 +591,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
return -1;
}
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
char cgroup[TSDB_CGROUP_LEN] = {0};
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
// 3. commit log: consumer to update status and epoch
// 3.1 set touched consumer
int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers);
@ -802,6 +802,19 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
goto end;
}
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "drop-cgroup");
if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
code = -1;
goto end;
}
mndTransSetDbName(pTrans, dropReq.topic, dropReq.cgroup);
code = mndTransCheckConflict(pMnode, pTrans);
if (code != 0) {
goto end;
}
void *pIter = NULL;
SMqConsumerObj *pConsumer;
while (1) {
@ -811,18 +824,11 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
}
if (strcmp(dropReq.cgroup, pConsumer->cgroup) == 0) {
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
}
sdbRelease(pMnode->pSdb, pConsumer);
}
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
code = -1;
goto end;
}
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
@ -1019,8 +1025,8 @@ int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) {
if (pIter == NULL) break;
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
char cgroup[TSDB_CGROUP_LEN] = {0};
mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
if (strcmp(topic, topicName) != 0) {
sdbRelease(pSdb, pSub);
@ -1084,7 +1090,6 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
}
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
@ -1093,8 +1098,8 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
if (pIter == NULL) break;
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
char cgroup[TSDB_CGROUP_LEN] = {0};
mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
if (strcmp(topic, topicName) != 0) {
sdbRelease(pSdb, pSub);
@ -1132,15 +1137,13 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
sdbRelease(pSdb, pSub);
sdbCancelFetch(pSdb, pIter);
goto END;
return -1;
}
sdbRelease(pSdb, pSub);
}
code = 0;
END:
return code;
return 0;
}
static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){

View File

@ -382,14 +382,29 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
int32_t code = -1;
SNode *pAst = NULL;
SQueryPlan *pPlan = NULL;
SMqTopicObj topicObj = {0};
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "create-topic");
if (pTrans == NULL) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
code = -1;
goto _OUT;
}
mndTransSetDbName(pTrans, pCreate->name, NULL);
code = mndTransCheckConflict(pMnode, pTrans);
if (code != 0) {
goto _OUT;
}
mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name);
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN);
if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj) != 0) {
return -1;
code = mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj);
if (code != 0) {
goto _OUT;
}
topicObj.createTime = taosGetTimestampMs();
@ -406,6 +421,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if (pCreate->withMeta) {
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
code = terrno;
goto _OUT;
}
@ -414,13 +430,15 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast);
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
code = nodesStringToNode(pCreate->ast, &pAst);
if (code != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
goto _OUT;
}
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
if (code != 0) {
mError("failed to create topic:%s since %s", pCreate->name, terrstr());
goto _OUT;
}
@ -428,6 +446,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
if (topicObj.ntbColIds == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
goto _OUT;
}
@ -438,12 +457,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.ntbColIds = NULL;
}
if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
code = qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema);
if (code != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
goto _OUT;
}
if (nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
code = nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL);
if (code != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
goto _OUT;
}
@ -451,6 +472,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
if (pStb == NULL) {
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
code = terrno;
goto _OUT;
}
@ -470,21 +492,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
/*topicObj.withTbName = 1;*/
/*topicObj.withSchema = 1;*/
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-topic");
if (pTrans == NULL) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
goto _OUT;
}
mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
goto _OUT;
}
mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name);
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
code = -1;
goto _OUT;
}
@ -510,7 +521,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
// encoder check alter info
int32_t len;
int32_t code;
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
if (code < 0) {
sdbRelease(pSdb, pVgroup);
@ -525,6 +535,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter);
code = -1;
goto _OUT;
}
tEncoderClear(&encoder);
@ -539,6 +550,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter);
code = -1;
goto _OUT;
}
buf = NULL;
@ -548,6 +560,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
code = -1;
goto _OUT;
}
@ -664,16 +677,19 @@ _OVER:
}
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SMDropTopicReq dropReq = {0};
int32_t code = 0;
SMqTopicObj *pTopic = NULL;
STrans *pTrans = NULL;
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
pTopic = mndAcquireTopic(pMnode, dropReq.name);
if (pTopic == NULL) {
if (dropReq.igNotExists) {
mInfo("topic:%s, not exist, ignore not exist is set", dropReq.name);
@ -685,9 +701,29 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
}
if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic) != 0) {
mndReleaseTopic(pMnode, pTopic);
return -1;
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "drop-topic");
if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
code = -1;
goto end;
}
mndTransSetDbName(pTrans, pTopic->name, NULL);
code = mndTransCheckConflict(pMnode, pTrans);
if (code != 0) {
goto end;
}
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
code = mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic);
if (code != 0) {
goto end;
}
code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db);
if (code != 0) {
goto end;
}
void *pIter = NULL;
@ -698,37 +734,42 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
break;
}
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
mndReleaseConsumer(pMnode, pConsumer);
continue;
}
bool found = false;
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic);
sdbCancelFetch(pSdb, pIter);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
return -1;
found = true;
break;
}
}
if (found){
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pReq->info);
mndReleaseConsumer(pMnode, pConsumer);
continue;
}
mndReleaseConsumer(pMnode, pConsumer);
sdbCancelFetch(pSdb, pIter);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
code = -1;
goto end;
}
sz = taosArrayGetSize(pConsumer->rebNewTopics);
for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(pConsumer->rebNewTopics, i);
if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic);
sdbCancelFetch(pSdb, pIter);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
return -1;
code = -1;
goto end;
}
}
@ -737,45 +778,22 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i);
if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic);
sdbCancelFetch(pSdb, pIter);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
return -1;
code = -1;
goto end;
}
}
sdbRelease(pSdb, pConsumer);
}
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
mndReleaseTopic(pMnode, pTopic);
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "drop-topic");
if (pTrans == NULL) {
code = mndDropSubByTopic(pMnode, pTrans, dropReq.name);
if (code < 0) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
mndReleaseTopic(pMnode, pTopic);
return -1;
}
mndTransSetDbName(pTrans, pTopic->db, NULL);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mndReleaseTopic(pMnode, pTopic);
mndTransDrop(pTrans);
return -1;
}
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
// TODO check if rebalancing
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
mndTransDrop(pTrans);
mndReleaseTopic(pMnode, pTopic);
return -1;
goto end;
}
if (pTopic->ntbUid != 0) {
@ -801,25 +819,25 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
code = mndTransAppendRedoAction(pTrans, &action);
if (code != 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
mndReleaseTopic(pMnode, pTopic);
sdbCancelFetch(pSdb, pIter);
mndTransDrop(pTrans);
return -1;
goto end;
}
sdbRelease(pSdb, pVgroup);
}
}
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
end:
mndReleaseTopic(pMnode, pTopic);
mndTransDrop(pTrans);
if (code != 0) {
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
return -1;
return code;
}
char detail[100] = {0};

View File

@ -791,6 +791,22 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
}
}
if (pNew->conflict == TRN_CONFLICT_TOPIC) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 ) conflict = true;
}
}
if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_TOPIC ) {
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 ) conflict = true;
}
if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0) conflict = true;
}
}
if (conflict) {
mError("trans:%d, db:%s stb:%s type:%d, can't execute since conflict with trans:%d db:%s stb:%s type:%d",
pNew->id, pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname,

View File

@ -783,6 +783,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
}
tqUnregisterPushHandle(pTq, pHandle);
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
@ -884,7 +886,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, 0);
tqUnregisterPushHandle(pTq, pHandle);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
}
@ -1201,6 +1202,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
"s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
"sched-status:%d",
id, schedStatus);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
@ -1214,6 +1216,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el,
TASK_SCHED_STATUS__INACTIVE);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}

View File

@ -25,7 +25,7 @@
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
const int32_t* dstSlotIds, void** pRes, const char* idStr) {
int32_t numOfRows = pBlock->info.rows;
bool allNullRow = true;
// bool allNullRow = true;
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
@ -36,7 +36,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
p->ts = pColVal->ts;
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
allNullRow = p->isNull & allNullRow;
// allNullRow = p->isNull & allNullRow;
if (!p->isNull) {
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
@ -56,7 +56,8 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false);
}
pBlock->info.rows += allNullRow ? 0 : 1;
// pBlock->info.rows += allNullRow ? 0 : 1;
++pBlock->info.rows;
} else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
@ -65,7 +66,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
SColVal* pVal = &pColVal->colVal;
allNullRow = false;
// allNullRow = false;
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
if (!COL_VAL_IS_VALUE(&pColVal->colVal)) {
colDataSetNULL(pColInfoData, numOfRows);
@ -80,7 +81,8 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
}
}
pBlock->info.rows += allNullRow ? 0 : 1;
// pBlock->info.rows += allNullRow ? 0 : 1;
++pBlock->info.rows;
} else {
tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
return TSDB_CODE_INVALID_PARA;

View File

@ -5608,4 +5608,4 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
pReader->idStr = taosStrdup(idstr);
}
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ }

View File

@ -1268,7 +1268,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
}
// has duplicated ts of different version in this block
pInfo->hasDupTs = (pBlockInfo->record.numRow > pBlockInfo->record.count);
pInfo->hasDupTs = (pBlockInfo->record.numRow > pBlockInfo->record.count) || (pBlockInfo->record.count <= 0);
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pBlockInfo->record, pReader->info.order);
if (hasDataInLastBlock(pLastBlockReader)) {

View File

@ -166,18 +166,20 @@ static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, i
} else if (ekey < pInfo->pFillInfo->start) {
int64_t t = ekey;
SInterval* pInterval = &pInfo->pFillInfo->interval;
int64_t prev = t;
while(1) {
int64_t prev = taosTimeAdd(t, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
if (prev >= pInfo->pFillInfo->start) {
t = prev;
int64_t next = taosTimeAdd(t, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
if (next >= pInfo->pFillInfo->start) {
prev = t;
t = next;
break;
}
t = prev;
prev = t;
t = next;
}
// todo time window chosen problem: t or prev value?
if (t > pInfo->pFillInfo->start) t -= pInterval->sliding;
// todo time window chosen problem: t or next value?
if (t > pInfo->pFillInfo->start) t = prev;
taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, t);
}
}

View File

@ -654,9 +654,12 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
SColumnInfoData *pInputData = pInput->columnData;
SColumnInfoData *pOutputData = pOutput->columnData;
int32_t outputLen = pInputData->varmeta.length * pInput->numOfRows;
char *outputBuf = taosMemoryCalloc(outputLen, 1);
char *output = outputBuf;
int32_t outputLen = pInputData->info.bytes;
char *outputBuf = taosMemoryMalloc(outputLen);
if (outputBuf == NULL) {
qError("substr function memory allocation failure. size: %d", outputLen);
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_s(pInputData, i)) {
@ -676,14 +679,16 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
startPosBytes = TMAX(startPosBytes, 0);
}
char *output = outputBuf;
int32_t resLen = TMIN(subLen, len - startPosBytes);
if (resLen > 0) {
memcpy(varDataVal(output), varDataVal(input) + startPosBytes, resLen);
varDataSetLen(output, resLen);
} else {
varDataSetLen(output, 0);
}
varDataSetLen(output, resLen);
colDataSetVal(pOutputData, i, output, false);
output += varDataTLen(output);
}
pOutput->numOfRows = pInput->numOfRows;

View File

@ -52,7 +52,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(streamPath, 0755);
code = taosMulModeMkDir(streamPath, 0755, false);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;
@ -90,7 +90,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "state");
code = taosMulModeMkDir(streamPath, 0755);
code = taosMulModeMkDir(streamPath, 0755, false);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;

View File

@ -169,7 +169,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
sscanf(cfg, "%d\n%d\n", &szPage, &pages);
}
} else {
int32_t code = taosMulModeMkDir(statePath, 0755);
int32_t code = taosMulModeMkDir(statePath, 0755, false);
if (code == 0) {
pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE);
sprintf(cfg, "%d\n%d\n", szPage, pages);

View File

@ -62,7 +62,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i
}
memset(pDb->pgrHash, 0, tsize);
ret = taosMulModeMkDir(dbname, 0755);
ret = taosMulModeMkDir(dbname, 0755, false);
if (ret < 0) {
return -1;
}

View File

@ -193,7 +193,7 @@ int32_t taosMulMkDir(const char *dirname) {
return code;
}
int32_t taosMulModeMkDir(const char *dirname, int mode) {
int32_t taosMulModeMkDir(const char *dirname, int mode, bool checkAccess) {
if (dirname == NULL || strlen(dirname) >= TDDIRMAXLEN) return -1;
char temp[TDDIRMAXLEN];
char *pos = temp;
@ -206,6 +206,9 @@ int32_t taosMulModeMkDir(const char *dirname, int mode) {
#endif
if (taosDirExist(temp)) {
if (checkAccess && taosCheckAccessFile(temp, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
return 0;
}
return chmod(temp, mode);
}
@ -248,6 +251,9 @@ int32_t taosMulModeMkDir(const char *dirname, int mode) {
}
if (code < 0 && errno == EEXIST) {
if (checkAccess && taosCheckAccessFile(temp, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) {
return 0;
}
return chmod(temp, mode);
}

View File

@ -206,6 +206,19 @@ if $desc_rows != $asc_rows then
return -1
endi
print ================= step11
sql create database if not exists test0828
sql use test0828
sql create stable st (ts timestamp, c2 int) tags(tg int)
sql insert into ct1 using st tags(1) values('2021-08-01', 0)
sql insert into ct2 using st tags(2) values('2022-08-01', 1)
sql select _wstart, _wend, count(*) from st where ts>='2021-01-01' and ts < '2023-08-28' interval(1n) fill(value, 0) order by _wstart desc
print $rows
if $rows != 32 then
return -1
endi
sql drop database test0828
print =============== clear
#sql drop database $db
#sql select * from information_schema.ins_databases