Mercurial > hg > vamp-build-and-test
comparison DEPENDENCIES/mingw32/Python27/Lib/site-packages/numpy/lib/tests/test__datasource.py @ 87:2a2c65a20a8b
Add Python libs and headers
author | Chris Cannam |
---|---|
date | Wed, 25 Feb 2015 14:05:22 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
86:413a9d26189e | 87:2a2c65a20a8b |
---|---|
1 from __future__ import division, absolute_import, print_function | |
2 | |
3 import os | |
4 import sys | |
5 from tempfile import mkdtemp, mkstemp, NamedTemporaryFile | |
6 from shutil import rmtree | |
7 | |
8 from numpy.compat import asbytes | |
9 from numpy.testing import ( | |
10 run_module_suite, TestCase, assert_ | |
11 ) | |
12 import numpy.lib._datasource as datasource | |
13 | |
14 if sys.version_info[0] >= 3: | |
15 import urllib.request as urllib_request | |
16 from urllib.parse import urlparse | |
17 from urllib.error import URLError | |
18 else: | |
19 import urllib2 as urllib_request | |
20 from urlparse import urlparse | |
21 from urllib2 import URLError | |
22 | |
23 | |
24 def urlopen_stub(url, data=None): | |
25 '''Stub to replace urlopen for testing.''' | |
26 if url == valid_httpurl(): | |
27 tmpfile = NamedTemporaryFile(prefix='urltmp_') | |
28 return tmpfile | |
29 else: | |
30 raise URLError('Name or service not known') | |
31 | |
32 # setup and teardown | |
33 old_urlopen = None | |
34 | |
35 | |
36 def setup(): | |
37 global old_urlopen | |
38 | |
39 old_urlopen = urllib_request.urlopen | |
40 urllib_request.urlopen = urlopen_stub | |
41 | |
42 | |
43 def teardown(): | |
44 urllib_request.urlopen = old_urlopen | |
45 | |
46 # A valid website for more robust testing | |
47 http_path = 'http://www.google.com/' | |
48 http_file = 'index.html' | |
49 | |
50 http_fakepath = 'http://fake.abc.web/site/' | |
51 http_fakefile = 'fake.txt' | |
52 | |
53 malicious_files = ['/etc/shadow', '../../shadow', | |
54 '..\\system.dat', 'c:\\windows\\system.dat'] | |
55 | |
56 magic_line = asbytes('three is the magic number') | |
57 | |
58 | |
59 # Utility functions used by many TestCases | |
60 def valid_textfile(filedir): | |
61 # Generate and return a valid temporary file. | |
62 fd, path = mkstemp(suffix='.txt', prefix='dstmp_', dir=filedir, text=True) | |
63 os.close(fd) | |
64 return path | |
65 | |
66 | |
67 def invalid_textfile(filedir): | |
68 # Generate and return an invalid filename. | |
69 fd, path = mkstemp(suffix='.txt', prefix='dstmp_', dir=filedir) | |
70 os.close(fd) | |
71 os.remove(path) | |
72 return path | |
73 | |
74 | |
75 def valid_httpurl(): | |
76 return http_path+http_file | |
77 | |
78 | |
79 def invalid_httpurl(): | |
80 return http_fakepath+http_fakefile | |
81 | |
82 | |
83 def valid_baseurl(): | |
84 return http_path | |
85 | |
86 | |
87 def invalid_baseurl(): | |
88 return http_fakepath | |
89 | |
90 | |
91 def valid_httpfile(): | |
92 return http_file | |
93 | |
94 | |
95 def invalid_httpfile(): | |
96 return http_fakefile | |
97 | |
98 | |
99 class TestDataSourceOpen(TestCase): | |
100 def setUp(self): | |
101 self.tmpdir = mkdtemp() | |
102 self.ds = datasource.DataSource(self.tmpdir) | |
103 | |
104 def tearDown(self): | |
105 rmtree(self.tmpdir) | |
106 del self.ds | |
107 | |
108 def test_ValidHTTP(self): | |
109 fh = self.ds.open(valid_httpurl()) | |
110 assert_(fh) | |
111 fh.close() | |
112 | |
113 def test_InvalidHTTP(self): | |
114 url = invalid_httpurl() | |
115 self.assertRaises(IOError, self.ds.open, url) | |
116 try: | |
117 self.ds.open(url) | |
118 except IOError as e: | |
119 # Regression test for bug fixed in r4342. | |
120 assert_(e.errno is None) | |
121 | |
122 def test_InvalidHTTPCacheURLError(self): | |
123 self.assertRaises(URLError, self.ds._cache, invalid_httpurl()) | |
124 | |
125 def test_ValidFile(self): | |
126 local_file = valid_textfile(self.tmpdir) | |
127 fh = self.ds.open(local_file) | |
128 assert_(fh) | |
129 fh.close() | |
130 | |
131 def test_InvalidFile(self): | |
132 invalid_file = invalid_textfile(self.tmpdir) | |
133 self.assertRaises(IOError, self.ds.open, invalid_file) | |
134 | |
135 def test_ValidGzipFile(self): | |
136 try: | |
137 import gzip | |
138 except ImportError: | |
139 # We don't have the gzip capabilities to test. | |
140 import nose | |
141 raise nose.SkipTest | |
142 # Test datasource's internal file_opener for Gzip files. | |
143 filepath = os.path.join(self.tmpdir, 'foobar.txt.gz') | |
144 fp = gzip.open(filepath, 'w') | |
145 fp.write(magic_line) | |
146 fp.close() | |
147 fp = self.ds.open(filepath) | |
148 result = fp.readline() | |
149 fp.close() | |
150 self.assertEqual(magic_line, result) | |
151 | |
152 def test_ValidBz2File(self): | |
153 try: | |
154 import bz2 | |
155 except ImportError: | |
156 # We don't have the bz2 capabilities to test. | |
157 import nose | |
158 raise nose.SkipTest | |
159 # Test datasource's internal file_opener for BZip2 files. | |
160 filepath = os.path.join(self.tmpdir, 'foobar.txt.bz2') | |
161 fp = bz2.BZ2File(filepath, 'w') | |
162 fp.write(magic_line) | |
163 fp.close() | |
164 fp = self.ds.open(filepath) | |
165 result = fp.readline() | |
166 fp.close() | |
167 self.assertEqual(magic_line, result) | |
168 | |
169 | |
170 class TestDataSourceExists(TestCase): | |
171 def setUp(self): | |
172 self.tmpdir = mkdtemp() | |
173 self.ds = datasource.DataSource(self.tmpdir) | |
174 | |
175 def tearDown(self): | |
176 rmtree(self.tmpdir) | |
177 del self.ds | |
178 | |
179 def test_ValidHTTP(self): | |
180 assert_(self.ds.exists(valid_httpurl())) | |
181 | |
182 def test_InvalidHTTP(self): | |
183 self.assertEqual(self.ds.exists(invalid_httpurl()), False) | |
184 | |
185 def test_ValidFile(self): | |
186 # Test valid file in destpath | |
187 tmpfile = valid_textfile(self.tmpdir) | |
188 assert_(self.ds.exists(tmpfile)) | |
189 # Test valid local file not in destpath | |
190 localdir = mkdtemp() | |
191 tmpfile = valid_textfile(localdir) | |
192 assert_(self.ds.exists(tmpfile)) | |
193 rmtree(localdir) | |
194 | |
195 def test_InvalidFile(self): | |
196 tmpfile = invalid_textfile(self.tmpdir) | |
197 self.assertEqual(self.ds.exists(tmpfile), False) | |
198 | |
199 | |
200 class TestDataSourceAbspath(TestCase): | |
201 def setUp(self): | |
202 self.tmpdir = os.path.abspath(mkdtemp()) | |
203 self.ds = datasource.DataSource(self.tmpdir) | |
204 | |
205 def tearDown(self): | |
206 rmtree(self.tmpdir) | |
207 del self.ds | |
208 | |
209 def test_ValidHTTP(self): | |
210 scheme, netloc, upath, pms, qry, frg = urlparse(valid_httpurl()) | |
211 local_path = os.path.join(self.tmpdir, netloc, | |
212 upath.strip(os.sep).strip('/')) | |
213 self.assertEqual(local_path, self.ds.abspath(valid_httpurl())) | |
214 | |
215 def test_ValidFile(self): | |
216 tmpfile = valid_textfile(self.tmpdir) | |
217 tmpfilename = os.path.split(tmpfile)[-1] | |
218 # Test with filename only | |
219 self.assertEqual(tmpfile, self.ds.abspath(tmpfilename)) | |
220 # Test filename with complete path | |
221 self.assertEqual(tmpfile, self.ds.abspath(tmpfile)) | |
222 | |
223 def test_InvalidHTTP(self): | |
224 scheme, netloc, upath, pms, qry, frg = urlparse(invalid_httpurl()) | |
225 invalidhttp = os.path.join(self.tmpdir, netloc, | |
226 upath.strip(os.sep).strip('/')) | |
227 self.assertNotEqual(invalidhttp, self.ds.abspath(valid_httpurl())) | |
228 | |
229 def test_InvalidFile(self): | |
230 invalidfile = valid_textfile(self.tmpdir) | |
231 tmpfile = valid_textfile(self.tmpdir) | |
232 tmpfilename = os.path.split(tmpfile)[-1] | |
233 # Test with filename only | |
234 self.assertNotEqual(invalidfile, self.ds.abspath(tmpfilename)) | |
235 # Test filename with complete path | |
236 self.assertNotEqual(invalidfile, self.ds.abspath(tmpfile)) | |
237 | |
238 def test_sandboxing(self): | |
239 tmpfile = valid_textfile(self.tmpdir) | |
240 tmpfilename = os.path.split(tmpfile)[-1] | |
241 | |
242 tmp_path = lambda x: os.path.abspath(self.ds.abspath(x)) | |
243 | |
244 assert_(tmp_path(valid_httpurl()).startswith(self.tmpdir)) | |
245 assert_(tmp_path(invalid_httpurl()).startswith(self.tmpdir)) | |
246 assert_(tmp_path(tmpfile).startswith(self.tmpdir)) | |
247 assert_(tmp_path(tmpfilename).startswith(self.tmpdir)) | |
248 for fn in malicious_files: | |
249 assert_(tmp_path(http_path+fn).startswith(self.tmpdir)) | |
250 assert_(tmp_path(fn).startswith(self.tmpdir)) | |
251 | |
252 def test_windows_os_sep(self): | |
253 orig_os_sep = os.sep | |
254 try: | |
255 os.sep = '\\' | |
256 self.test_ValidHTTP() | |
257 self.test_ValidFile() | |
258 self.test_InvalidHTTP() | |
259 self.test_InvalidFile() | |
260 self.test_sandboxing() | |
261 finally: | |
262 os.sep = orig_os_sep | |
263 | |
264 | |
265 class TestRepositoryAbspath(TestCase): | |
266 def setUp(self): | |
267 self.tmpdir = os.path.abspath(mkdtemp()) | |
268 self.repos = datasource.Repository(valid_baseurl(), self.tmpdir) | |
269 | |
270 def tearDown(self): | |
271 rmtree(self.tmpdir) | |
272 del self.repos | |
273 | |
274 def test_ValidHTTP(self): | |
275 scheme, netloc, upath, pms, qry, frg = urlparse(valid_httpurl()) | |
276 local_path = os.path.join(self.repos._destpath, netloc, | |
277 upath.strip(os.sep).strip('/')) | |
278 filepath = self.repos.abspath(valid_httpfile()) | |
279 self.assertEqual(local_path, filepath) | |
280 | |
281 def test_sandboxing(self): | |
282 tmp_path = lambda x: os.path.abspath(self.repos.abspath(x)) | |
283 assert_(tmp_path(valid_httpfile()).startswith(self.tmpdir)) | |
284 for fn in malicious_files: | |
285 assert_(tmp_path(http_path+fn).startswith(self.tmpdir)) | |
286 assert_(tmp_path(fn).startswith(self.tmpdir)) | |
287 | |
288 def test_windows_os_sep(self): | |
289 orig_os_sep = os.sep | |
290 try: | |
291 os.sep = '\\' | |
292 self.test_ValidHTTP() | |
293 self.test_sandboxing() | |
294 finally: | |
295 os.sep = orig_os_sep | |
296 | |
297 | |
298 class TestRepositoryExists(TestCase): | |
299 def setUp(self): | |
300 self.tmpdir = mkdtemp() | |
301 self.repos = datasource.Repository(valid_baseurl(), self.tmpdir) | |
302 | |
303 def tearDown(self): | |
304 rmtree(self.tmpdir) | |
305 del self.repos | |
306 | |
307 def test_ValidFile(self): | |
308 # Create local temp file | |
309 tmpfile = valid_textfile(self.tmpdir) | |
310 assert_(self.repos.exists(tmpfile)) | |
311 | |
312 def test_InvalidFile(self): | |
313 tmpfile = invalid_textfile(self.tmpdir) | |
314 self.assertEqual(self.repos.exists(tmpfile), False) | |
315 | |
316 def test_RemoveHTTPFile(self): | |
317 assert_(self.repos.exists(valid_httpurl())) | |
318 | |
319 def test_CachedHTTPFile(self): | |
320 localfile = valid_httpurl() | |
321 # Create a locally cached temp file with an URL based | |
322 # directory structure. This is similar to what Repository.open | |
323 # would do. | |
324 scheme, netloc, upath, pms, qry, frg = urlparse(localfile) | |
325 local_path = os.path.join(self.repos._destpath, netloc) | |
326 os.mkdir(local_path, 0o0700) | |
327 tmpfile = valid_textfile(local_path) | |
328 assert_(self.repos.exists(tmpfile)) | |
329 | |
330 | |
331 class TestOpenFunc(TestCase): | |
332 def setUp(self): | |
333 self.tmpdir = mkdtemp() | |
334 | |
335 def tearDown(self): | |
336 rmtree(self.tmpdir) | |
337 | |
338 def test_DataSourceOpen(self): | |
339 local_file = valid_textfile(self.tmpdir) | |
340 # Test case where destpath is passed in | |
341 fp = datasource.open(local_file, destpath=self.tmpdir) | |
342 assert_(fp) | |
343 fp.close() | |
344 # Test case where default destpath is used | |
345 fp = datasource.open(local_file) | |
346 assert_(fp) | |
347 fp.close() | |
348 | |
349 | |
350 if __name__ == "__main__": | |
351 run_module_suite() |