comparison DEPENDENCIES/mingw32/Python27/Lib/site-packages/numpy/lib/tests/test__datasource.py @ 87:2a2c65a20a8b

Add Python libs and headers
author Chris Cannam
date Wed, 25 Feb 2015 14:05:22 +0000
parents
children
comparison
equal deleted inserted replaced
86:413a9d26189e 87:2a2c65a20a8b
1 from __future__ import division, absolute_import, print_function
2
3 import os
4 import sys
5 from tempfile import mkdtemp, mkstemp, NamedTemporaryFile
6 from shutil import rmtree
7
8 from numpy.compat import asbytes
9 from numpy.testing import (
10 run_module_suite, TestCase, assert_
11 )
12 import numpy.lib._datasource as datasource
13
14 if sys.version_info[0] >= 3:
15 import urllib.request as urllib_request
16 from urllib.parse import urlparse
17 from urllib.error import URLError
18 else:
19 import urllib2 as urllib_request
20 from urlparse import urlparse
21 from urllib2 import URLError
22
23
def urlopen_stub(url, data=None):
    """Stand-in for ``urlopen`` so the tests never touch the network.

    Returns an empty ``NamedTemporaryFile`` when *url* equals
    ``valid_httpurl()`` and raises ``URLError`` for any other URL.
    *data* is accepted for signature compatibility and ignored.
    """
    if url == valid_httpurl():
        return NamedTemporaryFile(prefix='urltmp_')
    raise URLError('Name or service not known')


# Module-level setup and teardown.
# Holds the real ``urlopen`` while the stub is installed, ``None`` otherwise.
old_urlopen = None


def setup():
    """Replace ``urllib_request.urlopen`` with ``urlopen_stub``.

    Calling this more than once is harmless: the real ``urlopen`` is only
    saved the first time, so a repeated call cannot overwrite it with the
    stub and make it unrecoverable.
    """
    global old_urlopen

    if urllib_request.urlopen is not urlopen_stub:
        old_urlopen = urllib_request.urlopen
        urllib_request.urlopen = urlopen_stub


def teardown():
    """Restore the ``urlopen`` saved by ``setup()``.

    Does nothing when no original is saved, so a stray call can never
    assign ``None`` to ``urllib_request.urlopen``.
    """
    global old_urlopen

    if old_urlopen is not None:
        urllib_request.urlopen = old_urlopen
        old_urlopen = None
45
# URL that urlopen_stub treats as reachable (base path plus file name).
http_path = 'http://www.google.com/'
http_file = 'index.html'

# URL that urlopen_stub rejects with URLError.
http_fakepath = 'http://fake.abc.web/site/'
http_fakefile = 'fake.txt'

# Path names that try to escape the destination directory; used by the
# sandboxing tests.
malicious_files = [
    '/etc/shadow',
    '../../shadow',
    '..\\system.dat',
    'c:\\windows\\system.dat',
]

# Payload written to and read back from the compressed test files.  A bytes
# literal has the same value on Python 2 and 3, so no compat shim is needed.
magic_line = b'three is the magic number'
57
58
59 # Utility functions used by many TestCases
def valid_textfile(filedir):
    """Create an empty ``dstmp_*.txt`` file inside *filedir*; return its path."""
    handle, filename = mkstemp(dir=filedir, prefix='dstmp_', suffix='.txt',
                               text=True)
    # Only the path is needed; release the OS-level descriptor right away.
    os.close(handle)
    return filename
65
66
def invalid_textfile(filedir):
    """Return a ``dstmp_*.txt`` path inside *filedir* that does not exist.

    The file is created to obtain a unique name and then deleted again.
    """
    handle, filename = mkstemp(dir=filedir, prefix='dstmp_', suffix='.txt')
    os.close(handle)
    os.unlink(filename)
    return filename
73
74
def valid_httpurl():
    """Return the full URL that ``urlopen_stub`` accepts."""
    return ''.join((http_path, http_file))
77
78
def invalid_httpurl():
    """Return the full URL built from the fake base path and fake file name."""
    return ''.join((http_fakepath, http_fakefile))
81
82
def valid_baseurl():
    """Return the base URL (without file name) of the valid test site."""
    return http_path
85
86
def invalid_baseurl():
    """Return the base URL (without file name) of the fake test site."""
    return http_fakepath
89
90
def valid_httpfile():
    """Return the file name component of the valid test URL."""
    return http_file
93
94
def invalid_httpfile():
    """Return the file name component of the fake test URL."""
    return http_fakefile
