comparison cpack/dml/lib/async.pl @ 0:718306e29690 tip

commiting public release
author Daniel Wolff
date Tue, 09 Feb 2016 21:05:06 +0100
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:718306e29690
1 /* Part of DML (Digital Music Laboratory)
2 Copyright 2014-2015 Samer Abdallah, University of London
3
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 as published by the Free Software Foundation; either version 2
7 of the License, or (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19 :- module(async,
20 [ async_memo/4
21 , async_current_job/5
22 , async_thread_progress/4
23 , async_get_progress_callback/1
24 , async_set_progress_callback/1
25 , async_unset_progress_callback/0
26 , async_cancel/3
27 , async_cancel/2
28 , with_progress_stack/1
29 , with_cont/4
30 , simple_task/2
31 , stepwise_task/4
32 , updatable_status_task/3
33 , map_with_progress/3
34 , map_fold_with_progress/5
35 , with_global/3
36 , with_new_ref/3
37 ]).
38
39 /** <module> Asynchronous memoised computations with progress and partial results
40
41 This module provides services for managing memoised computations in a thread pool
42 with the ability to check on the progress of running jobs and receive partial
43 results if they are available. Completed computations are memoised using
44 library(memo).
45
46 ---++ Job control
47 The first half of the module is concerned with managing memoised computations.
48 Computations are requested using async_memo/4. If the result is already available in
49 the memoised database, the result is returned. If the computation is already running,
50 status information about the job is returned. Otherwise, a new job is started. Status
51 information is returned as the type =|async_status(A)|=.
52 ==
53 async_status(A) ---> done(metadata)
54 ; spawned(job_id,natural)
55 ; waiting(job_id,time,natural)
56 ; running(job_id,time,thread_id,maybe(pair(time,A)))
57 ; recomputing(job_id,natural,metadata).
58
59 job_id == atom.
60 ==
61 The type =|metadata|= is defined in library(memo).
62
63
64 ---++ Helper meta-predicates
65 The second half of the module provides a collection of predicates for computations
66 to use to handle requests for progress information, in the form of a stack of tasks.
67 It recognises the following levels of progress information:
68 ==
69 summary :: progress_level(list(any)).
70 partial_result :: progress_level(any).
71 ==
72 */
73 :- meta_predicate
74 async_set_progress_callback(2)
75 , async_memo(+,0,-,+)
76 , with_global(+,+,0)
77 , with_new_ref(-,+,0)
78 , with_progress_stack(0)
79 , with_progress_handler(2,0)
80 , with_cont(+,1,2,-)
81 , simple_task(+,0)
82 , stepwise_task(+,+,-,0)
83 % , stepwise_partial_task(+,+,-,-,0)
84 % , fold_with_progress(3,+,+,-)
85 , map_with_progress(2,+,-)
86 , map_fold_with_progress(2,3,+,+,-).
87
88 :- use_module(library(thread_pool)).
89 :- use_module(library(insist)).
90 :- use_module(library(memo)).
91 :- use_module(library(dcg_core),[seqmap/4]).
92
%% async_memo(+Pool:thread_pool, +Goal:memo_goal, -Status:async_status(_), +Opts:options) is det.
%
%  Look up Goal in the memo database and, if necessary, start (or report on)
%  an asynchronous job computing it in thread pool Pool.
%
%  For a running job to be able to answer progress requests, its thread must
%  have a progress callback installed in the global variable =|async_progress|=
%  (see async_set_progress_callback/1 and with_progress_stack/1).
%
%  Options:
%    * recompute(R:oneof([none,failed,test(Pred)]))
%      Decides what happens when a memoised entry for Goal already exists.
%      If =|none|= (the default), the entry is never recomputed and the
%      status =|done(Meta)|= is returned, even for a failed computation.
%      If =|failed|=, entries whose metadata is =|_-fail|= or =|_-ex(_)|= are
%      cleared and recomputed.
%      If =|test(Pred)|=, the entry is cleared and recomputed when
%      =|call(Pred,Meta)|= succeeds. Pred is called from this module, so it
%      must carry its own module qualifier.
%      Whenever a recomputation is started the status is
%      =|recomputing(ID,Pos,Meta)|=, Meta being the metadata of the old entry.
%    * progress_levels(Levels:list(progress_level))
%      Progress levels requested from an already running job. Default is
%      =|[summary]|=.
%    * globals(list(pair(atom,any)))
%      Global variables to set (with nb_setval/2) in the new job thread.
%
%  Status is one of:
%  ==
%  done(Meta)                          % memoised entry found
%  spawned(ID,Pos)                     % new job submitted
%  recomputing(ID,Pos,Meta)            % old entry cleared, new job submitted
%  waiting(ID,TSubmit,Pos)             % job is queued at position Pos
%  running(ID,TStart,Thread,Progress)  % Progress is just(T-Vals) or nothing
%  indeterminate(ID,TSubmit)           % job is recorded but neither running nor queued
%  ==
%  For =|spawned|= and =|recomputing|=, Pos is the pool backlog observed just
%  before submission (0 if the pool does not report one).

