Closed
Description
What are you trying to accomplish?
convert to pydra task/workflow and setup such pipeline
What have you tried?
still have trouble with nipype.interfaces.dcm2nii.Dcm2niix so any suggestion welcomed
python script
#!/home/shengwei/anaconda3/bin/python
from logging import INFO, basicConfig, error, log
# from grp import getgrnam
from nipype.interfaces.dcm2nii import Dcm2niix
from os import chown, mkdir, path, scandir, walk
# from pwd import getpwnam
from re import IGNORECASE, search
from shutil import rmtree
from tarfile import is_tarfile, open
from zipfile import BadZipFile, ZipFile, is_zipfile
_STAGING_DIR = 'staging'
_CONVERT_BASE_DIR = 'uc'
class MriWrapper(object):
"""This class obtains meta data (i.e. date, visit, projid, protocol) of scan from its path"""
def __init__(self, path):
self.path = path
scankey_search = search('\d{6}_\d{2}_\d{8}', path)
if scankey_search is None:
sys.exit('Could not find scan key from ' + path)
else:
scan_key = scankey_search.group(0).split('_')
self.date = scan_key[0]
self.visit = scan_key[1]
self.projid = scan_key[2]
if search('bannockburn', path):
self.protocol = ('bannockburn',)
elif search('mg', path):
self.protocol = ('mg',)
elif search('uc', path):
self.protocol = ('uc',)
else:
sys.exit('No such active or valid protocol exists')
def find_dicom(base_path):
"""find dicom files"""
for entry in scandir(base_path):
if not entry.name.startswith('.') and entry.is_file():
if entry.name.endswith('.zip', IGNORECASE) or entry.name.endswith('.tar.gz', IGNORECASE):
if not search('nii|nifti|nifti|par|P\d{5,6}.zip', entry.name, IGNORECASE):
try:
if is_zipfile(entry.path):
if 'dicom' in entry.name or 'DICOM'in entry.name:
return entry
else:
sys.exit('Zip file name does not contain "dicom"')
elif is_tarfile(entry.path):
if check_dicom_tar(open(entry.path, 'r|gz')):
return entry
except RuntimeError:
sys.exit('DICOM not found or invalid in ' + base_path)
# TODO: speed up by iterating over files manually and returning when dicom is found (vs using getnames())
def check_dicom_tar(tar_file):
for entry in tar_file.getnames():
if search('dicom', entry, IGNORECASE) is not None:
return True
return False
def get_dcm2niix_path():
"""find the path for dcm2niix input folder, assuming it is the ONLY deepest path"""
stage_entries = [entry for entry in walk(_STAGING_DIR)]
return stage_entries[-1][0]
def convert_dicom_dir(dicom_dir):
"""convert directory with dicom files"""
print('Checking ' + dicom_dir)
basicConfig(filename='convert.log', filemode='w', level=INFO)
dicom = find_dicom(dicom_dir)
if not dicom:
error('No DICOM in ' + dicom_dir)
else:
log(INFO, 'Converting ' + dicom.path)
wrapper = MriWrapper(dicom.path)
output_dir = path.join(path.dirname(dicom.path), wrapper.projid + '_' + wrapper.visit + '_nii')
if not path.exists(output_dir):
mkdir(output_dir)
convert_dicom_file(dicom_dir, dicom, output_dir)
print('Clearing staging folders')
rmtree(_STAGING_DIR)
mkdir(_STAGING_DIR)
# userId = getpwnam('mriadmin').pw_uid
# groupId = getgrnam('mri').gr_gid
# for root, dirs, files in walk(output_dir):
# for subdir in dirs:
# chown(path.join(root, subdir), userId, groupId)
# for file in files:
# chown(path.join(root, file), userId, groupId)
def convert_dicom_file(dicom_dir, dicom_file, output_dir, merge_files=False):
"""convert dicom compressed zip/tar file"""
# unzip dicom to staging directory
if dicom_file.name.endswith(".zip", IGNORECASE):
print("Unzipping " + path.join(dicom_dir, dicom_file.name))
try:
ZipFile(dicom_file.path).extractall(_STAGING_DIR)
except BadZipfile:
sys.exit('Bad zipfile! Skipping')
elif dicom_file.name.endswith(".tar.gz", IGNORECASE):
print("Untarring " + dicom_file.name)
open(dicom_file.path, "r:gz").extractall(_STAGING_DIR)
else:
sys.exit('DICOM entry not recognized. Skipping ' + dicom_file.name)
try:
converter = Dcm2niix()
converter.inputs.source_dir = get_dcm2niix_path()
converter.inputs.output_dir = output_dir
converter.inputs.ignore_deriv = True
converter.inputs.merge_imgs = merge_files
converter.inputs.single_file = True
converter.inputs.out_filename = "%d"
converter.inputs.bids_format = True
converter.inputs.anon_bids = True
print(converter.cmdline)
# system('dcm2niix -i y -f %d -v n -z y -o ' + dst_dir + ' ' + src_dir)
except RuntimeError:
rmtree(output_dir)
sys.exit('Error converting, removed ' + output_dir)
if __name__ == '__main__':
scans = [folder[0] for folder in walk(_CONVERT_BASE_DIR)][1:]
for scan in scans:
if 'nii' not in scan:
convert_dicom_dir(scan)