Commit 49a269f

build DAEFR project
0 parents  commit 49a269f


63 files changed: 12371 additions, 0 deletions

.gitignore

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+__pycache__
+experiments/
+data/

DAEFR/.DS_Store

12 KB
Binary file not shown.

DAEFR/distributed/__init__.py

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+from .distributed import (
+    get_rank,
+    get_local_rank,
+    is_primary,
+    synchronize,
+    get_world_size,
+    all_reduce,
+    all_gather,
+    reduce_dict,
+    data_sampler,
+    LOCAL_PROCESS_GROUP,
+)
+from .launch import launch
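
For orientation, here is a minimal sketch of how a training script might use the names re-exported above. It assumes the repository root is on the Python path so that DAEFR.distributed is importable; the helper names in the sketch (log_status, checkpoint_and_wait, save_fn) are illustrative and not part of the commit.

# Hypothetical usage of the re-exported helpers; runs as-is in a single process,
# because the helpers fall back to rank 0 / world size 1 when torch.distributed
# is not initialized.
from DAEFR.distributed import get_rank, get_world_size, is_primary, synchronize


def log_status(message):
    # Only the primary (rank-0) process prints; other ranks stay silent.
    if is_primary():
        print(f"[rank {get_rank()}/{get_world_size()}] {message}")


def checkpoint_and_wait(save_fn):
    # Rank 0 writes the checkpoint, then all ranks wait at the barrier so no
    # process reads a half-written file.
    if is_primary():
        save_fn()
    synchronize()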

DAEFR/distributed/distributed.py

Lines changed: 143 additions & 0 deletions
@@ -0,0 +1,143 @@
+import math
+import pickle
+
+import torch
+from torch import distributed as dist
+from torch.utils import data
+
+
+LOCAL_PROCESS_GROUP = None
+
+
+def is_primary():
+    return get_rank() == 0
+
+
+def get_rank():
+    if not dist.is_available():
+        return 0
+
+    if not dist.is_initialized():
+        return 0
+
+    return dist.get_rank()
+
+
+def get_local_rank():
+    if not dist.is_available():
+        return 0
+
+    if not dist.is_initialized():
+        return 0
+
+    if LOCAL_PROCESS_GROUP is None:
+        raise ValueError("tensorfn.distributed.LOCAL_PROCESS_GROUP is None")
+
+    return dist.get_rank(group=LOCAL_PROCESS_GROUP)
+
+
+def synchronize():
+    if not dist.is_available():
+        return
+
+    if not dist.is_initialized():
+        return
+
+    world_size = dist.get_world_size()
+
+    if world_size == 1:
+        return
+
+    dist.barrier()
+
+
+def get_world_size():
+    if not dist.is_available():
+        return 1
+
+    if not dist.is_initialized():
+        return 1
+
+    return dist.get_world_size()
+
+
+def all_reduce(tensor, op=dist.ReduceOp.SUM):
+    world_size = get_world_size()
+
+    if world_size == 1:
+        return tensor
+
+    dist.all_reduce(tensor, op=op)
+
+    return tensor
+
+
+def all_gather(data):
+    world_size = get_world_size()
+
+    if world_size == 1:
+        return [data]
+
+    buffer = pickle.dumps(data)
+    storage = torch.ByteStorage.from_buffer(buffer)
+    tensor = torch.ByteTensor(storage).to("cuda")
+
+    local_size = torch.IntTensor([tensor.numel()]).to("cuda")
+    size_list = [torch.IntTensor([1]).to("cuda") for _ in range(world_size)]
+    dist.all_gather(size_list, local_size)
+    size_list = [int(size.item()) for size in size_list]
+    max_size = max(size_list)
+
+    tensor_list = []
+    for _ in size_list:
+        tensor_list.append(torch.ByteTensor(size=(max_size,)).to("cuda"))
+
+    if local_size != max_size:
+        padding = torch.ByteTensor(size=(max_size - local_size,)).to("cuda")
+        tensor = torch.cat((tensor, padding), 0)
+
+    dist.all_gather(tensor_list, tensor)
+
+    data_list = []
+
+    for size, tensor in zip(size_list, tensor_list):
+        buffer = tensor.cpu().numpy().tobytes()[:size]
+        data_list.append(pickle.loads(buffer))
+
+    return data_list
+
+
+def reduce_dict(input_dict, average=True):
+    world_size = get_world_size()
+
+    if world_size < 2:
+        return input_dict
+
+    with torch.no_grad():
+        keys = []
+        values = []
+
+        for k in sorted(input_dict.keys()):
+            keys.append(k)
+            values.append(input_dict[k])
+
+        values = torch.stack(values, 0)
+        dist.reduce(values, dst=0)
+
+        if dist.get_rank() == 0 and average:
+            values /= world_size
+
+        reduced_dict = {k: v for k, v in zip(keys, values)}
+
+    return reduced_dict
+
+
+def data_sampler(dataset, shuffle, distributed):
+    if distributed:
+        return data.distributed.DistributedSampler(dataset, shuffle=shuffle)
+
+    if shuffle:
+        return data.RandomSampler(dataset)
+
+    else:
+        return data.SequentialSampler(dataset)
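
To make the intended call pattern concrete, here is a hedged usage sketch of data_sampler, reduce_dict, and all_gather. The dataset, batch size, and loss names are placeholders, and the import assumes the repository root is on the Python path. With a world size of 1 the helpers short-circuit, so the sketch runs in a single CPU process; the multi-rank code paths additionally require CUDA and an initialized process group.

# Hypothetical usage sketch of the helpers in distributed.py (placeholder data).
import torch
from torch.utils import data

from DAEFR.distributed import all_gather, data_sampler, get_world_size, is_primary, reduce_dict

dataset = data.TensorDataset(torch.randn(32, 4))
sampler = data_sampler(dataset, shuffle=True, distributed=get_world_size() > 1)
loader = data.DataLoader(dataset, batch_size=8, sampler=sampler)

for (batch,) in loader:
    losses = {"l1": batch.abs().mean(), "mse": batch.pow(2).mean()}
    # Average per-rank loss values onto rank 0 for logging (no-op when world size is 1).
    reduced = reduce_dict(losses, average=True)
    if is_primary():
        print({k: float(v) for k, v in reduced.items()})

# Gather arbitrary picklable objects from every rank; returns [obj] when world size is 1.
stats = all_gather({"n_samples": len(dataset)})
print(stats)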

DAEFR/distributed/launch.py

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+import os
+
+import torch
+from torch import distributed as dist
+from torch import multiprocessing as mp
+
+from . import distributed as dist_fn
+
+
+def find_free_port():
+    import socket
+
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+    sock.bind(("", 0))
+    port = sock.getsockname()[1]
+    sock.close()
+
+    return port
+
+
+def launch(fn, n_gpu_per_machine, n_machine=1, machine_rank=0, dist_url=None, args=()):
+    world_size = n_machine * n_gpu_per_machine
+
+    if world_size > 1:
+        if "OMP_NUM_THREADS" not in os.environ:
+            os.environ["OMP_NUM_THREADS"] = "1"
+
+        if dist_url == "auto":
+            if n_machine != 1:
+                raise ValueError('dist_url="auto" not supported in multi-machine jobs')
+
+            port = find_free_port()
+            dist_url = f"tcp://127.0.0.1:{port}"
+
+        if n_machine > 1 and dist_url.startswith("file://"):
+            raise ValueError(
+                "file:// is not a reliable init method in multi-machine jobs. Prefer tcp://"
+            )
+
+        mp.spawn(
+            distributed_worker,
+            nprocs=n_gpu_per_machine,
+            args=(fn, world_size, n_gpu_per_machine, machine_rank, dist_url, args),
+            daemon=False,
+        )
+
+    else:
+        fn(*args)
+
+
+def distributed_worker(
+    local_rank, fn, world_size, n_gpu_per_machine, machine_rank, dist_url, args
+):
+    if not torch.cuda.is_available():
+        raise OSError("CUDA is not available. Please check your environment")
+
+    global_rank = machine_rank * n_gpu_per_machine + local_rank
+
+    try:
+        dist.init_process_group(
+            backend="NCCL",
+            init_method=dist_url,
+            world_size=world_size,
+            rank=global_rank,
+        )
+
+    except Exception:
+        raise OSError("failed to initialize NCCL groups")
+
+    dist_fn.synchronize()
+
+    if n_gpu_per_machine > torch.cuda.device_count():
+        raise ValueError(
+            f"specified n_gpu_per_machine larger than available device ({torch.cuda.device_count()})"
+        )
+
+    torch.cuda.set_device(local_rank)
+
+    if dist_fn.LOCAL_PROCESS_GROUP is not None:
+        raise ValueError("distributed.LOCAL_PROCESS_GROUP is not None")
+
+    n_machine = world_size // n_gpu_per_machine
+
+    for i in range(n_machine):
+        ranks_on_i = list(range(i * n_gpu_per_machine, (i + 1) * n_gpu_per_machine))
+        pg = dist.new_group(ranks_on_i)
+
+        if i == machine_rank:
+            dist_fn.LOCAL_PROCESS_GROUP = pg
+
+    fn(*args)
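
As a usage sketch for the launcher above: the train function, GPU count, and config path below are placeholders, and the multi-process branch assumes a machine with that many CUDA devices and NCCL available. When world_size is 1, launch simply calls fn(*args) in-process, so the same entry point also works for single-GPU debugging.

# Hypothetical entry point driving launch(); "train" and its argument are placeholders.
from DAEFR.distributed import get_local_rank, get_rank, launch


def train(config_path):
    # Each spawned worker lands here after distributed_worker has initialized the
    # process group and called torch.cuda.set_device(local_rank).
    print(f"rank {get_rank()} (local {get_local_rank()}) training with {config_path}")


if __name__ == "__main__":
    launch(
        train,
        n_gpu_per_machine=2,      # assumption: one machine with two GPUs
        n_machine=1,
        machine_rank=0,
        dist_url="auto",          # binds a free local TCP port via find_free_port()
        args=("config.yaml",),    # placeholder config path
    )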

DAEFR/models/.DS_Store

6 KB
Binary file not shown.
