diff --git a/.formatter.exs b/.formatter.exs new file mode 100644 index 0000000..2bed17c --- /dev/null +++ b/.formatter.exs @@ -0,0 +1,3 @@ +[ + inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/.travis.yml b/.travis.yml index 0c5c3e3..fcdcd2f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,7 +6,7 @@ elixir: - 1.6.0 otp_release: - - 18.0 + - 19.0 script: - mix dialyzer --halt-exit-status diff --git a/lib/sidewalk/client.ex b/lib/sidewalk/client.ex index 2893e87..a6eea07 100644 --- a/lib/sidewalk/client.ex +++ b/lib/sidewalk/client.ex @@ -10,10 +10,10 @@ defmodule Sidewalk.Client do For more information of the structure of a Sidewalk job, please have a look at the `Job` module. """ - @type job :: Sidewalk.Job.t + @type job :: Sidewalk.Job.t() @type enqueue_delay :: integer() - @type enqueue_time :: float() - @type response :: {:ok, String.t} | {:error, String.t} + @type enqueue_time :: float() + @type response :: {:ok, String.t()} | {:error, String.t()} @doc """ Jobs enqueued with this function will be executed by Sidekiq as soon as possible. @@ -24,12 +24,10 @@ defmodule Sidewalk.Client do {:ok, jid} = Sidewalk.Client.enqueue(job) # => jid: "2f87a952ced00ea6cdd61245" """ @spec enqueue(job) :: response - def enqueue(job=%Sidewalk.Job{}) do - with {:ok, normalized_job} <- normalize_job(job), do: atomic_push(normalized_job) - end - def enqueue(_) do - {:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]}"} - end + def enqueue(job = %Sidewalk.Job{}), + do: with({:ok, normalized_job} <- normalize_job(job), do: atomic_push(normalized_job)) + def enqueue(_), + do: {:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]}"} @doc """ Jobs enqueued with this function will be executed by Sidekiq after a defined delay in seconds. @@ -41,15 +39,14 @@ defmodule Sidewalk.Client do """ @spec enqueue_in(job, enqueue_delay) :: response def enqueue_in(job, enqueue_in_seconds \\ 60) - def enqueue_in(job=%Sidewalk.Job{}, enqueue_in_seconds) when is_integer(enqueue_in_seconds) and enqueue_in_seconds > 0 do - enqueue_at(job, (current_unix_timestamp() + enqueue_in_seconds)) - end - def enqueue_in(job=%Sidewalk.Job{}, enqueue_in_seconds) when is_integer(enqueue_in_seconds) and enqueue_in_seconds <= 0 do - enqueue(job) - end - def enqueue_in(_,_) do - {:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]} and a valid 'enqueue in' delay in seconds greater than 0"} - end + def enqueue_in(job = %Sidewalk.Job{}, enqueue_in_seconds) + when is_integer(enqueue_in_seconds) and enqueue_in_seconds > 0, + do: enqueue_at(job, current_unix_timestamp() + enqueue_in_seconds) + def enqueue_in(job = %Sidewalk.Job{}, enqueue_in_seconds) + when is_integer(enqueue_in_seconds) and enqueue_in_seconds <= 0, + do: enqueue(job) + def enqueue_in(_,_), + do: {:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]} and a valid 'enqueue in' delay in seconds greater than 0"} @doc """ Jobs enqueued with this function will be executed by Sidekiq at a given unix timestamp. @@ -60,34 +57,53 @@ defmodule Sidewalk.Client do {:ok, jid} = Sidewalk.Client.enqueue_at(job, 1546293600) # => jid: "d6ceac7d6c42d35ff6cac8a0" """ @spec enqueue_at(job, enqueue_time) :: response - def enqueue_at(job, enqueue_at_timestamp \\ (current_unix_timestamp() + 60)) - def enqueue_at(job=%Sidewalk.Job{}, enqueue_at_timestamp) when is_number(enqueue_at_timestamp) and enqueue_at_timestamp > 1_000_000_000 do - if enqueue_at_timestamp <= current_unix_timestamp() do - enqueue(job) + def enqueue_at(job, enqueue_at_timestamp \\ current_unix_timestamp() + 60) + def enqueue_at(job = %Sidewalk.Job{}, enqueue_at_timestamp) + when is_number(enqueue_at_timestamp) and enqueue_at_timestamp > 1.0e9 do + if enqueue_at_timestamp > current_unix_timestamp() do + with {:ok, normalized_job} <- normalize_job(job), + do: atomic_push(normalized_job, enqueue_at_timestamp) else - with {:ok, normalized_job} <- normalize_job(job), do: atomic_push(normalized_job, enqueue_at_timestamp) + enqueue(job) end end - def enqueue_at(_,_) do - {:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]} and a valid 'enqueue at' formated as unix timestamp"} - end - + def enqueue_at(_,_), + do: {:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]} and a valid 'enqueue at' formated as unix timestamp"} ############################################################ ## --> HELPER FUNCTIONS - @spec normalize_job(job) :: {:ok, job} | {:error, String.t} - defp normalize_job(job) do - case job do - %Sidewalk.Job{class: class} when not is_binary(class) or (is_binary(class) and byte_size(class) <= 0) -> - {:error, "Job class must be a valid String representation of the class name"} - %Sidewalk.Job{queue: queue} when not is_binary(queue) or (is_binary(queue) and byte_size(queue) <= 0) -> - {:error, "Job queue must be a valid String representation of the queue name"} - %Sidewalk.Job{args: args} when not is_list(args) -> - {:error, "Job args must be a List"} - %Sidewalk.Job{class: class, args: args} when is_binary(class) and byte_size(class) > 0 and is_list(args) -> - {:ok, %{job | jid: random_jid(), created_at: current_unix_timestamp()}} - _ -> - {:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]}"} + @spec normalize_job(job) :: {:ok, job} | {:error, String.t()} + defp normalize_job(%Sidewalk.Job{class: class}) + when not is_binary(class) or (is_binary(class) and byte_size(class) <= 0), + do: {:error, "Job class must be a valid String representation of the class name"} + defp normalize_job(%Sidewalk.Job{queue: queue}) + when not is_binary(queue) or (is_binary(queue) and byte_size(queue) <= 0), + do: {:error, "Job queue must be a valid String representation of the queue name"} + defp normalize_job(%Sidewalk.Job{args: args}) when not is_list(args), + do: {:error, "Job args must be a List"} + defp normalize_job(%Sidewalk.Job{class: class, args: args} = job) + when is_binary(class) and byte_size(class) > 0 and is_list(args), + do: {:ok, %{job | jid: random_jid(), created_at: current_unix_timestamp()}} + defp normalize_job(_), + do: {:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]}"} + + @spec atomic_push(job, number) :: response + defp atomic_push(job, enqueue_at_timestamp) + when is_map(job) and is_number(enqueue_at_timestamp) and enqueue_at_timestamp > 1.0e9 do + case Poison.encode(%{job | enqueued_at: current_unix_timestamp()}) do + {:ok, encoded_job} -> + :poolboy.transaction(:sidewalk_pool, fn conn -> + with {:ok, _} <- + Redix.command(conn, [ + "ZADD", + namespacify("schedule"), + to_string(enqueue_at_timestamp), + encoded_job + ]), do: {:ok, job.jid} + end) + + {:error, error_message} -> + {:error, "Unable to enqueue delayed Job: #{error_message}"} end end @@ -95,43 +111,31 @@ defmodule Sidewalk.Client do defp atomic_push(job) when is_map(job) do case Poison.encode(%{job | enqueued_at: current_unix_timestamp()}) do {:ok, encoded_job} -> - :poolboy.transaction(:sidewalk_pool, fn(conn) -> - with \ - {:ok, _} <- Redix.command(conn, ["MULTI"]), - {:ok, _} <- Redix.command(conn, ["SADD", namespacify("queues"), job.queue]), - {:ok, _} <- Redix.command(conn, ["LPUSH", namespacify("queue:#{job.queue}"), encoded_job]), - {:ok, _} <- Redix.command(conn, ["EXEC"]) - do + :poolboy.transaction(:sidewalk_pool, fn conn -> + with {:ok, _} <- Redix.command(conn, ["MULTI"]), + {:ok, _} <- Redix.command(conn, ["SADD", namespacify("queues"), job.queue]), + {:ok, _} <- Redix.command(conn, ["LPUSH", namespacify("queue:#{job.queue}"), encoded_job]), + {:ok, _} <- Redix.command(conn, ["EXEC"]) do {:ok, job.jid} else {:error, redix_error} -> raise redix_error end end) - {:error, error_message} -> - {:error, "Unable to enqueue Job: #{error_message}"} - end - end - @spec atomic_push(job, number) :: response - defp atomic_push(job, at) when is_map(job) and is_number(at) and at > 1_000_000_000 do - case Poison.encode(%{job | enqueued_at: current_unix_timestamp()}) do - {:ok, encoded_job} -> - :poolboy.transaction(:sidewalk_pool, fn(conn) -> - with {:ok, _} <- Redix.command(conn, ["ZADD", namespacify("schedule"), to_string(at), encoded_job]), do: {:ok, job.jid} - end) {:error, error_message} -> - {:error, "Unable to enqueue delayed Job: #{error_message}"} + {:error, "Unable to enqueue Job: #{error_message}"} end end - @spec random_jid :: String.t + @spec random_jid :: String.t() defp random_jid do - :crypto.strong_rand_bytes(12) + 12 + |> :crypto.strong_rand_bytes() |> Base.encode16(case: :lower) end - @spec namespacify(String.t) :: String.t + @spec namespacify(String.t()) :: String.t() defp namespacify(key) do if namespace = Application.get_env(:sidewalk, :namespace) do "#{namespace}:#{key}" @@ -141,9 +145,5 @@ defmodule Sidewalk.Client do end @spec current_unix_timestamp :: float() - defp current_unix_timestamp do - {mega_seconds, seconds, microseconds} = :os.timestamp - String.to_float("#{mega_seconds}#{seconds}.#{microseconds}") - end - + defp current_unix_timestamp, do: :erlang.system_time() / 1.0e9 end diff --git a/lib/sidewalk/job.ex b/lib/sidewalk/job.ex index d16ce64..ef38c03 100644 --- a/lib/sidewalk/job.ex +++ b/lib/sidewalk/job.ex @@ -11,29 +11,27 @@ defmodule Sidewalk.Job do - **enqueue_at** -> The timestamp when the job is really enqueued with the Redis server. - **queue** -> The queue where a job should be enqueued. Defaults to "default" queue. - **retry** -> Tells the Sidekiq worker to retry the enqueue job. - - **wrapped** -> For use when you're dealing with an Active Job system. + - **wrapped** -> For use when dealing with an ActiveJob system. """ @derive [Poison.Encoder] - defstruct [ - jid: "", - class: "", - args: [], - created_at: 0.0, - enqueued_at: 0.0, - queue: "default", - retry: true, - wrapped: "" - ] + defstruct jid: "", + class: "", + args: [], + created_at: 0.0, + enqueued_at: 0.0, + queue: "default", + retry: true, + wrapped: "" @type t :: %Sidewalk.Job{ - jid: String.t, - class: String.t, - args: list(), - created_at: float(), - enqueued_at: float(), - queue: String.t, - retry: boolean, - wrapped: String.t - } + jid: String.t(), + class: String.t(), + args: list(), + created_at: float(), + enqueued_at: float(), + queue: String.t(), + retry: boolean, + wrapped: String.t() + } end diff --git a/mix.exs b/mix.exs index 04031b4..d5d0f96 100644 --- a/mix.exs +++ b/mix.exs @@ -4,19 +4,18 @@ defmodule Sidewalk.Mixfile do def project do [ app: :sidewalk, - version: "0.3.3", + version: "0.3.4", elixir: "~> 1.4", - build_embedded: Mix.env == :prod, - start_permanent: Mix.env == :prod, + build_embedded: Mix.env() == :prod, + start_permanent: Mix.env() == :prod, description: description(), package: package(), deps: deps() - ] + ] end def application do - [extra_applications: [:logger, :crypto], - mod: {Sidewalk, []}] + [extra_applications: [:logger, :crypto], mod: {Sidewalk, []}] end defp deps do @@ -24,9 +23,9 @@ defmodule Sidewalk.Mixfile do {:redix, "~> 0.6.0"}, {:poolboy, "~> 1.5.1"}, {:poison, "~> 3.1.0"}, - {:ex_doc, "~> 0.16.0", only: [:dev]}, - {:earmark, "~> 1.2.0", only: [:dev]}, - {:dialyxir, "~> 0.5", only: [:dev], runtime: false} + {:ex_doc, "~> 0.16.0", only: [:dev]}, + {:earmark, "~> 1.2.0", only: [:dev]}, + {:dialyxir, "~> 0.5", only: [:dev], runtime: false} ] end diff --git a/test/sidewalk/client_test.exs b/test/sidewalk/client_test.exs index 8eeb705..00372f2 100644 --- a/test/sidewalk/client_test.exs +++ b/test/sidewalk/client_test.exs @@ -86,7 +86,7 @@ defmodule Sidewalk.ClientTest do assert stored_job.retry == false assert stored_job.enqueued_at > 0 assert stored_job.created_at > 0 - assert stored_job.created_at < stored_job.enqueued_at + assert stored_job.created_at <= stored_job.enqueued_at end test "enqueue_in/2 to write data correctly to redis", %{redis: redis} do @@ -102,7 +102,7 @@ defmodule Sidewalk.ClientTest do assert stored_job.retry == false assert stored_job.enqueued_at > 0 assert stored_job.created_at > 0 - assert stored_job.created_at < stored_job.enqueued_at + assert stored_job.created_at <= stored_job.enqueued_at assert String.to_float(raw_execution_time) > 1_000_000_000 end @@ -119,7 +119,7 @@ defmodule Sidewalk.ClientTest do assert stored_job.retry == false assert stored_job.enqueued_at > 0 assert stored_job.created_at > 0 - assert stored_job.created_at < stored_job.enqueued_at + assert stored_job.created_at <= stored_job.enqueued_at assert String.to_integer(raw_execution_time) == 2_000_000_000 end