annotate DEPENDENCIES/mingw32/Python27/Lib/site-packages/numpy/lib/tests/test__datasource.py @ 133:4acb5d8d80b6 tip

Don't fail environmental check if README.md exists (but .txt and no-suffix don't)
author Chris Cannam
date Tue, 30 Jul 2019 12:25:44 +0100
parents 2a2c65a20a8b
children
rev   line source
Chris@87 1 from __future__ import division, absolute_import, print_function
Chris@87 2
Chris@87 3 import os
Chris@87 4 import sys
Chris@87 5 from tempfile import mkdtemp, mkstemp, NamedTemporaryFile
Chris@87 6 from shutil import rmtree
Chris@87 7
Chris@87 8 from numpy.compat import asbytes
Chris@87 9 from numpy.testing import (
Chris@87 10 run_module_suite, TestCase, assert_
Chris@87 11 )
Chris@87 12 import numpy.lib._datasource as datasource
Chris@87 13
Chris@87 14 if sys.version_info[0] >= 3:
Chris@87 15 import urllib.request as urllib_request
Chris@87 16 from urllib.parse import urlparse
Chris@87 17 from urllib.error import URLError
Chris@87 18 else:
Chris@87 19 import urllib2 as urllib_request
Chris@87 20 from urlparse import urlparse
Chris@87 21 from urllib2 import URLError
Chris@87 22
Chris@87 23
def urlopen_stub(url, data=None):
    """Stand-in for ``urlopen`` used while the tests run.

    Returns a fresh named temporary file when *url* equals
    ``valid_httpurl()`` and raises ``URLError`` for any other URL.
    *data* is accepted but never used.
    """
    if url != valid_httpurl():
        raise URLError('Name or service not known')
    return NamedTemporaryFile(prefix='urltmp_')
Chris@87 31
# Module-level fixture state: setup() stores the real ``urlopen`` here so
# that teardown() can restore it.
old_urlopen = None
Chris@87 34
Chris@87 35
def setup():
    """Remember the real ``urlopen`` and install ``urlopen_stub`` in its place."""
    global old_urlopen

    # The right-hand side is evaluated first, so the original function is
    # captured before the attribute is overwritten.
    old_urlopen, urllib_request.urlopen = urllib_request.urlopen, urlopen_stub
Chris@87 41
Chris@87 42
def teardown():
    """Put back the ``urlopen`` that setup() saved in ``old_urlopen``."""
    setattr(urllib_request, 'urlopen', old_urlopen)
Chris@87 45
# Base URL and file name of the one URL that urlopen_stub accepts.
http_path = 'http://www.google.com/'
http_file = 'index.html'

# Base URL and file name used for the "does not exist" cases.
http_fakepath = 'http://fake.abc.web/site/'
http_fakefile = 'fake.txt'

# Path names used by the sandboxing tests to try to leave the destination
# directory.
malicious_files = [
    '/etc/shadow',
    '../../shadow',
    '..\\system.dat',
    'c:\\windows\\system.dat',
]

# Payload written to, and read back from, the compressed test files.
magic_line = asbytes('three is the magic number')
Chris@87 57
Chris@87 58
Chris@87 59 # Utility functions used by many TestCases
def valid_textfile(filedir):
    """Create an empty ``dstmp_*.txt`` file in *filedir* and return its path."""
    handle, filename = mkstemp(dir=filedir, prefix='dstmp_', suffix='.txt',
                               text=True)
    # Only the path is needed; release the OS-level descriptor right away.
    os.close(handle)
    return filename
Chris@87 65
Chris@87 66
def invalid_textfile(filedir):
    """Return a ``dstmp_*.txt`` path inside *filedir* that no longer exists.

    The file is created with ``mkstemp`` and deleted again before the path
    is returned.
    """
    handle, filename = mkstemp(dir=filedir, prefix='dstmp_', suffix='.txt')
    os.close(handle)
    os.remove(filename)
    return filename
Chris@87 73
Chris@87 74
def valid_httpurl():
    """Return the full URL that the stubbed ``urlopen`` accepts."""
    return ''.join([http_path, http_file])
Chris@87 77
Chris@87 78
def invalid_httpurl():
    """Return the full fake URL (``http_fakepath`` followed by ``http_fakefile``)."""
    return ''.join([http_fakepath, http_fakefile])
Chris@87 81
Chris@87 82
def valid_baseurl():
    """Return the base URL of the valid test site (``http_path``)."""
    return http_path
Chris@87 85
Chris@87 86
def invalid_baseurl():
    """Return the base URL of the fake test site (``http_fakepath``)."""
    return http_fakepath
Chris@87 89
Chris@87 90
def valid_httpfile():
    """Return the file name part of the valid URL (``http_file``)."""
    return http_file
