diff --git a/server/gnet.go b/server/gnet.go index 3dd0a79..6cfe774 100644 --- a/server/gnet.go +++ b/server/gnet.go @@ -2,7 +2,11 @@ package server import ( "bytes" + "context" + "fmt" + "github.com/kercylan98/minotaur/utils/log" "github.com/panjf2000/gnet" + "go.uber.org/zap" "time" ) @@ -15,7 +19,12 @@ func (slf *gNet) OnInitComplete(server gnet.Server) (action gnet.Action) { } func (slf *gNet) OnShutdown(server gnet.Server) { - + slf.closeChannel <- struct{}{} + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + if err := gnet.Stop(ctx, fmt.Sprintf("%s://%s", slf.network, slf.addr)); err != nil { + log.Error("Server", zap.String("Minotaur GNet Server", "Shutdown"), zap.Error(err)) + } } func (slf *gNet) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) { @@ -44,5 +53,9 @@ func (slf *gNet) React(packet []byte, c gnet.Conn) (out []byte, action gnet.Acti } func (slf *gNet) Tick() (delay time.Duration, action gnet.Action) { + delay = 1 * time.Second + if slf.isShutdown.Load() { + return 0, gnet.Shutdown + } return } diff --git a/server/multiple.go b/server/multiple.go index 675e261..7f58b15 100644 --- a/server/multiple.go +++ b/server/multiple.go @@ -6,7 +6,6 @@ import ( "os" "os/signal" "syscall" - "time" ) func NewMultipleServer(serverHandle ...func() (addr string, srv *Server)) *MultipleServer { @@ -21,17 +20,21 @@ func NewMultipleServer(serverHandle ...func() (addr string, srv *Server)) *Multi } type MultipleServer struct { - servers []*Server - addresses []string + servers []*Server + addresses []string + exitEventHandles []func() } func (slf *MultipleServer) Run() { var exceptionChannel = make(chan error, 1) + var runtimeExceptionChannel = make(chan error, 1) defer close(exceptionChannel) + defer close(runtimeExceptionChannel) var running = make([]*Server, 0, len(slf.servers)) for i := 0; i < len(slf.servers); i++ { go func(address string, server *Server) { - server.multiple = true + server.multiple = slf + server.multipleRuntimeErrorChan = runtimeExceptionChannel if err := server.Run(address); err != nil { exceptionChannel <- err } else { @@ -40,8 +43,6 @@ func (slf *MultipleServer) Run() { }(slf.addresses[i], slf.servers[i]) } - time.Sleep(500 * time.Millisecond) - log.Info("Server", zap.String("Minotaur Multiple Server", "====================================================================")) for _, server := range slf.servers { log.Info("Server", zap.String("Minotaur Multiple Server", "RunningInfo"), @@ -55,14 +56,39 @@ func (slf *MultipleServer) Run() { signal.Notify(systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) select { case err := <-exceptionChannel: - for _, server := range slf.servers { + for len(slf.servers) > 0 { + server := slf.servers[0] server.Shutdown(err) + slf.servers = slf.servers[1:] } - return - case <-systemSignal: - for _, server := range slf.servers { + break + case <-runtimeExceptionChannel: + for len(slf.servers) > 0 { + server := slf.servers[0] + server.multipleRuntimeErrorChan = nil server.Shutdown(nil) + slf.servers = slf.servers[1:] } - return + break + case <-systemSignal: + for len(slf.servers) > 0 { + server := slf.servers[0] + server.Shutdown(nil) + slf.servers = slf.servers[1:] + } + break + } + + slf.OnExitEvent() +} + +// RegExitEvent 注册退出事件 +func (slf *MultipleServer) RegExitEvent(handle func()) { + slf.exitEventHandles = append(slf.exitEventHandles, handle) +} + +func (slf *MultipleServer) OnExitEvent() { + for _, handle := range slf.exitEventHandles { + handle() } } diff --git a/server/server.go b/server/server.go index fe1ef99..e32303c 100644 --- a/server/server.go +++ b/server/server.go @@ -6,9 +6,11 @@ import ( "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "github.com/kercylan98/minotaur/utils/log" + "github.com/kercylan98/minotaur/utils/super" "github.com/kercylan98/minotaur/utils/synchronization" "github.com/kercylan98/minotaur/utils/timer" "github.com/panjf2000/gnet" + "github.com/panjf2000/gnet/pkg/logging" "github.com/xtaci/kcp-go/v5" "go.uber.org/zap" "google.golang.org/grpc" @@ -62,6 +64,7 @@ type Server struct { grpcServer *grpc.Server // GRPC模式下的服务器 supportMessageTypes map[int]bool // websocket模式下支持的消息类型 certFile, keyFile string // TLS文件 + isRunning bool // 是否正在运行 isShutdown atomic.Bool // 是否已关闭 closeChannel chan struct{} // 关闭信号 @@ -70,11 +73,13 @@ type Server struct { messagePoolSize int // 消息池大小 messageChannel map[int]chan *Message // 消息管道 initMessageChannel bool // 消息管道是否已经初始化 - multiple bool // 是否为多服务器模式下运行 + multiple *MultipleServer // 多服务器模式下的服务器 prod bool // 是否为生产模式 core int // 消息处理核心数 websocketWriteMessageType int // websocket写入的消息类型 ticker *timer.Ticker // 定时器 + + multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 } // Run 使用特定地址运行服务器 @@ -137,15 +142,24 @@ func (slf *Server) Run(addr string) error { return err } go func() { + slf.isRunning = true slf.OnStartBeforeEvent() if err := slf.grpcServer.Serve(listener); err != nil { + slf.isRunning = false slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) } }() case NetworkTcp, NetworkTcp4, NetworkTcp6, NetworkUdp, NetworkUdp4, NetworkUdp6, NetworkUnix: go connectionInitHandle(func() { + slf.isRunning = true slf.OnStartBeforeEvent() - if err := gnet.Serve(slf.gServer, protoAddr); err != nil { + if err := gnet.Serve(slf.gServer, protoAddr, + gnet.WithLogger(log.Logger().Sugar()), + gnet.WithLogLevel(super.If(slf.IsProd(), logging.ErrorLevel, logging.DebugLevel)), + gnet.WithTicker(true), + gnet.WithMulticore(true), + ); err != nil { + slf.isRunning = false slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) } }) @@ -155,6 +169,7 @@ func (slf *Server) Run(addr string) error { return err } go connectionInitHandle(func() { + slf.isRunning = true slf.OnStartBeforeEvent() for { session, err := listener.AcceptKCP() @@ -189,14 +204,17 @@ func (slf *Server) Run(addr string) error { gin.SetMode(gin.ReleaseMode) } go func() { + slf.isRunning = true slf.OnStartBeforeEvent() slf.httpServer.Addr = slf.addr if len(slf.certFile)+len(slf.keyFile) > 0 { if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil { + slf.isRunning = false slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) } } else { if err := slf.httpServer.ListenAndServe(); err != nil { + slf.isRunning = false slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) } } @@ -263,13 +281,16 @@ func (slf *Server) Run(addr string) error { } }) go func() { + slf.isRunning = true slf.OnStartBeforeEvent() if len(slf.certFile)+len(slf.keyFile) > 0 { if err := http.ListenAndServeTLS(slf.addr, slf.certFile, slf.keyFile, nil); err != nil { + slf.isRunning = false slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) } } else { if err := http.ListenAndServe(slf.addr, nil); err != nil { + slf.isRunning = false slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) } } @@ -280,7 +301,7 @@ func (slf *Server) Run(addr string) error { return ErrCanNotSupportNetwork } - if !slf.multiple { + if slf.multiple == nil { log.Info("Server", zap.String("Minotaur Server", "====================================================================")) log.Info("Server", zap.String("Minotaur Server", "RunningInfo"), zap.Any("network", slf.network), @@ -334,6 +355,11 @@ func (slf *Server) Ticker() *timer.Ticker { // Shutdown 停止运行服务器 func (slf *Server) Shutdown(err error, stack ...string) { + defer func() { + if slf.multipleRuntimeErrorChan != nil { + slf.multipleRuntimeErrorChan <- err + } + }() slf.isShutdown.Store(true) if slf.ticker != nil { slf.ticker.Release() @@ -342,23 +368,16 @@ func (slf *Server) Shutdown(err error, stack ...string) { cross.Release() } if slf.initMessageChannel { - if slf.gServer != nil { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - if shutdownErr := gnet.Stop(ctx, fmt.Sprintf("%s://%s", slf.network, slf.addr)); shutdownErr != nil { - log.Error("Server", zap.Error(shutdownErr)) - } - } for _, messageChannel := range slf.messageChannel { close(messageChannel) } slf.messagePool.Close() slf.initMessageChannel = false } - if slf.grpcServer != nil { + if slf.grpcServer != nil && slf.isRunning { slf.grpcServer.GracefulStop() } - if slf.httpServer != nil { + if slf.httpServer != nil && slf.isRunning { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() if shutdownErr := slf.httpServer.Shutdown(ctx); shutdownErr != nil { @@ -371,13 +390,28 @@ func (slf *Server) Shutdown(err error, stack ...string) { if len(stack) > 0 { s = stack[0] } - log.ErrorWithStack("Server", s, zap.Any("network", slf.network), zap.String("listen", slf.addr), - zap.String("action", "shutdown"), zap.String("state", "exception"), zap.Error(err)) + if slf.multiple != nil { + slf.multiple.RegExitEvent(func() { + log.ErrorWithStack("Server", s, zap.Any("network", slf.network), zap.String("listen", slf.addr), + zap.String("action", "shutdown"), zap.String("state", "exception"), zap.Error(err)) + }) + for i, server := range slf.multiple.servers { + if server.addr == slf.addr { + slf.multiple.servers = append(slf.multiple.servers[:i], slf.multiple.servers[i+1:]...) + break + } + } + } else { + log.ErrorWithStack("Server", s, zap.Any("network", slf.network), zap.String("listen", slf.addr), + zap.String("action", "shutdown"), zap.String("state", "exception"), zap.Error(err)) + } } else { log.Info("Server", zap.Any("network", slf.network), zap.String("listen", slf.addr), zap.String("action", "shutdown"), zap.String("state", "normal")) } - slf.closeChannel <- struct{}{} + if slf.gServer == nil { + slf.closeChannel <- struct{}{} + } } func (slf *Server) GRPCServer() *grpc.Server { diff --git a/utils/log/log.go b/utils/log/log.go index bf7bc50..b764498 100644 --- a/utils/log/log.go +++ b/utils/log/log.go @@ -96,6 +96,10 @@ func getWriter(filename string, times int32) io.Writer { return hook } +func Logger() *zap.Logger { + return logger +} + func Info(msg string, fields ...zap.Field) { logger.Info(msg, fields...) }