Похоже, PHP разработчики редко используют параллельность. Говорить о простоте синхронного кода не буду, однопоточное программирование, конечно, проще и понятнее, но иногда небольшое использование параллельности может принести ощутимое повышение производительности.

В этой статье мы взглянем на то, как многопоточность может быть достигнута в PHP с помощью расширения pthreads. Для этого потребуется установленная ZTS (Zend Thread Safety) версия PHP 7.x, вместе с установленным расширением pthreads v3. (На момент написания статьи, в PHP 7.1 пользователям нужно будет установить из ветки master в репозитории pthreads  – см. подробнее как установить (en) стороннее расширение.)

Небольшое уточнение: pthreads v2 предназначена для PHP 5.x и более не поддерживается, pthreads v3 - для PHP 7.х и активно развивается.

Большое спасибо Joe Watkins (создателю расширения pthreads) за вычитку и помощь в улучшении моей статьи!

Когда не стоит использовать pthreads

Прежде чем мы начнём, я хотел бы уточнить, когда вы не должны (да и не можете) использовать расширение pthreads.

В pthreads v2, рекомендация была в том, что pthreads не должна использоваться в веб-серверной среде (т.е. в fcgi процессе). Что касается pthreads v3, эта рекомендация является программным ограничением, так что теперь вы просто не сможете использовать его в среде веб-сервера. Две известные причины:

  1. Это небезопасно использовать несколько потоков в такой среде (например, в связи с ошибками ввода-вывода, да и кучи других проблем).
  2. Это не очень хорошо масштабируется. Например, скажем, у вас есть PHP-скрипт, который создает новый поток, чтобы обработать какую-то задачу, и этот скрипт выполняется при каждом запросе. Это означает, что для каждого запроса, ваше приложение будет создавать новый поток (это модель потоков 1:1 – один поток на один запрос). Если ваше приложение обслуживает 1тыс. запросов в секунду, это означает создание 1тыс. нитей в секунду! Наличие множества потоков, запущенных на одной машине, быстро наводнит ее, и проблема будет лишь усугубляться, т. к. скорость выполнения запроса будет увеличиваться.

Вот почему многопоточность не является хорошим решением в такой среде. Если вы рассматриваете многопоточность как решение задач, блокирующих ввод-вывод (например, выполнение http-запросов), то позвольте мне обратить ваше внимание на асинхронное программирование, которое можно реализовать с помощью фреймворков, таких как Amp.

После такого отступления, давайте сразу перейдём к делу!

Обработка разовых задач

Иногда вы хотите обрабатывать разовые задачи многопоточным способом (например, выполнение некой задачи, завязанной на ввод-вывод). В таких случаях можно использовать класс Thread, чтобы создать новый поток и запустить некую обработку в отдельном потоке.

Например:

$task = new class extends Thread {
    private $response;

    public function run()
    {
        $content = file_get_contents("http://google.com");
        preg_match("~<title>(.+)</title>~", $content, $matches);
        $this->response = $matches[1];
    }
};

$task->start() && $task->join();

var_dump($task->response); // string(6) "Google"

Здесь метод run — это наша обработка, которая будет выполняться внутри нового потока. При вызове Thread::start, порождается новый поток и вызывается метод run. Затем мы присоединяем порождённый поток обратно в основной поток, вызвав Thread::join, который будет заблокирован до тех пор, пока порождённый поток не завершит своё выполнение. Это гарантирует, что задача завершит выполнение, прежде чем мы попытаемся вывести результат (который хранится в $task->response).

Возможно, не желательно загрязнять класс дополнительной ответственностью, связанной с логикой потока (в том числе обязанность определения метода run). Мы можем выделить такие классы, унаследовав их от класса Threaded. Тогда они могут быть запущены внутри другого потока:

class Task extends Threaded
{
    public $response;

    public function someWork()
    {
        $content = file_get_contents('http://google.com');
        preg_match('~~', $content, $matches);
        $this->response = $matches[1];
    }
}

$task = new Task;

$thread = new class($task) extends Thread {
    private $task;

    public function __construct(Threaded $task)
    {
        $this->task = $task;
    }

    public function run()
    {
        $this->task->someWork();
    }
};

$thread->start() && $thread->join();

var_dump($task->response);

Любой класс, который должен быть запущен в отдельном потоке, должен наследоваться от класса Threaded. Это потому что он предоставляет необходимые возможности для выполнения обработки в разных потоках, а также неявную безопасность и полезные интерфейсы (такие, как синхронизация ресурсов).

Давайте взглянем на иерархию классов, предлагаемую расширением pthreads:

Threaded (implements Traversable, Collectable)
    Thread
        Worker
    Volatile
Pool

Мы уже рассмотрели и узнали основы классов Thread и Threaded, теперь давайте взглянем на остальные три (Worker, Volatile, и Pool).

Переиспользование потоков

