Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[WIP] Pectra
Browse files Browse the repository at this point in the history
eth2353 committed Jan 25, 2025
1 parent c31739d commit eb43cc2
Showing 36 changed files with 1,454 additions and 405 deletions.
1 change: 1 addition & 0 deletions requirements-dev.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
-c requirements.txt
aioresponses
milagro-bls-binding
pre-commit
pytest
pytest-asyncio
10 changes: 10 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -289,6 +289,16 @@ iniconfig==2.0.0 \
--hash=sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3 \
--hash=sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374
# via pytest
milagro-bls-binding==1.9.0 \
--hash=sha256:2c47c5e45b30d0df02b65c2da4f9e10a49cc1a773f47796294663fc564ca56ee \
--hash=sha256:43fb41b335b2a40ee21f2698c6ae27ed83921f5f6109443705f793c77d4b6d6e \
--hash=sha256:4d91da896d8de735c828dc1815d7dcaeeed9363c25a5d4f725ec3916672cbc79 \
--hash=sha256:5646113ffa12a43acda419341817cf3b1b327b9e81dfd2c2a98c6aa1b38422c0 \
--hash=sha256:8800b9a8c61c20d1fdb5593b8e1940cbf6e521b454cc6f764fc22f026337651f \
--hash=sha256:a28cbae598a01f76c5204a29b2d060c9aee8c66898e37c37b708aecc16d1b482 \
--hash=sha256:b2362f0d14318a3f44a3d1e186e2069eb25859cc4b9cae3e473505a28954803e \
--hash=sha256:d723eac25c1c5d8ebc9937a4eebcea40be1b2fff742dcf181d83e4ee59d2b12d
# via -r requirements-dev.in
multidict==6.1.0 \
--hash=sha256:052e10d2d37810b99cc170b785945421141bf7bb7d2f8799d431e7db229c385f \
--hash=sha256:06809f4f0f7ab7ea2cabf9caca7d79c22c0758b58a71f9d32943ae13c7ace056 \
14 changes: 7 additions & 7 deletions src/initialize.py
Original file line number Diff line number Diff line change
@@ -119,6 +119,11 @@ async def run_services(
) -> None:
spec = load_spec(cli_args=cli_args)

beacon_chain = BeaconChain(
spec=spec,
task_manager=task_manager,
)

