Chris@0: 0, Chris@0: MigrateIdMapInterface::STATUS_IGNORED => 0, Chris@0: MigrateIdMapInterface::STATUS_IMPORTED => 0, Chris@0: MigrateIdMapInterface::STATUS_NEEDS_UPDATE => 0, Chris@0: ); Chris@0: Chris@0: /** Chris@0: * Counter of map deletions. Chris@0: * Chris@0: * @var int Chris@0: */ Chris@0: protected $deleteCounter = 0; Chris@0: Chris@0: /** Chris@0: * Maximum number of items to process in this migration. 0 indicates no limit Chris@0: * is to be applied. Chris@0: * Chris@0: * @var int Chris@0: */ Chris@0: protected $itemLimit = 0; Chris@0: Chris@0: /** Chris@0: * Frequency (in items) at which progress messages should be emitted. Chris@0: * Chris@0: * @var int Chris@0: */ Chris@0: protected $feedback = 0; Chris@0: Chris@0: /** Chris@0: * List of specific source IDs to import. Chris@0: * Chris@0: * @var array Chris@0: */ Chris@0: protected $idlist = []; Chris@0: Chris@0: /** Chris@0: * Count of number of items processed so far in this migration. Chris@0: * @var int Chris@0: */ Chris@0: protected $counter = 0; Chris@0: Chris@0: /** Chris@0: * Whether the destination item exists before saving. Chris@0: * Chris@0: * @var bool Chris@0: */ Chris@0: protected $preExistingItem = FALSE; Chris@0: Chris@0: /** Chris@0: * List of event listeners we have registered. 
*
   * @var array
   */
  protected $listeners = [];

  /**
   * Builds the executable and registers its event listeners.
   *
   * @param \Drupal\migrate\Plugin\MigrationInterface $migration
   *   The migration to run; passed through to the parent constructor.
   * @param \Drupal\migrate\MigrateMessageInterface $message
   *   The message handler; passed through to the parent constructor.
   * @param array $options
   *   Optional settings. Recognised keys:
   *   - limit: maximum number of items to process (0 means no limit).
   *   - feedback: emit a progress message every this many items.
   *   - idlist: comma-separated list of source IDs to restrict processing to.
   */
  public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, array $options = []) {
    parent::__construct($migration, $message);

    // Option values may arrive as strings. The properties are documented as
    // integers and are used in modulo/comparison arithmetic, so normalise
    // them here; a non-numeric value becomes 0, i.e. "disabled".
    if (isset($options['limit'])) {
      $this->itemLimit = (int) $options['limit'];
    }
    if (isset($options['feedback'])) {
      $this->feedback = (int) $options['feedback'];
    }
    if (isset($options['idlist'])) {
      // Tolerate whitespace around the separators, e.g. "1, 2, 3".
      $this->idlist = array_map('trim', explode(',', $options['idlist']));
    }

    // Remember every listener we register so removeListeners() can detach
    // exactly the same callables later.
    $this->listeners[MigrateEvents::MAP_SAVE] = [$this, 'onMapSave'];
    $this->listeners[MigrateEvents::MAP_DELETE] = [$this, 'onMapDelete'];
    $this->listeners[MigrateEvents::POST_IMPORT] = [$this, 'onPostImport'];
    $this->listeners[MigrateEvents::POST_ROLLBACK] = [$this, 'onPostRollback'];
    $this->listeners[MigrateEvents::PRE_ROW_SAVE] = [$this, 'onPreRowSave'];
    $this->listeners[MigrateEvents::POST_ROW_DELETE] = [$this, 'onPostRowDelete'];
    $this->listeners[MigratePlusEvents::PREPARE_ROW] = [$this, 'onPrepareRow'];
    foreach ($this->listeners as $event => $listener) {
      \Drupal::service('event_dispatcher')->addListener($event, $listener);
    }
  }

  /**
   * Count up any map save events.
   *
   * @param \Drupal\migrate\Event\MigrateMapSaveEvent $event
   *   The map event.
   */
  public function onMapSave(MigrateMapSaveEvent $event) {
    // Only count saves for this migration.
    if ($event->getMap()->getQualifiedMapTableName() == $this->migration->getIdMap()->getQualifiedMapTableName()) {
      $fields = $event->getFields();
      // Distinguish between creation and update: an imported row whose
      // destination already existed (flag set by onPreRowSave()) is tallied
      // in the STATUS_NEEDS_UPDATE slot, which getUpdatedCount() reports.
      if ($fields['source_row_status'] == MigrateIdMapInterface::STATUS_IMPORTED && $this->preExistingItem) {
        $this->saveCounters[MigrateIdMapInterface::STATUS_NEEDS_UPDATE]++;
      }
      else {
        $this->saveCounters[$fields['source_row_status']]++;
      }
    }
  }

  /**
   * Count up any rollback events.
   *
   * Every map delete event received is counted; unlike onMapSave(), no check
   * is made that the event belongs to this migration's map.
   *
   * @param \Drupal\migrate\Event\MigrateMapDeleteEvent $event
   *   The map event.
   */
  public function onMapDelete(MigrateMapDeleteEvent $event) {
    $this->deleteCounter++;
  }

  /**
   * Return the number of items created.
   *
   * @return int
   *   The STATUS_IMPORTED save counter.
   */
  public function getCreatedCount() {
    return $this->saveCounters[MigrateIdMapInterface::STATUS_IMPORTED];
  }

  /**
   * Return the number of items updated.
   *
   * @return int
   *   The STATUS_NEEDS_UPDATE save counter.
   */
  public function getUpdatedCount() {
    return $this->saveCounters[MigrateIdMapInterface::STATUS_NEEDS_UPDATE];
  }

  /**
   * Return the number of items ignored.
   *
   * @return int
   *   The STATUS_IGNORED save counter.
   */
  public function getIgnoredCount() {
    return $this->saveCounters[MigrateIdMapInterface::STATUS_IGNORED];
  }

  /**
   * Return the number of items that failed.
   *
   * @return int
   *   The STATUS_FAILED save counter.
   */
  public function getFailedCount() {
    return $this->saveCounters[MigrateIdMapInterface::STATUS_FAILED];
  }

  /**
   * Return the total number of items processed.
   *
   * The total is the sum of the imported, updated, ignored and failed
   * counters. Updates are tallied in the STATUS_NEEDS_UPDATE slot by
   * onMapSave(), so that slot is included in the total.
*
   * @return int
   *   The combined imported, needs-update, ignored and failed save counters.
   */
  public function getProcessedCount() {
    $statuses = [
      MigrateIdMapInterface::STATUS_IMPORTED,
      MigrateIdMapInterface::STATUS_NEEDS_UPDATE,
      MigrateIdMapInterface::STATUS_IGNORED,
      MigrateIdMapInterface::STATUS_FAILED,
    ];
    $total = 0;
    foreach ($statuses as $status) {
      $total += $this->saveCounters[$status];
    }
    return $total;
  }

  /**
   * Return the number of items rolled back.
   *
   * @return int
   *   The current value of the map deletion counter.
   */
  public function getRollbackCount() {
    $rolled_back = $this->deleteCounter;
    return $rolled_back;
  }

  /**
   * Zero every per-status save counter as well as the deletion counter.
   */
  protected function resetCounters() {
    $this->deleteCounter = 0;
    foreach ($this->saveCounters as &$tally) {
      $tally = 0;
    }
    // Drop the reference so later writes to $tally cannot touch the array.
    unset($tally);
  }

  /**
   * React to migration completion.
   *
   * Records the completion time (in milliseconds) for the event's migration
   * in the 'migrate_last_imported' key/value store, emits the final progress
   * message and detaches this object's listeners.
   *
   * @param \Drupal\migrate\Event\MigrateImportEvent $event
   *   The import event.
   */
  public function onPostImport(MigrateImportEvent $event) {
    $store = \Drupal::keyValue('migrate_last_imported');
    $migration_id = $event->getMigration()->id();
    $store->set($migration_id, round(microtime(TRUE) * 1000));

    $this->progressMessage();
    $this->removeListeners();
  }

  /**
   * Detach every listener that the constructor registered.
   */
  protected function removeListeners() {
    foreach ($this->listeners as $event_name => $callback) {
      \Drupal::service('event_dispatcher')->removeListener($event_name, $callback);
    }
  }

  /**
   * Emit information on what we've done since the last feedback (or the
   * beginning of this migration).
*
   * @param bool $done
   *   TRUE to word the message as "done with" the migration, FALSE to word
   *   it as "continuing with" the migration.
   */
  protected function progressMessage($done = TRUE) {
    if (!$done) {
      $singular_message = "Processed 1 item (@created created, @updated updated, @failures failed, @ignored ignored) - continuing with '@name'";
      $plural_message = "Processed @numitems items (@created created, @updated updated, @failures failed, @ignored ignored) - continuing with '@name'";
    }
    else {
      $singular_message = "Processed 1 item (@created created, @updated updated, @failures failed, @ignored ignored) - done with '@name'";
      $plural_message = "Processed @numitems items (@created created, @updated updated, @failures failed, @ignored ignored) - done with '@name'";
    }

    // The processed total selects singular vs. plural wording and is also
    // substituted into the plural form.
    $processed = $this->getProcessedCount();
    $replacements = [
      '@numitems' => $processed,
      '@created' => $this->getCreatedCount(),
      '@updated' => $this->getUpdatedCount(),
      '@failures' => $this->getFailedCount(),
      '@ignored' => $this->getIgnoredCount(),
      '@name' => $this->migration->id(),
    ];
    $text = \Drupal::translation()->formatPlural($processed, $singular_message, $plural_message, $replacements);
    $this->message->display($text);
  }

  /**
   * React to rollback completion.
   *
   * Emits the final rollback message and detaches this object's listeners.
   *
   * @param \Drupal\migrate\Event\MigrateRollbackEvent $event
   *   The rollback event.
   */
  public function onPostRollback(MigrateRollbackEvent $event) {
    $this->rollbackMessage();
    $this->removeListeners();
  }

  /**
   * Emit information on what we've done since the last feedback (or the
   * beginning of this migration).
*
   * @param bool $done
   *   TRUE to word the message as "done with" the migration, FALSE to word
   *   it as "continuing with" the migration.
   */
  protected function rollbackMessage($done = TRUE) {
    if (!$done) {
      $singular_message = "Rolled back 1 item - continuing with '@name'";
      $plural_message = "Rolled back @numitems items - continuing with '@name'";
    }
    else {
      $singular_message = "Rolled back 1 item - done with '@name'";
      $plural_message = "Rolled back @numitems items - done with '@name'";
    }

    $rolled_back = $this->getRollbackCount();
    $replacements = [
      '@numitems' => $rolled_back,
      '@name' => $this->migration->id(),
    ];
    $text = \Drupal::translation()->formatPlural($rolled_back, $singular_message, $plural_message, $replacements);
    $this->message->display($text);
  }

  /**
   * React to an item about to be imported.
   *
   * Records whether the row's ID map already carries a first destination ID,
   * which onMapSave() uses to tell updates apart from creations.
   *
   * @param \Drupal\migrate\Event\MigratePreRowSaveEvent $event
   *   The pre-save event.
   */
  public function onPreRowSave(MigratePreRowSaveEvent $event) {
    $map_row = $event->getRow()->getIdMap();
    $this->preExistingItem = !empty($map_row['destid1']);
  }

  /**
   * React to item rollback.
   *
   * When feedback is enabled, emits an interim rollback message each time
   * the deletion counter reaches a multiple of the feedback interval, then
   * resets the counters.
   *
   * @param \Drupal\migrate\Event\MigrateRowDeleteEvent $event
   *   The row delete event.
   */
  public function onPostRowDelete(MigrateRowDeleteEvent $event) {
    // Nothing to report when feedback is off or nothing has been deleted yet.
    if (!$this->feedback || !$this->deleteCounter) {
      return;
    }
    if ($this->deleteCounter % $this->feedback == 0) {
      $this->rollbackMessage(FALSE);
      $this->resetCounters();
    }
  }

  /**
   * React to a new row.
   *
   * @param \Drupal\migrate_plus\Event\MigratePrepareRowEvent $event
   *   The prepare-row event.
*
   * @throws \Drupal\migrate\MigrateSkipRowException
   *   When an ID list is in effect and the row's first source ID value is
   *   not in it.
   */
  public function onPrepareRow(MigratePrepareRowEvent $event) {
    if ($this->idlist) {
      $source_ids = $event->getRow()->getSourceIdValues();
      // Only the first source ID value is checked against the list. Compare
      // string forms strictly: a loose comparison treats distinct IDs such
      // as '007' and '7' (or '1e3' and '1000') as equal.
      $wanted = array_map('strval', $this->idlist);
      if (!in_array((string) reset($source_ids), $wanted, TRUE)) {
        throw new MigrateSkipRowException(NULL, FALSE);
      }
    }

    // Emit interim feedback before counting the current row, so a message
    // appears once each full batch of $feedback rows has been seen.
    if ($this->feedback && ($this->counter) && $this->counter % $this->feedback == 0) {
      $this->progressMessage(FALSE);
      $this->resetCounters();
    }

    $this->counter++;
    // Once the limit is reached, ask the migration to stop.
    if ($this->itemLimit && $this->counter >= $this->itemLimit) {
      $event->getMigration()->interruptMigration(MigrationInterface::RESULT_COMPLETED);
    }
  }

}