Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/stream_compression

This commit is contained in:
Hongze Cheng 2022-09-18 09:54:00 +08:00
commit 2e37e64f2e
44 changed files with 2498 additions and 2023 deletions

View File

@ -2,7 +2,7 @@
# taosadapter # taosadapter
ExternalProject_Add(taosadapter ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG 71e7ccf GIT_TAG 05fb2ff
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG e7270c9 GIT_TAG 7d7b3ce
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE

View File

@ -46,7 +46,7 @@ The following restrictions apply:
### Other Rules ### Other Rules
- The window clause must occur after the PARTITION BY clause and before the GROUP BY clause. It cannot be used with a GROUP BY clause. - The window clause must occur after the PARTITION BY clause. It cannot be used with a GROUP BY clause.
- SELECT clauses on windows can contain only the following expressions: - SELECT clauses on windows can contain only the following expressions:
- Constants - Constants
- Aggregate functions - Aggregate functions
@ -78,7 +78,7 @@ These pseudocolumns occur after the aggregation clause.
1. A huge volume of interpolation output may be returned using `FILL`, so it's recommended to specify the time range when using `FILL`. The maximum number of interpolation values that can be returned in a single query is 10,000,000. 1. A huge volume of interpolation output may be returned using `FILL`, so it's recommended to specify the time range when using `FILL`. The maximum number of interpolation values that can be returned in a single query is 10,000,000.
2. The result set is in ascending order of timestamp when you aggregate by time window. 2. The result set is in ascending order of timestamp when you aggregate by time window.
3. If aggregate by window is used on STable, the aggregate function is performed on all the rows matching the filter conditions. If `PARTITION BY` is not used in the query, the result set will be returned in strict ascending order of timestamp; otherwise the result set is not exactly in the order of ascending timestamp in each group. 3. If aggregate by window is used on STable, the aggregate function is performed on all the rows matching the filter conditions. If `PARTITION BY` is not used in the query, the result set will be returned in strict ascending order of timestamp; otherwise the result set will be returned in the order of ascending timestamp in each group.
::: :::
@ -120,6 +120,12 @@ In case of using integer, bool, or string to represent the status of a device at
SELECT COUNT(*), FIRST(ts), status FROM temp_tb_1 STATE_WINDOW(status); SELECT COUNT(*), FIRST(ts), status FROM temp_tb_1 STATE_WINDOW(status);
``` ```
Only care about the information of the status window when the status is 2. For example:
```
SELECT * FROM (SELECT COUNT(*) AS cnt, FIRST(ts) AS fst, status FROM temp_tb_1 STATE_WINDOW(status)) t WHERE status = 2;
```
### Session Window ### Session Window
The primary key, i.e. timestamp, is used to determine which session window a row belongs to. As shown in the figure below, if the limit of time interval for the session window is specified as 12 seconds, then the 6 rows in the figure constitutes 2 time windows, [2019-04-28 14:22:102019-04-28 14:22:30] and [2019-04-28 14:23:102019-04-28 14:23:30] because the time difference between 2019-04-28 14:22:30 and 2019-04-28 14:23:10 is 40 seconds, which exceeds the time interval limit of 12 seconds. The primary key, i.e. timestamp, is used to determine which session window a row belongs to. As shown in the figure below, if the limit of time interval for the session window is specified as 12 seconds, then the 6 rows in the figure constitutes 2 time windows, [2019-04-28 14:22:102019-04-28 14:22:30] and [2019-04-28 14:23:102019-04-28 14:23:30] because the time difference between 2019-04-28 14:22:30 and 2019-04-28 14:23:10 is 40 seconds, which exceeds the time interval limit of 12 seconds.

View File

@ -4,7 +4,7 @@ sidebar_label: REST API
description: 详细介绍 TDengine 提供的 RESTful API. description: 详细介绍 TDengine 提供的 RESTful API.
--- ---
为支持各种不同类型平台的开发TDengine 提供符合 REST 设计标准的 API即 REST API。为最大程度降低学习成本不同于其他数据库 REST API 的设计方法TDengine 直接通过 HTTP POST 请求 BODY 中包含的 SQL 语句来操作数据库,仅需要一个 URL。REST 连接器的使用参见 [视频教程](https://www.taosdata.com/blog/2020/11/11/1965.html)。 为支持各种不同类型平台的开发TDengine 提供符合 RESTful 设计标准的 API即 REST API。为最大程度降低学习成本不同于其他数据库 REST API 的设计方法TDengine 直接通过 HTTP POST 请求 BODY 中包含的 SQL 语句来操作数据库,仅需要一个 URL。REST API 的使用参见 [视频教程](https://www.taosdata.com/blog/2020/11/11/1965.html)。
:::note :::note
与原生连接器的一个区别是RESTful 接口是无状态的,因此 `USE db_name` 指令没有效果,所有对表名、超级表名的引用都需要指定数据库名前缀。支持在 RESTful URL 中指定 db_name这时如果 SQL 语句中没有指定数据库名前缀的话,会使用 URL 中指定的这个 db_name。 与原生连接器的一个区别是RESTful 接口是无状态的,因此 `USE db_name` 指令没有效果,所有对表名、超级表名的引用都需要指定数据库名前缀。支持在 RESTful URL 中指定 db_name这时如果 SQL 语句中没有指定数据库名前缀的话,会使用 URL 中指定的这个 db_name。
@ -18,7 +18,7 @@ RESTful 接口不依赖于任何 TDengine 的库,因此客户端不需要安
在已经安装 TDengine 服务器端的情况下,可以按照如下方式进行验证。 在已经安装 TDengine 服务器端的情况下,可以按照如下方式进行验证。
下面以 Ubuntu 环境中使用 curl 工具(确认已经安装)来验证 RESTful 接口的正常,验证前请确认 taosAdapter 服务已开启,在 Linux 系统上此服务默认由 systemd 管理,使用命令 `systemctl start taosadapter` 启动。 下面以 Ubuntu 环境中使用 `curl` 工具(请确认已经安装)来验证 RESTful 接口是否工作正常,验证前请确认 taosAdapter 服务已开启,在 Linux 系统上此服务默认由 systemd 管理,使用命令 `systemctl start taosadapter` 启动。
下面示例是列出所有的数据库,请把 h1.taosdata.com 和 6041缺省值替换为实际运行的 TDengine 服务 FQDN 和端口号: 下面示例是列出所有的数据库,请把 h1.taosdata.com 和 6041缺省值替换为实际运行的 TDengine 服务 FQDN 和端口号:

View File

@ -0,0 +1,38 @@
---
title: Schemaless API
sidebar_label: Schemaless API
description: 详细介绍 TDengine 提供的 Schemaless API.
---
TDengine 提供了兼容 InfluxDB (v1) 和 OpenTSDB 行协议的 Schemaless API。支持 InfluxDBv1) 或 OpenTSDB 行协议写入数据的第三方软件无需修改代码,只要修改配置的 EndPoint URL 就可以直接把数据写入 TDengine 数据库。
### 兼容 InfluxDB 行协议写入的方法
您可以配置任何支持使用 InfluxDBv1 行协议的应用访问地址 `http://<fqdn>:6041/<APIEndPoint>` 来写入 InfluxDB 兼容格式的数据到 TDengine。EndPoint 如下:
```text
/influxdb/v1/write?<param1=value1>?<param2=value2>...
```
支持 InfluxDB 查询参数如下:
- `db` 指定 TDengine 使用的数据库名
- `precision` TDengine 使用的时间精度
- `u` TDengine 用户名
- `p` TDengine 密码
注意: 目前不支持 InfluxDB 的 token 验证方式,仅支持 Basic 验证和查询参数验证。
参考链接:[InfluxDB v1 写接口](https://docs.influxdata.com/influxdb/v2.0/reference/api/influxdb-1x/write/)
### 兼容 OpenTSDB 行协议写入的方法
您可以配置任何支持 OpenTSDB 行协议的应用访问地址 `http://<fqdn>:6041/<APIEndPoint>` 来写入 OpenTSDB 兼容格式的数据到 TDengine。EndPoint 如下:
```text
/opentsdb/v1/put/json/<db>
/opentsdb/v1/put/telnet/<db>
```
参考链接:
- [OpenTSDB JSON](http://opentsdb.net/docs/build/html/api_http/put.html)
- [OpenTSDB Telnet](http://opentsdb.net/docs/build/html/api_telnet/put.html)

View File

@ -46,7 +46,7 @@ SELECT select_list FROM tb_name
### 窗口子句的规则 ### 窗口子句的规则
- 窗口子句位于数据切分子句之后,GROUP BY 子句之前,且不可以和 GROUP BY 子句一起使用。 - 窗口子句位于数据切分子句之后,不可以和 GROUP BY 子句一起使用。
- 窗口子句将数据按窗口进行切分,对每个窗口进行 SELECT 列表中的表达式的计算SELECT 列表中的表达式只能包含: - 窗口子句将数据按窗口进行切分,对每个窗口进行 SELECT 列表中的表达式的计算SELECT 列表中的表达式只能包含:
- 常量。 - 常量。
- _wstart伪列、_wend伪列和_wduration伪列。 - _wstart伪列、_wend伪列和_wduration伪列。
@ -71,7 +71,7 @@ FILL 语句指定某一窗口区间数据缺失的情况下的填充模式。填
1. 使用 FILL 语句的时候可能生成大量的填充输出,务必指定查询的时间区间。针对每次查询,系统可返回不超过 1 千万条具有插值的结果。 1. 使用 FILL 语句的时候可能生成大量的填充输出,务必指定查询的时间区间。针对每次查询,系统可返回不超过 1 千万条具有插值的结果。
2. 在时间维度聚合中,返回的结果中时间序列严格单调递增。 2. 在时间维度聚合中,返回的结果中时间序列严格单调递增。
3. 如果查询对象是超级表,则聚合函数会作用于该超级表下满足值过滤条件的所有表的数据。如果查询中没有使用 PARTITION BY 语句,则返回的结果按照时间序列严格单调递增;如果查询中使用了 PARTITION BY 语句分组,则返回结果中每个 PARTITION 内按照时间序列严格单调递增。 3. 如果查询对象是超级表,则聚合函数会作用于该超级表下满足值过滤条件的所有表的数据。如果查询中没有使用 PARTITION BY 语句,则返回的结果按照时间序列严格单调递增;如果查询中使用了 PARTITION BY 语句分组,则返回结果中每个 PARTITION 内按照时间序列严格单调递增。
::: :::
@ -113,6 +113,12 @@ SELECT COUNT(*) FROM temp_tb_1 INTERVAL(1m) SLIDING(2m);
SELECT COUNT(*), FIRST(ts), status FROM temp_tb_1 STATE_WINDOW(status); SELECT COUNT(*), FIRST(ts), status FROM temp_tb_1 STATE_WINDOW(status);
``` ```
仅关心 status 为 2 时的状态窗口的信息。例如:
```
SELECT * FROM (SELECT COUNT(*) AS cnt, FIRST(ts) AS fst, status FROM temp_tb_1 STATE_WINDOW(status)) t WHERE status = 2;
```
### 会话窗口 ### 会话窗口
会话窗口根据记录的时间戳主键的值来确定是否属于同一个会话。如下图所示,如果设置时间戳的连续的间隔小于等于 12 秒,则以下 6 条记录构成 2 个会话窗口,分别是:[2019-04-28 14:22:102019-04-28 14:22:30]和[2019-04-28 14:23:102019-04-28 14:23:30]。因为 2019-04-28 14:22:30 与 2019-04-28 14:23:10 之间的时间间隔是 40 秒超过了连续时间间隔12 秒)。 会话窗口根据记录的时间戳主键的值来确定是否属于同一个会话。如下图所示,如果设置时间戳的连续的间隔小于等于 12 秒,则以下 6 条记录构成 2 个会话窗口,分别是:[2019-04-28 14:22:102019-04-28 14:22:30]和[2019-04-28 14:23:102019-04-28 14:23:30]。因为 2019-04-28 14:22:30 与 2019-04-28 14:23:10 之间的时间间隔是 40 秒超过了连续时间间隔12 秒)。

View File

@ -196,7 +196,7 @@ AllowWebSockets
- `u` TDengine 用户名 - `u` TDengine 用户名
- `p` TDengine 密码 - `p` TDengine 密码
注意: 目前不支持 InfluxDB 的 token 验证方式支持 Basic 验证和查询参数验证。 注意: 目前不支持 InfluxDB 的 token 验证方式,仅支持 Basic 验证和查询参数验证。
### OpenTSDB ### OpenTSDB

View File

@ -2956,7 +2956,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
} }
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
// taosMemoryFree(pSubTopicEp->schema.pSchema); if (pSubTopicEp->schema.nCols) taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
taosArrayDestroy(pSubTopicEp->vgs); taosArrayDestroy(pSubTopicEp->vgs);
} }

