Skip to content

Commit

Permalink
Fixed issue with Elixir 1.6
Browse files Browse the repository at this point in the history
- Fixed issue with Elixir 1.6
- Reformated code with new formater
- Added TravisCI
  • Loading branch information
railsmechanic committed Apr 7, 2018
1 parent 8c2b061 commit 4c8899c
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 100 deletions.
3 changes: 3 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ elixir:
- 1.6.0

otp_release:
- 18.0
- 19.0

script:
- mix dialyzer --halt-exit-status
Expand Down
134 changes: 67 additions & 67 deletions lib/sidewalk/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ defmodule Sidewalk.Client do
For more information of the structure of a Sidewalk job, please have a look at the `Job` module.
"""

@type job :: Sidewalk.Job.t
@type job :: Sidewalk.Job.t()
@type enqueue_delay :: integer()
@type enqueue_time :: float()
@type response :: {:ok, String.t} | {:error, String.t}
@type enqueue_time :: float()
@type response :: {:ok, String.t()} | {:error, String.t()}

@doc """
Jobs enqueued with this function will be executed by Sidekiq as soon as possible.
Expand All @@ -24,12 +24,10 @@ defmodule Sidewalk.Client do
{:ok, jid} = Sidewalk.Client.enqueue(job) # => jid: "2f87a952ced00ea6cdd61245"
"""
@spec enqueue(job) :: response
def enqueue(job=%Sidewalk.Job{}) do
with {:ok, normalized_job} <- normalize_job(job), do: atomic_push(normalized_job)
end
def enqueue(_) do
{:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]}"}
end
def enqueue(job = %Sidewalk.Job{}),
do: with({:ok, normalized_job} <- normalize_job(job), do: atomic_push(normalized_job))
def enqueue(_),
do: {:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]}"}

@doc """
Jobs enqueued with this function will be executed by Sidekiq after a defined delay in seconds.
Expand All @@ -41,15 +39,14 @@ defmodule Sidewalk.Client do
"""
@spec enqueue_in(job, enqueue_delay) :: response
def enqueue_in(job, enqueue_in_seconds \\ 60)
def enqueue_in(job=%Sidewalk.Job{}, enqueue_in_seconds) when is_integer(enqueue_in_seconds) and enqueue_in_seconds > 0 do
enqueue_at(job, (current_unix_timestamp() + enqueue_in_seconds))
end
def enqueue_in(job=%Sidewalk.Job{}, enqueue_in_seconds) when is_integer(enqueue_in_seconds) and enqueue_in_seconds <= 0 do
enqueue(job)
end
def enqueue_in(_,_) do
{:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]} and a valid 'enqueue in' delay in seconds greater than 0"}
end
def enqueue_in(job = %Sidewalk.Job{}, enqueue_in_seconds)
when is_integer(enqueue_in_seconds) and enqueue_in_seconds > 0,
do: enqueue_at(job, current_unix_timestamp() + enqueue_in_seconds)
def enqueue_in(job = %Sidewalk.Job{}, enqueue_in_seconds)
when is_integer(enqueue_in_seconds) and enqueue_in_seconds <= 0,
do: enqueue(job)
def enqueue_in(_,_),
do: {:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]} and a valid 'enqueue in' delay in seconds greater than 0"}

@doc """
Jobs enqueued with this function will be executed by Sidekiq at a given unix timestamp.
Expand All @@ -60,78 +57,85 @@ defmodule Sidewalk.Client do
{:ok, jid} = Sidewalk.Client.enqueue_at(job, 1546293600) # => jid: "d6ceac7d6c42d35ff6cac8a0"
"""
@spec enqueue_at(job, enqueue_time) :: response
def enqueue_at(job, enqueue_at_timestamp \\ (current_unix_timestamp() + 60))
def enqueue_at(job=%Sidewalk.Job{}, enqueue_at_timestamp) when is_number(enqueue_at_timestamp) and enqueue_at_timestamp > 1_000_000_000 do
if enqueue_at_timestamp <= current_unix_timestamp() do
enqueue(job)
def enqueue_at(job, enqueue_at_timestamp \\ current_unix_timestamp() + 60)
def enqueue_at(job = %Sidewalk.Job{}, enqueue_at_timestamp)
when is_number(enqueue_at_timestamp) and enqueue_at_timestamp > 1.0e9 do
if enqueue_at_timestamp > current_unix_timestamp() do
with {:ok, normalized_job} <- normalize_job(job),
do: atomic_push(normalized_job, enqueue_at_timestamp)
else
with {:ok, normalized_job} <- normalize_job(job), do: atomic_push(normalized_job, enqueue_at_timestamp)
enqueue(job)
end
end
def enqueue_at(_,_) do
{:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]} and a valid 'enqueue at' formated as unix timestamp"}
end

def enqueue_at(_,_),
do: {:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]} and a valid 'enqueue at' formated as unix timestamp"}

############################################################
## --> HELPER FUNCTIONS
@spec normalize_job(job) :: {:ok, job} | {:error, String.t}
defp normalize_job(job) do
case job do
%Sidewalk.Job{class: class} when not is_binary(class) or (is_binary(class) and byte_size(class) <= 0) ->
{:error, "Job class must be a valid String representation of the class name"}
%Sidewalk.Job{queue: queue} when not is_binary(queue) or (is_binary(queue) and byte_size(queue) <= 0) ->
{:error, "Job queue must be a valid String representation of the queue name"}
%Sidewalk.Job{args: args} when not is_list(args) ->
{:error, "Job args must be a List"}
%Sidewalk.Job{class: class, args: args} when is_binary(class) and byte_size(class) > 0 and is_list(args) ->
{:ok, %{job | jid: random_jid(), created_at: current_unix_timestamp()}}
_ ->
{:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]}"}
@spec normalize_job(job) :: {:ok, job} | {:error, String.t()}
defp normalize_job(%Sidewalk.Job{class: class})
when not is_binary(class) or (is_binary(class) and byte_size(class) <= 0),
do: {:error, "Job class must be a valid String representation of the class name"}
defp normalize_job(%Sidewalk.Job{queue: queue})
when not is_binary(queue) or (is_binary(queue) and byte_size(queue) <= 0),
do: {:error, "Job queue must be a valid String representation of the queue name"}
defp normalize_job(%Sidewalk.Job{args: args}) when not is_list(args),
do: {:error, "Job args must be a List"}
defp normalize_job(%Sidewalk.Job{class: class, args: args} = job)
when is_binary(class) and byte_size(class) > 0 and is_list(args),
do: {:ok, %{job | jid: random_jid(), created_at: current_unix_timestamp()}}
defp normalize_job(_),
do: {:error, "Job must be a Sidewalk.Job with at least 'class' and 'args' set: %Sidewalk.Job{class: 'SomeWorker', args: ['bob', 1, %{foo: 'bar'}]}"}

@spec atomic_push(job, number) :: response
defp atomic_push(job, enqueue_at_timestamp)
when is_map(job) and is_number(enqueue_at_timestamp) and enqueue_at_timestamp > 1.0e9 do
case Poison.encode(%{job | enqueued_at: current_unix_timestamp()}) do
{:ok, encoded_job} ->
:poolboy.transaction(:sidewalk_pool, fn conn ->
with {:ok, _} <-
Redix.command(conn, [
"ZADD",
namespacify("schedule"),
to_string(enqueue_at_timestamp),
encoded_job
]), do: {:ok, job.jid}
end)

{:error, error_message} ->
{:error, "Unable to enqueue delayed Job: #{error_message}"}
end
end

@spec atomic_push(job) :: response
defp atomic_push(job) when is_map(job) do
case Poison.encode(%{job | enqueued_at: current_unix_timestamp()}) do
{:ok, encoded_job} ->
:poolboy.transaction(:sidewalk_pool, fn(conn) ->
with \
{:ok, _} <- Redix.command(conn, ["MULTI"]),
{:ok, _} <- Redix.command(conn, ["SADD", namespacify("queues"), job.queue]),
{:ok, _} <- Redix.command(conn, ["LPUSH", namespacify("queue:#{job.queue}"), encoded_job]),
{:ok, _} <- Redix.command(conn, ["EXEC"])
do
:poolboy.transaction(:sidewalk_pool, fn conn ->
with {:ok, _} <- Redix.command(conn, ["MULTI"]),
{:ok, _} <- Redix.command(conn, ["SADD", namespacify("queues"), job.queue]),
{:ok, _} <- Redix.command(conn, ["LPUSH", namespacify("queue:#{job.queue}"), encoded_job]),
{:ok, _} <- Redix.command(conn, ["EXEC"]) do
{:ok, job.jid}
else
{:error, redix_error} ->
raise redix_error
end
end)
{:error, error_message} ->
{:error, "Unable to enqueue Job: #{error_message}"}
end
end

@spec atomic_push(job, number) :: response
defp atomic_push(job, at) when is_map(job) and is_number(at) and at > 1_000_000_000 do
case Poison.encode(%{job | enqueued_at: current_unix_timestamp()}) do
{:ok, encoded_job} ->
:poolboy.transaction(:sidewalk_pool, fn(conn) ->
with {:ok, _} <- Redix.command(conn, ["ZADD", namespacify("schedule"), to_string(at), encoded_job]), do: {:ok, job.jid}
end)
{:error, error_message} ->
{:error, "Unable to enqueue delayed Job: #{error_message}"}
{:error, "Unable to enqueue Job: #{error_message}"}
end
end

@spec random_jid :: String.t
@spec random_jid :: String.t()
defp random_jid do
:crypto.strong_rand_bytes(12)
12
|> :crypto.strong_rand_bytes()
|> Base.encode16(case: :lower)
end

@spec namespacify(String.t) :: String.t
@spec namespacify(String.t()) :: String.t()
defp namespacify(key) do
if namespace = Application.get_env(:sidewalk, :namespace) do
"#{namespace}:#{key}"
Expand All @@ -141,9 +145,5 @@ defmodule Sidewalk.Client do
end

@spec current_unix_timestamp :: float()
defp current_unix_timestamp do
{mega_seconds, seconds, microseconds} = :os.timestamp
String.to_float("#{mega_seconds}#{seconds}.#{microseconds}")
end

defp current_unix_timestamp, do: :erlang.system_time() / 1.0e9
end
38 changes: 18 additions & 20 deletions lib/sidewalk/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,27 @@ defmodule Sidewalk.Job do
- **enqueue_at** -> The timestamp when the job is really enqueued with the Redis server.
- **queue** -> The queue where a job should be enqueued. Defaults to "default" queue.
- **retry** -> Tells the Sidekiq worker to retry the enqueue job.
- **wrapped** -> For use when you're dealing with an Active Job system.
- **wrapped** -> For use when dealing with an ActiveJob system.
"""

