-
-
Notifications
You must be signed in to change notification settings - Fork 80
Draft: Implement runtime guessing of sector size #137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -26,7 +26,7 @@ | |||||||||||||||||
| from collections import Counter | ||||||||||||||||||
| from typing import Any, Dict, List, Optional, Tuple, Union, Iterator, Set | ||||||||||||||||||
|
|
||||||||||||||||||
| from .constants import max_sectors, sector_size | ||||||||||||||||||
| from .constants import max_sectors | ||||||||||||||||||
| from .core_types import DiskScanner, File, Partition | ||||||||||||||||||
| from .ntfs_fmt import (attr_header_fmt, attr_names, attr_nonresident_fmt, | ||||||||||||||||||
| attr_resident_fmt, attr_types_fmt, attribute_list_parser, | ||||||||||||||||||
|
|
@@ -95,7 +95,7 @@ def parse_mft_attr(attr: bytes) -> Tuple[Dict[str, Any], Optional[str]]: | |||||||||||||||||
| return header, name | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| def _apply_fixup_values(header: Dict[str, Any], entry: bytearray) -> None: | ||||||||||||||||||
| def _apply_fixup_values(header: Dict[str, Any], entry: bytearray, sector_size: int) -> None: | ||||||||||||||||||
| """Apply the fixup values to FILE and INDX records.""" | ||||||||||||||||||
| offset = header['off_fixup'] | ||||||||||||||||||
| for i in range(1, header['n_entries']): | ||||||||||||||||||
|
|
@@ -134,7 +134,7 @@ def _attributes_reader(entry: bytes, offset: int) -> Dict[str, Any]: | |||||||||||||||||
| return attributes | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| def parse_file_record(entry: bytes) -> Dict[str, Any]: | ||||||||||||||||||
| def parse_file_record(entry: bytes, sector_size: int) -> Dict[str, Any]: | ||||||||||||||||||
| """Parse the contents of a FILE record (MFT entry).""" | ||||||||||||||||||
| header = unpack(entry, entry_fmt) | ||||||||||||||||||
| if (header['size_alloc'] is None or | ||||||||||||||||||
|
|
@@ -147,19 +147,19 @@ def parse_file_record(entry: bytes) -> Dict[str, Any]: | |||||||||||||||||
| if header['off_fixup'] < 48: | ||||||||||||||||||
| header['record_n'] = None | ||||||||||||||||||
|
|
||||||||||||||||||
| _apply_fixup_values(header, entry) | ||||||||||||||||||
| _apply_fixup_values(header, entry, sector_size) | ||||||||||||||||||
|
|
||||||||||||||||||
| attributes = _attributes_reader(entry, header['off_first']) | ||||||||||||||||||
| header['valid'] = True | ||||||||||||||||||
| header['attributes'] = attributes | ||||||||||||||||||
| return header | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
||||||||||||||||||
| def parse_indx_record(entry: bytes) -> Dict[str, Any]: | ||||||||||||||||||
| def parse_indx_record(entry: bytes, sector_size: int) -> Dict[str, Any]: | ||||||||||||||||||
| """Parse the contents of a INDX record (directory index).""" | ||||||||||||||||||
| header = unpack(entry, indx_fmt) | ||||||||||||||||||
|
|
||||||||||||||||||
| _apply_fixup_values(header, entry) | ||||||||||||||||||
| _apply_fixup_values(header, entry, sector_size) | ||||||||||||||||||
|
|
||||||||||||||||||
| node_data = unpack(entry[24:], indx_header_fmt) | ||||||||||||||||||
| node_data['off_start_list'] += 24 | ||||||||||||||||||
|
|
@@ -214,7 +214,7 @@ def _integrate_attribute_list(parsed: Dict[str, Any], part: 'NTFSPartition', ima | |||||||||||||||||
| size = attr['real_size'] | ||||||||||||||||||
| for entry in attr['runlist']: | ||||||||||||||||||
| clusters_pos += entry['offset'] | ||||||||||||||||||
| length = min(entry['length'] * spc * sector_size, size) | ||||||||||||||||||
| length = min(entry['length'] * spc * part.sector_size, size) | ||||||||||||||||||
| size -= length | ||||||||||||||||||
| real_pos = clusters_pos * spc + part.offset | ||||||||||||||||||
| dump = sectors(image, real_pos, length, 1) | ||||||||||||||||||
|
|
@@ -247,7 +247,7 @@ def _integrate_attribute_list(parsed: Dict[str, Any], part: 'NTFSPartition', ima | |||||||||||||||||
| for index in entries_by_type[num]: | ||||||||||||||||||
| real_pos = mft_pos + index * FILE_size | ||||||||||||||||||
| dump = sectors(image, real_pos, FILE_size) | ||||||||||||||||||
| child_parsed = parse_file_record(dump) | ||||||||||||||||||
| child_parsed = parse_file_record(dump, part.sector_size) | ||||||||||||||||||
| if 'attributes' not in child_parsed: | ||||||||||||||||||
| continue | ||||||||||||||||||
| # Update the main entry (parsed) | ||||||||||||||||||
|
|
@@ -356,12 +356,12 @@ def content_iterator(self, partition: 'NTFSPartition', image: Any, datas: List[D | |||||||||||||||||
| break | ||||||||||||||||||
|
|
||||||||||||||||||
| for entry in attr['runlist']: | ||||||||||||||||||
| length = min(entry['length'] * spc * sector_size, size) | ||||||||||||||||||
| length = min(entry['length'] * spc * partition.sector_size, size) | ||||||||||||||||||
| size -= length | ||||||||||||||||||
| # Sparse runlist | ||||||||||||||||||
| if entry['offset'] is None: | ||||||||||||||||||
| while length > 0: | ||||||||||||||||||
| amount = min(max_sectors*sector_size, length) | ||||||||||||||||||
| amount = min(max_sectors*partition.sector_size, length) | ||||||||||||||||||
| length -= amount | ||||||||||||||||||
| yield b'\x00' * amount | ||||||||||||||||||
| continue | ||||||||||||||||||
|
|
@@ -371,8 +371,8 @@ def content_iterator(self, partition: 'NTFSPartition', image: Any, datas: List[D | |||||||||||||||||
| # Avoid to fill memory with huge blocks | ||||||||||||||||||
| offset = 0 | ||||||||||||||||||
| while length > 0: | ||||||||||||||||||
| amount = min(max_sectors*sector_size, length) | ||||||||||||||||||
| position = real_pos*sector_size + offset | ||||||||||||||||||
| amount = min(max_sectors*partition.sector_size, length) | ||||||||||||||||||
| position = real_pos*partition.sector_size + offset | ||||||||||||||||||
| partial = self._padded_bytes(image, position, amount) | ||||||||||||||||||
| length -= amount | ||||||||||||||||||
| offset += amount | ||||||||||||||||||
|
|
@@ -389,7 +389,7 @@ def get_content(self, partition: 'NTFSPartition') -> Optional[Union[bytes, Itera | |||||||||||||||||
|
|
||||||||||||||||||
| image = DiskScanner.get_image(partition.scanner) | ||||||||||||||||||
| dump = sectors(image, File.get_offset(self), FILE_size) | ||||||||||||||||||
| parsed = parse_file_record(dump) | ||||||||||||||||||
| parsed = parse_file_record(dump, partition.sector_size) | ||||||||||||||||||
|
|
||||||||||||||||||
| if not parsed['valid'] or 'attributes' not in parsed: | ||||||||||||||||||
| logging.error(u'Invalid MFT entry for {}'.format(self)) | ||||||||||||||||||
|
|
@@ -623,7 +623,7 @@ def add_from_indx_allocation(self, parsed: Dict[str, Any], part: NTFSPartition) | |||||||||||||||||
| img = DiskScanner.get_image(self) | ||||||||||||||||||
| for position in read_again: | ||||||||||||||||||
| dump = sectors(img, position, INDX_size) | ||||||||||||||||||
| entries = parse_indx_record(dump)['entries'] | ||||||||||||||||||
| entries = parse_indx_record(dump, part.sector_size)['entries'] | ||||||||||||||||||
| self.add_indx_entries(entries, part) | ||||||||||||||||||
|
|
||||||||||||||||||
| def add_from_attribute_list(self, parsed: Dict[str, Any], part: NTFSPartition, offset: int) -> None: | ||||||||||||||||||
|
|
@@ -656,7 +656,7 @@ def add_from_mft_mirror(self, part: NTFSPartition) -> None: | |||||||||||||||||
| if node is None or node.is_ghost: | ||||||||||||||||||
| position = mirrpos + i * FILE_size | ||||||||||||||||||
| dump = sectors(img, position, FILE_size) | ||||||||||||||||||
| parsed = parse_file_record(dump) | ||||||||||||||||||
| parsed = parse_file_record(dump, part.sector_size) | ||||||||||||||||||
| if parsed['valid'] and '$FILE_NAME' in parsed['attributes']: | ||||||||||||||||||
| node = NTFSFile(parsed, position) | ||||||||||||||||||
| part.add_file(node) | ||||||||||||||||||
|
|
@@ -702,7 +702,7 @@ def get_partitions(self) -> Dict[int, NTFSPartition]: | |||||||||||||||||
| logging.info('Parsing MFT entries') | ||||||||||||||||||
| for position in self.found_file: | ||||||||||||||||||
| dump = sectors(img, position, FILE_size) | ||||||||||||||||||
| parsed = parse_file_record(dump) | ||||||||||||||||||
| parsed = parse_file_record(dump, 512) # Default sector size during discovery | ||||||||||||||||||
| attrs = parsed.get('attributes', {}) | ||||||||||||||||||
| if not parsed['valid'] or '$FILE_NAME' not in attrs: | ||||||||||||||||||
| continue | ||||||||||||||||||
|
|
@@ -737,7 +737,7 @@ def get_partitions(self) -> Dict[int, NTFSPartition]: | |||||||||||||||||
| logging.info('Parsing INDX records') | ||||||||||||||||||
| for position in self.found_indx: | ||||||||||||||||||
| dump = sectors(img, position, INDX_size) | ||||||||||||||||||
|
||||||||||||||||||
| dump = sectors(img, position, INDX_size) | |
| dump = sectors(img, position, INDX_size) | |
| # NTFS typically uses a sector size of 512 bytes. During the initial discovery phase, | |
| # we use 512 as the default sector size for parsing INDX records. This is a fallback: | |
| # the NTFS boot sector, which contains the actual sector size, may not be available yet. | |
| # After discovery, the correct sector size will be determined from the boot sector and | |
| # used for further parsing. See NTFS documentation for details: | |
| # https://learn.microsoft.com/en-us/windows/win32/fileio/ntfs-technical-reference |
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -25,11 +25,11 @@ | |||||||
| import string | ||||||||
| import sys | ||||||||
| import time | ||||||||
| from typing import TYPE_CHECKING, Any, Iterable, Optional, List, Dict, Tuple, Union, Callable | ||||||||
| from typing import TYPE_CHECKING, Any, Iterable, Optional, List, Dict, Sequence, Tuple, Union, Callable | ||||||||
| import unicodedata | ||||||||
| import io | ||||||||
|
|
||||||||
| from .fs.constants import sector_size | ||||||||
| from recuperabit.fs.core_types import DiskScanner | ||||||||
|
|
||||||||
| printer: pprint.PrettyPrinter = pprint.PrettyPrinter(indent=4) | ||||||||
| all_chars = (chr(i) for i in range(sys.maxunicode)) | ||||||||
|
|
@@ -43,7 +43,7 @@ | |||||||
| from .fs.core_types import File, Partition | ||||||||
|
|
||||||||
|
|
||||||||
| def sectors(image: io.BufferedReader, offset: int, size: int, bsize: int = sector_size, fill: bool = True) -> Optional[bytearray]: | ||||||||
| def sectors(image: io.BufferedReader, offset: int, size: int, bsize: int = 512, fill: bool = True) -> Optional[bytearray]: | ||||||||
| """Read from a file descriptor.""" | ||||||||
| read = True | ||||||||
| try: | ||||||||
|
|
@@ -118,7 +118,7 @@ def unpack(data: bytes, fmt: List[Tuple[str, Tuple[Union[str, Callable[[bytes], | |||||||
| return result | ||||||||
|
|
||||||||
|
|
||||||||
| def feed_all(image: io.BufferedReader, scanners: List[Any], indexes: Iterable[int]) -> List[int]: | ||||||||
| def feed_all(image: io.BufferedReader, scanners: Sequence['DiskScanner'], indexes: Iterable[int]) -> List[int]: | ||||||||
| # Scan the disk image and feed the scanners | ||||||||
| interesting: List[int] = [] | ||||||||
| for index in indexes: | ||||||||
|
|
@@ -160,7 +160,7 @@ def readable_bytes(amount: Optional[int]) -> str: | |||||||
| return '%.2f %sB' % (scaled, powers[biggest]) | ||||||||
|
|
||||||||
|
|
||||||||
| def _file_tree_repr(node: 'File') -> str: | ||||||||
| def _file_tree_repr(node: 'File', sector_size: int) -> str: | ||||||||
| """Give a nice representation for the tree.""" | ||||||||
| desc = ( | ||||||||
| ' [GHOST]' if node.is_ghost else | ||||||||
|
|
@@ -184,21 +184,23 @@ def _file_tree_repr(node: 'File') -> str: | |||||||
| ) | ||||||||
|
|
||||||||
|
|
||||||||
| def tree_folder(directory: 'File', padding: int = 0) -> str: | ||||||||
| def tree_folder(directory: 'File', padding: int = 0, sector_size: int | None = None) -> str: | ||||||||
|
||||||||
| def tree_folder(directory: 'File', padding: int = 0, sector_size: int | None = None) -> str: | |
| def tree_folder(directory: 'File', padding: int = 0, sector_size: Optional[int] = None) -> str: |
Copilot
AI
Nov 2, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using an assertion for input validation in production code is not recommended. If sector_size is required, either remove the None default or raise a ValueError instead of using assert. Assertions can be disabled with Python's -O flag, making this check unreliable.
| assert sector_size is not None, "sector_size must be provided" | |
| if sector_size is None: | |
| raise ValueError("sector_size must be provided") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment states 'Default sector size during discovery', but this hardcoded value could be problematic if the actual sector size differs. Consider either documenting why it's safe to use 512 here during the discovery phase, or determining the actual sector size and using it instead.