name = $name;
    $this->connection = $connection;
  }

  /**
   * {@inheritdoc}
   */
  public function createItem($data) {
    $try_again = FALSE;
    try {
      $id = $this->doCreateItem($data);
    }
    catch (\Exception $e) {
      // If there was an exception, try to create the table.
      if (!$try_again = $this->ensureTableExists()) {
        // If the exception happened for other reason than the missing table,
        // propagate the exception.
        throw $e;
      }
    }
    // Now that the table has been created, try again if necessary.
    if ($try_again) {
      $id = $this->doCreateItem($data);
    }
    return $id;
  }

  /**
   * Adds a queue item and stores it directly in the queue table.
   *
   * @param mixed $data
   *   Arbitrary data to be associated with the new task in the queue. It is
   *   passed through serialize() before being stored.
   *
   * @return mixed
   *   A unique ID if the item was successfully created and was (best effort)
   *   added to the queue, otherwise FALSE. We don't guarantee the item was
   *   committed to disk etc, but as far as we know, the item is now in the
   *   queue.
   */
  protected function doCreateItem($data) {
    $query = $this->connection->insert(static::TABLE_NAME)
      ->fields([
        'name' => $this->name,
        'data' => serialize($data),
        // We cannot rely on REQUEST_TIME because many items might be created
        // by a single request which takes longer than 1 second.
        'created' => time(),
      ]);
    // Return the new serial ID, or FALSE on failure.
    return $query->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function numberOfItems() {
    try {
      // Count only the rows that belong to this queue name.
      return (int) $this->connection->query('SELECT COUNT(item_id) FROM {' . static::TABLE_NAME . '} WHERE name = :name', [':name' => $this->name])
        ->fetchField();
    }
    catch (\Exception $e) {
      // Rethrows when the table exists; otherwise falls through.
      $this->catchException($e);
      // If there is no table there cannot be any items.
      return 0;
    }
  }

  /**
   * {@inheritdoc}
   */
  public function claimItem($lease_time = 30) {
    // Claim an item by updating its expire fields. If claim is not successful
    // another thread may have claimed the item in the meantime. Therefore loop
    // until an item is successfully claimed or we are reasonably sure there
    // are no unclaimed items left.
    while (TRUE) {
      try {
        $item = $this->connection->queryRange('SELECT data, created, item_id FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, [':name' => $this->name])->fetchObject();
      }
      catch (\Exception $e) {
        $this->catchException($e);
        // If the table does not exist there are no items currently available to
        // claim.
        return FALSE;
      }
      if ($item) {
        // Try to update the item. Only one thread can succeed in UPDATEing the
        // same row. We cannot rely on REQUEST_TIME because items might be
        // claimed by a single consumer which runs longer than 1 second. If we
        // continue to use REQUEST_TIME instead of the current time(), we steal
        // time from the lease, and will tend to reset items before the lease
        // should really expire.
        $update = $this->connection->update(static::TABLE_NAME)
          ->fields([
            'expire' => time() + $lease_time,
          ])
          ->condition('item_id', $item->item_id)
          ->condition('expire', 0);
        // If there are affected rows, this update succeeded.
        if ($update->execute()) {
          $item->data = unserialize($item->data);
          return $item;
        }
        // Otherwise the row was no longer unclaimed; loop and pick another.
      }
      else {
        // No items currently available to claim.
        return FALSE;
      }
    }
  }

  /**
   * {@inheritdoc}
   */
  public function releaseItem($item) {
    try {
      // Resetting 'expire' to 0 makes the row match the claim query again.
      $update = $this->connection->update(static::TABLE_NAME)
        ->fields([
          'expire' => 0,
        ])
        ->condition('item_id', $item->item_id);
      return $update->execute();
    }
    catch (\Exception $e) {
      $this->catchException($e);
      // If the table doesn't exist we should consider the item released.
      return TRUE;
    }
  }

  /**
   * {@inheritdoc}
   */
  public function deleteItem($item) {
    try {
      $this->connection->delete(static::TABLE_NAME)
        ->condition('item_id', $item->item_id)
        ->execute();
    }
    catch (\Exception $e) {
      // A missing table is tolerated; any other failure is rethrown.
      $this->catchException($e);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function createQueue() {
    // All tasks are stored in a single database table (which is created on
    // demand) so there is nothing we need to do to create a new queue.
  }

  /**
   * {@inheritdoc}
   */
  public function deleteQueue() {
    try {
      // Remove every row that belongs to this queue name.
      $this->connection->delete(static::TABLE_NAME)
        ->condition('name', $this->name)
        ->execute();
    }
    catch (\Exception $e) {
      // A missing table is tolerated; any other failure is rethrown.
      $this->catchException($e);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function garbageCollection() {
    try {
      // Clean up the queue for failed batches.
      $this->connection->delete(static::TABLE_NAME)
        ->condition('created', REQUEST_TIME - 864000, '<')
        ->condition('name', 'drupal_batch:%', 'LIKE')
        ->execute();

      // Reset expired items in the default queue implementation table. If that's
      // not used, this will simply be a no-op.
      $this->connection->update(static::TABLE_NAME)
        ->fields([
          'expire' => 0,
        ])
        ->condition('expire', 0, '<>')
        ->condition('expire', REQUEST_TIME, '<')
        ->execute();
    }
    catch (\Exception $e) {
      // A missing table is tolerated; any other failure is rethrown.
      $this->catchException($e);
    }
  }

  /**
   * Check if the table exists and create it if not.
   *
   * @return bool
   *   TRUE if the table was created by this call, or if creating it raised a
   *   SchemaObjectExistsException. FALSE if the table already existed.
   */
  protected function ensureTableExists() {
    try {
      $database_schema = $this->connection->schema();
      if (!$database_schema->tableExists(static::TABLE_NAME)) {
        $schema_definition = $this->schemaDefinition();
        $database_schema->createTable(static::TABLE_NAME, $schema_definition);
        return TRUE;
      }
    }
    // If another process has already created the queue table, attempting to
    // recreate it will throw an exception. In this case just catch the
    // exception and do nothing.
    catch (SchemaObjectExistsException $e) {
      return TRUE;
    }
    return FALSE;
  }

  /**
   * Act on an exception when queue might be stale.
   *
   * If the table does not yet exist, that's fine, but if the table exists and
   * yet the query failed, then the queue is stale and the exception needs to
   * propagate.
   *
   * @param \Exception $e
   *   The exception.
   *
   * @throws \Exception
   *   If the table exists the exception passed in is rethrown.
   */
  protected function catchException(\Exception $e) {
    if ($this->connection->schema()->tableExists(static::TABLE_NAME)) {
      throw $e;
    }
  }

  /**
   * Defines the schema for the queue table.
   *
   * @return array
   *   The schema definition array that ensureTableExists() passes to
   *   createTable().
   *
   * @internal
   */
  public function schemaDefinition() {
    return [
      'description' => 'Stores items in queues.',
      'fields' => [
        'item_id' => [
          'type' => 'serial',
          'unsigned' => TRUE,
          'not null' => TRUE,
          'description' => 'Primary Key: Unique item ID.',
        ],
        'name' => [
          'type' => 'varchar_ascii',
          'length' => 255,
          'not null' => TRUE,
          'default' => '',
          'description' => 'The queue name.',
        ],
        'data' => [
          'type' => 'blob',
          'not null' => FALSE,
          'size' => 'big',
          'serialize' => TRUE,
          'description' => 'The arbitrary data for the item.',
        ],
        'expire' => [
          'type' => 'int',
          'not null' => TRUE,
          'default' => 0,
          'description' => 'Timestamp when the claim lease expires on the item.',
        ],
        'created' => [
          'type' => 'int',
          'not null' => TRUE,
          'default' => 0,
          'description' => 'Timestamp when the item was created.',
        ],
      ],
      'primary key' => ['item_id'],
      'indexes' => [
        'name_created' => ['name', 'created'],
        'expire' => ['expire'],
      ],
    ];
  }

}