Mercurial > hg > ugly-duckling
comparison src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 243:7106cdd59e62
Combine the features as they come in. On every percentage point change, emit a StreamingResponse with the collected features so far, enabling progress reporting for the client.
author | Lucas Thompson <dev@lucas.im> |
---|---|
date | Wed, 26 Apr 2017 16:16:48 +0100 |
parents | 53ea6406d601 |
children | 4224929943bc |
comparison
equal
deleted
inserted
replaced
242:ca5fcdfa0851 | 243:7106cdd59e62 |
---|---|
5 import {PiperVampService, ListRequest, ListResponse} from 'piper'; | 5 import {PiperVampService, ListRequest, ListResponse} from 'piper'; |
6 import { | 6 import { |
7 SimpleRequest | 7 SimpleRequest |
8 } from 'piper/HigherLevelUtilities'; | 8 } from 'piper/HigherLevelUtilities'; |
9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule'; | 9 import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule'; |
10 import {AvailableLibraries} from './feature-extraction.service'; | 10 import { |
| | 11 AvailableLibraries |
| | 12 } from './feature-extraction.service'; |
11 import { | 13 import { |
12 DedicatedWorkerGlobalScope, | 14 DedicatedWorkerGlobalScope, |
13 WebWorkerStreamingServer | 15 WebWorkerStreamingServer |
14 } from 'piper/servers/WebWorkerStreamingServer'; | 16 } from 'piper/servers/WebWorkerStreamingServer'; |
15 import { | 17 import { |
17 StreamingResponse, | 19 StreamingResponse, |
18 StreamingService | 20 StreamingService |
19 } from 'piper/StreamingService'; | 21 } from 'piper/StreamingService'; |
20 import {Observable} from 'rxjs/Observable'; | 22 import {Observable} from 'rxjs/Observable'; |
21 import {EmscriptenModule} from 'piper/PiperVampService'; | 23 import {EmscriptenModule} from 'piper/PiperVampService'; |
| | 24 import {streamingResponseReducer} from './FeatureReducers'; |
22 | 25 |
23 interface MessageEvent { | 26 interface MessageEvent { |
24 readonly data: any; | 27 readonly data: any; |
25 } | 28 } |
26 | 29 |
65 return this.services.has(key) ? | 68 return this.services.has(key) ? |
66 this.services.get(key).collect(request) : Observable.throw('Invalid key'); | 69 this.services.get(key).collect(request) : Observable.throw('Invalid key'); |
67 } | 70 } |
68 } | 71 } |
69 | 72 |
| | 73 class ReducingAggregateService extends AggregateStreamingService { |
| | 74 constructor() { |
| | 75 super(); |
| | 76 } |
| | 77 |
| | 78 collect(request: SimpleRequest): Observable<StreamingResponse> { |
| | 79 let lastPercentagePoint = 0; |
| | 80 return super.collect(request) |
| | 81 .scan(streamingResponseReducer) |
| | 82 .filter(val => { |
| | 83 const percentage = |
| | 84 100 * (val.processedBlockCount / val.totalBlockCount) \| 0; |
| | 85 const pointDifference = (percentage - lastPercentagePoint); |
| | 86 if (pointDifference === 1 \|\| percentage === 100) { |
| | 87 lastPercentagePoint = percentage; |
| | 88 return true; |
| | 89 } else { |
| | 90 return false; |
| | 91 } |
| | 92 }); |
| | 93 } |
| | 94 } |
| | 95 |
70 export default class FeatureExtractionWorker { | 96 export default class FeatureExtractionWorker { |
71 private workerScope: DedicatedWorkerGlobalScope; | 97 private workerScope: DedicatedWorkerGlobalScope; |
72 private remoteLibraries: Map<LibraryKey, LibraryUri>; | 98 private remoteLibraries: Map<LibraryKey, LibraryUri>; |
73 private server: WebWorkerStreamingServer; | 99 private server: WebWorkerStreamingServer; |
74 private service: AggregateStreamingService; | 100 private service: AggregateStreamingService; |
75 | 101 |
76 constructor(workerScope: DedicatedWorkerGlobalScope, | 102 constructor(workerScope: DedicatedWorkerGlobalScope, |
77 private requireJs: RequireJs) { | 103 private requireJs: RequireJs) { |
78 this.workerScope = workerScope; | 104 this.workerScope = workerScope; |
79 this.remoteLibraries = new Map<LibraryKey, LibraryUri>(); | 105 this.remoteLibraries = new Map<LibraryKey, LibraryUri>(); |
80 this.service = new AggregateStreamingService(); | 106 this.service = new ReducingAggregateService(); |
81 this.setupImportLibraryListener(); | 107 this.setupImportLibraryListener(); |
82 this.server = new WebWorkerStreamingServer( | 108 this.server = new WebWorkerStreamingServer( |
83 this.workerScope, | 109 this.workerScope, |
84 this.service | 110 this.service |
85 ); | 111 ); |