rpc: remove the sync-task interface

Former-commit-id: 9c13fa7882b6a74c2c0bb48114b989d6ff8c54fa
zhangwei 2023-11-27 15:52:47 +08:00
parent 9f570982ca
commit c4615e3c18
6 changed files with 651 additions and 742 deletions

View File

@@ -26,66 +26,6 @@ const (
)
var promQLTemplates = map[string]string{
//node
"node_cpu_utilisation": "node:node_cpu_utilisation:avg1m{$1}",
"node_cpu_total": "node:node_num_cpu:sum{$1}",
"node_memory_utilisation": "node:node_memory_utilisation:{$1}",
"node_memory_available": "node:node_memory_bytes_available:sum{$1}",
"node_memory_total": "node:node_memory_bytes_total:sum{$1}",
"node_memory_usage_wo_cache": "node:node_memory_bytes_total:sum{$1} - node:node_memory_bytes_available:sum{$1}",
"node_net_utilisation": "node:node_net_utilisation:sum_irate{$1}",
"node_net_bytes_transmitted": "node:node_net_bytes_transmitted:sum_irate{$1}",
"node_net_bytes_received": "node:node_net_bytes_received:sum_irate{$1}",
"node_disk_read_iops": "node:data_volume_iops_reads:sum{$1}",
"node_disk_write_iops": "node:data_volume_iops_writes:sum{$1}",
"node_disk_read_throughput": "node:data_volume_throughput_bytes_read:sum{$1}",
"node_disk_write_throughput": "node:data_volume_throughput_bytes_written:sum{$1}",
"node_disk_size_capacity": `sum(max(node_filesystem_size_bytes{device=~"/dev/.*", device!~"/dev/loop\\d+", job="node-exporter"} * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:{$1}) by (device, node)) by (node)`,
"node_disk_size_available": `node:disk_space_available:{$1}`,
"node_disk_size_usage": `sum(max((node_filesystem_size_bytes{device=~"/dev/.*", device!~"/dev/loop\\d+", job="node-exporter"} - node_filesystem_avail_bytes{device=~"/dev/.*", device!~"/dev/loop\\d+", job="node-exporter"}) * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:{$1}) by (device, node)) by (node)`,
"node_disk_size_utilisation": `node:disk_space_utilization:ratio{$1}`,
"node_disk_inode_total": `node:node_inodes_total:{$1}`,
"node_disk_inode_usage": `node:node_inodes_total:{$1} - node:node_inodes_free:{$1}`,
"node_disk_inode_utilisation": `node:disk_inode_utilization:ratio{$1}`,
"node_pod_count": `node:pod_count:sum{$1}`,
"node_pod_quota": `max(kube_node_status_capacity{resource="pods",$1}) by (node) unless on (node) (kube_node_status_condition{condition="Ready",status=~"unknown|false"} > 0)`,
"node_pod_utilisation": `node:pod_utilization:ratio{$1}`,
"node_pod_running_count": `node:pod_running:count{$1}`,
"node_pod_succeeded_count": `node:pod_succeeded:count{$1}`,
"node_pod_abnormal_count": `node:pod_abnormal:count{$1}`,
"node_cpu_usage": `round(node:node_cpu_utilisation:avg1m{$1} * node:node_num_cpu:sum{$1}, 0.001)`,
"node_load1": `node:load1:ratio{$1}`,
"node_load5": `node:load5:ratio{$1}`,
"node_load15": `node:load15:ratio{$1}`,
"node_pod_abnormal_ratio": `node:pod_abnormal:ratio{$1}`,
"node_pleg_quantile": `node_quantile:kubelet_pleg_relist_duration_seconds:histogram_quantile{$1}`,
"node_device_size_usage": `sum by(device, node, host_ip, role) (node_filesystem_size_bytes{device!~"/dev/loop\\d+",device=~"/dev/.*",job="node-exporter"} * on(namespace, pod) group_left(node, host_ip, role) node_namespace_pod:kube_pod_info:{$1}) - sum by(device, node, host_ip, role) (node_filesystem_avail_bytes{device!~"/dev/loop\\d+",device=~"/dev/.*",job="node-exporter"} * on(namespace, pod) group_left(node, host_ip, role) node_namespace_pod:kube_pod_info:{$1})`,
"node_device_size_utilisation": `1 - sum by(device, node, host_ip, role) (node_filesystem_avail_bytes{device!~"/dev/loop\\d+",device=~"/dev/.*",job="node-exporter"} * on(namespace, pod) group_left(node, host_ip, role) node_namespace_pod:kube_pod_info:{$1}) / sum by(device, node, host_ip, role) (node_filesystem_size_bytes{device!~"/dev/loop\\d+",device=~"/dev/.*",job="node-exporter"} * on(namespace, pod) group_left(node, host_ip, role) node_namespace_pod:kube_pod_info:{$1})`,
// workspace
"workspace_cpu_usage": `round(sum by (workspace) (namespace:container_cpu_usage_seconds_total:sum_rate{namespace!="", $1}), 0.001)`,
"workspace_memory_usage": `sum by (workspace) (namespace:container_memory_usage_bytes:sum{namespace!="", $1})`,
"workspace_memory_usage_wo_cache": `sum by (workspace) (namespace:container_memory_usage_bytes_wo_cache:sum{namespace!="", $1})`,
"workspace_net_bytes_transmitted": `sum by (workspace) (sum by (namespace) (irate(container_network_transmit_bytes_total{namespace!="", pod!="", interface!~"^(cali.+|tunl.+|dummy.+|kube.+|flannel.+|cni.+|docker.+|veth.+|lo.*)", job="kubelet"}[5m])) * on (namespace) group_left(workspace) kube_namespace_labels{$1}) or on(workspace) max by(workspace) (kube_namespace_labels{$1} * 0)`,
"workspace_net_bytes_received": `sum by (workspace) (sum by (namespace) (irate(container_network_receive_bytes_total{namespace!="", pod!="", interface!~"^(cali.+|tunl.+|dummy.+|kube.+|flannel.+|cni.+|docker.+|veth.+|lo.*)", job="kubelet"}[5m])) * on (namespace) group_left(workspace) kube_namespace_labels{$1}) or on(workspace) max by(workspace) (kube_namespace_labels{$1} * 0)`,
"workspace_pod_count": `sum by (workspace) (kube_pod_status_phase{phase!~"Failed|Succeeded", namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1})) or on(workspace) max by(workspace) (kube_namespace_labels{$1} * 0)`,
"workspace_pod_running_count": `sum by (workspace) (kube_pod_status_phase{phase="Running", namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1})) or on(workspace) max by(workspace) (kube_namespace_labels{$1} * 0)`,
"workspace_pod_succeeded_count": `sum by (workspace) (kube_pod_status_phase{phase="Succeeded", namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1})) or on(workspace) max by(workspace) (kube_namespace_labels{$1} * 0)`,
"workspace_pod_abnormal_count": `count by (workspace) ((kube_pod_info{node!=""} unless on (pod, namespace) (kube_pod_status_phase{job="kube-state-metrics", phase="Succeeded"}>0) unless on (pod, namespace) ((kube_pod_status_ready{job="kube-state-metrics", condition="true"}>0) and on (pod, namespace) (kube_pod_status_phase{job="kube-state-metrics", phase="Running"}>0)) unless on (pod, namespace) (kube_pod_container_status_waiting_reason{job="kube-state-metrics", reason="ContainerCreating"}>0)) * on (namespace) group_left(workspace)(kube_namespace_labels{$1}))`,
"workspace_ingresses_extensions_count": `sum by (workspace) (kube_ingress_labels{namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1}))`,
"workspace_cronjob_count": `sum by (workspace) (kube_cronjob_labels{namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1}))`,
"workspace_pvc_count": `sum by (workspace) (kube_persistentvolumeclaim_info{namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1}))`,
"workspace_daemonset_count": `sum by (workspace) (kube_daemonset_labels{namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1}))`,
"workspace_deployment_count": `sum by (workspace) (kube_deployment_labels{namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1}))`,
"workspace_endpoint_count": `sum by (workspace) (kube_endpoint_labels{namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1}))`,
"workspace_hpa_count": `sum by (workspace) (kube_horizontalpodautoscaler_labels{namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1}))`,
"workspace_job_count": `sum by (workspace) (kube_job_labels{namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1}))`,
"workspace_statefulset_count": `sum by (workspace) (kube_statefulset_labels{namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1}))`,
"workspace_replicaset_count": `count by (workspace) (kube_replicaset_labels{namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1}))`,
"workspace_service_count": `sum by (workspace) (kube_service_info{namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1}))`,
"workspace_secret_count": `sum by (workspace) (kube_secret_info{namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1}))`,
"workspace_pod_abnormal_ratio": `count by (workspace) ((kube_pod_info{node!=""} unless on (pod, namespace) (kube_pod_status_phase{job="kube-state-metrics", phase="Succeeded"}>0) unless on (pod, namespace) ((kube_pod_status_ready{job="kube-state-metrics", condition="true"}>0) and on (pod, namespace) (kube_pod_status_phase{job="kube-state-metrics", phase="Running"}>0)) unless on (pod, namespace) (kube_pod_container_status_waiting_reason{job="kube-state-metrics", reason="ContainerCreating"}>0)) * on (namespace) group_left(workspace) kube_namespace_labels{$1}) / sum by (workspace) (kube_pod_status_phase{phase!="Succeeded", namespace!=""} * on (namespace) group_left(workspace)(kube_namespace_labels{$1}))`,
//namespace
"namespace_cpu_usage": `round(namespace:container_cpu_usage_seconds_total:sum_rate{namespace!="", $1}, 0.001)`,
@@ -116,23 +56,6 @@ var promQLTemplates = map[string]string{
"namespace_ingresses_extensions_count": `sum by (namespace) (kube_ingress_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
"namespace_s2ibuilder_count": `sum by (namespace) (s2i_s2ibuilder_created{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
// workload
"workload_cpu_usage": `round(namespace:workload_cpu_usage:sum{$1}, 0.001)`,
"workload_memory_usage": `namespace:workload_memory_usage:sum{$1}`,
"workload_memory_usage_wo_cache": `namespace:workload_memory_usage_wo_cache:sum{$1}`,
"workload_net_bytes_transmitted": `namespace:workload_net_bytes_transmitted:sum_irate{$1}`,
"workload_net_bytes_received": `namespace:workload_net_bytes_received:sum_irate{$1}`,
"workload_deployment_replica": `label_join(sum (label_join(label_replace(kube_deployment_spec_replicas{$2}, "owner_kind", "Deployment", "", ""), "workload", "", "deployment")) by (namespace, owner_kind, workload), "workload", ":", "owner_kind", "workload")`,
"workload_deployment_replica_available": `label_join(sum (label_join(label_replace(kube_deployment_status_replicas_available{$2}, "owner_kind", "Deployment", "", ""), "workload", "", "deployment")) by (namespace, owner_kind, workload), "workload", ":", "owner_kind", "workload")`,
"workload_statefulset_replica": `label_join(sum (label_join(label_replace(kube_statefulset_replicas{$2}, "owner_kind", "StatefulSet", "", ""), "workload", "", "statefulset")) by (namespace, owner_kind, workload), "workload", ":", "owner_kind", "workload")`,
"workload_statefulset_replica_available": `label_join(sum (label_join(label_replace(kube_statefulset_status_replicas_current{$2}, "owner_kind", "StatefulSet", "", ""), "workload", "", "statefulset")) by (namespace, owner_kind, workload), "workload", ":", "owner_kind", "workload")`,
"workload_daemonset_replica": `label_join(sum (label_join(label_replace(kube_daemonset_status_desired_number_scheduled{$2}, "owner_kind", "DaemonSet", "", ""), "workload", "", "daemonset")) by (namespace, owner_kind, workload), "workload", ":", "owner_kind", "workload")`,
"workload_daemonset_replica_available": `label_join(sum (label_join(label_replace(kube_daemonset_status_number_available{$2}, "owner_kind", "DaemonSet", "", ""), "workload", "", "daemonset")) by (namespace, owner_kind, workload), "workload", ":", "owner_kind", "workload")`,
"workload_deployment_unavailable_replicas_ratio": `namespace:deployment_unavailable_replicas:ratio{$1}`,
"workload_daemonset_unavailable_replicas_ratio": `namespace:daemonset_unavailable_replicas:ratio{$1}`,
"workload_statefulset_unavailable_replicas_ratio": `namespace:statefulset_unavailable_replicas:ratio{$1}`,
"controller_cpu_usage_rate": `round(sum by (owner_name) (sum by (owner_name, pod) (irate(container_cpu_usage_seconds_total{job="kubelet", $1, image!=""}[5m]))/ sum by (owner_name,pod) (kube_pod_container_resource_limits{resource="cpu"}))/count(kube_pod_info{$2}) by (owner_name),0.0001)`,
"controller_memory_usage_rate": `round(sum by (owner_name) (sum by (owner_name, pod) (irate(container_memory_usage_bytes{job="kubelet", $1, image!=""}[5m]))/ sum by (owner_name,pod) (kube_pod_container_resource_limits{resource="memory"}))/count(kube_pod_info{$2}) by (owner_name),0.0001)`,
// pod
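
The $1 tokens in these templates are placeholders that get substituted with a rendered label-selector body before the query is sent to Prometheus. The substitution helper itself is not part of this diff; a minimal sketch of the idea, assuming a plain string replacement (names here are illustrative):

package main

import (
	"fmt"
	"strings"
)

// expandTemplate replaces the $1 placeholder with a label-selector body.
// Hypothetical helper: the real substitution logic lives outside this diff.
func expandTemplate(tmpl, selector string) string {
	return strings.ReplaceAll(tmpl, "$1", selector)
}

func main() {
	tmpl := `node:node_cpu_utilisation:avg1m{$1}` // "node_cpu_utilisation" above
	fmt.Println(expandTemplate(tmpl, `node="worker-1"`))
	// Output: node:node_cpu_utilisation:avg1m{node="worker-1"}
}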

View File

@@ -1,79 +0,0 @@
/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package cron
import (
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/svc"
"gorm.io/gorm"
"strings"
)
func InitCron(svc *svc.ServiceContext) {
svc.Cron.Start()
// Runs every 5 seconds (six-field cron spec with a seconds field).
svc.Cron.AddFunc("*/5 * * * * ?", func() {
var tasks []models.Task
svc.DbEngin.Where("status not in ?", []string{constants.Deleted, constants.Succeeded, constants.Completed, constants.Failed}).Find(&tasks)
for _, task := range tasks {
var allStatus string
tx := svc.DbEngin.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", task.Id).Scan(&allStatus)
if tx.Error != nil {
logx.Error(tx.Error)
}
// If all sub-task statuses agree, propagate that status to the parent task
statusArray := strings.Split(allStatus, ",")
if len(removeRepeatedElement(statusArray)) == 1 {
updateTask(svc.DbEngin, &task, statusArray[0])
continue
}
// If any sub-task has failed, the parent task is marked failed
if strings.Contains(allStatus, constants.Failed) {
updateTask(svc.DbEngin, &task, constants.Failed)
continue
}
if strings.Contains(allStatus, constants.Running) {
updateTask(svc.DbEngin, &task, constants.Running)
}
}
})
}
func updateTask(dbEngin *gorm.DB, task *models.Task, status string) {
if task.Status != status {
task.Status = status
dbEngin.Updates(&task)
}
}
func removeRepeatedElement(arr []string) (newArr []string) {
newArr = make([]string, 0)
for i := 0; i < len(arr); i++ {
repeat := false
for j := i + 1; j < len(arr); j++ {
if arr[i] == arr[j] {
repeat = true
break
}
}
if !repeat {
newArr = append(newArr, arr[i])
}
}
return
}
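
The removed removeRepeatedElement above deduplicates by scanning ahead for each element, which is O(n²) and keeps the last occurrence of each value. Since its only caller checks whether exactly one distinct status remains, a map-based variant (illustrative only, not part of this commit) does the same job in O(n):

// removeRepeatedElementFast returns the distinct values in arr in first-seen
// order. Equivalent for the len(...) == 1 check used above.
func removeRepeatedElementFast(arr []string) []string {
	seen := make(map[string]struct{}, len(arr))
	out := make([]string, 0, len(arr))
	for _, s := range arr {
		if _, ok := seen[s]; !ok {
			seen[s] = struct{}{}
			out = append(out, s)
		}
	}
	return out
}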

View File

@@ -17,8 +17,11 @@ package pcmcorelogic
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/pcmCore"
"gorm.io/gorm"
"strings"
"time"
"github.com/zeromicro/go-zero/core/logx"
@@ -54,20 +57,73 @@ func (l *SyncInfoLogic) SyncInfo(in *pcmCore.SyncInfoReq) (*pcmCore.SyncInfoResp
switch kind {
case constants.CLOUD:
for _, cloudInfo := range in.CloudInfoList {
l.svcCtx.DbEngin.Exec("update cloud set status = ?,start_time = ?,running_time = ?,result = ? where participant_id = ? and task_id = ? and namespace = ? and name = ?",
cloudInfo.Status, cloudInfo.StartTime, cloudInfo.RunningTime, cloudInfo.Result, in.ParticipantId, cloudInfo.TaskId, cloudInfo.Namespace, cloudInfo.Name)
l.svcCtx.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and task_id = ? and namespace = ? and name = ?",
cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, in.ParticipantId, cloudInfo.TaskId, cloudInfo.Namespace, cloudInfo.Name)
syncTask(l.svcCtx.DbEngin, cloudInfo.TaskId)
}
case constants.HPC:
for _, hpcInfo := range in.HpcInfoList {
l.svcCtx.DbEngin.Exec("update hpc set status = ?,start_time = ?,running_time = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?",
l.svcCtx.DbEngin.Exec("update hpc set status = ?,start_time = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?",
hpcInfo.Status, hpcInfo.StartTime, hpcInfo.JobId, in.ParticipantId, hpcInfo.TaskId, hpcInfo.Name)
syncTask(l.svcCtx.DbEngin, hpcInfo.TaskId)
}
case constants.AI:
for _, aiInfo := range in.AiInfoList {
l.svcCtx.DbEngin.Exec("update ai set status = ?,start_time = ?,running_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?",
aiInfo.Status, aiInfo.StartTime, aiInfo.RunningTime, aiInfo.ProjectId, aiInfo.JobId, in.ParticipantId, aiInfo.TaskId, aiInfo.Name)
l.svcCtx.DbEngin.Exec("update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?",
aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, in.ParticipantId, aiInfo.TaskId, aiInfo.Name)
syncTask(l.svcCtx.DbEngin, aiInfo.TaskId)
}
}
return &pcmCore.SyncInfoResp{}, nil
}
func syncTask(gorm *gorm.DB, taskId int64) {
var allStatus string
tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus)
if tx.Error != nil {
logx.Error(tx.Error)
return
}
// If all sub-task statuses agree, propagate that status to the parent task
statusArray := strings.Split(allStatus, ",")
if len(removeRepeatedElement(statusArray)) == 1 {
updateTask(gorm, taskId, statusArray[0])
return
}
// If any sub-task has failed, the parent task is marked failed
if strings.Contains(allStatus, constants.Failed) {
updateTask(gorm, taskId, constants.Failed)
return
}
if strings.Contains(allStatus, constants.Running) {
updateTask(gorm, taskId, constants.Running)
}
}
func updateTask(gorm *gorm.DB, taskId int64, status string) {
var task models.Task
gorm.Where("id = ? ", taskId).Find(&task)
if task.Status != status {
task.Status = status
gorm.Updates(&task)
}
}
// removeRepeatedElement returns the distinct values in arr (keeping the last
// occurrence of each); a result of length 1 means all sub-task statuses agree.
func removeRepeatedElement(arr []string) (newArr []string) {
newArr = make([]string, 0)
for i := 0; i < len(arr); i++ {
repeat := false
for j := i + 1; j < len(arr); j++ {
if arr[i] == arr[j] {
repeat = true
break
}
}
if !repeat {
newArr = append(newArr, arr[i])
}
}
return
}
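
With the cron poller deleted, task-status reconciliation now happens inline whenever a participant reports in through the SyncInfo RPC. A minimal client sketch, assuming the standard protoc-gen-go-grpc generated names (NewPcmCoreClient, SyncInfoReq) and a placeholder endpoint; the exact request shape, including the field that drives the kind switch above, is not fully visible in this diff:

package main

import (
	"context"
	"log"

	"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/pcmCore"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Endpoint is a placeholder.
	conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := pcmCore.NewPcmCoreClient(conn)
	_, err = client.SyncInfo(context.Background(), &pcmCore.SyncInfoReq{
		ParticipantId: 1,
		CloudInfoList: []*pcmCore.CloudInfo{{
			TaskId:    42,
			Namespace: "default",
			Name:      "demo",
			Status:    "Running",
		}},
	})
	if err != nil {
		log.Fatal(err)
	}
}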

View File

@@ -32,16 +32,17 @@ message AiInfo {
message CloudInfo {
int64 participant = 1;
int64 taskId = 2;
string apiVersion = 3;
string kind = 4;
string namespace = 5;
string name = 6;
string status = 7;
int64 id = 2;
int64 taskId = 3;
string apiVersion = 4;
string kind = 5;
string namespace = 6;
string name = 7;
string status = 11;
string startTime = 8;
int64 runningTime = 9;
string result = 10;
string yamlString = 11;
string yamlString = 12;
}
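
Note that this change renumbers existing fields (taskId 2→3, status 7→11, yamlString 11→12) rather than only appending new ones; in protobuf that changes the wire format, so every participant must regenerate its stubs from the updated .proto before old and new binaries can interoperate. A hypothetical construction with the regenerated Go types (field names assume standard protoc-gen-go output):

package example

import "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/pcmCore"

// newCloudInfo shows the renumbered fields; values are placeholders.
func newCloudInfo() *pcmCore.CloudInfo {
	return &pcmCore.CloudInfo{
		Id:         1,           // new field (tag 2)
		TaskId:     42,          // tag moved from 2 to 3
		Status:     "Running",   // tag moved from 7 to 11
		YamlString: "kind: Pod", // tag moved from 11 to 12
	}
}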
message VmInfo {

File diff suppressed because it is too large

View File

@@ -1,8 +1,8 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.25.0
// source: pb/pcmCore.proto
// - protoc v3.19.4
// source: pcmCore.proto
package pcmCore
@@ -146,7 +146,7 @@ var PcmCore_ServiceDesc = grpc.ServiceDesc{
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pb/pcmCore.proto",
Metadata: "pcmCore.proto",
}
const (
@@ -511,5 +511,5 @@ var ParticipantService_ServiceDesc = grpc.ServiceDesc{
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pb/pcmCore.proto",
Metadata: "pcmCore.proto",
}
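
For reference, the Metadata and source-path strings changing from pb/pcmCore.proto to pcmCore.proto mean the stubs were regenerated with the .proto at the root of the include path (and with protoc v3.19.4 rather than v4.25.0), presumably along these lines, the exact paths being an assumption:

protoc --proto_path=. --go_out=. --go-grpc_out=. pcmCore.proto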