Daniel@0
|
1 /* Part of DML (Digital Music Laboratory)
|
Daniel@0
|
2 Copyright 2014-2015 Samer Abdallah, University of London
|
Daniel@0
|
3
|
Daniel@0
|
4 This program is free software; you can redistribute it and/or
|
Daniel@0
|
5 modify it under the terms of the GNU General Public License
|
Daniel@0
|
6 as published by the Free Software Foundation; either version 2
|
Daniel@0
|
7 of the License, or (at your option) any later version.
|
Daniel@0
|
8
|
Daniel@0
|
9 This program is distributed in the hope that it will be useful,
|
Daniel@0
|
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
|
Daniel@0
|
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
Daniel@0
|
12 GNU General Public License for more details.
|
Daniel@0
|
13
|
Daniel@0
|
14 You should have received a copy of the GNU General Public
|
Daniel@0
|
15 License along with this library; if not, write to the Free Software
|
Daniel@0
|
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
Daniel@0
|
17 */
|
Daniel@0
|
18
|
Daniel@0
|
19 :- module(async,
|
Daniel@0
|
20 [ async_memo/4
|
Daniel@0
|
21 , async_current_job/5
|
Daniel@0
|
22 , async_thread_progress/4
|
Daniel@0
|
23 , async_get_progress_callback/1
|
Daniel@0
|
24 , async_set_progress_callback/1
|
Daniel@0
|
25 , async_unset_progress_callback/0
|
Daniel@0
|
26 , async_cancel/3
|
Daniel@0
|
27 , async_cancel/2
|
Daniel@0
|
28 , with_progress_stack/1
|
Daniel@0
|
29 , with_cont/4
|
Daniel@0
|
30 , simple_task/2
|
Daniel@0
|
31 , stepwise_task/4
|
Daniel@0
|
32 , updatable_status_task/3
|
Daniel@0
|
33 , map_with_progress/3
|
Daniel@0
|
34 , map_fold_with_progress/5
|
Daniel@0
|
35 , with_global/3
|
Daniel@0
|
36 , with_new_ref/3
|
Daniel@0
|
37 ]).
|
Daniel@0
|
38
|
Daniel@0
|
39 /** <module> Asynchronous memoised computations with progress and partial results
|
Daniel@0
|
40
|
Daniel@0
|
41 This module provides services for managing memoised computations in a thread pool
|
Daniel@0
|
42 with the ability to check on the progress of running jobs and receive partial
|
Daniel@0
|
43 results if they are available. Completed computations are memoised using
|
Daniel@0
|
44 library(memo).
|
Daniel@0
|
45
|
Daniel@0
|
46 ---++ Job control
|
Daniel@0
|
47 The first half of the module is concerned with managing memoised computations.
|
Daniel@0
|
48 Computations are requested using async_memo/4. If the result is already available in
|
Daniel@0
|
49 the memoised database, the result is returned. If the computation is already running,
|
Daniel@0
|
50 status information about the job is returned. Otherwise, a new job is started. Status
|
Daniel@0
|
51 information is returned as the type =|async_status(A)|=.
|
Daniel@0
|
52 ==
|
Daniel@0
|
53 async_status(A) ---> done(metadata)
|
Daniel@0
|
54 ; initiated(job_id,natural)
|
Daniel@0
|
55 ; waiting(natural)
|
Daniel@0
|
56 ; running(job_id,A))
|
Daniel@0
|
57 ; recompute(job_id,natural,metadata).
|
Daniel@0
|
58
|
Daniel@0
|
59 job_id == atom.
|
Daniel@0
|
60 ==
|
Daniel@0
|
61 The type =|metadata|= is defined in library(memo).
|
Daniel@0
|
62
|
Daniel@0
|
63
|
Daniel@0
|
64 ---++ Helper meta-predicates
|
Daniel@0
|
65 The second half of the module provides a collection of predicates for computations
|
Daniel@0
|
66 to use to handle requests for progress information, in the form of a stack of tasks.
|
Daniel@0
|
67 It recognises the following levels of progress information:
|
Daniel@0
|
68 ==
|
Daniel@0
|
69 summary :: progress_level(list(any)).
|
Daniel@0
|
70 partial_result :: progress_level(any).
|
Daniel@0
|
71 ==
|
Daniel@0
|
72 */
|
Daniel@0
|
73 :- meta_predicate
|
Daniel@0
|
74 async_set_progress_callback(2)
|
Daniel@0
|
75 , async_memo(+,0,-,+)
|
Daniel@0
|
76 , with_global(+,+,0)
|
Daniel@0
|
77 , with_new_ref(-,+,0)
|
Daniel@0
|
78 , with_progress_stack(0)
|
Daniel@0
|
79 , with_progress_handler(2,0)
|
Daniel@0
|
80 , with_cont(+,1,2,-)
|
Daniel@0
|
81 , simple_task(+,0)
|
Daniel@0
|
82 , stepwise_task(+,+,-,0)
|
Daniel@0
|
83 % , stepwise_partial_task(+,+,-,-,0)
|
Daniel@0
|
84 % , fold_with_progress(3,+,+,-)
|
Daniel@0
|
85 , map_with_progress(2,+,-)
|
Daniel@0
|
86 , map_fold_with_progress(2,3,+,+,-).
|
Daniel@0
|
87
|
Daniel@0
|
88 :- use_module(library(thread_pool)).
|
Daniel@0
|
89 :- use_module(library(insist)).
|
Daniel@0
|
90 :- use_module(library(memo)).
|
Daniel@0
|
91 :- use_module(library(dcg_core),[seqmap/4]).
|
Daniel@0
|
92
|
Daniel@0
|
93 %% async_memo(+P:thread_pool, +G:memo_goal, -S:async_status(_), Opts:options) is det.
|
Daniel@0
|
94 %
|
Daniel@0
|
95 % Start running a goal asynchronously in a thread pool if it is not
|
Daniel@0
|
96 % already running.
|
Daniel@0
|
97 % In order for the goal to provide progress information, it must use nb_setval/2
|
Daniel@0
|
98 % to set the global variable =|progress|=.
|
Daniel@0
|
99 %
|
Daniel@0
|
100 % Options:
|
Daniel@0
|
101 % * status_var(Var:status_var(A))
|
Daniel@0
|
102 % If computation is already running, information of type =|A|=
|
Daniel@0
|
103 % associated with job is returned in Stats. Default is =|progress(summary)|=.
|
Daniel@0
|
104 % * recompute(R:oneof([none,failed,test(Pred)]))
|
Daniel@0
|
105 % If =|none|=, then a previously failed computation is not retried, and the
|
Daniel@0
|
106 % status =|done(Comp-Result)|= is returned, where Comp includes
|
Daniel@0
|
107 % the computation time and host and Result is =|fail|= or =|ex(Exception)|=.
|
Daniel@0
|
108 % If =|failed|=, unsuccessful computations are retried and the status
|
Daniel@0
|
109 % =|recomputing(ID,Pos,Meta)|= is returned, where =|ID|= is the ID of the
|
Daniel@0
|
110 % new job, =|Pos|= its initial position in the queue, and =|Meta=Comp-Result|=
|
Daniel@0
|
111 % the status of the previous computation.
|
Daniel@0
|
112 % If =|test(Pred)|=, then the computation is retried if =|call(Pred,Meta)|=
|
Daniel@0
|
113 % succeeds, where Meta is the memoisied computation metadata and the variables
|
Daniel@0
|
114 % in Goal will be unified with their values resulting from that computation.
|
Daniel@0
|
115 % The status returned is =|initiated(ID,Pos)|=. Pred must include its module
|
Daniel@0
|
116 % specifier as a I am too lazy to worry about meta-predicate options right now.
|
Daniel@0
|
117 % * globals(list(pair(atom,any)))
|
Daniel@0
|
118 % Set global variables to given values in new thread.
|
Daniel@0
|
119 %
|
Daniel@0
|
120 % The value of the =|status_var(_)|= option determines the information returned
|
Daniel@0
|
121 % if the status of the computation is =|running(ID,StatusVals)|=.
|
Daniel@0
|
122 % ==
|
Daniel@0
|
123 % progress(progress_level(A)) :: status_var(A).
|
Daniel@0
|
124 % stats(list(stats_key)) :: status_var(list(stats_vals)).
|
Daniel@0
|
125 % time :: status_var(pair(time,float)). % start and elapsed times
|
Daniel@0
|
126 % status_var(A)-status_var(B) :: status_var(pair(A,B)).
|
Daniel@0
|
127 %
|
Daniel@0
|
128 % time==float. % as returned by get_time/1.
|
Daniel@0
|
129 % ==
|
Daniel@0
|
130 % The type =|stats_key|= denotes any atom accepted by thread_statistics/3, and
|
Daniel@0
|
131 % =|stats_vals|= the type of values returned.
|
Daniel@0
|
132
|
Daniel@0
|
133 async_memo(_,Goal,done(Meta),Opts) :-
|
Daniel@0
|
134 option(recompute(none),Opts,none),
|
Daniel@0
|
135 browse(Goal,Meta), !.
|
Daniel@0
|
136
|
Daniel@0
|
137 async_memo(Pool,Goal,Status,Opts) :-
|
Daniel@0
|
138 with_mutex(async,unsafe_async_memo(Pool,Goal,Status,Opts)).
|
Daniel@0
|
139
|
Daniel@0
|
140 unsafe_async_memo(Pool,Goal,Status,Opts) :-
|
Daniel@0
|
141 option(globals(Globals),Opts,[]),
|
Daniel@0
|
142 ( browse(Goal,Meta)
|
Daniel@0
|
143 -> option(recompute(Recompute),Opts,none),
|
Daniel@0
|
144 ( recompute(Recompute,Meta)
|
Daniel@0
|
145 -> clear_all(Goal,Meta),
|
Daniel@0
|
146 spawn(Pool,Goal,Globals,ID,Waiting),
|
Daniel@0
|
147 Status=recomputing(ID,Waiting,Meta)
|
Daniel@0
|
148 ; Status=done(Meta)
|
Daniel@0
|
149 )
|
Daniel@0
|
150 ; % !!! it's possible that another thread might be computing
|
Daniel@0
|
151 % this goal and complete at this point. If this happens,
|
Daniel@0
|
152 % that job will not be detected and a new a job will be started
|
Daniel@0
|
153 % here. If that happens, the newly created job will find the result
|
Daniel@0
|
154 % and complete quickly.
|
Daniel@0
|
155 ( async_current_job(Pool,Goal,ID,T0,S)
|
Daniel@0
|
156 -> option(progress_levels(Levels),Opts,[summary]),
|
Daniel@0
|
157 % !!! it's possible that the job will complete here
|
Daniel@0
|
158 % and so the progress request will create an error
|
Daniel@0
|
159 goal_status(ID,T0,Levels,S,Status)
|
Daniel@0
|
160 ; spawn(Pool,Goal,Globals,ID,Waiting),
|
Daniel@0
|
161 Status=spawned(ID,Waiting)
|
Daniel@0
|
162 )
|
Daniel@0
|
163 ).
|
Daniel@0
|
164
|
Daniel@0
|
165 recompute(failed,_-fail).
|
Daniel@0
|
166 recompute(failed,_-ex(_)).
|
Daniel@0
|
167 recompute(test(P),M) :- call(P,M).
|
Daniel@0
|
168
|
Daniel@0
|
169 goal_status(ID, TSubmit, _, waiting(Pos), waiting(ID,TSubmit,Pos)).
|
Daniel@0
|
170 goal_status(ID, _, Levels, running(Thread,TStart), running(ID,TStart,Thread,Progress)) :-
|
Daniel@0
|
171 catch( ( async_thread_progress(Thread,Levels,T,Vals), Progress=just(T-Vals)), Ex,
|
Daniel@0
|
172 ( print_message(warning,error_getting_job_progress(Thread,Ex)), Progress=nothing)).
|
Daniel@0
|
173
|
Daniel@0
|
174 spawn(Pool,Goal,Globals,ID,Waiting) :-
|
Daniel@0
|
175 uuid(ID),
|
Daniel@0
|
176 (thread_pool_property(Pool,backlog(Waiting)) -> true; Waiting=0),
|
Daniel@0
|
177 get_time(Time),
|
Daniel@0
|
178 setup_call_catcher_cleanup(
|
Daniel@0
|
179 recordz(Pool,job(ID,Time,Goal),Ref),
|
Daniel@0
|
180 thread_create_in_pool(Pool,spawnee(ID,Goal,Globals),_,
|
Daniel@0
|
181 [ detached(true), at_exit(erase(Ref))]),
|
Daniel@0
|
182 exception(_),
|
Daniel@0
|
183 erase(Ref)).
|
Daniel@0
|
184
|
Daniel@0
|
185 spawnee(ID,Goal,Globals) :-
|
Daniel@0
|
186 setup_call_cleanup(
|
Daniel@0
|
187 with_mutex(async,
|
Daniel@0
|
188 ( recorded(ID,async:cancelled,Ref)
|
Daniel@0
|
189 -> debug(async,'Cancelled before starting: ~q',[Goal]),
|
Daniel@0
|
190 erase(Ref), fail
|
Daniel@0
|
191 ; pairs_keys_values(Globals,GNames,GVals),
|
Daniel@0
|
192 maplist(nb_setval,GNames,GVals),
|
Daniel@0
|
193 get_time(T0),
|
Daniel@0
|
194 thread_self(Thread),
|
Daniel@0
|
195 recordz(ID,running(Thread,T0),Ref)
|
Daniel@0
|
196 )),
|
Daniel@0
|
197 ( freeze(Res,check_result(Res)), % prevent memoisation if abort(_) is caught
|
Daniel@0
|
198 debug(async,"Running asyc goal ~q",[Goal]),
|
Daniel@0
|
199 memo(Goal,_-Res)),
|
Daniel@0
|
200 erase(Ref)).
|
Daniel@0
|
201
|
Daniel@0
|
202 check_result(Ex) :-
|
Daniel@0
|
203 ( Ex=ex(abort(Reason))
|
Daniel@0
|
204 -> debug(async,'Computation aborted (~w)',[Reason]),
|
Daniel@0
|
205 throw(abort(Reason))
|
Daniel@0
|
206 ; true
|
Daniel@0
|
207 ).
|
Daniel@0
|
208
|
Daniel@0
|
209 %% async_current_job(+Pool:thread_pool, -Goal:callable, -ID:job_id, -T:time, -S:job_status) is nondet.
|
Daniel@0
|
210 % Retrieves information about current pending jobs. A job is pending if it is running
|
Daniel@0
|
211 % or waiting to run in the specified thread pool. T is the time the job was submitted.
|
Daniel@0
|
212 % Job status is:
|
Daniel@0
|
213 % ==
|
Daniel@0
|
214 % job_status ---> waiting(natural) % queued at given postion
|
Daniel@0
|
215 % ; running(thread_id,time). % with start time
|
Daniel@0
|
216 % ==
|
Daniel@0
|
217 async_current_job(Pool,Goal,ID,Time,Status) :-
|
Daniel@0
|
218 recorded(Pool,job(ID,Time,Goal)), % this finds ALL jobs
|
Daniel@0
|
219 ( recorded(ID,running(Thread,StartTime))
|
Daniel@0
|
220 -> Status=running(Thread,StartTime)
|
Daniel@0
|
221 ; thread_pool_property(Pool,waiting(Waiting)),
|
Daniel@0
|
222 nth1(Pos,Waiting,create(Pool,async:spawnee(ID,_,_),_,_,_))
|
Daniel@0
|
223 -> Status=waiting(Pos)
|
Daniel@0
|
224 ; print_message(warning, async_job_status_indeterminate(Pool,ID,Goal)),
|
Daniel@0
|
225 Status=indeterminate % could be finished, or transferring from waiting to running
|
Daniel@0
|
226 ).
|
Daniel@0
|
227
|
Daniel@0
|
228
|
Daniel@0
|
229 %% async_thread_progress(+Thread:thread_id,+Levels:list(progress_level), -T:time, -Vals:list) is det.
|
Daniel@0
|
230 % Called in main thread to get information about job running in thread Thread.
|
Daniel@0
|
231 % Will throw an exception if:
|
Daniel@0
|
232 % - the thread has terminated
|
Daniel@0
|
233 % - the thread does not respond with progress info in less than 3 seconds
|
Daniel@0
|
234 % - the thread does not have a progress handler call back registered
|
Daniel@0
|
235 % - the progress handler fails on any of the progress levels requested.
|
Daniel@0
|
236 async_thread_progress(Thread,Levels,Time,Vals) :-
|
Daniel@0
|
237 thread_self(Me),
|
Daniel@0
|
238 gensym(prog,RID), % progress request ID
|
Daniel@0
|
239 flush_all_messages(Me,progress(_,_,_,_)),
|
Daniel@0
|
240 thread_signal(Thread,report_progress(RID,Levels,Me)),
|
Daniel@0
|
241 ( thread_get_message(Me,progress(RID,Time,Vals,St),[timeout(3)]) -> true
|
Daniel@0
|
242 ; throw(get_thread_progress_timeout(Thread,Levels))
|
Daniel@0
|
243 ),
|
Daniel@0
|
244 debug(async,'Got progress ~w from thread ~w: ~w',[Levels,Thread,Vals]),
|
Daniel@0
|
245 memo:reflect(St).
|
Daniel@0
|
246
|
Daniel@0
|
247 flush_all_messages(Queue,Pattern) :-
|
Daniel@0
|
248 repeat, \+thread_get_message(Queue,Pattern,[timeout(0)]), !.
|
Daniel@0
|
249
|
Daniel@0
|
250 %% report_progress(+RID:atomic, +Var:status_var(A), +Thread:thread_id) is det.
|
Daniel@0
|
251 % Called from job thread in response to a signal, to send progress information to thread Thread.
|
Daniel@0
|
252 report_progress(RID,Levels,Thread) :-
|
Daniel@0
|
253 get_time(Time),
|
Daniel@0
|
254 memo:reify(async:my_progress(Levels,Vals),St), !,
|
Daniel@0
|
255 debug(async,'reporting progress(~w): ~q',[Levels,Vals]),
|
Daniel@0
|
256 thread_send_message(Thread,progress(RID,Time,Vals,St)).
|
Daniel@0
|
257
|
Daniel@0
|
258 my_progress(Levels,Vals) :- nb_getval(async_progress,G), insist(maplist(G,Levels,Vals)).
|
Daniel@0
|
259
|
Daniel@0
|
260
|
Daniel@0
|
261 %% async_cancel(+Pool:thread_pool, +ID:job_id, +Ex:term) is det.
|
Daniel@0
|
262 %% async_cancel(+Pool:thread_pool, +ID:job_id) is det.
|
Daniel@0
|
263 % Cancel job with given ID, if it is currently pending. If it is running, the
|
Daniel@0
|
264 % exception term Ex is thrown (=|abort(cancel)|= in async_cancel/2).
|
Daniel@0
|
265 % If not, the recorded database is used to mark the job for immediate failure
|
Daniel@0
|
266 % as soon as it is started.
|
Daniel@0
|
267 async_cancel(Pool,ID) :- async_cancel(Pool,ID,abort(cancel)).
|
Daniel@0
|
268 async_cancel(Pool,ID,Ex) :-
|
Daniel@0
|
269 debug(async,'Cancelling ~w:~w with ~w',[Pool,ID,Ex]),
|
Daniel@0
|
270 with_mutex(async, (async_current_job(Pool,_,ID,_,S) -> cancel(ID,S,Ex); true)).
|
Daniel@0
|
271
|
Daniel@0
|
272 cancel(ID,waiting(_),_) :- recordz(ID,async:cancelled).
|
Daniel@0
|
273 cancel(_, running(Thread,_),Ex) :-
|
Daniel@0
|
274 catch( thread_signal(Thread,throw(Ex)), E,
|
Daniel@0
|
275 print_message(warning,error_signalling_job_thread(Thread,E))).
|
Daniel@0
|
276 cancel(ID,indeterminate,_) :- print_message(warning,indeterminate_job_state_on_cancel(ID)).
|
Daniel@0
|
277
|
Daniel@0
|
278 %% async_set_progress_callback(+P:pred(+Level:any,-Info:any)) is det.
|
Daniel@0
|
279 % Sets the progress handler for the current thread.
|
Daniel@0
|
280 async_set_progress_callback(Pred) :- nb_setval(async_progress,Pred).
|
Daniel@0
|
281
|
Daniel@0
|
282 %% async_get_progress_callback(-P:pred(+Level:any,-Info:any)) is semidet.
|
Daniel@0
|
283 % Gets the progress handler for the current thread, if one has been set.
|
Daniel@0
|
284 async_get_progress_callback(Pred) :- nb_current(async_progress,Pred).
|
Daniel@0
|
285
|
Daniel@0
|
286 %% async_unset_progress_callback is det.
|
Daniel@0
|
287 % Unsets the progress handler for the current thread. Subsequent calls to
|
Daniel@0
|
288 % async_get_progress_callback/1 will fail.
|
Daniel@0
|
289 async_unset_progress_callback :- nb_delete(async_progress).
|
Daniel@0
|
290
|
Daniel@0
|
291 % -----------------------------------------------------------------------
|
Daniel@0
|
292
|
Daniel@0
|
293
|
Daniel@0
|
294 %% with_progress_stack(+Goal:callable) is nondet.
|
Daniel@0
|
295 %
|
Daniel@0
|
296 % Runs Goal with the current thread's progress handler (which must be initially
|
Daniel@0
|
297 % unset) set to an empty task stack. Tasks (goals which run in cooperation with a
|
Daniel@0
|
298 % a progress handler) can be pushed on to the task stack using with_progress_handler/2.
|
Daniel@0
|
299 % Fails if progress callback has already been set to something else.
|
Daniel@0
|
300 %
|
Daniel@0
|
301 % The progress handler recognises progress level terms:
|
Daniel@0
|
302 % * summary :: progress_level(list(any))
|
Daniel@0
|
303 % Yields a list of progress terms, one for each entry in the task stack.
|
Daniel@0
|
304 % * partial_result :: progress_level(any)
|
Daniel@0
|
305 % Yields partial results of the computation if they are available.
|
Daniel@0
|
306 with_progress_stack(Goal) :-
|
Daniel@0
|
307 async_get_progress_callback(CB), !,
|
Daniel@0
|
308 CB=async:get_stack_progress,
|
Daniel@0
|
309 call(Goal).
|
Daniel@0
|
310 with_progress_stack(Goal) :-
|
Daniel@0
|
311 with_global(async_progress_stack, nil,
|
Daniel@0
|
312 setup_call_cleanup(
|
Daniel@0
|
313 async_set_progress_callback(get_stack_progress), Goal,
|
Daniel@0
|
314 async_unset_progress_callback)).
|
Daniel@0
|
315
|
Daniel@0
|
316 %% get_stack_progress(+Level:progress_level(A),-Info:A) is semidet.
|
Daniel@0
|
317 % Gets the request level of progress information from the currently installed
|
Daniel@0
|
318 % progress stack.
|
Daniel@0
|
319 get_stack_progress(summary,P) :-
|
Daniel@0
|
320 nb_getval(async_progress_stack,H),
|
Daniel@0
|
321 call(H,progress(summary),P).
|
Daniel@0
|
322
|
Daniel@0
|
323 get_stack_progress(partial_result,R) :-
|
Daniel@0
|
324 nb_getval(async_progress_stack,H),
|
Daniel@0
|
325 call(H,partial_result,R).
|
Daniel@0
|
326
|
Daniel@0
|
327
|
Daniel@0
|
328 %% with_progress_handler(+H:progress_handler, +Goal:callable) is nondet.
|
Daniel@0
|
329 % Pushes the progress handler H to the top of the task stack associated with
|
Daniel@0
|
330 % the current thread and calls Goal. The handler is removed on exit.
|
Daniel@0
|
331 with_progress_handler(H,Goal) :-
|
Daniel@0
|
332 nb_getval(async_progress_stack,H0), call(H0,push(H),H1),
|
Daniel@0
|
333 setup_call_cleanup( nb_setval(async_progress_stack,H1), Goal,
|
Daniel@0
|
334 nb_setval(async_progress_stack,H0)).
|
Daniel@0
|
335
|
Daniel@0
|
336 nil(progress(_),[]).
|
Daniel@0
|
337 nil(push(H),chain(H,nil)).
|
Daniel@0
|
338 nil(partial_result,nothing).
|
Daniel@0
|
339 nil(partial_result(Cont),R) :- call(Cont,nothing,R).
|
Daniel@0
|
340
|
Daniel@0
|
341 chain(Head,Tail1,push(H),chain(Head,Tail2)) :- call(Tail1,push(H),Tail2).
|
Daniel@0
|
342 chain(Head,Tail,progress(L),[PH|PT]) :-
|
Daniel@0
|
343 call(Head,progress(L),PH),
|
Daniel@0
|
344 call(Tail,progress(L),PT).
|
Daniel@0
|
345 chain(Head,Tail,partial_result,MR) :-
|
Daniel@0
|
346 call(Tail,partial_result,R1),
|
Daniel@0
|
347 (call(Head,partial_result(R1),R) -> MR=just(R); MR=nothing).
|
Daniel@0
|
348
|
Daniel@0
|
349
|
Daniel@0
|
350 %% simple_task(+Desc,+Goal:callable) is det.
|
Daniel@0
|
351 %
|
Daniel@0
|
352 % Calls goal with a progress handler that reports Desc when progress
|
Daniel@0
|
353 % information is requested. No partial results are available.
|
Daniel@0
|
354 simple_task(Desc,Goal) :- with_progress_handler(simple_progress(Desc),Goal).
|
Daniel@0
|
355
|
Daniel@0
|
356 simple_progress(Desc,progress(_),Desc).
|
Daniel@0
|
357
|
Daniel@0
|
358
|
Daniel@0
|
359 %% with_cont(+Desc,+Pred:pred(-A),+Cont:pred(+A,-B),-Result:B) is det.
|
Daniel@0
|
360 %
|
Daniel@0
|
361 % Calls Pred and then passes the result to Cont yielding Result.
|
Daniel@0
|
362 % A progress handler is installed such that any partial results yielded by
|
Daniel@0
|
363 % subtasks in Pred are passed to Cont to yield a partial result. Progress requests
|
Daniel@0
|
364 % yield Desc.
|
Daniel@0
|
365 with_cont(Desc,Pred,Cont,Result) :-
|
Daniel@0
|
366 with_progress_handler(cont(Desc,Cont),
|
Daniel@0
|
367 (call(Pred,R),call(Cont,R,Result))).
|
Daniel@0
|
368
|
Daniel@0
|
369 cont(_,Cont,Tail1,push(H),cont(Cont,Tail2)) :- call(Tail1,push(H),Tail2).
|
Daniel@0
|
370 cont(Desc,_,progress(_),Desc).
|
Daniel@0
|
371 cont(_,Cont,partial_result(just(R1)),R) :- call(Cont,R1,R).
|
Daniel@0
|
372
|
Daniel@0
|
373 %% stepwise_task(+Desc,+Total:nonneg,-Step:callable,+Goal:callable) is det.
|
Daniel@0
|
374 %
|
Daniel@0
|
375 % Calls Goal with a progress handler that is able to report progress as a term
|
Daniel@0
|
376 % =|stepwise(Desc,Done/Total)|=. Goal is permitted to call Step in order to increment
|
Daniel@0
|
377 % the =|Done|= counter. Partial results are not available.
|
Daniel@0
|
378 stepwise_task(Desc,Total,async:increment(Counter),Goal) :-
|
Daniel@0
|
379 with_new_ref(Counter,0, with_progress_handler(stepwise_progress(Desc,Total,Counter), Goal)).
|
Daniel@0
|
380
|
Daniel@0
|
381 stepwise_progress(Desc,Total,Counter,progress(_),stepwise(Desc,Done/Total)) :- b_getval(Counter,Done).
|
Daniel@0
|
382
|
Daniel@0
|
383
|
Daniel@0
|
384 %% updatable_status_task(+Initial,-Update:pred(any),+Goal:callable) is det.
|
Daniel@0
|
385 %
|
Daniel@0
|
386 % Calls goal with Update set to a predicate which can be used to set the
|
Daniel@0
|
387 % current status of the task. This will be reported if progress is requested.
|
Daniel@0
|
388 % No partial results are available. Initial is the status before Update is called.
|
Daniel@0
|
389 :- meta_predicate updatable_status_task(+,-,0).
|
Daniel@0
|
390 updatable_status_task(Initial,async:set_ref(Status),Goal) :-
|
Daniel@0
|
391 with_new_ref(Status,Initial, with_progress_handler(status_progress(Status),Goal)).
|
Daniel@0
|
392
|
Daniel@0
|
393 status_progress(Status,progress(_),S) :- b_getval(Status,S).
|
Daniel@0
|
394
|
Daniel@0
|
395
|
Daniel@0
|
396 map_with_progress(P,Xs,Ys) :-
|
Daniel@0
|
397 length(Xs,Total),
|
Daniel@0
|
398 stepwise_task(map(P),Total,Step, maplist(call_and_step(Step,P),Xs,Ys)).
|
Daniel@0
|
399
|
Daniel@0
|
400 call_and_step(Step,P,X,Y) :- call(P,X,Y), call(Step).
|
Daniel@0
|
401
|
Daniel@0
|
402 /*
|
Daniel@0
|
403 fold_with_progress(Folder,Items,S0,S1) :-
|
Daniel@0
|
404 length(Items,N),
|
Daniel@0
|
405 stepwise_partial_task(folding(Folder),N,Step,Update,
|
Daniel@0
|
406 seqmap(step_update(Step,Update,Folder),Items,S0,S1)).
|
Daniel@0
|
407
|
Daniel@0
|
408 step_update(Step,Update,P,X,S0,S1) :-
|
Daniel@0
|
409 call(P,X,S0,S1),
|
Daniel@0
|
410 call(Update,S1),
|
Daniel@0
|
411 call(Step).
|
Daniel@0
|
412 */
|
Daniel@0
|
413
|
Daniel@0
|
414 :- setting(map_fold_batch,nonneg,512,"Batch size for flushing fold queue.").
|
Daniel@0
|
415
|
Daniel@0
|
416 %% map_fold_with_progress(+Mapper:pred(A,B), +Folder:pred(list(B),S,S), +Items:list(A), +S1:S, -S2:S) is det.
|
Daniel@0
|
417 %
|
Daniel@0
|
418 % Map Mapper over Items and fold over the results with Folder, with progress information
|
Daniel@0
|
419 % partial results available. Note that Folder operates on a _list_ of items, not just one
|
Daniel@0
|
420 % item at a time. Mapper is used to accumulate up to N items before folding them batchwise
|
Daniel@0
|
421 % into the accumulator, where N is the value of the setting =|map_fold_batch|=.
|
Daniel@0
|
422 %
|
Daniel@0
|
423 % Progress information is returned as a term =|stepwise(Desc,Done/Total)|=, where
|
Daniel@0
|
424 % =|Desc=map_fold(Mapper,Folder)|=. If partial results are requested, the current batch
|
Daniel@0
|
425 % of mapped items is first folded in before returning the value of the accumulator.
|
Daniel@0
|
426 map_fold_with_progress(Mapper,Folder,Items,S1,S2) :-
|
Daniel@0
|
427 length(Items,Total),
|
Daniel@0
|
428 setting(map_fold_batch,M1),
|
Daniel@0
|
429 with_new_ref(StateRef,s(Items,0,M1,Head,Head,S1),
|
Daniel@0
|
430 ( Handler=map_fold_progress(map_fold(Mapper,Folder),Total,StateRef),
|
Daniel@0
|
431 with_progress_handler(Handler,
|
Daniel@0
|
432 ( map_fold_loop(Mapper,Folder,StateRef),
|
Daniel@0
|
433 call(Handler,partial_result(nothing),S2))))).
|
Daniel@0
|
434
|
Daniel@0
|
435 map_fold_loop(Mapper,Folder,StateRef) :-
|
Daniel@0
|
436 b_getval(StateRef,s(Xs1,N1,M1,Head,Ys1,S)),
|
Daniel@0
|
437 ( Xs1=[] -> Ys1=[]
|
Daniel@0
|
438 ; Xs1=[X|Xs2]
|
Daniel@0
|
439 -> call(Mapper,X,Y), !,
|
Daniel@0
|
440 Ys1=[Y|Ys2], succ(N1,N2),
|
Daniel@0
|
441 State1=s(Xs2,N2,M2,Head,Ys2,S),
|
Daniel@0
|
442 ( succ(M2,M1) -> State2=State1
|
Daniel@0
|
443 ; simple_task(folding,flush_fold(Folder,State1,State2))
|
Daniel@0
|
444 ),
|
Daniel@0
|
445 b_setval(StateRef,State2), !,
|
Daniel@0
|
446 map_fold_loop(Mapper,Folder,StateRef)
|
Daniel@0
|
447 ).
|
Daniel@0
|
448
|
Daniel@0
|
449 map_fold_progress(Desc,Total,StateRef,progress(_),stepwise(Desc,Done/Total)) :-
|
Daniel@0
|
450 b_getval(StateRef,s(_,Done,_,_,_,_)).
|
Daniel@0
|
451 map_fold_progress(map_fold(_,Folder),_,StateRef,partial_result(_),S2) :-
|
Daniel@0
|
452 b_getval(StateRef,State1),
|
Daniel@0
|
453 flush_fold(Folder,State1,State2),
|
Daniel@0
|
454 b_setval(StateRef,State2),
|
Daniel@0
|
455 State2=s(_,_,_,_,_,S2).
|
Daniel@0
|
456
|
Daniel@0
|
457 flush_fold(Folder,s(Xs,Done,_,Head,[],S1),s(Xs,Done,M1,H2,H2,S2)) :-
|
Daniel@0
|
458 setting(map_fold_batch,M1),
|
Daniel@0
|
459 call(Folder,Head,S1,S2), !.
|
Daniel@0
|
460
|
Daniel@0
|
461 % for managing global variable
|
Daniel@0
|
462 new_ref(Ref) :- gensym('$ref',Ref).
|
Daniel@0
|
463 new_ref(Ref,Val) :- new_ref(Ref), b_setval(Ref,Val).
|
Daniel@0
|
464
|
Daniel@0
|
465 with_global(Name,Val,Goal) :- setup_call_cleanup(b_setval(Name,Val), Goal, nb_delete(Name)).
|
Daniel@0
|
466 with_new_ref(Name,Val,Goal) :- new_ref(Name), with_global(Name,Val,Goal).
|
Daniel@0
|
467
|
Daniel@0
|
468 set_ref(Ref,Val) :- nb_setval(Ref,Val).
|
Daniel@0
|
469 increment(Counter) :- b_getval(Counter,N), succ(N,M), b_setval(Counter,M).
|