comparison src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 305:75a234459d3b

Fix for changes to the streaming API in piper-js, i.e. collect on the client
author Lucas Thompson <dev@lucas.im>
date Fri, 12 May 2017 08:28:18 +0100
parents c60b03098bae
children a16d968d646e
comparison
equal deleted inserted replaced
304:5527c0f82059 305:75a234459d3b
61 61
62 process(request: SimpleRequest): Observable<StreamingResponse> { 62 process(request: SimpleRequest): Observable<StreamingResponse> {
63 return this.dispatch('process', request); 63 return this.dispatch('process', request);
64 } 64 }
65 65
66 collect(request: SimpleRequest): Observable<StreamingResponse> { 66 protected dispatch(method: 'process',
67 return this.dispatch('collect', request);
68 }
69
70 protected dispatch(method: 'process' | 'collect',
71 request: SimpleRequest): Observable<StreamingResponse> { 67 request: SimpleRequest): Observable<StreamingResponse> {
72 const key = request.key.split(':')[0]; 68 const key = request.key.split(':')[0];
73 return this.services.has(key) ? 69 return this.services.has(key) ?
74 this.services.get(key)[method](request) : Observable.throw('Invalid key'); 70 this.services.get(key)[method](request) : Observable.throw('Invalid key');
75 } 71 }
78 class ThrottledReducingAggregateService extends AggregateStreamingService { 74 class ThrottledReducingAggregateService extends AggregateStreamingService {
79 constructor() { 75 constructor() {
80 super(); 76 super();
81 } 77 }
82 78
83 protected dispatch(method: 'process' | 'collect', 79 protected dispatch(method: 'process',
84 request: SimpleRequest): Observable<StreamingResponse> { 80 request: SimpleRequest): Observable<StreamingResponse> {
85 let lastPercentagePoint = 0; 81 let lastPercentagePoint = 0;
82 let shouldClear = false;
86 return super.dispatch(method, request) 83 return super.dispatch(method, request)
87 .scan(streamingResponseReducer) 84 .scan((acc, value) => {
85 if (shouldClear) {
86 acc.features = [];
87 }
88 return streamingResponseReducer(acc, value);
89 })
88 .filter(val => { 90 .filter(val => {
91 const progress = val.progress;
89 const percentage = 92 const percentage =
90 100 * (val.processedBlockCount / val.totalBlockCount) | 0; 93 100 * (progress.processedBlockCount / progress.totalBlockCount) | 0;
91 const pointDifference = (percentage - lastPercentagePoint); 94 const pointDifference = (percentage - lastPercentagePoint);
92 const shouldEmit = pointDifference === 1 || percentage === 100; 95 const shouldEmit = pointDifference === 1 || percentage === 100;
93 if (shouldEmit) { 96 if (shouldEmit) {
94 lastPercentagePoint = percentage; 97 lastPercentagePoint = percentage;
95 } 98 }
99 shouldClear = shouldEmit;
96 return shouldEmit; 100 return shouldEmit;
97 }); 101 });
98 } 102 }
99 } 103 }
100 104