Chris@87: from __future__ import division, absolute_import, print_function Chris@87: Chris@87: import os Chris@87: import sys Chris@87: from tempfile import mkdtemp, mkstemp, NamedTemporaryFile Chris@87: from shutil import rmtree Chris@87: Chris@87: from numpy.compat import asbytes Chris@87: from numpy.testing import ( Chris@87: run_module_suite, TestCase, assert_ Chris@87: ) Chris@87: import numpy.lib._datasource as datasource Chris@87: Chris@87: if sys.version_info[0] >= 3: Chris@87: import urllib.request as urllib_request Chris@87: from urllib.parse import urlparse Chris@87: from urllib.error import URLError Chris@87: else: Chris@87: import urllib2 as urllib_request Chris@87: from urlparse import urlparse Chris@87: from urllib2 import URLError Chris@87: Chris@87: Chris@87: def urlopen_stub(url, data=None): Chris@87: '''Stub to replace urlopen for testing.''' Chris@87: if url == valid_httpurl(): Chris@87: tmpfile = NamedTemporaryFile(prefix='urltmp_') Chris@87: return tmpfile Chris@87: else: Chris@87: raise URLError('Name or service not known') Chris@87: Chris@87: # setup and teardown Chris@87: old_urlopen = None Chris@87: Chris@87: Chris@87: def setup(): Chris@87: global old_urlopen Chris@87: Chris@87: old_urlopen = urllib_request.urlopen Chris@87: urllib_request.urlopen = urlopen_stub Chris@87: Chris@87: Chris@87: def teardown(): Chris@87: urllib_request.urlopen = old_urlopen Chris@87: Chris@87: # A valid website for more robust testing Chris@87: http_path = 'http://www.google.com/' Chris@87: http_file = 'index.html' Chris@87: Chris@87: http_fakepath = 'http://fake.abc.web/site/' Chris@87: http_fakefile = 'fake.txt' Chris@87: Chris@87: malicious_files = ['/etc/shadow', '../../shadow', Chris@87: '..\\system.dat', 'c:\\windows\\system.dat'] Chris@87: Chris@87: magic_line = asbytes('three is the magic number') Chris@87: Chris@87: Chris@87: # Utility functions used by many TestCases Chris@87: def valid_textfile(filedir): Chris@87: # Generate and return a valid 
temporary file. Chris@87: fd, path = mkstemp(suffix='.txt', prefix='dstmp_', dir=filedir, text=True) Chris@87: os.close(fd) Chris@87: return path Chris@87: Chris@87: Chris@87: def invalid_textfile(filedir): Chris@87: # Generate and return an invalid filename. Chris@87: fd, path = mkstemp(suffix='.txt', prefix='dstmp_', dir=filedir) Chris@87: os.close(fd) Chris@87: os.remove(path) Chris@87: return path Chris@87: Chris@87: Chris@87: def valid_httpurl(): Chris@87: return http_path+http_file Chris@87: Chris@87: Chris@87: def invalid_httpurl(): Chris@87: return http_fakepath+http_fakefile Chris@87: Chris@87: Chris@87: def valid_baseurl(): Chris@87: return http_path Chris@87: Chris@87: Chris@87: def invalid_baseurl(): Chris@87: return http_fakepath Chris@87: Chris@87: Chris@87: def valid_httpfile(): Chris@87: return http_file Chris@87: Chris@87: Chris@87: def invalid_httpfile(): Chris@87: return http_fakefile Chris@87: Chris@87: Chris@87: class TestDataSourceOpen(TestCase): Chris@87: def setUp(self): Chris@87: self.tmpdir = mkdtemp() Chris@87: self.ds = datasource.DataSource(self.tmpdir) Chris@87: Chris@87: def tearDown(self): Chris@87: rmtree(self.tmpdir) Chris@87: del self.ds Chris@87: Chris@87: def test_ValidHTTP(self): Chris@87: fh = self.ds.open(valid_httpurl()) Chris@87: assert_(fh) Chris@87: fh.close() Chris@87: Chris@87: def test_InvalidHTTP(self): Chris@87: url = invalid_httpurl() Chris@87: self.assertRaises(IOError, self.ds.open, url) Chris@87: try: Chris@87: self.ds.open(url) Chris@87: except IOError as e: Chris@87: # Regression test for bug fixed in r4342. 
Chris@87: assert_(e.errno is None) Chris@87: Chris@87: def test_InvalidHTTPCacheURLError(self): Chris@87: self.assertRaises(URLError, self.ds._cache, invalid_httpurl()) Chris@87: Chris@87: def test_ValidFile(self): Chris@87: local_file = valid_textfile(self.tmpdir) Chris@87: fh = self.ds.open(local_file) Chris@87: assert_(fh) Chris@87: fh.close() Chris@87: Chris@87: def test_InvalidFile(self): Chris@87: invalid_file = invalid_textfile(self.tmpdir) Chris@87: self.assertRaises(IOError, self.ds.open, invalid_file) Chris@87: Chris@87: def test_ValidGzipFile(self): Chris@87: try: Chris@87: import gzip Chris@87: except ImportError: Chris@87: # We don't have the gzip capabilities to test. Chris@87: import nose Chris@87: raise nose.SkipTest Chris@87: # Test datasource's internal file_opener for Gzip files. Chris@87: filepath = os.path.join(self.tmpdir, 'foobar.txt.gz') Chris@87: fp = gzip.open(filepath, 'w') Chris@87: fp.write(magic_line) Chris@87: fp.close() Chris@87: fp = self.ds.open(filepath) Chris@87: result = fp.readline() Chris@87: fp.close() Chris@87: self.assertEqual(magic_line, result) Chris@87: Chris@87: def test_ValidBz2File(self): Chris@87: try: Chris@87: import bz2 Chris@87: except ImportError: Chris@87: # We don't have the bz2 capabilities to test. Chris@87: import nose Chris@87: raise nose.SkipTest Chris@87: # Test datasource's internal file_opener for BZip2 files. 
Chris@87: filepath = os.path.join(self.tmpdir, 'foobar.txt.bz2') Chris@87: fp = bz2.BZ2File(filepath, 'w') Chris@87: fp.write(magic_line) Chris@87: fp.close() Chris@87: fp = self.ds.open(filepath) Chris@87: result = fp.readline() Chris@87: fp.close() Chris@87: self.assertEqual(magic_line, result) Chris@87: Chris@87: Chris@87: class TestDataSourceExists(TestCase): Chris@87: def setUp(self): Chris@87: self.tmpdir = mkdtemp() Chris@87: self.ds = datasource.DataSource(self.tmpdir) Chris@87: Chris@87: def tearDown(self): Chris@87: rmtree(self.tmpdir) Chris@87: del self.ds Chris@87: Chris@87: def test_ValidHTTP(self): Chris@87: assert_(self.ds.exists(valid_httpurl())) Chris@87: Chris@87: def test_InvalidHTTP(self): Chris@87: self.assertEqual(self.ds.exists(invalid_httpurl()), False) Chris@87: Chris@87: def test_ValidFile(self): Chris@87: # Test valid file in destpath Chris@87: tmpfile = valid_textfile(self.tmpdir) Chris@87: assert_(self.ds.exists(tmpfile)) Chris@87: # Test valid local file not in destpath Chris@87: localdir = mkdtemp() Chris@87: tmpfile = valid_textfile(localdir) Chris@87: assert_(self.ds.exists(tmpfile)) Chris@87: rmtree(localdir) Chris@87: Chris@87: def test_InvalidFile(self): Chris@87: tmpfile = invalid_textfile(self.tmpdir) Chris@87: self.assertEqual(self.ds.exists(tmpfile), False) Chris@87: Chris@87: Chris@87: class TestDataSourceAbspath(TestCase): Chris@87: def setUp(self): Chris@87: self.tmpdir = os.path.abspath(mkdtemp()) Chris@87: self.ds = datasource.DataSource(self.tmpdir) Chris@87: Chris@87: def tearDown(self): Chris@87: rmtree(self.tmpdir) Chris@87: del self.ds Chris@87: Chris@87: def test_ValidHTTP(self): Chris@87: scheme, netloc, upath, pms, qry, frg = urlparse(valid_httpurl()) Chris@87: local_path = os.path.join(self.tmpdir, netloc, Chris@87: upath.strip(os.sep).strip('/')) Chris@87: self.assertEqual(local_path, self.ds.abspath(valid_httpurl())) Chris@87: Chris@87: def test_ValidFile(self): Chris@87: tmpfile = valid_textfile(self.tmpdir) 
Chris@87: tmpfilename = os.path.split(tmpfile)[-1] Chris@87: # Test with filename only Chris@87: self.assertEqual(tmpfile, self.ds.abspath(tmpfilename)) Chris@87: # Test filename with complete path Chris@87: self.assertEqual(tmpfile, self.ds.abspath(tmpfile)) Chris@87: Chris@87: def test_InvalidHTTP(self): Chris@87: scheme, netloc, upath, pms, qry, frg = urlparse(invalid_httpurl()) Chris@87: invalidhttp = os.path.join(self.tmpdir, netloc, Chris@87: upath.strip(os.sep).strip('/')) Chris@87: self.assertNotEqual(invalidhttp, self.ds.abspath(valid_httpurl())) Chris@87: Chris@87: def test_InvalidFile(self): Chris@87: invalidfile = valid_textfile(self.tmpdir) Chris@87: tmpfile = valid_textfile(self.tmpdir) Chris@87: tmpfilename = os.path.split(tmpfile)[-1] Chris@87: # Test with filename only Chris@87: self.assertNotEqual(invalidfile, self.ds.abspath(tmpfilename)) Chris@87: # Test filename with complete path Chris@87: self.assertNotEqual(invalidfile, self.ds.abspath(tmpfile)) Chris@87: Chris@87: def test_sandboxing(self): Chris@87: tmpfile = valid_textfile(self.tmpdir) Chris@87: tmpfilename = os.path.split(tmpfile)[-1] Chris@87: Chris@87: tmp_path = lambda x: os.path.abspath(self.ds.abspath(x)) Chris@87: Chris@87: assert_(tmp_path(valid_httpurl()).startswith(self.tmpdir)) Chris@87: assert_(tmp_path(invalid_httpurl()).startswith(self.tmpdir)) Chris@87: assert_(tmp_path(tmpfile).startswith(self.tmpdir)) Chris@87: assert_(tmp_path(tmpfilename).startswith(self.tmpdir)) Chris@87: for fn in malicious_files: Chris@87: assert_(tmp_path(http_path+fn).startswith(self.tmpdir)) Chris@87: assert_(tmp_path(fn).startswith(self.tmpdir)) Chris@87: Chris@87: def test_windows_os_sep(self): Chris@87: orig_os_sep = os.sep Chris@87: try: Chris@87: os.sep = '\\' Chris@87: self.test_ValidHTTP() Chris@87: self.test_ValidFile() Chris@87: self.test_InvalidHTTP() Chris@87: self.test_InvalidFile() Chris@87: self.test_sandboxing() Chris@87: finally: Chris@87: os.sep = orig_os_sep Chris@87: Chris@87: 
class TestRepositoryAbspath(TestCase):
    """``Repository.abspath`` for a repository rooted at ``valid_baseurl()``."""

    def setUp(self):
        self.tmpdir = os.path.abspath(mkdtemp())
        self.repos = datasource.Repository(valid_baseurl(), self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.repos

    def test_ValidHTTP(self):
        parsed = urlparse(valid_httpurl())
        local_path = os.path.join(self.repos._destpath, parsed.netloc,
                                  parsed.path.strip(os.sep).strip('/'))
        filepath = self.repos.abspath(valid_httpfile())
        self.assertEqual(local_path, filepath)

    def test_sandboxing(self):
        def tmp_path(name):
            return os.path.abspath(self.repos.abspath(name))

        # Every resolved path, including the hostile ones, must stay
        # underneath the repository's destination directory.
        assert_(tmp_path(valid_httpfile()).startswith(self.tmpdir))
        for fn in malicious_files:
            assert_(tmp_path(http_path+fn).startswith(self.tmpdir))
            assert_(tmp_path(fn).startswith(self.tmpdir))

    def test_windows_os_sep(self):
        # Re-run the path tests with os.sep forced to a backslash; the
        # real separator is restored even if one of them fails.
        orig_os_sep = os.sep
        try:
            os.sep = '\\'
            self.test_ValidHTTP()
            self.test_sandboxing()
        finally:
            os.sep = orig_os_sep


class TestRepositoryExists(TestCase):
    """``Repository.exists`` on local files, URLs and cached files."""

    def setUp(self):
        self.tmpdir = mkdtemp()
        self.repos = datasource.Repository(valid_baseurl(), self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.repos

    def test_ValidFile(self):
        # Create local temp file
        tmpfile = valid_textfile(self.tmpdir)
        assert_(self.repos.exists(tmpfile))

    def test_InvalidFile(self):
        tmpfile = invalid_textfile(self.tmpdir)
        self.assertEqual(self.repos.exists(tmpfile), False)

    def test_RemoveHTTPFile(self):
        # NOTE(review): the name says "Remove" but the body only checks
        # exists() on the valid URL; presumably "Remote" was meant.  The
        # name is kept so the test id does not change.
        assert_(self.repos.exists(valid_httpurl()))

    def test_CachedHTTPFile(self):
        localfile = valid_httpurl()
        # Create a locally cached temp file with an URL based
        # directory structure.  This is similar to what Repository.open
        # would do.
        netloc = urlparse(localfile).netloc
        local_path = os.path.join(self.repos._destpath, netloc)
        os.mkdir(local_path, 0o700)
        tmpfile = valid_textfile(local_path)
        assert_(self.repos.exists(tmpfile))


class TestOpenFunc(TestCase):
    """The module-level ``datasource.open`` convenience function."""

    def setUp(self):
        self.tmpdir = mkdtemp()

    def tearDown(self):
        rmtree(self.tmpdir)

    def test_DataSourceOpen(self):
        local_file = valid_textfile(self.tmpdir)
        # Test case where destpath is passed in
        fp = datasource.open(local_file, destpath=self.tmpdir)
        try:
            assert_(fp)
        finally:
            fp.close()
        # Test case where default destpath is used
        fp = datasource.open(local_file)
        try:
            assert_(fp)
        finally:
            fp.close()


if __name__ == "__main__":
    run_module_suite()