Skip to content

Commit d521609

Browse files
author
Jiri Ctvrtka
committed
PMM-8787 Fix.
1 parent ad4379d commit d521609

File tree

2 files changed

+114
-34
lines changed

2 files changed

+114
-34
lines changed

cmd/postgres_exporter/pg_setting.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,14 @@ func querySettings(ch chan<- prometheus.Metric, server *Server) error {
3333
return fmt.Errorf("error retrieving rows on %q: %s %v", server, namespace, err)
3434
}
3535

36-
ch <- s.metric(server.labels)
36+
labels := make(prometheus.Labels)
37+
server.labelsMtx.RLock()
38+
for key, value := range server.labels {
39+
labels[key] = value
40+
}
41+
server.labelsMtx.RUnlock()
42+
43+
ch <- s.metric(labels)
3744
}
3845

3946
return nil

cmd/postgres_exporter/postgres_exporter.go

Lines changed: 106 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -558,31 +558,46 @@ func parseUserQueries(content []byte) (map[string]intermediateMetricMap, map[str
558558
func addQueries(content []byte, pgVersion semver.Version, server *Server) error {
559559
metricMaps, newQueryOverrides, err := parseUserQueries(content)
560560
if err != nil {
561-
return nil
561+
return err
562562
}
563563
// Convert the loaded metric map into exporter representation
564-
partialExporterMap := makeDescMap(pgVersion, server.labels, metricMaps)
564+
labels := make(prometheus.Labels)
565+
server.labelsMtx.RLock()
566+
for key, value := range server.labels {
567+
labels[key] = value
568+
}
569+
server.labelsMtx.RUnlock()
570+
571+
partialExporterMap := makeDescMap(pgVersion, labels, metricMaps)
565572

566573
// Merge the two maps (which are now quite flattened)
567574
for k, v := range partialExporterMap {
575+
server.metricMapMtx.RLock()
568576
_, found := server.metricMap[k]
577+
server.metricMapMtx.RUnlock()
569578
if found {
570579
log.Debugln("Overriding metric", k, "from user YAML file.")
571580
} else {
572581
log.Debugln("Adding new metric", k, "from user YAML file.")
573582
}
583+
server.metricMapMtx.Lock()
574584
server.metricMap[k] = v
585+
server.metricMapMtx.Unlock()
575586
}
576587

577588
// Merge the query override map
578589
for k, v := range newQueryOverrides {
590+
server.queryOverridesMtx.RLock()
579591
_, found := server.queryOverrides[k]
592+
server.queryOverridesMtx.RUnlock()
580593
if found {
581594
log.Debugln("Overriding query override", k, "from user YAML file.")
582595
} else {
583596
log.Debugln("Adding new query override", k, "from user YAML file.")
584597
}
598+
server.queryOverridesMtx.Lock()
585599
server.queryOverrides[k] = v
600+
server.queryOverridesMtx.Unlock()
586601
}
587602
return nil
588603
}
@@ -848,21 +863,26 @@ type cachedMetrics struct {
848863
// Server describes a connection to Postgres.
849864
// Also it contains metrics map and query overrides.
850865
type Server struct {
851-
db *sql.DB
852-
labels prometheus.Labels
853-
master bool
866+
m sync.RWMutex
867+
db *sql.DB
868+
labels prometheus.Labels
869+
labelsMtx sync.RWMutex
870+
master bool
871+
masterMtx sync.RWMutex
854872

855873
// Last version used to calculate metric map. If mismatch on scrape,
856874
// then maps are recalculated.
857-
lastMapVersion semver.Version
875+
lastMapVersion semver.Version
876+
lastMapVersionMtx sync.RWMutex
858877
// Currently active metric map
859-
metricMap map[string]MetricMapNamespace
878+
metricMap map[string]MetricMapNamespace
879+
metricMapMtx sync.RWMutex
860880
// Currently active query overrides
861-
queryOverrides map[string]string
862-
mappingMtx sync.RWMutex
881+
queryOverrides map[string]string
882+
queryOverridesMtx sync.RWMutex
863883
// Currently cached metrics
864-
metricCache map[string]cachedMetrics
865-
cacheMtx sync.Mutex
884+
metricCache map[string]cachedMetrics
885+
metricCacheMtx sync.Mutex
866886
}
867887

868888
// ServerOpt configures a server.
@@ -932,12 +952,12 @@ func (s *Server) String() string {
932952

933953
// Scrape loads metrics.
934954
func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool) error {
935-
s.mappingMtx.RLock()
936-
defer s.mappingMtx.RUnlock()
937-
938955
var err error
939956

940-
if !disableSettingsMetrics && s.master {
957+
s.masterMtx.RLock()
958+
master := s.master
959+
s.masterMtx.RUnlock()
960+
if !disableSettingsMetrics && master {
941961
if err = querySettings(ch, s); err != nil {
942962
err = fmt.Errorf("error retrieving settings: %s", err)
943963
}
@@ -968,8 +988,6 @@ func NewServers(opts ...ServerOpt) *Servers {
968988

969989
// GetServer returns established connection from a collection.
970990
func (s *Servers) GetServer(dsn string) (*Server, error) {
971-
s.m.Lock()
972-
defer s.m.Unlock()
973991
var err error
974992
var ok bool
975993
errCount := 0 // start at zero because we increment before doing work
@@ -979,17 +997,24 @@ func (s *Servers) GetServer(dsn string) (*Server, error) {
979997
if errCount++; errCount > retries {
980998
return nil, err
981999
}
1000+
s.m.Lock()
9821001
server, ok = s.servers[dsn]
1002+
s.m.Unlock()
9831003
if !ok {
9841004
server, err = NewServer(dsn, s.opts...)
9851005
if err != nil {
9861006
time.Sleep(time.Duration(errCount) * time.Second)
9871007
continue
9881008
}
1009+
s.m.Lock()
9891010
s.servers[dsn] = server
1011+
s.m.Unlock()
9901012
}
1013+
9911014
if err = server.Ping(); err != nil {
1015+
s.m.Lock()
9921016
delete(s.servers, dsn)
1017+
s.m.Unlock()
9931018
time.Sleep(time.Duration(errCount) * time.Second)
9941019
continue
9951020
}
@@ -1246,7 +1271,9 @@ func queryDatabases(server *Server) ([]string, error) {
12461271
// the scrape fails, and a slice of errors if they were non-fatal.
12471272
func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNamespace) ([]prometheus.Metric, []error, error) {
12481273
// Check for a query override for this namespace
1274+
server.queryOverridesMtx.RLock()
12491275
query, found := server.queryOverrides[namespace]
1276+
server.queryOverridesMtx.RUnlock()
12501277

12511278
// Was this query disabled (i.e. nothing sensible can be queried on the current
12521279
// version of PostgreSQL?
@@ -1266,6 +1293,7 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa
12661293
} else {
12671294
rows, err = server.db.Query(query) // nolint: safesql
12681295
}
1296+
12691297
if err != nil {
12701298
return []prometheus.Metric{}, []error{}, fmt.Errorf("Error running query on database %q: %s %v", server, namespace, err)
12711299
}
@@ -1326,7 +1354,13 @@ func queryNamespaceMapping(server *Server, namespace string, mapping MetricMapNa
13261354
} else {
13271355
// Unknown metric. Report as untyped if scan to float64 works, else note an error too.
13281356
metricLabel := fmt.Sprintf("%s_%s", namespace, columnName)
1329-
desc := prometheus.NewDesc(metricLabel, fmt.Sprintf("Unknown metric from %s", namespace), mapping.labels, server.labels)
1357+
serverLabels := make(prometheus.Labels)
1358+
server.labelsMtx.RLock()
1359+
for key, value := range server.labels {
1360+
serverLabels[key] = value
1361+
}
1362+
server.labelsMtx.RUnlock()
1363+
desc := prometheus.NewDesc(metricLabel, fmt.Sprintf("Unknown metric from %s", namespace), mapping.labels, serverLabels)
13301364

13311365
// It's not an error to fail here, since the values are
13321366
// unexpected anyway.
@@ -1351,19 +1385,24 @@ func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[str
13511385

13521386
scrapeStart := time.Now()
13531387

1354-
for namespace, mapping := range server.metricMap {
1388+
server.metricMapMtx.RLock()
1389+
metricMap := server.metricMap
1390+
server.metricMapMtx.RUnlock()
1391+
for namespace, mapping := range metricMap {
13551392
log.Debugln("Querying namespace: ", namespace)
13561393

1357-
if mapping.master && !server.master {
1394+
server.masterMtx.RLock()
1395+
master := server.master
1396+
server.masterMtx.RUnlock()
1397+
if mapping.master && !master {
13581398
log.Debugln("Query skipped...")
13591399
continue
13601400
}
1361-
13621401
scrapeMetric := false
13631402
// Check if the metric is cached
1364-
server.cacheMtx.Lock()
1403+
server.metricCacheMtx.Lock()
13651404
cachedMetric, found := server.metricCache[namespace]
1366-
server.cacheMtx.Unlock()
1405+
server.metricCacheMtx.Unlock()
13671406
// If found, check if needs refresh from cache
13681407
if found {
13691408
if scrapeStart.Sub(cachedMetric.lastScrape).Seconds() > float64(mapping.cacheSeconds) {
@@ -1402,12 +1441,12 @@ func queryNamespaceMappings(ch chan<- prometheus.Metric, server *Server) map[str
14021441
if scrapeMetric {
14031442
// Only cache if metric is meaningfully cacheable
14041443
if mapping.cacheSeconds > 0 {
1405-
server.cacheMtx.Lock()
1444+
server.metricCacheMtx.Lock()
14061445
server.metricCache[namespace] = cachedMetrics{
14071446
metrics: metrics,
14081447
lastScrape: scrapeStart,
14091448
}
1410-
server.cacheMtx.Unlock()
1449+
server.metricCacheMtx.Unlock()
14111450
}
14121451
}
14131452
}
@@ -1432,21 +1471,52 @@ func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server)
14321471
log.Warnf("PostgreSQL version is lower on %q then our lowest supported version! Got %s minimum supported is %s.", server, semanticVersion, lowestSupportedVersion)
14331472
}
14341473

1474+
server.lastMapVersionMtx.RLock()
1475+
lastMapVersion := server.lastMapVersion
1476+
server.lastMapVersionMtx.RUnlock()
1477+
1478+
metricMap := make(map[string]MetricMapNamespace)
1479+
server.metricMapMtx.RLock()
1480+
for key, value := range server.metricMap {
1481+
metricMap[key] = value
1482+
}
1483+
server.metricMapMtx.RUnlock()
1484+
1485+
server.masterMtx.RLock()
1486+
master := server.master
1487+
server.masterMtx.RUnlock()
1488+
1489+
labels := make(prometheus.Labels)
1490+
server.labelsMtx.RLock()
1491+
for key, value := range server.labels {
1492+
labels[key] = value
1493+
}
1494+
server.labelsMtx.RUnlock()
1495+
14351496
// Check if semantic version changed and recalculate maps if needed.
1436-
if semanticVersion.NE(server.lastMapVersion) || server.metricMap == nil {
1497+
if semanticVersion.NE(lastMapVersion) || metricMap == nil {
14371498
log.Infof("Semantic Version Changed on %q: %s -> %s", server, lastMapVersion, semanticVersion)
1438-
server.mappingMtx.Lock()
14391499

14401500
// Get Default Metrics only for master database
1441-
if !e.disableDefaultMetrics && server.master {
1442-
server.metricMap = makeDescMap(semanticVersion, server.labels, e.builtinMetricMaps)
1501+
if !e.disableDefaultMetrics && master {
1502+
server.metricMapMtx.Lock()
1503+
server.metricMap = makeDescMap(semanticVersion, labels, e.builtinMetricMaps)
1504+
server.metricMapMtx.Unlock()
1505+
server.queryOverridesMtx.Lock()
14431506
server.queryOverrides = makeQueryOverrideMap(semanticVersion, queryOverrides)
1507+
server.queryOverridesMtx.Unlock()
14441508
} else {
1509+
server.metricMapMtx.Lock()
14451510
server.metricMap = make(map[string]MetricMapNamespace)
1511+
server.metricMapMtx.Unlock()
1512+
server.queryOverridesMtx.Lock()
14461513
server.queryOverrides = make(map[string]string)
1514+
server.queryOverridesMtx.Unlock()
14471515
}
14481516

1517+
server.lastMapVersionMtx.Lock()
14491518
server.lastMapVersion = semanticVersion
1519+
server.lastMapVersionMtx.Unlock()
14501520

14511521
if e.userQueriesPath[HR] != "" || e.userQueriesPath[MR] != "" || e.userQueriesPath[LR] != "" {
14521522
// Clear the metric while a reload is happening
@@ -1458,18 +1528,17 @@ func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server)
14581528
e.loadCustomQueries(res, semanticVersion, server)
14591529
}
14601530
}
1461-
1462-
server.mappingMtx.Unlock()
14631531
}
14641532

14651533
// Output the version as a special metric only for master database
14661534
versionDesc := prometheus.NewDesc(fmt.Sprintf("%s_%s", namespace, staticLabelName),
1467-
"Version string as reported by postgres", []string{"version", "short_version"}, server.labels)
1535+
"Version string as reported by postgres", []string{"version", "short_version"}, labels)
14681536

1469-
if !e.disableDefaultMetrics && server.master {
1537+
if !e.disableDefaultMetrics && master {
14701538
ch <- prometheus.MustNewConstMetric(versionDesc,
14711539
prometheus.UntypedValue, 1, versionString, semanticVersion.String())
14721540
}
1541+
14731542
return nil
14741543
}
14751544

@@ -1574,7 +1643,9 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
15741643
}
15751644

15761645
// If autoDiscoverDatabases is true, set first dsn as master database (Default: false)
1646+
server.masterMtx.Lock()
15771647
server.master = true
1648+
server.masterMtx.Unlock()
15781649

15791650
databaseNames, err := queryDatabases(server)
15801651
if err != nil {
@@ -1609,7 +1680,9 @@ func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
16091680

16101681
// Check if autoDiscoverDatabases is false, set dsn as master database (Default: false)
16111682
if !e.autoDiscoverDatabases {
1683+
server.masterMtx.Lock()
16121684
server.master = true
1685+
server.masterMtx.Unlock()
16131686
}
16141687

16151688
// Check if map versions need to be updated

0 commit comments

Comments
 (0)