Skip to content

Commit 61375f6

Browse files
committed
feat(telemetry): add :telemetry.span for handle_failed/2 callback
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 3b67a07 commit 61375f6

4 files changed

Lines changed: 200 additions & 5 deletions

File tree

lib/broadway.ex

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,57 @@ defmodule Broadway do
798798
}
799799
```
800800
801+
* `[:broadway, :handle_failed, :start]` - Dispatched before the `c:handle_failed/2`
802+
callback is invoked
803+
804+
* Measurement: `%{system_time: integer}`
805+
806+
* Metadata:
807+
808+
```
809+
%{
810+
module: atom,
811+
messages: [Broadway.Message.t],
812+
context: term,
813+
telemetry_span_context: reference
814+
}
815+
```
816+
817+
* `[:broadway, :handle_failed, :stop]` - Dispatched after the `c:handle_failed/2`
818+
callback has returned
819+
820+
* Measurement: `%{duration: native_time}`
821+
822+
* Metadata:
823+
824+
```
825+
%{
826+
module: atom,
827+
messages: [Broadway.Message.t],
828+
context: term,
829+
telemetry_span_context: reference
830+
}
831+
```
832+
833+
* `[:broadway, :handle_failed, :exception]` - Dispatched if the `c:handle_failed/2`
833+
callback raises, throws, or exits
835+
836+
* Measurement: `%{duration: native_time}`
837+
838+
* Metadata:
839+
840+
```
841+
%{
842+
module: atom,
843+
messages: [Broadway.Message.t],
844+
context: term,
845+
kind: :error | :exit | :throw,
846+
reason: term,
847+
stacktrace: list,
848+
telemetry_span_context: reference
849+
}
850+
```
851+
801852
* `[:broadway, :batcher, :start]` - Dispatched by a Broadway batcher before
802853
handling events
803854

lib/broadway/acknowledger.ex

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,19 @@ defmodule Broadway.Acknowledger do
102102
end
103103

104104
# Invokes the user's `handle_failed/2` callback (through
# `do_handle_failed_messages/3`) wrapped in a `[:broadway, :handle_failed]`
# telemetry span.
#
# The start metadata carries the messages as they were handed to the callback;
# the stop metadata carries the value the callback wrapper returned instead.
# Returns whatever `do_handle_failed_messages/3` returns.
#
# NOTE(review): `do_handle_failed_messages/3` has its own `catch` clause. If it
# swallows the error instead of re-raising, the span function never fails and
# the `:exception` event is never emitted - confirm against that function.
defp handle_failed_messages(messages, module, context) do
  start_metadata = %{module: module, messages: messages, context: context}

  :telemetry.span([:broadway, :handle_failed], start_metadata, fn ->
    handled = do_handle_failed_messages(messages, module, context)
    {handled, %{start_metadata | messages: handled}}
  end)
end
116+
117+
defp do_handle_failed_messages(messages, module, context) do
105118
module.handle_failed(messages, context)
106119
catch
107120
kind, reason ->

lib/broadway/topology/processor_stage.ex

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,7 @@ defmodule Broadway.Topology.ProcessorStage do
164164
index: state.partition,
165165
name: state.name,
166166
message: message,
167-
context: state.context,
168-
producer: state.producer
167+
context: state.context
169168
},
170169
fn ->
171170
updated_message =
@@ -180,8 +179,7 @@ defmodule Broadway.Topology.ProcessorStage do
180179
index: state.partition,
181180
name: state.name,
182181
message: updated_message,
183-
context: state.context,
184-
producer: state.producer
182+
context: state.context
185183
}}
186184
end
187185
)

test/broadway/telemetry_test.exs

Lines changed: 134 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
defmodule Broadway.TelemetryTest do
22
use ExUnit.Case
33

4+
alias Broadway.Message
5+
46
defmodule MessageServer do
57
use Broadway
68

@@ -24,6 +26,60 @@ defmodule Broadway.TelemetryTest do
2426
end
2527
end
2628

29+
# Test pipeline without batchers whose processor marks every message as failed,
# so `handle_failed/2` is invoked from the processor stage.
defmodule FailingMessageServer do
  use Broadway

  # Starts the pipeline; `opts[:name]` and `opts[:context]` are forwarded to
  # Broadway unchanged.
  def start_link(opts) do
    broadway_opts = [
      name: opts[:name],
      producer: [module: {Broadway.DummyProducer, []}],
      processors: [default: []],
      context: opts[:context]
    ]

    Broadway.start_link(__MODULE__, broadway_opts)
  end

  # Fails every incoming message unconditionally.
  @impl true
  def handle_message(_processor, message, _context) do
    Message.failed(message, "forced failure")
  end

  # Notifies the test process that the callback ran and returns the messages
  # untouched. Requires a `:test_pid` key in the context.
  @impl true
  def handle_failed(messages, %{test_pid: test_pid}) do
    send(test_pid, :handle_failed_called)
    messages
  end
end
52+
53+
# Test pipeline with a single batcher whose `handle_batch/4` marks every
# message as failed, so `handle_failed/2` is invoked after batch handling.
defmodule FailingBatchServer do
  use Broadway

  # Starts the pipeline; `opts[:name]` and `opts[:context]` are forwarded to
  # Broadway unchanged. The default batcher is configured with `batch_size: 1`.
  def start_link(opts) do
    broadway_opts = [
      name: opts[:name],
      producer: [module: {Broadway.DummyProducer, []}],
      processors: [default: []],
      batchers: [default: [batch_size: 1]],
      context: opts[:context]
    ]

    Broadway.start_link(__MODULE__, broadway_opts)
  end

  # Passes the message through untouched so that it reaches the batcher.
  @impl true
  def handle_message(_processor, message, _context) do
    message
  end

  # Fails every message of the batch unconditionally.
  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
    for message <- messages do
      Message.failed(message, "forced batch failure")
    end
  end

  # Notifies the test process that the callback ran and returns the messages
  # untouched. Requires a `:test_pid` key in the context.
  @impl true
  def handle_failed(messages, %{test_pid: test_pid}) do
    send(test_pid, :handle_failed_called)
    messages
  end
end
82+
2783
test "[:broadway, :processor, :message, :start] and :stop events include producer metadata" do
2884
test_pid = self()
2985
producer_options = [queue_url: "https://sqs.amazonaws.com/123456789/my-queue"]
@@ -72,7 +128,10 @@ defmodule Broadway.TelemetryTest do
72128
[:broadway, :processor, :message, :start],
73129
fn _event_name, _measurements, metadata, _config ->
74130
{_module, adapter_options} = metadata.producer
75-
queue_name = adapter_options |> Keyword.fetch!(:queue_url) |> String.split("/") |> List.last()
131+
132+
queue_name =
133+
adapter_options |> Keyword.fetch!(:queue_url) |> String.split("/") |> List.last()
134+
76135
send(test_pid, {:queue_name, queue_name})
77136
end,
78137
%{}
@@ -92,4 +151,78 @@ defmodule Broadway.TelemetryTest do
92151

93152
:telemetry.detach("test-extract-queue-name")
94153
end
154+
155+
# Verifies that failing a message in `handle_message/3` makes Broadway emit the
# `[:broadway, :handle_failed, :start]` and `:stop` telemetry events around the
# `handle_failed/2` callback, with the pipeline module and the failed messages
# in the metadata.
test "[:broadway, :handle_failed, :start] and :stop are emitted from a processor" do
  test_pid = self()
  handler_id = "test-handle-failed-processor"

  :telemetry.attach_many(
    handler_id,
    [
      [:broadway, :handle_failed, :start],
      [:broadway, :handle_failed, :stop]
    ],
    fn event_name, _measurements, metadata, _config ->
      send(test_pid, {:telemetry_event, event_name, metadata})
    end,
    %{}
  )

  # Telemetry handlers are global. Registering the detach with on_exit removes
  # the handler even when an assertion below fails, so it cannot leak into
  # other tests.
  on_exit(fn -> :telemetry.detach(handler_id) end)

  {:ok, _pid} =
    start_supervised(
      {FailingMessageServer,
       name: :test_handle_failed_processor, context: %{test_pid: test_pid}}
    )

  ref = Broadway.test_message(:test_handle_failed_processor, "data")

  assert_receive {:telemetry_event, [:broadway, :handle_failed, :start], start_metadata}
  assert start_metadata.module == FailingMessageServer
  assert [%Message{}] = start_metadata.messages

  assert_receive {:telemetry_event, [:broadway, :handle_failed, :stop], stop_metadata}
  assert stop_metadata.module == FailingMessageServer
  assert [%Message{}] = stop_metadata.messages

  assert_receive :handle_failed_called
  assert_receive {:ack, ^ref, _successful, _failed}
end
191+
192+
# Verifies that failing messages in `handle_batch/4` makes Broadway emit the
# `[:broadway, :handle_failed, :start]` and `:stop` telemetry events around the
# `handle_failed/2` callback, with the pipeline module and the failed messages
# in the metadata.
test "[:broadway, :handle_failed, :start] and :stop are emitted from a batch processor" do
  test_pid = self()
  handler_id = "test-handle-failed-batch-processor"

  :telemetry.attach_many(
    handler_id,
    [
      [:broadway, :handle_failed, :start],
      [:broadway, :handle_failed, :stop]
    ],
    fn event_name, _measurements, metadata, _config ->
      send(test_pid, {:telemetry_event, event_name, metadata})
    end,
    %{}
  )

  # Telemetry handlers are global. Registering the detach with on_exit removes
  # the handler even when an assertion below fails, so it cannot leak into
  # other tests.
  on_exit(fn -> :telemetry.detach(handler_id) end)

  {:ok, _pid} =
    start_supervised(
      {FailingBatchServer,
       name: :test_handle_failed_batch_processor, context: %{test_pid: test_pid}}
    )

  ref = Broadway.test_message(:test_handle_failed_batch_processor, "data")

  assert_receive {:telemetry_event, [:broadway, :handle_failed, :start], start_metadata}
  assert start_metadata.module == FailingBatchServer
  assert [%Message{}] = start_metadata.messages

  assert_receive {:telemetry_event, [:broadway, :handle_failed, :stop], stop_metadata}
  assert stop_metadata.module == FailingBatchServer
  assert [%Message{}] = stop_metadata.messages

  assert_receive :handle_failed_called
  assert_receive {:ack, ^ref, _successful, _failed}
end
95228
end

0 commit comments

Comments
 (0)