View File

@ -44,7 +44,6 @@ typedef struct {
TBC* pCur; TBC* pCur;
} SStreamStateCur; } SStreamStateCur;
#if 1
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen); int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen);
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen); int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key); int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key);
@ -69,8 +68,6 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -438,6 +438,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
} }
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD)); pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD)); pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
ASSERT(numOfCols == pResInfo->numOfCols);
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
pResInfo->fields[i].bytes = pSchema[i].bytes; pResInfo->fields[i].bytes = pSchema[i].bytes;

View File

@ -34,6 +34,7 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
removeMeta(pRequest->pTscObj, pRequest->targetTableList); removeMeta(pRequest->pTscObj, pRequest->targetTableList);
} }
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
@ -46,6 +47,7 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param; SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); tsem_post(&pRequest->body.rspSem);
@ -62,6 +64,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (delta > timestampDeltaLimit) { if (delta > timestampDeltaLimit) {
code = TSDB_CODE_TIME_UNSYNCED; code = TSDB_CODE_TIME_UNSYNCED;
tscError("time diff:%ds is too big", delta); tscError("time diff:%ds is too big", delta);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
setErrno(pRequest, code); setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem); tsem_post(&pRequest->body.rspSem);
@ -70,6 +73,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
/*assert(connectRsp.epSet.numOfEps > 0);*/ /*assert(connectRsp.epSet.numOfEps > 0);*/
if (connectRsp.epSet.numOfEps == 0) { if (connectRsp.epSet.numOfEps == 0) {
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
setErrno(pRequest, TSDB_CODE_MND_APP_ERROR); setErrno(pRequest, TSDB_CODE_MND_APP_ERROR);
tsem_post(&pRequest->body.rspSem); tsem_post(&pRequest->body.rspSem);
@ -114,6 +118,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
pTscObj->pAppInfo->numOfConns); pTscObj->pAppInfo->numOfConns);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tsem_post(&pRequest->body.rspSem); tsem_post(&pRequest->body.rspSem);
return 0; return 0;
} }
@ -137,6 +142,7 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
// todo rsp with the vnode id list // todo rsp with the vnode id list
SRequestObj* pRequest = param; SRequestObj* pRequest = param;
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code); setErrno(pRequest, code);
} }
@ -173,6 +179,7 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
setErrno(pRequest, code); setErrno(pRequest, code);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
@ -220,6 +227,7 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
setConnectionDB(pRequest->pTscObj, db); setConnectionDB(pRequest->pTscObj, db);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code); pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code);
@ -246,6 +254,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
pRequest->body.resInfo.execRes.res = createRsp.pMeta; pRequest->body.resInfo.execRes.res = createRsp.pMeta;
} }
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
@ -284,6 +293,7 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
} }
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
@ -309,6 +319,7 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
} }
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
SExecResult* pRes = &pRequest->body.resInfo.execRes; SExecResult* pRes = &pRequest->body.resInfo.execRes;
@ -420,6 +431,7 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
} }
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (pRequest->body.queryFp != NULL) { if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);

View File

