annotate src/app/services/feature-extraction/feature-extraction.service.ts @ 305:75a234459d3b

Fix for changes to streaming api in piper-js i.e. collect on the client
author Lucas Thompson <dev@lucas.im>
date Fri, 12 May 2017 08:28:18 +0100
parents 7a6ef711c73a
children 98490d0ceb77
rev   line source
dev@74 1 import {Injectable, Inject} from '@angular/core';
dev@47 2 import {
dev@226 3 ListResponse
dev@236 4 } from 'piper';
dev@64 5 import {
dev@226 6 SimpleRequest,
dev@226 7 SimpleResponse
dev@236 8 } from 'piper/HigherLevelUtilities';
dev@236 9 import {Subject} from 'rxjs/Subject';
dev@236 10 import {Observable} from 'rxjs/Observable';
dev@236 11 import {Http, Response} from '@angular/http';
dev@226 12 import {
dev@226 13 countingIdProvider,
dev@226 14 WebWorkerStreamingClient
dev@236 15 } from 'piper/client-stubs/WebWorkerStreamingClient';
dev@236 16 import {RequestId} from 'piper/protocols/WebWorkerProtocol';
dev@305 17 import {collect, StreamingConfiguration} from "piper/StreamingService";
dev@40 18
dev@74 19 type RepoUri = string;
dev@74 20 export interface AvailableLibraries {
dev@74 21 [libraryKey: string]: RepoUri;
dev@74 22 }
dev@74 23
dev@226 24 export interface Progress {
dev@226 25 id: RequestId;
dev@226 26 value: number; // between 0 and 100, for material-ui
dev@226 27 }
dev@226 28
dev@40 29 @Injectable()
dev@40 30 export class FeatureExtractionService {
dev@40 31
dev@40 32 private worker: Worker;
dev@64 33 private featuresExtracted: Subject<SimpleResponse>;
dev@64 34 featuresExtracted$: Observable<SimpleResponse>;
dev@74 35 private librariesUpdated: Subject<ListResponse>;
dev@74 36 librariesUpdated$: Observable<ListResponse>;
dev@226 37 private progressUpdated: Subject<Progress>;
dev@226 38 progressUpdated$: Observable<Progress>;
dev@226 39 private client: WebWorkerStreamingClient;
dev@44 40
dev@236 41 constructor(private http: Http,
dev@236 42 @Inject('PiperRepoUri') private repositoryUri: RepoUri) {
dev@40 43 this.worker = new Worker('bootstrap-feature-extraction-worker.js');
dev@64 44 this.featuresExtracted = new Subject<SimpleResponse>();
dev@51 45 this.featuresExtracted$ = this.featuresExtracted.asObservable();
dev@74 46 this.librariesUpdated = new Subject<ListResponse>();
dev@74 47 this.librariesUpdated$ = this.librariesUpdated.asObservable();
dev@226 48 this.progressUpdated = new Subject<Progress>();
dev@226 49 this.progressUpdated$ = this.progressUpdated.asObservable();
dev@74 50 this.worker.addEventListener('message', (ev: MessageEvent) => {
dev@74 51 const isValidResponse = ev.data.method === 'import'
dev@226 52 && ev.data.result && ev.data.result.available ;
dev@74 53 if (isValidResponse) {
dev@226 54 (ev as Event).stopImmediatePropagation();
dev@74 55 this.librariesUpdated.next(ev.data.result);
dev@74 56 }
dev@226 57 }, true);
dev@226 58
dev@226 59 this.client = new WebWorkerStreamingClient(
dev@226 60 this.worker,
dev@226 61 countingIdProvider(0)
dev@236 62 );
dev@40 63 }
dev@40 64
dev@47 65 list(): Promise<ListResponse> {
dev@226 66 return this.client.list({});
dev@40 67 }
dev@40 68
dev@226 69 extract(analysisItemId: string, request: SimpleRequest): Promise<void> {
dev@305 70 let config: StreamingConfiguration;
dev@305 71 return collect(this.client.process(request), val => {
dev@305 72 if (val.configuration) {
dev@305 73 config = val.configuration;
dev@305 74 }
dev@305 75 const progress = val.progress;
dev@305 76 if (progress.totalBlockCount > 0) {
dev@305 77 this.progressUpdated.next({
dev@305 78 id: analysisItemId,
dev@305 79 value: (progress.processedBlockCount / progress.totalBlockCount) * 100
dev@305 80 });
dev@305 81 }
dev@305 82 }).then(features => {
dev@305 83 this.featuresExtracted.next({
dev@305 84 features: features,
dev@305 85 outputDescriptor: config.outputDescriptor
dev@226 86 });
dev@305 87 });
dev@62 88 }
dev@62 89
dev@74 90 updateAvailableLibraries(): Observable<AvailableLibraries> {
dev@74 91 return this.http.get(this.repositoryUri)
dev@74 92 .map(res => {
dev@74 93 const map = res.json();
dev@74 94 this.worker.postMessage({
dev@74 95 method: 'addRemoteLibraries',
dev@74 96 params: map
dev@74 97 });
dev@74 98 return map;
dev@74 99 })
dev@74 100 .catch((error: Response | any) => {
dev@74 101 console.error(error);
dev@74 102 return Observable.throw(error);
dev@74 103 });
dev@74 104 }
dev@74 105
dev@74 106 load(libraryKey: string): void {
dev@74 107 this.worker.postMessage({method: 'import', params: libraryKey});
dev@74 108 }
dev@40 109 }