From e0f43c5bfb96654fb682de22ad07af91c8c40958 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Wed, 16 Aug 2023 17:16:14 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20gateway=20=E7=BD=91=E5=85=B3=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E9=80=9A=E8=BF=87=E5=8F=AF=E9=80=89=E9=A1=B9=E8=87=AA?= =?UTF-8?q?=E5=AE=9A=E4=B9=89=E7=AB=AF=E7=82=B9=E9=80=89=E6=8B=A9=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/gateway/endpoint.go | 2 ++ server/gateway/endpoint_manager.go | 19 ++++++++++++------- server/gateway/gateway.go | 5 ++++- server/gateway/options.go | 12 ++++++++++++ 4 files changed, 30 insertions(+), 8 deletions(-) create mode 100644 server/gateway/options.go diff --git a/server/gateway/endpoint.go b/server/gateway/endpoint.go index 07b6fae..86fb05b 100644 --- a/server/gateway/endpoint.go +++ b/server/gateway/endpoint.go @@ -49,12 +49,14 @@ func (slf *Endpoint) Write(packet server.Packet) { slf.client.Write(packet) } +// onConnectionClosed 与端点连接断开事件 func (slf *Endpoint) onConnectionClosed(conn *client.Websocket, err any) { if !slf.offline { go slf.Connect() } } +// onConnectionReceivePacket 解说到来自端点的数据包事件 func (slf *Endpoint) onConnectionReceivePacket(conn *client.Websocket, packet server.Packet) { p := UnpackGatewayPacket(packet) packet.Data = p.Data diff --git a/server/gateway/endpoint_manager.go b/server/gateway/endpoint_manager.go index cb92600..ecda703 100644 --- a/server/gateway/endpoint_manager.go +++ b/server/gateway/endpoint_manager.go @@ -3,7 +3,7 @@ package gateway import ( "github.com/kercylan98/minotaur/server" "github.com/kercylan98/minotaur/utils/concurrent" - "github.com/kercylan98/minotaur/utils/slice" + "github.com/kercylan98/minotaur/utils/random" ) // NewEndpointManager 创建网关端点管理器 @@ -11,6 +11,9 @@ func NewEndpointManager() *EndpointManager { em := &EndpointManager{ endpoints: concurrent.NewBalanceMap[string, []*Endpoint](), memory: concurrent.NewBalanceMap[string, *Endpoint](), + selector: func(endpoints []*Endpoint) *Endpoint { + return endpoints[random.Int(0, len(endpoints)-1)] + }, } return em } @@ -19,6 +22,7 @@ func NewEndpointManager() *EndpointManager { type EndpointManager struct { endpoints *concurrent.BalanceMap[string, []*Endpoint] memory *concurrent.BalanceMap[string, *Endpoint] + selector func([]*Endpoint) *Endpoint } // GetEndpoint 获取端点 @@ -35,15 +39,16 @@ func (slf *EndpointManager) GetEndpoint(name string, conn *server.Conn) (*Endpoi if len(endpoints) == 0 { return } - // 随机获取 - endpoints = slice.Copy(endpoints) - slice.Shuffle(endpoints) + var available = make([]*Endpoint, 0, len(endpoints)) for _, e := range endpoints { - if e.offline || e.state <= 0 { - continue + if !e.offline && e.state > 0 { + available = append(available, e) } - endpoint = e } + if len(available) == 0 { + return + } + endpoint = slf.selector(available) }) if endpoint == nil { return nil, ErrEndpointNotExists diff --git a/server/gateway/gateway.go b/server/gateway/gateway.go index 7c749a4..e50157d 100644 --- a/server/gateway/gateway.go +++ b/server/gateway/gateway.go @@ -6,11 +6,14 @@ import ( ) // NewGateway 基于 server.Server 创建网关服务器 -func NewGateway(srv *server.Server) *Gateway { +func NewGateway(srv *server.Server, options ...Option) *Gateway { gateway := &Gateway{ srv: srv, EndpointManager: NewEndpointManager(), } + for _, option := range options { + option(gateway) + } return gateway } diff --git a/server/gateway/options.go b/server/gateway/options.go new file mode 100644 index 0000000..a467c30 --- /dev/null +++ b/server/gateway/options.go @@ -0,0 +1,12 @@ +package gateway + +// Option 网关选项 +type Option func(gateway *Gateway) + +// WithEndpointSelector 设置端点选择器 +// - 默认情况下,网关会随机选择一个端点作为目标,如果需要自定义端点选择器,可以通过该选项设置 +func WithEndpointSelector(selector func([]*Endpoint) *Endpoint) Option { + return func(gateway *Gateway) { + gateway.EndpointManager.selector = selector + } +}