Skip to content

Commit

Permalink
Few block blobs pool improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Jan 31, 2025
1 parent 7c471af commit 218801b
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public Optional<ExecutionPayloadEnvelope> revealExecutionPayload(
false,
state.hashTreeRoot());
try {
// Run state transition and set state root
final BeaconState newState =
spec.atSlot(block.getSlot())
.getExecutionPayloadProcessor()
Expand All @@ -102,7 +103,7 @@ public Optional<ExecutionPayloadEnvelope> revealExecutionPayload(
return Optional.of(executionPayload.withStateRoot(newState.hashTreeRoot()));
} catch (ExecutionPayloadProcessingException ex) {
LOG.warn(
"State transition error while processing execution payload with beacon block root"
"State transition error while processing execution payload with beacon block root "
+ executionPayload.getBeaconBlockRoot(),
ex);
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public SafeFuture<InternalValidationResult> sendExecutionPayload(
executionPayloadAndBlobSidecarsRevealer.revealBlobSidecars(block, executionPayload);
publishExecutionPayloadAndBlobSidecars(executionPayload, blobSidecars);
// provide blobs for the execution payload before importing it
blockBlobSidecarsTrackersPool.onCompletedBlockExecutionPayloadAndBlobSidecars(
blockBlobSidecarsTrackersPool.onCompletedExecutionPayloadAndBlobSidecars(
block, executionPayload, blobSidecars);
return executionPayloadManager.validateAndImportExecutionPayload(
executionPayload, Optional.of(timeProvider.getTimeInMillis()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ public ExecutionPayloadProcessingException(final String message) {
super(message);
}

public ExecutionPayloadProcessingException(final String template, Object... args) {
super(String.format(template, args));
}

public ExecutionPayloadProcessingException(final Exception ex) {
super(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.bls.BLS;
import tech.pegasys.teku.infrastructure.ssz.SszList;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.config.SpecConfigEip7732;
import tech.pegasys.teku.spec.constants.Domain;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlockHeader;
Expand Down Expand Up @@ -131,7 +132,8 @@ public void processExecutionPayload(
.hashTreeRoot()
.equals(BeaconStateEip7732.required(state).getLatestWithdrawalsRoot())) {
throw new ExecutionPayloadProcessingException(
"Execution payload withdrawals root is not consistent with the state latest withdrawals root");
"Execution payload withdrawals root %s is not consistent with the state latest withdrawals root %s",
payload.getWithdrawals(), BeaconStateEip7732.required(state).getLatestWithdrawalsRoot());
}

// Verify the gas limit
Expand All @@ -144,32 +146,35 @@ public void processExecutionPayload(
// Verify consistency of the parent hash with respect to the previous execution payload
if (!payload.getParentHash().equals(BeaconStateEip7732.required(state).getLatestBlockHash())) {
throw new ExecutionPayloadProcessingException(
"Execution payload parent hash is not consistent with the latest block hash from state");
"Execution payload parent hash %s is not consistent with the latest block hash %s from state",
payload.getParentHash(), BeaconStateEip7732.required(state).getLatestBlockHash());
}

// Verify prev_randao
// EIP-7732 TODO: fix (doesn't work in local interop)
// if (!payload
// .getPrevRandao()
// .equals(
// beaconStateAccessors.getRandaoMix(
// state, miscHelpers.computeEpochAtSlot(state.getSlot())))) {
// final Bytes32 expectedPrevRandao =
// beaconStateAccessors.getRandaoMix(state,
// miscHelpers.computeEpochAtSlot(state.getSlot()));
// if (!payload.getPrevRandao().equals(expectedPrevRandao)) {
// throw new ExecutionPayloadProcessingException(
// "Execution payload prev randao is not as expected");
// "Execution payload prev randao %s is not as expected %s",
// payload.getPrevRandao(), expectedPrevRandao);
// }

// Verify timestamp
if (!payload
.getTimestamp()
.equals(miscHelpers.computeTimeAtSlot(state.getGenesisTime(), state.getSlot()))) {
final UInt64 expectedTimestamp =
miscHelpers.computeTimeAtSlot(state.getGenesisTime(), state.getSlot());
if (!payload.getTimestamp().equals(expectedTimestamp)) {
throw new ExecutionPayloadProcessingException(
"Execution payload timestamp is not as expected");
"Execution payload timestamp %s is not as expected %s",
payload.getTimestamp(), expectedTimestamp);
}

// Verify commitments are under limit
if (envelope.getBlobKzgCommitments().size() > specConfig.getMaxBlobCommitmentsPerBlock()) {
throw new ExecutionPayloadProcessingException(
"Execution payload blob kzg commitments are over the limit");
"Execution payload blob kzg commitments are over the limit %d > %d",
envelope.getBlobKzgCommitments().size(), specConfig.getMaxBlobCommitmentsPerBlock());
}

// Verify the execution payload is valid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class BlockBlobSidecarsTracker {
private final AtomicReference<Optional<SignedBeaconBlock>> block =
new AtomicReference<>(Optional.empty());

// ePBS
private final AtomicReference<Optional<ExecutionPayloadEnvelope>> executionPayloadEnvelope =
new AtomicReference<>(Optional.empty());

Expand Down Expand Up @@ -185,22 +186,20 @@ public boolean setBlock(final SignedBeaconBlock block) {
return true;
}

public boolean setBlockAndExecutionPayloadEnvelope(
// EIP-7732 TODO: debug timings
public boolean setExecutionPayloadEnvelope(
final SignedBeaconBlock block, final ExecutionPayloadEnvelope executionPayloadEnvelope) {
checkArgument(block.getSlotAndBlockRoot().equals(slotAndBlockRoot), "Wrong block");
final Optional<SignedBeaconBlock> oldBlock = this.block.getAndSet(Optional.of(block));
checkArgument(
executionPayloadEnvelope.getBeaconBlockRoot().equals(slotAndBlockRoot.getBlockRoot()),
"Wrong execution payload envelope");
this.block.set(Optional.of(block));
final Optional<ExecutionPayloadEnvelope> oldExecutionPayloadEnvelope =
this.executionPayloadEnvelope.getAndSet(Optional.of(executionPayloadEnvelope));
if (oldBlock.isPresent() || oldExecutionPayloadEnvelope.isPresent()) {
if (oldExecutionPayloadEnvelope.isPresent()) {
return false;
}

LOG.debug(
"Block and execution payload envelope received for {}", slotAndBlockRoot::toLogString);
maybeDebugTimings.ifPresent(
debugTimings -> {
debugTimings.put(BLOCK_ARRIVAL_TIMING_IDX, System.currentTimeMillis());
});
LOG.debug("Execution payload envelope received for {}", slotAndBlockRoot::toLogString);

pruneExcessiveBlobSidecars();
checkCompletion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,18 @@ public void onNewBlobSidecar(
public void onNewBlock(
final SignedBeaconBlock block, final Optional<RemoteOrigin> remoteOrigin) {}

@Override
public void onNewExecutionPayload(
final SignedBeaconBlock block,
final SignedExecutionPayloadEnvelope executionPayload,
final Optional<RemoteOrigin> remoteOrigin) {}

@Override
public void onCompletedBlockAndBlobSidecars(
final SignedBeaconBlock block, final List<BlobSidecar> blobSidecars) {}

@Override
public void onCompletedBlockExecutionPayloadAndBlobSidecars(
public void onCompletedExecutionPayloadAndBlobSidecars(
final SignedBeaconBlock block,
final SignedExecutionPayloadEnvelope executionPayload,
final List<BlobSidecar> blobSidecars) {}
Expand Down Expand Up @@ -126,9 +132,14 @@ public void subscribeNewBlobSidecar(NewBlobSidecarSubscriber newBlobSidecarSubsc

void onNewBlock(SignedBeaconBlock block, Optional<RemoteOrigin> remoteOrigin);

void onNewExecutionPayload(
SignedBeaconBlock block,
SignedExecutionPayloadEnvelope executionPayload,
Optional<RemoteOrigin> remoteOrigin);

void onCompletedBlockAndBlobSidecars(SignedBeaconBlock block, List<BlobSidecar> blobSidecars);

void onCompletedBlockExecutionPayloadAndBlobSidecars(
void onCompletedExecutionPayloadAndBlobSidecars(
SignedBeaconBlock block,
SignedExecutionPayloadEnvelope executionPayload,
List<BlobSidecar> blobSidecars);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.execution.SignedExecutionPayloadEnvelope;
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
import tech.pegasys.teku.statetransition.validation.ExecutionPayloadValidator;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
Expand All @@ -37,16 +39,19 @@ public class ExecutionPayloadManager {
new ConcurrentHashMap<>();

private final ExecutionPayloadValidator executionPayloadValidator;
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final ForkChoice forkChoice;
private final RecentChainData recentChainData;
private final ExecutionLayerChannel executionLayerChannel;

public ExecutionPayloadManager(
final ExecutionPayloadValidator executionPayloadValidator,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final ForkChoice forkChoice,
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayerChannel) {
this.executionPayloadValidator = executionPayloadValidator;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.forkChoice = forkChoice;
this.recentChainData = recentChainData;
this.executionLayerChannel = executionLayerChannel;
Expand All @@ -67,9 +72,23 @@ public SafeFuture<InternalValidationResult> validateAndImportExecutionPayload(
timestamp ->
recentChainData.onExecutionPayload(signedExecutionPayloadEnvelope, timestamp),
() -> LOG.error("arrivalTimestamp tracking must be enabled to support Eip7732"));
validatedExecutionPayloadEnvelopes.put(
signedExecutionPayloadEnvelope.getMessage().getBeaconBlockRoot(),
signedExecutionPayloadEnvelope);
final Bytes32 blockRoot =
signedExecutionPayloadEnvelope.getMessage().getBeaconBlockRoot();
validatedExecutionPayloadEnvelopes.put(blockRoot, signedExecutionPayloadEnvelope);
recentChainData
.retrieveSignedBlockByRoot(blockRoot)
.finish(
maybeBlock ->
maybeBlock.ifPresent(
block ->
blockBlobSidecarsTrackersPool.onNewExecutionPayload(
block,
signedExecutionPayloadEnvelope,
Optional.of(RemoteOrigin.GOSSIP))),
err ->
LOG.error(
"Couldn't retrieve a block for execution payload with root {}",
blockRoot));
forkChoice
.onExecutionPayload(signedExecutionPayloadEnvelope, executionLayerChannel)
.finish(err -> LOG.error("Failed to process received execution payload.", err));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ public synchronized void onNewBlock(
if (block.getMessage().getBody().toVersionDeneb().isEmpty()) {
return;
}
// in ePBS, when importing the block there is no data availability check
if (block.getMessage().getBody().toVersionEip7732().isPresent()) {
return;
}
if (recentChainData.containsBlock(block.getRoot())) {
return;
}
Expand All @@ -299,6 +303,24 @@ public synchronized void onNewBlock(
internalOnNewBlock(block, remoteOrigin);
}

@Override
public void onNewExecutionPayload(
final SignedBeaconBlock block,
final SignedExecutionPayloadEnvelope executionPayload,
final Optional<RemoteOrigin> remoteOrigin) {
if (block.getMessage().getBody().toVersionEip7732().isEmpty()) {
return;
}
if (recentChainData.containsExecutionPayloadEnvelope(
executionPayload.getMessage().getBeaconBlockRoot())) {
return;
}
if (shouldIgnoreItemAtSlot(block.getSlot())) {
return;
}
internalOnNewExecutionPayloadEnvelope(block, executionPayload.getMessage(), remoteOrigin);
}

@Override
public synchronized BlockBlobSidecarsTracker getOrCreateBlockBlobSidecarsTracker(
final SignedBeaconBlock block) {
Expand All @@ -308,8 +330,7 @@ public synchronized BlockBlobSidecarsTracker getOrCreateBlockBlobSidecarsTracker
@Override
public BlockBlobSidecarsTracker getOrCreateBlockBlobSidecarsTracker(
final SignedBeaconBlock block, final ExecutionPayloadEnvelope executionPayloadEnvelope) {
return internalOnNewBlockAndExecutionPayloadEnvelope(
block, executionPayloadEnvelope, Optional.empty());
return internalOnNewExecutionPayloadEnvelope(block, executionPayloadEnvelope, Optional.empty());
}

@Override
Expand Down Expand Up @@ -359,7 +380,7 @@ public synchronized void onCompletedBlockAndBlobSidecars(
}

@Override
public void onCompletedBlockExecutionPayloadAndBlobSidecars(
public void onCompletedExecutionPayloadAndBlobSidecars(
final SignedBeaconBlock block,
final SignedExecutionPayloadEnvelope executionPayload,
final List<BlobSidecar> blobSidecars) {
Expand All @@ -371,7 +392,7 @@ public void onCompletedBlockExecutionPayloadAndBlobSidecars(
final BlockBlobSidecarsTracker blobSidecarsTracker =
getOrCreateBlobSidecarsTracker(slotAndBlockRoot, __ -> {}, __ -> {});

blobSidecarsTracker.setBlockAndExecutionPayloadEnvelope(block, executionPayload.getMessage());
blobSidecarsTracker.setExecutionPayloadEnvelope(block, executionPayload.getMessage());

long addedBlobs =
blobSidecars.stream()
Expand All @@ -390,8 +411,8 @@ public void onCompletedBlockExecutionPayloadAndBlobSidecars(

if (!blobSidecarsTracker.isComplete()) {
LOG.error(
"Tracker for block {} is supposed to be completed but it is not. Missing blob sidecars: {}",
block.toLogString(),
"Tracker for execution payload with block root {} is supposed to be completed but it is not. Missing blob sidecars: {}",
executionPayload.getMessage().getBeaconBlockRoot(),
blobSidecarsTracker.getMissingBlobSidecars().count());
}

Expand Down Expand Up @@ -577,7 +598,7 @@ private BlockBlobSidecarsTracker internalOnNewBlock(
return tracker;
}

private BlockBlobSidecarsTracker internalOnNewBlockAndExecutionPayloadEnvelope(
private BlockBlobSidecarsTracker internalOnNewExecutionPayloadEnvelope(
final SignedBeaconBlock block,
final ExecutionPayloadEnvelope executionPayloadEnvelope,
final Optional<RemoteOrigin> remoteOrigin) {
Expand All @@ -587,13 +608,12 @@ private BlockBlobSidecarsTracker internalOnNewBlockAndExecutionPayloadEnvelope(
getOrCreateBlobSidecarsTracker(
slotAndBlockRoot,
newTracker -> {
newTracker.setBlockAndExecutionPayloadEnvelope(block, executionPayloadEnvelope);
newTracker.setExecutionPayloadEnvelope(block, executionPayloadEnvelope);
countBlock(remoteOrigin);
onFirstSeen(slotAndBlockRoot, remoteOrigin);
},
existingTracker -> {
if (!existingTracker.setBlockAndExecutionPayloadEnvelope(
block, executionPayloadEnvelope)) {
if (!existingTracker.setExecutionPayloadEnvelope(block, executionPayloadEnvelope)) {
// block and execution envelope were already set
countDuplicateBlock(remoteOrigin);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1382,7 +1382,11 @@ public void initExecutionPayloadManager() {
new ExecutionPayloadValidator(spec, recentChainData);
executionPayloadManager =
new ExecutionPayloadManager(
executionPayloadValidator, forkChoice, recentChainData, executionLayer);
executionPayloadValidator,
blockBlobSidecarsTrackersPool,
forkChoice,
recentChainData,
executionLayer);
}

public void initPayloadAttestationManager() {
Expand Down

0 comments on commit 218801b

Please sign in to comment.