Mercurial > hg > hybrid-music-recommender-using-content-based-and-social-information
comparison Code/python_oauth2-master/tests/test_oauth.py @ 21:e68dbee1f6db
Modified code
New datasets
Updated report
author | Paulo Chiliguano <p.e.chiilguano@se14.qmul.ac.uk> |
---|---|
date | Tue, 11 Aug 2015 10:50:36 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
20:1dbd24575d44 | 21:e68dbee1f6db |
---|---|
1 # -*- coding: utf-8 -*- | |
2 | |
3 """ | |
4 The MIT License | |
5 | |
6 Copyright (c) 2009 Vic Fryzel | |
7 | |
8 Permission is hereby granted, free of charge, to any person obtaining a copy | |
9 of this software and associated documentation files (the "Software"), to deal | |
10 in the Software without restriction, including without limitation the rights | |
11 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
12 copies of the Software, and to permit persons to whom the Software is | |
13 furnished to do so, subject to the following conditions: | |
14 | |
15 The above copyright notice and this permission notice shall be included in | |
16 all copies or substantial portions of the Software. | |
17 | |
18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
19 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
20 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
21 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
22 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
23 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
24 THE SOFTWARE. | |
25 """ | |
26 import sys | |
27 import os | |
28 import unittest | |
29 import oauth2 as oauth | |
30 import random | |
31 import time | |
32 import urllib | |
33 import urlparse | |
34 from types import ListType | |
35 import mock | |
36 import httplib2 | |
37 | |
# Fix for python2.5 compatibility: parse_qs/parse_qsl are taken from
# urlparse when it provides them, otherwise from cgi.
try:
    from urlparse import parse_qs, parse_qsl
except ImportError:
    from cgi import parse_qs, parse_qsl


# Prepend the parent of this tests directory to the module search path.
# NOTE(review): ``import oauth2`` has already executed above, so this only
# influences imports performed after this point -- confirm that is intended.
sys.path[0:0] = [os.path.join(os.path.dirname(__file__), ".."),]
46 | |
47 | |
48 class TestError(unittest.TestCase): | |
49 def test_message(self): | |
50 try: | |
51 raise oauth.Error | |
52 except oauth.Error, e: | |
53 self.assertEqual(e.message, 'OAuth error occurred.') | |
54 | |
55 msg = 'OMG THINGS BROKE!!!!' | |
56 try: | |
57 raise oauth.Error(msg) | |
58 except oauth.Error, e: | |
59 self.assertEqual(e.message, msg) | |
60 | |
61 def test_str(self): | |
62 try: | |
63 raise oauth.Error | |
64 except oauth.Error, e: | |
65 self.assertEquals(str(e), 'OAuth error occurred.') | |
66 | |
class TestGenerateFunctions(unittest.TestCase):
    """Tests for the module-level header, escaping and generator helpers."""

    def test_build_auth_header(self):
        """The header dict has a single WWW-Authenticate entry with the realm."""
        header = oauth.build_authenticate_header()
        self.assertEqual(header['WWW-Authenticate'], 'OAuth realm=""')
        self.assertEqual(len(header), 1)
        realm = 'http://example.myrealm.com/'
        header = oauth.build_authenticate_header(realm)
        self.assertEqual(header['WWW-Authenticate'], 'OAuth realm="%s"' %
                         realm)
        self.assertEqual(len(header), 1)

    def test_build_xoauth_string(self):
        """The XOAUTH string is ``GET <url> <comma separated key="value">``."""
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')
        token = oauth.Token('user_token', 'user_secret')
        url = "https://mail.google.com/mail/b/joe@example.com/imap/"
        xoauth_string = oauth.build_xoauth_string(url, consumer, token)

        method, oauth_url, oauth_string = xoauth_string.split(' ')

        self.assertEqual("GET", method)
        self.assertEqual(url, oauth_url)

        # Turn the key="value" pairs into a plain dict for the checks below.
        returned = {}
        for part in oauth_string.split(','):
            var, val = part.split('=')
            returned[var] = val.strip('"')

        self.assertEqual('HMAC-SHA1', returned['oauth_signature_method'])
        self.assertEqual('user_token', returned['oauth_token'])
        self.assertEqual('consumer_token', returned['oauth_consumer_key'])
        self.assertTrue('oauth_signature' in returned, 'oauth_signature')

    def test_escape(self):
        """``escape`` keeps ``~`` and does not leave ``../`` in the output."""
        string = 'http://whatever.com/~someuser/?test=test&other=other'
        self.assertTrue('~' in oauth.escape(string))
        string = '../../../../../../../etc/passwd'
        self.assertTrue('../' not in oauth.escape(string))

    def test_gen_nonce(self):
        """Nonces are 8 characters long by default, or the requested length."""
        nonce = oauth.generate_nonce()
        self.assertEqual(len(nonce), 8)
        nonce = oauth.generate_nonce(20)
        self.assertEqual(len(nonce), 20)

    def test_gen_verifier(self):
        """Verifiers are 8 characters long by default, or the requested length."""
        verifier = oauth.generate_verifier()
        self.assertEqual(len(verifier), 8)
        verifier = oauth.generate_verifier(16)
        self.assertEqual(len(verifier), 16)

    def test_gen_timestamp(self):
        """The generated timestamp is the current time in whole seconds."""
        # Read the clock on both sides of the call. Comparing against a
        # single earlier reading fails spuriously whenever the second rolls
        # over between the two calls.
        before = int(time.time())
        now = oauth.generate_timestamp()
        after = int(time.time())
        self.assertTrue(before <= now <= after,
                        'timestamp %r not within [%r, %r]' % (now, before, after))
122 | |
class TestConsumer(unittest.TestCase):
    """Construction and string-form checks for ``oauth.Consumer``."""

    def setUp(self):
        """Build a consumer from a fixed key/secret pair."""
        self.key = 'my-key'
        self.secret = 'my-secret'
        self.consumer = oauth.Consumer(key=self.key, secret=self.secret)

    def test_init(self):
        """The constructor stores key and secret unchanged."""
        self.assertEqual(self.consumer.key, self.key)
        self.assertEqual(self.consumer.secret, self.secret)

    def test_basic(self):
        """A missing key or secret is rejected with ``ValueError``."""
        for key, secret in ((None, None), ('asf', None), (None, 'dasf')):
            self.assertRaises(ValueError, oauth.Consumer, key, secret)

    def test_str(self):
        """``str(consumer)`` is a query string holding the key and the secret."""
        parsed = dict(parse_qsl(str(self.consumer)))
        for field in ('oauth_consumer_key', 'oauth_consumer_secret'):
            self.assertTrue(field in parsed)
        self.assertEquals(parsed['oauth_consumer_key'], self.consumer.key)
        self.assertEquals(parsed['oauth_consumer_secret'], self.consumer.secret)
