Skip to content

Commit 2454d35

Browse files
authored
feat(commuter_rail_occupancies): read commuter rail occupancies from S3 instead of firebase (#858)
Problem: Keolis needs to decommission their Firebase feed Solution: We're having them move the contents of that Firebase feed to S3.
1 parent 8be57e1 commit 2454d35

File tree

8 files changed

+242
-26
lines changed

8 files changed

+242
-26
lines changed

apps/api_accounts/config/dev.exs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,4 @@ import Config
22

33
config :api_accounts, table_prefix: "DEV"
44

5-
config :ex_aws,
6-
access_key_id: "DevAccessKey",
7-
secret_access_key: "DevSecretKey"
8-
95
config :api_accounts, ApiAccounts.Mailer, adapter: Bamboo.LocalAdapter

apps/parse/lib/parse/commuter_rail_occupancies.ex

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,38 @@ defmodule Parse.CommuterRailOccupancies do
1313
{:ok, %{"data" => data}} when is_list(data) ->
1414
Enum.flat_map(data, &parse_record/1)
1515

16+
{:ok, data} when is_list(data) ->
17+
Enum.flat_map(data, &parse_record/1)
18+
1619
e ->
1720
Logger.warning("#{__MODULE__} decode_error e=#{inspect(e)}")
1821
[]
1922
end
2023
end
2124

22-
defp parse_record(
23-
%{
24-
"MedianDensity" => density,
25-
"MedianDensityFlag" => flag,
26-
"cTrainNo" => train
27-
} = record
28-
) do
29-
with {:ok, flag} <- density_flag(flag),
25+
defp parse_density_fields(%{
26+
"MedianDensity" => density,
27+
"MedianDensityFlag" => flag,
28+
"cTrainNo" => train
29+
}),
30+
do: {:ok, {density, flag, train}}
31+
32+
# new format keolis started providing when they switched this data over to S3
33+
defp parse_density_fields(%{
34+
"Median Density" => density,
35+
"Median Density Flag" => flag,
36+
"Trip Name" => train
37+
}),
38+
do: {:ok, {density, flag, train}}
39+
40+
defp parse_density_fields(record) do
41+
Logger.warning("#{__MODULE__} parse_error error=missing_fields #{inspect(record)}")
42+
{:error, :missing_fields}
43+
end
44+
45+
defp parse_record(record) do
46+
with {:ok, {density, flag, train}} <- parse_density_fields(record),
47+
{:ok, flag} <- density_flag(flag),
3048
{:ok, percentage} <- percentage(density),
3149
{:ok, name} <- trip_name(train) do
3250
[
@@ -43,11 +61,6 @@ defmodule Parse.CommuterRailOccupancies do
4361
end
4462
end
4563

46-
defp parse_record(record) do
47-
Logger.warning("#{__MODULE__} parse_error error=missing_fields #{inspect(record)}")
48-
[]
49-
end
50-
5164
defp density_flag(0), do: {:ok, :many_seats_available}
5265
defp density_flag(1), do: {:ok, :few_seats_available}
5366
defp density_flag(2), do: {:ok, :full}

apps/state_mediator/config/config.exs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ config :state_mediator, :commuter_rail_crowding,
1010
"CR_CROWDING_BASE_URL",
1111
"https://keolis-api-development.firebaseio.com/p-kcs-trms-firebase-7dayloading.json"
1212
},
13-
enabled: {:system, "CR_CROWDING_ENABLED", "false"}
13+
enabled: {:system, "CR_CROWDING_ENABLED", "true"},
14+
s3_bucket: {:system, "CR_CROWDING_S3_BUCKET"},
15+
s3_object: {:system, "CR_CROWDING_S3_OBJECT"},
16+
source: {:system, "CR_CROWING_SOURCE", "s3"}
1417

1518
config :state_mediator, Realtime,
1619
gtfs_url: {:system, "MBTA_GTFS_URL", "https://cdn.mbta.com/MBTA_GTFS.zip"},

apps/state_mediator/lib/state_mediator.ex

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,14 @@ defmodule StateMediator do
77
# See http://elixir-lang.org/docs/stable/elixir/Application.html
88
# for more information on OTP Applications
99
def start(_type, _args) do
10+
crowding_source = app_value(:commuter_rail_crowding, :source)
11+
1012
children =
1113
children(Application.get_env(:state_mediator, :start)) ++
12-
crowding_children(app_value(:commuter_rail_crowding, :enabled) == "true")
14+
crowding_children(
15+
app_value(:commuter_rail_crowding, :enabled) == "true",
16+
crowding_source
17+
)
1318

1419
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
1520
# for other strategies and supported options
@@ -94,9 +99,30 @@ defmodule StateMediator do
9499
}
95100
end
96101

