2014-01-13 4 views
3

EDIT: Чтобы уточнить и упростить: я ищу «хороший» способ отправить больше объектов Stackable в Pool каждый раз, когда Stackable заканчивается (используя данные из этого первого Stackable, чтобы добавить второй) , У меня есть идеи опроса объектов до тех пор, пока они не закончится (неэффективны и некрасивы) и передадут ссылки на объект Pool (я не смог заставить его работать). Базовый код это один: https://github.com/krakjoe/pthreads/blob/master/examples/Pooling.phpPHP Пул по пул

Теперь, полное описание:

Я работаю над приложением в PHP, которая выросла слишком много и занимает много времени. Из-за этого я пытаюсь использовать многопоточность этого приложения, используя пул потоков (я знаю, что PHP не самый лучший вариант, но я не хочу и не могу изменить язык на данном этапе).

Проблема заключается в том, что есть две стадии приложения, которые должны идти в порядке, и у каждого из них есть много подзадач, которые могут идти одновременно. Итак, это процесс в моей голове:

  • На этапе 1 будут подзадача N, эти подзадачи будут Stackable objects.
  • Когда заканчивается подзадача i, необходимо уведомлять «основное» (то, которое создает пул, стеки и т. Д.), И выполнить этап 2 для подзадачи i с некоторыми данными из подзадачи i (другой объект Stackable). На этом этапе будут подзадачи M для каждой из подзадач этапа 1.

Я хотел бы использовать тот же пул потоков для потоков на этапах 1 и 2, и единственное решение, для перехода с этапа 1 на этап 2 выполняется опрос каждой из подзадач N до тех пор, пока один из них не закончится, а затем вызовите этап 2 для завершаемого и повторите до тех пор, пока все подзадачи N не закончится.

Я использую пример пула потоков, включенный в источник pthreads Джо Уоткинсом в качестве базового кода.

+1

* "PHP не лучший вариант" * является своего рода занижение. –

+0

