(PECL pthreads >= 2.0.0)
Введение
A - это контейнер и контроллер для регулируемого числа Workers.
Объединение в пул обеспечивает более высокий уровень абстракции рабочих функций, включая управление ссылками способом, требуемым pthreads.
Обзор классов
class Pool { /* Свойства */ protected $size ; protected $class ; protected $workers ; protected $ctor ; protected $last ; /* Методы */ public int collect ([ Callable $collector ] ) public Pool __construct ( int $size [, string $class [, array $ctor ]] ) public void resize ( int $size ) public void shutdown ( void ) public int submit ( Threaded $task ) public int submitTo ( int $worker , Threaded $task ) }
Свойства
- size
-
Максимальное число Workers, которое может использовать этот Pool
- class
-
класс Worker
- workers
-
ссылки на Workers
- ctor
-
аргументы для конструктора новых Workers
- last
-
смещение в workers последнего используемого Worker
Простой пример с Collectable (в основном Thread предназначен для Pool) и Pool
<?php class job extends Collectable { public $val; public function __construct($val){ // инициировать некоторые свойства $this->val = $val; } public function run(){ // поработай немного $this->val = $this->val . file_get_contents('http://www.example.com/', null, null, 3, 20); $this->setGarbage(); } } // Максимум 3 темы будут работать одновременно $p = new Pool(3); $tasks = array( new job('0'), new job('1'), new job('2'), new job('3'), new job('4'), new job('5'), new job('6'), new job('7'), new job('8'), new job('9'), new job('10'), ); // Добавить задачи в очередь пула foreach ($tasks as $task) { $p->submit($task); } // завершение работы будет ожидать завершения текущей очереди $p->shutdown(); // проверка / чтение результатов сборки мусора $p->collect(function($checkingTask){ echo $checkingTask->val; return $checkingTask->isGarbage(); }); ?>
Обратите внимание, что при использовании функции сбора важно расширить класс пула, чтобы вы могли продолжать проверять готовые потоки, пока все они не будут завершены.
<?php class TestWork extends Threaded { protected $complete; //$pData-это данные, отправляемые в рабочий поток для выполнения его работы. public function __construct($pData){ //перенести все переменные в локальные переменные $this->complete = false; $this->testData = $pData; } //Здесь будет сделана вся ваша работа. public function run(){ usleep(2000000); //сон 2 секунды для того чтобы сымитировать большую работу $this->complete = true; } public function isGarbage() { return $this->complete; } } class ExamplePool extends Pool { public $data = array(); public function process() { // Выполните этот цикл, // пока у нас есть задания в пуле while (count($this->work)) { $this->collect(function (TestWork $task) { // Если задача была помечена как выполненная, // соберите ее результаты if ($task->isGarbage()) { $tmpObj = new stdclass(); $tmpObj->complete = $task->complete; // это то, как вы возвращаете свои законченные данные [доступно для $pool->process()] $this->data[] = $tmpObj; } return $task->isGarbage(); }); } // Все работы выполнены, // мы можем закрыть pool $this->shutdown(); return $this->data; } } $pool = new ExamplePool(3); $testData = 'asdf'; for($i=0;$i<5;$i++) { $pool->submit(new TestWork($testData)); } $retArr = $pool->process(); //получить все результаты echo '<pre>'; print_r($retArr); //вернуть массив результатов (и, возможно, ошибки) echo '</pre>'; ?>
Пример класса, чтобы продемонстрировать использование пула / рабочего механизма, а также показать несколько трюков и подсказок ;)
<?php class Config extends Threaded{ // общий глобальный объект protected $val=0, $val2=0; protected function inc(){++$this->val;} // защищенная синхронизация на объект public function inc2(){++$this->val2;} // нет синхронизации } class WorkerClass extends Worker{ protected static $worker_id_next = -1; protected $worker_id; protected $config; public function __construct($config){ $this->worker_id = ++static::$worker_id_next; // статические члены не доступны в потоке, но находятся в "основном потоке" $this->config = $config; } public function run(){ global $config; $config = $this->config; // ПРИМЕЧАНИЕ: настройка по ссылке НЕ РАБОТАЕТ global $worker_id; $worker_id = $this->worker_id; echo "working context {$worker_id} is created!\n"; //$this->say_config(); // глобально синхронизированная функция. } protected function say_config(){ // 'protected' синхронизируется побочным объектом, поэтому НЕ БУДЕТ работать между несколькими экземплярами, global $config; // вы можете использовать общий объект $config в качестве источника синхронизации. $config->synchronized(function() use (&$config){ // ПРИМЕЧАНИЕ: вы можете использовать здесь Closures, // но если вы прикрепите Closure к объекту с потоком, он будет уничтожен, поскольку не может быть сериализован var_dump($config); }); } } class Task extends Stackable{ // Стекируемый до сих пор существует, просто как-то исчез из документации (вероятно, по ошибке). // Для получения более подробной информации смотрите документацию к старой версии. protected $set; public function __construct($set){ $this->set = $set; } public function run(){ global $worker_id; echo "task is running in {$worker_id}!\n"; usleep(mt_rand(1,100)*100); $config = $this->getConfig(); $val = $config->arr->shift(); $config->arr[] = $this->set; for ($i = 0 ; $i < 1000; ++$i){ $config->inc(); $config->inc2(); } } public function getConfig(){ global $config; // WorkerClass устанавливает это в области видимости потока, может быть повторно использовано Задачами для дополнительного асинхронного источника данных. //(то есть: пул соединений или очередь задач к демультиплексору) return $config; } } $config = new Config; $config->arr = new \Threaded(); $config->arr->merge(array(1,2,3,4,5,6)); class PoolClass extends Pool{ public function worker_list(){ if ($this->workers !== null) return array_keys($this->workers); return null; } } $pool = new PoolClass(3, 'WorkerClass', [$config] ); $pool->worker_list(); //$pool->submitTo(0,new Task(-10)); // submitTo не пытается создать worker $spammed_id = -1; for ($i = 1; $i <= 100; ++$i){ // добавить некоторые задания if ($spammed_id == -1 && ($x = $pool->worker_list())!= null && @$x[2]){ $spammed_id = $x[2]; echo "spamming worker {$spammed_id} with lots of tasks from now on\n"; } if ($spammed_id != -1 && ($i % 5) == 0) // каждая пятая работа направляется одному работнику, поэтому она имеет 20% от общего числа рабочих мест //(с 3 работниками она должна делать ~33%, а не (33+20)%, поэтому делегируйте только работнику, если вы планируете также балансировать... ) $pool->submitTo($spammed_id,new Task(10*$i)); else $pool->submit(new Task(10*$i)); } $pool->shutdown(); var_dump($config); // "val" составляет ровно 100000, "val2", наверное, немного меньше // также: если вы отключите спамер, вы увидите, что порядок "arr" является случайным. ?>