annotate core/lib/Drupal/Core/Queue/DatabaseQueue.php @ 19:fa3358dc1485 tip

Add ndrum files
author Chris Cannam
date Wed, 28 Aug 2019 13:14:47 +0100
parents 129ea1e6d783
children
rev   line source
Chris@0 1 <?php
Chris@0 2
Chris@0 3 namespace Drupal\Core\Queue;
Chris@0 4
Chris@0 5 use Drupal\Core\Database\Connection;
Chris@0 6 use Drupal\Core\Database\SchemaObjectExistsException;
Chris@0 7 use Drupal\Core\DependencyInjection\DependencySerializationTrait;
Chris@0 8
/**
 * Default queue implementation.
 *
 * Items for every queue are stored as rows of one shared database table
 * (static::TABLE_NAME), distinguished by the 'name' column. The table is
 * created lazily the first time an item is inserted; read, update and delete
 * operations treat a missing table as an empty queue.
 *
 * @ingroup queue
 */
class DatabaseQueue implements ReliableQueueInterface, QueueGarbageCollectionInterface {

  use DependencySerializationTrait;

  /**
   * The database table name.
   */
  const TABLE_NAME = 'queue';

  /**
   * The name of the queue this instance is working with.
   *
   * @var string
   */
  protected $name;

  /**
   * The database connection.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $connection;

  /**
   * Constructs a \Drupal\Core\Queue\DatabaseQueue object.
   *
   * @param string $name
   *   The name of the queue.
   * @param \Drupal\Core\Database\Connection $connection
   *   The Connection object containing the queue table.
   */
  public function __construct($name, Connection $connection) {
    $this->name = $name;
    $this->connection = $connection;
  }

  /**
   * {@inheritdoc}
   */
  public function createItem($data) {
    try {
      return $this->doCreateItem($data);
    }
    catch (\Exception $e) {
      // The insert may have failed because the table does not exist yet, so
      // try to create it. If no table had to be created the failure has some
      // other cause and the original exception is propagated.
      if (!$this->ensureTableExists()) {
        throw $e;
      }
    }
    // The table has just been created (by this or a concurrent process), so
    // retry the insert once. A failure here is not caught.
    return $this->doCreateItem($data);
  }

