dev@74
|
1 import {Injectable, Inject} from '@angular/core';
|
dev@47
|
2 import {
|
dev@226
|
3 ListResponse
|
dev@47
|
4 } from "piper";
|
dev@64
|
5 import {
|
dev@226
|
6 SimpleRequest,
|
dev@226
|
7 SimpleResponse
|
dev@64
|
8 } from "piper/HigherLevelUtilities";
|
dev@51
|
9 import {Subject} from "rxjs/Subject";
|
dev@51
|
10 import {Observable} from "rxjs";
|
dev@74
|
11 import {Http, Response} from "@angular/http";
|
dev@226
|
12 import {
|
dev@226
|
13 countingIdProvider,
|
dev@226
|
14 WebWorkerStreamingClient
|
dev@226
|
15 } from "piper/client-stubs/WebWorkerStreamingClient";
|
dev@226
|
16 import {RequestId} from "piper/protocols/WebWorkerProtocol";
|
dev@44
|
17
|
dev@47
|
18 interface RequestMessage<RequestType> {
|
dev@47
|
19 method: string;
|
dev@47
|
20 params: RequestType;
|
dev@47
|
21 }
|
dev@47
|
22
|
dev@47
|
23 interface ResponseMessage<ResponseType> {
|
dev@47
|
24 method: string;
|
dev@47
|
25 result: ResponseType;
|
dev@47
|
26 }
|
dev@40
|
27
|
dev@74
|
28 type RepoUri = string;
|
dev@74
|
29 export interface AvailableLibraries {
|
dev@74
|
30 [libraryKey: string]: RepoUri;
|
dev@74
|
31 }
|
dev@74
|
32
|
dev@226
|
33 export interface Progress {
|
dev@226
|
34 id: RequestId;
|
dev@226
|
35 value: number; // between 0 and 100, for material-ui
|
dev@226
|
36 }
|
dev@226
|
37
|
dev@40
|
38 @Injectable()
|
dev@40
|
39 export class FeatureExtractionService {
|
dev@40
|
40
|
dev@40
|
41 private worker: Worker;
|
dev@64
|
42 private featuresExtracted: Subject<SimpleResponse>;
|
dev@64
|
43 featuresExtracted$: Observable<SimpleResponse>;
|
dev@74
|
44 private librariesUpdated: Subject<ListResponse>;
|
dev@74
|
45 librariesUpdated$: Observable<ListResponse>;
|
dev@226
|
46 private progressUpdated: Subject<Progress>;
|
dev@226
|
47 progressUpdated$: Observable<Progress>;
|
dev@226
|
48 private client: WebWorkerStreamingClient;
|
dev@44
|
49
|
dev@74
|
50 constructor(private http: Http, @Inject('PiperRepoUri') private repositoryUri: RepoUri) {
|
dev@40
|
51 this.worker = new Worker('bootstrap-feature-extraction-worker.js');
|
dev@64
|
52 this.featuresExtracted = new Subject<SimpleResponse>();
|
dev@51
|
53 this.featuresExtracted$ = this.featuresExtracted.asObservable();
|
dev@74
|
54 this.librariesUpdated = new Subject<ListResponse>();
|
dev@74
|
55 this.librariesUpdated$ = this.librariesUpdated.asObservable();
|
dev@226
|
56 this.progressUpdated = new Subject<Progress>();
|
dev@226
|
57 this.progressUpdated$ = this.progressUpdated.asObservable();
|
dev@74
|
58 this.worker.addEventListener('message', (ev: MessageEvent) => {
|
dev@74
|
59 const isValidResponse = ev.data.method === 'import'
|
dev@226
|
60 && ev.data.result && ev.data.result.available ;
|
dev@74
|
61 if (isValidResponse) {
|
dev@226
|
62 (ev as Event).stopImmediatePropagation();
|
dev@74
|
63 this.librariesUpdated.next(ev.data.result);
|
dev@74
|
64 }
|
dev@226
|
65 }, true);
|
dev@226
|
66
|
dev@226
|
67 this.client = new WebWorkerStreamingClient(
|
dev@226
|
68 this.worker,
|
dev@226
|
69 countingIdProvider(0)
|
dev@226
|
70 )
|
dev@40
|
71 }
|
dev@40
|
72
|
dev@47
|
73 list(): Promise<ListResponse> {
|
dev@226
|
74 return this.client.list({});
|
dev@40
|
75 }
|
dev@40
|
76
|
dev@226
|
77 extract(analysisItemId: string, request: SimpleRequest): Promise<void> {
|
dev@226
|
78 const arrayReducer = (acc, val) => {
|
dev@226
|
79 acc.push.apply(acc, val);
|
dev@226
|
80 return acc;
|
dev@226
|
81 };
|
dev@226
|
82 const typedArrayReducer = (acc: Float32Array,
|
dev@226
|
83 val: Float32Array): Float32Array => {
|
dev@226
|
84 return Float32Array.of(...acc, ...val);
|
dev@226
|
85 };
|
dev@226
|
86 return this.client.collect(request)
|
dev@226
|
87 .do(val => {
|
dev@226
|
88 this.progressUpdated.next({
|
dev@226
|
89 id: analysisItemId,
|
dev@226
|
90 value: (val.processedBlockCount / val.totalBlockCount) * 100
|
dev@226
|
91 });
|
dev@226
|
92 })
|
dev@226
|
93 .reduce((acc, val) => {
|
dev@226
|
94 if (acc.features.data instanceof Array &&
|
dev@226
|
95 val.features.data instanceof Array) {
|
dev@226
|
96 acc.features.data = arrayReducer(
|
dev@226
|
97 acc.features.data,
|
dev@226
|
98 val.features.data
|
dev@226
|
99 );
|
dev@226
|
100 } else if (acc.features.data instanceof Float32Array &&
|
dev@226
|
101 val.features.data instanceof Float32Array) {
|
dev@226
|
102 acc.features.data = typedArrayReducer(
|
dev@226
|
103 acc.features.data,
|
dev@226
|
104 val.features.data
|
dev@226
|
105 );
|
dev@226
|
106 } else {
|
dev@226
|
107 throw "Invalid feature output. Aborting";
|
dev@226
|
108 }
|
dev@226
|
109 return acc;
|
dev@226
|
110 })
|
dev@226
|
111 .toPromise()
|
dev@226
|
112 .then((response) => {
|
dev@226
|
113 this.featuresExtracted.next(response);
|
dev@226
|
114 });
|
dev@62
|
115 }
|
dev@62
|
116
|
dev@74
|
117 updateAvailableLibraries(): Observable<AvailableLibraries> {
|
dev@74
|
118 return this.http.get(this.repositoryUri)
|
dev@74
|
119 .map(res => {
|
dev@74
|
120 const map = res.json();
|
dev@74
|
121 this.worker.postMessage({
|
dev@74
|
122 method: 'addRemoteLibraries',
|
dev@74
|
123 params: map
|
dev@74
|
124 });
|
dev@74
|
125 return map;
|
dev@74
|
126 })
|
dev@74
|
127 .catch((error: Response | any) => {
|
dev@74
|
128 console.error(error);
|
dev@74
|
129 return Observable.throw(error);
|
dev@74
|
130 });
|
dev@74
|
131 }
|
dev@74
|
132
|
dev@74
|
133 load(libraryKey: string): void {
|
dev@74
|
134 this.worker.postMessage({method: 'import', params: libraryKey});
|
dev@74
|
135 }
|
dev@40
|
136 }
|