feat: server.Server 新增分流通道创建和关闭事件

This commit is contained in:
kercylan98 2023-08-01 15:07:33 +08:00
parent c92f16c170
commit b9d953338f
3 changed files with 37 additions and 0 deletions

View File

@ -22,6 +22,8 @@ type MessageLowExecEventHandle func(srv *Server, message *Message, cost time.Dur
type ConsoleCommandEventHandle func(srv *Server)
type ConnectionOpenedAfterEventHandle func(srv *Server, conn *Conn)
type ConnectionWritePacketBeforeEventHandle func(srv *Server, conn *Conn, packet Packet) Packet
type ShuntChannelCreatedEventHandle func(srv *Server, guid int64)
type ShuntChannelClosedEventHandle func(srv *Server, guid int64)
type event struct {
*Server
@ -36,6 +38,8 @@ type event struct {
messageLowExecEventHandles []MessageLowExecEventHandle
connectionOpenedAfterEventHandles []ConnectionOpenedAfterEventHandle
connectionWritePacketBeforeHandles []ConnectionWritePacketBeforeEventHandle
shuntChannelCreatedEventHandles []ShuntChannelCreatedEventHandle
shuntChannelClosedEventHandles []ShuntChannelClosedEventHandle
consoleCommandEventHandles map[string][]ConsoleCommandEventHandle
consoleCommandEventHandleInitOnce sync.Once
@ -251,6 +255,34 @@ func (slf *event) OnConnectionWritePacketBeforeEvent(conn *Conn, packet Packet)
return newPacket
}
// RegShuntChannelCreatedEvent 在分流通道创建时将立刻执行被注册的事件处理函数
func (slf *event) RegShuntChannelCreatedEvent(handle ShuntChannelCreatedEventHandle) {
slf.shuntChannelCreatedEventHandles = append(slf.shuntChannelCreatedEventHandles, handle)
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
}
func (slf *event) OnShuntChannelCreatedEvent(guid int64) {
PushSystemMessage(slf.Server, func() {
for _, handle := range slf.shuntChannelCreatedEventHandles {
handle(slf.Server, guid)
}
}, "ShuntChannelCreatedEvent")
}
// RegShuntChannelCloseEvent 在分流通道关闭时将立刻执行被注册的事件处理函数
func (slf *event) RegShuntChannelCloseEvent(handle ShuntChannelClosedEventHandle) {
slf.shuntChannelClosedEventHandles = append(slf.shuntChannelClosedEventHandles, handle)
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
}
func (slf *event) OnShuntChannelClosedEvent(guid int64) {
PushSystemMessage(slf.Server, func() {
for _, handle := range slf.shuntChannelClosedEventHandles {
handle(slf.Server, guid)
}
}, "ShuntChannelCloseEvent")
}
func (slf *event) check() {
switch slf.network {
case NetworkHttp, NetworkGRPC:

View File

@ -238,6 +238,9 @@ func WithPProf(pattern ...string) Option {
//
// 将被分流的消息类型(更多类型有待斟酌):
// - MessageTypePacket
//
// 注意事项:
// - 需要在分流通道使用完成后主动调用 Server.ShuntChannelFreed 函数释放分流通道,避免内存泄漏
func WithShunt(channelGenerator func(guid int64) chan *Message, shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool)) Option {
return func(srv *Server) {
if channelGenerator == nil || shuntMatcher == nil {

View File

@ -489,6 +489,7 @@ func (slf *Server) ShuntChannelFreed(channelGuid int64) {
if exist {
close(channel)
slf.shuntChannels.Delete(channelGuid)
slf.OnShuntChannelClosedEvent(channelGuid)
}
}
@ -510,6 +511,7 @@ func (slf *Server) pushMessage(message *Message) {
slf.dispatchMessage(message)
}
}(channel)
defer slf.OnShuntChannelCreatedEvent(channelGuid)
}
if channel != nil {
channel <- message