Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delta support to orchestrator #1489

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 119 additions & 15 deletions lib/nerves_hub/deployments/orchestrator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule NervesHub.Deployments.Orchestrator do

alias NervesHub.Devices
alias NervesHub.Devices.Device
alias NervesHub.Firmwares
alias NervesHub.Repo
alias Phoenix.PubSub
alias Phoenix.Socket.Broadcast
Expand All @@ -32,6 +33,10 @@ defmodule NervesHub.Deployments.Orchestrator do
GenServer.cast(name(deployment_id), :trigger)
end

@doc """
Report the firmware UUID a device is running to the orchestrator of its deployment.

The report is delivered as an asynchronous cast, so the caller does not wait
for the orchestrator to process it.
"""
def report_version(deployment_id, firmware_uuid) when is_integer(deployment_id) do
  deployment_id
  |> name()
  |> GenServer.cast({:report, firmware_uuid})
end

@doc """
Trigger an update for a device on the local node

Expand All @@ -40,6 +45,7 @@ defmodule NervesHub.Deployments.Orchestrator do
- the deployment
- not updating
- not using the deployment's current firmware
- with deltas active: not running a firmware whose delta is still being generated

If there is space for the device based on the concurrent allowed updates
the device is told to update. This is not guaranteed to be at or under the
Expand All @@ -48,7 +54,8 @@ defmodule NervesHub.Deployments.Orchestrator do
As devices update and reconnect, the new orchestrator is told that the update
was successful, and the process is repeated.
"""
def trigger_update(deployment) do
def trigger_update(%{deployment: deployment} = state) do
deployment = Repo.preload(deployment, [:product, :firmware])
:telemetry.execute([:nerves_hub, :deployment, :trigger_update], %{count: 1})

match_conditions = [
Expand All @@ -58,6 +65,19 @@ defmodule NervesHub.Deployments.Orchestrator do
{:"/=", {:map_get, :firmware_uuid, :"$1"}, deployment.firmware.uuid}}
]

match_conditions =
if deployment.product.delta_updatable do
processing_deltas =
for {firmware_uuid, :processing} <- state.delta_status do
# Build rule to skip each firmware_uuid that is currently in delta processing
{:"/=", {:map_get, :firmware_uuid, :"$1"}, firmware_uuid}
end

match_conditions ++ processing_deltas
else
match_conditions
end

