Когда оправдана реализация веб сервера с использованием потоков
Перейти к содержимому

Когда оправдана реализация веб сервера с использованием потоков

  • автор:

Когда оправдана реализация веб сервера с использованием потоков

В этой статье я постараюсь максимально широко изложить схемы работы веб-серверов. Это поможет выбрать сервер или решать, какая архитектура быстрее, не основываясь на часто необъективных бенчмарках.

В общем — статья представляет собой глобальный обзор «что бывает». Без циферок.

Статья написана на основе опыта работы с серверами:

  • Apache, Lighttpd, Nginx (на C)
  • Tomcat, Jetty (на Java)
  • Twisted (Python)
  • Erlang OTP (язык Erlang)
  • и операционными системами Linux, FreeBSD

Тем не менее, принципы достаточно общие, поэтому должны распространяться в каком-то виде на OS Windows, Solaris, и на большое количество других веб-серверов.

Цель веб-сервера

Цель веб-сервера проста — обслуживать одновременно большое количество клиентов, максимально эффективно используя hardware. Как это сделать — в этом основная заморочка и предмет статьи 😉

Работа с соединениями

С чего начинается обработка запроса? Очевидно — с приема соединения от пользователя.

Для этого в разных OS используются разные системные вызовы. Наиболее известный и медленный на большом количестве соединений — select. Более эффективные — poll, kpoll, epoll.

Современные веб-серверы постепенно отказываются от select.

Оптимизации ОС

Еще до приема соединения возможны оптимизации на уровне ядра ОС. Например, ядро ОС, получив соединение, может не беспокоить веб-сервер, пока не произошло одно из событий.

  • пока не пришли данные (dataready)
  • пока не пришел целиком HTTP-запрос (httpready)

На момент написания статьи оба способа поддерживаются во FreeBSD (ACCEPT_FILTER_HTTP, ACCEPT_FITER_DATA), и только первый — в Linux (TCP_DEFER_ACCEPT).

Эти оптимизации позволяют серверу меньше времени простаивать в ожидании данных, повышая таким образом общую эффективность работы.

Соединение принято

Итак, соединение принято. Теперь на плечи сервера ложится основная задача — обработать запрос и отослать ответ посетителю. Будем здесь рассматривать только динамические запросы, существенно более сложные, чем отдача картинок.

Во всех серверах используется асинхронный подход.

Он заключается в том, что обработка запроса спихивается куда-нибудь «налево» — отдается на выполнение вспомогательному процессу/потоку, а сервер продолжает работать и принимать-отдавать на выполнение все новые соединения.

В зависимости от реализации — процесс-помощник («worker») может пересылать результат обратно серверу целиком (для последующей отдачи клиенту), может передавать серверу только дескриптор результата (без копирования), или может отдавать результат клиенту сам.

Основные стратегии работы с worker’ами

Работа с воркерами состоит из нескольких элементов, которые можно по-разному комбинировать и получать разный результат.

Тип worker’а

Основных типов два — это процесс и поток. Для улучшения производительности иногда используют оба типа одновременно, порождая несколько процессов и кучу потоков в каждом.

Процесс

Различные worker’ы могут быть процессами. В этом случае они не взаимодействуют между собой, и данные различных worker’а полностью независимы друг от друга.

Поток

Потоки, в отличие от процессов, имеют общие, разделяемые структуры данных. В коде worker’а должна быть реализована синхронизация доступа, чтобы одновременная запись одной и той же структуры не привела к хаосу.

Адресное пространство

Каждый процесс, в том числе и сервер, обладает своим адресным пространством, которое использует для разделения данных.

Внутри сервера

При работе внутри сервера — worker имеет доступ к данным сервера. Он может поменять любые структуры и делать разные гадости, особенно если написан с ошибками.

Плюсом является отсутствие пересылки данных из одного адресного пространства в другое.

Снаружи сервера

Worker может быть запущен вообще независимо от сервера и принимать данные на обработку по специальному протоколу (например FastCGI).

Конечно, этот вариант — самый безопасный для сервера. Но требует дополнительной работы по пересылке запроса — результата между сервером и worker’ом.

Рождение worker’ов

Чтобы обрабатывать много соединений одновременно — нужно иметь достаточное количество рабочих.

Основных стратегий — две.

Статика

Количество рабочих может быть жестко фиксированно. Например, 20 рабочих процессов всего. Если же все рабочие заняты и приходит 21й запрос — сервер выдает код Temporary Unavailable — «временно недоступен».

Динамика

Для более гибкого управления ресурсами — рабочие могут порождаться динамически, в зависимости от загрузки. Алгоритм порождения рабочих может быть параметризован, например (Apache pre-fork), так:

  • Минимальное количество свободных рабочих = 5
  • Максимальное количество свободных рабочих = 20
  • Всего рабочих не более = 30
  • Начальное количество рабочих = 10

Чистка между запросами

Рабочие могут либо заново инициализовать себя между запросами, либо — просто обрабатывать запросы один за другим.

Чистый

Перед каждым запросом очищается от того, что было раньше, чистит внутренние переменные и пр.

В результате нет проблем и ошибок, связанных с использованием переменных, оставшихся от старого запроса.

Персистентный

Никакой очистки состояния. В результате — экономия ресурсов.

Разбор типичных конфигураций

Посмотрим, как эти комбинации работают на примере различных серверов.

Apache (pre-fork MPM) + mod_php

  • Процесс
  • Внутри сервера
  • Динамика
  • Чистый

Apache (worker MPM) + mod_php

Для обработки динамических запросов используется модуль php, работающий в контексте сервера.

При этом, так как php работает в адресном пространстве сервера, разделяемые потоками данные периодически портятся, поэтому связка нестабильна и не рекомендована. Это происходит из-за ошибок в mod_php, который включает в себя ядро PHP и различные php-модули.

Ошибка в модуле, благодаря одному адресному пространству, может повалить весь сервер.

  • Поток
  • Внутри сервера
  • Динамика
  • Чистый

Apache (event mpm) + mod_php

Event MPM — это стратегия работы с worker’ами, которую использует только Apache. Все — точно так же, как с обычными потоками, но с небольшим дополнением для обработки Keep-Alive

Установка Keep-Alive служит для того, чтобы клиент мог прислать много запросов в одном соединении. Например, получить веб-страницу и 20 картинок. Обычно, worker заканчивает обработку запроса — и ждет какое-то время (keep-alive time), не последуют ли в этом соединении дополнительные запросы. То есть, просто висит в памяти.

Event MPM создает дополнительный поток, который берет на себя ожидание всех Keep-Alive запросов, освобождая рабочего для других полезных дел. В результате, общее количество worker’ов значительно сокращается, т.к никто теперь не ждет клиентов, а все работают.

  • Поток
  • Внутри сервера
  • Динамика
  • Чистый

Apache + mod_perl

Особенность связки Apache с mod_perl — в возможности вызывать Perl-процедуры по ходу обработки запроса апачем.

Благодаря тому, что mod_perl работает в одном адресном пространстве с сервером — он может регистрировать свои процедуры через Apache hooks, на разных стадиях работы сервера.

Например, можно работать на той же стадии, что и mod_rewrite, переписывая урл в хуке PerlTransHandler.

Следующий пример описывает rewrite с /example на /passed, но на перле.

# в конфиге апача при включенном mod_perl PerlModule MyPackage::Example PerlTransHandler MyPackage::Example
# в файле MyPackage/Example.pm package MyPackage::Example use Apache::Constants qw(DECLINED); use strict; sub handler < my $r = shift; $r->uri('/passed') if $r->uri == '/example' return DECLINED; > 1;

К сожалению, mod_perl — весьма тяжелый сам по себе, поэтому использование его лишь реврайтов — весьма накладно.

В отличие от mod_php, перловый модуль персистентен, т.е не инициализует себя заново каждый раз. Это удобно, т.к освобождает от необходимости загружать заново большую пачку модулей перед каждым запросом.

  • Процесс/поток — зависит от MPM
  • Внутри сервера
  • Динамика
  • Персистентный

Twisted

Этот асинхронный сервер написан на Python. Его особенность — в том, что программист веб-приложения сам создает дополнительных рабочих и дает им задания.

