Pool::collect — Собирать ссылки на выполненные задачи
(PECL pthreads >= 2.0.0)
Pool::collect — Собирать ссылки на выполненные задачи
Описание
Позволяет пулу собирать ссылки, определенные сборщиком как мусор.
Список параметров
collector-
Вызываемый коллектор, возвращающий логическое значение о том, может ли задача быть собрана или нет. Только в редких случаях необходимо использовать пользовательский коллектор.
Возвращаемые значения
Число оставшихся задач в пуле, которые необходимо собрать.
Список изменений
| Версия | Описание |
|---|---|
| v3 | Теперь возвращается целое число, а параметр collector является необязательным. |
Примеры
Пример #1 Простой пример Pool::collect()
<?php
$pool = new Pool(4);
for ($i = 0; $i < 15; ++$i) {
$pool->submit(new class extends Threaded {});
}
while ($pool->collect()); // блокирует выполнение всех задач
$pool->shutdown();
Пример ошибки кода и заставила меня потратить 2 РАБОЧИХ дня
Прежде всего,` Stackable ' не имеет атрибута с именем $worker или его метод доступа сделал его недоступным.
Во-вторых, `Stackable` также не имеет `getThreadId()` . Рекомендуется использовать класс Thread для реализации потока, так как он имеет больше функций управления.
Лучше использовать ' Stackable` для хранения объектов и использовать его `run()` в качестве инициализации.
Рабочий пример
<?php
class MyWork extends Thread {
protected $complete;
public function __construct() {
$this->complete = false;
}
public function run() {
printf(
"Hello from %s in Thread #%lu\n",
__CLASS__, $this->getThreadId());
$this->complete = true;
}
public function isComplete() {
return $this->complete;
}
}
class Something {}
class MyWorker extends Worker {
public function __construct(Something $something) {
$this->something = $something;
}
public function run() {
/** ... **/
}
}
$pool = new Pool(8, \MyWorker::class, [new Something()]);
$pool->submit(new MyWork());
usleep(1000);
$pool->collect(function($work){
return $work->isComplete();
});
var_dump($pool);
?>
Этот пример демонстрирует различные аспекты MTP с pthreads-существенно стоит отметить, двунаправленная связь с дочерними потоками.
Я ничего не смог найти об этом, поэтому хотел бы представить вам результаты своих исследований.
<?php
class Model
{
public $id;
public $value;
}
class Connection
extends Worker
{
protected static $link;
public function __construct($hostname, $username, $password, $database, $port = 3306)
{
$this->hostname = $hostname;
$this->username = $username;
$this->password = $password;
$this->database = $database;
$this->port = $port;
}
public function getConnection()
{
if(!self::$link)
{
echo 'Thread: '. $this->getThreadId() ." Connecting to db\n";
self::$link = new \PDO(...);
}
return self::$link;
}
}
/** @property Connection $worker */
class QueryTask
extends Threaded
{
public $data;
public $result;
protected $_complete;
public function __construct(Model $data)
{
$this->_complete = false;
$this->data = $data;
}
public function run()
{
/** @var \PDO $pdo */
$pdo = $this->worker->getConnection();
$text = 'Thread: '. $this->worker->getThreadId() .' Job: '. $this->data->id .' Data: '. $this->data->value;
$t = microtime(true);
$stmt = $pdo->prepare("
INSERT INTO `test` (`id`, `text`) VALUES (NULL, '". $text ."')
");
$stmt->execute();
$dt = microtime(true) - $t;
$result = (int) $stmt->rowCount();
echo $text .' Result: '. $result .' Exec time: '. $dt ."s\n";
$this->result = $result;
$this->_complete = true;
}
public function isGarbage() : bool
{
return $this->_complete;
}
}
$t = microtime(true);
// запуск
$pool = new Pool(5, 'Connection', [ 'localhost', 'root', 'password', 'test' ]);
// задачи
$tasks = 10;
for($i=0; $i<$tasks; ++$i)
{
$object = new Model();
$object->id = $i;
$object->value = rand();
$pool->submit(new QueryTask($object));
}
// ожидание завершения
$data = [];
while(1)
{
$newData = [];
$pool->collect(function(QueryTask $task) use (&$newData) {
if($task->isGarbage())
{
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
$newData[ $task->data->id ] = $task->data->value;
}
return $task->isGarbage();
});
$data = array_merge($data, $newData);
if(count($data) == $tasks)
break;
usleep(100000);
}
var_dump($data);
?>
Результат:
Thread: 6796 Connecting to db
Thread: 3156 Connecting to db
Thread: 9040 Connecting to db
Thread: 7748 Connecting to db
Thread: 8836 Connecting to db
Job: 0 Done in: 0.0070011615753174s
Job: 4 Done in: 0.0069999694824219s
Job: 2 Done in: 0.0090010166168213s
Job: 3 Done in: 0.0090010166168213s
Job: 1 Done in: 0.003000020980835s
Job: 5 Done in: 0.0069999694824219s
Job: 7 Done in: 0.0079998970031738s
Job: 6 Done in: 0.0049998760223389s
Job: 9 Done in: 0.0079998970031738s
Job: 8 Done in: 0.0069999694824219s
array(10) {
[0] =>
int(17730)
[1] =>
int(18771)
[2] =>
int(12944)
[3] =>
int(6025)
[4] =>
int(29582)
[5] =>
int(10159)
[6] =>
int(26556)
[7] =>
int(9029)
[8] =>
int(15002)
[9] =>
int(4043)
}
Вещи, которые стоит отметить здесь:
- Построение 5 рабочих для 10 задач. 5 последняя задача выполняется на существующих потоках с уже настроенным подключением к БД.
- Вы можете "send" данные в поток, создав новую задачу и отправив ее.
- Вы можете повторить результат по функции collect.
- Простой объект можно передать конструктору задач.