From 93c5f3695f8e43e84aa7b94d6bedb4d9f4bf0a9b Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Thu, 17 Aug 2023 10:38:16 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20server=20=E5=8C=85=E4=BC=98=E5=8C=96=20S?= =?UTF-8?q?hutdown=20=E9=80=BB=E8=BE=91=EF=BC=8C=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=99=A8=E5=85=B3=E9=97=AD=E6=97=B6=E4=B8=8D?= =?UTF-8?q?=E4=BC=9A=E7=AD=89=E5=BE=85=E6=B6=88=E6=81=AF=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=AE=8C=E6=AF=95=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/gnet.go | 14 +------------- server/network.go | 13 +++++++++++++ server/server.go | 23 +++++++++++++++++++---- 3 files changed, 33 insertions(+), 17 deletions(-) diff --git a/server/gnet.go b/server/gnet.go index 06877eb..8231fad 100644 --- a/server/gnet.go +++ b/server/gnet.go @@ -2,9 +2,6 @@ package server import ( "bytes" - "context" - "fmt" - "github.com/kercylan98/minotaur/utils/log" "github.com/panjf2000/gnet" "time" ) @@ -18,12 +15,7 @@ func (slf *gNet) OnInitComplete(server gnet.Server) (action gnet.Action) { } func (slf *gNet) OnShutdown(server gnet.Server) { - slf.closeChannel <- struct{}{} - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - if err := gnet.Stop(ctx, fmt.Sprintf("%s://%s", slf.network, slf.addr)); err != nil { - log.Error("Server", log.String("Minotaur GNet Server", "Shutdown"), log.Err(err)) - } + return } func (slf *gNet) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) { @@ -52,9 +44,5 @@ func (slf *gNet) React(packet []byte, c gnet.Conn) (out []byte, action gnet.Acti } func (slf *gNet) Tick() (delay time.Duration, action gnet.Action) { - delay = 1 * time.Second - if slf.isShutdown.Load() { - return 0, gnet.Shutdown - } return } diff --git a/server/network.go b/server/network.go index 97f1bac..62ce882 100644 --- a/server/network.go +++ b/server/network.go @@ -1,5 +1,7 @@ package server +import "github.com/kercylan98/minotaur/utils/slice" + type Network string const ( @@ -19,3 +21,14 @@ const ( NetworkKcp Network = "kcp" NetworkGRPC Network = "grpc" ) + +var ( + networks = []Network{ + NetworkNone, NetworkTcp, NetworkTcp4, NetworkTcp6, NetworkUdp, NetworkUdp4, NetworkUdp6, NetworkUnix, NetworkHttp, NetworkWebsocket, NetworkKcp, NetworkGRPC, + } +) + +// GetNetworks 获取所有支持的网络模式 +func GetNetworks() []Network { + return slice.Copy(networks) +} diff --git a/server/server.go b/server/server.go index 5b299ff..850cf4a 100644 --- a/server/server.go +++ b/server/server.go @@ -96,6 +96,7 @@ type Server struct { shuntChannels *concurrent.BalanceMap[int64, chan *Message] // 分流管道 channelGenerator func(guid int64) chan *Message // 消息管道生成器 shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool) // 分流管道匹配器 + messageCounter atomic.Int64 // 消息计数器 } // Run 使用特定地址运行服务器 @@ -411,6 +412,12 @@ func (slf *Server) Shutdown() { // shutdown 停止运行服务器 func (slf *Server) shutdown(err error) { + slf.isShutdown.Store(true) + for slf.messageCounter.Load() > 0 { + log.Info("Server", log.Any("network", slf.network), log.String("listen", slf.addr), + log.String("action", "shutdown"), log.String("state", "waiting"), log.Int64("message", slf.messageCounter.Load())) + time.Sleep(time.Second) + } if slf.multiple == nil { slf.OnStopEvent() } @@ -419,7 +426,6 @@ func (slf *Server) shutdown(err error) { slf.multipleRuntimeErrorChan <- err } }() - slf.isShutdown.Store(true) if slf.ticker != nil { slf.ticker.Release() } @@ -453,6 +459,11 @@ func (slf *Server) shutdown(err error) { log.Error("Server", log.Err(shutdownErr)) } } + if slf.gServer != nil && slf.isRunning { + if shutdownErr := gnet.Stop(context.Background(), fmt.Sprintf("%s://%s", slf.network, slf.addr)); err != nil { + log.Error("Server", log.Err(shutdownErr)) + } + } if err != nil { if slf.multiple != nil { @@ -474,9 +485,7 @@ func (slf *Server) shutdown(err error) { log.Info("Server", log.Any("network", slf.network), log.String("listen", slf.addr), log.String("action", "shutdown"), log.String("state", "normal")) } - if slf.gServer == nil { - slf.closeChannel <- struct{}{} - } + slf.closeChannel <- struct{}{} } // GRPCServer 当网络类型为 NetworkGRPC 时将被允许获取 grpc 服务器,否则将会发生 panic @@ -514,6 +523,9 @@ func (slf *Server) pushMessage(message *Message) { slf.messagePool.Release(message) return } + if slf.isShutdown.Load() { + return + } if slf.shuntChannels != nil && (message.t == MessageTypePacket) { conn := message.attrs[0].(*Conn) channelGuid, allowToCreate := slf.shuntMatcher(conn) @@ -546,6 +558,7 @@ func (slf *Server) low(message *Message, present time.Time, expect time.Duration // dispatchMessage 消息分发 func (slf *Server) dispatchMessage(msg *Message) { + slf.messageCounter.Add(1) var ( ctx context.Context cancel context.CancelFunc @@ -579,6 +592,7 @@ func (slf *Server) dispatchMessage(msg *Message) { super.Handle(cancel) slf.low(msg, present, time.Millisecond*100) + slf.messageCounter.Add(-1) if !slf.isShutdown.Load() { slf.messagePool.Release(msg) @@ -623,6 +637,7 @@ func (slf *Server) dispatchMessage(msg *Message) { } super.Handle(cancel) slf.low(msg, present, time.Second) + slf.messageCounter.Add(-1) if !slf.isShutdown.Load() { slf.messagePool.Release(msg)