Skip to content

Commit 99a5780

Browse files
committed
MEDIUM: acme: Listen for HAProxy events on the master socket
For now, only the "newcert" event is handled. This allows HAProxy to save a newly generated certificate to disk using dataplaneapi. More events will be implemented in the future to support ACME's DNS challenges.
1 parent daff5f5 commit 99a5780

File tree

4 files changed

+333
-2
lines changed

4 files changed

+333
-2
lines changed

client-native/events.go

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
// Copyright 2025 HAProxy Technologies
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
16+
package cn
17+
18+
import (
19+
"context"
20+
"errors"
21+
"fmt"
22+
"io"
23+
"strings"
24+
"sync/atomic"
25+
"time"
26+
27+
clientnative "github.com/haproxytech/client-native/v6"
28+
"github.com/haproxytech/client-native/v6/runtime"
29+
"github.com/haproxytech/dataplaneapi/log"
30+
)
31+
32+
// Listen for HAProxy's event on the master socket.
33+
34+
// Events categgories
35+
const (
36+
EventAcme = "acme"
37+
)
38+
39+
type HAProxyEventListener struct {
40+
listener *runtime.EventListener
41+
client clientnative.HAProxyClient // for storage only
42+
rt runtime.Runtime
43+
stop atomic.Bool
44+
lastEvent time.Time
45+
}
46+
47+
var (
48+
ErrNoMasterSocket = errors.New("master socket not configured")
49+
ErrOldVersion = errors.New("this version of HAProxy does not support event sinks")
50+
)
51+
52+
func ListenHAProxyEvents(ctx context.Context, client clientnative.HAProxyClient) (*HAProxyEventListener, error) {
53+
rt, err := client.Runtime()
54+
if err != nil {
55+
return nil, err
56+
}
57+
if rt == nil || rt.IsStatsSocket() {
58+
return nil, ErrNoMasterSocket
59+
}
60+
61+
version, err := rt.GetVersion()
62+
if err != nil {
63+
return nil, err
64+
}
65+
// v3.2+
66+
if version.Major < 3 || (version.Major == 3 && version.Minor < 2) {
67+
return nil, ErrOldVersion
68+
}
69+
70+
el, err := newHAProxyEventListener(rt.SocketPath())
71+
if err != nil {
72+
return nil, err
73+
}
74+
75+
h := &HAProxyEventListener{
76+
client: client,
77+
listener: el,
78+
rt: rt,
79+
}
80+
81+
go h.listen(ctx)
82+
83+
log.Debugf("listening for HAProxy events on: %s", rt.SocketPath())
84+
85+
return h, nil
86+
}
87+
88+
// Reconfigure a running listener with a new Runtime.
89+
func (h *HAProxyEventListener) Reconfigure(ctx context.Context, rt runtime.Runtime) error {
90+
if rt == nil || rt.IsStatsSocket() {
91+
return ErrNoMasterSocket
92+
}
93+
94+
if rt.SocketPath() == h.rt.SocketPath() {
95+
// no need to restart the listener
96+
h.rt = rt
97+
return nil
98+
}
99+
100+
h.Reset()
101+
h.rt = rt
102+
h.stop.Store(false)
103+
go h.listen(ctx)
104+
105+
return nil
106+
}
107+
108+
func (h *HAProxyEventListener) Reset() {
109+
if h.listener != nil {
110+
if err := h.listener.Close(); err != nil {
111+
log.Warning(err)
112+
}
113+
h.listener = nil
114+
}
115+
}
116+
117+
func (h *HAProxyEventListener) Stop() error {
118+
h.stop.Store(true)
119+
return h.listener.Close()
120+
}
121+
122+
func newHAProxyEventListener(socketPath string) (*runtime.EventListener, error) {
123+
// This is both the connect and write timeout.
124+
// Use a small value here since at this point dataplane is supposed
125+
// to be already connected to the master socket.
126+
timeout := 3 * time.Second
127+
128+
el, err := runtime.NewEventListener("unix", socketPath, "dpapi", timeout, "-w", "-0")
129+
if err != nil {
130+
return nil, fmt.Errorf("could not listen to HAProxy's events: %w", err)
131+
}
132+
133+
return el, nil
134+
}
135+
136+
func (h *HAProxyEventListener) listen(ctx context.Context) {
137+
var err error
138+
retryAfter := 100 * time.Millisecond
139+
140+
for {
141+
if h.stop.Load() {
142+
// Stop requested.
143+
h.Reset()
144+
return
145+
}
146+
if h.listener == nil {
147+
h.listener, err = newHAProxyEventListener(h.rt.SocketPath())
148+
if err != nil {
149+
// Try again.
150+
log.Warning(err)
151+
time.Sleep(retryAfter)
152+
retryAfter *= 2
153+
if retryAfter == 51200*time.Millisecond {
154+
// Give up after 10 iterations.
155+
h.stop.Store(true)
156+
}
157+
continue
158+
}
159+
}
160+
161+
for {
162+
ev, err := h.listener.Listen(ctx)
163+
if err != nil {
164+
// EOF errors usually happen when HAProxy restarts, do not log.
165+
if !errors.Is(err, io.EOF) {
166+
log.Warning(err)
167+
}
168+
// Reset the connection.
169+
h.Reset()
170+
break
171+
}
172+
173+
h.handle(ev)
174+
175+
if h.listener == nil { // just in case
176+
break
177+
}
178+
}
179+
}
180+
}
181+
182+
func (h *HAProxyEventListener) handle(ev runtime.Event) {
183+
if !ev.Timestamp.After(h.lastEvent) {
184+
// Event already seen! Skip.
185+
log.Debugf("events: skipping already seen: '%s'", ev.String())
186+
return
187+
}
188+
h.lastEvent = ev.Timestamp
189+
190+
log.Debugf("events: new: '%s'", ev.String())
191+
192+
category, rest, ok := strings.Cut(ev.Message, " ")
193+
if !ok {
194+
log.Warningf("failed to parse HAProxy Event: '%s'", ev.Message)
195+
return
196+
}
197+
198+
if category == EventAcme {
199+
h.handleAcmeEvent(rest)
200+
return
201+
}
202+
203+
// Do not expect dataplaneapi to be able to handle all event types.
204+
log.Debugf("unknown HAProxy Event type: '%s'", ev.Message)
205+
}