async_memo(_, Goal, done(Meta), Opts) :-
    % Fast path taken without the mutex: only valid when no recomputation
    % has been requested.
    option(recompute(none), Opts, none),
    browse(Goal, Meta),
    !.
async_memo(Pool, Goal, Status, Opts) :-
    with_mutex(async, unsafe_async_memo(Pool, Goal, Status, Opts)).

% unsafe_async_memo(+Pool, +Goal, -Status, +Opts) is det.
%
% Body of async_memo/4; must be called while holding the =|async|= mutex.
unsafe_async_memo(Pool, Goal, Status, Opts) :-
    option(globals(Globals), Opts, []),
    (   browse(Goal, Meta)
    ->  option(recompute(Recompute), Opts, none),
        (   recompute(Recompute, Meta)
        ->  clear_all(Goal, Meta),
            spawn(Pool, Goal, Globals, ID, Waiting),
            Status = recomputing(ID, Waiting, Meta)
        ;   Status = done(Meta)
        )
    ;   % !!! It is possible that another thread is computing this goal and
        % completes at this point. If so, that job will not be detected and
        % a new job will be started here; the new job will then find the
        % memoised result and complete quickly.
        (   async_current_job(Pool, Goal, ID, T0, S)
        ->  option(progress_levels(Levels), Opts, [summary]),
            % !!! The job may complete right here, in which case the
            % progress request raises an error; goal_status/5 catches it.
            goal_status(ID, T0, Levels, S, Status)
        ;   spawn(Pool, Goal, Globals, ID, Waiting),
            Status = spawned(ID, Waiting)
        )
    ).

% recompute(+Policy, +Meta) is semidet.
%
% True if a memoised entry with metadata Meta must be recomputed under Policy.
% There is deliberately no clause for =|none|=.
recompute(failed, _-fail).
recompute(failed, _-ex(_)).
recompute(test(P), M) :- call(P, M).

% goal_status(+ID, +TSubmit, +Levels, +JobStatus, -Status) is det.
%
% Translate the job status produced by async_current_job/5 into the status
% term returned by async_memo/4. For a running job the thread is asked for
% progress at the given Levels; any error while doing so is reported as a
% warning and yields =|nothing|= instead of =|just(T-Vals)|=.
goal_status(ID, TSubmit, _, waiting(Pos), waiting(ID,TSubmit,Pos)).
goal_status(ID, _, Levels, running(Thread,TStart), running(ID,TStart,Thread,Progress)) :-
    catch(( async_thread_progress(Thread, Levels, T, Vals),
            Progress = just(T-Vals)
          ),
          Ex,
          ( print_message(warning, error_getting_job_progress(Thread,Ex)),
            Progress = nothing
          )).