@ -841,7 +841,7 @@ void tmqFreeImpl(void* handle) {
int32_t sz = taosArrayGetSize(tmq->clientTopics); int32_t sz = taosArrayGetSize(tmq->clientTopics);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (pTopic->schema.nCols) taosMemoryFree(pTopic->schema.pSchema); if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
int32_t vgSz = taosArrayGetSize(pTopic->vgs); int32_t vgSz = taosArrayGetSize(pTopic->vgs);
taosArrayDestroy(pTopic->vgs); taosArrayDestroy(pTopic->vgs);
} }
@ -1077,6 +1077,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tsem_destroy(&pParam->rspSem); tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam); taosMemoryFree(pParam);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return -1; return -1;
} }
@ -1115,6 +1116,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tmqEpoch); tmqEpoch);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
return 0; return 0;
} }
@ -1128,6 +1130,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch); tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
goto CREATE_MSG_FAIL; goto CREATE_MSG_FAIL;
} }
@ -1164,6 +1167,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosWriteQitem(tmq->mqueue, pRspWrapper); taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
@ -1218,6 +1222,8 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
SMqClientTopic topic = {0}; SMqClientTopic topic = {0};
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
topic.schema = pTopicEp->schema; topic.schema = pTopicEp->schema;
pTopicEp->schema.nCols = 0;
pTopicEp->schema.pSchema = NULL;
tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN); tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
@ -1251,7 +1257,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
int32_t sz = taosArrayGetSize(tmq->clientTopics); int32_t sz = taosArrayGetSize(tmq->clientTopics);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (pTopic->schema.nCols) taosMemoryFree(pTopic->schema.pSchema); if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
int32_t vgSz = taosArrayGetSize(pTopic->vgs); int32_t vgSz = taosArrayGetSize(pTopic->vgs);
taosArrayDestroy(pTopic->vgs); taosArrayDestroy(pTopic->vgs);
} }

View File

@ -270,7 +270,7 @@ int32_t dmInitClient(SDnode *pDnode) {
SRpcInit rpcInit = {0}; SRpcInit rpcInit = {0};
rpcInit.label = "DND-C"; rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 4;
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;

View File

@ -487,6 +487,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256); tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
taosArrayDestroy(pConsumerNew->rebNewTopics);
pConsumerNew->rebNewTopics = newSub; pConsumerNew->rebNewTopics = newSub;
subscribe.topicNames = NULL; subscribe.topicNames = NULL;

View File

@ -145,7 +145,10 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
} }
void tDeleteSMqVgEp(SMqVgEp *pVgEp) { void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
if (pVgEp->qmsg) taosMemoryFree(pVgEp->qmsg); if (pVgEp) {
taosMemoryFreeClear(pVgEp->qmsg);
taosMemoryFree(pVgEp);
}
} }
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
@ -200,18 +203,10 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L
} }
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) { void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
if (pConsumer->currentTopics) {
taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree);
}
if (pConsumer->rebNewTopics) {
taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree);
}
if (pConsumer->rebRemovedTopics) {
taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree);
}
if (pConsumer->assignedTopics) {
taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree);
}
} }
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
@ -428,6 +423,13 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
} }
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
}
taosHashCleanup(pSub->consumerHash); taosHashCleanup(pSub->consumerHash);
taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp); taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
} }

View File

@ -1187,6 +1187,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
if (pCol->colId > 0 && pCol->colId == colId) { if (pCol->colId > 0 && pCol->colId == colId) {
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC; terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId); mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
return -1; return -1;
@ -1197,6 +1198,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
NEXT: NEXT:
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);
} }
return 0; return 0;
} }
@ -1228,6 +1230,7 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
if (pCol->colId > 0 && pCol->colId == colId) { if (pCol->colId > 0 && pCol->colId == colId) {
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);
terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED; terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
mError("stream:%s, check colId:%d conflicted", pStream->name, pCol->colId); mError("stream:%s, check colId:%d conflicted", pStream->name, pCol->colId);
return -1; return -1;
@ -1238,6 +1241,7 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
NEXT: NEXT:
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);
} }
return 0; return 0;
} }
@ -1275,6 +1279,7 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName,
if ((pCol->colId) > 0 && (pCol->colId == colId)) { if ((pCol->colId) > 0 && (pCol->colId == colId)) {
sdbRelease(pSdb, pSma); sdbRelease(pSdb, pSma);
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA; terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA;
mError("tsma:%s, check colId:%d conflicted", pSma->name, pCol->colId); mError("tsma:%s, check colId:%d conflicted", pSma->name, pCol->colId);
return -1; return -1;
@ -1285,6 +1290,7 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName,
NEXT: NEXT:
sdbRelease(pSdb, pSma); sdbRelease(pSdb, pSma);
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);
} }
return 0; return 0;
} }
@ -1774,7 +1780,7 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, i
return 0; return 0;
} }
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char* dbFName, char* stbFName, void **pCont, int32_t *pLen) { int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char *dbFName, char *stbFName, void **pCont, int32_t *pLen) {
int32_t ret = -1; int32_t ret = -1;
SDbObj *pDb = mndAcquireDb(pMnode, dbFName); SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
if (NULL == pDb) { if (NULL == pDb) {
@ -1834,7 +1840,6 @@ _OVER:
return ret; return ret;
} }
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
void *alterOriData, int32_t alterOriDataLen) { void *alterOriData, int32_t alterOriDataLen) {
int32_t code = -1; int32_t code = -1;
@ -2091,6 +2096,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
if (pCol->tableId == suid) { if (pCol->tableId == suid) {
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);
return -1; return -1;
} else { } else {
goto NEXT; goto NEXT;
@ -2099,6 +2105,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
NEXT: NEXT:
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);
} }
return 0; return 0;
} }
@ -2136,6 +2143,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
if (pCol->tableId == suid) { if (pCol->tableId == suid) {
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);
return -1; return -1;
} else { } else {
goto NEXT; goto NEXT;
@ -2144,6 +2152,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
NEXT: NEXT:
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
nodesDestroyList(pNodeList);
} }
return 0; return 0;
} }

View File

@ -490,8 +490,12 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
mndReleaseConsumer(pMnode, pConsumerOld); mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
ASSERT(0); ASSERT(0);
tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew);
goto REB_FAIL; goto REB_FAIL;
} }
tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew);
} }
// 3.3 set removed consumer // 3.3 set removed consumer
@ -509,8 +513,12 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
mndReleaseConsumer(pMnode, pConsumerOld); mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
ASSERT(0); ASSERT(0);
tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew);
goto REB_FAIL; goto REB_FAIL;
} }
tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew);
} }
#if 0 #if 0
if (consumerNum) { if (consumerNum) {

View File

@ -224,6 +224,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) { if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
goto TOPIC_DECODE_OVER; goto TOPIC_DECODE_OVER;
} }
taosMemoryFree(buf);
} else { } else {
pTopic->schema.nCols = 0; pTopic->schema.nCols = 0;
pTopic->schema.version = 0; pTopic->schema.version = 0;
@ -266,6 +267,11 @@ static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) { static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
mTrace("topic:%s, perform delete action", pTopic->name); mTrace("topic:%s, perform delete action", pTopic->name);
taosMemoryFreeClear(pTopic->sql);
taosMemoryFreeClear(pTopic->ast);
taosMemoryFreeClear(pTopic->physicalPlan);
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
taosArrayDestroy(pTopic->ntbColIds);
return 0; return 0;
} }
@ -347,6 +353,7 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
} }
} }
} }
nodesDestroyList(pNodeList);
return 0; return 0;
} }
@ -416,6 +423,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
taosMemoryFree(topicObj.sql); taosMemoryFree(topicObj.sql);
return -1; return -1;
} }
nodesDestroyNode(pAst);
nodesDestroyNode((SNode *)pPlan);
} else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) { } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName); SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
if (pStb == NULL) { if (pStb == NULL) {
@ -512,6 +521,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
} }
taosMemoryFreeClear(topicObj.physicalPlan); taosMemoryFreeClear(topicObj.physicalPlan);
taosMemoryFreeClear(topicObj.sql);
taosMemoryFreeClear(topicObj.ast);
taosArrayDestroy(topicObj.ntbColIds);
if (topicObj.schema.nCols) taosMemoryFreeClear(topicObj.schema.pSchema);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }

View File

@ -147,6 +147,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
taosHashCancelIterate(pStore->pHash, pIter); taosHashCancelIterate(pStore->pHash, pIter);
return -1; return -1;
} }
taosMemoryFree(buf);
} }
// close and rename file // close and rename file
taosCloseFile(&pFile); taosCloseFile(&pFile);

View File

