annotate vendor/guzzlehttp/promises/src/EachPromise.php @ 19:fa3358dc1485 tip

Add ndrum files
author Chris Cannam
date Wed, 28 Aug 2019 13:14:47 +0100
parents 4c8ae668cc8c
children
rev   line source
Chris@0 1 <?php
Chris@0 2 namespace GuzzleHttp\Promise;
Chris@0 3
Chris@0 4 /**
Chris@0 5 * Represents a promise that iterates over many promises and invokes
Chris@0 6 * side-effect functions in the process.
Chris@0 7 */
Chris@0 8 class EachPromise implements PromisorInterface
Chris@0 9 {
Chris@0 10 private $pending = [];
Chris@0 11
Chris@0 12 /** @var \Iterator */
Chris@0 13 private $iterable;
Chris@0 14
Chris@0 15 /** @var callable|int */
Chris@0 16 private $concurrency;
Chris@0 17
Chris@0 18 /** @var callable */
Chris@0 19 private $onFulfilled;
Chris@0 20
Chris@0 21 /** @var callable */
Chris@0 22 private $onRejected;
Chris@0 23
Chris@0 24 /** @var Promise */
Chris@0 25 private $aggregate;
Chris@0 26
Chris@0 27 /** @var bool */
Chris@0 28 private $mutex;
Chris@0 29
Chris@0 30 /**
Chris@0 31 * Configuration hash can include the following key value pairs:
Chris@0 32 *
Chris@0 33 * - fulfilled: (callable) Invoked when a promise fulfills. The function
Chris@0 34 * is invoked with three arguments: the fulfillment value, the index
Chris@0 35 * position from the iterable list of the promise, and the aggregate
Chris@0 36 * promise that manages all of the promises. The aggregate promise may
Chris@0 37 * be resolved from within the callback to short-circuit the promise.
Chris@0 38 * - rejected: (callable) Invoked when a promise is rejected. The
Chris@0 39 * function is invoked with three arguments: the rejection reason, the
Chris@0 40 * index position from the iterable list of the promise, and the
Chris@0 41 * aggregate promise that manages all of the promises. The aggregate
Chris@0 42 * promise may be resolved from within the callback to short-circuit
Chris@0 43 * the promise.
Chris@0 44 * - concurrency: (integer) Pass this configuration option to limit the
Chris@0 45 * allowed number of outstanding concurrently executing promises,
Chris@0 46 * creating a capped pool of promises. There is no limit by default.
Chris@0 47 *
Chris@0 48 * @param mixed $iterable Promises or values to iterate.
Chris@0 49 * @param array $config Configuration options
Chris@0 50 */
Chris@0 51 public function __construct($iterable, array $config = [])
Chris@0 52 {
Chris@0 53 $this->iterable = iter_for($iterable);
Chris@0 54
Chris@0 55 if (isset($config['concurrency'])) {
Chris@0 56 $this->concurrency = $config['concurrency'];
Chris@0 57 }
Chris@0 58
Chris@0 59 if (isset($config['fulfilled'])) {
Chris@0 60 $this->onFulfilled = $config['fulfilled'];
Chris@0 61 }
Chris@0 62
Chris@0 63 if (isset($config['rejected'])) {
Chris@0 64 $this->onRejected = $config['rejected'];
Chris@0 65 }
Chris@0 66 }
Chris@0 67
Chris@0 68 public function promise()
Chris@0 69 {
Chris@0 70 if ($this->aggregate) {
Chris@0 71 return $this->aggregate;
Chris@0 72 }
Chris@0 73
Chris@0 74 try {
Chris@0 75 $this->createPromise();
Chris@0 76 $this->iterable->rewind();
Chris@0 77 $this->refillPending();
Chris@0 78 } catch (\Throwable $e) {
Chris@0 79 $this->aggregate->reject($e);
Chris@0 80 } catch (\Exception $e) {
Chris@0 81 $this->aggregate->reject($e);
Chris@0 82 }
Chris@0 83
Chris@0 84 return $this->aggregate;
Chris@0 85 }
Chris@0 86
Chris@0 87 private function createPromise()
Chris@0 88 {
Chris@0 89 $this->mutex = false;
Chris@0 90 $this->aggregate = new Promise(function () {
Chris@0 91 reset($this->pending);
Chris@0 92 if (empty($this->pending) && !$this->iterable->valid()) {
Chris@0 93 $this->aggregate->resolve(null);
Chris@0 94 return;
Chris@0 95 }
Chris@0 96
Chris@0 97 // Consume a potentially fluctuating list of promises while
Chris@0 98 // ensuring that indexes are maintained (precluding array_shift).
Chris@0 99 while ($promise = current($this->pending)) {
Chris@0 100 next($this->pending);
Chris@0 101 $promise->wait();
Chris@0 102 if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
Chris@0 103 return;
Chris@0 104 }
Chris@0 105 }
Chris@0 106 });
Chris@0 107
Chris@0 108 // Clear the references when the promise is resolved.
Chris@0 109 $clearFn = function () {
Chris@0 110 $this->iterable = $this->concurrency = $this->pending = null;
Chris@0 111 $this->onFulfilled = $this->onRejected = null;
Chris@0 112 };
Chris@0 113
Chris@0 114 $this->aggregate->then($clearFn, $clearFn);
Chris@0 115 }
Chris@0 116
Chris@0 117 private function refillPending()
Chris@0 118 {
Chris@0 119 if (!$this->concurrency) {
Chris@0 120 // Add all pending promises.
Chris@0 121 while ($this->addPending() && $this->advanceIterator());
Chris@0 122 return;
Chris@0 123 }
Chris@0 124
Chris@0 125 // Add only up to N pending promises.
Chris@0 126 $concurrency = is_callable($this->concurrency)
Chris@0 127 ? call_user_func($this->concurrency, count($this->pending))
Chris@0 128 : $this->concurrency;
Chris@0 129 $concurrency = max($concurrency - count($this->pending), 0);
Chris@0 130 // Concurrency may be set to 0 to disallow new promises.
Chris@0 131 if (!$concurrency) {
Chris@0 132 return;
Chris@0 133 }
Chris@0 134 // Add the first pending promise.
Chris@0 135 $this->addPending();
Chris@0 136 // Note this is special handling for concurrency=1 so that we do
Chris@0 137 // not advance the iterator after adding the first promise. This
Chris@0 138 // helps work around issues with generators that might not have the
Chris@0 139 // next value to yield until promise callbacks are called.
Chris@0 140 while (--$concurrency
Chris@0 141 && $this->advanceIterator()
Chris@0 142 && $this->addPending());
Chris@0 143 }
Chris@0 144
Chris@0 145 private function addPending()
Chris@0 146 {
Chris@0 147 if (!$this->iterable || !$this->iterable->valid()) {
Chris@0 148 return false;
Chris@0 149 }
Chris@0 150
Chris@0 151 $promise = promise_for($this->iterable->current());
Chris@0 152 $idx = $this->iterable->key();
Chris@0 153
Chris@0 154 $this->pending[$idx] = $promise->then(
Chris@0 155 function ($value) use ($idx) {
Chris@0 156 if ($this->onFulfilled) {
Chris@0 157 call_user_func(
Chris@0 158 $this->onFulfilled, $value, $idx, $this->aggregate
Chris@0 159 );
Chris@0 160 }
Chris@0 161 $this->step($idx);
Chris@0 162 },
Chris@0 163 function ($reason) use ($idx) {
Chris@0 164 if ($this->onRejected) {
Chris@0 165 call_user_func(
Chris@0 166 $this->onRejected, $reason, $idx, $this->aggregate
Chris@0 167 );
Chris@0 168 }
Chris@0 169 $this->step($idx);
Chris@0 170 }
Chris@0 171 );
Chris@0 172
Chris@0 173 return true;
Chris@0 174 }
Chris@0 175
Chris@0 176 private function advanceIterator()
Chris@0 177 {
Chris@0 178 // Place a lock on the iterator so that we ensure to not recurse,
Chris@0 179 // preventing fatal generator errors.
Chris@0 180 if ($this->mutex) {
Chris@0 181 return false;
Chris@0 182 }
Chris@0 183
Chris@0 184 $this->mutex = true;
Chris@0 185
Chris@0 186 try {
Chris@0 187 $this->iterable->next();
Chris@0 188 $this->mutex = false;
Chris@0 189 return true;
Chris@0 190 } catch (\Throwable $e) {
Chris@0 191 $this->aggregate->reject($e);
Chris@0 192 $this->mutex = false;
Chris@0 193 return false;
Chris@0 194 } catch (\Exception $e) {
Chris@0 195 $this->aggregate->reject($e);
Chris@0 196 $this->mutex = false;
Chris@0 197 return false;
Chris@0 198 }
Chris@0 199 }
Chris@0 200
Chris@0 201 private function step($idx)
Chris@0 202 {
Chris@0 203 // If the promise was already resolved, then ignore this step.
Chris@0 204 if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
Chris@0 205 return;
Chris@0 206 }
Chris@0 207
Chris@0 208 unset($this->pending[$idx]);
Chris@0 209
Chris@0 210 // Only refill pending promises if we are not locked, preventing the
Chris@0 211 // EachPromise to recursively invoke the provided iterator, which
Chris@0 212 // cause a fatal error: "Cannot resume an already running generator"
Chris@0 213 if ($this->advanceIterator() && !$this->checkIfFinished()) {
Chris@0 214 // Add more pending promises if possible.
Chris@0 215 $this->refillPending();
Chris@0 216 }
Chris@0 217 }
Chris@0 218
Chris@0 219 private function checkIfFinished()
Chris@0 220 {
Chris@0 221 if (!$this->pending && !$this->iterable->valid()) {
Chris@0 222 // Resolve the promise if there's nothing left to do.
Chris@0 223 $this->aggregate->resolve(null);
Chris@0 224 return true;
Chris@0 225 }
Chris@0 226
Chris@0 227 return false;
Chris@0 228 }
Chris@0 229 }