跨服支持名称隔离

This commit is contained in:
kercylan98 2023-05-16 12:51:08 +08:00
parent 1c3b27e900
commit ad1499e9b0
5 changed files with 42 additions and 20 deletions

View File

@ -65,3 +65,7 @@ func (slf *Nats) PushMessage(serverId int64, packet []byte) error {
} }
return slf.conn.Publish(fmt.Sprintf("%s_%d", slf.subject, serverId), data) return slf.conn.Publish(fmt.Sprintf("%s_%d", slf.subject, serverId), data)
} }
func (slf *Nats) Release() {
slf.conn.Close()
}

View File

@ -9,4 +9,6 @@ type Cross interface {
// PushMessage 推送跨服消息 // PushMessage 推送跨服消息
// - serverId: 目标服务器id // - serverId: 目标服务器id
PushMessage(serverId int64, packet []byte) error PushMessage(serverId int64, packet []byte) error
// Release 释放资源
Release()
} }

View File

@ -21,4 +21,5 @@ var (
ErrOnlySupportSocket = errors.New("only supports Socket programming") ErrOnlySupportSocket = errors.New("only supports Socket programming")
ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server") ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server")
ErrNoSupportTicker = errors.New("the server does not support Ticker, please use the WithTicker option to create the server") ErrNoSupportTicker = errors.New("the server does not support Ticker, please use the WithTicker option to create the server")
ErrUnregisteredCrossName = errors.New("unregistered cross name, please use the WithCross option to create the server")
) )

View File

@ -40,11 +40,16 @@ func WithTicker(size int, autonomy bool) Option {
} }
// WithCross 通过跨服的方式创建服务器 // WithCross 通过跨服的方式创建服务器
func WithCross(serverId int64, cross Cross) Option { // - 推送跨服消息时将推送到对应crossName的跨服中间件中crossName可以满足不同功能采用不同的跨服/消息中间件
// - 通常情况下crossName仅需一个即可
func WithCross(crossName string, serverId int64, cross Cross) Option {
return func(srv *Server) { return func(srv *Server) {
srv.id = serverId srv.id = serverId
srv.cross = cross if srv.cross == nil {
err := srv.cross.Init(srv, func(serverId int64, packet []byte) { srv.cross = map[string]Cross{}
}
srv.cross[crossName] = cross
err := cross.Init(srv, func(serverId int64, packet []byte) {
srv.PushMessage(MessageTypeCross, serverId, packet) srv.PushMessage(MessageTypeCross, serverId, packet)
}) })
if err != nil { if err != nil {

View File

@ -53,19 +53,19 @@ func New(network Network, options ...Option) *Server {
// Server 网络服务器 // Server 网络服务器
type Server struct { type Server struct {
*event // 事件 *event // 事件
cross Cross // 跨服 cross map[string]Cross // 跨服
id int64 // 服务器id id int64 // 服务器id
network Network // 网络类型 network Network // 网络类型
addr string // 侦听地址 addr string // 侦听地址
options []Option // 选项 options []Option // 选项
ginServer *gin.Engine // HTTP模式下的路由器 ginServer *gin.Engine // HTTP模式下的路由器
httpServer *http.Server // HTTP模式下的服务器 httpServer *http.Server // HTTP模式下的服务器
grpcServer *grpc.Server // GRPC模式下的服务器 grpcServer *grpc.Server // GRPC模式下的服务器
supportMessageTypes map[int]bool // websocket模式下支持的消息类型 supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件 certFile, keyFile string // TLS文件
isShutdown atomic.Bool // 是否已关闭 isShutdown atomic.Bool // 是否已关闭
closeChannel chan struct{} // 关闭信号 closeChannel chan struct{} // 关闭信号
gServer *gNet // TCP或UDP模式下的服务器 gServer *gNet // TCP或UDP模式下的服务器
messagePool *synchronization.Pool[*message] // 消息池 messagePool *synchronization.Pool[*message] // 消息池
@ -345,6 +345,12 @@ func (slf *Server) Ticker() *timer.Ticker {
// Shutdown 停止运行服务器 // Shutdown 停止运行服务器
func (slf *Server) Shutdown(err error) { func (slf *Server) Shutdown(err error) {
slf.isShutdown.Store(true) slf.isShutdown.Store(true)
if slf.ticker != nil {
slf.ticker.Release()
}
for _, cross := range slf.cross {
cross.Release()
}
if len(slf.diversionMessageChannels) > 0 { if len(slf.diversionMessageChannels) > 0 {
for i := 0; i < len(slf.diversionMessageChannels); i++ { for i := 0; i < len(slf.diversionMessageChannels); i++ {
close(slf.diversionMessageChannels[i]) close(slf.diversionMessageChannels[i])
@ -411,12 +417,16 @@ func (slf *Server) PushMessage(messageType MessageType, attrs ...any) {
} }
} }
// PushCrossMessage 推送跨服消息 // PushCrossMessage 推送跨服消息到特定跨服的服务器中
func (slf *Server) PushCrossMessage(serverId int64, packet []byte) error { func (slf *Server) PushCrossMessage(crossName string, serverId int64, packet []byte) error {
if slf.cross == nil { if len(slf.cross) == 0 {
return ErrNoSupportCross return ErrNoSupportCross
} }
return slf.cross.PushMessage(serverId, packet) cross, exist := slf.cross[crossName]
if !exist {
return ErrUnregisteredCrossName
}
return cross.PushMessage(serverId, packet)
} }
// dispatchMessage 消息分发 // dispatchMessage 消息分发