Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TS-4994-3.0

Hongze Cheng 2024-11-19 14:59:13 +08:00
commit 0d3210a2c9
52 changed files with 1015 additions and 446 deletions

View File

@ -5,7 +5,7 @@ node {
}
file_zh_changed = ''
file_en_changed = ''
file_no_doc_changed = ''
file_no_doc_changed = '1'
def abortPreviousBuilds() {
def currentJobName = env.JOB_NAME
def currentBuildNumber = env.BUILD_NUMBER.toInteger()
@ -656,4 +656,4 @@ pipeline {
)
}
}
}
}

View File

@ -4,6 +4,12 @@
"main": "index.js",
"license": "MIT",
"dependencies": {
"@tdengine/websocket": "^3.1.1"
}
"@tdengine/websocket": "^3.1.2"
},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"description": ""
}

View File

@ -21,7 +21,7 @@ sudo ./install.sh
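To avoid affecting the system's existing Python environment, Anode runs in a virtual environment. Installing Anode creates a default Python virtual environment in the directory `/var/lib/taos/taosanode/venv/`, and all libraries that Anode needs at run time are installed there. To avoid the overhead of repeatedly reinstalling the virtual environment, the uninstall command `rmtaosanode` does not delete it automatically; if you are sure you no longer need the Python virtual environment, delete the directory manually.
### Starting and Stopping the Service
On Linux, installing Anode automatically creates the `taosadnoded` service. You can manage the Anode service with `systemd`, using the following commands to start, stop, or check Anode.
On Linux, installing Anode automatically creates the `taosanoded` service. You can manage the Anode service with `systemd`, using the following commands to start, stop, or check Anode.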
```bash
systemctl start taosanoded
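@ -96,7 +96,7 @@ The main Anode runtime settings are:
### Basic Anode Operations
Anode is managed through taos, the TDengine command-line interface. The management commands described below therefore require you to open taos first and connect to a running TDengine instance.
Anode is managed through taos, the TDengine command-line interface. The management commands described below therefore require you to open taos first and connect to a running TDengine instance.
#### Create an Anode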
```sql
CREATE ANODE {node_url}
@ -117,7 +117,7 @@ SHOW ANODES FULL;
#### Refresh the Analysis Algorithm Cache in the Cluster
```SQL
UPDATE ANODE {node_id}
UPDATE ANODE {anode_id}
UPDATE ALL ANODES
```

View File

@ -9,16 +9,45 @@ sidebar_label: "Statistical Algorithms"
|---|---|---|---|
|k|Multiple of the standard deviation|Optional|3|
```SQL
--- Use the ksigma algorithm with parameter k set to 2
SELECT _WSTART, COUNT(*)
FROM foo
ANOMALY_WINDOW(foo.i32, "algo=ksigma,k=2")
```
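- IQR<sup>[2]</sup>: The interquartile range (IQR) is a measure of variability. Quartiles divide a rank-ordered data set into four equal parts, bounded by Q1 (the first quartile), Q2 (the second quartile), and Q3 (the third quartile). $IQR=Q3-Q1$; a value $v$ is normal when $Q1-(1.5 \times IQR) \le v \le Q3+(1.5 \times IQR)$, and values outside that range are anomalies. No input parameters.
```SQL
--- Use the iqr algorithm; no parameters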
SELECT _WSTART, COUNT(*)
FROM foo
ANOMALY_WINDOW(foo.i32, "algo=iqr")
```
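- Grubbs<sup>[3]</sup>: Grubbs' test, also known as the maximum normalized residual test. Grubbs is typically used to test whether the largest or smallest value deviates abnormally from the mean. It requires a univariate data set that follows an approximately normal distribution and cannot be used on data sets that are not normally distributed. No input parameters.
- SHESD<sup>[4]</sup>: A seasonal ESD detection algorithm. ESD can detect multiple anomalies in time-series data. It requires an upper bound ***k*** on the proportion of anomalies, which in the worst case is at most 49.9%. The proportion of anomalies in a data set generally does not exceed 5%.
```SQL
--- Use the grubbs algorithm; no parameters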
SELECT _WSTART, COUNT(*)
FROM foo
ANOMALY_WINDOW(foo.i32, "algo=grubbs")
```
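- SHESD<sup>[4]</sup>: A seasonal ESD detection algorithm. ESD can detect multiple anomalies in time-series data. It requires the anomaly detection direction ('pos' / 'neg' / 'both') and an upper bound ***max_anoms*** on the proportion of anomalies, which in the worst case is at most 49.9%. The proportion of anomalies in a data set generally does not exceed 5%.
|Parameter|Description|Required|Default|
|---|---|---|---|
|k|Proportion of anomalies in the input data set, $1 \le K \le 49.9$ |Optional|5|
|direction|Anomaly detection direction ('pos' / 'neg' / 'both')|No|"both"|
|max_anoms|Proportion of anomalies, $1 \le K \le 49.9$|No|0.05|
```SQL
--- Use the shesd algorithm with direction set to both and an anomaly proportion of 5%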
SELECT _WSTART, COUNT(*)
FROM foo
ANOMALY_WINDOW(foo.i32, "algo=shesd,direction=both,anoms=0.05")
```
### References
1. [https://en.wikipedia.org/wiki/68–95–99.7 rule](https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule)
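
The k-sigma and IQR rules described in this file can be sketched in C as follows. This is a minimal illustration and not TDgpt's implementation: the function names (`ksigma_is_anomaly`, `iqr_bounds`) are hypothetical, the variance is the population form, and quartiles use linear interpolation, any of which may differ from what the Anode actually does.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helpers for illustration; none of these names exist in TDgpt. */

/* k-sigma rule: v is anomalous when |v - mean| > k * sigma.
 * Squares are compared so that no square root is needed. */
int ksigma_is_anomaly(const double *data, size_t n, double k, double v) {
  assert(data != NULL && n > 0 && k >= 0.0);
  double sum = 0.0;
  for (size_t i = 0; i < n; i++) sum += data[i];
  double mean = sum / (double)n;

  double sq = 0.0;
  for (size_t i = 0; i < n; i++) sq += (data[i] - mean) * (data[i] - mean);
  double variance = sq / (double)n; /* population variance (assumption) */

  double d = v - mean;
  return d * d > k * k * variance;
}

static int cmp_double(const void *a, const void *b) {
  double x = *(const double *)a, y = *(const double *)b;
  return (x > y) - (x < y);
}

/* Quantile of already-sorted data using linear interpolation (assumption). */
static double quantile_sorted(const double *sorted, size_t n, double q) {
  double pos = q * (double)(n - 1);
  size_t lo = (size_t)pos;
  size_t hi = (lo + 1 < n) ? lo + 1 : lo;
  return sorted[lo] + (pos - (double)lo) * (sorted[hi] - sorted[lo]);
}

/* IQR rule: values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR] are anomalous.
 * Returns 0 on success, -1 if memory could not be allocated. */
int iqr_bounds(const double *data, size_t n, double *lower, double *upper) {
  assert(data != NULL && n > 0 && lower != NULL && upper != NULL);
  double *sorted = malloc(n * sizeof *sorted);
  if (sorted == NULL) return -1;
  memcpy(sorted, data, n * sizeof *sorted);
  qsort(sorted, n, sizeof *sorted, cmp_double);

  double q1 = quantile_sorted(sorted, n, 0.25);
  double q3 = quantile_sorted(sorted, n, 0.75);
  free(sorted);

  double iqr = q3 - q1;
  *lower = q1 - 1.5 * iqr;
  *upper = q3 + 1.5 * iqr;
  return 0;
}
```

A caller would compute the bounds once per input window and then compare each value against them; the k-sigma helper corresponds to an option string such as `algo=ksigma,k=2`.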

View File

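@ -8,4 +8,13 @@ LOF<sup>[1]</sup>: Local Outlier Factor (LOF), local outlier factor/local anomaly
It is a density-based local outlier detection algorithm proposed by Breunig in 2000, suited to data in which different clusters have very different densities. Based on how densely data surrounds each point, it first computes a local reachability density for each data point, and then uses that density to compute an outlier factor for each point.
The outlier factor indicates how much of an outlier a data point is: the larger the factor, the more outlying the point; the smaller the factor, the less outlying. Finally, the $topK$ most outlying points are output.
```SQL
--- Specify LOF as the algorithm to invoke it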
SELECT count(*)
FROM foo
ANOMALY_WINDOW(foo.i32, "algo=lof")
```
### References
1. Breunig, M. M.; Kriegel, H.-P.; Ng, R. T.; Sander, J. (2000). LOF: Identifying Density-based Local Outliers (PDF). Proceedings of the 2000 ACM SIGMOD International Conference on Management of Data. SIGMOD. pp. 93–104. doi:10.1145/335191.335388. ISBN 1-58113-217-4.

View File

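@ -3,4 +3,15 @@ title: "Machine Learning Algorithms"
sidebar_label: "Machine Learning Algorithms"
---
Autoencoder: TDgpt has a built-in anomaly detection algorithm that uses an autoencoder, which gives good detection results on periodic time-series data. To use this model, you must pre-train it on the input time-series data and save the trained model in the service directory `ad_autoencoder`; you can then use it by specifying the algorithm model in the SQL statement.
Autoencoder<sup>[1]</sup>: TDgpt has a built-in anomaly detection algorithm that uses an autoencoder, which gives good detection results on periodic time-series data. To use this model, you must pre-train it on the input time-series data and save the trained model in the service directory `ad_autoencoder`; you can then use it by specifying the algorithm model in the SQL statement.
```SQL
--- Add the model name to options: ad_autoencoder_foo is an autoencoder-based anomaly detection model trained on the foo data set (table); use it to perform anomaly detection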
SELECT COUNT(*), _WSTART
FROM foo
ANOMALY_DETECTION(col1, 'algo=encoder, model=ad_autoencoder_foo');
```
### References
1. https://en.wikipedia.org/wiki/Autoencoder

View File

@ -56,7 +56,7 @@ SELECT _wstart, _wend, SUM(i32)
FROM foo
ANOMALY_WINDOW(i32, "algo=ksigma,k=2");
taos> SELECT _wstart, _wend, count(*) FROM ai.atb ANOMAYL_WINDOW(i32);
taos> SELECT _wstart, _wend, count(*) FROM foo ANOMALY_WINDOW(i32);
_wstart | _wend | count(*) |
====================================================================
2020-01-01 00:00:16.000 | 2020-01-01 00:00:17.000 | 2 |

View File

@ -69,9 +69,12 @@ SELECT COUNT(*) FROM foo ANOMALY_DETECTION(col_name, 'algo=name')
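## Adding Analysis Algorithms That Have Models
Statistics-based analysis algorithms can analyze input time-series data directly, but some deep-learning algorithms need a long time to train on the input data and produce a corresponding model. In that case, the same analysis algorithm has a different analysis model for each input data set.
To add an analysis algorithm with models to Anode, first create a directory for the algorithm under the `model` directory (you may choose the directory name). All models trained with that algorithm on different input time-series data must be saved in that directory, and the directory structure must be fixed in the analysis algorithm so that the models in the directory can be loaded reliably. As shown in the figure below, the anomaly detection models trained with an autoencoder on different data sets are all saved in that directory. To ensure that models can be read and loaded correctly, stored models are required to be serialized with the `joblib` library.
To add an analysis algorithm with models to Anode, first create a directory for the algorithm under the `model` directory (you may choose the directory name). All models trained with that algorithm on different input time-series data must be saved in that directory, and the directory name must be fixed in the analysis algorithm so that the models in the directory can be loaded reliably. To ensure that models can be read and loaded correctly, stored models are serialized with the `joblib` library.
To call a saved model, specify the model name in the call parameters so that the correct model is invoked. An example SQL statement follows.
The following uses an autoencoder as an example to show how to add a pre-trained model for anomaly detection.
First, create a directory named `ad_detection` under the `model` directory; it will hold all models trained with the autoencoder. Then train the autoencoder on the time-series data in table foo to obtain the model ad_autoencoder_foo, serialize it with `joblib`, and save it in the `ad_detection` directory.
To call a saved model from SQL, specify the model name in the call parameters with `model=ad_autoencoder_foo`; `algo=encoder` selects the model produced by the autoencoder (here `encoder` indicates the autoencoder algorithm model, a name defined in code when the algorithm was added).
```SQL
--- Add the model name to options: ad_autoencoder_foo is an autoencoder-based anomaly detection model trained on the foo data set (table); use it to perform anomaly detection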

View File

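@ -19,5 +19,7 @@ TDgpt runs on the AI Nodes (Anodes) of a cluster, and a cluster can deploy several
- Time-series anomaly detection. TDengine defines a new kind of time window, the anomaly (state) window, to provide anomaly detection. An anomaly window can be viewed as a special **event window (Event Window)**: the time window that contains a run of consecutive anomalous time-series data identified by the anomaly detection algorithm. It differs from an ordinary event window in that its start and end times are determined by the analysis algorithm rather than by a user-specified expression. Anomaly windows are used in the same way as other time windows (such as state windows and session windows), so any query operation available inside a time window can be applied to an anomaly window.
- Time-series forecasting. A new function, `FORECAST`, calls a specified (or default) forecasting algorithm on the input (historical) time-series data and returns **forecast** data for the time series that follows the input.
TDgpt also provides an SDK for algorithm developers. By following the steps in the [Algorithm Developer Guide](./dev), any developer can seamlessly integrate their own time-series forecasting or anomaly detection algorithm into TDgpt, so that TDengine users can obtain forecasts or anomaly windows with a single SQL statement. This greatly lowers the barrier to using new time-series analysis algorithms and makes TDengine an open system.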

View File

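@ -294,6 +294,7 @@ The valid value of charset is UTF-8.
|checkpointBackupDir | |Internal parameter, used to restore snode data|
|enableAuditDelete | |Internal parameter, used to test the audit feature|
|slowLogThresholdTest| |Internal parameter, used to test slow logs|
|bypassFlag |After 3.3.4.5|Internal parameter, used for short-circuit testing; default value 0|
### Compression Parameters
|Parameter|Supported Version|Description|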

View File

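@ -97,6 +97,7 @@ The TDengine client driver provides all the APIs needed for application programming, and in
|safetyCheckLevel |After 3.3.3.0|Internal parameter, used for random failure testing|
|simdEnable |After 3.3.4.3|Internal parameter, used to test SIMD acceleration|
|AVX512Enable |After 3.3.4.3|Internal parameter, used to test AVX512 acceleration|
|bypassFlag |After 3.3.4.5|Internal parameter, used for short-circuit testing; default value 0|
### SHELL Parameters
|Parameter|Supported Version|Description|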

View File

@ -17,12 +17,23 @@
#define TDENGINE_STREAMMSG_H
#include "tmsg.h"
#include "trpc.h"
//#include "trpc.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SStreamRetrieveReq SStreamRetrieveReq;
typedef struct SStreamDispatchReq SStreamDispatchReq;
typedef struct STokenBucket STokenBucket;
typedef struct SMetaHbInfo SMetaHbInfo;
typedef struct SNodeUpdateInfo {
int32_t nodeId;
SEpSet prevEp;
SEpSet newEp;
} SNodeUpdateInfo;
typedef struct SStreamUpstreamEpInfo {
int32_t nodeId;
int32_t childId;
@ -170,8 +181,8 @@ typedef struct SStreamHbMsg {
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
} SStreamHbMsg;
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq);
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg);
typedef struct {
@ -179,6 +190,9 @@ typedef struct {
int32_t msgId;
} SMStreamHbRspMsg;
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp);
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp);
typedef struct SRetrieveChkptTriggerReq {
SMsgHead head;
int64_t streamId;
@ -189,6 +203,9 @@ typedef struct SRetrieveChkptTriggerReq {
int64_t downstreamTaskId;
} SRetrieveChkptTriggerReq;
int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq);
int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq);
typedef struct SCheckpointTriggerRsp {
int64_t streamId;
int64_t checkpointId;
@ -198,6 +215,9 @@ typedef struct SCheckpointTriggerRsp {
int32_t rspCode;
} SCheckpointTriggerRsp;
int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp);
int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp);
typedef struct SCheckpointReport {
int64_t streamId;
int32_t taskId;
@ -222,7 +242,7 @@ typedef struct SRestoreCheckpointInfo {
int32_t nodeId;
} SRestoreCheckpointInfo;
int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq);
typedef struct {
@ -232,10 +252,8 @@ typedef struct {
int32_t reqType;
} SStreamTaskRunReq;
typedef struct SCheckpointConsensusEntry {
SRestoreCheckpointInfo req;
int64_t ts;
} SCheckpointConsensusEntry;
int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq);
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq);
#ifdef __cplusplus
}

View File

@ -67,6 +67,7 @@ extern int64_t tsTickPerHour[3];
extern int32_t tsCountAlwaysReturnValue;
extern float tsSelectivityRatio;
extern int32_t tsTagFilterResCacheSize;
extern int32_t tsBypassFlag;
// queue & threads
extern int32_t tsNumOfRpcThreads;

View File

@ -3804,7 +3804,14 @@ typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
} SVPauseStreamTaskReq, SVResetStreamTaskReq;
} SVPauseStreamTaskReq;
typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
int64_t chkptId;
} SVResetStreamTaskReq;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];

View File

@ -319,11 +319,6 @@ typedef struct SSTaskBasicInfo {
SInterval interval;
} SSTaskBasicInfo;
typedef struct SStreamRetrieveReq SStreamRetrieveReq;
typedef struct SStreamDispatchReq SStreamDispatchReq;
typedef struct STokenBucket STokenBucket;
typedef struct SMetaHbInfo SMetaHbInfo;
typedef struct SDispatchMsgInfo {
SStreamDispatchReq* pData; // current dispatch data
@ -626,11 +621,11 @@ typedef struct STaskStatusEntry {
STaskCkptInfo checkpointInfo;
} STaskStatusEntry;
typedef struct SNodeUpdateInfo {
int32_t nodeId;
SEpSet prevEp;
SEpSet newEp;
} SNodeUpdateInfo;
//typedef struct SNodeUpdateInfo {
// int32_t nodeId;
// SEpSet prevEp;
// SEpSet newEp;
//} SNodeUpdateInfo;
typedef struct SStreamTaskState {
ETaskStatus state;
@ -643,6 +638,11 @@ typedef struct SCheckpointConsensusInfo {
int64_t streamId;
} SCheckpointConsensusInfo;
typedef struct SCheckpointConsensusEntry {
SRestoreCheckpointInfo req;
int64_t ts;
} SCheckpointConsensusEntry;
void streamSetupScheduleTrigger(SStreamTask* pTask);
// dispatch related
@ -718,6 +718,7 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
SRpcHandleInfo* pInfo, int32_t code);
void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue);

View File

@ -1012,6 +1012,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106)
#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107)
#define TSDB_CODE_STREAM_INPUTQ_FULL TAOS_DEF_ERROR_CODE(0, 0x4108)
#define TSDB_CODE_STREAM_INVLD_CHKPT TAOS_DEF_ERROR_CODE(0, 0x4109)
// TDLite
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)

View File

@ -620,6 +620,16 @@ enum {
enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 };
/**
* RB: return before
* RA: return after
* NR: not return, skip and go on following steps
*/
#define TSDB_BYPASS_RB_RPC_SEND_SUBMIT 0x01u
#define TSDB_BYPASS_RA_RPC_RECV_SUBMIT 0x02u
#define TSDB_BYPASS_RB_TSDB_WRITE_MEM 0x04u
#define TSDB_BYPASS_RB_TSDB_COMMIT 0x08u
#define DEFAULT_HANDLE 0
#define MNODE_HANDLE 1
#define QNODE_HANDLE -1
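
The four `TSDB_BYPASS_*` values are distinct bits, so a single `bypassFlag` setting can presumably combine several of them. The sketch below shows the usual way such a bitmask is tested. The helper name `bypass_enabled` is hypothetical, and where TDengine actually checks `tsBypassFlag` is not shown in this excerpt, so treat this as an assumption about usage.

```c
#include <assert.h>
#include <stdint.h>

/* Flag values copied from the hunk above. */
#define TSDB_BYPASS_RB_RPC_SEND_SUBMIT 0x01u
#define TSDB_BYPASS_RA_RPC_RECV_SUBMIT 0x02u
#define TSDB_BYPASS_RB_TSDB_WRITE_MEM  0x04u
#define TSDB_BYPASS_RB_TSDB_COMMIT     0x08u

/* Hypothetical helper: returns 1 when `flag` is set in the bitmask `bypass`.
 * `bypass` mirrors the int32_t type of tsBypassFlag, whose configured range
 * is 0..INT32_MAX, so a negative value is treated as a programming error. */
int bypass_enabled(int32_t bypass, uint32_t flag) {
  assert(bypass >= 0);
  return ((uint32_t)bypass & flag) != 0;
}
```

For example, a configured value of 5 (0x01 | 0x04) would enable the "return before RPC send" and "return before TSDB memory write" bypasses while leaving the other two off.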

View File

@ -1,4 +1,7 @@
aux_source_directory(src COMMON_SRC)
aux_source_directory(src/msg COMMON_MSG_SRC)
LIST(APPEND COMMON_SRC ${COMMON_MSG_SRC})
if(TD_ENTERPRISE)
LIST(APPEND COMMON_SRC ${TD_ENTERPRISE_DIR}/src/plugins/common/src/tglobal.c)

View File

@ -15,8 +15,48 @@
#include "streamMsg.h"
#include "os.h"
#include "tstream.h"
#include "streamInt.h"
#include "tcommon.h"
typedef struct STaskId {
int64_t streamId;
int64_t taskId;
} STaskId;
typedef struct STaskCkptInfo {
int64_t latestId; // saved checkpoint id
int64_t latestVer; // saved checkpoint ver
int64_t latestTime; // latest checkpoint time
int64_t latestSize; // latest checkpoint size
int8_t remoteBackup; // latest checkpoint backup done
int64_t activeId; // current active checkpoint id
int32_t activeTransId; // checkpoint trans id
int8_t failed; // denote if the checkpoint is failed or not
int8_t consensusChkptId; // required the consensus-checkpointId
int64_t consensusTs; //
} STaskCkptInfo;
typedef struct STaskStatusEntry {
STaskId id;
int32_t status;
int32_t statusLastDuration; // to record the last duration of current status
int64_t stage;
int32_t nodeId;
SVersionRange verRange; // start/end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task
double inputQUsed; // in MiB
double inputRate;
double procsThroughput; // duration between one element put into input queue and being processed.
double procsTotal; // duration between one element put into input queue and being processed.
double outputThroughput; // the size of dispatched result blocks in bytes
double outputTotal; // the size of dispatched result blocks in bytes
double sinkQuota; // existed quota size for sink task
double sinkDataSize; // sink to dst data size
int64_t startTime;
int64_t startCheckpointId;
int64_t startCheckpointVer;
int64_t hTaskId;
STaskCkptInfo checkpointInfo;
} STaskStatusEntry;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId));
@ -289,7 +329,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen));
if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) {
stError("invalid dispatch req msg");
uError("invalid dispatch req msg");
TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
}
@ -605,173 +645,92 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
pMsg->numOfTasks = -1;
}
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
int32_t taskId = pTask->hTaskInfo.id.taskId;
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
taskId = pTask->streamTaskId.taskId;
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));
int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz));
for (int32_t i = 0; i < epSz; i++) {
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo));
}
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
}
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
}
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->msgId));
tEndEncode(pEncoder);
_exit:
return code;
}
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
int32_t taskId = 0;
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
}
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->msgId));
tEndDecode(pDecoder);
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));
_exit:
return code;
}
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));
int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->downstreamTaskId));
tEndEncode(pEncoder);
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));
_exit:
return code;
}
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
pTask->hTaskInfo.id.taskId = taskId;
int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
pTask->streamTaskId.taskId = taskId;
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->downstreamTaskId));
tEndDecode(pDecoder);
TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));
_exit:
return code;
}
int32_t epSz = -1;
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0);
int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp) {
int32_t code = 0;
int32_t lino;
if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
for (int32_t i = 0; i < epSz; i++) {
SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
if (pInfo == NULL) {
TAOS_CHECK_EXIT(terrno);
}
if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
taosMemoryFreeClear(pInfo);
goto _exit;
}
if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
}
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->checkpointId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->taskId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->transId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->rspCode));
tEndEncode(pEncoder);
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
}
_exit:
return code;
}
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
TAOS_CHECK_EXIT(terrno);
}
TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
}
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
}
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->taskId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->transId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->rspCode));
tEndDecode(pDecoder);
_exit:
@ -830,11 +789,7 @@ int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpoin
tEndEncode(pEncoder);
_exit:
if (code) {
return code;
} else {
return pEncoder->pos;
}
return code;
}
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) {
@ -853,3 +808,31 @@ int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo*
_exit:
return code;
}
int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));
tEndEncode(pEncoder);
_exit:
return code;
}
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));
tEndDecode(pDecoder);
_exit:
return code;
}
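
The encode/decode pairs in this file all follow one pattern: write the fields in a fixed order, read them back in the same order, and return 0 on success (the change to `tEncodeRestoreCheckpointInfo` above also drops the old "return the encoder position" behavior in favor of a plain status code). The sketch below reproduces that pattern for the three fields of `SStreamTaskRunReq` with a toy buffer. `ToyBuf`, `ToyRunReq`, and the `toy_*` functions are hypothetical stand-ins, not TDengine's `SEncoder`/`SDecoder`, and the toy format uses native byte order with no framing.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical fixed-size buffer standing in for SEncoder/SDecoder. */
typedef struct {
  uint8_t data[64];
  size_t  pos; /* current read/write offset */
  size_t  len; /* number of valid bytes after encoding */
} ToyBuf;

/* Same three fields, in the same order, as the run-request codec above. */
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int32_t reqType;
} ToyRunReq;

static int toy_write(ToyBuf *b, const void *src, size_t n) {
  if (b->pos + n > sizeof b->data) return -1; /* buffer full */
  memcpy(b->data + b->pos, src, n);
  b->pos += n;
  return 0;
}

static int toy_read(ToyBuf *b, void *dst, size_t n) {
  if (b->pos + n > b->len) return -1; /* truncated message */
  memcpy(dst, b->data + b->pos, n);
  b->pos += n;
  return 0;
}

/* Returns 0 on success, non-zero on failure; never a byte count. */
int toy_encode_run_req(ToyBuf *b, const ToyRunReq *r) {
  int code = 0;
  b->pos = 0;
  if ((code = toy_write(b, &r->streamId, sizeof r->streamId)) != 0) goto done;
  if ((code = toy_write(b, &r->taskId, sizeof r->taskId)) != 0) goto done;
  if ((code = toy_write(b, &r->reqType, sizeof r->reqType)) != 0) goto done;
  b->len = b->pos;
done:
  return code;
}

/* Fields must be read in exactly the order they were written. */
int toy_decode_run_req(ToyBuf *b, ToyRunReq *r) {
  int code = 0;
  b->pos = 0;
  if ((code = toy_read(b, &r->streamId, sizeof r->streamId)) != 0) goto done;
  if ((code = toy_read(b, &r->taskId, sizeof r->taskId)) != 0) goto done;
  if ((code = toy_read(b, &r->reqType, sizeof r->reqType)) != 0) goto done;
done:
  return code;
}
```

Because callers only test the result against zero, a caller that needs the encoded size reads it from the buffer (`len` here) instead of from the return value.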

View File

@ -217,6 +217,8 @@ float tsSelectivityRatio = 1.0;
int32_t tsTagFilterResCacheSize = 1024 * 10;
char tsTagFilterCache = 0;
int32_t tsBypassFlag = 0;
// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
// 0 no query allowed, queries are disabled
@ -612,6 +614,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorScope", tsRandErrScope, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "safetyCheckLevel", tsSafetyCheckLevel, 0, 5, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "bypassFlag", tsBypassFlag, 0, INT32_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, TSDB_MAX_RPC_THREADS);
@ -1303,6 +1306,10 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "safetyCheckLevel");
tsSafetyCheckLevel = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "bypassFlag");
tsBypassFlag = pItem->i32;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -2046,7 +2053,8 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"supportVnodes", &tsNumOfSupportVnodes},
{"experimental", &tsExperimental},
{"maxTsmaNum", &tsMaxTsmaNum},
{"safetyCheckLevel", &tsSafetyCheckLevel}};
{"safetyCheckLevel", &tsSafetyCheckLevel},
{"bypassFlag", &tsBypassFlag}};
if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);
@ -2302,7 +2310,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
{"multiResultFunctionStarReturnTags", &tsMultiResultFunctionStarReturnTags},
{"maxTsmaCalcDelay", &tsMaxTsmaCalcDelay},
{"tsmaDataDeleteMark", &tsmaDataDeleteMark},
{"safetyCheckLevel", &tsSafetyCheckLevel}};
{"safetyCheckLevel", &tsSafetyCheckLevel},
{"bypassFlag", &tsBypassFlag}};
if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {
code = taosCfgSetOption(options, tListLen(options), pItem, false);

View File

@ -46,7 +46,7 @@ if (${TD_LINUX})
target_sources(tmsgTest
PRIVATE
"tmsgTest.cpp"
"../src/tmsg.c"
"../src/msg/tmsg.c"
)
target_include_directories(tmsgTest PUBLIC "${TD_SOURCE_DIR}/include/common/")
target_link_libraries(tmsgTest PUBLIC os util gtest gtest_main)

View File

@ -214,8 +214,6 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
} else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
(!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
code = pRpc->code;
goto _OVER;
}
if (pHandle->defaultNtype == NODE_END) {

View File

@ -56,6 +56,7 @@ typedef struct SStreamTransMgmt {
typedef struct SStreamTaskResetMsg {
int64_t streamId;
int32_t transId;
int64_t checkpointId;
} SStreamTaskResetMsg;
typedef struct SChkptReportInfo {
@ -142,9 +143,9 @@ int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pSt
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId);
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream);
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId);
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts);
int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream);
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,

View File

@ -2434,7 +2434,12 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC
mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
pReport->taskId, p->checkpointId, pReport->checkpointId);
memcpy(p, pReport, sizeof(STaskChkptInfo));
// update the checkpoint report info
p->checkpointId = pReport->checkpointId;
p->ts = pReport->checkpointTs;
p->version = pReport->checkpointVer;
p->transId = pReport->transId;
p->dropHTask = pReport->dropHTask;
} else {
mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
}
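
The hunk above replaces a `memcpy` of `sizeof(STaskChkptInfo)` bytes with field-by-field assignments that also rename fields (`checkpointTs` to `ts`, `checkpointVer` to `version`). One plausible reading, which this excerpt does not confirm, is that the report and the list entry are different struct types, in which case a raw byte copy would scramble fields. The sketch below illustrates that situation with hypothetical stand-in types `DemoReport` and `DemoChkptInfo`; their layouts are invented for the example.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical source type: field names follow the right-hand sides above. */
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int32_t transId;
  int8_t  dropHTask;
  int64_t checkpointId;
  int64_t checkpointVer;
  int64_t checkpointTs;
} DemoReport;

/* Hypothetical destination type with a different field order and names. */
typedef struct {
  int64_t streamId;
  int32_t taskId;
  int64_t checkpointId;
  int64_t version;
  int64_t ts;
  int32_t transId;
  int8_t  dropHTask;
} DemoChkptInfo;

/* Copy only the fields that describe the newer checkpoint, by name, so the
 * result does not depend on how either struct is laid out in memory. */
void demo_update_entry(DemoChkptInfo *p, const DemoReport *r) {
  assert(p != 0 && r != 0);
  p->checkpointId = r->checkpointId;
  p->ts           = r->checkpointTs;
  p->version      = r->checkpointVer;
  p->transId      = r->transId;
  p->dropHTask    = r->dropHTask;
}
```

Assigning by name keeps the identifying fields of the existing entry (`streamId`, `taskId`) untouched and stays correct if either struct later gains or reorders members.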

View File

@ -0,0 +1,72 @@
#include "mndTrans.h"
uint32_t seed = 0;
static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) {
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
if (rpcMsg.pCont == NULL) {
return rpcMsg;
}
rpcMsg.info.traceId.rootId = traceId;
rpcMsg.info.notFreeAhandle = 1;
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
return rpcMsg;
}
void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) {
if ((pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) ||
(pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) ||
(pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2)) {
if (seed == 0) {
seed = taosGetTimestampSec();
}
uint32_t v = taosRandR(&seed);
int32_t choseItem = v % 5;
if (choseItem == 0) {
// 1. one of the update-checkpoint msgs is not sent; restart and send it again
taosMsleep(5000);
if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) {
mError(
"***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will "
"rollback***");
exit(-1);
} else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) { // pAction->msgType == TDMT_STREAM_CONSEN_CHKPT
mError(
"***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will "
"not started***");
} else { // pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE
mError(
"***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will "
"started after restart***");
exit(-1);
}
} else if (choseItem == 1) {
// 2. repeat send update chkpt msg
mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***");
mError("***repeat 1***");
SRpcMsg rpcMsg1 = createRpcMsg(pAction, pTrans->mTraceId, signature);
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg1);
mError("***repeat 2***");
SRpcMsg rpcMsg2 = createRpcMsg(pAction, pTrans->mTraceId, signature);
code = tmsgSendReq(&pAction->epSet, &rpcMsg2);
mError("***repeat 3***");
SRpcMsg rpcMsg3 = createRpcMsg(pAction, pTrans->mTraceId, signature);
code = tmsgSendReq(&pAction->epSet, &rpcMsg3);
} else if (choseItem == 2) {
// 3. sleep 30s and then send msg
mError("***idle for 30s, and then send msg***");
taosMsleep(30000);
} else {
// do nothing
// mInfo("no error triggered");
}
}
}

View File

@ -24,7 +24,7 @@ typedef struct SFailedCheckpointInfo {
static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode);
static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList);
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId);
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId);
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage);
static void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
@ -68,7 +68,7 @@ void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
}
}
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId) {
STrans *pTrans = NULL;
int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME,
" reset from failed checkpoint", &pTrans);
@ -84,7 +84,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
return code;
}
code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream);
code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream, chkptId);
if (code) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
@ -115,7 +115,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
return code;
}
int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId) {
int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId) {
int32_t size = sizeof(SStreamTaskResetMsg);
int32_t num = taosArrayGetSize(execInfo.pKilledChkptTrans);
@ -135,8 +135,9 @@ int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t
taosArrayRemove(execInfo.pKilledChkptTrans, 0); // remove this first, append new reset trans in the tail
}
SStreamTaskResetMsg p = {.streamId = streamId, .transId = transId};
SStreamTaskResetMsg p = {.streamId = streamId, .transId = transId, .checkpointId = checkpointId};
// let's remember that this trans had been killed already
void *px = taosArrayPush(execInfo.pKilledChkptTrans, &p);
if (px == NULL) {
mError("failed to push reset-msg trans:%d into the killed chkpt trans list, size:%d", transId, num - 1);
@ -150,6 +151,7 @@ int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t
pReq->streamId = streamId;
pReq->transId = transId;
pReq->checkpointId = checkpointId;
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_TASK_RESET, .pCont = pReq, .contLen = size};
int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
@ -234,7 +236,7 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) {
} else {
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
pStream->uid, pMsg->transId);
code = mndCreateStreamResetStatusTrans(pMnode, pStream);
code = mndCreateStreamResetStatusTrans(pMnode, pStream, pMsg->checkpointId);
}
}
@ -379,9 +381,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) {
mError("vgId:%d HbMsgId:%d already handled, bh msg discard", pEntry->nodeId, req.msgId);
mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId);
terrno = TSDB_CODE_INVALID_MSG;
// respond directly so that the vnode can continue to send the next HbMsg.
terrno = TSDB_CODE_SUCCESS;
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
streamMutexUnlock(&execInfo.lock);
@ -495,10 +498,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
continue;
}
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
pInfo->checkpointId, pInfo->transId);
mInfo("stream:0x%" PRIx64 " checkpointId:%" PRId64
" transId:%d failed issue task-reset trans to reset all tasks status",
pInfo->streamUid, pInfo->checkpointId, pInfo->transId);
code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId);
code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId, pInfo->checkpointId);
if (code) {
mError("failed to create reset task trans, code:%s", tstrerror(code));
}
@ -549,12 +553,37 @@ void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArr
}
void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) {
SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SMStreamHbRspMsg)};
rsp.pCont = rpcMallocCont(rsp.contLen);
int32_t ret = 0;
int32_t tlen = 0;
void *buf = NULL;
SMStreamHbRspMsg *pMsg = rsp.pCont;
pMsg->head.vgId = htonl(vgId);
pMsg->msgId = msgId;
const SMStreamHbRspMsg msg = {.msgId = msgId};
tEncodeSize(tEncodeStreamHbRsp, &msg, tlen, ret);
if (ret < 0) {
mError("encode stream hb msg rsp failed, code:%s", tstrerror(ret));
}
buf = rpcMallocCont(tlen + sizeof(SMsgHead));
if (buf == NULL) {
mError("encode stream hb msg rsp failed, code:%s", tstrerror(terrno));
return;
}
((SMStreamHbRspMsg*)buf)->head.vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamHbRsp(&encoder, &msg)) < 0) {
rpcFreeCont(buf);
tEncoderClear(&encoder);
mError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
return;
}
tEncoderClear(&encoder);
SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = tlen + sizeof(SMsgHead), .pCont = buf};
tmsgSendRsp(&rsp);
pRpcInfo->handle = NULL; // disable auto rsp
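
The hunk above replaces a raw struct copy with a payload that is sized first, allocated together with a fixed message head, and then encoded behind that head. The receiver shifts past the head before decoding. The following is a minimal, self-contained sketch of that layout only. Every `Demo*`/`demo*` name is hypothetical, and the 4-byte big-endian encoding is an assumption made for illustration; it is not the wire format produced by `tEncodeStreamHbRsp`.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-ins; these are NOT the TDengine types. */
typedef struct { int32_t vgId; } DemoMsgHead; /* fixed head, stored big-endian */
typedef struct { int32_t msgId; } DemoHbRsp;  /* payload that gets encoded */

static void putU32BE(uint8_t *p, uint32_t v) {
  p[0] = (uint8_t)(v >> 24);
  p[1] = (uint8_t)(v >> 16);
  p[2] = (uint8_t)(v >> 8);
  p[3] = (uint8_t)v;
}

static uint32_t getU32BE(const uint8_t *p) {
  return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) | ((uint32_t)p[2] << 8) | (uint32_t)p[3];
}

/* With buf == NULL this is the sizing pass: only the required length is returned. */
static int32_t demoEncodeHbRsp(uint8_t *buf, int32_t cap, const DemoHbRsp *rsp) {
  const int32_t need = 4;
  if (buf == NULL) return need;
  if (cap < need) return -1;
  putU32BE(buf, (uint32_t)rsp->msgId);
  return need;
}

static int32_t demoDecodeHbRsp(const uint8_t *buf, int32_t len, DemoHbRsp *rsp) {
  if (len < 4) return -1;
  rsp->msgId = (int32_t)getU32BE(buf);
  return 0;
}

/* Allocate head + encoded body in one buffer; the caller frees it. */
static uint8_t *demoBuildRsp(int32_t vgId, int32_t msgId, int32_t *outLen) {
  assert(outLen != NULL);
  DemoHbRsp rsp = {.msgId = msgId};

  int32_t tlen = demoEncodeHbRsp(NULL, 0, &rsp); /* pass 1: size only */
  if (tlen < 0) return NULL;

  int32_t  total = (int32_t)sizeof(DemoMsgHead) + tlen;
  uint8_t *buf = malloc((size_t)total);
  if (buf == NULL) return NULL;

  putU32BE(buf, (uint32_t)vgId); /* head in network byte order */
  if (demoEncodeHbRsp(buf + sizeof(DemoMsgHead), tlen, &rsp) < 0) { /* pass 2: encode after head */
    free(buf); /* release the buffer on every failure path */
    return NULL;
  }
  *outLen = total;
  return buf;
}

/* Receiver side: skip the head, then decode the remaining bytes. */
static int32_t demoParseRsp(const uint8_t *buf, int32_t len, int32_t *vgId, DemoHbRsp *rsp) {
  if (len < (int32_t)sizeof(DemoMsgHead)) return -1;
  *vgId = (int32_t)getU32BE(buf);
  return demoDecodeHbRsp(buf + sizeof(DemoMsgHead), len - (int32_t)sizeof(DemoMsgHead), rsp);
}

/* Round trip: returns 0 when the decoded fields match what was encoded. */
int demoRoundTrip(void) {
  int32_t  len = 0;
  uint8_t *buf = demoBuildRsp(3, 42, &len);
  if (buf == NULL) return -1;

  int32_t   vgId = 0;
  DemoHbRsp out = {0};
  int32_t   rc = demoParseRsp(buf, len, &vgId, &out);
  free(buf);
  return (rc == 0 && vgId == 3 && out.msgId == 42) ? 0 : -1;
}
```

Sizing before allocating keeps the buffer exact, and a length-checked decode lets the receiver reject short or malformed messages instead of reading a struct through a cast pointer.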

View File

@ -295,7 +295,7 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas
return code;
}
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t chkptId) {
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
@ -306,6 +306,7 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
pReq->chkptId = chkptId;
SEpSet epset = {0};
bool hasEpset = false;
@ -544,7 +545,7 @@ int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* p
return 0;
}
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId) {
SStreamTaskIter *pIter = NULL;
taosWLockLatch(&pStream->lock);
@ -564,7 +565,7 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
return code;
}
code = doSetResetAction(pMnode, pTrans, pTask);
code = doSetResetAction(pMnode, pTrans, pTask, chkptId);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
@ -606,7 +607,7 @@ int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* p
tEncoderInit(&encoder, abuf, tlen);
code = tEncodeRestoreCheckpointInfo(&encoder, &req);
tEncoderClear(&encoder);
if (code == -1) {
if (code < 0) {
taosMemoryFree(pBuf);
return code;
}

View File

@ -1521,74 +1521,4 @@ int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
mError("snode not existed when trying to create stream in db with multiple replica");
return TSDB_CODE_SNODE_NOT_DEPLOYED;
}
}
uint32_t seed = 0;
static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) {
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
if (rpcMsg.pCont == NULL) {
return rpcMsg;
}
rpcMsg.info.traceId.rootId = traceId;
rpcMsg.info.notFreeAhandle = 1;
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
return rpcMsg;
}
void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) {
if ((pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) ||
(pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) ||
(pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2)) {
if (seed == 0) {
seed = taosGetTimestampSec();
}
uint32_t v = taosRandR(&seed);
int32_t choseItem = v % 5;
if (choseItem == 0) {
// 1. one of update-checkpoint not send, restart and send it again
taosMsleep(5000);
if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) {
mError(
"***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will "
"rollback***");
exit(-1);
} else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) { // pAction->msgType == TDMT_STREAM_CONSEN_CHKPT
mError(
"***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will "
"not started***");
} else { // pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE
mError(
"***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will "
"started after restart***");
exit(-1);
}
} else if (choseItem == 1) {
// 2. repeat send update chkpt msg
mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***");
mError("***repeat 1***");
SRpcMsg rpcMsg1 = createRpcMsg(pAction, pTrans->mTraceId, signature);
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg1);
mError("***repeat 2***");
SRpcMsg rpcMsg2 = createRpcMsg(pAction, pTrans->mTraceId, signature);
code = tmsgSendReq(&pAction->epSet, &rpcMsg2);
mError("***repeat 3***");
SRpcMsg rpcMsg3 = createRpcMsg(pAction, pTrans->mTraceId, signature);
code = tmsgSendReq(&pAction->epSet, &rpcMsg3);
} else if (choseItem == 2) {
// 3. sleep 40s and then send msg
mError("***idle for 30s, and then send msg***");
taosMsleep(30000);
} else {
// do nothing
// mInfo("no error triggered");
}
}
}

View File

@ -246,7 +246,7 @@ TEST_F(StreamTest, kill_checkpoint_trans) {
px = taosArrayPush(pStream->tasks, &pLevel);
ASSERT(px != NULL);
code = mndCreateStreamResetStatusTrans(pMnode, pStream);
code = mndCreateStreamResetStatusTrans(pMnode, pStream, 1);
ASSERT(code != 0);
tFreeStreamObj(pStream);

View File

@ -1009,21 +1009,34 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t code = 0;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SDecoder decoder;
SStreamTaskRunReq req = {0};
tDecoderInit(&decoder, (uint8_t*)msg, len);
if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) {
tqError("vgId:%d failed to decode task run req, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
tDecoderClear(&decoder);
return TSDB_CODE_SUCCESS;
}
tDecoderClear(&decoder);
// extracted submit data from wal files for all tasks
if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
return tqScanWal(pTq);
}
int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
if (code) {
tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}
// let's continue scan data in the wal files
if (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK) {
if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) {
code = tqScanWalAsync(pTq, false); // it's ok to failed
if (code) {
tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
@ -1297,7 +1310,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
SStreamCheckpointReadyMsg* pReq = (SStreamCheckpointReadyMsg*)pMsg->pCont;
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
(int32_t)pReq->downstreamTaskId);
@ -1318,10 +1331,23 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
(int32_t)pReq->downstreamTaskId);
SRetrieveChkptTriggerReq req = {0};
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) {
tDecoderClear(&decoder);
tqError("vgId:%d invalid retrieve checkpoint-trigger req received", vgId);
return TSDB_CODE_INVALID_MSG;
}
tDecoderClear(&decoder);
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from s-task:0x%" PRId64, vgId,
req.downstreamTaskId);
return TSDB_CODE_STREAM_NOT_LEADER;
}

View File

@ -828,14 +828,25 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
}
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t code = 0;
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SDecoder decoder;
int32_t type = pReq->reqType;
int32_t vgId = pMeta->vgId;
int32_t code = 0;
SStreamTaskRunReq req = {0};
tDecoderInit(&decoder, (uint8_t*)msg, len);
if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) {
tqError("vgId:%d failed to decode task run req, code:%s", pMeta->vgId, tstrerror(code));
tDecoderClear(&decoder);
return TSDB_CODE_SUCCESS;
}
tDecoderClear(&decoder);
int32_t type = req.reqType;
if (type == STREAM_EXEC_T_START_ONE_TASK) {
code = streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId);
code = streamMetaStartOneTask(pMeta, req.streamId, req.taskId);
return 0;
} else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
code = streamMetaStartAllTasks(pMeta);
@ -847,11 +858,11 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
code = streamMetaStopAllTasks(pMeta);
return 0;
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
return code;
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
if (pTask != NULL && (code == 0)) {
char* pStatus = NULL;
@ -873,7 +884,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
}
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
if ((pTask != NULL) && (code == 0)) { // even in halt status, the data in inputQ must be processed
char* p = NULL;
if (streamTaskReadyToRun(pTask, &p)) {
@ -890,7 +901,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
return 0;
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
// todo add one function to handle this
tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, pReq->taskId);
tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId);
return code;
}
}
@ -939,7 +950,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
}
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
SVResetStreamTaskReq* pReq = (SVResetStreamTaskReq*)pMsg;
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
@ -954,17 +965,13 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
streamMutexLock(&pTask->lock);
streamTaskClearCheckInfo(pTask, true);
streamTaskSetFailedCheckpointId(pTask, pReq->chkptId);
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state == TASK_STATUS__CK) {
int32_t tranId = 0;
int64_t activeChkId = 0;
streamTaskGetActiveCheckpointInfo(pTask, &tranId, &activeChkId);
tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d",
pTask->id.idStr, activeChkId, tranId);
streamTaskSetStatusReady(pTask);
tqDebug("s-task:%s reset checkpoint status to ready", pTask->id.idStr);
} else if (pState.state == TASK_STATUS__UNINIT) {
// tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
// tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId);
@ -980,25 +987,36 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
}
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
SRetrieveChkptTriggerReq req = {0};
SStreamTask* pTask = NULL;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SDecoder decoder = {0};
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId, &pTask);
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) {
tDecoderClear(&decoder);
tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId);
return TSDB_CODE_INVALID_MSG;
}
tDecoderClear(&decoder);
int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
if (pTask == NULL || (code != 0)) {
tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64
tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64
" from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->checkpointId, (int32_t)pReq->downstreamTaskId, pReq->upstreamTaskId);
pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
pReq->upstreamTaskId, (int32_t)pReq->downstreamTaskId, pReq->checkpointId);
req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId);
if (pTask->status.downstreamReady != 1) {
tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
pTask->id.idStr, (int32_t)pReq->downstreamTaskId);
pTask->id.idStr, (int32_t)req.downstreamTaskId);
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
TSDB_CODE_STREAM_TASK_IVLD_STATUS);
streamMetaReleaseTask(pMeta, pTask);
return code;
@ -1010,19 +1028,19 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
int64_t checkpointId = 0;
streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId);
if (checkpointId != pReq->checkpointId) {
if (checkpointId != req.checkpointId) {
tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64
" req:%" PRId64,
pTask->id.idStr, pReq->downstreamTaskId, checkpointId, pReq->checkpointId);
pTask->id.idStr, req.downstreamTaskId, checkpointId, req.checkpointId);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_INVALID_MSG;
}
if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) {
if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) {
// re-send the lost checkpoint-trigger msg to downstream task
tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
(int32_t)pReq->downstreamTaskId, checkpointId, transId);
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
(int32_t)req.downstreamTaskId, checkpointId, transId);
code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
TSDB_CODE_SUCCESS);
} else { // not send checkpoint-trigger yet, wait
int32_t recv = 0, total = 0;
@ -1036,7 +1054,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
"sending checkpoint-source/trigger",
pTask->id.idStr, recv, total);
}
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
TSDB_CODE_ACTION_IN_PROGRESS);
}
} else { // upstream not recv the checkpoint-source/trigger till now
@ -1048,7 +1066,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
"s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
"upstream sending checkpoint-source/trigger",
pTask->id.idStr);
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
TSDB_CODE_ACTION_IN_PROGRESS);
}
@ -1057,23 +1075,34 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
}
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SCheckpointTriggerRsp rsp = {0};
SStreamTask* pTask = NULL;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SDecoder decoder = {0};
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId, &pTask);
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeCheckpointTriggerRsp(&decoder, &rsp) < 0) {
tDecoderClear(&decoder);
tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId);
return TSDB_CODE_INVALID_MSG;
}
tDecoderClear(&decoder);
int32_t code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.taskId, &pTask);
if (pTask == NULL || (code != 0)) {
tqError(
"vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pRsp->taskId);
pMeta->vgId, rsp.taskId);
return code;
}
tqDebug(
"s-task:%s recv re-send checkpoint-trigger msg from through retrieve/rsp channel, upstream:0x%x, "
"checkpointId:%" PRId64 ", transId:%d",
pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId);
"s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64
", transId:%d",
pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId);
code = streamTaskProcessCheckpointTriggerRsp(pTask, pRsp);
code = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
@ -1203,7 +1232,23 @@ int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
}
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return streamProcessHeartbeatRsp(pMeta, pMsg->pCont);
SMStreamHbRspMsg rsp = {0};
int32_t code = 0;
SDecoder decoder;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
tDecoderInit(&decoder, (uint8_t*)msg, len);
code = tDecodeStreamHbRsp(&decoder, &rsp);
if (code < 0) {
terrno = TSDB_CODE_INVALID_MSG;
tDecoderClear(&decoder);
tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno));
return terrno;
}
tDecoderClear(&decoder);
return streamProcessHeartbeatRsp(pMeta, &rsp);
}
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
@ -1237,7 +1282,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SRestoreCheckpointInfo req = {0};
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) {
if ((code = tDecodeRestoreCheckpointInfo(&decoder, &req)) < 0) {
tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
tDecoderClear(&decoder);
return TSDB_CODE_SUCCESS;

View File

@ -667,7 +667,7 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
int64_t nRow = imem->nRow;
int64_t nDel = imem->nDel;
if (nRow == 0 && nDel == 0) {
if ((nRow == 0 && nDel == 0) || (tsBypassFlag & TSDB_BYPASS_RB_TSDB_COMMIT)) {
(void)taosThreadMutexLock(&tsdb->mutex);
tsdb->imem = NULL;
(void)taosThreadMutexUnlock(&tsdb->mutex);

View File

@ -122,6 +122,10 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmi
tb_uid_t suid = pSubmitTbData->suid;
tb_uid_t uid = pSubmitTbData->uid;
if (tsBypassFlag & TSDB_BYPASS_RB_TSDB_WRITE_MEM) {
goto _err;
}
// create/get STbData to op
code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
if (code) {

View File

@ -362,6 +362,10 @@ static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
int32_t lino = 0;
if (tsBypassFlag & TSDB_BYPASS_RA_RPC_RECV_SUBMIT) {
return TSDB_CODE_MSG_PREPROCESSED;
}
SDecoder *pCoder = &(SDecoder){0};
if (taosHton64(((SSubmitReq2Msg *)pMsg->pCont)->version) != 1) {

View File

@ -3465,11 +3465,6 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
goto _end;
}
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
if (!pUpInfo) {
lino = __LINE__;
goto _end;
}
SDecoder decoder = {0};
pDeCoder = &decoder;
tDecoderInit(pDeCoder, buf, tlen);
@ -3478,14 +3473,21 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
goto _end;
}
code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo);
if (code == TSDB_CODE_SUCCESS) {
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
pInfo->pUpdateInfo = pUpInfo;
} else {
taosMemoryFree(pUpInfo);
lino = __LINE__;
goto _end;
if (pInfo->pUpdateInfo != NULL) {
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
if (!pUpInfo) {
lino = __LINE__;
goto _end;
}
code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo);
if (code == TSDB_CODE_SUCCESS) {
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
pInfo->pUpdateInfo = pUpInfo;
} else {
taosMemoryFree(pUpInfo);
lino = __LINE__;
goto _end;
}
}
if (tDecodeIsEnd(pDeCoder)) {

View File

@ -1345,30 +1345,19 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
#if 1
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL;
SCH_ERR_JRET(code);
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
}
#else
if (TDMT_VND_SUBMIT != msgType) {
if ((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (TDMT_VND_SUBMIT == msgType)) {
taosMemoryFree(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
} else {
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL;
SCH_ERR_JRET(code);
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
}
} else {
taosMemoryFree(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
}
#endif
return TSDB_CODE_SUCCESS;
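
The hunks above gate four points of the submit path on bits of `tsBypassFlag`: `TSDB_BYPASS_RB_RPC_SEND_SUBMIT` in the scheduler, `TSDB_BYPASS_RA_RPC_RECV_SUBMIT` in the vnode pre-process step, `TSDB_BYPASS_RB_TSDB_WRITE_MEM` in the memtable insert, and `TSDB_BYPASS_RB_TSDB_COMMIT` at commit begin. A minimal sketch of how such a bitmask picks the first short-circuited stage follows. The `DEMO_*` names and bit values are hypothetical, since the real constants are not shown in this diff, and the stage order is inferred from where the checks sit.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical bit values; the real TSDB_BYPASS_* constants are defined elsewhere. */
enum {
  DEMO_BYPASS_RPC_SEND_SUBMIT = 1 << 0, /* client: skip sending the submit */
  DEMO_BYPASS_RPC_RECV_SUBMIT = 1 << 1, /* vnode: skip pre-processing the submit */
  DEMO_BYPASS_TSDB_WRITE_MEM  = 1 << 2, /* tsdb: skip the memtable insert */
  DEMO_BYPASS_TSDB_COMMIT     = 1 << 3  /* tsdb: drop the memtable instead of committing */
};

typedef enum {
  DEMO_STAGE_SEND,
  DEMO_STAGE_RECV,
  DEMO_STAGE_WRITE_MEM,
  DEMO_STAGE_COMMIT,
  DEMO_STAGE_NONE /* nothing bypassed: the write runs end to end */
} DemoStage;

/* Walk the stages in pipeline order and report the first one that is bypassed. */
DemoStage demoFirstBypassedStage(int32_t flags) {
  if (flags & DEMO_BYPASS_RPC_SEND_SUBMIT) return DEMO_STAGE_SEND;
  if (flags & DEMO_BYPASS_RPC_RECV_SUBMIT) return DEMO_STAGE_RECV;
  if (flags & DEMO_BYPASS_TSDB_WRITE_MEM) return DEMO_STAGE_WRITE_MEM;
  if (flags & DEMO_BYPASS_TSDB_COMMIT) return DEMO_STAGE_COMMIT;
  return DEMO_STAGE_NONE;
}

/* Small self-check covering the no-flag case and a combined mask. */
void demoSelfCheck(void) {
  assert(demoFirstBypassedStage(0) == DEMO_STAGE_NONE);
  assert(demoFirstBypassedStage(DEMO_BYPASS_TSDB_COMMIT | DEMO_BYPASS_TSDB_WRITE_MEM) == DEMO_STAGE_WRITE_MEM);
}
```

Because each stage tests its own bit, several bits can be set at once, and the earliest stage in the pipeline determines how far a write gets.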

View File

@ -192,7 +192,6 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
void streamTaskSetFailedCheckpointId(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
@ -245,6 +244,9 @@ int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger);
int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval,
STimeWindow* pLatestWindow, const char* id);
// inject stream errors
void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId);
#ifdef __cplusplus
}
#endif

View File

@ -161,33 +161,52 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
SRpcHandleInfo* pRpcInfo, int32_t code) {
int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
void* pBuf = rpcMallocCont(size);
if (pBuf == NULL) {
int32_t ret = 0;
int32_t tlen = 0;
void* buf = NULL;
SEncoder encoder;
SCheckpointTriggerRsp req = {.streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId,
.taskId = dstTaskId,
.rspCode = code};
if (code == TSDB_CODE_SUCCESS) {
req.checkpointId = pTask->chkInfo.pActiveInfo->activeId;
req.transId = pTask->chkInfo.pActiveInfo->transId;
} else {
req.checkpointId = -1;
req.transId = -1;
}
tEncodeSize(tEncodeCheckpointTriggerRsp, &req, tlen, ret);
if (ret < 0) {
stError("s-task:%s encode checkpoint-trigger rsp msg failed, code:%s", pTask->id.idStr, tstrerror(ret));
return ret;
}
buf = rpcMallocCont(tlen + sizeof(SMsgHead));
if (buf == NULL) {
stError("s-task:%s malloc chkpt-trigger rsp failed for task:0x%x, since out of memory", pTask->id.idStr, dstTaskId);
return terrno;
}
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
((SMsgHead*)buf)->vgId = htonl(downstreamNodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId);
pRsp->streamId = pTask->id.streamId;
pRsp->upstreamTaskId = pTask->id.taskId;
pRsp->taskId = dstTaskId;
pRsp->rspCode = code;
if (code == TSDB_CODE_SUCCESS) {
pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
pRsp->transId = pTask->chkInfo.pActiveInfo->transId;
} else {
pRsp->checkpointId = -1;
pRsp->transId = -1;
tEncoderInit(&encoder, abuf, tlen);
if ((ret = tEncodeCheckpointTriggerRsp(&encoder, &req)) < 0) {
rpcFreeCont(buf);
tEncoderClear(&encoder);
stError("encode checkpoint-trigger rsp failed, code:%s", tstrerror(ret));
return ret;
}
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo};
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = tlen + sizeof(SMsgHead), .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
return 0;
return ret;
}
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
@ -222,14 +241,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
stError("s-task:%s vgId:%d current checkpointId:%" PRId64
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
return code;
return TSDB_CODE_STREAM_INVLD_CHKPT;
}
if (pActiveInfo->failedId >= checkpointId) {
stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
" discard the checkpoint-trigger block",
id, vgId, checkpointId, transId, pActiveInfo->failedId);
return code;
return TSDB_CODE_STREAM_INVLD_CHKPT;
}
if (pTask->chkInfo.checkpointId == checkpointId) {
@ -255,8 +274,7 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
"the interrupted checkpoint",
id, vgId, pBlock->srcTaskId);
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
return code;
return TSDB_CODE_STREAM_INVLD_CHKPT;
}
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
@ -264,14 +282,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
" discard",
id, vgId, pActiveInfo->activeId, checkpointId);
return code;
return TSDB_CODE_STREAM_INVLD_CHKPT;
} else { // checkpointId == pActiveInfo->activeId
if (pActiveInfo->allUpstreamTriggerRecv == 1) {
stDebug(
"s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
"checkpointId:%" PRId64 " transId:%d",
id, vgId, checkpointId, transId);
return code;
return TSDB_CODE_STREAM_INVLD_CHKPT;
}
if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
@ -283,17 +301,17 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
}
if (p->upstreamTaskId == pBlock->srcTaskId) {
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
stWarn("s-task:%s repeatedly recv checkpoint-trigger msg from task:0x%x vgId:%d, checkpointId:%" PRId64
", prev recvTs:%" PRId64 " discard",
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
return code;
return TSDB_CODE_STREAM_INVLD_CHKPT;
}
}
}
}
}
return 0;
return TSDB_CODE_SUCCESS;
}
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
@ -317,6 +335,9 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
streamMutexUnlock(&pTask->lock);
if (code) {
if (taskLevel != TASK_LEVEL__SOURCE) { // the checkpoint-trigger is discarded, open the inputQ for upstream tasks
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
}
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
@ -330,6 +351,11 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
pActiveInfo->activeId = checkpointId;
pActiveInfo->transId = transId;
if (pTask->chkInfo.startTs == 0) {
pTask->chkInfo.startTs = taosGetTimestampMs();
pTask->execInfo.checkpoint += 1;
}
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
@ -373,6 +399,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
return code;
}
#if 0
chkptFailedByRetrieveReqToSource(pTask, checkpointId);
#endif
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure
@ -382,11 +412,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
streamFreeQitem((SStreamQueueItem*)pBlock);
}
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
if (pTask->chkInfo.startTs == 0) {
pTask->chkInfo.startTs = taosGetTimestampMs();
pTask->execInfo.checkpoint += 1;
}
// todo: handle this
// update the child Id for downstream tasks
code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
@ -562,7 +587,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
}
streamMutexUnlock(&pInfo->lock);
stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", current checkpointId:%" PRId64,
stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", latest checkpointId:%" PRId64,
pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId);
}
@ -682,15 +707,22 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
return TSDB_CODE_SUCCESS;
}
void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId) {
struct SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
if (pInfo->activeId <= 0) {
stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr);
if (failedId <= 0) {
stWarn("s-task:%s failedId is 0, not update the failed checkpoint info, current failedId:%" PRId64
" activeId:%" PRId64,
pTask->id.idStr, pInfo->failedId, pInfo->activeId);
} else {
pInfo->failedId = pInfo->activeId;
stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d)", pTask->id.idStr, pInfo->activeId,
pInfo->transId);
if (failedId <= pInfo->failedId) {
stDebug("s-task:%s failedId:%" PRId64 " not update to:%" PRId64, pTask->id.idStr, pInfo->failedId, failedId);
} else {
stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d) activeId:%" PRId64
" prev failedId:%" PRId64,
pTask->id.idStr, failedId, pInfo->transId, pInfo->activeId, pInfo->failedId);
pInfo->failedId = failedId;
}
}
}
@ -698,7 +730,7 @@ void streamTaskSetCheckpointFailed(SStreamTask* pTask) {
streamMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask).state;
if (status == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask);
streamTaskSetFailedCheckpointId(pTask, pTask->chkInfo.pActiveInfo->activeId);
}
streamMutexUnlock(&pTask->lock);
}
@ -876,8 +908,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
}
} else { // clear the checkpoint info if failed
// set failed checkpoint id before clear the checkpoint info
streamMutexLock(&pTask->lock);
streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info
streamTaskSetFailedCheckpointId(pTask, ckId);
streamMutexUnlock(&pTask->lock);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
@ -1101,23 +1134,43 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
return TSDB_CODE_INVALID_PARA;
}
SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq));
if (pReq == NULL) {
code = terrno;
stError("vgId:%d failed to create msg to retrieve trigger msg for task:%s exec, code:out of memory", vgId, pId);
int32_t ret = 0;
int32_t tlen = 0;
void* buf = NULL;
SRpcMsg rpcMsg = {0};
SEncoder encoder;
SRetrieveChkptTriggerReq req = {.streamId = pTask->id.streamId,
.downstreamTaskId = pTask->id.taskId,
.downstreamNodeId = vgId,
.upstreamTaskId = pUpstreamTask->taskId,
.upstreamNodeId = pUpstreamTask->nodeId,
.checkpointId = checkpointId};
tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret);
if (ret < 0) {
stError("encode retrieve checkpoint-trigger msg failed, code:%s", tstrerror(ret));
continue;
}
buf = rpcMallocCont(tlen + sizeof(SMsgHead));
if (buf == NULL) {
stError("vgId:%d failed to create retrieve checkpoint-trigger msg for task:%s exec, code:out of memory", vgId, pId);
continue;
}
pReq->head.vgId = htonl(pUpstreamTask->nodeId);
pReq->streamId = pTask->id.streamId;
pReq->downstreamTaskId = pTask->id.taskId;
pReq->downstreamNodeId = vgId;
pReq->upstreamTaskId = pUpstreamTask->taskId;
pReq->upstreamNodeId = pUpstreamTask->nodeId;
pReq->checkpointId = checkpointId;
((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(pUpstreamTask->nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SRpcMsg rpcMsg = {0};
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq));
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeRetrieveChkptTriggerReq(&encoder, &req)) < 0) {
rpcFreeCont(buf);
tEncoderClear(&encoder);
stError("encode retrieve checkpoint-trigger req failed, code:%s", tstrerror(code));
continue;
}
tEncoderClear(&encoder);
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, buf, tlen + sizeof(SMsgHead));
code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
if (code == TSDB_CODE_SUCCESS) {

View File

@ -0,0 +1,17 @@
#include "streamInt.h"
/**
* prerequisite: checkpoint interval should be 60s
* @param pTask
* @param checkpointId
*/
void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId) {
streamMutexLock(&pTask->lock);
// set current checkpoint failed immediately, set failed checkpoint id before clear the checkpoint info
streamTaskSetFailedCheckpointId(pTask, checkpointId);
streamMutexUnlock(&pTask->lock);
// the checkpoint interval should be 60s, and the next checkpoint req should be issued by mnode
taosMsleep(65*1000);
}
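
Note: the reworked streamTaskSetFailedCheckpointId takes an explicit failedId and only ever moves the stored value forward. A minimal sketch of that rule follows, assuming a simplified stand-in struct; DemoChkptInfo and demoSetFailedId are hypothetical names for illustration, not part of the stream module.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

// Hypothetical stand-in for the active-checkpoint bookkeeping; not the real struct.
typedef struct {
  int64_t activeId;
  int64_t failedId;
} DemoChkptInfo;

// Record a failed checkpoint id. Returns true only when failedId actually advances.
static bool demoSetFailedId(DemoChkptInfo* info, int64_t failedId) {
  if (failedId <= 0) {
    return false;  // nothing valid to record
  }
  if (failedId <= info->failedId) {
    return false;  // never move the failed id backwards
  }
  info->failedId = failedId;
  return true;
}
```

With this rule, a late report for an older checkpoint cannot overwrite a newer failed id, which matches the comment that failedId grows together with the checkpoint id.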

View File

@ -83,13 +83,37 @@ int32_t streamTrySchedExec(SStreamTask* pTask) {
}
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
int32_t code = 0;
int32_t tlen = 0;
SStreamTaskRunReq req = {.streamId = streamId, .taskId = taskId, .reqType = execType};
tEncodeSize(tEncodeStreamTaskRunReq, &req, tlen, code);
if (code < 0) {
stError("s-task:0x%" PRIx64 " vgId:%d encode stream task run req failed, code:%s", streamId, vgId, tstrerror(code));
return code;
}
void* buf = rpcMallocCont(tlen + sizeof(SMsgHead));
if (buf == NULL) {
stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType,
tstrerror(terrno));
return terrno;
}
((SMsgHead*)buf)->vgId = vgId;
char* bufx = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, (uint8_t*)bufx, tlen);
if ((code = tEncodeStreamTaskRunReq(&encoder, &req)) < 0) {
rpcFreeCont(buf);
tEncoderClear(&encoder);
stError("s-task:0x%x vgId:%d encode run task msg failed, code:%s", taskId, vgId, tstrerror(code));
return code;
}
tEncoderClear(&encoder);
if (streamId != 0) {
stDebug("vgId:%d create msg for task:0x%x, exec type:%d, %s", vgId, taskId, execType,
streamTaskGetExecType(execType));
@ -97,13 +121,8 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType));
}
pRunReq->head.vgId = vgId;
pRunReq->streamId = streamId;
pRunReq->taskId = taskId;
pRunReq->reqType = execType;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)};
code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
if (code) {
stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId);
}
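
Note: several hunks in this commit replace a fixed-size struct copy with a two-pass pattern: compute the encoded size, allocate header plus payload, then encode into the region after the header. The sketch below illustrates only the shape of that pattern. DemoRunReq, DEMO_HEAD_LEN, demoEncodeRunReq and demoBuildRunMsg are hypothetical; the real code uses tEncodeSize, rpcMallocCont and SEncoder, whose wire format is not reproduced here.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define DEMO_HEAD_LEN 8  // hypothetical header size, stands in for sizeof(SMsgHead)

typedef struct {
  int64_t streamId;
  int32_t taskId;
  int32_t reqType;
} DemoRunReq;

// Pass buf == NULL to measure only. Returns the payload size, or -1 if cap is too small.
// Fields are copied in host byte order; a real encoder would define the byte order.
static int32_t demoEncodeRunReq(uint8_t* buf, int32_t cap, const DemoRunReq* req) {
  const int32_t need = (int32_t)(sizeof(req->streamId) + sizeof(req->taskId) + sizeof(req->reqType));
  if (buf == NULL) return need;
  if (cap < need) return -1;
  memcpy(buf, &req->streamId, sizeof(req->streamId));
  memcpy(buf + 8, &req->taskId, sizeof(req->taskId));
  memcpy(buf + 12, &req->reqType, sizeof(req->reqType));
  return need;
}

// Pass 1 measures, pass 2 encodes after the header. The caller frees the result.
static uint8_t* demoBuildRunMsg(const DemoRunReq* req, int32_t* outLen) {
  int32_t tlen = demoEncodeRunReq(NULL, 0, req);
  if (tlen < 0) return NULL;
  uint8_t* msg = (uint8_t*)calloc(1, (size_t)(DEMO_HEAD_LEN + tlen));
  if (msg == NULL) return NULL;
  if (demoEncodeRunReq(msg + DEMO_HEAD_LEN, tlen, req) < 0) {
    free(msg);  // release the buffer on encode failure, as the diff does with rpcFreeCont
    return NULL;
  }
  *outLen = DEMO_HEAD_LEN + tlen;
  return msg;
}
```

The message length handed to the queue is header plus encoded payload, not sizeof of the request struct, so the receiver must decode rather than cast.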

View File

@ -22,6 +22,7 @@
#include "tstream.h"
#include "ttimer.h"
#include "wal.h"
#include "streamMsg.h"
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
@ -1246,13 +1247,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
taosMemoryFree(pInfo);
}
//NOTE: clear the checkpoint id, and keep the failed id
// NOTE: clear the checkpoint id, and keep the failed id
// failedId for a task will increase as the checkpoint I.D. increases.
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
pInfo->activeId = 0;
pInfo->transId = 0;
pInfo->allUpstreamTriggerRecv = 0;
pInfo->dispatchTrigger = false;
// pInfo->failedId = 0;
taosArrayClear(pInfo->pDispatchTriggerList);
taosArrayClear(pInfo->pCheckpointReadyRecvList);
@ -1303,4 +1304,178 @@ void streamTaskFreeRefId(int64_t* pRefId) {
}
metaRefMgtRemove(pRefId);
}
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId));
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet));
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId));
int32_t taskId = pTask->hTaskInfo.id.taskId;
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId));
taskId = pTask->streamTaskId.taskId;
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId));
TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer));
TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey));
int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz));
for (int32_t i = 0; i < epSz; i++) {
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo));
}
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg));
}
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid));
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName));
TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper));
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId));
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved));
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId));
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet));
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
}
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
tEndEncode(pEncoder);
_exit:
return code;
}
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
int32_t taskId = 0;
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver));
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) {
TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
}
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId));
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet));
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
pTask->hTaskInfo.id.taskId = taskId;
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId));
pTask->streamTaskId.taskId = taskId;
TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer));
TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey));
int32_t epSz = -1;
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz));
if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
for (int32_t i = 0; i < epSz; i++) {
SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
if (pInfo == NULL) {
TAOS_CHECK_EXIT(terrno);
}
if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) {
taosMemoryFreeClear(pInfo);
goto _exit;
}
if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
}
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg));
}
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid));
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName));
pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) {
TAOS_CHECK_EXIT(terrno);
}
TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper));
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId));
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved));
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId));
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet));
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo));
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName));
}
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam));
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5));
}
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
tEndDecode(pDecoder);
_exit:
return code;
}

View File

@ -234,6 +234,10 @@ struct SSyncNode {
bool isStart;
// statistics
int32_t sendCount;
int32_t recvCount;
int32_t slowCount;
};
// open/close --------------

View File

@ -39,6 +39,7 @@ typedef struct SSyncLogReplMgr {
int64_t peerStartTime;
int32_t retryBackoff;
int32_t peerId;
int32_t sendCount;
} SSyncLogReplMgr;
typedef struct SSyncLogBufEntry {

View File

@ -104,6 +104,11 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
goto _IGNORE;
}
int32_t nRef = atomic_fetch_add_32(&ths->recvCount, 1);
if (nRef <= 0) {
sError("vgId:%d, recv count is %d", ths->vgId, nRef);
}
int32_t code = syncBuildAppendEntriesReply(&rpcRsp, ths->vgId);
if (code != 0) {
syncLogRecvAppendEntries(ths, pMsg, "build rsp error");

View File

@ -88,6 +88,22 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
pMsg->destId = *destRaftId;
TAOS_CHECK_RETURN(syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg));
int32_t nRef = 0;
if (pSyncNode != NULL) {
nRef = atomic_fetch_add_32(&pSyncNode->sendCount, 1);
if (nRef <= 0) {
sError("vgId:%d, send count is %d", pSyncNode->vgId, nRef);
}
}
SSyncLogReplMgr* mgr = syncNodeGetLogReplMgr(pSyncNode, (SRaftId*)destRaftId);
if (mgr != NULL) {
nRef = atomic_fetch_add_32(&mgr->sendCount, 1);
if (nRef <= 0) {
sError("vgId:%d, send count is %d", pSyncNode->vgId, nRef);
}
}
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
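
Note: the new counters are read through the return value of a fetch-and-add. In C11, atomic_fetch_add returns the value held before the addition. Assuming atomic_fetch_add_32 has the same semantics (an assumption, not confirmed by this diff), the first call on a zero-initialized counter yields 0. The sketch below uses C11 atomics; DemoCounters and demoCountSend are hypothetical names.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

// Hypothetical counter holder; the real fields live in SSyncNode and SSyncLogReplMgr.
typedef struct {
  atomic_int sendCount;
} DemoCounters;

// Returns the number of messages counted so far, including this one.
static int32_t demoCountSend(DemoCounters* c) {
  int prev = atomic_fetch_add(&c->sendCount, 1);  // value BEFORE the increment
  return (int32_t)prev + 1;
}
```

If the intent of the `nRef <= 0` checks is to detect overflow, comparing the post-increment value (previous value plus one) avoids treating the very first message as an error under these assumed semantics.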

View File

@ -152,8 +152,9 @@ static void syncLogReplStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLe
for (int32_t i = 0; i < pSyncNode->replicaNum; i++) {
SSyncLogReplMgr* pMgr = pSyncNode->logReplMgrs[i];
if (pMgr == NULL) break;
len += tsnprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 ", %" PRId64 ", %" PRId64 "]", i, pMgr->restored,
len += tsnprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 ", %" PRId64 ", %" PRId64 "] ", i, pMgr->restored,
pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
len += tsnprintf(buf + len, bufLen - len, "%d", pMgr->sendCount);
if (i + 1 < pSyncNode->replicaNum) {
len += tsnprintf(buf + len, bufLen - len, "%s", ", ");
}
@ -234,14 +235,15 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
"aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s, arb-token:%s",
", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s, arb-token:%s, msg[sent:%d, recv:%d, slow-recev:%d]",
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex,
appliedIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum,
pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex,
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr,
replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr, pNode->arbToken);
replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, pNode->recvCount,
pNode->slowCount);
}
}

View File

@ -1679,7 +1679,7 @@ void cliConnCb(uv_connect_t* req, int status) {
STUB_RAND_NETWORK_ERR(status);
if (status != 0) {
tDebug("%s conn %p failed to connect to %s since %s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr,
tError("%s conn %p failed to connect to %s since %s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr,
uv_strerror(status));
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr);
TAOS_UNUSED(transUnrefCliHandle(pConn));
@ -1832,15 +1832,20 @@ static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
if (code == 0) {
size_t len = strlen(fqdn);
uint32_t* v = taosHashGet(cache, fqdn, len);
if (addr != *v) {
char old[TSDB_FQDN_LEN] = {0}, new[TSDB_FQDN_LEN] = {0};
tinet_ntoa(old, *v);
tinet_ntoa(new, addr);
tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new);
code = taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
if (v != NULL) {
if (addr != *v) {
char old[TSDB_FQDN_LEN] = {0}, new[TSDB_FQDN_LEN] = {0};
tinet_ntoa(old, *v);
tinet_ntoa(new, addr);
tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new);
code = taosHashPut(cache, fqdn, len, &addr, sizeof(addr));
}
} else {
code = taosHashPut(cache, fqdn, len, &addr, sizeof(addr));
}
} else {
code = TSDB_CODE_RPC_FQDN_ERROR; // TSDB_CODE_RPC_INVALID_FQDN;
tWarn("failed to get ip from fqdn:%s since %s", fqdn, tstrerror(code));
}
return code;
}
@ -2933,10 +2938,8 @@ void cliMayResetRespCode(SCliReq* pReq, STransMsg* pResp) {
// check whole vnodes is offline on this vgroup
if (((pCtx->epSet != NULL) && pCtx->epsetRetryCnt >= pCtx->epSet->numOfEps) || pCtx->retryStep > 0) {
if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
pResp->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
} else if (pResp->code == TSDB_CODE_RPC_BROKEN_LINK) {
pResp->code = TSDB_CODE_RPC_SOMENODE_BROKEN_LINK;
if (pResp->code == TSDB_CODE_RPC_BROKEN_LINK) {
pResp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; // TSDB_CODE_RPC_SOMENODE_BROKEN_LINK;
}
}
}

View File

@ -854,7 +854,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict event")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVLD_CHKPT, "Invalid checkpoint trigger msg")
// TDLite
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")

View File

@ -100,6 +100,70 @@ class TDTestCase(TBase):
tdSql.query('show dnodes')
tdSql.checkData(0, 3, "64")
def checkKeyValue(self, res, key, value, ikey = 0, ival = 1):
result = False
for row in res:
if row[ikey] == key:
if row[ival] != value:
raise Exception(f"key:{key} value:{row[ival]} != {value}")
else:
tdLog.info(f"key:{key} value:{row[ival]} == {value}")
result = True
break
if not result:
raise Exception(f"key:{key} not found")
def alterBypassFlag(self):
"""Add test case for altering bypassFlag(TD-32907)
"""
tdSql.execute(f"drop database if exists db")
tdSql.execute(f"create database db")
tdSql.execute("use db")
self.checkKeyValue(tdSql.getResult("show local variables;"), "bypassFlag", "0")
self.checkKeyValue(tdSql.getResult("show dnode 1 variables like 'bypassFlag'"), "bypassFlag", "0", 1, 2)
tdSql.execute("alter local 'bypassFlag 1'")
self.checkKeyValue(tdSql.getResult("show local variables;"), "bypassFlag", "1")
self.checkKeyValue(tdSql.getResult("show dnode 1 variables like 'bypassFlag'"), "bypassFlag", "0", 1, 2)
tdSql.execute("create table stb0(ts timestamp, c0 int) tags(t0 int)")
tdSql.execute("create table ctb0 using stb0 tags(0)")
tdSql.execute("insert into ctb0 values(now, 1)")
tdSql.query("select * from stb0")
tdSql.checkRows(0)
tdSql.execute("alter local 'bypassFlag 0'")
tdSql.execute("alter all dnodes 'bypassFlag 2'")
self.checkKeyValue(tdSql.getResult("show local variables"), "bypassFlag", "0")
self.checkKeyValue(tdSql.getResult("show dnode 1 variables like 'bypassFlag'"), "bypassFlag", "2", 1, 2)
tdSql.execute("insert into ctb0 values(now, 2)")
tdSql.query("select * from stb0")
tdSql.checkRows(0)
tdSql.execute("alter all dnodes 'bypassFlag 4'")
self.checkKeyValue(tdSql.getResult("show dnode 1 variables like 'bypassFlag'"), "bypassFlag", "4", 1, 2)
tdSql.execute("insert into ctb0 values(now, 4)")
tdSql.execute("insert into ctb1 using stb0 tags(1) values(now, 10)")
tdSql.query("select * from stb0")
tdSql.checkRows(0)
tdSql.query("show db.tables")
tdSql.checkRows(2)
tdSql.execute("alter all dnodes 'bypassFlag 8'")
self.checkKeyValue(tdSql.getResult("show dnode 1 variables like 'bypassFlag'"), "bypassFlag", "8", 1, 2)
tdSql.execute("insert into ctb0 values(now, 8)")
tdSql.execute("insert into ctb1 values(now, 18)")
tdSql.query("select * from stb0")
tdSql.checkRows(2)
tdSql.execute("flush database db")
tdSql.query("select * from stb0")
tdSql.checkRows(0)
tdSql.execute("alter all dnodes 'bypassFlag 0'")
self.checkKeyValue(tdSql.getResult("show local variables"), "bypassFlag", "0")
self.checkKeyValue(tdSql.getResult("show dnode 1 variables like 'bypassFlag'"), "bypassFlag", "0", 1, 2)
tdSql.execute("insert into ctb0 values(now, 80)")
tdSql.execute("insert into ctb1 values(now, 180)")
tdSql.query("select * from stb0")
tdSql.checkRows(2)
tdSql.execute("flush database db")
tdSql.query("select * from stb0")
tdSql.checkRows(2)
# run
def run(self):
tdLog.debug(f"start to execute {__file__}")
@ -110,6 +174,8 @@ class TDTestCase(TBase):
self.alterTtlConfig()
# TS-5390
self.alterCachemodel()
# TD-32907
self.alterBypassFlag()
tdLog.success(f"{__file__} successfully executed")

View File

@ -9,4 +9,5 @@ requests
pexpect
faker
pyopenssl
hyperloglog
hyperloglog
tzlocal

View File

@ -53,6 +53,8 @@ if $rows != 5 then
return -1
endi
sleep 500
print =============== show vgroups2
sql show d2.vgroups
if $rows != 2 then
@ -126,13 +128,14 @@ if $data12 != d2 then
endi
if $data13 != leader then
print expect leader , actual $data13
return -1
endi
print $data14
print $data15
print $data14 , $data15
if $data16 != 1 then
print expect 1, actual $data16
return -1
endi