annotate modules/system/system.queue.inc @ 13:134d4b2e75f6

updated quicktabs and google analytics modules
author danieleb <danielebarchiesi@me.com>
date Tue, 29 Oct 2013 13:48:59 +0000
parents ff03f76ab3fe
children
rev   line source
danielebarchiesi@0 1 <?php
danielebarchiesi@0 2
danielebarchiesi@0 3 /**
danielebarchiesi@0 4 * @file
danielebarchiesi@0 5 * Queue functionality.
danielebarchiesi@0 6 */
danielebarchiesi@0 7
danielebarchiesi@0 8 /**
danielebarchiesi@0 9 * @defgroup queue Queue operations
danielebarchiesi@0 10 * @{
danielebarchiesi@0 11 * Queue items to allow later processing.
danielebarchiesi@0 12 *
danielebarchiesi@0 13 * The queue system allows placing items in a queue and processing them later.
danielebarchiesi@0 14 * The system tries to ensure that only one consumer can process an item.
danielebarchiesi@0 15 *
danielebarchiesi@0 16 * Before a queue can be used it needs to be created by
danielebarchiesi@0 17 * DrupalQueueInterface::createQueue().
danielebarchiesi@0 18 *
danielebarchiesi@0 19 * Items can be added to the queue by passing an arbitrary data object to
danielebarchiesi@0 20 * DrupalQueueInterface::createItem().
danielebarchiesi@0 21 *
danielebarchiesi@0 22 * To process an item, call DrupalQueueInterface::claimItem() and specify how
danielebarchiesi@0 23 * long you want to have a lease for working on that item. When finished
danielebarchiesi@0 24 * processing, the item needs to be deleted by calling
danielebarchiesi@0 25 * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
danielebarchiesi@0 26 * made available again by the DrupalQueueInterface implementation once the
danielebarchiesi@0 27 * lease expires. Another consumer will then be able to receive it when calling
danielebarchiesi@0 28 * DrupalQueueInterface::claimItem(). Due to this, the processing code should
danielebarchiesi@0 29 * be aware that an item might be handed over for processing more than once.
danielebarchiesi@0 30 *
danielebarchiesi@0 31 * The $item object used by the DrupalQueueInterface can contain arbitrary
danielebarchiesi@0 32 * metadata depending on the implementation. Systems using the interface should
danielebarchiesi@0 33 * only rely on the data property which will contain the information passed to
danielebarchiesi@0 34 * DrupalQueueInterface::createItem(). The full queue item returned by
danielebarchiesi@0 35 * DrupalQueueInterface::claimItem() needs to be passed to
danielebarchiesi@0 36 * DrupalQueueInterface::deleteItem() once processing is completed.
danielebarchiesi@0 37 *
danielebarchiesi@0 38 * There are two kinds of queue backends available: reliable, which preserves
danielebarchiesi@0 39 * the order of messages and guarantees that every item will be executed at
danielebarchiesi@0 40 * least once. The non-reliable kind only does a best effort to preserve order
danielebarchiesi@0 41 * in messages and to execute them at least once but there is a small chance
danielebarchiesi@0 42 * that some items get lost. For example, some distributed back-ends like
danielebarchiesi@0 43 * Amazon SQS will be managing jobs for a large set of producers and consumers
danielebarchiesi@0 44 * where a strict FIFO ordering will likely not be preserved. Another example
danielebarchiesi@0 45 * would be an in-memory queue backend which might lose items if it crashes.
danielebarchiesi@0 46 * However, such a backend would be able to deal with significantly more writes
danielebarchiesi@0 47 * than a reliable queue and for many tasks this is more important. See
danielebarchiesi@0 48 * aggregator_cron() for an example of how to effectively utilize a
danielebarchiesi@0 49 * non-reliable queue. Another example is doing Twitter statistics -- the small
danielebarchiesi@0 50 * possibility of losing a few items is insignificant next to power of the
danielebarchiesi@0 51 * queue being able to keep up with writes. As described in the processing
danielebarchiesi@0 52 * section, regardless of the queue being reliable or not, the processing code
danielebarchiesi@0 53 * should be aware that an item might be handed over for processing more than
danielebarchiesi@0 54 * once (because the processing code might time out before it finishes).
danielebarchiesi@0 55 */
danielebarchiesi@0 56
danielebarchiesi@0 57 /**
danielebarchiesi@0 58 * Factory class for interacting with queues.
danielebarchiesi@0 59 */
danielebarchiesi@0 60 class DrupalQueue {
danielebarchiesi@0 61 /**
danielebarchiesi@0 62 * Returns the queue object for a given name.
danielebarchiesi@0 63 *
danielebarchiesi@0 64 * The following variables can be set by variable_set or $conf overrides:
danielebarchiesi@0 65 * - queue_class_$name: the class to be used for the queue $name.
danielebarchiesi@0 66 * - queue_default_class: the class to use when queue_class_$name is not
danielebarchiesi@0 67 * defined. Defaults to SystemQueue, a reliable backend using SQL.
danielebarchiesi@0 68 * - queue_default_reliable_class: the class to use when queue_class_$name is
danielebarchiesi@0 69 * not defined and the queue_default_class is not reliable. Defaults to
danielebarchiesi@0 70 * SystemQueue.
danielebarchiesi@0 71 *
danielebarchiesi@0 72 * @param $name
danielebarchiesi@0 73 * Arbitrary string. The name of the queue to work with.
danielebarchiesi@0 74 * @param $reliable
danielebarchiesi@0 75 * TRUE if the ordering of items and guaranteeing every item executes at
danielebarchiesi@0 76 * least once is important, FALSE if scalability is the main concern.
danielebarchiesi@0 77 *
danielebarchiesi@0 78 * @return
danielebarchiesi@0 79 * The queue object for a given name.
danielebarchiesi@0 80 */
danielebarchiesi@0 81 public static function get($name, $reliable = FALSE) {
danielebarchiesi@0 82 static $queues;
danielebarchiesi@0 83 if (!isset($queues[$name])) {
danielebarchiesi@0 84 $class = variable_get('queue_class_' . $name, NULL);
danielebarchiesi@0 85 if (!$class) {
danielebarchiesi@0 86 $class = variable_get('queue_default_class', 'SystemQueue');
danielebarchiesi@0 87 }
danielebarchiesi@0 88 $object = new $class($name);
danielebarchiesi@0 89 if ($reliable && !$object instanceof DrupalReliableQueueInterface) {
danielebarchiesi@0 90 $class = variable_get('queue_default_reliable_class', 'SystemQueue');
danielebarchiesi@0 91 $object = new $class($name);
danielebarchiesi@0 92 }
danielebarchiesi@0 93 $queues[$name] = $object;
danielebarchiesi@0 94 }
danielebarchiesi@0 95 return $queues[$name];
danielebarchiesi@0 96 }
danielebarchiesi@0 97 }
danielebarchiesi@0 98
danielebarchiesi@0 99 interface DrupalQueueInterface {
danielebarchiesi@0 100
danielebarchiesi@0 101 /**
danielebarchiesi@0 102 * Add a queue item and store it directly to the queue.
danielebarchiesi@0 103 *
danielebarchiesi@0 104 * @param $data
danielebarchiesi@0 105 * Arbitrary data to be associated with the new task in the queue.
danielebarchiesi@0 106 * @return
danielebarchiesi@0 107 * TRUE if the item was successfully created and was (best effort) added
danielebarchiesi@0 108 * to the queue, otherwise FALSE. We don't guarantee the item was
danielebarchiesi@0 109 * committed to disk etc, but as far as we know, the item is now in the
danielebarchiesi@0 110 * queue.
danielebarchiesi@0 111 */
danielebarchiesi@0 112 public function createItem($data);
danielebarchiesi@0 113
danielebarchiesi@0 114 /**
danielebarchiesi@0 115 * Retrieve the number of items in the queue.
danielebarchiesi@0 116 *
danielebarchiesi@0 117 * This is intended to provide a "best guess" count of the number of items in
danielebarchiesi@0 118 * the queue. Depending on the implementation and the setup, the accuracy of
danielebarchiesi@0 119 * the results of this function may vary.
danielebarchiesi@0 120 *
danielebarchiesi@0 121 * e.g. On a busy system with a large number of consumers and items, the
danielebarchiesi@0 122 * result might only be valid for a fraction of a second and not provide an
danielebarchiesi@0 123 * accurate representation.
danielebarchiesi@0 124 *
danielebarchiesi@0 125 * @return
danielebarchiesi@0 126 * An integer estimate of the number of items in the queue.
danielebarchiesi@0 127 */
danielebarchiesi@0 128 public function numberOfItems();
danielebarchiesi@0 129
danielebarchiesi@0 130 /**
danielebarchiesi@0 131 * Claim an item in the queue for processing.
danielebarchiesi@0 132 *
danielebarchiesi@0 133 * @param $lease_time
danielebarchiesi@0 134 * How long the processing is expected to take in seconds, defaults to an
danielebarchiesi@0 135 * hour. After this lease expires, the item will be reset and another
danielebarchiesi@0 136 * consumer can claim the item. For idempotent tasks (which can be run
danielebarchiesi@0 137 * multiple times without side effects), shorter lease times would result
danielebarchiesi@0 138 * in lower latency in case a consumer fails. For tasks that should not be
danielebarchiesi@0 139 * run more than once (non-idempotent), a larger lease time will make it
danielebarchiesi@0 140 * more rare for a given task to run multiple times in cases of failure,
danielebarchiesi@0 141 * at the cost of higher latency.
danielebarchiesi@0 142 * @return
danielebarchiesi@0 143 * On success we return an item object. If the queue is unable to claim an
danielebarchiesi@0 144 * item it returns false. This implies a best effort to retrieve an item
danielebarchiesi@0 145 * and either the queue is empty or there is some other non-recoverable
danielebarchiesi@0 146 * problem.
danielebarchiesi@0 147 */
danielebarchiesi@0 148 public function claimItem($lease_time = 3600);
danielebarchiesi@0 149
danielebarchiesi@0 150 /**
danielebarchiesi@0 151 * Delete a finished item from the queue.
danielebarchiesi@0 152 *
danielebarchiesi@0 153 * @param $item
danielebarchiesi@0 154 * The item returned by DrupalQueueInterface::claimItem().
danielebarchiesi@0 155 */
danielebarchiesi@0 156 public function deleteItem($item);
danielebarchiesi@0 157
danielebarchiesi@0 158 /**
danielebarchiesi@0 159 * Release an item that the worker could not process, so another
danielebarchiesi@0 160 * worker can come in and process it before the timeout expires.
danielebarchiesi@0 161 *
danielebarchiesi@0 162 * @param $item
danielebarchiesi@0 163 * @return boolean
danielebarchiesi@0 164 */
danielebarchiesi@0 165 public function releaseItem($item);
danielebarchiesi@0 166
danielebarchiesi@0 167 /**
danielebarchiesi@0 168 * Create a queue.
danielebarchiesi@0 169 *
danielebarchiesi@0 170 * Called during installation and should be used to perform any necessary
danielebarchiesi@0 171 * initialization operations. This should not be confused with the
danielebarchiesi@0 172 * constructor for these objects, which is called every time an object is
danielebarchiesi@0 173 * instantiated to operate on a queue. This operation is only needed the
danielebarchiesi@0 174 * first time a given queue is going to be initialized (for example, to make
danielebarchiesi@0 175 * a new database table or directory to hold tasks for the queue -- it
danielebarchiesi@0 176 * depends on the queue implementation if this is necessary at all).
danielebarchiesi@0 177 */
danielebarchiesi@0 178 public function createQueue();
danielebarchiesi@0 179
danielebarchiesi@0 180 /**
danielebarchiesi@0 181 * Delete a queue and every item in the queue.
danielebarchiesi@0 182 */
danielebarchiesi@0 183 public function deleteQueue();
danielebarchiesi@0 184 }
danielebarchiesi@0 185
danielebarchiesi@0 186 /**
danielebarchiesi@0 187 * Reliable queue interface.
danielebarchiesi@0 188 *
danielebarchiesi@0 189 * Classes implementing this interface preserve the order of messages and
danielebarchiesi@0 190 * guarantee that every item will be executed at least once.
danielebarchiesi@0 191 */
danielebarchiesi@0 192 interface DrupalReliableQueueInterface extends DrupalQueueInterface {
danielebarchiesi@0 193 }
danielebarchiesi@0 194
danielebarchiesi@0 195 /**
danielebarchiesi@0 196 * Default queue implementation.
danielebarchiesi@0 197 */
danielebarchiesi@0 198 class SystemQueue implements DrupalReliableQueueInterface {
danielebarchiesi@0 199 /**
danielebarchiesi@0 200 * The name of the queue this instance is working with.
danielebarchiesi@0 201 *
danielebarchiesi@0 202 * @var string
danielebarchiesi@0 203 */
danielebarchiesi@0 204 protected $name;
danielebarchiesi@0 205
danielebarchiesi@0 206 public function __construct($name) {
danielebarchiesi@0 207 $this->name = $name;
danielebarchiesi@0 208 }
danielebarchiesi@0 209
danielebarchiesi@0 210 public function createItem($data) {
danielebarchiesi@0 211 // During a Drupal 6.x to 7.x update, drupal_get_schema() does not contain
danielebarchiesi@0 212 // the queue table yet, so we cannot rely on drupal_write_record().
danielebarchiesi@0 213 $query = db_insert('queue')
danielebarchiesi@0 214 ->fields(array(
danielebarchiesi@0 215 'name' => $this->name,
danielebarchiesi@0 216 'data' => serialize($data),
danielebarchiesi@0 217 // We cannot rely on REQUEST_TIME because many items might be created
danielebarchiesi@0 218 // by a single request which takes longer than 1 second.
danielebarchiesi@0 219 'created' => time(),
danielebarchiesi@0 220 ));
danielebarchiesi@0 221 return (bool) $query->execute();
danielebarchiesi@0 222 }
danielebarchiesi@0 223
danielebarchiesi@0 224 public function numberOfItems() {
danielebarchiesi@0 225 return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
danielebarchiesi@0 226 }
danielebarchiesi@0 227
danielebarchiesi@0 228 public function claimItem($lease_time = 30) {
danielebarchiesi@0 229 // Claim an item by updating its expire fields. If claim is not successful
danielebarchiesi@0 230 // another thread may have claimed the item in the meantime. Therefore loop
danielebarchiesi@0 231 // until an item is successfully claimed or we are reasonably sure there
danielebarchiesi@0 232 // are no unclaimed items left.
danielebarchiesi@0 233 while (TRUE) {
danielebarchiesi@0 234 $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
danielebarchiesi@0 235 if ($item) {
danielebarchiesi@0 236 // Try to update the item. Only one thread can succeed in UPDATEing the
danielebarchiesi@0 237 // same row. We cannot rely on REQUEST_TIME because items might be
danielebarchiesi@0 238 // claimed by a single consumer which runs longer than 1 second. If we
danielebarchiesi@0 239 // continue to use REQUEST_TIME instead of the current time(), we steal
danielebarchiesi@0 240 // time from the lease, and will tend to reset items before the lease
danielebarchiesi@0 241 // should really expire.
danielebarchiesi@0 242 $update = db_update('queue')
danielebarchiesi@0 243 ->fields(array(
danielebarchiesi@0 244 'expire' => time() + $lease_time,
danielebarchiesi@0 245 ))
danielebarchiesi@0 246 ->condition('item_id', $item->item_id)
danielebarchiesi@0 247 ->condition('expire', 0);
danielebarchiesi@0 248 // If there are affected rows, this update succeeded.
danielebarchiesi@0 249 if ($update->execute()) {
danielebarchiesi@0 250 $item->data = unserialize($item->data);
danielebarchiesi@0 251 return $item;
danielebarchiesi@0 252 }
danielebarchiesi@0 253 }
danielebarchiesi@0 254 else {
danielebarchiesi@0 255 // No items currently available to claim.
danielebarchiesi@0 256 return FALSE;
danielebarchiesi@0 257 }
danielebarchiesi@0 258 }
danielebarchiesi@0 259 }
danielebarchiesi@0 260
danielebarchiesi@0 261 public function releaseItem($item) {
danielebarchiesi@0 262 $update = db_update('queue')
danielebarchiesi@0 263 ->fields(array(
danielebarchiesi@0 264 'expire' => 0,
danielebarchiesi@0 265 ))
danielebarchiesi@0 266 ->condition('item_id', $item->item_id);
danielebarchiesi@0 267 return $update->execute();
danielebarchiesi@0 268 }
danielebarchiesi@0 269
danielebarchiesi@0 270 public function deleteItem($item) {
danielebarchiesi@0 271 db_delete('queue')
danielebarchiesi@0 272 ->condition('item_id', $item->item_id)
danielebarchiesi@0 273 ->execute();
danielebarchiesi@0 274 }
danielebarchiesi@0 275
danielebarchiesi@0 276 public function createQueue() {
danielebarchiesi@0 277 // All tasks are stored in a single database table (which is created when
danielebarchiesi@0 278 // Drupal is first installed) so there is nothing we need to do to create
danielebarchiesi@0 279 // a new queue.
danielebarchiesi@0 280 }
danielebarchiesi@0 281
danielebarchiesi@0 282 public function deleteQueue() {
danielebarchiesi@0 283 db_delete('queue')
danielebarchiesi@0 284 ->condition('name', $this->name)
danielebarchiesi@0 285 ->execute();
danielebarchiesi@0 286 }
danielebarchiesi@0 287 }
danielebarchiesi@0 288
danielebarchiesi@0 289 /**
danielebarchiesi@0 290 * Static queue implementation.
danielebarchiesi@0 291 *
danielebarchiesi@0 292 * This allows "undelayed" variants of processes relying on the Queue
danielebarchiesi@0 293 * interface. The queue data resides in memory. It should only be used for
danielebarchiesi@0 294 * items that will be queued and dequeued within a given page request.
danielebarchiesi@0 295 */
danielebarchiesi@0 296 class MemoryQueue implements DrupalQueueInterface {
danielebarchiesi@0 297 /**
danielebarchiesi@0 298 * The queue data.
danielebarchiesi@0 299 *
danielebarchiesi@0 300 * @var array
danielebarchiesi@0 301 */
danielebarchiesi@0 302 protected $queue;
danielebarchiesi@0 303
danielebarchiesi@0 304 /**
danielebarchiesi@0 305 * Counter for item ids.
danielebarchiesi@0 306 *
danielebarchiesi@0 307 * @var int
danielebarchiesi@0 308 */
danielebarchiesi@0 309 protected $id_sequence;
danielebarchiesi@0 310
danielebarchiesi@0 311 /**
danielebarchiesi@0 312 * Start working with a queue.
danielebarchiesi@0 313 *
danielebarchiesi@0 314 * @param $name
danielebarchiesi@0 315 * Arbitrary string. The name of the queue to work with.
danielebarchiesi@0 316 */
danielebarchiesi@0 317 public function __construct($name) {
danielebarchiesi@0 318 $this->queue = array();
danielebarchiesi@0 319 $this->id_sequence = 0;
danielebarchiesi@0 320 }
danielebarchiesi@0 321
danielebarchiesi@0 322 public function createItem($data) {
danielebarchiesi@0 323 $item = new stdClass();
danielebarchiesi@0 324 $item->item_id = $this->id_sequence++;
danielebarchiesi@0 325 $item->data = $data;
danielebarchiesi@0 326 $item->created = time();
danielebarchiesi@0 327 $item->expire = 0;
danielebarchiesi@0 328 $this->queue[$item->item_id] = $item;
danielebarchiesi@0 329 }
danielebarchiesi@0 330
danielebarchiesi@0 331 public function numberOfItems() {
danielebarchiesi@0 332 return count($this->queue);
danielebarchiesi@0 333 }
danielebarchiesi@0 334
danielebarchiesi@0 335 public function claimItem($lease_time = 30) {
danielebarchiesi@0 336 foreach ($this->queue as $key => $item) {
danielebarchiesi@0 337 if ($item->expire == 0) {
danielebarchiesi@0 338 $item->expire = time() + $lease_time;
danielebarchiesi@0 339 $this->queue[$key] = $item;
danielebarchiesi@0 340 return $item;
danielebarchiesi@0 341 }
danielebarchiesi@0 342 }
danielebarchiesi@0 343 return FALSE;
danielebarchiesi@0 344 }
danielebarchiesi@0 345
danielebarchiesi@0 346 public function deleteItem($item) {
danielebarchiesi@0 347 unset($this->queue[$item->item_id]);
danielebarchiesi@0 348 }
danielebarchiesi@0 349
danielebarchiesi@0 350 public function releaseItem($item) {
danielebarchiesi@0 351 if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
danielebarchiesi@0 352 $this->queue[$item->item_id]->expire = 0;
danielebarchiesi@0 353 return TRUE;
danielebarchiesi@0 354 }
danielebarchiesi@0 355 return FALSE;
danielebarchiesi@0 356 }
danielebarchiesi@0 357
danielebarchiesi@0 358 public function createQueue() {
danielebarchiesi@0 359 // Nothing needed here.
danielebarchiesi@0 360 }
danielebarchiesi@0 361
danielebarchiesi@0 362 public function deleteQueue() {
danielebarchiesi@0 363 $this->queue = array();
danielebarchiesi@0 364 $this->id_sequence = 0;
danielebarchiesi@0 365 }
danielebarchiesi@0 366 }
danielebarchiesi@0 367
danielebarchiesi@0 368 /**
danielebarchiesi@0 369 * @} End of "defgroup queue".
danielebarchiesi@0 370 */