async with (
RemoteSigner(url=cli_args.remote_signer_url) as remote_signer,
MultiBeaconNode(
@@ -130,13 +135,9 @@ async def run_services(
cli_args=cli_args,
) as multi_beacon_node,
):
beacon_chain = BeaconChain(
spec=spec,
multi_beacon_node=multi_beacon_node,
task_manager=task_manager,
)

beacon_chain.initialize(genesis=multi_beacon_node.best_beacon_node.genesis)
await _wait_for_genesis(genesis_datetime=beacon_chain.get_datetime_for_slot(0))
beacon_chain.start_slot_ticker()

_logger.info(f"Current epoch: {beacon_chain.current_epoch}")
_logger.info(f"Current slot: {beacon_chain.current_slot}")
@@ -157,7 +158,6 @@ async def run_services(
validator_service_args = ValidatorDutyServiceOptions(
multi_beacon_node=multi_beacon_node,
beacon_chain=beacon_chain,
spec=spec,
remote_signer=remote_signer,
validator_status_tracker_service=validator_status_tracker_service,
scheduler=scheduler,
46 changes: 25 additions & 21 deletions src/providers/beacon_chain.py
Original file line number Diff line number Diff line change
@@ -6,58 +6,44 @@
from math import floor
from typing import TYPE_CHECKING, Any

from schemas import SchemaRemoteSigner
from schemas import SchemaBeaconAPI, SchemaRemoteSigner
from spec._ascii import ELECTRA as ELECTRA_ASCII_ART
from spec.base import Fork, Genesis, Spec
from tasks import TaskManager

if TYPE_CHECKING:
from collections.abc import Callable, Coroutine

from providers import MultiBeaconNode


class BeaconChain:
def __init__(
self,
spec: Spec,
multi_beacon_node: "MultiBeaconNode",
task_manager: TaskManager,
):
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.setLevel(logging.getLogger().level)

self.spec = spec
self.multi_beacon_node = multi_beacon_node
self.task_manager = task_manager

self.genesis = Genesis()
self.current_fork_version = SchemaBeaconAPI.ForkVersion.DENEB

self.new_slot_handlers: list[
Callable[[int, bool], Coroutine[Any, Any, None]]
] = []

self.task_manager.submit_task(self.on_new_slot())

@property
def genesis(self) -> Genesis:
return next(
bn.genesis for bn in self.multi_beacon_node.beacon_nodes if bn.initialized
)

def get_fork(self, slot: int) -> Fork:
slot_epoch = slot // self.spec.SLOTS_PER_EPOCH

if (
hasattr(self.spec, "ELECTRA_FORK_EPOCH")
and slot_epoch >= self.spec.ELECTRA_FORK_EPOCH
):
if slot_epoch >= self.spec.ELECTRA_FORK_EPOCH:
return Fork(
previous_version=self.spec.DENEB_FORK_VERSION,
current_version=self.spec.ELECTRA_FORK_VERSION,
epoch=self.spec.ELECTRA_FORK_EPOCH,
)
if (
hasattr(self.spec, "DENEB_FORK_EPOCH")
and slot_epoch >= self.spec.DENEB_FORK_EPOCH
):
if slot_epoch >= self.spec.DENEB_FORK_EPOCH:
return Fork(
previous_version=self.spec.CAPELLA_FORK_VERSION,
current_version=self.spec.DENEB_FORK_VERSION,
@@ -71,6 +57,18 @@ def get_fork_info(self, slot: int) -> SchemaRemoteSigner.ForkInfo:
genesis_validators_root=self.genesis.genesis_validators_root.to_obj(),
)

def initialize(self, genesis: Genesis) -> None:
self.genesis = genesis

current_epoch = self.current_slot // self.spec.SLOTS_PER_EPOCH
if current_epoch >= self.spec.ELECTRA_FORK_EPOCH:
self.current_fork_version = SchemaBeaconAPI.ForkVersion.ELECTRA
else:
self.current_fork_version = SchemaBeaconAPI.ForkVersion.DENEB

def start_slot_ticker(self) -> None:
self.task_manager.submit_task(self.on_new_slot())

def get_datetime_for_slot(self, slot: int) -> datetime.datetime:
slot_timestamp = self.genesis.genesis_time + slot * self.spec.SECONDS_PER_SLOT
return datetime.datetime.fromtimestamp(slot_timestamp, tz=datetime.UTC)
@@ -105,6 +103,12 @@ async def on_new_slot(self) -> None:
self.logger.info(f"Slot {_current_slot}")
_is_new_epoch = _current_slot % self.spec.SLOTS_PER_EPOCH == 0

if _is_new_epoch:
_current_epoch = _current_slot // self.spec.SLOTS_PER_EPOCH
if _current_epoch == self.spec.ELECTRA_FORK_EPOCH:
self.current_fork_version = SchemaBeaconAPI.ForkVersion.ELECTRA
self.logger.info(f"Electra fork epoch reached! {ELECTRA_ASCII_ART}")

for handler in self.new_slot_handlers:
self.task_manager.submit_task(handler(_current_slot, _is_new_epoch))

100 changes: 67 additions & 33 deletions src/providers/beacon_node.py
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
from spec import Spec, SpecAttestation, SpecSyncCommittee
from spec.attestation import AttestationData
from spec.base import Genesis, parse_spec
from spec.constants import INTERVALS_PER_SLOT
from tasks import TaskManager

_TIMEOUT_DEFAULT_CONNECT = 1
@@ -76,7 +77,11 @@ class BeaconNode:
SCORE_DELTA_FAILURE = 5

def __init__(
self, base_url: str, scheduler: AsyncIOScheduler, task_manager: TaskManager
self,
base_url: str,
spec: Spec,
scheduler: AsyncIOScheduler,
task_manager: TaskManager,
) -> None:
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.setLevel(logging.getLogger().level)
@@ -88,6 +93,8 @@ def __init__(
if not self.host:
raise ValueError(f"Failed to parse hostname from {base_url}")

self.spec = spec

self.scheduler = scheduler
self.task_manager = task_manager

@@ -127,20 +134,26 @@ def score(self, value: int) -> None:
self._score = max(0, min(value, BeaconNode.MAX_SCORE))
_BEACON_NODE_SCORE.labels(host=self.host).set(self._score)

async def _initialize_full(self, spec: Spec) -> None:
async def _initialize_full(self) -> None:
self.genesis = await self.get_genesis()

# Warn if the spec returned by the beacon node differs
bn_spec = await self.get_spec()
if spec != bn_spec:
self.logger.warning(
f"Spec values returned by beacon node not equal to hardcoded spec values."
f"\nBeacon node:\n{bn_spec}"
f"\nHardcoded:\n{spec}"
try:
bn_spec = await self.get_spec()
if self.spec != bn_spec:
self.logger.warning(
f"Spec values returned by beacon node not equal to hardcoded spec values."
f"\nBeacon node:\n{bn_spec}"
f"\nHardcoded:\n{self.spec}"
)
except Exception as e:
# This triggers with Prysm because it doesn't return a value
# for MAX_BLOB_COMMITMENTS_PER_BLOCK TODO report?
self.logger.error(
f"Failed to verify beacon node spec, error: {e!r}",
exc_info=self.logger.isEnabledFor(logging.DEBUG),
)

self.spec = bn_spec

# Regularly refresh the version of the beacon node
self.node_version = await self.get_node_version()
self.scheduler.add_job(
@@ -153,9 +166,9 @@ async def _initialize_full(self, spec: Spec) -> None:
self.score = BeaconNode.MAX_SCORE
self.initialized = True

async def initialize_full(self, spec: Spec) -> None:
async def initialize_full(self) -> None:
try:
await self._initialize_full(spec=spec)
await self._initialize_full()
self.logger.info(
f"Initialized beacon node at {self.base_url}",
)
@@ -165,7 +178,7 @@ async def initialize_full(self, spec: Spec) -> None:
exc_info=self.logger.isEnabledFor(logging.DEBUG),
)
# Try to initialize again in 30 seconds
self.task_manager.submit_task(self.initialize_full(spec=spec), delay=30.0)
self.task_manager.submit_task(self.initialize_full(), delay=30.0)

@staticmethod
async def _handle_nok_status_code(response: aiohttp.ClientResponse) -> None:
@@ -499,11 +512,16 @@ async def publish_sync_committee_messages(
data=self.json_encoder.encode(messages),
)

async def publish_attestations(self, attestations: list[dict]) -> None: # type: ignore[type-arg]
async def publish_attestations(
self,
attestations: list[dict], # type: ignore[type-arg]
fork_version: SchemaBeaconAPI.ForkVersion,
) -> None:
await self._make_request(
method="POST",
endpoint="/eth/v1/beacon/pool/attestations",
endpoint="/eth/v2/beacon/pool/attestations",
data=self.json_encoder.encode(attestations),
headers={"Eth-Consensus-Version": fork_version.value},
)

async def prepare_beacon_committee_subscriptions(self, data: list[dict]) -> None: # type: ignore[type-arg]
@@ -520,39 +538,50 @@ async def prepare_sync_committee_subscriptions(self, data: list[dict]) -> None:
data=self.json_encoder.encode(data),
)

async def get_aggregate_attestation(
async def get_aggregate_attestation_v2(
self,
attestation_data: AttestationData,
) -> "SpecAttestation.AttestationDeneb":
resp = await self._make_request(
committee_index: int,
) -> "SpecAttestation.AttestationPhase0 | SpecAttestation.AttestationElectra":
resp_text = await self._make_request(
method="GET",
endpoint="/eth/v1/validator/aggregate_attestation",
endpoint="/eth/v2/validator/aggregate_attestation",
params=dict(
attestation_data_root=f"0x{attestation_data.hash_tree_root().hex()}",
slot=attestation_data.slot,
committee_index=committee_index,
),
timeout=ClientTimeout(
connect=self.client_session.timeout.connect,
total=int(self.spec.SECONDS_PER_SLOT)
/ int(self.spec.INTERVALS_PER_SLOT),
total=int(self.spec.SECONDS_PER_SLOT) / INTERVALS_PER_SLOT,
),
)

return SpecAttestation.AttestationDeneb.from_obj(json.loads(resp)["data"])
response = msgspec.json.decode(
resp_text, type=SchemaBeaconAPI.GetAggregatedAttestationV2Response
)

if response.version == SchemaBeaconAPI.ForkVersion.DENEB:
return SpecAttestation.AttestationPhase0.from_obj(response.data)
if response.version == SchemaBeaconAPI.ForkVersion.ELECTRA:
return SpecAttestation.AttestationElectra.from_obj(response.data)
raise NotImplementedError(f"Unsupported fork version {response.version}")

async def publish_aggregate_and_proofs(
self,
signed_aggregate_and_proofs: list[tuple[dict, str]], # type: ignore[type-arg]
fork_version: SchemaBeaconAPI.ForkVersion,
) -> None:
await self._make_request(
method="POST",
endpoint="/eth/v1/validator/aggregate_and_proofs",
endpoint="/eth/v2/validator/aggregate_and_proofs",
data=self.json_encoder.encode(
[
dict(message=msg, signature=sig)
for msg, sig in signed_aggregate_and_proofs
]
),
headers={"Eth-Consensus-Version": fork_version.value},
)

async def get_sync_committee_contribution(
@@ -571,8 +600,7 @@ async def get_sync_committee_contribution(
),
timeout=ClientTimeout(
connect=self.client_session.timeout.connect,
total=int(self.spec.SECONDS_PER_SLOT)
/ int(self.spec.INTERVALS_PER_SLOT),
total=int(self.spec.SECONDS_PER_SLOT) / INTERVALS_PER_SLOT,
),
)

@@ -693,13 +721,16 @@ async def produce_block_v3(

async def publish_block_v2(
self,
block_version: SchemaBeaconAPI.BeaconBlockVersion,
fork_version: SchemaBeaconAPI.ForkVersion,
block: Container,
blobs: list, # type: ignore[type-arg]
kzg_proofs: list, # type: ignore[type-arg]
signature: str,
) -> None:
if block_version == SchemaBeaconAPI.BeaconBlockVersion.DENEB:
if fork_version in (
SchemaBeaconAPI.ForkVersion.DENEB,
SchemaBeaconAPI.ForkVersion.ELECTRA,
):
data = dict(
signed_block=dict(
message=block.to_obj(),
@@ -709,7 +740,7 @@ async def publish_block_v2(
blobs=blobs,
)
else:
raise NotImplementedError(f"Unsupported block version {block_version}")
raise NotImplementedError(f"Unsupported fork version {fork_version}")

self.logger.debug(
f"Publishing block for slot {block.slot},"
@@ -721,22 +752,25 @@ async def publish_block_v2(
method="POST",
endpoint="/eth/v2/beacon/blocks",
data=self.json_encoder.encode(data),
headers={"Eth-Consensus-Version": block_version.value},
headers={"Eth-Consensus-Version": fork_version.value},
)

async def publish_blinded_block_v2(
self,
block_version: SchemaBeaconAPI.BeaconBlockVersion,
fork_version: SchemaBeaconAPI.ForkVersion,
block: Container,
signature: str,
) -> None:
if block_version == SchemaBeaconAPI.BeaconBlockVersion.DENEB:
if fork_version in (
SchemaBeaconAPI.ForkVersion.DENEB,
SchemaBeaconAPI.ForkVersion.ELECTRA,
):
data = dict(
message=block.to_obj(),
signature=signature,
)
else:
raise NotImplementedError(f"Unsupported block version {block_version}")
raise NotImplementedError(f"Unsupported fork version {fork_version}")

self.logger.debug(
f"Publishing blinded block for slot {block.slot},"
@@ -748,7 +782,7 @@ async def publish_blinded_block_v2(
method="POST",
endpoint="/eth/v2/beacon/blinded_blocks",
data=self.json_encoder.encode(data),
headers={"Eth-Consensus-Version": block_version.value},
headers={"Eth-Consensus-Version": fork_version.value},
)

async def subscribe_to_events(
Loading

0 comments on commit eb43cc2

Please sign in to comment.