diff --git a/.gitignore b/.gitignore index d6996550dc..df018f0ead 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ .project .pydevproject *.py.orig +.DS_Store # Not sure what the next one is for *.kpf diff --git a/bin/nib-dicomfs b/bin/nib-dicomfs index 115fd4e486..05b6a50afc 100755 --- a/bin/nib-dicomfs +++ b/bin/nib-dicomfs @@ -9,213 +9,7 @@ ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## # Copyright (C) 2011 Christian Haselgrove -import sys -import os -import stat -import errno -import time -import locale -import logging -import fuse -import nibabel as nib -import nibabel.dft as dft - -from optparse import OptionParser, Option - -uid = os.getuid() -gid = os.getgid() -encoding = locale.getdefaultlocale()[1] - -fuse.fuse_python_api = (0, 2) - -logger = logging.getLogger('nibabel.dft') - -class FileHandle: - - def __init__(self, fno): - self.fno = fno - self.keep_cache = False - self.direct_io = False - return - - def __str__(self): - return 'FileHandle(%d)' % self.fno - -class DICOMFS(fuse.Fuse): - - def __init__(self, *args, **kwargs): - self.followlinks = kwargs.pop('followlinks', False) - fuse.Fuse.__init__(self, *args, **kwargs) - self.fhs = {} - return - - def get_paths(self): - paths = {} - for study in dft.get_studies(self.dicom_path, self.followlinks): - pd = paths.setdefault(study.patient_name_or_uid(), {}) - patient_info = 'patient information\n' - patient_info = 'name: %s\n' % study.patient_name - patient_info += 'ID: %s\n' % study.patient_id - patient_info += 'birth date: %s\n' % study.patient_birth_date - patient_info += 'sex: %s\n' % study.patient_sex - pd['INFO'] = patient_info.encode('ascii', 'replace') - study_datetime = '%s_%s' % (study.date, study.time) - study_info = 'study info\n' - study_info += 'UID: %s\n' % study.uid - study_info += 'date: %s\n' % study.date - study_info += 'time: %s\n' % study.time - study_info += 'comments: %s\n' % study.comments - d = {'INFO': study_info.encode('ascii', 'replace')} - for series in study.series: - series_info = 'series info\n' - series_info += 'UID: %s\n' % series.uid - series_info += 'number: %s\n' % series.number - series_info += 'description: %s\n' % series.description - series_info += 'rows: %d\n' % series.rows - series_info += 'columns: %d\n' % series.columns - series_info += 'bits allocated: %d\n' % series.bits_allocated - series_info += 'bits stored: %d\n' % series.bits_stored - series_info += 'storage instances: %d\n' % len(series.storage_instances) - d[series.number] = {'INFO': series_info.encode('ascii', 'replace'), - '%s.nii' % series.number: (series.nifti_size, series.as_nifti), - '%s.png' % series.number: (series.png_size, series.as_png)} - pd[study_datetime] = d - return paths - - def match_path(self, path): - wd = self.get_paths() - if path == '/': - logger.debug('return root') - return wd - for part in path.lstrip('/').split('/'): - logger.debug("path:%s part:%s" % (path, part)) - if part not in wd: - return None - wd = wd[part] - logger.debug('return') - return wd - - def readdir(self, path, fh): - logger.info('readdir %s' % (path,)) - matched_path = self.match_path(path) - if matched_path is None: - return -errno.ENOENT - logger.debug('matched %s' % (matched_path,)) - fnames = [ k.encode('ascii', 'replace') for k in matched_path.keys() ] - fnames.append('.') - fnames.append('..') - return [ fuse.Direntry(f) for f in fnames ] - - def getattr(self, path): - logger.debug('getattr %s' % path) - matched_path = self.match_path(path) - logger.debug('matched: %s' % (matched_path,)) - now = time.time() - st = fuse.Stat() - if isinstance(matched_path, dict): - st.st_mode = stat.S_IFDIR | 0755 - st.st_ctime = now - st.st_mtime = now - st.st_atime = now - st.st_uid = uid - st.st_gid = gid - st.st_nlink = len(matched_path) - return st - if isinstance(matched_path, str): - st.st_mode = stat.S_IFREG | 0644 - st.st_ctime = now - st.st_mtime = now - st.st_atime = now - st.st_uid = uid - st.st_gid = gid - st.st_size = len(matched_path) - st.st_nlink = 1 - return st - if isinstance(matched_path, tuple): - st.st_mode = stat.S_IFREG | 0644 - st.st_ctime = now - st.st_mtime = now - st.st_atime = now - st.st_uid = uid - st.st_gid = gid - st.st_size = matched_path[0]() - st.st_nlink = 1 - return st - return -errno.ENOENT - - def open(self, path, flags): - logger.debug('open %s' % (path,)) - matched_path = self.match_path(path) - if matched_path is None: - return -errno.ENOENT - for i in range(1, 10): - if i not in self.fhs: - if isinstance(matched_path, str): - self.fhs[i] = matched_path - elif isinstance(matched_path, tuple): - self.fhs[i] = matched_path[1]() - else: - raise -errno.EFTYPE - return FileHandle(i) - raise -errno.ENFILE - - # not done - def read(self, path, size, offset, fh): - logger.debug('read') - logger.debug(path) - logger.debug(size) - logger.debug(offset) - logger.debug(fh) - return self.fhs[fh.fno][offset:offset+size] - - def release(self, path, flags, fh): - logger.debug('release') - logger.debug(path) - logger.debug(fh) - del self.fhs[fh.fno] - return - -def get_opt_parser(): - # use module docstring for help output - p = OptionParser( - usage="%s [OPTIONS] " - % os.path.basename(sys.argv[0]), - version="%prog " + nib.__version__) - - p.add_options([ - Option("-v", "--verbose", action="count", - dest="verbose", default=0, - help="make noise. Could be specified multiple times"), - ]) - - p.add_options([ - Option("-L", "--follow-links", action="store_true", - dest="followlinks", default=False, - help="Follow symbolic links in DICOM directory"), - ]) - return p +from nibabel.cmdline.dicomfs import main if __name__ == '__main__': - parser = get_opt_parser() - (opts, files) = parser.parse_args() - - if opts.verbose: - logger.addHandler(logging.StreamHandler(sys.stdout)) - logger.setLevel(opts.verbose > 1 and logging.DEBUG or logging.INFO) - - if len(files) != 2: - sys.stderr.write("Please provide two arguments:\n%s\n" % parser.usage) - sys.exit(1) - - fs = DICOMFS(dash_s_do='setsingle', followlinks=opts.followlinks) - fs.parse(['-f', '-s', files[1]]) - fs.dicom_path = files[0].decode(encoding) - try: - fs.main() - except fuse.FuseError: - # fuse prints the error message - sys.exit(1) - - sys.exit(0) - -# eof + main() \ No newline at end of file diff --git a/bin/nib-nifti-dx b/bin/nib-nifti-dx index 40122acd16..d317585286 100755 --- a/bin/nib-nifti-dx +++ b/bin/nib-nifti-dx @@ -8,32 +8,8 @@ # ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## ''' Print nifti diagnostics for header files ''' -from __future__ import division, print_function, absolute_import - -import sys - -from optparse import OptionParser - -import nibabel as nib - - -def main(): - """ Go go team """ - parser = OptionParser( - usage="%s [FILE ...]\n\n" % sys.argv[0] + __doc__, - version="%prog " + nib.__version__) - (opts, files) = parser.parse_args() - - for fname in files: - with nib.openers.ImageOpener(fname) as fobj: - hdr = fobj.read(nib.nifti1.header_dtype.itemsize) - result = nib.Nifti1Header.diagnose_binaryblock(hdr) - if len(result): - print('Picky header check output for "%s"\n' % fname) - print(result + '\n') - else: - print('Header for "%s" is clean' % fname) +from nibabel.cmdline.nifti_dx import main if __name__ == '__main__': main() diff --git a/bin/parrec2nii b/bin/parrec2nii index 4856af9986..27a1abca05 100755 --- a/bin/parrec2nii +++ b/bin/parrec2nii @@ -2,7 +2,7 @@ """PAR/REC to NIfTI converter """ -from nibabel.parrec2nii_cmd import main +from nibabel.cmdline.parrec2nii import main if __name__ == '__main__': diff --git a/nibabel/cmdline/dicomfs.py b/nibabel/cmdline/dicomfs.py new file mode 100644 index 0000000000..c54c07f966 --- /dev/null +++ b/nibabel/cmdline/dicomfs.py @@ -0,0 +1,241 @@ +#!python +# emacs: -*- mode: python-mode; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## +# +# See COPYING file distributed along with the NiBabel package for the +# copyright and license terms. +# +### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## +# Copyright (C) 2011 Christian Haselgrove + +import sys +import os +import stat +import errno +import time +import locale +import logging + + +class dummy_fuse(object): + """Dummy fuse "module" so that nose does not blow during doctests""" + Fuse = object + + +try: + import fuse + uid = os.getuid() + gid = os.getgid() +except ImportError: + fuse = dummy_fuse + +import nibabel as nib +import nibabel.dft as dft + +from optparse import OptionParser, Option + +encoding = locale.getdefaultlocale()[1] + +fuse.fuse_python_api = (0, 2) + +logger = logging.getLogger('nibabel.dft') + + +class FileHandle: + + def __init__(self, fno): + self.fno = fno + self.keep_cache = False + self.direct_io = False + return + + def __str__(self): + return 'FileHandle(%d)' % self.fno + + +class DICOMFS(fuse.Fuse): + + def __init__(self, *args, **kwargs): + if fuse is dummy_fuse: + raise RuntimeError( + "fuse module is not available, install it to use DICOMFS") + self.followlinks = kwargs.pop('followlinks', False) + self.dicom_path = kwargs.pop('dicom_path', None) + fuse.Fuse.__init__(self, *args, **kwargs) + self.fhs = {} + return + + def get_paths(self): + paths = {} + for study in dft.get_studies(self.dicom_path, self.followlinks): + pd = paths.setdefault(study.patient_name_or_uid(), {}) + patient_info = 'patient information\n' + patient_info = 'name: %s\n' % study.patient_name + patient_info += 'ID: %s\n' % study.patient_id + patient_info += 'birth date: %s\n' % study.patient_birth_date + patient_info += 'sex: %s\n' % study.patient_sex + pd['INFO'] = patient_info.encode('ascii', 'replace') + study_datetime = '%s_%s' % (study.date, study.time) + study_info = 'study info\n' + study_info += 'UID: %s\n' % study.uid + study_info += 'date: %s\n' % study.date + study_info += 'time: %s\n' % study.time + study_info += 'comments: %s\n' % study.comments + d = {'INFO': study_info.encode('ascii', 'replace')} + for series in study.series: + series_info = 'series info\n' + series_info += 'UID: %s\n' % series.uid + series_info += 'number: %s\n' % series.number + series_info += 'description: %s\n' % series.description + series_info += 'rows: %d\n' % series.rows + series_info += 'columns: %d\n' % series.columns + series_info += 'bits allocated: %d\n' % series.bits_allocated + series_info += 'bits stored: %d\n' % series.bits_stored + series_info += 'storage instances: %d\n' % len(series.storage_instances) + d[series.number] = {'INFO': series_info.encode('ascii', 'replace'), + '%s.nii' % series.number: (series.nifti_size, series.as_nifti), + '%s.png' % series.number: (series.png_size, series.as_png)} + pd[study_datetime] = d + return paths + + def match_path(self, path): + wd = self.get_paths() + if path == '/': + logger.debug('return root') + return wd + for part in path.lstrip('/').split('/'): + logger.debug("path:%s part:%s" % (path, part)) + if part not in wd: + return None + wd = wd[part] + logger.debug('return') + return wd + + def readdir(self, path, fh): + logger.info('readdir %s' % (path,)) + matched_path = self.match_path(path) + if matched_path is None: + return -errno.ENOENT + logger.debug('matched %s' % (matched_path,)) + fnames = [k.encode('ascii', 'replace') for k in matched_path.keys()] + fnames.append('.') + fnames.append('..') + return [fuse.Direntry(f) for f in fnames] + + def getattr(self, path): + logger.debug('getattr %s' % path) + matched_path = self.match_path(path) + logger.debug('matched: %s' % (matched_path,)) + now = time.time() + st = fuse.Stat() + if isinstance(matched_path, dict): + st.st_mode = stat.S_IFDIR | 0o755 + st.st_ctime = now + st.st_mtime = now + st.st_atime = now + st.st_uid = uid + st.st_gid = gid + st.st_nlink = len(matched_path) + return st + if isinstance(matched_path, str): + st.st_mode = stat.S_IFREG | 0o644 + st.st_ctime = now + st.st_mtime = now + st.st_atime = now + st.st_uid = uid + st.st_gid = gid + st.st_size = len(matched_path) + st.st_nlink = 1 + return st + if isinstance(matched_path, tuple): + st.st_mode = stat.S_IFREG | 0o644 + st.st_ctime = now + st.st_mtime = now + st.st_atime = now + st.st_uid = uid + st.st_gid = gid + st.st_size = matched_path[0]() + st.st_nlink = 1 + return st + return -errno.ENOENT + + def open(self, path, flags): + logger.debug('open %s' % (path,)) + matched_path = self.match_path(path) + if matched_path is None: + return -errno.ENOENT + for i in range(1, 10): + if i not in self.fhs: + if isinstance(matched_path, str): + self.fhs[i] = matched_path + elif isinstance(matched_path, tuple): + self.fhs[i] = matched_path[1]() + else: + raise -errno.EFTYPE + return FileHandle(i) + raise -errno.ENFILE + + # not done + def read(self, path, size, offset, fh): + logger.debug('read') + logger.debug(path) + logger.debug(size) + logger.debug(offset) + logger.debug(fh) + return self.fhs[fh.fno][offset:offset + size] + + def release(self, path, flags, fh): + logger.debug('release') + logger.debug(path) + logger.debug(fh) + del self.fhs[fh.fno] + return + + +def get_opt_parser(): + # use module docstring for help output + p = OptionParser( + usage="%s [OPTIONS] " + % os.path.basename(sys.argv[0]), + version="%prog " + nib.__version__) + + p.add_options([ + Option("-v", "--verbose", action="count", + dest="verbose", default=0, + help="make noise. Could be specified multiple times"), + ]) + + p.add_options([ + Option("-L", "--follow-links", action="store_true", + dest="followlinks", default=False, + help="Follow symbolic links in DICOM directory"), + ]) + return p + + +def main(args=None): + parser = get_opt_parser() + (opts, files) = parser.parse_args(args=args) + + if opts.verbose: + logger.addHandler(logging.StreamHandler(sys.stdout)) + logger.setLevel(opts.verbose > 1 and logging.DEBUG or logging.INFO) + + if len(files) != 2: + sys.stderr.write("Please provide two arguments:\n%s\n" % parser.usage) + sys.exit(1) + + fs = DICOMFS( + dash_s_do='setsingle', + followlinks=opts.followlinks, + dicom_path=files[0].decode(encoding) + ) + fs.parse(['-f', '-s', files[1]]) + try: + fs.main() + except fuse.FuseError: + # fuse prints the error message + sys.exit(1) + + sys.exit(0) diff --git a/nibabel/cmdline/ls.py b/nibabel/cmdline/ls.py index 98f75e21dc..f919700247 100755 --- a/nibabel/cmdline/ls.py +++ b/nibabel/cmdline/ls.py @@ -21,8 +21,7 @@ import nibabel.cmdline.utils from nibabel.cmdline.utils import _err, verbose, table2string, ap, safe_get -__author__ = 'Yaroslav Halchenko' -__copyright__ = 'Copyright (c) 2011-2016 Yaroslav Halchenko ' \ +__copyright__ = 'Copyright (c) 2011-18 Yaroslav Halchenko ' \ 'and NiBabel contributors' __license__ = 'MIT' @@ -153,11 +152,11 @@ def proc_file(f, opts): return row -def main(): +def main(args=None): """Show must go on""" parser = get_opt_parser() - (opts, files) = parser.parse_args() + (opts, files) = parser.parse_args(args=args) nibabel.cmdline.utils.verbose_level = opts.verbose diff --git a/nibabel/cmdline/nifti_dx.py b/nibabel/cmdline/nifti_dx.py new file mode 100644 index 0000000000..e478b5a5c2 --- /dev/null +++ b/nibabel/cmdline/nifti_dx.py @@ -0,0 +1,38 @@ +#!python +# emacs: -*- mode: python-mode; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## +# +# See COPYING file distributed along with the NiBabel package for the +# copyright and license terms. +# +### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## +''' Print nifti diagnostics for header files ''' + +import sys +from optparse import OptionParser + +import nibabel as nib + +__author__ = 'Matthew Brett' +__copyright__ = 'Copyright (c) 2011-18 Matthew Brett ' \ + 'and NiBabel contributors' +__license__ = 'MIT' + + +def main(args=None): + """ Go go team """ + parser = OptionParser( + usage="%s [FILE ...]\n\n" % sys.argv[0] + __doc__, + version="%prog " + nib.__version__) + (opts, files) = parser.parse_args(args=args) + + for fname in files: + with nib.openers.ImageOpener(fname) as fobj: + hdr = fobj.read(nib.nifti1.header_dtype.itemsize) + result = nib.Nifti1Header.diagnose_binaryblock(hdr) + if len(result): + print('Picky header check output for "%s"\n' % fname) + print(result + '\n') + else: + print('Header for "%s" is clean' % fname) diff --git a/nibabel/parrec2nii_cmd.py b/nibabel/cmdline/parrec2nii.py similarity index 100% rename from nibabel/parrec2nii_cmd.py rename to nibabel/cmdline/parrec2nii.py diff --git a/nibabel/cmdline/tests/__init__.py b/nibabel/cmdline/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/nibabel/tests/test_parrec2nii.py b/nibabel/cmdline/tests/test_parrec2nii.py similarity index 91% rename from nibabel/tests/test_parrec2nii.py rename to nibabel/cmdline/tests/test_parrec2nii.py index aa018b24d0..c5b5831270 100644 --- a/nibabel/tests/test_parrec2nii.py +++ b/nibabel/cmdline/tests/test_parrec2nii.py @@ -6,7 +6,7 @@ from numpy import array as npa import nibabel -from nibabel import parrec2nii_cmd as parrec2nii +from nibabel.cmdline import parrec2nii from mock import Mock, MagicMock, patch from nose.tools import assert_true @@ -29,10 +29,10 @@ [ 0. , 0. , 0. , 1. ]]) -@patch('nibabel.parrec2nii_cmd.verbose') -@patch('nibabel.parrec2nii_cmd.io_orientation') -@patch('nibabel.parrec2nii_cmd.nifti1') -@patch('nibabel.parrec2nii_cmd.pr') +@patch('nibabel.cmdline.parrec2nii.verbose') +@patch('nibabel.cmdline.parrec2nii.io_orientation') +@patch('nibabel.cmdline.parrec2nii.nifti1') +@patch('nibabel.cmdline.parrec2nii.pr') def test_parrec2nii_sets_qform_sform_code1(*args): # Check that set_sform(), set_qform() are called on the new header. parrec2nii.verbose.switch = False @@ -67,7 +67,7 @@ def test_parrec2nii_sets_qform_sform_code1(*args): nhdr.set_sform.assert_called_with(AN_OLD_AFFINE, code=1) -@patch('nibabel.parrec2nii_cmd.verbose') +@patch('nibabel.cmdline.parrec2nii.verbose') def test_parrec2nii_save_load_qform_code(*args): # Tests that after parrec2nii saves file, it has the sform and qform 'code' # set to '1', which means 'scanner', so that other software, e.g. FSL picks diff --git a/nibabel/tests/test_scripts.py b/nibabel/tests/test_scripts.py index 941e2271b0..cb24f893d2 100644 --- a/nibabel/tests/test_scripts.py +++ b/nibabel/tests/test_scripts.py @@ -128,6 +128,25 @@ def test_nib_ls_multiple(): ) +@script_test +def test_help(): + for cmd in ['parrec2nii', 'nib-dicomfs', 'nib-ls', 'nib-nifti-dx']: + if cmd == 'nib-dicomfs': + # needs special treatment since depends on fuse module which + # might not be available. + try: + import fuse + except Exception: + continue # do not test this one + code, stdout, stderr = run_command([cmd, '--help']) + assert_equal(code, 0) + assert_re_in(".*%s" % cmd, stdout) + assert_re_in(".*Usage", stdout) + # Some third party modules might like to announce some Deprecation + # etc warnings, see e.g. https://travis-ci.org/nipy/nibabel/jobs/370353602 + if 'warning' not in stderr.lower(): + assert_equal(stderr, '') + @script_test def test_nib_nifti_dx():