Daniel@0: /* Part of DML (Digital Music Laboratory) Daniel@0: Copyright 2014-2015 Samer Abdallah, University of London Daniel@0: Daniel@0: This program is free software; you can redistribute it and/or Daniel@0: modify it under the terms of the GNU General Public License Daniel@0: as published by the Free Software Foundation; either version 2 Daniel@0: of the License, or (at your option) any later version. Daniel@0: Daniel@0: This program is distributed in the hope that it will be useful, Daniel@0: but WITHOUT ANY WARRANTY; without even the implied warranty of Daniel@0: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Daniel@0: GNU General Public License for more details. Daniel@0: Daniel@0: You should have received a copy of the GNU General Public Daniel@0: License along with this library; if not, write to the Free Software Daniel@0: Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA Daniel@0: */ Daniel@0: Daniel@0: :- module(async, Daniel@0: [ async_memo/4 Daniel@0: , async_current_job/5 Daniel@0: , async_thread_progress/4 Daniel@0: , async_get_progress_callback/1 Daniel@0: , async_set_progress_callback/1 Daniel@0: , async_unset_progress_callback/0 Daniel@0: , async_cancel/3 Daniel@0: , async_cancel/2 Daniel@0: , with_progress_stack/1 Daniel@0: , with_cont/4 Daniel@0: , simple_task/2 Daniel@0: , stepwise_task/4 Daniel@0: , updatable_status_task/3 Daniel@0: , map_with_progress/3 Daniel@0: , map_fold_with_progress/5 Daniel@0: , with_global/3 Daniel@0: , with_new_ref/3 Daniel@0: ]). Daniel@0: Daniel@0: /** Asynchronous memoised computations with progress and partial results Daniel@0: Daniel@0: This module provides services for managing memoised computations in a thread pool Daniel@0: with the ability to check on the progress of running jobs and receive partial Daniel@0: results if they are available. Completed computations are memoised using Daniel@0: library(memo). 
Daniel@0: Daniel@0: ---++ Job control Daniel@0: The first half of the module is concerned with managing memoised computations. Daniel@0: Computations are requested using async_memo/4. If the result is already available in Daniel@0: the memoised database, the result is returned. If the computation is already running, Daniel@0: status information about the job is returned. Otherwise, a new job is started. Status Daniel@0: information is returned as the type =|async_status(A)|=. Daniel@0: == Daniel@0: async_status(A) ---> done(metadata) Daniel@0: ; spawned(job_id,natural) Daniel@0: ; waiting(job_id,time,natural) Daniel@0: ; running(job_id,time,thread_id,maybe(pair(time,A))) Daniel@0: ; recomputing(job_id,natural,metadata). Daniel@0: Daniel@0: job_id == atom. Daniel@0: == Daniel@0: The type =|metadata|= is defined in library(memo). Daniel@0: Daniel@0: Daniel@0: ---++ Helper meta-predicates Daniel@0: The second half of the module provides a collection of predicates for computations Daniel@0: to use to handle requests for progress information, in the form of a stack of tasks. Daniel@0: It recognises the following levels of progress information: Daniel@0: == Daniel@0: summary :: progress_level(list(any)). Daniel@0: partial_result :: progress_level(any). Daniel@0: == Daniel@0: */ Daniel@0: :- meta_predicate Daniel@0: async_set_progress_callback(2) Daniel@0: , async_memo(+,0,-,+) Daniel@0: , with_global(+,+,0) Daniel@0: , with_new_ref(-,+,0) Daniel@0: , with_progress_stack(0) Daniel@0: , with_progress_handler(2,0) Daniel@0: , with_cont(+,1,2,-) Daniel@0: , simple_task(+,0) Daniel@0: , stepwise_task(+,+,-,0) Daniel@0: % , stepwise_partial_task(+,+,-,-,0) Daniel@0: % , fold_with_progress(3,+,+,-) Daniel@0: , map_with_progress(2,+,-) Daniel@0: , map_fold_with_progress(2,3,+,+,-). Daniel@0: Daniel@0: :- use_module(library(thread_pool)). Daniel@0: :- use_module(library(insist)). Daniel@0: :- use_module(library(memo)). Daniel@0: :- use_module(library(dcg_core),[seqmap/4]). 