% async_current_job/5 reports =|indeterminate|= for a job that is recorded but
% neither running nor queued. Without this clause async_memo/4 would fail
% in that case, contradicting its det contract.
goal_status(ID, TSubmit, _, indeterminate, indeterminate(ID,TSubmit)).
173
% spawn(+Pool, +Goal, +Globals, -ID, -Waiting) is det.
%
% Submit Goal to thread pool Pool as a new detached job. ID is a fresh UUID
% and Waiting is the pool backlog observed before submission (0 if the pool
% reports none). The job is registered as =|job(ID,Time,Goal)|= in the
% recorded database under key Pool; the record is erased when the job thread
% exits, or immediately if creating the thread raises an exception.
spawn(Pool, Goal, Globals, JobID, Backlog) :-
    uuid(JobID),
    (   thread_pool_property(Pool, backlog(Backlog))
    ->  true
    ;   Backlog = 0
    ),
    get_time(Submitted),
    JobRecord = job(JobID, Submitted, Goal),
    setup_call_catcher_cleanup(
        recordz(Pool, JobRecord, JobRef),
        thread_create_in_pool(Pool,
                              spawnee(JobID, Goal, Globals),
                              _,
                              [ detached(true),
                                at_exit(erase(JobRef))
                              ]),
        exception(_),
        erase(JobRef)).
184
% spawnee(+ID, +Goal, +Globals) is semidet.
%
% Entry point of a job thread. Under the =|async|= mutex it first checks
% whether the job was cancelled while queued; if so it fails without running
% Goal. Otherwise it installs Globals, records =|running(Thread,T0)|= under
% key ID, runs Goal through memo/2 and finally erases the running record.
spawnee(ID, Goal, Globals) :-
    setup_call_cleanup(
        with_mutex(async, spawnee_begin(ID, Goal, Globals, RunRef)),
        (   % prevent memoisation if abort(_) is caught
            freeze(Outcome, check_result(Outcome)),
            debug(async, "Running asyc goal ~q", [Goal]),
            memo(Goal, _-Outcome)
        ),
        erase(RunRef)).

% spawnee_begin(+ID, +Goal, +Globals, -RunRef) is semidet.
%
% Setup phase of spawnee/3. Fails (after consuming the cancellation marker)
% if the job was cancelled before it started; otherwise sets the job's global
% variables and records it as running, returning the record reference.
spawnee_begin(ID, Goal, Globals, RunRef) :-
    (   recorded(ID, async:cancelled, CancelRef)
    ->  debug(async, 'Cancelled before starting: ~q', [Goal]),
        erase(CancelRef),
        fail
    ;   pairs_keys_values(Globals, Names, Values),
        maplist(nb_setval, Names, Values),
        get_time(Started),
        thread_self(Me),
        recordz(ID, running(Me,Started), RunRef)
    ).
201
% check_result(+Result) is det.
%
% Re-throws =|abort(Reason)|= when Result is =|ex(abort(Reason))|=; succeeds
% for any other result.
check_result(ex(abort(Reason))) :-
    !,
    debug(async, 'Computation aborted (~w)', [Reason]),
    throw(abort(Reason)).
check_result(_).
208
%% async_current_job(+Pool:thread_pool, -Goal:callable, -ID:job_id, -T:time, -S:job_status) is nondet.
%
%  Enumerates the jobs currently registered for Pool in the recorded database.
%  T is the time the job was submitted. S is:
%  ==
%  job_status ---> waiting(natural)         % queued at given position
%                ; running(thread_id,time)  % with start time
%                ; indeterminate.           % registered, but neither of the above
%  ==
%  A warning is printed whenever =|indeterminate|= is reported.
async_current_job(Pool, Goal, ID, Submitted, Status) :-
    recorded(Pool, job(ID,Submitted,Goal)),   % this finds ALL jobs
    pending_job_state(Pool, ID, Goal, State),
    Status = State.

