/*  $Id$

    Part of SWI-Prolog

    Author:        Jan Wielemaker
    E-mail:        wielemak@science.uva.nl
    WWW:           http://www.swi-prolog.org
    Copyright (C): 1985-2007, University of Amsterdam

    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License
    as published by the Free Software Foundation; either version 2
    of the License, or (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public
    License along with this library; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

    As a special exception, if you link this library with other files,
    compiled with a Free Software compiler, to produce an executable, this
    library does not by itself cause the resulting executable to be covered
    by the GNU General Public License. This exception does not however
    invalidate any other reasons why the executable file might be covered by
    the GNU General Public License.
*/

:- module(rdf_history,
          [ rdfh_transaction/1,         % :Goal
            rdfh_assert/3,              % +S,+P,+O
            rdfh_retractall/3,          % +S,+P,+O
            rdfh_update/3,              % +S[->NS],+P[->NP],+O[->NO]
            rdfh_db_transaction/3,      % ?DB, +Condition, ?Transaction
            rdfh_triple_transaction/2,  % +Triple, -Transaction
            rdfh_transaction_member/2   % ?Action, +Transaction
          ]).
:- use_module(library('http/http_session')).
:- use_module(library(lists)).
:- use_module(library(record)).
:- use_module(library(error)).
:- use_module(library(debug)).
:- use_module(library('semweb/rdf_persistency')).
:- use_module(library('semweb/rdf_db')).


/** RDF Persistent store change history

This module deals with accessing the journal files of the RDF
persistency layer to gain insight into the provenance and history of the
RDF database. It is designed for Wiki-like collaborative editing of an
RDF graph. We make the following assumptions:

    * Users are identified using a URI, typically an OpenID (http://openid.net/)
    * Triples created by a user are added to a named graph identified by the
      URI of the user.
    * Changes are grouped using rdf_transaction(Goal, log(Message, User))
    * The number that is associated with the named graph of a triple (normally
      expressing the line number in the source) is used to store the time-stamp.
      Although this information is redundant (the time stamp is the same as
      for the transaction), it allows for binary search through the history
      file for the enclosing transaction.

@tbd    Cleanup thoughts on delete and update.

@author Jan Wielemaker
*/

                 /*******************************
                 *         DECLARATIONS         *
                 *******************************/

:- module_transparent
    rdfh_transaction/1.

:- rdf_meta
    rdfh_assert(r,r,o),
    rdfh_retractall(r,r,o),
    rdfh_update(t,t,t).

:- multifile
    rdfh_hook/1.

:- record
    rdf_transaction(id:integer,
                    nesting:integer,
                    time:number,
                    message,
                    actions:list,
                    other_graphs:list).
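
/*  Illustration only, not part of the module. The record declaration
    above asks library(record) to generate one accessor predicate per
    field, named after the constructor and the field, such as
    rdf_transaction_id/2 and rdf_transaction_actions/2 (the latter is
    used by rdfh_transaction_member/2 in this file). A minimal sketch
    of how they might be queried once this module is loaded; the field
    values are made up:

    ==
    ?- T = rdf_transaction(1, 0, 1200000000, example_message,
                           [action_a, action_b], []),
       rdf_history:rdf_transaction_id(T, Id),
       rdf_history:rdf_transaction_actions(T, Actions).
    ==

    The accessors are local to this module, which is why the sketch
    qualifies them with rdf_history.
*/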


                 /*******************************
                 *         MODIFICATIONS        *
                 *******************************/

%%  rdfh_transaction(:Goal) is semidet.
%
%   Run Goal using rdf_transaction/2, using information from the HTTP
%   layer to provide OpenID and session-id.

rdfh_transaction(Goal) :-
    rdfh_user(User),
    transaction_context(Context),
    rdf_transaction(Goal, log(rdfh([user(User)|Context]), User)).


%%  rdfh_assert(+S, +P, +O) is det.
%
%   Assert a triple, adding current user and time to the triple
%   context.

rdfh_assert(S,P,O) :-
    (   rdf_active_transaction(log(rdfh(_), User))
    ->  rdfh_time(Time),
        rdf_assert(S,P,O,User:Time)
    ;   throw(error(permission_error(assert, triple, rdf(S,P,O)),
                    context(_, 'No rdfh_transaction/1')))
    ).


%%  rdfh_retractall(+S, +P, +O) is det.
%
%   Retract triples that match {S,P,O}. Note that all matching
%   triples are added to the journal, so we can undo the action as
%   well as report on retracted triples, even if multiple are
%   retracted at the same time.
%
%   One of the problems we are faced with is that a retract action
%   goes into the journal of the user whose triple is retracted,
%   which may or may not be the one who performed the action.

rdfh_retractall(S,P,O) :-
    (   rdf_active_transaction(log(rdfh(_), _User))
    ->  rdf_retractall(S,P,O)
    ;   throw(error(permission_error(retract, triple, rdf(S,P,O)),
                    context(_, 'No rdfh_transaction/1')))
    ).


%%  rdfh_update(+S, +P, +O) is det.
%
%   More tricky stuff, replacing a triple by another. Typically this
%   will be changing the predicate or object.
%   Provenance info should move the new triple to the user making the
%   change, certainly if the object is changed. If the predicate is
%   changed to a related predicate, this becomes less obvious.
%
%   The current simple-minded approach is to turn an update into a
%   retract and assert. The S,P,O specifications are either a ground
%   value or of the form _Old_ =|->|= _New_. Here is an example:
%
%   ==
%   rdfh_update(Work, Style, wn:oldstyle -> wn:newstyle)
%   ==

rdfh_update(S,P,O) :-
    (   rdf_active_transaction(log(rdfh(_), User))
    ->  update(S,P,O, rdf(RS, RP, RO), rdf(AS, AP, AO)),
        must_be(ground, RS),
        must_be(ground, RP),
        must_be(ground, RO),
        rdfh_time(Time),
        rdf_retractall(RS, RP, RO),
        rdf_assert(AS, AP, AO, User:Time)
    ;   throw(error(permission_error(retract, triple, rdf(S,P,O)),
                    context(_, 'No rdfh_transaction/1')))
    ).

update(Ss, Ps, Os, rdf(S0, P0, O0), rdf(S,P,O)) :-
    update(Ss, S0, S),
    update(Ps, P0, P),
    update(Os, O0, O).

update(From->To, From, To) :- !.
update(Value, Value, Value).


%%  transaction_context(-Term) is det.
%
%   Context to pass with an RDF transaction. Note that we pass the
%   user. We don't need this for simple additions, but we do need it
%   to track deletions.

transaction_context(Context) :-
    (   rdfh_session(Session)
    ->  Context = [session(Session)]
    ;   Context = []
    ).

%%  rdfh_session(-Session) is semidet.
%
%   Session is a (ground) identifier for the current session.

rdfh_session(Session) :-
    rdfh_hook(session(Session)), !.
rdfh_session(Session) :-
    catch(http_session_id(Session), _, fail).


%%  rdfh_user(-URI) is det.
%
%   Get user-id of current session.
%
%   @tbd    Make hookable, so we can use the SeRQL user/openid hooks

rdfh_user(User) :-
    rdfh_hook(user(User)), !.
rdfh_user(OpenId) :-
    http_session_data(openid(OpenId)).

%%  rdfh_time(-Time:integer) is det.
%
%   Get time stamp as integer. Second resolution is enough, and
%   avoids rounding problems associated with floats.

rdfh_time(Seconds) :-
    get_time(Now),
    Seconds is round(Now).


                 /*******************************
                 *        EXAMINE HISTORY       *
                 *******************************/

%%  rdfh_triple_transaction(+Triple:rdf(S,P,O), -Transaction) is nondet.
%
%   True if the (partial) Triple is modified in Transaction.

rdfh_triple_transaction(rdf(S,P,O), Transaction) :-
    rdf(S,P,O,DB:Time),
    After is Time - 1,
    rdfh_db_transaction(DB, after(After), Transaction),
    rdfh_transaction_member(assert(S,P,O,Time), Transaction).

%%  rdfh_db_transaction(?DB, +Condition, ?Transaction) is nondet.
%
%   True if Transaction satisfying Condition was executed on DB.
%   Condition is one of:
%
%       * true
%       Always true, returns all transactions.
%       * id(Id)
%       Specifies the identifier of the transaction. Only makes sense
%       if DB is specified as transaction identifiers are local to each
%       DB.
%       * after(Time)
%       True if transaction is executed at or after Time.
%
%   @tbd    More conditions (e.g. before(Time)).

rdfh_db_transaction(DB, true, Transaction) :- !,
    rdf_journal_file(DB, Journal),
    journal_transaction(Journal, Transaction).
rdfh_db_transaction(DB, id(Id), Transaction) :- !,
    must_be(atom, DB),
    rdf_journal_file(DB, Journal),
    open_journal(Journal, Fd),
    call_cleanup((seek_journal(Fd, id(Id)),
                  read_transaction(Fd, Transaction)),
                 close(Fd)).
rdfh_db_transaction(DB, Condition, Transaction) :- !,
    valid_condition(Condition),
    rdf_journal_file(DB, Journal),
    open_journal(Journal, Fd),
    seek_journal(Fd, Condition),
    stream_transaction(Fd, Transaction).

valid_condition(Var) :-
    var(Var), !,
    instantiation_error(Var).
valid_condition(after(Time)) :- !,
    must_be(number, Time).
valid_condition(Cond) :-
    type_error(condition, Cond).

%%  open_journal(+File, -Stream) is det.
%
%   Open a journal file. Journal files are always UTF-8 encoded.

open_journal(JournalFile, Fd) :-
    open(JournalFile, read, Fd, [encoding(utf8)]).

%%  journal_transaction(+JournalFile, ?Transaction) is nondet.
%
%   True if Transaction is a transaction in JournalFile.

journal_transaction(JournalFile, Transaction) :-
    open_journal(JournalFile, Fd),
    stream_transaction(Fd, Transaction).

stream_transaction(JFD, Transaction) :-
    call_cleanup(read_transaction(JFD, Transaction), close(JFD)).

read_transaction(In, Transaction) :-
    repeat,
        read(In, T0),
        (   T0 == end_of_file
        ->  !, fail
        ;   transaction(T0, In, T),     % transaction/3 is not steadfast
            T = Transaction
        ).

transaction(begin(Id, Nest, Time, Msg), In,
            rdf_transaction(Id, Nest, Time, Msg, Actions, Others)) :- !,
    read(In, T2),
    read_transaction_actions(T2, Id, In, Actions, Others).
transaction(start(_), _, _) :- !, fail. % Open journal
transaction(end(_), _, _) :- !, fail.   % Close journal
transaction(Action, _, Action).         % Action outside transaction?

read_transaction_actions(end(Id, _, Others), Id, _, [], Others) :- !.
read_transaction_actions(end_of_file, _, _, [], []) :- !. % TBD: Incomplete transaction (error)
read_transaction_actions(Action, Id, In, Actions, Others) :-
    ignore_in_transaction(Action), !,
    read(In, T2),
    read_transaction_actions(T2, Id, In, Actions, Others).
read_transaction_actions(Action, Id, In, [Action|Actions], Others) :-
    read(In, T2),
    read_transaction_actions(T2, Id, In, Actions, Others).

ignore_in_transaction(start(_)).
ignore_in_transaction(end(_)).
ignore_in_transaction(begin(_,_,_,_)).
ignore_in_transaction(end(_,_,_)).


%%  seek_journal(+Fd:stream, +Spec) is semidet.
%
%   Seek an open journal descriptor to the start of a transaction
%   specified by Spec. Spec is one of:
%
%       * after(Time)
%       First transaction at or after Time. Fails if there are no
%       transactions at or after Time.
%       * id(Id)
%       Start of transaction labeled with given Id. Fails if there
%       is no transaction labeled Id.
%
%   The implementation relies on the incrementing identifier numbers
%   and time-stamps.

seek_journal(Fd, Spec) :-
    stream_property(Fd, file_name(File)),
    size_file(File, Size),
    Here is Size//2,
    Last = last(-),
    (   is_after_spec(Spec)
    ->  (   bsearch_journal(Fd, 0, Here, Size, Spec, Last)
        ->  true
        ;   arg(1, Last, StartOfTerm),
            StartOfTerm \== (-),
            seek(Fd, StartOfTerm, bof, _)
        )
    ;   bsearch_journal(Fd, 0, Here, Size, Spec, Last)
    ).

is_after_spec(after(_Time)).

%%  bsearch_journal(+Fd, +Start, +Here, +End, +Spec, !Last) is semidet.
%
%   Perform a binary search in the journal opened as Fd.

bsearch_journal(Fd, Start, Here, End, Spec, Last) :-
    start_of_transaction(Fd, Here, StartOfTerm, Begin), !,
    compare_transaction(Spec, Begin, Diff),
    (   Diff == (=)
    ->  seek(Fd, StartOfTerm, bof, _)
    ;   Diff == (<)
    ->  NewHere is Start+(Here-Start)//2,
        NewHere < Here,
        nb_setarg(1, Last, StartOfTerm),
        bsearch_journal(Fd, Start, NewHere, Here, Spec, Last)
    ;   NewHere is StartOfTerm+(End-StartOfTerm)//2,
        NewHere > StartOfTerm,
        bsearch_journal(Fd, StartOfTerm, NewHere, End, Spec, Last)
    ).
bsearch_journal(Fd, Start, Here, _End, Spec, Last) :-
    NewHere is Start+(Here-Start)//2,
    NewHere < Here,
    bsearch_journal(Fd, Start, NewHere, Here, Spec, Last).

compare_transaction(id(Id), begin(Id2,_,_,_), Diff) :- !,
    compare(Diff, Id, Id2).
compare_transaction(after(Time), begin(_,_,T,_), Diff) :- !,
    compare(Diff, Time, T).

%%  start_of_transaction(+Fd, +From, -Start, -Term) is semidet.
%
%   Term is the start term of the first transaction after byte
%   position From. Fails if no transaction can be found after From.

start_of_transaction(Fd, From, Start, Term) :-
    seek(Fd, From, bof, _),
    skip(Fd, 10),
    repeat,
        seek(Fd, 0, current, Start),
        read(Fd, Term),
        (   transaction_start(Term)
        ->  !
        ;   Term == end_of_file
        ->  !, fail
        ;   fail
        ).

transaction_start(begin(_Id,_Nest,_Time,_Message)).

%%  rdfh_transaction_member(Action, Transaction) is nondet.
%
%   True if Action is an action in Transaction.

rdfh_transaction_member(Action, Transaction) :-
    rdf_transaction_actions(Transaction, Actions),
    member(Action, Actions).
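

/*  Usage sketch, for illustration only and not part of the module. It
    shows how the exported predicates might be combined outside an HTTP
    session. Everything named here is an assumption: the file is taken
    to be saved as rdf_history.pl, the persistent store is attached to
    a hypothetical directory 'rdf-db' with rdf_attach_db/2 from
    library('semweb/rdf_persistency'), the user is supplied through the
    multifile hook rdfh_hook/1 instead of an OpenID session, and all
    URIs are made up.

    ==
    :- use_module(library('semweb/rdf_persistency')).
    :- use_module(rdf_history).

    % Hypothetical user URI; rdfh_user/1 consults this hook first.
    rdf_history:rdfh_hook(user('http://example.org/people/alice')).

    demo :-
        rdf_attach_db('rdf-db', []),
        % The assert is journalled in the named graph of the user.
        rdfh_transaction(
            rdfh_assert('http://example.org/work1',
                        'http://example.org/style',
                        literal(baroque))),
        % Enumerate the transactions that asserted a matching triple.
        forall(rdfh_triple_transaction(
                   rdf('http://example.org/work1', _, _), Transaction),
               format("~p~n", [Transaction])).
    ==

    Calling rdfh_assert/3 outside rdfh_transaction/1 raises a
    permission_error, as coded in rdfh_assert/3 above.
*/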