Энтони Уильямс - Параллельное программирование на С++ в действии. Практика разработки многопоточных программ
- Название:Параллельное программирование на С++ в действии. Практика разработки многопоточных программ
- Автор:
- Жанр:
- Издательство:ДМК Пресс
- Год:2012
- Город:Москва
- ISBN:978-5-94074-448-1
- Рейтинг:
- Избранное:Добавить в избранное
-
Отзывы:
-
Ваша оценка:
Энтони Уильямс - Параллельное программирование на С++ в действии. Практика разработки многопоточных программ краткое содержание
Книга «Параллельное программирование на С++ в действии» не предполагает предварительных знаний в этой области. Вдумчиво читая ее, вы научитесь писать надежные и элегантные многопоточные программы на С++11. Вы узнаете о том, что такое потоковая модель памяти, и о том, какие средства поддержки многопоточности, в том числе запуска и синхронизации потоков, имеются в стандартной библиотеке. Попутно вы познакомитесь с различными нетривиальными проблемами программирования в условиях параллелизма.
Параллельное программирование на С++ в действии. Практика разработки многопоточных программ - читать онлайн бесплатно полную версию (весь текст целиком)
Интервал:
Закладка:
Листинг 9.2.Пул потоков, ожидающий завершения задачи
class function_wrapper {
struct impl_base {
virtual void call() = 0;
virtual ~impl_base() {}
};
std::unique_ptr impl;
template
struct impl_type: impl_base {
F f;
impl_type(F&& f_): f(std::move(f_)) {}
void call() { f(); }
};
public:
template function_wrapper(F&& f):
impl(new impl_type(std::move(f))) {}
void operator()() { impl->call(); }
function_wrapper() = default;
function_wrapper(function_wrapper&& other):
impl(std::move(other.impl)) {}
function_wrapper& operator=(function_wrapper&& other) {
impl = std::move(other.impl);
return *this;
}
function_wrapper(const function_wrapper&) = delete;
function_wrapper(function_wrapper&) = delete;
function_wrapper& operator=(const function_wrapper&) = delete;
};
class thread_pool {
thread_safe_queue work_queue;←┐
│
Используем
void worker_thread() │
function_
{ │
wrapper
while (!done) │
вместо std::
{ │
function
function_wrapper task; ←┘
if (work_queue.try_pop(task))
task();
else
std::this_thread::yield();
}
}
public:
template
std::future::type>←
(1)
submit(FunctionType f) {
typedef typename std::result_of::type
result_type; ←
(2)
std::packaged_task task(std::move(f));←
(3)
std::future res(task.get_future()); ←
(4)
work_queue.push(std::move(task)); ←
(5)
return res; ←
(6)
}
// остальное, как и раньше
};
Прежде всего отметим, что модифицированная функция submit()
(1)возвращает объект std::future<>
, который будет содержать возвращенное задачей значение и позволит вызывающей программе ждать ее завершения. Для этого нам необходимо знать тип значения, возвращаемого переданной функцией f
, и здесь на помощь приходит шаблон std::result_of<>
: std::result_of::type
— это тип результата, возвращенного вызовом объекта типа FunctionType
(например, f
) без аргументов. Выражение std::result_of<>
мы используем также в определении псевдонима типа result_type
(2)внутри функции.
Затем f
обертывается объектом std::packaged_task
(3), потому что f
— функция или допускающий вызов объект, который не принимает параметров и возвращает результат типа result_type
. Теперь мы можем получить будущий результат из std::packaged_task<>
(4), перед тем как помещать задачу в очередь (5)и возвращать будущий результат (6). Отметим, что при помещении задачи в очередь мы должны использовать функцию std::move()
, потому что класс std::packaged_task<> не допускает копирования. Именно поэтому в очереди хранятся объекты function_wrapper
, а не объекты типа.
Этот пул позволяет ожидать завершения задач и получать возвращаемые ими результаты. В листинге ниже показано, как выглядит функция parallel_accumulate
, работающая с таким пулом потоков.
Листинг 9.3.Функция parallel_accumulate
, реализованная с помощью пула потоков, допускающего ожидание задач
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const block_size = 25;
unsigned long const num_blocks =
(length + block_size - 1) / block_size; ←
(1)
std::vector > futures(num_blocks-1);
thread_pool pool;
Iterator block_start = first;
for (unsigned long i = 0; i < (num_blocks - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
futures[i] = pool.submit(accumulate_block());←
(2)
block_start = block_end;
}
T last_result =
accumulate_block()(block_start, last);
T result = init;
for (unsigned long i = 0; i < (num_blocks - 1); ++i) {
result += futures[i].get();
}
result += last_result;
return result;
}
Сравнивая этот код с листингом 8.4, следует обратить внимание на две вещи. Во-первых, мы работаем с количеством блоков ( num_blocks
(1)), а не потоков. Чтобы в полной мере воспользоваться масштабируемостью пула потоков, мы должны разбить работу на максимально мелкие блоки, с которыми имеет смысл работать параллельно. Если потоков в пуле немного, то каждый поток будет обрабатывать много блоков, но по мере роста числа потоков, поддерживаемых оборудованием, будет расти и количество блоков, обрабатываемых параллельно.
Но, выбирая «максимально мелкие блоки, с которыми имеет смысл работать параллельно», будьте осторожны. Отправка задачи пулу потоков, выбор ее рабочим потоком из очереди и передача возвращенного значения с помощью std::future<>
— всё это операции не бесплатные, и для совсем мелких задач они не окупятся. Если размер задачи слишком мал, то программа, в которой используется пул потоков, может работать медленнее, чем однопоточная.
В предположении, что размер блока выбран разумно, вам не надо заботиться об упаковке задач, получении будущих результатов и хранении объектов std::thread
, чтобы впоследствии их можно было присоединить; все это пул берет на себя. Вам остается лишь вызвать функцию submit()
, передав ей свою задачу (2).
Пул потоков обеспечивает также безопасность относительно исключений. Любое возбужденное задачей исключение передается через будущий результат, возвращенный submit()
, а, если выход из функции происходит в результате исключения, то деструктор пула потоков снимет еще работающие задачи и дождется завершения потоков, входящих в пул.
Эта схема работает для простых случаев, когда задачи независимы. Но не годится, когда одни задачи зависят от других, также переданных пулу.
9.1.3. Задачи, ожидающие других задач
В этой книге я уже неоднократно приводил пример алгоритма Quicksort. Его идея проста — подлежащие сортировке данные разбиваются на две части: до и после опорного элемента (в смысле заданной операции сравнения). Затем обе части рекурсивно сортируются и объединяются для получения полностью отсортированной последовательности. При распараллеливании алгоритма надо позаботиться о том, чтобы рекурсивные вызовы задействовали имеющийся аппаратный параллелизм.
В главе 4, где этот пример впервые был представлен, мы использовали std::async
для выполнения одного из рекурсивных вызовов на каждом шаге и оставляли библиотеке решение о том, запускать ли новый поток или сортировать синхронно при обращении к get()
. Этот подход неплохо работает — каждая задача либо выполняется в отдельном потоке, либо в тот момент, когда нужны ее результаты.
В главе 8 мы переработали эту реализацию, продемонстрировав альтернативный подход, когда количество потоков фиксировано и определяется уровнем аппаратного параллелизма. В данном случае мы воспользовались стеком ожидающих сортировки блоков. Разбивая на части предложенные для сортировки данные, каждый поток помещал один блок в стек, а второй сортировал непосредственно. Бесхитростное ожидание завершения сортировки второго блока могло бы закончиться взаимоблокировкой, потому что число потоков ограничено, и некоторые из них ждут. Очень легко оказаться в ситуации, когда все потоки ждут завершения сортировки блоков, и ни один ничего не делает. Тогда мы решили эту проблему, заставив поток извлекать блоки из стека и сортировать их, пока тот конкретный блок, которого он ждет, еще не отсортирован.
Читать дальшеИнтервал:
Закладка: