feat: 新版 server 包 HTTP 基础实现

This commit is contained in:
kercylan98 2024-03-20 23:48:46 +08:00
parent 7239a278ee
commit b2c0bb0da3
18 changed files with 390 additions and 438 deletions

3
go.mod
View File

@ -36,6 +36,9 @@ require (
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.14.0 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gobwas/ws v1.3.2 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect

6
go.sum
View File

@ -45,6 +45,12 @@ github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg
github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/go-resty/resty/v2 v2.11.0 h1:i7jMfNOJYMp69lq7qozJP+bjgzfAzeOhuGlyDrqxT/8=
github.com/go-resty/resty/v2 v2.11.0/go.mod h1:iiP/OpA0CkcL3IGt1O0+/SIItFUbkkyw5BGXiVdTu+A=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.3.2 h1:zlnbNHxumkRvfPWgfXu8RBwyNR1x8wh9cf5PTOCqs9Q=
github.com/gobwas/ws v1.3.2/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=

59
server/v2/conn.go Normal file
View File

@ -0,0 +1,59 @@
package server
import (
"context"
"github.com/kercylan98/minotaur/utils/log"
"net"
"unsafe"
)
type Conn interface {
net.Conn
}
type conn struct {
net.Conn
cs *connections
ctx context.Context
cancel context.CancelFunc
idx int
}
func (c *conn) init(ctx context.Context, cs *connections, conn net.Conn, idx int) *conn {
c.Conn = conn
c.cs = cs
c.ctx, c.cancel = context.WithCancel(ctx)
c.idx = idx
return c
}
func (c *conn) awaitRead() {
defer func() { _ = c.Close() }()
const bufferSize = 4096
buf := make([]byte, bufferSize) // 避免频繁的内存分配,初始化一个固定大小的缓冲区
for {
select {
case <-c.ctx.Done():
return
default:
ptr := unsafe.Pointer(&buf[0])
n, err := c.Read((*[bufferSize]byte)(ptr)[:])
if err != nil {
log.Error("READ", err)
return
}
if n > 0 {
if _, err := c.Write(buf[:n]); err != nil {
log.Error("Write", err)
}
}
}
}
}
func (c *conn) Close() (err error) {
c.cs.Event() <- c
return
}

111
server/v2/connections.go Normal file
View File

@ -0,0 +1,111 @@
package server
import (
"context"
"github.com/kercylan98/minotaur/utils/log"
"net"
"time"
)
// connections 结构体用于管理连接
type connections struct {
ctx context.Context // 上下文对象,用于取消连接管理器
ch chan any // 事件通道,用于接收连接管理器的操作事件
items []*conn // 连接列表,存储所有打开的连接
gap []int // 连接空隙,记录已关闭的连接索引,用于重用索引
}
// 初始化连接管理器
func (cs *connections) init(ctx context.Context) *connections {
cs.ctx = ctx
cs.ch = make(chan any, 1024)
cs.items = make([]*conn, 0, 128)
go cs.awaitRun()
return cs
}
// 清理连接列表中的空隙
func (cs *connections) clearGap() {
cs.gap = cs.gap[:0]
var gap = make([]int, 0, len(cs.items))
for i, c := range cs.items {
if c == nil {
continue
}
c.idx = i
gap = append(gap, i)
}
cs.gap = gap
}
// 打开新连接
func (cs *connections) open(c net.Conn) error {
// 如果存在连接空隙,则重用连接空隙中的索引,否则分配新的索引
var idx int
var reuse bool
if len(cs.gap) > 0 {
idx = cs.gap[0]
cs.gap = cs.gap[1:]
reuse = true
} else {
idx = len(cs.items)
}
conn := new(conn).init(cs.ctx, cs, c, idx)
if reuse {
cs.items[idx] = conn
} else {
cs.items = append(cs.items, conn)
}
go conn.awaitRead()
return nil
}
// 关闭连接
func (cs *connections) close(c *conn) error {
if c == nil {
return nil
}
defer c.cancel()
// 如果连接索引是连接列表的最后一个索引,则直接删除连接对象,否则将连接对象置空,并将索引添加到连接空隙中
if c.idx == len(cs.items)-1 {
cs.items = cs.items[:c.idx]
} else {
cs.items[c.idx] = nil
cs.gap = append(cs.gap, c.idx)
}
return c.Conn.Close()
}
// 等待连接管理器的事件并处理
func (cs *connections) awaitRun() {
clearGapTicker := time.NewTicker(time.Second * 30)
defer clearGapTicker.Stop()
for {
select {
case <-cs.ctx.Done():
return
case <-clearGapTicker.C:
cs.clearGap()
case a := <-cs.ch:
var err error
switch v := a.(type) {
case *conn:
err = cs.close(v)
case net.Conn:
err = cs.open(v)
}
if err != nil {
log.Error("connections.awaitRun", log.Any("err", err))
}
}
}
}
// Event 获取连接管理器的事件通道
func (cs *connections) Event() chan<- any {
return cs.ch
}

9
server/v2/core.go Normal file
View File

@ -0,0 +1,9 @@
package server
type Core interface {
connectionManager
}
type connectionManager interface {
Event() chan<- any
}

View File

@ -1,69 +0,0 @@
package server
import (
"github.com/panjf2000/ants/v2"
"github.com/panjf2000/gnet/v2"
"time"
)
func newEventHandler(options *Options, trafficker Trafficker) (handler *eventHandler, err error) {
var wp *ants.Pool
if wp, err = ants.NewPool(ants.DefaultAntsPoolSize, ants.WithNonblocking(true)); err != nil {
return
}
handler = &eventHandler{
options: options,
trafficker: trafficker,
workerPool: wp,
}
return
}
type (
Trafficker interface {
OnBoot(options *Options) error
OnTraffic(c gnet.Conn, packet []byte)
}
eventHandler struct {
options *Options
trafficker Trafficker
workerPool *ants.Pool
}
)
func (e *eventHandler) OnBoot(eng gnet.Engine) (action gnet.Action) {
if err := e.trafficker.OnBoot(e.options); err != nil {
action = gnet.Shutdown
}
return
}
func (e *eventHandler) OnShutdown(eng gnet.Engine) {
return
}
func (e *eventHandler) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
return
}
func (e *eventHandler) OnClose(c gnet.Conn, err error) (action gnet.Action) {
return
}
func (e *eventHandler) OnTraffic(c gnet.Conn) (action gnet.Action) {
buf, err := c.Next(-1)
if err != nil {
return
}
var packet = make([]byte, len(buf))
copy(packet, buf)
err = e.workerPool.Submit(func() {
e.trafficker.OnTraffic(c, packet)
})
return
}
func (e *eventHandler) OnTick() (delay time.Duration, action gnet.Action) {
return
}

