feat: server 新增 Unique 异步消息,可用于避免相同标识的异步消息在未执行完毕前重复执行

This commit is contained in:
kercylan98 2023-11-13 14:16:38 +08:00
parent 30c0b3a64b
commit e2b7887b14
6 changed files with 155 additions and 37 deletions

2
go.mod
View File

@ -4,7 +4,7 @@ go 1.21
require (
github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108
github.com/alphadose/haxmap v1.2.0
github.com/alphadose/haxmap v1.3.0
github.com/gin-contrib/pprof v1.4.0
github.com/gin-gonic/gin v1.9.1
github.com/go-resty/resty/v2 v2.7.0

2
go.sum
View File

@ -4,6 +4,8 @@ github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 h1:iPugyBI7
github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108/go.mod h1:WAMLHwunr1hi3u7OjGV6/VWG9QbdMhGpEKjROiSFd10=
github.com/alphadose/haxmap v1.2.0 h1:noGrAmCE+gNheZ4KpW+sYj9W5uMcO1UAjbAq9XBOAfM=
github.com/alphadose/haxmap v1.2.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM=
github.com/alphadose/haxmap v1.3.0 h1:C/2LboOnPCZP27GmmSXOcwx360st0P8N0fTJ3voefKc=
github.com/alphadose/haxmap v1.3.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=

View File

@ -231,6 +231,13 @@ func (slf *Conn) PushAsyncMessage(caller func() error, callback func(err error),
slf.server.PushShuntAsyncMessage(slf, caller, callback, mark...)
}
// PushUniqueAsyncMessage 推送唯一异步消息,该消息将通过 Server.PushUniqueShuntAsyncMessage 函数推送
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息
func (slf *Conn) PushUniqueAsyncMessage(name string, caller func() error, callback func(err error), mark ...log.Field) {
slf.server.PushUniqueShuntAsyncMessage(slf, name, caller, callback, mark...)
}
// Write 向连接中写入数据
// - messageType: websocket模式中指定消息类型
func (slf *Conn) Write(packet []byte, callback ...func(err error)) {

View File

@ -1,19 +1,35 @@
package server
import "github.com/kercylan98/minotaur/utils/buffer"
import (
"github.com/alphadose/haxmap"
"github.com/kercylan98/minotaur/utils/buffer"
)
var dispatcherUnique = struct{}{}
// generateDispatcher 生成消息分发器
func generateDispatcher(handler func(message *Message)) *dispatcher {
func generateDispatcher(handler func(dispatcher *dispatcher, message *Message)) *dispatcher {
return &dispatcher{
buffer: buffer.NewUnboundedN[*Message](),
handler: handler,
uniques: haxmap.New[string, struct{}](),
}
}
// dispatcher 消息分发器
type dispatcher struct {
buffer *buffer.Unbounded[*Message]
handler func(message *Message)
uniques *haxmap.Map[string, struct{}]
handler func(dispatcher *dispatcher, message *Message)
}
func (slf *dispatcher) unique(name string) bool {
_, loaded := slf.uniques.GetOrSet(name, dispatcherUnique)
return loaded
}
func (slf *dispatcher) antiUnique(name string) {
slf.uniques.Del(name)
}
func (slf *dispatcher) start() {
@ -24,7 +40,7 @@ func (slf *dispatcher) start() {
return
}
slf.buffer.Load()
slf.handler(message)
slf.handler(slf, message)
}
}
}

View File

@ -1,10 +1,8 @@
package server
import (
"fmt"
"github.com/kercylan98/minotaur/utils/hash"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/super"
)
const (
@ -32,20 +30,36 @@ const (
// MessageTypeShuntAsyncCallback 分流异步回调消息类型
MessageTypeShuntAsyncCallback
// MessageTypeUniqueAsync 唯一异步消息类型
MessageTypeUniqueAsync
// MessageTypeUniqueAsyncCallback 唯一异步回调消息类型
MessageTypeUniqueAsyncCallback
// MessageTypeUniqueShuntAsync 唯一分流异步消息类型
MessageTypeUniqueShuntAsync
// MessageTypeUniqueShuntAsyncCallback 唯一分流异步回调消息类型
MessageTypeUniqueShuntAsyncCallback
// MessageTypeSystem 系统消息类型
MessageTypeSystem
)
var messageNames = map[MessageType]string{
MessageTypePacket: "MessageTypePacket",
MessageTypeError: "MessageTypeError",
MessageTypeTicker: "MessageTypeTicker",
MessageTypeShuntTicker: "MessageTypeShuntTicker",
MessageTypeAsync: "MessageTypeAsync",
MessageTypeAsyncCallback: "MessageTypeAsyncCallback",
MessageTypeShuntAsync: "MessageTypeShuntAsync",
MessageTypeShuntAsyncCallback: "MessageTypeShuntAsyncCallback",
MessageTypeSystem: "MessageTypeSystem",
MessageTypePacket: "MessageTypePacket",
MessageTypeError: "MessageTypeError",
MessageTypeTicker: "MessageTypeTicker",
MessageTypeShuntTicker: "MessageTypeShuntTicker",
MessageTypeAsync: "MessageTypeAsync",
MessageTypeAsyncCallback: "MessageTypeAsyncCallback",
MessageTypeShuntAsync: "MessageTypeShuntAsync",
MessageTypeShuntAsyncCallback: "MessageTypeShuntAsyncCallback",
MessageTypeUniqueAsync: "MessageTypeUniqueAsync",
MessageTypeUniqueAsyncCallback: "MessageTypeUniqueAsyncCallback",
MessageTypeUniqueShuntAsync: "MessageTypeUniqueShuntAsync",
MessageTypeUniqueShuntAsyncCallback: "MessageTypeUniqueShuntAsyncCallback",
MessageTypeSystem: "MessageTypeSystem",
}
const (
@ -110,7 +124,7 @@ func (slf *Message) MessageType() MessageType {
// String 返回消息的字符串表示
func (slf *Message) String() string {
return fmt.Sprintf("[%s] %s", slf.t, super.MarshalJSON(slf.marks))
return slf.t.String()
}
// String 返回消息类型的字符串表示
@ -160,6 +174,30 @@ func (slf *Message) castToShuntAsyncCallbackMessage(conn *Conn, err error, calle
return slf
}
// castToUniqueAsyncMessage 将消息转换为唯一异步消息
func (slf *Message) castToUniqueAsyncMessage(unique string, caller func() error, callback func(err error), mark ...log.Field) *Message {
slf.t, slf.name, slf.exceptionHandler, slf.errHandler, slf.marks = MessageTypeUniqueAsync, unique, caller, callback, mark
return slf
}
// castToUniqueAsyncCallbackMessage 将消息转换为唯一异步回调消息
func (slf *Message) castToUniqueAsyncCallbackMessage(unique string, err error, caller func(err error), mark ...log.Field) *Message {
slf.t, slf.name, slf.err, slf.errHandler, slf.marks = MessageTypeUniqueAsyncCallback, unique, err, caller, mark
return slf
}
// castToUniqueShuntAsyncMessage 将消息转换为唯一分流异步消息
func (slf *Message) castToUniqueShuntAsyncMessage(conn *Conn, unique string, caller func() error, callback func(err error), mark ...log.Field) *Message {
slf.t, slf.conn, slf.name, slf.exceptionHandler, slf.errHandler, slf.marks = MessageTypeUniqueShuntAsync, conn, unique, caller, callback, mark
return slf
}
// castToUniqueShuntAsyncCallbackMessage 将消息转换为唯一分流异步回调消息
func (slf *Message) castToUniqueShuntAsyncCallbackMessage(conn *Conn, unique string, err error, caller func(err error), mark ...log.Field) *Message {
slf.t, slf.conn, slf.name, slf.err, slf.errHandler, slf.marks = MessageTypeUniqueShuntAsyncCallback, conn, unique, err, caller, mark
return slf
}
// castToSystemMessage 将消息转换为系统消息
func (slf *Message) castToSystemMessage(caller func(), mark ...log.Field) *Message {
slf.t, slf.ordinaryHandler, slf.marks = MessageTypeSystem, caller, mark

View File

@ -81,27 +81,27 @@ type Server struct {
*event // 事件
*runtime // 运行时
*option // 可选项
network Network // 网络类型
addr string // 侦听地址
systemSignal chan os.Signal // 系统信号
online *concurrent.BalanceMap[string, *Conn] // 在线连接
ginServer *gin.Engine // HTTP模式下的路由器
httpServer *http.Server // HTTP模式下的服务器
grpcServer *grpc.Server // GRPC模式下的服务器
gServer *gNet // TCP或UDP模式下的服务器
isRunning bool // 是否正在运行
isShutdown atomic.Bool // 是否已关闭
closeChannel chan struct{} // 关闭信号
multiple *MultipleServer // 多服务器模式下的服务器
ants *ants.Pool // 协程池
messagePool *concurrent.Pool[*Message] // 消息池
messageLock sync.RWMutex // 消息锁
multiple *MultipleServer // 多服务器模式下的服务器
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
runMode RunMode // 运行模式
messageCounter atomic.Int64 // 消息计数器
ctx context.Context // 上下文
dispatchers map[string]*dispatcher // 消息分发器
online *concurrent.BalanceMap[string, *Conn] // 在线连接
network Network // 网络类型
addr string // 侦听地址
runMode RunMode // 运行模式
systemSignal chan os.Signal // 系统信号
closeChannel chan struct{} // 关闭信号
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
messageLock sync.RWMutex // 消息锁
dispatcherLock sync.RWMutex // 消息分发器锁
isShutdown atomic.Bool // 是否已关闭
messageCounter atomic.Int64 // 消息计数器
isRunning bool // 是否正在运行
dispatchers map[string]*dispatcher // 消息分发器
}
// Run 使用特定地址运行服务器
@ -589,18 +589,22 @@ func (slf *Server) pushMessage(message *Message) {
break
}
fallthrough
case MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback:
case MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback, MessageTypeUniqueShuntAsync, MessageTypeUniqueShuntAsyncCallback:
var created bool
dispatcher, created = slf.useDispatcher(slf.shuntMatcher(message.conn))
if created {
go dispatcher.start()
}
case MessageTypeSystem, MessageTypeAsync, MessageTypeAsyncCallback, MessageTypeError, MessageTypeTicker:
case MessageTypeSystem, MessageTypeAsync, MessageTypeUniqueAsync, MessageTypeAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeError, MessageTypeTicker:
dispatcher, _ = slf.useDispatcher(serverSystemDispatcher)
}
if dispatcher == nil {
return
}
if (message.t == MessageTypeUniqueShuntAsync || message.t == MessageTypeUniqueAsync) && dispatcher.unique(message.name) {
slf.messagePool.Release(message)
return
}
slf.messageCounter.Add(1)
dispatcher.put(message)
}
@ -613,13 +617,17 @@ func (slf *Server) low(message *Message, present time.Time, expect time.Duration
message.marks = append(message.marks, log.String(fmt.Sprintf("Other-%d", i+1), s))
}
}
log.Warn("Server", log.String("type", "low-message"), log.String("cost", cost.String()), log.String("message", message.String()), log.Stack("stack"))
var fields = make([]log.Field, 0, len(message.marks)+4)
fields = append(fields, log.String("type", messageNames[message.t]), log.String("cost", cost.String()), log.String("message", message.String()))
fields = append(fields, message.marks...)
fields = append(fields, log.Stack("stack"))
log.Warn("Server", fields...)
slf.OnMessageLowExecEvent(message, cost)
}
}
// dispatchMessage 消息分发
func (slf *Server) dispatchMessage(msg *Message) {
func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
var (
ctx context.Context
cancel context.CancelFunc
@ -637,7 +645,7 @@ func (slf *Server) dispatchMessage(msg *Message) {
}
present := time.Now()
if msg.t != MessageTypeAsync {
if msg.t != MessageTypeAsync && msg.t != MessageTypeUniqueAsync && msg.t != MessageTypeShuntAsync && msg.t != MessageTypeUniqueShuntAsync {
defer func(msg *Message) {
if err := recover(); err != nil {
stack := string(debug.Stack())
@ -647,6 +655,9 @@ func (slf *Server) dispatchMessage(msg *Message) {
slf.OnMessageErrorEvent(msg, e)
}
}
if msg.t == MessageTypeUniqueAsyncCallback || msg.t == MessageTypeUniqueShuntAsyncCallback {
dispatcher.antiUnique(msg.name)
}
super.Handle(cancel)
slf.low(msg, present, time.Millisecond*100)
@ -675,10 +686,13 @@ func (slf *Server) dispatchMessage(msg *Message) {
}
case MessageTypeTicker, MessageTypeShuntTicker:
msg.ordinaryHandler()
case MessageTypeAsync, MessageTypeShuntAsync:
case MessageTypeAsync, MessageTypeShuntAsync, MessageTypeUniqueAsync, MessageTypeUniqueShuntAsync:
if err := slf.ants.Submit(func() {
defer func() {
if err := recover(); err != nil {
if msg.t == MessageTypeUniqueAsync || msg.t == MessageTypeUniqueShuntAsync {
dispatcher.antiUnique(msg.name)
}
stack := string(debug.Stack())
log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", stack))
fmt.Println(stack)
@ -700,19 +714,28 @@ func (slf *Server) dispatchMessage(msg *Message) {
}
if msg.errHandler != nil {
if msg.conn == nil {
if msg.t == MessageTypeUniqueAsync {
slf.PushUniqueAsyncCallbackMessage(msg.name, err, msg.errHandler)
return
}
slf.PushAsyncCallbackMessage(err, msg.errHandler)
return
}
if msg.t == MessageTypeUniqueShuntAsync {
slf.PushUniqueShuntAsyncCallbackMessage(msg.conn, msg.name, err, msg.errHandler)
return
}
slf.PushShuntAsyncCallbackMessage(msg.conn, err, msg.errHandler)
return
}
dispatcher.antiUnique(msg.name)
if err != nil {
log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", string(debug.Stack())))
}
}); err != nil {
panic(err)
}
case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback:
case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeUniqueShuntAsyncCallback:
msg.errHandler(msg.err)
case MessageTypeSystem:
msg.ordinaryHandler()
@ -796,6 +819,38 @@ func (slf *Server) PushShuntTickerMessage(conn *Conn, name string, caller func()
slf.pushMessage(slf.messagePool.Get().castToShuntTickerMessage(conn, name, caller, mark...))
}
// PushUniqueAsyncMessage 向服务器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致
// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息
func (slf *Server) PushUniqueAsyncMessage(unique string, caller func() error, callback func(err error), mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToUniqueAsyncMessage(unique, caller, callback, mark...))
}
// PushUniqueAsyncCallbackMessage 向服务器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致
func (slf *Server) PushUniqueAsyncCallbackMessage(unique string, err error, callback func(err error), mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToUniqueAsyncCallbackMessage(unique, err, callback, mark...))
}
// PushUniqueShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncMessage 进行转发
// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息
func (slf *Server) PushUniqueShuntAsyncMessage(conn *Conn, unique string, caller func() error, callback func(err error), mark ...log.Field) {
if slf.shuntMatcher == nil {
slf.PushUniqueAsyncMessage(unique, caller, callback)
return
}
slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncMessage(conn, unique, caller, callback, mark...))
}
// PushUniqueShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncCallbackMessage 进行转发
func (slf *Server) PushUniqueShuntAsyncCallbackMessage(conn *Conn, unique string, err error, callback func(err error), mark ...log.Field) {
if slf.shuntMatcher == nil {
slf.PushUniqueAsyncCallbackMessage(unique, err, callback)
return
}
slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncCallbackMessage(conn, unique, err, callback, mark...))
}
// PushErrorMessage 向服务器中推送 MessageTypeError 消息
// - 通过该函数推送错误消息,当消息触发时将在系统分发器中处理消息
// - 参数 errAction 用于指定错误消息的处理方式,可选值为 MessageErrorActionNone 和 MessageErrorActionShutdown