@derive [Poison.Encoder]
defstruct [
jid: "",
class: "",
args: [],
created_at: 0.0,
enqueued_at: 0.0,
queue: "default",
retry: true,
wrapped: ""
]
defstruct jid: "",
class: "",
args: [],
created_at: 0.0,
enqueued_at: 0.0,
queue: "default",
retry: true,
wrapped: ""

@type t :: %Sidewalk.Job{
jid: String.t,
class: String.t,
args: list(),
created_at: float(),
enqueued_at: float(),
queue: String.t,
retry: boolean,
wrapped: String.t
}
jid: String.t(),
class: String.t(),
args: list(),
created_at: float(),
enqueued_at: float(),
queue: String.t(),
retry: boolean,
wrapped: String.t()
}
end
17 changes: 8 additions & 9 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,28 @@ defmodule Sidewalk.Mixfile do
def project do
[
app: :sidewalk,
version: "0.3.3",
version: "0.3.4",
elixir: "~> 1.4",
build_embedded: Mix.env == :prod,
start_permanent: Mix.env == :prod,
build_embedded: Mix.env() == :prod,
start_permanent: Mix.env() == :prod,
description: description(),
package: package(),
deps: deps()
]
]
end

def application do
[extra_applications: [:logger, :crypto],
mod: {Sidewalk, []}]
[extra_applications: [:logger, :crypto], mod: {Sidewalk, []}]
end

