package caddyconsul
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"sync"
"time"
"github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/caddyconfig"
"github.com/hashicorp/consul/api"
)
// watchConsul starts all the Consul-polling goroutines, handles the
// initDone flag, and handles the events triggered by the responses
// from Consul
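//
// Rough data flow (an illustrative sketch, inferred from the code below):
//
//   watchConsulKV --configChan--> watchConsul --caddy.Load()--> Caddy
//   watchConsulServices --servicesChan--> watchConsul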
func (cc *App) watchConsul() {
// Two channels carrying the config and service updates
configChan := make(chan *caddy.Config)
servicesChan := make(chan serviceEntries)
// A local channel used to notify and propagate that init is done
localInitDoneChan := make(chan bool)
// Three channels are used to shut down the different goroutines in the app
stopChan := make(chan bool)
shutdownKV := make(chan bool)
shutdownServices := make(chan bool)
// Two wait groups to ensure that init is indeed done
var confInitWaitGroup sync.WaitGroup
var servicesInitWaitGroup sync.WaitGroup
// And two wait groups to ensure that shutdown is done
var confWaitGroup sync.WaitGroup
var servicesWaitGroup sync.WaitGroup
// A global mutex to avoid the config and services being updated while the Caddy JSON config is generated
var lock sync.Mutex
// We will at least wait for the first config
// and the first call to load all available services
confInitWaitGroup.Add(1)
servicesInitWaitGroup.Add(1)
confWaitGroup.Add(1)
servicesWaitGroup.Add(1)
// Starting to watch Consul
go cc.watchConsulKV(configChan, shutdownKV, &confInitWaitGroup, &confWaitGroup)
go cc.watchConsulServices(servicesChan, shutdownServices, &servicesInitWaitGroup, &servicesWaitGroup)
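// globalInitDone survives app reloads: when caddy.Load() swaps in a new
// instance of this app, the bootstrap below must not run a second time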
initDone := globalInitDone
// Wait for init and wait for shutdown
if !initDone {
go cc.waitForInitAndGenerateFirstConfig(&confInitWaitGroup, &servicesInitWaitGroup, localInitDoneChan)
}
go cc.waitForShutdownEvent(shutdownKV, shutdownServices, stopChan)
needGeneration := false
OUTERLOOP:
for {
select {
case <-stopChan:
break OUTERLOOP
case conf := <-configChan:
// Let's lock during the manipulation of our global vars
lock.Lock()
caddy.Log().Named("consul").Debug("Update to the global configuration stored in Consul detected")
// Let's update our global config var
globalConfig = conf
// When we call caddy.Load(), Caddy will shut down this instance of the app
// and start a new one with the new configuration. However, it is the current
// instance that generates the next applied Caddy configuration, and that
// generation relies heavily on this instance's app configuration.
// To make sure the n+1 generation uses the new app configuration rather than
// a stale one, we apply it to this instance before rendering the next Caddy
// configuration JSON payload
if rawConf, ok := conf.AppsRaw["consul"]; ok {
if err := json.Unmarshal(rawConf, cc); err != nil {
caddy.Log().Named("consul").Error(fmt.Sprintf("unable to unmarshal the app configuration: %s", err))
}
}
// We will need to re-generate the configuration during the next tick
needGeneration = true
lock.Unlock()
case serviceEntries := <-servicesChan:
// Let's lock during the manipulation of our global vars
lock.Lock()
caddy.Log().Named("consul").Debug("Update to the Consul services detected")
// Let's update our global services var
switch serviceEntries.EventType {
case "update":
globalServices[serviceEntries.Service] = serviceEntries.Entries
case "delete":
delete(globalServices, serviceEntries.Service)
}
// We will need to re-generate the configuration during the next tick
needGeneration = true
lock.Unlock()
case <-localInitDoneChan:
initDone = true
globalInitDone = true
case <-time.After(time.Second * 2):
// If there is no need for generation (or init has not been done yet), nothing to do
if !needGeneration || !initDone {
continue
}
var err error
// Let's lock during our generation process
lock.Lock()
caddy.Log().Named("consul").Debug("Regenerating configuration")
cc.fullConfigJSON, err = cc.generateConfAsJSON()
if err != nil {
caddy.Log().Error(fmt.Sprintf("Unable to generate config: %s", err))
lock.Unlock()
continue
}
// We just generated the conf, no need to do it again before the next update
needGeneration = false
// Sleeping a random time to reduce chances that all Caddy instances
// try to generate new SSL certificates at the same time
rand.Seed(time.Now().UnixNano())
n := rand.Intn(1500)
caddy.Log().Named("consul").Debug(fmt.Sprintf("sleeping %d ms before applying new configuration:\n%s\n", n, cc.fullConfigJSON))
time.Sleep(time.Duration(n) * time.Millisecond)
// We're not in the initial run anymore, so we have to propagate
// the new configuration to Caddy by ourselves
err = caddy.Load(cc.fullConfigJSON, false)
if err != nil {
caddy.Log().Error(fmt.Sprintf("Unable to load the new config: %s", err))
} else {
caddy.Log().Named("consul").Debug("Configuration was regenerated and applied by Caddy")
}
lock.Unlock()
}
}
// We wait for the config go-routine
confWaitGroup.Wait()
// We wait for the services go-routines
servicesWaitGroup.Wait()
caddy.Log().Named("consul").Info("Exiting app!")
}
// waitForInitAndGenerateFirstConfig waits for the two init wait groups to be
// done, then signals it on the `initDone` channel
func (cc *App) waitForInitAndGenerateFirstConfig(confWaitGroup *sync.WaitGroup, servicesWaitGroup *sync.WaitGroup, initDone chan bool) {
// We just wait for the two sync.WaitGroups to be done
confWaitGroup.Wait()
servicesWaitGroup.Wait()
initDone <- true
}
// waitForShutdownEvent waits for an event received on the shutdown channel
// and broadcasts it to the config watching go-routine and the services watching
// go-routines
func (cc *App) waitForShutdownEvent(shutdownKV chan bool, shutdownServices chan bool, stopChan chan bool) {
// Block until the app signals shutdown
<-cc.shutdownChan
// Stop the Consul watching goroutines
shutdownKV <- true
shutdownServices <- true
// Then stop the main goroutine
stopChan <- true
}
// watchConsulKV watches the Consul K/V store key holding the Caddy
// configuration, which may be in either JSON or Caddyfile format
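//
// For illustration only, the watched key could hold a raw Caddy JSON
// document such as
//
//	{"apps": {"http": {"servers": {"srv0": {"listen": [":443"]}}}}}
//
// or an equivalent Caddyfile, which is adapted to JSON below.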
func (cc *App) watchConsulKV(configChan chan *caddy.Config, shutdownKV chan bool, confInitWaitGroup *sync.WaitGroup, confWaitGroup *sync.WaitGroup) {
stopped := false
signalInit := true
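// getLastIndex/storeLastIndex are helpers defined elsewhere in this package;
// they appear to persist the blocking-query index so that a reloaded app
// instance can resume watching where the previous one left off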
lastIndex := getLastIndex("consul-kv")
KVCtx, cancelKVFunc := context.WithCancel(context.Background())
go func() {
<-shutdownKV
stopped = true
caddy.Log().Named("consul.watcher.kv").Info("Stopping KV watching routine!")
// We shutdown the main Consul requester
cancelKVFunc()
// We signal the wait group that this goroutine is done
confWaitGroup.Done()
caddy.Log().Named("consul.watcher.kv").Debug("Stopped KV watching routine!")
}()
for {
if stopped {
caddy.Log().Named("consul.watcher.kv").Debug("Services watching routine is stopped, returning!")
return
}
caddy.Log().Named("consul.watcher.kv").Debug("Requesting KV updates!")
queryOptions := &api.QueryOptions{
WaitIndex: lastIndex,
WaitTime: time.Minute * 5,
}
keypair, meta, err := cc.client.KV().Get(cc.ConsulGlobalConfigKey, queryOptions.WithContext(KVCtx))
if err != nil {
// If we canceled the context, nothing wrong here
if KVCtx.Err() == context.Canceled {
return
}
caddy.Log().Named("consul.watcher.kv").Error(fmt.Sprintf("unable to request KV value from Consul: %s", err))
time.Sleep(time.Second * 1)
continue
}
if meta == nil {
caddy.Log().Named("consul.watcher.kv").Error("Consul returned an empty meta")
time.Sleep(time.Second * 1)
continue
}
// A nil KV pair with a valid meta means the key doesn't exist (yet)
if keypair == nil {
caddy.Log().Named("consul.watcher.kv").Error(fmt.Sprintf("key %q not found in the Consul KV store", cc.ConsulGlobalConfigKey))
time.Sleep(time.Second * 1)
continue
}
if lastIndex >= meta.LastIndex {
caddy.Log().Named("consul.watcher.kv").Debug("Consul index didn't change: this is just a request timeout")
continue
}
lastIndex = storeLastIndex("consul-kv", meta.LastIndex)
conf := &caddy.Config{}
err = json.Unmarshal(keypair.Value, conf)
if err != nil {
caddy.Log().Named("consul.watcher.kv").Debug("unable to unmarshal Consul KV content into a caddy.Config struct, trying the Caddyfile adapter")
cfgAdapter := caddyconfig.GetAdapter("caddyfile")
if cfgAdapter == nil {
caddy.Log().Named("consul.watcher.kv").Error("no Caddyfile adapter found")
continue
}
jsonVal, _, err := cfgAdapter.Adapt(keypair.Value, map[string]interface{}{})
if err != nil {
caddy.Log().Named("consul.watcher.kv").Error(fmt.Sprintf("error while adapting caddyfile to JSON: %s", err))
continue
}
err = json.Unmarshal(jsonVal, conf)
if err != nil {
caddy.Log().Named("consul.watcher.kv").Error("unable to unmarshal Consul KV content into caddy.Config struct")
continue
}
}
configChan <- conf
if signalInit {
confInitWaitGroup.Done()
signalInit = false
}
}
}
// watchConsulServices watches the Consul services holding the tag we look for
// and triggers a new goroutine watching the health of each returned service
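//
// For example (illustrative value), with ServicesTag set to "caddy" the
// health-state query below uses the filter expression `caddy in ServiceTags`,
// so only tagged services are returned.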
func (cc *App) watchConsulServices(servicesChan chan serviceEntries, shutdownServices chan bool, servicesInitWaitGroup *sync.WaitGroup, servicesWaitGroup *sync.WaitGroup) {
if cc.AutoReverseProxy.ServicesTag == "" {
caddy.Log().Named("consul.watcher.services").Info("No services tag to watch, not watching services")
servicesInitWaitGroup.Done()
return
}
signalInit := true
lastIndex := getLastIndex("consul-services")
currentServices := make(map[string]func())
stop := false
healthCtx, cancelHealthFunc := context.WithCancel(context.Background())
go func() {
<-shutdownServices
stop = true
caddy.Log().Named("consul.watcher.services").Info("Stopping services watching routine!")
// We shutdown the main Consul requester
cancelHealthFunc()
for _, cancelFunc := range currentServices {
// We call cancelFunc to stop the goroutine that watches Consul for this service
cancelFunc()
}
servicesWaitGroup.Done()
caddy.Log().Named("consul.watcher.services").Debug("Stopped services watching routine!")
}()
for {
if stop {
caddy.Log().Named("consul.watcher.services").Debug("Services watching routine is stopped, returning!")
return
}
caddy.Log().Named("consul.watcher.services").Debug("Requesting services updates!")
options := &api.QueryOptions{
WaitIndex: lastIndex,
WaitTime: time.Minute * 5,
Filter: fmt.Sprintf("%s in ServiceTags", cc.AutoReverseProxy.ServicesTag),
}
healthChecks, meta, err := cc.client.Health().State("passing", options.WithContext(healthCtx))
if err != nil {
// If we canceled the context, nothing wrong here
if healthCtx.Err() == context.Canceled {
return
}
caddy.Log().Named("consul.watcher.services").Error(fmt.Sprintf("unable to request services from Consul: %s", err))
time.Sleep(time.Second * 1)
continue
}
if meta == nil {
caddy.Log().Named("consul.watcher.services").Error("Consul returned an empty meta...?")
time.Sleep(time.Second * 1)
continue
}
if lastIndex >= meta.LastIndex {
caddy.Log().Named("consul.watcher.services").Debug("Consul index didn't change: this is just a request timeout")
continue
}
lastIndex = storeLastIndex("consul-services", meta.LastIndex)
caddy.Log().Named("consul.watcher.services").Debug("Services list updated")
services := make(map[string]bool)
for _, service := range healthChecks {
services[service.ServiceName] = true
}
if signalInit {
// Register every upcoming service watcher on the init wait group *before*
// spawning them, so that none of them can call Done() before the
// corresponding Add() has happened
caddy.Log().Named("consul.watcher.services").Debug(fmt.Sprintf("adding %d to the services sync.WaitGroup for the initialization", len(services)))
servicesInitWaitGroup.Add(len(services))
}
// Let's start by iterating on the services we have to monitor
for serviceName := range services {
if _, ok := currentServices[serviceName]; !ok {
// Create a new context that can be canceled if the service is not to be monitored anymore
ctx, cancelFunc := context.WithCancel(context.Background())
// Fill our services map with the cancel function for when we'll be needing it
currentServices[serviceName] = cancelFunc
// Launch the dedicated goroutine
go cc.watchConsulServiceHealthyEntries(ctx, serviceName, servicesChan, servicesInitWaitGroup, signalInit)
}
}
if signalInit {
// This Done() balances the initial Add(1) made in watchConsul
servicesInitWaitGroup.Done()
signalInit = false
}
// And now, let's clean up the services that we no longer need to monitor
for serviceName, cancelFunc := range currentServices {
if _, ok := services[serviceName]; !ok {
// We call cancelFunc to stop the goroutine that watches Consul for this service
cancelFunc()
// We remove the service from our current watched services
delete(currentServices, serviceName)
// We send the delete event to the main watcher for propagation
servicesChan <- serviceEntries{
EventType: "delete",
Service: serviceName,
}
}
}
}
}
// watchConsulServiceHealthyEntries watches the health of a specific service
func (cc *App) watchConsulServiceHealthyEntries(ctx context.Context, serviceName string, servicesChan chan serviceEntries, servicesInitWaitGroup *sync.WaitGroup, signalInit bool) {
lastIndex := getLastIndex(fmt.Sprintf("consul-services-%s", serviceName))
for {
select {
case <-ctx.Done():
caddy.Log().Named(fmt.Sprintf("consul.watcher.services.health.%s", serviceName)).Info("Closing service watcher")
return
default:
queryOptions := &api.QueryOptions{
WaitIndex: lastIndex,
WaitTime: time.Minute * 5,
}
consulServiceEntries, meta, err := cc.client.Health().Service(serviceName, cc.AutoReverseProxy.ServicesTag, true, queryOptions.WithContext(ctx))
if err != nil {
// If we canceled the context, nothing wrong here
if ctx.Err() == context.Canceled {
return
}
caddy.Log().Named(fmt.Sprintf("consul.watcher.services.health.%s", serviceName)).Error(fmt.Sprintf("unable to request healthy service entries from Consul: %s", err))
time.Sleep(time.Second * 1)
continue
}
if meta == nil {
caddy.Log().Named(fmt.Sprintf("consul.watcher.services.health.%s", serviceName)).Error("Consul returned an empty meta... service doesn't exist anymore?")
time.Sleep(time.Second * 1)
continue
}
if lastIndex >= meta.LastIndex {
caddy.Log().Named(fmt.Sprintf("consul.watcher.services.health.%s", serviceName)).Debug("Consul index didn't change: this is just a request timeout")
continue
}
caddy.Log().Named(fmt.Sprintf("consul.watcher.services.health.%s", serviceName)).Debug("Service health updated")
lastIndex = storeLastIndex(fmt.Sprintf("consul-services-%s", serviceName), meta.LastIndex)
servicesChan <- serviceEntries{
EventType: "update",
Service: serviceName,
Entries: consulServiceEntries,
}
if signalInit {
servicesInitWaitGroup.Done()
signalInit = false
}
}
}
}
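// For reference, a sketch of the serviceEntries type used throughout this
// file. The real definition lives elsewhere in the package; the field names
// and types below are inferred from how the struct is used here:
//
//	type serviceEntries struct {
//		EventType string              // "update" or "delete"
//		Service   string              // Consul service name
//		Entries   []*api.ServiceEntry // healthy entries (nil on delete)
//	}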