diff cpack/dml/api/perspectives.pl @ 0:718306e29690 tip

commiting public release
author Daniel Wolff
date Tue, 09 Feb 2016 21:05:06 +0100
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cpack/dml/api/perspectives.pl	Tue Feb 09 21:05:06 2016 +0100
@@ -0,0 +1,542 @@
+/* Part of DML (Digital Music Laboratory)
+	Copyright 2014-2015 Samer Abdallah, University of London
+	 
+	This program is free software; you can redistribute it and/or
+	modify it under the terms of the GNU General Public License
+	as published by the Free Software Foundation; either version 2
+	of the License, or (at your option) any later version.
+
+	This program is distributed in the hope that it will be useful,
+	but WITHOUT ANY WARRANTY; without even the implied warranty of
+	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+	GNU General Public License for more details.
+
+	You should have received a copy of the GNU General Public
+	License along with this library; if not, write to the Free Software
+	Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+:- module(perspectives, []).
+
+/** <module> VIS API Perspectives
+
+   Todo
+
+   - Chord sequences
+   - Standardise data structures
+*/
+:- use_module(library(http/http_dispatch), [http_link_to_id/3]).
+:- use_module(library(semweb/rdf_db)).
+:- use_module(library(semweb/rdf_label)).
+:- use_module(library(dcg_core)).
+:- use_module(library(insist)).
+:- use_module(library(computations)).
+:- use_module(library(backend_json)).
+:- use_module(library(dataset)).
+:- use_module(library(memo)).
+:- use_module(library(async)).
+:- use_module(library(mlserver)).
+:- use_module(api(dmlvis)).
+:- use_module(api(archive)).
+
+% :- setting(memoise_failures,boolean,false,"Whether or not to record failed computations to avoid retrying").
+:- setting(default_recompute_policy,oneof([none,failed,force]),none,'Default policy on recomputing memoised computations').
+:- setting(default_vamp_on_demand,boolean,false,'Default policy on doing VAMP computations on demand').
+
+% registry of perspectives.
+dmlvis:perspective( getRecordingPerspective,  Name, 
+                    [+uri(URI),vamp_on_demand(V)-false|Params], 
+                    cc(perspectives:rla(Pred,[vamp_on_demand(V)],URI))
+                  ) :- rec_persp(Name, Params, Pred).
+
+dmlvis:perspective( getCollectionPerspective, Name, 
+                    [+cid(CID),recompute(R)-none,vamp_on_demand(V)-false,coverage(C)-summary|Params], 
+                    cc(perspectives:cla(Pred,[recompute(R),vamp_on_demand(V),coverage(C)],CID))
+                  ) :- coll_persp(Name, Params, Pred).
+
+:- meta_predicate rla(2,+,+,-,-), cla(2,+,+,-,-).
+
+rla(Pred,Opts,URI,Result,stable) :- 
+   option(vamp_on_demand(V), Opts, false),
+   with_global(vamp_on_demand, V, call(Pred,URI,Result)).
+
+cla(Pred,Opts,CID,Result,stable) :- 
+   check_collection(CID), 
+   call(Pred,Opts,CID,Result1),
+   option(coverage(Cov),Opts,full),
+   insist(filter_coverage(Cov,Result1,Result), invalid_coverage_parameter(Cov)).
+
+:- op(1050,xfy,=>).
+G1 => G2 --> (call_dcg(G1) -> call_dcg(G2); []).
+
+filter_coverage(full) --> [].
+filter_coverage(summary) --> dtrans(coverage,C1,C2) => {summarise_coverage(C1,C2)}.
+summarise_coverage --> foldl(replace_list_with_length,[failed,errors],[failed_count,errors_count]).
+replace_list_with_length(Key) --> dtrans(Key,List,Length) => {length(List,Length)}.
+replace_list_with_length(Key1,Key2) --> ddel(Key1,List) => {length(List,Len)}, dput(Key2,Len).
+dtrans(Key,Val1,Val2,D1,D2) :- get_dict(Key,D1,Val1,D2,Val2).
+ddel(Key,Val,D1,D2) :- del_dict(Key,D1,Val,D2).
+dput(Key,Val,D1,D2) :- put_dict(Key,D1,Val,D2).
+% dget(Key,Val,D,D) :- get_dict(Key,D,Val).
+
+check_collection(CID) :-
+   insist(dataset_size(CID,Size), unknown_collection(CID)),
+   debug(dmlvis(perspective),'Doing collection level analysis on ~d items.',[Size]).
+
+rec_persp( transcription,      [], output_link(transcription(0))).
+rec_persp( transcription_fine, [], output_link(transcription(1))).
+rec_persp( chords,             [], output_link(chords)).
+rec_persp( chord_notes,        [], output_link(chord_notes)).
+rec_persp( beatroot,           [], output_link(beats(beatroot))).
+rec_persp( key,                [], output_link(key)).
+rec_persp( key_tonic,          [], output_link(tonic)).
+rec_persp( beats,              [], output_link(beats(qm))).
+rec_persp( tempo,              [], output_link(tempo)).
+rec_persp( chromagram,         [], output_link(chromagram)).
+rec_persp( mfcc,               [], output_link(mfcc)).
+
+rec_persp( spectrogram,          [offset(O)-0,length(L)-60], spectrogram_link(O,L)).
+rec_persp( tempo_nonuniform,     [],                         nonuniform_tempo).
+rec_persp( tempo_uniform,        [period(DT)-1,lang(L)-ml ], uniform_tempo(L,DT)).
+rec_persp( tempo_normalised,     [num_samples(N)-20,lang(L)-ml ], normalised_tempo(L,N)).
+rec_persp( chord_histogram,      [],                         chord_histogram).
+rec_persp( midi_pitch_histogram, [weighting(W)-none],        pitch_histogram(W)).
+rec_persp( pitch_histogram,      [weighting(W)-none, quant(Q)-5, min(Min)-0, max(Max)-127, lang(L)-ml ], 
+           freq_histogram(L,Min,Max,Q,W)).
+rec_persp( tempo_histogram,      [period(DT)-1, num_bins(N)-50, min(Min)-20, max(Max)-360, lang(L)-ml ], 
+           tempo_histogram(L,DT,Min,Max,N)).
+
+%% coll_persp(P:perspective(A), Params:list(param), Pred:pred(+options,+dataset,-A)) is nondet.
+%
+%  Database of collection perspectives. The first argument is an atom denoting a perspective
+%  which returns results of type A. Params must be defined as in dmlvis:options_optspec/2.
+%  Pred must accept a list of options and a collection (dataset) id and produce a result.
+coll_persp( mean_tempo_curve,     [num_samples(N)-20,lang(L)-ml], mem(collection_tempo_curve(L,N))).
+coll_persp( midi_pitch_histogram, [weighting(W)-none],            mem(collection_pitch_histogram(W))).
+coll_persp( pitch_histogram,      [weighting(W)-none, quant(Q)-5, min(Min)-20, max(Max)-100, lang(L)-ml], 
+            mem(collection_freq_histogram(L,Min,Max,Q,W))).
+coll_persp( tempo_histogram,      [period(DT)-1, num_bins(N)-50, min(Min)-20, max(Max)-100, lang(L)-ml], 
+            mem(collection_tempo_histogram(L,DT,Min,Max,N))).
+coll_persp( pitch_lookup,         [+midi_pitch(P), weighting(W)-none, limit(Lim)-5000,offset(Off)-0], 
+            nomem(collection_pitch_lookup(W,P,Lim,Off))).
+
+% using python back-end
+coll_persp( tonic_relative_pitch_class_histogram, [], 
+            mem(py_hist(transcription_tonic_duration, tonic_norm_semitone_hist:aggregate, [opts{normalisation:piece}]))).
+coll_persp( tonic_histogram,        [], mem(py_hist(tagged(tonic), key_tonic_hist:aggregate, []))).
+coll_persp( pitch_class_histogram,  [], mem(py_hist(tagged(transcription), semitone_hist:aggregate, []))).
+coll_persp( tuning_stats,           [], mem(py_cla(tagged(transcription(1)),tuning_stats:per_file,[]))).
+coll_persp( tuning_stats_by_year,   [], mem(py_cla(transcription_date,tuning_stats_byyear:per_file,[]))).
+coll_persp( places_hist,            [], nomem(py_cla(list_places,places_hist:per_file,[]))).
+coll_persp( key_relative_chord_seq, 
+            [  spm_minlen(MinLen)-2, spm_maxseqs(MaxSeqs)-500, spm_algorithm(Alg)-'CM-SPADE',
+               spm_ignore_n(Ignn)-1, spm_maxtime(Smaxt)-60,    spm_minsupport(Smins)-50 ], 
+            mem(py_cla( keys_chords,chord_seq_key_relative:aggregate, 
+                        [opts{ spm_minlen:MinLen, spm_maxseqs:MaxSeqs, spm_algorithm:Alg,
+                               spm_ignore_n:Ignn,spm_maxtime:Smaxt,spm_minsupport:Smins } ]))).
+
+coll_persp( similarity,     
+            [  sim_downsample(SimDown)-1,sim_clusters(SimClusters)-40,sim_reclimit(Limo)-2000,
+               sim_type(SimType)-'euclidean',sim_features(SimFeat)-'chromagram',
+               sim_compressor(SimComp)-'zlib'], 
+            mem(py_cla(similarity_bundle,similarity:per_file,
+                       [opts{sim_type:SimType,sim_clusters:SimClusters,sim_downsample:SimDown,
+                             sim_reclimit:Limo,sim_features:SimFeat,sim_compressor:SimComp}]))).
+
+% adaptor to ignore collection perspective options parameter
+nomem(Goal,Opts,CID,Result) :- 
+   option(vamp_on_demand(V), Opts, false),
+   with_global(vamp_on_demand, V, 
+      with_progress_stack(call(Goal,CID,Result))).
+											   
+dmlvis:param( recompute, [oneof([none,failed,force]), default(Def), 
+                          description('Controls handling of memoised collection level results')]) :-
+   setting(default_recompute_policy,Def).
+dmlvis:param( vamp_on_demand, [boolean, default(Def), 
+                           description('Whether to run VAMP plugins if results are not already available')]) :-
+   setting(default_vamp_on_demand,Def).
+dmlvis:param( coverage, [oneof([full,summary]), default(summary), 
+                           description('How much detail to provide about recordings not successfully included in CLA')]).
+dmlvis:param( offset,    [number, default(0), description('Offset into signal in seconds')]).
+dmlvis:param( length,    [number, default(60), description('Length of signal extract in seconds')]).
+dmlvis:param( weighting, [oneof([none,dur,vel]), default(none), description('Weighting for pitch_histogram perspective')]).
+dmlvis:param( quant,     [nonneg, default(5), description('Subdivisions of a semitone for freq_histogram')]).
+dmlvis:param( period,    [number, default(1), description('Sampling period in seconds')]).
+dmlvis:param( num_bins,  [nonneg, default(50), description('Number of bins for histogram')]).
+dmlvis:param( num_samples, [nonneg, default(50), description('Number of samples for normalised histogram')]).
+dmlvis:param( max,       [nonneg, default(100), description('Max pitch for pitch histogram')]).
+dmlvis:param( min,       [nonneg, default(20), description('Min pitch for pitch histogram')]).
+dmlvis:param( lang,      [oneof([ml,r]), default(r), description('Numerical computations language')]).
+
+/* chord sequence parameters */
+dmlvis:param( spm_minlen,    [nonneg, default(2),          description('Minimum length of chord sequence')]).
+dmlvis:param( spm_maxseqs,   [nonneg, default(500),        description('Maximum number of sequences to return')]).
+dmlvis:param( spm_algorithm, [atom,   default('CM-SPADE'), description('CM-SPADE, TKS or ClaSP')]).
+dmlvis:param( spm_ignore_n,   [nonneg, default(1),         description('Ignore failed chord detections')]).
+dmlvis:param( spm_maxtime,   [nonneg, default(60),         description('Max. runtime for SPM algorithm')]).
+dmlvis:param( spm_minsupport,   [nonneg, default(50),         description('Minimal Support in Percent')]).
+
+
+/* similarity parameters */
+dmlvis:param( sim_type, [atom,   default('euclidean'), description('Tpye of similarity measure: euclidean, compression')]).
+dmlvis:param( sim_clusters, [nonneg,   default(40), description('Number of clusters for vector Quantisation (40-200)')]).
+dmlvis:param( sim_downsample, [nonneg,   default(1), description('Downsample the audio analysis to a resolution of 1 second')]).
+dmlvis:param( sim_reclimit, [nonneg,   default(2000), description('Maximum number of recordings in dataset')]).
+dmlvis:param( sim_features, [atom,   default('chromagram'), description('Feature basis of the similarity estimation, any combination, separated by comma: chromagram,mfcc,chords')]).
+dmlvis:param( sim_compressor, [atom,   default('zlib'), description('Compressor for similarity estimation: zlib, zxd')]).
+
+
+:- rdf_meta transform_computation(+,r,r).
+transform_computation(Class,In,Out) :- 
+   (  transform(Class,Fn), computation(Fn,In,Out) *-> true
+   ;  (  nb_current(vamp_on_demand,true)
+      -> insist(transform(Class,Fn), unrecognised_transform_class(Class)), % picks first match 
+         format(string(Desc),"Running computation ~w on ~w.",[Fn,In]),
+         simple_task(Desc,computation_memo(Fn,In,Out))
+      ;  throw(missing_computation(Class,In))
+      )
+   ).
+
+:- rdf_meta transform_op(+,+,r,-).
+transform_op(TName,Op,In,Out) :- 
+   transform_computation(TName,In,X), 
+   csv_op(Op,X,Out).
+
+% ------- recording level perspectives ------------
+
+spectrogram_link(Offs,Len,URI,_{image_url:Link}) :-
+   http_link_to_id(spectrogram_window, [uri(URI), offset(Offs), length(Len)], Link).
+
+output_link(TransformName,Input,_{csv:Output}) :-
+   transform_computation(TransformName,Input,Output).
+
+chord_histogram(URI,_{values:Chords, counts:Counts}) :-
+   transform_op(chords,chord_hist,URI,Hist),
+   unzip(Hist,Chords,Counts).
+
+pitch_histogram(W,URI,_{values:NNs, counts:Counts}) :-
+   transform_op(transcription,pitch_hist(W),URI,Hist),
+   unzip(Hist,NNs,Counts).
+
+freq_histogram(ml,Min,Max,Q,W,URI,_{edges:Edges, counts:Counts}) :-
+   microtone_map(Min,Max,Q,Map),
+   transform_op(transcription(1),freq_hist(Map,W),URI,Counts),
+   map_edges(ml,Map,Edges).
+freq_histogram(r,Min,Max,Q,W,URI,_{edges:Edges, counts:Counts}) :-
+   microtone_map(Min,Max,Q,Map),
+   transform_op(transcription(1),freq_hist_r(Map,W),URI,Counts),
+   map_edges(r,Map,Edges).
+
+nonuniform_tempo(URI,_{times:Times, values:Values}) :-
+   transform_op(tempo,tempo,URI,Result),
+   unzip(Result,Times,Values).
+
+uniform_tempo(ml,DT,URI,_{times:Times, values:Values}) :-
+   transform_op(tempo,uniform_tempo(DT),URI,Result),
+   Result=Times-Values.
+
+uniform_tempo(r,DT,URI,_{times:Times, values:Values}) :-
+   transform_op(tempo,uniform_tempo_r(DT),URI,Result),
+   Result=Times-Values.
+
+normalised_tempo(ml,N,URI,_{times:Times, values:Values}) :-
+   transform_op(tempo,normalised_tempo(N),URI,Result),
+   Result=Times-Values.
+
+normalised_tempo(r,N,URI,_{times:Times, values:Values}) :-
+   transform_op(tempo,normalised_tempo_r(N),URI,Result),
+   Result=Times-Values.
+
+tempo_histogram(ml,DT,Min,Max,N,URI,_{edges:Edges, counts:Counts}) :-
+   insist(Min>0, domain_error(min,"positive value",Min)), 
+   Map=expmap(Min,Max,N),
+   map_edges(ml,Map,Edges),
+   transform_op(tempo,tempo_hist(DT,Map),URI,Result),
+   Result=_-Counts.
+tempo_histogram(r,DT,Min,Max,N,URI,_{edges:Edges, counts:Counts}) :-
+   insist(Min>0, domain_error(min,"positive value",Min)), 
+   Map=expmap(Min,Max,N),
+   map_edges(r,Map,Edges),
+   transform_op(tempo,tempo_hist_r(DT,Map),URI,Result),
+   Result=_-Counts.
+
+
+% ------- collection level perspectives ------------
+
+collection_pitch_histogram(W,CID,Result) :-
+   Min-Max = 20-100, % !!! FIXME
+   numlist(Min,Max,NNs),
+   dataset_histogram(CID, dense_pitch_hist(Min,Max,W), _{values:NNs}, Result).
+
+collection_pitch_lookup(Weighting, Pitch, Lim, Offset, CID, Result) :-
+   map_reduce_dataset(rec_pitch_hist(Weighting), pitch_lookup_cont(Pitch,Lim,Offset), CID, Result).
+
+pitch_lookup_cont(Pitch,Lim,Offset,RecHists, _{items:Items}) :-
+   findall( _{ uri: Rec, label:Label, count:Count, prob:Prob },
+            offset(Offset, limit(Lim, order_by( [desc(Prob)], 
+                   (  member(Rec-Hist,RecHists),
+                      rdf_display_label(Rec,Label),
+                      pitch_hist_prob(Hist,Pitch,Count,Prob)
+                   )))),
+            Items).
+
+% collection_pitch_lookup_alt(Weighting, Pitch, Lim, Offset, CID, _{ items:Items, coverage:Coverage}) :-
+%    findall_map_coverage(dataset_item(CID), rec_transcription, RecTrans, Coverage),
+%    findall( _{ uri: Rec, label:Label, count:Count, prob:Prob },
+%             offset(Offset, limit(Lim, order_by( [desc(Prob)],
+%                    (  csv_pitch_count_prob(Weighting,Trans,Pitch,Count,Prob),
+%                       member(Rec-Trans,RecTrans),
+%                       rdf_display_label(Rec,Label)
+%                    )))),
+%             Items).
+%
+% rec_transcription(Rec,Rec-Transcription) :- transform_computation(transcription,Rec,Transcription).
+
+collection_freq_histogram(Lang,Min,Max,Q,W,CID,Result) :-
+   Map=binmap(Min,Max,(Max-Min)*Q+1),
+   map_edges(Lang,Map,Edges),
+   dataset_histogram(CID, dense_freq_hist(Lang,Map,W),_{edges:Edges}, Result).
+
+collection_tempo_histogram(Lang,DT,Min,Max,N,CID,Result) :-
+   insist(Min>0, domain_error(min,"positive value",Min)), 
+   Map=expmap(Min,Max,N),
+   map_edges(Lang,Map,Edges),
+   dataset_histogram(CID, tempo_hist(Lang,DT,Map), _{edges:Edges}, Result).
+
+collection_tempo_curve(Lang,N,CID, Result) :-
+   map_reduce_dataset(tempo_curve(Lang,N), tempo_curves_stats(Lang), CID, Result).
+
+
+dataset_histogram(CID, Mapper, Dict, Result) :-
+   dataset_map_fold_reduce(CID,Mapper,with_dl(fold_hist),finish_hist(Dict),nothing,Result).
+              
+fold_hist([], S, S) :- !.
+fold_hist(Xs, just(C1), just(C2)) :- !, insist(seqmap(maplist(add),Xs,C1,C2)).
+fold_hist([X|Xs], nothing, just(C)) :- insist(seqmap(maplist(add),Xs,X,C)).
+
+finish_hist(Dict,just(Counts),Hist) :- put_dict(counts,Dict,Counts,Hist).
+
+py_hist(Mapper, PyFunction, Args, CID, _{counts:H,values:D,coverage:C,py_coverage:PYC}) :-
+   py_cla(Mapper,PyFunction,Args, CID, _{stats:_{counts:H,domain:D},coverage:C,py_coverage:PYC}).
+
+py_cla(Mapper, PyFunction, Args, CID, Result) :-
+   map_reduce_dataset(Mapper, py_cla_cont(PyFunction,Args), CID, Result).
+
+py_cla_cont(PyFunction,Args, Ok, _{stats:Result, py_coverage:Coverage}) :-
+   python_apply(PyFunction,[Ok|Args],Reply),
+   Reply = _{result:Result, stats:Coverage}.
+
+
+% CLA mappers
+rec_pitch_hist(W,Rec,Rec-Hist) :- transform_op(transcription,pitch_hist(W),Rec,Hist).
+
+dense_pitch_hist(Min,Max,W,Rec,DenseHist) :-
+   transform_op(transcription,pitch_hist(W),Rec,SparseHist),
+   sparse_to_dense(Min,Max,SparseHist,DenseHist).
+
+dense_freq_hist(ml,Map,W,Rec,Counts) :-
+   transform_op(transcription(1),freq_hist(Map,W),Rec,Counts).
+dense_freq_hist(r,Map,W,Rec,Counts) :-
+   transform_op(transcription(1),freq_hist_r(Map,W),Rec,Counts).
+
+tempo_hist(ml,DT,Map,Rec,Counts) :-
+   transform_op(tempo,tempo_hist(DT,Map),Rec,Result),
+   Result=_-Counts.
+tempo_hist(r,DT,Map,Rec,Counts) :-
+   transform_op(tempo,tempo_hist_r(DT,Map),Rec,Result),
+   Result=_-Counts.
+
+tempo_curve(ml,N,Rec,Values) :-
+   transform_op(tempo,normalised_tempo(N),Rec,Result),
+   Result=_-Values.
+tempo_curve(r,N,Rec,Values) :-
+   transform_op(tempo,normalised_tempo_r(N),Rec,Result),
+   Result=_-Values.
+
+transcription_tonic_duration(Rec, _{transcription: Transcription, tonic: Tonic, duration:0 }) :-
+   tagged(transcription,Rec,Transcription),
+   tagged(tonic,Rec,Tonic).
+
+transcription_date(Rec, _{transcription: Transcription, date:Date}) :-
+   tagged(transcription(1),Rec,Transcription),
+   insist(recording_property(Rec,date,Date),missing_property(Rec,date)).
+
+keys_chords(Rec, _{keys: Keys, chords:Chords}) :-
+   tagged(key,Rec,Keys),
+   tagged(chords,Rec,Chords).
+   
+similarity_bundle(Rec, _{chromagram: Chromagram, mfcc:Mfcc, keys: Keys, chords:Chords, list:_{uri:Rec, label:Label}}) :-
+   % nb_getval(vamp_on_demand,Vamp),
+   % concurrent_maplist(tagged_parallel(Vamp,Rec),[chromagram,mfcc,key,chords],[Chromagram,Mfcc,Keys,Chords]),
+   maplist(tagged,[chromagram,mfcc,key,chords],[Rec,Rec,Rec,Rec],[Chromagram,Mfcc,Keys,Chords]),
+   insist(recording_property(Rec,label,Label),missing_property(Rec,label)).
+   
+tagged_parallel(Vamp,Rec,Transform,Result) :-
+   nb_setval(vamp_on_demand,Vamp),
+   with_progress_stack(tagged(Transform,Rec,Result)).
+
+list_places(Rec, _{place:Place,list:_{uri:Rec, label:Label}}) :-
+   insist(recording_property(Rec,place,Place),missing_property(Rec,place)),
+   insist(recording_property(Rec,label,Label),missing_property(Rec,label)).
+   
+% for later...
+% tagged_list(Spec,Rec,Dict) :-
+%    maplist(tagged_item(Rec),Spec,Pairs),
+%    dict_create(Dict,_,Pairs).
+% tagged_item(Rec,Key:Transform,Key:Value) :- tagged(Transform,Rec,Value).
+
+tagged(Transform,Input,csv{value:Path}) :- 
+   transform_computation(Transform,Input,R), uri_absolute_path(R,Path).
+
+% ---------------------------------------------------
+
+:- initialization time(memo_attach(memo(perspectives),[])).
+
+:- persistent_memo cla_memo(+spec:ground,+cid:atom,-result:any).
+cla_memo(Spec,CID,Result) :- 
+   debug(perspectives(cla),'cla_mem: ~q',[call(Spec,CID,Result)]),
+   with_progress_stack(call(Spec,CID,Result)).
+
+%% mem(+Spec:pred(+cid,-A),+Opts:options,+CID:cid,-Result:A) is det.
+%
+%  Asynchronous memoised collection-level computation.
+%  Spec must be a ground term that can be called with two arguments: the id of a
+%  collection and a variable, which must be bound to an arbitrary result term on exit.
+%  If the computation has already be done and memoised in cla_memo/3, then the result is
+%  retrieved. Otherwise, the computation is started asynchronously and an exception 
+%  describing the state of the computation will be thrown.
+%  ==
+%  * dml_error(10, _{status:already_waiting, position:n})
+%     the goal was previously added and is now waiting at position n in the queue.
+%  * dml_error(11, _{status:already_running, progress:Progress})
+%     the goal was previously added and is currently running, with some progress information.
+%  * dml_error(12, _{status:initiate, position:N})
+%     Means the goal has been added to the work queue of the thread pool at position N.
+%  ==
+%  Options are options passed to control interaction with async_memo.
+
+mem(Spec,Opts,CID,Result) :-
+   option(vamp_on_demand(V), Opts, false),
+   async_memo(vis_cla,cla_memo(Spec,CID,Result),Status,
+              [ progress_levels([elapsed,summary,partial_result]),
+                globals([vamp_on_demand-V])|Opts ]),
+   (  Status=done(_-ok) -> true
+   ;  status_response(Status,Code,Dict), 
+      (  Status=done(_) -> Dict1=Dict
+      ;  estimate_run_time(Spec,CID,ERT),
+         put_dict(ert,Dict,ERT,Dict1)
+      ),
+      throw(dml_error(Code,Dict1))
+   ).
+
+% very crude estimate
+estimate_run_time(Spec,CID0,ERT) :-
+   findall(Size-Dur, (  browse(cla_memo(Spec,CID,_),comp(_,_,Dur)-ok),
+                       dataset_size(CID,Size)), Pairs),
+   length(Pairs,N),
+   (  N=0 -> ERT is -1
+   ;  maplist(computations:pair,Sizes,Durs,Pairs),
+      sumlist(Sizes,TotalSize),
+      sumlist(Durs,TotalDur),
+      dataset_size(CID0,Size), 
+      ERT is TotalDur*Size/TotalSize
+   ).
+
+status_response(spawned(ID,Pos), 12, Info) :-
+   Info = _{status:initiated, id:ID, position:Pos }.
+status_response(waiting(ID,T,Pos), 10, Info) :-
+   Info = _{status:already_waiting, id:ID, submit_time:TS, position:Pos },
+   time_to_string(T,TS).
+status_response(running(ID,TStart,_,nothing), 11, Info) :-
+   Info = _{ id:ID, status:already_running, start_time:TS },
+   time_to_string(TStart,TS).
+status_response(running(ID,TStart,_,just(Time-[Progress,Partial])), 11, Info) :-
+   maplist(progress_json,Progress,Progs),
+   time_to_string(TStart,TS),
+   Elapsed is Time-TStart,
+   (  member(stepwise(_,Done/Total),Progress), Done>0
+   -> ETA is Elapsed*(Total-Done)/Done
+   ;  ETA is -1
+   ),
+   Info1 = _{ id:ID
+            , status:already_running
+            , start_time:TS
+            , elapsed_time:Elapsed
+            , progress:Progs
+            , eta:ETA
+            },
+   (  Partial=just(R) 
+   -> put_dict(partial_result,Info1,R,Info)
+   ;  Info=Info1
+   ).
+
+status_response(recomputing(ID,Pos,Meta), 13, Info) :-
+   Info = _{status:recomputing, id:ID, position:Pos, meta:MD}, 
+   meta_dict(Meta,MD).
+status_response(done(Meta), 14, _{status:failed, meta:MD}) :- 
+   meta_dict(Meta,MD).
+
+meta_dict(comp(_,Time,Dur)-Result, _{ date: Date, duration:Dur, reason:Reason}) :-
+   format_result(Result,Reason),
+   time_to_string(Time,Date).
+
+format_result(fail,'Unspecified failure').
+format_result(ex(Ex),Description) :- message_to_string(Ex,Description).
+
+time_to_string(Time,String) :- format_time(string(String),'%FT%T%:z',Time).
+
+progress_json(A,A) :- atomic(A), !.
+progress_json(stepwise(Desc,Done/Total), _{ task:Task, total:Total, done:Done }) :- !,
+   progress_json(Desc,Task).
+progress_json(T,A) :- message_to_string(T,A).
+
+prolog:message(map_fold(_Mapper,_Folder)) --> ['Map-fold'].
+
+:- multifile thread_pool:create_pool/1.
+thread_pool:create_pool(vis_cla) :-
+   current_prolog_flag(cpu_count,N),
+   thread_pool_create(vis_cla, N, [backlog(20)]).
+
+% ------------ computations with progress ------------------
+
+map_reduce_dataset(Mapper,Reducer,CID,Result) :-
+   dataset_map_fold_reduce(CID,Mapper,append_dl,with_dl(Reducer),H-H,Result).
+
+append_dl(HH-TT,H-HH,H-TT).
+with_dl(P,H-[],A) :- call(P,H,A).
+with_dl(P,H-[],A,B) :- call(P,H,A,B).
+
+dataset_map_fold_reduce(CID,Mapper,Folder,Reducer,S0,Result) :-
+   dataset_items(CID,Items), 
+   with_cont( 'Map-fold-reduce',
+              map_fold_with_progress( safe_call(Mapper),
+                                      safe_fold(Folder),
+                                      Items, s(0,F-F,E-E,S0)),
+              reduce_cont(Reducer), Result).
+
+reduce_cont(Reducer,s(NOk,Failed-[],Erroneous-[],S), R) :-
+   (  NOk>0
+   -> simple_task(reducing(Reducer),call(Reducer,S,R1)),
+      put_coverage(NOk,Failed,Erroneous,R1,R)
+   ;  put_coverage(NOk,Failed,Erroneous,_{status:'no successfully mapped items'},D),
+      throw(dml_error(20, D))
+   ).
+
+put_coverage(NOk,Failed,Erroneous,R1,R) :-
+   put_dict(coverage,R1,_{ok_count:NOk, failed:Failed, errors:Erroneous},R).
+
+safe_call(Mapper,X,Z) :-
+   (  catch((call(Mapper,X,Y), Z=ok(X,Y)), Ex, 
+            (  Ex=abort(_) -> throw(Ex)
+            ;  Z=error(X,Ex))), !
+   ;  Z=fail(X)
+   ).
+
+safe_fold(Folder,Items,s(NOk1,FH-FT1,EH-ET1,S1),s(NOk2,FH-FT2,EH-ET2,S2)) :-
+   seqmap(partition,Items,s(NOk1,OkH,FT1,ET1),s(NOk2,OkT,FT2,ET2)),
+   call(Folder,OkH-OkT,S1,S2).
+
+partition(ok(_,X),s(N,[X|O],F,E),s(M,O,F,E)) :- M is N+1.
+partition(fail(X),s(N,O,[X|F],E),s(N,O,F,E)).
+partition(error(X,Ex),s(N,O,F,[_{item:X, error:Msg}|E]),s(N,O,F,E)) :- message_to_string(Ex,Msg).
+