Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: CERT-Polska/malduck
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: cd6497ed24523f8b1bcb1930e90d6d507363880a
Choose a base ref
..
head repository: CERT-Polska/malduck
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 4508e36c5139cda392331c17f52a18d9e8f30809
Choose a head ref
Showing with 23 additions and 21 deletions.
  1. +1 −1 docs/extractor.rst
  2. +22 −20 malduck/extractor/extract_manager.py
2 changes: 1 addition & 1 deletion docs/extractor.rst
Original file line number Diff line number Diff line change
@@ -19,5 +19,5 @@ Module interface
Internally used classes and routines
------------------------------------

.. autoclass:: malduck.extractor.extract_manager.ProcmemExtractManager
.. autoclass:: malduck.extractor.extract_manager.ExtractionContext
:members:
42 changes: 22 additions & 20 deletions malduck/extractor/extract_manager.py
Original file line number Diff line number Diff line change
@@ -91,27 +91,27 @@ def on_extractor_error(
traceback.format_exc(),
)

def push_file(self, filepath: str, base: int = 0) -> List[str]:
def push_file(self, filepath: str, base: int = 0) -> Optional[str]:
"""
Pushes file for extraction. Config extractor entrypoint.
:param filepath: Path to extracted file
:type filepath: str
:param base: Memory dump base address
:type base: int
:return: Configuration if ripped successfully or empty dict otherwise
:return: Detected family if configuration looks better than already stored one
"""
log.debug("Started extraction of file %s:%x", filepath, base)
with ProcessMemory.from_file(filepath, base=base) as p:
return self.push_procmem(p, rip_binaries=True)

def match_procmem(self, p: ProcessMemory) -> YaraRulesetMatch:
"""
Performs Yara matchign on ProcessMemory using modules
Performs Yara matching on ProcessMemory using modules
bound with current ExtractManager.
"""
matches = p.yarap(self.rules, extended=True)
log.debug("Matched rules: %s", list(matches.keys()))
log.debug("Matched rules: %s", ",".join(list(matches.keys())))
return matches

def carve_procmem(self, p: ProcessMemory) -> List[ProcessMemoryBinary]:
@@ -130,13 +130,13 @@ def carve_procmem(self, p: ProcessMemory) -> List[ProcessMemoryBinary]:
binaries += carved_bins
return binaries

def push_config(self, config: Config):
def push_config(self, config: Config) -> bool:
if not config.get("family"):
return False

family = config["family"]
if family in self.configs:
if is_config_better(self.configs[family], config):
if is_config_better(base_config=self.configs[family], new_config=config):
self.configs[family] = config
log.debug("%s config looks better than previous one", family)
return True
@@ -171,7 +171,7 @@ def push_config(self, config: Config):
self.configs[family] = config
return True

def _extract_procmem(self, p: ProcessMemory, matches):
def _extract_procmem(self, p: ProcessMemory, matches) -> Optional[str]:
log.debug("%s - ripping...", repr(p))
# Create extraction context for single file
manager = ExtractionContext(parent=self)
@@ -183,13 +183,17 @@ def _extract_procmem(self, p: ProcessMemory, matches):
config = manager.collected_config
if config.get("family"):
log.debug("%s - found %s!", repr(p), config.get("family"))
self.push_config(config)
return [config["family"]]
if self.push_config(config):
return config["family"]
else:
return None
else:
log.debug("%s - no luck.", repr(p))
return []
return None

def push_procmem(self, p: ProcessMemory, rip_binaries: bool = False) -> List[str]:
def push_procmem(
self, p: ProcessMemory, rip_binaries: bool = False
) -> Optional[str]:
"""
Pushes ProcessMemory object for extraction
@@ -198,24 +202,22 @@ def push_procmem(self, p: ProcessMemory, rip_binaries: bool = False) -> List[str
:param rip_binaries: Look for binaries (PE, ELF) in provided ProcessMemory and try to perform extraction using
specialized variants (ProcessMemoryPE, ProcessMemoryELF)
:type rip_binaries: bool (default: False)
:return: List of detected families
:return: Detected family if configuration looks better than already stored one
"""
matches = self.match_procmem(p)
if not matches:
log.debug("No Yara matches.")
return []
return None

binaries = self.carve_procmem(p) if rip_binaries else []

families = set(self._extract_procmem(p, matches))
family = self._extract_procmem(p, matches)
for binary in binaries:
families = families.union(set(self._extract_procmem(binary, matches)))
family = self._extract_procmem(binary, matches) or family
binary_image = binary.image
if binary_image:
families = families.union(
set(self._extract_procmem(binary_image, matches))
)
return list(families)
family = self._extract_procmem(binary_image, matches) or family
return family

@property
def config(self) -> List[Config]:
@@ -237,7 +239,7 @@ def __init__(self, parent: ExtractManager) -> None:
self.parent = parent #: Bound ExtractManager instance

@property
def family(self):
def family(self) -> Optional[str]:
"""Matched family"""
return self.collected_config.get("family")