
Conversation

@rfilgueiras

@rfilgueiras rfilgueiras commented Oct 1, 2025

Description

This PR introduces a new component for NVFlare called Tensor Stream, which optimizes the transfer of model tensors between server and clients by streaming them separately from the main task payload.

Instead of embedding tensors directly into task data/results, Tensor Stream serializes and streams each tensor individually using safetensors, ensuring efficient and scalable communication. On the receiving side, tensors are deserialized, stored in memory, and reinjected back into the task payload to preserve the original structure.
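For reference, a minimal sketch of the safetensors serialization round trip used for streaming (the tensor names and in-memory byte handling here are illustrative, not the component's actual API):

```python
import torch
from safetensors.torch import save, load

# A small "model state" to stream; in the real component this comes from the task payload.
tensors = {"layer1.weight": torch.randn(4, 4), "layer1.bias": torch.zeros(4)}

# Serialize to bytes (this is what gets chunked and streamed).
payload: bytes = save(tensors)

# On the receiving side, reconstruct the tensors and reinject them into the task payload.
restored = load(payload)
assert torch.equal(restored["layer1.bias"], tensors["layer1.bias"])
```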

Server → Clients (Task Data)

  • Server serializes and streams all model tensors to clients.
  • Tensors are removed from the task data before transmission.
  • Clients receive and deserialize streamed tensors, then add them back into the task data.
(server-side flow diagram)

Clients → Server (Task Results)

  • Clients serialize and stream model parameter tensors to the server.
  • Tensors are removed from the task results before transmission.
  • Server receives and deserializes streamed tensors, then reinjects them into the task results.
(client-side flow diagram)

This approach replaces the standard tensor transfer in NVFlare, avoiding unnecessary copies and enabling more efficient handling of large model states.
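As an illustration of the remove-and-reinject idea, here is a hedged sketch of a server-side event handler; the event name comes from NVFlare's EventType, but the class, the property key, and the payload layout are assumptions, not the PR's actual component:

```python
import torch

from nvflare.apis.event_type import EventType
from nvflare.apis.fl_component import FLComponent
from nvflare.apis.fl_constant import FLContextKey
from nvflare.apis.fl_context import FLContext


class TensorStripperSketch(FLComponent):
    """Illustration only: when the task data has passed its filters, pull the
    tensors out of the payload and keep them aside so they can be streamed
    separately."""

    def __init__(self):
        super().__init__()
        self.pending_tensors = {}  # held here until streamed to the targets

    def handle_event(self, event_type: str, fl_ctx: FLContext):
        if event_type != EventType.AFTER_TASK_DATA_FILTER:
            return
        task_data = fl_ctx.get_prop(FLContextKey.TASK_DATA)  # assumed key for the task Shareable
        if not task_data:
            return
        # "params" is a placeholder for wherever the model weights live in the payload.
        params = task_data.get("params", {})
        self.pending_tensors = {k: v for k, v in params.items() if isinstance(v, torch.Tensor)}
        for name in self.pending_tensors:
            params.pop(name)
        # ...the real component now streams self.pending_tensors to the clients,
        # and the client-side peer reinjects them before the executor runs...
```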

Types of changes

  • Non-breaking change (fix or new feature that would not break existing functionality).
  • Breaking change (fix or new feature that would cause existing functionality to change).
  • New tests added to cover the changes.
  • Quick tests passed locally by running ./runtest.sh.
  • In-line docstrings updated.
  • Documentation updated.

…treaming

Introduce new components for both server and clients to enable streaming
of model tensors using safetensors serialization.

When configured, these components replace the standard tensor serialization
and transfer mechanisms in NVFlare with a streaming approach, optimizing
the handling, transfer, and reinjection of tensors during distributed training rounds.

This enhances memory efficiency and scalability for applications dealing with large model states.
@rfilgueiras rfilgueiras marked this pull request as ready for review October 2, 2025 12:24
@Copilot Copilot AI review requested due to automatic review settings October 2, 2025 12:24
Contributor

Copilot AI left a comment


Pull Request Overview

This PR introduces a new "Tensor Stream" component for NVFlare that optimizes tensor transfer between server and clients by streaming tensors separately from task payloads using safetensors serialization.

Key Changes

  • Implements streaming-based tensor transfer that replaces embedding tensors in main payload
  • Uses safetensors for efficient serialization and streaming of individual tensors
  • Provides separate components for server-side and client-side tensor streaming with automatic cleanup

Reviewed Changes

Copilot reviewed 19 out of 19 changed files in this pull request and generated 4 comments.

Summary per file:

  • nvflare/app_opt/tensor_stream/server.py - Server-side streamer that sends task data tensors to clients and receives task results
  • nvflare/app_opt/tensor_stream/client.py - Client-side streamer that receives task data from server and sends results back
  • nvflare/app_opt/tensor_stream/sender.py - Component for sending tensors using StreamableEngine with torch/numpy conversion
  • nvflare/app_opt/tensor_stream/receiver.py - Component for receiving tensors and reconstructing them into FL context
  • nvflare/app_opt/tensor_stream/producer.py - Produces stream data from torch tensors using safetensors serialization
  • nvflare/app_opt/tensor_stream/consumer.py - Consumes streamed tensor data and reconstructs tensors from safetensors
  • nvflare/app_opt/tensor_stream/utils.py - Utility functions for cleaning task data/results and managing topics/targets
  • nvflare/app_opt/tensor_stream/types.py - Type definitions and constants for tensor streaming
  • nvflare/app_opt/tensor_stream/executors.py - Example executor showing tensor stream usage
  • tests/unit_test/app_opt/tensor_stream/ - Comprehensive test suite covering all components


@rfilgueiras rfilgueiras requested a review from Copilot October 2, 2025 20:02
Contributor

Copilot AI left a comment


Pull Request Overview

Copilot reviewed 19 out of 19 changed files in this pull request and generated 8 comments.



@chesterxgchen
Collaborator

chesterxgchen commented Oct 5, 2025

@rfilgueiras one question

since the tensor is streamed separately from the task data (payload), do we have a race condition where the tensor might arrive earlier than the task data? How do we handle this?

@rfilgueiras
Author

@rfilgueiras one question

since the tensor is streamed separately from the task data (payload), do we have a race condition where the tensor might arrive earlier than the task data? How do we handle this?

This is a great question @chesterxgchen :)

I had this issue and solved it by waiting for all transfers to be done before cleaning the context and releasing the tasks:
https://github.com/rfilgueiras/NVFlare/blob/484f51eb1f4d59bdebffc6cbfcf042616878e315/nvflare/app_opt/tensor_stream/server.py#L132

Each client's task must wait not only for the transfer to that same client, but also until the transfer to all clients is done.
Only the last client will actually clean the context. It was a surprise to me that the task's context is shared among all client tasks.

I will introduce a timeout in the wait logic: the maximum time to wait for all the tensors to be transferred to all clients.

Note that in this line, after each client is done, it will increment num_task_data_sent:
https://github.com/rfilgueiras/NVFlare/blob/484f51eb1f4d59bdebffc6cbfcf042616878e315/nvflare/app_opt/tensor_stream/server.py#L130
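For readers following along, a minimal sketch of the counting idea described above (the class, the threading primitives, and the timeout are illustrative; only the num_task_data_sent name comes from the code linked here):

```python
import threading


class SendTracker:
    """Illustration of the counting logic: every per-client send increments a
    counter, and the shared task context is only cleaned once the transfer to
    all clients is done, so no client's transfer reads from a cleared context."""

    def __init__(self, num_clients: int, timeout: float = 300.0):
        self.num_clients = num_clients
        self.timeout = timeout
        self.num_task_data_sent = 0          # name borrowed from the discussion above
        self._cleaned = False
        self._lock = threading.Lock()
        self._all_sent = threading.Event()

    def mark_sent(self):
        # Called after the streaming transfer to one client has finished.
        with self._lock:
            self.num_task_data_sent += 1
            if self.num_task_data_sent >= self.num_clients:
                self._all_sent.set()

    def wait_then_clean(self, clean_fn):
        # Block until every client has been served (or the timeout expires),
        # then let only one of the waiting tasks perform the cleanup.
        if not self._all_sent.wait(timeout=self.timeout):
            return  # timed out; leave the shared context untouched
        with self._lock:
            if not self._cleaned:
                self._cleaned = True
                clean_fn()
```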

All producers share the same tensors dict and should treat it as
read-only data.
replaced the | operator in type definitions with typing.Union.
@chesterxgchen
Collaborator

chesterxgchen commented Oct 6, 2025

Each client's task must wait not only for the transfer to that same client, but also until the transfer to all clients is done.
Only the last client will actually clean the context. It was a surprise to me that the task's context is shared among all client tasks.

I don't understand this part. Each client should only need to wait for its own tensor. If the tensor is delivered, then it can process. This is independent of all other clients. Some clients can be fast or slow, depending on their network or computing resources. Waiting for all tensors from all clients seems unnecessary.

Are you afraid that the server task data is cleaned too early, before it is broadcast to the individual site?

@chesterxgchen
Collaborator

chesterxgchen commented Oct 6, 2025

Each client's task must wait not only for the transfer to that same client, but also until the transfer to all clients is done.
Only the last client will actually clean the context. It was a surprise to me that the task's context is shared among all client tasks.

I don't understand this part. Each client should only need to wait for its own tensor. If the tensor is delivered, then it can process. This is independent of all other clients. Some clients can be fast or slow, depending on their network or computing resources. Waiting for all tensors from all clients seems unnecessary.

Are you afraid that the server task data is cleaned too early, before it is broadcast to the individual site?

I think I understand what you are trying to do here. You want to make sure the data is actually sent to all clients before you clean the task data. You wait for the data to be "sent" (not necessarily delivered) to all clients, then clean the task data on the server side.

send() is a non-blocking call. This is OK.

But what I was asking is a different question. All of the above is on the server side. On the client side (Client Executor), is it possible to have a race condition where the task data and the tensor stream arrive in a different sequence?
For example, the tensor arrives first, but the task data is not here yet, so the instruction (task name) is not yet given and the executor has no idea what to do with the received tensor.

feat: add tasks parameter, default "train"

- add a parameter to define which tasks the tensor stream should be enabled for
- improved TensorServerStreamer synchronization before releasing tasks to clients
- add root_keys auto-detection: removed parameter from Client, Server, and Sender
- receiver: handle receiving multiple tensor maps (when multiple non-empty root keys are present)
- server and client: clean up received tensors after updating the task
…e-dict

fix: tensor received with root keys
@rfilgueiras
Author

Each client's task must wait not only for the transfer to that same client, but also until the transfer to all clients is done.
Only the last client will actually clean the context. It was a surprise to me that the task's context is shared among all client tasks.

I don't understand this part. Each client should only need to wait for its own tensor. If the tensor is delivered, then it can process. This is independent of all other clients. Some clients can be fast or slow, depending on their network or computing resources. Waiting for all tensors from all clients seems unnecessary.

Are you afraid that the server task data is cleaned too early, before it is broadcast to the individual site?

Yes. I am reading the tensors from the fl_ctx without copying them. If I clean the context before they finish, it impacts the clients that are still processing the tensors.

This was strange to me at first, since they might have different filters applied, etc.
Can you double-check whether this is how it works, or is there something I can do to avoid the waiting?

@rfilgueiras
Author

rfilgueiras commented Oct 6, 2025

Each client's task must wait not only for the transfer to that same client, but also until the transfer to all clients is done.
Only the last client will actually clean the context. It was a surprise to me that the task's context is shared among all client tasks.

I don't understand this part. Each client should only need to wait for its own tensor. If the tensor is delivered, then it can process. This is independent of all other clients. Some clients can be fast or slow, depending on their network or computing resources. Waiting for all tensors from all clients seems unnecessary.
Are you afraid that the server task data is cleaned too early, before it is broadcast to the individual site?

I think I understand what you are trying to do here. You want to make sure the data is actually sent to all clients before you clean the task data. You wait for the data to be "sent" (not necessarily delivered) to all clients, then clean the task data on the server side.

send() is a non-blocking call. This is OK.

In this case, the send is synchronous and blocks each thread (one per client) on the server side until all data is produced by the server and consumed by the client.

I updated the logic to be clearer about what is happening. The send operation is triggered by the AFTER_TASK_DATA_FILTER event:
https://github.com/rfilgueiras/NVFlare/blob/55bbde8c73496833400d2df92091375efc9e3308/nvflare/app_opt/tensor_stream/server.py#L115

But what I was asking is a different question. All of the above is on the server side. On the client side (Client Executor), is it possible to have a race condition where the task data and the tensor stream arrive in a different sequence? For example, the tensor arrives first, but the task data is not here yet, so the instruction (task name) is not yet given and the executor has no idea what to do with the received tensor.

This is fine: the server sends the tensors first, so they arrive before the task data.
They will be stored in the receiver on the client side:
https://github.com/rfilgueiras/NVFlare/blob/55bbde8c73496833400d2df92091375efc9e3308/nvflare/app_opt/tensor_stream/receiver.py#L70

When the task data arrives, the client streamer handles the BEFORE_TASK_DATA_FILTER event:
https://github.com/rfilgueiras/NVFlare/blob/55bbde8c73496833400d2df92091375efc9e3308/nvflare/app_opt/tensor_stream/client.py#L98

It will ensure the task data is updated with the received tensors.
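To make the client-side handling concrete, here is a hedged sketch of the caching idea: streamed tensors are stored per task, and the task-data handler reinjects them (later commits add a wait_for_tensors step for the case where the task data shows up first; the class and helper names here are illustrative):

```python
import threading


class ReceivedTensorCache:
    """Illustration: streamed tensors are stored here keyed by task id; when the
    task data arrives, the client streamer looks up the matching entry and
    reinjects it into the task payload before the executor sees the task."""

    def __init__(self):
        self._tensors = {}                 # task_id -> dict of received tensors
        self._cond = threading.Condition()

    def store(self, task_id: str, tensors: dict):
        # Called by the consumer when a streamed tensor map has been fully received.
        with self._cond:
            self._tensors[task_id] = tensors
            self._cond.notify_all()

    def wait_for_tensors(self, task_id: str, timeout: float = 60.0) -> dict:
        # Called when the task data arrives; returns immediately if the tensors
        # got here first, otherwise blocks until they do (or the timeout expires).
        with self._cond:
            self._cond.wait_for(lambda: task_id in self._tensors, timeout=timeout)
            return self._tensors.pop(task_id, {})


# Rough usage on BEFORE_TASK_DATA_FILTER:
#   tensors = cache.wait_for_tensors(task_id)
#   task_data.update(tensors)   # reinject before the executor runs
```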

Collaborator

@yanchengnv yanchengnv left a comment


This is a very clever solution to use event handlers for tensor streaming. I wonder whether it could be generalized for other types of streaming.

With this solution, the streaming is between the server and the client. This works well only if the client is the one that does the training. But this is not always the case. For example, in the case of Client API based applications, the client does not do the training; it is the external app that actually does the training.

A better solution (for the future) might be to use the Object Downloader. Instead of pushing the tensors to the client, the server would wait for the trainer to pull the tensors from it. The trainer could be the client or the external app.

@rfilgueiras
Author

rfilgueiras commented Oct 8, 2025

This is a very clever solution to use event handlers for tensor streaming. I wonder whether it could be generalized for other types of streaming.

Yes, the main advantage of using event handlers is that neither the controller nor the executors need to be modified.

What other types of streaming are you thinking of?
This approach is quite flexible: we only need different producers and consumers that know how to stream the data.

With this solution, the streaming is between the server and the client. This works well only if the client is the one that does the training. But this is not always the case. For example, in the case of Client API based applications, the client does not do the training; it is the external app that actually does the training.

Yes, the current solution is not compatible with the ClientAPI for many reasons, even when it is running in-memory.

A better solution (for the future) might be to use the Object Downloader. Instead of pushing the tensors to the client, the server would wait for the trainer to pull the tensors from it. The trainer could be the client or the external app.

This would work in the opposite way from what I do now.

From Server -> Client (Task Data):

  • Server registers a listener passing the Producer factory
  • Client requests the task data from the Server, passing the Consumer factory

From Client -> Server (Task Result):

  • Client registers a listener passing the Producer factory
  • Server requests the task result from the ClientAPI, passing the Consumer factory

I would love to discuss how to improve these interfaces to facilitate such patterns:

  • ClientAPI (in and ex process)
  • Client Engine
  • Server Engine

One possibility (sketched after this list) would be:

  • Task Data and Task Result have a proxy data object with metadata (replacing the original data)
  • Peer would know (based on headers) that it should be dealing with a streaming transfer
  • Using the provided metadata, it would download the data (by consuming the stream)
  • Data would be added to the Task Data and Result by normal processing
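A rough sketch of that proxy possibility, with entirely hypothetical names (nothing below is an existing NVFlare API):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class TensorStreamRef:
    """Hypothetical proxy object that would replace the tensors inside the task
    data/result. The peer detects it (e.g. via a header) and pulls the real
    tensors by consuming the referenced stream before normal processing."""

    stream_channel: str                              # where to pull from (placeholder)
    stream_topic: str                                # which stream to consume (placeholder)
    tensor_names: List[str] = field(default_factory=list)


def replace_with_proxy(params: Dict[str, Any]) -> Tuple[Dict[str, Any], TensorStreamRef]:
    """Split params into the non-tensor values plus a proxy describing the
    tensors the peer should download."""
    tensors = {k: v for k, v in params.items() if hasattr(v, "dtype")}
    rest = {k: v for k, v in params.items() if k not in tensors}
    ref = TensorStreamRef(
        stream_channel="tensor_stream",              # placeholder values
        stream_topic="task_data",
        tensor_names=list(tensors),
    )
    return rest, ref
```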

@rfilgueiras
Author

rfilgueiras commented Oct 9, 2025

I understand that using the ObjDownloader would allow it to support the ClientAPI.
The main reason I did not consider it was to maintain compatibility with NVFlare 2.6.

If the task data or task results cannot be found in the FLContext,
the send operation will not be performed.

In this case, a warning will be logged but the task will not be aborted.
It will fall back to the default serialization and transmission process.

On the receiver side, the same applies.
Improve performance while keeping memory usage low.
By leveraging generators, data flow is faster.

While doing I/O, the CPU-bound code can prepare the next batch.
Combined with chunking, this reduces CPU usage and latency.

When testing with 5000 tensors (2.2 GB), it reduced transfer
times from 70s to 30s.
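A minimal sketch of the generator-plus-chunking pattern this commit describes (the chunk size and function name are illustrative, not the actual Producer code):

```python
from typing import Dict, Iterator

import torch
from safetensors.torch import save


def chunked_tensor_stream(tensors: Dict[str, torch.Tensor],
                          chunk_size: int = 1024 * 1024) -> Iterator[bytes]:
    """Serialize one tensor at a time and yield fixed-size chunks.
    Because this is a generator, the next chunk can be prepared while the
    previous one is in flight, overlapping CPU-bound work with I/O, and at
    most one serialized tensor is held in memory at a time."""
    for name, tensor in tensors.items():
        blob = save({name: tensor})        # safetensors bytes for this tensor only
        for start in range(0, len(blob), chunk_size):
            yield blob[start:start + chunk_size]


# Rough usage on the sending side (stream.send is a placeholder for the engine call):
#   for chunk in chunked_tensor_stream(model_state):
#       stream.send(chunk)
```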
- avoid possible memory leaks
- reduce duplication when converting to and from numpy
…ation-when-handling-numpy-conversions

fix: ensure memory is released as soon as possible
fix: use TASK_RESULT_RECEIVED instead of BEFORE_TASK_RESULT_FILTER.
     It ensures that for each task we will add the tensors back into the
     context.
Keep track of seen tasks and ensure received tensors are cleaned up
when the round is over.
…-of-peer-name

feat: use task id to store and retrieve received tensors
Added a new method to ensure the tensors have arrived before trying to set
them back into the task data/result.
When setting enable_request_task_data_tensors=True, the client will send
a federated event to the server requesting the tensors to be sent.

It means that the server can first send the task data without tensors
and wait for the client to request the tensors to be sent.

It should allow supporting the ClientAPI in the future.
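A hedged sketch of this opt-in request flow; the enable_request_task_data_tensors parameter comes from the commit note, while the event topic, the property key, and the wiring below are assumptions:

```python
from nvflare.apis.event_type import EventType
from nvflare.apis.fl_component import FLComponent
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable

REQUEST_TENSORS_TOPIC = "tensor_stream.request_task_data_tensors"  # hypothetical event type


class TensorRequesterSketch(FLComponent):
    """Illustration of enable_request_task_data_tensors=True: once the
    (tensor-less) task data has arrived, the client fires a federated event
    asking the server to start streaming the tensors for this task."""

    def __init__(self, enable_request_task_data_tensors: bool = False):
        super().__init__()
        self.enable_request = enable_request_task_data_tensors

    def handle_event(self, event_type: str, fl_ctx: FLContext):
        if not self.enable_request:
            return
        if event_type == EventType.BEFORE_TASK_DATA_FILTER:
            request = Shareable()
            request["task_id"] = fl_ctx.get_prop("task_id")  # placeholder property key
            # fire_fed_event forwards the event to the peer (the server) rather than firing it locally
            self.fire_fed_event(REQUEST_TENSORS_TOPIC, request, fl_ctx)
```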
Sender:

Added a new method to parse and store the tensors only.
This should be done before calling send.
It is common to have model params with mixed data, not only tensors.
This is now handled, ensuring other data will be preserved.

A new approach to recursive dicts was implemented, ensuring that
all tensors will be serialized with one producer and one consumer.
- Client: add support for requesting tensors after the task data has arrived
- Server: update event handling to use `Sender.store_tensors` and `Receiver.wait_for_tensors`
- Sender: decouple storing the model params reference (store_tensors method) from sending the tensors
- Receiver: improve logic to wait for tensors, since the task data or result can arrive before the tensors are received
- Producer: use the new tensor chunking logic for model params and update the data model to pass the location of tensors when they are part of nested dicts
- Consumer: add logic to reconstruct the original params using the received tensors and the new parent_keys field
- Utils: add new functions to handle tensor chunking, update params, and extract non-tensor values from params
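A minimal sketch of the mixed/nested params handling described in these notes; parent_keys mirrors the field name mentioned above, but the helper functions themselves are illustrative:

```python
from typing import Any, Dict, Tuple

import torch


def split_params(params: Dict[str, Any], parent_keys: Tuple[str, ...] = ()):
    """Walk a (possibly nested) params dict and separate tensors from everything else.
    Each tensor is returned with the key path (parent_keys) needed to put it back."""
    tensors: Dict[Tuple[str, ...], torch.Tensor] = {}
    rest: Dict[str, Any] = {}
    for key, value in params.items():
        path = parent_keys + (key,)
        if isinstance(value, torch.Tensor):
            tensors[path] = value
        elif isinstance(value, dict):
            sub_tensors, sub_rest = split_params(value, path)
            tensors.update(sub_tensors)
            rest[key] = sub_rest
        else:
            rest[key] = value          # non-tensor values are preserved as-is
    return tensors, rest


def reinject(rest: Dict[str, Any], tensors: Dict[Tuple[str, ...], torch.Tensor]) -> Dict[str, Any]:
    """Reverse of split_params: place each received tensor back at its key path."""
    for path, tensor in tensors.items():
        node = rest
        for key in path[:-1]:
            node = node.setdefault(key, {})
        node[path[-1]] = tensor
    return rest
```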
…data-from-context

fix: clean task data and task result data should