p@21
|
1 # -*- coding: utf-8 -*-
|
p@21
|
2
|
p@21
|
3 """
|
p@21
|
4 The MIT License
|
p@21
|
5
|
p@21
|
6 Copyright (c) 2009 Vic Fryzel
|
p@21
|
7
|
p@21
|
8 Permission is hereby granted, free of charge, to any person obtaining a copy
|
p@21
|
9 of this software and associated documentation files (the "Software"), to deal
|
p@21
|
10 in the Software without restriction, including without limitation the rights
|
p@21
|
11 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
p@21
|
12 copies of the Software, and to permit persons to whom the Software is
|
p@21
|
13 furnished to do so, subject to the following conditions:
|
p@21
|
14
|
p@21
|
15 The above copyright notice and this permission notice shall be included in
|
p@21
|
16 all copies or substantial portions of the Software.
|
p@21
|
17
|
p@21
|
18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
p@21
|
19 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
p@21
|
20 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
p@21
|
21 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
p@21
|
22 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
p@21
|
23 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
p@21
|
24 THE SOFTWARE.
|
p@21
|
25 """
|
p@21
|
26 import sys
|
p@21
|
27 import os
|
p@21
|
28 import unittest
|
p@21
|
29 import oauth2 as oauth
|
p@21
|
30 import random
|
p@21
|
31 import time
|
p@21
|
32 import urllib
|
p@21
|
33 import urlparse
|
p@21
|
34 from types import ListType
|
p@21
|
35 import mock
|
p@21
|
36 import httplib2
|
p@21
|
37
|
p@21
|
38 # Fix for python2.5 compatibility
|
p@21
|
39 try:
|
p@21
|
40 from urlparse import parse_qs, parse_qsl
|
p@21
|
41 except ImportError:
|
p@21
|
42 from cgi import parse_qs, parse_qsl
|
p@21
|
43
|
p@21
|
44
|
p@21
|
# Put the parent of this file's directory at the front of the module search
# path.
# NOTE(review): `import oauth2` has already executed above, so this cannot
# affect which oauth2 module that import resolved to -- confirm intent.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
p@21
|
46
|
p@21
|
47
|
p@21
|
class TestError(unittest.TestCase):
    """Exercise the message and string form of ``oauth.Error``."""

    def test_message(self):
        # Raising the bare class must produce the default message text.
        try:
            raise oauth.Error
        except oauth.Error:
            caught = sys.exc_info()[1]
            self.assertEqual(caught.message, 'OAuth error occurred.')

        # A message passed to the constructor is kept as-is.
        custom = 'OMG THINGS BROKE!!!!'
        try:
            raise oauth.Error(custom)
        except oauth.Error:
            caught = sys.exc_info()[1]
            self.assertEqual(caught.message, custom)

    def test_str(self):
        # str() of a default-constructed error is the default message.
        try:
            raise oauth.Error
        except oauth.Error:
            caught = sys.exc_info()[1]
            self.assertEqual(str(caught), 'OAuth error occurred.')
|
p@21
|
66
|
p@21
|
class TestGenerateFunctions(unittest.TestCase):
    """Tests for the module-level header, escape, nonce, verifier and
    timestamp helpers of ``oauth``."""

    def test_build_auth_header(self):
        # With no realm given, the header carries an empty realm.
        header = oauth.build_authenticate_header()
        self.assertEqual(header['WWW-Authenticate'], 'OAuth realm=""')
        self.assertEqual(len(header), 1)

        realm = 'http://example.myrealm.com/'
        header = oauth.build_authenticate_header(realm)
        self.assertEqual(header['WWW-Authenticate'],
                         'OAuth realm="%s"' % realm)
        self.assertEqual(len(header), 1)

    def test_build_xoauth_string(self):
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')
        token = oauth.Token('user_token', 'user_secret')
        url = "https://mail.google.com/mail/b/joe@example.com/imap/"
        xoauth_string = oauth.build_xoauth_string(url, consumer, token)

        # The string is split on single spaces into method, URL and a
        # comma-separated list of key="value" pairs.
        method, oauth_url, oauth_string = xoauth_string.split(' ')

        self.assertEqual("GET", method)
        self.assertEqual(url, oauth_url)

        returned = {}
        for part in oauth_string.split(','):
            var, val = part.split('=')
            returned[var] = val.strip('"')

        self.assertEqual('HMAC-SHA1', returned['oauth_signature_method'])
        self.assertEqual('user_token', returned['oauth_token'])
        self.assertEqual('consumer_token', returned['oauth_consumer_key'])
        self.assertTrue('oauth_signature' in returned, 'oauth_signature')

    def test_escape(self):
        string = 'http://whatever.com/~someuser/?test=test&other=other'
        self.assertTrue('~' in oauth.escape(string))
        string = '../../../../../../../etc/passwd'
        self.assertTrue('../' not in oauth.escape(string))

    def test_gen_nonce(self):
        nonce = oauth.generate_nonce()
        self.assertEqual(len(nonce), 8)
        nonce = oauth.generate_nonce(20)
        self.assertEqual(len(nonce), 20)

    def test_gen_verifier(self):
        verifier = oauth.generate_verifier()
        self.assertEqual(len(verifier), 8)
        verifier = oauth.generate_verifier(16)
        self.assertEqual(len(verifier), 16)

    def test_gen_timestamp(self):
        # Sample the clock on both sides of the call. Comparing against a
        # single earlier reading with assertEqual fails spuriously whenever
        # the second rolls over between the two reads.
        earliest = int(time.time())
        stamp = oauth.generate_timestamp()
        latest = int(time.time())
        self.assertTrue(
            earliest <= stamp <= latest,
            'timestamp %r not within [%r, %r]' % (stamp, earliest, latest))
|
p@21
|
122
|
p@21
|
class TestConsumer(unittest.TestCase):
    """Tests for construction and string form of ``oauth.Consumer``."""

    def setUp(self):
        self.key = 'my-key'
        self.secret = 'my-secret'
        self.consumer = oauth.Consumer(key=self.key, secret=self.secret)

    def test_init(self):
        # The constructor stores both values unchanged.
        self.assertEqual(self.consumer.key, self.key)
        self.assertEqual(self.consumer.secret, self.secret)

    def test_basic(self):
        # A missing key, secret, or both is rejected with ValueError.
        self.assertRaises(ValueError, oauth.Consumer, None, None)
        self.assertRaises(ValueError, oauth.Consumer, 'asf', None)
        self.assertRaises(ValueError, oauth.Consumer, None, 'dasf')

    def test_str(self):
        # str(consumer) parses as a query string holding key and secret.
        parsed = dict(parse_qsl(str(self.consumer)))
        self.assertTrue('oauth_consumer_key' in parsed)
        self.assertTrue('oauth_consumer_secret' in parsed)
        self.assertEqual(parsed['oauth_consumer_key'], self.consumer.key)
        self.assertEqual(parsed['oauth_consumer_secret'],
                         self.consumer.secret)
