internal/lsp: decouple message processing from stream processing.
The means that the stream reader can move forward while a message is being processed. This will significantly improve responsivness and cancellation handling, and also allow message handlers to send messages themselves, reducing the need to spin up new go routines inside handlers. The flow control changes from blocking to failing when a server is busy, which removes the main current cause of deadlock, but may break non deadlock cases that currently wait if the queue is not sufficiently large. Change-Id: Ia73eb049b38d0651344abdbf16c477a8ce1a6fd1 Reviewed-on: https://go-review.googlesource.com/c/tools/+/170007 Reviewed-by: Rebecca Stambler <rstambler@golang.org>
This commit is contained in:
parent
a8f40b3f4d
commit
1f1f5f5d57
|
@ -23,6 +23,8 @@ type Conn struct {
|
||||||
Handler Handler
|
Handler Handler
|
||||||
Canceler Canceler
|
Canceler Canceler
|
||||||
Logger Logger
|
Logger Logger
|
||||||
|
Capacity int
|
||||||
|
RejectIfOverloaded bool
|
||||||
stream Stream
|
stream Stream
|
||||||
err error
|
err error
|
||||||
pendingMu sync.Mutex // protects the pending map
|
pendingMu sync.Mutex // protects the pending map
|
||||||
|
@ -31,6 +33,12 @@ type Conn struct {
|
||||||
handling map[ID]handling
|
handling map[ID]handling
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type queueEntry struct {
|
||||||
|
ctx context.Context
|
||||||
|
c *Conn
|
||||||
|
r *Request
|
||||||
|
}
|
||||||
|
|
||||||
// Handler is an option you can pass to NewConn to handle incoming requests.
|
// Handler is an option you can pass to NewConn to handle incoming requests.
|
||||||
// If the request returns false from IsNotify then the Handler must eventually
|
// If the request returns false from IsNotify then the Handler must eventually
|
||||||
// call Reply on the Conn with the supplied request.
|
// call Reply on the Conn with the supplied request.
|
||||||
|
@ -237,11 +245,37 @@ type combined struct {
|
||||||
Error *Error `json:"error,omitempty"`
|
Error *Error `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Conn) deliver(ctx context.Context, q chan queueEntry, request *Request) bool {
|
||||||
|
e := queueEntry{ctx: ctx, c: c, r: request}
|
||||||
|
if !c.RejectIfOverloaded {
|
||||||
|
q <- e
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case q <- e:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Run blocks until the connection is terminated, and returns any error that
|
// Run blocks until the connection is terminated, and returns any error that
|
||||||
// caused the termination.
|
// caused the termination.
|
||||||
// It must be called exactly once for each Conn.
|
// It must be called exactly once for each Conn.
|
||||||
// It returns only when the reader is closed or there is an error in the stream.
|
// It returns only when the reader is closed or there is an error in the stream.
|
||||||
func (c *Conn) Run(ctx context.Context) error {
|
func (c *Conn) Run(ctx context.Context) error {
|
||||||
|
q := make(chan queueEntry, c.Capacity)
|
||||||
|
defer close(q)
|
||||||
|
// start the queue processor
|
||||||
|
go func() {
|
||||||
|
// TODO: idle notification?
|
||||||
|
for e := range q {
|
||||||
|
if e.ctx.Err() != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
c.Handler(e.ctx, e.c, e.r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
for {
|
for {
|
||||||
// get the data for a message
|
// get the data for a message
|
||||||
data, err := c.stream.Read(ctx)
|
data, err := c.stream.Read(ctx)
|
||||||
|
@ -268,10 +302,11 @@ func (c *Conn) Run(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
if request.IsNotify() {
|
if request.IsNotify() {
|
||||||
c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil)
|
c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil)
|
||||||
// we have a Notify, forward to the handler in a go routine
|
// we have a Notify, add to the processor queue
|
||||||
c.Handler(ctx, c, request)
|
c.deliver(ctx, q, request)
|
||||||
|
//TODO: log when we drop a message?
|
||||||
} else {
|
} else {
|
||||||
// we have a Call, forward to the handler in another go routine
|
// we have a Call, add to the processor queue
|
||||||
reqCtx, cancelReq := context.WithCancel(ctx)
|
reqCtx, cancelReq := context.WithCancel(ctx)
|
||||||
c.handlingMu.Lock()
|
c.handlingMu.Lock()
|
||||||
c.handling[*request.ID] = handling{
|
c.handling[*request.ID] = handling{
|
||||||
|
@ -281,7 +316,10 @@ func (c *Conn) Run(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
c.handlingMu.Unlock()
|
c.handlingMu.Unlock()
|
||||||
c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil)
|
c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil)
|
||||||
c.Handler(reqCtx, c, request)
|
if !c.deliver(reqCtx, q, request) {
|
||||||
|
// queue is full, reject the message by directly replying
|
||||||
|
c.Reply(ctx, request, nil, NewErrorf(CodeServerOverloaded, "no room in queue"))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case msg.ID != nil:
|
case msg.ID != nil:
|
||||||
// we have a response, get the pending entry from the map
|
// we have a response, get the pending entry from the map
|
||||||
|
|
|
@ -28,6 +28,10 @@ const (
|
||||||
CodeInvalidParams = -32602
|
CodeInvalidParams = -32602
|
||||||
// CodeInternalError is not currently returned but defined for completeness.
|
// CodeInternalError is not currently returned but defined for completeness.
|
||||||
CodeInternalError = -32603
|
CodeInternalError = -32603
|
||||||
|
|
||||||
|
//CodeServerOverloaded is returned when a message was refused due to a
|
||||||
|
//server being temporarily unable to accept any new messages.
|
||||||
|
CodeServerOverloaded = -32000
|
||||||
)
|
)
|
||||||
|
|
||||||
// Request is sent to a server to represent a Call or Notify operaton.
|
// Request is sent to a server to represent a Call or Notify operaton.
|
||||||
|
|
Loading…
Reference in New Issue