Merge branch 'develop'

This commit is contained in:
kercylan98 2023-07-07 16:29:52 +08:00
commit bd84ec778c
26 changed files with 312 additions and 602 deletions

View File

@ -3,6 +3,7 @@
[![Go doc](https://img.shields.io/badge/go.dev-reference-brightgreen?logo=go&logoColor=white&style=flat)](https://pkg.go.dev/github.com/kercylan98/minotaur)
![](https://img.shields.io/badge/Email-kercylan@gmail.com-green.svg?style=flat)
![](https://komarev.com/ghpvc/?username=kercylan98)
<a target="_blank" href="https://goreportcard.com/report/github.com/kercylan98/minotaur"><img src="https://goreportcard.com/badge/github.com/kercylan98/minotaur?style=flat-square" /></a>
Minotaur 是一个基于Golang 1.20 编写的服务端开发支持库,其中采用了大量泛型设计,用于游戏服务器开发。
@ -39,7 +40,6 @@ $ go get -u github.com/kercylan98/minotaur
## 用法
- 在`Minotaur`中大量使用了 **[泛型](https://go.dev/doc/tutorial/generics)** 、 **[观察者(事件)](https://www.runoob.com/design-pattern/observer-pattern.html)** 和 **[选项模式](https://juejin.cn/post/6844903729313873927)**,在使用前建议先进行相应了解;
- 更多的 **[示例](./examples)** 参考可在 [examples](./examples) 目录查阅;
- 项目文档可访问 **[pkg.go.dev](https://pkg.go.dev/github.com/kercylan98/minotaur)** 进行查阅;
### 本地文档

View File

@ -39,8 +39,6 @@ func NewLockstep[ClientID comparable, Command any](options ...LockstepOption[Cli
// - 自定帧序列化方式 WithLockstepSerialization
// - 从特定帧开始追帧
// - 兼容各种基于TCP/UDP/Unix的网络类型可通过客户端实现其他网络类型同步
//
// 可在 examples 目录下找到示例示例项目simple-server-lockstep
type Lockstep[ClientID comparable, Command any] struct {
clients *synchronization.Map[ClientID, component.LockstepClient[ClientID]] // 接受广播的客户端
frames *synchronization.Map[int, []Command] // 所有帧指令

View File

@ -1,2 +0,0 @@
// Package examples 提供了多种实现案例
package examples

View File

@ -1,96 +0,0 @@
// 该案例中将服务器消息通过多核的方式异步传输到房间中,在每个房间中单独同步处理维护消息。
package main
import (
"fmt"
"github.com/kercylan98/minotaur/game/builtin"
"github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/utils/synchronization"
"strconv"
"strings"
)
type message struct {
conn *server.Conn
packet []byte
}
type Room struct {
*builtin.Room[string, *builtin.Player[string]]
channel chan *message // need close
pool *synchronization.Pool[*message]
}
func newRoom(guid int64) *Room {
room := &Room{
Room: builtin.NewRoom[string, *builtin.Player[string]](guid),
}
room.pool = synchronization.NewPool[*message](1024*100, func() *message {
return new(message)
}, func(data *message) {
data.conn = nil
data.packet = nil
})
room.channel = make(chan *message, 1024*100)
go func() {
for msg := range room.channel {
room.handePacket(msg.conn, msg.packet)
room.pool.Release(msg)
}
}()
return room
}
func (slf *Room) PushMessage(conn *server.Conn, packet []byte) {
msg := slf.pool.Get()
msg.conn = conn
msg.packet = packet
slf.channel <- msg
}
func (slf *Room) handePacket(conn *server.Conn, packet []byte) {
conn.WriteString(fmt.Sprintf("[%d] %s", slf.GetGuid(), string(packet)))
}
// 以房间为核心玩法的多核服务器实现
// - 服务器消息处理为异步执行
// - 由房间分发具体消息,在房间内所有消息为同步执行
func main() {
rooms := synchronization.NewMap[int64, *Room]()
srv := server.New(server.NetworkWebsocket,
server.WithWebsocketWriteMessageType(server.WebsocketMessageTypeText),
server.WithMultiCore(10),
)
srv.RegConnectionReceiveWebsocketPacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte, messageType int) {
p := strings.SplitN(string(packet), ":", 2)
roomId, err := strconv.ParseInt(p[0], 10, 64)
if err != nil {
conn.WriteString(fmt.Sprintf("wrong room id, err: %s", err.Error()))
return
}
// 假定命令格式 ${房间ID}:命令
switch p[1] {
case "create":
if !rooms.Exist(roomId) {
rooms.Set(roomId, newRoom(roomId))
conn.WriteString(fmt.Sprintf("create room[%d] success", roomId))
} else {
conn.WriteString(fmt.Sprintf("room[%d] existed", roomId))
}
default:
room, exist := rooms.GetExist(roomId)
if !exist {
rooms.Set(roomId, room)
conn.WriteString(fmt.Sprintf("room[%d] does not exist, create room please use ${roomId}:create", roomId))
} else {
room.PushMessage(conn, []byte(p[1]))
}
}
})
if err := srv.Run(":9999"); err != nil {
panic(err)
}
}

View File

@ -1,36 +0,0 @@
// 该案例实现了一个简单的聊天室功能
package main
import (
"fmt"
"github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/utils/synchronization"
)
func main() {
connections := synchronization.NewMap[string, *server.Conn]()
srv := server.New(server.NetworkWebsocket, server.WithWebsocketWriteMessageType(server.WebsocketMessageTypeText))
srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) {
for _, c := range connections.Map() {
c.Write([]byte(fmt.Sprintf("%s 加入了聊天室", conn.GetID())))
}
connections.Set(conn.GetID(), conn)
conn.Write([]byte("欢迎加入"))
})
srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) {
if connections.DeleteExist(conn.GetID()) {
for id, c := range connections.Map() {
c.Write([]byte(fmt.Sprintf("%s 退出了聊天室", id)))
}
}
})
srv.RegConnectionReceiveWebsocketPacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte, messageType int) {
for _, c := range connections.Map() {
c.Write([]byte(fmt.Sprintf("%s: %s", conn.GetID(), string(packet))))
}
})
if err := srv.Run(":9999"); err != nil {
panic(err)
}
}

View File

@ -1,18 +0,0 @@
// 该案例中延时了控制台服务器的实现,支持运行中根据控制台指令执行额外的功能逻辑
package main
import (
"github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/utils/log"
"go.uber.org/zap"
)
func main() {
srv := server.New(server.NetworkWebsocket)
srv.RegConsoleCommandEvent("test", func(srv *server.Server) {
log.Info("Console", zap.String("Info", "Test"))
})
if err := srv.Run(":9999"); err != nil {
panic(err)
}
}

View File

@ -1,27 +0,0 @@
package main
import (
"github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/server/cross"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/timer"
"go.uber.org/zap"
"time"
)
func main() {
srv := server.New(server.NetworkWebsocket, server.WithCross("nats", 1, cross.NewNats("127.0.0.1:4222")), server.WithTicker(10, false))
srv.RegStartFinishEvent(func(srv *server.Server) {
srv.Ticker().Loop("CROSS", timer.Instantly, time.Second, timer.Forever, func() {
if err := srv.PushCrossMessage("nats", 2, []byte("I am cross 1")); err != nil {
panic(err)
}
})
})
srv.RegReceiveCrossPacketEvent(func(srv *server.Server, senderServerId int64, packet []byte) {
log.Info("Cross", zap.Int64("ServerID", senderServerId), zap.String("Packet", string(packet)))
})
if err := srv.Run(":9999"); err != nil {
panic(err)
}
}

View File

@ -1,27 +0,0 @@
package main
import (
"github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/server/cross"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/timer"
"go.uber.org/zap"
"time"
)
func main() {
srv := server.New(server.NetworkWebsocket, server.WithCross("nats", 2, cross.NewNats("127.0.0.1:4222")), server.WithTicker(10, false))
srv.RegStartFinishEvent(func(srv *server.Server) {
srv.Ticker().Loop("CROSS", timer.Instantly, time.Second, timer.Forever, func() {
if err := srv.PushCrossMessage("nats", 1, []byte("I am cross 2")); err != nil {
panic(err)
}
})
})
srv.RegReceiveCrossPacketEvent(func(srv *server.Server, senderServerId int64, packet []byte) {
log.Info("Cross", zap.Int64("ServerID", senderServerId), zap.String("Packet", string(packet)))
})
if err := srv.Run(":19999"); err != nil {
panic(err)
}
}

View File

@ -1,2 +0,0 @@
// 该案例中演示了两个跨服相互通过网络进行数据传输的实现
package main

View File

@ -1,16 +0,0 @@
// 该案例中实现了一个简单的回响服务器
package main
import (
"github.com/kercylan98/minotaur/server"
)
func main() {
srv := server.New(server.NetworkWebsocket)
srv.RegConnectionReceiveWebsocketPacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte, messageType int) {
conn.Write(packet, messageType)
})
if err := srv.Run(":9999"); err != nil {
panic(err)
}
}