|
p@21
|
144
|
p@21
|
class TestToken(unittest.TestCase):
    """Tests for ``oauth.Token``: construction, callback/verifier handling
    and (de)serialization."""

    def setUp(self):
        self.key = 'my-key'
        self.secret = 'my-secret'
        self.token = oauth.Token(self.key, self.secret)

    def test_basic(self):
        # A missing key, secret, or both is rejected with ValueError.
        self.assertRaises(ValueError, oauth.Token, None, None)
        self.assertRaises(ValueError, oauth.Token, 'asf', None)
        self.assertRaises(ValueError, oauth.Token, None, 'dasf')

    def test_init(self):
        self.assertEqual(self.token.key, self.key)
        self.assertEqual(self.token.secret, self.secret)
        self.assertEqual(self.token.callback, None)
        self.assertEqual(self.token.callback_confirmed, None)
        self.assertEqual(self.token.verifier, None)

    def test_set_callback(self):
        self.assertEqual(self.token.callback, None)
        self.assertEqual(self.token.callback_confirmed, None)
        cb = 'http://www.example.com/my-callback'
        self.token.set_callback(cb)
        self.assertEqual(self.token.callback, cb)
        self.assertEqual(self.token.callback_confirmed, 'true')
        self.token.set_callback(None)
        self.assertEqual(self.token.callback, None)
        # TODO: The following test should probably not pass, but it does
        # To fix this, check for None and unset 'true' in set_callback
        # Additionally, should a confirmation truly be done of the callback?
        self.assertEqual(self.token.callback_confirmed, 'true')

    def test_set_verifier(self):
        self.assertEqual(self.token.verifier, None)
        v = oauth.generate_verifier()
        self.token.set_verifier(v)
        self.assertEqual(self.token.verifier, v)
        # Called without an argument, set_verifier replaces the stored value.
        self.token.set_verifier()
        self.assertNotEqual(self.token.verifier, v)
        self.token.set_verifier('')
        self.assertEqual(self.token.verifier, '')

    def test_get_callback_url(self):
        self.assertEqual(self.token.get_callback_url(), None)

        # A verifier alone does not produce a callback URL.
        self.token.set_verifier()
        self.assertEqual(self.token.get_callback_url(), None)

        # Callback that already has a query string: verifier joined with '&'.
        cb = 'http://www.example.com/my-callback?save=1&return=true'
        v = oauth.generate_verifier()
        self.token.set_callback(cb)
        self.token.set_verifier(v)
        url = self.token.get_callback_url()
        verifier_str = '&oauth_verifier=%s' % v
        self.assertEqual(url, '%s%s' % (cb, verifier_str))

        # Callback without a query string: verifier joined with '?'.
        cb = 'http://www.example.com/my-callback-no-query'
        v = oauth.generate_verifier()
        self.token.set_callback(cb)
        self.token.set_verifier(v)
        url = self.token.get_callback_url()
        verifier_str = '?oauth_verifier=%s' % v
        self.assertEqual(url, '%s%s' % (cb, verifier_str))

    def test_to_string(self):
        # This test used to be shadowed by a second method of the same name
        # and therefore never ran. The pairs are compared order-insensitively
        # here; the exact serialized form is pinned by test_str below.
        expected = [('oauth_token', self.key),
                    ('oauth_token_secret', self.secret)]
        self.assertEqual(sorted(parse_qsl(self.token.to_string())),
                         sorted(expected))

        self.token.set_callback('http://www.example.com/my-callback')
        expected.append(('oauth_callback_confirmed', 'true'))
        self.assertEqual(sorted(parse_qsl(self.token.to_string())),
                         sorted(expected))

    def _compare_tokens(self, new):
        """Assert that ``new`` matches ``self.token`` on key, secret and
        callback confirmation."""
        self.assertEqual(self.token.key, new.key)
        self.assertEqual(self.token.secret, new.secret)
        # TODO: What about copying the callback to the new token?
        # self.assertEqual(self.token.callback, new.callback)
        self.assertEqual(self.token.callback_confirmed,
                         new.callback_confirmed)
        # TODO: What about copying the verifier to the new token?
        # self.assertEqual(self.token.verifier, new.verifier)

    def test_str(self):
        # Previously named test_to_string, which silently replaced the
        # earlier method of that name in the class namespace.
        tok = oauth.Token('tooken', 'seecret')
        self.assertEqual(str(tok), 'oauth_token_secret=seecret&oauth_token=tooken')

    def test_from_string(self):
        # Every one of these inputs must be rejected with ValueError.
        rejected = (
            '',
            'blahblahblah',
            'blah=blah',
            'oauth_token_secret=asfdasf',
            'oauth_token_secret=',
            'oauth_token=asfdasf',
            'oauth_token=',
            'oauth_token=&oauth_token_secret=',
            'oauth_token=tooken%26oauth_token_secret=seecret',
        )
        for text in rejected:
            self.assertRaises(ValueError, oauth.Token.from_string, text)

        # Round trip without a callback.
        string = self.token.to_string()
        new = oauth.Token.from_string(string)
        self._compare_tokens(new)

        # Round trip with a callback set.
        self.token.set_callback('http://www.example.com/my-callback')
        string = self.token.to_string()
        new = oauth.Token.from_string(string)
        self._compare_tokens(new)
|
p@21
|
252
|
p@21
|
class ReallyEqualMixin:
    """Mixin for ``unittest.TestCase`` subclasses adding an equality
    assertion that also requires both operands to have the same type."""

    def failUnlessReallyEqual(self, a, b, msg=None):
        """Fail unless ``a == b`` and ``type(a) == type(b)``.

        ``msg`` is used as the failure message of the value comparison and
        is embedded in the message of the type comparison.
        """
        # assertEqual is the non-deprecated spelling of failUnlessEqual; the
        # old alias is not available on newer unittest versions.
        self.assertEqual(a, b, msg=msg)
        self.assertEqual(type(a), type(b), msg="a :: %r, b :: %r, %r" % (a, b, msg))
|
p@21
|
257
|
p@21
|
class TestFuncs(unittest.TestCase):
    """Tests for ``oauth.to_unicode`` and
    ``oauth.to_unicode_optional_iterator``."""

    def test_to_unicode(self):
        # A lone '\xae' byte is rejected with TypeError, whether passed
        # directly or inside a list.
        self.assertRaises(TypeError, oauth.to_unicode, '\xae')
        self.assertRaises(TypeError, oauth.to_unicode_optional_iterator, '\xae')
        self.assertRaises(TypeError, oauth.to_unicode_optional_iterator, ['\xae'])

        # Accepted inputs and the unicode value each converts to.
        self.assertEqual(oauth.to_unicode(':-)'), u':-)')
        self.assertEqual(oauth.to_unicode(u'\u00ae'), u'\u00ae')
        self.assertEqual(oauth.to_unicode('\xc2\xae'), u'\u00ae')
        self.assertEqual(oauth.to_unicode_optional_iterator([':-)']), [u':-)'])
        self.assertEqual(oauth.to_unicode_optional_iterator([u'\u00ae']), [u'\u00ae'])
