annotate Code/python_oauth2-master/build/lib/tests/test_oauth.py @ 47:b0186d4a4496 tip

Move 7Digital dataset to Downloads
author Paulo Chiliguano <p.e.chiliguano@se14.qmul.ac.uk>
date Sat, 09 Jul 2022 00:50:43 -0500
parents e68dbee1f6db
children
rev   line source
p@21 1 # -*- coding: utf-8 -*-
p@21 2
p@21 3 """
p@21 4 The MIT License
p@21 5
p@21 6 Copyright (c) 2009 Vic Fryzel
p@21 7
p@21 8 Permission is hereby granted, free of charge, to any person obtaining a copy
p@21 9 of this software and associated documentation files (the "Software"), to deal
p@21 10 in the Software without restriction, including without limitation the rights
p@21 11 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
p@21 12 copies of the Software, and to permit persons to whom the Software is
p@21 13 furnished to do so, subject to the following conditions:
p@21 14
p@21 15 The above copyright notice and this permission notice shall be included in
p@21 16 all copies or substantial portions of the Software.
p@21 17
p@21 18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
p@21 19 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
p@21 20 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
p@21 21 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
p@21 22 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
p@21 23 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
p@21 24 THE SOFTWARE.
p@21 25 """
p@21 26 import sys
p@21 27 import os
p@21 28 import unittest
p@21 29 import oauth2 as oauth
p@21 30 import random
p@21 31 import time
p@21 32 import urllib
p@21 33 import urlparse
p@21 34 from types import ListType
p@21 35 import mock
p@21 36 import httplib2
p@21 37
p@21 38 # Fix for python2.5 compatibility
p@21 39 try:
p@21 40 from urlparse import parse_qs, parse_qsl
p@21 41 except ImportError:
p@21 42 from cgi import parse_qs, parse_qsl
p@21 43
p@21 44
p@21 45 sys.path[0:0] = [os.path.join(os.path.dirname(__file__), ".."),]
p@21 46
p@21 47
class TestError(unittest.TestCase):
    """Exercises the message exposed by ``oauth.Error``."""

    def _catch(self, *args):
        """Raise ``oauth.Error(*args)`` and return the instance that was caught."""
        try:
            raise oauth.Error(*args)
        except oauth.Error:
            # sys.exc_info() hands back the active exception without needing
            # to bind it in the except clause.
            return sys.exc_info()[1]

    def test_message(self):
        """``message`` is the default text, or whatever text was passed in."""
        self.assertEqual(self._catch().message, 'OAuth error occurred.')

        custom = 'OMG THINGS BROKE!!!!'
        self.assertEqual(self._catch(custom).message, custom)

    def test_str(self):
        """``str()`` of an error raised without arguments is the default text."""
        self.assertEqual(str(self._catch()), 'OAuth error occurred.')
p@21 66
class TestGenerateFunctions(unittest.TestCase):
    """Tests for the module-level header, escaping and generator helpers."""

    def test_build_auth_header(self):
        """The header dict holds one WWW-Authenticate entry naming the realm."""
        header = oauth.build_authenticate_header()
        self.assertEqual(header['WWW-Authenticate'], 'OAuth realm=""')
        self.assertEqual(len(header), 1)

        realm = 'http://example.myrealm.com/'
        header = oauth.build_authenticate_header(realm)
        self.assertEqual(header['WWW-Authenticate'],
                         'OAuth realm="%s"' % realm)
        self.assertEqual(len(header), 1)

    def test_build_xoauth_string(self):
        """The XOAUTH string is ``<method> <url> <comma-separated params>``."""
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')
        token = oauth.Token('user_token', 'user_secret')
        url = "https://mail.google.com/mail/b/joe@example.com/imap/"
        xoauth_string = oauth.build_xoauth_string(url, consumer, token)

        method, oauth_url, oauth_string = xoauth_string.split(' ')

        self.assertEqual("GET", method)
        self.assertEqual(url, oauth_url)

        # Break the parameter list into a name -> unquoted value mapping.
        returned = {}
        for part in oauth_string.split(','):
            var, val = part.split('=')
            returned[var] = val.strip('"')

        self.assertEqual('HMAC-SHA1', returned['oauth_signature_method'])
        self.assertEqual('user_token', returned['oauth_token'])
        self.assertEqual('consumer_token', returned['oauth_consumer_key'])
        self.assertTrue('oauth_signature' in returned, 'oauth_signature')

    def test_escape(self):
        """``escape`` keeps ``~`` intact and leaves no ``../`` sequence behind."""
        string = 'http://whatever.com/~someuser/?test=test&other=other'
        self.assertTrue('~' in oauth.escape(string))
        string = '../../../../../../../etc/passwd'
        self.assertTrue('../' not in oauth.escape(string))

    def test_gen_nonce(self):
        """A nonce is 8 characters long unless a length is requested."""
        nonce = oauth.generate_nonce()
        self.assertEqual(len(nonce), 8)
        nonce = oauth.generate_nonce(20)
        self.assertEqual(len(nonce), 20)

    def test_gen_verifier(self):
        """A verifier is 8 characters long unless a length is requested."""
        verifier = oauth.generate_verifier()
        self.assertEqual(len(verifier), 8)
        verifier = oauth.generate_verifier(16)
        self.assertEqual(len(verifier), 16)

    def test_gen_timestamp(self):
        """The generated timestamp is the current time in whole seconds."""
        # Read the clock on both sides of the call. Comparing a single
        # reading for equality fails spuriously whenever the second rolls
        # over between the two time lookups.
        earliest = int(time.time())
        stamp = oauth.generate_timestamp()
        latest = int(time.time())
        self.assertTrue(earliest <= stamp <= latest,
                        'timestamp %r outside [%r, %r]'
                        % (stamp, earliest, latest))
p@21 122
class TestConsumer(unittest.TestCase):
    """Tests for ``oauth.Consumer`` construction and serialisation."""

    def setUp(self):
        self.key = 'my-key'
        self.secret = 'my-secret'
        self.consumer = oauth.Consumer(key=self.key, secret=self.secret)

    def test_init(self):
        """The constructor stores key and secret unchanged."""
        self.assertEqual(self.consumer.key, self.key)
        self.assertEqual(self.consumer.secret, self.secret)

    def test_basic(self):
        """A missing key or secret is rejected with ValueError."""
        self.assertRaises(ValueError, oauth.Consumer, None, None)
        self.assertRaises(ValueError, oauth.Consumer, 'asf', None)
        self.assertRaises(ValueError, oauth.Consumer, None, 'dasf')

    def test_str(self):
        """``str(consumer)`` is a query string carrying key and secret."""
        fields = dict(parse_qsl(str(self.consumer)))
        self.assertTrue('oauth_consumer_key' in fields)
        self.assertTrue('oauth_consumer_secret' in fields)
        self.assertEqual(fields['oauth_consumer_key'], self.consumer.key)
        self.assertEqual(fields['oauth_consumer_secret'],
                         self.consumer.secret)
