Skip to content

Commit

Permalink
Scheduler improvements (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
eth2353 authored Jan 14, 2025
1 parent b6fc715 commit 9016519
Show file tree
Hide file tree
Showing 20 changed files with 411 additions and 346 deletions.
1 change: 1 addition & 0 deletions grafana/vero-detailed.json
Original file line number Diff line number Diff line change
Expand Up @@ -5174,6 +5174,7 @@
"y": 68
},
"id": 36,
"interval": "1m",
"maxDataPoints": 20,
"options": {
"colWidth": 0.9,
Expand Down
19 changes: 17 additions & 2 deletions src/initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
ValidatorDutyServiceOptions,
ValidatorStatusTrackerService,
)
from tasks import TaskManager

_logger = logging.getLogger("vero-init")

Expand Down Expand Up @@ -101,29 +102,39 @@ async def run_services(
validator_duty_services: list[ValidatorDutyService],
shutdown_event: asyncio.Event,
) -> None:
task_manager = TaskManager()

async with (
RemoteSigner(url=cli_args.remote_signer_url) as remote_signer,
MultiBeaconNode(
beacon_node_urls=cli_args.beacon_node_urls,
beacon_node_urls_proposal=cli_args.beacon_node_urls_proposal,
scheduler=scheduler,
task_manager=task_manager,
cli_args=cli_args,
) as multi_beacon_node,
):
beacon_chain = BeaconChain(multi_beacon_node=multi_beacon_node)
beacon_chain = BeaconChain(
multi_beacon_node=multi_beacon_node,
task_manager=task_manager,
)

await _wait_for_genesis(genesis_datetime=beacon_chain.get_datetime_for_slot(0))

_logger.info(f"Current slot: {beacon_chain.current_slot}")
_logger.info(f"Current epoch: {beacon_chain.current_epoch}")
_logger.info(f"Current slot: {beacon_chain.current_slot}")

validator_status_tracker_service = ValidatorStatusTrackerService(
multi_beacon_node=multi_beacon_node,
beacon_chain=beacon_chain,
remote_signer=remote_signer,
scheduler=scheduler,
task_manager=task_manager,
)
await validator_status_tracker_service.initialize()
beacon_chain.new_slot_handlers.append(
validator_status_tracker_service.on_new_slot
)
_logger.info("Initialized validator status tracker")

