Skip to content

Commit 07eed02

Browse files
committed
fix: connection leaks
- run sql.Open only during collector initialisation. - make the number of max open and max idle connections configurable and give them sensible defaults. - add a timeout to each collector scrape. - better shutdown handling
1 parent 93bb0e6 commit 07eed02

File tree

7 files changed

+162
-37
lines changed

7 files changed

+162
-37
lines changed

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,15 @@ The following environment variables configure the exporter:
255255

256256
* `PG_EXPORTER_METRIC_PREFIX`
257257
A prefix to use for each of the default metrics exported by postgres-exporter. Default is `pg`
258+
259+
* `PG_MAX_CONNECTIONS`
260+
Maximum number of open connections used by the collector. Default is `-1`
261+
262+
* `PG_MAX_IDLE_CONNECTIONS`
263+
Maximum number of idle connections kept by the collector. Default is `-1`
264+
265+
* `PG_COLLECTOR_TIMEOUT`
266+
Timeout for a single collector scrape. Default is `10s`
258267

259268
Settings set by environment variables starting with `PG_` will be overwritten by the corresponding CLI flag if given.
260269

cmd/postgres_exporter/main.go

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,16 @@
1414
package main
1515

1616
import (
17+
"context"
1718
"errors"
1819
"fmt"
1920
"net"
2021
"net/http"
2122
"os"
23+
"os/signal"
2224
"strings"
25+
"syscall"
26+
"time"
2327

2428
"github.com/alecthomas/kingpin/v2"
2529
"github.com/form3tech-oss/postgres_exporter/collector"
@@ -53,6 +57,9 @@ var (
5357
excludeDatabases = kingpin.Flag("exclude-databases", "A list of databases to remove when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_EXCLUDE_DATABASES").String()
5458
includeDatabases = kingpin.Flag("include-databases", "A list of databases to include when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_INCLUDE_DATABASES").String()
5559
metricPrefix = kingpin.Flag("metric-prefix", "A metric prefix can be used to have non-default (not \"pg\") prefixes for each of the metrics").Default("pg").Envar("PG_EXPORTER_METRIC_PREFIX").String()
60+
maxOpenConnections = kingpin.Flag("max-connections", "the maximum number of opened connections").Default("-1").Envar("PG_MAX_CONNECTIONS").Int()
61+
maxIdleConnections = kingpin.Flag("max-idle-connections", "the maximum number of idle connections").Default("-1").Envar("PG_MAX_IDLE_CONNECTIONS").Int()
62+
collectorTimeout = kingpin.Flag("collector-timeout", "the single collector scrape timeout").Default("10s").Envar("PG_COLLECTOR_TIMEOUT").Duration()
5663
logger = log.NewNopLogger()
5764
)
5865

@@ -133,9 +140,6 @@ func main() {
133140
}
134141

135142
exporter := NewExporter(dsns, opts...)
136-
defer func() {
137-
exporter.servers.Close()
138-
}()
139143

140144
reg := prometheus.NewRegistry()
141145

@@ -149,22 +153,29 @@ func main() {
149153
if len(dsns) > 0 {
150154
dsn = dsns[0]
151155
}
152-
156+
153157
collOpts := []collector.Option{
154158
collector.WithConstantLabels(parseConstLabels(*constantLabelsList)),
159+
collector.WithScrapeTimeout(*collectorTimeout),
160+
}
161+
162+
if *maxOpenConnections >= 0 {
163+
collOpts = append(collOpts, collector.WithMaxOpenConnections(*maxOpenConnections))
164+
}
165+
if *maxIdleConnections >= 0 {
166+
collOpts = append(collOpts, collector.WithMaxIdleConnections(*maxIdleConnections))
155167
}
156168

157169
pgColl, err := collector.NewPostgresCollector(
158170
logger,
159171
excludedDatabases,
160172
dsn,
161173
[]string{},
162-
collOpts...
174+
collOpts...,
163175
)
164176
if err != nil {
165-
level.Warn(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error())
177+
level.Error(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error())
166178
} else {
167-
168179
reg.MustRegister(pgColl)
169180
}
170181

@@ -193,10 +204,29 @@ func main() {
193204
http.HandleFunc("/probe", handleProbe(logger, excludedDatabases))
194205

195206
srv := &http.Server{}
196-
if err := web.ListenAndServe(srv, webConfig, logger); err != nil {
197-
level.Error(logger).Log("msg", "Error running HTTP server", "err", err)
207+
srv.RegisterOnShutdown(func() {
208+
level.Info(logger).Log("msg", "gracefully shutting down HTTP server")
209+
exporter.servers.Close()
210+
pgColl.Close()
211+
})
212+
213+
go func() {
214+
if err := web.ListenAndServe(srv, webConfig, logger); !errors.Is(err, http.ErrServerClosed) {
215+
level.Error(logger).Log("msg", "running HTTP server", "err", err)
216+
}
217+
}()
218+
219+
sigChan := make(chan os.Signal, 1)
220+
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
221+
<-sigChan
222+
223+
shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 30*time.Second)
224+
defer shutdownRelease()
225+
if err := srv.Shutdown(shutdownCtx); err != nil {
226+
level.Error(logger).Log("msg", "during HTTP server shut down", "err", err)
198227
os.Exit(1)
199228
}
229+
level.Info(logger).Log("msg", "HTTP server gracefully shut down")
200230
}
201231

202232
func runHealthCheck(webConfig *web.FlagConfig) (bool, error) {

cmd/postgres_exporter/server.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func NewServer(dsn string, opts ...ServerOpt) (*Server, error) {
7171
db.SetMaxOpenConns(1)
7272
db.SetMaxIdleConns(1)
7373

74-
level.Info(logger).Log("msg", "Established new database connection", "fingerprint", fingerprint)
74+
level.Info(logger).Log("msg", "set up new server", "fingerprint", fingerprint)
7575

7676
s := &Server{
7777
db: db,
@@ -163,6 +163,7 @@ func (s *Servers) GetServer(dsn string) (*Server, error) {
163163
if !ok {
164164
server, err = NewServer(dsn, s.opts...)
165165
if err != nil {
166+
level.Error(logger).Log("msg", "failed create NewServer", "server", server, "err", err)
166167
time.Sleep(time.Duration(errCount) * time.Second)
167168
continue
168169
}
@@ -182,7 +183,12 @@ func (s *Servers) GetServer(dsn string) (*Server, error) {
182183
func (s *Servers) Close() {
183184
s.m.Lock()
184185
defer s.m.Unlock()
186+
if len(s.servers) == 0 {
187+
level.Debug(logger).Log("msg", "no servers to close connection for")
188+
return
189+
}
185190
for _, server := range s.servers {
191+
level.Info(logger).Log("msg", "closing server", "server", server)
186192
if err := server.Close(); err != nil {
187193
level.Error(logger).Log("msg", "Failed to close connection", "server", server, "err", err)
188194
}

collector/collector.go

Lines changed: 66 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ const (
4040

4141
defaultEnabled = true
4242
defaultDisabled = false
43+
44+
defaultMaxOpenConnections = 10
45+
defaultIdleConnections = 5
4346
)
4447

4548
var (
@@ -92,8 +95,11 @@ type PostgresCollector struct {
9295
Collectors map[string]Collector
9396
logger log.Logger
9497

95-
instance *instance
96-
constantLabels prometheus.Labels
98+
instance *instance
99+
constantLabels prometheus.Labels
100+
maxOpenConnections int
101+
maxIdleConnections int
102+
scrapeTimeout time.Duration
97103
}
98104

99105
type Option func(*PostgresCollector) error
@@ -106,10 +112,37 @@ func WithConstantLabels(l prometheus.Labels) Option {
106112
}
107113
}
108114

115+
// WithMaxOpenConnections configures the max number of open connections kept in the underlying pool.
116+
func WithMaxOpenConnections(v int) Option {
117+
return func(c *PostgresCollector) error {
118+
c.maxOpenConnections = v
119+
return nil
120+
}
121+
}
122+
123+
// WithMaxIdleConnections configures the max number of idle connections kept in the underlying pool.
124+
func WithMaxIdleConnections(v int) Option {
125+
return func(c *PostgresCollector) error {
126+
c.maxIdleConnections = v
127+
return nil
128+
}
129+
}
130+
131+
// WithScrapeTimeout configures the timeout for a single collector scrape.
132+
func WithScrapeTimeout(t time.Duration) Option {
133+
return func(c *PostgresCollector) error {
134+
c.scrapeTimeout = t
135+
return nil
136+
}
137+
}
138+
109139
// NewPostgresCollector creates a new PostgresCollector.
110140
func NewPostgresCollector(logger log.Logger, excludeDatabases []string, dsn string, filters []string, options ...Option) (*PostgresCollector, error) {
111141
p := &PostgresCollector{
112-
logger: logger,
142+
logger: logger,
143+
scrapeTimeout: 5 * time.Second,
144+
maxOpenConnections: defaultMaxOpenConnections,
145+
maxIdleConnections: defaultIdleConnections,
113146
}
114147
// Apply options to customize the collector
115148
for _, o := range options {
@@ -160,15 +193,31 @@ func NewPostgresCollector(logger log.Logger, excludeDatabases []string, dsn stri
160193
return nil, errors.New("empty dsn")
161194
}
162195

163-
instance, err := newInstance(dsn)
196+
instanceConf := &instanceConfiguration{
197+
dbMaxOpenConns: p.maxOpenConnections,
198+
dbMaxIdleConns: p.maxIdleConnections,
199+
}
200+
instance, err := newInstance(dsn, instanceConf)
201+
if err != nil {
202+
return nil, err
203+
}
204+
205+
err = instance.setup()
164206
if err != nil {
207+
level.Error(p.logger).Log("msg", "setting up connection to database", "err", err)
165208
return nil, err
166209
}
167210
p.instance = instance
168211

169212
return p, nil
170213
}
171214

215+
// Close closes the underlying collector instance
216+
func (p PostgresCollector) Close() error {
217+
level.Debug(p.logger).Log("msg", "closing collector", "instance", p.instance)
218+
return p.instance.Close()
219+
}
220+
172221
// Describe implements the prometheus.Collector interface.
173222
func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) {
174223
ch <- scrapeDurationDesc
@@ -177,42 +226,40 @@ func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) {
177226

178227
// Collect implements the prometheus.Collector interface.
179228
func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
180-
ctx := context.TODO()
181-
182-
// Set up the database connection for the collector.
183-
err := p.instance.setup()
184-
if err != nil {
185-
level.Error(p.logger).Log("msg", "Error opening connection to database", "err", err)
186-
return
187-
}
188-
defer p.instance.Close()
229+
ctx := context.Background()
189230

190231
wg := sync.WaitGroup{}
191232
wg.Add(len(p.Collectors))
192233
for name, c := range p.Collectors {
193234
go func(name string, c Collector) {
194-
execute(ctx, name, c, p.instance, ch, p.logger)
195-
wg.Done()
235+
execute(ctx, p.scrapeTimeout, name, c, p.instance, ch, p.logger, &wg)
196236
}(name, c)
197237
}
198238
wg.Wait()
199239
}
200240

201-
func execute(ctx context.Context, name string, c Collector, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) {
241+
func execute(ctx context.Context, timeout time.Duration, name string, c Collector, instance *instance, ch chan<- prometheus.Metric, logger log.Logger, wg *sync.WaitGroup) {
242+
defer wg.Done()
202243
begin := time.Now()
203-
err := c.Update(ctx, instance, ch)
244+
245+
scrapeCtx, cancel := context.WithTimeout(ctx, timeout)
246+
defer cancel()
247+
248+
err := c.Update(scrapeCtx, instance, ch)
204249
duration := time.Since(begin)
205250
var success float64
206251

207252
if err != nil {
253+
success = 0
208254
if IsNoDataError(err) {
209255
level.Debug(logger).Log("msg", "collector returned no data", "name", name, "duration_seconds", duration.Seconds(), "err", err)
256+
} else if scrapeCtx.Err() == context.DeadlineExceeded {
257+
level.Error(logger).Log("msg", "collector timedout", "name", name, "duration_seconds", duration.Seconds(), "err", err)
210258
} else {
211259
level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err)
212260
}
213-
success = 0
214261
} else {
215-
level.Debug(logger).Log("msg", "collector succeeded", "name", name, "duration_seconds", duration.Seconds())
262+
level.Info(logger).Log("msg", "collector succeeded", "name", name, "duration_seconds", duration.Seconds())
216263
success = 1
217264
}
218265
ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), name)

collector/instance.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,18 @@ type instance struct {
2525
dsn string
2626
db *sql.DB
2727
version semver.Version
28+
conf *instanceConfiguration
2829
}
2930

30-
func newInstance(dsn string) (*instance, error) {
31+
type instanceConfiguration struct {
32+
dbMaxOpenConns int
33+
dbMaxIdleConns int
34+
}
35+
36+
func newInstance(dsn string, conf *instanceConfiguration) (*instance, error) {
3137
i := &instance{
32-
dsn: dsn,
38+
dsn: dsn,
39+
conf: conf,
3340
}
3441

3542
// "Create" a database handle to verify the DSN provided is valid.
@@ -48,16 +55,16 @@ func (i *instance) setup() error {
4855
if err != nil {
4956
return err
5057
}
51-
db.SetMaxOpenConns(1)
52-
db.SetMaxIdleConns(1)
53-
i.db = db
58+
db.SetMaxOpenConns(i.conf.dbMaxOpenConns)
59+
db.SetMaxIdleConns(i.conf.dbMaxIdleConns)
5460

55-
version, err := queryVersion(i.db)
61+
version, err := queryVersion(db)
5662
if err != nil {
5763
return fmt.Errorf("error querying postgresql version: %w", err)
5864
} else {
5965
i.version = version
6066
}
67+
i.db = db
6168
return nil
6269
}
6370

collector/probe.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package collector
1616
import (
1717
"context"
1818
"sync"
19+
"time"
1920

2021
"github.com/form3tech-oss/postgres_exporter/config"
2122
"github.com/go-kit/log"
@@ -58,7 +59,11 @@ func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *p
5859
}
5960
}
6061

61-
instance, err := newInstance(dsn.GetConnectionString())
62+
cnf := &instanceConfiguration{
63+
dbMaxOpenConns: 2,
64+
dbMaxIdleConns: 1,
65+
}
66+
instance, err := newInstance(dsn.GetConnectionString(), cnf)
6267
if err != nil {
6368
return nil, err
6469
}
@@ -87,7 +92,7 @@ func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) {
8792
wg.Add(len(pc.collectors))
8893
for name, c := range pc.collectors {
8994
go func(name string, c Collector) {
90-
execute(context.TODO(), name, c, pc.instance, ch, pc.logger)
95+
execute(context.TODO(), 10*time.Second, name, c, pc.instance, ch, pc.logger, &wg)
9196
wg.Done()
9297
}(name, c)
9398
}

docker-compose.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
services:
2+
db:
3+
image: postgres:13.13-alpine
4+
restart: always
5+
command: postgres -c logging_collector=on -c log_destination=stderr -c wal_level=logical
6+
environment:
7+
- POSTGRES_USER=postgres
8+
- POSTGRES_PASSWORD=postgres
9+
ports:
10+
- '5432:5432'
11+
exporter:
12+
build:
13+
context: .
14+
args:
15+
- OS=linux
16+
- ARCH=arm64
17+
restart: no
18+
entrypoint: ["postgres_exporter"]
19+
command: infinity
20+
environment:
21+
- DATA_SOURCE_NAME="postgresql://postgres:postgres@db:5432/postgres?sslmode=disable"

0 commit comments

Comments
 (0)