diff --git a/config/config.exs b/config/config.exs index 65798b023..fcab0fdcf 100644 --- a/config/config.exs +++ b/config/config.exs @@ -69,8 +69,9 @@ config :nerves_hub, Oban, {Oban.Plugins.Pruner, max_age: 604_800}, {Oban.Plugins.Cron, crontab: [ - {"0 * * * *", NervesHub.Workers.ScheduleOrgAuditLogTruncation, max_attempts: 1}, - {"*/1 * * * *", NervesHub.Workers.CleanDeviceConnectionStates}, + {"0 * * * *", NervesHub.Workers.ScheduleOrgAuditLogTruncation}, + {"*/1 * * * *", NervesHub.Workers.CleanStaleDeviceConnections}, + {"0 */1 * * *", NervesHub.Workers.DeleteOldDeviceConnections}, {"*/5 * * * *", NervesHub.Workers.ExpireInflightUpdates}, {"*/15 * * * *", NervesHub.Workers.DeviceHealthTruncation} ]} diff --git a/config/runtime.exs b/config/runtime.exs index 96293a2c4..ebbc2d7d4 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -27,6 +27,10 @@ config :nerves_hub, String.to_integer(System.get_env("DEVICE_DEPLOYMENT_CHANGE_JITTER_SECONDS", "10")), device_last_seen_update_interval_minutes: String.to_integer(System.get_env("DEVICE_LAST_SEEN_UPDATE_INTERVAL_MINUTES", "5")), + device_connection_max_age_days: + String.to_integer(System.get_env("DEVICE_CONNECTION_MAX_AGE_DAYS", "14")), + device_connection_delete_limit: + String.to_integer(System.get_env("DEVICE_CONNECTION_DELETE_LIMIT", "100000")), deployment_calculator_interval_seconds: String.to_integer(System.get_env("DEPLOYMENT_CALCULATOR_INTERVAL_SECONDS", "3600")), mapbox_access_token: System.get_env("MAPBOX_ACCESS_TOKEN"), diff --git a/lib/mix/tasks/gen.devices.ex b/lib/mix/tasks/gen.devices.ex index 7055fd774..bcec8370e 100644 --- a/lib/mix/tasks/gen.devices.ex +++ b/lib/mix/tasks/gen.devices.ex @@ -40,9 +40,6 @@ defmodule Mix.Tasks.NervesHub.Gen.Devices do org_id: org.id, product_id: product.id, identifier: "generated-#{i}", - connection_status: :connected, - connection_established_at: DateTime.now!("Etc/UTC"), - connection_last_seen_at: DateTime.now!("Etc/UTC"), connection_metadata: %{ "location" => %{ "longitude" => lng, diff --git a/lib/nerves_hub/devices.ex b/lib/nerves_hub/devices.ex index 8006a5786..867688562 100644 --- a/lib/nerves_hub/devices.ex +++ b/lib/nerves_hub/devices.ex @@ -13,7 +13,6 @@ defmodule NervesHub.Devices do alias NervesHub.Deployments.Deployment alias NervesHub.Deployments.Orchestrator alias NervesHub.Devices.CACertificate - alias NervesHub.Devices.Connections alias NervesHub.Devices.Device alias NervesHub.Devices.DeviceCertificate alias NervesHub.Devices.DeviceHealth @@ -36,13 +35,6 @@ defmodule NervesHub.Devices do Repo.get(Device, device_id) end - def get_device(device_id, :preload_latest_connection) when is_integer(device_id) do - Device - |> where(id: ^device_id) - |> Connections.preload_latest_connection() - |> Repo.one() - end - def get_active_device(filters) do Device |> Repo.exclude_deleted() @@ -61,15 +53,6 @@ defmodule NervesHub.Devices do |> Repo.all() end - def get_devices_by_org_id_and_product_id(org_id, product_id, :preload_latest_connection) do - Device - |> where([d], d.org_id == ^org_id) - |> where([d], d.product_id == ^product_id) - |> Connections.preload_latest_connection() - |> Repo.exclude_deleted() - |> Repo.all() - end - def get_devices_by_org_id_and_product_id(org_id, product_id, opts) do {entries, _pager} = get_devices_by_org_id_and_product_id_with_pager(org_id, product_id, opts) entries @@ -89,11 +72,16 @@ defmodule NervesHub.Devices do |> join(:left, [d, o], p in assoc(d, :product)) |> join(:left, [d, o, p], dp in assoc(d, :deployment)) |> join(:left, [d, o, p, dp], f in assoc(dp, :firmware)) + |> join(:left, [d, o, p, dp, f], lc in assoc(d, :latest_connection), as: :latest_connection) |> Repo.exclude_deleted() - |> order_by(^sort_devices(sorting)) + |> sort_devices(sorting) |> Filtering.build_filters(filters) - |> preload([d, o, p, dp, f], org: o, product: p, deployment: {dp, firmware: f}) - |> Connections.preload_latest_connection() + |> preload([d, o, p, dp, f, latest_connection: lc], + org: o, + product: p, + deployment: {dp, firmware: f}, + latest_connection: lc + ) |> Flop.run(flop) end @@ -126,10 +114,11 @@ defmodule NervesHub.Devices do Device |> where([d], d.product_id == ^product_id) - |> Connections.preload_latest_connection() |> Repo.exclude_deleted() + |> join(:left, [d], dc in assoc(d, :latest_connection), as: :latest_connection) + |> preload([latest_connection: lc], latest_connection: lc) |> Filtering.build_filters(filters) - |> order_by(^sort_devices(sorting)) + |> sort_devices(sorting) |> Flop.run(flop) |> then(fn {entries, meta} -> meta @@ -145,6 +134,7 @@ defmodule NervesHub.Devices do def get_minimal_device_location_by_org_id_and_product_id(org_id, product_id) do Device + |> join(:inner, [d], dc in DeviceConnection, on: d.latest_connection_id == dc.id) |> where(org_id: ^org_id) |> where(product_id: ^product_id) |> where([d], not is_nil(fragment("?->'location'->'latitude'", d.connection_metadata))) @@ -152,7 +142,7 @@ defmodule NervesHub.Devices do |> select([d, dc], %{ id: d.id, identifier: d.identifier, - connection_status: d.connection_status, + connection_status: dc.connection_status, latitude: fragment("?->'location'->'latitude'", d.connection_metadata), longitude: fragment("?->'location'->'longitude'", d.connection_metadata), firmware_uuid: fragment("?->'uuid'", d.firmware_metadata) @@ -204,13 +194,19 @@ defmodule NervesHub.Devices do end) end - defp sort_devices({:asc, :connection_last_seen_at}), - do: {:asc_nulls_first, :connection_last_seen_at} + defp sort_devices(query, {:asc, :connection_last_seen_at}) do + order_by(query, [latest_connection: latest_connection], + desc_nulls_last: latest_connection.last_seen_at + ) + end - defp sort_devices({:desc, :connection_last_seen_at}), - do: {:desc_nulls_last, :connection_last_seen_at} + defp sort_devices(query, {:desc, :connection_last_seen_at}) do + order_by(query, [latest_connection: latest_connection], + asc_nulls_first: latest_connection.last_seen_at + ) + end - defp sort_devices(sort), do: sort + defp sort_devices(query, sort), do: order_by(query, [], ^sort) def get_device_count_by_org_id(org_id) do q = @@ -299,7 +295,8 @@ defmodule NervesHub.Devices do defp join_and_preload(query, :latest_connection) do query - |> Connections.preload_latest_connection() + |> join(:left, [d], dc in assoc(d, :latest_connection), as: :latest_connection) + |> preload([latest_connection: lc], latest_connection: lc) end @spec get_shared_secret_auth(String.t()) :: @@ -655,28 +652,6 @@ defmodule NervesHub.Devices do |> Repo.all() end - def clean_connection_states() do - interval = Application.get_env(:nerves_hub, :device_last_seen_update_interval_minutes) - a_minute_ago = DateTime.shift(DateTime.utc_now(), minute: -(interval + 1)) - - Device - |> where(connection_status: :connected) - |> where([d], d.connection_last_seen_at < ^a_minute_ago) - |> Repo.update_all( - set: [ - connection_status: :disconnected, - connection_disconnected_at: DateTime.utc_now() - ] - ) - end - - def connected_count(product) do - Device - |> where(connection_status: :connected) - |> where(product_id: ^product.id) - |> Repo.aggregate(:count) - end - def update_firmware_metadata(device, nil) do {:ok, device} end diff --git a/lib/nerves_hub/devices/connections.ex b/lib/nerves_hub/devices/connections.ex index 08279283f..d3587b09c 100644 --- a/lib/nerves_hub/devices/connections.ex +++ b/lib/nerves_hub/devices/connections.ex @@ -31,15 +31,6 @@ defmodule NervesHub.Devices.Connections do |> Repo.one() end - @doc """ - Preload latest respective connection in a device query. - """ - @spec preload_latest_connection(Ecto.Query.t()) :: Ecto.Query.t() - def preload_latest_connection(query) do - query - |> preload(device_connections: ^distinct_on_device()) - end - @doc """ Creates a device connection, reported from device socket """ @@ -48,14 +39,25 @@ defmodule NervesHub.Devices.Connections do def device_connected(device_id) do now = DateTime.utc_now() - %{ - device_id: device_id, - established_at: now, - last_seen_at: now, - status: :connected - } - |> DeviceConnection.create_changeset() - |> Repo.insert() + changeset = + DeviceConnection.create_changeset(%{ + device_id: device_id, + established_at: now, + last_seen_at: now, + status: :connected + }) + + case Repo.insert(changeset) do + {:ok, device_connection} -> + Device + |> where(id: ^device_id) + |> Repo.update_all(set: [latest_connection_id: device_connection.id]) + + {:ok, device_connection} + + {:error, _} = error -> + error + end end @doc """ @@ -89,46 +91,6 @@ defmodule NervesHub.Devices.Connections do |> Repo.update() end - @doc """ - Selects devices id's which has provided status in it's latest connection record. - """ - @spec query_devices_with_connection_status(String.t()) :: Ecto.Query.t() - def query_devices_with_connection_status(status) do - (lr in subquery(latest_row_query())) - |> from() - |> where([lr], lr.rn == 1) - |> where( - [lr], - lr.status == ^String.to_existing_atom(status) - ) - |> join(:inner, [lr], d in Device, on: lr.device_id == d.id) - |> select([lr, d], d.id) - end - - @doc """ - Generates a query to retrieve the most recent `DeviceConnection` for devices. - The query includes the row number (`rn`) - for each record, which is used to identify the most recent connection. - - Returns an Ecto query. - """ - @spec latest_row_query() :: Ecto.Query.t() - def latest_row_query() do - DeviceConnection - |> select([dc], %{ - device_id: dc.device_id, - status: dc.status, - last_seen_at: dc.last_seen_at, - rn: row_number() |> over(partition_by: dc.device_id, order_by: [desc: dc.last_seen_at]) - }) - end - - defp distinct_on_device() do - DeviceConnection - |> distinct(:device_id) - |> order_by([:device_id, desc: :last_seen_at]) - end - def clean_stale_connections() do interval = Application.get_env(:nerves_hub, :device_last_seen_update_interval_minutes) a_minute_ago = DateTime.shift(DateTime.utc_now(), minute: -(interval + 1)) @@ -144,4 +106,32 @@ defmodule NervesHub.Devices.Connections do ] ) end + + def delete_old_connections() do + interval = Application.get_env(:nerves_hub, :device_connection_max_age_days) + delete_limit = Application.get_env(:nerves_hub, :device_connection_delete_limit) + days_ago = DateTime.shift(DateTime.utc_now(), day: -interval) + + query = + DeviceConnection + |> join(:inner, [dc], d in Device, on: dc.device_id == d.id) + |> where([dc, _d], dc.last_seen_at < ^days_ago) + |> where([dc, _d], dc.status != :connected) + |> where([dc, d], dc.id != d.latest_connection_id) + |> select([dc], dc.id) + |> limit(^delete_limit) + + {delete_count, _} = + DeviceConnection + |> where([d], d.id in subquery(query)) + |> Repo.delete_all() + + if delete_count == 0 do + :ok + else + # relax stress on Ecto pool and go again + Process.sleep(2000) + delete_old_connections() + end + end end diff --git a/lib/nerves_hub/devices/device.ex b/lib/nerves_hub/devices/device.ex index cb17b1399..072b9951b 100644 --- a/lib/nerves_hub/devices/device.ex +++ b/lib/nerves_hub/devices/device.ex @@ -26,10 +26,6 @@ defmodule NervesHub.Devices.Device do :updates_blocked_until, :connecting_code, :deployment_id, - :connection_status, - :connection_established_at, - :connection_disconnected_at, - :connection_last_seen_at, :connection_types, :connection_metadata, :status, @@ -41,6 +37,7 @@ defmodule NervesHub.Devices.Device do belongs_to(:org, Org, where: [deleted_at: nil]) belongs_to(:product, Product, where: [deleted_at: nil]) belongs_to(:deployment, Deployment) + belongs_to(:latest_connection, DeviceConnection, type: :binary_id) embeds_one(:firmware_metadata, FirmwareMetadata, on_replace: :update) has_many(:device_certificates, DeviceCertificate, on_delete: :delete_all) has_many(:device_connections, DeviceConnection, on_delete: :delete_all) @@ -67,15 +64,15 @@ defmodule NervesHub.Devices.Device do timestamps() - # Deprecated fields, replaced with device_connections table. - field(:connection_status, Ecto.Enum, - values: [:connected, :disconnected, :not_seen], - default: :not_seen - ) - - field(:connection_established_at, :utc_datetime) - field(:connection_disconnected_at, :utc_datetime) - field(:connection_last_seen_at, :utc_datetime) + # Deprecated fields, remove these any time after 29/1/2025. + # Also remove index from NervesHub.Repo.Migrations.AddConnectionStatusIndexToDevices. + # field(:connection_status, Ecto.Enum, + # values: [:connected, :disconnected, :not_seen], + # default: :not_seen + # ) + # field(:connection_established_at, :utc_datetime) + # field(:connection_disconnected_at, :utc_datetime) + # field(:connection_last_seen_at, :utc_datetime) embeds_one(:extensions, DeviceExtensionsSetting, on_replace: :update) end diff --git a/lib/nerves_hub/devices/filtering.ex b/lib/nerves_hub/devices/filtering.ex index c422534af..897dbc6ea 100644 --- a/lib/nerves_hub/devices/filtering.ex +++ b/lib/nerves_hub/devices/filtering.ex @@ -6,7 +6,6 @@ defmodule NervesHub.Devices.Filtering do import Ecto.Query alias NervesHub.Devices.Alarms - alias NervesHub.Devices.Connections alias NervesHub.Devices.DeviceMetric alias NervesHub.Types.Tag @@ -43,11 +42,7 @@ defmodule NervesHub.Devices.Filtering do if value == "not_seen" do where(query, [d], d.status == :registered) else - where( - query, - [d], - d.id in subquery(Connections.query_devices_with_connection_status(value)) - ) + where(query, [latest_connection: lc], lc.status == ^value) end end @@ -130,9 +125,9 @@ defmodule NervesHub.Devices.Filtering do {value_as_float, _} = Float.parse(value) query - |> join(:inner, [d], m in DeviceMetric, on: d.id == m.device_id) - |> where([_, m], m.inserted_at == subquery(latest_metric_for_key(key))) - |> where([d, m], m.key == ^key) + |> join(:inner, [d], m in DeviceMetric, on: d.id == m.device_id, as: :device_metric) + |> where([device_metric: dm], dm.inserted_at == subquery(latest_metric_for_key(key))) + |> where([device_metric: dm], dm.key == ^key) |> gt_or_lt(value_as_float, operator) end @@ -144,6 +139,6 @@ defmodule NervesHub.Devices.Filtering do |> where([dm], dm.key == ^key) end - defp gt_or_lt(query, value, "gt"), do: where(query, [_, dm], dm.value > ^value) - defp gt_or_lt(query, value, "lt"), do: where(query, [_, dm], dm.value < ^value) + defp gt_or_lt(query, value, "gt"), do: where(query, [device_metric: dm], dm.value > ^value) + defp gt_or_lt(query, value, "lt"), do: where(query, [device_metric: dm], dm.value < ^value) end diff --git a/lib/nerves_hub/tracker.ex b/lib/nerves_hub/tracker.ex index 23f2d6212..e7e6f6e38 100644 --- a/lib/nerves_hub/tracker.ex +++ b/lib/nerves_hub/tracker.ex @@ -4,6 +4,7 @@ defmodule NervesHub.Tracker do """ alias NervesHub.Devices.Device + alias NervesHub.Repo def online(%{} = device) do online(device.identifier) @@ -69,7 +70,10 @@ defmodule NervesHub.Tracker do Returns `true` if device's latest connections has a status of `:connected`, otherwise `false`. """ - def online?(%{device_connections: [%{status: :connected}]}), do: true + def online?(%{latest_connection: %Ecto.Association.NotLoaded{}} = device), + do: online?(Repo.preload(device, :latest_connection)) + + def online?(%{latest_connection: %{status: :connected}}), do: true def online?(_), do: false @doc """ diff --git a/lib/nerves_hub/workers/clean_device_connection_states.ex b/lib/nerves_hub/workers/clean_device_connection_states.ex deleted file mode 100644 index 94accc644..000000000 --- a/lib/nerves_hub/workers/clean_device_connection_states.ex +++ /dev/null @@ -1,16 +0,0 @@ -defmodule NervesHub.Workers.CleanDeviceConnectionStates do - use Oban.Worker, - max_attempts: 5, - queue: :device - - alias NervesHub.Devices - alias NervesHub.Devices.Connections - - @impl Oban.Worker - def perform(_) do - Devices.clean_connection_states() - Connections.clean_stale_connections() - - :ok - end -end diff --git a/lib/nerves_hub/workers/clean_stale_device_connections.ex b/lib/nerves_hub/workers/clean_stale_device_connections.ex new file mode 100644 index 000000000..8cb770254 --- /dev/null +++ b/lib/nerves_hub/workers/clean_stale_device_connections.ex @@ -0,0 +1,14 @@ +defmodule NervesHub.Workers.CleanStaleDeviceConnections do + use Oban.Worker, + max_attempts: 5, + queue: :device + + alias NervesHub.Devices.Connections + + @impl Oban.Worker + def perform(_) do + _ = Connections.clean_stale_connections() + + :ok + end +end diff --git a/lib/nerves_hub/workers/delete_old_device_connections.ex b/lib/nerves_hub/workers/delete_old_device_connections.ex new file mode 100644 index 000000000..87c651d3f --- /dev/null +++ b/lib/nerves_hub/workers/delete_old_device_connections.ex @@ -0,0 +1,14 @@ +defmodule NervesHub.Workers.DeleteOldDeviceConnections do + use Oban.Worker, + max_attempts: 5, + queue: :device + + alias NervesHub.Devices.Connections + + @impl Oban.Worker + def perform(_) do + _ = Connections.delete_old_connections() + + :ok + end +end diff --git a/lib/nerves_hub/workers/schedule_org_audit_log_truncation.ex b/lib/nerves_hub/workers/schedule_org_audit_log_truncation.ex index 695de1a75..1462a24fc 100644 --- a/lib/nerves_hub/workers/schedule_org_audit_log_truncation.ex +++ b/lib/nerves_hub/workers/schedule_org_audit_log_truncation.ex @@ -1,6 +1,7 @@ defmodule NervesHub.Workers.ScheduleOrgAuditLogTruncation do use Oban.Worker, - queue: :truncation + queue: :truncation, + max_attempts: 1 alias NervesHub.Accounts alias NervesHub.Workers.OrgAuditLogTruncation diff --git a/lib/nerves_hub_web/live/devices/index-new.html.heex b/lib/nerves_hub_web/live/devices/index-new.html.heex index 800c27d93..c5c2c3cdc 100644 --- a/lib/nerves_hub_web/live/devices/index-new.html.heex +++ b/lib/nerves_hub_web/live/devices/index-new.html.heex @@ -117,7 +117,7 @@