%% async_memo(+Pool:thread_pool, +Goal:memo_goal, -Status:async_status(_), +Opts:options) is det.
%
%  Request the memoised computation Goal. If a result is already recorded it is
%  reported; if a job for Goal is already pending in Pool its state is reported;
%  otherwise a new job is started in Pool.
%  For a running job to answer progress requests, its thread must have a progress
%  callback installed (see async_set_progress_callback/1), which is stored in the
%  global variable =|async_progress|=.
%
%  Status is unified with one of:
%     *  done(Meta)
%        A memoised result with metadata Meta exists and is not being recomputed.
%     *  recomputing(ID,Pos,Meta)
%        The previous result (metadata Meta) was cleared and a new job ID started.
%     *  spawned(ID,Pos)
%        No result and no pending job were found, so job ID was started.
%     *  waiting(ID,TSubmit,Pos)
%        A job for Goal is already queued at position Pos.
%     *  running(ID,TStart,Thread,Progress)
%        A job for Goal is already running; Progress is =|just(T-Vals)|= or
%        =|nothing|= if progress could not be obtained.
%
%  In =|recomputing|= and =|spawned|=, Pos is the value of the pool's =|backlog|=
%  property read just before the job is created, or 0 if that property is
%  unavailable.
%
%  Options:
%     *  progress_levels(Levels:list(progress_level))
%        Progress levels requested from a job that is already running.
%        Default is =|[summary]|=.
%     *  recompute(R:oneof([none,failed,test(Pred)]))
%        If =|none|= (the default), an existing memoised outcome is never retried
%        and =|done(Comp-Result)|= is returned, even if Result is =|fail|= or
%        =|ex(Exception)|=.
%        If =|failed|=, an outcome whose Result is =|fail|= or =|ex(_)|= is cleared
%        and recomputed.
%        If =|test(Pred)|=, the outcome is cleared and recomputed if
%        =|call(Pred,Meta)|= succeeds, where Meta is the memoised metadata; the
%        variables of Goal are bound by the lookup of that earlier computation.
%        Pred must carry its own module qualifier, since it is not handled as a
%        meta-argument.
%     *  globals(list(pair(atom,any)))
%        Global variables to set (with nb_setval/2) in the new job thread.

async_memo(_,Goal,done(Meta),Opts) :-
   % Fast path taken without the mutex: nothing to recompute and a result exists.
   option(recompute(none),Opts,none),
   browse(Goal,Meta),
   !.

async_memo(Pool,Goal,Status,Opts) :-
   with_mutex(async,
      unsafe_async_memo(Pool,Goal,Status,Opts)).

% unsafe_async_memo(+Pool,+Goal,-Status,+Opts)
% Body of async_memo/4; the caller is expected to hold the =|async|= mutex.
unsafe_async_memo(Pool,Goal,Status,Opts) :-
   option(globals(Globals),Opts,[]),
   (  browse(Goal,Meta)
   -> memoised_status(Pool,Goal,Globals,Meta,Opts,Status)
   ;  pending_or_new_status(Pool,Goal,Globals,Opts,Status)
   ).

% memoised_status(+Pool,+Goal,+Globals,+Meta,+Opts,-Status)
% A memoised outcome Meta exists: either report it, or clear it and start again,
% depending on the recompute option.
memoised_status(Pool,Goal,Globals,Meta,Opts,Status) :-
   option(recompute(Policy),Opts,none),
   (  recompute(Policy,Meta)
   -> clear_all(Goal,Meta),
      spawn(Pool,Goal,Globals,JobID,QueuePos),
      Status=recomputing(JobID,QueuePos,Meta)
   ;  Status=done(Meta)
   ).

