Mercurial > hg > ugly-duckling
comparison src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 305:75a234459d3b
Fix for changes to streaming api in piper-js i.e. collect on the client
author | Lucas Thompson <dev@lucas.im> |
---|---|
date | Fri, 12 May 2017 08:28:18 +0100 |
parents | c60b03098bae |
children | a16d968d646e |
comparison
equal
deleted
inserted
replaced
304:5527c0f82059 | 305:75a234459d3b |
---|---|
61 | 61 |
62 process(request: SimpleRequest): Observable<StreamingResponse> { | 62 process(request: SimpleRequest): Observable<StreamingResponse> { |
63 return this.dispatch('process', request); | 63 return this.dispatch('process', request); |
64 } | 64 } |
65 | 65 |
66 collect(request: SimpleRequest): Observable<StreamingResponse> { | 66 protected dispatch(method: 'process', |
67 return this.dispatch('collect', request); | |
68 } | |
69 | |
70 protected dispatch(method: 'process' | 'collect', | |
71 request: SimpleRequest): Observable<StreamingResponse> { | 67 request: SimpleRequest): Observable<StreamingResponse> { |
72 const key = request.key.split(':')[0]; | 68 const key = request.key.split(':')[0]; |
73 return this.services.has(key) ? | 69 return this.services.has(key) ? |
74 this.services.get(key)[method](request) : Observable.throw('Invalid key'); | 70 this.services.get(key)[method](request) : Observable.throw('Invalid key'); |
75 } | 71 } |
78 class ThrottledReducingAggregateService extends AggregateStreamingService { | 74 class ThrottledReducingAggregateService extends AggregateStreamingService { |
79 constructor() { | 75 constructor() { |
80 super(); | 76 super(); |
81 } | 77 } |
82 | 78 |
83 protected dispatch(method: 'process' | 'collect', | 79 protected dispatch(method: 'process', |
84 request: SimpleRequest): Observable<StreamingResponse> { | 80 request: SimpleRequest): Observable<StreamingResponse> { |
85 let lastPercentagePoint = 0; | 81 let lastPercentagePoint = 0; |
82 let shouldClear = false; | |
86 return super.dispatch(method, request) | 83 return super.dispatch(method, request) |
87 .scan(streamingResponseReducer) | 84 .scan((acc, value) => { |
85 if (shouldClear) { | |
86 acc.features = []; | |
87 } | |
88 return streamingResponseReducer(acc, value); | |
89 }) | |
88 .filter(val => { | 90 .filter(val => { |
91 const progress = val.progress; | |
89 const percentage = | 92 const percentage = |
90 100 * (val.processedBlockCount / val.totalBlockCount) | 0; | 93 100 * (progress.processedBlockCount / progress.totalBlockCount) | 0; |
91 const pointDifference = (percentage - lastPercentagePoint); | 94 const pointDifference = (percentage - lastPercentagePoint); |
92 const shouldEmit = pointDifference === 1 || percentage === 100; | 95 const shouldEmit = pointDifference === 1 || percentage === 100; |
93 if (shouldEmit) { | 96 if (shouldEmit) { |
94 lastPercentagePoint = percentage; | 97 lastPercentagePoint = percentage; |
95 } | 98 } |
99 shouldClear = shouldEmit; | |
96 return shouldEmit; | 100 return shouldEmit; |
97 }); | 101 }); |
98 } | 102 } |
99 } | 103 } |
100 | 104 |