defp deps do
[
{:redix, "~> 0.6.0"},
{:poolboy, "~> 1.5.1"},
{:poison, "~> 3.1.0"},
{:ex_doc, "~> 0.16.0", only: [:dev]},
{:earmark, "~> 1.2.0", only: [:dev]},
{:dialyxir, "~> 0.5", only: [:dev], runtime: false}
{:ex_doc, "~> 0.16.0", only: [:dev]},
{:earmark, "~> 1.2.0", only: [:dev]},
{:dialyxir, "~> 0.5", only: [:dev], runtime: false}
]
end

Expand Down
6 changes: 3 additions & 3 deletions test/sidewalk/client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ defmodule Sidewalk.ClientTest do
assert stored_job.retry == false
assert stored_job.enqueued_at > 0
assert stored_job.created_at > 0
assert stored_job.created_at < stored_job.enqueued_at
assert stored_job.created_at <= stored_job.enqueued_at
end

test "enqueue_in/2 to write data correctly to redis", %{redis: redis} do
Expand All @@ -102,7 +102,7 @@ defmodule Sidewalk.ClientTest do
assert stored_job.retry == false
assert stored_job.enqueued_at > 0
assert stored_job.created_at > 0
assert stored_job.created_at < stored_job.enqueued_at
assert stored_job.created_at <= stored_job.enqueued_at
assert String.to_float(raw_execution_time) > 1_000_000_000
end

Expand All @@ -119,7 +119,7 @@ defmodule Sidewalk.ClientTest do
assert stored_job.retry == false
assert stored_job.enqueued_at > 0
assert stored_job.created_at > 0
assert stored_job.created_at < stored_job.enqueued_at
assert stored_job.created_at <= stored_job.enqueued_at
assert String.to_integer(raw_execution_time) == 2_000_000_000
end

Expand Down

0 comments on commit 4c8899c

Please sign in to comment.