144 | |
class TestToken(unittest.TestCase):
    """Tests for ``oauth.Token``: construction, callback and verifier
    handling, and conversion to and from its string form."""

    def setUp(self):
        """Build a token from a fixed key/secret pair."""
        self.key = 'my-key'
        self.secret = 'my-secret'
        self.token = oauth.Token(self.key, self.secret)

    def test_basic(self):
        """A missing key or secret is rejected with ``ValueError``."""
        for key, secret in ((None, None), ('asf', None), (None, 'dasf')):
            self.assertRaises(ValueError, oauth.Token, key, secret)

    def test_init(self):
        """A fresh token has its key and secret set and nothing else."""
        self.assertEqual(self.token.key, self.key)
        self.assertEqual(self.token.secret, self.secret)
        self.assertEqual(self.token.callback, None)
        self.assertEqual(self.token.callback_confirmed, None)
        self.assertEqual(self.token.verifier, None)

    def test_set_callback(self):
        """Setting a callback stores it and marks it as confirmed."""
        self.assertEqual(self.token.callback, None)
        self.assertEqual(self.token.callback_confirmed, None)
        cb = 'http://www.example.com/my-callback'
        self.token.set_callback(cb)
        self.assertEqual(self.token.callback, cb)
        self.assertEqual(self.token.callback_confirmed, 'true')
        self.token.set_callback(None)
        self.assertEqual(self.token.callback, None)
        # TODO: The following test should probably not pass, but it does
        # To fix this, check for None and unset 'true' in set_callback
        # Additionally, should a confirmation truly be done of the callback?
        self.assertEqual(self.token.callback_confirmed, 'true')

    def test_set_verifier(self):
        """An explicit verifier is stored; omitting it assigns a different one."""
        self.assertEqual(self.token.verifier, None)
        v = oauth.generate_verifier()
        self.token.set_verifier(v)
        self.assertEqual(self.token.verifier, v)
        self.token.set_verifier()
        self.assertNotEqual(self.token.verifier, v)
        self.token.set_verifier('')
        self.assertEqual(self.token.verifier, '')

    def test_get_callback_url(self):
        """The verifier is appended with ``&`` or ``?`` as the callback needs."""
        self.assertEqual(self.token.get_callback_url(), None)

        self.token.set_verifier()
        self.assertEqual(self.token.get_callback_url(), None)

        # Callback that already has a query string: joined with '&'.
        cb = 'http://www.example.com/my-callback?save=1&return=true'
        v = oauth.generate_verifier()
        self.token.set_callback(cb)
        self.token.set_verifier(v)
        url = self.token.get_callback_url()
        verifier_str = '&oauth_verifier=%s' % v
        self.assertEqual(url, '%s%s' % (cb, verifier_str))

        # Callback without a query string: joined with '?'.
        cb = 'http://www.example.com/my-callback-no-query'
        v = oauth.generate_verifier()
        self.token.set_callback(cb)
        self.token.set_verifier(v)
        url = self.token.get_callback_url()
        verifier_str = '?oauth_verifier=%s' % v
        self.assertEqual(url, '%s%s' % (cb, verifier_str))

    def test_to_string_callback_confirmed(self):
        """``to_string`` gains the confirmation flag once a callback is set.

        This test was previously also named ``test_to_string``; the later
        definition of that name in this class replaced it, so it never ran.
        """
        expected = {'oauth_token_secret': self.secret,
                    'oauth_token': self.key}
        # Compare parsed pairs rather than raw text so the check does not
        # depend on the order in which the fields are serialised.
        self.assertEqual(dict(parse_qsl(self.token.to_string())), expected)

        self.token.set_callback('http://www.example.com/my-callback')
        expected['oauth_callback_confirmed'] = 'true'
        self.assertEqual(dict(parse_qsl(self.token.to_string())), expected)

    def _compare_tokens(self, new):
        """Assert that ``new`` matches ``self.token`` in key, secret and
        callback confirmation."""
        self.assertEqual(self.token.key, new.key)
        self.assertEqual(self.token.secret, new.secret)
        # TODO: What about copying the callback to the new token?
        # self.assertEqual(self.token.callback, new.callback)
        self.assertEqual(self.token.callback_confirmed,
                         new.callback_confirmed)
        # TODO: What about copying the verifier to the new token?
        # self.assertEqual(self.token.verifier, new.verifier)

    def test_to_string(self):
        """``str(token)`` is the secret/key query string."""
        tok = oauth.Token('tooken', 'seecret')
        self.assertEqual(str(tok), 'oauth_token_secret=seecret&oauth_token=tooken')

    def test_from_string(self):
        """Malformed strings raise ``ValueError``; ``to_string`` output parses
        back into an equivalent token."""
        malformed = (
            '',
            'blahblahblah',
            'blah=blah',
            'oauth_token_secret=asfdasf',
            'oauth_token_secret=',
            'oauth_token=asfdasf',
            'oauth_token=',
            'oauth_token=&oauth_token_secret=',
            'oauth_token=tooken%26oauth_token_secret=seecret',
        )
        for bad in malformed:
            self.assertRaises(ValueError, oauth.Token.from_string, bad)

        string = self.token.to_string()
        new = oauth.Token.from_string(string)
        self._compare_tokens(new)

        self.token.set_callback('http://www.example.com/my-callback')
        string = self.token.to_string()
        new = oauth.Token.from_string(string)
        self._compare_tokens(new)
252 | |
class ReallyEqualMixin:
    """Mixin adding an equality assertion that also compares operand types."""

    def failUnlessReallyEqual(self, a, b, msg=None):
        """Assert that ``a`` equals ``b`` and that their types are equal too.

        Both checks are delegated to ``self.failUnlessEqual``; the type check
        reports both values together with the caller's ``msg``.
        """
        self.failUnlessEqual(a, b, msg=msg)
        type_msg = "a :: %r, b :: %r, %r" % (a, b, msg)
        self.failUnlessEqual(type(a), type(b), msg=type_msg)
