diff --git a/docs/en/12-taos-sql/27-indexing.md b/docs/en/12-taos-sql/27-indexing.md index a89c8929c1..1badd38d17 100644 --- a/docs/en/12-taos-sql/27-indexing.md +++ b/docs/en/12-taos-sql/27-indexing.md @@ -19,6 +19,9 @@ index_option: functions: function [, function] ... ``` +### tag Indexing + + [tag index](../tag-index) ### SMA Indexing diff --git a/docs/zh/12-taos-sql/27-index.md b/docs/zh/12-taos-sql/27-indexing.md similarity index 98% rename from docs/zh/12-taos-sql/27-index.md rename to docs/zh/12-taos-sql/27-indexing.md index da8f38eb22..cf8cac1bed 100644 --- a/docs/zh/12-taos-sql/27-index.md +++ b/docs/zh/12-taos-sql/27-indexing.md @@ -20,6 +20,9 @@ index_option: functions: function [, function] ... ``` +### tag 索引 + + [tag 索引](../tag-index) ### SMA 索引 diff --git a/docs/zh/20-third-party/70-seeq.md b/docs/zh/20-third-party/70-seeq.md new file mode 100644 index 0000000000..0bdf58955d --- /dev/null +++ b/docs/zh/20-third-party/70-seeq.md @@ -0,0 +1,437 @@ +--- +sidebar_label: Seeq +title: Seeq +description: 如何使用 Seeq 和 TDengine 进行时序数据分析 +--- + +# 如何使用 Seeq 和 TDengine 进行时序数据分析 + +## 方案介绍 + +Seeq 是制造业和工业互联网(IIOT)高级分析软件。Seeq 支持在工艺制造组织中使用机器学习创新的新功能。这些功能使组织能够将自己或第三方机器学习算法部署到前线流程工程师和主题专家使用的高级分析应用程序,从而使单个数据科学家的努力扩展到许多前线员工。 + +通过 TDengine Java connector, Seeq 可以轻松支持查询 TDengine 提供的时序数据,并提供数据展现、分析、预测等功能。 + +### Seeq 安装方法 + +从 (Seeq 官网)[https://www.seeq.com/customer-download]下载相关软件,例如 Seeq Server 和 Seeq Data Lab 等。 + +### Seeq Server 安装和启动 + +``` +tar xvzf seeq-server-xxx.tar.gz +cd seeq-server-installer +sudo ./install + +sudo seeq service enable +sudo seeq start +``` + +### Seeq Data Lab Server 安装和启动 + +Seeq Data Lab 需要安装在和 Seeq Server 不同的服务器上,并通过配置和 Seeq Server 互联。详细安装配置指令参见(Seeq 官方文档)[https://support.seeq.com/space/KB/1034059842]。 + +``` +tar xvf seeq-data-lab--64bit-linux.tar.gz +sudo seeq-data-lab-installer/install -f /opt/seeq/seeq-data-lab -g /var/opt/seeq -u seeq +sudo seeq config set Network/DataLab/Hostname localhost +sudo seeq config set Network/DataLab/Port 34231 # the port of the Data Lab server (usually 34231) +sudo seeq config set Network/Hostname # the host IP or URL of the main Seeq Server + +# If the main Seeq server is configured to listen over HTTPS +sudo seeq config set Network/Webserver/SecurePort 443 # the secure port of the main Seeq Server (usually 443) + +# If the main Seeq server is NOT configured to listen over HTTPS +sudo seeq config set Network/Webserver/Port + +#On the main Seeq server, open a Seeq Command Prompt and set the hostname of the Data Lab server: +sudo seeq config set Network/DataLab/Hostname # the host IP (not URL) of the Data Lab server +sudo seeq config set Network/DataLab/Port 34231 # the port of the Data Lab server (usually 34231 +``` + +## TDengine 本地实例安装方法 + +请参考(官网文档)[https://docs.taosdata.com/get-started/package/]。 + +## TDengine Cloud 访问方法 +如果使用 Seeq 连接 TDengine Cloud,请在 https://cloud.taosdata.com 申请帐号并登录查看如何访问 TDengine Cloud。 + +## 如何配置 Seeq 访问 TDengine + +1. 查看 data 存储位置 + +``` +sudo seeq config get Folders/Data +``` + +2. 从 maven.org 下载 TDengine Java connector 包,目前最新版本为(3.2.4)[https://repo1.maven.org/maven2/com/taosdata/jdbc/taos-jdbcdriver/3.2.4/taos-jdbcdriver-3.2.4-dist.jar],并拷贝至 data 存储位置的 plugins\lib 中。 + +3. 重新启动 seeq server + +``` +sudo seeq restart +``` + +4. 输入 License + +使用浏览器访问 ip:34216 并按照说明输入 license。 + +## 使用 Seeq 分析 TDengine 时序数据 + +本章节演示如何使用 Seeq 软件配合 TDengine 进行时序数据分析。 + +### 场景介绍 + +示例场景为一个电力系统,用户每天从电站仪表收集用电量数据,并将其存储在 TDengine 集群中。现在用户想要预测电力消耗将会如何发展,并购买更多设备来支持它。用户电力消耗随着每月订单变化而不同,另外考虑到季节变化,电力消耗量会有所不同。这个城市位于北半球,所以在夏天会使用更多的电力。我们模拟数据来反映这些假定。 + +### 数据 Schema + +``` +CREATE STABLE meters (ts TIMESTAMP, num INT, temperature FLOAT, goods INT) TAGS (device NCHAR(20)); +CREATE TABLE goods (ts1 TIMESTAMP, ts2 TIMESTAMP, goods FLOAT); +``` + +!(Seeq demo schema)[./seeq/seeq-demo-schema.webp] + +### 构造数据方法 + +``` +python mockdata.py +taos -s "insert into power.goods select _wstart, _wstart + 10d, avg(goods) from power.meters interval(10d);" +``` +源代码托管在(github 仓库)[https://github.com/sangshuduo/td-forecasting]。 + +### 使用 Seeq 进行数据分析 + +#### 配置数据源(Data Source) + +使用 Seeq 管理员角色的帐号登录,并新建数据源。 + +- Power + +``` +{ + "QueryDefinitions": [ + { + "Name": "PowerNum", + "Type": "SIGNAL", + "Sql": "SELECT ts, num FROM meters", + "Enabled": true, + "TestMode": false, + "TestQueriesDuringSync": true, + "InProgressCapsulesEnabled": false, + "Variables": null, + "Properties": [ + { + "Name": "Name", + "Value": "Num", + "Sql": null, + "Uom": "string" + }, + { + "Name": "Interpolation Method", + "Value": "linear", + "Sql": null, + "Uom": "string" + }, + { + "Name": "Maximum Interpolation", + "Value": "2day", + "Sql": null, + "Uom": "string" + } + ], + "CapsuleProperties": null + } + ], + "Type": "GENERIC", + "Hostname": null, + "Port": 0, + "DatabaseName": null, + "Username": "root", + "Password": "taosdata", + "InitialSql": null, + "TimeZone": null, + "PrintRows": false, + "UseWindowsAuth": false, + "SqlFetchBatchSize": 100000, + "UseSSL": false, + "JdbcProperties": null, + "GenericDatabaseConfig": { + "DatabaseJdbcUrl": "jdbc:TAOS-RS://127.0.0.1:6041/power?user=root&password=taosdata", + "SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver", + "ResolutionInNanoseconds": 1000, + "ZonedColumnTypes": [] + } +} +``` + +- Goods + +``` +{ + "QueryDefinitions": [ + { + "Name": "PowerGoods", + "Type": "CONDITION", + "Sql": "SELECT ts1, ts2, goods FROM power.goods", + "Enabled": true, + "TestMode": false, + "TestQueriesDuringSync": true, + "InProgressCapsulesEnabled": false, + "Variables": null, + "Properties": [ + { + "Name": "Name", + "Value": "Goods", + "Sql": null, + "Uom": "string" + }, + { + "Name": "Maximum Duration", + "Value": "10days", + "Sql": null, + "Uom": "string" + } + ], + "CapsuleProperties": [ + { + "Name": "goods", + "Value": "${columnResult}", + "Column": "goods", + "Uom": "string" + } + ] + } + ], + "Type": "GENERIC", + "Hostname": null, + "Port": 0, + "DatabaseName": null, + "Username": "root", + "Password": "taosdata", + "InitialSql": null, + "TimeZone": null, + "PrintRows": false, + "UseWindowsAuth": false, + "SqlFetchBatchSize": 100000, + "UseSSL": false, + "JdbcProperties": null, + "GenericDatabaseConfig": { + "DatabaseJdbcUrl": "jdbc:TAOS-RS://127.0.0.1:6041/power?user=root&password=taosdata", + "SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver", + "ResolutionInNanoseconds": 1000, + "ZonedColumnTypes": [] + } +} +``` + +- Temperature + +``` +{ + "QueryDefinitions": [ + { + "Name": "PowerNum", + "Type": "SIGNAL", + "Sql": "SELECT ts, temperature FROM meters", + "Enabled": true, + "TestMode": false, + "TestQueriesDuringSync": true, + "InProgressCapsulesEnabled": false, + "Variables": null, + "Properties": [ + { + "Name": "Name", + "Value": "Temperature", + "Sql": null, + "Uom": "string" + }, + { + "Name": "Interpolation Method", + "Value": "linear", + "Sql": null, + "Uom": "string" + }, + { + "Name": "Maximum Interpolation", + "Value": "2day", + "Sql": null, + "Uom": "string" + } + ], + "CapsuleProperties": null + } + ], + "Type": "GENERIC", + "Hostname": null, + "Port": 0, + "DatabaseName": null, + "Username": "root", + "Password": "taosdata", + "InitialSql": null, + "TimeZone": null, + "PrintRows": false, + "UseWindowsAuth": false, + "SqlFetchBatchSize": 100000, + "UseSSL": false, + "JdbcProperties": null, + "GenericDatabaseConfig": { + "DatabaseJdbcUrl": "jdbc:TAOS-RS://127.0.0.1:6041/power?user=root&password=taosdata", + "SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver", + "ResolutionInNanoseconds": 1000, + "ZonedColumnTypes": [] + } +} +``` + +#### 使用 Seeq Workbench + +登录 Seeq 服务页面并新建 Seeq Workbench,通过选择数据源搜索结果和根据需要选择不同的工具,可以进行数据展现或预测,详细使用方法参见(官方知识库)[https://support.seeq.com/space/KB/146440193/Seeq+Workbench]。 + +!(Seeq Workbench)[./seeq/seeq-demo-workbench.webp] + +#### 用 Seeq Data Lab Server 进行进一步的数据分析 + +登录 Seeq 服务页面并新建 Seeq Data Lab,可以进一步使用 Python 编程或其他机器学习工具进行更复杂的数据挖掘功能。 + +```Python +from seeq import spy +spy.options.compatibility = 189 +import pandas as pd +import matplotlib +import matplotlib.pyplot as plt +import mlforecast +import lightgbm as lgb +from mlforecast.target_transforms import Differences +from sklearn.linear_model import LinearRegression + +ds = spy.search({'ID': "8C91A9C7-B6C2-4E18-AAAF-XXXXXXXXX"}) +print(ds) + +sig = ds.loc[ds['Name'].isin(['Num'])] +print(sig) + +data = spy.pull(sig, start='2015-01-01', end='2022-12-31', grid=None) +print("data.info()") +data.info() +print(data) +#data.plot() + +print("data[Num].info()") +data['Num'].info() +da = data['Num'].index.tolist() +#print(da) + +li = data['Num'].tolist() +#print(li) + +data2 = pd.DataFrame() +data2['ds'] = da +print('1st data2 ds info()') +data2['ds'].info() + +#data2['ds'] = pd.to_datetime(data2['ds']).to_timestamp() +data2['ds'] = pd.to_datetime(data2['ds']).astype('int64') +data2['y'] = li +print('2nd data2 ds info()') +data2['ds'].info() +print(data2) + +data2.insert(0, column = "unique_id", value="unique_id") + +print("Forecasting ...") + +forecast = mlforecast.MLForecast( + models = lgb.LGBMRegressor(), + freq = 1, + lags=[365], + target_transforms=[Differences([365])], +) + +forecast.fit(data2) +predicts = forecast.predict(365) + +pd.concat([data2, predicts]).set_index("ds").plot(title = "current data with forecast") +plt.show() +``` + +运行程序输出结果: + +!(Seeq forecast result)[./seeq/seeq-forecast-result.webp] + +### 配置 Seeq 数据源连接 TDengine Cloud + +配置 Seeq 数据源连接 TDengine Cloud 和连接 TDengine 本地安装实例没有本质的不同,只要登录 TDengine Cloud 后选择“编程 - Java”并拷贝带 token 字符串的 JDBC 填写为 Seeq Data Source 的 DatabaseJdbcUrl 值。 +注意使用 TDengine Cloud 时 SQL 命令中需要指定数据库名称。 + +#### 用 TDengine Cloud 作为数据源的配置内容示例: + +``` +{ + "QueryDefinitions": [ + { + "Name": "CloudVoltage", + "Type": "SIGNAL", + "Sql": "SELECT ts, voltage FROM test.meters", + "Enabled": true, + "TestMode": false, + "TestQueriesDuringSync": true, + "InProgressCapsulesEnabled": false, + "Variables": null, + "Properties": [ + { + "Name": "Name", + "Value": "Voltage", + "Sql": null, + "Uom": "string" + }, + { + "Name": "Interpolation Method", + "Value": "linear", + "Sql": null, + "Uom": "string" + }, + { + "Name": "Maximum Interpolation", + "Value": "2day", + "Sql": null, + "Uom": "string" + } + ], + "CapsuleProperties": null + } + ], + "Type": "GENERIC", + "Hostname": null, + "Port": 0, + "DatabaseName": null, + "Username": "root", + "Password": "taosdata", + "InitialSql": null, + "TimeZone": null, + "PrintRows": false, + "UseWindowsAuth": false, + "SqlFetchBatchSize": 100000, + "UseSSL": false, + "JdbcProperties": null, + "GenericDatabaseConfig": { + "DatabaseJdbcUrl": "jdbc:TAOS-RS://gw.cloud.taosdata.com?useSSL=true&token=41ac9d61d641b6b334e8b76f45f5a8XXXXXXXXXX", + "SqlDriverClassName": "com.taosdata.jdbc.rs.RestfulDriver", + "ResolutionInNanoseconds": 1000, + "ZonedColumnTypes": [] + } +} +``` + +#### TDengine Cloud 作为数据源的 Seeq Workbench 界面示例 + +!(Seeq workbench with TDengine cloud)[./seeq/seeq-workbench-with-tdengine-cloud.webp] + +## 方案总结 + +通过集成Seeq和TDengine,可以充分利用TDengine高效的存储和查询性能,同时也可以受益于Seeq提供给用户的强大数据可视化和分析功能。 + +这种集成使用户能够充分利用TDengine的高性能时序数据存储和检索,确保高效处理大量数据。同时,Seeq提供高级分析功能,如数据可视化、异常检测、相关性分析和预测建模,使用户能够获得有价值的洞察并基于数据进行决策。 + +综合来看,Seeq和TDengine共同为制造业、工业物联网和电力系统等各行各业的时序数据分析提供了综合解决方案。高效数据存储和先进的分析相结合,赋予用户充分发挥时序数据潜力的能力,推动运营改进,并支持预测和规划分析应用。 diff --git a/docs/zh/20-third-party/seeq/seeq-demo-schema.webp b/docs/zh/20-third-party/seeq/seeq-demo-schema.webp new file mode 100644 index 0000000000..3f603749ad Binary files /dev/null and b/docs/zh/20-third-party/seeq/seeq-demo-schema.webp differ diff --git a/docs/zh/20-third-party/seeq/seeq-demo-workbench.webp b/docs/zh/20-third-party/seeq/seeq-demo-workbench.webp new file mode 100644 index 0000000000..5c3d7a4158 Binary files /dev/null and b/docs/zh/20-third-party/seeq/seeq-demo-workbench.webp differ diff --git a/docs/zh/20-third-party/seeq/seeq-forecast-result.webp b/docs/zh/20-third-party/seeq/seeq-forecast-result.webp new file mode 100644 index 0000000000..13144207eb Binary files /dev/null and b/docs/zh/20-third-party/seeq/seeq-forecast-result.webp differ diff --git a/docs/zh/20-third-party/seeq/seeq-workbench-with-tdengine-cloud.webp b/docs/zh/20-third-party/seeq/seeq-workbench-with-tdengine-cloud.webp new file mode 100644 index 0000000000..f486c3ea29 Binary files /dev/null and b/docs/zh/20-third-party/seeq/seeq-workbench-with-tdengine-cloud.webp differ diff --git a/include/os/osDir.h b/include/os/osDir.h index 55c7a15764..533ac8e4a4 100644 --- a/include/os/osDir.h +++ b/include/os/osDir.h @@ -83,7 +83,7 @@ void taosRemoveDir(const char *dirname); bool taosDirExist(const char *dirname); int32_t taosMkDir(const char *dirname); int32_t taosMulMkDir(const char *dirname); -int32_t taosMulModeMkDir(const char *dirname, int mode); +int32_t taosMulModeMkDir(const char *dirname, int mode, bool checkAccess); void taosRemoveOldFiles(const char *dirname, int32_t keepDays); int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen); int32_t taosRealPath(char *dirname, char *realPath, int32_t maxlen); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3c1f43ef40..9c242d7c1e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1523,7 +1523,7 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi taosSetAllDebugFlag(cfgGetItem(pCfg, "debugFlag")->i32, false); - if (taosMulModeMkDir(tsLogDir, 0777) != 0) { + if (taosMulModeMkDir(tsLogDir, 0777, true) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); printf("failed to create dir:%s since %s", tsLogDir, terrstr()); cfgCleanup(pCfg); diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index a3a31cfc5a..94c937e8f4 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -33,7 +33,7 @@ enum { int32_t mndInitConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode); -void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId); +void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 89c58337ad..6dd7a13c66 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -99,6 +99,8 @@ typedef enum { TRN_CONFLICT_GLOBAL = 1, TRN_CONFLICT_DB = 2, TRN_CONFLICT_DB_INSIDE = 3, + TRN_CONFLICT_TOPIC = 4, + TRN_CONFLICT_TOPIC_INSIDE = 5, } ETrnConflct; typedef enum { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 82492f930e..f25bd2cffb 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -37,7 +37,6 @@ static const char *mndConsumerStatusName(int status); static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer); -static int32_t mndProcessConsumerMetaMsg(SRpcMsg *pMsg); static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); @@ -45,7 +44,6 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg); static int32_t mndProcessAskEpReq(SRpcMsg *pMsg); static int32_t mndProcessMqHbReq(SRpcMsg *pMsg); static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg); -static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg); static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg); static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg); @@ -64,7 +62,6 @@ int32_t mndInitConsumer(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg); -// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg); @@ -76,7 +73,7 @@ int32_t mndInitConsumer(SMnode *pMnode) { void mndCleanupConsumer(SMnode *pMnode) {} -void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){ +void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info){ SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); if (pClearMsg == NULL) { mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg)); @@ -85,7 +82,11 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){ pClearMsg->consumerId = consumerId; SRpcMsg rpcMsg = { - .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)}; + .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, + .pCont = pClearMsg, + .contLen = sizeof(SMqConsumerClearMsg), + .info = *info, + }; mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); @@ -122,48 +123,31 @@ void mndRebCntDec() { } } -//static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { -// SMnode *pMnode = pMsg->info.node; -// SMqConsumerLostMsg *pLostMsg = pMsg->pCont; -// SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId); -// if (pConsumer == NULL) { -// return 0; -// } -// -// mInfo("process consumer lost msg, consumer:0x%" PRIx64 " status:%d(%s)", pLostMsg->consumerId, pConsumer->status, -// mndConsumerStatusName(pConsumer->status)); -// -// if (pConsumer->status != MQ_CONSUMER_STATUS_READY) { -// mndReleaseConsumer(pMnode, pConsumer); -// return -1; -// } -// -// SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); -// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST; -// -// mndReleaseConsumer(pMnode, pConsumer); -// -// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "lost-csm"); -// if (pTrans == NULL) { -// goto FAIL; -// } -// -// if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { -// goto FAIL; -// } -// -// if (mndTransPrepare(pMnode, pTrans) != 0) { -// goto FAIL; -// } -// -// tDeleteSMqConsumerObj(pConsumerNew, true); -// mndTransDrop(pTrans); -// return 0; -//FAIL: -// tDeleteSMqConsumerObj(pConsumerNew, true); -// mndTransDrop(pTrans); -// return -1; -//} +static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser) { + int32_t numOfTopics = taosArrayGetSize(pTopicList); + + for (int32_t i = 0; i < numOfTopics; i++) { + char *pOneTopic = taosArrayGetP(pTopicList, i); + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic); + if (pTopic == NULL) { // terrno has been set by callee function + return -1; + } + + if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { + mndReleaseTopic(pMnode, pTopic); + return -1; + } + + mndTransSetDbName(pTrans, pOneTopic, NULL); + if(mndTransCheckConflict(pMnode, pTrans) != 0){ + mndReleaseTopic(pMnode, pTopic); + return -1; + } + mndReleaseTopic(pMnode, pTopic); + } + + return 0; +} static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; @@ -188,10 +172,13 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { mndReleaseConsumer(pMnode, pConsumer); - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm"); if (pTrans == NULL) { goto FAIL; } + if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user) != 0){ + goto FAIL; + } if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; @@ -221,19 +208,12 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, mndConsumerStatusName(pConsumer->status)); -// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { -// mndReleaseConsumer(pMnode, pConsumer); -// return -1; -// } - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); -// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST; mndReleaseConsumer(pMnode, pConsumer); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm"); if (pTrans == NULL) goto FAIL; - // this is the drop action, not the update action if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; @@ -318,7 +298,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { if (status == MQ_CONSUMER_STATUS_READY) { if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { taosRLockLatch(&pConsumer->lock); int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics); @@ -333,7 +313,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { } } else if (status == MQ_CONSUMER_STATUS_LOST) { if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); } } else { // MQ_CONSUMER_STATUS_REBALANCE taosRLockLatch(&pConsumer->lock); @@ -410,6 +390,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER, .pCont = pRecoverMsg, .contLen = sizeof(SMqConsumerRecoverMsg), + .info = pMsg->info, }; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); } @@ -484,6 +465,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER, .pCont = pRecoverMsg, .contLen = sizeof(SMqConsumerRecoverMsg), + .info = pMsg->info, }; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); @@ -629,27 +611,6 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj return 0; } -static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser) { - int32_t numOfTopics = taosArrayGetSize(pTopicList); - - for (int32_t i = 0; i < numOfTopics; i++) { - char *pOneTopic = taosArrayGetP(pTopicList, i); - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic); - if (pTopic == NULL) { // terrno has been set by callee function - return -1; - } - - if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { - mndReleaseTopic(pMnode, pTopic); - return -1; - } - - mndReleaseTopic(pMnode, pTopic); - } - - return 0; -} - static void *topicNameDup(void *p) { return taosStrdup((char *)p); } static void freeItem(void *param) { @@ -688,12 +649,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } // check topic existence - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "subscribe"); if (pTrans == NULL) { goto _over; } - code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user); + code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user); if (code != TSDB_CODE_SUCCESS) { goto _over; } @@ -722,7 +683,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over; if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; - } else { int32_t status = atomic_load_32(&pExistedConsumer->status); @@ -802,7 +762,6 @@ _over: tDeleteSMqConsumerObj(pConsumerNew, true); - // TODO: replace with destroy subscribe msg taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); return code; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index d62b9e4174..c756341164 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -553,13 +553,17 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu } } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); + char topic[TSDB_TOPIC_FNAME_LEN] = {0}; + char cgroup[TSDB_CGROUP_LEN] = {0}; + mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "tmq-reb"); if (pTrans == NULL) { nodesDestroyNode((SNode*)pPlan); return -1; } - mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL); + mndTransSetDbName(pTrans, topic, cgroup); if (mndTransCheckConflict(pMnode, pTrans) != 0) { mndTransDrop(pTrans); nodesDestroyNode((SNode*)pPlan); @@ -587,10 +591,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu return -1; } - char topic[TSDB_TOPIC_FNAME_LEN] = {0}; - char cgroup[TSDB_CGROUP_LEN] = {0}; - mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); - // 3. commit log: consumer to update status and epoch // 3.1 set touched consumer int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers); @@ -802,6 +802,19 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { goto end; } + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "drop-cgroup"); + if (pTrans == NULL) { + mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); + code = -1; + goto end; + } + + mndTransSetDbName(pTrans, dropReq.topic, dropReq.cgroup); + code = mndTransCheckConflict(pMnode, pTrans); + if (code != 0) { + goto end; + } + void *pIter = NULL; SMqConsumerObj *pConsumer; while (1) { @@ -811,18 +824,11 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { } if (strcmp(dropReq.cgroup, pConsumer->cgroup) == 0) { - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); } sdbRelease(pMnode->pSdb, pConsumer); } - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup"); - if (pTrans == NULL) { - mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); - code = -1; - goto end; - } - mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { @@ -1019,8 +1025,8 @@ int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName) { if (pIter == NULL) break; - char topic[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; + char topic[TSDB_TOPIC_FNAME_LEN] = {0}; + char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(pSub->key, topic, cgroup, true); if (strcmp(topic, topicName) != 0) { sdbRelease(pSdb, pSub); @@ -1084,7 +1090,6 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { } int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) { - int32_t code = -1; SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; @@ -1093,8 +1098,8 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); if (pIter == NULL) break; - char topic[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; + char topic[TSDB_TOPIC_FNAME_LEN] = {0}; + char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(pSub->key, topic, cgroup, true); if (strcmp(topic, topicName) != 0) { sdbRelease(pSdb, pSub); @@ -1132,15 +1137,13 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) { sdbRelease(pSdb, pSub); sdbCancelFetch(pSdb, pIter); - goto END; + return -1; } sdbRelease(pSdb, pSub); } - code = 0; -END: - return code; + return 0; } static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){ diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index f6b2aabede..1e3a8bddb7 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -382,14 +382,29 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * int32_t code = -1; SNode *pAst = NULL; SQueryPlan *pPlan = NULL; - SMqTopicObj topicObj = {0}; + + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "create-topic"); + if (pTrans == NULL) { + mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + code = -1; + goto _OUT; + } + + mndTransSetDbName(pTrans, pCreate->name, NULL); + code = mndTransCheckConflict(pMnode, pTrans); + if (code != 0) { + goto _OUT; + } + mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name); + tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN); - if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj) != 0) { - return -1; + code = mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj); + if (code != 0) { + goto _OUT; } topicObj.createTime = taosGetTimestampMs(); @@ -406,6 +421,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * if (pCreate->withMeta) { terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + code = terrno; goto _OUT; } @@ -414,13 +430,15 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast); - if (nodesStringToNode(pCreate->ast, &pAst) != 0) { + code = nodesStringToNode(pCreate->ast, &pAst); + if (code != 0) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); goto _OUT; } SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true}; - if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) { + code = qCreateQueryPlan(&cxt, &pPlan, NULL); + if (code != 0) { mError("failed to create topic:%s since %s", pCreate->name, terrstr()); goto _OUT; } @@ -428,6 +446,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t)); if (topicObj.ntbColIds == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; goto _OUT; } @@ -438,12 +457,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.ntbColIds = NULL; } - if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) { + code = qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema); + if (code != 0) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); goto _OUT; } - if (nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL) != 0) { + code = nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL); + if (code != 0) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); goto _OUT; } @@ -451,6 +472,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName); if (pStb == NULL) { terrno = TSDB_CODE_MND_STB_NOT_EXIST; + code = terrno; goto _OUT; } @@ -470,21 +492,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * /*topicObj.withTbName = 1;*/ /*topicObj.withSchema = 1;*/ - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-topic"); - if (pTrans == NULL) { - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - goto _OUT; - } - - mndTransSetDbName(pTrans, pDb->name, NULL); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - goto _OUT; - } - mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name); - SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + code = -1; goto _OUT; } @@ -510,7 +521,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * // encoder check alter info int32_t len; - int32_t code; tEncodeSize(tEncodeSTqCheckInfo, &info, len, code); if (code < 0) { sdbRelease(pSdb, pVgroup); @@ -525,6 +535,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); sdbCancelFetch(pSdb, pIter); + code = -1; goto _OUT; } tEncoderClear(&encoder); @@ -539,6 +550,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); sdbCancelFetch(pSdb, pIter); + code = -1; goto _OUT; } buf = NULL; @@ -548,6 +560,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + code = -1; goto _OUT; } @@ -664,16 +677,19 @@ _OVER: } static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; SMDropTopicReq dropReq = {0}; + int32_t code = 0; + SMqTopicObj *pTopic = NULL; + STrans *pTrans = NULL; if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name); + pTopic = mndAcquireTopic(pMnode, dropReq.name); if (pTopic == NULL) { if (dropReq.igNotExists) { mInfo("topic:%s, not exist, ignore not exist is set", dropReq.name); @@ -685,9 +701,29 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } } - if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic) != 0) { - mndReleaseTopic(pMnode, pTopic); - return -1; + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "drop-topic"); + if (pTrans == NULL) { + mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); + code = -1; + goto end; + } + + mndTransSetDbName(pTrans, pTopic->name, NULL); + code = mndTransCheckConflict(pMnode, pTrans); + if (code != 0) { + goto end; + } + + mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); + + code = mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic); + if (code != 0) { + goto end; + } + + code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db); + if (code != 0) { + goto end; } void *pIter = NULL; @@ -698,37 +734,42 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { break; } - if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){ - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); - mndReleaseConsumer(pMnode, pConsumer); - continue; - } - + bool found = false; int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { char *name = taosArrayGetP(pConsumer->assignedTopics, i); if (strcmp(name, pTopic->name) == 0) { - mndReleaseConsumer(pMnode, pConsumer); - mndReleaseTopic(pMnode, pTopic); - sdbCancelFetch(pSdb, pIter); - terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; - mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", - dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - return -1; + found = true; + break; } } + if (found){ + if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pReq->info); + mndReleaseConsumer(pMnode, pConsumer); + continue; + } + + mndReleaseConsumer(pMnode, pConsumer); + sdbCancelFetch(pSdb, pIter); + terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", + dropReq.name, pConsumer->consumerId, pConsumer->cgroup); + code = -1; + goto end; + } sz = taosArrayGetSize(pConsumer->rebNewTopics); for (int32_t i = 0; i < sz; i++) { char *name = taosArrayGetP(pConsumer->rebNewTopics, i); if (strcmp(name, pTopic->name) == 0) { mndReleaseConsumer(pMnode, pConsumer); - mndReleaseTopic(pMnode, pTopic); sdbCancelFetch(pSdb, pIter); terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - return -1; + code = -1; + goto end; } } @@ -737,45 +778,22 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i); if (strcmp(name, pTopic->name) == 0) { mndReleaseConsumer(pMnode, pConsumer); - mndReleaseTopic(pMnode, pTopic); sdbCancelFetch(pSdb, pIter); terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)", dropReq.name, pConsumer->consumerId, pConsumer->cgroup); - return -1; + code = -1; + goto end; } } sdbRelease(pSdb, pConsumer); } - if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) { - mndReleaseTopic(pMnode, pTopic); - return -1; - } - - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "drop-topic"); - if (pTrans == NULL) { + code = mndDropSubByTopic(pMnode, pTrans, dropReq.name); + if (code < 0) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); - mndReleaseTopic(pMnode, pTopic); - return -1; - } - - mndTransSetDbName(pTrans, pTopic->db, NULL); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - mndReleaseTopic(pMnode, pTopic); - mndTransDrop(pTrans); - return -1; - } - - mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); - - // TODO check if rebalancing - if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { - mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); - mndTransDrop(pTrans); - mndReleaseTopic(pMnode, pTopic); - return -1; + goto end; } if (pTopic->ntbUid != 0) { @@ -801,25 +819,25 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { action.pCont = buf; action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN; action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + code = mndTransAppendRedoAction(pTrans, &action); + if (code != 0) { taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); - mndReleaseTopic(pMnode, pTopic); sdbCancelFetch(pSdb, pIter); - mndTransDrop(pTrans); - return -1; + goto end; } sdbRelease(pSdb, pVgroup); } } - int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic); + code = mndDropTopic(pMnode, pTrans, pReq, pTopic); + +end: mndReleaseTopic(pMnode, pTopic); mndTransDrop(pTrans); - if (code != 0) { mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); - return -1; + return code; } char detail[100] = {0}; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index fddde70834..93a152f0cc 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -791,6 +791,22 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { } } + if (pNew->conflict == TRN_CONFLICT_TOPIC) { + if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; + if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { + if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 ) conflict = true; + } + } + if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) { + if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; + if (pTrans->conflict == TRN_CONFLICT_TOPIC ) { + if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 ) conflict = true; + } + if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { + if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0) conflict = true; + } + } + if (conflict) { mError("trans:%d, db:%s stb:%s type:%d, can't execute since conflict with trans:%d db:%s stb:%s type:%d", pNew->id, pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index cef8a99e25..a502e3e314 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -783,6 +783,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); } + tqUnregisterPushHandle(pTq, pHandle); + code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); @@ -884,7 +886,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_store_32(&pHandle->epoch, 0); - tqUnregisterPushHandle(pTq, pHandle); ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); } @@ -1201,6 +1202,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { "s-task:%s failed to start scan-history in first stream time window since already started, unexpected " "sched-status:%d", id, schedStatus); + streamMetaReleaseTask(pMeta, pTask); return 0; } @@ -1214,6 +1216,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + streamMetaReleaseTask(pMeta, pTask); return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 8ca2ccad1b..caf88f55fc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -25,7 +25,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, const int32_t* dstSlotIds, void** pRes, const char* idStr) { int32_t numOfRows = pBlock->info.rows; - bool allNullRow = true; + // bool allNullRow = true; if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { for (int32_t i = 0; i < pReader->numOfCols; ++i) { @@ -36,7 +36,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p p->ts = pColVal->ts; p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); - allNullRow = p->isNull & allNullRow; + // allNullRow = p->isNull & allNullRow; if (!p->isNull) { if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { @@ -56,7 +56,8 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false); } - pBlock->info.rows += allNullRow ? 0 : 1; + // pBlock->info.rows += allNullRow ? 0 : 1; + ++pBlock->info.rows; } else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) { for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); @@ -65,7 +66,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); SColVal* pVal = &pColVal->colVal; - allNullRow = false; + // allNullRow = false; if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { if (!COL_VAL_IS_VALUE(&pColVal->colVal)) { colDataSetNULL(pColInfoData, numOfRows); @@ -80,7 +81,8 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p } } - pBlock->info.rows += allNullRow ? 0 : 1; + // pBlock->info.rows += allNullRow ? 0 : 1; + ++pBlock->info.rows; } else { tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr); return TSDB_CODE_INVALID_PARA; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 52e6fe6312..c02cff3aa9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -5608,4 +5608,4 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { pReader->idStr = taosStrdup(idstr); } -void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; } +void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index e635862200..c10d8c628d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1268,7 +1268,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* } // has duplicated ts of different version in this block - pInfo->hasDupTs = (pBlockInfo->record.numRow > pBlockInfo->record.count); + pInfo->hasDupTs = (pBlockInfo->record.numRow > pBlockInfo->record.count) || (pBlockInfo->record.count <= 0); pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pBlockInfo->record, pReader->info.order); if (hasDataInLastBlock(pLastBlockReader)) { diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index c14692b7f0..f836e71bc9 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -166,18 +166,20 @@ static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, i } else if (ekey < pInfo->pFillInfo->start) { int64_t t = ekey; SInterval* pInterval = &pInfo->pFillInfo->interval; - + int64_t prev = t; while(1) { - int64_t prev = taosTimeAdd(t, pInterval->sliding, pInterval->slidingUnit, pInterval->precision); - if (prev >= pInfo->pFillInfo->start) { - t = prev; + int64_t next = taosTimeAdd(t, pInterval->sliding, pInterval->slidingUnit, pInterval->precision); + if (next >= pInfo->pFillInfo->start) { + prev = t; + t = next; break; } - t = prev; + prev = t; + t = next; } - // todo time window chosen problem: t or prev value? - if (t > pInfo->pFillInfo->start) t -= pInterval->sliding; + // todo time window chosen problem: t or next value? + if (t > pInfo->pFillInfo->start) t = prev; taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, t); } } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 821967dfa4..96c1b2e9f0 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -654,9 +654,12 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; - int32_t outputLen = pInputData->varmeta.length * pInput->numOfRows; - char *outputBuf = taosMemoryCalloc(outputLen, 1); - char *output = outputBuf; + int32_t outputLen = pInputData->info.bytes; + char *outputBuf = taosMemoryMalloc(outputLen); + if (outputBuf == NULL) { + qError("substr function memory allocation failure. size: %d", outputLen); + return TSDB_CODE_OUT_OF_MEMORY; + } for (int32_t i = 0; i < pInput->numOfRows; ++i) { if (colDataIsNull_s(pInputData, i)) { @@ -676,14 +679,16 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu startPosBytes = TMAX(startPosBytes, 0); } + char *output = outputBuf; int32_t resLen = TMIN(subLen, len - startPosBytes); if (resLen > 0) { memcpy(varDataVal(output), varDataVal(input) + startPosBytes, resLen); + varDataSetLen(output, resLen); + } else { + varDataSetLen(output, 0); } - varDataSetLen(output, resLen); colDataSetVal(pOutputData, i, output, false); - output += varDataTLen(output); } pOutput->numOfRows = pInput->numOfRows; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2353497916..45878bb865 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -52,7 +52,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF memset(streamPath, 0, len); sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); - code = taosMulModeMkDir(streamPath, 0755); + code = taosMulModeMkDir(streamPath, 0755, false); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); goto _err; @@ -90,7 +90,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF memset(streamPath, 0, len); sprintf(streamPath, "%s/%s", pMeta->path, "state"); - code = taosMulModeMkDir(streamPath, 0755); + code = taosMulModeMkDir(streamPath, 0755, false); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); goto _err; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 5b42be182c..8694e5cf4c 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -169,7 +169,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz sscanf(cfg, "%d\n%d\n", &szPage, &pages); } } else { - int32_t code = taosMulModeMkDir(statePath, 0755); + int32_t code = taosMulModeMkDir(statePath, 0755, false); if (code == 0) { pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE); sprintf(cfg, "%d\n%d\n", szPage, pages); diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index 4f595d8d4a..81b306e65d 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -62,7 +62,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i } memset(pDb->pgrHash, 0, tsize); - ret = taosMulModeMkDir(dbname, 0755); + ret = taosMulModeMkDir(dbname, 0755, false); if (ret < 0) { return -1; } diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index 3d63da7ba3..6e52c4ed27 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -193,7 +193,7 @@ int32_t taosMulMkDir(const char *dirname) { return code; } -int32_t taosMulModeMkDir(const char *dirname, int mode) { +int32_t taosMulModeMkDir(const char *dirname, int mode, bool checkAccess) { if (dirname == NULL || strlen(dirname) >= TDDIRMAXLEN) return -1; char temp[TDDIRMAXLEN]; char *pos = temp; @@ -206,6 +206,9 @@ int32_t taosMulModeMkDir(const char *dirname, int mode) { #endif if (taosDirExist(temp)) { + if (checkAccess && taosCheckAccessFile(temp, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) { + return 0; + } return chmod(temp, mode); } @@ -248,6 +251,9 @@ int32_t taosMulModeMkDir(const char *dirname, int mode) { } if (code < 0 && errno == EEXIST) { + if (checkAccess && taosCheckAccessFile(temp, TD_FILE_ACCESS_EXIST_OK | TD_FILE_ACCESS_READ_OK | TD_FILE_ACCESS_WRITE_OK)) { + return 0; + } return chmod(temp, mode); } diff --git a/tests/script/tsim/query/interval.sim b/tests/script/tsim/query/interval.sim index e2b0d219cb..135fcc8591 100644 --- a/tests/script/tsim/query/interval.sim +++ b/tests/script/tsim/query/interval.sim @@ -206,6 +206,19 @@ if $desc_rows != $asc_rows then return -1 endi +print ================= step11 + +sql create database if not exists test0828 +sql use test0828 +sql create stable st (ts timestamp, c2 int) tags(tg int) +sql insert into ct1 using st tags(1) values('2021-08-01', 0) +sql insert into ct2 using st tags(2) values('2022-08-01', 1) +sql select _wstart, _wend, count(*) from st where ts>='2021-01-01' and ts < '2023-08-28' interval(1n) fill(value, 0) order by _wstart desc +print $rows +if $rows != 32 then + return -1 +endi +sql drop database test0828 print =============== clear #sql drop database $db #sql select * from information_schema.ins_databases