p@21 144
class TestToken(unittest.TestCase):
    """Tests for ``oauth.Token``: construction, callback/verifier handling
    and round-tripping through its string form."""

    def setUp(self):
        self.key = 'my-key'
        self.secret = 'my-secret'
        self.token = oauth.Token(self.key, self.secret)

    def test_basic(self):
        """A missing key or secret is rejected with ValueError."""
        self.assertRaises(ValueError, lambda: oauth.Token(None, None))
        self.assertRaises(ValueError, lambda: oauth.Token('asf', None))
        self.assertRaises(ValueError, lambda: oauth.Token(None, 'dasf'))

    def test_init(self):
        """A fresh token stores key/secret and has no callback or verifier."""
        self.assertEqual(self.token.key, self.key)
        self.assertEqual(self.token.secret, self.secret)
        self.assertEqual(self.token.callback, None)
        self.assertEqual(self.token.callback_confirmed, None)
        self.assertEqual(self.token.verifier, None)

    def test_set_callback(self):
        """Setting a callback also marks it as confirmed."""
        self.assertEqual(self.token.callback, None)
        self.assertEqual(self.token.callback_confirmed, None)
        cb = 'http://www.example.com/my-callback'
        self.token.set_callback(cb)
        self.assertEqual(self.token.callback, cb)
        self.assertEqual(self.token.callback_confirmed, 'true')
        self.token.set_callback(None)
        self.assertEqual(self.token.callback, None)
        # TODO: The following test should probably not pass, but it does
        # To fix this, check for None and unset 'true' in set_callback
        # Additionally, should a confirmation truly be done of the callback?
        self.assertEqual(self.token.callback_confirmed, 'true')

    def test_set_verifier(self):
        """``set_verifier`` stores the given value, or a new one when omitted."""
        self.assertEqual(self.token.verifier, None)
        v = oauth.generate_verifier()
        self.token.set_verifier(v)
        self.assertEqual(self.token.verifier, v)
        self.token.set_verifier()
        self.assertNotEqual(self.token.verifier, v)
        self.token.set_verifier('')
        self.assertEqual(self.token.verifier, '')

    def test_get_callback_url(self):
        """The verifier is appended to the callback with ``&`` or ``?``."""
        self.assertEqual(self.token.get_callback_url(), None)

        # A verifier alone does not produce a callback URL.
        self.token.set_verifier()
        self.assertEqual(self.token.get_callback_url(), None)

        # Callback that already carries a query string: joined with '&'.
        cb = 'http://www.example.com/my-callback?save=1&return=true'
        v = oauth.generate_verifier()
        self.token.set_callback(cb)
        self.token.set_verifier(v)
        url = self.token.get_callback_url()
        verifier_str = '&oauth_verifier=%s' % v
        self.assertEqual(url, '%s%s' % (cb, verifier_str))

        # Callback without a query string: joined with '?'.
        cb = 'http://www.example.com/my-callback-no-query'
        v = oauth.generate_verifier()
        self.token.set_callback(cb)
        self.token.set_verifier(v)
        url = self.token.get_callback_url()
        verifier_str = '?oauth_verifier=%s' % v
        self.assertEqual(url, '%s%s' % (cb, verifier_str))

    def test_to_string(self):
        """``to_string`` carries key and secret, plus the confirmation flag
        once a callback has been set."""
        # This test used to be shadowed by a second method of the same name
        # and never ran.  The serialised fields are compared as parsed pairs
        # so the check does not depend on the order they are emitted in.
        expected = {'oauth_token_secret': self.secret,
                    'oauth_token': self.key}
        self.assertEqual(dict(parse_qsl(self.token.to_string())), expected)

        self.token.set_callback('http://www.example.com/my-callback')
        expected['oauth_callback_confirmed'] = 'true'
        self.assertEqual(dict(parse_qsl(self.token.to_string())), expected)

    def _compare_tokens(self, new):
        """Assert that ``new`` matches ``self.token`` on the copied fields."""
        self.assertEqual(self.token.key, new.key)
        self.assertEqual(self.token.secret, new.secret)
        # TODO: What about copying the callback to the new token?
        # self.assertEqual(self.token.callback, new.callback)
        self.assertEqual(self.token.callback_confirmed,
                         new.callback_confirmed)
        # TODO: What about copying the verifier to the new token?
        # self.assertEqual(self.token.verifier, new.verifier)

    def test_str(self):
        """``str(token)`` is the secret/key query string."""
        tok = oauth.Token('tooken', 'seecret')
        self.assertEqual(str(tok),
                         'oauth_token_secret=seecret&oauth_token=tooken')

    def test_from_string(self):
        """``from_string`` rejects incomplete input and round-trips tokens."""
        malformed = (
            '',
            'blahblahblah',
            'blah=blah',
            'oauth_token_secret=asfdasf',
            'oauth_token_secret=',
            'oauth_token=asfdasf',
            'oauth_token=',
            'oauth_token=&oauth_token_secret=',
            'oauth_token=tooken%26oauth_token_secret=seecret',
        )
        for text in malformed:
            self.assertRaises(ValueError, oauth.Token.from_string, text)

        string = self.token.to_string()
        new = oauth.Token.from_string(string)
        self._compare_tokens(new)

        self.token.set_callback('http://www.example.com/my-callback')
        string = self.token.to_string()
        new = oauth.Token.from_string(string)
        self._compare_tokens(new)
p@21 252
class ReallyEqualMixin:
    """Mixin adding an equality assertion that also compares operand types.

    Intended to be combined with ``unittest.TestCase``, which supplies
    ``assertEqual``.
    """

    def failUnlessReallyEqual(self, a, b, msg=None):
        """Fail unless ``a == b`` and ``type(a) == type(b)``.

        ``msg`` is used as the failure message of the value comparison and
        is embedded in the message of the type comparison.
        """
        # assertEqual replaces the deprecated failUnlessEqual alias, which
        # no longer exists on TestCase in recent Python versions.
        self.assertEqual(a, b, msg=msg)
        self.assertEqual(type(a), type(b),
                         msg="a :: %r, b :: %r, %r" % (a, b, msg))