97-
@spec crowding_children(boolean()) :: [:supervisor.child_spec() | {module(), term()} | module()]
98-
defp crowding_children(true) do
99-
Logger.info("#{__MODULE__} CR_CROWDING_ENABLED=true")
102+
@spec crowding_children(boolean(), String.t()) :: [
103+
:supervisor.child_spec() | {module(), term()} | module()
104+
]
105+
defp crowding_children(true, "s3") do
106+
Logger.info("#{__MODULE__} CR_CROWDING_ENABLED=true, source=s3")
107+
108+
[
109+
{
110+
StateMediator.S3Mediator,
111+
[
112+
spec_id: :cr_s3_crowding_mediator,
113+
bucket_arn: "mbta-gtfs-commuter-rail-staging",
114+
object: "crowding-trends.json",
115+
spec_id: :s3_mediator,
116+
interval: 5 * 60 * 1_000,
117+
sync_timeout: 30_000,
118+
state: State.CommuterRailOccupancy
119+
]
120+
}
121+
]
122+
end
123+
124+
defp crowding_children(true, "firebase") do
125+
Logger.info("#{__MODULE__} CR_CROWDING_ENABLED=true, source=firebase")
100126

101127
credentials = :commuter_rail_crowding |> app_value(:firebase_credentials) |> Jason.decode!()
102128

@@ -124,7 +150,7 @@ defmodule StateMediator do
124150
]
125151
end
126152

127-
defp crowding_children(false) do
153+
defp crowding_children(false, _) do
128154
Logger.info("#{__MODULE__} CR_CROWDING_ENABLED=false")
129155
[]
130156
end
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
defmodule StateMediator.S3Mediator do
2+
@moduledoc """
3+
4+
S3Mediator is responsible for reading files from an S3 bucket and
5+
sending messages to the state module.
6+
7+
"""
8+
9+
defstruct [
10+
:module,
11+
:bucket_arn,
12+
:object,
13+
:sync_timeout,
14+
:interval
15+
]
16+
17+
@opaque t :: %__MODULE__{
18+
module: module,
19+
bucket_arn: String.t(),
20+
object: String.t(),
21+
sync_timeout: pos_integer()
22+
}
23+
24+
use GenServer
25+
require Logger
26+
alias ExAws.S3
27+
28+
def child_spec(opts) do
29+
{spec_id, opts} = Keyword.pop!(opts, :spec_id)
30+
31+
%{
32+
id: spec_id,
33+
start: {__MODULE__, :start_link, [opts]}
34+
}
35+
end
36+
37+
@spec start_link(Keyword.t()) :: {:ok, pid}
38+
def start_link(options) do
39+
GenServer.start_link(__MODULE__, options)
40+
end
41+
42+
@spec stop(pid) :: :ok
43+
def stop(pid) do
44+
GenServer.stop(pid)
45+
end
46+
47+
@spec init(Keyword.t()) :: {:ok, __MODULE__.t(), {:continue, any()}}
48+
def init(options) do
49+
state_module = Keyword.fetch!(options, :state)
50+
51+
bucket_arn = Keyword.fetch!(options, :bucket_arn)
52+
object = Keyword.fetch!(options, :object)
53+
sync_timeout = options |> Keyword.get(:sync_timeout, 5000)
54+
interval = options |> Keyword.get(:interval, 5000)
55+
56+
state = %__MODULE__{
57+
interval: interval,
58+
module: state_module,
59+
bucket_arn: bucket_arn,
60+
object: object,
61+
sync_timeout: sync_timeout
62+
}
63+
64+
{:ok, state, {:continue, nil}}
65+
end
66+
67+
@spec handle_continue(any, t) :: {:noreply, t} | {:noreply, t, :hibernate}
68+
def handle_continue(_, %{module: state_module} = state) do
69+
_ = Logger.debug(fn -> "#{__MODULE__} #{state_module} initial sync starting" end)
70+
fetch(state)
71+
end
72+
73+
def handle_info(:timeout, %{module: state_module} = state) do
74+
_ = Logger.debug(fn -> "#{__MODULE__} #{state_module} timeout sync starting" end)
75+
fetch(state)
76+
end
77+
78+
defp fetch(%{bucket_arn: bucket_arn, object: object} = state) do
79+
bucket_arn
80+
|> S3.get_object(object)
81+
|> ExAws.request()
82+
|> handle_response(state)
83+
end
84+
85+
defp handle_response(
86+
{:ok, %{body: body}},
87+
%{sync_timeout: sync_timeout, module: state_module} = state
88+
) do
89+
debug_time("#{state_module} new state", fn -> state_module.new_state(body, sync_timeout) end)
90+
91+
schedule_update(state)
92+
end
93+
94+
defp handle_response(
95+
response,
96+
state
97+
) do
98+
Logger.warning(
99+
"Received unknown response when getting commuter rail occupancies from S3: #{inspect(response)}"
100+
)
101+
102+
schedule_update(state)
103+
end
104+
105+
defp schedule_update(%{interval: interval} = state) when interval != nil do
106+
{:noreply, state, interval}
107+
end
108+
109+
defp debug_time(description, func) do
110+
State.Logger.debug_time(func, fn milliseconds ->
111+
"#{__MODULE__} #{description} took #{milliseconds}ms"
112+
end)
113+
end
114+
end

