Chris@0: migration = $migration; Chris@14: $this->message = $message ?: new MigrateMessage(); Chris@18: $this->getIdMap()->setMessage($this->message); Chris@0: $this->eventDispatcher = $event_dispatcher; Chris@0: // Record the memory limit in bytes Chris@0: $limit = trim(ini_get('memory_limit')); Chris@0: if ($limit == '-1') { Chris@0: $this->memoryLimit = PHP_INT_MAX; Chris@0: } Chris@0: else { Chris@0: $this->memoryLimit = Bytes::toInt($limit); Chris@0: } Chris@0: } Chris@0: Chris@0: /** Chris@0: * Returns the source. Chris@0: * Chris@0: * Makes sure source is initialized based on migration settings. Chris@0: * Chris@0: * @return \Drupal\migrate\Plugin\MigrateSourceInterface Chris@0: * The source. Chris@0: */ Chris@0: protected function getSource() { Chris@0: if (!isset($this->source)) { Chris@0: $this->source = $this->migration->getSourcePlugin(); Chris@0: } Chris@0: return $this->source; Chris@0: } Chris@0: Chris@0: /** Chris@0: * Gets the event dispatcher. Chris@0: * Chris@0: * @return \Symfony\Component\EventDispatcher\EventDispatcherInterface Chris@0: */ Chris@0: protected function getEventDispatcher() { Chris@0: if (!$this->eventDispatcher) { Chris@0: $this->eventDispatcher = \Drupal::service('event_dispatcher'); Chris@0: } Chris@0: return $this->eventDispatcher; Chris@0: } Chris@0: Chris@0: /** Chris@0: * {@inheritdoc} Chris@0: */ Chris@0: public function import() { Chris@0: // Only begin the import operation if the migration is currently idle. Chris@0: if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) { Chris@0: $this->message->display($this->t('Migration @id is busy with another operation: @status', Chris@0: [ Chris@0: '@id' => $this->migration->id(), Chris@0: '@status' => $this->t($this->migration->getStatusLabel()), Chris@0: ]), 'error'); Chris@0: return MigrationInterface::RESULT_FAILED; Chris@0: } Chris@0: $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_IMPORT, new MigrateImportEvent($this->migration, $this->message)); Chris@0: Chris@0: // Knock off migration if the requirements haven't been met. Chris@0: try { Chris@0: $this->migration->checkRequirements(); Chris@0: } Chris@0: catch (RequirementsException $e) { Chris@0: $this->message->display( Chris@0: $this->t( Chris@0: 'Migration @id did not meet the requirements. @message @requirements', Chris@0: [ Chris@0: '@id' => $this->migration->id(), Chris@0: '@message' => $e->getMessage(), Chris@0: '@requirements' => $e->getRequirementsString(), Chris@0: ] Chris@0: ), Chris@0: 'error' Chris@0: ); Chris@0: Chris@0: return MigrationInterface::RESULT_FAILED; Chris@0: } Chris@0: Chris@0: $this->migration->setStatus(MigrationInterface::STATUS_IMPORTING); Chris@0: $return = MigrationInterface::RESULT_COMPLETED; Chris@0: $source = $this->getSource(); Chris@18: $id_map = $this->getIdMap(); Chris@0: Chris@0: try { Chris@0: $source->rewind(); Chris@0: } Chris@0: catch (\Exception $e) { Chris@0: $this->message->display( Chris@0: $this->t('Migration failed with source plugin exception: @e', ['@e' => $e->getMessage()]), 'error'); Chris@0: $this->migration->setStatus(MigrationInterface::STATUS_IDLE); Chris@0: return MigrationInterface::RESULT_FAILED; Chris@0: } Chris@0: Chris@0: $destination = $this->migration->getDestinationPlugin(); Chris@0: while ($source->valid()) { Chris@0: $row = $source->current(); Chris@0: $this->sourceIdValues = $row->getSourceIdValues(); Chris@0: Chris@0: try { Chris@0: $this->processRow($row); Chris@0: $save = TRUE; Chris@0: } Chris@0: catch (MigrateException $e) { Chris@18: $this->getIdMap()->saveIdMapping($row, [], $e->getStatus()); Chris@0: $this->saveMessage($e->getMessage(), $e->getLevel()); Chris@0: $save = FALSE; Chris@0: } Chris@0: catch (MigrateSkipRowException $e) { Chris@0: if ($e->getSaveToMap()) { Chris@0: $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED); Chris@0: } Chris@0: if ($message = trim($e->getMessage())) { Chris@0: $this->saveMessage($message, MigrationInterface::MESSAGE_INFORMATIONAL); Chris@0: } Chris@0: $save = FALSE; Chris@0: } Chris@0: Chris@0: if ($save) { Chris@0: try { Chris@0: $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROW_SAVE, new MigratePreRowSaveEvent($this->migration, $this->message, $row)); Chris@17: $destination_ids = $id_map->lookupDestinationIds($this->sourceIdValues); Chris@17: $destination_id_values = $destination_ids ? reset($destination_ids) : []; Chris@17: $destination_id_values = $destination->import($row, $destination_id_values); Chris@0: $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROW_SAVE, new MigratePostRowSaveEvent($this->migration, $this->message, $row, $destination_id_values)); Chris@0: if ($destination_id_values) { Chris@0: // We do not save an idMap entry for config. Chris@0: if ($destination_id_values !== TRUE) { Chris@0: $id_map->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $destination->rollbackAction()); Chris@0: } Chris@0: } Chris@0: else { Chris@0: $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED); Chris@0: if (!$id_map->messageCount()) { Chris@0: $message = $this->t('New object was not saved, no error provided'); Chris@0: $this->saveMessage($message); Chris@0: $this->message->display($message); Chris@0: } Chris@0: } Chris@0: } Chris@0: catch (MigrateException $e) { Chris@18: $this->getIdMap()->saveIdMapping($row, [], $e->getStatus()); Chris@0: $this->saveMessage($e->getMessage(), $e->getLevel()); Chris@0: } Chris@0: catch (\Exception $e) { Chris@18: $this->getIdMap()->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED); Chris@0: $this->handleException($e); Chris@0: } Chris@0: } Chris@0: Chris@0: $this->sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED; Chris@0: Chris@0: // Check for memory exhaustion. Chris@0: if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) { Chris@0: break; Chris@0: } Chris@0: Chris@0: // If anyone has requested we stop, return the requested result. Chris@0: if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) { Chris@0: $return = $this->migration->getInterruptionResult(); Chris@0: $this->migration->clearInterruptionResult(); Chris@0: break; Chris@0: } Chris@0: Chris@0: try { Chris@0: $source->next(); Chris@0: } Chris@0: catch (\Exception $e) { Chris@0: $this->message->display( Chris@0: $this->t('Migration failed with source plugin exception: @e', Chris@0: ['@e' => $e->getMessage()]), 'error'); Chris@0: $this->migration->setStatus(MigrationInterface::STATUS_IDLE); Chris@0: return MigrationInterface::RESULT_FAILED; Chris@0: } Chris@0: } Chris@0: Chris@0: $this->getEventDispatcher()->dispatch(MigrateEvents::POST_IMPORT, new MigrateImportEvent($this->migration, $this->message)); Chris@0: $this->migration->setStatus(MigrationInterface::STATUS_IDLE); Chris@0: return $return; Chris@0: } Chris@0: Chris@0: /** Chris@0: * {@inheritdoc} Chris@0: */ Chris@0: public function rollback() { Chris@0: // Only begin the rollback operation if the migration is currently idle. Chris@0: if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) { Chris@0: $this->message->display($this->t('Migration @id is busy with another operation: @status', ['@id' => $this->migration->id(), '@status' => $this->t($this->migration->getStatusLabel())]), 'error'); Chris@0: return MigrationInterface::RESULT_FAILED; Chris@0: } Chris@0: Chris@0: // Announce that rollback is about to happen. Chris@0: $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROLLBACK, new MigrateRollbackEvent($this->migration)); Chris@0: Chris@0: // Optimistically assume things are going to work out; if not, $return will be Chris@0: // updated to some other status. Chris@0: $return = MigrationInterface::RESULT_COMPLETED; Chris@0: Chris@0: $this->migration->setStatus(MigrationInterface::STATUS_ROLLING_BACK); Chris@18: $id_map = $this->getIdMap(); Chris@0: $destination = $this->migration->getDestinationPlugin(); Chris@0: Chris@0: // Loop through each row in the map, and try to roll it back. Chris@18: $id_map->rewind(); Chris@18: while ($id_map->valid()) { Chris@0: $destination_key = $id_map->currentDestination(); Chris@0: if ($destination_key) { Chris@0: $map_row = $id_map->getRowByDestination($destination_key); Chris@0: if ($map_row['rollback_action'] == MigrateIdMapInterface::ROLLBACK_DELETE) { Chris@0: $this->getEventDispatcher() Chris@0: ->dispatch(MigrateEvents::PRE_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key)); Chris@0: $destination->rollback($destination_key); Chris@0: $this->getEventDispatcher() Chris@0: ->dispatch(MigrateEvents::POST_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key)); Chris@0: } Chris@0: // We're now done with this row, so remove it from the map. Chris@0: $id_map->deleteDestination($destination_key); Chris@0: } Chris@0: else { Chris@0: // If there is no destination key the import probably failed and we can Chris@0: // remove the row without further action. Chris@0: $source_key = $id_map->currentSource(); Chris@0: $id_map->delete($source_key); Chris@0: } Chris@18: $id_map->next(); Chris@0: Chris@0: // Check for memory exhaustion. Chris@0: if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) { Chris@0: break; Chris@0: } Chris@0: Chris@0: // If anyone has requested we stop, return the requested result. Chris@0: if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) { Chris@0: $return = $this->migration->getInterruptionResult(); Chris@0: $this->migration->clearInterruptionResult(); Chris@0: break; Chris@0: } Chris@0: } Chris@0: Chris@0: // Notify modules that rollback attempt was complete. Chris@0: $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROLLBACK, new MigrateRollbackEvent($this->migration)); Chris@0: $this->migration->setStatus(MigrationInterface::STATUS_IDLE); Chris@0: Chris@0: return $return; Chris@0: } Chris@0: Chris@0: /** Chris@18: * Get the ID map from the current migration. Chris@18: * Chris@18: * @return \Drupal\migrate\Plugin\MigrateIdMapInterface Chris@18: * The ID map. Chris@18: */ Chris@18: protected function getIdMap() { Chris@18: return $this->migration->getIdMap(); Chris@18: } Chris@18: Chris@18: /** Chris@0: * {@inheritdoc} Chris@0: */ Chris@0: public function processRow(Row $row, array $process = NULL, $value = NULL) { Chris@0: foreach ($this->migration->getProcessPlugins($process) as $destination => $plugins) { Chris@0: $multiple = FALSE; Chris@0: /** @var $plugin \Drupal\migrate\Plugin\MigrateProcessInterface */ Chris@0: foreach ($plugins as $plugin) { Chris@0: $definition = $plugin->getPluginDefinition(); Chris@0: // Many plugins expect a scalar value but the current value of the Chris@0: // pipeline might be multiple scalars (this is set by the previous Chris@0: // plugin) and in this case the current value needs to be iterated Chris@0: // and each scalar separately transformed. Chris@0: if ($multiple && !$definition['handle_multiples']) { Chris@0: $new_value = []; Chris@0: if (!is_array($value)) { Chris@0: throw new MigrateException(sprintf('Pipeline failed at %s plugin for destination %s: %s received instead of an array,', $plugin->getPluginId(), $destination, $value)); Chris@0: } Chris@0: $break = FALSE; Chris@0: foreach ($value as $scalar_value) { Chris@0: try { Chris@0: $new_value[] = $plugin->transform($scalar_value, $this, $row, $destination); Chris@0: } Chris@0: catch (MigrateSkipProcessException $e) { Chris@0: $new_value[] = NULL; Chris@0: $break = TRUE; Chris@0: } Chris@0: } Chris@0: $value = $new_value; Chris@0: if ($break) { Chris@0: break; Chris@0: } Chris@0: } Chris@0: else { Chris@0: try { Chris@0: $value = $plugin->transform($value, $this, $row, $destination); Chris@0: } Chris@0: catch (MigrateSkipProcessException $e) { Chris@0: $value = NULL; Chris@0: break; Chris@0: } Chris@0: $multiple = $plugin->multiple(); Chris@0: } Chris@0: } Chris@0: // Ensure all values, including nulls, are migrated. Chris@0: if ($plugins) { Chris@0: if (isset($value)) { Chris@0: $row->setDestinationProperty($destination, $value); Chris@0: } Chris@0: else { Chris@0: $row->setEmptyDestinationProperty($destination); Chris@0: } Chris@0: } Chris@0: // Reset the value. Chris@0: $value = NULL; Chris@0: } Chris@0: } Chris@0: Chris@0: /** Chris@0: * Fetches the key array for the current source record. Chris@0: * Chris@0: * @return array Chris@0: * The current source IDs. Chris@0: */ Chris@0: protected function currentSourceIds() { Chris@0: return $this->getSource()->getCurrentIds(); Chris@0: } Chris@0: Chris@0: /** Chris@0: * {@inheritdoc} Chris@0: */ Chris@0: public function saveMessage($message, $level = MigrationInterface::MESSAGE_ERROR) { Chris@18: $this->getIdMap()->saveMessage($this->sourceIdValues, $message, $level); Chris@0: } Chris@0: Chris@0: /** Chris@0: * Takes an Exception object and both saves and displays it. Chris@0: * Chris@0: * Pulls in additional information on the location triggering the exception. Chris@0: * Chris@0: * @param \Exception $exception Chris@0: * Object representing the exception. Chris@0: * @param bool $save Chris@0: * (optional) Whether to save the message in the migration's mapping table. Chris@0: * Set to FALSE in contexts where this doesn't make sense. Chris@0: */ Chris@0: protected function handleException(\Exception $exception, $save = TRUE) { Chris@0: $result = Error::decodeException($exception); Chris@0: $message = $result['@message'] . ' (' . $result['%file'] . ':' . $result['%line'] . ')'; Chris@0: if ($save) { Chris@0: $this->saveMessage($message); Chris@0: } Chris@0: $this->message->display($message, 'error'); Chris@0: } Chris@0: Chris@0: /** Chris@0: * Checks for exceptional conditions, and display feedback. Chris@0: */ Chris@0: protected function checkStatus() { Chris@0: if ($this->memoryExceeded()) { Chris@0: return MigrationInterface::RESULT_INCOMPLETE; Chris@0: } Chris@0: return MigrationInterface::RESULT_COMPLETED; Chris@0: } Chris@0: Chris@0: /** Chris@0: * Tests whether we've exceeded the desired memory threshold. Chris@0: * Chris@0: * If so, output a message. Chris@0: * Chris@0: * @return bool Chris@0: * TRUE if the threshold is exceeded, otherwise FALSE. Chris@0: */ Chris@0: protected function memoryExceeded() { Chris@0: $usage = $this->getMemoryUsage(); Chris@0: $pct_memory = $usage / $this->memoryLimit; Chris@0: if (!$threshold = $this->memoryThreshold) { Chris@0: return FALSE; Chris@0: } Chris@0: if ($pct_memory > $threshold) { Chris@0: $this->message->display( Chris@0: $this->t( Chris@0: 'Memory usage is @usage (@pct% of limit @limit), reclaiming memory.', Chris@0: [ Chris@0: '@pct' => round($pct_memory * 100), Chris@0: '@usage' => $this->formatSize($usage), Chris@0: '@limit' => $this->formatSize($this->memoryLimit), Chris@0: ] Chris@0: ), Chris@0: 'warning' Chris@0: ); Chris@0: $usage = $this->attemptMemoryReclaim(); Chris@0: $pct_memory = $usage / $this->memoryLimit; Chris@0: // Use a lower threshold - we don't want to be in a situation where we keep Chris@0: // coming back here and trimming a tiny amount Chris@0: if ($pct_memory > (0.90 * $threshold)) { Chris@0: $this->message->display( Chris@0: $this->t( Chris@0: 'Memory usage is now @usage (@pct% of limit @limit), not enough reclaimed, starting new batch', Chris@0: [ Chris@0: '@pct' => round($pct_memory * 100), Chris@0: '@usage' => $this->formatSize($usage), Chris@0: '@limit' => $this->formatSize($this->memoryLimit), Chris@0: ] Chris@0: ), Chris@0: 'warning' Chris@0: ); Chris@0: return TRUE; Chris@0: } Chris@0: else { Chris@0: $this->message->display( Chris@0: $this->t( Chris@0: 'Memory usage is now @usage (@pct% of limit @limit), reclaimed enough, continuing', Chris@0: [ Chris@0: '@pct' => round($pct_memory * 100), Chris@0: '@usage' => $this->formatSize($usage), Chris@0: '@limit' => $this->formatSize($this->memoryLimit), Chris@0: ] Chris@0: ), Chris@0: 'warning'); Chris@0: return FALSE; Chris@0: } Chris@0: } Chris@0: else { Chris@0: return FALSE; Chris@0: } Chris@0: } Chris@0: Chris@0: /** Chris@0: * Returns the memory usage so far. Chris@0: * Chris@0: * @return int Chris@0: * The memory usage. Chris@0: */ Chris@0: protected function getMemoryUsage() { Chris@0: return memory_get_usage(); Chris@0: } Chris@0: Chris@0: /** Chris@0: * Tries to reclaim memory. Chris@0: * Chris@0: * @return int Chris@0: * The memory usage after reclaim. Chris@0: */ Chris@0: protected function attemptMemoryReclaim() { Chris@0: // First, try resetting Drupal's static storage - this frequently releases Chris@0: // plenty of memory to continue. Chris@0: drupal_static_reset(); Chris@0: Chris@0: // Entity storage can blow up with caches so clear them out. Chris@0: $manager = \Drupal::entityManager(); Chris@0: foreach ($manager->getDefinitions() as $id => $definition) { Chris@0: $manager->getStorage($id)->resetCache(); Chris@0: } Chris@0: Chris@0: // @TODO: explore resetting the container. Chris@0: Chris@0: // Run garbage collector to further reduce memory. Chris@0: gc_collect_cycles(); Chris@0: Chris@0: return memory_get_usage(); Chris@0: } Chris@0: Chris@0: /** Chris@0: * Generates a string representation for the given byte count. Chris@0: * Chris@0: * @param int $size Chris@0: * A size in bytes. Chris@0: * Chris@0: * @return string Chris@0: * A translated string representation of the size. Chris@0: */ Chris@0: protected function formatSize($size) { Chris@0: return format_size($size); Chris@0: } Chris@0: Chris@0: }