# пример кода на сервере twisted # долгая функция, обработка запроса def do_something_big(data): . # в процессе обработки запроса d = deferToThread(do_something_big, "параметры") # привязать каллбеки на результат do_something_big d.addCallback(handleOK) # .. и на ошибку при выполнении do_something_big d.addErrback(handleError)

Здесь программист, получив запрос, использует вызов deferToThread для создания отдельного потока, которому поручено выполнить функцию do_something_big. При успешном окончании do_something_big, будет выполнена функция handleOK, при ошибке — handleError.

А текущий поток в это время продолжит обычную обработку соединений.

Все происходит в едином адресном пространстве, поэтому все рабочие могут разделять, например, один и тот же массив с пользователями. Поэтому на Twisted легко писать многопользовательские приложения типа чата.

  • Поток
  • Внутри сервера
  • Динамика
  • Персистентный

Tomcat, Servlets

Сервлеты — классический пример поточных веб-приложений. Единый Java-код приложения запускается во множестве потоков. Синхронизация обязательна и должна выполняться программистом.

  • Поток
  • Внутри сервера
  • Динамика
  • Персистентный

FastCGI

FastCGI — интерфейс общения web-сервера с внешними worker’ами, которые обычно запущены как процессы. Сервер в специальном (не HTTP) формате передает переменные окружения, заголовки и тело запроса, а worker — возвращает ответ.

Есть два способа порождения таких worker’ов.

  1. Интегрированный с сервером
  2. Отдельный от сервера

В первом случае сервер сам создает внешние рабочие процессы и управляет их числом.

Во втором случае — для порождения рабочих процессов используется отдельный «spawner», второй, дополнительный сервер, который умеет общаться только по FastCGI-протоколу и управлять рабочими. Обычно spawner порождает рабочих в виде процессов, а не потоков. Динамика/Статика — определяется настройками spawner’а, а Чистый/Персистентный — характеристиками рабочего процесса.

Пути работы с FastCGI

С FastCGI можно работать двумя путями. Первый способ — самый простой, его использует Apache.

получить запрос -> отдать на обработку в FastCGI -> подождать ответа -> отдать ответ клиенту.

Второй способ используют сервера типа lighttpd/nginx/litespeed/и т.п.

получить запрос -> отдать на обработку в FastCGI -> обработать других клиентов -> отдать ответ клиенту, когда придет.

Отмеченное отличие позволяет Lighttpd + fastcgi работать эффективнее, чем это делает Apache, т.к пока процесс Apache ждет — Lighttpd успевает обслужить другие соединения.

Режимы работы FastCGI
  • Responder — обычный режим, когда FastCGI принимает запрос и переменные, и возвращает ответ
  • Authorizer — режим, когда FastCGI в качестве ответа разрешает или запрещает доступ. Удобно для контроля за закрытыми статическими файлами

Оба режима поддерживаются не во всех серверах. Например, в сервере Lighttpd — поддерживаются оба.

FastCGI PHP vs PERL

PHP-интерпретатор каждый раз очищает себя перед обработкой скрипта, а Perl — просто обрабатывает запросы один за другим в цикле вида:

подключить модули; while (пришел запрос)

Поэтому Perl-FastCGI гораздо эффективнее там, где большУю часть времени выполнения занимают include вспомогательных модулей.

Резюме

В статье рассмотрена общая структура обработки запросов и виды worker’ов. Кроме того, заглянули в Apache Event MPM и способы работы с FastCGI, посмотрели сервлеты и Twisted.

Потоки или события

Существует два способа обрабатывать параллельные запросы к серверу. Потоковые (threaded, синхронные) серверы используют множество одновременно выполняющихся потоков, каждый из которых обрабатывает один запрос. В это время событийные (evented, асинхронные) серверы выполняют единственной цикл событий, который обрабатывает все запросы.

Чтобы выбрать один из двух подходов, нужно определить профиль нагрузки на сервер.

Предположим, что каждый запрос требует c миллиекунд CPU и w миллисекунд реального времени для обработки. Время CPU расходуется на активные вычисления, а реальное время включает все запросы к внешним ресурсам. Например, запрос требует 5 мс времени c от CPU и 95 мс на ожидание ответа от базы данных. В итоге получается 100мс. Давайте также предположим, что потоковая версия сервера может поддерживать до t потоков, прежде чем начнутся проблемы с планированием и переключением контекста.

Если каждый запрос требует только время CPU для обработки, сервер в состоянии ответить на самое большее 1000/c запросов в секунду. Например, если каждый запрос занимает 2 миллисекунды времени CPU, тогда получится 1000/2=500 запросов в секунду.

В общем случае многопоточный сервер в состоянии обработать t*1000/w запросов в секунду.

Пропускная способность потокового сервера состовляет минимум от этих выражений (1000/c и t*1000/w). Событийний сервер ограничен лишь производительностью CPU (1000/c), поскольку использует всего один поток. Все описанное выше можно выразить следующим образом:

 def max_request_rate(t, c, w): cpu_bound = 1000. / c thread_bound = t * 1000. / w print 'threaded: %d\nevented: %d' % (min(cpu_bound, thread_bound), cpu_bound) 

Теперь рассмотрим различные типы серверов и посмотрим, как они себя покажут в различной реализации.

Для примеров я буду использовать t = 100.

Начнем с классического примера: HTTP прокси сервер. Этот тип серверов почти не требует времени CPU, поэтому предположим, что c = 0.1 мс. Предположим, что стоящие следом сервера получают задержку, скажем w = 50 мс. Тогда мы получим:

 >>> max_request_rate(100, 0.1, 50) threaded: 2000 evented: 10000 

Наши расчеты показывают, что потоковый сервер будет в состоянии обработать 2 000 запросов в секунду, а событийный 10 000. Большая производительность событийного сервера говорит нам, что для потокового сервера узким местом стало количество потоков.

Другой пример — сервер веб-приложений. Сперва рассмотрим случай приложения, которое не требует никаких внешних ресурсов, но производит определенные вычисления. Время обработки будет, допустим 5 мс. Т.к. никакие блокирующие вызовы не совершаются, время w составит также 5 мс. Тогда:

 >>> max_request_rate(100, 5, 5) threaded: 200 evented: 200 

В данном случае узким местом стала производительность CPU.

А теперь представим, что приложению нужно запросить данные из внешнего ресурса и время CPU составит 0.5 мс, а общее время w = 100 мс.

 >>> max_request_rate(100, 0.5, 100) threaded: 1000 evented: 2000 

Как видно из простых расчетов, да и вообще из математики, синхронная потоковая реализация не будет иметь большую производительность, чем асинхронная событийная.

Однако стоит учитывать, что конечного пользователя интересует в первую очередь реальное время, которое он тратит на ожидание ответа. Проблемы начинаются, когда приложению приходится делать что-то долгое. Например, совершить большой и сложный запрос в базу данных, провести сложный расчет, обработать графику, произвести запрос на чужой сервер. Примеров можно привести много.

Как только однопоточной программе, обслуживающей в цикле десять тысяч клиентов, приходится ради одного из них задержаться, например, на секунду, эту секунду ждут все остальные. Если таких запросов много — ни один клиент не получит ответ раньше, чем общее время обработки запросов. Кооперативная многозадачность в действии. В случае потоковых решений, действует вытесняющая многозадачность. Система не позволит «тяжелому» запросу тратить время всех.

С одной стороны мы имеем прозводительные асинхронные решения, узким местом которых является сложность написания программ, поскольку большинство существующих библиотек написаны в блокирующем стиле. Программист берет на себя отвестственность за то, что ошибка в одном запросе не скажется на остальных.

С другой стороны мы имеем блокирующие решения, для которых мы привыкли писать программы. Проблему количества поддерживаемых потоков можно решить с помощью специализированных средств, например гринлетов или Erlang-процессов. Если потоковые сервера достингут планки, при которой узким местом перестанет быть количество потоков, они будут выглядеть более привлекательно за счет времени отклика и надежности.

Язык программирования Rust

Превращение однопоточного сервера в многопоточный сервер

