# HG changeset patch # User Lucas Thompson # Date 1494574098 -3600 # Node ID 75a234459d3b8c7f4cf7854b25ee404031b06d05 # Parent 5527c0f8205921b74b974a808e61c00afe14e6f3 Fix for changes to streaming api in piper-js i.e. collect on the client diff -r 5527c0f82059 -r 75a234459d3b src/app/services/feature-extraction/FeatureExtractionWorker.ts --- a/src/app/services/feature-extraction/FeatureExtractionWorker.ts Fri May 12 08:26:18 2017 +0100 +++ b/src/app/services/feature-extraction/FeatureExtractionWorker.ts Fri May 12 08:28:18 2017 +0100 @@ -63,11 +63,7 @@ return this.dispatch('process', request); } - collect(request: SimpleRequest): Observable { - return this.dispatch('collect', request); - } - - protected dispatch(method: 'process' | 'collect', + protected dispatch(method: 'process', request: SimpleRequest): Observable { const key = request.key.split(':')[0]; return this.services.has(key) ? @@ -80,19 +76,27 @@ super(); } - protected dispatch(method: 'process' | 'collect', + protected dispatch(method: 'process', request: SimpleRequest): Observable { let lastPercentagePoint = 0; + let shouldClear = false; return super.dispatch(method, request) - .scan(streamingResponseReducer) + .scan((acc, value) => { + if (shouldClear) { + acc.features = []; + } + return streamingResponseReducer(acc, value); + }) .filter(val => { + const progress = val.progress; const percentage = - 100 * (val.processedBlockCount / val.totalBlockCount) | 0; + 100 * (progress.processedBlockCount / progress.totalBlockCount) | 0; const pointDifference = (percentage - lastPercentagePoint); const shouldEmit = pointDifference === 1 || percentage === 100; if (shouldEmit) { lastPercentagePoint = percentage; } + shouldClear = shouldEmit; return shouldEmit; }); } diff -r 5527c0f82059 -r 75a234459d3b src/app/services/feature-extraction/feature-extraction.service.ts --- a/src/app/services/feature-extraction/feature-extraction.service.ts Fri May 12 08:26:18 2017 +0100 +++ b/src/app/services/feature-extraction/feature-extraction.service.ts Fri May 12 08:28:18 2017 +0100 @@ -14,6 +14,7 @@ WebWorkerStreamingClient } from 'piper/client-stubs/WebWorkerStreamingClient'; import {RequestId} from 'piper/protocols/WebWorkerProtocol'; +import {collect, StreamingConfiguration} from "piper/StreamingService"; type RepoUri = string; export interface AvailableLibraries { @@ -66,19 +67,24 @@ } extract(analysisItemId: string, request: SimpleRequest): Promise { - return this.client.collect(request) - .do(val => { - if (val.totalBlockCount > 0) { - this.progressUpdated.next({ - id: analysisItemId, - value: (val.processedBlockCount / val.totalBlockCount) * 100 - }); - } - }) - .toPromise() - .then((response) => { - this.featuresExtracted.next(response); + let config: StreamingConfiguration; + return collect(this.client.process(request), val => { + if (val.configuration) { + config = val.configuration; + } + const progress = val.progress; + if (progress.totalBlockCount > 0) { + this.progressUpdated.next({ + id: analysisItemId, + value: (progress.processedBlockCount / progress.totalBlockCount) * 100 + }); + } + }).then(features => { + this.featuresExtracted.next({ + features: features, + outputDescriptor: config.outputDescriptor }); + }); } updateAvailableLibraries(): Observable {