p@21 257
class TestFuncs(unittest.TestCase):
    """Tests for the unicode coercion helpers."""

    def test_to_unicode(self):
        """Non-UTF-8 bytes raise TypeError; valid input comes back as unicode."""
        # A lone 0xAE byte is not valid UTF-8, bare or inside a list.
        self.assertRaises(TypeError, oauth.to_unicode, '\xae')
        self.assertRaises(TypeError, oauth.to_unicode_optional_iterator, '\xae')
        self.assertRaises(TypeError, oauth.to_unicode_optional_iterator, ['\xae'])

        self.assertEqual(oauth.to_unicode(':-)'), u':-)')
        self.assertEqual(oauth.to_unicode(u'\u00ae'), u'\u00ae')
        self.assertEqual(oauth.to_unicode('\xc2\xae'), u'\u00ae')
        self.assertEqual(oauth.to_unicode_optional_iterator([':-)']), [u':-)'])
        self.assertEqual(oauth.to_unicode_optional_iterator([u'\u00ae']), [u'\u00ae'])
p@21 269
p@21 270 class TestRequest(unittest.TestCase, ReallyEqualMixin):
def test_setter(self):
    """A request built without a URL has no (or a None) url/normalized_url."""
    request = oauth.Request("GET")

    url_unset = not hasattr(request, 'url') or request.url is None
    self.assertTrue(url_unset)

    normalized_unset = (not hasattr(request, 'normalized_url')
                        or request.normalized_url is None)
    self.assertTrue(normalized_unset)
p@21 277
def test_deleter(self):
    """Reading ``url`` after deleting it raises AttributeError."""
    request = oauth.Request("GET", "http://example.com")

    try:
        del request.url
        _ = request.url
        self.fail("AttributeError should have been raised on empty url.")
    except AttributeError:
        pass
    except Exception:
        # Anything else (including the failure raised just above) is
        # reported through its own message.
        self.fail(str(sys.exc_info()[1]))
p@21 291
def test_url(self):
    """Default ports are dropped from ``normalized_url`` but kept in ``url``."""
    cases = (
        ("http://example.com:80/foo.php", "http://example.com/foo.php"),
        ("https://example.com:443/foo.php", "https://example.com/foo.php"),
    )

    for given, normalized in cases:
        request = oauth.Request("GET", given)
        self.assertEqual(request.normalized_url, normalized)
        self.assertEqual(request.url, given)
p@21 306
def test_bad_url(self):
    """Assigning an ``ftp://`` URL raises ValueError."""
    request = oauth.Request()
    try:
        request.url = "ftp://example.com"
    except ValueError:
        return
    self.fail("Invalid URL scheme was accepted.")
p@21 314
def test_unset_consumer_and_token(self):
    """Signing fills in the consumer key and token parameters."""
    consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    token = oauth.Token('my_key', 'my_secret')
    request = oauth.Request("GET", "http://example.com/fetch.php")

    signer = oauth.SignatureMethod_HMAC_SHA1()
    request.sign_request(signer, consumer, token)

    self.assertEqual(consumer.key, request['oauth_consumer_key'])
    self.assertEqual(token.key, request['oauth_token'])
p@21 324
def test_no_url_set(self):
    """Signing a URL-less request must not surface as a TypeError.

    A ValueError is the accepted outcome.
    """
    consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    token = oauth.Token('my_key', 'my_secret')
    request = oauth.Request()
    signer = oauth.SignatureMethod_HMAC_SHA1()

    try:
        request.sign_request(signer, consumer, token)
    except TypeError:
        self.fail("Signature method didn't check for a normalized URL.")
    except ValueError:
        pass
p@21 338
def test_url_query(self):
    """``normalized_url`` drops the query string; ``url`` keeps it."""
    full_url = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10"
    # Rebuild the URL from scheme, netloc and path only.
    scheme_host_path = urlparse.urlparse(full_url)[:3]
    without_query = urlparse.urlunparse(scheme_host_path + (None, None, None))

    request = oauth.Request("GET", full_url)
    self.assertEqual(request.url, full_url)
    self.assertEqual(request.normalized_url, without_query)
p@21 347
def test_get_parameter(self):
    """``get_parameter`` returns known values and raises oauth.Error otherwise."""
    request = oauth.Request("GET", "http://example.com",
                            parameters={'oauth_consumer': 'asdf'})

    self.assertEqual(request.get_parameter('oauth_consumer'), 'asdf')
    self.assertRaises(oauth.Error, request.get_parameter, 'blah')
p@21 356
def test_get_nonoauth_parameters(self):
    """Only parameters outside the ``oauth_`` namespace are returned."""
    plain_params = {
        u'foo': u'baz',
        u'bar': u'foo',
        u'multi': [u'FOO',u'BAR'],
        u'uni_utf8': u'\xae',
        u'uni_unicode': u'\u00ae',
        u'uni_unicode_2': u'åÅøØ',
    }

    # Start from the single oauth_* entry and mix the plain ones in.
    all_params = {
        'oauth_consumer': 'asdfasdfasdf'
    }
    all_params.update(plain_params)

    request = oauth.Request("GET", "http://example.com", all_params)
    self.assertEqual(plain_params, request.get_nonoauth_parameters())
p@21 377
def test_to_header(self):
    """The header value lists the realm followed by every oauth_* parameter,
    and each parameter unquotes back to the value it was given."""
    realm = "http://sp.example.com/"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    req = oauth.Request("GET", realm, params)
    header, value = list(req.to_header(realm).items())[0]

    parts = value.split('OAuth ')
    fields = parts[1].split(', ')
    # One entry per parameter plus the realm.  This was previously
    # assertTrue(len(...), expected), which treats the expected count as
    # the failure message and therefore checked nothing.
    self.assertEqual(len(fields), len(params) + 1)

    res = {}
    for field in fields:
        var, val = field.split('=')
        res[var] = urllib.unquote(val.strip('"'))

    self.assertEqual(realm, res['realm'])
    del res['realm']

    self.assertEqual(len(res), len(params))

    for key, val in res.items():
        self.assertEqual(val, params.get(key))