|
p@21
|
269
|
p@21
|
270 class TestRequest(unittest.TestCase, ReallyEqualMixin):
|
p@21
|
def test_setter(self):
    # A request built with only a method has no usable url/normalized_url:
    # each attribute is either absent or None.
    req = oauth.Request("GET")
    url_unset = not hasattr(req, 'url') or req.url is None
    self.assertTrue(url_unset)
    normalized_unset = (not hasattr(req, 'normalized_url')
                        or req.normalized_url is None)
    self.assertTrue(normalized_unset)
|
p@21
|
277
|
p@21
|
def test_deleter(self):
    # After `del req.url`, reading the attribute must raise AttributeError.
    req = oauth.Request("GET", "http://example.com")

    try:
        del req.url
        _ = req.url
    except AttributeError:
        return
    except Exception:
        # Any other exception type is reported as a failure with its text.
        self.fail(str(sys.exc_info()[1]))
    self.fail("AttributeError should have been raised on empty url.")
|
p@21
|
291
|
p@21
|
def test_url(self):
    # The explicit default port (80 for http, 443 for https) is dropped from
    # normalized_url, while url keeps the value that was passed in.
    cases = (
        ("http://example.com:80/foo.php", "http://example.com/foo.php"),
        ("https://example.com:443/foo.php", "https://example.com/foo.php"),
    )
    for given, normalized in cases:
        req = oauth.Request("GET", given)
        self.assertEqual(req.normalized_url, normalized)
        self.assertEqual(req.url, given)
|
p@21
|
306
|
p@21
|
def test_bad_url(self):
    # Assigning an ftp:// URL must be rejected with ValueError.
    request = oauth.Request()
    try:
        request.url = "ftp://example.com"
    except ValueError:
        pass
    else:
        self.fail("Invalid URL scheme was accepted.")
|
p@21
|
314
|
p@21
|
def test_unset_consumer_and_token(self):
    # Signing fills in oauth_consumer_key and oauth_token on a request that
    # was created without them.
    consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    token = oauth.Token('my_key', 'my_secret')
    request = oauth.Request("GET", "http://example.com/fetch.php")
    signer = oauth.SignatureMethod_HMAC_SHA1()
    request.sign_request(signer, consumer, token)

    self.assertEqual(consumer.key, request['oauth_consumer_key'])
    self.assertEqual(token.key, request['oauth_token'])
|
p@21
|
324
|
p@21
|
def test_no_url_set(self):
    # Signing a request that has no URL may raise ValueError, but must not
    # raise TypeError.
    # NOTE(review): the test also passes when no exception is raised at
    # all -- confirm whether ValueError should be required.
    consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    token = oauth.Token('my_key', 'my_secret')
    request = oauth.Request()
    signer = oauth.SignatureMethod_HMAC_SHA1()

    try:
        request.sign_request(signer, consumer, token)
    except TypeError:
        self.fail("Signature method didn't check for a normalized URL.")
    except ValueError:
        pass
|
p@21
|
338
|
p@21
|
def test_url_query(self):
    # url keeps the query string; normalized_url is scheme + host + path only.
    url = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10"
    scheme_host_path = urlparse.urlparse(url)[:3]
    normalized_url = urlparse.urlunparse(scheme_host_path + (None, None, None))

    req = oauth.Request("GET", url)
    self.assertEqual(req.url, url)
    self.assertEqual(req.normalized_url, normalized_url)
|
p@21
|
347
|
p@21
|
def test_get_parameter(self):
    # A known parameter is returned; an unknown one raises oauth.Error.
    req = oauth.Request("GET", "http://example.com",
                        parameters={'oauth_consumer' : 'asdf'})

    self.assertEqual(req.get_parameter('oauth_consumer'), 'asdf')
    self.assertRaises(oauth.Error, req.get_parameter, 'blah')
|
p@21
|
356
|
p@21
|
def test_get_nonoauth_parameters(self):
    # get_nonoauth_parameters returns exactly the parameters that were not
    # supplied under an oauth_ name.
    other_params = {
        u'foo': u'baz',
        u'bar': u'foo',
        u'multi': [u'FOO',u'BAR'],
        u'uni_utf8': u'\xae',
        u'uni_unicode': u'\u00ae',
        u'uni_unicode_2': u'åÅøØ',
    }

    params = {
        'oauth_consumer': 'asdfasdfasdf'
    }
    params.update(other_params)

    req = oauth.Request("GET", "http://example.com", params)
    self.assertEqual(other_params, req.get_nonoauth_parameters())
|
p@21
|
377
|
p@21
|
def test_to_header(self):
    # to_header(realm) must emit the realm plus every request parameter as
    # comma-separated key="value" pairs after the "OAuth " prefix.
    realm = "http://sp.example.com/"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    req = oauth.Request("GET", realm, params)
    header, value = list(req.to_header(realm).items())[0]

    parts = value.split('OAuth ')
    pairs = parts[1].split(', ')
    # assertEqual, not assertTrue: assertTrue(x, y) treats y as the failure
    # message, so the count was never actually checked.
    self.assertEqual(len(pairs), len(params) + 1)

    res = {}
    for pair in pairs:
        var, val = pair.split('=')
        res[var] = urllib.unquote(val.strip('"'))

    self.assertEqual(realm, res['realm'])
    del res['realm']

    # With the realm removed, exactly the supplied parameters remain.
    self.assertEqual(len(res), len(params))

    for key, val in res.items():
        self.assertEqual(val, params.get(key))
|
p@21
|
410
|
p@21
|
def test_to_postdata_nonascii(self):
    # A non-ASCII unicode value must come out percent-encoded, and the
    # result must have the same type as the expected literal.
    realm = "http://sp.example.com/"

    params = {
        'nonasciithing': u'q\xbfu\xe9 ,aasp u?..a.s',
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    req = oauth.Request("GET", realm, params)

    # Adjacent literals are joined at compile time into one string.
    expected = (
        'nonasciithing=q%C2%BFu%C3%A9%20%2Caasp%20u%3F..a.s'
        '&oauth_nonce=4572616e48616d6d65724c61686176'
        '&oauth_timestamp=137131200'
        '&oauth_consumer_key=0685bd9184jfhq22'
        '&oauth_signature_method=HMAC-SHA1'
        '&oauth_version=1.0'
        '&oauth_token=ad180jjd733klru7'
        '&oauth_signature=wOJIO9A2W5mFwDgiDvZbTSMK%252FPY%253D'
    )
    self.failUnlessReallyEqual(req.to_postdata(), expected)
|
p@21
|
428
|
p@21
|
def test_to_postdata(self):
    # A list-valued parameter must be emitted once per element.
    realm = "http://sp.example.com/"

    params = {
        'multi': ['FOO','BAR'],
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    req = oauth.Request("GET", realm, params)

    # Expected pairs: 'multi' expanded by hand, everything else taken from
    # the dict once 'multi' has been removed from it.
    params.pop('multi')
    flat = [('multi','FOO'),('multi','BAR')]
    flat.extend(params.items())

    def by_name(pair):
        return pair[0]

    actual = parse_qsl(req.to_postdata())
    self.assertEqual(sorted(flat, key=by_name), sorted(actual, key=by_name))
|
p@21
|
450
|
p@21
|
def test_to_url(self):
    # to_url() must equal the base URL with the url-encoded parameters
    # appended as its query string (compared component by component).
    url = "http://sp.example.com/"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    req = oauth.Request("GET", url, params)
    exp = urlparse.urlparse("%s?%s" % (url, urllib.urlencode(params)))
    res = urlparse.urlparse(req.to_url())
    self.assertEqual(exp.scheme, res.scheme)
    self.assertEqual(exp.netloc, res.netloc)
    self.assertEqual(exp.path, res.path)

    # Query strings are compared as parsed dicts so pair order is ignored.
    self.assertEqual(parse_qs(exp.query), parse_qs(res.query))
|
p@21
|
474
|
p@21
|
def test_to_url_with_query(self):
    # Query parameters already present on the URL must survive to_url()
    # alongside the request parameters.
    url = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    req = oauth.Request("GET", url, params)
    # Note: the url above already has query parameters, so append new ones with &
    exp = urlparse.urlparse("%s&%s" % (url, urllib.urlencode(params)))
    res = urlparse.urlparse(req.to_url())
    self.assertEqual(exp.scheme, res.scheme)
    self.assertEqual(exp.netloc, res.netloc)
    self.assertEqual(exp.path, res.path)

    expected_query = parse_qs(exp.query)
    actual_query = parse_qs(res.query)
    self.assertTrue('alt' in actual_query)
    self.assertTrue('max-contacts' in actual_query)
    self.assertEqual(actual_query['alt'], ['json'])
    self.assertEqual(actual_query['max-contacts'], ['10'])
    self.assertEqual(expected_query, actual_query)
|
p@21
|
503
|
p@21
|
def test_signature_base_string_nonascii_nonutf8(self):
    # The same address written four ways -- unicode with a \u2766 escape,
    # a byte string with \xe2\x9d\xa6, and the percent-encoded form as both
    # a byte string and a unicode string -- must all normalize to the same
    # URL and produce the same signature.
    consumer = oauth.Consumer('consumer_token', 'consumer_secret')

    urls = (
        u'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc\u2766,+CA',
        'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc\xe2\x9d\xa6,+CA',
        'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA',
        u'http://api.simplegeo.com:80/1.0/places/address.json?q=monkeys&category=animal&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA',
    )

    for url in urls:
        req = oauth.Request("GET", url)
        self.failUnlessReallyEqual(req.normalized_url, u'http://api.simplegeo.com/1.0/places/address.json')
        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        self.failUnlessReallyEqual(req['oauth_signature'], 'WhufgeZKyYpKsI70GZaiDaYwl6g=')
|
p@21
|
530
|
p@21
|
def test_signature_base_string_with_query(self):
    # The normalized parameters must combine the request parameters (minus
    # oauth_signature) with the parameters from the URL's query string.
    url = "https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10"
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    req = oauth.Request("GET", url, params)
    self.assertEqual(req.normalized_url, 'https://www.google.com/m8/feeds/contacts/default/full/')
    self.assertEqual(req.url, 'https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10')

    normalized_params = parse_qsl(req.get_normalized_parameters())
    # assertEqual, not assertTrue: assertTrue(x, y) treats y as the failure
    # message, so the count was never checked. The expected count is every
    # supplied parameter except oauth_signature (skipped in the loop below
    # for the same reason) plus the two query parameters from the URL.
    self.assertEqual(len(normalized_params), len(params) - 1 + 2)

    normalized_params = dict(normalized_params)
    for key, value in params.items():
        if key == 'oauth_signature':
            continue
        self.assertEqual(value, normalized_params[key])
    self.assertEqual(normalized_params['alt'], 'json')
    self.assertEqual(normalized_params['max-contacts'], '10')
|
p@21
|
554
|
p@21
|
def test_get_normalized_parameters_empty(self):
    # A query parameter with an empty value is kept as "name=".
    req = oauth.Request("GET", "http://sp.example.com/?empty=")
    self.assertEqual('empty=', req.get_normalized_parameters())
|
p@21
|
565
|
p@21
|
def test_get_normalized_parameters_duplicate(self):
    # Parameters supplied only through the URL appear once each, sorted by
    # name, with oauth_signature left out.
    url = "http://example.com/v2/search/videos?oauth_nonce=79815175&oauth_timestamp=1295397962&oauth_consumer_key=mykey&oauth_signature_method=HMAC-SHA1&q=car&oauth_version=1.0&offset=10&oauth_signature=spWLI%2FGQjid7sQVd5%2FarahRxzJg%3D"
    expected = 'oauth_consumer_key=mykey&oauth_nonce=79815175&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1295397962&oauth_version=1.0&offset=10&q=car'

    req = oauth.Request("GET", url)
    actual = req.get_normalized_parameters()

    self.assertEqual(expected, actual)
|
p@21
|
576
|
p@21
|
def test_get_normalized_parameters_from_url(self):
    # example copied from
    # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
    # which in turn says that it was copied from
    # http://oauth.net/core/1.0/#sig_base_example .
    url = "http://photos.example.net/photos?file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original"
    expected = 'file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original'

    req = oauth.Request("GET", url)
    actual = req.get_normalized_parameters()

    self.assertEqual(expected, actual)
|
p@21
|
591
|
p@21
|
def test_signing_base(self):
    # example copied from
    # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
    # which in turn says that it was copied from
    # http://oauth.net/core/1.0/#sig_base_example .
    url = "http://photos.example.net/photos?file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk&oauth_version=1.0&size=original"
    expected = 'GET&http%3A%2F%2Fphotos.example.net%2Fphotos&file%3Dvacation.jpg%26oauth_consumer_key%3Ddpf43f3p2l4k3l03%26oauth_nonce%3Dkllo9940pd9333jh%26oauth_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D1191242096%26oauth_token%3Dnnch734d00sl2jdk%26oauth_version%3D1.0%26size%3Doriginal'

    req = oauth.Request("GET", url)
    method = oauth.SignatureMethod_HMAC_SHA1()
    consumer = oauth.Consumer('dpf43f3p2l4k3l03', 'foo')

    # signing_base returns a (key, raw) pair; only the raw string is checked.
    signing_key, base_string = method.signing_base(req, consumer, None)

    self.assertEqual(expected, base_string)
|
p@21
|
608
|
p@21
|
def test_get_normalized_parameters(self):
    # List values are expanded to one pair per element, and the unicode
    # u'\u00ae' and the byte string '\xc2\xae' both come out as %C2%AE.
    url = "http://sp.example.com/"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'multi': ['FOO','BAR', u'\u00ae', '\xc2\xae'],
        'multi_same': ['FOO','FOO'],
        'uni_utf8_bytes': '\xc2\xae',
        'uni_unicode_object': u'\u00ae'
    }
    expected = 'multi=BAR&multi=FOO&multi=%C2%AE&multi=%C2%AE&multi_same=FOO&multi_same=FOO&oauth_consumer_key=0685bd9184jfhq22&oauth_nonce=4572616e48616d6d65724c61686176&oauth_signature_method=HMAC-SHA1&oauth_timestamp=137131200&oauth_token=ad180jjd733klru7&oauth_version=1.0&uni_unicode_object=%C2%AE&uni_utf8_bytes=%C2%AE'

    req = oauth.Request("GET", url, params)
    actual = req.get_normalized_parameters()

    self.assertEqual(expected, actual)
|
p@21
|
632
|
p@21
|
def test_get_normalized_parameters_ignores_auth_signature(self):
    # The normalized string must equal the sorted, url-encoded parameters
    # with oauth_signature left out -- and differ from the encoding that
    # includes it.
    url = "http://sp.example.com/"
    signature = "some-random-signature-%d" % random.randint(1000, 2000)

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_signature': signature,
        'oauth_token': "ad180jjd733klru7",
    }

    req = oauth.Request("GET", url, params)
    res = req.get_normalized_parameters()

    with_signature = urllib.urlencode(sorted(params.items()))
    self.assertNotEqual(with_signature, res)

    unsigned = dict((name, value) for name, value in params.items()
                    if name != "oauth_signature")
    self.assertEqual(urllib.urlencode(sorted(unsigned.items())), res)
