Chris@87
|
1 from __future__ import division, absolute_import, print_function
|
Chris@87
|
2
|
Chris@87
|
3 import os
|
Chris@87
|
4 import sys
|
Chris@87
|
5 from tempfile import mkdtemp, mkstemp, NamedTemporaryFile
|
Chris@87
|
6 from shutil import rmtree
|
Chris@87
|
7
|
Chris@87
|
8 from numpy.compat import asbytes
|
Chris@87
|
9 from numpy.testing import (
|
Chris@87
|
10 run_module_suite, TestCase, assert_
|
Chris@87
|
11 )
|
Chris@87
|
12 import numpy.lib._datasource as datasource
|
Chris@87
|
13
|
Chris@87
|
14 if sys.version_info[0] >= 3:
|
Chris@87
|
15 import urllib.request as urllib_request
|
Chris@87
|
16 from urllib.parse import urlparse
|
Chris@87
|
17 from urllib.error import URLError
|
Chris@87
|
18 else:
|
Chris@87
|
19 import urllib2 as urllib_request
|
Chris@87
|
20 from urlparse import urlparse
|
Chris@87
|
21 from urllib2 import URLError
|
Chris@87
|
22
|
Chris@87
|
23
|
Chris@87
|
def urlopen_stub(url, data=None):
    """Stand-in for ``urlopen`` that never touches the network.

    Returns a fresh ``NamedTemporaryFile`` (prefix ``urltmp_``) when *url*
    equals ``valid_httpurl()`` and raises ``URLError`` for every other
    URL.  The *data* argument is accepted but not used.
    """
    if url != valid_httpurl():
        raise URLError('Name or service not known')
    return NamedTemporaryFile(prefix='urltmp_')
|
Chris@87
|
31
|
Chris@87
|
# Module-level fixture state: setup() stores the real ``urlopen`` here and
# teardown() reads it back.
old_urlopen = None
|
Chris@87
|
34
|
Chris@87
|
35
|
Chris@87
|
def setup():
    """Install ``urlopen_stub`` in place of ``urllib_request.urlopen``.

    The callable that was there before is kept in the module-level
    ``old_urlopen`` so that it can be restored afterwards.
    """
    global old_urlopen

    # Right-hand side is evaluated first, so the real urlopen is captured
    # before the attribute is overwritten.
    old_urlopen, urllib_request.urlopen = urllib_request.urlopen, urlopen_stub
|
Chris@87
|
41
|
Chris@87
|
42
|
Chris@87
|
def teardown():
    """Put the value saved in ``old_urlopen`` back on ``urllib_request``."""
    urllib_request.urlopen = old_urlopen
|
Chris@87
|
45
|
Chris@87
|
# URL of a real site; base + file is the one URL the urlopen stub accepts.
http_path = 'http://www.google.com/'
http_file = 'index.html'

# A fake URL, used for the failure paths.
http_fakepath = 'http://fake.abc.web/site/'
http_fakefile = 'fake.txt'

# File names the sandboxing tests feed to abspath(); every result is
# expected to stay under the destination directory.
malicious_files = [
    '/etc/shadow',
    '../../shadow',
    '..\\system.dat',
    'c:\\windows\\system.dat',
]
|
Chris@87
|
55
|
Chris@87
|
# Payload the compressed-file tests write out and expect to read back.
magic_line = asbytes('three is the magic number')
|
Chris@87
|
57
|
Chris@87
|
58
|
Chris@87
|
59 # Utility functions used by many TestCases
|
Chris@87
|
def valid_textfile(filedir):
    """Create a ``dstmp_*.txt`` file inside *filedir* and return its path.

    The descriptor handed back by ``mkstemp`` is closed straight away, so
    the caller receives only the name of an existing file.
    """
    handle, filename = mkstemp(
        suffix='.txt', prefix='dstmp_', dir=filedir, text=True)
    os.close(handle)
    return filename
