factory = isset($options['handle_factory'])
            ? $options['handle_factory'] : new CurlFactory(50);
        $this->selectTimeout = isset($options['select_timeout'])
            ? $options['select_timeout'] : 1;
    }

    /**
     * Lazily creates the cURL multi handle the first time `_mh` is read.
     *
     * PHP calls this method only when an inaccessible or unset property is
     * read. Assigning `$this->_mh` here stores the handle on the object.
     *
     * @param string $name Name of the property being read.
     *
     * @return resource|object The newly created cURL multi handle.
     *
     * @throws \BadMethodCallException When any property other than `_mh` is requested.
     */
    public function __get($name)
    {
        if ($name === '_mh') {
            return $this->_mh = curl_multi_init();
        }

        // Include the offending name so a typo'd property access is diagnosable.
        throw new \BadMethodCallException(
            sprintf('Cannot read undefined property "%s".', $name)
        );
    }

    /**
     * Closes the cURL multi handle, if one was ever created.
     */
    public function __destruct()
    {
        // isset() does not go through __get(), so this never creates a handle
        // just to close it.
        if (isset($this->_mh)) {
            curl_multi_close($this->_mh);
            unset($this->_mh);
        }
    }

    /**
     * Registers a request with the multi handle and returns its promise.
     *
     * The promise receives `execute()` and a closure calling `cancel()` as its
     * two constructor arguments (presumably the wait and cancel callbacks --
     * confirm against the Promise class).
     *
     * @param RequestInterface $request Request to send.
     * @param array            $options Options forwarded to the handle factory.
     *
     * @return Promise
     */
    public function __invoke(RequestInterface $request, array $options)
    {
        $easy = $this->factory->create($request, $options);
        // The integer form of the easy handle is the key used in
        // $this->handles and $this->delays.
        $id = (int) $easy->handle;

        $promise = new Promise(
            [$this, 'execute'],
            function () use ($id) {
                return $this->cancel($id);
            }
        );

        $this->addRequest(['easy' => $easy, 'deferred' => $promise]);

        return $promise;
    }

    /**
     * Ticks the curl event loop.
     *
     * Adds delayed handles whose time has come, runs the promise task queue,
     * waits for socket activity, drives the transfers and finally processes
     * any messages reported by the multi handle.
     */
    public function tick()
    {
        // Add any delayed handles if needed.
        if ($this->delays) {
            $currentTime = microtime(true);
            foreach ($this->delays as $id => $delay) {
                if ($currentTime >= $delay) {
                    unset($this->delays[$id]);
                    curl_multi_add_handle(
                        $this->_mh,
                        $this->handles[$id]['easy']->handle
                    );
                }
            }
        }

        // Step through the task queue which may add additional requests.
        P\queue()->run();

        if ($this->active &&
            curl_multi_select($this->_mh, $this->selectTimeout) === -1
        ) {
            // Perform a usleep if a select returns -1.
            // See: https://bugs.php.net/bug.php?id=61141
            usleep(250);
        }

        // Keep calling exec while curl asks to be called again immediately;
        // $this->active is updated by reference with the running transfer count.
        while (curl_multi_exec($this->_mh, $this->active) === CURLM_CALL_MULTI_PERFORM) {
            // Intentionally empty.
        }

        $this->processMessages();
    }

    /**
     * Runs until all outstanding connections have completed.
     *
     * Loops while there are registered handles or queued promise tasks.
     */
    public function execute()
    {
        $queue = P\queue();

        while ($this->handles || !$queue->isEmpty()) {
            // If there are no transfers, then sleep for the next delay
            if (!$this->active && $this->delays) {
                usleep($this->timeToNext());
            }
            $this->tick();
        }
    }

    /**
     * Tracks a new easy handle and either starts it or schedules it.
     *
     * @param array $entry Array with the keys 'easy' (the easy handle wrapper)
     *                     and 'deferred' (the promise to resolve).
     */
    private function addRequest(array $entry)
    {
        $easy = $entry['easy'];
        $id = (int) $easy->handle;
        $this->handles[$id] = $entry;
        if (empty($easy->options['delay'])) {
            curl_multi_add_handle($this->_mh, $easy->handle);
        } else {
            // 'delay' is divided by 1000 and added to a timestamp in seconds,
            // i.e. it is interpreted as milliseconds.
            $this->delays[$id] = microtime(true) + ($easy->options['delay'] / 1000);
        }
    }

    /**
     * Cancels a handle from sending and removes references to it.
     *
     * @param int $id Handle ID to cancel and remove.
     *
     * @return bool True on success, false on failure.
     */
    private function cancel($id)
    {
        // Cannot cancel if it has been processed.
        if (!isset($this->handles[$id])) {
            return false;
        }

        $handle = $this->handles[$id]['easy']->handle;
        unset($this->delays[$id], $this->handles[$id]);
        curl_multi_remove_handle($this->_mh, $handle);
        curl_close($handle);

        return true;
    }

    /**
     * Drains the multi handle's message queue and resolves finished requests.
     *
     * For every completed transfer the easy handle is detached from the multi
     * handle, its curl result code is stored in `errno`, and the associated
     * promise is resolved with the value returned by `CurlFactory::finish()`.
     */
    private function processMessages()
    {
        while ($done = curl_multi_info_read($this->_mh)) {
            if ($done['msg'] !== CURLMSG_DONE) {
                // Only a completion message means the transfer is over;
                // removing the handle for anything else would be premature.
                continue;
            }

            $id = (int) $done['handle'];
            curl_multi_remove_handle($this->_mh, $done['handle']);

            if (!isset($this->handles[$id])) {
                // Probably was cancelled.
                continue;
            }

            $entry = $this->handles[$id];
            unset($this->handles[$id], $this->delays[$id]);
            $entry['easy']->errno = $done['result'];
            $entry['deferred']->resolve(
                CurlFactory::finish(
                    $this,
                    $entry['easy'],
                    $this->factory
                )
            );
        }
    }

    /**
     * Computes how long to sleep until the earliest delayed handle is due.
     *
     * @return int Whole microseconds until the next delay expires; 0 when
     *             there are no delays or the earliest one is already due.
     */
    private function timeToNext()
    {
        if (!$this->delays) {
            // Nothing is scheduled, so there is nothing to wait for.
            return 0;
        }

        $remaining = min($this->delays) - microtime(true);
        if ($remaining <= 0) {
            return 0;
        }

        // usleep() takes an integer; round up so the delay has really elapsed
        // once the sleep returns.
        return (int) ceil($remaining * 1000000);
    }
}