Skip to content

Commit

Permalink
Update Prometheus metrics. Bump (test) version.
Browse files Browse the repository at this point in the history
  • Loading branch information
brocaar committed May 3, 2019
1 parent f511d1c commit 3c934d3
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 121 deletions.
2 changes: 1 addition & 1 deletion docs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,5 @@ pygmentsUseClasses = true
weight = 7

[params]
version = "3.0.0-test.2"
version = "3.0.0-test.3"

4 changes: 4 additions & 0 deletions docs/content/overview/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ now used for selecting and configuring the packet-forwarder backends.
The MQTT integration configuration has moved to the new `[integration...]`
section. This allows for adding new integrations in the future besides MQTT.

#### Prometheus metrics

The Prometheus metrics have been updated / cleaned up.

## v2.7.1

### Bugfixes
Expand Down
9 changes: 9 additions & 0 deletions internal/backend/basicstation/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ func NewBackend(conf config.Config) (*Backend, error) {
b.websocketWrap(b.handleRouterInfo, w, r)
})
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
bsEventCounter("connect")
b.websocketWrap(b.handleGateway, w, r)
bsEventCounter("disconnect")
})

// using net.Listen makes it easier to test as we can bind to ":0" and
Expand Down Expand Up @@ -197,6 +199,7 @@ func (b *Backend) SendDownlinkFrame(df gw.DownlinkFrame) error {
var gatewayID lorawan.EUI64
copy(gatewayID[:], df.TxInfo.GatewayId)

bsWebsocketSendCounter("dnmsg")
if err := b.sendToGateway(gatewayID, pl); err != nil {
return errors.Wrap(err, "send to gateway error")
}
Expand All @@ -215,6 +218,7 @@ func (b *Backend) ApplyConfiguration(gwConfig gw.GatewayConfiguration) error {
var gatewayID lorawan.EUI64
copy(gatewayID[:], gwConfig.GatewayId)

bsWebsocketSendCounter("router_config")
if err := b.sendToGateway(gatewayID, rc); err != nil {
return errors.Wrap(err, "send router config to gateway error")
}
Expand All @@ -231,6 +235,7 @@ func (b *Backend) Close() error {
}

func (b *Backend) handleRouterInfo(r *http.Request, c *websocket.Conn) {
bsWebsocketReceiveCounter("router_info")
var req structs.RouterInfoRequest

if err := c.ReadJSON(&req); err != nil {
Expand Down Expand Up @@ -326,6 +331,8 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) {
continue
}

bsWebsocketReceiveCounter(string(msgType))

// handle message-type
switch msgType {
case structs.VersionMessage:
Expand Down Expand Up @@ -517,6 +524,7 @@ func (b *Backend) websocketWrap(handler func(*http.Request, *websocket.Conn), w

conn.SetReadDeadline(time.Now().Add(b.readTimeout))
conn.SetPongHandler(func(string) error {
bsWebsocketPingPongCounter("pong")
conn.SetReadDeadline(time.Now().Add(b.readTimeout))
return nil
})
Expand All @@ -528,6 +536,7 @@ func (b *Backend) websocketWrap(handler func(*http.Request, *websocket.Conn), w
for {
select {
case <-ticker.C:
bsWebsocketPingPongCounter("ping")
conn.SetWriteDeadline(time.Now().Add(b.writeTimeout))
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
log.WithError(err).Error("backend/basicstation: send ping message error")
Expand Down
56 changes: 56 additions & 0 deletions internal/backend/basicstation/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package basicstation

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/brocaar/lora-gateway-bridge/internal/metrics"
)

var (
bsEventCounter func(string)
bsWebsocketSendCounter func(string)
bsWebsocketReceiveCounter func(string)
bsWebsocketPingPongCounter func(string)
)

func init() {
ec := metrics.MustRegisterNewCounter(
"backend_basicstation_event",
"Per gateway event type counter.",
[]string{"event"},
)

wsc := metrics.MustRegisterNewCounter(
"backend_basicstation_websocket_send",
"Per message-type websocket write counter.",
[]string{"msgtype"},
)

wrc := metrics.MustRegisterNewCounter(
"backend_basicstation_websocket_receive",
"Per message-type websocket receive counter.",
[]string{"msgtype"},
)

ppc := metrics.MustRegisterNewCounter(
"backend_basicstation_websocket_ping_pong",
"Websocket Ping/Pong counter.",
[]string{"type"},
)

bsEventCounter = func(event string) {
ec(prometheus.Labels{"event": event})
}

bsWebsocketReceiveCounter = func(msgtype string) {
wsc(prometheus.Labels{"msgtype": msgtype})
}

bsWebsocketSendCounter = func(msgtype string) {
wrc(prometheus.Labels{"msgtype": msgtype})
}

bsWebsocketPingPongCounter = func(typ string) {
ppc(prometheus.Labels{"type": typ})
}
}
63 changes: 30 additions & 33 deletions internal/backend/semtechudp/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,25 +192,25 @@ func (b *Backend) SendDownlinkFrame(frame gw.DownlinkFrame) error {
// ApplyConfiguration applies the given configuration to the gateway
// (packet-forwarder).
func (b *Backend) ApplyConfiguration(config gw.GatewayConfiguration) error {
return gatewayConfigHandleTimer(func() error {
var gatewayID lorawan.EUI64
copy(gatewayID[:], config.GatewayId)

b.Lock()
var pfConfig *pfConfiguration
for i := range b.configurations {
if b.configurations[i].gatewayID == gatewayID {
pfConfig = &b.configurations[i]
}
}
b.Unlock()
eventCounter("configuration")

var gatewayID lorawan.EUI64
copy(gatewayID[:], config.GatewayId)

if pfConfig == nil {
return errGatewayDoesNotExist
b.Lock()
var pfConfig *pfConfiguration
for i := range b.configurations {
if b.configurations[i].gatewayID == gatewayID {
pfConfig = &b.configurations[i]
}
}
b.Unlock()

return b.applyConfiguration(*pfConfig, config)
})
if pfConfig == nil {
return errGatewayDoesNotExist
}

return b.applyConfiguration(*pfConfig, config)
}

func (b *Backend) applyConfiguration(pfConfig pfConfiguration, config gw.GatewayConfiguration) error {
Expand Down Expand Up @@ -315,11 +315,8 @@ func (b *Backend) sendPackets() error {
"protocol_version": p.data[0],
}).Debug("backend/semtechudp: sending udp packet to gateway")

err = gatewayWriteUDPTimer(pt.String(), func() error {
_, err := b.conn.WriteToUDP(p.data, p.addr)
return err
})

udpWriteCounter(pt.String())
_, err = b.conn.WriteToUDP(p.data, p.addr)
if err != nil {
log.WithFields(log.Fields{
"addr": p.addr,
Expand Down Expand Up @@ -349,18 +346,18 @@ func (b *Backend) handlePacket(up udpPacket) error {
"protocol_version": up.data[0],
}).Debug("backend/semtechudp: received udp packet from gateway")

return gatewayHandleTimer(pt.String(), func() error {
switch pt {
case packets.PushData:
return b.handlePushData(up)
case packets.PullData:
return b.handlePullData(up)
case packets.TXACK:
return b.handleTXACK(up)
default:
return fmt.Errorf("backend/semtechudp: unknown packet type: %s", pt)
}
})
udpReadCounter(pt.String())

switch pt {
case packets.PushData:
return b.handlePushData(up)
case packets.PullData:
return b.handlePullData(up)
case packets.TXACK:
return b.handleTXACK(up)
default:
return fmt.Errorf("backend/semtechudp: unknown packet type: %s", pt)
}
}

func (b *Backend) handlePullData(up udpPacket) error {
Expand Down
47 changes: 18 additions & 29 deletions internal/backend/semtechudp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,50 +7,39 @@ import (
)

var (
gatewayEventCounter func(string)
gatewayWriteUDPTimer func(string, func() error) error
gatewayHandleTimer func(string, func() error) error
gatewayConfigHandleTimer func(func() error) error
eventCounter func(string)
udpWriteCounter func(string)
udpReadCounter func(string)
)

func init() {
ec := metrics.MustRegisterNewCounter(
"gateway_event",
"Per event type counter.",
"backend_semtechudp_event",
"Per gateway event type counter.",
[]string{"event"},
)

wt := metrics.MustRegisterNewTimerWithError(
"gateway_udp_write",
"Per message-type UDP write duration tracking.",
[]string{"type"},
uwc := metrics.MustRegisterNewCounter(
"backend_semtechudp_udp_write",
"UDP packets written by packet type.",
[]string{"packet_type"},
)

ht := metrics.MustRegisterNewTimerWithError(
"gateway_udp_received_handle",
"Per message-type received UDP handling duration tracking.",
[]string{"type"},
urc := metrics.MustRegisterNewCounter(
"backend_semtechudp_udp_read",
"UDP packets read by packet type.",
[]string{"packet_type"},
)

ch := metrics.MustRegisterNewTimerWithError(
"gateway_config_handle",
"Tracks the duration of configuration handling.",
[]string{},
)

gatewayEventCounter = func(event string) {
eventCounter = func(event string) {
ec(prometheus.Labels{"event": event})
}

gatewayWriteUDPTimer = func(mType string, f func() error) error {
return wt(prometheus.Labels{"type": mType}, f)
}

gatewayHandleTimer = func(mType string, f func() error) error {
return ht(prometheus.Labels{"type": mType}, f)
udpWriteCounter = func(pt string) {
uwc(prometheus.Labels{"packet_type": pt})
}

gatewayConfigHandleTimer = func(f func() error) error {
return ch(prometheus.Labels{}, f)
udpReadCounter = func(pt string) {
urc(prometheus.Labels{"packet_type": pt})
}
}
4 changes: 2 additions & 2 deletions internal/backend/semtechudp/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (c *gateways) set(gatewayID lorawan.EUI64, gw gateway) error {

_, ok := c.gateways[gatewayID]
if !ok {
gatewayEventCounter("register_gateway")
eventCounter("connect")
c.connectChan <- gatewayID
}
c.gateways[gatewayID] = gw
Expand All @@ -68,7 +68,7 @@ func (c *gateways) cleanup() error {

for gatewayID := range c.gateways {
if c.gateways[gatewayID].lastSeen.Before(time.Now().Add(gatewayCleanupDuration)) {
gatewayEventCounter("unregister_gateway")
eventCounter("disconnect")
c.disconnectChan <- gatewayID
delete(c.gateways, gatewayID)
}
Expand Down
Loading

0 comments on commit 3c934d3

Please sign in to comment.