Энтони Уильямс - Параллельное программирование на С++ в действии. Практика разработки многопоточных программ
- Название:Параллельное программирование на С++ в действии. Практика разработки многопоточных программ
- Автор:
- Жанр:
- Издательство:ДМК Пресс
- Год:2012
- Город:Москва
- ISBN:978-5-94074-448-1
- Рейтинг:
- Избранное:Добавить в избранное
-
Отзывы:
-
Ваша оценка:
Энтони Уильямс - Параллельное программирование на С++ в действии. Практика разработки многопоточных программ краткое содержание
Книга «Параллельное программирование на С++ в действии» не предполагает предварительных знаний в этой области. Вдумчиво читая ее, вы научитесь писать надежные и элегантные многопоточные программы на С++11. Вы узнаете о том, что такое потоковая модель памяти, и о том, какие средства поддержки многопоточности, в том числе запуска и синхронизации потоков, имеются в стандартной библиотеке. Попутно вы познакомитесь с различными нетривиальными проблемами программирования в условиях параллелизма.
Параллельное программирование на С++ в действии. Практика разработки многопоточных программ - читать онлайн бесплатно полную версию (весь текст целиком)
Интервал:
Закладка:
В следующем листинге приведена простая реализация барьера.
Листинг 8.12.Простой класс барьера
class barrier {
unsigned const count;
std::atomic spaces;
std::atomic generation;
public:
explicit barrier (unsigned count_) : ←
(1)
count(count_), spaces(count), generation(0) {}
void wait() {
unsigned const my_generation = generation; ←
(2)
if (!--spaces) {←
(3)
spaces = count;←
(4)
++generation; ←
(5)
} else {
while (generation == my_generation) ←
(6)
std::this_thread::yield(); ←
(7)
}
}
};
Здесь при конструировании барьера мы указываем количество «мест» (1), которое сохраняется в переменной count. В начале количество мест у барьера spaces равно count
. Когда очередной поток выполняет функцию wait()
, значение spaces
уменьшается на единицу (3). Как только оно обращается в нуль, количество мест возвращается к исходному значению count
(4), а переменная generation
увеличивается, что служит для других потоков сигналом к продолжению работы (5). Если число свободных мест больше нуля, то поток должен ждать. В этой реализации используется простой спинлок (6), который сравнивает текущее значение generation
с тем, что было запомнено в начале wait()
(2). Поскольку generation
изменяется только после того, как все потоки подошли к барьеру (5), мы можем во время ожидания уступить процессор с помощью yield()
(7), чтобы ожидающий поток не потреблял процессорное время.
Эта реализация действительно очень простая — в ней для ожидания используется спинлок, поэтому она не идеальна для случая, когда потоки могут ждать долго, и совсем не годится в ситуации, когда в каждый момент времени может существовать более count потоков, выполнивших wait()
. Если требуется, чтобы и в этих случаях барьер работал правильно, то следует использовать более надежную (но и более сложную) реализацию. Кроме того, я ограничился последовательно согласованными операциями над атомарными переменными, потому что они проще для понимания, но вы, возможно, захотите ослабить ограничения на порядок доступа к памяти. Такая глобальная синхронизация дорого обходится в массивно параллельных архитектурах, так как строку кэша, содержащую состояние барьера, приходится передавать всем участвующим процессорам (см. обсуждение перебрасывания кэша в разделе 8.2.2). Поэтому нужно тщательно продумать, является ли это решение оптимальным.
Как бы то ни было, барьер — это именно то, что нам необходимо в данном случае; имеется фиксированное число потоков, которые должны синхронизироваться на каждой итерации цикла. Ну почти фиксированное. Как вы, наверное, помните, элементы, расположенные близко к началу списка, получают окончательные значения всего через пару шагов. Это означает, что либо все потоки должны крутиться в цикле, пока не будет обработан весь диапазон, либо барьер должен поддерживать выпадение потоков, уменьшая значение count
. Я предпочитаю второй вариант, чтобы не заставлять потоки выполнять бессмысленную работу в ожидании завершения последнего шага.
Но тогда нужно сделать count
атомарной переменной, чтобы ее можно было изменять из нескольких потоков без внешней синхронизации:
std::atomic count;
Инициализация не меняется, но при переустановке spaces
теперь нужно явно загружать spaces
с помощью операции load()
:
spaces = count.load();
Больше никаких изменений в wait()
вносить не надо, но необходима новая функция-член для уменьшения count. Назовем ее done_waiting()
, потому что с ее помощью поток заявляет, что больше ждать не будет.
void done_waiting() {
--count; ←
(1)
if (!--spaces) { ←
(2)
spaces = count.load();←
(3)
++generation;
}
}
Прежде всего мы уменьшаем count
(1)
, чтобы при следующей переустановке spaces
было отражено новое, меньшее прежнего, количество ожидающих потоков. Затем уменьшаем количество свободных мест spaces
(2). Если этого не сделать, то остальные потоки будут ждать вечно, так как при инициализации spaces
было задано старое, большее, значение. Если текущий поток является последним в партии, то количество мест нужно переустановить и увеличить generation
на единицу (3)— так же, как в wait()
. Основное отличие от wait()
заключается в том, что поток не должен ждать — он же сам объявляет, что больше ждать не будет!
Теперь мы готовы написать вторую реализацию алгоритма вычисления частичных сумм. На каждом шаге каждый поток вызывает функцию wait()
барьера, чтобы все потоки пересекли его вместе, а, закончив работу, поток вызывает функцию done_waiting()
, чтобы уменьшить счетчик. Если наряду с исходным диапазоном использовать второй буфер, то барьер обеспечивает необходимую синхронизацию. На каждом шаге потоки либо читают исходный диапазон и записывают новое значение в буфер, либо наоборот — читают буфер и записывают новое значение в исходный диапазон. На следующем шаге исходный диапазон и буфер меняются местами. Тем самым мы гарантируем отсутствие гонки между операциями чтения и записи в разных потоках. Перед выходом из цикла каждый поток должен позаботиться о записи окончательного значения в исходный диапазон. В листинге 8.13 все это собрано воедино.
Листинг 8.13.Параллельная реализация partial_sum
методом попарных обновлений
struct barrier {
std::atomic count;
std::atomic spaces;
std::atomic generation;
barrier(unsigned count_) :
count(count_), spaces(count_), generation(0) {}
void wait() {
unsigned const gen = generation.load();
if (!--spaces) {
spaces = count.load();
++generation;
} else {
while (generation.load() == gen) {
std::this_thread::yield();
}
}
}
void done_waiting() {
--count;
if (!--spaces) {
spaces = count.load();
++generation;
}
}
};
template
void parallel_partial_sum(Iterator first, Iterator last) {
typedef typename Iterator::value_type value_type;
struct process_element { ←
(1)
void operator()(Iterator first, Iterator last,
std::vector& buffer,
unsigned i, barrier& b) {
value_type& ith_element = *(first + i);
bool update_source = false;
for (unsigned step = 0, stride = 1;
stride <= i; ++step, stride *= 2) {
value_type const& source = (step % 2) ? ←
(2)
buffer[i] : ith_element;
value_type& dest = (step % 2) ?
ith_element:buffer[i];
value_type const& addend = (step % 2) ? ←
(3)
buffer[i - stride] : *(first + i – stride);
dest = source + addend; ←
(4)
update_source = !(step % 2);
b.wait(); ←
(5)
}
if (update_source) { ←
(6)
Интервал:
Закладка: