1// Copyright 2018 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package jsonrpc2
6
7import (
8	"bufio"
9	"context"
10	"encoding/json"
11	"fmt"
12	"io"
13	"strconv"
14	"strings"
15	"sync"
16)
17
18// Stream abstracts the transport mechanics from the JSON RPC protocol.
19// A Conn reads and writes messages using the stream it was provided on
20// construction, and assumes that each call to Read or Write fully transfers
21// a single message, or returns an error.
22type Stream interface {
23	// Read gets the next message from the stream.
24	// It is never called concurrently.
25	Read(context.Context) ([]byte, error)
26	// Write sends a message to the stream.
27	// It must be safe for concurrent use.
28	Write(context.Context, []byte) error
29}
30
31// NewStream returns a Stream built on top of an io.Reader and io.Writer
32// The messages are sent with no wrapping, and rely on json decode consistency
33// to determine message boundaries.
34func NewStream(in io.Reader, out io.Writer) Stream {
35	return &plainStream{
36		in:  json.NewDecoder(in),
37		out: out,
38	}
39}
40
41type plainStream struct {
42	in    *json.Decoder
43	outMu sync.Mutex
44	out   io.Writer
45}
46
47func (s *plainStream) Read(ctx context.Context) ([]byte, error) {
48	select {
49	case <-ctx.Done():
50		return nil, ctx.Err()
51	default:
52	}
53	var raw json.RawMessage
54	if err := s.in.Decode(&raw); err != nil {
55		return nil, err
56	}
57	return raw, nil
58}
59
60func (s *plainStream) Write(ctx context.Context, data []byte) error {
61	select {
62	case <-ctx.Done():
63		return ctx.Err()
64	default:
65	}
66	s.outMu.Lock()
67	_, err := s.out.Write(data)
68	s.outMu.Unlock()
69	return err
70}
71
72// NewHeaderStream returns a Stream built on top of an io.Reader and io.Writer
73// The messages are sent with HTTP content length and MIME type headers.
74// This is the format used by LSP and others.
75func NewHeaderStream(in io.Reader, out io.Writer) Stream {
76	return &headerStream{
77		in:  bufio.NewReader(in),
78		out: out,
79	}
80}
81
82type headerStream struct {
83	in    *bufio.Reader
84	outMu sync.Mutex
85	out   io.Writer
86}
87
88func (s *headerStream) Read(ctx context.Context) ([]byte, error) {
89	select {
90	case <-ctx.Done():
91		return nil, ctx.Err()
92	default:
93	}
94	var length int64
95	// read the header, stop on the first empty line
96	for {
97		line, err := s.in.ReadString('\n')
98		if err != nil {
99			return nil, fmt.Errorf("failed reading header line %q", err)
100		}
101		line = strings.TrimSpace(line)
102		// check we have a header line
103		if line == "" {
104			break
105		}
106		colon := strings.IndexRune(line, ':')
107		if colon < 0 {
108			return nil, fmt.Errorf("invalid header line %q", line)
109		}
110		name, value := line[:colon], strings.TrimSpace(line[colon+1:])
111		switch name {
112		case "Content-Length":
113			if length, err = strconv.ParseInt(value, 10, 32); err != nil {
114				return nil, fmt.Errorf("failed parsing Content-Length: %v", value)
115			}
116			if length <= 0 {
117				return nil, fmt.Errorf("invalid Content-Length: %v", length)
118			}
119		default:
120			// ignoring unknown headers
121		}
122	}
123	if length == 0 {
124		return nil, fmt.Errorf("missing Content-Length header")
125	}
126	data := make([]byte, length)
127	if _, err := io.ReadFull(s.in, data); err != nil {
128		return nil, err
129	}
130	return data, nil
131}
132
133func (s *headerStream) Write(ctx context.Context, data []byte) error {
134	select {
135	case <-ctx.Done():
136		return ctx.Err()
137	default:
138	}
139	s.outMu.Lock()
140	_, err := fmt.Fprintf(s.out, "Content-Length: %v\r\n\r\n", len(data))
141	if err == nil {
142		_, err = s.out.Write(data)
143	}
144	s.outMu.Unlock()
145	return err
146}
147