Pool::collect — Собирать ссылки на выполненные задачи
(PECL pthreads >= 2.0.0)
Pool::collect — Собирать ссылки на выполненные задачи
Описание
Позволяет пулу собирать ссылки, определенные сборщиком как мусор.
Список параметров
collector
-
Вызываемый коллектор, возвращающий логическое значение о том, может ли задача быть собрана или нет. Только в редких случаях необходимо использовать пользовательский коллектор.
Возвращаемые значения
Число оставшихся задач в пуле, которые необходимо собрать.
Список изменений
Версия | Описание |
---|---|
v3 | Теперь возвращается целое число, а параметр collector является необязательным. |
Примеры
Пример #1 Простой пример Pool::collect()
<?php $pool = new Pool(4); for ($i = 0; $i < 15; ++$i) { $pool->submit(new class extends Threaded {}); } while ($pool->collect()); // блокирует выполнение всех задач $pool->shutdown();
Пример ошибки кода и заставила меня потратить 2 РАБОЧИХ дня
Прежде всего,` Stackable ' не имеет атрибута с именем $worker или его метод доступа сделал его недоступным.
Во-вторых, `Stackable` также не имеет `getThreadId()` . Рекомендуется использовать класс Thread для реализации потока, так как он имеет больше функций управления.
Лучше использовать ' Stackable` для хранения объектов и использовать его `run()` в качестве инициализации.
Рабочий пример
<?php class MyWork extends Thread { protected $complete; public function __construct() { $this->complete = false; } public function run() { printf( "Hello from %s in Thread #%lu\n", __CLASS__, $this->getThreadId()); $this->complete = true; } public function isComplete() { return $this->complete; } } class Something {} class MyWorker extends Worker { public function __construct(Something $something) { $this->something = $something; } public function run() { /** ... **/ } } $pool = new Pool(8, \MyWorker::class, [new Something()]); $pool->submit(new MyWork()); usleep(1000); $pool->collect(function($work){ return $work->isComplete(); }); var_dump($pool); ?>
Этот пример демонстрирует различные аспекты MTP с pthreads-существенно стоит отметить, двунаправленная связь с дочерними потоками.
Я ничего не смог найти об этом, поэтому хотел бы представить вам результаты своих исследований.
<?php class Model { public $id; public $value; } class Connection extends Worker { protected static $link; public function __construct($hostname, $username, $password, $database, $port = 3306) { $this->hostname = $hostname; $this->username = $username; $this->password = $password; $this->database = $database; $this->port = $port; } public function getConnection() { if(!self::$link) { echo 'Thread: '. $this->getThreadId() ." Connecting to db\n"; self::$link = new \PDO(...); } return self::$link; } } /** @property Connection $worker */ class QueryTask extends Threaded { public $data; public $result; protected $_complete; public function __construct(Model $data) { $this->_complete = false; $this->data = $data; } public function run() { /** @var \PDO $pdo */ $pdo = $this->worker->getConnection(); $text = 'Thread: '. $this->worker->getThreadId() .' Job: '. $this->data->id .' Data: '. $this->data->value; $t = microtime(true); $stmt = $pdo->prepare(" INSERT INTO `test` (`id`, `text`) VALUES (NULL, '". $text ."') "); $stmt->execute(); $dt = microtime(true) - $t; $result = (int) $stmt->rowCount(); echo $text .' Result: '. $result .' Exec time: '. $dt ."s\n"; $this->result = $result; $this->_complete = true; } public function isGarbage() : bool { return $this->_complete; } } $t = microtime(true); // запуск $pool = new Pool(5, 'Connection', [ 'localhost', 'root', 'password', 'test' ]); // задачи $tasks = 10; for($i=0; $i<$tasks; ++$i) { $object = new Model(); $object->id = $i; $object->value = rand(); $pool->submit(new QueryTask($object)); } // ожидание завершения $data = []; while(1) { $newData = []; $pool->collect(function(QueryTask $task) use (&$newData) { if($task->isGarbage()) { $tmpObj = new stdclass(); $tmpObj->complete = $task->complete; $newData[ $task->data->id ] = $task->data->value; } return $task->isGarbage(); }); $data = array_merge($data, $newData); if(count($data) == $tasks) break; usleep(100000); } var_dump($data); ?>
Результат:
Thread: 6796 Connecting to db Thread: 3156 Connecting to db Thread: 9040 Connecting to db Thread: 7748 Connecting to db Thread: 8836 Connecting to db Job: 0 Done in: 0.0070011615753174s Job: 4 Done in: 0.0069999694824219s Job: 2 Done in: 0.0090010166168213s Job: 3 Done in: 0.0090010166168213s Job: 1 Done in: 0.003000020980835s Job: 5 Done in: 0.0069999694824219s Job: 7 Done in: 0.0079998970031738s Job: 6 Done in: 0.0049998760223389s Job: 9 Done in: 0.0079998970031738s Job: 8 Done in: 0.0069999694824219s array(10) { [0] => int(17730) [1] => int(18771) [2] => int(12944) [3] => int(6025) [4] => int(29582) [5] => int(10159) [6] => int(26556) [7] => int(9029) [8] => int(15002) [9] => int(4043) }
Вещи, которые стоит отметить здесь:
- Построение 5 рабочих для 10 задач. 5 последняя задача выполняется на существующих потоках с уже настроенным подключением к БД.
- Вы можете "send" данные в поток, создав новую задачу и отправив ее.
- Вы можете повторить результат по функции collect.
- Простой объект можно передать конструктору задач.