Merge remote-tracking branch 'origin/master' into master-wq

Former-commit-id: 75a3fba2c4b7e27305cc8438c8fc466d374610de
This commit is contained in:
qiwang 2024-05-20 20:19:23 +08:00
commit 122075a3cb
46 changed files with 423 additions and 176 deletions

View File

@ -6,8 +6,7 @@ COPY . .
RUN go env -w GO111MODULE=on \
&& go env -w GOPROXY=https://goproxy.cn,direct \
&& go env -w CGO_ENABLED=0 \
&& go mod download
&& go env -w CGO_ENABLED=0
RUN go build -o pcm-coordinator-api /app/api/pcm.go
FROM alpine:3.18

View File

@ -408,7 +408,8 @@ type (
DeletedAt string `json:"deletedAt,omitempty" gorm:"index" db:"deleted_at"`
NsID string `json:"nsId,omitempty" db:"ns_id"`
TenantId string `json:"tenantId,omitempty" db:"tenant_id"`
CreateTime string `json:"createTime,omitempty" db:"create_time" gorm:"autoCreateTime"`
CreatedTime string `json:"createdTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time"`
AdapterTypeDict int `json:"adapterTypeDict" db:"create_time" gorm:"adapter_type_dict"` //任务类型(对应字典表的值
}
)

View File

@ -978,6 +978,9 @@ service pcm {
@handler GetComputeCardsByClusterHandler
get /schedule/getComputeCardsByCluster/:adapterId/:clusterId (GetComputeCardsByClusterReq) returns (GetComputeCardsByClusterResp)
@handler GetClusterBalanceByIdHandler
get /schedule/getClusterBalanceById/:adapterId/:clusterId (GetClusterBalanceByIdReq) returns (GetClusterBalanceByIdResp)
}
@server(
@ -1032,7 +1035,7 @@ service pcm {
get /monitoring/alert/rule (AlertRulesReq) returns (AlertRulesResp)
@handler DeleteAlertRuleHandler
delete /cloud/alert/rule (DeleteAlertRuleReq)
delete /monitoring/alert/rule (DeleteAlertRuleReq)
@doc "cluster resource load"
@handler clustersLoadHandler

View File

@ -141,4 +141,13 @@ type (
GetComputeCardsByClusterResp {
Cards []string `json:"cards"`
}
// GetClusterBalanceByIdReq is the request for the getClusterBalanceById route.
// Both fields are bound from the URL path via their `path` tags.
GetClusterBalanceByIdReq{
AdapterId string `path:"adapterId"`
ClusterId string `path:"clusterId"`
}
// GetClusterBalanceByIdResp is the response for the getClusterBalanceById route.
// Balance is serialized under the JSON key "balance".
GetClusterBalanceByIdResp{
Balance float64 `json:"balance"`
}
)

View File

@ -1225,6 +1225,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/schedule/getComputeCardsByCluster/:adapterId/:clusterId",
Handler: schedule.GetComputeCardsByClusterHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/schedule/getClusterBalanceById/:adapterId/:clusterId",
Handler: schedule.GetClusterBalanceByIdHandler(serverCtx),
},
},
rest.WithPrefix("/pcm/v1"),
)
@ -1304,7 +1309,7 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
},
{
Method: http.MethodDelete,
Path: "/cloud/alert/rule",
Path: "/monitoring/alert/rule",
Handler: monitoring.DeleteAlertRuleHandler(serverCtx),
},
{

View File

@ -0,0 +1,26 @@
package schedule
import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/schedule"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
)
// GetClusterBalanceByIdHandler builds the HTTP handler for the
// getClusterBalanceById route.
//
// The returned handler parses the incoming request into a
// types.GetClusterBalanceByIdReq, hands it to GetClusterBalanceByIdLogic and
// writes the outcome through result.HttpResult. A request that fails to parse
// is answered through result.ParamErrorResult and never reaches the logic
// layer.
func GetClusterBalanceByIdHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		req := new(types.GetClusterBalanceByIdReq)
		if parseErr := httpx.Parse(r, req); parseErr != nil {
			result.ParamErrorResult(r, w, parseErr)
			return
		}

		logic := schedule.NewGetClusterBalanceByIdLogic(r.Context(), svcCtx)
		balance, err := logic.GetClusterBalanceById(req)
		result.HttpResult(r, w, balance, err)
	}
}