% pending_job_state(+Pool, +ID, +Goal, -State) is det.
%
% Classify a registered job: running if a =|running/2|= record exists under
% its ID, waiting if a matching creation request is found in the pool's
% waiting list, indeterminate otherwise.
% NOTE(review): relies on a =|waiting/1|= pool property and on the shape of
% the pool's queued =|create/5|= requests -- confirm against the
% thread_pool library version in use.
pending_job_state(Pool, ID, Goal, State) :-
    (   recorded(ID, running(Thread,Started))
    ->  State = running(Thread,Started)
    ;   thread_pool_property(Pool, waiting(Queue)),
        nth1(Position, Queue, create(Pool,async:spawnee(ID,_,_),_,_,_))
    ->  State = waiting(Position)
    ;   print_message(warning, async_job_status_indeterminate(Pool,ID,Goal)),
        % could be finished, or transferring from waiting to running
        State = indeterminate
    ).
227
228
%% async_thread_progress(+Thread:thread_id, +Levels:list(progress_level), -T:time, -Vals:list) is det.
%
%  Ask the job running in Thread for progress information. Stale progress
%  replies are first removed from the caller's message queue, then Thread is
%  signalled to run report_progress/3 and the reply is awaited for at most
%  3 seconds.
%
%  @throws get_thread_progress_timeout(Thread,Levels) if no reply arrives in time.
%
%  Errors raised by thread_signal/2 (e.g. Thread no longer exists) propagate.
%  The outcome captured in the job thread is replayed here with memo:reflect/1
%  -- presumably re-raising a failure or exception of the progress callback;
%  confirm against library(memo).
async_thread_progress(Thread, Levels, Time, Vals) :-
    thread_self(Requester),
    gensym(prog, RequestID),   % progress request ID
    flush_all_messages(Requester, progress(_,_,_,_)),
    thread_signal(Thread, report_progress(RequestID, Levels, Requester)),
    (   thread_get_message(Requester,
                           progress(RequestID, Time, Vals, Outcome),
                           [timeout(3)])
    ->  true
    ;   throw(get_thread_progress_timeout(Thread, Levels))
    ),
    debug(async, 'Got progress ~w from thread ~w: ~w', [Levels, Thread, Vals]),
    memo:reflect(Outcome).
246
% flush_all_messages(+Queue, +Pattern) is det.
%
% Remove every message unifying with Pattern from Queue without waiting.
% Pattern itself is left unbound because each removal happens under \+.
flush_all_messages(Queue, Pattern) :-
    (   \+ thread_get_message(Queue, Pattern, [timeout(0)])
    ->  true
    ;   flush_all_messages(Queue, Pattern)
    ).
249
%% report_progress(+RID:atomic, +Levels:list(progress_level), +Thread:thread_id) is det.
%
%  Runs in the job thread in response to a signal. Evaluates the progress
%  callback for Levels via memo:reify/2 and sends the reply
%  =|progress(RID,Time,Vals,Outcome)|= to Thread, where Time is the current
%  time and Outcome is whatever memo:reify/2 captured.
report_progress(RID, Levels, Thread) :-
    get_time(Now),
    memo:reify(async:my_progress(Levels, Vals), Outcome),
    !,
    debug(async, 'reporting progress(~w): ~q', [Levels, Vals]),
    Reply = progress(RID, Now, Vals, Outcome),
    thread_send_message(Thread, Reply).

% my_progress(+Levels, -Vals) is det.
%
% Apply the progress callback stored in the global variable
% =|async_progress|= to each requested level, wrapped in insist/1.
my_progress(Levels, Vals) :-
    nb_getval(async_progress, Callback),
    insist(maplist(Callback, Levels, Vals)).
259
260
%% async_cancel(+Pool:thread_pool, +ID:job_id, +Ex:term) is det.
%% async_cancel(+Pool:thread_pool, +ID:job_id) is det.
%
%  Cancel the job with the given ID if it is currently pending in Pool;
%  do nothing otherwise. A running job has the exception term Ex thrown in
%  its thread (=|abort(cancel)|= for async_cancel/2). A job that is still
%  queued is marked in the recorded database so that it fails as soon as it
%  is started.
async_cancel(Pool, ID) :-
    async_cancel(Pool, ID, abort(cancel)).
