Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Object Store High Availability #10

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open

Object Store High Availability #10

wants to merge 11 commits into from

Conversation

Catch-Bull
Copy link

No description provided.

@Catch-Bull Catch-Bull marked this pull request as draft April 27, 2022 18:14

#### How to solve the object owner failure problem?

**Global Owner**: A group of special Ray actors with `max_restarts=-1` will be created to own all objects of a job created by `ray.put` and the returned object (either from normal-task or actor-task). When one of the special actors fails, the restarted instance will rebuild the reference table based on information provided by Raylets and other non-special workers.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also work for objects of task returns, not just objects of ray.put.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mentioned that in the description above:

`ray.put` and the returned object (either from normal-task or actor-task)

- RPCs sent by other workers to `G` should be retried until the state of `G's` Actor becomes `Alive`.
- The reply callback of some RPCs will not be invoked until the rebuilding state of `G` is set to `READY`.

We prefer __Option.2__, and due to the difficulty of implementation, we are more trend to choose the __active__ way to restore `G`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add some notes here for the cons of the passive way.

  • Raylets need to subscribe to the actor state notifications in order to know the new RPC address of a restarted actor and reconnect to it. To implement this, we need to add something similar to "actor manager" in core worker to Raylet. This is an additional coding effort.
  • Raylets pushing collected information to the restarted actor v.s. the actor pulling information from Raylets is just like push-based v.s. pull-based resource reporting. This is what we've already discussed and we've concluded that pull-based resource reporting is easier to maintain and test. I believe this conclusion also applies to object reference and location reporting.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add it to the doc's body.

reps/2022-04-21-object_store_HA.md Outdated Show resolved Hide resolved
reps/2022-04-21-object_store_HA.md Outdated Show resolved Hide resolved
@Catch-Bull Catch-Bull marked this pull request as ready for review April 28, 2022 16:05

#### How to solve the object owner failure problem?

**Global Owner**: A group of special Ray actors with `max_restarts=-1` will be created to own all objects of a job created by `ray.put` and the returned object (either from normal-task or actor-task). When one of the special actors fails, the restarted instance will rebuild the reference table based on information provided by Raylets and other non-special workers.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A group of special Ray actors

I'm interested in what this means. Is it a new type of core worker (it doesn't seem like it makes sense to do this at the application level)? or should this type of logic go in GCS?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is a new type of CoreWorker(inherited from CoreWorker), add some rebuild logic when it restarts, etc. In short, it will not be much different from the original CoreWorker.

@wuisawesome
Copy link

cc @iycheng who has thought a lot about related topics (durable/named objects, ownership transfer to GCS, etc)

@Catch-Bull Catch-Bull requested a review from fishbone April 29, 2022 03:37
Copy link
Contributor

@stephanie-wang stephanie-wang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few high-level comments:

  1. I think this doc is lacking key applications. The original idea behind ownership was that reference holders would often fate-share with the reference owner anyway, so you would not need any of the enhancements proposed. Not to say that this is always the case, but at least we should have some representative applications. And we should have these to include in the testing plan anyway.
  2. I consider object borrower failure as more of a one-off bug, and I would suggest moving that discussion to the github issue that you linked.
  3. For data and metadata loss (problems 1 and 3), we need to consider the positioning of this proposal compared to workflows, which is the current recommendation for durable execution. I'm worried that as is, there are some things that overlap (both handle owner failure) but also some things that are not included in one or the other. For example, using a global owner doesn't address the problem of recovering the application-level state that was at the owner. Ideally, we should have concrete use cases that are not currently well-supported by the workflows API and focus the discussion on those.
  4. The idea of a global owner makes sense, but I'm worried about the complexity of the options. Anything that requires polling the whole cluster is not feasible, IMHO. The other design option here that is not discussed is ownership transfer to an HA GCS.


