marco@16: from . import TestController marco@16: marco@16: from sword2 import Connection, Entry marco@16: from sword2.exceptions import PackagingFormatNotAvailable marco@16: from sword2.compatible_libs import json marco@16: marco@16: SSS_PY_URL="http://sword-app.svn.sourceforge.net/viewvc/sword-app/sss/trunk/sss.py?revision=HEAD" marco@16: PORT_NUMBER="8081" marco@16: marco@16: import subprocess, urllib, tempfile marco@16: import os marco@16: marco@16: import atexit marco@16: marco@16: from time import sleep marco@16: marco@16: long_service_doc = ''' marco@16: marco@16: marco@16: 2.0 marco@16: 16777216 marco@16: marco@16: marco@16: Main Site marco@16: marco@16: marco@16: Collection 43 marco@16: */* marco@16: */* marco@16: Collection Policy marco@16: Collection Description marco@16: false marco@16: Treatment description marco@16: http://purl.org/net/sword/package/SimpleZip marco@16: http://purl.org/net/sword/package/METSDSpaceSIP marco@16: http://swordapp.org/sd-iri/e4 marco@16: marco@16: marco@16: marco@16: Sub-site marco@16: marco@16: marco@16: Collection 44 marco@16: */* marco@16: */* marco@16: Collection Policy marco@16: Collection Description marco@16: true marco@16: Treatment description marco@16: http://purl.org/net/sword/package/SimpleZip marco@16: http://swordapp.org/sd-iri/e5 marco@16: http://swordapp.org/sd-iri/e6 marco@16: http://swordapp.org/sd-iri/e7 marco@16: http://swordapp.org/sd-iri/e8 marco@16: marco@16: marco@16: Collection 46 marco@16: application/zip marco@16: application/zip marco@16: Only Theses marco@16: Theses dropbox marco@16: true marco@16: Treatment description marco@16: http://purl.org/net/sword/package/SimpleZip marco@16: marco@16: marco@16: ''' marco@16: marco@16: marco@16: f, sss_path = tempfile.mkstemp(suffix=".py") marco@16: os.close(f) marco@16: marco@16: urllib.urlretrieve(SSS_PY_URL, sss_path) marco@16: sss_pid = subprocess.Popen(['python', sss_path, PORT_NUMBER]) marco@16: sleep(1) marco@16: marco@16: atexit.register(sss_pid.kill) marco@16: marco@16: class TestConnection(TestController): marco@16: def test_01_blank_init(self): marco@16: conn = Connection("http://example.org/service-doc") marco@16: assert conn.sd_iri == "http://example.org/service-doc" marco@16: assert conn.sd == None marco@16: marco@16: def test_02_init_then_load_from_string(self): marco@16: conn = Connection("http://example.org/service-doc") marco@16: assert conn.sd_iri == "http://example.org/service-doc" marco@16: assert conn.sd == None marco@16: conn.load_service_document(long_service_doc) marco@16: assert conn.sd != None marco@16: assert len(conn.sd.workspaces) == 2 marco@16: assert len(conn.workspaces) == 2 marco@16: assert conn.sd.workspaces[0][0] == "Main Site" marco@16: assert conn.sd.workspaces[1][0] == "Sub-site" marco@16: assert len(conn.sd.workspaces[1][1]) == 2 marco@16: marco@16: def test_03_init_then_load_from_string_t_history(self): marco@16: conn = Connection("http://example.org/service-doc") marco@16: assert conn.sd_iri == "http://example.org/service-doc" marco@16: assert conn.sd == None marco@16: conn.load_service_document(long_service_doc) marco@16: # Should have made a two client 'transactions', the init and subsequent XML load marco@16: assert len(conn.history) == 2 marco@16: assert conn.history[0]['type'] == "init" marco@16: assert conn.history[1]['type'] == "SD Parse" marco@16: marco@16: def test_04_init_from_sss_then_get_doc(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword") marco@16: assert conn.sd_iri == "http://localhost:%s/sd-uri" % PORT_NUMBER marco@16: assert conn.sd == None # Not asked to get sd doc yet marco@16: conn.get_service_document() marco@16: assert conn.sd != None marco@16: assert conn.sd.parsed == True marco@16: assert conn.sd.valid == True marco@16: assert len(conn.sd.workspaces) == 1 marco@16: marco@16: def test_05_init_from_sss(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: assert conn.sd_iri == "http://localhost:%s/sd-uri" % PORT_NUMBER marco@16: assert conn.sd != None marco@16: assert conn.sd.parsed == True marco@16: assert conn.sd.valid == True marco@16: assert len(conn.sd.workspaces) == 1 marco@16: marco@16: def test_06_Simple_POST_to_sss(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: resp = conn.create(payload = "Payload is just a load of text", marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: workspace = 'Main Site', marco@16: collection = conn.sd.workspaces[0][1][0].title, marco@16: in_progress=True, marco@16: metadata_entry=None) marco@16: assert resp.code == 201 marco@16: marco@16: def test_07_Multipart_POST_to_sss(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: resp = conn.create(payload = "Multipart payload here", marco@16: metadata_entry = e, marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: workspace='Main Site', marco@16: collection=conn.sd.workspaces[0][1][0].title, marco@16: in_progress=True) marco@16: assert resp.code == 201 marco@16: marco@16: marco@16: def test_08_Simple_POST_to_sss_w_coliri(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: resp = conn.create(payload = "Payload is just a load of text", marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: col_iri = conn.sd.workspaces[0][1][0].href, marco@16: in_progress=True, marco@16: metadata_entry=None) marco@16: assert resp.code == 201 marco@16: marco@16: def test_09_Multipart_POST_to_sss_w_coliri(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: resp = conn.create(payload = "Multipart payload here", marco@16: metadata_entry = e, marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: col_iri = conn.sd.workspaces[0][1][0].href, marco@16: in_progress=True) marco@16: assert resp != None marco@16: marco@16: marco@16: def test_10_Multipart_POST_then_update_on_EM_IRI(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", marco@16: metadata_entry = e, marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: col_iri = conn.workspaces[0][1][0].href, marco@16: in_progress=True) marco@16: assert deposit_receipt.edit_media != None marco@16: dr = conn.update(payload = "Multipart_POST_then_update_on_EM_IRI -- updated resource", marco@16: mimetype = "text/plain", marco@16: filename = "readthis.txt", marco@16: packaging = "http://purl.org/net/sword/package/Binary", marco@16: edit_media_iri = deposit_receipt.edit_media) marco@16: assert dr.code == 204 # empty response marco@16: marco@16: def test_11_Multipart_POST_then_update_metadata_on_Edit_IRI(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", marco@16: metadata_entry = e, marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: col_iri = conn.sd.workspaces[0][1][0].href, marco@16: in_progress=True) marco@16: assert deposit_receipt.code == 201 # new resource marco@16: marco@16: e.add_fields(dcterms_newfield = "blah de blah") marco@16: dr = conn.update_metadata_for_resource(edit_iri = deposit_receipt.edit, marco@16: metadata_entry = e) marco@16: assert (dr.code == 200 or dr.code == 204) marco@16: marco@16: marco@16: def test_12_Metadata_POST_to_sss(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: resp = conn.create(metadata_entry = e, marco@16: workspace='Main Site', marco@16: collection=conn.sd.workspaces[0][1][0].title, marco@16: in_progress=True) marco@16: assert resp != None marco@16: marco@16: marco@16: def test_13_Metadata_POST_to_sss_altering_entry(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: e.add_fields(dcterms_identifier="doi://somerubbish", dcterms_foo="blah blah") marco@16: resp = conn.create(metadata_entry = e, marco@16: workspace='Main Site', marco@16: collection=conn.sd.workspaces[0][1][0].title, marco@16: in_progress=False) marco@16: assert resp != None marco@16: marco@16: def test_14_Invalid_Packaging_cached_receipt(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True, honour_receipts=True) marco@16: col_iri = conn.sd.workspaces[0][1][0].href # pick the first collection marco@16: dr = conn.create(payload = "Payload is just a load of text", marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: col_iri = col_iri, marco@16: in_progress=True) marco@16: # Now to GET that resource with invalid packaging marco@16: try: marco@16: content = conn.get_resource(dr.cont_iri, packaging="foofar") marco@16: assert 1 == 0 # fail marco@16: except PackagingFormatNotAvailable: marco@16: # test the 'honour_receipts' flag and cached deposit marco@16: pass marco@16: marco@16: def test_15_Metadata_POST_to_sss_w_coliri(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: dr = conn.create(metadata_entry = e, marco@16: col_iri = conn.sd.workspaces[0][1][0].href, marco@16: in_progress=True) marco@16: assert dr.code == 201 marco@16: marco@16: def test_16_Invalid_Packaging_cached_receipt(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True, honour_receipts=True) marco@16: col_iri = conn.sd.workspaces[0][1][0].href # pick the first collection marco@16: dr = conn.create(payload = "Payload is just a load of text", marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: col_iri = col_iri, marco@16: in_progress=True, marco@16: metadata_entry=None) marco@16: # Now to GET that resource with invalid packaging marco@16: try: marco@16: content = conn.get_resource(dr.cont_iri, packaging="foofar") marco@16: assert 1 == 0 # fail marco@16: except PackagingFormatNotAvailable: marco@16: # test the 'honour_receipts' flag and cached deposit marco@16: pass marco@16: marco@16: def test_17_Simple_POST_and_GET(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: col_iri = conn.sd.workspaces[0][1][0].href # pick the first collection marco@16: dr = conn.create(payload = "Simple_POST_and_GET", marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: col_iri = col_iri, marco@16: in_progress=True, marco@16: metadata_entry=None) marco@16: assert dr.code == 201 marco@16: # Now to GET that resource with no prescribed for packaging marco@16: content_object = conn.get_resource(dr.cont_iri) marco@16: # Can't guarantee that sss.py won't mangle submissions, so can't validate response at this moment marco@16: assert content_object != None marco@16: marco@16: marco@16: def test_18_Metadata_POST_to_se_iri(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", marco@16: metadata_entry = e, marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: col_iri = conn.sd.workspaces[0][1][0].href, marco@16: in_progress=True) marco@16: marco@16: assert deposit_receipt.se_iri != None marco@16: e.add_fields(dcterms_identifier="doi://somerubbish", dcterms_foo="blah blah") marco@16: dr = conn.append(se_iri = deposit_receipt.se_iri, marco@16: metadata_entry = e, marco@16: in_progress=False) marco@16: assert dr.code == 201 marco@16: marco@16: def test_19_File_POST_to_se_iri(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", marco@16: metadata_entry = e, marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: col_iri = conn.sd.workspaces[0][1][0].href, marco@16: in_progress=True) marco@16: marco@16: assert deposit_receipt.se_iri != None marco@16: dr = conn.append(se_iri = deposit_receipt.se_iri, marco@16: payload = "Multipart_POST_then_appending_file_on_SE_IRI -- updated resource", marco@16: mimetype = "text/plain", marco@16: filename = "readthisextrafile.txt", marco@16: packaging = "http://purl.org/net/sword/package/Binary") marco@16: assert dr.code == 201 marco@16: marco@16: def test_20_Multipart_POST_to_se_iri(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", marco@16: metadata_entry = e, marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: col_iri = conn.sd.workspaces[0][1][0].href, marco@16: in_progress=True) marco@16: marco@16: assert deposit_receipt.se_iri != None marco@16: e.add_fields(dcterms_identifier="doi://multipart_update_to_SE_IRI") marco@16: dr = conn.append(se_iri = deposit_receipt.se_iri, marco@16: payload = "Multipart_POST_then_appending_file_on_SE_IRI -- updated resource", marco@16: mimetype = "text/plain", marco@16: filename = "readthisextrafile.txt", marco@16: packaging = "http://purl.org/net/sword/package/Binary", marco@16: metadata_entry = e) marco@16: print dr.code marco@16: assert dr.code == 201 marco@16: marco@16: marco@16: def test_21_Create_deposit_and_delete_content(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", marco@16: metadata_entry = e, marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: col_iri = conn.sd.workspaces[0][1][0].href, marco@16: in_progress=True) marco@16: assert deposit_receipt.edit_media != None marco@16: dr = conn.delete(resource_iri = deposit_receipt.edit_media) marco@16: assert dr.code == 204 or dr.code == 200 marco@16: marco@16: marco@16: def test_22_Create_deposit_and_delete_deposit(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", marco@16: metadata_entry = e, marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: col_iri = conn.sd.workspaces[0][1][0].href, marco@16: in_progress=True) marco@16: assert deposit_receipt.edit != None marco@16: dr = conn.delete(resource_iri = deposit_receipt.edit) marco@16: assert dr.code == 204 or dr.code == 200 marco@16: marco@16: marco@16: def test_23_Finish_in_progress_deposit(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", marco@16: metadata_entry = e, marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: col_iri = conn.sd.workspaces[0][1][0].href, marco@16: in_progress=True) marco@16: assert deposit_receipt.edit != None marco@16: dr = conn.complete_deposit(se_iri = deposit_receipt.se_iri) marco@16: print "This will fail until the sss.py SWORD2 server responds properly, rather than with code 201" marco@16: assert dr.code == 200 marco@16: marco@16: def test_24_get_sword_statement(self): marco@16: conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True) marco@16: e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar") marco@16: deposit_receipt = conn.create(payload = "Multipart_POST_then_update_on_EM_IRI", marco@16: metadata_entry = e, marco@16: mimetype = "text/plain", marco@16: filename = "readme.txt", marco@16: packaging = 'http://purl.org/net/sword/package/Binary', marco@16: col_iri = conn.sd.workspaces[0][1][0].href, marco@16: in_progress=True) marco@16: ss_iri = None marco@16: for item_dict in deposit_receipt.links['http://purl.org/net/sword/terms/statement']: marco@16: if item_dict.has_key('type') and item_dict.get('type', None) == "application/atom+xml;type=feed": marco@16: ss_iri = item_dict.get('href') marco@16: assert ss_iri != None marco@16: ss = conn.get_atom_sword_statement(ss_iri) marco@16: assert ss != None marco@16: assert ss.entries[0].metadata.get('sword_depositedBy') == 'sword'