view src/app/services/feature-extraction/FeatureExtractionWorker.ts @ 250:c60b03098bae

Rename to indicate not all responses are sent.
author Lucas Thompson <dev@lucas.im>
date Thu, 27 Apr 2017 10:37:26 +0100
parents 55be5d2e96f6
children 75a234459d3b
line wrap: on
line source
/**
 * Created by lucas on 01/12/2016.
 */

import {PiperVampService, ListRequest, ListResponse} from 'piper';
import {
  SimpleRequest
} from 'piper/HigherLevelUtilities';
import { VampExamplePlugins } from 'piper/ext/VampExamplePluginsModule';
import {
  AvailableLibraries
} from './feature-extraction.service';
import {
  DedicatedWorkerGlobalScope,
  WebWorkerStreamingServer
} from 'piper/servers/WebWorkerStreamingServer';
import {
  PiperStreamingService,
  StreamingResponse,
  StreamingService
} from 'piper/StreamingService';
import {Observable} from 'rxjs/Observable';
import {EmscriptenModule} from 'piper/PiperVampService';
import {streamingResponseReducer} from './FeatureReducers';

interface MessageEvent {
  readonly data: any;
}

type LibraryUri = string;
type LibraryKey = string;

type RequireJs = (libs: string[], callback: (...libs: any[]) => void) => void;

class AggregateStreamingService implements StreamingService {
  private services: Map<LibraryKey, PiperStreamingService>;

  constructor() {
    this.services = new Map<LibraryKey, PiperStreamingService>();
    this.services.set(
      'vamp-example-plugins',
      new PiperStreamingService(new PiperVampService(VampExamplePlugins()))
    );
  }

  addService(key: LibraryKey, service: PiperStreamingService): void {
    this.services.set(key, service);
  }

  list(request: ListRequest): Promise<ListResponse> {
    return Promise.all(
      [...this.services.values()].map(client => client.list({}))
    ).then(allAvailable => ({
        available: allAvailable.reduce(
          (all, current) => all.concat(current.available),
          []
        )
      })
    );
  }

  process(request: SimpleRequest): Observable<StreamingResponse> {
    return this.dispatch('process', request);
  }

  collect(request: SimpleRequest): Observable<StreamingResponse> {
    return this.dispatch('collect', request);
  }

  protected dispatch(method: 'process' | 'collect',
                     request: SimpleRequest): Observable<StreamingResponse> {
    const key = request.key.split(':')[0];
    return this.services.has(key) ?
      this.services.get(key)[method](request) : Observable.throw('Invalid key');
  }
}

class ThrottledReducingAggregateService extends AggregateStreamingService {
  constructor() {
    super();
  }

  protected dispatch(method: 'process' | 'collect',
                     request: SimpleRequest): Observable<StreamingResponse> {
    let lastPercentagePoint = 0;
    return super.dispatch(method, request)
      .scan(streamingResponseReducer)
      .filter(val => {
        const percentage =
          100 * (val.processedBlockCount / val.totalBlockCount) | 0;
        const pointDifference = (percentage - lastPercentagePoint);
        const shouldEmit = pointDifference === 1 || percentage === 100;
        if (shouldEmit) {
          lastPercentagePoint = percentage;
        }
        return shouldEmit;
      });
  }
}

export default class FeatureExtractionWorker {
  private workerScope: DedicatedWorkerGlobalScope;
  private remoteLibraries: Map<LibraryKey, LibraryUri>;
  private server: WebWorkerStreamingServer;
  private service: AggregateStreamingService;

  constructor(workerScope: DedicatedWorkerGlobalScope,
              private requireJs: RequireJs) {
    this.workerScope = workerScope;
    this.remoteLibraries = new Map<LibraryKey, LibraryUri>();
    this.service = new ThrottledReducingAggregateService();
    this.setupImportLibraryListener();
    this.server = new WebWorkerStreamingServer(
      this.workerScope,
      this.service
    );
  }

  private setupImportLibraryListener(): void {

    this.workerScope.onmessage = (ev: MessageEvent) => {
      const sendResponse = (result) => {
        this.workerScope.postMessage({
          method: ev.data.method,
          result: result
        });
      };
      switch (ev.data.method) {
        case 'import':
          const key: LibraryKey = ev.data.params;
          if (this.remoteLibraries.has(key)) {
            this.requireJs([this.remoteLibraries.get(key)], (plugin) => {
              // TODO a factory with more logic probably belongs in piper-js
              const lib: any | EmscriptenModule = plugin.createLibrary();
              const isEmscriptenModule = typeof lib.cwrap === 'function';
              const service = new PiperStreamingService(
                isEmscriptenModule ? new PiperVampService(lib) : lib // TODO
              );
              this.service.addService(key, service);
              this.service.list({}).then(sendResponse);
            });
          } else {
            console.error('Non registered library key.'); // TODO handle error
          }
          break;
        case 'addRemoteLibraries': // TODO rename
          const available: AvailableLibraries = ev.data.params;
          Object.keys(available).forEach(libraryKey => {
            this.remoteLibraries.set(libraryKey, available[libraryKey]);
          });
      }
    };
  }
}