Mercurial > hg > isophonics-drupal-site
comparison vendor/guzzlehttp/promises/src/EachPromise.php @ 0:4c8ae668cc8c
Initial import (non-working)
author | Chris Cannam |
---|---|
date | Wed, 29 Nov 2017 16:09:58 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4c8ae668cc8c |
---|---|
1 <?php | |
2 namespace GuzzleHttp\Promise; | |
3 | |
/**
 * Represents a promise that iterates over many promises and invokes
 * side-effect functions in the process.
 *
 * The iterable is consumed lazily: entries are converted to promises with
 * promise_for() and tracked in an internal pending list until they settle.
 * The aggregate promise returned by promise() is resolved with null once
 * the iterable is exhausted and no tracked promise remains outstanding.
 */
class EachPromise implements PromisorInterface
{
    /**
     * Outstanding derived promises, keyed by an internal unique index.
     * Set to null once the aggregate promise has settled.
     *
     * @var array|null
     */
    private $pending = [];

    /**
     * Next internal index to use for $pending. Iterator keys are not used
     * for this purpose because they are not guaranteed to be unique (or
     * even to be valid array offsets).
     *
     * @var int
     */
    private $nextPendingIndex = 0;

    /** @var \Iterator */
    private $iterable;

    /** @var callable|int */
    private $concurrency;

    /** @var callable */
    private $onFulfilled;

    /** @var callable */
    private $onRejected;

    /** @var Promise */
    private $aggregate;

    /** @var bool True while the iterator is being advanced. */
    private $mutex;

    /**
     * Configuration hash can include the following key value pairs:
     *
     * - fulfilled: (callable) Invoked when a promise fulfills. The function
     *   is invoked with three arguments: the fulfillment value, the index
     *   position from the iterable list of the promise, and the aggregate
     *   promise that manages all of the promises. The aggregate promise may
     *   be resolved from within the callback to short-circuit the promise.
     * - rejected: (callable) Invoked when a promise is rejected. The
     *   function is invoked with three arguments: the rejection reason, the
     *   index position from the iterable list of the promise, and the
     *   aggregate promise that manages all of the promises. The aggregate
     *   promise may be resolved from within the callback to short-circuit
     *   the promise.
     * - concurrency: (integer|callable) Pass this configuration option to
     *   limit the allowed number of outstanding concurrently executing
     *   promises, creating a capped pool of promises. A callable receives
     *   the current number of pending promises and returns the limit.
     *   There is no limit by default.
     *
     * @param mixed $iterable Promises or values to iterate.
     * @param array $config   Configuration options
     */
    public function __construct($iterable, array $config = [])
    {
        $this->iterable = iter_for($iterable);

        if (isset($config['concurrency'])) {
            $this->concurrency = $config['concurrency'];
        }

        if (isset($config['fulfilled'])) {
            $this->onFulfilled = $config['fulfilled'];
        }

        if (isset($config['rejected'])) {
            $this->onRejected = $config['rejected'];
        }
    }

    /**
     * Returns the aggregate promise, creating it and starting iteration on
     * the first call. Subsequent calls return the same promise.
     *
     * Anything thrown while rewinding the iterable or while adding the
     * first pending promises rejects the aggregate instead of propagating.
     *
     * @return Promise
     */
    public function promise()
    {
        if ($this->aggregate) {
            return $this->aggregate;
        }

        try {
            $this->createPromise();
            $this->iterable->rewind();
            $this->refillPending();
        } catch (\Throwable $e) {
            $this->aggregate->reject($e);
        } catch (\Exception $e) {
            // Kept for runtimes that do not have \Throwable.
            $this->aggregate->reject($e);
        }

        return $this->aggregate;
    }

    /**
     * Creates the aggregate promise, installs its wait function and
     * registers the cleanup handler that drops references once it settles.
     */
    private function createPromise()
    {
        $this->mutex = false;
        $this->aggregate = new Promise(function () {
            reset($this->pending);
            if (empty($this->pending) && !$this->iterable->valid()) {
                $this->aggregate->resolve(null);
                return;
            }

            // Consume a potentially fluctuating list of promises while
            // ensuring that indexes are maintained (precluding array_shift).
            while ($promise = current($this->pending)) {
                next($this->pending);
                $promise->wait();
                if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
                    return;
                }
            }
        });

