服务器优化

This commit is contained in:
kercylan98 2023-05-11 11:24:02 +08:00
parent 61ba4cfad6
commit dbdff6ca5d
1 changed files with 104 additions and 87 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/synchronization"
"github.com/panjf2000/gnet"
"github.com/pkg/errors"
"github.com/xtaci/kcp-go/v5"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -15,7 +16,9 @@ import (
"net/http"
"os"
"os/signal"
"runtime/debug"
"strings"
"sync/atomic"
"syscall"
"time"
)
@ -27,6 +30,7 @@ func New(network Network, options ...Option) *Server {
network: network,
options: options,
core: 1,
closeChannel: make(chan struct{}),
}
server.event.Server = server
@ -55,6 +59,8 @@ type Server struct {
grpcServer *grpc.Server // GRPC模式下的服务器
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件
isShutdown atomic.Bool // 是否已关闭
closeChannel chan struct{} // 关闭信号
gServer *gNet // TCP或UDP模式下的服务器
messagePool *synchronization.Pool[*message] // 消息池
@ -93,7 +99,8 @@ func (slf *Server) Run(addr string) error {
slf.messagePool = synchronization.NewPool[*message](slf.messagePoolSize,
func() *message {
return &message{}
}, func(data *message) {
},
func(data *message) {
data.t = 0
data.attrs = nil
},
@ -123,14 +130,14 @@ func (slf *Server) Run(addr string) error {
go func() {
slf.OnStartBeforeEvent()
if err := slf.grpcServer.Serve(listener); err != nil {
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
slf.PushMessage(MessageTypeError, errors.WithMessage(err, string(debug.Stack())), MessageErrorActionShutdown)
}
}()
case NetworkTCP, NetworkTCP4, NetworkTCP6, NetworkUdp, NetworkUdp4, NetworkUdp6, NetworkUnix:
go connectionInitHandle(func() {
slf.OnStartBeforeEvent()
if err := gnet.Serve(slf.gServer, protoAddr); err != nil {
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
slf.PushMessage(MessageTypeError, errors.WithMessage(err, string(debug.Stack())), MessageErrorActionShutdown)
}
})
case NetworkKcp:
@ -178,23 +185,24 @@ func (slf *Server) Run(addr string) error {
slf.httpServer.Addr = slf.addr
if len(slf.certFile)+len(slf.keyFile) > 0 {
if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil {
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
slf.PushMessage(MessageTypeError, errors.WithMessage(err, string(debug.Stack())), MessageErrorActionShutdown)
}
} else {
if err := slf.httpServer.ListenAndServe(); err != nil {
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
slf.PushMessage(MessageTypeError, errors.WithMessage(err, string(debug.Stack())), MessageErrorActionShutdown)
}
}
}()
case NetworkWebsocket:
go connectionInitHandle(nil)
go connectionInitHandle(func() {
var pattern string
var index = strings.Index(addr, "/")
if index == -1 {
pattern = "/"
} else {
pattern = addr[index:]
slf.addr = slf.addr[:index]
}
var upgrade = websocket.Upgrader{
ReadBufferSize: 4096,
@ -251,15 +259,16 @@ func (slf *Server) Run(addr string) error {
slf.OnStartBeforeEvent()
if len(slf.certFile)+len(slf.keyFile) > 0 {
if err := http.ListenAndServeTLS(slf.addr, slf.certFile, slf.keyFile, nil); err != nil {
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
slf.PushMessage(MessageTypeError, errors.WithMessage(err, string(debug.Stack())), MessageErrorActionShutdown)
}
} else {
if err := http.ListenAndServe(slf.addr, nil); err != nil {
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
slf.PushMessage(MessageTypeError, errors.WithMessage(err, string(debug.Stack())), MessageErrorActionShutdown)
}
}
}()
})
default:
return ErrCanNotSupportNetwork
}
@ -278,6 +287,9 @@ func (slf *Server) Run(addr string) error {
select {
case <-systemSignal:
slf.Shutdown(nil)
case <-slf.closeChannel:
close(slf.closeChannel)
break
}
} else {
slf.OnStartFinishEvent()
@ -298,6 +310,7 @@ func (slf *Server) IsDev() bool {
// Shutdown 停止运行服务器
func (slf *Server) Shutdown(err error) {
slf.isShutdown.Store(true)
if slf.initMessageChannel {
if slf.gServer != nil {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
@ -324,6 +337,7 @@ func (slf *Server) Shutdown(err error) {
if err != nil {
log.Error("Server", zap.Any("network", slf.network), zap.String("listen", slf.addr),
zap.String("action", "shutdown"), zap.String("state", "exception"), zap.Error(err))
slf.closeChannel <- struct{}{}
} else {
log.Info("Server", zap.Any("network", slf.network), zap.String("listen", slf.addr),
zap.String("action", "shutdown"), zap.String("state", "normal"))
@ -356,7 +370,9 @@ func (slf *Server) PushMessage(messageType MessageType, attrs ...any) {
// dispatchMessage 消息分发
func (slf *Server) dispatchMessage(msg *message) {
defer func() {
if !slf.isShutdown.Load() {
slf.messagePool.Release(msg)
}
if err := recover(); err != nil {
log.Error("Server", zap.String("MessageType", messageNames[msg.t]), zap.Any("MessageAttrs", msg.attrs), zap.Any("error", err))
}
@ -377,6 +393,7 @@ func (slf *Server) dispatchMessage(msg *message) {
log.Error("Server", zap.Error(err))
case MessageErrorActionShutdown:
slf.Shutdown(err)
fmt.Println(err)
default:
log.Warn("Server", zap.String("not support message error action", action.String()))
}