|
p@21
|
655
|
p@21
|
def test_set_signature_method(self):
    # An arbitrary object is rejected with ValueError; a real signature
    # method is accepted and stored on client.method.
    client = oauth.Client(oauth.Consumer('key', 'secret'))

    class Blah:
        pass

    try:
        client.set_signature_method(Blah())
    except ValueError:
        pass
    else:
        self.fail("Client.set_signature_method() accepted invalid method.")

    method = oauth.SignatureMethod_HMAC_SHA1()
    client.set_signature_method(method)
    self.assertEqual(method, client.method)
|
p@21
|
672
|
p@21
|
def test_get_normalized_string_escapes_spaces_properly(self):
    """Spaces in parameter values are normalized as %20, not as '+'."""
    number = random.randint(100, 1000)
    sentence = "This data with a random number (%d) has spaces!" % random.randint(1000, 2000)
    params = {
        "some_random_data": number,
        "data": sentence,
    }

    request = oauth.Request("GET", "http://sp.example.com/", params)
    normalized = request.get_normalized_parameters()

    # urlencode() writes a space as '+'; the normalized form is expected to
    # carry '%20' in the same positions.
    plus_encoded = urllib.urlencode(sorted(params.items()))
    self.assertEqual(plus_encoded.replace('+', '%20'), normalized)
|
p@21
|
684
|
p@21
|
@mock.patch('oauth2.Request.make_timestamp')
@mock.patch('oauth2.Request.make_nonce')
def test_request_nonutf8_bytes(self, mock_make_nonce, mock_make_timestamp):
    """Check how Request treats non-ASCII data in the URL and in parameters.

    Byte strings that are not valid UTF-8 must raise TypeError as soon as
    the Request is constructed.  Unicode objects, percent-encoded UTF-8 and
    raw UTF-8 bytes must be accepted and yield the signatures pinned below.
    """
    # Fixed nonce/timestamp for anything the library generates itself.
    mock_make_nonce.return_value = 5
    mock_make_timestamp.return_value = 6

    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
    con = oauth.Consumer(key="con-test-key", secret="con-test-secret")
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_token': tok.key,
        'oauth_consumer_key': con.key
    }

    # If someone passes a sequence of bytes which is not ascii for
    # url, we'll raise an exception as early as possible.
    url = "http://sp.example.com/\x92" # It's actually cp1252-encoding...
    self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params)

    # And if they pass a unicode, then we'll use it.
    url = u'http://sp.example.com/\u2019'
    req = oauth.Request(method="GET", url=url, parameters=params)
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
    self.failUnlessReallyEqual(req['oauth_signature'], 'cMzvCkhvLL57+sTIxLITTHfkqZk=')

    # And if it is a utf-8-encoded-then-percent-encoded non-ascii
    # thing, we'll decode it and use it.
    url = "http://sp.example.com/%E2%80%99"
    req = oauth.Request(method="GET", url=url, parameters=params)
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
    self.failUnlessReallyEqual(req['oauth_signature'], 'yMLKOyNKC/DkyhUOb8DLSvceEWE=')

    # Same thing with the params.
    url = "http://sp.example.com/"

    # If someone passes a sequence of bytes which is not ascii in
    # params, we'll raise an exception as early as possible.
    # The value must be a plain byte string: a trailing comma after the
    # literal would turn it into a one-element tuple and exercise the
    # multi-value code path instead of the one described here.
    params['non_oauth_thing'] = '\xae' # It's actually cp1252-encoding...
    self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params)

    # And if they pass a unicode, then we'll use it.
    params['non_oauth_thing'] = u'\u2019'
    req = oauth.Request(method="GET", url=url, parameters=params)
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
    self.failUnlessReallyEqual(req['oauth_signature'], '0GU50m0v60CVDB5JnoBXnvvvKx4=')

    # And if it is a utf-8-encoded non-ascii thing, we'll decode
    # it and use it.
    params['non_oauth_thing'] = '\xc2\xae'
    req = oauth.Request(method="GET", url=url, parameters=params)
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
    self.failUnlessReallyEqual(req['oauth_signature'], 'pqOCu4qvRTiGiXB8Z61Jsey0pMM=')

    # Also if there are non-utf8 bytes in the query args.
    url = "http://sp.example.com/?q=\x92" # cp1252
    self.assertRaises(TypeError, oauth.Request, method="GET", url=url, parameters=params)