% pending_or_new_status(+Pool,+Goal,+Globals,+Opts,-Status)
% No memoised outcome exists: report on a pending job if there is one, otherwise
% start a new job.
%
% !!! Another thread might be computing this goal and complete between the
% memo lookup and this point. That job will then not be detected and a new job
% will be started here; the new job will find the result and complete quickly.
pending_or_new_status(Pool,Goal,Globals,Opts,Status) :-
   (  async_current_job(Pool,Goal,JobID,TSubmit,JobState)
   -> option(progress_levels(Levels),Opts,[summary]),
      % !!! The job may complete right here, in which case the progress
      % request made by goal_status/5 will run into an error.
      goal_status(JobID,TSubmit,Levels,JobState,Status)
   ;  spawn(Pool,Goal,Globals,NewID,QueuePos),
      Status=spawned(NewID,QueuePos)
   ).

% recompute(+Policy,+Meta) succeeds if memoised metadata Meta should be recomputed.
recompute(failed, _-fail).
recompute(failed, _-ex(_)).
recompute(test(P),M) :- call(P,M).

%% goal_status(+ID:job_id, +TSubmit:time, +Levels:list(progress_level), +S:job_status, -Status) is det.
%
%  Translates the job status S, as produced by async_current_job/5, into the
%  status term returned to the caller of async_memo/4:
%     *  waiting(Pos) becomes waiting(ID,TSubmit,Pos).
%     *  running(Thread,TStart) becomes running(ID,TStart,Thread,Progress), where
%        Progress is =|just(T-Vals)|= if the job thread answered a progress request
%        for Levels, or =|nothing|= (with a warning printed) if the request raised
%        an exception.
%     *  indeterminate becomes indeterminate(ID,TSubmit). async_current_job/5 can
%        produce this state, so it is handled here rather than allowed to make
%        the caller fail.
goal_status(ID, TSubmit, _, waiting(Pos), waiting(ID,TSubmit,Pos)).
goal_status(ID, _, Levels, running(Thread,TStart), running(ID,TStart,Thread,Progress)) :-
   catch( ( async_thread_progress(Thread,Levels,T,Vals), Progress=just(T-Vals)), Ex,
          ( print_message(warning,error_getting_job_progress(Thread,Ex)), Progress=nothing)).
goal_status(ID, TSubmit, _, indeterminate, indeterminate(ID,TSubmit)).

%% spawn(+Pool:thread_pool, +Goal:callable, +Globals:list(pair(atom,any)), -ID:job_id, -Waiting:natural) is det.
%
%  Starts a new job for Goal in Pool. ID is a fresh UUID. Waiting is the pool's
%  =|backlog|= property read before the thread is requested, or 0 if that
%  property is not available. The job is registered in the recorded database
%  under key Pool as =|job(ID,Time,Goal)|=; the record is erased when the thread
%  exits (via =|at_exit|=) or if creating the thread raises an exception.
spawn(Pool,Goal,Globals,ID,Waiting) :-
   uuid(ID),
   (thread_pool_property(Pool,backlog(Waiting)) -> true; Waiting=0),
   get_time(Time),
   setup_call_catcher_cleanup(
      recordz(Pool,job(ID,Time,Goal),Ref),
      thread_create_in_pool(Pool,spawnee(ID,Goal,Globals),_,
                            [ detached(true), at_exit(erase(Ref))]),
      exception(_),
      erase(Ref)).

