feat: server 包支持设置注册事件的优先级

This commit is contained in:
kercylan98 2023-08-21 15:02:48 +08:00
parent 2dd5dd5c6c
commit 3c6ce9cfdf
5 changed files with 142 additions and 102 deletions

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/runtimes" "github.com/kercylan98/minotaur/utils/runtimes"
"github.com/kercylan98/minotaur/utils/slice"
"reflect" "reflect"
"runtime/debug" "runtime/debug"
"sync" "sync"
@ -26,45 +27,66 @@ type ShuntChannelCreatedEventHandle func(srv *Server, guid int64)
type ShuntChannelClosedEventHandle func(srv *Server, guid int64) type ShuntChannelClosedEventHandle func(srv *Server, guid int64)
type ConnectionPacketPreprocessEventHandle func(srv *Server, conn *Conn, packet []byte, abort func(), usePacket func(newPacket []byte)) type ConnectionPacketPreprocessEventHandle func(srv *Server, conn *Conn, packet []byte, abort func(), usePacket func(newPacket []byte))
func newEvent(srv *Server) *event {
return &event{
Server: srv,
startBeforeEventHandles: slice.NewPriority[StartBeforeEventHandle](),
startFinishEventHandles: slice.NewPriority[StartFinishEventHandle](),
stopEventHandles: slice.NewPriority[StopEventHandle](),
connectionReceivePacketEventHandles: slice.NewPriority[ConnectionReceivePacketEventHandle](),
connectionOpenedEventHandles: slice.NewPriority[ConnectionOpenedEventHandle](),
connectionClosedEventHandles: slice.NewPriority[ConnectionClosedEventHandle](),
receiveCrossPacketEventHandles: slice.NewPriority[ReceiveCrossPacketEventHandle](),
messageErrorEventHandles: slice.NewPriority[MessageErrorEventHandle](),
messageLowExecEventHandles: slice.NewPriority[MessageLowExecEventHandle](),
connectionOpenedAfterEventHandles: slice.NewPriority[ConnectionOpenedAfterEventHandle](),
connectionWritePacketBeforeHandles: slice.NewPriority[ConnectionWritePacketBeforeEventHandle](),
shuntChannelCreatedEventHandles: slice.NewPriority[ShuntChannelCreatedEventHandle](),
shuntChannelClosedEventHandles: slice.NewPriority[ShuntChannelClosedEventHandle](),
connectionPacketPreprocessEventHandles: slice.NewPriority[ConnectionPacketPreprocessEventHandle](),
}
}
type event struct { type event struct {
*Server *Server
startBeforeEventHandles []StartBeforeEventHandle startBeforeEventHandles *slice.Priority[StartBeforeEventHandle]
startFinishEventHandles []StartFinishEventHandle startFinishEventHandles *slice.Priority[StartFinishEventHandle]
stopEventHandles []StopEventHandle stopEventHandles *slice.Priority[StopEventHandle]
connectionReceivePacketEventHandles []ConnectionReceivePacketEventHandle connectionReceivePacketEventHandles *slice.Priority[ConnectionReceivePacketEventHandle]
connectionOpenedEventHandles []ConnectionOpenedEventHandle connectionOpenedEventHandles *slice.Priority[ConnectionOpenedEventHandle]
connectionClosedEventHandles []ConnectionClosedEventHandle connectionClosedEventHandles *slice.Priority[ConnectionClosedEventHandle]
receiveCrossPacketEventHandles []ReceiveCrossPacketEventHandle receiveCrossPacketEventHandles *slice.Priority[ReceiveCrossPacketEventHandle]
messageErrorEventHandles []MessageErrorEventHandle messageErrorEventHandles *slice.Priority[MessageErrorEventHandle]
messageLowExecEventHandles []MessageLowExecEventHandle messageLowExecEventHandles *slice.Priority[MessageLowExecEventHandle]
connectionOpenedAfterEventHandles []ConnectionOpenedAfterEventHandle connectionOpenedAfterEventHandles *slice.Priority[ConnectionOpenedAfterEventHandle]
connectionWritePacketBeforeHandles []ConnectionWritePacketBeforeEventHandle connectionWritePacketBeforeHandles *slice.Priority[ConnectionWritePacketBeforeEventHandle]
shuntChannelCreatedEventHandles []ShuntChannelCreatedEventHandle shuntChannelCreatedEventHandles *slice.Priority[ShuntChannelCreatedEventHandle]
shuntChannelClosedEventHandles []ShuntChannelClosedEventHandle shuntChannelClosedEventHandles *slice.Priority[ShuntChannelClosedEventHandle]
connectionPacketPreprocessEventHandles []ConnectionPacketPreprocessEventHandle connectionPacketPreprocessEventHandles *slice.Priority[ConnectionPacketPreprocessEventHandle]
consoleCommandEventHandles map[string][]ConsoleCommandEventHandle consoleCommandEventHandles map[string]*slice.Priority[ConsoleCommandEventHandle]
consoleCommandEventHandleInitOnce sync.Once consoleCommandEventHandleInitOnce sync.Once
} }
// RegStopEvent 服务器停止时将立即执行被注册的事件处理函数 // RegStopEvent 服务器停止时将立即执行被注册的事件处理函数
func (slf *event) RegStopEvent(handle StopEventHandle) { func (slf *event) RegStopEvent(handle StopEventHandle, priority ...int) {
slf.stopEventHandles = append(slf.stopEventHandles, handle) slf.stopEventHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
func (slf *event) OnStopEvent() { func (slf *event) OnStopEvent() {
for _, handle := range slf.stopEventHandles { slf.stopEventHandles.RangeValue(func(index int, value StopEventHandle) bool {
handle(slf.Server) value(slf.Server)
} return true
})
} }
// RegConsoleCommandEvent 控制台收到指令时将立即执行被注册的事件处理函数 // RegConsoleCommandEvent 控制台收到指令时将立即执行被注册的事件处理函数
// - 默认将注册 "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN" 指令作为关闭服务器的指令 // - 默认将注册 "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN" 指令作为关闭服务器的指令
// - 可通过注册默认指令进行默认行为的覆盖 // - 可通过注册默认指令进行默认行为的覆盖
func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEventHandle) { func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEventHandle, priority ...int) {
slf.consoleCommandEventHandleInitOnce.Do(func() { slf.consoleCommandEventHandleInitOnce.Do(func() {
slf.consoleCommandEventHandles = map[string][]ConsoleCommandEventHandle{} slf.consoleCommandEventHandles = map[string]*slice.Priority[ConsoleCommandEventHandle]{}
go func() { go func() {
for { for {
var input string var input string
@ -73,7 +95,7 @@ func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEv
} }
}() }()
}) })
slf.consoleCommandEventHandles[command] = append(slf.consoleCommandEventHandles[command], handle) slf.consoleCommandEventHandles[command].Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
@ -89,16 +111,17 @@ func (slf *event) OnConsoleCommandEvent(command string) {
} }
log.Warn("Server", log.String("Command", "unregistered")) log.Warn("Server", log.String("Command", "unregistered"))
} else { } else {
for _, handle := range handles { handles.RangeValue(func(index int, value ConsoleCommandEventHandle) bool {
handle(slf.Server) value(slf.Server)
} return true
})
} }
}, "ConsoleCommandEvent") }, "ConsoleCommandEvent")
} }
// RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数 // RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数
func (slf *event) RegStartBeforeEvent(handle StartBeforeEventHandle) { func (slf *event) RegStartBeforeEvent(handle StartBeforeEventHandle, priority ...int) {
slf.startBeforeEventHandles = append(slf.startBeforeEventHandles, handle) slf.startBeforeEventHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
@ -109,179 +132,191 @@ func (slf *event) OnStartBeforeEvent() {
debug.PrintStack() debug.PrintStack()
} }
}() }()
for _, handle := range slf.startBeforeEventHandles { slf.startBeforeEventHandles.RangeValue(func(index int, value StartBeforeEventHandle) bool {
handle(slf.Server) value(slf.Server)
} return true
})
} }
// RegStartFinishEvent 在服务器启动完成时将立刻执行被注册的事件处理函数 // RegStartFinishEvent 在服务器启动完成时将立刻执行被注册的事件处理函数
func (slf *event) RegStartFinishEvent(handle StartFinishEventHandle) { func (slf *event) RegStartFinishEvent(handle StartFinishEventHandle, priority ...int) {
slf.startFinishEventHandles = append(slf.startFinishEventHandles, handle) slf.startFinishEventHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
func (slf *event) OnStartFinishEvent() { func (slf *event) OnStartFinishEvent() {
PushSystemMessage(slf.Server, func() { PushSystemMessage(slf.Server, func() {
for _, handle := range slf.startFinishEventHandles { slf.startFinishEventHandles.RangeValue(func(index int, value StartFinishEventHandle) bool {
handle(slf.Server) value(slf.Server)
} return true
})
}, "StartFinishEvent") }, "StartFinishEvent")
} }
// RegConnectionClosedEvent 在连接关闭后将立刻执行被注册的事件处理函数 // RegConnectionClosedEvent 在连接关闭后将立刻执行被注册的事件处理函数
func (slf *event) RegConnectionClosedEvent(handle ConnectionClosedEventHandle) { func (slf *event) RegConnectionClosedEvent(handle ConnectionClosedEventHandle, priority ...int) {
if slf.network == NetworkHttp { if slf.network == NetworkHttp {
panic(ErrNetworkIncompatibleHttp) panic(ErrNetworkIncompatibleHttp)
} }
slf.connectionClosedEventHandles = append(slf.connectionClosedEventHandles, handle) slf.connectionClosedEventHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) { func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) {
PushSystemMessage(slf.Server, func() { PushSystemMessage(slf.Server, func() {
for _, handle := range slf.connectionClosedEventHandles { slf.connectionClosedEventHandles.RangeValue(func(index int, value ConnectionClosedEventHandle) bool {
handle(slf.Server, conn, err) value(slf.Server, conn, err)
} return true
})
conn.Close() conn.Close()
slf.Server.online.Delete(conn.GetID()) slf.Server.online.Delete(conn.GetID())
}, "ConnectionClosedEvent") }, "ConnectionClosedEvent")
} }
// RegConnectionOpenedEvent 在连接打开后将立刻执行被注册的事件处理函数 // RegConnectionOpenedEvent 在连接打开后将立刻执行被注册的事件处理函数
func (slf *event) RegConnectionOpenedEvent(handle ConnectionOpenedEventHandle) { func (slf *event) RegConnectionOpenedEvent(handle ConnectionOpenedEventHandle, priority ...int) {
if slf.network == NetworkHttp { if slf.network == NetworkHttp {
panic(ErrNetworkIncompatibleHttp) panic(ErrNetworkIncompatibleHttp)
} }
slf.connectionOpenedEventHandles = append(slf.connectionOpenedEventHandles, handle) slf.connectionOpenedEventHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
func (slf *event) OnConnectionOpenedEvent(conn *Conn) { func (slf *event) OnConnectionOpenedEvent(conn *Conn) {
PushSystemMessage(slf.Server, func() { PushSystemMessage(slf.Server, func() {
slf.Server.online.Set(conn.GetID(), conn) slf.Server.online.Set(conn.GetID(), conn)
for _, handle := range slf.connectionOpenedEventHandles { slf.connectionOpenedEventHandles.RangeValue(func(index int, value ConnectionOpenedEventHandle) bool {
handle(slf.Server, conn) value(slf.Server, conn)
} return true
})
}, "ConnectionOpenedEvent") }, "ConnectionOpenedEvent")
} }
// RegConnectionReceivePacketEvent 在接收到数据包时将立刻执行被注册的事件处理函数 // RegConnectionReceivePacketEvent 在接收到数据包时将立刻执行被注册的事件处理函数
func (slf *event) RegConnectionReceivePacketEvent(handle ConnectionReceivePacketEventHandle) { func (slf *event) RegConnectionReceivePacketEvent(handle ConnectionReceivePacketEventHandle, priority ...int) {
if slf.network == NetworkHttp { if slf.network == NetworkHttp {
panic(ErrNetworkIncompatibleHttp) panic(ErrNetworkIncompatibleHttp)
} }
slf.connectionReceivePacketEventHandles = append(slf.connectionReceivePacketEventHandles, handle) slf.connectionReceivePacketEventHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet Packet) { func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet Packet) {
for _, handle := range slf.connectionReceivePacketEventHandles { slf.connectionReceivePacketEventHandles.RangeValue(func(index int, value ConnectionReceivePacketEventHandle) bool {
handle(slf.Server, conn, packet) value(slf.Server, conn, packet)
} return true
})
} }
// RegReceiveCrossPacketEvent 在接收到跨服数据包时将立即执行被注册的事件处理函数 // RegReceiveCrossPacketEvent 在接收到跨服数据包时将立即执行被注册的事件处理函数
func (slf *event) RegReceiveCrossPacketEvent(handle ReceiveCrossPacketEventHandle) { func (slf *event) RegReceiveCrossPacketEvent(handle ReceiveCrossPacketEventHandle, priority ...int) {
slf.receiveCrossPacketEventHandles = append(slf.receiveCrossPacketEventHandles, handle) slf.receiveCrossPacketEventHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
func (slf *event) OnReceiveCrossPacketEvent(serverId int64, packet []byte) { func (slf *event) OnReceiveCrossPacketEvent(serverId int64, packet []byte) {
for _, handle := range slf.receiveCrossPacketEventHandles { slf.receiveCrossPacketEventHandles.RangeValue(func(index int, value ReceiveCrossPacketEventHandle) bool {
handle(slf.Server, serverId, packet) value(slf.Server, serverId, packet)
} return true
})
} }
// RegMessageErrorEvent 在处理消息发生错误时将立即执行被注册的事件处理函数 // RegMessageErrorEvent 在处理消息发生错误时将立即执行被注册的事件处理函数
func (slf *event) RegMessageErrorEvent(handle MessageErrorEventHandle) { func (slf *event) RegMessageErrorEvent(handle MessageErrorEventHandle, priority ...int) {
slf.messageErrorEventHandles = append(slf.messageErrorEventHandles, handle) slf.messageErrorEventHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
func (slf *event) OnMessageErrorEvent(message *Message, err error) { func (slf *event) OnMessageErrorEvent(message *Message, err error) {
PushSystemMessage(slf.Server, func() { PushSystemMessage(slf.Server, func() {
for _, handle := range slf.messageErrorEventHandles { slf.messageErrorEventHandles.RangeValue(func(index int, value MessageErrorEventHandle) bool {
handle(slf.Server, message, err) value(slf.Server, message, err)
} return true
})
}, "MessageErrorEvent") }, "MessageErrorEvent")
} }
// RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数 // RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数
func (slf *event) RegMessageLowExecEvent(handle MessageLowExecEventHandle) { func (slf *event) RegMessageLowExecEvent(handle MessageLowExecEventHandle, priority ...int) {
slf.messageLowExecEventHandles = append(slf.messageLowExecEventHandles, handle) slf.messageLowExecEventHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) { func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) {
PushSystemMessage(slf.Server, func() { PushSystemMessage(slf.Server, func() {
for _, handle := range slf.messageLowExecEventHandles { slf.messageLowExecEventHandles.RangeValue(func(index int, value MessageLowExecEventHandle) bool {
handle(slf.Server, message, cost) value(slf.Server, message, cost)
} return true
})
}, "MessageLowExecEvent") }, "MessageLowExecEvent")
} }
// RegConnectionOpenedAfterEvent 在连接打开事件处理完成后将立刻执行被注册的事件处理函数 // RegConnectionOpenedAfterEvent 在连接打开事件处理完成后将立刻执行被注册的事件处理函数
func (slf *event) RegConnectionOpenedAfterEvent(handle ConnectionOpenedAfterEventHandle) { func (slf *event) RegConnectionOpenedAfterEvent(handle ConnectionOpenedAfterEventHandle, priority ...int) {
if slf.network == NetworkHttp { if slf.network == NetworkHttp {
panic(ErrNetworkIncompatibleHttp) panic(ErrNetworkIncompatibleHttp)
} }
slf.connectionOpenedAfterEventHandles = append(slf.connectionOpenedAfterEventHandles, handle) slf.connectionOpenedAfterEventHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
func (slf *event) OnConnectionOpenedAfterEvent(conn *Conn) { func (slf *event) OnConnectionOpenedAfterEvent(conn *Conn) {
PushSystemMessage(slf.Server, func() { PushSystemMessage(slf.Server, func() {
for _, handle := range slf.connectionOpenedAfterEventHandles { slf.connectionOpenedAfterEventHandles.RangeValue(func(index int, value ConnectionOpenedAfterEventHandle) bool {
handle(slf.Server, conn) value(slf.Server, conn)
} return true
})
}, "ConnectionOpenedAfterEvent") }, "ConnectionOpenedAfterEvent")
} }
// RegConnectionWritePacketBeforeEvent 在发送数据包前将立刻执行被注册的事件处理函数 // RegConnectionWritePacketBeforeEvent 在发送数据包前将立刻执行被注册的事件处理函数
func (slf *event) RegConnectionWritePacketBeforeEvent(handle ConnectionWritePacketBeforeEventHandle) { func (slf *event) RegConnectionWritePacketBeforeEvent(handle ConnectionWritePacketBeforeEventHandle, priority ...int) {
if slf.network == NetworkHttp { if slf.network == NetworkHttp {
panic(ErrNetworkIncompatibleHttp) panic(ErrNetworkIncompatibleHttp)
} }
slf.connectionWritePacketBeforeHandles = append(slf.connectionWritePacketBeforeHandles, handle) slf.connectionWritePacketBeforeHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
func (slf *event) OnConnectionWritePacketBeforeEvent(conn *Conn, packet Packet) (newPacket Packet) { func (slf *event) OnConnectionWritePacketBeforeEvent(conn *Conn, packet Packet) (newPacket Packet) {
if len(slf.connectionWritePacketBeforeHandles) == 0 { if slf.connectionWritePacketBeforeHandles.Len() == 0 {
return packet return packet
} }
newPacket = packet newPacket = packet
for _, handle := range slf.connectionWritePacketBeforeHandles { slf.connectionWritePacketBeforeHandles.RangeValue(func(index int, value ConnectionWritePacketBeforeEventHandle) bool {
newPacket = handle(slf.Server, conn, packet) newPacket = value(slf.Server, conn, newPacket)
} return true
})
return newPacket return newPacket
} }
// RegShuntChannelCreatedEvent 在分流通道创建时将立刻执行被注册的事件处理函数 // RegShuntChannelCreatedEvent 在分流通道创建时将立刻执行被注册的事件处理函数
func (slf *event) RegShuntChannelCreatedEvent(handle ShuntChannelCreatedEventHandle) { func (slf *event) RegShuntChannelCreatedEvent(handle ShuntChannelCreatedEventHandle, priority ...int) {
slf.shuntChannelCreatedEventHandles = append(slf.shuntChannelCreatedEventHandles, handle) slf.shuntChannelCreatedEventHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
func (slf *event) OnShuntChannelCreatedEvent(guid int64) { func (slf *event) OnShuntChannelCreatedEvent(guid int64) {
PushSystemMessage(slf.Server, func() { PushSystemMessage(slf.Server, func() {
for _, handle := range slf.shuntChannelCreatedEventHandles { slf.shuntChannelCreatedEventHandles.RangeValue(func(index int, value ShuntChannelCreatedEventHandle) bool {
handle(slf.Server, guid) value(slf.Server, guid)
} return true
})
}, "ShuntChannelCreatedEvent") }, "ShuntChannelCreatedEvent")
} }
// RegShuntChannelCloseEvent 在分流通道关闭时将立刻执行被注册的事件处理函数 // RegShuntChannelCloseEvent 在分流通道关闭时将立刻执行被注册的事件处理函数
func (slf *event) RegShuntChannelCloseEvent(handle ShuntChannelClosedEventHandle) { func (slf *event) RegShuntChannelCloseEvent(handle ShuntChannelClosedEventHandle, priority ...int) {
slf.shuntChannelClosedEventHandles = append(slf.shuntChannelClosedEventHandles, handle) slf.shuntChannelClosedEventHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
func (slf *event) OnShuntChannelClosedEvent(guid int64) { func (slf *event) OnShuntChannelClosedEvent(guid int64) {
PushSystemMessage(slf.Server, func() { PushSystemMessage(slf.Server, func() {
for _, handle := range slf.shuntChannelClosedEventHandles { slf.shuntChannelClosedEventHandles.RangeValue(func(index int, value ShuntChannelClosedEventHandle) bool {
handle(slf.Server, guid) value(slf.Server, guid)
} return true
})
}, "ShuntChannelCloseEvent") }, "ShuntChannelCloseEvent")
} }
@ -293,22 +328,23 @@ func (slf *event) OnShuntChannelClosedEvent(guid int64) {
// 场景: // 场景:
// - 数据包格式校验 // - 数据包格式校验
// - 数据包分包等情况处理 // - 数据包分包等情况处理
func (slf *event) RegConnectionPacketPreprocessEvent(handle ConnectionPacketPreprocessEventHandle) { func (slf *event) RegConnectionPacketPreprocessEvent(handle ConnectionPacketPreprocessEventHandle, priority ...int) {
slf.connectionPacketPreprocessEventHandles = append(slf.connectionPacketPreprocessEventHandles, handle) slf.connectionPacketPreprocessEventHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
} }
func (slf *event) OnConnectionPacketPreprocessEvent(conn *Conn, packet []byte, usePacket func(newPacket []byte)) bool { func (slf *event) OnConnectionPacketPreprocessEvent(conn *Conn, packet []byte, usePacket func(newPacket []byte)) bool {
if len(slf.connectionPacketPreprocessEventHandles) == 0 { if slf.connectionPacketPreprocessEventHandles.Len() == 0 {
return false return false
} }
var abort = false var abort = false
for _, handle := range slf.connectionPacketPreprocessEventHandles { slf.connectionPacketPreprocessEventHandles.RangeValue(func(index int, value ConnectionPacketPreprocessEventHandle) bool {
handle(slf.Server, conn, packet, func() { abort = true }, usePacket) value(slf.Server, conn, packet, func() { abort = true }, usePacket)
if abort { if abort {
return abort return false
}
} }
return true
})
return abort return abort
} }
@ -316,12 +352,12 @@ func (slf *event) check() {
switch slf.network { switch slf.network {
case NetworkHttp, NetworkGRPC, NetworkNone: case NetworkHttp, NetworkGRPC, NetworkNone:
default: default:
if len(slf.connectionReceivePacketEventHandles) == 0 { if slf.connectionReceivePacketEventHandles.Len() == 0 {
log.Warn("Server", log.String("ConnectionReceivePacketEvent", "invalid server, no packets processed")) log.Warn("Server", log.String("ConnectionReceivePacketEvent", "invalid server, no packets processed"))
} }
} }
if len(slf.receiveCrossPacketEventHandles) > 0 && slf.cross == nil { if slf.receiveCrossPacketEventHandles.Len() > 0 && slf.cross == nil {
log.Warn("Server", log.String("ReceiveCrossPacketEvent", "invalid server, not register cross server")) log.Warn("Server", log.String("ReceiveCrossPacketEvent", "invalid server, not register cross server"))
} }

View File

@ -3,6 +3,7 @@ package gateway
import ( import (
"github.com/kercylan98/minotaur/server" "github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/utils/super" "github.com/kercylan98/minotaur/utils/super"
"math"
) )
// NewGateway 基于 server.Server 创建网关服务器 // NewGateway 基于 server.Server 创建网关服务器
@ -25,8 +26,8 @@ type Gateway struct {
// Run 运行网关 // Run 运行网关
func (slf *Gateway) Run(addr string) error { func (slf *Gateway) Run(addr string) error {
slf.srv.RegConnectionOpenedEvent(slf.onConnectionOpened) slf.srv.RegConnectionOpenedEvent(slf.onConnectionOpened, math.MinInt)
slf.srv.RegConnectionReceivePacketEvent(slf.onConnectionReceivePacket) slf.srv.RegConnectionReceivePacketEvent(slf.onConnectionReceivePacket, math.MinInt)
return slf.srv.Run(addr) return slf.srv.Run(addr)
} }

View File

@ -2,6 +2,7 @@ package server
import ( import (
"github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/log"
"math"
"os" "os"
"os/signal" "os/signal"
"sync" "sync"
@ -38,12 +39,12 @@ func (slf *MultipleServer) Run() {
wait.Add(1) wait.Add(1)
go func(address string, server *Server) { go func(address string, server *Server) {
var startFinish bool var startFinish bool
server.startFinishEventHandles = append(server.startFinishEventHandles, func(srv *Server) { server.startFinishEventHandles.Append(func(srv *Server) {
if !startFinish { if !startFinish {
startFinish = true startFinish = true
wait.Done() wait.Done()
} }
}) }, math.MaxInt)
server.multiple = slf server.multiple = slf
server.multipleRuntimeErrorChan = runtimeExceptionChannel server.multipleRuntimeErrorChan = runtimeExceptionChannel
if err := server.Run(address); err != nil { if err := server.Run(address); err != nil {

View File

@ -30,7 +30,6 @@ import (
// New 根据特定网络类型创建一个服务器 // New 根据特定网络类型创建一个服务器
func New(network Network, options ...Option) *Server { func New(network Network, options ...Option) *Server {
server := &Server{ server := &Server{
event: &event{},
runtime: &runtime{messagePoolSize: DefaultMessageBufferSize, messageChannelSize: DefaultMessageChannelSize}, runtime: &runtime{messagePoolSize: DefaultMessageBufferSize, messageChannelSize: DefaultMessageChannelSize},
option: &option{}, option: &option{},
network: network, network: network,
@ -38,7 +37,7 @@ func New(network Network, options ...Option) *Server {
closeChannel: make(chan struct{}, 1), closeChannel: make(chan struct{}, 1),
systemSignal: make(chan os.Signal, 1), systemSignal: make(chan os.Signal, 1),
} }
server.event.Server = server server.event = newEvent(server)
switch network { switch network {
case NetworkHttp: case NetworkHttp:

View File

@ -90,6 +90,9 @@ func (slf *Priority[V]) SetPriority(index int, priority int) {
// Action 直接操作切片,如果返回值不为 nil则替换切片 // Action 直接操作切片,如果返回值不为 nil则替换切片
func (slf *Priority[V]) Action(action func(items []*PriorityItem[V]) []*PriorityItem[V]) { func (slf *Priority[V]) Action(action func(items []*PriorityItem[V]) []*PriorityItem[V]) {
if len(slf.items) == 0 {
return
}
if replace := action(slf.items); replace != nil { if replace := action(slf.items); replace != nil {
slf.items = replace slf.items = replace
slf.sort() slf.sort()