Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve failure telemetry #1307

Merged
merged 1 commit into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/realtime/monitoring/prom_ex.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
defmodule Realtime.PromEx do
alias Realtime.PromEx.Plugins.{OsMon, Phoenix, Tenants, Tenant}

alias Realtime.Nodes
alias Realtime.PromEx.Plugins.Channels
alias Realtime.PromEx.Plugins.OsMon
alias Realtime.PromEx.Plugins.Phoenix
alias Realtime.PromEx.Plugins.Tenant
alias Realtime.PromEx.Plugins.Tenants

@moduledoc """
Be sure to add the following to finish setting up PromEx:
Expand Down Expand Up @@ -70,6 +73,7 @@ defmodule Realtime.PromEx do
{OsMon, poll_rate: poll_rate},
{Tenants, poll_rate: poll_rate},
{Tenant, poll_rate: poll_rate},
{Channels, poll_rate: poll_rate},
{PromEx.Plugins.Ecto, otp_app: :realtime, poll_rate: poll_rate, metric_prefix: [:ecto]}
]
end
Expand Down
20 changes: 20 additions & 0 deletions lib/realtime/monitoring/prom_ex/plugins/channels.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Realtime.PromEx.Plugins.Channels do
@moduledoc """
Realtime channels monitoring plugin for PromEx
"""
use PromEx.Plugin
require Logger

@impl true
def event_metrics(_opts) do
Event.build(:realtime, [
counter(
[:realtime, :channel, :error],
event_name: [:realtime, :channel, :error],
measurement: :code,
tags: [:code],
description: "Count of errors in the Realtime channels initialization"
)
])
end
end
10 changes: 2 additions & 8 deletions lib/realtime/monitoring/prom_ex/plugins/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@ defmodule Realtime.PromEx.Plugins.Tenants do
@event_connected [:prom_ex, :plugin, :realtime, :tenants, :connected]

@impl true
def event_metrics(opts) do
rpc_metrics(opts)
end

defp rpc_metrics(_opts) do
def event_metrics(_) do
Event.build(:realtime, [
distribution(
[:realtime, :rpc],
Expand Down Expand Up @@ -58,9 +54,7 @@ defmodule Realtime.PromEx.Plugins.Tenants do
-1
end

execute_metrics(@event_connected, %{
connected: connected
})
execute_metrics(@event_connected, %{connected: connected})
end

defp execute_metrics(event, metrics) do
Expand Down
50 changes: 31 additions & 19 deletions lib/realtime/rpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,11 @@ defmodule Realtime.Rpc do
def call(node, mod, func, args, opts \\ []) do
timeout = Keyword.get(opts, :timeout, Application.get_env(:realtime, :rpc_timeout))
{latency, response} = :timer.tc(fn -> :rpc.call(node, mod, func, args, timeout) end)
tenant = Keyword.get(opts, :tenant, nil)

Telemetry.execute(
[:realtime, :tenants, :rpc],
[:realtime, :rpc],
%{latency: latency},
%{
tenant: tenant,
mod: mod,
func: func,
target_node: node,
origin_node: node()
}
%{mod: mod, func: func, target_node: node, origin_node: node()}
)

response
Expand All @@ -36,21 +29,36 @@ defmodule Realtime.Rpc do
def enhanced_call(node, mod, func, args \\ [], opts \\ []) do
timeout = Keyword.get(opts, :timeout, Application.get_env(:realtime, :rpc_timeout))

with {latency, {status, _} = response} <-
with {latency, response} <-
:timer.tc(fn -> :erpc.call(node, mod, func, args, timeout) end) do
Telemetry.execute(
[:realtime, :rpc],
%{latency: latency, success?: status == :ok},
%{mod: mod, func: func, target_node: node, origin_node: node()}
)

case response do
{status, _} when status in [:ok, :error] -> response
_ -> {:error, response}
{:ok, _} ->
Telemetry.execute(
[:realtime, :rpc],
%{latency: latency, success?: true},
%{mod: mod, func: func, target_node: node, origin_node: node()}
)

response

{:error, response} ->
Telemetry.execute(
[:realtime, :rpc],
%{latency: latency, success?: false},
%{mod: mod, func: func, target_node: node, origin_node: node()}
)

{:error, response}
end
end
catch
kind, reason ->
Telemetry.execute(
[:realtime, :rpc],
%{latency: 0, success?: false},
%{mod: mod, func: func, target_node: node, origin_node: node()}
)

log_error(
"ErrorOnRpcCall",
%{target: node, mod: mod, func: func, error: {kind, reason}},
Expand All @@ -59,6 +67,10 @@ defmodule Realtime.Rpc do
target: node
)

{:error, "RPC call error"}
case reason do
{:erpc, :timeout} -> {:error, :rpc_error, :timeout}
{:exception, error, _} -> {:error, :rpc_error, error}
_ -> {:error, reason}
end
end
end
14 changes: 14 additions & 0 deletions lib/realtime_web/channels/realtime_channel/logging.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
"""
require Logger
import Realtime.Logs
alias Realtime.Telemetry

