annotate vendor/guzzlehttp/guzzle/src/Pool.php @ 19:fa3358dc1485 tip

Add ndrum files
author Chris Cannam
date Wed, 28 Aug 2019 13:14:47 +0100
parents 4c8ae668cc8c
children
rev   line source
Chris@0 1 <?php
Chris@0 2 namespace GuzzleHttp;
Chris@0 3
Chris@0 4 use GuzzleHttp\Promise\PromisorInterface;
Chris@0 5 use Psr\Http\Message\RequestInterface;
Chris@0 6 use GuzzleHttp\Promise\EachPromise;
Chris@0 7
Chris@0 8 /**
Chris@0 9 * Sends and iterator of requests concurrently using a capped pool size.
Chris@0 10 *
Chris@0 11 * The pool will read from an iterator until it is cancelled or until the
Chris@0 12 * iterator is consumed. When a request is yielded, the request is sent after
Chris@0 13 * applying the "request_options" request options (if provided in the ctor).
Chris@0 14 *
Chris@0 15 * When a function is yielded by the iterator, the function is provided the
Chris@0 16 * "request_options" array that should be merged on top of any existing
Chris@0 17 * options, and the function MUST then return a wait-able promise.
Chris@0 18 */
Chris@0 19 class Pool implements PromisorInterface
Chris@0 20 {
Chris@0 21 /** @var EachPromise */
Chris@0 22 private $each;
Chris@0 23
Chris@0 24 /**
Chris@0 25 * @param ClientInterface $client Client used to send the requests.
Chris@0 26 * @param array|\Iterator $requests Requests or functions that return
Chris@0 27 * requests to send concurrently.
Chris@0 28 * @param array $config Associative array of options
Chris@0 29 * - concurrency: (int) Maximum number of requests to send concurrently
Chris@0 30 * - options: Array of request options to apply to each request.
Chris@0 31 * - fulfilled: (callable) Function to invoke when a request completes.
Chris@0 32 * - rejected: (callable) Function to invoke when a request is rejected.
Chris@0 33 */
Chris@0 34 public function __construct(
Chris@0 35 ClientInterface $client,
Chris@0 36 $requests,
Chris@0 37 array $config = []
Chris@0 38 ) {
Chris@0 39 // Backwards compatibility.
Chris@0 40 if (isset($config['pool_size'])) {
Chris@0 41 $config['concurrency'] = $config['pool_size'];
Chris@0 42 } elseif (!isset($config['concurrency'])) {
Chris@0 43 $config['concurrency'] = 25;
Chris@0 44 }
Chris@0 45
Chris@0 46 if (isset($config['options'])) {
Chris@0 47 $opts = $config['options'];
Chris@0 48 unset($config['options']);
Chris@0 49 } else {
Chris@0 50 $opts = [];
Chris@0 51 }
Chris@0 52
Chris@0 53 $iterable = \GuzzleHttp\Promise\iter_for($requests);
Chris@0 54 $requests = function () use ($iterable, $client, $opts) {
Chris@0 55 foreach ($iterable as $key => $rfn) {
Chris@0 56 if ($rfn instanceof RequestInterface) {
Chris@0 57 yield $key => $client->sendAsync($rfn, $opts);
Chris@0 58 } elseif (is_callable($rfn)) {
Chris@0 59 yield $key => $rfn($opts);
Chris@0 60 } else {
Chris@0 61 throw new \InvalidArgumentException('Each value yielded by '
Chris@0 62 . 'the iterator must be a Psr7\Http\Message\RequestInterface '
Chris@0 63 . 'or a callable that returns a promise that fulfills '
Chris@0 64 . 'with a Psr7\Message\Http\ResponseInterface object.');
Chris@0 65 }
Chris@0 66 }
Chris@0 67 };
Chris@0 68
Chris@0 69 $this->each = new EachPromise($requests(), $config);
Chris@0 70 }
Chris@0 71
Chris@0 72 public function promise()
Chris@0 73 {
Chris@0 74 return $this->each->promise();
Chris@0 75 }
Chris@0 76
Chris@0 77 /**
Chris@0 78 * Sends multiple requests concurrently and returns an array of responses
Chris@0 79 * and exceptions that uses the same ordering as the provided requests.
Chris@0 80 *
Chris@0 81 * IMPORTANT: This method keeps every request and response in memory, and
Chris@0 82 * as such, is NOT recommended when sending a large number or an
Chris@0 83 * indeterminate number of requests concurrently.
Chris@0 84 *
Chris@0 85 * @param ClientInterface $client Client used to send the requests
Chris@0 86 * @param array|\Iterator $requests Requests to send concurrently.
Chris@0 87 * @param array $options Passes through the options available in
Chris@0 88 * {@see GuzzleHttp\Pool::__construct}
Chris@0 89 *
Chris@0 90 * @return array Returns an array containing the response or an exception
Chris@0 91 * in the same order that the requests were sent.
Chris@0 92 * @throws \InvalidArgumentException if the event format is incorrect.
Chris@0 93 */
Chris@0 94 public static function batch(
Chris@0 95 ClientInterface $client,
Chris@0 96 $requests,
Chris@0 97 array $options = []
Chris@0 98 ) {
Chris@0 99 $res = [];
Chris@0 100 self::cmpCallback($options, 'fulfilled', $res);
Chris@0 101 self::cmpCallback($options, 'rejected', $res);
Chris@0 102 $pool = new static($client, $requests, $options);
Chris@0 103 $pool->promise()->wait();
Chris@0 104 ksort($res);
Chris@0 105
Chris@0 106 return $res;
Chris@0 107 }
Chris@0 108
Chris@0 109 private static function cmpCallback(array &$options, $name, array &$results)
Chris@0 110 {
Chris@0 111 if (!isset($options[$name])) {
Chris@0 112 $options[$name] = function ($v, $k) use (&$results) {
Chris@0 113 $results[$k] = $v;
Chris@0 114 };
Chris@0 115 } else {
Chris@0 116 $currentFn = $options[$name];
Chris@0 117 $options[$name] = function ($v, $k) use (&$results, $currentFn) {
Chris@0 118 $currentFn($v, $k);
Chris@0 119 $results[$k] = $v;
Chris@0 120 };
Chris@0 121 }
Chris@0 122 }
Chris@0 123 }