diff --git a/server/server.go b/server/server.go index 11bdc65..3c771d2 100644 --- a/server/server.go +++ b/server/server.go @@ -69,7 +69,7 @@ type Server struct { gServer *gNet // TCP或UDP模式下的服务器 messagePool *synchronization.Pool[*message] // 消息池 messagePoolSize int // 消息池大小 - messageChannel chan *message // 消息管道 + messageChannel map[int]chan *message // 消息管道 initMessageChannel bool // 消息管道是否已经初始化 multiple bool // 是否为多服务器模式下运行 prod bool // 是否为生产模式 @@ -111,16 +111,20 @@ func (slf *Server) Run(addr string) error { data.attrs = nil }, ) - slf.messageChannel = make(chan *message, 4096*1000) + slf.messageChannel = map[int]chan *message{} + for i := 0; i < slf.core; i++ { + slf.messageChannel[i] = make(chan *message, 4096*1000) + } if slf.network != NetworkHttp && slf.network != NetworkWebsocket { slf.gServer = &gNet{Server: slf} } if callback != nil { go callback() } - for i := 0; i < slf.core; i++ { + for _, messageChannel := range slf.messageChannel { + messageChannel := messageChannel go func() { - for message := range slf.messageChannel { + for message := range messageChannel { slf.dispatchMessage(message) } }() @@ -344,7 +348,9 @@ func (slf *Server) Shutdown(err error) { log.Error("Server", zap.Error(shutdownErr)) } } - close(slf.messageChannel) + for _, messageChannel := range slf.messageChannel { + close(messageChannel) + } slf.messagePool.Close() slf.initMessageChannel = false } @@ -389,7 +395,10 @@ func (slf *Server) PushMessage(messageType MessageType, attrs ...any) { msg := slf.messagePool.Get() msg.t = messageType msg.attrs = attrs - slf.messageChannel <- msg + for _, channel := range slf.messageChannel { + channel <- msg + break + } } // PushCrossMessage 推送跨服消息到特定跨服的服务器中