[Proto] Data, Model and Pipeline Parallelism example #140

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
142 changes: 142 additions & 0 deletions examples/parallel.py
@@ -0,0 +1,142 @@
from __future__ import annotations

import timeit
from functools import reduce
from operator import add

import numpy as np
import torch
from torch.nn.functional import mse_loss

import pyqtorch as pyq
from pyqtorch.parametric import Parametric

N_DEVICES = 2
N_QUBITS = 2
N_POINTS = 100
N_EPOCHS = 1


def fn(x: torch.Tensor, degree: int) -> torch.Tensor:
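    # Toy regression target: a scaled, truncated sum of sines and cosines of increasing frequency.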
    return 0.05 * reduce(add, (torch.cos(i * x) + torch.sin(i * x) for i in range(degree)), 0)


x = torch.linspace(0, 10, N_POINTS)
y = fn(x, 5)

assert torch.cuda.is_available()
assert torch.cuda.device_count() == N_DEVICES


def init_params(circ: pyq.QuantumCircuit, device: torch.device) -> torch.nn.ParameterDict:
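    # One trainable parameter per Parametric gate in the circuit, created on the given device.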
    return torch.nn.ParameterDict(
        {
            op.param_name: torch.rand(1, requires_grad=True, device=device)
            for op in circ.operations
            if isinstance(op, Parametric)
        }
    )


def hea(n_qubits: int, n_layers: int, param_name: str) -> list:
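    # Hardware-efficient ansatz: per layer, RX/RY/RX rotations on every qubit, then a ring of CNOTs.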
    ops = []
    for layer in range(n_layers):
        ops += [pyq.RX(i, f"{param_name}_0_{layer}_{i}") for i in range(n_qubits)]
        ops += [pyq.RY(i, f"{param_name}_1_{layer}_{i}") for i in range(n_qubits)]
        ops += [pyq.RX(i, f"{param_name}_2_{layer}_{i}") for i in range(n_qubits)]
        ops += [pyq.CNOT(i, (i + 1) % n_qubits) for i in range(n_qubits)]
    return ops


class SingleDeviceCircuit(torch.nn.Module):
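    # Baseline: feature map, both variational blocks and the observable all live on a single
    # GPU (cuda:1), so the forward pass needs no inter-device transfers.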
    def __init__(self, n_qubits: int = N_QUBITS):
        super().__init__()
        self.feature_map = pyq.QuantumCircuit(
            n_qubits, [pyq.RX(i, "x") for i in range(n_qubits)]
        ).to("cuda:1")
        self.c0 = pyq.QuantumCircuit(n_qubits, hea(n_qubits, 1, "theta")).to("cuda:1")
        self.c1 = pyq.QuantumCircuit(n_qubits, hea(n_qubits, 1, "phi")).to("cuda:1")
        self.params_c0 = init_params(self.c0, device="cuda:1")
        self.params_c1 = init_params(self.c1, device="cuda:1")
        self.observable = pyq.Z(0).to("cuda:1")

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        state = pyq.zero_state(N_QUBITS).to("cuda:1")
        state = self.feature_map.forward(state, {"x": x.to("cuda:1")})
        state = self.c0.forward(state, self.params_c0)
        state = self.c1.forward(state, self.params_c1)
        projected = self.observable.forward(state)
        return pyq.inner_prod(state, projected).real


class ModelParallelCircuit(torch.nn.Module):
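    # Model parallelism: the feature map and c0 live on cuda:0, c1 and the observable on
    # cuda:1; the state crosses devices exactly once per forward pass.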
    def __init__(self, n_qubits: int = N_QUBITS):
        super().__init__()
        self.feature_map = pyq.QuantumCircuit(
            n_qubits, [pyq.RX(i, "x") for i in range(n_qubits)]
        ).to("cuda:0")
        self.c0 = pyq.QuantumCircuit(n_qubits, hea(n_qubits, 1, "theta")).to("cuda:0")
        self.c1 = pyq.QuantumCircuit(n_qubits, hea(n_qubits, 1, "phi")).to("cuda:1")
        self.params_c0 = init_params(self.c0, device="cuda:0")
        self.params_c1 = init_params(self.c1, device="cuda:1")
        self.observable = pyq.Z(0).to("cuda:1")

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        state = pyq.zero_state(N_QUBITS).to("cuda:0")
        state = self.feature_map.forward(state, {"x": x.to("cuda:0")})
        state = self.c0.forward(state, self.params_c0)
        state = self.c1.forward(state.to("cuda:1"), self.params_c1)
        projected = self.observable.forward(state)
        return pyq.inner_prod(state, projected).real


def train(circ) -> None:
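    # Minimal regression loop; the labels are moved to cuda:1, where every model places its output.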
    optimizer = torch.optim.Adam(
        {**circ.params_c0, **circ.params_c1}.values(), lr=0.01, foreach=False
    )
    for epoch in range(N_EPOCHS):
        optimizer.zero_grad()
        y_pred = circ.forward(x)
        loss = mse_loss(y_pred, y.to("cuda:1"))
        loss.backward()
        optimizer.step()


class PipelineParallelCircuit(ModelParallelCircuit):
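    # Pipeline parallelism: the batch is split into micro-batches so cuda:0 can run the
    # feature map and c0 on the next micro-batch while cuda:1 runs c1 on the previous one,
    # mirroring the micro-batching scheme from PyTorch's model-parallel tutorial.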
    def __init__(self, n_qubits: int = N_QUBITS, split_size: int = N_POINTS // 2):
        # n_qubits comes first so that model_cls(N_QUBITS) in the benchmark below sets the
        # qubit count rather than silently overriding the micro-batch size.
        super().__init__(n_qubits)
        self.split_size = split_size

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        init_state = pyq.zero_state(N_QUBITS).to("cuda:0")
        splits = iter(x.split(self.split_size, dim=0))
        s_next = next(splits)
        s_prev = self.feature_map(init_state, {"x": s_next.to("cuda:0")})
        s_prev = self.c0.forward(s_prev, self.params_c0).to("cuda:1")
        ret = []

        for s_next in splits:
            s_prev = self.c1.forward(s_prev, self.params_c1)
            ret.append(pyq.inner_prod(s_prev, self.observable.forward(s_prev)).real)

            s_prev = self.feature_map(init_state, {"x": s_next.to("cuda:0")})
            s_prev = self.c0.forward(s_prev, self.params_c0).to("cuda:1")

        s_prev = self.c1.forward(s_prev, self.params_c1)
        ret.append(pyq.inner_prod(s_prev, self.observable.forward(s_prev)).real)
        return torch.cat(ret)


if __name__ == "__main__":
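    # Benchmark: time one training epoch per model class, 10 repeats each, and report mean/std runtimes.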
    res = {"n_qubits": N_QUBITS}
    for model_cls in [SingleDeviceCircuit, ModelParallelCircuit, PipelineParallelCircuit]:
        try:
            setup = "circ = model_cls(N_QUBITS)"
            pp_run_times = timeit.repeat(
                "train(circ)", setup, number=1, repeat=10, globals=globals()
            )
            pp_mean, pp_std = np.mean(pp_run_times), np.std(pp_run_times)
            res[model_cls.__name__] = f"mean_runtime: {pp_mean}, std_runtime: {pp_std}"
        except Exception as e:
            res[model_cls.__name__] = f"failed, reason: {e}"
    print(res)
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -16,7 +16,7 @@ authors = [
]
requires-python = ">=3.8,<3.13"
license = {text = "Apache 2.0"}
-version = "1.0.6"
+version = "1.0.7"
classifiers=[
    "License :: OSI Approved :: Apache Software License",
    "Programming Language :: Python",
78 changes: 78 additions & 0 deletions pyqtorch/circuit.py
@@ -3,6 +3,7 @@
from logging import getLogger
from typing import Any, Iterator

import torch
from torch import Tensor
from torch import device as torch_device
from torch.nn import Module, ModuleList
@@ -106,3 +107,80 @@ def expectation(
        return AdjointExpectation.apply(circuit, observable, state, values.keys(), *values.values())
    else:
        raise ValueError(f"Requested diff_mode '{diff_mode}' not supported.")


class PipedCircuit(QuantumCircuit):
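    # A QuantumCircuit pinned to one CUDA device; run() first moves the incoming state and
    # parameter values onto that device.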
    def __init__(self, n_qubits: int, operations: list[Module], dev_idx: int):
        super().__init__(n_qubits, operations)
        # Module.to() moves parameters and buffers in place; rebinding self is unnecessary.
        self.to(torch_device(f"cuda:{dev_idx}"))

    def run(self, state: State = None, values: dict[str, Tensor] = {}) -> State:
        if state is None:
            state = self.init_state()
        else:
            state = state.to(self.device)
        values = {k: v.to(self.device) for k, v in values.items()}
        for op in self.operations:
            state = op(state, values)
        return state


class ModelParallelCircuit(QuantumCircuit):
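    # Distributes the sub-circuits of a circuit-of-circuits across several GPUs by wrapping
    # each one in a PipedCircuit, which handles the inter-device transfers.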
    def __init__(self, circ: QuantumCircuit, n_devices: int):
        if not all(isinstance(subc, QuantumCircuit) for subc in circ.operations):
            msg = "Make sure the passed QuantumCircuit only contains other QuantumCircuits."
            logger.error(msg)
            raise ValueError(msg)
        if not torch.cuda.is_available():
            msg = f"{self.__class__.__name__} requires CUDA-capable GPU devices."
            logger.error(msg)
            raise ValueError(msg)
        dev_count = torch.cuda.device_count()
        if dev_count < n_devices:
            msg = f"Requested {n_devices} GPU devices but only {dev_count} are available."
            logger.error(msg)
            raise ValueError(msg)
        n_circs = len(circ.operations)
        # Assign each sub-circuit to a device, spreading any remainder evenly; the previous
        # floor-division scheme silently dropped circuits when n_circs % n_devices != 0.
        dev_indices = [i * n_devices // n_circs for i in range(n_circs)]
        operations = [
            PipedCircuit(c.n_qubits, c.operations, dev_idx)
            for c, dev_idx in zip(circ.operations, dev_indices)
        ]
        super().__init__(circ.n_qubits, operations)
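
    # A minimal usage sketch (hypothetical, assuming two visible GPUs and a `hea` helper
    # that builds a gate list as in examples/parallel.py):
    #
    #   blocks = [QuantumCircuit(2, hea(2, 1, "theta")), QuantumCircuit(2, hea(2, 1, "phi"))]
    #   circ = ModelParallelCircuit(QuantumCircuit(2, blocks), n_devices=2)
    #   state = circ.run(values=params)  # params: dict[str, Tensor] of rotation angles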

