Skip to content

Commit

Permalink
Improve shutdown handling (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
eth2353 authored Jan 2, 2025
1 parent da3259c commit b7b59f2
Show file tree
Hide file tree
Showing 15 changed files with 467 additions and 167 deletions.
8 changes: 8 additions & 0 deletions compose-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ services:
validator-client:
container_name: validator-client
image: ghcr.io/serenita-org/vero:v0.9.0
# It's recommended to set the `stop_grace_period` value
# (or equivalent, e.g. `terminationGracePeriodSeconds` in k8s)
# to at least 30 seconds.
# This is because Vero defers the shutdown process if it detects
# an upcoming validator duty, opting to complete the duty before
# shutting down.
# This feature can make the shutdown process take several seconds.
stop_grace_period: 1m
command:
- "--network=holesky"
- "--remote-signer-url=http://remote-signer:9000"
Expand Down
19 changes: 11 additions & 8 deletions src/initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
BlockProposalService,
EventConsumerService,
SyncCommitteeService,
ValidatorDutyService,
ValidatorDutyServiceOptions,
ValidatorStatusTrackerService,
)
Expand Down Expand Up @@ -94,13 +95,12 @@ def check_data_dir_permissions(data_dir: Path) -> None:
)


async def run_services(cli_args: CLIArgs) -> None:
scheduler = AsyncIOScheduler(
timezone=pytz.UTC,
job_defaults=dict(misfire_grace_time=None),
)
scheduler.start()

async def run_services(
cli_args: CLIArgs,
scheduler: AsyncIOScheduler,
validator_duty_services: list[ValidatorDutyService],
shutdown_event: asyncio.Event,
) -> None:
async with (
RemoteSigner(url=cli_args.remote_signer_url) as remote_signer,
MultiBeaconNode(
Expand Down Expand Up @@ -145,6 +145,7 @@ async def run_services(cli_args: CLIArgs) -> None:
sync_committee_service,
):
service.start()
validator_duty_services.append(service)
_logger.info("Started validator duty services")

event_consumer_service = EventConsumerService(
Expand All @@ -163,4 +164,6 @@ async def run_services(cli_args: CLIArgs) -> None:
event_consumer_service.start()

# Run forever while monitoring the event loop
await monitor_event_loop(beacon_chain=beacon_chain)
await monitor_event_loop(
beacon_chain=beacon_chain, shutdown_event=shutdown_event
)
52 changes: 38 additions & 14 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
import asyncio
import functools
import logging
import signal
import sys
from pathlib import Path
from types import FrameType
from typing import TYPE_CHECKING

import pytz
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from args import CLIArgs, parse_cli_args
from initialize import check_data_dir_permissions, run_services
from observability import get_service_commit, get_service_version, init_observability

if TYPE_CHECKING:
from services import ValidatorDutyService
from shutdown import shutdown_handler


def prep_datadir(data_dir: Path) -> None:
# Write to placeholder file
Expand All @@ -27,12 +35,36 @@ async def main(cli_args: CLIArgs) -> None:
)
check_data_dir_permissions(data_dir=Path(cli_args.data_dir))
prep_datadir(data_dir=Path(cli_args.data_dir))
await run_services(cli_args=cli_args)

scheduler = AsyncIOScheduler(
timezone=pytz.UTC,
job_defaults=dict(
coalesce=True, # default value
max_instances=1, # default value
misfire_grace_time=None, # default is 1 second
),
)
scheduler.start()

validator_duty_services: list[ValidatorDutyService] = []

def sigterm_handler(_signum: int, _frame: FrameType | None) -> None:
logging.getLogger().info("Received SIGTERM. Exiting.")
sys.exit(0)
loop = asyncio.get_running_loop()
signals = (signal.SIGINT, signal.SIGTERM)
shutdown_event = asyncio.Event()
for s in signals:
loop.add_signal_handler(
s,
functools.partial(
shutdown_handler, s, validator_duty_services, shutdown_event
),
)

await run_services(
cli_args=cli_args,
scheduler=scheduler,
validator_duty_services=validator_duty_services,
shutdown_event=shutdown_event,
)


if __name__ == "__main__":
Expand All @@ -43,12 +75,4 @@ def sigterm_handler(_signum: int, _frame: FrameType | None) -> None:
metrics_multiprocess_mode=cli_args.metrics_multiprocess_mode,
log_level=cli_args.log_level,
)

signal.signal(signal.SIGTERM, sigterm_handler)

# Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
try:
asyncio.run(main(cli_args=cli_args))
except (KeyboardInterrupt, SystemExit):
logger = logging.getLogger("vero-shutdown")
logger.info("Shutting down...")
asyncio.run(main(cli_args=cli_args))
6 changes: 4 additions & 2 deletions src/observability/event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
)


async def monitor_event_loop(beacon_chain: BeaconChain) -> None:
async def monitor_event_loop(
beacon_chain: BeaconChain, shutdown_event: asyncio.Event
) -> None:
_logger = logging.getLogger("event-loop")
event_loop = asyncio.get_running_loop()
_start = event_loop.time()
_interval = 0.1 # Check every 100 milliseconds
_loop_lag_high_threshold = 0.5 # 500 milliseconds