validator_service_args = ValidatorDutyServiceOptions(
Expand All @@ -133,6 +144,7 @@ async def run_services(
validator_status_tracker_service=validator_status_tracker_service,
scheduler=scheduler,
cli_args=cli_args,
task_manager=task_manager,
)

attestation_service = AttestationService(**validator_service_args)
Expand All @@ -146,11 +158,14 @@ async def run_services(
):
service.start()
validator_duty_services.append(service)
beacon_chain.new_slot_handlers.append(service.on_new_slot)
_logger.info("Started validator duty services")

event_consumer_service = EventConsumerService(
multi_beacon_node=multi_beacon_node,
beacon_chain=beacon_chain,
scheduler=scheduler,
task_manager=task_manager,
)

_register_event_handlers(
Expand Down
1 change: 1 addition & 0 deletions src/observability/_metrics_shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ErrorType(Enum):
SYNC_COMMITTEE_MESSAGE_PRODUCE = "sync-committee-message-produce"
SYNC_COMMITTEE_MESSAGE_PUBLISH = "sync-committee-message-publish"
DUTIES_UPDATE = "duties-update"
VALIDATOR_STATUS_UPDATE = "validator-status-update"
SIGNATURE = "signature"
OTHER = "other"

Expand Down
2 changes: 2 additions & 0 deletions src/providers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from .beacon_chain import BeaconChain
from .beacon_node import BeaconNode
from .multi_beacon_node import MultiBeaconNode
from .remote_signer import RemoteSigner

__all__ = [
"BeaconChain",
"BeaconNode",
"MultiBeaconNode",
"RemoteSigner",
]
40 changes: 38 additions & 2 deletions src/providers/beacon_chain.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,36 @@
"""Provides information about the beacon chain - current slot, epoch, fork, genesis and spec data."""

import asyncio
import datetime
import logging
from math import floor
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

import pytz

from schemas import SchemaRemoteSigner
from spec.base import Fork, Genesis, Spec
from tasks import TaskManager

if TYPE_CHECKING:
from collections.abc import Callable, Coroutine

from providers import MultiBeaconNode


class BeaconChain:
def __init__(self, multi_beacon_node: "MultiBeaconNode"):
def __init__(self, multi_beacon_node: "MultiBeaconNode", task_manager: TaskManager):
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.setLevel(logging.getLogger().level)

self.multi_beacon_node = multi_beacon_node
self.task_manager = task_manager

self.new_slot_handlers: list[
Callable[[int, bool], Coroutine[Any, Any, None]]
] = []

self.task_manager.submit_task(self.on_new_slot())

@property
def genesis(self) -> Genesis:
Expand Down Expand Up @@ -75,6 +86,31 @@ def _get_slots_since_genesis(self) -> int:
def current_slot(self) -> int:
return self._get_slots_since_genesis()

async def _wait_for_next_slot(self) -> None:
    """Sleep until the next slot begins, then hand `on_new_slot` to the task manager.

    Acts as a slightly more accurate replacement for a single
    `asyncio.sleep()` call: most of the wait is one coarse sleep that
    deliberately ends early, and the remainder is covered by repeatedly
    yielding to the event loop until `current_slot` reports the target slot.
    """
    target_slot = self.current_slot + 1
    target_start = self.get_datetime_for_slot(target_slot)
    remaining = target_start - datetime.datetime.now(tz=pytz.UTC)

    # Wake up 16 ms early - asyncio.sleep can be off by up to 16ms (on Windows).
    await asyncio.sleep(remaining.total_seconds() - 0.016)

    # Cover the last few milliseconds by yielding to the event loop until
    # the slot counter has actually advanced.
    while self.current_slot < target_slot:  # noqa: ASYNC110
        await asyncio.sleep(0)

    next_tick = self.on_new_slot()
    self.task_manager.submit_task(next_tick)

async def on_new_slot(self) -> None:
    """Log the current slot, notify every registered handler and re-arm the slot timer.

    Each callable in `new_slot_handlers` is invoked with
    `(slot, is_new_epoch)` and its result is passed to
    `task_manager.submit_task`. Afterwards `_wait_for_next_slot()` is
    submitted as well, which in turn submits this method again once the
    next slot starts.
    """
    # Read the property once so the log line and all handlers see the same slot.
    _current_slot = self.current_slot
    self.logger.info(f"Slot {_current_slot}")
    starts_epoch = (_current_slot % self.spec.SLOTS_PER_EPOCH) == 0

    submit = self.task_manager.submit_task
    for notify in self.new_slot_handlers:
        submit(notify(_current_slot, starts_epoch))

    # Keep the slot clock ticking.
    submit(self._wait_for_next_slot())

def time_since_slot_start(self, slot: int) -> float:
return (
datetime.datetime.now(tz=pytz.UTC) - self.get_datetime_for_slot(slot)
Expand Down
57 changes: 33 additions & 24 deletions src/providers/beacon_node.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Provides methods for interacting with a beacon node through the [Beacon Node API](https://github.com/ethereum/beacon-APIs)."""

import asyncio
import datetime
import json
import logging
from collections.abc import AsyncIterable
Expand All @@ -10,7 +9,6 @@

import aiohttp
import msgspec
import pytz
from aiohttp import ClientTimeout
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from opentelemetry import trace
Expand All @@ -27,11 +25,10 @@
from spec.base import Genesis, Spec, parse_spec
from spec.configs import Network, get_network_spec
from spec.sync_committee import SyncCommitteeContributionClass
from tasks import TaskManager

_TIMEOUT_DEFAULT_CONNECT = 1
_TIMEOUT_DEFAULT_TOTAL = 10
_SCORE_DELTA_SUCCESS = 1
_SCORE_DELTA_FAILURE = 5


_BEACON_NODE_SCORE = Gauge(
Expand Down Expand Up @@ -76,7 +73,13 @@ class BeaconNodeUnsupportedEndpoint(Exception):


class BeaconNode:
def __init__(self, base_url: str, scheduler: AsyncIOScheduler) -> None:
MAX_SCORE = 100
SCORE_DELTA_SUCCESS = 1
SCORE_DELTA_FAILURE = 5

def __init__(
self, base_url: str, scheduler: AsyncIOScheduler, task_manager: TaskManager
) -> None:
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.setLevel(logging.getLogger().level)

Expand All @@ -88,6 +91,7 @@ def __init__(self, base_url: str, scheduler: AsyncIOScheduler) -> None:
raise ValueError(f"Failed to parse hostname from {base_url}")

self.scheduler = scheduler
self.task_manager = task_manager

self.initialized = False
self._score = 0
Expand Down Expand Up @@ -122,7 +126,7 @@ def score(self) -> int:

@score.setter
def score(self, value: int) -> None:
self._score = max(0, min(value, 100))
self._score = max(0, min(value, BeaconNode.MAX_SCORE))
_BEACON_NODE_SCORE.labels(host=self.host).set(self._score)

async def _initialize_full(self, cli_args: CLIArgs) -> None:
Expand All @@ -148,10 +152,10 @@ async def _initialize_full(self, cli_args: CLIArgs) -> None:
self.get_node_version,
"interval",
minutes=10,
id=f"{self.__class__.__name__}.get_node_version-{self.host}",
id=f"{self.__class__.__name__}.get_node_version-{self.base_url}",
)

self.score = 100
self.score = BeaconNode.MAX_SCORE
self.initialized = True

async def initialize_full(self, cli_args: CLIArgs) -> None:
Expand All @@ -165,17 +169,9 @@ async def initialize_full(self, cli_args: CLIArgs) -> None:
f"Failed to initialize beacon node at {self.base_url}: {e!r}",
exc_info=self.logger.isEnabledFor(logging.DEBUG),
)
# Retry initializing every 30 seconds
next_run_time = datetime.datetime.now(tz=pytz.UTC) + datetime.timedelta(
seconds=30,
)
self.scheduler.add_job(
self.initialize_full,
"date",
next_run_time=next_run_time,
kwargs=dict(cli_args=cli_args),
id=f"{self.__class__.__name__}.initialize_full-{self.host}",
replace_existing=True,
# Try to initialize again in 30 seconds
self.task_manager.submit_task(
self.initialize_full(cli_args=cli_args), delay=30.0
)

@staticmethod
Expand Down Expand Up @@ -222,7 +218,7 @@ async def _make_request(
await self._handle_nok_status_code(response=resp)

# Request was successfully fulfilled
self.score += _SCORE_DELTA_SUCCESS
self.score += BeaconNode.SCORE_DELTA_SUCCESS
return await resp.text()
except BeaconNodeUnsupportedEndpoint:
raise
Expand All @@ -231,7 +227,7 @@ async def _make_request(
f"Failed to get response from {self.host} for {method} {endpoint}: {e!r}",
exc_info=self.logger.isEnabledFor(logging.DEBUG),
)
self.score -= _SCORE_DELTA_FAILURE
self.score -= BeaconNode.SCORE_DELTA_FAILURE
raise

def _raise_if_optimistic(
Expand Down Expand Up @@ -692,8 +688,11 @@ async def produce_block_v3(
),
)

block_value: int = consensus_block_value + execution_payload_value
self.logger.info(f"{self.host} returned block with value {block_value}")
self.logger.info(
f"{self.host} returned block with"
f" consensus block value {consensus_block_value},"
f" execution payload value {execution_payload_value}."
)
_BEACON_NODE_CONSENSUS_BLOCK_VALUE.labels(host=self.host).observe(
consensus_block_value
)
Expand Down Expand Up @@ -809,6 +808,7 @@ async def subscribe_to_events(
exc_info=self.logger.isEnabledFor(logging.DEBUG),
)
continue

event_data = []
next_line = (await anext(events_iter)).decode()
while next_line not in ("\n", "\r\n"):
Expand All @@ -821,6 +821,15 @@ async def subscribe_to_events(
raise NotImplementedError(
f"Unable to process event with name {event_name}, event_data: {event_data}!",
) from None
yield msgspec.json.decode(

event = msgspec.json.decode(
event_data[0].split("data:")[1], type=event_struct
)

if (
hasattr(event, "execution_optimistic")
and event.execution_optimistic
):
raise ValueError(f"Execution optimistic for event: {event}")

yield event
16 changes: 13 additions & 3 deletions src/providers/multi_beacon_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@

from args import CLIArgs
from observability import ErrorType, get_shared_metrics
from providers.beacon_node import BeaconNode
from providers import BeaconNode
from schemas import SchemaBeaconAPI, SchemaValidator
from spec.attestation import Attestation, AttestationData
from spec.block import BeaconBlockClass
from spec.configs import Network
from spec.sync_committee import SyncCommitteeContributionClass
from tasks import TaskManager

(_ERRORS_METRIC,) = get_shared_metrics()
_VC_ATTESTATION_CONSENSUS_CONTRIBUTIONS = CounterMetric(
Expand All @@ -75,6 +76,7 @@ def __init__(
beacon_node_urls: list[str],
beacon_node_urls_proposal: list[str],
scheduler: AsyncIOScheduler,
task_manager: TaskManager,
cli_args: CLIArgs,
):
self.logger = logging.getLogger(self.__class__.__name__)
Expand All @@ -83,11 +85,15 @@ def __init__(
self.tracer = trace.get_tracer(self.__class__.__name__)

self.beacon_nodes = [
BeaconNode(base_url=base_url, scheduler=scheduler)
BeaconNode(
base_url=base_url, scheduler=scheduler, task_manager=task_manager
)
for base_url in beacon_node_urls
]
self.beacon_nodes_proposal = [
BeaconNode(base_url=base_url, scheduler=scheduler)
BeaconNode(
base_url=base_url, scheduler=scheduler, task_manager=task_manager
)
for base_url in beacon_node_urls_proposal
]

Expand Down Expand Up @@ -151,6 +157,10 @@ async def __aexit__(
],
)

# The first entry of `self.beacon_nodes`, i.e. the node built from the first
# URL in `beacon_node_urls` (the list is constructed in that order).
@property
def primary_beacon_node(self) -> BeaconNode:
# Plain index access: raises IndexError if `self.beacon_nodes` is empty.
return self.beacon_nodes[0]

@property
def best_beacon_node(self) -> BeaconNode:
return next(
Expand Down
Loading

0 comments on commit 9016519

Please sign in to comment.