p@21 410
def test_to_postdata_nonascii(self):
    """Non-ASCII unicode values are UTF-8 percent-encoded in the post body,
    and the body is a byte string of exactly the expected type."""
    endpoint = "http://sp.example.com/"

    # NOTE: the literal below is kept in its original key order; the
    # expected string encodes a particular parameter ordering.
    params = {
        'nonasciithing': u'q\xbfu\xe9 ,aasp u?..a.s',
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    request = oauth.Request("GET", endpoint, params)

    expected_body = 'nonasciithing=q%C2%BFu%C3%A9%20%2Caasp%20u%3F..a.s&oauth_nonce=4572616e48616d6d65724c61686176&oauth_timestamp=137131200&oauth_consumer_key=0685bd9184jfhq22&oauth_signature_method=HMAC-SHA1&oauth_version=1.0&oauth_token=ad180jjd733klru7&oauth_signature=wOJIO9A2W5mFwDgiDvZbTSMK%252FPY%253D'
    self.failUnlessReallyEqual(request.to_postdata(), expected_body)
p@21 428
def test_to_postdata(self):
    """List-valued parameters expand to one pair per element in the body."""
    endpoint = "http://sp.example.com/"

    params = {
        'multi': ['FOO','BAR'],
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    request = oauth.Request("GET", endpoint, params)

    def by_name(pair):
        return pair[0]

    # Expected pairs: the list value flattened, then every scalar entry.
    expected_pairs = [('multi','FOO'),('multi','BAR')]
    del params['multi']
    expected_pairs.extend(params.items())

    actual_pairs = parse_qsl(request.to_postdata())
    self.assertEqual(sorted(expected_pairs, key=by_name),
                     sorted(actual_pairs, key=by_name))
p@21 450
def test_to_url(self):
    """``to_url`` appends the parameters to the base URL as a query string."""
    base = "http://sp.example.com/"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    request = oauth.Request("GET", base, params)
    wanted = urlparse.urlparse("%s?%s" % (base, urllib.urlencode(params)))
    actual = urlparse.urlparse(request.to_url())

    self.assertEqual(wanted.scheme, actual.scheme)
    self.assertEqual(wanted.netloc, actual.netloc)
    self.assertEqual(wanted.path, actual.path)

    # Compare the queries as parsed mappings.
    self.assertEqual(parse_qs(wanted.query), parse_qs(actual.query))
p@21 474
def test_to_url_with_query(self):
    """Query arguments already on the URL survive alongside the parameters."""
    base = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    request = oauth.Request("GET", base, params)
    # The base URL already has a query string, so the new arguments are
    # joined with '&' rather than '?'.
    wanted = urlparse.urlparse("%s&%s" % (base, urllib.urlencode(params)))
    actual = urlparse.urlparse(request.to_url())

    self.assertEqual(wanted.scheme, actual.scheme)
    self.assertEqual(wanted.netloc, actual.netloc)
    self.assertEqual(wanted.path, actual.path)

    wanted_query = parse_qs(wanted.query)
    actual_query = parse_qs(actual.query)
    self.assertTrue('alt' in actual_query)
    self.assertTrue('max-contacts' in actual_query)
    self.assertEqual(actual_query['alt'], ['json'])
    self.assertEqual(actual_query['max-contacts'], ['10'])
    self.assertEqual(wanted_query, actual_query)
p@21 503
def test_signature_base_string_nonascii_nonutf8(self):
    """Four spellings of the same URL normalise and sign identically.

    The spellings are: a unicode string with the raw character, UTF-8
    bytes, a percent-encoded byte string, and a percent-encoded unicode
    string.
    """
    consumer = oauth.Consumer('consumer_token', 'consumer_secret')

    spellings = (
        u'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc\u2766,+CA',
        'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc\xe2\x9d\xa6,+CA',
        'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA',
        u'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA',
    )

    for spelling in spellings:
        request = oauth.Request("GET", spelling)
        self.failUnlessReallyEqual(request.normalized_url, u'http://api.simplegeo.com/1.0/places/address.json')
        request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        self.failUnlessReallyEqual(request['oauth_signature'], 'WhufgeZKyYpKsI70GZaiDaYwl6g=')
p@21 530
def test_signature_base_string_with_query(self):
    """Normalized parameters combine the request parameters (minus the
    signature) with the query arguments taken from the URL."""
    url = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10"
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    req = oauth.Request("GET", url, params)
    self.assertEqual(req.normalized_url, 'https://www.google.com/m8/feeds/contacts/default/full/')
    self.assertEqual(req.url, 'https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10')

    normalized_params = parse_qsl(req.get_normalized_parameters())
    # Every parameter except oauth_signature, plus the two query arguments
    # on the URL ('alt' and 'max-contacts').  This was previously
    # assertTrue(len(...), n), which treats n as the failure message and so
    # verified nothing.
    self.assertEqual(len(normalized_params), len(params) - 1 + 2)

    normalized_params = dict(normalized_params)
    for key, value in params.items():
        if key == 'oauth_signature':
            continue
        self.assertEqual(value, normalized_params[key])
    self.assertEqual(normalized_params['alt'], 'json')
    self.assertEqual(normalized_params['max-contacts'], '10')
p@21 554
def test_get_normalized_parameters_empty(self):
    """A query argument with an empty value is kept as ``name=``."""
    request = oauth.Request("GET", "http://sp.example.com/?empty=")

    self.assertEqual('empty=', request.get_normalized_parameters())
p@21 565
def test_get_normalized_parameters_duplicate(self):
    """OAuth arguments supplied via the URL appear once, sorted, with the
    signature left out."""
    signed_url = "http://example.com/v2/search/videos?oauth_nonce=79815175&oauth_timestamp=1295397962&oauth_consumer_key=mykey&oauth_signature_method=HMAC-SHA1&q=car&oauth_version=1.0&offset=10&oauth_signature=spWLI%2FGQjid7sQVd5%2FarahRxzJg%3D"
    wanted = 'oauth_consumer_key=mykey&oauth_nonce=79815175&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1295397962&oauth_version=1.0&offset=10&q=car'

    request = oauth.Request("GET", signed_url)

    self.assertEqual(wanted, request.get_normalized_parameters())
p@21 576
def test_get_normalized_parameters_from_url(self):
    """Parameters are picked up from the URL's query string."""
    # example copied from
    # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
    # which in turn says that it was copied from
    # http://oauth.net/core/1.0/#sig_base_example .
    photo_url = "http://photos.example.net/photos?file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original"
    wanted = 'file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original'

    request = oauth.Request("GET", photo_url)

    self.assertEqual(wanted, request.get_normalized_parameters())
p@21 591
def test_signing_base(self):
    """The HMAC-SHA1 signing base matches the reference base string."""
    # example copied from
    # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
    # which in turn says that it was copied from
    # http://oauth.net/core/1.0/#sig_base_example .
    photo_url = "http://photos.example.net/photos?file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original"
    wanted = 'GET&http%3A%2F%2Fphotos.example.net%2Fphotos&file%3Dvacation.jpg%26oauth_consumer_key%3Ddpf43f3p2l4k3l03%26oauth_nonce%3Dkllo9940pd9333jh%26oauth_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D1191242096%26oauth_token%3Dnnch734d00sl2jdk%26oauth_version%3D1.0%26size%3Doriginal'

    request = oauth.Request("GET", photo_url)
    signer = oauth.SignatureMethod_HMAC_SHA1()
    consumer = oauth.Consumer('dpf43f3p2l4k3l03', 'foo')

    # signing_base returns a (key, raw base string) pair; only the base
    # string is checked here.
    base_string = signer.signing_base(request, consumer, None)[1]
    self.assertEqual(wanted, base_string)
p@21 608
def test_get_normalized_parameters(self):
    """Parameters are sorted, list values expanded, and unicode/UTF-8
    inputs both percent-encoded as UTF-8."""
    endpoint = "http://sp.example.com/"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'multi': ['FOO','BAR', u'\u00ae', '\xc2\xae'],
        'multi_same': ['FOO','FOO'],
        'uni_utf8_bytes': '\xc2\xae',
        'uni_unicode_object': u'\u00ae'
    }

    wanted = 'multi=BAR&multi=FOO&multi=%C2%AE&multi=%C2%AE&multi_same=FOO&multi_same=FOO&oauth_consumer_key=0685bd9184jfhq22&oauth_nonce=4572616e48616d6d65724c61686176&oauth_signature_method=HMAC-SHA1&oauth_timestamp=137131200&oauth_token=ad180jjd733klru7&oauth_version=1.0&uni_unicode_object=%C2%AE&uni_utf8_bytes=%C2%AE'

    request = oauth.Request("GET", endpoint, params)

    self.assertEqual(wanted, request.get_normalized_parameters())
p@21 632
def test_get_normalized_parameters_ignores_auth_signature(self):
    """``oauth_signature`` is left out of the normalized parameters."""
    endpoint = "http://sp.example.com/"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_signature': "some-random-signature-%d" % random.randint(1000, 2000),
        'oauth_token': "ad180jjd733klru7",
    }

    request = oauth.Request("GET", endpoint, params)
    normalized = request.get_normalized_parameters()

    # With the signature included the encodings must differ ...
    with_signature = urllib.urlencode(sorted(params.items()))
    self.assertNotEqual(with_signature, normalized)

    # ... and with it removed they must agree.
    unsigned = dict((name, value) for name, value in params.items()
                    if name != "oauth_signature")
    without_signature = urllib.urlencode(sorted(unsigned.items()))
    self.assertEqual(without_signature, normalized)