|
Chris@87
|
65
|
Chris@87
|
66
|
Chris@87
|
def invalid_textfile(filedir):
    """Return a ``dstmp_*.txt`` path inside *filedir* that does not exist.

    The file is created with ``mkstemp`` and then removed again, so the
    returned name no longer refers to a file.
    """
    handle, filename = mkstemp(dir=filedir, prefix='dstmp_', suffix='.txt')
    os.close(handle)
    os.remove(filename)
    return filename
|
Chris@87
|
73
|
Chris@87
|
74
|
Chris@87
|
def valid_httpurl():
    """Return ``http_path`` and ``http_file`` joined into one URL."""
    return http_path + http_file
|
Chris@87
|
77
|
Chris@87
|
78
|
Chris@87
|
def invalid_httpurl():
    """Return ``http_fakepath`` and ``http_fakefile`` joined into one URL."""
    return http_fakepath + http_fakefile
|
Chris@87
|
81
|
Chris@87
|
82
|
Chris@87
|
def valid_baseurl():
    """Return the base URL stored in ``http_path``."""
    return http_path
|
Chris@87
|
85
|
Chris@87
|
86
|
Chris@87
|
def invalid_baseurl():
    """Return the fake base URL stored in ``http_fakepath``."""
    return http_fakepath
|
Chris@87
|
89
|
Chris@87
|
90
|
Chris@87
|
def valid_httpfile():
    """Return the file name stored in ``http_file``."""
    return http_file
|
Chris@87
|
93
|
Chris@87
|
94
|
Chris@87
|
def invalid_httpfile():
    """Return the fake file name stored in ``http_fakefile``."""
    return http_fakefile
