Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

Commit 2b0af5a

Browse files
authored
Add retries to coder agent start cmd (#316)
* Add retries to coder agent start cmd * Add more error logs
1 parent c1a9994 commit 2b0af5a

File tree

6 files changed

+293
-205
lines changed

6 files changed

+293
-205
lines changed

agent/doc.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
// Package agent is for interacting with p2p servers and clients.
2+
package agent

agent/server.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package agent
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/url"
7+
"time"
8+
9+
"cdr.dev/slog"
10+
"github.com/hashicorp/yamux"
11+
"go.coder.com/retry"
12+
"golang.org/x/xerrors"
13+
"nhooyr.io/websocket"
14+
)
15+
16+
// listenRoute is the URL path set on the Coder URL to form the endpoint the
// agent server dials.
const listenRoute = "/api/private/envagent/listen"
19+
20+
// Server connects to a Coder deployment and listens for p2p connections.
21+
type Server struct {
22+
log slog.Logger
23+
listenURL *url.URL
24+
}
25+
26+
// ServerArgs are the required arguments to create an agent server.
27+
type ServerArgs struct {
28+
Log slog.Logger
29+
CoderURL *url.URL
30+
Token string
31+
}
32+
33+
// NewServer creates a new agent server.
34+
func NewServer(args ServerArgs) (*Server, error) {
35+
lURL, err := formatListenURL(args.CoderURL, args.Token)
36+
if err != nil {
37+
return nil, xerrors.Errorf("formatting listen url: %w", err)
38+
}
39+
40+
return &Server{
41+
log: args.Log,
42+
listenURL: lURL,
43+
}, nil
44+
}
45+
46+
// Run will listen and proxy new peer connections on a retry loop.
47+
func (s *Server) Run(ctx context.Context) error {
48+
err := retry.New(time.Second).Context(ctx).Backoff(15 * time.Second).Run(func() error {
49+
ctx, cancelFunc := context.WithTimeout(ctx, time.Second*15)
50+
defer cancelFunc()
51+
s.log.Info(ctx, "connecting to coder", slog.F("url", s.listenURL.String()))
52+
conn, _, err := websocket.Dial(ctx, s.listenURL.String(), nil)
53+
if err != nil {
54+
return fmt.Errorf("dial: %w", err)
55+
}
56+
nc := websocket.NetConn(context.Background(), conn, websocket.MessageBinary)
57+
session, err := yamux.Server(nc, nil)
58+
if err != nil {
59+
return fmt.Errorf("open: %w", err)
60+
}
61+
s.log.Info(ctx, "connected to coder. awaiting connection requests")
62+
for {
63+
st, err := session.AcceptStream()
64+
if err != nil {
65+
return fmt.Errorf("accept stream: %w", err)
66+
}
67+
stream := &stream{
68+
logger: s.log.Named(fmt.Sprintf("stream %d", st.StreamID())),
69+
stream: st,
70+
}
71+
go stream.listen()
72+
}
73+
})
74+
75+
return err
76+
}
77+
78+
func formatListenURL(coderURL *url.URL, token string) (*url.URL, error) {
79+
if coderURL.Scheme != "http" && coderURL.Scheme != "https" {
80+
return nil, xerrors.Errorf("invalid URL scheme")
81+
}
82+
83+
coderURL.Path = listenRoute
84+
q := coderURL.Query()
85+
q.Set("service_token", token)
86+
coderURL.RawQuery = q.Encode()
87+
88+
return coderURL, nil
89+
}