p@21 655
def test_set_signature_method(self):
    """The client rejects an arbitrary object and stores a real method."""
    client = oauth.Client(oauth.Consumer('key', 'secret'))

    class Blah:
        pass

    try:
        client.set_signature_method(Blah())
    except ValueError:
        pass
    else:
        self.fail("Client.set_signature_method() accepted invalid method.")

    hmac_method = oauth.SignatureMethod_HMAC_SHA1()
    client.set_signature_method(hmac_method)
    self.assertEqual(hmac_method, client.method)
p@21 672
def test_get_normalized_string_escapes_spaces_properly(self):
    """Spaces in values are encoded as ``%20`` rather than ``+``."""
    endpoint = "http://sp.example.com/"
    number = random.randint(100, 1000)
    sentence = "This data with a random number (%d) has spaces!" % random.randint(1000, 2000)
    params = {
        "some_random_data": number,
        "data": sentence,
    }

    request = oauth.Request("GET", endpoint, params)
    normalized = request.get_normalized_parameters()

    plus_encoded = urllib.urlencode(sorted(params.items()))
    self.assertEqual(plus_encoded.replace('+', '%20'), normalized)
p@21 684
@mock.patch('oauth2.Request.make_timestamp')
@mock.patch('oauth2.Request.make_nonce')
def test_request_nonutf8_bytes(self, mock_make_nonce, mock_make_timestamp):
    """Non-UTF-8 byte strings in the URL or parameters raise TypeError,
    while unicode and UTF-8 inputs are accepted and signed.

    The nonce and timestamp generators are patched to return fixed values.
    """
    mock_make_nonce.return_value = 5
    mock_make_timestamp.return_value = 6

    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
    con = oauth.Consumer(key="con-test-key", secret="con-test-secret")
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_token': tok.key,
        'oauth_consumer_key': con.key
    }

    # If someone passes a sequence of bytes which is not ascii for
    # url, we'll raise an exception as early as possible.
    url = "http://sp.example.com/\x92" # It's actually cp1252-encoding...
    self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params)

    # And if they pass a unicode, then we'll use it.
    url = u'http://sp.example.com/\u2019'
    req = oauth.Request(method="GET", url=url, parameters=params)
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
    self.failUnlessReallyEqual(req['oauth_signature'], 'cMzvCkhvLL57+sTIxLITTHfkqZk=')

    # And if it is a utf-8-encoded-then-percent-encoded non-ascii
    # thing, we'll decode it and use it.
    url = "http://sp.example.com/%E2%80%99"
    req = oauth.Request(method="GET", url=url, parameters=params)
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
    self.failUnlessReallyEqual(req['oauth_signature'], 'yMLKOyNKC/DkyhUOb8DLSvceEWE=')

    # Same thing with the params.
    url = "http://sp.example.com/"

    # If someone passes a sequence of bytes which is not ascii in
    # params, we'll raise an exception as early as possible.
    # (A stray trailing comma used to turn this value into a 1-tuple, so
    # the plain byte-string case described above was never exercised.)
    params['non_oauth_thing'] = '\xae' # It's actually cp1252-encoding...
    self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params)

    # And if they pass a unicode, then we'll use it.
    params['non_oauth_thing'] = u'\u2019'
    req = oauth.Request(method="GET", url=url, parameters=params)
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
    self.failUnlessReallyEqual(req['oauth_signature'], '0GU50m0v60CVDB5JnoBXnvvvKx4=')

    # And if it is a utf-8-encoded non-ascii thing, we'll decode
    # it and use it.
    params['non_oauth_thing'] = '\xc2\xae'
    req = oauth.Request(method="GET", url=url, parameters=params)
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
    self.failUnlessReallyEqual(req['oauth_signature'], 'pqOCu4qvRTiGiXB8Z61Jsey0pMM=')

    # Also if there are non-utf8 bytes in the query args.
    url = "http://sp.example.com/?q=\x92" # cp1252
    self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params)
