comparison src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 243:7106cdd59e62

Combine the features as they come in. On every percentage point change, emit a StreamingResponse with the collected features so far, enabling progress reporting for the client.
author Lucas Thompson <dev@lucas.im>
date Wed, 26 Apr 2017 16:16:48 +0100
parents 53ea6406d601
children 4224929943bc
comparison
equal deleted inserted replaced
242:ca5fcdfa0851 243:7106cdd59e62
5 import {PiperVampService, ListRequest, ListResponse} from 'piper'; 5 import {PiperVampService, ListRequest, ListResponse} from 'piper';
6 import { 6 import {
7 SimpleRequest 7 SimpleRequest
8 } from 'piper/HigherLevelUtilities'; 8 } from 'piper/HigherLevelUtilities';
9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule'; 9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
10 import {AvailableLibraries} from './feature-extraction.service'; 10 import {
11 AvailableLibraries
12 } from './feature-extraction.service';
11 import { 13 import {
12 DedicatedWorkerGlobalScope, 14 DedicatedWorkerGlobalScope,
13 WebWorkerStreamingServer 15 WebWorkerStreamingServer
14 } from 'piper/servers/WebWorkerStreamingServer'; 16 } from 'piper/servers/WebWorkerStreamingServer';
15 import { 17 import {
17 StreamingResponse, 19 StreamingResponse,
18 StreamingService 20 StreamingService
19 } from 'piper/StreamingService'; 21 } from 'piper/StreamingService';
20 import {Observable} from 'rxjs/Observable'; 22 import {Observable} from 'rxjs/Observable';
21 import {EmscriptenModule} from 'piper/PiperVampService'; 23 import {EmscriptenModule} from 'piper/PiperVampService';
24 import {streamingResponseReducer} from './FeatureReducers';
22 25
23 interface MessageEvent { 26 interface MessageEvent {
24 readonly data: any; 27 readonly data: any;
25 } 28 }
26 29
65 return this.services.has(key) ? 68 return this.services.has(key) ?
66 this.services.get(key).collect(request) : Observable.throw('Invalid key'); 69 this.services.get(key).collect(request) : Observable.throw('Invalid key');
67 } 70 }
68 } 71 }
69 72
73 class ReducingAggregateService extends AggregateStreamingService {
74 constructor() {
75 super();
76 }
77
78 collect(request: SimpleRequest): Observable<StreamingResponse> {
79 let lastPercentagePoint = 0;
80 return super.collect(request)
81 .scan(streamingResponseReducer)
82 .filter(val => {
83 const percentage =
84 100 * (val.processedBlockCount / val.totalBlockCount) | 0;
85 const pointDifference = (percentage - lastPercentagePoint);
86 if (pointDifference === 1 || percentage === 100) {
87 lastPercentagePoint = percentage;
88 return true;
89 } else {
90 return false;
91 }
92 });
93 }
94 }
95
70 export default class FeatureExtractionWorker { 96 export default class FeatureExtractionWorker {
71 private workerScope: DedicatedWorkerGlobalScope; 97 private workerScope: DedicatedWorkerGlobalScope;
72 private remoteLibraries: Map<LibraryKey, LibraryUri>; 98 private remoteLibraries: Map<LibraryKey, LibraryUri>;
73 private server: WebWorkerStreamingServer; 99 private server: WebWorkerStreamingServer;
74 private service: AggregateStreamingService; 100 private service: AggregateStreamingService;
75 101
76 constructor(workerScope: DedicatedWorkerGlobalScope, 102 constructor(workerScope: DedicatedWorkerGlobalScope,
77 private requireJs: RequireJs) { 103 private requireJs: RequireJs) {
78 this.workerScope = workerScope; 104 this.workerScope = workerScope;
79 this.remoteLibraries = new Map<LibraryKey, LibraryUri>(); 105 this.remoteLibraries = new Map<LibraryKey, LibraryUri>();
80 this.service = new AggregateStreamingService(); 106 this.service = new ReducingAggregateService();
81 this.setupImportLibraryListener(); 107 this.setupImportLibraryListener();
82 this.server = new WebWorkerStreamingServer( 108 this.server = new WebWorkerStreamingServer(
83 this.workerScope, 109 this.workerScope,
84 this.service 110 this.service
85 ); 111 );