p@21: # -*- coding: utf-8 -*- p@21: p@21: """ p@21: The MIT License p@21: p@21: Copyright (c) 2009 Vic Fryzel p@21: p@21: Permission is hereby granted, free of charge, to any person obtaining a copy p@21: of this software and associated documentation files (the "Software"), to deal p@21: in the Software without restriction, including without limitation the rights p@21: to use, copy, modify, merge, publish, distribute, sublicense, and/or sell p@21: copies of the Software, and to permit persons to whom the Software is p@21: furnished to do so, subject to the following conditions: p@21: p@21: The above copyright notice and this permission notice shall be included in p@21: all copies or substantial portions of the Software. p@21: p@21: THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR p@21: IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, p@21: FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE p@21: AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER p@21: LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, p@21: OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN p@21: THE SOFTWARE. p@21: """ p@21: import sys p@21: import os p@21: import unittest p@21: import oauth2 as oauth p@21: import random p@21: import time p@21: import urllib p@21: import urlparse p@21: from types import ListType p@21: import mock p@21: import httplib2 p@21: p@21: # Fix for python2.5 compatibility p@21: try: p@21: from urlparse import parse_qs, parse_qsl p@21: except ImportError: p@21: from cgi import parse_qs, parse_qsl p@21: p@21: p@21: sys.path[0:0] = [os.path.join(os.path.dirname(__file__), ".."),] p@21: p@21: p@21: class TestError(unittest.TestCase): p@21: def test_message(self): p@21: try: p@21: raise oauth.Error p@21: except oauth.Error, e: p@21: self.assertEqual(e.message, 'OAuth error occurred.') p@21: p@21: msg = 'OMG THINGS BROKE!!!!' p@21: try: p@21: raise oauth.Error(msg) p@21: except oauth.Error, e: p@21: self.assertEqual(e.message, msg) p@21: p@21: def test_str(self): p@21: try: p@21: raise oauth.Error p@21: except oauth.Error, e: p@21: self.assertEquals(str(e), 'OAuth error occurred.') p@21: p@21: class TestGenerateFunctions(unittest.TestCase): p@21: def test_build_auth_header(self): p@21: header = oauth.build_authenticate_header() p@21: self.assertEqual(header['WWW-Authenticate'], 'OAuth realm=""') p@21: self.assertEqual(len(header), 1) p@21: realm = 'http://example.myrealm.com/' p@21: header = oauth.build_authenticate_header(realm) p@21: self.assertEqual(header['WWW-Authenticate'], 'OAuth realm="%s"' % p@21: realm) p@21: self.assertEqual(len(header), 1) p@21: p@21: def test_build_xoauth_string(self): p@21: consumer = oauth.Consumer('consumer_token', 'consumer_secret') p@21: token = oauth.Token('user_token', 'user_secret') p@21: url = "https://mail.google.com/mail/b/joe@example.com/imap/" p@21: xoauth_string = oauth.build_xoauth_string(url, consumer, token) p@21: p@21: method, oauth_url, oauth_string = xoauth_string.split(' ') p@21: p@21: self.assertEqual("GET", method) p@21: self.assertEqual(url, oauth_url) p@21: p@21: returned = {} p@21: parts = oauth_string.split(',') p@21: for part in parts: p@21: var, val = part.split('=') p@21: returned[var] = val.strip('"') p@21: p@21: self.assertEquals('HMAC-SHA1', returned['oauth_signature_method']) p@21: self.assertEquals('user_token', returned['oauth_token']) p@21: self.assertEquals('consumer_token', returned['oauth_consumer_key']) p@21: self.assertTrue('oauth_signature' in returned, 'oauth_signature') p@21: p@21: def test_escape(self): p@21: string = 'http://whatever.com/~someuser/?test=test&other=other' p@21: self.assert_('~' in oauth.escape(string)) p@21: string = '../../../../../../../etc/passwd' p@21: self.assert_('../' not in oauth.escape(string)) p@21: p@21: def test_gen_nonce(self): p@21: nonce = oauth.generate_nonce() p@21: self.assertEqual(len(nonce), 8) p@21: nonce = oauth.generate_nonce(20) p@21: self.assertEqual(len(nonce), 20) p@21: p@21: def test_gen_verifier(self): p@21: verifier = oauth.generate_verifier() p@21: self.assertEqual(len(verifier), 8) p@21: verifier = oauth.generate_verifier(16) p@21: self.assertEqual(len(verifier), 16) p@21: p@21: def test_gen_timestamp(self): p@21: exp = int(time.time()) p@21: now = oauth.generate_timestamp() p@21: self.assertEqual(exp, now) p@21: p@21: class TestConsumer(unittest.TestCase): p@21: def setUp(self): p@21: self.key = 'my-key' p@21: self.secret = 'my-secret' p@21: self.consumer = oauth.Consumer(key=self.key, secret=self.secret) p@21: p@21: def test_init(self): p@21: self.assertEqual(self.consumer.key, self.key) p@21: self.assertEqual(self.consumer.secret, self.secret) p@21: p@21: def test_basic(self): p@21: self.assertRaises(ValueError, lambda: oauth.Consumer(None, None)) p@21: self.assertRaises(ValueError, lambda: oauth.Consumer('asf', None)) p@21: self.assertRaises(ValueError, lambda: oauth.Consumer(None, 'dasf')) p@21: p@21: def test_str(self): p@21: res = dict(parse_qsl(str(self.consumer))) p@21: self.assertTrue('oauth_consumer_key' in res) p@21: self.assertTrue('oauth_consumer_secret' in res) p@21: self.assertEquals(res['oauth_consumer_key'], self.consumer.key) p@21: self.assertEquals(res['oauth_consumer_secret'], self.consumer.secret) p@21: p@21: class TestToken(unittest.TestCase): p@21: def setUp(self): p@21: self.key = 'my-key' p@21: self.secret = 'my-secret' p@21: self.token = oauth.Token(self.key, self.secret) p@21: p@21: def test_basic(self): p@21: self.assertRaises(ValueError, lambda: oauth.Token(None, None)) p@21: self.assertRaises(ValueError, lambda: oauth.Token('asf', None)) p@21: self.assertRaises(ValueError, lambda: oauth.Token(None, 'dasf')) p@21: p@21: def test_init(self): p@21: self.assertEqual(self.token.key, self.key) p@21: self.assertEqual(self.token.secret, self.secret) p@21: self.assertEqual(self.token.callback, None) p@21: self.assertEqual(self.token.callback_confirmed, None) p@21: self.assertEqual(self.token.verifier, None) p@21: p@21: def test_set_callback(self): p@21: self.assertEqual(self.token.callback, None) p@21: self.assertEqual(self.token.callback_confirmed, None) p@21: cb = 'http://www.example.com/my-callback' p@21: self.token.set_callback(cb) p@21: self.assertEqual(self.token.callback, cb) p@21: self.assertEqual(self.token.callback_confirmed, 'true') p@21: self.token.set_callback(None) p@21: self.assertEqual(self.token.callback, None) p@21: # TODO: The following test should probably not pass, but it does p@21: # To fix this, check for None and unset 'true' in set_callback p@21: # Additionally, should a confirmation truly be done of the callback? p@21: self.assertEqual(self.token.callback_confirmed, 'true') p@21: p@21: def test_set_verifier(self): p@21: self.assertEqual(self.token.verifier, None) p@21: v = oauth.generate_verifier() p@21: self.token.set_verifier(v) p@21: self.assertEqual(self.token.verifier, v) p@21: self.token.set_verifier() p@21: self.assertNotEqual(self.token.verifier, v) p@21: self.token.set_verifier('') p@21: self.assertEqual(self.token.verifier, '') p@21: p@21: def test_get_callback_url(self): p@21: self.assertEqual(self.token.get_callback_url(), None) p@21: p@21: self.token.set_verifier() p@21: self.assertEqual(self.token.get_callback_url(), None) p@21: p@21: cb = 'http://www.example.com/my-callback?save=1&return=true' p@21: v = oauth.generate_verifier() p@21: self.token.set_callback(cb) p@21: self.token.set_verifier(v) p@21: url = self.token.get_callback_url() p@21: verifier_str = '&oauth_verifier=%s' % v p@21: self.assertEqual(url, '%s%s' % (cb, verifier_str)) p@21: p@21: cb = 'http://www.example.com/my-callback-no-query' p@21: v = oauth.generate_verifier() p@21: self.token.set_callback(cb) p@21: self.token.set_verifier(v) p@21: url = self.token.get_callback_url() p@21: verifier_str = '?oauth_verifier=%s' % v p@21: self.assertEqual(url, '%s%s' % (cb, verifier_str)) p@21: p@21: def test_to_string(self): p@21: string = 'oauth_token_secret=%s&oauth_token=%s' % (self.secret, p@21: self.key) p@21: self.assertEqual(self.token.to_string(), string) p@21: p@21: self.token.set_callback('http://www.example.com/my-callback') p@21: string += '&oauth_callback_confirmed=true' p@21: self.assertEqual(self.token.to_string(), string) p@21: p@21: def _compare_tokens(self, new): p@21: self.assertEqual(self.token.key, new.key) p@21: self.assertEqual(self.token.secret, new.secret) p@21: # TODO: What about copying the callback to the new token? p@21: # self.assertEqual(self.token.callback, new.callback) p@21: self.assertEqual(self.token.callback_confirmed, p@21: new.callback_confirmed) p@21: # TODO: What about copying the verifier to the new token? p@21: # self.assertEqual(self.token.verifier, new.verifier) p@21: p@21: def test_to_string(self): p@21: tok = oauth.Token('tooken', 'seecret') p@21: self.assertEqual(str(tok), 'oauth_token_secret=seecret&oauth_token=tooken') p@21: p@21: def test_from_string(self): p@21: self.assertRaises(ValueError, lambda: oauth.Token.from_string('')) p@21: self.assertRaises(ValueError, lambda: oauth.Token.from_string('blahblahblah')) p@21: self.assertRaises(ValueError, lambda: oauth.Token.from_string('blah=blah')) p@21: p@21: self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token_secret=asfdasf')) p@21: self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token_secret=')) p@21: self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token=asfdasf')) p@21: self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token=')) p@21: self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token=&oauth_token_secret=')) p@21: self.assertRaises(ValueError, lambda: oauth.Token.from_string('oauth_token=tooken%26oauth_token_secret=seecret')) p@21: p@21: string = self.token.to_string() p@21: new = oauth.Token.from_string(string) p@21: self._compare_tokens(new) p@21: p@21: self.token.set_callback('http://www.example.com/my-callback') p@21: string = self.token.to_string() p@21: new = oauth.Token.from_string(string) p@21: self._compare_tokens(new) p@21: p@21: class ReallyEqualMixin: p@21: def failUnlessReallyEqual(self, a, b, msg=None): p@21: self.failUnlessEqual(a, b, msg=msg) p@21: self.failUnlessEqual(type(a), type(b), msg="a :: %r, b :: %r, %r" % (a, b, msg)) p@21: p@21: class TestFuncs(unittest.TestCase): p@21: def test_to_unicode(self): p@21: self.failUnlessRaises(TypeError, oauth.to_unicode, '\xae') p@21: self.failUnlessRaises(TypeError, oauth.to_unicode_optional_iterator, '\xae') p@21: self.failUnlessRaises(TypeError, oauth.to_unicode_optional_iterator, ['\xae']) p@21: p@21: self.failUnlessEqual(oauth.to_unicode(':-)'), u':-)') p@21: self.failUnlessEqual(oauth.to_unicode(u'\u00ae'), u'\u00ae') p@21: self.failUnlessEqual(oauth.to_unicode('\xc2\xae'), u'\u00ae') p@21: self.failUnlessEqual(oauth.to_unicode_optional_iterator([':-)']), [u':-)']) p@21: self.failUnlessEqual(oauth.to_unicode_optional_iterator([u'\u00ae']), [u'\u00ae']) p@21: p@21: class TestRequest(unittest.TestCase, ReallyEqualMixin): p@21: def test_setter(self): p@21: url = "http://example.com" p@21: method = "GET" p@21: req = oauth.Request(method) p@21: self.assertTrue(not hasattr(req, 'url') or req.url is None) p@21: self.assertTrue(not hasattr(req, 'normalized_url') or req.normalized_url is None) p@21: p@21: def test_deleter(self): p@21: url = "http://example.com" p@21: method = "GET" p@21: req = oauth.Request(method, url) p@21: p@21: try: p@21: del req.url p@21: url = req.url p@21: self.fail("AttributeError should have been raised on empty url.") p@21: except AttributeError: p@21: pass p@21: except Exception, e: p@21: self.fail(str(e)) p@21: p@21: def test_url(self): p@21: url1 = "http://example.com:80/foo.php" p@21: url2 = "https://example.com:443/foo.php" p@21: exp1 = "http://example.com/foo.php" p@21: exp2 = "https://example.com/foo.php" p@21: method = "GET" p@21: p@21: req = oauth.Request(method, url1) p@21: self.assertEquals(req.normalized_url, exp1) p@21: self.assertEquals(req.url, url1) p@21: p@21: req = oauth.Request(method, url2) p@21: self.assertEquals(req.normalized_url, exp2) p@21: self.assertEquals(req.url, url2) p@21: p@21: def test_bad_url(self): p@21: request = oauth.Request() p@21: try: p@21: request.url = "ftp://example.com" p@21: self.fail("Invalid URL scheme was accepted.") p@21: except ValueError: p@21: pass p@21: p@21: def test_unset_consumer_and_token(self): p@21: consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret') p@21: token = oauth.Token('my_key', 'my_secret') p@21: request = oauth.Request("GET", "http://example.com/fetch.php") p@21: request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, p@21: token) p@21: p@21: self.assertEquals(consumer.key, request['oauth_consumer_key']) p@21: self.assertEquals(token.key, request['oauth_token']) p@21: p@21: def test_no_url_set(self): p@21: consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret') p@21: token = oauth.Token('my_key', 'my_secret') p@21: request = oauth.Request() p@21: p@21: try: p@21: try: p@21: request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), p@21: consumer, token) p@21: except TypeError: p@21: self.fail("Signature method didn't check for a normalized URL.") p@21: except ValueError: p@21: pass p@21: p@21: def test_url_query(self): p@21: url = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10" p@21: normalized_url = urlparse.urlunparse(urlparse.urlparse(url)[:3] + (None, None, None)) p@21: method = "GET" p@21: p@21: req = oauth.Request(method, url) p@21: self.assertEquals(req.url, url) p@21: self.assertEquals(req.normalized_url, normalized_url) p@21: p@21: def test_get_parameter(self): p@21: url = "http://example.com" p@21: method = "GET" p@21: params = {'oauth_consumer' : 'asdf'} p@21: req = oauth.Request(method, url, parameters=params) p@21: p@21: self.assertEquals(req.get_parameter('oauth_consumer'), 'asdf') p@21: self.assertRaises(oauth.Error, req.get_parameter, 'blah') p@21: p@21: def test_get_nonoauth_parameters(self): p@21: p@21: oauth_params = { p@21: 'oauth_consumer': 'asdfasdfasdf' p@21: } p@21: p@21: other_params = { p@21: u'foo': u'baz', p@21: u'bar': u'foo', p@21: u'multi': [u'FOO',u'BAR'], p@21: u'uni_utf8': u'\xae', p@21: u'uni_unicode': u'\u00ae', p@21: u'uni_unicode_2': u'åÅøØ', p@21: } p@21: p@21: params = oauth_params p@21: params.update(other_params) p@21: p@21: req = oauth.Request("GET", "http://example.com", params) p@21: self.assertEquals(other_params, req.get_nonoauth_parameters()) p@21: p@21: def test_to_header(self): p@21: realm = "http://sp.example.com/" p@21: p@21: params = { p@21: 'oauth_version': "1.0", p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': "137131200", p@21: 'oauth_consumer_key': "0685bd9184jfhq22", p@21: 'oauth_signature_method': "HMAC-SHA1", p@21: 'oauth_token': "ad180jjd733klru7", p@21: 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D", p@21: } p@21: p@21: req = oauth.Request("GET", realm, params) p@21: header, value = req.to_header(realm).items()[0] p@21: p@21: parts = value.split('OAuth ') p@21: vars = parts[1].split(', ') p@21: self.assertTrue(len(vars), (len(params) + 1)) p@21: p@21: res = {} p@21: for v in vars: p@21: var, val = v.split('=') p@21: res[var] = urllib.unquote(val.strip('"')) p@21: p@21: self.assertEquals(realm, res['realm']) p@21: del res['realm'] p@21: p@21: self.assertTrue(len(res), len(params)) p@21: p@21: for key, val in res.items(): p@21: self.assertEquals(val, params.get(key)) p@21: p@21: def test_to_postdata_nonascii(self): p@21: realm = "http://sp.example.com/" p@21: p@21: params = { p@21: 'nonasciithing': u'q\xbfu\xe9 ,aasp u?..a.s', p@21: 'oauth_version': "1.0", p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': "137131200", p@21: 'oauth_consumer_key': "0685bd9184jfhq22", p@21: 'oauth_signature_method': "HMAC-SHA1", p@21: 'oauth_token': "ad180jjd733klru7", p@21: 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D", p@21: } p@21: p@21: req = oauth.Request("GET", realm, params) p@21: p@21: self.failUnlessReallyEqual(req.to_postdata(), 'nonasciithing=q%C2%BFu%C3%A9%20%2Caasp%20u%3F..a.s&oauth_nonce=4572616e48616d6d65724c61686176&oauth_timestamp=137131200&oauth_consumer_key=0685bd9184jfhq22&oauth_signature_method=HMAC-SHA1&oauth_version=1.0&oauth_token=ad180jjd733klru7&oauth_signature=wOJIO9A2W5mFwDgiDvZbTSMK%252FPY%253D') p@21: p@21: def test_to_postdata(self): p@21: realm = "http://sp.example.com/" p@21: p@21: params = { p@21: 'multi': ['FOO','BAR'], p@21: 'oauth_version': "1.0", p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': "137131200", p@21: 'oauth_consumer_key': "0685bd9184jfhq22", p@21: 'oauth_signature_method': "HMAC-SHA1", p@21: 'oauth_token': "ad180jjd733klru7", p@21: 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D", p@21: } p@21: p@21: req = oauth.Request("GET", realm, params) p@21: p@21: flat = [('multi','FOO'),('multi','BAR')] p@21: del params['multi'] p@21: flat.extend(params.items()) p@21: kf = lambda x: x[0] p@21: self.assertEquals(sorted(flat, key=kf), sorted(parse_qsl(req.to_postdata()), key=kf)) p@21: p@21: def test_to_url(self): p@21: url = "http://sp.example.com/" p@21: p@21: params = { p@21: 'oauth_version': "1.0", p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': "137131200", p@21: 'oauth_consumer_key': "0685bd9184jfhq22", p@21: 'oauth_signature_method': "HMAC-SHA1", p@21: 'oauth_token': "ad180jjd733klru7", p@21: 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D", p@21: } p@21: p@21: req = oauth.Request("GET", url, params) p@21: exp = urlparse.urlparse("%s?%s" % (url, urllib.urlencode(params))) p@21: res = urlparse.urlparse(req.to_url()) p@21: self.assertEquals(exp.scheme, res.scheme) p@21: self.assertEquals(exp.netloc, res.netloc) p@21: self.assertEquals(exp.path, res.path) p@21: p@21: a = parse_qs(exp.query) p@21: b = parse_qs(res.query) p@21: self.assertEquals(a, b) p@21: p@21: def test_to_url_with_query(self): p@21: url = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10" p@21: p@21: params = { p@21: 'oauth_version': "1.0", p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': "137131200", p@21: 'oauth_consumer_key': "0685bd9184jfhq22", p@21: 'oauth_signature_method': "HMAC-SHA1", p@21: 'oauth_token': "ad180jjd733klru7", p@21: 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D", p@21: } p@21: p@21: req = oauth.Request("GET", url, params) p@21: # Note: the url above already has query parameters, so append new ones with & p@21: exp = urlparse.urlparse("%s&%s" % (url, urllib.urlencode(params))) p@21: res = urlparse.urlparse(req.to_url()) p@21: self.assertEquals(exp.scheme, res.scheme) p@21: self.assertEquals(exp.netloc, res.netloc) p@21: self.assertEquals(exp.path, res.path) p@21: p@21: a = parse_qs(exp.query) p@21: b = parse_qs(res.query) p@21: self.assertTrue('alt' in b) p@21: self.assertTrue('max-contacts' in b) p@21: self.assertEquals(b['alt'], ['json']) p@21: self.assertEquals(b['max-contacts'], ['10']) p@21: self.assertEquals(a, b) p@21: p@21: def test_signature_base_string_nonascii_nonutf8(self): p@21: consumer = oauth.Consumer('consumer_token', 'consumer_secret') p@21: p@21: url = u'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc\u2766,+CA' p@21: req = oauth.Request("GET", url) p@21: self.failUnlessReallyEqual(req.normalized_url, u'http://api.simplegeo.com/1.0/places/address.json') p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None) p@21: self.failUnlessReallyEqual(req['oauth_signature'], 'WhufgeZKyYpKsI70GZaiDaYwl6g=') p@21: p@21: url = 'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc\xe2\x9d\xa6,+CA' p@21: req = oauth.Request("GET", url) p@21: self.failUnlessReallyEqual(req.normalized_url, u'http://api.simplegeo.com/1.0/places/address.json') p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None) p@21: self.failUnlessReallyEqual(req['oauth_signature'], 'WhufgeZKyYpKsI70GZaiDaYwl6g=') p@21: p@21: url = 'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA' p@21: req = oauth.Request("GET", url) p@21: self.failUnlessReallyEqual(req.normalized_url, u'http://api.simplegeo.com/1.0/places/address.json') p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None) p@21: self.failUnlessReallyEqual(req['oauth_signature'], 'WhufgeZKyYpKsI70GZaiDaYwl6g=') p@21: p@21: url = u'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA' p@21: req = oauth.Request("GET", url) p@21: self.failUnlessReallyEqual(req.normalized_url, u'http://api.simplegeo.com/1.0/places/address.json') p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None) p@21: self.failUnlessReallyEqual(req['oauth_signature'], 'WhufgeZKyYpKsI70GZaiDaYwl6g=') p@21: p@21: def test_signature_base_string_with_query(self): p@21: url = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10" p@21: params = { p@21: 'oauth_version': "1.0", p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': "137131200", p@21: 'oauth_consumer_key': "0685bd9184jfhq22", p@21: 'oauth_signature_method': "HMAC-SHA1", p@21: 'oauth_token': "ad180jjd733klru7", p@21: 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D", p@21: } p@21: req = oauth.Request("GET", url, params) p@21: self.assertEquals(req.normalized_url, 'https://www.google.com/m8/feeds/contacts/default/full/') p@21: self.assertEquals(req.url, 'https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10') p@21: normalized_params = parse_qsl(req.get_normalized_parameters()) p@21: self.assertTrue(len(normalized_params), len(params) + 2) p@21: normalized_params = dict(normalized_params) p@21: for key, value in params.iteritems(): p@21: if key == 'oauth_signature': p@21: continue p@21: self.assertEquals(value, normalized_params[key]) p@21: self.assertEquals(normalized_params['alt'], 'json') p@21: self.assertEquals(normalized_params['max-contacts'], '10') p@21: p@21: def test_get_normalized_parameters_empty(self): p@21: url = "http://sp.example.com/?empty=" p@21: p@21: req = oauth.Request("GET", url) p@21: p@21: res = req.get_normalized_parameters() p@21: p@21: expected='empty=' p@21: p@21: self.assertEquals(expected, res) p@21: p@21: def test_get_normalized_parameters_duplicate(self): p@21: url = "http://example.com/v2/search/videos?oauth_nonce=79815175&oauth_timestamp=1295397962&oauth_consumer_key=mykey&oauth_signature_method=HMAC-SHA1&q=car&oauth_version=1.0&offset=10&oauth_signature=spWLI%2FGQjid7sQVd5%2FarahRxzJg%3D" p@21: p@21: req = oauth.Request("GET", url) p@21: p@21: res = req.get_normalized_parameters() p@21: p@21: expected='oauth_consumer_key=mykey&oauth_nonce=79815175&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1295397962&oauth_version=1.0&offset=10&q=car' p@21: p@21: self.assertEquals(expected, res) p@21: p@21: def test_get_normalized_parameters_from_url(self): p@21: # example copied from p@21: # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js p@21: # which in turns says that it was copied from p@21: # http://oauth.net/core/1.0/#sig_base_example . p@21: url = "http://photos.example.net/photos?file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original" p@21: p@21: req = oauth.Request("GET", url) p@21: p@21: res = req.get_normalized_parameters() p@21: p@21: expected = 'file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original' p@21: p@21: self.assertEquals(expected, res) p@21: p@21: def test_signing_base(self): p@21: # example copied from p@21: # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js p@21: # which in turns says that it was copied from p@21: # http://oauth.net/core/1.0/#sig_base_example . p@21: url = "http://photos.example.net/photos?file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original" p@21: p@21: req = oauth.Request("GET", url) p@21: p@21: sm = oauth.SignatureMethod_HMAC_SHA1() p@21: p@21: consumer = oauth.Consumer('dpf43f3p2l4k3l03', 'foo') p@21: key, raw = sm.signing_base(req, consumer, None) p@21: p@21: expected = 'GET&http%3A%2F%2Fphotos.example.net%2Fphotos&file%3Dvacation.jpg%26oauth_consumer_key%3Ddpf43f3p2l4k3l03%26oauth_nonce%3Dkllo9940pd9333jh%26oauth_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D1191242096%26oauth_token%3Dnnch734d00sl2jdk%26oauth_version%3D1.0%26size%3Doriginal' p@21: self.assertEquals(expected, raw) p@21: p@21: def test_get_normalized_parameters(self): p@21: url = "http://sp.example.com/" p@21: p@21: params = { p@21: 'oauth_version': "1.0", p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': "137131200", p@21: 'oauth_consumer_key': "0685bd9184jfhq22", p@21: 'oauth_signature_method': "HMAC-SHA1", p@21: 'oauth_token': "ad180jjd733klru7", p@21: 'multi': ['FOO','BAR', u'\u00ae', '\xc2\xae'], p@21: 'multi_same': ['FOO','FOO'], p@21: 'uni_utf8_bytes': '\xc2\xae', p@21: 'uni_unicode_object': u'\u00ae' p@21: } p@21: p@21: req = oauth.Request("GET", url, params) p@21: p@21: res = req.get_normalized_parameters() p@21: p@21: expected='multi=BAR&multi=FOO&multi=%C2%AE&multi=%C2%AE&multi_same=FOO&multi_same=FOO&oauth_consumer_key=0685bd9184jfhq22&oauth_nonce=4572616e48616d6d65724c61686176&oauth_signature_method=HMAC-SHA1&oauth_timestamp=137131200&oauth_token=ad180jjd733klru7&oauth_version=1.0&uni_unicode_object=%C2%AE&uni_utf8_bytes=%C2%AE' p@21: p@21: self.assertEquals(expected, res) p@21: p@21: def test_get_normalized_parameters_ignores_auth_signature(self): p@21: url = "http://sp.example.com/" p@21: p@21: params = { p@21: 'oauth_version': "1.0", p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': "137131200", p@21: 'oauth_consumer_key': "0685bd9184jfhq22", p@21: 'oauth_signature_method': "HMAC-SHA1", p@21: 'oauth_signature': "some-random-signature-%d" % random.randint(1000, 2000), p@21: 'oauth_token': "ad180jjd733klru7", p@21: } p@21: p@21: req = oauth.Request("GET", url, params) p@21: p@21: res = req.get_normalized_parameters() p@21: p@21: self.assertNotEquals(urllib.urlencode(sorted(params.items())), res) p@21: p@21: foo = params.copy() p@21: del foo["oauth_signature"] p@21: self.assertEqual(urllib.urlencode(sorted(foo.items())), res) p@21: p@21: def test_set_signature_method(self): p@21: consumer = oauth.Consumer('key', 'secret') p@21: client = oauth.Client(consumer) p@21: p@21: class Blah: p@21: pass p@21: p@21: try: p@21: client.set_signature_method(Blah()) p@21: self.fail("Client.set_signature_method() accepted invalid method.") p@21: except ValueError: p@21: pass p@21: p@21: m = oauth.SignatureMethod_HMAC_SHA1() p@21: client.set_signature_method(m) p@21: self.assertEquals(m, client.method) p@21: p@21: def test_get_normalized_string_escapes_spaces_properly(self): p@21: url = "http://sp.example.com/" p@21: params = { p@21: "some_random_data": random.randint(100, 1000), p@21: "data": "This data with a random number (%d) has spaces!" % random.randint(1000, 2000), p@21: } p@21: p@21: req = oauth.Request("GET", url, params) p@21: res = req.get_normalized_parameters() p@21: expected = urllib.urlencode(sorted(params.items())).replace('+', '%20') p@21: self.assertEqual(expected, res) p@21: p@21: @mock.patch('oauth2.Request.make_timestamp') p@21: @mock.patch('oauth2.Request.make_nonce') p@21: def test_request_nonutf8_bytes(self, mock_make_nonce, mock_make_timestamp): p@21: mock_make_nonce.return_value = 5 p@21: mock_make_timestamp.return_value = 6 p@21: p@21: tok = oauth.Token(key="tok-test-key", secret="tok-test-secret") p@21: con = oauth.Consumer(key="con-test-key", secret="con-test-secret") p@21: params = { p@21: 'oauth_version': "1.0", p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': "137131200", p@21: 'oauth_token': tok.key, p@21: 'oauth_consumer_key': con.key p@21: } p@21: p@21: # If someone passes a sequence of bytes which is not ascii for p@21: # url, we'll raise an exception as early as possible. p@21: url = "http://sp.example.com/\x92" # It's actually cp1252-encoding... p@21: self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params) p@21: p@21: # And if they pass an unicode, then we'll use it. p@21: url = u'http://sp.example.com/\u2019' p@21: req = oauth.Request(method="GET", url=url, parameters=params) p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None) p@21: self.failUnlessReallyEqual(req['oauth_signature'], 'cMzvCkhvLL57+sTIxLITTHfkqZk=') p@21: p@21: # And if it is a utf-8-encoded-then-percent-encoded non-ascii p@21: # thing, we'll decode it and use it. p@21: url = "http://sp.example.com/%E2%80%99" p@21: req = oauth.Request(method="GET", url=url, parameters=params) p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None) p@21: self.failUnlessReallyEqual(req['oauth_signature'], 'yMLKOyNKC/DkyhUOb8DLSvceEWE=') p@21: p@21: # Same thing with the params. p@21: url = "http://sp.example.com/" p@21: p@21: # If someone passes a sequence of bytes which is not ascii in p@21: # params, we'll raise an exception as early as possible. p@21: params['non_oauth_thing'] = '\xae', # It's actually cp1252-encoding... p@21: self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params) p@21: p@21: # And if they pass a unicode, then we'll use it. p@21: params['non_oauth_thing'] = u'\u2019' p@21: req = oauth.Request(method="GET", url=url, parameters=params) p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None) p@21: self.failUnlessReallyEqual(req['oauth_signature'], '0GU50m0v60CVDB5JnoBXnvvvKx4=') p@21: p@21: # And if it is a utf-8-encoded non-ascii thing, we'll decode p@21: # it and use it. p@21: params['non_oauth_thing'] = '\xc2\xae' p@21: req = oauth.Request(method="GET", url=url, parameters=params) p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None) p@21: self.failUnlessReallyEqual(req['oauth_signature'], 'pqOCu4qvRTiGiXB8Z61Jsey0pMM=') p@21: p@21: p@21: # Also if there are non-utf8 bytes in the query args. p@21: url = "http://sp.example.com/?q=\x92" # cp1252 p@21: self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params) p@21: p@21: def test_request_hash_of_body(self): p@21: tok = oauth.Token(key="token", secret="tok-test-secret") p@21: con = oauth.Consumer(key="consumer", secret="con-test-secret") p@21: p@21: # Example 1a from Appendix A.1 of p@21: # http://oauth.googlecode.com/svn/spec/ext/body_hash/1.0/oauth-bodyhash.html p@21: # Except that we get a differetn result than they do. p@21: p@21: params = { p@21: 'oauth_version': "1.0", p@21: 'oauth_token': tok.key, p@21: 'oauth_nonce': 10288510250934, p@21: 'oauth_timestamp': 1236874155, p@21: 'oauth_consumer_key': con.key p@21: } p@21: p@21: url = u"http://www.example.com/resource" p@21: req = oauth.Request(method="PUT", url=url, parameters=params, body="Hello World!", is_form_encoded=False) p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None) p@21: self.failUnlessReallyEqual(req['oauth_body_hash'], 'Lve95gjOVATpfV8EL5X4nxwjKHE=') p@21: self.failUnlessReallyEqual(req['oauth_signature'], 't+MX8l/0S8hdbVQL99nD0X1fPnM=') p@21: # oauth-bodyhash.html A.1 has p@21: # '08bUFF%2Fjmp59mWB7cSgCYBUpJ0U%3D', but I don't see how that p@21: # is possible. p@21: p@21: # Example 1b p@21: params = { p@21: 'oauth_version': "1.0", p@21: 'oauth_token': tok.key, p@21: 'oauth_nonce': 10369470270925, p@21: 'oauth_timestamp': 1236874236, p@21: 'oauth_consumer_key': con.key p@21: } p@21: p@21: req = oauth.Request(method="PUT", url=url, parameters=params, body="Hello World!", is_form_encoded=False) p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None) p@21: self.failUnlessReallyEqual(req['oauth_body_hash'], 'Lve95gjOVATpfV8EL5X4nxwjKHE=') p@21: self.failUnlessReallyEqual(req['oauth_signature'], 'CTFmrqJIGT7NsWJ42OrujahTtTc=') p@21: p@21: # Appendix A.2 p@21: params = { p@21: 'oauth_version': "1.0", p@21: 'oauth_token': tok.key, p@21: 'oauth_nonce': 8628868109991, p@21: 'oauth_timestamp': 1238395022, p@21: 'oauth_consumer_key': con.key p@21: } p@21: p@21: req = oauth.Request(method="GET", url=url, parameters=params, is_form_encoded=False) p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None) p@21: self.failUnlessReallyEqual(req['oauth_body_hash'], '2jmj7l5rSw0yVb/vlWAYkK/YBwk=') p@21: self.failUnlessReallyEqual(req['oauth_signature'], 'Zhl++aWSP0O3/hYQ0CuBc7jv38I=') p@21: p@21: p@21: def test_sign_request(self): p@21: url = "http://sp.example.com/" p@21: p@21: params = { p@21: 'oauth_version': "1.0", p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': "137131200" p@21: } p@21: p@21: tok = oauth.Token(key="tok-test-key", secret="tok-test-secret") p@21: con = oauth.Consumer(key="con-test-key", secret="con-test-secret") p@21: p@21: params['oauth_token'] = tok.key p@21: params['oauth_consumer_key'] = con.key p@21: req = oauth.Request(method="GET", url=url, parameters=params) p@21: p@21: methods = { p@21: 'DX01TdHws7OninCLK9VztNTH1M4=': oauth.SignatureMethod_HMAC_SHA1(), p@21: 'con-test-secret&tok-test-secret': oauth.SignatureMethod_PLAINTEXT() p@21: } p@21: p@21: for exp, method in methods.items(): p@21: req.sign_request(method, con, tok) p@21: self.assertEquals(req['oauth_signature_method'], method.name) p@21: self.assertEquals(req['oauth_signature'], exp) p@21: p@21: # Also if there are non-ascii chars in the URL. p@21: url = "http://sp.example.com/\xe2\x80\x99" # utf-8 bytes p@21: req = oauth.Request(method="GET", url=url, parameters=params) p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, tok) p@21: self.assertEquals(req['oauth_signature'], 'loFvp5xC7YbOgd9exIO6TxB7H4s=') p@21: p@21: url = u'http://sp.example.com/\u2019' # Python unicode object p@21: req = oauth.Request(method="GET", url=url, parameters=params) p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, tok) p@21: self.assertEquals(req['oauth_signature'], 'loFvp5xC7YbOgd9exIO6TxB7H4s=') p@21: p@21: # Also if there are non-ascii chars in the query args. p@21: url = "http://sp.example.com/?q=\xe2\x80\x99" # utf-8 bytes p@21: req = oauth.Request(method="GET", url=url, parameters=params) p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, tok) p@21: self.assertEquals(req['oauth_signature'], 'IBw5mfvoCsDjgpcsVKbyvsDqQaU=') p@21: p@21: url = u'http://sp.example.com/?q=\u2019' # Python unicode object p@21: req = oauth.Request(method="GET", url=url, parameters=params) p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, tok) p@21: self.assertEquals(req['oauth_signature'], 'IBw5mfvoCsDjgpcsVKbyvsDqQaU=') p@21: p@21: def test_from_request(self): p@21: url = "http://sp.example.com/" p@21: p@21: params = { p@21: 'oauth_version': "1.0", p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': "137131200", p@21: 'oauth_consumer_key': "0685bd9184jfhq22", p@21: 'oauth_signature_method': "HMAC-SHA1", p@21: 'oauth_token': "ad180jjd733klru7", p@21: 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D", p@21: } p@21: p@21: req = oauth.Request("GET", url, params) p@21: headers = req.to_header() p@21: p@21: # Test from the headers p@21: req = oauth.Request.from_request("GET", url, headers) p@21: self.assertEquals(req.method, "GET") p@21: self.assertEquals(req.url, url) p@21: p@21: self.assertEquals(params, req.copy()) p@21: p@21: # Test with bad OAuth headers p@21: bad_headers = { p@21: 'Authorization' : 'OAuth this is a bad header' p@21: } p@21: p@21: self.assertRaises(oauth.Error, oauth.Request.from_request, "GET", p@21: url, bad_headers) p@21: p@21: # Test getting from query string p@21: qs = urllib.urlencode(params) p@21: req = oauth.Request.from_request("GET", url, query_string=qs) p@21: p@21: exp = parse_qs(qs, keep_blank_values=False) p@21: for k, v in exp.iteritems(): p@21: exp[k] = urllib.unquote(v[0]) p@21: p@21: self.assertEquals(exp, req.copy()) p@21: p@21: # Test that a boned from_request() call returns None p@21: req = oauth.Request.from_request("GET", url) p@21: self.assertEquals(None, req) p@21: p@21: def test_from_token_and_callback(self): p@21: url = "http://sp.example.com/" p@21: p@21: params = { p@21: 'oauth_version': "1.0", p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': "137131200", p@21: 'oauth_consumer_key': "0685bd9184jfhq22", p@21: 'oauth_signature_method': "HMAC-SHA1", p@21: 'oauth_token': "ad180jjd733klru7", p@21: 'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D", p@21: } p@21: p@21: tok = oauth.Token(key="tok-test-key", secret="tok-test-secret") p@21: req = oauth.Request.from_token_and_callback(tok) p@21: self.assertFalse('oauth_callback' in req) p@21: self.assertEquals(req['oauth_token'], tok.key) p@21: p@21: req = oauth.Request.from_token_and_callback(tok, callback=url) p@21: self.assertTrue('oauth_callback' in req) p@21: self.assertEquals(req['oauth_callback'], url) p@21: p@21: def test_from_consumer_and_token(self): p@21: url = "http://sp.example.com/" p@21: p@21: tok = oauth.Token(key="tok-test-key", secret="tok-test-secret") p@21: tok.set_verifier('this_is_a_test_verifier') p@21: con = oauth.Consumer(key="con-test-key", secret="con-test-secret") p@21: req = oauth.Request.from_consumer_and_token(con, token=tok, p@21: http_method="GET", http_url=url) p@21: p@21: self.assertEquals(req['oauth_token'], tok.key) p@21: self.assertEquals(req['oauth_consumer_key'], con.key) p@21: self.assertEquals(tok.verifier, req['oauth_verifier']) p@21: p@21: class SignatureMethod_Bad(oauth.SignatureMethod): p@21: name = "BAD" p@21: p@21: def signing_base(self, request, consumer, token): p@21: return "" p@21: p@21: def sign(self, request, consumer, token): p@21: return "invalid-signature" p@21: p@21: p@21: class TestServer(unittest.TestCase): p@21: def setUp(self): p@21: url = "http://sp.example.com/" p@21: p@21: params = { p@21: 'oauth_version': "1.0", p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': int(time.time()), p@21: 'bar': 'blerg', p@21: 'multi': ['FOO','BAR'], p@21: 'foo': 59 p@21: } p@21: p@21: self.consumer = oauth.Consumer(key="consumer-key", p@21: secret="consumer-secret") p@21: self.token = oauth.Token(key="token-key", secret="token-secret") p@21: p@21: params['oauth_token'] = self.token.key p@21: params['oauth_consumer_key'] = self.consumer.key p@21: self.request = oauth.Request(method="GET", url=url, parameters=params) p@21: p@21: signature_method = oauth.SignatureMethod_HMAC_SHA1() p@21: self.request.sign_request(signature_method, self.consumer, self.token) p@21: p@21: def test_init(self): p@21: server = oauth.Server(signature_methods={'HMAC-SHA1' : oauth.SignatureMethod_HMAC_SHA1()}) p@21: self.assertTrue('HMAC-SHA1' in server.signature_methods) p@21: self.assertTrue(isinstance(server.signature_methods['HMAC-SHA1'], p@21: oauth.SignatureMethod_HMAC_SHA1)) p@21: p@21: server = oauth.Server() p@21: self.assertEquals(server.signature_methods, {}) p@21: p@21: def test_add_signature_method(self): p@21: server = oauth.Server() p@21: res = server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) p@21: self.assertTrue(len(res) == 1) p@21: self.assertTrue('HMAC-SHA1' in res) p@21: self.assertTrue(isinstance(res['HMAC-SHA1'], p@21: oauth.SignatureMethod_HMAC_SHA1)) p@21: p@21: res = server.add_signature_method(oauth.SignatureMethod_PLAINTEXT()) p@21: self.assertTrue(len(res) == 2) p@21: self.assertTrue('PLAINTEXT' in res) p@21: self.assertTrue(isinstance(res['PLAINTEXT'], p@21: oauth.SignatureMethod_PLAINTEXT)) p@21: p@21: def test_verify_request(self): p@21: server = oauth.Server() p@21: server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) p@21: p@21: parameters = server.verify_request(self.request, self.consumer, p@21: self.token) p@21: p@21: self.assertTrue('bar' in parameters) p@21: self.assertTrue('foo' in parameters) p@21: self.assertTrue('multi' in parameters) p@21: self.assertEquals(parameters['bar'], 'blerg') p@21: self.assertEquals(parameters['foo'], 59) p@21: self.assertEquals(parameters['multi'], ['FOO','BAR']) p@21: p@21: def test_build_authenticate_header(self): p@21: server = oauth.Server() p@21: headers = server.build_authenticate_header('example.com') p@21: self.assertTrue('WWW-Authenticate' in headers) p@21: self.assertEquals('OAuth realm="example.com"', p@21: headers['WWW-Authenticate']) p@21: p@21: def test_no_version(self): p@21: url = "http://sp.example.com/" p@21: p@21: params = { p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': int(time.time()), p@21: 'bar': 'blerg', p@21: 'multi': ['FOO','BAR'], p@21: 'foo': 59 p@21: } p@21: p@21: self.consumer = oauth.Consumer(key="consumer-key", p@21: secret="consumer-secret") p@21: self.token = oauth.Token(key="token-key", secret="token-secret") p@21: p@21: params['oauth_token'] = self.token.key p@21: params['oauth_consumer_key'] = self.consumer.key p@21: self.request = oauth.Request(method="GET", url=url, parameters=params) p@21: p@21: signature_method = oauth.SignatureMethod_HMAC_SHA1() p@21: self.request.sign_request(signature_method, self.consumer, self.token) p@21: p@21: server = oauth.Server() p@21: server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) p@21: p@21: parameters = server.verify_request(self.request, self.consumer, p@21: self.token) p@21: p@21: def test_invalid_version(self): p@21: url = "http://sp.example.com/" p@21: p@21: params = { p@21: 'oauth_version': '222.9922', p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': int(time.time()), p@21: 'bar': 'blerg', p@21: 'multi': ['foo','bar'], p@21: 'foo': 59 p@21: } p@21: p@21: consumer = oauth.Consumer(key="consumer-key", p@21: secret="consumer-secret") p@21: token = oauth.Token(key="token-key", secret="token-secret") p@21: p@21: params['oauth_token'] = token.key p@21: params['oauth_consumer_key'] = consumer.key p@21: request = oauth.Request(method="GET", url=url, parameters=params) p@21: p@21: signature_method = oauth.SignatureMethod_HMAC_SHA1() p@21: request.sign_request(signature_method, consumer, token) p@21: p@21: server = oauth.Server() p@21: server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) p@21: p@21: self.assertRaises(oauth.Error, server.verify_request, request, consumer, token) p@21: p@21: def test_invalid_signature_method(self): p@21: url = "http://sp.example.com/" p@21: p@21: params = { p@21: 'oauth_version': '1.0', p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': int(time.time()), p@21: 'bar': 'blerg', p@21: 'multi': ['FOO','BAR'], p@21: 'foo': 59 p@21: } p@21: p@21: consumer = oauth.Consumer(key="consumer-key", p@21: secret="consumer-secret") p@21: token = oauth.Token(key="token-key", secret="token-secret") p@21: p@21: params['oauth_token'] = token.key p@21: params['oauth_consumer_key'] = consumer.key p@21: request = oauth.Request(method="GET", url=url, parameters=params) p@21: p@21: signature_method = SignatureMethod_Bad() p@21: request.sign_request(signature_method, consumer, token) p@21: p@21: server = oauth.Server() p@21: server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) p@21: p@21: self.assertRaises(oauth.Error, server.verify_request, request, p@21: consumer, token) p@21: p@21: def test_missing_signature(self): p@21: url = "http://sp.example.com/" p@21: p@21: params = { p@21: 'oauth_version': '1.0', p@21: 'oauth_nonce': "4572616e48616d6d65724c61686176", p@21: 'oauth_timestamp': int(time.time()), p@21: 'bar': 'blerg', p@21: 'multi': ['FOO','BAR'], p@21: 'foo': 59 p@21: } p@21: p@21: consumer = oauth.Consumer(key="consumer-key", p@21: secret="consumer-secret") p@21: token = oauth.Token(key="token-key", secret="token-secret") p@21: p@21: params['oauth_token'] = token.key p@21: params['oauth_consumer_key'] = consumer.key p@21: request = oauth.Request(method="GET", url=url, parameters=params) p@21: p@21: signature_method = oauth.SignatureMethod_HMAC_SHA1() p@21: request.sign_request(signature_method, consumer, token) p@21: del request['oauth_signature'] p@21: p@21: server = oauth.Server() p@21: server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) p@21: p@21: self.assertRaises(oauth.MissingSignature, server.verify_request, p@21: request, consumer, token) p@21: p@21: p@21: # Request Token: http://oauth-sandbox.sevengoslings.net/request_token p@21: # Auth: http://oauth-sandbox.sevengoslings.net/authorize p@21: # Access Token: http://oauth-sandbox.sevengoslings.net/access_token p@21: # Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged p@21: # Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged p@21: # Key: bd37aed57e15df53 p@21: # Secret: 0e9e6413a9ef49510a4f68ed02cd p@21: class TestClient(unittest.TestCase): p@21: # oauth_uris = { p@21: # 'request_token': '/request_token.php', p@21: # 'access_token': '/access_token.php' p@21: # } p@21: p@21: oauth_uris = { p@21: 'request_token': '/request_token', p@21: 'authorize': '/authorize', p@21: 'access_token': '/access_token', p@21: 'two_legged': '/two_legged', p@21: 'three_legged': '/three_legged' p@21: } p@21: p@21: consumer_key = 'bd37aed57e15df53' p@21: consumer_secret = '0e9e6413a9ef49510a4f68ed02cd' p@21: host = 'http://oauth-sandbox.sevengoslings.net' p@21: p@21: def setUp(self): p@21: self.consumer = oauth.Consumer(key=self.consumer_key, p@21: secret=self.consumer_secret) p@21: p@21: self.body = { p@21: 'foo': 'bar', p@21: 'bar': 'foo', p@21: 'multi': ['FOO','BAR'], p@21: 'blah': 599999 p@21: } p@21: p@21: def _uri(self, type): p@21: uri = self.oauth_uris.get(type) p@21: if uri is None: p@21: raise KeyError("%s is not a valid OAuth URI type." % type) p@21: p@21: return "%s%s" % (self.host, uri) p@21: p@21: def create_simple_multipart_data(self, data): p@21: boundary = '---Boundary-%d' % random.randint(1,1000) p@21: crlf = '\r\n' p@21: items = [] p@21: for key, value in data.iteritems(): p@21: items += [ p@21: '--'+boundary, p@21: 'Content-Disposition: form-data; name="%s"'%str(key), p@21: '', p@21: str(value), p@21: ] p@21: items += ['', '--'+boundary+'--', ''] p@21: content_type = 'multipart/form-data; boundary=%s' % boundary p@21: return content_type, crlf.join(items) p@21: p@21: def test_init(self): p@21: class Blah(): p@21: pass p@21: p@21: try: p@21: client = oauth.Client(Blah()) p@21: self.fail("Client.__init__() accepted invalid Consumer.") p@21: except ValueError: p@21: pass p@21: p@21: consumer = oauth.Consumer('token', 'secret') p@21: try: p@21: client = oauth.Client(consumer, Blah()) p@21: self.fail("Client.__init__() accepted invalid Token.") p@21: except ValueError: p@21: pass p@21: p@21: def test_access_token_get(self): p@21: """Test getting an access token via GET.""" p@21: client = oauth.Client(self.consumer, None) p@21: resp, content = client.request(self._uri('request_token'), "GET") p@21: p@21: self.assertEquals(int(resp['status']), 200) p@21: p@21: def test_access_token_post(self): p@21: """Test getting an access token via POST.""" p@21: client = oauth.Client(self.consumer, None) p@21: resp, content = client.request(self._uri('request_token'), "POST") p@21: p@21: self.assertEquals(int(resp['status']), 200) p@21: p@21: res = dict(parse_qsl(content)) p@21: self.assertTrue('oauth_token' in res) p@21: self.assertTrue('oauth_token_secret' in res) p@21: p@21: def _two_legged(self, method): p@21: client = oauth.Client(self.consumer, None) p@21: p@21: return client.request(self._uri('two_legged'), method, p@21: body=urllib.urlencode(self.body)) p@21: p@21: def test_two_legged_post(self): p@21: """A test of a two-legged OAuth POST request.""" p@21: resp, content = self._two_legged("POST") p@21: p@21: self.assertEquals(int(resp['status']), 200) p@21: p@21: def test_two_legged_get(self): p@21: """A test of a two-legged OAuth GET request.""" p@21: resp, content = self._two_legged("GET") p@21: self.assertEquals(int(resp['status']), 200) p@21: p@21: @mock.patch('httplib2.Http.request') p@21: def test_multipart_post_does_not_alter_body(self, mockHttpRequest): p@21: random_result = random.randint(1,100) p@21: p@21: data = { p@21: 'rand-%d'%random.randint(1,100):random.randint(1,100), p@21: } p@21: content_type, body = self.create_simple_multipart_data(data) p@21: p@21: client = oauth.Client(self.consumer, None) p@21: uri = self._uri('two_legged') p@21: p@21: def mockrequest(cl, ur, **kw): p@21: self.failUnless(cl is client) p@21: self.failUnless(ur is uri) p@21: self.failUnlessEqual(frozenset(kw.keys()), frozenset(['method', 'body', 'redirections', 'connection_type', 'headers'])) p@21: self.failUnlessEqual(kw['body'], body) p@21: self.failUnlessEqual(kw['connection_type'], None) p@21: self.failUnlessEqual(kw['method'], 'POST') p@21: self.failUnlessEqual(kw['redirections'], httplib2.DEFAULT_MAX_REDIRECTS) p@21: self.failUnless(isinstance(kw['headers'], dict)) p@21: p@21: return random_result p@21: p@21: mockHttpRequest.side_effect = mockrequest p@21: p@21: result = client.request(uri, 'POST', headers={'Content-Type':content_type}, body=body) p@21: self.assertEqual(result, random_result) p@21: p@21: @mock.patch('httplib2.Http.request') p@21: def test_url_with_query_string(self, mockHttpRequest): p@21: uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf' p@21: client = oauth.Client(self.consumer, None) p@21: random_result = random.randint(1,100) p@21: p@21: def mockrequest(cl, ur, **kw): p@21: self.failUnless(cl is client) p@21: self.failUnlessEqual(frozenset(kw.keys()), frozenset(['method', 'body', 'redirections', 'connection_type', 'headers'])) p@21: self.failUnlessEqual(kw['body'], '') p@21: self.failUnlessEqual(kw['connection_type'], None) p@21: self.failUnlessEqual(kw['method'], 'GET') p@21: self.failUnlessEqual(kw['redirections'], httplib2.DEFAULT_MAX_REDIRECTS) p@21: self.failUnless(isinstance(kw['headers'], dict)) p@21: p@21: req = oauth.Request.from_consumer_and_token(self.consumer, None, p@21: http_method='GET', http_url=uri, parameters={}) p@21: req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), self.consumer, None) p@21: expected = parse_qsl(urlparse.urlparse(req.to_url()).query) p@21: actual = parse_qsl(urlparse.urlparse(ur).query) p@21: self.failUnlessEqual(len(expected), len(actual)) p@21: actual = dict(actual) p@21: for key, value in expected: p@21: if key not in ('oauth_signature', 'oauth_nonce', 'oauth_timestamp'): p@21: self.failUnlessEqual(actual[key], value) p@21: p@21: return random_result p@21: p@21: mockHttpRequest.side_effect = mockrequest p@21: p@21: client.request(uri, 'GET') p@21: p@21: @mock.patch('httplib2.Http.request') p@21: @mock.patch('oauth2.Request.from_consumer_and_token') p@21: def test_multiple_values_for_a_key(self, mockReqConstructor, mockHttpRequest): p@21: client = oauth.Client(self.consumer, None) p@21: p@21: request = oauth.Request("GET", "http://example.com/fetch.php", parameters={'multi': ['1', '2']}) p@21: mockReqConstructor.return_value = request p@21: p@21: client.request('http://whatever', 'POST', body='multi=1&multi=2') p@21: p@21: self.failUnlessEqual(mockReqConstructor.call_count, 1) p@21: self.failUnlessEqual(mockReqConstructor.call_args[1]['parameters'], {'multi': ['1', '2']}) p@21: p@21: self.failUnless('multi=1' in mockHttpRequest.call_args[1]['body']) p@21: self.failUnless('multi=2' in mockHttpRequest.call_args[1]['body']) p@21: p@21: if __name__ == "__main__": p@21: unittest.main()