4 changes: 2 additions & 2 deletions main.py
@@ -115,8 +115,8 @@ def interpret(cmd, arguments, parts: dict[int, 'Partition'], shorthands, outdir)
         part = check_valid_part(arguments[0], parts, shorthands)
         if part is not None:
             print('-'*10)
-            print(utils.tree_folder(part.root))
-            print(utils.tree_folder(part.lost))
+            print(utils.tree_folder(part.root, sector_size=part.sector_size))
+            print(utils.tree_folder(part.lost, sector_size=part.sector_size))
             print('-'*10)
     elif cmd == 'bodyfile':
         if len(arguments) != 2:
1 change: 0 additions & 1 deletion recuperabit/fs/constants.py
@@ -19,5 +19,4 @@
 # along with RecuperaBit. If not, see <http://www.gnu.org/licenses/>.


-sector_size: int = 512
 max_sectors: int = 256  # Maximum block size for recovery
11 changes: 3 additions & 8 deletions recuperabit/fs/core_types.py
Expand Up @@ -28,8 +28,6 @@
from typing import Optional, Dict, Set, List, Tuple, Union, Any, Iterator
from datetime import datetime

from .constants import sector_size

from ..utils import readable_bytes


@@ -140,6 +138,7 @@ def __init__(self, fs_type: str, root_id: Union[int, str], scanner: 'DiskScanner
         self.files: Dict[Union[int, str], File] = {}
         self.recoverable: bool = False
         self.scanner: 'DiskScanner' = scanner
+        self.sector_size: int = 512  # Default sector size, can be overridden

     def add_file(self, node: File) -> None:
         """Insert a new file in the partition."""
@@ -209,14 +208,14 @@ def additional_repr(self) -> List[Tuple[str, Any]]:

     def __repr__(self) -> str:
         size = (
-            readable_bytes(self.size * sector_size)
+            readable_bytes(self.size * self.sector_size)
             if self.size is not None else '??? b'
         )
         data = [
             ('Offset', self.offset),
             (
                 'Offset (b)',
-                self.offset * sector_size
+                self.offset * self.sector_size
                 if self.offset is not None else None
             ),
         ]
@@ -249,10 +248,6 @@ class DiskScanner(object):
     def __init__(self, pointer: Any) -> None:
         self.image: Any = pointer

-    def get_image(self) -> Any:
-        """Return the image reference."""
-        return self.image
-
     @staticmethod
     def get_image(scanner: 'DiskScanner') -> Any:
         """Static method to get image from scanner instance."""
38 changes: 19 additions & 19 deletions recuperabit/fs/ntfs.py
@@ -26,7 +26,7 @@
 from collections import Counter
 from typing import Any, Dict, List, Optional, Tuple, Union, Iterator, Set

-from .constants import max_sectors, sector_size
+from .constants import max_sectors
 from .core_types import DiskScanner, File, Partition
 from .ntfs_fmt import (attr_header_fmt, attr_names, attr_nonresident_fmt,
                        attr_resident_fmt, attr_types_fmt, attribute_list_parser,
@@ -95,7 +95,7 @@ def parse_mft_attr(attr: bytes) -> Tuple[Dict[str, Any], Optional[str]]:
     return header, name


-def _apply_fixup_values(header: Dict[str, Any], entry: bytearray) -> None:
+def _apply_fixup_values(header: Dict[str, Any], entry: bytearray, sector_size: int) -> None:
     """Apply the fixup values to FILE and INDX records."""
     offset = header['off_fixup']
     for i in range(1, header['n_entries']):
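For context on the new parameter: NTFS guards FILE and INDX records against torn writes with an update sequence array (USA). On disk, the last two bytes of every sector of the record are overwritten with the update sequence number, and the original bytes are saved in the USA; applying the fixups writes them back, so the routine needs the sector size to locate each boundary. The diff only shows the signature change; below is a minimal, self-contained sketch of the technique (the field names off_fixup and n_entries follow this codebase, everything else is illustrative):

    def apply_fixups(entry: bytearray, off_fixup: int, n_entries: int,
                     sector_size: int) -> None:
        """Restore the saved bytes at the end of each sector of a record.

        The USA starts at off_fixup: 2 bytes of update sequence number,
        then (n_entries - 1) pairs of original bytes, one per sector.
        """
        for i in range(1, n_entries):
            saved = entry[off_fixup + 2*i:off_fixup + 2*i + 2]
            end = i * sector_size             # end of the i-th sector
            entry[end - 2:end] = saved        # put the original bytes back

    # Toy record: two 16-byte "sectors", USN b'\xAA\xAA', original sector
    # tails b'12' and b'34' saved in the USA, tails overwritten on disk.
    record = bytearray(32)
    record[0:6] = b'\xAA\xAA' + b'12' + b'34'     # USA at offset 0
    record[14:16] = record[30:32] = b'\xAA\xAA'   # tails hold the USN
    apply_fixups(record, off_fixup=0, n_entries=3, sector_size=16)
    assert record[14:16] == b'12' and record[30:32] == b'34'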
@@ -134,7 +134,7 @@ def _attributes_reader(entry: bytes, offset: int) -> Dict[str, Any]:
     return attributes


-def parse_file_record(entry: bytes) -> Dict[str, Any]:
+def parse_file_record(entry: bytes, sector_size: int) -> Dict[str, Any]:
     """Parse the contents of a FILE record (MFT entry)."""
     header = unpack(entry, entry_fmt)
     if (header['size_alloc'] is None or
@@ -147,19 +147,19 @@ def parse_file_record(entry: bytes, sector_size: int) -> Dict[str, Any]:
     if header['off_fixup'] < 48:
         header['record_n'] = None

-    _apply_fixup_values(header, entry)
+    _apply_fixup_values(header, entry, sector_size)

     attributes = _attributes_reader(entry, header['off_first'])
     header['valid'] = True
     header['attributes'] = attributes
     return header


-def parse_indx_record(entry: bytes) -> Dict[str, Any]:
+def parse_indx_record(entry: bytes, sector_size: int) -> Dict[str, Any]:
     """Parse the contents of a INDX record (directory index)."""
     header = unpack(entry, indx_fmt)

-    _apply_fixup_values(header, entry)
+    _apply_fixup_values(header, entry, sector_size)

     node_data = unpack(entry[24:], indx_header_fmt)
     node_data['off_start_list'] += 24
@@ -214,7 +214,7 @@ def _integrate_attribute_list(parsed: Dict[str, Any], part: 'NTFSPartition', ima
             size = attr['real_size']
             for entry in attr['runlist']:
                 clusters_pos += entry['offset']
-                length = min(entry['length'] * spc * sector_size, size)
+                length = min(entry['length'] * spc * part.sector_size, size)
                 size -= length
                 real_pos = clusters_pos * spc + part.offset
                 dump = sectors(image, real_pos, length, 1)
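The arithmetic this change parameterizes: runlist lengths are stored in clusters, so the byte length of a run is clusters × sectors-per-cluster × bytes-per-sector, capped by the attribute's real size because the final run is padded up to a cluster boundary. A worked example with assumed numbers:

    # Assumed geometry: 8 sectors per cluster, 4096-byte sectors, and a
    # $DATA attribute of real size 20_000 bytes stored in one 1-cluster
    # run; the run allocates 8 * 4096 = 32_768 bytes on disk.
    spc, sector_size, size = 8, 4096, 20_000
    run_length_clusters = 1
    length = min(run_length_clusters * spc * sector_size, size)
    print(length)  # 20000, not 32768: slack past end-of-file is skipped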
@@ -247,7 +247,7 @@ def _integrate_attribute_list(parsed: Dict[str, Any], part: 'NTFSPartition', ima
         for index in entries_by_type[num]:
             real_pos = mft_pos + index * FILE_size
             dump = sectors(image, real_pos, FILE_size)
-            child_parsed = parse_file_record(dump)
+            child_parsed = parse_file_record(dump, part.sector_size)
             if 'attributes' not in child_parsed:
                 continue
             # Update the main entry (parsed)
@@ -356,12 +356,12 @@ def content_iterator(self, partition: 'NTFSPartition', image: Any, datas: List[D
                 break

             for entry in attr['runlist']:
-                length = min(entry['length'] * spc * sector_size, size)
+                length = min(entry['length'] * spc * partition.sector_size, size)
                 size -= length
                 # Sparse runlist
                 if entry['offset'] is None:
                     while length > 0:
-                        amount = min(max_sectors*sector_size, length)
+                        amount = min(max_sectors*partition.sector_size, length)
                         length -= amount
                         yield b'\x00' * amount
                     continue
@@ -371,8 +371,8 @@ def content_iterator(self, partition: 'NTFSPartition', image: Any, datas: List[D
             # Avoid to fill memory with huge blocks
             offset = 0
             while length > 0:
-                amount = min(max_sectors*sector_size, length)
-                position = real_pos*sector_size + offset
+                amount = min(max_sectors*partition.sector_size, length)
+                position = real_pos*partition.sector_size + offset
                 partial = self._padded_bytes(image, position, amount)
                 length -= amount
                 offset += amount
@@ -389,7 +389,7 @@ def get_content(self, partition: 'NTFSPartition') -> Optional[Union[bytes, Itera

         image = DiskScanner.get_image(partition.scanner)
         dump = sectors(image, File.get_offset(self), FILE_size)
-        parsed = parse_file_record(dump)
+        parsed = parse_file_record(dump, partition.sector_size)

         if not parsed['valid'] or 'attributes' not in parsed:
             logging.error(u'Invalid MFT entry for {}'.format(self))
@@ -623,7 +623,7 @@ def add_from_indx_allocation(self, parsed: Dict[str, Any], part: NTFSPartition)
         img = DiskScanner.get_image(self)
         for position in read_again:
             dump = sectors(img, position, INDX_size)
-            entries = parse_indx_record(dump)['entries']
+            entries = parse_indx_record(dump, part.sector_size)['entries']
             self.add_indx_entries(entries, part)

     def add_from_attribute_list(self, parsed: Dict[str, Any], part: NTFSPartition, offset: int) -> None:
@@ -656,7 +656,7 @@ def add_from_mft_mirror(self, part: NTFSPartition) -> None:
             if node is None or node.is_ghost:
                 position = mirrpos + i * FILE_size
                 dump = sectors(img, position, FILE_size)
-                parsed = parse_file_record(dump)
+                parsed = parse_file_record(dump, part.sector_size)
                 if parsed['valid'] and '$FILE_NAME' in parsed['attributes']:
                     node = NTFSFile(parsed, position)
                     part.add_file(node)
@@ -702,7 +702,7 @@ def get_partitions(self) -> Dict[int, NTFSPartition]:
         logging.info('Parsing MFT entries')
         for position in self.found_file:
             dump = sectors(img, position, FILE_size)
-            parsed = parse_file_record(dump)
+            parsed = parse_file_record(dump, 512)  # Default sector size during discovery
Copilot AI commented on Nov 2, 2025:

The comment states 'Default sector size during discovery', but this hardcoded value could be problematic if the actual sector size differs. Consider documenting why it's safe to use 512 here during the discovery phase, or if the sector size should be determined and used instead.
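One way to act on this review: the bytes-per-sector value lives in the BIOS parameter block of the NTFS boot sector, as a little-endian 16-bit integer at offset 0x0B, so once a boot sector has been located the 512-byte discovery default can be checked against the real geometry. A hedged sketch, not part of this PR (sector_size_from_boot is a hypothetical helper):

    import struct

    def sector_size_from_boot(boot: bytes) -> int:
        """Read bytes-per-sector from an NTFS boot sector's BPB.

        The field is a little-endian u16 at offset 0x0B; valid NTFS
        volumes use 512, 1024, 2048 or 4096.
        """
        (bps,) = struct.unpack_from('<H', boot, 0x0B)
        if bps not in (512, 1024, 2048, 4096):
            raise ValueError('implausible bytes-per-sector: %d' % bps)
        return bps

    # Hypothetical wiring once a partition's boot sector is found:
    # part.sector_size = sector_size_from_boot(boot_dump)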
            attrs = parsed.get('attributes', {})
            if not parsed['valid'] or '$FILE_NAME' not in attrs:
                continue
@@ -737,7 +737,7 @@ def get_partitions(self) -> Dict[int, NTFSPartition]:
         logging.info('Parsing INDX records')
         for position in self.found_indx:
             dump = sectors(img, position, INDX_size)
Copilot AI commented on Nov 2, 2025:

Similar to the previous issue, this hardcoded 512 sector size during discovery should be better documented to explain why it's safe to assume this value before the actual sector size is determined from the boot sector.

Suggested change:
             dump = sectors(img, position, INDX_size)
+            # NTFS typically uses a sector size of 512 bytes. During the initial discovery phase,
+            # we use 512 as the default sector size for parsing INDX records. This is safe because
+            # the NTFS boot sector, which contains the actual sector size, may not be available yet.
+            # After discovery, the correct sector size will be determined from the boot sector and
+            # used for further parsing. See NTFS documentation for details:
+            # https://learn.microsoft.com/en-us/windows/win32/fileio/ntfs-technical-reference
-            parsed = parse_indx_record(dump)
+            parsed = parse_indx_record(dump, 512)  # Default sector size during discovery
             if not parsed['valid']:
                 continue

@@ -793,7 +793,7 @@ def get_partitions(self) -> Dict[int, NTFSPartition]:
             else:
                 # Infer MFT mirror position
                 dump = sectors(img, entry.offset, FILE_size)
-                mirror = parse_file_record(dump)
+                mirror = parse_file_record(dump, part.sector_size)
                 if (mirror['valid'] and 'attributes' in mirror and
                         '$DATA' in mirror['attributes']):
                     datas = mirror['attributes']['$DATA']
@@ -856,7 +856,7 @@ def get_partitions(self) -> Dict[int, NTFSPartition]:
             if entry is None or part.sec_per_clus is None:
                 continue
             dump = sectors(img, entry.offset, FILE_size)
-            parsed = parse_file_record(dump)
+            parsed = parse_file_record(dump, part.sector_size)
             if not parsed['valid'] or 'attributes' not in parsed:
                 continue

22 changes: 12 additions & 10 deletions recuperabit/utils.py
@@ -25,11 +25,11 @@
 import string
 import sys
 import time
-from typing import TYPE_CHECKING, Any, Iterable, Optional, List, Dict, Tuple, Union, Callable
+from typing import TYPE_CHECKING, Any, Iterable, Optional, List, Dict, Sequence, Tuple, Union, Callable
 import unicodedata
 import io

-from .fs.constants import sector_size
+from recuperabit.fs.core_types import DiskScanner

 printer: pprint.PrettyPrinter = pprint.PrettyPrinter(indent=4)
 all_chars = (chr(i) for i in range(sys.maxunicode))
@@ -43,7 +43,7 @@
     from .fs.core_types import File, Partition


-def sectors(image: io.BufferedReader, offset: int, size: int, bsize: int = sector_size, fill: bool = True) -> Optional[bytearray]:
+def sectors(image: io.BufferedReader, offset: int, size: int, bsize: int = 512, fill: bool = True) -> Optional[bytearray]:
     """Read from a file descriptor."""
     read = True
     try:
@@ -118,7 +118,7 @@ def unpack(data: bytes, fmt: List[Tuple[str, Tuple[Union[str, Callable[[bytes],
     return result


-def feed_all(image: io.BufferedReader, scanners: List[Any], indexes: Iterable[int]) -> List[int]:
+def feed_all(image: io.BufferedReader, scanners: Sequence['DiskScanner'], indexes: Iterable[int]) -> List[int]:
     # Scan the disk image and feed the scanners
     interesting: List[int] = []
     for index in indexes:
@@ -160,7 +160,7 @@ def readable_bytes(amount: Optional[int]) -> str:
     return '%.2f %sB' % (scaled, powers[biggest])


-def _file_tree_repr(node: 'File') -> str:
+def _file_tree_repr(node: 'File', sector_size: int) -> str:
     """Give a nice representation for the tree."""
     desc = (
         ' [GHOST]' if node.is_ghost else
@@ -184,21 +184,23 @@ def _file_tree_repr(node: 'File', sector_size: int) -> str:
     )


-def tree_folder(directory: 'File', padding: int = 0) -> str:
+def tree_folder(directory: 'File', padding: int = 0, sector_size: int | None = None) -> str:
Copilot AI commented on Nov 2, 2025:

The type annotation int | None uses Python 3.10+ union syntax, but the codebase uses typing.Optional elsewhere for consistency. Consider changing to Optional[int] for backward compatibility and consistency with the rest of the codebase.

Suggested change:
-def tree_folder(directory: 'File', padding: int = 0, sector_size: int | None = None) -> str:
+def tree_folder(directory: 'File', padding: int = 0, sector_size: Optional[int] = None) -> str:
"""Return a tree-like textual representation of a directory."""
assert sector_size is not None, "sector_size must be provided"
Copilot AI commented on Nov 2, 2025:

Using an assertion for input validation in production code is not recommended. If sector_size is required, either remove the None default or raise a ValueError instead of using assert. Assertions can be disabled with Python's -O flag, making this check unreliable.

Suggested change:
-    assert sector_size is not None, "sector_size must be provided"
+    if sector_size is None:
+        raise ValueError("sector_size must be provided")

     lines: List[str] = []
     pad = ' ' * padding
     lines.append(
-        pad + _file_tree_repr(directory)
+        pad + _file_tree_repr(directory, sector_size)
     )
     padding = padding + 2
     pad = ' ' * padding
     for entry in directory.children:
         if len(entry.children) or entry.is_directory:
-            lines.append(tree_folder(entry, padding))
+            lines.append(tree_folder(entry, padding, sector_size))
         else:
             lines.append(
-                pad + _file_tree_repr(entry)
+                pad + _file_tree_repr(entry, sector_size)
             )
     return '\n'.join(lines)
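Folding both review suggestions together, the function would keep Optional[int] for compatibility and raise instead of asserting. A sketch of the result under those two changes (same body as the diff above; File, List and _file_tree_repr come from this module, and Optional is already imported from typing):

    def tree_folder(directory: 'File', padding: int = 0,
                    sector_size: Optional[int] = None) -> str:
        """Return a tree-like textual representation of a directory."""
        if sector_size is None:
            raise ValueError("sector_size must be provided")
        lines: List[str] = []
        pad = ' ' * padding
        lines.append(pad + _file_tree_repr(directory, sector_size))
        padding = padding + 2
        pad = ' ' * padding
        for entry in directory.children:
            if len(entry.children) or entry.is_directory:
                lines.append(tree_folder(entry, padding, sector_size))
            else:
                lines.append(pad + _file_tree_repr(entry, sector_size))
        return '\n'.join(lines)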

@@ -309,7 +311,7 @@ def csv_part(part: 'Partition') -> list[str]:
             obj.mac['modification'], obj.mac['access'],
             obj.mac['creation'], obj.size,
             readable_bytes(obj.size),
-            (obj.offset * sector_size
+            (obj.offset * part.sector_size
              if obj.offset is not None else None),
             obj.offset,
             '1' if obj.is_directory else '',