:- module(reactive,
    [ run_process/1
    , init_process/2
    , step_event/3
    , step_timeout/2
    , step_timeout/3
    , get_timeout/2
    , req_message/2
    , req_message_or_timeout/4
    , (>)/3
    ]).

/** <module> Tools for reactive programming

This module provides a framework for doing
reactive programming, where a reactive process is
represented by one or two continuations
which define how it will react to received events or the
passage of time.

Relevant types:
==
ptail == pred(process).

process ---> get(term,ptail)
           ; gett(time,term,ptail,ptail).

event ---> event(time,_).

event_handler ---> term^ptail.
==

The type =|ptail|= represents a slice of execution for a process.
When called, code may execute until the process
wishes to block waiting for an event or time-out to occur.
It does this by returning a value of type =|process|=
representing either a process waiting indefinitely for an event,
or a process waiting until a certain time for an event.

Values of type time are either floats representing times as
returned by get_time/1 or the atom 'inf' representing a point
infinitely far in the future.
*/

% The ':' and '1' argument specifiers make the handler/spec arguments
% module-sensitive, so continuations are called in the caller's module.
:- meta_predicate req_message(:,-).
:- meta_predicate req_message_or_timeout(+,:,:,-).
:- meta_predicate run_process(1).
:- meta_predicate init_process(1,?).
:- meta_predicate >(+,1,?).

:- use_module(qutils).

%---------------------------------------

%% init_process( +Spec:ptail, -P:process) is det.
%
%  Initialise process specified by Spec, which is
%  simply a callable term taking one argument.
%  P is bound to representation of sleeping
%  process on exit.
init_process(PS,P1) :- call(PS,P1).

%% >(+E:event,+Spec:ptail,-P:process) is det.
%
%  Initialise process and immediately supply an event.
%  This uses init_process/2 to initialise the process specified
%  by Spec and then uses step_event/3 to process the given event.
%
%  This implies that if Spec is of type ptail, then
%  Event>Spec is also of type ptail.
>(Event,Spec,Proc) :-
    init_process(Spec,P1),
    step_event(Event,P1,Proc).

%% run_process( +Spec:ptail) is det.
%
%  Runs process represented by Spec (see init_process/2).
%  After initialising, this predicate goes into a recursive procedure
%  waiting for events on the current thread's message queue and
%  passing them to the waiting process.
run_process(M:Spec) :- init_process(M:Spec,Proc), cont_process(Proc).


%% cont_process( +Proc:process) is det.
%
%  Takes a quiescent process, waits for and then handles a new event.
%  (Including timeout events.)
cont_process(Proc) :-
    get_timeout(Proc,T), !,
    % Atom garbage collection is requested before blocking for the next event.
    garbage_collect_atoms,
    % get_message_or_timeout/2 is not defined in this file
    % (presumably imported from qutils -- TODO confirm).
    get_message_or_timeout(T,Event),
    event_process_cont(Event,Proc,T).


%% event_process_cont(+Event:evdesc,+P:process,+Deadline:time) is det.
%
%  Handle latest event Event on process Proc. The evdesc type is defined
%  as =|evdesc ---> quit; timeout; event(time,term)|= with the following
%  semantics:
%
%  * quit
%  The process will terminate: this procedure succeeds and returns immediately.
%
%  * timeout
%  The timeout continuation of the process is followed.
%
%  * event(Time,Data)
%  If the current time is less than 0.5s after the event time,
%  the process's event continuation is called. Otherwise,
%  the event is dropped and we continue as if it had never happened.
event_process_cont(quit,_,_) :- !.
event_process_cont(timeout,P1,_) :- step_timeout(P1,P2), !, cont_process(P2).
event_process_cont(event(ET,EData),P1,T1) :- !,
    get_time(Now),
    % A stale event (more than 0.5s old) is dropped: a '#' is printed and
    % the process keeps waiting in its current state.
    (   Now>ET+0.5 -> writeln('#'), cont_process(P1)
    ;   cont_event(T1,P1,ET,EData)).

