diff --git a/README.md b/README.md index 3a7137b..11de66c 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,7 @@ [![Go doc](https://img.shields.io/badge/go.dev-reference-brightgreen?logo=go&logoColor=white&style=flat)](https://pkg.go.dev/github.com/kercylan98/minotaur) ![](https://img.shields.io/badge/Email-kercylan@gmail.com-green.svg?style=flat) ![](https://komarev.com/ghpvc/?username=kercylan98) + Minotaur 是一个基于Golang 1.20 编写的服务端开发支持库,其中采用了大量泛型设计,用于游戏服务器开发。 @@ -39,7 +40,6 @@ $ go get -u github.com/kercylan98/minotaur ## 用法 - 在`Minotaur`中大量使用了 **[泛型](https://go.dev/doc/tutorial/generics)** 、 **[观察者(事件)](https://www.runoob.com/design-pattern/observer-pattern.html)** 和 **[选项模式](https://juejin.cn/post/6844903729313873927)**,在使用前建议先进行相应了解; -- 更多的 **[示例](./examples)** 参考可在 [examples](./examples) 目录查阅; - 项目文档可访问 **[pkg.go.dev](https://pkg.go.dev/github.com/kercylan98/minotaur)** 进行查阅; ### 本地文档 diff --git a/component/components/lockstep.go b/component/components/lockstep.go index 4a98f69..eb21c6f 100644 --- a/component/components/lockstep.go +++ b/component/components/lockstep.go @@ -39,8 +39,6 @@ func NewLockstep[ClientID comparable, Command any](options ...LockstepOption[Cli // - 自定帧序列化方式 WithLockstepSerialization // - 从特定帧开始追帧 // - 兼容各种基于TCP/UDP/Unix的网络类型,可通过客户端实现其他网络类型同步 -// -// 可在 examples 目录下找到示例,示例项目:simple-server-lockstep type Lockstep[ClientID comparable, Command any] struct { clients *synchronization.Map[ClientID, component.LockstepClient[ClientID]] // 接受广播的客户端 frames *synchronization.Map[int, []Command] // 所有帧指令 diff --git a/examples/doc.go b/examples/doc.go deleted file mode 100644 index 5150013..0000000 --- a/examples/doc.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package examples 提供了多种实现案例 -package examples diff --git a/examples/multi-core-server-room-sync/main.go b/examples/multi-core-server-room-sync/main.go deleted file mode 100644 index 1e906a7..0000000 --- a/examples/multi-core-server-room-sync/main.go +++ /dev/null @@ -1,96 +0,0 @@ -// 该案例中将服务器消息通过多核的方式异步传输到房间中,在每个房间中单独同步处理维护消息。 -package main - -import ( - "fmt" - "github.com/kercylan98/minotaur/game/builtin" - "github.com/kercylan98/minotaur/server" - "github.com/kercylan98/minotaur/utils/synchronization" - "strconv" - "strings" -) - -type message struct { - conn *server.Conn - packet []byte -} - -type Room struct { - *builtin.Room[string, *builtin.Player[string]] - channel chan *message // need close - pool *synchronization.Pool[*message] -} - -func newRoom(guid int64) *Room { - room := &Room{ - Room: builtin.NewRoom[string, *builtin.Player[string]](guid), - } - room.pool = synchronization.NewPool[*message](1024*100, func() *message { - return new(message) - }, func(data *message) { - data.conn = nil - data.packet = nil - }) - room.channel = make(chan *message, 1024*100) - go func() { - for msg := range room.channel { - room.handePacket(msg.conn, msg.packet) - room.pool.Release(msg) - } - }() - return room -} - -func (slf *Room) PushMessage(conn *server.Conn, packet []byte) { - msg := slf.pool.Get() - msg.conn = conn - msg.packet = packet - slf.channel <- msg -} - -func (slf *Room) handePacket(conn *server.Conn, packet []byte) { - conn.WriteString(fmt.Sprintf("[%d] %s", slf.GetGuid(), string(packet))) -} - -// 以房间为核心玩法的多核服务器实现 -// - 服务器消息处理为异步执行 -// - 由房间分发具体消息,在房间内所有消息为同步执行 -func main() { - rooms := synchronization.NewMap[int64, *Room]() - - srv := server.New(server.NetworkWebsocket, - server.WithWebsocketWriteMessageType(server.WebsocketMessageTypeText), - server.WithMultiCore(10), - ) - - srv.RegConnectionReceiveWebsocketPacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte, messageType int) { - p := strings.SplitN(string(packet), ":", 2) - roomId, err := strconv.ParseInt(p[0], 10, 64) - if err != nil { - conn.WriteString(fmt.Sprintf("wrong room id, err: %s", err.Error())) - return - } - // 假定命令格式 ${房间ID}:命令 - switch p[1] { - case "create": - if !rooms.Exist(roomId) { - rooms.Set(roomId, newRoom(roomId)) - conn.WriteString(fmt.Sprintf("create room[%d] success", roomId)) - } else { - conn.WriteString(fmt.Sprintf("room[%d] existed", roomId)) - } - default: - room, exist := rooms.GetExist(roomId) - if !exist { - rooms.Set(roomId, room) - conn.WriteString(fmt.Sprintf("room[%d] does not exist, create room please use ${roomId}:create", roomId)) - } else { - room.PushMessage(conn, []byte(p[1])) - } - } - }) - - if err := srv.Run(":9999"); err != nil { - panic(err) - } -} diff --git a/examples/simple-server-chatroom/main.go b/examples/simple-server-chatroom/main.go deleted file mode 100644 index 394ccff..0000000 --- a/examples/simple-server-chatroom/main.go +++ /dev/null @@ -1,36 +0,0 @@ -// 该案例实现了一个简单的聊天室功能 -package main - -import ( - "fmt" - "github.com/kercylan98/minotaur/server" - "github.com/kercylan98/minotaur/utils/synchronization" -) - -func main() { - connections := synchronization.NewMap[string, *server.Conn]() - - srv := server.New(server.NetworkWebsocket, server.WithWebsocketWriteMessageType(server.WebsocketMessageTypeText)) - srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) { - for _, c := range connections.Map() { - c.Write([]byte(fmt.Sprintf("%s 加入了聊天室", conn.GetID()))) - } - connections.Set(conn.GetID(), conn) - conn.Write([]byte("欢迎加入")) - }) - srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) { - if connections.DeleteExist(conn.GetID()) { - for id, c := range connections.Map() { - c.Write([]byte(fmt.Sprintf("%s 退出了聊天室", id))) - } - } - }) - srv.RegConnectionReceiveWebsocketPacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte, messageType int) { - for _, c := range connections.Map() { - c.Write([]byte(fmt.Sprintf("%s: %s", conn.GetID(), string(packet)))) - } - }) - if err := srv.Run(":9999"); err != nil { - panic(err) - } -} diff --git a/examples/simple-server-console/main.go b/examples/simple-server-console/main.go deleted file mode 100644 index 031b6de..0000000 --- a/examples/simple-server-console/main.go +++ /dev/null @@ -1,18 +0,0 @@ -// 该案例中延时了控制台服务器的实现,支持运行中根据控制台指令执行额外的功能逻辑 -package main - -import ( - "github.com/kercylan98/minotaur/server" - "github.com/kercylan98/minotaur/utils/log" - "go.uber.org/zap" -) - -func main() { - srv := server.New(server.NetworkWebsocket) - srv.RegConsoleCommandEvent("test", func(srv *server.Server) { - log.Info("Console", zap.String("Info", "Test")) - }) - if err := srv.Run(":9999"); err != nil { - panic(err) - } -} diff --git a/examples/simple-server-cross/cross-a/main.go b/examples/simple-server-cross/cross-a/main.go deleted file mode 100644 index 1c01c2a..0000000 --- a/examples/simple-server-cross/cross-a/main.go +++ /dev/null @@ -1,27 +0,0 @@ -package main - -import ( - "github.com/kercylan98/minotaur/server" - "github.com/kercylan98/minotaur/server/cross" - "github.com/kercylan98/minotaur/utils/log" - "github.com/kercylan98/minotaur/utils/timer" - "go.uber.org/zap" - "time" -) - -func main() { - srv := server.New(server.NetworkWebsocket, server.WithCross("nats", 1, cross.NewNats("127.0.0.1:4222")), server.WithTicker(10, false)) - srv.RegStartFinishEvent(func(srv *server.Server) { - srv.Ticker().Loop("CROSS", timer.Instantly, time.Second, timer.Forever, func() { - if err := srv.PushCrossMessage("nats", 2, []byte("I am cross 1")); err != nil { - panic(err) - } - }) - }) - srv.RegReceiveCrossPacketEvent(func(srv *server.Server, senderServerId int64, packet []byte) { - log.Info("Cross", zap.Int64("ServerID", senderServerId), zap.String("Packet", string(packet))) - }) - if err := srv.Run(":9999"); err != nil { - panic(err) - } -} diff --git a/examples/simple-server-cross/cross-b/main.go b/examples/simple-server-cross/cross-b/main.go deleted file mode 100644 index 8df686e..0000000 --- a/examples/simple-server-cross/cross-b/main.go +++ /dev/null @@ -1,27 +0,0 @@ -package main - -import ( - "github.com/kercylan98/minotaur/server" - "github.com/kercylan98/minotaur/server/cross" - "github.com/kercylan98/minotaur/utils/log" - "github.com/kercylan98/minotaur/utils/timer" - "go.uber.org/zap" - "time" -) - -func main() { - srv := server.New(server.NetworkWebsocket, server.WithCross("nats", 2, cross.NewNats("127.0.0.1:4222")), server.WithTicker(10, false)) - srv.RegStartFinishEvent(func(srv *server.Server) { - srv.Ticker().Loop("CROSS", timer.Instantly, time.Second, timer.Forever, func() { - if err := srv.PushCrossMessage("nats", 1, []byte("I am cross 2")); err != nil { - panic(err) - } - }) - }) - srv.RegReceiveCrossPacketEvent(func(srv *server.Server, senderServerId int64, packet []byte) { - log.Info("Cross", zap.Int64("ServerID", senderServerId), zap.String("Packet", string(packet))) - }) - if err := srv.Run(":19999"); err != nil { - panic(err) - } -} diff --git a/examples/simple-server-cross/doc.go b/examples/simple-server-cross/doc.go deleted file mode 100644 index 49a84db..0000000 --- a/examples/simple-server-cross/doc.go +++ /dev/null @@ -1,2 +0,0 @@ -// 该案例中演示了两个跨服相互通过网络进行数据传输的实现 -package main diff --git a/examples/simple-server-echo/main.go b/examples/simple-server-echo/main.go deleted file mode 100644 index 2d7caef..0000000 --- a/examples/simple-server-echo/main.go +++ /dev/null @@ -1,16 +0,0 @@ -// 该案例中实现了一个简单的回响服务器 -package main - -import ( - "github.com/kercylan98/minotaur/server" -) - -func main() { - srv := server.New(server.NetworkWebsocket) - srv.RegConnectionReceiveWebsocketPacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte, messageType int) { - conn.Write(packet, messageType) - }) - if err := srv.Run(":9999"); err != nil { - panic(err) - } -} diff --git a/examples/simple-server-lockstep/main.go b/examples/simple-server-lockstep/main.go deleted file mode 100644 index d85cbb1..0000000 --- a/examples/simple-server-lockstep/main.go +++ /dev/null @@ -1,50 +0,0 @@ -package main - -import ( - "github.com/kercylan98/minotaur/component/components" - "github.com/kercylan98/minotaur/game/builtin" - "github.com/kercylan98/minotaur/server" - "github.com/kercylan98/minotaur/utils/synchronization" -) - -type Player struct { - *builtin.Player[string] -} - -type Command struct { - CMD int - Data string -} - -// 访问:http://www.websocket-test.com/ -// - 使用多个页面连接到服务器后,任一页面发送start即可开启帧同步 -func main() { - players := synchronization.NewMap[string, *Player]() - - srv := server.New(server.NetworkWebsocket, server.WithWebsocketWriteMessageType(server.WebsocketMessageTypeText)) - lockstep := components.NewLockstep[string, *Command]() - - srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) { - player := &Player{Player: builtin.NewPlayer[string](conn.GetID(), conn)} - players.Set(conn.GetID(), player) - lockstep.JoinClient(player) - }) - srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) { - players.Delete(conn.GetID()) - lockstep.LeaveClient(conn.GetID()) - if players.Size() == 0 { - lockstep.StopBroadcast() - } - }) - srv.RegConnectionReceiveWebsocketPacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte, messageType int) { - switch string(packet) { - case "start": - lockstep.StartBroadcast() - default: - lockstep.AddCommand(&Command{CMD: 1, Data: string(packet)}) - } - }) - if err := srv.Run(":9999"); err != nil { - panic(err) - } -} diff --git a/game/builtin/player.go b/game/builtin/player.go index c66c2a9..3fd686e 100644 --- a/game/builtin/player.go +++ b/game/builtin/player.go @@ -30,8 +30,8 @@ func (slf *Player[ID]) UseConn(conn *server.Conn) { } // Send 向该玩家发送数据 -func (slf *Player[ID]) Send(packet []byte, messageType ...int) { - slf.conn.Write(packet, messageType...) +func (slf *Player[ID]) Send(packet server.Packet) { + slf.conn.Write(packet) } // Close 关闭玩家 diff --git a/go.mod b/go.mod index c551cd6..e73072f 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/nats-io/nats-server/v2 v2.9.16 // indirect github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/panjf2000/ants/v2 v2.8.1 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/smartystreets/assertions v1.13.1 // indirect diff --git a/go.sum b/go.sum index 4f7cfe0..51e5913 100644 --- a/go.sum +++ b/go.sum @@ -112,6 +112,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/panjf2000/ants/v2 v2.4.7 h1:MZnw2JRyTJxFwtaMtUJcwE618wKD04POWk2gwwP4E2M= github.com/panjf2000/ants/v2 v2.4.7/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= +github.com/panjf2000/ants/v2 v2.8.1 h1:C+n/f++aiW8kHCExKlpX6X+okmxKXP7DWLutxuAPuwQ= +github.com/panjf2000/ants/v2 v2.8.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= github.com/panjf2000/gnet v1.6.6 h1:P6bApc54hnVcJVgH+SMe41mn47ECCajB6E/dKq27Y0c= github.com/panjf2000/gnet v1.6.6/go.mod h1:KcOU7QsCaCBjeD5kyshBIamG3d9kAQtlob4Y0v0E+sc= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= @@ -228,6 +230,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/server/conn.go b/server/conn.go index 3588b21..1d0fd69 100644 --- a/server/conn.go +++ b/server/conn.go @@ -156,29 +156,15 @@ func (slf *Conn) IsWebsocket() bool { return slf.server.network == NetworkWebsocket } -// WriteString 向连接中写入字符串 -// - 通过转换为[]byte调用 *Conn.Write -func (slf *Conn) WriteString(data string, messageType ...int) { - slf.Write([]byte(data), messageType...) -} - -// WriteStringWithCallback 与 WriteString 相同,但是会在写入完成后调用 callback -// - 当 callback 为 nil 时,与 WriteString 相同 -func (slf *Conn) WriteStringWithCallback(data string, callback func(err error), messageType ...int) { - slf.WriteWithCallback([]byte(data), callback, messageType...) -} - // Write 向连接中写入数据 // - messageType: websocket模式中指定消息类型 -func (slf *Conn) Write(data []byte, messageType ...int) { +func (slf *Conn) Write(packet Packet) { if slf.packetPool == nil { return } cp := slf.packetPool.Get() - if len(messageType) > 0 { - cp.websocketMessageType = messageType[0] - } - cp.packet = data + cp.websocketMessageType = packet.WebsocketType + cp.packet = packet.Data slf.mutex.Lock() slf.packets = append(slf.packets, cp) slf.mutex.Unlock() @@ -186,15 +172,13 @@ func (slf *Conn) Write(data []byte, messageType ...int) { // WriteWithCallback 与 Write 相同,但是会在写入完成后调用 callback // - 当 callback 为 nil 时,与 Write 相同 -func (slf *Conn) WriteWithCallback(data []byte, callback func(err error), messageType ...int) { +func (slf *Conn) WriteWithCallback(packet Packet, callback func(err error), messageType ...int) { if slf.packetPool == nil { return } cp := slf.packetPool.Get() - if len(messageType) > 0 { - cp.websocketMessageType = messageType[0] - } - cp.packet = data + cp.websocketMessageType = packet.WebsocketType + cp.packet = packet.Data cp.callback = callback slf.mutex.Lock() slf.packets = append(slf.packets, cp) @@ -233,18 +217,8 @@ func (slf *Conn) writeLoop(wait *sync.WaitGroup) { slf.mutex.Unlock() for i := 0; i < len(packets); i++ { data := packets[i] - //if len(data.packet) == 0 { - // for _, packet := range packets { - // slf.packetPool.Release(packet) - // } - // slf.Close() - // return - //} var err error if slf.IsWebsocket() { - if data.websocketMessageType <= 0 { - data.websocketMessageType = slf.server.websocketWriteMessageType - } err = slf.ws.WriteMessage(data.websocketMessageType, data.packet) } else { if slf.gn != nil { diff --git a/server/errors.go b/server/errors.go index 855f40e..5ddd56a 100644 --- a/server/errors.go +++ b/server/errors.go @@ -3,22 +3,12 @@ package server import "errors" var ( - ErrConstructed = errors.New("the Server must be constructed using the server.New function") - ErrCanNotSupportNetwork = errors.New("can not support network") - ErrMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte") - ErrWebsocketMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte and int(MessageType)") - ErrMessageTypeErrorAttrs = errors.New("MessageTypePacket must contain error and MessageErrorAction") - ErrMessageTypeCrossErrorAttrs = errors.New("MessageTypeCross must contain int64(server id) and []byte") - ErrMessageTypeTickerErrorAttrs = errors.New("MessageTypeTicker must contain func()") - ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported") - ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported") - ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp") - ErrWebsocketMessageTypeException = errors.New("unknown message type, will not work") - ErrNotWebsocketUseMessageType = errors.New("message type filtering only supports websocket and does not take effect") - ErrWebsocketIllegalMessageType = errors.New("illegal message type") - ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register") - ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register") - ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server") - ErrNoSupportTicker = errors.New("the server does not support Ticker, please use the WithTicker option to create the server") - ErrUnregisteredCrossName = errors.New("unregistered cross name, please use the WithCross option to create the server") + ErrConstructed = errors.New("the Server must be constructed using the server.New function") + ErrCanNotSupportNetwork = errors.New("can not support network") + ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported") + ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported") + ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp") + ErrWebsocketIllegalMessageType = errors.New("illegal message type") + ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server") + ErrNoSupportTicker = errors.New("the server does not support Ticker, please use the WithTicker option to create the server") ) diff --git a/server/event.go b/server/event.go index 7d2bc1b..1d1ba10 100644 --- a/server/event.go +++ b/server/event.go @@ -13,8 +13,7 @@ import ( type StartBeforeEventHandle func(srv *Server) type StartFinishEventHandle func(srv *Server) type StopEventHandle func(srv *Server) -type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet []byte) -type ConnectionReceiveWebsocketPacketEventHandle func(srv *Server, conn *Conn, packet []byte, messageType int) +type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet Packet) type ConnectionOpenedEventHandle func(srv *Server, conn *Conn) type ConnectionClosedEventHandle func(srv *Server, conn *Conn, err any) type ReceiveCrossPacketEventHandle func(srv *Server, senderServerId int64, packet []byte) @@ -24,16 +23,15 @@ type ConsoleCommandEventHandle func(srv *Server) type event struct { *Server - startBeforeEventHandles []StartBeforeEventHandle - startFinishEventHandles []StartFinishEventHandle - stopEventHandles []StopEventHandle - connectionReceivePacketEventHandles []ConnectionReceivePacketEventHandle - connectionReceiveWebsocketPacketEventHandles []ConnectionReceiveWebsocketPacketEventHandle - connectionOpenedEventHandles []ConnectionOpenedEventHandle - connectionClosedEventHandles []ConnectionClosedEventHandle - receiveCrossPacketEventHandles []ReceiveCrossPacketEventHandle - messageErrorEventHandles []MessageErrorEventHandle - messageLowExecEventHandles []MessageLowExecEventHandle + startBeforeEventHandles []StartBeforeEventHandle + startFinishEventHandles []StartFinishEventHandle + stopEventHandles []StopEventHandle + connectionReceivePacketEventHandles []ConnectionReceivePacketEventHandle + connectionOpenedEventHandles []ConnectionOpenedEventHandle + connectionClosedEventHandles []ConnectionClosedEventHandle + receiveCrossPacketEventHandles []ReceiveCrossPacketEventHandle + messageErrorEventHandles []MessageErrorEventHandle + messageLowExecEventHandles []MessageLowExecEventHandle consoleCommandEventHandles map[string][]ConsoleCommandEventHandle @@ -75,8 +73,8 @@ func (slf *event) OnConsoleCommandEvent(command string) { if !exist { switch command { case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN": - log.Info("Console", zap.String("Receive", command), zap.String("Action", "Shutdown")) - slf.Server.Shutdown(nil) + log.Info("Console", zap.String("Receive", command), zap.String("Action", "shutdown")) + slf.Server.shutdown(nil) return } log.Warn("Server", zap.String("Command", "unregistered")) @@ -147,34 +145,16 @@ func (slf *event) RegConnectionReceivePacketEvent(handle ConnectionReceivePacket if slf.network == NetworkHttp { panic(ErrNetworkIncompatibleHttp) } - if slf.network == NetworkWebsocket { - panic(ErrPleaseUseWebsocketHandle) - } slf.connectionReceivePacketEventHandles = append(slf.connectionReceivePacketEventHandles, handle) log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String())) } -func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet []byte) { +func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet Packet) { for _, handle := range slf.connectionReceivePacketEventHandles { handle(slf.Server, conn, packet) } } -// RegConnectionReceiveWebsocketPacketEvent 在接收到Websocket数据包时将立刻执行被注册的事件处理函数 -func (slf *event) RegConnectionReceiveWebsocketPacketEvent(handle ConnectionReceiveWebsocketPacketEventHandle) { - if slf.network != NetworkWebsocket { - panic(ErrPleaseUseOrdinaryPacketHandle) - } - slf.connectionReceiveWebsocketPacketEventHandles = append(slf.connectionReceiveWebsocketPacketEventHandles, handle) - log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String())) -} - -func (slf *event) OnConnectionReceiveWebsocketPacketEvent(conn *Conn, packet []byte, messageType int) { - for _, handle := range slf.connectionReceiveWebsocketPacketEventHandles { - handle(slf.Server, conn, packet, messageType) - } -} - // RegReceiveCrossPacketEvent 在接收到跨服数据包时将立即执行被注册的事件处理函数 func (slf *event) RegReceiveCrossPacketEvent(handle ReceiveCrossPacketEventHandle) { slf.receiveCrossPacketEventHandles = append(slf.receiveCrossPacketEventHandles, handle) @@ -215,15 +195,8 @@ func (slf *event) check() { switch slf.network { case NetworkHttp, NetworkGRPC: default: - switch slf.network { - case NetworkWebsocket: - if len(slf.connectionReceiveWebsocketPacketEventHandles) == 0 { - log.Warn("Server", zap.String("ConnectionReceiveWebsocketPacketEvent", "invalid server, no packets processed")) - } - default: - if len(slf.connectionReceivePacketEventHandles) == 0 { - log.Warn("Server", zap.String("ConnectionReceivePacketEvent", "invalid server, no packets processed")) - } + if len(slf.connectionReceivePacketEventHandles) == 0 { + log.Warn("Server", zap.String("ConnectionReceivePacketEvent", "invalid server, no packets processed")) } } diff --git a/server/gnet.go b/server/gnet.go index a21e630..4e71793 100644 --- a/server/gnet.go +++ b/server/gnet.go @@ -23,7 +23,7 @@ func (slf *gNet) OnShutdown(server gnet.Server) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() if err := gnet.Stop(ctx, fmt.Sprintf("%s://%s", slf.network, slf.addr)); err != nil { - log.Error("Server", zap.String("Minotaur GNet Server", "Shutdown"), zap.Error(err)) + log.Error("Server", zap.String("Minotaur GNet Server", "shutdown"), zap.Error(err)) } } @@ -48,7 +48,7 @@ func (slf *gNet) AfterWrite(c gnet.Conn, b []byte) { } func (slf *gNet) React(packet []byte, c gnet.Conn) (out []byte, action gnet.Action) { - slf.Server.PushMessage(MessageTypePacket, c.Context().(*Conn), bytes.Clone(packet)) + PushPacketMessage(slf.Server, c.Context().(*Conn), append(bytes.Clone(packet), 0)) return nil, gnet.None } diff --git a/server/message.go b/server/message.go index 7a6a173..f93e531 100644 --- a/server/message.go +++ b/server/message.go @@ -1,24 +1,24 @@ package server +import ( + "runtime/debug" +) + const ( // MessageTypePacket 数据包消息类型:该类型的数据将被发送到 ConnectionReceivePacketEvent 进行处理 - // - *server.Conn - // - []byte MessageTypePacket MessageType = iota // MessageTypeError 错误消息类型:根据不同的错误状态,将交由 Server 进行统一处理 - // - error - // - server.MessageErrorAction MessageTypeError // MessageTypeCross 跨服消息类型:将被推送到跨服的 Cross 实现中进行处理 - // - int64(serverId) - // - []byte MessageTypeCross // MessageTypeTicker 定时器消息类型 - // - func() MessageTypeTicker + + // MessageTypeAsync 异步消息类型 + MessageTypeAsync ) var messageNames = map[MessageType]string{ @@ -26,16 +26,17 @@ var messageNames = map[MessageType]string{ MessageTypeError: "MessageTypeError", MessageTypeCross: "MessageTypeCross", MessageTypeTicker: "MessageTypeTicker", + MessageTypeAsync: "MessageTypeAsync", } const ( MessageErrorActionNone MessageErrorAction = iota // 错误消息类型操作:将不会被进行任何特殊处理,仅进行日志输出 - MessageErrorActionShutdown // 错误消息类型操作:当接收到该类型的操作时,服务器将执行 Server.Shutdown 函数 + MessageErrorActionShutdown // 错误消息类型操作:当接收到该类型的操作时,服务器将执行 Server.shutdown 函数 ) var messageErrorActionNames = map[MessageErrorAction]string{ MessageErrorActionNone: "None", - MessageErrorActionShutdown: "Shutdown", + MessageErrorActionShutdown: "shutdown", } type ( @@ -59,73 +60,53 @@ func (slf MessageType) String() string { return messageNames[slf] } -func (slf MessageType) deconstructWebSocketPacket(attrs ...any) (conn *Conn, packet []byte, messageType int) { - if len(attrs) != 3 { - panic(ErrWebsocketMessageTypePacketAttrs) - } - var ok bool - if conn, ok = attrs[0].(*Conn); !ok { - panic(ErrWebsocketMessageTypePacketAttrs) - } - if packet, ok = attrs[1].([]byte); !ok { - panic(ErrWebsocketMessageTypePacketAttrs) - } - if messageType, ok = attrs[2].(int); !ok { - panic(ErrWebsocketMessageTypePacketAttrs) - } - return +// PushPacketMessage 向特定服务器中推送 MessageTypePacket 消息 +func PushPacketMessage(srv *Server, conn *Conn, packet []byte) { + msg := srv.messagePool.Get() + msg.t = MessageTypePacket + msg.attrs = []any{conn, packet} + srv.pushMessage(msg) } -func (slf MessageType) deconstructPacket(attrs ...any) (conn *Conn, packet []byte) { - if len(attrs) != 2 { - panic(ErrMessageTypePacketAttrs) - } - var ok bool - if conn, ok = attrs[0].(*Conn); !ok { - panic(ErrMessageTypePacketAttrs) - } - if packet, ok = attrs[1].([]byte); !ok { - panic(ErrMessageTypePacketAttrs) - } - return +// PushErrorMessage 向特定服务器中推送 MessageTypeError 消息 +func PushErrorMessage(srv *Server, err error, action MessageErrorAction) { + msg := srv.messagePool.Get() + msg.t = MessageTypeError + msg.attrs = []any{err, action, string(debug.Stack())} + srv.pushMessage(msg) } -func (slf MessageType) deconstructError(attrs ...any) (err error, action MessageErrorAction, stack string) { - if len(attrs) != 3 { - panic(ErrMessageTypeErrorAttrs) +// PushCrossMessage 向特定服务器中推送 MessageTypeCross 消息 +func PushCrossMessage(srv *Server, crossName string, serverId int64, packet []byte) { + if serverId == srv.id { + msg := srv.messagePool.Get() + msg.t = MessageTypeCross + msg.attrs = []any{serverId, packet} + srv.pushMessage(msg) + } else { + if len(srv.cross) == 0 { + return + } + cross, exist := srv.cross[crossName] + if !exist { + return + } + _ = cross.PushMessage(serverId, packet) } - var ok bool - if err, ok = attrs[0].(error); !ok { - panic(ErrMessageTypeErrorAttrs) - } - if action, ok = attrs[1].(MessageErrorAction); !ok { - panic(ErrMessageTypeErrorAttrs) - } - stack = attrs[2].(string) - return } -func (slf MessageType) deconstructCross(attrs ...any) (serverId int64, packet []byte) { - if len(attrs) != 2 { - panic(ErrMessageTypeCrossErrorAttrs) - } - var ok bool - if serverId, ok = attrs[0].(int64); !ok { - panic(ErrMessageTypeCrossErrorAttrs) - } - if packet, ok = attrs[1].([]byte); !ok { - panic(ErrMessageTypeCrossErrorAttrs) - } - return +// PushTickerMessage 向特定服务器中推送 MessageTypeTicker 消息 +func PushTickerMessage(srv *Server, caller func()) { + msg := srv.messagePool.Get() + msg.t = MessageTypeTicker + msg.attrs = []any{caller} + srv.pushMessage(msg) } -func (slf MessageType) deconstructTicker(attrs ...any) (caller func()) { - if len(attrs) != 1 { - panic(ErrMessageTypeTickerErrorAttrs) - } - var ok bool - if caller, ok = attrs[0].(func()); !ok { - panic(ErrMessageTypeTickerErrorAttrs) - } - return +// PushAsyncMessage 向特定服务器中推送 MessageTypeAsync 消息 +func PushAsyncMessage(srv *Server, caller func() error, callback ...func(err error)) { + msg := srv.messagePool.Get() + msg.t = MessageTypeAsync + msg.attrs = []any{caller, callback} + srv.pushMessage(msg) } diff --git a/server/multiple.go b/server/multiple.go index c9fdf0a..0f29a47 100644 --- a/server/multiple.go +++ b/server/multiple.go @@ -60,7 +60,7 @@ func (slf *MultipleServer) Run() { case err := <-exceptionChannel: for len(slf.servers) > 0 { server := slf.servers[0] - server.Shutdown(err) + server.shutdown(err) slf.servers = slf.servers[1:] } break @@ -68,7 +68,7 @@ func (slf *MultipleServer) Run() { for len(slf.servers) > 0 { server := slf.servers[0] server.multipleRuntimeErrorChan = nil - server.Shutdown(nil) + server.shutdown(nil) slf.servers = slf.servers[1:] } break @@ -76,7 +76,7 @@ func (slf *MultipleServer) Run() { for len(slf.servers) > 0 { server := slf.servers[0] server.multipleRuntimeErrorChan = nil - server.Shutdown(nil) + server.shutdown(nil) slf.servers = slf.servers[1:] } break diff --git a/server/options.go b/server/options.go index fcecd2a..ed31f7f 100644 --- a/server/options.go +++ b/server/options.go @@ -23,6 +23,41 @@ const ( ) type Option func(srv *Server) +type option struct { + disableAnts bool // 是否禁用协程池 + antsPoolSize int // 协程池大小 +} + +type runtime struct { + deadlockDetect time.Duration // 是否开启死锁检测 +} + +// WithDeadlockDetect 通过死锁、死循环、永久阻塞检测的方式创建服务器 +// - 当检测到死锁、死循环、永久阻塞时,服务器将会生成 WARN 类型的日志,关键字为 "SuspectedDeadlock" +func WithDeadlockDetect(t time.Duration) Option { + return func(srv *Server) { + if t > 0 { + srv.deadlockDetect = t + log.Info("DeadlockDetect", zap.Any("Time", t)) + } + } +} + +// WithDisableAsyncMessage 通过禁用异步消息的方式创建服务器 +func WithDisableAsyncMessage() Option { + return func(srv *Server) { + srv.disableAnts = true + } +} + +// WithAsyncPoolSize 通过指定异步消息池大小的方式创建服务器 +// - 当通过 WithDisableAsyncMessage 禁用异步消息时,此选项无效 +// - 默认值为 256 +func WithAsyncPoolSize(size int) Option { + return func(srv *Server) { + srv.antsPoolSize = size + } +} // WithWebsocketReadDeadline 设置 Websocket 读取超时时间 // - 默认: 30 * time.Second @@ -36,16 +71,6 @@ func WithWebsocketReadDeadline(t time.Duration) Option { } } -// WithDiversion 通过分流的方式创建服务器 -// - diversion:分流函数,返回一个函数通道,用于接收分流的消息 -// - 需要确保能够通过 conn 和 packet 确定分流通道 -// - 多核模式下将导致消息顺序不一致,通过结果依然是单核处理的,因为分流通道仅有一个 -func WithDiversion(diversion func(conn ConnReadonly, packet []byte) chan func()) Option { - return func(srv *Server) { - srv.diversion = diversion - } -} - // WithTicker 通过定时器创建服务器,为服务器添加定时器功能 // - autonomy:定时器是否独立运行(独立运行的情况下不会作为服务器消息运行,会导致并发问题) // - 多核与分流情况下需要考虑是否有必要 autonomy @@ -55,7 +80,7 @@ func WithTicker(size int, autonomy bool) Option { srv.ticker = timer.GetTicker(size) } else { srv.ticker = timer.GetTicker(size, timer.WithCaller(func(name string, caller func()) { - srv.PushMessage(MessageTypeTicker, caller) + PushTickerMessage(srv, caller) })) } } @@ -74,7 +99,10 @@ func WithCross(crossName string, serverId int64, cross Cross) Option { } srv.cross[crossName] = cross err := cross.Init(srv, func(serverId int64, packet []byte) { - srv.PushMessage(MessageTypeCross, serverId, packet) + msg := srv.messagePool.Get() + msg.t = MessageTypeCross + msg.attrs = []any{serverId, packet} + srv.pushMessage(msg) }) if err != nil { log.Info("Cross", zap.Int64("ServerID", serverId), zap.String("Cross", reflect.TypeOf(cross).String()), zap.String("State", "WaitNatsRun")) @@ -115,24 +143,10 @@ func WithProd() Option { } } -// WithWebsocketWriteMessageType 设置客户端写入的Websocket消息类型 -// - 默认: WebsocketMessageTypeBinary -func WithWebsocketWriteMessageType(messageType int) Option { - return func(srv *Server) { - switch messageType { - case WebsocketMessageTypeText, WebsocketMessageTypeBinary, WebsocketMessageTypeClose, WebsocketMessageTypePing, WebsocketMessageTypePong: - srv.websocketWriteMessageType = messageType - default: - log.Warn("WithWebsocketWriteMessageType", zap.Int("MessageType", messageType), zap.Error(ErrWebsocketMessageTypeException)) - } - } -} - // WithWebsocketMessageType 设置仅支持特定类型的Websocket消息 func WithWebsocketMessageType(messageTypes ...int) Option { return func(srv *Server) { if srv.network != NetworkWebsocket { - log.Warn("WitchWebsocketMessageType", zap.String("Network", string(srv.network)), zap.Error(ErrNotWebsocketUseMessageType)) return } var supports = make(map[int]bool) @@ -140,8 +154,6 @@ func WithWebsocketMessageType(messageTypes ...int) Option { switch messageType { case WebsocketMessageTypeText, WebsocketMessageTypeBinary, WebsocketMessageTypeClose, WebsocketMessageTypePing, WebsocketMessageTypePong: supports[messageType] = true - default: - log.Warn("WitchWebsocketMessageType", zap.Int("MessageType", messageType), zap.Error(ErrWebsocketMessageTypeException)) } } srv.supportMessageTypes = supports @@ -159,17 +171,3 @@ func WithMessageBufferSize(size int) Option { srv.messagePoolSize = size } } - -// WithMultiCore 通过特定核心数量运行服务器,默认为单核 -// - count > 1 的情况下,将会有对应数量的 goroutine 来处理消息 -// - 注意:HTTP和GRPC网络模式下不会生效 -// - 在需要分流的场景推荐采用多核模式,如游戏以房间的形式进行,每个房间互不干扰,这种情况下便可以每个房间单独维护数据包消息进行处理 -func WithMultiCore(count int) Option { - return func(srv *Server) { - srv.core = count - if srv.core < 1 { - log.Warn("WithMultiCore", zap.Int("count", count), zap.String("tips", "wrong core count configuration, corrected to 1, currently in single-core mode")) - srv.core = 1 - } - } -} diff --git a/server/packet.go b/server/packet.go new file mode 100644 index 0000000..b863c0f --- /dev/null +++ b/server/packet.go @@ -0,0 +1,10 @@ +package server + +type Packet struct { + WebsocketType int + Data []byte +} + +func (slf Packet) String() string { + return string(slf.Data) +} diff --git a/server/server.go b/server/server.go index 4017bb2..09eccea 100644 --- a/server/server.go +++ b/server/server.go @@ -10,6 +10,7 @@ import ( "github.com/kercylan98/minotaur/utils/synchronization" "github.com/kercylan98/minotaur/utils/timer" "github.com/kercylan98/minotaur/utils/times" + "github.com/panjf2000/ants/v2" "github.com/panjf2000/gnet" "github.com/panjf2000/gnet/pkg/logging" "github.com/xtaci/kcp-go/v5" @@ -19,7 +20,6 @@ import ( "net/http" "os" "os/signal" - "runtime/debug" "strings" "sync/atomic" "syscall" @@ -29,12 +29,12 @@ import ( // New 根据特定网络类型创建一个服务器 func New(network Network, options ...Option) *Server { server := &Server{ - event: &event{}, - network: network, - options: options, - core: 1, - closeChannel: make(chan struct{}, 1), - websocketWriteMessageType: WebsocketMessageTypeBinary, + event: &event{}, + runtime: &runtime{}, + option: &option{}, + network: network, + closeChannel: make(chan struct{}, 1), + systemSignal: make(chan os.Signal, 1), } server.event.Server = server @@ -53,37 +53,49 @@ func New(network Network, options ...Option) *Server { for _, option := range options { option(server) } + + if !server.disableAnts { + if server.antsPoolSize <= 0 { + server.antsPoolSize = 256 + } + var err error + server.ants, err = ants.NewPool(server.antsPoolSize, ants.WithLogger(log.Logger())) + if err != nil { + panic(err) + } + } + + server.option = nil return server } // Server 网络服务器 type Server struct { - *event // 事件 - cross map[string]Cross // 跨服 - id int64 // 服务器id - network Network // 网络类型 - addr string // 侦听地址 - options []Option // 选项 - ginServer *gin.Engine // HTTP模式下的路由器 - httpServer *http.Server // HTTP模式下的服务器 - grpcServer *grpc.Server // GRPC模式下的服务器 - supportMessageTypes map[int]bool // websocket模式下支持的消息类型 - certFile, keyFile string // TLS文件 - isRunning bool // 是否正在运行 - isShutdown atomic.Bool // 是否已关闭 - closeChannel chan struct{} // 关闭信号 - diversion func(conn ConnReadonly, packet []byte) chan func() // 分流器 + *event // 事件 + *runtime // 运行时 + *option // 可选项 + systemSignal chan os.Signal // 系统信号 + cross map[string]Cross // 跨服 + id int64 // 服务器id + network Network // 网络类型 + addr string // 侦听地址 + ginServer *gin.Engine // HTTP模式下的路由器 + httpServer *http.Server // HTTP模式下的服务器 + grpcServer *grpc.Server // GRPC模式下的服务器 + supportMessageTypes map[int]bool // websocket模式下支持的消息类型 + certFile, keyFile string // TLS文件 + isRunning bool // 是否正在运行 + isShutdown atomic.Bool // 是否已关闭 + closeChannel chan struct{} // 关闭信号 + ants *ants.Pool // 协程池 - gServer *gNet // TCP或UDP模式下的服务器 - messagePool *synchronization.Pool[*Message] // 消息池 - messagePoolSize int // 消息池大小 - messageChannel map[int]chan *Message // 消息管道 - initMessageChannel bool // 消息管道是否已经初始化 - multiple *MultipleServer // 多服务器模式下的服务器 - prod bool // 是否为生产模式 - core int // 消息处理核心数 - websocketWriteMessageType int // websocket写入的消息类型 - ticker *timer.Ticker // 定时器 + gServer *gNet // TCP或UDP模式下的服务器 + messagePool *synchronization.Pool[*Message] // 消息池 + messagePoolSize int // 消息池大小 + messageChannel chan *Message // 消息管道 + multiple *MultipleServer // 多服务器模式下的服务器 + prod bool // 是否为生产模式 + ticker *timer.Ticker // 定时器 multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 @@ -110,7 +122,6 @@ func (slf *Server) Run(addr string) error { slf.addr = addr var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr) var connectionInitHandle = func(callback func()) { - slf.initMessageChannel = true if slf.messagePoolSize <= 0 { slf.messagePoolSize = 100 } @@ -123,24 +134,18 @@ func (slf *Server) Run(addr string) error { data.attrs = nil }, ) - slf.messageChannel = map[int]chan *Message{} - for i := 0; i < slf.core; i++ { - slf.messageChannel[i] = make(chan *Message, 4096*1000) - } + slf.messageChannel = make(chan *Message, 4096*1000) if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC { slf.gServer = &gNet{Server: slf} } if callback != nil { go callback() } - for _, messageChannel := range slf.messageChannel { - messageChannel := messageChannel - go func() { - for message := range messageChannel { - slf.dispatchMessage(message, slf.diversion != nil) - } - }() - } + go func() { + for message := range slf.messageChannel { + slf.dispatchMessage(message) + } + }() } switch slf.network { @@ -155,7 +160,7 @@ func (slf *Server) Run(addr string) error { slf.OnStartBeforeEvent() if err := slf.grpcServer.Serve(listener); err != nil { slf.isRunning = false - slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) + PushErrorMessage(slf, err, MessageErrorActionShutdown) } }() case NetworkTcp, NetworkTcp4, NetworkTcp6, NetworkUdp, NetworkUdp4, NetworkUdp6, NetworkUnix: @@ -169,7 +174,7 @@ func (slf *Server) Run(addr string) error { gnet.WithMulticore(true), ); err != nil { slf.isRunning = false - slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) + PushErrorMessage(slf, err, MessageErrorActionShutdown) } }) case NetworkKcp: @@ -202,7 +207,7 @@ func (slf *Server) Run(addr string) error { if err != nil { panic(err) } - slf.PushMessage(MessageTypePacket, conn, buf[:n]) + PushPacketMessage(slf, conn, buf[:n]) } }(conn) } @@ -220,12 +225,12 @@ func (slf *Server) Run(addr string) error { if len(slf.certFile)+len(slf.keyFile) > 0 { if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil { slf.isRunning = false - slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) + PushErrorMessage(slf, err, MessageErrorActionShutdown) } } else { if err := slf.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { slf.isRunning = false - slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) + PushErrorMessage(slf, err, MessageErrorActionShutdown) } } @@ -286,7 +291,7 @@ func (slf *Server) Run(addr string) error { if len(slf.supportMessageTypes) > 0 && !slf.supportMessageTypes[messageType] { panic(ErrWebsocketIllegalMessageType) } - slf.PushMessage(MessageTypePacket, conn, packet, messageType) + PushPacketMessage(slf, conn, append(packet, byte(messageType))) } }) go func() { @@ -295,12 +300,12 @@ func (slf *Server) Run(addr string) error { if len(slf.certFile)+len(slf.keyFile) > 0 { if err := http.ListenAndServeTLS(slf.addr, slf.certFile, slf.keyFile, nil); err != nil { slf.isRunning = false - slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) + PushErrorMessage(slf, err, MessageErrorActionShutdown) } } else { if err := http.ListenAndServe(slf.addr, nil); err != nil { slf.isRunning = false - slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) + PushErrorMessage(slf, err, MessageErrorActionShutdown) } } @@ -318,11 +323,11 @@ func (slf *Server) Run(addr string) error { ) log.Info("Server", zap.String("Minotaur Server", "====================================================================")) slf.OnStartFinishEvent() - systemSignal := make(chan os.Signal, 1) - signal.Notify(systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) + + signal.Notify(slf.systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) select { - case <-systemSignal: - slf.Shutdown(nil) + case <-slf.systemSignal: + slf.shutdown(nil) } select { @@ -362,8 +367,13 @@ func (slf *Server) Ticker() *timer.Ticker { return slf.ticker } -// Shutdown 停止运行服务器 -func (slf *Server) Shutdown(err error, stack ...string) { +// Shutdown 主动停止运行服务器 +func (slf *Server) Shutdown() { + slf.systemSignal <- syscall.SIGQUIT +} + +// shutdown 停止运行服务器 +func (slf *Server) shutdown(err error, stack ...string) { slf.OnStopEvent() defer func() { if slf.multipleRuntimeErrorChan != nil { @@ -377,12 +387,10 @@ func (slf *Server) Shutdown(err error, stack ...string) { for _, cross := range slf.cross { cross.Release() } - if slf.initMessageChannel { - for _, messageChannel := range slf.messageChannel { - close(messageChannel) - } + if slf.messageChannel != nil { + close(slf.messageChannel) slf.messagePool.Close() - slf.initMessageChannel = false + slf.messageChannel = nil } if slf.grpcServer != nil && slf.isRunning { slf.grpcServer.GracefulStop() @@ -439,46 +447,41 @@ func (slf *Server) HttpRouter() gin.IRouter { return slf.ginServer } -// PushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求 -func (slf *Server) PushMessage(messageType MessageType, attrs ...any) { +// pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求 +func (slf *Server) pushMessage(message *Message) { if slf.messagePool.IsClose() { + slf.messagePool.Release(message) return } - msg := slf.messagePool.Get() - msg.t = messageType - msg.attrs = attrs - if msg.t == MessageTypeError { - msg.attrs = append(msg.attrs, string(debug.Stack())) - } - for _, channel := range slf.messageChannel { - channel <- msg - break - } + slf.messageChannel <- message } -// PushCrossMessage 推送跨服消息到特定跨服的服务器中 -func (slf *Server) PushCrossMessage(crossName string, serverId int64, packet []byte) error { - if len(slf.cross) == 0 { - return ErrNoSupportCross +func (slf *Server) low(message *Message, present time.Time) { + cost := time.Since(present) + if cost > time.Millisecond*100 { + log.Warn("Server", zap.String("MessageType", messageNames[message.t]), zap.String("LowExecCost", cost.String())) + slf.OnMessageLowExecEvent(message, cost) } - cross, exist := slf.cross[crossName] - if !exist { - return ErrUnregisteredCrossName - } - return cross.PushMessage(serverId, packet) } // dispatchMessage 消息分发 -func (slf *Server) dispatchMessage(msg *Message, isRedirect bool) { - if slf.diversion != nil && isRedirect && msg.t == MessageTypePacket { - conn, packet, _ := msg.t.deconstructWebSocketPacket(msg.attrs...) - if redirect := slf.diversion(conn, packet); redirect != nil { - redirect <- func() { - slf.dispatchMessage(msg, false) +func (slf *Server) dispatchMessage(msg *Message) { + var ( + ctx context.Context + cancel context.CancelFunc + ) + if slf.deadlockDetect > 0 { + ctx, cancel = context.WithTimeout(context.Background(), slf.deadlockDetect) + go func() { + select { + case <-ctx.Done(): + if err := ctx.Err(); err == context.DeadlineExceeded { + log.Warn("Server", zap.String("MessageType", messageNames[msg.t]), zap.Any("SuspectedDeadlock", msg.attrs)) + } } - } - return + }() } + present := time.Now() defer func() { if err := recover(); err != nil { @@ -488,41 +491,59 @@ func (slf *Server) dispatchMessage(msg *Message, isRedirect bool) { } } - cost := time.Since(present) - if cost > time.Millisecond*100 { - log.Warn("Server", zap.String("MessageType", messageNames[msg.t]), zap.String("LowExecCost", cost.String()), zap.Any("MessageAttrs", msg.attrs)) - slf.OnMessageLowExecEvent(msg, cost) + if msg.t != MessageTypeAsync { + super.Handle(cancel) + slf.low(msg, present) + if !slf.isShutdown.Load() { + slf.messagePool.Release(msg) + } } - if !slf.isShutdown.Load() { - slf.messagePool.Release(msg) - } }() + var attrs = msg.attrs switch msg.t { case MessageTypePacket: - if slf.network == NetworkWebsocket { - conn, packet, messageType := msg.t.deconstructWebSocketPacket(msg.attrs...) - slf.OnConnectionReceiveWebsocketPacketEvent(conn, packet, messageType) - } else { - conn, packet := msg.t.deconstructPacket(msg.attrs...) - slf.OnConnectionReceivePacketEvent(conn, packet) - } + var packet = attrs[1].([]byte) + var wst = int(packet[len(packet)-1]) + slf.OnConnectionReceivePacketEvent(attrs[0].(*Conn), Packet{Data: packet[:len(packet)-1], WebsocketType: wst}) case MessageTypeError: - err, action, stack := msg.t.deconstructError(msg.attrs...) + err, action, stack := attrs[0].(error), attrs[1].(MessageErrorAction), attrs[2].(string) switch action { case MessageErrorActionNone: log.ErrorWithStack("Server", stack, zap.Error(err)) case MessageErrorActionShutdown: - slf.Shutdown(err, stack) + slf.shutdown(err, stack) default: log.Warn("Server", zap.String("not support message error action", action.String())) } case MessageTypeCross: - serverId, packet := msg.t.deconstructCross(msg.attrs...) - slf.OnReceiveCrossPacketEvent(serverId, packet) + slf.OnReceiveCrossPacketEvent(attrs[0].(int64), attrs[1].([]byte)) case MessageTypeTicker: - caller := msg.t.deconstructTicker(msg.attrs...) - caller() + attrs[0].(func())() + case MessageTypeAsync: + handle := attrs[0].(func() error) + callbacks := attrs[1].([]func(err error)) + if err := slf.ants.Submit(func() { + defer func() { + if err := recover(); err != nil { + log.Error("Server", zap.String("MessageType", messageNames[msg.t]), zap.Any("MessageAttrs", msg.attrs), zap.Any("error", err)) + if e, ok := err.(error); ok { + slf.OnMessageErrorEvent(msg, e) + } + } + super.Handle(cancel) + if !slf.isShutdown.Load() { + slf.messagePool.Release(msg) + } + }() + if err := handle(); err != nil { + for _, callback := range callbacks { + callback(err) + } + } + }); err != nil { + panic(err) + } default: log.Warn("Server", zap.String("not support message type", msg.t.String())) } diff --git a/server/server_example_test.go b/server/server_example_test.go new file mode 100644 index 0000000..7fd49af --- /dev/null +++ b/server/server_example_test.go @@ -0,0 +1,23 @@ +package server_test + +import ( + "github.com/kercylan98/minotaur/server" + "time" +) + +func ExampleNew() { + srv := server.New(server.NetworkWebsocket, + server.WithDeadlockDetect(time.Second*5), + ) + + srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet server.Packet) { + conn.Write(packet) + }) + + go func() { time.Sleep(1 * time.Second); srv.Shutdown() }() + if err := srv.Run(":9999"); err != nil { + panic(err) + } + + // Output: +} diff --git a/utils/log/log.go b/utils/log/log.go index 832a04e..cfab117 100644 --- a/utils/log/log.go +++ b/utils/log/log.go @@ -101,8 +101,16 @@ func getWriter(filename string, times int) io.Writer { return hook } -func Logger() *zap.Logger { - return logger +type MLogger struct { + *zap.Logger +} + +func (slf *MLogger) Printf(format string, args ...interface{}) { + slf.Info(fmt.Sprintf(format, args...)) +} + +func Logger() *MLogger { + return &MLogger{logger} } func Info(msg string, fields ...zap.Field) { diff --git a/utils/str/str.go b/utils/str/str.go index ddf83ee..f433268 100644 --- a/utils/str/str.go +++ b/utils/str/str.go @@ -1,5 +1,9 @@ package str +const ( + None = "" // 空字符串 +) + // FirstUpper 首字母大写 func FirstUpper(str string) string { var upperStr string