fix: server 包数据竞态问题优化
This commit is contained in:
parent
859e0a1ac1
commit
cdbf388498
|
@ -187,6 +187,12 @@ func (slf *Message) GetTickerMessageAttrs() (caller func()) {
|
||||||
|
|
||||||
// PushTickerMessage 向特定服务器中推送 MessageTypeTicker 消息
|
// PushTickerMessage 向特定服务器中推送 MessageTypeTicker 消息
|
||||||
func PushTickerMessage(srv *Server, caller func(), mark ...any) {
|
func PushTickerMessage(srv *Server, caller func(), mark ...any) {
|
||||||
|
srv.messageLock.RLock()
|
||||||
|
if srv.messagePool == nil {
|
||||||
|
srv.messageLock.RUnlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
srv.messageLock.RUnlock()
|
||||||
msg := srv.messagePool.Get()
|
msg := srv.messagePool.Get()
|
||||||
msg.t = MessageTypeTicker
|
msg.t = MessageTypeTicker
|
||||||
msg.attrs = append([]any{caller}, mark...)
|
msg.attrs = append([]any{caller}, mark...)
|
||||||
|
|
|
@ -34,13 +34,15 @@ func (slf *MultipleServer) Run() {
|
||||||
close(exceptionChannel)
|
close(exceptionChannel)
|
||||||
close(runtimeExceptionChannel)
|
close(runtimeExceptionChannel)
|
||||||
}()
|
}()
|
||||||
var running = make([]*Server, 0, len(slf.servers))
|
|
||||||
var wait sync.WaitGroup
|
var wait sync.WaitGroup
|
||||||
for i := 0; i < len(slf.servers); i++ {
|
for i := 0; i < len(slf.servers); i++ {
|
||||||
wait.Add(1)
|
wait.Add(1)
|
||||||
go func(address string, server *Server) {
|
go func(address string, server *Server) {
|
||||||
|
var lock sync.Mutex
|
||||||
var startFinish bool
|
var startFinish bool
|
||||||
server.startFinishEventHandles.Append(func(srv *Server) {
|
server.startFinishEventHandles.Append(func(srv *Server) {
|
||||||
|
lock.Lock()
|
||||||
|
defer lock.Unlock()
|
||||||
if !startFinish {
|
if !startFinish {
|
||||||
startFinish = true
|
startFinish = true
|
||||||
wait.Done()
|
wait.Done()
|
||||||
|
@ -50,9 +52,9 @@ func (slf *MultipleServer) Run() {
|
||||||
server.multipleRuntimeErrorChan = runtimeExceptionChannel
|
server.multipleRuntimeErrorChan = runtimeExceptionChannel
|
||||||
if err := server.Run(address); err != nil {
|
if err := server.Run(address); err != nil {
|
||||||
exceptionChannel <- err
|
exceptionChannel <- err
|
||||||
} else {
|
|
||||||
running = append(running, server)
|
|
||||||
}
|
}
|
||||||
|
lock.Lock()
|
||||||
|
defer lock.Unlock()
|
||||||
if !startFinish {
|
if !startFinish {
|
||||||
startFinish = true
|
startFinish = true
|
||||||
wait.Done()
|
wait.Done()
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
@ -77,23 +78,23 @@ func New(network Network, options ...Option) *Server {
|
||||||
|
|
||||||
// Server 网络服务器
|
// Server 网络服务器
|
||||||
type Server struct {
|
type Server struct {
|
||||||
*event // 事件
|
*event // 事件
|
||||||
*runtime // 运行时
|
*runtime // 运行时
|
||||||
*option // 可选项
|
*option // 可选项
|
||||||
network Network // 网络类型
|
network Network // 网络类型
|
||||||
addr string // 侦听地址
|
addr string // 侦听地址
|
||||||
systemSignal chan os.Signal // 系统信号
|
systemSignal chan os.Signal // 系统信号
|
||||||
online *concurrent.BalanceMap[string, *Conn] // 在线连接
|
online *concurrent.BalanceMap[string, *Conn] // 在线连接
|
||||||
ginServer *gin.Engine // HTTP模式下的路由器
|
ginServer *gin.Engine // HTTP模式下的路由器
|
||||||
httpServer *http.Server // HTTP模式下的服务器
|
httpServer *http.Server // HTTP模式下的服务器
|
||||||
grpcServer *grpc.Server // GRPC模式下的服务器
|
grpcServer *grpc.Server // GRPC模式下的服务器
|
||||||
gServer *gNet // TCP或UDP模式下的服务器
|
gServer *gNet // TCP或UDP模式下的服务器
|
||||||
isRunning bool // 是否正在运行
|
isRunning bool // 是否正在运行
|
||||||
isShutdown atomic.Bool // 是否已关闭
|
isShutdown atomic.Bool // 是否已关闭
|
||||||
closeChannel chan struct{} // 关闭信号
|
closeChannel chan struct{} // 关闭信号
|
||||||
ants *ants.Pool // 协程池
|
ants *ants.Pool // 协程池
|
||||||
messagePool *concurrent.Pool[*Message] // 消息池
|
messagePool *concurrent.Pool[*Message] // 消息池
|
||||||
//messageChannel chan *Message // 消息管道
|
messageLock sync.RWMutex // 消息锁
|
||||||
messageChannel *buffer.Unbounded[*Message] // 消息无界缓冲区
|
messageChannel *buffer.Unbounded[*Message] // 消息无界缓冲区
|
||||||
multiple *MultipleServer // 多服务器模式下的服务器
|
multiple *MultipleServer // 多服务器模式下的服务器
|
||||||
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
|
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
|
||||||
|
@ -128,6 +129,7 @@ func (slf *Server) Run(addr string) error {
|
||||||
var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr)
|
var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr)
|
||||||
var messageInitFinish = make(chan struct{}, 1)
|
var messageInitFinish = make(chan struct{}, 1)
|
||||||
var connectionInitHandle = func(callback func()) {
|
var connectionInitHandle = func(callback func()) {
|
||||||
|
slf.messageLock.Lock()
|
||||||
slf.messagePool = concurrent.NewPool[*Message](slf.messagePoolSize,
|
slf.messagePool = concurrent.NewPool[*Message](slf.messagePoolSize,
|
||||||
func() *Message {
|
func() *Message {
|
||||||
return &Message{}
|
return &Message{}
|
||||||
|
@ -140,6 +142,7 @@ func (slf *Server) Run(addr string) error {
|
||||||
slf.messageChannel = buffer.NewUnbounded[*Message](func() *Message {
|
slf.messageChannel = buffer.NewUnbounded[*Message](func() *Message {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
slf.messageLock.Unlock()
|
||||||
if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC {
|
if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC {
|
||||||
slf.gServer = &gNet{Server: slf}
|
slf.gServer = &gNet{Server: slf}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue