去除消息分流、设计不合理

This commit is contained in:
kercylan98 2023-05-19 14:19:11 +08:00
parent 746eb7cacd
commit 3effa6bb91
2 changed files with 1 additions and 43 deletions

View File

@ -1,7 +1,6 @@
package server
import (
"github.com/kercylan98/minotaur/utils/hash"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/timer"
"go.uber.org/zap"
@ -59,25 +58,6 @@ func WithCross(crossName string, serverId int64, cross Cross) Option {
}
}
// WithMessageDiversion 通过消息分流的方式创建服务器
// - 连接消息分流后消息将会从其他消息类型中独立出来,并且由多个消息管道及协程进行处理
// - 默认不会进行消息分流
// - 需要注意并发编程
func WithMessageDiversion(diversionNumber, channelSize int) Option {
return func(srv *Server) {
if srv.network == NetworkHttp || srv.network == NetworkGRPC {
log.Warn("WithMessageDiversion", zap.String("Network", string(srv.network)), zap.Error(ErrOnlySupportSocket))
return
}
srv.diversionMessageChannels = make([]chan *message, diversionNumber)
srv.diversionConsistency = hash.NewConsistency(3)
for i := 0; i < diversionNumber; i++ {
srv.diversionMessageChannels[i] = make(chan *message, channelSize)
srv.diversionConsistency.AddNode(i + 1)
}
}
}
// WithTLS 通过安全传输层协议TLS创建服务器
// - 支持Http、Websocket
func WithTLS(certFile, keyFile string) Option {

View File

@ -5,7 +5,6 @@ import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/kercylan98/minotaur/utils/hash"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/synchronization"
"github.com/kercylan98/minotaur/utils/timer"
@ -75,8 +74,6 @@ type Server struct {
multiple bool // 是否为多服务器模式下运行
prod bool // 是否为生产模式
core int // 消息处理核心数
diversionMessageChannels []chan *message // 分流消息管道
diversionConsistency *hash.Consistency // 哈希一致性分流器
websocketWriteMessageType int // websocket写入的消息类型
ticker *timer.Ticker // 定时器
}
@ -127,15 +124,6 @@ func (slf *Server) Run(addr string) error {
slf.dispatchMessage(message)
}
}()
go func() {
for i := 0; i < len(slf.diversionMessageChannels); i++ {
go func(channel chan *message) {
for message := range channel {
slf.dispatchMessage(message)
}
}(slf.diversionMessageChannels[i])
}
}()
}
}
@ -350,11 +338,6 @@ func (slf *Server) Shutdown(err error) {
for _, cross := range slf.cross {
cross.Release()
}
if len(slf.diversionMessageChannels) > 0 {
for i := 0; i < len(slf.diversionMessageChannels); i++ {
close(slf.diversionMessageChannels[i])
}
}
if slf.initMessageChannel {
if slf.gServer != nil {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
@ -408,12 +391,7 @@ func (slf *Server) PushMessage(messageType MessageType, attrs ...any) {
msg := slf.messagePool.Get()
msg.t = messageType
msg.attrs = attrs
if messageType == MessageTypePacket && len(slf.diversionMessageChannels) > 0 {
conn := attrs[0].(*Conn)
slf.diversionMessageChannels[slf.diversionConsistency.PickNode(conn.ip)] <- msg
} else {
slf.messageChannel <- msg
}
slf.messageChannel <- msg
}
// PushCrossMessage 推送跨服消息到特定跨服的服务器中