p@21 744
def test_request_hash_of_body(self):
    """Check ``oauth_body_hash`` and the signature of non-form-encoded requests.

    Each case builds a request with ``is_form_encoded=False``, signs it with
    HMAC-SHA1 using only the consumer (no token secret), and compares both
    the body hash and the signature against pinned values.
    """
    tok = oauth.Token(key="token", secret="tok-test-secret")
    con = oauth.Consumer(key="consumer", secret="con-test-secret")
    url = u"http://www.example.com/resource"

    # (http method, nonce, timestamp, extra Request kwargs,
    #  expected body hash, expected signature)
    cases = (
        # Example 1a from Appendix A.1 of
        # http://oauth.googlecode.com/svn/spec/ext/body_hash/1.0/oauth-bodyhash.html
        # Except that we get a different result than they do:
        # oauth-bodyhash.html A.1 has '08bUFF%2Fjmp59mWB7cSgCYBUpJ0U%3D',
        # but I don't see how that is possible.
        ("PUT", 10288510250934, 1236874155, {'body': "Hello World!"},
         'Lve95gjOVATpfV8EL5X4nxwjKHE=', 't+MX8l/0S8hdbVQL99nD0X1fPnM='),
        # Example 1b
        ("PUT", 10369470270925, 1236874236, {'body': "Hello World!"},
         'Lve95gjOVATpfV8EL5X4nxwjKHE=', 'CTFmrqJIGT7NsWJ42OrujahTtTc='),
        # Appendix A.2 (GET, no body argument passed at all)
        ("GET", 8628868109991, 1238395022, {},
         '2jmj7l5rSw0yVb/vlWAYkK/YBwk=', 'Zhl++aWSP0O3/hYQ0CuBc7jv38I='),
    )

    for http_method, nonce, timestamp, extra, body_hash, signature in cases:
        params = {
            'oauth_version': "1.0",
            'oauth_token': tok.key,
            'oauth_nonce': nonce,
            'oauth_timestamp': timestamp,
            'oauth_consumer_key': con.key
        }
        req = oauth.Request(method=http_method, url=url, parameters=params,
                            is_form_encoded=False, **extra)
        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
        self.failUnlessReallyEqual(req['oauth_body_hash'], body_hash)
        self.failUnlessReallyEqual(req['oauth_signature'], signature)
p@21 797
p@21 798
def test_sign_request(self):
    """Sign a request with each signature method and with non-ASCII URLs.

    First the same request is signed with HMAC-SHA1 and PLAINTEXT and the
    resulting ``oauth_signature_method`` / ``oauth_signature`` values are
    compared with pinned expectations.  Then URLs containing U+2019, given
    either as UTF-8 bytes or as a ``unicode`` object, must yield the same
    HMAC-SHA1 signature for both spellings.
    """
    url = "http://sp.example.com/"

    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
    con = oauth.Consumer(key="con-test-key", secret="con-test-secret")

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_token': tok.key,
        'oauth_consumer_key': con.key,
    }
    req = oauth.Request(method="GET", url=url, parameters=params)

    # A tuple (rather than a dict) keeps the signing order deterministic.
    expected_by_method = (
        ('DX01TdHws7OninCLK9VztNTH1M4=', oauth.SignatureMethod_HMAC_SHA1()),
        ('con-test-secret&tok-test-secret', oauth.SignatureMethod_PLAINTEXT()),
    )
    for exp, method in expected_by_method:
        req.sign_request(method, con, tok)
        self.assertEqual(req['oauth_signature_method'], method.name)
        self.assertEqual(req['oauth_signature'], exp)

    non_ascii_urls = (
        # Non-ascii chars in the URL path.
        ("http://sp.example.com/\xe2\x80\x99",        # utf-8 bytes
         'loFvp5xC7YbOgd9exIO6TxB7H4s='),
        (u'http://sp.example.com/\u2019',             # Python unicode object
         'loFvp5xC7YbOgd9exIO6TxB7H4s='),
        # Non-ascii chars in the query args.
        ("http://sp.example.com/?q=\xe2\x80\x99",     # utf-8 bytes
         'IBw5mfvoCsDjgpcsVKbyvsDqQaU='),
        (u'http://sp.example.com/?q=\u2019',          # Python unicode object
         'IBw5mfvoCsDjgpcsVKbyvsDqQaU='),
    )
    for url, exp in non_ascii_urls:
        req = oauth.Request(method="GET", url=url, parameters=params)
        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, tok)
        self.assertEqual(req['oauth_signature'], exp)
p@21 846
def test_from_request(self):
    """Round-trip OAuth parameters through ``Request.from_request``.

    Covers three inputs: an Authorization header produced by
    ``to_header()``, a malformed Authorization header (must raise
    ``oauth.Error``), and a URL-encoded query string.  A call that
    supplies neither headers nor a query string must return ``None``.
    """
    url = "http://sp.example.com/"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    headers = oauth.Request("GET", url, params).to_header()

    # Test from the headers
    req = oauth.Request.from_request("GET", url, headers)
    self.assertEqual(req.method, "GET")
    self.assertEqual(req.url, url)
    self.assertEqual(params, req.copy())

    # Test with bad OAuth headers
    bad_headers = {
        'Authorization': 'OAuth this is a bad header'
    }
    self.assertRaises(oauth.Error, oauth.Request.from_request, "GET",
                      url, bad_headers)

    # Test getting from query string
    qs = urllib.urlencode(params)
    req = oauth.Request.from_request("GET", url, query_string=qs)

    # parse_qs maps each key to a list of values; keep the first value of
    # each and unquote it to obtain the expected flat dictionary.
    parsed = parse_qs(qs, keep_blank_values=False)
    exp = dict((k, urllib.unquote(v[0])) for k, v in parsed.items())
    self.assertEqual(exp, req.copy())

    # Test that a from_request() call without headers or query string
    # returns None
    req = oauth.Request.from_request("GET", url)
    self.assertEqual(None, req)
p@21 891
def test_from_token_and_callback(self):
    """Check ``Request.from_token_and_callback`` with and without a callback.

    Without a callback the request must carry the token key and no
    ``oauth_callback`` entry; with one, ``oauth_callback`` must equal the
    URL that was passed in.
    """
    url = "http://sp.example.com/"
    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")

    req = oauth.Request.from_token_and_callback(tok)
    self.assertFalse('oauth_callback' in req)
    self.assertEqual(req['oauth_token'], tok.key)

    req = oauth.Request.from_token_and_callback(tok, callback=url)
    self.assertTrue('oauth_callback' in req)
    self.assertEqual(req['oauth_callback'], url)
p@21 913
def test_from_consumer_and_token(self):
    """Check that ``from_consumer_and_token`` copies keys and the verifier.

    The resulting request must expose the token key, the consumer key and
    the verifier previously set on the token.
    """
    url = "http://sp.example.com/"

    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
    tok.set_verifier('this_is_a_test_verifier')
    con = oauth.Consumer(key="con-test-key", secret="con-test-secret")

    req = oauth.Request.from_consumer_and_token(
        con, token=tok, http_method="GET", http_url=url)

    self.assertEqual(req['oauth_token'], tok.key)
    self.assertEqual(req['oauth_consumer_key'], con.key)
    self.assertEqual(tok.verifier, req['oauth_verifier'])
p@21 926
class SignatureMethod_Bad(oauth.SignatureMethod):
    """Stub signature method named "BAD" that always yields a fixed signature.

    Used by the server tests to sign a request with a method the server
    under test has not registered.
    """

    name = "BAD"

    def signing_base(self, request, consumer, token):
        """Return an empty signing base, whatever the arguments are."""
        return ""

    def sign(self, request, consumer, token):
        """Return the constant string ``"invalid-signature"``."""
        return "invalid-signature"
