diff --git a/utils/storage/data.go b/utils/storage/data.go deleted file mode 100644 index 62bdf31..0000000 --- a/utils/storage/data.go +++ /dev/null @@ -1,20 +0,0 @@ -package storage - -// newData 创建一个数据存储 -func newData[Body any](body Body) *Data[Body] { - data := &Data[Body]{ - body: body, - } - return data -} - -// Data 数据存储 -// - 数据存储默认拥有一个 Body 字段 -type Data[Body any] struct { - body Body -} - -// Handle 处理数据 -func (slf *Data[Body]) Handle(handler func(data Body)) { - handler(slf.body) -} diff --git a/utils/storage/data_test.go b/utils/storage/data_test.go deleted file mode 100644 index b8c1aa0..0000000 --- a/utils/storage/data_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package storage_test - -import ( - "encoding/json" - "fmt" - "github.com/kercylan98/minotaur/utils/slice" - "github.com/kercylan98/minotaur/utils/storage" - "testing" -) - -type Player struct { - ID string `json:"id"` - Name string `json:"name"` - Power int64 `json:"power"` -} - -func TestData_Struct(t *testing.T) { - player := storage.NewSet[string, *Player](new(Player), - func(data *Player) string { - return data.ID - }, storage.WithIndex[string, string, *Player]("id", func(data *Player) string { - return data.ID - }), storage.WithIndex[string, string, *Player]("name", func(data *Player) string { - return data.Name - }), - ) - - p := player.New() - - p.Handle(func(data *Player) { - data.ID = "1" - data.Name = "kercylan" - data.Power = 100 - }) - - str, err := player.Struct(p) - if err != nil { - panic(err) - } - bytes, err := json.Marshal(str) - if err != nil { - panic(err) - } - - fmt.Println(string(bytes)) -} - -func TestData_Handle(t *testing.T) { - var is []int - for i := 1; i <= 23; i++ { - is = append(is, i) - } - - res := slice.LimitedCombinations(is, 5, 5) - fmt.Println("Count:", len(res)) -} diff --git a/utils/storage/errors.go b/utils/storage/errors.go new file mode 100644 index 0000000..e2adc1e --- /dev/null +++ b/utils/storage/errors.go @@ -0,0 +1,8 @@ +package storage + +import "errors" + +var ( + // ErrDataNotExist 数据不存在 + ErrDataNotExist = errors.New("data not exist") +) diff --git a/utils/storage/set.go b/utils/storage/set.go deleted file mode 100644 index 71f07fe..0000000 --- a/utils/storage/set.go +++ /dev/null @@ -1,97 +0,0 @@ -package storage - -import ( - jsonIter "github.com/json-iterator/go" - "github.com/kercylan98/minotaur/utils/generic" - "github.com/kercylan98/minotaur/utils/str" - "reflect" -) - -var json = jsonIter.ConfigCompatibleWithStandardLibrary - -func NewSet[PrimaryKey generic.Ordered, Body any](zero Body, getIndex func(data Body) PrimaryKey, options ...SetOption[PrimaryKey, Body]) *Set[PrimaryKey, Body] { - set := &Set[PrimaryKey, Body]{ - zero: zero, - tf: reflect.Indirect(reflect.ValueOf(zero)).Type(), - getIndex: getIndex, - bodyField: reflect.StructField{ - Name: DefaultBodyFieldName, - Type: reflect.TypeOf(str.None), - }, - items: make(map[PrimaryKey]*Data[Body]), - } - for _, option := range options { - option(set) - } - return set -} - -// Set 数据集合 -type Set[PrimaryKey generic.Ordered, Body any] struct { - storage Storage[PrimaryKey, Body] - zero Body - tf reflect.Type - getIndex func(data Body) PrimaryKey - bodyField reflect.StructField - indexes []reflect.StructField - getIndexValue map[string]func(data Body) any - items map[PrimaryKey]*Data[Body] -} - -// New 创建一份新数据 -// - 这份数据不会被存储 -func (slf *Set[PrimaryKey, Body]) New() *Data[Body] { - var data = reflect.New(slf.tf).Interface().(Body) - return newData(data) -} - -// Get 通过主键获取数据 -// - 优先从内存中加载,如果数据不存在,则尝试从存储中加载 -func (slf *Set[PrimaryKey, Body]) Get(index PrimaryKey) (*Data[Body], error) { - if data, exist := slf.items[index]; exist { - return data, nil - } - body, err := slf.storage.Load(index) - if err != nil { - return nil, err - } - data := newData(body) - slf.items[index] = data - return data, nil -} - -// Set 设置数据 -// - 该方法会将数据存储到内存中 -func (slf *Set[PrimaryKey, Body]) Set(data *Data[Body]) { - slf.items[slf.getIndex(data.body)] = data -} - -// Save 保存数据 -// - 该方法会将数据存储到存储器中 -func (slf *Set[PrimaryKey, Body]) Save(data *Data[Body]) error { - return slf.storage.Save(slf, slf.getIndex(data.body), data.body) -} - -// Struct 将数据存储转换为结构体 -func (slf *Set[PrimaryKey, Body]) Struct(data *Data[Body]) (any, error) { - var fields = make([]reflect.StructField, 0, len(slf.indexes)+1) - for _, field := range append(slf.indexes, slf.bodyField) { - fields = append(fields, field) - } - instance := reflect.New(reflect.StructOf(fields)) - value := instance.Elem() - for _, field := range slf.indexes { - get, exist := slf.getIndexValue[field.Name] - if !exist { - continue - } - value.FieldByName(field.Name).Set(reflect.ValueOf(get(data.body))) - } - bytes, err := json.Marshal(data.body) - if err != nil { - return nil, err - } - - value.FieldByName(slf.bodyField.Name).Set(reflect.ValueOf(string(bytes))) - return value.Interface(), nil -} diff --git a/utils/storage/set_options.go b/utils/storage/set_options.go deleted file mode 100644 index 51166d8..0000000 --- a/utils/storage/set_options.go +++ /dev/null @@ -1,65 +0,0 @@ -package storage - -import ( - "github.com/kercylan98/minotaur/utils/generic" - "github.com/kercylan98/minotaur/utils/str" - "reflect" - "strings" -) - -const ( - DefaultBodyFieldName = "Data" -) - -type SetOption[PrimaryKey generic.Ordered, Body any] func(set *Set[PrimaryKey, Body]) - -// WithIndex 添加一个索引 -// - 索引将会在数据结构体中创建一个字段,这个字段必须可以在 Body 内部找到,用于对查找功能的拓展 -func WithIndex[PrimaryKey generic.Ordered, Index generic.Ordered, Body any](name string, getValue func(data Body) Index) SetOption[PrimaryKey, Body] { - return func(set *Set[PrimaryKey, Body]) { - WithTagIndex[PrimaryKey, Index, Body](name, nil, getValue)(set) - } -} - -// WithTagIndex 添加一个带 tag 的索引 -// - 同 WithIndex,但是可以自定义索引的 tag -func WithTagIndex[PrimaryKey generic.Ordered, Index generic.Ordered, Body any](name string, tags []string, getValue func(data Body) Index) SetOption[PrimaryKey, Body] { - return func(set *Set[PrimaryKey, Body]) { - value := getValue(set.zero) - upperName := str.FirstUpper(name) - if set.getIndexValue == nil { - set.getIndexValue = map[string]func(data Body) any{} - } - set.getIndexValue[upperName] = func(data Body) any { - return getValue(data) - } - var tag string - if len(tags) > 0 { - tag = strings.Join(tags, " ") - } - set.indexes = append(set.indexes, reflect.StructField{ - Name: upperName, - Type: reflect.TypeOf(value), - Tag: reflect.StructTag(tag), - }) - } -} - -// WithBodyName 设置 Body 字段名称 -// - 默认字段名称为 DefaultBodyFieldName -func WithBodyName[PrimaryKey generic.Ordered, Body any](name string) SetOption[PrimaryKey, Body] { - return func(set *Set[PrimaryKey, Body]) { - if len(name) == 0 { - return - } - set.bodyField.Name = str.FirstUpper(name) - } -} - -// WithBodyTag 设置 Body 字段标签 -// - 如果有多个标签,将会以空格分隔,例如:`json:"data" yaml:"data"` -func WithBodyTag[PrimaryKey generic.Ordered, Body any](tags ...string) SetOption[PrimaryKey, Body] { - return func(set *Set[PrimaryKey, Body]) { - set.bodyField.Tag = reflect.StructTag(strings.Join(tags, " ")) - } -} diff --git a/utils/storage/storage.go b/utils/storage/storage.go index 4491839..a8e143c 100644 --- a/utils/storage/storage.go +++ b/utils/storage/storage.go @@ -1,11 +1,142 @@ package storage -import "github.com/kercylan98/minotaur/utils/generic" +import ( + "github.com/kercylan98/minotaur/utils/super" + "sync" + "time" +) -type Storage[PrimaryKey generic.Ordered, Body any] interface { - // Load 加载数据 - Load(index PrimaryKey) (Body, error) +// New 创建一个新的存储器 +func New[K comparable, D any, W Warehouse[K, D]](warehouse W) *Storage[K, D, W] { + s := &Storage[K, D, W]{ + w: warehouse, + } - // Save 保存数据 - Save(set *Set[PrimaryKey, Body], index PrimaryKey, data any) error + if cache, err := warehouse.Init(); err != nil { + panic(err) + } else { + s.cache = cache + } + + if s.cache == nil { + s.cache = make(map[K][]byte) + } + return s +} + +// Storage 用于缓存数据的存储器 +type Storage[K comparable, D any, W Warehouse[K, D]] struct { + cl sync.RWMutex + sl sync.Mutex + w W + cache map[K][]byte +} + +// Query 查询特定 key 的数据 +func (slf *Storage[Key, D, W]) Query(key Key) (v D, err error) { + slf.cl.RLock() + cache, exist := slf.cache[key] + slf.cl.RUnlock() + + if !exist { + cache, err = slf.w.Query(key) + if err != nil { + panic(err) + } + } + + v = slf.w.GenerateZero() + if err = super.UnmarshalJSON(cache, v); err != nil { + panic(err) + } + return v, err +} + +// Create 创建特定 key 的数据 +func (slf *Storage[K, D, W]) Create(key K, data D) error { + d, err := super.MarshalJSONE(data) + if err != nil { + return err + } + return slf.w.Create(key, d) +} + +// Save 保存特定 key 的数据 +func (slf *Storage[K, D, W]) Save(key K, data D) error { + d, err := super.MarshalJSONE(data) + if err != nil { + return err + } + slf.cl.Lock() + slf.cache[key] = d + slf.cl.Unlock() + return nil +} + +// Flush 将缓存中的数据全部保存到数据库中,如果保存失败,会调用 errHandle 处理错误,如果 errHandle 返回 >= 0 的时长,则继续尝试保存,否则将跳过本条数据 +// - 当 errHandle 为 nil 时,将会无限重试保存,间隔为 100ms +func (slf *Storage[K, D, W]) Flush(errHandle func(data []byte, err error) time.Duration) { + slf.cl.Lock() + if len(slf.cache) == 0 { + slf.cl.Unlock() + return + } + cache := slf.cache + slf.cache = make(map[K][]byte) + slf.cl.Unlock() + + slf.sl.Lock() + defer slf.sl.Unlock() + for key, data := range cache { + for { + if err := slf.w.Save(key, data); err != nil { + if errHandle == nil { + time.Sleep(time.Millisecond * 100) + continue + } else if d := errHandle(data, err); d >= 0 { + time.Sleep(d) + continue + } else { + break + } + } else { + break + } + } + } +} + +// Migrate 迁移数据,如果 keys 为空,则迁移全部数据 +func (slf *Storage[K, D, W]) Migrate(keys ...K) (data []byte, err error) { + slf.cl.RLock() + defer slf.cl.RUnlock() + if len(keys) > 0 { + var m = make(map[K][]byte) + for _, key := range keys { + d, exist := slf.cache[key] + if !exist { + d, err = slf.w.Query(key) + if err != nil { + return nil, ErrDataNotExist + } + } + m[key] = d + } + return super.MarshalJSONE(m) + } + return super.MarshalJSONE(slf.cache) +} + +// LoadMigrationData 加载迁移数据 +func (slf *Storage[K, D, W]) LoadMigrationData(data []byte) error { + m := make(map[K][]byte) + if err := super.UnmarshalJSON(data, &m); err != nil { + return err + } + slf.cl.Lock() + defer slf.cl.Unlock() + for key, d := range m { + slf.cache[key] = d + } + return nil } diff --git a/utils/storage/storage_test.go b/utils/storage/storage_test.go new file mode 100644 index 0000000..0f62393 --- /dev/null +++ b/utils/storage/storage_test.go @@ -0,0 +1,48 @@ +package storage_test + +import ( + "github.com/kercylan98/minotaur/utils/storage" + "testing" +) + +var fakeDB = map[any][]byte{} + +type Player struct { + ID string + Name string +} + +type PlayerWarehouse[K string, D *Player] struct { +} + +func (slf *PlayerWarehouse[K, D]) GenerateZero() D { + return &Player{} +} + +func (slf *PlayerWarehouse[K, D]) Init() (map[K][]byte, error) { + return nil, nil +} + +func (slf *PlayerWarehouse[K, D]) Query(key K) (data []byte, err error) { + return fakeDB[key], nil +} + +func (slf *PlayerWarehouse[K, D]) Create(key K, data []byte) error { + fakeDB[key] = data + return nil +} + +func (slf *PlayerWarehouse[K, D]) Save(key K, data []byte) error { + fakeDB[key] = data + return nil +} + +func TestStorage(t *testing.T) { + s := storage.New[string, *Player, *PlayerWarehouse[string, *Player]](new(PlayerWarehouse[string, *Player])) + _ = s.Create("1", &Player{ + ID: "1", + Name: "1", + }) + player, _ := s.Query("1") + t.Log(player) +} diff --git a/utils/storage/warehouse.go b/utils/storage/warehouse.go new file mode 100644 index 0000000..0c34151 --- /dev/null +++ b/utils/storage/warehouse.go @@ -0,0 +1,20 @@ +package storage + +// Warehouse 数据仓库接口,用于数据生产及持久化 +type Warehouse[K comparable, D any] interface { + // GenerateZero 生成一个空的数据对象 + GenerateZero() D + + // Init 用于数据初始化,例如启动时从数据库中加载所有玩家的离线数据等情况 + Init() (map[K][]byte, error) + + // Query 查询特定 key 的数据 + // - 返回的 err 应该排除数据不存在的错误情况,例如:sql.ErrNoRows + Query(key K) (data []byte, err error) + + // Create 创建特定 key 的数据 + Create(key K, data []byte) error + + // Save 保存特定 key 的数据 + Save(key K, data []byte) error +} diff --git a/utils/super/json.go b/utils/super/json.go index df0fb4a..5f505b5 100644 --- a/utils/super/json.go +++ b/utils/super/json.go @@ -22,6 +22,12 @@ func MarshalJSON(v interface{}) []byte { return b } +// MarshalJSONE 将对象转换为 json +// - 当转换失败时,将返回错误信息 +func MarshalJSONE(v interface{}) ([]byte, error) { + return json.Marshal(v) +} + // UnmarshalJSON 将 json 转换为对象 func UnmarshalJSON(data []byte, v interface{}) error { return json.Unmarshal(data, v)