diff --git a/server/options.go b/server/options.go index 2ccf809..6c23474 100644 --- a/server/options.go +++ b/server/options.go @@ -1,7 +1,6 @@ package server import ( - "github.com/kercylan98/minotaur/utils/hash" "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/timer" "go.uber.org/zap" @@ -59,25 +58,6 @@ func WithCross(crossName string, serverId int64, cross Cross) Option { } } -// WithMessageDiversion 通过消息分流的方式创建服务器 -// - 连接消息分流后消息将会从其他消息类型中独立出来,并且由多个消息管道及协程进行处理 -// - 默认不会进行消息分流 -// - 需要注意并发编程 -func WithMessageDiversion(diversionNumber, channelSize int) Option { - return func(srv *Server) { - if srv.network == NetworkHttp || srv.network == NetworkGRPC { - log.Warn("WithMessageDiversion", zap.String("Network", string(srv.network)), zap.Error(ErrOnlySupportSocket)) - return - } - srv.diversionMessageChannels = make([]chan *message, diversionNumber) - srv.diversionConsistency = hash.NewConsistency(3) - for i := 0; i < diversionNumber; i++ { - srv.diversionMessageChannels[i] = make(chan *message, channelSize) - srv.diversionConsistency.AddNode(i + 1) - } - } -} - // WithTLS 通过安全传输层协议TLS创建服务器 // - 支持:Http、Websocket func WithTLS(certFile, keyFile string) Option { diff --git a/server/server.go b/server/server.go index 98fbdfb..5cf90a3 100644 --- a/server/server.go +++ b/server/server.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" - "github.com/kercylan98/minotaur/utils/hash" "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/synchronization" "github.com/kercylan98/minotaur/utils/timer" @@ -75,8 +74,6 @@ type Server struct { multiple bool // 是否为多服务器模式下运行 prod bool // 是否为生产模式 core int // 消息处理核心数 - diversionMessageChannels []chan *message // 分流消息管道 - diversionConsistency *hash.Consistency // 哈希一致性分流器 websocketWriteMessageType int // websocket写入的消息类型 ticker *timer.Ticker // 定时器 } @@ -127,15 +124,6 @@ func (slf *Server) Run(addr string) error { slf.dispatchMessage(message) } }() - go func() { - for i := 0; i < len(slf.diversionMessageChannels); i++ { - go func(channel chan *message) { - for message := range channel { - slf.dispatchMessage(message) - } - }(slf.diversionMessageChannels[i]) - } - }() } } @@ -350,11 +338,6 @@ func (slf *Server) Shutdown(err error) { for _, cross := range slf.cross { cross.Release() } - if len(slf.diversionMessageChannels) > 0 { - for i := 0; i < len(slf.diversionMessageChannels); i++ { - close(slf.diversionMessageChannels[i]) - } - } if slf.initMessageChannel { if slf.gServer != nil { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -408,12 +391,7 @@ func (slf *Server) PushMessage(messageType MessageType, attrs ...any) { msg := slf.messagePool.Get() msg.t = messageType msg.attrs = attrs - if messageType == MessageTypePacket && len(slf.diversionMessageChannels) > 0 { - conn := attrs[0].(*Conn) - slf.diversionMessageChannels[slf.diversionConsistency.PickNode(conn.ip)] <- msg - } else { - slf.messageChannel <- msg - } + slf.messageChannel <- msg } // PushCrossMessage 推送跨服消息到特定跨服的服务器中