diff --git a/server/message.go b/server/message.go index d6078da..8301138 100644 --- a/server/message.go +++ b/server/message.go @@ -187,6 +187,12 @@ func (slf *Message) GetTickerMessageAttrs() (caller func()) { // PushTickerMessage 向特定服务器中推送 MessageTypeTicker 消息 func PushTickerMessage(srv *Server, caller func(), mark ...any) { + srv.messageLock.RLock() + if srv.messagePool == nil { + srv.messageLock.RUnlock() + return + } + srv.messageLock.RUnlock() msg := srv.messagePool.Get() msg.t = MessageTypeTicker msg.attrs = append([]any{caller}, mark...) diff --git a/server/multiple.go b/server/multiple.go index 7848108..e4ec259 100644 --- a/server/multiple.go +++ b/server/multiple.go @@ -34,13 +34,15 @@ func (slf *MultipleServer) Run() { close(exceptionChannel) close(runtimeExceptionChannel) }() - var running = make([]*Server, 0, len(slf.servers)) var wait sync.WaitGroup for i := 0; i < len(slf.servers); i++ { wait.Add(1) go func(address string, server *Server) { + var lock sync.Mutex var startFinish bool server.startFinishEventHandles.Append(func(srv *Server) { + lock.Lock() + defer lock.Unlock() if !startFinish { startFinish = true wait.Done() @@ -50,9 +52,9 @@ func (slf *MultipleServer) Run() { server.multipleRuntimeErrorChan = runtimeExceptionChannel if err := server.Run(address); err != nil { exceptionChannel <- err - } else { - running = append(running, server) } + lock.Lock() + defer lock.Unlock() if !startFinish { startFinish = true wait.Done() diff --git a/server/server.go b/server/server.go index 3a5c1eb..b632fc6 100644 --- a/server/server.go +++ b/server/server.go @@ -24,6 +24,7 @@ import ( "os/signal" "runtime/debug" "strings" + "sync" "sync/atomic" "syscall" "time" @@ -77,23 +78,23 @@ func New(network Network, options ...Option) *Server { // Server 网络服务器 type Server struct { - *event // 事件 - *runtime // 运行时 - *option // 可选项 - network Network // 网络类型 - addr string // 侦听地址 - systemSignal chan os.Signal // 系统信号 - online *concurrent.BalanceMap[string, *Conn] // 在线连接 - ginServer *gin.Engine // HTTP模式下的路由器 - httpServer *http.Server // HTTP模式下的服务器 - grpcServer *grpc.Server // GRPC模式下的服务器 - gServer *gNet // TCP或UDP模式下的服务器 - isRunning bool // 是否正在运行 - isShutdown atomic.Bool // 是否已关闭 - closeChannel chan struct{} // 关闭信号 - ants *ants.Pool // 协程池 - messagePool *concurrent.Pool[*Message] // 消息池 - //messageChannel chan *Message // 消息管道 + *event // 事件 + *runtime // 运行时 + *option // 可选项 + network Network // 网络类型 + addr string // 侦听地址 + systemSignal chan os.Signal // 系统信号 + online *concurrent.BalanceMap[string, *Conn] // 在线连接 + ginServer *gin.Engine // HTTP模式下的路由器 + httpServer *http.Server // HTTP模式下的服务器 + grpcServer *grpc.Server // GRPC模式下的服务器 + gServer *gNet // TCP或UDP模式下的服务器 + isRunning bool // 是否正在运行 + isShutdown atomic.Bool // 是否已关闭 + closeChannel chan struct{} // 关闭信号 + ants *ants.Pool // 协程池 + messagePool *concurrent.Pool[*Message] // 消息池 + messageLock sync.RWMutex // 消息锁 messageChannel *buffer.Unbounded[*Message] // 消息无界缓冲区 multiple *MultipleServer // 多服务器模式下的服务器 multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 @@ -128,6 +129,7 @@ func (slf *Server) Run(addr string) error { var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr) var messageInitFinish = make(chan struct{}, 1) var connectionInitHandle = func(callback func()) { + slf.messageLock.Lock() slf.messagePool = concurrent.NewPool[*Message](slf.messagePoolSize, func() *Message { return &Message{} @@ -140,6 +142,7 @@ func (slf *Server) Run(addr string) error { slf.messageChannel = buffer.NewUnbounded[*Message](func() *Message { return nil }) + slf.messageLock.Unlock() if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC { slf.gServer = &gNet{Server: slf} }