Skip to content

Commit 589b015

Browse files
committed
fix: sync clone port allocation(postgres-ai#133)
1 parent 6ec7720 commit 589b015

File tree

2 files changed

+94
-18
lines changed

2 files changed

+94
-18
lines changed

pkg/services/provision/mode_local.go

+38-18
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"path"
1212
"strconv"
1313
"strings"
14+
"sync"
15+
"sync/atomic"
1416
"time"
1517

1618
"github.com/docker/docker/api/types"
@@ -72,17 +74,18 @@ type provisionModeLocal struct {
7274
provision
7375
dockerClient *client.Client
7476
runner runners.Runner
77+
mu *sync.Mutex
7578
ports []bool
76-
sessionCounter uint
79+
sessionCounter uint32
7780
thinCloneManager thinclones.Manager
7881
}
7982

8083
// NewProvisionModeLocal creates a new Provision instance of ModeLocal.
8184
func NewProvisionModeLocal(ctx context.Context, config Config, dockerClient *client.Client) (Provision, error) {
8285
p := &provisionModeLocal{
83-
runner: runners.NewLocalRunner(config.ModeLocal.UseSudo),
84-
sessionCounter: 0,
85-
dockerClient: dockerClient,
86+
runner: runners.NewLocalRunner(config.ModeLocal.UseSudo),
87+
mu: &sync.Mutex{},
88+
dockerClient: dockerClient,
8689
provision: provision{
8790
config: config,
8891
ctx: ctx,
@@ -196,8 +199,7 @@ func (j *provisionModeLocal) StartSession(username, password, snapshotID string)
196199
return nil, errors.Wrap(err, "failed to get snapshots")
197200
}
198201

199-
// TODO(anatoly): Synchronization or port allocation statuses.
200-
port, err := j.getFreePort()
202+
port, err := j.allocatePort()
201203
if err != nil {
202204
return nil, errors.New("failed to get a free port")
203205
}
@@ -209,6 +211,10 @@ func (j *provisionModeLocal) StartSession(username, password, snapshotID string)
209211
defer func() {
210212
if err != nil {
211213
j.revertSession(name)
214+
215+
if portErr := j.freePort(port); portErr != nil {
216+
log.Err(portErr)
217+
}
212218
}
213219
}()
214220

@@ -227,12 +233,7 @@ func (j *provisionModeLocal) StartSession(username, password, snapshotID string)
227233
return nil, errors.Wrap(err, "failed to prepare a database")
228234
}
229235

230-
err = j.setPort(port, true)
231-
if err != nil {
232-
return nil, errors.Wrap(err, "failed to set a port")
233-
}
234-
235-
j.sessionCounter++
236+
atomic.AddUint32(&j.sessionCounter, 1)
236237

237238
appConfig := j.getAppConfig(name, port)
238239

@@ -263,7 +264,7 @@ func (j *provisionModeLocal) StopSession(session *resources.Session) error {
263264
return errors.Wrap(err, "failed to destroy a clone")
264265
}
265266

266-
err = j.setPort(session.Port, false)
267+
err = j.freePort(session.Port)
267268
if err != nil {
268269
return errors.Wrap(err, "failed to unbind a port")
269270
}
@@ -372,20 +373,39 @@ func (j *provisionModeLocal) initPortPool() error {
372373
return nil
373374
}
374375

375-
func (j *provisionModeLocal) getFreePort() (uint, error) {
376+
// allocatePort tries to find a free port and occupy it.
377+
func (j *provisionModeLocal) allocatePort() (uint, error) {
376378
portOpts := j.config.ModeLocal.PortPool
377379

380+
j.mu.Lock()
381+
defer j.mu.Unlock()
382+
378383
for index, binded := range j.ports {
379384
if !binded {
380385
port := portOpts.From + uint(index)
386+
387+
if err := j.setPortStatus(port, true); err != nil {
388+
return 0, errors.Wrap(err, "failed to set port status")
389+
}
390+
381391
return port, nil
382392
}
383393
}
384394

385395
return 0, errors.WithStack(NewNoRoomError("no available ports"))
386396
}
387397

388-
func (j *provisionModeLocal) setPort(port uint, bind bool) error {
398+
// freePort marks the port as free.
399+
func (j *provisionModeLocal) freePort(port uint) error {
400+
j.mu.Lock()
401+
defer j.mu.Unlock()
402+
403+
return j.setPortStatus(port, false)
404+
}
405+
406+
// setPortStatus updates the port status.
407+
// It's not safe to invoke without ports mutex locking. Use allocatePort and freePort methods.
408+
func (j *provisionModeLocal) setPortStatus(port uint, bind bool) error {
389409
portOpts := j.config.ModeLocal.PortPool
390410

391411
if port < portOpts.From || port >= portOpts.To {
@@ -399,14 +419,14 @@ func (j *provisionModeLocal) setPort(port uint, bind bool) error {
399419
}
400420

401421
func (j *provisionModeLocal) stopAllSessions() error {
402-
insts, err := postgres.List(j.runner, ClonePrefix)
422+
instances, err := postgres.List(j.runner, ClonePrefix)
403423
if err != nil {
404424
return errors.Wrap(err, "failed to list containers")
405425
}
406426

407-
log.Dbg("Containers running:", insts)
427+
log.Dbg("Containers running:", instances)
408428

409-
for _, inst := range insts {
429+
for _, inst := range instances {
410430
log.Dbg("Stopping container:", inst)
411431

412432
if err = postgres.Stop(j.runner, j.getAppConfig(inst, 0)); err != nil {
+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package provision
2+
3+
import (
4+
"sync"
5+
"testing"
6+
7+
"github.com/pkg/errors"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestPortAllocation(t *testing.T) {
13+
p := &provisionModeLocal{
14+
mu: &sync.Mutex{},
15+
provision: provision{
16+
config: Config{
17+
ModeLocal: ModeLocalConfig{
18+
PortPool: ModeLocalPortPool{
19+
From: 6000,
20+
To: 6002,
21+
},
22+
},
23+
},
24+
},
25+
}
26+
27+
// Initialize port pool.
28+
require.NoError(t, p.initPortPool())
29+
30+
// Allocate a new port.
31+
port, err := p.allocatePort()
32+
require.NoError(t, err)
33+
34+
assert.GreaterOrEqual(t, port, p.provision.config.ModeLocal.PortPool.From)
35+
assert.LessOrEqual(t, port, p.provision.config.ModeLocal.PortPool.To)
36+
37+
// Allocate one more port.
38+
_, err = p.allocatePort()
39+
require.NoError(t, err)
40+
41+
// Impossible allocate a new port.
42+
_, err = p.allocatePort()
43+
assert.IsType(t, errors.Cause(err), &NoRoomError{})
44+
assert.EqualError(t, err, "session cannot be started because there is no room: no available ports")
45+
46+
// Free port and allocate a new one.
47+
require.NoError(t, p.freePort(port))
48+
port, err = p.allocatePort()
49+
require.NoError(t, err)
50+
assert.GreaterOrEqual(t, port, p.provision.config.ModeLocal.PortPool.From)
51+
assert.LessOrEqual(t, port, p.provision.config.ModeLocal.PortPool.To)
52+
53+
// Try to free a non-existing port.
54+
err = p.freePort(1)
55+
assert.EqualError(t, err, "port 1 is out of bounds of the port pool")
56+
}

0 commit comments

Comments
 (0)