  /**
   * Adds a queue item and store it directly to the queue.
   *
   * @param mixed $data
   *   Arbitrary data to be associated with the new task in the queue. It is
   *   stored in serialized form.
   *
   * @return int|string|false
   *   A unique ID if the item was successfully created and was (best effort)
   *   added to the queue, otherwise FALSE. We don't guarantee the item was
   *   committed to disk etc, but as far as we know, the item is now in the
   *   queue.
   */
  protected function doCreateItem($data) {
    $query = $this->connection->insert(static::TABLE_NAME)
      ->fields([
        'name' => $this->name,
        'data' => serialize($data),
        // We cannot rely on REQUEST_TIME because many items might be created
        // by a single request which takes longer than 1 second.
        'created' => time(),
      ]);
    // Return the new serial ID, or FALSE on failure.
    return $query->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function numberOfItems() {
    try {
      return (int) $this->connection->query('SELECT COUNT(item_id) FROM {' . static::TABLE_NAME . '} WHERE name = :name', [':name' => $this->name])
        ->fetchField();
    }
    catch (\Exception $e) {
      // Rethrows unless the failure is explained by the table being absent.
      $this->catchException($e);
      // If there is no table there cannot be any items.
      return 0;
    }
  }

  /**
   * {@inheritdoc}
   */
  public function claimItem($lease_time = 30) {
    // Claim an item by updating its expire fields. If claim is not successful
    // another thread may have claimed the item in the meantime. Therefore loop
    // until an item is successfully claimed or we are reasonably sure there
    // are no unclaimed items left.
    while (TRUE) {
      try {
        // Pick the oldest unclaimed (expire = 0) item of this queue.
        $item = $this->connection->queryRange('SELECT data, created, item_id FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, [':name' => $this->name])->fetchObject();
      }
      catch (\Exception $e) {
        $this->catchException($e);
        // If the table does not exist there are no items currently available to
        // claim.
        return FALSE;
      }
      if (!$item) {
        // No items currently available to claim.
        return FALSE;
      }
      // Try to update the item. Only one thread can succeed in UPDATEing the
      // same row. We cannot rely on REQUEST_TIME because items might be
      // claimed by a single consumer which runs longer than 1 second. If we
      // continue to use REQUEST_TIME instead of the current time(), we steal
      // time from the lease, and will tend to reset items before the lease
      // should really expire.
      $update = $this->connection->update(static::TABLE_NAME)
        ->fields([
          'expire' => time() + $lease_time,
        ])
        ->condition('item_id', $item->item_id)
        // Guard against a concurrent claim made since the SELECT above.
        ->condition('expire', 0);
      // If there are affected rows, this update succeeded.
      if ($update->execute()) {
        $item->data = unserialize($item->data);
        return $item;
      }
      // Otherwise the row was claimed elsewhere; loop and try the next one.
    }
  }

  /**
   * {@inheritdoc}
   */
  public function releaseItem($item) {
    try {
      $update = $this->connection->update(static::TABLE_NAME)
        ->fields([
          'expire' => 0,
        ])
        ->condition('item_id', $item->item_id);
      // Cast the affected-row count so that this path returns a boolean just
      // like the missing-table path below.
      return (bool) $update->execute();
    }
    catch (\Exception $e) {
      $this->catchException($e);
      // If the table doesn't exist we should consider the item released.
      return TRUE;
    }
  }

  /**
   * {@inheritdoc}
   */
  public function deleteItem($item) {
    try {
      $this->connection->delete(static::TABLE_NAME)
        ->condition('item_id', $item->item_id)
        ->execute();
    }
    catch (\Exception $e) {
      // A missing table means there is nothing to delete; anything else is
      // rethrown.
      $this->catchException($e);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function createQueue() {
    // All tasks are stored in a single database table (which is created on
    // demand) so there is nothing we need to do to create a new queue.
  }

  /**
   * {@inheritdoc}
   */
  public function deleteQueue() {
    try {
      // Remove every item that belongs to this queue; the shared table itself
      // is kept.
      $this->connection->delete(static::TABLE_NAME)
        ->condition('name', $this->name)
        ->execute();
    }
    catch (\Exception $e) {
      $this->catchException($e);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function garbageCollection() {
    try {
      // Clean up the queue for failed batches: drop 'drupal_batch:*' items
      // created more than 864000 seconds (10 days) before this request.
      $this->connection->delete(static::TABLE_NAME)
        ->condition('created', REQUEST_TIME - 864000, '<')
        ->condition('name', 'drupal_batch:%', 'LIKE')
        ->execute();

      // Reset expired items in the default queue implementation table. If that's
      // not used, this will simply be a no-op.
      $this->connection->update(static::TABLE_NAME)
        ->fields([
          'expire' => 0,
        ])
        ->condition('expire', 0, '<>')
        ->condition('expire', REQUEST_TIME, '<')
        ->execute();
    }
    catch (\Exception $e) {
      $this->catchException($e);
    }
  }

  /**
   * Check if the table exists and create it if not.
   *
   * @return bool
   *   TRUE if the table was missing and has now been created, either by this
   *   call or concurrently by another process; FALSE if the table already
   *   existed.
   */
  protected function ensureTableExists() {
    try {
      $database_schema = $this->connection->schema();
      if (!$database_schema->tableExists(static::TABLE_NAME)) {
        $schema_definition = $this->schemaDefinition();
        $database_schema->createTable(static::TABLE_NAME, $schema_definition);
        return TRUE;
      }
    }
    // If another process has already created the queue table, attempting to
    // recreate it will throw an exception. In this case just catch the
    // exception and do nothing.
    catch (SchemaObjectExistsException $e) {
      return TRUE;
    }
    return FALSE;
  }

  /**
   * Act on an exception when queue might be stale.
   *
   * If the table does not yet exist, that's fine, but if the table exists and
   * yet the query failed, then the queue is stale and the exception needs to
   * propagate.
   *
   * @param \Exception $e
   *   The exception.
   *
   * @throws \Exception
   *   If the table exists the exception passed in is rethrown.
   */
  protected function catchException(\Exception $e) {
    if ($this->connection->schema()->tableExists(static::TABLE_NAME)) {
      throw $e;
    }
  }

  /**
   * Defines the schema for the queue table.
   *
   * @return array
   *   The table definition passed to createTable() when the queue table is
   *   created on demand.
   *
   * @internal
   */
  public function schemaDefinition() {
    return [
      'description' => 'Stores items in queues.',
      'fields' => [
        'item_id' => [
          'type' => 'serial',
          'unsigned' => TRUE,
          'not null' => TRUE,
          'description' => 'Primary Key: Unique item ID.',
        ],
        'name' => [
          'type' => 'varchar_ascii',
          'length' => 255,
          'not null' => TRUE,
          'default' => '',
          'description' => 'The queue name.',
        ],
        'data' => [
          'type' => 'blob',
          'not null' => FALSE,
          'size' => 'big',
          'serialize' => TRUE,
          'description' => 'The arbitrary data for the item.',
        ],
        'expire' => [
          'type' => 'int',
          'not null' => TRUE,
          'default' => 0,
          'description' => 'Timestamp when the claim lease expires on the item.',
        ],
        'created' => [
          'type' => 'int',
          'not null' => TRUE,
          'default' => 0,
          'description' => 'Timestamp when the item was created.',
        ],
      ],
      'primary key' => ['item_id'],
      'indexes' => [
        'name_created' => ['name', 'created'],
        'expire' => ['expire'],
      ],
    ];
  }

}