Mercurial > hg > sworduploader
view sword2-libraries-pyinstaller-compatible/tests/http/test_sss.py @ 22:d1752c7031e4 timeouts tip
Updated .hgignore to ignore sword2_logging.conf and anything in .cache
author | Steve Welburn <stephen.welburn@eecs.qmul.ac.uk> |
---|---|
date | Tue, 22 Jan 2013 14:43:42 +0000 |
parents | 8b69bba225c9 |
children |
line wrap: on
line source
from . import TestController from sword2 import Connection, Entry from sword2.exceptions import PackagingFormatNotAvailable from sword2.compatible_libs import json SSS_PY_URL="http://sword-app.svn.sourceforge.net/viewvc/sword-app/sss/trunk/sss.py?revision=HEAD" PORT_NUMBER="8081" import subprocess, urllib, tempfile import os import atexit from time import sleep long_service_doc = '''<?xml version="1.0" ?> <service xmlns:dcterms="http://purl.org/dc/terms/" xmlns:sword="http://purl.org/net/sword/terms/" xmlns:atom="http://www.w3.org/2005/Atom" xmlns="http://www.w3.org/2007/app"> <sword:version>2.0</sword:version> <sword:maxUploadSize>16777216</sword:maxUploadSize> <workspace> <atom:title>Main Site</atom:title> <collection href="http://swordapp.org/col-iri/43"> <atom:title>Collection 43</atom:title> <accept>*/*</accept> <accept alternate="multipart-related">*/*</accept> <sword:collectionPolicy>Collection Policy</sword:collectionPolicy> <dcterms:abstract>Collection Description</dcterms:abstract> <sword:mediation>false</sword:mediation> <sword:treatment>Treatment description</sword:treatment> <sword:acceptPackaging>http://purl.org/net/sword/package/SimpleZip</sword:acceptPackaging> <sword:acceptPackaging>http://purl.org/net/sword/package/METSDSpaceSIP</sword:acceptPackaging> <sword:service>http://swordapp.org/sd-iri/e4</sword:service> </collection> </workspace> <workspace> <atom:title>Sub-site</atom:title> <collection href="http://swordapp.org/col-iri/44"> <atom:title>Collection 44</atom:title> <accept>*/*</accept> <accept alternate="multipart-related">*/*</accept> <sword:collectionPolicy>Collection Policy</sword:collectionPolicy> <dcterms:abstract>Collection Description</dcterms:abstract> <sword:mediation>true</sword:mediation> <sword:treatment>Treatment description</sword:treatment> <sword:acceptPackaging>http://purl.org/net/sword/package/SimpleZip</sword:acceptPackaging> <sword:service>http://swordapp.org/sd-iri/e5</sword:service> <sword:service>http://swordapp.org/sd-iri/e6</sword:service> <sword:service>http://swordapp.org/sd-iri/e7</sword:service> <sword:service>http://swordapp.org/sd-iri/e8</sword:service> </collection> <collection href="http://swordapp.org/col-iri/46"> <atom:title>Collection 46</atom:title> <accept>application/zip</accept> <accept alternate="multipart-related">application/zip</accept> <sword:collectionPolicy>Only Theses</sword:collectionPolicy> <dcterms:abstract>Theses dropbox</dcterms:abstract> <sword:mediation>true</sword:mediation> <sword:treatment>Treatment description</sword:treatment> <sword:acceptPackaging>http://purl.org/net/sword/package/SimpleZip</sword:acceptPackaging> </collection> </workspace> </service>''' f, sss_path = tempfile.mkstemp(suffix=".py") os.close(f) urllib.urlretrieve(SSS_PY_URL, sss_path) sss_pid = subprocess.Popen(['python', sss_path, PORT_NUMBER]) sleep(1) atexit.register(sss_pid.kill) class TestConnection(TestController): def test_01_blank_init(self): conn = Connection("http://example.org/service-doc") assert conn.sd_iri == "http://example.org/service-doc" assert conn.sd == None def test_02_init_then_load_from_string(self): conn = Connection("http://example.org/service-doc") assert conn.sd_iri == "http://example.org/service-doc" assert conn.sd == None conn.load_service_document(long_service_doc) assert conn.sd != None assert len(conn.sd.workspaces) == 2 assert len(conn.workspaces) == 2 assert conn.sd.workspaces[0][0] == "Main Site" assert conn.sd.workspaces[1][0] == "Sub-site" assert len(conn.sd.workspaces[1][1]) == 2 def test_03_init_then_load_from_string_t_history(self): conn = Connection("http://example.org/service-doc") assert conn.sd_iri == "http://example.org/service-doc" assert conn.sd == None conn.load_service_document(long_service_doc) # Should have made a two client 'transactions', the init and subsequent XML load assert len(conn.history) == 2 assert conn.history[0]['type'] == "init" assert conn.history[1]['type'] == "SD Parse" def test_04_init_from_sss_then_get_doc(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword") assert conn.sd_iri == "http://localhost:%s/sd-uri" % PORT_NUMBER assert conn.sd == None # Not asked to get sd doc yet conn.get_service_document() assert conn.sd != None assert conn.sd.parsed == True assert conn.sd.valid == True assert len(conn.sd.workspaces) == 1 def test_05_init_from_sss(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) assert conn.sd_iri == "http://localhost:%s/sd-uri" % PORT_NUMBER assert conn.sd != None assert conn.sd.parsed == True assert conn.sd.valid == True assert len(conn.sd.workspaces) == 1 def test_06_Simple_POST_to_sss(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) resp = conn.create(payload = "Payload is just a load of text", mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', workspace = 'Main Site', collection = conn.sd.workspaces[0][1][0].title, in_progress=True, metadata_entry=None) assert resp.code == 201 def test_07_Multipart_POST_to_sss(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") resp = conn.create(payload = "Multipart payload here", metadata_entry = e, mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', workspace='Main Site', collection=conn.sd.workspaces[0][1][0].title, in_progress=True) assert resp.code == 201 def test_08_Simple_POST_to_sss_w_coliri(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") resp = conn.create(payload = "Payload is just a load of text", mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', col_iri = conn.sd.workspaces[0][1][0].href, in_progress=True, metadata_entry=None) assert resp.code == 201 def test_09_Multipart_POST_to_sss_w_coliri(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") resp = conn.create(payload = "Multipart payload here", metadata_entry = e, mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', col_iri = conn.sd.workspaces[0][1][0].href, in_progress=True) assert resp != None def test_10_Multipart_POST_then_update_on_EM_IRI(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", metadata_entry = e, mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', col_iri = conn.workspaces[0][1][0].href, in_progress=True) assert deposit_receipt.edit_media != None dr = conn.update(payload = "Multipart_POST_then_update_on_EM_IRI -- updated resource", mimetype = "text/plain", filename = "readthis.txt", packaging = "http://purl.org/net/sword/package/Binary", edit_media_iri = deposit_receipt.edit_media) assert dr.code == 204 # empty response def test_11_Multipart_POST_then_update_metadata_on_Edit_IRI(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", metadata_entry = e, mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', col_iri = conn.sd.workspaces[0][1][0].href, in_progress=True) assert deposit_receipt.code == 201 # new resource e.add_fields(dcterms_newfield = "blah de blah") dr = conn.update_metadata_for_resource(edit_iri = deposit_receipt.edit, metadata_entry = e) assert (dr.code == 200 or dr.code == 204) def test_12_Metadata_POST_to_sss(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") resp = conn.create(metadata_entry = e, workspace='Main Site', collection=conn.sd.workspaces[0][1][0].title, in_progress=True) assert resp != None def test_13_Metadata_POST_to_sss_altering_entry(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") e.add_fields(dcterms_identifier="doi://somerubbish", dcterms_foo="blah blah") resp = conn.create(metadata_entry = e, workspace='Main Site', collection=conn.sd.workspaces[0][1][0].title, in_progress=False) assert resp != None def test_14_Invalid_Packaging_cached_receipt(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True, honour_receipts=True) col_iri = conn.sd.workspaces[0][1][0].href # pick the first collection dr = conn.create(payload = "Payload is just a load of text", mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', col_iri = col_iri, in_progress=True) # Now to GET that resource with invalid packaging try: content = conn.get_resource(dr.cont_iri, packaging="foofar") assert 1 == 0 # fail except PackagingFormatNotAvailable: # test the 'honour_receipts' flag and cached deposit pass def test_15_Metadata_POST_to_sss_w_coliri(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") dr = conn.create(metadata_entry = e, col_iri = conn.sd.workspaces[0][1][0].href, in_progress=True) assert dr.code == 201 def test_16_Invalid_Packaging_cached_receipt(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True, honour_receipts=True) col_iri = conn.sd.workspaces[0][1][0].href # pick the first collection dr = conn.create(payload = "Payload is just a load of text", mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', col_iri = col_iri, in_progress=True, metadata_entry=None) # Now to GET that resource with invalid packaging try: content = conn.get_resource(dr.cont_iri, packaging="foofar") assert 1 == 0 # fail except PackagingFormatNotAvailable: # test the 'honour_receipts' flag and cached deposit pass def test_17_Simple_POST_and_GET(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) col_iri = conn.sd.workspaces[0][1][0].href # pick the first collection dr = conn.create(payload = "Simple_POST_and_GET", mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', col_iri = col_iri, in_progress=True, metadata_entry=None) assert dr.code == 201 # Now to GET that resource with no prescribed for packaging content_object = conn.get_resource(dr.cont_iri) # Can't guarantee that sss.py won't mangle submissions, so can't validate response at this moment assert content_object != None def test_18_Metadata_POST_to_se_iri(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", metadata_entry = e, mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', col_iri = conn.sd.workspaces[0][1][0].href, in_progress=True) assert deposit_receipt.se_iri != None e.add_fields(dcterms_identifier="doi://somerubbish", dcterms_foo="blah blah") dr = conn.append(se_iri = deposit_receipt.se_iri, metadata_entry = e, in_progress=False) assert dr.code == 201 def test_19_File_POST_to_se_iri(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", metadata_entry = e, mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', col_iri = conn.sd.workspaces[0][1][0].href, in_progress=True) assert deposit_receipt.se_iri != None dr = conn.append(se_iri = deposit_receipt.se_iri, payload = "Multipart_POST_then_appending_file_on_SE_IRI -- updated resource", mimetype = "text/plain", filename = "readthisextrafile.txt", packaging = "http://purl.org/net/sword/package/Binary") assert dr.code == 201 def test_20_Multipart_POST_to_se_iri(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", metadata_entry = e, mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', col_iri = conn.sd.workspaces[0][1][0].href, in_progress=True) assert deposit_receipt.se_iri != None e.add_fields(dcterms_identifier="doi://multipart_update_to_SE_IRI") dr = conn.append(se_iri = deposit_receipt.se_iri, payload = "Multipart_POST_then_appending_file_on_SE_IRI -- updated resource", mimetype = "text/plain", filename = "readthisextrafile.txt", packaging = "http://purl.org/net/sword/package/Binary", metadata_entry = e) print dr.code assert dr.code == 201 def test_21_Create_deposit_and_delete_content(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", metadata_entry = e, mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', col_iri = conn.sd.workspaces[0][1][0].href, in_progress=True) assert deposit_receipt.edit_media != None dr = conn.delete(resource_iri = deposit_receipt.edit_media) assert dr.code == 204 or dr.code == 200 def test_22_Create_deposit_and_delete_deposit(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", metadata_entry = e, mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', col_iri = conn.sd.workspaces[0][1][0].href, in_progress=True) assert deposit_receipt.edit != None dr = conn.delete(resource_iri = deposit_receipt.edit) assert dr.code == 204 or dr.code == 200 def test_23_Finish_in_progress_deposit(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", metadata_entry = e, mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', col_iri = conn.sd.workspaces[0][1][0].href, in_progress=True) assert deposit_receipt.edit != None dr = conn.complete_deposit(se_iri = deposit_receipt.se_iri) print "This will fail until the sss.py SWORD2 server responds properly, rather than with code 201" assert dr.code == 200 def test_24_get_sword_statement(self): conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", metadata_entry = e, mimetype = "text/plain", filename = "readme.txt", packaging = 'http://purl.org/net/sword/package/Binary', col_iri = conn.sd.workspaces[0][1][0].href, in_progress=True) ss_iri = None for item_dict in deposit_receipt.links['http://purl.org/net/sword/terms/statement']: if item_dict.has_key('type') and item_dict.get('type', None) == "application/atom+xml;type=feed": ss_iri = item_dict.get('href') assert ss_iri != None ss = conn.get_atom_sword_statement(ss_iri) assert ss != None assert ss.entries[0].metadata.get('sword_depositedBy') == 'sword'