Chris@87 93
Chris@87 94
def invalid_httpfile():
    """Return the file name part of the fake URL (``http_fakefile``)."""
    return http_fakefile
Chris@87 97
Chris@87 98
class TestDataSourceOpen(TestCase):
    # Exercises DataSource.open on URLs, plain files and compressed files.

    def setUp(self):
        # Each test gets its own scratch directory as the destination path.
        self.tmpdir = mkdtemp()
        self.ds = datasource.DataSource(self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.ds

    def _check_compressed(self, writer_factory, filename):
        """Write ``magic_line`` with *writer_factory* and read it back.

        *writer_factory* is called as ``writer_factory(path, 'w')``; the
        resulting file is then opened through the DataSource and its first
        line is compared with ``magic_line``.
        """
        target = os.path.join(self.tmpdir, filename)
        sink = writer_factory(target, 'w')
        sink.write(magic_line)
        sink.close()
        source = self.ds.open(target)
        first_line = source.readline()
        source.close()
        self.assertEqual(magic_line, first_line)

    def test_ValidHTTP(self):
        handle = self.ds.open(valid_httpurl())
        assert_(handle)
        handle.close()

    def test_InvalidHTTP(self):
        bad_url = invalid_httpurl()
        self.assertRaises(IOError, self.ds.open, bad_url)
        try:
            self.ds.open(bad_url)
        except IOError as err:
            # Regression test for bug fixed in r4342.
            assert_(err.errno is None)

    def test_InvalidHTTPCacheURLError(self):
        self.assertRaises(URLError, self.ds._cache, invalid_httpurl())

    def test_ValidFile(self):
        handle = self.ds.open(valid_textfile(self.tmpdir))
        assert_(handle)
        handle.close()

    def test_InvalidFile(self):
        missing = invalid_textfile(self.tmpdir)
        self.assertRaises(IOError, self.ds.open, missing)

    def test_ValidGzipFile(self):
        try:
            import gzip
        except ImportError:
            # We don't have the gzip capabilities to test.
            import nose
            raise nose.SkipTest
        # Test datasource's internal file_opener for Gzip files.
        self._check_compressed(gzip.open, 'foobar.txt.gz')

    def test_ValidBz2File(self):
        try:
            import bz2
        except ImportError:
            # We don't have the bz2 capabilities to test.
            import nose
            raise nose.SkipTest
        # Test datasource's internal file_opener for BZip2 files.
        self._check_compressed(bz2.BZ2File, 'foobar.txt.bz2')
Chris@87 168
Chris@87 169
class TestDataSourceExists(TestCase):
    # Exercises DataSource.exists for URLs and local files.

    def setUp(self):
        # Each test gets its own scratch directory as the destination path.
        self.tmpdir = mkdtemp()
        self.ds = datasource.DataSource(self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.ds

    def test_ValidHTTP(self):
        assert_(self.ds.exists(valid_httpurl()))

    def test_InvalidHTTP(self):
        self.assertEqual(self.ds.exists(invalid_httpurl()), False)

    def test_ValidFile(self):
        # Test valid file in destpath
        tmpfile = valid_textfile(self.tmpdir)
        assert_(self.ds.exists(tmpfile))
        # Test valid local file not in destpath
        localdir = mkdtemp()
        try:
            tmpfile = valid_textfile(localdir)
            assert_(self.ds.exists(tmpfile))
        finally:
            # tearDown only removes self.tmpdir, so this extra directory has
            # to be cleaned up here even when the assertion above fails.
            rmtree(localdir)

    def test_InvalidFile(self):
        tmpfile = invalid_textfile(self.tmpdir)
        self.assertEqual(self.ds.exists(tmpfile), False)
Chris@87 198
Chris@87 199
class TestDataSourceAbspath(TestCase):
    # Exercises DataSource.abspath against a scratch destination directory.

    def setUp(self):
        self.tmpdir = os.path.abspath(mkdtemp())
        self.ds = datasource.DataSource(self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.ds

    def test_ValidHTTP(self):
        # Expected location: <tmpdir>/<netloc>/<url path without separators>.
        parsed = urlparse(valid_httpurl())
        expected = os.path.join(self.tmpdir, parsed.netloc,
                                parsed.path.strip(os.sep).strip('/'))
        self.assertEqual(expected, self.ds.abspath(valid_httpurl()))

    def test_ValidFile(self):
        tmpfile = valid_textfile(self.tmpdir)
        # Both the bare file name and the complete path must resolve to the
        # same file.
        for candidate in (os.path.basename(tmpfile), tmpfile):
            self.assertEqual(tmpfile, self.ds.abspath(candidate))

    def test_InvalidHTTP(self):
        parsed = urlparse(invalid_httpurl())
        wrong_path = os.path.join(self.tmpdir, parsed.netloc,
                                  parsed.path.strip(os.sep).strip('/'))
        self.assertNotEqual(wrong_path, self.ds.abspath(valid_httpurl()))

    def test_InvalidFile(self):
        # Two distinct temporary files: neither name form of the second may
        # resolve to the first.
        invalidfile = valid_textfile(self.tmpdir)
        tmpfile = valid_textfile(self.tmpdir)
        for candidate in (os.path.basename(tmpfile), tmpfile):
            self.assertNotEqual(invalidfile, self.ds.abspath(candidate))

    def test_sandboxing(self):
        tmpfile = valid_textfile(self.tmpdir)
        tmpfilename = os.path.basename(tmpfile)

        def resolved(name):
            return os.path.abspath(self.ds.abspath(name))

        regular = (valid_httpurl(), invalid_httpurl(), tmpfile, tmpfilename)
        for candidate in regular:
            assert_(resolved(candidate).startswith(self.tmpdir))
        for fn in malicious_files:
            assert_(resolved(http_path + fn).startswith(self.tmpdir))
            assert_(resolved(fn).startswith(self.tmpdir))

    def test_windows_os_sep(self):
        # Re-run the other checks with os.sep temporarily set to a backslash.
        saved_sep = os.sep
        os.sep = '\\'
        try:
            self.test_ValidHTTP()
            self.test_ValidFile()
            self.test_InvalidHTTP()
            self.test_InvalidFile()
            self.test_sandboxing()
        finally:
            os.sep = saved_sep
Chris@87 263
Chris@87 264
class TestRepositoryAbspath(TestCase):
    # Exercises Repository.abspath for a repository rooted at valid_baseurl().

    def setUp(self):
        self.tmpdir = os.path.abspath(mkdtemp())
        self.repos = datasource.Repository(valid_baseurl(), self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.repos

    def test_ValidHTTP(self):
        # Expected location: <destpath>/<netloc>/<url path without separators>.
        parsed = urlparse(valid_httpurl())
        expected = os.path.join(self.repos._destpath, parsed.netloc,
                                parsed.path.strip(os.sep).strip('/'))
        actual = self.repos.abspath(valid_httpfile())
        self.assertEqual(expected, actual)

    def test_sandboxing(self):
        def resolved(name):
            return os.path.abspath(self.repos.abspath(name))

        assert_(resolved(valid_httpfile()).startswith(self.tmpdir))
        for fn in malicious_files:
            assert_(resolved(http_path + fn).startswith(self.tmpdir))
            assert_(resolved(fn).startswith(self.tmpdir))

    def test_windows_os_sep(self):
        # Re-run the other checks with os.sep temporarily set to a backslash.
        saved_sep = os.sep
        os.sep = '\\'
        try:
            self.test_ValidHTTP()
            self.test_sandboxing()
        finally:
            os.sep = saved_sep
Chris@87 296
Chris@87 297
class TestRepositoryExists(TestCase):
    # Exercises Repository.exists for local, remote and cached files.

    def setUp(self):
        self.tmpdir = mkdtemp()
        self.repos = datasource.Repository(valid_baseurl(), self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.repos

    def test_ValidFile(self):
        # Create local temp file
        existing = valid_textfile(self.tmpdir)
        assert_(self.repos.exists(existing))

    def test_InvalidFile(self):
        missing = invalid_textfile(self.tmpdir)
        self.assertEqual(self.repos.exists(missing), False)

    def test_RemoveHTTPFile(self):
        assert_(self.repos.exists(valid_httpurl()))

    def test_CachedHTTPFile(self):
        localfile = valid_httpurl()
        # Create a locally cached temp file with an URL based
        # directory structure.  This is similar to what Repository.open
        # would do.
        netloc = urlparse(localfile).netloc
        cache_dir = os.path.join(self.repos._destpath, netloc)
        os.mkdir(cache_dir, 0o700)
        cached = valid_textfile(cache_dir)
        assert_(self.repos.exists(cached))
Chris@87 329
Chris@87 330
class TestOpenFunc(TestCase):
    # Exercises the module-level datasource.open convenience function.

    def setUp(self):
        self.tmpdir = mkdtemp()

    def tearDown(self):
        rmtree(self.tmpdir)

    def test_DataSourceOpen(self):
        local_file = valid_textfile(self.tmpdir)
        # First with destpath passed in, then relying on the default destpath.
        for extra in ({'destpath': self.tmpdir}, {}):
            fp = datasource.open(local_file, **extra)
            assert_(fp)
            fp.close()
Chris@87 348
Chris@87 349
# Run this module's tests when the file is executed as a script.
if __name__ == "__main__":
    run_module_suite()