Mercurial > hg > isophonics-drupal-site
comparison core/lib/Drupal/Core/Queue/DatabaseQueue.php @ 0:4c8ae668cc8c
Initial import (non-working)
author | Chris Cannam |
---|---|
date | Wed, 29 Nov 2017 16:09:58 +0000 |
parents | |
children | 129ea1e6d783 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4c8ae668cc8c |
---|---|
1 <?php | |
2 | |
3 namespace Drupal\Core\Queue; | |
4 | |
5 use Drupal\Core\Database\Connection; | |
6 use Drupal\Core\Database\SchemaObjectExistsException; | |
7 use Drupal\Core\DependencyInjection\DependencySerializationTrait; | |
8 | |
/**
 * Default queue implementation.
 *
 * Stores the items of every named queue as rows of a single database table
 * (see static::TABLE_NAME). The table is created lazily the first time an item
 * is added; all read/update/delete operations tolerate the table being absent.
 *
 * @ingroup queue
 */
class DatabaseQueue implements ReliableQueueInterface, QueueGarbageCollectionInterface {

  use DependencySerializationTrait;

  /**
   * The database table name.
   */
  const TABLE_NAME = 'queue';

  /**
   * The name of the queue this instance is working with.
   *
   * @var string
   */
  protected $name;

  /**
   * The database connection.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $connection;

  /**
   * Constructs a \Drupal\Core\Queue\DatabaseQueue object.
   *
   * @param string $name
   *   The name of the queue.
   * @param \Drupal\Core\Database\Connection $connection
   *   The Connection object containing the queue table.
   */
  public function __construct($name, Connection $connection) {
    $this->name = $name;
    $this->connection = $connection;
  }

  /**
   * {@inheritdoc}
   */
  public function createItem($data) {
    try {
      return $this->doCreateItem($data);
    }
    catch (\Exception $e) {
      // If there was an exception, try to create the table. If the exception
      // happened for any other reason than the missing table, propagate it.
      if (!$this->ensureTableExists()) {
        throw $e;
      }
    }
    // The table has just been created (by us or by a concurrent process), so
    // try exactly once more. A failure here propagates to the caller.
    return $this->doCreateItem($data);
  }

  /**
   * Adds a queue item and store it directly to the queue.
   *
   * @param mixed $data
   *   Arbitrary data to be associated with the new task in the queue. It is
   *   stored in serialized form.
   *
   * @return mixed
   *   A unique ID if the item was successfully created and was (best effort)
   *   added to the queue, otherwise FALSE. We don't guarantee the item was
   *   committed to disk etc, but as far as we know, the item is now in the
   *   queue.
   */
  protected function doCreateItem($data) {
    $query = $this->connection->insert(static::TABLE_NAME)
      ->fields([
        'name' => $this->name,
        'data' => serialize($data),
        // We cannot rely on REQUEST_TIME because many items might be created
        // by a single request which takes longer than 1 second.
        'created' => time(),
      ]);
    // Return the new serial ID, or FALSE on failure.
    return $query->execute();
  }

  /**
   * {@inheritdoc}
   */
  public function numberOfItems() {
    try {
      // Cast so that the success path returns an integer, matching the
      // integer 0 returned below when the table is missing.
      return (int) $this->connection->query('SELECT COUNT(item_id) FROM {' . static::TABLE_NAME . '} WHERE name = :name', [':name' => $this->name])
        ->fetchField();
    }
    catch (\Exception $e) {
      $this->catchException($e);
      // If there is no table there cannot be any items.
      return 0;
    }
  }

