Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,18 @@
import sys
try:
import readline
readline # ignore unused import warning
except ImportError:
pass

from recuperabit import logic, utils
# scanners
from recuperabit.fs.ntfs import NTFSScanner

from typing import TYPE_CHECKING
if TYPE_CHECKING:
from recuperabit.fs.core_types import Partition

__author__ = "Andrea Lazzarotto"
__copyright__ = "(c) 2014-2021, Andrea Lazzarotto"
__license__ = "GPLv3"
Expand Down Expand Up @@ -97,7 +102,7 @@ def check_valid_part(num, parts, shorthands, rebuild=True):
return None


def interpret(cmd, arguments, parts, shorthands, outdir):
def interpret(cmd, arguments, parts: dict[int, Partition], shorthands, outdir):
"""Perform command required by user."""
if cmd == 'help':
print('Available commands:')
Expand Down Expand Up @@ -362,7 +367,7 @@ def main():
pickle.dump(interesting, savefile)

# Ask for partitions
parts = {}
parts: dict[int, Partition] = {}
for scanner in scanners:
parts.update(scanner.get_partitions())

Expand Down
4 changes: 2 additions & 2 deletions recuperabit/fs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
# along with RecuperaBit. If not, see <http://www.gnu.org/licenses/>.


sector_size = 512
max_sectors = 256 # Maximum block size for recovery
sector_size: int = 512
max_sectors: int = 256 # Maximum block size for recovery
107 changes: 57 additions & 50 deletions recuperabit/fs/core_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,56 +25,58 @@

import logging
import os.path
from typing import Optional, Dict, Set, List, Tuple, Union, Any, Iterator
from datetime import datetime

from .constants import sector_size

from ..utils import readable_bytes