97
98
class TestDataSourceOpen(TestCase):
    """Tests for ``DataSource.open`` on URLs, plain files and compressed files.

    Every handle opened by a test is closed in a ``finally`` clause so that
    a failing write, read or assertion cannot leave a file open inside
    ``self.tmpdir`` (an open file can prevent ``rmtree`` in ``tearDown``
    from succeeding on Windows, masking the original failure).
    """

    def setUp(self):
        # Each test gets its own scratch directory as the DataSource destpath.
        self.tmpdir = mkdtemp()
        self.ds = datasource.DataSource(self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.ds

    def _first_line(self, filepath):
        """Open *filepath* via the DataSource and return its first line."""
        fp = self.ds.open(filepath)
        try:
            return fp.readline()
        finally:
            fp.close()

    def test_ValidHTTP(self):
        fh = self.ds.open(valid_httpurl())
        try:
            assert_(fh)
        finally:
            fh.close()

    def test_InvalidHTTP(self):
        url = invalid_httpurl()
        self.assertRaises(IOError, self.ds.open, url)
        try:
            self.ds.open(url)
        except IOError as e:
            # Regression test for bug fixed in r4342.
            assert_(e.errno is None)

    def test_InvalidHTTPCacheURLError(self):
        self.assertRaises(URLError, self.ds._cache, invalid_httpurl())

    def test_ValidFile(self):
        local_file = valid_textfile(self.tmpdir)
        fh = self.ds.open(local_file)
        try:
            assert_(fh)
        finally:
            fh.close()

    def test_InvalidFile(self):
        invalid_file = invalid_textfile(self.tmpdir)
        self.assertRaises(IOError, self.ds.open, invalid_file)

    def test_ValidGzipFile(self):
        try:
            import gzip
        except ImportError:
            # We don't have the gzip capabilities to test.
            import nose
            raise nose.SkipTest
        # Test datasource's internal file_opener for Gzip files.
        filepath = os.path.join(self.tmpdir, 'foobar.txt.gz')
        fp = gzip.open(filepath, 'w')
        try:
            fp.write(magic_line)
        finally:
            fp.close()
        self.assertEqual(magic_line, self._first_line(filepath))

    def test_ValidBz2File(self):
        try:
            import bz2
        except ImportError:
            # We don't have the bz2 capabilities to test.
            import nose
            raise nose.SkipTest
        # Test datasource's internal file_opener for BZip2 files.
        filepath = os.path.join(self.tmpdir, 'foobar.txt.bz2')
        fp = bz2.BZ2File(filepath, 'w')
        try:
            fp.write(magic_line)
        finally:
            fp.close()
        self.assertEqual(magic_line, self._first_line(filepath))
168
169
class TestDataSourceExists(TestCase):
    """Tests for ``DataSource.exists`` on URLs and local files."""

    def setUp(self):
        # Each test gets its own scratch directory as the DataSource destpath.
        self.tmpdir = mkdtemp()
        self.ds = datasource.DataSource(self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.ds

    def test_ValidHTTP(self):
        assert_(self.ds.exists(valid_httpurl()))

    def test_InvalidHTTP(self):
        self.assertEqual(self.ds.exists(invalid_httpurl()), False)

    def test_ValidFile(self):
        # Test valid file in destpath
        tmpfile = valid_textfile(self.tmpdir)
        assert_(self.ds.exists(tmpfile))
        # Test valid local file not in destpath
        localdir = mkdtemp()
        try:
            tmpfile = valid_textfile(localdir)
            assert_(self.ds.exists(tmpfile))
        finally:
            # Remove the extra directory even when the assertion fails;
            # tearDown only cleans up self.tmpdir.
            rmtree(localdir)

    def test_InvalidFile(self):
        tmpfile = invalid_textfile(self.tmpdir)
        self.assertEqual(self.ds.exists(tmpfile), False)
198
199
class TestDataSourceAbspath(TestCase):
    """Tests for ``DataSource.abspath``, including path sandboxing."""

    def setUp(self):
        scratch = mkdtemp()
        self.tmpdir = os.path.abspath(scratch)
        self.ds = datasource.DataSource(self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.ds

    def test_ValidHTTP(self):
        url = valid_httpurl()
        parts = urlparse(url)
        # Expected location: <tmpdir>/<netloc>/<url path without separators>.
        expected = os.path.join(self.tmpdir, parts[1],
                                parts[2].strip(os.sep).strip('/'))
        self.assertEqual(expected, self.ds.abspath(url))

    def test_ValidFile(self):
        full_path = valid_textfile(self.tmpdir)
        name_only = os.path.basename(full_path)
        # A bare file name and a complete path must resolve identically.
        for candidate in (name_only, full_path):
            self.assertEqual(full_path, self.ds.abspath(candidate))

    def test_InvalidHTTP(self):
        parts = urlparse(invalid_httpurl())
        fake_local = os.path.join(self.tmpdir, parts[1],
                                  parts[2].strip(os.sep).strip('/'))
        self.assertNotEqual(fake_local, self.ds.abspath(valid_httpurl()))

    def test_InvalidFile(self):
        other_file = valid_textfile(self.tmpdir)
        full_path = valid_textfile(self.tmpdir)
        name_only = os.path.basename(full_path)
        # Neither spelling of the second file may resolve to the first one.
        for candidate in (name_only, full_path):
            self.assertNotEqual(other_file, self.ds.abspath(candidate))

    def test_sandboxing(self):
        full_path = valid_textfile(self.tmpdir)
        name_only = os.path.basename(full_path)

        def resolved(name):
            return os.path.abspath(self.ds.abspath(name))

        ordinary = [valid_httpurl(), invalid_httpurl(), full_path, name_only]
        for name in ordinary:
            assert_(resolved(name).startswith(self.tmpdir))
        # Hostile names must still resolve to somewhere under tmpdir.
        for fn in malicious_files:
            assert_(resolved(http_path+fn).startswith(self.tmpdir))
            assert_(resolved(fn).startswith(self.tmpdir))

    def test_windows_os_sep(self):
        # Re-run the other checks with os.sep temporarily set to a backslash.
        saved_sep = os.sep
        os.sep = '\\'
        try:
            for check in (self.test_ValidHTTP, self.test_ValidFile,
                          self.test_InvalidHTTP, self.test_InvalidFile,
                          self.test_sandboxing):
                check()
        finally:
            os.sep = saved_sep
263
264
class TestRepositoryAbspath(TestCase):
    """Tests for ``Repository.abspath``, including path sandboxing."""

    def setUp(self):
        scratch = mkdtemp()
        self.tmpdir = os.path.abspath(scratch)
        self.repos = datasource.Repository(valid_baseurl(), self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.repos

    def test_ValidHTTP(self):
        parts = urlparse(valid_httpurl())
        # Expected location: <destpath>/<netloc>/<url path without separators>.
        expected = os.path.join(self.repos._destpath, parts[1],
                                parts[2].strip(os.sep).strip('/'))
        actual = self.repos.abspath(valid_httpfile())
        self.assertEqual(expected, actual)

    def test_sandboxing(self):
        def resolved(name):
            return os.path.abspath(self.repos.abspath(name))

        assert_(resolved(valid_httpfile()).startswith(self.tmpdir))
        # Hostile names must still resolve to somewhere under tmpdir.
        for fn in malicious_files:
            assert_(resolved(http_path+fn).startswith(self.tmpdir))
            assert_(resolved(fn).startswith(self.tmpdir))

    def test_windows_os_sep(self):
        # Re-run the other checks with os.sep temporarily set to a backslash.
        saved_sep = os.sep
        os.sep = '\\'
        try:
            for check in (self.test_ValidHTTP, self.test_sandboxing):
                check()
        finally:
            os.sep = saved_sep
296
297
class TestRepositoryExists(TestCase):
    """Tests for ``Repository.exists`` on local, remote and cached files."""

    def setUp(self):
        self.tmpdir = mkdtemp()
        self.repos = datasource.Repository(valid_baseurl(), self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.repos

    def test_ValidFile(self):
        # A real temp file inside the repository's destpath.
        existing = valid_textfile(self.tmpdir)
        assert_(self.repos.exists(existing))

    def test_InvalidFile(self):
        missing = invalid_textfile(self.tmpdir)
        self.assertEqual(self.repos.exists(missing), False)

    def test_RemoveHTTPFile(self):
        # NOTE(review): the name presumably means "Remote"; it is kept as-is
        # because the method name is the public test id.
        assert_(self.repos.exists(valid_httpurl()))

    def test_CachedHTTPFile(self):
        # Create a locally cached temp file with an URL based
        # directory structure.  This is similar to what Repository.open
        # would do.
        netloc = urlparse(valid_httpurl())[1]
        cache_dir = os.path.join(self.repos._destpath, netloc)
        os.mkdir(cache_dir, 0o700)
        cached = valid_textfile(cache_dir)
        assert_(self.repos.exists(cached))
329
330
class TestOpenFunc(TestCase):
    """Tests for the module-level ``datasource.open`` convenience function."""

    def setUp(self):
        self.tmpdir = mkdtemp()

    def tearDown(self):
        rmtree(self.tmpdir)

    def test_DataSourceOpen(self):
        local_file = valid_textfile(self.tmpdir)
        # First with an explicit destpath, then with the default destpath.
        for extra in ({'destpath': self.tmpdir}, {}):
            handle = datasource.open(local_file, **extra)
            assert_(handle)
            handle.close()
348
349
# Run the whole module's tests when the file is executed as a script.
if __name__ == '__main__':
    run_module_suite()