(PECL pthreads >= 2.0.0)
Введение
A - это контейнер и контроллер для регулируемого числа Workers.
Объединение в пул обеспечивает более высокий уровень абстракции рабочих функций, включая управление ссылками способом, требуемым pthreads.
Обзор классов
class Pool {
/* Свойства */
protected $size ;
protected $class ;
protected $workers ;
protected $ctor ;
protected $last ;
/* Методы */
public int collect ([ Callable $collector ] )
public Pool __construct ( int $size [, string $class [, array $ctor ]] )
public void resize ( int $size )
public void shutdown ( void )
public int submit ( Threaded $task )
public int submitTo ( int $worker , Threaded $task )
}
Свойства
- size
-
Максимальное число Workers, которое может использовать этот Pool
- class
-
класс Worker
- workers
-
ссылки на Workers
- ctor
-
аргументы для конструктора новых Workers
- last
-
смещение в workers последнего используемого Worker
Простой пример с Collectable (в основном Thread предназначен для Pool) и Pool
<?php
class job extends Collectable {
public $val;
public function __construct($val){
// инициировать некоторые свойства
$this->val = $val;
}
public function run(){
// поработай немного
$this->val = $this->val . file_get_contents('http://www.example.com/', null, null, 3, 20);
$this->setGarbage();
}
}
// Максимум 3 темы будут работать одновременно
$p = new Pool(3);
$tasks = array(
new job('0'),
new job('1'),
new job('2'),
new job('3'),
new job('4'),
new job('5'),
new job('6'),
new job('7'),
new job('8'),
new job('9'),
new job('10'),
);
// Добавить задачи в очередь пула
foreach ($tasks as $task) {
$p->submit($task);
}
// завершение работы будет ожидать завершения текущей очереди
$p->shutdown();
// проверка / чтение результатов сборки мусора
$p->collect(function($checkingTask){
echo $checkingTask->val;
return $checkingTask->isGarbage();
});
?>
Обратите внимание, что при использовании функции сбора важно расширить класс пула, чтобы вы могли продолжать проверять готовые потоки, пока все они не будут завершены.
<?php
class TestWork extends Threaded {
protected $complete;
//$pData-это данные, отправляемые в рабочий поток для выполнения его работы.
public function __construct($pData){
//перенести все переменные в локальные переменные
$this->complete = false;
$this->testData = $pData;
}
//Здесь будет сделана вся ваша работа.
public function run(){
usleep(2000000); //сон 2 секунды для того чтобы сымитировать большую работу
$this->complete = true;
}
public function isGarbage() {
return $this->complete;
}
}
class ExamplePool extends Pool
{
public $data = array();
public function process()
{
// Выполните этот цикл,
// пока у нас есть задания в пуле
while (count($this->work)) {
$this->collect(function (TestWork $task) {
// Если задача была помечена как выполненная,
// соберите ее результаты
if ($task->isGarbage()) {
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
// это то, как вы возвращаете свои законченные данные [доступно для $pool->process()]
$this->data[] = $tmpObj;
}
return $task->isGarbage();
});
}
// Все работы выполнены,
// мы можем закрыть pool
$this->shutdown();
return $this->data;
}
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for($i=0;$i<5;$i++) {
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //получить все результаты
echo '<pre>';
print_r($retArr); //вернуть массив результатов (и, возможно, ошибки)
echo '</pre>';
?>
Пример класса, чтобы продемонстрировать использование пула / рабочего механизма, а также показать несколько трюков и подсказок ;)
<?php
class Config extends Threaded{ // общий глобальный объект
protected $val=0, $val2=0;
protected function inc(){++$this->val;} // защищенная синхронизация на объект
public function inc2(){++$this->val2;} // нет синхронизации
}
class WorkerClass extends Worker{
protected static $worker_id_next = -1;
protected $worker_id;
protected $config;
public function __construct($config){
$this->worker_id = ++static::$worker_id_next; // статические члены не доступны в потоке, но находятся в "основном потоке"
$this->config = $config;
}
public function run(){
global $config;
$config = $this->config; // ПРИМЕЧАНИЕ: настройка по ссылке НЕ РАБОТАЕТ
global $worker_id;
$worker_id = $this->worker_id;
echo "working context {$worker_id} is created!\n";
//$this->say_config(); // глобально синхронизированная функция.
}
protected function say_config(){ // 'protected' синхронизируется побочным объектом, поэтому НЕ БУДЕТ работать между несколькими экземплярами,
global $config; // вы можете использовать общий объект $config в качестве источника синхронизации.
$config->synchronized(function() use (&$config){ // ПРИМЕЧАНИЕ: вы можете использовать здесь Closures,
// но если вы прикрепите Closure к объекту с потоком, он будет уничтожен, поскольку не может быть сериализован
var_dump($config);
});
}
}
class Task extends Stackable{ // Стекируемый до сих пор существует, просто как-то исчез из документации (вероятно, по ошибке).
// Для получения более подробной информации смотрите документацию к старой версии.
protected $set;
public function __construct($set){
$this->set = $set;
}
public function run(){
global $worker_id;
echo "task is running in {$worker_id}!\n";
usleep(mt_rand(1,100)*100);
$config = $this->getConfig();
$val = $config->arr->shift();
$config->arr[] = $this->set;
for ($i = 0 ; $i < 1000; ++$i){
$config->inc();
$config->inc2();
}
}
public function getConfig(){
global $config; // WorkerClass устанавливает это в области видимости потока, может быть повторно использовано Задачами для дополнительного асинхронного источника данных.
//(то есть: пул соединений или очередь задач к демультиплексору)
return $config;
}
}
$config = new Config;
$config->arr = new \Threaded();
$config->arr->merge(array(1,2,3,4,5,6));
class PoolClass extends Pool{
public function worker_list(){
if ($this->workers !== null)
return array_keys($this->workers);
return null;
}
}
$pool = new PoolClass(3, 'WorkerClass', [$config] );
$pool->worker_list();
//$pool->submitTo(0,new Task(-10)); // submitTo не пытается создать worker
$spammed_id = -1;
for ($i = 1; $i <= 100; ++$i){ // добавить некоторые задания
if ($spammed_id == -1 && ($x = $pool->worker_list())!= null && @$x[2]){
$spammed_id = $x[2];
echo "spamming worker {$spammed_id} with lots of tasks from now on\n";
}
if ($spammed_id != -1 && ($i % 5) == 0) // каждая пятая работа направляется одному работнику, поэтому она имеет 20% от общего числа рабочих мест
//(с 3 работниками она должна делать ~33%, а не (33+20)%, поэтому делегируйте только работнику, если вы планируете также балансировать... )
$pool->submitTo($spammed_id,new Task(10*$i));
else
$pool->submit(new Task(10*$i));
}
$pool->shutdown();
var_dump($config); // "val" составляет ровно 100000, "val2", наверное, немного меньше
// также: если вы отключите спамер, вы увидите, что порядок "arr" является случайным.
?>