|
p@21
|
744
|
p@21
|
def test_request_hash_of_body(self):
    """Signing a non-form-encoded request adds the expected oauth_body_hash.

    The cases follow Appendix A of
    http://oauth.googlecode.com/svn/spec/ext/body_hash/1.0/oauth-bodyhash.html
    """
    tok = oauth.Token(key="token", secret="tok-test-secret")
    con = oauth.Consumer(key="consumer", secret="con-test-secret")
    url = u"http://www.example.com/resource"

    def signed(method, nonce, timestamp, **request_kwargs):
        # Build and sign one request; only the HTTP method, the nonce, the
        # timestamp and the optional body differ between the cases below.
        params = {
            'oauth_version': "1.0",
            'oauth_token': tok.key,
            'oauth_nonce': nonce,
            'oauth_timestamp': timestamp,
            'oauth_consumer_key': con.key
        }
        req = oauth.Request(method=method, url=url, parameters=params,
                            is_form_encoded=False, **request_kwargs)
        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, None)
        return req

    # Example 1a from Appendix A.1.  The specification lists
    # '08bUFF%2Fjmp59mWB7cSgCYBUpJ0U%3D' as the signature for this example;
    # the value asserted here is a different one, and the original author
    # could not see how the specification's value is possible.
    req = signed("PUT", 10288510250934, 1236874155, body="Hello World!")
    self.failUnlessReallyEqual(req['oauth_body_hash'], 'Lve95gjOVATpfV8EL5X4nxwjKHE=')
    self.failUnlessReallyEqual(req['oauth_signature'], 't+MX8l/0S8hdbVQL99nD0X1fPnM=')

    # Example 1b
    req = signed("PUT", 10369470270925, 1236874236, body="Hello World!")
    self.failUnlessReallyEqual(req['oauth_body_hash'], 'Lve95gjOVATpfV8EL5X4nxwjKHE=')
    self.failUnlessReallyEqual(req['oauth_signature'], 'CTFmrqJIGT7NsWJ42OrujahTtTc=')

    # Appendix A.2: a GET request built without a body argument.
    req = signed("GET", 8628868109991, 1238395022)
    self.failUnlessReallyEqual(req['oauth_body_hash'], '2jmj7l5rSw0yVb/vlWAYkK/YBwk=')
    self.failUnlessReallyEqual(req['oauth_signature'], 'Zhl++aWSP0O3/hYQ0CuBc7jv38I=')
|
p@21
|
797
|
p@21
|
798
|
p@21
|
def test_sign_request(self):
    """sign_request() records the method name and produces the pinned signatures."""
    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
    con = oauth.Consumer(key="con-test-key", secret="con-test-secret")

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200"
    }
    params['oauth_token'] = tok.key
    params['oauth_consumer_key'] = con.key

    req = oauth.Request(method="GET", url="http://sp.example.com/", parameters=params)

    # Expected signature -> signature method that has to produce it.
    methods = {
        'DX01TdHws7OninCLK9VztNTH1M4=': oauth.SignatureMethod_HMAC_SHA1(),
        'con-test-secret&tok-test-secret': oauth.SignatureMethod_PLAINTEXT()
    }
    for expected_signature, signer in methods.items():
        req.sign_request(signer, con, tok)
        self.assertEquals(req['oauth_signature_method'], signer.name)
        self.assertEquals(req['oauth_signature'], expected_signature)

    # Non-ascii characters in the path or in the query args: the utf-8 byte
    # string and the equivalent unicode object must sign identically.
    non_ascii_cases = (
        ("http://sp.example.com/\xe2\x80\x99", 'loFvp5xC7YbOgd9exIO6TxB7H4s='),     # utf-8 bytes
        (u'http://sp.example.com/\u2019', 'loFvp5xC7YbOgd9exIO6TxB7H4s='),          # Python unicode object
        ("http://sp.example.com/?q=\xe2\x80\x99", 'IBw5mfvoCsDjgpcsVKbyvsDqQaU='),  # utf-8 bytes
        (u'http://sp.example.com/?q=\u2019', 'IBw5mfvoCsDjgpcsVKbyvsDqQaU='),       # Python unicode object
    )
    for url, expected_signature in non_ascii_cases:
        req = oauth.Request(method="GET", url=url, parameters=params)
        req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), con, tok)
        self.assertEquals(req['oauth_signature'], expected_signature)
|
p@21
|
846
|
p@21
|
def test_from_request(self):
    """Request.from_request() rebuilds a request from headers or a query string."""
    url = "http://sp.example.com/"

    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }

    headers = oauth.Request("GET", url, params).to_header()

    # Round trip through the Authorization header.
    from_headers = oauth.Request.from_request("GET", url, headers)
    self.assertEquals(from_headers.method, "GET")
    self.assertEquals(from_headers.url, url)
    self.assertEquals(params, from_headers.copy())

    # A malformed OAuth header has to be reported as oauth.Error.
    bad_headers = {
        'Authorization' : 'OAuth this is a bad header'
    }
    self.assertRaises(oauth.Error, oauth.Request.from_request, "GET",
        url, bad_headers)

    # Round trip through the query string.
    qs = urllib.urlencode(params)
    from_query = oauth.Request.from_request("GET", url, query_string=qs)

    # parse_qs() yields a list per key; keep the first value, unquoted once
    # more, as the expected parameter value.
    parsed = parse_qs(qs, keep_blank_values=False)
    expected = dict((name, urllib.unquote(values[0]))
                    for name, values in parsed.iteritems())
    self.assertEquals(expected, from_query.copy())

    # With neither headers nor a query string there is nothing to build
    # from, and from_request() returns None.
    self.assertEquals(None, oauth.Request.from_request("GET", url))
|
p@21
|
891
|
p@21
|
def test_from_token_and_callback(self):
    """from_token_and_callback() sets oauth_token, and oauth_callback only when given."""
    url = "http://sp.example.com/"
    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")

    # Without a callback the request carries the token but no oauth_callback.
    req = oauth.Request.from_token_and_callback(tok)
    self.assertFalse('oauth_callback' in req)
    self.assertEquals(req['oauth_token'], tok.key)

    # With a callback the URL is stored verbatim under oauth_callback.
    req = oauth.Request.from_token_and_callback(tok, callback=url)
    self.assertTrue('oauth_callback' in req)
    self.assertEquals(req['oauth_callback'], url)
|
p@21
|
913
|
p@21
|
def test_from_consumer_and_token(self):
    """from_consumer_and_token() copies token key, consumer key and verifier."""
    token = oauth.Token(key="tok-test-key", secret="tok-test-secret")
    token.set_verifier('this_is_a_test_verifier')
    consumer = oauth.Consumer(key="con-test-key", secret="con-test-secret")

    request = oauth.Request.from_consumer_and_token(
        consumer,
        token=token,
        http_method="GET",
        http_url="http://sp.example.com/")

    self.assertEquals(request['oauth_token'], token.key)
    self.assertEquals(request['oauth_consumer_key'], consumer.key)
    self.assertEquals(token.verifier, request['oauth_verifier'])
|
p@21
|
926
|
p@21
|
class SignatureMethod_Bad(oauth.SignatureMethod):
    """Signature method stub named "BAD" that always signs with a constant string."""

    name = "BAD"

    def sign(self, request, consumer, token):
        """Return the fixed string "invalid-signature"; all arguments are ignored."""
        return "invalid-signature"

    def signing_base(self, request, consumer, token):
        """Return an empty signing base; all arguments are ignored."""
        return ""