@doc """
Logs messages according to user options given on config
Expand All @@ -20,6 +21,17 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
socket
end

@doc """
List of errors that are system triggered and not user driven
"""
def system_errors,
do: [
"UnableToSetPolicies",
"InitializingProjectConnection",
"DatabaseConnectionIssue",
"UnknownErrorOnChannel"
]

@doc """
Logs errors in an expected format
"""
Expand All @@ -32,6 +44,8 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
def log_error_message(level, code, error, metadata \\ [])

def log_error_message(:error, code, error, metadata) do
if code in system_errors(), do: Telemetry.execute([:realtime, :channel, :error], %{code: code}, %{code: code})

log_error(code, error, metadata)
{:error, %{reason: error}}
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.29",
version: "2.34.30",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
19 changes: 9 additions & 10 deletions test/realtime/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ defmodule Realtime.DatabaseTest do
use Realtime.DataCase, async: false

import ExUnit.CaptureLog
import Mock

alias Realtime.Database
doctest Realtime.Database
def handle_telemetry(event, metadata, _, pid: pid), do: send(pid, {event, metadata})

setup do
tenant = tenant_fixture()

:telemetry.attach(__MODULE__, [:realtime, :database, :transaction], &__MODULE__.handle_telemetry/4, pid: self())
on_exit(fn -> :telemetry.detach(__MODULE__) end)
# Ensure no replication slot is present before the test
Cleanup.ensure_no_replication_slot()

Expand Down Expand Up @@ -163,15 +164,13 @@ defmodule Realtime.DatabaseTest do
test "with telemetry event defined, emits telemetry event", %{db_conn: db_conn} do
event = [:realtime, :database, :transaction]

with_mock Realtime.Telemetry, execute: fn _, _, _ -> :ok end do
Database.transaction(
db_conn,
fn conn -> Postgrex.query!(conn, "SELECT pg_sleep(6)", []) end,
telemetry: event
)
Database.transaction(
db_conn,
fn conn -> Postgrex.query!(conn, "SELECT pg_sleep(6)", []) end,
telemetry: event
)

assert_called(Realtime.Telemetry.execute(event, %{latency: :_}, %{}))
end
assert_receive {^event, %{latency: _}}
end
end

Expand Down
19 changes: 19 additions & 0 deletions test/realtime/monitoring/prom_ex_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Realtime.PromExTest do
use ExUnit.Case
doctest Realtime.PromEx
alias Realtime.PromEx

describe "get_metrics/0" do
test "builds metrics in prometheus format which includes host region and id" do
metrics = PromEx.get_metrics()

assert String.contains?(
metrics,
"# HELP beam_system_schedulers_online_info The number of scheduler threads that are online."
)

assert String.contains?(metrics, "# TYPE beam_system_schedulers_online_info gauge")
assert String.contains?(metrics, "beam_system_schedulers_online_info{host=\"nohost\",region=\"\",id=\"nohost\"}")
end
end
end
53 changes: 48 additions & 5 deletions test/realtime/rpc_test.exs
Original file line number Diff line number Diff line change
@@ -1,30 +1,73 @@
defmodule Realtime.RpcTest do
use ExUnit.Case
alias Realtime.Rpc

import ExUnit.CaptureLog

alias Realtime.Rpc

defmodule TestRpc do
def test_raise, do: raise("test")
def test_timeout, do: Process.sleep(1000)
def test_timeout, do: Process.sleep(200)
def test_success, do: {:ok, "success"}
end

def handle_telemetry(event, metadata, _, pid: pid), do: send(pid, {event, metadata})

setup do
:telemetry.attach(__MODULE__, [:realtime, :rpc], &__MODULE__.handle_telemetry/4, pid: self())
on_exit(fn -> :telemetry.detach(__MODULE__) end)
:ok
end

describe "call/5" do
test "successful RPC call returns exactly what the original function returns" do
assert {:ok, "success"} = Rpc.call(node(), TestRpc, :test_success, [])
assert_receive {[:realtime, :rpc], %{latency: _}}
end

test "raised exceptions are properly caught and logged" do
assert {:badrpc,
{:EXIT,
{%RuntimeError{message: "test"},
[
{Realtime.RpcTest.TestRpc, :test_raise, 0,
[file: ~c"test/realtime/rpc_test.exs", line: 9, error_info: %{module: Exception}]}
]}}} =
Rpc.call(node(), TestRpc, :test_raise, [])

assert_receive {[:realtime, :rpc], %{latency: _}}
end

test "timeouts are properly caught and logged" do
assert {:badrpc, :timeout} =
Rpc.call(node(), TestRpc, :test_timeout, [], timeout: 100)

assert_receive {[:realtime, :rpc], %{latency: _}}
end
end

describe "enhanced_call/5" do
test "successful RPC call returns exactly what the original function returns" do
assert {:ok, "success"} = Rpc.enhanced_call(node(), TestRpc, :test_success)
assert_receive {[:realtime, :rpc], %{latency: _, success?: true}}
end

test "raised exceptions are properly caught and logged" do
assert capture_log(fn ->
assert {:error, "RPC call error"} = Rpc.enhanced_call(node(), TestRpc, :test_raise)
assert {:error, :rpc_error, %RuntimeError{message: "test"}} =
Rpc.enhanced_call(node(), TestRpc, :test_raise)
end) =~ "ErrorOnRpcCall"

assert_receive {[:realtime, :rpc], %{latency: _, success?: false}}
end

test "timeouts are properly caught and logged" do
assert capture_log(fn ->
assert {:error, "RPC call error"} =
Rpc.enhanced_call(node(), TestRpc, :test_timeout, 500)
assert {:error, :rpc_error, :timeout} =
Rpc.enhanced_call(node(), TestRpc, :test_timeout, [], timeout: 100)
end) =~ "ErrorOnRpcCall"

assert_receive {[:realtime, :rpc], %{latency: 0, success?: false}}
end
end
end
19 changes: 18 additions & 1 deletion test/realtime_web/channels/realtime_channel/logging_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do
import ExUnit.CaptureLog
alias RealtimeWeb.RealtimeChannel.Logging

def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {event, measures, metadata})

setup do
:telemetry.attach(__MODULE__, [:realtime, :channel, :error], &__MODULE__.handle_telemetry/4, pid: self())
on_exit(fn -> :telemetry.detach(__MODULE__) end)

level = Logger.level()
Logger.configure(level: :debug)
on_exit(fn -> Logger.configure(level: level) end)
Expand Down Expand Up @@ -38,9 +43,21 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do

test "handles error level errors" do
assert capture_log(fn ->
result = Logging.log_error_message(:error, :test_code, "test error")
result = Logging.log_error_message(:error, "TestCodeError", "test error")
assert {:error, %{reason: "test error"}} = result
end) =~ "test error"
end

test "only emits telemetry for system errors" do
errors = Logging.system_errors()

for error <- errors do
Logging.log_error_message(:error, error, "test error")
assert_receive {[:realtime, :channel, :error], %{code: ^error}, %{code: ^error}}
end

Logging.log_error_message(:error, "DatabaseConnectionIssue", "test error")
refute_receive {[:realtime, :channel, :error], %{code: "DatabaseConnectionIssue"}, %{code: "UnableToSetPolicies"}}
end
end
end
Loading