В текущей реализации сервер обрабатывает каждый запрос по очереди, то есть, он не начнёт обрабатывать второе соединение, пока не завершит обработку первого. При росте числа запросов к серверу, такое последовательное выполнение было бы все менее и менее оптимальным. Если сервер получает какой-то запрос, обработка которого занимает достаточно много времени, последующим запросам придётся ждать завершения обработки длительного запроса, даже если эти новые запросы сами по себе могут быть обработаны быстро. Нам нужно это исправить, но сначала рассмотрим проблему в действии.

Имитация медленного запроса в текущей реализации сервера

Мы посмотрим, как запрос с медленной обработкой может повлиять на другие запросы, сделанные к серверу в текущей реализации. В листинге 20-10 реализована обработка запроса к ресурсу /sleep с эмуляцией медленного ответа, при которой сервер будет ждать 5 секунд перед тем, как ответить.

use std::< fs, io::, net::, thread, time::Duration, >; // --snip-- fn main()  let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming()  let stream = stream.unwrap(); handle_connection(stream); > > fn handle_connection(mut stream: TcpStream) < // --snip-- let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] < "GET / HTTP/1.1" =>("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => < thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") >_ => ("HTTP/1.1 404 NOT FOUND", "404.html"), >; // --snip-- let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("\r\nContent-Length: \r\n\r\n"); stream.write_all(response.as_bytes()).unwrap(); > 

Листинг 20-10: Имитация медленного запроса с помощью 5-секундной задержки

Мы переключились с if на match , так как теперь у нас есть три случая. Нам придётся явно сопоставить срез от request_line для проверки совпадения шаблона со строковыми литералами; match не делает автоматические ссылки и разыменования, как это делает метод равенства.

Первая ветка совпадает с блоком if из листинга 20-9. Вторая ветка соответствует запросу /sleep . Когда этот запрос получен, сервер заснёт на 5 секунд, прежде чем отдать успешную HTML-страницу. Третья ветка совпадает с блоком else из листинга 20-9.

Можно увидеть, насколько примитивен наш сервер: в реальных библиотеках распознавание разных запросов осуществлялось бы гораздо менее многословно!

Запустите сервер командой cargo run . Затем откройте два окна браузера: одно с адресом http://127.0.0.1:7878/, другое с http://127.0.0.1:7878/sleep. Если вы несколько раз обратитесь к URI /, то как и раньше увидите, что сервер быстро ответит. Но если вы введёте URI /sleep, а затем загрузите URI /, то увидите что / ждёт, пока /sleep не отработает полные 5 секунд перед загрузкой страницы.

Есть несколько способов, которые можно использовать, чтобы избавиться от подтормаживания запросов после одного медленного запроса; способ, который мы реализуем, называется пулом потоков.

Улучшение пропускной способности с помощью пула потоков

Пул потоков является группой заранее порождённых потоков, ожидающих в пуле и готовых выполнить задачу. Когда программа получает новую задачу, она назначает эту задачу одному из потоков в пуле, и тогда задача будет обработана этим потоком. Остальные потоки в пуле доступны для обработки любых других задач, поступающих в то время, пока первый поток занят. Когда первый поток завершает обработку своей задачи, он возвращается в пул свободных потоков, готовых приступить к новой задаче. Пул потоков позволяет обрабатывать соединения параллельно, увеличивая пропускную способность вашего сервера.

Мы ограничим число потоков в пуле небольшим числом, чтобы защитить нас от атак типа «отказ в обслуживании» (DoS - Denial of Service); если бы наша программа создавала новый поток в момент поступления каждого запроса, то кто-то сделавший 10 миллионов запросов к серверу, мог бы создать хаос, использовать все ресурсы нашего сервера и остановить обработку запросов.

Вместо порождения неограниченного количества потоков, у нас будет фиксированное количество потоков, ожидающих в пуле. Поступающие запросы будут отправляться в пул для обработки. Пул будет иметь очередь входящих запросов. Каждый из потоков в пуле будет извлекать запрос из этой очереди, обрабатывать запрос и затем запрашивать в очереди следующий запрос. При таком дизайне мы можем обрабатывать N запросов одновременно, где N - количество потоков. Если каждый поток отвечает на длительный запрос, последующие запросы могут по-прежнему задержаться в очереди, но теперь мы увеличили количество "длинных" запросов, которые мы можем обработать, перед тем, как эта ситуация снова возникнет.

Этот подход - лишь один из многих способов улучшить пропускную способность веб-сервера. Другими вариантами, на которые возможно стоило бы обратить внимание, являются: модель fork/join, модель однопоточного асинхронного ввода-вывода или модель многопоточного асинхронного ввода-вывода. Если вам интересна эта тема, вы можете почитать больше информации о других решениях и попробовать реализовать их самостоятельно. С таким низкоуровневым языком как Rust, любой из этих вариантов осуществим.

Прежде чем приступить к реализации пула потоков, давайте поговорим о том, как должно выглядеть использование пула. Когда вы пытаетесь проектировать код, сначала необходимо написать клиентский интерфейс. Напишите API кода, чтобы он был структурирован так, как вы хотите его вызывать, затем реализуйте функциональность данной структуры, вместо подхода реализовывать функционал, а затем разрабатывать общедоступный API.

Подобно тому, как мы использовали разработку через тестирование (test-driven) в проекте главы 12, мы будем использовать здесь разработку, управляемую компилятором (compiler-driven). Мы напишем код, вызывающий нужные нам функции, а затем посмотрим на ошибки компилятора, чтобы определить, что мы должны изменить дальше, чтобы заставить код работать. Однако перед этим, в качестве отправной точки, мы рассмотрим технику, которую мы не будем применять в дальнейшем.

Порождение потока для каждого запроса

Сначала давайте рассмотрим, как мог бы выглядеть код, если бы он создавал бы новый поток для каждого соединения. Как упоминалось ранее, мы не планируем использовать этот способ в окончательной реализации, из-за возможных проблем при потенциально неограниченном числе порождённых потоков. Это лишь отправная точка, с которой начнёт работу наш многопоточный сервер. Затем мы улучшим код, добавив пул потоков, и тогда разницу между этими двумя решениями будет легче заметить. В листинге 20-11 показаны изменения, которые нужно внести в код main , чтобы порождать новый поток для обработки каждого входящего соединения внутри цикла for .

use std:: fs, io::, net::, thread, time::Duration, >; fn main() < let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() < let stream = stream.unwrap(); thread::spawn(|| < handle_connection(stream); >); > > fn handle_connection(mut stream: TcpStream)  let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..]  "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" =>  thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") > _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), >; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("\r\nContent-Length: \r\n\r\n"); stream.write_all(response.as_bytes()).unwrap(); > 

Листинг 20-11: Порождение нового потока для каждого соединения

Как вы изучили в главе 16, функция thread::spawn создаст новый поток и затем запустит код замыкания в этом новом потоке. Если вы запустите этот код и загрузите /sleep в своём браузере, а затем загрузите / в двух других вкладках браузера, вы действительно увидите, что запросам к / не приходится ждать завершения /sleep. Но, как мы уже упоминали, это в какой-то момент приведёт к сильному снижению производительности системы, так как вы будете создавать новые потоки без каких-либо ограничений.

Создание конечного числа потоков

Мы хотим, чтобы наш пул потоков работал аналогичным, знакомым образом, чтобы переключение с потоков на пул потоков не требовало больших изменений в коде использующем наш API. В листинге 20-12 показан гипотетический интерфейс для структуры ThreadPool , который мы хотим использовать вместо thread::spawn .

use std:: fs, io::, net::, thread, time::Duration, >; fn main() < let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming() < let stream = stream.unwrap(); pool.execute(|| < handle_connection(stream); >); > > fn handle_connection(mut stream: TcpStream)  let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..]  "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" =>  thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") > _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), >; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("\r\nContent-Length: \r\n\r\n"); stream.write_all(response.as_bytes()).unwrap(); > 

Листинг 20-12: Наш идеальный интерфейс ThreadPool

Мы используем ThreadPool::new , чтобы создать новый пул потоков с конфигурируемым числом потоков, в данном случае четырьмя. Затем в цикле for функция pool.execute имеет интерфейс, похожий на thread::spawn , в том смысле, что он так же принимает замыкание, код которого пул должен выполнить для каждого соединения. Нам нужно реализовать pool.execute , чтобы он принимал замыкание и передавал его потоку из пула для выполнения. Этот код пока не скомпилируется, но мы постараемся, чтобы компилятор помог нам это исправить.