%% spawnee(+ID:job_id, +Goal:callable, +Globals:list(pair(atom,any))) is semidet.
%
%  Goal run by the job thread. While holding the =|async|= mutex it first checks
%  whether the job was cancelled before starting (a record =|async:cancelled|=
%  under key ID); if so the marker is erased and the thread fails without running
%  Goal. Otherwise the requested global variables are set, and
%  =|running(Thread,T0)|= is recorded under key ID. Goal is then run through
%  memo/2, and the running record is erased on exit.
spawnee(ID,Goal,Globals) :-
   setup_call_cleanup(
      with_mutex(async,
         (  recorded(ID,async:cancelled,Ref)
         -> debug(async,'Cancelled before starting: ~q',[Goal]),
            erase(Ref), fail
         ;  pairs_keys_values(Globals,GNames,GVals),
            maplist(nb_setval,GNames,GVals),
            get_time(T0),
            thread_self(Thread),
            recordz(ID,running(Thread,T0),Ref)
         )),
      (  freeze(Res,check_result(Res)), % prevent memoisation if abort(_) is caught
         debug(async,"Running asyc goal ~q",[Goal]),
         memo(Goal,_-Res)),
      erase(Ref)).

%% check_result(+Res) is det.
%
%  Rethrows =|abort(Reason)|= if Res is =|ex(abort(Reason))|=; succeeds otherwise.
%  It is attached to the result variable of memo/2 with freeze/2 in spawnee/3.
check_result(Ex) :-
   (  Ex=ex(abort(Reason))
   -> debug(async,'Computation aborted (~w)',[Reason]),
      throw(abort(Reason))
   ;  true
   ).

%% async_current_job(+Pool:thread_pool, -Goal:callable, -ID:job_id, -T:time, -S:job_status) is nondet.
%  Retrieves information about current pending jobs. A job is pending if it is running
%  or waiting to run in the specified thread pool. T is the time the job was submitted.
%  Job status is:
%  ==
%  job_status ---> waiting(natural)          % queued at given position
%                ; running(thread_id,time)   % with start time
%                ; indeterminate.            % neither recorded as running nor found in the queue
%  ==
%  A warning is printed when the status is =|indeterminate|=; the job could be
%  finished, or in transition from waiting to running.
async_current_job(Pool,Goal,ID,Time,Status) :-
   recorded(Pool,job(ID,Time,Goal)), % this finds ALL jobs
   (  recorded(ID,running(Thread,StartTime))
   -> Status=running(Thread,StartTime)
   ;  thread_pool_property(Pool,waiting(Waiting)),
      nth1(Pos,Waiting,create(Pool,async:spawnee(ID,_,_),_,_,_))
   -> Status=waiting(Pos)
   ;  print_message(warning, async_job_status_indeterminate(Pool,ID,Goal)),
      Status=indeterminate % could be finished, or transferring from waiting to running
   ).


%% async_thread_progress(+Thread:thread_id,+Levels:list(progress_level), -T:time, -Vals:list) is det.
%  Called in main thread to get information about job running in thread Thread.
%  The request is delivered with thread_signal/2 and the reply is awaited on the
%  caller's message queue, after discarding any stale progress replies.
%  Will throw an exception if:
%     - the thread has terminated
%     - the thread does not respond with progress info in less than 3 seconds
%     - the thread does not have a progress handler call back registered
%     - the progress handler fails on any of the progress levels requested.
async_thread_progress(Thread,Levels,Time,Vals) :-
   thread_self(Me),
   gensym(prog,RID), % progress request ID
   flush_all_messages(Me,progress(_,_,_,_)),
   thread_signal(Thread,report_progress(RID,Levels,Me)),
   (  thread_get_message(Me,progress(RID,Time,Vals,St),[timeout(3)]) -> true
   ;  throw(get_thread_progress_timeout(Thread,Levels))
   ),
   debug(async,'Got progress ~w from thread ~w: ~w',[Levels,Thread,Vals]),
   memo:reflect(St).

%% flush_all_messages(+Queue, +Pattern) is det.
%  Removes every message unifying with Pattern from Queue without blocking.
%  Pattern itself is left unbound because each removal happens under negation.
flush_all_messages(Queue,Pattern) :-
   repeat, \+thread_get_message(Queue,Pattern,[timeout(0)]), !.

