From 80476d35829173743d138bb68ab661046181c0dd Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Mon, 30 Oct 2023 16:06:51 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=88=97=E8=A1=A8=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E5=B7=B2=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: 1c63965ba59fc5ea1e20aa61a697222285fc190f --- api/desc/core/pcm-core.api | 3 + .../logic/core/scheduletaskbyyamllogic.go | 2 +- api/internal/logic/core/scheduletasklogic.go | 2 +- api/internal/logic/core/tasklistlogic.go | 70 ++++++++++++++++++- api/internal/types/types.go | 3 + pkg/models/taskmodel_gen.go | 3 +- .../reportheartbeatlogic.go | 5 +- 7 files changed, 83 insertions(+), 5 deletions(-) diff --git a/api/desc/core/pcm-core.api b/api/desc/core/pcm-core.api index 44f0143b..9bff4649 100644 --- a/api/desc/core/pcm-core.api +++ b/api/desc/core/pcm-core.api @@ -260,6 +260,9 @@ type ( CpuCores float64 `json:"cpuCores"` CpuRate float64 `json:"cpuRate"` CpuLimit float64 `json:"cpuLimit"` + GpuCores float64 `json:"gpuCores"` + GpuRate float64 `json:"gpuRate"` + GpuLimit float64 `json:"gpuLimit"` MemoryTotal float64 `json:"memoryTotal"` MemoryRate float64 `json:"memoryRate"` MemoryLimit float64 `json:"memoryLimit"` diff --git a/api/internal/logic/core/scheduletaskbyyamllogic.go b/api/internal/logic/core/scheduletaskbyyamllogic.go index 4459682c..e58e64f1 100644 --- a/api/internal/logic/core/scheduletaskbyyamllogic.go +++ b/api/internal/logic/core/scheduletaskbyyamllogic.go @@ -39,7 +39,7 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa Description: req.Description, Name: req.Name, YamlString: string(bytes), - CommitTime: time.Now().String(), + CommitTime: time.Now(), } // 保存任务数据到数据库 tx := l.svcCtx.DbEngin.Create(&taskModel) diff --git a/api/internal/logic/core/scheduletasklogic.go b/api/internal/logic/core/scheduletasklogic.go index 5beb123e..bb580e0f 100644 --- a/api/internal/logic/core/scheduletasklogic.go +++ b/api/internal/logic/core/scheduletasklogic.go @@ -43,7 +43,7 @@ func (l *ScheduleTaskLogic) ScheduleTask(req *types.ScheduleTaskReq) (err error) Description: req.Description, Name: req.Name, YamlString: string(bytes), - CommitTime: time.Now().String(), + CommitTime: time.Now(), } // save the task in mysql and return id tx := l.svcCtx.DbEngin.Create(&taskModel) diff --git a/api/internal/logic/core/tasklistlogic.go b/api/internal/logic/core/tasklistlogic.go index a73c9fae..27728bf6 100644 --- a/api/internal/logic/core/tasklistlogic.go +++ b/api/internal/logic/core/tasklistlogic.go @@ -2,6 +2,10 @@ package core import ( "context" + "fmt" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" + "strconv" + "time" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" @@ -24,7 +28,71 @@ func NewTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *TaskList } func (l *TaskListLogic) TaskList(req *types.TaskListReq) (resp *types.TaskListResp, err error) { - // todo: add your logic here and delete this line + resp = &types.TaskListResp{} + + // 查询任务数据 + var tasks []models.Task + offset := (req.PageNum - 1) * req.PageSize + tx := l.svcCtx.DbEngin.Order("created_time desc").Offset(offset).Limit(req.PageSize).Find(&tasks) + if tx.Error != nil { + logx.Error(err) + return nil, tx.Error + } + if len(tasks) == 0 { + return nil, nil + } + // 查询任务总数 + l.svcCtx.DbEngin.Model(&models.Task{}).Count(&resp.TotalCount) + + // 查询任务异常数 + l.svcCtx.DbEngin.Model(&models.Task{}).Where("status = ?", "Failed").Count(&resp.AlarmCount) + + // 正常任务数 + resp.NormalCount = resp.TotalCount - resp.AlarmCount + for _, task := range tasks { + type PInfo struct { + Id int64 + Name string + Address string + } + pInfo := PInfo{} + tx := l.svcCtx.DbEngin.Raw("SELECT id,name,address from sc_participant_phy_info where id in (SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.participant_id) ,GROUP_CONCAT(DISTINCT a.participant_id) ,GROUP_CONCAT(DISTINCT c.participant_id))as service_name from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?)", task.Id).Scan(&pInfo) + if tx.Error != nil { + logx.Error(err) + return nil, tx.Error + } + // 获取p端状态 + var pStatus string + key := fmt.Sprintf("ps_%s-%d", pInfo.Address, pInfo.Id) + hGetAll := l.svcCtx.RedisClient.HGetAll(l.ctx, key) + if hGetAll.Err() != nil { + return nil, err + } + if len(hGetAll.Val()) != 0 { + parseInt, err := strconv.ParseInt(hGetAll.Val()["lastHeartbeat"], 10, 64) + if err != nil { + return nil, err + } + + if (time.Now().Unix() - parseInt) > 15 { + pStatus = "Unknown" + } else { + pStatus = "Normal" + } + } + + resp.Tasks = append(resp.Tasks, types.Task{ + Id: task.Id, + Name: task.Name, + Status: task.Status, + StartTime: task.StartTime, + EndTime: task.EndTime, + ParticipantId: pInfo.Id, + ParticipantName: pInfo.Name, + ParticipantStatus: pStatus, + }) + + } return } diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 555673f2..b5d4e9d3 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -237,6 +237,9 @@ type TaskDetailResp struct { CpuCores float64 `json:"cpuCores"` CpuRate float64 `json:"cpuRate"` CpuLimit float64 `json:"cpuLimit"` + GpuCores float64 `json:"gpuCores"` + GpuRate float64 `json:"gpuRate"` + GpuLimit float64 `json:"gpuLimit"` MemoryTotal float64 `json:"memoryTotal"` MemoryRate float64 `json:"memoryRate"` MemoryLimit float64 `json:"memoryLimit"` diff --git a/pkg/models/taskmodel_gen.go b/pkg/models/taskmodel_gen.go index ea5a41f5..470abbe6 100644 --- a/pkg/models/taskmodel_gen.go +++ b/pkg/models/taskmodel_gen.go @@ -7,6 +7,7 @@ import ( "database/sql" "gorm.io/gorm" "strings" + "time" "github.com/zeromicro/go-zero/core/stores/builder" "github.com/zeromicro/go-zero/core/stores/sqlx" @@ -40,7 +41,7 @@ type ( Status string `db:"status"` // 作业状态 Strategy int64 `db:"strategy"` // 策略 SynergyStatus int64 `db:"synergy_status"` // 协同状态(0-未协同、1-已协同) - CommitTime string `db:"commit_time"` // 提交时间 + CommitTime time.Time `db:"commit_time"` // 提交时间 StartTime string `db:"start_time"` // 开始时间 EndTime string `db:"end_time"` // 结束运行时间 RunningTime int64 `db:"running_time"` // 已运行时间(单位秒) diff --git a/rpc/internal/logic/participantservice/reportheartbeatlogic.go b/rpc/internal/logic/participantservice/reportheartbeatlogic.go index 25fb094d..8d637369 100644 --- a/rpc/internal/logic/participantservice/reportheartbeatlogic.go +++ b/rpc/internal/logic/participantservice/reportheartbeatlogic.go @@ -32,13 +32,16 @@ func (l *ReportHeartbeatLogic) ReportHeartbeat(in *pcmCore.ParticipantHeartbeatR key := fmt.Sprintf("ps_%s-%d", in.Address, in.ParticipantId) clientsMutex.Lock() defer clientsMutex.Unlock() - l.svcCtx.RedisClient.HSet(context.Background(), key, + setResult := l.svcCtx.RedisClient.HSet(context.Background(), key, map[string]interface{}{ "address": in.Address, "participantId": in.ParticipantId, "lastHeartbeat": time.Now().Unix(), "clientState": "TRUE", }) + if setResult.Err() != nil { + return nil, setResult.Err() + } l.svcCtx.RedisClient.Expire(context.Background(), key, 30*time.Second) return &pcmCore.HealthCheckResp{ Code: 200,