fix: 多服务器情况下日志错乱及无法正常 Shuntdown 问题修复

This commit is contained in:
kercylan98 2023-06-30 18:16:58 +08:00
parent 2eaccef470
commit 67616b2963
4 changed files with 104 additions and 27 deletions

View File

@ -2,7 +2,11 @@ package server
import ( import (
"bytes" "bytes"
"context"
"fmt"
"github.com/kercylan98/minotaur/utils/log"
"github.com/panjf2000/gnet" "github.com/panjf2000/gnet"
"go.uber.org/zap"
"time" "time"
) )
@ -15,7 +19,12 @@ func (slf *gNet) OnInitComplete(server gnet.Server) (action gnet.Action) {
} }
func (slf *gNet) OnShutdown(server gnet.Server) { func (slf *gNet) OnShutdown(server gnet.Server) {
slf.closeChannel <- struct{}{}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if err := gnet.Stop(ctx, fmt.Sprintf("%s://%s", slf.network, slf.addr)); err != nil {
log.Error("Server", zap.String("Minotaur GNet Server", "Shutdown"), zap.Error(err))
}
} }
func (slf *gNet) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) { func (slf *gNet) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
@ -44,5 +53,9 @@ func (slf *gNet) React(packet []byte, c gnet.Conn) (out []byte, action gnet.Acti
} }
func (slf *gNet) Tick() (delay time.Duration, action gnet.Action) { func (slf *gNet) Tick() (delay time.Duration, action gnet.Action) {
delay = 1 * time.Second
if slf.isShutdown.Load() {
return 0, gnet.Shutdown
}
return return
} }

View File

