Skip to content

Commit 345f51f

Browse files
Merge pull request #15 from form3tech-oss/ap-fix-connection-leaks
fix: connection leaks
2 parents 93bb0e6 + 27ceefc commit 345f51f

File tree

9 files changed

+208
-260
lines changed

9 files changed

+208
-260
lines changed

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,15 @@ The following environment variables configure the exporter:
255255

256256
* `PG_EXPORTER_METRIC_PREFIX`
257257
A prefix to use for each of the default metrics exported by postgres-exporter. Default is `pg`
258+
259+
* `PG_MAX_CONNECTIONS`
260+
Maximum number of open connections used by the collector. Default is `-1`
261+
262+
* `PG_MAX_IDLE_CONNECTIONS`
263+
Maximum number of idle connections used by the collector. Default is `-1`
264+
265+
* `PG_COLLECTOR_TIMEOUT`
266+
Timeout for a single collector scrape. Default is `10s`
258267

259268
Settings set by environment variables starting with `PG_` will be overwritten by the corresponding CLI flag if given.
260269

cmd/postgres_exporter/main.go

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,16 @@
1414
package main
1515

1616
import (
17+
"context"
1718
"errors"
1819
"fmt"
1920
"net"
2021
"net/http"
2122
"os"
23+
"os/signal"
2224
"strings"
25+
"syscall"
26+
"time"
2327

2428
"github.com/alecthomas/kingpin/v2"
2529
"github.com/form3tech-oss/postgres_exporter/collector"
@@ -53,6 +57,9 @@ var (
5357
excludeDatabases = kingpin.Flag("exclude-databases", "A list of databases to remove when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_EXCLUDE_DATABASES").String()
5458
includeDatabases = kingpin.Flag("include-databases", "A list of databases to include when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_INCLUDE_DATABASES").String()
5559
metricPrefix = kingpin.Flag("metric-prefix", "A metric prefix can be used to have non-default (not \"pg\") prefixes for each of the metrics").Default("pg").Envar("PG_EXPORTER_METRIC_PREFIX").String()
60+
maxOpenConnections = kingpin.Flag("max-connections", "the maximum number of opened connections").Default("-1").Envar("PG_MAX_CONNECTIONS").Int()
61+
maxIdleConnections = kingpin.Flag("max-idle-connections", "the maximum number of idle connections").Default("-1").Envar("PG_MAX_IDLE_CONNECTIONS").Int()
62+
collectorTimeout = kingpin.Flag("collector-timeout", "the single collector scrape timeout").Default("10s").Envar("PG_COLLECTOR_TIMEOUT").Duration()
5663
logger = log.NewNopLogger()
5764
)
5865

@@ -130,12 +137,11 @@ func main() {
130137
WithConstantLabels(*constantLabelsList),
131138
ExcludeDatabases(excludedDatabases),
132139
IncludeDatabases(*includeDatabases),
140+
WithMaxOpenConnections(*maxOpenConnections),
141+
WithMaxIdleConnections(*maxIdleConnections),
133142
}
134143

135144
exporter := NewExporter(dsns, opts...)
136-
defer func() {
137-
exporter.servers.Close()
138-
}()
139145

140146
reg := prometheus.NewRegistry()
141147

@@ -149,22 +155,31 @@ func main() {
149155
if len(dsns) > 0 {
150156
dsn = dsns[0]
151157
}
152-
158+
153159
collOpts := []collector.Option{
154160
collector.WithConstantLabels(parseConstLabels(*constantLabelsList)),
161+
collector.WithMaxIdleConnections(*maxIdleConnections),
162+
collector.WithMaxOpenConnections(*maxOpenConnections),
163+
collector.WithScrapeTimeout(*collectorTimeout),
164+
}
165+
166+
if *maxOpenConnections >= 0 {
167+
collOpts = append(collOpts, collector.WithMaxOpenConnections(*maxOpenConnections))
168+
}
169+
if *maxIdleConnections >= 0 {
170+
collOpts = append(collOpts, collector.WithMaxIdleConnections(*maxIdleConnections))
155171
}
156172

157173
pgColl, err := collector.NewPostgresCollector(
158174
logger,
159175
excludedDatabases,
160176
dsn,
161177
[]string{},
162-
collOpts...
178+
collOpts...,
163179
)
164180
if err != nil {
165-
level.Warn(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error())
181+
level.Error(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error())
166182
} else {
167-
168183
reg.MustRegister(pgColl)
169184
}
170185

@@ -190,13 +205,30 @@ func main() {
190205
http.Handle("/", landingPage)
191206
}
192207

193-
http.HandleFunc("/probe", handleProbe(logger, excludedDatabases))
194-
195208
srv := &http.Server{}
196-
if err := web.ListenAndServe(srv, webConfig, logger); err != nil {
197-
level.Error(logger).Log("msg", "Error running HTTP server", "err", err)
209+
srv.RegisterOnShutdown(func() {
210+
level.Info(logger).Log("msg", "gracefully shutting down HTTP server")
211+
exporter.servers.Close()
212+
pgColl.Close()
213+
})
214+
215+
go func() {
216+
if err := web.ListenAndServe(srv, webConfig, logger); !errors.Is(err, http.ErrServerClosed) {
217+
level.Error(logger).Log("msg", "running HTTP server", "err", err)
218+
}
219+
}()
220+
221+
sigChan := make(chan os.Signal, 1)
222+
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
223+
<-sigChan
224+
225+
shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 30*time.Second)
226+
defer shutdownRelease()
227+
if err := srv.Shutdown(shutdownCtx); err != nil {
228+
level.Error(logger).Log("msg", "during HTTP server shut down", "err", err)
198229
os.Exit(1)
199230
}
231+
level.Info(logger).Log("msg", "HTTP server gracefully shut down")
200232
}
201233

202234
func runHealthCheck(webConfig *web.FlagConfig) (bool, error) {

cmd/postgres_exporter/namespace.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package main
1515

1616
import (
17+
"context"
1718
"database/sql"
1819
"errors"
1920
"fmt"
@@ -27,7 +28,7 @@ import (
2728

2829
// Query within a namespace mapping and emit metrics. Returns fatal errors if
2930
// the scrape fails, and a slice of errors if they were non-fatal.
30-
func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) {
31+
func queryNamespaceMappingWithContext(ctx context.Context, server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) {
3132
// Check for a query override for this namespace
3233
query, found := server.queryOverrides[namespace]
3334

@@ -45,19 +46,19 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa
4546
if !found {
4647
// I've no idea how to avoid this properly at the moment, but this is
4748
// an admin tool so you're not injecting SQL right?
48-
rows, err = server.db.Query(fmt.Sprintf("SELECT * FROM %s;", namespace)) // nolint: gas
49+
rows, err = server.db.QueryContext(ctx, fmt.Sprintf("SELECT * FROM %s;", namespace)) // nolint: gas
4950
} else {
50-
rows, err = server.db.Query(query)
51+
rows, err = server.db.QueryContext(ctx, query)
5152
}
5253
if err != nil {
53-
return []prometheus.Metric{}, []error{}, fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err)
54+
return []prometheus.Metric{}, []error{}, fmt.Errorf("error running query on database %q: %s %v", server, namespace, err)
5455
}
5556
defer rows.Close() // nolint: errcheck
5657

5758
var columnNames []string
5859
columnNames, err = rows.Columns()
5960
if err != nil {
60-
return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("Error retrieving column list for: ", namespace, err))
61+
return []prometheus.Metric{}, []error{}, errors.New(fmt.Sprintln("error retrieving column list for: ", namespace, err))
6162
}
6263

6364
// Make a lookup map for the column indices
@@ -183,17 +184,17 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa
183184

184185
// Iterate through all the namespace mappings in the exporter and run their
185186
// queries.
186-
func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[string]error {
187+
func queryNamespaceMappings(ctx context.Context, ch chan<- prometheus.Metric, server *Server) map[string]error {
187188
// Return a map of namespace -> errors
188189
namespaceErrors := make(map[string]error)
189190

190191
scrapeStart := time.Now()
191192

192193
for namespace, mapping := range server.metricMap {
193-
level.Debug(logger).Log("msg", "Querying namespace", "namespace", namespace)
194+
level.Debug(logger).Log("msg", "querying namespace", "namespace", namespace)
194195

195196
if mapping.master && !server.master {
196-
level.Debug(logger).Log("msg", "Query skipped...")
197+
level.Debug(logger).Log("msg", "query skipped...", "namespace", namespace)
197198
continue
198199
}
199200

@@ -202,7 +203,7 @@ func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[str
202203
serVersion, _ := semver.Parse(server.lastMapVersion.String())
203204
runServerRange, _ := semver.ParseRange(server.runonserver)
204205
if !runServerRange(serVersion) {
205-
level.Debug(logger).Log("msg", "Query skipped for this database version", "version", server.lastMapVersion.String(), "target_version", server.runonserver)
206+
level.Debug(logger).Log("msg", "query skipped for this database version", "version", server.lastMapVersion.String(), "target_version", server.runonserver)
206207
continue
207208
}
208209
}
@@ -225,20 +226,21 @@ func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[str
225226
var nonFatalErrors []error
226227
var err error
227228
if scrapeMetric {
228-
metrics, nonFatalErrors, err = queryNamespaceMapping(server, namespace, mapping)
229-
} else {
229+
metrics, nonFatalErrors, err = queryNamespaceMappingWithContext(ctx, server, namespace, mapping)
230+
} else {
231+
level.Debug(logger).Log("msg", "found cached metrics", "namespace", namespace)
230232
metrics = cachedMetric.metrics
231233
}
232234

233235
// Serious error - a namespace disappeared
234236
if err != nil {
235237
namespaceErrors[namespace] = err
236-
level.Info(logger).Log("err", err)
238+
level.Error(logger).Log("err", err)
237239
}
238240
// Non-serious errors - likely version or parsing problems.
239241
if len(nonFatalErrors) > 0 {
240242
for _, err := range nonFatalErrors {
241-
level.Info(logger).Log("err", err)
243+
level.Error(logger).Log("err", err)
242244
}
243245
}
244246

cmd/postgres_exporter/postgres_exporter.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,11 @@ type Exporter struct {
420420
userQueriesError *prometheus.GaugeVec
421421
totalScrapes prometheus.Counter
422422

423+
// Connection settings
424+
maxOpenConnections int
425+
maxIdleConnections int
426+
scrapeTimeout time.Duration
427+
423428
// servers are used to allow re-using the DB connection between scrapes.
424429
// servers contains metrics map and query overrides.
425430
servers *Servers
@@ -472,6 +477,24 @@ func WithUserQueriesPath(p string) ExporterOpt {
472477
}
473478
}
474479

480+
func WithMaxOpenConnections(n int) ExporterOpt {
481+
return func(e *Exporter) {
482+
e.maxOpenConnections = n
483+
}
484+
}
485+
486+
func WithMaxIdleConnections(n int) ExporterOpt {
487+
return func(e *Exporter) {
488+
e.maxIdleConnections = n
489+
}
490+
}
491+
492+
func WithScrapeTimeout(d time.Duration) ExporterOpt {
493+
return func(e *Exporter) {
494+
e.scrapeTimeout = d
495+
}
496+
}
497+
475498
// WithConstantLabels configures constant labels.
476499
func WithConstantLabels(s string) ExporterOpt {
477500
return func(e *Exporter) {

cmd/postgres_exporter/probe.go

Lines changed: 0 additions & 107 deletions
This file was deleted.

0 commit comments

Comments
 (0)