From ad1499e9b0001134a6c445655d3b27008a637b7b Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Tue, 16 May 2023 12:51:08 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B7=A8=E6=9C=8D=E6=94=AF=E6=8C=81=E5=90=8D?= =?UTF-8?q?=E7=A7=B0=E9=9A=94=E7=A6=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/corss/nats.go | 4 ++++ server/cross.go | 2 ++ server/errors.go | 1 + server/options.go | 11 ++++++++--- server/server.go | 44 +++++++++++++++++++++++++++----------------- 5 files changed, 42 insertions(+), 20 deletions(-) diff --git a/server/corss/nats.go b/server/corss/nats.go index df8095c..42204b6 100644 --- a/server/corss/nats.go +++ b/server/corss/nats.go @@ -65,3 +65,7 @@ func (slf *Nats) PushMessage(serverId int64, packet []byte) error { } return slf.conn.Publish(fmt.Sprintf("%s_%d", slf.subject, serverId), data) } + +func (slf *Nats) Release() { + slf.conn.Close() +} diff --git a/server/cross.go b/server/cross.go index 28d1c10..bafdc85 100644 --- a/server/cross.go +++ b/server/cross.go @@ -9,4 +9,6 @@ type Cross interface { // PushMessage 推送跨服消息 // - serverId: 目标服务器id PushMessage(serverId int64, packet []byte) error + // Release 释放资源 + Release() } diff --git a/server/errors.go b/server/errors.go index 922aea5..ed59d27 100644 --- a/server/errors.go +++ b/server/errors.go @@ -21,4 +21,5 @@ var ( ErrOnlySupportSocket = errors.New("only supports Socket programming") ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server") ErrNoSupportTicker = errors.New("the server does not support Ticker, please use the WithTicker option to create the server") + ErrUnregisteredCrossName = errors.New("unregistered cross name, please use the WithCross option to create the server") ) diff --git a/server/options.go b/server/options.go index 1c7ba00..84eda17 100644 --- a/server/options.go +++ b/server/options.go @@ -40,11 +40,16 @@ func WithTicker(size int, autonomy bool) Option { } // WithCross 通过跨服的方式创建服务器 -func WithCross(serverId int64, cross Cross) Option { +// - 推送跨服消息时,将推送到对应crossName的跨服中间件中,crossName可以满足不同功能采用不同的跨服/消息中间件 +// - 通常情况下crossName仅需一个即可 +func WithCross(crossName string, serverId int64, cross Cross) Option { return func(srv *Server) { srv.id = serverId - srv.cross = cross - err := srv.cross.Init(srv, func(serverId int64, packet []byte) { + if srv.cross == nil { + srv.cross = map[string]Cross{} + } + srv.cross[crossName] = cross + err := cross.Init(srv, func(serverId int64, packet []byte) { srv.PushMessage(MessageTypeCross, serverId, packet) }) if err != nil { diff --git a/server/server.go b/server/server.go index 289d415..25de728 100644 --- a/server/server.go +++ b/server/server.go @@ -53,19 +53,19 @@ func New(network Network, options ...Option) *Server { // Server 网络服务器 type Server struct { - *event // 事件 - cross Cross // 跨服 - id int64 // 服务器id - network Network // 网络类型 - addr string // 侦听地址 - options []Option // 选项 - ginServer *gin.Engine // HTTP模式下的路由器 - httpServer *http.Server // HTTP模式下的服务器 - grpcServer *grpc.Server // GRPC模式下的服务器 - supportMessageTypes map[int]bool // websocket模式下支持的消息类型 - certFile, keyFile string // TLS文件 - isShutdown atomic.Bool // 是否已关闭 - closeChannel chan struct{} // 关闭信号 + *event // 事件 + cross map[string]Cross // 跨服 + id int64 // 服务器id + network Network // 网络类型 + addr string // 侦听地址 + options []Option // 选项 + ginServer *gin.Engine // HTTP模式下的路由器 + httpServer *http.Server // HTTP模式下的服务器 + grpcServer *grpc.Server // GRPC模式下的服务器 + supportMessageTypes map[int]bool // websocket模式下支持的消息类型 + certFile, keyFile string // TLS文件 + isShutdown atomic.Bool // 是否已关闭 + closeChannel chan struct{} // 关闭信号 gServer *gNet // TCP或UDP模式下的服务器 messagePool *synchronization.Pool[*message] // 消息池 @@ -345,6 +345,12 @@ func (slf *Server) Ticker() *timer.Ticker { // Shutdown 停止运行服务器 func (slf *Server) Shutdown(err error) { slf.isShutdown.Store(true) + if slf.ticker != nil { + slf.ticker.Release() + } + for _, cross := range slf.cross { + cross.Release() + } if len(slf.diversionMessageChannels) > 0 { for i := 0; i < len(slf.diversionMessageChannels); i++ { close(slf.diversionMessageChannels[i]) @@ -411,12 +417,16 @@ func (slf *Server) PushMessage(messageType MessageType, attrs ...any) { } } -// PushCrossMessage 推送跨服消息 -func (slf *Server) PushCrossMessage(serverId int64, packet []byte) error { - if slf.cross == nil { +// PushCrossMessage 推送跨服消息到特定跨服的服务器中 +func (slf *Server) PushCrossMessage(crossName string, serverId int64, packet []byte) error { + if len(slf.cross) == 0 { return ErrNoSupportCross } - return slf.cross.PushMessage(serverId, packet) + cross, exist := slf.cross[crossName] + if !exist { + return ErrUnregisteredCrossName + } + return cross.PushMessage(serverId, packet) } // dispatchMessage 消息分发