async_cancel(Pool, ID, Ex) :-
    debug(async, 'Cancelling ~w:~w with ~w', [Pool, ID, Ex]),
    with_mutex(async,
               (   async_current_job(Pool, _, ID, _, JobState)
               ->  cancel(ID, JobState, Ex)
               ;   true
               )).
271
% cancel(+ID, +JobStatus, +Ex) is det.
%
% Carry out a cancellation according to the job's current status:
%   - waiting: leave an =|async:cancelled|= marker under key ID;
%   - running: signal the thread to throw Ex; errors from signalling are
%     reported as warnings;
%   - indeterminate: only print a warning.
cancel(JobID, waiting(_), _) :-
    recordz(JobID, async:cancelled).
cancel(_, running(JobThread,_), Ex) :-
    catch(thread_signal(JobThread, throw(Ex)),
          SignalError,
          print_message(warning,
                        error_signalling_job_thread(JobThread,SignalError))).
cancel(JobID, indeterminate, _) :-
    print_message(warning, indeterminate_job_state_on_cancel(JobID)).
277
%% async_set_progress_callback(+P:pred(+Level:any,-Info:any)) is det.
%
%  Install P as the progress handler of the current thread by storing it in
%  the global variable =|async_progress|=.
async_set_progress_callback(Handler) :-
    nb_setval(async_progress, Handler).

%% async_get_progress_callback(-P:pred(+Level:any,-Info:any)) is semidet.
%
%  P is the progress handler of the current thread. Fails if none is set.
async_get_progress_callback(Handler) :-
    nb_current(async_progress, Handler).

%% async_unset_progress_callback is det.
%
%  Remove the progress handler of the current thread, after which
%  async_get_progress_callback/1 fails.
async_unset_progress_callback :-
    nb_delete(async_progress).
290
291 % -----------------------------------------------------------------------
292
293
%% with_progress_stack(+Goal:callable) is nondet.
%
%  Runs Goal with a task stack available to answer progress requests.
%
%  If the current thread has no progress callback, an empty task stack is
%  created, =|get_stack_progress|= is installed as the callback for the
%  duration of Goal, and the callback is removed afterwards. If the callback
%  is already =|async:get_stack_progress|=, Goal simply runs on the existing
%  stack. If any other callback is installed, the call fails.
%
%  Tasks (goals that cooperate with a progress handler) are pushed onto the
%  stack with with_progress_handler/2. The installed callback recognises:
%    * summary :: progress_level(list(any))
%      Yields a list of progress terms, one for each entry in the task stack.
%    * partial_result :: progress_level(any)
%      Yields partial results of the computation if they are available.
with_progress_stack(Goal) :-
    (   async_get_progress_callback(Installed)
    ->  Installed = async:get_stack_progress,
        call(Goal)
    ;   with_global(async_progress_stack, nil,
            setup_call_cleanup(
                async_set_progress_callback(get_stack_progress),
                Goal,
                async_unset_progress_callback))
    ).
315
%% get_stack_progress(+Level:progress_level(A), -Info:A) is semidet.
%
%  Answers a progress request at the given Level by forwarding the
%  corresponding message to the task stack stored in the global variable
%  =|async_progress_stack|=.
get_stack_progress(Level, Info) :-
    stack_request(Level, Request),
    nb_getval(async_progress_stack, Stack),
    call(Stack, Request, Info).