client-native/events_acme.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2025 HAProxy Technologies
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
16+
package cn
17+
18+
import (
19+
"errors"
20+
"io"
21+
"path/filepath"
22+
"strings"
23+
24+
"github.com/haproxytech/client-native/v6/configuration"
25+
"github.com/haproxytech/dataplaneapi/log"
26+
)
27+
28+
const EventAcmeNewCert = "newcert"
29+
30+
func (h *HAProxyEventListener) handleAcmeEvent(message string) {
31+
name, args, ok := strings.Cut(message, " ")
32+
if !ok {
33+
log.Warningf("failed to parse ACME Event: '%s'", message)
34+
return
35+
}
36+
37+
if name == EventAcmeNewCert {
38+
h.handleAcmeNewCertEvent(args)
39+
return
40+
}
41+
42+
log.Debugf("unknown ACME Event type: '%s'", message)
43+
}
44+
45+
// HAProxy has created a new certificate and needs dpapi to write it to Storage.
46+
// ex: acme newcert foobar.pem.rsa
47+
func (h *HAProxyEventListener) handleAcmeNewCertEvent(args string) {
48+
if len(args) == 0 {
49+
log.Error("received HAProxy Event 'acme newcert' without a cert name")
50+
return
51+
}
52+
53+
// Do not use the certificate name in args as a storage name.
54+
// It could be an alias, or the user could have split keys and certs storage.
55+
56+
crt, err := h.rt.ShowCertificate(args)
57+
if err != nil {
58+
log.Errorf("events: acme newcert %s: %s", args, err.Error())
59+
return
60+
}
61+
62+
storage, err := h.client.SSLCertStorage()
63+
if err != nil {
64+
log.Error(err)
65+
return
66+
}
67+
68+
// 'dump ssl cert' can only be issued on sockets with level "admin".
69+
pem, err := h.rt.DumpCertificate(crt.StorageName)
70+
if err != nil {
71+
log.Errorf("events: acme newcert %s: dump cert: %s", args, err.Error())
72+
return
73+
}
74+
75+
// The storage API only wants the filename, while the runtime API uses paths.
76+
storageName := filepath.Base(crt.StorageName)
77+
78+
// Create or Replace the certificate.
79+
_, _, err = storage.Get(storageName)
80+
if err != nil {
81+
if errors.Is(err, configuration.ErrObjectDoesNotExist) {
82+
rc := io.NopCloser(strings.NewReader(pem))
83+
_, _, err = storage.Create(storageName, rc)
84+
}
85+
} else {
86+
_, err = storage.Replace(storageName, pem)
87+
}
88+
89+
if err != nil {
90+
log.Errorf("events: acme newcert %s: storage: %s", args, err.Error())
91+
return
92+
}
93+
94+
log.Debugf("events: OK: acme newcert %s => %s", args, crt.StorageName)
95+
}

