(PECL pthreads >= 2.0.0)

Pool::collect — Собирать ссылки на выполненные задачи

Описание

public int Pool::collect ([ Callable $collector ] )

Позволяет пулу собирать ссылки, определенные сборщиком как мусор.

Список параметров

collector

Вызываемый коллектор, возвращающий логическое значение о том, может ли задача быть собрана или нет. Только в редких случаях необходимо использовать пользовательский коллектор.

Возвращаемые значения

Число оставшихся задач в пуле, которые необходимо собрать.

Список изменений

ВерсияОписание
v3 Теперь возвращается целое число, а параметр collector является необязательным.

Примеры

Пример #1 Простой пример Pool::collect()

<?php
$pool = new Pool(4);

for ($i = 0; $i < 15; ++$i) {
    $pool->submit(new class extends Threaded {});
}

while ($pool->collect()); // блокирует выполнение всех задач

$pool->shutdown();

Пример ошибки кода и заставила меня потратить 2 РАБОЧИХ дня
Прежде всего,` Stackable ' не имеет атрибута с именем $worker или его метод доступа сделал его недоступным.

Во-вторых, `Stackable` также не имеет `getThreadId()` . Рекомендуется использовать класс Thread для реализации потока, так как он имеет больше функций управления.
Лучше использовать ' Stackable` для хранения объектов и использовать его `run()` в качестве инициализации.

Рабочий пример

<?php
    class MyWork extends Thread {
        protected $complete;

        public function __construct() {
            $this->complete = false;
        }

        public function run() {
            printf(
                "Hello from %s in Thread #%lu\n",
                __CLASS__, $this->getThreadId());
            $this->complete = true;
        }

        public function isComplete() {
            return $this->complete;
        }
    }

    class Something {}

    class MyWorker extends Worker {

        public function __construct(Something $something) {
            $this->something = $something;
        }

        public function run() {
            /** ... **/
        }
    }

    $pool = new Pool(8, \MyWorker::class, [new Something()]);
    $pool->submit(new MyWork());

    usleep(1000);

    $pool->collect(function($work){
        return $work->isComplete();
    });
    var_dump($pool);
?>

Этот пример демонстрирует различные аспекты MTP с pthreads-существенно стоит отметить, двунаправленная связь с дочерними потоками.
Я ничего не смог найти об этом, поэтому хотел бы представить вам результаты своих исследований.

<?php

class Model
{
   
    public $id;
    public $value;
   
}

class Connection
    extends Worker
{
   
    protected static $link;
   
   
    public function __construct($hostname, $username, $password, $database, $port = 3306)
    {
        $this->hostname = $hostname;
        $this->username = $username;
        $this->password = $password;
        $this->database = $database;
        $this->port = $port;
    }
   
    public function getConnection()
    {
        if(!self::$link)
        {
            echo 'Thread: '. $this->getThreadId() ." Connecting to db\n";
            self::$link = new \PDO(...);
        }
       
        return self::$link;
    }
   
}

/** @property Connection $worker */
class QueryTask
    extends Threaded
{
   
    public $data;
    public $result;
   
    protected $_complete;
   
   
    public function __construct(Model $data)
    {
        $this->_complete = false;
        $this->data = $data;
    }
   
    public function run()
    {
        /** @var \PDO $pdo */
        $pdo = $this->worker->getConnection();
       
        $text = 'Thread: '. $this->worker->getThreadId() .' Job: '. $this->data->id .' Data: '. $this->data->value;
       
        $t = microtime(true);
       
        $stmt = $pdo->prepare("
            INSERT INTO `test` (`id`, `text`) VALUES (NULL, '". $text ."')
        ");
        $stmt->execute();
       
        $dt = microtime(true) - $t;
       
        $result = (int) $stmt->rowCount();
       
        echo $text .' Result: '. $result .' Exec time: '. $dt ."s\n";
       
        $this->result = $result;
        $this->_complete = true;
    }
   
    public function isGarbage() : bool
    {
        return $this->_complete;
    }
   
}

$t = microtime(true);

// запуск
$pool = new Pool(5, 'Connection', [ 'localhost', 'root', 'password', 'test' ]);

// задачи
$tasks = 10;

for($i=0; $i<$tasks; ++$i)
{
    $object = new Model();
    $object->id = $i;
    $object->value = rand();

    $pool->submit(new QueryTask($object));
}

// ожидание завершения
$data = [];

while(1)
{
    $newData = [];

    $pool->collect(function(QueryTask $task) use (&$newData) {
        if($task->isGarbage())
        {
            $tmpObj = new stdclass();
            $tmpObj->complete = $task->complete;
           
            $newData[ $task->data->id ] = $task->data->value;
        }
       
        return $task->isGarbage();
    });
   
    $data = array_merge($data, $newData);
   
    if(count($data) == $tasks)
        break;
   
    usleep(100000);
}

var_dump($data);
?>

Результат:

Thread: 6796 Connecting to db
Thread: 3156 Connecting to db
Thread: 9040 Connecting to db
Thread: 7748 Connecting to db
Thread: 8836 Connecting to db
Job: 0 Done in: 0.0070011615753174s
Job: 4 Done in: 0.0069999694824219s
Job: 2 Done in: 0.0090010166168213s
Job: 3 Done in: 0.0090010166168213s
Job: 1 Done in: 0.003000020980835s
Job: 5 Done in: 0.0069999694824219s
Job: 7 Done in: 0.0079998970031738s
Job: 6 Done in: 0.0049998760223389s
Job: 9 Done in: 0.0079998970031738s
Job: 8 Done in: 0.0069999694824219s

array(10) {
  [0] =>
  int(17730)
  [1] =>
  int(18771)
  [2] =>
  int(12944)
  [3] =>
  int(6025)
  [4] =>
  int(29582)
  [5] =>
  int(10159)
  [6] =>
  int(26556)
  [7] =>
  int(9029)
  [8] =>
  int(15002)
  [9] =>
  int(4043)
}

Вещи, которые стоит отметить здесь:

  1. Построение 5 рабочих для 10 задач. 5 последняя задача выполняется на существующих потоках с уже настроенным подключением к БД.
  2. Вы можете "send" данные в поток, создав новую задачу и отправив ее.
  3. Вы можете повторить результат по функции collect.
  4. Простой объект можно передать конструктору задач.