fix: server 包优化 Shutdown 逻辑,修复服务器关闭时不会等待消息执行完毕的问题

This commit is contained in:
kercylan98 2023-08-17 10:38:16 +08:00
parent dcfb3da534
commit 93c5f3695f
3 changed files with 33 additions and 17 deletions

View File

@ -2,9 +2,6 @@ package server
import ( import (
"bytes" "bytes"
"context"
"fmt"
"github.com/kercylan98/minotaur/utils/log"
"github.com/panjf2000/gnet" "github.com/panjf2000/gnet"
"time" "time"
) )
@ -18,12 +15,7 @@ func (slf *gNet) OnInitComplete(server gnet.Server) (action gnet.Action) {
} }
func (slf *gNet) OnShutdown(server gnet.Server) { func (slf *gNet) OnShutdown(server gnet.Server) {
slf.closeChannel <- struct{}{} return
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if err := gnet.Stop(ctx, fmt.Sprintf("%s://%s", slf.network, slf.addr)); err != nil {
log.Error("Server", log.String("Minotaur GNet Server", "Shutdown"), log.Err(err))
}
} }
func (slf *gNet) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) { func (slf *gNet) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
@ -52,9 +44,5 @@ func (slf *gNet) React(packet []byte, c gnet.Conn) (out []byte, action gnet.Acti
} }
func (slf *gNet) Tick() (delay time.Duration, action gnet.Action) { func (slf *gNet) Tick() (delay time.Duration, action gnet.Action) {
delay = 1 * time.Second
if slf.isShutdown.Load() {
return 0, gnet.Shutdown
}
return return
} }

View File

@ -1,5 +1,7 @@
package server package server
import "github.com/kercylan98/minotaur/utils/slice"
type Network string type Network string
const ( const (
@ -19,3 +21,14 @@ const (
NetworkKcp Network = "kcp" NetworkKcp Network = "kcp"
NetworkGRPC Network = "grpc" NetworkGRPC Network = "grpc"
) )
var (
networks = []Network{
NetworkNone, NetworkTcp, NetworkTcp4, NetworkTcp6, NetworkUdp, NetworkUdp4, NetworkUdp6, NetworkUnix, NetworkHttp, NetworkWebsocket, NetworkKcp, NetworkGRPC,
}
)
// GetNetworks 获取所有支持的网络模式
func GetNetworks() []Network {
return slice.Copy(networks)
}

View File

@ -96,6 +96,7 @@ type Server struct {
shuntChannels *concurrent.BalanceMap[int64, chan *Message] // 分流管道 shuntChannels *concurrent.BalanceMap[int64, chan *Message] // 分流管道
channelGenerator func(guid int64) chan *Message // 消息管道生成器 channelGenerator func(guid int64) chan *Message // 消息管道生成器
shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool) // 分流管道匹配器 shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool) // 分流管道匹配器
messageCounter atomic.Int64 // 消息计数器
} }
// Run 使用特定地址运行服务器 // Run 使用特定地址运行服务器
@ -411,6 +412,12 @@ func (slf *Server) Shutdown() {
// shutdown 停止运行服务器 // shutdown 停止运行服务器
func (slf *Server) shutdown(err error) { func (slf *Server) shutdown(err error) {
slf.isShutdown.Store(true)
for slf.messageCounter.Load() > 0 {
log.Info("Server", log.Any("network", slf.network), log.String("listen", slf.addr),
log.String("action", "shutdown"), log.String("state", "waiting"), log.Int64("message", slf.messageCounter.Load()))
time.Sleep(time.Second)
}
if slf.multiple == nil { if slf.multiple == nil {
slf.OnStopEvent() slf.OnStopEvent()
} }
@ -419,7 +426,6 @@ func (slf *Server) shutdown(err error) {
slf.multipleRuntimeErrorChan <- err slf.multipleRuntimeErrorChan <- err
} }
}() }()
slf.isShutdown.Store(true)
if slf.ticker != nil { if slf.ticker != nil {
slf.ticker.Release() slf.ticker.Release()
} }
@ -453,6 +459,11 @@ func (slf *Server) shutdown(err error) {
log.Error("Server", log.Err(shutdownErr)) log.Error("Server", log.Err(shutdownErr))
} }
} }
if slf.gServer != nil && slf.isRunning {
if shutdownErr := gnet.Stop(context.Background(), fmt.Sprintf("%s://%s", slf.network, slf.addr)); err != nil {
log.Error("Server", log.Err(shutdownErr))
}
}
if err != nil { if err != nil {
if slf.multiple != nil { if slf.multiple != nil {
@ -474,10 +485,8 @@ func (slf *Server) shutdown(err error) {
log.Info("Server", log.Any("network", slf.network), log.String("listen", slf.addr), log.Info("Server", log.Any("network", slf.network), log.String("listen", slf.addr),
log.String("action", "shutdown"), log.String("state", "normal")) log.String("action", "shutdown"), log.String("state", "normal"))
} }
if slf.gServer == nil {
slf.closeChannel <- struct{}{} slf.closeChannel <- struct{}{}
} }
}
// GRPCServer 当网络类型为 NetworkGRPC 时将被允许获取 grpc 服务器,否则将会发生 panic // GRPCServer 当网络类型为 NetworkGRPC 时将被允许获取 grpc 服务器,否则将会发生 panic
func (slf *Server) GRPCServer() *grpc.Server { func (slf *Server) GRPCServer() *grpc.Server {
@ -514,6 +523,9 @@ func (slf *Server) pushMessage(message *Message) {
slf.messagePool.Release(message) slf.messagePool.Release(message)
return return
} }
if slf.isShutdown.Load() {
return
}
if slf.shuntChannels != nil && (message.t == MessageTypePacket) { if slf.shuntChannels != nil && (message.t == MessageTypePacket) {
conn := message.attrs[0].(*Conn) conn := message.attrs[0].(*Conn)
channelGuid, allowToCreate := slf.shuntMatcher(conn) channelGuid, allowToCreate := slf.shuntMatcher(conn)
@ -546,6 +558,7 @@ func (slf *Server) low(message *Message, present time.Time, expect time.Duration
// dispatchMessage 消息分发 // dispatchMessage 消息分发
func (slf *Server) dispatchMessage(msg *Message) { func (slf *Server) dispatchMessage(msg *Message) {
slf.messageCounter.Add(1)
var ( var (
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@ -579,6 +592,7 @@ func (slf *Server) dispatchMessage(msg *Message) {
super.Handle(cancel) super.Handle(cancel)
slf.low(msg, present, time.Millisecond*100) slf.low(msg, present, time.Millisecond*100)
slf.messageCounter.Add(-1)
if !slf.isShutdown.Load() { if !slf.isShutdown.Load() {
slf.messagePool.Release(msg) slf.messagePool.Release(msg)
@ -623,6 +637,7 @@ func (slf *Server) dispatchMessage(msg *Message) {
} }
super.Handle(cancel) super.Handle(cancel)
slf.low(msg, present, time.Second) slf.low(msg, present, time.Second)
slf.messageCounter.Add(-1)
if !slf.isShutdown.Load() { if !slf.isShutdown.Load() {
slf.messagePool.Release(msg) slf.messagePool.Release(msg)