configure_data_plane.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
"github.com/getkin/kin-openapi/openapi2"
3434
"github.com/getkin/kin-openapi/openapi2conv"
3535
"github.com/getkin/kin-openapi/openapi3"
36-
"github.com/go-openapi/errors"
36+
api_errors "github.com/go-openapi/errors"
3737
"github.com/go-openapi/runtime"
3838
"github.com/go-openapi/runtime/middleware"
3939
"github.com/go-openapi/swag"
@@ -79,6 +79,7 @@ var (
7979
AccLogger *log.Logger
8080
serverStartedCallback func()
8181
clientMutex sync.Mutex
82+
eventListener *cn.HAProxyEventListener
8283
)
8384

8485
func SetServerStartedCallback(callFunc func()) {
@@ -159,7 +160,7 @@ func configureAPI(api *operations.DataPlaneAPI) http.Handler { //nolint:cyclop,m
159160
// end overriding options with env variables
160161

161162
// configure the api here
162-
api.ServeError = errors.ServeError
163+
api.ServeError = api_errors.ServeError
163164

164165
// Set your custom logger if needed. Default one is log.Printf
165166
// Expected interface func(string, ...interface{})
@@ -192,6 +193,8 @@ func configureAPI(api *operations.DataPlaneAPI) http.Handler { //nolint:cyclop,m
192193

193194
initDataplaneStorage(haproxyOptions.DataplaneStorageDir, client)
194195

196+
configureEventListener(clientCtx, client)
197+
195198
users := dataplaneapi_config.GetUsersStore()
196199
// this is not part of GetUsersStore(),
197200
// in case of reload we need to reread users
@@ -1286,6 +1289,28 @@ func configureNativeClient(cyx context.Context, haproxyOptions dataplaneapi_conf
12861289
return client
12871290
}
12881291

1292+
func configureEventListener(ctx context.Context, client client_native.HAProxyClient) {
1293+
rt, err := client.Runtime()
1294+
if err != nil {
1295+
return
1296+
}
1297+
1298+
if eventListener != nil {
1299+
err = eventListener.Reconfigure(ctx, rt)
1300+
if err != nil {
1301+
// Stop the listener if the new conf has no master socket.
1302+
log.Info("Stopping the EventListener:", err.Error())
1303+
_ = eventListener.Stop()
1304+
}
1305+
} else {
1306+
// First start.
1307+
eventListener, err = cn.ListenHAProxyEvents(ctx, client)
1308+
if err != nil && err != cn.ErrNoMasterSocket && err != cn.ErrOldVersion {
1309+
log.Error("Failed to start HAProxy's event listener:", err.Error())
1310+
}
1311+
}
1312+
}
1313+
12891314
func handleSignals(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, client client_native.HAProxyClient, haproxyOptions dataplaneapi_config.HAProxyConfiguration, users *dataplaneapi_config.Users) {
12901315
for {
12911316
select {
@@ -1299,6 +1324,7 @@ func handleSignals(ctx context.Context, cancel context.CancelFunc, sigs chan os.
12991324
log.Infof("Unable to reload Data Plane API: %s", err.Error())
13001325
} else {
13011326
client.ReplaceRuntime(cn.ConfigureRuntimeClient(clientCtx, configuration, haproxyOptions))
1327+
configureEventListener(clientCtx, client)
13021328
log.Info("Reloaded Data Plane API")
13031329
}
13041330
} else if sig == syscall.SIGUSR2 {
@@ -1340,6 +1366,9 @@ func startWatcher(ctx context.Context, client client_native.HAProxyClient, hapro
13401366
reloadAgent.ReloadWithCallback(reconfigureFunc)
13411367
}
13421368

1369+
// Reconfigure the event listener if needed.
1370+
configureEventListener(ctx, client)
1371+
13431372
// get the last configuration which has been updated by reloadConfigurationFile and increment version in config file.
13441373
configuration, err := client.Configuration()
13451374
if err != nil {

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ require (
4343
gopkg.in/yaml.v2 v2.4.0
4444
)
4545

46+
replace github.com/haproxytech/client-native/v6 => ../client-native
47+
4648
require (
4749
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
4850
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.31 // indirect

0 commit comments

Comments
 (0)