Создание ThreadPool с помощью разработки, управляемой компилятором

Внесите изменения листинга 20-12 в файл src/main.rs, а затем давайте воспользуемся ошибками компилятора из команды cargo check для управления нашей разработкой. Вот первая ошибка, которую мы получаем:

$ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0433]: failed to resolve: use of undeclared type `ThreadPool` --> src/main.rs:11:16 | 11 | let pool = ThreadPool::new(4); | ^^^^^^^^^^ use of undeclared type `ThreadPool` For more information about this error, try `rustc --explain E0433`. error: could not compile `hello` due to previous error 

Замечательно! Ошибка говорит о том, что нам нужен тип или модуль ThreadPool , поэтому мы сейчас его создадим. Наша реализация ThreadPool не будет зависеть от того, что делает наш веб-сервер. Итак, давайте переделаем крейт hello из бинарного в библиотечный, чтобы хранить там нашу реализацию ThreadPool . После того, как мы переключимся в библиотечный крейт, мы также сможем использовать отдельную библиотеку пула потоков для любой подходящей работы, а не только для обслуживания веб-запросов.

Создайте файл src/lib.rs, который содержит следующий код, который является простейшим определением структуры ThreadPool , которое мы можем иметь на данный момент:

pub struct ThreadPool; 

Затем отредактируйте файл main.rs, чтобы внести ThreadPool из библиотечного крейта в текущую область видимости, добавив следующий код в начало src/main.rs:

use hello::ThreadPool; use std:: fs, io::, net::, thread, time::Duration, >; fn main()  let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming()  let stream = stream.unwrap(); pool.execute(||  handle_connection(stream); >); > > fn handle_connection(mut stream: TcpStream)  let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..]  "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" =>  thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") > _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), >; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("\r\nContent-Length: \r\n\r\n"); stream.write_all(response.as_bytes()).unwrap(); > 

Этот код по-прежнему не будет работать, но давайте проверим его ещё раз, чтобы получить следующую ошибку, которую нам нужно устранить:

$ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope --> src/main.rs:12:28 | 12 | let pool = ThreadPool::new(4); | ^^^ function or associated item not found in `ThreadPool` For more information about this error, try `rustc --explain E0599`. error: could not compile `hello` due to previous error 

Эта ошибка указывает, что далее нам нужно создать ассоциированную функцию с именем new для ThreadPool . Мы также знаем, что new должна иметь один параметр, который может принимать 4 в качестве аргумента и должен возвращать экземпляр ThreadPool . Давайте реализуем простейшую функцию new , которая будет иметь эти характеристики:

pub struct ThreadPool; impl ThreadPool < pub fn new(size: usize) ->ThreadPool < ThreadPool >> 

Мы выбираем usize в качестве типа параметра size , потому что мы знаем, что отрицательное число потоков не имеет никакого смысла. Мы также знаем, что мы будем использовать число 4 в качестве количества элементов в коллекции потоков, для чего предназначен тип usize , как обсуждалось в разделе "Целочисленные типы" главы 3.

Давайте проверим код ещё раз:

$ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope --> src/main.rs:17:14 | 17 | pool.execute(|| < | ^^^^^^^ method not found in `ThreadPool` For more information about this error, try `rustc --explain E0599`. error: could not compile `hello` due to previous error 

Теперь мы ошибка возникает из-за того, что у нас нет метода execute в структуре ThreadPool . Вспомните раздел "Создание конечного числа потоков" , в котором мы решили, что наш пул потоков должен иметь интерфейс, похожий на thread::spawn . Кроме того, мы реализуем функцию execute , чтобы она принимала замыкание и передавала его свободному потоку из пула для запуска.

Мы определим метод execute у ThreadPool , принимающий замыкание в качестве параметра. Вспомните из раздела "Перемещение захваченных значений из замыканий и трейты Fn " главы 13 информацию о том, что мы можем принимать замыкания в качестве параметров тремя различными типажами: Fn , FnMut и FnOnce . Нам нужно решить, какой тип замыкания использовать здесь. Мы знаем, что в конечном счёте мы сделаем что-то похожее на реализацию стандартной библиотеки thread::spawn , поэтому мы можем посмотреть, какие ограничения накладывает на свой параметр сигнатура функции thread::spawn . Документация показывает следующее:

pub fn spawn(f: F) -> JoinHandle where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static, 

Параметр типа F - это как раз то, что нас интересует; параметр типа T относится к возвращаемому значению и нам он не интересен. Можно увидеть, что spawn использует FnOnce в качестве ограничения типажа у F . Возможно это как раз то, чего мы хотим, так как в конечном итоге мы передадим полученный в execute аргумент в функцию spawn . Дополнительную уверенность в том, что FnOnce - это именно тот типаж, который мы хотим использовать, нам даёт факт, что поток для выполнения запроса будет выполнять замыкание этого запроса только один раз, что соответствует части Once ("единожды") в названии типажа FnOnce .

Параметр типа F также имеет ограничение типажа Send и ограничение времени жизни 'static , которые полезны в нашей ситуации: нам нужен Send для передачи замыкания из одного потока в другой и 'static , потому что мы не знаем, сколько времени поток будет выполняться. Давайте создадим метод execute для ThreadPool , который будет принимать обобщённый параметр типа F со следующими ограничениями:

pub struct ThreadPool; impl ThreadPool < // --snip-- pub fn new(size: usize) -> ThreadPool  ThreadPool > pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static, < >> 

Мы по-прежнему используем () после FnOnce потому что типаж FnOnce представляет замыкание, которое не принимает параметров и возвращает единичный тип () . Также как и при определении функций, тип возвращаемого значения в сигнатуре может быть опущен, но даже если у нас нет параметров, нам все равно нужны скобки.

Опять же, это самая простая реализация метода execute : она ничего не делает, мы просто пытаемся сделать код компилируемым. Давайте проверим снова:

$ cargo check Checking hello v0.1.0 (file:///projects/hello) Finished dev [unoptimized + debuginfo] target(s) in 0.24s 

Сейчас мы получаем только предупреждения, что означает, что код компилируется! Но обратите внимание, если вы попробуете cargo run и сделаете запрос в браузере, вы увидите ошибки в браузере, которые мы видели в начале главы. Наша библиотека на самом деле ещё не вызывает замыкание, переданное в execute !

Примечание: вы возможно слышали высказывание о языках со строгими компиляторами, таких как Haskell и Rust, которое звучит так: «Если код компилируется, то он работает». Но это высказывание не всегда верно. Наш проект компилируется, но абсолютно ничего не делает! Если бы мы создавали реальный, законченный проект, это был бы хороший момент начать писать модульные тесты, чтобы проверять, что код компилируется и имеет желаемое поведение.

Проверка количества потоков в new

Мы ничего не делаем с параметрами new и execute . Давайте реализуем тела этих функций с нужным нам поведением. Для начала давайте подумаем о new . Ранее мы выбрали беззнаковый тип для параметра size , потому что пул с отрицательным числом потоков не имеет смысла. Пул с нулём потоков также не имеет смысла, однако ноль - это вполне допустимое значение usize . Мы добавим код для проверки того, что size больше нуля, прежде чем вернуть экземпляр ThreadPool , и заставим программу паниковать, если она получит ноль, используя макрос assert! , как показано в листинге 20-13.

pub struct ThreadPool; impl ThreadPool < /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) ->ThreadPool < assert!(size >0); ThreadPool > // --snip-- pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static,  > > 

Листинг 20-13: Реализация ThreadPool::new с аварийным завершениям работы, если size равен нулю

Мы добавили немного документации для нашей структуры ThreadPool с помощью комментариев. Обратите внимание, что мы следовали хорошим практикам документирования, добавив раздел, в котором указывается ситуация, при которой функция может аварийно завершаться, как это обсуждалось в главе 14. Попробуйте запустить cargo doc --open и кликнуть на структуру ThreadPool , чтобы увидеть как выглядит сгенерированная документация для new !

Вместо добавления макроса assert! , как мы здесь сделали, мы могли бы преобразовать функцию new в функцию build таким образом, чтобы она возвращала Result , аналогично тому, как мы делали в функции Config::new проекта ввода/вывода в листинге 12-9. Но в данном случае мы решили, что попытка создания пула потоков без указания хотя бы одного потока должна быть непоправимой ошибкой. Если вы чувствуете такое стремление, попробуйте написать функцию build с сигнатурой ниже, для сравнения с функцией new :

pub fn build(size: usize) -> Result  
Создание места для хранения потоков

Теперь, имея возможность удостовериться, что количество потоков для хранения в пуле соответствует требованиям, мы можем создавать эти потоки и сохранять их в структуре ThreadPool перед тем как возвратить её. Но как мы "сохраним" поток? Давайте ещё раз посмотрим на сигнатуру thread::spawn :

pub fn spawn(f: F) -> JoinHandle where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static, 

Функция spawn возвращает тип JoinHandle , где T является типом, который возвращает замыкание. Давайте попробуем использовать JoinHandle и посмотрим, что произойдёт. В нашем случае замыкания, которые мы передаём пулу потоков, будут обрабатывать соединение и не будут возвращать ничего, поэтому T будет единичным (unit) типом () .

Код в листинге 20-14 скомпилируется, но пока не создаст ни одного потока. Мы изменили определение ThreadPool так, чтобы он содержал вектор экземпляров thread::JoinHandle , инициализировали вектор ёмкостью size , установили цикл for , который будет выполнять некоторый код для создания потоков, и вернули экземпляр ThreadPool , содержащий их.

use std::thread; pub struct ThreadPool < threads: Vec>, > impl ThreadPool < // --snip-- /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool < assert!(size >0); let mut threads = Vec::with_capacity(size); for _ in 0..size < // create some threads and store them in the vector >ThreadPool < threads >> // --snip-- pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static,  > > 

Листинг 20-14: Создание вектора в ThreadPool для хранения потоков

Мы включили std::thread в область видимости библиотечного крейта, потому что мы используем thread::JoinHandle в качестве типа элементов вектора в ThreadPool .

После получения корректного значения size, наш ThreadPool создаёт новый вектор, который может содержать size элементов. Функция with_capacity выполняет ту же задачу, что и Vec::new , но с важным отличием: она заранее выделяет необходимый объём памяти в векторе. Поскольку мы знаем, что нам нужно хранить size элементов в векторе, предварительное выделение памяти для этих элементов будет немного более эффективным, чем использование Vec::new , при котором размер вектора будет увеличиваться по мере вставки элементов.

Если вы снова запустите команду cargo check , она должна завершиться успешно.

Структура Worker , ответственная за отправку кода из ThreadPool в поток

Мы специально оставили комментарий в цикле for в Листинге 20-14 по поводу создания потоков. Сейчас мы разберёмся, как на самом деле создаются потоки. Стандартная библиотека предоставляет thread::spawn для создания потоков, причём thread::spawn ожидает получить некоторый код, который поток должен выполнить, как только он будет создан. Однако в нашем случае мы хотим создавать потоки и заставлять их ожидать код, который мы будем передавать им позже. Реализация потоков в стандартной библиотеке не предоставляет никакого способа сделать это, мы должны реализовать это вручную.

Мы будем реализовывать это поведение, добавив новую структуру данных между ThreadPool и потоками, которая будет управлять этим новым поведением. Мы назовём эту структуру Worker ("работник"), это общепринятое имя в реализации пулов. Работник берёт код, который нужно выполнить, и запускает этот код внутри рабочего потока. Представьте людей, работающих на кухне ресторана: работники ожидают, пока не поступят заказы от клиентов, а затем они несут ответственность за принятие этих заказов и их выполнение.

Вместо того чтобы хранить вектор экземпляров JoinHandle в пуле потоков, мы будем хранить экземпляры структуры Worker . Каждый Worker будет хранить один экземпляр JoinHandle . Затем мы реализуем метод у Worker , который будет принимать замыкание и отправлять его в существующий поток для выполнения. Для того чтобы мы могли различать работники в пуле при логировании или отладке, мы также присвоим каждому работнику id .

Вот как выглядит новая последовательность действий, которые будут происходить при создании ThreadPool . Мы реализуем код, который будет отправлять замыкание в поток, после того, как у нас будет Worker , заданный следующим образом:

  1. Определим структуру Worker , которая содержит id и JoinHandle .
  2. Изменим ThreadPool , чтобы он содержал вектор экземпляров Worker .
  3. Определим функцию Worker::new , которая принимает номер id и возвращает экземпляр Worker , который содержит id и поток, порождённый с пустым замыканием.
  4. В ThreadPool::new используем счётчик цикла for для генерации id , создаём новый Worker с этим id и сохраняем экземпляр "работника" в вектор.

Если вы готовы принять вызов, попробуйте реализовать эти изменения самостоятельно, не глядя на код в листинге 20-15.

Готовы? Вот листинг 20-15 с одним из способов сделать указанные ранее изменения.

use std::thread; pub struct ThreadPool < workers: Vec, > impl ThreadPool < // --snip-- /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool < assert!(size >0); let mut workers = Vec::with_capacity(size); for id in 0..size < workers.push(Worker::new(id)); >ThreadPool < workers >> // --snip-- pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static,  > > struct Worker < id: usize, thread: thread::JoinHandle, > impl Worker < fn new(id: usize) ->Worker < let thread = thread::spawn(|| <>); Worker < id, thread >> > 

Листинг 20-15: Изменение ThreadPool для хранения экземпляров Worker вместо непосредственного хранения потоков

Мы изменили название поля в ThreadPool с threads на workers , поскольку теперь оно содержит экземпляры Worker вместо экземпляров JoinHandle . Мы используем счётчик в цикле for для передачи цифрового идентификатора в качестве аргумента Worker::new , и сохраняем каждый новый Worker в векторе с именем workers .

Внешний код (вроде нашего сервера в src/bin/main.rs) не обязательно должен знать подробности реализации, касающиеся использования структуры Worker внутри ThreadPool , поэтому мы делаем структуру Worker и её функцию new приватными. Функция Worker::new использует заданный нами id и сохраняет экземпляр JoinHandle , который создаётся при порождении нового потока с пустым замыканием.

Примечание: Если операционная система не может создать поток из-за нехватки системных ресурсов, thread::spawn аварийно завершится. Это приведёт к аварийному завершению нашего сервера целиком, даже если некоторые потоки были созданы успешно. Для простоты будем считать, что нас устраивает такое поведение, но в реальной реализации пула потоков вы, вероятно, захотите использовать std::thread::Builder и его метод spawn , который вместо этого возвращает Result .

Этот код скомпилируется и будет хранить количество экземпляров Worker , которое мы указали в качестве аргумента функции ThreadPool::new . Но мы всё ещё не обрабатываем замыкание, которое мы получаем в методе execute . Давайте посмотрим, как это сделать далее.

Отправка запросов в потоки через каналы

Следующая проблема, с которой мы будем бороться, заключается в том, что замыкания, переданные в thread::spawn абсолютно ничего не делают. Сейчас мы получаем замыкание, которое хотим выполнить, в методе execute . Но мы должны передать какое-то замыкание в метод thread::spawn , при создании каждого Worker во время создания ThreadPool .

Мы хотим, чтобы вновь созданные структуры Worker извлекали код для запуска из очереди, хранящейся в ThreadPool и отправляли этот код в свой поток для выполнения.

Каналы (channels), простой способ коммуникации между двумя потоками, с которыми мы познакомились в главе 16, кажется идеально подойдут для этого сценария. Мы будем использовать канал в качестве очереди заданий, а команда execute отправит задание из ThreadPool экземплярам Worker , которые будут отправлять задание в свой поток. План таков:

  1. ThreadPool создаст канал и будет хранить отправитель.
  2. Каждый Worker будет хранить приёмник.
  3. Мы создадим новую структуру Job , которая будет хранить замыкания, которые мы хотим отправить в канал.
  4. Метод execute отправит задание, которое он хочет выполнить, в отправляющую сторону канала.
  5. В своём потоке Worker будет циклически опрашивать принимающую сторону канала и выполнять замыкание любого задания, которое он получит.

Давайте начнём с создания канала в ThreadPool::new и удержания отправляющей стороны в экземпляре ThreadPool , как показано в листинге 20-16. В структуре Job сейчас ничего не содержится, но это будет тип элемента, который мы отправляем в канал.

use std::; pub struct ThreadPool < workers: Vec, sender: mpsc::Sender, > struct Job; impl ThreadPool < // --snip-- /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool < assert!(size >0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size < workers.push(Worker::new(id)); >ThreadPool < workers, sender >> // --snip-- pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static,  > > struct Worker  id: usize, thread: thread::JoinHandle, > impl Worker  fn new(id: usize) -> Worker  let thread = thread::spawn(|| <>); Worker > > 

Листинг 20-16: Модификация ThreadPool для хранения отправляющей части канала, который отправляет экземпляры Job

В ThreadPool::new мы создаём наш новый канал и сохраняем в пуле его отправляющую сторону. Код успешно скомпилируется.

Давайте попробуем передавать принимающую сторону канала каждому "работнику" (структуре Worker), когда пул потоков создаёт канал. Мы знаем, что хотим использовать получающую часть канала в потоке, порождаемым "работником", поэтому мы будем ссылаться на параметр receiver в замыкании. Код 20-17 пока не компилируется.

use std::; pub struct ThreadPool  workers: Vec, sender: mpsc::Sender, > struct Job; impl ThreadPool < // --snip-- /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool < assert!(size >0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size < workers.push(Worker::new(id, receiver)); >ThreadPool < workers, sender >> // --snip-- pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static,  > > // --snip-- struct Worker  id: usize, thread: thread::JoinHandle, > impl Worker < fn new(id: usize, receiver: mpsc::Receiver) -> Worker < let thread = thread::spawn(|| < receiver; >); Worker < id, thread >> > 

Листинг 20-17: Передача принимающей части канала "работникам"

Мы внесли несколько небольших и простых изменений: мы передаём принимающую часть канала в Worker::new , а затем используем его внутри замыкания.

При попытке проверить код, мы получаем ошибку:

$ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0382]: use of moved value: `receiver` --> src/lib.rs:26:42 | 21 | let (sender, receiver) = mpsc::channel(); | -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver`, which does not implement the `Copy` trait . 26 | workers.push(Worker::new(id, receiver)); | ^^^^^^^^ value moved here, in previous iteration of loop For more information about this error, try `rustc --explain E0382`. error: could not compile `hello` due to previous error 

Код пытается передать receiver нескольким экземплярам Worker . Это не сработает, поскольку, как вы можете помнить из главы 16: реализация канала, которую предоставляет Rust - несколько производителей, один потребитель. Это означает, что мы не можем просто клонировать принимающую сторону канала, чтобы исправить этот код. Кроме этого, мы не хотим отправлять одно и то же сообщение нескольким потребителям, поэтому нам нужен единый список сообщений для множества обработчиков, чтобы каждое сообщение обрабатывалось лишь один раз.

Кроме того, удаление задачи из очереди канала включает изменение receiver , поэтому потокам необходим безопасный способ делиться и изменять receiver , в противном случае мы можем получить условия гонки (как описано в главе 16).

Вспомните умные указатели, которые обсуждались в главе 16: чтобы делиться владением между несколькими потоками и разрешать потокам изменять значение, нам нужно использовать тип Arc> . Тип Arc позволит нескольким "работникам" владеть получателем (receiver), а Mutex гарантирует что только один "работник" сможет получить задание (job) от получателя за раз. Листинг 20-18 показывает изменения, которые мы должны сделать.

use std::< sync::, thread, >; // --snip-- pub struct ThreadPool  workers: Vec, sender: mpsc::Sender, > struct Job; impl ThreadPool < // --snip-- /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool < assert!(size >0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size < workers.push(Worker::new(id, Arc::clone(&receiver))); >ThreadPool < workers, sender >> // --snip-- pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static,  > > // --snip-- struct Worker  id: usize, thread: thread::JoinHandle, > impl Worker < fn new(id: usize, receiver: Arc>) -> Worker < // --snip-- let thread = thread::spawn(||  receiver; >); Worker > > 

Листинг 20-18. Совместное использование приёмника в "работниках" с применением Arc и Mutex

В ThreadPool::new мы помещаем принимающую сторону канала внутрь Arc и Mutex . Для каждого нового "работника" мы клонируем Arc , чтобы увеличить счётчик ссылок так, что "работники" могут разделять владение принимающей стороной канала.

С этими изменениями код компилируется! Мы подбираемся к цели!

Реализация метода execute

Давайте реализуем наконец метод execute у структуры ThreadPool . Мы также изменим тип Job со структуры на псевдоним типа для типаж-объекта, который будет содержать тип замыкания, принимаемый методом execute . Как описано в разделе "Создание синонимов типа с помощью псевдонимов типа" главы 19, псевдонимы типов позволяют делать длинные типы короче, облегчая их использование. Посмотрите на листинг 20-19.

use std:: sync::, thread, >; pub struct ThreadPool  workers: Vec, sender: mpsc::Sender, > // --snip-- type Job = Box; impl ThreadPool < // --snip-- /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool  assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size  workers.push(Worker::new(id, Arc::clone(&receiver))); > ThreadPool > pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static, < let job = Box::new(f); self.sender.send(job).unwrap(); >> // --snip-- struct Worker  id: usize, thread: thread::JoinHandle, > impl Worker  fn new(id: usize, receiver: Arc>) -> Worker  let thread = thread::spawn(||  receiver; >); Worker > > 

Листинг 20-19: Создание псевдонима типа Job для указателя Box , содержащего каждое замыкание и затем отправляющее задание (job) в канал

После создания нового экземпляра Job с замыканием, полученным в execute , мы посылаем его через отправляющий конец канала. На тот случай, если отправка не удастся, вызываем unwrap у send . Это может произойти, например, если мы остановим выполнение всех наших потоков, что означает, что принимающая сторона прекратила получать новые сообщения. На данный момент мы не можем остановить выполнение наших потоков: наши потоки будут функционировать до тех пор, пока существует пул. Причина, по которой мы используем unwrap , заключается в том, что, хотя мы знаем, что сбой не произойдёт, компилятор этого не знает.

Но мы ещё не закончили! В "работнике" (worker) наше замыкание, переданное в thread::spawn все ещё ссылается только на принимающую сторону канала. Вместо этого нам нужно, чтобы замыкание работало в бесконечном цикле, запрашивая задание у принимающей части канала и выполняя задание, когда оно принято. Давайте внесём изменения, показанные в листинге 20-20 внутри Worker::new .

use std:: sync::, thread, >; pub struct ThreadPool  workers: Vec, sender: mpsc::Sender, > type Job = Box; impl ThreadPool  /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool  assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size  workers.push(Worker::new(id, Arc::clone(&receiver))); > ThreadPool > pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static,  let job = Box::new(f); self.sender.send(job).unwrap(); > > struct Worker  id: usize, thread: thread::JoinHandle, > // --snip-- impl Worker < fn new(id: usize, receiver: Arc>) -> Worker < let thread = thread::spawn(move || loop < let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker got a job; executing."); job(); >); Worker < id, thread >> > 

Листинг 20-20: Получение и выполнение заданий в потоке "работника"

Здесь мы сначала вызываем lock у receiver , чтобы получить мьютекс, а затем вызываем unwrap , чтобы аварийно завершить работу при любых ошибках. Захват блокировки может завершиться неудачей, если мьютекс находится в отравленном состоянии (poisoned state), что может произойти, если какой-то другой поток завершился аварийно, удерживая блокировку, вместо снятия блокировки. В этой ситуации вызвать unwrap для аварийного завершения потока вполне оправдано. Не стесняйтесь заменить unwrap на expect с сообщением об ошибке, которое имеет для вас значение.

Если мы получили блокировку мьютекса, мы вызываем recv , чтобы получить Job из канала. Последний вызов unwrap позволяет миновать любые ошибки, которые могут возникнуть, если поток, контролирующий отправитель, прекратил функционировать, подобно тому, как метод send возвращает Err , если получатель не принимает сообщение.

Вызов recv - блокирующий, поэтому пока задач нет, текущий поток будет ждать, пока задача не появится. Mutex гарантирует, что только один поток Worker за раз попытается запросить задачу.

Наш пул потоков теперь находится в рабочем состоянии! Выполните cargo run и сделайте несколько запросов:

$ cargo run Compiling hello v0.1.0 (file:///projects/hello) warning: field is never read: `workers` --> src/lib.rs:7:5 | 7 | workers: Vec, | ^^^^^^^^^^^^^^^^^^^^ | = note: `#[warn(dead_code)]` on by default warning: field is never read: `id` --> src/lib.rs:48:5 | 48 | id: usize, | ^^^^^^^^^ warning: field is never read: `thread` --> src/lib.rs:49:5 | 49 | thread: thread::JoinHandle, | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ warning: `hello` (lib) generated 3 warnings Finished dev [unoptimized + debuginfo] target(s) in 1.40s Running `target/debug/hello` Worker 0 got a job; executing. Worker 2 got a job; executing. Worker 1 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing. Worker 2 got a job; executing. Worker 1 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing. Worker 2 got a job; executing. 