%% report_progress(+RID:atomic, +Levels:list(progress_level), +Thread:thread_id) is det.
%  Runs in the job thread when it is signalled by async_thread_progress/4.
%  Collects progress for Levels from the installed callback and sends
%  =|progress(RID,Time,Vals,St)|= to Thread, where St is the reified outcome of
%  collecting the progress values.
report_progress(RID,Levels,Requester) :-
   get_time(Now),
   memo:reify(async:my_progress(Levels,Vals),Outcome),
   !,
   debug(async,'reporting progress(~w): ~q',[Levels,Vals]),
   thread_send_message(Requester,progress(RID,Now,Vals,Outcome)).

% my_progress(+Levels,-Vals)
% Applies the callback stored in global variable async_progress to every level.
my_progress(Levels,Vals) :-
   nb_getval(async_progress,Callback),
   insist(maplist(Callback,Levels,Vals)).


%% async_cancel(+Pool:thread_pool, +ID:job_id, +Ex:term) is det.
%% async_cancel(+Pool:thread_pool, +ID:job_id) is det.
%  Cancels job ID in Pool if it is currently pending; otherwise does nothing.
%  A running job has exception Ex thrown into its thread (=|abort(cancel)|= for
%  async_cancel/2). A waiting job is marked in the recorded database so that it
%  fails as soon as it is started.
async_cancel(Pool,ID) :-
   async_cancel(Pool,ID,abort(cancel)).
async_cancel(Pool,ID,Ex) :-
   debug(async,'Cancelling ~w:~w with ~w',[Pool,ID,Ex]),
   with_mutex(async, cancel_if_pending(Pool,ID,Ex)).

% cancel_if_pending(+Pool,+ID,+Ex)
% Looks up job ID among the pending jobs of Pool and cancels it if found.
cancel_if_pending(Pool,ID,Ex) :-
   (  async_current_job(Pool,_,ID,_,State)
   -> cancel(ID,State,Ex)
   ;  true
   ).

% cancel(+ID,+JobStatus,+Ex)
% Performs the cancellation appropriate to the job's current status.
cancel(ID, waiting(_), _) :-
   recordz(ID,async:cancelled).
cancel(_, running(Worker,_), Ex) :-
   catch( thread_signal(Worker,throw(Ex)),
          Err,
          print_message(warning,error_signalling_job_thread(Worker,Err))).
cancel(ID, indeterminate, _) :-
   print_message(warning,indeterminate_job_state_on_cancel(ID)).

%% async_set_progress_callback(+P:pred(+Level:any,-Info:any)) is det.
%  Installs P as the progress handler of the current thread by storing it in the
%  global variable =|async_progress|=.
async_set_progress_callback(Handler) :-
   nb_setval(async_progress,Handler).

%% async_get_progress_callback(-P:pred(+Level:any,-Info:any)) is semidet.
%  Retrieves the progress handler of the current thread; fails if none is set.
async_get_progress_callback(Handler) :-
   nb_current(async_progress,Handler).

%% async_unset_progress_callback is det.
%  Removes the progress handler of the current thread. Subsequent calls to
%  async_get_progress_callback/1 will fail.
async_unset_progress_callback :-
   nb_delete(async_progress).

% -----------------------------------------------------------------------


%% with_progress_stack(+Goal:callable) is nondet.
%
%  Runs Goal with the current thread's progress handler set to a task stack.
%  If no handler is installed, an empty task stack is created, the stack handler
%  is installed for the duration of Goal, and it is removed again afterwards.
%  If the stack handler is already installed, Goal is simply called.
%  Fails if the progress callback has already been set to something else.
%  Tasks (goals which run in cooperation with a progress handler) can be pushed
%  on to the task stack using with_progress_handler/2.
%
%  The progress handler recognises progress level terms:
%     * summary :: progress_level(list(any))
%       Yields a list of progress terms, one for each entry in the task stack.
%     * partial_result :: progress_level(any)
%       Yields partial results of the computation if they are available.
with_progress_stack(Goal) :-
   (  async_get_progress_callback(Installed)
   -> Installed=async:get_stack_progress,
      call(Goal)
   ;  with_global(async_progress_stack, nil,
         setup_call_cleanup(
            async_set_progress_callback(get_stack_progress),
            Goal,
            async_unset_progress_callback))
   ).

