Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

Commit 3545f94

Browse files
committed
wip
1 parent 43edc2f commit 3545f94

File tree

7 files changed

+363
-207
lines changed

7 files changed

+363
-207
lines changed

agent/stream.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package agent
22

33
import (
4+
"cdr.dev/coder-cli/xwebrtc"
45
"context"
56
"encoding/json"
67
"fmt"
@@ -12,7 +13,6 @@ import (
1213
"github.com/pion/webrtc/v3"
1314
"golang.org/x/xerrors"
1415

15-
"cdr.dev/coder-cli/internal/x/xwebrtc"
1616
"cdr.dev/coder-cli/pkg/proto"
1717
)
1818

@@ -128,6 +128,10 @@ func (s *stream) processMessage(msg proto.Message) {
128128
}
129129

130130
func (s *stream) processDataChannel(channel *webrtc.DataChannel) {
131+
if channel.Protocol() == "control" {
132+
return
133+
}
134+
131135
if channel.Protocol() == "ping" {
132136
channel.OnOpen(func() {
133137
rw, err := channel.Detach()

internal/cmd/tunnel.go

Lines changed: 32 additions & 186 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,19 @@
11
package cmd
22

33
import (
4+
"cdr.dev/coder-cli/coder-sdk"
5+
"cdr.dev/coder-cli/internal/x/xcobra"
6+
"cdr.dev/coder-cli/xwebrtc"
7+
"cdr.dev/slog"
8+
"cdr.dev/slog/sloggers/sloghuman"
49
"context"
5-
"encoding/json"
610
"fmt"
11+
"github.com/spf13/cobra"
12+
"golang.org/x/xerrors"
713
"io"
814
"net"
9-
"net/url"
1015
"os"
1116
"strconv"
12-
"time"
13-
14-
"cdr.dev/slog"
15-
"cdr.dev/slog/sloggers/sloghuman"
16-
"github.com/pion/webrtc/v3"
17-
"github.com/spf13/cobra"
18-
"golang.org/x/xerrors"
19-
"nhooyr.io/websocket"
20-
21-
"cdr.dev/coder-cli/coder-sdk"
22-
"cdr.dev/coder-cli/internal/x/xcobra"
23-
"cdr.dev/coder-cli/internal/x/xwebrtc"
24-
"cdr.dev/coder-cli/pkg/proto"
2517
)
2618

2719
func tunnelCmd() *cobra.Command {
@@ -74,15 +66,13 @@ coder tunnel my-dev 3000 3000
7466
return xerrors.Errorf("No workspace found by name '%s'", args[0])
7567
}
7668

77-
c := &client{
78-
id: envID,
79-
stdio: args[2] == "stdio",
80-
localPort: uint16(localPort),
81-
remotePort: uint16(remotePort),
82-
ctx: context.Background(),
83-
logger: log.Leveled(slog.LevelDebug),
84-
brokerAddr: baseURL,
85-
token: sdk.Token(),
69+
c := &tunnneler{
70+
workspaceID: envID,
71+
wc: xwebrtc.NewWorkspaceClient(log.Leveled(slog.LevelDebug), &baseURL, sdk.Token()),
72+
stdio: args[2] == "stdio",
73+
localPort: uint16(localPort),
74+
remotePort: uint16(remotePort),
75+
ctx: ctx,
8676
}
8777

8878
err = c.start()
@@ -97,197 +87,53 @@ coder tunnel my-dev 3000 3000
9787
return cmd
9888
}
9989

100-
type client struct {
101-
ctx context.Context
102-
brokerAddr url.URL
103-
token string
104-
logger slog.Logger
105-
id string
106-
remotePort uint16
107-
localPort uint16
108-
stdio bool
90+
type tunnneler struct {
91+
ctx context.Context
92+
wc *xwebrtc.WorkspaceClient
93+
workspaceID string
94+
remotePort uint16
95+
localPort uint16
96+
stdio bool
10997
}
11098

111-
func (c *client) start() error {
112-
url := fmt.Sprintf("%s%s%s%s%s", c.brokerAddr.String(), "/api/private/envagent/", c.id, "/connect?session_token=", c.token)
113-
turnScheme := "turns"
114-
if c.brokerAddr.Scheme == "http" {
115-
turnScheme = "turn"
116-
}
117-
tcpProxy := fmt.Sprintf("%s:%s:5349?transport=tcp", turnScheme, c.brokerAddr.Host)
118-
c.logger.Info(c.ctx, "connecting to broker", slog.F("url", url), slog.F("tcp-proxy", tcpProxy))
119-
conn, resp, err := websocket.Dial(c.ctx, url, nil)
120-
if err != nil && resp == nil {
121-
return fmt.Errorf("dial: %w", err)
122-
}
123-
if err != nil && resp != nil {
124-
return &coder.HTTPError{
125-
Response: resp,
126-
}
127-
}
128-
nconn := websocket.NetConn(context.Background(), conn, websocket.MessageBinary)
129-
130-
// Only enabled under a private feature flag for now,
131-
// so insecure connections are entirely fine to allow.
132-
servers := []webrtc.ICEServer{{
133-
URLs: []string{tcpProxy},
134-
Username: "insecure",
135-
Credential: "pass",
136-
CredentialType: webrtc.ICECredentialTypePassword,
137-
}}
138-
rtc, err := xwebrtc.NewPeerConnection(servers)
139-
if err != nil {
140-
return fmt.Errorf("create connection: %w", err)
141-
}
142-
143-
rtc.OnNegotiationNeeded(func() {
144-
c.logger.Debug(context.Background(), "negotiation needed...")
145-
})
146-
147-
rtc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
148-
c.logger.Info(context.Background(), "connection state changed", slog.F("state", pcs))
149-
})
150-
151-
channel, err := xwebrtc.NewProxyDataChannel(rtc, "forwarder", "tcp", c.remotePort)
152-
if err != nil {
153-
return fmt.Errorf("create data channel: %w", err)
154-
}
155-
flushCandidates := proto.ProxyICECandidates(rtc, nconn)
156-
157-
localDesc, err := rtc.CreateOffer(&webrtc.OfferOptions{})
158-
if err != nil {
159-
return fmt.Errorf("create offer: %w", err)
160-
}
161-
162-
err = rtc.SetLocalDescription(localDesc)
163-
if err != nil {
164-
return fmt.Errorf("set local desc: %w", err)
165-
}
166-
167-
c.logger.Debug(context.Background(), "writing offer")
168-
b, _ := json.Marshal(&proto.Message{
169-
Offer: &localDesc,
170-
Servers: servers,
171-
})
172-
_, err = nconn.Write(b)
99+
func (c *tunnneler) start() error {
100+
nc, err := c.wc.DialContext(c.ctx, xwebrtc.NetworkTCP, xwebrtc.FormatWorkspaceAddr(c.workspaceID, c.remotePort))
173101
if err != nil {
174-
return fmt.Errorf("write offer: %w", err)
175-
}
176-
flushCandidates()
177-
178-
go func() {
179-
err = xwebrtc.WaitForDataChannelOpen(context.Background(), channel)
180-
if err != nil {
181-
c.logger.Fatal(context.Background(), "waiting for data channel open", slog.Error(err))
182-
}
183-
_ = conn.Close(websocket.StatusNormalClosure, "rtc connected")
184-
}()
185-
186-
decoder := json.NewDecoder(nconn)
187-
for {
188-
var msg proto.Message
189-
err = decoder.Decode(&msg)
190-
if err == io.EOF {
191-
break
192-
}
193-
if websocket.CloseStatus(err) == websocket.StatusNormalClosure {
194-
break
195-
}
196-
if err != nil {
197-
return fmt.Errorf("read msg: %w", err)
198-
}
199-
if msg.Candidate != "" {
200-
c.logger.Debug(context.Background(), "accepted ice candidate", slog.F("candidate", msg.Candidate))
201-
err = proto.AcceptICECandidate(rtc, &msg)
202-
if err != nil {
203-
return fmt.Errorf("accept ice: %w", err)
204-
}
205-
}
206-
if msg.Answer != nil {
207-
c.logger.Debug(context.Background(), "got answer", slog.F("answer", msg.Answer))
208-
err = rtc.SetRemoteDescription(*msg.Answer)
209-
if err != nil {
210-
return fmt.Errorf("set remote: %w", err)
211-
}
212-
}
213-
}
214-
215-
// Once we're open... let's test out the ping.
216-
pingProto := "ping"
217-
pingChannel, err := rtc.CreateDataChannel("pinger", &webrtc.DataChannelInit{
218-
Protocol: &pingProto,
219-
})
220-
if err != nil {
221-
return fmt.Errorf("create ping channel")
102+
return fmt.Errorf("dial: %w", err)
222103
}
223-
pingChannel.OnOpen(func() {
224-
defer func() {
225-
_ = pingChannel.Close()
226-
}()
227-
t1 := time.Now()
228-
rw, _ := pingChannel.Detach()
229-
defer func() {
230-
_ = rw.Close()
231-
}()
232-
_, _ = rw.Write([]byte("hello"))
233-
b := make([]byte, 64)
234-
_, _ = rw.Read(b)
235-
c.logger.Info(c.ctx, "your latency directly to the agent", slog.F("ms", time.Since(t1).Milliseconds()))
236-
})
237104

105+
// proxy via stdio
238106
if c.stdio {
239-
// At this point the RTC is connected and data channel is opened...
240-
rw, err := channel.Detach()
241-
if err != nil {
242-
return fmt.Errorf("detach channel: %w", err)
243-
}
244107
go func() {
245-
_, _ = io.Copy(rw, os.Stdin)
108+
_, _ = io.Copy(nc, os.Stdin)
246109
}()
247-
_, err = io.Copy(os.Stdout, rw)
110+
_, err = io.Copy(os.Stdout, nc)
248111
if err != nil {
249112
return fmt.Errorf("copy: %w", err)
250113
}
251114
return nil
252115
}
253116

117+
// proxy via tcp listener
254118
listener, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", c.localPort))
255119
if err != nil {
256120
return fmt.Errorf("listen: %w", err)
257121
}
258122

259123
for {
260-
conn, err := listener.Accept()
124+
lc, err := listener.Accept()
261125
if err != nil {
262126
return fmt.Errorf("accept: %w", err)
263127
}
264128
go func() {
265129
defer func() {
266-
_ = conn.Close()
130+
_ = lc.Close()
267131
}()
268-
channel, err := xwebrtc.NewProxyDataChannel(rtc, "forwarder", "tcp", c.remotePort)
269-
if err != nil {
270-
c.logger.Warn(context.Background(), "create data channel for proxying", slog.Error(err))
271-
return
272-
}
273-
defer func() {
274-
_ = channel.Close()
275-
}()
276-
err = xwebrtc.WaitForDataChannelOpen(context.Background(), channel)
277-
if err != nil {
278-
c.logger.Warn(context.Background(), "wait for data channel open", slog.Error(err))
279-
return
280-
}
281-
rw, err := channel.Detach()
282-
if err != nil {
283-
c.logger.Warn(context.Background(), "detach channel", slog.Error(err))
284-
return
285-
}
286132

287133
go func() {
288-
_, _ = io.Copy(conn, rw)
134+
_, _ = io.Copy(lc, nc)
289135
}()
290-
_, _ = io.Copy(rw, conn)
136+
_, _ = io.Copy(nc, lc)
291137
}()
292138
}
293139
}

internal/x/xwebrtc/conn.go

Lines changed: 0 additions & 20 deletions
This file was deleted.

internal/x/xwebrtc/channel.go renamed to xwebrtc/channel.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,15 @@ func NewProxyDataChannel(conn *webrtc.PeerConnection, name, protocol string, por
3939
})
4040
}
4141

42+
func NewControlDataChannel(conn *webrtc.PeerConnection) (*webrtc.DataChannel, error) {
43+
proto := "control"
44+
ordered := true
45+
return conn.CreateDataChannel("control", &webrtc.DataChannelInit{
46+
Protocol: &proto,
47+
Ordered: &ordered,
48+
})
49+
}
50+
4251
// ParseProxyDataChannel parses a data channel to get the protocol and port.
4352
func ParseProxyDataChannel(channel *webrtc.DataChannel) (string, uint16, error) {
4453
if channel.Protocol() == "" {
@@ -54,3 +63,16 @@ func ParseProxyDataChannel(channel *webrtc.DataChannel) (string, uint16, error)
5463
}
5564
return host, uint16(p), nil
5665
}
66+
67+
// NewPeerConnection creates a new peer connection.
68+
// It uses the Google stun server by default.
69+
func NewPeerConnection(servers []webrtc.ICEServer) (*webrtc.PeerConnection, error) {
70+
se := webrtc.SettingEngine{}
71+
se.DetachDataChannels()
72+
se.SetICETimeouts(time.Second*5, time.Second*5, time.Second*2)
73+
api := webrtc.NewAPI(webrtc.WithSettingEngine(se))
74+
75+
return api.NewPeerConnection(webrtc.Configuration{
76+
ICEServers: servers,
77+
})
78+
}

0 commit comments

Comments
 (0)