Support O_DIRECT dataloader for pytorch #250

Open · wants to merge 1 commit into main
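For anyone trying this out: below is a minimal workload sketch showing how the new loader would be selected, assuming DLIO's usual Hydra-style workload YAML. The batch and thread values are illustrative only; the only new value this PR introduces is pytorch-direct.

# hypothetical workload snippet (values are illustrative)
dataset:
  format: npy                     # npy and npz both gain an O_DIRECT path in this PR
reader:
  data_loader: pytorch-direct     # maps to DataLoaderType.PYTORCH_DIRECT
  batch_size: 4
  read_threads: 4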
1 change: 1 addition & 0 deletions dlio_benchmark/common/enumerations.py
@@ -158,6 +158,7 @@ class DataLoaderType(Enum):
"""
TENSORFLOW='tensorflow'
PYTORCH='pytorch'
PYTORCH_DIRECT='pytorch-direct'
DALI='dali'
NATIVE_DALI='native_dali'
CUSTOM='custom'
2 changes: 1 addition & 1 deletion dlio_benchmark/data_loader/data_loader_factory.py
@@ -37,7 +37,7 @@ def get_loader(type, format_type, dataset_type, epoch):
if DLIOMPI.get_instance().rank() == 0:
logging.info(f"{utcnow()} Running DLIO with custom data loader class {_args.data_loader_class.__name__}")
return _args.data_loader_class(format_type, dataset_type, epoch)
elif type == DataLoaderType.PYTORCH:
elif type == DataLoaderType.PYTORCH or type == DataLoaderType.PYTORCH_DIRECT:
from dlio_benchmark.data_loader.torch_data_loader import TorchDataLoader
return TorchDataLoader(format_type, dataset_type, epoch)
elif type == DataLoaderType.TENSORFLOW:
1 change: 1 addition & 0 deletions dlio_benchmark/data_loader/torch_data_loader.py
@@ -66,6 +66,7 @@ def worker_init(self, worker_id):
dataset_type=self.dataset_type,
thread_index=worker_id,
epoch_number=self.epoch_number)
logging.debug(f"{utcnow()} worker {worker_id} initialized with reader {type(self.reader).__name__}")

def __del__(self):
if self.dlp_logger:
180 changes: 180 additions & 0 deletions dlio_benchmark/reader/odirect_reader.py
@@ -0,0 +1,180 @@
"""
Copyright (c) 2024, UChicago Argonne, LLC
All Rights Reserved

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import ast
import ctypes
import os
import struct
import zlib

import numpy as np

from dlio_benchmark.common.constants import MODULE_DATA_READER
from dlio_benchmark.reader.reader_handler import FormatReader
from dlio_benchmark.utils.utility import Profile

dlp = Profile(MODULE_DATA_READER)


class ODirectReader(FormatReader):
"""
ODirectReader - Generic reader for files using O_DIRECT
"""

@dlp.log_init
def __init__(self, dataset_type, thread_index, epoch, parser=None, alignment=4096):
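# The 4096-byte default alignment matches the logical block size of most
# Linux block devices; O_DIRECT transfers must be block-aligned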
super().__init__(dataset_type, thread_index)
self.parser = parser
self.alignment = alignment

@dlp.log
def open(self, filename):
super().open(filename)
data = self.odirect_read(filename)
if self.parser:
data = self.parser(data)
return data

def odirect_read(self, filepath):
# Open the file with O_DIRECT before the try block, so that a failed
# open cannot reach os.close() with an undefined fd
fd = os.open(filepath, os.O_RDONLY | os.O_DIRECT)
try:

# Get the file size
file_size = os.path.getsize(filepath)

# Calculate the buffer size, aligned to the given alignment
buffer_size = ((file_size + self.alignment - 1) // self.alignment) * self.alignment

# Allocate the aligned buffer
buf = self.allocate_aligned_buffer(buffer_size)
mem_view = memoryview(buf)

# Read the file into the buffer
bytes_read = os.readv(fd, [mem_view[0:buffer_size]])
if bytes_read != file_size:
raise IOError(f"Could not read the entire file. Expected {file_size} bytes, got {bytes_read} bytes")
return mem_view
finally:
os.close(fd)

def allocate_aligned_buffer(self, size):
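# O_DIRECT requires the buffer address and length to be block-aligned;
# over-allocating by (alignment - 1) bytes guarantees that an aligned
# start address exists somewhere inside the raw buffer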
buf_size = size + (self.alignment - 1)
raw_memory = bytearray(buf_size)
ctypes_raw_type = (ctypes.c_char * buf_size)
ctypes_raw_memory = ctypes_raw_type.from_buffer(raw_memory)
raw_address = ctypes.addressof(ctypes_raw_memory)
offset = raw_address % self.alignment
offset_to_aligned = (self.alignment - offset) % self.alignment
ctypes_aligned_type = (ctypes.c_char * (buf_size - offset_to_aligned))
ctypes_aligned_memory = ctypes_aligned_type.from_buffer(raw_memory, offset_to_aligned)
return ctypes_aligned_memory

@dlp.log
def close(self, filename):
super().close(filename)

@dlp.log
def get_sample(self, filename, sample_index):
super().get_sample(filename, sample_index)
image = self.open_file_map[filename][..., sample_index]
dlp.update(image_size=image.nbytes)

def next(self):
for batch in super().next():
yield batch

@dlp.log
def read_index(self, image_idx, step):
return super().read_index(image_idx, step)

@dlp.log
def finalize(self):
return super().finalize()

def is_index_based(self):
return True

def is_iterator_based(self):
return True

# The parsers below operate directly on the in-memory buffer, avoiding extra copies where possible
def parse_npy(mem_view):
# Verify the magic string
if mem_view[:6].tobytes() != b'\x93NUMPY':
raise ValueError("This is not a valid .npy file.")

# Read version information
major, minor = struct.unpack('<BB', mem_view[6:8].tobytes())
if major == 1:
header_len = struct.unpack('<H', mem_view[8:10].tobytes())[0]
header = mem_view[10:10 + header_len].tobytes()
elif major == 2:
header_len = struct.unpack('<I', mem_view[8:12].tobytes())[0]
header = mem_view[12:12 + header_len].tobytes()
else:
raise ValueError(f"Unsupported .npy file version: {major}.{minor}")

# Parse the header dict; ast.literal_eval is a safe replacement for eval
# here, since the header contains only Python literals
header_dict = ast.literal_eval(header.decode('latin1'))
dtype = np.dtype(header_dict['descr'])
shape = header_dict['shape']
fortran_order = header_dict['fortran_order']

# Calculate the data offset
data_offset = (10 + header_len) if major == 1 else (12 + header_len)
data_size = int(np.prod(shape)) * dtype.itemsize  # int() guards against np.prod returning a float for empty shapes

# Load the array data, honoring the on-disk element order; reinterpreting a
# Fortran-ordered file as C-ordered and only then calling np.asfortranarray
# would silently misread the data
data = np.ndarray(shape, dtype=dtype, order='F' if fortran_order else 'C', buffer=mem_view[data_offset:data_offset + data_size])
return data

def parse_npz(mem_view):
files = {}
pos = 0

while pos < len(mem_view):
# Verify magic
local_header_signature = mem_view[pos:pos+4].tobytes()
if local_header_signature != b'\x50\x4b\x03\x04':
break

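# Fixed offsets into the ZIP local file header (per the PKZIP appnote):
# bytes 18-22 compressed size, 22-26 uncompressed size,
# 26-28 file name length, 28-30 extra field length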
compressed_size = struct.unpack('<I', mem_view[pos+18:pos+22].tobytes())[0]
uncompressed_size = struct.unpack('<I', mem_view[pos+22:pos+26].tobytes())[0]
filename_len = struct.unpack('<H', mem_view[pos+26:pos+28].tobytes())[0]
extra_len = struct.unpack('<H', mem_view[pos+28:pos+30].tobytes())[0]
filename = mem_view[pos+30:pos+30+filename_len].tobytes().decode('utf-8')

# skip to data offset
pos += 30 + filename_len + extra_len
if not filename.endswith('.npy'):
raise ValueError(f"Unexpected file in npz: {filename}")
filename = filename[:-4]

compressed_data = mem_view[pos:pos+compressed_size]
pos += compressed_size

if compressed_size == uncompressed_size:
# stored entry (method 0): the data is not compressed
uncompressed_data = compressed_data
else:
# deflated entry: ZIP stores raw DEFLATE streams, so decompress with
# negative wbits (no zlib header) and rewrap as a memoryview, which
# parse_npy expects
uncompressed_data = memoryview(zlib.decompress(compressed_data, -zlib.MAX_WBITS))

files[filename] = parse_npy(uncompressed_data)
return files
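As a standalone sanity check of the read path, here is a hedged sketch that round-trips an array through np.save and the parse_npy helper above, using the same over-allocate-and-align trick as allocate_aligned_buffer. It assumes Linux, the 4096-byte default alignment, and a destination on a filesystem that actually supports O_DIRECT (tmpfs typically does not); the path is hypothetical.

import ctypes
import os

import numpy as np

from dlio_benchmark.reader.odirect_reader import parse_npy

ALIGN = 4096  # matches the reader's default alignment

def aligned_read(path, align=ALIGN):
    # Read a whole file with O_DIRECT into a block-aligned buffer
    fd = os.open(path, os.O_RDONLY | os.O_DIRECT)
    try:
        size = os.path.getsize(path)
        buf_size = ((size + align - 1) // align) * align
        raw = bytearray(buf_size + align - 1)
        base = ctypes.addressof((ctypes.c_char * len(raw)).from_buffer(raw))
        offset = (align - base % align) % align
        view = memoryview((ctypes.c_char * buf_size).from_buffer(raw, offset))
        if os.readv(fd, [view]) != size:
            raise IOError(f"short read on {path}")
        return view
    finally:
        os.close(fd)

arr = np.arange(1024, dtype=np.float32)
np.save("/data/odirect_demo.npy", arr)  # hypothetical path on an O_DIRECT-capable filesystem
out = parse_npy(aligned_read("/data/odirect_demo.npy"))
assert np.array_equal(out, arr)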
11 changes: 10 additions & 1 deletion dlio_benchmark/reader/reader_factory.py
@@ -55,15 +55,24 @@ def get_reader(type, dataset_type, thread_index, epoch_number):
if _args.data_loader == DataLoaderType.NATIVE_DALI:
from dlio_benchmark.reader.dali_npy_reader import DaliNPYReader
return DaliNPYReader(dataset_type, thread_index, epoch_number)
elif _args.data_loader == DataLoaderType.PYTORCH_DIRECT:
from dlio_benchmark.reader.odirect_reader import ODirectReader, parse_npy
return ODirectReader(dataset_type, thread_index, epoch_number, parse_npy)
else:
from dlio_benchmark.reader.npy_reader import NPYReader
return NPYReader(dataset_type, thread_index, epoch_number)
elif type == FormatType.NPZ:
if _args.data_loader == DataLoaderType.NATIVE_DALI:
raise Exception("Loading data of %s format is not supported without a framework data loader; please use the npy format instead." % type)
elif _args.data_loader == DataLoaderType.PYTORCH_DIRECT:
from dlio_benchmark.reader.odirect_reader import ODirectReader, parse_npz
def parse_npz_x(mem_view):
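# DLIO's npz data generator stores the sample array under the "x" key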
return parse_npz(mem_view)["x"]
return ODirectReader(dataset_type, thread_index, epoch_number, parse_npz_x)
else:
from dlio_benchmark.reader.npz_reader import NPZReader
return NPZReader(dataset_type, thread_index, epoch_number)

elif type == FormatType.TFRECORD:
if _args.data_loader == DataLoaderType.NATIVE_DALI:
from dlio_benchmark.reader.dali_tfrecord_reader import DaliTFRecordReader
2 changes: 1 addition & 1 deletion dlio_benchmark/utils/config.py
@@ -272,7 +272,7 @@ def derive_configurations(self, file_list_train=None, file_list_eval=None):
if self.data_loader_sampler is None and self.data_loader_classname is None:
if self.data_loader == DataLoaderType.TENSORFLOW:
self.data_loader_sampler = DataLoaderSampler.ITERATIVE
elif self.data_loader in [DataLoaderType.PYTORCH, DataLoaderType.DALI]:
elif self.data_loader in [DataLoaderType.PYTORCH, DataLoaderType.PYTORCH_DIRECT, DataLoaderType.DALI]:
self.data_loader_sampler = DataLoaderSampler.INDEX
if self.data_loader_classname is not None:
from dlio_benchmark.data_loader.base_data_loader import BaseDataLoader