View File

@ -22,7 +22,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
)

View File

@ -22,7 +22,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"github.com/zeromicro/go-zero/core/logx"
)

View File

@ -23,7 +23,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
error2 "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/error"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"github.com/zeromicro/go-zero/core/logx"

View File

@ -22,7 +22,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"github.com/zeromicro/go-zero/core/logx"
)

View File

@ -22,7 +22,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"github.com/zeromicro/go-zero/core/logx"
)

View File

@ -21,7 +21,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"github.com/zeromicro/go-zero/core/logx"

View File

@ -23,7 +23,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"github.com/zeromicro/go-zero/core/logx"

View File

@ -21,7 +21,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
error2 "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/error"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"github.com/zeromicro/go-zero/core/logx"

View File

@ -23,7 +23,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"strings"

View File

@ -23,7 +23,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"github.com/zeromicro/go-zero/core/logx"

View File

@ -19,7 +19,7 @@ import (
"github.com/jinzhu/copier"
error2 "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/error"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"

View File

@ -21,7 +21,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
error2 "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/error"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"github.com/zeromicro/go-zero/core/logx"

View File

@ -76,7 +76,7 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
select {
case _ = <-ch:
return resp, nil
case <-time.After(2 * time.Second):
case <-time.After(1 * time.Second):
return resp, nil
}

View File

@ -101,7 +101,7 @@ func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<-
}
for _, task := range taskList {
t := task
if t.Status == constants.Completed || task.Status == constants.Failed {
if t.Status == constants.Completed || task.Status == constants.Failed || task.Status == constants.Stopped {
continue
}
wg.Add(1)

View File

@ -23,7 +23,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"github.com/zeromicro/go-zero/core/logx"

View File

@ -19,7 +19,7 @@ import (
"github.com/jinzhu/copier"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"

View File

@ -23,7 +23,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"github.com/zeromicro/go-zero/core/logx"

View File

@ -19,7 +19,7 @@ import (
"github.com/jinzhu/copier"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"

View File

@ -20,7 +20,7 @@ import (
"github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"

View File

@ -21,7 +21,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
error2 "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/error"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"

View File

@ -21,7 +21,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"github.com/zeromicro/go-zero/core/logx"

View File

@ -22,7 +22,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"github.com/zeromicro/go-zero/core/logx"
)

View File

@ -21,7 +21,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"github.com/zeromicro/go-zero/core/logx"

View File

@ -21,7 +21,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"

View File

@ -19,7 +19,7 @@ import (
"github.com/jinzhu/copier"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"

View File

@ -80,138 +80,193 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
for _, ch := range chs {
select {
case <-ch:
case <-time.After(2 * time.Second):
return
}
}
return
}
func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) {
for _, task := range tasks {
if task.AdapterTypeDict != 1 {
continue
}
if task.Status == constants.Succeeded || task.Status == constants.Failed {
continue
for i := len(tasks) - 1; i >= 0; i-- {
if tasks[i].AdapterTypeDict != 1 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed {
tasks = append(tasks[:i], tasks[i+1:]...)
}
}
var aiTask []*models.TaskAi
tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask)
if len(tasks) == 0 {
ch <- struct{}{}
return
}
task := tasks[0]
for i, _ := range tasks {
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime)
if latest.Before(earliest) {
task = tasks[i]
}
}
var aiTask []*models.TaskAi
tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
ch <- struct{}{}
return
}
if len(aiTask) == 0 {
ch <- struct{}{}
return
}
if len(aiTask) == 1 {
if aiTask[0].Status == constants.Completed {
task.Status = constants.Succeeded
} else {
task.Status = aiTask[0].Status
}
task.StartTime = aiTask[0].StartTime
task.EndTime = aiTask[0].EndTime
task.UpdatedTime = time.Now().Format(constants.Layout)
tx = l.svcCtx.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return
}
ch <- struct{}{}
return
}
if len(aiTask) == 0 {
for i := len(aiTask) - 1; i >= 0; i-- {
if aiTask[i].StartTime == "" {
aiTask = append(aiTask[:i], aiTask[i+1:]...)
}
}
if len(aiTask) == 0 {
ch <- struct{}{}
return
}
start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local)
end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local)
var status string
var count int
for _, a := range aiTask {
s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local)
e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local)
if s.Before(start) {
start = s
}
if e.After(end) {
end = e
}
if a.Status == constants.Failed {
status = a.Status
break
}
if a.Status == constants.Pending {
status = a.Status
continue
}
start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local)
end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local)
var status string
var count int
for _, a := range aiTask {
s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local)
e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local)
if s.Before(start) {
start = s
}
if e.After(end) {
end = e
}
if a.Status == constants.Failed {
status = a.Status
break
}
if a.Status == constants.Pending {
status = a.Status
continue
}
if a.Status == constants.Running {
status = a.Status
continue
}
if a.Status == constants.Completed {
count++
continue
}
if a.Status == constants.Running {
status = a.Status
continue
}
if count == len(aiTask) {
status = constants.Succeeded
if a.Status == constants.Completed {
count++
continue
}
}
if status != "" {
task.Status = status
task.StartTime = start.Format(constants.Layout)
task.EndTime = end.Format(constants.Layout)
}
if count == len(aiTask) {
status = constants.Succeeded
}
tx = l.svcCtx.DbEngin.Table("task").Updates(task)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return
}
if status != "" {
task.Status = status
task.StartTime = start.Format(constants.Layout)
task.EndTime = end.Format(constants.Layout)
}
task.UpdatedTime = time.Now().Format(constants.Layout)
tx = l.svcCtx.DbEngin.Table("task").Model(task).Updates(task)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
ch <- struct{}{}
return
}
ch <- struct{}{}
}
func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) {
for i := len(tasks) - 1; i >= 0; i-- {
if tasks[i].AdapterTypeDict != 1 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed {
tasks = append(tasks[:i], tasks[i+1:]...)
}
}
if len(tasks) == 0 {
ch <- struct{}{}
return
}
task := tasks[0]
for i, _ := range tasks {
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime)
if latest.Before(earliest) {
task = tasks[i]
}
}
var aiTaskList []*models.TaskAi
tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
ch <- struct{}{}
return
}
if len(aiTaskList) == 0 {
ch <- struct{}{}
return
}
var wg sync.WaitGroup
for _, task := range tasks {
if task.AdapterTypeDict != 1 {
for _, aitask := range aiTaskList {
t := aitask
if t.Status == constants.Completed || t.Status == constants.Failed {
continue
}
if task.Status == constants.Succeeded || task.Status == constants.Failed {
continue
}
var aiTaskList []*models.TaskAi
tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return
}
if len(aiTaskList) == 0 {
continue
}
for _, aitask := range aiTaskList {
t := aitask
if t.Status == constants.Completed {
continue
}
wg.Add(1)
go func() {
trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId)
if err != nil {
msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
logx.Errorf(errors.New(msg).Error())
wg.Done()
return
}
t.Status = trainingTask.Status
t.StartTime = trainingTask.Start
t.EndTime = trainingTask.End
err = l.svcCtx.Scheduler.AiStorages.UpdateAiTask(t)
if err != nil {
msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
logx.Errorf(errors.New(msg).Error())
wg.Done()
return
}
wg.Add(1)
go func() {
trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId)
if err != nil {
msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
logx.Errorf(errors.New(msg).Error())
wg.Done()
}()
}
return
}
t.Status = trainingTask.Status
t.StartTime = trainingTask.Start
t.EndTime = trainingTask.End
err = l.svcCtx.Scheduler.AiStorages.UpdateAiTask(t)
if err != nil {
msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
logx.Errorf(errors.New(msg).Error())
wg.Done()
return
}
wg.Done()
}()
}
wg.Wait()
ch <- struct{}{}

