Сайт справки
  • Авторизация
  • Обратная связь
  • Теги
    • Joomla
      • JLoader
      • Mootools
      • Bootstrap
      • jQuery
      • jQuery UI
      • Конфликты JS
      • Joomla! Javascript Framework
      • JS Фреймворки
      • Ajax
      • JS
      • MVC
      • JoomShopping
      • Сессии
    • JS

Расширенный поиск
  • Главная
  • Разное
  • Netbeans
  • php
  • Многопоточность
  • pthreads
  • Pool

(PECL pthreads >= 2.0.0)

Введение

A - это контейнер и контроллер для регулируемого числа Workers.

Объединение в пул обеспечивает более высокий уровень абстракции рабочих функций, включая управление ссылками способом, требуемым pthreads.

Обзор классов

class Pool {
	/* Свойства */
	protected $size ;
	protected $class ;
	protected $workers ;
	protected $ctor ;
	protected $last ;
	/* Методы */
	public int collect ([ Callable $collector ] )
	public Pool __construct ( int $size [, string $class [, array $ctor ]] )
	public void resize ( int $size )
	public void shutdown ( void )
	public int submit ( Threaded $task )
	public int submitTo ( int $worker , Threaded $task )
}

Свойства

size

Максимальное число Workers, которое может использовать этот Pool

class

класс Worker

workers

ссылки на Workers

ctor

аргументы для конструктора новых Workers

last

смещение в workers последнего используемого Worker

Простой пример с Collectable (в основном Thread предназначен для Pool) и Pool

<?php

class job extends Collectable {
  public $val;

  public function __construct($val){
    // инициировать некоторые свойства
    $this->val = $val;
  }
  public function run(){
    // поработай немного
    $this->val = $this->val . file_get_contents('http://www.example.com/', null, null, 3, 20);
    $this->setGarbage();
  }
}

// Максимум 3 темы будут работать одновременно
$p = new Pool(3);

$tasks = array(
  new job('0'),
  new job('1'),
  new job('2'),
  new job('3'),
  new job('4'),
  new job('5'),
  new job('6'),
  new job('7'),
  new job('8'),
  new job('9'),
  new job('10'),
);
// Добавить задачи в очередь пула
foreach ($tasks as $task) {
  $p->submit($task);
}

// завершение работы будет ожидать завершения текущей очереди
$p->shutdown();
// проверка / чтение результатов сборки мусора
$p->collect(function($checkingTask){
  echo $checkingTask->val;
  return $checkingTask->isGarbage();
});

?>

Обратите внимание, что при использовании функции сбора важно расширить класс пула, чтобы вы могли продолжать проверять готовые потоки, пока все они не будут завершены.

