diff --git a/server/internal/v2/conn.go b/server/internal/v2/conn.go index 39bb095..4c7632e 100644 --- a/server/internal/v2/conn.go +++ b/server/internal/v2/conn.go @@ -15,8 +15,8 @@ type Conn interface { // DelActor 删除连接使用的 Actor DelActor() - // GetActor 获取连接使用的 Actor 名称及是否拥有 Actor 名称的状态 - GetActor() (string, bool) + // GetActor 获取连接使用的 Actor 名称 + GetActor() string // WritePacket 写入一个 Packet WritePacket(packet Packet) error @@ -29,15 +29,6 @@ type Conn interface { // WriteContext 写入数据 WriteContext(data []byte, context interface{}) error - - // PushMessage 通过连接推送特定消息到队列中进行处理 - PushMessage(message Message) - - // PushSyncMessage 是 PushMessage 中对于 GenerateConnSyncMessage 的快捷方式 - PushSyncMessage(handler func(srv Server, conn Conn)) - - // PushAsyncMessage 是 PushMessage 中对于 GenerateConnAsyncMessage 的快捷方式,当 callback 传入多个时,将仅有首个生效 - PushAsyncMessage(handler func(srv Server, conn Conn) error, callback ...func(srv Server, conn Conn, err error)) } func newConn(srv *server, c net.Conn, connWriter ConnWriter) *conn { @@ -63,12 +54,12 @@ func (c *conn) DelActor() { c.actor.Store(nil) } -func (c *conn) GetActor() (string, bool) { +func (c *conn) GetActor() string { ident := c.actor.Load() if ident == nil { - return "", false + return "" } - return *ident, true + return *ident } func (c *conn) WritePacket(packet Packet) error { @@ -87,30 +78,3 @@ func (c *conn) WriteBytes(data []byte) error { func (c *conn) WriteContext(data []byte, context interface{}) error { return c.writer(NewPacket(data).SetContext(context)) } - -func (c *conn) PushMessage(message Message) { - c.getDispatchHandler()(message) -} - -func (c *conn) PushSyncMessage(handler func(srv Server, conn Conn)) { - c.PushMessage(GenerateConnSyncMessage(c, handler)) -} - -func (c *conn) PushAsyncMessage(handler func(srv Server, conn Conn) error, callback ...func(srv Server, conn Conn, err error)) { - var cb func(srv Server, conn Conn, err error) - if len(callback) > 0 { - cb = callback[0] - } - c.PushMessage(GenerateConnAsyncMessage(c, handler, cb)) -} - -func (c *conn) getDispatchHandler() func(message Message) { - var ident, exist = c.GetActor() - return func(message Message) { - if !exist { - c.server.PushSystemMessage(message) - } else { - c.server.PushIdentMessage(ident, message) - } - } -} diff --git a/server/internal/v2/controller.go b/server/internal/v2/controller.go index 608eb43..36dd680 100644 --- a/server/internal/v2/controller.go +++ b/server/internal/v2/controller.go @@ -1,7 +1,7 @@ package server import ( - "github.com/kercylan98/minotaur/utils/log" + "context" "github.com/panjf2000/ants/v2" "net" ) @@ -18,12 +18,6 @@ type Controller interface { ReactPacket(conn net.Conn, packet Packet) // GetAnts 获取服务器异步池 GetAnts() *ants.Pool - // PushSystemMessage 推送系统消息 - PushSystemMessage(message Message, errorHandlers ...func(err error)) - // PushIdentMessage 推送标识消息 - PushIdentMessage(ident string, message Message, errorHandlers ...func(err error)) - // MessageErrProcess 消息错误处理 - MessageErrProcess(message Message, err error) } type controller struct { @@ -41,73 +35,37 @@ func (s *controller) GetServer() Server { return s.server } -func (s *controller) MessageErrProcess(message Message, err error) { - if err == nil { - return - } - if s.server.messageErrorHandler != nil { - s.server.messageErrorHandler(s.server, message, err) - } else { - s.server.GetLogger().Error("Server", log.Err(err)) - } -} - func (s *controller) GetAnts() *ants.Pool { return s.server.ants } -func (s *controller) PushSystemMessage(message Message, errorHandlers ...func(err error)) { - if err := s.server.reactor.SystemDispatch(message); err != nil { - for _, f := range errorHandlers { - f(err) - } - s.MessageErrProcess(message, err) - } -} - -func (s *controller) PushIdentMessage(ident string, message Message, errorHandlers ...func(err error)) { - if err := s.server.reactor.IdentDispatch(ident, message); err != nil { - for _, f := range errorHandlers { - f(err) - } - s.MessageErrProcess(message, err) - } -} - func (s *controller) RegisterConnection(conn net.Conn, writer ConnWriter) { - s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) { + s.server.PublishSyncMessage(s.getSysQueue(), func(ctx context.Context, srv Server) { c := newConn(s.server, conn, writer) s.server.connections[conn] = c s.events.onConnectionOpened(c) - })) + }) } func (s *controller) EliminateConnection(conn net.Conn, err error) { - s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) { + s.server.PublishSyncMessage(s.getSysQueue(), func(ctx context.Context, srv Server) { c, exist := s.server.connections[conn] if !exist { return } delete(s.server.connections, conn) s.server.events.onConnectionClosed(c, err) - })) + }) } func (s *controller) ReactPacket(conn net.Conn, packet Packet) { - s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) { + s.server.PublishSyncMessage(s.getSysQueue(), func(ctx context.Context, srv Server) { c, exist := s.server.connections[conn] if !exist { return } - ident, exist := c.GetActor() - if !exist { - s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) { - s.events.onConnectionReceivePacket(c, packet) - })) - } else { - s.PushIdentMessage(ident, GenerateSystemSyncMessage(func(srv Server) { - s.events.onConnectionReceivePacket(c, packet) - })) - } - })) + s.PublishSyncMessage(c.GetActor(), func(ctx context.Context, srv Server) { + s.events.onConnectionReceivePacket(c, packet) + }) + }) } diff --git a/server/internal/v2/events.go b/server/internal/v2/events.go index 74a7ee4..3711fd4 100644 --- a/server/internal/v2/events.go +++ b/server/internal/v2/events.go @@ -1,7 +1,9 @@ package server import ( + "context" "fmt" + "github.com/kercylan98/minotaur/server/internal/v2/message/messages" "github.com/kercylan98/minotaur/utils/log" "reflect" "time" @@ -66,12 +68,17 @@ func (s *events) onLaunched() { opt.logger.Info("Minotaur Server", log.String("", "============================================================================")) }) - s.PushMessage(GenerateSystemSyncMessage(func(srv Server) { - s.launchedEventHandlers.RangeValue(func(index int, value LaunchedEventHandler) bool { - value(s.server, s.server.state.Ip, s.server.state.LaunchedAt) - return true - }) - })) + s.PublishMessage(messages.Synchronous( + Server(s.server), + newProducer(s.server, nil), + s.server.getSysQueue(), + func(ctx context.Context, broker Server) { + s.launchedEventHandlers.RangeValue(func(index int, value LaunchedEventHandler) bool { + value(s.server, s.server.state.Ip, s.server.state.LaunchedAt) + return true + }) + }, + )) } func (s *events) RegisterConnectionOpenedEvent(handler ConnectionOpenedEventHandler, priority ...int) { @@ -79,12 +86,12 @@ func (s *events) RegisterConnectionOpenedEvent(handler ConnectionOpenedEventHand } func (s *events) onConnectionOpened(conn Conn) { - s.PushMessage(GenerateSystemSyncMessage(func(srv Server) { + s.PublishSyncMessage(s.getSysQueue(), func(ctx context.Context, srv Server) { s.connectionOpenedEventHandlers.RangeValue(func(index int, value ConnectionOpenedEventHandler) bool { - value(s.server, conn) + value(srv, conn) return true }) - })) + }) } func (s *events) RegisterConnectionClosedEvent(handler ConnectionClosedEventHandler, priority ...int) { @@ -92,12 +99,12 @@ func (s *events) RegisterConnectionClosedEvent(handler ConnectionClosedEventHand } func (s *events) onConnectionClosed(conn Conn, err error) { - s.PushMessage(GenerateSystemSyncMessage(func(srv Server) { + s.PublishSyncMessage(s.getSysQueue(), func(ctx context.Context, srv Server) { s.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool { - value(s.server, conn, err) + value(srv, conn, err) return true }) - })) + }) } func (s *events) RegisterConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int) { @@ -105,12 +112,12 @@ func (s *events) RegisterConnectionReceivePacketEvent(handler ConnectionReceiveP } func (s *events) onConnectionReceivePacket(conn *conn, packet Packet) { - conn.getDispatchHandler()(GenerateConnSyncMessage(conn, func(srv Server, conn Conn) { + s.PublishSyncMessage(conn.GetActor(), func(ctx context.Context, srv Server) { s.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool { - value(s.server, conn, packet) + value(srv, conn, packet) return true }) - })) + }) } func (s *events) RegisterShutdownEvent(handler ShutdownEventHandler, priority ...int) { @@ -118,10 +125,10 @@ func (s *events) RegisterShutdownEvent(handler ShutdownEventHandler, priority .. } func (s *events) onShutdown() { - s.PushSystemMessage(GenerateSystemSyncMessage(func(srv Server) { + s.PublishSyncMessage(s.getSysQueue(), func(ctx context.Context, srv Server) { s.shutdownEventHandlers.RangeValue(func(index int, value ShutdownEventHandler) bool { - value(s.server) + value(srv) return true }) - })) + }) } diff --git a/server/internal/v2/message.go b/server/internal/v2/message.go deleted file mode 100644 index 5ea437b..0000000 --- a/server/internal/v2/message.go +++ /dev/null @@ -1,249 +0,0 @@ -package server - -import ( - "github.com/kercylan98/minotaur/server/internal/v2/queue" - "github.com/kercylan98/minotaur/server/internal/v2/reactor" -) - -type Message interface { - // OnInitialize 消息初始化阶段将会被告知消息所在服务器、反应器、队列及标识信息 - OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) - - // OnProcess 消息处理阶段需要完成对消息的处理,并返回处理结果 - OnProcess() -} - -// GenerateSystemSyncMessage 生成系统同步消息 -func GenerateSystemSyncMessage(handler func(srv Server)) Message { - return &systemSyncMessage{handler: handler} -} - -type systemSyncMessage struct { - controller Controller - handler func(srv Server) -} - -func (m *systemSyncMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) { - m.controller = controller -} - -func (m *systemSyncMessage) OnProcess() { - m.handler(m.controller.GetServer()) -} - -// GenerateSystemAsyncMessage 生成系统异步消息 -func GenerateSystemAsyncMessage(handler func(srv Server) error, callback func(srv Server, err error)) Message { - return &systemAsyncMessage{ - handler: handler, - callback: callback, - } -} - -type systemAsyncMessage struct { - controller Controller - queue *queue.Queue[int, string, Message] - handler func(srv Server) error - callback func(srv Server, err error) - hasIdent bool - ident string -} - -func (m *systemAsyncMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) { - m.controller = controller - m.queue = message.Queue() - m.ident = message.Ident() - m.hasIdent = message.HasIdent() -} - -func (m *systemAsyncMessage) OnProcess() { - var ident = m.ident - - m.queue.WaitAdd(ident, 1) - err := m.controller.GetAnts().Submit(func() { - err := m.handler(m.controller.GetServer()) - if !m.hasIdent { - m.controller.PushSystemMessage(GenerateSystemAsyncCallbackMessage(m.callback, err), func(err error) { - m.queue.WaitAdd(ident, -1) - }) - } else { - m.controller.PushIdentMessage(ident, GenerateSystemAsyncCallbackMessage(m.callback, err), func(err error) { - m.queue.WaitAdd(ident, -1) - }) - } - if err != nil { - m.queue.WaitAdd(ident, -1) - } - }) - if err != nil { - m.controller.MessageErrProcess(m, err) - m.queue.WaitAdd(ident, -1) - } -} - -// GenerateSystemAsyncCallbackMessage 生成系统异步回调消息 -func GenerateSystemAsyncCallbackMessage(handler func(srv Server, err error), err error) Message { - return &systemAsyncCallbackMessage{ - err: err, - handler: handler, - } -} - -type systemAsyncCallbackMessage struct { - controller Controller - err error - handler func(srv Server, err error) - queue *queue.Queue[int, string, Message] - ident string -} - -func (m *systemAsyncCallbackMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) { - m.controller = controller - m.queue = message.Queue() - m.ident = message.Ident() -} - -func (m *systemAsyncCallbackMessage) OnProcess() { - defer func(m *systemAsyncCallbackMessage) { - m.queue.WaitAdd(m.ident, -1) - }(m) - - if m.handler != nil { - m.handler(m.controller.GetServer(), m.err) - } -} - -// GenerateConnSyncMessage 生成连接同步消息 -func GenerateConnSyncMessage(conn Conn, handler func(srv Server, conn Conn)) Message { - return &connSyncMessage{handler: handler, conn: conn} -} - -type connSyncMessage struct { - controller Controller - conn Conn - handler func(srv Server, conn Conn) -} - -func (m *connSyncMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) { - m.controller = controller -} - -func (m *connSyncMessage) OnProcess() { - m.handler(m.controller.GetServer(), m.conn) -} - -// GenerateConnAsyncMessage 生成连接异步消息 -func GenerateConnAsyncMessage(conn Conn, handler func(srv Server, conn Conn) error, callback func(srv Server, conn Conn, err error)) Message { - return &connAsyncMessage{ - conn: conn, - handler: handler, - callback: callback, - } -} - -type connAsyncMessage struct { - controller Controller - conn Conn - queue *queue.Queue[int, string, Message] - handler func(srv Server, conn Conn) error - callback func(srv Server, conn Conn, err error) - ident string - hasIdent bool -} - -func (m *connAsyncMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) { - m.controller = controller - m.queue = message.Queue() - m.ident = message.Ident() - m.hasIdent = message.HasIdent() -} - -func (m *connAsyncMessage) OnProcess() { - m.queue.WaitAdd(m.ident, 1) - err := m.controller.GetAnts().Submit(func() { - err := m.handler(m.controller.GetServer(), m.conn) - if !m.hasIdent { - m.controller.PushSystemMessage(GenerateConnAsyncCallbackMessage(m.conn, m.callback, err), func(err error) { - m.queue.WaitAdd(m.ident, -1) - }) - } else { - m.controller.PushIdentMessage(m.ident, GenerateConnAsyncCallbackMessage(m.conn, m.callback, err), func(err error) { - m.queue.WaitAdd(m.ident, -1) - }) - } - if err != nil { - m.queue.WaitAdd(m.ident, -1) - } - }) - if err != nil { - m.controller.MessageErrProcess(m, err) - m.queue.WaitAdd(m.ident, -1) - } -} - -// GenerateConnAsyncCallbackMessage 生成连接异步回调消息 -func GenerateConnAsyncCallbackMessage(conn Conn, handler func(srv Server, conn Conn, err error), err error) Message { - return &connAsyncCallbackMessage{ - conn: conn, - err: err, - handler: handler, - } -} - -type connAsyncCallbackMessage struct { - controller Controller - conn Conn - err error - handler func(srv Server, conn Conn, err error) - queue *queue.Queue[int, string, Message] - ident string -} - -func (m *connAsyncCallbackMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) { - m.controller = controller - m.queue = message.Queue() - m.ident = message.Ident() -} - -func (m *connAsyncCallbackMessage) OnProcess() { - defer func(m *connAsyncCallbackMessage) { - m.queue.WaitAdd(m.ident, -1) - }(m) - - if m.handler != nil { - m.handler(m.controller.GetServer(), m.conn, m.err) - } -} - -// GenerateCrossQueueMessage 生成跨队列消息,该消息将会把消息传入对应 ident 所在队列进行处理,并在处理完成时进行回调 -func GenerateCrossQueueMessage(targetIdent string, handler func(srv Server), callback func(srv Server)) Message { - return &crossQueueMessage{ - targetIdent: targetIdent, - handler: handler, - callback: callback, - } -} - -type crossQueueMessage struct { - controller Controller - message queue.MessageWrapper[int, string, Message] - handler func(srv Server) - callback func(srv Server) - targetIdent string -} - -func (m *crossQueueMessage) OnInitialize(controller Controller, reactor *reactor.Reactor[Message], message queue.MessageWrapper[int, string, Message]) { - m.controller = controller - m.message = message -} - -func (m *crossQueueMessage) OnProcess() { - m.controller.PushIdentMessage(m.targetIdent, GenerateSystemSyncMessage(func(srv Server) { - m.handler(srv) - - if m.message.HasIdent() { - m.controller.PushIdentMessage(m.message.Ident(), GenerateSystemSyncMessage(m.callback)) - } else { - m.controller.PushSystemMessage(GenerateSystemSyncMessage(m.callback)) - } - })) -} diff --git a/server/internal/v2/message/broker.go b/server/internal/v2/message/broker.go new file mode 100644 index 0000000..027ac89 --- /dev/null +++ b/server/internal/v2/message/broker.go @@ -0,0 +1,6 @@ +package message + +// Broker 消息核心的接口定义 +type Broker[P Producer, Q Queue] interface { + PublishMessage(message Message[P, Q]) +} diff --git a/server/internal/v2/message/message.go b/server/internal/v2/message/message.go new file mode 100644 index 0000000..39d3485 --- /dev/null +++ b/server/internal/v2/message/message.go @@ -0,0 +1,16 @@ +package message + +import ( + "context" + "github.com/kercylan98/minotaur/server/internal/v2/queue" +) + +type Message[P Producer, Q Queue] interface { + OnInitialize(ctx context.Context) + OnProcess() + + // GetProducer 获取消息生产者 + GetProducer() P + + queue.Message[Q] +} diff --git a/server/internal/v2/message/messages/asynchronous.go b/server/internal/v2/message/messages/asynchronous.go new file mode 100644 index 0000000..4ca4275 --- /dev/null +++ b/server/internal/v2/message/messages/asynchronous.go @@ -0,0 +1,92 @@ +package messages + +import ( + "context" + "github.com/kercylan98/minotaur/server/internal/v2/message" + "github.com/kercylan98/minotaur/server/internal/v2/queue" +) + +type ( + AsynchronousActuator[P message.Producer, Q message.Queue, B message.Broker[P, Q]] func(context.Context, B, func(context.Context, B)) // 负责执行异步消息的执行器 + AsynchronousHandler[P message.Producer, Q message.Queue, B message.Broker[P, Q]] func(context.Context, B) error // 异步消息逻辑处理器 + AsynchronousCallbackHandler[P message.Producer, Q message.Queue, B message.Broker[P, Q]] func(context.Context, B, error) // 异步消息回调处理器 +) + +// Asynchronous 创建一个异步消息实例,并指定相应的处理器。 +// 该函数接收以下参数: +// - broker:消息所属的 Broker 实例。 +// - actuator:异步消息的执行器,负责执行异步消息的逻辑,当该参数为空时,将会使用默认的 go func()。 +// - handler:异步消息的逻辑处理器,用于执行实际的异步消息处理逻辑,可选参数。 +// - callback:异步消息的回调处理器,处理消息处理完成后的回调操作,可选参数。 +// - afterHandler:异步消息执行完成后的处理器,用于进行后续的处理操作,可选参数。 +// +// 该函数除了 handler,其他所有处理器均为同步执行 +// +// 返回值为一个实现了 Message 接口的异步消息实例。 +func Asynchronous[P message.Producer, Q message.Queue, B message.Broker[P, Q]]( + broker B, producer P, queue Q, + actuator AsynchronousActuator[P, Q, B], + handler AsynchronousHandler[P, Q, B], + callback AsynchronousCallbackHandler[P, Q, B], +) message.Message[P, Q] { + m := &asynchronous[P, Q, B]{ + broker: broker, + producer: producer, + queue: queue, + actuator: actuator, + handler: handler, + callback: callback, + } + if m.actuator == nil { + m.actuator = func(ctx context.Context, b B, f func(context.Context, B)) { + go f(ctx, b) + } + } + + return m +} + +type asynchronous[P message.Producer, Q message.Queue, B message.Broker[P, Q]] struct { + broker B + producer P + queue Q + ctx context.Context + actuator AsynchronousActuator[P, Q, B] + handler AsynchronousHandler[P, Q, B] + callback AsynchronousCallbackHandler[P, Q, B] +} + +func (s *asynchronous[P, Q, B]) OnPublished(controller queue.Controller) { + controller.IncrementCustomMessageCount(1) +} + +func (s *asynchronous[P, Q, B]) OnProcessed(controller queue.Controller) { + controller.IncrementCustomMessageCount(-1) +} + +func (s *asynchronous[P, Q, B]) OnInitialize(ctx context.Context) { + s.ctx = ctx +} + +func (s *asynchronous[P, Q, B]) OnProcess() { + s.actuator(s.ctx, s.broker, func(ctx context.Context, broker B) { + var err error + if s.handler != nil { + err = s.handler(s.ctx, s.broker) + } + + broker.PublishMessage(Synchronous(broker, s.producer, s.queue, func(ctx context.Context, broker B) { + if s.callback != nil { + s.callback(ctx, broker, err) + } + })) + }) +} + +func (s *asynchronous[P, Q, B]) GetProducer() P { + return s.producer +} + +func (s *asynchronous[P, Q, B]) GetQueue() Q { + return s.queue +} diff --git a/server/internal/v2/message/messages/synchronous.go b/server/internal/v2/message/messages/synchronous.go new file mode 100644 index 0000000..bb32677 --- /dev/null +++ b/server/internal/v2/message/messages/synchronous.go @@ -0,0 +1,55 @@ +package messages + +import ( + "context" + "github.com/kercylan98/minotaur/server/internal/v2/message" + "github.com/kercylan98/minotaur/server/internal/v2/queue" +) + +type ( + SynchronousHandler[P message.Producer, Q message.Queue, B message.Broker[P, Q]] func(context.Context, B) +) + +func Synchronous[P message.Producer, Q message.Queue, B message.Broker[P, Q]]( + broker B, producer P, queue Q, + handler SynchronousHandler[P, Q, B], +) message.Message[P, Q] { + return &synchronous[P, Q, B]{ + broker: broker, + producer: producer, + queue: queue, + handler: handler, + } +} + +type synchronous[P message.Producer, Q message.Queue, B message.Broker[P, Q]] struct { + broker B + producer P + queue Q + ctx context.Context + handler SynchronousHandler[P, Q, B] +} + +func (s *synchronous[P, Q, B]) OnPublished(controller queue.Controller) { + +} + +func (s *synchronous[P, Q, B]) OnProcessed(controller queue.Controller) { + +} + +func (s *synchronous[P, Q, B]) OnInitialize(ctx context.Context) { + s.ctx = ctx +} + +func (s *synchronous[P, Q, B]) OnProcess() { + s.handler(s.ctx, s.broker) +} + +func (s *synchronous[P, Q, B]) GetProducer() P { + return s.producer +} + +func (s *synchronous[P, Q, B]) GetQueue() Q { + return s.queue +} diff --git a/server/internal/v2/message/producer.go b/server/internal/v2/message/producer.go new file mode 100644 index 0000000..0691634 --- /dev/null +++ b/server/internal/v2/message/producer.go @@ -0,0 +1,5 @@ +package message + +type Producer interface { + +} diff --git a/server/internal/v2/message/queue.go b/server/internal/v2/message/queue.go new file mode 100644 index 0000000..c3c0233 --- /dev/null +++ b/server/internal/v2/message/queue.go @@ -0,0 +1,3 @@ +package message + +type Queue comparable diff --git a/server/internal/v2/options.go b/server/internal/v2/options.go index e8862a0..8d282c8 100644 --- a/server/internal/v2/options.go +++ b/server/internal/v2/options.go @@ -1,6 +1,7 @@ package server import ( + "github.com/kercylan98/minotaur/server/internal/v2/message" "github.com/kercylan98/minotaur/utils/log/v2" "os" "sync" @@ -26,16 +27,16 @@ func DefaultOptions() *Options { type Options struct { server *server rw sync.RWMutex - serverMessageChannelSize int // 服务器 Actor 消息处理管道大小 - actorMessageChannelSize int // Actor 消息处理管道大小 - serverMessageBufferInitialSize int // 服务器 Actor 消息写入缓冲区初始化大小 - actorMessageBufferInitialSize int // Actor 消息写入缓冲区初始化大小 - messageErrorHandler func(srv Server, message Message, err error) // 消息错误处理器 - lifeCycleLimit time.Duration // 服务器生命周期上限,在服务器启动后达到生命周期上限将关闭服务器 - logger *log.Logger // 日志记录器 - debug bool // Debug 模式 - syncLowMessageDuration time.Duration // 同步慢消息时间 - asyncLowMessageDuration time.Duration // 异步慢消息时间 + serverMessageChannelSize int // 服务器 Actor 消息处理管道大小 + actorMessageChannelSize int // Actor 消息处理管道大小 + serverMessageBufferInitialSize int // 服务器 Actor 消息写入缓冲区初始化大小 + actorMessageBufferInitialSize int // Actor 消息写入缓冲区初始化大小 + messageErrorHandler func(srv Server, message message.Message[Producer, string], err error) // 消息错误处理器 + lifeCycleLimit time.Duration // 服务器生命周期上限,在服务器启动后达到生命周期上限将关闭服务器 + logger *log.Logger // 日志记录器 + debug bool // Debug 模式 + syncLowMessageDuration time.Duration // 同步慢消息时间 + asyncLowMessageDuration time.Duration // 异步慢消息时间 } func (opt *Options) init(srv *server) *Options { @@ -189,14 +190,14 @@ func (opt *Options) GetActorMessageBufferInitialSize() int { // WithMessageErrorHandler 设置消息错误处理器,当消息处理出现错误时,会调用该处理器进行处理 // - 如果在运行时设置,后续消息错误将会使用新的 handler 进行处理 -func (opt *Options) WithMessageErrorHandler(handler func(srv Server, message Message, err error)) *Options { +func (opt *Options) WithMessageErrorHandler(handler func(srv Server, message message.Message[Producer, string], err error)) *Options { return opt.modifyOptionsValue(func(opt *Options) { opt.messageErrorHandler = handler }) } -func (opt *Options) GetMessageErrorHandler() func(srv Server, message Message, err error) { - return getOptionsValue(opt, func(opt *Options) func(srv Server, message Message, err error) { +func (opt *Options) GetMessageErrorHandler() func(srv Server, message message.Message[Producer, string], err error) { + return getOptionsValue(opt, func(opt *Options) func(srv Server, message message.Message[Producer, string], err error) { return opt.messageErrorHandler }) } diff --git a/server/internal/v2/producer.go b/server/internal/v2/producer.go new file mode 100644 index 0000000..f981085 --- /dev/null +++ b/server/internal/v2/producer.go @@ -0,0 +1,21 @@ +package server + +func newProducer(srv Server, conn Conn) Producer { + return Producer{ + srv: srv, + conn: conn, + } +} + +type Producer struct { + srv Server + conn Conn +} + +func (p Producer) GetServer() Server { + return p.srv +} + +func (p Producer) GetConn() (conn Conn, exist bool) { + return p.conn, p.conn != nil +} diff --git a/server/internal/v2/queue/controller.go b/server/internal/v2/queue/controller.go new file mode 100644 index 0000000..37dc5ff --- /dev/null +++ b/server/internal/v2/queue/controller.go @@ -0,0 +1,34 @@ +package queue + +type ( + incrementCustomMessageCountHandler func(delta int64) +) + +func newController[Id, Q comparable, M Message[Q]](queue *Queue[Id, Q, M], message Message[Q]) Controller { + return Controller{ + incrementCustomMessageCount: func(delta int64) { + queueName := message.GetQueue() + + queue.cond.L.Lock() + + currIdent := queue.identifiers[queueName] + currIdent += delta + queue.identifiers[queueName] = currIdent + queue.state.total += delta + //log.Info("消息总计数", log.Int64("计数", q.state.total)) + + queue.cond.Signal() + queue.cond.L.Unlock() + }, + } +} + +// Controller 队列控制器 +type Controller struct { + incrementCustomMessageCount incrementCustomMessageCountHandler +} + +// IncrementCustomMessageCount 增加自定义消息计数,当消息计数不为 > 0 时会导致队列关闭进入等待状态 +func (c Controller) IncrementCustomMessageCount(delta int64) { + c.incrementCustomMessageCount(delta) +} diff --git a/server/internal/v2/queue/message.go b/server/internal/v2/queue/message.go index 4bafd43..3bea6f2 100644 --- a/server/internal/v2/queue/message.go +++ b/server/internal/v2/queue/message.go @@ -1,6 +1,11 @@ package queue // Message 消息接口定义 -type Message interface { - // 保留 +type Message[Queue comparable] interface { + // GetQueue 获取消息执行队列 + GetQueue() Queue + // OnPublished 消息发布成功 + OnPublished(controller Controller) + // OnProcessed 消息处理完成 + OnProcessed(controller Controller) } diff --git a/server/internal/v2/queue/message_handler.go b/server/internal/v2/queue/message_handler.go index aabab48..011ea39 100644 --- a/server/internal/v2/queue/message_handler.go +++ b/server/internal/v2/queue/message_handler.go @@ -3,7 +3,7 @@ package queue // MessageHandler 消息处理器支持传入两个函数对消息进行处理 // - 在 handler 内可以执行对消息的逻辑 // - 在 finisher 函数中可以接收到该消息是否是最后一条消息 -type MessageHandler[Id, Ident comparable, M Message] func( - handler func(m MessageWrapper[Id, Ident, M]), - finisher func(m MessageWrapper[Id, Ident, M], last bool), +type MessageHandler[Id, Q comparable, M Message[Q]] func( + handler func(m M), + finisher func(m M, last bool), ) diff --git a/server/internal/v2/queue/message_wrapper.go b/server/internal/v2/queue/message_wrapper.go deleted file mode 100644 index c8ef001..0000000 --- a/server/internal/v2/queue/message_wrapper.go +++ /dev/null @@ -1,38 +0,0 @@ -package queue - -func messageWrapper[Id, Ident comparable, M Message](queue *Queue[Id, Ident, M], hasIdent bool, ident Ident, msg M) MessageWrapper[Id, Ident, M] { - return MessageWrapper[Id, Ident, M]{ - queue: queue, - hasIdent: hasIdent, - ident: ident, - msg: msg, - } -} - -// MessageWrapper 提供了对外部消息的包装,用于方便的获取消息信息 -type MessageWrapper[Id, Ident comparable, M Message] struct { - queue *Queue[Id, Ident, M] // 处理消息的队列 - ident Ident // 消息所有人 - msg M // 消息信息 - hasIdent bool // 是否拥有所有人 -} - -// Queue 返回处理该消息的队列 -func (m MessageWrapper[Id, Ident, M]) Queue() *Queue[Id, Ident, M] { - return m.queue -} - -// Ident 返回消息的所有人 -func (m MessageWrapper[Id, Ident, M]) Ident() Ident { - return m.ident -} - -// HasIdent 返回消息是否拥有所有人 -func (m MessageWrapper[Id, Ident, M]) HasIdent() bool { - return m.hasIdent -} - -// Message 返回消息的具体实例 -func (m MessageWrapper[Id, Ident, M]) Message() M { - return m.msg -} diff --git a/server/internal/v2/queue/queue.go b/server/internal/v2/queue/queue.go index 74815b3..2ab5ae8 100644 --- a/server/internal/v2/queue/queue.go +++ b/server/internal/v2/queue/queue.go @@ -7,15 +7,15 @@ import ( ) // New 创建一个并发安全的队列 Queue,该队列支持通过 chanSize 自定义读取 channel 的大小,同支持使用 bufferSize 指定 buffer.Ring 的初始大小 -func New[Id, Ident comparable, M Message](id Id, chanSize, bufferSize int) *Queue[Id, Ident, M] { - q := &Queue[Id, Ident, M]{ - c: make(chan MessageHandler[Id, Ident, M], chanSize), - buf: buffer.NewRing[MessageWrapper[Id, Ident, M]](bufferSize), +func New[Id, Q comparable, M Message[Q]](id Id, chanSize, bufferSize int) *Queue[Id, Q, M] { + q := &Queue[Id, Q, M]{ + c: make(chan MessageHandler[Id, Q, M], chanSize), + buf: buffer.NewRing[wrapper[Id, Q, M]](bufferSize), condRW: &sync.RWMutex{}, - identifiers: make(map[Ident]int64), + identifiers: make(map[Q]int64), } q.cond = sync.NewCond(q.condRW) - q.state = &State[Id, Ident, M]{ + q.state = &State[Id, Q, M]{ queue: q, id: id, status: StatusNone, @@ -27,39 +27,39 @@ func New[Id, Ident comparable, M Message](id Id, chanSize, bufferSize int) *Queu // - 该队列接收自定义的消息 M,并将消息有序的传入 Read 函数所返回的 channel 中以供处理 // - 该结构主要实现目标为读写分离且并发安全的非阻塞传输队列,当消费阻塞时以牺牲内存为代价换取消息的生产不阻塞,适用于服务器消息处理等 // - 该队列保证了消息的完整性,确保消息不丢失,在队列关闭后会等待所有消息处理完毕后进行关闭,并提供 SetClosedHandler 函数来监听队列的关闭信号 -type Queue[Id, Ident comparable, M Message] struct { - state *State[Id, Ident, M] // 队列状态信息 - c chan MessageHandler[Id, Ident, M] // 消息读取通道 - buf *buffer.Ring[MessageWrapper[Id, Ident, M]] // 消息缓冲区 - cond *sync.Cond // 条件变量 - condRW *sync.RWMutex // 条件变量的读写锁 - closedHandler func(q *Queue[Id, Ident, M]) // 关闭处理函数 - identifiers map[Ident]int64 // 标识符在队列的消息计数映射 +type Queue[Id, Q comparable, M Message[Q]] struct { + state *State[Id, Q, M] // 队列状态信息 + c chan MessageHandler[Id, Q, M] // 消息读取通道 + buf *buffer.Ring[wrapper[Id, Q, M]] // 消息缓冲区 + cond *sync.Cond // 条件变量 + condRW *sync.RWMutex // 条件变量的读写锁 + closedHandler func(q *Queue[Id, Q, M]) // 关闭处理函数 + identifiers map[Q]int64 // 标识符在队列的消息计数映射 } // Id 获取队列 Id -func (q *Queue[Id, Ident, M]) Id() Id { +func (q *Queue[Id, Q, M]) Id() Id { return q.state.id } // SetId 设置队列 Id -func (q *Queue[Id, Ident, M]) SetId(id Id) { +func (q *Queue[Id, Q, M]) SetId(id Id) { q.state.id = id } // SetClosedHandler 设置队列关闭处理函数,在队列关闭后将执行该函数。此刻队列不再可用 // - Close 函数为非阻塞调用,调用后不会立即关闭队列,会等待消息处理完毕且处理期间不再有新消息介入 -func (q *Queue[Id, Ident, M]) SetClosedHandler(handler func(q *Queue[Id, Ident, M])) { +func (q *Queue[Id, Q, M]) SetClosedHandler(handler func(q *Queue[Id, Q, M])) { q.closedHandler = handler } // Run 阻塞的运行队列,当队列非首次运行时,将会引发来自 ErrorQueueInvalid 的 panic -func (q *Queue[Id, Ident, M]) Run() { +func (q *Queue[Id, Q, M]) Run() { if atomic.LoadInt32(&q.state.status) != StatusNone { panic(ErrorQueueInvalid) } atomic.StoreInt32(&q.state.status, StatusRunning) - defer func(q *Queue[Id, Ident, M]) { + defer func(q *Queue[Id, Q, M]) { if q.closedHandler != nil { q.closedHandler(q) } @@ -77,16 +77,21 @@ func (q *Queue[Id, Ident, M]) Run() { } items := q.buf.ReadAll() q.cond.L.Unlock() - for _, item := range items { - q.c <- func(handler func(MessageWrapper[Id, Ident, M]), finisher func(m MessageWrapper[Id, Ident, M], last bool)) { - defer func(msg MessageWrapper[Id, Ident, M]) { + for i := 0; i < len(items); i++ { + item := items[i] + q.c <- func(handler func(m M), finisher func(m M, last bool)) { + defer func(msg M) { + msg.OnProcessed(item.controller) + + queue := msg.GetQueue() + q.cond.L.Lock() q.state.total-- - curr := q.identifiers[msg.Ident()] - 1 + curr := q.identifiers[queue] - 1 if curr != 0 { - q.identifiers[msg.Ident()] = curr + q.identifiers[queue] = curr } else { - delete(q.identifiers, msg.Ident()) + delete(q.identifiers, queue) } if finisher != nil { finisher(msg, curr == 0) @@ -94,36 +99,40 @@ func (q *Queue[Id, Ident, M]) Run() { //log.Info("消息总计数", log.Int64("计数", q.state.total)) q.cond.Signal() q.cond.L.Unlock() - }(item) + }(item.message) - handler(item) + handler(item.message) } } } } -// Push 向队列中推送来自 ident 的消息 m,当队列已关闭时将会返回 ErrorQueueClosed -func (q *Queue[Id, Ident, M]) Push(hasIdent bool, ident Ident, m M) error { +// Push 向队列中推送来自 queue 的消息 m,当队列已关闭时将会返回 ErrorQueueClosed +func (q *Queue[Id, Q, M]) Push(queue Q, m M) error { if atomic.LoadInt32(&q.state.status) > StatusClosing { return ErrorQueueClosed } + wrapper := newWrapper(q, m) + q.cond.L.Lock() - q.identifiers[ident]++ + q.identifiers[queue]++ q.state.total++ - q.buf.Write(messageWrapper(q, hasIdent, ident, m)) + q.buf.Write(wrapper) //log.Info("消息总计数", log.Int64("计数", q.state.total)) q.cond.Signal() q.cond.L.Unlock() + + m.OnPublished(wrapper.controller) return nil } // WaitAdd 向队列增加来自外部的等待计数,在队列关闭时会等待该计数归零 -func (q *Queue[Id, Ident, M]) WaitAdd(ident Ident, delta int64) { +func (q *Queue[Id, Q, M]) WaitAdd(queue Q, delta int64) { q.cond.L.Lock() - currIdent := q.identifiers[ident] + currIdent := q.identifiers[queue] currIdent += delta - q.identifiers[ident] = currIdent + q.identifiers[queue] = currIdent q.state.total += delta //log.Info("消息总计数", log.Int64("计数", q.state.total)) @@ -132,23 +141,23 @@ func (q *Queue[Id, Ident, M]) WaitAdd(ident Ident, delta int64) { } // Read 获取队列消息的只读通道, -func (q *Queue[Id, Ident, M]) Read() <-chan MessageHandler[Id, Ident, M] { +func (q *Queue[Id, Q, M]) Read() <-chan MessageHandler[Id, Q, M] { return q.c } // Close 关闭队列 -func (q *Queue[Id, Ident, M]) Close() { +func (q *Queue[Id, Q, M]) Close() { atomic.CompareAndSwapInt32(&q.state.status, StatusRunning, StatusClosing) q.cond.Broadcast() } // State 获取队列状态 -func (q *Queue[Id, Ident, M]) State() *State[Id, Ident, M] { +func (q *Queue[Id, Q, M]) State() *State[Id, Q, M] { return q.state } // GetMessageCount 获取消息数量 -func (q *Queue[Id, Ident, M]) GetMessageCount() (count int64) { +func (q *Queue[Id, Q, M]) GetMessageCount() (count int64) { q.condRW.RLock() defer q.condRW.RUnlock() for _, i := range q.identifiers { @@ -158,8 +167,8 @@ func (q *Queue[Id, Ident, M]) GetMessageCount() (count int64) { } // GetMessageCountWithIdent 获取特定消息人的消息数量 -func (q *Queue[Id, Ident, M]) GetMessageCountWithIdent(ident Ident) int64 { +func (q *Queue[Id, Q, M]) GetMessageCountWithIdent(queue Q) int64 { q.condRW.RLock() defer q.condRW.RUnlock() - return q.identifiers[ident] + return q.identifiers[queue] } diff --git a/server/internal/v2/queue/state.go b/server/internal/v2/queue/state.go index a61173d..fcf024d 100644 --- a/server/internal/v2/queue/state.go +++ b/server/internal/v2/queue/state.go @@ -11,24 +11,24 @@ const ( StatusClosed // 队列已关闭 ) -type State[Id, Ident comparable, M Message] struct { - queue *Queue[Id, Ident, M] +type State[Id, Q comparable, M Message[Q]] struct { + queue *Queue[Id, Q, M] id Id // 队列 ID status int32 // 状态标志 total int64 // 消息总计数 } // IsClosed 判断队列是否已关闭 -func (q *State[Id, Ident, M]) IsClosed() bool { +func (q *State[Id, Q, M]) IsClosed() bool { return atomic.LoadInt32(&q.status) == StatusClosed } // IsClosing 判断队列是否正在关闭 -func (q *State[Id, Ident, M]) IsClosing() bool { +func (q *State[Id, Q, M]) IsClosing() bool { return atomic.LoadInt32(&q.status) == StatusClosing } // IsRunning 判断队列是否正在运行 -func (q *State[Id, Ident, M]) IsRunning() bool { +func (q *State[Id, Q, M]) IsRunning() bool { return atomic.LoadInt32(&q.status) == StatusRunning } diff --git a/server/internal/v2/queue/wrapper.go b/server/internal/v2/queue/wrapper.go new file mode 100644 index 0000000..d81471d --- /dev/null +++ b/server/internal/v2/queue/wrapper.go @@ -0,0 +1,13 @@ +package queue + +func newWrapper[Id, Q comparable, M Message[Q]](queue *Queue[Id, Q, M], message M) wrapper[Id, Q, M] { + return wrapper[Id, Q, M]{ + message: message, + controller: newController[Id, Q, M](queue, message), + } +} + +type wrapper[Id, Q comparable, M Message[Q]] struct { + message M + controller Controller +} diff --git a/server/internal/v2/reactor/handlers.go b/server/internal/v2/reactor/handlers.go index ac5f956..755ef8a 100644 --- a/server/internal/v2/reactor/handlers.go +++ b/server/internal/v2/reactor/handlers.go @@ -2,6 +2,6 @@ package reactor import "github.com/kercylan98/minotaur/server/internal/v2/queue" -type MessageHandler[M any] func(message queue.MessageWrapper[int, string, M]) +type MessageHandler[Q comparable, M queue.Message[Q]] func(message M) -type ErrorHandler[M any] func(message queue.MessageWrapper[int, string, M], err error) +type ErrorHandler[Q comparable, M queue.Message[Q]] func(message M, err error) diff --git a/server/internal/v2/reactor/reactor.go b/server/internal/v2/reactor/reactor.go index ffe8d85..c537721 100644 --- a/server/internal/v2/reactor/reactor.go +++ b/server/internal/v2/reactor/reactor.go @@ -20,19 +20,18 @@ const ( ) // NewReactor 创建一个新的 Reactor 实例,初始化系统级别的队列和多个 Socket 对应的队列 -func NewReactor[M queue.Message](systemQueueSize, queueSize, systemBufferSize, queueBufferSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] { +func NewReactor[Queue comparable, M queue.Message[Queue]](queueSize, queueBufferSize int, handler MessageHandler[Queue, M], errorHandler ErrorHandler[Queue, M]) *Reactor[Queue, M] { if handler == nil { } - r := &Reactor[M]{ - systemQueue: queue.New[int, string, M](-1, systemQueueSize, systemBufferSize), - lb: loadbalancer.NewRoundRobin[int, *queue.Queue[int, string, M]](), + r := &Reactor[Queue, M]{ + lb: loadbalancer.NewRoundRobin[int, *queue.Queue[int, Queue, M]](), errorHandler: errorHandler, queueSize: queueSize, queueBufferSize: queueBufferSize, state: statusNone, handler: handler, - location: make(map[string]int), + location: make(map[Queue]int), } r.logger.Store(log.GetLogger()) @@ -47,41 +46,40 @@ func NewReactor[M queue.Message](systemQueueSize, queueSize, systemBufferSize, q } // Reactor 是一个消息反应器,管理系统级别的队列和多个 Socket 对应的队列 -type Reactor[M queue.Message] struct { - logger atomic.Pointer[log.Logger] // 日志记录器 - state int32 // 状态 - systemQueue *queue.Queue[int, string, M] // 系统级别的队列 - queueSize int // 队列管道大小 - queueBufferSize int // 队列缓冲区大小 - queues []*queue.Queue[int, string, M] // 所有使用的队列 - queueRW sync.RWMutex // 队列读写锁 - location map[string]int // 所在队列 ID 映射 - locationRW sync.RWMutex // 所在队列 ID 映射锁 - lb *loadbalancer.RoundRobin[int, *queue.Queue[int, string, M]] // 负载均衡器 - wg sync.WaitGroup // 等待组 - cwg sync.WaitGroup // 关闭等待组 - handler MessageHandler[M] // 消息处理器 - errorHandler ErrorHandler[M] // 错误处理器 +type Reactor[Queue comparable, M queue.Message[Queue]] struct { + logger atomic.Pointer[log.Logger] // 日志记录器 + state int32 // 状态 + queueSize int // 队列管道大小 + queueBufferSize int // 队列缓冲区大小 + queues []*queue.Queue[int, Queue, M] // 所有使用的队列 + queueRW sync.RWMutex // 队列读写锁 + location map[Queue]int // 所在队列 ID 映射 + locationRW sync.RWMutex // 所在队列 ID 映射锁 + lb *loadbalancer.RoundRobin[int, *queue.Queue[int, Queue, M]] // 负载均衡器 + wg sync.WaitGroup // 等待组 + cwg sync.WaitGroup // 关闭等待组 + handler MessageHandler[Queue, M] // 消息处理器 + errorHandler ErrorHandler[Queue, M] // 错误处理器 } // SetLogger 设置日志记录器 -func (r *Reactor[M]) SetLogger(logger *log.Logger) { +func (r *Reactor[Queue, M]) SetLogger(logger *log.Logger) { r.logger.Store(logger) } // GetLogger 获取日志记录器 -func (r *Reactor[M]) GetLogger() *log.Logger { +func (r *Reactor[Queue, M]) GetLogger() *log.Logger { return r.logger.Load() } // process 消息处理 -func (r *Reactor[M]) process(msg queue.MessageWrapper[int, string, M]) { - defer func(msg queue.MessageWrapper[int, string, M]) { +func (r *Reactor[Queue, M]) process(msg M) { + defer func(msg M) { if err := super.RecoverTransform(recover()); err != nil { if r.errorHandler != nil { r.errorHandler(msg, err) } else { - r.GetLogger().Error("Reactor", log.String("action", "process"), log.Any("ident", msg.Ident()), log.Int("queue", msg.Queue().Id()), log.Err(err)) + r.GetLogger().Error("Reactor", log.String("action", "process"), log.Any("queue", msg.GetQueue()), log.Err(err)) debug.PrintStack() } } @@ -90,25 +88,17 @@ func (r *Reactor[M]) process(msg queue.MessageWrapper[int, string, M]) { r.handler(msg) } -// SystemDispatch 将消息分发到系统级别的队列 -func (r *Reactor[M]) SystemDispatch(msg M) error { - if atomic.LoadInt32(&r.state) > statusClosing { - r.queueRW.RUnlock() - return fmt.Errorf("reactor closing or closed") - } - return r.systemQueue.Push(false, "", msg) -} - -// IdentDispatch 将消息分发到 ident 使用的队列,当 ident 首次使用时,将会根据负载均衡策略选择一个队列 +// Dispatch 将消息分发到 ident 使用的队列,当 ident 首次使用时,将会根据负载均衡策略选择一个队列 // - 设置 count 会增加消息的外部计数,当 Reactor 关闭时会等待外部计数归零 -func (r *Reactor[M]) IdentDispatch(ident string, msg M) error { +// - 当 ident 为空字符串时候,将发送到 +func (r *Reactor[Queue, M]) Dispatch(ident Queue, msg M) error { r.queueRW.RLock() if atomic.LoadInt32(&r.state) > statusClosing { r.queueRW.RUnlock() return fmt.Errorf("reactor closing or closed") } - var next *queue.Queue[int, string, M] + var next *queue.Queue[int, Queue, M] r.locationRW.RLock() i, exist := r.location[ident] r.locationRW.RUnlock() @@ -126,16 +116,16 @@ func (r *Reactor[M]) IdentDispatch(ident string, msg M) error { next = r.queues[i] } r.queueRW.RUnlock() - return next.Push(true, ident, msg) + return next.Push(ident, msg) } // Run 启动 Reactor,运行系统级别的队列和多个 Socket 对应的队列 -func (r *Reactor[M]) Run(callbacks ...func(queues []*queue.Queue[int, string, M])) { +func (r *Reactor[Queue, M]) Run(callbacks ...func(queues []*queue.Queue[int, Queue, M])) { if !atomic.CompareAndSwapInt32(&r.state, statusNone, statusRunning) { return } r.queueRW.Lock() - queues := append([]*queue.Queue[int, string, M]{r.systemQueue}, r.queues...) + queues := r.queues for _, q := range queues { r.runQueue(q) } @@ -146,13 +136,13 @@ func (r *Reactor[M]) Run(callbacks ...func(queues []*queue.Queue[int, string, M] r.wg.Wait() } -func (r *Reactor[M]) noneLockAddQueue() { - q := queue.New[int, string, M](len(r.queues), r.queueSize, r.queueBufferSize) +func (r *Reactor[Queue, M]) noneLockAddQueue() { + q := queue.New[int, Queue, M](len(r.queues), r.queueSize, r.queueBufferSize) r.lb.Add(q) // 运行前添加到负载均衡器,未运行时允许接收消息 r.queues = append(r.queues, q) } -func (r *Reactor[M]) noneLockDelQueue(q *queue.Queue[int, string, M]) { +func (r *Reactor[Queue, M]) noneLockDelQueue(q *queue.Queue[int, Queue, M]) { idx := q.Id() if idx < 0 || idx >= len(r.queues) || r.queues[idx] != q { return @@ -163,9 +153,9 @@ func (r *Reactor[M]) noneLockDelQueue(q *queue.Queue[int, string, M]) { } } -func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) { +func (r *Reactor[Queue, M]) runQueue(q *queue.Queue[int, Queue, M]) { r.wg.Add(1) - q.SetClosedHandler(func(q *queue.Queue[int, string, M]) { + q.SetClosedHandler(func(q *queue.Queue[int, Queue, M]) { // 关闭时正在等待关闭完成,外部已加锁,无需再次加锁 r.noneLockDelQueue(q) r.cwg.Done() @@ -173,7 +163,7 @@ func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) { }) go q.Run() - go func(r *Reactor[M], q *queue.Queue[int, string, M]) { + go func(r *Reactor[Queue, M], q *queue.Queue[int, Queue, M]) { defer r.wg.Done() for m := range q.Read() { m(r.process, r.processFinish) @@ -181,13 +171,13 @@ func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) { }(r, q) } -func (r *Reactor[M]) Close() { +func (r *Reactor[Queue, M]) Close() { if !atomic.CompareAndSwapInt32(&r.state, statusRunning, statusClosing) { return } r.queueRW.Lock() r.cwg.Add(len(r.queues) + 1) - for _, q := range append(r.queues, r.systemQueue) { + for _, q := range r.queues { q.Close() } r.cwg.Wait() @@ -195,22 +185,25 @@ func (r *Reactor[M]) Close() { r.queueRW.Unlock() } -func (r *Reactor[M]) processFinish(m queue.MessageWrapper[int, string, M], last bool) { - if last { - r.locationRW.RLock() - mq, exist := r.location[m.Ident()] - r.locationRW.RUnlock() +func (r *Reactor[Queue, M]) processFinish(m M, last bool) { + if !last { + return + } + queueName := m.GetQueue() + + r.locationRW.RLock() + mq, exist := r.location[queueName] + r.locationRW.RUnlock() + if exist { + r.locationRW.Lock() + defer r.locationRW.Unlock() + mq, exist = r.location[queueName] if exist { - r.locationRW.Lock() - defer r.locationRW.Unlock() - mq, exist = r.location[m.Ident()] - if exist { - delete(r.location, m.Ident()) - r.queueRW.RLock() - mq := r.queues[mq] - r.queueRW.RUnlock() - r.logger.Load().Debug("Reactor", log.String("action", "unbind"), log.Any("ident", m.Ident()), log.Any("queue", mq.Id())) - } + delete(r.location, queueName) + r.queueRW.RLock() + mq := r.queues[mq] + r.queueRW.RUnlock() + r.logger.Load().Debug("Reactor", log.String("action", "unbind"), log.Any("queueName", queueName), log.Any("queue", mq.Id())) } } } diff --git a/server/internal/v2/server.go b/server/internal/v2/server.go index 3d701d3..771a3a9 100644 --- a/server/internal/v2/server.go +++ b/server/internal/v2/server.go @@ -2,9 +2,15 @@ package server import ( "context" + "fmt" + "github.com/kercylan98/minotaur/server/internal/v2/message" + "github.com/kercylan98/minotaur/server/internal/v2/message/messages" "github.com/kercylan98/minotaur/server/internal/v2/queue" + "github.com/kercylan98/minotaur/utils/collection" "github.com/kercylan98/minotaur/utils/log/v2" + "github.com/kercylan98/minotaur/utils/random" "github.com/panjf2000/ants/v2" + "reflect" "time" "github.com/kercylan98/minotaur/server/internal/v2/reactor" @@ -13,6 +19,7 @@ import ( type Server interface { Events + message.Broker[Producer, string] // Run 运行服务器 Run() error @@ -23,46 +30,45 @@ type Server interface { // GetStatus 获取服务器状态 GetStatus() *State - // PushMessage 推送特定消息到系统队列中进行处理 - PushMessage(message Message) + // PublishSyncMessage 发布同步消息 + PublishSyncMessage(queue string, handler messages.SynchronousHandler[Producer, string, Server]) - // PushSyncMessage 是 PushMessage 中对于 GenerateSystemSyncMessage 的快捷方式 - PushSyncMessage(handler func(srv Server)) - - // PushAsyncMessage 是 PushMessage 中对于 GenerateSystemAsyncMessage 的快捷方式,当 callback 传入多个时,将仅有首个生效 - PushAsyncMessage(handler func(srv Server) error, callback ...func(srv Server, err error)) + // PublishAsyncMessage 发布异步消息,当包含多个 callback 时,仅首个生效 + PublishAsyncMessage(queue string, handler messages.AsynchronousHandler[Producer, string, Server], callback ...messages.AsynchronousCallbackHandler[Producer, string, Server]) } type server struct { *controller *events *Options + queue string ants *ants.Pool state *State notify *notify ctx context.Context cancel context.CancelFunc network Network - reactor *reactor.Reactor[Message] + reactor *reactor.Reactor[string, message.Message[Producer, string]] } func NewServer(network Network, options ...*Options) Server { srv := &server{ network: network, Options: DefaultOptions(), + queue: fmt.Sprintf("%s:%s", reflect.TypeOf(new(server)).String(), random.HostName()), } srv.ctx, srv.cancel = context.WithCancel(context.Background()) srv.notify = new(notify).init(srv) srv.controller = new(controller).init(srv) srv.events = new(events).init(srv) srv.state = new(State).init(srv) - srv.reactor = reactor.NewReactor[Message]( - srv.GetServerMessageChannelSize(), srv.GetActorMessageChannelSize(), - srv.GetServerMessageBufferInitialSize(), srv.GetActorMessageBufferInitialSize(), + srv.reactor = reactor.NewReactor[string, message.Message[Producer, string]]( + srv.GetActorMessageChannelSize(), + srv.GetActorMessageBufferInitialSize(), srv.onProcessMessage, - func(message queue.MessageWrapper[int, string, Message], err error) { + func(message message.Message[Producer, string], err error) { if handler := srv.GetMessageErrorHandler(); handler != nil { - handler(srv, message.Message(), err) + handler(srv, message, err) } }) srv.Options.init(srv).Apply(options...) @@ -84,7 +90,7 @@ func NewServer(network Network, options ...*Options) Server { func (s *server) Run() (err error) { var queueWait = make(chan struct{}) - go s.reactor.Run(func(queues []*queue.Queue[int, string, Message]) { + go s.reactor.Run(func(queues []*queue.Queue[int, string, message.Message[Producer, string]]) { for _, q := range queues { s.GetLogger().Debug("Reactor", log.String("action", "run"), log.Any("queue", q.Id())) } @@ -123,31 +129,42 @@ func (s *server) Shutdown() (err error) { return } -func (s *server) PushMessage(message Message) { - s.controller.PushSystemMessage(message) +func (s *server) getSysQueue() string { + return s.queue } -func (s *server) PushSyncMessage(handler func(srv Server)) { - s.PushMessage(GenerateSystemSyncMessage(handler)) +func (s *server) PublishMessage(msg message.Message[Producer, string]) { + s.reactor.Dispatch(msg.GetQueue(), msg) } -func (s *server) PushAsyncMessage(handler func(srv Server) error, callback ...func(srv Server, err error)) { - var cb func(srv Server, err error) - if len(callback) > 0 { - cb = callback[0] - } - s.PushMessage(GenerateSystemAsyncMessage(handler, cb)) +func (s *server) PublishSyncMessage(queue string, handler messages.SynchronousHandler[Producer, string, Server]) { + s.PublishMessage(messages.Synchronous[Producer, string, Server]( + s, newProducer(s, nil), queue, + handler, + )) +} + +func (s *server) PublishAsyncMessage(queue string, handler messages.AsynchronousHandler[Producer, string, Server], callback ...messages.AsynchronousCallbackHandler[Producer, string, Server]) { + s.PublishMessage(messages.Asynchronous[Producer, string, Server]( + s, newProducer(s, nil), queue, + func(ctx context.Context, srv Server, f func(context.Context, Server)) { + s.ants.Submit(func() { + f(ctx, s) + }) + }, + handler, + collection.FindFirstOrDefaultInSlice(callback, nil), + )) } func (s *server) GetStatus() *State { return s.state.Status() } -func (s *server) onProcessMessage(message queue.MessageWrapper[int, string, Message]) { +func (s *server) onProcessMessage(m message.Message[Producer, string]) { s.getManyOptions(func(opt *Options) { - m := message.Message() - m.OnInitialize(s, s.reactor, message) + ctx := context.Background() + m.OnInitialize(ctx) m.OnProcess() }) - } diff --git a/server/internal/v2/server_test.go b/server/internal/v2/server_test.go index bff33f4..9ff3ccc 100644 --- a/server/internal/v2/server_test.go +++ b/server/internal/v2/server_test.go @@ -1,6 +1,7 @@ package server_test import ( + "context" "github.com/gobwas/ws" "github.com/kercylan98/minotaur/server/internal/v2" "github.com/kercylan98/minotaur/server/internal/v2/network" @@ -30,19 +31,18 @@ func TestNewServer(t *testing.T) { var tm = make(map[string]bool) srv.RegisterConnectionOpenedEvent(func(srv server.Server, conn server.Conn) { - conn.SetActor(random.HostName()) if err := conn.WritePacket(server.NewPacket([]byte("hello")).SetContext(ws.OpText)); err != nil { t.Error(err) } - conn.PushMessage(server.GenerateCrossQueueMessage("target", func(srv server.Server) { + srv.PublishAsyncMessage("123", func(ctx context.Context, s server.Server) error { + return nil + }, func(ctx context.Context, s server.Server, err error) { for i := 0; i < 10000000; i++ { _ = tm["1"] tm["1"] = random.Bool() } - }, func(srv server.Server) { - - })) + }) }) srv.RegisterConnectionReceivePacketEvent(func(srv server.Server, conn server.Conn, packet server.Packet) {