% stack_request(?Level, ?Request) is nondet.
%
% Maps a progress level onto the message sent to the task stack.
stack_request(summary,        progress(summary)).
stack_request(partial_result, partial_result).
326
327
%% with_progress_handler(+H:progress_handler, +Goal:callable) is nondet.
%
%  Runs Goal with handler H pushed onto the task stack of the current thread.
%  The new stack is obtained by sending =|push(H)|= to the current one; the
%  previous stack is put back when Goal is finished.
with_progress_handler(Handler, Goal) :-
    nb_getval(async_progress_stack, Saved),
    call(Saved, push(Handler), Extended),
    setup_call_cleanup(
        nb_setval(async_progress_stack, Extended),
        Goal,
        nb_setval(async_progress_stack, Saved)).
335
% nil(+Message, -Reply)
%
% The empty task stack: no progress entries, no partial result. Pushing a
% handler yields a one-element chain.
nil(progress(_), []).
nil(push(Handler), chain(Handler,nil)).
nil(partial_result, nothing).
nil(partial_result(Cont), Result) :-
    call(Cont, nothing, Result).

% chain(+Head, +Tail, +Message, -Reply)
%
% A non-empty task stack whose outermost handler is Head and whose remaining
% handlers are in Tail. Pushing appends at the innermost end. A progress
% request collects one entry per handler, outermost first. A partial-result
% request first asks Tail and then offers its answer to Head; the reply is
% just(R) if Head produces R, and nothing otherwise.
chain(Head, Tail0, push(Handler), chain(Head,Tail)) :-
    call(Tail0, push(Handler), Tail).
chain(Head, Tail, progress(Level), [First|Rest]) :-
    call(Head, progress(Level), First),
    call(Tail, progress(Level), Rest).
chain(Head, Tail, partial_result, Reply) :-
    call(Tail, partial_result, Inner),
    (   call(Head, partial_result(Inner), Partial)
    ->  Reply = just(Partial)
    ;   Reply = nothing
    ).
348
349
%% simple_task(+Desc, +Goal:callable) is det.
%
%  Runs Goal as a task whose progress report is simply Desc. The handler has
%  no clause for partial results, so none are available.
simple_task(Desc, Goal) :-
    with_progress_handler(simple_progress(Desc), Goal).

% simple_progress(+Desc, +Message, -Reply)
%
% Progress handler for simple_task/2: answers any progress request with Desc.
simple_progress(Description, progress(_), Description).
357
358
%% with_cont(+Desc, +Pred:pred(-A), +Cont:pred(+A,-B), -Result:B) is det.
%
%  Calls Pred to obtain an intermediate value and passes it to Cont to obtain
%  Result. While this runs, a handler is installed which answers progress
%  requests with Desc and which turns a partial result =|just(R)|= coming
%  from subtasks into a partial result of the whole by applying Cont to R.
with_cont(Desc, Pred, Cont, Result) :-
    with_progress_handler(cont(Desc,Cont),
        (   call(Pred, Intermediate),
            call(Cont, Intermediate, Result)
        )).

% NOTE(review): this 5-argument push clause looks unreachable, since pushes
% are handled by nil/2 and chain/4, and the term it builds drops Desc, so it
% matches none of the clauses below. Kept unchanged -- confirm and remove.
cont(_, Cont, Tail0, push(Handler), cont(Cont,Tail)) :-
    call(Tail0, push(Handler), Tail).

% cont(+Desc, +Cont, +Message, -Reply)
%
% Progress handler for with_cont/4.
cont(Desc, _, progress(_), Desc).
cont(_, Cont, partial_result(just(Partial)), Result) :-
    call(Cont, Partial, Result).
372
%% stepwise_task(+Desc, +Total:nonneg, -Step:callable, +Goal:callable) is det.
%
%  Runs Goal as a task which reports its progress as
%  =|stepwise(Desc,Done/Total)|=. Step is bound to a goal that Goal may call
%  to add one to the =|Done|= counter, which starts at 0. The handler has no
%  clause for partial results, so none are available.
stepwise_task(Desc, Total, async:increment(Counter), Goal) :-
    Handler = stepwise_progress(Desc, Total, Counter),
    with_new_ref(Counter, 0,
        with_progress_handler(Handler, Goal)).