while True:
while not shutdown_event.is_set():
await asyncio.sleep(_interval)
lag = event_loop.time() - _start - _interval
if lag > _loop_lag_high_threshold:
Expand Down
6 changes: 6 additions & 0 deletions src/providers/beacon_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ def current_epoch(self) -> int:
def compute_start_slot_at_epoch(self, epoch: int) -> int:
return epoch * self.spec.SLOTS_PER_EPOCH # type: ignore[no-any-return]

def compute_epochs_for_sync_period(self, sync_period: int) -> tuple[int, int]:
spec = self.spec # Cache property value
start_epoch = sync_period * spec.EPOCHS_PER_SYNC_COMMITTEE_PERIOD
end_epoch = start_epoch + spec.EPOCHS_PER_SYNC_COMMITTEE_PERIOD
return start_epoch, end_epoch

def compute_sync_period_for_epoch(self, epoch: int) -> int:
return epoch // self.spec.EPOCHS_PER_SYNC_COMMITTEE_PERIOD # type: ignore[no-any-return]

Expand Down
3 changes: 2 additions & 1 deletion src/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
from .block_proposal import BlockProposalService
from .event_consumer import EventConsumerService
from .sync_committee import SyncCommitteeService
from .validator_duty_service import ValidatorDutyServiceOptions
from .validator_duty_service import ValidatorDutyService, ValidatorDutyServiceOptions
from .validator_status_tracker import ValidatorStatusTrackerService

__all__ = [
"AttestationService",
"BlockProposalService",
"EventConsumerService",
"SyncCommitteeService",
"ValidatorDutyService",
"ValidatorDutyServiceOptions",
"ValidatorStatusTrackerService",
]
51 changes: 46 additions & 5 deletions src/services/attestation.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,45 @@ def start(self) -> None:
self.update_duties, id=f"{self.__class__.__name__}.update_duties"
)

@property
def next_duty_slot(self) -> int | None:
# In case a duty for the current slot has not finished yet, it is still
# considered the next duty slot
if self.has_ongoing_duty:
return self._last_slot_duty_started_for

current_slot = self.beacon_chain.current_slot
min_duty_slots_per_epoch = (
min(
(
int(d.slot)
for d in duties
if int(d.slot) > self._last_slot_duty_started_for
and int(d.slot) >= current_slot
),
default=None,
)
for duties in self.attester_duties.values()
if duties
)
return min(
(slot for slot in min_duty_slots_per_epoch if slot is not None),
default=None,
)

@property
def next_duty_run_time(self) -> datetime.datetime | None:
next_duty_slot = self.next_duty_slot
if next_duty_slot is None:
return None

return self.beacon_chain.get_datetime_for_slot(
next_duty_slot
) + datetime.timedelta(
seconds=int(self.beacon_chain.spec.SECONDS_PER_SLOT)
/ int(self.beacon_chain.spec.INTERVALS_PER_SLOT),
)

async def handle_head_event(self, event: SchemaBeaconAPI.HeadEvent) -> None:
if (
any(
Expand Down Expand Up @@ -120,12 +159,12 @@ async def attest_if_not_yet_attested(
if self.validator_status_tracker_service.slashing_detected:
raise RuntimeError("Slashing detected, not attesting")

if slot < self._last_slot_duty_performed_for:
if slot < self._last_slot_duty_started_for:
return
if slot == self._last_slot_duty_performed_for:
if slot == self._last_slot_duty_started_for:
if head_event:
self.logger.warning(
f"Ignoring head event, already started attesting to slot {self._last_slot_duty_performed_for}",
f"Ignoring head event, already started attesting to slot {self._last_slot_duty_started_for}",
)
return
if slot != self.beacon_chain.current_slot:
Expand All @@ -137,8 +176,6 @@ async def attest_if_not_yet_attested(
)
return

self._last_slot_duty_performed_for = slot

epoch = slot // self.beacon_chain.spec.SLOTS_PER_EPOCH
slot_attester_duties = {
duty for duty in self.attester_duties[epoch] if int(duty.slot) == slot
Expand All @@ -162,6 +199,7 @@ async def attest_if_not_yet_attested(
self.logger.debug(
f"Attesting to slot {slot}, {len(slot_attester_duties)} duties",
)
self._last_slot_duty_started_for = slot
self._duty_start_time_metric.labels(
duty=ValidatorDuty.ATTESTATION.value,
).observe(self.beacon_chain.time_since_slot_start(slot=slot))
Expand Down Expand Up @@ -198,6 +236,7 @@ async def attest_if_not_yet_attested(
_ERRORS_METRIC.labels(
error_type=ErrorType.ATTESTATION_CONSENSUS.value,
).inc()
self._last_slot_duty_completed_for = slot
return

consensus_time = asyncio.get_running_loop().time() - consensus_start
Expand Down Expand Up @@ -325,6 +364,8 @@ def _att_data_for_committee_idx(
_VC_PUBLISHED_ATTESTATIONS.inc(
amount=len(attestations_objects_to_publish),
)
finally:
self._last_slot_duty_completed_for = slot

def prepare_and_aggregate_attestations(
self,
Expand Down
Loading

0 comments on commit b7b59f2

Please sign in to comment.