2016-04-03 2 views
4

Для огромного проекта с большим количеством объектов я написал общий метод save().Zend framework 2/Doctrine 2/Выполнение массовых операций и событий

Этот метод хранится в абстрактной службе и используется во всем проекте для сохранения состояния объектов.

AbstractService :: сохранить() выглядит следующим образом:

public function save($entity) 
{ 
    $transactionStarted = $this->beginTransaction(); 

    try 
    { 
     $action = $entity->getId() ? self::UPDATE : self::CREATION; 

     $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]); 

     $this->getEntityManager()->persist($entity); 
     $this->getEntityManager()->flush(); 

     $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]); 

     if ($transactionStarted) 
     { 
      $this->commitTransaction(); 
     } 
    } catch (\Exception $e) 
    { 
     if ($transactionStarted) 
     { 
      $this->rollbackTransaction(); 
     } 

     throw new Exception('Unable to save entity', $e); 
    } 

    return true; 
} 

public function beginTransaction() 
{ 
    if (!$this->getEntityManager()->getConnection()->isTransactionActive()) 
    { 
     $this->getEntityManager()->getConnection()->beginTransaction(); 

     return true; 
    } 

    return false; 
} 

public function commitTransaction() 
{ 
    $this->getEntityManager()->getConnection()->commit(); 

    return $this; 
} 

public function rollbackTransaction() 
{ 
    $this->getEntityManager()->getConnection()->rollBack(); 

    return $this; 
} 

В моем случае, когда элемент вставлен (новый Member объект) при вызове Member службы (расширенный AbstractService), отправляется сообщение электронной почты (например) через событие save.post. Или может быть выполнено другое действие, связанное с другим методом вызова службы.

Пример "ребенок" MemberService :: сохранить() метод

MemberService 

public function save(Member $member) 
{ 
    // some stuff, e.g set a property 
    $member->setFirstName('John'); 

    return parent::save($member); 
} 

Пример сработавшего события

$sharedEventManager->attach(MemberService::class, 'save.post', [$this, 'onMembersCreation']); 

public function onMembersCreation(EventInterface $event) 
{ 
    // send an email 

    // anything else ... update another entity ... (call AnotherService::save() too) 
} 

Это отлично подходит для простого процесса сохранения.

Но теперь я хочу массово импортировать множество членов, с творениями, обновлениями ... И для этого я прочитал документ Доктрины, связанный с массовым импортом. Doc here

Но как правильно обновить код, чтобы обрабатывать «объемную экономию» и «единую экономию»? И сохранить безопасность транзакций и события?

+0

Что такое "много членов"? 1k? 1М? Ваш ответ определит, какую стратегию вы должны adaopt – JesusTheHun

+0

Привет, JesusTheHun, спасибо, что вы первый, кого интересует моя проблема :) «много членов» от 4k до 10k – ceadreak

+0

Является ли это одним выстрелом или он должен часто? Основной вопрос: имеет ли значение производительность? – JesusTheHun

ответ

1

В принципе, я предлагаю вам реализовать интерфейс Doctrine \ Common \ Collections \ Collection, возможно, расширить ArrayCollection и создать метод save, который будет делать то, что сообщил вам doc.

<?php 

class MyDirtyCollection extends \Doctrine\Common\Collections\ArrayCollection { 

    public function __construct(AbstractService $abstractService) 
    { 
     $this->service = $abstractService; 
    } 

    public function save() 
    { 
     foreach ($this as $entity) { 
      $this->service->save($entity); 
     } 
    } 
} 

class MyCollection extends \Doctrine\Common\Collections\ArrayCollection { 

    public $bulkSize = 500; 

    protected $eventManager; 
    protected $entityManager; 

    public function __construct(EntityManager $entityManager, EventManager $eventManager) 
    { 
     $this->entityManager = $entityManager; 
     $this->eventManager = $eventManager; 
    } 

    public function getEventManager() 
    { 
     return $this->eventManager; 
    } 

