Skip to content

Commit 8e7ab5e

Browse files
Merge remote-tracking branch 'origin/PMM-8787-concurrency-fix' into release-0.4.8
2 parents 812e164 + a9670fc commit 8e7ab5e

File tree

2 files changed

+113
-34
lines changed

2 files changed

+113
-34
lines changed

cmd/postgres_exporter/pg_setting.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,14 @@ func querySettings(ch chan<- prometheus.Metric, server *Server) error {
3333
return fmt.Errorf("error retrieving rows on %q: %s %v", server, namespace, err)
3434
}
3535

36-
ch <- s.metric(server.labels)
36+
labels := make(prometheus.Labels)
37+
server.labelsMtx.RLock()
38+
for key, value := range server.labels {
39+
labels[key] = value
40+
}
41+
server.labelsMtx.RUnlock()
42+
43+
ch <- s.metric(labels)
3744
}
3845

3946
return nil

cmd/postgres_exporter/postgres_exporter.go

Lines changed: 105 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -558,31 +558,46 @@ func parseUserQueries(content []byte) (map[string]intermediateMetricMap, map[str
558558
func addQueries(content []byte, pgVersion semver.Version, server *Server) error {
559559
metricMaps, newQueryOverrides, err := parseUserQueries(content)
560560
if err != nil {
561-
return nil
561+
return err
562562
}
563563
// Convert the loaded metric map into exporter representation
564-
partialExporterMap := makeDescMap(pgVersion, server.labels, metricMaps)
564+
labels := make(prometheus.Labels)
565+
server.labelsMtx.RLock()
566+
for key, value := range server.labels {
567+
labels[key] = value
568+
}
569+
server.labelsMtx.RUnlock()
570+
571+
partialExporterMap := makeDescMap(pgVersion, labels, metricMaps)
565572

566573
// Merge the two maps (which are now quite flatteend)
567574
for k, v := range partialExporterMap {
575+
server.metricMapMtx.RLock()
568576
_, found := server.metricMap[k]
577+
server.metricMapMtx.RUnlock()
569578
if found {
570579
log.Debugln("Overriding metric", k, "from user YAML file.")
571580
} else {
572581
log.Debugln("Adding new metric", k, "from user YAML file.")
573582
}
583+
server.metricMapMtx.Lock()
574584
server.metricMap[k] = v
585+
server.metricMapMtx.Unlock()
575586
}
576587

577588
// Merge the query override map
578589
for k, v := range newQueryOverrides {
590+
server.queryOverridesMtx.RLock()
579591
_, found := server.queryOverrides[k]
592+
server.queryOverridesMtx.RUnlock()
580593
if found {
581594
log.Debugln("Overriding query override", k, "from user YAML file.")
582595
} else {
583596
log.Debugln("Adding new query override", k, "from user YAML file.")
584597
}
598+
server.queryOverridesMtx.Lock()
585599
server.queryOverrides[k] = v
600+
server.queryOverridesMtx.Unlock()
586601
}
587602
return nil
588603
}
@@ -848,21 +863,25 @@ type cachedMetrics struct {
848863
// Server describes a connection to Postgres.
849864
// It also contains the metric map and query overrides.
850865
type Server struct {
851-
db *sql.DB
852-
labels prometheus.Labels
853-
master bool
866+
db *sql.DB
867+
labels prometheus.Labels
868+
labelsMtx sync.RWMutex
869+
master bool
870+
masterMtx sync.RWMutex
854871

855872
// Last version used to calculate metric map. If mismatch on scrape,
856873
// then maps are recalculated.
857-
lastMapVersion semver.Version
874+
lastMapVersion semver.Version
875+
lastMapVersionMtx sync.RWMutex
858876
// Currently active metric map
859-
metricMap map[string]MetricMapNamespace
877+
metricMap map[string]MetricMapNamespace
878+
metricMapMtx sync.RWMutex
860879
// Currently active query overrides
861-
queryOverrides map[string]string
862-
mappingMtx sync.RWMutex
880+
queryOverrides map[string]string
881+
queryOverridesMtx sync.RWMutex
863882
// Currently cached metrics
864-
metricCache map[string]cachedMetrics
865-
cacheMtx sync.Mutex
883+
metricCache map[string]cachedMetrics
884+
metricCacheMtx sync.Mutex
866885
}
867886

868887
// ServerOpt configures a server.
@@ -932,12 +951,12 @@ func (s *Server) String() string {
932951

933952
// Scrape loads metrics.
934953
func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool) error {
935-
s.mappingMtx.RLock()
936-
defer s.mappingMtx.RUnlock()
937-
938954
var err error
939955

940-
if !disableSettingsMetrics && s.master {
956+
s.masterMtx.RLock()
957+
master := s.master
958+
s.masterMtx.RUnlock()
959+
if !disableSettingsMetrics && master {
941960
if err = querySettings(ch, s); err != nil {
942961
err = fmt.Errorf("error retrieving settings: %s", err)
943962
}
@@ -968,8 +987,6 @@ func NewServers(opts ...ServerOpt) *Servers {
968987

969988
// GetServer returns established connection from a collection.
970989
func (s *Servers) GetServer(dsn string) (*Server, error) {
971-
s.m.Lock()
972-
defer s.m.Unlock()
973990
var err error
974991
var ok bool
975992
errCount := 0 // start at zero because we increment before doing work
@@ -979,17 +996,24 @@ func (s *Servers) GetServer(dsn string) (*Server, error) {
979996
if errCount++; errCount > retries {
980997
return nil, err
981998
}
999+
s.m.Lock()
9821000
server, ok = s.servers[dsn]
1001+
s.m.Unlock()
9831002
if !ok {
9841003
server, err = NewServer(dsn, s.opts...)
9851004
if err != nil {
9861005
time.Sleep(time.Duration(errCount) * time.Second)
9871006
continue
9881007
}
1008+
s.m.Lock()
9891009
s.servers[dsn] = server
1010+
s.m.Unlock()
9901011
}
1012+
9911013
if err = server.Ping(); err != nil {
1014+
s.m.Lock()
9921015
delete(s.servers, dsn)
1016+
s.m.Unlock()
9931017
time.Sleep(time.Duration(errCount) * time.Second)
9941018
continue
9951019
}
@@ -1246,7 +1270,9 @@ func queryDatabases(server *Server) ([]string, error) {
12461270
// the scrape fails, and a slice of errors if they were non-fatal.
12471271
func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) {
12481272
// Check for a query override for this namespace
1273+
server.queryOverridesMtx.RLock()
12491274
query, found := server.queryOverrides[namespace]
1275+
server.queryOverridesMtx.RUnlock()
12501276

12511277
// Was this query disabled (i.e. nothing sensible can be queried on the current
12521278
// version of PostgreSQL?
@@ -1266,6 +1292,7 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa
12661292
} else {
12671293
rows, err = server.db.Query(query) // nolint: safesql
12681294
}
1295+
12691296
if err != nil {
12701297
return []prometheus.Metric{}, []error{}, fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err)
12711298
}
@@ -1326,7 +1353,13 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa
13261353
} else {
13271354
// Unknown metric. Report as untyped if scan to float64 works, else note an error too.
13281355
metricLabel := fmt.Sprintf("%s_%s", namespace, columnName)
1329-
desc := prometheus.NewDesc(metricLabel, fmt.Sprintf("Unknown metric from %s", namespace), mapping.labels, server.labels)
1356+
serverLabels := make(prometheus.Labels)
1357+
server.labelsMtx.RLock()
1358+
for key, value := range server.labels {
1359+
serverLabels[key] = value
1360+
}
1361+
server.labelsMtx.RUnlock()
1362+
desc := prometheus.NewDesc(metricLabel, fmt.Sprintf("Unknown metric from %s", namespace), mapping.labels, serverLabels)
13301363

13311364
// It's not an error to fail here, since the values are
13321365
// unexpected anyway.
@@ -1351,19 +1384,24 @@ func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[str
13511384

13521385
scrapeStart := time.Now()
13531386

1354-
for namespace, mapping := range server.metricMap {
1387+
server.metricMapMtx.RLock()
1388+
metricMap := server.metricMap
1389+
server.metricMapMtx.RUnlock()
1390+
for namespace, mapping := range metricMap {
13551391
log.Debugln("Querying namespace: ", namespace)
13561392

1357-
if mapping.master && !server.master {
1393+
server.masterMtx.RLock()
1394+
master := server.master
1395+
server.masterMtx.RUnlock()
1396+
if mapping.master && !master {
13581397
log.Debugln("Query skipped...")
13591398
continue
13601399
}
1361-
13621400
scrapeMetric := false
13631401
// Check if the metric is cached
1364-
server.cacheMtx.Lock()
1402+
server.metricCacheMtx.Lock()
13651403
cachedMetric, found := server.metricCache[namespace]
1366-
server.cacheMtx.Unlock()
1404+
server.metricCacheMtx.Unlock()
13671405
// If found, check if needs refresh from cache
13681406
if found {
13691407
if scrapeStart.Sub(cachedMetric.lastScrape).Seconds() > float64(mapping.cacheSeconds) {
@@ -1402,12 +1440,12 @@ func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[str
14021440
if scrapeMetric {
14031441
// Only cache if metric is meaningfully cacheable
14041442
if mapping.cacheSeconds > 0 {
1405-
server.cacheMtx.Lock()
1443+
server.metricCacheMtx.Lock()
14061444
server.metricCache[namespace] = cachedMetrics{
14071445
metrics: metrics,
14081446
lastScrape: scrapeStart,
14091447
}
1410-
server.cacheMtx.Unlock()
1448+
server.metricCacheMtx.Unlock()
14111449
}
14121450
}
14131451
}
@@ -1432,21 +1470,52 @@ func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server)
14321470
log.Warnf("PostgreSQL version is lower on %q then our lowest supported version! Got %s minimum supported is %s.", server, semanticVersion, lowestSupportedVersion)
14331471
}
14341472

1473+
metricMap := make(map[string]MetricMapNamespace)
1474+
server.metricMapMtx.RLock()
1475+
for key, value := range server.metricMap {
1476+
metricMap[key] = value
1477+
}
1478+
server.metricMapMtx.RUnlock()
1479+
1480+
server.masterMtx.RLock()
1481+
master := server.master
1482+
server.masterMtx.RUnlock()
1483+
1484+
labels := make(prometheus.Labels)
1485+
server.labelsMtx.RLock()
1486+
for key, value := range server.labels {
1487+
labels[key] = value
1488+
}
1489+
server.labelsMtx.RUnlock()
1490+
14351491
// Check if semantic version changed and recalculate maps if needed.
1436-
if semanticVersion.NE(server.lastMapVersion) || server.metricMap == nil {
1492+
server.lastMapVersionMtx.RLock()
1493+
isVersionNotEqual := semanticVersion.NE(server.lastMapVersion)
1494+
server.lastMapVersionMtx.RUnlock()
1495+
1496+
if isVersionNotEqual || metricMap == nil {
14371497
log.Infof("Semantic Version Changed on %q: %s -> %s", server, server.lastMapVersion, semanticVersion)
1438-
server.mappingMtx.Lock()
14391498

14401499
// Get Default Metrics only for master database
1441-
if !e.disableDefaultMetrics && server.master {
1442-
server.metricMap = makeDescMap(semanticVersion, server.labels, e.builtinMetricMaps)
1500+
if !e.disableDefaultMetrics && master {
1501+
server.metricMapMtx.Lock()
1502+
server.metricMap = makeDescMap(semanticVersion, labels, e.builtinMetricMaps)
1503+
server.metricMapMtx.Unlock()
1504+
server.queryOverridesMtx.Lock()
14431505
server.queryOverrides = makeQueryOverrideMap(semanticVersion, queryOverrides)
1506+
server.queryOverridesMtx.Unlock()
14441507
} else {
1508+
server.metricMapMtx.Lock()
14451509
server.metricMap = make(map[string]MetricMapNamespace)
1510+
server.metricMapMtx.Unlock()
1511+
server.queryOverridesMtx.Lock()
14461512
server.queryOverrides = make(map[string]string)
1513+
server.queryOverridesMtx.Unlock()
14471514
}
14481515

1516+
server.lastMapVersionMtx.Lock()
14491517
server.lastMapVersion = semanticVersion
1518+
server.lastMapVersionMtx.Unlock()
14501519

14511520
if e.userQueriesPath[HR] != "" || e.userQueriesPath[MR] != "" || e.userQueriesPath[LR] != "" {
14521521
// Clear the metric while a reload is happening
@@ -1458,18 +1527,17 @@ func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server)
14581527
e.loadCustomQueries(res, semanticVersion, server)
14591528
}
14601529
}
1461-
1462-
server.mappingMtx.Unlock()
14631530
}
14641531

14651532
// Output the version as a special metric only for master database
14661533
versionDesc := prometheus.NewDesc(fmt.Sprintf("%s_%s", namespace, staticLabelName),
1467-
"Version string as reported by postgres", []string{"version", "short_version"}, server.labels)
1534+
"Version string as reported by postgres", []string{"version", "short_version"}, labels)
14681535

1469-
if !e.disableDefaultMetrics && server.master {
1536+
if !e.disableDefaultMetrics && master {
14701537
ch <- prometheus.MustNewConstMetric(versionDesc,
14711538
prometheus.UntypedValue, 1, versionString, semanticVersion.String())
14721539
}
1540+
14731541
return nil
14741542
}
14751543

@@ -1574,7 +1642,9 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
15741642
}
15751643

15761644
// If autoDiscoverDatabases is true, set first dsn as master database (Default: false)
1645+
server.masterMtx.Lock()
15771646
server.master = true
1647+
server.masterMtx.Unlock()
15781648

15791649
databaseNames, err := queryDatabases(server)
15801650
if err != nil {
@@ -1609,7 +1679,9 @@ func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
16091679

16101680
// Check if autoDiscoverDatabases is false, set dsn as master database (Default: false)
16111681
if !e.autoDiscoverDatabases {
1682+
server.masterMtx.Lock()
16121683
server.master = true
1684+
server.masterMtx.Unlock()
16131685
}
16141686

16151687
// Check if map versions need to be updated

0 commit comments

Comments
 (0)