<?php
class TestWork extends Threaded {
    protected $complete;
    //$pData-это данные, отправляемые в рабочий поток для выполнения его работы.
    public function __construct($pData){
        //перенести все переменные в локальные переменные
        $this->complete = false;
        $this->testData = $pData;
    }
    //Здесь будет сделана вся ваша работа.
    public function run(){
        usleep(2000000); //сон 2 секунды для того чтобы сымитировать большую работу
        $this->complete = true;
    }
    public function isGarbage() {
        return $this->complete;
    }
}
class ExamplePool extends Pool
{
    public $data = array();
    public function process()
    {
        // Выполните этот цикл,
        // пока у нас есть задания в пуле
        while (count($this->work)) {
            $this->collect(function (TestWork $task) {
                // Если задача была помечена как выполненная,
                // соберите ее результаты
                if ($task->isGarbage()) {
                    $tmpObj = new stdclass();
                    $tmpObj->complete = $task->complete;
                    // это то, как вы возвращаете свои законченные данные [доступно для $pool->process()]
                    $this->data[] = $tmpObj;
                }
                return $task->isGarbage();
            });
        }
        // Все работы выполнены,
        // мы можем закрыть pool
        $this->shutdown();
        return $this->data;
    }
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for($i=0;$i<5;$i++) {
    $pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //получить все результаты
echo '<pre>';
print_r($retArr); //вернуть массив результатов (и, возможно, ошибки)
echo '</pre>';
?>

Пример класса, чтобы продемонстрировать использование пула / рабочего механизма, а также показать несколько трюков и подсказок ;)

<?php
class Config extends Threaded{    // общий глобальный объект
    protected $val=0, $val2=0;
    protected function inc(){++$this->val;}    // защищенная синхронизация на объект
    public function inc2(){++$this->val2;}    // нет синхронизации
}
class WorkerClass extends Worker{
    protected static $worker_id_next = -1;
    protected $worker_id;
    protected $config;
    public function __construct($config){
        $this->worker_id = ++static::$worker_id_next;    // статические члены не доступны в потоке, но находятся в "основном потоке"
        $this->config = $config;
    }
    public function run(){
        global $config;
        $config = $this->config;    // ПРИМЕЧАНИЕ: настройка по ссылке НЕ РАБОТАЕТ
        global $worker_id;
        $worker_id = $this->worker_id;
        echo "working context {$worker_id} is created!\n";
        //$this->say_config();    // глобально синхронизированная функция.
    }
    protected function say_config(){    // 'protected' синхронизируется побочным объектом, поэтому НЕ БУДЕТ работать между несколькими экземплярами,
        global $config;        // вы можете использовать общий объект $config в качестве источника синхронизации.
        $config->synchronized(function() use (&$config){    // ПРИМЕЧАНИЕ: вы можете использовать здесь Closures, 
															// но если вы прикрепите Closure к объекту с потоком, он будет уничтожен, поскольку не может быть сериализован
            var_dump($config);
        });
    }
}
class Task extends Stackable{    // Стекируемый до сих пор существует, просто как-то исчез из документации (вероятно, по ошибке). 
								// Для получения более подробной информации смотрите документацию к старой версии.
    protected $set;
    public function __construct($set){
        $this->set = $set;
    }
    public function run(){
        global $worker_id;
        echo "task is running in {$worker_id}!\n";
        usleep(mt_rand(1,100)*100);
        $config = $this->getConfig();
        $val = $config->arr->shift();
        $config->arr[] = $this->set;
        for ($i = 0 ; $i < 1000; ++$i){
            $config->inc();
            $config->inc2();
        }
    }
    public function getConfig(){
        global $config;    // WorkerClass устанавливает это в области видимости потока, может быть повторно использовано Задачами для дополнительного асинхронного источника данных. 
							//(то есть: пул соединений или очередь задач к демультиплексору)
        return $config;
    }
}

$config = new Config;
$config->arr = new \Threaded();
$config->arr->merge(array(1,2,3,4,5,6));
class PoolClass extends Pool{
    public function worker_list(){
        if ($this->workers !== null)
            return array_keys($this->workers);
        return null;
    }
}
$pool = new PoolClass(3, 'WorkerClass', [$config] );
$pool->worker_list();
//$pool->submitTo(0,new Task(-10));    // submitTo не пытается создать worker

$spammed_id = -1;
for ($i = 1; $i <= 100; ++$i){    // добавить некоторые задания
    if ($spammed_id == -1 && ($x = $pool->worker_list())!= null && @$x[2]){
        $spammed_id = $x[2];
        echo "spamming worker {$spammed_id} with lots of tasks from now on\n";
    }
    if ($spammed_id != -1 && ($i % 5) == 0)    // каждая пятая работа направляется одному работнику, поэтому она имеет 20% от общего числа рабочих мест 
												//(с 3 работниками она должна делать ~33%, а не (33+20)%, поэтому делегируйте только работнику, если вы планируете также балансировать... )
        $pool->submitTo($spammed_id,new Task(10*$i));   
    else
        $pool->submit(new Task(10*$i));
}
$pool->shutdown();
var_dump($config); // "val" составляет ровно 100000, "val2", наверное, немного меньше
// также: если вы отключите спамер, вы увидите, что порядок "arr" является случайным.
?>

Содержание

Pool::__construct - создает новый пул работников
Pool::collect — Собирать ссылки на выполненные задачи
Pool::resize - изменить размер пула
Pool::shutdown - отключение всех работников
Pool::submit - отправляет объект на выполнение
Pool::submitTo - отправляет задачу конкретному работнику для выполнения

  • Карта сайта

Наверх

© Сайт справки 2023

Яндекс.Метрика