diff --git a/internal/jsonrpc2/jsonrpc2.go b/internal/jsonrpc2/jsonrpc2.go index 5c308292..95f9daae 100644 --- a/internal/jsonrpc2/jsonrpc2.go +++ b/internal/jsonrpc2/jsonrpc2.go @@ -19,16 +19,24 @@ import ( // Conn is a JSON RPC 2 client server connection. // Conn is bidirectional; it does not have a designated server or client end. type Conn struct { - seq int64 // must only be accessed using atomic operations - Handler Handler - Canceler Canceler - Logger Logger - stream Stream - err error - pendingMu sync.Mutex // protects the pending map - pending map[ID]chan *Response - handlingMu sync.Mutex // protects the handling map - handling map[ID]handling + seq int64 // must only be accessed using atomic operations + Handler Handler + Canceler Canceler + Logger Logger + Capacity int + RejectIfOverloaded bool + stream Stream + err error + pendingMu sync.Mutex // protects the pending map + pending map[ID]chan *Response + handlingMu sync.Mutex // protects the handling map + handling map[ID]handling +} + +type queueEntry struct { + ctx context.Context + c *Conn + r *Request } // Handler is an option you can pass to NewConn to handle incoming requests. @@ -237,11 +245,37 @@ type combined struct { Error *Error `json:"error,omitempty"` } +func (c *Conn) deliver(ctx context.Context, q chan queueEntry, request *Request) bool { + e := queueEntry{ctx: ctx, c: c, r: request} + if !c.RejectIfOverloaded { + q <- e + return true + } + select { + case q <- e: + return true + default: + return false + } +} + // Run blocks until the connection is terminated, and returns any error that // caused the termination. // It must be called exactly once for each Conn. // It returns only when the reader is closed or there is an error in the stream. func (c *Conn) Run(ctx context.Context) error { + q := make(chan queueEntry, c.Capacity) + defer close(q) + // start the queue processor + go func() { + // TODO: idle notification? + for e := range q { + if e.ctx.Err() != nil { + continue + } + c.Handler(e.ctx, e.c, e.r) + } + }() for { // get the data for a message data, err := c.stream.Read(ctx) @@ -268,10 +302,11 @@ func (c *Conn) Run(ctx context.Context) error { } if request.IsNotify() { c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil) - // we have a Notify, forward to the handler in a go routine - c.Handler(ctx, c, request) + // we have a Notify, add to the processor queue + c.deliver(ctx, q, request) + //TODO: log when we drop a message? } else { - // we have a Call, forward to the handler in another go routine + // we have a Call, add to the processor queue reqCtx, cancelReq := context.WithCancel(ctx) c.handlingMu.Lock() c.handling[*request.ID] = handling{ @@ -281,7 +316,10 @@ func (c *Conn) Run(ctx context.Context) error { } c.handlingMu.Unlock() c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil) - c.Handler(reqCtx, c, request) + if !c.deliver(reqCtx, q, request) { + // queue is full, reject the message by directly replying + c.Reply(ctx, request, nil, NewErrorf(CodeServerOverloaded, "no room in queue")) + } } case msg.ID != nil: // we have a response, get the pending entry from the map diff --git a/internal/jsonrpc2/wire.go b/internal/jsonrpc2/wire.go index bb59ad55..bcf4d652 100644 --- a/internal/jsonrpc2/wire.go +++ b/internal/jsonrpc2/wire.go @@ -28,6 +28,10 @@ const ( CodeInvalidParams = -32602 // CodeInternalError is not currently returned but defined for completeness. CodeInternalError = -32603 + + //CodeServerOverloaded is returned when a message was refused due to a + //server being temporarily unable to accept any new messages. + CodeServerOverloaded = -32000 ) // Request is sent to a server to represent a Call or Notify operaton.