Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

feat: Durability test RTC connections #329

Merged
merged 19 commits into from
May 3, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix test race
  • Loading branch information
kylecarbs committed May 2, 2021
commit 1d7d14c51ff0af06ddaf8f18e013dee9ee4bc1b9
33 changes: 22 additions & 11 deletions wsnet/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,31 +85,39 @@ func Dial(ctx context.Context, broker string, config *DialConfig) (*Dialer, erro
flushCandidates()

dialer := &Dialer{
ws: conn,
ctrl: ctrl,
rtc: rtc,
}

go func() {
err = waitForDataChannelOpen(ctx, ctrl)
if err != nil {
_ = conn.Close(websocket.StatusAbnormalClosure, "timeout")
return
}
dialer.ctrlrw, _ = ctrl.Detach()
_ = conn.Close(websocket.StatusNormalClosure, "connected")
}()

return dialer, dialer.negotiate(nconn)
}

type Dialer struct {
ws *websocket.Conn
ctrl *webrtc.DataChannel
ctrlrw datachannel.ReadWriteCloser
rtc *webrtc.PeerConnection
}

func (d *Dialer) negotiate(nconn net.Conn) (err error) {
decoder := json.NewDecoder(nconn)
errCh := make(chan error)
go func() {
defer close(errCh)
err := waitForDataChannelOpen(context.Background(), d.ctrl)
if err != nil {
_ = d.ws.Close(websocket.StatusAbnormalClosure, "timeout")
errCh <- err
return
}
d.ctrlrw, err = d.ctrl.Detach()
if err != nil {
errCh <- err
}
_ = d.ws.Close(websocket.StatusNormalClosure, "connected")
}()

for {
var msg protoMessage
err = decoder.Decode(&msg)
Expand Down Expand Up @@ -144,7 +152,7 @@ func (d *Dialer) negotiate(nconn net.Conn) (err error) {
}
return fmt.Errorf("unhandled message: %+v", msg)
}
return nil
return <-errCh
}

// Close closes the RTC connection.
Expand Down Expand Up @@ -173,6 +181,9 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
if err != nil {
return nil, fmt.Errorf("create data channel: %w", err)
}
dc.OnError(func(err error) {
fmt.Printf("We got err %+v\n", err)
})
err = waitForDataChannelOpen(ctx, dc)
if err != nil {
return nil, fmt.Errorf("wait for open: %w", err)
Expand Down