Chris@0: Chris@0: * while ($eventLoop->isRunning()) { Chris@0: * GuzzleHttp\Promise\queue()->run(); Chris@0: * } Chris@0: * Chris@0: * Chris@0: * @param TaskQueueInterface $assign Optionally specify a new queue instance. Chris@0: * Chris@0: * @return TaskQueueInterface Chris@0: */ Chris@0: function queue(TaskQueueInterface $assign = null) Chris@0: { Chris@0: static $queue; Chris@0: Chris@0: if ($assign) { Chris@0: $queue = $assign; Chris@0: } elseif (!$queue) { Chris@0: $queue = new TaskQueue(); Chris@0: } Chris@0: Chris@0: return $queue; Chris@0: } Chris@0: Chris@0: /** Chris@0: * Adds a function to run in the task queue when it is next `run()` and returns Chris@0: * a promise that is fulfilled or rejected with the result. Chris@0: * Chris@0: * @param callable $task Task function to run. Chris@0: * Chris@0: * @return PromiseInterface Chris@0: */ Chris@0: function task(callable $task) Chris@0: { Chris@0: $queue = queue(); Chris@0: $promise = new Promise([$queue, 'run']); Chris@0: $queue->add(function () use ($task, $promise) { Chris@0: try { Chris@0: $promise->resolve($task()); Chris@0: } catch (\Throwable $e) { Chris@0: $promise->reject($e); Chris@0: } catch (\Exception $e) { Chris@0: $promise->reject($e); Chris@0: } Chris@0: }); Chris@0: Chris@0: return $promise; Chris@0: } Chris@0: Chris@0: /** Chris@0: * Creates a promise for a value if the value is not a promise. Chris@0: * Chris@0: * @param mixed $value Promise or value. Chris@0: * Chris@0: * @return PromiseInterface Chris@0: */ Chris@0: function promise_for($value) Chris@0: { Chris@0: if ($value instanceof PromiseInterface) { Chris@0: return $value; Chris@0: } Chris@0: Chris@0: // Return a Guzzle promise that shadows the given promise. Chris@0: if (method_exists($value, 'then')) { Chris@0: $wfn = method_exists($value, 'wait') ? [$value, 'wait'] : null; Chris@0: $cfn = method_exists($value, 'cancel') ? [$value, 'cancel'] : null; Chris@0: $promise = new Promise($wfn, $cfn); Chris@0: $value->then([$promise, 'resolve'], [$promise, 'reject']); Chris@0: return $promise; Chris@0: } Chris@0: Chris@0: return new FulfilledPromise($value); Chris@0: } Chris@0: Chris@0: /** Chris@0: * Creates a rejected promise for a reason if the reason is not a promise. If Chris@0: * the provided reason is a promise, then it is returned as-is. Chris@0: * Chris@0: * @param mixed $reason Promise or reason. Chris@0: * Chris@0: * @return PromiseInterface Chris@0: */ Chris@0: function rejection_for($reason) Chris@0: { Chris@0: if ($reason instanceof PromiseInterface) { Chris@0: return $reason; Chris@0: } Chris@0: Chris@0: return new RejectedPromise($reason); Chris@0: } Chris@0: Chris@0: /** Chris@0: * Create an exception for a rejected promise value. Chris@0: * Chris@0: * @param mixed $reason Chris@0: * Chris@0: * @return \Exception|\Throwable Chris@0: */ Chris@0: function exception_for($reason) Chris@0: { Chris@0: return $reason instanceof \Exception || $reason instanceof \Throwable Chris@0: ? $reason Chris@0: : new RejectionException($reason); Chris@0: } Chris@0: Chris@0: /** Chris@0: * Returns an iterator for the given value. Chris@0: * Chris@0: * @param mixed $value Chris@0: * Chris@0: * @return \Iterator Chris@0: */ Chris@0: function iter_for($value) Chris@0: { Chris@0: if ($value instanceof \Iterator) { Chris@0: return $value; Chris@0: } elseif (is_array($value)) { Chris@0: return new \ArrayIterator($value); Chris@0: } else { Chris@0: return new \ArrayIterator([$value]); Chris@0: } Chris@0: } Chris@0: Chris@0: /** Chris@0: * Synchronously waits on a promise to resolve and returns an inspection state Chris@0: * array. Chris@0: * Chris@0: * Returns a state associative array containing a "state" key mapping to a Chris@0: * valid promise state. If the state of the promise is "fulfilled", the array Chris@0: * will contain a "value" key mapping to the fulfilled value of the promise. If Chris@0: * the promise is rejected, the array will contain a "reason" key mapping to Chris@0: * the rejection reason of the promise. Chris@0: * Chris@0: * @param PromiseInterface $promise Promise or value. Chris@0: * Chris@0: * @return array Chris@0: */ Chris@0: function inspect(PromiseInterface $promise) Chris@0: { Chris@0: try { Chris@0: return [ Chris@0: 'state' => PromiseInterface::FULFILLED, Chris@0: 'value' => $promise->wait() Chris@0: ]; Chris@0: } catch (RejectionException $e) { Chris@0: return ['state' => PromiseInterface::REJECTED, 'reason' => $e->getReason()]; Chris@0: } catch (\Throwable $e) { Chris@0: return ['state' => PromiseInterface::REJECTED, 'reason' => $e]; Chris@0: } catch (\Exception $e) { Chris@0: return ['state' => PromiseInterface::REJECTED, 'reason' => $e]; Chris@0: } Chris@0: } Chris@0: Chris@0: /** Chris@0: * Waits on all of the provided promises, but does not unwrap rejected promises Chris@0: * as thrown exception. Chris@0: * Chris@0: * Returns an array of inspection state arrays. Chris@0: * Chris@0: * @param PromiseInterface[] $promises Traversable of promises to wait upon. Chris@0: * Chris@0: * @return array Chris@0: * @see GuzzleHttp\Promise\inspect for the inspection state array format. Chris@0: */ Chris@0: function inspect_all($promises) Chris@0: { Chris@0: $results = []; Chris@0: foreach ($promises as $key => $promise) { Chris@0: $results[$key] = inspect($promise); Chris@0: } Chris@0: Chris@0: return $results; Chris@0: } Chris@0: Chris@0: /** Chris@0: * Waits on all of the provided promises and returns the fulfilled values. Chris@0: * Chris@0: * Returns an array that contains the value of each promise (in the same order Chris@0: * the promises were provided). An exception is thrown if any of the promises Chris@0: * are rejected. Chris@0: * Chris@0: * @param mixed $promises Iterable of PromiseInterface objects to wait on. Chris@0: * Chris@0: * @return array Chris@0: * @throws \Exception on error Chris@0: * @throws \Throwable on error in PHP >=7 Chris@0: */ Chris@0: function unwrap($promises) Chris@0: { Chris@0: $results = []; Chris@0: foreach ($promises as $key => $promise) { Chris@0: $results[$key] = $promise->wait(); Chris@0: } Chris@0: Chris@0: return $results; Chris@0: } Chris@0: Chris@0: /** Chris@0: * Given an array of promises, return a promise that is fulfilled when all the Chris@0: * items in the array are fulfilled. Chris@0: * Chris@0: * The promise's fulfillment value is an array with fulfillment values at Chris@0: * respective positions to the original array. If any promise in the array Chris@0: * rejects, the returned promise is rejected with the rejection reason. Chris@0: * Chris@0: * @param mixed $promises Promises or values. Chris@0: * Chris@0: * @return PromiseInterface Chris@0: */ Chris@0: function all($promises) Chris@0: { Chris@0: $results = []; Chris@0: return each( Chris@0: $promises, Chris@0: function ($value, $idx) use (&$results) { Chris@0: $results[$idx] = $value; Chris@0: }, Chris@0: function ($reason, $idx, Promise $aggregate) { Chris@0: $aggregate->reject($reason); Chris@0: } Chris@0: )->then(function () use (&$results) { Chris@0: ksort($results); Chris@0: return $results; Chris@0: }); Chris@0: } Chris@0: Chris@0: /** Chris@0: * Initiate a competitive race between multiple promises or values (values will Chris@0: * become immediately fulfilled promises). Chris@0: * Chris@0: * When count amount of promises have been fulfilled, the returned promise is Chris@0: * fulfilled with an array that contains the fulfillment values of the winners Chris@0: * in order of resolution. Chris@0: * Chris@0: * This prommise is rejected with a {@see GuzzleHttp\Promise\AggregateException} Chris@0: * if the number of fulfilled promises is less than the desired $count. Chris@0: * Chris@0: * @param int $count Total number of promises. Chris@0: * @param mixed $promises Promises or values. Chris@0: * Chris@0: * @return PromiseInterface Chris@0: */ Chris@0: function some($count, $promises) Chris@0: { Chris@0: $results = []; Chris@0: $rejections = []; Chris@0: Chris@0: return each( Chris@0: $promises, Chris@0: function ($value, $idx, PromiseInterface $p) use (&$results, $count) { Chris@0: if ($p->getState() !== PromiseInterface::PENDING) { Chris@0: return; Chris@0: } Chris@0: $results[$idx] = $value; Chris@0: if (count($results) >= $count) { Chris@0: $p->resolve(null); Chris@0: } Chris@0: }, Chris@0: function ($reason) use (&$rejections) { Chris@0: $rejections[] = $reason; Chris@0: } Chris@0: )->then( Chris@0: function () use (&$results, &$rejections, $count) { Chris@0: if (count($results) !== $count) { Chris@0: throw new AggregateException( Chris@0: 'Not enough promises to fulfill count', Chris@0: $rejections Chris@0: ); Chris@0: } Chris@0: ksort($results); Chris@0: return array_values($results); Chris@0: } Chris@0: ); Chris@0: } Chris@0: Chris@0: /** Chris@0: * Like some(), with 1 as count. However, if the promise fulfills, the Chris@0: * fulfillment value is not an array of 1 but the value directly. Chris@0: * Chris@0: * @param mixed $promises Promises or values. Chris@0: * Chris@0: * @return PromiseInterface Chris@0: */ Chris@0: function any($promises) Chris@0: { Chris@0: return some(1, $promises)->then(function ($values) { return $values[0]; }); Chris@0: } Chris@0: Chris@0: /** Chris@0: * Returns a promise that is fulfilled when all of the provided promises have Chris@0: * been fulfilled or rejected. Chris@0: * Chris@0: * The returned promise is fulfilled with an array of inspection state arrays. Chris@0: * Chris@0: * @param mixed $promises Promises or values. Chris@0: * Chris@0: * @return PromiseInterface Chris@0: * @see GuzzleHttp\Promise\inspect for the inspection state array format. Chris@0: */ Chris@0: function settle($promises) Chris@0: { Chris@0: $results = []; Chris@0: Chris@0: return each( Chris@0: $promises, Chris@0: function ($value, $idx) use (&$results) { Chris@0: $results[$idx] = ['state' => PromiseInterface::FULFILLED, 'value' => $value]; Chris@0: }, Chris@0: function ($reason, $idx) use (&$results) { Chris@0: $results[$idx] = ['state' => PromiseInterface::REJECTED, 'reason' => $reason]; Chris@0: } Chris@0: )->then(function () use (&$results) { Chris@0: ksort($results); Chris@0: return $results; Chris@0: }); Chris@0: } Chris@0: Chris@0: /** Chris@0: * Given an iterator that yields promises or values, returns a promise that is Chris@0: * fulfilled with a null value when the iterator has been consumed or the Chris@0: * aggregate promise has been fulfilled or rejected. Chris@0: * Chris@0: * $onFulfilled is a function that accepts the fulfilled value, iterator Chris@0: * index, and the aggregate promise. The callback can invoke any necessary side Chris@0: * effects and choose to resolve or reject the aggregate promise if needed. Chris@0: * Chris@0: * $onRejected is a function that accepts the rejection reason, iterator Chris@0: * index, and the aggregate promise. The callback can invoke any necessary side Chris@0: * effects and choose to resolve or reject the aggregate promise if needed. Chris@0: * Chris@0: * @param mixed $iterable Iterator or array to iterate over. Chris@0: * @param callable $onFulfilled Chris@0: * @param callable $onRejected Chris@0: * Chris@0: * @return PromiseInterface Chris@0: */ Chris@0: function each( Chris@0: $iterable, Chris@0: callable $onFulfilled = null, Chris@0: callable $onRejected = null Chris@0: ) { Chris@0: return (new EachPromise($iterable, [ Chris@0: 'fulfilled' => $onFulfilled, Chris@0: 'rejected' => $onRejected Chris@0: ]))->promise(); Chris@0: } Chris@0: Chris@0: /** Chris@0: * Like each, but only allows a certain number of outstanding promises at any Chris@0: * given time. Chris@0: * Chris@0: * $concurrency may be an integer or a function that accepts the number of Chris@0: * pending promises and returns a numeric concurrency limit value to allow for Chris@0: * dynamic a concurrency size. Chris@0: * Chris@0: * @param mixed $iterable Chris@0: * @param int|callable $concurrency Chris@0: * @param callable $onFulfilled Chris@0: * @param callable $onRejected Chris@0: * Chris@0: * @return PromiseInterface Chris@0: */ Chris@0: function each_limit( Chris@0: $iterable, Chris@0: $concurrency, Chris@0: callable $onFulfilled = null, Chris@0: callable $onRejected = null Chris@0: ) { Chris@0: return (new EachPromise($iterable, [ Chris@0: 'fulfilled' => $onFulfilled, Chris@0: 'rejected' => $onRejected, Chris@0: 'concurrency' => $concurrency Chris@0: ]))->promise(); Chris@0: } Chris@0: Chris@0: /** Chris@0: * Like each_limit, but ensures that no promise in the given $iterable argument Chris@0: * is rejected. If any promise is rejected, then the aggregate promise is Chris@0: * rejected with the encountered rejection. Chris@0: * Chris@0: * @param mixed $iterable Chris@0: * @param int|callable $concurrency Chris@0: * @param callable $onFulfilled Chris@0: * Chris@0: * @return PromiseInterface Chris@0: */ Chris@0: function each_limit_all( Chris@0: $iterable, Chris@0: $concurrency, Chris@0: callable $onFulfilled = null Chris@0: ) { Chris@0: return each_limit( Chris@0: $iterable, Chris@0: $concurrency, Chris@0: $onFulfilled, Chris@0: function ($reason, $idx, PromiseInterface $aggregate) { Chris@0: $aggregate->reject($reason); Chris@0: } Chris@0: ); Chris@0: } Chris@0: Chris@0: /** Chris@0: * Returns true if a promise is fulfilled. Chris@0: * Chris@0: * @param PromiseInterface $promise Chris@0: * Chris@0: * @return bool Chris@0: */ Chris@0: function is_fulfilled(PromiseInterface $promise) Chris@0: { Chris@0: return $promise->getState() === PromiseInterface::FULFILLED; Chris@0: } Chris@0: Chris@0: /** Chris@0: * Returns true if a promise is rejected. Chris@0: * Chris@0: * @param PromiseInterface $promise Chris@0: * Chris@0: * @return bool Chris@0: */ Chris@0: function is_rejected(PromiseInterface $promise) Chris@0: { Chris@0: return $promise->getState() === PromiseInterface::REJECTED; Chris@0: } Chris@0: Chris@0: /** Chris@0: * Returns true if a promise is fulfilled or rejected. Chris@0: * Chris@0: * @param PromiseInterface $promise Chris@0: * Chris@0: * @return bool Chris@0: */ Chris@0: function is_settled(PromiseInterface $promise) Chris@0: { Chris@0: return $promise->getState() !== PromiseInterface::PENDING; Chris@0: } Chris@0: Chris@0: /** Chris@0: * @see Coroutine Chris@0: * Chris@0: * @param callable $generatorFn Chris@0: * Chris@0: * @return PromiseInterface Chris@0: */ Chris@0: function coroutine(callable $generatorFn) Chris@0: { Chris@0: return new Coroutine($generatorFn); Chris@0: }