refactor: 私有化服务器 PushMessage 函数,移除 PushCrossMessage 函数,改为使用 server.PushXXXMessage 函数

This commit is contained in:
kercylan98 2023-07-07 13:38:26 +08:00
parent 7e67775157
commit 6d27433c4b
6 changed files with 61 additions and 46 deletions

View File

@ -13,9 +13,7 @@ func main() {
srv := server.New(server.NetworkWebsocket, server.WithCross("nats", 1, cross.NewNats("127.0.0.1:4222")), server.WithTicker(10, false)) srv := server.New(server.NetworkWebsocket, server.WithCross("nats", 1, cross.NewNats("127.0.0.1:4222")), server.WithTicker(10, false))
srv.RegStartFinishEvent(func(srv *server.Server) { srv.RegStartFinishEvent(func(srv *server.Server) {
srv.Ticker().Loop("CROSS", timer.Instantly, time.Second, timer.Forever, func() { srv.Ticker().Loop("CROSS", timer.Instantly, time.Second, timer.Forever, func() {
if err := srv.PushCrossMessage("nats", 2, []byte("I am cross 1")); err != nil { server.PushCrossMessage(srv, "nats", 2, []byte("I am cross 1"))
panic(err)
}
}) })
}) })
srv.RegReceiveCrossPacketEvent(func(srv *server.Server, senderServerId int64, packet []byte) { srv.RegReceiveCrossPacketEvent(func(srv *server.Server, senderServerId int64, packet []byte) {

View File

@ -13,9 +13,7 @@ func main() {
srv := server.New(server.NetworkWebsocket, server.WithCross("nats", 2, cross.NewNats("127.0.0.1:4222")), server.WithTicker(10, false)) srv := server.New(server.NetworkWebsocket, server.WithCross("nats", 2, cross.NewNats("127.0.0.1:4222")), server.WithTicker(10, false))
srv.RegStartFinishEvent(func(srv *server.Server) { srv.RegStartFinishEvent(func(srv *server.Server) {
srv.Ticker().Loop("CROSS", timer.Instantly, time.Second, timer.Forever, func() { srv.Ticker().Loop("CROSS", timer.Instantly, time.Second, timer.Forever, func() {
if err := srv.PushCrossMessage("nats", 1, []byte("I am cross 2")); err != nil { server.PushCrossMessage(srv, "nats", 1, []byte("I am cross 2"))
panic(err)
}
}) })
}) })
srv.RegReceiveCrossPacketEvent(func(srv *server.Server, senderServerId int64, packet []byte) { srv.RegReceiveCrossPacketEvent(func(srv *server.Server, senderServerId int64, packet []byte) {

View File

@ -48,7 +48,7 @@ func (slf *gNet) AfterWrite(c gnet.Conn, b []byte) {
} }
func (slf *gNet) React(packet []byte, c gnet.Conn) (out []byte, action gnet.Action) { func (slf *gNet) React(packet []byte, c gnet.Conn) (out []byte, action gnet.Action) {
slf.Server.PushMessage(MessageTypePacket, c.Context().(*Conn), bytes.Clone(packet)) slf.Server.pushMessage(MessageTypePacket, c.Context().(*Conn), bytes.Clone(packet))
return nil, gnet.None return nil, gnet.None
} }

View File

@ -1,24 +1,22 @@
package server package server
import "runtime/debug"
const ( const (
// MessageTypePacket 数据包消息类型:该类型的数据将被发送到 ConnectionReceivePacketEvent 进行处理 // MessageTypePacket 数据包消息类型:该类型的数据将被发送到 ConnectionReceivePacketEvent 进行处理
// - *server.Conn
// - []byte
MessageTypePacket MessageType = iota MessageTypePacket MessageType = iota
// MessageTypeError 错误消息类型:根据不同的错误状态,将交由 Server 进行统一处理 // MessageTypeError 错误消息类型:根据不同的错误状态,将交由 Server 进行统一处理
// - error
// - server.MessageErrorAction
MessageTypeError MessageTypeError
// MessageTypeCross 跨服消息类型:将被推送到跨服的 Cross 实现中进行处理 // MessageTypeCross 跨服消息类型:将被推送到跨服的 Cross 实现中进行处理
// - int64(serverId)
// - []byte
MessageTypeCross MessageTypeCross
// MessageTypeTicker 定时器消息类型 // MessageTypeTicker 定时器消息类型
// - func()
MessageTypeTicker MessageTypeTicker
// MessageTypeAsync 异步消息类型
MessageTypeAsync
) )
var messageNames = map[MessageType]string{ var messageNames = map[MessageType]string{
@ -129,3 +127,42 @@ func (slf MessageType) deconstructTicker(attrs ...any) (caller func()) {
} }
return return
} }
// PushWebsocketPacketMessage 向特定服务器中推送 WebsocketPacket 消息
func PushWebsocketPacketMessage(srv *Server, conn *Conn, packet []byte, messageType int) {
msg := srv.messagePool.Get()
msg.t = MessageTypePacket
msg.attrs = []any{conn, packet, messageType}
srv.pushMessage(msg)
}
// PushPacketMessage 向特定服务器中推送 Packet 消息
func PushPacketMessage(srv *Server, conn *Conn, packet []byte) {
msg := srv.messagePool.Get()
msg.t = MessageTypePacket
msg.attrs = []any{conn, packet}
srv.pushMessage(msg)
}
// PushErrorMessage 向特定服务器中推送 Error 消息
func PushErrorMessage(srv *Server, err error, action MessageErrorAction) {
msg := srv.messagePool.Get()
msg.t = MessageTypeError
msg.attrs = []any{err, action, string(debug.Stack())}
srv.pushMessage(msg)
}
// PushCrossMessage 向特定服务器中推送 Cross 消息
func PushCrossMessage(srv *Server, crossName string, serverId int64, packet []byte) {
if len(srv.cross) == 0 {
return
}
_, exist := srv.cross[crossName]
if !exist {
return
}
msg := srv.messagePool.Get()
msg.t = MessageTypeCross
msg.attrs = []any{serverId, packet}
srv.pushMessage(msg)
}

View File

@ -45,7 +45,7 @@ func WithTicker(size int, autonomy bool) Option {
srv.ticker = timer.GetTicker(size) srv.ticker = timer.GetTicker(size)
} else { } else {
srv.ticker = timer.GetTicker(size, timer.WithCaller(func(name string, caller func()) { srv.ticker = timer.GetTicker(size, timer.WithCaller(func(name string, caller func()) {
srv.PushMessage(MessageTypeTicker, caller) srv.pushMessage(MessageTypeTicker, caller)
})) }))
} }
} }
@ -64,7 +64,7 @@ func WithCross(crossName string, serverId int64, cross Cross) Option {
} }
srv.cross[crossName] = cross srv.cross[crossName] = cross
err := cross.Init(srv, func(serverId int64, packet []byte) { err := cross.Init(srv, func(serverId int64, packet []byte) {
srv.PushMessage(MessageTypeCross, serverId, packet) srv.pushMessage(MessageTypeCross, serverId, packet)
}) })
if err != nil { if err != nil {
log.Info("Cross", zap.Int64("ServerID", serverId), zap.String("Cross", reflect.TypeOf(cross).String()), zap.String("State", "WaitNatsRun")) log.Info("Cross", zap.Int64("ServerID", serverId), zap.String("Cross", reflect.TypeOf(cross).String()), zap.String("State", "WaitNatsRun"))

View File

@ -19,7 +19,6 @@ import (
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"runtime/debug"
"strings" "strings"
"sync/atomic" "sync/atomic"
"syscall" "syscall"
@ -144,7 +143,7 @@ func (slf *Server) Run(addr string) error {
slf.OnStartBeforeEvent() slf.OnStartBeforeEvent()
if err := slf.grpcServer.Serve(listener); err != nil { if err := slf.grpcServer.Serve(listener); err != nil {
slf.isRunning = false slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) PushErrorMessage(slf, err, MessageErrorActionShutdown)
} }
}() }()
case NetworkTcp, NetworkTcp4, NetworkTcp6, NetworkUdp, NetworkUdp4, NetworkUdp6, NetworkUnix: case NetworkTcp, NetworkTcp4, NetworkTcp6, NetworkUdp, NetworkUdp4, NetworkUdp6, NetworkUnix:
@ -158,7 +157,7 @@ func (slf *Server) Run(addr string) error {
gnet.WithMulticore(true), gnet.WithMulticore(true),
); err != nil { ); err != nil {
slf.isRunning = false slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) PushErrorMessage(slf, err, MessageErrorActionShutdown)
} }
}) })
case NetworkKcp: case NetworkKcp:
@ -191,7 +190,7 @@ func (slf *Server) Run(addr string) error {
if err != nil { if err != nil {
panic(err) panic(err)
} }
slf.PushMessage(MessageTypePacket, conn, buf[:n]) PushPacketMessage(slf, conn, buf[:n])
} }
}(conn) }(conn)
} }
@ -209,12 +208,12 @@ func (slf *Server) Run(addr string) error {
if len(slf.certFile)+len(slf.keyFile) > 0 { if len(slf.certFile)+len(slf.keyFile) > 0 {
if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil { if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil {
slf.isRunning = false slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) PushErrorMessage(slf, err, MessageErrorActionShutdown)
} }
} else { } else {
if err := slf.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { if err := slf.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
slf.isRunning = false slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) PushErrorMessage(slf, err, MessageErrorActionShutdown)
} }
} }
@ -275,7 +274,7 @@ func (slf *Server) Run(addr string) error {
if len(slf.supportMessageTypes) > 0 && !slf.supportMessageTypes[messageType] { if len(slf.supportMessageTypes) > 0 && !slf.supportMessageTypes[messageType] {
panic(ErrWebsocketIllegalMessageType) panic(ErrWebsocketIllegalMessageType)
} }
slf.PushMessage(MessageTypePacket, conn, packet, messageType) PushWebsocketPacketMessage(slf, conn, packet, messageType)
} }
}) })
go func() { go func() {
@ -284,12 +283,12 @@ func (slf *Server) Run(addr string) error {
if len(slf.certFile)+len(slf.keyFile) > 0 { if len(slf.certFile)+len(slf.keyFile) > 0 {
if err := http.ListenAndServeTLS(slf.addr, slf.certFile, slf.keyFile, nil); err != nil { if err := http.ListenAndServeTLS(slf.addr, slf.certFile, slf.keyFile, nil); err != nil {
slf.isRunning = false slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) PushErrorMessage(slf, err, MessageErrorActionShutdown)
} }
} else { } else {
if err := http.ListenAndServe(slf.addr, nil); err != nil { if err := http.ListenAndServe(slf.addr, nil); err != nil {
slf.isRunning = false slf.isRunning = false
slf.PushMessage(MessageTypeError, err, MessageErrorActionShutdown) PushErrorMessage(slf, err, MessageErrorActionShutdown)
} }
} }
@ -426,30 +425,13 @@ func (slf *Server) HttpRouter() gin.IRouter {
return slf.ginServer return slf.ginServer
} }
// PushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求 // pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求
func (slf *Server) PushMessage(messageType MessageType, attrs ...any) { func (slf *Server) pushMessage(message *Message) {
if slf.messagePool.IsClose() { if slf.messagePool.IsClose() {
slf.messagePool.Release(message)
return return
} }
msg := slf.messagePool.Get() slf.messageChannel <- message
msg.t = messageType
msg.attrs = attrs
if msg.t == MessageTypeError {
msg.attrs = append(msg.attrs, string(debug.Stack()))
}
slf.messageChannel <- msg
}
// PushCrossMessage 推送跨服消息到特定跨服的服务器中
func (slf *Server) PushCrossMessage(crossName string, serverId int64, packet []byte) error {
if len(slf.cross) == 0 {
return ErrNoSupportCross
}
cross, exist := slf.cross[crossName]
if !exist {
return ErrUnregisteredCrossName
}
return cross.PushMessage(serverId, packet)
} }
// dispatchMessage 消息分发 // dispatchMessage 消息分发