Merge pull request 'updated strategy parameters' (#17) from tzwang/pcm-coordinator:master into master

Former-commit-id: 44a3dfd14d9adbdab690b2c7c1e8be324ecf842f
This commit is contained in:
tzwang 2024-01-30 17:40:38 +08:00
commit 75543eacae
17 changed files with 217 additions and 22 deletions

View File

@ -27,6 +27,7 @@ import (
"gorm.io/gorm"
"sigs.k8s.io/yaml"
"strings"
"sync"
)
type Scheduler struct {
@ -38,7 +39,8 @@ type Scheduler struct {
participantRpc participantservice.ParticipantService
ResourceCollector *map[string]collector.ResourceCollector
Storages database.Storage
AiExecutor *map[string]executor.Executor
AiExecutor *map[string]executor.AiExecutor
mu sync.RWMutex
}
func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) {
@ -50,7 +52,7 @@ func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB,
return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil
}
func NewScheduler2(resourceCollector *map[string]collector.ResourceCollector, storages database.Storage, aiExecutor *map[string]executor.Executor) *Scheduler {
// NewScheduler2 builds a Scheduler from the given resource collectors, storage
// backend and AI executors. Only the ResourceCollector, Storages and AiExecutor
// fields are populated; every other field keeps its zero value.
func NewScheduler2(resourceCollector *map[string]collector.ResourceCollector, storages database.Storage, aiExecutor *map[string]executor.AiExecutor) *Scheduler {
return &Scheduler{ResourceCollector: resourceCollector, Storages: storages, AiExecutor: aiExecutor}
}

View File

@ -19,6 +19,7 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
@ -30,6 +31,7 @@ type AiScheduler struct {
yamlString string
task *response.TaskInfo
*scheduler.Scheduler
option option.AiOption
}
func NewAiScheduler(val string, scheduler *scheduler.Scheduler) (*AiScheduler, error) {
@ -48,7 +50,7 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin
}
func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
resources, err := as.findProvidersWithResource()
resources, err := as.findClustersWithResource()
if err != nil {
return nil, err
}
@ -61,7 +63,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
Name: resource.Name,
})
}
strategy := strategy.NewReplicationStrategy(nil, 0)
strategy := strategy.NewReplicationStrategy(nil)
return strategy, nil
}
@ -83,12 +85,19 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
return errors.New("clusters is nil")
}
_ = *as.AiExecutor
executorMap := *as.AiExecutor
for _, cluster := range clusters {
_, err := executorMap[cluster.Name].Execute(option.AiOption{})
if err != nil {
// TODO: database operation
}
// TODO: database operation
}
return nil
}
func (as *AiScheduler) findProvidersWithResource() ([]*collector.ResourceSpecs, error) {
func (as *AiScheduler) findClustersWithResource() ([]*collector.ResourceSpecs, error) {
var resourceSpecs []*collector.ResourceSpecs
for _, resourceCollector := range *as.ResourceCollector {
spec, err := resourceCollector.GetResourceSpecs()

View File

@ -0,0 +1,17 @@
package option
// AiOption is the argument type of AiExecutor.Execute.
//
// NOTE(review): every field is unexported, so code outside package option can
// only build the zero value (AiScheduler.AssignTask passes option.AiOption{}).
// Confirm whether exported fields, a constructor or accessors are intended.
type AiOption struct {
aiType string // shuguangAi/octopus
resourceType string // cpu/gpu/compute card
taskType string // pytorch/tensorflow
// The identifiers below presumably refer to objects on the target AI
// platform — TODO confirm against the executors that will read them.
imageId string
specId string
datasetsId string
codeId string
cmd string
datasets string
code string
}

View File

@ -24,8 +24,8 @@ var (
}
)
func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.Executor, *map[string]collector.ResourceCollector) {
executorMap := make(map[string]executor.Executor)
func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.AiExecutor, *map[string]collector.ResourceCollector) {
executorMap := make(map[string]executor.AiExecutor)
collectorMap := make(map[string]collector.ResourceCollector)
for k, v := range AiTypeMap {
switch v {

View File

@ -0,0 +1,11 @@
package executor
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink"
)
// AiExecutor is the executor contract stored in the scheduler's AiExecutor map.
// It consists of the embedded storeLink.Linkage method set plus Execute.
type AiExecutor interface {
// Execute takes an option.AiOption and returns an implementation-defined
// result together with an error. AiScheduler.AssignTask calls it once per
// assigned cluster.
Execute(option option.AiOption) (interface{}, error)
storeLink.Linkage
}

View File

@ -1,8 +0,0 @@
package executor
type Executor interface {
QueryImageList() (interface{}, error)
SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error)
QueryTask(taskId string) (interface{}, error)
QuerySpecs() (interface{}, error)
}

View File

@ -2,3 +2,7 @@ package strategy
// DynamicResourcesStrategy is a scheduling strategy that currently carries no
// configuration.
type DynamicResourcesStrategy struct{}

// Schedule is a placeholder: it assigns no clusters and reports no error.
func (s *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
	var (
		assigned []*AssignedCluster
		err      error
	)
	return assigned, err
}

View File

@ -0,0 +1,9 @@
package params
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
)
// Params is the common parameter set embedded by strategy option types such as
// ResourcePricingOption.
type Params struct {
// resources is unexported and has no accessor in the visible code.
// NOTE(review): confirm how it is meant to be populated.
resources []*collector.ResourceSpecs
}

View File

@ -0,0 +1,16 @@
package params
import "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
// ReplicationOption holds the inputs read by strategy.NewReplicationStrategy:
// the replica count and the participants to replicate onto.
type ReplicationOption struct {
	replicas     int32
	participants []entity.Participant
}

// GetReplicas returns the configured replica count.
//
// It is safe to call on a nil receiver, in which case it returns 0. Callers
// such as strategy.NewReplicationStrategy(nil) would otherwise panic on a nil
// pointer dereference.
func (o *ReplicationOption) GetReplicas() int32 {
	if o == nil {
		return 0
	}
	return o.replicas
}

// GetParticipants returns the configured participants.
//
// It is safe to call on a nil receiver, in which case it returns a nil slice.
func (o *ReplicationOption) GetParticipants() []entity.Participant {
	if o == nil {
		return nil
	}
	return o.participants
}

View File

@ -0,0 +1,26 @@
package params
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
)
// ResourcePricingOption bundles the inputs of the resource-pricing strategy on
// top of the shared *Params.
type ResourcePricingOption struct {
	replicas  int32
	task      *providerPricing.Task
	providers []*providerPricing.Provider
	*Params
}