match_return = %{
device_id: {:element, 1, :"$_"},
pid: {:element, 1, {:element, 2, :"$_"}},
Expand Down Expand Up @@ -103,10 +123,16 @@ defmodule NervesHub.Deployments.Orchestrator do
end

# Builds the initial orchestrator state and defers the remaining setup to
# `handle_continue(:boot, state)`.
#
# The state is a map holding the deployment and a per-source-firmware delta
# status table, which starts out empty.
def init(deployment) do
  state = %{
    # fw_uuid => :ready | :processing | :needs_full
    delta_status: %{},
    deployment: deployment
  }

  {:ok, state, {:continue, :boot}}
end

def handle_continue(:boot, deployment) do
def handle_continue(:boot, %{deployment: deployment} = state) do
_ = PubSub.subscribe(NervesHub.PubSub, "deployment:#{deployment.id}")

# trigger every 5 minutes as a back up
Expand All @@ -115,32 +141,110 @@ defmodule NervesHub.Deployments.Orchestrator do
deployment =
deployment
|> Repo.reload()
|> Repo.preload([:firmware], force: true)
|> Repo.preload([:firmware, []], force: true)
|> Repo.preload([:product], force: true)

{:noreply, %{state | deployment: deployment}}
end

{:noreply, deployment}
# Explicit trigger request: run an update pass with the current state.
# The state itself is left untouched.
def handle_cast(:trigger, state) do
  _ = trigger_update(state)

  {:noreply, state}
end

def handle_cast(:trigger, deployment) do
trigger_update(deployment)
{:noreply, deployment}
# A device reported the firmware it is running. When that firmware is not the
# deployment's target and the product has delta updates enabled, make sure
# the delta status for that source firmware is resolved.
def handle_cast({:report, firmware_uuid}, %{deployment: deployment} = state) do
  on_target? = firmware_uuid == deployment.firmware.uuid

  new_state =
    if not on_target? and deployment.product.delta_updatable do
      ensure_delta_resolved(state, firmware_uuid)
    else
      state
    end

  {:noreply, new_state}
end

# Handles the "deployments/update" broadcast for this deployment.
#
# Reloads the deployment, refreshes the delta status table and runs an update
# pass with the refreshed state.
#
# The product is preloaded alongside the firmware because the `{:report, _}`
# cast and `ensure_delta_resolved/2` read `deployment.product` from the state,
# and the struct returned by `Repo.reload/1` has no associations loaded.
def handle_info(%Broadcast{event: "deployments/update"}, %{deployment: previous} = state) do
  deployment =
    previous
    |> Repo.reload()
    |> Repo.preload([:firmware, :product], force: true)

  delta_status =
    if deployment.firmware_id != previous.firmware_id do
      # When firmware changes we stop caring about the current delta sets.
      # This must be the bare table, not the whole state map.
      %{}
    else
      # This event fires when deltas are completed as well, so re-check every
      # delta that was still in processing. Other statuses are kept as they are.
      Map.new(state.delta_status, fn
        {firmware_uuid, :processing} ->
          # Same lookup as `ensure_delta_resolved/2`, which is what put this
          # entry into :processing in the first place.
          with {:ok, source_fw} <-
                 Firmwares.get_firmware_by_product_and_uuid(deployment.product, firmware_uuid),
               {:ok, _delta} <-
                 Firmwares.get_firmware_delta_by_source_and_target(
                   source_fw,
                   deployment.firmware
                 ) do
            {firmware_uuid, :ready}
          else
            _ -> {firmware_uuid, :processing}
          end

        entry ->
          entry
      end)
    end

  state = %{state | deployment: deployment, delta_status: delta_status}
  trigger_update(state)

  {:noreply, state}
end

# Catch all for unknown broadcasts on a deployment: ignore the message and
# keep the state unchanged.
def handle_info(%Broadcast{topic: "deployment:" <> _}, state), do: {:noreply, state}

# `:trigger` message delivered to the process mailbox: run an update pass
# with the current state and keep the state as it is.
def handle_info(:trigger, state) do
  _ = trigger_update(state)

  {:noreply, state}
end

# Makes sure a delta status is recorded for `firmware_uuid`.
#
# A firmware that already has a status is left alone. Otherwise the source
# firmware is looked up under the deployment's product: when found, delta
# resolution is attempted; when not, the firmware is marked `:needs_full`
# because no delta can be generated from it.
defp ensure_delta_resolved(%{deployment: deployment} = state, firmware_uuid) do
  case Map.get(state.delta_status, firmware_uuid) do
    nil ->
      case Firmwares.get_firmware_by_product_and_uuid(deployment.product, firmware_uuid) do
        {:ok, source_fw} ->
          attempt_resolve_delta(state, source_fw)

        {:error, _} ->
          Logger.warning(
            "Cannot trigger firmware delta generation from #{firmware_uuid}. Firmware does not exist for Deployment ID #{deployment.id}."
          )

          set_delta_status(state, firmware_uuid, :needs_full)
      end

    _already_resolved ->
      state
  end
end

# Records whether a delta from `source_fw` to the deployment's firmware is
# available. An existing delta marks the source as `:ready`; a missing one
# starts the delta builder worker and marks the source as `:processing`.
defp attempt_resolve_delta(state, source_fw) do
  target_fw = state.deployment.firmware

  case Firmwares.get_firmware_delta_by_source_and_target(source_fw, target_fw) do
    {:ok, _fw_delta} ->
      set_delta_status(state, source_fw.uuid, :ready)

    {:error, :not_found} ->
      NervesHub.Workers.FirmwareDeltaBuilder.start(source_fw.id, target_fw.id)
      set_delta_status(state, source_fw.uuid, :processing)
  end
end

def handle_info(:trigger, deployment) do
trigger_update(deployment)
{:noreply, deployment}
# Returns the state with `status` stored for `firmware_uuid` in the delta
# status table.
defp set_delta_status(state, firmware_uuid, status) do
  updated_table = Map.put(state.delta_status, firmware_uuid, status)

  %{state | delta_status: updated_table}
end
end
27 changes: 25 additions & 2 deletions lib/nerves_hub/devices.ex
Original file line number Diff line number Diff line change
Expand Up @@ -680,11 +680,20 @@ defmodule NervesHub.Devices do
end

def resolve_update(device) do
deployment = Repo.preload(device.deployment, [:firmware])
deployment =
device.deployment
|> Repo.preload(:firmware)
|> Repo.preload(:product)

case verify_update_eligibility(device, deployment) do
{:ok, _device} ->
{:ok, url} = Firmwares.get_firmware_url(deployment.firmware)
{:ok, url} =
if deployment.product.delta_updatable do
get_delta_or_firmware_url(device.firmware_metadata.uuid, deployment.firmware)
else
Firmwares.get_firmware_url(deployment.firmware)
end

{:ok, meta} = Firmwares.metadata_from_firmware(deployment.firmware)

%UpdatePayload{
Expand All @@ -703,6 +712,20 @@ defmodule NervesHub.Devices do
end
end

# Picks the download URL for an update: the delta from the device's current
# firmware to `target_firmware` when both the source firmware and the delta
# can be found, otherwise the full target firmware.
#
# NOTE(review): this assumes Firmwares.get_firmware_by_uuid/1 returns a bare
# %Firmware{} rather than {:ok, firmware}; if it returns a tagged tuple this
# always falls back to the full firmware. Confirm against Firmwares.
defp get_delta_or_firmware_url(device_firmware_uuid, target_firmware) do
  case Firmwares.get_firmware_by_uuid(device_firmware_uuid) do
    %Firmware{} = device_firmware ->
      case Firmwares.get_firmware_delta_by_source_and_target(device_firmware, target_firmware) do
        {:ok, firmware_delta} ->
          Firmwares.get_firmware_url(firmware_delta)

        _ ->
          # When a resolve has been triggered, even with delta support on
          # it is best to deliver a firmware even if we can't get a delta.
          # This could be typical for some manual deployments.
          Firmwares.get_firmware_url(target_firmware)
      end

    _ ->
      # Unknown source firmware: same fallback to the full target firmware.
      Firmwares.get_firmware_url(target_firmware)
  end
end

@spec delta_updatable?(
source :: Firmware.t(),
target :: Firmware.t(),
Expand Down
49 changes: 11 additions & 38 deletions lib/nerves_hub_web/channels/device_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,6 @@ defmodule NervesHubWeb.DeviceChannel do
# we might make a new one right below it, so clear it beforehand
Devices.clear_inflight_update(device)

# Let the orchestrator handle this going forward ?
update_payload = Devices.resolve_update(device)

push_update? =
update_payload.update_available and not is_nil(update_payload.firmware_url) and
update_payload.firmware_meta[:uuid] != params["currently_downloading_uuid"]

maybe_push_update(socket, update_payload, device, push_update?)

## After join
:telemetry.execute([:nerves_hub, :devices, :connect], %{count: 1}, %{
ref_id: socket.assigns.reference_id,
Expand All @@ -80,7 +71,7 @@ defmodule NervesHubWeb.DeviceChannel do
deployment_id: device.deployment_id,
firmware_uuid: device.firmware_metadata.uuid,
updates_enabled: device.updates_enabled && !Devices.device_in_penalty_box?(device),
updating: push_update?
updating: false
}

Map.merge(value, update)
Expand All @@ -89,10 +80,18 @@ defmodule NervesHubWeb.DeviceChannel do
# Cluster tracking
Tracker.online(device)

if device.deployment_id do
# Report firmware version to deployment orchestrator allowing it to start cooking delta updates
NervesHub.Deployments.Orchestrator.report_version(
device.deployment_id,
device.firmware_metadata.uuid
)
end

socket =
socket
|> assign(:device, device)
|> assign(:update_started?, push_update?)
|> assign(:update_started?, false)
|> assign(:penalty_timer, nil)
|> maybe_start_penalty_timer()
|> maybe_send_archive()
Expand Down Expand Up @@ -576,32 +575,6 @@ defmodule NervesHubWeb.DeviceChannel do
:ok
end

# Pushes an update to the device on connect when the last argument is `true`:
# sends the "update" message, writes an audit log entry against the
# deployment and records that the device was told to update.
defp maybe_push_update(socket, update_payload, device, true) do
  push(socket, "update", update_payload)

  deployment = device.deployment

  audit_message =
    "device #{device.identifier} received update for firmware #{deployment.firmware.version}(#{deployment.firmware.uuid}) via deployment #{deployment.name} on connect"

  AuditLogs.audit_with_ref!(deployment, device, audit_message, socket.assigns.reference_id)

  # Track the update that was just handed out.
  _ = Devices.told_to_update(device, deployment)

  :ok
end

# Nothing to push.
defp maybe_push_update(_socket, _update_payload, _device, false), do: :ok

defp subscribe(topic) do
_ = Phoenix.PubSub.subscribe(NervesHub.PubSub, topic)
:ok
Expand Down Expand Up @@ -742,7 +715,7 @@ defmodule NervesHubWeb.DeviceChannel do
end

# Force-reloads the device's deployment together with its product, archive
# and firmware associations. A single preload is enough; a preceding preload
# whose result is discarded only adds queries.
defp deployment_preload(device) do
  Repo.preload(device, [deployment: [:product, :archive, :firmware]], force: true)
end

defp maybe_send_archive(socket) do
Expand Down
Loading
Loading