For now, users can only rely on lineage to recover the unavailable object. But lineage has many restriction:
- Can not recover the object which put in object store via `ray.put`.
- Can not recover the object returned by actor task.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this is not exactly true (actor tasks can also be retried, but user has to guarantee that they're idempotent).

@Catch-Bull Catch-Bull requested a review from stephanie-wang May 9, 2022 12:22
@Catch-Bull
Copy link
Author

A few high-level comments:

  1. I think this doc is lacking key applications. The original idea behind ownership was that reference holders would often fate-share with the reference owner anyway, so you would not need any of the enhancements proposed. Not to say that this is always the case, but at least we should have some representative applications. And we should have these to include in the testing plan anyway.
  2. I consider object borrower failure as more of a one-off bug, and I would suggest moving that discussion to the github issue that you linked.
  3. For data and metadata loss (problems 1 and 3), we need to consider the positioning of this proposal compared to workflows, which is the current recommendation for durable execution. I'm worried that as is, there are some things that overlap (both handle owner failure) but also some things that are not included in one or the other. For example, using a global owner doesn't address the problem of recovering the application-level state that was at the owner. Ideally, we should have concrete use cases that are not currently well-supported by the workflows API and focus the discussion on those.
  4. The idea of a global owner makes sense, but I'm worried about the complexity of the options. Anything that requires polling the whole cluster is not feasible, IMHO. The other design option here that is not discussed is ownership transfer to an HA GCS.

@stephanie-wang

  • About 1:
    Our main scenarios are as follows:

    1. there is a logically distributed data interface based on multiple ray.ObjectRef, such as mars.DataFrame, will be unavailable when any of the ray.ObjectRefs is out of scope.
    2. NamedObject, Access one or multiple objects via name(a string).
  • About 2:
    Our proposal is not a bug-fix, we will modify some designs of the Object Store to solve it.

  • About 3:
    Our main scene can be seen in 1. The point is to ensure the availability of ObjectRef, so that ray can take on more responsibilities and reduce the burden on users.

  • About 4:
    If traversing the cluster is not acceptable, we propose to implement Object Store HA based on checkpoints
    Our proposal:

    • Store checkpoint URI in ObjectRef.
    • about the owner:
      • option 1: Only one owner in the global. The first worker to access the checkpoint after the owner dies is the owner.
      • option 2: Multiple owners, the worker which can not get data from Object Store and directly load checkpoint will own the object.

    Notes:
    - Simple to implement.
    - User to manage the lifecycle of checkpoints. We just need to ensure that users can get data through ObjectRef.

@raulchen
Copy link

raulchen commented May 10, 2022

Let me elaborate on the scenarios in case they are not clear enough.

Scenario 1: a group of workers generate data and "ray.put" them in the object store. Then another worker collects those "ObjectRef"s and logically composes them as a distributed data structure. In this case, the data-generating workers are the owners. As they've finished their work, we want to kill them and create new workers to do other stuff with the generated distributed data structure. Note, unlike datasets which "return" the data, this case "ray.put" data in the object store. Thus the data is owned together by many workers in the cluster.

Scenario 2: imagine we have an actor that works as a KV store for Ray objects. It stores a map from the string keys to object refs. It allows other components in the application to query objects by string keys. In this case, we want to avoid data loss when the actor restarts.

For scenario 1, the current workaround is assigning a single actor as the owner for all objects. But the problem is scalability. If we have multiple global owners, we can support this scenario better. Note, ownership transfer should be able to fix this case as well, but seems more complex.

For scenario 2, it's not currently possible. and ownership transfer won't help as well.

@Catch-Bull you can add a "case study" section to the proposal.

@raulchen
Copy link

Regarding checkpointing objects. I think there are 2 key problems that we need to think about:

  1. Can failure recovery be transparent to the users?
  2. Can we automatically garbage-collect the checkpoint data?

IMO, # 1 is more important, if we don't support this, users will have to manually reload data from the checkpoint. Our idea to fix this issue is that, when the object becomes unavailable, the worker that calls "ray.get" will reload data from the checkpoint and put it in the object store again, then it becomes the new owner. Note, multiple workers may become owners simultaneously, but they are independent with each other.

Re # 2, it's less critical. For short term solution, we can require users to manually clean up checkpoints, or set a TTL. For long term, we can use the metadata recovery mechanism that is mentioned in this REP.

We did a discussion, if we have object checkpointing + transparent recovery + manual checkpoint management, that should mostly meet our users' requirements as well. And it should be much simpler to implement than this proposal.

@stephanie-wang @ericl any concerns or comments?

@ericl
Copy link
Contributor

ericl commented May 10, 2022

IMO, # 1 is more important, if we don't support this, users will have to manually reload data from the checkpoint. Our idea to fix this issue is that, when the object becomes unavailable, the worker that calls "ray.get" will reload data from the checkpoint and put it in the object store again, then it becomes the new owner. Note, multiple workers may become owners simultaneously, but they are independent with each other.

This is an interesting solution. Having multiple owners for an object technically is workable, if a little strange. We will have to think a little about how to treat the memory copies of the object; whether all the in-memory copies of a checkpointed object should be considered secondary copies and eligible for eviction. But otherwise it seems quite simple.

Another option is to use the GCS as the owner for checkpointed objects, though this may require more code changes and be more complex in practice.

Re # 2, it's less critical. For short term solution, we can require users to manually clean up checkpoints, or set a TTL. For long term, we can use the metadata recovery mechanism that is mentioned in this REP.

Agreed it's less critical. For batch jobs, the checkpoint dir can also be deleted at the end of the job. For serving jobs, the checkpoint files can be cleaned up normally if the owner exits properly. So there should be no leaks except in the presence of failures.

We did a discussion, if we have object checkpointing + transparent recovery + manual checkpoint management, that should mostly meet our users' requirements as well. And it should be much simpler to implement than this proposal.

I'm in favor of this, since checkpointing is something that has come up from users multiple times before. Good checkpoint support will simplify the design and potentially improve performance of many Ray libraries, including Tune and Workflows.

@fishbone
Copy link
Contributor

IMO, # 1 is more important, if we don't support this, users will have to manually reload data from the checkpoint. Our idea to fix this issue is that, when the object becomes unavailable, the worker that calls "ray.get" will reload data from the checkpoint and put it in the object store again, then it becomes the new owner. Note, multiple workers may become owners simultaneously, but they are independent with each other.

I feel this is simple and clean. If in the end, we decide to go this way, could you add some details about how do you plan to support this with the current system? We have object spilling here which has overlap with checkpointing. I'd prefer not to have two systems doing similar work. (push data to storage and load it up later).

@raulchen
Copy link

raulchen commented May 10, 2022

We will have to think a little about how to treat the memory copies of the object; whether all the in-memory copies of a checkpointed object should be considered secondary copies and eligible for eviction. But otherwise it seems quite simple.

I assume your purpose is to save object store memory by pinning less objects, right? But this seems to complicate the design. My preference is that checkpoints are only used for recovering objects when they are lost. The behavior regarding primary/secondary copies remains unchanged for checkpointed objects.

@raulchen
Copy link

I feel this is simple and clean. If in the end, we decide to go this way, could you add some details about how do you plan to support this with the current system? We have object spilling here which has overlap with checkpointing. I'd prefer not to have two systems doing similar work. (push data to storage and load it up later).

yeah, spilling and checkpointing should depend on the same storage backend. See this doc.

@stephanie-wang stephanie-wang self-assigned this May 10, 2022
@ericl
Copy link
Contributor

ericl commented May 10, 2022

I assume your purpose is to save object store memory by pinning less objects, right? But this seems to complicate the design. My preference is that checkpoints are only used for recovering objects when they are lost. The behavior regarding primary/secondary copies remains unchanged for checkpointed objects.

Agreed that not changing the behavior would be the best approach.

@raulchen
Copy link

We did a discussion, if we have object checkpointing + transparent recovery + manual checkpoint management, that should mostly meet our users' requirements as well. And it should be much simpler to implement than this proposal.

@stephanie-wang do you have any high-level concerns over this direction? If not, we will revise the proposal.

@stephanie-wang
Copy link
Contributor

We did a discussion, if we have object checkpointing + transparent recovery + manual checkpoint management, that should mostly meet our users' requirements as well. And it should be much simpler to implement than this proposal.

@stephanie-wang do you have any high-level concerns over this direction? If not, we will revise the proposal.

I am definitely in favor of using a checkpoint-based method instead, so please go ahead and revise the proposal as needed.

But one thing I am confused by in the "multiple owner" design is how the usual protocols will change after recovery. Normally, the advantage of having a single owner is that we can centralize all the ref counting updates. It sounds like it could get quite complicated if there are multiple owners. To be concrete, how will the worker that calls ray.get rebuild the reference count, and how will references created after that be accounted for?

Maybe a simpler design to consider is to follow the rule "checkpointed objects have no owner". So anyone who needs the object will restore it as needed, and we'll just rely on LRU eviction to GC in-memory copies.

In summary, it seems like we need the following additions to the proposal:

  1. Application case descriptions and semantics required by the application
  2. Positioning compared to checkpointing and workflows
  3. Description of alternatives (ownership transfer, rebuilding owner state as originally proposed etc) and why we're choosing not to implement these right now
  4. Deeper discussion of how the multi-ownership proposal will work during and after recovery.

@ericl
Copy link
Contributor

ericl commented May 11, 2022

But one thing I am confused by in the "multiple owner" design is how the usual protocols will change after recovery. Normally, the advantage of having a single owner is that we can centralize all the ref counting updates. It sounds like it could get quite complicated if there are multiple owners. To be concrete, how will the worker that calls ray.get rebuild the reference count, and how will references created after that be accounted for?

I was assuming that the protocols wouldn't change. For example, suppose you have the same checkpointed object O1 restored on two workers, W1, and W2. Now there would be effectively two different "logical references" to the same object in the cluster: (O1, owner=W1), and (O2, owner=W2).

If a task is operating on (O2, owner=W2), it would only talk to W2 about the object.

If a task somehow got references to both owners at once, it would need to decide which one to talk to, this is the only part I'm not sure about.

rely on LRU eviction to GC in-memory copies

This would mean we can't eagerly evict objects though right?

@stephanie-wang
Copy link
Contributor

But one thing I am confused by in the "multiple owner" design is how the usual protocols will change after recovery. Normally, the advantage of having a single owner is that we can centralize all the ref counting updates. It sounds like it could get quite complicated if there are multiple owners. To be concrete, how will the worker that calls ray.get rebuild the reference count, and how will references created after that be accounted for?

I was assuming that the protocols wouldn't change. For example, suppose you have the same checkpointed object O1 restored on two workers, W1, and W2. Now there would be effectively two different "logical references" to the same object in the cluster: (O1, owner=W1), and (O2, owner=W2).

If a task is operating on (O2, owner=W2), it would only talk to W2 about the object.

If a task somehow got references to both owners at once, it would need to decide which one to talk to, this is the only part I'm not sure about.

I think this comes down to what semantics we're providing. If we're providing transparent recovery for current reference holders, then we need to resolve this issue about possibly conflicting owners and also how to rebuild the reference count. But if the checkpoint restore semantics are to provide a completely new ObjectRef, then this isn't an issue. Actually, I'm not sure which case is envisioned for this proposal. Can you clarify this, @Catch-Bull or @raulchen?

rely on LRU eviction to GC in-memory copies

This would mean we can't eagerly evict objects though right?

We can still eagerly evict once the worker that restored the object is no longer using it. If caching is required, though, we could also have the GCS act as the new owner upon checkpoint restore. I think this is actually the cleanest approach: no ownership transfer needed, and no weird situations with multiple owners.

@ericl
Copy link
Contributor

ericl commented May 11, 2022

I'm trying to think through the multiple owner collision issue. Is it actually complex? If a process arbitrarily picked say, the first owner that registered for an object, it seems things would work. Location updates would go to an arbitrary owner, which is fine, and same for any ref count updates.

+1 though on perhaps using GCS as a temporary owner of checkpointed objects, if necessary.

@stephanie-wang
Copy link
Contributor

I'm trying to think through the multiple owner collision issue. Is it actually complex? If a process arbitrarily picked say, the first owner that registered for an object, it seems things would work. Location updates would go to an arbitrary owner, which is fine, and same for any ref count updates.

In theory, I think it could work, but in practice might get complicated. It's similar to the issue we've had before about a single worker deserializing the same ObjectRef twice, and that actually gets to be a bit complex in the implementation...

@raulchen
Copy link

raulchen commented May 13, 2022

My thought is the same as @ericl. The protocol won't change.
Say object O's original owner is W0, and 2 workers W1 and W2 both hold a reference.
When W1 and W2 call "ray.get(O)" and find the object is lost, they will recover the data and change the owner of their local object ref to themselves (note, this process happens independently on each worker). Then original object becomes 2 different objects, (O, W1) and (O, W2).
After that, everything stays the same. W1 is the owner of (O, W1), and W2 is the owner of (O, W2). They are independent with each other. So we don't have to think about "multiple owners“.

I think the protocol should be clear to reason about. Implementation wise, we may need some changes. For example, saving the owner address in ObjectRef, and using (object id, owner address) as the key of the reference table.

@scv119
Copy link
Contributor

scv119 commented May 16, 2022

Another option is to use the GCS as the owner for checkpointed objects, though this may require more code changes and be more complex in practice.

Echo Eric/Stephanie's point. I think we should at least explore multi-owner vs GCS owned objects. The trade-off between these two are not clear without detailed explanation.

@Catch-Bull
Copy link
Author

@ericl @iycheng @stephanie-wang @scv119 We discussed it internally:

Options to implement object HA with checkpoint

Option 1: Ownerless objects

Every checkpointed object will be ownerless. When ray.get on a checkpointed object and the object data is not local, data will be loaded from checkpoint and stored in local object store as a secondary copy.

  • Pros
    • The logic/protocol should be relatively straightforward.
  • Cons
    • No pulling/pushing object data between nodes.
      • High IO pressure on external storage.
      • External storage bottleneck.
      • Bad loading performance.
    • Existing code is purely ownership-based. Need to add many if-elses to support ownerless objects. (e.g. location pubsub. GC.)
      • High dev cost.
      • Maintenance burden.
    • Some features aren't easy to support. e.g. locality-aware scheduling.
      • Missing features.

Option 2: Divided owners on owner failure

Once the owner of a checkpointed object is dead, subsequent access to the object on a worker will make the worker the new owner of the object. The metadata about this object in the reference table of the worker will be rewritten. If multiple workers hold the same object ref and want to access it after owner failure, each worker will become the new owner of the object, independently. i.e. multiple owners after owner failure.

If both worker A and B are the new owners of an object and both pass object refs to worker C, worker C only records the owner of the object once. e.g. If worker C receives the object ref from worker A first, then worker C treats worker A as the owner of the object.

  • Pros
    • The ownership protocol is still distributed. Critical updates to the owner reference table are still in-process.
      • Good performance on metadata updates.
  • Cons
    • Owner collision. e.g. Raylet also stores and uses owner addresses and communicates with owners. How do we update the owner info in Raylet, especially if two workers on the same node claim the new owner of an object?
      • High dev cost to sync new owner info to Raylet.
      • High dev cost to maintain N owners of an object in Raylet.
    • Primary copy collision. Imaging multiple workers on the same node call ray.get on an object.
      • If we allow only one copy loaded into the local object store, we need to update the GC strategy to only unpin the primary copy if all owners say the object is out-of-scope.
        • High dev cost.
      • If we store multiple copies (i.e. one copy per owner), we need to update object manager and plasma store by changing the object key from object ID to object ID + owner address.
        • High dev cost.
        • High memory pressure to object store.
        • High IO pressure on external storage.
        • External storage bottleneck.
        • Bad loading performance.
    • No pulling/pushing object data between nodes.
      • High IO pressure on external storage.
      • External storage bottleneck.
      • Bad loading performance.
    • Corner case handling such as RPC failures.
      • Potentially high dev cost.

Option 3: Global owner(s)

We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with max_restarts=-1. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an owner_is_gcs flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner.

Note that we don't need to restore the reference table in global owners by pulling info from the cluster because objects are already checkpointed. Checkpoint info is stored in the reference table and it will be encoded when serializing an object ref, hence checkpoint info is recoverable. If borrowers detected owner failure, they will try to reconnect to the owner and the recovered owner will recover the reference count and borrower list via these new RPC connections.

  • Pros
    • No major protocol changes compared to the existing ownership assignment protocol.
      • Low dev cost.
    • No owner address updates because the owner_is_gcs flag or the actor name is encoded in it.
      • Low dev cost.
  • Cons
    • Centralized/semi-centralized architecture.
      • Potentially bottleneck.
      • Bad performance.
    • Corner case handling such as RPC failures.
      • Potentially high dev cost.

We prefer named actors rather than GCS as global owners.

  • The number of global owners is configurable, hence scalable.
  • No need to embed (part of) core worker code into GCS.
  • No increased complexity for GCS.

cc @raulchen @kfstorm

@ericl
Copy link
Contributor

ericl commented May 16, 2022

Great analysis! My two takeaways:

  • Option 2 is perhaps much more complicated than may appear (as @stephanie-wang has mentioned earlier).
  • Option 3 might be doable with named actors.

I'd like to confirm my understanding on Option 3 more. Is the key quality here that since these objects are checkpointed, it's actually OK if the actors fail? Since if the owner actor fails, that just causes another re-load of the object from a checkpoint in the worst case. We just need some temporary coordination to resolve inefficiencies around object transfers.

If this understanding is correct them I quite like this proposal (using named actors as global owners). It's re-using existing functionality, has no significant performance tradeoffs, and the ownership semantics are clear. Would be curious if others have the same conclusion.

@raulchen
Copy link

Summarizing the conclusions from an offline discussions:

  • Among the above 3 options, option 3 is the simplest, in terms of both the protocol and implementation. And using actors as the global owners is more preferable than GCS.
  • @kfstorm and @Catch-Bull are working on a prototype based on option 3, which will demo the basic functionalities of checkpointing an object and recovering an object from the checkpoint.
  • This REP doc will be revised entirely, as the design has changed a lot from what it originally was.

@zhe-thoughts
Copy link
Collaborator

This is a great conclusion. Thanks @raulchen . Overall I think we should be using prototypes more for REPs. cc @ericl and @scv119 for prototyping future REPs from Anyscale Ray team as well

Copy link
Contributor

@stephanie-wang stephanie-wang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the new design proposal! A few comments:

  1. Can we provide some specific application examples? Right now the use cases are a bit abstract.
  2. Can we brainstorm options for the API and semantics?
  3. Can you clarify what changes are concretely needed for the current protocols?


We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with `max_restarts=-1`. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an `owner_is_gcs` flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner.

Note that we don't need to restore the reference table in global owners by pulling info from the cluster because objects are already checkpointed. Checkpoint info is stored in the reference table and it will be encoded when serializing an object ref, hence checkpoint info is recoverable. If borrowers detected owner failure, they will try to reconnect to the owner and the recovered owner will recover the reference count and borrower list via these new RPC connections.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that the checkpoint location will be stored with the serialized ObjectRef? It makes sense, but if that is the idea, could you make that more explicit somewhere in the design? For example, how does the protocol for retrieving the object value change when using HA?

I'm also wondering if we can try a version where the application has to manually free the object. Then we don't need to worry about keeping track of the ref count at all. Would this meet your use cases?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we will store checkpoint_uri into ObjectRef, protocol is here: #10 (comment). I will add to the REP later.

yes, In our prototype test, we do not recover the reference count after the owner restart, because the HA Object doesn't have a primary copy, and users need to release the checkpoint by themself.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current prototype, refs caused by borrowers are still recorded. We didn't simplify this. But if the global owner is recovered from failure, the reference count will become zero and direct borrowers will not re-report borrow info to the new owner process. In this case, the owner will not GC the object because it knows that the object is HA.

We did think about bypassing reference recording for HA objects. But for the simplicity of the prototype, we didn't implement this.

)