@ -6,7 +6,6 @@ import (
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"time"
) )
func NewMultipleServer(serverHandle ...func() (addr string, srv *Server)) *MultipleServer { func NewMultipleServer(serverHandle ...func() (addr string, srv *Server)) *MultipleServer {
@ -21,17 +20,21 @@ func NewMultipleServer(serverHandle ...func() (addr string, srv *Server)) *Multi
} }
type MultipleServer struct { type MultipleServer struct {
servers []*Server servers []*Server
addresses []string addresses []string
exitEventHandles []func()
} }
func (slf *MultipleServer) Run() { func (slf *MultipleServer) Run() {
var exceptionChannel = make(chan error, 1) var exceptionChannel = make(chan error, 1)
var runtimeExceptionChannel = make(chan error, 1)
defer close(exceptionChannel) defer close(exceptionChannel)
defer close(runtimeExceptionChannel)
var running = make([]*Server, 0, len(slf.servers)) var running = make([]*Server, 0, len(slf.servers))
for i := 0; i < len(slf.servers); i++ { for i := 0; i < len(slf.servers); i++ {
go func(address string, server *Server) { go func(address string, server *Server) {
server.multiple = true server.multiple = slf
server.multipleRuntimeErrorChan = runtimeExceptionChannel
if err := server.Run(address); err != nil { if err := server.Run(address); err != nil {
exceptionChannel <- err exceptionChannel <- err
} else { } else {
@ -40,8 +43,6 @@ func (slf *MultipleServer) Run() {
}(slf.addresses[i], slf.servers[i]) }(slf.addresses[i], slf.servers[i])
} }
time.Sleep(500 * time.Millisecond)
log.Info("Server", zap.String("Minotaur Multiple Server", "====================================================================")) log.Info("Server", zap.String("Minotaur Multiple Server", "===================================================================="))
for _, server := range slf.servers { for _, server := range slf.servers {
log.Info("Server", zap.String("Minotaur Multiple Server", "RunningInfo"), log.Info("Server", zap.String("Minotaur Multiple Server", "RunningInfo"),
@ -55,14 +56,39 @@ func (slf *MultipleServer) Run() {
signal.Notify(systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) signal.Notify(systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
select { select {
case err := <-exceptionChannel: case err := <-exceptionChannel:
for _, server := range slf.servers { for len(slf.servers) > 0 {
server := slf.servers[0]
server.Shutdown(err) server.Shutdown(err)
slf.servers = slf.servers[1:]
} }
return break
case <-systemSignal: case <-runtimeExceptionChannel:
for _, server := range slf.servers { for len(slf.servers) > 0 {
server := slf.servers[0]
server.multipleRuntimeErrorChan = nil
server.Shutdown(nil) server.Shutdown(nil)
slf.servers = slf.servers[1:]
} }
return break
case <-systemSignal:
for len(slf.servers) > 0 {
server := slf.servers[0]
server.Shutdown(nil)
slf.servers = slf.servers[1:]
}
break
}
slf.OnExitEvent()
}
// RegExitEvent 注册退出事件
func (slf *MultipleServer) RegExitEvent(handle func()) {
slf.exitEventHandles = append(slf.exitEventHandles, handle)
}
func (slf *MultipleServer) OnExitEvent() {
for _, handle := range slf.exitEventHandles {
handle()
} }
} }

View File

@ -6,9 +6,11 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/super"
"github.com/kercylan98/minotaur/utils/synchronization" "github.com/kercylan98/minotaur/utils/synchronization"
"github.com/kercylan98/minotaur/utils/timer" "github.com/kercylan98/minotaur/utils/timer"
"github.com/panjf2000/gnet" "github.com/panjf2000/gnet"
"github.com/panjf2000/gnet/pkg/logging"
"github.com/xtaci/kcp-go/v5" "github.com/xtaci/kcp-go/v5"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -62,6 +64,7 @@ type Server struct {
grpcServer *grpc.Server // GRPC模式下的服务器 grpcServer *grpc.Server // GRPC模式下的服务器
supportMessageTypes map[int]bool // websocket模式下支持的消息类型 supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件 certFile, keyFile string // TLS文件
isRunning bool // 是否正在运行
isShutdown atomic.Bool // 是否已关闭 isShutdown atomic.Bool // 是否已关闭
closeChannel chan struct{} // 关闭信号 closeChannel chan struct{} // 关闭信号
@ -70,11 +73,13 @@ type Server struct {
messagePoolSize int // 消息池大小 messagePoolSize int // 消息池大小
messageChannel map[int]chan *Message // 消息管道 messageChannel map[int]chan *Message // 消息管道
initMessageChannel bool // 消息管道是否已经初始化 initMessageChannel bool // 消息管道是否已经初始化
multiple bool // 是否为多服务器模式下运行 multiple *MultipleServer // 多服务器模式下的服务器
prod bool // 是否为生产模式 prod bool // 是否为生产模式
core int // 消息处理核心数 core int // 消息处理核心数
websocketWriteMessageType int // websocket写入的消息类型 websocketWriteMessageType int // websocket写入的消息类型
ticker *timer.Ticker // 定时器 ticker *timer.Ticker // 定时器
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
} }
// Run 使用特定地址运行服务器 // Run 使用特定地址运行服务器
@ -137,15 +142,24 @@ func (slf *Server) Run(addr string) error {
return err return err
} }
go func() { go func() {
slf.isRunning = true
slf.OnStartBeforeEvent() slf.OnStartBeforeEvent()
if err := slf.grpcServer.Serve(listener); err != nil { if err := slf.grpcServer.Serve(listener); err != nil {
slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
} }
}() }()
case NetworkTcp, NetworkTcp4, NetworkTcp6, NetworkUdp, NetworkUdp4, NetworkUdp6, NetworkUnix: case NetworkTcp, NetworkTcp4, NetworkTcp6, NetworkUdp, NetworkUdp4, NetworkUdp6, NetworkUnix:
go connectionInitHandle(func() { go connectionInitHandle(func() {
slf.isRunning = true
slf.OnStartBeforeEvent() slf.OnStartBeforeEvent()
if err := gnet.Serve(slf.gServer, protoAddr); err != nil { if err := gnet.Serve(slf.gServer, protoAddr,
gnet.WithLogger(log.Logger().Sugar()),
gnet.WithLogLevel(super.If(slf.IsProd(), logging.ErrorLevel, logging.DebugLevel)),
gnet.WithTicker(true),
gnet.WithMulticore(true),
); err != nil {
slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
} }
}) })
@ -155,6 +169,7 @@ func (slf *Server) Run(addr string) error {
return err return err
} }
go connectionInitHandle(func() { go connectionInitHandle(func() {
slf.isRunning = true
slf.OnStartBeforeEvent() slf.OnStartBeforeEvent()
for { for {
session, err := listener.AcceptKCP() session, err := listener.AcceptKCP()
@ -189,14 +204,17 @@ func (slf *Server) Run(addr string) error {
gin.SetMode(gin.ReleaseMode) gin.SetMode(gin.ReleaseMode)
} }
go func() { go func() {
slf.isRunning = true
slf.OnStartBeforeEvent() slf.OnStartBeforeEvent()
slf.httpServer.Addr = slf.addr slf.httpServer.Addr = slf.addr
if len(slf.certFile)+len(slf.keyFile) > 0 { if len(slf.certFile)+len(slf.keyFile) > 0 {
if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil { if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil {
slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
} }
} else { } else {
if err := slf.httpServer.ListenAndServe(); err != nil { if err := slf.httpServer.ListenAndServe(); err != nil {
slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
} }
} }
@ -263,13 +281,16 @@ func (slf *Server) Run(addr string) error {
} }
}) })
go func() { go func() {
slf.isRunning = true
slf.OnStartBeforeEvent() slf.OnStartBeforeEvent()
if len(slf.certFile)+len(slf.keyFile) > 0 { if len(slf.certFile)+len(slf.keyFile) > 0 {
if err := http.ListenAndServeTLS(slf.addr, slf.certFile, slf.keyFile, nil); err != nil { if err := http.ListenAndServeTLS(slf.addr, slf.certFile, slf.keyFile, nil); err != nil {
slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
} }
} else { } else {
if err := http.ListenAndServe(slf.addr, nil); err != nil { if err := http.ListenAndServe(slf.addr, nil); err != nil {
slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
} }
} }
@ -280,7 +301,7 @@ func (slf *Server) Run(addr string) error {
return ErrCanNotSupportNetwork return ErrCanNotSupportNetwork
} }
if !slf.multiple { if slf.multiple == nil {
log.Info("Server", zap.String("Minotaur Server", "====================================================================")) log.Info("Server", zap.String("Minotaur Server", "===================================================================="))
log.Info("Server", zap.String("Minotaur Server", "RunningInfo"), log.Info("Server", zap.String("Minotaur Server", "RunningInfo"),
zap.Any("network", slf.network), zap.Any("network", slf.network),
@ -334,6 +355,11 @@ func (slf *Server) Ticker() *timer.Ticker {
// Shutdown 停止运行服务器 // Shutdown 停止运行服务器
func (slf *Server) Shutdown(err error, stack ...string) { func (slf *Server) Shutdown(err error, stack ...string) {
defer func() {
if slf.multipleRuntimeErrorChan != nil {
slf.multipleRuntimeErrorChan <- err
}
}()
slf.isShutdown.Store(true) slf.isShutdown.Store(true)
if slf.ticker != nil { if slf.ticker != nil {
slf.ticker.Release() slf.ticker.Release()
@ -342,23 +368,16 @@ func (slf *Server) Shutdown(err error, stack ...string) {
cross.Release() cross.Release()
} }
if slf.initMessageChannel { if slf.initMessageChannel {
if slf.gServer != nil {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if shutdownErr := gnet.Stop(ctx, fmt.Sprintf("%s://%s", slf.network, slf.addr)); shutdownErr != nil {
log.Error("Server", zap.Error(shutdownErr))
}
}
for _, messageChannel := range slf.messageChannel { for _, messageChannel := range slf.messageChannel {
close(messageChannel) close(messageChannel)
} }
slf.messagePool.Close() slf.messagePool.Close()
slf.initMessageChannel = false slf.initMessageChannel = false
} }
if slf.grpcServer != nil { if slf.grpcServer != nil && slf.isRunning {
slf.grpcServer.GracefulStop() slf.grpcServer.GracefulStop()
} }
if slf.httpServer != nil { if slf.httpServer != nil && slf.isRunning {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel() defer cancel()
if shutdownErr := slf.httpServer.Shutdown(ctx); shutdownErr != nil { if shutdownErr := slf.httpServer.Shutdown(ctx); shutdownErr != nil {
@ -371,13 +390,28 @@ func (slf *Server) Shutdown(err error, stack ...string) {
if len(stack) > 0 { if len(stack) > 0 {
s = stack[0] s = stack[0]
} }
log.ErrorWithStack("Server", s, zap.Any("network", slf.network), zap.String("listen", slf.addr), if slf.multiple != nil {
zap.String("action", "shutdown"), zap.String("state", "exception"), zap.Error(err)) slf.multiple.RegExitEvent(func() {
log.ErrorWithStack("Server", s, zap.Any("network", slf.network), zap.String("listen", slf.addr),
zap.String("action", "shutdown"), zap.String("state", "exception"), zap.Error(err))
})
for i, server := range slf.multiple.servers {
if server.addr == slf.addr {
slf.multiple.servers = append(slf.multiple.servers[:i], slf.multiple.servers[i+1:]...)
break
}
}
} else {
log.ErrorWithStack("Server", s, zap.Any("network", slf.network), zap.String("listen", slf.addr),
zap.String("action", "shutdown"), zap.String("state", "exception"), zap.Error(err))
}
} else { } else {
log.Info("Server", zap.Any("network", slf.network), zap.String("listen", slf.addr), log.Info("Server", zap.Any("network", slf.network), zap.String("listen", slf.addr),
zap.String("action", "shutdown"), zap.String("state", "normal")) zap.String("action", "shutdown"), zap.String("state", "normal"))
} }
slf.closeChannel <- struct{}{} if slf.gServer == nil {
slf.closeChannel <- struct{}{}
}
} }
func (slf *Server) GRPCServer() *grpc.Server { func (slf *Server) GRPCServer() *grpc.Server {

View File

@ -96,6 +96,10 @@ func getWriter(filename string, times int32) io.Writer {
return hook return hook
} }
func Logger() *zap.Logger {
return logger
}
func Info(msg string, fields ...zap.Field) { func Info(msg string, fields ...zap.Field) {
logger.Info(msg, fields...) logger.Info(msg, fields...)
} }