Успех! Теперь у нас есть пул потоков, который обрабатывает соединения асинхронно. Никогда не создаётся более четырёх потоков, поэтому наша система не будет перегружена, если сервер получит много запросов. Если мы отправим запрос ресурса /sleep, сервер сможет обслуживать другие запросы, обрабатывая их в другом потоке.

Примечание: если вы запросите /sleep в нескольких окнах браузера одновременно, они могут загружаться по одному, с интервалами в 5 секунд. Некоторые веб-браузеры выполняют несколько экземпляров одного и того же запроса последовательно из-за кэширования. Такое ограничение не связано с работой нашего веб-сервера.

После изучения цикла while let в главе 18 вы можете удивиться, почему мы не написали код рабочего потока (worker thread), как показано в листинге 20-22.

use std:: sync::, thread, >; pub struct ThreadPool  workers: Vec, sender: mpsc::Sender, > type Job = Box; impl ThreadPool  /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool  assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size  workers.push(Worker::new(id, Arc::clone(&receiver))); > ThreadPool > pub fn execute(&self, f: F) where F: FnOnce() + Send + 'static,  let job = Box::new(f); self.sender.send(job).unwrap(); > > struct Worker  id: usize, thread: thread::JoinHandle, > // --snip-- impl Worker < fn new(id: usize, receiver: Arc>) -> Worker < let thread = thread::spawn(move || < while let Ok(job) = receiver.lock().unwrap().recv() < println!("Worker got a job; executing."); job(); > >); Worker < id, thread >> > 