    public function getEntityManager() 
    { 
     return $this->entityManager; 
    } 

    public function setBulkSize(int $bulkSize) 
    { 
     $this->bulkSize = $bulkSize; 
    } 

    public function save() 
    { 
     $transactionStarted = $this->getEntityManager()->getConnection()->beginTransaction(); 

     try { 
      foreach ($this as $entity) { 
       $action = $entity->getId() ? self::UPDATE : self::CREATION; 
       $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]); 
      } 

      $i = 0; 
      foreach ($this as $entity) { 
       $i++; 

       $this->getEntityManager()->persist($entity); 

       if (($i % $this->bulkSize) === 0) { 
        $this->getEntityManager()->flush(); 
        $this->getEntityManager()->clear(); 
       } 
      } 

      $this->getEntityManager()->flush(); 
      $this->getEntityManager()->clear(); 

      foreach ($this as $entity) { 
       $action = $entity->getId() ? self::UPDATE : self::CREATION; 
       $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]); 
      } 

      if ($transactionStarted) { 
       $this->getEntityManager()->getConnection()->commitTransaction(); 
      } 

     } catch (Exception $e) { 
      $this->getEntityManager()->rollbackTransaction(); 
     } 
    } 
} 

Нечто подобное;) Когда вы запрашиваете ваши данные, которые вы гидрат вашей коллекции, то вы имеете дело с вашей организацией и, наконец, позвонить $collection->save();

EDIT: Добавить класс вставки и использовать корпус ниже:

Производительность здесь будет низкой, но все же лучше, чем фиксация фиксацией. Однако вы должны подумать об использовании Doctrine DBAL вместо ORM, если вы ищете производительность hgih. Здесь я поделюсь с вами моей DBAL класса для бестарной Вставки:

<?php 

namespace JTH\Doctrine\DBAL; 

use Doctrine\DBAL\Query\QueryBuilder; 
use Exception; 
use InvalidArgumentException; 
use Traversable; 
use UnderflowException; 

class Insert extends QueryBuilder 
{ 
    const CALLBACK_FAILURE_SKIP = 0; 
    const CALLBACK_FAILURE_BREAK = 1; 

    protected $callbackFailureStrategy = self::CALLBACK_FAILURE_BREAK; 

    public static $defaultBulkSize = 500; 

    public $ignore = false; 
    public $onDuplicate = null; 

    public function values(array $values) 
    { 
     $this->resetQueryPart('values'); 
     $this->addValues($values); 
    } 

    public function addValues(array $values) 
    { 
     $this->add('values', $values, true); 
    } 

    public function setCallbackFailureStrategy($strategy) 
    { 
     if ($strategy == static::CALLBACK_FAILURE_BREAK) { 
      $this->callbackFailureStrategy = static::CALLBACK_FAILURE_BREAK; 
     } elseif ($strategy == static::CALLBACK_FAILURE_SKIP) { 
      $this->callbackFailureStrategy = static::CALLBACK_FAILURE_SKIP; 
     } else { 
      $class = self::class; 
      throw new InvalidArgumentException(
       "Invalid failure behaviour. See $class::CALLBACK_FAILURE_SKIP and $class::CALLBACK_FAILURE_BREAK" 
      ); 
     } 
    } 

    public function getCallbackFailureStrategy() 
    { 
     return $this->callbackFailureStrategy; 
    } 

    public function execute() 
    { 
     return $this->getConnection()->executeUpdate(
      $this->getSQLForInsert(), 
      $this->getParameters(), 
      $this->getParameterTypes() 
     ); 
    } 