// NewResourcePricingOption returns an option that embeds params; replicas,
// task and providers are left at their zero values.
func NewResourcePricingOption(params *Params) *ResourcePricingOption {
	opt := new(ResourcePricingOption)
	opt.Params = params
	return opt
}

// GetReplicas reports the replica count stored in the option.
func (o *ResourcePricingOption) GetReplicas() int32 {
	replicas := o.replicas
	return replicas
}

// GetProviders reports the providers stored in the option.
func (o *ResourcePricingOption) GetProviders() []*providerPricing.Provider {
	providers := o.providers
	return providers
}

View File

@ -3,6 +3,7 @@ package strategy
import (
"github.com/pkg/errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/params"
)
type ReplicationStrategy struct {
@ -10,9 +11,9 @@ type ReplicationStrategy struct {
participants []entity.Participant
}
func NewReplicationStrategy(participants []entity.Participant, replicas int32) *ReplicationStrategy {
return &ReplicationStrategy{replicas: replicas,
participants: participants,
func NewReplicationStrategy(params *params.ReplicationOption) *ReplicationStrategy {
return &ReplicationStrategy{replicas: params.GetReplicas(),
participants: params.GetParticipants(),
}
}

View File

@ -13,6 +13,12 @@ type StaticWeightStrategy struct {
weights []entity.WeightP
}
// NewStaticWeightStrategy returns a StaticWeightStrategy that stores weights
// as given and uses replicas as its num field.
func NewStaticWeightStrategy(weights []entity.WeightP, replicas int32) *StaticWeightStrategy {
	s := new(StaticWeightStrategy)
	s.weights = weights
	s.num = replicas
	return s
}
func (ps *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) {
// TODO: implement the scheduling logic return nil, nil

View File

@ -9,6 +9,3 @@ type AssignedCluster struct {
Name string
Replicas int32
}
type Options struct {
}

View File

@ -0,0 +1,90 @@
package test
import (
"fmt"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"testing"
)
// TestReplication runs the replication strategy once per table entry and
// prints every assigned cluster.
//
// NOTE(review): the table's replica and ps values are never passed to the
// strategy; NewReplicationStrategy is called with nil in every subtest, so all
// subtests exercise the same input. The test also makes no assertions on the
// returned schedule.
func TestReplication(t *testing.T) {
parts := []entity.Participant{
{Name: "test1", Participant_id: 1},
{Name: "test2", Participant_id: 2},
{Name: "test3", Participant_id: 3},
}
tests := []struct {
name string
replica int32
ps []entity.Participant
}{
{
name: "test1",
replica: 1,
ps: parts,
},
{
name: "test2",
replica: 2,
ps: parts,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// NOTE(review): nil options — tt.replica and tt.ps are unused here.
repl := strategy.NewReplicationStrategy(nil)
schedule, err := repl.Schedule()
if err != nil {
// The error is dropped and the subtest still passes.
return
}
for _, cluster := range schedule {
fmt.Println(cluster)
}
})
}
}
// TestStaticWeight runs the static-weight strategy over three participants
// with weights 3, 5 and 2 for several replica counts and prints every
// assigned cluster.
func TestStaticWeight(t *testing.T) {
	parts := []entity.WeightP{
		{Name: "p1", Participant_id: 1, Weight: 3},
		{Name: "p2", Participant_id: 2, Weight: 5},
		{Name: "p3", Participant_id: 3, Weight: 2},
	}
	tests := []struct {
		name    string
		replica int32
		ps      []entity.WeightP
	}{
		{
			name:    "test1",
			replica: 1,
			ps:      parts,
		},
		{
			name:    "test2",
			replica: 5,
			ps:      parts,
		},
		{
			// Previously also named "test2"; subtest names must be unique so
			// that `go test -run` can select each case individually.
			name:    "test3",
			replica: 6,
			ps:      parts,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			repl := strategy.NewStaticWeightStrategy(tt.ps, tt.replica)
			schedule, err := repl.Schedule()
			if err != nil {
				// Every table entry is valid input, so an error is a failure
				// rather than something to skip silently.
				t.Fatalf("Schedule() with %d replicas returned error: %v", tt.replica, err)
			}
			for _, cluster := range schedule {
				fmt.Println(cluster)
			}
		})
	}
}

View File

@ -16,6 +16,7 @@ package storeLink
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
@ -151,3 +152,7 @@ func (o *ModelArtsLink) QuerySpecs() (interface{}, error) {
// GetResourceSpecs is currently a stub: it returns a nil *collector.ResourceSpecs
// and a nil error.
func (o *ModelArtsLink) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil
}
// Execute is currently a stub: it ignores option and returns a nil result and
// a nil error.
func (o *ModelArtsLink) Execute(option option.AiOption) (interface{}, error) {
return nil, nil
}

View File

@ -16,6 +16,7 @@ package storeLink
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
@ -198,3 +199,7 @@ func (o *OctopusLink) QuerySpecs() (interface{}, error) {
// GetResourceSpecs is currently a stub: it returns a nil *collector.ResourceSpecs
// and a nil error.
func (o *OctopusLink) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil
}
// Execute is currently a stub: it ignores option and returns a nil result and
// a nil error.
func (o *OctopusLink) Execute(option option.AiOption) (interface{}, error) {
return nil, nil
}

View File

@ -18,6 +18,7 @@ import (
"context"
"errors"
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
@ -174,3 +175,7 @@ func (o *ShuguangAi) QuerySpecs() (interface{}, error) {
// GetResourceSpecs is currently a stub: it returns a nil *collector.ResourceSpecs
// and a nil error.
func (o *ShuguangAi) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil
}
// Execute is currently a stub: it ignores option and returns a nil result and
// a nil error.
func (o *ShuguangAi) Execute(option option.AiOption) (interface{}, error) {
return nil, nil
}