% stepwise_progress(+Desc, +Total, +Counter, +Message, -Reply)
%
% Progress handler for stepwise_task/4: reads the current count from the
% global variable Counter.
stepwise_progress(Desc, Total, Counter, progress(_), stepwise(Desc,Done/Total)) :-
    b_getval(Counter, Done).
382
383
%% updatable_status_task(+Initial, -Update:pred(any), +Goal:callable) is det.
%
%  Runs Goal as a task whose progress report is a status value held in a
%  fresh global variable. The status starts as Initial; Update is bound to a
%  predicate which Goal can call with a new value to replace it. The handler
%  has no clause for partial results, so none are available.
:- meta_predicate updatable_status_task(+,-,0).
updatable_status_task(Initial, async:set_ref(StatusRef), Goal) :-
    Handler = status_progress(StatusRef),
    with_new_ref(StatusRef, Initial,
        with_progress_handler(Handler, Goal)).

% status_progress(+StatusRef, +Message, -Reply)
%
% Progress handler for updatable_status_task/3: answers any progress request
% with the current value of the global variable StatusRef.
status_progress(StatusRef, progress(_), Current) :-
    b_getval(StatusRef, Current).
394
395
%% map_with_progress(+P:pred(A,B), +Xs:list(A), -Ys:list(B)) is semidet.
%
%  Like maplist/3, but run as a stepwise task described as =|map(P)|= whose
%  counter advances by one after each element has been mapped.
map_with_progress(P, Xs, Ys) :-
    length(Xs, Count),
    stepwise_task(map(P), Count, Tick,
        maplist(call_and_step(Tick, P), Xs, Ys)).

% call_and_step(+Step, +P, ?X, ?Y)
%
% Apply P to X and Y, then call Step.
call_and_step(Tick, P, In, Out) :-
    call(P, In, Out),
    call(Tick).
401
402 /*
403 fold_with_progress(Folder,Items,S0,S1) :-
404 length(Items,N),
405 stepwise_partial_task(folding(Folder),N,Step,Update,
406 seqmap(step_update(Step,Update,Folder),Items,S0,S1)).
407
408 step_update(Step,Update,P,X,S0,S1) :-
409 call(P,X,S0,S1),
410 call(Update,S1),
411 call(Step).
412 */
413
:- setting(map_fold_batch, nonneg, 512, "Batch size for flushing fold queue.").

%% map_fold_with_progress(+Mapper:pred(A,B), +Folder:pred(list(B),S,S), +Items:list(A), +S1:S, -S2:S) is det.
%
%  Maps Mapper over Items and folds the mapped values into an accumulator
%  with Folder, starting from S1 and ending with S2. Folder receives a _list_
%  of mapped values at a time: values are queued and folded in batches whose
%  size is governed by the setting =|map_fold_batch|=.
%
%  Progress is reported as =|stepwise(Desc,Done/Total)|= with
%  =|Desc=map_fold(Mapper,Folder)|=. A request for partial results folds the
%  queued values into the accumulator and returns it. The final result S2 is
%  obtained the same way, by sending the handler a partial-result request
%  once all items have been mapped.
map_fold_with_progress(Mapper, Folder, Items, S1, S2) :-
    length(Items, Total),
    setting(map_fold_batch, BatchSize),
    % State: s(PendingItems, Done, LeftInBatch, QueueHead, QueueTail, Acc);
    % the queue is a difference list, initially empty.
    Initial = s(Items, 0, BatchSize, Queue, Queue, S1),
    with_new_ref(StateRef, Initial,
        (   Handler = map_fold_progress(map_fold(Mapper,Folder), Total, StateRef),
            with_progress_handler(Handler,
                (   map_fold_loop(Mapper, Folder, StateRef),
                    call(Handler, partial_result(nothing), S2)
                ))
        )).