View File

@ -1,50 +0,0 @@
package main
import (
"github.com/kercylan98/minotaur/component/components"
"github.com/kercylan98/minotaur/game/builtin"
"github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/utils/synchronization"
)
type Player struct {
*builtin.Player[string]
}
type Command struct {
CMD int
Data string
}
// 访问http://www.websocket-test.com/
// - 使用多个页面连接到服务器后任一页面发送start即可开启帧同步
func main() {
players := synchronization.NewMap[string, *Player]()
srv := server.New(server.NetworkWebsocket, server.WithWebsocketWriteMessageType(server.WebsocketMessageTypeText))
lockstep := components.NewLockstep[string, *Command]()
srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) {
player := &Player{Player: builtin.NewPlayer[string](conn.GetID(), conn)}
players.Set(conn.GetID(), player)
lockstep.JoinClient(player)
})
srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) {
players.Delete(conn.GetID())
lockstep.LeaveClient(conn.GetID())
if players.Size() == 0 {
lockstep.StopBroadcast()
}
})
srv.RegConnectionReceiveWebsocketPacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte, messageType int) {
switch string(packet) {
case "start":
lockstep.StartBroadcast()
default:
lockstep.AddCommand(&Command{CMD: 1, Data: string(packet)})
}
})
if err := srv.Run(":9999"); err != nil {
panic(err)
}
}

View File