Запуск нового потока для каждой задачи, которую нужно распараллелить, достаточно затратно. Это потому что архитектура "ничего-общего" должна быть реализована в pthreads, чтобы добиться многопоточности внутри PHP. Что означает, что весь контекст выполнения текущего экземпляра интерпретатора PHP (в том числе и каждый класс, интерфейс, трейт и функция) должна быть скопирована для каждого созданного потока. Поскольку это влечет за собой заметное влияние на производительность, поток всегда должен быть повторно использован, когда это возможно. Потоки могут быть переиспользованы двумя способами: с помощью Worker-ов или с помощью Pool-ов.

Класс Worker используется для выполнения ряда задач синхронно внутри другого потока. Это делается путем создания нового экземпляра Worker-а (который создает новый поток), а затем внесением задач в стек этого отдельного потока (с помощью Worker::stack).

Вот небольшой пример:

class Task extends Threaded
{
    private $value;

    public function __construct(int $i)
    {
        $this->value = $i;
    }

    public function run()
    {
        usleep(250000);
        echo "Task: {$this->value}\n";
    }
}

$worker = new Worker();
$worker->start();

for ($i = 0; $i < 15; ++$i) {
    $worker->stack(new Task($i));
}

while ($worker->collect());

$worker->shutdown();

Выход:

В вышеприведённом примере в стек заносится 15 задач для нового объекта $worker через метод Worker::stack, а затем они обрабатываются в порядке их внесения. Метод Worker::collect, как показано выше, используется для очистки задач, как только они закончат выполнение. С его помощью внутри цикла while, мы блокируем основной поток, пока не будут завершены все задачи из стека и пока они не будут очищены — до того как мы вызовем Worker::shutdown. Завершение worker-а досрочно (т. е. пока есть еще задачи, которые должны быть выполнены) будет по-прежнему блокировать основной поток до тех пор, пока все задачи не завершат своё выполнение, просто задачи не будут почищены сборщиком мусора (что влечёт за собой утечки памяти).

Класс Worker предоставляет несколько других методов, относящихся к его стеку задач, включая Worker::unstack для удаления последней внесённой задачи и Worker::getStacked для получения количества задач в стеке выполнения. Стек worker-а содержит только задачи, которые должны быть выполнены. Как только задача из стека была выполнена, она удаляется и размещается в отдельном (внутреннем) стеке для сборки мусора (с помощью метода Worker::collect).

Еще один способ переиспользовать поток при выполнении многих задач — это использование пула потоков (через класс Pool). Пул потоков использует группу Worker-ов, чтобы дать возможность выполнять задачи одновременно, в котором фактор параллельности (число потоков пула, с которыми он работает) задается при создании пула.

Давайте адаптируем приведенный выше пример для использования пула worker-ов:

class Task extends Threaded
{
    private $value;

    public function __construct(int $i)
    {
        $this->value = $i;
    }

    public function run()
    {
        usleep(250000);
        echo "Task: {$this->value}\n";
    }
}

$pool = new Pool(4);

for ($i = 0; $i < 15; ++$i) {
    $pool->submit(new Task($i));
}

while ($pool->collect());

$pool->shutdown();

Выход:

Есть несколько заметных различий при использовании пула, в отличие от воркера. Во-первых, пул не требует запуска вручную, он приступает к выполнению задач, как только они становятся доступными. Во-вторых, мы отправляем задачи в пул, а не укладываем их в стек. Кроме того, класс Pool не наследуется от Threaded, и поэтому он не может быть передан в другие потоки (в отличие от Worker).

Как хорошая практика, для воркеров и пулов следует всегда подчищать их задачи, как только они завершились, и затем вручную завершать их самих. Потоки, созданные с помощью класса Thread также должны быть присоединены к порождающему потоку.

pthreads и (не)изменяемость

Последний класс, которого мы коснёмся, – Volatile, – новое дополнение к pthreads v3. Понятие неизменяемости стало важной концепцией в pthreads, так как без неё производительность существенно снижается. Поэтому по умолчанию, свойства Threaded-классов, которые сами являются Threaded-объектами, сейчас являются неизменными, и поэтому они не могут быть перезаписаны после их первоначального присвоения. Явная изменяемость для таких свойств сейчас пока предпочтительна, и все еще может быть достигнута с помощью нового класса Volatile.

Давайте взглянем на пример, который продемонстрирует новые ограничения неизменяемости:

class Task extends Threaded // a Threaded class
{
    public function __construct()
    {
        $this->data = new Threaded();
        // $this->data is not overwritable, since it is a Threaded property of a Threaded class
    }
}

$task = new class(new Task()) extends Thread { // a Threaded class, since Thread extends Threaded
    public function __construct($tm)
    {
        $this->threadedMember = $tm;
        var_dump($this->threadedMember->data); // object(Threaded)#3 (0) {}
        $this->threadedMember = new StdClass(); // invalid, since the property is a Threaded member of a Threaded class
    }
};

Threaded-свойства у классов Volatile, с другой стороны, изменяемы:

class Task extends Volatile
{
    public function __construct()
    {
        $this->data = new Threaded();
        $this->data = new StdClass(); // valid, since we are in a volatile class
    }
}

$task = new class(new Task()) extends Thread {
    public function __construct($vm)
    {
        $this->volatileMember = $vm;

        var_dump($this->volatileMember->data); // object(stdClass)#4 (0) {}

        // still invalid, since Volatile extends Threaded, so the property is still a Threaded member of a Threaded class
        $this->volatileMember = new StdClass();
    }
};