11
server/v2/network.go Normal file
View File

@ -0,0 +1,11 @@
package server
import "context"
type Network interface {
OnSetup(ctx context.Context, core Core) error
OnRun(ctx context.Context) error
OnShutdown() error
}

53
server/v2/network/http.go Normal file
View File

@ -0,0 +1,53 @@
package network
import (
"context"
"github.com/kercylan98/minotaur/server/v2"
"github.com/pkg/errors"
"net"
"net/http"
"time"
)
func Http(addr string) server.Network {
return HttpWithHandler(addr, &HttpServe{ServeMux: http.NewServeMux()})
}
func HttpWithHandler[H http.Handler](addr string, handler H) server.Network {
c := &httpCore[H]{
addr: addr,
handler: handler,
srv: &http.Server{
Addr: addr,
Handler: handler,
DisableGeneralOptionsHandler: false,
},
}
return c
}
type httpCore[H http.Handler] struct {
addr string
handler H
srv *http.Server
}
func (h *httpCore[H]) OnSetup(ctx context.Context, core server.Core) (err error) {
h.srv.BaseContext = func(listener net.Listener) context.Context {
return ctx
}
return
}
func (h *httpCore[H]) OnRun(ctx context.Context) (err error) {
if err = h.srv.ListenAndServe(); errors.Is(err, http.ErrServerClosed) {
err = nil
}
return
}
func (h *httpCore[H]) OnShutdown() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return h.srv.Shutdown(ctx)
}

View File

@ -0,0 +1,7 @@
package network
import "net/http"
type HttpServe struct {
*http.ServeMux
}

View File

