From 98861a03b78dc33c0fcfc08598d77e5f61edd8de Mon Sep 17 00:00:00 2001 From: Chris Gorgolewski Date: Mon, 11 Nov 2019 15:32:30 -0500 Subject: [PATCH 1/7] converted test_volumeutils to pytest --- .azure-pipelines/windows.yml | 2 +- .travis.yml | 2 +- nibabel/tests/test_volumeutils.py | 535 +++++++++++++++--------------- 3 files changed, 268 insertions(+), 271 deletions(-) diff --git a/.azure-pipelines/windows.yml b/.azure-pipelines/windows.yml index 826c7ddc41..2d63db68e0 100644 --- a/.azure-pipelines/windows.yml +++ b/.azure-pipelines/windows.yml @@ -41,7 +41,7 @@ jobs: cd for_testing cp ../.coveragerc . nosetests --with-doctest --with-coverage --cover-package nibabel nibabel - pytest -v ../nibabel/tests/test_affines.py + pytest -v ../nibabel/tests/test_affines.py ../nibabel/tests/test_volumeutils.py displayName: 'Nose tests' - script: | cd for_testing diff --git a/.travis.yml b/.travis.yml index 39f1a14a45..ea4eb22291 100644 --- a/.travis.yml +++ b/.travis.yml @@ -132,7 +132,7 @@ script: cd for_testing cp ../.coveragerc . nosetests --with-doctest --with-coverage --cover-package nibabel nibabel - pytest -v ../nibabel/tests/test_affines.py + pytest -v ../nibabel/tests/test_affines.py ../nibabel/tests/test_volumeutils.py else false fi diff --git a/nibabel/tests/test_volumeutils.py b/nibabel/tests/test_volumeutils.py index 6eeb6c6e55..12f93ef70d 100644 --- a/nibabel/tests/test_volumeutils.py +++ b/nibabel/tests/test_volumeutils.py @@ -20,6 +20,7 @@ import bz2 import threading import time +import pytest import numpy as np @@ -54,10 +55,8 @@ from numpy.testing import (assert_array_almost_equal, assert_array_equal) -from nose.tools import assert_true, assert_false, assert_equal, assert_raises - -from ..testing import (assert_dt_equal, assert_allclose_safely, - suppress_warnings, clear_and_catch_warnings) +from ..testing_pytest import (assert_dt_equal, assert_allclose_safely, + suppress_warnings, clear_and_catch_warnings) #: convenience variables for numpy types FLOAT_TYPES = np.sctypes['float'] @@ -77,7 +76,7 @@ def test__is_compressed_fobj(): fname = 'test.bin' + ext for mode in ('wb', 'rb'): fobj = opener(fname, mode) - assert_equal(_is_compressed_fobj(fobj), compressed) + assert _is_compressed_fobj(fobj) == compressed fobj.close() @@ -108,20 +107,20 @@ def make_array(n, bytes): contents1 = bytearray(4 * n) fobj_r.readinto(contents1) # Second element is 1 - assert_false(contents1[0:8] == b'\x00' * 8) + assert contents1[0:8] != b'\x00' * 8 out_arr = make_array(n, contents1) assert_array_equal(in_arr, out_arr) # Set second element to 0 out_arr[1] = 0 # Show this changed the bytes string - assert_equal(contents1[:8], b'\x00' * 8) + assert contents1[:8] == b'\x00' * 8 # Reread, to get unmodified contents fobj_r.seek(0) contents2 = bytearray(4 * n) fobj_r.readinto(contents2) out_arr2 = make_array(n, contents2) assert_array_equal(in_arr, out_arr2) - assert_equal(out_arr[1], 0) + assert out_arr[1] == 0 finally: fobj_r.close() os.unlink(fname) @@ -133,30 +132,30 @@ def test_array_from_file(): in_arr = np.arange(24, dtype=dtype).reshape(shape) # Check on string buffers offset = 0 - assert_true(buf_chk(in_arr, BytesIO(), None, offset)) + assert buf_chk(in_arr, BytesIO(), None, offset) offset = 10 - assert_true(buf_chk(in_arr, BytesIO(), None, offset)) + assert buf_chk(in_arr, BytesIO(), None, offset) # check on real file fname = 'test.bin' with InTemporaryDirectory(): # fortran ordered out_buf = open(fname, 'wb') in_buf = open(fname, 'rb') - assert_true(buf_chk(in_arr, out_buf, in_buf, offset)) + assert 
buf_chk(in_arr, out_buf, in_buf, offset) # Drop offset to check that shape's not coming from file length out_buf.seek(0) in_buf.seek(0) offset = 5 - assert_true(buf_chk(in_arr, out_buf, in_buf, offset)) + assert buf_chk(in_arr, out_buf, in_buf, offset) del out_buf, in_buf # Make sure empty shape, and zero length, give empty arrays arr = array_from_file((), np.dtype('f8'), BytesIO()) - assert_equal(len(arr), 0) + assert len(arr) == 0 arr = array_from_file((0,), np.dtype('f8'), BytesIO()) - assert_equal(len(arr), 0) + assert len(arr) == 0 # Check error from small file - assert_raises(IOError, array_from_file, - shape, dtype, BytesIO()) + with pytest.raises(IOError): + array_from_file(shape, dtype, BytesIO()) # check on real file fd, fname = tempfile.mkstemp() with InTemporaryDirectory(): @@ -164,8 +163,8 @@ def test_array_from_file(): in_buf = open(fname, 'rb') # For windows this will raise a WindowsError from mmap, Unices # appear to raise an IOError - assert_raises(Exception, array_from_file, - shape, dtype, in_buf) + with pytest.raises(Exception): + array_from_file(shape, dtype, in_buf) del in_buf @@ -180,35 +179,35 @@ def test_array_from_file_mmap(): with open('test.bin', 'rb') as fobj: res = array_from_file(shape, dt, fobj) assert_array_equal(res, arr) - assert_true(isinstance(res, np.memmap)) - assert_equal(res.mode, 'c') + assert isinstance(res, np.memmap) + assert res.mode == 'c' with open('test.bin', 'rb') as fobj: res = array_from_file(shape, dt, fobj, mmap=True) assert_array_equal(res, arr) - assert_true(isinstance(res, np.memmap)) - assert_equal(res.mode, 'c') + assert isinstance(res, np.memmap) + assert res.mode == 'c' with open('test.bin', 'rb') as fobj: res = array_from_file(shape, dt, fobj, mmap='c') assert_array_equal(res, arr) - assert_true(isinstance(res, np.memmap)) - assert_equal(res.mode, 'c') + assert isinstance(res, np.memmap) + assert res.mode == 'c' with open('test.bin', 'rb') as fobj: res = array_from_file(shape, dt, fobj, mmap='r') assert_array_equal(res, arr) - assert_true(isinstance(res, np.memmap)) - assert_equal(res.mode, 'r') + assert isinstance(res, np.memmap) + assert res.mode == 'r' with open('test.bin', 'rb+') as fobj: res = array_from_file(shape, dt, fobj, mmap='r+') assert_array_equal(res, arr) - assert_true(isinstance(res, np.memmap)) - assert_equal(res.mode, 'r+') + assert isinstance(res, np.memmap) + assert res.mode == 'r+' with open('test.bin', 'rb') as fobj: res = array_from_file(shape, dt, fobj, mmap=False) assert_array_equal(res, arr) - assert_false(isinstance(res, np.memmap)) + assert not isinstance(res, np.memmap) with open('test.bin', 'rb') as fobj: - assert_raises(ValueError, - array_from_file, shape, dt, fobj, mmap='p') + with pytest.raises(ValueError): + array_from_file(shape, dt, fobj, mmap='p') def buf_chk(in_arr, out_buf, in_buf, offset): @@ -276,7 +275,7 @@ def test_array_from_file_reread(): out_arr = array_from_file(shape, dtt, fobj_r, offset, order) assert_array_equal(in_arr, out_arr) out_arr[..., 0] = -1 - assert_false(np.allclose(in_arr, out_arr)) + assert not np.allclose(in_arr, out_arr) out_arr2 = array_from_file(shape, dtt, fobj_r, offset, order) assert_array_equal(in_arr, out_arr2) finally: @@ -336,7 +335,7 @@ def test_a2f_upscale(): back = apply_read_scaling(raw, slope, inter) top = back - arr score = np.abs(top / arr) - assert_true(np.all(score < 10)) + assert np.all(score < 10) def test_a2f_min_max(): @@ -547,13 +546,12 @@ def test_a2f_scaled_unscaled(): nan_fill = np.round(nan_fill) # nan2zero will check whether 0 in scaled 
to a valid value in output if (in_dtype in CFLOAT_TYPES and not mn_out <= nan_fill <= mx_out): - assert_raises(ValueError, - array_to_file, - arr, - fobj, - out_dtype=out_dtype, - divslope=divslope, - intercept=intercept) + with pytest.raises(ValueError): + array_to_file(arr, + fobj, + out_dtype=out_dtype, + divslope=divslope, + intercept=intercept) continue with suppress_warnings(): back_arr = write_return(arr, fobj, @@ -625,13 +623,12 @@ def test_a2f_bad_scaling(): intercept=inter, divslope=slope)) else: - assert_raises(ValueError, - array_to_file, - arr, - fobj, - np.int8, - intercept=inter, - divslope=slope) + with pytest.raises(ValueError): + array_to_file(arr, + fobj, + np.int8, + intercept=inter, + divslope=slope) def test_a2f_nan2zero_range(): @@ -668,8 +665,10 @@ def test_a2f_nan2zero_range(): # Errors from datatype threshold after scaling back_arr = write_return(arr, fobj, np.int8, intercept=128) assert_array_equal([-128, -128, -127, -128], back_arr) - assert_raises(ValueError, write_return, arr, fobj, np.int8, intercept=129) - assert_raises(ValueError, write_return, arr_no_nan, fobj, np.int8, intercept=129) + with pytest.raises(ValueError): + write_return(arr, fobj, np.int8, intercept=129) + with pytest.raises(ValueError): + write_return(arr_no_nan, fobj, np.int8, intercept=129) # OK with nan2zero false, but we get whatever nan casts to nan_cast = np.array(np.nan).astype(np.int8) back_arr = write_return(arr, fobj, np.int8, intercept=129, nan2zero=False) @@ -677,10 +676,10 @@ def test_a2f_nan2zero_range(): # divslope back_arr = write_return(arr, fobj, np.int8, intercept=256, divslope=2) assert_array_equal([-128, -128, -128, -128], back_arr) - assert_raises(ValueError, write_return, arr, fobj, np.int8, - intercept=257.1, divslope=2) - assert_raises(ValueError, write_return, arr_no_nan, fobj, np.int8, - intercept=257.1, divslope=2) + with pytest.raises(ValueError): + write_return(arr, fobj, np.int8, intercept=257.1, divslope=2) + with pytest.raises(ValueError): + write_return(arr_no_nan, fobj, np.int8, intercept=257.1, divslope=2) # OK with nan2zero false back_arr = write_return(arr, fobj, np.int8, intercept=257.1, divslope=2, nan2zero=False) @@ -705,8 +704,10 @@ def test_a2f_non_numeric(): back_arr = write_return(arr, fobj, float) assert_array_equal(back_arr, arr.astype(float)) # mn, mx never work for structured types - assert_raises(ValueError, write_return, arr, fobj, float, mn=0) - assert_raises(ValueError, write_return, arr, fobj, float, mx=10) + with pytest.raises(ValueError): + write_return(arr, fobj, float, mn=0) + with pytest.raises(ValueError): + write_return(arr, fobj, float, mx=10) def write_return(data, fileobj, out_dtype, *args, **kwargs): @@ -720,42 +721,39 @@ def write_return(data, fileobj, out_dtype, *args, **kwargs): def test_apply_scaling(): # Null scaling, same array returned arr = np.zeros((3,), dtype=np.int16) - assert_true(apply_read_scaling(arr) is arr) - assert_true(apply_read_scaling(arr, np.float64(1.0)) is arr) - assert_true(apply_read_scaling(arr, inter=np.float64(0)) is arr) + assert apply_read_scaling(arr) is arr + assert apply_read_scaling(arr, np.float64(1.0)) is arr + assert apply_read_scaling(arr, inter=np.float64(0)) is arr f32, f64 = np.float32, np.float64 f32_arr = np.zeros((1,), dtype=f32) i16_arr = np.zeros((1,), dtype=np.int16) # Check float upcast (not the normal numpy scalar rule) # This is the normal rule - no upcast from scalar - assert_equal((f32_arr * f64(1)).dtype, np.float32) - assert_equal((f32_arr + f64(1)).dtype, np.float32) + 
assert (f32_arr * f64(1)).dtype == np.float32 + assert (f32_arr + f64(1)).dtype == np.float32 # The function does upcast though - ret = apply_read_scaling(np.float32(0), np.float64(2)) - assert_equal(ret.dtype, np.float64) - ret = apply_read_scaling(np.float32(0), inter=np.float64(2)) - assert_equal(ret.dtype, np.float64) + ret=apply_read_scaling(np.float32(0), np.float64(2)) + assert ret.dtype == np.float64 + ret=apply_read_scaling(np.float32(0), inter=np.float64(2)) + assert ret.dtype == np.float64 # Check integer inf upcast - big = f32(type_info(f32)['max']) + big=f32(type_info(f32)['max']) # Normally this would not upcast - assert_equal((i16_arr * big).dtype, np.float32) + assert (i16_arr * big).dtype == np.float32 # An equivalent case is a little hard to find for the intercept - nmant_32 = type_info(np.float32)['nmant'] - big_delta = np.float32(2**(floor_log2(big) - nmant_32)) - assert_equal((i16_arr * big_delta + big).dtype, np.float32) + nmant_32=type_info(np.float32)['nmant'] + big_delta=np.float32(2**(floor_log2(big) - nmant_32)) + assert (i16_arr * big_delta + big).dtype == np.float32 # Upcasting does occur with this routine - assert_equal(apply_read_scaling(i16_arr, big).dtype, np.float64) - assert_equal(apply_read_scaling(i16_arr, big_delta, big).dtype, np.float64) + assert apply_read_scaling(i16_arr, big).dtype == np.float64 + assert apply_read_scaling(i16_arr, big_delta, big).dtype == np.float64 # If float32 passed, no overflow, float32 returned - assert_equal(apply_read_scaling(np.int8(0), f32(-1.0), f32(0.0)).dtype, - np.float32) + assert apply_read_scaling(np.int8(0), f32(-1.0), f32(0.0)).dtype == np.float32 # float64 passed, float64 returned - assert_equal(apply_read_scaling(np.int8(0), -1.0, 0.0).dtype, np.float64) + assert apply_read_scaling(np.int8(0), -1.0, 0.0).dtype == np.float64 # float32 passed, overflow, float64 returned - assert_equal(apply_read_scaling(np.int8(0), f32(1e38), f32(0.0)).dtype, - np.float64) - assert_equal(apply_read_scaling(np.int8(0), f32(-1e38), f32(0.0)).dtype, - np.float64) + assert apply_read_scaling(np.int8(0), f32(1e38), f32(0.0)).dtype == np.float64 + assert apply_read_scaling(np.int8(0), f32(-1e38), f32(0.0)).dtype == np.float64 # Non-zero intercept still generates floats assert_dt_equal(apply_read_scaling(i16_arr, 1.0, 1.0).dtype, float) assert_dt_equal(apply_read_scaling( @@ -766,7 +764,7 @@ def test_apply_scaling(): def test_apply_read_scaling_ints(): # Test that apply_read_scaling copes with integer scaling inputs - arr = np.arange(10, dtype=np.int16) + arr=np.arange(10, dtype=np.int16) assert_array_equal(apply_read_scaling(arr, 1, 0), arr) assert_array_equal(apply_read_scaling(arr, 1, 1), arr + 1) assert_array_equal(apply_read_scaling(arr, 2, 1), arr * 2 + 1) @@ -774,7 +772,7 @@ def test_apply_read_scaling_ints(): def test_apply_read_scaling_nones(): # Check that we can pass None as slope and inter to apply read scaling - arr = np.arange(10, dtype=np.int16) + arr=np.arange(10, dtype=np.int16) assert_array_equal(apply_read_scaling(arr, None, None), arr) assert_array_equal(apply_read_scaling(arr, 2, None), arr * 2) assert_array_equal(apply_read_scaling(arr, None, 1), arr + 1) @@ -782,10 +780,10 @@ def test_apply_read_scaling_nones(): def test_int_scinter(): # Finding float type needed for applying scale, offset to ints - assert_equal(int_scinter_ftype(np.int8, 1.0, 0.0), np.float32) - assert_equal(int_scinter_ftype(np.int8, -1.0, 0.0), np.float32) - assert_equal(int_scinter_ftype(np.int8, 1e38, 0.0), np.float64) - 
assert_equal(int_scinter_ftype(np.int8, -1e38, 0.0), np.float64) + assert int_scinter_ftype(np.int8, 1.0, 0.0) == np.float32 + assert int_scinter_ftype(np.int8, -1.0, 0.0) == np.float32 + assert int_scinter_ftype(np.int8, 1e38, 0.0) == np.float64 + assert int_scinter_ftype(np.int8, -1e38, 0.0) == np.float64 def test_working_type(): @@ -794,32 +792,32 @@ def test_working_type(): # need this because of the very confusing np.int32 != np.intp (on 32 bit). def wt(*args, **kwargs): return np.dtype(working_type(*args, **kwargs)).str - d1 = np.atleast_1d + d1=np.atleast_1d for in_type in NUMERIC_TYPES: - in_ts = np.dtype(in_type).str - assert_equal(wt(in_type), in_ts) - assert_equal(wt(in_type, 1, 0), in_ts) - assert_equal(wt(in_type, 1.0, 0.0), in_ts) - in_val = d1(in_type(0)) + in_ts=np.dtype(in_type).str + assert wt(in_type) == in_ts + assert wt(in_type, 1, 0) == in_ts + assert wt(in_type, 1.0, 0.0) == in_ts + in_val=d1(in_type(0)) for slope_type in NUMERIC_TYPES: - sl_val = slope_type(1) # no scaling, regardless of type - assert_equal(wt(in_type, sl_val, 0.0), in_ts) - sl_val = slope_type(2) # actual scaling - out_val = in_val / d1(sl_val) - assert_equal(wt(in_type, sl_val), out_val.dtype.str) + sl_val=slope_type(1) # no scaling, regardless of type + assert wt(in_type, sl_val, 0.0) == in_ts + sl_val=slope_type(2) # actual scaling + out_val=in_val / d1(sl_val) + assert wt(in_type, sl_val) == out_val.dtype.str for inter_type in NUMERIC_TYPES: - i_val = inter_type(0) # no scaling, regardless of type - assert_equal(wt(in_type, 1, i_val), in_ts) - i_val = inter_type(1) # actual scaling - out_val = in_val - d1(i_val) - assert_equal(wt(in_type, 1, i_val), out_val.dtype.str) + i_val=inter_type(0) # no scaling, regardless of type + assert wt(in_type, 1, i_val) == in_ts + i_val=inter_type(1) # actual scaling + out_val=in_val - d1(i_val) + assert wt(in_type, 1, i_val) == out_val.dtype.str # Combine scaling and intercept - out_val = (in_val - d1(i_val)) / d1(sl_val) - assert_equal(wt(in_type, sl_val, i_val), out_val.dtype.str) + out_val=(in_val - d1(i_val)) / d1(sl_val) + assert wt(in_type, sl_val, i_val) == out_val.dtype.str # Confirm that type codes and dtypes work as well - f32s = np.dtype(np.float32).str - assert_equal(wt('f4', 1, 0), f32s) - assert_equal(wt(np.dtype('f4'), 1, 0), f32s) + f32s=np.dtype(np.float32).str + assert wt('f4', 1, 0) == f32s + assert wt(np.dtype('f4'), 1, 0) == f32s def test_better_float(): @@ -828,18 +826,16 @@ def check_against(f1, f2): return f1 if FLOAT_TYPES.index(f1) >= FLOAT_TYPES.index(f2) else f2 for first in FLOAT_TYPES: for other in IUINT_TYPES + np.sctypes['complex']: - assert_equal(better_float_of(first, other), first) - assert_equal(better_float_of(other, first), first) + assert better_float_of(first, other) == first + assert better_float_of(other, first) == first for other2 in IUINT_TYPES + np.sctypes['complex']: - assert_equal(better_float_of(other, other2), np.float32) - assert_equal(better_float_of(other, other2, np.float64), - np.float64) + assert better_float_of(other, other2) == np.float32 + assert better_float_of(other, other2, np.float64) == np.float64 for second in FLOAT_TYPES: - assert_equal(better_float_of(first, second), - check_against(first, second)) + assert better_float_of(first, second) == check_against(first, second) # Check codes and dtypes work - assert_equal(better_float_of('f4', 'f8', 'f4'), np.float64) - assert_equal(better_float_of('i4', 'i8', 'f8'), np.float64) + assert better_float_of('f4', 'f8', 'f4') == np.float64 + assert 
better_float_of('i4', 'i8', 'f8') == np.float64 def test_best_write_scale_ftype(): @@ -847,44 +843,40 @@ def test_best_write_scale_ftype(): # Types return better of (default, array type) unless scale overflows. # Return float type cannot be less capable than the input array type for dtt in IUINT_TYPES + FLOAT_TYPES: - arr = np.arange(10, dtype=dtt) - assert_equal(best_write_scale_ftype(arr, 1, 0), - better_float_of(dtt, np.float32)) - assert_equal(best_write_scale_ftype(arr, 1, 0, np.float64), - better_float_of(dtt, np.float64)) - assert_equal(best_write_scale_ftype(arr, np.float32(2), 0), - better_float_of(dtt, np.float32)) - assert_equal(best_write_scale_ftype(arr, 1, np.float32(1)), - better_float_of(dtt, np.float32)) + arr=np.arange(10, dtype=dtt) + assert best_write_scale_ftype(arr, 1, 0) == better_float_of(dtt, np.float32) + assert best_write_scale_ftype(arr, 1, 0, np.float64) == better_float_of(dtt, np.float64) + assert best_write_scale_ftype(arr, np.float32(2), 0) == better_float_of(dtt, np.float32) + assert best_write_scale_ftype(arr, 1, np.float32(1)) == better_float_of(dtt, np.float32) # Overflowing ints with scaling results in upcast - best_vals = ((np.float32, np.float64),) + best_vals=((np.float32, np.float64),) if np.longdouble in OK_FLOATS: best_vals += ((np.float64, np.longdouble),) for lower_t, higher_t in best_vals: # Information on this float - L_info = type_info(lower_t) - t_max = L_info['max'] - nmant = L_info['nmant'] # number of significand digits - big_delta = lower_t(2**(floor_log2(t_max) - nmant)) # delta below max + L_info=type_info(lower_t) + t_max=L_info['max'] + nmant=L_info['nmant'] # number of significand digits + big_delta=lower_t(2**(floor_log2(t_max) - nmant)) # delta below max # Even large values that don't overflow don't change output - arr = np.array([0, t_max], dtype=lower_t) - assert_equal(best_write_scale_ftype(arr, 1, 0), lower_t) + arr=np.array([0, t_max], dtype=lower_t) + assert best_write_scale_ftype(arr, 1, 0) == lower_t # Scaling > 1 reduces output values, so no upcast needed - assert_equal(best_write_scale_ftype(arr, lower_t(1.01), 0), lower_t) + assert best_write_scale_ftype(arr, lower_t(1.01), 0) == lower_t # Scaling < 1 increases values, so upcast may be needed (and is here) - assert_equal(best_write_scale_ftype(arr, lower_t(0.99), 0), higher_t) + assert best_write_scale_ftype(arr, lower_t(0.99), 0) == higher_t # Large minus offset on large array can cause upcast - assert_equal(best_write_scale_ftype(arr, 1, -big_delta / 2.01), lower_t) - assert_equal(best_write_scale_ftype(arr, 1, -big_delta / 2.0), higher_t) + assert best_write_scale_ftype(arr, 1, -big_delta / 2.01) == lower_t + assert best_write_scale_ftype(arr, 1, -big_delta / 2.0) == higher_t # With infs already in input, default type returns - arr[0] = np.inf - assert_equal(best_write_scale_ftype(arr, lower_t(0.5), 0), lower_t) - arr[0] = -np.inf - assert_equal(best_write_scale_ftype(arr, lower_t(0.5), 0), lower_t) + arr[0]=np.inf + assert best_write_scale_ftype(arr, lower_t(0.5), 0) == lower_t + arr[0]=-np.inf + assert best_write_scale_ftype(arr, lower_t(0.5), 0) == lower_t def test_can_cast(): - tests = ((np.float32, np.float32, True, True, True), + tests=((np.float32, np.float32, True, True, True), (np.float64, np.float32, True, True, True), (np.complex128, np.float32, False, False, False), (np.float32, np.complex128, True, True, True), @@ -900,46 +892,46 @@ def test_can_cast(): (np.uint16, np.uint8, False, True, True), ) for intype, outtype, def_res, scale_res, all_res in tests: 
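# (A minimal sketch, added for clarity and not part of the patch: each
#  row of `tests` reads (intype, outtype, default, with_scaling,
#  with_scaling_and_intercept), matching the three can_cast() calls
#  below. For instance, per the float32 -> int16 row, the cast is
#  rejected by default but accepted once scaling is allowed.)
import numpy as np
from nibabel.volumeutils import can_cast

assert not can_cast(np.float32, np.int16)            # default: rejected
assert can_cast(np.float32, np.int16, False, True)   # with scaling: accepted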
- assert_equal(def_res, can_cast(intype, outtype)) - assert_equal(scale_res, can_cast(intype, outtype, False, True)) - assert_equal(all_res, can_cast(intype, outtype, True, True)) + assert def_res == can_cast(intype, outtype) + assert scale_res == can_cast(intype, outtype, False, True) + assert all_res == can_cast(intype, outtype, True, True) def test_write_zeros(): - bio = BytesIO() + bio=BytesIO() write_zeros(bio, 10000) - assert_equal(bio.getvalue(), b'\x00' * 10000) + assert bio.getvalue() == b'\x00' * 10000 bio.seek(0) bio.truncate(0) write_zeros(bio, 10000, 256) - assert_equal(bio.getvalue(), b'\x00' * 10000) + assert bio.getvalue() == b'\x00' * 10000 bio.seek(0) bio.truncate(0) write_zeros(bio, 200, 256) - assert_equal(bio.getvalue(), b'\x00' * 200) + assert bio.getvalue() == b'\x00' * 200 def test_seek_tell(): # Test seek tell routine - bio = BytesIO() - in_files = bio, 'test.bin', 'test.gz', 'test.bz2' - start = 10 - end = 100 - diff = end - start - tail = 7 + bio=BytesIO() + in_files=bio, 'test.bin', 'test.gz', 'test.bz2' + start=10 + end=100 + diff=end - start + tail=7 with InTemporaryDirectory(): for in_file, write0 in itertools.product(in_files, (False, True)): - st = functools.partial(seek_tell, write0=write0) + st=functools.partial(seek_tell, write0=write0) bio.seek(0) # First write the file with ImageOpener(in_file, 'wb') as fobj: - assert_equal(fobj.tell(), 0) + assert fobj.tell() == 0 # already at position - OK st(fobj, 0) - assert_equal(fobj.tell(), 0) + assert fobj.tell() == 0 # Move position by writing fobj.write(b'\x01' * start) - assert_equal(fobj.tell(), start) + assert fobj.tell() == start # Files other than BZ2Files can seek forward on write, leaving # zeros in their wake. BZ2Files can't seek when writing, unless # we enable the write0 flag to seek_tell @@ -948,66 +940,68 @@ def test_seek_tell(): fobj.write(b'\x00' * diff) else: st(fobj, end) - assert_equal(fobj.tell(), end) + assert fobj.tell() == end # Write tail fobj.write(b'\x02' * tail) bio.seek(0) # Now read back the file testing seek_tell in reading mode with ImageOpener(in_file, 'rb') as fobj: - assert_equal(fobj.tell(), 0) + assert fobj.tell() == 0 st(fobj, 0) - assert_equal(fobj.tell(), 0) + assert fobj.tell() == 0 st(fobj, start) - assert_equal(fobj.tell(), start) + assert fobj.tell() == start st(fobj, end) - assert_equal(fobj.tell(), end) + assert fobj.tell() == end # Seek anywhere works in read mode for all files st(fobj, 0) bio.seek(0) # Check we have the expected written output with ImageOpener(in_file, 'rb') as fobj: - assert_equal(fobj.read(), - b'\x01' * start + b'\x00' * diff + b'\x02' * tail) + assert fobj.read() == b'\x01' * start + b'\x00' * diff + b'\x02' * tail for in_file in ('test2.gz', 'test2.bz2'): # Check failure of write seek backwards with ImageOpener(in_file, 'wb') as fobj: fobj.write(b'g' * 10) - assert_equal(fobj.tell(), 10) + assert fobj.tell() == 10 seek_tell(fobj, 10) - assert_equal(fobj.tell(), 10) - assert_raises(IOError, seek_tell, fobj, 5) + assert fobj.tell() == 10 + with pytest.raises(IOError): + seek_tell(fobj, 5) # Make sure read seeks don't affect file with ImageOpener(in_file, 'rb') as fobj: seek_tell(fobj, 10) seek_tell(fobj, 0) with ImageOpener(in_file, 'rb') as fobj: - assert_equal(fobj.read(), b'g' * 10) + assert fobj.read() == b'g' * 10 def test_seek_tell_logic(): # Test logic of seek_tell write0 with dummy class # Seek works? 
OK - bio = BytesIO() + bio=BytesIO() seek_tell(bio, 10) - assert_equal(bio.tell(), 10) + assert bio.tell() == 10 class BabyBio(BytesIO): def seek(self, *args): raise IOError() - bio = BabyBio() + bio=BabyBio() # Fresh fileobj, position 0, can't seek - error - assert_raises(IOError, bio.seek, 10) + with pytest.raises(IOError): + bio.seek(10) # Put fileobj in correct position by writing - ZEROB = b'\x00' + ZEROB=b'\x00' bio.write(ZEROB * 10) seek_tell(bio, 10) # already there, nothing to do - assert_equal(bio.tell(), 10) - assert_equal(bio.getvalue(), ZEROB * 10) + assert bio.tell() == 10 + assert bio.getvalue() == ZEROB * 10 # Try write zeros to get to new position - assert_raises(IOError, bio.seek, 20) + with pytest.raises(IOError): + bio.seek(20) seek_tell(bio, 20, write0=True) - assert_equal(bio.getvalue(), ZEROB * 20) + assert bio.getvalue() == ZEROB * 20 def test_fname_ext_ul_case(): @@ -1016,23 +1010,23 @@ def test_fname_ext_ul_case(): with open('afile.TXT', 'wt') as fobj: fobj.write('Interesting information') # OSX usually has case-insensitive file systems; Windows also - os_cares_case = not exists('afile.txt') + os_cares_case=not exists('afile.txt') with open('bfile.txt', 'wt') as fobj: fobj.write('More interesting information') # If there is no file, the case doesn't change - assert_equal(fname_ext_ul_case('nofile.txt'), 'nofile.txt') - assert_equal(fname_ext_ul_case('nofile.TXT'), 'nofile.TXT') + assert fname_ext_ul_case('nofile.txt') == 'nofile.txt' + assert fname_ext_ul_case('nofile.TXT') == 'nofile.TXT' # If there is a file, accept upper or lower case for ext if os_cares_case: - assert_equal(fname_ext_ul_case('afile.txt'), 'afile.TXT') - assert_equal(fname_ext_ul_case('bfile.TXT'), 'bfile.txt') + assert fname_ext_ul_case('afile.txt') == 'afile.TXT' + assert fname_ext_ul_case('bfile.TXT') == 'bfile.txt' else: - assert_equal(fname_ext_ul_case('afile.txt'), 'afile.txt') - assert_equal(fname_ext_ul_case('bfile.TXT'), 'bfile.TXT') - assert_equal(fname_ext_ul_case('afile.TXT'), 'afile.TXT') - assert_equal(fname_ext_ul_case('bfile.txt'), 'bfile.txt') + assert fname_ext_ul_case('afile.txt') == 'afile.txt' + assert fname_ext_ul_case('bfile.TXT') == 'bfile.TXT' + assert fname_ext_ul_case('afile.TXT') == 'afile.TXT' + assert fname_ext_ul_case('bfile.txt') == 'bfile.txt' # Not mixed case though - assert_equal(fname_ext_ul_case('afile.TxT'), 'afile.TxT') + assert fname_ext_ul_case('afile.TxT') == 'afile.TxT' def test_allopen(): @@ -1041,72 +1035,72 @@ def test_allopen(): with clear_and_catch_warnings() as w: warnings.filterwarnings('once', category=DeprecationWarning) # Test default mode is 'rb' - fobj = allopen(__file__) + fobj=allopen(__file__) # Check we got the deprecation warning - assert_equal(len(w), 1) - assert_equal(fobj.mode, 'rb') + assert len(w) == 1 + assert fobj.mode == 'rb' # That we can set it - fobj = allopen(__file__, 'r') - assert_equal(fobj.mode, 'r') + fobj=allopen(__file__, 'r') + assert fobj.mode == 'r' # with keyword arguments - fobj = allopen(__file__, mode='r') - assert_equal(fobj.mode, 'r') + fobj=allopen(__file__, mode='r') + assert fobj.mode == 'r' # fileobj returns fileobj - msg = b'tiddle pom' - sobj = BytesIO(msg) - fobj = allopen(sobj) - assert_equal(fobj.read(), msg) + msg=b'tiddle pom' + sobj=BytesIO(msg) + fobj=allopen(sobj) + assert fobj.read() == msg # mode is gently ignored - fobj = allopen(sobj, mode='r') + fobj=allopen(sobj, mode='r') def test_allopen_compresslevel(): # We can set the default compression level with the module global # Get some data 
to compress with open(__file__, 'rb') as fobj: - my_self = fobj.read() + my_self=fobj.read() # Prepare loop - fname = 'test.gz' - sizes = {} + fname='test.gz' + sizes={} # Stash module global from .. import volumeutils as vu - original_compress_level = vu.default_compresslevel - assert_equal(original_compress_level, 1) + original_compress_level=vu.default_compresslevel + assert original_compress_level == 1 try: with InTemporaryDirectory(): for compresslevel in ('default', 1, 9): if compresslevel != 'default': - vu.default_compresslevel = compresslevel + vu.default_compresslevel=compresslevel with warnings.catch_warnings(): warnings.simplefilter("ignore") with allopen(fname, 'wb') as fobj: fobj.write(my_self) with open(fname, 'rb') as fobj: - my_selves_smaller = fobj.read() - sizes[compresslevel] = len(my_selves_smaller) - assert_equal(sizes['default'], sizes[1]) - assert_true(sizes[1] > sizes[9]) + my_selves_smaller=fobj.read() + sizes[compresslevel]=len(my_selves_smaller) + assert sizes['default'] == sizes[1] + assert sizes[1] > sizes[9] finally: - vu.default_compresslevel = original_compress_level + vu.default_compresslevel=original_compress_level def test_shape_zoom_affine(): - shape = (3, 5, 7) - zooms = (3, 2, 1) - res = shape_zoom_affine(shape, zooms) - exp = np.array([[-3., 0., 0., 3.], + shape=(3, 5, 7) + zooms=(3, 2, 1) + res=shape_zoom_affine(shape, zooms) + exp=np.array([[-3., 0., 0., 3.], [0., 2., 0., -4.], [0., 0., 1., -3.], [0., 0., 0., 1.]]) assert_array_almost_equal(res, exp) - res = shape_zoom_affine((3, 5), (3, 2)) - exp = np.array([[-3., 0., 0., 3.], + res=shape_zoom_affine((3, 5), (3, 2)) + exp=np.array([[-3., 0., 0., 3.], [0., 2., 0., -4.], [0., 0., 1., -0.], [0., 0., 0., 1.]]) assert_array_almost_equal(res, exp) - res = shape_zoom_affine(shape, zooms, False) - exp = np.array([[3., 0., 0., -3.], + res=shape_zoom_affine(shape, zooms, False) + exp=np.array([[3., 0., 0., -3.], [0., 2., 0., -4.], [0., 0., 1., -3.], [0., 0., 0., 1.]]) @@ -1114,9 +1108,9 @@ def test_shape_zoom_affine(): def test_rec2dict(): - r = np.zeros((), dtype=[('x', 'i4'), ('s', 'S10')]) - d = rec2dict(r) - assert_equal(d, {'x': 0, 's': b''}) + r=np.zeros((), dtype=[('x', 'i4'), ('s', 'S10')]) + d=rec2dict(r) + assert d == {'x': 0, 's': b''} def test_dtypes(): @@ -1127,43 +1121,45 @@ def test_dtypes(): # In [10]: dtype(' Date: Mon, 11 Nov 2019 15:50:39 -0500 Subject: [PATCH 2/7] fixed spacing --- nibabel/tests/test_volumeutils.py | 872 ++++++++++++++++-------------- 1 file changed, 466 insertions(+), 406 deletions(-) diff --git a/nibabel/tests/test_volumeutils.py b/nibabel/tests/test_volumeutils.py index 12f93ef70d..34c2196b93 100644 --- a/nibabel/tests/test_volumeutils.py +++ b/nibabel/tests/test_volumeutils.py @@ -6,7 +6,7 @@ # copyright and license terms. # ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## -''' Test for volumeutils module ''' +""" Test for volumeutils module """ import os from os.path import exists @@ -27,54 +27,60 @@ from ..tmpdirs import InTemporaryDirectory from ..openers import ImageOpener from .. 
import volumeutils -from ..volumeutils import (array_from_file, - _is_compressed_fobj, - array_to_file, - allopen, # for backwards compatibility - fname_ext_ul_case, - calculate_scale, - can_cast, - write_zeros, - seek_tell, - apply_read_scaling, - working_type, - best_write_scale_ftype, - better_float_of, - int_scinter_ftype, - make_dt_codes, - native_code, - shape_zoom_affine, - rec2dict, - _dt_min_max, - _write_data, - _ftype4scaled_finite, - ) +from ..volumeutils import ( + array_from_file, + _is_compressed_fobj, + array_to_file, + allopen, # for backwards compatibility + fname_ext_ul_case, + calculate_scale, + can_cast, + write_zeros, + seek_tell, + apply_read_scaling, + working_type, + best_write_scale_ftype, + better_float_of, + int_scinter_ftype, + make_dt_codes, + native_code, + shape_zoom_affine, + rec2dict, + _dt_min_max, + _write_data, + _ftype4scaled_finite, +) from ..openers import Opener, BZ2File -from ..casting import (floor_log2, type_info, OK_FLOATS, shared_range) +from ..casting import floor_log2, type_info, OK_FLOATS, shared_range -from numpy.testing import (assert_array_almost_equal, - assert_array_equal) +from numpy.testing import assert_array_almost_equal, assert_array_equal -from ..testing_pytest import (assert_dt_equal, assert_allclose_safely, - suppress_warnings, clear_and_catch_warnings) +from ..testing_pytest import ( + assert_dt_equal, + assert_allclose_safely, + suppress_warnings, + clear_and_catch_warnings, +) #: convenience variables for numpy types -FLOAT_TYPES = np.sctypes['float'] -COMPLEX_TYPES = np.sctypes['complex'] +FLOAT_TYPES = np.sctypes["float"] +COMPLEX_TYPES = np.sctypes["complex"] CFLOAT_TYPES = FLOAT_TYPES + COMPLEX_TYPES -INT_TYPES = np.sctypes['int'] -IUINT_TYPES = INT_TYPES + np.sctypes['uint'] +INT_TYPES = np.sctypes["int"] +IUINT_TYPES = INT_TYPES + np.sctypes["uint"] NUMERIC_TYPES = CFLOAT_TYPES + IUINT_TYPES def test__is_compressed_fobj(): # _is_compressed helper function with InTemporaryDirectory(): - for ext, opener, compressed in (('', open, False), - ('.gz', gzip.open, True), - ('.bz2', BZ2File, True)): - fname = 'test.bin' + ext - for mode in ('wb', 'rb'): + for ext, opener, compressed in ( + ("", open, False), + (".gz", gzip.open, True), + (".bz2", BZ2File, True), + ): + fname = "test.bin" + ext + for mode in ("wb", "rb"): fobj = opener(fname, mode) assert _is_compressed_fobj(fobj) == compressed fobj.close() @@ -91,29 +97,29 @@ def make_array(n, bytes): return arr # Check whether file, gzip file, bz2 file reread memory from cache - fname = 'test.bin' + fname = "test.bin" with InTemporaryDirectory(): for n, opener in itertools.product( - (256, 1024, 2560, 25600), - (open, gzip.open, BZ2File)): + (256, 1024, 2560, 25600), (open, gzip.open, BZ2File) + ): in_arr = np.arange(n, dtype=dtype) # Write array to file - fobj_w = opener(fname, 'wb') + fobj_w = opener(fname, "wb") fobj_w.write(in_arr.tostring()) fobj_w.close() # Read back from file - fobj_r = opener(fname, 'rb') + fobj_r = opener(fname, "rb") try: contents1 = bytearray(4 * n) fobj_r.readinto(contents1) # Second element is 1 - assert contents1[0:8] != b'\x00' * 8 + assert contents1[0:8] != b"\x00" * 8 out_arr = make_array(n, contents1) assert_array_equal(in_arr, out_arr) # Set second element to 0 out_arr[1] = 0 # Show this changed the bytes string - assert contents1[:8] == b'\x00' * 8 + assert contents1[:8] == b"\x00" * 8 # Reread, to get unmodified contents fobj_r.seek(0) contents2 = bytearray(4 * n) @@ -136,11 +142,11 @@ def test_array_from_file(): offset = 10 assert 
buf_chk(in_arr, BytesIO(), None, offset) # check on real file - fname = 'test.bin' + fname = "test.bin" with InTemporaryDirectory(): # fortran ordered - out_buf = open(fname, 'wb') - in_buf = open(fname, 'rb') + out_buf = open(fname, "wb") + in_buf = open(fname, "rb") assert buf_chk(in_arr, out_buf, in_buf, offset) # Drop offset to check that shape's not coming from file length out_buf.seek(0) @@ -149,9 +155,9 @@ def test_array_from_file(): assert buf_chk(in_arr, out_buf, in_buf, offset) del out_buf, in_buf # Make sure empty shape, and zero length, give empty arrays - arr = array_from_file((), np.dtype('f8'), BytesIO()) + arr = array_from_file((), np.dtype("f8"), BytesIO()) assert len(arr) == 0 - arr = array_from_file((0,), np.dtype('f8'), BytesIO()) + arr = array_from_file((0,), np.dtype("f8"), BytesIO()) assert len(arr) == 0 # Check error from small file with pytest.raises(IOError): @@ -159,8 +165,8 @@ def test_array_from_file(): # check on real file fd, fname = tempfile.mkstemp() with InTemporaryDirectory(): - open(fname, 'wb').write(b'1') - in_buf = open(fname, 'rb') + open(fname, "wb").write(b"1") + in_buf = open(fname, "rb") # For windows this will raise a WindowsError from mmap, Unices # appear to raise an IOError with pytest.raises(Exception): @@ -174,55 +180,51 @@ def test_array_from_file_mmap(): with InTemporaryDirectory(): for dt in (np.int16, np.float): arr = np.arange(np.prod(shape), dtype=dt).reshape(shape) - with open('test.bin', 'wb') as fobj: - fobj.write(arr.tostring(order='F')) - with open('test.bin', 'rb') as fobj: + with open("test.bin", "wb") as fobj: + fobj.write(arr.tostring(order="F")) + with open("test.bin", "rb") as fobj: res = array_from_file(shape, dt, fobj) assert_array_equal(res, arr) assert isinstance(res, np.memmap) - assert res.mode == 'c' - with open('test.bin', 'rb') as fobj: + assert res.mode == "c" + with open("test.bin", "rb") as fobj: res = array_from_file(shape, dt, fobj, mmap=True) assert_array_equal(res, arr) assert isinstance(res, np.memmap) - assert res.mode == 'c' - with open('test.bin', 'rb') as fobj: - res = array_from_file(shape, dt, fobj, mmap='c') + assert res.mode == "c" + with open("test.bin", "rb") as fobj: + res = array_from_file(shape, dt, fobj, mmap="c") assert_array_equal(res, arr) assert isinstance(res, np.memmap) - assert res.mode == 'c' - with open('test.bin', 'rb') as fobj: - res = array_from_file(shape, dt, fobj, mmap='r') + assert res.mode == "c" + with open("test.bin", "rb") as fobj: + res = array_from_file(shape, dt, fobj, mmap="r") assert_array_equal(res, arr) assert isinstance(res, np.memmap) - assert res.mode == 'r' - with open('test.bin', 'rb+') as fobj: - res = array_from_file(shape, dt, fobj, mmap='r+') + assert res.mode == "r" + with open("test.bin", "rb+") as fobj: + res = array_from_file(shape, dt, fobj, mmap="r+") assert_array_equal(res, arr) assert isinstance(res, np.memmap) - assert res.mode == 'r+' - with open('test.bin', 'rb') as fobj: + assert res.mode == "r+" + with open("test.bin", "rb") as fobj: res = array_from_file(shape, dt, fobj, mmap=False) assert_array_equal(res, arr) assert not isinstance(res, np.memmap) - with open('test.bin', 'rb') as fobj: + with open("test.bin", "rb") as fobj: with pytest.raises(ValueError): - array_from_file(shape, dt, fobj, mmap='p') + array_from_file(shape, dt, fobj, mmap="p") def buf_chk(in_arr, out_buf, in_buf, offset): - ''' Write contents of in_arr into fileobj, read back, check same ''' - instr = b' ' * offset + in_arr.tostring(order='F') + """ Write contents of in_arr 
into fileobj, read back, check same """ + instr = b" " * offset + in_arr.tostring(order="F") out_buf.write(instr) out_buf.flush() if in_buf is None: # we're using in_buf from out_buf out_buf.seek(0) in_buf = out_buf - arr = array_from_file( - in_arr.shape, - in_arr.dtype, - in_buf, - offset) + arr = array_from_file(in_arr.shape, in_arr.dtype, in_buf, offset) return np.allclose(in_arr, arr) @@ -232,14 +234,13 @@ def test_array_from_file_openers(): dtype = np.dtype(np.float32) in_arr = np.arange(24, dtype=dtype).reshape(shape) with InTemporaryDirectory(): - for ext, offset in itertools.product(('', '.gz', '.bz2'), - (0, 5, 10)): - fname = 'test.bin' + ext - with Opener(fname, 'wb') as out_buf: + for ext, offset in itertools.product(("", ".gz", ".bz2"), (0, 5, 10)): + fname = "test.bin" + ext + with Opener(fname, "wb") as out_buf: if offset != 0: # avoid https://bugs.python.org/issue16828 - out_buf.write(b' ' * offset) - out_buf.write(in_arr.tostring(order='F')) - with Opener(fname, 'rb') as in_buf: + out_buf.write(b" " * offset) + out_buf.write(in_arr.tostring(order="F")) + with Opener(fname, "rb") as in_buf: out_arr = array_from_file(shape, dtype, in_buf, offset) assert_array_almost_equal(in_arr, out_arr) # Delete object holding onto file for Windows @@ -251,25 +252,26 @@ def test_array_from_file_reread(): # This is the live check for the generic checks in # test_fobj_string_assumptions offset = 9 - fname = 'test.bin' + fname = "test.bin" with InTemporaryDirectory(): for shape, opener, dtt, order in itertools.product( - ((64,), (64, 65), (64, 65, 66)), - (open, gzip.open, bz2.BZ2File, BytesIO), - (np.int16, np.float32), - ('F', 'C')): + ((64,), (64, 65), (64, 65, 66)), + (open, gzip.open, bz2.BZ2File, BytesIO), + (np.int16, np.float32), + ("F", "C"), + ): n_els = np.prod(shape) in_arr = np.arange(n_els, dtype=dtt).reshape(shape) - is_bio = hasattr(opener, 'getvalue') + is_bio = hasattr(opener, "getvalue") # Write array to file - fobj_w = opener() if is_bio else opener(fname, 'wb') - fobj_w.write(b' ' * offset) + fobj_w = opener() if is_bio else opener(fname, "wb") + fobj_w.write(b" " * offset) fobj_w.write(in_arr.tostring(order=order)) if is_bio: fobj_r = fobj_w else: fobj_w.close() - fobj_r = opener(fname, 'rb') + fobj_r = opener(fname, "rb") # Read back from file try: out_arr = array_from_file(shape, dtt, fobj_r, offset, order) @@ -291,14 +293,14 @@ def test_array_to_file(): str_io = BytesIO() for tp in (np.uint64, np.float, np.complex): dt = np.dtype(tp) - for code in '<>': + for code in "<>": ndt = dt.newbyteorder(code) for allow_intercept in (True, False): with suppress_warnings(): # deprecated - scale, intercept, mn, mx = \ - calculate_scale(arr, ndt, allow_intercept) - data_back = write_return(arr, str_io, ndt, - 0, intercept, scale) + scale, intercept, mn, mx = calculate_scale( + arr, ndt, allow_intercept + ) + data_back = write_return(arr, str_io, ndt, 0, intercept, scale) assert_array_almost_equal(arr, data_back) # Test array-like str_io = BytesIO() @@ -324,13 +326,20 @@ def test_a2f_upscale(): # Test values discovered from stress testing. The largish value (2**115) # overflows to inf after the intercept is subtracted, using float32 as the # working precision. The difference between inf and this value is lost. 
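# (A minimal sketch of the overflow described above, added for clarity:
#  subtracting the float32 minimum -- the intercept here -- from a value
#  near the top of the float32 range overflows to inf at float32 working
#  precision, but stays finite at float64.)
import numpy as np

f32 = np.finfo(np.float32)
x = np.float32(2 ** 115)
with np.errstate(over='ignore'):
    assert np.isinf(x - f32.min)                          # float32: inf
assert np.isfinite(np.float64(x) - np.float64(f32.min))   # float64: finite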
- arr = np.array([[info['min'], 2**115, info['max']]], dtype=np.float32) - slope = np.float32(2**121) - inter = info['min'] + arr = np.array([[info["min"], 2 ** 115, info["max"]]], dtype=np.float32) + slope = np.float32(2 ** 121) + inter = info["min"] str_io = BytesIO() # We need to provide mn, mx for function to be able to calculate upcasting - array_to_file(arr, str_io, np.uint8, intercept=inter, divslope=slope, - mn=info['min'], mx=info['max']) + array_to_file( + arr, + str_io, + np.uint8, + intercept=inter, + divslope=slope, + mn=info["min"], + mx=info["max"], + ) raw = array_from_file(arr.shape, np.uint8, str_io) back = apply_read_scaling(raw, slope, inter) top = back - arr @@ -345,11 +354,11 @@ def test_a2f_min_max(): for out_dt in (np.float32, np.int8): arr = np.arange(4, dtype=in_dt) # min thresholding - with np.errstate(invalid='ignore'): + with np.errstate(invalid="ignore"): data_back = write_return(arr, str_io, out_dt, 0, 0, 1, 1) assert_array_equal(data_back, [1, 1, 2, 3]) # max thresholding - with np.errstate(invalid='ignore'): + with np.errstate(invalid="ignore"): data_back = write_return(arr, str_io, out_dt, 0, 0, 1, None, 2) assert_array_equal(data_back, [0, 1, 2, 2]) # min max thresholding @@ -374,13 +383,13 @@ def test_a2f_order(): arr = np.array([0.0, 1.0, 2.0]) str_io = BytesIO() # order makes no difference in 1D case - data_back = write_return(arr, str_io, ndt, order='C') + data_back = write_return(arr, str_io, ndt, order="C") assert_array_equal(data_back, [0.0, 1.0, 2.0]) # but does in the 2D case arr = np.array([[0.0, 1.0], [2.0, 3.0]]) - data_back = write_return(arr, str_io, ndt, order='F') + data_back = write_return(arr, str_io, ndt, order="F") assert_array_equal(data_back, arr) - data_back = write_return(arr, str_io, ndt, order='C') + data_back = write_return(arr, str_io, ndt, order="C") assert_array_equal(data_back, arr.T) @@ -394,12 +403,12 @@ def test_a2f_nan2zero(): # True is the default, but just to show it's possible data_back = write_return(arr, str_io, ndt, nan2zero=True) assert_array_equal(data_back, arr) - with np.errstate(invalid='ignore'): + with np.errstate(invalid="ignore"): data_back = write_return(arr, str_io, np.int64, nan2zero=True) assert_array_equal(data_back, [[0, 0], [0, 0]]) # otherwise things get a bit weird; tidied here # How weird? 
Look at arr.astype(np.int64) - with np.errstate(invalid='ignore'): + with np.errstate(invalid="ignore"): data_back = write_return(arr, str_io, np.int64, nan2zero=False) assert_array_equal(data_back, arr.astype(np.int64)) @@ -418,18 +427,16 @@ def test_a2f_nan2zero_scaling(): # Array values including zero before scaling but not after bio = BytesIO() for in_dt, out_dt, zero_in, inter in itertools.product( - FLOAT_TYPES, - IUINT_TYPES, - (True, False), - (0, -100)): + FLOAT_TYPES, IUINT_TYPES, (True, False), (0, -100) + ): in_info = np.finfo(in_dt) out_info = np.iinfo(out_dt) - mx = min(in_info.max, out_info.max * 2., 2**32) + inter + mx = min(in_info.max, out_info.max * 2.0, 2 ** 32) + inter mn = 0 if zero_in or inter else 100 vals = [np.nan] + [mn, mx] nan_arr = np.array(vals, dtype=in_dt) zero_arr = np.nan_to_num(nan_arr) - with np.errstate(invalid='ignore'): + with np.errstate(invalid="ignore"): back_nan = write_return(nan_arr, bio, np.int64, intercept=inter) back_zero = write_return(zero_arr, bio, np.int64, intercept=inter) assert_array_equal(back_nan, back_zero) @@ -439,7 +446,7 @@ def test_a2f_offset(): # check that non-zero file offset works arr = np.array([[0.0, 1.0], [2.0, 3.0]]) str_io = BytesIO() - str_io.write(b'a' * 42) + str_io.write(b"a" * 42) array_to_file(arr, str_io, np.float, 42) data_back = array_from_file(arr.shape, np.float, str_io, 42) assert_array_equal(data_back, arr.astype(np.float)) @@ -481,22 +488,30 @@ def test_a2f_zeros(): def test_a2f_big_scalers(): # Check that clip works even for overflowing scalers / data info = type_info(np.float32) - arr = np.array([info['min'], 0, info['max']], dtype=np.float32) + arr = np.array([info["min"], 0, info["max"]], dtype=np.float32) str_io = BytesIO() # Intercept causes overflow - does routine scale correctly? # We check whether the routine correctly clips extreme values. # We need nan2zero=False because we can't represent 0 in the input, given # the scaling and the output range. with suppress_warnings(): # overflow - array_to_file(arr, str_io, np.int8, intercept=np.float32(2**120), - nan2zero=False) + array_to_file( + arr, str_io, np.int8, intercept=np.float32(2 ** 120), nan2zero=False + ) data_back = array_from_file(arr.shape, np.int8, str_io) assert_array_equal(data_back, [-128, -128, 127]) # Scales also if mx, mn specified? Same notes and complaints as for the test # above. str_io.seek(0) - array_to_file(arr, str_io, np.int8, mn=info['min'], mx=info['max'], - intercept=np.float32(2**120), nan2zero=False) + array_to_file( + arr, + str_io, + np.int8, + mn=info["min"], + mx=info["max"], + intercept=np.float32(2 ** 120), + nan2zero=False, + ) data_back = array_from_file(arr.shape, np.int8, str_io) assert_array_equal(data_back, [-128, -128, 127]) # And if slope causes overflow? @@ -507,8 +522,9 @@ def test_a2f_big_scalers(): assert_array_equal(data_back, [-128, 0, 127]) # with mn, mx specified? 
str_io.seek(0) - array_to_file(arr, str_io, np.int8, mn=info['min'], mx=info['max'], - divslope=np.float32(0.5)) + array_to_file( + arr, str_io, np.int8, mn=info["min"], mx=info["max"], divslope=np.float32(0.5) + ) data_back = array_from_file(arr.shape, np.int8, str_io) assert_array_equal(data_back, [-128, 0, 127]) @@ -518,13 +534,13 @@ def test_a2f_int_scaling(): arr = np.array([0, 1, 128, 255], dtype=np.uint8) fobj = BytesIO() back_arr = write_return(arr, fobj, np.uint8, intercept=1) - assert_array_equal(back_arr, np.clip(arr - 1., 0, 255)) + assert_array_equal(back_arr, np.clip(arr - 1.0, 0, 255)) back_arr = write_return(arr, fobj, np.uint8, divslope=2) - assert_array_equal(back_arr, np.round(np.clip(arr / 2., 0, 255))) + assert_array_equal(back_arr, np.round(np.clip(arr / 2.0, 0, 255))) back_arr = write_return(arr, fobj, np.uint8, intercept=1, divslope=2) - assert_array_equal(back_arr, np.round(np.clip((arr - 1.) / 2., 0, 255))) + assert_array_equal(back_arr, np.round(np.clip((arr - 1.0) / 2.0, 0, 255))) back_arr = write_return(arr, fobj, np.int16, intercept=1, divslope=2) - assert_array_equal(back_arr, np.round((arr - 1.) / 2.)) + assert_array_equal(back_arr, np.round((arr - 1.0) / 2.0)) def test_a2f_scaled_unscaled(): @@ -532,10 +548,8 @@ def test_a2f_scaled_unscaled(): # without scaling fobj = BytesIO() for in_dtype, out_dtype, intercept, divslope in itertools.product( - NUMERIC_TYPES, - NUMERIC_TYPES, - (0, 0.5, -1, 1), - (1, 0.5, 2)): + NUMERIC_TYPES, NUMERIC_TYPES, (0, 0.5, -1, 1), (1, 0.5, 2) + ): mn_in, mx_in = _dt_min_max(in_dtype) nan_val = np.nan if in_dtype in CFLOAT_TYPES else 10 arr = np.array([mn_in, -1, 0, 1, mx_in, nan_val], dtype=in_dtype) @@ -545,31 +559,32 @@ def test_a2f_scaled_unscaled(): if out_dtype in IUINT_TYPES: nan_fill = np.round(nan_fill) # nan2zero will check whether 0 in scaled to a valid value in output - if (in_dtype in CFLOAT_TYPES and not mn_out <= nan_fill <= mx_out): + if in_dtype in CFLOAT_TYPES and not mn_out <= nan_fill <= mx_out: with pytest.raises(ValueError): - array_to_file(arr, - fobj, - out_dtype=out_dtype, - divslope=divslope, - intercept=intercept) + array_to_file( + arr, + fobj, + out_dtype=out_dtype, + divslope=divslope, + intercept=intercept, + ) continue with suppress_warnings(): - back_arr = write_return(arr, fobj, - out_dtype=out_dtype, - divslope=divslope, - intercept=intercept) + back_arr = write_return( + arr, fobj, out_dtype=out_dtype, divslope=divslope, intercept=intercept + ) exp_back = arr.copy() - if (in_dtype in IUINT_TYPES and - out_dtype in IUINT_TYPES and - (intercept, divslope) == (0, 1)): + if ( + in_dtype in IUINT_TYPES + and out_dtype in IUINT_TYPES + and (intercept, divslope) == (0, 1) + ): # Direct iu to iu casting. # Need to clip if ranges not the same. # Use smaller of input, output range to avoid np.clip upcasting # the array because of large clip limits. 
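# (A sketch of the np.clip upcast being sidestepped here, added for
#  clarity: clip limits that do not fit the array's dtype promote the
#  result, while limits drawn from the narrower range do not.)
import numpy as np

a = np.zeros(3, dtype=np.int8)
assert np.clip(a, np.int16(-32768), np.int16(32767)).dtype == np.int16
assert np.clip(a, np.int8(-128), np.int8(127)).dtype == np.int8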
if (mn_in, mx_in) != (mn_out, mx_out): - exp_back = np.clip(exp_back, - max(mn_in, mn_out), - min(mx_in, mx_out)) + exp_back = np.clip(exp_back, max(mn_in, mn_out), min(mx_in, mx_out)) else: # Need to deal with nans, casting to float, clipping if in_dtype in CFLOAT_TYPES and out_dtype in IUINT_TYPES: exp_back[np.isnan(exp_back)] = 0 @@ -579,8 +594,7 @@ def test_a2f_scaled_unscaled(): exp_back -= intercept if divslope != 1: exp_back /= divslope - if (exp_back.dtype.type in CFLOAT_TYPES and - out_dtype in IUINT_TYPES): + if exp_back.dtype.type in CFLOAT_TYPES and out_dtype in IUINT_TYPES: exp_back = np.round(exp_back).astype(float) exp_back = np.clip(exp_back, *shared_range(float, out_dtype)) exp_back = exp_back.astype(out_dtype) @@ -600,35 +614,28 @@ def test_a2f_nanpos(): def test_a2f_bad_scaling(): # Test that pathological scalers raise an error - NUMERICAL_TYPES = sum([np.sctypes[key] for key in ['int', - 'uint', - 'float', - 'complex']], - []) + NUMERICAL_TYPES = sum( + [np.sctypes[key] for key in ["int", "uint", "float", "complex"]], [] + ) for in_type, out_type, slope, inter in itertools.product( - NUMERICAL_TYPES, - NUMERICAL_TYPES, - (None, 1, 0, np.nan, -np.inf, np.inf), - (0, np.nan, -np.inf, np.inf)): + NUMERICAL_TYPES, + NUMERICAL_TYPES, + (None, 1, 0, np.nan, -np.inf, np.inf), + (0, np.nan, -np.inf, np.inf), + ): arr = np.ones((2,), dtype=in_type) fobj = BytesIO() if (slope, inter) == (1, 0): - assert_array_equal(arr, - write_return(arr, fobj, out_type, - intercept=inter, - divslope=slope)) + assert_array_equal( + arr, write_return(arr, fobj, out_type, intercept=inter, divslope=slope) + ) elif (slope, inter) == (None, 0): - assert_array_equal(0, - write_return(arr, fobj, out_type, - intercept=inter, - divslope=slope)) + assert_array_equal( + 0, write_return(arr, fobj, out_type, intercept=inter, divslope=slope) + ) else: with pytest.raises(ValueError): - array_to_file(arr, - fobj, - np.int8, - intercept=inter, - divslope=slope) + array_to_file(arr, fobj, np.int8, intercept=inter, divslope=slope) def test_a2f_nan2zero_range(): @@ -648,8 +655,9 @@ def test_a2f_nan2zero_range(): # Pushing zero outside the output data range does not generate error back_arr = write_return(arr_no_nan, fobj, np.int8, intercept=129, nan2zero=True) assert_array_equal([-128, -128, -128, -127], back_arr) - back_arr = write_return(arr_no_nan, fobj, np.int8, - intercept=257.1, divslope=2, nan2zero=True) + back_arr = write_return( + arr_no_nan, fobj, np.int8, intercept=257.1, divslope=2, nan2zero=True + ) assert_array_equal([-128, -128, -128, -128], back_arr) for dt in CFLOAT_TYPES: arr = np.array([-1, 0, 1, np.nan], dtype=dt) @@ -657,11 +665,9 @@ def test_a2f_nan2zero_range(): arr_no_nan = np.array([-1, 0, 1, 2], dtype=dt) # No errors from explicit thresholding # mn thresholding excluding zero - assert_array_equal([1, 1, 1, 0], - write_return(arr, fobj, np.int8, mn=1)) + assert_array_equal([1, 1, 1, 0], write_return(arr, fobj, np.int8, mn=1)) # mx thresholding excluding zero - assert_array_equal([-1, -1, -1, 0], - write_return(arr, fobj, np.int8, mx=-1)) + assert_array_equal([-1, -1, -1, 0], write_return(arr, fobj, np.int8, mx=-1)) # Errors from datatype threshold after scaling back_arr = write_return(arr, fobj, np.int8, intercept=128) assert_array_equal([-128, -128, -127, -128], back_arr) @@ -681,17 +687,18 @@ def test_a2f_nan2zero_range(): with pytest.raises(ValueError): write_return(arr_no_nan, fobj, np.int8, intercept=257.1, divslope=2) # OK with nan2zero false - back_arr = write_return(arr, fobj, 
np.int8, - intercept=257.1, divslope=2, nan2zero=False) + back_arr = write_return( + arr, fobj, np.int8, intercept=257.1, divslope=2, nan2zero=False + ) assert_array_equal([-128, -128, -128, nan_cast], back_arr) def test_a2f_non_numeric(): # Reminder that we may get structured dtypes - dt = np.dtype([('f1', 'f'), ('f2', 'i2')]) + dt = np.dtype([("f1", "f"), ("f2", "i2")]) arr = np.zeros((2,), dtype=dt) - arr['f1'] = 0.4, 0.6 - arr['f2'] = 10, 12 + arr["f1"] = 0.4, 0.6 + arr["f2"] = 10, 12 fobj = BytesIO() back_arr = write_return(arr, fobj, dt) assert_array_equal(back_arr, arr) @@ -732,17 +739,17 @@ def test_apply_scaling(): assert (f32_arr * f64(1)).dtype == np.float32 assert (f32_arr + f64(1)).dtype == np.float32 # The function does upcast though - ret=apply_read_scaling(np.float32(0), np.float64(2)) + ret = apply_read_scaling(np.float32(0), np.float64(2)) assert ret.dtype == np.float64 - ret=apply_read_scaling(np.float32(0), inter=np.float64(2)) + ret = apply_read_scaling(np.float32(0), inter=np.float64(2)) assert ret.dtype == np.float64 # Check integer inf upcast - big=f32(type_info(f32)['max']) + big = f32(type_info(f32)["max"]) # Normally this would not upcast assert (i16_arr * big).dtype == np.float32 # An equivalent case is a little hard to find for the intercept - nmant_32=type_info(np.float32)['nmant'] - big_delta=np.float32(2**(floor_log2(big) - nmant_32)) + nmant_32 = type_info(np.float32)["nmant"] + big_delta = np.float32(2 ** (floor_log2(big) - nmant_32)) assert (i16_arr * big_delta + big).dtype == np.float32 # Upcasting does occur with this routine assert apply_read_scaling(i16_arr, big).dtype == np.float64 @@ -756,15 +763,17 @@ def test_apply_scaling(): assert apply_read_scaling(np.int8(0), f32(-1e38), f32(0.0)).dtype == np.float64 # Non-zero intercept still generates floats assert_dt_equal(apply_read_scaling(i16_arr, 1.0, 1.0).dtype, float) - assert_dt_equal(apply_read_scaling( - np.zeros((1,), dtype=np.int32), 1.0, 1.0).dtype, float) - assert_dt_equal(apply_read_scaling( - np.zeros((1,), dtype=np.int64), 1.0, 1.0).dtype, float) + assert_dt_equal( + apply_read_scaling(np.zeros((1,), dtype=np.int32), 1.0, 1.0).dtype, float + ) + assert_dt_equal( + apply_read_scaling(np.zeros((1,), dtype=np.int64), 1.0, 1.0).dtype, float + ) def test_apply_read_scaling_ints(): # Test that apply_read_scaling copes with integer scaling inputs - arr=np.arange(10, dtype=np.int16) + arr = np.arange(10, dtype=np.int16) assert_array_equal(apply_read_scaling(arr, 1, 0), arr) assert_array_equal(apply_read_scaling(arr, 1, 1), arr + 1) assert_array_equal(apply_read_scaling(arr, 2, 1), arr * 2 + 1) @@ -772,7 +781,7 @@ def test_apply_read_scaling_ints(): def test_apply_read_scaling_nones(): # Check that we can pass None as slope and inter to apply read scaling - arr=np.arange(10, dtype=np.int16) + arr = np.arange(10, dtype=np.int16) assert_array_equal(apply_read_scaling(arr, None, None), arr) assert_array_equal(apply_read_scaling(arr, 2, None), arr * 2) assert_array_equal(apply_read_scaling(arr, None, 1), arr + 1) @@ -792,50 +801,52 @@ def test_working_type(): # need this because of the very confusing np.int32 != np.intp (on 32 bit). 
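# (Aside, added for clarity: np.intp is a platform-dependent alias, so
#  these tests normalise every result through np.dtype(...).str, under
#  which np.intp and the fixed-width int of the same size spell
#  identically.)
import numpy as np

n = np.dtype(np.intp).itemsize  # 4 on 32-bit builds, 8 on 64-bit
assert np.dtype(np.intp).str == np.dtype('=i%d' % n).str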
def wt(*args, **kwargs): return np.dtype(working_type(*args, **kwargs)).str - d1=np.atleast_1d + + d1 = np.atleast_1d for in_type in NUMERIC_TYPES: - in_ts=np.dtype(in_type).str + in_ts = np.dtype(in_type).str assert wt(in_type) == in_ts assert wt(in_type, 1, 0) == in_ts assert wt(in_type, 1.0, 0.0) == in_ts - in_val=d1(in_type(0)) + in_val = d1(in_type(0)) for slope_type in NUMERIC_TYPES: - sl_val=slope_type(1) # no scaling, regardless of type + sl_val = slope_type(1) # no scaling, regardless of type assert wt(in_type, sl_val, 0.0) == in_ts - sl_val=slope_type(2) # actual scaling - out_val=in_val / d1(sl_val) + sl_val = slope_type(2) # actual scaling + out_val = in_val / d1(sl_val) assert wt(in_type, sl_val) == out_val.dtype.str for inter_type in NUMERIC_TYPES: - i_val=inter_type(0) # no scaling, regardless of type + i_val = inter_type(0) # no scaling, regardless of type assert wt(in_type, 1, i_val) == in_ts - i_val=inter_type(1) # actual scaling - out_val=in_val - d1(i_val) + i_val = inter_type(1) # actual scaling + out_val = in_val - d1(i_val) assert wt(in_type, 1, i_val) == out_val.dtype.str # Combine scaling and intercept - out_val=(in_val - d1(i_val)) / d1(sl_val) + out_val = (in_val - d1(i_val)) / d1(sl_val) assert wt(in_type, sl_val, i_val) == out_val.dtype.str # Confirm that type codes and dtypes work as well - f32s=np.dtype(np.float32).str - assert wt('f4', 1, 0) == f32s - assert wt(np.dtype('f4'), 1, 0) == f32s + f32s = np.dtype(np.float32).str + assert wt("f4", 1, 0) == f32s + assert wt(np.dtype("f4"), 1, 0) == f32s def test_better_float(): # Better float function def check_against(f1, f2): return f1 if FLOAT_TYPES.index(f1) >= FLOAT_TYPES.index(f2) else f2 + for first in FLOAT_TYPES: - for other in IUINT_TYPES + np.sctypes['complex']: + for other in IUINT_TYPES + np.sctypes["complex"]: assert better_float_of(first, other) == first assert better_float_of(other, first) == first - for other2 in IUINT_TYPES + np.sctypes['complex']: + for other2 in IUINT_TYPES + np.sctypes["complex"]: assert better_float_of(other, other2) == np.float32 assert better_float_of(other, other2, np.float64) == np.float64 for second in FLOAT_TYPES: assert better_float_of(first, second) == check_against(first, second) # Check codes and dtypes work - assert better_float_of('f4', 'f8', 'f4') == np.float64 - assert better_float_of('i4', 'i8', 'f8') == np.float64 + assert better_float_of("f4", "f8", "f4") == np.float64 + assert better_float_of("i4", "i8", "f8") == np.float64 def test_best_write_scale_ftype(): @@ -843,23 +854,29 @@ def test_best_write_scale_ftype(): # Types return better of (default, array type) unless scale overflows. 
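# (A one-line illustration of that rule, added for clarity: float64
#  input yields float64 even though the default return type is float32.)
import numpy as np
from nibabel.volumeutils import best_write_scale_ftype

assert best_write_scale_ftype(np.arange(3, dtype=np.float64), 1, 0) == np.float64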
# Return float type cannot be less capable than the input array type for dtt in IUINT_TYPES + FLOAT_TYPES: - arr=np.arange(10, dtype=dtt) + arr = np.arange(10, dtype=dtt) assert best_write_scale_ftype(arr, 1, 0) == better_float_of(dtt, np.float32) - assert best_write_scale_ftype(arr, 1, 0, np.float64) == better_float_of(dtt, np.float64) - assert best_write_scale_ftype(arr, np.float32(2), 0) == better_float_of(dtt, np.float32) - assert best_write_scale_ftype(arr, 1, np.float32(1)) == better_float_of(dtt, np.float32) + assert best_write_scale_ftype(arr, 1, 0, np.float64) == better_float_of( + dtt, np.float64 + ) + assert best_write_scale_ftype(arr, np.float32(2), 0) == better_float_of( + dtt, np.float32 + ) + assert best_write_scale_ftype(arr, 1, np.float32(1)) == better_float_of( + dtt, np.float32 + ) # Overflowing ints with scaling results in upcast - best_vals=((np.float32, np.float64),) + best_vals = ((np.float32, np.float64),) if np.longdouble in OK_FLOATS: best_vals += ((np.float64, np.longdouble),) for lower_t, higher_t in best_vals: # Information on this float - L_info=type_info(lower_t) - t_max=L_info['max'] - nmant=L_info['nmant'] # number of significand digits - big_delta=lower_t(2**(floor_log2(t_max) - nmant)) # delta below max + L_info = type_info(lower_t) + t_max = L_info["max"] + nmant = L_info["nmant"] # number of significand digits + big_delta = lower_t(2 ** (floor_log2(t_max) - nmant)) # delta below max # Even large values that don't overflow don't change output - arr=np.array([0, t_max], dtype=lower_t) + arr = np.array([0, t_max], dtype=lower_t) assert best_write_scale_ftype(arr, 1, 0) == lower_t # Scaling > 1 reduces output values, so no upcast needed assert best_write_scale_ftype(arr, lower_t(1.01), 0) == lower_t @@ -869,28 +886,29 @@ def test_best_write_scale_ftype(): assert best_write_scale_ftype(arr, 1, -big_delta / 2.01) == lower_t assert best_write_scale_ftype(arr, 1, -big_delta / 2.0) == higher_t # With infs already in input, default type returns - arr[0]=np.inf + arr[0] = np.inf assert best_write_scale_ftype(arr, lower_t(0.5), 0) == lower_t - arr[0]=-np.inf + arr[0] = -np.inf assert best_write_scale_ftype(arr, lower_t(0.5), 0) == lower_t def test_can_cast(): - tests=((np.float32, np.float32, True, True, True), - (np.float64, np.float32, True, True, True), - (np.complex128, np.float32, False, False, False), - (np.float32, np.complex128, True, True, True), - (np.float32, np.uint8, False, True, True), - (np.uint32, np.complex128, True, True, True), - (np.int64, np.float32, True, True, True), - (np.complex128, np.int16, False, False, False), - (np.float32, np.int16, False, True, True), - (np.uint8, np.int16, True, True, True), - (np.uint16, np.int16, False, True, True), - (np.int16, np.uint16, False, False, True), - (np.int8, np.uint16, False, False, True), - (np.uint16, np.uint8, False, True, True), - ) + tests = ( + (np.float32, np.float32, True, True, True), + (np.float64, np.float32, True, True, True), + (np.complex128, np.float32, False, False, False), + (np.float32, np.complex128, True, True, True), + (np.float32, np.uint8, False, True, True), + (np.uint32, np.complex128, True, True, True), + (np.int64, np.float32, True, True, True), + (np.complex128, np.int16, False, False, False), + (np.float32, np.int16, False, True, True), + (np.uint8, np.int16, True, True, True), + (np.uint16, np.int16, False, True, True), + (np.int16, np.uint16, False, False, True), + (np.int8, np.uint16, False, False, True), + (np.uint16, np.uint8, False, True, True), + ) for intype, 
outtype, def_res, scale_res, all_res in tests: assert def_res == can_cast(intype, outtype) assert scale_res == can_cast(intype, outtype, False, True) @@ -898,54 +916,54 @@ def test_can_cast(): def test_write_zeros(): - bio=BytesIO() + bio = BytesIO() write_zeros(bio, 10000) - assert bio.getvalue() == b'\x00' * 10000 + assert bio.getvalue() == b"\x00" * 10000 bio.seek(0) bio.truncate(0) write_zeros(bio, 10000, 256) - assert bio.getvalue() == b'\x00' * 10000 + assert bio.getvalue() == b"\x00" * 10000 bio.seek(0) bio.truncate(0) write_zeros(bio, 200, 256) - assert bio.getvalue() == b'\x00' * 200 + assert bio.getvalue() == b"\x00" * 200 def test_seek_tell(): # Test seek tell routine - bio=BytesIO() - in_files=bio, 'test.bin', 'test.gz', 'test.bz2' - start=10 - end=100 - diff=end - start - tail=7 + bio = BytesIO() + in_files = bio, "test.bin", "test.gz", "test.bz2" + start = 10 + end = 100 + diff = end - start + tail = 7 with InTemporaryDirectory(): for in_file, write0 in itertools.product(in_files, (False, True)): - st=functools.partial(seek_tell, write0=write0) + st = functools.partial(seek_tell, write0=write0) bio.seek(0) # First write the file - with ImageOpener(in_file, 'wb') as fobj: + with ImageOpener(in_file, "wb") as fobj: assert fobj.tell() == 0 # already at position - OK st(fobj, 0) assert fobj.tell() == 0 # Move position by writing - fobj.write(b'\x01' * start) + fobj.write(b"\x01" * start) assert fobj.tell() == start # Files other than BZ2Files can seek forward on write, leaving # zeros in their wake. BZ2Files can't seek when writing, unless # we enable the write0 flag to seek_tell - if not write0 and in_file == 'test.bz2': # Can't seek write in bz2 + if not write0 and in_file == "test.bz2": # Can't seek write in bz2 # write the zeros by hand for the read test below - fobj.write(b'\x00' * diff) + fobj.write(b"\x00" * diff) else: st(fobj, end) assert fobj.tell() == end # Write tail - fobj.write(b'\x02' * tail) + fobj.write(b"\x02" * tail) bio.seek(0) # Now read back the file testing seek_tell in reading mode - with ImageOpener(in_file, 'rb') as fobj: + with ImageOpener(in_file, "rb") as fobj: assert fobj.tell() == 0 st(fobj, 0) assert fobj.tell() == 0 @@ -957,42 +975,42 @@ def test_seek_tell(): st(fobj, 0) bio.seek(0) # Check we have the expected written output - with ImageOpener(in_file, 'rb') as fobj: - assert fobj.read() == b'\x01' * start + b'\x00' * diff + b'\x02' * tail - for in_file in ('test2.gz', 'test2.bz2'): + with ImageOpener(in_file, "rb") as fobj: + assert fobj.read() == b"\x01" * start + b"\x00" * diff + b"\x02" * tail + for in_file in ("test2.gz", "test2.bz2"): # Check failure of write seek backwards - with ImageOpener(in_file, 'wb') as fobj: - fobj.write(b'g' * 10) + with ImageOpener(in_file, "wb") as fobj: + fobj.write(b"g" * 10) assert fobj.tell() == 10 seek_tell(fobj, 10) assert fobj.tell() == 10 with pytest.raises(IOError): seek_tell(fobj, 5) # Make sure read seeks don't affect file - with ImageOpener(in_file, 'rb') as fobj: + with ImageOpener(in_file, "rb") as fobj: seek_tell(fobj, 10) seek_tell(fobj, 0) - with ImageOpener(in_file, 'rb') as fobj: - assert fobj.read() == b'g' * 10 + with ImageOpener(in_file, "rb") as fobj: + assert fobj.read() == b"g" * 10 def test_seek_tell_logic(): # Test logic of seek_tell write0 with dummy class # Seek works? 
OK - bio=BytesIO() + bio = BytesIO() seek_tell(bio, 10) assert bio.tell() == 10 class BabyBio(BytesIO): - def seek(self, *args): raise IOError() - bio=BabyBio() + + bio = BabyBio() # Fresh fileobj, position 0, can't seek - error with pytest.raises(IOError): bio.seek(10) # Put fileobj in correct position by writing - ZEROB=b'\x00' + ZEROB = b"\x00" bio.write(ZEROB * 10) seek_tell(bio, 10) # already there, nothing to do assert bio.tell() == 10 @@ -1007,110 +1025,123 @@ def seek(self, *args): def test_fname_ext_ul_case(): # Get filename ignoring the case of the filename extension with InTemporaryDirectory(): - with open('afile.TXT', 'wt') as fobj: - fobj.write('Interesting information') + with open("afile.TXT", "wt") as fobj: + fobj.write("Interesting information") # OSX usually has case-insensitive file systems; Windows also - os_cares_case=not exists('afile.txt') - with open('bfile.txt', 'wt') as fobj: - fobj.write('More interesting information') + os_cares_case = not exists("afile.txt") + with open("bfile.txt", "wt") as fobj: + fobj.write("More interesting information") # If there is no file, the case doesn't change - assert fname_ext_ul_case('nofile.txt') == 'nofile.txt' - assert fname_ext_ul_case('nofile.TXT') == 'nofile.TXT' + assert fname_ext_ul_case("nofile.txt") == "nofile.txt" + assert fname_ext_ul_case("nofile.TXT") == "nofile.TXT" # If there is a file, accept upper or lower case for ext if os_cares_case: - assert fname_ext_ul_case('afile.txt') == 'afile.TXT' - assert fname_ext_ul_case('bfile.TXT') == 'bfile.txt' + assert fname_ext_ul_case("afile.txt") == "afile.TXT" + assert fname_ext_ul_case("bfile.TXT") == "bfile.txt" else: - assert fname_ext_ul_case('afile.txt') == 'afile.txt' - assert fname_ext_ul_case('bfile.TXT') == 'bfile.TXT' - assert fname_ext_ul_case('afile.TXT') == 'afile.TXT' - assert fname_ext_ul_case('bfile.txt') == 'bfile.txt' + assert fname_ext_ul_case("afile.txt") == "afile.txt" + assert fname_ext_ul_case("bfile.TXT") == "bfile.TXT" + assert fname_ext_ul_case("afile.TXT") == "afile.TXT" + assert fname_ext_ul_case("bfile.txt") == "bfile.txt" # Not mixed case though - assert fname_ext_ul_case('afile.TxT') == 'afile.TxT' + assert fname_ext_ul_case("afile.TxT") == "afile.TxT" def test_allopen(): # This import into volumeutils is for compatibility. The code is the # ``openers`` module. 
with clear_and_catch_warnings() as w: - warnings.filterwarnings('once', category=DeprecationWarning) + warnings.filterwarnings("once", category=DeprecationWarning) # Test default mode is 'rb' - fobj=allopen(__file__) + fobj = allopen(__file__) # Check we got the deprecation warning assert len(w) == 1 - assert fobj.mode == 'rb' + assert fobj.mode == "rb" # That we can set it - fobj=allopen(__file__, 'r') - assert fobj.mode == 'r' + fobj = allopen(__file__, "r") + assert fobj.mode == "r" # with keyword arguments - fobj=allopen(__file__, mode='r') - assert fobj.mode == 'r' + fobj = allopen(__file__, mode="r") + assert fobj.mode == "r" # fileobj returns fileobj - msg=b'tiddle pom' - sobj=BytesIO(msg) - fobj=allopen(sobj) + msg = b"tiddle pom" + sobj = BytesIO(msg) + fobj = allopen(sobj) assert fobj.read() == msg # mode is gently ignored - fobj=allopen(sobj, mode='r') + fobj = allopen(sobj, mode="r") def test_allopen_compresslevel(): # We can set the default compression level with the module global # Get some data to compress - with open(__file__, 'rb') as fobj: - my_self=fobj.read() + with open(__file__, "rb") as fobj: + my_self = fobj.read() # Prepare loop - fname='test.gz' - sizes={} + fname = "test.gz" + sizes = {} # Stash module global from .. import volumeutils as vu - original_compress_level=vu.default_compresslevel + + original_compress_level = vu.default_compresslevel assert original_compress_level == 1 try: with InTemporaryDirectory(): - for compresslevel in ('default', 1, 9): - if compresslevel != 'default': - vu.default_compresslevel=compresslevel + for compresslevel in ("default", 1, 9): + if compresslevel != "default": + vu.default_compresslevel = compresslevel with warnings.catch_warnings(): warnings.simplefilter("ignore") - with allopen(fname, 'wb') as fobj: + with allopen(fname, "wb") as fobj: fobj.write(my_self) - with open(fname, 'rb') as fobj: - my_selves_smaller=fobj.read() - sizes[compresslevel]=len(my_selves_smaller) - assert sizes['default'] == sizes[1] + with open(fname, "rb") as fobj: + my_selves_smaller = fobj.read() + sizes[compresslevel] = len(my_selves_smaller) + assert sizes["default"] == sizes[1] assert sizes[1] > sizes[9] finally: - vu.default_compresslevel=original_compress_level + vu.default_compresslevel = original_compress_level def test_shape_zoom_affine(): - shape=(3, 5, 7) - zooms=(3, 2, 1) - res=shape_zoom_affine(shape, zooms) - exp=np.array([[-3., 0., 0., 3.], - [0., 2., 0., -4.], - [0., 0., 1., -3.], - [0., 0., 0., 1.]]) + shape = (3, 5, 7) + zooms = (3, 2, 1) + res = shape_zoom_affine(shape, zooms) + exp = np.array( + [ + [-3.0, 0.0, 0.0, 3.0], + [0.0, 2.0, 0.0, -4.0], + [0.0, 0.0, 1.0, -3.0], + [0.0, 0.0, 0.0, 1.0], + ] + ) assert_array_almost_equal(res, exp) - res=shape_zoom_affine((3, 5), (3, 2)) - exp=np.array([[-3., 0., 0., 3.], - [0., 2., 0., -4.], - [0., 0., 1., -0.], - [0., 0., 0., 1.]]) + res = shape_zoom_affine((3, 5), (3, 2)) + exp = np.array( + [ + [-3.0, 0.0, 0.0, 3.0], + [0.0, 2.0, 0.0, -4.0], + [0.0, 0.0, 1.0, -0.0], + [0.0, 0.0, 0.0, 1.0], + ] + ) assert_array_almost_equal(res, exp) - res=shape_zoom_affine(shape, zooms, False) - exp=np.array([[3., 0., 0., -3.], - [0., 2., 0., -4.], - [0., 0., 1., -3.], - [0., 0., 0., 1.]]) + res = shape_zoom_affine(shape, zooms, False) + exp = np.array( + [ + [3.0, 0.0, 0.0, -3.0], + [0.0, 2.0, 0.0, -4.0], + [0.0, 0.0, 1.0, -3.0], + [0.0, 0.0, 0.0, 1.0], + ] + ) assert_array_almost_equal(res, exp) def test_rec2dict(): - r=np.zeros((), dtype=[('x', 'i4'), ('s', 'S10')]) - d=rec2dict(r) - assert d == 
{'x': 0, 's': b''}
+    r = np.zeros((), dtype=[("x", "i4"), ("s", "S10")])
+    d = rec2dict(r)
+    assert d == {"x": 0, "s": b""}


 def test_dtypes():
@@ -1121,127 +1152,156 @@ def test_dtypes():
     # In [10]: dtype('…

From: Chris Gorgolewski
Date: Mon, 11 Nov 2019 16:08:53 -0500
Subject: [PATCH 3/7] fixed spacing

---
 nibabel/tests/test_volumeutils.py | 65 +++++++------------------
 1 file changed, 14 insertions(+), 51 deletions(-)

diff --git a/nibabel/tests/test_volumeutils.py b/nibabel/tests/test_volumeutils.py
index 34c2196b93..9f7bda3350 100644
--- a/nibabel/tests/test_volumeutils.py
+++ b/nibabel/tests/test_volumeutils.py
@@ -99,9 +99,7 @@ def make_array(n, bytes):
     # Check whether file, gzip file, bz2 file reread memory from cache
     fname = "test.bin"
     with InTemporaryDirectory():
-        for n, opener in itertools.product(
-            (256, 1024, 2560, 25600), (open, gzip.open, BZ2File)
-        ):
+        for n, opener in itertools.product((256, 1024, 2560, 25600), (open, gzip.open, BZ2File)):
             in_arr = np.arange(n, dtype=dtype)
             # Write array to file
             fobj_w = opener(fname, "wb")
@@ -297,9 +295,7 @@ def test_array_to_file():
             ndt = dt.newbyteorder(code)
             for allow_intercept in (True, False):
                 with suppress_warnings():  # deprecated
-                    scale, intercept, mn, mx = calculate_scale(
-                        arr, ndt, allow_intercept
-                    )
+                    scale, intercept, mn, mx = calculate_scale(arr, ndt, allow_intercept)
                 data_back = write_return(arr, str_io, ndt, 0, intercept, scale)
             assert_array_almost_equal(arr, data_back)
     # Test array-like
@@ -332,13 +328,7 @@ def test_a2f_upscale():
     str_io = BytesIO()
     # We need to provide mn, mx for function to be able to calculate upcasting
     array_to_file(
-        arr,
-        str_io,
-        np.uint8,
-        intercept=inter,
-        divslope=slope,
-        mn=info["min"],
-        mx=info["max"],
+        arr, str_io, np.uint8, intercept=inter, divslope=slope, mn=info["min"], mx=info["max"],
     )
     raw = array_from_file(arr.shape, np.uint8, str_io)
     back = apply_read_scaling(raw, slope, inter)
@@ -495,9 +485,7 @@ def test_a2f_big_scalers():
     # We need nan2zero=False because we can't represent 0 in the input, given
     # the scaling and the output range.
     with suppress_warnings():  # overflow
-        array_to_file(
-            arr, str_io, np.int8, intercept=np.float32(2 ** 120), nan2zero=False
-        )
+        array_to_file(arr, str_io, np.int8, intercept=np.float32(2 ** 120), nan2zero=False)
     data_back = array_from_file(arr.shape, np.int8, str_io)
     assert_array_equal(data_back, [-128, -128, 127])
     # Scales also if mx, mn specified? Same notes and complaints as for the test
@@ -522,9 +510,7 @@ def test_a2f_big_scalers():
     assert_array_equal(data_back, [-128, 0, 127])
     # with mn, mx specified?
str_io.seek(0) - array_to_file( - arr, str_io, np.int8, mn=info["min"], mx=info["max"], divslope=np.float32(0.5) - ) + array_to_file(arr, str_io, np.int8, mn=info["min"], mx=info["max"], divslope=np.float32(0.5)) data_back = array_from_file(arr.shape, np.int8, str_io) assert_array_equal(data_back, [-128, 0, 127]) @@ -562,11 +548,7 @@ def test_a2f_scaled_unscaled(): if in_dtype in CFLOAT_TYPES and not mn_out <= nan_fill <= mx_out: with pytest.raises(ValueError): array_to_file( - arr, - fobj, - out_dtype=out_dtype, - divslope=divslope, - intercept=intercept, + arr, fobj, out_dtype=out_dtype, divslope=divslope, intercept=intercept, ) continue with suppress_warnings(): @@ -614,9 +596,7 @@ def test_a2f_nanpos(): def test_a2f_bad_scaling(): # Test that pathological scalers raise an error - NUMERICAL_TYPES = sum( - [np.sctypes[key] for key in ["int", "uint", "float", "complex"]], [] - ) + NUMERICAL_TYPES = sum([np.sctypes[key] for key in ["int", "uint", "float", "complex"]], []) for in_type, out_type, slope, inter in itertools.product( NUMERICAL_TYPES, NUMERICAL_TYPES, @@ -687,9 +667,7 @@ def test_a2f_nan2zero_range(): with pytest.raises(ValueError): write_return(arr_no_nan, fobj, np.int8, intercept=257.1, divslope=2) # OK with nan2zero false - back_arr = write_return( - arr, fobj, np.int8, intercept=257.1, divslope=2, nan2zero=False - ) + back_arr = write_return(arr, fobj, np.int8, intercept=257.1, divslope=2, nan2zero=False) assert_array_equal([-128, -128, -128, nan_cast], back_arr) @@ -763,12 +741,8 @@ def test_apply_scaling(): assert apply_read_scaling(np.int8(0), f32(-1e38), f32(0.0)).dtype == np.float64 # Non-zero intercept still generates floats assert_dt_equal(apply_read_scaling(i16_arr, 1.0, 1.0).dtype, float) - assert_dt_equal( - apply_read_scaling(np.zeros((1,), dtype=np.int32), 1.0, 1.0).dtype, float - ) - assert_dt_equal( - apply_read_scaling(np.zeros((1,), dtype=np.int64), 1.0, 1.0).dtype, float - ) + assert_dt_equal(apply_read_scaling(np.zeros((1,), dtype=np.int32), 1.0, 1.0).dtype, float) + assert_dt_equal(apply_read_scaling(np.zeros((1,), dtype=np.int64), 1.0, 1.0).dtype, float) def test_apply_read_scaling_ints(): @@ -856,15 +830,9 @@ def test_best_write_scale_ftype(): for dtt in IUINT_TYPES + FLOAT_TYPES: arr = np.arange(10, dtype=dtt) assert best_write_scale_ftype(arr, 1, 0) == better_float_of(dtt, np.float32) - assert best_write_scale_ftype(arr, 1, 0, np.float64) == better_float_of( - dtt, np.float64 - ) - assert best_write_scale_ftype(arr, np.float32(2), 0) == better_float_of( - dtt, np.float32 - ) - assert best_write_scale_ftype(arr, 1, np.float32(1)) == better_float_of( - dtt, np.float32 - ) + assert best_write_scale_ftype(arr, 1, 0, np.float64) == better_float_of(dtt, np.float64) + assert best_write_scale_ftype(arr, np.float32(2), 0) == better_float_of(dtt, np.float32) + assert best_write_scale_ftype(arr, 1, np.float32(1)) == better_float_of(dtt, np.float32) # Overflowing ints with scaling results in upcast best_vals = ((np.float32, np.float64),) if np.longdouble in OK_FLOATS: @@ -1248,12 +1216,7 @@ def assert_rt( # check defense against modifying data in-place for in_cast, pre_clips, inter, slope, post_clips, nan_fill in itp( - (None, np.float32), - (None, (-1, 25)), - (0.0, 1.0), - (1.0, 0.5), - (None, (-2, 49)), - (None, 1), + (None, np.float32), (None, (-1, 25)), (0.0, 1.0), (1.0, 0.5), (None, (-2, 49)), (None, 1), ): data = np.arange(24).astype(np.float32) assert_rt( From 67eb65efe22127d8714ea972baadba22ca712363 Mon Sep 17 00:00:00 2001 From: Chris Gorgolewski Date: 
Mon, 11 Nov 2019 16:14:19 -0500 Subject: [PATCH 4/7] fixed spacing --- nibabel/tests/test_volumeutils.py | 32 ++++--------------------------- 1 file changed, 4 insertions(+), 28 deletions(-) diff --git a/nibabel/tests/test_volumeutils.py b/nibabel/tests/test_volumeutils.py index 9f7bda3350..d2434dc331 100644 --- a/nibabel/tests/test_volumeutils.py +++ b/nibabel/tests/test_volumeutils.py @@ -1076,32 +1076,17 @@ def test_shape_zoom_affine(): zooms = (3, 2, 1) res = shape_zoom_affine(shape, zooms) exp = np.array( - [ - [-3.0, 0.0, 0.0, 3.0], - [0.0, 2.0, 0.0, -4.0], - [0.0, 0.0, 1.0, -3.0], - [0.0, 0.0, 0.0, 1.0], - ] + [[-3.0, 0.0, 0.0, 3.0], [0.0, 2.0, 0.0, -4.0], [0.0, 0.0, 1.0, -3.0], [0.0, 0.0, 0.0, 1.0],] ) assert_array_almost_equal(res, exp) res = shape_zoom_affine((3, 5), (3, 2)) exp = np.array( - [ - [-3.0, 0.0, 0.0, 3.0], - [0.0, 2.0, 0.0, -4.0], - [0.0, 0.0, 1.0, -0.0], - [0.0, 0.0, 0.0, 1.0], - ] + [[-3.0, 0.0, 0.0, 3.0], [0.0, 2.0, 0.0, -4.0], [0.0, 0.0, 1.0, -0.0], [0.0, 0.0, 0.0, 1.0],] ) assert_array_almost_equal(res, exp) res = shape_zoom_affine(shape, zooms, False) exp = np.array( - [ - [3.0, 0.0, 0.0, -3.0], - [0.0, 2.0, 0.0, -4.0], - [0.0, 0.0, 1.0, -3.0], - [0.0, 0.0, 0.0, 1.0], - ] + [[3.0, 0.0, 0.0, -3.0], [0.0, 2.0, 0.0, -4.0], [0.0, 0.0, 1.0, -3.0], [0.0, 0.0, 0.0, 1.0],] ) assert_array_almost_equal(res, exp) @@ -1179,16 +1164,7 @@ def assert_rt( if have_nans and nan_fill is None and not out_dtype.type == "f": raise ValueError("Cannot handle this case") _write_data( - to_write, - sio, - out_dtype, - order, - in_cast, - pre_clips, - inter, - slope, - post_clips, - nan_fill, + to_write, sio, out_dtype, order, in_cast, pre_clips, inter, slope, post_clips, nan_fill, ) arr = np.ndarray(shape, out_dtype, buffer=sio.getvalue(), order=order) expected = to_write.copy() From d6aa52dbe08961b4afd78f1c84bcbf823aa30632 Mon Sep 17 00:00:00 2001 From: Chris Gorgolewski Date: Mon, 11 Nov 2019 16:28:56 -0500 Subject: [PATCH 5/7] fixed spacing, revert --- nibabel/tests/test_volumeutils.py | 697 +++++++++++++++--------------- 1 file changed, 349 insertions(+), 348 deletions(-) diff --git a/nibabel/tests/test_volumeutils.py b/nibabel/tests/test_volumeutils.py index d2434dc331..19e5f3bd27 100644 --- a/nibabel/tests/test_volumeutils.py +++ b/nibabel/tests/test_volumeutils.py @@ -6,7 +6,7 @@ # copyright and license terms. # ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## -""" Test for volumeutils module """ +''' Test for volumeutils module ''' import os from os.path import exists @@ -27,60 +27,54 @@ from ..tmpdirs import InTemporaryDirectory from ..openers import ImageOpener from .. 
import volumeutils -from ..volumeutils import ( - array_from_file, - _is_compressed_fobj, - array_to_file, - allopen, # for backwards compatibility - fname_ext_ul_case, - calculate_scale, - can_cast, - write_zeros, - seek_tell, - apply_read_scaling, - working_type, - best_write_scale_ftype, - better_float_of, - int_scinter_ftype, - make_dt_codes, - native_code, - shape_zoom_affine, - rec2dict, - _dt_min_max, - _write_data, - _ftype4scaled_finite, -) +from ..volumeutils import (array_from_file, + _is_compressed_fobj, + array_to_file, + allopen, # for backwards compatibility + fname_ext_ul_case, + calculate_scale, + can_cast, + write_zeros, + seek_tell, + apply_read_scaling, + working_type, + best_write_scale_ftype, + better_float_of, + int_scinter_ftype, + make_dt_codes, + native_code, + shape_zoom_affine, + rec2dict, + _dt_min_max, + _write_data, + _ftype4scaled_finite, + ) from ..openers import Opener, BZ2File -from ..casting import floor_log2, type_info, OK_FLOATS, shared_range +from ..casting import (floor_log2, type_info, OK_FLOATS, shared_range) -from numpy.testing import assert_array_almost_equal, assert_array_equal +from numpy.testing import (assert_array_almost_equal, + assert_array_equal) -from ..testing_pytest import ( - assert_dt_equal, - assert_allclose_safely, - suppress_warnings, - clear_and_catch_warnings, -) +from ..testing_pytest import (assert_dt_equal, assert_allclose_safely, + suppress_warnings, clear_and_catch_warnings) #: convenience variables for numpy types -FLOAT_TYPES = np.sctypes["float"] -COMPLEX_TYPES = np.sctypes["complex"] +FLOAT_TYPES = np.sctypes['float'] +COMPLEX_TYPES = np.sctypes['complex'] CFLOAT_TYPES = FLOAT_TYPES + COMPLEX_TYPES -INT_TYPES = np.sctypes["int"] -IUINT_TYPES = INT_TYPES + np.sctypes["uint"] +INT_TYPES = np.sctypes['int'] +IUINT_TYPES = INT_TYPES + np.sctypes['uint'] NUMERIC_TYPES = CFLOAT_TYPES + IUINT_TYPES def test__is_compressed_fobj(): # _is_compressed helper function with InTemporaryDirectory(): - for ext, opener, compressed in ( - ("", open, False), - (".gz", gzip.open, True), - (".bz2", BZ2File, True), - ): - fname = "test.bin" + ext - for mode in ("wb", "rb"): + for ext, opener, compressed in (('', open, False), + ('.gz', gzip.open, True), + ('.bz2', BZ2File, True)): + fname = 'test.bin' + ext + for mode in ('wb', 'rb'): fobj = opener(fname, mode) assert _is_compressed_fobj(fobj) == compressed fobj.close() @@ -97,27 +91,29 @@ def make_array(n, bytes): return arr # Check whether file, gzip file, bz2 file reread memory from cache - fname = "test.bin" + fname = 'test.bin' with InTemporaryDirectory(): - for n, opener in itertools.product((256, 1024, 2560, 25600), (open, gzip.open, BZ2File)): + for n, opener in itertools.product( + (256, 1024, 2560, 25600), + (open, gzip.open, BZ2File)): in_arr = np.arange(n, dtype=dtype) # Write array to file - fobj_w = opener(fname, "wb") + fobj_w = opener(fname, 'wb') fobj_w.write(in_arr.tostring()) fobj_w.close() # Read back from file - fobj_r = opener(fname, "rb") + fobj_r = opener(fname, 'rb') try: contents1 = bytearray(4 * n) fobj_r.readinto(contents1) # Second element is 1 - assert contents1[0:8] != b"\x00" * 8 + assert contents1[0:8] != b'\x00' * 8 out_arr = make_array(n, contents1) assert_array_equal(in_arr, out_arr) # Set second element to 0 out_arr[1] = 0 # Show this changed the bytes string - assert contents1[:8] == b"\x00" * 8 + assert contents1[:8] == b'\x00' * 8 # Reread, to get unmodified contents fobj_r.seek(0) contents2 = bytearray(4 * n) @@ -140,11 +136,11 @@ def 
test_array_from_file(): offset = 10 assert buf_chk(in_arr, BytesIO(), None, offset) # check on real file - fname = "test.bin" + fname = 'test.bin' with InTemporaryDirectory(): # fortran ordered - out_buf = open(fname, "wb") - in_buf = open(fname, "rb") + out_buf = open(fname, 'wb') + in_buf = open(fname, 'rb') assert buf_chk(in_arr, out_buf, in_buf, offset) # Drop offset to check that shape's not coming from file length out_buf.seek(0) @@ -153,9 +149,9 @@ def test_array_from_file(): assert buf_chk(in_arr, out_buf, in_buf, offset) del out_buf, in_buf # Make sure empty shape, and zero length, give empty arrays - arr = array_from_file((), np.dtype("f8"), BytesIO()) + arr = array_from_file((), np.dtype('f8'), BytesIO()) assert len(arr) == 0 - arr = array_from_file((0,), np.dtype("f8"), BytesIO()) + arr = array_from_file((0,), np.dtype('f8'), BytesIO()) assert len(arr) == 0 # Check error from small file with pytest.raises(IOError): @@ -163,8 +159,8 @@ def test_array_from_file(): # check on real file fd, fname = tempfile.mkstemp() with InTemporaryDirectory(): - open(fname, "wb").write(b"1") - in_buf = open(fname, "rb") + open(fname, 'wb').write(b'1') + in_buf = open(fname, 'rb') # For windows this will raise a WindowsError from mmap, Unices # appear to raise an IOError with pytest.raises(Exception): @@ -178,51 +174,55 @@ def test_array_from_file_mmap(): with InTemporaryDirectory(): for dt in (np.int16, np.float): arr = np.arange(np.prod(shape), dtype=dt).reshape(shape) - with open("test.bin", "wb") as fobj: - fobj.write(arr.tostring(order="F")) - with open("test.bin", "rb") as fobj: + with open('test.bin', 'wb') as fobj: + fobj.write(arr.tostring(order='F')) + with open('test.bin', 'rb') as fobj: res = array_from_file(shape, dt, fobj) assert_array_equal(res, arr) assert isinstance(res, np.memmap) - assert res.mode == "c" - with open("test.bin", "rb") as fobj: + assert res.mode == 'c' + with open('test.bin', 'rb') as fobj: res = array_from_file(shape, dt, fobj, mmap=True) assert_array_equal(res, arr) assert isinstance(res, np.memmap) - assert res.mode == "c" - with open("test.bin", "rb") as fobj: - res = array_from_file(shape, dt, fobj, mmap="c") + assert res.mode == 'c' + with open('test.bin', 'rb') as fobj: + res = array_from_file(shape, dt, fobj, mmap='c') assert_array_equal(res, arr) assert isinstance(res, np.memmap) - assert res.mode == "c" - with open("test.bin", "rb") as fobj: - res = array_from_file(shape, dt, fobj, mmap="r") + assert res.mode == 'c' + with open('test.bin', 'rb') as fobj: + res = array_from_file(shape, dt, fobj, mmap='r') assert_array_equal(res, arr) assert isinstance(res, np.memmap) - assert res.mode == "r" - with open("test.bin", "rb+") as fobj: - res = array_from_file(shape, dt, fobj, mmap="r+") + assert res.mode == 'r' + with open('test.bin', 'rb+') as fobj: + res = array_from_file(shape, dt, fobj, mmap='r+') assert_array_equal(res, arr) assert isinstance(res, np.memmap) - assert res.mode == "r+" - with open("test.bin", "rb") as fobj: + assert res.mode == 'r+' + with open('test.bin', 'rb') as fobj: res = array_from_file(shape, dt, fobj, mmap=False) assert_array_equal(res, arr) assert not isinstance(res, np.memmap) - with open("test.bin", "rb") as fobj: + with open('test.bin', 'rb') as fobj: with pytest.raises(ValueError): - array_from_file(shape, dt, fobj, mmap="p") + array_from_file(shape, dt, fobj, mmap='p') def buf_chk(in_arr, out_buf, in_buf, offset): - """ Write contents of in_arr into fileobj, read back, check same """ - instr = b" " * offset + 
in_arr.tostring(order="F") + ''' Write contents of in_arr into fileobj, read back, check same ''' + instr = b' ' * offset + in_arr.tostring(order='F') out_buf.write(instr) out_buf.flush() if in_buf is None: # we're using in_buf from out_buf out_buf.seek(0) in_buf = out_buf - arr = array_from_file(in_arr.shape, in_arr.dtype, in_buf, offset) + arr = array_from_file( + in_arr.shape, + in_arr.dtype, + in_buf, + offset) return np.allclose(in_arr, arr) @@ -232,13 +232,14 @@ def test_array_from_file_openers(): dtype = np.dtype(np.float32) in_arr = np.arange(24, dtype=dtype).reshape(shape) with InTemporaryDirectory(): - for ext, offset in itertools.product(("", ".gz", ".bz2"), (0, 5, 10)): - fname = "test.bin" + ext - with Opener(fname, "wb") as out_buf: + for ext, offset in itertools.product(('', '.gz', '.bz2'), + (0, 5, 10)): + fname = 'test.bin' + ext + with Opener(fname, 'wb') as out_buf: if offset != 0: # avoid https://bugs.python.org/issue16828 - out_buf.write(b" " * offset) - out_buf.write(in_arr.tostring(order="F")) - with Opener(fname, "rb") as in_buf: + out_buf.write(b' ' * offset) + out_buf.write(in_arr.tostring(order='F')) + with Opener(fname, 'rb') as in_buf: out_arr = array_from_file(shape, dtype, in_buf, offset) assert_array_almost_equal(in_arr, out_arr) # Delete object holding onto file for Windows @@ -250,26 +251,25 @@ def test_array_from_file_reread(): # This is the live check for the generic checks in # test_fobj_string_assumptions offset = 9 - fname = "test.bin" + fname = 'test.bin' with InTemporaryDirectory(): for shape, opener, dtt, order in itertools.product( - ((64,), (64, 65), (64, 65, 66)), - (open, gzip.open, bz2.BZ2File, BytesIO), - (np.int16, np.float32), - ("F", "C"), - ): + ((64,), (64, 65), (64, 65, 66)), + (open, gzip.open, bz2.BZ2File, BytesIO), + (np.int16, np.float32), + ('F', 'C')): n_els = np.prod(shape) in_arr = np.arange(n_els, dtype=dtt).reshape(shape) - is_bio = hasattr(opener, "getvalue") + is_bio = hasattr(opener, 'getvalue') # Write array to file - fobj_w = opener() if is_bio else opener(fname, "wb") - fobj_w.write(b" " * offset) + fobj_w = opener() if is_bio else opener(fname, 'wb') + fobj_w.write(b' ' * offset) fobj_w.write(in_arr.tostring(order=order)) if is_bio: fobj_r = fobj_w else: fobj_w.close() - fobj_r = opener(fname, "rb") + fobj_r = opener(fname, 'rb') # Read back from file try: out_arr = array_from_file(shape, dtt, fobj_r, offset, order) @@ -291,12 +291,14 @@ def test_array_to_file(): str_io = BytesIO() for tp in (np.uint64, np.float, np.complex): dt = np.dtype(tp) - for code in "<>": + for code in '<>': ndt = dt.newbyteorder(code) for allow_intercept in (True, False): with suppress_warnings(): # deprecated - scale, intercept, mn, mx = calculate_scale(arr, ndt, allow_intercept) - data_back = write_return(arr, str_io, ndt, 0, intercept, scale) + scale, intercept, mn, mx = \ + calculate_scale(arr, ndt, allow_intercept) + data_back = write_return(arr, str_io, ndt, + 0, intercept, scale) assert_array_almost_equal(arr, data_back) # Test array-like str_io = BytesIO() @@ -322,14 +324,13 @@ def test_a2f_upscale(): # Test values discovered from stress testing. The largish value (2**115) # overflows to inf after the intercept is subtracted, using float32 as the # working precision. The difference between inf and this value is lost. 
- arr = np.array([[info["min"], 2 ** 115, info["max"]]], dtype=np.float32) - slope = np.float32(2 ** 121) - inter = info["min"] + arr = np.array([[info['min'], 2**115, info['max']]], dtype=np.float32) + slope = np.float32(2**121) + inter = info['min'] str_io = BytesIO() # We need to provide mn, mx for function to be able to calculate upcasting - array_to_file( - arr, str_io, np.uint8, intercept=inter, divslope=slope, mn=info["min"], mx=info["max"], - ) + array_to_file(arr, str_io, np.uint8, intercept=inter, divslope=slope, + mn=info['min'], mx=info['max']) raw = array_from_file(arr.shape, np.uint8, str_io) back = apply_read_scaling(raw, slope, inter) top = back - arr @@ -344,11 +345,11 @@ def test_a2f_min_max(): for out_dt in (np.float32, np.int8): arr = np.arange(4, dtype=in_dt) # min thresholding - with np.errstate(invalid="ignore"): + with np.errstate(invalid='ignore'): data_back = write_return(arr, str_io, out_dt, 0, 0, 1, 1) assert_array_equal(data_back, [1, 1, 2, 3]) # max thresholding - with np.errstate(invalid="ignore"): + with np.errstate(invalid='ignore'): data_back = write_return(arr, str_io, out_dt, 0, 0, 1, None, 2) assert_array_equal(data_back, [0, 1, 2, 2]) # min max thresholding @@ -373,13 +374,13 @@ def test_a2f_order(): arr = np.array([0.0, 1.0, 2.0]) str_io = BytesIO() # order makes no difference in 1D case - data_back = write_return(arr, str_io, ndt, order="C") + data_back = write_return(arr, str_io, ndt, order='C') assert_array_equal(data_back, [0.0, 1.0, 2.0]) # but does in the 2D case arr = np.array([[0.0, 1.0], [2.0, 3.0]]) - data_back = write_return(arr, str_io, ndt, order="F") + data_back = write_return(arr, str_io, ndt, order='F') assert_array_equal(data_back, arr) - data_back = write_return(arr, str_io, ndt, order="C") + data_back = write_return(arr, str_io, ndt, order='C') assert_array_equal(data_back, arr.T) @@ -393,12 +394,12 @@ def test_a2f_nan2zero(): # True is the default, but just to show it's possible data_back = write_return(arr, str_io, ndt, nan2zero=True) assert_array_equal(data_back, arr) - with np.errstate(invalid="ignore"): + with np.errstate(invalid='ignore'): data_back = write_return(arr, str_io, np.int64, nan2zero=True) assert_array_equal(data_back, [[0, 0], [0, 0]]) # otherwise things get a bit weird; tidied here # How weird? 
Look at arr.astype(np.int64) - with np.errstate(invalid="ignore"): + with np.errstate(invalid='ignore'): data_back = write_return(arr, str_io, np.int64, nan2zero=False) assert_array_equal(data_back, arr.astype(np.int64)) @@ -417,16 +418,18 @@ def test_a2f_nan2zero_scaling(): # Array values including zero before scaling but not after bio = BytesIO() for in_dt, out_dt, zero_in, inter in itertools.product( - FLOAT_TYPES, IUINT_TYPES, (True, False), (0, -100) - ): + FLOAT_TYPES, + IUINT_TYPES, + (True, False), + (0, -100)): in_info = np.finfo(in_dt) out_info = np.iinfo(out_dt) - mx = min(in_info.max, out_info.max * 2.0, 2 ** 32) + inter + mx = min(in_info.max, out_info.max * 2., 2**32) + inter mn = 0 if zero_in or inter else 100 vals = [np.nan] + [mn, mx] nan_arr = np.array(vals, dtype=in_dt) zero_arr = np.nan_to_num(nan_arr) - with np.errstate(invalid="ignore"): + with np.errstate(invalid='ignore'): back_nan = write_return(nan_arr, bio, np.int64, intercept=inter) back_zero = write_return(zero_arr, bio, np.int64, intercept=inter) assert_array_equal(back_nan, back_zero) @@ -436,7 +439,7 @@ def test_a2f_offset(): # check that non-zero file offset works arr = np.array([[0.0, 1.0], [2.0, 3.0]]) str_io = BytesIO() - str_io.write(b"a" * 42) + str_io.write(b'a' * 42) array_to_file(arr, str_io, np.float, 42) data_back = array_from_file(arr.shape, np.float, str_io, 42) assert_array_equal(data_back, arr.astype(np.float)) @@ -478,28 +481,22 @@ def test_a2f_zeros(): def test_a2f_big_scalers(): # Check that clip works even for overflowing scalers / data info = type_info(np.float32) - arr = np.array([info["min"], 0, info["max"]], dtype=np.float32) + arr = np.array([info['min'], 0, info['max']], dtype=np.float32) str_io = BytesIO() # Intercept causes overflow - does routine scale correctly? # We check whether the routine correctly clips extreme values. # We need nan2zero=False because we can't represent 0 in the input, given # the scaling and the output range. with suppress_warnings(): # overflow - array_to_file(arr, str_io, np.int8, intercept=np.float32(2 ** 120), nan2zero=False) + array_to_file(arr, str_io, np.int8, intercept=np.float32(2**120), + nan2zero=False) data_back = array_from_file(arr.shape, np.int8, str_io) assert_array_equal(data_back, [-128, -128, 127]) # Scales also if mx, mn specified? Same notes and complaints as for the test # above. str_io.seek(0) - array_to_file( - arr, - str_io, - np.int8, - mn=info["min"], - mx=info["max"], - intercept=np.float32(2 ** 120), - nan2zero=False, - ) + array_to_file(arr, str_io, np.int8, mn=info['min'], mx=info['max'], + intercept=np.float32(2**120), nan2zero=False) data_back = array_from_file(arr.shape, np.int8, str_io) assert_array_equal(data_back, [-128, -128, 127]) # And if slope causes overflow? @@ -510,7 +507,8 @@ def test_a2f_big_scalers(): assert_array_equal(data_back, [-128, 0, 127]) # with mn, mx specified? 
str_io.seek(0) - array_to_file(arr, str_io, np.int8, mn=info["min"], mx=info["max"], divslope=np.float32(0.5)) + array_to_file(arr, str_io, np.int8, mn=info['min'], mx=info['max'], + divslope=np.float32(0.5)) data_back = array_from_file(arr.shape, np.int8, str_io) assert_array_equal(data_back, [-128, 0, 127]) @@ -520,13 +518,13 @@ def test_a2f_int_scaling(): arr = np.array([0, 1, 128, 255], dtype=np.uint8) fobj = BytesIO() back_arr = write_return(arr, fobj, np.uint8, intercept=1) - assert_array_equal(back_arr, np.clip(arr - 1.0, 0, 255)) + assert_array_equal(back_arr, np.clip(arr - 1., 0, 255)) back_arr = write_return(arr, fobj, np.uint8, divslope=2) - assert_array_equal(back_arr, np.round(np.clip(arr / 2.0, 0, 255))) + assert_array_equal(back_arr, np.round(np.clip(arr / 2., 0, 255))) back_arr = write_return(arr, fobj, np.uint8, intercept=1, divslope=2) - assert_array_equal(back_arr, np.round(np.clip((arr - 1.0) / 2.0, 0, 255))) + assert_array_equal(back_arr, np.round(np.clip((arr - 1.) / 2., 0, 255))) back_arr = write_return(arr, fobj, np.int16, intercept=1, divslope=2) - assert_array_equal(back_arr, np.round((arr - 1.0) / 2.0)) + assert_array_equal(back_arr, np.round((arr - 1.) / 2.)) def test_a2f_scaled_unscaled(): @@ -534,8 +532,10 @@ def test_a2f_scaled_unscaled(): # without scaling fobj = BytesIO() for in_dtype, out_dtype, intercept, divslope in itertools.product( - NUMERIC_TYPES, NUMERIC_TYPES, (0, 0.5, -1, 1), (1, 0.5, 2) - ): + NUMERIC_TYPES, + NUMERIC_TYPES, + (0, 0.5, -1, 1), + (1, 0.5, 2)): mn_in, mx_in = _dt_min_max(in_dtype) nan_val = np.nan if in_dtype in CFLOAT_TYPES else 10 arr = np.array([mn_in, -1, 0, 1, mx_in, nan_val], dtype=in_dtype) @@ -545,28 +545,31 @@ def test_a2f_scaled_unscaled(): if out_dtype in IUINT_TYPES: nan_fill = np.round(nan_fill) # nan2zero will check whether 0 in scaled to a valid value in output - if in_dtype in CFLOAT_TYPES and not mn_out <= nan_fill <= mx_out: + if (in_dtype in CFLOAT_TYPES and not mn_out <= nan_fill <= mx_out): with pytest.raises(ValueError): - array_to_file( - arr, fobj, out_dtype=out_dtype, divslope=divslope, intercept=intercept, - ) + array_to_file(arr, + fobj, + out_dtype=out_dtype, + divslope=divslope, + intercept=intercept) continue with suppress_warnings(): - back_arr = write_return( - arr, fobj, out_dtype=out_dtype, divslope=divslope, intercept=intercept - ) + back_arr = write_return(arr, fobj, + out_dtype=out_dtype, + divslope=divslope, + intercept=intercept) exp_back = arr.copy() - if ( - in_dtype in IUINT_TYPES - and out_dtype in IUINT_TYPES - and (intercept, divslope) == (0, 1) - ): + if (in_dtype in IUINT_TYPES and + out_dtype in IUINT_TYPES and + (intercept, divslope) == (0, 1)): # Direct iu to iu casting. # Need to clip if ranges not the same. # Use smaller of input, output range to avoid np.clip upcasting # the array because of large clip limits. 
if (mn_in, mx_in) != (mn_out, mx_out): - exp_back = np.clip(exp_back, max(mn_in, mn_out), min(mx_in, mx_out)) + exp_back = np.clip(exp_back, + max(mn_in, mn_out), + min(mx_in, mx_out)) else: # Need to deal with nans, casting to float, clipping if in_dtype in CFLOAT_TYPES and out_dtype in IUINT_TYPES: exp_back[np.isnan(exp_back)] = 0 @@ -576,7 +579,8 @@ def test_a2f_scaled_unscaled(): exp_back -= intercept if divslope != 1: exp_back /= divslope - if exp_back.dtype.type in CFLOAT_TYPES and out_dtype in IUINT_TYPES: + if (exp_back.dtype.type in CFLOAT_TYPES and + out_dtype in IUINT_TYPES): exp_back = np.round(exp_back).astype(float) exp_back = np.clip(exp_back, *shared_range(float, out_dtype)) exp_back = exp_back.astype(out_dtype) @@ -596,26 +600,35 @@ def test_a2f_nanpos(): def test_a2f_bad_scaling(): # Test that pathological scalers raise an error - NUMERICAL_TYPES = sum([np.sctypes[key] for key in ["int", "uint", "float", "complex"]], []) + NUMERICAL_TYPES = sum([np.sctypes[key] for key in ['int', + 'uint', + 'float', + 'complex']], + []) for in_type, out_type, slope, inter in itertools.product( - NUMERICAL_TYPES, - NUMERICAL_TYPES, - (None, 1, 0, np.nan, -np.inf, np.inf), - (0, np.nan, -np.inf, np.inf), - ): + NUMERICAL_TYPES, + NUMERICAL_TYPES, + (None, 1, 0, np.nan, -np.inf, np.inf), + (0, np.nan, -np.inf, np.inf)): arr = np.ones((2,), dtype=in_type) fobj = BytesIO() if (slope, inter) == (1, 0): - assert_array_equal( - arr, write_return(arr, fobj, out_type, intercept=inter, divslope=slope) - ) + assert_array_equal(arr, + write_return(arr, fobj, out_type, + intercept=inter, + divslope=slope)) elif (slope, inter) == (None, 0): - assert_array_equal( - 0, write_return(arr, fobj, out_type, intercept=inter, divslope=slope) - ) + assert_array_equal(0, + write_return(arr, fobj, out_type, + intercept=inter, + divslope=slope)) else: with pytest.raises(ValueError): - array_to_file(arr, fobj, np.int8, intercept=inter, divslope=slope) + array_to_file(arr, + fobj, + np.int8, + intercept=inter, + divslope=slope) def test_a2f_nan2zero_range(): @@ -635,9 +648,8 @@ def test_a2f_nan2zero_range(): # Pushing zero outside the output data range does not generate error back_arr = write_return(arr_no_nan, fobj, np.int8, intercept=129, nan2zero=True) assert_array_equal([-128, -128, -128, -127], back_arr) - back_arr = write_return( - arr_no_nan, fobj, np.int8, intercept=257.1, divslope=2, nan2zero=True - ) + back_arr = write_return(arr_no_nan, fobj, np.int8, + intercept=257.1, divslope=2, nan2zero=True) assert_array_equal([-128, -128, -128, -128], back_arr) for dt in CFLOAT_TYPES: arr = np.array([-1, 0, 1, np.nan], dtype=dt) @@ -645,9 +657,11 @@ def test_a2f_nan2zero_range(): arr_no_nan = np.array([-1, 0, 1, 2], dtype=dt) # No errors from explicit thresholding # mn thresholding excluding zero - assert_array_equal([1, 1, 1, 0], write_return(arr, fobj, np.int8, mn=1)) + assert_array_equal([1, 1, 1, 0], + write_return(arr, fobj, np.int8, mn=1)) # mx thresholding excluding zero - assert_array_equal([-1, -1, -1, 0], write_return(arr, fobj, np.int8, mx=-1)) + assert_array_equal([-1, -1, -1, 0], + write_return(arr, fobj, np.int8, mx=-1)) # Errors from datatype threshold after scaling back_arr = write_return(arr, fobj, np.int8, intercept=128) assert_array_equal([-128, -128, -127, -128], back_arr) @@ -667,16 +681,17 @@ def test_a2f_nan2zero_range(): with pytest.raises(ValueError): write_return(arr_no_nan, fobj, np.int8, intercept=257.1, divslope=2) # OK with nan2zero false - back_arr = write_return(arr, fobj, np.int8, 
intercept=257.1, divslope=2, nan2zero=False) + back_arr = write_return(arr, fobj, np.int8, + intercept=257.1, divslope=2, nan2zero=False) assert_array_equal([-128, -128, -128, nan_cast], back_arr) def test_a2f_non_numeric(): # Reminder that we may get structured dtypes - dt = np.dtype([("f1", "f"), ("f2", "i2")]) + dt = np.dtype([('f1', 'f'), ('f2', 'i2')]) arr = np.zeros((2,), dtype=dt) - arr["f1"] = 0.4, 0.6 - arr["f2"] = 10, 12 + arr['f1'] = 0.4, 0.6 + arr['f2'] = 10, 12 fobj = BytesIO() back_arr = write_return(arr, fobj, dt) assert_array_equal(back_arr, arr) @@ -722,12 +737,12 @@ def test_apply_scaling(): ret = apply_read_scaling(np.float32(0), inter=np.float64(2)) assert ret.dtype == np.float64 # Check integer inf upcast - big = f32(type_info(f32)["max"]) + big = f32(type_info(f32)['max']) # Normally this would not upcast assert (i16_arr * big).dtype == np.float32 # An equivalent case is a little hard to find for the intercept - nmant_32 = type_info(np.float32)["nmant"] - big_delta = np.float32(2 ** (floor_log2(big) - nmant_32)) + nmant_32 = type_info(np.float32)['nmant'] + big_delta = np.float32(2**(floor_log2(big) - nmant_32)) assert (i16_arr * big_delta + big).dtype == np.float32 # Upcasting does occur with this routine assert apply_read_scaling(i16_arr, big).dtype == np.float64 @@ -741,8 +756,10 @@ def test_apply_scaling(): assert apply_read_scaling(np.int8(0), f32(-1e38), f32(0.0)).dtype == np.float64 # Non-zero intercept still generates floats assert_dt_equal(apply_read_scaling(i16_arr, 1.0, 1.0).dtype, float) - assert_dt_equal(apply_read_scaling(np.zeros((1,), dtype=np.int32), 1.0, 1.0).dtype, float) - assert_dt_equal(apply_read_scaling(np.zeros((1,), dtype=np.int64), 1.0, 1.0).dtype, float) + assert_dt_equal(apply_read_scaling( + np.zeros((1,), dtype=np.int32), 1.0, 1.0).dtype, float) + assert_dt_equal(apply_read_scaling( + np.zeros((1,), dtype=np.int64), 1.0, 1.0).dtype, float) def test_apply_read_scaling_ints(): @@ -755,7 +772,7 @@ def test_apply_read_scaling_ints(): def test_apply_read_scaling_nones(): # Check that we can pass None as slope and inter to apply read scaling - arr = np.arange(10, dtype=np.int16) + arr=np.arange(10, dtype=np.int16) assert_array_equal(apply_read_scaling(arr, None, None), arr) assert_array_equal(apply_read_scaling(arr, 2, None), arr * 2) assert_array_equal(apply_read_scaling(arr, None, 1), arr + 1) @@ -775,8 +792,7 @@ def test_working_type(): # need this because of the very confusing np.int32 != np.intp (on 32 bit). 
def wt(*args, **kwargs): return np.dtype(working_type(*args, **kwargs)).str - - d1 = np.atleast_1d + d1=np.atleast_1d for in_type in NUMERIC_TYPES: in_ts = np.dtype(in_type).str assert wt(in_type) == in_ts @@ -796,31 +812,30 @@ def wt(*args, **kwargs): out_val = in_val - d1(i_val) assert wt(in_type, 1, i_val) == out_val.dtype.str # Combine scaling and intercept - out_val = (in_val - d1(i_val)) / d1(sl_val) + out_val=(in_val - d1(i_val)) / d1(sl_val) assert wt(in_type, sl_val, i_val) == out_val.dtype.str # Confirm that type codes and dtypes work as well f32s = np.dtype(np.float32).str - assert wt("f4", 1, 0) == f32s - assert wt(np.dtype("f4"), 1, 0) == f32s + assert wt('f4', 1, 0) == f32s + assert wt(np.dtype('f4'), 1, 0) == f32s def test_better_float(): # Better float function def check_against(f1, f2): return f1 if FLOAT_TYPES.index(f1) >= FLOAT_TYPES.index(f2) else f2 - for first in FLOAT_TYPES: - for other in IUINT_TYPES + np.sctypes["complex"]: + for other in IUINT_TYPES + np.sctypes['complex']: assert better_float_of(first, other) == first assert better_float_of(other, first) == first - for other2 in IUINT_TYPES + np.sctypes["complex"]: + for other2 in IUINT_TYPES + np.sctypes['complex']: assert better_float_of(other, other2) == np.float32 assert better_float_of(other, other2, np.float64) == np.float64 for second in FLOAT_TYPES: assert better_float_of(first, second) == check_against(first, second) # Check codes and dtypes work - assert better_float_of("f4", "f8", "f4") == np.float64 - assert better_float_of("i4", "i8", "f8") == np.float64 + assert better_float_of('f4', 'f8', 'f4') == np.float64 + assert better_float_of('i4', 'i8', 'f8') == np.float64 def test_best_write_scale_ftype(): @@ -834,15 +849,15 @@ def test_best_write_scale_ftype(): assert best_write_scale_ftype(arr, np.float32(2), 0) == better_float_of(dtt, np.float32) assert best_write_scale_ftype(arr, 1, np.float32(1)) == better_float_of(dtt, np.float32) # Overflowing ints with scaling results in upcast - best_vals = ((np.float32, np.float64),) + best_vals=((np.float32, np.float64),) if np.longdouble in OK_FLOATS: best_vals += ((np.float64, np.longdouble),) for lower_t, higher_t in best_vals: # Information on this float L_info = type_info(lower_t) - t_max = L_info["max"] - nmant = L_info["nmant"] # number of significand digits - big_delta = lower_t(2 ** (floor_log2(t_max) - nmant)) # delta below max + t_max = L_info['max'] + nmant = L_info['nmant'] # number of significand digits + big_delta = lower_t(2**(floor_log2(t_max) - nmant)) # delta below max # Even large values that don't overflow don't change output arr = np.array([0, t_max], dtype=lower_t) assert best_write_scale_ftype(arr, 1, 0) == lower_t @@ -854,29 +869,28 @@ def test_best_write_scale_ftype(): assert best_write_scale_ftype(arr, 1, -big_delta / 2.01) == lower_t assert best_write_scale_ftype(arr, 1, -big_delta / 2.0) == higher_t # With infs already in input, default type returns - arr[0] = np.inf + arr[0]=np.inf assert best_write_scale_ftype(arr, lower_t(0.5), 0) == lower_t - arr[0] = -np.inf + arr[0]=-np.inf assert best_write_scale_ftype(arr, lower_t(0.5), 0) == lower_t def test_can_cast(): - tests = ( - (np.float32, np.float32, True, True, True), - (np.float64, np.float32, True, True, True), - (np.complex128, np.float32, False, False, False), - (np.float32, np.complex128, True, True, True), - (np.float32, np.uint8, False, True, True), - (np.uint32, np.complex128, True, True, True), - (np.int64, np.float32, True, True, True), - (np.complex128, np.int16, False, 
False, False), - (np.float32, np.int16, False, True, True), - (np.uint8, np.int16, True, True, True), - (np.uint16, np.int16, False, True, True), - (np.int16, np.uint16, False, False, True), - (np.int8, np.uint16, False, False, True), - (np.uint16, np.uint8, False, True, True), - ) + tests=((np.float32, np.float32, True, True, True), + (np.float64, np.float32, True, True, True), + (np.complex128, np.float32, False, False, False), + (np.float32, np.complex128, True, True, True), + (np.float32, np.uint8, False, True, True), + (np.uint32, np.complex128, True, True, True), + (np.int64, np.float32, True, True, True), + (np.complex128, np.int16, False, False, False), + (np.float32, np.int16, False, True, True), + (np.uint8, np.int16, True, True, True), + (np.uint16, np.int16, False, True, True), + (np.int16, np.uint16, False, False, True), + (np.int8, np.uint16, False, False, True), + (np.uint16, np.uint8, False, True, True), + ) for intype, outtype, def_res, scale_res, all_res in tests: assert def_res == can_cast(intype, outtype) assert scale_res == can_cast(intype, outtype, False, True) @@ -886,21 +900,21 @@ def test_can_cast(): def test_write_zeros(): bio = BytesIO() write_zeros(bio, 10000) - assert bio.getvalue() == b"\x00" * 10000 + assert bio.getvalue() == b'\x00' * 10000 bio.seek(0) bio.truncate(0) write_zeros(bio, 10000, 256) - assert bio.getvalue() == b"\x00" * 10000 + assert bio.getvalue() == b'\x00' * 10000 bio.seek(0) bio.truncate(0) write_zeros(bio, 200, 256) - assert bio.getvalue() == b"\x00" * 200 + assert bio.getvalue() == b'\x00' * 200 def test_seek_tell(): # Test seek tell routine bio = BytesIO() - in_files = bio, "test.bin", "test.gz", "test.bz2" + in_files = bio, 'test.bin', 'test.gz', 'test.bz2' start = 10 end = 100 diff = end - start @@ -910,28 +924,28 @@ def test_seek_tell(): st = functools.partial(seek_tell, write0=write0) bio.seek(0) # First write the file - with ImageOpener(in_file, "wb") as fobj: + with ImageOpener(in_file, 'wb') as fobj: assert fobj.tell() == 0 # already at position - OK st(fobj, 0) assert fobj.tell() == 0 # Move position by writing - fobj.write(b"\x01" * start) + fobj.write(b'\x01' * start) assert fobj.tell() == start # Files other than BZ2Files can seek forward on write, leaving # zeros in their wake. 
BZ2Files can't seek when writing, unless # we enable the write0 flag to seek_tell - if not write0 and in_file == "test.bz2": # Can't seek write in bz2 + if not write0 and in_file == 'test.bz2': # Can't seek write in bz2 # write the zeros by hand for the read test below - fobj.write(b"\x00" * diff) + fobj.write(b'\x00' * diff) else: st(fobj, end) assert fobj.tell() == end # Write tail - fobj.write(b"\x02" * tail) + fobj.write(b'\x02' * tail) bio.seek(0) # Now read back the file testing seek_tell in reading mode - with ImageOpener(in_file, "rb") as fobj: + with ImageOpener(in_file, 'rb') as fobj: assert fobj.tell() == 0 st(fobj, 0) assert fobj.tell() == 0 @@ -943,23 +957,23 @@ def test_seek_tell(): st(fobj, 0) bio.seek(0) # Check we have the expected written output - with ImageOpener(in_file, "rb") as fobj: - assert fobj.read() == b"\x01" * start + b"\x00" * diff + b"\x02" * tail - for in_file in ("test2.gz", "test2.bz2"): + with ImageOpener(in_file, 'rb') as fobj: + assert fobj.read() == b'\x01' * start + b'\x00' * diff + b'\x02' * tail + for in_file in ('test2.gz', 'test2.bz2'): # Check failure of write seek backwards - with ImageOpener(in_file, "wb") as fobj: - fobj.write(b"g" * 10) + with ImageOpener(in_file, 'wb') as fobj: + fobj.write(b'g' * 10) assert fobj.tell() == 10 seek_tell(fobj, 10) assert fobj.tell() == 10 with pytest.raises(IOError): seek_tell(fobj, 5) # Make sure read seeks don't affect file - with ImageOpener(in_file, "rb") as fobj: + with ImageOpener(in_file, 'rb') as fobj: seek_tell(fobj, 10) seek_tell(fobj, 0) - with ImageOpener(in_file, "rb") as fobj: - assert fobj.read() == b"g" * 10 + with ImageOpener(in_file, 'rb') as fobj: + assert fobj.read() == b'g' * 10 def test_seek_tell_logic(): @@ -970,15 +984,15 @@ def test_seek_tell_logic(): assert bio.tell() == 10 class BabyBio(BytesIO): + def seek(self, *args): raise IOError() - bio = BabyBio() # Fresh fileobj, position 0, can't seek - error with pytest.raises(IOError): bio.seek(10) # Put fileobj in correct position by writing - ZEROB = b"\x00" + ZEROB = b'\x00' bio.write(ZEROB * 10) seek_tell(bio, 10) # already there, nothing to do assert bio.tell() == 10 @@ -993,108 +1007,110 @@ def seek(self, *args): def test_fname_ext_ul_case(): # Get filename ignoring the case of the filename extension with InTemporaryDirectory(): - with open("afile.TXT", "wt") as fobj: - fobj.write("Interesting information") + with open('afile.TXT', 'wt') as fobj: + fobj.write('Interesting information') # OSX usually has case-insensitive file systems; Windows also - os_cares_case = not exists("afile.txt") - with open("bfile.txt", "wt") as fobj: - fobj.write("More interesting information") + os_cares_case = not exists('afile.txt') + with open('bfile.txt', 'wt') as fobj: + fobj.write('More interesting information') # If there is no file, the case doesn't change - assert fname_ext_ul_case("nofile.txt") == "nofile.txt" - assert fname_ext_ul_case("nofile.TXT") == "nofile.TXT" + assert fname_ext_ul_case('nofile.txt') == 'nofile.txt' + assert fname_ext_ul_case('nofile.TXT') == 'nofile.TXT' # If there is a file, accept upper or lower case for ext if os_cares_case: - assert fname_ext_ul_case("afile.txt") == "afile.TXT" - assert fname_ext_ul_case("bfile.TXT") == "bfile.txt" + assert fname_ext_ul_case('afile.txt') == 'afile.TXT' + assert fname_ext_ul_case('bfile.TXT') == 'bfile.txt' else: - assert fname_ext_ul_case("afile.txt") == "afile.txt" - assert fname_ext_ul_case("bfile.TXT") == "bfile.TXT" - assert fname_ext_ul_case("afile.TXT") == "afile.TXT" - assert 
fname_ext_ul_case("bfile.txt") == "bfile.txt" + assert fname_ext_ul_case('afile.txt') == 'afile.txt' + assert fname_ext_ul_case('bfile.TXT') == 'bfile.TXT' + assert fname_ext_ul_case('afile.TXT') == 'afile.TXT' + assert fname_ext_ul_case('bfile.txt') == 'bfile.txt' # Not mixed case though - assert fname_ext_ul_case("afile.TxT") == "afile.TxT" + assert fname_ext_ul_case('afile.TxT') == 'afile.TxT' def test_allopen(): # This import into volumeutils is for compatibility. The code is the # ``openers`` module. with clear_and_catch_warnings() as w: - warnings.filterwarnings("once", category=DeprecationWarning) + warnings.filterwarnings('once', category=DeprecationWarning) # Test default mode is 'rb' fobj = allopen(__file__) # Check we got the deprecation warning assert len(w) == 1 - assert fobj.mode == "rb" + assert fobj.mode == 'rb' # That we can set it - fobj = allopen(__file__, "r") - assert fobj.mode == "r" + fobj = allopen(__file__, 'r') + assert fobj.mode == 'r' # with keyword arguments - fobj = allopen(__file__, mode="r") - assert fobj.mode == "r" + fobj = allopen(__file__, mode='r') + assert fobj.mode == 'r' # fileobj returns fileobj - msg = b"tiddle pom" + msg = b'tiddle pom' sobj = BytesIO(msg) fobj = allopen(sobj) assert fobj.read() == msg # mode is gently ignored - fobj = allopen(sobj, mode="r") + fobj = allopen(sobj, mode='r') def test_allopen_compresslevel(): # We can set the default compression level with the module global # Get some data to compress - with open(__file__, "rb") as fobj: - my_self = fobj.read() + with open(__file__, 'rb') as fobj: + my_self=fobj.read() # Prepare loop - fname = "test.gz" - sizes = {} + fname='test.gz' + sizes={} # Stash module global from .. import volumeutils as vu - original_compress_level = vu.default_compresslevel assert original_compress_level == 1 try: with InTemporaryDirectory(): - for compresslevel in ("default", 1, 9): - if compresslevel != "default": + for compresslevel in ('default', 1, 9): + if compresslevel != 'default': vu.default_compresslevel = compresslevel with warnings.catch_warnings(): warnings.simplefilter("ignore") - with allopen(fname, "wb") as fobj: + with allopen(fname, 'wb') as fobj: fobj.write(my_self) - with open(fname, "rb") as fobj: + with open(fname, 'rb') as fobj: my_selves_smaller = fobj.read() sizes[compresslevel] = len(my_selves_smaller) - assert sizes["default"] == sizes[1] + assert sizes['default'] == sizes[1] assert sizes[1] > sizes[9] finally: vu.default_compresslevel = original_compress_level def test_shape_zoom_affine(): - shape = (3, 5, 7) - zooms = (3, 2, 1) + shape=(3, 5, 7) + zooms=(3, 2, 1) res = shape_zoom_affine(shape, zooms) - exp = np.array( - [[-3.0, 0.0, 0.0, 3.0], [0.0, 2.0, 0.0, -4.0], [0.0, 0.0, 1.0, -3.0], [0.0, 0.0, 0.0, 1.0],] - ) + exp = np.array([[-3., 0., 0., 3.], + [0., 2., 0., -4.], + [0., 0., 1., -3.], + [0., 0., 0., 1.]]) assert_array_almost_equal(res, exp) res = shape_zoom_affine((3, 5), (3, 2)) - exp = np.array( - [[-3.0, 0.0, 0.0, 3.0], [0.0, 2.0, 0.0, -4.0], [0.0, 0.0, 1.0, -0.0], [0.0, 0.0, 0.0, 1.0],] - ) + exp = np.array([[-3., 0., 0., 3.], + [0., 2., 0., -4.], + [0., 0., 1., -0.], + [0., 0., 0., 1.]]) assert_array_almost_equal(res, exp) res = shape_zoom_affine(shape, zooms, False) - exp = np.array( - [[3.0, 0.0, 0.0, -3.0], [0.0, 2.0, 0.0, -4.0], [0.0, 0.0, 1.0, -3.0], [0.0, 0.0, 0.0, 1.0],] - ) + exp = np.array([[3., 0., 0., -3.], + [0., 2., 0., -4.], + [0., 0., 1., -3.], + [0., 0., 0., 1.]]) assert_array_almost_equal(res, exp) def test_rec2dict(): - r = np.zeros((), 
 def test_rec2dict():
-    r = np.zeros((), dtype=[("x", "i4"), ("s", "S10")])
+    r = np.zeros((), dtype=[('x', 'i4'), ('s', 'S10')])
     d = rec2dict(r)
-    assert d == {"x": 0, "s": b""}
+    assert d == {'x': 0, 's': b''}


 def test_dtypes():
@@ -1105,36 +1121,38 @@ def test_dtypes():
     # In [10]: dtype('
Date: Mon, 11 Nov 2019 16:35:45 -0500
Subject: [PATCH 6/7] fixed spacing, take 27

---
 nibabel/tests/test_volumeutils.py | 38 +++++++++++++++----------------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/nibabel/tests/test_volumeutils.py b/nibabel/tests/test_volumeutils.py
index 19e5f3bd27..7cd0920f6c 100644
--- a/nibabel/tests/test_volumeutils.py
+++ b/nibabel/tests/test_volumeutils.py
@@ -792,7 +792,7 @@ def test_working_type():
     # need this because of the very confusing np.int32 != np.intp (on 32 bit).
     def wt(*args, **kwargs):
         return np.dtype(working_type(*args, **kwargs)).str
-    d1=np.atleast_1d
+    d1 = np.atleast_1d
     for in_type in NUMERIC_TYPES:
         in_ts = np.dtype(in_type).str
         assert wt(in_type) == in_ts
@@ -812,7 +812,7 @@ def wt(*args, **kwargs):
             out_val = in_val - d1(i_val)
             assert wt(in_type, 1, i_val) == out_val.dtype.str
             # Combine scaling and intercept
-            out_val=(in_val - d1(i_val)) / d1(sl_val)
+            out_val = (in_val - d1(i_val)) / d1(sl_val)
             assert wt(in_type, sl_val, i_val) == out_val.dtype.str
     # Confirm that type codes and dtypes work as well
     f32s = np.dtype(np.float32).str
@@ -849,7 +849,7 @@ def test_best_write_scale_ftype():
         assert best_write_scale_ftype(arr, np.float32(2), 0) == better_float_of(dtt, np.float32)
         assert best_write_scale_ftype(arr, 1, np.float32(1)) == better_float_of(dtt, np.float32)
     # Overflowing ints with scaling results in upcast
-    best_vals=((np.float32, np.float64),)
+    best_vals = ((np.float32, np.float64),)
     if np.longdouble in OK_FLOATS:
         best_vals += ((np.float64, np.longdouble),)
     for lower_t, higher_t in best_vals:
@@ -869,14 +869,14 @@
         assert best_write_scale_ftype(arr, 1, -big_delta / 2.01) == lower_t
         assert best_write_scale_ftype(arr, 1, -big_delta / 2.0) == higher_t
     # With infs already in input, default type returns
-    arr[0]=np.inf
+    arr[0] = np.inf
     assert best_write_scale_ftype(arr, lower_t(0.5), 0) == lower_t
-    arr[0]=-np.inf
+    arr[0] = -np.inf
     assert best_write_scale_ftype(arr, lower_t(0.5), 0) == lower_t


 def test_can_cast():
-    tests=((np.float32, np.float32, True, True, True),
+    tests = ((np.float32, np.float32, True, True, True),
             (np.float64, np.float32, True, True, True),
             (np.complex128, np.float32, False, False, False),
             (np.float32, np.complex128, True, True, True),
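The `wt` helper above defines the contract being tested: `working_type(in_type, slope, inter)` should return exactly the dtype numpy produces for the unscaling expression `(values - inter) / slope` on 1-D arrays. A self-contained sketch of that oracle (the `expected_working_type` name is illustrative, not the nibabel implementation):

    import numpy as np

    def expected_working_type(in_type, slope=1.0, inter=0.0):
        # Mirror the test's oracle: apply the unscaling arithmetic to a
        # tiny 1-D array and report the dtype numpy promotes to.
        val = np.ones((1,), dtype=in_type)
        result = (val - np.atleast_1d(inter)) / np.atleast_1d(slope)
        return result.dtype.str

    # An int16 array unscaled with the float defaults promotes to float64:
    assert expected_working_type(np.int16) == np.dtype(np.float64).str
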
@@ -1060,8 +1060,8 @@ def test_allopen_compresslevel():
     with open(__file__, 'rb') as fobj:
         my_self=fobj.read()
     # Prepare loop
-    fname='test.gz'
-    sizes={}
+    fname = 'test.gz'
+    sizes = {}
     # Stash module global
     from .. import volumeutils as vu
     original_compress_level = vu.default_compresslevel
@@ -1085,8 +1085,8 @@ def test_allopen_compresslevel():


 def test_shape_zoom_affine():
-    shape=(3, 5, 7)
-    zooms=(3, 2, 1)
+    shape = (3, 5, 7)
+    zooms = (3, 2, 1)
     res = shape_zoom_affine(shape, zooms)
     exp = np.array([[-3., 0., 0., 3.],
                     [0., 2., 0., -4.],
@@ -1121,7 +1121,7 @@ def test_dtypes():
     # In [10]: dtype('
Date: Mon, 11 Nov 2019 16:36:52 -0500
Subject: [PATCH 7/7] fixed spacing, take 28

---
 nibabel/tests/test_volumeutils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nibabel/tests/test_volumeutils.py b/nibabel/tests/test_volumeutils.py
index 7cd0920f6c..4072f85131 100644
--- a/nibabel/tests/test_volumeutils.py
+++ b/nibabel/tests/test_volumeutils.py
@@ -1058,7 +1058,7 @@ def test_allopen_compresslevel():
     # We can set the default compression level with the module global
     # Get some data to compress
     with open(__file__, 'rb') as fobj:
-        my_self=fobj.read()
+        my_self = fobj.read()
     # Prepare loop
     fname = 'test.gz'
     sizes = {}
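The size assertions in test_allopen_compresslevel (`sizes['default'] == sizes[1]` and `sizes[1] > sizes[9]`) hinge on gzip producing strictly smaller output at level 9 than at level 1 for a file the size of the test module. A standalone sketch of that behavior using only the standard library (`gz_size` is a hypothetical helper, not part of nibabel):

    import gzip
    import io

    def gz_size(data, compresslevel):
        # Compress `data` in memory and return the compressed byte count
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode='wb', compresslevel=compresslevel) as f:
            f.write(data)
        return len(buf.getvalue())

    with open(__file__, 'rb') as f:
        data = f.read()
    # Level 9 never produces bigger output than level 1 on source text; the
    # test relies on the inequality being strict for a file this size.
    assert gz_size(data, 1) >= gz_size(data, 9)
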