Листинг 20-22: Альтернативная реализация Worker::new с использованием while let

Этот код компилируется и запускается, но не даёт желаемого поведения: медленный запрос всё равно приведёт к тому, что другие запросы будут ждать обработки. Причина здесь несколько тоньше: структура Mutex не имеет публичного метода unlock , так как владение блокировкой основано на времени жизни MutexGuard внутри LockResult , которое возвращает метод lock . Во время компиляции анализатор заимствований может проследить за выполнением правила, согласно которому к ресурсу, охраняемому Mutex , нельзя получить доступ пока мы удерживаем блокировку. Однако в этой реализация мы также можем получить ситуацию, когда блокировка будет удерживаться дольше, чем предполагалось, если мы не будем внимательно учитывать время жизни MutexGuard .

Код в листинге 20-20, использующий let job = receiver.lock().unwrap().recv().unwrap(); работает, потому что при использовании let любые промежуточные значения, используемые в выражении справа от знака равенства, немедленно уничтожаются после завершения оператора let . Однако while let (и if let и match ) не удаляет временные значения до конца связанного блока. Таким образом, в листинге 20-21 блокировка не снимается в течение всего времени вызова job() , что означает, что другие работники не могут получать задания.

Асинхронный Flask 2.0

В мае появился async Flask 2.0, который, среди прочего, предложил асинхронные инструменты. Пусть даже они довольно ограниченные, но это уже шаг вперед (оставим пока за скобками aioflask, хотя он выглядит получше). В качестве обзора асинхронного Flask 2.0 предлагаю туториал Патрика Кеннеди (Patrick Kennedy).

Flask 2.0 вышел 11 мая 2021 г. и добавляет встроенную поддержку асинхронных маршрутов (routes), обработчиков ошибок (error handlers), функций до (before request) и после (after request) запроса, а также обратных вызовов (коллбэков) разрыва!

Рассмотрим новые асинхронные функции Flask 2.0 и способы их использования в проектах с Flask.

Содержание

Когда следует использовать async?

Обработчик асинхронных маршрутов

Тестирование асинхронных маршрутов

Еще примеры с async

Async во Flask 1.x

Flask 2.0 async

Начиная с Flask 2.0, появляется возможность создавать обработчиков асинхронных маршрутов (routes), используя async / await :

import asyncio async def async_get_data(): await asyncio.sleep(1) return 'Done!' @app.route("/data") async def get_data(): data = await async_get_data() return data

Асинхронные маршруты не сложнее в разработке, чем синхронные:

1. Просто устанавливаем Flask с async с помощью pip install "flask[async] ".

2. Затем можно добавлять ключевое слово async в функции и использовать await .

Как это все работает?

На следующей схеме представлено исполнение асинхронного кода в Flask 2.0:

Чтобы запускать асинхронный код на Python, необходим цикл с ожиданием событий (event loop) и запуском сопрограмм (coroutines). Flask 2.0 берет на себя включение асинхронного цикла с ожиданием событий (как обычно, это делается через asyncio.run() ) для запуска сопрограмм.

При обработке функции маршрута async появляется новый подпоток (sub-thread). Внутри этого подпотока будет выполняться цикл с ожиданием событий для запуска обработчика маршрута (сопрограммы).

В данной реализации используется библиотека asgiref (если точнее, функционал asynctosync), используемая в django для выполнения асинхронного кода.

Подробнее о реализации можно посмотреть в документации по async_to_sync() в исходном коде Flask.

Данная реализация хороша тем, что она позволяет запускать Flask с любым типом процесса-исполнителя (worker) (потоки, gevent, eventlet и т.д.).

До появления Flask 2.0 для выполнение асинхронного кода требовалось делать новые асинхронные циклы с ожиданием событий в каждом обработчике маршрута, что требовало запуска приложения Flask с использованием потоковых процессов-исполнителей. Подробнее далее.

Кроме того, использование асинхронных обработчиков маршрутов дает обратную совместимость. Можно как угодно сочетать обработчики асинхронных и синхронных маршрутов в приложении Flask без каких-либо компромиссов в части производительности. Это позволяет сразу перейти к созданию прототипа даже одного обработчика асинхронного маршрута в существующем проекте Flask.

Почему не нужен ASGI?

Flask изначально создавался как синхронный веб-фреймворк, реализующая протокол WSGI (Интерфейс веб-серверного шлюза).

Во Flask каждый запрос обрабатывается индивидуально в процессе-исполнителе. Асинхронная функциональность, добавленная во Flask 2.0, всегда находится в пределах одного обрабатываемого запроса:

Не забывайте, что даже невзирая на возможность исполнения асинхронного кода во Flask, его исполнение осуществляется в контексте синхронного фреймворка. Другими словами, пусть даже можно исполнять разные асинхронные задачи в одном запросе (request), каждая асинхронная задача должна завершиться до отправки ответа обратно. Поэтому не во всех ситуациях асинхронные маршруты будут по-настоящему выгодны. Есть и другие веб-фреймворки Python, в которых поддерживается ASGI (Интерфейс асинхронного серверного шлюза) с поддержкой асинхронных стеков вызовов, что позволяет исполнять маршруты параллельно:

Фреймворк Асинхронный стек запроса
(например, поддержка ASGI)
Асинхронные маршруты
Quart Да Да
Django >= 3.2 Да Да
FastAPI Да Да
Flask >= 2.0 Нет Да

Когда следует использовать async?

Асинхронное исполнение все чаще доминирует в обсуждениях и генерирует заголовки, но оно не будет лучшим подходом в каждой ситуации.

Оно идеально подходит для операций, ориентированных на ввод/вывод, когда выполняются оба следующих условия:

1. Есть ряд операций

2. Полное исполнение каждой операции занимает менее нескольких секунд

1. Запросы по HTTP или API

