Mercurial > hg > cmmr2012-drupal-site
comparison core/modules/migrate/src/MigrateExecutable.php @ 0:c75dbcec494b
Initial commit from drush-created site
author | Chris Cannam |
---|---|
date | Thu, 05 Jul 2018 14:24:15 +0000 |
parents | |
children | a9cd425dd02b |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:c75dbcec494b |
---|---|
1 <?php | |
2 | |
3 namespace Drupal\migrate; | |
4 | |
5 use Drupal\Component\Utility\Bytes; | |
6 use Drupal\Core\Utility\Error; | |
7 use Drupal\Core\StringTranslation\StringTranslationTrait; | |
8 use Drupal\migrate\Event\MigrateEvents; | |
9 use Drupal\migrate\Event\MigrateImportEvent; | |
10 use Drupal\migrate\Event\MigratePostRowSaveEvent; | |
11 use Drupal\migrate\Event\MigratePreRowSaveEvent; | |
12 use Drupal\migrate\Event\MigrateRollbackEvent; | |
13 use Drupal\migrate\Event\MigrateRowDeleteEvent; | |
14 use Drupal\migrate\Exception\RequirementsException; | |
15 use Drupal\migrate\Plugin\MigrateIdMapInterface; | |
16 use Drupal\migrate\Plugin\MigrationInterface; | |
17 use Symfony\Component\EventDispatcher\EventDispatcherInterface; | |
18 | |
19 /** | |
20 * Defines a migrate executable class. | |
21 */ | |
22 class MigrateExecutable implements MigrateExecutableInterface { | |
23 use StringTranslationTrait; | |
24 | |
/**
 * The migration being executed.
 *
 * @var \Drupal\migrate\Plugin\MigrationInterface
 */
protected $migration;

/**
 * Status to record in the ID map for the row currently being imported.
 *
 * The value is a MigrateIdMapInterface::STATUS_* constant. It starts out as
 * STATUS_IMPORTED so that the first row saved by import() is stored with a
 * real status rather than NULL; import() resets it to the same value after
 * every row.
 *
 * @var int
 */
protected $sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;

/**
 * The ratio of the memory limit at which an operation will be interrupted.
 *
 * @var float
 */
protected $memoryThreshold = 0.85;

/**
 * The PHP memory_limit expressed in bytes.
 *
 * Set by the constructor; PHP_INT_MAX when memory_limit is "-1".
 *
 * @var int
 */
protected $memoryLimit;

/**
 * The source ID values of the row currently being processed.
 *
 * Assigned by import() for each row and passed to the ID map by
 * saveMessage() so that messages are attributed to that row.
 *
 * @var array
 */
protected $sourceIdValues;

/**
 * An array of counts. Initially used for cache hit/miss tracking.
 *
 * @var array
 */
protected $counts = [];

/**
 * The source plugin, lazily fetched from the migration by getSource().
 *
 * @var \Drupal\migrate\Plugin\MigrateSourceInterface
 */
protected $source;

/**
 * The event dispatcher.
 *
 * May be NULL until getEventDispatcher() is first called.
 *
 * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
 */
protected $eventDispatcher;

/**
 * Migration message service.
 *
 * @todo https://www.drupal.org/node/2822663 Make this protected.
 *
 * @var \Drupal\migrate\MigrateMessageInterface
 */
public $message;
92 | |
/**
 * Builds the executable for one migration and records the PHP memory limit.
 *
 * @param \Drupal\migrate\Plugin\MigrationInterface $migration
 *   The migration to run.
 * @param \Drupal\migrate\MigrateMessageInterface $message
 *   (optional) The migrate message service. A MigrateMessage instance is
 *   created when none is given.
 * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher
 *   (optional) The event dispatcher. When omitted it is resolved lazily by
 *   getEventDispatcher().
 *
 * @throws \Drupal\migrate\MigrateException
 */
public function __construct(MigrationInterface $migration, MigrateMessageInterface $message = NULL, EventDispatcherInterface $event_dispatcher = NULL) {
  $this->migration = $migration;
  $this->message = $message ?: new MigrateMessage();
  // Let the ID map report through the same message service.
  $id_map = $this->migration->getIdMap();
  $id_map->setMessage($this->message);
  $this->eventDispatcher = $event_dispatcher;

  // Record the memory limit in bytes; "-1" means PHP imposes no limit.
  $configured_limit = trim(ini_get('memory_limit'));
  $this->memoryLimit = ($configured_limit == '-1')
    ? PHP_INT_MAX
    : Bytes::toInt($configured_limit);
}
119 | |
/**
 * Returns the source plugin, fetching it from the migration on first use.
 *
 * @return \Drupal\migrate\Plugin\MigrateSourceInterface
 *   The source.
 */
protected function getSource() {
  if (isset($this->source)) {
    return $this->source;
  }
  // Not cached yet: ask the migration and remember the plugin.
  $this->source = $this->migration->getSourcePlugin();
  return $this->source;
}
134 | |
/**
 * Gets the event dispatcher.
 *
 * Falls back to the 'event_dispatcher' service when none was injected.
 *
 * @return \Symfony\Component\EventDispatcher\EventDispatcherInterface
 *   The event dispatcher.
 */
protected function getEventDispatcher() {
  if ($this->eventDispatcher) {
    return $this->eventDispatcher;
  }
  $this->eventDispatcher = \Drupal::service('event_dispatcher');
  return $this->eventDispatcher;
}
146 | |
/**
 * {@inheritdoc}
 */
public function import() {
  // Only begin the import operation if the migration is currently idle.
  if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {
    $this->message->display($this->t('Migration @id is busy with another operation: @status',
      [
        '@id' => $this->migration->id(),
        '@status' => $this->t($this->migration->getStatusLabel()),
      ]), 'error');
    return MigrationInterface::RESULT_FAILED;
  }
  $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_IMPORT, new MigrateImportEvent($this->migration, $this->message));

  // Knock off migration if the requirements haven't been met.
  try {
    $this->migration->checkRequirements();
  }
  catch (RequirementsException $e) {
    $this->message->display(
      $this->t(
        'Migration @id did not meet the requirements. @message @requirements',
        [
          '@id' => $this->migration->id(),
          '@message' => $e->getMessage(),
          '@requirements' => $e->getRequirementsString(),
        ]
      ),
      'error'
    );

    return MigrationInterface::RESULT_FAILED;
  }

  $this->migration->setStatus(MigrationInterface::STATUS_IMPORTING);
  $return = MigrationInterface::RESULT_COMPLETED;
  $source = $this->getSource();
  // Fetched once and reused for every ID map operation below.
  $id_map = $this->migration->getIdMap();

  try {
    $source->rewind();
  }
  catch (\Exception $e) {
    $this->message->display(
      $this->t('Migration failed with source plugin exception: @e', ['@e' => $e->getMessage()]), 'error');
    $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
    return MigrationInterface::RESULT_FAILED;
  }

  $destination = $this->migration->getDestinationPlugin();
  while ($source->valid()) {
    $row = $source->current();
    // Remembered so saveMessage() can attribute messages to this row.
    $this->sourceIdValues = $row->getSourceIdValues();

    try {
      $this->processRow($row);
      $save = TRUE;
    }
    catch (MigrateException $e) {
      // Processing failed: record the status carried by the exception.
      $id_map->saveIdMapping($row, [], $e->getStatus());
      $this->saveMessage($e->getMessage(), $e->getLevel());
      $save = FALSE;
    }
    catch (MigrateSkipRowException $e) {
      if ($e->getSaveToMap()) {
        $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED);
      }
      if ($message = trim($e->getMessage())) {
        $this->saveMessage($message, MigrationInterface::MESSAGE_INFORMATIONAL);
      }
      $save = FALSE;
    }

    if ($save) {
      try {
        $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROW_SAVE, new MigratePreRowSaveEvent($this->migration, $this->message, $row));
        $destination_id_values = $destination->import($row, $id_map->lookupDestinationId($this->sourceIdValues));
        $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROW_SAVE, new MigratePostRowSaveEvent($this->migration, $this->message, $row, $destination_id_values));
        if ($destination_id_values) {
          // We do not save an idMap entry for config.
          if ($destination_id_values !== TRUE) {
            $id_map->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $destination->rollbackAction());
          }
        }
        else {
          $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
          if (!$id_map->messageCount()) {
            $message = $this->t('New object was not saved, no error provided');
            $this->saveMessage($message);
            $this->message->display($message);
          }
        }
      }
      catch (MigrateException $e) {
        $id_map->saveIdMapping($row, [], $e->getStatus());
        $this->saveMessage($e->getMessage(), $e->getLevel());
      }
      catch (\Exception $e) {
        // Any other failure marks the row as failed and is both saved and
        // displayed.
        $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
        $this->handleException($e);
      }
    }

    // Reset the status used for the next successfully saved row.
    $this->sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;

    // Check for memory exhaustion.
    if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
      break;
    }

    // If anyone has requested we stop, return the requested result.
    if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
      $return = $this->migration->getInterruptionResult();
      $this->migration->clearInterruptionResult();
      break;
    }

    try {
      $source->next();
    }
    catch (\Exception $e) {
      $this->message->display(
        $this->t('Migration failed with source plugin exception: @e',
          ['@e' => $e->getMessage()]), 'error');
      $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
      return MigrationInterface::RESULT_FAILED;
    }
  }

  $this->getEventDispatcher()->dispatch(MigrateEvents::POST_IMPORT, new MigrateImportEvent($this->migration, $this->message));
  $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
  return $return;
}
281 | |
/**
 * {@inheritdoc}
 */
