2014-12-13 3 views
17

У меня есть PHP-скрипт, который извлекает строки из базы данных и затем выполняет работу на основе содержимого. Работа может занять много времени (но не обязательно дорогостоящим путем), и поэтому мне нужно разрешить параллельное выполнение нескольких сценариев.Реализация простой очереди с PHP и MySQL?

Строка в базе данных выглядит примерно так:

+---------------------+---------------+------+-----+---------------------+----------------+ 
| Field    | Type   | Null | Key | Default    | Extra   | 
+---------------------+---------------+------+-----+---------------------+----------------+ 
| id     | bigint(11) | NO | PRI | NULL    | auto_increment | 
..... 
| date_update_started | datetime  | NO |  | 0000-00-00 00:00:00 |    | 
| date_last_updated | datetime  | NO |  | 0000-00-00 00:00:00 |    | 
+---------------------+---------------+------+-----+---------------------+----------------+ 

Моего сценарий в настоящее время выбирает строки с древнейшими датами в date_last_updated (которая обновляется после того, как работа выполнена) и не делает использование date_update_started.

Если бы мне пришлось запускать несколько экземпляров скрипта параллельно, они будут выбирать одни и те же строки (по крайней мере, некоторое время), и будет выполняться дублирующая работа.

Что я имею в виду делать это с помощью транзакции, чтобы выбрать строки, обновить date_update_started столбец, а затем добавить WHERE условие к оператору SQL выбора строк только выбранных строк с date_update_started больше некоторого значения (для убедитесь, что другой скрипт не работает над ним). Например.

$sth = $dbh->prepare(' 
    START TRANSACTION; 
    SELECT * FROM table WHERE date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000; 
    UPDATE table DAY SET date_update_started = UTC_TIMESTAMP() WHERE id IN (SELECT id FROM table WHERE date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000;); 
    COMMIT; 
'); 
$sth->execute(); // in real code some values will be bound 
$rows = $sth->fetchAll(PDO::FETCH_ASSOC); 

Из того, что я прочитал, это, по сути, реализация очереди, и, похоже, в MySQL она недоказана. Тем не менее, мне нужно найти способ позволить нескольким сценариям работать параллельно, и после исследования, которое я сделал, это то, что я придумал.

Будет ли этот тип подхода работать? Есть ли способ лучше?

+0

Как вы запускаете параллельные скрипты? – Lupin

+0

@ Lupin В настоящее время скрипт выполняется каждые 15 минут с помощью задания cron. Скрипт проверяет, работает ли другой экземпляр, и если это так прекращается. Я не уверен, как я буду управлять несколькими сценариями, но пока у меня есть счетчик в базе данных, чтобы увидеть, сколько запущено и ограничено количество экземпляров таким образом, но одна проблема за раз :-) – Nate

+0

OK , некоторые дополнительные вопросы, чтобы я мог полностью понять: 1. У вас есть скрипт, который выбирает строки и работает над ними, а затем обновляет его обратно в БД, правильно? 2. Вы хотите иметь параллельные скрипты и делать то же самое, но в разных строках? 3. Каждый раз, когда скрипт запускается, выбираемые строки являются непрерывными, то есть они 1-100, 101-200 и т. Д. Или являются ли они случайными с точки зрения идентификатора и выбираются только теми, у которых date_update_started больше 1? – Lupin

ответ

5

Я думаю, что ваш подход может работать, если вы также добавляете какой-то идентификатор к выбранным вами строкам, в которые они в настоящее время работают, он может быть предложен как @JuniusRendel, и я бы даже подумал об использовании другой строки key (случайный или идентификатор экземпляра) для случаев, когда сценарий приводил к ошибкам и не выполнялся изящно, так как вам придется очищать эти поля после обновления строк после вашей работы.

Проблема с этим подходом, поскольку я вижу, что это вариант, что будут два сценария, которые работают в одной точке, и будут выбирать одни и те же строки до того, как они будут подписаны как заблокированные. здесь, как я могу это видеть, это действительно зависит от того, какую работу вы выполняете в строках, если конечный результат в этих обоих сценариях будет таким же, я думаю, что единственная проблема, с которой вы столкнулись, - это потраченное время и память сервера (что не маленькие проблемы, но я отложу их на время ...). если ваша работа приведет к различным обновлениям на обоих сценариях, ваша проблема будет заключаться в том, что вы можете иметь неправильное обновление в конце TB.

@Jean упомянул о втором подходе, который вы можете использовать, используя блокировки MySql. Я не эксперт в этом вопросе, но, похоже, хороший подход, и использование инструкции «Select .... FOR UPDATE» может дать вам то, что вы ищете, как вы могли бы сделать по тому же вызову, чтобы выбрать & обновление - которое будет быстрее, чем 2 отдельные запросы и могут уменьшить риск для других экземпляров для выбора этих строк, поскольку они будут заблокированы.

«SELECT .... FOR UPDATE» позволяет запустить оператор выбора и зафиксировать те конкретные строки для их обновления, так что ваше заявление может выглядеть следующим образом:

START TRANSACTION; 
    SELECT * FROM tb where field='value' LIMIT 1000 FOR UPDATE; 
    UPDATE tb SET lock_field='1' WHERE field='value' LIMIT 1000; 
COMMIT; 

Замки мощный, но будьте осторожны, что это не повлияет на ваше приложение в разных разделах. Убедитесь, что эти выбранные строки, которые в настоящее время заблокированы для обновления, запрашиваются в другом месте приложения (возможно, для конечного пользователя) и что произойдет в этом случае.

Кроме того, таблицы должны быть InnoDB, и рекомендуется, чтобы поля, в которых вы проверяете предложение where, имеют индекс Mysql, как будто вы не можете заблокировать всю таблицу или встретить «Gap Lock».

