feat: gateway 网关支持通过可选项自定义端点选择器

This commit is contained in:
kercylan98 2023-08-16 17:16:14 +08:00
parent 5ff74b623d
commit e0f43c5bfb
4 changed files with 30 additions and 8 deletions

View File

@ -49,12 +49,14 @@ func (slf *Endpoint) Write(packet server.Packet) {
slf.client.Write(packet) slf.client.Write(packet)
} }
// onConnectionClosed 与端点连接断开事件
func (slf *Endpoint) onConnectionClosed(conn *client.Websocket, err any) { func (slf *Endpoint) onConnectionClosed(conn *client.Websocket, err any) {
if !slf.offline { if !slf.offline {
go slf.Connect() go slf.Connect()
} }
} }
// onConnectionReceivePacket 解说到来自端点的数据包事件
func (slf *Endpoint) onConnectionReceivePacket(conn *client.Websocket, packet server.Packet) { func (slf *Endpoint) onConnectionReceivePacket(conn *client.Websocket, packet server.Packet) {
p := UnpackGatewayPacket(packet) p := UnpackGatewayPacket(packet)
packet.Data = p.Data packet.Data = p.Data

View File

@ -3,7 +3,7 @@ package gateway
import ( import (
"github.com/kercylan98/minotaur/server" "github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/utils/concurrent" "github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/slice" "github.com/kercylan98/minotaur/utils/random"
) )
// NewEndpointManager 创建网关端点管理器 // NewEndpointManager 创建网关端点管理器
@ -11,6 +11,9 @@ func NewEndpointManager() *EndpointManager {
em := &EndpointManager{ em := &EndpointManager{
endpoints: concurrent.NewBalanceMap[string, []*Endpoint](), endpoints: concurrent.NewBalanceMap[string, []*Endpoint](),
memory: concurrent.NewBalanceMap[string, *Endpoint](), memory: concurrent.NewBalanceMap[string, *Endpoint](),
selector: func(endpoints []*Endpoint) *Endpoint {
return endpoints[random.Int(0, len(endpoints)-1)]
},
} }
return em return em
} }
@ -19,6 +22,7 @@ func NewEndpointManager() *EndpointManager {
type EndpointManager struct { type EndpointManager struct {
endpoints *concurrent.BalanceMap[string, []*Endpoint] endpoints *concurrent.BalanceMap[string, []*Endpoint]
memory *concurrent.BalanceMap[string, *Endpoint] memory *concurrent.BalanceMap[string, *Endpoint]
selector func([]*Endpoint) *Endpoint
} }
// GetEndpoint 获取端点 // GetEndpoint 获取端点
@ -35,15 +39,16 @@ func (slf *EndpointManager) GetEndpoint(name string, conn *server.Conn) (*Endpoi
if len(endpoints) == 0 { if len(endpoints) == 0 {
return return
} }
// 随机获取 var available = make([]*Endpoint, 0, len(endpoints))
endpoints = slice.Copy(endpoints)
slice.Shuffle(endpoints)
for _, e := range endpoints { for _, e := range endpoints {
if e.offline || e.state <= 0 { if !e.offline && e.state > 0 {
continue available = append(available, e)
} }
endpoint = e
} }
if len(available) == 0 {
return
}
endpoint = slf.selector(available)
}) })
if endpoint == nil { if endpoint == nil {
return nil, ErrEndpointNotExists return nil, ErrEndpointNotExists

View File

@ -6,11 +6,14 @@ import (
) )
// NewGateway 基于 server.Server 创建网关服务器 // NewGateway 基于 server.Server 创建网关服务器
func NewGateway(srv *server.Server) *Gateway { func NewGateway(srv *server.Server, options ...Option) *Gateway {
gateway := &Gateway{ gateway := &Gateway{
srv: srv, srv: srv,
EndpointManager: NewEndpointManager(), EndpointManager: NewEndpointManager(),
} }
for _, option := range options {
option(gateway)
}
return gateway return gateway
} }

12
server/gateway/options.go Normal file
View File

@ -0,0 +1,12 @@
package gateway
// Option 网关选项
type Option func(gateway *Gateway)
// WithEndpointSelector 设置端点选择器
// - 默认情况下,网关会随机选择一个端点作为目标,如果需要自定义端点选择器,可以通过该选项设置
func WithEndpointSelector(selector func([]*Endpoint) *Endpoint) Option {
return func(gateway *Gateway) {
gateway.EndpointManager.selector = selector
}
}