Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b49e144
chore: go mod tidy
ademidoff Oct 16, 2025
ac6e846
PMM-14324 Eclude tests for go-consistent
ademidoff Oct 16, 2025
422773e
Merge branch 'pmm-ha' into PMM-14324-pmm-ha-monitoring-run-pmm-agent
ademidoff Oct 16, 2025
d4c801a
PMM_14324 Fix error printing syntax
ademidoff Oct 16, 2025
0fd2456
Merge branch 'PMM-14324-pmm-ha-monitoring-run-pmm-agent' of ssh://git…
ademidoff Oct 17, 2025
5a9ae78
PMM_14324 Rename the method for consistency
ademidoff Oct 17, 2025
9f3c5b1
PMM_14324 pmm-agent auto-start
ademidoff Oct 17, 2025
f30173f
PMM_14324 Pass context so transactions can be cancelled
ademidoff Oct 18, 2025
faf213a
PMM_14324 Use l.Info instead of l.Print
ademidoff Oct 18, 2025
c7a42a8
PMM_14324 Remove StandardService
ademidoff Oct 18, 2025
0d2643f
PMM_14324 Don't scrape remotes on standby nodes
ademidoff Oct 18, 2025
757151a
PMM_14324 Provide unique inventory node names in HA
ademidoff Oct 18, 2025
1cf37da
PMM_14324 Fix linter errors
ademidoff Oct 18, 2025
837631c
PMM-14324 Fix leaky DSN
ademidoff Oct 18, 2025
87ec23e
PMM-14324 Log in logfmt format with logrus
ademidoff Oct 18, 2025
8e9a690
PMM-14324 Fix QAN schema migration params
ademidoff Oct 20, 2025
46a47f8
PMM-13812 Fix fir templateFS driver.
JiriCtvrtka Oct 21, 2025
cecdedc
PMM-13812 Include custom cluster name in check for engine.
JiriCtvrtka Oct 21, 2025
e9a422a
PMM-13812 Lint.
JiriCtvrtka Oct 21, 2025
51decbb
PMM-13812 Lint.
JiriCtvrtka Oct 21, 2025
6493a09
PMM-13812 Remove methods not explicitly needed by driver.
JiriCtvrtka Oct 21, 2025
5148063
PMM-13812 Lint.
JiriCtvrtka Oct 21, 2025
76a3330
PMM-13812 Lint.
JiriCtvrtka Oct 21, 2025
cb65485
PMM-13812 Lint.
JiriCtvrtka Oct 21, 2025
20eba7d
Merge branch 'PMM-13812-ha-clickhouse' into PMM-14324-pmm-ha-monitoring
ademidoff Oct 21, 2025
a9ea40b
PMM-14305 Put back CH schema, add a test
ademidoff Oct 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 8 additions & 19 deletions managed/cmd/pmm-managed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ func main() { //nolint:maintidx,cyclop
SSLCAPath: *postgresSSLCAPathF,
SSLKeyPath: *postgresSSLKeyPathF,
SSLCertPath: *postgresSSLCertPathF,
HANodeID: *haNodeID,
}