        // Clear the references when the promise is resolved.
        $clearFn = function () {
            $this->iterable = $this->concurrency = $this->pending = null;
            $this->onFulfilled = $this->onRejected = null;
            $this->nextPendingIndex = 0;
        };

        $this->aggregate->then($clearFn, $clearFn);
    }

    /**
     * Pulls entries from the iterable into the pending list, honouring the
     * configured concurrency limit (if any).
     */
    private function refillPending()
    {
        if (!$this->concurrency) {
            // Add all pending promises.
            while ($this->addPending() && $this->advanceIterator()) {
                // Both calls do the work; nothing to do in the body.
            }
            return;
        }

        // Add only up to N pending promises.
        $concurrency = is_callable($this->concurrency)
            ? call_user_func($this->concurrency, count($this->pending))
            : $this->concurrency;
        $concurrency = max($concurrency - count($this->pending), 0);
        // Concurrency may be set to 0 to disallow new promises.
        if (!$concurrency) {
            return;
        }
        // Add the first pending promise.
        $this->addPending();
        // Note this is special handling for concurrency=1 so that we do
        // not advance the iterator after adding the first promise. This
        // helps work around issues with generators that might not have the
        // next value to yield until promise callbacks are called.
        while (--$concurrency
            && $this->advanceIterator()
            && $this->addPending()
        ) {
            // Both calls do the work; nothing to do in the body.
        }
    }

    /**
     * Wraps the iterable's current entry in a promise and tracks it.
     *
     * @return bool False when there is no current entry to add, true
     *              otherwise.
     */
    private function addPending()
    {
        if (!$this->iterable || !$this->iterable->valid()) {
            return false;
        }

        $promise = promise_for($this->iterable->current());
        $key = $this->iterable->key();

        // Iterable keys may not be unique (e.g. generators using
        // "yield from"), so track pending promises under a private counter.
        // Using the key directly would let one entry overwrite another and
        // allow the aggregate to resolve while work is still outstanding.
        $idx = $this->nextPendingIndex++;

        $this->pending[$idx] = $promise->then(
            function ($value) use ($idx, $key) {
                if ($this->onFulfilled) {
                    // User callbacks still receive the iterable's own key.
                    call_user_func(
                        $this->onFulfilled, $value, $key, $this->aggregate
                    );
                }
                $this->step($idx);
            },
            function ($reason) use ($idx, $key) {
                if ($this->onRejected) {
                    call_user_func(
                        $this->onRejected, $reason, $key, $this->aggregate
                    );
                }
                $this->step($idx);
            }
        );

        return true;
    }

    /**
     * Moves the iterable to its next entry unless an advance is already in
     * progress. Anything thrown by the iterable rejects the aggregate.
     *
     * @return bool True if the iterator was advanced, false if it was
     *              locked or threw.
     */
    private function advanceIterator()
    {
        // Place a lock on the iterator so that we ensure to not recurse,
        // preventing fatal generator errors.
        if ($this->mutex) {
            return false;
        }

        $this->mutex = true;

        try {
            $this->iterable->next();
            $this->mutex = false;
            return true;
        } catch (\Throwable $e) {
            $this->aggregate->reject($e);
            $this->mutex = false;
            return false;
        } catch (\Exception $e) {
            // Kept for runtimes that do not have \Throwable.
            $this->aggregate->reject($e);
            $this->mutex = false;
            return false;
        }
    }

    /**
     * Called after a tracked promise settles: removes it from the pending
     * list and, if possible, schedules more work.
     *
     * @param int $idx Internal pending index assigned in addPending().
     */
    private function step($idx)
    {
        // If the promise was already resolved, then ignore this step.
        if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
            return;
        }

        unset($this->pending[$idx]);

        // Only refill pending promises if we are not locked, preventing the
        // EachPromise to recursively invoke the provided iterator, which
        // cause a fatal error: "Cannot resume an already running generator"
        if ($this->advanceIterator() && !$this->checkIfFinished()) {
            // Add more pending promises if possible.
            $this->refillPending();
        }
    }

    /**
     * Resolves the aggregate with null when nothing is pending and the
     * iterable has no further entries.
     *
     * @return bool True if the aggregate was resolved by this call.
     */
    private function checkIfFinished()
    {
        if (!$this->pending && !$this->iterable->valid()) {
            // Resolve the promise if there's nothing left to do.
            $this->aggregate->resolve(null);
            return true;
        }

        return false;
    }
}