/* Part of DML (Digital Music Laboratory)
Copyright 2014-2015 Samer Abdallah, University of London
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public
+	Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
:- module(async, 
[  async_memo/4
,  async_current_job/5
,  async_thread_progress/4
,  async_get_progress_callback/1
,  async_set_progress_callback/1
,  async_unset_progress_callback/0
,  async_cancel/3
,  async_cancel/2
,  with_progress_stack/1
,  with_cont/4
,  simple_task/2
,  stepwise_task/4
,  updatable_status_task/3
,  map_with_progress/3
,  map_fold_with_progress/5
,  with_global/3
,  with_new_ref/3
]).
/** <module> Asynchronous memoised computations with progress and partial results
+   This module provides services for managing memoised computations in a thread pool
---++ Job control
+   results if they are available. Completed computations are memoised using
+   library(memo).
+   ---++ Job control
---++ Helper meta-predicates
+   Computations are requested using async_memo/4. If the result is already available in
==
   summary :: progress_level(list(any)).
   partial_result :: progress_level(any).
   ==
+   status information about the job is returned. Otherwise, a new job is started. Status
+   information is returned as the type =|async_status(A)|=.
+   ==
+   async_status(A) ---> done(metadata)
+                      ; initiated(job_id,natural)
+                      ; waiting(natural)
recompute(test(P),M) :- call(P,M).
+                      ; recompute(job_id,natural,metadata).
+   job_id == atom.
+   ==
+   The type =|metadata|= is defined in library(memo).
+   ---++ Helper meta-predicates
+   The second half of the module provides a collection of predicates for computations
+   to use to handle requests for progress information, in the form of a stack of tasks. 
+   It recognises the following levels of progress information:
+   ==
+   summary :: progress_level(list(any)).
+   partial_result :: progress_level(any).
+   ==
+:- meta_predicate 
+         async_set_progress_callback(2)
+      ,  async_memo(+,0,-,+)
+      ,  with_global(+,+,0)
+      ,  with_new_ref(-,+,0)
+      ,  with_progress_stack(0)
+      ,  with_progress_handler(2,0)
+      ,  with_cont(+,1,2,-)
+      ,  simple_task(+,0)
+      ,  stepwise_task(+,+,-,0)
+      % ,  stepwise_partial_task(+,+,-,-,0)
+      % ,  fold_with_progress(3,+,+,-)
+      ,  map_with_progress(2,+,-)
+      ,  map_fold_with_progress(2,3,+,+,-).
+:- use_module(library(thread_pool)).
+:- use_module(library(insist)).
+:- use_module(library(memo)).
+:- use_module(library(dcg_core),[seqmap/4]).
+%% async_memo(+P:thread_pool, +G:memo_goal, -S:async_status(_), Opts:options) is det.
+%  Start running a goal asynchronously in a thread pool if it is not
+%  already running. 
+%  In order for the goal to provide progress information, it must use nb_setval/2
+%  to set the global variable =|progress|=.
+%  Options:
+%     * status_var(Var:status_var(A))
+%       If computation is already running, information of type =|A|=
+%       associated with job is returned in Stats. Default is =|progress(summary)|=.
+%     * recompute(R:oneof([none,failed,test(Pred)]))
+%       If =|none|=, then a previously failed computation is not retried, and the
+%       status =|done(Comp-Result)|= is returned, where Comp includes
+%       the computation time and host and Result is =|fail|= or =|ex(Exception)|=.
+%       If =|failed|=, unsuccessful computations are retried and the status
+%       =|recomputing(ID,Pos,Meta)|= is returned, where =|ID|= is the ID of the
+%       new job, =|Pos|= its initial position in the queue, and =|Meta=Comp-Result|=
+%       the status of the previous computation.
+%       If =|test(Pred)|=, then the computation is retried if =|call(Pred,Meta)|=
+%       succeeds, where Meta is the memoisied computation metadata and the variables
+%       in Goal will be unified with their values resulting from that computation.
+%       The status returned is =|initiated(ID,Pos)|=. Pred must include its module
+%       specifier as a I am too lazy to worry about meta-predicate options right now.
+%     * globals(list(pair(atom,any)))
+%       Set global variables to given values in new thread.
+%  The value of the =|status_var(_)|= option determines the information returned
+%  if the status of the computation is =|running(ID,StatusVals)|=.
+%  ==
+%  progress(progress_level(A)) :: status_var(A).
+%  stats(list(stats_key))      :: status_var(list(stats_vals)).
+%  time                        :: status_var(pair(time,float)). % start and elapsed times
+%  status_var(A)-status_var(B) :: status_var(pair(A,B)).
+%  time==float. % as returned by get_time/1.
+%  ==
+%  The type =|stats_key|= denotes any atom accepted by thread_statistics/3, and
+%  =|stats_vals|= the type of values returned.
+async_memo(_,Goal,done(Meta),Opts) :- 
+   option(recompute(none),Opts,none),
+   browse(Goal,Meta), !.
+async_memo(Pool,Goal,Status,Opts) :- 
+   with_mutex(async,unsafe_async_memo(Pool,Goal,Status,Opts)).
+unsafe_async_memo(Pool,Goal,Status,Opts) :-
+   option(globals(Globals),Opts,[]),
+   (  browse(Goal,Meta)
+   -> option(recompute(Recompute),Opts,none),
+      (  recompute(Recompute,Meta) 
+      -> clear_all(Goal,Meta),
+         spawn(Pool,Goal,Globals,ID,Waiting),
+         Status=recomputing(ID,Waiting,Meta)
+      ;  Status=done(Meta)
+      )
+   ;  % !!! it's possible that another thread might be computing
+      % this goal and complete at this point. If this happens, 
+      % that job will not be detected and a new a job will be started
+      % here. If that happens, the newly created job will find the result
+      % and complete quickly.
+      (  async_current_job(Pool,Goal,ID,T0,S) 
+      -> option(progress_levels(Levels),Opts,[summary]),
+         % !!! it's possible that the job will complete here
+         % and so the progress request will create an error
+         goal_status(ID,T0,Levels,S,Status)
+      ;  spawn(Pool,Goal,Globals,ID,Waiting), 
+         Status=spawned(ID,Waiting)
+      )
+   ).
+recompute(test(P),M) :- call(P,M).
+goal_status(ID, TSubmit, _, waiting(Pos),           waiting(ID,TSubmit,Pos)).
+goal_status(ID, _, Levels,  running(Thread,TStart), running(ID,TStart,Thread,Progress)) :-
+   catch( ( async_thread_progress(Thread,Levels,T,Vals), Progress=just(T-Vals)), Ex, 
+          ( print_message(warning,error_getting_job_progress(Thread,Ex)), Progress=nothing)).
+spawn(Pool,Goal,Globals,ID,Waiting) :-
+   uuid(ID),
+   (thread_pool_property(Pool,backlog(Waiting)) -> true; Waiting=0),
+   get_time(Time),
+   setup_call_catcher_cleanup(
+      recordz(Pool,job(ID,Time,Goal),Ref),
+      thread_create_in_pool(Pool,spawnee(ID,Goal,Globals),_,
+                            [ detached(true), at_exit(erase(Ref))]),
+      exception(_),
+      erase(Ref)).
+spawnee(ID,Goal,Globals) :- 
+   setup_call_cleanup(
+      with_mutex(async, 
+         (  recorded(ID,async:cancelled,Ref)
+         -> debug(async,'Cancelled before starting: ~q',[Goal]),
+            erase(Ref), fail
+         ;  pairs_keys_values(Globals,GNames,GVals),
+            maplist(nb_setval,GNames,GVals),
+            get_time(T0),
+            thread_self(Thread),
+            recordz(ID,running(Thread,T0),Ref)
+         )),
+      ( freeze(Res,check_result(Res)), % prevent memoisation if abort(_) is caught
+         debug(async,"Running asyc goal ~q",[Goal]),
+         memo(Goal,_-Res)),
+      erase(Ref)).
+check_result(Ex) :-
+   (  Ex=ex(abort(Reason)) 
+   -> debug(async,'Computation aborted (~w)',[Reason]),
+      throw(abort(Reason))
+   ;  true
+   ).
+%% async_current_job(+Pool:thread_pool, -Goal:callable, -ID:job_id, -T:time, -S:job_status) is nondet.
+%  Retrieves information about current pending jobs. A job is pending if it is running
+%  or waiting to run in the specified thread pool. T is the time the job was submitted.
+%  Job status is:
+%  ==
+%  job_status ---> waiting(natural)     % queued at given postion
+%                ; running(thread_id,time). % with start time
+%  ==
+async_current_job(Pool,Goal,ID,Time,Status) :-
+   recorded(Pool,job(ID,Time,Goal)), % this finds ALL jobs
+   (  recorded(ID,running(Thread,StartTime)) 
+   -> Status=running(Thread,StartTime)
+   ;  thread_pool_property(Pool,waiting(Waiting)),
+      nth1(Pos,Waiting,create(Pool,async:spawnee(ID,_,_),_,_,_)) 
+   -> Status=waiting(Pos)
+   ;  print_message(warning, async_job_status_indeterminate(Pool,ID,Goal)),
+      Status=indeterminate % could be finished, or transferring from waiting to running
+   ).
+%% async_thread_progress(+Thread:thread_id,+Levels:list(progress_level), -T:time, -Vals:list) is det.
+%  Called in main thread to get information about job running in thread Thread.  
+%  Will throw an exception if:
+%     -  the thread has terminated
+%     -  the thread does not respond with progress info in less than 3 seconds
+%     -  the thread does not have a progress handler call back registered
+%     -  the progress handler fails on any of the progress levels requested.
+async_thread_progress(Thread,Levels,Time,Vals) :-
+   thread_self(Me),
+   gensym(prog,RID), % progress request ID
+   flush_all_messages(Me,progress(_,_,_,_)),
+   thread_signal(Thread,report_progress(RID,Levels,Me)),
+   (  thread_get_message(Me,progress(RID,Time,Vals,St),[timeout(3)]) -> true
+   ;  throw(get_thread_progress_timeout(Thread,Levels))
+   ),
+   debug(async,'Got progress ~w from thread ~w: ~w',[Levels,Thread,Vals]),
+   memo:reflect(St).
+flush_all_messages(Queue,Pattern) :-
+   repeat, \+thread_get_message(Queue,Pattern,[timeout(0)]), !.
+%% report_progress(+RID:atomic, +Var:status_var(A), +Thread:thread_id) is det.
+%  Called from job thread in response to a signal, to send progress information to thread Thread.
+report_progress(RID,Levels,Thread) :-
+   get_time(Time),
+   memo:reify(async:my_progress(Levels,Vals),St), !,
+   debug(async,'reporting progress(~w): ~q',[Levels,Vals]),
+   thread_send_message(Thread,progress(RID,Time,Vals,St)).
+my_progress(Levels,Vals) :- nb_getval(async_progress,G), insist(maplist(G,Levels,Vals)).  
+%% async_cancel(+Pool:thread_pool, +ID:job_id, +Ex:term) is det.
+%% async_cancel(+Pool:thread_pool, +ID:job_id) is det.
+%  Cancel job with given ID, if it is currently pending. If it is running, the
+%  exception term Ex is thrown (=|abort(cancel)|= in async_cancel/2).
+%  If not, the recorded database is used to mark the job for immediate failure 
+%  as soon as it is started.
+async_cancel(Pool,ID) :- async_cancel(Pool,ID,abort(cancel)).
+async_cancel(Pool,ID,Ex) :-
+   debug(async,'Cancelling ~w:~w with ~w',[Pool,ID,Ex]),
+   with_mutex(async, (async_current_job(Pool,_,ID,_,S) -> cancel(ID,S,Ex); true)).
+cancel(ID,waiting(_),_) :- recordz(ID,async:cancelled).
+cancel(_, running(Thread,_),Ex) :- 
+   catch( thread_signal(Thread,throw(Ex)), E,
+          print_message(warning,error_signalling_job_thread(Thread,E))). 
+cancel(ID,indeterminate,_) :- print_message(warning,indeterminate_job_state_on_cancel(ID)).
+%% async_set_progress_callback(+P:pred(+Level:any,-Info:any)) is det.
+%  Sets the progress handler for the current thread.
+async_set_progress_callback(Pred) :- nb_setval(async_progress,Pred).
+%% async_get_progress_callback(-P:pred(+Level:any,-Info:any)) is semidet.
+%  Gets the progress handler for the current thread, if one has been set.
+async_get_progress_callback(Pred) :- nb_current(async_progress,Pred).
+%% async_unset_progress_callback is det.
+%  Unsets the progress handler for the current thread. Subsequent calls to 
+%  async_get_progress_callback/1 will fail.
+async_unset_progress_callback :- nb_delete(async_progress).
+% -----------------------------------------------------------------------
+%% with_progress_stack(+Goal:callable) is nondet.
+%  Runs Goal with the current thread's progress handler (which must be initially
+%  unset) set to an empty task stack. Tasks (goals which run in cooperation with a
+%  a progress handler) can be pushed on to the task stack using with_progress_handler/2.
+%  Fails if progress callback has already been set to something else.
+%  The progress handler recognises progress level terms:
+%     *  summary :: progress_level(list(any))
+%        Yields a list of progress terms, one for each entry in the task stack.
+%     *  partial_result :: progress_level(any)
+%        Yields partial results of the computation if they are available.
+with_progress_stack(Goal) :-
+   async_get_progress_callback(CB), !,
+   CB=async:get_stack_progress,
+   call(Goal).
+with_progress_stack(Goal) :-
+   with_global(async_progress_stack, nil,
+      setup_call_cleanup( 
+         async_set_progress_callback(get_stack_progress), Goal,
+         async_unset_progress_callback)).
+%% get_stack_progress(+Level:progress_level(A),-Info:A) is semidet.
+%  Gets the request level of progress information from the currently installed
+%  progress stack.
+get_stack_progress(summary,P) :-
+   nb_getval(async_progress_stack,H),
+   call(H,progress(summary),P).
+get_stack_progress(partial_result,R) :-
+   nb_getval(async_progress_stack,H),
+   call(H,partial_result,R).
+%% with_progress_handler(+H:progress_handler, +Goal:callable) is nondet.
+%  Pushes the progress handler H to the top of the task stack associated with
+%  the current thread and calls Goal. The handler is removed on exit.
+with_progress_handler(H,Goal) :- 
+   nb_getval(async_progress_stack,H0), call(H0,push(H),H1),
+   setup_call_cleanup( nb_setval(async_progress_stack,H1), Goal,
+                       nb_setval(async_progress_stack,H0)).
+nil(partial_result(Cont),R) :- call(Cont,nothing,R).
+chain(Head,Tail1,push(H),chain(Head,Tail2)) :- call(Tail1,push(H),Tail2).
+chain(Head,Tail,progress(L),[PH|PT]) :- 
+   call(Head,progress(L),PH), 
+   call(Tail,progress(L),PT).
+chain(Head,Tail,partial_result,MR) :- 
+   call(Tail,partial_result,R1),
+   (call(Head,partial_result(R1),R) -> MR=just(R); MR=nothing).
+%% simple_task(+Desc,+Goal:callable) is det.
+%  Calls goal with a progress handler that reports Desc when progress
+%  information is requested. No partial results are available.
+simple_task(Desc,Goal) :- with_progress_handler(simple_progress(Desc),Goal).
+%% with_cont(+Desc,+Pred:pred(-A),+Cont:pred(+A,-B),-Result:B) is det.
+%  Calls Pred and then passes the result to Cont yielding Result.
+%  A progress handler is installed such that any partial results yielded by
+%  subtasks in Pred are passed to Cont to yield a partial result. Progress requests
+%  yield Desc.
+with_cont(Desc,Pred,Cont,Result) :- 
+   with_progress_handler(cont(Desc,Cont),
+      (call(Pred,R),call(Cont,R,Result))).
+cont(_,Cont,Tail1,push(H),cont(Cont,Tail2)) :- call(Tail1,push(H),Tail2).
+cont(_,Cont,partial_result(just(R1)),R) :- call(Cont,R1,R).
+%% stepwise_task(+Desc,+Total:nonneg,-Step:callable,+Goal:callable) is det.
+%  Calls Goal with a progress handler that is able to report progress as a term
+%  =|stepwise(Desc,Done/Total)|=. Goal is permitted to call Step in order to increment
+%  the =|Done|= counter. Partial results are not available.
+stepwise_task(Desc,Total,async:increment(Counter),Goal) :-
+   with_new_ref(Counter,0, with_progress_handler(stepwise_progress(Desc,Total,Counter), Goal)).
+stepwise_progress(Desc,Total,Counter,progress(_),stepwise(Desc,Done/Total)) :- b_getval(Counter,Done).
+%% updatable_status_task(+Initial,-Update:pred(any),+Goal:callable) is det.
+%  Calls goal with Update set to a predicate which can be used to set the
+%  current status of the task. This will be reported if progress is requested.
+%  No partial results are available. Initial is the status before Update is called.
+:- meta_predicate updatable_status_task(+,-,0).
+updatable_status_task(Initial,async:set_ref(Status),Goal) :-
+   with_new_ref(Status,Initial, with_progress_handler(status_progress(Status),Goal)).
+status_progress(Status,progress(_),S) :- b_getval(Status,S).
+map_with_progress(P,Xs,Ys) :-
+   length(Xs,Total),
+   stepwise_task(map(P),Total,Step, maplist(call_and_step(Step,P),Xs,Ys)).
+call_and_step(Step,P,X,Y) :- call(P,X,Y), call(Step).
+fold_with_progress(Folder,Items,S0,S1) :-
+   length(Items,N),
+   stepwise_partial_task(folding(Folder),N,Step,Update,
+      seqmap(step_update(Step,Update,Folder),Items,S0,S1)).
+step_update(Step,Update,P,X,S0,S1) :-
+   call(P,X,S0,S1),
+   call(Update,S1),
+   call(Step).
+:- setting(map_fold_batch,nonneg,512,"Batch size for flushing fold queue.").
+%% map_fold_with_progress(+Mapper:pred(A,B), +Folder:pred(list(B),S,S), +Items:list(A), +S1:S, -S2:S) is det.
+%  Map Mapper over Items and fold over the results with Folder, with progress information
+%  partial results available. Note that Folder operates on a _list_ of items, not just one
+%  item at a time. Mapper is used to accumulate up to N items before folding them batchwise
+%  into the accumulator, where N is the value of the setting =|map_fold_batch|=.
+%  Progress information is returned as a term =|stepwise(Desc,Done/Total)|=, where
+%  =|Desc=map_fold(Mapper,Folder)|=. If partial results are requested, the current batch
+%  of mapped items is first folded in before returning the value of the accumulator.
+map_fold_with_progress(Mapper,Folder,Items,S1,S2) :-
+   length(Items,Total),
+   setting(map_fold_batch,M1),
+   with_new_ref(StateRef,s(Items,0,M1,Head,Head,S1),
+      (  Handler=map_fold_progress(map_fold(Mapper,Folder),Total,StateRef),
+         with_progress_handler(Handler,
+            (  map_fold_loop(Mapper,Folder,StateRef),
+               call(Handler,partial_result(nothing),S2))))).
+map_fold_loop(Mapper,Folder,StateRef) :-
+   b_getval(StateRef,s(Xs1,N1,M1,Head,Ys1,S)), 
+   (  Xs1=[] -> Ys1=[]
+   ;  Xs1=[X|Xs2] 
+   -> call(Mapper,X,Y), !, 
+      Ys1=[Y|Ys2], succ(N1,N2), 
+      State1=s(Xs2,N2,M2,Head,Ys2,S), 
+      (  succ(M2,M1) -> State2=State1
+      ;  simple_task(folding,flush_fold(Folder,State1,State2))
+      ),
+      b_setval(StateRef,State2), !,
+      map_fold_loop(Mapper,Folder,StateRef)
+   ).
+map_fold_progress(Desc,Total,StateRef,progress(_),stepwise(Desc,Done/Total)) :-
+   b_getval(StateRef,s(_,Done,_,_,_,_)).
+map_fold_progress(map_fold(_,Folder),_,StateRef,partial_result(_),S2) :-
+   b_getval(StateRef,State1),
+   flush_fold(Folder,State1,State2),
+   b_setval(StateRef,State2),
+   State2=s(_,_,_,_,_,S2).
+flush_fold(Folder,s(Xs,Done,_,Head,[],S1),s(Xs,Done,M1,H2,H2,S2)) :-
+   setting(map_fold_batch,M1),
+   call(Folder,Head,S1,S2), !.
+% for managing global variable
+new_ref(Ref) :- gensym('$ref',Ref).
+new_ref(Ref,Val) :- new_ref(Ref), b_setval(Ref,Val).
+with_global(Name,Val,Goal) :- setup_call_cleanup(b_setval(Name,Val), Goal, nb_delete(Name)).
+with_new_ref(Name,Val,Goal) :- new_ref(Name), with_global(Name,Val,Goal).
+set_ref(Ref,Val) :- nb_setval(Ref,Val).
+increment(Counter) :- b_getval(Counter,N), succ(N,M), b_setval(Counter,M).