public function rollback() {
  // Only begin the rollback operation if the migration is currently idle.
  if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {
    $this->message->display($this->t('Migration @id is busy with another operation: @status', ['@id' => $this->migration->id(), '@status' => $this->t($this->migration->getStatusLabel())]), 'error');
    return MigrationInterface::RESULT_FAILED;
  }

  // Announce that rollback is about to happen.
  $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROLLBACK, new MigrateRollbackEvent($this->migration));

  // Optimistically assume things are going to work out; if not, $return will be
  // updated to some other status.
  $return = MigrationInterface::RESULT_COMPLETED;

  $this->migration->setStatus(MigrationInterface::STATUS_ROLLING_BACK);
  $id_map = $this->migration->getIdMap();
  $destination = $this->migration->getDestinationPlugin();

  // Loop through each row in the map, and try to roll it back.
  foreach ($id_map as $map_row) {
    $destination_key = $id_map->currentDestination();
    if ($destination_key) {
      $map_row = $id_map->getRowByDestination($destination_key);
      // A row without a recorded rollback action is deleted, exactly as
      // before; the explicit isset() avoids reading an undefined index (or
      // indexing a non-array) and relying on a loose NULL comparison.
      if (!isset($map_row['rollback_action']) || $map_row['rollback_action'] == MigrateIdMapInterface::ROLLBACK_DELETE) {
        $this->getEventDispatcher()
          ->dispatch(MigrateEvents::PRE_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key));
        $destination->rollback($destination_key);
        $this->getEventDispatcher()
          ->dispatch(MigrateEvents::POST_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key));
      }
      // We're now done with this row, so remove it from the map.
      $id_map->deleteDestination($destination_key);
    }
    else {
      // If there is no destination key the import probably failed and we can
      // remove the row without further action.
      $source_key = $id_map->currentSource();
      $id_map->delete($source_key);
    }

    // Check for memory exhaustion.
    if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
      break;
    }

    // If anyone has requested we stop, return the requested result.
    if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
      $return = $this->migration->getInterruptionResult();
      $this->migration->clearInterruptionResult();
      break;
    }
  }

  // Notify modules that rollback attempt was complete.
  $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROLLBACK, new MigrateRollbackEvent($this->migration));
  $this->migration->setStatus(MigrationInterface::STATUS_IDLE);

  return $return;
}
344 | |
/**
 * {@inheritdoc}
 */