%% cont_event(+Deadline:time, +P:process, +EvTime:time, +EvData:term) is det.
%
%  Handles process continuation after receiving a valid event.
%  If the event is later than the process timeout, the timeout
%  continuation is called recursively until the process is ready
%  to receive the current event. Then this event is handled and
%  the process continues from there.
cont_event(T1,P1,ET,EData) :-
    (   T1\=inf, ET>T1
    ->  % The event falls after the deadline: take the timeout first, then
        % retry the same event against the new state and its deadline.
        step_timeout(P1,P2),
        get_timeout(P2,T2), !,
        cont_event(T2,P2,ET,EData)
    ;   step_event(event(ET,EData),P1,P2), !,
        cont_process(P2)
    ).

%% cont_event_safely(+Deadline:time, +P:process, +EvTime:time, +EvData:term) is det.
%
%  Variant of cont_event/4. If step_event/3 fails on the event, the
%  failed goal is printed with writeln/1 and the process state is left
%  unchanged (P2 is unified with P1) before waiting for the next event.
%  It is not exported and not called in the visible part of this file.
cont_event_safely(T1,P1,ET,EData) :-
    (   T1\=inf, ET>T1
    ->  step_timeout(P1,P2),
        get_timeout(P2,T2), !,
        cont_event_safely(T2,P2,ET,EData)
    ;   (   step_event(event(ET,EData),P1,P2) -> true
        ;   writeln(failed(step_event(event(ET,EData),P1,P2))), P1=P2
        ),
        cont_process(P2)
    ).

%% req_message( +EventHandler:event_handler, -P:process) is det.
%
%  Returns process state representing quiescent process
%  waiting indefinitely for event.
%  EventHandler must be
%  a term of the form Msg^PTail, where Msg is to be unified
%  with next event term before calling PTail.
req_message(Mod:Temp^OnEvent, get(Temp,Mod:OnEvent)).

%% req_message_or_timeout( +T:time, +TimeOutHandler:ptail, +EventHandler:event_handler, -P:process) is det.
%
%  Returns process state representing quiescent process
%  waiting for event. EventHandler must be
%  a term of the form Msg^PTail, where Msg is to be unified
%  with next event term before calling PTail to get the next
%  continuation of the process.
%  TimeOutHandler will be called if no event arrives
%  before time T.
req_message_or_timeout(T,OnTimeout,Mod:Temp^OnEvent,
    gett(T,Temp,Mod:OnEvent,OnTimeout)).

% leq(+T1:time, +T2:time) is semidet.
%
% Ordering on times in which the atom inf is the greatest element.
% NOTE(review): the comparison after `T1=` was lost from the source text;
% `=<` is reconstructed from the predicate name -- confirm.
leq(inf,T2) :- !, T2=inf.
leq(T1,T2) :- !, (T2=inf -> true; T1=<T2).

% NOTE(review): the source text between leq/2 and the tail
% `-> step_timeout(P1,P2); P1=P2).` was lost. step_timeout/2, get_timeout/2
% and the head and condition of step_timeout/3 below are reconstructed from
% the export list and the process type (get/2, gett/4). Confirm against the
% original file before relying on them.

%% step_timeout(+P1:process, -P2:process) is semidet.
%
%  Follow the timeout continuation of P1 and run until blocking at P2.
%  Only gett/4 processes carry a timeout continuation.
step_timeout(gett(_,_,_,OnTimeout),P2) :- call(OnTimeout,P2).

%% get_timeout(+P:process, -T:time) is det.
%
%  T is the deadline of process P, or inf for a process waiting
%  indefinitely.
get_timeout(get(_,_),inf).
get_timeout(gett(T,_,_,_),T).

%% step_timeout(+T:time, +P1:process, -P2:process) is semidet.
%
%  If the deadline of P1 is no later than T, follow its timeout
%  continuation to get P2; otherwise P2 is P1 unchanged.
step_timeout(T,P1,P2) :-
    get_timeout(P1,T1),
    (   leq(T1,T) -> step_timeout(P1,P2); P1=P2).

%:- index(step_event(0,1,0)).

%% step_event(+E:event, +P1:process, -P2:process) is det.
%
%  Wake up process P1 with event E and run until blocking at P2.
%  E is unified with the event template stored in P1 before the
%  event continuation is called.
step_event(E,get(E,OnEvent),P2) :- call(OnEvent,P2).
step_event(E,gett(_,E,OnEvent,_),P2) :- call(OnEvent,P2).