diff --git a/cmd/postgres_exporter/datasource.go b/cmd/postgres_exporter/datasource.go index 35abae8b1..cfc97aae1 100644 --- a/cmd/postgres_exporter/datasource.go +++ b/cmd/postgres_exporter/datasource.go @@ -48,22 +48,16 @@ func (e *Exporter) discoverDatabaseDSNs() []string { level.Error(logger).Log("msg", "Unable to parse DSN as either URI or connstring", "dsn", loggableDSN(dsn)) continue } - - server, err := e.servers.GetServer(dsn, e.resolutionEnabled) - if err != nil { - level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err) - continue - } dsns[dsn] = struct{}{} // If autoDiscoverDatabases is true, set first dsn as master database (Default: false) - server.master = true + e.masterDSN = dsn - databaseNames, err := queryDatabases(server) + databaseNames, err := e.getDatabaseNames(dsn) if err != nil { - level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err) continue } + for _, databaseName := range databaseNames { if contains(e.excludeDatabases, databaseName) { continue @@ -99,15 +93,40 @@ func (e *Exporter) discoverDatabaseDSNs() []string { return result } -func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error { - server, err := e.servers.GetServer(dsn, e.resolutionEnabled) +func (e *Exporter) getDatabaseNames(dsn string) ([]string, error) { + if e.connSema != nil { + if err := e.connSema.Acquire(e.ctx, 1); err != nil { + level.Warn(logger).Log("msg", "Failed to acquire semaphore", "err", err) + return nil, err + } + defer e.connSema.Release(1) + } + server, err := e.GetServer(dsn) + if err != nil { + level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err) + return nil, err + } + defer server.Close() + + dbNames, err := queryDatabases(e.ctx, server) + if err != nil { + level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err) + return nil, err + } + + return 
dbNames, nil +} + +func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error { + server, err := e.GetServer(dsn) if err != nil { return &ErrorConnectToServer{fmt.Sprintf("Error opening connection to database (%s): %s", loggableDSN(dsn), err.Error())} } + defer server.Close() // Check if autoDiscoverDatabases is false, set dsn as master database (Default: false) - if !e.autoDiscoverDatabases { + if !e.autoDiscoverDatabases || e.masterDSN == dsn { server.master = true } diff --git a/cmd/postgres_exporter/main.go b/cmd/postgres_exporter/main.go index 2577a0f30..431601418 100644 --- a/cmd/postgres_exporter/main.go +++ b/cmd/postgres_exporter/main.go @@ -16,24 +16,22 @@ package main import ( "fmt" "net/http" + _ "net/http/pprof" "os" "strings" - _ "net/http/pprof" - "github.com/alecthomas/kingpin/v2" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/prometheus-community/postgres_exporter/collector" "github.com/prometheus-community/postgres_exporter/config" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/promlog" "github.com/prometheus/common/promlog/flag" "github.com/prometheus/common/version" "github.com/prometheus/exporter-toolkit/web" "github.com/prometheus/exporter-toolkit/web/kingpinflag" + "golang.org/x/sync/semaphore" ) var ( @@ -51,7 +49,7 @@ var ( disableDefaultMetrics = kingpin.Flag("disable-default-metrics", "Do not include default metrics.").Default("false").Envar("PG_EXPORTER_DISABLE_DEFAULT_METRICS").Bool() disableSettingsMetrics = kingpin.Flag("disable-settings-metrics", "Do not include pg_settings metrics.").Default("false").Envar("PG_EXPORTER_DISABLE_SETTINGS_METRICS").Bool() autoDiscoverDatabases = kingpin.Flag("auto-discover-databases", "Whether to discover the databases on a server dynamically. 
(DEPRECATED)").Default("false").Envar("PG_EXPORTER_AUTO_DISCOVER_DATABASES").Bool() - //queriesPath = kingpin.Flag("extend.query-path", "Path to custom queries to run. (DEPRECATED)").Default("").Envar("PG_EXPORTER_EXTEND_QUERY_PATH").String() + // queriesPath = kingpin.Flag("extend.query-path", "Path to custom queries to run. (DEPRECATED)").Default("").Envar("PG_EXPORTER_EXTEND_QUERY_PATH").String() onlyDumpMaps = kingpin.Flag("dumpmaps", "Do not run, simply dump the maps.").Bool() constantLabelsList = kingpin.Flag("constantLabels", "A list of label=value separated by comma(,). (DEPRECATED)").Default("").Envar("PG_EXPORTER_CONSTANT_LABELS").String() excludeDatabases = kingpin.Flag("exclude-databases", "A list of databases to remove when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_EXCLUDE_DATABASES").String() @@ -103,9 +101,9 @@ func main() { excludedDatabases := strings.Split(*excludeDatabases, ",") logger.Log("msg", "Excluded databases", "databases", fmt.Sprintf("%v", excludedDatabases)) - //if *queriesPath != "" { + // if *queriesPath != "" { // level.Warn(logger).Log("msg", "The extended queries.yaml config is DEPRECATED", "file", *queriesPath) - //} + // } if *autoDiscoverDatabases || *excludeDatabases != "" || *includeDatabases != "" { level.Warn(logger).Log("msg", "Scraping additional databases via auto discovery is DEPRECATED") @@ -115,65 +113,18 @@ func main() { level.Warn(logger).Log("msg", "Constant labels on all metrics is DEPRECATED") } - servers := NewServers(ServerWithLabels(parseConstLabels(*constantLabelsList))) - - opts := []ExporterOpt{ - CollectorName("exporter"), - DisableDefaultMetrics(*disableDefaultMetrics), - DisableSettingsMetrics(*disableSettingsMetrics), - AutoDiscoverDatabases(*autoDiscoverDatabases), - WithConstantLabels(*constantLabelsList), - WithServers(servers), - ExcludeDatabases(excludedDatabases), - IncludeDatabases(*includeDatabases), - } - - exporter := NewExporter(dsns, opts...) 
- defer func() { - exporter.servers.Close() - }() - versionCollector := version.NewCollector(exporterName) - prometheus.MustRegister(versionCollector) - - prometheus.MustRegister(exporter) - - // TODO(@sysadmind): Remove this with multi-target support. We are removing multiple DSN support - dsn := "" - if len(dsns) > 0 { - dsn = dsns[0] - } - - cleanup, hr, mr, lr := initializePerconaExporters(dsns, servers) - defer cleanup() - - pe, err := collector.NewPostgresCollector( - logger, - excludedDatabases, - dsn, - []string{}, - ) - if err != nil { - level.Warn(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error()) - } else { - prometheus.MustRegister(pe) - } - psCollector := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}) goCollector := collectors.NewGoCollector() - promHandler := newHandler(map[string]prometheus.Collector{ - "exporter": exporter, - "custom_query.hr": hr, - "custom_query.mr": mr, - "custom_query.lr": lr, + globalCollectors := map[string]prometheus.Collector{ "standard.process": psCollector, "standard.go": goCollector, "version": versionCollector, - "postgres": pe, - }) + } - http.Handle(*metricsPath, promHandler) + connSema := semaphore.NewWeighted(*maxConnections) + http.Handle(*metricsPath, Handler(logger, dsns, connSema, globalCollectors)) if *metricsPath != "/" && *metricsPath != "" { landingConfig := web.LandingConfig{ @@ -195,7 +146,7 @@ func main() { http.Handle("/", landingPage) } - http.HandleFunc("/probe", handleProbe(logger, excludedDatabases)) + http.HandleFunc("/probe", handleProbe(logger, excludedDatabases, connSema)) level.Info(logger).Log("msg", "Listening on address", "address", *webConfig.WebListenAddresses) srv := &http.Server{} @@ -204,80 +155,3 @@ func main() { os.Exit(1) } } - -// handler wraps an unfiltered http.Handler but uses a filtered handler, -// created on the fly, if filtering is requested. Create instances with -// newHandler. It used for collectors filtering. 
-type handler struct { - unfilteredHandler http.Handler - collectors map[string]prometheus.Collector -} - -func newHandler(collectors map[string]prometheus.Collector) *handler { - h := &handler{collectors: collectors} - - innerHandler, err := h.innerHandler() - if err != nil { - level.Error(logger).Log("msg", "Couldn't create metrics handler", "error", err) - os.Exit(1) - } - - h.unfilteredHandler = innerHandler - return h -} - -// ServeHTTP implements http.Handler. -func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - filters := r.URL.Query()["collect[]"] - level.Debug(logger).Log("msg", "Collect query", "filters", filters) - - if len(filters) == 0 { - // No filters, use the prepared unfiltered handler. - h.unfilteredHandler.ServeHTTP(w, r) - return - } - - filteredHandler, err := h.innerHandler(filters...) - if err != nil { - level.Warn(logger).Log("msg", "Couldn't create filtered metrics handler", "error", err) - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(fmt.Sprintf("Couldn't create filtered metrics handler: %s", err))) // nolint: errcheck - return - } - - filteredHandler.ServeHTTP(w, r) -} - -func (h *handler) innerHandler(filters ...string) (http.Handler, error) { - registry := prometheus.NewRegistry() - - // register all collectors by default. - if len(filters) == 0 { - for name, c := range h.collectors { - if err := registry.Register(c); err != nil { - return nil, err - } - level.Debug(logger).Log("msg", "Collector was registered", "collector", name) - } - } - - // register only filtered collectors. 
- for _, name := range filters { - if c, ok := h.collectors[name]; ok { - if err := registry.Register(c); err != nil { - return nil, err - } - level.Debug(logger).Log("msg", "Collector was registered", "collector", name) - } - } - - handler := promhttp.HandlerFor( - registry, - promhttp.HandlerOpts{ - //ErrorLog: log.NewNopLogger() .NewErrorLogger(), - ErrorHandling: promhttp.ContinueOnError, - }, - ) - - return handler, nil -} diff --git a/cmd/postgres_exporter/percona_exporter.go b/cmd/postgres_exporter/percona_exporter.go index 09aa537af..bf3749d24 100644 --- a/cmd/postgres_exporter/percona_exporter.go +++ b/cmd/postgres_exporter/percona_exporter.go @@ -1,26 +1,33 @@ package main import ( + "context" "crypto/sha256" - "database/sql" "fmt" + stdlog "log" + "net/http" "os" "path/filepath" + "strconv" "strings" + "time" "github.com/alecthomas/kingpin/v2" "github.com/blang/semver/v4" + "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus-community/postgres_exporter/collector" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "golang.org/x/sync/semaphore" ) type MetricResolution string const ( - DISABLED MetricResolution = "" - LR MetricResolution = "lr" - MR MetricResolution = "mr" - HR MetricResolution = "hr" + LR MetricResolution = "lr" + MR MetricResolution = "mr" + HR MetricResolution = "hr" ) var ( @@ -30,59 +37,188 @@ var ( collectCustomQueryLrDirectory = kingpin.Flag("collect.custom_query.lr.directory", "Path to custom queries with low resolution directory.").Envar("PG_EXPORTER_EXTEND_QUERY_LR_PATH").String() collectCustomQueryMrDirectory = kingpin.Flag("collect.custom_query.mr.directory", "Path to custom queries with medium resolution directory.").Envar("PG_EXPORTER_EXTEND_QUERY_MR_PATH").String() collectCustomQueryHrDirectory = kingpin.Flag("collect.custom_query.hr.directory", "Path to custom queries with high resolution 
directory.").Envar("PG_EXPORTER_EXTEND_QUERY_HR_PATH").String() + + maxConnections = kingpin.Flag("max-connections", "Maximum number of connections to use").Default("5").Envar("PG_EXPORTER_MAX_CONNECTIONS").Int64() ) -func initializePerconaExporters(dsn []string, servers *Servers) (func(), *Exporter, *Exporter, *Exporter) { +// Handler returns a http.Handler that serves metrics. Can be used instead of +// run for hooking up custom HTTP servers. +func Handler(logger log.Logger, dsns []string, connSema *semaphore.Weighted, globalCollectors map[string]prometheus.Collector) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + seconds, err := strconv.Atoi(r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")) + // Fall back to a 10s default to also support older vmagents that do not send this header. + if err != nil { + seconds = 10 + } + + ctx, cancel := context.WithTimeout(r.Context(), time.Duration(seconds)*time.Second) + defer cancel() + + filters := r.URL.Query()["collect[]"] + level.Debug(logger).Log("msg", "Collect query", "filters", filters) + + var f Filters + if len(filters) == 0 { + f.EnableAllCollectors = true + } else { + for _, filter := range filters { + switch filter { + case "standard.process": + f.EnableProcessCollector = true + case "standard.go": + f.EnableGoCollector = true + case "version": + f.EnableVersionCollector = true + case "exporter": + f.EnableDefaultCollector = true + case "custom_query.hr": + f.EnableHRCollector = true + case "custom_query.mr": + f.EnableMRCollector = true + case "custom_query.lr": + f.EnableLRCollector = true + case "postgres": + f.EnablePostgresCollector = true + } + } + } + + registry := makeRegistry(ctx, dsns, connSema, globalCollectors, f) + + // Delegate http serving to Prometheus client library, which will call collector.Collect.
+ h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{ + ErrorHandling: promhttp.ContinueOnError, + ErrorLog: stdlog.New(log.NewStdlibAdapter(logger), "handler", 0), + }) + + h.ServeHTTP(w, r) + }) +} + +// Filters is a struct to enable or disable collectors. +type Filters struct { + EnableAllCollectors bool + EnableLRCollector bool + EnableMRCollector bool + EnableHRCollector bool + EnableDefaultCollector bool + EnableGoCollector bool + EnableVersionCollector bool + EnableProcessCollector bool + EnablePostgresCollector bool +} + +// makeRegistry creates a new prometheus registry with default and percona exporters. +func makeRegistry(ctx context.Context, dsns []string, connSema *semaphore.Weighted, globalCollectors map[string]prometheus.Collector, filters Filters) *prometheus.Registry { + registry := prometheus.NewRegistry() + + excludedDatabases := strings.Split(*excludeDatabases, ",") + logger.Log("msg", "Excluded databases", "databases", fmt.Sprintf("%v", excludedDatabases)) + queriesPath := map[MetricResolution]string{ HR: *collectCustomQueryHrDirectory, MR: *collectCustomQueryMrDirectory, LR: *collectCustomQueryLrDirectory, } - excludedDatabases := strings.Split(*excludeDatabases, ",") opts := []ExporterOpt{ - DisableDefaultMetrics(true), - DisableSettingsMetrics(true), AutoDiscoverDatabases(*autoDiscoverDatabases), - WithServers(servers), - WithUserQueriesPath(queriesPath), ExcludeDatabases(excludedDatabases), + WithConnectionsSemaphore(connSema), + WithContext(ctx), } - hrExporter := NewExporter(dsn, - append(opts, - CollectorName("custom_query.hr"), - WithUserQueriesResolutionEnabled(HR), - WithEnabled(*collectCustomQueryHr), - WithConstantLabels(*constantLabelsList), - )..., - ) - prometheus.MustRegister(hrExporter) - - mrExporter := NewExporter(dsn, - append(opts, - CollectorName("custom_query.mr"), - WithUserQueriesResolutionEnabled(MR), - WithEnabled(*collectCustomQueryMr), - WithConstantLabels(*constantLabelsList), - )..., - ) - 
prometheus.MustRegister(mrExporter) - - lrExporter := NewExporter(dsn, - append(opts, - CollectorName("custom_query.lr"), - WithUserQueriesResolutionEnabled(LR), - WithEnabled(*collectCustomQueryLr), - WithConstantLabels(*constantLabelsList), - )..., - ) - prometheus.MustRegister(lrExporter) - - return func() { - hrExporter.servers.Close() - mrExporter.servers.Close() - lrExporter.servers.Close() - }, hrExporter, mrExporter, lrExporter + + if filters.EnableAllCollectors || filters.EnableDefaultCollector { + defaultExporter := NewExporter(dsns, append( + opts, + CollectorName("exporter"), + WithConstantLabels(*constantLabelsList), // This option depends on collectors name, so keep it after CollectorName option + DisableDefaultMetrics(*disableDefaultMetrics), + DisableSettingsMetrics(*disableSettingsMetrics), + IncludeDatabases(*includeDatabases), + )...) + registry.MustRegister(defaultExporter) + } + + if filters.EnableAllCollectors || filters.EnableHRCollector { + hrExporter := NewExporter(dsns, + append(opts, + CollectorName("custom_query.hr"), + WithConstantLabels(*constantLabelsList), // This option depends on collectors name, so keep it after CollectorName option + WithUserQueriesEnabled(HR), + WithEnabled(*collectCustomQueryHr), + DisableDefaultMetrics(true), + DisableSettingsMetrics(true), + WithUserQueriesPath(queriesPath), + )...) + registry.MustRegister(hrExporter) + + } + + if filters.EnableAllCollectors || filters.EnableMRCollector { + mrExporter := NewExporter(dsns, + append(opts, + CollectorName("custom_query.mr"), + WithConstantLabels(*constantLabelsList), // This option depends on collectors name, so keep it after CollectorName option + WithUserQueriesEnabled(MR), + WithEnabled(*collectCustomQueryMr), + DisableDefaultMetrics(true), + DisableSettingsMetrics(true), + WithUserQueriesPath(queriesPath), + )...) 
+ registry.MustRegister(mrExporter) + } + + if filters.EnableAllCollectors || filters.EnableLRCollector { + lrExporter := NewExporter(dsns, + append(opts, + CollectorName("custom_query.lr"), + WithConstantLabels(*constantLabelsList), // This option depends on collectors name, so keep it after CollectorName option + WithUserQueriesEnabled(LR), + WithEnabled(*collectCustomQueryLr), + DisableDefaultMetrics(true), + DisableSettingsMetrics(true), + WithUserQueriesPath(queriesPath), + )...) + registry.MustRegister(lrExporter) + } + + if filters.EnableAllCollectors || filters.EnableGoCollector { + registry.MustRegister(globalCollectors["standard.go"]) + } + + if filters.EnableAllCollectors || filters.EnableProcessCollector { + registry.MustRegister(globalCollectors["standard.process"]) + } + + if filters.EnableAllCollectors || filters.EnableVersionCollector { + registry.MustRegister(globalCollectors["version"]) + } + + if filters.EnableAllCollectors || filters.EnablePostgresCollector { + // This chunk moved here from main.go + // TODO(@sysadmind): Remove this with multi-target support. We are removing multiple DSN support + dsn := "" + if len(dsns) > 0 { + dsn = dsns[0] + } + + pe, err := collector.NewPostgresCollector( + logger, + excludedDatabases, + dsn, + []string{}, + collector.WithContext(ctx), + collector.WithConnectionsSemaphore(connSema), + ) + if err != nil { + level.Error(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error()) + } else { + registry.MustRegister(pe) + } + } + + return registry } func (e *Exporter) loadCustomQueries(res MetricResolution, version semver.Version, server *Server) { @@ -128,21 +264,3 @@ func (e *Exporter) addCustomQueriesFromFile(path string, version semver.Version, // Mark user queries as successfully loaded e.userQueriesError.WithLabelValues(path, hashsumStr).Set(0) } - -// NewDB establishes a new connection using DSN. 
-func NewDB(dsn string) (*sql.DB, error) { - fingerprint, err := parseFingerprint(dsn) - if err != nil { - return nil, err - } - - db, err := sql.Open("postgres", dsn) - if err != nil { - return nil, err - } - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) - - level.Info(logger).Log("msg", "Established new database connection", "fingerprint", fingerprint) - return db, nil -} diff --git a/cmd/postgres_exporter/postgres_exporter.go b/cmd/postgres_exporter/postgres_exporter.go index fa9468925..39dd9a79c 100644 --- a/cmd/postgres_exporter/postgres_exporter.go +++ b/cmd/postgres_exporter/postgres_exporter.go @@ -14,17 +14,21 @@ package main import ( + "context" "database/sql" "errors" "fmt" "math" "regexp" "strings" + "sync" + "sync/atomic" "time" "github.com/blang/semver/v4" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sync/semaphore" ) // ColumnUsage should be one of several enum values which describe how a @@ -432,7 +436,11 @@ type Exporter struct { // servers are used to allow re-using the DB connection between scrapes. // servers contains metrics map and query overrides. - servers *Servers + // servers *Servers + + connSema *semaphore.Weighted + ctx context.Context + masterDSN string } // ExporterOpt configures Exporter. @@ -452,20 +460,34 @@ func CollectorName(name string) ExporterOpt { } } -// WithUserQueriesResolutionEnabled enables resolution for user's queries. -func WithUserQueriesResolutionEnabled(p MetricResolution) ExporterOpt { +// WithUserQueriesEnabled enables user's queries. +func WithUserQueriesEnabled(p MetricResolution) ExporterOpt { return func(e *Exporter) { e.resolutionEnabled = p } } -// WithEnabled enables user's queries. +// WithEnabled enables user's queries. func WithEnabled(p bool) ExporterOpt { return func(e *Exporter) { e.enabled = p } } +// WithContext sets context for the exporter.
+func WithContext(ctx context.Context) ExporterOpt { + return func(e *Exporter) { + e.ctx = ctx + } +} + +// WithConnectionsSemaphore sets the semaphore for limiting the number of connections to the database instance. +func WithConnectionsSemaphore(sem *semaphore.Weighted) ExporterOpt { + return func(e *Exporter) { + e.connSema = sem + } +} + // DisableSettingsMetrics configures pg_settings export. func DisableSettingsMetrics(b bool) ExporterOpt { return func(e *Exporter) { @@ -515,13 +537,6 @@ func WithConstantLabels(s string) ExporterOpt { } } -// WithServers configures constant labels. -func WithServers(s *Servers) ExporterOpt { - return func(e *Exporter) { - e.servers = s - } -} - func parseConstLabels(s string) prometheus.Labels { labels := make(prometheus.Labels) @@ -554,6 +569,7 @@ func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter { dsn: dsn, builtinMetricMaps: builtinMetricMaps, enabled: true, + ctx: context.Background(), } for _, opt := range opts { @@ -561,10 +577,38 @@ func NewExporter(dsn []string, opts ...ExporterOpt) *Exporter { } e.setupInternalMetrics() + // e.servers = NewServers(ServerWithLabels(e.constantLabels)) return e } +// GetServer returns a new Server instance for the provided DSN. +func (e *Exporter) GetServer(dsn string, opts ...ServerOpt) (*Server, error) { + var err error + errCount := 0 // start at zero because we increment before doing work + retries := 1 + var server *Server + for { + if errCount++; errCount > retries { + return nil, err + } + + server, err = NewServer(dsn, opts...) 
+ if err != nil { + time.Sleep(time.Duration(errCount) * time.Second) + continue + } + + if err = server.Ping(); err != nil { + server.Close() + time.Sleep(time.Duration(errCount) * time.Second) + continue + } + break + } + return server, nil +} + func (e *Exporter) setupInternalMetrics() { e.duration = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, @@ -655,31 +699,30 @@ func (e *Exporter) checkMapVersions(ch chan<- prometheus.Metric, server *Server) } // Check if semantic version changed and recalculate maps if needed. - if semanticVersion.NE(server.lastMapVersion) || server.metricMap == nil { - level.Info(logger).Log("msg", "Semantic version changed", "server", server, "from", server.lastMapVersion, "to", semanticVersion) - server.mappingMtx.Lock() - - // Get Default Metrics only for master database - if !e.disableDefaultMetrics && server.master { - server.metricMap = makeDescMap(semanticVersion, server.labels, e.builtinMetricMaps) - server.queryOverrides = makeQueryOverrideMap(semanticVersion, queryOverrides) - } else { - server.metricMap = make(map[string]MetricMapNamespace) - server.queryOverrides = make(map[string]string) - } + // if semanticVersion.NE(server.lastMapVersion[e.resolutionEnabled]) || server.metricMap == nil { + // level.Info(logger).Log("msg", "Semantic version changed", "server", server, "from", server.lastMapVersion[e.resolutionEnabled], "to", semanticVersion) + server.mappingMtx.Lock() - server.lastMapVersion = semanticVersion - - if e.userQueriesPath[e.resolutionEnabled] != "" { - // Clear the metric while reload - e.userQueriesError.Reset() - } + // Get Default Metrics only for master database + if !e.disableDefaultMetrics && server.master { + server.metricMap = makeDescMap(semanticVersion, server.labels, e.builtinMetricMaps) + server.queryOverrides = makeQueryOverrideMap(semanticVersion, queryOverrides) + } else { + server.metricMap = make(map[string]MetricMapNamespace) + server.queryOverrides = make(map[string]string) + 
} - e.loadCustomQueries(e.resolutionEnabled, semanticVersion, server) + // server.lastMapVersion[e.resolutionEnabled] = semanticVersion - server.mappingMtx.Unlock() + if e.userQueriesPath[HR] != "" || e.userQueriesPath[MR] != "" || e.userQueriesPath[LR] != "" { + // Clear the metric while reload + e.userQueriesError.Reset() } + e.loadCustomQueries(e.resolutionEnabled, semanticVersion, server) + + server.mappingMtx.Unlock() + // Output the version as a special metric only for master database versionDesc := prometheus.NewDesc(fmt.Sprintf("%s_%s", namespace, staticLabelName), "Version string as reported by postgres", []string{"version", "short_version"}, server.labels) @@ -703,29 +746,45 @@ func (e *Exporter) scrape(ch chan<- prometheus.Metric) { dsns = e.discoverDatabaseDSNs() } - var errorsCount int - var connectionErrorsCount int + var errorsCount atomic.Int32 + var connectionErrorsCount atomic.Int32 + var wg sync.WaitGroup for _, dsn := range dsns { - if err := e.scrapeDSN(ch, dsn); err != nil { - errorsCount++ + dsn := dsn + wg.Add(1) + go func() { + defer wg.Done() + + if e.connSema != nil { + if err := e.connSema.Acquire(e.ctx, 1); err != nil { + level.Warn(logger).Log("msg", "Failed to acquire semaphore", "err", err) + return + } + defer e.connSema.Release(1) + } + if err := e.scrapeDSN(ch, dsn); err != nil { + errorsCount.Add(1) - level.Error(logger).Log("err", err) + level.Error(logger).Log("err", err) - if _, ok := err.(*ErrorConnectToServer); ok { - connectionErrorsCount++ + if _, ok := err.(*ErrorConnectToServer); ok { + connectionErrorsCount.Add(1) + } } - } + }() } + wg.Wait() + switch { - case connectionErrorsCount >= len(dsns): + case int(connectionErrorsCount.Load()) >= len(dsns): e.psqlUp.Set(0) default: e.psqlUp.Set(1) // Didn't fail, can mark connection as up for this scrape. 
} - switch errorsCount { + switch errorsCount.Load() { case 0: e.error.Set(0) default: diff --git a/cmd/postgres_exporter/postgres_exporter_integration_test.go b/cmd/postgres_exporter/postgres_exporter_integration_test.go index dadee782e..b24f76dda 100644 --- a/cmd/postgres_exporter/postgres_exporter_integration_test.go +++ b/cmd/postgres_exporter/postgres_exporter_integration_test.go @@ -62,11 +62,7 @@ func (s *IntegrationSuite) TestAllNamespacesReturnResults(c *C) { for _, dsn := range s.e.dsn { // Open a database connection - db, err := NewDB(dsn) - c.Assert(db, NotNil) - c.Assert(err, IsNil) - - server, err := NewServer(dsn, db) + server, err := NewServer(dsn) c.Assert(server, NotNil) c.Assert(err, IsNil) diff --git a/cmd/postgres_exporter/probe.go b/cmd/postgres_exporter/probe.go index a200ad2eb..b238c4799 100644 --- a/cmd/postgres_exporter/probe.go +++ b/cmd/postgres_exporter/probe.go @@ -23,9 +23,10 @@ import ( "github.com/prometheus-community/postgres_exporter/config" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "golang.org/x/sync/semaphore" ) -func handleProbe(logger log.Logger, excludeDatabases []string) http.HandlerFunc { +func handleProbe(logger log.Logger, excludeDatabases []string, connSema *semaphore.Weighted) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() conf := c.GetConfig() @@ -69,21 +70,24 @@ func handleProbe(logger log.Logger, excludeDatabases []string) http.HandlerFunc DisableDefaultMetrics(*disableDefaultMetrics), DisableSettingsMetrics(*disableSettingsMetrics), AutoDiscoverDatabases(*autoDiscoverDatabases), - //WithUserQueriesPath(*queriesPath), + // WithUserQueriesPath(*queriesPath), WithConstantLabels(*constantLabelsList), ExcludeDatabases(excludeDatabases), IncludeDatabases(*includeDatabases), + WithContext(ctx), + WithConnectionsSemaphore(connSema), } dsns := []string{dsn.GetConnectionString()} exporter := NewExporter(dsns, 
opts...) - defer func() { - exporter.servers.Close() - }() + + // defer func() { + // exporter.servers.Close() + // }() registry.MustRegister(exporter) // Run the probe - pc, err := collector.NewProbeCollector(tl, excludeDatabases, registry, dsn) + pc, err := collector.NewProbeCollector(ctx, tl, excludeDatabases, registry, dsn, connSema) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/cmd/postgres_exporter/queries.go b/cmd/postgres_exporter/queries.go index 8338c0338..57a1b749b 100644 --- a/cmd/postgres_exporter/queries.go +++ b/cmd/postgres_exporter/queries.go @@ -14,6 +14,7 @@ package main import ( + "context" "errors" "fmt" @@ -278,8 +279,8 @@ func addQueries(content []byte, pgVersion semver.Version, server *Server) error return nil } -func queryDatabases(server *Server) ([]string, error) { - rows, err := server.db.Query("SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database() AND has_database_privilege(current_user, datname, 'connect')") +func queryDatabases(ctx context.Context, server *Server) ([]string, error) { + rows, err := server.db.QueryContext(ctx, "SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database() AND has_database_privilege(current_user, datname, 'connect')") if err != nil { return nil, fmt.Errorf("Error retrieving databases: %v", err) } diff --git a/cmd/postgres_exporter/server.go b/cmd/postgres_exporter/server.go index fe93cf112..574ce9f6e 100644 --- a/cmd/postgres_exporter/server.go +++ b/cmd/postgres_exporter/server.go @@ -59,12 +59,21 @@ func ServerWithLabels(labels prometheus.Labels) ServerOpt { } // NewServer establishes a new connection using DSN. 
-func NewServer(dsn string, db *sql.DB, opts ...ServerOpt) (*Server, error) { +func NewServer(dsn string, opts ...ServerOpt) (*Server, error) { fingerprint, err := parseFingerprint(dsn) if err != nil { return nil, err } + db, err := sql.Open("postgres", dsn) + if err != nil { + return nil, err + } + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + + level.Info(logger).Log("msg", "Established new database connection", "fingerprint", fingerprint) + s := &Server{ db: db, master: false, @@ -131,7 +140,6 @@ func (s *Server) Scrape(ch chan<- prometheus.Metric, disableSettingsMetrics bool type Servers struct { m sync.Mutex servers map[string]*Server - dbs map[string]*sql.DB opts []ServerOpt } @@ -139,47 +147,35 @@ type Servers struct { func NewServers(opts ...ServerOpt) *Servers { return &Servers{ servers: make(map[string]*Server), - dbs: make(map[string]*sql.DB), opts: opts, } } // GetServer returns established connection from a collection. -func (s *Servers) GetServer(dsn string, res MetricResolution) (*Server, error) { +func (s *Servers) GetServer(dsn string) (*Server, error) { s.m.Lock() defer s.m.Unlock() var err error var ok bool errCount := 0 // start at zero because we increment before doing work retries := 1 - var db *sql.DB var server *Server for { if errCount++; errCount > retries { return nil, err } - db, ok = s.dbs[dsn] - if !ok { - db, err = NewDB(dsn) - if err != nil { - time.Sleep(time.Duration(errCount) * time.Second) - continue - } - s.dbs[dsn] = db - } - key := dsn + ":" + string(res) - server, ok = s.servers[key] + server, ok = s.servers[dsn] if !ok { - server, err = NewServer(dsn, db, s.opts...) + server, err = NewServer(dsn, s.opts...) 
if err != nil { time.Sleep(time.Duration(errCount) * time.Second) continue } - s.servers[key] = server + s.servers[dsn] = server } if err = server.Ping(); err != nil { - delete(s.servers, key) - delete(s.dbs, dsn) + server.Close() + delete(s.servers, dsn) time.Sleep(time.Duration(errCount) * time.Second) continue } diff --git a/collector/collector.go b/collector/collector.go index 121129871..31ad66340 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -24,6 +24,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sync/semaphore" ) var ( @@ -92,6 +93,9 @@ type PostgresCollector struct { logger log.Logger instance *instance + + connSema *semaphore.Weighted + ctx context.Context } type Option func(*PostgresCollector) error @@ -157,6 +161,22 @@ func NewPostgresCollector(logger log.Logger, excludeDatabases []string, dsn stri return p, nil } +// WithContext sets context for the collector. +func WithContext(ctx context.Context) Option { + return func(c *PostgresCollector) error { + c.ctx = ctx + return nil + } +} + +// WithConnectionsSemaphore sets the semaphore for limiting the number of connections to the database instance. +func WithConnectionsSemaphore(sem *semaphore.Weighted) Option { + return func(c *PostgresCollector) error { + c.connSema = sem + return nil + } +} + // Describe implements the prometheus.Collector interface. func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) { ch <- scrapeDurationDesc @@ -165,11 +185,17 @@ func (p PostgresCollector) Describe(ch chan<- *prometheus.Desc) { // Collect implements the prometheus.Collector interface. 
func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { - ctx := context.TODO() - // copy the instance so that concurrent scrapes have independent instances inst := p.instance.copy() + if p.connSema != nil { + if err := p.connSema.Acquire(p.ctx, 1); err != nil { + level.Warn(p.logger).Log("msg", "Failed to acquire semaphore", "err", err) + return + } + defer p.connSema.Release(1) + } + // Set up the database connection for the collector. err := inst.setup() if err != nil { @@ -182,7 +208,7 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { wg.Add(len(p.Collectors)) for name, c := range p.Collectors { go func(name string, c Collector) { - execute(ctx, name, c, inst, ch, p.logger) + execute(p.ctx, name, c, inst, ch, p.logger) wg.Done() }(name, c) } diff --git a/collector/probe.go b/collector/probe.go index 4c0f0419b..5ac46aab4 100644 --- a/collector/probe.go +++ b/collector/probe.go @@ -21,6 +21,7 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus-community/postgres_exporter/config" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sync/semaphore" ) type ProbeCollector struct { @@ -28,9 +29,11 @@ type ProbeCollector struct { collectors map[string]Collector logger log.Logger instance *instance + connSema *semaphore.Weighted + ctx context.Context } -func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN) (*ProbeCollector, error) { +func NewProbeCollector(ctx context.Context, logger log.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN, connSema *semaphore.Weighted) (*ProbeCollector, error) { collectors := make(map[string]Collector) initiatedCollectorsMtx.Lock() defer initiatedCollectorsMtx.Unlock() @@ -68,6 +71,8 @@ func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *p collectors: collectors, logger: logger, instance: instance, + connSema: connSema, + ctx: ctx, }, nil } @@ -75,6 +80,12 @@ func (pc 
*ProbeCollector) Describe(ch chan<- *prometheus.Desc) { } func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) { + if err := pc.connSema.Acquire(pc.ctx, 1); err != nil { + level.Warn(pc.logger).Log("msg", "Failed to acquire semaphore", "err", err) + return + } + defer pc.connSema.Release(1) + // Set up the database connection for the collector. err := pc.instance.setup() if err != nil { @@ -87,7 +98,7 @@ func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) { wg.Add(len(pc.collectors)) for name, c := range pc.collectors { go func(name string, c Collector) { - execute(context.TODO(), name, c, pc.instance, ch, pc.logger) + execute(pc.ctx, name, c, pc.instance, ch, pc.logger) wg.Done() }(name, c) } diff --git a/go.mod b/go.mod index 3868348b9..6c6e79469 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/smartystreets/goconvey v1.8.1 github.com/stretchr/testify v1.8.4 github.com/tklauser/go-sysconf v0.3.13 + golang.org/x/sync v0.2.0 golang.org/x/sys v0.18.0 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c gopkg.in/yaml.v2 v2.4.0 @@ -47,7 +48,6 @@ require ( golang.org/x/crypto v0.17.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/oauth2 v0.8.0 // indirect - golang.org/x/sync v0.2.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.32.0 // indirect