public function processRow(Row $row, array $process = NULL, $value = NULL) {
  foreach ($this->migration->getProcessPlugins($process) as $destination => $plugins) {
    $multiple = FALSE;
    /** @var \Drupal\migrate\Plugin\MigrateProcessInterface $plugin */
    foreach ($plugins as $plugin) {
      $definition = $plugin->getPluginDefinition();
      // Many plugins expect a scalar value but the current value of the
      // pipeline might be multiple scalars (this is set by the previous
      // plugin) and in this case the current value needs to be iterated
      // and each scalar separately transformed.
      if ($multiple && empty($definition['handle_multiples'])) {
        $new_value = [];
        if (!is_array($value)) {
          // An object without __toString() cannot be formatted with %s;
          // report its class name instead of failing while building the
          // exception message. Every other value is formatted as before.
          $received = (is_object($value) && !method_exists($value, '__toString')) ? get_class($value) : $value;
          throw new MigrateException(sprintf('Pipeline failed at %s plugin for destination %s: %s received instead of an array,', $plugin->getPluginId(), $destination, $received));
        }
        $break = FALSE;
        foreach ($value as $scalar_value) {
          try {
            $new_value[] = $plugin->transform($scalar_value, $this, $row, $destination);
          }
          catch (MigrateSkipProcessException $e) {
            // Keep transforming the remaining items, but stop the pipeline
            // for this destination once the current plugin is done.
            $new_value[] = NULL;
            $break = TRUE;
          }
        }
        $value = $new_value;
        if ($break) {
          break;
        }
      }
      else {
        try {
          $value = $plugin->transform($value, $this, $row, $destination);
        }
        catch (MigrateSkipProcessException $e) {
          $value = NULL;
          break;
        }
        // Tells the next plugin whether $value holds multiple values.
        $multiple = $plugin->multiple();
      }
    }
    // Ensure all values, including nulls, are migrated.
    if ($plugins) {
      if (isset($value)) {
        $row->setDestinationProperty($destination, $value);
      }
      else {
        $row->setEmptyDestinationProperty($destination);
      }
    }
    // Reset the value.
    $value = NULL;
  }
}
402 | |
/**
 * Returns the IDs of the record the source plugin is currently positioned on.
 *
 * @return array
 *   The current source IDs.
 */