Мы видим, что класс Volatile переопределяет неизменяемость, навязанную родительским классом Threaded, чтобы предоставить возможность изменять Threaded-свойства (а также unset()-ить).

Есть ещё один предмет обсуждения чтобы раскрыть тему изменяемости и класса Volatile – массивы. В pthreads массивы автоматически приводятся к Volatile-объектам при присвоении к свойству класса Threaded. Это потому что просто небезопасно манипулировать массивом из нескольких контекстов PHP.

Давайте снова взглянем на пример, чтобы лучше понимать некоторые вещи:

$array = [1,2,3];

$task = new class($array) extends Thread {
    private $data;

    public function __construct(array $array)
    {
        $this->data = $array;
    }

    public function run()
    {
        $this->data[3] = 4;
        $this->data[] = 5;

        print_r($this->data);
    }
};

$task->start() && $task->join();

/* Вывод:
Volatile Object
(
    [0] => 1
    [1] => 2
    [2] => 3
    [3] => 4
    [4] => 5
)
*/

Мы видим, что Volatile-объекты могут быть обработаны так, как если бы они были массивами, т. к. они поддерживают операции с массивами, такие как (как показано выше) оператор подмножеств ([]). Однако, классы Volatile не поддерживают базовые функции с массивами, такие как array_pop и array_shift. Вместо этого, класс Threaded предоставляет нам подобные операции как встроенные методы.

В качестве демонстрации:

$data = new class extends Volatile {
    public $a = 1;
    public $b = 2;
    public $c = 3;
};

var_dump($data);
var_dump($data->pop());
var_dump($data->shift());
var_dump($data);

/* Вывод:
object(class@anonymous)#1 (3) {
  ["a"]=> int(1)
  ["b"]=> int(2)
  ["c"]=> int(3)
}
int(3)
int(1)
object(class@anonymous)#1 (1) {
  ["b"]=> int(2)
}
*/

Другие поддерживаемые операции включают в себя Threaded::chunk и Threaded::merge.

Синхронизация

В последнем разделе этой статьи мы рассмотрим синхронизацию в pthreads. Синхронизация — это метод, позволяющий контролировать доступ к общим ресурсам.

Для примера, давайте реализуем простейший счетчик:

$counter = new class extends Thread {
    public $i = 0;

    public function run()
    {
        for ($i = 0; $i < 10; ++$i) {
            ++$this->i;
        }
    }
};

$counter->start();

for ($i = 0; $i < 10; ++$i) {
    ++$counter->i;
}

$counter->join();

var_dump($counter->i); // выведет число от 10 до 20

Без использования синхронизации, вывод не детерминирован. Несколько потоков пишут в одну переменную без контролируемого доступа, что означает что обновления будут потеряны.

Давайте исправим это так, что мы получим правильный вывод 20, путем добавления синхронизации:

$counter = new class extends Thread {
    public $i = 0;

    public function run()
    {
        $this->synchronized(function () {
            for ($i = 0; $i < 10; ++$i) {
                ++$this->i;
            }
        });
    }
};

$counter->start();

$counter->synchronized(function ($counter) {
    for ($i = 0; $i < 10; ++$i) {
        ++$counter->i;
    }
}, $counter);

$counter->join();

var_dump($counter->i); // int(20)

Синхронизированные блоки кода могут также взаимодействовать друг с другом, используя методы Threaded::wait и Threaded::notify (или Threaded::notifyAll).

Вот поочерёдный инкремент в двух синхронизированных циклах while:

$counter = new class extends Thread {
    public $cond = 1;

    public function run()
    {
        $this->synchronized(function () {
            for ($i = 0; $i < 10; ++$i) {
                var_dump($i);
                $this->notify();

                if ($this->cond === 1) {
                    $this->cond = 2;
                    $this->wait();
                }
            }
        });
    }
};

$counter->start();

$counter->synchronized(function ($counter) {
    if ($counter->cond !== 2) {
        $counter->wait(); // wait for the other to start first
    }

    for ($i = 10; $i < 20; ++$i) {
        var_dump($i);
        $counter->notify();

        if ($counter->cond === 2) {
            $counter->cond = 1;
            $counter->wait();
        }
    }
}, $counter);

$counter->join();

/* Вывод:
int(0)
int(10)
int(1)
int(11)
int(2)
int(12)
int(3)
int(13)
int(4)
int(14)
int(5)
int(15)
int(6)
int(16)
int(7)
int(17)
int(8)
int(18)
int(9)
int(19)
*/

Вы можете заметить дополнительные условия, которые были размещены вокруг обращения к Threaded::wait. Эти условия имеют решающее значение, поскольку они позволяют синхронизированному колбэку возобновить работу, когда он получил уведомление и указанное условие равно true. Это важно, потому что уведомления могут поступать из других мест, кроме как при вызове Threaded::notify. Таким образом, если вызовы метода Threaded::wait не были заключены в условиях, мы будем выполнять ложные вызовы пробуждения, которые приведут к непредсказуемому поведению кода.