Skip to content

Commit 17a24a0

Browse files
committed
feat(telemetry): add :telemetry.span for handle_failed/2 callback
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 3b67a07 commit 17a24a0

5 files changed

Lines changed: 237 additions & 5 deletions

File tree

lib/broadway.ex

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,75 @@ defmodule Broadway do
798798
}
799799
```
800800
801+
* `[:broadway, :handle_failed, :start]` - Dispatched before the `c:handle_failed/2`
802+
callback is invoked
803+
804+
* Measurement: `%{system_time: integer}`
805+
806+
* Metadata:
807+
808+
```
809+
%{
810+
topology_name: atom,
811+
name: atom,
812+
index: non_neg_integer,
813+
processor_key: atom, # only present when called from a processor
814+
batch_info: Broadway.BatchInfo.t, # only present when called from a batch processor
815+
module: atom,
816+
messages: [Broadway.Message.t],
817+
context: term,
818+
producer: {atom, list},
819+
telemetry_span_context: reference
820+
}
821+
```
822+
823+
* `[:broadway, :handle_failed, :stop]` - Dispatched after the `c:handle_failed/2`
824+
callback has returned
825+
826+
* Measurement: `%{duration: native_time}`
827+
828+
* Metadata:
829+
830+
```
831+
%{
832+
topology_name: atom,
833+
name: atom,
834+
index: non_neg_integer,
835+
processor_key: atom, # only present when called from a processor
836+
batch_info: Broadway.BatchInfo.t, # only present when called from a batch processor
837+
module: atom,
838+
messages: [Broadway.Message.t],
839+
context: term,
840+
producer: {atom, list},
841+
telemetry_span_context: reference
842+
}
843+
```
844+
845+
* `[:broadway, :handle_failed, :exception]` - Dispatched if the `c:handle_failed/2`
846+
callback raises an exception
847+
848+
* Measurement: `%{duration: native_time}`
849+
850+
* Metadata:
851+
852+
```
853+
%{
854+
topology_name: atom,
855+
name: atom,
856+
index: non_neg_integer,
857+
processor_key: atom, # only present when called from a processor
858+
batch_info: Broadway.BatchInfo.t, # only present when called from a batch processor
859+
module: atom,
860+
messages: [Broadway.Message.t],
861+
context: term,
862+
producer: {atom, list},
863+
kind: :error | :exit | :throw,
864+
reason: term,
865+
stacktrace: list,
866+
telemetry_span_context: reference
867+
}
868+
```
869+
801870
* `[:broadway, :batcher, :start]` - Dispatched by a Broadway batcher before
802871
handling events
803872

lib/broadway/acknowledger.ex

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,28 @@ defmodule Broadway.Acknowledger do
9393
# Used by the processor and the batcher to maybe call c:handle_failed/2
9494
# on failed messages.
9595
@doc false
96-
def maybe_handle_failed_messages(messages, module, context) do
96+
def maybe_handle_failed_messages(messages, module, context, telemetry_metadata \\ %{}) do
9797
if function_exported?(module, :handle_failed, 2) and messages != [] do
98-
handle_failed_messages(messages, module, context)
98+
handle_failed_messages(messages, module, context, telemetry_metadata)
9999
else
100100
messages
101101
end
102102
end
103103

104-
defp handle_failed_messages(messages, module, context) do
104+
defp handle_failed_messages(messages, module, context, telemetry_metadata) do
105+
metadata = Map.merge(telemetry_metadata, %{module: module, messages: messages, context: context})
106+
107+
:telemetry.span(
108+
[:broadway, :handle_failed],
109+
metadata,
110+
fn ->
111+
result = do_handle_failed_messages(messages, module, context)
112+
{result, Map.put(metadata, :messages, result)}
113+
end
114+
)
115+
end
116+
117+
defp do_handle_failed_messages(messages, module, context) do
105118
module.handle_failed(messages, context)
106119
catch
107120
kind, reason ->

lib/broadway/topology/batch_processor_stage.ex

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,14 @@ defmodule Broadway.Topology.BatchProcessorStage do
7272
Acknowledger.maybe_handle_failed_messages(
7373
failed_messages,
7474
state.module,
75-
state.context
75+
state.context,
76+
%{
77+
topology_name: state.topology_name,
78+
name: state.name,
79+
index: state.partition,
80+
batch_info: batch_info,
81+
producer: state.producer
82+
}
7683
)
7784

7885
if returned != size do

lib/broadway/topology/processor_stage.ex

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,14 @@ defmodule Broadway.Topology.ProcessorStage do
9191
Acknowledger.maybe_handle_failed_messages(
9292
failed_messages,
9393
state.module,
94-
state.context
94+
state.context,
95+
%{
96+
topology_name: state.topology_name,
97+
name: state.name,
98+
index: state.partition,
99+
processor_key: state.processor_key,
100+
producer: state.producer
101+
}
95102
)
96103

97104
try do

test/broadway/telemetry_test.exs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
defmodule Broadway.TelemetryTest do
22
use ExUnit.Case
33

4+
alias Broadway.Message
5+
46
defmodule MessageServer do
57
use Broadway
68

@@ -24,6 +26,60 @@ defmodule Broadway.TelemetryTest do
2426
end
2527
end
2628

29+
defmodule FailingMessageServer do
30+
use Broadway
31+
32+
def start_link(opts) do
33+
Broadway.start_link(__MODULE__,
34+
name: opts[:name],
35+
producer: [module: {Broadway.DummyProducer, []}],
36+
processors: [default: []],
37+
context: opts[:context]
38+
)
39+
end
40+
41+
@impl true
42+
def handle_message(_, message, _context) do
43+
Message.failed(message, "forced failure")
44+
end
45+
46+
@impl true
47+
def handle_failed(messages, %{test_pid: test_pid} = _context) do
48+
send(test_pid, :handle_failed_called)
49+
messages
50+
end
51+
end
52+
53+
defmodule FailingBatchServer do
54+
use Broadway
55+
56+
def start_link(opts) do
57+
Broadway.start_link(__MODULE__,
58+
name: opts[:name],
59+
producer: [module: {Broadway.DummyProducer, []}],
60+
processors: [default: []],
61+
batchers: [default: [batch_size: 1]],
62+
context: opts[:context]
63+
)
64+
end
65+
66+
@impl true
67+
def handle_message(_, message, _context) do
68+
message
69+
end
70+
71+
@impl true
72+
def handle_batch(_batcher, messages, _batch_info, _context) do
73+
Enum.map(messages, &Message.failed(&1, "forced batch failure"))
74+
end
75+
76+
@impl true
77+
def handle_failed(messages, %{test_pid: test_pid} = _context) do
78+
send(test_pid, :handle_failed_called)
79+
messages
80+
end
81+
end
82+
2783
test "[:broadway, :processor, :message, :start] and :stop events include producer metadata" do
2884
test_pid = self()
2985
producer_options = [queue_url: "https://sqs.amazonaws.com/123456789/my-queue"]
@@ -92,4 +148,84 @@ defmodule Broadway.TelemetryTest do
92148

93149
:telemetry.detach("test-extract-queue-name")
94150
end
151+
152+
test "[:broadway, :handle_failed, :start] and :stop are emitted from a processor" do
153+
test_pid = self()
154+
155+
:telemetry.attach_many(
156+
"test-handle-failed-processor",
157+
[
158+
[:broadway, :handle_failed, :start],
159+
[:broadway, :handle_failed, :stop]
160+
],
161+
fn event_name, _measurements, metadata, _config ->
162+
send(test_pid, {:telemetry_event, event_name, metadata})
163+
end,
164+
%{}
165+
)
166+
167+
{:ok, _pid} =
168+
start_supervised(
169+
{FailingMessageServer,
170+
name: :test_handle_failed_processor, context: %{test_pid: test_pid}}
171+
)
172+
173+
ref = Broadway.test_message(:test_handle_failed_processor, "data")
174+
175+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :start], start_metadata}
176+
assert start_metadata.topology_name == :test_handle_failed_processor
177+
assert start_metadata.module == FailingMessageServer
178+
assert start_metadata.processor_key == :default
179+
assert [%Message{}] = start_metadata.messages
180+
181+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :stop], stop_metadata}
182+
assert stop_metadata.topology_name == :test_handle_failed_processor
183+
assert stop_metadata.module == FailingMessageServer
184+
assert [%Message{}] = stop_metadata.messages
185+
186+
assert_receive :handle_failed_called
187+
assert_receive {:ack, ^ref, _successful, _failed}
188+
189+
:telemetry.detach("test-handle-failed-processor")
190+
end
191+
192+
test "[:broadway, :handle_failed, :start] and :stop are emitted from a batch processor" do
193+
test_pid = self()
194+
195+
:telemetry.attach_many(
196+
"test-handle-failed-batch-processor",
197+
[
198+
[:broadway, :handle_failed, :start],
199+
[:broadway, :handle_failed, :stop]
200+
],
201+
fn event_name, _measurements, metadata, _config ->
202+
send(test_pid, {:telemetry_event, event_name, metadata})
203+
end,
204+
%{}
205+
)
206+
207+
{:ok, _pid} =
208+
start_supervised(
209+
{FailingBatchServer,
210+
name: :test_handle_failed_batch_processor, context: %{test_pid: test_pid}}
211+
)
212+
213+
ref = Broadway.test_message(:test_handle_failed_batch_processor, "data")
214+
215+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :start], start_metadata}
216+
assert start_metadata.topology_name == :test_handle_failed_batch_processor
217+
assert start_metadata.module == FailingBatchServer
218+
assert %Broadway.BatchInfo{} = start_metadata.batch_info
219+
assert [%Message{}] = start_metadata.messages
220+
221+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :stop], stop_metadata}
222+
assert stop_metadata.topology_name == :test_handle_failed_batch_processor
223+
assert stop_metadata.module == FailingBatchServer
224+
assert [%Message{}] = stop_metadata.messages
225+
226+
assert_receive :handle_failed_called
227+
assert_receive {:ack, ^ref, _successful, _failed}
228+
229+
:telemetry.detach("test-handle-failed-batch-processor")
230+
end
95231
end

0 commit comments

Comments
 (0)