@ -13,15 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "trpc.h"
#include "query.h"
#include "tname.h"
#include "catalogInt.h" #include "catalogInt.h"
#include "query.h"
#include "systable.h" #include "systable.h"
#include "tname.h"
#include "tref.h" #include "tref.h"
#include "trpc.h"
int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetTbMetaTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param; SName* name = (SName*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_META; task.type = CTG_TASK_GET_TB_META;
@ -45,13 +45,14 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetTbMetasTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param; SName* name = (SName*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_META_BATCH; task.type = CTG_TASK_GET_TB_META_BATCH;
@ -69,14 +70,14 @@ int32_t ctgInitGetTbMetasTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx,
pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum); ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetDbVgTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
char *dbFName = (char*)param; char* dbFName = (char*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_DB_VGROUP; task.type = CTG_TASK_GET_DB_VGROUP;
@ -94,13 +95,14 @@ int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), dbFName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetDbCfgTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
char *dbFName = (char*)param; char* dbFName = (char*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_DB_CFG; task.type = CTG_TASK_GET_DB_CFG;
@ -118,13 +120,14 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), dbFName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetDbInfoTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
char *dbFName = (char*)param; char* dbFName = (char*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_DB_INFO; task.type = CTG_TASK_GET_DB_INFO;
@ -142,14 +145,14 @@ int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), dbFName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetTbHashTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) { SName* name = (SName*)param;
SName *name = (SName*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_HASH; task.type = CTG_TASK_GET_TB_HASH;
@ -173,13 +176,14 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tableName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetTbHashsTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param; SName* name = (SName*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_HASH_BATCH; task.type = CTG_TASK_GET_TB_HASH_BATCH;
@ -197,14 +201,13 @@ int32_t ctgInitGetTbHashsTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx,
pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum); ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetQnodeTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_QNODE; task.type = CTG_TASK_GET_QNODE;
@ -219,7 +222,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetDnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetDnodeTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_DNODE; task.type = CTG_TASK_GET_DNODE;
@ -234,8 +237,8 @@ int32_t ctgInitGetDnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
char *name = (char*)param; char* name = (char*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_INDEX; task.type = CTG_TASK_GET_INDEX;
@ -253,13 +256,14 @@ int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, indexFName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetUdfTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
char *name = (char*)param; char* name = (char*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_UDF; task.type = CTG_TASK_GET_UDF;
@ -277,13 +281,14 @@ int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, udfName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetUserTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SUserAuthInfo *user = (SUserAuthInfo*)param; SUserAuthInfo* user = (SUserAuthInfo*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_USER; task.type = CTG_TASK_GET_USER;
@ -301,12 +306,13 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, user:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), user->user);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetSvrVerTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetSvrVerTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_SVR_VER; task.type = CTG_TASK_GET_SVR_VER;
@ -320,8 +326,8 @@ int32_t ctgInitGetSvrVerTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetTbIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param; SName* name = (SName*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_INDEX; task.type = CTG_TASK_GET_TB_INDEX;
@ -344,13 +350,14 @@ int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitGetTbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { int32_t ctgInitGetTbCfgTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SName *name = (SName*)param; SName* name = (SName*)param;
SCtgTask task = {0}; SCtgTask task = {0};
task.type = CTG_TASK_GET_TB_CFG; task.type = CTG_TASK_GET_TB_CFG;
@ -373,13 +380,13 @@ int32_t ctgInitGetTbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, const SCatalogReq* pReq) {
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, const SCatalogReq* pReq) {
SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
SHashObj* pTb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); SHashObj* pTb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == pDb || NULL == pTb) { if (NULL == pDb || NULL == pTb) {
@ -455,7 +462,6 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con
taosHashCleanup(pTb); taosHashCleanup(pTb);
for (int32_t i = 0; i < pJob->tbIndexNum; ++i) { for (int32_t i = 0; i < pJob->tbIndexNum; ++i) {
SName* name = taosArrayGet(pReq->pTableIndex, i); SName* name = taosArrayGet(pReq->pTableIndex, i);
ctgDropTbIndexEnqueue(pCtg, name, true); ctgDropTbIndexEnqueue(pCtg, name, true);
@ -464,7 +470,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *taskId) { int32_t ctgInitTask(SCtgJob* pJob, CTG_TASK_TYPE type, void* param, int32_t* taskId) {
int32_t tid = atomic_fetch_add_32(&pJob->taskIdx, 1); int32_t tid = atomic_fetch_add_32(&pJob->taskIdx, 1);
CTG_LOCK(CTG_WRITE, &pJob->taskLock); CTG_LOCK(CTG_WRITE, &pJob->taskLock);
@ -478,7 +484,8 @@ int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *tas
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param) { int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp,
void* param) {
int32_t code = 0; int32_t code = 0;
int32_t tbMetaNum = (int32_t)ctgGetTablesReqNum(pReq->pTableMeta); int32_t tbMetaNum = (int32_t)ctgGetTablesReqNum(pReq->pTableMeta);
int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup); int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
@ -494,7 +501,8 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex); int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg); int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum; int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum +
userNum + dbInfoNum + tbIndexNum + tbCfgNum;
*job = taosMemoryCalloc(1, sizeof(SCtgJob)); *job = taosMemoryCalloc(1, sizeof(SCtgJob));
if (NULL == *job) { if (NULL == *job) {
@ -502,7 +510,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
SCtgJob *pJob = *job; SCtgJob* pJob = *job;
pJob->subTaskNum = taskNum; pJob->subTaskNum = taskNum;
pJob->queryId = pConn->requestId; pJob->queryId = pConn->requestId;
@ -526,7 +534,8 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
pJob->svrVerNum = svrVerNum; pJob->svrVerNum = svrVerNum;
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
pJob->pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); pJob->pBatchs =
taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pJob->pBatchs) { if (NULL == pJob->pBatchs) {
ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
@ -625,10 +634,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId); taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, taskNum, pReq->forceUpdate); qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId,
taskNum, pReq->forceUpdate);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
ctgFreeJob(*job); ctgFreeJob(*job);
@ -658,7 +667,6 @@ int32_t ctgDumpTbMetasRes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgDumpDbVgRes(SCtgTask* pTask) { int32_t ctgDumpDbVgRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pDbVgroup) { if (NULL == pJob->jobRes.pDbVgroup) {
@ -772,7 +780,6 @@ int32_t ctgDumpDnodeRes(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgDumpDbCfgRes(SCtgTask* pTask) { int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pDbCfg) { if (NULL == pJob->jobRes.pDbCfg) {
@ -848,14 +855,14 @@ int32_t ctgDumpSvrVer(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgCallSubCb(SCtgTask *pTask) { int32_t ctgCallSubCb(SCtgTask* pTask) {
int32_t code = 0; int32_t code = 0;
CTG_LOCK(CTG_WRITE, &pTask->lock); CTG_LOCK(CTG_WRITE, &pTask->lock);
int32_t parentNum = taosArrayGetSize(pTask->pParents); int32_t parentNum = taosArrayGetSize(pTask->pParents);
for (int32_t i = 0; i < parentNum; ++i) { for (int32_t i = 0; i < parentNum; ++i) {
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
SCtgTask* pParent = taosArrayGetP(pTask->pParents, i); SCtgTask* pParent = taosArrayGetP(pTask->pParents, i);
pParent->subRes.code = pTask->code; pParent->subRes.code = pTask->code;
@ -866,7 +873,7 @@ int32_t ctgCallSubCb(SCtgTask *pTask) {
} }
} }
SCtgMsgCtx *pParMsgCtx = CTG_GET_TASK_MSGCTX(pParent, -1); SCtgMsgCtx* pParMsgCtx = CTG_GET_TASK_MSGCTX(pParent, -1);
pParMsgCtx->pBatchs = pMsgCtx->pBatchs; pParMsgCtx->pBatchs = pMsgCtx->pBatchs;
CTG_ERR_JRET(pParent->subRes.fp(pParent)); CTG_ERR_JRET(pParent->subRes.fp(pParent));
@ -910,7 +917,8 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1); int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1);
if (taskDone < taosArrayGetSize(pJob->pTasks)) { if (taskDone < taosArrayGetSize(pJob->pTasks)) {
qDebug("QID:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone, (int32_t)taosArrayGetSize(pJob->pTasks)); qDebug("QID:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone,
(int32_t)taosArrayGetSize(pJob->pTasks));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -920,17 +928,17 @@ _return:
pJob->jobResCode = code; pJob->jobResCode = code;
//taosSsleep(2); // taosSsleep(2);
//qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId); // qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId);
taosAsyncExec(ctgCallUserCb, pJob, NULL); taosAsyncExec(ctgCallUserCb, pJob, NULL);
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgDBCache *dbCache = NULL; SCtgDBCache* dbCache = NULL;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
@ -1057,7 +1065,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
} }
/* /*
else if (CTG_IS_META_CTABLE(pOut->metaType)) { else if (CTG_IS_META_CTABLE(pOut->metaType)) {
SName stbName = *pName; SName stbName = *pName;
strcpy(stbName.tname, pOut->tbName); strcpy(stbName.tname, pOut->tbName);
@ -1075,7 +1083,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
} }
*/ */
TSWAP(pTask->res, pOut->tbMeta); TSWAP(pTask->res, pOut->tbMeta);
@ -1092,10 +1100,9 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgDBCache *dbCache = NULL; SCtgDBCache* dbCache = NULL;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
@ -1225,7 +1232,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
} }
/* /*
else if (CTG_IS_META_CTABLE(pOut->metaType)) { else if (CTG_IS_META_CTABLE(pOut->metaType)) {
SName stbName = *pName; SName stbName = *pName;
strcpy(stbName.tname, pOut->tbName); strcpy(stbName.tname, pOut->tbName);
@ -1243,7 +1250,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
} }
*/ */
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx); SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
pRes->code = 0; pRes->code = 0;
@ -1277,8 +1284,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx; SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx;
@ -1304,7 +1310,6 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *
break; break;
} }
_return: _return:
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
@ -1312,7 +1317,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCtgTbHashCtx* ctx = (SCtgTbHashCtx*)pTask->taskCtx; SCtgTbHashCtx* ctx = (SCtgTbHashCtx*)pTask->taskCtx;
@ -1342,7 +1347,6 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
break; break;
} }
_return: _return:
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
@ -1350,7 +1354,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCtgTbHashsCtx* ctx = (SCtgTbHashsCtx*)pTask->taskCtx; SCtgTbHashsCtx* ctx = (SCtgTbHashsCtx*)pTask->taskCtx;
@ -1390,7 +1394,7 @@ _return:
STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx); STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx);
int32_t num = taosArrayGetSize(pReq->pTables); int32_t num = taosArrayGetSize(pReq->pTables);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SMetaRes *pRes = taosArrayGet(ctx->pResList, pFetch->resIdx + i); SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx + i);
pRes->code = code; pRes->code = code;
pRes->pRes = NULL; pRes->pRes = NULL;
} }
@ -1408,8 +1412,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t ctgHandleGetTbIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
@ -1432,7 +1435,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
@ -1446,7 +1449,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
@ -1460,12 +1463,11 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetDbInfoRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetDbInfoRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
CTG_RET(TSDB_CODE_APP_ERROR); CTG_RET(TSDB_CODE_APP_ERROR);
} }
int32_t ctgHandleGetQnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t ctgHandleGetQnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
@ -1479,7 +1481,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetDnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetDnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
@ -1493,7 +1495,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
@ -1507,7 +1509,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetUdfRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetUdfRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
@ -1521,7 +1523,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetUserRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetUserRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx; SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx;
@ -1541,9 +1543,11 @@ int32_t ctgHandleGetUserRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *
goto _return; goto _return;
} }
if (ctx->user.type == AUTH_TYPE_READ && pOut->readDbs && taosHashGet(pOut->readDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) { if (ctx->user.type == AUTH_TYPE_READ && pOut->readDbs &&
taosHashGet(pOut->readDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) {
pass = true; pass = true;
} else if (ctx->user.type == AUTH_TYPE_WRITE && pOut->writeDbs && taosHashGet(pOut->writeDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) { } else if (ctx->user.type == AUTH_TYPE_WRITE && pOut->writeDbs &&
taosHashGet(pOut->writeDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) {
pass = true; pass = true;
} }
@ -1566,7 +1570,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetSvrVerRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetSvrVerRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
@ -1581,7 +1585,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq *tReq, int32_t flag, SName* pName, int32_t* vgId) { int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq* tReq, int32_t flag, SName* pName, int32_t* vgId) {
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
@ -1590,7 +1594,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq *tReq, int32_t flag, SName* pName, int
if (CTG_FLAG_IS_SYS_DB(flag)) { if (CTG_FLAG_IS_SYS_DB(flag)) {
ctgDebug("will refresh sys db tbmeta, tbName:%s", tNameGetTableName(pName)); ctgDebug("will refresh sys db tbmeta, tbName:%s", tNameGetTableName(pName));
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char *)pName->dbname, (char *)pName->tname, NULL, tReq)); CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char*)pName->dbname, (char*)pName->tname, NULL, tReq));
} }
if (CTG_FLAG_IS_STB(flag)) { if (CTG_FLAG_IS_STB(flag)) {
@ -1600,7 +1604,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq *tReq, int32_t flag, SName* pName, int
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq)); CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq));
} }
SCtgDBCache *dbCache = NULL; SCtgDBCache* dbCache = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pName, dbFName); tNameGetFullDbName(pName, dbFName);
@ -1631,11 +1635,11 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbMetaTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -1655,7 +1659,7 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx;
@ -1685,7 +1689,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) {
for (int32_t i = 0; i < pCtx->fetchNum; ++i) { for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
SName* pName = ctgGetFetchName(pCtx->pNames, pFetch); SName* pName = ctgGetFetchName(pCtx->pNames, pFetch);
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -1699,14 +1703,14 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) { int32_t ctgLaunchGetDbVgTask(SCtgTask* pTask) {
int32_t code = 0; int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDBCache *dbCache = NULL; SCtgDBCache* dbCache = NULL;
SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx; SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -1740,14 +1744,14 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbHashTask(SCtgTask* pTask) {
int32_t code = 0; int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDBCache *dbCache = NULL; SCtgDBCache* dbCache = NULL;
SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx; SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -1785,11 +1789,11 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbHashsTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx; SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx;
SCtgDBCache *dbCache = NULL; SCtgDBCache* dbCache = NULL;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
int32_t dbNum = taosArrayGetSize(pCtx->pNames); int32_t dbNum = taosArrayGetSize(pCtx->pNames);
int32_t fetchIdx = 0; int32_t fetchIdx = 0;
@ -1805,7 +1809,8 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, &tReq, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false)); CTG_ERR_JRET(
ctgGetVgInfosFromHashValue(pCtg, &tReq, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false));
ctgReleaseVgInfoToCache(pCtg, dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL; dbCache = NULL;
@ -1833,7 +1838,7 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
for (int32_t i = 0; i < pCtx->fetchNum; ++i) { for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -1858,15 +1863,14 @@ _return:
return code; return code;
} }
int32_t ctgLaunchGetTbIndexTask(SCtgTask* pTask) {
int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
int32_t code = 0; int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx; SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
SArray* pRes = NULL; SArray* pRes = NULL;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -1883,7 +1887,7 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetTbCfgTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbCfgTask(SCtgTask* pTask) {
int32_t code = 0; int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
@ -1892,7 +1896,7 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask *pTask) {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pCtx->pName, dbFName); tNameGetFullDbName(pCtx->pName, dbFName);
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -1930,12 +1934,11 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchGetQnodeTask(SCtgTask* pTask) {
int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -1944,11 +1947,11 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) { int32_t ctgLaunchGetDnodeTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -1957,13 +1960,12 @@ int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetDbCfgTask(SCtgTask* pTask) {
int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx; SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -1973,13 +1975,13 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) { int32_t ctgLaunchGetDbInfoTask(SCtgTask* pTask) {
int32_t code = 0; int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SCtgDBCache *dbCache = NULL; SCtgDBCache* dbCache = NULL;
SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx; SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -2013,12 +2015,12 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) { int32_t ctgLaunchGetIndexTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx; SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -2028,12 +2030,12 @@ int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) { int32_t ctgLaunchGetUdfTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx; SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -2043,14 +2045,14 @@ int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { int32_t ctgLaunchGetUserTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx; SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx;
bool inCache = false; bool inCache = false;
bool pass = false; bool pass = false;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -2072,11 +2074,11 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) { int32_t ctgLaunchGetSvrVerTask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
@ -2086,7 +2088,7 @@ int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) { int32_t ctgRelaunchGetTbMetaTask(SCtgTask* pTask) {
ctgResetTbMetaTask(pTask); ctgResetTbMetaTask(pTask);
CTG_ERR_RET(ctgLaunchGetTbMetaTask(pTask)); CTG_ERR_RET(ctgLaunchGetTbMetaTask(pTask));
@ -2094,7 +2096,7 @@ int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbCfgCb(SCtgTask *pTask) { int32_t ctgGetTbCfgCb(SCtgTask* pTask) {
int32_t code = 0; int32_t code = 0;
CTG_ERR_JRET(pTask->subRes.code); CTG_ERR_JRET(pTask->subRes.code);
@ -2124,7 +2126,6 @@ int32_t ctgCompDbVgTasks(SCtgTask* pTask, void* param, bool* equal) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgCompTbMetaTasks(SCtgTask* pTask, void* param, bool* equal) { int32_t ctgCompTbMetaTasks(SCtgTask* pTask, void* param, bool* equal) {
SCtgTbMetaCtx* ctx = pTask->taskCtx; SCtgTbMetaCtx* ctx = pTask->taskCtx;
@ -2145,14 +2146,14 @@ int32_t ctgCloneDbVg(SCtgTask* pTask, void** pRes) {
CTG_RET(cloneDbVgInfo(pOut->dbVgroup, (SDBVgInfo**)pRes)); CTG_RET(cloneDbVgInfo(pOut->dbVgroup, (SDBVgInfo**)pRes));
} }
SCtgAsyncFps gCtgAsyncFps[] = { SCtgAsyncFps gCtgAsyncFps[] = {
{ctgInitGetQnodeTask, ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes, NULL, NULL}, {ctgInitGetQnodeTask, ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes, NULL, NULL},
{ctgInitGetDnodeTask, ctgLaunchGetDnodeTask, ctgHandleGetDnodeRsp, ctgDumpDnodeRes, NULL, NULL}, {ctgInitGetDnodeTask, ctgLaunchGetDnodeTask, ctgHandleGetDnodeRsp, ctgDumpDnodeRes, NULL, NULL},
{ctgInitGetDbVgTask, ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes, ctgCompDbVgTasks, ctgCloneDbVg}, {ctgInitGetDbVgTask, ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes, ctgCompDbVgTasks, ctgCloneDbVg},
{ctgInitGetDbCfgTask, ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes, NULL, NULL}, {ctgInitGetDbCfgTask, ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes, NULL, NULL},
{ctgInitGetDbInfoTask, ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes, NULL, NULL}, {ctgInitGetDbInfoTask, ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes, NULL, NULL},
{ctgInitGetTbMetaTask, ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes, ctgCompTbMetaTasks, ctgCloneTbMeta}, {ctgInitGetTbMetaTask, ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes, ctgCompTbMetaTasks,
ctgCloneTbMeta},
{ctgInitGetTbHashTask, ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes, NULL, NULL}, {ctgInitGetTbHashTask, ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes, NULL, NULL},
{ctgInitGetTbIndexTask, ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes, NULL, NULL}, {ctgInitGetTbIndexTask, ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes, NULL, NULL},
{ctgInitGetTbCfgTask, ctgLaunchGetTbCfgTask, ctgHandleGetTbCfgRsp, ctgDumpTbCfgRes, NULL, NULL}, {ctgInitGetTbCfgTask, ctgLaunchGetTbCfgTask, ctgHandleGetTbCfgRsp, ctgDumpTbCfgRes, NULL, NULL},
@ -2164,19 +2165,19 @@ SCtgAsyncFps gCtgAsyncFps[] = {
{ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL}, {ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL},
}; };
int32_t ctgMakeAsyncRes(SCtgJob *pJob) { int32_t ctgMakeAsyncRes(SCtgJob* pJob) {
int32_t code = 0; int32_t code = 0;
int32_t taskNum = taosArrayGetSize(pJob->pTasks); int32_t taskNum = taosArrayGetSize(pJob->pTasks);
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i); SCtgTask* pTask = taosArrayGet(pJob->pTasks, i);
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].dumpResFp)(pTask)); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].dumpResFp)(pTask));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgSearchExistingTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t* taskId) { int32_t ctgSearchExistingTask(SCtgJob* pJob, CTG_TASK_TYPE type, void* param, int32_t* taskId) {
bool equal = false; bool equal = false;
SCtgTask* pTask = NULL; SCtgTask* pTask = NULL;
int32_t code = 0; int32_t code = 0;
@ -2206,15 +2207,15 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) { int32_t ctgSetSubTaskCb(SCtgTask* pSub, SCtgTask* pTask) {
int32_t code = 0; int32_t code = 0;
CTG_LOCK(CTG_WRITE, &pSub->lock); CTG_LOCK(CTG_WRITE, &pSub->lock);
if (CTG_TASK_DONE == pSub->status) { if (CTG_TASK_DONE == pSub->status) {
pTask->subRes.code = pSub->code; pTask->subRes.code = pSub->code;
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res)); CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res));
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
SCtgMsgCtx *pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1); SCtgMsgCtx* pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1);
pMsgCtx->pBatchs = pSubMsgCtx->pBatchs; pMsgCtx->pBatchs = pSubMsgCtx->pBatchs;
CTG_ERR_JRET(pTask->subRes.fp(pTask)); CTG_ERR_JRET(pTask->subRes.fp(pTask));
@ -2233,8 +2234,7 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchSubTask(SCtgTask* pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param) {
int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
int32_t subTaskId = -1; int32_t subTaskId = -1;
bool newTask = false; bool newTask = false;
@ -2254,8 +2254,8 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask)); CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask));
if (newTask) { if (newTask) {
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
SCtgMsgCtx *pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1); SCtgMsgCtx* pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1);
pSubMsgCtx->pBatchs = pMsgCtx->pBatchs; pSubMsgCtx->pBatchs = pMsgCtx->pBatchs;
CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub)); CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub));
@ -2265,11 +2265,11 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchJob(SCtgJob *pJob) { int32_t ctgLaunchJob(SCtgJob* pJob) {
int32_t taskNum = taosArrayGetSize(pJob->pTasks); int32_t taskNum = taosArrayGetSize(pJob->pTasks);
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i); SCtgTask* pTask = taosArrayGet(pJob->pTasks, i);
qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId);
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
@ -2289,6 +2289,3 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -75,7 +75,8 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx);
pMsgCtx->pBatchs = pBatchs; pMsgCtx->pBatchs = pBatchs;
ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId, rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs); ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId,
rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs);
(*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); (*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
} }
@ -344,13 +345,14 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
pMsgCtx->pBatchs = pBatchs; pMsgCtx->pBatchs = pBatchs;
#endif #endif
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, cbParam->reqType, pMsg, rspCode)); CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, cbParam->reqType, pMsg, rspCode));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
@ -361,6 +363,7 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
_return: _return:
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (pJob) { if (pJob) {
taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId); taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
@ -442,8 +445,8 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTaskReq* tReq, int32_t msgType, void* msg, int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTaskReq* tReq, int32_t msgType,
uint32_t msgSize) { void* msg, uint32_t msgSize) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
@ -1066,7 +1069,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName, int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName,
STableMetaOutput* out, SCtgTaskReq* tReq) { STableMetaOutput* out, SCtgTaskReq* tReq) {
SCtgTask *pTask = tReq ? tReq->pTask : NULL; SCtgTask* pTask = tReq ? tReq->pTask : NULL;
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName}; SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
char* msg = NULL; char* msg = NULL;
SEpSet* pVnodeEpSet = NULL; SEpSet* pVnodeEpSet = NULL;
@ -1131,7 +1134,7 @@ int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo, int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo,
STableMetaOutput* out, SCtgTaskReq* tReq) { STableMetaOutput* out, SCtgTaskReq* tReq) {
SCtgTask *pTask = tReq ? tReq->pTask : NULL; SCtgTask* pTask = tReq ? tReq->pTask : NULL;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
int32_t reqType = TDMT_VND_TABLE_META; int32_t reqType = TDMT_VND_TABLE_META;
@ -1243,7 +1246,8 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->pVgInfo->vgId, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->pVgInfo->vgId, reqType, msg,
msgLen));
#endif #endif
} }