%% get_stack_progress(+Level:progress_level(A),-Info:A) is semidet.
%  Gets the request level of progress information from the currently installed
%  progress stack.
get_stack_progress(Level,Info) :-
   stack_request(Level,Request),
   nb_getval(async_progress_stack,Stack),
   call(Stack,Request,Info).

% stack_request(?Level,?Request)
% Maps a public progress level to the message sent to the task stack.
stack_request(summary,        progress(summary)).
stack_request(partial_result, partial_result).


%% with_progress_handler(+H:progress_handler, +Goal:callable) is nondet.
%  Pushes the progress handler H on to the task stack held in the global
%  variable =|async_progress_stack|= and calls Goal. The previous stack is
%  restored on exit.
with_progress_handler(Handler,Goal) :-
   nb_getval(async_progress_stack,Saved),
   call(Saved,push(Handler),Extended),
   setup_call_cleanup(
      nb_setval(async_progress_stack,Extended),
      Goal,
      nb_setval(async_progress_stack,Saved)).

% nil(+Message,-Reply)
% The empty task stack: no progress entries, no partial result. Pushing a handler
% on to it yields a one-element chain.
nil(progress(_), []).
nil(push(Handler), chain(Handler,nil)).
nil(partial_result, nothing).
nil(partial_result(Cont), Result) :- call(Cont,nothing,Result).

% chain(+Head,+Tail,+Message,-Reply)
% A non-empty task stack with handler Head in front of the stack Tail.
%   - push(H): H is pushed on to Tail, so Head stays at the front.
%   - progress(L): Head's progress term followed by those of Tail.
%   - partial_result: the partial result of Tail is handed to Head; the reply is
%     just(R) if Head can turn it into R, and nothing otherwise.
chain(Head,Tail,push(Handler),chain(Head,NewTail)) :-
   call(Tail,push(Handler),NewTail).
chain(Head,Tail,progress(Level),[HeadInfo|TailInfo]) :-
   call(Head,progress(Level),HeadInfo),
   call(Tail,progress(Level),TailInfo).
chain(Head,Tail,partial_result,Reply) :-
   call(Tail,partial_result,Inner),
   (  call(Head,partial_result(Inner),Outer)
   -> Reply=just(Outer)
   ;  Reply=nothing
   ).


%% simple_task(+Desc,+Goal:callable) is det.
%
%  Calls Goal with a progress handler that reports Desc when progress
%  information is requested. No partial results are available.
simple_task(Desc,Goal) :-
   with_progress_handler(simple_progress(Desc),Goal).

% simple_progress(+Desc,+Message,-Reply): answers progress requests only.
simple_progress(Desc, progress(_), Desc).


%% with_cont(+Desc,+Pred:pred(-A),+Cont:pred(+A,-B),-Result:B) is det.
%
%  Calls Pred and then passes the result to Cont yielding Result.
%  A progress handler is installed such that any partial results yielded by
%  subtasks in Pred are passed to Cont to yield a partial result. Progress requests
%  yield Desc.
with_cont(Desc,Pred,Cont,Result) :-
   with_progress_handler(cont(Desc,Cont),
      (  call(Pred,Intermediate),
         call(Cont,Intermediate,Result)
      )).

% NOTE(review): this cont/5 clause does not look reachable through the handler
% protocol, which calls handlers with two extra arguments (i.e. cont/4) --
% confirm before removing.
cont(_,Cont,Tail,push(Handler),cont(Cont,NewTail)) :-
   call(Tail,push(Handler),NewTail).

% cont(+Desc,+Cont,+Message,-Reply)
% Handler used by with_cont/4: progress requests yield Desc; a partial result
% just(R1) coming from a subtask is mapped through Cont.
cont(Desc, _, progress(_), Desc).
cont(_, Cont, partial_result(just(Inner)), Outer) :-
   call(Cont,Inner,Outer).

