yading@7: """
yading@7: The MIT License
yading@7:
yading@7: Copyright (c) 2007 Leah Culver
yading@7:
yading@7: Permission is hereby granted, free of charge, to any person obtaining a copy
yading@7: of this software and associated documentation files (the "Software"), to deal
yading@7: in the Software without restriction, including without limitation the rights
yading@7: to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
yading@7: copies of the Software, and to permit persons to whom the Software is
yading@7: furnished to do so, subject to the following conditions:
yading@7:
yading@7: The above copyright notice and this permission notice shall be included in
yading@7: all copies or substantial portions of the Software.
yading@7:
yading@7: THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
yading@7: IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
yading@7: FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
yading@7: AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
yading@7: LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
yading@7: OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
yading@7: THE SOFTWARE.
yading@7: """
yading@7:
yading@7: import cgi
yading@7: import urllib
yading@7: import time
yading@7: import random
yading@7: import urlparse
yading@7: import hmac
yading@7: import binascii
yading@7: import re
yading@7:
yading@7:
yading@7: VERSION = '1.0' # Hi Blaine!
yading@7: HTTP_METHOD = 'GET'
yading@7: SIGNATURE_METHOD = 'PLAINTEXT'
yading@7:
yading@7:
yading@7: class OAuthError(RuntimeError):
yading@7: """Generic exception class."""
yading@7: def __init__(self, message='OAuth error occured.'):
yading@7: self.message = message
yading@7:
yading@7: def build_authenticate_header(realm=''):
yading@7: """Optional WWW-Authenticate header (401 error)"""
yading@7: return {'WWW-Authenticate': 'OAuth realm="%s"' % realm}
yading@7:
yading@7: def escape(s):
yading@7: """Escape a URL including any /."""
yading@7: return urllib.quote(s, safe='~')
yading@7:
yading@7: def _utf8_str(s):
yading@7: """Convert unicode to utf-8."""
yading@7: if isinstance(s, unicode):
yading@7: return s.encode("utf-8")
yading@7: else:
yading@7: return str(s)
yading@7:
yading@7: def generate_timestamp():
yading@7: """Get seconds since epoch (UTC)."""
yading@7: return int(time.time())
yading@7:
yading@7: def generate_nonce(length=8):
yading@7: """Generate pseudorandom number."""
yading@7: return ''.join([str(random.randint(0, 9)) for i in range(length)])
yading@7:
yading@7: def generate_verifier(length=8):
yading@7: """Generate pseudorandom number."""
yading@7: return ''.join([str(random.randint(0, 9)) for i in range(length)])
yading@7:
yading@7:
yading@7: class OAuthConsumer(object):
yading@7: """Consumer of OAuth authentication.
yading@7:
yading@7: OAuthConsumer is a data type that represents the identity of the Consumer
yading@7: via its shared secret with the Service Provider.
yading@7:
yading@7: """
yading@7: key = None
yading@7: secret = None
yading@7:
yading@7: def __init__(self, key, secret):
yading@7: self.key = key
yading@7: self.secret = secret
yading@7:
yading@7:
yading@7: class OAuthToken(object):
yading@7: """OAuthToken is a data type that represents an End User via either an access
yading@7: or request token.
yading@7:
yading@7: key -- the token
yading@7: secret -- the token secret
yading@7:
yading@7: """
yading@7: key = None
yading@7: secret = None
yading@7: callback = None
yading@7: callback_confirmed = None
yading@7: verifier = None
yading@7:
yading@7: def __init__(self, key, secret):
yading@7: self.key = key
yading@7: self.secret = secret
yading@7:
yading@7: def set_callback(self, callback):
yading@7: self.callback = callback
yading@7: self.callback_confirmed = 'true'
yading@7:
yading@7: def set_verifier(self, verifier=None):
yading@7: if verifier is not None:
yading@7: self.verifier = verifier
yading@7: else:
yading@7: self.verifier = generate_verifier()
yading@7:
yading@7: def get_callback_url(self):
yading@7: if self.callback and self.verifier:
yading@7: # Append the oauth_verifier.
yading@7: parts = urlparse.urlparse(self.callback)
yading@7: scheme, netloc, path, params, query, fragment = parts[:6]
yading@7: if query:
yading@7: query = '%s&oauth_verifier=%s' % (query, self.verifier)
yading@7: else:
yading@7: query = 'oauth_verifier=%s' % self.verifier
yading@7: return urlparse.urlunparse((scheme, netloc, path, params,
yading@7: query, fragment))
yading@7: return self.callback
yading@7:
yading@7: def to_string(self):
yading@7: data = {
yading@7: 'oauth_token': self.key,
yading@7: 'oauth_token_secret': self.secret,
yading@7: }
yading@7: if self.callback_confirmed is not None:
yading@7: data['oauth_callback_confirmed'] = self.callback_confirmed
yading@7: return urllib.urlencode(data)
yading@7:
yading@7: def from_string(s):
yading@7: """ Returns a token from something like:
yading@7: oauth_token_secret=xxx&oauth_token=xxx
yading@7: """
yading@7: print "******* %s" % s.__class__
yading@7: #params = urlparse.parse_qs(s, keep_blank_values=False)
yading@7:
yading@7: key = re.search("(\w.+)", s).groups()[0]
yading@7: print "@@@@@@ key: %s" %key
yading@7: secret = re.search("(\w.+)", s).groups()[0]
yading@7: print "@@@@@@ secret: %s" % secret
yading@7: token = OAuthToken(key, secret)
yading@7:
yading@7: return token
yading@7: from_string = staticmethod(from_string)
yading@7:
yading@7: def __str__(self):
yading@7: return self.to_string()
yading@7:
yading@7:
yading@7: class OAuthRequest(object):
yading@7: """OAuthRequest represents the request and can be serialized.
yading@7:
yading@7: OAuth parameters:
yading@7: - oauth_consumer_key
yading@7: - oauth_token
yading@7: - oauth_signature_method
yading@7: - oauth_signature
yading@7: - oauth_timestamp
yading@7: - oauth_nonce
yading@7: - oauth_version
yading@7: - oauth_verifier
yading@7: ... any additional parameters, as defined by the Service Provider.
yading@7: """
yading@7: parameters = None # OAuth parameters.
yading@7: http_method = HTTP_METHOD
yading@7: http_url = None
yading@7: version = VERSION
yading@7:
yading@7: def __init__(self, http_method=HTTP_METHOD, http_url=None, parameters=None):
yading@7: self.http_method = http_method
yading@7: self.http_url = http_url
yading@7: self.parameters = parameters or {}
yading@7:
yading@7: def set_parameter(self, parameter, value):
yading@7: self.parameters[parameter] = value
yading@7:
yading@7: def get_parameter(self, parameter):
yading@7: try:
yading@7: return self.parameters[parameter]
yading@7: except:
yading@7: raise OAuthError('Parameter not found: %s' % parameter)
yading@7:
yading@7: def _get_timestamp_nonce(self):
yading@7: return self.get_parameter('oauth_timestamp'), self.get_parameter(
yading@7: 'oauth_nonce')
yading@7:
yading@7: def get_nonoauth_parameters(self):
yading@7: """Get any non-OAuth parameters."""
yading@7: parameters = {}
yading@7: for k, v in self.parameters.iteritems():
yading@7: # Ignore oauth parameters.
yading@7: if k.find('oauth_') < 0:
yading@7: parameters[k] = v
yading@7: return parameters
yading@7:
yading@7: def to_header(self, realm=''):
yading@7: """Serialize as a header for an HTTPAuth request."""
yading@7: auth_header = 'OAuth realm="%s"' % realm
yading@7: # Add the oauth parameters.
yading@7: if self.parameters:
yading@7: for k, v in self.parameters.iteritems():
yading@7: if k[:6] == 'oauth_':
yading@7: auth_header += ', %s="%s"' % (k, escape(str(v)))
yading@7: return {'Authorization': auth_header}
yading@7:
yading@7: def to_postdata(self):
yading@7: """Serialize as post data for a POST request."""
yading@7: return '&'.join(['%s=%s' % (escape(str(k)), escape(str(v))) \
yading@7: for k, v in self.parameters.iteritems()])
yading@7:
yading@7: def to_url(self):
yading@7: """Serialize as a URL for a GET request."""
yading@7: return '%s?%s' % (self.get_normalized_http_url(), self.to_postdata())
yading@7:
yading@7: def get_normalized_parameters(self):
yading@7: """Return a string that contains the parameters that must be signed."""
yading@7: params = self.parameters
yading@7: try:
yading@7: # Exclude the signature if it exists.
yading@7: del params['oauth_signature']
yading@7: except:
yading@7: pass
yading@7: # Escape key values before sorting.
yading@7: key_values = [(escape(_utf8_str(k)), escape(_utf8_str(v))) \
yading@7: for k,v in params.items()]
yading@7: # Sort lexicographically, first after key, then after value.
yading@7: key_values.sort()
yading@7: # Combine key value pairs into a string.
yading@7: return '&'.join(['%s=%s' % (k, v) for k, v in key_values])
yading@7:
yading@7: def get_normalized_http_method(self):
yading@7: """Uppercases the http method."""
yading@7: return self.http_method.upper()
yading@7:
yading@7: def get_normalized_http_url(self):
yading@7: """Parses the URL and rebuilds it to be scheme://host/path."""
yading@7: parts = urlparse.urlparse(self.http_url)
yading@7: scheme, netloc, path = parts[:3]
yading@7: # Exclude default port numbers.
yading@7: if scheme == 'http' and netloc[-3:] == ':80':
yading@7: netloc = netloc[:-3]
yading@7: elif scheme == 'https' and netloc[-4:] == ':443':
yading@7: netloc = netloc[:-4]
yading@7: return '%s://%s%s' % (scheme, netloc, path)
yading@7:
yading@7: def sign_request(self, signature_method, consumer, token):
yading@7: """Set the signature parameter to the result of build_signature."""
yading@7: # Set the signature method.
yading@7: self.set_parameter('oauth_signature_method',
yading@7: signature_method.get_name())
yading@7: # Set the signature.
yading@7: self.set_parameter('oauth_signature',
yading@7: self.build_signature(signature_method, consumer, token))
yading@7:
yading@7: def build_signature(self, signature_method, consumer, token):
yading@7: """Calls the build signature method within the signature method."""
yading@7: return signature_method.build_signature(self, consumer, token)
yading@7:
yading@7: def from_request(http_method, http_url, headers=None, parameters=None,
yading@7: query_string=None):
yading@7: """Combines multiple parameter sources."""
yading@7: if parameters is None:
yading@7: parameters = {}
yading@7:
yading@7: # Headers
yading@7: if headers and 'Authorization' in headers:
yading@7: auth_header = headers['Authorization']
yading@7: # Check that the authorization header is OAuth.
yading@7: if auth_header[:6] == 'OAuth ':
yading@7: auth_header = auth_header[6:]
yading@7: try:
yading@7: # Get the parameters from the header.
yading@7: header_params = OAuthRequest._split_header(auth_header)
yading@7: parameters.update(header_params)
yading@7: except:
yading@7: raise OAuthError('Unable to parse OAuth parameters from '
yading@7: 'Authorization header.')
yading@7:
yading@7: # GET or POST query string.
yading@7: if query_string:
yading@7: query_params = OAuthRequest._split_url_string(query_string)
yading@7: parameters.update(query_params)
yading@7:
yading@7: # URL parameters.
yading@7: param_str = urlparse.urlparse(http_url)[4] # query
yading@7: url_params = OAuthRequest._split_url_string(param_str)
yading@7: parameters.update(url_params)
yading@7:
yading@7: if parameters:
yading@7: return OAuthRequest(http_method, http_url, parameters)
yading@7:
yading@7: return None
yading@7: from_request = staticmethod(from_request)
yading@7:
yading@7: def from_consumer_and_token(oauth_consumer, token=None,
yading@7: callback=None, verifier=None, http_method=HTTP_METHOD,
yading@7: http_url=None, parameters=None):
yading@7: if not parameters:
yading@7: parameters = {}
yading@7:
yading@7: defaults = {
yading@7: 'oauth_consumer_key': oauth_consumer.key,
yading@7: 'oauth_timestamp': generate_timestamp(),
yading@7: 'oauth_nonce': generate_nonce(),
yading@7: 'oauth_version': OAuthRequest.version,
yading@7: }
yading@7:
yading@7: defaults.update(parameters)
yading@7: parameters = defaults
yading@7:
yading@7: if token:
yading@7: parameters['oauth_token'] = token.key
yading@7: if token.callback:
yading@7: parameters['oauth_callback'] = token.callback
yading@7: # 1.0a support for verifier.
yading@7: if verifier:
yading@7: parameters['oauth_verifier'] = verifier
yading@7: elif callback:
yading@7: # 1.0a support for callback in the request token request.
yading@7: parameters['oauth_callback'] = callback
yading@7:
yading@7: return OAuthRequest(http_method, http_url, parameters)
yading@7: from_consumer_and_token = staticmethod(from_consumer_and_token)
yading@7:
yading@7: def from_token_and_callback(token, callback=None, http_method=HTTP_METHOD,
yading@7: http_url=None, parameters=None):
yading@7: if not parameters:
yading@7: parameters = {}
yading@7:
yading@7: parameters['oauth_token'] = token.key
yading@7:
yading@7: if callback:
yading@7: parameters['oauth_callback'] = callback
yading@7:
yading@7: return OAuthRequest(http_method, http_url, parameters)
yading@7: from_token_and_callback = staticmethod(from_token_and_callback)
yading@7:
yading@7: def _split_header(header):
yading@7: """Turn Authorization: header into parameters."""
yading@7: params = {}
yading@7: parts = header.split(',')
yading@7: for param in parts:
yading@7: # Ignore realm parameter.
yading@7: if param.find('realm') > -1:
yading@7: continue
yading@7: # Remove whitespace.
yading@7: param = param.strip()
yading@7: # Split key-value.
yading@7: param_parts = param.split('=', 1)
yading@7: # Remove quotes and unescape the value.
yading@7: params[param_parts[0]] = urllib.unquote(param_parts[1].strip('\"'))
yading@7: return params
yading@7: _split_header = staticmethod(_split_header)
yading@7:
yading@7: def _split_url_string(param_str):
yading@7: """Turn URL string into parameters."""
yading@7: parameters = cgi.parse_qs(param_str, keep_blank_values=False)
yading@7: for k, v in parameters.iteritems():
yading@7: parameters[k] = urllib.unquote(v[0])
yading@7: return parameters
yading@7: _split_url_string = staticmethod(_split_url_string)
yading@7:
yading@7: class OAuthServer(object):
yading@7: """A worker to check the validity of a request against a data store."""
yading@7: timestamp_threshold = 300 # In seconds, five minutes.
yading@7: version = VERSION
yading@7: signature_methods = None
yading@7: data_store = None
yading@7:
yading@7: def __init__(self, data_store=None, signature_methods=None):
yading@7: self.data_store = data_store
yading@7: self.signature_methods = signature_methods or {}
yading@7:
yading@7: def set_data_store(self, data_store):
yading@7: self.data_store = data_store
yading@7:
yading@7: def get_data_store(self):
yading@7: return self.data_store
yading@7:
yading@7: def add_signature_method(self, signature_method):
yading@7: self.signature_methods[signature_method.get_name()] = signature_method
yading@7: return self.signature_methods
yading@7:
yading@7: def fetch_request_token(self, oauth_request):
yading@7: """Processes a request_token request and returns the
yading@7: request token on success.
yading@7: """
yading@7: try:
yading@7: # Get the request token for authorization.
yading@7: token = self._get_token(oauth_request, 'request')
yading@7: except OAuthError:
yading@7: # No token required for the initial token request.
yading@7: version = self._get_version(oauth_request)
yading@7: consumer = self._get_consumer(oauth_request)
yading@7: try:
yading@7: callback = self.get_callback(oauth_request)
yading@7: except OAuthError:
yading@7: callback = None # 1.0, no callback specified.
yading@7: self._check_signature(oauth_request, consumer, None)
yading@7: # Fetch a new token.
yading@7: token = self.data_store.fetch_request_token(consumer, callback)
yading@7: return token
yading@7:
yading@7: def fetch_access_token(self, oauth_request):
yading@7: """Processes an access_token request and returns the
yading@7: access token on success.
yading@7: """
yading@7: version = self._get_version(oauth_request)
yading@7: consumer = self._get_consumer(oauth_request)
yading@7: try:
yading@7: verifier = self._get_verifier(oauth_request)
yading@7: except OAuthError:
yading@7: verifier = None
yading@7: # Get the request token.
yading@7: token = self._get_token(oauth_request, 'request')
yading@7: self._check_signature(oauth_request, consumer, token)
yading@7: new_token = self.data_store.fetch_access_token(consumer, token, verifier)
yading@7: return new_token
yading@7:
yading@7: def verify_request(self, oauth_request):
yading@7: """Verifies an api call and checks all the parameters."""
yading@7: # -> consumer and token
yading@7: version = self._get_version(oauth_request)
yading@7: consumer = self._get_consumer(oauth_request)
yading@7: # Get the access token.
yading@7: token = self._get_token(oauth_request, 'access')
yading@7: self._check_signature(oauth_request, consumer, token)
yading@7: parameters = oauth_request.get_nonoauth_parameters()
yading@7: return consumer, token, parameters
yading@7:
yading@7: def authorize_token(self, token, user):
yading@7: """Authorize a request token."""
yading@7: return self.data_store.authorize_request_token(token, user)
yading@7:
yading@7: def get_callback(self, oauth_request):
yading@7: """Get the callback URL."""
yading@7: return oauth_request.get_parameter('oauth_callback')
yading@7:
yading@7: def build_authenticate_header(self, realm=''):
yading@7: """Optional support for the authenticate header."""
yading@7: return {'WWW-Authenticate': 'OAuth realm="%s"' % realm}
yading@7:
yading@7: def _get_version(self, oauth_request):
yading@7: """Verify the correct version request for this server."""
yading@7: try:
yading@7: version = oauth_request.get_parameter('oauth_version')
yading@7: except:
yading@7: version = VERSION
yading@7: if version and version != self.version:
yading@7: raise OAuthError('OAuth version %s not supported.' % str(version))
yading@7: return version
yading@7:
yading@7: def _get_signature_method(self, oauth_request):
yading@7: """Figure out the signature with some defaults."""
yading@7: try:
yading@7: signature_method = oauth_request.get_parameter(
yading@7: 'oauth_signature_method')
yading@7: except:
yading@7: signature_method = SIGNATURE_METHOD
yading@7: try:
yading@7: # Get the signature method object.
yading@7: signature_method = self.signature_methods[signature_method]
yading@7: except:
yading@7: signature_method_names = ', '.join(self.signature_methods.keys())
yading@7: raise OAuthError('Signature method %s not supported try one of the '
yading@7: 'following: %s' % (signature_method, signature_method_names))
yading@7:
yading@7: return signature_method
yading@7:
yading@7: def _get_consumer(self, oauth_request):
yading@7: consumer_key = oauth_request.get_parameter('oauth_consumer_key')
yading@7: consumer = self.data_store.lookup_consumer(consumer_key)
yading@7: if not consumer:
yading@7: raise OAuthError('Invalid consumer.')
yading@7: return consumer
yading@7:
yading@7: def _get_token(self, oauth_request, token_type='access'):
yading@7: """Try to find the token for the provided request token key."""
yading@7: token_field = oauth_request.get_parameter('oauth_token')
yading@7: token = self.data_store.lookup_token(token_type, token_field)
yading@7: if not token:
yading@7: raise OAuthError('Invalid %s token: %s' % (token_type, token_field))
yading@7: return token
yading@7:
yading@7: def _get_verifier(self, oauth_request):
yading@7: return oauth_request.get_parameter('oauth_verifier')
yading@7:
yading@7: def _check_signature(self, oauth_request, consumer, token):
yading@7: timestamp, nonce = oauth_request._get_timestamp_nonce()
yading@7: self._check_timestamp(timestamp)
yading@7: self._check_nonce(consumer, token, nonce)
yading@7: signature_method = self._get_signature_method(oauth_request)
yading@7: try:
yading@7: signature = oauth_request.get_parameter('oauth_signature')
yading@7: except:
yading@7: raise OAuthError('Missing signature.')
yading@7: # Validate the signature.
yading@7: valid_sig = signature_method.check_signature(oauth_request, consumer,
yading@7: token, signature)
yading@7: if not valid_sig:
yading@7: key, base = signature_method.build_signature_base_string(
yading@7: oauth_request, consumer, token)
yading@7: raise OAuthError('Invalid signature. Expected signature base '
yading@7: 'string: %s' % base)
yading@7: built = signature_method.build_signature(oauth_request, consumer, token)
yading@7:
yading@7: def _check_timestamp(self, timestamp):
yading@7: """Verify that timestamp is recentish."""
yading@7: timestamp = int(timestamp)
yading@7: now = int(time.time())
yading@7: lapsed = abs(now - timestamp)
yading@7: if lapsed > self.timestamp_threshold:
yading@7: raise OAuthError('Expired timestamp: given %d and now %s has a '
yading@7: 'greater difference than threshold %d' %
yading@7: (timestamp, now, self.timestamp_threshold))
yading@7:
yading@7: def _check_nonce(self, consumer, token, nonce):
yading@7: """Verify that the nonce is uniqueish."""
yading@7: nonce = self.data_store.lookup_nonce(consumer, token, nonce)
yading@7: if nonce:
yading@7: raise OAuthError('Nonce already used: %s' % str(nonce))
yading@7:
yading@7:
yading@7: class OAuthClient(object):
yading@7: """OAuthClient is a worker to attempt to execute a request."""
yading@7: consumer = None
yading@7: token = None
yading@7:
yading@7: def __init__(self, oauth_consumer, oauth_token):
yading@7: self.consumer = oauth_consumer
yading@7: self.token = oauth_token
yading@7:
yading@7: def get_consumer(self):
yading@7: return self.consumer
yading@7:
yading@7: def get_token(self):
yading@7: return self.token
yading@7:
yading@7: def fetch_request_token(self, oauth_request):
yading@7: """-> OAuthToken."""
yading@7: raise NotImplementedError
yading@7:
yading@7: def fetch_access_token(self, oauth_request):
yading@7: """-> OAuthToken."""
yading@7: raise NotImplementedError
yading@7:
yading@7: def access_resource(self, oauth_request):
yading@7: """-> Some protected resource."""
yading@7: raise NotImplementedError
yading@7:
yading@7:
yading@7: class OAuthDataStore(object):
yading@7: """A database abstraction used to lookup consumers and tokens."""
yading@7:
yading@7: def lookup_consumer(self, key):
yading@7: """-> OAuthConsumer."""
yading@7: raise NotImplementedError
yading@7:
yading@7: def lookup_token(self, oauth_consumer, token_type, token_token):
yading@7: """-> OAuthToken."""
yading@7: raise NotImplementedError
yading@7:
yading@7: def lookup_nonce(self, oauth_consumer, oauth_token, nonce):
yading@7: """-> OAuthToken."""
yading@7: raise NotImplementedError
yading@7:
yading@7: def fetch_request_token(self, oauth_consumer, oauth_callback):
yading@7: """-> OAuthToken."""
yading@7: raise NotImplementedError
yading@7:
yading@7: def fetch_access_token(self, oauth_consumer, oauth_token, oauth_verifier):
yading@7: """-> OAuthToken."""
yading@7: raise NotImplementedError
yading@7:
yading@7: def authorize_request_token(self, oauth_token, user):
yading@7: """-> OAuthToken."""
yading@7: raise NotImplementedError
yading@7:
yading@7:
yading@7: class OAuthSignatureMethod(object):
yading@7: """A strategy class that implements a signature method."""
yading@7: def get_name(self):
yading@7: """-> str."""
yading@7: raise NotImplementedError
yading@7:
yading@7: def build_signature_base_string(self, oauth_request, oauth_consumer, oauth_token):
yading@7: """-> str key, str raw."""
yading@7: raise NotImplementedError
yading@7:
yading@7: def build_signature(self, oauth_request, oauth_consumer, oauth_token):
yading@7: """-> str."""
yading@7: raise NotImplementedError
yading@7:
yading@7: def check_signature(self, oauth_request, consumer, token, signature):
yading@7: built = self.build_signature(oauth_request, consumer, token)
yading@7: return built == signature
yading@7:
yading@7:
yading@7: class OAuthSignatureMethod_HMAC_SHA1(OAuthSignatureMethod):
yading@7:
yading@7: def get_name(self):
yading@7: return 'HMAC-SHA1'
yading@7:
yading@7: def build_signature_base_string(self, oauth_request, consumer, token):
yading@7: sig = (
yading@7: escape(oauth_request.get_normalized_http_method()),
yading@7: escape(oauth_request.get_normalized_http_url()),
yading@7: escape(oauth_request.get_normalized_parameters()),
yading@7: )
yading@7:
yading@7: key = '%s&' % escape(consumer.secret)
yading@7: if token:
yading@7: key += escape(token.secret)
yading@7: raw = '&'.join(sig)
yading@7: return key, raw
yading@7:
yading@7: def build_signature(self, oauth_request, consumer, token):
yading@7: """Builds the base signature string."""
yading@7: key, raw = self.build_signature_base_string(oauth_request, consumer,
yading@7: token)
yading@7:
yading@7: # HMAC object.
yading@7: try:
yading@7: import hashlib # 2.5
yading@7: hashed = hmac.new(key, raw, hashlib.sha1)
yading@7: except:
yading@7: import sha # Deprecated
yading@7: hashed = hmac.new(key, raw, sha)
yading@7:
yading@7: # Calculate the digest base 64.
yading@7: return binascii.b2a_base64(hashed.digest())[:-1]
yading@7:
yading@7:
yading@7: class OAuthSignatureMethod_PLAINTEXT(OAuthSignatureMethod):
yading@7:
yading@7: def get_name(self):
yading@7: return 'PLAINTEXT'
yading@7:
yading@7: def build_signature_base_string(self, oauth_request, consumer, token):
yading@7: """Concatenates the consumer key and secret."""
yading@7: sig = '%s&' % escape(consumer.secret)
yading@7: if token:
yading@7: sig = sig + escape(token.secret)
yading@7: return sig, sig
yading@7:
yading@7: def build_signature(self, oauth_request, consumer, token):
yading@7: key, raw = self.build_signature_base_string(oauth_request, consumer,
yading@7: token)
yading@7: return key