Chris@0: iterable = iter_for($iterable); Chris@0: Chris@0: if (isset($config['concurrency'])) { Chris@0: $this->concurrency = $config['concurrency']; Chris@0: } Chris@0: Chris@0: if (isset($config['fulfilled'])) { Chris@0: $this->onFulfilled = $config['fulfilled']; Chris@0: } Chris@0: Chris@0: if (isset($config['rejected'])) { Chris@0: $this->onRejected = $config['rejected']; Chris@0: } Chris@0: } Chris@0: Chris@0: public function promise() Chris@0: { Chris@0: if ($this->aggregate) { Chris@0: return $this->aggregate; Chris@0: } Chris@0: Chris@0: try { Chris@0: $this->createPromise(); Chris@0: $this->iterable->rewind(); Chris@0: $this->refillPending(); Chris@0: } catch (\Throwable $e) { Chris@0: $this->aggregate->reject($e); Chris@0: } catch (\Exception $e) { Chris@0: $this->aggregate->reject($e); Chris@0: } Chris@0: Chris@0: return $this->aggregate; Chris@0: } Chris@0: Chris@0: private function createPromise() Chris@0: { Chris@0: $this->mutex = false; Chris@0: $this->aggregate = new Promise(function () { Chris@0: reset($this->pending); Chris@0: if (empty($this->pending) && !$this->iterable->valid()) { Chris@0: $this->aggregate->resolve(null); Chris@0: return; Chris@0: } Chris@0: Chris@0: // Consume a potentially fluctuating list of promises while Chris@0: // ensuring that indexes are maintained (precluding array_shift). Chris@0: while ($promise = current($this->pending)) { Chris@0: next($this->pending); Chris@0: $promise->wait(); Chris@0: if ($this->aggregate->getState() !== PromiseInterface::PENDING) { Chris@0: return; Chris@0: } Chris@0: } Chris@0: }); Chris@0: Chris@0: // Clear the references when the promise is resolved. Chris@0: $clearFn = function () { Chris@0: $this->iterable = $this->concurrency = $this->pending = null; Chris@0: $this->onFulfilled = $this->onRejected = null; Chris@0: }; Chris@0: Chris@0: $this->aggregate->then($clearFn, $clearFn); Chris@0: } Chris@0: Chris@0: private function refillPending() Chris@0: { Chris@0: if (!$this->concurrency) { Chris@0: // Add all pending promises. Chris@0: while ($this->addPending() && $this->advanceIterator()); Chris@0: return; Chris@0: } Chris@0: Chris@0: // Add only up to N pending promises. Chris@0: $concurrency = is_callable($this->concurrency) Chris@0: ? call_user_func($this->concurrency, count($this->pending)) Chris@0: : $this->concurrency; Chris@0: $concurrency = max($concurrency - count($this->pending), 0); Chris@0: // Concurrency may be set to 0 to disallow new promises. Chris@0: if (!$concurrency) { Chris@0: return; Chris@0: } Chris@0: // Add the first pending promise. Chris@0: $this->addPending(); Chris@0: // Note this is special handling for concurrency=1 so that we do Chris@0: // not advance the iterator after adding the first promise. This Chris@0: // helps work around issues with generators that might not have the Chris@0: // next value to yield until promise callbacks are called. Chris@0: while (--$concurrency Chris@0: && $this->advanceIterator() Chris@0: && $this->addPending()); Chris@0: } Chris@0: Chris@0: private function addPending() Chris@0: { Chris@0: if (!$this->iterable || !$this->iterable->valid()) { Chris@0: return false; Chris@0: } Chris@0: Chris@0: $promise = promise_for($this->iterable->current()); Chris@0: $idx = $this->iterable->key(); Chris@0: Chris@0: $this->pending[$idx] = $promise->then( Chris@0: function ($value) use ($idx) { Chris@0: if ($this->onFulfilled) { Chris@0: call_user_func( Chris@0: $this->onFulfilled, $value, $idx, $this->aggregate Chris@0: ); Chris@0: } Chris@0: $this->step($idx); Chris@0: }, Chris@0: function ($reason) use ($idx) { Chris@0: if ($this->onRejected) { Chris@0: call_user_func( Chris@0: $this->onRejected, $reason, $idx, $this->aggregate Chris@0: ); Chris@0: } Chris@0: $this->step($idx); Chris@0: } Chris@0: ); Chris@0: Chris@0: return true; Chris@0: } Chris@0: Chris@0: private function advanceIterator() Chris@0: { Chris@0: // Place a lock on the iterator so that we ensure to not recurse, Chris@0: // preventing fatal generator errors. Chris@0: if ($this->mutex) { Chris@0: return false; Chris@0: } Chris@0: Chris@0: $this->mutex = true; Chris@0: Chris@0: try { Chris@0: $this->iterable->next(); Chris@0: $this->mutex = false; Chris@0: return true; Chris@0: } catch (\Throwable $e) { Chris@0: $this->aggregate->reject($e); Chris@0: $this->mutex = false; Chris@0: return false; Chris@0: } catch (\Exception $e) { Chris@0: $this->aggregate->reject($e); Chris@0: $this->mutex = false; Chris@0: return false; Chris@0: } Chris@0: } Chris@0: Chris@0: private function step($idx) Chris@0: { Chris@0: // If the promise was already resolved, then ignore this step. Chris@0: if ($this->aggregate->getState() !== PromiseInterface::PENDING) { Chris@0: return; Chris@0: } Chris@0: Chris@0: unset($this->pending[$idx]); Chris@0: Chris@0: // Only refill pending promises if we are not locked, preventing the Chris@0: // EachPromise to recursively invoke the provided iterator, which Chris@0: // cause a fatal error: "Cannot resume an already running generator" Chris@0: if ($this->advanceIterator() && !$this->checkIfFinished()) { Chris@0: // Add more pending promises if possible. Chris@0: $this->refillPending(); Chris@0: } Chris@0: } Chris@0: Chris@0: private function checkIfFinished() Chris@0: { Chris@0: if (!$this->pending && !$this->iterable->valid()) { Chris@0: // Resolve the promise if there's nothing left to do. Chris@0: $this->aggregate->resolve(null); Chris@0: return true; Chris@0: } Chris@0: Chris@0: return false; Chris@0: } Chris@0: }