%% stepwise_task(+Desc,+Total:nonneg,-Step:callable,+Goal:callable) is det.
%
%  Calls Goal with a progress handler that reports progress as a term
%  =|stepwise(Desc,Done/Total)|=. Step is bound to a goal which Goal may call to
%  increment the =|Done|= counter, which starts at 0. Partial results are not
%  available.
stepwise_task(Desc,Total,async:increment(Counter),Goal) :-
   with_new_ref(Counter, 0,
      with_progress_handler(stepwise_progress(Desc,Total,Counter), Goal)).

% stepwise_progress(+Desc,+Total,+Counter,+Message,-Reply)
% Reads the current value of Counter to answer a progress request.
stepwise_progress(Desc,Total,Counter,progress(_),stepwise(Desc,Done/Total)) :-
   b_getval(Counter,Done).


%% updatable_status_task(+Initial,-Update:pred(any),+Goal:callable) is det.
%
%  Calls Goal with Update bound to a predicate which can be used to set the
%  current status of the task. The current status is reported if progress is
%  requested; it is Initial until Update is first called.
%  No partial results are available.
:- meta_predicate updatable_status_task(+,-,0).
updatable_status_task(Initial,async:set_ref(StatusRef),Goal) :-
   with_new_ref(StatusRef, Initial,
      with_progress_handler(status_progress(StatusRef), Goal)).

% status_progress(+StatusRef,+Message,-Reply)
% Answers a progress request with the value currently stored in StatusRef.
status_progress(StatusRef, progress(_), Current) :-
   b_getval(StatusRef,Current).


%% map_with_progress(+P:pred(A,B), +Xs:list(A), -Ys:list(B)) is det.
%
%  Maps P over Xs as a stepwise task described as =|map(P)|=, stepping the
%  progress counter after each element.
map_with_progress(P,Xs,Ys) :-
   length(Xs,Count),
   stepwise_task(map(P), Count, Step,
      maplist(call_and_step(Step,P),Xs,Ys)).

% call_and_step(+Step,+P,?X,?Y): applies P to X and Y, then calls Step.
call_and_step(Step,P,X,Y) :-
   call(P,X,Y),
   call(Step).

/*
fold_with_progress(Folder,Items,S0,S1) :-
   length(Items,N),
   stepwise_partial_task(folding(Folder),N,Step,Update,
      seqmap(step_update(Step,Update,Folder),Items,S0,S1)).

step_update(Step,Update,P,X,S0,S1) :-
   call(P,X,S0,S1),
   call(Update,S1),
   call(Step).
*/

:- setting(map_fold_batch,nonneg,512,"Batch size for flushing fold queue.").

%% map_fold_with_progress(+Mapper:pred(A,B), +Folder:pred(list(B),S,S), +Items:list(A), +S1:S, -S2:S) is det.
%
%  Maps Mapper over Items and folds the results into an accumulator with Folder,
%  starting from S1 and ending with S2. Folder consumes a _list_ of mapped items
%  at a time: mapped items are queued and folded in batchwise, the batch counter
%  being initialised from the setting =|map_fold_batch|=.
%
%  Progress is reported as =|stepwise(Desc,Done/Total)|= with
%  =|Desc=map_fold(Mapper,Folder)|=. A request for partial results first folds in
%  the currently queued items and then yields the accumulator.
%
%  The working state is kept in a fresh global variable as
%  =|s(Remaining,Done,BatchCountdown,QueueHead,QueueTail,Accumulator)|=.
map_fold_with_progress(Mapper,Folder,Items,S1,S2) :-
   length(Items,Total),
   setting(map_fold_batch,BatchSize),
   with_new_ref(StateRef, s(Items,0,BatchSize,Queue,Queue,S1),
      (  Reporter=map_fold_progress(map_fold(Mapper,Folder),Total,StateRef),
         with_progress_handler(Reporter,
            (  map_fold_loop(Mapper,Folder,StateRef),
               % Final flush: fold whatever is still queued to obtain S2.
               call(Reporter,partial_result(nothing),S2)
            ))
      )).

