Skip to content

Commit

Permalink
Merge branch 'main' into PMM-12897-move-db-specific-metrics-to-low-re…
Browse files Browse the repository at this point in the history
…solution
  • Loading branch information
artemgavrilov authored Mar 7, 2024
2 parents 3ee619d + 1b1eb48 commit 6635172
Show file tree
Hide file tree
Showing 11 changed files with 396 additions and 292 deletions.
43 changes: 31 additions & 12 deletions cmd/postgres_exporter/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,16 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
level.Error(logger).Log("msg", "Unable to parse DSN as either URI or connstring", "dsn", loggableDSN(dsn))
continue
}

server, err := e.servers.GetServer(dsn, e.resolutionEnabled)
if err != nil {
level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err)
continue
}
dsns[dsn] = struct{}{}

// If autoDiscoverDatabases is true, set first dsn as master database (Default: false)
server.master = true
e.masterDSN = dsn

databaseNames, err := queryDatabases(server)
databaseNames, err := e.getDatabaseNames(dsn)
if err != nil {
level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err)
continue
}

for _, databaseName := range databaseNames {
if contains(e.excludeDatabases, databaseName) {
continue
Expand Down Expand Up @@ -99,15 +93,40 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
return result
}

func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
server, err := e.servers.GetServer(dsn, e.resolutionEnabled)
func (e *Exporter) getDatabaseNames(dsn string) ([]string, error) {
if e.connSema != nil {
if err := e.connSema.Acquire(e.ctx, 1); err != nil {
level.Warn(logger).Log("msg", "Failed to acquire semaphore", "err", err)
return nil, err
}
defer e.connSema.Release(1)
}

server, err := e.GetServer(dsn)
if err != nil {
level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err)
return nil, err
}
defer server.Close()

dbNames, err := queryDatabases(e.ctx, server)
if err != nil {
level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err)
return nil, err
}

return dbNames, nil
}

func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
server, err := e.GetServer(dsn)
if err != nil {
return &ErrorConnectToServer{fmt.Sprintf("Error opening connection to database (%s): %s", loggableDSN(dsn), err.Error())}
}
defer server.Close()

