Mercurial > hg > reactive
changeset 0:a0213fab5674
Split off from triserver.
author | samer |
---|---|
date | Mon, 23 Jan 2012 17:38:47 +0000 |
parents | |
children | 11e8c4e65a03 |
files | Makefile qutils.pl reactive.pl recorder.pl |
diffstat | 4 files changed, 409 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Makefile Mon Jan 23 17:38:47 2012 +0000 @@ -0,0 +1,14 @@ +INSTALL_PL_TO=~/lib/prolog +SUBDIR=reactive +SUB_MODULES="qutils reactive recorder" + +main: + +install: + install -d $(INSTALL_PL_TO) + install -d $(INSTALL_PL_TO)/$(SUBDIR) + for f in "$(SUB_MODULES)"; do install $(INSTALL_FLAGS) -m 644 $$f.pl $(INSTALL_PL_TO)/$(SUBDIR); done + +#for f in "$(ROOT_MODULES)"; do install $(INSTALL_FLAGS) -m 644 $$f.pl $(INSTALL_PL_TO); done + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/qutils.pl Mon Jan 23 17:38:47 2012 +0000 @@ -0,0 +1,97 @@ +:- module( qutils, + [ with_message/2 + , with_message_or_timeout/4 + , get_message_or_timeout/2 + , send_with_size_limit/3 + ]). + +:- meta_predicate with_message(?,0). +:- meta_predicate with_message_or_timeout(+,?,0,0). + +%% get_message_or_timeout( +TimeOut, -Msg) is det. +% +% Time-limited wait for next term in message queue. +% Waits up to TimeOut seconds for the next message that +% unifies with Msg in the current thread. If none arrives, +% then Msg is unified with timeout. +% NB: may fail if Msg is bound on input. +get_message_or_timeout(inf,Msg) :- !, thread_get_message(Msg). +get_message_or_timeout(Deadline,Msg) :- + get_time(Now), + ( Now>=Deadline -> Msg=timeout, writeln(elapsed) + ; thread_self(Thread), + setup_call_cleanup( + alarm_at(Deadline, thread_send_message(Thread,timeout), Id), + thread_get_message(Thread,Msg), + time:remove_alarm_notrace(Id)) + %,(Msg\=timeout -> writeln(msg:Msg); true) + ). + +/* +get_message_or_timeout(Deadline,Msg) :- + get_time(Now), + ( Now>=Deadline -> Msg=timeout, writeln(elapsed) + ; catch( + deadline_get_message(Deadline,Msg), + time_limit_exceeded, + Msg=timeout) + %, (Msg\=timeout -> writeln(msg:Msg); true) + ). + +deadline_get_message(Deadline,Msg) :- + setup_call_cleanup( + alarm_at(Deadline, ding_ding, Id), + thread_get_message(Msg), + time:remove_alarm_notrace(Id)). + +ding_ding :- throw(time_limit_exceeded). + + +*/ + + + +%% with_message( Msg, :Goal) is nondet. +% +% Waits for next message, unify with Msg, and call Goal. +% This procedure gets the next term out of the current thread's +% message queue, waiting if necessary. If it is 'quit', then +% it succeeds immediately leaving Msg unchanged. If it unifies +% with Msg, then Goal is called. Otherwise, a message is printed +% and the proceduce succeeds. +with_message(Temp,OnEvent) :- + thread_get_message(Msg), + ( Msg=quit -> true + ; Msg=Temp -> call(OnEvent) + ; writeln(template_mismatch(Temp,Msg)) + ). + +%% with_message_or_timeout( +TimeOut, Msg, :OnEvent, :OnTimeOut) is nondet. +% +% Time limited wait for message with handler goals for either case. +% Waits up to TimeOut seconds for next message, unify with Msg, +% and call OnEvent. If none arrives before TimeOut expires, +% OnTimeOut is called instead. If the message is 'quit', it +% just returns. If the message does not unify with Msg, some +% text is printed and the proceduce succeeds. +with_message_or_timeout(T,Temp,OnEvent,OnTimeout) :- + get_message_or_timeout(T,Msg), + ( Msg=quit -> true + ; Msg=timeout -> call(OnTimeout) + ; Msg=Temp -> call(OnEvent) + ; writeln(template_mismatch(Temp,Msg)) + ). + +%% send_with_size_limit( +Max:natural, +Queue:queue_id, +Msg:term) is det. +% +% Adds term to message queue unless queue is already full. +% If message queue Queu already contains more than Max elements, +% then "*" is printed and the procedure succeeds. NB. the message +% is discarded. +send_with_size_limit(Max,Queue,Msg) :- + message_queue_property(Queue,size(QSize)), + ( QSize<Max -> thread_send_message(Queue,Msg) + ; writeln('*') + ). + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/reactive.pl Mon Jan 23 17:38:47 2012 +0000 @@ -0,0 +1,197 @@ +:- module(reactive, + [ run_process/1 + , init_process/2 + , step_event/3 + , step_timeout/2 + , step_timeout/3 + , get_timeout/2 + , req_message/2 + , req_message_or_timeout/4 + , (>)/3 + ]). + +/** <module> Tools for reactive programming + + This module provides a framework for doing + reactive programming, where a reactive process is + represented by one or two continuations + which define how it will react to received events or the + passage of time. + + Relevant types: + == + ptail == pred(process). + + process ---> get(term,ptail) + ; gett(time,term,ptail,ptail). + + + event ---> event(time,_). + + event_handler ---> term^ptail. + == + + The type =|ptail|= represents a slice of execution for a process. + When called, code may execute until the process + wishes to block waiting for an event or time-out to occur. + It does this by returning a value of type =|process|= + representing either a process waiting indefinitely for an event, + or a processing waiting until a certain for an event. + + Values of type time are either floats representing times as + returned by get_time/1 or the atom 'inf' representing a point + infinitely far in the future. +*/ + +:- meta_predicate req_message(:,-). +:- meta_predicate req_message_or_timeout(+,:,:,-). +:- meta_predicate run_process(1). +:- meta_predicate init_process(1,?). +:- meta_predicate >(+,1,?). + +:- use_module(qutils). + +%--------------------------------------- + +%% init_process( +Spec:ptail, -P:process) is det. +% +% Initialise process specified by Spec, which is +% simply a callable term taking one argument. +% P is bound to representation of sleeping +% process on exit. +init_process(PS,P1) :- call(PS,P1). + +%% >(+E:event,+Spec:ptail,-P:process) is det. +% +% Initialise process and immediately supply an event. +% This uses init_process/2 to initialise the process specified +% by Spec and then use step_event/3 to process the given event. +% +% This implies that if Spec is of type ptail, then +% Event>Spec is also of type ptail. +>(Event,Spec,Proc) :- + init_process(Spec,P1), + step_event(Event,P1,Proc). + +%% run_process( +Spec:ptail) is det. +% +% Runs process represented by Spec (see init_process/2). +% After initialising, this predicate goes into a recursive procedure +% waiting for events on the current thread's message queue and +% passing them to the waiting process. +run_process(M:Spec) :- init_process(M:Spec,Proc), cont_process(Proc). + + +%% cont_process( +Proc:process) is det. +% +% Takes a quiescent process, waits for and then handles a new event. +% (Including timeout events.) +cont_process(Proc) :- + get_timeout(Proc,T), !, + garbage_collect_atoms, + get_message_or_timeout(T,Event), + event_process_cont(Event,Proc,T). + + +%% event_process_cont(+Event:evdesc,+P:process,+Deadline:time) is det. +% +% Handle latest event Event on process Proc. The evdesc type is defined +% as =|evdesc ---> quit; timeout; event(time,term)|= with the following +% semantics: +% +% * quit +% The process will terminate: this procedure succeeds and returns immediately. +% +% * timeout +% The timeout continuation of the process is followed. +% +% * event(Time,Data) +% If the current time is less than 0.5s after the event time, +% the process's event continuation is called. Otherwise, +% the event is dropped and we continue as if it had never happened. +event_process_cont(quit,_,_) :- !. +event_process_cont(timeout,P1,_) :- step_timeout(P1,P2), !, cont_process(P2). +event_process_cont(event(ET,EData),P1,T1) :- !, + get_time(Now), + ( Now>ET+0.5 -> writeln('#'), cont_process(P1) + ; cont_event(T1,P1,ET,EData)). + +%% cont_event(+Deadline:time, +P:process, +EvTime:time, +EvData:term) is det. +% +% Handles process continuation after receiving a valid event. +% If the event is later than the process timeout, the timeout +% continuation is called recursively until the process is ready +% to receive the current event. Then this event is handled and +% the process continues from there. +cont_event(T1,P1,ET,EData) :- + ( T1\=inf, ET>T1 + -> step_timeout(P1,P2), + get_timeout(P2,T2), !, + cont_event(T2,P2,ET,EData) + ; step_event(event(ET,EData),P1,P2), !, + cont_process(P2) + ). + +cont_event_safely(T1,P1,ET,EData) :- + ( T1\=inf, ET>T1 + -> step_timeout(P1,P2), + get_timeout(P2,T2), !, + cont_event_safely(T2,P2,ET,EData) + ; ( step_event(event(ET,EData),P1,P2) -> true + ; writeln(failed(step_event(event(ET,EData),P1,P2))), P1=P2 + ), + cont_process(P2) + ). + +%% req_message( +EventHandler:event_handler, -P:process) is det. +% +% Returns process state representing quiescent process +% waiting indefinitley for event. EventHandler must be +% a term of the form Msg^PTail, where Msg is to be unified +% with next event term before calling PTail. +req_message(Mod:Temp^OnEvent, get(Temp,Mod:OnEvent)). + +%% req_message_or_timeout( +T:time, +TimeOutHandler:ptail, +EventHandler:event_handler, -P:process) is det. +% +% Returns process state representing quiescent process +% waiting for event. EventHandler must be +% a term of the form Msg^PTail, where Msg is to be unified +% with next event term before calling PTail to get the next +% continuation of the process. +% TimeOutHandler will be called if time no event arrives +% before time T. +req_message_or_timeout(T,OnTimeout,Mod:Temp^OnEvent, + gett(T,Temp,Mod:OnEvent,OnTimeout)). + +leq(inf,T2) :- !, T2=inf. +leq(T1,T2) :- !, (T2=inf -> true; T1=<T2). + +%% get_timeout(+P:process,-Deadline:time) is det +% +% Get timeout time of quiescent process. May be inf. +get_timeout(get(_,_),inf). +get_timeout(gett(T,_,_,_),T). + +%% step_timeout(+P1:process,-P2:process) is det. +% +% Wake up process P1 with timeout event and run until +% it blocks at P2. +step_timeout(gett(_,_,_,OnTimeout),P2) :- call(OnTimeout,P2). +step_timeout(get(E,OnEvent),get(E,OnEvent)). + +%% step_timeout(+T:time, +P1:process,-P2:process) is det. +% +% If time T is later than P1's timeout time, wake it up and +% run timeout event until it blocks at P2. Otherwise, P2=P1, +% ie process is still waiting. +step_timeout(T,P1,P2) :- + get_timeout(P1,T1), + (leq(T1,T) -> step_timeout(P1,P2); P1=P2). + +:- index(step_event(0,1,0)). + +%% step_event(+E:event, +P1:process, -P2:process) is det. +% +% Wake up process P1 with event E and run until blocking at P2. +step_event(E,get(E,OnEvent),P2) :- call(OnEvent,P2). +step_event(E,gett(_,E,OnEvent,_),P2) :- call(OnEvent,P2).
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/recorder.pl Mon Jan 23 17:38:47 2012 +0000 @@ -0,0 +1,101 @@ +:- module(recorder, [recorder/2, save_events/1, load_events/1, player/2]). +/** <module> event recording + +This module provides a way to capture and record events processed by +the reactive programming framework of reactive.pl. +*/ +:- meta_predicate recorder(1,?), player(1,?). +:- dynamic start_time/1, event/2. + +:- use_module(library(fileutils)). +:- use_module(reactive). + + +%% recorder( +Client:ptail, -Proc:process) is det. +% +% This predicate represents a reactive process that behaves as Client, +% but records all events in the Prolog dynamic database. The type signature +% implies that the term recorder(Client) is of type =|ptail|=. All previously +% recorded events are deleted first. The time at which this predicate was +% called is also recorded in the database. + +recorder(Client,Proc) :- + get_time(Now), + retractall(event(_,_)), + retractall(start_time(_)), + assert(start_time(Now)), + call(Client,C1), + recorder_cont(C1,Proc). + +recorder_on_event(E,C1,Proc) :- + assert(E), + step_event(E,C1,C2), + recorder_cont(C2,Proc). + +recorder_on_timeout(T,C1,Proc) :- + step_timeout(T,C1,C2), + recorder_cont(C2,Proc). + +recorder_cont(C1,Proc) :- + get_timeout(C1,T1), + ( T1=inf + -> req_message(E^recorder_on_event(E,C1), Proc) + ; req_message_or_timeout(T1, + recorder_on_timeout(T1,C1), + E^recorder_on_event(E,C1), Proc) + ). + +%% player( +Client:ptail, -Proc:process) is det. +% +% This predicate represents a reactive process that behaves as Client, +% but plays back events in the Prolog dynamic database. The events are +% time shifted by the difference between the recorded start time and the +% time at which player/2 is called. The type signature +% implies that the term recorder(Client) is of type =|ptail|=. + +player(Client,Proc) :- + get_time(Now), + start_time(T0), DT is Now-T0, + setof(event(T,Msg),T1^(event(T1,Msg),T is DT+T1),Events), + call(Client,C1), + player_cont(Events,C1,Proc). + +player_on_event(Events,C1,Proc) :- + player_cont(Events,C1,Proc). + +player_on_timeout(T,ET,Events,C1,Proc) :- + ( T<ET + -> step_timeout(T,C1,C2), + player_cont(Events,C2,Proc) + ; Events=[E1|EX], + step_event(E1,C1,C2), + player_cont(EX,C2,Proc) + ). + +player_cont([],C1,C1) :- !, + writeln('Playback finished - continuing in interactive mode'). + +player_cont(Events,C1,Proc) :- + Events=[event(ET,_)|_], + get_timeout(C1,T1), + min(ET,T1,TO), + + req_message_or_timeout(TO, + player_on_timeout(TO,ET,Events,C1), + _^player_on_event(Events,C1), Proc). + + +%% save_events(+FileName:atom) is det. +% Save events in the event database to the named file (as Prolog clauses). +save_events(File) :- + with_output_to_file(File,(listing(start_time),listing(event))). + + +%% load_events(+FileName:atom) is det. +% Load events from the named file to the event database, after removing +% any events currently in there. +load_events(File) :- + retractall(event(_,_)), + retractall(start_time(_)), + consult(File). +