changeset 0:a0213fab5674

Split off from triserver.
author samer
date Mon, 23 Jan 2012 17:38:47 +0000
parents
children 11e8c4e65a03
files Makefile qutils.pl reactive.pl recorder.pl
diffstat 4 files changed, 409 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Makefile	Mon Jan 23 17:38:47 2012 +0000
@@ -0,0 +1,14 @@
+INSTALL_PL_TO=~/lib/prolog
+SUBDIR=reactive
+SUB_MODULES="qutils reactive recorder"
+
+main:
+
+install:
+	install -d $(INSTALL_PL_TO)
+	install -d $(INSTALL_PL_TO)/$(SUBDIR)
+	for f in "$(SUB_MODULES)"; do install $(INSTALL_FLAGS) -m 644 $$f.pl $(INSTALL_PL_TO)/$(SUBDIR); done
+
+#for f in "$(ROOT_MODULES)"; do install $(INSTALL_FLAGS) -m 644 $$f.pl $(INSTALL_PL_TO); done
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/qutils.pl	Mon Jan 23 17:38:47 2012 +0000
@@ -0,0 +1,97 @@
+:- module( qutils, 
+		[	with_message/2
+		,	with_message_or_timeout/4
+		,	get_message_or_timeout/2
+		,	send_with_size_limit/3
+		]).
+
+:- meta_predicate with_message(?,0).
+:- meta_predicate with_message_or_timeout(+,?,0,0).
+
+%% get_message_or_timeout( +TimeOut, -Msg) is det.
+%
+%  Time-limited wait for next term in message queue.
+%  Waits up to TimeOut seconds for the next message that 
+%  unifies with Msg in the current thread.  If none arrives, 
+%  then Msg is unified  with timeout. 
+%  NB: may fail if Msg is bound on input.
+get_message_or_timeout(inf,Msg) :- !, thread_get_message(Msg).
+get_message_or_timeout(Deadline,Msg) :- 
+	get_time(Now), 
+	(	Now>=Deadline -> Msg=timeout, writeln(elapsed)
+	;	thread_self(Thread),
+		setup_call_cleanup(
+			alarm_at(Deadline, thread_send_message(Thread,timeout), Id), 
+			thread_get_message(Thread,Msg),
+			time:remove_alarm_notrace(Id))
+		%,(Msg\=timeout -> writeln(msg:Msg); true)
+	).	
+
+/*
+get_message_or_timeout(Deadline,Msg) :- 
+	get_time(Now), 
+	(	Now>=Deadline -> Msg=timeout, writeln(elapsed)
+	;	catch(
+			deadline_get_message(Deadline,Msg),
+			time_limit_exceeded,
+			Msg=timeout)
+		%, (Msg\=timeout -> writeln(msg:Msg); true)
+	).	
+
+deadline_get_message(Deadline,Msg) :-
+	setup_call_cleanup(
+		alarm_at(Deadline, ding_ding, Id), 
+		thread_get_message(Msg),
+		time:remove_alarm_notrace(Id)).
+
+ding_ding :- throw(time_limit_exceeded).
+
+
+*/
+
+
+
+%% with_message( Msg, :Goal) is nondet.
+%
+%  Waits for next message, unify with Msg, and call Goal.
+%  This procedure gets the next term out of the current thread's
+%  message queue, waiting if necessary. If it is 'quit', then
+%  it succeeds immediately leaving Msg unchanged. If it unifies
+%  with Msg, then Goal is called. Otherwise, a message is printed
+%  and the proceduce succeeds.
+with_message(Temp,OnEvent) :-
+	thread_get_message(Msg),
+	(	Msg=quit -> true
+	;	Msg=Temp -> call(OnEvent)
+	;	writeln(template_mismatch(Temp,Msg))
+	).
+
+%% with_message_or_timeout( +TimeOut, Msg, :OnEvent, :OnTimeOut) is nondet.
+%
+%  Time limited wait for message with handler goals for either case.
+%  Waits up to TimeOut seconds for next message, unify with Msg, 
+%  and call OnEvent. If none arrives before TimeOut expires,
+%  OnTimeOut is called instead. If the message is 'quit', it
+%  just returns. If the message does not unify with Msg, some
+%  text is printed and the proceduce succeeds.
+with_message_or_timeout(T,Temp,OnEvent,OnTimeout) :- 
+	get_message_or_timeout(T,Msg),
+	(	Msg=quit    -> true
+	;	Msg=timeout -> call(OnTimeout)
+	;	Msg=Temp    -> call(OnEvent)
+	;	writeln(template_mismatch(Temp,Msg))
+	).
+
+%% send_with_size_limit( +Max:natural, +Queue:queue_id, +Msg:term) is det.
+%
+%  Adds term to message queue unless queue is already full.
+%  If message queue Queu already contains more than Max elements,
+%  then "*" is printed and the procedure succeeds. NB. the message
+%  is discarded.
+send_with_size_limit(Max,Queue,Msg) :-
+	message_queue_property(Queue,size(QSize)),
+	(	QSize<Max -> thread_send_message(Queue,Msg)
+	;	writeln('*')
+	).
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/reactive.pl	Mon Jan 23 17:38:47 2012 +0000
@@ -0,0 +1,197 @@
+:- module(reactive,
+		[	run_process/1
+		,	init_process/2
+		,	step_event/3
+		,	step_timeout/2
+		,	step_timeout/3
+		,	get_timeout/2
+		,	req_message/2
+		,	req_message_or_timeout/4
+		,	(>)/3
+		]).
+
+/** <module> Tools for reactive programming
+
+	This module provides a framework for doing
+	reactive programming, where a reactive process is
+	represented by one or two continuations  
+	which define how it will react to received events or the 
+	passage of time.
+
+	Relevant types:
+	==
+	ptail    ==  pred(process).
+
+	process ---> get(term,ptail)
+	           ; gett(time,term,ptail,ptail).
+
+
+	event   ---> event(time,_).
+
+	event_handler ---> term^ptail.
+	==
+
+	The type =|ptail|= represents a slice of execution for a process.
+	When called, code may execute until the process
+	wishes to block waiting for an event or time-out to occur. 
+	It does this by returning a value of type =|process|= 
+	representing either a process waiting indefinitely for an event, 
+	or a processing waiting until a certain for an event.
+
+	Values of type time are either floats representing times as
+	returned by get_time/1 or the atom 'inf' representing a point
+	infinitely far in the future. 
+*/
+
+:- meta_predicate req_message(:,-).
+:- meta_predicate req_message_or_timeout(+,:,:,-).
+:- meta_predicate run_process(1).
+:- meta_predicate init_process(1,?).
+:- meta_predicate >(+,1,?).
+
+:- use_module(qutils).
+
+%---------------------------------------
+
+%% init_process( +Spec:ptail, -P:process) is det.
+%
+%  Initialise process specified by Spec, which is
+%  simply a callable term taking one argument.
+%  P is bound to representation of sleeping
+%  process on exit. 
+init_process(PS,P1) :- call(PS,P1).
+
+%% >(+E:event,+Spec:ptail,-P:process) is det.
+%
+%  Initialise process and immediately supply an event.
+%  This uses init_process/2 to initialise the process specified
+%  by Spec and then use step_event/3 to process the given event.
+%
+%  This implies that if Spec is of type ptail, then 
+%  Event>Spec is also of type ptail.
+>(Event,Spec,Proc) :-
+	init_process(Spec,P1),
+	step_event(Event,P1,Proc).
+
+%% run_process( +Spec:ptail) is det.
+%
+%	Runs process represented by Spec (see init_process/2).
+%	After initialising, this predicate goes into a recursive procedure
+%	waiting for events on the current thread's message queue and
+%	passing them to the waiting process.
+run_process(M:Spec) :- init_process(M:Spec,Proc), cont_process(Proc).
+
+
+%% cont_process( +Proc:process) is det.
+%
+%  Takes a quiescent process, waits for and then handles a new event.
+%  (Including timeout events.)
+cont_process(Proc) :-
+	get_timeout(Proc,T), !,
+	garbage_collect_atoms,
+	get_message_or_timeout(T,Event),
+	event_process_cont(Event,Proc,T).
+
+
+%% event_process_cont(+Event:evdesc,+P:process,+Deadline:time) is det.
+%
+%  Handle latest event Event on process Proc. The evdesc type is defined
+%  as =|evdesc ---> quit; timeout; event(time,term)|= with the following
+%  semantics:
+%
+%   * quit
+%   The process will terminate: this procedure succeeds and returns immediately.
+%
+%   * timeout
+%   The timeout continuation of the process is followed.
+%
+%   * event(Time,Data)
+%   If the current time is less than 0.5s after the event time,
+%   the process's event continuation is called. Otherwise,
+%   the event is dropped and we continue as if it had never happened.
+event_process_cont(quit,_,_) :- !.
+event_process_cont(timeout,P1,_) :- step_timeout(P1,P2), !, cont_process(P2).
+event_process_cont(event(ET,EData),P1,T1) :- !, 
+	get_time(Now),
+	(	Now>ET+0.5 -> writeln('#'), cont_process(P1)
+	;	cont_event(T1,P1,ET,EData)).
+
+%% cont_event(+Deadline:time, +P:process, +EvTime:time, +EvData:term) is det.
+%
+%  Handles process continuation after receiving a valid event.
+%  If the event is later than the process timeout, the timeout
+%  continuation is called recursively until the process is ready
+%  to receive the current event. Then this event is handled and
+%  the process continues from there.
+cont_event(T1,P1,ET,EData) :- 
+	(	T1\=inf, ET>T1 
+	-> step_timeout(P1,P2),
+		get_timeout(P2,T2), !,
+		cont_event(T2,P2,ET,EData)
+	;	step_event(event(ET,EData),P1,P2), !,
+		cont_process(P2)
+	).
+
+cont_event_safely(T1,P1,ET,EData) :- 
+	(	T1\=inf, ET>T1 
+	-> step_timeout(P1,P2),
+		get_timeout(P2,T2), !,
+		cont_event_safely(T2,P2,ET,EData)
+	;	(	step_event(event(ET,EData),P1,P2) ->	true
+		;	writeln(failed(step_event(event(ET,EData),P1,P2))), P1=P2
+		),
+		cont_process(P2)
+	).
+
+%% req_message( +EventHandler:event_handler, -P:process) is det.
+%
+%  Returns process state representing quiescent process
+%  waiting indefinitley for event. EventHandler must be
+%  a term of the form Msg^PTail, where Msg is to be unified
+%  with next event term before calling PTail.
+req_message(Mod:Temp^OnEvent, get(Temp,Mod:OnEvent)).
+
+%% req_message_or_timeout( +T:time, +TimeOutHandler:ptail, +EventHandler:event_handler, -P:process) is det.
+%
+%  Returns process state representing quiescent process
+%  waiting for event. EventHandler must be
+%  a term of the form Msg^PTail, where Msg is to be unified
+%  with next event term  before calling PTail to get the next
+%  continuation of the process.
+%  TimeOutHandler will be called if time no event arrives
+%  before time T.
+req_message_or_timeout(T,OnTimeout,Mod:Temp^OnEvent,
+	gett(T,Temp,Mod:OnEvent,OnTimeout)).
+
+leq(inf,T2) :- !, T2=inf.
+leq(T1,T2)  :- !, (T2=inf -> true; T1=<T2). 
+
+%% get_timeout(+P:process,-Deadline:time) is det
+%
+% Get timeout time of quiescent process. May be inf.
+get_timeout(get(_,_),inf).
+get_timeout(gett(T,_,_,_),T).
+
+%% step_timeout(+P1:process,-P2:process) is det.
+%
+%  Wake up process P1 with timeout event and run until
+%  it blocks at P2.
+step_timeout(gett(_,_,_,OnTimeout),P2) :- call(OnTimeout,P2).
+step_timeout(get(E,OnEvent),get(E,OnEvent)).
+
+%% step_timeout(+T:time, +P1:process,-P2:process) is det.
+%
+%  If time T is later than P1's timeout time, wake it up and
+%  run timeout event until it blocks at P2. Otherwise, P2=P1,
+%  ie process is still waiting.
+step_timeout(T,P1,P2) :-
+	get_timeout(P1,T1), 
+	(leq(T1,T) -> step_timeout(P1,P2); P1=P2).
+
+:- index(step_event(0,1,0)).
+
+%% step_event(+E:event, +P1:process, -P2:process) is det.
+%
+%  Wake up process P1 with event E and run until blocking at P2.
+step_event(E,get(E,OnEvent),P2) :- call(OnEvent,P2).
+step_event(E,gett(_,E,OnEvent,_),P2) :- call(OnEvent,P2).
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/recorder.pl	Mon Jan 23 17:38:47 2012 +0000
@@ -0,0 +1,101 @@
+:- module(recorder, [recorder/2, save_events/1, load_events/1, player/2]).
+/** <module> event recording
+
+This module provides a way to capture and record events processed by
+the reactive programming framework of reactive.pl.
+*/
+:- meta_predicate recorder(1,?), player(1,?).
+:- dynamic start_time/1, event/2.
+
+:- use_module(library(fileutils)).
+:- use_module(reactive).
+
+
+%% recorder( +Client:ptail, -Proc:process) is det.
+%
+%  This predicate represents a reactive process that behaves as Client,
+%  but records all events in the Prolog dynamic database. The type signature
+%  implies that the term recorder(Client) is of type =|ptail|=. All previously
+%  recorded events are deleted first. The time at which this predicate was
+%  called is also recorded in the database. 
+
+recorder(Client,Proc) :-
+	get_time(Now),
+	retractall(event(_,_)),
+	retractall(start_time(_)),
+	assert(start_time(Now)),
+	call(Client,C1),
+	recorder_cont(C1,Proc).
+
+recorder_on_event(E,C1,Proc) :-
+	assert(E),	
+	step_event(E,C1,C2),
+	recorder_cont(C2,Proc).
+
+recorder_on_timeout(T,C1,Proc) :-
+	step_timeout(T,C1,C2),
+	recorder_cont(C2,Proc).
+
+recorder_cont(C1,Proc) :-
+	get_timeout(C1,T1),
+	(	T1=inf
+	->	req_message(E^recorder_on_event(E,C1), Proc)
+	;	req_message_or_timeout(T1,
+			recorder_on_timeout(T1,C1),
+			E^recorder_on_event(E,C1), Proc)
+	).
+
+%% player( +Client:ptail, -Proc:process) is det.
+%
+%  This predicate represents a reactive process that behaves as Client,
+%  but plays back events in the Prolog dynamic database. The events are
+%  time shifted by the difference between the recorded start time and the
+%  time at which player/2 is called. The type signature
+%  implies that the term recorder(Client) is of type =|ptail|=.
+
+player(Client,Proc) :-
+	get_time(Now), 
+	start_time(T0), DT is Now-T0,
+	setof(event(T,Msg),T1^(event(T1,Msg),T is DT+T1),Events),
+	call(Client,C1),
+	player_cont(Events,C1,Proc).
+
+player_on_event(Events,C1,Proc) :-
+	player_cont(Events,C1,Proc).
+
+player_on_timeout(T,ET,Events,C1,Proc) :-
+	(	T<ET 
+	->	step_timeout(T,C1,C2),
+		player_cont(Events,C2,Proc)
+	;	Events=[E1|EX],
+		step_event(E1,C1,C2),
+		player_cont(EX,C2,Proc)
+	).
+
+player_cont([],C1,C1) :- !,
+	writeln('Playback finished - continuing in interactive mode').
+
+player_cont(Events,C1,Proc) :-
+	Events=[event(ET,_)|_],
+	get_timeout(C1,T1),
+	min(ET,T1,TO),
+
+	req_message_or_timeout(TO,
+		player_on_timeout(TO,ET,Events,C1),
+		_^player_on_event(Events,C1), Proc).
+
+
+%% save_events(+FileName:atom) is det.
+%  Save events in the event database to the named file (as Prolog clauses).
+save_events(File) :- 
+	with_output_to_file(File,(listing(start_time),listing(event))).
+
+
+%% load_events(+FileName:atom) is det.
+%  Load events from the named file to the event database, after removing
+%  any events currently in there.
+load_events(File) :-
+	retractall(event(_,_)),
+	retractall(start_time(_)),
+	consult(File).
+