other: 优化消息分发
This commit is contained in:
parent
64c165317b
commit
e84a6ee1ae
|
@ -15,8 +15,8 @@ type Conn interface {
|
|||
// DelActor 删除连接使用的 Actor
|
||||
DelActor()
|
||||
|
||||
// GetActor 获取连接使用的 Actor 名称及是否拥有 Actor 名称的状态
|
||||
GetActor() (string, bool)
|
||||
// GetActor 获取连接使用的 Actor 名称
|
||||
GetActor() string
|
||||
|
||||
// WritePacket 写入一个 Packet
|
||||
WritePacket(packet Packet) error
|
||||
|
@ -29,15 +29,6 @@ type Conn interface {
|
|||
|
||||
// WriteContext 写入数据
|
||||
WriteContext(data []byte, context interface{}) error
|
||||
|
||||
// PushMessage 通过连接推送特定消息到队列中进行处理
|
||||
PushMessage(message Message)
|
||||
|
||||
// PushSyncMessage 是 PushMessage 中对于 GenerateConnSyncMessage 的快捷方式
|
||||
PushSyncMessage(handler func(srv Server, conn Conn))
|
||||
|
||||
// PushAsyncMessage 是 PushMessage 中对于 GenerateConnAsyncMessage 的快捷方式,当 callback 传入多个时,将仅有首个生效
|
||||
PushAsyncMessage(handler func(srv Server, conn Conn) error, callback ...func(srv Server, conn Conn, err error))
|
||||
}
|
||||
|
||||
func newConn(srv *server, c net.Conn, connWriter ConnWriter) *conn {
|
||||
|
@ -63,12 +54,12 @@ func (c *conn) DelActor() {
|
|||
c.actor.Store(nil)
|
||||
}
|
||||
|
||||
func (c *conn) GetActor() (string, bool) {
|
||||
func (c *conn) GetActor() string {
|
||||
ident := c.actor.Load()
|
||||
if ident == nil {
|
||||
return "", false
|
||||
return ""
|
||||
}
|
||||
return *ident, true
|
||||
return *ident
|
||||
}
|
||||
|
||||
func (c *conn) WritePacket(packet Packet) error {
|
||||
|
@ -87,30 +78,3 @@ func (c *conn) WriteBytes(data []byte) error {
|
|||
func (c *conn) WriteContext(data []byte, context interface{}) error {
|
||||
return c.writer(NewPacket(data).SetContext(context))
|
||||
}
|
||||
|
||||
func (c *conn) PushMessage(message Message) {
|
||||
c.getDispatchHandler()(message)
|
||||
}
|
||||
|
||||
func (c *conn) PushSyncMessage(handler func(srv Server, conn Conn)) {
|
||||
c.PushMessage(GenerateConnSyncMessage(c, handler))
|
||||
}
|
||||
|
||||
func (c *conn) PushAsyncMessage(handler func(srv Server, conn Conn) error, callback ...func(srv Server, conn Conn, err error)) {
|
||||
var cb func(srv Server, conn Conn, err error)
|
||||
if len(callback) > 0 {
|
||||
cb = callback[0]
|
||||
}
|
||||
c.PushMessage(GenerateConnAsyncMessage(c, handler, cb))
|
||||
}
|
||||
|
||||
func (c *conn) getDispatchHandler() func(message Message) {
|
||||
var ident, exist = c.GetActor()
|
||||
return func(message Message) {
|
||||
if !exist {
|
||||
c.server.PushSystemMessage(message)
|
||||
} else {
|
||||
c.server.PushIdentMessage(ident, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"github.com/kercylan98/minotaur/utils/log"
|
||||
"context"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"net"
|
||||
)
|
||||
|
@ -18,12 +18,6 @@ type Controller interface {
|
|||
ReactPacket(conn net.Conn, packet Packet)
|
||||
// GetAnts 获取服务器异步池
|
||||
GetAnts() *ants.Pool
|
||||
// PushSystemMessage 推送系统消息
|
||||
PushSystemMessage(message Message, errorHandlers ...func(err error))
|
||||
// PushIdentMessage 推送标识消息
|
||||
PushIdentMessage(ident string, message Message, errorHandlers ...func(err error))
|
||||
// MessageErrProcess 消息错误处理
|
||||
MessageErrProcess(message Message, err error)
|
||||
}
|
||||
|
||||
type controller struct {
|
||||
|
@ -41,73 +35,37 @@ func (s *controller) GetServer() Server {
|
|||
return s.server
|
||||
}
|
||||
|
||||
func (s *controller) MessageErrProcess(message Message, err error) {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
if s.server.messageErrorHandler != nil {
|
||||
s.server.messageErrorHandler(s.server, message, err)
|
||||
} else {
|
||||
s.server.GetLogger().Error("Server", log.Err(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *controller) GetAnts() *ants.Pool {
|
||||
return s.server.ants
|
||||
}
|
||||
|
||||
func (s *controller) PushSystemMessage(message Message, errorHandlers ...func(err error)) {
|
||||
if err := s.server.reactor.SystemDispatch(message); err != nil {
|
||||
for _, f := range errorHandlers {
|
||||
f(err)
|
||||
}
|
||||
s.MessageErrProcess(message, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *controller) PushIdentMessage(ident string, message Message, errorHandlers ...func(err error)) {
|
||||
if err := s.server.reactor.IdentDispatch(ident, message); err != nil {
|
||||
for _, f := range errorHandlers {
|
||||
f(err)
|
||||
}
|
||||
s.MessageErrProcess(message, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *controller) RegisterConnection(conn net.Conn, writer ConnWriter) {
|
||||
s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) {
|
||||
s.server.PublishSyncMessage(s.getSysQueue(), func(ctx context.Context, srv Server) {
|
||||
c := newConn(s.server, conn, writer)
|
||||
s.server.connections[conn] = c
|
||||
s.events.onConnectionOpened(c)
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
func (s *controller) EliminateConnection(conn net.Conn, err error) {
|
||||
s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) {
|
||||
s.server.PublishSyncMessage(s.getSysQueue(), func(ctx context.Context, srv Server) {
|
||||
c, exist := s.server.connections[conn]
|
||||
if !exist {
|
||||
return
|
||||
}
|
||||
delete(s.server.connections, conn)
|
||||
s.server.events.onConnectionClosed(c, err)
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
func (s *controller) ReactPacket(conn net.Conn, packet Packet) {
|
||||
s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) {
|
||||
s.server.PublishSyncMessage(s.getSysQueue(), func(ctx context.Context, srv Server) {
|
||||
c, exist := s.server.connections[conn]
|
||||
if !exist {
|
||||
return
|
||||
}
|
||||
ident, exist := c.GetActor()
|
||||
if !exist {
|
||||
s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) {
|
||||
s.PublishSyncMessage(c.GetActor(), func(ctx context.Context, srv Server) {
|
||||
s.events.onConnectionReceivePacket(c, packet)
|
||||
}))
|
||||
} else {
|
||||
s.PushIdentMessage(ident, GenerateSystemSyncMessage(func(srv Server) {
|
||||
s.events.onConnectionReceivePacket(c, packet)
|
||||
}))
|
||||
}
|
||||
}))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/kercylan98/minotaur/server/internal/v2/message/messages"
|
||||
"github.com/kercylan98/minotaur/utils/log"
|
||||
"reflect"
|
||||
"time"
|
||||
|
@ -66,12 +68,17 @@ func (s *events) onLaunched() {
|
|||
opt.logger.Info("Minotaur Server", log.String("", "============================================================================"))
|
||||
})
|
||||
|
||||
s.PushMessage(GenerateSystemSyncMessage(func(srv Server) {
|
||||
s.PublishMessage(messages.Synchronous(
|
||||
Server(s.server),
|
||||
newProducer(s.server, nil),
|
||||
s.server.getSysQueue(),
|
||||
func(ctx context.Context, broker Server) {
|
||||
s.launchedEventHandlers.RangeValue(func(index int, value LaunchedEventHandler) bool {
|
||||
value(s.server, s.server.state.Ip, s.server.state.LaunchedAt)
|
||||
return true
|
||||
})
|
||||
}))
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
func (s *events) RegisterConnectionOpenedEvent(handler ConnectionOpenedEventHandler, priority ...int) {
|
||||
|
@ -79,12 +86,12 @@ func (s *events) RegisterConnectionOpenedEvent(handler ConnectionOpenedEventHand
|
|||
}
|
||||
|
||||
func (s *events) onConnectionOpened(conn Conn) {
|
||||
s.PushMessage(GenerateSystemSyncMessage(func(srv Server) {
|
||||
s.PublishSyncMessage(s.getSysQueue(), func(ctx context.Context, srv Server) {
|
||||
s.connectionOpenedEventHandlers.RangeValue(func(index int, value ConnectionOpenedEventHandler) bool {
|
||||
value(s.server, conn)
|
||||
value(srv, conn)
|
||||
return true
|
||||
})
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
func (s *events) RegisterConnectionClosedEvent(handler ConnectionClosedEventHandler, priority ...int) {
|
||||
|
@ -92,12 +99,12 @@ func (s *events) RegisterConnectionClosedEvent(handler ConnectionClosedEventHand
|
|||
}
|
||||
|
||||
func (s *events) onConnectionClosed(conn Conn, err error) {
|
||||
s.PushMessage(GenerateSystemSyncMessage(func(srv Server) {
|
||||
s.PublishSyncMessage(s.getSysQueue(), func(ctx context.Context, srv Server) {
|
||||
s.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool {
|
||||
value(s.server, conn, err)
|
||||
value(srv, conn, err)
|
||||
return true
|
||||
})
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
func (s *events) RegisterConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int) {
|
||||
|
@ -105,12 +112,12 @@ func (s *events) RegisterConnectionReceivePacketEvent(handler ConnectionReceiveP
|
|||
}
|
||||
|
||||
func (s *events) onConnectionReceivePacket(conn *conn, packet Packet) {
|
||||
conn.getDispatchHandler()(GenerateConnSyncMessage(conn, func(srv Server, conn Conn) {
|
||||
s.PublishSyncMessage(conn.GetActor(), func(ctx context.Context, srv Server) {
|
||||
s.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool {
|
||||
value(s.server, conn, packet)
|
||||
value(srv, conn, packet)
|
||||
return true
|
||||
})
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
func (s *events) RegisterShutdownEvent(handler ShutdownEventHandler, priority ...int) {
|
||||
|
@ -118,10 +125,10 @@ func (s *events) RegisterShutdownEvent(handler ShutdownEventHandler, priority ..
|
|||
}
|
||||
|
||||
func (s *events) onShutdown() {
|
||||
s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) {
|
||||
s.PublishSyncMessage(s.getSysQueue(), func(ctx context.Context, srv Server) {
|
||||
s.shutdownEventHandlers.RangeValue(func(index int, value ShutdownEventHandler) bool {
|
||||
value(s.server)
|
||||
value(srv)
|
||||
return true
|
||||
})
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,249 +0,0 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"github.com/kercylan98/minotaur/server/internal/v2/queue"
|
||||
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
|
||||
)
|
||||
|
||||
type Message interface {
|
||||
// OnInitialize 消息初始化阶段将会被告知消息所在服务器、反应器、队列及标识信息
|
||||
OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message])
|
||||
|
||||
// OnProcess 消息处理阶段需要完成对消息的处理,并返回处理结果
|
||||
OnProcess()
|
||||
}
|
||||
|
||||
// GenerateSystemSyncMessage 生成系统同步消息
|
||||
func GenerateSystemSyncMessage(handler func(srv Server)) Message {
|
||||
return &systemSyncMessage{handler: handler}
|
||||
}
|
||||
|
||||
type systemSyncMessage struct {
|
||||
controller Controller
|
||||
handler func(srv Server)
|
||||
}
|
||||
|
||||
func (m *systemSyncMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) {
|
||||
m.controller = controller
|
||||
}
|
||||
|
||||
func (m *systemSyncMessage) OnProcess() {
|
||||
m.handler(m.controller.GetServer())
|
||||
}
|
||||
|
||||
// GenerateSystemAsyncMessage 生成系统异步消息
|
||||
func GenerateSystemAsyncMessage(handler func(srv Server) error, callback func(srv Server, err error)) Message {
|
||||
return &systemAsyncMessage{
|
||||
handler: handler,
|
||||
callback: callback,
|
||||
}
|
||||
}
|
||||
|
||||
type systemAsyncMessage struct {
|
||||
controller Controller
|
||||
queue *queue.Queue[int, string, Message]
|
||||
handler func(srv Server) error
|
||||
callback func(srv Server, err error)
|
||||
hasIdent bool
|
||||
ident string
|
||||
}
|
||||
|
||||
func (m *systemAsyncMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) {
|
||||
m.controller = controller
|
||||
m.queue = message.Queue()
|
||||
m.ident = message.Ident()
|
||||
m.hasIdent = message.HasIdent()
|
||||
}
|
||||
|
||||
func (m *systemAsyncMessage) OnProcess() {
|
||||
var ident = m.ident
|
||||
|
||||
m.queue.WaitAdd(ident, 1)
|
||||
err := m.controller.GetAnts().Submit(func() {
|
||||
err := m.handler(m.controller.GetServer())
|
||||
if !m.hasIdent {
|
||||
m.controller.PushSystemMessage(GenerateSystemAsyncCallbackMessage(m.callback, err), func(err error) {
|
||||
m.queue.WaitAdd(ident, -1)
|
||||
})
|
||||
} else {
|
||||
m.controller.PushIdentMessage(ident, GenerateSystemAsyncCallbackMessage(m.callback, err), func(err error) {
|
||||
m.queue.WaitAdd(ident, -1)
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
m.queue.WaitAdd(ident, -1)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
m.controller.MessageErrProcess(m, err)
|
||||
m.queue.WaitAdd(ident, -1)
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateSystemAsyncCallbackMessage 生成系统异步回调消息
|
||||
func GenerateSystemAsyncCallbackMessage(handler func(srv Server, err error), err error) Message {
|
||||
return &systemAsyncCallbackMessage{
|
||||
err: err,
|
||||
handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
type systemAsyncCallbackMessage struct {
|
||||
controller Controller
|
||||
err error
|
||||
handler func(srv Server, err error)
|
||||
queue *queue.Queue[int, string, Message]
|
||||
ident string
|
||||
}
|
||||
|
||||
func (m *systemAsyncCallbackMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) {
|
||||
m.controller = controller
|
||||
m.queue = message.Queue()
|
||||
m.ident = message.Ident()
|
||||
}
|
||||
|
||||
func (m *systemAsyncCallbackMessage) OnProcess() {
|
||||
defer func(m *systemAsyncCallbackMessage) {
|
||||
m.queue.WaitAdd(m.ident, -1)
|
||||
}(m)
|
||||
|
||||
if m.handler != nil {
|
||||
m.handler(m.controller.GetServer(), m.err)
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateConnSyncMessage 生成连接同步消息
|
||||
func GenerateConnSyncMessage(conn Conn, handler func(srv Server, conn Conn)) Message {
|
||||
return &connSyncMessage{handler: handler, conn: conn}
|
||||
}
|
||||
|
||||
type connSyncMessage struct {
|
||||
controller Controller
|
||||
conn Conn
|
||||
handler func(srv Server, conn Conn)
|
||||
}
|
||||
|
||||
func (m *connSyncMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) {
|
||||
m.controller = controller
|
||||
}
|
||||
|
||||
func (m *connSyncMessage) OnProcess() {
|
||||
m.handler(m.controller.GetServer(), m.conn)
|
||||
}
|
||||
|
||||
// GenerateConnAsyncMessage 生成连接异步消息
|
||||
func GenerateConnAsyncMessage(conn Conn, handler func(srv Server, conn Conn) error, callback func(srv Server, conn Conn, err error)) Message {
|
||||
return &connAsyncMessage{
|
||||
conn: conn,
|
||||
handler: handler,
|
||||
callback: callback,
|
||||
}
|
||||
}
|
||||
|
||||
type connAsyncMessage struct {
|
||||
controller Controller
|
||||
conn Conn
|
||||
queue *queue.Queue[int, string, Message]
|
||||
handler func(srv Server, conn Conn) error
|
||||
callback func(srv Server, conn Conn, err error)
|
||||
ident string
|
||||
hasIdent bool
|
||||
}
|
||||
|
||||
func (m *connAsyncMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) {
|
||||
m.controller = controller
|
||||
m.queue = message.Queue()
|
||||
m.ident = message.Ident()
|
||||
m.hasIdent = message.HasIdent()
|
||||
}
|
||||
|
||||
func (m *connAsyncMessage) OnProcess() {
|
||||
m.queue.WaitAdd(m.ident, 1)
|
||||
err := m.controller.GetAnts().Submit(func() {
|
||||
err := m.handler(m.controller.GetServer(), m.conn)
|
||||
if !m.hasIdent {
|
||||
m.controller.PushSystemMessage(GenerateConnAsyncCallbackMessage(m.conn, m.callback, err), func(err error) {
|
||||
m.queue.WaitAdd(m.ident, -1)
|
||||
})
|
||||
} else {
|
||||
m.controller.PushIdentMessage(m.ident, GenerateConnAsyncCallbackMessage(m.conn, m.callback, err), func(err error) {
|
||||
m.queue.WaitAdd(m.ident, -1)
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
m.queue.WaitAdd(m.ident, -1)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
m.controller.MessageErrProcess(m, err)
|
||||
m.queue.WaitAdd(m.ident, -1)
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateConnAsyncCallbackMessage 生成连接异步回调消息
|
||||
func GenerateConnAsyncCallbackMessage(conn Conn, handler func(srv Server, conn Conn, err error), err error) Message {
|
||||
return &connAsyncCallbackMessage{
|
||||
conn: conn,
|
||||
err: err,
|
||||
handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
type connAsyncCallbackMessage struct {
|
||||
controller Controller
|
||||
conn Conn
|
||||
err error
|
||||
handler func(srv Server, conn Conn, err error)
|
||||
queue *queue.Queue[int, string, Message]
|
||||
ident string
|
||||
}
|
||||
|
||||
func (m *connAsyncCallbackMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) {
|
||||
m.controller = controller
|
||||
m.queue = message.Queue()
|
||||
m.ident = message.Ident()
|
||||
}
|
||||
|
||||
func (m *connAsyncCallbackMessage) OnProcess() {
|
||||
defer func(m *connAsyncCallbackMessage) {
|
||||
m.queue.WaitAdd(m.ident, -1)
|
||||
}(m)
|
||||
|
||||
if m.handler != nil {
|
||||
m.handler(m.controller.GetServer(), m.conn, m.err)
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateCrossQueueMessage 生成跨队列消息,该消息将会把消息传入对应 ident 所在队列进行处理,并在处理完成时进行回调
|
||||
func GenerateCrossQueueMessage(targetIdent string, handler func(srv Server), callback func(srv Server)) Message {
|
||||
return &crossQueueMessage{
|
||||
targetIdent: targetIdent,
|
||||
handler: handler,
|
||||
callback: callback,
|
||||
}
|
||||
}
|
||||
|
||||
type crossQueueMessage struct {
|
||||
controller Controller
|
||||
message queue.MessageWrapper[int, string, Message]
|
||||
handler func(srv Server)
|
||||
callback func(srv Server)
|
||||
targetIdent string
|
||||
}
|
||||
|
||||
func (m *crossQueueMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) {
|
||||
m.controller = controller
|
||||
m.message = message
|
||||
}
|
||||
|
||||
func (m *crossQueueMessage) OnProcess() {
|
||||
m.controller.PushIdentMessage(m.targetIdent, GenerateSystemSyncMessage(func(srv Server) {
|
||||
m.handler(srv)
|
||||
|
||||
if m.message.HasIdent() {
|
||||
m.controller.PushIdentMessage(m.message.Ident(), GenerateSystemSyncMessage(m.callback))
|
||||
} else {
|
||||
m.controller.PushSystemMessage(GenerateSystemSyncMessage(m.callback))
|
||||
}
|
||||
}))
|
||||
}
|
|
@ -0,0 +1,6 @@
|
|||
package message
|
||||
|
||||
// Broker 消息核心的接口定义
|
||||
type Broker[P Producer, Q Queue] interface {
|
||||
PublishMessage(message Message[P, Q])
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
package message
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/kercylan98/minotaur/server/internal/v2/queue"
|
||||
)
|
||||
|
||||
type Message[P Producer, Q Queue] interface {
|
||||
OnInitialize(ctx context.Context)
|
||||
OnProcess()
|
||||
|
||||
// GetProducer 获取消息生产者
|
||||
GetProducer() P
|
||||
|
||||
queue.Message[Q]
|
||||
}
|
|
@ -0,0 +1,92 @@
|
|||
package messages
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/kercylan98/minotaur/server/internal/v2/message"
|
||||
"github.com/kercylan98/minotaur/server/internal/v2/queue"
|
||||
)
|
||||
|
||||
type (
|
||||
AsynchronousActuator[P message.Producer, Q message.Queue, B message.Broker[P, Q]] func(context.Context, B, func(context.Context, B)) // 负责执行异步消息的执行器
|
||||
AsynchronousHandler[P message.Producer, Q message.Queue, B message.Broker[P, Q]] func(context.Context, B) error // 异步消息逻辑处理器
|
||||
AsynchronousCallbackHandler[P message.Producer, Q message.Queue, B message.Broker[P, Q]] func(context.Context, B, error) // 异步消息回调处理器
|
||||
)
|
||||
|
||||
// Asynchronous 创建一个异步消息实例,并指定相应的处理器。
|
||||
// 该函数接收以下参数:
|
||||
// - broker:消息所属的 Broker 实例。
|
||||
// - actuator:异步消息的执行器,负责执行异步消息的逻辑,当该参数为空时,将会使用默认的 go func()。
|
||||
// - handler:异步消息的逻辑处理器,用于执行实际的异步消息处理逻辑,可选参数。
|
||||
// - callback:异步消息的回调处理器,处理消息处理完成后的回调操作,可选参数。
|
||||
// - afterHandler:异步消息执行完成后的处理器,用于进行后续的处理操作,可选参数。
|
||||
//
|
||||
// 该函数除了 handler,其他所有处理器均为同步执行
|
||||
//
|
||||
// 返回值为一个实现了 Message 接口的异步消息实例。
|
||||
func Asynchronous[P message.Producer, Q message.Queue, B message.Broker[P, Q]](
|
||||
broker B, producer P, queue Q,
|
||||
actuator AsynchronousActuator[P, Q, B],
|
||||
handler AsynchronousHandler[P, Q, B],
|
||||
callback AsynchronousCallbackHandler[P, Q, B],
|
||||
) message.Message[P, Q] {
|
||||
m := &asynchronous[P, Q, B]{
|
||||
broker: broker,
|
||||
producer: producer,
|
||||
queue: queue,
|
||||
actuator: actuator,
|
||||
handler: handler,
|
||||
callback: callback,
|
||||
}
|
||||
if m.actuator == nil {
|
||||
m.actuator = func(ctx context.Context, b B, f func(context.Context, B)) {
|
||||
go f(ctx, b)
|
||||
}
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
type asynchronous[P message.Producer, Q message.Queue, B message.Broker[P, Q]] struct {
|
||||
broker B
|
||||
producer P
|
||||
queue Q
|
||||
ctx context.Context
|
||||
actuator AsynchronousActuator[P, Q, B]
|
||||
handler AsynchronousHandler[P, Q, B]
|
||||
callback AsynchronousCallbackHandler[P, Q, B]
|
||||
}
|
||||
|
||||
func (s *asynchronous[P, Q, B]) OnPublished(controller queue.Controller) {
|
||||
controller.IncrementCustomMessageCount(1)
|
||||
}
|
||||
|
||||
func (s *asynchronous[P, Q, B]) OnProcessed(controller queue.Controller) {
|
||||
controller.IncrementCustomMessageCount(-1)
|
||||
}
|
||||
|
||||
func (s *asynchronous[P, Q, B]) OnInitialize(ctx context.Context) {
|
||||
s.ctx = ctx
|
||||
}
|
||||
|
||||
func (s *asynchronous[P, Q, B]) OnProcess() {
|
||||
s.actuator(s.ctx, s.broker, func(ctx context.Context, broker B) {
|
||||
var err error
|
||||
if s.handler != nil {
|
||||
err = s.handler(s.ctx, s.broker)
|
||||
}
|
||||
|
||||
broker.PublishMessage(Synchronous(broker, s.producer, s.queue, func(ctx context.Context, broker B) {
|
||||
if s.callback != nil {
|
||||
s.callback(ctx, broker, err)
|
||||
}
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
func (s *asynchronous[P, Q, B]) GetProducer() P {
|
||||
return s.producer
|
||||
}
|
||||
|
||||
func (s *asynchronous[P, Q, B]) GetQueue() Q {
|
||||
return s.queue
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
package messages
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/kercylan98/minotaur/server/internal/v2/message"
|
||||
"github.com/kercylan98/minotaur/server/internal/v2/queue"
|
||||
)
|
||||
|
||||
type (
|
||||
SynchronousHandler[P message.Producer, Q message.Queue, B message.Broker[P, Q]] func(context.Context, B)
|
||||
)
|
||||
|
||||
func Synchronous[P message.Producer, Q message.Queue, B message.Broker[P, Q]](
|
||||
broker B, producer P, queue Q,
|
||||
handler SynchronousHandler[P, Q, B],
|
||||
) message.Message[P, Q] {
|
||||
return &synchronous[P, Q, B]{
|
||||
broker: broker,
|
||||
producer: producer,
|
||||
queue: queue,
|
||||
handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
type synchronous[P message.Producer, Q message.Queue, B message.Broker[P, Q]] struct {
|
||||
broker B
|
||||
producer P
|
||||
queue Q
|
||||
ctx context.Context
|
||||
handler SynchronousHandler[P, Q, B]
|
||||
}
|
||||
|
||||
func (s *synchronous[P, Q, B]) OnPublished(controller queue.Controller) {
|
||||
|
||||
}
|
||||
|
||||
func (s *synchronous[P, Q, B]) OnProcessed(controller queue.Controller) {
|
||||
|
||||
}
|
||||
|
||||
func (s *synchronous[P, Q, B]) OnInitialize(ctx context.Context) {
|
||||
s.ctx = ctx
|
||||
}
|
||||
|
||||
func (s *synchronous[P, Q, B]) OnProcess() {
|
||||
s.handler(s.ctx, s.broker)
|
||||
}
|
||||
|
||||
func (s *synchronous[P, Q, B]) GetProducer() P {
|
||||
return s.producer
|
||||
}
|
||||
|
||||
func (s *synchronous[P, Q, B]) GetQueue() Q {
|
||||
return s.queue
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
package message
|
||||
|
||||
type Producer interface {
|
||||
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
package message
|
||||
|
||||
type Queue comparable
|
|
@ -1,6 +1,7 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"github.com/kercylan98/minotaur/server/internal/v2/message"
|
||||
"github.com/kercylan98/minotaur/utils/log/v2"
|
||||
"os"
|
||||
"sync"
|
||||
|
@ -30,7 +31,7 @@ type Options struct {
|
|||
actorMessageChannelSize int // Actor 消息处理管道大小
|
||||
serverMessageBufferInitialSize int // 服务器 Actor 消息写入缓冲区初始化大小
|
||||
actorMessageBufferInitialSize int // Actor 消息写入缓冲区初始化大小
|
||||
messageErrorHandler func(srv Server, message Message, err error) // 消息错误处理器
|
||||
messageErrorHandler func(srv Server, message message.Message[Producer, string], err error) // 消息错误处理器
|
||||
lifeCycleLimit time.Duration // 服务器生命周期上限,在服务器启动后达到生命周期上限将关闭服务器
|
||||
logger *log.Logger // 日志记录器
|
||||
debug bool // Debug 模式
|
||||
|
@ -189,14 +190,14 @@ func (opt *Options) GetActorMessageBufferInitialSize() int {
|
|||
|
||||
// WithMessageErrorHandler 设置消息错误处理器,当消息处理出现错误时,会调用该处理器进行处理
|
||||
// - 如果在运行时设置,后续消息错误将会使用新的 handler 进行处理
|
||||
func (opt *Options) WithMessageErrorHandler(handler func(srv Server, message Message, err error)) *Options {
|
||||
func (opt *Options) WithMessageErrorHandler(handler func(srv Server, message message.Message[Producer, string], err error)) *Options {
|
||||
return opt.modifyOptionsValue(func(opt *Options) {
|
||||
opt.messageErrorHandler = handler
|
||||
})
|
||||
}
|
||||
|
||||
func (opt *Options) GetMessageErrorHandler() func(srv Server, message Message, err error) {
|
||||
return getOptionsValue(opt, func(opt *Options) func(srv Server, message Message, err error) {
|
||||
func (opt *Options) GetMessageErrorHandler() func(srv Server, message message.Message[Producer, string], err error) {
|
||||
return getOptionsValue(opt, func(opt *Options) func(srv Server, message message.Message[Producer, string], err error) {
|
||||
return opt.messageErrorHandler
|
||||
})
|
||||
}
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
package server
|
||||
|
||||
func newProducer(srv Server, conn Conn) Producer {
|
||||
return Producer{
|
||||
srv: srv,
|
||||
conn: conn,
|
||||
}
|
||||
}
|
||||
|
||||
type Producer struct {
|
||||
srv Server
|
||||
conn Conn
|
||||
}
|
||||
|
||||
func (p Producer) GetServer() Server {
|
||||
return p.srv
|
||||
}
|
||||
|
||||
func (p Producer) GetConn() (conn Conn, exist bool) {
|
||||
return p.conn, p.conn != nil
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package queue
|
||||
|
||||
type (
|
||||
incrementCustomMessageCountHandler func(delta int64)
|
||||
)
|
||||
|
||||
func newController[Id, Q comparable, M Message[Q]](queue *Queue[Id, Q, M], message Message[Q]) Controller {
|
||||
return Controller{
|
||||
incrementCustomMessageCount: func(delta int64) {
|
||||
queueName := message.GetQueue()
|
||||
|
||||
queue.cond.L.Lock()
|
||||
|
||||
currIdent := queue.identifiers[queueName]
|
||||
currIdent += delta
|
||||
queue.identifiers[queueName] = currIdent
|
||||
queue.state.total += delta
|
||||
//log.Info("消息总计数", log.Int64("计数", q.state.total))
|
||||
|
||||
queue.cond.Signal()
|
||||
queue.cond.L.Unlock()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Controller 队列控制器
|
||||
type Controller struct {
|
||||
incrementCustomMessageCount incrementCustomMessageCountHandler
|
||||
}
|
||||
|
||||
// IncrementCustomMessageCount 增加自定义消息计数,当消息计数不为 > 0 时会导致队列关闭进入等待状态
|
||||
func (c Controller) IncrementCustomMessageCount(delta int64) {
|
||||
c.incrementCustomMessageCount(delta)
|
||||
}
|
|
@ -1,6 +1,11 @@
|
|||
package queue
|
||||
|
||||
// Message 消息接口定义
|
||||
type Message interface {
|
||||
// 保留
|
||||
type Message[Queue comparable] interface {
|
||||
// GetQueue 获取消息执行队列
|
||||
GetQueue() Queue
|
||||
// OnPublished 消息发布成功
|
||||
OnPublished(controller Controller)
|
||||
// OnProcessed 消息处理完成
|
||||
OnProcessed(controller Controller)
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ package queue
|
|||
// MessageHandler 消息处理器支持传入两个函数对消息进行处理
|
||||
// - 在 handler 内可以执行对消息的逻辑
|
||||
// - 在 finisher 函数中可以接收到该消息是否是最后一条消息
|
||||
type MessageHandler[Id, Ident comparable, M Message] func(
|
||||
handler func(m MessageWrapper[Id, Ident, M]),
|
||||
finisher func(m MessageWrapper[Id, Ident, M], last bool),
|
||||
type MessageHandler[Id, Q comparable, M Message[Q]] func(
|
||||
handler func(m M),
|
||||
finisher func(m M, last bool),
|
||||
)
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
package queue
|
||||
|
||||
func messageWrapper[Id, Ident comparable, M Message](queue *Queue[Id, Ident, M], hasIdent bool, ident Ident, msg M) MessageWrapper[Id, Ident, M] {
|
||||
return MessageWrapper[Id, Ident, M]{
|
||||
queue: queue,
|
||||
hasIdent: hasIdent,
|
||||
ident: ident,
|
||||
msg: msg,
|
||||
}
|
||||
}
|
||||
|
||||
// MessageWrapper 提供了对外部消息的包装,用于方便的获取消息信息
|
||||
type MessageWrapper[Id, Ident comparable, M Message] struct {
|
||||
queue *Queue[Id, Ident, M] // 处理消息的队列
|
||||
ident Ident // 消息所有人
|
||||
msg M // 消息信息
|
||||
hasIdent bool // 是否拥有所有人
|
||||
}
|
||||
|
||||
// Queue 返回处理该消息的队列
|
||||
func (m MessageWrapper[Id, Ident, M]) Queue() *Queue[Id, Ident, M] {
|
||||
return m.queue
|
||||
}
|
||||
|
||||
// Ident 返回消息的所有人
|
||||
func (m MessageWrapper[Id, Ident, M]) Ident() Ident {
|
||||
return m.ident
|
||||
}
|
||||
|
||||
// HasIdent 返回消息是否拥有所有人
|
||||
func (m MessageWrapper[Id, Ident, M]) HasIdent() bool {
|
||||
return m.hasIdent
|
||||
}
|
||||
|
||||
// Message 返回消息的具体实例
|
||||
func (m MessageWrapper[Id, Ident, M]) Message() M {
|
||||
return m.msg
|
||||
}
|
|
@ -7,15 +7,15 @@ import (
|
|||
)
|
||||
|
||||
// New 创建一个并发安全的队列 Queue,该队列支持通过 chanSize 自定义读取 channel 的大小,同支持使用 bufferSize 指定 buffer.Ring 的初始大小
|
||||
func New[Id, Ident comparable, M Message](id Id, chanSize, bufferSize int) *Queue[Id, Ident, M] {
|
||||
q := &Queue[Id, Ident, M]{
|
||||
c: make(chan MessageHandler[Id, Ident, M], chanSize),
|
||||
buf: buffer.NewRing[MessageWrapper[Id, Ident, M]](bufferSize),
|
||||
func New[Id, Q comparable, M Message[Q]](id Id, chanSize, bufferSize int) *Queue[Id, Q, M] {
|
||||
q := &Queue[Id, Q, M]{
|
||||
c: make(chan MessageHandler[Id, Q, M], chanSize),
|
||||
buf: buffer.NewRing[wrapper[Id, Q, M]](bufferSize),
|
||||
condRW: &sync.RWMutex{},
|
||||
identifiers: make(map[Ident]int64),
|
||||
identifiers: make(map[Q]int64),
|
||||
}
|
||||
q.cond = sync.NewCond(q.condRW)
|
||||
q.state = &State[Id, Ident, M]{
|
||||
q.state = &State[Id, Q, M]{
|
||||
queue: q,
|
||||
id: id,
|
||||
status: StatusNone,
|
||||
|
@ -27,39 +27,39 @@ func New[Id, Ident comparable, M Message](id Id, chanSize, bufferSize int) *Queu
|
|||
// - 该队列接收自定义的消息 M,并将消息有序的传入 Read 函数所返回的 channel 中以供处理
|
||||
// - 该结构主要实现目标为读写分离且并发安全的非阻塞传输队列,当消费阻塞时以牺牲内存为代价换取消息的生产不阻塞,适用于服务器消息处理等
|
||||
// - 该队列保证了消息的完整性,确保消息不丢失,在队列关闭后会等待所有消息处理完毕后进行关闭,并提供 SetClosedHandler 函数来监听队列的关闭信号
|
||||
type Queue[Id, Ident comparable, M Message] struct {
|
||||
state *State[Id, Ident, M] // 队列状态信息
|
||||
c chan MessageHandler[Id, Ident, M] // 消息读取通道
|
||||
buf *buffer.Ring[MessageWrapper[Id, Ident, M]] // 消息缓冲区
|
||||
type Queue[Id, Q comparable, M Message[Q]] struct {
|
||||
state *State[Id, Q, M] // 队列状态信息
|
||||
c chan MessageHandler[Id, Q, M] // 消息读取通道
|
||||
buf *buffer.Ring[wrapper[Id, Q, M]] // 消息缓冲区
|
||||
cond *sync.Cond // 条件变量
|
||||
condRW *sync.RWMutex // 条件变量的读写锁
|
||||
closedHandler func(q *Queue[Id, Ident, M]) // 关闭处理函数
|
||||
identifiers map[Ident]int64 // 标识符在队列的消息计数映射
|
||||
closedHandler func(q *Queue[Id, Q, M]) // 关闭处理函数
|
||||
identifiers map[Q]int64 // 标识符在队列的消息计数映射
|
||||
}
|
||||
|
||||
// Id 获取队列 Id
|
||||
func (q *Queue[Id, Ident, M]) Id() Id {
|
||||
func (q *Queue[Id, Q, M]) Id() Id {
|
||||
return q.state.id
|
||||
}
|
||||
|
||||
// SetId 设置队列 Id
|
||||
func (q *Queue[Id, Ident, M]) SetId(id Id) {
|
||||
func (q *Queue[Id, Q, M]) SetId(id Id) {
|
||||
q.state.id = id
|
||||
}
|
||||
|
||||
// SetClosedHandler 设置队列关闭处理函数,在队列关闭后将执行该函数。此刻队列不再可用
|
||||
// - Close 函数为非阻塞调用,调用后不会立即关闭队列,会等待消息处理完毕且处理期间不再有新消息介入
|
||||
func (q *Queue[Id, Ident, M]) SetClosedHandler(handler func(q *Queue[Id, Ident, M])) {
|
||||
func (q *Queue[Id, Q, M]) SetClosedHandler(handler func(q *Queue[Id, Q, M])) {
|
||||
q.closedHandler = handler
|
||||
}
|
||||
|
||||
// Run 阻塞的运行队列,当队列非首次运行时,将会引发来自 ErrorQueueInvalid 的 panic
|
||||
func (q *Queue[Id, Ident, M]) Run() {
|
||||
func (q *Queue[Id, Q, M]) Run() {
|
||||
if atomic.LoadInt32(&q.state.status) != StatusNone {
|
||||
panic(ErrorQueueInvalid)
|
||||
}
|
||||
atomic.StoreInt32(&q.state.status, StatusRunning)
|
||||
defer func(q *Queue[Id, Ident, M]) {
|
||||
defer func(q *Queue[Id, Q, M]) {
|
||||
if q.closedHandler != nil {
|
||||
q.closedHandler(q)
|
||||
}
|
||||
|
@ -77,16 +77,21 @@ func (q *Queue[Id, Ident, M]) Run() {
|
|||
}
|
||||
items := q.buf.ReadAll()
|
||||
q.cond.L.Unlock()
|
||||
for _, item := range items {
|
||||
q.c <- func(handler func(MessageWrapper[Id, Ident, M]), finisher func(m MessageWrapper[Id, Ident, M], last bool)) {
|
||||
defer func(msg MessageWrapper[Id, Ident, M]) {
|
||||
for i := 0; i < len(items); i++ {
|
||||
item := items[i]
|
||||
q.c <- func(handler func(m M), finisher func(m M, last bool)) {
|
||||
defer func(msg M) {
|
||||
msg.OnProcessed(item.controller)
|
||||
|
||||
queue := msg.GetQueue()
|
||||
|
||||
q.cond.L.Lock()
|
||||
q.state.total--
|
||||
curr := q.identifiers[msg.Ident()] - 1
|
||||
curr := q.identifiers[queue] - 1
|
||||
if curr != 0 {
|
||||
q.identifiers[msg.Ident()] = curr
|
||||
q.identifiers[queue] = curr
|
||||
} else {
|
||||
delete(q.identifiers, msg.Ident())
|
||||
delete(q.identifiers, queue)
|
||||
}
|
||||
if finisher != nil {
|
||||
finisher(msg, curr == 0)
|
||||
|
@ -94,36 +99,40 @@ func (q *Queue[Id, Ident, M]) Run() {
|
|||
//log.Info("消息总计数", log.Int64("计数", q.state.total))
|
||||
q.cond.Signal()
|
||||
q.cond.L.Unlock()
|
||||
}(item)
|
||||
}(item.message)
|
||||
|
||||
handler(item)
|
||||
handler(item.message)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Push 向队列中推送来自 ident 的消息 m,当队列已关闭时将会返回 ErrorQueueClosed
|
||||
func (q *Queue[Id, Ident, M]) Push(hasIdent bool, ident Ident, m M) error {
|
||||
// Push 向队列中推送来自 queue 的消息 m,当队列已关闭时将会返回 ErrorQueueClosed
|
||||
func (q *Queue[Id, Q, M]) Push(queue Q, m M) error {
|
||||
if atomic.LoadInt32(&q.state.status) > StatusClosing {
|
||||
return ErrorQueueClosed
|
||||
}
|
||||
wrapper := newWrapper(q, m)
|
||||
|
||||
q.cond.L.Lock()
|
||||
q.identifiers[ident]++
|
||||
q.identifiers[queue]++
|
||||
q.state.total++
|
||||
q.buf.Write(messageWrapper(q, hasIdent, ident, m))
|
||||
q.buf.Write(wrapper)
|
||||
//log.Info("消息总计数", log.Int64("计数", q.state.total))
|
||||
q.cond.Signal()
|
||||
q.cond.L.Unlock()
|
||||
|
||||
m.OnPublished(wrapper.controller)
|
||||
return nil
|
||||
}
|
||||
|
||||
// WaitAdd 向队列增加来自外部的等待计数,在队列关闭时会等待该计数归零
|
||||
func (q *Queue[Id, Ident, M]) WaitAdd(ident Ident, delta int64) {
|
||||
func (q *Queue[Id, Q, M]) WaitAdd(queue Q, delta int64) {
|
||||
q.cond.L.Lock()
|
||||
|
||||
currIdent := q.identifiers[ident]
|
||||
currIdent := q.identifiers[queue]
|
||||
currIdent += delta
|
||||
q.identifiers[ident] = currIdent
|
||||
q.identifiers[queue] = currIdent
|
||||
q.state.total += delta
|
||||
//log.Info("消息总计数", log.Int64("计数", q.state.total))
|
||||
|
||||
|
@ -132,23 +141,23 @@ func (q *Queue[Id, Ident, M]) WaitAdd(ident Ident, delta int64) {
|
|||
}
|
||||
|
||||
// Read 获取队列消息的只读通道,
|
||||
func (q *Queue[Id, Ident, M]) Read() <-chan MessageHandler[Id, Ident, M] {
|
||||
func (q *Queue[Id, Q, M]) Read() <-chan MessageHandler[Id, Q, M] {
|
||||
return q.c
|
||||
}
|
||||
|
||||
// Close 关闭队列
|
||||
func (q *Queue[Id, Ident, M]) Close() {
|
||||
func (q *Queue[Id, Q, M]) Close() {
|
||||
atomic.CompareAndSwapInt32(&q.state.status, StatusRunning, StatusClosing)
|
||||
q.cond.Broadcast()
|
||||
}
|
||||
|
||||
// State 获取队列状态
|
||||
func (q *Queue[Id, Ident, M]) State() *State[Id, Ident, M] {
|
||||
func (q *Queue[Id, Q, M]) State() *State[Id, Q, M] {
|
||||
return q.state
|
||||
}
|
||||
|
||||
// GetMessageCount 获取消息数量
|
||||
func (q *Queue[Id, Ident, M]) GetMessageCount() (count int64) {
|
||||
func (q *Queue[Id, Q, M]) GetMessageCount() (count int64) {
|
||||
q.condRW.RLock()
|
||||
defer q.condRW.RUnlock()
|
||||
for _, i := range q.identifiers {
|
||||
|
@ -158,8 +167,8 @@ func (q *Queue[Id, Ident, M]) GetMessageCount() (count int64) {
|
|||
}
|
||||
|
||||
// GetMessageCountWithIdent 获取特定消息人的消息数量
|
||||
func (q *Queue[Id, Ident, M]) GetMessageCountWithIdent(ident Ident) int64 {
|
||||
func (q *Queue[Id, Q, M]) GetMessageCountWithIdent(queue Q) int64 {
|
||||
q.condRW.RLock()
|
||||
defer q.condRW.RUnlock()
|
||||
return q.identifiers[ident]
|
||||
return q.identifiers[queue]
|
||||
}
|
||||
|
|
|
@ -11,24 +11,24 @@ const (
|
|||
StatusClosed // 队列已关闭
|
||||
)
|
||||
|
||||
type State[Id, Ident comparable, M Message] struct {
|
||||
queue *Queue[Id, Ident, M]
|
||||
type State[Id, Q comparable, M Message[Q]] struct {
|
||||
queue *Queue[Id, Q, M]
|
||||
id Id // 队列 ID
|
||||
status int32 // 状态标志
|
||||
total int64 // 消息总计数
|
||||
}
|
||||
|
||||
// IsClosed 判断队列是否已关闭
|
||||
func (q *State[Id, Ident, M]) IsClosed() bool {
|
||||
func (q *State[Id, Q, M]) IsClosed() bool {
|
||||
return atomic.LoadInt32(&q.status) == StatusClosed
|
||||
}
|
||||
|
||||
// IsClosing 判断队列是否正在关闭
|
||||
func (q *State[Id, Ident, M]) IsClosing() bool {
|
||||
func (q *State[Id, Q, M]) IsClosing() bool {
|
||||
return atomic.LoadInt32(&q.status) == StatusClosing
|
||||
}
|
||||
|
||||
// IsRunning 判断队列是否正在运行
|
||||
func (q *State[Id, Ident, M]) IsRunning() bool {
|
||||
func (q *State[Id, Q, M]) IsRunning() bool {
|
||||
return atomic.LoadInt32(&q.status) == StatusRunning
|
||||
}
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
package queue
|
||||
|
||||
func newWrapper[Id, Q comparable, M Message[Q]](queue *Queue[Id, Q, M], message M) wrapper[Id, Q, M] {
|
||||
return wrapper[Id, Q, M]{
|
||||
message: message,
|
||||
controller: newController[Id, Q, M](queue, message),
|
||||
}
|
||||
}
|
||||
|
||||
type wrapper[Id, Q comparable, M Message[Q]] struct {
|
||||
message M
|
||||
controller Controller
|
||||
}
|
|
@ -2,6 +2,6 @@ package reactor
|
|||
|
||||
import "github.com/kercylan98/minotaur/server/internal/v2/queue"
|
||||
|
||||
type MessageHandler[M any] func(message queue.MessageWrapper[int, string, M])
|
||||
type MessageHandler[Q comparable, M queue.Message[Q]] func(message M)
|
||||
|
||||
type ErrorHandler[M any] func(message queue.MessageWrapper[int, string, M], err error)
|
||||
type ErrorHandler[Q comparable, M queue.Message[Q]] func(message M, err error)
|
||||
|
|
|
@ -20,19 +20,18 @@ const (
|
|||
)
|
||||
|
||||
// NewReactor 创建一个新的 Reactor 实例,初始化系统级别的队列和多个 Socket 对应的队列
|
||||
func NewReactor[M queue.Message](systemQueueSize, queueSize, systemBufferSize, queueBufferSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] {
|
||||
func NewReactor[Queue comparable, M queue.Message[Queue]](queueSize, queueBufferSize int, handler MessageHandler[Queue, M], errorHandler ErrorHandler[Queue, M]) *Reactor[Queue, M] {
|
||||
if handler == nil {
|
||||
|
||||
}
|
||||
r := &Reactor[M]{
|
||||
systemQueue: queue.New[int, string, M](-1, systemQueueSize, systemBufferSize),
|
||||
lb: loadbalancer.NewRoundRobin[int, *queue.Queue[int, string, M]](),
|
||||
r := &Reactor[Queue, M]{
|
||||
lb: loadbalancer.NewRoundRobin[int, *queue.Queue[int, Queue, M]](),
|
||||
errorHandler: errorHandler,
|
||||
queueSize: queueSize,
|
||||
queueBufferSize: queueBufferSize,
|
||||
state: statusNone,
|
||||
handler: handler,
|
||||
location: make(map[string]int),
|
||||
location: make(map[Queue]int),
|
||||
}
|
||||
r.logger.Store(log.GetLogger())
|
||||
|
||||
|
@ -47,41 +46,40 @@ func NewReactor[M queue.Message](systemQueueSize, queueSize, systemBufferSize, q
|
|||
}
|
||||
|
||||
// Reactor 是一个消息反应器,管理系统级别的队列和多个 Socket 对应的队列
|
||||
type Reactor[M queue.Message] struct {
|
||||
type Reactor[Queue comparable, M queue.Message[Queue]] struct {
|
||||
logger atomic.Pointer[log.Logger] // 日志记录器
|
||||
state int32 // 状态
|
||||
systemQueue *queue.Queue[int, string, M] // 系统级别的队列
|
||||
queueSize int // 队列管道大小
|
||||
queueBufferSize int // 队列缓冲区大小
|
||||
queues []*queue.Queue[int, string, M] // 所有使用的队列
|
||||
queues []*queue.Queue[int, Queue, M] // 所有使用的队列
|
||||
queueRW sync.RWMutex // 队列读写锁
|
||||
location map[string]int // 所在队列 ID 映射
|
||||
location map[Queue]int // 所在队列 ID 映射
|
||||
locationRW sync.RWMutex // 所在队列 ID 映射锁
|
||||
lb *loadbalancer.RoundRobin[int, *queue.Queue[int, string, M]] // 负载均衡器
|
||||
lb *loadbalancer.RoundRobin[int, *queue.Queue[int, Queue, M]] // 负载均衡器
|
||||
wg sync.WaitGroup // 等待组
|
||||
cwg sync.WaitGroup // 关闭等待组
|
||||
handler MessageHandler[M] // 消息处理器
|
||||
errorHandler ErrorHandler[M] // 错误处理器
|
||||
handler MessageHandler[Queue, M] // 消息处理器
|
||||
errorHandler ErrorHandler[Queue, M] // 错误处理器
|
||||
}
|
||||
|
||||
// SetLogger 设置日志记录器
|
||||
func (r *Reactor[M]) SetLogger(logger *log.Logger) {
|
||||
func (r *Reactor[Queue, M]) SetLogger(logger *log.Logger) {
|
||||
r.logger.Store(logger)
|
||||
}
|
||||
|
||||
// GetLogger 获取日志记录器
|
||||
func (r *Reactor[M]) GetLogger() *log.Logger {
|
||||
func (r *Reactor[Queue, M]) GetLogger() *log.Logger {
|
||||
return r.logger.Load()
|
||||
}
|
||||
|
||||
// process 消息处理
|
||||
func (r *Reactor[M]) process(msg queue.MessageWrapper[int, string, M]) {
|
||||
defer func(msg queue.MessageWrapper[int, string, M]) {
|
||||
func (r *Reactor[Queue, M]) process(msg M) {
|
||||
defer func(msg M) {
|
||||
if err := super.RecoverTransform(recover()); err != nil {
|
||||
if r.errorHandler != nil {
|
||||
r.errorHandler(msg, err)
|
||||
} else {
|
||||
r.GetLogger().Error("Reactor", log.String("action", "process"), log.Any("ident", msg.Ident()), log.Int("queue", msg.Queue().Id()), log.Err(err))
|
||||
r.GetLogger().Error("Reactor", log.String("action", "process"), log.Any("queue", msg.GetQueue()), log.Err(err))
|
||||
debug.PrintStack()
|
||||
}
|
||||
}
|
||||
|
@ -90,25 +88,17 @@ func (r *Reactor[M]) process(msg queue.MessageWrapper[int, string, M]) {
|
|||
r.handler(msg)
|
||||
}
|
||||
|
||||
// SystemDispatch 将消息分发到系统级别的队列
|
||||
func (r *Reactor[M]) SystemDispatch(msg M) error {
|
||||
if atomic.LoadInt32(&r.state) > statusClosing {
|
||||
r.queueRW.RUnlock()
|
||||
return fmt.Errorf("reactor closing or closed")
|
||||
}
|
||||
return r.systemQueue.Push(false, "", msg)
|
||||
}
|
||||
|
||||
// IdentDispatch 将消息分发到 ident 使用的队列,当 ident 首次使用时,将会根据负载均衡策略选择一个队列
|
||||
// Dispatch 将消息分发到 ident 使用的队列,当 ident 首次使用时,将会根据负载均衡策略选择一个队列
|
||||
// - 设置 count 会增加消息的外部计数,当 Reactor 关闭时会等待外部计数归零
|
||||
func (r *Reactor[M]) IdentDispatch(ident string, msg M) error {
|
||||
// - 当 ident 为空字符串时候,将发送到
|
||||
func (r *Reactor[Queue, M]) Dispatch(ident Queue, msg M) error {
|
||||
r.queueRW.RLock()
|
||||
if atomic.LoadInt32(&r.state) > statusClosing {
|
||||
r.queueRW.RUnlock()
|
||||
return fmt.Errorf("reactor closing or closed")
|
||||
}
|
||||
|
||||
var next *queue.Queue[int, string, M]
|
||||
var next *queue.Queue[int, Queue, M]
|
||||
r.locationRW.RLock()
|
||||
i, exist := r.location[ident]
|
||||
r.locationRW.RUnlock()
|
||||
|
@ -126,16 +116,16 @@ func (r *Reactor[M]) IdentDispatch(ident string, msg M) error {
|
|||
next = r.queues[i]
|
||||
}
|
||||
r.queueRW.RUnlock()
|
||||
return next.Push(true, ident, msg)
|
||||
return next.Push(ident, msg)
|
||||
}
|
||||
|
||||
// Run 启动 Reactor,运行系统级别的队列和多个 Socket 对应的队列
|
||||
func (r *Reactor[M]) Run(callbacks ...func(queues []*queue.Queue[int, string, M])) {
|
||||
func (r *Reactor[Queue, M]) Run(callbacks ...func(queues []*queue.Queue[int, Queue, M])) {
|
||||
if !atomic.CompareAndSwapInt32(&r.state, statusNone, statusRunning) {
|
||||
return
|
||||
}
|
||||
r.queueRW.Lock()
|
||||
queues := append([]*queue.Queue[int, string, M]{r.systemQueue}, r.queues...)
|
||||
queues := r.queues
|
||||
for _, q := range queues {
|
||||
r.runQueue(q)
|
||||
}
|
||||
|
@ -146,13 +136,13 @@ func (r *Reactor[M]) Run(callbacks ...func(queues []*queue.Queue[int, string, M]
|
|||
r.wg.Wait()
|
||||
}
|
||||
|
||||
func (r *Reactor[M]) noneLockAddQueue() {
|
||||
q := queue.New[int, string, M](len(r.queues), r.queueSize, r.queueBufferSize)
|
||||
func (r *Reactor[Queue, M]) noneLockAddQueue() {
|
||||
q := queue.New[int, Queue, M](len(r.queues), r.queueSize, r.queueBufferSize)
|
||||
r.lb.Add(q) // 运行前添加到负载均衡器,未运行时允许接收消息
|
||||
r.queues = append(r.queues, q)
|
||||
}
|
||||
|
||||
func (r *Reactor[M]) noneLockDelQueue(q *queue.Queue[int, string, M]) {
|
||||
func (r *Reactor[Queue, M]) noneLockDelQueue(q *queue.Queue[int, Queue, M]) {
|
||||
idx := q.Id()
|
||||
if idx < 0 || idx >= len(r.queues) || r.queues[idx] != q {
|
||||
return
|
||||
|
@ -163,9 +153,9 @@ func (r *Reactor[M]) noneLockDelQueue(q *queue.Queue[int, string, M]) {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) {
|
||||
func (r *Reactor[Queue, M]) runQueue(q *queue.Queue[int, Queue, M]) {
|
||||
r.wg.Add(1)
|
||||
q.SetClosedHandler(func(q *queue.Queue[int, string, M]) {
|
||||
q.SetClosedHandler(func(q *queue.Queue[int, Queue, M]) {
|
||||
// 关闭时正在等待关闭完成,外部已加锁,无需再次加锁
|
||||
r.noneLockDelQueue(q)
|
||||
r.cwg.Done()
|
||||
|
@ -173,7 +163,7 @@ func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) {
|
|||
})
|
||||
go q.Run()
|
||||
|
||||
go func(r *Reactor[M], q *queue.Queue[int, string, M]) {
|
||||
go func(r *Reactor[Queue, M], q *queue.Queue[int, Queue, M]) {
|
||||
defer r.wg.Done()
|
||||
for m := range q.Read() {
|
||||
m(r.process, r.processFinish)
|
||||
|
@ -181,13 +171,13 @@ func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) {
|
|||
}(r, q)
|
||||
}
|
||||
|
||||
func (r *Reactor[M]) Close() {
|
||||
func (r *Reactor[Queue, M]) Close() {
|
||||
if !atomic.CompareAndSwapInt32(&r.state, statusRunning, statusClosing) {
|
||||
return
|
||||
}
|
||||
r.queueRW.Lock()
|
||||
r.cwg.Add(len(r.queues) + 1)
|
||||
for _, q := range append(r.queues, r.systemQueue) {
|
||||
for _, q := range r.queues {
|
||||
q.Close()
|
||||
}
|
||||
r.cwg.Wait()
|
||||
|
@ -195,22 +185,25 @@ func (r *Reactor[M]) Close() {
|
|||
r.queueRW.Unlock()
|
||||
}
|
||||
|
||||
func (r *Reactor[M]) processFinish(m queue.MessageWrapper[int, string, M], last bool) {
|
||||
if last {
|
||||
func (r *Reactor[Queue, M]) processFinish(m M, last bool) {
|
||||
if !last {
|
||||
return
|
||||
}
|
||||
queueName := m.GetQueue()
|
||||
|
||||
r.locationRW.RLock()
|
||||
mq, exist := r.location[m.Ident()]
|
||||
mq, exist := r.location[queueName]
|
||||
r.locationRW.RUnlock()
|
||||
if exist {
|
||||
r.locationRW.Lock()
|
||||
defer r.locationRW.Unlock()
|
||||
mq, exist = r.location[m.Ident()]
|
||||
mq, exist = r.location[queueName]
|
||||
if exist {
|
||||
delete(r.location, m.Ident())
|
||||
delete(r.location, queueName)
|
||||
r.queueRW.RLock()
|
||||
mq := r.queues[mq]
|
||||
r.queueRW.RUnlock()
|
||||
r.logger.Load().Debug("Reactor", log.String("action", "unbind"), log.Any("ident", m.Ident()), log.Any("queue", mq.Id()))
|
||||
}
|
||||
r.logger.Load().Debug("Reactor", log.String("action", "unbind"), log.Any("queueName", queueName), log.Any("queue", mq.Id()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,9 +2,15 @@ package server
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/kercylan98/minotaur/server/internal/v2/message"
|
||||
"github.com/kercylan98/minotaur/server/internal/v2/message/messages"
|
||||
"github.com/kercylan98/minotaur/server/internal/v2/queue"
|
||||
"github.com/kercylan98/minotaur/utils/collection"
|
||||
"github.com/kercylan98/minotaur/utils/log/v2"
|
||||
"github.com/kercylan98/minotaur/utils/random"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
|
||||
|
@ -13,6 +19,7 @@ import (
|
|||
|
||||
type Server interface {
|
||||
Events
|
||||
message.Broker[Producer, string]
|
||||
|
||||
// Run 运行服务器
|
||||
Run() error
|
||||
|
@ -23,46 +30,45 @@ type Server interface {
|
|||
// GetStatus 获取服务器状态
|
||||
GetStatus() *State
|
||||
|
||||
// PushMessage 推送特定消息到系统队列中进行处理
|
||||
PushMessage(message Message)
|
||||
// PublishSyncMessage 发布同步消息
|
||||
PublishSyncMessage(queue string, handler messages.SynchronousHandler[Producer, string, Server])
|
||||
|
||||
// PushSyncMessage 是 PushMessage 中对于 GenerateSystemSyncMessage 的快捷方式
|
||||
PushSyncMessage(handler func(srv Server))
|
||||
|
||||
// PushAsyncMessage 是 PushMessage 中对于 GenerateSystemAsyncMessage 的快捷方式,当 callback 传入多个时,将仅有首个生效
|
||||
PushAsyncMessage(handler func(srv Server) error, callback ...func(srv Server, err error))
|
||||
// PublishAsyncMessage 发布异步消息,当包含多个 callback 时,仅首个生效
|
||||
PublishAsyncMessage(queue string, handler messages.AsynchronousHandler[Producer, string, Server], callback ...messages.AsynchronousCallbackHandler[Producer, string, Server])
|
||||
}
|
||||
|
||||
type server struct {
|
||||
*controller
|
||||
*events
|
||||
*Options
|
||||
queue string
|
||||
ants *ants.Pool
|
||||
state *State
|
||||
notify *notify
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
network Network
|
||||
reactor *reactor.Reactor[Message]
|
||||
reactor *reactor.Reactor[string, message.Message[Producer, string]]
|
||||
}
|
||||
|
||||
func NewServer(network Network, options ...*Options) Server {
|
||||
srv := &server{
|
||||
network: network,
|
||||
Options: DefaultOptions(),
|
||||
queue: fmt.Sprintf("%s:%s", reflect.TypeOf(new(server)).String(), random.HostName()),
|
||||
}
|
||||
srv.ctx, srv.cancel = context.WithCancel(context.Background())
|
||||
srv.notify = new(notify).init(srv)
|
||||
srv.controller = new(controller).init(srv)
|
||||
srv.events = new(events).init(srv)
|
||||
srv.state = new(State).init(srv)
|
||||
srv.reactor = reactor.NewReactor[Message](
|
||||
srv.GetServerMessageChannelSize(), srv.GetActorMessageChannelSize(),
|
||||
srv.GetServerMessageBufferInitialSize(), srv.GetActorMessageBufferInitialSize(),
|
||||
srv.reactor = reactor.NewReactor[string, message.Message[Producer, string]](
|
||||
srv.GetActorMessageChannelSize(),
|
||||
srv.GetActorMessageBufferInitialSize(),
|
||||
srv.onProcessMessage,
|
||||
func(message queue.MessageWrapper[int, string, Message], err error) {
|
||||
func(message message.Message[Producer, string], err error) {
|
||||
if handler := srv.GetMessageErrorHandler(); handler != nil {
|
||||
handler(srv, message.Message(), err)
|
||||
handler(srv, message, err)
|
||||
}
|
||||
})
|
||||
srv.Options.init(srv).Apply(options...)
|
||||
|
@ -84,7 +90,7 @@ func NewServer(network Network, options ...*Options) Server {
|
|||
|
||||
func (s *server) Run() (err error) {
|
||||
var queueWait = make(chan struct{})
|
||||
go s.reactor.Run(func(queues []*queue.Queue[int, string, Message]) {
|
||||
go s.reactor.Run(func(queues []*queue.Queue[int, string, message.Message[Producer, string]]) {
|
||||
for _, q := range queues {
|
||||
s.GetLogger().Debug("Reactor", log.String("action", "run"), log.Any("queue", q.Id()))
|
||||
}
|
||||
|
@ -123,31 +129,42 @@ func (s *server) Shutdown() (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (s *server) PushMessage(message Message) {
|
||||
s.controller.PushSystemMessage(message)
|
||||
func (s *server) getSysQueue() string {
|
||||
return s.queue
|
||||
}
|
||||
|
||||
func (s *server) PushSyncMessage(handler func(srv Server)) {
|
||||
s.PushMessage(GenerateSystemSyncMessage(handler))
|
||||
func (s *server) PublishMessage(msg message.Message[Producer, string]) {
|
||||
s.reactor.Dispatch(msg.GetQueue(), msg)
|
||||
}
|
||||
|
||||
func (s *server) PushAsyncMessage(handler func(srv Server) error, callback ...func(srv Server, err error)) {
|
||||
var cb func(srv Server, err error)
|
||||
if len(callback) > 0 {
|
||||
cb = callback[0]
|
||||
func (s *server) PublishSyncMessage(queue string, handler messages.SynchronousHandler[Producer, string, Server]) {
|
||||
s.PublishMessage(messages.Synchronous[Producer, string, Server](
|
||||
s, newProducer(s, nil), queue,
|
||||
handler,
|
||||
))
|
||||
}
|
||||
s.PushMessage(GenerateSystemAsyncMessage(handler, cb))
|
||||
|
||||
func (s *server) PublishAsyncMessage(queue string, handler messages.AsynchronousHandler[Producer, string, Server], callback ...messages.AsynchronousCallbackHandler[Producer, string, Server]) {
|
||||
s.PublishMessage(messages.Asynchronous[Producer, string, Server](
|
||||
s, newProducer(s, nil), queue,
|
||||
func(ctx context.Context, srv Server, f func(context.Context, Server)) {
|
||||
s.ants.Submit(func() {
|
||||
f(ctx, s)
|
||||
})
|
||||
},
|
||||
handler,
|
||||
collection.FindFirstOrDefaultInSlice(callback, nil),
|
||||
))
|
||||
}
|
||||
|
||||
func (s *server) GetStatus() *State {
|
||||
return s.state.Status()
|
||||
}
|
||||
|
||||
func (s *server) onProcessMessage(message queue.MessageWrapper[int, string, Message]) {
|
||||
func (s *server) onProcessMessage(m message.Message[Producer, string]) {
|
||||
s.getManyOptions(func(opt *Options) {
|
||||
m := message.Message()
|
||||
m.OnInitialize(s, s.reactor, message)
|
||||
ctx := context.Background()
|
||||
m.OnInitialize(ctx)
|
||||
m.OnProcess()
|
||||
})
|
||||
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package server_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/gobwas/ws"
|
||||
"github.com/kercylan98/minotaur/server/internal/v2"
|
||||
"github.com/kercylan98/minotaur/server/internal/v2/network"
|
||||
|
@ -30,19 +31,18 @@ func TestNewServer(t *testing.T) {
|
|||
var tm = make(map[string]bool)
|
||||
|
||||
srv.RegisterConnectionOpenedEvent(func(srv server.Server, conn server.Conn) {
|
||||
conn.SetActor(random.HostName())
|
||||
if err := conn.WritePacket(server.NewPacket([]byte("hello")).SetContext(ws.OpText)); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
conn.PushMessage(server.GenerateCrossQueueMessage("target", func(srv server.Server) {
|
||||
srv.PublishAsyncMessage("123", func(ctx context.Context, s server.Server) error {
|
||||
return nil
|
||||
}, func(ctx context.Context, s server.Server, err error) {
|
||||
for i := 0; i < 10000000; i++ {
|
||||
_ = tm["1"]
|
||||
tm["1"] = random.Bool()
|
||||
}
|
||||
}, func(srv server.Server) {
|
||||
|
||||
}))
|
||||
})
|
||||
})
|
||||
|
||||
srv.RegisterConnectionReceivePacketEvent(func(srv server.Server, conn server.Conn, packet server.Packet) {
|
||||
|
|
Loading…
Reference in New Issue