View File

@ -0,0 +1,35 @@
package schedule
import (
	"context"
	"fmt"

	"github.com/zeromicro/go-zero/core/logx"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
)
// GetClusterBalanceByIdLogic carries the per-request state used to look up a
// cluster's balance.
type GetClusterBalanceByIdLogic struct {
// Logger is embedded so logging methods can be called directly on the logic value.
logx.Logger
// ctx is the context passed to the collector call.
ctx context.Context
// svcCtx gives access to the scheduler and its collector map.
svcCtx *svc.ServiceContext
}
// NewGetClusterBalanceByIdLogic returns a GetClusterBalanceByIdLogic bound to
// ctx and svcCtx, with its embedded logger derived from ctx.
func NewGetClusterBalanceByIdLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetClusterBalanceByIdLogic {
	logic := new(GetClusterBalanceByIdLogic)
	logic.Logger = logx.WithContext(ctx)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	return logic
}
// GetClusterBalanceById returns the balance reported by the collector that is
// registered for the adapter/cluster pair named in req.
//
// It returns an error when no collector is registered for that pair, or when
// the collector's GetUserBalance call fails. On error the response is nil.
func (l *GetClusterBalanceByIdLogic) GetClusterBalanceById(req *types.GetClusterBalanceByIdReq) (resp *types.GetClusterBalanceByIdResp, err error) {
	// Both ids come straight from the URL. Indexing with an unknown id yields a
	// nil collector, and calling a method on it would panic, so reject it here.
	aiCollector, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId][req.ClusterId]
	if !ok || aiCollector == nil {
		return nil, fmt.Errorf("no collector found for adapter %q, cluster %q", req.AdapterId, req.ClusterId)
	}

	balance, err := aiCollector.GetUserBalance(l.ctx)
	if err != nil {
		return nil, err
	}

	return &types.GetClusterBalanceByIdResp{Balance: balance}, nil
}

