// NOTE(review): the class declaration and the opening of the constructor are
// missing from the visible source. The truncated constructor tail below is
// kept as found (without the blame prefix); restore it from the original file.
database = $database;
  }

  /**
   * {@inheritdoc}
   */
  public function acquire($name, $timeout = 30.0) {
    $name = $this->normalizeName($name);

    // Ensure that the timeout is at least 1 ms.
    $timeout = max($timeout, 0.001);
    $expire = microtime(TRUE) + $timeout;
    if (isset($this->locks[$name])) {
      // Try to extend the expiration of a lock we already acquired. The update
      // only matches a row that still carries our own lock ID.
      $success = (bool) $this->database->update('semaphore')
        ->fields(['expire' => $expire])
        ->condition('name', $name)
        ->condition('value', $this->getLockId())
        ->execute();
      if (!$success) {
        // The lock was broken.
        unset($this->locks[$name]);
      }
      return $success;
    }
    else {
      // Optimistically try to acquire the lock, then retry once if it fails.
      // The first time through the loop cannot be a retry.
      $retry = FALSE;
      // We always want to do this code at least once.
      do {
        try {
          $this->database->insert('semaphore')
            ->fields([
              'name' => $name,
              'value' => $this->getLockId(),
              'expire' => $expire,
            ])
            ->execute();
          // Track every acquired lock on this instance.
          $this->locks[$name] = TRUE;
          // We never need to try again.
          $retry = FALSE;
        }
        catch (IntegrityConstraintViolationException $e) {
          // Suppress the error. If this is our first pass through the loop,
          // then $retry is FALSE. In this case, the insert failed because some
          // other request acquired the lock but did not release it. We decide
          // whether to retry by checking lockMayBeAvailable(). This will clear
          // the offending row from the database table in case it has expired.
          $retry = $retry ? FALSE : $this->lockMayBeAvailable($name);
        }
        catch (\Exception $e) {
          // Create the semaphore table if it does not exist and retry.
          if ($this->ensureTableExists()) {
            // Retry only once.
            $retry = !$retry;
          }
          else {
            throw $e;
          }
        }
        // We only retry in case the first attempt failed, but we then broke
        // an expired lock.
      } while ($retry);
    }
    return isset($this->locks[$name]);
  }

  /**
   * {@inheritdoc}
   */
  public function lockMayBeAvailable($name) {
    $name = $this->normalizeName($name);

    try {
      $lock = $this->database->query('SELECT expire, value FROM {semaphore} WHERE name = :name', [':name' => $name])->fetchAssoc();
    }
    catch (\Exception $e) {
      // Rethrows unless the failure is explained by a missing table.
      $this->catchException($e);
      // If the table does not exist yet then the lock may be available.
      $lock = FALSE;
    }
    if (!$lock) {
      // No row for this name: nobody holds the lock.
      return TRUE;
    }
    $expire = (float) $lock['expire'];
    $now = microtime(TRUE);
    if ($now > $expire) {
      // We check two conditions to prevent a race condition where another
      // request acquired the lock and set a new expire time. We add a small
      // number to $expire to avoid errors with float to string conversion.
      return (bool) $this->database->delete('semaphore')
        ->condition('name', $name)
        ->condition('value', $lock['value'])
        ->condition('expire', 0.0001 + $expire, '<=')
        ->execute();
    }
    return FALSE;
  }

  /**
   * {@inheritdoc}
   */
  public function release($name) {
    $name = $this->normalizeName($name);

    // Forget the lock locally first, so it is dropped even if the query fails.
    unset($this->locks[$name]);
    try {
      $this->database->delete('semaphore')
        ->condition('name', $name)
        ->condition('value', $this->getLockId())
        ->execute();
    }
    catch (\Exception $e) {
      $this->catchException($e);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function releaseAll($lock_id = NULL) {
    // Only attempt to release locks if any were acquired.
    if (!empty($this->locks)) {
      $this->locks = [];
      if (empty($lock_id)) {
        $lock_id = $this->getLockId();
      }
      // Handle failures the same way release() does: a missing semaphore table
      // is tolerated, any other database error is rethrown by catchException().
      try {
        $this->database->delete('semaphore')
          ->condition('value', $lock_id)
          ->execute();
      }
      catch (\Exception $e) {
        $this->catchException($e);
      }
    }
  }

  /**
   * Check if the semaphore table exists and create it if not.
   *
   * @return bool
   *   TRUE if the table was created by this call, or if creating it raised a
   *   SchemaObjectExistsException because another process created it first.
   *   FALSE if the table already existed.
   */
  protected function ensureTableExists() {
    try {
      $database_schema = $this->database->schema();
      if (!$database_schema->tableExists(static::TABLE_NAME)) {
        $schema_definition = $this->schemaDefinition();
        $database_schema->createTable(static::TABLE_NAME, $schema_definition);
        return TRUE;
      }
    }
    // If another process has already created the semaphore table, attempting to
    // recreate it will throw an exception. In this case just catch the
    // exception and do nothing.
    catch (SchemaObjectExistsException $e) {
      return TRUE;
    }
    return FALSE;
  }

  /**
   * Act on an exception when semaphore might be stale.
   *
   * If the table does not yet exist, that's fine, but if the table exists and
   * yet the query failed, then the semaphore is stale and the exception needs
   * to propagate.
   *
   * @param \Exception $e
   *   The exception.
   *
   * @throws \Exception
   *   Rethrows $e when the semaphore table exists.
   */
  protected function catchException(\Exception $e) {
    if ($this->database->schema()->tableExists(static::TABLE_NAME)) {
      throw $e;
    }
  }

  /**
   * Normalizes a lock name in order to comply with database limitations.
   *
   * @param string $name
   *   The passed in lock name.
   *
   * @return string
   *   An ASCII-encoded lock name that is at most 255 characters long.
   */
  protected function normalizeName($name) {
    // Nothing to do if the name is a US ASCII string of 255 characters or less.
    $name_is_ascii = mb_check_encoding($name, 'ASCII');

    if (strlen($name) <= 255 && $name_is_ascii) {
      return $name;
    }
    // Return a string that uses as much as possible of the original name with
    // the hash appended.
    $hash = Crypt::hashBase64($name);

    if (!$name_is_ascii) {
      // A non-ASCII name is replaced by its hash alone.
      return $hash;
    }

    return substr($name, 0, 255 - strlen($hash)) . $hash;
  }

  /**
   * Defines the schema for the semaphore table.
   *
   * @return array
   *   The table definition passed to createTable() by ensureTableExists().
   *
   * @internal
   */
  public function schemaDefinition() {
    return [
      'description' => 'Table for holding semaphores, locks, flags, etc. that cannot be stored as state since they must not be cached.',
      'fields' => [
        'name' => [
          'description' => 'Primary Key: Unique name.',
          'type' => 'varchar_ascii',
          'length' => 255,
          'not null' => TRUE,
          'default' => '',
        ],
        'value' => [
          'description' => 'A value for the semaphore.',
          'type' => 'varchar_ascii',
          'length' => 255,
          'not null' => TRUE,
          'default' => '',
        ],
        'expire' => [
          'description' => 'A Unix timestamp with microseconds indicating when the semaphore should expire.',
          'type' => 'float',
          'size' => 'big',
          'not null' => TRUE,
        ],
      ],
      'indexes' => [
        'value' => ['value'],
        'expire' => ['expire'],
      ],
      'primary key' => ['name'],
    ];
  }

}