Merge pull request 'moved the scheduler module' (#13) from tzwang/pcm-coordinator:master into master

Former-commit-id: 859c6baca3cc3f93aaff7f0aaba4b81c6226e9d4
This commit is contained in:
tzwang 2024-01-26 09:35:56 +08:00
commit 6496cfce3c
28 changed files with 378 additions and 34 deletions

View File

@ -16,10 +16,10 @@ package mqs
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/schedulers"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service"
)
/*

View File

@ -16,9 +16,9 @@ package mqs
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/schedulers"
)
/*

View File

@ -15,8 +15,8 @@
package common
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy"
"math/rand"
"time"
)

View File

@ -1,7 +1,7 @@
package database
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
"gorm.io/gorm"
)

View File

@ -1,6 +1,6 @@
package database
import "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/entity"
import "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
type Storage interface {
GetProviderParams() ([]entity.ProviderParams, error)

View File

@ -18,11 +18,11 @@ import (
"encoding/json"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/common"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/database"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/common"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice"
"gorm.io/gorm"
"sigs.k8s.io/yaml"

View File

@ -16,13 +16,13 @@ package schedulers
import (
"errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
)

View File

@ -16,11 +16,11 @@ package schedulers
import (
"bytes"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/database"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"io"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

View File

@ -15,11 +15,11 @@
package schedulers
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
)

View File

@ -1,8 +1,8 @@
package schedulers
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy"
)
type VmScheduler struct {

View File

@ -2,9 +2,9 @@ package service
import (
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/impl"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/impl"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient"

View File

@ -1,8 +1,8 @@
package impl
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
)

View File

@ -1,8 +1,8 @@
package impl
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient"
)

View File

@ -2,8 +2,8 @@ package impl
import (
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/executor"
)
//单条作业费=作业运行秒数×(CPU核心数*CPU单价+GPU卡数×GPU单价+DCU卡数×DCU单价)/3600

View File

@ -2,7 +2,7 @@ package strategy
import (
"github.com/pkg/errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
)
type ReplicationStrategy struct {

View File

@ -16,7 +16,7 @@ package strategy
import (
"errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
)
type PricingStrategy struct {

View File

@ -102,6 +102,350 @@ func GetParticipantById(partId int64, dbEngin *gorm.DB) *models.StorelinkCenter
return &participant
}
func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.StorelinkCenter) (interface{}, error) {
switch (interface{})(in).(type) {
case *octopus.UploadImageResp:
inresp := (interface{})(in).(*octopus.UploadImageResp)
switch (interface{})(out).(type) {
case *types.UploadLinkImageResp:
resp := (interface{})(out).(*types.UploadLinkImageResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
return resp, nil
}
return resp, nil
}
return nil, nil
case *octopus.DeleteImageResp:
inresp := (interface{})(in).(*octopus.DeleteImageResp)
var resp types.DeleteLinkImageResp
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
return resp, nil
}
return resp, nil
case *octopus.GetUserImageListResp:
inresp := (interface{})(in).(*octopus.GetUserImageListResp)
var resp types.GetLinkImageListResp
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
resp.Images = nil
return resp, nil
}
for _, v := range inresp.Payload.Images {
var image types.ImageSl
image.ImageId = v.Image.Id
image.ImageName = v.Image.ImageName
image.ImageStatus = OctImgStatus[v.Image.ImageStatus]
resp.Images = append(resp.Images, &image)
}
return resp, nil
case *modelarts.ListReposDetailsResp:
inresp := (interface{})(in).(*modelarts.ListReposDetailsResp)
var resp types.GetLinkImageListResp
if inresp.Errors != nil {
resp.Success = false
resp.ErrorMsg = inresp.Errors[0].ErrorMessage
resp.Images = nil
return resp, nil
}
resp.Success = true
for _, v := range inresp.Items {
for _, r := range v.Tags {
var image types.ImageSl
image.ImageId = v.Namespace + "/" + v.Name + ":" + r
image.ImageName = v.Name
image.ImageStatus = "created"
resp.Images = append(resp.Images, &image)
}
}
return resp, nil
case *hpcAC.GetImageListAiResp:
inresp := (interface{})(in).(*hpcAC.GetImageListAiResp)
var resp types.GetLinkImageListResp
if inresp.Code == "0" {
resp.Success = true
for _, img := range inresp.Data {
var image types.ImageSl
image.ImageId = img.ImageId
image.ImageName = img.Version
image.ImageStatus = "created"
resp.Images = append(resp.Images, &image)
}
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
resp.Images = nil
}
return resp, nil
case *octopus.CreateTrainJobResp:
inresp := (interface{})(in).(*octopus.CreateTrainJobResp)
var resp types.SubmitLinkTaskResp
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
return resp, nil
}
resp.TaskId = inresp.Payload.JobId
return resp, nil
case *modelarts.CreateTrainingJobResp:
inresp := (interface{})(in).(*modelarts.CreateTrainingJobResp)
var resp types.SubmitLinkTaskResp
if inresp.ErrorMsg != "" {
resp.ErrorMsg = inresp.ErrorMsg
resp.Success = false
return resp, nil
}
resp.Success = true
resp.TaskId = inresp.Metadata.Id
return resp, nil
case *hpcAC.SubmitTaskAiResp:
inresp := (interface{})(in).(*hpcAC.SubmitTaskAiResp)
var resp types.SubmitLinkTaskResp
if inresp.Code == "0" {
resp.Success = true
resp.TaskId = inresp.Data
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
}
return resp, nil
case *hpcAC.SubmitJobResp:
inresp := (interface{})(in).(*hpcAC.SubmitJobResp)
var resp types.SubmitLinkTaskResp
if inresp.Code == "0" {
resp.Success = true
resp.TaskId = inresp.Data
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
}
return resp, nil
case *octopus.GetTrainJobResp:
inresp := (interface{})(in).(*octopus.GetTrainJobResp)
var resp types.GetLinkTaskResp
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
return resp, nil
}
var task types.TaskSl
task.TaskId = inresp.Payload.TrainJob.Id
task.TaskName = inresp.Payload.TrainJob.Name
task.StartedAt = inresp.Payload.TrainJob.StartedAt
task.CompletedAt = inresp.Payload.TrainJob.CompletedAt
task.TaskStatus = inresp.Payload.TrainJob.Status
resp.Task = &task
return resp, nil
case *modelarts.JobResponse:
inresp := (interface{})(in).(*modelarts.JobResponse)
var resp types.GetLinkTaskResp
if inresp.ErrorMsg != "" {
resp.ErrorMsg = inresp.ErrorMsg
resp.Success = false
return resp, nil
}
resp.Success = true
resp.Task = &types.TaskSl{}
resp.Task.TaskId = inresp.Metadata.Id
resp.Task.TaskName = inresp.Metadata.Name
resp.Task.StartedAt = int64(inresp.Status.StartTime)
resp.Task.CompletedAt = int64(inresp.Status.Duration)
resp.Task.TaskStatus = inresp.Status.Phase
return resp, nil
case *hpcAC.GetPytorchTaskResp:
inresp := (interface{})(in).(*hpcAC.GetPytorchTaskResp)
var resp types.GetLinkTaskResp
if inresp.Code == "0" {
resp.Success = true
var task types.TaskSl
task.TaskId = inresp.Data.Id
task.TaskName = inresp.Data.TaskName
task.TaskStatus = inresp.Data.Status
task.StartedAt = timeutils.StringToUnixTime(inresp.Data.StartTime)
task.CompletedAt = timeutils.StringToUnixTime(inresp.Data.EndTime)
resp.Task = &task
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
resp.Task = nil
}
return resp, nil
case *hpcAC.GetJobDetailResp:
inresp := (interface{})(in).(*hpcAC.GetJobDetailResp)
var resp types.GetLinkTaskResp
if inresp.Code == "0" {
resp.Success = true
var task types.TaskSl
task.TaskId = inresp.Data.JobId
task.TaskName = inresp.Data.JobName
task.TaskStatus = AcStatus[inresp.Data.JobStatus]
task.StartedAt = timeutils.StringToUnixTime(inresp.Data.JobStartTime)
task.CompletedAt = timeutils.StringToUnixTime(inresp.Data.JobEndTime)
resp.Task = &task
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
resp.Task = nil
}
return resp, nil
case *hpcAC.HistoryJobDetailResp:
inresp := (interface{})(in).(*hpcAC.HistoryJobDetailResp)
var resp types.GetLinkTaskResp
if inresp.Code == "0" {
resp.Success = true
var task types.TaskSl
task.TaskId = inresp.Data.JobId
task.TaskName = inresp.Data.JobName
task.TaskStatus = AcStatus[inresp.Data.JobState]
task.StartedAt = timeutils.StringToUnixTime(inresp.Data.JobStartTime)
task.CompletedAt = timeutils.StringToUnixTime(inresp.Data.JobEndTime)
resp.Task = &task
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
resp.Task = nil
}
return resp, nil
case *octopus.DeleteTrainJobResp:
inresp := (interface{})(in).(*octopus.DeleteTrainJobResp)
var resp types.DeleteLinkTaskResp
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
return resp, nil
}
return resp, nil
case *modelarts.DeleteTrainingJobResp:
inresp := (interface{})(in).(*modelarts.DeleteTrainingJobResp)
var resp types.DeleteLinkTaskResp
if inresp.ErrorMsg != "" {
resp.ErrorMsg = inresp.ErrorMsg
resp.Success = false
return resp, nil
}
resp.Success = true
return resp, nil
case *hpcAC.DeleteTaskAiResp:
inresp := (interface{})(in).(*hpcAC.DeleteTaskAiResp)
var resp types.DeleteLinkTaskResp
if inresp.Code == "0" {
resp.Success = true
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
}
return resp, nil
case *hpcAC.DeleteJobResp:
inresp := (interface{})(in).(*hpcAC.DeleteJobResp)
var resp types.DeleteLinkTaskResp
if inresp.Code == "0" {
resp.Success = true
} else {
resp.Success = false
resp.ErrorMsg = inresp.Msg
}
return resp, nil
case *octopus.GetResourceSpecsResp:
inresp := (interface{})(in).(*octopus.GetResourceSpecsResp)
var resp types.GetResourceSpecsResp
resp.Success = inresp.Success
if !resp.Success {
resp.ResourceSpecs = nil
return resp, nil
}
for _, spec := range inresp.TrainResourceSpecs {
var respec types.ResourceSpecSl
respec.SpecId = spec.Id
respec.SpecName = spec.Name
respec.ParticipantId = participant.Id
respec.ParticipantName = participant.Name
respec.SpecPrice = spec.Price
resp.ResourceSpecs = append(resp.ResourceSpecs, &respec)
}
return resp, nil
case *hpcAC.GetResourceSpecResp:
inresp := (interface{})(in).(*hpcAC.GetResourceSpecResp)
var resp types.GetResourceSpecsResp
if inresp.Code != "0" {
resp.Success = false
resp.ResourceSpecs = nil
} else {
var spec types.ResourceSpecSl
resp.Success = true
spec.ParticipantName = participant.Name
spec.ParticipantId = participant.Id
spec.SpecName = SHUGUANGAI_CUSTOM_RESOURCE_NAME
spec.SpecId = SHUGUANGAI_CUSTOM_RESOURCE_ID
resp.ResourceSpecs = append(resp.ResourceSpecs, &spec)
}
return resp, nil
case *modelarts.TrainingJobFlavorsResp:
inresp := (interface{})(in).(*modelarts.TrainingJobFlavorsResp)
var resp types.GetResourceSpecsResp
resp.Success = true
if inresp.Flavors == nil {
resp.Success = false
resp.ResourceSpecs = nil
return resp, nil
}
for _, spec := range inresp.Flavors {
var respec types.ResourceSpecSl
respec.SpecId = spec.FlavorId
respec.SpecName = spec.FlavorName
respec.ParticipantId = participant.Id
respec.ParticipantName = participant.Name
respec.SpecPrice = 0
resp.ResourceSpecs = append(resp.ResourceSpecs, &respec)
}
return resp, nil
default:
return nil, errors.New("type convert fail")
}
}
func ConvertType[T any](in *T, participant *models.StorelinkCenter) (interface{}, error) {
switch (interface{})(in).(type) {