View File

@ -93,6 +93,7 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
strcpy(pRes->tableName, pHandle->pDeleter->tableFName); strcpy(pRes->tableName, pHandle->pDeleter->tableFName);
strcpy(pRes->tsColName, pHandle->pDeleter->tsColName); strcpy(pRes->tsColName, pHandle->pDeleter->tsColName);
pRes->affectedRows = *(int64_t*)pColRes->pData; pRes->affectedRows = *(int64_t*)pColRes->pData;
if (pRes->affectedRows) { if (pRes->affectedRows) {
pRes->skey = *(int64_t*)pColSKey->pData; pRes->skey = *(int64_t*)pColSKey->pData;
pRes->ekey = *(int64_t*)pColEKey->pData; pRes->ekey = *(int64_t*)pColEKey->pData;
@ -102,6 +103,8 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey; pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
} }
qDebug("delete %ld rows, from %ld to %ld", pRes->affectedRows, pRes->skey, pRes->ekey);
pBuf->useSize += pEntry->dataLen; pBuf->useSize += pEntry->dataLen;
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen); atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);

View File

@ -268,7 +268,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
STableListInfo* pListInfo = &pTaskInfo->tableqinfoList; STableListInfo* pListInfo = &pTaskInfo->tableqinfoList;
if (isAdd) { if (isAdd) {
qDebug("add %d tables id into query list, %s", (int32_t) taosArrayGetSize(tableIdList), pTaskInfo->id.str); qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
} }
if (pListInfo->map == NULL) { if (pListInfo->map == NULL) {
@ -321,6 +321,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
} }
bool exists = false; bool exists = false;
#if 0
for (int32_t k = 0; k < taosArrayGetSize(pListInfo->pTableList); ++k) { for (int32_t k = 0; k < taosArrayGetSize(pListInfo->pTableList); ++k) {
STableKeyInfo* pKeyInfo = taosArrayGet(pListInfo->pTableList, k); STableKeyInfo* pKeyInfo = taosArrayGet(pListInfo->pTableList, k);
if (pKeyInfo->uid == keyInfo.uid) { if (pKeyInfo->uid == keyInfo.uid) {
@ -328,6 +329,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
exists = true; exists = true;
} }
} }
#endif
if (!exists) { if (!exists) {
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);

View File

@ -637,7 +637,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid); tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, pInfo->currentTable, pTaskInfo->id.str); qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables,
pInfo->currentTable, pTaskInfo->id.str);
tsdbReaderReset(pInfo->dataReader, &pInfo->cond); tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
@ -1332,6 +1333,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
isDeletedWindow(&win, pBlock->info.groupId, pInfo->windowSup.pIntervalAggSup); isDeletedWindow(&win, pBlock->info.groupId, pInfo->windowSup.pIntervalAggSup);
if ((update || closedWin) && out) { if ((update || closedWin) && out) {
qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
uint64_t gpId = closedWin && pInfo->partitionSup.needCalc uint64_t gpId = closedWin && pInfo->partitionSup.needCalc
? calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId) ? calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId)
: 0; : 0;