# put a HA object. the default value of `enable_ha` is False.
ray.put(value, enable_ha=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are use cases for specifying the exact actor that should own the object. In workflows, for example, this could be the WorkflowManagementActor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also have an _owner= parameter, maybe we can reuse that one.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is not very clear on the REP, the previous consideration was to hope that this interface should be as simple as possible, user only needs to config GlobalOwner on JobConfig (like the number of GlobalOwner):

ray.init(job_config=JobConfig(global_owner={"number": 16}))
ray.put("test", enable_ha=True)

Maybe we can provide an interface to initialize GlobalOwner:

ray.init()
### creat owner [o1, o2, o3]
...
###
# or ray.init_global_owner(number=16)
ray.init_global_owner(owners=[o1, o2, o3])
ray.put("test", enable_ha=True)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are use cases for specifying the exact actor that should own the object. In workflows, for example, this could be the WorkflowManagementActor.

Is there any reason that dummy actors as global owners don't work in workflows?


We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with `max_restarts=-1`. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an `owner_is_gcs` flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner.

Note that we don't need to restore the reference table in global owners by pulling info from the cluster because objects are already checkpointed. Checkpoint info is stored in the reference table and it will be encoded when serializing an object ref, hence checkpoint info is recoverable. If borrowers detected owner failure, they will try to reconnect to the owner and the recovered owner will recover the reference count and borrower list via these new RPC connections.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it's also not clear to me how exactly the recovery protocol work. it's be nice to have an explicit section explain the recovery protocol.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current design is quite simple:

  1. Try to get the object locations from the owner, and then pull data directly from one of those locations
  2. If 1 fails, try to restore the data from the checkpoint. The current implementation of the prototype is to directly use an io_worker to read data from the local disk, which is the same as the previous restore the spilled data.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation of the prototype is to directly use an io_worker to read data from the local disk

to be clear, the test cases we added in the prototype use local disks. In real environments, we should use remote distributed storage.

We implement object HA based on the checkpoint, so we can walk around **Problem 3: Loss of All Copies**,
previously discussed options: https://github.com/ray-project/enhancements/pull/10#issuecomment-1127719640

We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with `max_restarts=-1`. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an `owner_is_gcs` flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to use high available storage instead of local disk?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Object Store HA hope that if a single node fails, the data will not be lost, so it still needs to rely on external storage, like s3


We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with `max_restarts=-1`. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an `owner_is_gcs` flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner.

Note that we don't need to restore the reference table in global owners by pulling info from the cluster because objects are already checkpointed. Checkpoint info is stored in the reference table and it will be encoded when serializing an object ref, hence checkpoint info is recoverable. If borrowers detected owner failure, they will try to reconnect to the owner and the recovered owner will recover the reference count and borrower list via these new RPC connections.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does checkpointed object storage get GCed? Both under normal case and under failure?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GC of the checkpoint is handed over to the user to be responsible

We can add some strategies to help user management, such as deleting all checkpoints at the end of the job, if the cluster fails unexpectedly, we can only rely on external storage for its own life cycle management.

@MissiontoMars
Copy link

What are the plans for implementing this feature?

@Catch-Bull
Copy link
Author

What are the plans for implementing this feature?

we will give the plans after this REP gets merged.

@Catch-Bull
Copy link
Author

Object High Availability prototype in here: ray-project/ray#27541

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.