|
p@21
|
935
|
p@21
|
936
|
p@21
|
class TestServer(unittest.TestCase):
    """Exercise oauth.Server: signature-method registration and verify_request()."""

    def setUp(self):
        """Create the consumer, the token and a correctly signed request."""
        self.consumer = oauth.Consumer(key="consumer-key",
                                       secret="consumer-secret")
        self.token = oauth.Token(key="token-key", secret="token-secret")
        self.request = self._signed_request()

    def _signed_request(self, signature_method=None, **overrides):
        """Return a GET request signed with self.consumer and self.token.

        signature_method defaults to a fresh SignatureMethod_HMAC_SHA1.
        Every keyword override replaces the request parameter of the same
        name; an override of None removes that parameter instead.
        """
        params = {
            'oauth_version': "1.0",
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO','BAR'],
            'foo': 59,
            'oauth_token': self.token.key,
            'oauth_consumer_key': self.consumer.key,
        }
        for name, value in overrides.items():
            if value is None:
                del params[name]
            else:
                params[name] = value

        if signature_method is None:
            signature_method = oauth.SignatureMethod_HMAC_SHA1()

        request = oauth.Request(method="GET", url="http://sp.example.com/",
                                parameters=params)
        request.sign_request(signature_method, self.consumer, self.token)
        return request

    def _hmac_server(self):
        """Return a Server on which only HMAC-SHA1 has been registered."""
        server = oauth.Server()
        server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
        return server

    def test_init(self):
        """Server() keeps the signature methods it is given and defaults to none."""
        server = oauth.Server(signature_methods={'HMAC-SHA1' : oauth.SignatureMethod_HMAC_SHA1()})
        self.assertTrue('HMAC-SHA1' in server.signature_methods)
        self.assertTrue(isinstance(server.signature_methods['HMAC-SHA1'],
            oauth.SignatureMethod_HMAC_SHA1))

        server = oauth.Server()
        self.assertEquals(server.signature_methods, {})

    def test_add_signature_method(self):
        """add_signature_method() returns the growing name -> method mapping."""
        server = oauth.Server()
        res = server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
        self.assertEquals(len(res), 1)
        self.assertTrue('HMAC-SHA1' in res)
        self.assertTrue(isinstance(res['HMAC-SHA1'],
            oauth.SignatureMethod_HMAC_SHA1))

        res = server.add_signature_method(oauth.SignatureMethod_PLAINTEXT())
        self.assertEquals(len(res), 2)
        self.assertTrue('PLAINTEXT' in res)
        self.assertTrue(isinstance(res['PLAINTEXT'],
            oauth.SignatureMethod_PLAINTEXT))

    def test_verify_request(self):
        """verify_request() returns the non-OAuth parameters of a valid request."""
        server = self._hmac_server()

        parameters = server.verify_request(self.request, self.consumer,
            self.token)

        self.assertTrue('bar' in parameters)
        self.assertTrue('foo' in parameters)
        self.assertTrue('multi' in parameters)
        self.assertEquals(parameters['bar'], 'blerg')
        self.assertEquals(parameters['foo'], 59)
        self.assertEquals(parameters['multi'], ['FOO','BAR'])

    def test_build_authenticate_header(self):
        """build_authenticate_header() yields a WWW-Authenticate header with the realm."""
        server = oauth.Server()
        headers = server.build_authenticate_header('example.com')
        self.assertTrue('WWW-Authenticate' in headers)
        self.assertEquals('OAuth realm="example.com"',
            headers['WWW-Authenticate'])

    def test_no_version(self):
        """A request without oauth_version must verify without raising."""
        request = self._signed_request(oauth_version=None)

        # The test passes as long as verification does not raise.
        self._hmac_server().verify_request(request, self.consumer, self.token)

    def test_invalid_version(self):
        """An unknown oauth_version makes verify_request() raise oauth.Error."""
        request = self._signed_request(oauth_version='222.9922',
                                       multi=['foo','bar'])

        self.assertRaises(oauth.Error, self._hmac_server().verify_request,
            request, self.consumer, self.token)

    def test_invalid_signature_method(self):
        """A request signed with an unregistered method is rejected with oauth.Error."""
        request = self._signed_request(signature_method=SignatureMethod_Bad(),
                                       oauth_version='1.0')

        self.assertRaises(oauth.Error, self._hmac_server().verify_request,
            request, self.consumer, self.token)

    def test_missing_signature(self):
        """Removing oauth_signature makes verify_request() raise MissingSignature."""
        request = self._signed_request(oauth_version='1.0')
        del request['oauth_signature']

        self.assertRaises(oauth.MissingSignature,
            self._hmac_server().verify_request,
            request, self.consumer, self.token)
|
p@21
|
1119
|
p@21
|
1120
|
p@21
|
1121 # Request Token: http://oauth-sandbox.sevengoslings.net/request_token
|
p@21
|
1122 # Auth: http://oauth-sandbox.sevengoslings.net/authorize
|
p@21
|
1123 # Access Token: http://oauth-sandbox.sevengoslings.net/access_token
|
p@21
|
1124 # Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
|
p@21
|
1125 # Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
|
p@21
|
1126 # Key: bd37aed57e15df53
|
p@21
|
1127 # Secret: 0e9e6413a9ef49510a4f68ed02cd
|
p@21
|
class TestClient(unittest.TestCase):
    """Tests for oauth.Client.

    Some tests issue real HTTP requests against the host configured below;
    the others replace httplib2.Http.request with a mock.
    """