// Check if autoDiscoverDatabases is false, set dsn as master database (Default: false)
if !e.autoDiscoverDatabases {
if !e.autoDiscoverDatabases || e.masterDSN == dsn {
server.master = true
}

Expand Down
146 changes: 10 additions & 136 deletions cmd/postgres_exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,22 @@ package main
import (
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"strings"

_ "net/http/pprof"

"github.com/alecthomas/kingpin/v2"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus-community/postgres_exporter/collector"
"github.com/prometheus-community/postgres_exporter/config"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/promlog"
"github.com/prometheus/common/promlog/flag"
"github.com/prometheus/common/version"
"github.com/prometheus/exporter-toolkit/web"
"github.com/prometheus/exporter-toolkit/web/kingpinflag"
"golang.org/x/sync/semaphore"
)

var (
Expand All @@ -51,7 +49,7 @@ var (
disableDefaultMetrics = kingpin.Flag("disable-default-metrics", "Do not include default metrics.").Default("false").Envar("PG_EXPORTER_DISABLE_DEFAULT_METRICS").Bool()
disableSettingsMetrics = kingpin.Flag("disable-settings-metrics", "Do not include pg_settings metrics.").Default("false").Envar("PG_EXPORTER_DISABLE_SETTINGS_METRICS").Bool()
autoDiscoverDatabases = kingpin.Flag("auto-discover-databases", "Whether to discover the databases on a server dynamically. (DEPRECATED)").Default("false").Envar("PG_EXPORTER_AUTO_DISCOVER_DATABASES").Bool()
//queriesPath = kingpin.Flag("extend.query-path", "Path to custom queries to run. (DEPRECATED)").Default("").Envar("PG_EXPORTER_EXTEND_QUERY_PATH").String()
// queriesPath = kingpin.Flag("extend.query-path", "Path to custom queries to run. (DEPRECATED)").Default("").Envar("PG_EXPORTER_EXTEND_QUERY_PATH").String()
onlyDumpMaps = kingpin.Flag("dumpmaps", "Do not run, simply dump the maps.").Bool()
constantLabelsList = kingpin.Flag("constantLabels", "A list of label=value separated by comma(,). (DEPRECATED)").Default("").Envar("PG_EXPORTER_CONSTANT_LABELS").String()
excludeDatabases = kingpin.Flag("exclude-databases", "A list of databases to remove when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_EXCLUDE_DATABASES").String()
Expand Down Expand Up @@ -103,9 +101,9 @@ func main() {
excludedDatabases := strings.Split(*excludeDatabases, ",")
logger.Log("msg", "Excluded databases", "databases", fmt.Sprintf("%v", excludedDatabases))

//if *queriesPath != "" {
// if *queriesPath != "" {
// level.Warn(logger).Log("msg", "The extended queries.yaml config is DEPRECATED", "file", *queriesPath)
//}
// }

if *autoDiscoverDatabases || *excludeDatabases != "" || *includeDatabases != "" {
level.Warn(logger).Log("msg", "Scraping additional databases via auto discovery is DEPRECATED")
Expand All @@ -115,65 +113,18 @@ func main() {
level.Warn(logger).Log("msg", "Constant labels on all metrics is DEPRECATED")
}

servers := NewServers(ServerWithLabels(parseConstLabels(*constantLabelsList)))

opts := []ExporterOpt{
CollectorName("exporter"),
DisableDefaultMetrics(*disableDefaultMetrics),
DisableSettingsMetrics(*disableSettingsMetrics),
AutoDiscoverDatabases(*autoDiscoverDatabases),
WithConstantLabels(*constantLabelsList),
WithServers(servers),
ExcludeDatabases(excludedDatabases),
IncludeDatabases(*includeDatabases),
}

exporter := NewExporter(dsns, opts...)
defer func() {
exporter.servers.Close()
}()

versionCollector := version.NewCollector(exporterName)
prometheus.MustRegister(versionCollector)

prometheus.MustRegister(exporter)

// TODO(@sysadmind): Remove this with multi-target support. We are removing multiple DSN support
dsn := ""
if len(dsns) > 0 {
dsn = dsns[0]
}

cleanup, hr, mr, lr := initializePerconaExporters(dsns, servers)
defer cleanup()

pe, err := collector.NewPostgresCollector(
logger,
excludedDatabases,
dsn,
[]string{},
)
if err != nil {
level.Warn(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error())
} else {
prometheus.MustRegister(pe)
}

psCollector := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})
goCollector := collectors.NewGoCollector()

promHandler := newHandler(map[string]prometheus.Collector{
"exporter": exporter,
"custom_query.hr": hr,
"custom_query.mr": mr,
"custom_query.lr": lr,
globalCollectors := map[string]prometheus.Collector{
"standard.process": psCollector,
"standard.go": goCollector,
"version": versionCollector,
"postgres": pe,
})
}

http.Handle(*metricsPath, promHandler)
connSema := semaphore.NewWeighted(*maxConnections)
http.Handle(*metricsPath, Handler(logger, dsns, connSema, globalCollectors))

if *metricsPath != "/" && *metricsPath != "" {
landingConfig := web.LandingConfig{
Expand All @@ -195,7 +146,7 @@ func main() {
http.Handle("/", landingPage)
}

http.HandleFunc("/probe", handleProbe(logger, excludedDatabases))
http.HandleFunc("/probe", handleProbe(logger, excludedDatabases, connSema))

level.Info(logger).Log("msg", "Listening on address", "address", *webConfig.WebListenAddresses)
srv := &http.Server{}
Expand All @@ -204,80 +155,3 @@ func main() {
os.Exit(1)
}
}

// handler wraps an unfiltered http.Handler but uses a filtered handler,
// created on the fly, if filtering is requested. Create instances with
// newHandler. It used for collectors filtering.
type handler struct {
unfilteredHandler http.Handler
collectors map[string]prometheus.Collector
}

func newHandler(collectors map[string]prometheus.Collector) *handler {
h := &handler{collectors: collectors}

innerHandler, err := h.innerHandler()
if err != nil {
level.Error(logger).Log("msg", "Couldn't create metrics handler", "error", err)
os.Exit(1)
}

h.unfilteredHandler = innerHandler
return h
}

// ServeHTTP implements http.Handler.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
filters := r.URL.Query()["collect[]"]
level.Debug(logger).Log("msg", "Collect query", "filters", filters)

if len(filters) == 0 {
// No filters, use the prepared unfiltered handler.
h.unfilteredHandler.ServeHTTP(w, r)
return
}

filteredHandler, err := h.innerHandler(filters...)
if err != nil {
level.Warn(logger).Log("msg", "Couldn't create filtered metrics handler", "error", err)
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(fmt.Sprintf("Couldn't create filtered metrics handler: %s", err))) // nolint: errcheck
return
}

filteredHandler.ServeHTTP(w, r)
}

func (h *handler) innerHandler(filters ...string) (http.Handler, error) {
registry := prometheus.NewRegistry()

// register all collectors by default.
if len(filters) == 0 {
for name, c := range h.collectors {
if err := registry.Register(c); err != nil {
return nil, err
}
level.Debug(logger).Log("msg", "Collector was registered", "collector", name)
}
}

// register only filtered collectors.
for _, name := range filters {
if c, ok := h.collectors[name]; ok {
if err := registry.Register(c); err != nil {
return nil, err
}
level.Debug(logger).Log("msg", "Collector was registered", "collector", name)
}
}

handler := promhttp.HandlerFor(
registry,
promhttp.HandlerOpts{
//ErrorLog: log.NewNopLogger() .NewErrorLogger(),
ErrorHandling: promhttp.ContinueOnError,
},
)

return handler, nil
}
Loading

0 comments on commit 6635172

Please sign in to comment.