  /**
   * {@inheritdoc}
   */
  public function claimItem($lease_time = 30) {
    // Claim an item by updating its expire fields. If claim is not successful
    // another thread may have claimed the item in the meantime. Therefore loop
    // until an item is successfully claimed or we are reasonably sure there
    // are no unclaimed items left.
    while (TRUE) {
      try {
        $item = $this->connection->queryRange('SELECT data, created, item_id FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, [':name' => $this->name])->fetchObject();
      }
      catch (\Exception $e) {
        $this->catchException($e);
        // If the table does not exist there are no items currently available
        // to claim.
        return FALSE;
      }

      if (!$item) {
        // No items currently available to claim.
        return FALSE;
      }

      // Try to update the item. Only one thread can succeed in UPDATEing the
      // same row. We cannot rely on REQUEST_TIME because items might be
      // claimed by a single consumer which runs longer than 1 second. If we
      // continue to use REQUEST_TIME instead of the current time(), we steal
      // time from the lease, and will tend to reset items before the lease
      // should really expire.
      $update = $this->connection->update(static::TABLE_NAME)
        ->fields([
          'expire' => time() + $lease_time,
        ])
        ->condition('item_id', $item->item_id)
        // Guard against a concurrent claim: only an unclaimed row matches.
        ->condition('expire', 0);
      // If there are affected rows, this update succeeded. Otherwise another
      // consumer won the race and the loop selects the next candidate.
      if ($update->execute()) {
        $item->data = unserialize($item->data);
        return $item;
      }
    }
  }

  /**
   * {@inheritdoc}
   */
  public function releaseItem($item) {
    try {
      $update = $this->connection->update(static::TABLE_NAME)
        ->fields([
          'expire' => 0,
        ])
        ->condition('item_id', $item->item_id);
      // Normalize the result to a boolean so that both return paths of this
      // method yield the same type.
      return (bool) $update->execute();
    }
    catch (\Exception $e) {
      $this->catchException($e);
      // If the table doesn't exist we should consider the item released.
      return TRUE;
    }
  }

  /**
   * {@inheritdoc}
   */
  public function deleteItem($item) {
    try {
      $this->connection->delete(static::TABLE_NAME)
        ->condition('item_id', $item->item_id)
        ->execute();
    }
    catch (\Exception $e) {
      // A missing table means there is nothing to delete; anything else is
      // rethrown by catchException().
      $this->catchException($e);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function createQueue() {
    // All tasks are stored in a single database table (which is created on
    // demand) so there is nothing we need to do to create a new queue.
  }

  /**
   * {@inheritdoc}
   */
  public function deleteQueue() {
    try {
      // Removes only the rows belonging to this queue; the shared table stays.
      $this->connection->delete(static::TABLE_NAME)
        ->condition('name', $this->name)
        ->execute();
    }
    catch (\Exception $e) {
      $this->catchException($e);
    }
  }

  /**
   * {@inheritdoc}
   */
  public function garbageCollection() {
    try {
      // Clean up the queue for failed batches: drop 'drupal_batch:*' items
      // created more than 864000 seconds (10 days) before this request.
      $this->connection->delete(static::TABLE_NAME)
        ->condition('created', REQUEST_TIME - 864000, '<')
        ->condition('name', 'drupal_batch:%', 'LIKE')
        ->execute();

      // Reset expired items in the default queue implementation table. If
      // that's not used, this will simply be a no-op.
      $this->connection->update(static::TABLE_NAME)
        ->fields([
          'expire' => 0,
        ])
        ->condition('expire', 0, '<>')
        ->condition('expire', REQUEST_TIME, '<')
        ->execute();
    }
    catch (\Exception $e) {
      $this->catchException($e);
    }
  }

  /**
   * Check if the table exists and create it if not.
   *
   * @return bool
   *   TRUE if the table was created by this call, or was created concurrently
   *   by another process while this call was attempting to create it. FALSE if
   *   the table already existed beforehand.
   */
  protected function ensureTableExists() {
    try {
      $database_schema = $this->connection->schema();
      if (!$database_schema->tableExists(static::TABLE_NAME)) {
        $schema_definition = $this->schemaDefinition();
        $database_schema->createTable(static::TABLE_NAME, $schema_definition);
        return TRUE;
      }
    }
    // If another process has already created the queue table, attempting to
    // recreate it will throw an exception. In this case just catch the
    // exception and do nothing.
    catch (SchemaObjectExistsException $e) {
      return TRUE;
    }
    return FALSE;
  }

  /**
   * Act on an exception when queue might be stale.
   *
   * If the table does not yet exist, that's fine, but if the table exists and
   * yet the query failed, then the queue is stale and the exception needs to
   * propagate.
   *
   * @param \Exception $e
   *   The exception.
   *
   * @throws \Exception
   *   If the table exists the exception passed in is rethrown.
   */
  protected function catchException(\Exception $e) {
    if ($this->connection->schema()->tableExists(static::TABLE_NAME)) {
      throw $e;
    }
  }

  /**
   * Defines the schema for the queue table.
   *
   * @return array
   *   The schema definition array passed to createTable() when the queue table
   *   is created on demand.
   *
   * @internal
   */
  public function schemaDefinition() {
    return [
      'description' => 'Stores items in queues.',
      'fields' => [
        'item_id' => [
          'type' => 'serial',
          'unsigned' => TRUE,
          'not null' => TRUE,
          'description' => 'Primary Key: Unique item ID.',
        ],
        'name' => [
          'type' => 'varchar_ascii',
          'length' => 255,
          'not null' => TRUE,
          'default' => '',
          'description' => 'The queue name.',
        ],
        'data' => [
          'type' => 'blob',
          'not null' => FALSE,
          'size' => 'big',
          'serialize' => TRUE,
          'description' => 'The arbitrary data for the item.',
        ],
        'expire' => [
          'type' => 'int',
          'not null' => TRUE,
          'default' => 0,
          'description' => 'Timestamp when the claim lease expires on the item.',
        ],
        'created' => [
          'type' => 'int',
          'not null' => TRUE,
          'default' => 0,
          'description' => 'Timestamp when the item was created.',
        ],
      ],
      'primary key' => ['item_id'],
      'indexes' => [
        'name_created' => ['name', 'created'],
        'expire' => ['expire'],
      ],
    ];
  }

}