Chris@87: from __future__ import division, absolute_import, print_function Chris@87: Chris@87: import sys Chris@87: import gzip Chris@87: import os Chris@87: import threading Chris@87: from tempfile import mkstemp, NamedTemporaryFile Chris@87: import time Chris@87: import warnings Chris@87: import gc Chris@87: from io import BytesIO Chris@87: from datetime import datetime Chris@87: Chris@87: import numpy as np Chris@87: import numpy.ma as ma Chris@87: from numpy.lib._iotools import (ConverterError, ConverterLockError, Chris@87: ConversionWarning) Chris@87: from numpy.compat import asbytes, asbytes_nested, bytes, asstr Chris@87: from nose import SkipTest Chris@87: from numpy.ma.testutils import ( Chris@87: TestCase, assert_equal, assert_array_equal, Chris@87: assert_raises, assert_raises_regex, run_module_suite Chris@87: ) Chris@87: from numpy.testing import assert_warns, assert_, build_err_msg Chris@87: from numpy.testing.utils import tempdir Chris@87: Chris@87: Chris@87: class TextIO(BytesIO): Chris@87: """Helper IO class. Chris@87: Chris@87: Writes encode strings to bytes if needed, reads return bytes. Chris@87: This makes it easier to emulate files opened in binary mode Chris@87: without needing to explicitly convert strings to bytes in Chris@87: setting up the test data. Chris@87: Chris@87: """ Chris@87: def __init__(self, s=""): Chris@87: BytesIO.__init__(self, asbytes(s)) Chris@87: Chris@87: def write(self, s): Chris@87: BytesIO.write(self, asbytes(s)) Chris@87: Chris@87: def writelines(self, lines): Chris@87: BytesIO.writelines(self, [asbytes(s) for s in lines]) Chris@87: Chris@87: Chris@87: MAJVER, MINVER = sys.version_info[:2] Chris@87: IS_64BIT = sys.maxsize > 2**32 Chris@87: Chris@87: Chris@87: def strptime(s, fmt=None): Chris@87: """This function is available in the datetime module only Chris@87: from Python >= 2.5. Chris@87: Chris@87: """ Chris@87: if sys.version_info[0] >= 3: Chris@87: return datetime(*time.strptime(s.decode('latin1'), fmt)[:3]) Chris@87: else: Chris@87: return datetime(*time.strptime(s, fmt)[:3]) Chris@87: Chris@87: Chris@87: class RoundtripTest(object): Chris@87: def roundtrip(self, save_func, *args, **kwargs): Chris@87: """ Chris@87: save_func : callable Chris@87: Function used to save arrays to file. Chris@87: file_on_disk : bool Chris@87: If true, store the file on disk, instead of in a Chris@87: string buffer. Chris@87: save_kwds : dict Chris@87: Parameters passed to `save_func`. Chris@87: load_kwds : dict Chris@87: Parameters passed to `numpy.load`. Chris@87: args : tuple of arrays Chris@87: Arrays stored to file. Chris@87: Chris@87: """ Chris@87: save_kwds = kwargs.get('save_kwds', {}) Chris@87: load_kwds = kwargs.get('load_kwds', {}) Chris@87: file_on_disk = kwargs.get('file_on_disk', False) Chris@87: Chris@87: if file_on_disk: Chris@87: target_file = NamedTemporaryFile(delete=False) Chris@87: load_file = target_file.name Chris@87: else: Chris@87: target_file = BytesIO() Chris@87: load_file = target_file Chris@87: Chris@87: try: Chris@87: arr = args Chris@87: Chris@87: save_func(target_file, *arr, **save_kwds) Chris@87: target_file.flush() Chris@87: target_file.seek(0) Chris@87: Chris@87: if sys.platform == 'win32' and not isinstance(target_file, BytesIO): Chris@87: target_file.close() Chris@87: Chris@87: arr_reloaded = np.load(load_file, **load_kwds) Chris@87: Chris@87: self.arr = arr Chris@87: self.arr_reloaded = arr_reloaded Chris@87: finally: Chris@87: if not isinstance(target_file, BytesIO): Chris@87: target_file.close() Chris@87: # holds an open file descriptor so it can't be deleted on win Chris@87: if not isinstance(arr_reloaded, np.lib.npyio.NpzFile): Chris@87: os.remove(target_file.name) Chris@87: Chris@87: def check_roundtrips(self, a): Chris@87: self.roundtrip(a) Chris@87: self.roundtrip(a, file_on_disk=True) Chris@87: self.roundtrip(np.asfortranarray(a)) Chris@87: self.roundtrip(np.asfortranarray(a), file_on_disk=True) Chris@87: if a.shape[0] > 1: Chris@87: # neither C nor Fortran contiguous for 2D arrays or more Chris@87: self.roundtrip(np.asfortranarray(a)[1:]) Chris@87: self.roundtrip(np.asfortranarray(a)[1:], file_on_disk=True) Chris@87: Chris@87: def test_array(self): Chris@87: a = np.array([], float) Chris@87: self.check_roundtrips(a) Chris@87: Chris@87: a = np.array([[1, 2], [3, 4]], float) Chris@87: self.check_roundtrips(a) Chris@87: Chris@87: a = np.array([[1, 2], [3, 4]], int) Chris@87: self.check_roundtrips(a) Chris@87: Chris@87: a = np.array([[1 + 5j, 2 + 6j], [3 + 7j, 4 + 8j]], dtype=np.csingle) Chris@87: self.check_roundtrips(a) Chris@87: Chris@87: a = np.array([[1 + 5j, 2 + 6j], [3 + 7j, 4 + 8j]], dtype=np.cdouble) Chris@87: self.check_roundtrips(a) Chris@87: Chris@87: def test_array_object(self): Chris@87: if sys.version_info[:2] >= (2, 7): Chris@87: a = np.array([], object) Chris@87: self.check_roundtrips(a) Chris@87: Chris@87: a = np.array([[1, 2], [3, 4]], object) Chris@87: self.check_roundtrips(a) Chris@87: # Fails with UnpicklingError: could not find MARK on Python 2.6 Chris@87: Chris@87: def test_1D(self): Chris@87: a = np.array([1, 2, 3, 4], int) Chris@87: self.roundtrip(a) Chris@87: Chris@87: @np.testing.dec.knownfailureif(sys.platform == 'win32', "Fail on Win32") Chris@87: def test_mmap(self): Chris@87: a = np.array([[1, 2.5], [4, 7.3]]) Chris@87: self.roundtrip(a, file_on_disk=True, load_kwds={'mmap_mode': 'r'}) Chris@87: Chris@87: a = np.asfortranarray([[1, 2.5], [4, 7.3]]) Chris@87: self.roundtrip(a, file_on_disk=True, load_kwds={'mmap_mode': 'r'}) Chris@87: Chris@87: def test_record(self): Chris@87: a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')]) Chris@87: self.check_roundtrips(a) Chris@87: Chris@87: def test_format_2_0(self): Chris@87: dt = [(("%d" % i) * 100, float) for i in range(500)] Chris@87: a = np.ones(1000, dtype=dt) Chris@87: with warnings.catch_warnings(record=True): Chris@87: warnings.filterwarnings('always', '', UserWarning) Chris@87: self.check_roundtrips(a) Chris@87: Chris@87: Chris@87: class TestSaveLoad(RoundtripTest, TestCase): Chris@87: def roundtrip(self, *args, **kwargs): Chris@87: RoundtripTest.roundtrip(self, np.save, *args, **kwargs) Chris@87: assert_equal(self.arr[0], self.arr_reloaded) Chris@87: assert_equal(self.arr[0].dtype, self.arr_reloaded.dtype) Chris@87: assert_equal(self.arr[0].flags.fnc, self.arr_reloaded.flags.fnc) Chris@87: Chris@87: Chris@87: class TestSavezLoad(RoundtripTest, TestCase): Chris@87: def roundtrip(self, *args, **kwargs): Chris@87: RoundtripTest.roundtrip(self, np.savez, *args, **kwargs) Chris@87: try: Chris@87: for n, arr in enumerate(self.arr): Chris@87: reloaded = self.arr_reloaded['arr_%d' % n] Chris@87: assert_equal(arr, reloaded) Chris@87: assert_equal(arr.dtype, reloaded.dtype) Chris@87: assert_equal(arr.flags.fnc, reloaded.flags.fnc) Chris@87: finally: Chris@87: # delete tempfile, must be done here on windows Chris@87: if self.arr_reloaded.fid: Chris@87: self.arr_reloaded.fid.close() Chris@87: os.remove(self.arr_reloaded.fid.name) Chris@87: Chris@87: @np.testing.dec.skipif(not IS_64BIT, "Works only with 64bit systems") Chris@87: @np.testing.dec.slow Chris@87: def test_big_arrays(self): Chris@87: L = (1 << 31) + 100000 Chris@87: a = np.empty(L, dtype=np.uint8) Chris@87: with tempdir(prefix="numpy_test_big_arrays_") as tmpdir: Chris@87: tmp = os.path.join(tmpdir, "file.npz") Chris@87: np.savez(tmp, a=a) Chris@87: del a Chris@87: npfile = np.load(tmp) Chris@87: a = npfile['a'] Chris@87: npfile.close() Chris@87: Chris@87: def test_multiple_arrays(self): Chris@87: a = np.array([[1, 2], [3, 4]], float) Chris@87: b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex) Chris@87: self.roundtrip(a, b) Chris@87: Chris@87: def test_named_arrays(self): Chris@87: a = np.array([[1, 2], [3, 4]], float) Chris@87: b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex) Chris@87: c = BytesIO() Chris@87: np.savez(c, file_a=a, file_b=b) Chris@87: c.seek(0) Chris@87: l = np.load(c) Chris@87: assert_equal(a, l['file_a']) Chris@87: assert_equal(b, l['file_b']) Chris@87: Chris@87: def test_savez_filename_clashes(self): Chris@87: # Test that issue #852 is fixed Chris@87: # and savez functions in multithreaded environment Chris@87: Chris@87: def writer(error_list): Chris@87: fd, tmp = mkstemp(suffix='.npz') Chris@87: os.close(fd) Chris@87: try: Chris@87: arr = np.random.randn(500, 500) Chris@87: try: Chris@87: np.savez(tmp, arr=arr) Chris@87: except OSError as err: Chris@87: error_list.append(err) Chris@87: finally: Chris@87: os.remove(tmp) Chris@87: Chris@87: errors = [] Chris@87: threads = [threading.Thread(target=writer, args=(errors,)) Chris@87: for j in range(3)] Chris@87: for t in threads: Chris@87: t.start() Chris@87: for t in threads: Chris@87: t.join() Chris@87: Chris@87: if errors: Chris@87: raise AssertionError(errors) Chris@87: Chris@87: def test_not_closing_opened_fid(self): Chris@87: # Test that issue #2178 is fixed: Chris@87: # verify could seek on 'loaded' file Chris@87: Chris@87: fd, tmp = mkstemp(suffix='.npz') Chris@87: os.close(fd) Chris@87: try: Chris@87: fp = open(tmp, 'wb') Chris@87: np.savez(fp, data='LOVELY LOAD') Chris@87: fp.close() Chris@87: Chris@87: fp = open(tmp, 'rb', 10000) Chris@87: fp.seek(0) Chris@87: assert_(not fp.closed) Chris@87: _ = np.load(fp)['data'] Chris@87: assert_(not fp.closed) Chris@87: # must not get closed by .load(opened fp) Chris@87: fp.seek(0) Chris@87: assert_(not fp.closed) Chris@87: Chris@87: finally: Chris@87: fp.close() Chris@87: os.remove(tmp) Chris@87: Chris@87: def test_closing_fid(self): Chris@87: # Test that issue #1517 (too many opened files) remains closed Chris@87: # It might be a "weak" test since failed to get triggered on Chris@87: # e.g. Debian sid of 2012 Jul 05 but was reported to Chris@87: # trigger the failure on Ubuntu 10.04: Chris@87: # http://projects.scipy.org/numpy/ticket/1517#comment:2 Chris@87: fd, tmp = mkstemp(suffix='.npz') Chris@87: os.close(fd) Chris@87: Chris@87: try: Chris@87: fp = open(tmp, 'wb') Chris@87: np.savez(fp, data='LOVELY LOAD') Chris@87: fp.close() Chris@87: # We need to check if the garbage collector can properly close Chris@87: # numpy npz file returned by np.load when their reference count Chris@87: # goes to zero. Python 3 running in debug mode raises a Chris@87: # ResourceWarning when file closing is left to the garbage Chris@87: # collector, so we catch the warnings. Because ResourceWarning Chris@87: # is unknown in Python < 3.x, we take the easy way out and Chris@87: # catch all warnings. Chris@87: with warnings.catch_warnings(): Chris@87: warnings.simplefilter("ignore") Chris@87: for i in range(1, 1025): Chris@87: try: Chris@87: np.load(tmp)["data"] Chris@87: except Exception as e: Chris@87: msg = "Failed to load data from a file: %s" % e Chris@87: raise AssertionError(msg) Chris@87: finally: Chris@87: os.remove(tmp) Chris@87: Chris@87: def test_closing_zipfile_after_load(self): Chris@87: # Check that zipfile owns file and can close it. Chris@87: # This needs to pass a file name to load for the Chris@87: # test. Chris@87: with tempdir(prefix="numpy_test_closing_zipfile_after_load_") as tmpdir: Chris@87: fd, tmp = mkstemp(suffix='.npz', dir=tmpdir) Chris@87: os.close(fd) Chris@87: np.savez(tmp, lab='place holder') Chris@87: data = np.load(tmp) Chris@87: fp = data.zip.fp Chris@87: data.close() Chris@87: assert_(fp.closed) Chris@87: Chris@87: Chris@87: class TestSaveTxt(TestCase): Chris@87: def test_array(self): Chris@87: a = np.array([[1, 2], [3, 4]], float) Chris@87: fmt = "%.18e" Chris@87: c = BytesIO() Chris@87: np.savetxt(c, a, fmt=fmt) Chris@87: c.seek(0) Chris@87: assert_equal(c.readlines(), Chris@87: [asbytes((fmt + ' ' + fmt + '\n') % (1, 2)), Chris@87: asbytes((fmt + ' ' + fmt + '\n') % (3, 4))]) Chris@87: Chris@87: a = np.array([[1, 2], [3, 4]], int) Chris@87: c = BytesIO() Chris@87: np.savetxt(c, a, fmt='%d') Chris@87: c.seek(0) Chris@87: assert_equal(c.readlines(), [b'1 2\n', b'3 4\n']) Chris@87: Chris@87: def test_1D(self): Chris@87: a = np.array([1, 2, 3, 4], int) Chris@87: c = BytesIO() Chris@87: np.savetxt(c, a, fmt='%d') Chris@87: c.seek(0) Chris@87: lines = c.readlines() Chris@87: assert_equal(lines, [b'1\n', b'2\n', b'3\n', b'4\n']) Chris@87: Chris@87: def test_record(self): Chris@87: a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')]) Chris@87: c = BytesIO() Chris@87: np.savetxt(c, a, fmt='%d') Chris@87: c.seek(0) Chris@87: assert_equal(c.readlines(), [b'1 2\n', b'3 4\n']) Chris@87: Chris@87: def test_delimiter(self): Chris@87: a = np.array([[1., 2.], [3., 4.]]) Chris@87: c = BytesIO() Chris@87: np.savetxt(c, a, delimiter=',', fmt='%d') Chris@87: c.seek(0) Chris@87: assert_equal(c.readlines(), [b'1,2\n', b'3,4\n']) Chris@87: Chris@87: def test_format(self): Chris@87: a = np.array([(1, 2), (3, 4)]) Chris@87: c = BytesIO() Chris@87: # Sequence of formats Chris@87: np.savetxt(c, a, fmt=['%02d', '%3.1f']) Chris@87: c.seek(0) Chris@87: assert_equal(c.readlines(), [b'01 2.0\n', b'03 4.0\n']) Chris@87: Chris@87: # A single multiformat string Chris@87: c = BytesIO() Chris@87: np.savetxt(c, a, fmt='%02d : %3.1f') Chris@87: c.seek(0) Chris@87: lines = c.readlines() Chris@87: assert_equal(lines, [b'01 : 2.0\n', b'03 : 4.0\n']) Chris@87: Chris@87: # Specify delimiter, should be overiden Chris@87: c = BytesIO() Chris@87: np.savetxt(c, a, fmt='%02d : %3.1f', delimiter=',') Chris@87: c.seek(0) Chris@87: lines = c.readlines() Chris@87: assert_equal(lines, [b'01 : 2.0\n', b'03 : 4.0\n']) Chris@87: Chris@87: # Bad fmt, should raise a ValueError Chris@87: c = BytesIO() Chris@87: assert_raises(ValueError, np.savetxt, c, a, fmt=99) Chris@87: Chris@87: def test_header_footer(self): Chris@87: """ Chris@87: Test the functionality of the header and footer keyword argument. Chris@87: """ Chris@87: c = BytesIO() Chris@87: a = np.array([(1, 2), (3, 4)], dtype=np.int) Chris@87: test_header_footer = 'Test header / footer' Chris@87: # Test the header keyword argument Chris@87: np.savetxt(c, a, fmt='%1d', header=test_header_footer) Chris@87: c.seek(0) Chris@87: assert_equal(c.read(), Chris@87: asbytes('# ' + test_header_footer + '\n1 2\n3 4\n')) Chris@87: # Test the footer keyword argument Chris@87: c = BytesIO() Chris@87: np.savetxt(c, a, fmt='%1d', footer=test_header_footer) Chris@87: c.seek(0) Chris@87: assert_equal(c.read(), Chris@87: asbytes('1 2\n3 4\n# ' + test_header_footer + '\n')) Chris@87: # Test the commentstr keyword argument used on the header Chris@87: c = BytesIO() Chris@87: commentstr = '% ' Chris@87: np.savetxt(c, a, fmt='%1d', Chris@87: header=test_header_footer, comments=commentstr) Chris@87: c.seek(0) Chris@87: assert_equal(c.read(), Chris@87: asbytes(commentstr + test_header_footer + '\n' + '1 2\n3 4\n')) Chris@87: # Test the commentstr keyword argument used on the footer Chris@87: c = BytesIO() Chris@87: commentstr = '% ' Chris@87: np.savetxt(c, a, fmt='%1d', Chris@87: footer=test_header_footer, comments=commentstr) Chris@87: c.seek(0) Chris@87: assert_equal(c.read(), Chris@87: asbytes('1 2\n3 4\n' + commentstr + test_header_footer + '\n')) Chris@87: Chris@87: def test_file_roundtrip(self): Chris@87: f, name = mkstemp() Chris@87: os.close(f) Chris@87: try: Chris@87: a = np.array([(1, 2), (3, 4)]) Chris@87: np.savetxt(name, a) Chris@87: b = np.loadtxt(name) Chris@87: assert_array_equal(a, b) Chris@87: finally: Chris@87: os.unlink(name) Chris@87: Chris@87: def test_complex_arrays(self): Chris@87: ncols = 2 Chris@87: nrows = 2 Chris@87: a = np.zeros((ncols, nrows), dtype=np.complex128) Chris@87: re = np.pi Chris@87: im = np.e Chris@87: a[:] = re + 1.0j * im Chris@87: Chris@87: # One format only Chris@87: c = BytesIO() Chris@87: np.savetxt(c, a, fmt=' %+.3e') Chris@87: c.seek(0) Chris@87: lines = c.readlines() Chris@87: assert_equal( Chris@87: lines, Chris@87: [b' ( +3.142e+00+ +2.718e+00j) ( +3.142e+00+ +2.718e+00j)\n', Chris@87: b' ( +3.142e+00+ +2.718e+00j) ( +3.142e+00+ +2.718e+00j)\n']) Chris@87: Chris@87: # One format for each real and imaginary part Chris@87: c = BytesIO() Chris@87: np.savetxt(c, a, fmt=' %+.3e' * 2 * ncols) Chris@87: c.seek(0) Chris@87: lines = c.readlines() Chris@87: assert_equal( Chris@87: lines, Chris@87: [b' +3.142e+00 +2.718e+00 +3.142e+00 +2.718e+00\n', Chris@87: b' +3.142e+00 +2.718e+00 +3.142e+00 +2.718e+00\n']) Chris@87: Chris@87: # One format for each complex number Chris@87: c = BytesIO() Chris@87: np.savetxt(c, a, fmt=['(%.3e%+.3ej)'] * ncols) Chris@87: c.seek(0) Chris@87: lines = c.readlines() Chris@87: assert_equal( Chris@87: lines, Chris@87: [b'(3.142e+00+2.718e+00j) (3.142e+00+2.718e+00j)\n', Chris@87: b'(3.142e+00+2.718e+00j) (3.142e+00+2.718e+00j)\n']) Chris@87: Chris@87: def test_custom_writer(self): Chris@87: Chris@87: class CustomWriter(list): Chris@87: def write(self, text): Chris@87: self.extend(text.split(b'\n')) Chris@87: Chris@87: w = CustomWriter() Chris@87: a = np.array([(1, 2), (3, 4)]) Chris@87: np.savetxt(w, a) Chris@87: b = np.loadtxt(w) Chris@87: assert_array_equal(a, b) Chris@87: Chris@87: Chris@87: class TestLoadTxt(TestCase): Chris@87: def test_record(self): Chris@87: c = TextIO() Chris@87: c.write('1 2\n3 4') Chris@87: c.seek(0) Chris@87: x = np.loadtxt(c, dtype=[('x', np.int32), ('y', np.int32)]) Chris@87: a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')]) Chris@87: assert_array_equal(x, a) Chris@87: Chris@87: d = TextIO() Chris@87: d.write('M 64.0 75.0\nF 25.0 60.0') Chris@87: d.seek(0) Chris@87: mydescriptor = {'names': ('gender', 'age', 'weight'), Chris@87: 'formats': ('S1', 'i4', 'f4')} Chris@87: b = np.array([('M', 64.0, 75.0), Chris@87: ('F', 25.0, 60.0)], dtype=mydescriptor) Chris@87: y = np.loadtxt(d, dtype=mydescriptor) Chris@87: assert_array_equal(y, b) Chris@87: Chris@87: def test_array(self): Chris@87: c = TextIO() Chris@87: c.write('1 2\n3 4') Chris@87: Chris@87: c.seek(0) Chris@87: x = np.loadtxt(c, dtype=np.int) Chris@87: a = np.array([[1, 2], [3, 4]], int) Chris@87: assert_array_equal(x, a) Chris@87: Chris@87: c.seek(0) Chris@87: x = np.loadtxt(c, dtype=float) Chris@87: a = np.array([[1, 2], [3, 4]], float) Chris@87: assert_array_equal(x, a) Chris@87: Chris@87: def test_1D(self): Chris@87: c = TextIO() Chris@87: c.write('1\n2\n3\n4\n') Chris@87: c.seek(0) Chris@87: x = np.loadtxt(c, dtype=int) Chris@87: a = np.array([1, 2, 3, 4], int) Chris@87: assert_array_equal(x, a) Chris@87: Chris@87: c = TextIO() Chris@87: c.write('1,2,3,4\n') Chris@87: c.seek(0) Chris@87: x = np.loadtxt(c, dtype=int, delimiter=',') Chris@87: a = np.array([1, 2, 3, 4], int) Chris@87: assert_array_equal(x, a) Chris@87: Chris@87: def test_missing(self): Chris@87: c = TextIO() Chris@87: c.write('1,2,3,,5\n') Chris@87: c.seek(0) Chris@87: x = np.loadtxt(c, dtype=int, delimiter=',', Chris@87: converters={3: lambda s: int(s or - 999)}) Chris@87: a = np.array([1, 2, 3, -999, 5], int) Chris@87: assert_array_equal(x, a) Chris@87: Chris@87: def test_converters_with_usecols(self): Chris@87: c = TextIO() Chris@87: c.write('1,2,3,,5\n6,7,8,9,10\n') Chris@87: c.seek(0) Chris@87: x = np.loadtxt(c, dtype=int, delimiter=',', Chris@87: converters={3: lambda s: int(s or - 999)}, Chris@87: usecols=(1, 3,)) Chris@87: a = np.array([[2, -999], [7, 9]], int) Chris@87: assert_array_equal(x, a) Chris@87: Chris@87: def test_comments(self): Chris@87: c = TextIO() Chris@87: c.write('# comment\n1,2,3,5\n') Chris@87: c.seek(0) Chris@87: x = np.loadtxt(c, dtype=int, delimiter=',', Chris@87: comments='#') Chris@87: a = np.array([1, 2, 3, 5], int) Chris@87: assert_array_equal(x, a) Chris@87: Chris@87: def test_skiprows(self): Chris@87: c = TextIO() Chris@87: c.write('comment\n1,2,3,5\n') Chris@87: c.seek(0) Chris@87: x = np.loadtxt(c, dtype=int, delimiter=',', Chris@87: skiprows=1) Chris@87: a = np.array([1, 2, 3, 5], int) Chris@87: assert_array_equal(x, a) Chris@87: Chris@87: c = TextIO() Chris@87: c.write('# comment\n1,2,3,5\n') Chris@87: c.seek(0) Chris@87: x = np.loadtxt(c, dtype=int, delimiter=',', Chris@87: skiprows=1) Chris@87: a = np.array([1, 2, 3, 5], int) Chris@87: assert_array_equal(x, a) Chris@87: Chris@87: def test_usecols(self): Chris@87: a = np.array([[1, 2], [3, 4]], float) Chris@87: c = BytesIO() Chris@87: np.savetxt(c, a) Chris@87: c.seek(0) Chris@87: x = np.loadtxt(c, dtype=float, usecols=(1,)) Chris@87: assert_array_equal(x, a[:, 1]) Chris@87: Chris@87: a = np.array([[1, 2, 3], [3, 4, 5]], float) Chris@87: c = BytesIO() Chris@87: np.savetxt(c, a) Chris@87: c.seek(0) Chris@87: x = np.loadtxt(c, dtype=float, usecols=(1, 2)) Chris@87: assert_array_equal(x, a[:, 1:]) Chris@87: Chris@87: # Testing with arrays instead of tuples. Chris@87: c.seek(0) Chris@87: x = np.loadtxt(c, dtype=float, usecols=np.array([1, 2])) Chris@87: assert_array_equal(x, a[:, 1:]) Chris@87: Chris@87: # Checking with dtypes defined converters. Chris@87: data = '''JOE 70.1 25.3 Chris@87: BOB 60.5 27.9 Chris@87: ''' Chris@87: c = TextIO(data) Chris@87: names = ['stid', 'temp'] Chris@87: dtypes = ['S4', 'f8'] Chris@87: arr = np.loadtxt(c, usecols=(0, 2), dtype=list(zip(names, dtypes))) Chris@87: assert_equal(arr['stid'], [b"JOE", b"BOB"]) Chris@87: assert_equal(arr['temp'], [25.3, 27.9]) Chris@87: Chris@87: def test_fancy_dtype(self): Chris@87: c = TextIO() Chris@87: c.write('1,2,3.0\n4,5,6.0\n') Chris@87: c.seek(0) Chris@87: dt = np.dtype([('x', int), ('y', [('t', int), ('s', float)])]) Chris@87: x = np.loadtxt(c, dtype=dt, delimiter=',') Chris@87: a = np.array([(1, (2, 3.0)), (4, (5, 6.0))], dt) Chris@87: assert_array_equal(x, a) Chris@87: Chris@87: def test_shaped_dtype(self): Chris@87: c = TextIO("aaaa 1.0 8.0 1 2 3 4 5 6") Chris@87: dt = np.dtype([('name', 'S4'), ('x', float), ('y', float), Chris@87: ('block', int, (2, 3))]) Chris@87: x = np.loadtxt(c, dtype=dt) Chris@87: a = np.array([('aaaa', 1.0, 8.0, [[1, 2, 3], [4, 5, 6]])], Chris@87: dtype=dt) Chris@87: assert_array_equal(x, a) Chris@87: Chris@87: def test_3d_shaped_dtype(self): Chris@87: c = TextIO("aaaa 1.0 8.0 1 2 3 4 5 6 7 8 9 10 11 12") Chris@87: dt = np.dtype([('name', 'S4'), ('x', float), ('y', float), Chris@87: ('block', int, (2, 2, 3))]) Chris@87: x = np.loadtxt(c, dtype=dt) Chris@87: a = np.array([('aaaa', 1.0, 8.0, Chris@87: [[[1, 2, 3], [4, 5, 6]], [[7, 8, 9], [10, 11, 12]]])], Chris@87: dtype=dt) Chris@87: assert_array_equal(x, a) Chris@87: Chris@87: def test_empty_file(self): Chris@87: with warnings.catch_warnings(): Chris@87: warnings.filterwarnings("ignore", Chris@87: message="loadtxt: Empty input file:") Chris@87: c = TextIO() Chris@87: x = np.loadtxt(c) Chris@87: assert_equal(x.shape, (0,)) Chris@87: x = np.loadtxt(c, dtype=np.int64) Chris@87: assert_equal(x.shape, (0,)) Chris@87: assert_(x.dtype == np.int64) Chris@87: Chris@87: def test_unused_converter(self): Chris@87: c = TextIO() Chris@87: c.writelines(['1 21\n', '3 42\n']) Chris@87: c.seek(0) Chris@87: data = np.loadtxt(c, usecols=(1,), Chris@87: converters={0: lambda s: int(s, 16)}) Chris@87: assert_array_equal(data, [21, 42]) Chris@87: Chris@87: c.seek(0) Chris@87: data = np.loadtxt(c, usecols=(1,), Chris@87: converters={1: lambda s: int(s, 16)}) Chris@87: assert_array_equal(data, [33, 66]) Chris@87: Chris@87: def test_dtype_with_object(self): Chris@87: "Test using an explicit dtype with an object" Chris@87: from datetime import date Chris@87: import time Chris@87: data = """ 1; 2001-01-01 Chris@87: 2; 2002-01-31 """ Chris@87: ndtype = [('idx', int), ('code', np.object)] Chris@87: func = lambda s: strptime(s.strip(), "%Y-%m-%d") Chris@87: converters = {1: func} Chris@87: test = np.loadtxt(TextIO(data), delimiter=";", dtype=ndtype, Chris@87: converters=converters) Chris@87: control = np.array( Chris@87: [(1, datetime(2001, 1, 1)), (2, datetime(2002, 1, 31))], Chris@87: dtype=ndtype) Chris@87: assert_equal(test, control) Chris@87: Chris@87: def test_uint64_type(self): Chris@87: tgt = (9223372043271415339, 9223372043271415853) Chris@87: c = TextIO() Chris@87: c.write("%s %s" % tgt) Chris@87: c.seek(0) Chris@87: res = np.loadtxt(c, dtype=np.uint64) Chris@87: assert_equal(res, tgt) Chris@87: Chris@87: def test_int64_type(self): Chris@87: tgt = (-9223372036854775807, 9223372036854775807) Chris@87: c = TextIO() Chris@87: c.write("%s %s" % tgt) Chris@87: c.seek(0) Chris@87: res = np.loadtxt(c, dtype=np.int64) Chris@87: assert_equal(res, tgt) Chris@87: Chris@87: def test_universal_newline(self): Chris@87: f, name = mkstemp() Chris@87: os.write(f, b'1 21\r3 42\r') Chris@87: os.close(f) Chris@87: Chris@87: try: Chris@87: data = np.loadtxt(name) Chris@87: assert_array_equal(data, [[1, 21], [3, 42]]) Chris@87: finally: Chris@87: os.unlink(name) Chris@87: Chris@87: def test_empty_field_after_tab(self): Chris@87: c = TextIO() Chris@87: c.write('1 \t2 \t3\tstart \n4\t5\t6\t \n7\t8\t9.5\t') Chris@87: c.seek(0) Chris@87: dt = {'names': ('x', 'y', 'z', 'comment'), Chris@87: 'formats': ('= 3: Chris@87: # python 3k is known to fail for '\r' Chris@87: linesep = ('\n', '\r\n') Chris@87: else: Chris@87: linesep = ('\n', '\r\n', '\r') Chris@87: Chris@87: for sep in linesep: Chris@87: data = '0 1 2' + sep + '3 4 5' Chris@87: f, name = mkstemp() Chris@87: # We can't use NamedTemporaryFile on windows, because we cannot Chris@87: # reopen the file. Chris@87: try: Chris@87: os.write(f, asbytes(data)) Chris@87: assert_array_equal(np.genfromtxt(name), wanted) Chris@87: finally: Chris@87: os.close(f) Chris@87: os.unlink(name) Chris@87: Chris@87: def test_gft_using_generator(self): Chris@87: # gft doesn't work with unicode. Chris@87: def count(): Chris@87: for i in range(10): Chris@87: yield asbytes("%d" % i) Chris@87: Chris@87: res = np.genfromtxt(count()) Chris@87: assert_array_equal(res, np.arange(10)) Chris@87: Chris@87: Chris@87: def test_gzip_load(): Chris@87: a = np.random.random((5, 5)) Chris@87: Chris@87: s = BytesIO() Chris@87: f = gzip.GzipFile(fileobj=s, mode="w") Chris@87: Chris@87: np.save(f, a) Chris@87: f.close() Chris@87: s.seek(0) Chris@87: Chris@87: f = gzip.GzipFile(fileobj=s, mode="r") Chris@87: assert_array_equal(np.load(f), a) Chris@87: Chris@87: Chris@87: def test_gzip_loadtxt(): Chris@87: # Thanks to another windows brokeness, we can't use Chris@87: # NamedTemporaryFile: a file created from this function cannot be Chris@87: # reopened by another open call. So we first put the gzipped string Chris@87: # of the test reference array, write it to a securely opened file, Chris@87: # which is then read from by the loadtxt function Chris@87: s = BytesIO() Chris@87: g = gzip.GzipFile(fileobj=s, mode='w') Chris@87: g.write(b'1 2 3\n') Chris@87: g.close() Chris@87: s.seek(0) Chris@87: Chris@87: f, name = mkstemp(suffix='.gz') Chris@87: try: Chris@87: os.write(f, s.read()) Chris@87: s.close() Chris@87: assert_array_equal(np.loadtxt(name), [1, 2, 3]) Chris@87: finally: Chris@87: os.close(f) Chris@87: os.unlink(name) Chris@87: Chris@87: Chris@87: def test_gzip_loadtxt_from_string(): Chris@87: s = BytesIO() Chris@87: f = gzip.GzipFile(fileobj=s, mode="w") Chris@87: f.write(b'1 2 3\n') Chris@87: f.close() Chris@87: s.seek(0) Chris@87: Chris@87: f = gzip.GzipFile(fileobj=s, mode="r") Chris@87: assert_array_equal(np.loadtxt(f), [1, 2, 3]) Chris@87: Chris@87: Chris@87: def test_npzfile_dict(): Chris@87: s = BytesIO() Chris@87: x = np.zeros((3, 3)) Chris@87: y = np.zeros((3, 3)) Chris@87: Chris@87: np.savez(s, x=x, y=y) Chris@87: s.seek(0) Chris@87: Chris@87: z = np.load(s) Chris@87: Chris@87: assert_('x' in z) Chris@87: assert_('y' in z) Chris@87: assert_('x' in z.keys()) Chris@87: assert_('y' in z.keys()) Chris@87: Chris@87: for f, a in z.items(): Chris@87: assert_(f in ['x', 'y']) Chris@87: assert_equal(a.shape, (3, 3)) Chris@87: Chris@87: assert_(len(z.items()) == 2) Chris@87: Chris@87: for f in z: Chris@87: assert_(f in ['x', 'y']) Chris@87: Chris@87: assert_('x' in z.keys()) Chris@87: Chris@87: Chris@87: def test_load_refcount(): Chris@87: # Check that objects returned by np.load are directly freed based on Chris@87: # their refcount, rather than needing the gc to collect them. Chris@87: Chris@87: f = BytesIO() Chris@87: np.savez(f, [1, 2, 3]) Chris@87: f.seek(0) Chris@87: Chris@87: gc.collect() Chris@87: n_before = len(gc.get_objects()) Chris@87: np.load(f) Chris@87: n_after = len(gc.get_objects()) Chris@87: Chris@87: assert_equal(n_before, n_after) Chris@87: Chris@87: if __name__ == "__main__": Chris@87: run_module_suite()