.
 */

namespace Doctrine\Common\Cache;

use Riak\Bucket;
use Riak\Connection;
use Riak\Input;
use Riak\Exception;
use Riak\Object;

/**
 * Riak cache provider.
 *
 * Entries are stored as serialized PHP values in a Riak bucket. An optional
 * expiry timestamp is kept in the object metadata under EXPIRES_HEADER and is
 * enforced lazily on read (see doFetch() and doContains()).
 *
 * @link   www.doctrine-project.org
 * @since  1.1
 * @author Guilherme Blanco
 */
class RiakCache extends CacheProvider
{
    /**
     * Metadata key holding the UNIX timestamp after which an entry is expired.
     */
    const EXPIRES_HEADER = 'X-Riak-Meta-Expires';

    /**
     * @var \Riak\Bucket
     */
    private $bucket;

    /**
     * Sets the riak bucket instance to use.
     *
     * @param \Riak\Bucket $bucket
     */
    public function __construct(Bucket $bucket)
    {
        $this->bucket = $bucket;
    }

    /**
     * {@inheritdoc}
     *
     * Returns false when the key has no object, when the stored object has
     * expired (in which case it is also deleted), or when Riak raises an error.
     */
    protected function doFetch($id)
    {
        try {
            $response = $this->bucket->get($id);

            // No objects found
            if ( ! $response->hasObject()) {
                return false;
            }

            // Check for attempted siblings
            $object = ($response->hasSiblings())
                ? $this->resolveConflict($id, $response->getVClock(), $response->getObjectList())
                : $response->getFirstObject();

            // Check for expired object
            if ($this->isExpired($object)) {
                $this->bucket->delete($object);

                return false;
            }

            return unserialize($object->getContent());
        } catch (Exception\RiakException $e) {
            // Covers:
            // - Riak\ConnectionException
            // - Riak\CommunicationException
            // - Riak\UnexpectedResponseException
            // - Riak\NotFoundException
        }

        return false;
    }

    /**
     * {@inheritdoc}
     *
     * Returns false when the key has no object, when the stored object has
     * expired (in which case it is also deleted), or when Riak raises an error.
     */
    protected function doContains($id)
    {
        try {
            // We only need the HEAD, not the entire object
            $input = new Input\GetInput();

            $input->setReturnHead(true);

            $response = $this->bucket->get($id, $input);

            // No objects found
            if ( ! $response->hasObject()) {
                return false;
            }

            $object = $response->getFirstObject();

            // Check for expired object
            if ($this->isExpired($object)) {
                $this->bucket->delete($object);

                return false;
            }

            return true;
        } catch (Exception\RiakException $e) {
            // Do nothing
        }

        return false;
    }

    /**
     * {@inheritdoc}
     *
     * The expiry metadata is only written when $lifeTime is greater than zero;
     * otherwise the entry carries no expiry information.
     */
    protected function doSave($id, $data, $lifeTime = 0)
    {
        try {
            $object = new Object($id);

            $object->setContent(serialize($data));

            if ($lifeTime > 0) {
                $object->addMetadata(self::EXPIRES_HEADER, (string) (time() + $lifeTime));
            }

            $this->bucket->put($object);

            return true;
        } catch (Exception\RiakException $e) {
            // Do nothing
        }

        return false;
    }

    /**
     * {@inheritdoc}
     *
     * Returns false if Riak rejects the arguments or raises any other error.
     */
    protected function doDelete($id)
    {
        try {
            $this->bucket->delete($id);

            return true;
        } catch (Exception\BadArgumentsException $e) {
            // Key did not exist on cluster already
        } catch (Exception\RiakException $e) {
            // Covers:
            // - Riak\Exception\ConnectionException
            // - Riak\Exception\CommunicationException
            // - Riak\Exception\UnexpectedResponseException
        }

        return false;
    }

    /**
     * {@inheritdoc}
     *
     * Deletes every key reported by the bucket's key list, one at a time.
     * Returns false as soon as Riak raises an error; keys deleted before the
     * failure stay deleted.
     */
    protected function doFlush()
    {
        try {
            $keyList = $this->bucket->getKeyList();

            foreach ($keyList as $key) {
                $this->bucket->delete($key);
            }

            return true;
        } catch (Exception\RiakException $e) {
            // Do nothing
        }

        return false;
    }

    /**
     * {@inheritdoc}
     */
    protected function doGetStats()
    {
        // Only exposed through HTTP stats API, not Protocol Buffers API
        return null;
    }

    /**
     * Check if a given Riak Object has expired.
     *
     * An object without the EXPIRES_HEADER metadata entry never expires.
     *
     * @param \Riak\Object $object
     *
     * @return bool
     */
    private function isExpired(Object $object)
    {
        $metadataMap = $object->getMetadataMap();

        return isset($metadataMap[self::EXPIRES_HEADER])
            && $metadataMap[self::EXPIRES_HEADER] < time();
    }

    /**
     * On-read conflict resolution. Applied approach here is last write wins.
     * Specific needs may override this method to apply alternate conflict resolutions.
     *
     * {@internal Riak does not attempt to resolve a write conflict, and store
     * it as sibling of conflicted one. By following this approach, it is up to
     * the next read to resolve the conflict. When this happens, your fetched
     * object will have a list of siblings (read as a list of objects).
     * In our specific case, we do not care about the intermediate ones since
     * they are all the same read from storage, and we do apply a last sibling
     * (last write) wins logic.
     * If by any means our resolution generates another conflict, it'll be up to
     * next read to properly solve it.}
     *
     * @param string $id
     * @param string $vClock
     * @param array  $objectList Sibling objects; the last element is treated as the winner.
     *
     * @return \Riak\Object The merged object that was written back to the bucket.
     */
    protected function resolveConflict($id, $vClock, array $objectList)
    {
        // Our approach here is last-write wins. Take the real last element:
        // indexing with count($objectList) points one past the end of the list.
        $winner = end($objectList);

        $putInput = new Input\PutInput();
        $putInput->setVClock($vClock);

        $mergedObject = new Object($id);
        $mergedObject->setContent($winner->getContent());

        // Carry the winner's expiry over to the merged object; otherwise the
        // resolved entry would lose its lifetime and never be seen as expired.
        $winnerMetadata = $winner->getMetadataMap();

        if (isset($winnerMetadata[self::EXPIRES_HEADER])) {
            $mergedObject->addMetadata(self::EXPIRES_HEADER, (string) $winnerMetadata[self::EXPIRES_HEADER]);
        }

        $this->bucket->put($mergedObject, $putInput);

        return $mergedObject;
    }
}