257 | |
class TestFuncs(unittest.TestCase):
    """Tests for the unicode coercion helpers."""

    def test_to_unicode(self):
        """Bytes that are not UTF-8 raise ``TypeError``; text and UTF-8 bytes
        come back as unicode."""
        rejected = (
            (oauth.to_unicode, '\xae'),
            (oauth.to_unicode_optional_iterator, '\xae'),
            (oauth.to_unicode_optional_iterator, ['\xae']),
        )
        for func, bad in rejected:
            self.failUnlessRaises(TypeError, func, bad)

        accepted = (
            (oauth.to_unicode, ':-)', u':-)'),
            (oauth.to_unicode, u'\u00ae', u'\u00ae'),
            (oauth.to_unicode, '\xc2\xae', u'\u00ae'),
            (oauth.to_unicode_optional_iterator, [':-)'], [u':-)']),
            (oauth.to_unicode_optional_iterator, [u'\u00ae'], [u'\u00ae']),
        )
        for func, given, expected in accepted:
            self.failUnlessEqual(func(given), expected)
269 | |
270 class TestRequest(unittest.TestCase, ReallyEqualMixin): | |
def test_setter(self):
    """A request built with only a method has no ``url``/``normalized_url``
    (either the attribute is missing or it is ``None``)."""
    req = oauth.Request("GET")
    for attr in ('url', 'normalized_url'):
        self.assertTrue(not hasattr(req, attr) or getattr(req, attr) is None)
277 | |
278 def test_deleter(self): | |
279 url = "http://example.com" | |
280 method = "GET" | |
281 req = oauth.Request(method, url) | |
282 | |
283 try: | |
284 del req.url | |
285 url = req.url | |
286 self.fail("AttributeError should have been raised on empty url.") | |
287 except AttributeError: | |
288 pass | |
289 except Exception, e: | |
290 self.fail(str(e)) | |
291 | |
def test_url(self):
    """Default ports are dropped from ``normalized_url`` but kept in ``url``."""
    cases = (
        ("http://example.com:80/foo.php", "http://example.com/foo.php"),
        ("https://example.com:443/foo.php", "https://example.com/foo.php"),
    )
    for given, normalized in cases:
        req = oauth.Request("GET", given)
        self.assertEquals(req.normalized_url, normalized)
        self.assertEquals(req.url, given)
306 | |
def test_bad_url(self):
    """Assigning an ``ftp://`` URL to a request raises ``ValueError``."""
    request = oauth.Request()
    try:
        request.url = "ftp://example.com"
    except ValueError:
        return
    self.fail("Invalid URL scheme was accepted.")
314 | |
def test_unset_consumer_and_token(self):
    """Signing stores the consumer key and token key on the request."""
    consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    token = oauth.Token('my_key', 'my_secret')
    request = oauth.Request("GET", "http://example.com/fetch.php")
    signer = oauth.SignatureMethod_HMAC_SHA1()
    request.sign_request(signer, consumer, token)

    self.assertEquals(consumer.key, request['oauth_consumer_key'])
    self.assertEquals(token.key, request['oauth_token'])
324 | |
def test_no_url_set(self):
    """Signing a request without a URL may raise ``ValueError`` but must not
    raise ``TypeError``."""
    consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    token = oauth.Token('my_key', 'my_secret')
    request = oauth.Request()

    try:
        request.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
                             consumer, token)
    except TypeError:
        self.fail("Signature method didn't check for a normalized URL.")
    except ValueError:
        pass
338 | |
def test_url_query(self):
    """The query string stays in ``url`` and is absent from ``normalized_url``."""
    url = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10"
    scheme, netloc, path = urlparse.urlparse(url)[:3]
    expected = urlparse.urlunparse((scheme, netloc, path, None, None, None))

    req = oauth.Request("GET", url)
    self.assertEquals(req.url, url)
    self.assertEquals(req.normalized_url, expected)
347 | |
def test_get_parameter(self):
    """Known parameters are returned; unknown names raise ``oauth.Error``."""
    req = oauth.Request("GET", "http://example.com",
                        parameters={'oauth_consumer' : 'asdf'})

    self.assertEquals(req.get_parameter('oauth_consumer'), 'asdf')
    self.assertRaises(oauth.Error, req.get_parameter, 'blah')
356 | |
def test_get_nonoauth_parameters(self):
    """The ``oauth_consumer`` entry is left out; every other parameter
    comes back unchanged."""
    other_params = {
        u'foo': u'baz',
        u'bar': u'foo',
        u'multi': [u'FOO',u'BAR'],
        u'uni_utf8': u'\xae',
        u'uni_unicode': u'\u00ae',
        u'uni_unicode_2': u'åÅøØ',
    }

    # Start from the single OAuth entry, then mix in the others.
    params = {'oauth_consumer': 'asdfasdfasdf'}
    params.update(other_params)

    req = oauth.Request("GET", "http://example.com", params)
    self.assertEquals(other_params, req.get_nonoauth_parameters())
377 | |
def test_to_header(self):
    """``to_header`` emits the realm plus one entry per OAuth parameter."""
    realm = "http://sp.example.com/"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    req = oauth.Request("GET", realm, params)
    header, value = req.to_header(realm).items()[0]

    parts = value.split('OAuth ')
    fields = parts[1].split(', ')
    # assertTrue(x, y) treats y as the failure message, so the previous
    # check could never fail. Compare the counts for real: one field per
    # parameter plus the realm.
    self.assertEqual(len(fields), len(params) + 1)

    res = {}
    for field in fields:
        var, val = field.split('=')
        res[var] = urllib.unquote(val.strip('"'))

    self.assertEquals(realm, res['realm'])
    del res['realm']

    # Same fix as above: with the realm removed the counts must match.
    self.assertEqual(len(res), len(params))

    for key, val in res.items():
        self.assertEquals(val, params.get(key))
410 | |
def test_to_postdata_nonascii(self):
    """Non-ASCII unicode values are percent-encoded as UTF-8 and the result
    has the same type as the expected byte-string literal."""
    realm = "http://sp.example.com/"

    params = {
        'nonasciithing': u'q\xbfu\xe9 ,aasp u?..a.s',
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    expected = 'nonasciithing=q%C2%BFu%C3%A9%20%2Caasp%20u%3F..a.s&oauth_nonce=4572616e48616d6d65724c61686176&oauth_timestamp=137131200&oauth_consumer_key=0685bd9184jfhq22&oauth_signature_method=HMAC-SHA1&oauth_version=1.0&oauth_token=ad180jjd733klru7&oauth_signature=wOJIO9A2W5mFwDgiDvZbTSMK%252FPY%253D'

    req = oauth.Request("GET", realm, params)
    self.failUnlessReallyEqual(req.to_postdata(), expected)
428 | |
def test_to_postdata(self):
    """A list-valued parameter is flattened into one pair per element."""
    realm = "http://sp.example.com/"

    params = {
        'multi': ['FOO','BAR'],
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    req = oauth.Request("GET", realm, params)

    def by_name(pair):
        return pair[0]

    # Expected pairs: the two 'multi' entries followed by every scalar one.
    expected = [('multi','FOO'),('multi','BAR')]
    expected.extend(item for item in params.items() if item[0] != 'multi')

    actual = parse_qsl(req.to_postdata())
    self.assertEquals(sorted(expected, key=by_name), sorted(actual, key=by_name))
450 | |
def test_to_url(self):
    """``to_url`` yields the base URL with the parameters as its query."""
    url = "http://sp.example.com/"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    req = oauth.Request("GET", url, params)
    expected = urlparse.urlparse("%s?%s" % (url, urllib.urlencode(params)))
    actual = urlparse.urlparse(req.to_url())
    for part in ('scheme', 'netloc', 'path'):
        self.assertEquals(getattr(expected, part), getattr(actual, part))

    # Query strings are compared as parsed dicts, so order is irrelevant.
    self.assertEquals(parse_qs(expected.query), parse_qs(actual.query))
474 | |
def test_to_url_with_query(self):
    """Query arguments already on the URL survive alongside the parameters."""
    url = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    req = oauth.Request("GET", url, params)
    # Note: the url above already has query parameters, so append new ones with &
    expected = urlparse.urlparse("%s&%s" % (url, urllib.urlencode(params)))
    actual = urlparse.urlparse(req.to_url())
    for part in ('scheme', 'netloc', 'path'):
        self.assertEquals(getattr(expected, part), getattr(actual, part))

    expected_query = parse_qs(expected.query)
    actual_query = parse_qs(actual.query)
    for name in ('alt', 'max-contacts'):
        self.assertTrue(name in actual_query)
    self.assertEquals(actual_query['alt'], ['json'])
    self.assertEquals(actual_query['max-contacts'], ['10'])
    self.assertEquals(expected_query, actual_query)
503 | |
def test_signature_base_string_nonascii_nonutf8(self):
    """Four spellings of the same URL (unicode, UTF-8 bytes, and the
    percent-encoded form as bytes and as unicode) normalise and sign
    identically."""
    consumer = oauth.Consumer('consumer_token', 'consumer_secret')

    urls = (
        u'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc\u2766,+CA',
        'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc\xe2\x9d\xa6,+CA',
        'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA',
        u'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA',
    )

    for url in urls:
        req = oauth.Request("GET", url)
        self.failUnlessReallyEqual(req.normalized_url, u'http://api.simplegeo.com/1.0/places/address.json')
        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        self.failUnlessReallyEqual(req['oauth_signature'], 'WhufgeZKyYpKsI70GZaiDaYwl6g=')
530 | |
def test_signature_base_string_with_query(self):
    """Normalized parameters combine the request parameters (minus the
    signature) with the arguments from the URL's query string."""
    url = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10"
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    req = oauth.Request("GET", url, params)
    self.assertEquals(req.normalized_url, 'https://www.google.com/m8/feeds/contacts/default/full/')
    self.assertEquals(req.url, 'https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10')
    normalized_params = parse_qsl(req.get_normalized_parameters())
    # This used to be assertTrue(len(...), len(params) + 2), which passes the
    # expected count as the failure message and so never failed. The real
    # count is every parameter except oauth_signature (skipped in the loop
    # below, and left out of the normalized string) plus the two query
    # arguments 'alt' and 'max-contacts'.
    self.assertEqual(len(normalized_params), len(params) - 1 + 2)
    normalized_params = dict(normalized_params)
    for key, value in params.iteritems():
        if key == 'oauth_signature':
            continue
        self.assertEquals(value, normalized_params[key])
    self.assertEquals(normalized_params['alt'], 'json')
    self.assertEquals(normalized_params['max-contacts'], '10')
554 | |
def test_get_normalized_parameters_empty(self):
    """A query argument with an empty value is kept as ``empty=``."""
    req = oauth.Request("GET", "http://sp.example.com/?empty=")
    self.assertEquals('empty=', req.get_normalized_parameters())
565 | |
def test_get_normalized_parameters_duplicate(self):
    """Parameters carried in the URL appear once each, sorted by name, with
    ``oauth_signature`` left out."""
    url = "http://example.com/v2/search/videos?oauth_nonce=79815175&oauth_timestamp=1295397962&oauth_consumer_key=mykey&oauth_signature_method=HMAC-SHA1&q=car&oauth_version=1.0&offset=10&oauth_signature=spWLI%2FGQjid7sQVd5%2FarahRxzJg%3D"
    expected='oauth_consumer_key=mykey&oauth_nonce=79815175&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1295397962&oauth_version=1.0&offset=10&q=car'

    normalized = oauth.Request("GET", url).get_normalized_parameters()
    self.assertEquals(expected, normalized)
576 | |
def test_get_normalized_parameters_from_url(self):
    """Parameters supplied only through the URL are normalized as expected."""
    # example copied from
    # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
    # which in turns says that it was copied from
    # http://oauth.net/core/1.0/#sig_base_example .
    url = "http://photos.example.net/photos?file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original"
    expected = 'file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original'

    normalized = oauth.Request("GET", url).get_normalized_parameters()
    self.assertEquals(expected, normalized)
591 | |
def test_signing_base(self):
    """The HMAC-SHA1 base string for the reference request matches the
    expected literal."""
    # example copied from
    # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
    # which in turns says that it was copied from
    # http://oauth.net/core/1.0/#sig_base_example .
    url = "http://photos.example.net/photos?file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original"
    expected = 'GET&http%3A%2F%2Fphotos.example.net%2Fphotos&file%3Dvacation.jpg%26oauth_consumer_key%3Ddpf43f3p2l4k3l03%26oauth_nonce%3Dkllo9940pd9333jh%26oauth_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D1191242096%26oauth_token%3Dnnch734d00sl2jdk%26oauth_version%3D1.0%26size%3Doriginal'

    req = oauth.Request("GET", url)
    signer = oauth.SignatureMethod_HMAC_SHA1()
    consumer = oauth.Consumer('dpf43f3p2l4k3l03', 'foo')

    # signing_base returns a (key, raw) pair; only the raw string is checked.
    _signing_key, raw = signer.signing_base(req, consumer, None)
    self.assertEquals(expected, raw)
608 | |
def test_get_normalized_parameters(self):
    """List values are expanded, and unicode and UTF-8 byte values encode to
    the same percent-escaped text."""
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'multi': ['FOO','BAR', u'\u00ae', '\xc2\xae'],
        'multi_same': ['FOO','FOO'],
        'uni_utf8_bytes': '\xc2\xae',
        'uni_unicode_object': u'\u00ae'
    }
    expected='multi=BAR&multi=FOO&multi=%C2%AE&multi=%C2%AE&multi_same=FOO&multi_same=FOO&oauth_consumer_key=0685bd9184jfhq22&oauth_nonce=4572616e48616d6d65724c61686176&oauth_signature_method=HMAC-SHA1&oauth_timestamp=137131200&oauth_token=ad180jjd733klru7&oauth_version=1.0&uni_unicode_object=%C2%AE&uni_utf8_bytes=%C2%AE'

    req = oauth.Request("GET", "http://sp.example.com/", params)
    self.assertEquals(expected, req.get_normalized_parameters())
632 | |
def test_get_normalized_parameters_ignores_auth_signature(self):
    """The normalized string equals the sorted, url-encoded parameters with
    ``oauth_signature`` removed."""
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_signature': "some-random-signature-%d" % random.randint(1000, 2000),
        'oauth_token': "ad180jjd733klru7",
    }

    req = oauth.Request("GET", "http://sp.example.com/", params)
    normalized = req.get_normalized_parameters()

    with_signature = urllib.urlencode(sorted(params.items()))
    self.assertNotEquals(with_signature, normalized)

    unsigned = dict(item for item in params.items()
                    if item[0] != "oauth_signature")
    self.assertEqual(urllib.urlencode(sorted(unsigned.items())), normalized)
655 | |
def test_set_signature_method(self):
    """An arbitrary object is rejected with ``ValueError``; a real signature
    method is stored on ``client.method``."""
    client = oauth.Client(oauth.Consumer('key', 'secret'))

    class Blah:
        pass

    try:
        client.set_signature_method(Blah())
    except ValueError:
        pass
    else:
        self.fail("Client.set_signature_method() accepted invalid method.")

    method = oauth.SignatureMethod_HMAC_SHA1()
    client.set_signature_method(method)
    self.assertEquals(method, client.method)
672 | |
def test_get_normalized_string_escapes_spaces_properly(self):
    """Spaces appear as ``%20`` in the normalized string, not as ``+``."""
    params = {
        "some_random_data": random.randint(100, 1000),
        "data": "This data with a random number (%d) has spaces!" % random.randint(1000, 2000),
    }

    req = oauth.Request("GET", "http://sp.example.com/", params)
    normalized = req.get_normalized_parameters()

    # urlencode writes spaces as '+'; swap them for the expected escape.
    plus_encoded = urllib.urlencode(sorted(params.items()))
    self.assertEqual(plus_encoded.replace('+', '%20'), normalized)
684 | |
@mock.patch('oauth2.Request.make_timestamp')
@mock.patch('oauth2.Request.make_nonce')
def test_request_nonutf8_bytes(self, mock_make_nonce, mock_make_timestamp):
    """Byte strings that are not UTF-8 are rejected with ``TypeError`` in the
    URL, its query and the parameters; unicode and UTF-8 input is signed.

    ``make_nonce`` and ``make_timestamp`` are patched to return fixed values.
    """
    mock_make_nonce.return_value = 5
    mock_make_timestamp.return_value = 6

    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
    con = oauth.Consumer(key="con-test-key", secret="con-test-secret")
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_token': tok.key,
        'oauth_consumer_key': con.key
    }

    # If someone passes a sequence of bytes which is not ascii for
    # url, we'll raise an exception as early as possible.
    url = "http://sp.example.com/\x92" # It's actually cp1252-encoding...
    self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params)

    # And if they pass an unicode, then we'll use it.
    url = u'http://sp.example.com/\u2019'
    req = oauth.Request(method="GET", url=url, parameters=params)
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
    self.failUnlessReallyEqual(req['oauth_signature'], 'cMzvCkhvLL57+sTIxLITTHfkqZk=')

    # And if it is a utf-8-encoded-then-percent-encoded non-ascii
    # thing, we'll decode it and use it.
    url = "http://sp.example.com/%E2%80%99"
    req = oauth.Request(method="GET", url=url, parameters=params)
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
    self.failUnlessReallyEqual(req['oauth_signature'], 'yMLKOyNKC/DkyhUOb8DLSvceEWE=')

    # Same thing with the params.
    url = "http://sp.example.com/"

    # If someone passes a sequence of bytes which is not ascii in
    # params, we'll raise an exception as early as possible.
    # A stray trailing comma used to turn this value into a 1-tuple, so the
    # plain byte-string case described above was never exercised.
    params['non_oauth_thing'] = '\xae' # It's actually cp1252-encoding...
    self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params)

    # The tuple form that was tested before is kept as a separate case.
    params['non_oauth_thing'] = ('\xae',)
    self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params)

    # And if they pass a unicode, then we'll use it.
    params['non_oauth_thing'] = u'\u2019'
    req = oauth.Request(method="GET", url=url, parameters=params)
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
    self.failUnlessReallyEqual(req['oauth_signature'], '0GU50m0v60CVDB5JnoBXnvvvKx4=')

    # And if it is a utf-8-encoded non-ascii thing, we'll decode
    # it and use it.
    params['non_oauth_thing'] = '\xc2\xae'
    req = oauth.Request(method="GET", url=url, parameters=params)
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
    self.failUnlessReallyEqual(req['oauth_signature'], 'pqOCu4qvRTiGiXB8Z61Jsey0pMM=')


    # Also if there are non-utf8 bytes in the query args.
    url = "http://sp.example.com/?q=\x92" # cp1252
    self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params)
744 | |
def test_request_hash_of_body(self):
    """Check oauth_body_hash and oauth_signature on non-form-encoded requests.

    The cases are taken from Appendix A of
    http://oauth.googlecode.com/svn/spec/ext/body_hash/1.0/oauth-bodyhash.html
    Every request is signed with HMAC-SHA1, using the consumer only
    (None is passed as the token to sign_request).
    """
    token = oauth.Token(key="token", secret="tok-test-secret")
    consumer = oauth.Consumer(key="consumer", secret="con-test-secret")
    url = u"http://www.example.com/resource"

    # Each entry: (HTTP method, nonce, timestamp, extra Request kwargs,
    #              expected oauth_body_hash, expected oauth_signature)
    cases = (
        # Example 1a from Appendix A.1 -- except that we get a different
        # result than they do: oauth-bodyhash.html A.1 lists
        # '08bUFF%2Fjmp59mWB7cSgCYBUpJ0U%3D' as the signature, and it is
        # not clear how that value could be produced.
        ("PUT", 10288510250934, 1236874155, {'body': "Hello World!"},
         'Lve95gjOVATpfV8EL5X4nxwjKHE=', 't+MX8l/0S8hdbVQL99nD0X1fPnM='),
        # Example 1b
        ("PUT", 10369470270925, 1236874236, {'body': "Hello World!"},
         'Lve95gjOVATpfV8EL5X4nxwjKHE=', 'CTFmrqJIGT7NsWJ42OrujahTtTc='),
        # Appendix A.2 -- no body argument is passed to Request at all.
        ("GET", 8628868109991, 1238395022, {},
         '2jmj7l5rSw0yVb/vlWAYkK/YBwk=', 'Zhl++aWSP0O3/hYQ0CuBc7jv38I='),
    )

    for http_method, nonce, timestamp, extra, body_hash, signature in cases:
        params = {
            'oauth_version': "1.0",
            'oauth_token': token.key,
            'oauth_nonce': nonce,
            'oauth_timestamp': timestamp,
            'oauth_consumer_key': consumer.key,
        }
        req = oauth.Request(method=http_method, url=url, parameters=params,
                            is_form_encoded=False, **extra)
        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        self.failUnlessReallyEqual(req['oauth_body_hash'], body_hash)
        self.failUnlessReallyEqual(req['oauth_signature'], signature)