    /** 
    * Converts this instance into an INSERT string in SQL. 
    * @return string 
    * @throws \Exception 
    */ 
    private function getSQLForInsert() 
    { 
     $count = sizeof($this->getQueryPart('values')); 

     if ($count == 0) { 
      throw new UnderflowException("No values ready for INSERT"); 
     } 

     $values = current($this->getQueryPart('values')); 
     $ignore = $this->ignore ? 'IGNORE' : '' ; 
     $sql = "INSERT $ignore INTO " . $this->getQueryPart('from')['table'] . 
      ' (' . implode(', ', array_keys($values)) . ')' . ' VALUES '; 

     foreach ($this->getQueryPart('values') as $values) { 
      $sql .= '(' ; 

      foreach ($values as $value) { 
       if (is_array($value)) { 
        if ($value['raw']) { 
         $sql .= $value['value'] . ','; 
        } else { 
         $sql .= $this->expr()->literal($value['value'], $value['type']) . ','; 
        } 
       } else { 
        $sql .= $this->expr()->literal($value) . ','; 
       } 
      } 

      $sql = substr($sql, 0, -1); 
      $sql .= '),'; 
     } 

     $sql = substr($sql, 0, -1); 

     if (!is_null($this->onDuplicate)) { 
      $sql .= ' ON DUPLICATE KEY UPDATE ' . $this->onDuplicate . ' '; 
     } 

     return $sql; 
    } 

    /** 
    * @param $loopable array | Traversable An array or object to loop over 
    * @param $callable Callable A callable that will be called before actually insert the row. 
    * two parameters will be passed : 
    * - the key of the current row 
    * - the row values (Array) 
    * An array of rows to insert must be returned 
    * @param $bulkSize int How many rows will be inserted at once 
    * @param bool $transactionnal 
    * @throws \Doctrine\DBAL\ConnectionException 
    * @throws \Exception 
    */ 
    public function bulk($loopable, callable $callable, $bulkSize = null, $transactionnal = true) 
    { 
     if (!is_array($loopable) and !($loopable instanceof Traversable)) { 
      throw new InvalidArgumentException("\$loppable must be either an array or a traversable object"); 
     } 

     $bulkSize = $bulkSize ?? static::$defaultBulkSize; 

     $this->getConnection()->getConfiguration()->setSQLLogger(null); // Avoid MonoLog memory overload 

     if ($transactionnal) { 
      $this->getConnection()->beginTransaction(); 
     } 

     $this->resetQueryPart('values'); 

     foreach ($loopable as $key => $values) { 
      try { 
       $callbackedValues = $callable($key, $values); 

       if (sizeof($callbackedValues) > 0) { 
        foreach ($callbackedValues as $callbackedValuesRow) { 
         $this->addValues($callbackedValuesRow); 
        } 
       } 
      } catch (Exception $e) { 
       /* 
       * If a callback exception must break the transaction, then throw the exception to the call stack 
       * Else, skip the row insertion 
       */ 
       if ($this->callbackFailureStrategy == static::CALLBACK_FAILURE_BREAK) { 
        throw $e; 
       } else { 
        continue; 
       } 
      } 

      $count = count($this->getQueryPart('values')); 

      if ($count >= $bulkSize) { 
       $this->execute(); 
       $this->resetQueryPart('values'); 
      } 
     } 

     $count = count($this->getQueryPart('values')); 

     if ($count > 0) { 
      $this->execute(); 
     } 

     $this->resetQueryPart('values'); 

     if ($transactionnal) { 
      $this->getConnection()->commit(); 
     } 
    } 

    /** 
    * @return boolean 
    */ 
    public function isIgnore() 
    { 
     return $this->ignore; 
    } 

    /** 
    * @param boolean $ignore 
    */ 
    public function setIgnore(bool $ignore) 
    { 
     $this->ignore = $ignore; 
    } 

    /** 
    * @return null|string 
    */ 
    public function getOnDuplicate() : string 
    { 
     return $this->onDuplicate; 
    } 

    /** 
    * @param null $onDuplicate 
    */ 
    public function setOnDuplicate($onDuplicate) 
    { 
     $this->onDuplicate = $onDuplicate; 
     $this->ignore = false; 
    } 


} 

использование:

try { 
     $i = new Insert($this->getDoctrine()->getConnection('myDB')); 
     $i->insert('myTable'); 
     $i->setOnDuplicate('col1 = VALUES(col1), updated_last = NOW()'); 
     $i->setCallbackFailureStrategy(Insert::CALLBACK_FAILURE_BREAK); 
     $i->bulk($myArrayOfRows, function ($key, $row) { 

      // Some pre-insert processing 

      $rowset[] = $row; 

      return $rowset; 

     }, 500, true); 

     $this->addFlash('success', 'Yay !'); 

    } catch (DBALException $e) { 
     $this->addFlash('error', 'Damn, error : ' . $e->getMessage()); 
    } 
+0

Mhhh Я не уверен в этом, я попробовал очень похожий метод с массивом, а не с коллекцией, но имел проблемы с транзакциями ... Я снова проведу проверку с коллекцией, чтобы проверить, не является ли проблема такой же. – ceadreak

+0

В чем была проблема с транзакциями? Проверьте мое обновление также – JesusTheHun

+0

@ JesusTheHun Я проверю ваше решение на этом сайте, спасибо за время – ceadreak

0

Наконец, я использовал метод merge доктрины и, кажется, отлично работают.

я написал отдельный AbstractService::saveBulk() метод, чтобы сохранить большое количество Member объектов, таких как:

/** 
    * @param ArrayCollection $entities 
    * 
    * @return bool 
    * @throws Exception 
    */ 
    public function saveBulk(ArrayCollection $entities) 
    { 
     $batchSize = 100; 
     $i   = 0; 

     foreach ($entities as $entity) 
     { 
      $transactionStarted = $this->beginTransaction(); 

      try 
      { 
       $action = $entity->getId() ? self::UPDATE : self::CREATION; 

       $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]); 

       $entity = $this->getEntityManager()->merge($entity); 

       $this->getEntityManager()->persist($entity); 
       $this->getEntityManager()->flush(); 

       $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]); 

       if (($i % $batchSize) === 0) 
       { 
        $this->getEntityManager()->clear(); 
       } 

       if ($transactionStarted) 
       { 
        $this->commitTransaction(); 
       } 
      } catch (\Exception $e) 
      { 
       if ($transactionStarted) 
       { 
        $this->rollbackTransaction(); 
       } 

       throw new Exception(Exception::UNEXPECTED_ERROR, 'Unable to save entity', $e); 
      } 
     } 

     $this->getEntityManager()->clear(); 

     return true; 
    } 

вопреки документации doctrine2, я просто называю clear() и не flush() + clear() для каждой партии, потому что для некоторых событий называется , Мне нужно знать, имеет ли объект идентификатор базы данных.

@JesusTheHun благодарит за комментарии, которые мне очень помогают.

+0

Вы начинаете транзакцию для каждого объекта, а не один раз для своего объема. Почему бы вам не использовать метод, который я дал вам в классе MyCollection? (http://stackoverflow.com/questions/36390063/zend-framework-2-doctrine-2-bulk-operations-and-events-triggering/36602128#36602128) Если вы нажмете на каждую сущность, это не имеет никакого смысла, вы должны очистить каждый $ batchSize. Размер партии var должен быть параметром, константой класса или свойством объекта, инициализированным конструкцией или статическим свойством. Кроме того, используя условие modulo, и у вас есть 121 объект, например, вы не будете передавать последние 21 сущности. – JesusTheHun

+0

Я пробовал ваш метод, но без слияния, все еще есть эта ошибка 'Новый объект был найден через связь ...'.'beginTransaction()' метод проверяет, является ли транзакция уже стартером, я не запускаю транзакцию e для каждого объекта – ceadreak

+0

Я попытался снова использовать ваш метод с слиянием перед наступлением, и теперь он работает. для 500 объектов, это займет 4 секунды меньше с вашими методами, чем у меня;). Ответ принят +1 – ceadreak