Skip to content

Commit 015df34

Browse files
committed
implement basic watermarking
1 parent f1b8fde commit 015df34

File tree

3 files changed

+66
-37
lines changed

3 files changed

+66
-37
lines changed

ray_beam_runner/portability/context_management.py

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -163,15 +163,18 @@ def setup(self):
163163
pcoll_id,
164164
self.execution_context.safe_coders.get(coder_id, coder_id),
165165
)
166+
166167
elif transform.spec.urn == bundle_processor.DATA_OUTPUT_URN:
167168
data_output[transform.unique_name] = pcoll_id
168169
coder_id = self.execution_context.data_channel_coders[
169170
translations.only_element(transform.inputs.values())
170171
]
171172
else:
172173
raise NotImplementedError
173-
data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id)
174-
transform.spec.payload = data_spec.SerializeToString()
174+
if pcoll_id != translations.IMPULSE_BUFFER:
175+
data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id)
176+
transform.spec.payload = data_spec.SerializeToString()
177+
175178
elif transform.spec.urn in translations.PAR_DO_URNS:
176179
payload = proto_utils.parse_Bytes(
177180
transform.spec.payload, beam_runner_api_pb2.ParDoPayload

ray_beam_runner/portability/execution.py

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -345,20 +345,9 @@ def get(self, pcoll) -> List[ray.ObjectRef]:
345345

346346
@ray.remote
347347
class RayWatermarkManager(watermark_manager.WatermarkManager):
348-
def __init__(self):
349-
# the original WatermarkManager performs a lot of computation
350-
# in its __init__ method. Because Ray calls __init__ whenever
351-
# it deserializes an object, we'll move its setup elsewhere.
352-
self._initialized = False
353-
self._pcollections_by_name = {}
354-
self._stages_by_name = {}
355-
356-
def setup(self, stages):
357-
if self._initialized:
358-
return
359-
logging.debug("initialized the RayWatermarkManager")
360-
self._initialized = True
361-
watermark_manager.WatermarkManager.setup(self, stages)
348+
def set_pcoll_produced_watermark(self, name, watermark):
349+
element = self._pcollections_by_name[name]
350+
element.set_produced_watermark(watermark)
362351

363352

364353
class RayRunnerExecutionContext(object):
@@ -371,6 +360,7 @@ def __init__(
371360
state_servicer: Optional[RayStateManager] = None,
372361
worker_manager: Optional[RayWorkerHandlerManager] = None,
373362
pcollection_buffers: PcollectionBufferManager = None,
363+
watermark_manager: Optional[RayWatermarkManager] = None,
374364
) -> None:
375365
ray.util.register_serializer(
376366
beam_runner_api_pb2.Components,
@@ -405,7 +395,9 @@ def __init__(
405395
for t in s.transforms
406396
if t.spec.urn == bundle_processor.DATA_INPUT_URN
407397
}
408-
self._watermark_manager = RayWatermarkManager.remote()
398+
self.watermark_manager = watermark_manager or RayWatermarkManager.remote(
399+
self.stages
400+
)
409401
self.pipeline_context = pipeline_context.PipelineContext(pipeline_components)
410402
self.safe_windowing_strategies = {
411403
# TODO: Enable safe_windowing_strategy after
@@ -419,14 +411,6 @@ def __init__(
419411
self.worker_manager = worker_manager or RayWorkerHandlerManager()
420412
self.timer_coder_ids = self._build_timer_coders_id_map()
421413

422-
@property
423-
def watermark_manager(self):
424-
# We don't need to wait for this line to execute with ray.get,
425-
# because any further calls to the watermark manager actor will
426-
# have to wait for it.
427-
self._watermark_manager.setup.remote(self.stages)
428-
return self._watermark_manager
429-
430414
@staticmethod
431415
def next_uid():
432416
# TODO(pabloem): Use stats actor for UIDs.
@@ -464,6 +448,7 @@ def __reduce__(self):
464448
self.state_servicer,
465449
self.worker_manager,
466450
self.pcollection_buffers,
451+
self.watermark_manager,
467452
)
468453

469454
def deserializer(*args):
@@ -475,6 +460,7 @@ def deserializer(*args):
475460
args[4],
476461
args[5],
477462
args[6],
463+
args[7],
478464
)
479465

480466
return (deserializer, data)

ray_beam_runner/portability/ray_fn_runner.py

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
from apache_beam.runners.portability.fn_api_runner import translations
4343
from apache_beam.runners.portability.fn_api_runner.execution import ListBuffer
4444
from apache_beam.transforms import environments
45-
from apache_beam.utils import proto_utils
45+
from apache_beam.utils import proto_utils, timestamp
4646

4747
import ray
4848
from ray_beam_runner.portability.context_management import RayBundleContextManager
@@ -227,7 +227,9 @@ def _run_stage(
227227
bundle_context_manager (execution.BundleContextManager): A description of
228228
the stage to execute, and its context.
229229
"""
230+
230231
bundle_context_manager.setup()
232+
231233
runner_execution_context.worker_manager.register_process_bundle_descriptor(
232234
bundle_context_manager.process_bundle_descriptor
233235
)
@@ -246,6 +248,8 @@ def _run_stage(
246248
for k in bundle_context_manager.transform_to_buffer_coder
247249
}
248250

251+
watermark_manager = runner_execution_context.watermark_manager
252+
249253
final_result = None # type: Optional[beam_fn_api_pb2.InstructionResponse]
250254

251255
while True:
@@ -262,19 +266,26 @@ def _run_stage(
262266

263267
final_result = merge_stage_results(final_result, last_result)
264268
if not delayed_applications and not fired_timers:
269+
# Processing has completed; marking all outputs as completed
270+
for output_pc in bundle_outputs:
271+
_, update_output_pc = translations.split_buffer_id(output_pc)
272+
watermark_manager.set_pcoll_produced_watermark.remote(
273+
update_output_pc, timestamp.MAX_TIMESTAMP
274+
)
265275
break
266276
else:
267-
# TODO: Enable following assertion after watermarking is implemented
268-
# assert (ray.get(
269-
# runner_execution_context.watermark_manager
270-
# .get_stage_node.remote(
271-
# bundle_context_manager.stage.name)).output_watermark()
272-
# < timestamp.MAX_TIMESTAMP), (
273-
# 'wrong timestamp for %s. '
274-
# % ray.get(
275-
# runner_execution_context.watermark_manager
276-
# .get_stage_node.remote(
277-
# bundle_context_manager.stage.name)))
277+
assert (
278+
ray.get(
279+
watermark_manager.get_stage_node.remote(
280+
bundle_context_manager.stage.name
281+
)
282+
).output_watermark()
283+
< timestamp.MAX_TIMESTAMP
284+
), "wrong timestamp for %s. " % ray.get(
285+
watermark_manager.get_stage_node.remote(
286+
bundle_context_manager.stage.name
287+
)
288+
)
278289
input_data = delayed_applications
279290
input_timers = fired_timers
280291

@@ -288,6 +299,20 @@ def _run_stage(
288299
# TODO(pabloem): Make sure that side inputs are being stored somewhere.
289300
# runner_execution_context.commit_side_inputs_to_state(data_side_input)
290301

302+
# assert that the output watermark was correctly set for this stage
303+
stage_node = ray.get(
304+
runner_execution_context.watermark_manager.get_stage_node.remote(
305+
bundle_context_manager.stage.name
306+
)
307+
)
308+
assert (
309+
stage_node.output_watermark() == timestamp.MAX_TIMESTAMP
310+
), "wrong output watermark for %s. Expected %s, but got %s." % (
311+
stage_node,
312+
timestamp.MAX_TIMESTAMP,
313+
stage_node.output_watermark(),
314+
)
315+
291316
return final_result
292317

293318
def _run_bundle(
@@ -346,6 +371,21 @@ def _run_bundle(
346371
# coder_impl=bundle_context_manager.get_input_coder_impl(
347372
# other_input))
348373

374+
# TODO: replace placeholder sets when timers are implemented
375+
watermark_updates = fn_runner.FnApiRunner._build_watermark_updates(
376+
runner_execution_context,
377+
transform_to_buffer_coder.keys(),
378+
set(), # expected_timers
379+
set(), # pcolls_with_da
380+
delayed_applications.keys(),
381+
set(), # watermarks_by_transform_and_timer_family
382+
)
383+
384+
for pc_name, watermark in watermark_updates.items():
385+
runner_execution_context.watermark_manager.set_pcoll_watermark.remote(
386+
pc_name, watermark
387+
)
388+
349389
newly_set_timers = {}
350390
return result, newly_set_timers, delayed_applications, output
351391

0 commit comments

Comments
 (0)