#    oauth_uris = {
#        'request_token': '/request_token.php',
#        'access_token': '/access_token.php'
#    }

    # Endpoint paths, appended to `host` by _uri().
    oauth_uris = {
        'request_token': '/request_token',
        'authorize': '/authorize',
        'access_token': '/access_token',
        'two_legged': '/two_legged',
        'three_legged': '/three_legged'
    }

    consumer_key = 'bd37aed57e15df53'
    consumer_secret = '0e9e6413a9ef49510a4f68ed02cd'
    host = 'http://oauth-sandbox.sevengoslings.net'

    def setUp(self):
        """Create the consumer and the form fields sent by the two-legged tests."""
        self.consumer = oauth.Consumer(
            key=self.consumer_key, secret=self.consumer_secret)

        self.body = {
            'foo': 'bar',
            'bar': 'foo',
            'multi': ['FOO','BAR'],
            'blah': 599999
        }

    def _uri(self, type):
        """Return the absolute URL for the endpoint named `type`.

        Raises KeyError when `type` is not a key of oauth_uris.
        """
        path = self.oauth_uris.get(type)
        if path is not None:
            return "%s%s" % (self.host, path)

        raise KeyError("%s is not a valid OAuth URI type." % type)

    def create_simple_multipart_data(self, data):
        """Encode `data` as a minimal multipart/form-data payload.

        Returns a (content_type, body) tuple; the boundary carries a random
        number and every key and value is passed through str().
        """
        boundary = '---Boundary-%d' % random.randint(1,1000)
        delimiter = '--' + boundary

        lines = []
        for name, value in data.iteritems():
            lines.extend((
                delimiter,
                'Content-Disposition: form-data; name="%s"' % str(name),
                '',
                str(value),
            ))
        lines.extend(('', delimiter + '--', ''))

        content_type = 'multipart/form-data; boundary=%s' % boundary
        return content_type, '\r\n'.join(lines)

    def test_init(self):
        """Client() raises ValueError for objects that are not a Consumer / Token."""
        class Blah():
            pass

        try:
            oauth.Client(Blah())
        except ValueError:
            pass
        else:
            self.fail("Client.__init__() accepted invalid Consumer.")

        consumer = oauth.Consumer('token', 'secret')
        try:
            oauth.Client(consumer, Blah())
        except ValueError:
            pass
        else:
            self.fail("Client.__init__() accepted invalid Token.")

    def test_access_token_get(self):
        """Test getting an access token via GET."""
        client = oauth.Client(self.consumer, None)
        resp, _content = client.request(self._uri('request_token'), "GET")

        self.assertEquals(int(resp['status']), 200)

    def test_access_token_post(self):
        """Test getting an access token via POST."""
        client = oauth.Client(self.consumer, None)
        resp, content = client.request(self._uri('request_token'), "POST")

        self.assertEquals(int(resp['status']), 200)

        # The response body is a query string carrying the token credentials.
        fields = dict(parse_qsl(content))
        self.assertTrue('oauth_token' in fields)
        self.assertTrue('oauth_token_secret' in fields)

    def _two_legged(self, method):
        """Send self.body, url-encoded, to the two-legged endpoint using `method`."""
        encoded_body = urllib.urlencode(self.body)
        client = oauth.Client(self.consumer, None)
        return client.request(self._uri('two_legged'), method, body=encoded_body)

    def test_two_legged_post(self):
        """A test of a two-legged OAuth POST request."""
        resp, _content = self._two_legged("POST")

        self.assertEquals(int(resp['status']), 200)

    def test_two_legged_get(self):
        """A test of a two-legged OAuth GET request."""
        resp, _content = self._two_legged("GET")
        self.assertEquals(int(resp['status']), 200)

    @mock.patch('httplib2.Http.request')
    def test_multipart_post_does_not_alter_body(self, mockHttpRequest):
        """A multipart POST body reaches httplib2 exactly as the caller supplied it."""
        random_result = random.randint(1,100)

        field_name = 'rand-%d'%random.randint(1,100)
        data = {field_name: random.randint(1,100)}
        content_type, body = self.create_simple_multipart_data(data)

        client = oauth.Client(self.consumer, None)
        uri = self._uri('two_legged')

        expected_keywords = frozenset(
            ['method', 'body', 'redirections', 'connection_type', 'headers'])

        def mockrequest(http, target, **kw):
            # Stand-in for httplib2.Http.request: verify what the client
            # forwards, then hand back the sentinel result.
            self.failUnless(http is client)
            self.failUnless(target is uri)
            self.failUnlessEqual(frozenset(kw.keys()), expected_keywords)
            self.failUnlessEqual(kw['body'], body)
            self.failUnlessEqual(kw['connection_type'], None)
            self.failUnlessEqual(kw['method'], 'POST')
            self.failUnlessEqual(kw['redirections'], httplib2.DEFAULT_MAX_REDIRECTS)
            self.failUnless(isinstance(kw['headers'], dict))

            return random_result

        mockHttpRequest.side_effect = mockrequest

        result = client.request(uri, 'POST', headers={'Content-Type':content_type}, body=body)
        self.assertEqual(result, random_result)

    @mock.patch('httplib2.Http.request')
    def test_url_with_query_string(self, mockHttpRequest):
        """Query arguments already in the URL survive next to the OAuth parameters."""
        uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
        client = oauth.Client(self.consumer, None)
        random_result = random.randint(1,100)

        expected_keywords = frozenset(
            ['method', 'body', 'redirections', 'connection_type', 'headers'])
        # These differ between two independently built requests, so only
        # their presence (via the length check) is compared, not their value.
        volatile = ('oauth_signature', 'oauth_nonce', 'oauth_timestamp')

        def mockrequest(http, target, **kw):
            self.failUnless(http is client)
            self.failUnlessEqual(frozenset(kw.keys()), expected_keywords)
            self.failUnlessEqual(kw['body'], '')
            self.failUnlessEqual(kw['connection_type'], None)
            self.failUnlessEqual(kw['method'], 'GET')
            self.failUnlessEqual(kw['redirections'], httplib2.DEFAULT_MAX_REDIRECTS)
            self.failUnless(isinstance(kw['headers'], dict))

            # Build a reference request for the same URL and compare its
            # query arguments with those the client actually sent.
            req = oauth.Request.from_consumer_and_token(self.consumer, None,
                    http_method='GET', http_url=uri, parameters={})
            req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), self.consumer, None)
            expected = parse_qsl(urlparse.urlparse(req.to_url()).query)
            actual = parse_qsl(urlparse.urlparse(target).query)
            self.failUnlessEqual(len(expected), len(actual))

            actual = dict(actual)
            for key, value in expected:
                if key in volatile:
                    continue
                self.failUnlessEqual(actual[key], value)

            return random_result

        mockHttpRequest.side_effect = mockrequest

        client.request(uri, 'GET')

    @mock.patch('httplib2.Http.request')
    @mock.patch('oauth2.Request.from_consumer_and_token')
    def test_multiple_values_for_a_key(self, mockReqConstructor, mockHttpRequest):
        """A key repeated in the body is passed on as a list and kept in the body."""
        client = oauth.Client(self.consumer, None)

        canned_request = oauth.Request("GET", "http://example.com/fetch.php", parameters={'multi': ['1', '2']})
        mockReqConstructor.return_value = canned_request

        client.request('http://whatever', 'POST', body='multi=1&multi=2')

        # The request constructor is called once, with both values in a list.
        self.failUnlessEqual(mockReqConstructor.call_count, 1)
        self.failUnlessEqual(mockReqConstructor.call_args[1]['parameters'], {'multi': ['1', '2']})

        # Both values are still present in the body handed to httplib2.
        sent_body = mockHttpRequest.call_args[1]['body']
        self.failUnless('multi=1' in sent_body)
        self.failUnless('multi=2' in sent_body)
|
p@21
|
1307
|
p@21
|
if __name__ == "__main__":
    # Hand control to unittest's command-line runner when this file is
    # executed as a script.
    unittest.main()
|