From e2b7887b142be1217572e6f2e487554eedc5010e Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Mon, 13 Nov 2023 14:16:38 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20server=20=E6=96=B0=E5=A2=9E=20Unique=20?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E6=B6=88=E6=81=AF=EF=BC=8C=E5=8F=AF=E7=94=A8?= =?UTF-8?q?=E4=BA=8E=E9=81=BF=E5=85=8D=E7=9B=B8=E5=90=8C=E6=A0=87=E8=AF=86?= =?UTF-8?q?=E7=9A=84=E5=BC=82=E6=AD=A5=E6=B6=88=E6=81=AF=E5=9C=A8=E6=9C=AA?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=AE=8C=E6=AF=95=E5=89=8D=E9=87=8D=E5=A4=8D?= =?UTF-8?q?=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 2 +- go.sum | 2 + server/conn.go | 7 ++++ server/dispatcher.go | 24 +++++++++-- server/message.go | 62 +++++++++++++++++++++++------ server/server.go | 95 ++++++++++++++++++++++++++++++++++---------- 6 files changed, 155 insertions(+), 37 deletions(-) diff --git a/go.mod b/go.mod index 7772ed2..3dadec3 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.21 require ( github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 - github.com/alphadose/haxmap v1.2.0 + github.com/alphadose/haxmap v1.3.0 github.com/gin-contrib/pprof v1.4.0 github.com/gin-gonic/gin v1.9.1 github.com/go-resty/resty/v2 v2.7.0 diff --git a/go.sum b/go.sum index 0d15992..91205e9 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 h1:iPugyBI7 github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108/go.mod h1:WAMLHwunr1hi3u7OjGV6/VWG9QbdMhGpEKjROiSFd10= github.com/alphadose/haxmap v1.2.0 h1:noGrAmCE+gNheZ4KpW+sYj9W5uMcO1UAjbAq9XBOAfM= github.com/alphadose/haxmap v1.2.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM= +github.com/alphadose/haxmap v1.3.0 h1:C/2LboOnPCZP27GmmSXOcwx360st0P8N0fTJ3voefKc= +github.com/alphadose/haxmap v1.3.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= diff --git a/server/conn.go b/server/conn.go index 81aa215..df55cbd 100644 --- a/server/conn.go +++ b/server/conn.go @@ -231,6 +231,13 @@ func (slf *Conn) PushAsyncMessage(caller func() error, callback func(err error), slf.server.PushShuntAsyncMessage(slf, caller, callback, mark...) } +// PushUniqueAsyncMessage 推送唯一异步消息,该消息将通过 Server.PushUniqueShuntAsyncMessage 函数推送 +// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 +// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息 +func (slf *Conn) PushUniqueAsyncMessage(name string, caller func() error, callback func(err error), mark ...log.Field) { + slf.server.PushUniqueShuntAsyncMessage(slf, name, caller, callback, mark...) +} + // Write 向连接中写入数据 // - messageType: websocket模式中指定消息类型 func (slf *Conn) Write(packet []byte, callback ...func(err error)) { diff --git a/server/dispatcher.go b/server/dispatcher.go index e3b4c17..e5639e3 100644 --- a/server/dispatcher.go +++ b/server/dispatcher.go @@ -1,19 +1,35 @@ package server -import "github.com/kercylan98/minotaur/utils/buffer" +import ( + "github.com/alphadose/haxmap" + "github.com/kercylan98/minotaur/utils/buffer" +) + +var dispatcherUnique = struct{}{} // generateDispatcher 生成消息分发器 -func generateDispatcher(handler func(message *Message)) *dispatcher { +func generateDispatcher(handler func(dispatcher *dispatcher, message *Message)) *dispatcher { return &dispatcher{ buffer: buffer.NewUnboundedN[*Message](), handler: handler, + uniques: haxmap.New[string, struct{}](), } } // dispatcher 消息分发器 type dispatcher struct { buffer *buffer.Unbounded[*Message] - handler func(message *Message) + uniques *haxmap.Map[string, struct{}] + handler func(dispatcher *dispatcher, message *Message) +} + +func (slf *dispatcher) unique(name string) bool { + _, loaded := slf.uniques.GetOrSet(name, dispatcherUnique) + return loaded +} + +func (slf *dispatcher) antiUnique(name string) { + slf.uniques.Del(name) } func (slf *dispatcher) start() { @@ -24,7 +40,7 @@ func (slf *dispatcher) start() { return } slf.buffer.Load() - slf.handler(message) + slf.handler(slf, message) } } } diff --git a/server/message.go b/server/message.go index 74a7af9..69c5082 100644 --- a/server/message.go +++ b/server/message.go @@ -1,10 +1,8 @@ package server import ( - "fmt" "github.com/kercylan98/minotaur/utils/hash" "github.com/kercylan98/minotaur/utils/log" - "github.com/kercylan98/minotaur/utils/super" ) const ( @@ -32,20 +30,36 @@ const ( // MessageTypeShuntAsyncCallback 分流异步回调消息类型 MessageTypeShuntAsyncCallback + // MessageTypeUniqueAsync 唯一异步消息类型 + MessageTypeUniqueAsync + + // MessageTypeUniqueAsyncCallback 唯一异步回调消息类型 + MessageTypeUniqueAsyncCallback + + // MessageTypeUniqueShuntAsync 唯一分流异步消息类型 + MessageTypeUniqueShuntAsync + + // MessageTypeUniqueShuntAsyncCallback 唯一分流异步回调消息类型 + MessageTypeUniqueShuntAsyncCallback + // MessageTypeSystem 系统消息类型 MessageTypeSystem ) var messageNames = map[MessageType]string{ - MessageTypePacket: "MessageTypePacket", - MessageTypeError: "MessageTypeError", - MessageTypeTicker: "MessageTypeTicker", - MessageTypeShuntTicker: "MessageTypeShuntTicker", - MessageTypeAsync: "MessageTypeAsync", - MessageTypeAsyncCallback: "MessageTypeAsyncCallback", - MessageTypeShuntAsync: "MessageTypeShuntAsync", - MessageTypeShuntAsyncCallback: "MessageTypeShuntAsyncCallback", - MessageTypeSystem: "MessageTypeSystem", + MessageTypePacket: "MessageTypePacket", + MessageTypeError: "MessageTypeError", + MessageTypeTicker: "MessageTypeTicker", + MessageTypeShuntTicker: "MessageTypeShuntTicker", + MessageTypeAsync: "MessageTypeAsync", + MessageTypeAsyncCallback: "MessageTypeAsyncCallback", + MessageTypeShuntAsync: "MessageTypeShuntAsync", + MessageTypeShuntAsyncCallback: "MessageTypeShuntAsyncCallback", + MessageTypeUniqueAsync: "MessageTypeUniqueAsync", + MessageTypeUniqueAsyncCallback: "MessageTypeUniqueAsyncCallback", + MessageTypeUniqueShuntAsync: "MessageTypeUniqueShuntAsync", + MessageTypeUniqueShuntAsyncCallback: "MessageTypeUniqueShuntAsyncCallback", + MessageTypeSystem: "MessageTypeSystem", } const ( @@ -110,7 +124,7 @@ func (slf *Message) MessageType() MessageType { // String 返回消息的字符串表示 func (slf *Message) String() string { - return fmt.Sprintf("[%s] %s", slf.t, super.MarshalJSON(slf.marks)) + return slf.t.String() } // String 返回消息类型的字符串表示 @@ -160,6 +174,30 @@ func (slf *Message) castToShuntAsyncCallbackMessage(conn *Conn, err error, calle return slf } +// castToUniqueAsyncMessage 将消息转换为唯一异步消息 +func (slf *Message) castToUniqueAsyncMessage(unique string, caller func() error, callback func(err error), mark ...log.Field) *Message { + slf.t, slf.name, slf.exceptionHandler, slf.errHandler, slf.marks = MessageTypeUniqueAsync, unique, caller, callback, mark + return slf +} + +// castToUniqueAsyncCallbackMessage 将消息转换为唯一异步回调消息 +func (slf *Message) castToUniqueAsyncCallbackMessage(unique string, err error, caller func(err error), mark ...log.Field) *Message { + slf.t, slf.name, slf.err, slf.errHandler, slf.marks = MessageTypeUniqueAsyncCallback, unique, err, caller, mark + return slf +} + +// castToUniqueShuntAsyncMessage 将消息转换为唯一分流异步消息 +func (slf *Message) castToUniqueShuntAsyncMessage(conn *Conn, unique string, caller func() error, callback func(err error), mark ...log.Field) *Message { + slf.t, slf.conn, slf.name, slf.exceptionHandler, slf.errHandler, slf.marks = MessageTypeUniqueShuntAsync, conn, unique, caller, callback, mark + return slf +} + +// castToUniqueShuntAsyncCallbackMessage 将消息转换为唯一分流异步回调消息 +func (slf *Message) castToUniqueShuntAsyncCallbackMessage(conn *Conn, unique string, err error, caller func(err error), mark ...log.Field) *Message { + slf.t, slf.conn, slf.name, slf.err, slf.errHandler, slf.marks = MessageTypeUniqueShuntAsyncCallback, conn, unique, err, caller, mark + return slf +} + // castToSystemMessage 将消息转换为系统消息 func (slf *Message) castToSystemMessage(caller func(), mark ...log.Field) *Message { slf.t, slf.ordinaryHandler, slf.marks = MessageTypeSystem, caller, mark diff --git a/server/server.go b/server/server.go index 4aedd39..2aebc2f 100644 --- a/server/server.go +++ b/server/server.go @@ -81,27 +81,27 @@ type Server struct { *event // 事件 *runtime // 运行时 *option // 可选项 - network Network // 网络类型 - addr string // 侦听地址 - systemSignal chan os.Signal // 系统信号 - online *concurrent.BalanceMap[string, *Conn] // 在线连接 ginServer *gin.Engine // HTTP模式下的路由器 httpServer *http.Server // HTTP模式下的服务器 grpcServer *grpc.Server // GRPC模式下的服务器 gServer *gNet // TCP或UDP模式下的服务器 - isRunning bool // 是否正在运行 - isShutdown atomic.Bool // 是否已关闭 - closeChannel chan struct{} // 关闭信号 + multiple *MultipleServer // 多服务器模式下的服务器 ants *ants.Pool // 协程池 messagePool *concurrent.Pool[*Message] // 消息池 - messageLock sync.RWMutex // 消息锁 - multiple *MultipleServer // 多服务器模式下的服务器 - multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 - runMode RunMode // 运行模式 - messageCounter atomic.Int64 // 消息计数器 ctx context.Context // 上下文 - dispatchers map[string]*dispatcher // 消息分发器 + online *concurrent.BalanceMap[string, *Conn] // 在线连接 + network Network // 网络类型 + addr string // 侦听地址 + runMode RunMode // 运行模式 + systemSignal chan os.Signal // 系统信号 + closeChannel chan struct{} // 关闭信号 + multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 + messageLock sync.RWMutex // 消息锁 dispatcherLock sync.RWMutex // 消息分发器锁 + isShutdown atomic.Bool // 是否已关闭 + messageCounter atomic.Int64 // 消息计数器 + isRunning bool // 是否正在运行 + dispatchers map[string]*dispatcher // 消息分发器 } // Run 使用特定地址运行服务器 @@ -589,18 +589,22 @@ func (slf *Server) pushMessage(message *Message) { break } fallthrough - case MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback: + case MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback, MessageTypeUniqueShuntAsync, MessageTypeUniqueShuntAsyncCallback: var created bool dispatcher, created = slf.useDispatcher(slf.shuntMatcher(message.conn)) if created { go dispatcher.start() } - case MessageTypeSystem, MessageTypeAsync, MessageTypeAsyncCallback, MessageTypeError, MessageTypeTicker: + case MessageTypeSystem, MessageTypeAsync, MessageTypeUniqueAsync, MessageTypeAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeError, MessageTypeTicker: dispatcher, _ = slf.useDispatcher(serverSystemDispatcher) } if dispatcher == nil { return } + if (message.t == MessageTypeUniqueShuntAsync || message.t == MessageTypeUniqueAsync) && dispatcher.unique(message.name) { + slf.messagePool.Release(message) + return + } slf.messageCounter.Add(1) dispatcher.put(message) } @@ -613,13 +617,17 @@ func (slf *Server) low(message *Message, present time.Time, expect time.Duration message.marks = append(message.marks, log.String(fmt.Sprintf("Other-%d", i+1), s)) } } - log.Warn("Server", log.String("type", "low-message"), log.String("cost", cost.String()), log.String("message", message.String()), log.Stack("stack")) + var fields = make([]log.Field, 0, len(message.marks)+4) + fields = append(fields, log.String("type", messageNames[message.t]), log.String("cost", cost.String()), log.String("message", message.String())) + fields = append(fields, message.marks...) + fields = append(fields, log.Stack("stack")) + log.Warn("Server", fields...) slf.OnMessageLowExecEvent(message, cost) } } // dispatchMessage 消息分发 -func (slf *Server) dispatchMessage(msg *Message) { +func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { var ( ctx context.Context cancel context.CancelFunc @@ -637,7 +645,7 @@ func (slf *Server) dispatchMessage(msg *Message) { } present := time.Now() - if msg.t != MessageTypeAsync { + if msg.t != MessageTypeAsync && msg.t != MessageTypeUniqueAsync && msg.t != MessageTypeShuntAsync && msg.t != MessageTypeUniqueShuntAsync { defer func(msg *Message) { if err := recover(); err != nil { stack := string(debug.Stack()) @@ -647,6 +655,9 @@ func (slf *Server) dispatchMessage(msg *Message) { slf.OnMessageErrorEvent(msg, e) } } + if msg.t == MessageTypeUniqueAsyncCallback || msg.t == MessageTypeUniqueShuntAsyncCallback { + dispatcher.antiUnique(msg.name) + } super.Handle(cancel) slf.low(msg, present, time.Millisecond*100) @@ -675,10 +686,13 @@ func (slf *Server) dispatchMessage(msg *Message) { } case MessageTypeTicker, MessageTypeShuntTicker: msg.ordinaryHandler() - case MessageTypeAsync, MessageTypeShuntAsync: + case MessageTypeAsync, MessageTypeShuntAsync, MessageTypeUniqueAsync, MessageTypeUniqueShuntAsync: if err := slf.ants.Submit(func() { defer func() { if err := recover(); err != nil { + if msg.t == MessageTypeUniqueAsync || msg.t == MessageTypeUniqueShuntAsync { + dispatcher.antiUnique(msg.name) + } stack := string(debug.Stack()) log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", stack)) fmt.Println(stack) @@ -700,19 +714,28 @@ func (slf *Server) dispatchMessage(msg *Message) { } if msg.errHandler != nil { if msg.conn == nil { + if msg.t == MessageTypeUniqueAsync { + slf.PushUniqueAsyncCallbackMessage(msg.name, err, msg.errHandler) + return + } slf.PushAsyncCallbackMessage(err, msg.errHandler) return } + if msg.t == MessageTypeUniqueShuntAsync { + slf.PushUniqueShuntAsyncCallbackMessage(msg.conn, msg.name, err, msg.errHandler) + return + } slf.PushShuntAsyncCallbackMessage(msg.conn, err, msg.errHandler) return } + dispatcher.antiUnique(msg.name) if err != nil { log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", string(debug.Stack()))) } }); err != nil { panic(err) } - case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback: + case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeUniqueShuntAsyncCallback: msg.errHandler(msg.err) case MessageTypeSystem: msg.ordinaryHandler() @@ -796,6 +819,38 @@ func (slf *Server) PushShuntTickerMessage(conn *Conn, name string, caller func() slf.pushMessage(slf.messagePool.Get().castToShuntTickerMessage(conn, name, caller, mark...)) } +// PushUniqueAsyncMessage 向服务器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致 +// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息 +func (slf *Server) PushUniqueAsyncMessage(unique string, caller func() error, callback func(err error), mark ...log.Field) { + slf.pushMessage(slf.messagePool.Get().castToUniqueAsyncMessage(unique, caller, callback, mark...)) +} + +// PushUniqueAsyncCallbackMessage 向服务器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致 +func (slf *Server) PushUniqueAsyncCallbackMessage(unique string, err error, callback func(err error), mark ...log.Field) { + slf.pushMessage(slf.messagePool.Get().castToUniqueAsyncCallbackMessage(unique, err, callback, mark...)) +} + +// PushUniqueShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致 +// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncMessage 进行转发 +// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息 +func (slf *Server) PushUniqueShuntAsyncMessage(conn *Conn, unique string, caller func() error, callback func(err error), mark ...log.Field) { + if slf.shuntMatcher == nil { + slf.PushUniqueAsyncMessage(unique, caller, callback) + return + } + slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncMessage(conn, unique, caller, callback, mark...)) +} + +// PushUniqueShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致 +// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncCallbackMessage 进行转发 +func (slf *Server) PushUniqueShuntAsyncCallbackMessage(conn *Conn, unique string, err error, callback func(err error), mark ...log.Field) { + if slf.shuntMatcher == nil { + slf.PushUniqueAsyncCallbackMessage(unique, err, callback) + return + } + slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncCallbackMessage(conn, unique, err, callback, mark...)) +} + // PushErrorMessage 向服务器中推送 MessageTypeError 消息 // - 通过该函数推送错误消息,当消息触发时将在系统分发器中处理消息 // - 参数 errAction 用于指定错误消息的处理方式,可选值为 MessageErrorActionNone 和 MessageErrorActionShutdown