Существует также вероятность того, что процесс блокировки и особенно при запуске параллельных скриптов будет тяжелым на вашем CPU & памяти.

вот еще читать по теме: http://www.percona.com/blog/2006/08/06/select-lock-in-share-mode-and-for-update/

Надеется, что это помогает, и хотел бы услышать, как вы прогрессировали.

1

Редактировать: К сожалению, я совершенно не понял ваш вопрос

Вы должны просто поставить «запертой» колонку на вашем столе поставить значение истина на записи ваш скрипт работает с, и когда это будет сделано поместить это ложно.

В моем случае я поставил три другие метки времени (целочисленные): target_ts, start_ts, done_ts. Вы

UPDATE table SET locked = TRUE WHERE target_ts<=UNIX_TIMESTAMP() AND ISNULL(done_ts) AND ISNULL(start_ts); 

, а затем

SELECT * FROM table WHERE target_ts<=UNIX_TIMESTAMP() AND ISNULL(start_ts) AND locked=TRUE; 

ли ваши рабочие места и обновлять каждую запись по одному (чтобы избежать inconcistencies данных), установив свойство done_ts для текущего времени (вы можете также разблокировать их сейчас). Вы можете обновить target_ts до следующего обновления, если хотите, или можете проигнорировать этот столбец и просто использовать done_ts для своего выбора.

+0

Я не думаю, что PHP фактически поддерживает многопоточность, но в любом случае получение нескольких экземпляров сценария для запуска не является проблемой. Вопрос в первую очередь заключается в том, как обрабатывать извлечение строк из БД. – Nate

+0

Я обновил, извините, может быть, я был пьян :). Для потоков, я не знаю, это то, что утверждает расширение PECL, но я не тестировал его, поэтому ... – n00dl3

4

У нас есть что-то подобное в исполнении.

Чтобы избежать дубликатов, мы делаем MySQL UPDATE, как это (я изменил запрос напоминать таблицу):

UPDATE queue SET id = LAST_INSERT_ID(id), date_update_started = ... 
WHERE date_update_started IS NULL AND ... 
LIMIT 1; 

Мы делаем это UPDATE в одной транзакции, и мы использовать функцию LAST_INSERT_ID. При использовании этого параметра с параметром он записывает в сеансе транзакции параметр, который в этом случае является идентификатором единственной (LIMIT 1) очереди, которая была обновлена ​​(если таковая имеется).

Только после этого мы делаем:

SELECT LAST_INSERT_ID(); 

При использовании без параметра, он извлекает ранее сохраненное значение, получение идентификатора элемента в очереди, которая должна быть выполнена.

+0

Можете ли вы подробнее рассказать о том, что вы подразумеваете под «write locks»? Возможно, с примером кода? – Nate

+0

@Nate, отредактированный и расширенный;) Я также предлагаю использовать RabbitMQ, BTW. Мы мечтаем использовать его: D – Jean

1

Каждый раз, когда запускается скрипт, у меня будет скрипт генерации uniqid.

$sctiptInstance = uniqid(); 

Я хотел бы добавить столбец экземпляра сценария, чтобы держать это значение в качестве VARCHAR и поставить индекс на нем. Когда сценарий запускается, я бы использовал select для обновления внутри транзакции, чтобы выбирать ваши строки на основе любой логики, исключая строки с экземпляром сценария, а затем обновлять эти строки экземпляром сценария.Что-то вроде:

START TRANSACTION; 
SELECT * FROM table WHERE script_instance = '' AND date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000 FOR UPDATE; 
UPDATE table SET date_update_started = UTC_TIMESTAMP(), script_instance = '{$scriptInstance}' WHERE script_instance = '' AND date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000; 
COMMIT; 

Теперь эти строки будут исключены из других экземпляров скрипта. Вы работаете, а затем обновляете строки, чтобы вернуть экземпляр сценария в нуль или пустое, а также обновите свой последний обновленный столбец.

Вы также можете использовать экземпляр сценария для записи в другую таблицу, называемую «текущие экземпляры» или что-то в этом роде, и попросите скрипт проверить, что таблица получает количество запущенных скриптов для управления количеством одновременных скриптов. Я бы добавил PID скрипта к таблице. Затем вы можете использовать эту информацию для создания сценария ведения домашнего хозяйства, который периодически запускается из cron, чтобы проверять длительные процессы или процессы изгоев и убивать их и т. Д.

1

У меня есть такая система, которая работает именно так на производстве. Мы выполняем скрипт каждую минуту, чтобы выполнить некоторую обработку, и иногда этот запуск может занять более минуты.

У нас есть столбец таблицы для статуса, который равен 0 для NOT RUN YET, 1 для FINISHED и другого значения для текущего.

Первое, что делает скрипт, - это обновление таблицы, установка строки или нескольких строк со значением, означающим, что мы работаем над этой линией. Мы используем getmypid() для обновления строк, над которыми мы хотим работать, и которые по-прежнему не обрабатываются.

Когда мы закончим обработку, скрипт обновляет строки, имеющие один и тот же идентификатор процесса, помечая их как завершенные (статус 1).

Таким образом, мы избегаем каждого сценария, чтобы попытаться обработать линию, которая уже обрабатывается, и она работает как шарм. Это не означает, что нет лучшего способа, но это делает работу.

1

Я использовал хранимую процедуру по очень похожим причинам в прошлом. Мы использовали блокировку чтения FOR UPDATE для блокировки таблицы, в то время как выбранный флаг был обновлен, чтобы удалить эту запись из любых будущих выборов. Это выглядело примерно так:

Нет причин, по которым это должно быть сделано в хранимой процедуре, хотя теперь я думаю об этом.

Смежные вопросы