Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Conn struct {
lastID int64

createdAt time.Time
usedAt uint32 // atomic
usedAt atomic.Uint32
pooled bool
Inited bool
}
Expand All @@ -36,12 +36,12 @@ func NewConn(netConn net.Conn, pool *ConnPool) *Conn {
}

func (cn *Conn) UsedAt() time.Time {
unix := atomic.LoadUint32(&cn.usedAt)
unix := cn.usedAt.Load()
return time.Unix(int64(unix), 0)
}

func (cn *Conn) SetUsedAt(tm time.Time) {
atomic.StoreUint32(&cn.usedAt, uint32(tm.Unix()))
cn.usedAt.Store(uint32(tm.Unix()))
}

func (cn *Conn) RemoteAddr() net.Addr {
Expand Down
14 changes: 7 additions & 7 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ type Options struct {
type ConnPool struct {
opt *Options

dialErrorsNum uint32 // atomic
dialErrorsNum atomic.Uint32

_closed uint32 // atomic
_closed atomic.Bool

lastDialErrorMu sync.RWMutex
lastDialError error
Expand Down Expand Up @@ -188,14 +188,14 @@ func (p *ConnPool) dialConn(c context.Context, pooled bool) (*Conn, error) {
return nil, ErrClosed
}

if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
if p.dialErrorsNum.Load() >= uint32(p.opt.PoolSize) {
return nil, p.getLastDialError()
}

netConn, err := p.opt.Dialer(c)
if err != nil {
p.setLastDialError(err)
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
if p.dialErrorsNum.Add(1) == uint32(p.opt.PoolSize) {
go p.tryDial()
}
return nil, err
Expand All @@ -219,7 +219,7 @@ func (p *ConnPool) tryDial() {
continue
}

atomic.StoreUint32(&p.dialErrorsNum, 0)
p.dialErrorsNum.Store(0)
_ = conn.Close()
return
}
Expand Down Expand Up @@ -415,7 +415,7 @@ func (p *ConnPool) Stats() *Stats {
}

func (p *ConnPool) closed() bool {
return atomic.LoadUint32(&p._closed) == 1
return p._closed.Load()
}

func (p *ConnPool) Filter(fn func(*Conn) bool) error {
Expand All @@ -433,7 +433,7 @@ func (p *ConnPool) Filter(fn func(*Conn) bool) error {
}

func (p *ConnPool) Close() error {
if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
if !p._closed.CompareAndSwap(false, true) {
return ErrClosed
}

Expand Down
22 changes: 11 additions & 11 deletions internal/pool/pool_sticky.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func (e BadConnError) Unwrap() error {

type StickyConnPool struct {
pool Pooler
shared int32 // atomic
shared atomic.Int32

state uint32 // atomic
state atomic.Uint32
ch chan *Conn

_badConnError atomic.Value
Expand All @@ -53,7 +53,7 @@ func NewStickyConnPool(pool Pooler) *StickyConnPool {
ch: make(chan *Conn, 1),
}
}
atomic.AddInt32(&p.shared, 1)
p.shared.Add(1)
return p
}

Expand All @@ -68,13 +68,13 @@ func (p *StickyConnPool) CloseConn(cn *Conn) error {
func (p *StickyConnPool) Get(ctx context.Context) (*Conn, error) {
// In worst case this races with Close which is not a very common operation.
for i := 0; i < 1000; i++ {
switch atomic.LoadUint32(&p.state) {
switch p.state.Load() {
case stateDefault:
cn, err := p.pool.Get(ctx)
if err != nil {
return nil, err
}
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
if p.state.CompareAndSwap(stateDefault, stateInited) {
return cn, nil
}
p.pool.Remove(ctx, cn, ErrClosed)
Expand Down Expand Up @@ -124,16 +124,16 @@ func (p *StickyConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
}

func (p *StickyConnPool) Close() error {
if shared := atomic.AddInt32(&p.shared, -1); shared > 0 {
if shared := p.shared.Add(-1); shared > 0 {
return nil
}

for i := 0; i < 1000; i++ {
state := atomic.LoadUint32(&p.state)
state := p.state.Load()
if state == stateClosed {
return ErrClosed
}
if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) {
if p.state.CompareAndSwap(state, stateClosed) {
close(p.ch)
cn, ok := <-p.ch
if ok {
Expand Down Expand Up @@ -162,8 +162,8 @@ func (p *StickyConnPool) Reset(ctx context.Context) error {
return errors.New("pg: StickyConnPool does not have a Conn")
}

if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) {
state := atomic.LoadUint32(&p.state)
if !p.state.CompareAndSwap(stateInited, stateDefault) {
state := p.state.Load()
return fmt.Errorf("pg: invalid StickyConnPool state: %d", state)
}

Expand All @@ -181,7 +181,7 @@ func (p *StickyConnPool) badConnError() error {
}

func (p *StickyConnPool) Len() int {
switch atomic.LoadUint32(&p.state) {
switch p.state.Load() {
case stateDefault:
return 0
case stateInited:
Expand Down
6 changes: 3 additions & 3 deletions tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Tx struct {
stmtsMu sync.Mutex
stmts []*Stmt

_closed int32
_closed atomic.Bool
}

var _ orm.DB = (*Tx)(nil)
Expand Down Expand Up @@ -368,7 +368,7 @@ func (tx *Tx) CloseContext(ctx context.Context) error {
}

func (tx *Tx) close() {
if !atomic.CompareAndSwapInt32(&tx._closed, 0, 1) {
if !tx._closed.CompareAndSwap(false, true) {
return
}

Expand All @@ -384,5 +384,5 @@ func (tx *Tx) close() {
}

func (tx *Tx) closed() bool {
return atomic.LoadInt32(&tx._closed) == 1
return tx._closed.Load()
}