other: 新版 server 消息并发安全控制完善
This commit is contained in:
parent
7cb5dd069a
commit
ac929b6fcd
|
@ -1,6 +1,7 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"go.uber.org/atomic"
|
||||||
"net"
|
"net"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -41,17 +42,17 @@ func newConn(srv *server, c net.Conn, connWriter ConnWriter) *conn {
|
||||||
|
|
||||||
type conn struct {
|
type conn struct {
|
||||||
server *server
|
server *server
|
||||||
conn net.Conn // 连接
|
conn net.Conn // 连接
|
||||||
writer ConnWriter // 写入器
|
writer ConnWriter // 写入器
|
||||||
actor string // Actor 名称
|
actor atomic.String // Actor 名称
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conn) SetActor(actor string) {
|
func (c *conn) SetActor(actor string) {
|
||||||
c.actor = actor
|
c.actor.Store(actor)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conn) GetActor() string {
|
func (c *conn) GetActor() string {
|
||||||
return c.actor
|
return c.actor.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conn) WritePacket(packet Packet) error {
|
func (c *conn) WritePacket(packet Packet) error {
|
||||||
|
@ -72,7 +73,7 @@ func (c *conn) WriteContext(data []byte, context interface{}) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conn) PushSyncMessage(handler func(srv Server, conn Conn)) {
|
func (c *conn) PushSyncMessage(handler func(srv Server, conn Conn)) {
|
||||||
if err := c.server.reactor.Dispatch(c.actor, SyncMessage(c.server, func(srv *server) {
|
if err := c.server.reactor.AutoDispatch(c.GetActor(), SyncMessage(c.server, func(srv *server) {
|
||||||
handler(srv, c)
|
handler(srv, c)
|
||||||
})); err != nil {
|
})); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -80,7 +81,7 @@ func (c *conn) PushSyncMessage(handler func(srv Server, conn Conn)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *conn) PushAsyncMessage(handler func(srv Server, conn Conn) error, callbacks ...func(srv Server, conn Conn, err error)) {
|
func (c *conn) PushAsyncMessage(handler func(srv Server, conn Conn) error, callbacks ...func(srv Server, conn Conn, err error)) {
|
||||||
if err := c.server.reactor.Dispatch(c.actor, AsyncMessage(c.server, c.actor, func(srv *server) error {
|
if err := c.server.reactor.AutoDispatch(c.GetActor(), AsyncMessage(c.server, c.GetActor(), func(srv *server) error {
|
||||||
return handler(srv, c)
|
return handler(srv, c)
|
||||||
}, func(srv *server, err error) {
|
}, func(srv *server, err error) {
|
||||||
for _, callback := range callbacks {
|
for _, callback := range callbacks {
|
||||||
|
|
|
@ -2,7 +2,6 @@ package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/kercylan98/minotaur/server/internal/v2/queue"
|
"github.com/kercylan98/minotaur/server/internal/v2/queue"
|
||||||
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
|
|
||||||
"github.com/kercylan98/minotaur/utils/log/v2"
|
"github.com/kercylan98/minotaur/utils/log/v2"
|
||||||
"github.com/kercylan98/minotaur/utils/super"
|
"github.com/kercylan98/minotaur/utils/super"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
|
@ -44,11 +43,7 @@ type asyncMessage struct {
|
||||||
func (s *asyncMessage) Execute() {
|
func (s *asyncMessage) Execute() {
|
||||||
var q *queue.Queue[int, string, Message]
|
var q *queue.Queue[int, string, Message]
|
||||||
var dispatch = func(ident string, message Message, beforeHandler ...func(queue *queue.Queue[int, string, Message], msg Message)) {
|
var dispatch = func(ident string, message Message, beforeHandler ...func(queue *queue.Queue[int, string, Message], msg Message)) {
|
||||||
if ident == "" {
|
_ = s.srv.reactor.AutoDispatch(ident, message, beforeHandler...)
|
||||||
_ = s.srv.reactor.DispatchWithSystem(message, beforeHandler...)
|
|
||||||
} else {
|
|
||||||
_ = s.srv.reactor.Dispatch(ident, message, beforeHandler...)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dispatch(
|
dispatch(
|
||||||
|
@ -89,7 +84,7 @@ func (s *asyncMessage) Execute() {
|
||||||
})
|
})
|
||||||
}),
|
}),
|
||||||
func(queue *queue.Queue[int, string, Message], msg Message) {
|
func(queue *queue.Queue[int, string, Message], msg Message) {
|
||||||
queue.WaitAdd(reactor.SysIdent, 1)
|
queue.WaitAdd(s.ident, 1)
|
||||||
q = queue
|
q = queue
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
|
@ -85,6 +85,7 @@ func (opt *Options) IsDebug() bool {
|
||||||
func (opt *Options) WithLogger(logger *log.Logger) *Options {
|
func (opt *Options) WithLogger(logger *log.Logger) *Options {
|
||||||
return opt.modifyOptionsValue(func(opt *Options) {
|
return opt.modifyOptionsValue(func(opt *Options) {
|
||||||
opt.logger = logger
|
opt.logger = logger
|
||||||
|
opt.server.reactor.SetLogger(opt.logger)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -67,6 +67,16 @@ type Reactor[M queue.Message] struct {
|
||||||
errorHandler ErrorHandler[M] // 错误处理器
|
errorHandler ErrorHandler[M] // 错误处理器
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetLogger 设置日志记录器
|
||||||
|
func (r *Reactor[M]) SetLogger(logger *log.Logger) {
|
||||||
|
r.logger.Store(logger)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLogger 获取日志记录器
|
||||||
|
func (r *Reactor[M]) GetLogger() *log.Logger {
|
||||||
|
return r.logger.Load()
|
||||||
|
}
|
||||||
|
|
||||||
// process 消息处理
|
// process 消息处理
|
||||||
func (r *Reactor[M]) process(msg queue.MessageWrapper[int, string, M]) {
|
func (r *Reactor[M]) process(msg queue.MessageWrapper[int, string, M]) {
|
||||||
defer func(msg queue.MessageWrapper[int, string, M]) {
|
defer func(msg queue.MessageWrapper[int, string, M]) {
|
||||||
|
@ -74,20 +84,21 @@ func (r *Reactor[M]) process(msg queue.MessageWrapper[int, string, M]) {
|
||||||
if r.errorHandler != nil {
|
if r.errorHandler != nil {
|
||||||
r.errorHandler(msg, err)
|
r.errorHandler(msg, err)
|
||||||
} else {
|
} else {
|
||||||
r.logger.Load().Error("Reactor", log.Int("queue", msg.Queue().Id()), log.String("ident", msg.Ident()), log.Err(err))
|
r.GetLogger().Error("Reactor", log.String("action", "process"), log.String("ident", msg.Ident()), log.Int("queue", msg.Queue().Id()), log.Err(err))
|
||||||
debug.PrintStack()
|
debug.PrintStack()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(msg)
|
}(msg)
|
||||||
|
|
||||||
r.handler(msg)
|
r.handler(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AutoDispatch 自动分发,当 ident 为空字符串时,分发到系统级别的队列,否则分发到 ident 使用的队列
|
// AutoDispatch 自动分发,当 ident 为空字符串时,分发到系统级别的队列,否则分发到 ident 使用的队列
|
||||||
func (r *Reactor[M]) AutoDispatch(ident string, msg M) error {
|
func (r *Reactor[M]) AutoDispatch(ident string, msg M, beforeHandler ...func(queue *queue.Queue[int, string, M], msg M)) error {
|
||||||
if ident == str.None {
|
if ident == str.None {
|
||||||
return r.DispatchWithSystem(msg)
|
return r.DispatchWithSystem(msg, beforeHandler...)
|
||||||
}
|
}
|
||||||
return r.Dispatch(ident, msg)
|
return r.Dispatch(ident, msg, beforeHandler...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DispatchWithSystem 将消息分发到系统级别的队列
|
// DispatchWithSystem 将消息分发到系统级别的队列
|
||||||
|
@ -120,6 +131,9 @@ func (r *Reactor[M]) Dispatch(ident string, msg M, beforeHandler ...func(queue *
|
||||||
if i, exist = r.location[ident]; !exist {
|
if i, exist = r.location[ident]; !exist {
|
||||||
next = r.lb.Next()
|
next = r.lb.Next()
|
||||||
r.location[ident] = next.Id()
|
r.location[ident] = next.Id()
|
||||||
|
r.logger.Load().Debug("Reactor", log.String("action", "bind"), log.String("ident", ident), log.Any("queue", next.Id()))
|
||||||
|
} else {
|
||||||
|
next = r.queues[i]
|
||||||
}
|
}
|
||||||
r.locationRW.Unlock()
|
r.locationRW.Unlock()
|
||||||
} else {
|
} else {
|
||||||
|
@ -133,16 +147,19 @@ func (r *Reactor[M]) Dispatch(ident string, msg M, beforeHandler ...func(queue *
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run 启动 Reactor,运行系统级别的队列和多个 Socket 对应的队列
|
// Run 启动 Reactor,运行系统级别的队列和多个 Socket 对应的队列
|
||||||
func (r *Reactor[M]) Run() {
|
func (r *Reactor[M]) Run(callbacks ...func(queues []*queue.Queue[int, string, M])) {
|
||||||
if !atomic.CompareAndSwapInt32(&r.state, statusNone, statusRunning) {
|
if !atomic.CompareAndSwapInt32(&r.state, statusNone, statusRunning) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
r.queueRW.Lock()
|
r.queueRW.Lock()
|
||||||
r.runQueue(r.systemQueue)
|
queues := append([]*queue.Queue[int, string, M]{r.systemQueue}, r.queues...)
|
||||||
for i := 0; i < len(r.queues); i++ {
|
for _, q := range queues {
|
||||||
r.runQueue(r.queues[i])
|
r.runQueue(q)
|
||||||
}
|
}
|
||||||
r.queueRW.Unlock()
|
r.queueRW.Unlock()
|
||||||
|
for _, callback := range callbacks {
|
||||||
|
callback(queues)
|
||||||
|
}
|
||||||
r.wg.Wait()
|
r.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -169,6 +186,7 @@ func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) {
|
||||||
// 关闭时正在等待关闭完成,外部已加锁,无需再次加锁
|
// 关闭时正在等待关闭完成,外部已加锁,无需再次加锁
|
||||||
r.noneLockDelQueue(q)
|
r.noneLockDelQueue(q)
|
||||||
r.cwg.Done()
|
r.cwg.Done()
|
||||||
|
r.logger.Load().Debug("Reactor", log.String("action", "close"), log.Any("queue", q.Id()))
|
||||||
})
|
})
|
||||||
go q.Run()
|
go q.Run()
|
||||||
|
|
||||||
|
@ -177,9 +195,21 @@ func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) {
|
||||||
for m := range q.Read() {
|
for m := range q.Read() {
|
||||||
m(r.process, func(m queue.MessageWrapper[int, string, M], last bool) {
|
m(r.process, func(m queue.MessageWrapper[int, string, M], last bool) {
|
||||||
if last {
|
if last {
|
||||||
r.queueRW.Lock()
|
r.locationRW.RLock()
|
||||||
defer r.queueRW.Unlock()
|
mq, exist := r.location[m.Ident()]
|
||||||
delete(r.location, m.Ident())
|
r.locationRW.RUnlock()
|
||||||
|
if exist {
|
||||||
|
r.locationRW.Lock()
|
||||||
|
defer r.locationRW.Unlock()
|
||||||
|
mq, exist = r.location[m.Ident()]
|
||||||
|
if exist {
|
||||||
|
delete(r.location, m.Ident())
|
||||||
|
r.queueRW.RLock()
|
||||||
|
mq := r.queues[mq]
|
||||||
|
r.queueRW.RUnlock()
|
||||||
|
r.logger.Load().Debug("Reactor", log.String("action", "unbind"), log.String("ident", m.Ident()), log.Any("queue", mq.Id()))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,8 +3,10 @@ package reactor_test
|
||||||
import (
|
import (
|
||||||
"github.com/kercylan98/minotaur/server/internal/v2/queue"
|
"github.com/kercylan98/minotaur/server/internal/v2/queue"
|
||||||
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
|
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
|
||||||
|
"github.com/kercylan98/minotaur/utils/log/v2"
|
||||||
"github.com/kercylan98/minotaur/utils/random"
|
"github.com/kercylan98/minotaur/utils/random"
|
||||||
"github.com/kercylan98/minotaur/utils/times"
|
"github.com/kercylan98/minotaur/utils/times"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -16,6 +18,8 @@ func BenchmarkReactor_Dispatch(b *testing.B) {
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
r.SetLogger(log.NewLogger(log.NewHandler(os.Stdout, log.DefaultOptions().WithLevel(log.LevelInfo))))
|
||||||
|
|
||||||
go r.Run()
|
go r.Run()
|
||||||
|
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
|
|
@ -64,6 +64,7 @@ func NewServer(network Network, options ...*Options) Server {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
srv.Options.init(srv).Apply(options...)
|
srv.Options.init(srv).Apply(options...)
|
||||||
|
srv.reactor.SetLogger(srv.Options.GetLogger())
|
||||||
antsPool, err := ants.NewPool(ants.DefaultAntsPoolSize, ants.WithOptions(ants.Options{
|
antsPool, err := ants.NewPool(ants.DefaultAntsPoolSize, ants.WithOptions(ants.Options{
|
||||||
ExpiryDuration: 10 * time.Second,
|
ExpiryDuration: 10 * time.Second,
|
||||||
Nonblocking: true,
|
Nonblocking: true,
|
||||||
|
@ -80,7 +81,15 @@ func NewServer(network Network, options ...*Options) Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) Run() (err error) {
|
func (s *server) Run() (err error) {
|
||||||
go s.reactor.Run()
|
var queueWait = make(chan struct{})
|
||||||
|
go s.reactor.Run(func(queues []*queue.Queue[int, string, Message]) {
|
||||||
|
for _, q := range queues {
|
||||||
|
s.GetLogger().Debug("Reactor", log.String("action", "run"), log.Any("queue", q.Id()))
|
||||||
|
}
|
||||||
|
close(queueWait)
|
||||||
|
})
|
||||||
|
<-queueWait
|
||||||
|
|
||||||
if err = s.network.OnSetup(s.ctx, s); err != nil {
|
if err = s.network.OnSetup(s.ctx, s); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"github.com/gobwas/ws"
|
"github.com/gobwas/ws"
|
||||||
"github.com/kercylan98/minotaur/server/internal/v2"
|
"github.com/kercylan98/minotaur/server/internal/v2"
|
||||||
"github.com/kercylan98/minotaur/server/internal/v2/network"
|
"github.com/kercylan98/minotaur/server/internal/v2/network"
|
||||||
|
"github.com/kercylan98/minotaur/utils/random"
|
||||||
"github.com/kercylan98/minotaur/utils/times"
|
"github.com/kercylan98/minotaur/utils/times"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -25,10 +26,20 @@ func TestNewServer(t *testing.T) {
|
||||||
|
|
||||||
srv := server.NewServer(network.WebSocket(":9999"), server.NewOptions().WithLifeCycleLimit(times.Day*3))
|
srv := server.NewServer(network.WebSocket(":9999"), server.NewOptions().WithLifeCycleLimit(times.Day*3))
|
||||||
|
|
||||||
|
var tm = make(map[string]bool)
|
||||||
|
|
||||||
srv.RegisterConnectionOpenedEvent(func(srv server.Server, conn server.Conn) {
|
srv.RegisterConnectionOpenedEvent(func(srv server.Server, conn server.Conn) {
|
||||||
|
conn.SetActor("12321")
|
||||||
if err := conn.WritePacket(server.NewPacket([]byte("hello")).SetContext(ws.OpText)); err != nil {
|
if err := conn.WritePacket(server.NewPacket([]byte("hello")).SetContext(ws.OpText)); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
conn.PushSyncMessage(func(srv server.Server, conn server.Conn) {
|
||||||
|
for i := 0; i < 10000000; i++ {
|
||||||
|
_ = tm["1"]
|
||||||
|
tm["1"] = random.Bool()
|
||||||
|
}
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
srv.RegisterConnectionReceivePacketEvent(func(srv server.Server, conn server.Conn, packet server.Packet) {
|
srv.RegisterConnectionReceivePacketEvent(func(srv server.Server, conn server.Conn, packet server.Packet) {
|
||||||
|
|
Loading…
Reference in New Issue