Энтони Уильямс - Параллельное программирование на С++ в действии. Практика разработки многопоточных программ
- Название:Параллельное программирование на С++ в действии. Практика разработки многопоточных программ
- Автор:
- Жанр:
- Издательство:ДМК Пресс
- Год:2012
- Город:Москва
- ISBN:978-5-94074-448-1
- Рейтинг:
- Избранное:Добавить в избранное
-
Отзывы:
-
Ваша оценка:
Энтони Уильямс - Параллельное программирование на С++ в действии. Практика разработки многопоточных программ краткое содержание
Книга «Параллельное программирование на С++ в действии» не предполагает предварительных знаний в этой области. Вдумчиво читая ее, вы научитесь писать надежные и элегантные многопоточные программы на С++11. Вы узнаете о том, что такое потоковая модель памяти, и о том, какие средства поддержки многопоточности, в том числе запуска и синхронизации потоков, имеются в стандартной библиотеке. Попутно вы познакомитесь с различными нетривиальными проблемами программирования в условиях параллелизма.
Параллельное программирование на С++ в действии. Практика разработки многопоточных программ - читать онлайн бесплатно полную версию (весь текст целиком)
Интервал:
Закладка:
В главе 4 мы уже познакомились с подходящим для решения проблемы средством. Для чего нам вообще нужны новые потоки? Для того чтобы вычислить результат и при этом учесть возможность возникновения исключений. Но это именно то , для чего предназначено сочетание std::packaged_task
и std::future
. В листинге 8.3 показан код, переписанный с использованием std::packaged_task
.
Листинг 8.3.Параллельная версия std::accumulate
с применением std::packaged_task
template
struct accumulate_block {
T operator()(Iterator first, Iterator last) {←
(1)
return std::accumulate(first, last, T()); ←
(2)
}
};
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread — 1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(
hardware_threads i = 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector > futures(num_threads-1);←
(3)
std::vector threads(num_threads — 1);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task task( ←
(4)
accumulate_block());
futures[i] = task.get_future(); ←
(5)
threads[i] =
std::thread(std::move(task), block_start, block_end);←
(6)
block_start = block_end;
}
T last_result = accumulate_block()(block_start, last); ←
(7)
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join));
T result = init; ←
(8)
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
result += futures[i].get(); ←
(9)
}
result += last_result; ←
(10)
return result;
}
Первое изменение заключается в том, что оператор вызова в accumulate_block
теперь возвращает результат по значению, а не принимает ссылку на место, где его требуется сохранить (1). Для обеспечения безопасности относительно исключений мы используем std::packaged_task
и std::future
, поэтому можем воспользоваться этим и для передачи результата. Правда, для этого требуется при вызове std::accumulate
(2)явно передавать сконструированный по умолчанию экземпляр T
, а не использовать повторно предоставленное значение result
, но это не слишком существенное изменение.
Далее, вместо того заводить вектор результатов, мы создаем вектор futures
(3), в котором храним объекты std::future
для каждого запущенного потока. В цикле запуска потоков мы сначала создаем задачу для accumulate_block
(4). В классе std::packaged_task
объявлена задача, которая принимает два объекта Iterator
и возвращает T
, а это именно то, что делает наша функция. Затем мы получаем будущий результат для этой задачи (5)и исполняем ее в новом потоке, передавая начало и конец обрабатываемого блока (6). Результат работы задачи, равно как и исключение, если оно возникнет, запоминается в объекте future
.
Поскольку используются будущие результаты, массива results
больше нет, поэтому мы должны сохранить результат обработки последнего блока в переменной (7), а не в элементе массива. Кроме того, поскольку мы получаем значения из будущих результатов, проще не вызывать std::accumulate
, а написать простой цикл for
, в котором к переданному начальному значению (8)будут прибавляться значения, полученные из каждого будущего результата (9). Если какая-то задача возбудит исключение, то оно будет запомнено в будущем результате и повторно возбуждено при обращении к get()
. Наконец, перед тем как возвращать окончательный результат вызывающей программе, мы прибавляем результат обработки последнего блока (10).
Таким образом, мы устранили одну из потенциальных проблем: исключения, возбужденные в рабочих потоках, повторно возбуждаются в главном. Если исключения возникнут в нескольких рабочих потоках, то вверх распространится только одно, но это не очень страшно. Если вы считаете, что это все-таки важно, то можете воспользоваться чем-то вроде класса std::nested_exception
, чтобы собрать все такие исключения и передать их главному потоку.
Осталось решить проблему утечки потоков в случае, когда исключение возникает между моментом запуска первого потока и присоединением всех запущенных. Для этого проще всего перехватить любое исключение, дождаться присоединения потоков, которые все еще находятся в состоянии joinable()
, а потом возбудить исключение повторно:
try {
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
// ... как и раньше
}
T last_result = accumulate_block()(block_start, last);
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join));
} catch (...) {
for (unsigned long i = 0; i < (num_thread - 1); ++i) {
if (threads[i].joinable())
thread[i].join();
}
throw;
}
Теперь все работает. Все потоки будут присоединены вне зависимости от того, как завершилась обработка блока. Однако блоки try-catch
выглядят некрасиво, и часть кода дублируется. Мы присоединяем потоки как в «нормальной» ветке, так и в блоке catch
. Дублирование кода — вещь почти всегда нежелательная, потому что изменения придётся вносить в несколько мест. Давайте лучше перенесём этот код в деструктор — ведь именно такова идиома очистки ресурсов в С++. Вот как выглядит этот класс:
class join_threads {
std::vector& threads;
public:
explicit join_threads(std::vector& threads_):
threads(threads_) {}
~join_threads() {
for (unsigned long i = 0; i < threads.size(); ++i) {
if (threads[i].joinable())
threads[i].join();
}
}
};
Это похоже на класс thread_guard
из листинга 2.3, только на этот раз мы «охраняем» целый вектор потоков. Теперь наш код упрощается.
Листинг 8.4.Безопасная относительно исключений версия std::accumulate
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(
hardware_threads i = 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector > futures(num_threads — 1);
std::vector threads(num_threads - 1);
join_threads joiner(threads); ←
(1)
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task task(
accumulate_block());
futures[i] = task.get_future();
Интервал:
Закладка: