-
Notifications
You must be signed in to change notification settings - Fork 0
/
wiper.py
291 lines (275 loc) · 8.81 KB
/
wiper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__app_name__ = 'WipeR'
__author__ = 'Markus Thilo'
__version__ = '0.5.3_2024-06-21'
__license__ = 'GPL-3'
__email__ = '[email protected]'
__status__ = 'Testing'
__description__ = '''
This is a wipe tool designed for SSDs and HDDs. There is also the possibility to overwrite files but without erasing file system metadata.
By default only unwiped blocks (or SSD pages) are overwritten though it is possible to force the overwriting of every block or even use a two pass wipe (1st pass writes random values). Instead of zeros you can choose to overwrite with a given byte value.
Whe the target is a physical drive, you can create a partition where (after a successful wipe) the log is copied into. A custom head for this log can be defined in a text file (wipe-head.txt by default).
Be aware that this module is extremely dangerous as it is designed to erase data!
'''
from sys import executable as __executable__
from subprocess import Popen, PIPE
from pathlib import Path
from argparse import ArgumentParser
from lib.timestamp import TimeStamp
from lib.extpath import ExtPath
from lib.logger import Logger
from lib.linutils import LinUtils
if Path(__file__).suffix.lower() == '.pyc':
__parent_path__ = Path(__executable__).parent
else:
__parent_path__ = Path(__file__).parent
class WipeR:
'''Frontend and Python wrapper for zd'''
MIN_BLOCKSIZE = 512
STD_BLOCKSIZE = 4096
MAX_BLOCKSIZE = 32768
def __init__(self):
'''Look for zd'''
self.zd_path = LinUtils.find_bin('zd', __parent_path__)
if self.zd_path:
self.available = True
else:
self.available = False
def wipe(self, targets,
verify = False,
allbytes = False,
extra = False,
value = False,
blocksize = None,
maxbadblocks = None,
maxretries = None,
log = None,
outdir = None,
echo = print
):
self.echo = echo
if len(targets) == 0:
raise FileNotFoundError('Missing drive or file(s) to wipe')
if verify and allbytes and extra:
raise RuntimeError(f'Too many arguments - you can perform normal wipe, all bytes, extra/2-pass or just verify')
if blocksize and (
blocksize % self.MIN_BLOCKSIZE != 0 or blocksize < self.MIN_BLOCKSIZE or blocksize > self.MAX_BLOCKSIZE
):
raise ValueError(f'Block size has to be n * {MIN_BLOCKSIZE}, >={MIN_BLOCKSIZE} and <={MAX_BLOCKSIZE}')
if value:
try:
int(value, 16)
except ValueError:
raise ValueError('Byte to overwrite with (-f/--value) has to be a hex value')
if int(value, 16) < 0 or int(value, 16) > 0xff:
raise ValueError('Byte to overwrite (-f/--value) has to be inbetween 00 and ff')
self.outdir = ExtPath.mkdir(outdir)
if log:
self.log = log
else:
self.log = Logger(
filename = f'{TimeStamp.now(path_comp=True, no_ms=True)}_wipe',
outdir = self.outdir,
head = 'wiper.WipeR',
echo = self.echo
)
if ExtPath.path(targets[0]).is_block_device():
if len(targets) != 1:
raise RuntimeError('Only one physical drive at a time')
if not verify:
for partition in LinUtils.lspart(targets[0]):
stdout, stderr = LinUtils.umount(partition)
if stderr:
self.log.warning(stderr, echo=True)
cmd = [f'{self.zd_path}']
if blocksize:
cmd.extend(['-b', f'{blocksize}'])
if value:
cmd.extend(['-f', f'{value}'])
if maxbadblocks:
cmd.extend(['-m', f'{maxbadblocks}'])
if maxretries:
cmd.extend(['-r', f'{maxretries}'])
if verify:
cmd.append('-v')
elif allbytes:
cmd.append('-a')
elif extra:
cmd.append('-x')
if self.echo == print:
echo = lambda msg: print(f'\r{msg}', end='')
else:
echo = lambda msg: self.echo(f'\n{msg}', overwrite=True)
for target in targets:
self.echo()
proc = Popen(cmd + [target], stdout=PIPE, stderr=PIPE, text=True)
for line in proc.stdout:
msg = line.strip()
if msg.startswith('...'):
echo(msg)
elif msg == '':
self.echo('')
else:
self.log.info(msg, echo=True)
if stderr := proc.stderr.read():
self.log.error(f'zd terminated with: {stderr}')
def mkfs(self, target,
fs = 'ntfs',
loghead = None,
mbr = False,
name = None,
mountpoint = None
):
'''Generate partition and file system'''
if loghead:
loghead = ExtPath.path(loghead)
else:
loghead = __parent_path__/'wipe-log-head.txt'
if not name:
name = 'Volume'
stdout, stderr = LinUtils.init_dev(target, mbr=mbr, fs=fs)
if stderr:
self.log.warning(stderr, echo=True)
for retry in range(10):
partitions = LinUtils.lspart(target).keys()
if partitions:
partition = list(partitions)[0]
break
if retry == 9:
self.log.warning('Could not create new partition', echo=True)
self.log.close()
return
stdout, stderr = LinUtils.mkfs(partition, fs=fs, label=name)
if stderr:
self.log.warning(stderr, echo=True)
if mountpoint:
mnt = ExtPath.mkdir(mountpoint)
else:
mnt = ExtPath.mkdir(self.outdir/'mnt')
stdout, stderr = LinUtils.mount(partition, mnt)
if stderr:
self.log.warning(stderr, echo=True)
self.log.close()
return
self.log.info('Disk preparation successful', echo=True)
self.log.close()
log_path = mnt/'wipe-log.txt'
try:
head = loghead.read_text()
except FileNotFoundError:
head = ''
with log_path.open('w') as fh:
fh.write(head + self.log.path.read_text())
if mountpoint:
return
stdout, stderr = LinUtils.umount(mnt)
if stderr:
raise RuntimeError(f'{stdout}\n{stderr}')
mnt.rmdir()
class WipeRCli(ArgumentParser):
'''CLI, also used for GUI of FallbackImager'''
def __init__(self, **kwargs):
'''Define CLI using argparser'''
super().__init__(description=__description__, **kwargs)
self.add_argument('-a', '--allbytes', action='store_true',
help='Write every byte/block (do not check before overwriting block)'
)
self.add_argument('-b', '--blocksize', type=int,
help='Block size in bytes (=n*512, >=512, <= 32768,default is 4096)', metavar='INTEGER'
)
self.add_argument('-c', '--create', type=str,
choices=['ntfs', 'fat32', 'exfat', 'NTFS', 'FAT32', 'EXFAT', 'ExFAT', 'exFAT'],
help='Create partition [fat32/exfat/ntfs] after wiping a physical drive',
metavar='STRING'
)
self.add_argument('-f', '--value', type=str,
help='Byte to overwrite with as hex (00 - ff)',
metavar='HEX_BYTE'
)
self.add_argument('-g', '--loghead', type=ExtPath.path,
help='Use the given file as head when writing log to new drive',
metavar='FILE'
)
self.add_argument('-m', '--mbr', action='store_true',
help='Use mbr instead of gpt Partition table (when target is a physical drive)'
)
self.add_argument('-n', '--name', type=str,
help='Name/label of the new partition (when target is a physical drive)',
metavar='STRING'
)
self.add_argument('-o', '--outdir', type=ExtPath.path,
help='Directory to write log', metavar='DIRECTORY'
)
self.add_argument('-p', '--mount', type=ExtPath.path,
help='Mountpoint to the new partition (when target is a physical drive)',
metavar='DIRECTORY'
)
self.add_argument('-q', '--maxbadblocks', type=int,
help='Abort after given number of bad blocks (default is 200)', metavar='INTEGER'
)
self.add_argument('-r', '--maxretries', type=int,
help='Maximum of retries after read or write error (default is 200)',
metavar='INTEGER'
)
self.add_argument('-v', '--verify', action='store_true',
help='Verify, but do not wipe'
)
self.add_argument('-x', '--extra', action='store_true',
help='Overwrite all bytes/blocks twice, write random bytes at 1st pass'
)
self.add_argument('targets', nargs='*', type=ExtPath.path,
help='Target blockdevice or file(s) (/dev/sdc)', metavar='BLOCKDEVICE/FILE'
)
def parse(self, *cmd):
'''Parse arguments'''
args = super().parse_args(*cmd)
self.targets = args.targets
self.allbytes = args.allbytes
self.blocksize = args.blocksize
self.create = args.create
self.loghead = args.loghead
self.maxbadblocks = args.maxbadblocks
self.maxretries = args.maxretries
self.mbr = args.mbr
self.mountpoint = args.mount
self.name = args.name
self.outdir = args.outdir
self.value = args.value
self.verify = args.verify
self.extra = args.extra
def run(self, echo=print):
'''Run zd'''
if self.verify and (
self.create or
self.extra or
self.mbr or
self.mountpoint or
self.name
):
raise RuntimeError(f'Arguments incompatible with --verify/-v')
wiper = WipeR()
wiper.wipe(self.targets,
allbytes = self.allbytes,
blocksize = self.blocksize,
maxbadblocks = self.maxbadblocks,
maxretries = self.maxretries,
outdir = self.outdir,
value = self.value,
verify = self.verify,
extra = self.extra,
echo = echo
)
if self.create:
wiper.mkfs(self.targets[0],
fs = self.create,
loghead = self.loghead,
mbr = self.mbr,
name = self.name,
mountpoint = self.mountpoint
)
wiper.log.close()
if __name__ == '__main__': # start here if called as application
app = WipeRCli()
app.parse()
app.run()