annotate vendor/guzzlehttp/promises/src/functions.php @ 19:fa3358dc1485 tip

Add ndrum files
author Chris Cannam
date Wed, 28 Aug 2019 13:14:47 +0100
parents 4c8ae668cc8c
children
rev   line source
Chris@0 1 <?php
Chris@0 2 namespace GuzzleHttp\Promise;
Chris@0 3
Chris@0 4 /**
Chris@0 5 * Get the global task queue used for promise resolution.
Chris@0 6 *
Chris@0 7 * This task queue MUST be run in an event loop in order for promises to be
Chris@0 8 * settled asynchronously. It will be automatically run when synchronously
Chris@0 9 * waiting on a promise.
Chris@0 10 *
Chris@0 11 * <code>
Chris@0 12 * while ($eventLoop->isRunning()) {
Chris@0 13 * GuzzleHttp\Promise\queue()->run();
Chris@0 14 * }
Chris@0 15 * </code>
Chris@0 16 *
Chris@0 17 * @param TaskQueueInterface $assign Optionally specify a new queue instance.
Chris@0 18 *
Chris@0 19 * @return TaskQueueInterface
Chris@0 20 */
Chris@0 21 function queue(TaskQueueInterface $assign = null)
Chris@0 22 {
Chris@0 23 static $queue;
Chris@0 24
Chris@0 25 if ($assign) {
Chris@0 26 $queue = $assign;
Chris@0 27 } elseif (!$queue) {
Chris@0 28 $queue = new TaskQueue();
Chris@0 29 }
Chris@0 30
Chris@0 31 return $queue;
Chris@0 32 }
Chris@0 33
Chris@0 34 /**
Chris@0 35 * Adds a function to run in the task queue when it is next `run()` and returns
Chris@0 36 * a promise that is fulfilled or rejected with the result.
Chris@0 37 *
Chris@0 38 * @param callable $task Task function to run.
Chris@0 39 *
Chris@0 40 * @return PromiseInterface
Chris@0 41 */
Chris@0 42 function task(callable $task)
Chris@0 43 {
Chris@0 44 $queue = queue();
Chris@0 45 $promise = new Promise([$queue, 'run']);
Chris@0 46 $queue->add(function () use ($task, $promise) {
Chris@0 47 try {
Chris@0 48 $promise->resolve($task());
Chris@0 49 } catch (\Throwable $e) {
Chris@0 50 $promise->reject($e);
Chris@0 51 } catch (\Exception $e) {
Chris@0 52 $promise->reject($e);
Chris@0 53 }
Chris@0 54 });
Chris@0 55
Chris@0 56 return $promise;
Chris@0 57 }
Chris@0 58
Chris@0 59 /**
Chris@0 60 * Creates a promise for a value if the value is not a promise.
Chris@0 61 *
Chris@0 62 * @param mixed $value Promise or value.
Chris@0 63 *
Chris@0 64 * @return PromiseInterface
Chris@0 65 */
Chris@0 66 function promise_for($value)
Chris@0 67 {
Chris@0 68 if ($value instanceof PromiseInterface) {
Chris@0 69 return $value;
Chris@0 70 }
Chris@0 71
Chris@0 72 // Return a Guzzle promise that shadows the given promise.
Chris@0 73 if (method_exists($value, 'then')) {
Chris@0 74 $wfn = method_exists($value, 'wait') ? [$value, 'wait'] : null;
Chris@0 75 $cfn = method_exists($value, 'cancel') ? [$value, 'cancel'] : null;
Chris@0 76 $promise = new Promise($wfn, $cfn);
Chris@0 77 $value->then([$promise, 'resolve'], [$promise, 'reject']);
Chris@0 78 return $promise;
Chris@0 79 }
Chris@0 80
Chris@0 81 return new FulfilledPromise($value);
Chris@0 82 }
Chris@0 83
Chris@0 84 /**
Chris@0 85 * Creates a rejected promise for a reason if the reason is not a promise. If
Chris@0 86 * the provided reason is a promise, then it is returned as-is.
Chris@0 87 *
Chris@0 88 * @param mixed $reason Promise or reason.
Chris@0 89 *
Chris@0 90 * @return PromiseInterface
Chris@0 91 */
Chris@0 92 function rejection_for($reason)
Chris@0 93 {
Chris@0 94 if ($reason instanceof PromiseInterface) {
Chris@0 95 return $reason;
Chris@0 96 }
Chris@0 97
Chris@0 98 return new RejectedPromise($reason);
Chris@0 99 }
Chris@0 100
Chris@0 101 /**
Chris@0 102 * Create an exception for a rejected promise value.
Chris@0 103 *
Chris@0 104 * @param mixed $reason
Chris@0 105 *
Chris@0 106 * @return \Exception|\Throwable
Chris@0 107 */
Chris@0 108 function exception_for($reason)
Chris@0 109 {
Chris@0 110 return $reason instanceof \Exception || $reason instanceof \Throwable
Chris@0 111 ? $reason
Chris@0 112 : new RejectionException($reason);
Chris@0 113 }
Chris@0 114
Chris@0 115 /**
Chris@0 116 * Returns an iterator for the given value.
Chris@0 117 *
Chris@0 118 * @param mixed $value
Chris@0 119 *
Chris@0 120 * @return \Iterator
Chris@0 121 */
Chris@0 122 function iter_for($value)
Chris@0 123 {
Chris@0 124 if ($value instanceof \Iterator) {
Chris@0 125 return $value;
Chris@0 126 } elseif (is_array($value)) {
Chris@0 127 return new \ArrayIterator($value);
Chris@0 128 } else {
Chris@0 129 return new \ArrayIterator([$value]);
Chris@0 130 }
Chris@0 131 }
Chris@0 132
Chris@0 133 /**
Chris@0 134 * Synchronously waits on a promise to resolve and returns an inspection state
Chris@0 135 * array.
Chris@0 136 *
Chris@0 137 * Returns a state associative array containing a "state" key mapping to a
Chris@0 138 * valid promise state. If the state of the promise is "fulfilled", the array
Chris@0 139 * will contain a "value" key mapping to the fulfilled value of the promise. If
Chris@0 140 * the promise is rejected, the array will contain a "reason" key mapping to
Chris@0 141 * the rejection reason of the promise.
Chris@0 142 *
Chris@0 143 * @param PromiseInterface $promise Promise or value.
Chris@0 144 *
Chris@0 145 * @return array
Chris@0 146 */
Chris@0 147 function inspect(PromiseInterface $promise)
Chris@0 148 {
Chris@0 149 try {
Chris@0 150 return [
Chris@0 151 'state' => PromiseInterface::FULFILLED,
Chris@0 152 'value' => $promise->wait()
Chris@0 153 ];
Chris@0 154 } catch (RejectionException $e) {
Chris@0 155 return ['state' => PromiseInterface::REJECTED, 'reason' => $e->getReason()];
Chris@0 156 } catch (\Throwable $e) {
Chris@0 157 return ['state' => PromiseInterface::REJECTED, 'reason' => $e];
Chris@0 158 } catch (\Exception $e) {
Chris@0 159 return ['state' => PromiseInterface::REJECTED, 'reason' => $e];
Chris@0 160 }
Chris@0 161 }
Chris@0 162
Chris@0 163 /**
Chris@0 164 * Waits on all of the provided promises, but does not unwrap rejected promises
Chris@0 165 * as thrown exception.
Chris@0 166 *
Chris@0 167 * Returns an array of inspection state arrays.
Chris@0 168 *
Chris@0 169 * @param PromiseInterface[] $promises Traversable of promises to wait upon.
Chris@0 170 *
Chris@0 171 * @return array
Chris@0 172 * @see GuzzleHttp\Promise\inspect for the inspection state array format.
Chris@0 173 */
Chris@0 174 function inspect_all($promises)
Chris@0 175 {
Chris@0 176 $results = [];
Chris@0 177 foreach ($promises as $key => $promise) {
Chris@0 178 $results[$key] = inspect($promise);
Chris@0 179 }
Chris@0 180
Chris@0 181 return $results;
Chris@0 182 }
Chris@0 183
Chris@0 184 /**
Chris@0 185 * Waits on all of the provided promises and returns the fulfilled values.
Chris@0 186 *
Chris@0 187 * Returns an array that contains the value of each promise (in the same order
Chris@0 188 * the promises were provided). An exception is thrown if any of the promises
Chris@0 189 * are rejected.
Chris@0 190 *
Chris@0 191 * @param mixed $promises Iterable of PromiseInterface objects to wait on.
Chris@0 192 *
Chris@0 193 * @return array
Chris@0 194 * @throws \Exception on error
Chris@0 195 * @throws \Throwable on error in PHP >=7
Chris@0 196 */
Chris@0 197 function unwrap($promises)
Chris@0 198 {
Chris@0 199 $results = [];
Chris@0 200 foreach ($promises as $key => $promise) {
Chris@0 201 $results[$key] = $promise->wait();
Chris@0 202 }
Chris@0 203
Chris@0 204 return $results;
Chris@0 205 }
Chris@0 206
Chris@0 207 /**
Chris@0 208 * Given an array of promises, return a promise that is fulfilled when all the
Chris@0 209 * items in the array are fulfilled.
Chris@0 210 *
Chris@0 211 * The promise's fulfillment value is an array with fulfillment values at
Chris@0 212 * respective positions to the original array. If any promise in the array
Chris@0 213 * rejects, the returned promise is rejected with the rejection reason.
Chris@0 214 *
Chris@0 215 * @param mixed $promises Promises or values.
Chris@0 216 *
Chris@0 217 * @return PromiseInterface
Chris@0 218 */
Chris@0 219 function all($promises)
Chris@0 220 {
Chris@0 221 $results = [];
Chris@0 222 return each(
Chris@0 223 $promises,
Chris@0 224 function ($value, $idx) use (&$results) {
Chris@0 225 $results[$idx] = $value;
Chris@0 226 },
Chris@0 227 function ($reason, $idx, Promise $aggregate) {
Chris@0 228 $aggregate->reject($reason);
Chris@0 229 }
Chris@0 230 )->then(function () use (&$results) {
Chris@0 231 ksort($results);
Chris@0 232 return $results;
Chris@0 233 });
Chris@0 234 }
Chris@0 235
Chris@0 236 /**
Chris@0 237 * Initiate a competitive race between multiple promises or values (values will
Chris@0 238 * become immediately fulfilled promises).
Chris@0 239 *
Chris@0 240 * When count amount of promises have been fulfilled, the returned promise is
Chris@0 241 * fulfilled with an array that contains the fulfillment values of the winners
Chris@0 242 * in order of resolution.
Chris@0 243 *
Chris@0 244 * This prommise is rejected with a {@see GuzzleHttp\Promise\AggregateException}
Chris@0 245 * if the number of fulfilled promises is less than the desired $count.
Chris@0 246 *
Chris@0 247 * @param int $count Total number of promises.
Chris@0 248 * @param mixed $promises Promises or values.
Chris@0 249 *
Chris@0 250 * @return PromiseInterface
Chris@0 251 */
Chris@0 252 function some($count, $promises)
Chris@0 253 {
Chris@0 254 $results = [];
Chris@0 255 $rejections = [];
Chris@0 256
Chris@0 257 return each(
Chris@0 258 $promises,
Chris@0 259 function ($value, $idx, PromiseInterface $p) use (&$results, $count) {
Chris@0 260 if ($p->getState() !== PromiseInterface::PENDING) {
Chris@0 261 return;
Chris@0 262 }
Chris@0 263 $results[$idx] = $value;
Chris@0 264 if (count($results) >= $count) {
Chris@0 265 $p->resolve(null);
Chris@0 266 }
Chris@0 267 },
Chris@0 268 function ($reason) use (&$rejections) {
Chris@0 269 $rejections[] = $reason;
Chris@0 270 }
Chris@0 271 )->then(
Chris@0 272 function () use (&$results, &$rejections, $count) {
Chris@0 273 if (count($results) !== $count) {
Chris@0 274 throw new AggregateException(
Chris@0 275 'Not enough promises to fulfill count',
Chris@0 276 $rejections
Chris@0 277 );
Chris@0 278 }
Chris@0 279 ksort($results);
Chris@0 280 return array_values($results);
Chris@0 281 }
Chris@0 282 );
Chris@0 283 }
Chris@0 284
Chris@0 285 /**
Chris@0 286 * Like some(), with 1 as count. However, if the promise fulfills, the
Chris@0 287 * fulfillment value is not an array of 1 but the value directly.
Chris@0 288 *
Chris@0 289 * @param mixed $promises Promises or values.
Chris@0 290 *
Chris@0 291 * @return PromiseInterface
Chris@0 292 */
Chris@0 293 function any($promises)
Chris@0 294 {
Chris@0 295 return some(1, $promises)->then(function ($values) { return $values[0]; });
Chris@0 296 }
Chris@0 297
Chris@0 298 /**
Chris@0 299 * Returns a promise that is fulfilled when all of the provided promises have
Chris@0 300 * been fulfilled or rejected.
Chris@0 301 *
Chris@0 302 * The returned promise is fulfilled with an array of inspection state arrays.
Chris@0 303 *
Chris@0 304 * @param mixed $promises Promises or values.
Chris@0 305 *
Chris@0 306 * @return PromiseInterface
Chris@0 307 * @see GuzzleHttp\Promise\inspect for the inspection state array format.
Chris@0 308 */
Chris@0 309 function settle($promises)
Chris@0 310 {
Chris@0 311 $results = [];
Chris@0 312
Chris@0 313 return each(
Chris@0 314 $promises,
Chris@0 315 function ($value, $idx) use (&$results) {
Chris@0 316 $results[$idx] = ['state' => PromiseInterface::FULFILLED, 'value' => $value];
Chris@0 317 },
Chris@0 318 function ($reason, $idx) use (&$results) {
Chris@0 319 $results[$idx] = ['state' => PromiseInterface::REJECTED, 'reason' => $reason];
Chris@0 320 }
Chris@0 321 )->then(function () use (&$results) {
Chris@0 322 ksort($results);
Chris@0 323 return $results;
Chris@0 324 });
Chris@0 325 }
Chris@0 326
Chris@0 327 /**
Chris@0 328 * Given an iterator that yields promises or values, returns a promise that is
Chris@0 329 * fulfilled with a null value when the iterator has been consumed or the
Chris@0 330 * aggregate promise has been fulfilled or rejected.
Chris@0 331 *
Chris@0 332 * $onFulfilled is a function that accepts the fulfilled value, iterator
Chris@0 333 * index, and the aggregate promise. The callback can invoke any necessary side
Chris@0 334 * effects and choose to resolve or reject the aggregate promise if needed.
Chris@0 335 *
Chris@0 336 * $onRejected is a function that accepts the rejection reason, iterator
Chris@0 337 * index, and the aggregate promise. The callback can invoke any necessary side
Chris@0 338 * effects and choose to resolve or reject the aggregate promise if needed.
Chris@0 339 *
Chris@0 340 * @param mixed $iterable Iterator or array to iterate over.
Chris@0 341 * @param callable $onFulfilled
Chris@0 342 * @param callable $onRejected
Chris@0 343 *
Chris@0 344 * @return PromiseInterface
Chris@0 345 */
Chris@0 346 function each(
Chris@0 347 $iterable,
Chris@0 348 callable $onFulfilled = null,
Chris@0 349 callable $onRejected = null
Chris@0 350 ) {
Chris@0 351 return (new EachPromise($iterable, [
Chris@0 352 'fulfilled' => $onFulfilled,
Chris@0 353 'rejected' => $onRejected
Chris@0 354 ]))->promise();
Chris@0 355 }
Chris@0 356
Chris@0 357 /**
Chris@0 358 * Like each, but only allows a certain number of outstanding promises at any
Chris@0 359 * given time.
Chris@0 360 *
Chris@0 361 * $concurrency may be an integer or a function that accepts the number of
Chris@0 362 * pending promises and returns a numeric concurrency limit value to allow for
Chris@0 363 * dynamic a concurrency size.
Chris@0 364 *
Chris@0 365 * @param mixed $iterable
Chris@0 366 * @param int|callable $concurrency
Chris@0 367 * @param callable $onFulfilled
Chris@0 368 * @param callable $onRejected
Chris@0 369 *
Chris@0 370 * @return PromiseInterface
Chris@0 371 */
Chris@0 372 function each_limit(
Chris@0 373 $iterable,
Chris@0 374 $concurrency,
Chris@0 375 callable $onFulfilled = null,
Chris@0 376 callable $onRejected = null
Chris@0 377 ) {
Chris@0 378 return (new EachPromise($iterable, [
Chris@0 379 'fulfilled' => $onFulfilled,
Chris@0 380 'rejected' => $onRejected,
Chris@0 381 'concurrency' => $concurrency
Chris@0 382 ]))->promise();
Chris@0 383 }
Chris@0 384
Chris@0 385 /**
Chris@0 386 * Like each_limit, but ensures that no promise in the given $iterable argument
Chris@0 387 * is rejected. If any promise is rejected, then the aggregate promise is
Chris@0 388 * rejected with the encountered rejection.
Chris@0 389 *
Chris@0 390 * @param mixed $iterable
Chris@0 391 * @param int|callable $concurrency
Chris@0 392 * @param callable $onFulfilled
Chris@0 393 *
Chris@0 394 * @return PromiseInterface
Chris@0 395 */
Chris@0 396 function each_limit_all(
Chris@0 397 $iterable,
Chris@0 398 $concurrency,
Chris@0 399 callable $onFulfilled = null
Chris@0 400 ) {
Chris@0 401 return each_limit(
Chris@0 402 $iterable,
Chris@0 403 $concurrency,
Chris@0 404 $onFulfilled,
Chris@0 405 function ($reason, $idx, PromiseInterface $aggregate) {
Chris@0 406 $aggregate->reject($reason);
Chris@0 407 }
Chris@0 408 );
Chris@0 409 }
Chris@0 410
Chris@0 411 /**
Chris@0 412 * Returns true if a promise is fulfilled.
Chris@0 413 *
Chris@0 414 * @param PromiseInterface $promise
Chris@0 415 *
Chris@0 416 * @return bool
Chris@0 417 */
Chris@0 418 function is_fulfilled(PromiseInterface $promise)
Chris@0 419 {
Chris@0 420 return $promise->getState() === PromiseInterface::FULFILLED;
Chris@0 421 }
Chris@0 422
Chris@0 423 /**
Chris@0 424 * Returns true if a promise is rejected.
Chris@0 425 *
Chris@0 426 * @param PromiseInterface $promise
Chris@0 427 *
Chris@0 428 * @return bool
Chris@0 429 */
Chris@0 430 function is_rejected(PromiseInterface $promise)
Chris@0 431 {
Chris@0 432 return $promise->getState() === PromiseInterface::REJECTED;
Chris@0 433 }
Chris@0 434
Chris@0 435 /**
Chris@0 436 * Returns true if a promise is fulfilled or rejected.
Chris@0 437 *
Chris@0 438 * @param PromiseInterface $promise
Chris@0 439 *
Chris@0 440 * @return bool
Chris@0 441 */
Chris@0 442 function is_settled(PromiseInterface $promise)
Chris@0 443 {
Chris@0 444 return $promise->getState() !== PromiseInterface::PENDING;
Chris@0 445 }
Chris@0 446
Chris@0 447 /**
Chris@0 448 * @see Coroutine
Chris@0 449 *
Chris@0 450 * @param callable $generatorFn
Chris@0 451 *
Chris@0 452 * @return PromiseInterface
Chris@0 453 */
Chris@0 454 function coroutine(callable $generatorFn)
Chris@0 455 {
Chris@0 456 return new Coroutine($generatorFn);
Chris@0 457 }