@ -30,8 +30,8 @@ func (slf *Player[ID]) UseConn(conn *server.Conn) {
}
// Send 向该玩家发送数据
func (slf *Player[ID]) Send(packet []byte, messageType ...int) {
slf.conn.Write(packet, messageType...)
func (slf *Player[ID]) Send(packet server.Packet) {
slf.conn.Write(packet)
}
// Close 关闭玩家

1
go.mod
View File

@ -44,6 +44,7 @@ require (
github.com/nats-io/nats-server/v2 v2.9.16 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/panjf2000/ants/v2 v2.8.1 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/smartystreets/assertions v1.13.1 // indirect

3
go.sum
View File

@ -112,6 +112,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/panjf2000/ants/v2 v2.4.7 h1:MZnw2JRyTJxFwtaMtUJcwE618wKD04POWk2gwwP4E2M=
github.com/panjf2000/ants/v2 v2.4.7/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/panjf2000/ants/v2 v2.8.1 h1:C+n/f++aiW8kHCExKlpX6X+okmxKXP7DWLutxuAPuwQ=
github.com/panjf2000/ants/v2 v2.8.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
github.com/panjf2000/gnet v1.6.6 h1:P6bApc54hnVcJVgH+SMe41mn47ECCajB6E/dKq27Y0c=
github.com/panjf2000/gnet v1.6.6/go.mod h1:KcOU7QsCaCBjeD5kyshBIamG3d9kAQtlob4Y0v0E+sc=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
@ -228,6 +230,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@ -156,29 +156,15 @@ func (slf *Conn) IsWebsocket() bool {
return slf.server.network == NetworkWebsocket
}
// WriteString 向连接中写入字符串
// - 通过转换为[]byte调用 *Conn.Write
func (slf *Conn) WriteString(data string, messageType ...int) {
slf.Write([]byte(data), messageType...)
}
// WriteStringWithCallback 与 WriteString 相同,但是会在写入完成后调用 callback
// - 当 callback 为 nil 时,与 WriteString 相同
func (slf *Conn) WriteStringWithCallback(data string, callback func(err error), messageType ...int) {
slf.WriteWithCallback([]byte(data), callback, messageType...)
}
// Write 向连接中写入数据
// - messageType: websocket模式中指定消息类型
func (slf *Conn) Write(data []byte, messageType ...int) {
func (slf *Conn) Write(packet Packet) {
if slf.packetPool == nil {
return
}
cp := slf.packetPool.Get()
if len(messageType) > 0 {
cp.websocketMessageType = messageType[0]
}
cp.packet = data
cp.websocketMessageType = packet.WebsocketType
cp.packet = packet.Data
slf.mutex.Lock()
slf.packets = append(slf.packets, cp)
slf.mutex.Unlock()
@ -186,15 +172,13 @@ func (slf *Conn) Write(data []byte, messageType ...int) {
// WriteWithCallback 与 Write 相同,但是会在写入完成后调用 callback
// - 当 callback 为 nil 时,与 Write 相同
func (slf *Conn) WriteWithCallback(data []byte, callback func(err error), messageType ...int) {
func (slf *Conn) WriteWithCallback(packet Packet, callback func(err error), messageType ...int) {
if slf.packetPool == nil {
return
}
cp := slf.packetPool.Get()
if len(messageType) > 0 {
cp.websocketMessageType = messageType[0]
}
cp.packet = data
cp.websocketMessageType = packet.WebsocketType
cp.packet = packet.Data
cp.callback = callback
slf.mutex.Lock()
slf.packets = append(slf.packets, cp)
@ -233,18 +217,8 @@ func (slf *Conn) writeLoop(wait *sync.WaitGroup) {
slf.mutex.Unlock()
for i := 0; i < len(packets); i++ {
data := packets[i]
//if len(data.packet) == 0 {
// for _, packet := range packets {
// slf.packetPool.Release(packet)
// }
// slf.Close()
// return
//}
var err error
if slf.IsWebsocket() {
if data.websocketMessageType <= 0 {
data.websocketMessageType = slf.server.websocketWriteMessageType
}
err = slf.ws.WriteMessage(data.websocketMessageType, data.packet)
} else {
if slf.gn != nil {

View File

@ -3,22 +3,12 @@ package server
import "errors"
var (
ErrConstructed = errors.New("the Server must be constructed using the server.New function")
ErrCanNotSupportNetwork = errors.New("can not support network")
ErrMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte")
ErrWebsocketMessageTypePacketAttrs = errors.New("MessageTypePacket must contain *Conn and []byte and int(MessageType)")
ErrMessageTypeErrorAttrs = errors.New("MessageTypePacket must contain error and MessageErrorAction")
ErrMessageTypeCrossErrorAttrs = errors.New("MessageTypeCross must contain int64(server id) and []byte")
ErrMessageTypeTickerErrorAttrs = errors.New("MessageTypeTicker must contain func()")
ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported")
ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported")
ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp")
ErrWebsocketMessageTypeException = errors.New("unknown message type, will not work")
ErrNotWebsocketUseMessageType = errors.New("message type filtering only supports websocket and does not take effect")
ErrWebsocketIllegalMessageType = errors.New("illegal message type")
ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register")
ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register")
ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server")
ErrNoSupportTicker = errors.New("the server does not support Ticker, please use the WithTicker option to create the server")
ErrUnregisteredCrossName = errors.New("unregistered cross name, please use the WithCross option to create the server")
ErrConstructed = errors.New("the Server must be constructed using the server.New function")
ErrCanNotSupportNetwork = errors.New("can not support network")
ErrNetworkOnlySupportHttp = errors.New("the current network mode is not compatible with HttpRouter, only NetworkHttp is supported")
ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported")
ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp")
ErrWebsocketIllegalMessageType = errors.New("illegal message type")
ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server")
ErrNoSupportTicker = errors.New("the server does not support Ticker, please use the WithTicker option to create the server")
)

View File

@ -13,8 +13,7 @@ import (
type StartBeforeEventHandle func(srv *Server)
type StartFinishEventHandle func(srv *Server)
type StopEventHandle func(srv *Server)
type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet []byte)
type ConnectionReceiveWebsocketPacketEventHandle func(srv *Server, conn *Conn, packet []byte, messageType int)
type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet Packet)
type ConnectionOpenedEventHandle func(srv *Server, conn *Conn)
type ConnectionClosedEventHandle func(srv *Server, conn *Conn, err any)
type ReceiveCrossPacketEventHandle func(srv *Server, senderServerId int64, packet []byte)
@ -24,16 +23,15 @@ type ConsoleCommandEventHandle func(srv *Server)
type event struct {
*Server
startBeforeEventHandles []StartBeforeEventHandle
startFinishEventHandles []StartFinishEventHandle
stopEventHandles []StopEventHandle
connectionReceivePacketEventHandles []ConnectionReceivePacketEventHandle
connectionReceiveWebsocketPacketEventHandles []ConnectionReceiveWebsocketPacketEventHandle
connectionOpenedEventHandles []ConnectionOpenedEventHandle
connectionClosedEventHandles []ConnectionClosedEventHandle
receiveCrossPacketEventHandles []ReceiveCrossPacketEventHandle
messageErrorEventHandles []MessageErrorEventHandle
messageLowExecEventHandles []MessageLowExecEventHandle
startBeforeEventHandles []StartBeforeEventHandle
startFinishEventHandles []StartFinishEventHandle
stopEventHandles []StopEventHandle
connectionReceivePacketEventHandles []ConnectionReceivePacketEventHandle
connectionOpenedEventHandles []ConnectionOpenedEventHandle
connectionClosedEventHandles []ConnectionClosedEventHandle
receiveCrossPacketEventHandles []ReceiveCrossPacketEventHandle
messageErrorEventHandles []MessageErrorEventHandle
messageLowExecEventHandles []MessageLowExecEventHandle
consoleCommandEventHandles map[string][]ConsoleCommandEventHandle
@ -75,8 +73,8 @@ func (slf *event) OnConsoleCommandEvent(command string) {
if !exist {
switch command {
case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN":
log.Info("Console", zap.String("Receive", command), zap.String("Action", "Shutdown"))
slf.Server.Shutdown(nil)
log.Info("Console", zap.String("Receive", command), zap.String("Action", "shutdown"))
slf.Server.shutdown(nil)
return
}
log.Warn("Server", zap.String("Command", "unregistered"))
@ -147,34 +145,16 @@ func (slf *event) RegConnectionReceivePacketEvent(handle ConnectionReceivePacket
if slf.network == NetworkHttp {
panic(ErrNetworkIncompatibleHttp)
}
if slf.network == NetworkWebsocket {
panic(ErrPleaseUseWebsocketHandle)
}
slf.connectionReceivePacketEventHandles = append(slf.connectionReceivePacketEventHandles, handle)
log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String()))
}
func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet []byte) {
func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet Packet) {
for _, handle := range slf.connectionReceivePacketEventHandles {
handle(slf.Server, conn, packet)
}
}
// RegConnectionReceiveWebsocketPacketEvent 在接收到Websocket数据包时将立刻执行被注册的事件处理函数
func (slf *event) RegConnectionReceiveWebsocketPacketEvent(handle ConnectionReceiveWebsocketPacketEventHandle) {
if slf.network != NetworkWebsocket {
panic(ErrPleaseUseOrdinaryPacketHandle)
}
slf.connectionReceiveWebsocketPacketEventHandles = append(slf.connectionReceiveWebsocketPacketEventHandles, handle)
log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String()))
}
func (slf *event) OnConnectionReceiveWebsocketPacketEvent(conn *Conn, packet []byte, messageType int) {
for _, handle := range slf.connectionReceiveWebsocketPacketEventHandles {
handle(slf.Server, conn, packet, messageType)
}
}
// RegReceiveCrossPacketEvent 在接收到跨服数据包时将立即执行被注册的事件处理函数
func (slf *event) RegReceiveCrossPacketEvent(handle ReceiveCrossPacketEventHandle) {
slf.receiveCrossPacketEventHandles = append(slf.receiveCrossPacketEventHandles, handle)
@ -215,15 +195,8 @@ func (slf *event) check() {
switch slf.network {
case NetworkHttp, NetworkGRPC:
default:
switch slf.network {
case NetworkWebsocket:
if len(slf.connectionReceiveWebsocketPacketEventHandles) == 0 {
log.Warn("Server", zap.String("ConnectionReceiveWebsocketPacketEvent", "invalid server, no packets processed"))
}
default:
if len(slf.connectionReceivePacketEventHandles) == 0 {
log.Warn("Server", zap.String("ConnectionReceivePacketEvent", "invalid server, no packets processed"))
}
if len(slf.connectionReceivePacketEventHandles) == 0 {
log.Warn("Server", zap.String("ConnectionReceivePacketEvent", "invalid server, no packets processed"))
}
}

View File

@ -23,7 +23,7 @@ func (slf *gNet) OnShutdown(server gnet.Server) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if err := gnet.Stop(ctx, fmt.Sprintf("%s://%s", slf.network, slf.addr)); err != nil {
log.Error("Server", zap.String("Minotaur GNet Server", "Shutdown"), zap.Error(err))
log.Error("Server", zap.String("Minotaur GNet Server", "shutdown"), zap.Error(err))
}
}
@ -48,7 +48,7 @@ func (slf *gNet) AfterWrite(c gnet.Conn, b []byte) {
}
func (slf *gNet) React(packet []byte, c gnet.Conn) (out []byte, action gnet.Action) {
slf.Server.PushMessage(MessageTypePacket, c.Context().(*Conn), bytes.Clone(packet))
PushPacketMessage(slf.Server, c.Context().(*Conn), append(bytes.Clone(packet), 0))
return nil, gnet.None
}

View File

@ -1,24 +1,24 @@
package server
import (
"runtime/debug"
)
const (
// MessageTypePacket 数据包消息类型:该类型的数据将被发送到 ConnectionReceivePacketEvent 进行处理
// - *server.Conn
// - []byte
MessageTypePacket MessageType = iota
// MessageTypeError 错误消息类型:根据不同的错误状态,将交由 Server 进行统一处理
// - error
// - server.MessageErrorAction
MessageTypeError
// MessageTypeCross 跨服消息类型:将被推送到跨服的 Cross 实现中进行处理
// - int64(serverId)
// - []byte
MessageTypeCross
// MessageTypeTicker 定时器消息类型
// - func()
MessageTypeTicker
// MessageTypeAsync 异步消息类型
MessageTypeAsync
)
var messageNames = map[MessageType]string{
@ -26,16 +26,17 @@ var messageNames = map[MessageType]string{
MessageTypeError: "MessageTypeError",
MessageTypeCross: "MessageTypeCross",
MessageTypeTicker: "MessageTypeTicker",
MessageTypeAsync: "MessageTypeAsync",
}
const (
MessageErrorActionNone MessageErrorAction = iota // 错误消息类型操作:将不会被进行任何特殊处理,仅进行日志输出
MessageErrorActionShutdown // 错误消息类型操作:当接收到该类型的操作时,服务器将执行 Server.Shutdown 函数
MessageErrorActionShutdown // 错误消息类型操作:当接收到该类型的操作时,服务器将执行 Server.shutdown 函数
)
var messageErrorActionNames = map[MessageErrorAction]string{
MessageErrorActionNone: "None",
MessageErrorActionShutdown: "Shutdown",
MessageErrorActionShutdown: "shutdown",
}
type (
@ -59,73 +60,53 @@ func (slf MessageType) String() string {
return messageNames[slf]
}
func (slf MessageType) deconstructWebSocketPacket(attrs ...any) (conn *Conn, packet []byte, messageType int) {
if len(attrs) != 3 {
panic(ErrWebsocketMessageTypePacketAttrs)
}
var ok bool
if conn, ok = attrs[0].(*Conn); !ok {
panic(ErrWebsocketMessageTypePacketAttrs)
}
if packet, ok = attrs[1].([]byte); !ok {
panic(ErrWebsocketMessageTypePacketAttrs)
}
if messageType, ok = attrs[2].(int); !ok {
panic(ErrWebsocketMessageTypePacketAttrs)
}
return
// PushPacketMessage 向特定服务器中推送 MessageTypePacket 消息
func PushPacketMessage(srv *Server, conn *Conn, packet []byte) {
msg := srv.messagePool.Get()
msg.t = MessageTypePacket
msg.attrs = []any{conn, packet}
srv.pushMessage(msg)
}
func (slf MessageType) deconstructPacket(attrs ...any) (conn *Conn, packet []byte) {
if len(attrs) != 2 {
panic(ErrMessageTypePacketAttrs)
}
var ok bool
if conn, ok = attrs[0].(*Conn); !ok {
panic(ErrMessageTypePacketAttrs)
}
if packet, ok = attrs[1].([]byte); !ok {
panic(ErrMessageTypePacketAttrs)
}
return
// PushErrorMessage 向特定服务器中推送 MessageTypeError 消息
func PushErrorMessage(srv *Server, err error, action MessageErrorAction) {
msg := srv.messagePool.Get()
msg.t = MessageTypeError
msg.attrs = []any{err, action, string(debug.Stack())}
srv.pushMessage(msg)
}
func (slf MessageType) deconstructError(attrs ...any) (err error, action MessageErrorAction, stack string) {
if len(attrs) != 3 {
panic(ErrMessageTypeErrorAttrs)
// PushCrossMessage 向特定服务器中推送 MessageTypeCross 消息
func PushCrossMessage(srv *Server, crossName string, serverId int64, packet []byte) {
if serverId == srv.id {
msg := srv.messagePool.Get()
msg.t = MessageTypeCross
msg.attrs = []any{serverId, packet}
srv.pushMessage(msg)
} else {
if len(srv.cross) == 0 {
return
}
cross, exist := srv.cross[crossName]
if !exist {
return
}
_ = cross.PushMessage(serverId, packet)
}
var ok bool
if err, ok = attrs[0].(error); !ok {
panic(ErrMessageTypeErrorAttrs)
}
if action, ok = attrs[1].(MessageErrorAction); !ok {
panic(ErrMessageTypeErrorAttrs)
}
stack = attrs[2].(string)
return
}
func (slf MessageType) deconstructCross(attrs ...any) (serverId int64, packet []byte) {
if len(attrs) != 2 {
panic(ErrMessageTypeCrossErrorAttrs)
}
var ok bool
if serverId, ok = attrs[0].(int64); !ok {
panic(ErrMessageTypeCrossErrorAttrs)
}
if packet, ok = attrs[1].([]byte); !ok {
panic(ErrMessageTypeCrossErrorAttrs)
}
return
// PushTickerMessage 向特定服务器中推送 MessageTypeTicker 消息
func PushTickerMessage(srv *Server, caller func()) {
msg := srv.messagePool.Get()
msg.t = MessageTypeTicker
msg.attrs = []any{caller}
srv.pushMessage(msg)
}
func (slf MessageType) deconstructTicker(attrs ...any) (caller func()) {
if len(attrs) != 1 {
panic(ErrMessageTypeTickerErrorAttrs)
}
var ok bool
if caller, ok = attrs[0].(func()); !ok {
panic(ErrMessageTypeTickerErrorAttrs)
}
return
// PushAsyncMessage 向特定服务器中推送 MessageTypeAsync 消息
func PushAsyncMessage(srv *Server, caller func() error, callback ...func(err error)) {
msg := srv.messagePool.Get()
msg.t = MessageTypeAsync
msg.attrs = []any{caller, callback}
srv.pushMessage(msg)
}

View File

@ -60,7 +60,7 @@ func (slf *MultipleServer) Run() {
case err := <-exceptionChannel:
for len(slf.servers) > 0 {
server := slf.servers[0]
server.Shutdown(err)
server.shutdown(err)
slf.servers = slf.servers[1:]
}
break
@ -68,7 +68,7 @@ func (slf *MultipleServer) Run() {
for len(slf.servers) > 0 {
server := slf.servers[0]
server.multipleRuntimeErrorChan = nil
server.Shutdown(nil)
server.shutdown(nil)
slf.servers = slf.servers[1:]
}
break
@ -76,7 +76,7 @@ func (slf *MultipleServer) Run() {
for len(slf.servers) > 0 {
server := slf.servers[0]
server.multipleRuntimeErrorChan = nil
server.Shutdown(nil)
server.shutdown(nil)
slf.servers = slf.servers[1:]
}
break

View File

@ -23,6 +23,41 @@ const (
)
type Option func(srv *Server)
type option struct {
disableAnts bool // 是否禁用协程池
antsPoolSize int // 协程池大小
}
type runtime struct {
deadlockDetect time.Duration // 是否开启死锁检测
}
// WithDeadlockDetect 通过死锁、死循环、永久阻塞检测的方式创建服务器
// - 当检测到死锁、死循环、永久阻塞时,服务器将会生成 WARN 类型的日志,关键字为 "SuspectedDeadlock"
func WithDeadlockDetect(t time.Duration) Option {
return func(srv *Server) {
if t > 0 {
srv.deadlockDetect = t
log.Info("DeadlockDetect", zap.Any("Time", t))
}
}
}
// WithDisableAsyncMessage 通过禁用异步消息的方式创建服务器
func WithDisableAsyncMessage() Option {
return func(srv *Server) {
srv.disableAnts = true
}
}
// WithAsyncPoolSize 通过指定异步消息池大小的方式创建服务器
// - 当通过 WithDisableAsyncMessage 禁用异步消息时,此选项无效
// - 默认值为 256
func WithAsyncPoolSize(size int) Option {
return func(srv *Server) {
srv.antsPoolSize = size
}
}
// WithWebsocketReadDeadline 设置 Websocket 读取超时时间
// - 默认: 30 * time.Second
@ -36,16 +71,6 @@ func WithWebsocketReadDeadline(t time.Duration) Option {
}
}
// WithDiversion 通过分流的方式创建服务器
// - diversion分流函数返回一个函数通道用于接收分流的消息
// - 需要确保能够通过 conn 和 packet 确定分流通道
// - 多核模式下将导致消息顺序不一致,通过结果依然是单核处理的,因为分流通道仅有一个
func WithDiversion(diversion func(conn ConnReadonly, packet []byte) chan func()) Option {
return func(srv *Server) {
srv.diversion = diversion
}
}
// WithTicker 通过定时器创建服务器,为服务器添加定时器功能
// - autonomy定时器是否独立运行独立运行的情况下不会作为服务器消息运行会导致并发问题
// - 多核与分流情况下需要考虑是否有必要 autonomy
@ -55,7 +80,7 @@ func WithTicker(size int, autonomy bool) Option {
srv.ticker = timer.GetTicker(size)
} else {
srv.ticker = timer.GetTicker(size, timer.WithCaller(func(name string, caller func()) {
srv.PushMessage(MessageTypeTicker, caller)
PushTickerMessage(srv, caller)
}))
}
}
@ -74,7 +99,10 @@ func WithCross(crossName string, serverId int64, cross Cross) Option {
}
srv.cross[crossName] = cross
err := cross.Init(srv, func(serverId int64, packet []byte) {
srv.PushMessage(MessageTypeCross, serverId, packet)
msg := srv.messagePool.Get()
msg.t = MessageTypeCross
msg.attrs = []any{serverId, packet}
srv.pushMessage(msg)
})
if err != nil {
log.Info("Cross", zap.Int64("ServerID", serverId), zap.String("Cross", reflect.TypeOf(cross).String()), zap.String("State", "WaitNatsRun"))
@ -115,24 +143,10 @@ func WithProd() Option {
}
}
// WithWebsocketWriteMessageType 设置客户端写入的Websocket消息类型
// - 默认: WebsocketMessageTypeBinary
func WithWebsocketWriteMessageType(messageType int) Option {
return func(srv *Server) {
switch messageType {
case WebsocketMessageTypeText, WebsocketMessageTypeBinary, WebsocketMessageTypeClose, WebsocketMessageTypePing, WebsocketMessageTypePong:
srv.websocketWriteMessageType = messageType
default:
log.Warn("WithWebsocketWriteMessageType", zap.Int("MessageType", messageType), zap.Error(ErrWebsocketMessageTypeException))
}
}
}
// WithWebsocketMessageType 设置仅支持特定类型的Websocket消息
func WithWebsocketMessageType(messageTypes ...int) Option {
return func(srv *Server) {
if srv.network != NetworkWebsocket {
log.Warn("WitchWebsocketMessageType", zap.String("Network", string(srv.network)), zap.Error(ErrNotWebsocketUseMessageType))
return
}
var supports = make(map[int]bool)
@ -140,8 +154,6 @@ func WithWebsocketMessageType(messageTypes ...int) Option {
switch messageType {
case WebsocketMessageTypeText, WebsocketMessageTypeBinary, WebsocketMessageTypeClose, WebsocketMessageTypePing, WebsocketMessageTypePong:
supports[messageType] = true
default:
log.Warn("WitchWebsocketMessageType", zap.Int("MessageType", messageType), zap.Error(ErrWebsocketMessageTypeException))
}
}
srv.supportMessageTypes = supports
@ -159,17 +171,3 @@ func WithMessageBufferSize(size int) Option {
srv.messagePoolSize = size
}
}
// WithMultiCore 通过特定核心数量运行服务器,默认为单核
// - count > 1 的情况下,将会有对应数量的 goroutine 来处理消息
// - 注意HTTP和GRPC网络模式下不会生效
// - 在需要分流的场景推荐采用多核模式,如游戏以房间的形式进行,每个房间互不干扰,这种情况下便可以每个房间单独维护数据包消息进行处理
func WithMultiCore(count int) Option {
return func(srv *Server) {
srv.core = count
if srv.core < 1 {
log.Warn("WithMultiCore", zap.Int("count", count), zap.String("tips", "wrong core count configuration, corrected to 1, currently in single-core mode"))
srv.core = 1
}
}
}

10
server/packet.go Normal file
View File

@ -0,0 +1,10 @@
package server
type Packet struct {
WebsocketType int
Data []byte
}
func (slf Packet) String() string {
return string(slf.Data)
}

View File

@ -10,6 +10,7 @@ import (
"github.com/kercylan98/minotaur/utils/synchronization"
"github.com/kercylan98/minotaur/utils/timer"
"github.com/kercylan98/minotaur/utils/times"
"github.com/panjf2000/ants/v2"
"github.com/panjf2000/gnet"
"github.com/panjf2000/gnet/pkg/logging"
"github.com/xtaci/kcp-go/v5"
@ -19,7 +20,6 @@ import (
"net/http"
"os"
"os/signal"
"runtime/debug"
"strings"
"sync/atomic"
"syscall"
@ -29,12 +29,12 @@ import (
// New 根据特定网络类型创建一个服务器
func New(network Network, options ...Option) *Server {
server := &Server{
event: &event{},
network: network,
options: options,
core: 1,
closeChannel: make(chan struct{}, 1),
websocketWriteMessageType: WebsocketMessageTypeBinary,
event: &event{},
runtime: &runtime{},
option: &option{},
network: network,
closeChannel: make(chan struct{}, 1),
systemSignal: make(chan os.Signal, 1),
}
server.event.Server = server
@ -53,37 +53,49 @@ func New(network Network, options ...Option) *Server {
for _, option := range options {
option(server)
}
if !server.disableAnts {
if server.antsPoolSize <= 0 {
server.antsPoolSize = 256
}
var err error
server.ants, err = ants.NewPool(server.antsPoolSize, ants.WithLogger(log.Logger()))
if err != nil {
panic(err)
}
}
server.option = nil
return server
}
// Server 网络服务器
type Server struct {
*event // 事件
cross map[string]Cross // 跨服
id int64 // 服务器id
network Network // 网络类型
addr string // 侦听地址
options []Option // 选项
ginServer *gin.Engine // HTTP模式下的路由器
httpServer *http.Server // HTTP模式下的服务器
grpcServer *grpc.Server // GRPC模式下的服务器
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件
isRunning bool // 是否正在运行
isShutdown atomic.Bool // 是否已关闭
closeChannel chan struct{} // 关闭信号
diversion func(conn ConnReadonly, packet []byte) chan func() // 分流器
*event // 事件
*runtime // 运行时
*option // 可选项
systemSignal chan os.Signal // 系统信号
cross map[string]Cross // 跨服
id int64 // 服务器id
network Network // 网络类型
addr string // 侦听地址
ginServer *gin.Engine // HTTP模式下的路由器
httpServer *http.Server // HTTP模式下的服务器
grpcServer *grpc.Server // GRPC模式下的服务器
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件
isRunning bool // 是否正在运行
isShutdown atomic.Bool // 是否已关闭
closeChannel chan struct{} // 关闭信号
ants *ants.Pool // 协程池
gServer *gNet // TCP或UDP模式下的服务器
messagePool *synchronization.Pool[*Message] // 消息池
messagePoolSize int // 消息池大小
messageChannel map[int]chan *Message // 消息管道
initMessageChannel bool // 消息管道是否已经初始化
multiple *MultipleServer // 多服务器模式下的服务器
prod bool // 是否为生产模式
core int // 消息处理核心数
websocketWriteMessageType int // websocket写入的消息类型
ticker *timer.Ticker // 定时器
gServer *gNet // TCP或UDP模式下的服务器
messagePool *synchronization.Pool[*Message] // 消息池
messagePoolSize int // 消息池大小
messageChannel chan *Message // 消息管道
multiple *MultipleServer // 多服务器模式下的服务器
prod bool // 是否为生产模式
ticker *timer.Ticker // 定时器
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
@ -110,7 +122,6 @@ func (slf *Server) Run(addr string) error {
slf.addr = addr
var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr)
var connectionInitHandle = func(callback func()) {
slf.initMessageChannel = true
if slf.messagePoolSize <= 0 {
slf.messagePoolSize = 100
}
@ -123,24 +134,18 @@ func (slf *Server) Run(addr string) error {
data.attrs = nil
},
)
slf.messageChannel = map[int]chan *Message{}
for i := 0; i < slf.core; i++ {
slf.messageChannel[i] = make(chan *Message, 4096*1000)
}
slf.messageChannel = make(chan *Message, 4096*1000)
if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC {
slf.gServer = &gNet{Server: slf}
}
if callback != nil {
go callback()
}
for _, messageChannel := range slf.messageChannel {
messageChannel := messageChannel
go func() {
for message := range messageChannel {
slf.dispatchMessage(message, slf.diversion != nil)
}
}()
}
go func() {
for message := range slf.messageChannel {
slf.dispatchMessage(message)
}
}()
}
switch slf.network {
@ -155,7 +160,7 @@ func (slf *Server) Run(addr string) error {
slf.OnStartBeforeEvent()
if err := slf.grpcServer.Serve(listener); err != nil {
slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
PushErrorMessage(slf, err, MessageErrorActionShutdown)
}
}()
case NetworkTcp, NetworkTcp4, NetworkTcp6, NetworkUdp, NetworkUdp4, NetworkUdp6, NetworkUnix:
@ -169,7 +174,7 @@ func (slf *Server) Run(addr string) error {
gnet.WithMulticore(true),
); err != nil {
slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
PushErrorMessage(slf, err, MessageErrorActionShutdown)
}
})
case NetworkKcp:
@ -202,7 +207,7 @@ func (slf *Server) Run(addr string) error {
if err != nil {
panic(err)
}
slf.PushMessage(MessageTypePacket, conn, buf[:n])
PushPacketMessage(slf, conn, buf[:n])
}
}(conn)
}
@ -220,12 +225,12 @@ func (slf *Server) Run(addr string) error {
if len(slf.certFile)+len(slf.keyFile) > 0 {
if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil {
slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
PushErrorMessage(slf, err, MessageErrorActionShutdown)
}
} else {
if err := slf.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
PushErrorMessage(slf, err, MessageErrorActionShutdown)
}
}
@ -286,7 +291,7 @@ func (slf *Server) Run(addr string) error {
if len(slf.supportMessageTypes) > 0 && !slf.supportMessageTypes[messageType] {
panic(ErrWebsocketIllegalMessageType)
}
slf.PushMessage(MessageTypePacket, conn, packet, messageType)
PushPacketMessage(slf, conn, append(packet, byte(messageType)))
}
})
go func() {
@ -295,12 +300,12 @@ func (slf *Server) Run(addr string) error {
if len(slf.certFile)+len(slf.keyFile) > 0 {
if err := http.ListenAndServeTLS(slf.addr, slf.certFile, slf.keyFile, nil); err != nil {
slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
PushErrorMessage(slf, err, MessageErrorActionShutdown)
}
} else {
if err := http.ListenAndServe(slf.addr, nil); err != nil {
slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown)
PushErrorMessage(slf, err, MessageErrorActionShutdown)
}
}
@ -318,11 +323,11 @@ func (slf *Server) Run(addr string) error {
)
log.Info("Server", zap.String("Minotaur Server", "===================================================================="))
slf.OnStartFinishEvent()
systemSignal := make(chan os.Signal, 1)
signal.Notify(systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
signal.Notify(slf.systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
select {
case <-systemSignal:
slf.Shutdown(nil)
case <-slf.systemSignal:
slf.shutdown(nil)
}
select {
@ -362,8 +367,13 @@ func (slf *Server) Ticker() *timer.Ticker {
return slf.ticker
}
// Shutdown 停止运行服务器
func (slf *Server) Shutdown(err error, stack ...string) {
// Shutdown 主动停止运行服务器
func (slf *Server) Shutdown() {
slf.systemSignal <- syscall.SIGQUIT
}
// shutdown 停止运行服务器
func (slf *Server) shutdown(err error, stack ...string) {
slf.OnStopEvent()
defer func() {
if slf.multipleRuntimeErrorChan != nil {
@ -377,12 +387,10 @@ func (slf *Server) Shutdown(err error, stack ...string) {
for _, cross := range slf.cross {
cross.Release()
}
if slf.initMessageChannel {
for _, messageChannel := range slf.messageChannel {
close(messageChannel)
}
if slf.messageChannel != nil {
close(slf.messageChannel)
slf.messagePool.Close()
slf.initMessageChannel = false
slf.messageChannel = nil
}
if slf.grpcServer != nil && slf.isRunning {
slf.grpcServer.GracefulStop()
@ -439,46 +447,41 @@ func (slf *Server) HttpRouter() gin.IRouter {
return slf.ginServer
}
// PushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求
func (slf *Server) PushMessage(messageType MessageType, attrs ...any) {
// pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求
func (slf *Server) pushMessage(message *Message) {
if slf.messagePool.IsClose() {
slf.messagePool.Release(message)
return
}
msg := slf.messagePool.Get()
msg.t = messageType
msg.attrs = attrs
if msg.t == MessageTypeError {
msg.attrs = append(msg.attrs, string(debug.Stack()))
}
for _, channel := range slf.messageChannel {
channel <- msg
break
}
slf.messageChannel <- message
}
// PushCrossMessage 推送跨服消息到特定跨服的服务器中
func (slf *Server) PushCrossMessage(crossName string, serverId int64, packet []byte) error {
if len(slf.cross) == 0 {
return ErrNoSupportCross
func (slf *Server) low(message *Message, present time.Time) {
cost := time.Since(present)
if cost > time.Millisecond*100 {
log.Warn("Server", zap.String("MessageType", messageNames[message.t]), zap.String("LowExecCost", cost.String()))
slf.OnMessageLowExecEvent(message, cost)
}
cross, exist := slf.cross[crossName]
if !exist {
return ErrUnregisteredCrossName
}
return cross.PushMessage(serverId, packet)
}
// dispatchMessage 消息分发
func (slf *Server) dispatchMessage(msg *Message, isRedirect bool) {
if slf.diversion != nil && isRedirect && msg.t == MessageTypePacket {
conn, packet, _ := msg.t.deconstructWebSocketPacket(msg.attrs...)
if redirect := slf.diversion(conn, packet); redirect != nil {
redirect <- func() {
slf.dispatchMessage(msg, false)
func (slf *Server) dispatchMessage(msg *Message) {
var (
ctx context.Context
cancel context.CancelFunc
)
if slf.deadlockDetect > 0 {
ctx, cancel = context.WithTimeout(context.Background(), slf.deadlockDetect)
go func() {
select {
case <-ctx.Done():
if err := ctx.Err(); err == context.DeadlineExceeded {
log.Warn("Server", zap.String("MessageType", messageNames[msg.t]), zap.Any("SuspectedDeadlock", msg.attrs))
}
}
}
return
}()
}
present := time.Now()
defer func() {
if err := recover(); err != nil {
@ -488,41 +491,59 @@ func (slf *Server) dispatchMessage(msg *Message, isRedirect bool) {
}
}
cost := time.Since(present)
if cost > time.Millisecond*100 {
log.Warn("Server", zap.String("MessageType", messageNames[msg.t]), zap.String("LowExecCost", cost.String()), zap.Any("MessageAttrs", msg.attrs))
slf.OnMessageLowExecEvent(msg, cost)
if msg.t != MessageTypeAsync {
super.Handle(cancel)
slf.low(msg, present)
if !slf.isShutdown.Load() {
slf.messagePool.Release(msg)
}
}
if !slf.isShutdown.Load() {
slf.messagePool.Release(msg)
}
}()
var attrs = msg.attrs
switch msg.t {
case MessageTypePacket:
if slf.network == NetworkWebsocket {
conn, packet, messageType := msg.t.deconstructWebSocketPacket(msg.attrs...)
slf.OnConnectionReceiveWebsocketPacketEvent(conn, packet, messageType)
} else {
conn, packet := msg.t.deconstructPacket(msg.attrs...)
slf.OnConnectionReceivePacketEvent(conn, packet)
}
var packet = attrs[1].([]byte)
var wst = int(packet[len(packet)-1])
slf.OnConnectionReceivePacketEvent(attrs[0].(*Conn), Packet{Data: packet[:len(packet)-1], WebsocketType: wst})
case MessageTypeError:
err, action, stack := msg.t.deconstructError(msg.attrs...)
err, action, stack := attrs[0].(error), attrs[1].(MessageErrorAction), attrs[2].(string)
switch action {
case MessageErrorActionNone:
log.ErrorWithStack("Server", stack, zap.Error(err))
case MessageErrorActionShutdown:
slf.Shutdown(err, stack)
slf.shutdown(err, stack)
default:
log.Warn("Server", zap.String("not support message error action", action.String()))
}
case MessageTypeCross:
serverId, packet := msg.t.deconstructCross(msg.attrs...)
slf.OnReceiveCrossPacketEvent(serverId, packet)
slf.OnReceiveCrossPacketEvent(attrs[0].(int64), attrs[1].([]byte))
case MessageTypeTicker:
caller := msg.t.deconstructTicker(msg.attrs...)
caller()
attrs[0].(func())()
case MessageTypeAsync:
handle := attrs[0].(func() error)
callbacks := attrs[1].([]func(err error))
if err := slf.ants.Submit(func() {
defer func() {
if err := recover(); err != nil {
log.Error("Server", zap.String("MessageType", messageNames[msg.t]), zap.Any("MessageAttrs", msg.attrs), zap.Any("error", err))
if e, ok := err.(error); ok {
slf.OnMessageErrorEvent(msg, e)
}
}
super.Handle(cancel)
if !slf.isShutdown.Load() {
slf.messagePool.Release(msg)
}
}()
if err := handle(); err != nil {
for _, callback := range callbacks {
callback(err)
}
}
}); err != nil {
panic(err)
}
default:
log.Warn("Server", zap.String("not support message type", msg.t.String()))
}

View File

@ -0,0 +1,23 @@
package server_test
import (
"github.com/kercylan98/minotaur/server"
"time"
)
func ExampleNew() {
srv := server.New(server.NetworkWebsocket,
server.WithDeadlockDetect(time.Second*5),
)
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet server.Packet) {
conn.Write(packet)
})
go func() { time.Sleep(1 * time.Second); srv.Shutdown() }()
if err := srv.Run(":9999"); err != nil {
panic(err)
}
// Output:
}

View File

@ -101,8 +101,16 @@ func getWriter(filename string, times int) io.Writer {
return hook
}
func Logger() *zap.Logger {
return logger
type MLogger struct {
*zap.Logger
}
func (slf *MLogger) Printf(format string, args ...interface{}) {
slf.Info(fmt.Sprintf(format, args...))
}
func Logger() *MLogger {
return &MLogger{logger}
}
func Info(msg string, fields ...zap.Field) {

View File

@ -1,5 +1,9 @@
package str
const (
None = "" // 空字符串
)
// FirstUpper 首字母大写
func FirstUpper(str string) string {
var upperStr string