2. Взаимодействие с базой данных

3. Работа с файловой системой

Оно не подходит для фоновых и долгосрочных задач, а также для операций, связанных с работой процессора, например:

1. Запуск моделей машинного обучения

2. Обработка изображений или PDF

3. Выполнение резервного копирования

Такие задачи лучше реализовывать с помощью очереди задач (task queue), например Celery, для управления отдельными длительными задачами.

Асинхронные HTTP-вызовы

Асинхронный подход действительно приносит дивиденды, когда нужно сделать ряд HTTP-запросов на внешний веб-сайт или API. Каждому запросу потребуется значительное время на получение ответа. Вследствие данного времени ожидания пользователи посчитают веб-приложение медленным или инертным.

Вместо того, чтобы делать внешние запросы по одному (через пакет requests), можно значительно ускорить процесс, используя async / await .

В синхронном подходе выполняется внешний вызов API (например, через GET), а затем приложение ожидает ответа. Период ожидания ответа называется задержкой (latency), которая зависит от качества подключения к Интернету и времени отклика сервера. В данном случае задержка, возможно, составит около 0,2–1,5 секунды на запрос.

В асинхронном подходе выполняется внешний вызов API, а затем обработка продолжается переходом к следующему вызову API. Полученный от внешнего сервера ответ сразу обрабатывается. Так ресурсы задействуются гораздо эффективнее.

В целом, асинхронное программирование идеально подходит для ситуаций наподобие вышеописанной, когда выполняется несколько внешних вызовов и приходится подолгу ждать ответов из интерфейса ввода/вывода.

Обработчик асинхронных маршрутов

aiohttp – пакет, в котором asyncio (асинхронный ввод/вывод) используется для создания асинхронных http-клиентов и серверов. Для тех, кто знаком с пакетом requests для синхронного выполнения HTTP-вызовов, aiohttp будет аналогичным пакетом, ориентированным на асинхронные HTTP-вызовы.

Рассмотрим пример использования aiohttp в маршруте Flask:

urls = ['https://www.kennedyrecipes.com', 'https://www.kennedyrecipes.com/breakfast/pancakes/', 'https://www.kennedyrecipes.com/breakfast/honey_bran_muffins/'] # вспомогательные функции async def fetch_url(session, url): """извлечь указанный URL-адрес, используя указанную сессию aiohttp.""" response = await session.get(url) return # маршруты @app.route('/async_get_urls_v2') async def async_get_urls_v2(): """асинхронно извлекаем список url-адресов.""" async with ClientSession() as session: tasks = [] for url in urls: task = asyncio.create_task(fetch_url(session, url)) tasks.append(task) sites = await asyncio.gather(*tasks) # Generate the HTML response response = '

URLs:

' for site in sites: response += f"

URL: --- Status Code: " return response

Исходный код данного примера есть в репозитории flask-async на gitlab.

В сопрограмме async_get_urls_v2() используется стандартный паттерн asyncio:

1. Создаем несколько асинхронных задач ( asyncio.create_task() )

2. Запускаем их одновременно ( asyncio.gather() )

Тестирование асинхронных маршрутов

Можно протестировать обработчика асинхронного маршрута по стандартному подходу с pytest, так как Flask берет на себя всю асинхронную обработку:

@pytest.fixture(scope='module') def test_client(): # создаем тестового клиента, используя приложение flask with app.test_client() as testing_client: yield testing_client # именно здесь происходит тестирование! def test_async_get_urls_v2(test_client): """ ДАНО: тестовый клиент Flask ЕСЛИ: идет запрос на страницу '/async_get_urls_v2' (GET) ТО: проверить правильность ответа """ response = test_client.get('/async_get_urls_v2') assert response.status_code == 200 assert b'URLs' in response.data

Это будет базовая проверка на верность ответа с URL-адреса /async_get_urls_v2 через зафиксированный объект (fixture) test_client .

Еще примеры с async

Обратные вызовы запроса тоже может выполнять асинхронно во Flask 2.0:

# вспомогательные функции async def load_user_from_database(): """имитирует длительную операцию по загрузке пользователя из внешней базы данных.""" app.logger.info('загрузка пользователя из базы данных. ') await asyncio.sleep(1) async def log_request_status(): """имитирует длительную операцию по регистрации статуса запроса.""" app.logger.info('регистрация статуса запроса. ') await asyncio.sleep(1) # обратные вызовы запроса @app.before_request async def app_before_request(): await load_user_from_database() @app.after_request async def app_after_request(response): await log_request_status() return response

Аналогично с обработчиками ошибок:

# вспомогательные функции async def send_error_email(error): """имитирует длительную операцию по регистрации ошибки.""" app.logger.info('регистрация статуса ошибки. ') await asyncio.sleep(1) # обработчики ошибок @app.errorhandler(500) async def internal_error(error): await send_error_email(error) return '500 error', 500

Async во Flask 1.x

При работе с Flask 1.x можно имитировать поддержку async из Flask 2.0, используя asyncio.run() для управления асинхронным циклом с ожиданием событий:

# вспомогательные функции async def fetch_url(session, url): """извлекаем указанный URL-адрес через указанную сессию aiohttp.""" response = await session.get(url) return async def get_all_urls(): """асинхронно извлекаем список url-адресов с помощью aiohttp.""" async with ClientSession() as session: tasks = [] for url in urls: task = asyncio.create_task(fetch_url(session, url)) tasks.append(task) results = await asyncio.gather(*tasks) return results # маршруты @app.route('/async_get_urls_v1') def async_get_urls_v1(): """асинхронно извлекаем список url-адресов (работает во Flask 1.1.x при использовании потоков).""" sites = asyncio.run(get_all_urls()) # генерируем ответ html response = '

URLs:

' for site in sites: response += f"

URL: --- Status Code: " return response

Функциональность, которая реализуется в сопрограмме get_all_urls() , похожа на ту, которую мы рассмотрели для обработчика маршрута async_get_urls_v2() .

Как это все работает?

Асинхронный цикл с ожиданием событий будет правильно работать во flask 1.x, если запускать приложение Flask с использованием потоков (именно они являются процессами-исполнителями по умолчанию в gunicorn, uWSGI и в сервере разработки Flask):

Каждый поток будет запускать экземпляр приложения Flask при обработке запроса. Внутри каждого потока создается отдельный асинхронный цикл с ожиданием событий для выполнения любых асинхронных операций.

Тестирование сопрограмм

Вот так можно использовать pytest-asyncio для тестирования асинхронного кода:

@pytest.mark.asyncio async def test_fetch_url(): """ ДАНО: "асинхронный цикл с ожиданием событий ЕСЛИ: вызывается сопрограмма fetch_url() ТО: проверить правильность ответа """ async with aiohttp.ClientSession() as session: result = await fetch_url(session, 'https://www.kennedyrecipes.com/baked_goods/bagels/') assert str(result['url']) == 'https://www.kennedyrecipes.com/baked_goods/bagels/' assert int(result['status']) == 200

В данной тестовой функции используется декоратор @pytest.mark.asyncio , который указывает pytest на необходимость исполнить сопрограмму как асинхронную задачу через асинхронный цикл с ожиданием событий.

Заключение

Во Flask 2.0 добавлена очень удачная поддержка асинхронного функционала. Однако асинхронный код следует использовать только, если он дает преимущество перед эквивалентным синхронным кодом. Мы увидели, что одним из примеров осмысленного использования асинхронного исполнения является ситуация, когда нужно сделать несколько HTTP-вызовов в обработчике маршрута.

Я сделал несколько тестов тайминга асинхронной функции из Flask 2.0 ( async_get_urls_v2() ) в сравнении с эквивалентной синхронной функцией. Выполнено по десять вызовов на каждый маршрут:

Тип Среднее время (секунды) Медианное время (секунды)
Синхронный 4,071443 3,419016
Асинхронный 0,531841 0,406068

Асинхронная версия быстрее примерно в 8 раз! Итак, если вам нужно сделать несколько внешних HTTP-вызовов в обработчике маршрута, повышенная сложность использования asyncio и aiohttp определенно оправдана вследствие значительного сокращения срока исполнения.

Если хотите лучше изучить Flask, обязательно посмотрите мой курс: Developing Web Applications with Python and Flask.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *