comparison Code/python_oauth2-master/tests/test_oauth.py @ 21:e68dbee1f6db

Modified code New datasets Updated report
author Paulo Chiliguano <p.e.chiilguano@se14.qmul.ac.uk>
date Tue, 11 Aug 2015 10:50:36 +0100
parents
children
comparison
equal deleted inserted replaced
20:1dbd24575d44 21:e68dbee1f6db
1 # -*- coding: utf-8 -*-
2
3 """
4 The MIT License
5
6 Copyright (c) 2009 Vic Fryzel
7
8 Permission is hereby granted, free of charge, to any person obtaining a copy
9 of this software and associated documentation files (the "Software"), to deal
10 in the Software without restriction, including without limitation the rights
11 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12 copies of the Software, and to permit persons to whom the Software is
13 furnished to do so, subject to the following conditions:
14
15 The above copyright notice and this permission notice shall be included in
16 all copies or substantial portions of the Software.
17
18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 THE SOFTWARE.
25 """
26 import sys
27 import os
28 import unittest
29 import oauth2 as oauth
30 import random
31 import time
32 import urllib
33 import urlparse
34 from types import ListType
35 import mock
36 import httplib2
37
38 # Fix for python2.5 compatibility
39 try:
40 from urlparse import parse_qs, parse_qsl
41 except ImportError:
42 from cgi import parse_qs, parse_qsl
43
44
45 sys.path[0:0] = [os.path.join(os.path.dirname(__file__), ".."),]
46
47
48 class TestError(unittest.TestCase):
49 def test_message(self):
50 try:
51 raise oauth.Error
52 except oauth.Error, e:
53 self.assertEqual(e.message, 'OAuth error occurred.')
54
55 msg = 'OMG THINGS BROKE!!!!'
56 try:
57 raise oauth.Error(msg)
58 except oauth.Error, e:
59 self.assertEqual(e.message, msg)
60
61 def test_str(self):
62 try:
63 raise oauth.Error
64 except oauth.Error, e:
65 self.assertEquals(str(e), 'OAuth error occurred.')
66
67 class TestGenerateFunctions(unittest.TestCase):
68 def test_build_auth_header(self):
69 header = oauth.build_authenticate_header()
70 self.assertEqual(header['WWW-Authenticate'], 'OAuth realm=""')
71 self.assertEqual(len(header), 1)
72 realm = 'http://example.myrealm.com/'
73 header = oauth.build_authenticate_header(realm)
74 self.assertEqual(header['WWW-Authenticate'], 'OAuth realm="%s"' %
75 realm)
76 self.assertEqual(len(header), 1)
77
78 def test_build_xoauth_string(self):
79 consumer = oauth.Consumer('consumer_token', 'consumer_secret')
80 token = oauth.Token('user_token', 'user_secret')
81 url = "https://mail.google.com/mail/b/joe@example.com/imap/"
82 xoauth_string = oauth.build_xoauth_string(url, consumer, token)
83
84 method, oauth_url, oauth_string = xoauth_string.split(' ')
85
86 self.assertEqual("GET", method)
87 self.assertEqual(url, oauth_url)
88
89 returned = {}
90 parts = oauth_string.split(',')
91 for part in parts:
92 var, val = part.split('=')
93 returned[var] = val.strip('"')
94
95 self.assertEquals('HMAC-SHA1', returned['oauth_signature_method'])
96 self.assertEquals('user_token', returned['oauth_token'])
97 self.assertEquals('consumer_token', returned['oauth_consumer_key'])
98 self.assertTrue('oauth_signature' in returned, 'oauth_signature')
99
100 def test_escape(self):
101 string = 'http://whatever.com/~someuser/?test=test&other=other'
102 self.assert_('~' in oauth.escape(string))
103 string = '../../../../../../../etc/passwd'
104 self.assert_('../' not in oauth.escape(string))
105
106 def test_gen_nonce(self):
107 nonce = oauth.generate_nonce()
108 self.assertEqual(len(nonce), 8)
109 nonce = oauth.generate_nonce(20)
110 self.assertEqual(len(nonce), 20)
111
112 def test_gen_verifier(self):
113 verifier = oauth.generate_verifier()
114 self.assertEqual(len(verifier), 8)
115 verifier = oauth.generate_verifier(16)
116 self.assertEqual(len(verifier), 16)
117
118 def test_gen_timestamp(self):
119 exp = int(time.time())
120 now = oauth.generate_timestamp()
121 self.assertEqual(exp, now)
122
123 class TestConsumer(unittest.TestCase):
124 def setUp(self):
125 self.key = 'my-key'
126 self.secret = 'my-secret'
127 self.consumer = oauth.Consumer(key=self.key, secret=self.secret)
128
129 def test_init(self):
130 self.assertEqual(self.consumer.key, self.key)
131 self.assertEqual(self.consumer.secret, self.secret)
132
133 def test_basic(self):
134 self.assertRaises(ValueError, lambda: oauth.Consumer(None, None))
135 self.assertRaises(ValueError, lambda: oauth.Consumer('asf', None))
136 self.assertRaises(ValueError, lambda: oauth.Consumer(None, 'dasf'))
137
138 def test_str(self):
139 res = dict(parse_qsl(str(self.consumer)))
140 self.assertTrue('oauth_consumer_key' in res)
141 self.assertTrue('oauth_consumer_secret' in res)
142 self.assertEquals(res['oauth_consumer_key'], self.consumer.key)
143 self.assertEquals(res['oauth_consumer_secret'], self.consumer.secret)
144
145 class TestToken(unittest.TestCase):
146 def setUp(self):
147 self.key = 'my-key'
148 self.secret = 'my-secret'
149 self.token = oauth.Token(self.key, self.secret)
150
151 def test_basic(self):
152 self.assertRaises(ValueError, lambda: oauth.Token(None, None))
153 self.assertRaises(ValueError, lambda: oauth.Token('asf', None))
154 self.assertRaises(ValueError, lambda: oauth.Token(None, 'dasf'))
155
156 def test_init(self):
157 self.assertEqual(self.token.key, self.key)
158 self.assertEqual(self.token.secret, self.secret)
159 self.assertEqual(self.token.callback, None)
160 self.assertEqual(self.token.callback_confirmed, None)
161 self.assertEqual(self.token.verifier, None)
162
163 def test_set_callback(self):
164 self.assertEqual(self.token.callback, None)
165 self.assertEqual(self.token.callback_confirmed, None)
166 cb = 'http://www.example.com/my-callback'
167 self.token.set_callback(cb)
168 self.assertEqual(self.token.callback, cb)
169 self.assertEqual(self.token.callback_confirmed, 'true')
170 self.token.set_callback(None)
171 self.assertEqual(self.token.callback, None)
172 # TODO: The following test should probably not pass, but it does
173 # To fix this, check for None and unset 'true' in set_callback
174 # Additionally, should a confirmation truly be done of the callback?
175 self.assertEqual(self.token.callback_confirmed, 'true')
176
177 def test_set_verifier(self):
178 self.assertEqual(self.token.verifier, None)
179 v = oauth.generate_verifier()
180 self.token.set_verifier(v)
181 self.assertEqual(self.token.verifier, v)
182 self.token.set_verifier()
183 self.assertNotEqual(self.token.verifier, v)
184 self.token.set_verifier('')
185 self.assertEqual(self.token.verifier, '')
186
187 def test_get_callback_url(self):
188 self.assertEqual(self.token.get_callback_url(), None)
189
190 self.token.set_verifier()
191 self.assertEqual(self.token.get_callback_url(), None)
192
193 cb = 'http://www.example.com/my-callback?save=1&return=true'
194 v = oauth.generate_verifier()
195 self.token.set_callback(cb)
196 self.token.set_verifier(v)
197 url = self.token.get_callback_url()
198 verifier_str = '&oauth_verifier=%s' % v
199 self.assertEqual(url, '%s%s' % (cb, verifier_str))
200
201 cb = 'http://www.example.com/my-callback-no-query'
202 v = oauth.generate_verifier()
203 self.token.set_callback(cb)
204 self.token.set_verifier(v)
205 url = self.token.get_callback_url()
206 verifier_str = '?oauth_verifier=%s' % v
207 self.assertEqual(url, '%s%s' % (cb, verifier_str))
208
209 def test_to_string(self):
210 string = 'oauth_token_secret=%s&oauth_token=%s' % (self.secret,
211 self.key)
212 self.assertEqual(self.token.to_string(), string)
213
214 self.token.set_callback('http://www.example.com/my-callback')
215 string += '&oauth_callback_confirmed=true'
216 self.assertEqual(self.token.to_string(), string)
217
218 def _compare_tokens(self, new):
219 self.assertEqual(self.token.key, new.key)
220 self.assertEqual(self.token.secret, new.secret)
221 # TODO: What about copying the callback to the new token?
222 # self.assertEqual(self.token.callback, new.callback)
223 self.assertEqual(self.token.callback_confirmed,
224 new.callback_confirmed)
225 # TODO: What about copying the verifier to the new token?
226 # self.assertEqual(self.token.verifier, new.verifier)
227
228 def test_to_string(self):
229 tok = oauth.Token('tooken', 'seecret')
230 self.assertEqual(str(tok), 'oauth_token_secret=seecret&oauth_token=tooken')
231
232 def test_from_string(self):
233 self.assertRaises(ValueError, lambda: oauth.Token.from_string(''))
234 self.assertRaises(ValueError, lambda: oauth.Token.from_string('blahblahblah'))
235 self.assertRaises(ValueError, lambda: oauth.Token.from_string('blah=blah'))
236
237 self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token_secret=asfdasf'))
238 self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token_secret='))
239 self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token=asfdasf'))
240 self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token='))
241 self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token=&oauth_token_secret='))
242 self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token=tooken%26oauth_token_secret=seecret'))
243
244 string = self.token.to_string()
245 new = oauth.Token.from_string(string)
246 self._compare_tokens(new)
247
248 self.token.set_callback('http://www.example.com/my-callback')
249 string = self.token.to_string()
250 new = oauth.Token.from_string(string)
251 self._compare_tokens(new)
252
253 class ReallyEqualMixin:
254 def failUnlessReallyEqual(self, a, b, msg=None):
255 self.failUnlessEqual(a, b, msg=msg)
256 self.failUnlessEqual(type(a), type(b), msg="a :: %r, b :: %r, %r" % (a, b, msg))
257
258 class TestFuncs(unittest.TestCase):
259 def test_to_unicode(self):
260 self.failUnlessRaises(TypeError, oauth.to_unicode, '\xae')
261 self.failUnlessRaises(TypeError, oauth.to_unicode_optional_iterator, '\xae')
262 self.failUnlessRaises(TypeError, oauth.to_unicode_optional_iterator, ['\xae'])
263
264 self.failUnlessEqual(oauth.to_unicode(':-)'), u':-)')
265 self.failUnlessEqual(oauth.to_unicode(u'\u00ae'), u'\u00ae')
266 self.failUnlessEqual(oauth.to_unicode('\xc2\xae'), u'\u00ae')
267 self.failUnlessEqual(oauth.to_unicode_optional_iterator([':-)']), [u':-)'])
268 self.failUnlessEqual(oauth.to_unicode_optional_iterator([u'\u00ae']), [u'\u00ae'])
269
270 class TestRequest(unittest.TestCase, ReallyEqualMixin):
271 def test_setter(self):
272 url = "http://example.com"
273 method = "GET"
274 req = oauth.Request(method)
275 self.assertTrue(not hasattr(req, 'url') or req.url is None)
276 self.assertTrue(not hasattr(req, 'normalized_url') or req.normalized_url is None)
277
278 def test_deleter(self):
279 url = "http://example.com"
280 method = "GET"
281 req = oauth.Request(method, url)
282
283 try:
284 del req.url
285 url = req.url
286 self.fail("AttributeError should have been raised on empty url.")
287 except AttributeError:
288 pass
289 except Exception, e:
290 self.fail(str(e))
291
292 def test_url(self):
293 url1 = "http://example.com:80/foo.php"
294 url2 = "https://example.com:443/foo.php"
295 exp1 = "http://example.com/foo.php"
296 exp2 = "https://example.com/foo.php"
297 method = "GET"
298
299 req = oauth.Request(method, url1)
300 self.assertEquals(req.normalized_url, exp1)
301 self.assertEquals(req.url, url1)
302
303 req = oauth.Request(method, url2)
304 self.assertEquals(req.normalized_url, exp2)
305 self.assertEquals(req.url, url2)
306
307 def test_bad_url(self):
308 request = oauth.Request()
309 try:
310 request.url = "ftp://example.com"
311 self.fail("Invalid URL scheme was accepted.")
312 except ValueError:
313 pass
314
315 def test_unset_consumer_and_token(self):
316 consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
317 token = oauth.Token('my_key', 'my_secret')
318 request = oauth.Request("GET", "http://example.com/fetch.php")
319 request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer,
320 token)
321
322 self.assertEquals(consumer.key, request['oauth_consumer_key'])
323 self.assertEquals(token.key, request['oauth_token'])
324
325 def test_no_url_set(self):
326 consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
327 token = oauth.Token('my_key', 'my_secret')
328 request = oauth.Request()
329
330 try:
331 try:
332 request.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
333 consumer, token)
334 except TypeError:
335 self.fail("Signature method didn't check for a normalized URL.")
336 except ValueError:
337 pass
338
339 def test_url_query(self):
340 url = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10"
341 normalized_url = urlparse.urlunparse(urlparse.urlparse(url)[:3] + (None, None, None))
342 method = "GET"
343
344 req = oauth.Request(method, url)
345 self.assertEquals(req.url, url)
346 self.assertEquals(req.normalized_url, normalized_url)
347
348 def test_get_parameter(self):
349 url = "http://example.com"
350 method = "GET"
351 params = {'oauth_consumer' : 'asdf'}
352 req = oauth.Request(method, url, parameters=params)
353
354 self.assertEquals(req.get_parameter('oauth_consumer'), 'asdf')
355 self.assertRaises(oauth.Error, req.get_parameter, 'blah')
356
357 def test_get_nonoauth_parameters(self):
358
359 oauth_params = {
360 'oauth_consumer': 'asdfasdfasdf'
361 }
362
363 other_params = {
364 u'foo': u'baz',
365 u'bar': u'foo',
366 u'multi': [u'FOO',u'BAR'],
367 u'uni_utf8': u'\xae',
368 u'uni_unicode': u'\u00ae',
369 u'uni_unicode_2': u'åÅøØ',
370 }
371
372 params = oauth_params
373 params.update(other_params)
374
375 req = oauth.Request("GET", "http://example.com", params)
376 self.assertEquals(other_params, req.get_nonoauth_parameters())
377
378 def test_to_header(self):
379 realm = "http://sp.example.com/"
380
381 params = {
382 'oauth_version': "1.0",
383 'oauth_nonce': "4572616e48616d6d65724c61686176",
384 'oauth_timestamp': "137131200",
385 'oauth_consumer_key': "0685bd9184jfhq22",
386 'oauth_signature_method': "HMAC-SHA1",
387 'oauth_token': "ad180jjd733klru7",
388 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
389 }
390
391 req = oauth.Request("GET", realm, params)
392 header, value = req.to_header(realm).items()[0]
393
394 parts = value.split('OAuth ')
395 vars = parts[1].split(', ')
396 self.assertTrue(len(vars), (len(params) + 1))
397
398 res = {}
399 for v in vars:
400 var, val = v.split('=')
401 res[var] = urllib.unquote(val.strip('"'))
402
403 self.assertEquals(realm, res['realm'])
404 del res['realm']
405
406 self.assertTrue(len(res), len(params))
407
408 for key, val in res.items():
409 self.assertEquals(val, params.get(key))
410
411 def test_to_postdata_nonascii(self):
412 realm = "http://sp.example.com/"
413
414 params = {
415 'nonasciithing': u'q\xbfu\xe9 ,aasp u?..a.s',
416 'oauth_version': "1.0",
417 'oauth_nonce': "4572616e48616d6d65724c61686176",
418 'oauth_timestamp': "137131200",
419 'oauth_consumer_key': "0685bd9184jfhq22",
420 'oauth_signature_method': "HMAC-SHA1",
421 'oauth_token': "ad180jjd733klru7",
422 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
423 }
424
425 req = oauth.Request("GET", realm, params)
426
427 self.failUnlessReallyEqual(req.to_postdata(), 'nonasciithing=q%C2%BFu%C3%A9%20%2Caasp%20u%3F..a.s&oauth_nonce=4572616e48616d6d65724c61686176&oauth_timestamp=137131200&oauth_consumer_key=0685bd9184jfhq22&oauth_signature_method=HMAC-SHA1&oauth_version=1.0&oauth_token=ad180jjd733klru7&oauth_signature=wOJIO9A2W5mFwDgiDvZbTSMK%252FPY%253D')
428
429 def test_to_postdata(self):
430 realm = "http://sp.example.com/"
431
432 params = {
433 'multi': ['FOO','BAR'],
434 'oauth_version': "1.0",
435 'oauth_nonce': "4572616e48616d6d65724c61686176",
436 'oauth_timestamp': "137131200",
437 'oauth_consumer_key': "0685bd9184jfhq22",
438 'oauth_signature_method': "HMAC-SHA1",
439 'oauth_token': "ad180jjd733klru7",
440 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
441 }
442
443 req = oauth.Request("GET", realm, params)
444
445 flat = [('multi','FOO'),('multi','BAR')]
446 del params['multi']
447 flat.extend(params.items())
448 kf = lambda x: x[0]
449 self.assertEquals(sorted(flat, key=kf), sorted(parse_qsl(req.to_postdata()), key=kf))
450
451 def test_to_url(self):
452 url = "http://sp.example.com/"
453
454 params = {
455 'oauth_version': "1.0",
456 'oauth_nonce': "4572616e48616d6d65724c61686176",
457 'oauth_timestamp': "137131200",
458 'oauth_consumer_key': "0685bd9184jfhq22",
459 'oauth_signature_method': "HMAC-SHA1",
460 'oauth_token': "ad180jjd733klru7",
461 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
462 }
463
464 req = oauth.Request("GET", url, params)
465 exp = urlparse.urlparse("%s?%s" % (url, urllib.urlencode(params)))
466 res = urlparse.urlparse(req.to_url())
467 self.assertEquals(exp.scheme, res.scheme)
468 self.assertEquals(exp.netloc, res.netloc)
469 self.assertEquals(exp.path, res.path)
470
471 a = parse_qs(exp.query)
472 b = parse_qs(res.query)
473 self.assertEquals(a, b)
474
475 def test_to_url_with_query(self):
476 url = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10"
477
478 params = {
479 'oauth_version': "1.0",
480 'oauth_nonce': "4572616e48616d6d65724c61686176",
481 'oauth_timestamp': "137131200",
482 'oauth_consumer_key': "0685bd9184jfhq22",
483 'oauth_signature_method': "HMAC-SHA1",
484 'oauth_token': "ad180jjd733klru7",
485 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
486 }
487
488 req = oauth.Request("GET", url, params)
489 # Note: the url above already has query parameters, so append new ones with &
490 exp = urlparse.urlparse("%s&%s" % (url, urllib.urlencode(params)))
491 res = urlparse.urlparse(req.to_url())
492 self.assertEquals(exp.scheme, res.scheme)
493 self.assertEquals(exp.netloc, res.netloc)
494 self.assertEquals(exp.path, res.path)
495
496 a = parse_qs(exp.query)
497 b = parse_qs(res.query)
498 self.assertTrue('alt' in b)
499 self.assertTrue('max-contacts' in b)
500 self.assertEquals(b['alt'], ['json'])
501 self.assertEquals(b['max-contacts'], ['10'])
502 self.assertEquals(a, b)
503
504 def test_signature_base_string_nonascii_nonutf8(self):
505 consumer = oauth.Consumer('consumer_token', 'consumer_secret')
506
507 url = u'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc\u2766,+CA'
508 req = oauth.Request("GET", url)
509 self.failUnlessReallyEqual(req.normalized_url, u'http://api.simplegeo.com/1.0/places/address.json')
510 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
511 self.failUnlessReallyEqual(req['oauth_signature'], 'WhufgeZKyYpKsI70GZaiDaYwl6g=')
512
513 url = 'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc\xe2\x9d\xa6,+CA'
514 req = oauth.Request("GET", url)
515 self.failUnlessReallyEqual(req.normalized_url, u'http://api.simplegeo.com/1.0/places/address.json')
516 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
517 self.failUnlessReallyEqual(req['oauth_signature'], 'WhufgeZKyYpKsI70GZaiDaYwl6g=')
518
519 url = 'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA'
520 req = oauth.Request("GET", url)
521 self.failUnlessReallyEqual(req.normalized_url, u'http://api.simplegeo.com/1.0/places/address.json')
522 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
523 self.failUnlessReallyEqual(req['oauth_signature'], 'WhufgeZKyYpKsI70GZaiDaYwl6g=')
524
525 url = u'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA'
526 req = oauth.Request("GET", url)
527 self.failUnlessReallyEqual(req.normalized_url, u'http://api.simplegeo.com/1.0/places/address.json')
528 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
529 self.failUnlessReallyEqual(req['oauth_signature'], 'WhufgeZKyYpKsI70GZaiDaYwl6g=')
530
531 def test_signature_base_string_with_query(self):
532 url = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10"
533 params = {
534 'oauth_version': "1.0",
535 'oauth_nonce': "4572616e48616d6d65724c61686176",
536 'oauth_timestamp': "137131200",
537 'oauth_consumer_key': "0685bd9184jfhq22",
538 'oauth_signature_method': "HMAC-SHA1",
539 'oauth_token': "ad180jjd733klru7",
540 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
541 }
542 req = oauth.Request("GET", url, params)
543 self.assertEquals(req.normalized_url, 'https://www.google.com/m8/feeds/contacts/default/full/')
544 self.assertEquals(req.url, 'https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10')
545 normalized_params = parse_qsl(req.get_normalized_parameters())
546 self.assertTrue(len(normalized_params), len(params) + 2)
547 normalized_params = dict(normalized_params)
548 for key, value in params.iteritems():
549 if key == 'oauth_signature':
550 continue
551 self.assertEquals(value, normalized_params[key])
552 self.assertEquals(normalized_params['alt'], 'json')
553 self.assertEquals(normalized_params['max-contacts'], '10')
554
555 def test_get_normalized_parameters_empty(self):
556 url = "http://sp.example.com/?empty="
557
558 req = oauth.Request("GET", url)
559
560 res = req.get_normalized_parameters()
561
562 expected='empty='
563
564 self.assertEquals(expected, res)
565
566 def test_get_normalized_parameters_duplicate(self):
567 url = "http://example.com/v2/search/videos?oauth_nonce=79815175&oauth_timestamp=1295397962&oauth_consumer_key=mykey&oauth_signature_method=HMAC-SHA1&q=car&oauth_version=1.0&offset=10&oauth_signature=spWLI%2FGQjid7sQVd5%2FarahRxzJg%3D"
568
569 req = oauth.Request("GET", url)
570
571 res = req.get_normalized_parameters()
572
573 expected='oauth_consumer_key=mykey&oauth_nonce=79815175&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1295397962&oauth_version=1.0&offset=10&q=car'
574
575 self.assertEquals(expected, res)
576
577 def test_get_normalized_parameters_from_url(self):
578 # example copied from
579 # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
580 # which in turns says that it was copied from
581 # http://oauth.net/core/1.0/#sig_base_example .
582 url = "http://photos.example.net/photos?file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original"
583
584 req = oauth.Request("GET", url)
585
586 res = req.get_normalized_parameters()
587
588 expected = 'file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original'
589
590 self.assertEquals(expected, res)
591
592 def test_signing_base(self):
593 # example copied from
594 # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
595 # which in turns says that it was copied from
596 # http://oauth.net/core/1.0/#sig_base_example .
597 url = "http://photos.example.net/photos?file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original"
598
599 req = oauth.Request("GET", url)
600
601 sm = oauth.SignatureMethod_HMAC_SHA1()
602
603 consumer = oauth.Consumer('dpf43f3p2l4k3l03', 'foo')
604 key, raw = sm.signing_base(req, consumer, None)
605
606 expected = 'GET&http%3A%2F%2Fphotos.example.net%2Fphotos&file%3Dvacation.jpg%26oauth_consumer_key%3Ddpf43f3p2l4k3l03%26oauth_nonce%3Dkllo9940pd9333jh%26oauth_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D1191242096%26oauth_token%3Dnnch734d00sl2jdk%26oauth_version%3D1.0%26size%3Doriginal'
607 self.assertEquals(expected, raw)
608
609 def test_get_normalized_parameters(self):
610 url = "http://sp.example.com/"
611
612 params = {
613 'oauth_version': "1.0",
614 'oauth_nonce': "4572616e48616d6d65724c61686176",
615 'oauth_timestamp': "137131200",
616 'oauth_consumer_key': "0685bd9184jfhq22",
617 'oauth_signature_method': "HMAC-SHA1",
618 'oauth_token': "ad180jjd733klru7",
619 'multi': ['FOO','BAR', u'\u00ae', '\xc2\xae'],
620 'multi_same': ['FOO','FOO'],
621 'uni_utf8_bytes': '\xc2\xae',
622 'uni_unicode_object': u'\u00ae'
623 }
624
625 req = oauth.Request("GET", url, params)
626
627 res = req.get_normalized_parameters()
628
629 expected='multi=BAR&multi=FOO&multi=%C2%AE&multi=%C2%AE&multi_same=FOO&multi_same=FOO&oauth_consumer_key=0685bd9184jfhq22&oauth_nonce=4572616e48616d6d65724c61686176&oauth_signature_method=HMAC-SHA1&oauth_timestamp=137131200&oauth_token=ad180jjd733klru7&oauth_version=1.0&uni_unicode_object=%C2%AE&uni_utf8_bytes=%C2%AE'
630
631 self.assertEquals(expected, res)
632
633 def test_get_normalized_parameters_ignores_auth_signature(self):
634 url = "http://sp.example.com/"
635
636 params = {
637 'oauth_version': "1.0",
638 'oauth_nonce': "4572616e48616d6d65724c61686176",
639 'oauth_timestamp': "137131200",
640 'oauth_consumer_key': "0685bd9184jfhq22",
641 'oauth_signature_method': "HMAC-SHA1",
642 'oauth_signature': "some-random-signature-%d" % random.randint(1000, 2000),
643 'oauth_token': "ad180jjd733klru7",
644 }
645
646 req = oauth.Request("GET", url, params)
647
648 res = req.get_normalized_parameters()
649
650 self.assertNotEquals(urllib.urlencode(sorted(params.items())), res)
651
652 foo = params.copy()
653 del foo["oauth_signature"]
654 self.assertEqual(urllib.urlencode(sorted(foo.items())), res)
655
656 def test_set_signature_method(self):
657 consumer = oauth.Consumer('key', 'secret')
658 client = oauth.Client(consumer)
659
660 class Blah:
661 pass
662
663 try:
664 client.set_signature_method(Blah())
665 self.fail("Client.set_signature_method() accepted invalid method.")
666 except ValueError:
667 pass
668
669 m = oauth.SignatureMethod_HMAC_SHA1()
670 client.set_signature_method(m)
671 self.assertEquals(m, client.method)
672
673 def test_get_normalized_string_escapes_spaces_properly(self):
674 url = "http://sp.example.com/"
675 params = {
676 "some_random_data": random.randint(100, 1000),
677 "data": "This data with a random number (%d) has spaces!" % random.randint(1000, 2000),
678 }
679
680 req = oauth.Request("GET", url, params)
681 res = req.get_normalized_parameters()
682 expected = urllib.urlencode(sorted(params.items())).replace('+', '%20')
683 self.assertEqual(expected, res)
684
685 @mock.patch('oauth2.Request.make_timestamp')
686 @mock.patch('oauth2.Request.make_nonce')
687 def test_request_nonutf8_bytes(self, mock_make_nonce, mock_make_timestamp):
688 mock_make_nonce.return_value = 5
689 mock_make_timestamp.return_value = 6
690
691 tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
692 con = oauth.Consumer(key="con-test-key", secret="con-test-secret")
693 params = {
694 'oauth_version': "1.0",
695 'oauth_nonce': "4572616e48616d6d65724c61686176",
696 'oauth_timestamp': "137131200",
697 'oauth_token': tok.key,
698 'oauth_consumer_key': con.key
699 }
700
701 # If someone passes a sequence of bytes which is not ascii for
702 # url, we'll raise an exception as early as possible.
703 url = "http://sp.example.com/\x92" # It's actually cp1252-encoding...
704 self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params)
705
706 # And if they pass an unicode, then we'll use it.
707 url = u'http://sp.example.com/\u2019'
708 req = oauth.Request(method="GET", url=url, parameters=params)
709 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
710 self.failUnlessReallyEqual(req['oauth_signature'], 'cMzvCkhvLL57+sTIxLITTHfkqZk=')
711
712 # And if it is a utf-8-encoded-then-percent-encoded non-ascii
713 # thing, we'll decode it and use it.
714 url = "http://sp.example.com/%E2%80%99"
715 req = oauth.Request(method="GET", url=url, parameters=params)
716 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
717 self.failUnlessReallyEqual(req['oauth_signature'], 'yMLKOyNKC/DkyhUOb8DLSvceEWE=')
718
719 # Same thing with the params.
720 url = "http://sp.example.com/"
721
722 # If someone passes a sequence of bytes which is not ascii in
723 # params, we'll raise an exception as early as possible.
724 params['non_oauth_thing'] = '\xae', # It's actually cp1252-encoding...
725 self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params)
726
727 # And if they pass a unicode, then we'll use it.
728 params['non_oauth_thing'] = u'\u2019'
729 req = oauth.Request(method="GET", url=url, parameters=params)
730 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
731 self.failUnlessReallyEqual(req['oauth_signature'], '0GU50m0v60CVDB5JnoBXnvvvKx4=')
732
733 # And if it is a utf-8-encoded non-ascii thing, we'll decode
734 # it and use it.
735 params['non_oauth_thing'] = '\xc2\xae'
736 req = oauth.Request(method="GET", url=url, parameters=params)
737 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
738 self.failUnlessReallyEqual(req['oauth_signature'], 'pqOCu4qvRTiGiXB8Z61Jsey0pMM=')
739
740
741 # Also if there are non-utf8 bytes in the query args.
742 url = "http://sp.example.com/?q=\x92" # cp1252
743 self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params)
744
745 def test_request_hash_of_body(self):
746 tok = oauth.Token(key="token", secret="tok-test-secret")
747 con = oauth.Consumer(key="consumer", secret="con-test-secret")
748
749 # Example 1a from Appendix A.1 of
750 # http://oauth.googlecode.com/svn/spec/ext/body_hash/1.0/oauth-bodyhash.html
751 # Except that we get a differetn result than they do.
752
753 params = {
754 'oauth_version': "1.0",
755 'oauth_token': tok.key,
756 'oauth_nonce': 10288510250934,
757 'oauth_timestamp': 1236874155,
758 'oauth_consumer_key': con.key
759 }
760
761 url = u"http://www.example.com/resource"
762 req = oauth.Request(method="PUT", url=url, parameters=params, body="Hello World!", is_form_encoded=False)
763 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
764 self.failUnlessReallyEqual(req['oauth_body_hash'], 'Lve95gjOVATpfV8EL5X4nxwjKHE=')
765 self.failUnlessReallyEqual(req['oauth_signature'], 't+MX8l/0S8hdbVQL99nD0X1fPnM=')
766 # oauth-bodyhash.html A.1 has
767 # '08bUFF%2Fjmp59mWB7cSgCYBUpJ0U%3D', but I don't see how that
768 # is possible.
769
770 # Example 1b
771 params = {
772 'oauth_version': "1.0",
773 'oauth_token': tok.key,
774 'oauth_nonce': 10369470270925,
775 'oauth_timestamp': 1236874236,
776 'oauth_consumer_key': con.key
777 }
778
779 req = oauth.Request(method="PUT", url=url, parameters=params, body="Hello World!", is_form_encoded=False)
780 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
781 self.failUnlessReallyEqual(req['oauth_body_hash'], 'Lve95gjOVATpfV8EL5X4nxwjKHE=')
782 self.failUnlessReallyEqual(req['oauth_signature'], 'CTFmrqJIGT7NsWJ42OrujahTtTc=')
783
784 # Appendix A.2
785 params = {
786 'oauth_version': "1.0",
787 'oauth_token': tok.key,
788 'oauth_nonce': 8628868109991,
789 'oauth_timestamp': 1238395022,
790 'oauth_consumer_key': con.key
791 }
792
793 req = oauth.Request(method="GET", url=url, parameters=params, is_form_encoded=False)
794 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
795 self.failUnlessReallyEqual(req['oauth_body_hash'], '2jmj7l5rSw0yVb/vlWAYkK/YBwk=')
796 self.failUnlessReallyEqual(req['oauth_signature'], 'Zhl++aWSP0O3/hYQ0CuBc7jv38I=')
797
798
799 def test_sign_request(self):
800 url = "http://sp.example.com/"
801
802 params = {
803 'oauth_version': "1.0",
804 'oauth_nonce': "4572616e48616d6d65724c61686176",
805 'oauth_timestamp': "137131200"
806 }
807
808 tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
809 con = oauth.Consumer(key="con-test-key", secret="con-test-secret")
810
811 params['oauth_token'] = tok.key
812 params['oauth_consumer_key'] = con.key
813 req = oauth.Request(method="GET", url=url, parameters=params)
814
815 methods = {
816 'DX01TdHws7OninCLK9VztNTH1M4=': oauth.SignatureMethod_HMAC_SHA1(),
817 'con-test-secret&tok-test-secret': oauth.SignatureMethod_PLAINTEXT()
818 }
819
820 for exp, method in methods.items():
821 req.sign_request(method, con, tok)
822 self.assertEquals(req['oauth_signature_method'], method.name)
823 self.assertEquals(req['oauth_signature'], exp)
824
825 # Also if there are non-ascii chars in the URL.
826 url = "http://sp.example.com/\xe2\x80\x99" # utf-8 bytes
827 req = oauth.Request(method="GET", url=url, parameters=params)
828 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, tok)
829 self.assertEquals(req['oauth_signature'], 'loFvp5xC7YbOgd9exIO6TxB7H4s=')
830
831 url = u'http://sp.example.com/\u2019' # Python unicode object
832 req = oauth.Request(method="GET", url=url, parameters=params)
833 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, tok)
834 self.assertEquals(req['oauth_signature'], 'loFvp5xC7YbOgd9exIO6TxB7H4s=')
835
836 # Also if there are non-ascii chars in the query args.
837 url = "http://sp.example.com/?q=\xe2\x80\x99" # utf-8 bytes
838 req = oauth.Request(method="GET", url=url, parameters=params)
839 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, tok)
840 self.assertEquals(req['oauth_signature'], 'IBw5mfvoCsDjgpcsVKbyvsDqQaU=')
841
842 url = u'http://sp.example.com/?q=\u2019' # Python unicode object
843 req = oauth.Request(method="GET", url=url, parameters=params)
844 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, tok)
845 self.assertEquals(req['oauth_signature'], 'IBw5mfvoCsDjgpcsVKbyvsDqQaU=')
846
847 def test_from_request(self):
848 url = "http://sp.example.com/"
849
850 params = {
851 'oauth_version': "1.0",
852 'oauth_nonce': "4572616e48616d6d65724c61686176",
853 'oauth_timestamp': "137131200",
854 'oauth_consumer_key': "0685bd9184jfhq22",
855 'oauth_signature_method': "HMAC-SHA1",
856 'oauth_token': "ad180jjd733klru7",
857 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
858 }
859
860 req = oauth.Request("GET", url, params)
861 headers = req.to_header()
862
863 # Test from the headers
864 req = oauth.Request.from_request("GET", url, headers)
865 self.assertEquals(req.method, "GET")
866 self.assertEquals(req.url, url)
867
868 self.assertEquals(params, req.copy())
869
870 # Test with bad OAuth headers
871 bad_headers = {
872 'Authorization' : 'OAuth this is a bad header'
873 }
874
875 self.assertRaises(oauth.Error, oauth.Request.from_request, "GET",
876 url, bad_headers)
877
878 # Test getting from query string
879 qs = urllib.urlencode(params)
880 req = oauth.Request.from_request("GET", url, query_string=qs)
881
882 exp = parse_qs(qs, keep_blank_values=False)
883 for k, v in exp.iteritems():
884 exp[k] = urllib.unquote(v[0])
885
886 self.assertEquals(exp, req.copy())
887
888 # Test that a boned from_request() call returns None
889 req = oauth.Request.from_request("GET", url)
890 self.assertEquals(None, req)
891
892 def test_from_token_and_callback(self):
893 url = "http://sp.example.com/"
894
895 params = {
896 'oauth_version': "1.0",
897 'oauth_nonce': "4572616e48616d6d65724c61686176",
898 'oauth_timestamp': "137131200",
899 'oauth_consumer_key': "0685bd9184jfhq22",
900 'oauth_signature_method': "HMAC-SHA1",
901 'oauth_token': "ad180jjd733klru7",
902 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
903 }
904
905 tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
906 req = oauth.Request.from_token_and_callback(tok)
907 self.assertFalse('oauth_callback' in req)
908 self.assertEquals(req['oauth_token'], tok.key)
909
910 req = oauth.Request.from_token_and_callback(tok, callback=url)
911 self.assertTrue('oauth_callback' in req)
912 self.assertEquals(req['oauth_callback'], url)
913
914 def test_from_consumer_and_token(self):
915 url = "http://sp.example.com/"
916
917 tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
918 tok.set_verifier('this_is_a_test_verifier')
919 con = oauth.Consumer(key="con-test-key", secret="con-test-secret")
920 req = oauth.Request.from_consumer_and_token(con, token=tok,
921 http_method="GET", http_url=url)
922
923 self.assertEquals(req['oauth_token'], tok.key)
924 self.assertEquals(req['oauth_consumer_key'], con.key)
925 self.assertEquals(tok.verifier, req['oauth_verifier'])
926
927 class SignatureMethod_Bad(oauth.SignatureMethod):
928 name = "BAD"
929
930 def signing_base(self, request, consumer, token):
931 return ""
932
933 def sign(self, request, consumer, token):
934 return "invalid-signature"
935
936
937 class TestServer(unittest.TestCase):
938 def setUp(self):
939 url = "http://sp.example.com/"
940
941 params = {
942 'oauth_version': "1.0",
943 'oauth_nonce': "4572616e48616d6d65724c61686176",
944 'oauth_timestamp': int(time.time()),
945 'bar': 'blerg',
946 'multi': ['FOO','BAR'],
947 'foo': 59
948 }
949
950 self.consumer = oauth.Consumer(key="consumer-key",
951 secret="consumer-secret")
952 self.token = oauth.Token(key="token-key", secret="token-secret")
953
954 params['oauth_token'] = self.token.key
955 params['oauth_consumer_key'] = self.consumer.key
956 self.request = oauth.Request(method="GET", url=url, parameters=params)
957
958 signature_method = oauth.SignatureMethod_HMAC_SHA1()
959 self.request.sign_request(signature_method, self.consumer, self.token)
960
961 def test_init(self):
962 server = oauth.Server(signature_methods={'HMAC-SHA1' : oauth.SignatureMethod_HMAC_SHA1()})
963 self.assertTrue('HMAC-SHA1' in server.signature_methods)
964 self.assertTrue(isinstance(server.signature_methods['HMAC-SHA1'],
965 oauth.SignatureMethod_HMAC_SHA1))
966
967 server = oauth.Server()
968 self.assertEquals(server.signature_methods, {})
969
970 def test_add_signature_method(self):
971 server = oauth.Server()
972 res = server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
973 self.assertTrue(len(res) == 1)
974 self.assertTrue('HMAC-SHA1' in res)
975 self.assertTrue(isinstance(res['HMAC-SHA1'],
976 oauth.SignatureMethod_HMAC_SHA1))
977
978 res = server.add_signature_method(oauth.SignatureMethod_PLAINTEXT())
979 self.assertTrue(len(res) == 2)
980 self.assertTrue('PLAINTEXT' in res)
981 self.assertTrue(isinstance(res['PLAINTEXT'],
982 oauth.SignatureMethod_PLAINTEXT))
983
984 def test_verify_request(self):
985 server = oauth.Server()
986 server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
987
988 parameters = server.verify_request(self.request, self.consumer,
989 self.token)
990
991 self.assertTrue('bar' in parameters)
992 self.assertTrue('foo' in parameters)
993 self.assertTrue('multi' in parameters)
994 self.assertEquals(parameters['bar'], 'blerg')
995 self.assertEquals(parameters['foo'], 59)
996 self.assertEquals(parameters['multi'], ['FOO','BAR'])
997
998 def test_build_authenticate_header(self):
999 server = oauth.Server()
1000 headers = server.build_authenticate_header('example.com')
1001 self.assertTrue('WWW-Authenticate' in headers)
1002 self.assertEquals('OAuth realm="example.com"',
1003 headers['WWW-Authenticate'])
1004
1005 def test_no_version(self):
1006 url = "http://sp.example.com/"
1007
1008 params = {
1009 'oauth_nonce': "4572616e48616d6d65724c61686176",
1010 'oauth_timestamp': int(time.time()),
1011 'bar': 'blerg',
1012 'multi': ['FOO','BAR'],
1013 'foo': 59
1014 }
1015
1016 self.consumer = oauth.Consumer(key="consumer-key",
1017 secret="consumer-secret")
1018 self.token = oauth.Token(key="token-key", secret="token-secret")
1019
1020 params['oauth_token'] = self.token.key
1021 params['oauth_consumer_key'] = self.consumer.key
1022 self.request = oauth.Request(method="GET", url=url, parameters=params)
1023
1024 signature_method = oauth.SignatureMethod_HMAC_SHA1()
1025 self.request.sign_request(signature_method, self.consumer, self.token)
1026
1027 server = oauth.Server()
1028 server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
1029
1030 parameters = server.verify_request(self.request, self.consumer,
1031 self.token)
1032
1033 def test_invalid_version(self):
1034 url = "http://sp.example.com/"
1035
1036 params = {
1037 'oauth_version': '222.9922',
1038 'oauth_nonce': "4572616e48616d6d65724c61686176",
1039 'oauth_timestamp': int(time.time()),
1040 'bar': 'blerg',
1041 'multi': ['foo','bar'],
1042 'foo': 59
1043 }
1044
1045 consumer = oauth.Consumer(key="consumer-key",
1046 secret="consumer-secret")
1047 token = oauth.Token(key="token-key", secret="token-secret")
1048
1049 params['oauth_token'] = token.key
1050 params['oauth_consumer_key'] = consumer.key
1051 request = oauth.Request(method="GET", url=url, parameters=params)
1052
1053 signature_method = oauth.SignatureMethod_HMAC_SHA1()
1054 request.sign_request(signature_method, consumer, token)
1055
1056 server = oauth.Server()
1057 server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
1058
1059 self.assertRaises(oauth.Error, server.verify_request, request, consumer, token)
1060
1061 def test_invalid_signature_method(self):
1062 url = "http://sp.example.com/"
1063
1064 params = {
1065 'oauth_version': '1.0',
1066 'oauth_nonce': "4572616e48616d6d65724c61686176",
1067 'oauth_timestamp': int(time.time()),
1068 'bar': 'blerg',
1069 'multi': ['FOO','BAR'],
1070 'foo': 59
1071 }
1072
1073 consumer = oauth.Consumer(key="consumer-key",
1074 secret="consumer-secret")
1075 token = oauth.Token(key="token-key", secret="token-secret")
1076
1077 params['oauth_token'] = token.key
1078 params['oauth_consumer_key'] = consumer.key
1079 request = oauth.Request(method="GET", url=url, parameters=params)
1080
1081 signature_method = SignatureMethod_Bad()
1082 request.sign_request(signature_method, consumer, token)
1083
1084 server = oauth.Server()
1085 server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
1086
1087 self.assertRaises(oauth.Error, server.verify_request, request,
1088 consumer, token)
1089
1090 def test_missing_signature(self):
1091 url = "http://sp.example.com/"
1092
1093 params = {
1094 'oauth_version': '1.0',
1095 'oauth_nonce': "4572616e48616d6d65724c61686176",
1096 'oauth_timestamp': int(time.time()),
1097 'bar': 'blerg',
1098 'multi': ['FOO','BAR'],
1099 'foo': 59
1100 }
1101
1102 consumer = oauth.Consumer(key="consumer-key",
1103 secret="consumer-secret")
1104 token = oauth.Token(key="token-key", secret="token-secret")
1105
1106 params['oauth_token'] = token.key
1107 params['oauth_consumer_key'] = consumer.key
1108 request = oauth.Request(method="GET", url=url, parameters=params)
1109
1110 signature_method = oauth.SignatureMethod_HMAC_SHA1()
1111 request.sign_request(signature_method, consumer, token)
1112 del request['oauth_signature']
1113
1114 server = oauth.Server()
1115 server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
1116
1117 self.assertRaises(oauth.MissingSignature, server.verify_request,
1118 request, consumer, token)
1119
1120
1121 # Request Token: http://oauth-sandbox.sevengoslings.net/request_token
1122 # Auth: http://oauth-sandbox.sevengoslings.net/authorize
1123 # Access Token: http://oauth-sandbox.sevengoslings.net/access_token
1124 # Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
1125 # Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
1126 # Key: bd37aed57e15df53
1127 # Secret: 0e9e6413a9ef49510a4f68ed02cd
1128 class TestClient(unittest.TestCase):
1129 # oauth_uris = {
1130 # 'request_token': '/request_token.php',
1131 # 'access_token': '/access_token.php'
1132 # }
1133
1134 oauth_uris = {
1135 'request_token': '/request_token',
1136 'authorize': '/authorize',
1137 'access_token': '/access_token',
1138 'two_legged': '/two_legged',
1139 'three_legged': '/three_legged'
1140 }
1141
1142 consumer_key = 'bd37aed57e15df53'
1143 consumer_secret = '0e9e6413a9ef49510a4f68ed02cd'
1144 host = 'http://oauth-sandbox.sevengoslings.net'
1145
1146 def setUp(self):
1147 self.consumer = oauth.Consumer(key=self.consumer_key,
1148 secret=self.consumer_secret)
1149
1150 self.body = {
1151 'foo': 'bar',
1152 'bar': 'foo',
1153 'multi': ['FOO','BAR'],
1154 'blah': 599999
1155 }
1156
1157 def _uri(self, type):
1158 uri = self.oauth_uris.get(type)
1159 if uri is None:
1160 raise KeyError("%s is not a valid OAuth URI type." % type)
1161
1162 return "%s%s" % (self.host, uri)
1163
1164 def create_simple_multipart_data(self, data):
1165 boundary = '---Boundary-%d' % random.randint(1,1000)
1166 crlf = '\r\n'
1167 items = []
1168 for key, value in data.iteritems():
1169 items += [
1170 '--'+boundary,
1171 'Content-Disposition: form-data; name="%s"'%str(key),
1172 '',
1173 str(value),
1174 ]
1175 items += ['', '--'+boundary+'--', '']
1176 content_type = 'multipart/form-data; boundary=%s' % boundary
1177 return content_type, crlf.join(items)
1178
1179 def test_init(self):
1180 class Blah():
1181 pass
1182
1183 try:
1184 client = oauth.Client(Blah())
1185 self.fail("Client.__init__() accepted invalid Consumer.")
1186 except ValueError:
1187 pass
1188
1189 consumer = oauth.Consumer('token', 'secret')
1190 try:
1191 client = oauth.Client(consumer, Blah())
1192 self.fail("Client.__init__() accepted invalid Token.")
1193 except ValueError:
1194 pass
1195
1196 def test_access_token_get(self):
1197 """Test getting an access token via GET."""
1198 client = oauth.Client(self.consumer, None)
1199 resp, content = client.request(self._uri('request_token'), "GET")
1200
1201 self.assertEquals(int(resp['status']), 200)
1202
1203 def test_access_token_post(self):
1204 """Test getting an access token via POST."""
1205 client = oauth.Client(self.consumer, None)
1206 resp, content = client.request(self._uri('request_token'), "POST")
1207
1208 self.assertEquals(int(resp['status']), 200)
1209
1210 res = dict(parse_qsl(content))
1211 self.assertTrue('oauth_token' in res)
1212 self.assertTrue('oauth_token_secret' in res)
1213
1214 def _two_legged(self, method):
1215 client = oauth.Client(self.consumer, None)
1216
1217 return client.request(self._uri('two_legged'), method,
1218 body=urllib.urlencode(self.body))
1219
1220 def test_two_legged_post(self):
1221 """A test of a two-legged OAuth POST request."""
1222 resp, content = self._two_legged("POST")
1223
1224 self.assertEquals(int(resp['status']), 200)
1225
1226 def test_two_legged_get(self):
1227 """A test of a two-legged OAuth GET request."""
1228 resp, content = self._two_legged("GET")
1229 self.assertEquals(int(resp['status']), 200)
1230
1231 @mock.patch('httplib2.Http.request')
1232 def test_multipart_post_does_not_alter_body(self, mockHttpRequest):
1233 random_result = random.randint(1,100)
1234
1235 data = {
1236 'rand-%d'%random.randint(1,100):random.randint(1,100),
1237 }
1238 content_type, body = self.create_simple_multipart_data(data)
1239
1240 client = oauth.Client(self.consumer, None)
1241 uri = self._uri('two_legged')
1242
1243 def mockrequest(cl, ur, **kw):
1244 self.failUnless(cl is client)
1245 self.failUnless(ur is uri)
1246 self.failUnlessEqual(frozenset(kw.keys()), frozenset(['method', 'body', 'redirections', 'connection_type', 'headers']))
1247 self.failUnlessEqual(kw['body'], body)
1248 self.failUnlessEqual(kw['connection_type'], None)
1249 self.failUnlessEqual(kw['method'], 'POST')
1250 self.failUnlessEqual(kw['redirections'], httplib2.DEFAULT_MAX_REDIRECTS)
1251 self.failUnless(isinstance(kw['headers'], dict))
1252
1253 return random_result
1254
1255 mockHttpRequest.side_effect = mockrequest
1256
1257 result = client.request(uri, 'POST', headers={'Content-Type':content_type}, body=body)
1258 self.assertEqual(result, random_result)
1259
1260 @mock.patch('httplib2.Http.request')
1261 def test_url_with_query_string(self, mockHttpRequest):
1262 uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
1263 client = oauth.Client(self.consumer, None)
1264 random_result = random.randint(1,100)
1265
1266 def mockrequest(cl, ur, **kw):
1267 self.failUnless(cl is client)
1268 self.failUnlessEqual(frozenset(kw.keys()), frozenset(['method', 'body', 'redirections', 'connection_type', 'headers']))
1269 self.failUnlessEqual(kw['body'], '')
1270 self.failUnlessEqual(kw['connection_type'], None)
1271 self.failUnlessEqual(kw['method'], 'GET')
1272 self.failUnlessEqual(kw['redirections'], httplib2.DEFAULT_MAX_REDIRECTS)
1273 self.failUnless(isinstance(kw['headers'], dict))
1274
1275 req = oauth.Request.from_consumer_and_token(self.consumer, None,
1276 http_method='GET', http_url=uri, parameters={})
1277 req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), self.consumer, None)
1278 expected = parse_qsl(urlparse.urlparse(req.to_url()).query)
1279 actual = parse_qsl(urlparse.urlparse(ur).query)
1280 self.failUnlessEqual(len(expected), len(actual))
1281 actual = dict(actual)
1282 for key, value in expected:
1283 if key not in ('oauth_signature', 'oauth_nonce', 'oauth_timestamp'):
1284 self.failUnlessEqual(actual[key], value)
1285
1286 return random_result
1287
1288 mockHttpRequest.side_effect = mockrequest
1289
1290 client.request(uri, 'GET')
1291
1292 @mock.patch('httplib2.Http.request')
1293 @mock.patch('oauth2.Request.from_consumer_and_token')
1294 def test_multiple_values_for_a_key(self, mockReqConstructor, mockHttpRequest):
1295 client = oauth.Client(self.consumer, None)
1296
1297 request = oauth.Request("GET", "http://example.com/fetch.php", parameters={'multi': ['1', '2']})
1298 mockReqConstructor.return_value = request
1299
1300 client.request('http://whatever', 'POST', body='multi=1&multi=2')
1301
1302 self.failUnlessEqual(mockReqConstructor.call_count, 1)
1303 self.failUnlessEqual(mockReqConstructor.call_args[1]['parameters'], {'multi': ['1', '2']})
1304
1305 self.failUnless('multi=1' in mockHttpRequest.call_args[1]['body'])
1306 self.failUnless('multi=2' in mockHttpRequest.call_args[1]['body'])
1307
1308 if __name__ == "__main__":
1309 unittest.main()