797 | |
798 | |
def test_sign_request(self):
    """Check the signatures produced by Request.sign_request().

    Covers the HMAC-SHA1 and PLAINTEXT signature methods on a plain URL,
    then checks that a non-ASCII URL gives the same HMAC-SHA1 signature
    whether it is passed as UTF-8 encoded bytes or as a unicode object.
    """
    url = "http://sp.example.com/"

    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
    con = oauth.Consumer(key="con-test-key", secret="con-test-secret")

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_token': tok.key,
        'oauth_consumer_key': con.key,
    }
    req = oauth.Request(method="GET", url=url, parameters=params)

    # Expected signature -> signature method that should produce it.
    # The same request object is signed by each method in turn.
    methods = {
        'DX01TdHws7OninCLK9VztNTH1M4=': oauth.SignatureMethod_HMAC_SHA1(),
        'con-test-secret&tok-test-secret': oauth.SignatureMethod_PLAINTEXT(),
    }

    for exp, method in methods.items():
        req.sign_request(method, con, tok)
        self.assertEqual(req['oauth_signature_method'], method.name)
        self.assertEqual(req['oauth_signature'], exp)

    # Non-ASCII characters in the URL path and in the query args.  Each
    # URL appears twice: once as utf-8 bytes and once as a Python unicode
    # object; both spellings must yield the same signature.
    non_ascii_cases = (
        ("http://sp.example.com/\xe2\x80\x99",
         'loFvp5xC7YbOgd9exIO6TxB7H4s='),
        (u'http://sp.example.com/\u2019',
         'loFvp5xC7YbOgd9exIO6TxB7H4s='),
        ("http://sp.example.com/?q=\xe2\x80\x99",
         'IBw5mfvoCsDjgpcsVKbyvsDqQaU='),
        (u'http://sp.example.com/?q=\u2019',
         'IBw5mfvoCsDjgpcsVKbyvsDqQaU='),
    )
    for url, expected_signature in non_ascii_cases:
        req = oauth.Request(method="GET", url=url, parameters=params)
        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, tok)
        self.assertEqual(req['oauth_signature'], expected_signature)
846 | |
def test_from_request(self):
    """Exercise Request.from_request() with headers, a query string, neither.

    Checks that parameters survive a round trip through to_header(), that
    a malformed Authorization header raises oauth.Error, that parameters
    are recovered from a query string, and that a call with neither
    headers nor query string returns None.
    """
    url = "http://sp.example.com/"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    headers = oauth.Request("GET", url, params).to_header()

    # Test from the headers
    req = oauth.Request.from_request("GET", url, headers)
    self.assertEqual(req.method, "GET")
    self.assertEqual(req.url, url)
    self.assertEqual(params, req.copy())

    # Test with bad OAuth headers
    bad_headers = {
        'Authorization': 'OAuth this is a bad header',
    }
    self.assertRaises(oauth.Error, oauth.Request.from_request, "GET",
                      url, bad_headers)

    # Test getting from query string
    qs = urllib.urlencode(params)
    req = oauth.Request.from_request("GET", url, query_string=qs)

    # parse_qs maps each key to a list of values; keep the first value of
    # each and unquote it to obtain the expected flat dictionary.
    parsed = parse_qs(qs, keep_blank_values=False)
    exp = dict((k, urllib.unquote(v[0])) for k, v in parsed.iteritems())
    self.assertEqual(exp, req.copy())

    # Test that a boned from_request() call returns None
    req = oauth.Request.from_request("GET", url)
    self.assertEqual(None, req)
891 | |
def test_from_token_and_callback(self):
    """Check Request.from_token_and_callback() with and without a callback.

    Without a callback the request must not contain 'oauth_callback';
    with one, the parameter must equal the URL that was passed in.
    """
    url = "http://sp.example.com/"
    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")

    req = oauth.Request.from_token_and_callback(tok)
    self.assertFalse('oauth_callback' in req)
    self.assertEqual(req['oauth_token'], tok.key)

    req = oauth.Request.from_token_and_callback(tok, callback=url)
    self.assertTrue('oauth_callback' in req)
    self.assertEqual(req['oauth_callback'], url)
913 | |
def test_from_consumer_and_token(self):
    """Check that from_consumer_and_token() copies keys and the verifier.

    The resulting request must carry the token key, the consumer key and
    the verifier that was set on the token.
    """
    url = "http://sp.example.com/"

    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
    tok.set_verifier('this_is_a_test_verifier')
    con = oauth.Consumer(key="con-test-key", secret="con-test-secret")
    req = oauth.Request.from_consumer_and_token(
        con, token=tok, http_method="GET", http_url=url)

    self.assertEqual(req['oauth_token'], tok.key)
    self.assertEqual(req['oauth_consumer_key'], con.key)
    self.assertEqual(tok.verifier, req['oauth_verifier'])
926 | |
class SignatureMethod_Bad(oauth.SignatureMethod):
    """Signature method stub named "BAD" that yields a fixed, bogus signature."""

    name = "BAD"

    def signing_base(self, request, consumer, token):
        """Return an empty signing base, ignoring all arguments."""
        return ""

    def sign(self, request, consumer, token):
        """Return the constant string "invalid-signature"."""
        return "invalid-signature"