View File

@ -31,6 +31,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
resp = &types.ScheduleResp{}
opt := &option.AiOption{
AdapterId: req.AiOption.AdapterId,
ClusterIds: req.AiOption.AiClusterIds,
TaskName: req.AiOption.TaskName,
ResourceType: req.AiOption.ResourceType,
Replica: req.AiOption.Replica,

View File

@ -88,3 +88,12 @@ func RoundFloat(val float64, precision uint) float64 {
ratio := math.Pow(10, float64(precision))
return math.Round(val*ratio) / ratio
}
// Contains reports whether e is an element of s.
// A nil or empty slice contains nothing, so the result is false.
func Contains(s []string, e string) bool {
	for i := 0; i < len(s); i++ {
		if s[i] == e {
			return true
		}
	}
	return false
}

View File

@ -22,6 +22,7 @@ import (
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
@ -66,6 +67,24 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin
}
func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
if as.option.ComputeCard != "" {
m, ok := as.AiService.AiCollectorAdapterMap[as.option.AdapterId]
if ok {
for _, id := range as.option.ClusterIds {
cm, ok := m[id]
if ok {
cards, err := cm.GetComputeCards(as.ctx)
if err != nil {
return nil, err
}
if common.Contains(cards, as.option.ComputeCard) {
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: id, Replicas: 1}}, nil
}
}
}
}
}
if len(as.option.ClusterIds) == 1 {
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
}
@ -131,40 +150,49 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
var wg sync.WaitGroup
var results []*AiResult
var mu sync.Mutex
var errs []interface{}
var ch = make(chan *AiResult, len(clusters))
var errCh = make(chan interface{}, len(clusters))
var taskNum int32
for _, cluster := range clusters {
taskNum += cluster.Replicas
}
var ch = make(chan *AiResult, taskNum)
var errCh = make(chan interface{}, taskNum)
executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]
for _, cluster := range clusters {
c := cluster
wg.Add(1)
go func() {
opt, _ := cloneAiOption(as.option)
resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt)
if err != nil {
e := struct {
err error
clusterId string
}{
err: err,
clusterId: c.ClusterId,
for i := 0; i < int(c.Replicas); i++ {
wg.Add(1)
go func() {
opt, _ := cloneAiOption(as.option)
resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt)
if err != nil {
e := struct {
err error
clusterId string
}{
err: err,
clusterId: c.ClusterId,
}
errCh <- e
wg.Done()
return
}
errCh <- e
mu.Lock()
result, _ := convertType(resp)
mu.Unlock()
result.Replica = c.Replicas
result.ClusterId = c.ClusterId
result.Strategy = as.option.StrategyName
result.Card = opt.ComputeCard
ch <- result
wg.Done()
return
}
result, _ := convertType(resp)
result.Replica = c.Replicas
result.ClusterId = c.ClusterId
result.Strategy = as.option.StrategyName
result.Card = opt.ComputeCard
ch <- result
wg.Done()
}()
}()
}
}
wg.Wait()
close(ch)