p@21 935
p@21 936
class TestServer(unittest.TestCase):
    """Tests for ``oauth.Server``: method registration and request verification."""

    url = "http://sp.example.com/"

    def setUp(self):
        """Create a consumer, a token and a valid HMAC-SHA1 signed request."""
        self.consumer = oauth.Consumer(key="consumer-key",
                                       secret="consumer-secret")
        self.token = oauth.Token(key="token-key", secret="token-secret")
        self.request = self._signed_request(oauth_version="1.0")

    def _signed_request(self, oauth_version=None, multi=None,
                        signature_method=None):
        """Build a GET request for ``self.url`` and sign it.

        ``oauth_version`` is omitted from the parameters when ``None``.
        ``multi`` defaults to ``['FOO', 'BAR']`` and ``signature_method``
        to a fresh ``SignatureMethod_HMAC_SHA1``.  The request is signed
        with ``self.consumer`` and ``self.token`` and then returned.
        """
        if multi is None:
            multi = ['FOO', 'BAR']
        if signature_method is None:
            signature_method = oauth.SignatureMethod_HMAC_SHA1()

        params = {
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': multi,
            'foo': 59,
            'oauth_token': self.token.key,
            'oauth_consumer_key': self.consumer.key,
        }
        if oauth_version is not None:
            params['oauth_version'] = oauth_version

        request = oauth.Request(method="GET", url=self.url, parameters=params)
        request.sign_request(signature_method, self.consumer, self.token)
        return request

    def _hmac_server(self):
        """Return a Server that knows only the HMAC-SHA1 signature method."""
        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
        return server

    def test_init(self):
        """Server stores the methods passed in and defaults to none."""
        server = oauth.Server(signature_methods={'HMAC-SHA1': oauth.SignatureMethod_HMAC_SHA1()})
        self.assertTrue('HMAC-SHA1' in server.signature_methods)
        self.assertTrue(isinstance(server.signature_methods['HMAC-SHA1'],
                                   oauth.SignatureMethod_HMAC_SHA1))

        server = oauth.Server()
        self.assertEqual(server.signature_methods, {})

    def test_add_signature_method(self):
        """add_signature_method returns the growing name -> method mapping."""
        server = oauth.Server()

        res = server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
        self.assertEqual(len(res), 1)
        self.assertTrue('HMAC-SHA1' in res)
        self.assertTrue(isinstance(res['HMAC-SHA1'],
                                   oauth.SignatureMethod_HMAC_SHA1))

        res = server.add_signature_method(oauth.SignatureMethod_PLAINTEXT())
        self.assertEqual(len(res), 2)
        self.assertTrue('PLAINTEXT' in res)
        self.assertTrue(isinstance(res['PLAINTEXT'],
                                   oauth.SignatureMethod_PLAINTEXT))

    def test_verify_request(self):
        """A correctly signed request verifies and yields the non-OAuth params."""
        server = self._hmac_server()

        parameters = server.verify_request(self.request, self.consumer,
                                           self.token)

        self.assertTrue('bar' in parameters)
        self.assertTrue('foo' in parameters)
        self.assertTrue('multi' in parameters)
        self.assertEqual(parameters['bar'], 'blerg')
        self.assertEqual(parameters['foo'], 59)
        self.assertEqual(parameters['multi'], ['FOO', 'BAR'])

    def test_build_authenticate_header(self):
        """build_authenticate_header produces a WWW-Authenticate realm header."""
        server = oauth.Server()
        headers = server.build_authenticate_header('example.com')
        self.assertTrue('WWW-Authenticate' in headers)
        self.assertEqual('OAuth realm="example.com"',
                         headers['WWW-Authenticate'])

    def test_no_version(self):
        """A request without oauth_version must still verify (no exception)."""
        request = self._signed_request(oauth_version=None)
        server = self._hmac_server()

        # Passing means verify_request() did not raise.
        server.verify_request(request, self.consumer, self.token)

    def test_invalid_version(self):
        """An unsupported oauth_version makes verify_request raise oauth.Error."""
        request = self._signed_request(oauth_version='222.9922',
                                       multi=['foo', 'bar'])
        server = self._hmac_server()

        self.assertRaises(oauth.Error, server.verify_request, request,
                          self.consumer, self.token)

    def test_invalid_signature_method(self):
        """A request signed with an unregistered method raises oauth.Error."""
        request = self._signed_request(oauth_version='1.0',
                                       signature_method=SignatureMethod_Bad())
        server = self._hmac_server()

        self.assertRaises(oauth.Error, server.verify_request, request,
                          self.consumer, self.token)

    def test_missing_signature(self):
        """Removing oauth_signature makes verify_request raise MissingSignature."""
        request = self._signed_request(oauth_version='1.0')
        del request['oauth_signature']
        server = self._hmac_server()

        self.assertRaises(oauth.MissingSignature, server.verify_request,
                          request, self.consumer, self.token)
