p@21: """ p@21: The MIT License p@21: p@21: Copyright (c) 2007 Leah Culver p@21: p@21: Permission is hereby granted, free of charge, to any person obtaining a copy p@21: of this software and associated documentation files (the "Software"), to deal p@21: in the Software without restriction, including without limitation the rights p@21: to use, copy, modify, merge, publish, distribute, sublicense, and/or sell p@21: copies of the Software, and to permit persons to whom the Software is p@21: furnished to do so, subject to the following conditions: p@21: p@21: The above copyright notice and this permission notice shall be included in p@21: all copies or substantial portions of the Software. p@21: p@21: THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR p@21: IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, p@21: FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE p@21: AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER p@21: LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, p@21: OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN p@21: THE SOFTWARE. p@21: """ p@21: p@21: from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer p@21: import urllib p@21: p@21: import oauth.oauth as oauth p@21: p@21: # fake urls for the test server p@21: REQUEST_TOKEN_URL = 'https://photos.example.net/request_token' p@21: ACCESS_TOKEN_URL = 'https://photos.example.net/access_token' p@21: AUTHORIZATION_URL = 'https://photos.example.net/authorize' p@21: CALLBACK_URL = 'http://printer.example.com/request_token_ready' p@21: RESOURCE_URL = 'http://photos.example.net/photos' p@21: REALM = 'http://photos.example.net/' p@21: VERIFIER = 'verifier' p@21: p@21: # example store for one of each thing p@21: class MockOAuthDataStore(oauth.OAuthDataStore): p@21: p@21: def __init__(self): p@21: self.consumer = oauth.OAuthConsumer('key', 'secret') p@21: self.request_token = oauth.OAuthToken('requestkey', 'requestsecret') p@21: self.access_token = oauth.OAuthToken('accesskey', 'accesssecret') p@21: self.nonce = 'nonce' p@21: self.verifier = VERIFIER p@21: p@21: def lookup_consumer(self, key): p@21: if key == self.consumer.key: p@21: return self.consumer p@21: return None p@21: p@21: def lookup_token(self, token_type, token): p@21: token_attrib = getattr(self, '%s_token' % token_type) p@21: if token == token_attrib.key: p@21: ## HACK p@21: token_attrib.set_callback(CALLBACK_URL) p@21: return token_attrib p@21: return None p@21: p@21: def lookup_nonce(self, oauth_consumer, oauth_token, nonce): p@21: if oauth_token and oauth_consumer.key == self.consumer.key and (oauth_token.key == self.request_token.key or oauth_token.key == self.access_token.key) and nonce == self.nonce: p@21: return self.nonce p@21: return None p@21: p@21: def fetch_request_token(self, oauth_consumer, oauth_callback): p@21: if oauth_consumer.key == self.consumer.key: p@21: if oauth_callback: p@21: # want to check here if callback is sensible p@21: # for mock store, we assume it is p@21: self.request_token.set_callback(oauth_callback) p@21: return self.request_token p@21: return None p@21: p@21: def fetch_access_token(self, oauth_consumer, oauth_token, oauth_verifier): p@21: if oauth_consumer.key == self.consumer.key and oauth_token.key == self.request_token.key and oauth_verifier == self.verifier: p@21: # want to check here if token is authorized p@21: # for mock store, we assume it is p@21: return self.access_token p@21: return None p@21: p@21: def authorize_request_token(self, oauth_token, user): p@21: if oauth_token.key == self.request_token.key: p@21: # authorize the request token in the store p@21: # for mock store, do nothing p@21: return self.request_token p@21: return None p@21: p@21: class RequestHandler(BaseHTTPRequestHandler): p@21: p@21: def __init__(self, *args, **kwargs): p@21: self.oauth_server = oauth.OAuthServer(MockOAuthDataStore()) p@21: self.oauth_server.add_signature_method(oauth.OAuthSignatureMethod_PLAINTEXT()) p@21: self.oauth_server.add_signature_method(oauth.OAuthSignatureMethod_HMAC_SHA1()) p@21: BaseHTTPRequestHandler.__init__(self, *args, **kwargs) p@21: p@21: # example way to send an oauth error p@21: def send_oauth_error(self, err=None): p@21: # send a 401 error p@21: self.send_error(401, str(err.message)) p@21: # return the authenticate header p@21: header = oauth.build_authenticate_header(realm=REALM) p@21: for k, v in header.iteritems(): p@21: self.send_header(k, v) p@21: p@21: def do_GET(self): p@21: p@21: # debug info p@21: #print self.command, self.path, self.headers p@21: p@21: # get the post data (if any) p@21: postdata = None p@21: if self.command == 'POST': p@21: try: p@21: length = int(self.headers.getheader('content-length')) p@21: postdata = self.rfile.read(length) p@21: except: p@21: pass p@21: p@21: # construct the oauth request from the request parameters p@21: oauth_request = oauth.OAuthRequest.from_request(self.command, self.path, headers=self.headers, query_string=postdata) p@21: p@21: # request token p@21: if self.path.startswith(REQUEST_TOKEN_URL): p@21: try: p@21: # create a request token p@21: token = self.oauth_server.fetch_request_token(oauth_request) p@21: # send okay response p@21: self.send_response(200, 'OK') p@21: self.end_headers() p@21: # return the token p@21: self.wfile.write(token.to_string()) p@21: except oauth.OAuthError, err: p@21: self.send_oauth_error(err) p@21: return p@21: p@21: # user authorization p@21: if self.path.startswith(AUTHORIZATION_URL): p@21: try: p@21: # get the request token p@21: token = self.oauth_server.fetch_request_token(oauth_request) p@21: # authorize the token (kind of does nothing for now) p@21: token = self.oauth_server.authorize_token(token, None) p@21: token.set_verifier(VERIFIER) p@21: # send okay response p@21: self.send_response(200, 'OK') p@21: self.end_headers() p@21: # return the callback url (to show server has it) p@21: self.wfile.write(token.get_callback_url()) p@21: except oauth.OAuthError, err: p@21: self.send_oauth_error(err) p@21: return p@21: p@21: # access token p@21: if self.path.startswith(ACCESS_TOKEN_URL): p@21: try: p@21: # create an access token p@21: token = self.oauth_server.fetch_access_token(oauth_request) p@21: # send okay response p@21: self.send_response(200, 'OK') p@21: self.end_headers() p@21: # return the token p@21: self.wfile.write(token.to_string()) p@21: except oauth.OAuthError, err: p@21: self.send_oauth_error(err) p@21: return p@21: p@21: # protected resources p@21: if self.path.startswith(RESOURCE_URL): p@21: try: p@21: # verify the request has been oauth authorized p@21: consumer, token, params = self.oauth_server.verify_request(oauth_request) p@21: # send okay response p@21: self.send_response(200, 'OK') p@21: self.end_headers() p@21: # return the extra parameters - just for something to return p@21: self.wfile.write(str(params)) p@21: except oauth.OAuthError, err: p@21: self.send_oauth_error(err) p@21: return p@21: p@21: def do_POST(self): p@21: return self.do_GET() p@21: p@21: def main(): p@21: try: p@21: server = HTTPServer(('', 8080), RequestHandler) p@21: print 'Test server running...' p@21: server.serve_forever() p@21: except KeyboardInterrupt: p@21: server.socket.close() p@21: p@21: if __name__ == '__main__': p@21: main()