Object Store High Availability #10
base: main
Conversation
reps/2022-04-21-object_store_HA.md
Outdated
#### How to solve the object owner failure problem?

**Global Owner**: A group of special Ray actors with `max_restarts=-1` will be created to own all objects of a job created by `ray.put` and the returned object (either from normal-task or actor-task). When one of the special actors fails, the restarted instance will rebuild the reference table based on information provided by Raylets and other non-special workers.
This should also work for objects of task returns, not just objects of `ray.put`.
I mentioned that in the description above:
> `ray.put` and the returned object (either from normal-task or actor-task)
reps/2022-04-21-object_store_HA.md
Outdated
- RPCs sent by other workers to `G` should be retried until the state of `G`'s actor becomes `Alive`.
- The reply callback of some RPCs will not be invoked until the rebuilding state of `G` is set to `READY`.

We prefer __Option 2__, and due to the difficulty of implementation, we are more inclined to choose the __active__ way to restore `G`.
I'll add some notes here for the cons of the passive way.
- Raylets need to subscribe to the actor state notifications in order to know the new RPC address of a restarted actor and reconnect to it. To implement this, we need to add something similar to the core worker's "actor manager" to the Raylet. This is additional coding effort.
- Raylets pushing collected information to the restarted actor vs. the actor pulling information from Raylets is just like push-based vs. pull-based resource reporting. We've already discussed this and concluded that pull-based resource reporting is easier to maintain and test. I believe this conclusion also applies to object reference and location reporting.
I will add it to the doc's body.
reps/2022-04-21-object_store_HA.md
Outdated
#### How to solve the object owner failure problem?

**Global Owner**: A group of special Ray actors with `max_restarts=-1` will be created to own all objects of a job created by `ray.put` and the returned object (either from normal-task or actor-task). When one of the special actors fails, the restarted instance will rebuild the reference table based on information provided by Raylets and other non-special workers.
> A group of special Ray actors

I'm interested in what this means. Is it a new type of core worker (it doesn't seem like it makes sense to do this at the application level)? Or should this type of logic go in GCS?
I think it is a new type of CoreWorker (inherited from CoreWorker), with some rebuild logic added for when it restarts, etc. In short, it will not be much different from the original CoreWorker.
cc @iycheng who has thought a lot about related topics (durable/named objects, ownership transfer to GCS, etc.)
A few high-level comments:
- I think this doc is lacking key applications. The original idea behind ownership was that reference holders would often fate-share with the reference owner anyway, so you would not need any of the enhancements proposed. Not to say that this is always the case, but at least we should have some representative applications. And we should have these to include in the testing plan anyway.
- I consider object borrower failure as more of a one-off bug, and I would suggest moving that discussion to the github issue that you linked.
- For data and metadata loss (problems 1 and 3), we need to consider the positioning of this proposal compared to workflows, which is the current recommendation for durable execution. I'm worried that, as is, there are some things that overlap (both handle owner failure) but also some things that are not included in one or the other. For example, using a global owner doesn't address the problem of recovering the application-level state that was at the owner. Ideally, we should have concrete use cases that are not currently well-supported by the workflows API and focus the discussion on those.
- The idea of a global owner makes sense, but I'm worried about the complexity of the options. Anything that requires polling the whole cluster is not feasible, IMHO. The other design option here that is not discussed is ownership transfer to an HA GCS.
For now, users can only rely on lineage to recover an unavailable object. But lineage has many restrictions:
- Cannot recover an object put in the object store via `ray.put`.
- Cannot recover an object returned by an actor task.
Actually, this is not exactly true (actor tasks can also be retried, but the user has to guarantee that they're idempotent).
Let me elaborate on the scenarios in case they are not clear enough.

Scenario 1: a group of workers generate data and `ray.put` it in the object store. Then another worker collects those `ObjectRef`s and logically composes them into a distributed data structure. In this case, the data-generating workers are the owners. As they've finished their work, we want to kill them and create new workers to do other stuff with the generated distributed data structure. Note, unlike Datasets, which "return" the data, this case `ray.put`s data in the object store. Thus the data is owned collectively by many workers in the cluster.

Scenario 2: imagine we have an actor that works as a KV store for Ray objects. It stores a map from string keys to object refs. It allows other components in the application to query objects by string keys. In this case, we want to avoid data loss when the actor restarts.

For scenario 1, the current workaround is assigning a single actor as the owner of all objects. But the problem is scalability. If we have multiple global owners, we can support this scenario better. Note, ownership transfer should be able to fix this case as well, but it seems more complex. For scenario 2, it's not currently possible, and ownership transfer won't help either.

@Catch-Bull you can add a "case study" section to the proposal.
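To make scenario 2 concrete, here is a minimal sketch of such a key-value actor (the class and method names are illustrative, not from the REP):

```python
import ray

ray.init()

@ray.remote(max_restarts=-1)
class ObjectKVStore:
    """Maps string keys to ObjectRefs so other components can query objects by key."""

    def __init__(self):
        self._refs = {}

    def put(self, key, ref_holder):
        # The caller wraps the ref in a list so Ray passes the reference itself
        # instead of resolving it to its value.
        self._refs[key] = ref_holder

    def get(self, key):
        return self._refs.get(key)


store = ObjectKVStore.remote()

# A data-generating process puts data and registers it under a string key.
data_ref = ray.put({"weights": [1, 2, 3]})
ray.get(store.put.remote("model-v1", [data_ref]))

# Later, another component queries by key and reads the object.
(restored_ref,) = ray.get(store.get.remote("model-v1"))
print(ray.get(restored_ref))

# If the KV actor restarts, its in-memory map is gone; and if the process that
# called `ray.put` (the owner) dies, the object itself becomes unavailable --
# the failure mode this scenario is about.
```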
Regarding checkpointing objects, I think there are 2 key problems that we need to think about:
1. How a reference holder can transparently recover an object from its checkpoint when the object becomes unavailable.
2. How checkpoints get cleaned up (GC).
IMO, #1 is more important; if we don't support this, users will have to manually reload data from the checkpoint. Our idea to fix this issue is that, when the object becomes unavailable, the worker that calls `ray.get` will reload data from the checkpoint and put it in the object store again, then it becomes the new owner. Note, multiple workers may become owners simultaneously, but they are independent of each other.

Re #2, it's less critical. As a short-term solution, we can require users to manually clean up checkpoints, or set a TTL. In the long term, we can use the metadata recovery mechanism that is mentioned in this REP.

We had a discussion: if we have object checkpointing + transparent recovery + manual checkpoint management, that should mostly meet our users' requirements as well. And it should be much simpler to implement than this proposal. @stephanie-wang @ericl any concerns or comments?
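A rough application-level sketch of that recovery idea (the checkpoint format, exception set, and helper name are placeholders; in the proposal this logic would live inside the core worker, not user code):

```python
import pickle

import ray
from ray.exceptions import ObjectLostError, OwnerDiedError


def get_with_checkpoint_fallback(ref, checkpoint_path):
    """Return (value, ref); if the object is unavailable, reload it from the
    checkpoint, re-put it, and return the value with a fresh ObjectRef."""
    try:
        return ray.get(ref), ref
    except (OwnerDiedError, ObjectLostError):
        # Reload the checkpointed value (the on-disk format here is made up).
        with open(checkpoint_path, "rb") as f:
            value = pickle.load(f)
        # Re-put into the object store: the calling worker becomes the new
        # owner of this copy, independently of any other worker doing the same.
        new_ref = ray.put(value)
        return value, new_ref
```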
This is an interesting solution. Having multiple owners for an object technically is workable, if a little strange. We will have to think a little about how to treat the memory copies of the object; whether all the in-memory copies of a checkpointed object should be considered secondary copies and eligible for eviction. But otherwise it seems quite simple. Another option is to use the GCS as the owner for checkpointed objects, though this may require more code changes and be more complex in practice.
Agreed it's less critical. For batch jobs, the checkpoint dir can also be deleted at the end of the job. For serving jobs, the checkpoint files can be cleaned up normally if the owner exits properly. So there should be no leaks except in the presence of failures.
I'm in favor of this, since checkpointing is something that has come up from users multiple times before. Good checkpoint support will simplify the design and potentially improve performance of many Ray libraries, including Tune and Workflows.
I feel this is simple and clean. If, in the end, we decide to go this way, could you add some details about how you plan to support this with the current system? We have object spilling, which overlaps with checkpointing. I'd prefer not to have two systems doing similar work (pushing data to storage and loading it up later).
I assume your purpose is to save object store memory by pinning fewer objects, right? But this seems to complicate the design. My preference is that checkpoints are only used for recovering objects when they are lost. The behavior regarding primary/secondary copies remains unchanged for checkpointed objects.
Yeah, spilling and checkpointing should depend on the same storage backend. See this doc.
Agreed that not changing the behavior would be the best approach.
@stephanie-wang do you have any high-level concerns over this direction? If not, we will revise the proposal.
I am definitely in favor of using a checkpoint-based method instead, so please go ahead and revise the proposal as needed. But one thing I am confused by in the "multiple owner" design is how the usual protocols will change after recovery. Normally, the advantage of having a single owner is that we can centralize all the ref counting updates. It sounds like it could get quite complicated if there are multiple owners. To be concrete, how will the worker that calls `ray.get` and becomes a new owner fit into the existing ref counting protocol?

Maybe a simpler design to consider is to follow the rule "checkpointed objects have no owner". So anyone who needs the object will restore it as needed, and we'll just rely on LRU eviction to GC in-memory copies.

In summary, it seems like we need the following additions to the proposal:
I was assuming that the protocols wouldn't change. For example, suppose you have the same checkpointed object O1 restored on two workers, W1 and W2. Now there would effectively be two different "logical references" to the same object in the cluster: (O1, owner=W1) and (O1, owner=W2). If a task is operating on (O1, owner=W2), it would only talk to W2 about the object. If a task somehow got references to both owners at once, it would need to decide which one to talk to; this is the only part I'm not sure about.
This would mean we can't eagerly evict objects though, right?
I think this comes down to what semantics we're providing. If we're providing transparent recovery for current reference holders, then we need to resolve this issue about possibly conflicting owners and also how to rebuild the reference count. But if the checkpoint restore semantics are to provide a completely new ObjectRef, then this isn't an issue. Actually, I'm not sure which case is envisioned for this proposal. Can you clarify this, @Catch-Bull or @raulchen?
We can still eagerly evict once the worker that restored the object is no longer using it. If caching is required, though, we could also have the GCS act as the new owner upon checkpoint restore. I think this is actually the cleanest approach: no ownership transfer needed, and no weird situations with multiple owners.
I'm trying to think through the multiple owner collision issue. Is it actually complex? If a process arbitrarily picked, say, the first owner that registered for an object, it seems things would work. Location updates would go to an arbitrary owner, which is fine, and same for any ref count updates. +1 though on perhaps using GCS as a temporary owner of checkpointed objects, if necessary.
In theory, I think it could work, but in practice it might get complicated. It's similar to the issue we've had before about a single worker deserializing the same ObjectRef twice, and that actually gets to be a bit complex in the implementation...
My thought is the same as @ericl's. The protocol won't change. I think the protocol should be clear to reason about. Implementation-wise, we may need some changes. For example, saving the owner address in the ObjectRef, and using (object id, owner address) as the key of the reference table.
Echoing Eric's and Stephanie's point: I think we should at least explore multi-owner vs. GCS-owned objects. The trade-off between these two is not clear without a detailed explanation.
@ericl @iycheng @stephanie-wang @scv119 We discussed it internally:

**Options to implement object HA with checkpoint**

**Option 1: Ownerless objects**

Every checkpointed object will be ownerless. When a worker needs a checkpointed object that is unavailable, it restores the object from the checkpoint as needed.
**Option 2: Divided owners on owner failure**

Once the owner of a checkpointed object is dead, subsequent access to the object on a worker will make the worker the new owner of the object. The metadata about this object in the reference table of the worker will be rewritten. If multiple workers hold the same object ref and want to access it after owner failure, each worker will become the new owner of the object independently, i.e. multiple owners after owner failure. If both worker A and worker B are the new owners of an object and both pass object refs to worker C, worker C only records the owner of the object once, e.g. if worker C receives the object ref from worker A first, then worker C treats worker A as the owner of the object.
**Option 3: Global owner(s)**

We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with `max_restarts=-1`. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an `owner_is_gcs` flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner.

Note that we don't need to restore the reference table in global owners by pulling info from the cluster because objects are already checkpointed. Checkpoint info is stored in the reference table and it will be encoded when serializing an object ref, hence checkpoint info is recoverable. If borrowers detect owner failure, they will try to reconnect to the owner, and the recovered owner will recover the reference count and borrower list via these new RPC connections.
We prefer named actors rather than GCS as global owners.
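To make Option 3 a bit more concrete, a rough sketch of what "a group of named actors with `max_restarts=-1`" could look like from the application side (the actor class, names, and owner-selection scheme are illustrative; the REP proposes wiring this up inside the system via the existing ownership-assignment RPCs):

```python
import ray

ray.init()

@ray.remote(max_restarts=-1)
class GlobalOwner:
    """A long-lived, restartable actor whose only job is to own HA objects."""

    def ready(self):
        return True

NUM_GLOBAL_OWNERS = 4  # e.g. configured via JobConfig in the proposed API
owners = [
    GlobalOwner.options(
        name=f"global_owner_{i}", lifetime="detached", get_if_exists=True
    ).remote()
    for i in range(NUM_GLOBAL_OWNERS)
]
ray.get([o.ready.remote() for o in owners])

# Under the proposal, `ray.put(value, enable_ha=True)` would pick one of these
# owners (e.g. by hashing the object ID) and assign ownership to it, instead
# of the calling worker owning the object.
```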
Great analysis! My two takeaways:
I'd like to confirm my understanding of Option 3 a bit more. Is the key quality here that, since these objects are checkpointed, it's actually OK if the actors fail? If the owner actor fails, that just causes another reload of the object from a checkpoint in the worst case. We just need some temporary coordination to resolve inefficiencies around object transfers. If this understanding is correct, then I quite like this proposal (using named actors as global owners). It's reusing existing functionality, has no significant performance tradeoffs, and the ownership semantics are clear. Would be curious if others have the same conclusion.
Summarizing the conclusions from an offline discussion:
I like the new design proposal! A few comments:
- Can we provide some specific application examples? Right now the use cases are a bit abstract.
- Can we brainstorm options for the API and semantics?
- Can you clarify what changes are concretely needed for the current protocols?
We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with `max_restarts=-1`. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an `owner_is_gcs` flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner.
Note that we don't need to restore the reference table in global owners by pulling info from the cluster because objects are already checkpointed. Checkpoint info is stored in the reference table and it will be encoded when serializing an object ref, hence checkpoint info is recoverable. If borrowers detect owner failure, they will try to reconnect to the owner and the recovered owner will recover the reference count and borrower list via these new RPC connections.
Does this mean that the checkpoint location will be stored with the serialized ObjectRef? It makes sense, but if that is the idea, could you make that more explicit somewhere in the design? For example, how does the protocol for retrieving the object value change when using HA?
I'm also wondering if we can try a version where the application has to manually free the object. Then we don't need to worry about keeping track of the ref count at all. Would this meet your use cases?
Yes, we will store the `checkpoint_uri` in the ObjectRef; the protocol is described here: #10 (comment). I will add it to the REP later.

Yes. In our prototype test, we do not recover the reference count after the owner restarts, because an HA object doesn't have a primary copy, and users need to release the checkpoint by themselves.
In the current prototype, refs caused by borrowers are still recorded. We didn't simplify this. But if the global owner is recovered from failure, the reference count will become zero and direct borrowers will not re-report borrow info to the new owner process. In this case, the owner will not GC the object because it knows that the object is HA.
We did think about bypassing reference recording for HA objects. But for the simplicity of the prototype, we didn't implement this.
```python
# put an HA object. the default value of `enable_ha` is False.
ray.put(value, enable_ha=True)
```
I think there are use cases for specifying the exact actor that should own the object. In workflows, for example, this could be the WorkflowManagementActor.
We also have an `_owner=` parameter; maybe we can reuse that one.
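For reference, a small sketch of the existing (experimental) `_owner=` parameter of `ray.put`, which lets the caller pick the exact actor that owns an object; the actor class here is just a placeholder echoing the workflow use case:

```python
import ray

ray.init()

@ray.remote(max_restarts=-1)
class WorkflowManager:  # stand-in for e.g. a WorkflowManagementActor
    def ready(self):
        return True

manager = WorkflowManager.remote()
ray.get(manager.ready.remote())

# `_owner` assigns ownership of the put object to the given actor, so the
# object remains resolvable after the calling worker exits, as long as the
# owner actor is alive.
ref = ray.put({"step_output": 42}, _owner=manager)
print(ray.get(ref))
```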
This part is not very clear in the REP. The previous consideration was that this interface should be as simple as possible: the user only needs to configure the `GlobalOwner` settings on `JobConfig` (like the number of `GlobalOwner`s):

```python
import ray
from ray.job_config import JobConfig

ray.init(job_config=JobConfig(global_owner={"number": 16}))
ray.put("test", enable_ha=True)
```
Maybe we can provide an interface to initialize `GlobalOwner`s:

```python
ray.init()

### create owners [o1, o2, o3]
...
###

# or: ray.init_global_owner(number=16)
ray.init_global_owner(owners=[o1, o2, o3])
ray.put("test", enable_ha=True)
```
cc @kfstorm
> I think there are use cases for specifying the exact actor that should own the object. In workflows, for example, this could be the WorkflowManagementActor.

Is there any reason that dummy actors as global owners don't work in workflows?
We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with `max_restarts=-1`. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an `owner_is_gcs` flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner.
Note that we don't need to restore the reference table in global owners by pulling info from the cluster because objects are already checkpointed. Checkpoint info is stored in the reference table and it will be encoded when serializing an object ref, hence checkpoint info is recoverable. If borrowers detect owner failure, they will try to reconnect to the owner and the recovered owner will recover the reference count and borrower list via these new RPC connections.
Yeah, it's also not clear to me how exactly the recovery protocol works. It'd be nice to have an explicit section explaining the recovery protocol.
The current design is quite simple:

1. Try to get the object locations from the `owner`, and then pull data directly from one of those locations.
2. If 1 fails, try to restore the data from the checkpoint. The current implementation of the prototype is to directly use an `io_worker` to read data from the local disk, which is the same as the existing path for restoring spilled data.
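A compact pseudocode summary of that two-step read path (the function and parameter names are placeholders; the real logic lives in the core worker/raylet, not in user Python code):

```python
from typing import Callable, Iterable, Optional

def resolve_ha_object(
    get_locations_from_owner: Callable[[], Iterable[str]],
    pull_from_node: Callable[[str], Optional[bytes]],
    restore_from_checkpoint: Callable[[], bytes],
) -> bytes:
    """Two-step read path for an HA object, expressed against placeholder
    primitives for location lookup, object pull, and checkpoint restore."""
    # Step 1: ask the owner where live copies are and try to pull one.
    try:
        for node in get_locations_from_owner():
            data = pull_from_node(node)
            if data is not None:
                return data
    except ConnectionError:
        pass  # owner unreachable; fall through to the checkpoint path

    # Step 2: no live copy reachable -- restore via an IO worker from the
    # checkpoint, the same mechanism used for restoring spilled objects.
    # In real deployments the checkpoint should live on remote storage.
    return restore_from_checkpoint()
```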
> The current implementation of the prototype is to directly use an `io_worker` to read data from the local disk

To be clear, the test cases we added in the prototype use local disks. In real environments, we should use remote distributed storage.
We implement object HA based on checkpoints, so we can work around **Problem 3: Loss of All Copies**.
Previously discussed options: https://github.com/ray-project/enhancements/pull/10#issuecomment-1127719640

We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with `max_restarts=-1`. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an `owner_is_gcs` flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner.
Do we also need to use highly available storage instead of local disk?
For object store HA, we hope that if a single node fails, the data will not be lost, so it still needs to rely on external storage, like S3.
We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with `max_restarts=-1`. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an `owner_is_gcs` flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner.
Note that we don't need to restore the reference table in global owners by pulling info from the cluster because objects are already checkpointed. Checkpoint info is stored in the reference table and it will be encoded when serializing an object ref, hence checkpoint info is recoverable. If borrowers detect owner failure, they will try to reconnect to the owner and the recovered owner will recover the reference count and borrower list via these new RPC connections.
How does checkpointed object storage get GCed, both in the normal case and under failure?
GC of the checkpoints is handed over to the user.

We can add some strategies to help users manage this, such as deleting all checkpoints at the end of the job. If the cluster fails unexpectedly, we can only rely on the external storage's own lifecycle management.
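As an illustration of the "delete all checkpoints at the end of the job" strategy, a tiny sketch assuming a hypothetical job-scoped checkpoint layout on a filesystem-like backend:

```python
import shutil
from pathlib import Path

def cleanup_job_checkpoints(checkpoint_root: str, job_id: str) -> None:
    """Best-effort removal of every checkpoint written by one job, assuming a
    layout like <checkpoint_root>/<job_id>/<object_id> (layout is made up)."""
    job_dir = Path(checkpoint_root) / job_id
    if job_dir.exists():
        # For S3-like backends the equivalent is deleting the job's key prefix;
        # if the cluster dies before this runs, a storage-side lifecycle/TTL
        # rule is the fallback.
        shutil.rmtree(job_dir, ignore_errors=True)
```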
What are the plans for implementing this feature?
We will give the plans after this REP gets merged.
The Object High Availability prototype is here: ray-project/ray#27541