% map_fold_loop(+Mapper,+Folder,+StateRef)
% Main loop of map_fold_with_progress/5. Each iteration maps one remaining item,
% appends the result to the open-ended queue and counts it as done. The batch
% countdown is decremented; once it cannot be decremented any further the queue
% is folded into the accumulator by flush_fold/3, run as a simple task described
% as =|folding|=. When no items remain the queue is closed and the loop ends.
%
% NOTE(review): flush_fold/3 closes the queue tail held in the state. If a
% partial_result request were served between reading the state and extending
% the tail below, that extension would presumably fail -- confirm whether
% progress signals can be handled at that point.
map_fold_loop(Mapper,Folder,StateRef) :-
   b_getval(StateRef,s(Pending,Done0,Left0,Head,Tail0,Acc)),
   (  Pending=[]
   -> Tail0=[]
   ;  Pending=[Item|Rest]
   -> call(Mapper,Item,Mapped), !,
      Tail0=[Mapped|Tail1],
      succ(Done0,Done1),
      Advanced=s(Rest,Done1,Left1,Head,Tail1,Acc),
      (  succ(Left1,Left0)
      -> Next=Advanced
      ;  simple_task(folding,flush_fold(Folder,Advanced,Next))
      ),
      b_setval(StateRef,Next), !,
      map_fold_loop(Mapper,Folder,StateRef)
   ).

% map_fold_progress(+Desc,+Total,+StateRef,+Message,-Reply)
% Progress handler of map_fold_with_progress/5. Progress requests report the
% number of items done so far; partial result requests flush the queue and
% return the accumulator, storing the flushed state back in StateRef.
map_fold_progress(Desc,Total,StateRef,progress(_),stepwise(Desc,Done/Total)) :-
   b_getval(StateRef,s(_,Done,_,_,_,_)).
map_fold_progress(map_fold(_,Folder),_,StateRef,partial_result(_),Acc) :-
   b_getval(StateRef,Before),
   flush_fold(Folder,Before,After),
   b_setval(StateRef,After),
   After=s(_,_,_,_,_,Acc).

% flush_fold(+Folder,+State1,-State2)
% Closes the queue of State1, folds its contents into the accumulator with
% Folder, and returns a state with an empty queue and the batch countdown reset
% from the setting map_fold_batch.
flush_fold(Folder,
           s(Pending,Done,_,Queued,[],Acc0),
           s(Pending,Done,BatchSize,Fresh,Fresh,Acc1)) :-
   setting(map_fold_batch,BatchSize),
   call(Folder,Queued,Acc0,Acc1),
   !.

% Helpers for managing global variables.

% new_ref(-Ref): Ref is a fresh name for a global variable.
new_ref(Ref) :-
   gensym('$ref',Ref).
% new_ref(-Ref,+Val): as new_ref/1, with the variable initialised to Val.
new_ref(Ref,Val) :-
   new_ref(Ref),
   b_setval(Ref,Val).

% with_global(+Name,+Val,+Goal)
% Runs Goal with global variable Name set to Val; Name is deleted afterwards.
with_global(Name,Val,Goal) :-
   setup_call_cleanup(
      b_setval(Name,Val),
      Goal,
      nb_delete(Name)).

% with_new_ref(-Name,+Val,+Goal)
% As with_global/3, using a freshly generated variable name bound to Name.
with_new_ref(Name,Val,Goal) :-
   gensym('$ref',Name),
   with_global(Name,Val,Goal).

% set_ref(+Ref,+Val): overwrites global variable Ref with Val.
set_ref(Ref,Val) :-
   nb_setval(Ref,Val).

% increment(+Counter): adds one to the integer stored in global variable Counter.
increment(Counter) :-
   b_getval(Counter,Old),
   succ(Old,New),
   b_setval(Counter,New).