@ -0,0 +1,57 @@
package network
import (
"context"
"fmt"
"github.com/gobwas/ws"
"github.com/kercylan98/minotaur/server/v2"
"net/http"
)
func WebSocket(addr, pattern string) server.Network {
return WebSocketWithHandler[*HttpServe](addr, &HttpServe{ServeMux: http.NewServeMux()}, func(handler *HttpServe, ws http.HandlerFunc) {
handler.Handle(fmt.Sprintf("GET %s", pattern), ws)
})
}
func WebSocketWithHandler[H http.Handler](addr string, handler H, upgraderHandlerFunc WebSocketUpgraderHandlerFunc[H]) server.Network {
c := &websocketCore[H]{
httpCore: HttpWithHandler(addr, handler).(*httpCore[H]),
upgraderHandlerFunc: upgraderHandlerFunc,
}
return c
}
type WebSocketUpgraderHandlerFunc[H http.Handler] func(handler H, ws http.HandlerFunc)
type websocketCore[H http.Handler] struct {
*httpCore[H]
upgraderHandlerFunc WebSocketUpgraderHandlerFunc[H]
core server.Core
}
func (w *websocketCore[H]) OnSetup(ctx context.Context, core server.Core) (err error) {
w.core = core
if err = w.httpCore.OnSetup(ctx, core); err != nil {
return
}
w.upgraderHandlerFunc(w.handler, w.onUpgrade)
return
}
func (w *websocketCore[H]) OnRun(ctx context.Context) error {
return w.httpCore.OnRun(ctx)
}
func (w *websocketCore[H]) OnShutdown() error {
return w.httpCore.OnShutdown()
}
func (w *websocketCore[H]) onUpgrade(writer http.ResponseWriter, request *http.Request) {
conn, _, _, err := ws.UpgradeHTTP(request, writer)
if err != nil {
return
}
w.core.Event() <- conn
}

View File

@ -1,4 +0,0 @@
package server
type Options struct {
}

View File