apps/state_mediator/mix.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ defmodule StateMediator.Mixfile do
5151
{:goth, "~> 1.3"},
5252
{:hackney, "~> 1.18"},
5353
{:timex, "~> 3.7"},
54-
{:emqtt_failover, "~> 0.3"}
54+
{:emqtt_failover, "~> 0.3"},
55+
{:mox, "~> 1.0", only: :test}
5556
]
5657
end
5758
end
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
defmodule StateMediator.S3MediatorTest do
2+
use ExUnit.Case, async: true
3+
4+
import Mox
5+
import StateMediator.S3Mediator
6+
7+
defmodule StateModule do
8+
def size do
9+
0
10+
end
11+
12+
def new_state(pid, _timeout) do
13+
send(pid, :received_new_state)
14+
end
15+
end
16+
17+
@moduletag capture_log: true
18+
@opts [
19+
bucket_arn: "mbta-gtfs-boom-shakalaka",
20+
object: "objection",
21+
state: __MODULE__.StateModule,
22+
interval: 1_000
23+
]
24+
25+
describe "init/1" do
26+
test "fires a continue" do
27+
assert {:ok, _, {:continue, _}} = init(@opts)
28+
end
29+
30+
test "builds an initial state" do
31+
assert {:ok, state, {:continue, _}} = init(@opts)
32+
assert %StateMediator.S3Mediator{} = state
33+
assert state.module == @opts[:state]
34+
assert state.bucket_arn == @opts[:bucket_arn]
35+
assert state.sync_timeout == 5_000
36+
assert state.interval == 1_000
37+
end
38+
end
39+
40+
describe "handle_info/2" do
41+
test "on body: schedules an update" do
42+
{:ok, state, {:continue, _}} = init(@opts)
43+
assert {:noreply, ^state, 1_000} = handle_info(:timeout, state)
44+
end
45+
46+
test "on error: schedules an update" do
47+
{:ok, state, {:continue, _}} = init(@opts)
48+
Mox.defmock(FakeAws, for: ExAws.Behaviour)
49+
50+
test_pid = self()
51+
monitor_pid = GenServer.whereis(StateMediator.S3Mediator)
52+
allow(FakeAws, test_pid, monitor_pid)
53+
stub(FakeAws, :request, fn _ -> {:error, %{body: "your transit isn't rapid enough"}} end)
54+
55+
assert {:noreply, ^state, 1_000} = handle_info(:timeout, state)
56+
end
57+
end
58+
end

config/runtime.exs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ if is_prod? and is_release? do
2828
port: "DYNAMO_PORT" |> System.get_env("443") |> String.to_integer(),
2929
scheme: System.get_env("DYNAMO_SCHEME", "https://"),
3030
host: System.fetch_env!("DYNAMO_HOST")
31-
]
31+
],
32+
json_codec: Jason
33+
3234

3335
config :alb_monitor,
3436
ecs_metadata_uri: System.fetch_env!("ECS_CONTAINER_METADATA_URI"),
@@ -64,7 +66,10 @@ if is_prod? and is_release? do
6466
secret_key_base: System.fetch_env!("SECRET_KEY_BASE")
6567

6668
config :state_mediator, :commuter_rail_crowding,
67-
firebase_credentials: System.fetch_env!("CR_CROWDING_FIREBASE_CREDENTIALS")
69+
firebase_credentials: System.fetch_env!("CR_CROWDING_FIREBASE_CREDENTIALS"),
70+
s3_bucket: System.fetch_env!("CR_CROWDING_S3_BUCKET"),
71+
s3_object: System.fetch_env!("CR_CROWDING_S3_OBJECT"),
72+
source: System.fetch_env!("CR_CROWDING_SOURCE")
6873

6974
config :recaptcha,
7075
enabled: true,

0 commit comments

Comments
 (0)