1// Copyright 2018 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package jsonrpc2 6 7import ( 8 "bufio" 9 "context" 10 "encoding/json" 11 "fmt" 12 "io" 13 "strconv" 14 "strings" 15 "sync" 16) 17 18// Stream abstracts the transport mechanics from the JSON RPC protocol. 19// A Conn reads and writes messages using the stream it was provided on 20// construction, and assumes that each call to Read or Write fully transfers 21// a single message, or returns an error. 22type Stream interface { 23 // Read gets the next message from the stream. 24 // It is never called concurrently. 25 Read(context.Context) ([]byte, error) 26 // Write sends a message to the stream. 27 // It must be safe for concurrent use. 28 Write(context.Context, []byte) error 29} 30 31// NewStream returns a Stream built on top of an io.Reader and io.Writer 32// The messages are sent with no wrapping, and rely on json decode consistency 33// to determine message boundaries. 34func NewStream(in io.Reader, out io.Writer) Stream { 35 return &plainStream{ 36 in: json.NewDecoder(in), 37 out: out, 38 } 39} 40 41type plainStream struct { 42 in *json.Decoder 43 outMu sync.Mutex 44 out io.Writer 45} 46 47func (s *plainStream) Read(ctx context.Context) ([]byte, error) { 48 select { 49 case <-ctx.Done(): 50 return nil, ctx.Err() 51 default: 52 } 53 var raw json.RawMessage 54 if err := s.in.Decode(&raw); err != nil { 55 return nil, err 56 } 57 return raw, nil 58} 59 60func (s *plainStream) Write(ctx context.Context, data []byte) error { 61 select { 62 case <-ctx.Done(): 63 return ctx.Err() 64 default: 65 } 66 s.outMu.Lock() 67 _, err := s.out.Write(data) 68 s.outMu.Unlock() 69 return err 70} 71 72// NewHeaderStream returns a Stream built on top of an io.Reader and io.Writer 73// The messages are sent with HTTP content length and MIME type headers. 74// This is the format used by LSP and others. 75func NewHeaderStream(in io.Reader, out io.Writer) Stream { 76 return &headerStream{ 77 in: bufio.NewReader(in), 78 out: out, 79 } 80} 81 82type headerStream struct { 83 in *bufio.Reader 84 outMu sync.Mutex 85 out io.Writer 86} 87 88func (s *headerStream) Read(ctx context.Context) ([]byte, error) { 89 select { 90 case <-ctx.Done(): 91 return nil, ctx.Err() 92 default: 93 } 94 var length int64 95 // read the header, stop on the first empty line 96 for { 97 line, err := s.in.ReadString('\n') 98 if err != nil { 99 return nil, fmt.Errorf("failed reading header line %q", err) 100 } 101 line = strings.TrimSpace(line) 102 // check we have a header line 103 if line == "" { 104 break 105 } 106 colon := strings.IndexRune(line, ':') 107 if colon < 0 { 108 return nil, fmt.Errorf("invalid header line %q", line) 109 } 110 name, value := line[:colon], strings.TrimSpace(line[colon+1:]) 111 switch name { 112 case "Content-Length": 113 if length, err = strconv.ParseInt(value, 10, 32); err != nil { 114 return nil, fmt.Errorf("failed parsing Content-Length: %v", value) 115 } 116 if length <= 0 { 117 return nil, fmt.Errorf("invalid Content-Length: %v", length) 118 } 119 default: 120 // ignoring unknown headers 121 } 122 } 123 if length == 0 { 124 return nil, fmt.Errorf("missing Content-Length header") 125 } 126 data := make([]byte, length) 127 if _, err := io.ReadFull(s.in, data); err != nil { 128 return nil, err 129 } 130 return data, nil 131} 132 133func (s *headerStream) Write(ctx context.Context, data []byte) error { 134 select { 135 case <-ctx.Done(): 136 return ctx.Err() 137 default: 138 } 139 s.outMu.Lock() 140 _, err := fmt.Fprintf(s.out, "Content-Length: %v\r\n\r\n", len(data)) 141 if err == nil { 142 _, err = s.out.Write(data) 143 } 144 s.outMu.Unlock() 145 return err 146} 147