Я не понимал, что PHP стал [потокобезопасным] (http://stackoverflow.com/questions/681081/is-php-thread-safe). –

+0

Я ищу способ перехода от этапа 1 к этапу 2 – markmb

ответ

7

Вы должны начать с чтения: https://gist.github.com/krakjoe/6437782

<?php 
/** 
* Normal worker 
*/ 
class PooledWorker extends Worker { 
    public function run(){} 
} 


/** 
* Don't descend from pthreads, normal objects should be used for pools 
*/ 
class Pool { 
    protected $size; 
    protected $workers; 

    /** 
    * Construct a worker pool of the given size 
    * @param integer $size 
    */ 
    public function __construct($size) { 
     $this->size = $size; 
    } 

    /** 
    * Start worker threads 
    */ 
    public function start() { 
     while (@$worker++ < $this->size) { 
      $this->workers[$worker] = new PooledWorker(); 
      $this->workers[$worker]->start(); 
     } 
     return count($this->workers); 
    } 

    /** 
    * Submit a task to pool 
    */ 
    public function submit(Stackable $task) { 
     $this->workers[array_rand($this->workers)] 
      ->stack($task); 
     return $task; 
    } 

    /** 
    * Shutdown worker threads 
    */ 
    public function shutdown() { 
     foreach ($this->workers as $worker) 
      $worker->shutdown(); 
    } 
} 

class StageTwo extends Stackable { 
    /** 
    * Construct StageTwo from a part of StageOne data 
    * @param int $data 
    */ 
    public function __construct($data) { 
     $this->data = $data; 
    } 

    public function run(){ 
     printf(
      "Thread %lu got data: %d\n", 
      $this->worker->getThreadId(), $this->data); 
    } 
} 

class StageOne extends Stackable { 
    protected $done; 

    /** 
    * Construct StageOne with suitable storage for data 
    * @param StagingData $data 
    */ 
    public function __construct(StagingData $data) { 
     $this->data = $data; 
    } 

    public function run() { 
     /* create dummy data array */ 
     while (@$i++ < 100) { 
      $this->data[] = mt_rand(
       20, 1000); 
     } 
     $this->done = true; 
    } 
} 

/** 
* StagingData to hold data from StageOne 
*/ 
class StagingData extends Stackable { 
    public function run() {} 
} 

/* stage and data reference arrays */ 
$one = []; 
$two = []; 
$data = []; 

$pool = new Pool(8); 
$pool->start(); 

/* construct stage one */ 
while (count($one) < 10) { 
    $staging = new StagingData(); 
    /* maintain reference counts by storing return value in normal array in local scope */ 
    $one[] = $pool 
     ->submit(new StageOne($staging)); 
    /* maintain reference counts */ 
    $data[] = $staging; 
} 

/* construct stage two */ 
while (count($one)) { 

    /* find completed StageOne objects */ 
    foreach ($one as $id => $job) { 
     /* if done is set, the data from this StageOne can be used */ 
     if ($job->done) { 
      /* use each element of data to create new tasks for StageTwo */ 
      foreach ($job->data as $chunk) { 
       /* submit stage two */ 
       $two[] = $pool 
        ->submit(new StageTwo($chunk)); 
      } 

      /* no longer required */ 
      unset($one[$id]); 
     } 
    } 

    /* in the real world, it is unecessary to keep polling the array */ 
    /* you probably have some work you want to do ... do it :) */ 
    if (count($one)) { 
     /* everyone likes sleep ... */ 
     usleep(1000000); 
    } 
} 

/* all tasks stacked, the pool can be shutdown */ 
$pool->shutdown(); 
?> 

Выведет:

Thread 140012266239744 got data: 612 
Thread 140012275222272 got data: 267 
Thread 140012257257216 got data: 971 
Thread 140012033140480 got data: 881 
Thread 140012257257216 got data: 1000 
Thread 140012016355072 got data: 261 
Thread 140012257257216 got data: 510 
Thread 140012016355072 got data: 148 
Thread 140012016355072 got data: 501 
Thread 140012257257216 got data: 767 
Thread 140012024747776 got data: 504 
Thread 140012033140480 got data: 401 
Thread 140012275222272 got data: 20 
<-- trimmed from 1000 lines --> 
Thread 140012041533184 got data: 285 
Thread 140012275222272 got data: 811 
Thread 140012041533184 got data: 436 
Thread 140012257257216 got data: 977 
Thread 140012033140480 got data: 830 
Thread 140012275222272 got data: 554 
Thread 140012024747776 got data: 704 
Thread 140012033140480 got data: 50 
Thread 140012257257216 got data: 794 
Thread 140012024747776 got data: 724 
Thread 140012033140480 got data: 624 
Thread 140012266239744 got data: 756 
Thread 140012284204800 got data: 997 
Thread 140012266239744 got data: 708 
Thread 140012266239744 got data: 981 

Потому что вы хотите использовать один пул, у вас есть небольшой выбор, но создавать задачи в основном контексте, где пул создан. Я могу представить другие решения, но вы специально попросили такого решения.

В зависимости от оборудования, которое у меня было в моем распоряжении, и характера задач и данных, которые нужно обработать, я мог бы иметь несколько [маленьких] пулов потоков, по одному для каждого рабочего, это позволит StageOne создавать StageTwo объекты в рабочем контексте, которые их выполняют, может быть что-то рассмотреть.

+0

Спасибо за ваш ответ и очень интересное чтение. Но это не совсем то, что мне нужно. Когда Stackables со стадии 1 заканчивается, данные должны быть обработаны, и новые Stackables должны быть созданы. Для каждого Stackable с этапа 1 может быть как 1000 Stackables на втором этапе. – markmb

+1

. Я сегодня второй занятый парень на поверхности земли, позвольте мне вернуться к нему завтра, и вы получите лучший пример ... –

+0

Звуки как то, что я пытаюсь сделать: я хочу, чтобы задача или работник решали, что следующая задача должна выполняться на бит данных. В конечном итоге я решил создать очередь, которая задает, какую задачу выполнять, на какой бит данных, основной цикл программы собирает задачи из очереди и отправляет их в пул. Тем не менее, все же не были обработаны все ошибки, но самый важный из них: он не работает. – Gralgrathor

1

Кажется, это тоже работает:

//Simple example with Collectable (basically Thread meant for Pool) and Pool 

<?php 

class job extends Collectable { 
    public $val; 

    public function __construct($val){ 
    // init some properties 
    $this->val = $val; 
    } 
    public function run(){ 
    // do some work 
    $this->val = $this->val . file_get_contents('http://www.example.com/', null, null, 3, 20); 
    $this->setGarbage(); 
    } 
} 

// At most 3 threads will work at once 
$p = new Pool(3); 

$tasks = array(
    new job('0'), 
    new job('1'), 
    new job('2'), 
    new job('3'), 
    new job('4'), 
    new job('5'), 
    new job('6'), 
    new job('7'), 
    new job('8'), 
    new job('9'), 
    new job('10'), 
); 
// Add tasks to pool queue 
foreach ($tasks as $task) { 
    $p->submit($task); 
} 

// shutdown will wait for current queue to be completed 
$p->shutdown(); 
// garbage collection check/read results 
$p->collect(function($checkingTask){ 
    echo $checkingTask->val; 
    return $checkingTask->isGarbage(); 
}); 

?> 
Смежные вопросы