protected function currentSourceIds() {
  $source = $this->getSource();
  return $source->getCurrentIds();
}
412 | |
/**
 * {@inheritdoc}
 */
public function saveMessage($message, $level = MigrationInterface::MESSAGE_ERROR) {
  // Messages are stored against the source IDs of the row being processed.
  $id_map = $this->migration->getIdMap();
  $id_map->saveMessage($this->sourceIdValues, $message, $level);
}
419 | |
/**
 * Saves (optionally) and displays an exception as an error message.
 *
 * The message text is suffixed with the file and line reported by
 * Error::decodeException().
 *
 * @param \Exception $exception
 *   Object representing the exception.
 * @param bool $save
 *   (optional) Whether to save the message in the migration's mapping table.
 *   Set to FALSE in contexts where this doesn't make sense.
 */
protected function handleException(\Exception $exception, $save = TRUE) {
  $decoded = Error::decodeException($exception);
  $text = "{$decoded['@message']} ({$decoded['%file']}:{$decoded['%line']})";
  if ($save) {
    $this->saveMessage($text);
  }
  $this->message->display($text, 'error');
}
439 | |
/**
 * Checks for exceptional conditions, and displays feedback.
 *
 * Currently the only condition checked is memory usage.
 *
 * @return int
 *   MigrationInterface::RESULT_INCOMPLETE when memoryExceeded() reports TRUE,
 *   otherwise MigrationInterface::RESULT_COMPLETED.
 */