434
% map_fold_loop(+Mapper, +Folder, +StateRef) is semidet.
%
% Main loop of map_fold_with_progress/5. The state held in the global
% variable StateRef is
% s(PendingItems, Done, LeftInBatch, QueueHead, QueueTail, Acc).
% Each iteration maps one item, appends the result to the open-ended queue
% and either decrements the batch counter or, when that is no longer
% possible, flushes the queue through Folder as a subtask named =|folding|=.
% When no items are left the queue is closed and the loop ends.
map_fold_loop(Mapper, Folder, StateRef) :-
    b_getval(StateRef, s(Pending, Done0, Left0, Head, Tail0, Acc)),
    (   Pending = []
    ->  Tail0 = []                       % close the queue
    ;   Pending = [Item|Rest]
    ->  call(Mapper, Item, Mapped), !,
        Tail0 = [Mapped|Tail1],          % enqueue the mapped value
        succ(Done0, Done1),
        Stepped = s(Rest, Done1, Left1, Head, Tail1, Acc),
        (   succ(Left1, Left0)           % fails once Left0 has reached 0
        ->  Advanced = Stepped
        ;   simple_task(folding, flush_fold(Folder, Stepped, Advanced))
        ),
        b_setval(StateRef, Advanced), !,
        map_fold_loop(Mapper, Folder, StateRef)
    ).
448
% map_fold_progress(+Desc, +Total, +StateRef, +Message, -Reply)
%
% Progress handler for map_fold_with_progress/5. A progress request reports
% how many items have been mapped so far. A partial-result request flushes
% the queued values into the accumulator, stores the new state in StateRef
% and returns the accumulator.
map_fold_progress(Desc, Total, StateRef, progress(_), stepwise(Desc,Done/Total)) :-
    b_getval(StateRef, s(_,Done,_,_,_,_)).
map_fold_progress(map_fold(_,Folder), _, StateRef, partial_result(_), Acc) :-
    b_getval(StateRef, Before),
    flush_fold(Folder, Before, After),
    b_setval(StateRef, After),
    After = s(_,_,_,_,_,Acc).
456
% flush_fold(+Folder, +State0, -State) is semidet.
%
% Close the queue of mapped values held in State0, fold it into the
% accumulator with Folder, and return a state with a fresh empty queue and
% the batch counter reset to the value of the setting =|map_fold_batch|=.
% Pending items and the done count are carried over unchanged.
flush_fold(Folder, State0, s(Pending, Done, BatchSize, Fresh, Fresh, Acc)) :-
    State0 = s(Pending, Done, _, Queued, [], Acc0),   % [] closes the queue
    setting(map_fold_batch, BatchSize),
    call(Folder, Queued, Acc0, Acc),
    !.
460
% Helpers for managing global variables.

% new_ref(-Ref) is det.
% new_ref(-Ref, +Val) is det.
%
% Ref is a freshly generated global variable name; new_ref/2 also gives it
% the (backtrackable) value Val.
new_ref(Ref) :-
    gensym('$ref', Ref).
new_ref(Ref, Val) :-
    new_ref(Ref),
    b_setval(Ref, Val).

% with_global(+Name, +Val, +Goal)
%
% Run Goal with global variable Name set to Val; the variable is deleted
% when Goal is finished.
with_global(Name, Val, Goal) :-
    setup_call_cleanup(
        b_setval(Name, Val),
        Goal,
        nb_delete(Name)).

% with_new_ref(-Name, +Val, +Goal)
%
% As with_global/3, but using a freshly generated variable name.
with_new_ref(Name, Val, Goal) :-
    new_ref(Name),
    with_global(Name, Val, Goal).

% set_ref(+Ref, +Val) is det.
%
% Non-backtrackably assign Val to global variable Ref.
set_ref(Ref, Val) :-
    nb_setval(Ref, Val).

% increment(+Counter) is det.
%
% Add one to the natural number held in global variable Counter
% (backtrackable assignment).
increment(Counter) :-
    b_getval(Counter, Current),
    succ(Current, Next),
    b_setval(Counter, Next).