diff --git a/lib/que/helpers.ex b/lib/que/helpers.ex index 2d3e9c6..db1c519 100644 --- a/lib/que/helpers.ex +++ b/lib/que/helpers.ex @@ -32,9 +32,9 @@ defmodule Que.Helpers do @doc """ Off-loads tasks to custom `Que.TaskSupervisor` """ - @spec do_task((() -> any)) :: {:ok, pid} + @spec do_task((() -> any)) :: Task.t() def do_task(fun) do - Task.Supervisor.start_child(Que.TaskSupervisor, fun) + Task.Supervisor.async_nolink(Que.TaskSupervisor, fun) end diff --git a/lib/que/job.ex b/lib/que/job.ex index d8b97dc..4f75a95 100644 --- a/lib/que/job.ex +++ b/lib/que/job.ex @@ -59,14 +59,15 @@ defmodule Que.Job do def perform(job) do Que.Helpers.log("Starting #{job}") - {:ok, pid} = + task = Que.Helpers.do_task(fn -> Logger.metadata(job_id: job.id) job.worker.on_setup(job) job.worker.perform(job.arguments) end) - %{ job | status: :started, pid: pid, ref: Process.monitor(pid) } + # Return task instead of task.ref so a job can be awaited during tests + %{job | status: :started, pid: task.pid, ref: task} end @@ -76,17 +77,19 @@ defmodule Que.Job do Handles Job Success, Calls appropriate worker method and updates the job status to :completed """ - @spec handle_success(job :: Que.Job.t) :: Que.Job.t - def handle_success(job) do - Que.Helpers.log("Completed #{job}") + @spec handle_success(job :: Que.Job.t(), result :: term()) :: Que.Job.t() + def handle_success(job, result) do + Que.Helpers.log("Completed #{job} with result #{inspect(result)}}") + + job = %{job | status: :completed, pid: nil, ref: nil} Que.Helpers.do_task(fn -> Logger.metadata(job_id: job.id) - job.worker.on_success(job.arguments) + job.worker.on_success(job, result) job.worker.on_teardown(job) end) - %{ job | status: :completed, pid: nil, ref: nil } + job end @@ -102,7 +105,7 @@ defmodule Que.Job do Que.Helpers.do_task(fn -> Logger.metadata(job_id: job.id) - job.worker.on_failure(job.arguments, error) + job.worker.on_failure(job, error) job.worker.on_teardown(job) end) @@ -120,4 +123,3 @@ defimpl String.Chars, for: Que.Job do "Job # #{job.id} with #{ExUtils.Module.name(job.worker)}" end end - diff --git a/lib/que/queue.ex b/lib/que/queue.ex index 869dba1..3a8f813 100644 --- a/lib/que/queue.ex +++ b/lib/que/queue.ex @@ -105,8 +105,10 @@ defmodule Que.Queue do @spec find(queue :: Que.Queue.t, key :: atom, value :: term) :: Que.Job.t | nil def find(queue, key \\ :id, value) + # job.ref is actually a Task. So, we need to access job.ref.ref to get + # the real reference of a job. def find(%Que.Queue{ running: running }, :ref, value) do - Enum.find(running, &(Map.get(&1, :ref) == value)) + Enum.find(running, &(Map.get(&1, :ref) |> Map.get(:ref) == value)) end def find(%Que.Queue{} = q, key, value) do diff --git a/lib/que/server.ex b/lib/que/server.ex index 3e01d6e..0a0010e 100644 --- a/lib/que/server.ex +++ b/lib/que/server.ex @@ -97,24 +97,41 @@ defmodule Que.Server do # Job was completed successfully - Does cleanup and executes the Success - # callback on the Worker + # callback on the Worker. It is called when the supervisor finishes the task. @doc false - def handle_info({:DOWN, ref, :process, _pid, :normal}, queue) do + def handle_info({ref, result}, queue) do job = queue |> Que.Queue.find(:ref, ref) - |> Que.Job.handle_success - |> Que.Persistence.update queue = - queue - |> Que.Queue.remove(job) - |> Que.Queue.process + if job != nil do + job = + job + |> Que.Job.handle_success(result) + |> Que.Persistence.update() + + queue + |> Que.Queue.remove(job) + |> Que.Queue.process() + else + queue + end {:noreply, queue} end + # Once the job is done, the supervisor sends a DOWN message to the server + + @doc false + def handle_info({:DOWN, ref, :process, _pid, :normal}, queue) do + Que.Helpers.log("Job completed successfully", :low) + + Process.demonitor(ref, [:flush]) + + {:noreply, queue} + end @@ -126,12 +143,12 @@ defmodule Que.Server do queue |> Que.Queue.find(:ref, ref) |> Que.Job.handle_failure(err) - |> Que.Persistence.update + |> Que.Persistence.update() queue = queue |> Que.Queue.remove(job) - |> Que.Queue.process + |> Que.Queue.process() {:noreply, queue} end @@ -156,4 +173,3 @@ defmodule Que.Server do end end - diff --git a/lib/que/worker.ex b/lib/que/worker.ex index ac77202..68ad1d1 100644 --- a/lib/que/worker.ex +++ b/lib/que/worker.ex @@ -64,7 +64,7 @@ defmodule Que.Worker do ## Handle Job Success & Failure - The worker can also export optional `on_success/1` and `on_failure/2` + The worker can also export optional `on_success/2` and `on_failure/2` callbacks that handle appropriate cases. ``` @@ -75,11 +75,12 @@ defmodule Que.Worker do Mailer.send_campaign_email(campaign, user: user) end - def on_success({campaign, user}) do + def on_success({campaign, user} = job.args, result) do CampaignReport.compile(campaign, status: :success, user: user) + Logger.info("Campaign email returned: \#{result}") end - def on_failure({campaign, user}, error) do + def on_failure({campaign, user} = job.args, error) do CampaignReport.compile(campaign, status: :failed, user: user) Logger.debug("Campaign email to \#{user.id} failed: \#{inspect(error)}") end @@ -198,11 +199,11 @@ defmodule Que.Worker do ## Default implementations of on_success and on_failure callbacks - def on_success(_arg) do + def on_success(_job, _result) do end - def on_failure(_arg, _err) do + def on_failure(_job, _err) do end @@ -214,7 +215,7 @@ defmodule Que.Worker do end - defoverridable [on_success: 1, on_failure: 2, on_setup: 1, on_teardown: 1] + defoverridable [on_success: 2, on_failure: 2, on_setup: 1, on_teardown: 1] @@ -261,7 +262,7 @@ defmodule Que.Worker do Optional callback that is executed when the job is processed successfully. """ - @callback on_success(arguments :: term) :: term + @callback on_success(job :: Que.Job.t(), arguments :: term) :: term @@ -270,7 +271,7 @@ defmodule Que.Worker do Optional callback that is executed if an error is raised during job is processed (in `perform` callback) """ - @callback on_failure(arguments :: term, error :: tuple) :: term + @callback on_failure(job :: Que.Job.t(), error :: tuple) :: term diff --git a/test/que/job_test.exs b/test/que/job_test.exs index c39621e..a8b0c93 100644 --- a/test/que/job_test.exs +++ b/test/que/job_test.exs @@ -71,7 +71,10 @@ defmodule Que.Test.Job do SetupAndTeardownWorker |> Job.new |> Job.perform - |> Job.handle_success + + result = Task.await(job.ref) + + job = job |> Job.handle_success(result) assert job.status == :completed assert job.pid == nil @@ -109,10 +112,12 @@ defmodule Que.Test.Job do test "#handle_success works as expected" do capture = Helpers.capture_log(fn -> + result = true + job = SuccessWorker |> Job.new - |> Job.handle_success + |> Job.handle_success(result) assert job.status == :completed assert job.pid == nil @@ -122,7 +127,7 @@ defmodule Que.Test.Job do end) assert capture =~ ~r/Completed/ - assert capture =~ ~r/success: nil/ + assert capture =~ ~r/success/ end @@ -141,7 +146,7 @@ defmodule Que.Test.Job do end) assert capture =~ ~r/Failed/ - assert capture =~ ~r/failure: nil/ + assert capture =~ ~r/failure/ end end diff --git a/test/support.ex b/test/support.ex index 2e3744d..74a8fab 100644 --- a/test/support.ex +++ b/test/support.ex @@ -27,7 +27,7 @@ defmodule Que.Test.Meta do use Que.Worker def perform(args), do: Logger.debug("#{__MODULE__} - perform: #{inspect(args)}") - def on_success(args), do: Logger.debug("#{__MODULE__} - success: #{inspect(args)}") + def on_success(job, _result), do: Logger.debug("#{__MODULE__} - success: #{inspect(job)}") end @@ -39,7 +39,7 @@ defmodule Que.Test.Meta do raise "some error" end - def on_failure(args, _err), do: Logger.debug("#{__MODULE__} - failure: #{inspect(args)}") + def on_failure(job, _err), do: Logger.debug("#{__MODULE__} - failure: #{inspect(job)}") end @@ -51,8 +51,8 @@ defmodule Que.Test.Meta do Logger.debug("#{__MODULE__} - perform: #{inspect(args)}") end - def on_success(args), do: Logger.debug("#{__MODULE__} - success: #{inspect(args)}") - def on_failure(args, _err), do: Logger.debug("#{__MODULE__} - failure: #{inspect(args)}") + def on_success(job, _result), do: Logger.debug("#{__MODULE__} - success: #{inspect(job)}") + def on_failure(job, _err), do: Logger.debug("#{__MODULE__} - failure: #{inspect(job)}") end