diff --git a/api/internal/scheduler/scheduler.go b/api/internal/scheduler/scheduler.go index e63a3873..e791c9a0 100644 --- a/api/internal/scheduler/scheduler.go +++ b/api/internal/scheduler/scheduler.go @@ -27,6 +27,7 @@ import ( "gorm.io/gorm" "sigs.k8s.io/yaml" "strings" + "sync" ) type Scheduler struct { @@ -38,7 +39,8 @@ type Scheduler struct { participantRpc participantservice.ParticipantService ResourceCollector *map[string]collector.ResourceCollector Storages database.Storage - AiExecutor *map[string]executor.Executor + AiExecutor *map[string]executor.AiExecutor + mu sync.RWMutex } func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { @@ -50,7 +52,7 @@ func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil } -func NewScheduler2(resourceCollector *map[string]collector.ResourceCollector, storages database.Storage, aiExecutor *map[string]executor.Executor) *Scheduler { +func NewScheduler2(resourceCollector *map[string]collector.ResourceCollector, storages database.Storage, aiExecutor *map[string]executor.AiExecutor) *Scheduler { return &Scheduler{ResourceCollector: resourceCollector, Storages: storages, AiExecutor: aiExecutor} } diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 5f9f2d22..68fc15f1 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -19,6 +19,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" @@ -30,6 +31,7 @@ type AiScheduler struct { yamlString string task *response.TaskInfo *scheduler.Scheduler + option option.AiOption } func NewAiScheduler(val string, scheduler *scheduler.Scheduler) (*AiScheduler, error) { @@ -48,7 +50,7 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin } func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { - resources, err := as.findProvidersWithResource() + resources, err := as.findClustersWithResource() if err != nil { return nil, err } @@ -61,7 +63,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { Name: resource.Name, }) } - strategy := strategy.NewReplicationStrategy(nil, 0) + strategy := strategy.NewReplicationStrategy(nil) return strategy, nil } @@ -83,12 +85,19 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { return errors.New("clusters is nil") } - _ = *as.AiExecutor + executorMap := *as.AiExecutor + for _, cluster := range clusters { + _, err := executorMap[cluster.Name].Execute(option.AiOption{}) + if err != nil { + // TODO: database operation + } + // TODO: database operation + } return nil } -func (as *AiScheduler) findProvidersWithResource() ([]*collector.ResourceSpecs, error) { +func (as *AiScheduler) findClustersWithResource() ([]*collector.ResourceSpecs, error) { var resourceSpecs []*collector.ResourceSpecs for _, resourceCollector := range *as.ResourceCollector { spec, err := resourceCollector.GetResourceSpecs() diff --git a/api/internal/scheduler/schedulers/option/aiOption.go b/api/internal/scheduler/schedulers/option/aiOption.go new file mode 100644 index 00000000..2d45383c --- /dev/null +++ b/api/internal/scheduler/schedulers/option/aiOption.go @@ -0,0 +1,17 @@ +package option + +type AiOption struct { + aiType string // shuguangAi/octopus + resourceType string // cpu/gpu/compute card + taskType string // pytorch/tensorflow + + imageId string + specId string + datasetsId string + codeId string + + cmd string + + datasets string + code string +} diff --git a/api/internal/scheduler/service/aiService.go b/api/internal/scheduler/service/aiService.go index 3b46596b..29ab4653 100644 --- a/api/internal/scheduler/service/aiService.go +++ b/api/internal/scheduler/service/aiService.go @@ -24,8 +24,8 @@ var ( } ) -func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.Executor, *map[string]collector.ResourceCollector) { - executorMap := make(map[string]executor.Executor) +func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.AiExecutor, *map[string]collector.ResourceCollector) { + executorMap := make(map[string]executor.AiExecutor) collectorMap := make(map[string]collector.ResourceCollector) for k, v := range AiTypeMap { switch v { diff --git a/api/internal/scheduler/service/executor/aiExecutor.go b/api/internal/scheduler/service/executor/aiExecutor.go new file mode 100644 index 00000000..a52ab062 --- /dev/null +++ b/api/internal/scheduler/service/executor/aiExecutor.go @@ -0,0 +1,11 @@ +package executor + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink" +) + +type AiExecutor interface { + Execute(option option.AiOption) (interface{}, error) + storeLink.Linkage +} diff --git a/api/internal/scheduler/service/executor/executor.go b/api/internal/scheduler/service/executor/executor.go deleted file mode 100644 index 917e45ca..00000000 --- a/api/internal/scheduler/service/executor/executor.go +++ /dev/null @@ -1,8 +0,0 @@ -package executor - -type Executor interface { - QueryImageList() (interface{}, error) - SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) - QueryTask(taskId string) (interface{}, error) - QuerySpecs() (interface{}, error) -} diff --git a/api/internal/scheduler/strategy/dynamicResources.go b/api/internal/scheduler/strategy/dynamicResources.go index 579333f2..4bb4a83e 100644 --- a/api/internal/scheduler/strategy/dynamicResources.go +++ b/api/internal/scheduler/strategy/dynamicResources.go @@ -2,3 +2,7 @@ package strategy type DynamicResourcesStrategy struct { } + +func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { + return nil, nil +} diff --git a/api/internal/scheduler/strategy/params/params.go b/api/internal/scheduler/strategy/params/params.go new file mode 100644 index 00000000..44c29f13 --- /dev/null +++ b/api/internal/scheduler/strategy/params/params.go @@ -0,0 +1,9 @@ +package params + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" +) + +type Params struct { + resources []*collector.ResourceSpecs +} diff --git a/api/internal/scheduler/strategy/params/replicationParams.go b/api/internal/scheduler/strategy/params/replicationParams.go new file mode 100644 index 00000000..7adfaea0 --- /dev/null +++ b/api/internal/scheduler/strategy/params/replicationParams.go @@ -0,0 +1,16 @@ +package params + +import "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" + +type ReplicationOption struct { + replicas int32 + participants []entity.Participant +} + +func (o *ReplicationOption) GetReplicas() int32 { + return o.replicas +} + +func (o *ReplicationOption) GetParticipants() []entity.Participant { + return o.participants +} diff --git a/api/internal/scheduler/strategy/params/resourcePricingParams.go b/api/internal/scheduler/strategy/params/resourcePricingParams.go new file mode 100644 index 00000000..10fe7bd6 --- /dev/null +++ b/api/internal/scheduler/strategy/params/resourcePricingParams.go @@ -0,0 +1,26 @@ +package params + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" +) + +type ResourcePricingOption struct { + replicas int32 + task *providerPricing.Task + providers []*providerPricing.Provider + *Params +} + +func NewResourcePricingOption(params *Params) *ResourcePricingOption { + return &ResourcePricingOption{ + Params: params, + } +} + +func (r *ResourcePricingOption) GetReplicas() int32 { + return r.replicas +} + +func (r *ResourcePricingOption) GetProviders() []*providerPricing.Provider { + return r.providers +} diff --git a/api/internal/scheduler/strategy/replication.go b/api/internal/scheduler/strategy/replication.go index 88ecd6fb..3f9549b9 100644 --- a/api/internal/scheduler/strategy/replication.go +++ b/api/internal/scheduler/strategy/replication.go @@ -3,6 +3,7 @@ package strategy import ( "github.com/pkg/errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/params" ) type ReplicationStrategy struct { @@ -10,9 +11,9 @@ type ReplicationStrategy struct { participants []entity.Participant } -func NewReplicationStrategy(participants []entity.Participant, replicas int32) *ReplicationStrategy { - return &ReplicationStrategy{replicas: replicas, - participants: participants, +func NewReplicationStrategy(params *params.ReplicationOption) *ReplicationStrategy { + return &ReplicationStrategy{replicas: params.GetReplicas(), + participants: params.GetParticipants(), } } diff --git a/api/internal/scheduler/strategy/staticWeight.go b/api/internal/scheduler/strategy/staticWeight.go index b028baaa..5215771d 100644 --- a/api/internal/scheduler/strategy/staticWeight.go +++ b/api/internal/scheduler/strategy/staticWeight.go @@ -13,6 +13,12 @@ type StaticWeightStrategy struct { weights []entity.WeightP } +func NewStaticWeightStrategy(weights []entity.WeightP, replicas int32) *StaticWeightStrategy { + return &StaticWeightStrategy{weights: weights, + num: replicas, + } +} + func (ps *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) { // TODO: implement the scheduling logic return nil, nil diff --git a/api/internal/scheduler/strategy/strategy.go b/api/internal/scheduler/strategy/strategy.go index 1502dc21..af23fbf2 100644 --- a/api/internal/scheduler/strategy/strategy.go +++ b/api/internal/scheduler/strategy/strategy.go @@ -9,6 +9,3 @@ type AssignedCluster struct { Name string Replicas int32 } - -type Options struct { -} diff --git a/api/internal/scheduler/strategy/test/strategy_test.go b/api/internal/scheduler/strategy/test/strategy_test.go new file mode 100644 index 00000000..767d71c4 --- /dev/null +++ b/api/internal/scheduler/strategy/test/strategy_test.go @@ -0,0 +1,90 @@ +package test + +import ( + "fmt" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy" + "testing" +) + +func TestReplication(t *testing.T) { + parts := []entity.Participant{ + {Name: "test1", Participant_id: 1}, + {Name: "test2", Participant_id: 2}, + {Name: "test3", Participant_id: 3}, + } + tests := []struct { + name string + replica int32 + ps []entity.Participant + }{ + { + name: "test1", + replica: 1, + ps: parts, + }, + { + name: "test2", + replica: 2, + ps: parts, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + repl := strategy.NewReplicationStrategy(nil) + schedule, err := repl.Schedule() + if err != nil { + return + } + for _, cluster := range schedule { + fmt.Println(cluster) + } + + }) + } + +} + +func TestStaticWeight(t *testing.T) { + parts := []entity.WeightP{ + {Name: "p1", Participant_id: 1, Weight: 3}, + {Name: "p2", Participant_id: 2, Weight: 5}, + {Name: "p3", Participant_id: 3, Weight: 2}, + } + tests := []struct { + name string + replica int32 + ps []entity.WeightP + }{ + { + name: "test1", + replica: 1, + ps: parts, + }, + { + name: "test2", + replica: 5, + ps: parts, + }, + { + name: "test2", + replica: 6, + ps: parts, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + repl := strategy.NewStaticWeightStrategy(tt.ps, tt.replica) + schedule, err := repl.Schedule() + if err != nil { + return + } + for _, cluster := range schedule { + fmt.Println(cluster) + } + + }) + } +} diff --git a/api/internal/storeLink/modelarts.go b/api/internal/storeLink/modelarts.go index 350bd0ec..31489205 100644 --- a/api/internal/storeLink/modelarts.go +++ b/api/internal/storeLink/modelarts.go @@ -16,6 +16,7 @@ package storeLink import ( "context" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" @@ -151,3 +152,7 @@ func (o *ModelArtsLink) QuerySpecs() (interface{}, error) { func (o *ModelArtsLink) GetResourceSpecs() (*collector.ResourceSpecs, error) { return nil, nil } + +func (o *ModelArtsLink) Execute(option option.AiOption) (interface{}, error) { + return nil, nil +} diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index 370ec23c..b40da2ee 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -16,6 +16,7 @@ package storeLink import ( "context" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" @@ -198,3 +199,7 @@ func (o *OctopusLink) QuerySpecs() (interface{}, error) { func (o *OctopusLink) GetResourceSpecs() (*collector.ResourceSpecs, error) { return nil, nil } + +func (o *OctopusLink) Execute(option option.AiOption) (interface{}, error) { + return nil, nil +} diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index 49b2edf2..ad84315d 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -18,6 +18,7 @@ import ( "context" "errors" "gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" @@ -174,3 +175,7 @@ func (o *ShuguangAi) QuerySpecs() (interface{}, error) { func (o *ShuguangAi) GetResourceSpecs() (*collector.ResourceSpecs, error) { return nil, nil } + +func (o *ShuguangAi) Execute(option option.AiOption) (interface{}, error) { + return nil, nil +}