Merge branch 'develop'
This commit is contained in:
commit
64280b4de3
93
README.md
93
README.md
|
@ -21,6 +21,7 @@ mindmap
|
|||
/cross 内置跨服功能实现
|
||||
/router 内置路由器功能实现
|
||||
/utils 工具结构函数目录
|
||||
/examples 示例代码目录
|
||||
```
|
||||
|
||||
## Server 架构预览
|
||||
|
@ -65,8 +66,8 @@ chmod 777 ./local-doc.sh
|
|||
- **[http://localhost:9998/pkg/github.com/kercylan98/minotaur/](http://localhost:9998/pkg/github.com/kercylan98/minotaur/)**
|
||||
- **[https://pkg.go.dev/github.com/kercylan98/minotaur](https://pkg.go.dev/github.com/kercylan98/minotaur)**
|
||||
|
||||
### 简单示例
|
||||
创建一个基于Websocket的回响服务器。
|
||||
### 简单回响服务器
|
||||
创建一个基于`Websocket`创建的单线程回响服务器。
|
||||
```go
|
||||
package main
|
||||
|
||||
|
@ -87,6 +88,94 @@ func main() {
|
|||
访问 **[WebSocket 在线测试](http://www.websocket-test.com/)** 进行验证。
|
||||
> Websocket地址: ws://127.0.0.1:9999
|
||||
|
||||
### 分流服务器
|
||||
分流服务器可以将客户端分流到不同的分组上,每个分组中为串行处理,不同分组之间并行处理。
|
||||
```go
|
||||
package main
|
||||
|
||||
import "github.com/kercylan98/minotaur/server"
|
||||
|
||||
func main() {
|
||||
srv := server.New(server.NetworkWebsocket,
|
||||
server.WithShunt(func(guid int64) chan *server.Message {
|
||||
return make(chan *server.Message, 1024*100)
|
||||
}, func(conn *server.Conn) (guid int64, allowToCreate bool) {
|
||||
guid, allowToCreate = conn.GetData("roomId").(int64)
|
||||
return
|
||||
}),
|
||||
)
|
||||
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
|
||||
conn.Write(packet)
|
||||
})
|
||||
if err := srv.Run(":9999"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
```
|
||||
> 该示例中假设各房间互不干涉,故通过`server.WithShunt`将连接通过`roomId`进行分组,提高并发处理能力。
|
||||
|
||||
### 服务器死锁检测
|
||||
`Minotaur`内置了服务器消息死锁检测功能,可通过`server.WithDeadlockDetect`进行开启。
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/kercylan98/minotaur/server"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
srv := server.New(server.NetworkWebsocket,
|
||||
server.WithDeadlockDetect(time.Second*5),
|
||||
)
|
||||
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
|
||||
time.Sleep(10 * time.Second)
|
||||
conn.Write(packet)
|
||||
})
|
||||
if err := srv.Run(":9999"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
```
|
||||
> 在开启死锁检测的时候需要设置一个合理的死锁怀疑时间,该时间内消息没有处理完毕则会触发死锁检测,并打印`WARN`级别的日志输出。
|
||||
|
||||
### 计时器
|
||||
在默认的`server.Server`不会包含计时器功能,可通过`server.WithTicker`进行开启,例如:
|
||||
```go
|
||||
package main
|
||||
|
||||
import "github.com/kercylan98/minotaur/server"
|
||||
|
||||
func main() {
|
||||
srv := server.New(server.NetworkWebsocket, server.WithTicker(50, false))
|
||||
if err := srv.Run(":9999"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
```
|
||||
也可以通过`timer.GetTicker`获取计时器进行使用,例如:
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/kercylan98/minotaur/utils/timer"
|
||||
"github.com/kercylan98/minotaur/utils/times"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var ticker = timer.GetTicker(10)
|
||||
var wait sync.WaitGroup
|
||||
wait.Add(3)
|
||||
ticker.Loop("LOOP", timer.Instantly, times.Second, timer.Forever, func() {
|
||||
fmt.Println("LOOP")
|
||||
wait.Done()
|
||||
})
|
||||
wait.Wait()
|
||||
}
|
||||
```
|
||||
|
||||
### 持续更新的示例项目
|
||||
- **[Minotaur-Example](https://github.com/kercylan98/minotaur-example)**
|
||||
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"github.com/kercylan98/minotaur/server"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
srv := server.New(server.NetworkWebsocket,
|
||||
server.WithDeadlockDetect(time.Second*5),
|
||||
)
|
||||
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
|
||||
time.Sleep(10 * time.Second)
|
||||
conn.Write(packet)
|
||||
})
|
||||
if err := srv.Run(":9999"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package main
|
||||
|
||||
import "github.com/kercylan98/minotaur/server"
|
||||
|
||||
func main() {
|
||||
srv := server.New(server.NetworkWebsocket,
|
||||
server.WithShunt(func(guid int64) chan *server.Message {
|
||||
return make(chan *server.Message, 1024*100)
|
||||
}, func(conn *server.Conn) (guid int64, allowToCreate bool) {
|
||||
guid, allowToCreate = conn.GetData("roomId").(int64)
|
||||
return
|
||||
}),
|
||||
)
|
||||
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
|
||||
conn.Write(packet)
|
||||
})
|
||||
if err := srv.Run(":9999"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package main
|
||||
|
||||
import "github.com/kercylan98/minotaur/server"
|
||||
|
||||
func main() {
|
||||
srv := server.New(server.NetworkWebsocket)
|
||||
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
|
||||
conn.Write(packet)
|
||||
})
|
||||
if err := srv.Run(":9999"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
package main
|
||||
|
||||
import "github.com/kercylan98/minotaur/server"
|
||||
|
||||
func main() {
|
||||
srv := server.New(server.NetworkWebsocket, server.WithTicker(50, false))
|
||||
if err := srv.Run(":9999"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/kercylan98/minotaur/utils/timer"
|
||||
"github.com/kercylan98/minotaur/utils/times"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var ticker = timer.GetTicker(10)
|
||||
var wait sync.WaitGroup
|
||||
wait.Add(3)
|
||||
ticker.Loop("LOOP", timer.Instantly, times.Second, timer.Forever, func() {
|
||||
fmt.Println("LOOP")
|
||||
wait.Done()
|
||||
})
|
||||
wait.Wait()
|
||||
}
|
7
go.mod
7
go.mod
|
@ -63,11 +63,12 @@ require (
|
|||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/arch v0.3.0 // indirect
|
||||
golang.org/x/crypto v0.12.0 // indirect
|
||||
golang.org/x/crypto v0.13.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 // indirect
|
||||
golang.org/x/net v0.14.0 // indirect
|
||||
golang.org/x/sys v0.11.0 // indirect
|
||||
golang.org/x/text v0.12.0 // indirect
|
||||
golang.org/x/sys v0.12.0 // indirect
|
||||
golang.org/x/term v0.12.0 // indirect
|
||||
golang.org/x/text v0.13.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
|
||||
google.golang.org/protobuf v1.30.0 // indirect
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
|
||||
|
|
8
go.sum
8
go.sum
|
@ -215,6 +215,8 @@ golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPh
|
|||
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
|
||||
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
|
||||
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
|
||||
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 h1:QfTh0HpN6hlw6D3vu8DAwC8pBIwikq0AI1evdm+FksE=
|
||||
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
|
||||
|
@ -258,12 +260,18 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
|||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
|
||||
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
|
||||
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.12.0 h1:/ZfYdc3zq+q02Rv9vGqTeSItdzZTSNDmfTi0mBAuidU=
|
||||
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
|
||||
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
|
||||
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/kercylan98/minotaur/utils/concurrent"
|
||||
"sync"
|
||||
)
|
||||
|
@ -155,7 +156,11 @@ func (slf *Client) writeLoop(wait *sync.WaitGroup) {
|
|||
}()
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
slf.Close(err.(error))
|
||||
err, isErr := err.(error)
|
||||
if !isErr {
|
||||
err = fmt.Errorf("%v", err)
|
||||
}
|
||||
slf.Close(err)
|
||||
}
|
||||
}()
|
||||
wait.Done()
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
package client_test
|
||||
|
||||
import (
|
||||
"github.com/kercylan98/minotaur/server"
|
||||
"github.com/kercylan98/minotaur/server/client"
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestClient_WriteWS(t *testing.T) {
|
||||
var wait sync.WaitGroup
|
||||
wait.Add(1)
|
||||
srv := server.New(server.NetworkWebsocket)
|
||||
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
|
||||
srv.Shutdown()
|
||||
})
|
||||
srv.RegStopEvent(func(srv *server.Server) {
|
||||
wait.Done()
|
||||
})
|
||||
srv.RegMessageReadyEvent(func(srv *server.Server) {
|
||||
cli := client.NewWebsocket("ws://127.0.0.1:9999")
|
||||
cli.RegConnectionOpenedEvent(func(conn *client.Client) {
|
||||
conn.WriteWS(2, []byte("Hello"))
|
||||
})
|
||||
if err := cli.Run(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
})
|
||||
if err := srv.Run(":9999"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
wait.Wait()
|
||||
}
|
|
@ -5,6 +5,8 @@ import (
|
|||
"github.com/kercylan98/minotaur/utils/log"
|
||||
"github.com/kercylan98/minotaur/utils/runtimes"
|
||||
"github.com/kercylan98/minotaur/utils/slice"
|
||||
"golang.org/x/crypto/ssh/terminal"
|
||||
"os"
|
||||
"reflect"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
|
@ -27,6 +29,7 @@ type ShuntChannelCreatedEventHandle func(srv *Server, guid int64)
|
|||
type ShuntChannelClosedEventHandle func(srv *Server, guid int64)
|
||||
type ConnectionPacketPreprocessEventHandle func(srv *Server, conn *Conn, packet []byte, abort func(), usePacket func(newPacket []byte))
|
||||
type MessageExecBeforeEventHandle func(srv *Server, message *Message) bool
|
||||
type MessageReadyEventHandle func(srv *Server)
|
||||
|
||||
func newEvent(srv *Server) *event {
|
||||
return &event{
|
||||
|
@ -46,6 +49,7 @@ func newEvent(srv *Server) *event {
|
|||
shuntChannelClosedEventHandles: slice.NewPriority[ShuntChannelClosedEventHandle](),
|
||||
connectionPacketPreprocessEventHandles: slice.NewPriority[ConnectionPacketPreprocessEventHandle](),
|
||||
messageExecBeforeEventHandles: slice.NewPriority[MessageExecBeforeEventHandle](),
|
||||
messageReadyEventHandles: slice.NewPriority[MessageReadyEventHandle](),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -66,6 +70,7 @@ type event struct {
|
|||
shuntChannelClosedEventHandles *slice.Priority[ShuntChannelClosedEventHandle]
|
||||
connectionPacketPreprocessEventHandles *slice.Priority[ConnectionPacketPreprocessEventHandle]
|
||||
messageExecBeforeEventHandles *slice.Priority[MessageExecBeforeEventHandle]
|
||||
messageReadyEventHandles *slice.Priority[MessageReadyEventHandle]
|
||||
|
||||
consoleCommandEventHandles map[string]*slice.Priority[ConsoleCommandEventHandle]
|
||||
consoleCommandEventHandleInitOnce sync.Once
|
||||
|
@ -88,6 +93,12 @@ func (slf *event) OnStopEvent() {
|
|||
// - 默认将注册 "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN" 指令作为关闭服务器的指令
|
||||
// - 可通过注册默认指令进行默认行为的覆盖
|
||||
func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEventHandle, priority ...int) {
|
||||
fd := int(os.Stdin.Fd())
|
||||
if !terminal.IsTerminal(fd) {
|
||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("ignore", "system not terminal"))
|
||||
return
|
||||
}
|
||||
|
||||
slf.consoleCommandEventHandleInitOnce.Do(func() {
|
||||
slf.consoleCommandEventHandles = map[string]*slice.Priority[ConsoleCommandEventHandle]{}
|
||||
go func() {
|
||||
|
@ -147,6 +158,7 @@ func (slf *event) OnStartBeforeEvent() {
|
|||
}
|
||||
|
||||
// RegStartFinishEvent 在服务器启动完成时将立刻执行被注册的事件处理函数
|
||||
// - 需要注意该时刻服务器已经启动完成,但是还有可能未开始处理消息,客户端有可能无法连接,如果需要在消息处理器准备就绪后执行,请使用 RegMessageReadyEvent 函数
|
||||
func (slf *event) RegStartFinishEvent(handle StartFinishEventHandle, priority ...int) {
|
||||
slf.startFinishEventHandles.Append(handle, slice.GetValue(priority, 0))
|
||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
||||
|
@ -365,7 +377,6 @@ func (slf *event) RegMessageExecBeforeEvent(handle MessageExecBeforeEventHandle,
|
|||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
||||
}
|
||||
|
||||
// OnMessageExecBeforeEvent 执行消息处理前的事件处理函数
|
||||
func (slf *event) OnMessageExecBeforeEvent(message *Message) bool {
|
||||
if slf.messageExecBeforeEventHandles.Len() == 0 {
|
||||
return true
|
||||
|
@ -384,6 +395,27 @@ func (slf *event) OnMessageExecBeforeEvent(message *Message) bool {
|
|||
return result
|
||||
}
|
||||
|
||||
// RegMessageReadyEvent 在服务器消息处理器准备就绪时立即执行被注册的事件处理函数
|
||||
func (slf *event) RegMessageReadyEvent(handle MessageReadyEventHandle, priority ...int) {
|
||||
slf.messageReadyEventHandles.Append(handle, slice.GetValue(priority, 0))
|
||||
}
|
||||
|
||||
func (slf *event) OnMessageReadyEvent() {
|
||||
if slf.messageReadyEventHandles.Len() == 0 {
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
log.Error("Server", log.String("OnMessageReadyEvent", fmt.Sprintf("%v", err)))
|
||||
debug.PrintStack()
|
||||
}
|
||||
}()
|
||||
slf.messageReadyEventHandles.RangeValue(func(index int, value MessageReadyEventHandle) bool {
|
||||
value(slf.Server)
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (slf *event) check() {
|
||||
switch slf.network {
|
||||
case NetworkHttp, NetworkGRPC, NetworkNone:
|
||||
|
|
|
@ -341,6 +341,10 @@ func (slf *Server) Run(addr string) error {
|
|||
)
|
||||
log.Info("Server", log.String(serverMark, "===================================================================="))
|
||||
slf.OnStartFinishEvent()
|
||||
time.Sleep(time.Second)
|
||||
if !slf.isShutdown.Load() {
|
||||
slf.OnMessageReadyEvent()
|
||||
}
|
||||
|
||||
signal.Notify(slf.systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
|
||||
select {
|
||||
|
@ -354,6 +358,10 @@ func (slf *Server) Run(addr string) error {
|
|||
}
|
||||
} else {
|
||||
slf.OnStartFinishEvent()
|
||||
time.Sleep(time.Second)
|
||||
if !slf.isShutdown.Load() {
|
||||
slf.OnMessageReadyEvent()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
"github.com/kercylan98/minotaur/server/client"
|
||||
"github.com/kercylan98/minotaur/utils/times"
|
||||
"golang.org/x/time/rate"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
@ -31,17 +30,21 @@ func TestNew(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestNewClient(t *testing.T) {
|
||||
var total atomic.Int64
|
||||
for i := 0; i < 1000; i++ {
|
||||
id := i
|
||||
fmt.Println("启动", i+1)
|
||||
cli := client.NewWebsocket("ws://127.0.0.1:9999")
|
||||
cli.RegConnectionReceivePacketEvent(func(conn *client.Client, wst int, packet []byte) {
|
||||
fmt.Println(string(packet))
|
||||
fmt.Println("收到", id+1, string(packet))
|
||||
})
|
||||
cli.RegConnectionOpenedEvent(func(conn *client.Client) {
|
||||
go func() {
|
||||
for i < 1000 {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
for {
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
cli.WriteWS(2, []byte("hello"))
|
||||
total.Add(1)
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"github.com/kercylan98/minotaur/utils/log"
|
||||
"go.uber.org/zap"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewPool[T any](bufferSize int, generator func() T, releaser func(data T)) *Pool[T] {
|
||||
|
@ -52,12 +53,13 @@ func (slf *Pool[T]) Get() T {
|
|||
slf.mutex.Unlock()
|
||||
return data
|
||||
}
|
||||
slf.warn++
|
||||
slf.mutex.Unlock()
|
||||
if slf.warn >= 256 {
|
||||
now := time.Now().Unix()
|
||||
if now-slf.warn >= 1 {
|
||||
log.Warn("Pool", log.String("Get", "the number of buffer members is insufficient, consider whether it is due to unreleased or inappropriate buffer size"), zap.Stack("stack"))
|
||||
slf.warn = 0
|
||||
slf.warn = now
|
||||
}
|
||||
slf.mutex.Unlock()
|
||||
|
||||
return slf.generator()
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
package survey
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Analyzer 分析器
|
||||
type Analyzer struct {
|
||||
v map[string]float64
|
||||
repeat map[string]struct{}
|
||||
subs map[string]*Analyzer
|
||||
m sync.Mutex
|
||||
}
|
||||
|
||||
// Sub 获取子分析器
|
||||
func (slf *Analyzer) Sub(key string) *Analyzer {
|
||||
slf.m.Lock()
|
||||
defer slf.m.Unlock()
|
||||
if slf.subs == nil {
|
||||
slf.subs = make(map[string]*Analyzer)
|
||||
}
|
||||
sub, e := slf.subs[key]
|
||||
if !e {
|
||||
sub = &Analyzer{}
|
||||
slf.subs[key] = sub
|
||||
}
|
||||
return sub
|
||||
}
|
||||
|
||||
// Increase 在指定 key 现有值的基础上增加 recordKey 的值
|
||||
func (slf *Analyzer) Increase(key string, record R, recordKey string) {
|
||||
slf.m.Lock()
|
||||
defer slf.m.Unlock()
|
||||
if !record.Exist(recordKey) {
|
||||
return
|
||||
}
|
||||
if slf.v == nil {
|
||||
slf.v = make(map[string]float64)
|
||||
}
|
||||
v, e := slf.v[key]
|
||||
if !e {
|
||||
slf.v[key] = record.GetFloat64(recordKey)
|
||||
return
|
||||
}
|
||||
slf.v[key] = v + record.GetFloat64(recordKey)
|
||||
}
|
||||
|
||||
// IncreaseValue 在指定 key 现有值的基础上增加 value
|
||||
func (slf *Analyzer) IncreaseValue(key string, value float64) {
|
||||
slf.m.Lock()
|
||||
defer slf.m.Unlock()
|
||||
if slf.v == nil {
|
||||
slf.v = make(map[string]float64)
|
||||
}
|
||||
slf.v[key] += value
|
||||
}
|
||||
|
||||
// IncreaseNonRepeat 在指定 key 现有值的基础上增加 recordKey 的值,但是当去重维度 dimension 相同时,不会增加
|
||||
func (slf *Analyzer) IncreaseNonRepeat(key string, record R, recordKey string, dimension ...string) {
|
||||
slf.m.Lock()
|
||||
if !record.Exist(recordKey) {
|
||||
slf.m.Unlock()
|
||||
return
|
||||
}
|
||||
if slf.repeat == nil {
|
||||
slf.repeat = make(map[string]struct{})
|
||||
}
|
||||
dvs := make([]string, 0, len(dimension))
|
||||
for _, v := range dimension {
|
||||
dvs = append(dvs, record.GetString(v))
|
||||
}
|
||||
dk := strings.Join(dvs, "_")
|
||||
if _, e := slf.repeat[dk]; e {
|
||||
slf.m.Unlock()
|
||||
return
|
||||
}
|
||||
slf.m.Unlock()
|
||||
slf.Increase(key, record, recordKey)
|
||||
}
|
||||
|
||||
// IncreaseValueNonRepeat 在指定 key 现有值的基础上增加 value,但是当去重维度 dimension 相同时,不会增加
|
||||
func (slf *Analyzer) IncreaseValueNonRepeat(key string, record R, value float64, dimension ...string) {
|
||||
slf.m.Lock()
|
||||
if slf.repeat == nil {
|
||||
slf.repeat = make(map[string]struct{})
|
||||
}
|
||||
dvs := make([]string, 0, len(dimension))
|
||||
for _, v := range dimension {
|
||||
dvs = append(dvs, record.GetString(v))
|
||||
}
|
||||
dk := strings.Join(dvs, "_")
|
||||
if _, e := slf.repeat[dk]; e {
|
||||
slf.m.Unlock()
|
||||
return
|
||||
}
|
||||
slf.m.Unlock()
|
||||
slf.IncreaseValue(key, value)
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
package survey_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/kercylan98/minotaur/utils/log/survey"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestClose(t *testing.T) {
|
||||
path := `./test/day.2023-09-06.log`
|
||||
|
||||
report := survey.Analyze(path, func(analyzer *survey.Analyzer, record survey.R) {
|
||||
switch record.GetString("type") {
|
||||
case "open_conn":
|
||||
analyzer.IncreaseValueNonRepeat("开播人数", record, 1, "live_id")
|
||||
case "report_rank":
|
||||
analyzer.IncreaseValue("开始游戏次数", 1)
|
||||
analyzer.Increase("开播时长", record, "game_time")
|
||||
analyzer.Sub(record.GetString("live_id")).IncreaseValue("开始游戏次数", 1)
|
||||
analyzer.Sub(record.GetString("live_id")).Increase("开播时长", record, "game_time")
|
||||
case "statistics":
|
||||
analyzer.IncreaseValueNonRepeat("活跃人数", record, 1, "active_player")
|
||||
analyzer.IncreaseValueNonRepeat("评论人数", record, 1, "comment_player")
|
||||
analyzer.IncreaseValueNonRepeat("点赞人数", record, 1, "like_player")
|
||||
analyzer.Sub(record.GetString("live_id")).IncreaseValueNonRepeat("活跃人数", record, 1, "active_player")
|
||||
analyzer.Sub(record.GetString("live_id")).IncreaseValueNonRepeat("评论人数", record, 1, "comment_player")
|
||||
analyzer.Sub(record.GetString("live_id")).IncreaseValueNonRepeat("点赞人数", record, 1, "like_player")
|
||||
|
||||
giftId := record.GetString("gift.gift_id")
|
||||
if len(giftId) > 0 {
|
||||
giftPrice := record.GetFloat64("gift.price")
|
||||
giftCount := record.GetFloat64("gift.count")
|
||||
giftSender := record.GetString("gift.gift_senders")
|
||||
|
||||
analyzer.IncreaseValue("礼物总价值", giftPrice*giftCount)
|
||||
analyzer.IncreaseValueNonRepeat(fmt.Sprintf("送礼人数_%s", giftId), record, 1, giftSender)
|
||||
analyzer.IncreaseValue(fmt.Sprintf("礼物总数_%s", giftId), giftCount)
|
||||
|
||||
analyzer.Sub(record.GetString("live_id")).IncreaseValue("礼物总价值", giftPrice*giftCount)
|
||||
analyzer.Sub(record.GetString("live_id")).IncreaseValueNonRepeat(fmt.Sprintf("送礼人数_%s", giftId), record, 1, giftSender)
|
||||
analyzer.Sub(record.GetString("live_id")).IncreaseValue(fmt.Sprintf("礼物总数_%s", giftId), giftCount)
|
||||
}
|
||||
|
||||
}
|
||||
})
|
||||
|
||||
fmt.Println(report.FilterSub("warzone0009"))
|
||||
}
|
|
@ -18,7 +18,8 @@ func (slf R) Get(key string) Result {
|
|||
|
||||
// Exist 判断指定 key 是否存在
|
||||
func (slf R) Exist(key string) bool {
|
||||
return slf.Get(key).Exists()
|
||||
v := slf.Get(key)
|
||||
return v.Exists() && len(v.String()) > 0
|
||||
}
|
||||
|
||||
// GetString 该函数为 Get(key).String() 的简写
|
|
@ -0,0 +1,72 @@
|
|||
package survey
|
||||
|
||||
import (
|
||||
"github.com/kercylan98/minotaur/utils/super"
|
||||
)
|
||||
|
||||
func newReport(analyzer *Analyzer) *Report {
|
||||
report := &Report{
|
||||
analyzer: analyzer,
|
||||
Name: "ROOT",
|
||||
Values: analyzer.v,
|
||||
Subs: make([]*Report, 0, len(analyzer.subs)),
|
||||
}
|
||||
for k, v := range analyzer.subs {
|
||||
sub := newReport(v)
|
||||
sub.Name = k
|
||||
report.Subs = append(report.Subs, sub)
|
||||
}
|
||||
return report
|
||||
}
|
||||
|
||||
// Report 分析报告
|
||||
type Report struct {
|
||||
analyzer *Analyzer
|
||||
Name string // 报告名称(默认为 ROOT)
|
||||
Values map[string]float64 `json:"Values,omitempty"`
|
||||
Subs []*Report `json:"Reports,omitempty"`
|
||||
}
|
||||
|
||||
// ReserveSub 仅保留特定名称子报告
|
||||
func (slf *Report) ReserveSub(names ...string) *Report {
|
||||
report := newReport(slf.analyzer)
|
||||
var newSub []*Report
|
||||
for _, sub := range slf.Subs {
|
||||
var exist bool
|
||||
for _, name := range names {
|
||||
if sub.Name == name {
|
||||
exist = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if exist {
|
||||
newSub = append(newSub, sub)
|
||||
}
|
||||
}
|
||||
report.Subs = newSub
|
||||
return report
|
||||
}
|
||||
|
||||
// FilterSub 过滤特定名称的子报告
|
||||
func (slf *Report) FilterSub(names ...string) *Report {
|
||||
report := newReport(slf.analyzer)
|
||||
var newSub []*Report
|
||||
for _, sub := range slf.Subs {
|
||||
var exist bool
|
||||
for _, name := range names {
|
||||
if sub.Name == name {
|
||||
exist = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !exist {
|
||||
newSub = append(newSub, sub)
|
||||
}
|
||||
}
|
||||
report.Subs = newSub
|
||||
return report
|
||||
}
|
||||
|
||||
func (slf *Report) String() string {
|
||||
return string(super.MarshalIndentJSON(slf, "", " "))
|
||||
}
|
|
@ -113,14 +113,17 @@ func Close(names ...string) {
|
|||
}
|
||||
}
|
||||
|
||||
// AllWithPath 处理特定记录器特定日期的所有记录,当发生错误时,会发生 panic
|
||||
// Analyze 分析特定文件的记录,当发生错误时,会发生 panic
|
||||
// - handle 为并行执行的,需要自行处理并发安全
|
||||
// - 适用于外部进程对于日志文件的读取,但是需要注意的是,此时日志文件可能正在被写入,所以可能会读取到错误的数据
|
||||
func AllWithPath(filePath string, handle func(record R)) {
|
||||
func Analyze(filePath string, handle func(analyzer *Analyzer, record R)) *Report {
|
||||
analyzer := new(Analyzer)
|
||||
err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
|
||||
handle(R(s))
|
||||
handle(analyzer, R(s))
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return newReport(analyzer)
|
||||
}
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
package survey_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/kercylan98/minotaur/utils/log/survey"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestRecord(t *testing.T) {
|
||||
_ = os.MkdirAll("./test", os.ModePerm)
|
||||
survey.Reg("GLOBAL_DATA", "./test/global_data.log")
|
||||
now := time.Now()
|
||||
//for i := 0; i < 100000000; i++ {
|
||||
// survey.Record("GLOBAL_DATA", map[string]any{
|
||||
// "joinTime": time.Now().Unix(),
|
||||
// "action": random.Int64(1, 999),
|
||||
// })
|
||||
// // 每500w flush一次
|
||||
// if i%5000000 == 0 {
|
||||
// survey.Flush()
|
||||
// }
|
||||
//}
|
||||
//survey.Flush()
|
||||
//
|
||||
var i atomic.Int64
|
||||
survey.All("GLOBAL_DATA", time.Now(), func(record survey.R) bool {
|
||||
i.Add(record.Get("action").Int())
|
||||
return true
|
||||
})
|
||||
fmt.Println("write cost:", time.Since(now), i.Load())
|
||||
}
|
||||
|
||||
// Line: 30000001, time: 1.45s
|
File diff suppressed because it is too large
Load Diff
|
@ -1 +0,0 @@
|
|||
2023-08-22 19:34:15 - {"action":1,"joinTime":1692704055}
|
|
@ -345,3 +345,13 @@ func Filter[T any](a []T, filterHandle func(a T) bool) []T {
|
|||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Mapping 将切片中的元素进行转换
|
||||
// - mappingHandle 返回转换后的元素
|
||||
func Mapping[T any, R any](a []T, mappingHandle func(a T) R) []R {
|
||||
var result []R
|
||||
for _, a := range a {
|
||||
result = append(result, mappingHandle(a))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue