|
1 | | -import os |
2 | | -from importlib.util import find_spec |
3 | | -from ctypes import CDLL, Structure, POINTER, c_int32, byref, c_char_p, c_void_p, pointer |
4 | | -from typing import AnyStr, Callable, Any, Mapping, Type, Optional |
5 | | - |
6 | | -libinjector_path = find_spec('.libinjector', __package__).origin |
7 | | -libinjector = CDLL(libinjector_path) |
8 | | - |
9 | | -injector_t = type('injector_t', (Structure,), {}) |
10 | | -injector_pointer_t = POINTER(injector_t) |
11 | | -pid_t = c_int32 |
12 | | - |
13 | | -libinjector.injector_attach.argtypes = POINTER(injector_pointer_t), pid_t |
14 | | -libinjector.injector_attach.restype = c_int32 |
15 | | -libinjector.injector_inject.argtypes = injector_pointer_t, c_char_p, POINTER(c_void_p) |
16 | | -libinjector.injector_inject.restype = c_int32 |
17 | | -libinjector.injector_detach.argtypes = injector_pointer_t, |
18 | | -libinjector.injector_detach.restype = c_int32 |
19 | | -libinjector.injector_error.argtypes = () |
20 | | -libinjector.injector_error.restype = c_char_p |
21 | | - |
22 | | - |
23 | | -class PyInjectorError(Exception): |
24 | | - pass |
25 | | - |
26 | | - |
27 | | -class LibraryNotFoundException(PyInjectorError): |
28 | | - def __init__(self, path: bytes): |
29 | | - self.path = path |
30 | | - |
31 | | - def __str__(self): |
32 | | - return f'Could not find library: {self.path}' |
33 | | - |
34 | | - |
35 | | -class InjectorError(PyInjectorError): |
36 | | - def __init__(self, func_name: str, ret_val: int, error_str: Optional[bytes]): |
37 | | - self.func_name = func_name |
38 | | - self.ret_val = ret_val |
39 | | - self.error_str = error_str.decode() |
40 | | - |
41 | | - def __str__(self): |
42 | | - explanation = 'see error code definition in injector/include/injector.h' \ |
43 | | - if self.error_str is None else self.error_str |
44 | | - return '{} returned {}: {}'.format(self.func_name, self.ret_val, explanation) |
45 | | - |
46 | | - |
47 | | -class InjectorPermissionError(InjectorError): |
48 | | - def __str__(self): |
49 | | - return (super().__str__() + |
50 | | - '\nFailed attaching to process due to permission error.\n' |
51 | | - 'This is most likely due to ptrace scope limitations applied to the kernel for security purposes.\n' |
52 | | - 'Possible solutions:\n' |
53 | | - ' - Rerun as root\n' |
54 | | - ' - Temporarily remove ptrace scope limitations using ' |
55 | | - '`echo 0 | sudo tee /proc/sys/kernel/yama/ptrace_scope`\n' |
56 | | - ' - Persistently remove ptrace scope limitations by editing /etc/sysctl.d/10-ptrace.conf\n' |
57 | | - 'More details can be found here: https://stackoverflow.com/q/19215177/2907819') |
58 | | - |
59 | | - |
60 | | -def call_c_func(func: Callable[..., int], *args: Any, |
61 | | - exception_map: Mapping[int, Type[InjectorError]] = None) -> None: |
62 | | - ret = func(*args) |
63 | | - if ret != 0: |
64 | | - exception_map = {} if exception_map is None else exception_map |
65 | | - exception_cls = exception_map.get(ret, InjectorError) |
66 | | - raise exception_cls(func.__name__, ret, libinjector.injector_error()) |
67 | | - |
68 | | - |
69 | | -class Injector: |
70 | | - def __init__(self, injector_p: injector_pointer_t): |
71 | | - self.injector_p = injector_p |
72 | | - |
73 | | - @classmethod |
74 | | - def attach(cls, pid: int) -> 'Injector': |
75 | | - assert isinstance(pid, int) |
76 | | - injector_p = injector_pointer_t() |
77 | | - call_c_func(libinjector.injector_attach, byref(injector_p), pid, |
78 | | - exception_map={-8: InjectorPermissionError}) |
79 | | - return cls(injector_p) |
80 | | - |
81 | | - def inject(self, library_path: AnyStr) -> int: |
82 | | - if isinstance(library_path, str): |
83 | | - library_path = library_path.encode() |
84 | | - assert isinstance(library_path, bytes) |
85 | | - assert os.path.isfile(library_path), f'Library not found at "{library_path.decode()}"' |
86 | | - handle = c_void_p() |
87 | | - call_c_func(libinjector.injector_inject, self.injector_p, library_path, pointer(handle)) |
88 | | - return handle.value |
89 | | - |
90 | | - def detach(self) -> None: |
91 | | - call_c_func(libinjector.injector_detach, self.injector_p) |
92 | | - |
93 | | - |
94 | | -def inject(pid: int, library_path: AnyStr) -> int: |
95 | | - """ |
96 | | - Inject the shared library at library_path to the process (or thread) with the given pid. |
97 | | - Return the handle to the loaded library. |
98 | | - """ |
99 | | - if not os.path.isfile(library_path): |
100 | | - raise LibraryNotFoundException(library_path) |
101 | | - injector = Injector.attach(pid) |
102 | | - try: |
103 | | - return injector.inject(library_path) |
104 | | - finally: |
105 | | - injector.detach() |
| 1 | +import os |
| 2 | +from importlib.util import find_spec |
| 3 | +from ctypes import CDLL, Structure, POINTER, c_int32, byref, c_char_p, c_void_p, pointer |
| 4 | +from typing import AnyStr, Callable, Any, Mapping, Type, Optional |
| 5 | + |
| 6 | +libinjector_path = find_spec('.libinjector', __package__).origin |
| 7 | +libinjector = CDLL(libinjector_path) |
| 8 | + |
| 9 | +injector_t = type('injector_t', (Structure,), {}) |
| 10 | +injector_pointer_t = POINTER(injector_t) |
| 11 | +pid_t = c_int32 |
| 12 | + |
| 13 | +libinjector.injector_attach.argtypes = POINTER(injector_pointer_t), pid_t |
| 14 | +libinjector.injector_attach.restype = c_int32 |
| 15 | +libinjector.injector_inject.argtypes = injector_pointer_t, c_char_p, POINTER(c_void_p) |
| 16 | +libinjector.injector_inject.restype = c_int32 |
| 17 | +libinjector.injector_detach.argtypes = injector_pointer_t, |
| 18 | +libinjector.injector_detach.restype = c_int32 |
| 19 | +libinjector.injector_error.argtypes = () |
| 20 | +libinjector.injector_error.restype = c_char_p |
| 21 | + |
| 22 | + |
| 23 | +class PyInjectorError(Exception): |
| 24 | + pass |
| 25 | + |
| 26 | + |
| 27 | +class LibraryNotFoundException(PyInjectorError): |
| 28 | + def __init__(self, path: bytes): |
| 29 | + self.path = path |
| 30 | + |
| 31 | + def __str__(self): |
| 32 | + return f'Could not find library: {self.path}' |
| 33 | + |
| 34 | + |
| 35 | +class InjectorError(PyInjectorError): |
| 36 | + def __init__(self, func_name: str, ret_val: int, error_str: Optional[bytes]): |
| 37 | + self.func_name = func_name |
| 38 | + self.ret_val = ret_val |
| 39 | + self.error_str = error_str.decode() |
| 40 | + |
| 41 | + def __str__(self): |
| 42 | + explanation = 'see error code definition in injector/include/injector.h' \ |
| 43 | + if self.error_str is None else self.error_str |
| 44 | + return '{} returned {}: {}'.format(self.func_name, self.ret_val, explanation) |
| 45 | + |
| 46 | + |
| 47 | +class InjectorPermissionError(InjectorError): |
| 48 | + def __str__(self): |
| 49 | + return (super().__str__() + |
| 50 | + '\nFailed attaching to process due to permission error.\n' |
| 51 | + 'This is most likely due to ptrace scope limitations applied to the kernel for security purposes.\n' |
| 52 | + 'Possible solutions:\n' |
| 53 | + ' - Rerun as root\n' |
| 54 | + ' - Temporarily remove ptrace scope limitations using ' |
| 55 | + '`echo 0 | sudo tee /proc/sys/kernel/yama/ptrace_scope`\n' |
| 56 | + ' - Persistently remove ptrace scope limitations by editing /etc/sysctl.d/10-ptrace.conf\n' |
| 57 | + 'More details can be found here: https://stackoverflow.com/q/19215177/2907819') |
| 58 | + |
| 59 | + |
| 60 | +def call_c_func(func: Callable[..., int], *args: Any, |
| 61 | + exception_map: Mapping[int, Type[InjectorError]] = None) -> None: |
| 62 | + ret = func(*args) |
| 63 | + if ret != 0: |
| 64 | + exception_map = {} if exception_map is None else exception_map |
| 65 | + exception_cls = exception_map.get(ret, InjectorError) |
| 66 | + raise exception_cls(func.__name__, ret, libinjector.injector_error()) |
| 67 | + |
| 68 | + |
| 69 | +class Injector: |
| 70 | + def __init__(self, injector_p: injector_pointer_t): |
| 71 | + self.injector_p = injector_p |
| 72 | + |
| 73 | + @classmethod |
| 74 | + def attach(cls, pid: int) -> 'Injector': |
| 75 | + assert isinstance(pid, int) |
| 76 | + injector_p = injector_pointer_t() |
| 77 | + call_c_func(libinjector.injector_attach, byref(injector_p), pid, |
| 78 | + exception_map={-8: InjectorPermissionError}) |
| 79 | + return cls(injector_p) |
| 80 | + |
| 81 | + def inject(self, library_path: AnyStr) -> int: |
| 82 | + if isinstance(library_path, str): |
| 83 | + library_path = library_path.encode() |
| 84 | + assert isinstance(library_path, bytes) |
| 85 | + assert os.path.isfile(library_path), f'Library not found at "{library_path.decode()}"' |
| 86 | + handle = c_void_p() |
| 87 | + call_c_func(libinjector.injector_inject, self.injector_p, library_path, pointer(handle)) |
| 88 | + return handle.value |
| 89 | + |
| 90 | + def detach(self) -> None: |
| 91 | + call_c_func(libinjector.injector_detach, self.injector_p) |
| 92 | + |
| 93 | + |
| 94 | +def inject(pid: int, library_path: AnyStr) -> int: |
| 95 | + """ |
| 96 | + Inject the shared library at library_path to the process (or thread) with the given pid. |
| 97 | + Return the handle to the loaded library. |
| 98 | + """ |
| 99 | + if not os.path.isfile(library_path): |
| 100 | + raise LibraryNotFoundException(library_path) |
| 101 | + injector = Injector.attach(pid) |
| 102 | + try: |
| 103 | + return injector.inject(library_path) |
| 104 | + finally: |
| 105 | + injector.detach() |
0 commit comments