sqlDB, err := models.OpenDB(setupParams)
Expand Down Expand Up @@ -823,10 +824,16 @@ func main() { //nolint:maintidx,cyclop

cleaner := clean.New(db)
externalRules := vmalert.NewExternalRules()
vmdb, err := victoriametrics.NewVictoriaMetrics(*victoriaMetricsConfigF, db, vmParams)
vmdb, err := victoriametrics.NewVictoriaMetrics(*victoriaMetricsConfigF, db, vmParams, haService)
if err != nil {
l.Panicf("VictoriaMetrics service problem: %+v", err)
}

// This ensures scrape config regeneration happens when leadership changes
if haParams.Enabled {
haService.AddLeaderService(vmdb)
}

vmalert, err := vmalert.NewVMAlert(externalRules, *victoriaMetricsVMAlertURLF)
if err != nil {
l.Panicf("VictoriaMetrics VMAlert service problem: %+v", err)
Expand All @@ -839,11 +846,6 @@ func main() { //nolint:maintidx,cyclop

agentsRegistry := agents.NewRegistry(db, vmParams)

// TODO remove once PMM cluster will be Active-Active
haService.AddLeaderService(ha.NewStandardService("agentsRegistry", func(_ context.Context) error { return nil }, func() {
agentsRegistry.KickAll(ctx)
}))

pbmPITRService := backup.NewPBMPITRService()
backupRemovalService := backup.NewRemovalService(db, pbmPITRService)
backupRetentionService := backup.NewRetentionService(db, backupRemovalService)
Expand Down Expand Up @@ -877,19 +879,6 @@ func main() { //nolint:maintidx,cyclop
HAParams: haParams,
})

haService.AddLeaderService(ha.NewStandardService("pmm-agent-runner", func(_ context.Context) error {
err := supervisord.StartSupervisedService("pmm-agent")
if err != nil {
l.Warnf("couldn't start pmm-agent: %q", err)
}
return err
}, func() {
err := supervisord.StopSupervisedService("pmm-agent")
if err != nil {
l.Warnf("couldn't stop pmm-agent: %q", err)
}
}))

platformAddress, err := envvars.GetPlatformAddress()
if err != nil {
l.Fatal(err)
Expand Down
10 changes: 8 additions & 2 deletions managed/models/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,7 @@ type SetupDBParams struct {
SSLCAPath string
SSLKeyPath string
SSLCertPath string
HANodeID string
SetupFixtures SetupFixturesMode
MigrationVersion *int
}
Expand Down Expand Up @@ -1462,14 +1463,19 @@ func setupPMMServerAgents(q *reform.Querier, params SetupDBParams) error {
return err
}
if params.Address != DefaultPostgreSQLAddr {
nodeName := PMMServerPostgreSQLNodeName
if params.HANodeID != "" {
nodeName = params.HANodeID
}
if node, err = CreateNode(q, RemoteNodeType, &CreateNodeParams{
NodeName: PMMServerPostgreSQLNodeName,
NodeName: nodeName,
Address: address,
}); err != nil {
return err
}
} else {
params.Name = "" // using postgres database in order to get metrics from entrypoint extension setup for QAN.
// using postgres database in order to get metrics from entrypoint extension setup for QAN.
params.Name = ""
}

// create PostgreSQL Service and associated Agents
Expand Down
4 changes: 2 additions & 2 deletions managed/models/node_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func createNodeWithID(q *reform.Querier, id string, nodeType NodeType, params *C
return nil, err
}

// do not check that machine-id is unique: https://jira.percona.com/browse/PMM-4196
// do not check that machine-id is unique: https://perconadev.atlassian.net/browse/PMM-4196

if nodeType == RemoteRDSNodeType {
if strings.Contains(params.InstanceID, ".") {
Expand All @@ -211,7 +211,7 @@ func createNodeWithID(q *reform.Querier, id string, nodeType NodeType, params *C
}

// Trim trailing \n received from broken 2.0.0 clients.
// See https://jira.percona.com/browse/PMM-4720
// See https://perconadev.atlassian.net/browse/PMM-4720
machineID := pointer.ToStringOrNil(strings.TrimSpace(pointer.GetString(params.MachineID)))

node := &Node{
Expand Down
2 changes: 1 addition & 1 deletion managed/services/agents/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// FIXME Rename to victoriaMetrics.Service, update tests.
type prometheusService interface {
RequestConfigurationUpdate()
BuildScrapeConfigForVMAgent(pmmAgentID string) ([]byte, error)
BuildScrapeConfigForVMAgent(ctx context.Context, pmmAgentID string) ([]byte, error)
}

// qanClient is a subset of methods of qan.Client used by this package.
Expand Down
7 changes: 0 additions & 7 deletions managed/services/agents/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,6 @@ func (r *Registry) Collect(ch chan<- prom.Metric) {
r.mClockDrift.Collect(ch)
}

// KickAll invokes Kick once for every entry currently held in r.agents,
// forwarding ctx and the entry's id.
//
// NOTE(review): r.agents is ranged over here without any locking visible in
// this method — confirm that synchronization is provided elsewhere.
func (r *Registry) KickAll(ctx context.Context) {
	for _, registered := range r.agents {
		r.Kick(ctx, registered.id)
	}
}

// check interfaces.
var (
_ prom.Collector = (*Registry)(nil)
Expand Down
2 changes: 1 addition & 1 deletion managed/services/agents/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (u *StateUpdater) sendSetStateRequest(ctx context.Context, agent *pmmAgentI
case models.PMMAgentType:
continue
case models.VMAgentType:
scrapeCfg, err := u.vmdb.BuildScrapeConfigForVMAgent(agent.id)
scrapeCfg, err := u.vmdb.BuildScrapeConfigForVMAgent(ctx, agent.id)
if err != nil {
return errors.Wrapf(err, "cannot get agent scrape config for agent: %s", agent.id)
}
Expand Down
4 changes: 2 additions & 2 deletions managed/services/alerting/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (s *Service) CreateTemplate(ctx context.Context, req *alerting.CreateTempla

templates, err := alert.Parse(strings.NewReader(req.Yaml), pParams)
if err != nil {
s.l.Errorf("failed to parse rule template form request: +%v", err)
s.l.Errorf("failed to parse rule template form request: %+v", err)
return nil, status.Errorf(codes.InvalidArgument, "Failed to parse rule template: %v.", err)
}

Expand Down Expand Up @@ -462,7 +462,7 @@ func (s *Service) UpdateTemplate(ctx context.Context, req *alerting.UpdateTempla

templates, err := alert.Parse(strings.NewReader(req.Yaml), parseParams)
if err != nil {
s.l.Errorf("failed to parse rule template form request: +%v", err)
s.l.Errorf("failed to parse rule template form request: %+v", err)
return nil, status.Error(codes.InvalidArgument, "Failed to parse rule template.")
}

Expand Down
15 changes: 7 additions & 8 deletions managed/services/ha/highavailability.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (s *Service) Run(ctx context.Context) error {
s.services.StartAllServices(ctx)
}
case <-ctx.Done():
s.services.StopRunningServices()
s.services.StopAllServices()
return
}
}
Expand Down Expand Up @@ -413,8 +413,7 @@ func (s *Service) runLeaderObserver(ctx context.Context) {
case isLeader := <-node.LeaderCh():
if isLeader {
s.services.StartAllServices(ctx)
// This node is the leader
s.l.Printf("I am the leader!")
s.l.Info("I am the leader!")
peers := s.memberlist.Members()
for _, peer := range peers {
if peer.Name == s.params.NodeID {
Expand All @@ -423,8 +422,8 @@ func (s *Service) runLeaderObserver(ctx context.Context) {
s.addMemberlistNodeToRaft(peer)
}
} else {
s.l.Printf("I am not a leader!")
s.services.StopRunningServices()
s.l.Info("I am not a leader!")
s.services.StopAllServices()
}
case <-t.C:
address, serverID := s.raftNode.LeaderWithID()
Expand All @@ -439,7 +438,7 @@ func (s *Service) runLeaderObserver(ctx context.Context) {
func (s *Service) AddLeaderService(leaderService LeaderService) {
err := s.services.Add(leaderService)
if err != nil {
s.l.Errorf("couldn't add HA service: +%v", err)
s.l.Errorf("couldn't add HA service: %+v", err)
}
}

Expand All @@ -460,14 +459,14 @@ func (s *Service) BroadcastMessage(message []byte) error {
return nil
}

// IsLeader reports whether this instance should be treated as the leader.
// It returns true when HA is disabled (s.params.Enabled is false), or when a
// raft node exists and its state is raft.Leader. The check runs under the
// s.rw read lock.
func (s *Service) IsLeader() bool {
	s.rw.RLock()
	defer s.rw.RUnlock()

	raftLeader := s.raftNode != nil && s.raftNode.State() == raft.Leader
	return raftLeader || !s.params.Enabled
}

// Bootstrap reports whether the HA service should bootstrap: true when
// s.params.Bootstrap is set, and always true when HA is disabled
// (s.params.Enabled is false).
func (s *Service) Bootstrap() bool {
	if s.params.Bootstrap {
		return true
	}
	return !s.params.Enabled
}
38 changes: 0 additions & 38 deletions managed/services/ha/leaderservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,44 +27,6 @@ type LeaderService interface {
ID() string
}

// StandardService is a leader service built from a pair of caller-supplied
// callbacks: startFunc is run by Start and stopFunc is run by Stop.
// Either callback may be nil, in which case the corresponding method is a no-op.
type StandardService struct {
	id string

	// mu serializes Start and Stop so the two callbacks never run concurrently.
	mu sync.Mutex

	startFunc func(context.Context) error
	stopFunc  func()
}

// NewStandardService creates a new standard service with the given id.
// startFunc and stopFunc are stored as-is; passing nil for either one makes
// Start (respectively Stop) do nothing.
func NewStandardService(id string, startFunc func(context.Context) error, stopFunc func()) *StandardService {
	return &StandardService{
		id:        id,
		startFunc: startFunc,
		stopFunc:  stopFunc,
	}
}

// ID returns the ID of the standard service.
func (s *StandardService) ID() string {
	return s.id
}

// Start runs the start callback while holding the service mutex and returns
// the callback's error unchanged. It returns nil when no start callback was
// provided, instead of panicking on a nil function call.
func (s *StandardService) Start(ctx context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.startFunc == nil {
		return nil
	}
	return s.startFunc(ctx)
}

// Stop runs the stop callback while holding the service mutex. It does
// nothing when no stop callback was provided.
func (s *StandardService) Stop() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.stopFunc == nil {
		return
	}
	s.stopFunc()
}

// ContextService represents a context service.
type ContextService struct {
id string
Expand Down
11 changes: 9 additions & 2 deletions managed/services/ha/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type services struct {
l *logrus.Entry
}

// newServices creates a new services manager.
func newServices() *services {
return &services{
all: make(map[string]LeaderService),
Expand All @@ -44,13 +45,14 @@ func newServices() *services {
}
}

// Add registers a new leader service.
func (s *services) Add(service LeaderService) error {
s.rw.Lock()
defer s.rw.Unlock()

id := service.ID()
if _, ok := s.all[id]; ok {
return fmt.Errorf("service with id %s is already exist", id)
return fmt.Errorf("service with id %s already exists", id)
}
s.all[id] = service
select {
Expand All @@ -60,6 +62,7 @@ func (s *services) Add(service LeaderService) error {
return nil
}

// StartAllServices starts all registered services that are not currently running.
func (s *services) StartAllServices(ctx context.Context) {
s.rw.Lock()
defer s.rw.Unlock()
Expand All @@ -80,7 +83,8 @@ func (s *services) StartAllServices(ctx context.Context) {
}
}

func (s *services) StopRunningServices() {
// StopAllServices stops all running services.
func (s *services) StopAllServices() {
s.rw.Lock()
defer s.rw.Unlock()

Expand All @@ -92,14 +96,17 @@ func (s *services) StopRunningServices() {
}
}

// Refresh returns the manager's refresh channel (s.refresh), which is used to
// signal when services should be refreshed. The same channel value is returned
// on every call.
func (s *services) Refresh() chan struct{} {
return s.refresh
}

// Wait blocks until the manager's wait group (s.wg) is done, i.e. until all
// services tracked by it have stopped.
func (s *services) Wait() {
s.wg.Wait()
}

// removeService removes a service from the registry of running services.
func (s *services) removeService(id string) {
s.rw.Lock()
defer s.rw.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion managed/services/supervisord/pmm_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ redirect_stderr = true
priority = 15
command = /usr/sbin/pmm-agent --config-file=/usr/local/percona/pmm/config/pmm-agent.yaml --paths-tempdir=/srv/pmm-agent/tmp --paths-nomad-data-dir=/srv/nomad/data
autorestart = true
autostart = false
autostart = true
startretries = 1000
startsecs = 1
stopsignal = TERM
Expand Down
18 changes: 0 additions & 18 deletions managed/services/supervisord/supervisord.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,24 +413,6 @@ func (s *Service) UpdateConfiguration(settings *models.Settings, ssoDetails *mod
return err
}

// RestartSupervisedService issues the supervisorctl "restart" command for
// serviceName. The command output is discarded; only the error is returned.
func (s *Service) RestartSupervisedService(serviceName string) error {
	if _, err := s.supervisorctl("restart", serviceName); err != nil {
		return err
	}
	return nil
}

// StartSupervisedService issues the supervisorctl "start" command for
// serviceName. The command output is discarded; only the error is returned.
func (s *Service) StartSupervisedService(serviceName string) error {
	if _, err := s.supervisorctl("start", serviceName); err != nil {
		return err
	}
	return nil
}

// StopSupervisedService issues the supervisorctl "stop" command for
// serviceName. The command output is discarded; only the error is returned.
func (s *Service) StopSupervisedService(serviceName string) error {
	if _, err := s.supervisorctl("stop", serviceName); err != nil {
		return err
	}
	return nil
}

//nolint:lll
var templates = template.Must(template.New("").Option("missingkey=error").Parse(`

Expand Down
10 changes: 9 additions & 1 deletion managed/services/victoriametrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// AddScrapeConfigs - adds agents scrape configuration to given scrape config,
// pmm_agent_id and push_metrics used for filtering.
func AddScrapeConfigs(l *logrus.Entry, cfg *config.Config, q *reform.Querier, //nolint:cyclop,maintidx
globalResolutions *models.MetricsResolutions, pmmAgentID *string, pushMetrics bool,
globalResolutions *models.MetricsResolutions, pmmAgentID *string, pushMetrics bool, skipExternalAgents bool,
) error {
agents, err := models.FindAgentsForScrapeConfig(q, pmmAgentID, pushMetrics)
if err != nil {
Expand Down Expand Up @@ -187,6 +187,10 @@ func AddScrapeConfigs(l *logrus.Entry, cfg *config.Config, q *reform.Querier, //
continue

case models.RDSExporterType:
if skipExternalAgents && pointer.GetString(agent.RunsOnNodeID) == models.PMMServerNodeID {
l.Debugf("Skip the scrape config for RDSExporter %s running on PMM Server in HA non-leader mode", agent.AgentID)
continue
}
rdsParams = append(rdsParams, &scrapeConfigParams{
host: paramsHost,
node: paramsNode,
Expand All @@ -197,6 +201,10 @@ func AddScrapeConfigs(l *logrus.Entry, cfg *config.Config, q *reform.Querier, //
continue

case models.ExternalExporterType:
if skipExternalAgents && pointer.GetString(agent.RunsOnNodeID) == models.PMMServerNodeID {
l.Debugf("Skip the scrape config for ExternalExporter %s running on PMM Server in HA non-leader mode", agent.AgentID)
continue
}
scfgs, err = scrapeConfigsForExternalExporter(&mr, &scrapeConfigParams{
host: paramsHost,
node: paramsNode,
Expand Down
Loading
Loading