protected function checkStatus() {
  return $this->memoryExceeded()
    ? MigrationInterface::RESULT_INCOMPLETE
    : MigrationInterface::RESULT_COMPLETED;
}
449 | |
/**
 * Tests whether we've exceeded the desired memory threshold.
 *
 * When usage is above the threshold a warning is displayed and a memory
 * reclaim is attempted; the result then depends on whether usage dropped
 * below 90% of the threshold.
 *
 * @return bool
 *   TRUE if the threshold is still exceeded after attempting to reclaim
 *   memory, otherwise FALSE.
 */
protected function memoryExceeded() {
  // A falsy threshold disables the check, so there is nothing to measure.
  $threshold = $this->memoryThreshold;
  if (!$threshold) {
    return FALSE;
  }
  // A non-positive limit cannot be used as a divisor; treat it as "no limit"
  // instead of dividing by zero.
  if ($this->memoryLimit <= 0) {
    return FALSE;
  }

  $usage = $this->getMemoryUsage();
  $pct_memory = $usage / $this->memoryLimit;
  if ($pct_memory <= $threshold) {
    return FALSE;
  }

  // Placeholder values shared by the three messages below.
  $placeholders = function ($usage, $pct_memory) {
    return [
      '@pct' => round($pct_memory * 100),
      '@usage' => $this->formatSize($usage),
      '@limit' => $this->formatSize($this->memoryLimit),
    ];
  };

  $this->message->display(
    $this->t('Memory usage is @usage (@pct% of limit @limit), reclaiming memory.', $placeholders($usage, $pct_memory)),
    'warning'
  );

  $usage = $this->attemptMemoryReclaim();
  $pct_memory = $usage / $this->memoryLimit;
  // Use a lower threshold - we don't want to be in a situation where we keep
  // coming back here and trimming a tiny amount.
  if ($pct_memory > (0.90 * $threshold)) {
    $this->message->display(
      $this->t('Memory usage is now @usage (@pct% of limit @limit), not enough reclaimed, starting new batch', $placeholders($usage, $pct_memory)),
      'warning'
    );
    return TRUE;
  }

  $this->message->display(
    $this->t('Memory usage is now @usage (@pct% of limit @limit), reclaimed enough, continuing', $placeholders($usage, $pct_memory)),
    'warning'
  );
  return FALSE;
}
512 | |
/**
 * Reports how much memory PHP is currently using.
 *
 * @return int
 *   The memory usage, as returned by memory_get_usage().
 */
protected function getMemoryUsage() {
  $bytes_in_use = memory_get_usage();
  return $bytes_in_use;
}
522 | |
/**
 * Tries to reclaim memory.
 *
 * Resets Drupal's static storage, resets the cache of every entity storage
 * handler and runs the garbage collector.
 *
 * @return int
 *   The memory usage after reclaim, as reported by getMemoryUsage().
 */
protected function attemptMemoryReclaim() {
  // First, try resetting Drupal's static storage - this frequently releases
  // plenty of memory to continue.
  drupal_static_reset();

  // Entity storage can blow up with caches so clear them out.
  $entity_type_manager = \Drupal::entityTypeManager();
  foreach (array_keys($entity_type_manager->getDefinitions()) as $entity_type_id) {
    $entity_type_manager->getStorage($entity_type_id)->resetCache();
  }

  // @todo Explore resetting the container.

  // Run garbage collector to further reduce memory.
  gc_collect_cycles();

  // Measure through the same overridable hook that memoryExceeded() uses so
  // both readings come from one source.
  return $this->getMemoryUsage();
}
547 | |
/**
 * Turns a byte count into a human-readable size string.
 *
 * Thin wrapper around format_size().
 *
 * @param int $size
 *   A size in bytes.
 *
 * @return string
 *   A translated string representation of the size.
 */
protected function formatSize($size) {
  $formatted = format_size($size);
  return $formatted;
}
560 | |
561 } |