class File(object):
"""Filesystem-independent representation of a file."""
def __init__(self, index, name, size, is_directory=False,
is_deleted=False, is_ghost=False):
self.index = index
self.name = name
self.size = size
self.is_directory = is_directory
self.is_deleted = is_deleted
self.is_ghost = is_ghost
self.parent = None
self.mac = {
"""Filesystem-independent representation of a file. Aka Node."""
def __init__(self, index: Union[int, str], name: str, size: Optional[int], is_directory: bool = False,
is_deleted: bool = False, is_ghost: bool = False) -> None:
self.index: Union[int, str] = index
self.name: str = name
self.size: Optional[int] = size
self.is_directory: bool = is_directory
self.is_deleted: bool = is_deleted
self.is_ghost: bool = is_ghost
self.parent: Optional[Union[int, str]] = None
self.mac: Dict[str, Optional[datetime]] = {
'modification': None,
'access': None,
'creation': None
}
self.children = set()
self.children_names = set() # Avoid name clashes breaking restore
self.offset = None # Offset from beginning of disk
self.children: Set['File'] = set()
self.children_names: Set[str] = set() # Avoid name clashes breaking restore
self.offset: Optional[int] = None # Offset from beginning of disk

def set_parent(self, parent):
def set_parent(self, parent: Optional[Union[int, str]]) -> None:
"""Set a pointer to the parent directory."""
self.parent = parent

def set_mac(self, modification, access, creation):
def set_mac(self, modification: Optional[datetime], access: Optional[datetime], creation: Optional[datetime]) -> None:
"""Set the modification, access and creation times."""
self.mac['modification'] = modification
self.mac['access'] = access
self.mac['creation'] = creation

def get_mac(self):
def get_mac(self) -> List[Optional[datetime]]:
"""Get the modification, access and creation times."""
keys = ('modification', 'access', 'creation')
return [self.mac[k] for k in keys]

def set_offset(self, offset):
def set_offset(self, offset: Optional[int]) -> None:
"""Set the offset of the file record with respect to the disk image."""
self.offset = offset

def get_offset(self):
def get_offset(self) -> Optional[int]:
"""Get the offset of the file record with respect to the disk image."""
return self.offset

def add_child(self, node):
def add_child(self, node: 'File') -> None:
"""Add a new child to this directory."""
original_name = node.name
i = 0
Expand All @@ -90,15 +92,15 @@ def add_child(self, node):
self.children.add(node)
self.children_names.add(node.name)

def full_path(self, part):
def full_path(self, part: 'Partition') -> str:
"""Return the full path of this file."""
if self.parent is not None:
parent = part[self.parent]
return os.path.join(parent.full_path(part), self.name)
else:
return self.name

def get_content(self, partition):
def get_content(self, partition: 'Partition') -> Optional[Union[bytes, Iterator[bytes]]]:
# pylint: disable=W0613
"""Extract the content of the file.

Expand All @@ -109,14 +111,14 @@ def get_content(self, partition):
raise NotImplementedError

# pylint: disable=R0201
def ignore(self):
def ignore(self) -> bool:
"""The following method is used by the restore procedure to check
files that should not be recovered. For example, in NTFS file
$BadClus:$Bad shall not be recovered because it creates an output
with the same size as the partition (usually many GBs)."""
return False

def __repr__(self):
def __repr__(self) -> str:
return (
u'File(#%s, ^^%s^^, %s, offset = %s sectors)' %
(self.index, self.parent, self.name, self.offset)
Expand All @@ -128,42 +130,42 @@ class Partition(object):

Parameter root_id represents the identifier assigned to the root directory
of a partition. This can be file system dependent."""
def __init__(self, fs_type, root_id, scanner):
self.fs_type = fs_type
self.root_id = root_id
self.size = None
self.offset = None
self.root = None
self.lost = File(-1, 'LostFiles', 0, is_directory=True, is_ghost=True)
self.files = {}
self.recoverable = False
self.scanner = scanner

def add_file(self, node):
def __init__(self, fs_type: str, root_id: Union[int, str], scanner: 'DiskScanner') -> None:
self.fs_type: str = fs_type
self.root_id: Union[int, str] = root_id
self.size: Optional[int] = None
self.offset: Optional[int] = None
self.root: Optional[File] = None
self.lost: File = File(-1, 'LostFiles', 0, is_directory=True, is_ghost=True)
self.files: Dict[Union[int, str], File] = {}
self.recoverable: bool = False
self.scanner: 'DiskScanner' = scanner

def add_file(self, node: File) -> None:
"""Insert a new file in the partition."""
index = node.index
self.files[index] = node

def set_root(self, node):
def set_root(self, node: File) -> None:
"""Set the root directory."""
if not node.is_directory:
raise TypeError('Not a directory')
self.root = node
self.root.set_parent(None)

def set_size(self, size):
def set_size(self, size: int) -> None:
"""Set the (estimated) size of the partition."""
self.size = size

def set_offset(self, offset):
def set_offset(self, offset: int) -> None:
"""Set the offset from the beginning of the disk."""
self.offset = offset

def set_recoverable(self, recoverable):
def set_recoverable(self, recoverable: bool) -> None:
"""State if the partition contents are also recoverable."""
self.recoverable = recoverable

def rebuild(self):
def rebuild(self) -> None:
"""Rebuild the partition structure.

This method processes the contents of files and it rebuilds the
Expand Down Expand Up @@ -201,11 +203,11 @@ def rebuild(self):
return

# pylint: disable=R0201
def additional_repr(self):
def additional_repr(self) -> List[Tuple[str, Any]]:
"""Return additional values to show in the string representation."""
return []

def __repr__(self):
def __repr__(self) -> str:
size = (
readable_bytes(self.size * sector_size)
if self.size is not None else '??? b'
Expand All @@ -227,14 +229,14 @@ def __repr__(self):
', '.join(a+': '+str(b) for a, b in data)
)

def __getitem__(self, index):
def __getitem__(self, index: Union[int, str]) -> File:
if index in self.files:
return self.files[index]
if index == self.lost.index:
return self.lost
raise KeyError

def get(self, index, default=None):
def get(self, index: Union[int, str], default: Optional[File] = None) -> Optional[File]:
"""Get a file or the special LostFiles directory."""
try:
return self.__getitem__(index)
Expand All @@ -244,17 +246,22 @@ def get(self, index, default=None):

class DiskScanner(object):
"""Abstract stub for the implementation of disk scanners."""
def __init__(self, pointer):
self.image = pointer
def __init__(self, pointer: Any) -> None:
self.image: Any = pointer

def get_image(self):
def get_image(self) -> Any:
"""Return the image reference."""
return self.image

def feed(self, index, sector):
@staticmethod
def get_image(scanner: 'DiskScanner') -> Any:
"""Static method to get image from scanner instance."""
return scanner.image

def feed(self, index: int, sector: bytes) -> Optional[str]:
"""Feed a new sector."""
raise NotImplementedError

def get_partitions(self):
def get_partitions(self) -> Dict[int, Partition]:
"""Get a list of the found partitions."""
raise NotImplementedError
Loading