@ -1,20 +1,59 @@
package server
import "github.com/panjf2000/gnet/v2"
import (
"context"
"github.com/kercylan98/minotaur/utils/super"
"sync"
)
func NewServer(trafficker Trafficker) *Server {
srv := &Server{
trafficker: trafficker,
type Server interface {
Run() error
Shutdown() error
}
type server struct {
ctx *super.CancelContext
networks []Network
connections *connections
}
func NewServer(networks ...Network) Server {
srv := &server{
ctx: super.WithCancelContext(context.Background()),
networks: networks,
}
srv.connections = new(connections).init(srv.ctx)
return srv
}
type Server struct {
trafficker Trafficker
func (s *server) Run() (err error) {
for _, network := range s.networks {
if err = network.OnSetup(s.ctx, s.connections); err != nil {
return
}
}
var group = new(sync.WaitGroup)
for _, network := range s.networks {
group.Add(1)
go func(ctx *super.CancelContext, group *sync.WaitGroup, network Network) {
defer group.Done()
if err = network.OnRun(ctx); err != nil {
panic(err)
}
}(s.ctx, group, network)
}
group.Wait()
return
}
func (s *Server) Run(protoAddr string) (err error) {
var handler *eventHandler
handler, err = newEventHandler(new(Options), s.trafficker)
return gnet.Run(handler, protoAddr)
func (s *server) Shutdown() (err error) {
defer s.ctx.Cancel()
for _, network := range s.networks {
if err = network.OnShutdown(); err != nil {
return
}
}
return
}

View File

@ -3,27 +3,24 @@ package server_test
import (
"github.com/gin-gonic/gin"
"github.com/kercylan98/minotaur/server/v2"
"github.com/kercylan98/minotaur/server/v2/traffickers"
"github.com/kercylan98/minotaur/server/v2/network"
"net/http"
"testing"
)
func TestNewServer(t *testing.T) {
r := gin.New()
r := gin.Default()
r.GET("/", func(context *gin.Context) {
context.JSON(200, gin.H{
"ping": "pong",
})
})
srv := server.NewServer(traffickers.WebSocket(r, func(handler *gin.Engine, upgradeHandler func(writer http.ResponseWriter, request *http.Request) error) {
srv := server.NewServer(network.WebSocketWithHandler(":9999", r, func(handler *gin.Engine, ws http.HandlerFunc) {
handler.GET("/ws", func(context *gin.Context) {
if err := upgradeHandler(context.Writer, context.Request); err != nil {
context.AbortWithError(500, err)
}
ws(context.Writer, context.Request)
})
}))
if err := srv.Run("tcp://:8080"); err != nil {
if err := srv.Run(); err != nil {
panic(err)
}
}

View File

@ -1,56 +0,0 @@
package traffickers
import (
"bufio"
"bytes"
"github.com/kercylan98/minotaur/server/v2"
"github.com/kercylan98/minotaur/utils/hub"
"github.com/panjf2000/gnet/v2"
netHttp "net/http"
)
func Http[H netHttp.Handler](handler H) server.Trafficker {
return &http[H]{
handler: handler,
ncb: func(c gnet.Conn, err error) error {
return nil
},
}
}
type http[H netHttp.Handler] struct {
handler H
rwp *hub.ObjectPool[*httpResponseWriter]
ncb func(c gnet.Conn, err error) error
}
func (h *http[H]) OnBoot(options *server.Options) error {
h.rwp = hub.NewObjectPool[httpResponseWriter](func() *httpResponseWriter {
return new(httpResponseWriter)
}, func(data *httpResponseWriter) {
data.reset()
})
return nil
}
func (h *http[H]) OnTraffic(c gnet.Conn, packet []byte) {
var responseWriter *httpResponseWriter
defer func() {
if responseWriter == nil || !responseWriter.isHijack {
_ = c.Close()
}
}()
httpRequest, err := netHttp.ReadRequest(bufio.NewReader(bytes.NewReader(packet)))
if err != nil {
return
}
responseWriter = h.rwp.Get()
responseWriter.init(c)
h.handler.ServeHTTP(responseWriter, httpRequest)
if responseWriter.isHijack {
return
}
_ = responseWriter.Result().Write(c)
}

View File

@ -1,211 +0,0 @@
package traffickers
import (
"bufio"
"bytes"
"fmt"
"github.com/panjf2000/gnet/v2"
"io"
"net"
netHttp "net/http"
"net/textproto"
"strconv"
"strings"
"golang.org/x/net/http/httpguts"
)
type httpResponseWriter struct {
Code int
HeaderMap netHttp.Header
Body *bytes.Buffer
Flushed bool
conn *websocketConn
result *netHttp.Response
snapHeader netHttp.Header
wroteHeader bool
isHijack bool
}
func (rw *httpResponseWriter) init(c gnet.Conn) {
rw.conn = &websocketConn{Conn: c}
rw.Code = 200
rw.Body = new(bytes.Buffer)
rw.HeaderMap = make(netHttp.Header)
rw.isHijack = false
}
func (rw *httpResponseWriter) reset() {
rw.conn = nil
rw.Code = 200
rw.Body = nil
rw.HeaderMap = nil
rw.isHijack = false
}
func (rw *httpResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if !rw.isHijack {
return rw.conn, bufio.NewReadWriter(bufio.NewReader(rw.conn), bufio.NewWriter(rw.conn)), nil
}
return nil, nil, netHttp.ErrHijacked
}
func (rw *httpResponseWriter) Header() netHttp.Header {
m := rw.HeaderMap
if m == nil {
m = make(netHttp.Header)
rw.HeaderMap = m
}
return m
}
func (rw *httpResponseWriter) writeHeader(b []byte, str string) {
if rw.wroteHeader {
return
}
if len(str) > 512 {
str = str[:512]
}
m := rw.Header()
_, hasType := m["Content-Type"]
hasTE := m.Get("Transfer-Encoding") != ""
if !hasType && !hasTE {
if b == nil {
b = []byte(str)
}
m.Set("Content-Type", netHttp.DetectContentType(b))
}
rw.WriteHeader(200)
}
func (rw *httpResponseWriter) Write(buf []byte) (n int, err error) {
if rw.isHijack {
n = len(buf)
var wait = make(chan error)
if err = rw.conn.AsyncWrite(buf, func(c gnet.Conn, err error) error {
if err != nil {
wait <- err
}
return nil
}); err != nil {
return
}
err = <-wait
return
}
rw.writeHeader(buf, "")
if rw.Body != nil {
rw.Body.Write(buf)
}
return len(buf), nil
}
func (rw *httpResponseWriter) WriteString(str string) (int, error) {
rw.writeHeader(nil, str)
if rw.Body != nil {
rw.Body.WriteString(str)
}
return len(str), nil
}
func checkWriteHeaderCode(code int) {
if code < 100 || code > 999 {
panic(fmt.Sprintf("invalid WriteHeader code %v", code))
}
}
func (rw *httpResponseWriter) WriteHeader(code int) {
if rw.wroteHeader {
return
}
checkWriteHeaderCode(code)
rw.Code = code
rw.wroteHeader = true
if rw.HeaderMap == nil {
rw.HeaderMap = make(netHttp.Header)
}
rw.snapHeader = rw.HeaderMap.Clone()
}
func (rw *httpResponseWriter) Flush() {
if !rw.wroteHeader {
rw.WriteHeader(200)
}
rw.Flushed = true
}
func (rw *httpResponseWriter) Result() *netHttp.Response {
if rw.result != nil {
return rw.result
}
if rw.snapHeader == nil {
rw.snapHeader = rw.HeaderMap.Clone()
}
res := &netHttp.Response{
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
StatusCode: rw.Code,
Header: rw.snapHeader,
}
rw.result = res
if res.StatusCode == 0 {
res.StatusCode = 200
}
res.Status = fmt.Sprintf("%03d %s", res.StatusCode, netHttp.StatusText(res.StatusCode))
if rw.Body != nil {
res.Body = io.NopCloser(bytes.NewReader(rw.Body.Bytes()))
} else {
res.Body = netHttp.NoBody
}
res.ContentLength = parseContentLength(res.Header.Get("Content-Length"))
if trailers, ok := rw.snapHeader["Trailer"]; ok {
res.Trailer = make(netHttp.Header, len(trailers))
for _, k := range trailers {
for _, k := range strings.Split(k, ",") {
k = netHttp.CanonicalHeaderKey(textproto.TrimString(k))
if !httpguts.ValidTrailerHeader(k) {
// Ignore since forbidden by RFC 7230, section 4.1.2.
continue
}
vv, ok := rw.HeaderMap[k]
if !ok {
continue
}
vv2 := make([]string, len(vv))
copy(vv2, vv)
res.Trailer[k] = vv2
}
}
}
for k, vv := range rw.HeaderMap {
if !strings.HasPrefix(k, netHttp.TrailerPrefix) {
continue
}
if res.Trailer == nil {
res.Trailer = make(netHttp.Header)
}
for _, v := range vv {
res.Trailer.Add(strings.TrimPrefix(k, netHttp.TrailerPrefix), v)
}
}
return res
}
func parseContentLength(cl string) int64 {
cl = textproto.TrimString(cl)
if cl == "" {
return -1
}
n, err := strconv.ParseUint(cl, 10, 63)
if err != nil {
return -1
}
return int64(n)
}

View File

@ -1,64 +0,0 @@
package traffickers
import (
"fmt"
ws "github.com/gorilla/websocket"
"github.com/kercylan98/minotaur/server/v2"
"github.com/panjf2000/gnet/v2"
netHttp "net/http"
)
func WebSocket[H netHttp.Handler](handler H, binder func(handler H, upgradeHandler func(writer netHttp.ResponseWriter, request *netHttp.Request) error)) server.Trafficker {
w := &websocket[H]{
http: Http(handler).(*http[H]),
binder: binder,
upgrader: &ws.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
CheckOrigin: func(r *netHttp.Request) bool {
return true
},
},
}
binder(handler, w.OnUpgrade)
return w
}
type websocket[H netHttp.Handler] struct {
*http[H]
binder func(handler H, upgradeHandler func(writer netHttp.ResponseWriter, request *netHttp.Request) error)
upgrader *ws.Upgrader
}
func (w *websocket[H]) OnBoot(options *server.Options) error {
return w.http.OnBoot(options)
}
func (w *websocket[H]) OnTraffic(c gnet.Conn, packet []byte) {
w.http.OnTraffic(c, packet)
}
func (w *websocket[H]) OnUpgrade(writer netHttp.ResponseWriter, request *netHttp.Request) (err error) {
var (
ip string
conn *ws.Conn
)
ip = request.Header.Get("X-Real-IP")
conn, err = w.upgrader.Upgrade(writer, request, nil)
if err != nil {
return
}
fmt.Println("opened", ip)
go func() {
for {
mt, data, err := conn.ReadMessage()
if err != nil {
continue
}
conn.WriteMessage(mt, data)
}
}()
return nil
}

View File

@ -1,16 +0,0 @@
package traffickers
import (
"github.com/panjf2000/gnet/v2"
"time"
)
type websocketConn struct {
gnet.Conn
deadline time.Time
}
func (c *websocketConn) SetDeadline(t time.Time) error {
c.deadline = t
return nil
}

20
utils/super/context.go Normal file
View File

@ -0,0 +1,20 @@
package super
import "context"
func WithCancelContext(ctx context.Context) *CancelContext {
ctx, cancel := context.WithCancel(ctx)
return &CancelContext{
Context: ctx,
cancel: cancel,
}
}
type CancelContext struct {
context.Context
cancel func()
}
func (c *CancelContext) Cancel() {
c.cancel()
}