Skip to content

Fix gRPC BoardList* methods concurrency issues #1804

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added some doc comments in the source code
  • Loading branch information
cmaglie committed Aug 10, 2022
commit 601c243b611738e709fbb898e0589d67ab875be2
2 changes: 2 additions & 0 deletions arduino/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,8 @@ func (disc *PluggableDiscovery) Stop() error {

func (disc *PluggableDiscovery) stopSync() {
if disc.eventChan != nil {
// When stopping sync send a batch of "remove" events for
// all the active ports.
for _, port := range disc.cachedPorts {
disc.eventChan <- &Event{"remove", port, disc.GetID()}
}
Expand Down
24 changes: 16 additions & 8 deletions arduino/discovery/discoverymanager/discoverymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@ import (
"github.com/sirupsen/logrus"
)

// DiscoveryManager is required to handle multiple pluggable-discovery that
// may be shared across platforms
// DiscoveryManager manages the many-to-many communication between all pluggable
// discoveries and all watchers. Each PluggableDiscovery, once started, will
// produce a sequence of "events". These events will be broadcasted to all
// listening Watcher.
// The DiscoveryManager will not start the discoveries until the Start method
// is called.
type DiscoveryManager struct {
discoveriesMutex sync.Mutex
discoveries map[string]*discovery.PluggableDiscovery
discoveriesRunning bool
feed chan *discovery.Event
discoveries map[string]*discovery.PluggableDiscovery // all registered PluggableDiscovery
discoveriesRunning bool // set to true once discoveries are started
feed chan *discovery.Event // all events will pass through this channel
watchersMutex sync.Mutex
watchers map[*PortWatcher]bool
watchersCache map[string]map[string]*discovery.Event
watchers map[*PortWatcher]bool // all registered Watcher
watchersCache map[string]map[string]*discovery.Event // this is a cache of all active ports
}

var tr = i18n.Tr
Expand Down Expand Up @@ -85,7 +89,7 @@ func (dm *DiscoveryManager) Start() {
}

go func() {
// Feed all watchers with data coming from the discoveries
// Send all events coming from the feed channel to all active watchers
for ev := range dm.feed {
dm.feedEvent(ev)
}
Expand Down Expand Up @@ -152,11 +156,13 @@ func (dm *DiscoveryManager) Watch() (*PortWatcher, error) {
}
go func() {
dm.watchersMutex.Lock()
// When a watcher is started, send all the current active ports first...
for _, cache := range dm.watchersCache {
for _, ev := range cache {
watcher.feed <- ev
}
}
// ...and after that add the watcher to the list of watchers receiving events
dm.watchers[watcher] = true
dm.watchersMutex.Unlock()
}()
Expand All @@ -165,6 +171,7 @@ func (dm *DiscoveryManager) Watch() (*PortWatcher, error) {

func (dm *DiscoveryManager) startDiscovery(d *discovery.PluggableDiscovery) (discErr error) {
defer func() {
// If this function returns an error log it
if discErr != nil {
logrus.Errorf("Discovery %s failed to run: %s", d.GetID(), discErr)
}
Expand All @@ -179,6 +186,7 @@ func (dm *DiscoveryManager) startDiscovery(d *discovery.PluggableDiscovery) (dis
}

go func() {
// Transfer all incoming events from this discovery to the feed channel
for ev := range eventCh {
dm.feed <- ev
}
Expand Down