View File

@ -9,9 +9,9 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
"strconv"
)

View File

@ -11,6 +11,7 @@ type AiCollector interface {
DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error)
UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error
GetComputeCards(ctx context.Context) ([]string, error)
GetUserBalance(ctx context.Context) (float64, error)
}
type ResourceStats struct {

View File

@ -16,12 +16,13 @@ package storeLink
import (
"context"
"github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"strconv"
"strings"
)
@ -159,13 +160,37 @@ func (m *ModelArtsLink) GetDatasetsSpecs(ctx context.Context) ([]*collector.Data
}
func (m *ModelArtsLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
return nil, nil
var algorithms []*collector.Algorithm
req := &modelarts.ListAlgorithmsReq{
Platform: m.platform,
Offset: m.pageIndex,
Limit: m.pageSize,
}
resp, err := m.modelArtsRpc.ListAlgorithms(ctx, req)
if err != nil {
return nil, err
}
if resp.ErrorMsg != "" {
return nil, errors.New("failed to get algorithms")
}
for _, a := range resp.Items {
//TODO The value of taskType is temporarily fixed to "pytorch"
algorithm := &collector.Algorithm{Name: a.Metadata.Name, Platform: MODELARTS, TaskType: "pytorch"}
algorithms = append(algorithms, algorithm)
}
return algorithms, nil
}
func (m *ModelArtsLink) GetComputeCards(ctx context.Context) ([]string, error) {
return nil, nil
}
// GetUserBalance is a stub: it performs no lookup and always returns a zero
// balance with a nil error. ctx is unused.
func (m *ModelArtsLink) GetUserBalance(ctx context.Context) (float64, error) {
return 0, nil
}
func (m *ModelArtsLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
return "", nil
}

View File

@ -359,6 +359,25 @@ func (o *OctopusLink) GetComputeCards(ctx context.Context) ([]string, error) {
return cards, nil
}
// GetUserBalance asks the Octopus RPC service for the user balance on this
// link's platform and returns the billing amount as a float64.
//
// An RPC failure is returned unchanged. When the response is flagged as
// unsuccessful, the error carries the message from the response's Error field
// if one is present, and a generic message otherwise.
func (o *OctopusLink) GetUserBalance(ctx context.Context) (float64, error) {
	resp, err := o.octopusRpc.GetUserBalance(ctx, &octopus.GetUserBalanceReq{Platform: o.platform})
	if err != nil {
		return 0, err
	}

	if !resp.Success {
		msg := "failed to get user balance"
		if resp.Error != nil {
			msg = resp.Error.Message
		}
		return 0, errors.New(msg)
	}

	return float64(resp.Payload.BillingUser.Amount), nil
}
func (o *OctopusLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
var name string
if resourceType == CARD {
@ -591,7 +610,10 @@ func (o *OctopusLink) generateResourceId(ctx context.Context, option *option.AiO
}
if option.ResourceType == CARD {
err = setResourceIdByCard(option, specResp, GCU)
if option.ComputeCard == "" {
option.ComputeCard = GCU
}
err = setResourceIdByCard(option, specResp, option.ComputeCard)
if err != nil {
return err
}
@ -742,6 +764,8 @@ func (o *OctopusLink) generateCmd(option *option.AiOption) error {
switch option.ComputeCard {
case GCU:
option.Cmd = "cd /code; python3 train.py"
case MLU:
option.Cmd = ". /torch/venv3/pytorch/bin/activate; cd /code; python train.py"
default:
option.Cmd = TRAIN_CMD
}

View File

@ -33,7 +33,7 @@ import (
const (
RAM_SIZE_1G = 1024 // 1G
WORKER_NUMBER = 1
DCU = "dcu"
DCU = "DCU"
DCU_TOPS = 24.5
PYTORCH = "Pytorch"
TASK_PYTORCH_PREFIX = "PytorchTask"
@ -453,6 +453,16 @@ func (s *ShuguangAi) GetComputeCards(ctx context.Context) ([]string, error) {
return cards, nil
}
// GetUserBalance fetches the user info from the AC RPC service and returns the
// account balance parsed as a float64.
//
// It returns an error when the RPC call fails or when the reported balance
// string is not a valid floating-point number. On error the balance is 0.
func (s *ShuguangAi) GetUserBalance(ctx context.Context) (float64, error) {
	userinfo, err := s.aCRpc.GetUserInfo(ctx, &hpcAC.GetUserInfoReq{})
	if err != nil {
		return 0, err
	}

	// Surface parse failures instead of reporting an unparseable balance as 0,
	// which callers could not tell apart from a genuinely empty account.
	balance, err := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
	if err != nil {
		return 0, err
	}
	return balance, nil
}
func (s *ShuguangAi) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
algoName := dataset + DASH + algorithm
req := &hpcAC.GetFileReq{
@ -570,7 +580,13 @@ func (s *ShuguangAi) generateResourceId(option *option.AiOption) error {
}
if option.ResourceType == CARD {
if option.ComputeCard == "" {
option.ComputeCard = DCU
}
if strings.ToUpper(option.ComputeCard) != DCU {
return errors.New("computeCard not found")
}
option.ComputeCard = DCU
if 0 <= option.Tops && option.Tops <= DCU_TOPS {

View File

@ -25,11 +25,11 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus"
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gorm.io/gorm"
"strings"
"sync"

View File

@ -30,12 +30,12 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice"
"gitlink.org.cn/JointCloud/pcm-kubernetes/kubernetesclient"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
"gitlink.org.cn/JointCloud/pcm-openstack/openstackclient"
slurmClient "gitlink.org.cn/JointCloud/pcm-slurm/slurmclient"
"gitlink.org.cn/jcce-pcm/pcm-participant-ceph/cephclient"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"

View File

@ -344,7 +344,8 @@ type TaskModel struct {
DeletedAt string `json:"deletedAt,omitempty" gorm:"index" db:"deleted_at"`
NsID string `json:"nsId,omitempty" db:"ns_id"`
TenantId string `json:"tenantId,omitempty" db:"tenant_id"`
CreateTime string `json:"createTime,omitempty" db:"create_time" gorm:"autoCreateTime"`
CreatedTime string `json:"createdTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time"`
AdapterTypeDict int `json:"adapterTypeDict" db:"create_time" gorm:"adapter_type_dict"` //任务类型(对应字典表的值
}
@ -5745,6 +5746,15 @@ type GetComputeCardsByClusterResp struct {
Cards []string `json:"cards"`
}
// GetClusterBalanceByIdReq is the request for the cluster balance lookup.
// Both fields are bound from the URL path via their `path` tags.
type GetClusterBalanceByIdReq struct {
AdapterId string `path:"adapterId"`
ClusterId string `path:"clusterId"`
}
// GetClusterBalanceByIdResp is the response for the cluster balance lookup.
// Balance is serialized under the JSON key "balance".
type GetClusterBalanceByIdResp struct {
Balance float64 `json:"balance"`
}
type CreateAlertRuleReq struct {
CLusterId string `json:"clusterId"`
ClusterName string `json:"clusterName"`

2
go.mod
View File

@ -26,11 +26,11 @@ require (
github.com/zeromicro/go-zero v1.6.3
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240426095603-549fefd8bece
gitlink.org.cn/JointCloud/pcm-kubernetes v0.0.0-20240301071143-347480abff2c
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240515005224-689bb339a9c9
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240510133934-6a5526289b35
gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203
gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5
gitlink.org.cn/jcce-pcm/pcm-participant-ceph v0.0.0-20230904090036-24fc730ec87d
gitlink.org.cn/jcce-pcm/pcm-participant-modelarts v0.0.0-20231101085149-724c7c4cc090
go.opentelemetry.io/otel/trace v1.25.0
gonum.org/v1/gonum v0.11.0
google.golang.org/grpc v1.63.0

4
go.sum
View File

@ -1082,6 +1082,8 @@ gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240426095603-549fefd8bece h1:W3yBnvAVV
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240426095603-549fefd8bece/go.mod h1:w3Nb5TNymCItQ7K3x4Q0JLuoq9OerwAzAWT2zsPE9Xo=
gitlink.org.cn/JointCloud/pcm-kubernetes v0.0.0-20240301071143-347480abff2c h1:2Wl/hvaSFjh6fmCSIQhjkr9llMRREQeqcXNLZ/HPY18=
gitlink.org.cn/JointCloud/pcm-kubernetes v0.0.0-20240301071143-347480abff2c/go.mod h1:lSRfGs+PxFvw7CcndHWRd6UlLlGrZn0b0hp5cfaMNGw=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240515005224-689bb339a9c9 h1:FRtOtI9vDFHcyPUdc4PL95CFi/DFk+HXT6JNTf/91d8=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240515005224-689bb339a9c9/go.mod h1:2WC5tDApfQNNIBfDNYwdaQiXhfCsG2n03P3ZxX9p9O4=
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240510133934-6a5526289b35 h1:E2QfpS3Y0FjR8Zyv5l2Ti/2NetQFqHG66c8+T/+J1u0=
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240510133934-6a5526289b35/go.mod h1:QOD5+/l2D+AYBjF2h5T0mdJyfGAmF78QmeKdbBXbjLQ=
gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 h1:s6PsZ1+bev294IWdZRlV7mnOwI1+UzFcldVW/BqhQzI=
@ -1090,8 +1092,6 @@ gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5 h1:+/5vnz
gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5/go.mod h1:97AlUXN13g9UN3+9/DzCHpeoU5sbdyv0IQuTEHNexzQ=
gitlink.org.cn/jcce-pcm/pcm-participant-ceph v0.0.0-20230904090036-24fc730ec87d h1:DHjl/rLuH2gKYtY0MKMGNQDHFT12APg25RlMUQo+tHk=
gitlink.org.cn/jcce-pcm/pcm-participant-ceph v0.0.0-20230904090036-24fc730ec87d/go.mod h1:r/KLzUpupCV5jdxSfgDhc2pVjP0fBi3VhAWRttsBn30=
gitlink.org.cn/jcce-pcm/pcm-participant-modelarts v0.0.0-20231101085149-724c7c4cc090 h1:jztlHo72bcWM1jUwvG3Hfk2K+AJL0RvlsdIqlktH/MI=
gitlink.org.cn/jcce-pcm/pcm-participant-modelarts v0.0.0-20231101085149-724c7c4cc090/go.mod h1:pisJKAI8FRFFUcBaH3Gob+ENXWRM97rpuYmv9s1raag=
go.etcd.io/etcd/api/v3 v3.5.7/go.mod h1:9qew1gCdDDLu+VwmeG+iFpL+QlpHTo7iubavdVDgCAA=
go.etcd.io/etcd/api/v3 v3.5.13 h1:8WXU2/NBge6AUF1K1gOexB6e07NgsN1hXK0rSTtgSp4=
go.etcd.io/etcd/api/v3 v3.5.13/go.mod h1:gBqlqkcMMZMVTMm4NDZloEVJzxQOQIls8splbqBDa0c=