File diff suppressed because it is too large Load Diff

View File

@ -452,7 +452,7 @@ sma_stream_opt(A) ::= stream_options(B) WATERMARK duration_literal(C).
sma_stream_opt(A) ::= stream_options(B) MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; } sma_stream_opt(A) ::= stream_options(B) MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; }
/************************************************ create/drop topic ***************************************************/ /************************************************ create/drop topic ***************************************************/
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_expression(C). { pCxt->pRootNode = createCreateTopicStmtUseQuery(pCxt, A, &B, C); } cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_or_subquery(C). { pCxt->pRootNode = createCreateTopicStmtUseQuery(pCxt, A, &B, C); }
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, false); } cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, false); }
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B)
WITH META AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, true); } WITH META AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, true); }
@ -471,7 +471,7 @@ cmd ::= DESCRIBE full_table_name(A).
cmd ::= RESET QUERY CACHE. { pCxt->pRootNode = createResetQueryCacheStmt(pCxt); } cmd ::= RESET QUERY CACHE. { pCxt->pRootNode = createResetQueryCacheStmt(pCxt); }
/************************************************ explain *************************************************************/ /************************************************ explain *************************************************************/
cmd ::= EXPLAIN analyze_opt(A) explain_options(B) query_expression(C). { pCxt->pRootNode = createExplainStmt(pCxt, A, B, C); } cmd ::= EXPLAIN analyze_opt(A) explain_options(B) query_or_subquery(C). { pCxt->pRootNode = createExplainStmt(pCxt, A, B, C); }
%type analyze_opt { bool } %type analyze_opt { bool }
%destructor analyze_opt { } %destructor analyze_opt { }
@ -502,7 +502,7 @@ bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B).
/************************************************ create/drop stream **************************************************/ /************************************************ create/drop stream **************************************************/
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A)
stream_options(B) INTO full_table_name(C) AS query_expression(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, D); } stream_options(B) INTO full_table_name(C) AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, D); }
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); } cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
stream_options(A) ::= . { A = createStreamOptions(pCxt); } stream_options(A) ::= . { A = createStreamOptions(pCxt); }
@ -535,12 +535,12 @@ dnode_list(A) ::= dnode_list(B) DNODE NK_INTEGER(C).
cmd ::= DELETE FROM full_table_name(A) where_clause_opt(B). { pCxt->pRootNode = createDeleteStmt(pCxt, A, B); } cmd ::= DELETE FROM full_table_name(A) where_clause_opt(B). { pCxt->pRootNode = createDeleteStmt(pCxt, A, B); }
/************************************************ select **************************************************************/ /************************************************ select **************************************************************/
cmd ::= query_expression(A). { pCxt->pRootNode = A; } cmd ::= query_or_subquery(A). { pCxt->pRootNode = A; }
/************************************************ insert **************************************************************/ /************************************************ insert **************************************************************/
cmd ::= INSERT INTO full_table_name(A) cmd ::= INSERT INTO full_table_name(A)
NK_LP col_name_list(B) NK_RP query_expression(C). { pCxt->pRootNode = createInsertStmt(pCxt, A, B, C); } NK_LP col_name_list(B) NK_RP query_or_subquery(C). { pCxt->pRootNode = createInsertStmt(pCxt, A, B, C); }
cmd ::= INSERT INTO full_table_name(A) query_expression(B). { pCxt->pRootNode = createInsertStmt(pCxt, A, NULL, B); } cmd ::= INSERT INTO full_table_name(A) query_or_subquery(B). { pCxt->pRootNode = createInsertStmt(pCxt, A, NULL, B); }
/************************************************ literal *************************************************************/ /************************************************ literal *************************************************************/
literal(A) ::= NK_INTEGER(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &B)); } literal(A) ::= NK_INTEGER(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &B)); }
@ -936,28 +936,26 @@ every_opt(A) ::= .
every_opt(A) ::= EVERY NK_LP duration_literal(B) NK_RP. { A = releaseRawExprNode(pCxt, B); } every_opt(A) ::= EVERY NK_LP duration_literal(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
/************************************************ query_expression ****************************************************/ /************************************************ query_expression ****************************************************/
query_expression(A) ::= query_expression(A) ::= query_simple(B)
query_expression_body(B)
order_by_clause_opt(C) slimit_clause_opt(D) limit_clause_opt(E). { order_by_clause_opt(C) slimit_clause_opt(D) limit_clause_opt(E). {
A = addOrderByClause(pCxt, B, C); A = addOrderByClause(pCxt, B, C);
A = addSlimitClause(pCxt, A, D); A = addSlimitClause(pCxt, A, D);
A = addLimitClause(pCxt, A, E); A = addLimitClause(pCxt, A, E);
} }
query_expression_body(A) ::= query_primary(B). { A = B; } query_simple(A) ::= query_specification(B). { A = B; }
query_expression_body(A) ::= query_simple(A) ::= union_query_expression(B). { A = B; }
query_expression_body(B) UNION ALL query_expression_body(D). { A = createSetOperator(pCxt, SET_OP_TYPE_UNION_ALL, B, D); }
query_expression_body(A) ::=
query_expression_body(B) UNION query_expression_body(D). { A = createSetOperator(pCxt, SET_OP_TYPE_UNION, B, D); }
query_primary(A) ::= query_specification(B). { A = B; } union_query_expression(A) ::=
query_primary(A) ::= query_simple_or_subquery(B) UNION ALL query_simple_or_subquery(C). { A = createSetOperator(pCxt, SET_OP_TYPE_UNION_ALL, B, C); }
NK_LP query_expression_body(B) union_query_expression(A) ::=
order_by_clause_opt(C) slimit_clause_opt(D) limit_clause_opt(E) NK_RP. { query_simple_or_subquery(B) UNION query_simple_or_subquery(C). { A = createSetOperator(pCxt, SET_OP_TYPE_UNION, B, C); }
A = addOrderByClause(pCxt, B, C);
A = addSlimitClause(pCxt, A, D); query_simple_or_subquery(A) ::= query_simple(B). { A = B; }
A = addLimitClause(pCxt, A, E); query_simple_or_subquery(A) ::= subquery(B). { A = releaseRawExprNode(pCxt, B); }
}
query_or_subquery(A) ::= query_expression(B). { A = B; }
query_or_subquery(A) ::= subquery(B). { A = releaseRawExprNode(pCxt, B); }
%type order_by_clause_opt { SNodeList* } %type order_by_clause_opt { SNodeList* }
%destructor order_by_clause_opt { nodesDestroyList($$); } %destructor order_by_clause_opt { nodesDestroyList($$); }

File diff suppressed because it is too large Load Diff

View File

@ -44,6 +44,8 @@ TEST_F(PlanSetOpTest, unionAllWithSubquery) {
run("SELECT ts FROM (SELECT ts FROM st1s1) UNION ALL SELECT ts FROM (SELECT ts FROM st1s2)"); run("SELECT ts FROM (SELECT ts FROM st1s1) UNION ALL SELECT ts FROM (SELECT ts FROM st1s2)");
// super table // super table
run("SELECT ts FROM (SELECT ts FROM st1) UNION ALL SELECT ts FROM (SELECT ts FROM st1)"); run("SELECT ts FROM (SELECT ts FROM st1) UNION ALL SELECT ts FROM (SELECT ts FROM st1)");
run("(SELECT SERVER_STATUS()) UNION ALL (SELECT SERVER_STATUS())");
} }
TEST_F(PlanSetOpTest, unionAllWithOrderBy) { TEST_F(PlanSetOpTest, unionAllWithOrderBy) {

View File

@ -275,6 +275,7 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) {
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
qDestroyTask(otaskHandle); qDestroyTask(otaskHandle);
qDebug("task handle destryed");
} }
} }
@ -305,6 +306,7 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) {
if (ctx->sinkHandle) { if (ctx->sinkHandle) {
dsDestroyDataSinker(ctx->sinkHandle); dsDestroyDataSinker(ctx->sinkHandle);
ctx->sinkHandle = NULL; ctx->sinkHandle = NULL;
qDebug("sink handle destryed");
} }
} }
@ -452,6 +454,10 @@ void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksH
void qwDestroyImpl(void *pMgmt) { void qwDestroyImpl(void *pMgmt) {
SQWorker *mgmt = (SQWorker *)pMgmt; SQWorker *mgmt = (SQWorker *)pMgmt;
int8_t nodeType = mgmt->nodeType;
int32_t nodeId = mgmt->nodeId;
qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
taosTmrStop(mgmt->hbTimer); taosTmrStop(mgmt->hbTimer);
mgmt->hbTimer = NULL; mgmt->hbTimer = NULL;
@ -484,6 +490,8 @@ void qwDestroyImpl(void *pMgmt) {
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
qwCloseRef(); qwCloseRef();
qDebug("qworker destroyed, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
} }
int32_t qwOpenRef(void) { int32_t qwOpenRef(void) {

View File

@ -385,7 +385,6 @@ _return:
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
} }
int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;

View File

@ -1498,9 +1498,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
pCtx->ahandle = pReq->info.ahandle; pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType; pCtx->msgType = pReq->msgType;
if (ctx != NULL) { if (ctx != NULL) pCtx->appCtx = *ctx;
pCtx->appCtx = *ctx;
}
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cliMsg->ctx = pCtx; cliMsg->ctx = pCtx;

View File

@ -1148,6 +1148,7 @@ int transReleaseSrvHandle(void* handle) {
tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(transGetRefMgt(), refId); transReleaseExHandle(transGetRefMgt(), refId);
return 0; return 0;
_return1: _return1:
@ -1177,8 +1178,10 @@ int transSendResponse(const STransMsg* msg) {
STraceId* trace = (STraceId*)&msg->info.traceId; STraceId* trace = (STraceId*)&msg->info.traceId;
tGTrace("conn %p start to send resp (1/2)", exh->handle); tGTrace("conn %p start to send resp (1/2)", exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(transGetRefMgt(), refId); transReleaseExHandle(transGetRefMgt(), refId);
return 0; return 0;
_return1: _return1:
tTrace("handle %p failed to send resp", exh); tTrace("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
@ -1207,6 +1210,7 @@ int transRegisterMsg(const STransMsg* msg) {
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(transGetRefMgt(), refId); transReleaseExHandle(transGetRefMgt(), refId);
return 0; return 0;

View File

@ -31,6 +31,15 @@ typedef struct TdDir {
HANDLE hFind; HANDLE hFind;
} TdDir; } TdDir;
enum
{
WRDE_NOSPACE = 1, /* Ran out of memory. */
WRDE_BADCHAR, /* A metachar appears in the wrong place. */
WRDE_BADVAL, /* Undefined var reference with WRDE_UNDEF. */
WRDE_CMDSUB, /* Command substitution with WRDE_NOCMD. */
WRDE_SYNTAX /* Shell syntax error. */
};
int wordexp(char *words, wordexp_t *pwordexp, int flags) { int wordexp(char *words, wordexp_t *pwordexp, int flags) {
pwordexp->we_offs = 0; pwordexp->we_offs = 0;
pwordexp->we_wordc = 1; pwordexp->we_wordc = 1;
@ -265,9 +274,21 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) {
int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen) { int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen) {
wordexp_t full_path; wordexp_t full_path;
if (0 != wordexp(dirname, &full_path, 0)) { switch (wordexp (dirname, &full_path, 0)) {
printf("failed to expand path:%s since %s", dirname, strerror(errno)); case 0:
wordfree(&full_path); break;
case WRDE_NOSPACE:
wordfree (&full_path);
// printf("failed to expand path:%s since Out of memory\n", dirname);
return -1;
case WRDE_BADCHAR:
// printf("failed to expand path:%s since illegal occurrence of newline or one of |, &, ;, <, >, (, ), {, }\n", dirname);
return -1;
case WRDE_SYNTAX:
// printf("failed to expand path:%s since Shell syntax error, such as unbalanced parentheses or unmatched quotes\n", dirname);
return -1;
default:
// printf("failed to expand path:%s since %s\n", dirname, strerror(errno));
return -1; return -1;
} }

View File

@ -386,11 +386,12 @@ void* taosArrayDestroy(SArray* pArray) {
} }
void taosArrayDestroyP(SArray* pArray, FDelete fp) { void taosArrayDestroyP(SArray* pArray, FDelete fp) {
if(!pArray) return; if (pArray) {
for (int32_t i = 0; i < pArray->size; i++) { for (int32_t i = 0; i < pArray->size; i++) {
fp(*(void**)TARRAY_GET_ELEM(pArray, i)); fp(*(void**)TARRAY_GET_ELEM(pArray, i));
} }
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
}
} }
void taosArrayDestroyEx(SArray* pArray, FDelete fp) { void taosArrayDestroyEx(SArray* pArray, FDelete fp) {

View File

@ -116,7 +116,7 @@ sql_error (select c1 from union_tb0 limit 1 union all select c1 from union_tb1 l
sql_error (select c1 from union_tb0 limit 1 union all select c1 from union_tb1 limit 1) limit 1 sql_error (select c1 from union_tb0 limit 1 union all select c1 from union_tb1 limit 1) limit 1
# sql with parenthese # sql with parenthese
sql (((select c1 from union_tb0))) sql (select c1 from union_tb0)
if $rows != 10000 then if $rows != 10000 then
return -1 return -1
endi endi

View File

@ -325,7 +325,9 @@ class TDTestCase:
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4)) expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4))
elif self.snapshot == 1: elif self.snapshot == 1:
consumerId = 5 consumerId = 5
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4)) # expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4))
# fix case: sometimes only 200 rows are deleted
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4))
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 1 ifcheckdata = 1