Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0
commit 4775642c59
@@ -243,7 +243,7 @@ curl -u root:taosdata -d "show databases" 127.0.0.1:6041/rest/sql
 {"code":0,"column_meta":[["name","VARCHAR",64]],"data":[["information_schema"],["performance_schema"],["test"],["test1"]],"rows":4}
 ```

 ## Test cluster

 ### Data preparation

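Editor's note: the hunk above is anchored at the REST call `curl -u root:taosdata -d "show databases" 127.0.0.1:6041/rest/sql`. For readers following along, a minimal sketch of running an arbitrary query through the same taosAdapter REST endpoint is shown below. The service name `taosd` and the port-forward are assumptions about how the adapter is exposed; the table `test.meters` matches the taosBenchmark example later in this guide.

```Bash
# Expose the taosAdapter REST port locally (service name is an assumption).
kubectl port-forward -n tdengine-test svc/taosd 6041:6041 &

# Run a query through the REST interface with the default root:taosdata credentials.
curl -u root:taosdata -d "select count(*) from test.meters" 127.0.0.1:6041/rest/sql
```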
@@ -335,7 +335,7 @@ tdengine-1 1/1 Running 1 (6m48s ago) 20m 10.244.0.59 node84
 tdengine-2 1/1 Running 0 21m 10.244.1.223 node85 <none> <none>
 ```

-At this time, the cluster mnode holds a re-election, and the mnode on dnode1 becomes the leader.
+At this time, the cluster mnode holds a re-election, and the mnode on dnode2 becomes the leader.

 ```Bash
 kubectl exec -it tdengine-1 -n tdengine-test -- taos -s "show mnodes\G"
@@ -10,9 +10,9 @@ description: A detailed guide to deploying a TDengine cluster with Kubernetes

 To meet the [high availability](https://docs.taosdata.com/tdinternal/high-availability/) requirements, the cluster needs to satisfy the following:

 - 3 or more dnodes: the vnodes belonging to the same vgroup in TDengine are not allowed to sit on the same dnode, so a database with 3 replicas needs at least 3 dnodes.
 - 3 mnodes: the mnode manages the whole cluster. TDengine starts a single mnode by default; if the dnode hosting that mnode goes offline, the whole cluster becomes unavailable.
 - 3 database replicas: replication in TDengine is configured per database, so with 3 replicas a 3-dnode cluster keeps working normally when any single dnode goes offline. **If 2 dnodes go offline the cluster becomes unavailable, because RAFT can no longer complete an election.** (Enterprise edition: in disaster-recovery scenarios, a dnode whose data files are damaged can be recovered by relaunching the dnode.) A minimal example follows this list.
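Editor's note: to make the three-replica requirement concrete, here is a minimal sketch of creating a 3-replica database and checking the cluster layout. It assumes the pod and namespace names used later in this guide (tdengine-0, tdengine-test); the database name ha_demo is only illustrative.

```Bash
# Creating a 3-replica database only succeeds when at least 3 dnodes are online.
kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "create database if not exists ha_demo replica 3"

# Verify the layout: expect 3 dnodes and, once created, 3 mnodes.
kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "show dnodes"
kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "show mnodes\G"
```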

 ## Prerequisites

@@ -52,7 +52,7 @@ spec:

 Following the Kubernetes guidance on workload types, we use a StatefulSet as the deployment resource type for TDengine. Create the file `tdengine.yaml`, in which replicas sets the number of cluster nodes to 3. The node time zone is China (Asia/Shanghai), and each node is allocated 5G of standard storage (see [Storage Classes](https://kubernetes.io/docs/concepts/storage/storage-classes/) to configure a storage class). You can adjust these settings to your own environment.

 Pay special attention to the startupProbe configuration. When a dnode's Pod has been offline for a while and is then restarted, the newly started dnode is briefly unavailable. If startupProbe is set too small, Kubernetes considers the Pod unhealthy and keeps restarting it, so the dnode's Pod restarts repeatedly and never returns to a normal state. See [Configure Liveness, Readiness and Startup Probes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/).

 ```YAML
 ---
@@ -176,7 +176,7 @@ taos> show dnodes
 Query OK, 3 row(s) in set (0.001853s)
 ```

 Check the current mnode:

 ```Bash
 kubectl exec -it tdengine-1 -n tdengine-test -- taos -s "show mnodes\G"
@@ -191,14 +191,14 @@ reboot_time: 2023-07-19 17:54:19.520
 Query OK, 1 row(s) in set (0.001282s)
 ```

 ## Create mnodes

 ```Bash
 kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "create mnode on dnode 2"
 kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "create mnode on dnode 3"
 ```

 Check the mnodes:

 ```Bash
 kubectl exec -it tdengine-1 -n tdengine-test -- taos -s "show mnodes\G"
@@ -249,7 +249,7 @@ curl -u root:taosdata -d "show databases" 127.0.0.1:6041/rest/sql

 #### taosBenchmark

 Use taosBenchmark to create a database with 3 replicas, write 100 million rows into it, and then check the data:

 ```Bash
 kubectl exec -it tdengine-0 -n tdengine-test -- taosBenchmark -I stmt -d test -n 10000 -t 10000 -a 3
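Editor's note: for readers not familiar with taosBenchmark, the sketch below restates the command above with its options spelled out in comments; the option meanings are the commonly documented ones and should be confirmed with `taosBenchmark --help` for the version in use.

```Bash
# -I stmt : write through the parameter-binding (stmt) interface
# -d test : target database name
# -t 10000: number of child tables to create
# -n 10000: rows written per child table (10000 x 10000 = 100 million rows in total)
# -a 3    : number of replicas for the created database
kubectl exec -it tdengine-0 -n tdengine-test -- \
  taosBenchmark -I stmt -d test -n 10000 -t 10000 -a 3
```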
@@ -264,7 +264,7 @@ taos> select count(*) from test.meters;
 Query OK, 1 row(s) in set (0.103537s)
 ```

 Check the vnode distribution with show dnodes:

 ```Bash
 kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "show dnodes"
@@ -278,7 +278,7 @@ taos> show dnodes
 Query OK, 3 row(s) in set (0.001357s)
 ```

 Use show vgroups to see how the vnodes are distributed:

 ```Bash
 kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "show test.vgroups"
@@ -299,7 +299,7 @@ Query OK, 8 row(s) in set (0.001488s)

 #### Manual creation

 Create a database test1 with three replicas, create a table in it, and insert 2 rows (the full command is sketched after the next hunk):

 ```Bash
 kubectl exec -it tdengine-0 -n tdengine-test -- \
@@ -310,7 +310,7 @@ kubectl exec -it tdengine-0 -n tdengine-test -- \
 insert into t1 values(now, 1)(now+1s, 2);"
 ```

 Use show test1.vgroups to see how the vnodes are distributed:

 ```Bash
 kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "show test1.vgroups"
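Editor's note: the two hunks above each show only a fragment of the manual-creation command. A minimal sketch of the full sequence is given below; the table schema (ts TIMESTAMP, n INT) is an assumption made for illustration, since the excerpt does not include the CREATE TABLE statement.

```Bash
kubectl exec -it tdengine-0 -n tdengine-test -- \
  taos -s "create database if not exists test1 replica 3;
           use test1;
           create table if not exists t1 (ts timestamp, n int);
           insert into t1 values(now, 1)(now+1s, 2);"
```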
@@ -335,7 +335,7 @@ tdengine-1 1/1 Running 1 (6m48s ago) 20m 10.244.0.59 node84
 tdengine-2 1/1 Running 0 21m 10.244.1.223 node85 <none> <none>
 ```

-At this point the cluster mnode holds a re-election, and the mnode on dnode1 becomes the leader.
+At this point the cluster mnode holds a re-election, and the mnode on dnode2 becomes the leader.

 ```Bash
 kubectl exec -it tdengine-1 -n tdengine-test -- taos -s "show mnodes\G"
@@ -389,7 +389,7 @@ taos> select *from test1.t1
 Query OK, 4 row(s) in set (0.001994s)
 ```

 Likewise, when a non-leader mnode goes offline, reads and writes of course continue to work normally, so that case is not demonstrated in detail here.

 ## Scaling out the cluster

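Editor's note: the scale-out commands themselves fall outside this excerpt. As a rough sketch of the usual procedure (grow the StatefulSet by one replica, then confirm the new dnode joined), assuming the same StatefulSet and namespace names:

```Bash
# Add a fourth Pod; the new dnode registers itself with the cluster on startup.
kubectl scale statefulsets tdengine --replicas=4 -n tdengine-test

# Confirm that the fourth dnode shows up as ready.
kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "show dnodes"
```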
@@ -436,7 +436,7 @@ Query OK, 4 row(s) in set (0.003628s)

 ## Scaling in the cluster

 Because a TDengine cluster migrates data between nodes when it is scaled, scaling in with kubectl requires running the "drop dnode" command first (**if any database in the cluster has 3 replicas, the number of dnodes remaining after scale-in must still be at least 3, otherwise the drop dnode operation is aborted**); only after the node has been removed should the Kubernetes cluster itself be scaled in. A sketch of the whole sequence follows.

 Note: because Pods in a Kubernetes StatefulSet can only be removed in the reverse of their creation order, TDengine dnodes must also be dropped in reverse creation order; otherwise the Pod ends up in an error state.

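Editor's note: a minimal sketch of the scale-in sequence described above, assuming the cluster currently has 4 dnodes and the newest one (dnode 4, Pod tdengine-3) is being removed. The dnode id and the PVC name are assumptions for illustration and should be checked against `show dnodes` and `kubectl get pvc`.

```Bash
# 1. Drop the dnode inside TDengine first, so its data is migrated away.
kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "drop dnode 4"

# 2. Wait until "show dnodes" no longer lists dnode 4, then shrink the StatefulSet.
kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "show dnodes"
kubectl scale statefulsets tdengine --replicas=3 -n tdengine-test

# 3. Remove the PVC left behind by the deleted Pod (name depends on the volumeClaimTemplates).
kubectl delete pvc -n tdengine-test taosdata-tdengine-3
```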
@@ -459,7 +459,7 @@ Query OK, 3 row(s) in set (0.003324s)
 kubectl scale statefulsets tdengine --replicas=3 -n tdengine-test
 ```

 The last Pod will be deleted. Use kubectl get pods -l app=tdengine to check the Pod status:

 ```Plain
 kubectl get pod -l app=tdengine -n tdengine-test -o wide
@@ -469,7 +469,7 @@ tdengine-1 1/1 Running 1 (7h9m ago) 7h23m 10.244.0.59 node84 <
 tdengine-2 1/1 Running 0 5h45m 10.244.1.224 node85 <none> <none>
 ```

 After the Pod is deleted, the PVC must be deleted manually; otherwise the next scale-out will reuse the old data and the node will fail to join the cluster properly.

 ```Bash
 kubectl delete pvc aosdata-tdengine-3 -n tdengine-test
@@ -502,7 +502,7 @@ Query OK, 4 row(s) in set (0.003881s)

 ## Cleaning up the TDengine cluster

 > **When deleting PVCs, pay attention to the PV's persistentVolumeReclaimPolicy. It is recommended to set it to Delete, so that deleting a PVC automatically cleans up the PV together with the underlying CSI storage resource. If no policy is configured to clean up the PV automatically when its PVC is deleted, then after the PVC is deleted and the PV is later cleaned up manually, the CSI storage resource behind the PV may not be released.**

 To remove the TDengine cluster completely, clean up the statefulset, svc, configmap, and pvc respectively (a sketch follows).

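Editor's note: two short sketches for the notes above. The PV name and the exact resource names to delete are assumptions for illustration; substitute the names actually created by your tdengine.yaml.

```Bash
# Switch a PV to the Delete reclaim policy, so that removing its PVC also releases
# the underlying CSI storage (replace <pv-name> with the PV bound to the PVC).
kubectl patch pv <pv-name> -p '{"spec":{"persistentVolumeReclaimPolicy":"Delete"}}'

# Tear the cluster down completely: StatefulSet, Service, ConfigMap, then the PVCs.
kubectl delete statefulset tdengine -n tdengine-test
kubectl delete svc taosd -n tdengine-test
kubectl delete configmap taoscfg -n tdengine-test
kubectl delete pvc -l app=tdengine -n tdengine-test
```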
@@ -537,8 +537,8 @@ Query OK, 4 row(s) in set (0.003862s)
 For the high availability and high reliability of TDengine in a Kubernetes environment, hardware failure and disaster recovery can be considered at two levels:

 1. The disaster-recovery capability of the underlying distributed block storage, i.e. multi-replica block storage. Popular distributed block storage such as Ceph has this multi-replica capability and can spread storage replicas across different racks, cabinets, rooms, and data centers (or you can simply use block-storage services offered by public cloud vendors).
 2. TDengine's own disaster recovery: in TDengine Enterprise, when a dnode goes permanently offline (for example, the physical machine's disk is damaged and data files are lost), a fresh, empty dnode can be launched to take over the work of the original dnode.

 Finally, you are welcome to try [TDengine Cloud](https://cloud.taosdata.com/) and experience the one-stop, fully managed TDengine cloud service.

 > TDengine Cloud is a minimalist, fully managed cloud platform for time-series data processing, built on the open-source time-series database TDengine. Besides the high-performance time-series database, it offers system features such as caching, data subscription, and stream processing, plus convenient and secure data sharing and many other enterprise-grade capabilities. It allows enterprises in IoT, industrial internet, finance, and IT operations monitoring to significantly reduce labor and operating costs in managing time-series data.

@@ -28,11 +28,13 @@ extern "C" {
 #define GRANTS_COL_MAX_LEN 196
 #endif

 #define GRANT_HEART_BEAT_MIN 2
+#define GRANT_EXPIRE_VALUE (31556995201)
+#define GRANT_EXPIRE_UNLIMITED(v) ((v) == GRANT_EXPIRE_VALUE)
 #define GRANT_ACTIVE_CODE "activeCode"
 #define GRANT_FLAG_ALL (0x01)
 #define GRANT_FLAG_AUDIT (0x02)
 #define GRANT_FLAG_VIEW (0x04)

 typedef enum {
   TSDB_GRANT_ALL,
@@ -41,7 +41,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
 int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
 int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
 int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
-int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
+int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen);

 void tqSetRestoreVersionInfo(SStreamTask* pTask);
 int32_t tqExpandStreamTask(SStreamTask* pTask);

@@ -681,7 +681,8 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeI
 void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal);
 void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
 void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
-int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pInfo, int32_t code);
+int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
+                                           SRpcHandleInfo* pInfo, int32_t code);

 int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
 int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue);

@@ -770,7 +771,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
 int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
                                            int32_t setCode);
 int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask);
-int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq);
+int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq);
 SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo();

 // stream task state machine, and event handling
@@ -1258,8 +1258,12 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
       pStmt->bInfo.sBindRowNum = bind->num;
     }

-    qBindStmtSingleColValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, colIdx,
-                            pStmt->bInfo.sBindRowNum);
+    code = qBindStmtSingleColValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
+                                   pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum);
+    if (code) {
+      tscError("qBindStmtSingleColValue failed, error:%s", tstrerror(code));
+      STMT_ERR_RET(code);
+    }
   }

   int64_t startUs4 = taosGetTimestampUs();
@@ -85,7 +85,7 @@ static const SSysDbTableSchema clusterSchema[] = {
     {.name = "uptime", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
     {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
     {.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
-    {.name = "expire_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
+    {.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
 };

 static const SSysDbTableSchema userDBSchema[] = {
@@ -23,6 +23,14 @@
 extern "C" {
 #endif

+#define COL_DATA_SET_VAL_RET(pData, isNull, pObj)                               \
+  do {                                                                          \
+    if ((code = colDataSetVal(pColInfo, numOfRows, (pData), (isNull))) != 0) {  \
+      if (pObj) sdbRelease(pSdb, (pObj));                                       \
+      return code;                                                              \
+    }                                                                           \
+  } while (0)
+
 int32_t mndInitShow(SMnode *pMnode);
 void mndCleanupShow(SMnode *pMnode);
 void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp);
@@ -280,6 +280,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
 static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
   SMnode *pMnode = pMsg->info.node;
   SSdb *pSdb = pMnode->pSdb;
+  int32_t code = 0;
   int32_t numOfRows = 0;
   int32_t cols = 0;
   SClusterObj *pCluster = NULL;
@@ -290,31 +291,44 @@ static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *

     cols = 0;
     SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
-    colDataSetVal(pColInfo, numOfRows, (const char *)&pCluster->id, false);
+    COL_DATA_SET_VAL_RET((const char *)&pCluster->id, false, pCluster);

     char buf[tListLen(pCluster->name) + VARSTR_HEADER_SIZE] = {0};
     STR_WITH_MAXSIZE_TO_VARSTR(buf, pCluster->name, pShow->pMeta->pSchemas[cols].bytes);

     pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
-    colDataSetVal(pColInfo, numOfRows, buf, false);
+    COL_DATA_SET_VAL_RET(buf, false, pCluster);

     int32_t upTime = mndGetClusterUpTimeImp(pCluster);
     pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
-    colDataSetVal(pColInfo, numOfRows, (const char *)&upTime, false);
+    COL_DATA_SET_VAL_RET((const char *)&upTime, false, pCluster);

     pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
-    colDataSetVal(pColInfo, numOfRows, (const char *)&pCluster->createdTime, false);
+    COL_DATA_SET_VAL_RET((const char *)&pCluster->createdTime, false, pCluster);

     char ver[12] = {0};
     STR_WITH_MAXSIZE_TO_VARSTR(ver, tsVersionName, pShow->pMeta->pSchemas[cols].bytes);
     pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
-    colDataSetVal(pColInfo, numOfRows, (const char *)ver, false);
+    COL_DATA_SET_VAL_RET((const char *)ver, false, pCluster);

-    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
-    if (tsExpireTime <= 0) {
+    char expireTime[25] = {0};
+    pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
+    if (GRANT_EXPIRE_UNLIMITED(tsExpireTime / 1000)) {
+      STR_WITH_MAXSIZE_TO_VARSTR(expireTime, "unlimited", pShow->pMeta->pSchemas[cols].bytes);
+      COL_DATA_SET_VAL_RET(expireTime, false, pCluster);
+    } else if (tsExpireTime <= 0) {
       colDataSetNULL(pColInfo, numOfRows);
     } else {
-      colDataSetVal(pColInfo, numOfRows, (const char *)&tsExpireTime, false);
+      char ts[20] = {0};
+      time_t expireSec = tsExpireTime / 1000;
+      struct tm ptm;
+      if (taosLocalTime(&expireSec, &ptm, ts) != NULL) {
+        strftime(ts, 20, "%Y-%m-%d %H:%M:%S", &ptm);
+      } else {
+        ts[0] = 0;
+      }
+      STR_WITH_MAXSIZE_TO_VARSTR(expireTime, ts, pShow->pMeta->pSchemas[cols].bytes);
+      COL_DATA_SET_VAL_RET(expireTime, false, pCluster);
     }

     sdbRelease(pSdb, pCluster);
@@ -1075,7 +1075,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
     mWarn("not all vnodes ready, quit from vnodes status check");
     taosArrayDestroy(pNodeSnapshot);
     taosThreadMutexUnlock(&execInfo.lock);
-    return 0;
+    return true;
   }

   SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
@@ -1911,9 +1911,51 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
   bool updated = taskNodeIsUpdated(pMnode);
   if (updated) {
     mError("tasks are not ready for pause, node update detected");
+    sdbRelease(pMnode->pSdb, pStream);
     return -1;
   }

+  {  // check for tasks, if tasks are not ready, not allowed to pause
+    bool found = false;
+    bool readyToPause = true;
+    taosThreadMutexLock(&execInfo.lock);
+
+    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
+      STaskId *p = taosArrayGet(execInfo.pTaskList, i);
+
+      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
+      if (pEntry == NULL) {
+        continue;
+      }
+
+      if (pEntry->id.streamId != pStream->uid) {
+        continue;
+      }
+
+      if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
+        mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
+               pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
+        readyToPause = false;
+      }
+
+      found = true;
+    }
+
+    taosThreadMutexUnlock(&execInfo.lock);
+    if (!found) {
+      mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
+      sdbRelease(pMnode->pSdb, pStream);
+      return -1;
+    }
+
+    if (!readyToPause) {
+      mError("stream:%s task not ready for pause yet", pauseReq.name);
+      sdbRelease(pMnode->pSdb, pStream);
+      return -1;
+    }
+  }
+
   STrans *pTrans =
       doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream");
   if (pTrans == NULL) {
@@ -154,7 +154,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
     case TDMT_STREAM_TASK_RESUME:
       return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false);
     case TDMT_STREAM_TASK_UPDATE_CHKPT:
-      return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
+      return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont, pMsg->contLen);
     default:
       ASSERT(0);
   }

@@ -1013,7 +1013,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
 }

 int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
-  return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen);
+  return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg, msgLen);
 }

 int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
@@ -640,7 +640,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
   return 0;
 }

-int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
+int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen) {
   SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;

   int32_t vgId = pMeta->vgId;

@@ -652,13 +652,14 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, in
   SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));

   if (ppTask != NULL && (*ppTask) != NULL) {
-    streamTaskUpdateTaskCheckpointInfo(*ppTask, pReq);
+    streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq);
   } else {  // failed to get the task.
     tqError("vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, it may have been dropped already",
             vgId, pReq->taskId);
   }

   streamMetaWUnLock(pMeta);
+  // always return success when handling the requirement issued by mnode during transaction.
   return TSDB_CODE_SUCCESS;
 }

@@ -853,7 +854,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
   } else if (pState->state == TASK_STATUS__UNINIT) {
     tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
     ASSERT(pTask->status.downstreamReady == 0);
-    //  /*int32_t ret = */ streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
     tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId);
   } else {
     tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name);
@@ -883,7 +883,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
     tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
             pTask->id.idStr, (int32_t)pReq->downstreamTaskId);

-    streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS);
+    streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS);
     streamMetaReleaseTask(pMeta, pTask);

     return TSDB_CODE_SUCCESS;

@@ -901,7 +901,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
       // re-send the lost checkpoint-trigger msg to downstream task
       tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
               (int32_t)pReq->downstreamTaskId, checkpointId, transId);
-      streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_SUCCESS);
+      streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_SUCCESS);
     } else {  // not send checkpoint-trigger yet, wait
       int32_t recv = 0, total = 0;
       streamTaskGetTriggerRecvStatus(pTask, &recv, &total);

@@ -914,7 +914,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
                 "sending checkpoint-source/trigger",
                 pTask->id.idStr, recv, total);
       }
-      streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
+      streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
     }
   } else {  // upstream not recv the checkpoint-source/trigger till now
     ASSERT(pState->state == TASK_STATUS__READY || pState->state == TASK_STATUS__HALT);

@@ -922,7 +922,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
         "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
         "upstream sending checkpoint-source/trigger",
         pTask->id.idStr);
-    streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
+    streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
   }

   streamMetaReleaseTask(pMeta, pTask);
@@ -998,8 +998,18 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t

   int32_t level = pTask->info.taskLevel;
   if (level == TASK_LEVEL__SINK) {
-    if (status == TASK_STATUS__UNINIT) {
-    }
+    ASSERT(status != TASK_STATUS__UNINIT); /*{
+    //   tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr);
+    //
+    //   if (pTask->pBackend == NULL) { // TODO: add test cases for this
+    //     int32_t code = pMeta->expandTaskFn(pTask);
+    //     if (code != TSDB_CODE_SUCCESS) {
+    //       tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId);
+    //       streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
+    //     }
+    //   }
+    //   int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
+    }*/
     streamMetaReleaseTask(pMeta, pTask);
     return 0;
   }

@@ -1025,11 +1035,21 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
     } else {
       streamTrySchedExec(pTask);
     }
-  } else if (status == TASK_STATUS__UNINIT) {
-    // todo: fill-history task init ?
-    if (pTask->info.fillHistory == 0) {
-      streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
-    }
+  } else {
+    ASSERT(status != TASK_STATUS__UNINIT);  // { // todo: fill-history task init ?
+    //   if (pTask->info.fillHistory == 0) {
+    //     tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr);
+    //
+    //     if (pTask->pBackend == NULL) { // TODO: add test cases for this
+    //       int32_t code = pMeta->expandTaskFn(pTask);
+    //       if (code != TSDB_CODE_SUCCESS) {
+    //         tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId);
+    //         streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
+    //       }
+    //     }
+    //     int32_t ret = */streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
+    //   }
+    // }
   }

   streamMetaReleaseTask(pMeta, pTask);
@@ -410,6 +410,11 @@ int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bi
     return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
   }

+  // Column index exceeds the number of columns
+  if (colIdx >= pCols->size && pCol == NULL) {
+    return buildInvalidOperationMsg(&pBuf, "column index exceeds the number of columns");
+  }
+
   if (bind->buffer_type != pColSchema->type) {
     return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
   }
@@ -257,7 +257,8 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
 int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
 void bkdMgtDestroy(SBkdMgt* bm);

-int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list);
+int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list,
+                                const char* id);

 void* taskAcquireDb(int64_t refId);
 void taskReleaseDb(int64_t refId);

@@ -194,7 +194,7 @@ void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
 int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask);
 void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo);

-void streamClearChkptReadyMsg(SStreamTask* pTask);
+void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo);
 EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
                                              int32_t* blockSize);
 int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
@@ -2053,7 +2053,11 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
     stInfo("%s newly create db in state-backend", key);
     // pre create db
     pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err);
-    if (pTaskDb->db == NULL) goto _EXIT;
+    if (pTaskDb->db == NULL) {
+      stError("%s open state-backend failed, reason:%s", key, err);
+      goto _EXIT;
+    }
+
     rocksdb_close(pTaskDb->db);

     if (cfNames != NULL) {

@@ -2181,7 +2185,6 @@ void taskDbDestroy(void* pDb, bool flush) {
 void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); }

 int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) {
-  int64_t st = taosGetTimestampMs();
   int32_t code = -1;
   int64_t refId = pDb->refId;
|
@ -2202,15 +2205,15 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list) {
|
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list, const char* idStr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;
|
SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;
|
||||||
|
|
||||||
char* temp = taosMemoryCalloc(1, strlen(pDb->path) + 32);
|
char* temp = taosMemoryCalloc(1, strlen(pDb->path) + 32);
|
||||||
sprintf(temp, "%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "tmp", chkpId);
|
sprintf(temp, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId);
|
||||||
|
|
||||||
if (taosDirExist(temp)) {
|
if (taosDirExist(temp)) {
|
||||||
cleanDir(temp, "");
|
cleanDir(temp, idStr);
|
||||||
} else {
|
} else {
|
||||||
taosMkDir(temp);
|
taosMkDir(temp);
|
||||||
}
|
}
|
||||||
|
@ -2220,7 +2223,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
|
|
||||||
|
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list, const char* idStr) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STaskDbWrapper* pDb = arg;
|
STaskDbWrapper* pDb = arg;
|
||||||
ECHECKPOINT_BACKUP_TYPE utype = type;
|
ECHECKPOINT_BACKUP_TYPE utype = type;
|
||||||
|
@ -2229,7 +2233,7 @@ int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t typ
|
||||||
if (utype == DATA_UPLOAD_RSYNC) {
|
if (utype == DATA_UPLOAD_RSYNC) {
|
||||||
code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
|
code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
|
||||||
} else if (utype == DATA_UPLOAD_S3) {
|
} else if (utype == DATA_UPLOAD_S3) {
|
||||||
code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list);
|
code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list, idStr);
|
||||||
}
|
}
|
||||||
taskDbUnRefChkp(pDb, chkpId);
|
taskDbUnRefChkp(pDb, chkpId);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@@ -58,8 +58,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
   }

   if (pInfo->stage < stage) {
-    stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
-            ", prev:%" PRId64,
+    stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64,
             id, upstreamTaskId, vgId, stage, pInfo->stage);
     // record the checkpoint failure id and sent to mnode
     taosThreadMutexLock(&pTask->lock);

@@ -170,13 +169,13 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS
   SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
   if (pTask != NULL) {
     pRsp->status = streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);
-    streamMetaReleaseTask(pMeta, pTask);

     SStreamTaskState* pState = streamTaskGetStatus(pTask);
     stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
             ") task:0x%x (vgId:%d), check_status:%d",
             pTask->id.idStr, pState->name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
             pRsp->status);
+    streamMetaReleaseTask(pMeta, pTask);
   } else {
     pRsp->status = TASK_DOWNSTREAM_NOT_READY;
     stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
@@ -18,16 +18,6 @@
 #include "streamBackendRocksdb.h"
 #include "streamInt.h"

-typedef struct {
-  ECHECKPOINT_BACKUP_TYPE type;
-
-  char*        taskId;
-  int64_t      chkpId;
-  SStreamTask* pTask;
-  int64_t      dbRefId;
-  void*        pMeta;
-} SAsyncUploadArg;
-
 static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
 static int32_t deleteCheckpointFile(const char* id, const char* name);
 static int32_t streamTaskUploadCheckpoint(const char* id, const char* path);
@@ -114,8 +104,15 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
   return TSDB_CODE_SUCCESS;
 }

-int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo, int32_t code) {
-  SCheckpointTriggerRsp* pRsp = rpcMallocCont(sizeof(SCheckpointTriggerRsp));
+int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
+                                           SRpcHandleInfo* pRpcInfo, int32_t code) {
+  int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
+
+  void* pBuf = rpcMallocCont(size);
+  SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
+
+  ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId);
+
   pRsp->streamId = pTask->id.streamId;
   pRsp->upstreamTaskId = pTask->id.taskId;
   pRsp->taskId = dstTaskId;

@@ -130,7 +127,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId

   pRsp->rspCode = code;

-  SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = sizeof(SCheckpointTriggerRsp), .info = *pRpcInfo};
+  SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = size, .info = *pRpcInfo};
   tmsgSendRsp(&rspMsg);
   return 0;
 }
@@ -408,11 +405,11 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
   streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo);
   streamTaskOpenAllUpstreamInput(pTask);  // open inputQ for all upstream tasks
   if (clearChkpReadyMsg) {
-    streamClearChkptReadyMsg(pTask);
+    streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
   }
 }

-int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq) {
+int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
   SStreamMeta* pMeta = pTask->pMeta;
   int32_t vgId = pMeta->vgId;
   int32_t code = 0;

@@ -429,7 +426,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
             pReq->transId);
     taosThreadMutexUnlock(&pTask->lock);

     {  // destroy the related fill-history tasks
       // drop task should not in the meta-lock, and drop the related fill-history task now
       streamMetaWUnLock(pMeta);
       if (pReq->dropRelHTask) {
@@ -446,34 +443,42 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin

   SStreamTaskState* pStatus = streamTaskGetStatus(pTask);

-  stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint info, checkpointId:%" PRId64 "->%" PRId64
-          " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
-          id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
-          pInfo->checkpointTime, pReq->checkpointTs);
-
-  if (pStatus->state != TASK_STATUS__DROPPING) {
-    ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer);
-
-    pInfo->checkpointId = pReq->checkpointId;
-    pInfo->checkpointVer = pReq->checkpointVer;
-    pInfo->checkpointTime = pReq->checkpointTs;
-
-    streamTaskClearCheckInfo(pTask, false);
-
-    // todo handle error
-    if (pStatus->state == TASK_STATUS__CK) {
-      code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
-    } else {
-      stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name);
-    }
-  } else {
-    stDebug("s-task:0x%x vgId:%d status:%s not update checkpoint info, checkpointId:%" PRId64 "->%" PRId64 " failed",
-            pReq->taskId, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
+  if ((!restored) && (pStatus->state != TASK_STATUS__CK)) {
+    stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
+            " failed",
+            pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
     taosThreadMutexUnlock(&pTask->lock);

     return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
   }

+  if (!restored) {  // during restore procedure, do update checkpoint-info
+    stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
+            " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
+            id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
+            pInfo->checkpointTime, pReq->checkpointTs);
+  } else {  // not in restore status, must be in checkpoint status
+    stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
+            " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
+            id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
+            pInfo->checkpointTime, pReq->checkpointTs);
+  }
+
+  ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
+         pInfo->processedVer <= pReq->checkpointVer);
+
+  pInfo->checkpointId = pReq->checkpointId;
+  pInfo->checkpointVer = pReq->checkpointVer;
+  pInfo->checkpointTime = pReq->checkpointTs;
+
+  streamTaskClearCheckInfo(pTask, true);
+
+  if (pStatus->state == TASK_STATUS__CK) {
+    // todo handle error
+    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
+  } else {
+    stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name);
+  }
+
   if (pReq->dropRelHTask) {
     stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
             pReq->taskId, vgId, pReq->hTaskId);
@ -562,78 +567,70 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t uploadCheckpointData(void* param) {
|
+int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
-  SAsyncUploadArg* pParam = param;
   char* path = NULL;
   int32_t code = 0;
   SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
-  char* taskStr = pParam->taskId ? pParam->taskId : "NULL";
+  int64_t now = taosGetTimestampMs();
+  SStreamMeta* pMeta = pTask->pMeta;
+  const char* idStr = pTask->id.idStr;

-  void* pBackend = taskAcquireDb(pParam->dbRefId);
-  if (pBackend == NULL) {
-    stError("s-task:%s failed to acquire db", taskStr);
-    taosMemoryFree(pParam->taskId);
-    taosMemoryFree(pParam);
-    return -1;
+  if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles,
+                                      pTask->id.idStr)) != 0) {
+    stError("s-task:%s failed to gen upload checkpoint:%" PRId64, idStr, checkpointId);
   }

-  if ((code = taskDbGenChkpUploadData(pParam->pTask->pBackend, ((SStreamMeta*)pParam->pMeta)->bkdChkptMgt,
-                                      pParam->chkpId, (int8_t)(pParam->type), &path, toDelFiles)) != 0) {
-    stError("s-task:%s failed to gen upload checkpoint:%" PRId64, taskStr, pParam->chkpId);
-  }
-
-  if (pParam->type == DATA_UPLOAD_S3) {
-    if (code == 0 && (code = getCheckpointDataMeta(pParam->taskId, path, toDelFiles)) != 0) {
-      stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", taskStr, pParam->chkpId);
+  if (type == DATA_UPLOAD_S3) {
+    if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) {
+      stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", idStr, checkpointId);
     }
   }

   if (code == TSDB_CODE_SUCCESS) {
-    code = streamTaskUploadCheckpoint(pParam->taskId, path);
+    code = streamTaskUploadCheckpoint(idStr, path);
     if (code == TSDB_CODE_SUCCESS) {
-      stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", taskStr, pParam->chkpId);
+      stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
     } else {
-      stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", taskStr, pParam->chkpId, path);
+      stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", idStr, checkpointId, path);
     }
   }

-  taskReleaseDb(pParam->dbRefId);
-
-  if (code == 0) {
+  if (code == TSDB_CODE_SUCCESS) {
     int32_t size = taosArrayGetSize(toDelFiles);
-    stDebug("s-task:%s remove redundant %d files", taskStr, size);
+    stDebug("s-task:%s remove redundant %d files", idStr, size);

     for (int i = 0; i < size; i++) {
       char* pName = taosArrayGetP(toDelFiles, i);
-      code = deleteCheckpointFile(pParam->taskId, pName);
+      code = deleteCheckpointFile(idStr, pName);
       if (code != 0) {
-        stDebug("s-task:%s failed to del file: %s", taskStr, pName);
+        stDebug("s-task:%s failed to remove file: %s", idStr, pName);
         break;
       }
     }

-    stDebug("s-task:%s remove redundant files done", taskStr);
+    stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId);
   }

   taosArrayDestroyP(toDelFiles, taosMemoryFree);
+  double el = (taosGetTimestampMs() - now) / 1000.0;

   if (code == TSDB_CODE_SUCCESS) {
-    stDebug("s-task:%s remove local checkpointId:%" PRId64 " data %s", taskStr, pParam->chkpId, path);
+    stDebug("s-task:%s complete update checkpointId:%" PRId64 ", elapsed time:%.2fs remove local checkpoint data %s",
+            idStr, checkpointId, el, path);
     taosRemoveDir(path);
   } else {
-    stDebug("s-task:%s update checkpointId:%" PRId64 " keep local checkpoint data", taskStr, pParam->chkpId);
+    stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs",
+            idStr, checkpointId, el);
   }

   taosMemoryFree(path);
-  taosMemoryFree(pParam->taskId);
-  taosMemoryFree(pParam);

   return code;
 }

-int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId, char* taskId) {
+int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) {
   ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
   if (type == DATA_UPLOAD_DISABLE) {
+    stDebug("s-task:%s not allowed to upload checkpoint data", pTask->id.idStr);
     return 0;
   }

@@ -641,15 +638,17 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointI
     return 0;
   }

-  SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg));
-  arg->type = type;
-  arg->taskId = taosStrdup(taskId);
-  arg->chkpId = checkpointId;
-  arg->pTask = pTask;
-  arg->dbRefId = taskGetDBRef(pTask->pBackend);
-  arg->pMeta = pTask->pMeta;
+  int64_t dbRefId = taskGetDBRef(pTask->pBackend);
+  void* pBackend = taskAcquireDb(dbRefId);
+  if (pBackend == NULL) {
+    stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData", pTask->id.idStr);
+    return -1;
+  }

-  return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL);
+  int32_t code = uploadCheckpointData(pTask, checkpointId, taskGetDBRef(pTask->pBackend), type);
+  taskReleaseDb(dbRefId);
+
+  return code;
 }

@@ -670,6 +669,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
     }
   }

+  // TODO: monitoring the checkpoint-source msg
   // send check point response to upstream task
   if (code == TSDB_CODE_SUCCESS) {
     if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@@ -679,38 +679,39 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
     }

     if (code != TSDB_CODE_SUCCESS) {
-      // todo: let's retry send rsp to upstream/mnode
+      // todo: let's retry send rsp to mnode, checkpoint-ready has monitor now
       stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId,
               tstrerror(code));
     }
   }

-  // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
-  if (code == TSDB_CODE_SUCCESS && (pTask->pMsgCb != NULL)) {
-    code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
-    if (code == TSDB_CODE_SUCCESS) {
-      code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id);
-      if (code != TSDB_CODE_SUCCESS) {
-        stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId);
-      }
-    } else {
-      stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
+  if (code == TSDB_CODE_SUCCESS) {
+    code = streamTaskRemoteBackupCheckpoint(pTask, ckId);
+    if (code != TSDB_CODE_SUCCESS) {
+      stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code));
     }
+  } else {
+    stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
   }

-  // clear the checkpoint info if failed
-  if (code != TSDB_CODE_SUCCESS) {
+  // TODO: monitoring the checkpoint-report msg
+  // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
+  if (code == TSDB_CODE_SUCCESS) {
+    if (pTask->pMsgCb != NULL) {
+      code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
+    }
+  } else {  // clear the checkpoint info if failed
     taosThreadMutexLock(&pTask->lock);
     streamTaskClearCheckInfo(pTask, false);
-    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
     taosThreadMutexUnlock(&pTask->lock);

+    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
     streamTaskSetFailedCheckpointId(pTask);
     stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
   }

   double el = (taosGetTimestampMs() - startTs) / 1000.0;
-  stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", id,
+  stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2fs, %s ", id,
          pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
          (code == TSDB_CODE_SUCCESS) ? "succ" : "failed");

@@ -739,14 +740,13 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
   }

   pActiveInfo->checkCounter = 0;
-  stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, pTask->id.idStr, vgId, now);
+  stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);

   taosThreadMutexLock(&pTask->lock);
   SStreamTaskState* pState = streamTaskGetStatus(pTask);
   if (pState->state != TASK_STATUS__CK) {
     int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
-    stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", pTask->id.idStr,
-            vgId, ref);
+    stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);

     taosThreadMutexUnlock(&pTask->lock);
     streamMetaReleaseTask(pTask->pMeta, pTask);
@@ -756,8 +756,8 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
   // checkpoint-trigger recv flag is set, quit
   if (pActiveInfo->allUpstreamTriggerRecv) {
     int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
-    stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d",
-            pTask->id.idStr, vgId, ref);
+    stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId,
+            ref);

     taosThreadMutexUnlock(&pTask->lock);
     streamMetaReleaseTask(pTask->pMeta, pTask);
@@ -815,6 +815,7 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
   const char* pId = pTask->id.idStr;
   int32_t size = taosArrayGetSize(pNotSendList);
   int32_t numOfUpstream = streamTaskGetNumOfUpstream(pTask);
+  int64_t checkpointId = pTask->chkInfo.pActiveInfo->activeId;

   if (size <= 0) {
     stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId);
@@ -840,15 +841,14 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
     pReq->downstreamNodeId = vgId;
     pReq->upstreamTaskId = pUpstreamTask->taskId;
     pReq->upstreamNodeId = pUpstreamTask->nodeId;
-    pReq->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
-
+    pReq->checkpointId = checkpointId;

     SRpcMsg rpcMsg = {0};
     initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq));

     code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
     stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId, vgId,
-            pUpstreamTask->taskId, pUpstreamTask->nodeId, pReq->checkpointId);
+            pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
   }

   return TSDB_CODE_SUCCESS;

@@ -781,7 +781,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
   // check the status every 100ms
   if (streamTaskShouldStop(pTask)) {
     int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
-    stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
+    stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
     streamMetaReleaseTask(pTask->pMeta, pTask);
     return;
   }
@@ -795,6 +795,18 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
   pActiveInfo->sendReadyCheckCounter = 0;
   stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id);

+  taosThreadMutexLock(&pTask->lock);
+  SStreamTaskState* pState = streamTaskGetStatus(pTask);
+  if (pState->state != TASK_STATUS__CK) {
+    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
+    stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId,
+            pState->name, ref);
+    taosThreadMutexUnlock(&pTask->lock);
+    streamMetaReleaseTask(pTask->pMeta, pTask);
+    return;
+  }
+  taosThreadMutexUnlock(&pTask->lock);
+
   taosThreadMutexLock(&pActiveInfo->lock);

   SArray* pList = pActiveInfo->pReadyMsgList;
@@ -844,7 +856,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
             "and quit from timer, ref:%d",
             id, vgId, ref);

-    streamClearChkptReadyMsg(pTask);
+    streamClearChkptReadyMsg(pActiveInfo);
     taosThreadMutexUnlock(&pActiveInfo->lock);
     streamMetaReleaseTask(pTask->pMeta, pTask);
   }
@@ -906,9 +918,9 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
     tmsgSendRsp(&pInfo->msg);

     taosArrayClear(pList);
-    stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
+    stDebug("s-task:%s level:%d checkpoint-source rsp completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
   } else {
-    stDebug("s-task:%s level:%d already send rsp checkpoint success to mnode", pTask->id.idStr, pTask->info.taskLevel);
+    stDebug("s-task:%s level:%d already send checkpoint-source rsp success to mnode", pTask->id.idStr, pTask->info.taskLevel);
   }

   taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock);

@@ -1116,8 +1128,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
   return 0;
 }

-void streamClearChkptReadyMsg(SStreamTask* pTask) {
-  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
+void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) {
   if (pActiveInfo == NULL) {
     return;
   }

@@ -231,7 +231,6 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
     return 0;
   } else if (compatible == STREAM_STATA_NEED_CONVERT) {
     stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
-
     return streamMetaCvtDbFormat(pMeta);
   } else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
     stError(

@@ -240,7 +240,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
     if (code == TSDB_CODE_SUCCESS) {
       checkFillhistoryTaskStatus(pTask, pHisTask);
     }
-
   }

   streamMetaReleaseTask(pMeta, pHisTask);

@@ -254,7 +254,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
     walCloseReader(pTask->exec.pWalReader);
   }

-  streamClearChkptReadyMsg(pTask);
+  streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);

   if (pTask->msgInfo.pData != NULL) {
     clearBufferedDispatchMsg(pTask);

|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
|
||||||
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||||
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
||||||
|
|
||||||
// in case of fill-history task, stop the tsdb file scan operation.
|
// in case of fill-history task, stop the tsdb file scan operation.
|
||||||
if (pTask->info.fillHistory == 1) {
|
if (pTask->info.fillHistory == 1) {
|
||||||
|
|
|
@@ -623,9 +623,9 @@ void doInitStateTransferTable(void) {
   taosArrayPush(streamTaskSMTrans, &trans);
   trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
   taosArrayPush(streamTaskSMTrans, &trans);
-  trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
+  trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
   taosArrayPush(streamTaskSMTrans, &trans);

   trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
   taosArrayPush(streamTaskSMTrans, &trans);
   trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL);

@@ -0,0 +1,65 @@
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+# -*- coding: utf-8 -*-
+
+import frame.etool
+
+from frame.log import *
+from frame.cases import *
+from frame.sql import *
+from frame.caseBase import *
+from frame import *
+from frame.autogen import *
+
+
+class TDTestCase(TBase):
+
+    def td_30642(self):
+        sqls = [
+            "CREATE DATABASE IF NOT EXISTS `_xTest2`",
+            "CREATE USER `_xTest` PASS 'taosdata'",
+            "CREATE TABLE IF NOT EXISTS `_xTest2`.`meters` (ts timestamp, v1 int) tags(t1 int)",
+
+            "CREATE DATABASE IF NOT EXISTS `test2`",
+            "CREATE USER `user1` PASS 'taosdata'",
+            "CREATE TABLE IF NOT EXISTS `test2`.`meters2` (ts timestamp, v1 int) tags(t1 int)"
+        ]
+        tdSql.executes(sqls)
+
+        sql1 = 'GRANT read ON `_xTest2`.`meters` WITH (t1 = 1) TO `_xTest`'
+        tdSql.query(sql1)
+        sql1_verify = "select * from information_schema.ins_user_privileges where user_name='_xTest' and privilege='read' and db_name='_xTest2' and table_name='meters'"
+        tdSql.query(sql1_verify)
+        tdSql.checkRows(1)
+        tdSql.checkData(0, 4, '(`_xTest2`.`meters`.`t1` = 1)')
+
+        sql2 = 'GRANT write ON test2.meters2 WITH (t1 = 1) TO user1'
+        tdSql.query(sql2)
+        sql2_verify = "select * from information_schema.ins_user_privileges where user_name='user1' and privilege='write' and db_name='test2' and table_name='meters2'"
+        tdSql.query(sql2_verify)
+        tdSql.checkRows(1)
+        tdSql.checkData(0, 4, '(`test2`.`meters2`.`t1` = 1)')
+
+    # run
+    def run(self):
+        tdLog.debug(f"start to excute {__file__}")
+
+        # TD-30642
+        self.td_30642()
+
+        tdLog.success(f"{__file__} successfully executed")
+
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())

@@ -194,11 +194,11 @@ class TDTestCase(TBase):
         # alter float(c9) double(c10) to tsz
         comp = "tsz"
         sql = f"alter table {tbname} modify column c9 COMPRESS '{comp}';"
-        tdSql.execute(sql)
+        tdSql.execute(sql, show=True)
         self.checkDataDesc(tbname, 10, 5, comp)
         self.writeData(10000)
         sql = f"alter table {tbname} modify column c10 COMPRESS '{comp}';"
-        tdSql.execute(sql)
+        tdSql.execute(sql, show=True)
         self.checkDataDesc(tbname, 11, 5, comp)
         self.writeData(10000)

@@ -207,9 +207,48 @@ class TDTestCase(TBase):
         for i in range(self.colCnt - 1):
             col = f"c{i}"
             sql = f"alter table {tbname} modify column {col} LEVEL '{level}';"
-            tdSql.execute(sql)
+            tdSql.execute(sql, show=True)
+            self.checkDataDesc(tbname, i + 1, 6, level)
             self.writeData(1000)

+        # modify two combine
+        i = 9
+        encode = "delta-d"
+        compress = "zlib"
+        sql = f"alter table {tbname} modify column c{i} ENCODE '{encode}' COMPRESS '{compress}';"
+        tdSql.execute(sql, show=True)
+        self.checkDataDesc(tbname, i + 1, 4, encode)
+        self.checkDataDesc(tbname, i + 1, 5, compress)
+
+        i = 10
+        encode = "delta-d"
+        level = "high"
+        sql = f"alter table {tbname} modify column c{i} ENCODE '{encode}' LEVEL '{level}';"
+        tdSql.execute(sql, show=True)
+        self.checkDataDesc(tbname, i + 1, 4, encode)
+        self.checkDataDesc(tbname, i + 1, 6, level)
+
+        i = 2
+        compress = "zlib"
+        level = "high"
+        sql = f"alter table {tbname} modify column c{i} COMPRESS '{compress}' LEVEL '{level}';"
+        tdSql.execute(sql, show=True)
+        self.checkDataDesc(tbname, i + 1, 5, compress)
+        self.checkDataDesc(tbname, i + 1, 6, level)
+
+        # modify three combine
+        i = 7
+        encode = "simple8b"
+        compress = "zstd"
+        level = "medium"
+        sql = f"alter table {tbname} modify column c{i} ENCODE '{encode}' COMPRESS '{compress}' LEVEL '{level}';"
+        tdSql.execute(sql, show=True)
+        self.checkDataDesc(tbname, i + 1, 4, encode)
+        self.checkDataDesc(tbname, i + 1, 5, compress)
+        self.checkDataDesc(tbname, i + 1, 6, level)
+
         # alter error
         sqls = [
             "alter table nodb.nostb modify column ts LEVEL 'high';",

@@ -30,6 +30,7 @@
 ,,y,army,./pytest.sh python3 ./test.py -f query/subquery/subqueryBugs.py -N 3
 ,,y,army,./pytest.sh python3 ./test.py -f storage/oneStageComp.py -N 3 -L 3 -D 1
 ,,y,army,./pytest.sh python3 ./test.py -f storage/compressBasic.py -N 3
+,,y,army,./pytest.sh python3 ./test.py -f grant/grantBugs.py -N 3

 #
 # system test