|
Chris@87
|
97
|
Chris@87
|
98
|
Chris@87
|
class TestDataSourceOpen(TestCase):
    """Tests for ``DataSource.open`` on URLs, local files and archives.

    Every handle opened by a test is closed in a ``finally`` clause so
    that no file stays open when ``tearDown`` removes the scratch
    directory, even if the test itself fails.
    """

    def setUp(self):
        # A fresh scratch directory serves as the DataSource destpath.
        self.tmpdir = mkdtemp()
        self.ds = datasource.DataSource(self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.ds

    def test_ValidHTTP(self):
        fh = self.ds.open(valid_httpurl())
        try:
            assert_(fh)
        finally:
            fh.close()

    def test_InvalidHTTP(self):
        url = invalid_httpurl()
        self.assertRaises(IOError, self.ds.open, url)
        try:
            self.ds.open(url)
        except IOError as e:
            # Regression test for bug fixed in r4342.
            assert_(e.errno is None)

    def test_InvalidHTTPCacheURLError(self):
        self.assertRaises(URLError, self.ds._cache, invalid_httpurl())

    def test_ValidFile(self):
        local_file = valid_textfile(self.tmpdir)
        fh = self.ds.open(local_file)
        try:
            assert_(fh)
        finally:
            fh.close()

    def test_InvalidFile(self):
        invalid_file = invalid_textfile(self.tmpdir)
        self.assertRaises(IOError, self.ds.open, invalid_file)

    def test_ValidGzipFile(self):
        try:
            import gzip
        except ImportError:
            # We don't have the gzip capabilities to test.
            import nose
            raise nose.SkipTest
        # Test datasource's internal file_opener for Gzip files.
        filepath = os.path.join(self.tmpdir, 'foobar.txt.gz')
        fp = gzip.open(filepath, 'w')
        try:
            fp.write(magic_line)
        finally:
            fp.close()
        fp = self.ds.open(filepath)
        try:
            result = fp.readline()
        finally:
            fp.close()
        self.assertEqual(magic_line, result)

    def test_ValidBz2File(self):
        try:
            import bz2
        except ImportError:
            # We don't have the bz2 capabilities to test.
            import nose
            raise nose.SkipTest
        # Test datasource's internal file_opener for BZip2 files.
        filepath = os.path.join(self.tmpdir, 'foobar.txt.bz2')
        fp = bz2.BZ2File(filepath, 'w')
        try:
            fp.write(magic_line)
        finally:
            fp.close()
        fp = self.ds.open(filepath)
        try:
            result = fp.readline()
        finally:
            fp.close()
        self.assertEqual(magic_line, result)
|
Chris@87
|
168
|
Chris@87
|
169
|
Chris@87
|
class TestDataSourceExists(TestCase):
    """Tests for ``DataSource.exists`` on URLs and local files."""

    def setUp(self):
        # A fresh scratch directory serves as the DataSource destpath.
        self.tmpdir = mkdtemp()
        self.ds = datasource.DataSource(self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.ds

    def test_ValidHTTP(self):
        assert_(self.ds.exists(valid_httpurl()))

    def test_InvalidHTTP(self):
        self.assertEqual(self.ds.exists(invalid_httpurl()), False)

    def test_ValidFile(self):
        # Test valid file in destpath
        tmpfile = valid_textfile(self.tmpdir)
        assert_(self.ds.exists(tmpfile))
        # Test valid local file not in destpath
        localdir = mkdtemp()
        try:
            tmpfile = valid_textfile(localdir)
            assert_(self.ds.exists(tmpfile))
        finally:
            # tearDown only knows about self.tmpdir, so this second
            # directory has to be removed here even when the check fails.
            rmtree(localdir)

    def test_InvalidFile(self):
        tmpfile = invalid_textfile(self.tmpdir)
        self.assertEqual(self.ds.exists(tmpfile), False)
|
Chris@87
|
198
|
Chris@87
|
199
|
Chris@87
|
class TestDataSourceAbspath(TestCase):
    """Tests for the local paths produced by ``DataSource.abspath``."""

    def setUp(self):
        # Absolute form is needed: the tests compare against it directly.
        self.tmpdir = os.path.abspath(mkdtemp())
        self.ds = datasource.DataSource(self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.ds

    def test_ValidHTTP(self):
        parsed = urlparse(valid_httpurl())
        url_path = parsed.path.strip(os.sep).strip('/')
        expected = os.path.join(self.tmpdir, parsed.netloc, url_path)
        self.assertEqual(expected, self.ds.abspath(valid_httpurl()))

    def test_ValidFile(self):
        full_path = valid_textfile(self.tmpdir)
        _, basename = os.path.split(full_path)
        # Test with filename only
        self.assertEqual(full_path, self.ds.abspath(basename))
        # Test filename with complete path
        self.assertEqual(full_path, self.ds.abspath(full_path))

    def test_InvalidHTTP(self):
        parsed = urlparse(invalid_httpurl())
        url_path = parsed.path.strip(os.sep).strip('/')
        fake_local = os.path.join(self.tmpdir, parsed.netloc, url_path)
        # The path built from the fake URL must differ from the one the
        # valid URL maps to.
        self.assertNotEqual(fake_local, self.ds.abspath(valid_httpurl()))

    def test_InvalidFile(self):
        # Two distinct files: the path of the first must never be what
        # abspath() returns for the second.
        other_file = valid_textfile(self.tmpdir)
        full_path = valid_textfile(self.tmpdir)
        _, basename = os.path.split(full_path)
        # Test with filename only
        self.assertNotEqual(other_file, self.ds.abspath(basename))
        # Test filename with complete path
        self.assertNotEqual(other_file, self.ds.abspath(full_path))

    def test_sandboxing(self):
        full_path = valid_textfile(self.tmpdir)
        _, basename = os.path.split(full_path)

        def stays_inside(name):
            # True when abspath() keeps *name* below the destination dir.
            resolved = os.path.abspath(self.ds.abspath(name))
            return resolved.startswith(self.tmpdir)

        candidates = [valid_httpurl(), invalid_httpurl(), full_path, basename]
        for fn in malicious_files:
            candidates.append(http_path + fn)
            candidates.append(fn)

        for candidate in candidates:
            assert_(stays_inside(candidate))

    def test_windows_os_sep(self):
        # Re-run the other checks while os.sep pretends to be a backslash.
        saved_sep = os.sep
        os.sep = '\\'
        try:
            for check in (self.test_ValidHTTP,
                          self.test_ValidFile,
                          self.test_InvalidHTTP,
                          self.test_InvalidFile,
                          self.test_sandboxing):
                check()
        finally:
            os.sep = saved_sep
|
Chris@87
|
263
|
Chris@87
|
264
|
Chris@87
|
class TestRepositoryAbspath(TestCase):
    """Tests for the local paths produced by ``Repository.abspath``."""

    def setUp(self):
        # Absolute form is needed: the tests compare against it directly.
        self.tmpdir = os.path.abspath(mkdtemp())
        self.repos = datasource.Repository(valid_baseurl(), self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.repos

    def test_ValidHTTP(self):
        parsed = urlparse(valid_httpurl())
        url_path = parsed.path.strip(os.sep).strip('/')
        expected = os.path.join(self.repos._destpath, parsed.netloc, url_path)
        # Only the file name is passed in; the repository supplies the base.
        actual = self.repos.abspath(valid_httpfile())
        self.assertEqual(expected, actual)

    def test_sandboxing(self):
        def stays_inside(name):
            # True when abspath() keeps *name* below the destination dir.
            resolved = os.path.abspath(self.repos.abspath(name))
            return resolved.startswith(self.tmpdir)

        assert_(stays_inside(valid_httpfile()))
        for fn in malicious_files:
            for candidate in (http_path + fn, fn):
                assert_(stays_inside(candidate))

    def test_windows_os_sep(self):
        # Re-run the other checks while os.sep pretends to be a backslash.
        saved_sep = os.sep
        os.sep = '\\'
        try:
            for check in (self.test_ValidHTTP, self.test_sandboxing):
                check()
        finally:
            os.sep = saved_sep
|
Chris@87
|
296
|
Chris@87
|
297
|
Chris@87
|
class TestRepositoryExists(TestCase):
    """Tests for ``Repository.exists`` on local, remote and cached files."""

    def setUp(self):
        self.tmpdir = mkdtemp()
        self.repos = datasource.Repository(valid_baseurl(), self.tmpdir)

    def tearDown(self):
        rmtree(self.tmpdir)
        del self.repos

    def test_ValidFile(self):
        # Create local temp file
        existing = valid_textfile(self.tmpdir)
        assert_(self.repos.exists(existing))

    def test_InvalidFile(self):
        missing = invalid_textfile(self.tmpdir)
        self.assertEqual(self.repos.exists(missing), False)

    def test_RemoveHTTPFile(self):
        assert_(self.repos.exists(valid_httpurl()))

    def test_CachedHTTPFile(self):
        # Build the URL-based directory layout by hand and drop a temp
        # file into it.  This is similar to what Repository.open would do.
        netloc = urlparse(valid_httpurl()).netloc
        cache_dir = os.path.join(self.repos._destpath, netloc)
        os.mkdir(cache_dir, 0o0700)
        cached = valid_textfile(cache_dir)
        assert_(self.repos.exists(cached))
|
Chris@87
|
329
|
Chris@87
|
330
|
Chris@87
|
class TestOpenFunc(TestCase):
    """Tests for the module-level ``datasource.open`` function."""

    def setUp(self):
        self.tmpdir = mkdtemp()

    def tearDown(self):
        rmtree(self.tmpdir)

    def test_DataSourceOpen(self):
        local_file = valid_textfile(self.tmpdir)
        # First with destpath passed in, then with the default destpath.
        for extra in ({'destpath': self.tmpdir}, {}):
            fp = datasource.open(local_file, **extra)
            assert_(fp)
            fp.close()
|
Chris@87
|
348
|
Chris@87
|
349
|
Chris@87
|
if __name__ == "__main__":
    # Executed as a script: hand over to numpy.testing's module runner.
    run_module_suite()
|