p@21 1119
p@21 1120
p@21 1121 # Request Token: http://oauth-sandbox.sevengoslings.net/request_token
p@21 1122 # Auth: http://oauth-sandbox.sevengoslings.net/authorize
p@21 1123 # Access Token: http://oauth-sandbox.sevengoslings.net/access_token
p@21 1124 # Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
p@21 1125 # Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
p@21 1126 # Key: bd37aed57e15df53
p@21 1127 # Secret: 0e9e6413a9ef49510a4f68ed02cd
class TestClient(unittest.TestCase):
    """Tests for ``oauth.Client``.

    The ``test_access_token_*`` and ``test_two_legged_*`` tests call
    ``client.request`` against ``host`` without patching
    ``httplib2.Http.request``, so they presumably need network access to
    the sandbox server.  The remaining tests patch the HTTP layer.
    """

    # Alternative endpoint layout, kept for reference:
    # oauth_uris = {
    #     'request_token': '/request_token.php',
    #     'access_token': '/access_token.php'
    # }

    oauth_uris = {
        'request_token': '/request_token',
        'authorize': '/authorize',
        'access_token': '/access_token',
        'two_legged': '/two_legged',
        'three_legged': '/three_legged'
    }

    consumer_key = 'bd37aed57e15df53'
    consumer_secret = '0e9e6413a9ef49510a4f68ed02cd'
    host = 'http://oauth-sandbox.sevengoslings.net'

    def setUp(self):
        """Create the consumer and the form body shared by the tests."""
        self.consumer = oauth.Consumer(key=self.consumer_key,
                                       secret=self.consumer_secret)

        self.body = {
            'foo': 'bar',
            'bar': 'foo',
            'multi': ['FOO', 'BAR'],
            'blah': 599999
        }

    def _uri(self, type):
        """Return the absolute URL for the endpoint named ``type``.

        Raises ``KeyError`` when ``type`` is not a key of ``oauth_uris``.
        """
        uri = self.oauth_uris.get(type)
        if uri is None:
            raise KeyError("%s is not a valid OAuth URI type." % type)

        return "%s%s" % (self.host, uri)

    def create_simple_multipart_data(self, data):
        """Encode ``data`` as a minimal multipart/form-data payload.

        Returns a ``(content_type, body)`` tuple.  The boundary contains a
        random number; keys and values are converted with ``str()``.
        """
        boundary = '---Boundary-%d' % random.randint(1, 1000)
        crlf = '\r\n'
        items = []
        # items() instead of iteritems(): same iteration, not tied to Python 2.
        for key, value in data.items():
            items += [
                '--' + boundary,
                'Content-Disposition: form-data; name="%s"' % str(key),
                '',
                str(value),
            ]
        items += ['', '--' + boundary + '--', '']
        content_type = 'multipart/form-data; boundary=%s' % boundary
        return content_type, crlf.join(items)

    def test_init(self):
        """Client() rejects objects that are not a Consumer / Token."""
        class Blah():
            pass

        # Invalid consumer.
        self.assertRaises(ValueError, oauth.Client, Blah())

        # Valid consumer, invalid token.
        consumer = oauth.Consumer('token', 'secret')
        self.assertRaises(ValueError, oauth.Client, consumer, Blah())

    def test_access_token_get(self):
        """GET the request_token endpoint and expect HTTP 200."""
        client = oauth.Client(self.consumer, None)
        resp, content = client.request(self._uri('request_token'), "GET")

        self.assertEqual(int(resp['status']), 200)

    def test_access_token_post(self):
        """POST to the request_token endpoint and expect a token and secret."""
        client = oauth.Client(self.consumer, None)
        resp, content = client.request(self._uri('request_token'), "POST")

        self.assertEqual(int(resp['status']), 200)

        res = dict(parse_qsl(content))
        self.assertTrue('oauth_token' in res)
        self.assertTrue('oauth_token_secret' in res)

    def _two_legged(self, method):
        """Send ``self.body`` form-encoded to the two_legged endpoint."""
        client = oauth.Client(self.consumer, None)

        return client.request(self._uri('two_legged'), method,
                              body=urllib.urlencode(self.body))

    def test_two_legged_post(self):
        """A test of a two-legged OAuth POST request."""
        resp, content = self._two_legged("POST")

        self.assertEqual(int(resp['status']), 200)

    def test_two_legged_get(self):
        """A test of a two-legged OAuth GET request."""
        resp, content = self._two_legged("GET")

        self.assertEqual(int(resp['status']), 200)

    @mock.patch('httplib2.Http.request')
    def test_multipart_post_does_not_alter_body(self, mockHttpRequest):
        """A multipart body must reach httplib2 unchanged."""
        random_result = random.randint(1, 100)

        data = {
            'rand-%d' % random.randint(1, 100): random.randint(1, 100),
        }
        content_type, body = self.create_simple_multipart_data(data)

        client = oauth.Client(self.consumer, None)
        uri = self._uri('two_legged')

        def mockrequest(cl, ur, **kw):
            # Stand-in for httplib2.Http.request: checks what Client passes on.
            self.assertTrue(cl is client)
            self.assertTrue(ur is uri)
            self.assertEqual(frozenset(kw.keys()), frozenset(['method', 'body', 'redirections', 'connection_type', 'headers']))
            self.assertEqual(kw['body'], body)
            self.assertEqual(kw['connection_type'], None)
            self.assertEqual(kw['method'], 'POST')
            self.assertEqual(kw['redirections'], httplib2.DEFAULT_MAX_REDIRECTS)
            self.assertTrue(isinstance(kw['headers'], dict))

            return random_result

        mockHttpRequest.side_effect = mockrequest

        result = client.request(uri, 'POST', headers={'Content-Type': content_type}, body=body)
        self.assertEqual(result, random_result)

    @mock.patch('httplib2.Http.request')
    def test_url_with_query_string(self, mockHttpRequest):
        """Query-string arguments of the URI survive signing of a GET request."""
        uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
        client = oauth.Client(self.consumer, None)
        random_result = random.randint(1, 100)

        def mockrequest(cl, ur, **kw):
            # Stand-in for httplib2.Http.request: checks what Client passes on.
            self.assertTrue(cl is client)
            self.assertEqual(frozenset(kw.keys()), frozenset(['method', 'body', 'redirections', 'connection_type', 'headers']))
            self.assertEqual(kw['body'], '')
            self.assertEqual(kw['connection_type'], None)
            self.assertEqual(kw['method'], 'GET')
            self.assertEqual(kw['redirections'], httplib2.DEFAULT_MAX_REDIRECTS)
            self.assertTrue(isinstance(kw['headers'], dict))

            # Build a reference request independently and compare its query
            # arguments with the ones in the URL that Client produced.
            req = oauth.Request.from_consumer_and_token(
                self.consumer, None,
                http_method='GET', http_url=uri, parameters={})
            req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), self.consumer, None)
            expected = parse_qsl(urlparse.urlparse(req.to_url()).query)
            actual = parse_qsl(urlparse.urlparse(ur).query)
            self.assertEqual(len(expected), len(actual))
            actual = dict(actual)
            for key, value in expected:
                # These three differ between two independently built requests.
                if key not in ('oauth_signature', 'oauth_nonce', 'oauth_timestamp'):
                    self.assertEqual(actual[key], value)

            return random_result

        mockHttpRequest.side_effect = mockrequest

        client.request(uri, 'GET')

    @mock.patch('httplib2.Http.request')
    @mock.patch('oauth2.Request.from_consumer_and_token')
    def test_multiple_values_for_a_key(self, mockReqConstructor, mockHttpRequest):
        """Repeated form keys are passed on as a list and kept in the body."""
        client = oauth.Client(self.consumer, None)

        request = oauth.Request("GET", "http://example.com/fetch.php", parameters={'multi': ['1', '2']})
        mockReqConstructor.return_value = request

        client.request('http://whatever', 'POST', body='multi=1&multi=2')

        self.assertEqual(mockReqConstructor.call_count, 1)
        self.assertEqual(mockReqConstructor.call_args[1]['parameters'], {'multi': ['1', '2']})

        self.assertTrue('multi=1' in mockHttpRequest.call_args[1]['body'])
        self.assertTrue('multi=2' in mockHttpRequest.call_args[1]['body'])
# Run the whole suite when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()