Parallelize Soroban #4715

Draft: sisuresh wants to merge 6 commits into master from par-rebase

Conversation

Contributor
Contributor

@sisuresh sisuresh commented May 7, 2025

Description

Marked as draft because the logic to issue refunds after all transactions have been applied is still missing. Additional testing and an overall safety analysis are also required. There are also some TODOs in the code.

Checklist

  • Reviewed the contributing document
  • Rebased on top of master (no merge commits)
  • Ran clang-format v8.0.0 (via make format or the Visual Studio extension)
  • Compiles
  • Ran all tests
  • If change impacts performance, include supporting evidence per the performance document

@anupsdf anupsdf requested a review from Copilot May 7, 2025 23:00
@Copilot Copilot AI left a comment


Pull Request Overview

This PR introduces parallel processing for Soroban transactions by adding new preloading and parallel apply functions. Key changes include:

  • New methods in TransactionFrame, FeeBumpTransactionFrame, and various OperationFrame derivatives to support parallel execution.
  • Enhanced exception handling and diagnostic messaging in the parallel apply flows.
  • Updates to ledger and metrics management to support the new parallel execution patterns.

Reviewed Changes

Copilot reviewed 34 out of 34 changed files in this pull request and generated 2 comments.

File summary:

src/transactions/TransactionFrame.{h,cpp}: Added functions for preloading and parallel applying Soroban ops.
src/transactions/RestoreFootprintOpFrame.{h,cpp}: Extended the restore footprint operation with parallel apply support.
src/transactions/OperationFrame.{h,cpp}: Introduced preload and parallel apply helper methods for operations.
src/transactions/InvokeHostFunctionOpFrame.h: Added parallel apply overrides for invoking host functions.
src/transactions/FeeBumpTransactionFrame.{h,cpp}: Updated the fee bump path with parallel apply and enhanced error handling.
src/transactions/ExtendFootprintTTLOpFrame.{h,cpp}: Extended TTL update operations with parallel apply support.
src/testdata/ledger-close-meta*: Updated test data to adjust lastModifiedLedgerSeq values.
src/main/AppConnector.h: Fixed a typographical error in the comments.
src/ledger/SorobanMetrics.{h,cpp}: Updated metrics reporting to use atomic loads.
src/ledger/LedgerTxn{Impl,}.{h,cpp}: Added new methods for handling restored keys from the hot archive.
src/ledger/LedgerManagerImpl.h: Added helper functions to support parallel apply stages.

@sisuresh sisuresh force-pushed the par-rebase branch 3 times, most recently from 7402b61 to 602914b Compare May 8, 2025 16:07
@@ -175,6 +175,27 @@ class LedgerManagerImpl : public LedgerManager
AbstractLedgerTxn& ltx,
std::unique_ptr<LedgerCloseMetaFrame> const& ledgerCloseMeta);

UnorderedSet<LedgerKey> getReadWriteKeysForStage(ApplyStage const& stage);
Contributor

Do we need randomized hashing in the output?

Contributor Author

Not sure what you mean. Are you asking if this could be a set instead?

Contributor

yes.

Contributor Author

The output is only used for lookups.

Contributor

sure, what I'm trying to confirm is whether the use of the randomized hasher is intentional here (as opposed to using std::unordered_set). UnorderedSet is a custom data structure we usually use to flush out ordering-related bugs in standard sets/maps (per this issue), but I'm not sure this applies here.

Contributor

The randomization is also a DoS-prevention measure, to prevent people from being able to attack a hashmap with colliding inputs. The tree-based maps don't have this failure mode (but are slower overall).

Contributor Author

Ah no this was not intentional. I think unordered_set is fine to use here, and the DoS angle Graydon mentioned isn't something we need to worry about here.

std::vector<std::thread> threads;
for (size_t i = 0; i < stage.size(); ++i)
{
auto& entryMapByThread = entryMapsByThread.at(i);
Contributor

The main thread can invalidate this reference while other threads perform application, so it would be safer to move ownership of the map object to the other thread and then back to main. I'd also recommend hardening the code by adding an invariant that only a specific thread id can access the map, since the map is not synchronized in any way; you want to make sure threads aren't actually sharing data.

Contributor Author

Good point. Updated to use a unique_ptr.
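
For illustration, a minimal standalone sketch of the unique_ptr hand-off described above (ThreadEntryMap here is a stand-in type and applyCluster a hypothetical worker, not the branch's actual signatures): ownership moves into the worker via std::async and comes back through the future, so the main thread cannot invalidate the map mid-apply.

```cpp
#include <future>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Stand-in for the PR's ThreadEntryMap.
using ThreadEntryMap = std::unordered_map<std::string, int>;

// The worker takes sole ownership of the map, mutates it, and returns it.
static std::unique_ptr<ThreadEntryMap>
applyCluster(std::unique_ptr<ThreadEntryMap> entryMap)
{
    (*entryMap)["applied"] = 1; // apply work happens here
    return entryMap;
}

int
main()
{
    auto entryMap = std::make_unique<ThreadEntryMap>();
    // Moving the unique_ptr into the task means the main thread cannot
    // touch (or invalidate) the map while the worker owns it.
    auto fut = std::async(std::launch::async, applyCluster,
                          std::move(entryMap));
    // Ownership comes back to the main thread when the future is consumed.
    auto result = fut.get();
    return result->at("applied") == 1 ? 0 : 1;
}
```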

{
UnorderedSet<LedgerKey> res;

for (auto const& txs : stage)
Contributor

terminology is getting tricky! phase, stage, bundle are all starting to sound a bit similar. I wonder if there’s a way to abstract it away via an iterator of sorts if all you care about is iterating over all transactions, for example. We can pass a lambda to the iterator. This way we can try to minimize places in core that need to be aware of the internal structure of a parallel tx set.

Contributor

just for context, I noticed there are lots of nested loops in the code making it harder to review, so I wonder if we can create an abstraction for tx sets to avoid bleeding internal details into other subsystems of core.

Contributor Author

I added an iterator to ApplyStage. This gets rid of all but one of the nested loops. I didn't add something similar for the entire phase, but I don't think that's necessary here. Let me know what you think.
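
For reference, a minimal sketch of what such a flattening iterator can look like (TxBundle and Cluster are stand-ins; the real ApplyStage iterator in the branch may differ in detail): it walks every TxBundle across clusters so callers can write a single range-for instead of nested loops.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Stand-ins for the PR's types.
struct TxBundle
{
    int id;
};
using Cluster = std::vector<TxBundle>;

// Sketch of an ApplyStage exposing a flat iterator over every TxBundle,
// so callers don't need nested loops over clusters.
class ApplyStage
{
  public:
    explicit ApplyStage(std::vector<Cluster> clusters)
        : mClusters(std::move(clusters))
    {
    }

    class Iterator
    {
      public:
        Iterator(std::vector<Cluster> const& clusters, size_t cluster)
            : mClusters(clusters), mCluster(cluster)
        {
            skipEmptyClusters();
        }

        TxBundle const& operator*() const
        {
            return mClusters[mCluster][mTx];
        }

        Iterator& operator++()
        {
            if (++mTx == mClusters[mCluster].size())
            {
                ++mCluster;
                mTx = 0;
                skipEmptyClusters();
            }
            return *this;
        }

        bool operator!=(Iterator const& other) const
        {
            return mCluster != other.mCluster || mTx != other.mTx;
        }

      private:
        void skipEmptyClusters()
        {
            while (mCluster < mClusters.size() && mClusters[mCluster].empty())
            {
                ++mCluster;
            }
        }

        std::vector<Cluster> const& mClusters;
        size_t mCluster;
        size_t mTx{0};
    };

    Iterator begin() const
    {
        return Iterator(mClusters, 0);
    }
    Iterator end() const
    {
        return Iterator(mClusters, mClusters.size());
    }

  private:
    std::vector<Cluster> mClusters;
};

int main()
{
    ApplyStage stage({{TxBundle{1}}, {}, {TxBundle{2}, TxBundle{3}}});
    int sum = 0;
    for (auto const& bundle : stage) // single loop instead of nested ones
    {
        sum += bundle.id;
    }
    return sum == 6 ? 0 : 1;
}
```

With something along these lines, `for (auto const& bundle : stage)` replaces the cluster-then-bundle double loop at call sites.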

std::unique_ptr<TxEffects> mEffects;
};

typedef std::vector<TxBundle> Thread;
Contributor Author

@sisuresh sisuresh May 9, 2025

Most of the new types in this file should be moved to a new file.

@@ -38,6 +40,222 @@ using TransactionFrameBasePtr = std::shared_ptr<TransactionFrameBase const>;
using TransactionFrameBaseConstPtr =
std::shared_ptr<TransactionFrameBase const>;

using ModifiedEntryMap = UnorderedMap<LedgerKey, std::optional<LedgerEntry>>;

struct ThreadEntry
Contributor Author

Thread -> Cluster

Contributor

Could you expand the comment on what this struct is? The name - ThreadEntry - is a bit generic so it's hard to tell how it's supposed to be used.

UnorderedSet<LedgerKey>
LedgerManagerImpl::getReadWriteKeysForStage(ApplyStage const& stage)
{
UnorderedSet<LedgerKey> res;
Contributor

std::unordered_set

LedgerManagerImpl::collectEntries(AppConnector& app, AbstractLedgerTxn& ltx,
Cluster const& cluster)
{
std::unique_ptr<ThreadEntryMap> entryMap =
Contributor

nit: auto entryMap

@@ -350,4 +350,5 @@ doubleToClampedUint32(double d)
}
return static_cast<uint32_t>(std::clamp<double>(d, 0, maxUint32));
}

Contributor

nit: accident?

DiagnosticEventBuffer& buffer) const
{
releaseAssert(isSoroban());
releaseAssert(mOperations.size() == 1);
Contributor

is this right? In the linked code, core handles malformed transactions gracefully and sets the inner code to MALFORMED, whereas here we'd crash.

Contributor Author

The malformed check happens at validation, which in the parallel flow is in preParallelApply. This assert is at apply time, which you shouldn't be able to get to with an invalid transaction.

@@ -175,6 +175,27 @@ class LedgerManagerImpl : public LedgerManager
AbstractLedgerTxn& ltx,
std::unique_ptr<LedgerCloseMetaFrame> const& ledgerCloseMeta);

UnorderedSet<LedgerKey> getReadWriteKeysForStage(ApplyStage const& stage);

std::unique_ptr<ThreadEntryMap> collectEntries(AppConnector& app,
Contributor

There's no good reason to have getReadWriteKeysForStage and collectEntries as LedgerManager's member functions. How about a new parallel apply utils namespace where these are free functions?
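
A compilable toy sketch of that suggestion, with stand-in types (the namespace name, the LedgerKey/TxBundle definitions, and the set type are all illustrative, not the branch's): the helper only inspects the stage it is given, so it works naturally as a free function.

```cpp
#include <set>
#include <vector>

// Illustrative stand-ins; the real types live elsewhere in core.
struct LedgerKey
{
    int id;
    bool operator<(LedgerKey const& o) const
    {
        return id < o.id;
    }
};
struct TxBundle
{
    std::vector<LedgerKey> readWriteKeys;
};
using Cluster = std::vector<TxBundle>;
using ApplyStage = std::vector<Cluster>;

// Hypothetical parallel-apply utilities namespace: the helper only reads
// the stage passed in, so it has no reason to be a LedgerManagerImpl
// member function.
namespace parallelapply
{
std::set<LedgerKey>
getReadWriteKeysForStage(ApplyStage const& stage)
{
    std::set<LedgerKey> res;
    for (auto const& cluster : stage)
    {
        for (auto const& bundle : cluster)
        {
            res.insert(bundle.readWriteKeys.begin(),
                       bundle.readWriteKeys.end());
        }
    }
    return res;
}
}

int
main()
{
    ApplyStage stage{{TxBundle{{LedgerKey{1}, LedgerKey{2}}}},
                     {TxBundle{{LedgerKey{2}}}}};
    return parallelapply::getReadWriteKeysForStage(stage).size() == 2 ? 0 : 1;
}
```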

threadFutures.emplace_back(std::async(
std::launch::async, &LedgerManagerImpl::applyThread, this,
std::ref(app), std::move(entryMapPtr), std::ref(cluster), config,
sorobanConfig, std::ref(ledgerInfo), sorobanBasePrngSeed,
Contributor

@marta-lokhova marta-lokhova May 13, 2025

A couple of things are problematic here:

  • std::async may throw and crash the program if the system is unable to spin up a new thread.
  • Passing references to threads this way is quite dangerous - if these variables go out of scope, you'll get UB. Please try to either copy or move the necessary data (e.g. ledgerInfo, cluster, etc.), as shown in the sketch after this list. Long-lived objects like AppConnector are probably fine, as long as we can guarantee that it's destroyed after LedgerManager and any futures/shared state it's holding on to.
  • How does shutdown work here? If core starts shutting down while parallel apply is happening, how do we ensure threads are properly joined before the app starts to destruct?
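
A standalone sketch of the copy/move approach from the second bullet (ParallelLedgerInfoSketch and applyCluster are hypothetical stand-ins): the worker receives its own copies, so nothing dangles even if the caller's locals are gone by the time the thread runs.

```cpp
#include <cstddef>
#include <cstdint>
#include <future>
#include <string>
#include <utility>
#include <vector>

struct ParallelLedgerInfoSketch // stand-in for the PR's ParallelLedgerInfo
{
    uint32_t ledgerSeq;
    std::string networkId;
};

// The worker owns its own copies, so nothing here can dangle even if the
// caller's locals go out of scope before the thread runs.
static size_t
applyCluster(ParallelLedgerInfoSketch info, std::vector<int> cluster)
{
    return cluster.size() + info.ledgerSeq;
}

int
main()
{
    size_t result = 0;
    {
        ParallelLedgerInfoSketch info{7, "test"};
        std::vector<int> cluster{1, 2, 3};
        // Pass by value / move instead of std::ref(info), std::ref(cluster).
        auto fut = std::async(std::launch::async, applyCluster, info,
                              std::move(cluster));
        result = fut.get();
    } // the locals can die here; the worker already owns what it needs
    return result == 10 ? 0 : 1;
}
```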

Contributor

Thinking about this more, I'm curious about the choice of std::async here as opposed to a thread pool of sorts based on the underlying hardware specs. It looks like we're not taking into account the hardware here, so I'm wondering if depending on the machine this may cause issues like CPU over-subscription. Also, this might not be an issue, but what's the rough overhead of launching new threads in std::async, since those are kernel threads?

Contributor Author

> Thinking about this more, I'm curious about the choice of std::async here as opposed to a thread pool of sorts based on the underlying hardware specs. It looks like we're not taking into account the hardware here, so I'm wondering if depending on the machine this may cause issues like CPU over-subscription. Also, this might not be an issue, but what's the rough overhead of launching new threads in std::async, since those are kernel threads?

The number of threads is determined by the network setting, which should be set based on what the network wants the minimum spec to be. Wrt launching new threads, it should be on the order of microseconds, so I don't think it makes sense to optimize with a thread pool at the moment.

Contributor Author

> std::async may throw and crash the program if the system is unable to spin up a new thread.

Isn't this similar to failing on an STL container insert due to being out of memory? In that rare instance, the node will crash, but the rest of the network should be fine.

> Passing references to threads this way is quite dangerous

Yeah this is a good point. I'll work on improving this.

> How does shutdown work here? If core starts shutting down while parallel apply is happening, how do we ensure threads are properly joined before the app starts to destruct?

We discussed this offline. Shutdown waits for ledger close to finish in both the background ledger close and main-thread ledger close scenarios, which means these threads will have to join as well for that to happen. I can add a comment here that mentions that shutdown should work because the futures are not long-lived, and we should make sure that doesn't change.

Contributor Author

> Thinking about this more, I'm curious about the choice of std::async here as opposed to a thread pool of sorts based on the underlying hardware specs. It looks like we're not taking into account the hardware here, so I'm wondering if depending on the machine this may cause issues like CPU over-subscription. Also, this might not be an issue, but what's the rough overhead of launching new threads in std::async, since those are kernel threads?

The network setting for the number of threads should be set to what we believe the minimum hardware spec should be (mentioned in the CAP). The actual number of cores used is not part of the protocol, so we have some flexibility here if we see the need to tweak this in the future.

The overhead of launching threads should be on the order of microseconds, which I don't think is something we need to optimize at the moment.

std::vector<std::unique_ptr<ThreadEntryMap>> entryMapsByCluster;
for (auto& restoredKeysFutures : threadFutures)
{
auto futureResult = restoredKeysFutures.get();
Contributor

we should add a sanity check that future is valid here (if not, calling get is UB)

Contributor

Also note that after calling get, shared state isn't destroyed, because we're still storing (now invalid) futures in threadFutures. We may want to explicitly clear it to avoid footguns (thread objects are also stored in that shared state)
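
A minimal standalone sketch of both suggestions (using plain assert here; core would presumably use its own assertion helper): check valid() before get(), and clear the container of spent futures afterwards.

```cpp
#include <cassert>
#include <future>
#include <vector>

int
main()
{
    std::vector<std::future<int>> threadFutures;
    threadFutures.emplace_back(
        std::async(std::launch::async, [] { return 42; }));

    int total = 0;
    for (auto& fut : threadFutures)
    {
        // get() on a future with no shared state is UB, so check first.
        assert(fut.valid());
        total += fut.get();
    }
    // The spent futures still sit in the vector after get(); clear it so
    // later code can't accidentally touch the now-invalid entries.
    threadFutures.clear();
    return total == 42 ? 0 : 1;
}
```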

}

void
LedgerManagerImpl::applySorobanStage(AppConnector& app, AbstractLedgerTxn& ltx,
Contributor

This function is doing a lot of things, making it hard to read and navigate. It might be easier to refactor each step into its own function (also, please add a comment for each application step).


threadFutures.emplace_back(std::async(
std::launch::async, &LedgerManagerImpl::applyThread, this,
std::ref(app), std::move(entryMapPtr), std::ref(cluster), config,
Contributor

std::cref just in case (or ideally, a smart pointer to const)

Contributor

Just my 5c: I don't think it's worth wrapping everything into shared ptrs here, as the lifetime is very clear (at least for most of the objects; it could make sense for something externally owned, but stuff like stage/cluster/prng seed etc. is created for the apply path and its lifetime is managed by a single thread).
cref of course does make sense for constants.

Cluster const& cluster, Config const& config,
SorobanNetworkConfig const& sorobanConfig,
ParallelLedgerInfo const& ledgerInfo, Hash const& sorobanBasePrngSeed,
SorobanMetrics& sorobanMetrics);
Contributor Author

Some of the SorobanMetrics methods aren't thread-safe. Those methods should assert that the calling thread is main or ledger close.
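
A toy sketch of such a guard (the thread check and the metric/method names here are stand-ins, not SorobanMetrics' real API): the unsynchronized accumulator asserts it is only called from the thread that owns the state.

```cpp
#include <cassert>
#include <cstdint>
#include <thread>

// Stand-in for core's "is this the main/ledger-close thread?" check.
static std::thread::id const gApplyThread = std::this_thread::get_id();

struct MetricsSketch // illustrative, not the real SorobanMetrics
{
    uint64_t mLedgerTxCount{0};

    // Not thread-safe: assert the caller is on the thread that owns this
    // state before touching the unsynchronized counter.
    void
    accumulateLedgerTxCount(uint64_t n)
    {
        assert(std::this_thread::get_id() == gApplyThread);
        mLedgerTxCount += n;
    }
};

int
main()
{
    MetricsSketch metrics;
    metrics.accumulateLedgerTxCount(5);
    return metrics.mLedgerTxCount == 5 ? 0 : 1;
}
```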

Contributor Author

Alternatives to this:

  1. Collect metrics within each thread, and update SorobanMetrics at the end.
  2. Mark the non-thread-safe member variables as atomic.

Contributor

It should be fairly simple to make this thread-safe. You could also switch to medida counters; those are thread-safe.

results.transactionHash =
txBundle.getTx()->getContentsHash();
results.result = txBundle.getResPayload()->getResult();
if (results.result.result.code() ==
Contributor Author

Most of the code here is duplicated from the non-parallel path below.

Contributor Author
Contributor Author

sisuresh commented May 21, 2025

I updated the PR after rebasing with master. The most recent rebase was messy, and the PR still has a couple of TODOs to address from it, as well as some failing test cases.

@@ -825,6 +825,35 @@ LedgerTxn::Impl::erase(InternalLedgerKey const& key)
}
}

void
LedgerTxn::addRestoredFromHotArchive(LedgerEntry const& ledgerEntry, LedgerEntry const& ttlEntry)
Contributor Author

@sisuresh sisuresh May 21, 2025

@SirTyson restoreFromHotArchive and restoreFromLiveBucketList can be removed, right? If we remove the v23 code from ApplyHelper, these will no longer be used.

Contributor Author

Looks like the restore op uses these methods pre-v23.

@sisuresh sisuresh force-pushed the par-rebase branch 2 times, most recently from f6318fd to cbb3acf Compare May 22, 2025 15:27
sisuresh added 2 commits May 22, 2025 18:20
Remove unused json test files
OperationResult& res,
std::optional<RefundableFeeTracker>& refundableFeeTracker,
OperationMetaBuilder& opMeta) const
class ParallelApplyHelper
Contributor Author

The V23 code can be removed from ParallelApplyHelper. We should also de-duplicate some of the shared code.

Contributor

Did you mean from ApplyHelper? Or, rather, we need to remove pre-p23 code from this and post-p23 code from ApplyHelper.

Contributor Author

Ah yeah I meant ApplyHelper.

std::unique_ptr<ThreadEntryMap> entryMapByCluster,
Cluster const& cluster, Config const& config,
SorobanNetworkConfig const& sorobanConfig,
std::shared_ptr<ParallelLedgerInfo const> ledgerInfo,
Contributor

nit: Why does this need to be a shared ptr, whereas soroban config is just a reference? I'd rather we passed a reference if the lifetime is clear (and it seems like it is - these threads are short-lived and are joined immediately). Or pass by value if lifetime is of concern (this is a small enough struct).

uint64_t mLedgerInsnsCount{0};
uint64_t mLedgerInsnsExclVmCount{0};
uint64_t mLedgerHostFnExecTimeNsecs{0};
std::atomic<uint64_t> mCounterLedgerTxCount{0};
Contributor

This is fine for the first iteration to reduce the amount of diff, but I'd rather avoid synchronization between threads entirely and use a map-reduce-style approach (compute metrics per thread).
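
A standalone sketch of the map-reduce shape (ThreadMetrics and applyCluster are illustrative stand-ins): each worker accumulates into its own plain struct, and the main thread merges the results after joining, so no atomics are needed during apply.

```cpp
#include <cstdint>
#include <future>
#include <vector>

// Illustrative per-thread metrics (a stand-in, not SorobanMetrics): each
// worker fills its own copy; no atomics or locks are needed during apply.
struct ThreadMetrics
{
    uint64_t txCount{0};
    uint64_t insnsCount{0};
};

static ThreadMetrics
applyCluster(uint64_t txs)
{
    ThreadMetrics m;
    m.txCount = txs; // accumulate locally while applying
    m.insnsCount = txs * 10;
    return m;
}

int
main()
{
    std::vector<std::future<ThreadMetrics>> futures;
    for (uint64_t txs : {3u, 5u})
    {
        futures.emplace_back(std::async(std::launch::async, applyCluster, txs));
    }

    // Reduce step on the main thread: merge per-thread results into the
    // ledger-wide totals once all workers have joined.
    ThreadMetrics total;
    for (auto& fut : futures)
    {
        auto m = fut.get();
        total.txCount += m.txCount;
        total.insnsCount += m.insnsCount;
    }
    return total.txCount == 8 && total.insnsCount == 80 ? 0 : 1;
}
```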

subSeed = subSeedSha.finish();
}
++txNum;
auto txSetStages = phase.getParallelStages();
Contributor

auto const&

@@ -1835,110 +2188,224 @@ LedgerManagerImpl::applyTransactions(

for (auto const& phase : phases)
{
for (auto const& tx : phase)
if (phase.isParallel())
Contributor

The if/else bodies should probably be moved into separate functions.

if (protocolVersionStartsFrom(
ltx.loadHeader().current().ledgerVersion,
ProtocolVersion::V_23))
for (size_t i = 0; i < txSetStages.size(); ++i)
Contributor

This and the following loop could be range fors

ledgerSeq +
mSorobanConfig.stateArchivalSettings().minPersistentTTL - 1;

for (size_t i = 0; i < footprintKeys.size(); ++i)
Contributor

Could this be a range-based for?

Config const& cfg, InvokeHostFunctionOutput const& output,
HostFunctionMetrics const& metrics, DiagnosticEventManager& buffer) const
{
if (cfg.ENABLE_SOROBAN_DIAGNOSTIC_EVENTS)
Contributor

nit: Maybe return early to reduce nesting?

@@ -62,6 +76,11 @@ class OperationFrame
LedgerTxnEntry loadSourceAccount(AbstractLedgerTxn& ltx,
LedgerTxnHeader const& header) const;

bool preloadEntryHelper(
Contributor

Do we actually use this anywhere?

}

Cluster const&
ApplyStage::getCluster(size_t i) const
Contributor

nit: From what I've seen in the code, it doesn't seem like this is necessary, given the existence of iterators (usually we only iterate this by index).

res.emplace(lk);
if (isSorobanEntry(lk))
{
res.emplace(getTTLKey(lk));
Contributor

Do we actually care about TTL entries for the purpose of this set? It doesn't seem like we ever actually test TTLs for being present in the set.

Contributor
Contributor

graydon commented May 24, 2025

@sisuresh I spent today reading and understanding more of the LedgerManagerImpl changes, specifically applyThread. So far so good! But to make sense of it I also started doing some helper-extraction. I made this commit: graydon@e43974e for example. You're welcome to grab it into your branch if you think it helps readability (I do!) -- it should be a logical no-op, just method extractions and a handful of renamings or code-pattern changes (e.g. using destructuring patterns in range for loops in a couple of places).

If that's not too objectionable I kinda want to do the same to the other new big blocks in LedgerManagerImpl::applyTransactions and applySorobanStage. WDYT?

(It also occurs to me that most of these methods don't touch LedgerManagerImpl state variables at all and could perhaps be moved to a separate file. LedgerManagerImpl is fairly large at this point...)

Contributor Author

Thank you for this! Your change looks great and I'll cherry-pick it. The other big blocks should be split up as well. If that's something you wanted to tackle, that would be great.

Contributor
Contributor

graydon commented May 27, 2025

> Thank you for this! Your change looks great and I'll cherry-pick it. The other big blocks should be split up as well. If that's something you wanted to tackle, that would be great.

@sisuresh did another quick round of this in graydon@8207c92

@@ -2526,7 +2526,8 @@ LedgerManagerImpl::sealLedgerTxnAndTransferEntriesToBucketList(
LiveBucket::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION))
{
std::vector<LedgerKey> restoredKeys;
auto const& restoredKeyMap = ltx.getRestoredHotArchiveKeys();
auto const& restoredKeyMap =
ltxEvictions.getRestoredHotArchiveKeys();
Contributor Author

This change doesn't actually work because getRestoredHotArchiveKeys only returns mRestoredKeys for the current ltx, not its parents. We should probably recurse up and append the keys instead. @SirTyson What do you think?
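
A toy sketch of the recurse-and-append idea (LtxSketch is a stand-in, not LedgerTxn's real interface): each level contributes its own mRestoredKeys and then defers to its parent, so callers see the union rather than only the current ltx's set.

```cpp
#include <set>
#include <string>

// Illustrative stand-ins; LedgerTxn's real interface differs.
using LedgerKey = std::string;
using RestoredKeySet = std::set<LedgerKey>;

struct LtxSketch
{
    LtxSketch const* mParent{nullptr};
    RestoredKeySet mRestoredKeys;

    // Walk up the parent chain and append each level's restored keys,
    // instead of returning only the current ltx's set.
    void
    collectRestoredHotArchiveKeys(RestoredKeySet& out) const
    {
        out.insert(mRestoredKeys.begin(), mRestoredKeys.end());
        if (mParent)
        {
            mParent->collectRestoredHotArchiveKeys(out);
        }
    }
};

int
main()
{
    LtxSketch root;
    root.mRestoredKeys = {"restored-in-parent"};
    LtxSketch child;
    child.mParent = &root;
    child.mRestoredKeys = {"restored-in-child"};

    RestoredKeySet all;
    child.collectRestoredHotArchiveKeys(all);
    return all.size() == 2 ? 0 : 1;
}
```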