935 | |
936 | |
class TestServer(unittest.TestCase):
    """Tests for oauth.Server: signature-method registration and verify_request()."""

    def _make_signed_request(self, version="1.0", multi=None,
                             signature_method=None):
        """Build and sign the GET request shared by the tests in this class.

        version -- value of the 'oauth_version' parameter; pass None to
            leave the parameter out of the request entirely.
        multi -- value of the 'multi' parameter; defaults to
            ['FOO', 'BAR'].
        signature_method -- signature method instance used to sign the
            request; defaults to a new oauth.SignatureMethod_HMAC_SHA1().

        Returns a (request, consumer, token) tuple.
        """
        if multi is None:
            multi = ['FOO', 'BAR']

        consumer = oauth.Consumer(key="consumer-key",
                                  secret="consumer-secret")
        token = oauth.Token(key="token-key", secret="token-secret")

        params = {
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            # The timestamp is taken at call time rather than hard-coded.
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': multi,
            'foo': 59,
            'oauth_token': token.key,
            'oauth_consumer_key': consumer.key,
        }
        if version is not None:
            params['oauth_version'] = version

        request = oauth.Request(method="GET", url="http://sp.example.com/",
                                parameters=params)

        if signature_method is None:
            signature_method = oauth.SignatureMethod_HMAC_SHA1()
        request.sign_request(signature_method, consumer, token)

        return request, consumer, token

    def _hmac_server(self):
        """Return an oauth.Server with only HMAC-SHA1 registered."""
        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
        return server

    def setUp(self):
        """Create a valid HMAC-SHA1 signed request for the tests that need one."""
        self.request, self.consumer, self.token = self._make_signed_request()

    def test_init(self):
        """Server() keeps the signature methods passed in and defaults to none."""
        server = oauth.Server(
            signature_methods={'HMAC-SHA1': oauth.SignatureMethod_HMAC_SHA1()})
        self.assertTrue('HMAC-SHA1' in server.signature_methods)
        self.assertTrue(isinstance(server.signature_methods['HMAC-SHA1'],
                                   oauth.SignatureMethod_HMAC_SHA1))

        server = oauth.Server()
        self.assertEqual(server.signature_methods, {})

    def test_add_signature_method(self):
        """add_signature_method() returns the mapping of registered methods."""
        server = oauth.Server()
        res = server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
        self.assertEqual(len(res), 1)
        self.assertTrue('HMAC-SHA1' in res)
        self.assertTrue(isinstance(res['HMAC-SHA1'],
                                   oauth.SignatureMethod_HMAC_SHA1))

        res = server.add_signature_method(oauth.SignatureMethod_PLAINTEXT())
        self.assertEqual(len(res), 2)
        self.assertTrue('PLAINTEXT' in res)
        self.assertTrue(isinstance(res['PLAINTEXT'],
                                   oauth.SignatureMethod_PLAINTEXT))

    def test_verify_request(self):
        """verify_request() returns the 'bar', 'foo' and 'multi' values."""
        server = self._hmac_server()

        parameters = server.verify_request(self.request, self.consumer,
                                           self.token)

        self.assertTrue('bar' in parameters)
        self.assertTrue('foo' in parameters)
        self.assertTrue('multi' in parameters)
        self.assertEqual(parameters['bar'], 'blerg')
        self.assertEqual(parameters['foo'], 59)
        self.assertEqual(parameters['multi'], ['FOO', 'BAR'])

    def test_build_authenticate_header(self):
        """build_authenticate_header() emits a WWW-Authenticate realm header."""
        server = oauth.Server()
        headers = server.build_authenticate_header('example.com')
        self.assertTrue('WWW-Authenticate' in headers)
        self.assertEqual('OAuth realm="example.com"',
                         headers['WWW-Authenticate'])

    def test_no_version(self):
        """A request without 'oauth_version' is verified without raising."""
        request, consumer, token = self._make_signed_request(version=None)
        server = self._hmac_server()

        # The test passes as long as verify_request() does not raise; the
        # returned parameters are not inspected.
        server.verify_request(request, consumer, token)

    def test_invalid_version(self):
        """An unrecognised 'oauth_version' value makes verify_request() raise."""
        request, consumer, token = self._make_signed_request(
            version='222.9922', multi=['foo', 'bar'])
        server = self._hmac_server()

        self.assertRaises(oauth.Error, server.verify_request,
                          request, consumer, token)

    def test_invalid_signature_method(self):
        """A request signed with an unregistered method is rejected."""
        request, consumer, token = self._make_signed_request(
            signature_method=SignatureMethod_Bad())
        server = self._hmac_server()

        self.assertRaises(oauth.Error, server.verify_request,
                          request, consumer, token)

    def test_missing_signature(self):
        """Removing 'oauth_signature' makes verify_request() raise MissingSignature."""
        request, consumer, token = self._make_signed_request()
        del request['oauth_signature']
        server = self._hmac_server()

        self.assertRaises(oauth.MissingSignature, server.verify_request,
                          request, consumer, token)