agent/stream.go

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package agent
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"net"
9+
10+
"cdr.dev/slog"
11+
"github.com/hashicorp/yamux"
12+
"github.com/pion/webrtc/v3"
13+
"golang.org/x/xerrors"
14+
15+
"cdr.dev/coder-cli/internal/x/xwebrtc"
16+
"cdr.dev/coder-cli/pkg/proto"
17+
)
18+
19+
type stream struct {
20+
stream *yamux.Stream
21+
logger slog.Logger
22+
23+
rtc *webrtc.PeerConnection
24+
}
25+
26+
// writes an error and closes.
27+
func (s *stream) fatal(err error) {
28+
_ = s.write(proto.Message{
29+
Error: err.Error(),
30+
})
31+
s.logger.Error(context.Background(), err.Error(), slog.Error(err))
32+
_ = s.stream.Close()
33+
}
34+
35+
func (s *stream) listen() {
36+
decoder := json.NewDecoder(s.stream)
37+
for {
38+
var msg proto.Message
39+
err := decoder.Decode(&msg)
40+
if err == io.EOF {
41+
break
42+
}
43+
if err != nil {
44+
s.fatal(err)
45+
return
46+
}
47+
s.processMessage(msg)
48+
}
49+
}
50+
51+
func (s *stream) write(msg proto.Message) error {
52+
d, err := json.Marshal(&msg)
53+
if err != nil {
54+
return err
55+
}
56+
_, err = s.stream.Write(d)
57+
if err != nil {
58+
return err
59+
}
60+
return nil
61+
}
62+
63+
func (s *stream) processMessage(msg proto.Message) {
64+
s.logger.Debug(context.Background(), "processing message", slog.F("msg", msg))
65+
66+
if msg.Error != "" {
67+
s.fatal(xerrors.New(msg.Error))
68+
return
69+
}
70+
71+
if msg.Candidate != "" {
72+
if s.rtc == nil {
73+
s.fatal(xerrors.New("rtc connection must be started before candidates are sent"))
74+
return
75+
}
76+
77+
s.logger.Debug(context.Background(), "accepted ice candidate", slog.F("candidate", msg.Candidate))
78+
err := proto.AcceptICECandidate(s.rtc, &msg)
79+
if err != nil {
80+
s.fatal(err)
81+
return
82+
}
83+
}
84+
85+
if msg.Offer != nil {
86+
rtc, err := xwebrtc.NewPeerConnection()
87+
if err != nil {
88+
s.fatal(fmt.Errorf("create connection: %w", err))
89+
return
90+
}
91+
flushCandidates := proto.ProxyICECandidates(rtc, s.stream)
92+
93+
err = rtc.SetRemoteDescription(*msg.Offer)
94+
if err != nil {
95+
s.fatal(fmt.Errorf("set remote desc: %w", err))
96+
return
97+
}
98+
answer, err := rtc.CreateAnswer(nil)
99+
if err != nil {
100+
s.fatal(fmt.Errorf("create answer: %w", err))
101+
return
102+
}
103+
err = rtc.SetLocalDescription(answer)
104+
if err != nil {
105+
s.fatal(fmt.Errorf("set local desc: %w", err))
106+
return
107+
}
108+
flushCandidates()
109+
110+
err = s.write(proto.Message{
111+
Answer: rtc.LocalDescription(),
112+
})
113+
if err != nil {
114+
s.fatal(fmt.Errorf("send local desc: %w", err))
115+
return
116+
}
117+
118+
rtc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
119+
s.logger.Info(context.Background(), "state changed", slog.F("new", pcs))
120+
})
121+
rtc.OnDataChannel(s.processDataChannel)
122+
s.rtc = rtc
123+
}
124+
}
125+
126+
func (s *stream) processDataChannel(channel *webrtc.DataChannel) {
127+
if channel.Protocol() == "ping" {
128+
channel.OnOpen(func() {
129+
rw, err := channel.Detach()
130+
if err != nil {
131+
return
132+
}
133+
d := make([]byte, 64)
134+
_, err = rw.Read(d)
135+
if err != nil {
136+
s.logger.Error(context.Background(), "read ping", slog.Error(err))
137+
return
138+
}
139+
_, err = rw.Write(d)
140+
if err != nil {
141+
s.logger.Error(context.Background(), "write ping", slog.Error(err))
142+
return
143+
}
144+
})
145+
return
146+
}
147+
148+
prto, port, err := xwebrtc.ParseProxyDataChannel(channel)
149+
if err != nil {
150+
s.fatal(fmt.Errorf("failed to parse proxy data channel: %w", err))
151+
return
152+
}
153+
if prto != "tcp" {
154+
s.fatal(fmt.Errorf("client provided unsupported protocol: %s", prto))
155+
return
156+
}
157+
158+
conn, err := net.Dial(prto, fmt.Sprintf("localhost:%d", port))
159+
if err != nil {
160+
s.fatal(fmt.Errorf("failed to dial client port: %d", port))
161+
return
162+
}
163+
164+
channel.OnOpen(func() {
165+
s.logger.Debug(context.Background(), "proxying data channel to local port", slog.F("port", port))
166+
rw, err := channel.Detach()
167+
if err != nil {
168+
_ = channel.Close()
169+
s.logger.Error(context.Background(), "detach client data channel", slog.Error(err))
170+
return
171+
}
172+
go func() {
173+
_, err = io.Copy(rw, conn)
174+
if err != nil {
175+
s.logger.Error(context.Background(), "copy to conn", slog.Error(err))
176+
}
177+
}()
178+
go func() {
179+
_, _ = io.Copy(conn, rw)
180+
if err != nil {
181+
s.logger.Error(context.Background(), "copy from conn", slog.Error(err))
182+
}
183+
}()
184+
})
185+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ require (
1717
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4
1818
github.com/rjeczalik/notify v0.9.2
1919
github.com/spf13/cobra v1.1.3
20+
go.coder.com/retry v1.2.0
2021
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
2122
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005
2223
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
358358
go.coder.com/cli v0.4.0/go.mod h1:hRTOURCR3LJF1FRW9arecgrzX+AHG7mfYMwThPIgq+w=
359359
go.coder.com/flog v0.0.0-20190906214207-47dd47ea0512 h1:DjCS6dRQh+1PlfiBmnabxfdrzenb0tAwJqFxDEH/s9g=
360360
go.coder.com/flog v0.0.0-20190906214207-47dd47ea0512/go.mod h1:83JsYgXYv0EOaXjIMnaZ1Fl6ddNB3fJnDZ/8845mUJ8=
361+
go.coder.com/retry v1.2.0 h1:ODdUPu9cb9pcbeAM5j2YqJHUgfFbN60vmhtlWIKZGLo=
362+
go.coder.com/retry v1.2.0/go.mod h1:ihkJszQk8F+yaFL2pcIku9MzbYo+U8vka4IsvQSXVfE=
361363
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
362364
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
363365
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=

0 commit comments

Comments
 (0)