1119 | |
1120 | |
1121 # Request Token: http://oauth-sandbox.sevengoslings.net/request_token | |
1122 # Auth: http://oauth-sandbox.sevengoslings.net/authorize | |
1123 # Access Token: http://oauth-sandbox.sevengoslings.net/access_token | |
1124 # Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged | |
1125 # Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged | |
1126 # Key: bd37aed57e15df53 | |
1127 # Secret: 0e9e6413a9ef49510a4f68ed02cd | |
class TestClient(unittest.TestCase):
    """Tests for oauth.Client.

    NOTE(review): the access-token and two-legged tests call
    client.request() against `host` without any mocking, so they
    presumably need network access to that sandbox -- confirm before
    relying on them in an offline run.  The remaining request tests patch
    httplib2.Http.request.
    """

    # Paths of the sandbox endpoints, keyed by the name passed to _uri().
    # An earlier variant of this table used '/request_token.php' and
    # '/access_token.php'.
    oauth_uris = {
        'request_token': '/request_token',
        'authorize': '/authorize',
        'access_token': '/access_token',
        'two_legged': '/two_legged',
        'three_legged': '/three_legged',
    }

    consumer_key = 'bd37aed57e15df53'
    consumer_secret = '0e9e6413a9ef49510a4f68ed02cd'
    host = 'http://oauth-sandbox.sevengoslings.net'

    def setUp(self):
        """Create the consumer and the form body shared by the tests."""
        self.consumer = oauth.Consumer(key=self.consumer_key,
                                       secret=self.consumer_secret)

        self.body = {
            'foo': 'bar',
            'bar': 'foo',
            'multi': ['FOO', 'BAR'],
            'blah': 599999,
        }

    def _uri(self, type):
        """Return the absolute URL (host + path) for the named endpoint.

        Raises KeyError when `type` is not a key of `oauth_uris`.
        """
        uri = self.oauth_uris.get(type)
        if uri is None:
            raise KeyError("%s is not a valid OAuth URI type." % type)

        return "%s%s" % (self.host, uri)

    def create_simple_multipart_data(self, data):
        """Encode the mapping `data` as a multipart/form-data body.

        Returns a (content_type, body) tuple.  The boundary contains a
        random number, and the body lines are joined with CRLF.
        """
        boundary = '---Boundary-%d' % random.randint(1, 1000)
        crlf = '\r\n'
        items = []
        for key, value in data.iteritems():
            items += [
                '--' + boundary,
                'Content-Disposition: form-data; name="%s"' % str(key),
                '',
                str(value),
            ]
        items += ['', '--' + boundary + '--', '']
        content_type = 'multipart/form-data; boundary=%s' % boundary
        return content_type, crlf.join(items)

    def test_init(self):
        """Client() raises ValueError for a consumer or token of the wrong type."""
        class Blah():
            pass

        # Only the constructor call sits inside each try block, so a
        # ValueError can only come from Client.__init__ itself.
        try:
            oauth.Client(Blah())
        except ValueError:
            pass
        else:
            self.fail("Client.__init__() accepted invalid Consumer.")

        consumer = oauth.Consumer('token', 'secret')
        try:
            oauth.Client(consumer, Blah())
        except ValueError:
            pass
        else:
            self.fail("Client.__init__() accepted invalid Token.")

    def test_access_token_get(self):
        """Test getting an access token via GET."""
        client = oauth.Client(self.consumer, None)
        resp, content = client.request(self._uri('request_token'), "GET")

        self.assertEqual(int(resp['status']), 200)

    def test_access_token_post(self):
        """Test getting an access token via POST."""
        client = oauth.Client(self.consumer, None)
        resp, content = client.request(self._uri('request_token'), "POST")

        self.assertEqual(int(resp['status']), 200)

        res = dict(parse_qsl(content))
        self.assertTrue('oauth_token' in res)
        self.assertTrue('oauth_token_secret' in res)

    def _two_legged(self, method):
        """Send self.body, URL-encoded, to the two-legged endpoint with `method`."""
        client = oauth.Client(self.consumer, None)

        return client.request(self._uri('two_legged'), method,
                              body=urllib.urlencode(self.body))

    def test_two_legged_post(self):
        """A test of a two-legged OAuth POST request."""
        resp, content = self._two_legged("POST")

        self.assertEqual(int(resp['status']), 200)

    def test_two_legged_get(self):
        """A test of a two-legged OAuth GET request."""
        resp, content = self._two_legged("GET")
        self.assertEqual(int(resp['status']), 200)

    @mock.patch('httplib2.Http.request')
    def test_multipart_post_does_not_alter_body(self, mockHttpRequest):
        """A multipart POST body reaches httplib2.Http.request unchanged."""
        random_result = random.randint(1, 100)

        data = {
            'rand-%d' % random.randint(1, 100): random.randint(1, 100),
        }
        content_type, body = self.create_simple_multipart_data(data)

        client = oauth.Client(self.consumer, None)
        uri = self._uri('two_legged')

        def mockrequest(cl, ur, **kw):
            # Stand-in for httplib2.Http.request: check what the client
            # forwards, then hand back a recognisable value.
            self.assertTrue(cl is client)
            self.assertTrue(ur is uri)
            self.assertEqual(
                frozenset(kw.keys()),
                frozenset(['method', 'body', 'redirections',
                           'connection_type', 'headers']))
            self.assertEqual(kw['body'], body)
            self.assertEqual(kw['connection_type'], None)
            self.assertEqual(kw['method'], 'POST')
            self.assertEqual(kw['redirections'],
                             httplib2.DEFAULT_MAX_REDIRECTS)
            self.assertTrue(isinstance(kw['headers'], dict))

            return random_result

        mockHttpRequest.side_effect = mockrequest

        result = client.request(uri, 'POST',
                                headers={'Content-Type': content_type},
                                body=body)
        self.assertEqual(result, random_result)

    @mock.patch('httplib2.Http.request')
    def test_url_with_query_string(self, mockHttpRequest):
        """Query-string arguments of the URI survive in the signed GET URL."""
        uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
        client = oauth.Client(self.consumer, None)
        random_result = random.randint(1, 100)

        def mockrequest(cl, ur, **kw):
            # Stand-in for httplib2.Http.request: check the keyword
            # arguments, then compare the query string of the URL that was
            # requested with one built independently from the same URI.
            self.assertTrue(cl is client)
            self.assertEqual(
                frozenset(kw.keys()),
                frozenset(['method', 'body', 'redirections',
                           'connection_type', 'headers']))
            self.assertEqual(kw['body'], '')
            self.assertEqual(kw['connection_type'], None)
            self.assertEqual(kw['method'], 'GET')
            self.assertEqual(kw['redirections'],
                             httplib2.DEFAULT_MAX_REDIRECTS)
            self.assertTrue(isinstance(kw['headers'], dict))

            req = oauth.Request.from_consumer_and_token(
                self.consumer, None,
                http_method='GET', http_url=uri, parameters={})
            req.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
                             self.consumer, None)
            expected = parse_qsl(urlparse.urlparse(req.to_url()).query)
            actual = parse_qsl(urlparse.urlparse(ur).query)
            self.assertEqual(len(expected), len(actual))
            actual = dict(actual)
            # The signature, nonce and timestamp are excluded from the
            # value comparison.
            for key, value in expected:
                if key not in ('oauth_signature', 'oauth_nonce',
                               'oauth_timestamp'):
                    self.assertEqual(actual[key], value)

            return random_result

        mockHttpRequest.side_effect = mockrequest

        client.request(uri, 'GET')

    @mock.patch('httplib2.Http.request')
    @mock.patch('oauth2.Request.from_consumer_and_token')
    def test_multiple_values_for_a_key(self, mockReqConstructor,
                                       mockHttpRequest):
        """A body with a repeated key is passed on with every value intact."""
        client = oauth.Client(self.consumer, None)

        request = oauth.Request("GET", "http://example.com/fetch.php",
                                parameters={'multi': ['1', '2']})
        mockReqConstructor.return_value = request

        client.request('http://whatever', 'POST', body='multi=1&multi=2')

        self.assertEqual(mockReqConstructor.call_count, 1)
        self.assertEqual(mockReqConstructor.call_args[1]['parameters'],
                         {'multi': ['1', '2']})

        self.assertTrue('multi=1' in mockHttpRequest.call_args[1]['body'])
        self.assertTrue('multi=2' in mockHttpRequest.call_args[1]['body'])
1307 | |
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()